fuchsia_async/runtime/fuchsia/executor/
scope.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::task::JoinHandle;
6use super::atomic_future::{AtomicFutureHandle, CancelAndDetachResult};
7use super::common::{Executor, TaskHandle};
8use crate::condition::{Condition, ConditionGuard, WakerEntry};
9use crate::EHandle;
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{ready, Context, Poll, Waker};
26use std::{fmt, hash};
27
28//
29// # Public API
30//
31
32/// A scope for managing async tasks. This scope is cancelled when dropped.
33///
34/// Scopes are how fuchsia-async implements [structured concurrency][sc]. Every
35/// task is spawned on a scope, and runs until either the task completes or the
36/// scope is cancelled. In addition to owning tasks, scopes may own child
37/// scopes, forming a nested structure.
38///
39/// Scopes are usually joined or cancelled when the owning code is done with
40/// them. This makes it easier to reason about when a background task might
41/// still be running. Note that in multithreaded contexts it is safer to cancel
42/// and await a scope explicitly than to drop it, because the destructor is not
43/// synchronized with other threads that might be running a task.
44///
45/// [`Task::spawn`][crate::Task::spawn] and related APIs spawn on the root scope
46/// of the executor. New code is encouraged to spawn directly on scopes instead,
47/// passing their handles as a way of documenting when a function might spawn
48/// tasks that run in the background and reasoning about their side effects.
49///
50/// ## Scope lifecycle
51///
52/// When a scope is created it is open, meaning it accepts new tasks. Scopes are
53/// closed when one of the following happens:
54///
55/// 1. When [`close()`][Scope::close] is called.
56/// 2. When the scope is cancelled or dropped, the scope is closed immediately.
57/// 3. When the scope is joined and all tasks complete, the scope is closed
58///    before the join future resolves.
59///
60/// When a scope is closed it no longer accepts tasks. Tasks spawned on the
61/// scope are dropped immediately, and their [`Task`][crate::Task] or
62/// [`JoinHandle`][crate::JoinHandle] futures never resolve. This applies
63/// transitively to all child scopes. Closed scopes cannot currently be
64/// reopened.
65///
66/// Scopes can also be detached, in which case they are never closed, and run
67/// until the completion of all tasks.
68///
69/// [sc]: https://en.wikipedia.org/wiki/Structured_concurrency
70#[must_use = "Scopes should be explicitly awaited or cancelled"]
71#[derive(Debug)]
72pub struct Scope {
73    // LINT.IfChange
74    inner: ScopeHandle,
75    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
76}
77
78impl Scope {
79    /// Create a new scope.
80    ///
81    /// The returned scope is a child of the current scope.
82    ///
83    /// # Panics
84    ///
85    /// May panic if not called in the context of an executor (e.g. within a
86    /// call to [`run`][crate::SendExecutor::run]).
87    pub fn new() -> Scope {
88        ScopeHandle::with_current(|handle| handle.new_child())
89    }
90
91    /// Create a new scope with a name.
92    ///
93    /// The returned scope is a child of the current scope.
94    ///
95    /// # Panics
96    ///
97    /// May panic if not called in the context of an executor (e.g. within a
98    /// call to [`run`][crate::SendExecutor::run]).
99    pub fn new_with_name(name: &str) -> Scope {
100        ScopeHandle::with_current(|handle| handle.new_child_with_name(name))
101    }
102
103    /// Get the scope of the current task, or the global scope if there is no task
104    /// being polled.
105    ///
106    /// # Panics
107    ///
108    /// May panic if not called in the context of an executor (e.g. within a
109    /// call to [`run`][crate::SendExecutor::run]).
110    pub fn current() -> ScopeHandle {
111        ScopeHandle::with_current(|handle| handle.clone())
112    }
113
114    /// Get the global scope of the executor.
115    ///
116    /// This can be used to spawn tasks that live as long as the executor.
117    /// Usually, this means until the end of the program or test. This should
118    /// only be done for tasks where this is expected. If in doubt, spawn on a
119    /// shorter lived scope instead.
120    ///
121    /// In code that uses scopes, you are strongly encouraged to use this API
122    /// instead of the spawn APIs on [`Task`][crate::Task].
123    ///
124    /// All scopes are descendants of the global scope.
125    ///
126    /// # Panics
127    ///
128    /// May panic if not called in the context of an executor (e.g. within a
129    /// call to [`run`][crate::SendExecutor::run]).
130    pub fn global() -> ScopeHandle {
131        EHandle::local().global_scope().clone()
132    }
133
134    /// Create a child scope.
135    pub fn new_child(&self) -> Scope {
136        self.inner.new_child()
137    }
138
139    /// Create a child scope with a name.
140    pub fn new_child_with_name(&self, name: &str) -> Scope {
141        self.inner.new_child_with_name(name)
142    }
143
144    /// Returns the name of the scope.
145    pub fn name(&self) -> &str {
146        &self.inner.inner.name
147    }
148
149    /// Create a [`ScopeHandle`] that may be used to spawn tasks on this scope.
150    ///
151    /// This is a shorthand for `scope.as_handle().clone()`.
152    ///
153    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
154    /// available. Note that you should _not_ call `scope.clone()`, even though
155    /// the compiler allows it due to the Deref impl. Call this method instead.
156    pub fn to_handle(&self) -> ScopeHandle {
157        self.inner.clone()
158    }
159
160    /// Get a reference to a [`ScopeHandle`] that may be used to spawn tasks on
161    /// this scope.
162    ///
163    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
164    /// available. If you have a `Scope` but need a `&ScopeHandle`, prefer
165    /// calling this method over the less readable `&*scope`.
166    pub fn as_handle(&self) -> &ScopeHandle {
167        &self.inner
168    }
169
170    /// Wait for all tasks in the scope and its children to complete.
171    ///
172    /// New tasks will be accepted on the scope until every task completes and
173    /// this future resolves.
174    ///
175    /// Note that you can await a scope directly because it implements
176    /// `IntoFuture`. `scope.join().await` is a more explicit form of
177    /// `scope.await`.
178    pub fn join(self) -> Join {
179        Join::new(self)
180    }
181
182    /// Stop accepting new tasks on the scope. Returns a future that waits for
183    /// every task on the scope to complete.
184    pub fn close(self) -> Join {
185        self.inner.close();
186        Join::new(self)
187    }
188
189    /// Cancel all tasks in the scope and its children recursively.
190    ///
191    /// Once the returned future resolves, no task on the scope will be polled
192    /// again.
193    ///
194    /// When a scope is cancelled it immediately stops accepting tasks. Handles
195    /// of tasks spawned on the scope will pend forever.
196    ///
197    /// Dropping the `Scope` object is equivalent to calling this method and
198    /// discarding the returned future. Awaiting the future is preferred because
199    /// it eliminates the possibility of a task poll completing on another
200    /// thread after the scope object has been dropped, which can sometimes
201    /// result in surprising behavior.
202    pub fn cancel(self) -> impl Future<Output = ()> {
203        self.inner.cancel_all_tasks();
204        Join::new(self)
205    }
206
207    /// Detach the scope, allowing its tasks to continue running in the
208    /// background.
209    ///
210    /// Tasks of a detached scope are still subject to join and cancel
211    /// operations on parent scopes.
212    pub fn detach(self) {
213        // Use ManuallyDrop to destructure self, because Rust doesn't allow this
214        // for types which implement Drop.
215        let this = ManuallyDrop::new(self);
216        // SAFETY: this.inner is obviously valid, and we don't access `this`
217        // after moving.
218        mem::drop(unsafe { std::ptr::read(&this.inner) });
219    }
220}
221
222/// Cancel the scope and all of its tasks. Prefer using the [`Scope::cancel`]
223/// or [`Scope::join`] methods.
224impl Drop for Scope {
225    fn drop(&mut self) {
226        // Cancel all tasks in the scope. Each task has a strong reference to the ScopeState,
227        // which will be dropped after all the tasks in the scope are dropped.
228
229        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
230        // here, but we cannot do that without either:
231        // - Sync drop support in AtomicFuture, or
232        // - The ability to reparent tasks, which requires atomic_arc or
233        //   acquiring a mutex during polling.
234        self.inner.cancel_all_tasks();
235    }
236}
237
238impl IntoFuture for Scope {
239    type Output = ();
240    type IntoFuture = Join;
241    fn into_future(self) -> Self::IntoFuture {
242        self.join()
243    }
244}
245
246impl Deref for Scope {
247    type Target = ScopeHandle;
248    fn deref(&self) -> &Self::Target {
249        &self.inner
250    }
251}
252
253impl Borrow<ScopeHandle> for Scope {
254    fn borrow(&self) -> &ScopeHandle {
255        &*self
256    }
257}
258
259pin_project! {
260    /// Join operation for a [`Scope`].
261    ///
262    /// This is a future that resolves when all tasks on the scope are complete
263    /// or have been cancelled. New tasks will be accepted on the scope until
264    /// every task completes and this future resolves.
265    ///
266    /// When this object is dropped, the scope and all tasks in it are
267    /// cancelled.
268    //
269    // Note: The drop property is only true when S = Scope; it does not apply to
270    // other (non-public) uses of this struct where S = ScopeHandle.
271    pub struct Join<S = Scope> {
272        scope: S,
273        #[pin]
274        waker_entry: WakerEntry<ScopeState>,
275    }
276}
277
278impl<S> Join<S> {
279    fn new(scope: S) -> Self {
280        Self { scope, waker_entry: WakerEntry::new() }
281    }
282}
283
284impl Join {
285    /// Cancel the scope. The future will resolve when all tasks have finished
286    /// polling.
287    ///
288    /// See [`Scope::cancel`] for more details.
289    pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
290        self.scope.inner.cancel_all_tasks();
291        self
292    }
293}
294
295impl<S> Future for Join<S>
296where
297    S: Borrow<ScopeHandle>,
298{
299    type Output = ();
300    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
301        let this = self.project();
302        let mut state = Borrow::borrow(&*this.scope).lock();
303        if state.has_tasks() {
304            state.add_waker(this.waker_entry, cx.waker().clone());
305            Poll::Pending
306        } else {
307            state.mark_finished();
308            Poll::Ready(())
309        }
310    }
311}
312
313/// Trait for things that can be spawned on to a scope.  There is a blanket implementation
314/// below for futures.
315pub trait Spawnable {
316    /// The type of value produced on completion.
317    type Output;
318
319    /// Converts to a task that can be spawned directly.
320    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
321}
322
323impl<F: Future + Send + 'static> Spawnable for F
324where
325    F::Output: Send + 'static,
326{
327    type Output = F::Output;
328
329    fn into_task(self, scope: ScopeHandle) -> TaskHandle {
330        scope.new_task(None, self)
331    }
332}
333
334/// A handle to a scope, which may be used to spawn tasks.
335///
336/// ## Ownership and cycles
337///
338/// Tasks running on a `Scope` may hold a `ScopeHandle` to that scope. This does
339/// not create an ownership cycle because the task will drop the `ScopeHandle`
340/// once it completes or is cancelled.
341///
342/// Naturally, scopes containing tasks that never complete and that are never
343/// cancelled will never be freed. Holding a `ScopeHandle` does not contribute to
344/// this problem.
345#[derive(Clone)]
346pub struct ScopeHandle {
347    // LINT.IfChange
348    inner: Arc<ScopeInner>,
349    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
350}
351
352impl ScopeHandle {
353    /// Create a child scope.
354    pub fn new_child(&self) -> Scope {
355        let mut state = self.lock();
356        let child = ScopeHandle {
357            inner: Arc::new(ScopeInner {
358                executor: self.inner.executor.clone(),
359                state: Condition::new(ScopeState::new(
360                    Some(self.clone()),
361                    state.status(),
362                    JoinResults::default().into(),
363                )),
364                name: String::new(),
365            }),
366        };
367        let weak = child.downgrade();
368        state.insert_child(weak);
369        Scope { inner: child }
370    }
371
372    /// Create a child scope.
373    pub fn new_child_with_name(&self, name: &str) -> Scope {
374        let mut state = self.lock();
375        let child = ScopeHandle {
376            inner: Arc::new(ScopeInner {
377                executor: self.inner.executor.clone(),
378                state: Condition::new(ScopeState::new(
379                    Some(self.clone()),
380                    state.status(),
381                    JoinResults::default().into(),
382                )),
383                name: name.to_string(),
384            }),
385        };
386        let weak = child.downgrade();
387        state.insert_child(weak);
388        Scope { inner: child }
389    }
390
391    /// Spawn a new task on the scope.
392    // This does not have the must_use attribute because it's common to detach and the lifetime of
393    // the task is bound to the scope: when the scope is dropped, the task will be cancelled.
394    pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
395        let task = future.into_task(self.clone());
396        let task_id = task.id();
397        self.insert_task(task, false);
398        JoinHandle::new(self.clone(), task_id)
399    }
400
401    /// Spawn a new task on the scope of a thread local executor.
402    ///
403    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
404    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
405    pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
406        let task = self.new_local_task(None, future);
407        let id = task.id();
408        self.insert_task(task, false);
409        JoinHandle::new(self.clone(), id)
410    }
411
412    /// Like `spawn`, but for tasks that return a result.
413    ///
414    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
415    /// *cancelled*.
416    pub fn compute<T: Send + 'static>(
417        &self,
418        future: impl Spawnable<Output = T> + Send + 'static,
419    ) -> crate::Task<T> {
420        let task = future.into_task(self.clone());
421        let id = task.id();
422        self.insert_task(task, false);
423        JoinHandle::new(self.clone(), id).into()
424    }
425
426    /// Like `spawn`, but for tasks that return a result.
427    ///
428    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
429    /// *cancelled*.
430    ///
431    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
432    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
433    pub fn compute_local<T: 'static>(
434        &self,
435        future: impl Future<Output = T> + 'static,
436    ) -> crate::Task<T> {
437        let task = self.new_local_task(None, future);
438        let id = task.id();
439        self.insert_task(task, false);
440        JoinHandle::new(self.clone(), id).into()
441    }
442
443    pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
444        ScopeHandle {
445            inner: Arc::new(ScopeInner {
446                executor,
447                state: Condition::new(ScopeState::new(
448                    None,
449                    Status::default(),
450                    JoinResults::default().into(),
451                )),
452                name: "root".to_string(),
453            }),
454        }
455    }
456
457    /// Stop the scope from accepting new tasks.
458    ///
459    /// Note that unlike [`Scope::close`], this does not return a future that
460    /// waits for all tasks to complete. This could lead to resource leaks
461    /// because it is not uncommon to access a TaskGroup from a task running on
462    /// the scope itself. If such a task were to await a future returned by this
463    /// method it would suspend forever waiting for itself to complete.
464    pub fn close(&self) {
465        self.lock().close();
466    }
467
468    /// Cancel all the scope's tasks.
469    ///
470    /// Note that if this is called from within a task running on the scope, the
471    /// task will not resume from the next await point.
472    pub fn cancel(self) -> impl Future<Output = ()> {
473        self.cancel_all_tasks();
474        Join::new(self)
475    }
476
477    // Joining the scope could be allowed from a ScopeHandle, but the use case
478    // seems less common and more bug prone than cancelling. We don't allow this
479    // for the same reason we don't return a future from close().
480
481    /// Wait for there to be no tasks. This is racy: as soon as this returns it is possible for
482    /// another task to have been spawned on this scope.
483    pub async fn on_no_tasks(&self) {
484        self.inner
485            .state
486            .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
487            .await;
488    }
489
490    /// Wake all the scope's tasks so their futures will be polled again.
491    pub fn wake_all(&self) {
492        self.lock().wake_all();
493    }
494
495    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
496    /// That must be done separately.
497    pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
498        &self,
499        id: Option<usize>,
500        fut: Fut,
501    ) -> AtomicFutureHandle<'a>
502    where
503        Fut::Output: Send,
504    {
505        AtomicFutureHandle::new(
506            Some(self.clone()),
507            id.unwrap_or_else(|| self.executor().next_task_id()),
508            fut,
509        )
510    }
511
512    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
513    /// That must be done separately.
514    pub(crate) fn new_local_task<'a>(
515        &self,
516        id: Option<usize>,
517        fut: impl Future + 'a,
518    ) -> AtomicFutureHandle<'a> {
519        // Check that the executor is local.
520        if !self.executor().is_local() {
521            panic!(
522                "Error: called `new_local_task` on multithreaded executor. \
523                 Use `spawn` or a `LocalExecutor` instead."
524            );
525        }
526
527        // SAFETY: We've confirmed that the futures here will never be used across multiple threads,
528        // so the Send requirements that `new_local` requires should be met.
529        unsafe {
530            AtomicFutureHandle::new_local(
531                Some(self.clone()),
532                id.unwrap_or_else(|| self.executor().next_task_id()),
533                fut,
534            )
535        }
536    }
537}
538
539impl fmt::Debug for ScopeHandle {
540    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
541        f.debug_struct("Scope").field("name", &self.inner.name).finish()
542    }
543}
544
545/// Similar to a scope but all futures spawned on the scope *must* finish with the same result type.
546/// That allows the scope to return a stream of results. Attempting to spawn tasks using
547/// `ScopeHandle::spawn` (or similar) will result in tasks that are immediately dropped (just as if
548/// the scope was closed).  Like a regular scope, the scope can be closed, at which point the stream
549/// will terminate once all the tasks have finished.  This is designed to be a fairly close drop-in
550/// replacement to `FuturesUnordered`, the principle difference being that the tasks run in parallel
551/// rather than just concurrently.  Another difference is that the futures don't need to be the same
552/// type; only the outputs do.  In all other respects, the scope operates like a regular scope i.e.
553/// it can have children, you can join them, cancel them, etc.
554pub struct ScopeStream<R> {
555    inner: ScopeHandle,
556    stream: Arc<Mutex<ResultsStreamInner<R>>>,
557}
558
559impl<R: Send + 'static> ScopeStream<R> {
560    /// Creates a new scope stream.
561    ///
562    /// The returned scope stream is a child of the current scope.
563    ///
564    /// # Panics
565    ///
566    /// May panic if not called in the context of an executor (e.g. within a
567    /// call to [`run`][crate::SendExecutor::run]).
568    pub fn new() -> (Self, ScopeStreamHandle<R>) {
569        Self::new_with_name(String::new())
570    }
571
572    /// Creates a new scope stream with a name.
573    ///
574    /// The returned scope stream is a child of the current scope.
575    ///
576    /// # Panics
577    ///
578    /// May panic if not called in the context of an executor (e.g. within a
579    /// call to [`run`][crate::SendExecutor::run]).
580    pub fn new_with_name(name: String) -> (Self, ScopeStreamHandle<R>) {
581        let this = ScopeHandle::with_current(|handle| {
582            let mut state = handle.lock();
583            let stream = Arc::default();
584            let child = ScopeHandle {
585                inner: Arc::new(ScopeInner {
586                    executor: handle.executor().clone(),
587                    state: Condition::new(ScopeState::new(
588                        Some(handle.clone()),
589                        state.status(),
590                        Box::new(ResultsStream { inner: Arc::clone(&stream) }),
591                    )),
592                    name,
593                }),
594            };
595            let weak = child.downgrade();
596            state.insert_child(weak);
597            ScopeStream { inner: child, stream }
598        });
599        let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
600        (this, handle)
601    }
602}
603
604impl<R> Drop for ScopeStream<R> {
605    fn drop(&mut self) {
606        // Cancel all tasks in the scope. Each task has a strong reference to the ScopeState,
607        // which will be dropped after all the tasks in the scope are dropped.
608
609        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
610        // here, but we cannot do that without either:
611        // - Sync drop support in AtomicFuture, or
612        // - The ability to reparent tasks, which requires atomic_arc or
613        //   acquiring a mutex during polling.
614        self.inner.cancel_all_tasks();
615    }
616}
617
618impl<R: Send + 'static> Stream for ScopeStream<R> {
619    type Item = R;
620
621    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
622        let mut stream_inner = self.stream.lock();
623        match stream_inner.results.pop() {
624            Some(result) => Poll::Ready(Some(result)),
625            None => {
626                // Lock ordering: when results are posted, the state lock is taken first, so we must
627                // do the same.
628                drop(stream_inner);
629                let state = self.inner.lock();
630                let mut stream_inner = self.stream.lock();
631                match stream_inner.results.pop() {
632                    Some(result) => Poll::Ready(Some(result)),
633                    None => {
634                        if state.has_tasks() {
635                            stream_inner.waker = Some(cx.waker().clone());
636                            Poll::Pending
637                        } else {
638                            Poll::Ready(None)
639                        }
640                    }
641                }
642            }
643        }
644    }
645}
646
647impl<R> Deref for ScopeStream<R> {
648    type Target = ScopeHandle;
649    fn deref(&self) -> &Self::Target {
650        &self.inner
651    }
652}
653
654impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
655    fn borrow(&self) -> &ScopeHandle {
656        &*self
657    }
658}
659
660impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
661    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
662        let (stream, handle) = ScopeStream::new();
663        for fut in iter {
664            handle.push(fut);
665        }
666        stream.close();
667        stream
668    }
669}
670
671#[derive(Clone)]
672pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
673
674impl<R: Send> ScopeStreamHandle<R> {
675    pub fn push(&self, future: impl Spawnable<Output = R>) {
676        self.0.insert_task(future.into_task(self.0.clone()), true);
677    }
678}
679
680//
681// # Internal API
682//
683
684/// A weak reference to a scope.
685#[derive(Clone)]
686struct WeakScopeHandle {
687    inner: Weak<ScopeInner>,
688}
689
690impl WeakScopeHandle {
691    /// Upgrades to a [`ScopeHandle`] if the scope still exists.
692    pub fn upgrade(&self) -> Option<ScopeHandle> {
693        self.inner.upgrade().map(|inner| ScopeHandle { inner })
694    }
695}
696
697impl hash::Hash for WeakScopeHandle {
698    fn hash<H: hash::Hasher>(&self, state: &mut H) {
699        Weak::as_ptr(&self.inner).hash(state);
700    }
701}
702
703impl PartialEq for WeakScopeHandle {
704    fn eq(&self, other: &Self) -> bool {
705        Weak::ptr_eq(&self.inner, &other.inner)
706    }
707}
708
709impl Eq for WeakScopeHandle {
710    // Weak::ptr_eq should return consistent results, even when the inner value
711    // has been dropped.
712}
713
714// This module exists as a privacy boundary so that we can make sure any
715// operation that might cause the scope to finish also wakes its waker.
716mod state {
717    use super::*;
718
719    pub struct ScopeState {
720        pub parent: Option<ScopeHandle>,
721        // LINT.IfChange
722        children: HashSet<WeakScopeHandle>,
723        all_tasks: HashSet<TaskHandle>,
724        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
725        /// The number of children that transitively contain tasks, plus one for
726        /// this scope if it directly contains tasks.
727        subscopes_with_tasks: u32,
728        status: Status,
729        /// Wakers/results for joining each task.
730        pub results: Box<dyn Results>,
731    }
732
733    pub enum JoinResult {
734        Waker(Waker),
735        Result(TaskHandle),
736    }
737
738    #[repr(u8)] // So zxdb can read the status.
739    #[derive(Default, Debug, Clone, Copy)]
740    pub enum Status {
741        #[default]
742        /// The scope is accepting new tasks.
743        Open,
744        /// The scope is no longer accepting new tasks.
745        Closed,
746        /// The scope is not accepting new tasks and all tasks have completed.
747        ///
748        /// This is purely an optimization; it is not guaranteed to be set.
749        Finished,
750    }
751
752    impl Status {
753        pub fn can_spawn(&self) -> bool {
754            match self {
755                Status::Open => true,
756                Status::Closed | Status::Finished => false,
757            }
758        }
759
760        pub fn might_have_running_tasks(&self) -> bool {
761            match self {
762                Status::Open | Status::Closed => true,
763                Status::Finished => false,
764            }
765        }
766    }
767
768    impl ScopeState {
769        pub fn new(
770            parent: Option<ScopeHandle>,
771            status: Status,
772            results: Box<impl Results>,
773        ) -> Self {
774            Self {
775                parent,
776                children: Default::default(),
777                all_tasks: Default::default(),
778                subscopes_with_tasks: 0,
779                status,
780                results,
781            }
782        }
783    }
784
785    impl ScopeState {
786        pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
787            &self.all_tasks
788        }
789
790        /// Attempts to add a task to the scope. Returns the task if the scope cannot accept a task
791        /// (since it isn't safe to drop the task whilst the lock is held).
792        pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
793            if !self.status.can_spawn() || (!for_stream && !self.results.can_spawn()) {
794                return Some(task);
795            }
796            if self.all_tasks.is_empty() && !self.register_first_task() {
797                return Some(task);
798            }
799            task.wake();
800            assert!(self.all_tasks.insert(task));
801            None
802        }
803
804        pub fn children(&self) -> &HashSet<WeakScopeHandle> {
805            &self.children
806        }
807
808        pub fn insert_child(&mut self, child: WeakScopeHandle) {
809            self.children.insert(child);
810        }
811
812        pub fn remove_child(&mut self, child: &PtrKey) {
813            let found = self.children.remove(child);
814            // This should always succeed unless the scope is being dropped
815            // (in which case children will be empty).
816            assert!(found || self.children.is_empty());
817        }
818
819        pub fn status(&self) -> Status {
820            self.status
821        }
822
823        pub fn close(&mut self) {
824            self.status = Status::Closed;
825        }
826
827        pub fn mark_finished(&mut self) {
828            self.status = Status::Finished;
829        }
830
831        pub fn has_tasks(&self) -> bool {
832            self.subscopes_with_tasks > 0
833        }
834
835        pub fn wake_all(&self) {
836            for task in &self.all_tasks {
837                task.wake();
838            }
839        }
840
841        /// Registers our first task with the parent scope.
842        ///
843        /// Returns false if the scope is not allowed to accept a task.
844        #[must_use]
845        fn register_first_task(&mut self) -> bool {
846            if !self.status.can_spawn() {
847                return false;
848            }
849            let can_spawn = match &self.parent {
850                Some(parent) => {
851                    // If our parent already knows we have tasks, we can always
852                    // spawn. Otherwise, we have to recurse.
853                    self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
854                }
855                None => true,
856            };
857            if can_spawn {
858                self.subscopes_with_tasks += 1;
859                debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
860            };
861            can_spawn
862        }
863
864        fn on_last_task_removed(
865            this: &mut ConditionGuard<'_, ScopeState>,
866            num_wakers_hint: usize,
867            wakers: &mut Vec<Waker>,
868        ) {
869            debug_assert!(this.subscopes_with_tasks > 0);
870            this.subscopes_with_tasks -= 1;
871            if this.subscopes_with_tasks > 0 {
872                wakers.reserve(num_wakers_hint);
873                return;
874            }
875
876            match &this.parent {
877                Some(parent) => {
878                    Self::on_last_task_removed(
879                        &mut parent.lock(),
880                        num_wakers_hint + this.waker_count(),
881                        wakers,
882                    );
883                }
884                None => wakers.reserve(num_wakers_hint),
885            };
886            wakers.extend(this.drain_wakers());
887        }
888    }
889
890    #[derive(Default)]
891    struct WakeVec(Vec<Waker>);
892
893    impl Drop for WakeVec {
894        fn drop(&mut self) {
895            for waker in self.0.drain(..) {
896                waker.wake();
897            }
898        }
899    }
900
    /// A held scope lock paired with a batch of wakers that are woken after the lock is
    /// released.
    ///
    /// Struct fields are dropped in declaration order, which is what the note below relies on.
    // WakeVec *must* come after the guard because we want the guard to be dropped first.
    pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
