// crossbeam_queue/seg_queue.rs

use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;
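// Taken together, a slot's `state` moves from 0 to `WRITE` when a value is stored and then to
// `WRITE | READ` when it is consumed; `DESTROY` may additionally be set by a thread that is
// tearing the block down while another thread is still reading from the slot.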

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
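
// For illustration only (these numbers are not used anywhere below): with `SHIFT == 1` and
// `LAP == 32`, a head index of, say, 70 decodes as
//     has_next = 70 & HAS_NEXT;       // 0: no successor block is known yet
//     offset   = (70 >> SHIFT) % LAP; // 35 % 32 == 3: the fourth slot of its block
//     lap      = (70 >> SHIFT) / LAP; // 35 / 32 == 1: the second lap of indices
// Offset `BLOCK_CAP` (31) never addresses a real slot; it marks the end of a block, at which
// point operations wait for the next block to be installed.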

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
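    ///
    /// If a reader is still active in one of those slots, this only marks that slot with
    /// `DESTROY` and returns; the reader then observes the bit when it sets `READ` and
    /// finishes destroying the block itself (see `pop`).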
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
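                    // The `Release` ordering on the `fetch_or` below publishes the value just
                    // written to the `Acquire` load in `Slot::wait_write`.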
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

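            // If `HAS_NEXT` is already set in the head index, the head block is known not to
            // be the last one, so the queue cannot be empty and the emptiness check below
            // (the fence plus the tail load) can be skipped.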
            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
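        // `>> SHIFT` drops the metadata bit from both indices, so the comparison looks only
        // at slot positions: the queue is treated as empty when head and tail point at the
        // same slot.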
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
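                // For example (illustrative): with `LAP == 32`, `head == 3` and `tail == 35`
                // at this point, index 31 is a block boundary rather than an element, so the
                // length is 35 - 3 - 35 / 32 == 31.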
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
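        // `&mut self` guarantees exclusive access, so plain `Relaxed` loads are sufficient
        // here; no other thread can be touching the queue while it is being dropped.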
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.value.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}