crossbeam_utils/sync/
parker.rs

1use crate::primitive::sync::atomic::AtomicUsize;
2use crate::primitive::sync::{Arc, Condvar, Mutex};
3use core::sync::atomic::Ordering::SeqCst;
4use std::fmt;
5use std::marker::PhantomData;
6use std::time::{Duration, Instant};
7
/// A thread parking primitive.
///
/// Conceptually, each `Parker` has an associated token which is initially not present:
///
/// * The [`park`] method blocks the current thread unless or until the token is available, at
///   which point it automatically consumes the token.
///
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
///   a specified maximum time.
///
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
///   token is initially absent, [`unpark`] followed by [`park`] will result in the second call
///   returning immediately.
///
/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
/// [`park`] and [`unpark`].
///
/// A `Parker` owns a single [`Unparker`] handle, available through [`Parker::unparker`]. Clones
/// of that handle all refer to the same token. A `Parker` can be moved to another thread
/// (it is `Send`), but it is not `Sync`.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_utils::sync::Parker;
///
/// let p = Parker::new();
/// let u = p.unparker().clone();
///
/// // Make the token available.
/// u.unpark();
/// // Wakes up immediately and consumes the token.
/// p.park();
///
/// thread::spawn(move || {
///     thread::sleep(Duration::from_millis(500));
///     u.unpark();
/// });
///
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
/// [`park_deadline`]: Parker::park_deadline
/// [`unpark`]: Unparker::unpark
pub struct Parker {
    /// Handle to the shared state that the `park*` methods block on.
    unparker: Unparker,
    /// Raw-pointer marker: opts `Parker` out of the automatic `Send`/`Sync` implementations.
    _marker: PhantomData<*const ()>,
}
58
// The `PhantomData<*const ()>` marker in `Parker` holds no data but removes the automatic
// `Send`/`Sync` implementations. `Send` is restored here and `Sync` is left unimplemented, so a
// `Parker` can be moved to another thread but cannot be shared between threads by reference.
unsafe impl Send for Parker {}
60
61impl Default for Parker {
62    fn default() -> Self {
63        Self {
64            unparker: Unparker {
65                inner: Arc::new(Inner {
66                    state: AtomicUsize::new(EMPTY),
67                    lock: Mutex::new(()),
68                    cvar: Condvar::new(),
69                }),
70            },
71            _marker: PhantomData,
72        }
73    }
74}
75
76impl Parker {
77    /// Creates a new `Parker`.
78    ///
79    /// # Examples
80    ///
81    /// ```
82    /// use crossbeam_utils::sync::Parker;
83    ///
84    /// let p = Parker::new();
85    /// ```
86    ///
87    pub fn new() -> Parker {
88        Self::default()
89    }
90
91    /// Blocks the current thread until the token is made available.
92    ///
93    /// # Examples
94    ///
95    /// ```
96    /// use crossbeam_utils::sync::Parker;
97    ///
98    /// let p = Parker::new();
99    /// let u = p.unparker().clone();
100    ///
101    /// // Make the token available.
102    /// u.unpark();
103    ///
104    /// // Wakes up immediately and consumes the token.
105    /// p.park();
106    /// ```
107    pub fn park(&self) {
108        self.unparker.inner.park(None);
109    }
110
111    /// Blocks the current thread until the token is made available, but only for a limited time.
112    ///
113    /// # Examples
114    ///
115    /// ```
116    /// use std::time::Duration;
117    /// use crossbeam_utils::sync::Parker;
118    ///
119    /// let p = Parker::new();
120    ///
121    /// // Waits for the token to become available, but will not wait longer than 500 ms.
122    /// p.park_timeout(Duration::from_millis(500));
123    /// ```
124    pub fn park_timeout(&self, timeout: Duration) {
125        self.park_deadline(Instant::now() + timeout)
126    }
127
128    /// Blocks the current thread until the token is made available, or until a certain deadline.
129    ///
130    /// # Examples
131    ///
132    /// ```
133    /// use std::time::{Duration, Instant};
134    /// use crossbeam_utils::sync::Parker;
135    ///
136    /// let p = Parker::new();
137    /// let deadline = Instant::now() + Duration::from_millis(500);
138    ///
139    /// // Waits for the token to become available, but will not wait longer than 500 ms.
140    /// p.park_deadline(deadline);
141    /// ```
142    pub fn park_deadline(&self, deadline: Instant) {
143        self.unparker.inner.park(Some(deadline))
144    }
145
146    /// Returns a reference to an associated [`Unparker`].
147    ///
148    /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
149    ///
150    /// # Examples
151    ///
152    /// ```
153    /// use crossbeam_utils::sync::Parker;
154    ///
155    /// let p = Parker::new();
156    /// let u = p.unparker().clone();
157    ///
158    /// // Make the token available.
159    /// u.unpark();
160    /// // Wakes up immediately and consumes the token.
161    /// p.park();
162    /// ```
163    ///
164    /// [`park`]: Parker::park
165    /// [`park_timeout`]: Parker::park_timeout
166    pub fn unparker(&self) -> &Unparker {
167        &self.unparker
168    }
169
170    /// Converts a `Parker` into a raw pointer.
171    ///
172    /// # Examples
173    ///
174    /// ```
175    /// use crossbeam_utils::sync::Parker;
176    ///
177    /// let p = Parker::new();
178    /// let raw = Parker::into_raw(p);
179    /// # let _ = unsafe { Parker::from_raw(raw) };
180    /// ```
181    pub fn into_raw(this: Parker) -> *const () {
182        Unparker::into_raw(this.unparker)
183    }
184
185    /// Converts a raw pointer into a `Parker`.
186    ///
187    /// # Safety
188    ///
189    /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// use crossbeam_utils::sync::Parker;
195    ///
196    /// let p = Parker::new();
197    /// let raw = Parker::into_raw(p);
198    /// let p = unsafe { Parker::from_raw(raw) };
199    /// ```
200    pub unsafe fn from_raw(ptr: *const ()) -> Parker {
201        Parker {
202            unparker: Unparker::from_raw(ptr),
203            _marker: PhantomData,
204        }
205    }
206}
207
208impl fmt::Debug for Parker {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        f.pad("Parker { .. }")
211    }
212}
213
/// Unparks a thread parked by the associated [`Parker`].
///
/// An `Unparker` is obtained from [`Parker::unparker`] and can be cloned. Every clone refers to
/// the same shared state, so calling [`unpark`] on any of them makes the token available to
/// that `Parker`.
///
/// [`unpark`]: Unparker::unpark
pub struct Unparker {
    /// State shared with the owning `Parker` and with every clone of this handle.
    inner: Arc<Inner>,
}
218
// `Unparker` holds nothing but an `Arc<Inner>`, and `Inner` consists of an atomic, a mutex and a
// condition variable.
// NOTE(review): with the standard-library versions of those types the auto traits would already
// apply; presumably these impls are spelled out so they hold for whatever `crate::primitive`
// provides - confirm.
unsafe impl Send for Unparker {}
unsafe impl Sync for Unparker {}
221
222impl Unparker {
223    /// Atomically makes the token available if it is not already.
224    ///
225    /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
226    /// any.
227    ///
228    /// # Examples
229    ///
230    /// ```
231    /// use std::thread;
232    /// use std::time::Duration;
233    /// use crossbeam_utils::sync::Parker;
234    ///
235    /// let p = Parker::new();
236    /// let u = p.unparker().clone();
237    ///
238    /// thread::spawn(move || {
239    ///     thread::sleep(Duration::from_millis(500));
240    ///     u.unpark();
241    /// });
242    ///
243    /// // Wakes up when `u.unpark()` provides the token.
244    /// p.park();
245    /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
246    /// ```
247    ///
248    /// [`park`]: Parker::park
249    /// [`park_timeout`]: Parker::park_timeout
250    pub fn unpark(&self) {
251        self.inner.unpark()
252    }
253
254    /// Converts an `Unparker` into a raw pointer.
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// use crossbeam_utils::sync::{Parker, Unparker};
260    ///
261    /// let p = Parker::new();
262    /// let u = p.unparker().clone();
263    /// let raw = Unparker::into_raw(u);
264    /// # let _ = unsafe { Unparker::from_raw(raw) };
265    /// ```
266    pub fn into_raw(this: Unparker) -> *const () {
267        Arc::into_raw(this.inner).cast::<()>()
268    }
269
270    /// Converts a raw pointer into an `Unparker`.
271    ///
272    /// # Safety
273    ///
274    /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
275    ///
276    /// # Examples
277    ///
278    /// ```
279    /// use crossbeam_utils::sync::{Parker, Unparker};
280    ///
281    /// let p = Parker::new();
282    /// let u = p.unparker().clone();
283    ///
284    /// let raw = Unparker::into_raw(u);
285    /// let u = unsafe { Unparker::from_raw(raw) };
286    /// ```
287    pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
288        Unparker {
289            inner: Arc::from_raw(ptr.cast::<Inner>()),
290        }
291    }
292}
293
294impl fmt::Debug for Unparker {
295    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296        f.pad("Unparker { .. }")
297    }
298}
299
300impl Clone for Unparker {
301    fn clone(&self) -> Unparker {
302        Unparker {
303            inner: self.inner.clone(),
304        }
305    }
306}
307
// Values stored in `Inner::state`.

