tokio/sync/notify.rs
1// Allow `unreachable_pub` warnings when sync is not enabled
2// due to the usage of `Notify` within the `rt` feature set.
3// When this module is compiled with `sync` enabled we will warn on
4// this lint. When `rt` is enabled we use `pub(crate)` which
5// triggers this warning but it is safe to ignore in this case.
6#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8use crate::loom::cell::UnsafeCell;
9use crate::loom::sync::atomic::AtomicUsize;
10use crate::loom::sync::Mutex;
11use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12use crate::util::WakeList;
13
14use std::future::Future;
15use std::marker::PhantomPinned;
16use std::panic::{RefUnwindSafe, UnwindSafe};
17use std::pin::Pin;
18use std::ptr::NonNull;
19use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20use std::task::{Context, Poll, Waker};
21
/// Intrusive list of `Waiter`s; this is the list type stored behind the
/// `waiters` mutex of a `Notify`.
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// Guarded counterpart of `WaitList`, obtained through `into_guarded` and held
/// by `NotifyWaitersList` while `notify_waiters` drains the waiters.
type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24
25/// Notifies a single task to wake up.
26///
27/// `Notify` provides a basic mechanism to notify a single task of an event.
28/// `Notify` itself does not carry any data. Instead, it is to be used to signal
29/// another task to perform an operation.
30///
31/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
32/// [`notified().await`] method waits for a permit to become available, and
33/// [`notify_one()`] sets a permit **if there currently are no available
34/// permits**.
35///
36/// The synchronization details of `Notify` are similar to
37/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
38/// value contains a single permit. [`notified().await`] waits for the permit to
39/// be made available, consumes the permit, and resumes. [`notify_one()`] sets
40/// the permit, waking a pending task if there is one.
41///
42/// If `notify_one()` is called **before** `notified().await`, then the next
43/// call to `notified().await` will complete immediately, consuming the permit.
44/// Any subsequent calls to `notified().await` will wait for a new permit.
45///
46/// If `notify_one()` is called **multiple** times before `notified().await`,
47/// only a **single** permit is stored. The next call to `notified().await` will
48/// complete immediately, but the one after will wait for a new permit.
49///
50/// # Examples
51///
52/// Basic usage.
53///
54/// ```
55/// use tokio::sync::Notify;
56/// use std::sync::Arc;
57///
58/// #[tokio::main]
59/// async fn main() {
60/// let notify = Arc::new(Notify::new());
61/// let notify2 = notify.clone();
62///
63/// let handle = tokio::spawn(async move {
64/// notify2.notified().await;
65/// println!("received notification");
66/// });
67///
68/// println!("sending notification");
69/// notify.notify_one();
70///
71/// // Wait for task to receive notification.
72/// handle.await.unwrap();
73/// }
74/// ```
75///
/// Unbounded multi-producer single-consumer (mpsc) channel.
77///
78/// No wakeups can be lost when using this channel because the call to
79/// `notify_one()` will store a permit in the `Notify`, which the following call
80/// to `notified()` will consume.
81///
82/// ```
83/// use tokio::sync::Notify;
84///
85/// use std::collections::VecDeque;
86/// use std::sync::Mutex;
87///
88/// struct Channel<T> {
89/// values: Mutex<VecDeque<T>>,
90/// notify: Notify,
91/// }
92///
93/// impl<T> Channel<T> {
94/// pub fn send(&self, value: T) {
95/// self.values.lock().unwrap()
96/// .push_back(value);
97///
98/// // Notify the consumer a value is available
99/// self.notify.notify_one();
100/// }
101///
102/// // This is a single-consumer channel, so several concurrent calls to
103/// // `recv` are not allowed.
104/// pub async fn recv(&self) -> T {
105/// loop {
106/// // Drain values
107/// if let Some(value) = self.values.lock().unwrap().pop_front() {
108/// return value;
109/// }
110///
111/// // Wait for values to be available
112/// self.notify.notified().await;
113/// }
114/// }
115/// }
116/// ```
117///
/// Unbounded multi-producer multi-consumer (mpmc) channel.
119///
120/// The call to [`enable`] is important because otherwise if you have two
121/// calls to `recv` and two calls to `send` in parallel, the following could
122/// happen:
123///
124/// 1. Both calls to `try_recv` return `None`.
125/// 2. Both new elements are added to the vector.
126/// 3. The `notify_one` method is called twice, adding only a single
127/// permit to the `Notify`.
128/// 4. Both calls to `recv` reach the `Notified` future. One of them
129/// consumes the permit, and the other sleeps forever.
130///
131/// By adding the `Notified` futures to the list by calling `enable` before
132/// `try_recv`, the `notify_one` calls in step three would remove the
133/// futures from the list and mark them notified instead of adding a permit
134/// to the `Notify`. This ensures that both futures are woken.
135///
136/// Notice that this failure can only happen if there are two concurrent calls
137/// to `recv`. This is why the mpsc example above does not require a call to
138/// `enable`.
139///
140/// ```
141/// use tokio::sync::Notify;
142///
143/// use std::collections::VecDeque;
144/// use std::sync::Mutex;
145///
146/// struct Channel<T> {
147/// messages: Mutex<VecDeque<T>>,
148/// notify_on_sent: Notify,
149/// }
150///
151/// impl<T> Channel<T> {
152/// pub fn send(&self, msg: T) {
153/// let mut locked_queue = self.messages.lock().unwrap();
154/// locked_queue.push_back(msg);
155/// drop(locked_queue);
156///
157/// // Send a notification to one of the calls currently
158/// // waiting in a call to `recv`.
159/// self.notify_on_sent.notify_one();
160/// }
161///
162/// pub fn try_recv(&self) -> Option<T> {
163/// let mut locked_queue = self.messages.lock().unwrap();
164/// locked_queue.pop_front()
165/// }
166///
167/// pub async fn recv(&self) -> T {
168/// let future = self.notify_on_sent.notified();
169/// tokio::pin!(future);
170///
171/// loop {
172/// // Make sure that no wakeup is lost if we get
173/// // `None` from `try_recv`.
174/// future.as_mut().enable();
175///
176/// if let Some(msg) = self.try_recv() {
177/// return msg;
178/// }
179///
180/// // Wait for a call to `notify_one`.
181/// //
182/// // This uses `.as_mut()` to avoid consuming the future,
183/// // which lets us call `Pin::set` below.
184/// future.as_mut().await;
185///
186/// // Reset the future in case another call to
187/// // `try_recv` got the message before us.
188/// future.set(self.notify_on_sent.notified());
189/// }
190/// }
191/// }
192/// ```
193///
194/// [park]: std::thread::park
195/// [unpark]: std::thread::Thread::unpark
196/// [`notified().await`]: Notify::notified()
197/// [`notify_one()`]: Notify::notify_one()
198/// [`enable`]: Notified::enable()
199/// [`Semaphore`]: crate::sync::Semaphore
200#[derive(Debug)]
pub struct Notify {
    // Packed state word. The low 2 bits store one of `EMPTY`,
    // `WAITING` or `NOTIFIED`. The rest of the bits
    // are used to store the number of times `notify_waiters`
    // was called.
    //
    // Throughout the code there are two assumptions:
    // - state can be transitioned *from* `WAITING` only if
    //   `waiters` lock is held
    // - number of times `notify_waiters` was called can
    //   be modified only if `waiters` lock is held
    state: AtomicUsize,
    // Intrusive list of the waiters registered by `Notified` futures.
    // Every access goes through this mutex.
    waiters: Mutex<WaitList>,
}
215
216#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Waiting task's waker. Depending on the value of `notification`,
    /// this field is either protected by the `waiters` lock in
    /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
    waker: UnsafeCell<Option<Waker>>,

    /// Notification for this waiter. Two bits record whether and how the
    /// waiter was notified, one further bit records whether it was woken
    /// using the FIFO or the LIFO strategy, and the remaining bits are unused.
    /// * if it's `None`, then `waker` is protected by the `waiters` lock.
    /// * if it's `Some`, then `waker` is exclusively owned by the
    ///   enclosing `Waiter` and can be accessed without locking.
    notification: AtomicNotification,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
