concurrent_queue/bounded.rs
use alloc::{boxed::Box, vec::Vec};
use core::mem::MaybeUninit;

use crossbeam_utils::CachePadded;

use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, ForcePushError, PopError, PushError};

/// A slot in a queue.
struct Slot<T> {
    /// The current stamp.
    stamp: AtomicUsize,

    /// The value in this slot.
    value: UnsafeCell<MaybeUninit<T>>,
}

/// A bounded queue.
pub struct Bounded<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit in the head is always zero.
    ///
    /// Values are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit indicates that the queue is closed.
    ///
    /// Values are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    buffer: Box<[Slot<T>]>,

    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
    one_lap: usize,

    /// If this bit is set in the tail, that means the queue is closed.
    mark_bit: usize,
}

impl<T> Bounded<T> {
    /// Creates a new bounded queue.
    pub fn new(cap: usize) -> Bounded<T> {
        assert!(cap > 0, "capacity must be positive");

        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let head = 0;
        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let tail = 0;

        // Allocate a buffer of `cap` slots initialized with stamps.
        let mut buffer = Vec::with_capacity(cap);
        for i in 0..cap {
            // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
            buffer.push(Slot {
                stamp: AtomicUsize::new(i),
                value: UnsafeCell::new(MaybeUninit::uninit()),
            });
        }

        // Compute constants `mark_bit` and `one_lap`.
        let mark_bit = (cap + 1).next_power_of_two();
        let one_lap = mark_bit * 2;

        Bounded {
            buffer: buffer.into(),
            one_lap,
            mark_bit,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
        }
    }
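
    // Worked example of the stamp encoding (hypothetical capacity of 5): here
    // `mark_bit = (5 + 1).next_power_of_two() = 8` and `one_lap = 16`, so a stamp
    // of 19 (binary 1_0_011) decodes as `index = 19 & (mark_bit - 1) = 3`,
    // `mark = 19 & mark_bit = 0`, and `lap = 19 & !(one_lap - 1) = 16`, i.e. lap 1.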

    /// Attempts to push an item into the queue.
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        self.push_or_else(value, |value, tail, _, _| {
            let head = self.head.load(Ordering::Relaxed);

            // If the head lags one lap behind the tail as well...
            if head.wrapping_add(self.one_lap) == tail {
                // ...then the queue is full.
                Err(PushError::Full(value))
            } else {
                Ok(value)
            }
        })
    }
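
    // Sketch of the full-queue check in `push` above, using hypothetical numbers:
    // with capacity 5 and `one_lap = 16`, a head stamp of 3 and a tail stamp of 19
    // refer to the same slot one lap apart, so `head.wrapping_add(self.one_lap) == tail`
    // holds and the push fails with `PushError::Full`.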

    /// Pushes an item into the queue, displacing another item if needed.
    pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
        let result = self.push_or_else(value, |value, tail, new_tail, slot| {
            let head = tail.wrapping_sub(self.one_lap);
            let new_head = new_tail.wrapping_sub(self.one_lap);

            // Try to move the head.
            if self
                .head
                .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok()
            {
                // Move the tail.
                self.tail.store(new_tail, Ordering::SeqCst);

                // Swap out the old value.
                // SAFETY: We know this is initialized, since it's covered by the current queue.
                let old = unsafe {
                    slot.value
                        .with_mut(|slot| slot.replace(MaybeUninit::new(value)).assume_init())
                };

                // Update the stamp.
                slot.stamp.store(tail + 1, Ordering::Release);

                // Return a PushError.
                Err(PushError::Full(old))
            } else {
                Ok(value)
            }
        });

        match result {
            Ok(()) => Ok(None),
            Err(PushError::Full(old_value)) => Ok(Some(old_value)),
            Err(PushError::Closed(value)) => Err(ForcePushError(value)),
        }
    }
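
    // Usage sketch for `force_push` (hypothetical, capacity of 1):
    //
    //     let q = Bounded::new(1);
    //     q.force_push(1); // -> Ok(None): the slot was free
    //     q.force_push(2); // -> Ok(Some(1)): displaced the oldest item
    //     q.close();
    //     q.force_push(3); // -> Err(ForcePushError(3)): the queue is closed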

    /// Attempts to push an item into the queue, running a closure on failure.
    ///
    /// `fail` is run when there is no more room left in the tail of the queue. The parameters of
    /// this function are as follows:
    ///
    /// - The item that failed to push.
    /// - The value of `self.tail` before the new value would be inserted.
    /// - The value of `self.tail` after the new value would be inserted.
    /// - The slot that we attempted to push into.
    ///
    /// If `fail` returns `Ok(val)`, we will retry pushing `val` into the tail of the queue.
    /// Otherwise, this function will return the error.
    fn push_or_else<F>(&self, mut value: T, mut fail: F) -> Result<(), PushError<T>>
    where
        F: FnMut(T, usize, usize, &Slot<T>) -> Result<T, PushError<T>>,
    {
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Check if the queue is closed.
            if tail & self.mark_bit != 0 {
                return Err(PushError::Closed(value));
            }

            // Deconstruct the tail.
            let index = tail & (self.mark_bit - 1);
            let lap = tail & !(self.one_lap - 1);

            // Calculate the new location of the tail.
            let new_tail = if index + 1 < self.buffer.len() {
                // Same lap, incremented index.
                // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                tail + 1
            } else {
                // One lap forward, index wraps around to zero.
                // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                lap.wrapping_add(self.one_lap)
            };

            // Inspect the corresponding slot.
            let slot = &self.buffer[index];
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, we may attempt to push.
            if tail == stamp {
                // Try moving the tail.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write the value into the slot and update the stamp.
                        slot.value.with_mut(|slot| unsafe {
                            slot.write(MaybeUninit::new(value));
                        });
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        tail = t;
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                crate::full_fence();

                // We've failed to push; run our failure closure.
                value = fail(value, tail, new_tail, slot)?;

                // Loom complains if there isn't an explicit busy wait here.
                #[cfg(loom)]
                busy_wait();

                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Yield because we need to wait for the stamp to get updated.
                busy_wait();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Attempts to pop an item from the queue.
    pub fn pop(&self) -> Result<T, PopError> {
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head.
            let index = head & (self.mark_bit - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = &self.buffer[index];
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, we may attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.buffer.len() {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Read the value from the slot and update the stamp.
                        let value = slot
                            .value
                            .with_mut(|slot| unsafe { slot.read().assume_init() });
                        slot.stamp
                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Ok(value);
                    }
                    Err(h) => {
                        head = h;
                    }
                }
            } else if stamp == head {
                crate::full_fence();
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if (tail & !self.mark_bit) == head {
                    // Check if the queue is closed.
                    if tail & self.mark_bit != 0 {
                        return Err(PopError::Closed);
                    } else {
                        return Err(PopError::Empty);
                    }
                }

                // Loom complains if there isn't a busy-wait here.
                #[cfg(loom)]
                busy_wait();

                head = self.head.load(Ordering::Relaxed);
            } else {
                // Yield because we need to wait for the stamp to get updated.
                busy_wait();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }
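
    // Usage sketch for `push`/`pop` (hypothetical, capacity of 2):
    //
    //     let q = Bounded::new(2);
    //     let _ = q.push(10); // -> Ok(())
    //     let _ = q.push(20); // -> Ok(())
    //     let _ = q.push(30); // -> Err(PushError::Full(30))
    //     q.pop();            // -> Ok(10), items come out in FIFO order
    //     q.pop();            // -> Ok(20)
    //     q.pop();            // -> Err(PopError::Empty)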

    /// Returns the number of items in the queue.
    pub fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change, we've got consistent values to work with.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.mark_bit - 1);
                let tix = tail & (self.mark_bit - 1);

                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    self.buffer.len() - hix + tix
                } else if (tail & !self.mark_bit) == head {
                    0
                } else {
                    self.buffer.len()
                };
            }
        }
    }
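
    // Worked example of the wraparound branch in `len` above (hypothetical
    // capacity of 5): with `hix = 3` and `tix = 1`, the live items occupy
    // indices 3, 4 and 0, so `len = self.buffer.len() - hix + tix = 5 - 3 + 1 = 3`.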

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head?
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the queue was not empty, so it is safe to just return `false`.
        (tail & !self.mark_bit) == head
    }

    /// Returns `true` if the queue is full.
    pub fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind tail?
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the queue was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
    }

    /// Returns the capacity of the queue.
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Closes the queue.
    ///
    /// Returns `true` if this call closed the queue.
    pub fn close(&self) -> bool {
        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
        tail & self.mark_bit == 0
    }
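
    // In short: the first `close()` sees a tail without the mark bit set and
    // returns `true`; later calls return `false`. Pushes after closing fail with
    // `PushError::Closed`, while items already queued can still be popped; once
    // the queue is drained, `pop()` returns `PopError::Closed`.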

    /// Returns `true` if the queue is closed.
    pub fn is_closed(&self) -> bool {
        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
    }
}

impl<T> Drop for Bounded<T> {
    fn drop(&mut self) {
        // Get the index of the head.
        let Self {
            head,
            tail,
            buffer,
            mark_bit,
            ..
        } = self;

        let mark_bit = *mark_bit;

        head.with_mut(|&mut head| {
            tail.with_mut(|&mut tail| {
                let hix = head & (mark_bit - 1);
                let tix = tail & (mark_bit - 1);

                let len = if hix < tix {
                    tix - hix
                } else if hix > tix {
                    buffer.len() - hix + tix
                } else if (tail & !mark_bit) == head {
                    0
                } else {
                    buffer.len()
                };

                // Loop over all slots that hold a value and drop them.
                for i in 0..len {
                    // Compute the index of the next slot holding a value.
                    let index = if hix + i < buffer.len() {
                        hix + i
                    } else {
                        hix + i - buffer.len()
                    };

                    // Drop the value in the slot.
                    let slot = &buffer[index];
                    slot.value.with_mut(|slot| unsafe {
                        let value = &mut *slot;
                        value.as_mut_ptr().drop_in_place();
                    });
                }
            });
        });
    }
}