concurrent_queue/
single.rs

1use core::mem::MaybeUninit;
2use core::ptr;
3
4use crate::sync::atomic::{AtomicUsize, Ordering};
5use crate::sync::cell::UnsafeCell;
6#[allow(unused_imports)]
7use crate::sync::prelude::*;
8use crate::{busy_wait, ForcePushError, PopError, PushError};
9
10const LOCKED: usize = 1 << 0;
11const PUSHED: usize = 1 << 1;
12const CLOSED: usize = 1 << 2;
13
14/// A single-element queue.
15pub struct Single<T> {
16    state: AtomicUsize,
17    slot: UnsafeCell<MaybeUninit<T>>,
18}
19
20impl<T> Single<T> {
21    /// Creates a new single-element queue.
22    pub fn new() -> Single<T> {
23        Single {
24            state: AtomicUsize::new(0),
25            slot: UnsafeCell::new(MaybeUninit::uninit()),
26        }
27    }
28
29    /// Attempts to push an item into the queue.
30    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
31        // Lock and fill the slot.
32        let state = self
33            .state
34            .compare_exchange(0, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst)
35            .unwrap_or_else(|x| x);
36
37        if state == 0 {
38            // Write the value and unlock.
39            self.slot.with_mut(|slot| unsafe {
40                slot.write(MaybeUninit::new(value));
41            });
42            self.state.fetch_and(!LOCKED, Ordering::Release);
43            Ok(())
44        } else if state & CLOSED != 0 {
45            Err(PushError::Closed(value))
46        } else {
47            Err(PushError::Full(value))
48        }
49    }
50
51    /// Attempts to push an item into the queue, displacing another if necessary.
52    pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
53        // Attempt to lock the slot.
54        let mut state = 0;
55
56        loop {
57            // Lock the slot.
58            let prev = self
59                .state
60                .compare_exchange(state, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst)
61                .unwrap_or_else(|x| x);
62
63            if prev & CLOSED != 0 {
64                return Err(ForcePushError(value));
65            }
66
67            if prev == state {
68                // If the value was pushed, swap out the value.
69                let prev_value = if prev & PUSHED == 0 {
70                    // SAFETY: write is safe because we have locked the state.
71                    self.slot.with_mut(|slot| unsafe {
72                        slot.write(MaybeUninit::new(value));
73                    });
74                    None
75                } else {
76                    // SAFETY: replace is safe because we have locked the state, and
77                    // assume_init is safe because we have checked that the value was pushed.
78                    let prev_value = unsafe {
79                        self.slot.with_mut(move |slot| {
80                            ptr::replace(slot, MaybeUninit::new(value)).assume_init()
81                        })
82                    };
83                    Some(prev_value)
84                };
85
86                // We can unlock the slot now.
87                self.state.fetch_and(!LOCKED, Ordering::Release);
88
89                // Return the old value.
90                return Ok(prev_value);
91            }
92
93            // Try to go for the current (pushed) state.
94            if prev & LOCKED == 0 {
95                state = prev;
96            } else {
97                // State is locked.
98                busy_wait();
99                state = prev & !LOCKED;
100            }
101        }
102    }
103
104    /// Attempts to pop an item from the queue.
105    pub fn pop(&self) -> Result<T, PopError> {
106        let mut state = PUSHED;
107        loop {
108            // Lock and empty the slot.
109            let prev = self
110                .state
111                .compare_exchange(
112                    state,
113                    (state | LOCKED) & !PUSHED,
114                    Ordering::SeqCst,
115                    Ordering::SeqCst,
116                )
117                .unwrap_or_else(|x| x);
118
119            if prev == state {
120                // Read the value and unlock.
121                let value = self
122                    .slot
123                    .with_mut(|slot| unsafe { slot.read().assume_init() });
124                self.state.fetch_and(!LOCKED, Ordering::Release);
125                return Ok(value);
126            }
127
128            if prev & PUSHED == 0 {
129                if prev & CLOSED == 0 {
130                    return Err(PopError::Empty);
131                } else {
132                    return Err(PopError::Closed);
133                }
134            }
135
136            if prev & LOCKED == 0 {
137                state = prev;
138            } else {
139                busy_wait();
140                state = prev & !LOCKED;
141            }
142        }
143    }
144
145    /// Returns the number of items in the queue.
146    pub fn len(&self) -> usize {
147        usize::from(self.state.load(Ordering::SeqCst) & PUSHED != 0)
148    }
149
150    /// Returns `true` if the queue is empty.
151    pub fn is_empty(&self) -> bool {
152        self.len() == 0
153    }
154
155    /// Returns `true` if the queue is full.
156    pub fn is_full(&self) -> bool {
157        self.len() == 1
158    }
159
160    /// Closes the queue.
161    ///
162    /// Returns `true` if this call closed the queue.
163    pub fn close(&self) -> bool {
164        let state = self.state.fetch_or(CLOSED, Ordering::SeqCst);
165        state & CLOSED == 0
166    }
167
168    /// Returns `true` if the queue is closed.
169    pub fn is_closed(&self) -> bool {
170        self.state.load(Ordering::SeqCst) & CLOSED != 0
171    }
172}
173
174impl<T> Drop for Single<T> {
175    fn drop(&mut self) {
176        // Drop the value in the slot.
177        let Self { state, slot } = self;
178        state.with_mut(|state| {
179            if *state & PUSHED != 0 {
180                slot.with_mut(|slot| unsafe {
181                    let value = &mut *slot;
182                    value.as_mut_ptr().drop_in_place();
183                });
184            }
185        });
186    }
187}