Skip to main content

fuchsia_async/runtime/fuchsia/executor/
scope.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::EHandle;
9use crate::condition::{Condition, ConditionGuard, WakerEntry};
10use core::{error, fmt};
11use fuchsia_sync::Mutex;
12use futures::Stream;
13use pin_project_lite::pin_project;
14use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
15use state::{JoinResult, ScopeState, ScopeWaker, Status, WakeVec};
16use std::any::Any;
17use std::borrow::Borrow;
18use std::collections::hash_map::Entry;
19use std::collections::hash_set;
20use std::future::{Future, IntoFuture};
21use std::hash;
22use std::marker::PhantomData;
23use std::mem::{self, ManuallyDrop};
24use std::ops::{Deref, DerefMut};
25use std::pin::Pin;
26use std::sync::{Arc, Weak};
27use std::task::{Context, Poll, Waker, ready};
28
29//
30// # Public API
31//
32
33/// A scope for managing async tasks. This scope is aborted when dropped.
34///
35/// Scopes are how fuchsia-async implements [structured concurrency][sc]. Every
36/// task is spawned on a scope, and runs until either the task completes or the
37/// scope is cancelled or aborted. In addition to owning tasks, scopes may own
38/// child scopes, forming a nested structure.
39///
40/// Scopes are usually joined or cancelled when the owning code is done with
41/// them. This makes it easier to reason about when a background task might
42/// still be running. Note that in multithreaded contexts it is safer to cancel
43/// and await a scope explicitly than to drop it, because the destructor is not
44/// synchronized with other threads that might be running a task.
45///
46/// [`Task::spawn`][crate::Task::spawn] and related APIs spawn on the root scope
47/// of the executor. New code is encouraged to spawn directly on scopes instead,
48/// passing their handles as a way of documenting when a function might spawn
49/// tasks that run in the background and reasoning about their side effects.
50///
51/// ## Scope lifecycle
52///
53/// When a scope is created it is open, meaning it accepts new tasks. Scopes are
54/// closed when one of the following happens:
55///
56/// 1. When [`close()`][Scope::close] is called.
57/// 2. When the scope is aborted or dropped, the scope is closed immediately.
58/// 3. When the scope is cancelled, the scope is closed when all active guards
59///    are dropped.
60/// 4. When the scope is joined and all tasks complete, the scope is closed
61///    before the join future resolves.
62///
63/// When a scope is closed it no longer accepts tasks. Tasks spawned on the
64/// scope are dropped immediately, and their [`Task`][crate::Task] or
65/// [`JoinHandle`][crate::JoinHandle] futures never resolve. This applies
66/// transitively to all child scopes. Closed scopes cannot currently be
67/// reopened.
68///
69/// Scopes can also be detached, in which case they are never closed, and run
70/// until the completion of all tasks.
71///
72/// [sc]: https://en.wikipedia.org/wiki/Structured_concurrency
73#[must_use = "Scopes should be explicitly awaited or cancelled"]
74#[derive(Debug)]
75pub struct Scope {
76    // LINT.IfChange
77    inner: ScopeHandle,
78    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
79}
80
81impl Default for Scope {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl Scope {
88    /// Create a new scope.
89    ///
90    /// The returned scope is a child of the current scope.
91    ///
92    /// # Panics
93    ///
94    /// May panic if not called in the context of an executor (e.g. within a
95    /// call to [`run`][crate::SendExecutor::run]).
96    pub fn new() -> Scope {
97        ScopeHandle::with_current(|handle| handle.new_child())
98    }
99
100    /// Create a new scope with a name.
101    ///
102    /// The returned scope is a child of the current scope.
103    ///
104    /// # Panics
105    ///
106    /// May panic if not called in the context of an executor (e.g. within a
107    /// call to [`run`][crate::SendExecutor::run]).
108    pub fn new_with_name(name: impl Into<String>) -> Scope {
109        ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
110    }
111
112    /// Get the scope of the current task, or the global scope if there is no task
113    /// being polled.
114    ///
115    /// # Panics
116    ///
117    /// May panic if not called in the context of an executor (e.g. within a
118    /// call to [`run`][crate::SendExecutor::run]).
119    pub fn current() -> ScopeHandle {
120        ScopeHandle::with_current(|handle| handle.clone())
121    }
122
123    /// Get the global scope of the executor.
124    ///
125    /// This can be used to spawn tasks that live as long as the executor.
126    /// Usually, this means until the end of the program or test. This should
127    /// only be done for tasks where this is expected. If in doubt, spawn on a
128    /// shorter lived scope instead.
129    ///
130    /// In code that uses scopes, you are strongly encouraged to use this API
131    /// instead of the spawn APIs on [`Task`][crate::Task].
132    ///
133    /// All scopes are descendants of the global scope.
134    ///
135    /// # Panics
136    ///
137    /// May panic if not called in the context of an executor (e.g. within a
138    /// call to [`run`][crate::SendExecutor::run]).
139    pub fn global() -> ScopeHandle {
140        EHandle::local().global_scope().clone()
141    }
142
143    /// Create a child scope.
144    pub fn new_child(&self) -> Scope {
145        self.inner.new_child()
146    }
147
148    /// Create a child scope with a name.
149    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
150        self.inner.new_child_with_name(name.into())
151    }
152
153    /// Returns the name of the scope.
154    pub fn name(&self) -> &str {
155        &self.inner.inner.name
156    }
157
158    /// Create a [`ScopeHandle`] that may be used to spawn tasks on this scope.
159    ///
160    /// This is a shorthand for `scope.as_handle().clone()`.
161    ///
162    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
163    /// available. Note that you should _not_ call `scope.clone()`, even though
164    /// the compiler allows it due to the Deref impl. Call this method instead.
165    pub fn to_handle(&self) -> ScopeHandle {
166        self.inner.clone()
167    }
168
169    /// Get a reference to a [`ScopeHandle`] that may be used to spawn tasks on
170    /// this scope.
171    ///
172    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
173    /// available. If you have a `Scope` but need a `&ScopeHandle`, prefer
174    /// calling this method over the less readable `&*scope`.
175    pub fn as_handle(&self) -> &ScopeHandle {
176        &self.inner
177    }
178
179    /// Wait for all tasks in the scope and its children to complete.
180    ///
181    /// New tasks will be accepted on the scope until every task completes and
182    /// this future resolves.
183    ///
184    /// Note that you can await a scope directly because it implements
185    /// `IntoFuture`. `scope.join().await` is a more explicit form of
186    /// `scope.await`.
187    pub fn join(self) -> Join {
188        Join::new(self)
189    }
190
191    /// Stop accepting new tasks on the scope. Returns a future that waits for
192    /// every task on the scope to complete.
193    pub fn close(self) -> Join {
194        self.inner.close();
195        Join::new(self)
196    }
197
198    /// Cancel all tasks cooperatively in the scope and its children
199    /// recursively.
200    ///
201    /// `cancel` first gives a chance to all child tasks (including tasks of
202    /// child scopes) to shutdown cleanly if they're holding on to a
203    /// [`ScopeActiveGuard`]. Once no child tasks are holding on to guards, then
204    /// `cancel` behaves like [`Scope::abort`], dropping all tasks and stopping
205    /// them from running at the next yield point. A [`ScopeActiveGuard`]
206    /// provides a cooperative cancellation signal that is triggered by this
207    /// call, see its documentation for more details.
208    ///
209    /// Once the returned future resolves, no task on the scope will be polled
210    /// again.
211    ///
212    /// Cancelling a scope _does not_ immediately prevent new tasks from being
213    /// accepted. New tasks are accepted as long as there are
214    /// `ScopeActiveGuard`s for this scope.
215    pub fn cancel(self) -> Join {
216        self.inner.cancel_all_tasks();
217        Join::new(self)
218    }
219
220    /// Cancel all tasks in the scope and its children recursively.
221    ///
222    /// Once the returned future resolves, no task on the scope will be polled
223    /// again. Unlike [`Scope::cancel`], this doesn't send a cooperative
224    /// cancellation signal to tasks or child scopes.
225    ///
226    /// When a scope is aborted it immediately stops accepting tasks. Handles of
227    /// tasks spawned on the scope will pend forever.
228    ///
229    /// Dropping the `Scope` object is equivalent to calling this method and
230    /// discarding the returned future. Awaiting the future is preferred because
231    /// it eliminates the possibility of a task poll completing on another
232    /// thread after the scope object has been dropped, which can sometimes
233    /// result in surprising behavior.
234    pub fn abort(self) -> impl Future<Output = ()> {
235        self.inner.abort_all_tasks();
236        Join::new(self)
237    }
238
239    /// Detach the scope, allowing its tasks to continue running in the
240    /// background.
241    ///
242    /// Tasks of a detached scope are still subject to join and cancel
243    /// operations on parent scopes.
244    pub fn detach(self) {
245        // Use ManuallyDrop to destructure self, because Rust doesn't allow this
246        // for types which implement Drop.
247        let this = ManuallyDrop::new(self);
248        // SAFETY: this.inner is obviously valid, and we don't access `this`
249        // after moving.
250        mem::drop(unsafe { std::ptr::read(&this.inner) });
251    }
252}
253
254/// Abort the scope and all of its tasks. Prefer using the [`Scope::abort`]
255/// or [`Scope::join`] methods.
256impl Drop for Scope {
257    fn drop(&mut self) {
258        // Abort all tasks in the scope. Each task has a strong reference to the ScopeState,
259        // which will be dropped after all the tasks in the scope are dropped.
260
261        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
262        // here, but we cannot do that without either:
263        // - Sync drop support in AtomicFuture, or
264        // - The ability to reparent tasks, which requires atomic_arc or
265        //   acquiring a mutex during polling.
266        self.inner.abort_all_tasks();
267    }
268}
269
270impl IntoFuture for Scope {
271    type Output = ();
272    type IntoFuture = Join;
273    fn into_future(self) -> Self::IntoFuture {
274        self.join()
275    }
276}
277
278impl Deref for Scope {
279    type Target = ScopeHandle;
280    fn deref(&self) -> &Self::Target {
281        &self.inner
282    }
283}
284
285impl Borrow<ScopeHandle> for Scope {
286    fn borrow(&self) -> &ScopeHandle {
287        self
288    }
289}
290
291pin_project! {
292    /// Join operation for a [`Scope`].
293    ///
294    /// This is a future that resolves when all tasks on the scope are complete
295    /// or have been cancelled. New tasks will be accepted on the scope until
296    /// every task completes and this future resolves.
297    ///
298    /// When this object is dropped, the scope and all tasks in it are
299    /// cancelled.
300    //
301    // Note: The drop property is only true when S = Scope; it does not apply to
302    // other (non-public) uses of this struct where S = ScopeHandle.
303    pub struct Join<S = Scope> {
304        scope: S,
305        #[pin]
306        waker_entry: WakerEntry<ScopeState>,
307    }
308}
309
310impl<S: Borrow<ScopeHandle>> Join<S> {
311    fn new(scope: S) -> Self {
312        let waker_entry = scope.borrow().inner.state.waker_entry();
313        Self { scope, waker_entry }
314    }
315
316    /// Aborts the scope. The future will resolve when all tasks have finished
317    /// polling.
318    ///
319    /// See [`Scope::abort`] for more details.
320    pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
321        self.scope.borrow().abort_all_tasks();
322        self
323    }
324
325    /// Cancel the scope. The future will resolve when all tasks have finished
326    /// polling.
327    ///
328    /// See [`Scope::cancel`] for more details.
329    pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
330        self.scope.borrow().cancel_all_tasks();
331        self
332    }
333}
334
335impl<S> Future for Join<S>
336where
337    S: Borrow<ScopeHandle>,
338{
339    type Output = ();
340    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341        let this = self.project();
342        let mut state = Borrow::borrow(&*this.scope).lock();
343        if state.has_tasks() {
344            state.add_waker(this.waker_entry, cx.waker().clone());
345            Poll::Pending
346        } else {
347            state.mark_finished();
348            Poll::Ready(())
349        }
350    }
351}
352
353/// Trait for things that can be spawned on to a scope.  There is a blanket implementation
354/// below for futures.
355pub trait Spawnable {
356    /// The type of value produced on completion.
357    type Output;
358
359    /// Converts to a task that can be spawned directly.
360    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
361}
362
363impl<F: Future + Send + 'static> Spawnable for F
364where
365    F::Output: Send + 'static,
366{
367    type Output = F::Output;
368
369    fn into_task(self, scope: ScopeHandle) -> TaskHandle {
370        scope.new_task(self)
371    }
372}
373
374/// A handle to a scope, which may be used to spawn tasks.
375///
376/// ## Ownership and cycles
377///
378/// Tasks running on a `Scope` may hold a `ScopeHandle` to that scope. This does
379/// not create an ownership cycle because the task will drop the `ScopeHandle`
380/// once it completes or is cancelled.
381///
382/// Naturally, scopes containing tasks that never complete and that are never
383/// cancelled will never be freed. Holding a `ScopeHandle` does not contribute to
384/// this problem.
385#[derive(Clone)]
386pub struct ScopeHandle {
387    // LINT.IfChange
388    inner: Arc<ScopeInner>,
389    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
390}
391
392impl ScopeHandle {
393    /// Create a child scope.
394    pub fn new_child(&self) -> Scope {
395        self.new_child_inner(String::new())
396    }
397
398    /// Returns a reference to the instrument data.
399    pub fn instrument_data(&self) -> Option<&(dyn Any + Send + Sync)> {
400        self.inner.instrument_data.as_deref()
401    }
402
403    /// Create a child scope.
404    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
405        self.new_child_inner(name.into())
406    }
407
408    fn new_child_inner(&self, name: String) -> Scope {
409        let mut state = self.lock();
410        let child = ScopeHandle {
411            inner: Arc::new(ScopeInner {
412                executor: self.inner.executor.clone(),
413                state: Condition::new(ScopeState::new_child(
414                    self.clone(),
415                    &state,
416                    JoinResults::default().into(),
417                )),
418
419                instrument_data: self
420                    .inner
421                    .executor
422                    .instrument
423                    .as_ref()
424                    .map(|value| value.scope_created(&name, Some(self))),
425                name,
426            }),
427        };
428        let weak = child.downgrade();
429        state.insert_child(weak);
430        Scope { inner: child }
431    }
432
433    /// Spawn a new task on the scope.
434    // This does not have the must_use attribute because it's common to detach and the lifetime of
435    // the task is bound to the scope: when the scope is dropped, the task will be cancelled.
436    pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
437        let task = future.into_task(self.clone());
438        let task_handle = task.clone();
439        self.insert_task(task, false);
440        JoinHandle::new(self.clone(), task_handle)
441    }
442
443    /// Spawn a new task on the scope of a thread local executor.
444    ///
445    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
446    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
447    pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
448        let task = self.new_local_task(future);
449        self.insert_task(task.clone(), false);
450        JoinHandle::new(self.clone(), task)
451    }
452
453    /// Like `spawn`, but for tasks that return a result.
454    ///
455    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
456    /// *cancelled*.
457    pub fn compute<T: Send + 'static>(
458        &self,
459        future: impl Spawnable<Output = T> + Send + 'static,
460    ) -> crate::Task<T> {
461        let task = future.into_task(self.clone());
462        let task_handle = task.clone();
463        self.insert_task(task, false);
464        JoinHandle::new(self.clone(), task_handle).into()
465    }
466
467    /// Like `spawn`, but for tasks that return a result.
468    ///
469    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
470    /// *cancelled*.
471    ///
472    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
473    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
474    pub fn compute_local<T: 'static>(
475        &self,
476        future: impl Future<Output = T> + 'static,
477    ) -> crate::Task<T> {
478        let task = self.new_local_task(future);
479        self.insert_task(task.clone(), false);
480        JoinHandle::new(self.clone(), task).into()
481    }
482
483    pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
484        ScopeHandle {
485            inner: Arc::new(ScopeInner {
486                state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
487                name: "root".to_string(),
488                instrument_data: executor
489                    .instrument
490                    .as_ref()
491                    .map(|value| value.scope_created("root", None)),
492                executor,
493            }),
494        }
495    }
496
497    /// Stop the scope from accepting new tasks.
498    ///
499    /// Note that unlike [`Scope::close`], this does not return a future that
500    /// waits for all tasks to complete. This could lead to resource leaks
501    /// because it is not uncommon to access a TaskGroup from a task running on
502    /// the scope itself. If such a task were to await a future returned by this
503    /// method it would suspend forever waiting for itself to complete.
504    pub fn close(&self) {
505        self.lock().close();
506    }
507
508    /// Cancel all the scope's tasks.
509    ///
510    /// Note that if this is called from within a task running on the scope, the
511    /// task will not resume from the next await point.
512    pub fn cancel(self) -> Join<Self> {
513        self.cancel_all_tasks();
514        Join::new(self)
515    }
516
517    /// Aborts all the scope's tasks.
518    ///
519    /// Note that if this is called from within a task running on the scope, the
520    /// task will not resume from the next await point.
521    pub fn abort(self) -> impl Future<Output = ()> {
522        self.abort_all_tasks();
523        Join::new(self)
524    }
525
526    /// Retrieves a [`ScopeActiveGuard`] for this scope.
527    ///
528    /// Note that this may fail if cancellation has already started for this
529    /// scope. In that case, the caller must assume any tasks from this scope
530    /// may be dropped at any yield point.
531    ///
532    /// Creating a [`ScopeActiveGuard`] is substantially more expensive than
533    /// just polling it, so callers should maintain the returned guard when
534    /// success is observed from this call for best performance.
535    ///
536    /// See [`Scope::cancel`] for details on cooperative cancellation behavior.
537    #[must_use]
538    pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
539        ScopeActiveGuard::new(self)
540    }
541
542    /// Returns true if the scope has been signaled to exit via
543    /// [`Scope::cancel`] or [`Scope::abort`].
544    pub fn is_cancelled(&self) -> bool {
545        self.lock().status().is_cancelled()
546    }
547
548    // Joining the scope could be allowed from a ScopeHandle, but the use case
549    // seems less common and more bug prone than cancelling. We don't allow this
550    // for the same reason we don't return a future from close().
551
552    /// Wait for there to be no tasks. This is racy: as soon as this returns it is possible for
553    /// another task to have been spawned on this scope.
554    pub async fn on_no_tasks(&self) {
555        self.inner
556            .state
557            .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
558            .await;
559    }
560
561    /// Wait for there to be no tasks and no guards. This is racy: as soon as this returns it is
562    /// possible for another task to have been spawned on this scope, or for there to be guards.
563    pub async fn on_no_tasks_and_guards(&self) {
564        self.inner
565            .state
566            .when(|state| {
567                if state.has_tasks() || state.guards() > 0 {
568                    Poll::Pending
569                } else {
570                    Poll::Ready(())
571                }
572            })
573            .await;
574    }
575
576    /// Wake all the scope's tasks so their futures will be polled again.
577    pub fn wake_all_with_active_guard(&self) {
578        self.lock().wake_all_with_active_guard();
579    }
580
581    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
582    /// That must be done separately.
583    pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(&self, fut: Fut) -> AtomicFutureHandle<'a>
584    where
585        Fut::Output: Send,
586    {
587        let mut task = AtomicFutureHandle::new(Some(self.clone()), fut);
588        if let Some(instrument) = &self.executor().instrument {
589            instrument.task_created(self, &mut task);
590        }
591        task
592    }
593
594    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
595    /// That must be done separately.
596    pub(crate) fn new_local_task<'a>(&self, fut: impl Future + 'a) -> AtomicFutureHandle<'a> {
597        // Check that the executor is local and that this is the executor thread.
598        if !self.executor().is_local() {
599            panic!(
600                "Error: called `new_local_task` on multithreaded executor. \
601                 Use `spawn` or a `LocalExecutor` instead."
602            );
603        }
604        assert_eq!(
605            self.executor().first_thread_id.get(),
606            Some(&std::thread::current().id()),
607            "Error: called `new_local_task` on a different thread to the executor",
608        );
609
610        // SAFETY: We've confirmed that the futures here will never be used across multiple threads,
611        // so the Send requirements that `new_local` requires should be met.
612        unsafe {
613            let mut task = AtomicFutureHandle::new_local(Some(self.clone()), fut);
614            if let Some(instrument) = &self.executor().instrument {
615                instrument.task_created(self, &mut task);
616            }
617            task
618        }
619    }
620}
621
622impl fmt::Debug for ScopeHandle {
623    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624        f.debug_struct("Scope").field("name", &self.inner.name).finish()
625    }
626}
627
628/// Similar to a scope but all futures spawned on the scope *must* finish with the same result type.
629/// That allows the scope to return a stream of results. Attempting to spawn tasks using
630/// `ScopeHandle::spawn` (or similar) will result in tasks that are immediately dropped (just as if
631/// the scope was closed).  Like a regular scope, the scope can be closed, at which point the stream
632/// will terminate once all the tasks have finished.  This is designed to be a fairly close drop-in
633/// replacement to `FuturesUnordered`, the principle difference being that the tasks run in parallel
634/// rather than just concurrently.  Another difference is that the futures don't need to be the same
635/// type; only the outputs do.  In all other respects, the scope operates like a regular scope i.e.
636/// it can have children, you can join them, cancel them, etc.
637pub struct ScopeStream<R> {
638    inner: ScopeHandle,
639    stream: Arc<Mutex<ResultsStreamInner<R>>>,
640}
641
642impl<R: Send + 'static> ScopeStream<R> {
643    /// Creates a new scope stream.
644    ///
645    /// The returned scope stream is a child of the current scope.
646    ///
647    /// # Panics
648    ///
649    /// May panic if not called in the context of an executor (e.g. within a
650    /// call to [`run`][crate::SendExecutor::run]).
651    pub fn new() -> (Self, ScopeStreamHandle<R>) {
652        Self::new_inner(String::new())
653    }
654
655    /// Creates a new scope stream with a name.
656    ///
657    /// The returned scope stream is a child of the current scope.
658    ///
659    /// # Panics
660    ///
661    /// May panic if not called in the context of an executor (e.g. within a
662    /// call to [`run`][crate::SendExecutor::run]).
663    pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
664        Self::new_inner(name.into())
665    }
666
667    fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
668        let this = ScopeHandle::with_current(|handle| {
669            let mut state = handle.lock();
670            let stream = Arc::default();
671            let child = ScopeHandle {
672                inner: Arc::new(ScopeInner {
673                    executor: handle.executor().clone(),
674                    state: Condition::new(ScopeState::new_child(
675                        handle.clone(),
676                        &state,
677                        Box::new(ResultsStream { inner: Arc::clone(&stream) }),
678                    )),
679                    instrument_data: handle
680                        .executor()
681                        .instrument
682                        .as_ref()
683                        .map(|value| value.scope_created(&name, Some(handle))),
684                    name,
685                }),
686            };
687            let weak = child.downgrade();
688            state.insert_child(weak);
689            ScopeStream { inner: child, stream }
690        });
691        let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
692        (this, handle)
693    }
694}
695
696impl<R> Drop for ScopeStream<R> {
697    fn drop(&mut self) {
698        // Cancel all tasks in the scope. Each task has a strong reference to the ScopeState,
699        // which will be dropped after all the tasks in the scope are dropped.
700
701        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
702        // here, but we cannot do that without either:
703        // - Sync drop support in AtomicFuture, or
704        // - The ability to reparent tasks, which requires atomic_arc or
705        //   acquiring a mutex during polling.
706        self.inner.abort_all_tasks();
707    }
708}
709
710impl<R: Send + 'static> Stream for ScopeStream<R> {
711    type Item = R;
712
713    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
714        let mut stream_inner = self.stream.lock();
715        match stream_inner.results.pop() {
716            Some(result) => Poll::Ready(Some(result)),
717            None => {
718                // Lock ordering: when results are posted, the state lock is taken first, so we must
719                // do the same.
720                drop(stream_inner);
721                let state = self.inner.lock();
722                let mut stream_inner = self.stream.lock();
723                match stream_inner.results.pop() {
724                    Some(result) => Poll::Ready(Some(result)),
725                    None => {
726                        if state.has_tasks() {
727                            stream_inner.waker = Some(cx.waker().clone());
728                            Poll::Pending
729                        } else {
730                            Poll::Ready(None)
731                        }
732                    }
733                }
734            }
735        }
736    }
737}
738
739impl<R> Deref for ScopeStream<R> {
740    type Target = ScopeHandle;
741    fn deref(&self) -> &Self::Target {
742        &self.inner
743    }
744}
745
746impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
747    fn borrow(&self) -> &ScopeHandle {
748        self
749    }
750}
751
752impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
753    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
754        let (stream, handle) = ScopeStream::new();
755        for fut in iter {
756            handle.push(fut);
757        }
758        stream.close();
759        stream
760    }
761}
762
763#[derive(Clone)]
764pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
765
766impl<R: Send> ScopeStreamHandle<R> {
767    pub fn push(&self, future: impl Spawnable<Output = R>) {
768        self.0.insert_task(future.into_task(self.0.clone()), true);
769    }
770}
771
772/// Holds a guard on the creating scope, holding off cancelation.
773///
774/// `ScopeActiveGuard` allows [`Scope`]s to perform cooperative cancellation.
775/// [`ScopeActiveGuard::on_cancel`] returns a future that resolves when
776/// [`Scope::cancel`] and [`ScopeHandle::cancel`] are called. That is the signal
777/// sent to cooperative tasks to stop doing work and finish.
778///
779/// A `ScopeActiveGuard` is obtained via [`ScopeHandle::active_guard`].
780/// `ScopeActiveGuard` releases the guard on the originating scope on drop.
781#[derive(Debug)]
782#[must_use]
783pub struct ScopeActiveGuard(ScopeHandle);
784
785impl Deref for ScopeActiveGuard {
786    type Target = ScopeHandle;
787    fn deref(&self) -> &Self::Target {
788        &self.0
789    }
790}
791
792impl Drop for ScopeActiveGuard {
793    fn drop(&mut self) {
794        let Self(scope) = self;
795        scope.release_cancel_guard();
796    }
797}
798
799impl Clone for ScopeActiveGuard {
800    fn clone(&self) -> Self {
801        self.0.lock().acquire_cancel_guard(1);
802        Self(self.0.clone())
803    }
804}
805
806impl ScopeActiveGuard {
807    /// Returns a borrow of the scope handle associated with this guard.
808    pub fn as_handle(&self) -> &ScopeHandle {
809        &self.0
810    }
811
812    /// Returns a clone of the scope handle associated with this guard.
813    pub fn to_handle(&self) -> ScopeHandle {
814        self.0.clone()
815    }
816
817    /// Retrieves a future from this guard that can be polled on for
818    /// cancellation.
819    ///
820    /// The returned future resolves when the scope is cancelled. Callers should
821    /// perform teardown and drop the guard when done.
822    pub async fn on_cancel(&self) {
823        self.0
824            .inner
825            .state
826            .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
827            .await
828    }
829
830    fn new(scope: &ScopeHandle) -> Option<Self> {
831        if scope.lock().acquire_cancel_guard_if_not_finished() {
832            Some(Self(scope.clone()))
833        } else {
834            None
835        }
836    }
837}
838
/// Error returned when a task did not run to completion.
pub struct JoinError {
    // Private marker field: keeps the type from being constructed outside this module.
    _phantom: PhantomData<()>,
}