237
238impl Waiter {
239 fn new() -> Waiter {
240 Waiter {
241 pointers: linked_list::Pointers::new(),
242 waker: UnsafeCell::new(None),
243 notification: AtomicNotification::none(),
244 _p: PhantomPinned,
245 }
246 }
247}
248
// Generates `Waiter::addr_of_pointers`, which takes a `NonNull<Waiter>` and
// yields a `NonNull` to that waiter's `pointers` field.
// NOTE(review): presumably the macro projects the field without materialising
// a `&Waiter` — confirm against the definition of `generate_addr_of_methods!`.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
256
// Encodings stored inside `AtomicNotification`.

// No notification.
const NOTIFICATION_NONE: usize = 0b000;

// Notification type used by `notify_one`.
const NOTIFICATION_ONE: usize = 0b001;

// Notification type used by `notify_last`. Shares its two low bits with
// `NOTIFICATION_ONE`; bit 2 is additionally set to mark the LIFO strategy.
const NOTIFICATION_LAST: usize = 0b101;

// Notification type used by `notify_waiters`.
const NOTIFICATION_ALL: usize = 0b010;
268
/// Notification for a `Waiter`.
/// This struct is equivalent to `Option<Notification>`, but uses
/// `AtomicUsize` inside for atomic operations. The stored value is always
/// one of the `NOTIFICATION_*` constants.
#[derive(Debug)]
struct AtomicNotification(AtomicUsize);
274
275impl AtomicNotification {
276 fn none() -> Self {
277 AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
278 }
279
280 /// Store-release a notification.
281 /// This method should be called exactly once.
282 fn store_release(&self, notification: Notification) {
283 let data: usize = match notification {
284 Notification::All => NOTIFICATION_ALL,
285 Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE,
286 Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST,
287 };
288 self.0.store(data, Release);
289 }
290
291 fn load(&self, ordering: Ordering) -> Option<Notification> {
292 let data = self.0.load(ordering);
293 match data {
294 NOTIFICATION_NONE => None,
295 NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)),
296 NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)),
297 NOTIFICATION_ALL => Some(Notification::All),
298 _ => unreachable!(),
299 }
300 }
301
302 /// Clears the notification.
303 /// This method is used by a `Notified` future to consume the
304 /// notification. It uses relaxed ordering and should be only
305 /// used once the atomic notification is no longer shared.
306 fn clear(&self) {
307 self.0.store(NOTIFICATION_NONE, Relaxed);
308 }
309}
310
311#[derive(Debug, PartialEq, Eq)]
312#[repr(usize)]
enum NotifyOneStrategy {
    /// Strategy passed by `notify_one`.
    Fifo,
    /// Strategy passed by `notify_last`.
    Lifo,
}
317
318#[derive(Debug, PartialEq, Eq)]
319#[repr(usize)]
enum Notification {
    /// A single waiter was notified, using the given dequeue strategy.
    One(NotifyOneStrategy),
    /// The waiter was notified by `notify_waiters`.
    All,
}
324
325/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
326/// and gates the access to it on `notify.waiters` mutex. It also empties
327/// the list on drop.
struct NotifyWaitersList<'a> {
    /// Waiters moved out of `notify.waiters`, converted into a guarded list.
    list: GuardedWaitList,
    /// Set once `pop_back_locked` has seen the list run empty, so that the
    /// destructor can skip taking the lock.
    is_empty: bool,
    /// The `Notify` whose `waiters` mutex gates access to `list`.
    notify: &'a Notify,
}
333
334impl<'a> NotifyWaitersList<'a> {
335 fn new(
336 unguarded_list: WaitList,
337 guard: Pin<&'a Waiter>,
338 notify: &'a Notify,
339 ) -> NotifyWaitersList<'a> {
340 let guard_ptr = NonNull::from(guard.get_ref());
341 let list = unguarded_list.into_guarded(guard_ptr);
342 NotifyWaitersList {
343 list,
344 is_empty: false,
345 notify,
346 }
347 }
348
349 /// Removes the last element from the guarded list. Modifying this list
350 /// requires an exclusive access to the main list in `Notify`.
351 fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
352 let result = self.list.pop_back();
353 if result.is_none() {
354 // Save information about emptiness to avoid waiting for lock
355 // in the destructor.
356 self.is_empty = true;
357 }
358 result
359 }
360}
361
impl Drop for NotifyWaitersList<'_> {
    /// Unlinks every waiter still on the guarded list, marking each one as
    /// notified by `notify_waiters`.
    fn drop(&mut self) {
        // If the list is not empty, we unlink all waiters from it.
        // We do not wake the waiters to avoid double panics.
        if !self.is_empty {
            // The guarded list may only be modified while `notify.waiters` is
            // locked; hold the lock for the whole drain.
            let _lock_guard = self.notify.waiters.lock();
            while let Some(waiter) = self.list.pop_back() {
                // Safety: we never make mutable references to waiters.
                let waiter = unsafe { waiter.as_ref() };
                // The waiter is unlinked now; publish the notification so its
                // `Notified` future can observe it.
                waiter.notification.store_release(Notification::All);
            }
        }
    }
}
376
377/// Future returned from [`Notify::notified()`].
378///
379/// This future is fused, so once it has completed, any future calls to poll
380/// will immediately return `Poll::Ready`.
381#[derive(Debug)]
382pub struct Notified<'a> {
383 /// The `Notify` being received on.
384 notify: &'a Notify,
385
386 /// The current state of the receiving process.
387 state: State,
388
389 /// Number of calls to `notify_waiters` at the time of creation.
390 notify_waiters_calls: usize,
391
392 /// Entry in the waiter `LinkedList`.
393 waiter: Waiter,
394}
395
396unsafe impl<'a> Send for Notified<'a> {}
397unsafe impl<'a> Sync for Notified<'a> {}
398
399#[derive(Debug)]
enum State {
    /// Initial state given to the future by `Notify::notified`.
    Init,
    /// The future's waiter has been pushed onto the `Notify`'s waiter list.
    Waiting,
    /// The future has received its notification and is complete.
    Done,
}
405
/// Number of low bits of the state word reserved for the
/// `EMPTY` / `WAITING` / `NOTIFIED` state.
const NOTIFY_WAITERS_SHIFT: usize = 2;
/// Mask selecting the state bits of the state word.
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
/// Mask selecting the `notify_waiters` call counter, i.e. every bit above the
/// state bits.
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification.
const NOTIFIED: usize = 2;
418
419fn set_state(data: usize, state: usize) -> usize {
420 (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
421}
422
423fn get_state(data: usize) -> usize {
424 data & STATE_MASK
425}
426
427fn get_num_notify_waiters_calls(data: usize) -> usize {
428 (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
429}
430
431fn inc_num_notify_waiters_calls(data: usize) -> usize {
432 data + (1 << NOTIFY_WAITERS_SHIFT)
433}
434
435fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
436 data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
437}
438
439impl Notify {
440 /// Create a new `Notify`, initialized without a permit.
441 ///
442 /// # Examples
443 ///
444 /// ```
445 /// use tokio::sync::Notify;
446 ///
447 /// let notify = Notify::new();
448 /// ```
449 pub fn new() -> Notify {
450 Notify {
451 state: AtomicUsize::new(0),
452 waiters: Mutex::new(LinkedList::new()),
453 }
454 }
455
456 /// Create a new `Notify`, initialized without a permit.
457 ///
458 /// When using the `tracing` [unstable feature], a `Notify` created with
459 /// `const_new` will not be instrumented. As such, it will not be visible
460 /// in [`tokio-console`]. Instead, [`Notify::new`] should be used to create
461 /// an instrumented object if that is needed.
462 ///
463 /// # Examples
464 ///
465 /// ```
466 /// use tokio::sync::Notify;
467 ///
468 /// static NOTIFY: Notify = Notify::const_new();
469 /// ```
470 ///
471 /// [`tokio-console`]: https://github.com/tokio-rs/console
472 /// [unstable feature]: crate#unstable-features
473 #[cfg(not(all(loom, test)))]
474 pub const fn const_new() -> Notify {
475 Notify {
476 state: AtomicUsize::new(0),
477 waiters: Mutex::const_new(LinkedList::new()),
478 }
479 }
480
481 /// Wait for a notification.
482 ///
483 /// Equivalent to:
484 ///
485 /// ```ignore
486 /// async fn notified(&self);
487 /// ```
488 ///
489 /// Each `Notify` value holds a single permit. If a permit is available from
490 /// an earlier call to [`notify_one()`], then `notified().await` will complete
491 /// immediately, consuming that permit. Otherwise, `notified().await` waits
492 /// for a permit to be made available by the next call to `notify_one()`.
493 ///
494 /// The `Notified` future is not guaranteed to receive wakeups from calls to
495 /// `notify_one()` if it has not yet been polled. See the documentation for
496 /// [`Notified::enable()`] for more details.
497 ///
498 /// The `Notified` future is guaranteed to receive wakeups from
499 /// `notify_waiters()` as soon as it has been created, even if it has not
500 /// yet been polled.
501 ///
502 /// [`notify_one()`]: Notify::notify_one
503 /// [`Notified::enable()`]: Notified::enable
504 ///
505 /// # Cancel safety
506 ///
507 /// This method uses a queue to fairly distribute notifications in the order
508 /// they were requested. Cancelling a call to `notified` makes you lose your
509 /// place in the queue.
510 ///
511 /// # Examples
512 ///
513 /// ```
514 /// use tokio::sync::Notify;
515 /// use std::sync::Arc;
516 ///
517 /// #[tokio::main]
518 /// async fn main() {
519 /// let notify = Arc::new(Notify::new());
520 /// let notify2 = notify.clone();
521 ///
522 /// tokio::spawn(async move {
523 /// notify2.notified().await;
524 /// println!("received notification");
525 /// });
526 ///
527 /// println!("sending notification");
528 /// notify.notify_one();
529 /// }
530 /// ```
531 pub fn notified(&self) -> Notified<'_> {
532 // we load the number of times notify_waiters
533 // was called and store that in the future.
534 let state = self.state.load(SeqCst);
535 Notified {
536 notify: self,
537 state: State::Init,
538 notify_waiters_calls: get_num_notify_waiters_calls(state),
539 waiter: Waiter::new(),
540 }
541 }
542
543 /// Notifies the first waiting task.
544 ///
545 /// If a task is currently waiting, that task is notified. Otherwise, a
546 /// permit is stored in this `Notify` value and the **next** call to
547 /// [`notified().await`] will complete immediately consuming the permit made
548 /// available by this call to `notify_one()`.
549 ///
550 /// At most one permit may be stored by `Notify`. Many sequential calls to
551 /// `notify_one` will result in a single permit being stored. The next call to
552 /// `notified().await` will complete immediately, but the one after that
553 /// will wait.
554 ///
555 /// [`notified().await`]: Notify::notified()
556 ///
557 /// # Examples
558 ///
559 /// ```
560 /// use tokio::sync::Notify;
561 /// use std::sync::Arc;
562 ///
563 /// #[tokio::main]
564 /// async fn main() {
565 /// let notify = Arc::new(Notify::new());
566 /// let notify2 = notify.clone();
567 ///
568 /// tokio::spawn(async move {
569 /// notify2.notified().await;
570 /// println!("received notification");
571 /// });
572 ///
573 /// println!("sending notification");
574 /// notify.notify_one();
575 /// }
576 /// ```
577 // Alias for old name in 0.x
578 #[cfg_attr(docsrs, doc(alias = "notify"))]
579 pub fn notify_one(&self) {
580 self.notify_with_strategy(NotifyOneStrategy::Fifo);
581 }
582
583 /// Notifies the last waiting task.
584 ///
    /// This function behaves similarly to `notify_one`. The only difference is that it wakes
    /// the most recently added waiter instead of the oldest waiter.
587 ///
588 /// Check the [`notify_one()`] documentation for more info and
589 /// examples.
590 ///
591 /// [`notify_one()`]: Notify::notify_one
592 pub fn notify_last(&self) {
593 self.notify_with_strategy(NotifyOneStrategy::Lifo);
594 }
595
    /// Shared implementation of `notify_one` and `notify_last`.
    ///
    /// While the state is `EMPTY` or `NOTIFIED` a permit is stored with a
    /// lock-free compare-exchange. Otherwise the `waiters` lock is taken and
    /// `notify_locked` picks a waiter according to `strategy`; its waker, if
    /// any, is invoked after the lock has been released.
    fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
        // Load the current state
        let mut curr = self.state.load(SeqCst);

        // While the state is `EMPTY` or `NOTIFIED`, try to transition to
        // `NOTIFIED` and return.
        while let EMPTY | NOTIFIED = get_state(curr) {
            // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
            // happens-before synchronization must happen between this atomic
            // operation and a task calling `notified().await`.
            let new = set_state(curr, NOTIFIED);
            let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);

            match res {
                // No waiters, no further work to do
                Ok(_) => return,
                // The word changed underneath us; retry with the fresh value
                // (the loop exits if the state has become `WAITING`).
                Err(actual) => {
                    curr = actual;
                }
            }
        }

        // There are waiters, the lock must be acquired to notify.
        let mut waiters = self.waiters.lock();

        // The state must be reloaded while the lock is held. The state may only
        // transition out of WAITING while the lock is held.
        curr = self.state.load(SeqCst);

        if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
            // Release the lock before waking the task.
            drop(waiters);
            waker.wake();
        }
    }
