concurrent_queue/
unbounded.rs

1use alloc::boxed::Box;
2use core::mem::MaybeUninit;
3use core::ptr;
4
5use crossbeam_utils::CachePadded;
6
7use crate::const_fn;
8use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
9use crate::sync::cell::UnsafeCell;
10#[allow(unused_imports)]
11use crate::sync::prelude::*;
12use crate::{busy_wait, PopError, PushError};
13
// Flag bits kept in `Slot::state`:
// * `WRITE`   - a value has been written into the slot.
// * `READ`    - the value has been read back out of the slot.
// * `DESTROY` - the block that owns the slot is being destroyed.
const WRITE: usize = 0b001;
const READ: usize = 0b010;
const DESTROY: usize = 0b100;

// Number of consecutive indices covered by one block (one "lap").
const LAP: usize = 32;
// Number of values a block can store; the last index of every lap holds no value.
const BLOCK_CAP: usize = LAP - 1;
// Number of low bits in an index that carry metadata rather than position.
const SHIFT: usize = 1;
// Metadata bit whose meaning depends on which index it is stored in:
// * in the head index: the head block is not the last block of the list;
// * in the tail index: the queue has been closed.
const MARK_BIT: usize = 1;
32
33/// A slot in a block.
34struct Slot<T> {
35    /// The value.
36    value: UnsafeCell<MaybeUninit<T>>,
37
38    /// The state of the slot.
39    state: AtomicUsize,
40}
41
42impl<T> Slot<T> {
43    #[cfg(not(loom))]
44    const UNINIT: Slot<T> = Slot {
45        value: UnsafeCell::new(MaybeUninit::uninit()),
46        state: AtomicUsize::new(0),
47    };
48
49    #[cfg(not(loom))]
50    fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
51        [Self::UNINIT; BLOCK_CAP]
52    }
53
54    #[cfg(loom)]
55    fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
56        // Repeat this expression 31 times.
57        // Update if we change BLOCK_CAP
58        macro_rules! repeat_31 {
59            ($e: expr) => {
60                [
61                    $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
62                    $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
63                ]
64            };
65        }
66
67        repeat_31!(Slot {
68            value: UnsafeCell::new(MaybeUninit::uninit()),
69            state: AtomicUsize::new(0),
70        })
71    }
72
73    /// Waits until a value is written into the slot.
74    fn wait_write(&self) {
75        while self.state.load(Ordering::Acquire) & WRITE == 0 {
76            busy_wait();
77        }
78    }
79}
80
81/// A block in a linked list.
82///
83/// Each block in the list can hold up to `BLOCK_CAP` values.
84struct Block<T> {
85    /// The next block in the linked list.
86    next: AtomicPtr<Block<T>>,
87
88    /// Slots for values.
89    slots: [Slot<T>; BLOCK_CAP],
90}
91
92impl<T> Block<T> {
93    /// Creates an empty block.
94    fn new() -> Block<T> {
95        Block {
96            next: AtomicPtr::new(ptr::null_mut()),
97            slots: Slot::uninit_block(),
98        }
99    }
100
101    /// Waits until the next pointer is set.
102    fn wait_next(&self) -> *mut Block<T> {
103        loop {
104            let next = self.next.load(Ordering::Acquire);
105            if !next.is_null() {
106                return next;
107            }
108            busy_wait();
109        }
110    }
111
112    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
113    unsafe fn destroy(this: *mut Block<T>, start: usize) {
114        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
115        // begun destruction of the block.
116        for i in start..BLOCK_CAP - 1 {
117            let slot = (*this).slots.get_unchecked(i);
118
119            // Mark the `DESTROY` bit if a thread is still using the slot.
120            if slot.state.load(Ordering::Acquire) & READ == 0
121                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
122            {
123                // If a thread is still using the slot, it will continue destruction of the block.
124                return;
125            }
126        }
127
128        // No thread is using the block, now it is safe to destroy it.
129        drop(Box::from_raw(this));
130    }
131}
132
133/// A position in a queue.
134struct Position<T> {
135    /// The index in the queue.
136    index: AtomicUsize,
137
138    /// The block in the linked list.
139    block: AtomicPtr<Block<T>>,
140}
141
142/// An unbounded queue.
143pub struct Unbounded<T> {
144    /// The head of the queue.
145    head: CachePadded<Position<T>>,
146
147    /// The tail of the queue.
148    tail: CachePadded<Position<T>>,
149}
150
151impl<T> Unbounded<T> {
152    const_fn!(
153        const_if: #[cfg(not(loom))];
154        /// Creates a new unbounded queue.
155        pub const fn new() -> Unbounded<T> {
156            Unbounded {
157                head: CachePadded::new(Position {
158                    block: AtomicPtr::new(ptr::null_mut()),
159                    index: AtomicUsize::new(0),
160                }),
161                tail: CachePadded::new(Position {
162                    block: AtomicPtr::new(ptr::null_mut()),
163                    index: AtomicUsize::new(0),
164                }),
165            }
166        }
167    );
168
169    /// Pushes an item into the queue.
170    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
171        let mut tail = self.tail.index.load(Ordering::Acquire);
172        let mut block = self.tail.block.load(Ordering::Acquire);
173        let mut next_block = None;
174
175        loop {
176            // Check if the queue is closed.
177            if tail & MARK_BIT != 0 {
178                return Err(PushError::Closed(value));
179            }
180
181            // Calculate the offset of the index into the block.
182            let offset = (tail >> SHIFT) % LAP;
183
184            // If we reached the end of the block, wait until the next one is installed.
185            if offset == BLOCK_CAP {
186                busy_wait();
187                tail = self.tail.index.load(Ordering::Acquire);
188                block = self.tail.block.load(Ordering::Acquire);
189                continue;
190            }
191
192            // If we're going to have to install the next block, allocate it in advance in order to
193            // make the wait for other threads as short as possible.
194            if offset + 1 == BLOCK_CAP && next_block.is_none() {
195                next_block = Some(Box::new(Block::<T>::new()));
196            }
197
198            // If this is the first value to be pushed into the queue, we need to allocate the
199            // first block and install it.
200            if block.is_null() {
201                let new = Box::into_raw(Box::new(Block::<T>::new()));
202
203                if self
204                    .tail
205                    .block
206                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
207                    .is_ok()
208                {
209                    self.head.block.store(new, Ordering::Release);
210                    block = new;
211                } else {
212                    next_block = unsafe { Some(Box::from_raw(new)) };
213                    tail = self.tail.index.load(Ordering::Acquire);
214                    block = self.tail.block.load(Ordering::Acquire);
215                    continue;
216                }
217            }
218
219            let new_tail = tail + (1 << SHIFT);
220
221            // Try advancing the tail forward.
222            match self.tail.index.compare_exchange_weak(
223                tail,
224                new_tail,
225                Ordering::SeqCst,
226                Ordering::Acquire,
227            ) {
228                Ok(_) => unsafe {
229                    // If we've reached the end of the block, install the next one.
230                    if offset + 1 == BLOCK_CAP {
231                        let next_block = Box::into_raw(next_block.unwrap());
232                        self.tail.block.store(next_block, Ordering::Release);
233                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
234                        (*block).next.store(next_block, Ordering::Release);
235                    }
236
237                    // Write the value into the slot.
238                    let slot = (*block).slots.get_unchecked(offset);
239                    slot.value.with_mut(|slot| {
240                        slot.write(MaybeUninit::new(value));
241                    });
242                    slot.state.fetch_or(WRITE, Ordering::Release);
243                    return Ok(());
244                },
245                Err(t) => {
246                    tail = t;
247                    block = self.tail.block.load(Ordering::Acquire);
248                }
249            }
250        }
251    }
252
253    /// Pops an item from the queue.
254    pub fn pop(&self) -> Result<T, PopError> {
255        let mut head = self.head.index.load(Ordering::Acquire);
256        let mut block = self.head.block.load(Ordering::Acquire);
257
258        loop {
259            // Calculate the offset of the index into the block.
260            let offset = (head >> SHIFT) % LAP;
261
262            // If we reached the end of the block, wait until the next one is installed.
263            if offset == BLOCK_CAP {
264                busy_wait();
265                head = self.head.index.load(Ordering::Acquire);
266                block = self.head.block.load(Ordering::Acquire);
267                continue;
268            }
269
270            let mut new_head = head + (1 << SHIFT);
271
272            if new_head & MARK_BIT == 0 {
273                crate::full_fence();
274                let tail = self.tail.index.load(Ordering::Relaxed);
275
276                // If the tail equals the head, that means the queue is empty.
277                if head >> SHIFT == tail >> SHIFT {
278                    // Check if the queue is closed.
279                    if tail & MARK_BIT != 0 {
280                        return Err(PopError::Closed);
281                    } else {
282                        return Err(PopError::Empty);
283                    }
284                }
285
286                // If head and tail are not in the same block, set `MARK_BIT` in head.
287                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
288                    new_head |= MARK_BIT;
289                }
290            }
291
292            // The block can be null here only if the first push operation is in progress.
293            if block.is_null() {
294                busy_wait();
295                head = self.head.index.load(Ordering::Acquire);
296                block = self.head.block.load(Ordering::Acquire);
297                continue;
298            }
299
300            // Try moving the head index forward.
301            match self.head.index.compare_exchange_weak(
302                head,
303                new_head,
304                Ordering::SeqCst,
305                Ordering::Acquire,
306            ) {
307                Ok(_) => unsafe {
308                    // If we've reached the end of the block, move to the next one.
309                    if offset + 1 == BLOCK_CAP {
310                        let next = (*block).wait_next();
311                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
312                        if !(*next).next.load(Ordering::Relaxed).is_null() {
313                            next_index |= MARK_BIT;
314                        }
315
316                        self.head.block.store(next, Ordering::Release);
317                        self.head.index.store(next_index, Ordering::Release);
318                    }
319
320                    // Read the value.
321                    let slot = (*block).slots.get_unchecked(offset);
322                    slot.wait_write();
323                    let value = slot.value.with_mut(|slot| slot.read().assume_init());
324
325                    // Destroy the block if we've reached the end, or if another thread wanted to
326                    // destroy but couldn't because we were busy reading from the slot.
327                    if offset + 1 == BLOCK_CAP {
328                        Block::destroy(block, 0);
329                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
330                        Block::destroy(block, offset + 1);
331                    }
332
333                    return Ok(value);
334                },
335                Err(h) => {
336                    head = h;
337                    block = self.head.block.load(Ordering::Acquire);
338                }
339            }
340        }
341    }
342
343    /// Returns the number of items in the queue.
344    pub fn len(&self) -> usize {
345        loop {
346            // Load the tail index, then load the head index.
347            let mut tail = self.tail.index.load(Ordering::SeqCst);
348            let mut head = self.head.index.load(Ordering::SeqCst);
349
350            // If the tail index didn't change, we've got consistent indices to work with.
351            if self.tail.index.load(Ordering::SeqCst) == tail {
352                // Erase the lower bits.
353                tail &= !((1 << SHIFT) - 1);
354                head &= !((1 << SHIFT) - 1);
355
356                // Fix up indices if they fall onto block ends.
357                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
358                    tail = tail.wrapping_add(1 << SHIFT);
359                }
360                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
361                    head = head.wrapping_add(1 << SHIFT);
362                }
363
364                // Rotate indices so that head falls into the first block.
365                let lap = (head >> SHIFT) / LAP;
366                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
367                head = head.wrapping_sub((lap * LAP) << SHIFT);
368
369                // Remove the lower bits.
370                tail >>= SHIFT;
371                head >>= SHIFT;
372
373                // Return the difference minus the number of blocks between tail and head.
374                return tail - head - tail / LAP;
375            }
376        }
377    }
378
379    /// Returns `true` if the queue is empty.
380    pub fn is_empty(&self) -> bool {
381        let head = self.head.index.load(Ordering::SeqCst);
382        let tail = self.tail.index.load(Ordering::SeqCst);
383        head >> SHIFT == tail >> SHIFT
384    }
385
386    /// Returns `true` if the queue is full.
387    pub fn is_full(&self) -> bool {
388        false
389    }
390
391    /// Closes the queue.
392    ///
393    /// Returns `true` if this call closed the queue.
394    pub fn close(&self) -> bool {
395        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
396        tail & MARK_BIT == 0
397    }
398
399    /// Returns `true` if the queue is closed.
400    pub fn is_closed(&self) -> bool {
401        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
402    }
403}
404
405impl<T> Drop for Unbounded<T> {
406    fn drop(&mut self) {
407        let Self { head, tail } = self;
408        let Position { index: head, block } = &mut **head;
409
410        head.with_mut(|&mut mut head| {
411            tail.index.with_mut(|&mut mut tail| {
412                // Erase the lower bits.
413                head &= !((1 << SHIFT) - 1);
414                tail &= !((1 << SHIFT) - 1);
415
416                unsafe {
417                    // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
418                    while head != tail {
419                        let offset = (head >> SHIFT) % LAP;
420
421                        if offset < BLOCK_CAP {
422                            // Drop the value in the slot.
423                            block.with_mut(|block| {
424                                let slot = (**block).slots.get_unchecked(offset);
425                                slot.value.with_mut(|slot| {
426                                    let value = &mut *slot;
427                                    value.as_mut_ptr().drop_in_place();
428                                });
429                            });
430                        } else {
431                            // Deallocate the block and move to the next one.
432                            block.with_mut(|block| {
433                                let next_block = (**block).next.with_mut(|next| *next);
434                                drop(Box::from_raw(*block));
435                                *block = next_block;
436                            });
437                        }
438
439                        head = head.wrapping_add(1 << SHIFT);
440                    }
441
442                    // Deallocate the last remaining block.
443                    block.with_mut(|block| {
444                        if !block.is_null() {
445                            drop(Box::from_raw(*block));
446                        }
447                    });
448                }
449            });
450        });
451    }
452}