impl fmt::Debug for JoinError {
    /// Writes just the type name, `JoinError`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("JoinError")
    }
}

impl fmt::Display for JoinError {
    /// Writes a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("JoinError: a task failed to execute to completion")
    }
}

impl error::Error for JoinError {}
857
858//
859// # Internal API
860//
861
862/// A weak reference to a scope.
863#[derive(Clone)]
864struct WeakScopeHandle {
865    inner: Weak<ScopeInner>,
866}
867
868impl WeakScopeHandle {
869    /// Upgrades to a [`ScopeHandle`] if the scope still exists.
870    pub fn upgrade(&self) -> Option<ScopeHandle> {
871        self.inner.upgrade().map(|inner| ScopeHandle { inner })
872    }
873}
874
875impl hash::Hash for WeakScopeHandle {
876    fn hash<H: hash::Hasher>(&self, state: &mut H) {
877        Weak::as_ptr(&self.inner).hash(state);
878    }
879}
880
881impl PartialEq for WeakScopeHandle {
882    fn eq(&self, other: &Self) -> bool {
883        Weak::ptr_eq(&self.inner, &other.inner)
884    }
885}
886
887impl Eq for WeakScopeHandle {
888    // Weak::ptr_eq should return consistent results, even when the inner value
889    // has been dropped.
890}
891
892// This module exists as a privacy boundary so that we can make sure any
893// operation that might cause the scope to finish also wakes its waker.
894mod state {
895    use super::*;
896
    /// The bookkeeping kept for a single scope.
    ///
    /// Most fields are private to this module; code outside it goes through the methods on
    /// `ScopeState` and `ScopeWaker`.
    pub struct ScopeState {
        /// Handle to the parent scope; `None` when the state was created via `new_root`.
        pub parent: Option<ScopeHandle>,
        // LINT.IfChange
        children: HashSet<WeakScopeHandle>,
        all_tasks: HashSet<TaskHandle>,
        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
        /// The number of children that transitively contain tasks, plus one for
        /// this scope if it directly contains tasks.
        subscopes_with_tasks: u32,
        /// Whether `insert_task` may accept new tasks; cleared by `close` and `mark_finished`.
        can_spawn: bool,
        /// Number of outstanding cancel guards (see `acquire_cancel_guard` and
        /// `release_cancel_guard`).
        guards: u32,
        /// Current lifecycle status of the scope.
        status: Status,
        /// Wakers/results for joining each task.
        pub results: Box<dyn Results>,
    }
912
    /// The per-task join state stored by `JoinResults`.
    pub enum JoinResult {
        /// The task has not finished yet; holds the waker registered by the last join poll.
        Waker(Waker),
        /// The task has finished and its result has not been collected or detached yet.
        Ready,
    }
917
918    #[repr(u8)] // So zxdb can read the status.
919    #[derive(Default, Debug, Clone, Copy)]
920    pub enum Status {
921        #[default]
922        /// The scope is active.
923        Active,
924        /// The scope has been signalled to cancel and is waiting for all guards
925        /// to be released.
926        PendingCancellation,
927        /// The scope is not accepting new tasks and all tasks have been
928        /// scheduled to be dropped.
929        Finished,
930    }
931
932    impl Status {
933        /// Returns whether this records a cancelled state.
934        pub fn is_cancelled(&self) -> bool {
935            match self {
936                Self::Active => false,
937                Self::PendingCancellation | Self::Finished => true,
938            }
939        }
940    }
941
942    impl ScopeState {
943        pub fn new_root(results: Box<impl Results>) -> Self {
944            Self {
945                parent: None,
946                children: Default::default(),
947                all_tasks: Default::default(),
948                subscopes_with_tasks: 0,
949                can_spawn: true,
950                guards: 0,
951                status: Default::default(),
952                results,
953            }
954        }
955
956        pub fn new_child(
957            parent_handle: ScopeHandle,
958            parent_state: &Self,
959            results: Box<impl Results>,
960        ) -> Self {
961            let (status, can_spawn) = match parent_state.status {
962                Status::Active => (Status::Active, parent_state.can_spawn),
963                Status::Finished | Status::PendingCancellation => (Status::Finished, false),
964            };
965            Self {
966                parent: Some(parent_handle),
967                children: Default::default(),
968                all_tasks: Default::default(),
969                subscopes_with_tasks: 0,
970                can_spawn,
971                guards: 0,
972                status,
973                results,
974            }
975        }
976    }
977
978    impl ScopeState {
979        pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
980            &self.all_tasks
981        }
982
983        /// Attempts to add a task to the scope. Returns the task if the scope cannot accept a task
984        /// (since it isn't safe to drop the task whilst the lock is held).
985        pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
986            if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
987                return Some(task);
988            }
989            if self.all_tasks.is_empty() && !self.register_first_task() {
990                return Some(task);
991            }
992            task.wake();
993            assert!(self.all_tasks.insert(task), "Task must be new");
994            None
995        }
996
997        pub fn children(&self) -> &HashSet<WeakScopeHandle> {
998            &self.children
999        }
1000
1001        pub fn insert_child(&mut self, child: WeakScopeHandle) {
1002            self.children.insert(child);
1003        }
1004
1005        pub fn remove_child(&mut self, child: &PtrKey) {
1006            let found = self.children.remove(child);
1007            // This should always succeed unless the scope is being dropped
1008            // (in which case children will be empty).
1009            assert!(found || self.children.is_empty());
1010        }
1011
1012        pub fn status(&self) -> Status {
1013            self.status
1014        }
1015
1016        pub fn guards(&self) -> u32 {
1017            self.guards
1018        }
1019
1020        pub fn close(&mut self) {
1021            self.can_spawn = false;
1022        }
1023
1024        pub fn mark_finished(&mut self) {
1025            self.can_spawn = false;
1026            self.status = Status::Finished;
1027        }
1028
1029        pub fn has_tasks(&self) -> bool {
1030            self.subscopes_with_tasks > 0
1031        }
1032
1033        pub fn wake_all_with_active_guard(&mut self) {
1034            let mut count = 0;
1035            for task in &self.all_tasks {
1036                if task.wake_with_active_guard() {
1037                    count += 1;
1038                }
1039            }
1040            self.acquire_cancel_guard(count);
1041        }
1042
1043        pub fn abort_tasks_and_mark_finished(&mut self) {
1044            for task in self.all_tasks() {
1045                if task.abort() {
1046                    task.scope().executor().ready_tasks.push(task.clone());
1047                }
1048                // Don't bother dropping tasks that are finished; the entire
1049                // scope is going to be dropped soon anyway.
1050            }
1051            self.mark_finished();
1052        }
1053
1054        pub fn wake_wakers_and_mark_pending(
1055            this: &mut ConditionGuard<'_, ScopeState>,
1056            wakers: &mut Vec<Waker>,
1057        ) {
1058            wakers.extend(this.drain_wakers());
1059            this.status = Status::PendingCancellation;
1060        }
1061
1062        /// Registers our first task with the parent scope.
1063        ///
1064        /// Returns false if the scope is not allowed to accept a task.
1065        #[must_use]
1066        fn register_first_task(&mut self) -> bool {
1067            if !self.can_spawn {
1068                return false;
1069            }
1070            let can_spawn = match &self.parent {
1071                Some(parent) => {
1072                    // If our parent already knows we have tasks, we can always
1073                    // spawn. Otherwise, we have to recurse.
1074                    self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
1075                }
1076                None => true,
1077            };
1078            if can_spawn {
1079                self.subscopes_with_tasks += 1;
1080                debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
1081            };
1082            can_spawn
1083        }
1084
1085        fn on_last_task_removed(
1086            this: &mut ConditionGuard<'_, ScopeState>,
1087            num_wakers_hint: usize,
1088            wakers: &mut Vec<Waker>,
1089        ) {
1090            debug_assert!(this.subscopes_with_tasks > 0);
1091            this.subscopes_with_tasks -= 1;
1092            if this.subscopes_with_tasks > 0 {
1093                wakers.reserve(num_wakers_hint);
1094                return;
1095            }
1096
1097            match &this.parent {
1098                Some(parent) => {
1099                    Self::on_last_task_removed(
1100                        &mut parent.lock(),
1101                        num_wakers_hint + this.waker_count(),
1102                        wakers,
1103                    );
1104                }
1105                None => wakers.reserve(num_wakers_hint),
1106            };
1107            wakers.extend(this.drain_wakers());
1108        }
1109
1110        /// Acquires a cancel guard IFF we're not in the finished state.
1111        ///
1112        /// Returns `true` if a guard was acquired.
1113        pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
1114            match self.status {
1115                Status::Active | Status::PendingCancellation => {
1116                    self.acquire_cancel_guard(1);
1117                    true
1118                }
1119                Status::Finished => false,
1120            }
1121        }
1122
1123        pub fn acquire_cancel_guard(&mut self, count: u32) {
1124            if count == 0 {
1125                return;
1126            }
1127            if self.guards == 0
1128                && let Some(parent) = self.parent.as_ref()
1129            {
1130                parent.acquire_cancel_guard();
1131            }
1132            self.guards += count;
1133        }
1134
1135        pub fn release_cancel_guard(
1136            this: &mut ConditionGuard<'_, Self>,
1137            wake_vec: &mut WakeVec,
1138            mut waker_count: usize,
1139        ) {
1140            this.guards = this.guards.checked_sub(1).expect("released non-acquired guard");
1141            if this.guards == 0 {
1142                waker_count += this.waker_count();
1143                this.on_zero_guards(wake_vec, waker_count);
1144                wake_vec.0.extend(this.drain_wakers())
1145            } else {
1146                wake_vec.0.reserve_exact(waker_count);
1147            }
1148        }
1149
1150        fn on_zero_guards(&mut self, wake_vec: &mut WakeVec, waker_count: usize) {
1151            match self.status {
1152                Status::Active => {}
1153                Status::PendingCancellation => {
1154                    self.abort_tasks_and_mark_finished();
1155                }
1156                // Acquiring and releasing guards post finished state is a
1157                // no-op.
1158                Status::Finished => {}
1159            }
1160            if let Some(parent) = &self.parent {
1161                ScopeState::release_cancel_guard(&mut parent.lock(), wake_vec, waker_count);
1162            }
1163        }
1164    }
1165
1166    #[derive(Default)]
1167    pub struct WakeVec(Vec<Waker>);
1168
1169    impl Drop for WakeVec {
1170        fn drop(&mut self) {
1171            for waker in self.0.drain(..) {
1172                waker.wake();
1173            }
1174        }
1175    }
1176
1177    // WakeVec *must* come after the guard because we want the guard to be dropped first.
1178    pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1179
1180    impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1181        fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1182            Self(value, WakeVec::default())
1183        }
1184    }
1185
1186    impl ScopeWaker<'_> {
1187        pub fn take_task(&mut self, task: &TaskHandle) -> Option<TaskHandle> {
1188            let task = self.all_tasks.take(task);
1189            if task.is_some() {
1190                self.on_task_removed(0);
1191            }
1192            task
1193        }
1194
1195        pub fn task_did_finish(&mut self, task: &TaskHandle) {
1196            if let Some(task) = self.all_tasks.take(task) {
1197                self.on_task_removed(1);
1198                if !task.is_detached() {
1199                    let maybe_waker = self.results.task_did_finish(task);
1200                    self.1.0.extend(maybe_waker);
1201                }
1202            }
1203        }
1204
1205        pub fn set_closed_and_drain(
1206            &mut self,
1207        ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
1208            self.close();
1209            let all_tasks = std::mem::take(&mut self.all_tasks);
1210            let results = self.results.take();
1211            if !all_tasks.is_empty() {
1212                self.on_task_removed(0)
1213            }
1214            let children = self.children.drain();
1215            (all_tasks, results, children)
1216        }
1217
1218        fn on_task_removed(&mut self, num_wakers_hint: usize) {
1219            if self.all_tasks.is_empty() {
1220                ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1.0)
1221            }
1222        }
1223
1224        pub fn wake_wakers_and_mark_pending(&mut self) {
1225            let Self(state, wakers) = self;
1226            ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
1227        }
1228    }
1229
1230    impl<'a> Deref for ScopeWaker<'a> {
1231        type Target = ConditionGuard<'a, ScopeState>;
1232
1233        fn deref(&self) -> &Self::Target {
1234            &self.0
1235        }
1236    }
1237
1238    impl DerefMut for ScopeWaker<'_> {
1239        fn deref_mut(&mut self) -> &mut Self::Target {
1240            &mut self.0
1241        }
1242    }
1243}
1244
/// The shared allocation behind `ScopeHandle` (strong) and `WeakScopeHandle` (weak).
struct ScopeInner {
    /// The executor associated with this scope (returned by `ScopeHandle::executor`).
    executor: Arc<Executor>,
    /// The scope's state, accessed through `Condition::lock`.
    state: Condition<ScopeState>,
    name: String,
    instrument_data: Option<Box<dyn Any + Send + Sync>>,
}
1251
impl Drop for ScopeInner {
    /// Unregisters this scope from its parent, if it has one.
    fn drop(&mut self) {
        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
        // This also complies with the correctness requirements of
        // HashSet::remove because the implementations of Hash and Eq match
        // between PtrKey and WeakScopeHandle.
        let key = unsafe { &*(self as *const _ as *const PtrKey) };
        let state = self.state.lock();
        if let Some(parent) = &state.parent {
            // Declared before `parent_state` so that it is dropped after it: the collected
            // wakers are woken only once the parent's lock guard has been released.
            let mut wake_vec = WakeVec::default();
            let mut parent_state = parent.lock();
            // While this scope's guard count is non-zero it holds one guard on its parent
            // (taken in `ScopeState::acquire_cancel_guard`), so give that guard back.
            if state.guards() != 0 {
                ScopeState::release_cancel_guard(&mut parent_state, &mut wake_vec, 0);
            }
            parent_state.remove_child(key);
        }
    }
}
1270
1271impl ScopeHandle {
1272    fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
1273        super::common::TaskHandle::with_current(|task| match task {
1274            Some(task) => f(task.scope()),
1275            None => f(EHandle::local().global_scope()),
1276        })
1277    }
1278
1279    fn lock(&self) -> ConditionGuard<'_, ScopeState> {
1280        self.inner.state.lock()
1281    }
1282
1283    fn downgrade(&self) -> WeakScopeHandle {
1284        WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
1285    }
1286
1287    #[inline(always)]
1288    pub(crate) fn executor(&self) -> &Arc<Executor> {
1289        &self.inner.executor
1290    }
1291
1292    /// Marks the task as detached.
1293    pub(crate) fn detach(&self, task: &TaskHandle) {
1294        let _maybe_task = {
1295            let mut state = self.lock();
1296            if let Some(task) = state.all_tasks().get(task) {
1297                task.detach();
1298            }
1299            state.results.detach(task)
1300        };
1301    }
1302
1303    /// Aborts the task.
1304    ///
1305    /// # Safety
1306    ///
1307    /// The caller must guarantee that `R` is the correct type.
1308    pub(crate) unsafe fn abort_task<R>(&self, task: &TaskHandle) -> Option<R> {
1309        let mut state = self.lock();
1310        if state.results.detach(task) {
1311            drop(state);
1312            return unsafe { task.take_result() };
1313        }
1314        state.all_tasks().get(task).and_then(|task| {
1315            if task.abort() {
1316                self.inner.executor.ready_tasks.push(task.clone());
1317            }
1318            unsafe { task.take_result() }
1319        })
1320    }
1321
1322    /// Aborts and detaches the task.
1323    pub(crate) fn abort_and_detach(&self, task: &TaskHandle) {
1324        let _tasks = {
1325            let mut state = ScopeWaker::from(self.lock());
1326            let maybe_task1 = state.results.detach(task);
1327            let mut maybe_task2 = None;
1328            if state.all_tasks().contains(task) {
1329                match task.abort_and_detach() {
1330                    AbortAndDetachResult::Done => maybe_task2 = state.take_task(task),
1331                    AbortAndDetachResult::AddToRunQueue => {
1332                        self.inner.executor.ready_tasks.push(task.clone());
1333                    }
1334                    AbortAndDetachResult::Pending => {}
1335                }
1336            }
1337            (maybe_task1, maybe_task2)
1338        };
1339    }
1340
1341    /// Polls for a join result for the given task ID.
1342    ///
1343    /// # Safety
1344    ///
1345    /// The caller must guarantee that `R` is the correct type.
1346    pub(crate) unsafe fn poll_join_result<R>(
1347        &self,
1348        task: &TaskHandle,
1349        cx: &mut Context<'_>,
1350    ) -> Poll<R> {
1351        let task = ready!(self.lock().results.poll_join_result(task, cx));
1352        match unsafe { task.take_result() } {
1353            Some(result) => Poll::Ready(result),
1354            None => {
1355                // The task has been aborted so all we can do is forever return pending.
1356                Poll::Pending
1357            }
1358        }
1359    }
1360
1361    /// Polls for a join result for the given task ID, or a `JoinError` if the
1362    /// task was canceled.
1363    ///
1364    /// # Safety
1365    ///
1366    /// The caller must guarantee that `R` is the correct type.
1367    pub(crate) unsafe fn try_poll_join_result<R>(
1368        &self,
1369        task: &TaskHandle,
1370        cx: &mut Context<'_>,
1371    ) -> Poll<Result<R, JoinError>> {
1372        let task = ready!(self.lock().results.poll_join_result(task, cx));
1373        match unsafe { task.take_result() } {
1374            Some(result) => Poll::Ready(Ok(result)),
1375            None => {
1376                if task.is_aborted() {
1377                    Poll::Ready(Err(JoinError { _phantom: PhantomData }))
1378                } else {
1379                    Poll::Pending
1380                }
1381            }
1382        }
1383    }
1384
1385    /// Polls for the task to be aborted.
1386    pub(crate) unsafe fn poll_aborted<R>(
1387        &self,
1388        task: &TaskHandle,
1389        cx: &mut Context<'_>,
1390    ) -> Poll<Option<R>> {
1391        let task = self.lock().results.poll_join_result(task, cx);
1392        task.map(|task| unsafe { task.take_result() })
1393    }
1394
1395    /// Returns `true` if the task is accepted.
1396    pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1397        let returned_task = self.lock().insert_task(task, for_stream);
1398        if let Some(t) = returned_task {
1399            // The scope is not accepting new tasks at this time, but we want the user to see this
1400            // as if it accepted the task and then immediately aborted the task, which means we
1401            // must drop it here.  `try_drop` should succeed because we should be able to get
1402            // exclusive access to the task.
1403            t.try_drop().unwrap();
1404            false
1405        } else {
1406            true
1407        }
1408    }
1409
1410    /// Drops the specified task.
1411    ///
1412    /// The main task by the single-threaded executor might not be 'static, so we use this to drop
1413    /// the task and make sure we meet lifetime guarantees.  Note that removing the task from our
1414    /// task list isn't sufficient; we must make sure the future running in the task is dropped.
1415    ///
1416    /// # Safety
1417    ///
1418    /// This is unsafe because of the call to `drop_future_unchecked` which requires that no
1419    /// thread is currently polling the task.
1420    pub(super) unsafe fn drop_task_unchecked(&self, task: &TaskHandle) {
1421        let mut state = ScopeWaker::from(self.lock());
1422        let task = state.take_task(task);
1423        if let Some(task) = task {
1424            unsafe { task.drop_future_unchecked() };
1425        }
1426    }
1427
1428    pub(super) fn task_did_finish(&self, task: &TaskHandle) {
1429        let mut state = ScopeWaker::from(self.lock());
1430        state.task_did_finish(task);
1431    }
1432
1433    /// Visits scopes by state. If the callback returns `true`, children will
1434    /// be visited.
1435    fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
1436        let mut scopes = vec![self.clone()];
1437        while let Some(scope) = scopes.pop() {
1438            let mut scope_waker = ScopeWaker::from(scope.lock());
1439            if callback(&mut scope_waker) {
1440                scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
1441            }
1442        }
1443    }
1444
1445    fn acquire_cancel_guard(&self) {
1446        self.lock().acquire_cancel_guard(1)
1447    }
1448
1449    pub(crate) fn release_cancel_guard(&self) {
1450        let mut wake_vec = WakeVec::default();
1451        ScopeState::release_cancel_guard(&mut self.lock(), &mut wake_vec, 0);
1452    }
1453
1454    /// Cancels tasks in this scope and all child scopes.
1455    fn cancel_all_tasks(&self) {
1456        self.visit_scopes_locked(|state| {
1457            match state.status() {
1458                Status::Active => {
1459                    if state.guards() == 0 {
1460                        state.abort_tasks_and_mark_finished();
1461                    } else {
1462                        state.wake_wakers_and_mark_pending();
1463                    }
1464                    true
1465                }
1466                Status::PendingCancellation => {
1467                    // If we're already pending cancellation, don't wake all
1468                    // tasks. A single wake should be enough here. More
1469                    // wakes on further calls probably hides bugs.
1470                    true
1471                }
1472                Status::Finished => {
1473                    // Already finished.
1474                    false
1475                }
1476            }
1477        });
1478    }
1479
1480    /// Aborts tasks in this scope and all child scopes.
1481    fn abort_all_tasks(&self) {
1482        self.visit_scopes_locked(|state| match state.status() {
1483            Status::Active | Status::PendingCancellation => {
1484                state.abort_tasks_and_mark_finished();
1485                true
1486            }
1487            Status::Finished => false,
1488        });
1489    }
1490
1491    /// Drops tasks in this scope and all child scopes.
1492    ///
1493    /// # Panics
1494    ///
1495    /// Panics if any task is being accessed by another thread. Only call this
1496    /// method when the executor is shutting down and there are no other pollers.
1497    pub(super) fn drop_all_tasks(&self) {
1498        let mut scopes = vec![self.clone()];
1499        while let Some(scope) = scopes.pop() {
1500            let (tasks, join_results) = {
1501                let mut state = ScopeWaker::from(scope.lock());
1502                let (tasks, join_results, children) = state.set_closed_and_drain();
1503                scopes.extend(children.filter_map(|child| child.upgrade()));
1504                (tasks, join_results)
1505            };
1506            // Call task destructors once the scope lock is released so we don't risk a deadlock.
1507            for task in tasks {
1508                task.try_drop().expect("Expected drop to succeed");
1509            }
1510            std::mem::drop(join_results);
1511        }
1512    }
1513}
1514
/// Optimizes removal from parent scope.
///
/// A zero-sized key that is compared and hashed by its own address, so a child can be looked
/// up in its parent's set of `WeakScopeHandle`s from a pointer alone.
#[repr(transparent)]
struct PtrKey;
1518
1519impl Borrow<PtrKey> for WeakScopeHandle {
1520    fn borrow(&self) -> &PtrKey {
1521        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
1522        unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1523    }
1524}
1525
1526impl PartialEq for PtrKey {
1527    fn eq(&self, other: &Self) -> bool {
1528        std::ptr::eq(self, other)
1529    }
1530}
1531
1532impl Eq for PtrKey {}
1533
1534impl hash::Hash for PtrKey {
1535    fn hash<H: hash::Hasher>(&self, state: &mut H) {
1536        (self as *const PtrKey).hash(state);
1537    }
1538}
1539
/// `Results` implementation that records, per task, either the waker of a pending joiner or
/// the fact that the task has finished.
#[derive(Default)]
struct JoinResults(HashMap<TaskHandle, JoinResult>);
1542
/// Storage for the completion state of a scope's tasks.
///
/// `ScopeState` holds one boxed implementation in its `results` field.
trait Results: Send + Sync + 'static {
    /// Returns true if we allow spawning futures with arbitrary outputs on the scope.
    fn can_spawn(&self) -> bool;

    /// Polls for the specified task having finished.
    fn poll_join_result(&mut self, task: &TaskHandle, cx: &mut Context<'_>) -> Poll<TaskHandle>;

    /// Called when a task finishes.
    ///
    /// Any waker returned is queued by the caller to be woken.
    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;

    /// Called to drop any results for a particular task. Returns `true` if the
    /// task was ready.
    fn detach(&mut self, task: &TaskHandle) -> bool;

    /// Takes *all* the stored results.
    fn take(&mut self) -> Box<dyn Any>;

    /// Used only for testing.  Returns true if there are any results registered.
    #[cfg(test)]
    fn is_empty(&self) -> bool;
}
1564
1565impl Results for JoinResults {
1566    fn can_spawn(&self) -> bool {
1567        true
1568    }
1569
1570    fn poll_join_result(&mut self, task: &TaskHandle, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1571        match self.0.entry(task.clone()) {
1572            Entry::Occupied(mut o) => match o.get_mut() {
1573                JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1574                JoinResult::Ready => {
1575                    o.remove();
1576                    return Poll::Ready(task.clone());
1577                }
1578            },
1579            Entry::Vacant(v) => {
1580                v.insert(JoinResult::Waker(cx.waker().clone()));
1581            }
1582        }
1583        Poll::Pending
1584    }
1585
1586    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1587        match self.0.entry(task) {
1588            Entry::Occupied(mut o) => {
1589                let JoinResult::Waker(waker) = std::mem::replace(o.get_mut(), JoinResult::Ready)
1590                else {
1591                    // It can't be JoinResult::Ready because this function is the only
1592                    // function that sets that, and `task_did_finish` won't get called
1593                    // twice.
1594                    unreachable!()
1595                };
1596                Some(waker)
1597            }
1598            Entry::Vacant(v) => {
1599                v.insert(JoinResult::Ready);
1600                None
1601            }
1602        }
1603    }
1604
1605    fn detach(&mut self, task: &TaskHandle) -> bool {
1606        matches!(self.0.remove(task), Some(JoinResult::Ready))
1607    }
1608
1609    fn take(&mut self) -> Box<dyn Any> {
1610        Box::new(Self(std::mem::take(&mut self.0)))
1611    }
1612
1613    #[cfg(test)]
1614    fn is_empty(&self) -> bool {
1615        self.0.is_empty()
1616    }
1617}
1618
/// `Results` implementation that collects the results of finished tasks into a shared buffer.
#[derive(Default)]
struct ResultsStream<R> {
    /// Shared, mutex-protected buffer of results plus an optional waker.
    inner: Arc<Mutex<ResultsStreamInner<R>>>,
}
1623
/// Buffered results of type `R` plus the waker to hand out when a new result arrives.
struct ResultsStreamInner<R> {
    results: Vec<R>,
    waker: Option<Waker>,
}

