Packages

  • package root
    Definition Classes
    root
  • package monix
    Definition Classes
    root
  • package execution
    Definition Classes
    monix
  • package annotations
    Definition Classes
    execution
  • package atomic

    A small toolkit of classes that support compare-and-swap semantics for safe mutation of variables.

    A small toolkit of classes that support compare-and-swap semantics for safe mutation of variables.

    On top of the JVM, this means dealing with lock-free thread-safe programming. Also works on top of Javascript, with Scala.js, for API compatibility purposes and because it's a useful way to box a value.

    The backbone of Atomic references is this method:

    def compareAndSet(expect: T, update: T): Boolean

    This method atomically sets a variable to the update value if it currently holds the expect value, reporting true on success or false on failure. The classes in this package also contain methods to get and unconditionally set values.

    Building a reference is easy with the provided constructor, which will automatically return the most specific type needed (in the following sample, that's an AtomicDouble, inheriting from AtomicNumber[A]):

    val atomicNumber = Atomic(12.2)
    
    atomicNumber.incrementAndGet()
    // => 13.2

    These also provide useful helpers for atomically mutating of values (i.e. transform, transformAndGet, getAndTransform, etc...) or of numbers of any kind (incrementAndGet, getAndAdd, etc...).

    Definition Classes
    execution
  • package cancelables

    Cancelables represent asynchronous units of work or other things scheduled for execution and whose execution can be canceled.

    Cancelables represent asynchronous units of work or other things scheduled for execution and whose execution can be canceled.

    One use-case is the scheduling done by monix.execution.Scheduler, in which the scheduling methods return a Cancelable, allowing the canceling of the scheduling.

    Example:

    val s = ConcurrentScheduler()
    val task = s.scheduleRepeated(10.seconds, 50.seconds, {
      doSomething()
    })
    
    // later, cancels the scheduling ...
    task.cancel()
    Definition Classes
    execution
  • package exceptions
    Definition Classes
    execution
  • package internal
    Definition Classes
    execution
  • package misc
    Definition Classes
    execution
  • package rstreams

    Package exposing utilities for working with the Reactive Streams specification.

    Package exposing utilities for working with the Reactive Streams specification.

    Definition Classes
    execution
  • package schedulers
    Definition Classes
    execution
  • Ack
  • AsyncQueue
  • AsyncSemaphore
  • AsyncVar
  • BufferCapacity
  • Callback
  • Cancelable
  • CancelableFuture
  • CancelablePromise
  • ChannelType
  • ExecutionModel
  • FutureUtils
  • Scheduler
  • UncaughtExceptionReporter
  • compat

final class AsyncQueue[A] extends AnyRef

A high-performance, back-pressured, asynchronous queue implementation.

This is the impure, future-enabled version of monix.catnap.ConcurrentQueue.

Example

import monix.execution.Scheduler.Implicits.global

val queue = AsyncQueue(capacity = 32)

def producer(n: Int): CancelableFuture[Unit] =
  queue.offer(n).flatMap { _ =>
    if (n >= 0) producer(n - 1)
    else CancelableFuture.unit
  }

def consumer(index: Int): CancelableFuture[Unit] =
  queue.poll().flatMap { a =>
    println(s"Worker $$index: $$a")
  }

Back-Pressuring and the Polling Model

The initialized queue can be limited to a maximum buffer size, a size that could be rounded to a power of 2, so you can't rely on it to be precise. Such a bounded queue can be initialized via AsyncQueue.bounded. Also see BufferCapacity, the configuration parameter that can be passed in the AsyncQueue.withConfig builder.

On offer, when the queue is full, the implementation back-pressures until the queue has room again in its internal buffer, the future being completed when the value was pushed successfully. Similarly poll awaits the queue to have items in it. This works for both bounded and unbounded queues.

For both offer and poll, in case awaiting a result happens, the implementation does so asynchronously, without any threads being blocked.

Currently the implementation is optimized for speed. In a producer-consumer pipeline the best performance is achieved if the producer(s) and the consumer(s) do not contend for the same resources. This is why when doing asynchronous waiting for the queue to be empty or full, the implementation does so by repeatedly retrying the operation, with asynchronous boundaries and delays, until it succeeds. Fairness is ensured by the implementation.

Multi-threading Scenario

This queue support a ChannelType configuration, for fine tuning depending on the needed multi-threading scenario — and this can yield better performance:

The default is MPMC, because that's the safest scenario.

import monix.execution.ChannelType.MPSC

val queue = AsyncQueue(
  capacity = 64,
  channelType = MPSC
)

WARNING: default is MPMC, however any other scenario implies a relaxation of the internal synchronization between threads.

This means that using the wrong scenario can lead to severe concurrency bugs. If you're not sure what multi-threading scenario you have, then just stick with the default MPMC.

Linear Supertypes
Ordering
  1. Alphabetic
  2. By Inheritance
Inherited
  1. AsyncQueue
  2. AnyRef
  3. Any
  1. Hide All
  2. Show All
Visibility
  1. Public
  2. All

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  5. def clear(): Unit

    Removes all items from the queue.

    Removes all items from the queue.

    Called from the consumer thread, subject to the restrictions appropriate to the implementation indicated by ChannelType.

    WARNING: the clear operation should be done on the consumer side, so it must be called from the same thread(s) that call poll.

    Annotations
    @UnsafeBecauseImpure()
  6. def clone(): AnyRef
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @native() @throws( ... )
  7. def drain(minLength: Int, maxLength: Int): CancelableFuture[Seq[A]]

    Fetches multiple elements from the queue, if available.

    Fetches multiple elements from the queue, if available.

    This operation back-pressures until the minLength requirement is achieved.

    minLength

    specifies the minimum length of the returned sequence; the operation back-pressures until this length is satisfied

    maxLength

    is the capacity of the used buffer, being the max length of the returned sequence

    returns

    a future with a sequence of length between minLength and maxLength; it can also be cancelled, interrupting the wait

    Annotations
    @UnsafeBecauseImpure()
  8. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  9. def equals(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  10. def finalize(): Unit
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  11. final def getClass(): Class[_]
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  12. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  13. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  14. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  15. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  16. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  17. def offer(a: A): CancelableFuture[Unit]

    Pushes a value in the queue, or if the queue is full, then repeats the operation until it succeeds.

    Pushes a value in the queue, or if the queue is full, then repeats the operation until it succeeds.

    returns

    a CancelableFuture that will eventually complete when the push has succeeded; it can also be cancelled, interrupting the waiting

    Annotations
    @UnsafeBecauseImpure()
  18. def offerMany(seq: Iterable[A]): CancelableFuture[Unit]

    Pushes multiple values in the queue.

    Pushes multiple values in the queue. Back-pressures if the queue is full.

    returns

    a CancelableFuture that will eventually complete when the push has succeeded; it can also be cancelled, interrupting the waiting

    Annotations
    @UnsafeBecauseImpure()
  19. def poll(): CancelableFuture[A]

    Fetches a value from the queue, or if the queue is empty continuously polls the queue until a value is made available.

    Fetches a value from the queue, or if the queue is empty continuously polls the queue until a value is made available.

    returns

    a CancelableFuture that will eventually complete with a value; it can also be cancelled, interrupting the waiting

    Annotations
    @UnsafeBecauseImpure()
  20. final def synchronized[T0](arg0: ⇒ T0): T0
    Definition Classes
    AnyRef
  21. def toString(): String
    Definition Classes
    AnyRef → Any
  22. def tryOffer(a: A): Boolean

    Try pushing a value to the queue.

    Try pushing a value to the queue.

    The protocol is unsafe because usage of the "try*" methods imply an understanding of concurrency, or otherwise the code can be very fragile and buggy.

    a

    is the value pushed in the queue

    returns

    true if the operation succeeded, or false if the queue is full and cannot accept any more elements

    Annotations
    @UnsafeProtocol() @UnsafeBecauseImpure()
  23. def tryPoll(): Option[A]

    Try pulling a value out of the queue.

    Try pulling a value out of the queue.

    The protocol is unsafe because usage of the "try*" methods imply an understanding of concurrency, or otherwise the code can be very fragile and buggy.

    returns

    Some(a) in case a value was successfully retrieved from the queue, or None in case the queue is empty

    Annotations
    @UnsafeProtocol() @UnsafeBecauseImpure()
  24. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  25. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  26. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @throws( ... )

Inherited from AnyRef

Inherited from Any

Ungrouped