package execution
Type Members
- sealed abstract class Ack extends Future[Ack] with Serializable
Represents an acknowledgement of processing that a consumer sends back upstream. Useful to implement back-pressure.
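As a sketch, a consumer's handler typically answers with one of the Ack.Continue or Ack.Stop values (both are already-completed futures):

  import monix.execution.Ack
  import scala.concurrent.Future

  // Sketch: the consumer decides whether the upstream should keep sending
  // events (Ack.Continue) or stop the stream (Ack.Stop); both values are
  // already-completed futures, so nothing is allocated on the fast path.
  def onNext(elem: Int): Future[Ack] =
    if (elem >= 0) Ack.Continue
    else Ack.Stop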
- final class AsyncQueue[A] extends AnyRef
A high-performance, back-pressured, asynchronous queue implementation.
This is the impure, future-enabled version of monix.catnap.ConcurrentQueue.
Example

  import monix.execution.Scheduler.Implicits.global
  import monix.execution.{AsyncQueue, CancelableFuture}

  val queue = AsyncQueue[Int](capacity = 32)

  def producer(n: Int): CancelableFuture[Unit] =
    queue.offer(n).flatMap { _ =>
      if (n >= 0) producer(n - 1)
      else CancelableFuture.unit
    }

  def consumer(index: Int): CancelableFuture[Unit] =
    queue.poll().flatMap { a =>
      println(s"Worker $index: $a")
      CancelableFuture.unit
    }
Back-Pressuring and the Polling Model
The initialized queue can be limited to a maximum buffer size, a size that could be rounded to a power of 2, so you can't rely on it to be precise. Such a bounded queue can be initialized via AsyncQueue.bounded. Also see BufferCapacity, the configuration parameter that can be passed in the AsyncQueue.withConfig builder.
On offer, when the queue is full, the implementation back-pressures until the queue has room again in its internal buffer, the returned future being completed when the value has been pushed successfully. Similarly, poll waits until the queue has items in it. This works for both bounded and unbounded queues.
For both offer and poll, in case awaiting a result happens, the implementation does so asynchronously, without any threads being blocked.
Currently the implementation is optimized for speed. In a producer-consumer pipeline the best performance is achieved if the producer(s) and the consumer(s) do not contend for the same resources. This is why, when doing asynchronous waiting for the queue to be empty or full, the implementation does so by repeatedly retrying the operation, with asynchronous boundaries and delays, until it succeeds. Fairness is ensured by the implementation.
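As a sketch of the bounded case described above, using the AsyncQueue.bounded builder mentioned earlier (the capacity may get rounded up to a power of 2):

  import monix.execution.AsyncQueue
  import monix.execution.Scheduler.Implicits.global

  // Bounded queue: `offer` back-pressures (asynchronously) once the internal
  // buffer is full, `poll` waits (asynchronously) until an item is available.
  val queue = AsyncQueue.bounded[Int](16)

  val producing = queue.offer(1)  // CancelableFuture[Unit], completes when enqueued
  val consuming = queue.poll()    // CancelableFuture[Int], completes with the next item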
Multi-threading Scenario
This queue supports a ChannelType configuration for fine-tuning, depending on the needed multi-threading scenario, which can yield better performance:
- ChannelType.MPMC: multi-producer, multi-consumer
- ChannelType.MPSC: multi-producer, single-consumer
- ChannelType.SPMC: single-producer, multi-consumer
- ChannelType.SPSC: single-producer, single-consumer
The default is MPMC, because that's the safest scenario.

  import monix.execution.AsyncQueue
  import monix.execution.ChannelType.MPSC
  import monix.execution.Scheduler.Implicits.global

  val queue = AsyncQueue[Int](
    capacity = 64,
    channelType = MPSC
  )

WARNING: the default is MPMC, however any other scenario implies a relaxation of the internal synchronization between threads. This means that using the wrong scenario can lead to severe concurrency bugs. If you're not sure what multi-threading scenario you have, then just stick with the default MPMC.
- final class AsyncSemaphore extends GenericSemaphore[Cancelable]
The AsyncSemaphore is an asynchronous semaphore implementation that limits the parallelism of Future execution.
The following example instantiates a semaphore with a maximum parallelism of 10:

  import monix.execution.AsyncSemaphore
  import monix.execution.Scheduler.Implicits.global
  import scala.concurrent.Future

  val semaphore = AsyncSemaphore(maxParallelism = 10)

  def makeRequest(r: HttpRequest): Future[HttpResponse] = ???

  // For such a task no more than 10 requests
  // are allowed to be executed in parallel.
  val future = semaphore.greenLight(() => makeRequest(???))
- final class AsyncVar[A] extends GenericVar[A, Cancelable]
Asynchronous mutable location that is either empty or contains a value of type A.
It has these fundamental atomic operations:
- put which fills the var if empty, or waits (asynchronously) otherwise until the var is empty again (with the putByCallback overload)
- tryPut which fills the var if empty, returning true if it succeeded, or returning false immediately in case the var was full and thus the operation failed
- take which empties the var if full, returning the contained value, or waits (asynchronously) otherwise until there is a value to pull (with the takeByCallback overload)
- tryTake which empties the var if full, returning the contained value immediately as Some(a), or otherwise returning None in case the var was empty and thus the operation failed
- read which reads the var if full, but without taking the value out of the internal var, or waits (asynchronously) until there is a value to read
- tryRead which tries reading the var without modifying it in any way; if full it returns Some(a), or None if empty
The AsyncVar is appropriate for building synchronization primitives and performing simple inter-thread communications. If it helps, it's similar to a BlockingQueue(capacity = 1), except that it doesn't block any threads, all waiting being callback-based.
Given its asynchronous, non-blocking nature, it can be used on top of JavaScript as well.
This is inspired by Control.Concurrent.MVar from Haskell, except that the implementation is made to work with plain Scala futures (and is thus impure).
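A sketch of the put/take and tryPut/tryTake operations described above, assuming the AsyncVar.empty builder in the companion object:

  import monix.execution.AsyncVar
  import monix.execution.Scheduler.Implicits.global
  import scala.concurrent.Future

  // Sketch (AsyncVar.empty is assumed): one side `put`s a value, the other
  // `take`s it; both return futures that complete without blocking threads.
  val state = AsyncVar.empty[Int]()

  val producing: Future[Unit] = state.put(1)   // completes once the var accepts the value
  val consuming: Future[Int]  = state.take()   // completes once there's a value to pull

  // Non-waiting variants return immediately:
  val didPut: Boolean       = state.tryPut(2)
  val maybeGot: Option[Int] = state.tryTake()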
- sealed abstract class BufferCapacity extends Product with Serializable
Describes the capacity of internal buffers.
For abstractions that use an internal buffer, like AsyncQueue, this type provides the info required to build the internal buffer.
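A sketch of typical usage, assuming Bounded / Unbounded constructors on the companion and the (capacity, channelType) parameter order of AsyncQueue.withConfig mentioned above:

  import monix.execution.{AsyncQueue, BufferCapacity, ChannelType}
  import monix.execution.Scheduler.Implicits.global

  // Bounded capacity: back-pressures producers once ~128 items are buffered
  // (the size may be rounded to a power of 2).
  val bounded = AsyncQueue.withConfig[Int](BufferCapacity.Bounded(128), ChannelType.MPMC)

  // Unbounded capacity: the buffer grows as needed, no back-pressure on offer.
  val unbounded = AsyncQueue.withConfig[Int](BufferCapacity.Unbounded(), ChannelType.MPMC)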
- abstract class Callback[-E, -A] extends (Either[E, A]) ⇒ Unit
Represents a callback that should be called asynchronously with the result of a computation.
This is an Either[E, A] => Unit with an OOP interface that avoids extra boxing, along with overloads of apply.
The onSuccess method should be called only once, with the successful result, whereas onError should be called if the result is an error.
Obviously Callback describes unsafe side-effects, a fact that is highlighted by the usage of Unit as the return type. Callbacks are unsafe to use in pure code, but are necessary for describing asynchronous processes.
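A sketch of this contract, assuming the Callback.fromPromise builder:

  import monix.execution.Callback
  import scala.concurrent.Promise

  // Builds a callback that completes a Promise (Callback.fromPromise is
  // assumed); calling onSuccess exactly once completes the underlying future.
  val p = Promise[Int]()
  val cb: Callback[Throwable, Int] = Callback.fromPromise(p)

  cb.onSuccess(42)
  // Equivalently, through the Either-based function interface:
  // cb(Right(42))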
- trait Cancelable extends Serializable
Represents a one-time idempotent action that can be used to cancel async computations, or to release resources that active data sources are holding.
It is equivalent to java.io.Closeable, but without the I/O focus, or to IDisposable in Microsoft .NET, or to akka.actor.Cancellable.
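A sketch using the Cancelable.apply builder, which wraps a side-effecting callback:

  import monix.execution.Cancelable

  // A cancelable wrapping a cleanup action; per the contract above,
  // cancellation is a one-time idempotent operation.
  val c = Cancelable { () =>
    println("resource released")
  }

  c.cancel() // runs the callback
  c.cancel() // no effect the second time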
- sealed abstract class CancelableFuture[+A] extends Future[A] with Cancelable
Represents an asynchronous computation that can be canceled as long as it isn't complete.
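A sketch of pairing a plain Future with a Cancelable via the CancelableFuture.apply builder (the cancellation wiring shown is illustrative):

  import monix.execution.{Cancelable, CancelableFuture}
  import scala.concurrent.Promise
  import java.util.concurrent.CancellationException

  // Pairs a Promise-backed Future with a Cancelable that fails the promise
  // if it's still incomplete when cancel() is called.
  val p = Promise[Int]()
  val f: CancelableFuture[Int] =
    CancelableFuture(p.future, Cancelable { () =>
      p.tryFailure(new CancellationException("canceled"))
      ()
    })

  f.cancel() // triggers the Cancelable; the future fails if not already complete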
- sealed abstract class CancelablePromise[A] extends Promise[A]
CancelablePromise is a scala.concurrent.Promise implementation that allows listeners to unsubscribe from receiving future results.
It does so by:
- adding a low-level subscribe method that allows callbacks to be subscribed
- returning CancelableFuture in its future method implementation, allowing created future objects to unsubscribe (this is the high-level way to subscribe and should be preferred for most usage)
Being able to unsubscribe listeners helps with avoiding memory leaks in case of listeners or futures that are being timed-out due to promises that take a long time to complete.
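A sketch of the unsubscription behavior, assuming the CancelablePromise() builder:

  import monix.execution.CancelablePromise
  import monix.execution.Scheduler.Implicits.global

  // Each call to `future` yields a CancelableFuture; cancelling it
  // unsubscribes that particular listener from the promise's result.
  val promise = CancelablePromise[Int]()

  val listener = promise.future
  listener.foreach(n => println(s"got $n"))

  listener.cancel()   // this listener is unsubscribed and won't be notified
  promise.success(42) // remaining subscribers (if any) still receive the value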
- sealed abstract class ChannelType extends Serializable
An enumeration describing the multi-threading scenario of a channel, in terms of how many producers and consumers it supports: MPMC, MPSC, SPMC or SPSC.
- sealed abstract class ExecutionModel extends Product with Serializable
Specification for run-loops, imposed by the Scheduler.
When executing tasks, a run-loop can always execute tasks asynchronously (by forking logical threads), or it can always execute them synchronously (same thread and call-stack, by using an internal trampoline), or it can do a mixed mode that executes tasks in batches before forking.
The specification is considered a recommendation for how run loops should behave, but ultimately it's up to the client to choose the best execution model. This can be related to recursive loops or to events pushed into consumers.
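These modes correspond to the AlwaysAsyncExecution, SynchronousExecution and BatchedExecution values; a sketch (the batch size is arbitrary, and withExecutionModel is assumed to be available on Scheduler):

  import monix.execution.{ExecutionModel, Scheduler}
  import monix.execution.Scheduler.Implicits.global

  // Always fork an asynchronous boundary for each task:
  val alwaysAsync = ExecutionModel.AlwaysAsyncExecution
  // Execute everything on the current thread / call-stack (trampolined):
  val synchronous = ExecutionModel.SynchronousExecution
  // Mixed mode: run tasks in batches of ~256 before forking:
  val batched = ExecutionModel.BatchedExecution(256)

  // Schedulers can be configured with a preferred execution model:
  val batchedScheduler: Scheduler = global.withExecutionModel(batched)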
- trait Scheduler extends ExecutionContext with UncaughtExceptionReporter with Executor
A Scheduler is a scala.concurrent.ExecutionContext that additionally can schedule the execution of units of work to run with a delay or periodically.
Annotations: @implicitNotFound( ... )
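A sketch of delayed and periodic scheduling via the Runnable-based overloads:

  import java.util.concurrent.TimeUnit
  import monix.execution.Scheduler.Implicits.global

  // Run a task once, after a 1-second delay:
  val delayed = global.scheduleOnce(1, TimeUnit.SECONDS, new Runnable {
    def run(): Unit = println("delayed by one second")
  })

  // Run a task every 5 seconds (with a 0-second initial delay):
  val periodic = global.scheduleWithFixedDelay(0, 5, TimeUnit.SECONDS, new Runnable {
    def run(): Unit = println("tick")
  })

  // Both return a Cancelable that can stop the scheduled execution:
  periodic.cancel()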
- trait UncaughtExceptionReporter extends Serializable
An exception reporter is a function that logs an uncaught error.
Usually taken as an implicit when executing computations that could fail, but that must not blow up the call-stack, like asynchronous tasks.
A default implicit is provided that simply logs the error on STDERR.
Annotations: @implicitNotFound( ... )
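A sketch of building a custom reporter from a plain function via the UncaughtExceptionReporter.apply builder:

  import monix.execution.UncaughtExceptionReporter

  // A reporter built from a Throwable => Unit function; normally passed
  // (implicitly) to schedulers rather than called directly like here.
  val reporter = UncaughtExceptionReporter { ex =>
    System.err.println(s"Uncaught error: ${ex.getMessage}")
  }

  reporter.reportFailure(new RuntimeException("boom"))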
Value Members
- object Ack extends Serializable
- object AsyncQueue
- object AsyncSemaphore extends Serializable
- object AsyncVar
- object BufferCapacity extends Serializable
- object Callback
- object Cancelable extends Serializable
- object CancelableFuture extends CancelableFutureForPlatform with Serializable
- object CancelablePromise
- object ChannelType extends Serializable
- object ExecutionModel extends Serializable
- object FutureUtils extends FutureUtilsForPlatform
  Utilities for Scala's standard concurrent.Future.
- object Scheduler extends SchedulerCompanionImpl with Serializable
- object UncaughtExceptionReporter extends Serializable
- object compat