concurrent_queue/
single.rs
1use core::mem::MaybeUninit;
2use core::ptr;
3
4use crate::sync::atomic::{AtomicUsize, Ordering};
5use crate::sync::cell::UnsafeCell;
6#[allow(unused_imports)]
7use crate::sync::prelude::*;
8use crate::{busy_wait, ForcePushError, PopError, PushError};
9
/// State bit: a push, force-push or pop currently has exclusive access to the slot.
const LOCKED: usize = 0b001;
/// State bit: a value has been (or is in the middle of being) stored in the slot.
const PUSHED: usize = 0b010;
/// State bit: the queue has been closed. Set by `close()`; nothing in this file clears it.
const CLOSED: usize = 0b100;
13
/// A queue with room for exactly one element.
///
/// All coordination goes through `state`, a bit set built from `LOCKED`,
/// `PUSHED` and `CLOSED`. `slot` is only read or written by code that has
/// first set the `LOCKED` bit (or that holds `&mut self`, as in `Drop`).
pub struct Single<T> {
    /// Bit flags describing the queue: locked / holds a value / closed.
    state: AtomicUsize,

    /// Storage for the single element; initialized only while `PUSHED` is set.
    slot: UnsafeCell<MaybeUninit<T>>,
}
19
20impl<T> Single<T> {
21 pub fn new() -> Single<T> {
23 Single {
24 state: AtomicUsize::new(0),
25 slot: UnsafeCell::new(MaybeUninit::uninit()),
26 }
27 }
28
29 pub fn push(&self, value: T) -> Result<(), PushError<T>> {
31 let state = self
33 .state
34 .compare_exchange(0, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst)
35 .unwrap_or_else(|x| x);
36
37 if state == 0 {
38 self.slot.with_mut(|slot| unsafe {
40 slot.write(MaybeUninit::new(value));
41 });
42 self.state.fetch_and(!LOCKED, Ordering::Release);
43 Ok(())
44 } else if state & CLOSED != 0 {
45 Err(PushError::Closed(value))
46 } else {
47 Err(PushError::Full(value))
48 }
49 }
50
51 pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
53 let mut state = 0;
55
56 loop {
57 let prev = self
59 .state
60 .compare_exchange(state, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst)
61 .unwrap_or_else(|x| x);
62
63 if prev & CLOSED != 0 {
64 return Err(ForcePushError(value));
65 }
66
67 if prev == state {
68 let prev_value = if prev & PUSHED == 0 {
70 self.slot.with_mut(|slot| unsafe {
72 slot.write(MaybeUninit::new(value));
73 });
74 None
75 } else {
76 let prev_value = unsafe {
79 self.slot.with_mut(move |slot| {
80 ptr::replace(slot, MaybeUninit::new(value)).assume_init()
81 })
82 };
83 Some(prev_value)
84 };
85
86 self.state.fetch_and(!LOCKED, Ordering::Release);
88
89 return Ok(prev_value);
91 }
92
93 if prev & LOCKED == 0 {
95 state = prev;
96 } else {
97 busy_wait();
99 state = prev & !LOCKED;
100 }
101 }
102 }
103
104 pub fn pop(&self) -> Result<T, PopError> {
106 let mut state = PUSHED;
107 loop {
108 let prev = self
110 .state
111 .compare_exchange(
112 state,
113 (state | LOCKED) & !PUSHED,
114 Ordering::SeqCst,
115 Ordering::SeqCst,
116 )
117 .unwrap_or_else(|x| x);
118
119 if prev == state {
120 let value = self
122 .slot
123 .with_mut(|slot| unsafe { slot.read().assume_init() });
124 self.state.fetch_and(!LOCKED, Ordering::Release);
125 return Ok(value);
126 }
127
128 if prev & PUSHED == 0 {
129 if prev & CLOSED == 0 {
130 return Err(PopError::Empty);
131 } else {
132 return Err(PopError::Closed);
133 }
134 }
135
136 if prev & LOCKED == 0 {
137 state = prev;
138 } else {
139 busy_wait();
140 state = prev & !LOCKED;
141 }
142 }
143 }
144
145 pub fn len(&self) -> usize {
147 usize::from(self.state.load(Ordering::SeqCst) & PUSHED != 0)
148 }
149
150 pub fn is_empty(&self) -> bool {
152 self.len() == 0
153 }
154
155 pub fn is_full(&self) -> bool {
157 self.len() == 1
158 }
159
160 pub fn close(&self) -> bool {
164 let state = self.state.fetch_or(CLOSED, Ordering::SeqCst);
165 state & CLOSED == 0
166 }
167
168 pub fn is_closed(&self) -> bool {
170 self.state.load(Ordering::SeqCst) & CLOSED != 0
171 }
172}
173
174impl<T> Drop for Single<T> {
175 fn drop(&mut self) {
176 let Self { state, slot } = self;
178 state.with_mut(|state| {
179 if *state & PUSHED != 0 {
180 slot.with_mut(|slot| unsafe {
181 let value = &mut *slot;
182 value.as_mut_ptr().drop_in_place();
183 });
184 }
185 });
186 }
187}