parking/lib.rs
1//! Thread parking and unparking.
2//!
3//! A parker is in either notified or unnotified state. Method [`park()`][`Parker::park()`] blocks
4//! the current thread until the parker becomes notified and then puts it back into unnotified
5//! state. Method [`unpark()`][`Unparker::unpark()`] puts it into notified state.
6//!
7//! # Examples
8//!
9//! ```
10//! use std::thread;
11//! use std::time::Duration;
12//! use parking::Parker;
13//!
14//! let p = Parker::new();
15//! let u = p.unparker();
16//!
17//! // Notify the parker.
18//! u.unpark();
19//!
20//! // Wakes up immediately because the parker is notified.
21//! p.park();
22//!
23//! thread::spawn(move || {
24//! thread::sleep(Duration::from_millis(500));
25//! u.unpark();
26//! });
27//!
28//! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
29//! p.park();
30//! ```
31
32#![forbid(unsafe_code)]
33#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
34
35use std::cell::Cell;
36use std::fmt;
37use std::marker::PhantomData;
38use std::sync::atomic::AtomicUsize;
39use std::sync::atomic::Ordering::SeqCst;
40use std::sync::{Arc, Condvar, Mutex};
41use std::time::{Duration, Instant};
42
43/// Creates a parker and an associated unparker.
44///
45/// # Examples
46///
47/// ```
48/// let (p, u) = parking::pair();
49/// ```
50pub fn pair() -> (Parker, Unparker) {
51 let p = Parker::new();
52 let u = p.unparker();
53 (p, u)
54}
55
56/// Waits for a notification.
57pub struct Parker {
58 unparker: Unparker,
59 _marker: PhantomData<Cell<()>>,
60}
61
62impl Parker {
63 /// Creates a new parker.
64 ///
65 /// # Examples
66 ///
67 /// ```
68 /// use parking::Parker;
69 ///
70 /// let p = Parker::new();
71 /// ```
72 ///
73 pub fn new() -> Parker {
74 Parker {
75 unparker: Unparker {
76 inner: Arc::new(Inner {
77 state: AtomicUsize::new(EMPTY),
78 lock: Mutex::new(()),
79 cvar: Condvar::new(),
80 }),
81 },
82 _marker: PhantomData,
83 }
84 }
85
86 /// Blocks until notified and then goes back into unnotified state.
87 ///
88 /// # Examples
89 ///
90 /// ```
91 /// use parking::Parker;
92 ///
93 /// let p = Parker::new();
94 /// let u = p.unparker();
95 ///
96 /// // Notify the parker.
97 /// u.unpark();
98 ///
99 /// // Wakes up immediately because the parker is notified.
100 /// p.park();
101 /// ```
102 pub fn park(&self) {
103 self.unparker.inner.park(None);
104 }
105
106 /// Blocks until notified and then goes back into unnotified state, or times out after
107 /// `duration`.
108 ///
109 /// Returns `true` if notified before the timeout.
110 ///
111 /// # Examples
112 ///
113 /// ```
114 /// use std::time::Duration;
115 /// use parking::Parker;
116 ///
117 /// let p = Parker::new();
118 ///
119 /// // Wait for a notification, or time out after 500 ms.
120 /// p.park_timeout(Duration::from_millis(500));
121 /// ```
122 pub fn park_timeout(&self, duration: Duration) -> bool {
123 self.unparker.inner.park(Some(duration))
124 }
125
126 /// Blocks until notified and then goes back into unnotified state, or times out at `instant`.
127 ///
128 /// Returns `true` if notified before the deadline.
129 ///
130 /// # Examples
131 ///
132 /// ```
133 /// use std::time::{Duration, Instant};
134 /// use parking::Parker;
135 ///
136 /// let p = Parker::new();
137 ///
138 /// // Wait for a notification, or time out after 500 ms.
139 /// p.park_deadline(Instant::now() + Duration::from_millis(500));
140 /// ```
141 pub fn park_deadline(&self, instant: Instant) -> bool {
142 self.unparker
143 .inner
144 .park(Some(instant.saturating_duration_since(Instant::now())))
145 }
146
147 /// Notifies the parker.
148 ///
149 /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
150 /// was already notified.
151 ///
152 /// # Examples
153 ///
154 /// ```
155 /// use std::thread;
156 /// use std::time::Duration;
157 /// use parking::Parker;
158 ///
159 /// let p = Parker::new();
160 ///
161 /// assert_eq!(p.unpark(), true);
162 /// assert_eq!(p.unpark(), false);
163 ///
164 /// // Wakes up immediately.
165 /// p.park();
166 /// ```
167 pub fn unpark(&self) -> bool {
168 self.unparker.unpark()
169 }
170
171 /// Returns a handle for unparking.
172 ///
173 /// The returned [`Unparker`] can be cloned and shared among threads.
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// use parking::Parker;
179 ///
180 /// let p = Parker::new();
181 /// let u = p.unparker();
182 ///
183 /// // Notify the parker.
184 /// u.unpark();
185 ///
186 /// // Wakes up immediately because the parker is notified.
187 /// p.park();
188 /// ```
189 pub fn unparker(&self) -> Unparker {
190 self.unparker.clone()
191 }
192}
193
194impl Default for Parker {
195 fn default() -> Parker {
196 Parker::new()
197 }
198}
199
200impl fmt::Debug for Parker {
201 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202 f.pad("Parker { .. }")
203 }
204}
205
206/// Notifies a parker.
207pub struct Unparker {
208 inner: Arc<Inner>,
209}
210
211impl Unparker {
212 /// Notifies the associated parker.
213 ///
214 /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
215 /// was already notified.
216 ///
217 /// # Examples
218 ///
219 /// ```
220 /// use std::thread;
221 /// use std::time::Duration;
222 /// use parking::Parker;
223 ///
224 /// let p = Parker::new();
225 /// let u = p.unparker();
226 ///
227 /// thread::spawn(move || {
228 /// thread::sleep(Duration::from_millis(500));
229 /// u.unpark();
230 /// });
231 ///
232 /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
233 /// p.park();
234 /// ```
235 pub fn unpark(&self) -> bool {
236 self.inner.unpark()
237 }
238}
239
240impl fmt::Debug for Unparker {
241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242 f.pad("Unparker { .. }")
243 }
244}
245
246impl Clone for Unparker {
247 fn clone(&self) -> Unparker {
248 Unparker {
249 inner: self.inner.clone(),
250 }
251 }
252}
253
/// No notification is pending and no thread has flagged itself as parked.
const EMPTY: usize = 0;
/// A thread has flagged itself as parked and is waiting (or about to wait) on the condvar.
const PARKED: usize = 1;
/// A notification has been delivered and not yet consumed by `park`.
const NOTIFIED: usize = 2;

