concurrent_queue/
lib.rs

1//! A concurrent multi-producer multi-consumer queue.
2//!
3//! There are two kinds of queues:
4//!
5//! 1. [Bounded] queue with limited capacity.
6//! 2. [Unbounded] queue with unlimited capacity.
7//!
8//! Queues also have the capability to get [closed] at any point. When closed, no more items can be
9//! pushed into the queue, although the remaining items can still be popped.
10//!
11//! These features make it easy to build channels similar to [`std::sync::mpsc`] on top of this
12//! crate.
13//!
14//! # Examples
15//!
16//! ```
17//! use concurrent_queue::ConcurrentQueue;
18//!
19//! let q = ConcurrentQueue::unbounded();
20//! q.push(1).unwrap();
21//! q.push(2).unwrap();
22//!
23//! assert_eq!(q.pop(), Ok(1));
24//! assert_eq!(q.pop(), Ok(2));
25//! ```
26//!
27//! # Features
28//!
29//! `concurrent-queue` uses an `std` default feature. With this feature enabled, this crate will
30//! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this
31//! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow
32//! this crate to be used on `no_std` platforms at the potential expense of more busy waiting.
33//!
34//! There is also a `portable-atomic` feature, which uses a polyfill from the
35//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them.
36//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it.
37//! Note that even with this feature enabled, `concurrent-queue` still requires a global allocator
38//! to be available. See the documentation for the [`std::alloc::GlobalAlloc`] trait for more
39//! information.
40//!
41//! [Bounded]: `ConcurrentQueue::bounded()`
42//! [Unbounded]: `ConcurrentQueue::unbounded()`
43//! [closed]: `ConcurrentQueue::close()`
44//! [`portable-atomic`]: https://crates.io/crates/portable-atomic
45//! [`README`]: https://github.com/taiki-e/portable-atomic/blob/main/README.md#optional-cfg
46
47#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
48#![no_std]
49#![doc(
50    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
51)]
52#![doc(
53    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
54)]
55
56extern crate alloc;
57#[cfg(feature = "std")]
58extern crate std;
59
60use core::fmt;
61use core::panic::{RefUnwindSafe, UnwindSafe};
62use sync::atomic::{self, Ordering};
63
64#[cfg(feature = "std")]
65use std::error;
66
67use crate::bounded::Bounded;
68use crate::single::Single;
69use crate::sync::busy_wait;
70use crate::unbounded::Unbounded;
71
72mod bounded;
73mod single;
74mod unbounded;
75
76mod sync;
77
78/// Make the given function const if the given condition is true.
79macro_rules! const_fn {
80    (
81        const_if: #[cfg($($cfg:tt)+)];
82        $(#[$($attr:tt)*])*
83        $vis:vis const fn $($rest:tt)*
84    ) => {
85        #[cfg($($cfg)+)]
86        $(#[$($attr)*])*
87        $vis const fn $($rest)*
88        #[cfg(not($($cfg)+))]
89        $(#[$($attr)*])*
90        $vis fn $($rest)*
91    };
92}
93
94pub(crate) use const_fn;
95
96/// A concurrent queue.
97///
98/// # Examples
99///
100/// ```
101/// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
102///
103/// let q = ConcurrentQueue::bounded(2);
104///
105/// assert_eq!(q.push('a'), Ok(()));
106/// assert_eq!(q.push('b'), Ok(()));
107/// assert_eq!(q.push('c'), Err(PushError::Full('c')));
108///
109/// assert_eq!(q.pop(), Ok('a'));
110/// assert_eq!(q.pop(), Ok('b'));
111/// assert_eq!(q.pop(), Err(PopError::Empty));
112/// ```
113pub struct ConcurrentQueue<T>(Inner<T>);
114
115unsafe impl<T: Send> Send for ConcurrentQueue<T> {}
116unsafe impl<T: Send> Sync for ConcurrentQueue<T> {}
117
118impl<T> UnwindSafe for ConcurrentQueue<T> {}
119impl<T> RefUnwindSafe for ConcurrentQueue<T> {}
120
121#[allow(clippy::large_enum_variant)]
122enum Inner<T> {
123    Single(Single<T>),
124    Bounded(Bounded<T>),
125    Unbounded(Unbounded<T>),
126}
127
128impl<T> ConcurrentQueue<T> {
129    /// Creates a new bounded queue.
130    ///
131    /// The queue allocates enough space for `cap` items.
132    ///
133    /// # Panics
134    ///
135    /// If the capacity is zero, this constructor will panic.
136    ///
137    /// # Examples
138    ///
139    /// ```
140    /// use concurrent_queue::ConcurrentQueue;
141    ///
142    /// let q = ConcurrentQueue::<i32>::bounded(100);
143    /// ```
144    pub fn bounded(cap: usize) -> ConcurrentQueue<T> {
145        if cap == 1 {
146            ConcurrentQueue(Inner::Single(Single::new()))
147        } else {
148            ConcurrentQueue(Inner::Bounded(Bounded::new(cap)))
149        }
150    }
151
152    const_fn!(
153        const_if: #[cfg(not(loom))];
154        /// Creates a new unbounded queue.
155        ///
156        /// # Examples
157        ///
158        /// ```
159        /// use concurrent_queue::ConcurrentQueue;
160        ///
161        /// let q = ConcurrentQueue::<i32>::unbounded();
162        /// ```
163        pub const fn unbounded() -> ConcurrentQueue<T> {
164            ConcurrentQueue(Inner::Unbounded(Unbounded::new()))
165        }
166    );
167
168    /// Attempts to push an item into the queue.
169    ///
170    /// If the queue is full or closed, the item is returned back as an error.
171    ///
172    /// # Examples
173    ///
174    /// ```
175    /// use concurrent_queue::{ConcurrentQueue, PushError};
176    ///
177    /// let q = ConcurrentQueue::bounded(1);
178    ///
179    /// // Push succeeds because there is space in the queue.
180    /// assert_eq!(q.push(10), Ok(()));
181    ///
182    /// // Push errors because the queue is now full.
183    /// assert_eq!(q.push(20), Err(PushError::Full(20)));
184    ///
185    /// // Close the queue, which will prevent further pushes.
186    /// q.close();
187    ///
188    /// // Pushing now errors indicating the queue is closed.
189    /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
190    ///
191    /// // Pop the single item in the queue.
192    /// assert_eq!(q.pop(), Ok(10));
193    ///
194    /// // Even though there is space, no more items can be pushed.
195    /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
196    /// ```
197    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
198        match &self.0 {
199            Inner::Single(q) => q.push(value),
200            Inner::Bounded(q) => q.push(value),
201            Inner::Unbounded(q) => q.push(value),
202        }
203    }
204
205    /// Push an element into the queue, potentially displacing another element.
206    ///
207    /// Attempts to push an element into the queue. If the queue is full, one item from the
208    /// queue is replaced with the provided item. The displaced item is returned as `Some(T)`.
209    /// If the queue is closed, an error is returned.
210    ///
211    /// # Examples
212    ///
213    /// ```
214    /// use concurrent_queue::{ConcurrentQueue, ForcePushError, PushError};
215    ///
216    /// let q = ConcurrentQueue::bounded(3);
217    ///
218    /// // We can push to the queue.
219    /// for i in 1..=3 {
220    ///     assert_eq!(q.force_push(i), Ok(None));
221    /// }
222    ///
223    /// // Push errors because the queue is now full.
224    /// assert_eq!(q.push(4), Err(PushError::Full(4)));
225    ///
226    /// // Pushing a new value replaces the old ones.
227    /// assert_eq!(q.force_push(5), Ok(Some(1)));
228    /// assert_eq!(q.force_push(6), Ok(Some(2)));
229    ///
230    /// // Close the queue to stop further pushes.
231    /// q.close();
232    ///
233    /// // Pushing will return an error.
234    /// assert_eq!(q.force_push(7), Err(ForcePushError(7)));
235    ///
236    /// // Popping items will return the force-pushed ones.
237    /// assert_eq!(q.pop(), Ok(3));
238    /// assert_eq!(q.pop(), Ok(5));
239    /// assert_eq!(q.pop(), Ok(6));
240    /// ```
241    pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
242        match &self.0 {
243            Inner::Single(q) => q.force_push(value),
244            Inner::Bounded(q) => q.force_push(value),
245            Inner::Unbounded(q) => match q.push(value) {
246                Ok(()) => Ok(None),
247                Err(PushError::Closed(value)) => Err(ForcePushError(value)),
248                Err(PushError::Full(_)) => unreachable!(),
249            },
250        }
251    }
252
253    /// Attempts to pop an item from the queue.
254    ///
255    /// If the queue is empty, an error is returned.
256    ///
257    /// # Examples
258    ///
259    /// ```
260    /// use concurrent_queue::{ConcurrentQueue, PopError};
261    ///
262    /// let q = ConcurrentQueue::bounded(1);
263    ///
264    /// // Pop errors when the queue is empty.
265    /// assert_eq!(q.pop(), Err(PopError::Empty));
266    ///
267    /// // Push one item and close the queue.
268    /// assert_eq!(q.push(10), Ok(()));
269    /// q.close();
270    ///
271    /// // Remaining items can be popped.
272    /// assert_eq!(q.pop(), Ok(10));
273    ///
274    /// // Again, pop errors when the queue is empty,
275    /// // but now also indicates that the queue is closed.
276    /// assert_eq!(q.pop(), Err(PopError::Closed));
277    /// ```
278    pub fn pop(&self) -> Result<T, PopError> {
279        match &self.0 {
280            Inner::Single(q) => q.pop(),
281            Inner::Bounded(q) => q.pop(),
282            Inner::Unbounded(q) => q.pop(),
283        }
284    }
285
286    /// Get an iterator over the items in the queue.
287    ///
288    /// The iterator will continue until the queue is empty or closed. It will never block;
289    /// if the queue is empty, the iterator will return `None`. If new items are pushed into
290    /// the queue, the iterator may return `Some` in the future after returning `None`.
291    ///
292    /// # Examples
293    ///
294    /// ```
295    /// use concurrent_queue::ConcurrentQueue;
296    ///
297    /// let q = ConcurrentQueue::bounded(5);
298    /// q.push(1).unwrap();
299    /// q.push(2).unwrap();
300    /// q.push(3).unwrap();
301    ///
302    /// let mut iter = q.try_iter();
303    /// assert_eq!(iter.by_ref().sum::<i32>(), 6);
304    /// assert_eq!(iter.next(), None);
305    ///
306    /// // Pushing more items will make them available to the iterator.
307    /// q.push(4).unwrap();
308    /// assert_eq!(iter.next(), Some(4));
309    /// assert_eq!(iter.next(), None);
310    /// ```
311    pub fn try_iter(&self) -> TryIter<'_, T> {
312        TryIter { queue: self }
313    }
314
315    /// Returns `true` if the queue is empty.
316    ///
317    /// # Examples
318    ///
319    /// ```
320    /// use concurrent_queue::ConcurrentQueue;
321    ///
322    /// let q = ConcurrentQueue::<i32>::unbounded();
323    ///
324    /// assert!(q.is_empty());
325    /// q.push(1).unwrap();
326    /// assert!(!q.is_empty());
327    /// ```
328    pub fn is_empty(&self) -> bool {
329        match &self.0 {
330            Inner::Single(q) => q.is_empty(),
331            Inner::Bounded(q) => q.is_empty(),
332            Inner::Unbounded(q) => q.is_empty(),
333        }
334    }
335
336    /// Returns `true` if the queue is full.
337    ///
338    /// An unbounded queue is never full.
339    ///
340    /// # Examples
341    ///
342    /// ```
343    /// use concurrent_queue::ConcurrentQueue;
344    ///
345    /// let q = ConcurrentQueue::bounded(1);
346    ///
347    /// assert!(!q.is_full());
348    /// q.push(1).unwrap();
349    /// assert!(q.is_full());
350    /// ```
351    pub fn is_full(&self) -> bool {
352        match &self.0 {
353            Inner::Single(q) => q.is_full(),
354            Inner::Bounded(q) => q.is_full(),
355            Inner::Unbounded(q) => q.is_full(),
356        }
357    }
358
359    /// Returns the number of items in the queue.
360    ///
361    /// # Examples
362    ///
363    /// ```
364    /// use concurrent_queue::ConcurrentQueue;
365    ///
366    /// let q = ConcurrentQueue::unbounded();
367    /// assert_eq!(q.len(), 0);
368    ///
369    /// assert_eq!(q.push(10), Ok(()));
370    /// assert_eq!(q.len(), 1);
371    ///
372    /// assert_eq!(q.push(20), Ok(()));
373    /// assert_eq!(q.len(), 2);
374    /// ```
375    pub fn len(&self) -> usize {
376        match &self.0 {
377            Inner::Single(q) => q.len(),
378            Inner::Bounded(q) => q.len(),
379            Inner::Unbounded(q) => q.len(),
380        }
381    }
382
383    /// Returns the capacity of the queue.
384    ///
385    /// Unbounded queues have infinite capacity, represented as [`None`].
386    ///
387    /// # Examples
388    ///
389    /// ```
390    /// use concurrent_queue::ConcurrentQueue;
391    ///
392    /// let q = ConcurrentQueue::<i32>::bounded(7);
393    /// assert_eq!(q.capacity(), Some(7));
394    ///
395    /// let q = ConcurrentQueue::<i32>::unbounded();
396    /// assert_eq!(q.capacity(), None);
397    /// ```
398    pub fn capacity(&self) -> Option<usize> {
399        match &self.0 {
400            Inner::Single(_) => Some(1),
401            Inner::Bounded(q) => Some(q.capacity()),
402            Inner::Unbounded(_) => None,
403        }
404    }
405
406    /// Closes the queue.
407    ///
408    /// Returns `true` if this call closed the queue, or `false` if it was already closed.
409    ///
410    /// When a queue is closed, no more items can be pushed but the remaining items can still be
411    /// popped.
412    ///
413    /// # Examples
414    ///
415    /// ```
416    /// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
417    ///
418    /// let q = ConcurrentQueue::unbounded();
419    /// assert_eq!(q.push(10), Ok(()));
420    ///
421    /// assert!(q.close());  // `true` because this call closes the queue.
422    /// assert!(!q.close()); // `false` because the queue is already closed.
423    ///
424    /// // Cannot push any more items when closed.
425    /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
426    ///
427    /// // Remaining items can still be popped.
428    /// assert_eq!(q.pop(), Ok(10));
429    ///
430    /// // When no more items are present, the error is `Closed`.
431    /// assert_eq!(q.pop(), Err(PopError::Closed));
432    /// ```
433    pub fn close(&self) -> bool {
434        match &self.0 {
435            Inner::Single(q) => q.close(),
436            Inner::Bounded(q) => q.close(),
437            Inner::Unbounded(q) => q.close(),
438        }
439    }
440
441    /// Returns `true` if the queue is closed.
442    ///
443    /// # Examples
444    ///
445    /// ```
446    /// use concurrent_queue::ConcurrentQueue;
447    ///
448    /// let q = ConcurrentQueue::<i32>::unbounded();
449    ///
450    /// assert!(!q.is_closed());
451    /// q.close();
452    /// assert!(q.is_closed());
453    /// ```
454    pub fn is_closed(&self) -> bool {
455        match &self.0 {
456            Inner::Single(q) => q.is_closed(),
457            Inner::Bounded(q) => q.is_closed(),
458            Inner::Unbounded(q) => q.is_closed(),
459        }
460    }
461}
462
463impl<T> fmt::Debug for ConcurrentQueue<T> {
464    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
465        f.debug_struct("ConcurrentQueue")
466            .field("len", &self.len())
467            .field("capacity", &self.capacity())
468            .field("is_closed", &self.is_closed())
469            .finish()
470    }
471}
472
473/// An iterator that pops items from a [`ConcurrentQueue`].
474///
475/// This iterator will never block; it will return `None` once the queue has
476/// been exhausted. Calling `next` after `None` may yield `Some(item)` if more items
477/// are pushed to the queue.
478#[must_use = "iterators are lazy and do nothing unless consumed"]
479#[derive(Clone)]
480pub struct TryIter<'a, T> {
481    queue: &'a ConcurrentQueue<T>,
482}
483
484impl<T> fmt::Debug for TryIter<'_, T> {
485    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
486        f.debug_tuple("Iter").field(&self.queue).finish()
487    }
488}
489
490impl<T> Iterator for TryIter<'_, T> {
491    type Item = T;
492
493    fn next(&mut self) -> Option<Self::Item> {
494        self.queue.pop().ok()
495    }
496}
497
498/// Error which occurs when popping from an empty queue.
499#[derive(Clone, Copy, Eq, PartialEq)]
500pub enum PopError {
501    /// The queue is empty but not closed.
502    Empty,
503
504    /// The queue is empty and closed.
505    Closed,
506}
507
508impl PopError {
509    /// Returns `true` if the queue is empty but not closed.
510    pub fn is_empty(&self) -> bool {
511        match self {
512            PopError::Empty => true,
513            PopError::Closed => false,
514        }
515    }
516
517    /// Returns `true` if the queue is empty and closed.
518    pub fn is_closed(&self) -> bool {
519        match self {
520            PopError::Empty => false,
521            PopError::Closed => true,
522        }
523    }
524}
525
526#[cfg(feature = "std")]
527impl error::Error for PopError {}
528
529impl fmt::Debug for PopError {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        match self {
532            PopError::Empty => write!(f, "Empty"),
533            PopError::Closed => write!(f, "Closed"),
534        }
535    }
536}
537
538impl fmt::Display for PopError {
539    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
540        match self {
541            PopError::Empty => write!(f, "Empty"),
542            PopError::Closed => write!(f, "Closed"),
543        }
544    }
545}
546
547/// Error which occurs when pushing into a full or closed queue.
548#[derive(Clone, Copy, Eq, PartialEq)]
549pub enum PushError<T> {
550    /// The queue is full but not closed.
551    Full(T),
552
553    /// The queue is closed.
554    Closed(T),
555}
556
557impl<T> PushError<T> {
558    /// Unwraps the item that couldn't be pushed.
559    pub fn into_inner(self) -> T {
560        match self {
561            PushError::Full(t) => t,
562            PushError::Closed(t) => t,
563        }
564    }
565
566    /// Returns `true` if the queue is full but not closed.
567    pub fn is_full(&self) -> bool {
568        match self {
569            PushError::Full(_) => true,
570            PushError::Closed(_) => false,
571        }
572    }
573
574    /// Returns `true` if the queue is closed.
575    pub fn is_closed(&self) -> bool {
576        match self {
577            PushError::Full(_) => false,
578            PushError::Closed(_) => true,
579        }
580    }
581}
582
583#[cfg(feature = "std")]
584impl<T: fmt::Debug> error::Error for PushError<T> {}
585
586impl<T: fmt::Debug> fmt::Debug for PushError<T> {
587    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
588        match self {
589            PushError::Full(t) => f.debug_tuple("Full").field(t).finish(),
590            PushError::Closed(t) => f.debug_tuple("Closed").field(t).finish(),
591        }
592    }
593}
594
595impl<T> fmt::Display for PushError<T> {
596    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
597        match self {
598            PushError::Full(_) => write!(f, "Full"),
599            PushError::Closed(_) => write!(f, "Closed"),
600        }
601    }
602}
603
604/// Error that occurs when force-pushing into a full queue.
605#[derive(Clone, Copy, PartialEq, Eq)]
606pub struct ForcePushError<T>(pub T);
607
608impl<T> ForcePushError<T> {
609    /// Return the inner value that failed to be force-pushed.
610    pub fn into_inner(self) -> T {
611        self.0
612    }
613}
614
615impl<T: fmt::Debug> fmt::Debug for ForcePushError<T> {
616    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617        f.debug_tuple("ForcePushError").field(&self.0).finish()
618    }
619}
620
621impl<T> fmt::Display for ForcePushError<T> {
622    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
623        write!(f, "Closed")
624    }
625}
626
627#[cfg(feature = "std")]
628impl<T: fmt::Debug> error::Error for ForcePushError<T> {}
629
630/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
631#[inline]
632fn full_fence() {
633    #[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), not(miri), not(loom)))]
634    {
635        use core::{arch::asm, cell::UnsafeCell};
636        // HACK(stjepang): On x86 architectures there are two different ways of executing
637        // a `SeqCst` fence.
638        //
639        // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
640        // 2. A `lock <op>` instruction.
641        //
642        // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
643        // that the second one is sometimes a bit faster.
644        let a = UnsafeCell::new(0_usize);
645        // It is common to use `lock or` here, but when using a local variable, `lock not`, which
646        // does not change the flag, should be slightly more efficient.
647        // Refs: https://www.felixcloutier.com/x86/not
648        unsafe {
649            #[cfg(target_pointer_width = "64")]
650            asm!("lock not qword ptr [{0}]", in(reg) a.get(), options(nostack, preserves_flags));
651            #[cfg(target_pointer_width = "32")]
652            asm!("lock not dword ptr [{0:e}]", in(reg) a.get(), options(nostack, preserves_flags));
653        }
654        return;
655    }
656    #[allow(unreachable_code)]
657    {
658        atomic::fence(Ordering::SeqCst);
659    }
660}