629
630 /// Notifies all waiting tasks.
631 ///
    /// All tasks that are currently waiting are notified. Unlike with
633 /// `notify_one()`, no permit is stored to be used by the next call to
634 /// `notified().await`. The purpose of this method is to notify all
635 /// already registered waiters. Registering for notification is done by
636 /// acquiring an instance of the `Notified` future via calling `notified()`.
637 ///
638 /// # Examples
639 ///
640 /// ```
641 /// use tokio::sync::Notify;
642 /// use std::sync::Arc;
643 ///
644 /// #[tokio::main]
645 /// async fn main() {
646 /// let notify = Arc::new(Notify::new());
647 /// let notify2 = notify.clone();
648 ///
649 /// let notified1 = notify.notified();
650 /// let notified2 = notify.notified();
651 ///
652 /// let handle = tokio::spawn(async move {
653 /// println!("sending notifications");
654 /// notify2.notify_waiters();
655 /// });
656 ///
657 /// notified1.await;
658 /// notified2.await;
659 /// println!("received notifications");
660 /// }
661 /// ```
    pub fn notify_waiters(&self) {
        let mut waiters = self.waiters.lock();

        // The state must be loaded while the lock is held. The state may only
        // transition out of WAITING while the lock is held.
        let curr = self.state.load(SeqCst);

        if matches!(get_state(curr), EMPTY | NOTIFIED) {
            // There are no waiting tasks. All we need to do is increment the
            // number of times this method was called.
            atomic_inc_num_notify_waiters_calls(&self.state);
            return;
        }

        // Increment the number of times this method was called
        // and transition to empty.
        let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
        self.state.store(new_state, SeqCst);

        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `waiters` lock.
        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);

        // Wakers are collected in batches: the inner loop fills `wakers` until
        // it cannot take more, then the batch is woken with the lock released.
        let mut wakers = WakeList::new();
        'outer: loop {
            while wakers.can_push() {
                match list.pop_back_locked(&mut waiters) {
                    Some(waiter) => {
                        // Safety: we never make mutable references to waiters.
                        let waiter = unsafe { waiter.as_ref() };

                        // Safety: we hold the lock, so we can access the waker.
                        if let Some(waker) =
                            unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
                        {
                            wakers.push(waker);
                        }

                        // This waiter is unlinked and will not be shared ever again, release it.
                        waiter.notification.store_release(Notification::All);
                    }
                    None => {
                        // Every waiter has been unlinked; the final batch is
                        // woken after the loop.
                        break 'outer;
                    }
                }
            }

            // Release the lock before notifying.
            drop(waiters);

            // One of the wakers may panic, but the remaining waiters will still
            // be unlinked from the list in `NotifyWaitersList` destructor.
            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }
