concurrent_queue/lib.rs
1//! A concurrent multi-producer multi-consumer queue.
2//!
3//! There are two kinds of queues:
4//!
5//! 1. [Bounded] queue with limited capacity.
6//! 2. [Unbounded] queue with unlimited capacity.
7//!
8//! Queues also have the capability to get [closed] at any point. When closed, no more items can be
9//! pushed into the queue, although the remaining items can still be popped.
10//!
11//! These features make it easy to build channels similar to [`std::sync::mpsc`] on top of this
12//! crate.
13//!
14//! # Examples
15//!
16//! ```
17//! use concurrent_queue::ConcurrentQueue;
18//!
19//! let q = ConcurrentQueue::unbounded();
20//! q.push(1).unwrap();
21//! q.push(2).unwrap();
22//!
23//! assert_eq!(q.pop(), Ok(1));
24//! assert_eq!(q.pop(), Ok(2));
25//! ```
26//!
27//! # Features
28//!
29//! `concurrent-queue` uses an `std` default feature. With this feature enabled, this crate will
30//! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this
31//! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow
32//! this crate to be used on `no_std` platforms at the potential expense of more busy waiting.
33//!
34//! There is also a `portable-atomic` feature, which uses a polyfill from the
35//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them.
36//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it.
37//! Note that even with this feature enabled, `concurrent-queue` still requires a global allocator
38//! to be available. See the documentation for the [`std::alloc::GlobalAlloc`] trait for more
39//! information.
40//!
41//! [Bounded]: `ConcurrentQueue::bounded()`
42//! [Unbounded]: `ConcurrentQueue::unbounded()`
43//! [closed]: `ConcurrentQueue::close()`
44//! [`portable-atomic`]: https://crates.io/crates/portable-atomic
45//! [`README`]: https://github.com/taiki-e/portable-atomic/blob/main/README.md#optional-cfg
46
47#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
48#![no_std]
49#![doc(
50 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
51)]
52#![doc(
53 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
54)]
55
56extern crate alloc;
57#[cfg(feature = "std")]
58extern crate std;
59
60use core::fmt;
61use core::panic::{RefUnwindSafe, UnwindSafe};
62use sync::atomic::{self, Ordering};
63
64#[cfg(feature = "std")]
65use std::error;
66
67use crate::bounded::Bounded;
68use crate::single::Single;
69use crate::sync::busy_wait;
70use crate::unbounded::Unbounded;
71
72mod bounded;
73mod single;
74mod unbounded;
75
76mod sync;
77
78/// Make the given function const if the given condition is true.
79macro_rules! const_fn {
80 (
81 const_if: #[cfg($($cfg:tt)+)];
82 $(#[$($attr:tt)*])*
83 $vis:vis const fn $($rest:tt)*
84 ) => {
85 #[cfg($($cfg)+)]
86 $(#[$($attr)*])*
87 $vis const fn $($rest)*
88 #[cfg(not($($cfg)+))]
89 $(#[$($attr)*])*
90 $vis fn $($rest)*
91 };
92}
93
94pub(crate) use const_fn;
95
96/// A concurrent queue.
97///
98/// # Examples
99///
100/// ```
101/// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
102///
103/// let q = ConcurrentQueue::bounded(2);
104///
105/// assert_eq!(q.push('a'), Ok(()));
106/// assert_eq!(q.push('b'), Ok(()));
107/// assert_eq!(q.push('c'), Err(PushError::Full('c')));
108///
109/// assert_eq!(q.pop(), Ok('a'));
110/// assert_eq!(q.pop(), Ok('b'));
111/// assert_eq!(q.pop(), Err(PopError::Empty));
112/// ```
113pub struct ConcurrentQueue<T>(Inner<T>);
114
115unsafe impl<T: Send> Send for ConcurrentQueue<T> {}
116unsafe impl<T: Send> Sync for ConcurrentQueue<T> {}
117
118impl<T> UnwindSafe for ConcurrentQueue<T> {}
119impl<T> RefUnwindSafe for ConcurrentQueue<T> {}
120
121#[allow(clippy::large_enum_variant)]
122enum Inner<T> {
123 Single(Single<T>),
124 Bounded(Bounded<T>),
125 Unbounded(Unbounded<T>),
126}
127
128impl<T> ConcurrentQueue<T> {
129 /// Creates a new bounded queue.
130 ///
131 /// The queue allocates enough space for `cap` items.
132 ///
133 /// # Panics
134 ///
135 /// If the capacity is zero, this constructor will panic.
136 ///
137 /// # Examples
138 ///
139 /// ```
140 /// use concurrent_queue::ConcurrentQueue;
141 ///
142 /// let q = ConcurrentQueue::<i32>::bounded(100);
143 /// ```
144 pub fn bounded(cap: usize) -> ConcurrentQueue<T> {
145 if cap == 1 {
146 ConcurrentQueue(Inner::Single(Single::new()))
147 } else {
148 ConcurrentQueue(Inner::Bounded(Bounded::new(cap)))
149 }
150 }
151
152 const_fn!(
153 const_if: #[cfg(not(loom))];
154 /// Creates a new unbounded queue.
155 ///
156 /// # Examples
157 ///
158 /// ```
159 /// use concurrent_queue::ConcurrentQueue;
160 ///
161 /// let q = ConcurrentQueue::<i32>::unbounded();
162 /// ```
163 pub const fn unbounded() -> ConcurrentQueue<T> {
164 ConcurrentQueue(Inner::Unbounded(Unbounded::new()))
165 }
166 );
167
168 /// Attempts to push an item into the queue.
169 ///
170 /// If the queue is full or closed, the item is returned back as an error.
171 ///
172 /// # Examples
173 ///
174 /// ```
175 /// use concurrent_queue::{ConcurrentQueue, PushError};
176 ///
177 /// let q = ConcurrentQueue::bounded(1);
178 ///
179 /// // Push succeeds because there is space in the queue.
180 /// assert_eq!(q.push(10), Ok(()));
181 ///
182 /// // Push errors because the queue is now full.
183 /// assert_eq!(q.push(20), Err(PushError::Full(20)));
184 ///
185 /// // Close the queue, which will prevent further pushes.
186 /// q.close();
187 ///
188 /// // Pushing now errors indicating the queue is closed.
189 /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
190 ///
191 /// // Pop the single item in the queue.
192 /// assert_eq!(q.pop(), Ok(10));
193 ///
194 /// // Even though there is space, no more items can be pushed.
195 /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
196 /// ```
197 pub fn push(&self, value: T) -> Result<(), PushError<T>> {
198 match &self.0 {
199 Inner::Single(q) => q.push(value),
200 Inner::Bounded(q) => q.push(value),
201 Inner::Unbounded(q) => q.push(value),
202 }
203 }
204
205 /// Push an element into the queue, potentially displacing another element.
206 ///
207 /// Attempts to push an element into the queue. If the queue is full, one item from the
208 /// queue is replaced with the provided item. The displaced item is returned as `Some(T)`.
209 /// If the queue is closed, an error is returned.
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// use concurrent_queue::{ConcurrentQueue, ForcePushError, PushError};
215 ///
216 /// let q = ConcurrentQueue::bounded(3);
217 ///
218 /// // We can push to the queue.
219 /// for i in 1..=3 {
220 /// assert_eq!(q.force_push(i), Ok(None));
221 /// }
222 ///
223 /// // Push errors because the queue is now full.
224 /// assert_eq!(q.push(4), Err(PushError::Full(4)));
225 ///
226 /// // Pushing a new value replaces the old ones.
227 /// assert_eq!(q.force_push(5), Ok(Some(1)));
228 /// assert_eq!(q.force_push(6), Ok(Some(2)));
229 ///
230 /// // Close the queue to stop further pushes.
231 /// q.close();
232 ///
233 /// // Pushing will return an error.
234 /// assert_eq!(q.force_push(7), Err(ForcePushError(7)));
235 ///
236 /// // Popping items will return the force-pushed ones.
237 /// assert_eq!(q.pop(), Ok(3));
238 /// assert_eq!(q.pop(), Ok(5));
239 /// assert_eq!(q.pop(), Ok(6));
240 /// ```
241 pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
242 match &self.0 {
243 Inner::Single(q) => q.force_push(value),
244 Inner::Bounded(q) => q.force_push(value),
245 Inner::Unbounded(q) => match q.push(value) {
246 Ok(()) => Ok(None),
247 Err(PushError::Closed(value)) => Err(ForcePushError(value)),
248 Err(PushError::Full(_)) => unreachable!(),
249 },
250 }
251 }
252
253 /// Attempts to pop an item from the queue.
254 ///
255 /// If the queue is empty, an error is returned.
256 ///
257 /// # Examples
258 ///
259 /// ```
260 /// use concurrent_queue::{ConcurrentQueue, PopError};
261 ///
262 /// let q = ConcurrentQueue::bounded(1);
263 ///
264 /// // Pop errors when the queue is empty.
265 /// assert_eq!(q.pop(), Err(PopError::Empty));
266 ///
267 /// // Push one item and close the queue.
268 /// assert_eq!(q.push(10), Ok(()));
269 /// q.close();
270 ///
271 /// // Remaining items can be popped.
272 /// assert_eq!(q.pop(), Ok(10));
273 ///
274 /// // Again, pop errors when the queue is empty,
275 /// // but now also indicates that the queue is closed.
276 /// assert_eq!(q.pop(), Err(PopError::Closed));
277 /// ```
278 pub fn pop(&self) -> Result<T, PopError> {
279 match &self.0 {
280 Inner::Single(q) => q.pop(),
281 Inner::Bounded(q) => q.pop(),
282 Inner::Unbounded(q) => q.pop(),
283 }
284 }
285
286 /// Get an iterator over the items in the queue.
287 ///
288 /// The iterator will continue until the queue is empty or closed. It will never block;
289 /// if the queue is empty, the iterator will return `None`. If new items are pushed into
290 /// the queue, the iterator may return `Some` in the future after returning `None`.
291 ///
292 /// # Examples
293 ///
294 /// ```
295 /// use concurrent_queue::ConcurrentQueue;
296 ///
297 /// let q = ConcurrentQueue::bounded(5);
298 /// q.push(1).unwrap();
299 /// q.push(2).unwrap();
300 /// q.push(3).unwrap();
301 ///
302 /// let mut iter = q.try_iter();
303 /// assert_eq!(iter.by_ref().sum::<i32>(), 6);
304 /// assert_eq!(iter.next(), None);
305 ///
306 /// // Pushing more items will make them available to the iterator.
307 /// q.push(4).unwrap();
308 /// assert_eq!(iter.next(), Some(4));
309 /// assert_eq!(iter.next(), None);
310 /// ```
311 pub fn try_iter(&self) -> TryIter<'_, T> {
312 TryIter { queue: self }
313 }
314
315 /// Returns `true` if the queue is empty.
316 ///
317 /// # Examples
318 ///
319 /// ```
320 /// use concurrent_queue::ConcurrentQueue;
321 ///
322 /// let q = ConcurrentQueue::<i32>::unbounded();
323 ///
324 /// assert!(q.is_empty());
325 /// q.push(1).unwrap();
326 /// assert!(!q.is_empty());
327 /// ```
328 pub fn is_empty(&self) -> bool {
329 match &self.0 {
330 Inner::Single(q) => q.is_empty(),
331 Inner::Bounded(q) => q.is_empty(),
332 Inner::Unbounded(q) => q.is_empty(),
333 }
334 }
335
336 /// Returns `true` if the queue is full.
337 ///
338 /// An unbounded queue is never full.
339 ///
340 /// # Examples
341 ///
342 /// ```
343 /// use concurrent_queue::ConcurrentQueue;
344 ///
345 /// let q = ConcurrentQueue::bounded(1);
346 ///
347 /// assert!(!q.is_full());
348 /// q.push(1).unwrap();
349 /// assert!(q.is_full());
350 /// ```
351 pub fn is_full(&self) -> bool {
352 match &self.0 {
353 Inner::Single(q) => q.is_full(),
354 Inner::Bounded(q) => q.is_full(),
355 Inner::Unbounded(q) => q.is_full(),
356 }
357 }
358
359 /// Returns the number of items in the queue.
360 ///
361 /// # Examples
362 ///
363 /// ```
364 /// use concurrent_queue::ConcurrentQueue;
365 ///
366 /// let q = ConcurrentQueue::unbounded();
367 /// assert_eq!(q.len(), 0);
368 ///
369 /// assert_eq!(q.push(10), Ok(()));
370 /// assert_eq!(q.len(), 1);
371 ///
372 /// assert_eq!(q.push(20), Ok(()));
373 /// assert_eq!(q.len(), 2);
374 /// ```
375 pub fn len(&self) -> usize {
376 match &self.0 {
377 Inner::Single(q) => q.len(),
378 Inner::Bounded(q) => q.len(),
379 Inner::Unbounded(q) => q.len(),
380 }
381 }
382
383 /// Returns the capacity of the queue.
384 ///
385 /// Unbounded queues have infinite capacity, represented as [`None`].
386 ///
387 /// # Examples
388 ///
389 /// ```
390 /// use concurrent_queue::ConcurrentQueue;
391 ///
392 /// let q = ConcurrentQueue::<i32>::bounded(7);
393 /// assert_eq!(q.capacity(), Some(7));
394 ///
395 /// let q = ConcurrentQueue::<i32>::unbounded();
396 /// assert_eq!(q.capacity(), None);
397 /// ```
398 pub fn capacity(&self) -> Option<usize> {
399 match &self.0 {
400 Inner::Single(_) => Some(1),
401 Inner::Bounded(q) => Some(q.capacity()),
402 Inner::Unbounded(_) => None,
403 }
404 }
405
406 /// Closes the queue.
407 ///
408 /// Returns `true` if this call closed the queue, or `false` if it was already closed.
409 ///
410 /// When a queue is closed, no more items can be pushed but the remaining items can still be
411 /// popped.
412 ///
413 /// # Examples
414 ///
415 /// ```
416 /// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
417 ///
418 /// let q = ConcurrentQueue::unbounded();
419 /// assert_eq!(q.push(10), Ok(()));
420 ///
421 /// assert!(q.close()); // `true` because this call closes the queue.
422 /// assert!(!q.close()); // `false` because the queue is already closed.
423 ///
424 /// // Cannot push any more items when closed.
425 /// assert_eq!(q.push(20), Err(PushError::Closed(20)));
426 ///
427 /// // Remaining items can still be popped.
428 /// assert_eq!(q.pop(), Ok(10));
429 ///
430 /// // When no more items are present, the error is `Closed`.
431 /// assert_eq!(q.pop(), Err(PopError::Closed));
432 /// ```
433 pub fn close(&self) -> bool {
434 match &self.0 {
435 Inner::Single(q) => q.close(),
436 Inner::Bounded(q) => q.close(),
437 Inner::Unbounded(q) => q.close(),
438 }
439 }
440
441 /// Returns `true` if the queue is closed.
442 ///
443 /// # Examples
444 ///
445 /// ```
446 /// use concurrent_queue::ConcurrentQueue;
447 ///
448 /// let q = ConcurrentQueue::<i32>::unbounded();
449 ///
450 /// assert!(!q.is_closed());
451 /// q.close();
452 /// assert!(q.is_closed());
453 /// ```
454 pub fn is_closed(&self) -> bool {
455 match &self.0 {
456 Inner::Single(q) => q.is_closed(),
457 Inner::Bounded(q) => q.is_closed(),
458 Inner::Unbounded(q) => q.is_closed(),
459 }
460 }
461}
462
463impl<T> fmt::Debug for ConcurrentQueue<T> {
464 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
465 f.debug_struct("ConcurrentQueue")
466 .field("len", &self.len())
467 .field("capacity", &self.capacity())
468 .field("is_closed", &self.is_closed())
469 .finish()
470 }
471}
472
473/// An iterator that pops items from a [`ConcurrentQueue`].
474///
475/// This iterator will never block; it will return `None` once the queue has
476/// been exhausted. Calling `next` after `None` may yield `Some(item)` if more items
477/// are pushed to the queue.
478#[must_use = "iterators are lazy and do nothing unless consumed"]
479#[derive(Clone)]
480pub struct TryIter<'a, T> {
481 queue: &'a ConcurrentQueue<T>,
482}
483
484impl<T> fmt::Debug for TryIter<'_, T> {
485 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
486 f.debug_tuple("Iter").field(&self.queue).finish()
487 }
488}
489
490impl<T> Iterator for TryIter<'_, T> {
491 type Item = T;
492
493 fn next(&mut self) -> Option<Self::Item> {
494 self.queue.pop().ok()
495 }
496}
497
498/// Error which occurs when popping from an empty queue.
499#[derive(Clone, Copy, Eq, PartialEq)]
500pub enum PopError {
501 /// The queue is empty but not closed.
502 Empty,
503
504 /// The queue is empty and closed.
505 Closed,
506}
507
508impl PopError {
509 /// Returns `true` if the queue is empty but not closed.
510 pub fn is_empty(&self) -> bool {
511 match self {
512 PopError::Empty => true,
513 PopError::Closed => false,
514 }
515 }
516
517 /// Returns `true` if the queue is empty and closed.
518 pub fn is_closed(&self) -> bool {
519 match self {
520 PopError::Empty => false,
521 PopError::Closed => true,
522 }
523 }
524}
525
526#[cfg(feature = "std")]
527impl error::Error for PopError {}
528
529impl fmt::Debug for PopError {
530 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531 match self {
532 PopError::Empty => write!(f, "Empty"),
533 PopError::Closed => write!(f, "Closed"),
534 }
535 }
536}
537
538impl fmt::Display for PopError {
539 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
540 match self {
541 PopError::Empty => write!(f, "Empty"),
542 PopError::Closed => write!(f, "Closed"),
543 }
544 }
545}
546
547/// Error which occurs when pushing into a full or closed queue.
548#[derive(Clone, Copy, Eq, PartialEq)]
549pub enum PushError<T> {
550 /// The queue is full but not closed.
551 Full(T),
552
553 /// The queue is closed.
554 Closed(T),
555}
556
557impl<T> PushError<T> {
558 /// Unwraps the item that couldn't be pushed.
559 pub fn into_inner(self) -> T {
560 match self {
561 PushError::Full(t) => t,
562 PushError::Closed(t) => t,
563 }
564 }
565
566 /// Returns `true` if the queue is full but not closed.
567 pub fn is_full(&self) -> bool {
568 match self {
569 PushError::Full(_) => true,
570 PushError::Closed(_) => false,
571 }
572 }
573
574 /// Returns `true` if the queue is closed.
575 pub fn is_closed(&self) -> bool {
576 match self {
577 PushError::Full(_) => false,
578 PushError::Closed(_) => true,
579 }
580 }
581}
582
583#[cfg(feature = "std")]
584impl<T: fmt::Debug> error::Error for PushError<T> {}
585
586impl<T: fmt::Debug> fmt::Debug for PushError<T> {
587 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
588 match self {
589 PushError::Full(t) => f.debug_tuple("Full").field(t).finish(),
590 PushError::Closed(t) => f.debug_tuple("Closed").field(t).finish(),
591 }
592 }
593}
594
595impl<T> fmt::Display for PushError<T> {
596 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
597 match self {
598 PushError::Full(_) => write!(f, "Full"),
599 PushError::Closed(_) => write!(f, "Closed"),
600 }
601 }
602}
603
604/// Error that occurs when force-pushing into a full queue.
605#[derive(Clone, Copy, PartialEq, Eq)]
606pub struct ForcePushError<T>(pub T);
607
608impl<T> ForcePushError<T> {
609 /// Return the inner value that failed to be force-pushed.
610 pub fn into_inner(self) -> T {
611 self.0
612 }
613}
614
615impl<T: fmt::Debug> fmt::Debug for ForcePushError<T> {
616 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617 f.debug_tuple("ForcePushError").field(&self.0).finish()
618 }
619}
620
621impl<T> fmt::Display for ForcePushError<T> {
622 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
623 write!(f, "Closed")
624 }
625}
626
627#[cfg(feature = "std")]
628impl<T: fmt::Debug> error::Error for ForcePushError<T> {}
629
630/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
631#[inline]
632fn full_fence() {
633 #[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), not(miri), not(loom)))]
634 {
635 use core::{arch::asm, cell::UnsafeCell};
636 // HACK(stjepang): On x86 architectures there are two different ways of executing
637 // a `SeqCst` fence.
638 //
639 // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
640 // 2. A `lock <op>` instruction.
641 //
642 // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
643 // that the second one is sometimes a bit faster.
644 let a = UnsafeCell::new(0_usize);
645 // It is common to use `lock or` here, but when using a local variable, `lock not`, which
646 // does not change the flag, should be slightly more efficient.
647 // Refs: https://www.felixcloutier.com/x86/not
648 unsafe {
649 #[cfg(target_pointer_width = "64")]
650 asm!("lock not qword ptr [{0}]", in(reg) a.get(), options(nostack, preserves_flags));
651 #[cfg(target_pointer_width = "32")]
652 asm!("lock not dword ptr [{0:e}]", in(reg) a.get(), options(nostack, preserves_flags));
653 }
654 return;
655 }
656 #[allow(unreachable_code)]
657 {
658 atomic::fence(Ordering::SeqCst);
659 }
660}