// Implemented by hand so that no `R: Default` bound is required.
impl<R> Default for ResultsStreamInner<R> {
    /// Starts with no results and no waker.
    fn default() -> Self {
        Self { waker: None, results: Vec::new() }
    }
}
1634
1635impl<R: Send + 'static> Results for ResultsStream<R> {
1636    fn can_spawn(&self) -> bool {
1637        false
1638    }
1639
1640    fn poll_join_result(&mut self, _task: &TaskHandle, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1641        Poll::Pending
1642    }
1643
1644    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1645        let mut inner = self.inner.lock();
1646        // SAFETY: R is guaranteed to be the same return type as all futures finishing on this
1647        // scope.
1648        inner.results.extend(unsafe { task.take_result() });
1649        inner.waker.take()
1650    }
1651
1652    fn detach(&mut self, _task: &TaskHandle) -> bool {
1653        false
1654    }
1655
1656    fn take(&mut self) -> Box<dyn Any> {
1657        Box::new(std::mem::take(&mut self.inner.lock().results))
1658    }
1659
1660    #[cfg(test)]
1661    fn is_empty(&self) -> bool {
1662        false
1663    }
1664}
1665
1666#[cfg(test)]
1667mod tests {
1668    // NOTE: Tests that work on both the fuchsia and portable runtimes should be placed in
1669    // runtime/scope.rs.
1670
1671    use super::super::super::task::CancelableJoinHandle;
1672    use super::*;
1673    use crate::{
1674        EHandle, LocalExecutor, SendExecutorBuilder, SpawnableFuture, Task, TestExecutor, Timer,
1675        yield_now,
1676    };
1677    use fuchsia_sync::{Condvar, Mutex};
1678    use futures::channel::mpsc;
1679    use futures::{FutureExt, StreamExt};
1680    use std::future::pending;
1681    use std::pin::{Pin, pin};
1682    use std::sync::Arc;
1683    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1684    use std::task::{Context, Poll};
1685    use std::time::Duration;
1686
    /// Test future that stays pending until `resolve` is called on it.
    #[derive(Default)]
    struct RemoteControlFuture(Mutex<RCFState>);
    /// Mutable state behind a `RemoteControlFuture`.
    #[derive(Default)]
    struct RCFState {
        /// Set by `RemoteControlFuture::resolve`.
        resolved: bool,
        /// Waker stored by the most recent poll that returned `Pending`.
        waker: Option<Waker>,
    }
1694
1695    impl Future for &RemoteControlFuture {
1696        type Output = ();
1697        fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1698            let mut this = self.0.lock();
1699            if this.resolved {
1700                Poll::Ready(())
1701            } else {
1702                this.waker.replace(cx.waker().clone());
1703                Poll::Pending
1704            }
1705        }
1706    }
1707
1708    impl RemoteControlFuture {
1709        fn new() -> Arc<Self> {
1710            Arc::new(Default::default())
1711        }
1712
1713        fn resolve(&self) {
1714            let mut this = self.0.lock();
1715            this.resolved = true;
1716            if let Some(waker) = this.waker.take() {
1717                waker.wake();
1718            }
1719        }
1720
1721        fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> + use<> {
1722            let this = Arc::clone(self);
1723            #[allow(clippy::redundant_async_block)] // Allow returning `&*this` out of this fn.
1724            async move {
1725                (&*this).await
1726            }
1727        }
1728    }
1729
/// A computation spawned directly on the executor's global scope runs to completion.
#[test]
fn compute_works_on_root_scope() {
    let mut exec = TestExecutor::new();
    let root = exec.global_scope();
    let mut computation = pin!(root.compute(async { 1 }));
    assert_eq!(exec.run_until_stalled(&mut computation), Poll::Ready(1));
}
1737
/// A computation spawned on a freshly created, named child scope runs to completion.
#[test]
fn compute_works_on_new_child() {
    let mut exec = TestExecutor::new();
    let child = exec.global_scope().new_child_with_name("compute_works_on_new_child");
    let mut computation = pin!(child.compute(async { 1 }));
    assert_eq!(exec.run_until_stalled(&mut computation), Poll::Ready(1));
}
1745
/// Dropping a scope before its task has run leaves the task's handle pending forever.
#[test]
fn scope_drop_cancels_tasks() {
    let mut exec = TestExecutor::new();
    let child = exec.global_scope().new_child_with_name("scope_drop_cancels_tasks");
    let mut computation = pin!(child.compute(async { 1 }));
    // The scope goes away before the executor gets a chance to poll the task.
    drop(child);
    assert_eq!(exec.run_until_stalled(&mut computation), Poll::Pending);
}
1754
/// Once a scope has been cancelled, computations spawned through a surviving handle never
/// produce a value.
#[test]
fn tasks_do_not_spawn_on_cancelled_scopes() {
    let mut exec = TestExecutor::new();
    let owner =
        exec.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
    let spawner = owner.to_handle();
    let mut cancellation = pin!(owner.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancellation), Poll::Ready(()));
    // Spawning after cancellation: the handle still exists, but the task must not run.
    let mut late_task = pin!(spawner.compute(async { 1 }));
    assert_eq!(exec.run_until_stalled(&mut late_task), Poll::Pending);
}
1766
/// Closing a scope that has no tasks completes immediately, and computations spawned through
/// a surviving handle afterwards never produce a value.
#[test]
fn tasks_do_not_spawn_on_closed_empty_scopes() {
    let mut executor = TestExecutor::new();
    let scope =
        executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
    let handle = scope.to_handle();
    // This test is about `close()`, not `cancel()`; cancellation is covered by
    // `tasks_do_not_spawn_on_cancelled_scopes`.
    let mut close = pin!(scope.close());
    assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
    let mut task = pin!(handle.compute(async { 1 }));
    assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
}
1778
/// While a close is still waiting on an existing task, new computations spawned through a
/// handle do not run.
#[test]
fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
    let mut exec = TestExecutor::new();
    let owner = exec.global_scope().new_child();
    let spawner = owner.to_handle();
    // Keep the scope non-empty with a task that never finishes.
    spawner.spawn(pending());
    let mut closing = pin!(owner.close());
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Pending);
    let mut late_task = pin!(spawner.compute(async { 1 }));
    assert_eq!(exec.run_until_stalled(&mut late_task), Poll::Pending);
}
1790
/// Computations on nested scopes (child and grandchild) both run to completion.
#[test]
fn spawn_works_on_child_and_grandchild() {
    let mut exec = TestExecutor::new();
    let top = exec.global_scope().new_child();
    let middle = top.new_child();
    let bottom = middle.new_child();
    let mut middle_task = pin!(middle.compute(async { 1 }));
    let mut bottom_task = pin!(bottom.compute(async { 1 }));
    assert_eq!(exec.run_until_stalled(&mut middle_task), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut bottom_task), Poll::Ready(1));
}
1802
/// Dropping an ancestor scope prevents tasks on its child and grandchild from completing.
#[test]
fn spawn_drop_cancels_child_and_grandchild_tasks() {
    let mut exec = TestExecutor::new();
    let top = exec.global_scope().new_child();
    let middle = top.new_child();
    let bottom = middle.new_child();
    let mut middle_task = pin!(middle.compute(async { 1 }));
    let mut bottom_task = pin!(bottom.compute(async { 1 }));
    // Only the outermost scope is dropped; the descendants are still held by this test.
    drop(top);
    assert_eq!(exec.run_until_stalled(&mut middle_task), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut bottom_task), Poll::Pending);
}
1815
/// After aborting both a still-pending task and an already-finished one, the scope tracks no
/// tasks and retains no results.
#[test]
fn completed_tasks_are_cleaned_up_after_cancel() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();

    let never_done = scope.spawn(pending::<()>());
    let already_done = scope.spawn(async {});
    // Let the executor run both tasks; only the pending one should remain tracked.
    assert_eq!(exec.run_until_stalled(&mut pending::<()>()), Poll::Pending);
    assert_eq!(scope.lock().all_tasks().len(), 1);

    // Running the executor after cancelling the task isn't currently
    // necessary, but we might decide to do async cleanup in the future.
    assert_eq!(never_done.abort().now_or_never(), None);
    assert_eq!(already_done.abort().now_or_never(), Some(Some(())));

    assert_eq!(exec.run_until_stalled(&mut pending::<()>()), Poll::Pending);
    assert_eq!(scope.lock().all_tasks().len(), 0);
    assert!(scope.lock().results.is_empty());
}
1835
/// Joining a scope that never had any tasks completes immediately.
#[test]
fn join_emtpy_scope() {
    let mut exec = TestExecutor::new();
    let empty = exec.global_scope().new_child();
    assert_eq!(exec.run_until_stalled(&mut pin!(empty.join())), Poll::Ready(()));
}
1842
/// A task handle can still retrieve its result after a join of the owning scope has started.
#[test]
fn task_handle_preserves_access_to_result_after_join_begins() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let mut computed = scope.compute(async { 1 });
    scope.spawn(async {});
    let blocker = scope.spawn(pending::<()>());
    // Fuse to stay agnostic as to whether the join completes before or
    // after awaiting the task handle.
    let mut joining = pin!(scope.join().fuse());
    let _ = exec.run_until_stalled(&mut joining);
    assert_eq!(exec.run_until_stalled(&mut computed), Poll::Ready(1));
    // Abort the never-finishing task so the join has nothing left to wait for.
    drop(blocker.abort());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
}
1858
/// A join stays pending while any task is outstanding and completes once that task is aborted.
#[test]
fn join_blocks_until_task_is_cancelled() {
    // Scope with one outstanding task handle and one cancelled task.
    // The scope is not complete until the outstanding task handle is cancelled.
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let still_running = scope.spawn(pending::<()>());
    let aborted_early = scope.spawn(pending::<()>());
    assert_eq!(
        exec.run_until_stalled(&mut pin!(aborted_early.abort())),
        Poll::Ready(None)
    );
    let mut joining = pin!(scope.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    assert_eq!(
        exec.run_until_stalled(&mut pin!(still_running.abort())),
        Poll::Ready(None)
    );
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
}
1879
/// A join waits on a never-finishing detached task, but converting that join into a cancel
/// completes.
#[test]
fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    // The default is to detach.
    scope.spawn(pending::<()>());
    let mut joining = pin!(scope.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    let mut cancelling = pin!(joining.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Ready(()));
}
1891
/// A close waits on a never-finishing detached task, but converting that close into a cancel
/// completes.
#[test]
fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    // The default is to detach.
    scope.spawn(pending::<()>());
    let mut closing = pin!(scope.close());
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Pending);
    let mut cancelling = pin!(closing.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Ready(()));
}
1903
/// A join is pending until the spawned task's future is resolved; afterwards both the join
/// and the task handle are ready.
#[test]
fn join_scope_blocks_until_spawned_task_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    let mut spawned = scope.spawn(trigger.as_future());
    let mut joining = pin!(scope.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut spawned), Poll::Ready(()));
}
1916
/// A close is pending until the spawned task's future is resolved; afterwards both the close
/// and the task handle are ready.
#[test]
fn close_scope_blocks_until_spawned_task_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    let mut spawned = scope.spawn(trigger.as_future());
    let mut closing = pin!(scope.close());
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut spawned), Poll::Ready(()));
}
1929
/// Detaching a child scope does not release the parent's join: the parent still waits for the
/// child's task to finish.
#[test]
fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    let trigger = RemoteControlFuture::new();
    child.spawn(trigger.as_future());
    let mut parent_join = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
    child.detach();
    // Still pending: the detached child's task has not been resolved yet.
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
1945
/// A join waits for a task that was spawned from inside a detached scope which was itself
/// created by a task running on the joined scope.
#[test]
fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
    let mut exec = TestExecutor::new();
    let outer = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    let trigger_for_task = Arc::clone(&trigger);
    outer.spawn(async move {
        let nested = Scope::new_with_name("child");
        nested.spawn(async move {
            Scope::current().spawn(trigger_for_task.as_future());
        });
        nested.detach();
    });
    let mut outer_join = pin!(outer.join());
    assert_eq!(exec.run_until_stalled(&mut outer_join), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut outer_join), Poll::Ready(()));
}
1966
/// Detaching a child whose task never finishes leaves the parent's join pending.
#[test]
fn join_scope_blocks_when_blocked_child_is_detached() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    child.spawn(pending());
    let mut parent_join = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
    child.detach();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
}
1979
/// Once the child's pending join future goes out of scope, the parent's join completes.
#[test]
fn join_scope_completes_when_blocked_child_is_cancelled() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    child.spawn(pending());
    let mut parent_join = pin!(parent.join());
    {
        // The child's join future lives only for this block.
        let mut child_joining = pin!(child.join());
        assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
        assert_eq!(exec.run_until_stalled(&mut child_joining), Poll::Pending);
    }
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
1994
/// A handle to a detached scope can still spawn computations that complete.
#[test]
fn detached_scope_can_spawn() {
    let mut exec = TestExecutor::new();
    let owner = exec.global_scope().new_child();
    let spawner = owner.to_handle();
    owner.detach();
    assert_eq!(exec.run_until_stalled(&mut spawner.compute(async { 1 })), Poll::Ready(1));
}
2003
/// After the owning scope is dropped, computations spawned through a handle never complete.
#[test]
fn dropped_scope_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let owner = exec.global_scope().new_child();
    let spawner = owner.to_handle();
    drop(owner);
    assert_eq!(exec.run_until_stalled(&mut spawner.compute(async { 1 })), Poll::Pending);
}
2012
/// Dropping a scope that has an outstanding task still prevents later spawns through a handle.
#[test]
fn dropped_scope_with_running_task_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let owner = exec.global_scope().new_child();
    let spawner = owner.to_handle();
    // Held for the whole test so the task's join handle is not dropped early.
    let _outstanding = spawner.spawn(pending::<()>());
    drop(owner);
    assert_eq!(exec.run_until_stalled(&mut spawner.compute(async { 1 })), Poll::Pending);
}
2022
/// Once a join has completed, computations spawned through a handle never complete.
#[test]
fn joined_scope_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let owner = exec.global_scope().new_child();
    let spawner = owner.to_handle();
    let mut joining = pin!(owner.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut spawner.compute(async { 1 })), Poll::Pending);
}
2032
/// While a join is still waiting on a running task, new computations can be spawned and
/// complete.
#[test]
fn joining_scope_with_running_task_can_spawn() {
    let mut exec = TestExecutor::new();
    let owner = exec.global_scope().new_child();
    let spawner = owner.to_handle();
    // Keeps the join from completing.
    let _outstanding = spawner.spawn(pending::<()>());
    let mut joining = pin!(owner.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut spawner.compute(async { 1 })), Poll::Ready(1));
}
2043
/// After a parent scope has been joined, neither existing children, new children, nor new
/// grandchildren can run computations.
#[test]
fn joined_scope_child_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let parent_handle = parent.to_handle();
    let early_child = parent.new_child();
    // Before the join the child behaves normally.
    assert_eq!(
        exec.run_until_stalled(&mut early_child.compute(async { 1 })),
        Poll::Ready(1)
    );
    let mut parent_join = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
    let late_child = parent_handle.new_child();
    let late_grandchild = early_child.new_child();
    assert_eq!(
        exec.run_until_stalled(&mut early_child.compute(async { 1 })),
        Poll::Pending
    );
    assert_eq!(
        exec.run_until_stalled(&mut late_child.compute(async { 1 })),
        Poll::Pending
    );
    assert_eq!(
        exec.run_until_stalled(&mut late_grandchild.compute(async { 1 })),
        Poll::Pending
    );
}
2071
/// After a parent scope has been closed, neither existing children, new children, nor new
/// grandchildren can run computations.
#[test]
fn closed_scope_child_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let parent_handle = parent.to_handle();
    let early_child = parent.new_child();
    // Before the close the child behaves normally.
    assert_eq!(
        exec.run_until_stalled(&mut early_child.compute(async { 1 })),
        Poll::Ready(1)
    );
    let mut parent_close = pin!(parent.close());
    assert_eq!(exec.run_until_stalled(&mut parent_close), Poll::Ready(()));
    let late_child = parent_handle.new_child();
    let late_grandchild = early_child.new_child();
    assert_eq!(
        exec.run_until_stalled(&mut early_child.compute(async { 1 })),
        Poll::Pending
    );
    assert_eq!(
        exec.run_until_stalled(&mut late_child.compute(async { 1 })),
        Poll::Pending
    );
    assert_eq!(
        exec.run_until_stalled(&mut late_grandchild.compute(async { 1 })),
        Poll::Pending
    );
}
2099
/// Joining the child and then the parent both complete.
#[test]
fn can_join_child_first() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    assert_eq!(exec.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut pin!(parent.join())), Poll::Ready(()));
}
2109
/// Joining the parent and then the child both complete.
#[test]
fn can_join_parent_first() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    assert_eq!(exec.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut pin!(parent.join())), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
}
2119
/// A task running on the parent scope may await the join of a child scope; the parent's join
/// completes once the child's task is resolved.
#[test]
fn task_in_parent_scope_can_join_child() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    let trigger = RemoteControlFuture::new();
    child.spawn(trigger.as_future());
    // The child scope is moved into a task owned by the parent.
    parent.spawn(async move { child.join().await });
    let mut parent_join = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
