1use core::mem::MaybeUninit;
12use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
13
14use crossbeam_utils::CachePadded;
15
16use crate::{unprotected, Atomic, Guard, Owned, Shared};
17
18#[derive(Debug)]
22pub(crate) struct Queue<T> {
23 head: CachePadded<Atomic<Node<T>>>,
24 tail: CachePadded<Atomic<Node<T>>>,
25}
26
27struct Node<T> {
28 data: MaybeUninit<T>,
35
36 next: Atomic<Node<T>>,
37}
38
39unsafe impl<T: Send> Sync for Queue<T> {}
41unsafe impl<T: Send> Send for Queue<T> {}
42
43impl<T> Queue<T> {
44 pub(crate) fn new() -> Queue<T> {
46 let q = Queue {
47 head: CachePadded::new(Atomic::null()),
48 tail: CachePadded::new(Atomic::null()),
49 };
50 let sentinel = Owned::new(Node {
51 data: MaybeUninit::uninit(),
52 next: Atomic::null(),
53 });
54 unsafe {
55 let guard = unprotected();
56 let sentinel = sentinel.into_shared(guard);
57 q.head.store(sentinel, Relaxed);
58 q.tail.store(sentinel, Relaxed);
59 q
60 }
61 }
62
63 #[inline(always)]
66 fn push_internal(
67 &self,
68 onto: Shared<'_, Node<T>>,
69 new: Shared<'_, Node<T>>,
70 guard: &Guard,
71 ) -> bool {
72 let o = unsafe { onto.deref() };
74 let next = o.next.load(Acquire, guard);
75 if unsafe { next.as_ref().is_some() } {
76 let _ = self
78 .tail
79 .compare_exchange(onto, next, Release, Relaxed, guard);
80 false
81 } else {
82 let result = o
84 .next
85 .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86 .is_ok();
87 if result {
88 let _ = self
90 .tail
91 .compare_exchange(onto, new, Release, Relaxed, guard);
92 }
93 result
94 }
95 }
96
97 pub(crate) fn push(&self, t: T, guard: &Guard) {
99 let new = Owned::new(Node {
100 data: MaybeUninit::new(t),
101 next: Atomic::null(),
102 });
103 let new = Owned::into_shared(new, guard);
104
105 loop {
106 let tail = self.tail.load(Acquire, guard);
108
109 if self.push_internal(tail, new, guard) {
111 break;
112 }
113 }
114 }
115
116 #[inline(always)]
118 fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119 let head = self.head.load(Acquire, guard);
120 let h = unsafe { head.deref() };
121 let next = h.next.load(Acquire, guard);
122 match unsafe { next.as_ref() } {
123 Some(n) => unsafe {
124 self.head
125 .compare_exchange(head, next, Release, Relaxed, guard)
126 .map(|_| {
127 let tail = self.tail.load(Relaxed, guard);
128 if head == tail {
130 let _ = self
131 .tail
132 .compare_exchange(tail, next, Release, Relaxed, guard);
133 }
134 guard.defer_destroy(head);
135 Some(n.data.as_ptr().read())
137 })
138 .map_err(|_| ())
139 },
140 None => Ok(None),
141 }
142 }
143
144 #[inline(always)]
147 fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
148 where
149 T: Sync,
150 F: Fn(&T) -> bool,
151 {
152 let head = self.head.load(Acquire, guard);
153 let h = unsafe { head.deref() };
154 let next = h.next.load(Acquire, guard);
155 match unsafe { next.as_ref() } {
156 Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
157 self.head
158 .compare_exchange(head, next, Release, Relaxed, guard)
159 .map(|_| {
160 let tail = self.tail.load(Relaxed, guard);
161 if head == tail {
163 let _ = self
164 .tail
165 .compare_exchange(tail, next, Release, Relaxed, guard);
166 }
167 guard.defer_destroy(head);
168 Some(n.data.as_ptr().read())
169 })
170 .map_err(|_| ())
171 },
172 None | Some(_) => Ok(None),
173 }
174 }
175
176 pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
180 loop {
181 if let Ok(head) = self.pop_internal(guard) {
182 return head;
183 }
184 }
185 }
186
187 pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
192 where
193 T: Sync,
194 F: Fn(&T) -> bool,
195 {
196 loop {
197 if let Ok(head) = self.pop_if_internal(&condition, guard) {
198 return head;
199 }
200 }
201 }
202}
203
204impl<T> Drop for Queue<T> {
205 fn drop(&mut self) {
206 unsafe {
207 let guard = unprotected();
208
209 while self.try_pop(guard).is_some() {}
210
211 let sentinel = self.head.load(Relaxed, guard);
213 drop(sentinel.into_owned());
214 }
215 }
216}
217
#[cfg(all(test, not(crossbeam_loom)))]
mod test {
    use super::*;
    use crate::pin;
    use crossbeam_utils::thread;

