final class AsyncQueue[A] extends AnyRef
A high-performance, back-pressured, asynchronous queue implementation.
This is the impure, future-enabled version of monix.catnap.ConcurrentQueue.
Example
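    import monix.execution.{AsyncQueue, CancelableFuture}
    import monix.execution.Scheduler.Implicits.global

    // Bounded queue; the capacity may be rounded to a power of 2
    val queue = AsyncQueue.bounded[Int](32)

    // Offers n, n - 1, ..., 0 and finally -1, waiting asynchronously while the queue is full
    def producer(n: Int): CancelableFuture[Unit] =
      queue.offer(n).flatMap { _ =>
        if (n >= 0) producer(n - 1)
        else CancelableFuture.unit
      }

    // Polls one item, prints it, then loops to wait for the next one
    def consumer(index: Int): CancelableFuture[Unit] =
      queue.poll().flatMap { a =>
        println(s"Worker $index: $a")
        consumer(index)
      }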
Back-Pressuring and the Polling Model
The queue can be limited to a maximum buffer size. That size may be rounded to a power of 2, so you can't rely on it being exact. Such a bounded queue can be created via AsyncQueue.bounded. Also see BufferCapacity, the configuration parameter that can be passed to the AsyncQueue.withConfig builder.
On offer, when the queue is full, the implementation back-pressures until the queue has room again in its internal buffer; the returned future completes once the value has been pushed successfully. Similarly, poll waits until the queue has items in it. This works for both bounded and unbounded queues.
For both offer and poll, whenever a result has to be awaited, the implementation waits asynchronously, without blocking any threads.
Currently the implementation is optimized for speed. In a producer-consumer pipeline the best performance is achieved if the producer(s) and the consumer(s) do not contend for the same resources. This is why, when waiting asynchronously on a full queue (for offer) or an empty queue (for poll), the implementation repeatedly retries the operation, with asynchronous boundaries and delays, until it succeeds. Fairness is ensured by the implementation.
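As a minimal sketch of the back-pressure described above, assuming monix-execution 3.x is on the classpath, the snippet below fills a small bounded queue with tryOffer and then shows that a further offer only completes once poll has freed a slot. The names BackPressureSketch and run are hypothetical, and Await is used only to make the outcome observable in a demonstration; it blocks the calling thread, which the queue itself never does.

```scala
import monix.execution.AsyncQueue
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object BackPressureSketch {
  // Returns (whether the offer was still pending while the queue was full, the polled value)
  def run(): (Boolean, Int) = {
    val queue = AsyncQueue.bounded[Int](4)

    // Fill the queue; the real capacity may be rounded to a power of 2,
    // so loop until tryOffer reports that the queue is full
    while (queue.tryOffer(0)) {}

    // No room left: the returned future stays incomplete for now
    val pendingOffer = queue.offer(1)
    val wasPending = !pendingOffer.isCompleted

    // Taking one item out frees a slot, which lets the pending offer finish
    val polled = Await.result(queue.poll(), 5.seconds)
    Await.result(pendingOffer, 5.seconds)
    (wasPending, polled)
  }
}
```

Filling the queue with a loop rather than a fixed count avoids depending on the exact capacity, which the text above says cannot be relied upon.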
Multi-threading Scenario
This queue supports a ChannelType configuration for fine-tuning to the multi-threading scenario at hand, which can yield better performance:
- ChannelType.MPMC: multi-producer, multi-consumer
- ChannelType.MPSC: multi-producer, single-consumer
- ChannelType.SPMC: single-producer, multi-consumer
- ChannelType.SPSC: single-producer, single-consumer
 
The default is MPMC, because that's the safest scenario.
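    import monix.execution.{AsyncQueue, BufferCapacity}
    import monix.execution.ChannelType.MPSC
    import monix.execution.Scheduler.Implicits.global

    // Many producer threads may offer, but only one consumer thread may poll
    val queue = AsyncQueue.withConfig[Int](
      capacity = BufferCapacity.Bounded(64),
      channelType = MPSC
    )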
WARNING: the default is MPMC; any other scenario implies a relaxation of the internal synchronization between threads. Using the wrong scenario can therefore lead to severe concurrency bugs. If you're not sure which multi-threading scenario you have, stick with the default MPMC.
Value Members
- final def !=(arg0: Any): Boolean
  - Definition Classes: AnyRef → Any
 
- final def ##(): Int
  - Definition Classes: AnyRef → Any
 
- final def ==(arg0: Any): Boolean
  - Definition Classes: AnyRef → Any
 
- final def asInstanceOf[T0]: T0
  - Definition Classes: Any
 
- def clear(): Unit
  Removes all items from the queue.
  Called from the consumer thread, subject to the restrictions appropriate to the implementation indicated by ChannelType.
  WARNING: the clear operation should be done on the consumer side, so it must be called from the same thread(s) that call poll.
  - Annotations: @UnsafeBecauseImpure()
 
- def clone(): AnyRef
  - Attributes: protected[java.lang]
  - Definition Classes: AnyRef
  - Annotations: @native() @throws( ... )
 
- def drain(minLength: Int, maxLength: Int): CancelableFuture[Seq[A]]
  Fetches multiple elements from the queue, if available.
  This operation back-pressures until the minLength requirement is achieved.
  - minLength: specifies the minimum length of the returned sequence; the operation back-pressures until this length is satisfied
  - maxLength: is the capacity of the used buffer, being the max length of the returned sequence
  - returns: a future with a sequence of length between minLength and maxLength; it can also be cancelled, interrupting the wait
  - Annotations: @UnsafeBecauseImpure()
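To illustrate how minLength and maxLength bound the batch returned by drain, here is a small sketch, assuming monix-execution 3.x and using offerMany to pre-fill the queue. DrainSketch and run are hypothetical names, and Await appears only so the result can be inspected synchronously.

```scala
import monix.execution.AsyncQueue
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object DrainSketch {
  def run(): Seq[Int] = {
    val queue = AsyncQueue.bounded[Int](16)

    // Push five items and wait until all of them are in the queue
    Await.result(queue.offerMany(1 to 5), 5.seconds)

    // Ask for at least 1 and at most 3 items in a single batch
    Await.result(queue.drain(minLength = 1, maxLength = 3), 5.seconds)
  }
}
```

The returned batch holds between one and three items; whatever is not taken stays in the queue for later calls.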
 
- final def eq(arg0: AnyRef): Boolean
  - Definition Classes: AnyRef
 
- def equals(arg0: Any): Boolean
  - Definition Classes: AnyRef → Any
 
- def finalize(): Unit
  - Attributes: protected[java.lang]
  - Definition Classes: AnyRef
  - Annotations: @throws( classOf[java.lang.Throwable] )
 
- final def getClass(): Class[_]
  - Definition Classes: AnyRef → Any
  - Annotations: @native()
 
- def hashCode(): Int
  - Definition Classes: AnyRef → Any
  - Annotations: @native()
 
- final def isInstanceOf[T0]: Boolean
  - Definition Classes: Any
 
- final def ne(arg0: AnyRef): Boolean
  - Definition Classes: AnyRef
 
- final def notify(): Unit
  - Definition Classes: AnyRef
  - Annotations: @native()
 
- final def notifyAll(): Unit
  - Definition Classes: AnyRef
  - Annotations: @native()
 
- def offer(a: A): CancelableFuture[Unit]
  Pushes a value in the queue, or if the queue is full, then repeats the operation until it succeeds.
  - returns: a CancelableFuture that will eventually complete when the push has succeeded; it can also be cancelled, interrupting the waiting
  - Annotations: @UnsafeBecauseImpure()
 
- def offerMany(seq: Iterable[A]): CancelableFuture[Unit]
  Pushes multiple values in the queue. Back-pressures if the queue is full.
  - returns: a CancelableFuture that will eventually complete when the push has succeeded; it can also be cancelled, interrupting the waiting
  - Annotations: @UnsafeBecauseImpure()
 
- def poll(): CancelableFuture[A]
  Fetches a value from the queue, or if the queue is empty continuously polls the queue until a value is made available.
  - returns: a CancelableFuture that will eventually complete with a value; it can also be cancelled, interrupting the waiting
  - Annotations: @UnsafeBecauseImpure()
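The waiting behaviour of poll can be sketched as follows, assuming monix-execution 3.x. The future returned for an empty queue is still pending, and it completes once a value is offered. PollSketch and run are hypothetical names, and Await is used only to read the result in a demonstration.

```scala
import monix.execution.AsyncQueue
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object PollSketch {
  // Returns (whether the poll was still pending on the empty queue, the received value)
  def run(): (Boolean, String) = {
    val queue = AsyncQueue.bounded[String](8)

    // Nothing has been offered yet, so this future cannot complete
    val pendingPoll = queue.poll()
    val wasPending = !pendingPoll.isCompleted

    // Offering a value lets the waiting poll complete with it
    queue.offer("hello")
    (wasPending, Await.result(pendingPoll, 5.seconds))
  }
}
```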
 
- final def synchronized[T0](arg0: ⇒ T0): T0
  - Definition Classes: AnyRef
 
- def toString(): String
  - Definition Classes: AnyRef → Any
 
- def tryOffer(a: A): Boolean
  Try pushing a value to the queue.
  The protocol is unsafe because using the "try*" methods requires an understanding of concurrency; otherwise the code can be very fragile and buggy.
  - a: is the value pushed in the queue
  - returns: true if the operation succeeded, or false if the queue is full and cannot accept any more elements
  - Annotations: @UnsafeProtocol() @UnsafeBecauseImpure()
 
- def tryPoll(): Option[A]
  Try pulling a value out of the queue.
  The protocol is unsafe because using the "try*" methods requires an understanding of concurrency; otherwise the code can be very fragile and buggy.
  - returns: Some(a) in case a value was successfully retrieved from the queue, or None in case the queue is empty
  - Annotations: @UnsafeProtocol() @UnsafeBecauseImpure()
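The return values of the "try*" methods can be seen in a single-threaded sketch like the one below, assuming monix-execution 3.x. TryMethodsSketch and run are hypothetical names. Unlike offer and poll, these calls never wait, so the caller has to handle the false and None cases itself.

```scala
import monix.execution.AsyncQueue
import monix.execution.Scheduler.Implicits.global

object TryMethodsSketch {
  // Returns (result of tryOffer, first tryPoll, second tryPoll)
  def run(): (Boolean, Option[Int], Option[Int]) = {
    val queue = AsyncQueue.bounded[Int](4)

    // The queue is empty, so there is room for the value
    val accepted = queue.tryOffer(1)

    // The first poll finds the value; the second finds the queue empty again
    val first = queue.tryPoll()
    val second = queue.tryPoll()
    (accepted, first, second)
  }
}
```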
 
- final def wait(): Unit
  - Definition Classes: AnyRef
  - Annotations: @throws( ... )
 
- final def wait(arg0: Long, arg1: Int): Unit
  - Definition Classes: AnyRef
  - Annotations: @throws( ... )
 
- final def wait(arg0: Long): Unit
  - Definition Classes: AnyRef
  - Annotations: @native() @throws( ... )