2133
/// Holding the handle of a task does not block the join, and the handle still yields the
/// task's result afterwards.
#[test]
fn join_completes_while_completed_task_handle_is_held() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let mut computed = scope.compute(async { 1 });
    scope.spawn(async {});
    let mut joining = pin!(scope.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut computed), Poll::Ready(1));
}
2144
/// Cancelling completes even though the scope's only task owns a handle to that same scope
/// and never finishes.
#[test]
fn cancel_completes_while_task_holds_handle() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let self_handle = scope.to_handle();
    let mut stuck_task = scope.compute(async move {
        loop {
            pending::<()>().await; // never returns
            self_handle.spawn(async {});
        }
    });

    // Join should not complete because the task never does.
    let mut joining = pin!(scope.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);

    let mut cancelling = pin!(joining.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut stuck_task), Poll::Pending);
}
2165
/// A task that cancels its own scope through a handle empties the scope, and the task never
/// gets past the `cancel().await`.
#[test]
fn cancel_from_handle_inside_task() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    {
        // Spawn a task that never finishes until the scope is cancelled.
        scope.spawn(pending::<()>());

        let mut emptied = pin!(scope.on_no_tasks());
        assert_eq!(exec.run_until_stalled(&mut emptied), Poll::Pending);

        let self_handle = scope.to_handle();
        scope.spawn(async move {
            self_handle.cancel().await;
            panic!("cancel() should never complete");
        });

        assert_eq!(exec.run_until_stalled(&mut emptied), Poll::Ready(()));
    }
    assert_eq!(scope.join().now_or_never(), Some(()));
}
2187
/// A task spawned onto the global scope from a plain OS thread is run by the executor.
#[test]
fn can_spawn_from_non_executor_thread() {
    let mut exec = TestExecutor::new();
    let root = exec.global_scope().clone();
    let ran = Arc::new(AtomicBool::new(false));
    let ran_in_task = Arc::clone(&ran);
    // Spawn from another thread and wait for that thread to finish before running the executor.
    let spawner_thread = std::thread::spawn(move || {
        root.spawn(async move {
            ran_in_task.store(true, Ordering::Relaxed);
        })
    });
    let _ = spawner_thread.join();
    let _ = exec.run_until_stalled(&mut pending::<()>());
    assert!(ran.load(Ordering::Relaxed));
}
2203
/// Joins in a small scope tree complete bottom-up as the tasks in each subtree are resolved.
#[test]
fn scope_tree() {
    // A
    //  \
    //   B
    //  / \
    // C   D
    let mut exec = TestExecutor::new();
    let a = exec.global_scope().new_child();
    let b = a.new_child();
    let c = b.new_child();
    let d = b.new_child();
    let remote_a = RemoteControlFuture::new();
    let remote_c = RemoteControlFuture::new();
    let remote_d = RemoteControlFuture::new();
    a.spawn(remote_a.as_future());
    c.spawn(remote_c.as_future());
    d.spawn(remote_d.as_future());
    let mut join_a = pin!(a.join());
    let mut join_b = pin!(b.join());
    let mut join_d = pin!(d.join());
    // Nothing is resolved yet, so every join is pending.
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_d), Poll::Pending);
    // Resolving D's task finishes only D.
    remote_d.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_d), Poll::Ready(()));
    // Resolving C's task lets B finish, but A still has its own task.
    remote_c.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Ready(()));
    remote_a.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Ready(()));
    let mut join_c = pin!(c.join());
    assert_eq!(exec.run_until_stalled(&mut join_c), Poll::Ready(()));
}
2240
/// On a two-thread executor, `wake_all_with_active_guard` causes both tasks in the scope to be
/// polled again, including when the scope is cancelled right after the wake.
#[test]
fn wake_all_with_active_guard_on_send_executor() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    let scope = exec.root_scope().new_child();

    let (sender, mut receiver) = mpsc::unbounded();
    // Bottom 32 bits are the poll count. Top 32 bits are when to signal.
    let counter = Arc::new(AtomicU64::new(0));

    // Never completes; bumps the shared poll count on every poll and sends a notification
    // when the count reaches the requested target.
    struct PollCounter(Arc<AtomicU64>, mpsc::UnboundedSender<()>);

    impl Future for PollCounter {
        type Output = ();
        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            let prev = self.0.fetch_add(1, Ordering::Relaxed);
            let signal_at = prev >> 32;
            let polls_now = (prev + 1) & u32::MAX as u64;
            if signal_at == polls_now {
                let _ = self.1.unbounded_send(());
            }
            Poll::Pending
        }
    }

    scope.spawn(PollCounter(counter.clone(), sender.clone()));
    scope.spawn(PollCounter(counter.clone(), sender.clone()));

    exec.run(async move {
        // Publishes the target count in the top bits, waits for a notification unless the
        // count was already reached, then clears the target again.
        let mut wait_for_poll_count = async |count| {
            let prev = counter.fetch_or(count << 32, Ordering::Relaxed);
            if prev & u32::MAX as u64 != count {
                receiver.next().await.unwrap();
            }
            counter.fetch_and(u32::MAX as u64, Ordering::Relaxed);
        };

        // We must assume the executor will only poll the two tasks once each.
        wait_for_poll_count(2).await;

        let mut polls_seen = 2;
        for _ in 0..2 {
            scope.wake_all_with_active_guard();

            polls_seen += 2;
            wait_for_poll_count(polls_seen).await;
        }

        // Wake, then cancel the scope and verify the tasks still get polled.
        scope.wake_all_with_active_guard();
        let cancelled = scope.cancel();

        wait_for_poll_count(polls_seen + 2).await;

        cancelled.await;
    });
}
2295
/// Stress test: repeatedly races a short-lived task against `on_no_tasks` on a two-thread
/// executor, with random sub-10µs delays on both sides.
#[test]
fn on_no_tasks_race() {
    // Blocks the current thread for a random duration in the range 0..10 microseconds.
    fn sleep_random() {
        let micros = rand::random_range(0..10);
        std::thread::sleep(Duration::from_micros(micros));
    }
    for _ in 0..2000 {
        let mut exec = SendExecutorBuilder::new().num_threads(2).build();
        let child = exec.root_scope().new_child();
        child.spawn(async {
            sleep_random();
        });
        exec.run(async move {
            sleep_random();
            child.on_no_tasks().await;
        });
    }
}
2313
/// A detached task keeps running to completion and leaves no result behind on the root scope.
#[test]
fn test_detach() {
    let mut exec = LocalExecutor::default();
    exec.run_singlethreaded(async {
        let ticks = Arc::new(AtomicU32::new(0));

        let ticks_in_task = Arc::clone(&ticks);
        Task::spawn(async move {
            for _ in 0..5 {
                yield_now().await;
                ticks_in_task.fetch_add(1, Ordering::Relaxed);
            }
        })
        .detach();

        // Keep yielding until the detached task has performed all five increments.
        while ticks.load(Ordering::Relaxed) != 5 {
            yield_now().await;
        }
    });

    assert!(exec.ehandle.root_scope.lock().results.is_empty());
}
2338
/// Covers three ways a task ends: dropping its handle, aborting it while it is pending, and
/// aborting it after it has finished. An `Arc` strong count shows when each task's future has
/// been destroyed.
#[test]
fn test_cancel() {
    let mut exec = LocalExecutor::default();
    exec.run_singlethreaded(async {
        let token = Arc::new(());
        // First, just drop the task.
        {
            let held = Arc::clone(&token);
            drop(Task::spawn(async move {
                let _held = held;
                let _: () = pending().await;
            }));
        }

        while Arc::strong_count(&token) != 1 {
            yield_now().await;
        }

        // Now try explicitly cancelling.
        let never_done = {
            let held = Arc::clone(&token);
            Task::spawn(async move {
                let _held = held;
                let _: () = pending().await;
            })
        };

        assert_eq!(never_done.abort().await, None);
        while Arc::strong_count(&token) != 1 {
            yield_now().await;
        }

        // Now cancel a task that has already finished.
        let quick = {
            let held = Arc::clone(&token);
            Task::spawn(async move {
                let _held = held;
            })
        };

        // Wait for it to finish.
        while Arc::strong_count(&token) != 1 {
            yield_now().await;
        }

        assert_eq!(quick.abort().await, Some(()));
    });

    assert!(exec.ehandle.root_scope.lock().results.is_empty());
}
2389
/// Aborting a task that is currently executing on another thread waits for it to finish and
/// returns its result.
///
/// The two sides step a shared integer through 1 → 2 → 3 under a mutex/condvar pair.
#[test]
fn test_cancel_waits() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    let shared = Arc::new((Mutex::new(0), Condvar::new()));
    let shared_in_task = Arc::clone(&shared);
    let task = exec.root_scope().compute(async move {
        *shared_in_task.0.lock() = 1;
        shared_in_task.1.notify_all();
        // Wait till the other task has noticed we changed state to 1.
        shared_in_task.1.wait_while(&mut shared_in_task.0.lock(), |phase| *phase == 1);
        std::thread::sleep(Duration::from_millis(10));
        *shared_in_task.0.lock() = 3;
        "foo"
    });
    exec.run(async move {
        shared.1.wait_while(&mut shared.0.lock(), |phase| {
            if *phase != 1 {
                return true;
            }
            // Tell the other task we've noticed state 1.
            *phase = 2;
            false
        });
        shared.1.notify_all();
        assert_eq!(task.abort().await, Some("foo"));
        // The other task should have finished and set state to 3.
        assert_eq!(*shared.0.lock(), 3);
    });
}
2422
2423    fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2424        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2425        let running = Arc::new((Mutex::new(false), Condvar::new()));
2426        let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2427        let task = {
2428            let running = running.clone();
2429            let can_quit = can_quit.clone();
2430            executor.root_scope().compute(async move {
2431                *running.0.lock() = true;
2432                running.1.notify_all();
2433                {
2434                    let mut guard = can_quit.0.lock();
2435                    while !*guard {
2436                        can_quit.1.wait(&mut guard);
2437                    }
2438                }
2439                *running.0.lock() = false;
2440            })
2441        };
2442        executor.run(async move {
2443            {
2444                let mut guard = running.0.lock();
2445                while !*guard {
2446                    running.1.wait(&mut guard);
2447                }
2448            }
2449
2450            callback(task);
2451
2452            *can_quit.0.lock() = true;
2453            can_quit.1.notify_all();
2454
2455            let ehandle = EHandle::local();
2456            let scope = ehandle.global_scope();
2457
2458            // The only way of testing for this is to poll.
2459            while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2460                Timer::new(std::time::Duration::from_millis(1)).await;
2461            }
2462
2463            assert!(!*running.0.lock());
2464        });
2465    }
2466
2467    #[test]
2468    fn test_dropped_cancel_cleans_up() {
2469        test_clean_up(|task| {
2470            let abort_fut = std::pin::pin!(task.abort());
2471            let waker = std::task::Waker::noop();
2472            assert!(abort_fut.poll(&mut Context::from_waker(waker)).is_pending());
2473        });
2474    }
2475
2476    #[test]
2477    fn test_dropped_task_cleans_up() {
2478        test_clean_up(|task| {
2479            std::mem::drop(task);
2480        });
2481    }
2482
2483    #[test]
2484    fn test_detach_cleans_up() {
2485        test_clean_up(|task| {
2486            task.detach();
2487        });
2488    }
2489
2490    #[test]
2491    fn test_scope_stream() {
2492        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2493        executor.run(async move {
2494            let (stream, handle) = ScopeStream::new();
2495            handle.push(async { 1 });
2496            handle.push(async { 2 });
2497            stream.close();
2498            let results: HashSet<_> = stream.collect().await;
2499            assert_eq!(results, HashSet::from_iter([1, 2]));
2500        });
2501    }
2502
2503    #[test]
2504    fn test_scope_stream_wakes_properly() {
2505        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2506        executor.run(async move {
2507            let (stream, handle) = ScopeStream::new();
2508            handle.push(async {
2509                Timer::new(Duration::from_millis(10)).await;
2510                1
2511            });
2512            handle.push(async {
2513                Timer::new(Duration::from_millis(10)).await;
2514                2
2515            });
2516            stream.close();
2517            let results: HashSet<_> = stream.collect().await;
2518            assert_eq!(results, HashSet::from_iter([1, 2]));
2519        });
2520    }
2521
2522    #[test]
2523    fn test_scope_stream_drops_spawned_tasks() {
2524        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2525        executor.run(async move {
2526            let (stream, handle) = ScopeStream::new();
2527            handle.push(async { 1 });
2528            let _task = stream.compute(async { "foo" });
2529            stream.close();
2530            let results: HashSet<_> = stream.collect().await;
2531            assert_eq!(results, HashSet::from_iter([1]));
2532        });
2533    }
2534
2535    #[test]
2536    fn test_nested_scope_stream() {
2537        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2538        executor.run(async move {
2539            let (mut stream, handle) = ScopeStream::new();
2540            handle.clone().push(async move {
2541                handle.clone().push(async move {
2542                    handle.clone().push(async move { 3 });
2543                    2
2544                });
2545                1
2546            });
2547            let mut results = HashSet::default();
2548            while let Some(item) = stream.next().await {
2549                results.insert(item);
2550                if results.len() == 3 {
2551                    stream.close();
2552                }
2553            }
2554            assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2555        });
2556    }
2557
2558    #[test]
2559    fn test_dropping_scope_stream_cancels_all_tasks() {
2560        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2561        executor.run(async move {
2562            let (stream, handle) = ScopeStream::new();
2563            let (tx1, mut rx) = mpsc::unbounded::<()>();
2564            let tx2 = tx1.clone();
2565            handle.push(async move {
2566                let _tx1 = tx1;
2567                let () = pending().await;
2568            });
2569            handle.push(async move {
2570                let _tx2 = tx2;
2571                let () = pending().await;
2572            });
2573            drop(stream);
2574
2575            // This will wait forever if the tasks aren't cancelled.
2576            assert_eq!(rx.next().await, None);
2577        });
2578    }
2579
2580    #[test]
2581    fn test_scope_stream_collect() {
2582        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2583        executor.run(async move {
2584            let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
2585            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2586
2587            let stream: ScopeStream<_> =
2588                (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
2589            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2590        });
2591    }
2592
2593    struct DropSignal(Arc<AtomicBool>);
2594
2595    impl Drop for DropSignal {
2596        fn drop(&mut self) {
2597            self.0.store(true, Ordering::SeqCst);
2598        }
2599    }
2600
2601    struct DropChecker(Arc<AtomicBool>);
2602
2603    impl DropChecker {
2604        fn new() -> (Self, DropSignal) {
2605            let inner = Arc::new(AtomicBool::new(false));
2606            (Self(inner.clone()), DropSignal(inner))
2607        }
2608
2609        fn is_dropped(&self) -> bool {
2610            self.0.load(Ordering::SeqCst)
2611        }
2612    }
2613
2614    #[test]
2615    fn child_finished_when_parent_pending() {
2616        let mut executor = LocalExecutor::default();
2617        executor.run_singlethreaded(async {
2618            let scope = Scope::new();
2619            let _guard = scope.active_guard().expect("acquire guard");
2620            let cancel = scope.to_handle().cancel();
2621            let child = scope.new_child();
2622            let (checker, signal) = DropChecker::new();
2623            child.spawn(async move {
2624                let _signal = signal;
2625                futures::future::pending::<()>().await
2626            });
2627            assert!(checker.is_dropped());
2628            assert!(child.active_guard().is_none());
2629            cancel.await;
2630        })
2631    }
2632
2633    #[test]
2634    fn guarded_scopes_observe_closed() {
2635        let mut executor = LocalExecutor::default();
2636        executor.run_singlethreaded(async {
2637            let scope = Scope::new();
2638            let handle = scope.to_handle();
2639            let _guard = scope.active_guard().expect("acquire guard");
2640            handle.close();
2641            let (checker, signal) = DropChecker::new();
2642            handle.spawn(async move {
2643                let _signal = signal;
2644                futures::future::pending::<()>().await
2645            });
2646            assert!(checker.is_dropped());
2647            let (checker, signal) = DropChecker::new();
2648            let cancel = handle.clone().cancel();
2649            handle.spawn(async move {
2650                let _signal = signal;
2651                futures::future::pending::<()>().await
2652            });
2653            assert!(checker.is_dropped());
2654            scope.join().await;
2655            cancel.await;
2656        })
2657    }
2658
2659    #[test]
2660    fn child_guard_holds_parent_cancellation() {
2661        let mut executor = TestExecutor::new();
2662        let scope = executor.global_scope().new_child();
2663        let child = scope.new_child();
2664        let guard = child.active_guard().expect("acquire guard");
2665        scope.spawn(futures::future::pending());
2666        let mut join = pin!(scope.cancel());
2667        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2668        drop(guard);
2669        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2670    }
2671
2672    #[test]
2673    fn active_guard_on_cancel() {
2674        let mut executor = TestExecutor::new();
2675        let scope = executor.global_scope().new_child();
2676        let child1 = scope.new_child();
2677        let child2 = scope.new_child();
2678        let guard = child1.active_guard().expect("acquire guard");
2679        let guard_for_right_scope = guard.clone();
2680        let guard_for_wrong_scope = guard.clone();
2681        child1.spawn(async move { guard_for_right_scope.on_cancel().await });
2682        child2.spawn(async move {
2683            guard_for_wrong_scope.on_cancel().await;
2684        });
2685
2686        let handle = scope.to_handle();
2687        let mut join = pin!(scope.join());
2688        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2689        let cancel: Join<_> = handle.cancel();
2690        drop(cancel);
2691        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2692    }
2693
2694    #[test]
2695    fn abort_join() {
2696        let mut executor = TestExecutor::new();
2697        let scope = executor.global_scope().new_child();
2698        let child = scope.new_child();
2699        let _guard = child.active_guard().expect("acquire guard");
2700
2701        let (checker1, signal) = DropChecker::new();
2702        scope.spawn(async move {
2703            let _signal = signal;
2704            futures::future::pending::<()>().await
2705        });
2706        let (checker2, signal) = DropChecker::new();
2707        scope.spawn(async move {
2708            let _signal = signal;
2709            futures::future::pending::<()>().await
2710        });
2711
2712        let mut join = pin!(scope.cancel());
2713        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2714        assert!(!checker1.is_dropped());
2715        assert!(!checker2.is_dropped());
2716
2717        let mut join = join.abort();
2718        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2719        assert!(checker1.is_dropped());
2720        assert!(checker2.is_dropped());
2721    }
2722
2723    #[test]
2724    fn child_without_guard_aborts_immediately_on_cancel() {
2725        let mut executor = TestExecutor::new();
2726        let scope = executor.global_scope().new_child();
2727        let child = scope.new_child();
2728        let guard = scope.active_guard().expect("acquire guard");
2729
2730        let (checker_scope, signal) = DropChecker::new();
2731        scope.spawn(async move {
2732            let _signal = signal;
2733            futures::future::pending::<()>().await
2734        });
2735        let (checker_child, signal) = DropChecker::new();
2736        child.spawn(async move {
2737            let _signal = signal;
2738            futures::future::pending::<()>().await
2739        });
2740
2741        let mut join = pin!(scope.cancel());
2742        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2743        assert!(!checker_scope.is_dropped());
2744        assert!(checker_child.is_dropped());
2745
2746        drop(guard);
2747        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2748        assert!(checker_child.is_dropped());
2749    }
2750
2751    #[test]
2752    fn await_canceled_task_pends_forever() {
2753        let mut executor = TestExecutor::new();
2754        let scope = executor.global_scope().new_child();
2755
2756        let task = scope.spawn(pending::<()>());
2757        let mut main_future = pin!(async move {
2758            drop(scope);
2759            task.await;
2760        });
2761        assert_eq!(executor.run_until_stalled(&mut main_future), Poll::Pending,);
2762    }
2763
2764    #[test]
2765    fn await_canceled_abortable_task_finishes_with_error() {
2766        let mut executor = TestExecutor::new();
2767        let scope = executor.global_scope().new_child();
2768
2769        let task = CancelableJoinHandle::from(scope.spawn(pending::<()>()));
2770        let mut main_future = pin!(async move {
2771            drop(scope);
2772            let _ = task.await;
2773        });
2774        assert_eq!(executor.run_until_stalled(&mut main_future), Poll::Ready(()),);
2775    }
2776
2777    #[test]
2778    fn closed_scope_drops_task_immediately() {
2779        let executor = TestExecutor::new();
2780        let scope = executor.global_scope().new_child();
2781        scope.clone().close();
2782        let object = Arc::new(());
2783        let object2 = object.clone();
2784        let _task = scope.spawn(async move {
2785            let _object2 = object2;
2786            let () = std::future::pending().await;
2787        });
2788
2789        // The scope is closed, so the future should have been immediately dropped,
2790        // leaving only one reference.
2791        assert!(Arc::into_inner(object).is_some());
2792    }
2793}