736}
737
738impl Default for Notify {
739 fn default() -> Notify {
740 Notify::new()
741 }
742}
743
// Explicitly mark `Notify` as unwind safe, both by value and behind a shared
// reference.
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}
746
/// Delivers a single notification while the `waiters` lock is held.
///
/// * `waiters` - the locked waiter list of the `Notify`.
/// * `state` - the state word of the same `Notify`.
/// * `curr` - value of `state` loaded after the lock was taken.
/// * `strategy` - which end of the list to dequeue from.
///
/// If `curr` is `EMPTY` or `NOTIFIED`, the state is set to `NOTIFIED` and
/// `None` is returned. If it is `WAITING`, one waiter is unlinked and marked
/// as notified, and its waker (if it had one) is returned so the caller can
/// wake it after releasing the lock.
fn notify_locked(
    waiters: &mut WaitList,
    state: &AtomicUsize,
    curr: usize,
    strategy: NotifyOneStrategy,
) -> Option<Waker> {
    match get_state(curr) {
        EMPTY | NOTIFIED => {
            let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);

            match res {
                Ok(_) => None,
                Err(actual) => {
                    // The word changed since `curr` was loaded. It must still
                    // be `EMPTY` or `NOTIFIED`; store `NOTIFIED` on top of the
                    // value that was actually observed.
                    let actual_state = get_state(actual);
                    assert!(actual_state == EMPTY || actual_state == NOTIFIED);
                    state.store(set_state(actual, NOTIFIED), SeqCst);
                    None
                }
            }
        }
        WAITING => {
            // At this point, it is guaranteed that the state will not
            // concurrently change as holding the lock is required to
            // transition **out** of `WAITING`.
            //
            // Get a pending waiter using one of the available dequeue strategies.
            // New waiters are pushed to the front of the list, so the back
            // holds the oldest waiter and the front the most recent one.
            let waiter = match strategy {
                NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
                NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
            };

            // Safety: we never make mutable references to waiters.
            let waiter = unsafe { waiter.as_ref() };

            // Safety: we hold the lock, so we can access the waker.
            let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

            // This waiter is unlinked and will not be shared ever again, release it.
            waiter
                .notification
                .store_release(Notification::One(strategy));

            if waiters.is_empty() {
                // As this is the **final** waiter in the list, the state
                // must be transitioned to `EMPTY`. As transitioning
                // **from** `WAITING` requires the lock to be held, a
                // `store` is sufficient.
                state.store(set_state(curr, EMPTY), SeqCst);
            }
            waker
        }
        _ => unreachable!(),
    }
}
801
802// ===== impl Notified =====
803
804impl Notified<'_> {
805 /// Adds this future to the list of futures that are ready to receive
806 /// wakeups from calls to [`notify_one`].
807 ///
808 /// Polling the future also adds it to the list, so this method should only
809 /// be used if you want to add the future to the list before the first call
810 /// to `poll`. (In fact, this method is equivalent to calling `poll` except
811 /// that no `Waker` is registered.)
812 ///
813 /// This has no effect on notifications sent using [`notify_waiters`], which
814 /// are received as long as they happen after the creation of the `Notified`
815 /// regardless of whether `enable` or `poll` has been called.
816 ///
817 /// This method returns true if the `Notified` is ready. This happens in the
818 /// following situations:
819 ///
820 /// 1. The `notify_waiters` method was called between the creation of the
821 /// `Notified` and the call to this method.
822 /// 2. This is the first call to `enable` or `poll` on this future, and the
823 /// `Notify` was holding a permit from a previous call to `notify_one`.
824 /// The call consumes the permit in that case.
825 /// 3. The future has previously been enabled or polled, and it has since
826 /// then been marked ready by either consuming a permit from the
827 /// `Notify`, or by a call to `notify_one` or `notify_waiters` that
828 /// removed it from the list of futures ready to receive wakeups.
829 ///
830 /// If this method returns true, any future calls to poll on the same future
831 /// will immediately return `Poll::Ready`.
832 ///
833 /// # Examples
834 ///
    /// Unbounded multi-producer multi-consumer (mpmc) channel.
