trait ProducerF[F[_], E, A] extends Serializable
A simple interface that models the producer side of a producer-consumer communication channel.
In a producer-consumer communication channel we've got these concerns to take care of:
- back-pressure, which is handled automatically via this interface
- halting the channel with a final event and informing all current and future consumers about it, while stopping future producers from pushing more events
The ProducerF interface takes care of these concerns via:
- the F[Boolean] result, which should return true for as long as the channel wasn't halted, so further events can be pushed; these tasks also block (asynchronously) when internal buffers are full, so back-pressure concerns are handled automatically
- halt, being able to close the channel with a final event that will be visible to all current and future consumers
Currently implemented by ConcurrentChannel.
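The halting contract described above can be illustrated with a toy implementation. The sketch below is a hypothetical, single-threaded test double (`RecordingProducer` and `ContractDemo` are illustrative names, not part of Monix) that uses `cats.Id` as the effect type. It never back-pressures and ignores consumers entirely; it only mirrors the rule that `push` reports `true` until `halt` is called and `false` afterwards.

```scala
import cats.Id
import monix.catnap.ProducerF
import scala.collection.mutable.ListBuffer

// Hypothetical test double: records events in memory, no real concurrency.
final class RecordingProducer[E, A] extends ProducerF[Id, E, A] {
  val events: ListBuffer[A] = ListBuffer.empty[A]
  var halted: Option[E] = None

  // Accepts the event only while the channel is still open.
  def push(a: A): Id[Boolean] =
    if (halted.isDefined) false
    else { events += a; true }

  // Pushes one by one and stops at the first rejected event.
  def pushMany(seq: Iterable[A]): Id[Boolean] =
    halted.isEmpty && seq.forall(a => push(a))

  // Only the first halt event is stored.
  def halt(e: E): Id[Unit] =
    if (halted.isEmpty) halted = Some(e)

  // Assumption for this stub: consumers are always considered connected.
  def awaitConsumers(n: Int): Id[Boolean] = true
}

object ContractDemo {
  val channel = new RecordingProducer[String, Int]
  val beforeHalt: Boolean = channel.push(1) // accepted while open
  channel.halt("done")
  val afterHalt: Boolean = channel.push(2)  // rejected once halted
}
```

A real implementation such as ConcurrentChannel additionally suspends the producer when buffers are full, which this stub does not attempt to model.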
Abstract Value Members
abstract def awaitConsumers(n: Int): F[Boolean]
Waits for the specified number of consumers to be connected.
This is a utility to ensure that a certain number of consumers are connected before we start emitting events.
- n: the number of consumers that need to be connected before the returned task completes
- returns: a task that will complete only after the required number of consumers are observed as being connected to the channel
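A producer would typically sequence this call before its first push. The sketch below assumes only the four methods of this interface plus a `cats.Monad` instance; `produceWhenReady`, `LoggingProducer` and `AwaitExample` are hypothetical names introduced for illustration. The meaning of the `Boolean` returned by `awaitConsumers` is not spelled out here, so the sketch ignores it and relies on the result of `pushMany` instead.

```scala
import cats.{Id, Monad}
import cats.implicits._
import monix.catnap.ProducerF
import scala.collection.mutable.ListBuffer

object AwaitExample {
  // Waits for `minConsumers` consumers, publishes `events`, then halts with `last`.
  def produceWhenReady[F[_]: Monad, E, A](
    channel: ProducerF[F, E, A],
    minConsumers: Int,
    events: Iterable[A],
    last: E
  ): F[Unit] =
    channel.awaitConsumers(minConsumers).flatMap { _ =>
      channel.pushMany(events).flatMap {
        case true  => channel.halt(last) // everything was accepted
        case false => Monad[F].unit      // halted elsewhere, nothing more to do
      }
    }

  // Hypothetical test double that logs calls in the order they happen.
  final class LoggingProducer extends ProducerF[Id, String, Int] {
    val log: ListBuffer[String] = ListBuffer.empty[String]
    def awaitConsumers(n: Int): Id[Boolean] = { log += s"await($n)"; true }
    def push(a: Int): Id[Boolean] = { log += s"push($a)"; true }
    def pushMany(seq: Iterable[Int]): Id[Boolean] = seq.forall(a => push(a))
    def halt(e: String): Id[Unit] = { log += s"halt($e)"; () }
  }

  val channel = new LoggingProducer
  produceWhenReady[Id, String, Int](channel, 2, List(1, 2), "done")
}
```

With the logging stub, the recorded calls show that no event is pushed before the wait for consumers has completed.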
abstract def halt(e: E): F[Unit]
Closes the communication channel with a message that will be visible to all current and future consumers.
Note that if multiple halt events happen, then only the first one will be taken into account; all other halt messages are ignored.
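One way an implementation could honor the "first halt wins" rule is a compare-and-set on an atomic reference. The following is a hypothetical sketch using only the JDK (`HaltState` and `HaltDemo` are illustrative names); it is not a description of how ConcurrentChannel works internally.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical holder for the final event of a channel.
final class HaltState[E] {
  private val ref = new AtomicReference[Option[E]](None)

  // Stores `e` only if no halt event was stored before.
  // Returns true when this call was the one that closed the channel.
  def halt(e: E): Boolean = ref.compareAndSet(None, Some(e))

  // The winning halt event, if any.
  def current: Option[E] = ref.get()
}

object HaltDemo {
  val state = new HaltState[String]
  val first: Boolean  = state.halt("a") // wins
  val second: Boolean = state.halt("b") // ignored
}
```

The compare-and-set succeeds only while the stored value is still the initial `None`, so concurrent callers cannot overwrite the first event.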
abstract def push(a: A): F[Boolean]
Publishes an event on the channel.
Contract:
- in case the internal buffers are full, back-pressures until there's enough space for pushing the message, or until the channel was halted, whichever comes first
- returns true in case the message was pushed in the internal buffer, or false in case the channel was halted and cannot receive any more events, in which case the producer's loop should terminate
Example:
```scala
import cats.implicits._
import cats.effect.Async
import monix.catnap.ProducerF

def range[F[_]](channel: ProducerF[F, Int, Int], from: Int, until: Int)
  (implicit F: Async[F]): F[Unit] = {

  if (from < until) {
    if (from + 1 < until)
      channel.push(from).flatMap {
        case true =>
          // keep going
          range(channel, from + 1, until)
        case false =>
          // channel was halted by another producer loop, so stopping
          F.unit
      }
    else
      // we are done, publish the last element of the range as the final event
      channel.halt(from)
  } else {
    F.unit // invalid range
  }
}
```
- a: the message to publish
- returns: true in case the message was published successfully, or false in case the channel was halted by another producer
abstract def pushMany(seq: Iterable[A]): F[Boolean]
Publishes multiple events on the channel.
Contract:
- in case the internal buffers are full, back-pressures until there's enough space and the whole sequence was published, or until the channel was halted, in which case no further messages can be pushed
- returns true in case the whole sequence was pushed in the internal buffer, or false in case the channel was halted and cannot receive any more events, in which case the producer's loop should terminate
Note: implementations may try to push events one by one. This is not an atomic operation. In case of concurrent producers, there's absolutely no guarantee for the order of messages coming from multiple producers. Also, in case the channel is halted (and the resulting task returns false), the outcome can be that the sequence gets published partially.
Example:
```scala
import cats.implicits._
import cats.effect.Async
import monix.catnap.ProducerF

def range[F[_]](channel: ProducerF[F, Int, Int], from: Int, until: Int)
  (implicit F: Async[F]): F[Unit] = {

  if (from < until) {
    val to = until - 1
    channel.pushMany(Range(from, to)).flatMap {
      case true =>
        channel.halt(to) // final event
      case false =>
        // channel was halted by a concurrent producer, so stop
        F.unit
    }
  } else {
    F.unit // invalid range
  }
}
```
- seq: the sequence of messages to publish on the channel
- returns: true in case all messages were published successfully, or false in case the channel was halted by another producer
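Because a halted `pushMany` may leave the sequence partially published, a producer that splits its input into batches should stop at the first `false`. The sketch below assumes a `cats.Monad` instance for `F`; `pushInBatches`, `HaltsAfter` and `BatchExample` are hypothetical names, and the stub simulates a channel that gets halted after accepting a fixed number of events.

```scala
import cats.{Id, Monad}
import cats.implicits._
import monix.catnap.ProducerF
import scala.collection.mutable.ListBuffer

object BatchExample {
  // Pushes `events` in batches of `batchSize` (must be positive),
  // skipping all remaining batches once the channel reports a halt.
  def pushInBatches[F[_]: Monad, E, A](
    channel: ProducerF[F, E, A],
    events: List[A],
    batchSize: Int
  ): F[Boolean] =
    events.grouped(batchSize).foldLeft(Monad[F].pure(true)) { (acc, batch) =>
      acc.flatMap {
        case true  => channel.pushMany(batch)
        case false => Monad[F].pure(false)
      }
    }

  // Hypothetical test double: behaves as halted after `limit` accepted events.
  final class HaltsAfter(limit: Int) extends ProducerF[Id, String, Int] {
    val accepted: ListBuffer[Int] = ListBuffer.empty[Int]
    def push(a: Int): Id[Boolean] =
      if (accepted.size >= limit) false else { accepted += a; true }
    def pushMany(seq: Iterable[Int]): Id[Boolean] = seq.forall(a => push(a))
    def halt(e: String): Id[Unit] = ()
    def awaitConsumers(n: Int): Id[Boolean] = true
  }

  val channel = new HaltsAfter(limit = 3)
  val result: Boolean =
    pushInBatches[Id, String, Int](channel, List(1, 2, 3, 4, 5), batchSize = 2)
}
```

In this run the second batch is only partially accepted before the simulated halt, and the third batch is never attempted, so callers that need to know exactly which events went through cannot rely on the `Boolean` alone.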
Concrete Value Members
final def !=(arg0: Any): Boolean
Definition Classes: AnyRef → Any

final def ##(): Int
Definition Classes: AnyRef → Any

final def ==(arg0: Any): Boolean
Definition Classes: AnyRef → Any

final def asInstanceOf[T0]: T0
Definition Classes: Any

def clone(): AnyRef
Attributes: protected[java.lang]
Definition Classes: AnyRef
Annotations: @native() @throws( ... )

final def eq(arg0: AnyRef): Boolean
Definition Classes: AnyRef

def equals(arg0: Any): Boolean
Definition Classes: AnyRef → Any

def finalize(): Unit
Attributes: protected[java.lang]
Definition Classes: AnyRef
Annotations: @throws( classOf[java.lang.Throwable] )

final def getClass(): Class[_]
Definition Classes: AnyRef → Any
Annotations: @native()

def hashCode(): Int
Definition Classes: AnyRef → Any
Annotations: @native()

final def isInstanceOf[T0]: Boolean
Definition Classes: Any

final def ne(arg0: AnyRef): Boolean
Definition Classes: AnyRef

final def notify(): Unit
Definition Classes: AnyRef
Annotations: @native()

final def notifyAll(): Unit
Definition Classes: AnyRef
Annotations: @native()

final def synchronized[T0](arg0: ⇒ T0): T0
Definition Classes: AnyRef

def toString(): String
Definition Classes: AnyRef → Any

final def wait(): Unit
Definition Classes: AnyRef
Annotations: @throws( ... )

final def wait(arg0: Long, arg1: Int): Unit
Definition Classes: AnyRef
Annotations: @throws( ... )

final def wait(arg0: Long): Unit
Definition Classes: AnyRef
Annotations: @native() @throws( ... )