/// No token is available; this is the state a new `Parker` starts in.
const EMPTY: usize = 0;
/// Set by `park` under the lock, just before it starts waiting on the condition variable.
const PARKED: usize = 1;
/// Set by `unpark`: the token is available. `park` consumes it and resets the state to `EMPTY`.
const NOTIFIED: usize = 2;
311
/// State shared between a `Parker` and all of its `Unparker` handles.
struct Inner {
    /// One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Held by `park` while it switches to `PARKED` and waits on `cvar`; `unpark` briefly
    /// acquires it before notifying.
    lock: Mutex<()>,
    /// Condition variable the parked thread waits on.
    cvar: Condvar,
}
317
impl Inner {
    /// Blocks the calling thread until a notification is consumed or `deadline` passes.
    ///
    /// `deadline == None` means there is no time limit. A pending notification is consumed
    /// without blocking, and a deadline that has already passed returns without blocking.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned or if `state` holds a value that the protocol does not allow
    /// at that point.
    fn park(&self, deadline: Option<Instant>) {
        // If we were previously notified then we consume this notification and return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return;
        }

        // If the timeout is zero, then there is no need to actually block.
        if let Some(deadline) = deadline {
            if deadline <= Instant::now() {
                return;
            }
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        loop {
            // Block the current thread on the conditional variable.
            m = match deadline {
                None => self.cvar.wait(m).unwrap(),
                Some(deadline) => {
                    let now = Instant::now();
                    if now < deadline {
                        // We could check for a timeout here, in the return value of wait_timeout,
                        // but in the case that a timeout and an unpark arrive simultaneously, we
                        // prefer to report the former.
                        self.cvar.wait_timeout(m, deadline - now).unwrap().0
                    } else {
                        // We've timed out; swap out the state back to empty on our way out
                        match self.state.swap(EMPTY, SeqCst) {
                            NOTIFIED | PARKED => return,
                            n => panic!("inconsistent park_timeout state: {}", n),
                        };
                    }
                }
            };

            if self
                .state
                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                .is_ok()
            {
                // got a notification
                return;
            }

            // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
            // in the branch above, when we discover the deadline is in the past
        }
    }

    /// Sets `state` to `NOTIFIED` and, if a thread was `PARKED`, wakes it through `cvar`.
    ///
    /// # Panics
    ///
    /// Panics if the previous `state` is not one of `EMPTY`, `NOTIFIED` or `PARKED`, or if `lock`
    /// is poisoned.
    pub(crate) fn unpark(&self) {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return,    // no one was waiting
            NOTIFIED => return, // already unparked
            PARKED => {}        // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
    }
}