903
904    impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
905        fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
906            Self(value, WakeVec::default())
907        }
908    }
909
910    impl ScopeWaker<'_> {
911        pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
912            let task = self.all_tasks.take(&id);
913            if task.is_some() {
914                self.on_task_removed(0);
915            }
916            task
917        }
918
919        pub fn task_did_finish(&mut self, id: usize) {
920            if let Some(task) = self.all_tasks.take(&id) {
921                self.on_task_removed(1);
922                if !task.is_detached() {
923                    let maybe_waker = self.results.task_did_finish(task);
924                    self.1 .0.extend(maybe_waker);
925                }
926            }
927        }
928
929        pub fn set_closed_and_drain(
930            &mut self,
931        ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
932            self.close();
933            let all_tasks = std::mem::take(&mut self.all_tasks);
934            let results = self.results.take();
935            if !all_tasks.is_empty() {
936                self.on_task_removed(0)
937            }
938            let children = self.children.drain();
939            (all_tasks, results, children)
940        }
941
942        fn on_task_removed(&mut self, num_wakers_hint: usize) {
943            if self.all_tasks.is_empty() {
944                ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1 .0)
945            }
946        }
947    }
948
    impl<'a> Deref for ScopeWaker<'a> {
        type Target = ConditionGuard<'a, ScopeState>;

        /// Gives shared access to the wrapped lock guard.
        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }
956
    impl DerefMut for ScopeWaker<'_> {
        /// Gives exclusive access to the wrapped lock guard.
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.0
        }
    }
962}
963
/// The shared data behind a `ScopeHandle`.
struct ScopeInner {
    /// The executor whose ready queue receives this scope's cancelled tasks.
    executor: Arc<Executor>,
    /// The scope's mutable state; all access goes through `Condition::lock`.
    state: Condition<ScopeState>,
    /// The scope's name.
    name: String,
}
969
impl Drop for ScopeInner {
    /// Unregisters this scope from its parent's set of children, if it has a parent.
    fn drop(&mut self) {
        // The parent's child set can be searched by the address of a child's `ScopeInner`
        // (see the `Borrow<PtrKey>` impl for `WeakScopeHandle`), so our own address is all
        // that is needed to find our entry.
        //
        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
        // This also complies with the correctness requirements of
        // HashSet::remove because the implementations of Hash and Eq match
        // between PtrKey and WeakScopeHandle.
        let key = unsafe { &*(self as *const _ as *const PtrKey) };
        // The guard returned by `self.state.lock()` is a temporary of the `if let` scrutinee,
        // so our own state stays locked while the parent is locked and updated.
        if let Some(parent) = &self.state.lock().parent {
            let mut parent_state = parent.lock();
            parent_state.remove_child(key);
        }
    }
}
983
984impl ScopeHandle {
985    fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
986        super::common::TaskHandle::with_current(|task| match task {
987            Some(task) => f(task.scope()),
988            None => f(EHandle::local().global_scope()),
989        })
990    }
991
    /// Locks this scope's state and returns the guard.
    fn lock(&self) -> ConditionGuard<'_, ScopeState> {
        self.inner.state.lock()
    }
