fuchsia_async/runtime/fuchsia/executor/atomic_future.rs
1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod hooks;
6pub mod spawnable_future;
7
8use crate::ScopeHandle;
9use futures::ready;
10use std::future::Future;
11use std::hash::{Hash, Hasher};
12use std::marker::PhantomData;
13use std::mem::ManuallyDrop;
14use std::ops::Deref;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
20
/// A lock-free thread-safe future.
//
// The debugger knows the layout so that async backtraces work, so if this changes the debugger
// might need to be changed too.
//
// This is `repr(C)` so that we can cast between `NonNull<Meta>` and `NonNull<AtomicFuture<F>>`.
//
// LINT.IfChange
#[repr(C)]
struct AtomicFuture<F: Future> {
    // The type-erased header (vtable, state word and scope). It is the first field so that,
    // together with `repr(C)`, a pointer to the whole `AtomicFuture<F>` can be cast to a pointer
    // to `Meta` and back.
    meta: Meta,

    // `future` is safe to access after successfully clearing the INACTIVE state bit and the `DONE`
    // state bit isn't set.
    //
    // This is a union: it holds the future until `poll` sees it complete, at which point the
    // future is dropped and its output is stored in the same place.
    future: FutureOrResult<F>,
}
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
38
/// A lock-free thread-safe future. The handles can be cloned.
///
/// A handle is a reference-counted pointer to the type-erased `Meta` header of a heap allocated
/// `AtomicFuture<F>`: cloning a handle retains a reference and dropping it releases one. The
/// lifetime `'a` is the lifetime the future and its output were required to outlive when the
/// handle was created.
#[derive(Debug)]
pub struct AtomicFutureHandle<'a>(NonNull<Meta>, PhantomData<&'a ()>);

/// `AtomicFutureHandle` is safe to access from multiple threads at once.
// SAFETY: All shared mutation goes through the atomic `state` word, and `new` requires the future
// and its output to be `Send`. `new_local` is `unsafe` and leaves it to its caller to uphold the
// same requirement.
unsafe impl Sync for AtomicFutureHandle<'_> {}
unsafe impl Send for AtomicFutureHandle<'_> {}
46
impl Drop for AtomicFutureHandle<'_> {
    // Gives up this handle's reference. `Meta::release` frees the underlying allocation when the
    // last reference goes away.
    fn drop(&mut self) {
        self.meta().release();
    }
}
52
impl Clone for AtomicFutureHandle<'_> {
    // Produces another handle to the same future. The reference count is incremented first so
    // that the new handle owns a reference of its own.
    fn clone(&self) -> Self {
        self.meta().retain();
        Self(self.0, PhantomData)
    }
}
59
60impl PartialEq for AtomicFutureHandle<'_> {
61 fn eq(&self, other: &Self) -> bool {
62 self.0 == other.0
63 }
64}
65
66impl Eq for AtomicFutureHandle<'_> {}
67
68impl Hash for AtomicFutureHandle<'_> {
69 fn hash<H: Hasher>(&self, state: &mut H) {
70 self.0.hash(state);
71 }
72}
73
// The type-erased header shared by every `AtomicFuture<F>`. Handles and wakers point at this
// header and reach the concrete future through `vtable`.
struct Meta {
    // The operations for the concrete future type stored after this header.
    vtable: &'static VTable,

    // Holds the reference count and state bits (INACTIVE, READY, etc.).
    state: AtomicUsize,