    /// Test-only wrapper around `super::Queue` that pins a fresh guard for
    /// every operation, so the tests do not have to pass guards around.
    struct Queue<T> {
        queue: super::Queue<T>,
    }

    impl<T> Queue<T> {
        /// Creates an empty wrapped queue.
        pub(crate) fn new() -> Queue<T> {
            Queue {
                queue: super::Queue::new(),
            }
        }

        /// Pushes `t` onto the back of the queue.
        pub(crate) fn push(&self, t: T) {
            let guard = &pin();
            self.queue.push(t, guard);
        }

        /// Returns `true` if the sentinel currently has no successor.
        pub(crate) fn is_empty(&self) -> bool {
            let guard = &pin();
            let head = self.queue.head.load(Acquire, guard);
            // SAFETY: `head` always points at the sentinel node, which
            // `super::Queue::new` allocates and pops only replace with a
            // non-null successor.
            let h = unsafe { head.deref() };
            h.next.load(Acquire, guard).is_null()
        }

        /// Pops the front value, or returns `None` if the queue looked empty.
        pub(crate) fn try_pop(&self) -> Option<T> {
            let guard = &pin();
            self.queue.try_pop(guard)
        }

        /// Spins on `try_pop` until a value is available and returns it.
        pub(crate) fn pop(&self) -> T {
            loop {
                if let Some(t) = self.try_pop() {
                    return t;
                }
            }
        }
    }

    /// Number of elements (or pop attempts) used by the concurrent tests.
    const CONC_COUNT: i64 = 1000000;

