crossbeam_queue/array_queue.rs
//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
//!
//! Copyright & License:
//! - Copyright (c) 2010-2011 Dmitry Vyukov
//! - Simplified BSD License and Apache License, Version 2.0
//! - <http://www.1024cores.net/home/code-license>

use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::{self, MaybeUninit};
use core::sync::atomic::{self, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

/// A slot in a queue.
struct Slot<T> {
    /// The current stamp.
    ///
    /// If the stamp equals the tail, this slot will be written to next. If it equals head + 1,
    /// this slot will be read from next.
    stamp: AtomicUsize,

    /// The value in this slot.
    value: UnsafeCell<MaybeUninit<T>>,
}
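
// A worked example of the stamp protocol, added for illustration (the concrete
// values below assume `cap == 3`, hence `one_lap == 4`; they are not part of
// the original source):
//
//   fresh queue:      stamps are [0, 1, 2], i.e. slot `i` holds `{ lap: 0, index: i }`
//   push into slot 0: stamp becomes 1 (`tail + 1`), marking the slot ready to read
//   pop from slot 0:  stamp becomes 4 (`head + one_lap`), marking the slot ready
//                     to be written on the next lap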

/// A bounded multi-producer multi-consumer queue.
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
/// faster than [`SegQueue`].
///
/// [`SegQueue`]: super::SegQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err('c'));
/// assert_eq!(q.pop(), Some('a'));
/// ```
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
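    ///
    /// For example (illustrative values, not in the original): with `one_lap == 4`,
    /// the stamp `6` unpacks as `{ lap: 1, index: 2 }`: the index is
    /// `6 & (one_lap - 1) == 2` and the lap bits are `6 & !(one_lap - 1) == 4`,
    /// i.e. one full lap.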
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    buffer: *mut Slot<T>,

    /// The queue capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    one_lap: usize,

    /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}

impl<T> ArrayQueue<T> {
    /// Creates a new bounded queue with the given capacity.
    ///
    /// # Panics
    ///
    /// Panics if the capacity is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    /// ```
    pub fn new(cap: usize) -> ArrayQueue<T> {
        assert!(cap > 0, "capacity must be non-zero");

        // Head is initialized to `{ lap: 0, index: 0 }`.
        // Tail is initialized to `{ lap: 0, index: 0 }`.
        let head = 0;
        let tail = 0;

        // Allocate a buffer of `cap` slots initialized with stamps.
        let buffer = {
            let mut boxed: Box<[Slot<T>]> = (0..cap)
                .map(|i| {
                    // Set the stamp to `{ lap: 0, index: i }`.
                    Slot {
                        stamp: AtomicUsize::new(i),
                        value: UnsafeCell::new(MaybeUninit::uninit()),
                    }
                })
                .collect();
            let ptr = boxed.as_mut_ptr();
            mem::forget(boxed);
            ptr
        };

        // One lap is the smallest power of two greater than `cap`.
        let one_lap = (cap + 1).next_power_of_two();
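        // E.g. (illustrative, not in the original): `cap == 100` gives
        // `one_lap == 128`, so the low 7 bits of a stamp hold the index and
        // the remaining bits hold the lap.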

        ArrayQueue {
            buffer,
            cap,
            one_lap,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
            _marker: PhantomData,
        }
    }

    /// Attempts to push an element into the queue.
    ///
    /// If the queue is full, the element is returned as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert_eq!(q.push(10), Ok(()));
    /// assert_eq!(q.push(20), Err(20));
    /// ```
    pub fn push(&self, value: T) -> Result<(), T> {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Deconstruct the tail.
            let index = tail & (self.one_lap - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, we may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write the value into the slot and update the stamp.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
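                        // This Release store pairs with the Acquire load of
                        // the stamp in `pop`, making the value written above
                        // visible to the thread that pops this slot.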
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
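                // The slot's stamp is one whole lap behind the tail: the slot
                // still holds a value from the previous lap that has not been
                // popped yet, so the queue may be full.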
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the queue is full.
                    return Err(value);
                }

                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Attempts to pop an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    /// assert_eq!(q.push(10), Ok(()));
    ///
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head.
            let index = head & (self.one_lap - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);
            // If the stamp is ahead of the head by 1, we may attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Read the value from the slot and update the stamp.
                        let msg = unsafe { slot.value.get().read().assume_init() };
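                        // This Release store pairs with the Acquire load of
                        // the stamp in `push`, handing the now-empty slot to
                        // a writer on the next lap.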
                        slot.stamp
                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Some(msg);
                    }
                    Err(h) => {
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
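                // The slot's stamp still equals the head: the slot has not
                // been written on this lap yet, so the queue may be empty.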
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if tail == head {
                    return None;
                }

                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }

    /// Returns the capacity of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    ///
    /// assert_eq!(q.capacity(), 100);
    /// ```
    pub fn capacity(&self) -> usize {
        self.cap
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(100);
    ///
    /// assert!(q.is_empty());
    /// q.push(1).unwrap();
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head?
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the queue was not empty, so it is safe to just return `false`.
        tail == head
    }

    /// Returns `true` if the queue is full.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert!(!q.is_full());
    /// q.push(1).unwrap();
    /// assert!(q.is_full());
    /// ```
    pub fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind tail?
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the queue was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(100);
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10).unwrap();
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20).unwrap();
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change, we've got consistent values to work with.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.one_lap - 1);
                let tix = tail & (self.one_lap - 1);
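
                // E.g. (illustrative): with `cap == 4`, `hix == 3`, and
                // `tix == 1`, the occupied slots are 3 and 0, so the length
                // is `cap - hix + tix == 2`.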
                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    self.cap - hix + tix
                } else if tail == head {
                    0
                } else {
                    self.cap
                };
            }
        }
    }
}

impl<T> Drop for ArrayQueue<T> {
    fn drop(&mut self) {
        // Get the index of the head.
        let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);

        // Loop over all slots that hold a value and drop them.
        for i in 0..self.len() {
            // Compute the index of the next slot holding a value.
            let index = if hix + i < self.cap {
                hix + i
            } else {
                hix + i - self.cap
            };

            unsafe {
                let p = {
                    let slot = &mut *self.buffer.add(index);
                    let value = &mut *slot.value.get();
                    value.as_mut_ptr()
                };
                p.drop_in_place();
            }
        }

        // Finally, deallocate the buffer, but don't run any destructors.
        unsafe {
            // Create a slice from the buffer to make a fat pointer, then use
            // `Box::from_raw` to deallocate it.
            let ptr = core::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>];
            drop(Box::from_raw(ptr));
        }
    }
}

impl<T> fmt::Debug for ArrayQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("ArrayQueue { .. }")
    }
}
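
// What follows is a minimal concurrent smoke-test sketch, not part of the
// original file. It assumes `std` is available in test builds (the crate is
// `no_std` otherwise) and a toolchain providing `std::thread::scope`
// (Rust 1.63+); the capacity, thread count, and `COUNT` values below are
// arbitrary choices.
#[cfg(test)]
mod tests {
    use super::ArrayQueue;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    #[test]
    fn concurrent_push_pop() {
        const COUNT: usize = 1_000;
        const THREADS: usize = 4;

        let q = ArrayQueue::new(8);
        let pushed = AtomicUsize::new(0);
        let popped = AtomicUsize::new(0);

        thread::scope(|scope| {
            // Producers: claim the values 0..COUNT between them and push each
            // one, retrying whenever the queue is momentarily full.
            for _ in 0..THREADS {
                scope.spawn(|| loop {
                    let i = pushed.fetch_add(1, Ordering::Relaxed);
                    if i >= COUNT {
                        break;
                    }
                    let mut v = i;
                    while let Err(e) = q.push(v) {
                        v = e; // queue full: retry with the value handed back
                    }
                });
            }
            // Consumers: keep popping until all COUNT values have been taken.
            for _ in 0..THREADS {
                scope.spawn(|| {
                    while popped.load(Ordering::Relaxed) < COUNT {
                        if q.pop().is_some() {
                            popped.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
        });

        assert_eq!(popped.load(Ordering::Relaxed), COUNT);
        assert!(q.is_empty());
    }
}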