    // The scope the task belongs to, if one was supplied at creation. `Meta::scope` panics if
    // this is `None`.
    scope: Option<ScopeHandle>,
}
82
impl Meta {
    // Marks the task as READY and, if it was idle (INACTIVE set, READY and DONE clear), passes a
    // newly minted handle to the executor via `task_is_ready`.
    //
    // # Safety
    //
    // This mints a handle with the 'static lifetime, so this should only be called from
    // `AtomicFutureHandle<'static>`.
    unsafe fn wake(&self) {
        if self.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
            // The handle minted below owns a reference count of its own.
            self.retain();
            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
        }
    }

    // Like `wake`, but also sets the WITH_ACTIVE_GUARD bit.
    //
    // Returns true if a guard should be acquired.
    //
    // # Safety
    //
    // This mints a handle with the 'static lifetime, so this should only be called from
    // `AtomicFutureHandle<'static>`.
    unsafe fn wake_with_active_guard(&self) -> bool {
        let old = self.state.fetch_or(READY | WITH_ACTIVE_GUARD, Relaxed);
        if old & (INACTIVE | READY | DONE) == INACTIVE {
            // The handle minted below owns a reference count of its own.
            self.retain();
            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
        }

        // If the task is DONE, the guard won't be released, so we must let the caller know.
        // This also returns false if WITH_ACTIVE_GUARD was already set before this call.
        old & (DONE | WITH_ACTIVE_GUARD) == 0
    }

    // Returns the scope.
    //
    // Panics if the task was created without a scope.
    fn scope(&self) -> &ScopeHandle {
        self.scope.as_ref().unwrap()
    }

    // Increments the reference count. Panics if the count was already at its maximum, i.e. the
    // increment would have carried into the state bits.
    fn retain(&self) {
        let old = self.state.fetch_add(1, Relaxed) & REF_COUNT_MASK;
        assert!(old != REF_COUNT_MASK);
    }

    // Decrements the reference count and frees the allocation (via `vtable.drop`) if this was
    // the last reference. Panics if the count was already zero.
    fn release(&self) {
        // This can be Relaxed because there is a barrier in the drop function (the `Acquire` load
        // of `state` in `Drop for AtomicFuture`).
        let old = self.state.fetch_sub(1, Relaxed) & REF_COUNT_MASK;
        if old == 1 {
            // SAFETY: This is safe because we just released the last reference.
            unsafe {
                (self.vtable.drop)(self.into());
            }
        } else {
            // Check for underflow.
            assert!(old > 0);
        }
    }

    // Drops the result unless it has already been taken or dropped. `ordering` is the memory
    // ordering used for the atomic update that claims the result.
    //
    // # Safety
    //
    // The caller must know that the future has completed.
    unsafe fn drop_result(&self, ordering: Ordering) {
        // It's possible for this to race with another thread so we only drop the result if we are
        // successful in setting the RESULT_TAKEN bit.
        if self.state.fetch_or(RESULT_TAKEN, ordering) & RESULT_TAKEN == 0 {
            // SAFETY: The caller guarantees the future has completed, and we just claimed the
            // result by being the one to set RESULT_TAKEN.
            unsafe { (self.vtable.drop_result)(self.into()) };
        }
    }
}
146
/// The type-erased operations on an `AtomicFuture<F>`.
///
/// Every function takes a pointer to the `Meta` header of the `AtomicFuture<F>` that the table
/// was built for (see `AtomicFuture::<F>::VTABLE`).
struct VTable {
    /// Drops the atomic future.
    ///
    /// # Safety
    ///
    /// The caller must ensure there are no other references i.e. the reference count should be
    /// zero.
    // zxdb uses this method to figure out the concrete type of the future.
    // LINT.IfChange
    drop: unsafe fn(NonNull<Meta>),
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
    /// Drops the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future hasn't been dropped.
    drop_future: unsafe fn(NonNull<Meta>),
    /// Polls the future.
    ///
    /// Returns `Poll::Ready(())` once the future has completed; the output is stored in place of
    /// the future rather than returned.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future hasn't been dropped and has exclusive access.
    poll: unsafe fn(NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()>,

    /// Gets the result.
    ///
    /// The returned pointer is a type-erased pointer to the stored output.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future is finished and the result hasn't been taken or dropped.
    get_result: unsafe fn(NonNull<Meta>) -> *const (),

    /// Drops the result.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future is finished and the result hasn't already been taken or
    /// dropped.
    drop_result: unsafe fn(NonNull<Meta>),
}
186
// Storage shared by a future and its output. It starts out holding the future; when the future
// completes, `AtomicFuture::poll` drops the future and writes the output in its place. Both
// fields are `ManuallyDrop`, so whoever knows which one is live must drop it explicitly.
union FutureOrResult<F: Future> {
    future: ManuallyDrop<F>,
    result: ManuallyDrop<F::Output>,
}
191
impl<F: Future> AtomicFuture<F> {
    // The type-erased entry points for this concrete future type.
    const VTABLE: VTable = VTable {
        drop: Self::drop,
        drop_future: Self::drop_future,
        poll: Self::poll,
        get_result: Self::get_result,
        drop_result: Self::drop_result,
    };