995
996    fn downgrade(&self) -> WeakScopeHandle {
997        WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
998    }
999
    /// Returns the executor stored in this scope's shared state.
    #[inline(always)]
    pub(crate) fn executor(&self) -> &Arc<Executor> {
        &self.inner.executor
    }
1004
    /// Marks the task as detached.
    ///
    /// If the task is still in this scope's task set it is flagged as detached; any entry the
    /// results store holds for `task_id` is discarded.
    pub(crate) fn detach(&self, task_id: usize) {
        // Bind whatever the results store hands back so that it is dropped at the end of the
        // function, after the inner block has ended and the scope lock has been released.
        let _maybe_task = {
            let mut state = self.lock();
            if let Some(task) = state.all_tasks().get(&task_id) {
                task.detach();
            }
            state.results.detach(task_id)
        };
    }
1015
1016    /// Cancels the task.
1017    ///
1018    /// # Safety
1019    ///
1020    /// The caller must guarantee that `R` is the correct type.
1021    pub(crate) unsafe fn cancel_task<R>(&self, task_id: usize) -> Option<R> {
1022        let mut state = self.lock();
1023        if let Some(task) = state.results.detach(task_id) {
1024            drop(state);
1025            return task.take_result();
1026        }
1027        state.all_tasks().get(&task_id).and_then(|task| {
1028            if task.cancel() {
1029                self.inner.executor.ready_tasks.push(task.clone());
1030            }
1031            task.take_result()
1032        })
1033    }
1034
    /// Cancels and detaches the task.
    ///
    /// Any entry the results store holds for `task_id` is discarded. If the task is still in
    /// the scope, the outcome of `cancel_and_detach` on the task decides what happens next: the
    /// task is removed from the scope, pushed onto the executor's ready queue, or left alone.
    pub(crate) fn cancel_and_detach(&self, task_id: usize) {
        // Task handles removed below are carried out of the inner block so that they are
        // dropped at the end of the function, after `state` (and with it the scope lock) has
        // been dropped.
        let _tasks = {
            let mut state = ScopeWaker::from(self.lock());
            let maybe_task1 = state.results.detach(task_id);
            let mut maybe_task2 = None;
            if let Some(task) = state.all_tasks().get(&task_id) {
                match task.cancel_and_detach() {
                    CancelAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
                    CancelAndDetachResult::AddToRunQueue => {
                        self.inner.executor.ready_tasks.push(task.clone());
                    }
                    CancelAndDetachResult::Pending => {}
                }
            }
            (maybe_task1, maybe_task2)
        };
    }
1053
1054    /// Polls for a join result for the given task ID.
1055    ///
1056    /// # Safety
1057    ///
1058    /// The caller must guarantee that `R` is the correct type.
1059    pub(crate) unsafe fn poll_join_result<R>(
1060        &self,
1061        task_id: usize,
1062        cx: &mut Context<'_>,
1063    ) -> Poll<R> {
1064        let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1065        match task.take_result() {
1066            Some(result) => Poll::Ready(result),
1067            None => {
1068                // The task has been cancelled so all we can do is forever return pending.
1069                Poll::Pending
1070            }
1071        }
1072    }
1073
1074    /// Polls for the task to be cancelled.
1075    pub(crate) unsafe fn poll_cancelled<R>(
1076        &self,
1077        task_id: usize,
1078        cx: &mut Context<'_>,
1079    ) -> Poll<Option<R>> {
1080        let task = self.lock().results.poll_join_result(task_id, cx);
1081        task.map(|task| task.take_result())
1082    }
1083
    /// Adds `task` to this scope, returning true if the scope state accepted it.
    ///
    /// `for_stream` is forwarded unchanged to `ScopeState::insert_task`.
    pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
        // The lock guard is a temporary that is released at the end of this statement, so a
        // task handed back by the state is dropped without the scope lock held.
        let returned_task = self.lock().insert_task(task, for_stream);
        returned_task.is_none()
    }
