parking/
lib.rs

1//! Thread parking and unparking.
2//!
3//! A parker is in either notified or unnotified state. Method [`park()`][`Parker::park()`] blocks
4//! the current thread until the parker becomes notified and then puts it back into unnotified
5//! state. Method [`unpark()`][`Unparker::unpark()`] puts it into notified state.
6//!
7//! # Examples
8//!
9//! ```
10//! use std::thread;
11//! use std::time::Duration;
12//! use parking::Parker;
13//!
14//! let p = Parker::new();
15//! let u = p.unparker();
16//!
17//! // Notify the parker.
18//! u.unpark();
19//!
20//! // Wakes up immediately because the parker is notified.
21//! p.park();
22//!
23//! thread::spawn(move || {
24//!     thread::sleep(Duration::from_millis(500));
25//!     u.unpark();
26//! });
27//!
28//! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
29//! p.park();
30//! ```
31
32#![forbid(unsafe_code)]
33#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
34
35use std::cell::Cell;
36use std::fmt;
37use std::marker::PhantomData;
38use std::sync::atomic::AtomicUsize;
39use std::sync::atomic::Ordering::SeqCst;
40use std::sync::{Arc, Condvar, Mutex};
41use std::time::{Duration, Instant};
42
43/// Creates a parker and an associated unparker.
44///
45/// # Examples
46///
47/// ```
48/// let (p, u) = parking::pair();
49/// ```
50pub fn pair() -> (Parker, Unparker) {
51    let p = Parker::new();
52    let u = p.unparker();
53    (p, u)
54}
55
56/// Waits for a notification.
57pub struct Parker {
58    unparker: Unparker,
59    _marker: PhantomData<Cell<()>>,
60}
61
62impl Parker {
63    /// Creates a new parker.
64    ///
65    /// # Examples
66    ///
67    /// ```
68    /// use parking::Parker;
69    ///
70    /// let p = Parker::new();
71    /// ```
72    ///
73    pub fn new() -> Parker {
74        Parker {
75            unparker: Unparker {
76                inner: Arc::new(Inner {
77                    state: AtomicUsize::new(EMPTY),
78                    lock: Mutex::new(()),
79                    cvar: Condvar::new(),
80                }),
81            },
82            _marker: PhantomData,
83        }
84    }
85
86    /// Blocks until notified and then goes back into unnotified state.
87    ///
88    /// # Examples
89    ///
90    /// ```
91    /// use parking::Parker;
92    ///
93    /// let p = Parker::new();
94    /// let u = p.unparker();
95    ///
96    /// // Notify the parker.
97    /// u.unpark();
98    ///
99    /// // Wakes up immediately because the parker is notified.
100    /// p.park();
101    /// ```
102    pub fn park(&self) {
103        self.unparker.inner.park(None);
104    }
105
106    /// Blocks until notified and then goes back into unnotified state, or times out after
107    /// `duration`.
108    ///
109    /// Returns `true` if notified before the timeout.
110    ///
111    /// # Examples
112    ///
113    /// ```
114    /// use std::time::Duration;
115    /// use parking::Parker;
116    ///
117    /// let p = Parker::new();
118    ///
119    /// // Wait for a notification, or time out after 500 ms.
120    /// p.park_timeout(Duration::from_millis(500));
121    /// ```
122    pub fn park_timeout(&self, duration: Duration) -> bool {
123        self.unparker.inner.park(Some(duration))
124    }
125
126    /// Blocks until notified and then goes back into unnotified state, or times out at `instant`.
127    ///
128    /// Returns `true` if notified before the deadline.
129    ///
130    /// # Examples
131    ///
132    /// ```
133    /// use std::time::{Duration, Instant};
134    /// use parking::Parker;
135    ///
136    /// let p = Parker::new();
137    ///
138    /// // Wait for a notification, or time out after 500 ms.
139    /// p.park_deadline(Instant::now() + Duration::from_millis(500));
140    /// ```
141    pub fn park_deadline(&self, instant: Instant) -> bool {
142        self.unparker
143            .inner
144            .park(Some(instant.saturating_duration_since(Instant::now())))
145    }
146
147    /// Notifies the parker.
148    ///
149    /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
150    /// was already notified.
151    ///
152    /// # Examples
153    ///
154    /// ```
155    /// use std::thread;
156    /// use std::time::Duration;
157    /// use parking::Parker;
158    ///
159    /// let p = Parker::new();
160    ///
161    /// assert_eq!(p.unpark(), true);
162    /// assert_eq!(p.unpark(), false);
163    ///
164    /// // Wakes up immediately.
165    /// p.park();
166    /// ```
167    pub fn unpark(&self) -> bool {
168        self.unparker.unpark()
169    }
170
171    /// Returns a handle for unparking.
172    ///
173    /// The returned [`Unparker`] can be cloned and shared among threads.
174    ///
175    /// # Examples
176    ///
177    /// ```
178    /// use parking::Parker;
179    ///
180    /// let p = Parker::new();
181    /// let u = p.unparker();
182    ///
183    /// // Notify the parker.
184    /// u.unpark();
185    ///
186    /// // Wakes up immediately because the parker is notified.
187    /// p.park();
188    /// ```
189    pub fn unparker(&self) -> Unparker {
190        self.unparker.clone()
191    }
192}
193
194impl Default for Parker {
195    fn default() -> Parker {
196        Parker::new()
197    }
198}
199
200impl fmt::Debug for Parker {
201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202        f.pad("Parker { .. }")
203    }
204}
205
206/// Notifies a parker.
207pub struct Unparker {
208    inner: Arc<Inner>,
209}
210
211impl Unparker {
212    /// Notifies the associated parker.
213    ///
214    /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
215    /// was already notified.
216    ///
217    /// # Examples
218    ///
219    /// ```
220    /// use std::thread;
221    /// use std::time::Duration;
222    /// use parking::Parker;
223    ///
224    /// let p = Parker::new();
225    /// let u = p.unparker();
226    ///
227    /// thread::spawn(move || {
228    ///     thread::sleep(Duration::from_millis(500));
229    ///     u.unpark();
230    /// });
231    ///
232    /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
233    /// p.park();
234    /// ```
235    pub fn unpark(&self) -> bool {
236        self.inner.unpark()
237    }
238}
239
240impl fmt::Debug for Unparker {
241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242        f.pad("Unparker { .. }")
243    }
244}
245
246impl Clone for Unparker {
247    fn clone(&self) -> Unparker {
248        Unparker {
249            inner: self.inner.clone(),
250        }
251    }
252}
253
254const EMPTY: usize = 0;
255const PARKED: usize = 1;
256const NOTIFIED: usize = 2;
257
258struct Inner {
259    state: AtomicUsize,
260    lock: Mutex<()>,
261    cvar: Condvar,
262}
263
264impl Inner {
265    fn park(&self, timeout: Option<Duration>) -> bool {
266        // If we were previously notified then we consume this notification and return quickly.
267        if self
268            .state
269            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
270            .is_ok()
271        {
272            return true;
273        }
274
275        // If the timeout is zero, then there is no need to actually block.
276        if let Some(dur) = timeout {
277            if dur == Duration::from_millis(0) {
278                return false;
279            }
280        }
281
282        // Otherwise we need to coordinate going to sleep.
283        let mut m = self.lock.lock().unwrap();
284
285        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
286            Ok(_) => {}
287            // Consume this notification to avoid spurious wakeups in the next park.
288            Err(NOTIFIED) => {
289                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
290                // because `unpark` may have been called again since we read `NOTIFIED` in the
291                // `compare_exchange` above. We must perform an acquire operation that synchronizes
292                // with that `unpark` to observe any writes it made before the call to `unpark`. To
293                // do that we must read from the write it made to `state`.
294                let old = self.state.swap(EMPTY, SeqCst);
295                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
296                return true;
297            }
298            Err(n) => panic!("inconsistent park_timeout state: {}", n),
299        }
300
301        match timeout {
302            None => {
303                loop {
304                    // Block the current thread on the conditional variable.
305                    m = self.cvar.wait(m).unwrap();
306
307                    if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
308                        // got a notification
309                        return true;
310                    }
311                }
312            }
313            Some(timeout) => {
314                // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
315                // notification we just want to unconditionally set `state` back to `EMPTY`, either
316                // consuming a notification or un-flagging ourselves as parked.
317                let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();
318
319                match self.state.swap(EMPTY, SeqCst) {
320                    NOTIFIED => true, // got a notification
321                    PARKED => false,  // no notification
322                    n => panic!("inconsistent park_timeout state: {}", n),
323                }
324            }
325        }
326    }
327
328    pub fn unpark(&self) -> bool {
329        // To ensure the unparked thread will observe any writes we made before this call, we must
330        // perform a release operation that `park` can synchronize with. To do that we must write
331        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
332        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
333        match self.state.swap(NOTIFIED, SeqCst) {
334            EMPTY => return true,     // no one was waiting
335            NOTIFIED => return false, // already unparked
336            PARKED => {}              // gotta go wake someone up
337            _ => panic!("inconsistent state in unpark"),
338        }
339
340        // There is a period between when the parked thread sets `state` to `PARKED` (or last
341        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
342        // If we were to notify during this period it would be ignored and then when the parked
343        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
344        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
345        //
346        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
347        // it doesn't get woken only to have to wait for us to release `lock`.
348        drop(self.lock.lock().unwrap());
349        self.cvar.notify_one();
350        true
351    }
352}