    // Frees the allocation. This runs `Drop for AtomicFuture<F>`, which drops whichever of the
    // future or the result is still live.
    //
    // # Safety
    //
    // See `VTable::drop`. `meta` must be the header of a boxed `AtomicFuture<F>`.
    unsafe fn drop(meta: NonNull<Meta>) {
        drop(unsafe { Box::from_raw(meta.cast::<Self>().as_mut()) });
    }

    // Polls the future. On completion the future is dropped and its output is stored in its
    // place.
    //
    // # Safety
    //
    // See `VTable::poll`. `meta` must be the header of an `AtomicFuture<F>`.
    unsafe fn poll(meta: NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()> {
        let future = &mut unsafe { meta.cast::<Self>().as_mut() }.future;
        // The future is pinned in practice: it lives in a heap allocation and is only ever
        // accessed in place through this pointer.
        let result = ready!(unsafe { Pin::new_unchecked(&mut *future.future) }.poll(cx));
        // This might panic which will leave ourselves in a bad state. We deal with this by
        // aborting (see below).
        unsafe { ManuallyDrop::drop(&mut future.future) };
        future.result = ManuallyDrop::new(result);
        Poll::Ready(())
    }

    // Drops the future in place.
    //
    // # Safety
    //
    // See `VTable::drop_future`. `meta` must be the header of an `AtomicFuture<F>`.
    unsafe fn drop_future(meta: NonNull<Meta>) {
        unsafe { ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.future) };
    }

    // Returns a type-erased pointer to the stored output.
    //
    // # Safety
    //
    // See `VTable::get_result`. `meta` must be the header of an `AtomicFuture<F>`.
    unsafe fn get_result(meta: NonNull<Meta>) -> *const () {
        unsafe { &*meta.cast::<Self>().as_mut().future.result as *const F::Output as *const () }
    }

    // Drops the stored output in place.
    //
    // # Safety
    //
    // See `VTable::drop_result`. `meta` must be the header of an `AtomicFuture<F>`.
    unsafe fn drop_result(meta: NonNull<Meta>) {
        unsafe { ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.result) };
    }
}
227
/// State Bits
//
// The flags occupy the top seven bits of the state word, one bit each, counting down from bit 63.
// All of the bits below the lowest flag hold the reference count.
//
// Bit 63. Exclusive access is gained by clearing this bit.
const INACTIVE: usize = 1 << 63;

// Bit 62. Set to indicate the future needs to be polled again.
const READY: usize = INACTIVE >> 1;

// Bit 61. Terminal state: the future is dropped upon entry to this state. When in this state,
// other bits can be set, including READY (which has no meaning).
const DONE: usize = READY >> 1;

// Bit 60. The task has been detached.
const DETACHED: usize = DONE >> 1;

// Bit 59. The task has been cancelled.
const ABORTED: usize = DETACHED >> 1;

// Bit 58. The task has an active guard that should be dropped when the task is next polled.
const WITH_ACTIVE_GUARD: usize = ABORTED >> 1;

// Bit 57. The result has been taken.
const RESULT_TAKEN: usize = WITH_ACTIVE_GUARD >> 1;

