async_task/header.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
use core::cell::UnsafeCell;
use core::fmt;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::Waker;
use crate::raw::TaskVTable;
use crate::state::*;
use crate::utils::abort_on_panic;
/// The header of a task.
///
/// This header is stored in memory at the beginning of the heap-allocated task.
pub(crate) struct Header {
/// Current state of the task.
///
/// Contains flags representing the current state and the reference count.
pub(crate) state: AtomicUsize,
/// The task that is blocked on the `Task` handle.
///
/// This waker needs to be woken up once the task completes or is closed.
pub(crate) awaiter: UnsafeCell<Option<Waker>>,
/// The virtual table.
///
/// In addition to the actual waker virtual table, it also contains pointers to several other
/// methods necessary for bookkeeping the heap-allocated task.
pub(crate) vtable: &'static TaskVTable,
}
impl Header {
/// Notifies the awaiter blocked on this task.
///
/// If the awaiter is the same as the current waker, it will not be notified.
#[inline]
pub(crate) fn notify(&self, current: Option<&Waker>) {
if let Some(w) = self.take(current) {
abort_on_panic(|| w.wake());
}
}
/// Takes the awaiter blocked on this task.
///
/// If there is no awaiter or if it is the same as the current waker, returns `None`.
#[inline]
pub(crate) fn take(&self, current: Option<&Waker>) -> Option<Waker> {
// Set the bit indicating that the task is notifying its awaiter.
let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
// If the task was not notifying or registering an awaiter...
if state & (NOTIFYING | REGISTERING) == 0 {
// Take the waker out.
let waker = unsafe { (*self.awaiter.get()).take() };
// Unset the bit indicating that the task is notifying its awaiter.
self.state
.fetch_and(!NOTIFYING & !AWAITER, Ordering::Release);
// Finally, notify the waker if it's different from the current waker.
if let Some(w) = waker {
match current {
None => return Some(w),
Some(c) if !w.will_wake(c) => return Some(w),
Some(_) => abort_on_panic(|| drop(w)),
}
}
}
None
}
/// Registers a new awaiter blocked on this task.
///
/// This method is called when `Task` is polled and it has not yet completed.
#[inline]
pub(crate) fn register(&self, waker: &Waker) {
// Load the state and synchronize with it.
let mut state = self.state.fetch_or(0, Ordering::Acquire);
loop {
// There can't be two concurrent registrations because `Task` can only be polled
// by a unique pinned reference.
debug_assert!(state & REGISTERING == 0);
// If we're in the notifying state at this moment, just wake and return without
// registering.
if state & NOTIFYING != 0 {
abort_on_panic(|| waker.wake_by_ref());
return;
}
// Mark the state to let other threads know we're registering a new awaiter.
match self.state.compare_exchange_weak(
state,
state | REGISTERING,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
state |= REGISTERING;
break;
}
Err(s) => state = s,
}
}
// Put the waker into the awaiter field.
unsafe {
abort_on_panic(|| (*self.awaiter.get()) = Some(waker.clone()));
}
// This variable will contain the newly registered waker if a notification comes in before
// we complete registration.
let mut waker = None;
loop {
// If there was a notification, take the waker out of the awaiter field.
if state & NOTIFYING != 0 {
if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
abort_on_panic(|| waker = Some(w));
}
}
// The new state is not being notified nor registered, but there might or might not be
// an awaiter depending on whether there was a concurrent notification.
let new = if waker.is_none() {
(state & !NOTIFYING & !REGISTERING) | AWAITER
} else {
state & !NOTIFYING & !REGISTERING & !AWAITER
};
match self
.state
.compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => break,
Err(s) => state = s,
}
}
// If there was a notification during registration, wake the awaiter now.
if let Some(w) = waker {
abort_on_panic(|| w.wake());
}
}
}
impl fmt::Debug for Header {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);
f.debug_struct("Header")
.field("scheduled", &(state & SCHEDULED != 0))
.field("running", &(state & RUNNING != 0))
.field("completed", &(state & COMPLETED != 0))
.field("closed", &(state & CLOSED != 0))
.field("awaiter", &(state & AWAITER != 0))
.field("task", &(state & TASK != 0))
.field("ref_count", &(state / REFERENCE))
.finish()
}
}