// fuchsia_async/atomic_future.rs
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use futures::ready;
use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::task::{Context, Poll};
/// A lock-free thread-safe future.
///
/// Wraps a boxed, type-erased future together with an atomic bitfield that arbitrates which
/// thread may poll or drop it. Once the future completes, its output is kept in place of the
/// future until it is taken with `take_result` or the `AtomicFuture` is dropped.
// The debugger knows the layout so that async backtraces work, so if this changes the debugger
// might need to be changed too.
// LINT.IfChange
pub struct AtomicFuture<'a> {
// A bitfield holding the state bits (INACTIVE, READY, DONE, DETACHED, CANCELLED and
// RESULT_TAKEN).
state: AtomicUsize,
// Holds the future until it finishes and its result afterwards.
// `future` is safe to access after successfully clearing the INACTIVE bit and the `DONE` bit
// isn't set.
future: UnsafeCell<Box<dyn FutureOrResultAccess<'a>>>,
}
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
/// Type-erased access to storage that holds either a future or, once the future has finished,
/// its result.
///
/// None of the methods track which of the two is currently stored; the caller does (see the
/// `# Safety` section of each method).
trait FutureOrResultAccess<'a>: 'a {
/// Drops the future.
///
/// # Safety
///
/// The caller must ensure the future hasn't been dropped.
// zxdb uses this method to figure out the concrete type of the future and it currently assumes
// it is the first method in the trait.
// LINT.IfChange
unsafe fn drop_future(&mut self);
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
/// Polls the future.
///
/// When this returns `Poll::Ready(())` the future has been dropped and replaced by its result,
/// so from then on only `get_result` and `drop_result` may be used.
///
/// # Safety
///
/// The caller must ensure the future hasn't been dropped.
unsafe fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()>;
/// Gets the result.
///
/// Returns a type-erased pointer to the stored result; the caller casts it back to the concrete
/// result type.
///
/// # Safety
///
/// The caller must ensure the future is finished and the result hasn't been taken or dropped.
unsafe fn get_result(&self) -> *const ();
/// Drops the result.
///
/// # Safety
///
/// The caller must ensure the future is finished and the result hasn't already been taken or
/// dropped.
unsafe fn drop_result(&mut self);
}
/// Storage shared by a future of type `F` and, once that future has finished, its output of
/// type `R`.
///
/// The union does not record which field is live, and both fields are `ManuallyDrop`, so
/// nothing is dropped automatically. `AtomicFuture` keeps track of what is stored through its
/// DONE and RESULT_TAKEN state bits and drops the live field explicitly.
union FutureOrResult<'a, F: 'a, R: 'a> {
// Live from construction until the future finishes or is dropped.
future: ManuallyDrop<F>,
// Live once `poll` has returned `Poll::Ready`, until the result is taken or dropped.
result: ManuallyDrop<R>,
// Never read or written in this file; presumably only present so that `'a` is used by a field.
lifetime: PhantomData<&'a ()>,
}
impl<'a, F: Future<Output = R> + 'a, R: 'a> FutureOrResultAccess<'a> for FutureOrResult<'a, F, R> {
unsafe fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
let result = ready!(Pin::new_unchecked(&mut *self.future).poll(cx));
// This might panic which will leave ourselves in a bad state. We deal with this by
// aborting (see below).
ManuallyDrop::drop(&mut self.future);
self.result = ManuallyDrop::new(result);
Poll::Ready(())
}
unsafe fn drop_future(&mut self) {
ManuallyDrop::drop(&mut self.future);
}
unsafe fn get_result(&self) -> *const () {
&*self.result as *const R as *const ()
}
unsafe fn drop_result(&mut self) {
ManuallyDrop::drop(&mut self.result);
}
}
/// `AtomicFuture` is safe to access from multiple threads at once.
// SAFETY: All shared access to `future` is arbitrated through the atomic `state` bitfield: a
// thread only touches `future` after it has cleared the INACTIVE bit (exclusive access) or after
// it has claimed the result by setting RESULT_TAKEN.
unsafe impl Sync for AtomicFuture<'_> {}
// SAFETY: `AtomicFuture::new` requires both the future and its output to be `Send`. The only
// other constructor, `AtomicFuture::new_local`, is unsafe and makes the caller responsible for
// upholding the `Send` requirements.
unsafe impl Send for AtomicFuture<'_> {}
// State bits stored in `AtomicFuture::state`. A new `AtomicFuture` starts out with INACTIVE set,
// plus DETACHED when requested.
//
// Set while no thread has exclusive access to the future. Exclusive access is gained by clearing
// this bit.
const INACTIVE: usize = 1 << 0;
// Set to indicate the future needs to be polled again.
const READY: usize = 1 << 1;
// Terminal state: the future is dropped upon entry to this state. When in this state, other bits
// can be set, including READY (which has no meaning).
const DONE: usize = 1 << 2;
// The task has been detached.
const DETACHED: usize = 1 << 3;
// The task has been cancelled.
const CANCELLED: usize = 1 << 4;
// The result has been taken. This is also set when the future is dropped before it finishes
// (see `drop_future_unchecked`), in which case there is no result to take.
const RESULT_TAKEN: usize = 1 << 5;
/// The result of a call to `try_poll`.
/// This indicates the result of attempting to `poll` the future.
#[derive(Debug)]
pub enum AttemptPollResult {
/// The future was being polled by another thread, but it was notified
/// to poll at least once more before yielding.
// NOTE(review): `AtomicFuture::try_poll` in this file reports that situation as `Pending` and
// never returns `Busy` — confirm whether this variant is still needed.
Busy,
/// The future was polled, but did not complete.
///
/// `try_poll` also returns this without polling when another thread is currently polling the
/// future, or when the future was already marked ready; in both cases it will get polled
/// again.
Pending,
/// The future was polled and finished by this thread.
/// This result is normally used to trigger garbage-collection of the future.
IFinished,
/// The future was already completed by another thread.
///
/// `try_poll` returns this whenever the DONE bit was already set, which includes the case
/// where the future was dropped rather than run to completion.
SomeoneElseFinished,
/// The future was polled, did not complete, but it is woken whilst it is polled so it
/// should be polled again.
Yield,
/// The future was cancelled.
///
/// `try_poll` drops the future without polling it before returning this.
Cancelled,
}
/// The result of calling the `cancel_and_detach` function.
///
/// Tells the caller what, if anything, it still has to do to get the cancellation carried out.
#[must_use]
#[derive(Debug)]
pub enum CancelAndDetachResult {
    /// The future has finished; it can be dropped.
    Done,
    /// The future needs to be added to a run queue to be cancelled.
    AddToRunQueue,
    /// The future is soon to be cancelled and nothing needs to be done.
    Pending,
}
impl<'a> AtomicFuture<'a> {
/// Creates an `AtomicFuture` that wraps `future`.
///
/// When `detached` is true the new future starts out marked as detached.
pub fn new<F, R>(future: F, detached: bool) -> Self
where
    F: Future<Output = R> + Send + 'a,
    R: Send + 'a,
{
    // SAFETY: The bounds above guarantee that both the future and its output are `Send`.
    unsafe { Self::new_local(future, detached) }
}
/// Creates an `AtomicFuture` from a future that is not required to be `Send`.
///
/// The future starts out inactive, and additionally detached when `detached` is true.
///
/// # Safety
///
/// `AtomicFuture` is `Send` and `Sync` regardless of `F` and `R`, so the caller must uphold the
/// Send requirements that `new` would otherwise enforce.
pub unsafe fn new_local<F, R>(future: F, detached: bool) -> Self
where
    F: Future<Output = R> + 'a,
    R: 'a,
{
    // INACTIVE and DETACHED are distinct bits, so combining them with `|` gives the same value
    // as adding them.
    let initial_state = if detached { INACTIVE | DETACHED } else { INACTIVE };
    AtomicFuture {
        state: AtomicUsize::new(initial_state),
        future: UnsafeCell::new(Box::new(FutureOrResult { future: ManuallyDrop::new(future) })),
    }
}
/// Attempt to poll the underlying future.
///
/// `try_poll` ensures that the future is polled at least once more
/// unless it has already finished.
///
/// Returns:
///
/// * `SomeoneElseFinished` if the DONE bit was already set.
/// * `Cancelled` if exclusive access was gained and the CANCELLED bit was set; the future is
///   dropped without being polled.
/// * `Pending` if another thread is currently polling the future or it was already marked ready
///   (READY is set so that it gets polled again), or if it was polled here, did not finish and
///   was not marked ready in the meantime.
/// * `IFinished` if the future was polled here and finished.
/// * `Yield` if the future was polled here, did not finish, and was marked ready whilst it was
///   being polled.
///
/// If polling the future panics and the panic unwinds, the process is aborted.
pub fn try_poll(&self, cx: &mut Context<'_>) -> AttemptPollResult {
loop {
// Attempt to acquire sole responsibility for polling the future (by clearing the
// INACTIVE bit) and also clear the READY bit at the same time so that we track if it
// becomes READY again whilst we are polling.
let old = self.state.fetch_and(!(INACTIVE | READY), Acquire);
if old & DONE != 0 {
// Someone else completed this future already
return AttemptPollResult::SomeoneElseFinished;
}
if old & INACTIVE != 0 {
// We are now the (only) active worker, proceed to poll...
if old & CANCELLED != 0 {
// The future was cancelled.
// SAFETY: We have exclusive access.
unsafe {
self.drop_future_unchecked();
}
return AttemptPollResult::Cancelled;
}
break;
}
// Future was already active; this shouldn't really happen because we shouldn't be
// polling it from multiple threads at the same time. Still, we handle it by setting
// the READY bit so that it gets polled again. We do this regardless of whether we
// cleared the READY bit above.
let old = self.state.fetch_or(READY, Relaxed);
// If the future is still active, or the future was already marked as ready, we can
// just return and it will get polled again.
if old & INACTIVE == 0 || old & READY != 0 {
return AttemptPollResult::Pending;
}
// The worker finished, and we marked the future as ready, so we must try again because
// the future won't be in a run queue.
}
// We cannot recover from panics.
// `Bomb` aborts the process when it is dropped. It is only dropped if `poll` below unwinds;
// on a normal return it is defused with `mem::forget`.
struct Bomb;
impl Drop for Bomb {
fn drop(&mut self) {
std::process::abort();
}
}
let bomb = Bomb;
// This `UnsafeCell` access is valid because `self.future.get()` is only called here, inside
// the critical section where we performed the transition from INACTIVE to ACTIVE.
let result = unsafe { (*self.future.get()).poll(cx) };
std::mem::forget(bomb);
if let Poll::Ready(()) = result {
// The future will have been dropped, so we just need to set the state.
//
// This needs to be Release ordering because we need to synchronize with another thread
// that takes the result.
self.state.fetch_or(DONE, Release);
// No one else will read `future` unless they see `INACTIVE`, which will never
// happen again.
AttemptPollResult::IFinished
// Not finished: give up exclusive access by setting INACTIVE again, and check in the same
// atomic operation whether READY was set while we were polling.
} else if self.state.fetch_or(INACTIVE, Release) & READY == 0 {
AttemptPollResult::Pending
} else {
// The future was marked ready whilst we were polling, so yield.
AttemptPollResult::Yield
}
}
/// Flags the future as needing to be polled.
///
/// Returns true if the caller must add the future to a run queue, which is only the case when
/// it was inactive and neither already ready nor done.
#[must_use]
pub fn mark_ready(&self) -> bool {
    let previous = self.state.fetch_or(READY, Relaxed);
    previous & (INACTIVE | READY | DONE) == INACTIVE
}
/// Drops the future without checking its current state.
///
/// Besides DONE this also sets RESULT_TAKEN: the future never produced a result, so neither
/// `take_result` nor `Drop` may touch the result storage afterwards.
///
/// # Panics
///
/// Panics if the DONE bit was already set.
///
/// # Safety
///
/// This doesn't check the current state, so this must only be called if it is known that there
/// is no concurrent access. This also does *not* include any memory barriers before dropping
/// the future.
pub unsafe fn drop_future_unchecked(&self) {
// Set the state first in case we panic when we drop.
assert!(self.state.fetch_or(DONE | RESULT_TAKEN, Relaxed) & DONE == 0);
(*self.future.get()).drop_future();
}
/// Attempts to drop the future.
///
/// Returns `Ok(())` if the future was dropped by this call or was already done, and `Err(())`
/// if it is currently being polled and so could not be dropped.
pub fn try_drop(&self) -> Result<(), ()> {
    // Clearing INACTIVE claims exclusive access, provided the bit was set.
    let previous = self.state.fetch_and(!INACTIVE, Acquire);
    if previous & DONE != 0 {
        // Nothing left to drop.
        return Ok(());
    }
    if previous & INACTIVE == 0 {
        // Someone else holds exclusive access.
        return Err(());
    }
    // SAFETY: We have exclusive access.
    unsafe {
        self.drop_future_unchecked();
    }
    Ok(())
}
/// Marks the task as cancelled (and ready, so that the cancellation gets processed).
///
/// Returns true if the task needs to be added to a run queue, i.e. it was inactive and neither
/// already ready nor done.
#[must_use]
pub fn cancel(&self) -> bool {
    let previous = self.state.fetch_or(CANCELLED | READY, Relaxed);
    previous & (INACTIVE | READY | DONE) == INACTIVE
}
/// Sets the DETACHED bit, marking the task as detached.
pub fn detach(&self) {
    // The previous state is of no interest here.
    let _previous = self.state.fetch_or(DETACHED, Relaxed);
}
/// Marks the task as cancelled and detached (for when the caller isn't interested in waiting
/// for the cancellation to be finished).
///
/// The returned `CancelAndDetachResult` tells the caller whether the task is already done,
/// must be added to a run queue so that the cancellation gets processed, or needs no further
/// action.
pub fn cancel_and_detach(&self) -> CancelAndDetachResult {
    let previous = self.state.fetch_or(CANCELLED | DETACHED | READY, Relaxed);
    if previous & DONE != 0 {
        return CancelAndDetachResult::Done;
    }
    // Only an inactive task that wasn't already marked ready has to be queued by the caller.
    let needs_queueing = previous & (INACTIVE | READY) == INACTIVE;
    if needs_queueing {
        CancelAndDetachResult::AddToRunQueue
    } else {
        CancelAndDetachResult::Pending
    }
}
/// Reports whether the DETACHED bit is currently set.
pub fn is_detached(&self) -> bool {
    let current = self.state.load(Relaxed);
    current & DETACHED != 0
}
/// Moves the result out of the future.
///
/// Returns `None` if the future isn't done or if the result has already been taken.
///
/// # Safety
///
/// The caller must guarantee that `R` is the correct type.
pub unsafe fn take_result<R>(&self) -> Option<R> {
    // Bail out unless the future is done and the result hasn't been taken.
    if self.state.load(Relaxed) & (DONE | RESULT_TAKEN) != DONE {
        return None;
    }
    // Claim the result. This needs to be Acquire ordering to synchronize with the polling
    // thread.
    let previous = self.state.fetch_or(RESULT_TAKEN, Acquire);
    if previous & RESULT_TAKEN != 0 {
        // Somebody else claimed it first.
        return None;
    }
    let result = (*self.future.get()).get_result() as *const R;
    Some(result.read())
}
}
impl Drop for AtomicFuture<'_> {
    /// Drops whatever is still stored: the future if it never reached the DONE state, or the
    /// result if the future finished but nobody took the result.
    fn drop(&mut self) {
        // `&mut self` means nobody else can be accessing the state or the storage.
        let bits = *self.state.get_mut();
        let contents = self.future.get_mut();
        let done = bits & DONE != 0;
        let result_taken = bits & RESULT_TAKEN != 0;
        match (done, result_taken) {
            // SAFETY: The state isn't DONE so we must drop the future.
            (false, _) => unsafe { contents.drop_future() },
            // SAFETY: The result hasn't been taken so we must drop the result.
            (true, false) => unsafe { contents.drop_result() },
            // Nothing is stored any more.
            (true, true) => {}
        }
    }
}