    #[test]
    fn push_try_pop_1() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        q.push(37);
        assert!(!q.is_empty());
        assert_eq!(q.try_pop(), Some(37));
        assert!(q.is_empty());
    }

    #[test]
    fn push_try_pop_2() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        q.push(37);
        q.push(48);
        assert_eq!(q.try_pop(), Some(37));
        assert!(!q.is_empty());
        assert_eq!(q.try_pop(), Some(48));
        assert!(q.is_empty());
    }

    #[test]
    fn push_try_pop_many_seq() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        for i in 0..200 {
            q.push(i);
        }
        assert!(!q.is_empty());
        // Values must come back in insertion order.
        for i in 0..200 {
            assert_eq!(q.try_pop(), Some(i));
        }
        assert!(q.is_empty());
    }

    #[test]
    fn push_pop_1() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        q.push(37);
        assert!(!q.is_empty());
        assert_eq!(q.pop(), 37);
        assert!(q.is_empty());
    }

    #[test]
    fn push_pop_2() {
        let q: Queue<i64> = Queue::new();
        q.push(37);
        q.push(48);
        assert_eq!(q.pop(), 37);
        assert_eq!(q.pop(), 48);
    }

    #[test]
    fn push_pop_many_seq() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        for i in 0..200 {
            q.push(i);
        }
        assert!(!q.is_empty());
        // Values must come back in insertion order.
        for i in 0..200 {
            assert_eq!(q.pop(), i);
        }
        assert!(q.is_empty());
    }

    /// One producer (the scope's own thread) and one consumer: the consumer
    /// must observe `0..CONC_COUNT` in order.
    #[test]
    fn push_try_pop_many_spsc() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());

        thread::scope(|scope| {
            scope.spawn(|_| {
                let mut next = 0;

                while next < CONC_COUNT {
                    if let Some(elem) = q.try_pop() {
                        assert_eq!(elem, next);
                        next += 1;
                    }
                }
            });

            for i in 0..CONC_COUNT {
                q.push(i);
            }
        })
        .unwrap();
    }

    /// One producer and three consumers: each consumer must see strictly
    /// increasing values.
    #[test]
    fn push_try_pop_many_spmc() {
        /// Makes up to `CONC_COUNT` pop attempts, stopping early once the last
        /// pushed value has been seen.
        fn recv(_t: i32, q: &Queue<i64>) {
            let mut cur = -1;
            for _i in 0..CONC_COUNT {
                if let Some(elem) = q.try_pop() {
                    assert!(elem > cur);
                    cur = elem;

                    if cur == CONC_COUNT - 1 {
                        break;
                    }
                }
            }
        }

        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        thread::scope(|scope| {
            for i in 0..3 {
                let q = &q;
                scope.spawn(move |_| recv(i, q));
            }

            scope.spawn(|_| {
                for i in 0..CONC_COUNT {
                    q.push(i);
                }
            });
        })
        .unwrap();
    }

    /// Several producers and consumers: every consumer checks that the values
    /// it received from each side arrived in sorted order.
    #[test]
    fn push_try_pop_many_mpmc() {
        enum LR {
            Left(i64),
            Right(i64),
        }

        let q: Queue<LR> = Queue::new();
        assert!(q.is_empty());

        thread::scope(|scope| {
            for _t in 0..2 {
                // NOTE(review): the range `CONC_COUNT - 1..CONC_COUNT` yields a
                // single value, so each producer pushes exactly one element.
                // Widening it would let the two producers of the same side
                // interleave and break the sortedness assertions below.
                scope.spawn(|_| {
                    for i in CONC_COUNT - 1..CONC_COUNT {
                        q.push(LR::Left(i));
                    }
                });
                scope.spawn(|_| {
                    for i in CONC_COUNT - 1..CONC_COUNT {
                        q.push(LR::Right(i));
                    }
                });
                scope.spawn(|_| {
                    let mut vl = vec![];
                    let mut vr = vec![];
                    for _i in 0..CONC_COUNT {
                        match q.try_pop() {
                            Some(LR::Left(x)) => vl.push(x),
                            Some(LR::Right(x)) => vr.push(x),
                            _ => {}
                        }
                    }

                    // Compare each received sequence with its sorted copy.
                    let mut vl2 = vl.clone();
                    let mut vr2 = vr.clone();
                    vl2.sort();
                    vr2.sort();

                    assert_eq!(vl, vl2);
                    assert_eq!(vr, vr2);
                });
            }
        })
        .unwrap();
    }

    /// Same as `push_try_pop_many_spsc`, but the consumer uses the spinning
    /// `pop` helper.
    #[test]
    fn push_pop_many_spsc() {
        let q: Queue<i64> = Queue::new();

        thread::scope(|scope| {
            scope.spawn(|_| {
                let mut next = 0;
                while next < CONC_COUNT {
                    assert_eq!(q.pop(), next);
                    next += 1;
                }
            });

            for i in 0..CONC_COUNT {
                q.push(i);
            }
        })
        .unwrap();
        assert!(q.is_empty());
    }

    /// `is_empty` must not consume elements.
    #[test]
    fn is_empty_dont_pop() {
        let q: Queue<i64> = Queue::new();
        q.push(20);
        q.push(20);
        assert!(!q.is_empty());
        assert!(!q.is_empty());
        assert!(q.try_pop().is_some());
    }
}