crossbeam_utils/sync/parker.rs
1use crate::primitive::sync::atomic::AtomicUsize;
2use crate::primitive::sync::{Arc, Condvar, Mutex};
3use core::sync::atomic::Ordering::SeqCst;
4use std::fmt;
5use std::marker::PhantomData;
6use std::time::{Duration, Instant};
7
8/// A thread parking primitive.
9///
10/// Conceptually, each `Parker` has an associated token which is initially not present:
11///
12/// * The [`park`] method blocks the current thread unless or until the token is available, at
13/// which point it automatically consumes the token.
14///
15/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
16/// a specified maximum time.
17///
18/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
19/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
20/// returning immediately.
21///
22/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
23/// [`park`] and [`unpark`].
24///
25/// # Examples
26///
27/// ```
28/// use std::thread;
29/// use std::time::Duration;
30/// use crossbeam_utils::sync::Parker;
31///
32/// let p = Parker::new();
33/// let u = p.unparker().clone();
34///
35/// // Make the token available.
36/// u.unpark();
37/// // Wakes up immediately and consumes the token.
38/// p.park();
39///
40/// thread::spawn(move || {
41/// thread::sleep(Duration::from_millis(500));
42/// u.unpark();
43/// });
44///
45/// // Wakes up when `u.unpark()` provides the token.
46/// p.park();
47/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
48/// ```
49///
50/// [`park`]: Parker::park
51/// [`park_timeout`]: Parker::park_timeout
52/// [`park_deadline`]: Parker::park_deadline
53/// [`unpark`]: Unparker::unpark
54pub struct Parker {
55 unparker: Unparker,
56 _marker: PhantomData<*const ()>,
57}
58
59unsafe impl Send for Parker {}
60
61impl Default for Parker {
62 fn default() -> Self {
63 Self {
64 unparker: Unparker {
65 inner: Arc::new(Inner {
66 state: AtomicUsize::new(EMPTY),
67 lock: Mutex::new(()),
68 cvar: Condvar::new(),
69 }),
70 },
71 _marker: PhantomData,
72 }
73 }
74}
75
76impl Parker {
77 /// Creates a new `Parker`.
78 ///
79 /// # Examples
80 ///
81 /// ```
82 /// use crossbeam_utils::sync::Parker;
83 ///
84 /// let p = Parker::new();
85 /// ```
86 ///
87 pub fn new() -> Parker {
88 Self::default()
89 }
90
91 /// Blocks the current thread until the token is made available.
92 ///
93 /// # Examples
94 ///
95 /// ```
96 /// use crossbeam_utils::sync::Parker;
97 ///
98 /// let p = Parker::new();
99 /// let u = p.unparker().clone();
100 ///
101 /// // Make the token available.
102 /// u.unpark();
103 ///
104 /// // Wakes up immediately and consumes the token.
105 /// p.park();
106 /// ```
107 pub fn park(&self) {
108 self.unparker.inner.park(None);
109 }
110
111 /// Blocks the current thread until the token is made available, but only for a limited time.
112 ///
113 /// # Examples
114 ///
115 /// ```
116 /// use std::time::Duration;
117 /// use crossbeam_utils::sync::Parker;
118 ///
119 /// let p = Parker::new();
120 ///
121 /// // Waits for the token to become available, but will not wait longer than 500 ms.
122 /// p.park_timeout(Duration::from_millis(500));
123 /// ```
124 pub fn park_timeout(&self, timeout: Duration) {
125 self.park_deadline(Instant::now() + timeout)
126 }
127
128 /// Blocks the current thread until the token is made available, or until a certain deadline.
129 ///
130 /// # Examples
131 ///
132 /// ```
133 /// use std::time::{Duration, Instant};
134 /// use crossbeam_utils::sync::Parker;
135 ///
136 /// let p = Parker::new();
137 /// let deadline = Instant::now() + Duration::from_millis(500);
138 ///
139 /// // Waits for the token to become available, but will not wait longer than 500 ms.
140 /// p.park_deadline(deadline);
141 /// ```
142 pub fn park_deadline(&self, deadline: Instant) {
143 self.unparker.inner.park(Some(deadline))
144 }
145
146 /// Returns a reference to an associated [`Unparker`].
147 ///
148 /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
149 ///
150 /// # Examples
151 ///
152 /// ```
153 /// use crossbeam_utils::sync::Parker;
154 ///
155 /// let p = Parker::new();
156 /// let u = p.unparker().clone();
157 ///
158 /// // Make the token available.
159 /// u.unpark();
160 /// // Wakes up immediately and consumes the token.
161 /// p.park();
162 /// ```
163 ///
164 /// [`park`]: Parker::park
165 /// [`park_timeout`]: Parker::park_timeout
166 pub fn unparker(&self) -> &Unparker {
167 &self.unparker
168 }
169
170 /// Converts a `Parker` into a raw pointer.
171 ///
172 /// # Examples
173 ///
174 /// ```
175 /// use crossbeam_utils::sync::Parker;
176 ///
177 /// let p = Parker::new();
178 /// let raw = Parker::into_raw(p);
179 /// # let _ = unsafe { Parker::from_raw(raw) };
180 /// ```
181 pub fn into_raw(this: Parker) -> *const () {
182 Unparker::into_raw(this.unparker)
183 }
184
185 /// Converts a raw pointer into a `Parker`.
186 ///
187 /// # Safety
188 ///
189 /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
190 ///
191 /// # Examples
192 ///
193 /// ```
194 /// use crossbeam_utils::sync::Parker;
195 ///
196 /// let p = Parker::new();
197 /// let raw = Parker::into_raw(p);
198 /// let p = unsafe { Parker::from_raw(raw) };
199 /// ```
200 pub unsafe fn from_raw(ptr: *const ()) -> Parker {
201 Parker {
202 unparker: Unparker::from_raw(ptr),
203 _marker: PhantomData,
204 }
205 }
206}
207
208impl fmt::Debug for Parker {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 f.pad("Parker { .. }")
211 }
212}
213
214/// Unparks a thread parked by the associated [`Parker`].
215pub struct Unparker {
216 inner: Arc<Inner>,
217}
218
219unsafe impl Send for Unparker {}
220unsafe impl Sync for Unparker {}
221
222impl Unparker {
223 /// Atomically makes the token available if it is not already.
224 ///
225 /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
226 /// any.
227 ///
228 /// # Examples
229 ///
230 /// ```
231 /// use std::thread;
232 /// use std::time::Duration;
233 /// use crossbeam_utils::sync::Parker;
234 ///
235 /// let p = Parker::new();
236 /// let u = p.unparker().clone();
237 ///
238 /// thread::spawn(move || {
239 /// thread::sleep(Duration::from_millis(500));
240 /// u.unpark();
241 /// });
242 ///
243 /// // Wakes up when `u.unpark()` provides the token.
244 /// p.park();
245 /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
246 /// ```
247 ///
248 /// [`park`]: Parker::park
249 /// [`park_timeout`]: Parker::park_timeout
250 pub fn unpark(&self) {
251 self.inner.unpark()
252 }
253
254 /// Converts an `Unparker` into a raw pointer.
255 ///
256 /// # Examples
257 ///
258 /// ```
259 /// use crossbeam_utils::sync::{Parker, Unparker};
260 ///
261 /// let p = Parker::new();
262 /// let u = p.unparker().clone();
263 /// let raw = Unparker::into_raw(u);
264 /// # let _ = unsafe { Unparker::from_raw(raw) };
265 /// ```
266 pub fn into_raw(this: Unparker) -> *const () {
267 Arc::into_raw(this.inner).cast::<()>()
268 }
269
270 /// Converts a raw pointer into an `Unparker`.
271 ///
272 /// # Safety
273 ///
274 /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
275 ///
276 /// # Examples
277 ///
278 /// ```
279 /// use crossbeam_utils::sync::{Parker, Unparker};
280 ///
281 /// let p = Parker::new();
282 /// let u = p.unparker().clone();
283 ///
284 /// let raw = Unparker::into_raw(u);
285 /// let u = unsafe { Unparker::from_raw(raw) };
286 /// ```
287 pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
288 Unparker {
289 inner: Arc::from_raw(ptr.cast::<Inner>()),
290 }
291 }
292}
293
294impl fmt::Debug for Unparker {
295 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296 f.pad("Unparker { .. }")
297 }
298}
299
300impl Clone for Unparker {
301 fn clone(&self) -> Unparker {
302 Unparker {
303 inner: self.inner.clone(),
304 }
305 }
306}
307
308const EMPTY: usize = 0;
309const PARKED: usize = 1;
310const NOTIFIED: usize = 2;
311
312struct Inner {
313 state: AtomicUsize,
314 lock: Mutex<()>,
315 cvar: Condvar,
316}
317
318impl Inner {
319 fn park(&self, deadline: Option<Instant>) {
320 // If we were previously notified then we consume this notification and return quickly.
321 if self
322 .state
323 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
324 .is_ok()
325 {
326 return;
327 }
328
329 // If the timeout is zero, then there is no need to actually block.
330 if let Some(deadline) = deadline {
331 if deadline <= Instant::now() {
332 return;
333 }
334 }
335
336 // Otherwise we need to coordinate going to sleep.
337 let mut m = self.lock.lock().unwrap();
338
339 match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
340 Ok(_) => {}
341 // Consume this notification to avoid spurious wakeups in the next park.
342 Err(NOTIFIED) => {
343 // We must read `state` here, even though we know it will be `NOTIFIED`. This is
344 // because `unpark` may have been called again since we read `NOTIFIED` in the
345 // `compare_exchange` above. We must perform an acquire operation that synchronizes
346 // with that `unpark` to observe any writes it made before the call to `unpark`. To
347 // do that we must read from the write it made to `state`.
348 let old = self.state.swap(EMPTY, SeqCst);
349 assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
350 return;
351 }
352 Err(n) => panic!("inconsistent park_timeout state: {}", n),
353 }
354
355 loop {
356 // Block the current thread on the conditional variable.
357 m = match deadline {
358 None => self.cvar.wait(m).unwrap(),
359 Some(deadline) => {
360 let now = Instant::now();
361 if now < deadline {
362 // We could check for a timeout here, in the return value of wait_timeout,
363 // but in the case that a timeout and an unpark arrive simultaneously, we
364 // prefer to report the former.
365 self.cvar.wait_timeout(m, deadline - now).unwrap().0
366 } else {
367 // We've timed out; swap out the state back to empty on our way out
368 match self.state.swap(EMPTY, SeqCst) {
369 NOTIFIED | PARKED => return,
370 n => panic!("inconsistent park_timeout state: {}", n),
371 };
372 }
373 }
374 };
375
376 if self
377 .state
378 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
379 .is_ok()
380 {
381 // got a notification
382 return;
383 }
384
385 // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
386 // in the branch above, when we discover the deadline is in the past
387 }
388 }
389
390 pub(crate) fn unpark(&self) {
391 // To ensure the unparked thread will observe any writes we made before this call, we must
392 // perform a release operation that `park` can synchronize with. To do that we must write
393 // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
394 // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
395 match self.state.swap(NOTIFIED, SeqCst) {
396 EMPTY => return, // no one was waiting
397 NOTIFIED => return, // already unparked
398 PARKED => {} // gotta go wake someone up
399 _ => panic!("inconsistent state in unpark"),
400 }
401
402 // There is a period between when the parked thread sets `state` to `PARKED` (or last
403 // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
404 // If we were to notify during this period it would be ignored and then when the parked
405 // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
406 // stage so we can acquire `lock` to wait until it is ready to receive the notification.
407 //
408 // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
409 // it doesn't get woken only to have to wait for us to release `lock`.
410 drop(self.lock.lock().unwrap());
411 self.cvar.notify_one();
412 }
413}