1088
1089    /// Drops the specified task.
1090    ///
1091    /// The main task by the single-threaded executor might not be 'static, so we use this to drop
1092    /// the task and make sure we meet lifetime guarantees.  Note that removing the task from our
1093    /// task list isn't sufficient; we must make sure the future running in the task is dropped.
1094    ///
1095    /// # Safety
1096    ///
1097    /// This is unsafe because of the call to `drop_future_unchecked` which requires that no
1098    /// thread is currently polling the task.
1099    pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
1100        let mut state = ScopeWaker::from(self.lock());
1101        let task = state.take_task(task_id);
1102        if let Some(task) = task {
1103            task.drop_future_unchecked();
1104        }
1105    }
1106
1107    pub(super) fn task_did_finish(&self, id: usize) {
1108        let mut state = ScopeWaker::from(self.lock());
1109        state.task_did_finish(id);
1110    }
1111
1112    /// Cancels tasks in this scope and all child scopes.
1113    fn cancel_all_tasks(&self) {
1114        let mut scopes = vec![self.clone()];
1115        while let Some(scope) = scopes.pop() {
1116            let mut state = scope.lock();
1117            if !state.status().might_have_running_tasks() {
1118                // Already cancelled or closed.
1119                continue;
1120            }
1121            for task in state.all_tasks() {
1122                if task.cancel() {
1123                    task.scope().executor().ready_tasks.push(task.clone());
1124                }
1125                // Don't bother dropping tasks that are finished; the entire
1126                // scope is going to be dropped soon anyway.
1127            }
1128            // Copy children to a vec so we don't hold the lock for too long.
1129            scopes.extend(state.children().iter().filter_map(|child| child.upgrade()));
1130            state.mark_finished();
1131        }
1132    }
1133
    /// Drops tasks in this scope and all child scopes.
    ///
    /// Every visited scope is closed and emptied of its tasks, stored results and children.
    ///
    /// # Panics
    ///
    /// Panics if any task is being accessed by another thread. Only call this
    /// method when the executor is shutting down and there are no other pollers.
    pub(super) fn drop_all_tasks(&self) {
        // Work list of scopes still to be emptied; children are appended as they are found.
        let mut scopes = vec![self.clone()];
        while let Some(scope) = scopes.pop() {
            let (tasks, join_results) = {
                let mut state = ScopeWaker::from(scope.lock());
                let (tasks, join_results, children) = state.set_closed_and_drain();
                // Children whose scope has already gone away fail to upgrade and are skipped.
                scopes.extend(children.filter_map(|child| child.upgrade()));
                (tasks, join_results)
            };
            // Call task destructors once the scope lock is released so we don't risk a deadlock.
            for task in tasks {
                task.try_drop().expect("Expected drop to succeed");
            }
            std::mem::drop(join_results);
        }
    }