836 ///
837 /// The call to `enable` is important because otherwise if you have two
838 /// calls to `recv` and two calls to `send` in parallel, the following could
839 /// happen:
840 ///
841 /// 1. Both calls to `try_recv` return `None`.
842 /// 2. Both new elements are added to the vector.
843 /// 3. The `notify_one` method is called twice, adding only a single
844 /// permit to the `Notify`.
845 /// 4. Both calls to `recv` reach the `Notified` future. One of them
846 /// consumes the permit, and the other sleeps forever.
847 ///
848 /// By adding the `Notified` futures to the list by calling `enable` before
849 /// `try_recv`, the `notify_one` calls in step three would remove the
850 /// futures from the list and mark them notified instead of adding a permit
851 /// to the `Notify`. This ensures that both futures are woken.
852 ///
853 /// ```
854 /// use tokio::sync::Notify;
855 ///
856 /// use std::collections::VecDeque;
857 /// use std::sync::Mutex;
858 ///
859 /// struct Channel<T> {
860 /// messages: Mutex<VecDeque<T>>,
861 /// notify_on_sent: Notify,
862 /// }
863 ///
864 /// impl<T> Channel<T> {
865 /// pub fn send(&self, msg: T) {
866 /// let mut locked_queue = self.messages.lock().unwrap();
867 /// locked_queue.push_back(msg);
868 /// drop(locked_queue);
869 ///
870 /// // Send a notification to one of the calls currently
871 /// // waiting in a call to `recv`.
872 /// self.notify_on_sent.notify_one();
873 /// }
874 ///
875 /// pub fn try_recv(&self) -> Option<T> {
876 /// let mut locked_queue = self.messages.lock().unwrap();
877 /// locked_queue.pop_front()
878 /// }
879 ///
880 /// pub async fn recv(&self) -> T {
881 /// let future = self.notify_on_sent.notified();
882 /// tokio::pin!(future);
883 ///
884 /// loop {
885 /// // Make sure that no wakeup is lost if we get
886 /// // `None` from `try_recv`.
887 /// future.as_mut().enable();
888 ///
889 /// if let Some(msg) = self.try_recv() {
890 /// return msg;
891 /// }
892 ///
893 /// // Wait for a call to `notify_one`.
894 /// //
895 /// // This uses `.as_mut()` to avoid consuming the future,
896 /// // which lets us call `Pin::set` below.
897 /// future.as_mut().await;
898 ///
899 /// // Reset the future in case another call to
900 /// // `try_recv` got the message before us.
901 /// future.set(self.notify_on_sent.notified());
902 /// }
903 /// }
904 /// }
905 /// ```
906 ///
907 /// [`notify_one`]: Notify::notify_one()
908 /// [`notify_waiters`]: Notify::notify_waiters()
909 pub fn enable(self: Pin<&mut Self>) -> bool {
910 self.poll_notified(None).is_ready()
911 }
912
913 /// A custom `project` implementation is used in place of `pin-project-lite`
914 /// as a custom drop implementation is needed.
    fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) {
        // Returns, in order: the `Notify` being waited on, a mutable borrow of
        // the future's `state`, the recorded `notify_waiters_calls` count and
        // a shared borrow of the embedded `waiter`.
        unsafe {
            // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.

            // Checks, for each of those field types, that it is `Unpin`
            // (`is_unpin` is defined elsewhere in the crate).
            is_unpin::<&Notify>();
            is_unpin::<State>();
            is_unpin::<usize>();

            let me = self.get_unchecked_mut();
            // `waiter` is only handed out as a shared reference, so it is
            // never moved through this projection.
            (
                me.notify,
                &mut me.state,
                &me.notify_waiters_calls,
                &me.waiter,
            )
        }
    }
