concurrent_queue/
bounded.rs

1use alloc::{boxed::Box, vec::Vec};
2use core::mem::MaybeUninit;
3
4use crossbeam_utils::CachePadded;
5
6use crate::sync::atomic::{AtomicUsize, Ordering};
7use crate::sync::cell::UnsafeCell;
8#[allow(unused_imports)]
9use crate::sync::prelude::*;
10use crate::{busy_wait, ForcePushError, PopError, PushError};
11
12/// A slot in a queue.
13struct Slot<T> {
14    /// The current stamp.
15    stamp: AtomicUsize,
16
17    /// The value in this slot.
18    value: UnsafeCell<MaybeUninit<T>>,
19}
20
21/// A bounded queue.
22pub struct Bounded<T> {
23    /// The head of the queue.
24    ///
25    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
26    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
27    /// represent the lap. The mark bit in the head is always zero.
28    ///
29    /// Values are popped from the head of the queue.
30    head: CachePadded<AtomicUsize>,
31
32    /// The tail of the queue.
33    ///
34    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
35    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
36    /// represent the lap. The mark bit indicates that the queue is closed.
37    ///
38    /// Values are pushed into the tail of the queue.
39    tail: CachePadded<AtomicUsize>,
40
41    /// The buffer holding slots.
42    buffer: Box<[Slot<T>]>,
43
44    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
45    one_lap: usize,
46
47    /// If this bit is set in the tail, that means the queue is closed.
48    mark_bit: usize,
49}
50
51impl<T> Bounded<T> {
52    /// Creates a new bounded queue.
53    pub fn new(cap: usize) -> Bounded<T> {
54        assert!(cap > 0, "capacity must be positive");
55
56        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
57        let head = 0;
58        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
59        let tail = 0;
60
61        // Allocate a buffer of `cap` slots initialized with stamps.
62        let mut buffer = Vec::with_capacity(cap);
63        for i in 0..cap {
64            // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
65            buffer.push(Slot {
66                stamp: AtomicUsize::new(i),
67                value: UnsafeCell::new(MaybeUninit::uninit()),
68            });
69        }
70
71        // Compute constants `mark_bit` and `one_lap`.
72        let mark_bit = (cap + 1).next_power_of_two();
73        let one_lap = mark_bit * 2;
74
75        Bounded {
76            buffer: buffer.into(),
77            one_lap,
78            mark_bit,
79            head: CachePadded::new(AtomicUsize::new(head)),
80            tail: CachePadded::new(AtomicUsize::new(tail)),
81        }
82    }
83
84    /// Attempts to push an item into the queue.
85    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
86        self.push_or_else(value, |value, tail, _, _| {
87            let head = self.head.load(Ordering::Relaxed);
88
89            // If the head lags one lap behind the tail as well...
90            if head.wrapping_add(self.one_lap) == tail {
91                // ...then the queue is full.
92                Err(PushError::Full(value))
93            } else {
94                Ok(value)
95            }
96        })
97    }
98
99    /// Pushes an item into the queue, displacing another item if needed.
100    pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
101        let result = self.push_or_else(value, |value, tail, new_tail, slot| {
102            let head = tail.wrapping_sub(self.one_lap);
103            let new_head = new_tail.wrapping_sub(self.one_lap);
104
105            // Try to move the head.
106            if self
107                .head
108                .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
109                .is_ok()
110            {
111                // Move the tail.
112                self.tail.store(new_tail, Ordering::SeqCst);
113
114                // Swap out the old value.
115                // SAFETY: We know this is initialized, since it's covered by the current queue.
116                let old = unsafe {
117                    slot.value
118                        .with_mut(|slot| slot.replace(MaybeUninit::new(value)).assume_init())
119                };
120
121                // Update the stamp.
122                slot.stamp.store(tail + 1, Ordering::Release);
123
124                // Return a PushError.
125                Err(PushError::Full(old))
126            } else {
127                Ok(value)
128            }
129        });
130
131        match result {
132            Ok(()) => Ok(None),
133            Err(PushError::Full(old_value)) => Ok(Some(old_value)),
134            Err(PushError::Closed(value)) => Err(ForcePushError(value)),
135        }
136    }
137
138    /// Attempts to push an item into the queue, running a closure on failure.
139    ///
140    /// `fail` is run when there is no more room left in the tail of the queue. The parameters of
141    /// this function are as follows:
142    ///
143    /// - The item that failed to push.
144    /// - The value of `self.tail` before the new value would be inserted.
145    /// - The value of `self.tail` after the new value would be inserted.
146    /// - The slot that we attempted to push into.
147    ///
148    /// If `fail` returns `Ok(val)`, we will try pushing `val` to the head of the queue. Otherwise,
149    /// this function will return the error.
150    fn push_or_else<F>(&self, mut value: T, mut fail: F) -> Result<(), PushError<T>>
151    where
152        F: FnMut(T, usize, usize, &Slot<T>) -> Result<T, PushError<T>>,
153    {
154        let mut tail = self.tail.load(Ordering::Relaxed);
155
156        loop {
157            // Check if the queue is closed.
158            if tail & self.mark_bit != 0 {
159                return Err(PushError::Closed(value));
160            }
161
162            // Deconstruct the tail.
163            let index = tail & (self.mark_bit - 1);
164            let lap = tail & !(self.one_lap - 1);
165
166            // Calculate the new location of the tail.
167            let new_tail = if index + 1 < self.buffer.len() {
168                // Same lap, incremented index.
169                // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
170                tail + 1
171            } else {
172                // One lap forward, index wraps around to zero.
173                // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
174                lap.wrapping_add(self.one_lap)
175            };
176
177            // Inspect the corresponding slot.
178            let slot = &self.buffer[index];
179            let stamp = slot.stamp.load(Ordering::Acquire);
180
181            // If the tail and the stamp match, we may attempt to push.
182            if tail == stamp {
183                // Try moving the tail.
184                match self.tail.compare_exchange_weak(
185                    tail,
186                    new_tail,
187                    Ordering::SeqCst,
188                    Ordering::Relaxed,
189                ) {
190                    Ok(_) => {
191                        // Write the value into the slot and update the stamp.
192                        slot.value.with_mut(|slot| unsafe {
193                            slot.write(MaybeUninit::new(value));
194                        });
195                        slot.stamp.store(tail + 1, Ordering::Release);
196                        return Ok(());
197                    }
198                    Err(t) => {
199                        tail = t;
200                    }
201                }
202            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
203                crate::full_fence();
204
205                // We've failed to push; run our failure closure.
206                value = fail(value, tail, new_tail, slot)?;
207
208                // Loom complains if there isn't an explicit busy wait here.
209                #[cfg(loom)]
210                busy_wait();
211
212                tail = self.tail.load(Ordering::Relaxed);
213            } else {
214                // Yield because we need to wait for the stamp to get updated.
215                busy_wait();
216                tail = self.tail.load(Ordering::Relaxed);
217            }
218        }
219    }
220
221    /// Attempts to pop an item from the queue.
222    pub fn pop(&self) -> Result<T, PopError> {
223        let mut head = self.head.load(Ordering::Relaxed);
224
225        loop {
226            // Deconstruct the head.
227            let index = head & (self.mark_bit - 1);
228            let lap = head & !(self.one_lap - 1);
229
230            // Inspect the corresponding slot.
231            let slot = &self.buffer[index];
232            let stamp = slot.stamp.load(Ordering::Acquire);
233
234            // If the the stamp is ahead of the head by 1, we may attempt to pop.
235            if head + 1 == stamp {
236                let new = if index + 1 < self.buffer.len() {
237                    // Same lap, incremented index.
238                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
239                    head + 1
240                } else {
241                    // One lap forward, index wraps around to zero.
242                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
243                    lap.wrapping_add(self.one_lap)
244                };
245
246                // Try moving the head.
247                match self.head.compare_exchange_weak(
248                    head,
249                    new,
250                    Ordering::SeqCst,
251                    Ordering::Relaxed,
252                ) {
253                    Ok(_) => {
254                        // Read the value from the slot and update the stamp.
255                        let value = slot
256                            .value
257                            .with_mut(|slot| unsafe { slot.read().assume_init() });
258                        slot.stamp
259                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
260                        return Ok(value);
261                    }
262                    Err(h) => {
263                        head = h;
264                    }
265                }
266            } else if stamp == head {
267                crate::full_fence();
268                let tail = self.tail.load(Ordering::Relaxed);
269
270                // If the tail equals the head, that means the queue is empty.
271                if (tail & !self.mark_bit) == head {
272                    // Check if the queue is closed.
273                    if tail & self.mark_bit != 0 {
274                        return Err(PopError::Closed);
275                    } else {
276                        return Err(PopError::Empty);
277                    }
278                }
279
280                // Loom complains if there isn't a busy-wait here.
281                #[cfg(loom)]
282                busy_wait();
283
284                head = self.head.load(Ordering::Relaxed);
285            } else {
286                // Yield because we need to wait for the stamp to get updated.
287                busy_wait();
288                head = self.head.load(Ordering::Relaxed);
289            }
290        }
291    }
292
293    /// Returns the number of items in the queue.
294    pub fn len(&self) -> usize {
295        loop {
296            // Load the tail, then load the head.
297            let tail = self.tail.load(Ordering::SeqCst);
298            let head = self.head.load(Ordering::SeqCst);
299
300            // If the tail didn't change, we've got consistent values to work with.
301            if self.tail.load(Ordering::SeqCst) == tail {
302                let hix = head & (self.mark_bit - 1);
303                let tix = tail & (self.mark_bit - 1);
304
305                return if hix < tix {
306                    tix - hix
307                } else if hix > tix {
308                    self.buffer.len() - hix + tix
309                } else if (tail & !self.mark_bit) == head {
310                    0
311                } else {
312                    self.buffer.len()
313                };
314            }
315        }
316    }
317
318    /// Returns `true` if the queue is empty.
319    pub fn is_empty(&self) -> bool {
320        let head = self.head.load(Ordering::SeqCst);
321        let tail = self.tail.load(Ordering::SeqCst);
322
323        // Is the tail equal to the head?
324        //
325        // Note: If the head changes just before we load the tail, that means there was a moment
326        // when the queue was not empty, so it is safe to just return `false`.
327        (tail & !self.mark_bit) == head
328    }
329
330    /// Returns `true` if the queue is full.
331    pub fn is_full(&self) -> bool {
332        let tail = self.tail.load(Ordering::SeqCst);
333        let head = self.head.load(Ordering::SeqCst);
334
335        // Is the head lagging one lap behind tail?
336        //
337        // Note: If the tail changes just before we load the head, that means there was a moment
338        // when the queue was not full, so it is safe to just return `false`.
339        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
340    }
341
342    /// Returns the capacity of the queue.
343    pub fn capacity(&self) -> usize {
344        self.buffer.len()
345    }
346
347    /// Closes the queue.
348    ///
349    /// Returns `true` if this call closed the queue.
350    pub fn close(&self) -> bool {
351        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
352        tail & self.mark_bit == 0
353    }
354
355    /// Returns `true` if the queue is closed.
356    pub fn is_closed(&self) -> bool {
357        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
358    }
359}
360
361impl<T> Drop for Bounded<T> {
362    fn drop(&mut self) {
363        // Get the index of the head.
364        let Self {
365            head,
366            tail,
367            buffer,
368            mark_bit,
369            ..
370        } = self;
371
372        let mark_bit = *mark_bit;
373
374        head.with_mut(|&mut head| {
375            tail.with_mut(|&mut tail| {
376                let hix = head & (mark_bit - 1);
377                let tix = tail & (mark_bit - 1);
378
379                let len = if hix < tix {
380                    tix - hix
381                } else if hix > tix {
382                    buffer.len() - hix + tix
383                } else if (tail & !mark_bit) == head {
384                    0
385                } else {
386                    buffer.len()
387                };
388
389                // Loop over all slots that hold a value and drop them.
390                for i in 0..len {
391                    // Compute the index of the next slot holding a value.
392                    let index = if hix + i < buffer.len() {
393                        hix + i
394                    } else {
395                        hix + i - buffer.len()
396                    };
397
398                    // Drop the value in the slot.
399                    let slot = &buffer[index];
400                    slot.value.with_mut(|slot| unsafe {
401                        let value = &mut *slot;
402                        value.as_mut_ptr().drop_in_place();
403                    });
404                }
405            });
406        });
407    }
408}