use alloc::boxed::Box;
use core::mem::MaybeUninit;
use core::ptr;

use crossbeam_utils::CachePadded;

use crate::const_fn;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};

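// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.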
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

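// Each block covers one "lap" of indices.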
const LAP: usize = 32;
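// The maximum number of values a block can hold.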
const BLOCK_CAP: usize = LAP - 1;
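// How many lower bits are reserved for metadata.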
const SHIFT: usize = 1;
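// The mark bit has two purposes:
// * If set in the tail index, the queue is closed.
// * If set in the head index, the block at head is not the last one.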
const MARK_BIT: usize = 1;

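/// A slot in a block.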
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot: a combination of `WRITE`, `READ`, and `DESTROY`.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    // An uninitialized slot, used as the initializer for block arrays.
    #[cfg(not(loom))]
    const UNINIT: Slot<T> = Slot {
        value: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    #[cfg(not(loom))]
    fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
        [Self::UNINIT; BLOCK_CAP]
    }

    // Loom's `UnsafeCell` and atomics cannot be constructed in a `const`, so
    // under loom the array is built by repeating the initializer expression.
    #[cfg(loom)]
    fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
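        // Repeat this expression 31 times.
        // Update if the value of BLOCK_CAP changes.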
        macro_rules! repeat_31 {
            ($e: expr) => {
                [
                    $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
                    $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
                ]
            };
        }

        repeat_31!(Slot {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicUsize::new(0),
        })
    }

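    /// Waits until a value is written into the slot.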
    fn wait_write(&self) {
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            busy_wait();
        }
    }
}

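/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.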
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
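    /// Creates an empty block.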
    fn new() -> Block<T> {
        Block {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: Slot::uninit_block(),
        }
    }

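    /// Waits until the next pointer is set.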
    fn wait_next(&self) -> *mut Block<T> {
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            busy_wait();
        }
    }

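    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.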
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, so it is now safe to destroy it.
        drop(Box::from_raw(this));
    }
}

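/// A position in a queue.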
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

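/// An unbounded queue.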
pub struct Unbounded<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,
}

impl<T> Unbounded<T> {
    const_fn!(
        const_if: #[cfg(not(loom))];
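        /// Creates a new unbounded queue.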
        pub const fn new() -> Unbounded<T> {
            Unbounded {
                head: CachePadded::new(Position {
                    block: AtomicPtr::new(ptr::null_mut()),
                    index: AtomicUsize::new(0),
                }),
                tail: CachePadded::new(Position {
                    block: AtomicPtr::new(ptr::null_mut()),
                    index: AtomicUsize::new(0),
                }),
            }
        }
    );

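    /// Pushes an item into the queue.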
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the queue is closed.
            if tail & MARK_BIT != 0 {
                return Err(PushError::Closed(value));
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                busy_wait();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order
            // to make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block and
            // install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    // Another thread installed the first block; keep our allocation for later.
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot and mark it as written.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.with_mut(|slot| {
                        slot.write(MaybeUninit::new(value));
                    });
                    slot.state.fetch_or(WRITE, Ordering::Release);
                    return Ok(());
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

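    /// Pops an item from the queue.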
    pub fn pop(&self) -> Result<T, PopError> {
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                busy_wait();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                crate::full_fence();
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the head index equals the tail index, the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // Check if the queue is closed.
                    if tail & MARK_BIT != 0 {
                        return Err(PopError::Closed);
                    } else {
                        return Err(PopError::Empty);
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first push operation is in progress.
            if block.is_null() {
                busy_wait();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.with_mut(|slot| slot.read().assume_init());

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Ok(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                }
            }
        }
    }

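    /// Returns the number of items in the queue.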
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }

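    /// Returns `true` if the queue is empty.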
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

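    /// Returns `true` if the queue is full.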
    pub fn is_full(&self) -> bool {
        // An unbounded queue is never full.
        false
    }

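    /// Closes the queue.
    ///
    /// Returns `true` if this call closed the queue.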
    pub fn close(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
        tail & MARK_BIT == 0
    }

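    /// Returns `true` if the queue is closed.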
    pub fn is_closed(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }
}

impl<T> Drop for Unbounded<T> {
    fn drop(&mut self) {
        let Self { head, tail } = self;
        let Position { index: head, block } = &mut **head;

        head.with_mut(|&mut mut head| {
            tail.index.with_mut(|&mut mut tail| {
                // Erase the lower bits.
                head &= !((1 << SHIFT) - 1);
                tail &= !((1 << SHIFT) - 1);

                unsafe {
                    // Drop all values between `head` and `tail` and deallocate the
                    // heap-allocated blocks.
                    while head != tail {
                        let offset = (head >> SHIFT) % LAP;

                        if offset < BLOCK_CAP {
                            // Drop the value in the slot.
                            block.with_mut(|block| {
                                let slot = (**block).slots.get_unchecked(offset);
                                slot.value.with_mut(|slot| {
                                    let value = &mut *slot;
                                    value.as_mut_ptr().drop_in_place();
                                });
                            });
                        } else {
                            // Deallocate the block and move to the next one.
                            block.with_mut(|block| {
                                let next_block = (**block).next.with_mut(|next| *next);
                                drop(Box::from_raw(*block));
                                *block = next_block;
                            });
                        }

                        head = head.wrapping_add(1 << SHIFT);
                    }

                    // Deallocate the last remaining block.
                    block.with_mut(|block| {
                        if !block.is_null() {
                            drop(Box::from_raw(*block));
                        }
                    });
                }
            });
        });
    }
}