932
933 fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
934 let (notify, state, notify_waiters_calls, waiter) = self.project();
935
936 'outer_loop: loop {
937 match *state {
938 State::Init => {
939 let curr = notify.state.load(SeqCst);
940
941 // Optimistically try acquiring a pending notification
942 let res = notify.state.compare_exchange(
943 set_state(curr, NOTIFIED),
944 set_state(curr, EMPTY),
945 SeqCst,
946 SeqCst,
947 );
948
949 if res.is_ok() {
950 // Acquired the notification
951 *state = State::Done;
952 continue 'outer_loop;
953 }
954
                    // Clone the waker before locking, because cloning a waker
                    // may run arbitrary code.
957 let waker = waker.cloned();
958
959 // Acquire the lock and attempt to transition to the waiting
960 // state.
961 let mut waiters = notify.waiters.lock();
962
963 // Reload the state with the lock held
964 let mut curr = notify.state.load(SeqCst);
965
966 // if notify_waiters has been called after the future
967 // was created, then we are done
968 if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
969 *state = State::Done;
970 continue 'outer_loop;
971 }
972
973 // Transition the state to WAITING.
974 loop {
975 match get_state(curr) {
976 EMPTY => {
977 // Transition to WAITING
978 let res = notify.state.compare_exchange(
979 set_state(curr, EMPTY),
980 set_state(curr, WAITING),
981 SeqCst,
982 SeqCst,
983 );
984
985 if let Err(actual) = res {
986 assert_eq!(get_state(actual), NOTIFIED);
987 curr = actual;
988 } else {
989 break;
990 }
991 }
992 WAITING => break,
993 NOTIFIED => {
994 // Try consuming the notification
995 let res = notify.state.compare_exchange(
996 set_state(curr, NOTIFIED),
997 set_state(curr, EMPTY),
998 SeqCst,
999 SeqCst,
1000 );
1001
1002 match res {
1003 Ok(_) => {
1004 // Acquired the notification
1005 *state = State::Done;
1006 continue 'outer_loop;
1007 }
1008 Err(actual) => {
1009 assert_eq!(get_state(actual), EMPTY);
1010 curr = actual;
1011 }
1012 }
1013 }
1014 _ => unreachable!(),
1015 }
1016 }
1017
1018 let mut old_waker = None;
1019 if waker.is_some() {
1020 // Safety: called while locked.
1021 //
1022 // The use of `old_waiter` here is not necessary, as the field is always
1023 // None when we reach this line.
1024 unsafe {
1025 old_waker =
1026 waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
1027 }
1028 }
1029
1030 // Insert the waiter into the linked list
1031 waiters.push_front(NonNull::from(waiter));
1032
1033 *state = State::Waiting;
1034
1035 drop(waiters);
1036 drop(old_waker);
1037
1038 return Poll::Pending;
1039 }
1040 State::Waiting => {
1041 #[cfg(tokio_taskdump)]
1042 if let Some(waker) = waker {
1043 let mut ctx = Context::from_waker(waker);
1044 ready!(crate::trace::trace_leaf(&mut ctx));
1045 }
1046
1047 if waiter.notification.load(Acquire).is_some() {
1048 // Safety: waiter is already unlinked and will not be shared again,
1049 // so we have an exclusive access to `waker`.
1050 drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });
1051
1052 waiter.notification.clear();
1053 *state = State::Done;
1054 return Poll::Ready(());
1055 }
1056
1057 // Our waiter was not notified, implying it is still stored in a waiter
1058 // list (guarded by `notify.waiters`). In order to access the waker
1059 // fields, we must acquire the lock.
1060
1061 let mut old_waker = None;
1062 let mut waiters = notify.waiters.lock();
1063
1064 // We hold the lock and notifications are set only with the lock held,
1065 // so this can be relaxed, because the happens-before relationship is
1066 // established through the mutex.
1067 if waiter.notification.load(Relaxed).is_some() {
1068 // Safety: waiter is already unlinked and will not be shared again,
1069 // so we have an exclusive access to `waker`.
1070 old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1071
1072 waiter.notification.clear();
1073
1074 // Drop the old waker after releasing the lock.
1075 drop(waiters);
1076 drop(old_waker);
1077
1078 *state = State::Done;
1079 return Poll::Ready(());
1080 }
1081
1082 // Load the state with the lock held.
1083 let curr = notify.state.load(SeqCst);
1084
1085 if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
1086 // Before we add a waiter to the list we check if these numbers are
1087 // different while holding the lock. If these numbers are different now,
1088 // it means that there is a call to `notify_waiters` in progress and this
1089 // waiter must be contained by a guarded list used in `notify_waiters`.
1090 // We can treat the waiter as notified and remove it from the list, as
1091 // it would have been notified in the `notify_waiters` call anyways.
1092
1093 // Safety: we hold the lock, so we can modify the waker.
1094 old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1095
1096 // Safety: we hold the lock, so we have an exclusive access to the list.
1097 // The list is used in `notify_waiters`, so it must be guarded.
1098 unsafe { waiters.remove(NonNull::from(waiter)) };
1099
1100 *state = State::Done;
1101 } else {
1102 // Safety: we hold the lock, so we can modify the waker.
1103 unsafe {
1104 waiter.waker.with_mut(|v| {
1105 if let Some(waker) = waker {
1106 let should_update = match &*v {
1107 Some(current_waker) => !current_waker.will_wake(waker),
1108 None => true,
1109 };
1110 if should_update {
1111 old_waker = std::mem::replace(&mut *v, Some(waker.clone()));
1112 }
1113 }
1114 });
1115 }
1116
1117 // Drop the old waker after releasing the lock.
1118 drop(waiters);
1119 drop(old_waker);
1120
1121 return Poll::Pending;
1122 }
1123
1124 // Explicit drop of the lock to indicate the scope that the
1125 // lock is held. Because holding the lock is required to
1126 // ensure safe access to fields not held within the lock, it
1127 // is helpful to visualize the scope of the critical
1128 // section.
1129 drop(waiters);
1130
1131 // Drop the old waker after releasing the lock.
1132 drop(old_waker);
1133 }
1134 State::Done => {
1135 #[cfg(tokio_taskdump)]
1136 if let Some(waker) = waker {
1137 let mut ctx = Context::from_waker(waker);
1138 ready!(crate::trace::trace_leaf(&mut ctx));
1139 }
1140 return Poll::Ready(());
1141 }
1142 }
1143 }
1144 }
1145}
1146
1147impl Future for Notified<'_> {
1148 type Output = ();
1149
1150 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1151 self.poll_notified(Some(cx.waker()))
1152 }
1153}
1154
1155impl Drop for Notified<'_> {
1156 fn drop(&mut self) {
1157 // Safety: The type only transitions to a "Waiting" state when pinned.
1158 let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
1159
1160 // This is where we ensure safety. The `Notified` value is being
1161 // dropped, which means we must ensure that the waiter entry is no
1162 // longer stored in the linked list.
1163 if matches!(*state, State::Waiting) {
1164 let mut waiters = notify.waiters.lock();
1165 let mut notify_state = notify.state.load(SeqCst);
1166
1167 // We hold the lock, so this field is not concurrently accessed by
1168 // `notify_*` functions and we can use the relaxed ordering.
1169 let notification = waiter.notification.load(Relaxed);
1170
1171 // remove the entry from the list (if not already removed)
1172 //
1173 // Safety: we hold the lock, so we have an exclusive access to every list the
1174 // waiter may be contained in. If the node is not contained in the `waiters`
1175 // list, then it is contained by a guarded list used by `notify_waiters`.
1176 unsafe { waiters.remove(NonNull::from(waiter)) };
1177
1178 if waiters.is_empty() && get_state(notify_state) == WAITING {
1179 notify_state = set_state(notify_state, EMPTY);
1180 notify.state.store(notify_state, SeqCst);
1181 }
1182
1183 // See if the node was notified but not received. In this case, if
1184 // the notification was triggered via `notify_one`, it must be sent
1185 // to the next waiter.
1186 if let Some(Notification::One(strategy)) = notification {
1187 if let Some(waker) =
1188 notify_locked(&mut waiters, ¬ify.state, notify_state, strategy)
1189 {
1190 drop(waiters);
1191 waker.wake();
1192 }
1193 }
1194 }
1195 }
1196}
1197
1198/// # Safety
1199///
1200/// `Waiter` is forced to be !Unpin.
1201unsafe impl linked_list::Link for Waiter {
1202 type Handle = NonNull<Waiter>;
1203 type Target = Waiter;
1204
1205 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1206 *handle
1207 }
1208
1209 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1210 ptr
1211 }
1212
1213 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1214 Waiter::addr_of_pointers(target)
1215 }
1216}
1217
/// Compile-time assertion that `T` implements `Unpin`.
///
/// The function does nothing at runtime; a call such as `is_unpin::<Foo>()`
/// only type-checks when `Foo: Unpin` holds.
fn is_unpin<T>()
where
    T: Unpin,
{
}