/// State shared by a parker and every unparker handle cloned from it.
struct Inner {
    /// Always one of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Held by the parking thread from before it flags `PARKED` until it waits on `cvar`, so
    /// that `unpark` can tell when the sleeper is ready to receive a wakeup.
    lock: Mutex<()>,
    /// Signalled by `unpark` when it finds `state` set to `PARKED`.
    cvar: Condvar,
}

impl Inner {
    /// Blocks until a notification arrives and consumes it, or until `timeout` elapses.
    ///
    /// `None` waits indefinitely. Returns `true` if a notification was consumed and `false` if
    /// the timeout elapsed first. A zero timeout never blocks. Spurious condition-variable
    /// wakeups are absorbed: a timed wait only reports `false` once the full timeout has
    /// really passed.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds an unexpected value or if `lock` is poisoned.
    fn park(&self, timeout: Option<Duration>) -> bool {
        // If we were previously notified then we consume this notification and return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return true;
        }

        // If the timeout is zero, then there is no need to actually block.
        if let Some(dur) = timeout {
            if dur == Duration::from_millis(0) {
                return false;
            }
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return true;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        match timeout {
            None => loop {
                // Block the current thread on the condition variable.
                m = self.cvar.wait(m).unwrap();

                if self
                    .state
                    .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                    .is_ok()
                {
                    // Got a notification.
                    return true;
                }
                // Otherwise this was a spurious wakeup: go back to sleep.
            },
            Some(timeout) => {
                // Fix the deadline once so that spurious wakeups neither cut the wait short nor
                // extend it. `None` means the deadline is not representable as an `Instant`; in
                // that case every wait simply uses the full (enormous) timeout again.
                let deadline = Instant::now().checked_add(timeout);
                let mut remaining = timeout;

                loop {
                    let (guard, _timed_out) = self.cvar.wait_timeout(m, remaining).unwrap();
                    m = guard;

                    if self
                        .state
                        .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                        .is_ok()
                    {
                        // Got a notification.
                        return true;
                    }

                    // Not notified: either the wait timed out or the wakeup was spurious.
                    remaining = match deadline {
                        Some(deadline) => deadline.saturating_duration_since(Instant::now()),
                        None => timeout,
                    };

                    if remaining == Duration::from_millis(0) {
                        // The deadline has passed. Unconditionally set `state` back to `EMPTY`,
                        // either consuming a notification that raced with the check above or
                        // un-flagging ourselves as parked.
                        return match self.state.swap(EMPTY, SeqCst) {
                            NOTIFIED => true, // got a notification
                            PARKED => false,  // no notification
                            n => panic!("inconsistent park_timeout state: {}", n),
                        };
                    }
                }
            }
        }
    }

    /// Delivers a notification, waking the parked thread if there is one.
    ///
    /// Returns `true` if this call moved the state into `NOTIFIED`, or `false` if a
    /// notification was already pending.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds an unexpected value or if `lock` is poisoned.
    pub fn unpark(&self) -> bool {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return true,     // no one was waiting
            NOTIFIED => return false, // already unparked
            PARKED => {}              // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
        true
    }
}