fuchsia_async/runtime/fuchsia/executor/atomic_future.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

pub mod hooks;
pub mod spawnable_future;

use crate::ScopeHandle;
use futures::ready;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// A lock-free thread-safe future.
//
// The debugger knows this layout so that async backtraces work; if it changes, the debugger
// might need to be updated too.
//
// This is `repr(C)` so that we can cast between `NonNull<Meta>` and `NonNull<AtomicFuture<F>>`.
//
// LINT.IfChange
#[repr(C)]
struct AtomicFuture<F: Future> {
    meta: Meta,

    // `future` is safe to access after successfully clearing the INACTIVE state bit, provided
    // the `DONE` state bit isn't set.
    future: FutureOrResult<F>,
}
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
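
// The cast soundness noted above relies on `meta` being the first field of the
// `repr(C)` struct. The following check is an illustrative sketch added here
// (not part of the original source); it assumes a toolchain with
// `std::mem::offset_of!` (stable since Rust 1.77).
#[cfg(test)]
mod layout_tests {
    use super::*;

    #[test]
    fn meta_is_at_offset_zero() {
        // Any concrete future type works for checking the layout.
        type Fut = std::future::Ready<()>;
        assert_eq!(std::mem::offset_of!(AtomicFuture<Fut>, meta), 0);
    }
}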

/// A lock-free thread-safe future. The handles can be cloned.
#[derive(Debug)]
pub struct AtomicFutureHandle<'a>(NonNull<Meta>, PhantomData<&'a ()>);

/// `AtomicFutureHandle` is safe to access from multiple threads at once.
unsafe impl Sync for AtomicFutureHandle<'_> {}
unsafe impl Send for AtomicFutureHandle<'_> {}

impl Drop for AtomicFutureHandle<'_> {
    fn drop(&mut self) {
        self.meta().release();
    }
}

impl Clone for AtomicFutureHandle<'_> {
    fn clone(&self) -> Self {
        self.meta().retain();
        Self(self.0, PhantomData)
    }
}

impl PartialEq for AtomicFutureHandle<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0
    }
}

impl Eq for AtomicFutureHandle<'_> {}

impl Hash for AtomicFutureHandle<'_> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.hash(state);
    }
}

struct Meta {
    vtable: &'static VTable,

    // Holds the reference count and state bits (INACTIVE, READY, etc.).
    state: AtomicUsize,

    scope: Option<ScopeHandle>,
}

impl Meta {
    // # Safety
    //
    // This mints a handle with the 'static lifetime, so this should only be called from
    // `AtomicFutureHandle<'static>`.
    unsafe fn wake(&self) {
        // Schedule the task only if it was inactive and wasn't already marked ready or done.
        if self.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
            self.retain();
            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
        }
    }

    // Returns true if a guard should be acquired.
    //
    // # Safety
    //
    // This mints a handle with the 'static lifetime, so this should only be called from
    // `AtomicFutureHandle<'static>`.
    unsafe fn wake_with_active_guard(&self) -> bool {
        let old = self.state.fetch_or(READY | WITH_ACTIVE_GUARD, Relaxed);
        if old & (INACTIVE | READY | DONE) == INACTIVE {
            self.retain();
            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
        }

        // If the task is DONE, the guard won't be released, so we must let the caller know.
        old & (DONE | WITH_ACTIVE_GUARD) == 0
    }

    fn scope(&self) -> &ScopeHandle {
        self.scope.as_ref().unwrap()
    }

    fn retain(&self) {
        let old = self.state.fetch_add(1, Relaxed) & REF_COUNT_MASK;
        assert!(old != REF_COUNT_MASK);
    }

    fn release(&self) {
        // This can be Relaxed because there is a barrier in the drop function.
        let old = self.state.fetch_sub(1, Relaxed) & REF_COUNT_MASK;
        if old == 1 {
            // SAFETY: This is safe because we just released the last reference.
            unsafe {
                (self.vtable.drop)(self.into());
            }
        } else {
            // Check for underflow.
            assert!(old > 0);
        }
    }

    // # Safety
    //
    // The caller must know that the future has completed.
    unsafe fn drop_result(&self, ordering: Ordering) {
        // It's possible for this to race with another thread so we only drop the result if we are
        // successful in setting the RESULT_TAKEN bit.
        if self.state.fetch_or(RESULT_TAKEN, ordering) & RESULT_TAKEN == 0 {
            unsafe { (self.vtable.drop_result)(self.into()) };
        }
    }
}
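
// An illustrative sketch (added here, not part of the original module) of why
// the reference count and the state flags can share one atomic word: the count
// lives in the low bits under `REF_COUNT_MASK`, so `retain`/`release` style
// adds and subtracts never disturb the flag bits.
#[cfg(test)]
mod ref_count_packing_tests {
    use super::*;

    #[test]
    fn retain_and_release_preserve_flag_bits() {
        // Start as `new_local` does: one reference, INACTIVE set.
        let state = AtomicUsize::new(1 | INACTIVE);
        // A retain is a plain add of 1: the count becomes 2, flags unchanged.
        state.fetch_add(1, Relaxed);
        assert_eq!(state.load(Relaxed) & REF_COUNT_MASK, 2);
        assert_ne!(state.load(Relaxed) & INACTIVE, 0);
        // A release subtracts 1; only the count drops back to 1.
        state.fetch_sub(1, Relaxed);
        assert_eq!(state.load(Relaxed) & REF_COUNT_MASK, 1);
        assert_ne!(state.load(Relaxed) & INACTIVE, 0);
    }
}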

struct VTable {
    /// Drops the atomic future.
    ///
    /// # Safety
    ///
    /// The caller must ensure there are no other references, i.e. the reference count should be
    /// zero.
    // zxdb uses this method to figure out the concrete type of the future.
    // LINT.IfChange
    drop: unsafe fn(NonNull<Meta>),
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
    /// Drops the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future hasn't been dropped.
    drop_future: unsafe fn(NonNull<Meta>),
    /// Polls the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future hasn't been dropped and that the caller has exclusive
    /// access.
    poll: unsafe fn(NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()>,

    /// Gets the result.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future is finished and the result hasn't been taken or dropped.
    get_result: unsafe fn(NonNull<Meta>) -> *const (),

    /// Drops the result.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future is finished and the result hasn't already been taken or
    /// dropped.
    drop_result: unsafe fn(NonNull<Meta>),
}

union FutureOrResult<F: Future> {
    future: ManuallyDrop<F>,
    result: ManuallyDrop<F::Output>,
}

impl<F: Future> AtomicFuture<F> {
    const VTABLE: VTable = VTable {
        drop: Self::drop,
        drop_future: Self::drop_future,
        poll: Self::poll,
        get_result: Self::get_result,
        drop_result: Self::drop_result,
    };

    unsafe fn drop(meta: NonNull<Meta>) {
        drop(unsafe { Box::from_raw(meta.cast::<Self>().as_mut()) });
    }

    unsafe fn poll(meta: NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()> {
        let future = &mut unsafe { meta.cast::<Self>().as_mut() }.future;
        let result = ready!(unsafe { Pin::new_unchecked(&mut *future.future) }.poll(cx));
        // This might panic, which would leave us in a bad state. We deal with that by
        // aborting (see the `Bomb` in `try_poll` below).
        unsafe { ManuallyDrop::drop(&mut future.future) };
        future.result = ManuallyDrop::new(result);
        Poll::Ready(())
    }

    unsafe fn drop_future(meta: NonNull<Meta>) {
        unsafe { ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.future) };
    }

    unsafe fn get_result(meta: NonNull<Meta>) -> *const () {
        unsafe { &*meta.cast::<Self>().as_mut().future.result as *const F::Output as *const () }
    }

    unsafe fn drop_result(meta: NonNull<Meta>) {
        unsafe { ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.result) };
    }
}

/// State Bits
//
// Exclusive access is gained by clearing this bit.
const INACTIVE: usize = 1 << 63;

// Set to indicate the future needs to be polled again.
const READY: usize = 1 << 62;

// Terminal state: the future is dropped upon entry to this state. When in this state, other bits
// can be set, including READY (which has no meaning).
const DONE: usize = 1 << 61;

// The task has been detached.
const DETACHED: usize = 1 << 60;

// The task has been aborted.
const ABORTED: usize = 1 << 59;

// The task has an active guard that should be dropped when the task is next polled.
const WITH_ACTIVE_GUARD: usize = 1 << 58;

// The result has been taken.
const RESULT_TAKEN: usize = 1 << 57;

// The mask for the ref count.
const REF_COUNT_MASK: usize = RESULT_TAKEN - 1;
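
// A sanity check added as an illustrative sketch (not part of the original
// module): every state flag is a distinct single bit sitting above the
// reference-count mask, so bumping the count can never clobber a flag.
#[cfg(test)]
mod state_bit_tests {
    use super::*;

    #[test]
    fn state_bits_are_disjoint_from_ref_count() {
        let flags = [INACTIVE, READY, DONE, DETACHED, ABORTED, WITH_ACTIVE_GUARD, RESULT_TAKEN];
        let mut seen = 0;
        for flag in flags {
            // Each flag is a single bit that overlaps no other flag...
            assert_eq!(flag.count_ones(), 1);
            assert_eq!(seen & flag, 0);
            seen |= flag;
            // ...and never overlaps the reference count.
            assert_eq!(flag & REF_COUNT_MASK, 0);
        }
    }
}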

/// The result of a call to `try_poll`.
/// This indicates the result of attempting to `poll` the future.
pub enum AttemptPollResult {
    /// The future was polled, but did not complete.
    Pending,
    /// The future was polled and finished by this thread.
    /// This result is normally used to trigger garbage-collection of the future.
    IFinished,
    /// The future was already completed by another thread.
    SomeoneElseFinished,
    /// The future was polled and did not complete, but it was woken whilst being polled, so it
    /// should be polled again.
    Yield,
    /// The future was aborted.
    Aborted,
}

/// The result of calling the `abort_and_detach` function.
#[must_use]
pub enum AbortAndDetachResult {
    /// The future has finished; it can be dropped.
    Done,

    /// The future needs to be added to a run queue to be aborted.
    AddToRunQueue,

    /// The future is soon to be aborted and nothing needs to be done.
    Pending,
}
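
// An illustrative sketch (added here, not part of the original module) of the
// `Aborted` outcome: a task that is aborted while idle asks to be scheduled,
// and its next poll drops the future. A scope of `None` is fine because this
// path never touches the scope when no active guard is involved.
#[cfg(test)]
mod abort_tests {
    use super::*;

    #[test]
    fn aborted_task_drops_its_future_on_next_poll() {
        let handle = AtomicFutureHandle::new(None, std::future::pending::<()>());
        // A fresh task is INACTIVE and not READY, so the caller must schedule it.
        assert!(handle.abort());
        assert!(handle.is_aborted());
        // Assumes `Waker::noop` (stable since Rust 1.85).
        let mut cx = Context::from_waker(Waker::noop());
        assert!(matches!(handle.try_poll(&mut cx), AttemptPollResult::Aborted));
    }
}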

impl<'a> AtomicFutureHandle<'a> {
    /// Create a new `AtomicFuture`.
    pub(crate) fn new<F: Future + Send + 'a>(scope: Option<ScopeHandle>, future: F) -> Self
    where
        F::Output: Send + 'a,
    {
        // SAFETY: This is safe because the future and output are both Send.
        unsafe { Self::new_local(scope, future) }
    }

    /// Create a new `AtomicFuture` from a !Send future.
    ///
    /// # Safety
    ///
    /// The caller must uphold the Send requirements.
    pub(crate) unsafe fn new_local<F: Future + 'a>(scope: Option<ScopeHandle>, future: F) -> Self
    where
        F::Output: 'a,
    {
        Self(
            unsafe {
                NonNull::new_unchecked(Box::into_raw(Box::new(AtomicFuture {
                    meta: Meta {
                        vtable: &AtomicFuture::<F>::VTABLE,
                        // The future is inactive and we start with a single reference.
                        state: AtomicUsize::new(1 | INACTIVE),
                        scope,
                    },
                    future: FutureOrResult { future: ManuallyDrop::new(future) },
                })))
            }
            .cast::<Meta>(),
            PhantomData,
        )
    }

    fn meta(&self) -> &Meta {
        // SAFETY: This is safe because we hold a reference count.
        unsafe { self.0.as_ref() }
    }

    /// Returns the future's ID.
    ///
    /// The ID is only valid so long as there exists at least one live handle.
    pub fn id(&self) -> usize {
        // We use the address of the metadata as the ID since we know it's a stable heap address.
        // We can't use Pin to guarantee it never moves because the actual pointer to the
        // AtomicFuture is stored as a NonNull<Meta>.
        //
        // See https://github.com/rust-lang/rust/issues/54815 for an upstream feature request that
        // would let us encode this in the types.
        self.meta() as *const Meta as usize
    }

    /// Returns the associated scope.
    pub fn scope(&self) -> &ScopeHandle {
        self.meta().scope()
    }

    /// Attempt to poll the underlying future.
    ///
    /// `try_poll` ensures that the future is polled at least once more
    /// unless it has already finished.
    pub(crate) fn try_poll(&self, cx: &mut Context<'_>) -> AttemptPollResult {
        let meta = self.meta();
        let has_active_guard = loop {
            // Attempt to acquire sole responsibility for polling the future (by clearing the
            // INACTIVE bit) and also clear the READY and WITH_ACTIVE_GUARD bits at the same time.
            // We clear both so that we can track if they are set again whilst we are polling.
            let old = meta.state.fetch_and(!(INACTIVE | READY | WITH_ACTIVE_GUARD), Acquire);
            assert_ne!(old & REF_COUNT_MASK, 0);
            if old & DONE != 0 {
                // If the DONE bit is set, the WITH_ACTIVE_GUARD bit should be ignored; it may or
                // may not be set, but it doesn't reflect whether an active guard is held so even
                // though we just cleared it, we shouldn't release a guard here.
                return AttemptPollResult::SomeoneElseFinished;
            }
            let has_active_guard = old & WITH_ACTIVE_GUARD != 0;
            if old & INACTIVE != 0 {
                // We are now the (only) active worker, proceed to poll...
                if old & ABORTED != 0 {
                    if has_active_guard {
                        meta.scope().release_cancel_guard();
                    }
                    // The future was aborted.
                    // SAFETY: We have exclusive access.
                    unsafe {
                        self.drop_future_unchecked();
                    }
                    return AttemptPollResult::Aborted;
                }
                break has_active_guard;
            }
            // Future was already active; this shouldn't really happen because we shouldn't be
            // polling it from multiple threads at the same time. Still, we handle it by setting
            // the READY bit so that it gets polled again. We do this regardless of whether we
            // cleared the READY bit above.
            let old2 = meta.state.fetch_or(READY | (old & WITH_ACTIVE_GUARD), Relaxed);

            if old2 & DONE != 0 {
                // If `has_active_guard` is true, we are responsible for releasing a guard since it
                // means we cleared the `WITH_ACTIVE_GUARD` bit.
                if has_active_guard {
                    meta.scope().release_cancel_guard();
                }
                return AttemptPollResult::SomeoneElseFinished;
            }

            if has_active_guard && old2 & WITH_ACTIVE_GUARD != 0 {
                // Within the small window, something else gave this task an active guard, so we
                // must return one of them.
                meta.scope().release_cancel_guard();
            }

            // If the future is still active, or the future was already marked as ready, we can
            // just return and it will get polled again.
            if old2 & INACTIVE == 0 || old2 & READY != 0 {
                return AttemptPollResult::Pending;
            }
            // The worker finished, and we marked the future as ready, so we must try again because
            // the future won't be in a run queue.
        };

        // We cannot recover from panics.
        let bomb = Bomb;

        // SAFETY: We have exclusive access because we cleared the INACTIVE state bit.
        let result = unsafe { (meta.vtable.poll)(meta.into(), cx) };

        std::mem::forget(bomb);

        if has_active_guard {
            meta.scope().release_cancel_guard();
        }

        if let Poll::Ready(()) = result {
            // The future will have been dropped, so we just need to set the state.
            //
            // This needs to be Release ordering because we need to synchronize with another thread
            // that takes or drops the result.
            let old = meta.state.fetch_or(DONE, Release);

            if old & WITH_ACTIVE_GUARD != 0 {
                // Whilst we were polling the task, it was given an active guard. We must return it
                // now.
                meta.scope().release_cancel_guard();
            }

            if old & DETACHED != 0 {
                // If the future is detached, we should eagerly drop the result. This can be
                // Relaxed ordering because the result was written by this thread.

                // SAFETY: The future has completed.
                unsafe {
                    meta.drop_result(Relaxed);
                }
            }
            // No one else will read `future` unless they see `INACTIVE`, which will never
            // happen again.
            AttemptPollResult::IFinished
        } else if meta.state.fetch_or(INACTIVE, Release) & READY == 0 {
            AttemptPollResult::Pending
        } else {
            // The future was marked ready whilst we were polling, so yield.
            AttemptPollResult::Yield
        }
    }

    /// Drops the future without checking its current state.
    ///
    /// # Panics
    ///
    /// This will panic if the future is already marked with `DONE`.
    ///
    /// # Safety
    ///
    /// This doesn't check the current state, so this must only be called if it is known that there
    /// is no concurrent access. This also does *not* include any memory barriers before dropping
    /// the future.
    pub(crate) unsafe fn drop_future_unchecked(&self) {
        // Set the state first in case we panic when we drop.
        let meta = self.meta();
        let old = meta.state.fetch_or(DONE | RESULT_TAKEN, Relaxed);
        assert_eq!(old & DONE, 0);
        if old & WITH_ACTIVE_GUARD != 0 {
            meta.scope().release_cancel_guard();
        }
        unsafe { (meta.vtable.drop_future)(meta.into()) };
    }

    /// Drops the future if it is not currently being polled. Returns success if the future was
    /// dropped or was already dropped.
    pub(crate) fn try_drop(&self) -> Result<(), ()> {
        let old = self.meta().state.fetch_and(!INACTIVE, Acquire);
        if old & DONE != 0 {
            Ok(())
        } else if old & INACTIVE != 0 {
            // SAFETY: We have exclusive access.
            unsafe {
                self.drop_future_unchecked();
            }
            Ok(())
        } else {
            Err(())
        }
    }

    /// Aborts the task. Returns true if the task needs to be added to a run queue.
    #[must_use]
    pub(crate) fn abort(&self) -> bool {
        self.meta().state.fetch_or(ABORTED | READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE
    }

    /// Marks the task as detached.
    pub(crate) fn detach(&self) {
        let meta = self.meta();
        let old = meta.state.fetch_or(DETACHED, Relaxed);

        if old & (DONE | RESULT_TAKEN) == DONE {
            // If the future is done, we should eagerly drop the result. This needs to be acquire
            // ordering because another thread might have written the result.

            // SAFETY: The future has completed.
            unsafe {
                meta.drop_result(Acquire);
            }
        }
    }

    /// Marks the task as aborted and detached (for when the caller isn't interested in waiting
    /// for the cancellation to be finished). Returns a value indicating whether the task should
    /// be added to a run queue.
    pub(crate) fn abort_and_detach(&self) -> AbortAndDetachResult {
        let meta = self.meta();
        let old_state = meta.state.fetch_or(ABORTED | DETACHED | READY, Relaxed);
        if old_state & DONE != 0 {
            // If the future is done, we should eagerly drop the result. This needs to be acquire
            // ordering because another thread might have written the result.

            // SAFETY: The future has completed.
            unsafe {
                meta.drop_result(Acquire);
            }

            AbortAndDetachResult::Done
        } else if old_state & (INACTIVE | READY) == INACTIVE {
            AbortAndDetachResult::AddToRunQueue
        } else {
            AbortAndDetachResult::Pending
        }
    }

    /// Returns true if the task is detached.
    pub(crate) fn is_detached(&self) -> bool {
        self.meta().state.load(Relaxed) & DETACHED != 0
    }

    /// Returns true if the task is aborted.
    pub(crate) fn is_aborted(&self) -> bool {
        self.meta().state.load(Relaxed) & ABORTED != 0
    }

    /// Takes the result.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `R` is the correct type.
    pub(crate) unsafe fn take_result<R>(&self) -> Option<R> {
        // This needs to be Acquire ordering to synchronize with the polling thread.
        let meta = self.meta();
        if meta.state.load(Relaxed) & (DONE | RESULT_TAKEN) == DONE
            && meta.state.fetch_or(RESULT_TAKEN, Acquire) & RESULT_TAKEN == 0
        {
            Some(unsafe { ((meta.vtable.get_result)(meta.into()) as *const R).read() })
        } else {
            None
        }
    }
}
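
// Illustrative tests (sketches added here, not part of the original module)
// exercising the lifecycle above. They pass `None` for the scope, which is
// fine because these paths never touch the scope, and they assume
// `Waker::noop` (stable since Rust 1.85).
#[cfg(test)]
mod handle_tests {
    use super::*;

    // Poll a trivially ready future to completion, then take its result.
    #[test]
    fn poll_to_completion_and_take_result() {
        let handle = AtomicFutureHandle::new(None, async { 42u32 });
        let mut cx = Context::from_waker(Waker::noop());
        assert!(matches!(handle.try_poll(&mut cx), AttemptPollResult::IFinished));
        // SAFETY: `u32` is the output type of the future above.
        assert_eq!(unsafe { handle.take_result::<u32>() }, Some(42));
        // A second take finds RESULT_TAKEN already set.
        assert_eq!(unsafe { handle.take_result::<u32>() }, None);
    }

    // Detaching first makes the result get dropped eagerly on completion.
    #[test]
    fn detached_result_is_dropped_eagerly() {
        let handle = AtomicFutureHandle::new(None, async { 42u32 });
        handle.detach();
        assert!(handle.is_detached());
        let mut cx = Context::from_waker(Waker::noop());
        assert!(matches!(handle.try_poll(&mut cx), AttemptPollResult::IFinished));
        // SAFETY: `u32` is the correct output type; the detached task already
        // dropped the result when it completed, so there is nothing to take.
        assert_eq!(unsafe { handle.take_result::<u32>() }, None);
    }
}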

impl AtomicFutureHandle<'static> {
    /// Returns a waker for the future.
    pub(crate) fn waker(&self) -> BorrowedWaker<'_> {
        static BORROWED_WAKER_VTABLE: RawWakerVTable =
            RawWakerVTable::new(waker_clone, waker_wake_by_ref, waker_wake_by_ref, waker_noop);
        static WAKER_VTABLE: RawWakerVTable =
            RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);

        fn waker_clone(raw_meta: *const ()) -> RawWaker {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(raw_meta as *const Meta) };
            meta.retain();
            RawWaker::new(raw_meta, &WAKER_VTABLE)
        }

        fn waker_wake(raw_meta: *const ()) {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(raw_meta as *const Meta) };
            if meta.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
                // This consumes the reference count.
                meta.scope().executor().task_is_ready(AtomicFutureHandle(
                    // SAFETY: We know raw_meta is not null.
                    unsafe { NonNull::new_unchecked(raw_meta as *mut Meta) },
                    PhantomData,
                ));
            } else {
                meta.release();
            }
        }

        fn waker_wake_by_ref(meta: *const ()) {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(meta as *const Meta) };
            // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
            unsafe {
                meta.wake();
            }
        }

        fn waker_noop(_meta: *const ()) {}

        fn waker_drop(meta: *const ()) {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(meta as *const Meta) };
            meta.release();
        }

        BorrowedWaker(
            // SAFETY: We meet the contract for RawWaker/RawWakerVtable.
            unsafe {
                Waker::from_raw(RawWaker::new(self.0.as_ptr() as *const (), &BORROWED_WAKER_VTABLE))
            },
            PhantomData,
        )
    }

    /// Wakes the future.
    pub(crate) fn wake(&self) {
        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
        unsafe {
            self.meta().wake();
        }
    }

    /// Wakes the future with an active guard. Returns true on success, i.e. if a guard needs to
    /// be acquired.
    ///
    /// NOTE: `Scope::release_cancel_guard` can be called *before* this function returns because
    /// the task can be polled on another thread. For this reason, the caller either needs to hold
    /// a lock, or it should preemptively take the guard.
    pub(crate) fn wake_with_active_guard(&self) -> bool {
        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
        unsafe { self.meta().wake_with_active_guard() }
    }
}
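
// An illustrative sketch (added here, not part of the original module): once a
// task is DONE, its wakers degrade to reference-count bookkeeping, so waking a
// finished task never touches the scope. Assumes `Waker::noop` (stable since
// Rust 1.85).
#[cfg(test)]
mod waker_tests {
    use super::*;

    #[test]
    fn waking_a_finished_task_is_a_no_op() {
        let handle = AtomicFutureHandle::new(None, async {});
        let mut cx = Context::from_waker(Waker::noop());
        assert!(matches!(handle.try_poll(&mut cx), AttemptPollResult::IFinished));
        // Cloning the borrowed waker takes a new reference via the owned
        // vtable; waking after DONE just releases that reference.
        let owned: Waker = handle.waker().clone();
        owned.wake();
        // Waking by reference after DONE is likewise a no-op.
        handle.wake();
    }
}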

impl<F: Future> Drop for AtomicFuture<F> {
    fn drop(&mut self) {
        let meta = &mut self.meta;
        // This needs to be acquire ordering so that we see writes that might have just happened
        // in another thread when the future was polled.
        let state = meta.state.load(Acquire);
        if state & DONE == 0 {
            // SAFETY: The state isn't DONE so we must drop the future.
            unsafe {
                (meta.vtable.drop_future)(meta.into());
            }
        } else if state & RESULT_TAKEN == 0 {
            // SAFETY: The result hasn't been taken so we must drop the result.
            unsafe {
                (meta.vtable.drop_result)(meta.into());
            }
        }
    }
}

/// A waker that borrows a reference held by the task: dropping it is a no-op, while cloning it
/// takes a new reference via the owned waker vtable.
pub struct BorrowedWaker<'a>(std::task::Waker, PhantomData<&'a ()>);

impl Deref for BorrowedWaker<'_> {
    type Target = Waker;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

// Dropped only if polling panics; it aborts the process because a panic midway through the
// future-to-result transition would leave the task in an unrecoverable state.
struct Bomb;
impl Drop for Bomb {
    fn drop(&mut self) {
        std::process::abort();
    }
}
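
// The bomb idiom above, in isolation (an illustrative sketch added here, not
// part of the original module): a guard is armed before a step that must not
// unwind and defused with `mem::forget` once the step succeeds.
#[cfg(test)]
mod bomb_tests {
    use super::*;

    #[test]
    fn bomb_is_defused_on_success() {
        let bomb = Bomb;
        // ...a critical section that must not unwind would run here...
        std::mem::forget(bomb); // Defuse: `drop` (and the abort) never runs.
    }
}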