1156}
1157
/// Optimizes removal from parent scope.
///
/// A zero-sized marker whose identity is its address: equality and hashing are both computed
/// from the address of the `&PtrKey` reference. `WeakScopeHandle` borrows as a `PtrKey` located
/// at the address of its `ScopeInner`, which lets a set of `WeakScopeHandle`s be searched using
/// only a pointer to the inner value.
#[repr(transparent)]
struct PtrKey;
1161
impl Borrow<PtrKey> for WeakScopeHandle {
    /// Views this handle as a `PtrKey` placed at the address of the `ScopeInner` it points to.
    fn borrow(&self) -> &PtrKey {
        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
        unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
    }
}
1168
1169impl PartialEq for PtrKey {
1170    fn eq(&self, other: &Self) -> bool {
1171        self as *const _ == other as *const _
1172    }
1173}
1174
1175impl Eq for PtrKey {}
1176
1177impl hash::Hash for PtrKey {
1178    fn hash<H: hash::Hasher>(&self, state: &mut H) {
1179        (self as *const PtrKey).hash(state);
1180    }
1181}
1182
/// Per-task join state, keyed by task ID.
///
/// An entry holds either the waker of whoever is waiting on the task or the finished task
/// itself.
#[derive(Default)]
struct JoinResults(HashMap<usize, JoinResult>);
1185
/// Storage for what finished tasks leave behind in a scope.
trait Results: Send + Sync + 'static {
    /// Returns true if we allow spawning futures with arbitrary outputs on the scope.
    fn can_spawn(&self) -> bool;

    /// Polls for the specified task having finished.
    ///
    /// Resolves to the finished task's handle; while pending, implementations may keep the
    /// waker from `cx`.
    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;

    /// Called when a task finishes.
    ///
    /// Returns a waker that the caller should wake, if there is one.
    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;

    /// Called to drop any results for a particular task.
    ///
    /// Returns the finished task if one was stored for `task_id`.
    fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;

    /// Takes *all* the stored results.
    ///
    /// The results are returned type-erased, leaving the store empty.
    fn take(&mut self) -> Box<dyn Any>;

    /// Used only for testing.  Returns true if there are any results registered.
    #[cfg(test)]
    fn is_empty(&self) -> bool;
}
1206
1207impl Results for JoinResults {
1208    fn can_spawn(&self) -> bool {
1209        true
1210    }
1211
1212    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1213        match self.0.entry(task_id) {
1214            Entry::Occupied(mut o) => match o.get_mut() {
1215                JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1216                JoinResult::Result(_) => {
1217                    let JoinResult::Result(task) = o.remove() else { unreachable!() };
1218                    return Poll::Ready(task);
1219                }
1220            },
1221            Entry::Vacant(v) => {
1222                v.insert(JoinResult::Waker(cx.waker().clone()));
1223            }
1224        }
1225        Poll::Pending
1226    }
1227
1228    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1229        match self.0.entry(task.id()) {
1230            Entry::Occupied(mut o) => {
1231                let JoinResult::Waker(waker) =
1232                    std::mem::replace(o.get_mut(), JoinResult::Result(task))
1233                else {
1234                    // It can't be JoinResult::Result because this function is the only
1235                    // function that sets that, and `task_did_finish` won't get called
1236                    // twice.
1237                    unreachable!()
1238                };
1239                Some(waker)
1240            }
1241            Entry::Vacant(v) => {
1242                v.insert(JoinResult::Result(task));
1243                None
1244            }
1245        }
1246    }
1247
1248    fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1249        match self.0.remove(&task_id) {
1250            Some(JoinResult::Result(task)) => Some(task),
1251            _ => None,
1252        }
1253    }
1254
1255    fn take(&mut self) -> Box<dyn Any> {
1256        Box::new(Self(std::mem::take(&mut self.0)))
1257    }
1258
1259    #[cfg(test)]
1260    fn is_empty(&self) -> bool {
1261        self.0.is_empty()
1262    }
1263}
1264
/// A results store that collects the outputs of finished tasks into a shared buffer.
#[derive(Default)]
struct ResultsStream<R> {
    /// The buffer and waker; held behind an `Arc` so that it can be shared.
    inner: Arc<Mutex<ResultsStreamInner<R>>>,
}
1269
/// The state shared through `ResultsStream::inner`.
struct ResultsStreamInner<R> {
    /// Outputs of finished tasks that have not been taken yet.
    results: Vec<R>,
    /// A waker that `ResultsStream::task_did_finish` takes out and returns when a task
    /// finishes.
    waker: Option<Waker>,
}
1274
1275impl<R> Default for ResultsStreamInner<R> {
1276    fn default() -> Self {
1277        Self { results: Vec::new(), waker: None }
1278    }
1279}
1280
1281impl<R: Send + 'static> Results for ResultsStream<R> {
1282    fn can_spawn(&self) -> bool {
1283        false
1284    }
1285
1286    fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1287        Poll::Pending
1288    }
1289
1290    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1291        let mut inner = self.inner.lock();
1292        // SAFETY: R is guaranteed to be the same return type as all futures finishing on this
1293        // scope.
1294        inner.results.extend(unsafe { task.take_result() });
1295        inner.waker.take()
1296    }
1297
1298    fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1299        None
1300    }
1301
1302    fn take(&mut self) -> Box<dyn Any> {
1303        Box::new(std::mem::take(&mut self.inner.lock().results))
1304    }
1305
1306    #[cfg(test)]
1307    fn is_empty(&self) -> bool {
1308        false
1309    }
1310}
1311
1312#[cfg(test)]
1313mod tests {
1314    use super::*;
1315    use crate::{EHandle, LocalExecutor, SendExecutor, SpawnableFuture, Task, TestExecutor, Timer};
1316    use assert_matches::assert_matches;
1317    use fuchsia_sync::{Condvar, Mutex};
1318    use futures::channel::mpsc;
1319    use futures::future::join_all;
1320    use futures::{FutureExt, StreamExt};
1321    use std::future::{pending, poll_fn};
1322    use std::pin::{pin, Pin};
1323    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1324    use std::sync::Arc;
1325    use std::task::{Context, Poll};
1326    use std::time::Duration;
1327
    /// A test future that stays pending until `resolve` is called on it.
    #[derive(Default)]
    struct RemoteControlFuture(Mutex<RCFState>);
    /// The state behind a `RemoteControlFuture`.
    #[derive(Default)]
    struct RCFState {
        /// Set once `resolve` has been called.
        resolved: bool,
        /// The waker from the most recent pending poll, if it has not been taken yet.
        waker: Option<Waker>,
    }
1335
1336    impl Future for &RemoteControlFuture {
1337        type Output = ();
1338        fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1339            let mut this = self.0.lock();
1340            if this.resolved {
1341                Poll::Ready(())
1342            } else {
1343                this.waker.replace(cx.waker().clone());
1344                Poll::Pending
1345            }
1346        }
1347    }
1348
1349    impl RemoteControlFuture {
1350        fn new() -> Arc<Self> {
1351            Arc::new(Default::default())
1352        }
1353
1354        fn resolve(&self) {
1355            let mut this = self.0.lock();
1356            this.resolved = true;
1357            if let Some(waker) = this.waker.take() {
1358                waker.wake();
1359            }
1360        }
1361
1362        fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1363            let this = Arc::clone(self);
1364            async move { (&*this).await }
1365        }
1366    }
1367
1368    #[test]
1369    fn compute_works_on_root_scope() {
1370        let mut executor = TestExecutor::new();
1371        let scope = executor.global_scope();
1372        let mut task = pin!(scope.compute(async { 1 }));
1373        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1374    }
1375
1376    #[test]
1377    fn compute_works_on_new_child() {
1378        let mut executor = TestExecutor::new();
1379        let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1380        let mut task = pin!(scope.compute(async { 1 }));
1381        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1382    }
1383
1384    #[test]
1385    fn scope_drop_cancels_tasks() {
1386        let mut executor = TestExecutor::new();
1387        let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1388        let mut task = pin!(scope.compute(async { 1 }));
1389        drop(scope);
1390        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1391    }
1392
1393    #[test]
1394    fn tasks_do_not_spawn_on_cancelled_scopes() {
1395        let mut executor = TestExecutor::new();
1396        let scope =
1397            executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1398        let handle = scope.to_handle();
1399        let mut cancel = pin!(scope.cancel());
1400        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1401        let mut task = pin!(handle.compute(async { 1 }));
1402        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1403    }
1404
1405    #[test]
1406    fn tasks_do_not_spawn_on_closed_empty_scopes() {
1407        let mut executor = TestExecutor::new();
1408        let scope =
1409            executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1410        let handle = scope.to_handle();
1411        let mut close = pin!(scope.cancel());
1412        assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1413        let mut task = pin!(handle.compute(async { 1 }));
1414        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1415    }
1416
1417    #[test]
1418    fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1419        let mut executor = TestExecutor::new();
1420        let scope = executor.global_scope().new_child();
1421        let handle = scope.to_handle();
1422        handle.spawn(pending());
1423        let mut close = pin!(scope.close());
1424        assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1425        let mut task = pin!(handle.compute(async { 1 }));
1426        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1427    }
1428
1429    #[test]
1430    fn spawn_works_on_child_and_grandchild() {
1431        let mut executor = TestExecutor::new();
1432        let scope = executor.global_scope().new_child();
1433        let child = scope.new_child();
1434        let grandchild = child.new_child();
1435        let mut child_task = pin!(child.compute(async { 1 }));
1436        let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1437        assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1438        assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1439    }
1440
1441    #[test]
1442    fn spawn_drop_cancels_child_and_grandchild_tasks() {
1443        let mut executor = TestExecutor::new();
1444        let scope = executor.global_scope().new_child();
1445        let child = scope.new_child();
1446        let grandchild = child.new_child();
1447        let mut child_task = pin!(child.compute(async { 1 }));
1448        let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1449        drop(scope);
1450        assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1451        assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1452    }
1453
1454    #[test]
1455    fn completed_tasks_are_cleaned_up_after_cancel() {
1456        let mut executor = TestExecutor::new();
1457        let scope = executor.global_scope().new_child();
1458
1459        let task1 = scope.spawn(pending::<()>());
1460        let task2 = scope.spawn(async {});
1461        assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1462        assert_eq!(scope.lock().all_tasks().len(), 1);
1463
1464        // Running the executor after cancelling the task isn't currently
1465        // necessary, but we might decide to do async cleanup in the future.
1466        assert_eq!(task1.cancel().now_or_never(), None);
1467        assert_eq!(task2.cancel().now_or_never(), Some(Some(())));
1468
1469        assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1470        assert_eq!(scope.lock().all_tasks().len(), 0);
1471        assert!(scope.lock().results.is_empty());
1472    }
1473
1474    #[test]
1475    fn join_emtpy_scope() {
1476        let mut executor = TestExecutor::new();
1477        let scope = executor.global_scope().new_child();
1478        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1479    }
1480
1481    #[test]
1482    fn task_handle_preserves_access_to_result_after_join_begins() {
1483        let mut executor = TestExecutor::new();
1484        let scope = executor.global_scope().new_child();
1485        let mut task = scope.compute(async { 1 });
1486        scope.spawn(async {});
1487        let task2 = scope.spawn(pending::<()>());
1488        // Fuse to stay agnostic as to whether the join completes before or
1489        // after awaiting the task handle.
1490        let mut join = pin!(scope.join().fuse());
1491        let _ = executor.run_until_stalled(&mut join);
1492        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1493        let _ = task2.cancel();
1494        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1495    }
1496
1497    #[test]
1498    fn join_blocks_until_task_is_cancelled() {
1499        // Scope with one outstanding task handle and one cancelled task.
1500        // The scope is not complete until the outstanding task handle is cancelled.
1501        let mut executor = TestExecutor::new();
1502        let scope = executor.global_scope().new_child();
1503        let outstanding_task = scope.spawn(pending::<()>());
1504        let cancelled_task = scope.spawn(pending::<()>());
1505        assert_eq!(
1506            executor.run_until_stalled(&mut pin!(cancelled_task.cancel())),
1507            Poll::Ready(None)
1508        );
1509        let mut join = pin!(scope.join());
1510        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1511        assert_eq!(
1512            executor.run_until_stalled(&mut pin!(outstanding_task.cancel())),
1513            Poll::Ready(None)
1514        );
1515        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1516    }
1517
1518    #[test]
1519    fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1520        let mut executor = TestExecutor::new();
1521        let scope = executor.global_scope().new_child();
1522        // The default is to detach.
1523        scope.spawn(pending::<()>());
1524        let mut join = pin!(scope.join());
1525        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1526        let mut cancel = pin!(join.cancel());
1527        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1528    }
1529
1530    #[test]
1531    fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1532        let mut executor = TestExecutor::new();
1533        let scope = executor.global_scope().new_child();
1534        // The default is to detach.
1535        scope.spawn(pending::<()>());
1536        let mut close = pin!(scope.close());
1537        assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1538        let mut cancel = pin!(close.cancel());
1539        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1540    }
1541
1542    #[test]
1543    fn join_scope_blocks_until_spawned_task_completes() {
1544        let mut executor = TestExecutor::new();
1545        let scope = executor.global_scope().new_child();
1546        let remote = RemoteControlFuture::new();
1547        let mut task = scope.spawn(remote.as_future());
1548        let mut scope_join = pin!(scope.join());
1549        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1550        remote.resolve();
1551        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1552        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1553    }
1554
1555    #[test]
1556    fn close_scope_blocks_until_spawned_task_completes() {
1557        let mut executor = TestExecutor::new();
1558        let scope = executor.global_scope().new_child();
1559        let remote = RemoteControlFuture::new();
1560        let mut task = scope.spawn(remote.as_future());
1561        let mut scope_close = pin!(scope.close());
1562        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1563        remote.resolve();
1564        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1565        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1566    }
1567
1568    #[test]
1569    fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1570        let mut executor = TestExecutor::new();
1571        let scope = executor.global_scope().new_child();
1572        let child = scope.new_child();
1573        let remote = RemoteControlFuture::new();
1574        child.spawn(remote.as_future());
1575        let mut scope_join = pin!(scope.join());
1576        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1577        assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1578        child.detach();
1579        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1580        remote.resolve();
1581        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1582    }
1583
1584    #[test]
1585    fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1586        let mut executor = TestExecutor::new();
1587        let scope = executor.global_scope().new_child();
1588        let remote = RemoteControlFuture::new();
1589        {
1590            let remote = remote.clone();
1591            scope.spawn(async move {
1592                let child = Scope::new_with_name("child");
1593                child.spawn(async move {
1594                    Scope::current().spawn(remote.as_future());
1595                });
1596                child.detach();
1597            });
1598        }
1599        let mut scope_join = pin!(scope.join());
1600        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1601        remote.resolve();
1602        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1603    }
1604
1605    #[test]
1606    fn join_scope_blocks_when_blocked_child_is_detached() {
1607        let mut executor = TestExecutor::new();
1608        let scope = executor.global_scope().new_child();
1609        let child = scope.new_child();
1610        child.spawn(pending());
1611        let mut scope_join = pin!(scope.join());
1612        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1613        assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1614        child.detach();
1615        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1616    }
1617
1618    #[test]
1619    fn join_scope_completes_when_blocked_child_is_cancelled() {
1620        let mut executor = TestExecutor::new();
1621        let scope = executor.global_scope().new_child();
1622        let child = scope.new_child();
1623        child.spawn(pending());
1624        let mut scope_join = pin!(scope.join());
1625        {
1626            let mut child_join = pin!(child.join());
1627            assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1628            assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1629        }
1630        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1631    }
1632
1633    #[test]
1634    fn detached_scope_can_spawn() {
1635        let mut executor = TestExecutor::new();
1636        let scope = executor.global_scope().new_child();
1637        let handle = scope.to_handle();
1638        scope.detach();
1639        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1640    }
1641
1642    #[test]
1643    fn dropped_scope_cannot_spawn() {
1644        let mut executor = TestExecutor::new();
1645        let scope = executor.global_scope().new_child();
1646        let handle = scope.to_handle();
1647        drop(scope);
1648        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1649    }
1650
1651    #[test]
1652    fn dropped_scope_with_running_task_cannot_spawn() {
1653        let mut executor = TestExecutor::new();
1654        let scope = executor.global_scope().new_child();
1655        let handle = scope.to_handle();
1656        let _running_task = handle.spawn(pending::<()>());
1657        drop(scope);
1658        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1659    }
1660
1661    #[test]
1662    fn joined_scope_cannot_spawn() {
1663        let mut executor = TestExecutor::new();
1664        let scope = executor.global_scope().new_child();
1665        let handle = scope.to_handle();
1666        let mut scope_join = pin!(scope.join());
1667        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1668        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1669    }
1670
1671    #[test]
1672    fn joining_scope_with_running_task_can_spawn() {
1673        let mut executor = TestExecutor::new();
1674        let scope = executor.global_scope().new_child();
1675        let handle = scope.to_handle();
1676        let _running_task = handle.spawn(pending::<()>());
1677        let mut scope_join = pin!(scope.join());
1678        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1679        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1680    }
1681
1682    #[test]
1683    fn joined_scope_child_cannot_spawn() {
1684        let mut executor = TestExecutor::new();
1685        let scope = executor.global_scope().new_child();
1686        let handle = scope.to_handle();
1687        let child_before_join = scope.new_child();
1688        assert_eq!(
1689            executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1690            Poll::Ready(1)
1691        );
1692        let mut scope_join = pin!(scope.join());
1693        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1694        let child_after_join = handle.new_child();
1695        let grandchild_after_join = child_before_join.new_child();
1696        assert_eq!(
1697            executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1698            Poll::Pending
1699        );
1700        assert_eq!(
1701            executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
1702            Poll::Pending
1703        );
1704        assert_eq!(
1705            executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
1706            Poll::Pending
1707        );
1708    }
1709
1710    #[test]
1711    fn closed_scope_child_cannot_spawn() {
1712        let mut executor = TestExecutor::new();
1713        let scope = executor.global_scope().new_child();
1714        let handle = scope.to_handle();
1715        let child_before_close = scope.new_child();
1716        assert_eq!(
1717            executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
1718            Poll::Ready(1)
1719        );
1720        let mut scope_close = pin!(scope.close());
1721        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1722        let child_after_close = handle.new_child();
1723        let grandchild_after_close = child_before_close.new_child();
1724        assert_eq!(
1725            executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
1726            Poll::Pending
1727        );
1728        assert_eq!(
1729            executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
1730            Poll::Pending
1731        );
1732        assert_eq!(
1733            executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
1734            Poll::Pending
1735        );
1736    }
1737
1738    #[test]
1739    fn can_join_child_first() {
1740        let mut executor = TestExecutor::new();
1741        let scope = executor.global_scope().new_child();
1742        let child = scope.new_child();
1743        assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
1744        assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
1745        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1746    }
1747
1748    #[test]
1749    fn can_join_parent_first() {
1750        let mut executor = TestExecutor::new();
1751        let scope = executor.global_scope().new_child();
1752        let child = scope.new_child();
1753        assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
1754        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1755        assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
1756    }
1757
1758    #[test]
1759    fn task_in_parent_scope_can_join_child() {
1760        let mut executor = TestExecutor::new();
1761        let scope = executor.global_scope().new_child();
1762        let child = scope.new_child();
1763        let remote = RemoteControlFuture::new();
1764        child.spawn(remote.as_future());
1765        scope.spawn(async move { child.join().await });
1766        let mut join = pin!(scope.join());
1767        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1768        remote.resolve();
1769        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1770    }
1771
1772    #[test]
1773    fn join_completes_while_completed_task_handle_is_held() {
1774        let mut executor = TestExecutor::new();
1775        let scope = executor.global_scope().new_child();
1776        let mut task = scope.compute(async { 1 });
1777        scope.spawn(async {});
1778        let mut join = pin!(scope.join());
1779        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1780        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1781    }
1782
1783    #[test]
1784    fn cancel_completes_while_task_holds_handle() {
1785        let mut executor = TestExecutor::new();
1786        let scope = executor.global_scope().new_child();
1787        let handle = scope.to_handle();
1788        let mut task = scope.compute(async move {
1789            loop {
1790                pending::<()>().await; // never returns
1791                handle.spawn(async {});
1792            }
1793        });
1794
1795        // Join should not complete because the task never does.
1796        let mut join = pin!(scope.join());
1797        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1798
1799        let mut cancel = pin!(join.cancel());
1800        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1801        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1802    }
1803
1804    #[test]
1805    fn cancel_from_handle_inside_task() {
1806        let mut executor = TestExecutor::new();
1807        let scope = executor.global_scope().new_child();
1808        {
1809            // Spawn a task that never finishes until the scope is cancelled.
1810            scope.spawn(pending::<()>());
1811
1812            let mut no_tasks = pin!(scope.on_no_tasks());
1813            assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
1814
1815            let handle = scope.to_handle();
1816            scope.spawn(async move {
1817                handle.cancel().await;
1818                panic!("cancel() should never complete");
1819            });
1820
1821            assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
1822        }
1823        assert_eq!(scope.join().now_or_never(), Some(()));
1824    }
1825
1826    #[test]
1827    fn can_spawn_from_non_executor_thread() {
1828        let mut executor = TestExecutor::new();
1829        let scope = executor.global_scope().clone();
1830        let done = Arc::new(AtomicBool::new(false));
1831        let done_clone = done.clone();
1832        let _ = std::thread::spawn(move || {
1833            scope.spawn(async move {
1834                done_clone.store(true, Ordering::Relaxed);
1835            })
1836        })
1837        .join();
1838        let _ = executor.run_until_stalled(&mut pending::<()>());
1839        assert!(done.load(Ordering::Relaxed));
1840    }
1841
1842    #[test]
1843    fn scope_tree() {
1844        // A
1845        //  \
1846        //   B
1847        //  / \
1848        // C   D
1849        let mut executor = TestExecutor::new();
1850        let a = executor.global_scope().new_child();
1851        let b = a.new_child();
1852        let c = b.new_child();
1853        let d = b.new_child();
1854        let a_remote = RemoteControlFuture::new();
1855        let c_remote = RemoteControlFuture::new();
1856        let d_remote = RemoteControlFuture::new();
1857        a.spawn(a_remote.as_future());
1858        c.spawn(c_remote.as_future());
1859        d.spawn(d_remote.as_future());
1860        let mut a_join = pin!(a.join());
1861        let mut b_join = pin!(b.join());
1862        let mut d_join = pin!(d.join());
1863        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
1864        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
1865        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
1866        d_remote.resolve();
1867        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
1868        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
1869        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
1870        c_remote.resolve();
1871        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
1872        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
1873        a_remote.resolve();
1874        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
1875        let mut c_join = pin!(c.join());
1876        assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
1877    }
1878
1879    #[test]
1880    fn on_no_tasks() {
1881        let mut executor = TestExecutor::new();
1882        let scope = executor.global_scope().new_child();
1883        let _task1 = scope.spawn(std::future::ready(()));
1884        let task2 = scope.spawn(pending::<()>());
1885
1886        let mut on_no_tasks = pin!(scope.on_no_tasks());
1887
1888        assert!(executor.run_until_stalled(&mut on_no_tasks).is_pending());
1889
1890        let _ = task2.cancel();
1891
1892        let on_no_tasks2 = pin!(scope.on_no_tasks());
1893        let on_no_tasks3 = pin!(scope.on_no_tasks());
1894
1895        assert_matches!(
1896            executor.run_until_stalled(&mut join_all([on_no_tasks, on_no_tasks2, on_no_tasks3])),
1897            Poll::Ready(_)
1898        );
1899    }
1900
1901    #[test]
1902    fn wake_all() {
1903        let mut executor = TestExecutor::new();
1904        let scope = executor.global_scope().new_child();
1905
1906        let poll_count = Arc::new(AtomicU64::new(0));
1907
1908        struct PollCounter(Arc<AtomicU64>);
1909
1910        impl Future for PollCounter {
1911            type Output = ();
1912            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1913                self.0.fetch_add(1, Ordering::Relaxed);
1914                Poll::Pending
1915            }
1916        }
1917
1918        scope.spawn(PollCounter(poll_count.clone()));
1919        scope.spawn(PollCounter(poll_count.clone()));
1920
1921        let _ = executor.run_until_stalled(&mut pending::<()>());
1922
1923        let mut start_count = poll_count.load(Ordering::Relaxed);
1924
1925        for _ in 0..2 {
1926            scope.wake_all();
1927            let _ = executor.run_until_stalled(&mut pending::<()>());
1928            assert_eq!(poll_count.load(Ordering::Relaxed), start_count + 2);
1929            start_count += 2;
1930        }
1931    }
1932
1933    #[test]
1934    fn on_no_tasks_race() {
1935        fn sleep_random() {
1936            use rand::Rng;
1937            std::thread::sleep(std::time::Duration::from_micros(
1938                rand::thread_rng().gen_range(0..10),
1939            ));
1940        }
1941        for _ in 0..2000 {
1942            let mut executor = SendExecutor::new(2);
1943            let scope = executor.root_scope().new_child();
1944            scope.spawn(async {
1945                sleep_random();
1946            });
1947            executor.run(async move {
1948                sleep_random();
1949                scope.on_no_tasks().await;
1950            });
1951        }
1952    }
1953
1954    async fn yield_to_executor() {
1955        let mut done = false;
1956        poll_fn(|cx| {
1957            if done {
1958                Poll::Ready(())
1959            } else {
1960                done = true;
1961                cx.waker().wake_by_ref();
1962                Poll::Pending
1963            }
1964        })
1965        .await;
1966    }
1967
1968    #[test]
1969    fn test_detach() {
1970        let mut e = LocalExecutor::new();
1971        e.run_singlethreaded(async {
1972            let counter = Arc::new(AtomicU32::new(0));
1973
1974            {
1975                let counter = counter.clone();
1976                Task::spawn(async move {
1977                    for _ in 0..5 {
1978                        yield_to_executor().await;
1979                        counter.fetch_add(1, Ordering::Relaxed);
1980                    }
1981                })
1982                .detach();
1983            }
1984
1985            while counter.load(Ordering::Relaxed) != 5 {
1986                yield_to_executor().await;
1987            }
1988        });
1989
1990        assert!(e.ehandle.root_scope.lock().results.is_empty());
1991    }
1992
1993    #[test]
1994    fn test_cancel() {
1995        let mut e = LocalExecutor::new();
1996        e.run_singlethreaded(async {
1997            let ref_count = Arc::new(());
1998            // First, just drop the task.
1999            {
2000                let ref_count = ref_count.clone();
2001                let _ = Task::spawn(async move {
2002                    let _ref_count = ref_count;
2003                    let _: () = std::future::pending().await;
2004                });
2005            }
2006
2007            while Arc::strong_count(&ref_count) != 1 {
2008                yield_to_executor().await;
2009            }
2010
2011            // Now try explicitly cancelling.
2012            let task = {
2013                let ref_count = ref_count.clone();
2014                Task::spawn(async move {
2015                    let _ref_count = ref_count;
2016                    let _: () = std::future::pending().await;
2017                })
2018            };
2019
2020            assert_eq!(task.cancel().await, None);
2021            while Arc::strong_count(&ref_count) != 1 {
2022                yield_to_executor().await;
2023            }
2024
2025            // Now cancel a task that has already finished.
2026            let task = {
2027                let ref_count = ref_count.clone();
2028                Task::spawn(async move {
2029                    let _ref_count = ref_count;
2030                })
2031            };
2032
2033            // Wait for it to finish.
2034            while Arc::strong_count(&ref_count) != 1 {
2035                yield_to_executor().await;
2036            }
2037
2038            assert_eq!(task.cancel().await, Some(()));
2039        });
2040
2041        assert!(e.ehandle.root_scope.lock().results.is_empty());
2042    }
2043
2044    #[test]
2045    fn test_cancel_waits() {
2046        let mut executor = SendExecutor::new(2);
2047        let running = Arc::new((Mutex::new(false), Condvar::new()));
2048        let task = {
2049            let running = running.clone();
2050            executor.root_scope().compute(async move {
2051                *running.0.lock() = true;
2052                running.1.notify_all();
2053                std::thread::sleep(std::time::Duration::from_millis(10));
2054                *running.0.lock() = false;
2055                "foo"
2056            })
2057        };
2058        executor.run(async move {
2059            {
2060                let mut guard = running.0.lock();
2061                while !*guard {
2062                    running.1.wait(&mut guard);
2063                }
2064            }
2065            assert_eq!(task.cancel().await, Some("foo"));
2066            assert!(!*running.0.lock());
2067        });
2068    }
2069
2070    fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2071        let mut executor = SendExecutor::new(2);
2072        let running = Arc::new((Mutex::new(false), Condvar::new()));
2073        let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2074        let task = {
2075            let running = running.clone();
2076            let can_quit = can_quit.clone();
2077            executor.root_scope().compute(async move {
2078                *running.0.lock() = true;
2079                running.1.notify_all();
2080                {
2081                    let mut guard = can_quit.0.lock();
2082                    while !*guard {
2083                        can_quit.1.wait(&mut guard);
2084                    }
2085                }
2086                *running.0.lock() = false;
2087            })
2088        };
2089        executor.run(async move {
2090            {
2091                let mut guard = running.0.lock();
2092                while !*guard {
2093                    running.1.wait(&mut guard);
2094                }
2095            }
2096
2097            callback(task);
2098
2099            *can_quit.0.lock() = true;
2100            can_quit.1.notify_all();
2101
2102            let ehandle = EHandle::local();
2103            let scope = ehandle.global_scope();
2104
2105            // The only way of testing for this is to poll.
2106            while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2107                Timer::new(std::time::Duration::from_millis(1)).await;
2108            }
2109
2110            assert!(!*running.0.lock());
2111        });
2112    }
2113
2114    #[test]
2115    fn test_dropped_cancel_cleans_up() {
2116        test_clean_up(|task| {
2117            let cancel_fut = std::pin::pin!(task.cancel());
2118            let waker = futures::task::noop_waker();
2119            assert!(cancel_fut.poll(&mut Context::from_waker(&waker)).is_pending());
2120        });
2121    }
2122
2123    #[test]
2124    fn test_dropped_task_cleans_up() {
2125        test_clean_up(|task| {
2126            std::mem::drop(task);
2127        });
2128    }
2129
2130    #[test]
2131    fn test_detach_cleans_up() {
2132        test_clean_up(|task| {
2133            task.detach();
2134        });
2135    }
2136
2137    #[test]
2138    fn test_scope_stream() {
2139        let mut executor = SendExecutor::new(2);
2140        executor.run(async move {
2141            let (stream, handle) = ScopeStream::new();
2142            handle.push(async { 1 });
2143            handle.push(async { 2 });
2144            stream.close();
2145            let results: HashSet<_> = stream.collect().await;
2146            assert_eq!(results, HashSet::from_iter([1, 2]));
2147        });
2148    }
2149
2150    #[test]
2151    fn test_scope_stream_wakes_properly() {
2152        let mut executor = SendExecutor::new(2);
2153        executor.run(async move {
2154            let (stream, handle) = ScopeStream::new();
2155            handle.push(async {
2156                Timer::new(Duration::from_millis(10)).await;
2157                1
2158            });
2159            handle.push(async {
2160                Timer::new(Duration::from_millis(10)).await;
2161                2
2162            });
2163            stream.close();
2164            let results: HashSet<_> = stream.collect().await;
2165            assert_eq!(results, HashSet::from_iter([1, 2]));
2166        });
2167    }
2168
2169    #[test]
2170    fn test_scope_stream_drops_spawned_tasks() {
2171        let mut executor = SendExecutor::new(2);
2172        executor.run(async move {
2173            let (stream, handle) = ScopeStream::new();
2174            handle.push(async { 1 });
2175            let _task = stream.compute(async { "foo" });
2176            stream.close();
2177            let results: HashSet<_> = stream.collect().await;
2178            assert_eq!(results, HashSet::from_iter([1]));
2179        });
2180    }
2181
2182    #[test]
2183    fn test_nested_scope_stream() {
2184        let mut executor = SendExecutor::new(2);
2185        executor.run(async move {
2186            let (mut stream, handle) = ScopeStream::new();
2187            handle.clone().push(async move {
2188                handle.clone().push(async move {
2189                    handle.clone().push(async move { 3 });
2190                    2
2191                });
2192                1
2193            });
2194            let mut results = HashSet::default();
2195            while let Some(item) = stream.next().await {
2196                results.insert(item);
2197                if results.len() == 3 {
2198                    stream.close();
2199                }
2200            }
2201            assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2202        });
2203    }
2204
2205    #[test]
2206    fn test_dropping_scope_stream_cancels_all_tasks() {
2207        let mut executor = SendExecutor::new(2);
2208        executor.run(async move {
2209            let (stream, handle) = ScopeStream::new();
2210            let (tx1, mut rx) = mpsc::unbounded::<()>();
2211            let tx2 = tx1.clone();
2212            handle.push(async move {
2213                let _tx1 = tx1;
2214                let () = pending().await;
2215            });
2216            handle.push(async move {
2217                let _tx2 = tx2;
2218                let () = pending().await;
2219            });
2220            drop(stream);
2221
2222            // This will wait forever if the tasks aren't cancelled.
2223            assert_eq!(rx.next().await, None);
2224        });
2225    }
2226
2227    #[test]
2228    fn test_scope_stream_collect() {
2229        let mut executor = SendExecutor::new(2);
2230        executor.run(async move {
2231            let stream: ScopeStream<_> = (0..10).into_iter().map(|i| async move { i }).collect();
2232            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2233
2234            let stream: ScopeStream<_> =
2235                (0..10).into_iter().map(|i| SpawnableFuture::new(async move { i })).collect();
2236            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2237        });
2238    }
2239}