// The mask for the ref count: every bit below the lowest flag.
const REF_COUNT_MASK: usize = RESULT_TAKEN - 1;
254
/// The result of a call to `try_poll`.
/// This indicates the result of attempting to `poll` the future.
///
/// The common traits are derived so that results can be logged and compared directly (for
/// example with `assert_eq!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AttemptPollResult {
    /// The future was polled, but did not complete.
    Pending,
    /// The future was polled and finished by this thread.
    /// This result is normally used to trigger garbage-collection of the future.
    IFinished,
    /// The future was already completed by another thread.
    SomeoneElseFinished,
    /// The future was polled, did not complete, but it is woken whilst it is polled so it
    /// should be polled again.
    Yield,
    /// The future was aborted.
    Aborted,
}
271
/// The result of calling the `abort_and_detach` function.
///
/// The common traits are derived so that results can be logged and compared directly (for
/// example with `assert_eq!`).
#[must_use]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AbortAndDetachResult {
    /// The future has finished; it can be dropped.
    Done,

    /// The future needs to be added to a run queue to be aborted.
    AddToRunQueue,

    /// The future is soon to be aborted and nothing needs to be done.
    Pending,
}
284
285impl<'a> AtomicFutureHandle<'a> {
    /// Create a new `AtomicFuture`.
    ///
    /// The future starts out inactive with a single reference, which is owned by the returned
    /// handle. Both the future and its output must be `Send` and outlive `'a`.
    pub(crate) fn new<F: Future + Send + 'a>(scope: Option<ScopeHandle>, future: F) -> Self
    where
        F::Output: Send + 'a,
    {
        // SAFETY: This is safe because the future and output are both Send.
        unsafe { Self::new_local(scope, future) }
    }
294
295 /// Create a new `AtomicFuture` from a !Send future.
296 ///
297 /// # Safety
298 ///
299 /// The caller must uphold the Send requirements.
300 pub(crate) unsafe fn new_local<F: Future + 'a>(scope: Option<ScopeHandle>, future: F) -> Self
301 where
302 F::Output: 'a,
303 {
304 Self(
305 unsafe {
306 NonNull::new_unchecked(Box::into_raw(Box::new(AtomicFuture {
307 meta: Meta {
308 vtable: &AtomicFuture::<F>::VTABLE,
309 // The future is inactive and we start with a single reference.
310 state: AtomicUsize::new(1 | INACTIVE),
311 scope,
312 },
313 future: FutureOrResult { future: ManuallyDrop::new(future) },
314 })))
315 }
316 .cast::<Meta>(),
317 PhantomData,
318 )
319 }
320
321 fn meta(&self) -> &Meta {
322 // SAFETY: This is safe because we hold a reference count.
323 unsafe { self.0.as_ref() }
324 }
325
326 /// Returns the future's ID.
327 ///
328 /// The ID is only valid so long as there exists at least one live handle.
329 pub fn id(&self) -> usize {
330 // We use the address of the metadata as the ID since we know it's a stable heap address.
331 // We can't use Pin to guarantee it never moves because the actual pointer to the
332 // AtomicFuture is stored as a NonNull<Meta>.
333 //
334 // See https://github.com/rust-lang/rust/issues/54815 for an upstream feature request that
335 // would let us encode this in the types.
336 self.meta() as *const Meta as usize
337 }
338
    /// Returns the associated scope.
    ///
    /// # Panics
    ///
    /// Panics if the future was created without a scope.
    pub fn scope(&self) -> &ScopeHandle {
        self.meta().scope()
    }
343
    /// Attempt to poll the underlying future.
    ///
    /// `try_poll` ensures that the future is polled at least once more
    /// unless it has already finished.
    ///
    /// Returns:
    ///
    ///  * `SomeoneElseFinished` if the future was already `DONE`.
    ///  * `Aborted` if the future was marked as aborted; it is dropped without being polled.
    ///  * `IFinished` if this call polled the future to completion.
    ///  * `Pending` if the future was polled and is still pending, or if it could not be polled
    ///    because it is active elsewhere or was already marked ready.
    ///  * `Yield` if the future is still pending but was marked ready whilst it was being polled.
    ///
    /// If polling the future panics, the process is aborted.
    pub(crate) fn try_poll(&self, cx: &mut Context<'_>) -> AttemptPollResult {
        let meta = self.meta();
        let has_active_guard = loop {
            // Attempt to acquire sole responsibility for polling the future (by clearing the
            // INACTIVE bit) and also clear the READY and WITH_ACTIVE_GUARD bits at the same time.
            // We clear both so that we can track if they are set again whilst we are polling.
            let old = meta.state.fetch_and(!(INACTIVE | READY | WITH_ACTIVE_GUARD), Acquire);
            assert_ne!(old & REF_COUNT_MASK, 0);
            if old & DONE != 0 {
                // If the DONE bit is set, the WITH_ACTIVE_GUARD bit should be ignored; it may or
                // may not be set, but it doesn't reflect whether an active guard is held so even
                // though we just cleared it, we shouldn't release a guard here.
                return AttemptPollResult::SomeoneElseFinished;
            }
            let has_active_guard = old & WITH_ACTIVE_GUARD != 0;
            if old & INACTIVE != 0 {
                // We are now the (only) active worker, proceed to poll...
                if old & ABORTED != 0 {
                    if has_active_guard {
                        meta.scope().release_cancel_guard();
                    }
                    // The future was aborted.
                    // SAFETY: We have exclusive access.
                    unsafe {
                        self.drop_future_unchecked();
                    }
                    return AttemptPollResult::Aborted;
                }
                break has_active_guard;
            }
            // Future was already active; this shouldn't really happen because we shouldn't be
            // polling it from multiple threads at the same time. Still, we handle it by setting
            // the READY bit so that it gets polled again. We do this regardless of whether we
            // cleared the READY bit above.
            //
            // The WITH_ACTIVE_GUARD bit is restored too if we cleared it above.
            let old2 = meta.state.fetch_or(READY | (old & WITH_ACTIVE_GUARD), Relaxed);

            if old2 & DONE != 0 {
                // If `has_active_guard` is true, we are responsible for releasing a guard since it
                // means we cleared the `WITH_ACTIVE_GUARD` bit.
                if has_active_guard {
                    meta.scope().release_cancel_guard();
                }
                return AttemptPollResult::SomeoneElseFinished;
            }

            if has_active_guard && old2 & WITH_ACTIVE_GUARD != 0 {
                // Within the small window, something else gave this task an active guard, so we
                // must return one of them.
                meta.scope().release_cancel_guard();
            }

            // If the future is still active, or the future was already marked as ready, we can
            // just return and it will get polled again.
            if old2 & INACTIVE == 0 || old2 & READY != 0 {
                return AttemptPollResult::Pending;
            }
            // The worker finished, and we marked the future as ready, so we must try again because
            // the future won't be in a run queue.
        };

        // We cannot recover from panics. If the poll below unwinds, dropping `bomb` aborts the
        // process.
        let bomb = Bomb;

        // SAFETY: We have exclusive access because we cleared the INACTIVE state bit.
        let result = unsafe { (meta.vtable.poll)(meta.into(), cx) };

        // The poll returned normally, so disarm the bomb.
        std::mem::forget(bomb);

        if has_active_guard {
            meta.scope().release_cancel_guard();
        }

        if let Poll::Ready(()) = result {
            // The future will have been dropped, so we just need to set the state.
            //
            // This needs to be Release ordering because we need to synchronize with another thread
            // that takes or drops the result.
            let old = meta.state.fetch_or(DONE, Release);

            if old & WITH_ACTIVE_GUARD != 0 {
                // Whilst we were polling the task, it was given an active guard. We must return it
                // now.
                meta.scope().release_cancel_guard();
            }

            if old & DETACHED != 0 {
                // If the future is detached, we should eagerly drop the result. This can be
                // Relaxed ordering because the result was written by this thread.

                // SAFETY: The future has completed.
                unsafe {
                    meta.drop_result(Relaxed);
                }
            }
            // No one else will read `future` unless they see `INACTIVE`, which will never
            // happen again.
            AttemptPollResult::IFinished
        } else if meta.state.fetch_or(INACTIVE, Release) & READY == 0 {
            // We gave up exclusive access and nothing marked the future ready in the meantime.
            AttemptPollResult::Pending
        } else {
            // The future was marked ready whilst we were polling, so yield.
            AttemptPollResult::Yield
        }
    }
452
    /// Drops the future without checking its current state.
    ///
    /// This marks the future as `DONE` with its result already taken (there is no result to
    /// take), and releases a cancel guard if the `WITH_ACTIVE_GUARD` bit was set.
    ///
    /// # Panics
    ///
    /// This will panic if the future is already marked with `DONE`.
    ///
    /// # Safety
    ///
    /// This doesn't check the current state, so this must only be called if it is known that there
    /// is no concurrent access. This also does *not* include any memory barriers before dropping
    /// the future.
    pub(crate) unsafe fn drop_future_unchecked(&self) {
        // Set the state first in case we panic when we drop.
        let meta = self.meta();
        let old = meta.state.fetch_or(DONE | RESULT_TAKEN, Relaxed);
        assert_eq!(old & DONE, 0);
        if old & WITH_ACTIVE_GUARD != 0 {
            meta.scope().release_cancel_guard();
        }
        // SAFETY: DONE was not set before, so the future has not been dropped yet, and the caller
        // guarantees that there is no concurrent access.
        unsafe { (meta.vtable.drop_future)(meta.into()) };
    }
474
475 /// Drops the future if it is not currently being polled. Returns success if the future was
476 /// dropped or was already dropped.
477 pub(crate) fn try_drop(&self) -> Result<(), ()> {
478 let old = self.meta().state.fetch_and(!INACTIVE, Acquire);
479 if old & DONE != 0 {
480 Ok(())
481 } else if old & INACTIVE != 0 {
482 // SAFETY: We have exclusive access.
483 unsafe {
484 self.drop_future_unchecked();
485 }
486 Ok(())
487 } else {
488 Err(())
489 }
490 }
491
492 /// Aborts the task. Returns true if the task needs to be added to a run queue.
493 #[must_use]
494 pub(crate) fn abort(&self) -> bool {
495 self.meta().state.fetch_or(ABORTED | READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE
496 }
497
498 /// Marks the task as detached.
499 pub(crate) fn detach(&self) {
500 let meta = self.meta();
501 let old = meta.state.fetch_or(DETACHED, Relaxed);
502
503 if old & (DONE | RESULT_TAKEN) == DONE {
504 // If the future is done, we should eagerly drop the result. This needs to be acquire
505 // ordering because another thread might have written the result.
506
507 // SAFETY: The future has completed.
508 unsafe {
509 meta.drop_result(Acquire);
510 }
511 }
512 }
513
514 /// Marks the task as aborted and detached (for when the caller isn't interested in waiting
515 /// for the cancellation to be finished). Returns true if the task should be added to a run
516 /// queue.
517 pub(crate) fn abort_and_detach(&self) -> AbortAndDetachResult {
518 let meta = self.meta();
519 let old_state = meta.state.fetch_or(ABORTED | DETACHED | READY, Relaxed);
520 if old_state & DONE != 0 {
521 // If the future is done, we should eagerly drop the result. This needs to be acquire
522 // ordering because another thread might have written the result.
523
524 // SAFETY: The future has completed.
525 unsafe {
526 meta.drop_result(Acquire);
527 }
528
529 AbortAndDetachResult::Done
530 } else if old_state & (INACTIVE | READY) == INACTIVE {
531 AbortAndDetachResult::AddToRunQueue
532 } else {
533 AbortAndDetachResult::Pending
534 }
535 }
536
537 /// Returns true if the task is detached.
538 pub(crate) fn is_detached(&self) -> bool {
539 self.meta().state.load(Relaxed) & DETACHED != 0
540 }
541
542 /// Returns true if the task is aborted.
543 pub(crate) fn is_aborted(&self) -> bool {
544 self.meta().state.load(Relaxed) & ABORTED != 0
545 }
546
547 /// Takes the result.
548 ///
549 /// # Safety
550 ///
551 /// The caller must guarantee that `R` is the correct type.
552 pub(crate) unsafe fn take_result<R>(&self) -> Option<R> {
553 // This needs to be Acquire ordering to synchronize with the polling thread.
554 let meta = self.meta();
555 if meta.state.load(Relaxed) & (DONE | RESULT_TAKEN) == DONE
556 && meta.state.fetch_or(RESULT_TAKEN, Acquire) & RESULT_TAKEN == 0
557 {
558 Some(unsafe { ((meta.vtable.get_result)(meta.into()) as *const R).read() })
559 } else {
560 None
561 }
562 }
563}
564
impl AtomicFutureHandle<'static> {
    /// Returns a waker for the future.
    ///
    /// The returned waker borrows from this handle and does not hold a reference count of its
    /// own. Cloning the underlying `Waker` produces a waker that does.
    pub(crate) fn waker(&self) -> BorrowedWaker<'_> {
        // The vtable for the borrowed waker: waking by value is the same as waking by reference
        // and dropping does nothing, because there is no reference count to consume or release.
        static BORROWED_WAKER_VTABLE: RawWakerVTable =
            RawWakerVTable::new(waker_clone, waker_wake_by_ref, waker_wake_by_ref, waker_noop);
        // The vtable for cloned wakers, which each own a reference count.
        static WAKER_VTABLE: RawWakerVTable =
            RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);

        // Takes a new reference for the clone, which always uses the owning vtable.
        fn waker_clone(raw_meta: *const ()) -> RawWaker {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(raw_meta as *const Meta) };
            meta.retain();
            RawWaker::new(raw_meta, &WAKER_VTABLE)
        }

        // Wakes the task, consuming the waker's reference: it is either handed over to the
        // handle given to the executor, or released.
        fn waker_wake(raw_meta: *const ()) {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(raw_meta as *const Meta) };
            if meta.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
                // This consumes the reference count.
                meta.scope().executor().task_is_ready(AtomicFutureHandle(
                    // SAFETY: We know raw_meta is not null.
                    unsafe { NonNull::new_unchecked(raw_meta as *mut Meta) },
                    PhantomData,
                ));
            } else {
                meta.release();
            }
        }

        // Wakes the task without consuming the waker's reference.
        fn waker_wake_by_ref(meta: *const ()) {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(meta as *const Meta) };
            // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
            unsafe {
                meta.wake();
            }
        }

        // Dropping the borrowed waker does nothing since it holds no reference.
        fn waker_noop(_meta: *const ()) {}

        // Releases the reference held by a cloned waker.
        fn waker_drop(meta: *const ()) {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(meta as *const Meta) };
            meta.release();
        }

        BorrowedWaker(
            // SAFETY: We meet the contract for RawWaker/RawWakerVtable.
            unsafe {
                Waker::from_raw(RawWaker::new(self.0.as_ptr() as *const (), &BORROWED_WAKER_VTABLE))
            },
            PhantomData,
        )
    }

    /// Wakes the future.
    ///
    /// This marks the future as ready and, if it was idle, passes a new handle for it to the
    /// executor.
    pub(crate) fn wake(&self) {
        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
        unsafe {
            self.meta().wake();
        }
    }

    /// Wakes the future with an active guard. Returns true if successful i.e. a guard needs to be
    /// acquired.
    ///
    /// NOTE: `Scope::release_cancel_guard` can be called *before* this function returns because the
    /// task can be polled on another thread. For this reason, the caller either needs to hold a
    /// lock, or it should preemptively take the guard.
    pub(crate) fn wake_with_active_guard(&self) -> bool {
        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
        unsafe { self.meta().wake_with_active_guard() }
    }
}
640
641impl<F: Future> Drop for AtomicFuture<F> {
642 fn drop(&mut self) {
643 let meta = &mut self.meta;
644 // This needs to be acquire ordering so that we see writes that might have just happened
645 // in another thread when the future was polled.
646 let state = meta.state.load(Acquire);
647 if state & DONE == 0 {
648 // SAFETY: The state isn't DONE so we must drop the future.
649 unsafe {
650 (meta.vtable.drop_future)(meta.into());
651 }
652 } else if state & RESULT_TAKEN == 0 {
653 // SAFETY: The result hasn't been taken so we must drop the result.
654 unsafe {
655 (meta.vtable.drop_result)(meta.into());
656 }
657 }
658 }
659}
660
/// A [`Waker`] borrowed from an `AtomicFutureHandle`, as returned by
/// `AtomicFutureHandle::waker`.
///
/// The lifetime `'a` ties the waker to the handle it was borrowed from. Dereference it to use
/// the wrapped [`Waker`].
#[derive(Debug)]
pub struct BorrowedWaker<'a>(std::task::Waker, PhantomData<&'a ()>);

impl Deref for BorrowedWaker<'_> {
    type Target = Waker;

    /// Returns the wrapped waker.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
670
// A guard that aborts the process when it is dropped. Create one before code that must not
// unwind and `std::mem::forget` it once that code has returned normally.
struct Bomb;

impl Drop for Bomb {
    fn drop(&mut self) {
        std::process::abort()
    }
}