1use super::super::task::JoinHandle;
6use super::atomic_future::{AtomicFutureHandle, CancelAndDetachResult};
7use super::common::{Executor, TaskHandle};
8use crate::condition::{Condition, ConditionGuard, WakerEntry};
9use crate::EHandle;
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{ready, Context, Poll, Waker};
26use std::{fmt, hash};
27
28#[must_use = "Scopes should be explicitly awaited or cancelled"]
71#[derive(Debug)]
72pub struct Scope {
73 inner: ScopeHandle,
75 }
77
78impl Scope {
79 pub fn new() -> Scope {
88 ScopeHandle::with_current(|handle| handle.new_child())
89 }
90
91 pub fn new_with_name(name: &str) -> Scope {
100 ScopeHandle::with_current(|handle| handle.new_child_with_name(name))
101 }
102
103 pub fn current() -> ScopeHandle {
111 ScopeHandle::with_current(|handle| handle.clone())
112 }
113
114 pub fn global() -> ScopeHandle {
131 EHandle::local().global_scope().clone()
132 }
133
134 pub fn new_child(&self) -> Scope {
136 self.inner.new_child()
137 }
138
139 pub fn new_child_with_name(&self, name: &str) -> Scope {
141 self.inner.new_child_with_name(name)
142 }
143
144 pub fn name(&self) -> &str {
146 &self.inner.inner.name
147 }
148
149 pub fn to_handle(&self) -> ScopeHandle {
157 self.inner.clone()
158 }
159
160 pub fn as_handle(&self) -> &ScopeHandle {
167 &self.inner
168 }
169
170 pub fn join(self) -> Join {
179 Join::new(self)
180 }
181
182 pub fn close(self) -> Join {
185 self.inner.close();
186 Join::new(self)
187 }
188
189 pub fn cancel(self) -> impl Future<Output = ()> {
203 self.inner.cancel_all_tasks();
204 Join::new(self)
205 }
206
207 pub fn detach(self) {
213 let this = ManuallyDrop::new(self);
216 mem::drop(unsafe { std::ptr::read(&this.inner) });
219 }
220}
221
222impl Drop for Scope {
225 fn drop(&mut self) {
226 self.inner.cancel_all_tasks();
235 }
236}
237
238impl IntoFuture for Scope {
239 type Output = ();
240 type IntoFuture = Join;
241 fn into_future(self) -> Self::IntoFuture {
242 self.join()
243 }
244}
245
246impl Deref for Scope {
247 type Target = ScopeHandle;
248 fn deref(&self) -> &Self::Target {
249 &self.inner
250 }
251}
252
253impl Borrow<ScopeHandle> for Scope {
254 fn borrow(&self) -> &ScopeHandle {
255 &*self
256 }
257}
258
259pin_project! {
260 pub struct Join<S = Scope> {
272 scope: S,
273 #[pin]
274 waker_entry: WakerEntry<ScopeState>,
275 }
276}
277
278impl<S> Join<S> {
279 fn new(scope: S) -> Self {
280 Self { scope, waker_entry: WakerEntry::new() }
281 }
282}
283
284impl Join {
285 pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
290 self.scope.inner.cancel_all_tasks();
291 self
292 }
293}
294
295impl<S> Future for Join<S>
296where
297 S: Borrow<ScopeHandle>,
298{
299 type Output = ();
300 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
301 let this = self.project();
302 let mut state = Borrow::borrow(&*this.scope).lock();
303 if state.has_tasks() {
304 state.add_waker(this.waker_entry, cx.waker().clone());
305 Poll::Pending
306 } else {
307 state.mark_finished();
308 Poll::Ready(())
309 }
310 }
311}
312
313pub trait Spawnable {
316 type Output;
318
319 fn into_task(self, scope: ScopeHandle) -> TaskHandle;
321}
322
323impl<F: Future + Send + 'static> Spawnable for F
324where
325 F::Output: Send + 'static,
326{
327 type Output = F::Output;
328
329 fn into_task(self, scope: ScopeHandle) -> TaskHandle {
330 scope.new_task(None, self)
331 }
332}
333
334#[derive(Clone)]
346pub struct ScopeHandle {
347 inner: Arc<ScopeInner>,
349 }
351
352impl ScopeHandle {
353 pub fn new_child(&self) -> Scope {
355 let mut state = self.lock();
356 let child = ScopeHandle {
357 inner: Arc::new(ScopeInner {
358 executor: self.inner.executor.clone(),
359 state: Condition::new(ScopeState::new(
360 Some(self.clone()),
361 state.status(),
362 JoinResults::default().into(),
363 )),
364 name: String::new(),
365 }),
366 };
367 let weak = child.downgrade();
368 state.insert_child(weak);
369 Scope { inner: child }
370 }
371
372 pub fn new_child_with_name(&self, name: &str) -> Scope {
374 let mut state = self.lock();
375 let child = ScopeHandle {
376 inner: Arc::new(ScopeInner {
377 executor: self.inner.executor.clone(),
378 state: Condition::new(ScopeState::new(
379 Some(self.clone()),
380 state.status(),
381 JoinResults::default().into(),
382 )),
383 name: name.to_string(),
384 }),
385 };
386 let weak = child.downgrade();
387 state.insert_child(weak);
388 Scope { inner: child }
389 }
390
391 pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
395 let task = future.into_task(self.clone());
396 let task_id = task.id();
397 self.insert_task(task, false);
398 JoinHandle::new(self.clone(), task_id)
399 }
400
401 pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
406 let task = self.new_local_task(None, future);
407 let id = task.id();
408 self.insert_task(task, false);
409 JoinHandle::new(self.clone(), id)
410 }
411
412 pub fn compute<T: Send + 'static>(
417 &self,
418 future: impl Spawnable<Output = T> + Send + 'static,
419 ) -> crate::Task<T> {
420 let task = future.into_task(self.clone());
421 let id = task.id();
422 self.insert_task(task, false);
423 JoinHandle::new(self.clone(), id).into()
424 }
425
426 pub fn compute_local<T: 'static>(
434 &self,
435 future: impl Future<Output = T> + 'static,
436 ) -> crate::Task<T> {
437 let task = self.new_local_task(None, future);
438 let id = task.id();
439 self.insert_task(task, false);
440 JoinHandle::new(self.clone(), id).into()
441 }
442
443 pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
444 ScopeHandle {
445 inner: Arc::new(ScopeInner {
446 executor,
447 state: Condition::new(ScopeState::new(
448 None,
449 Status::default(),
450 JoinResults::default().into(),
451 )),
452 name: "root".to_string(),
453 }),
454 }
455 }
456
457 pub fn close(&self) {
465 self.lock().close();
466 }
467
468 pub fn cancel(self) -> impl Future<Output = ()> {
473 self.cancel_all_tasks();
474 Join::new(self)
475 }
476
477 pub async fn on_no_tasks(&self) {
484 self.inner
485 .state
486 .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
487 .await;
488 }
489
490 pub fn wake_all(&self) {
492 self.lock().wake_all();
493 }
494
495 pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
498 &self,
499 id: Option<usize>,
500 fut: Fut,
501 ) -> AtomicFutureHandle<'a>
502 where
503 Fut::Output: Send,
504 {
505 AtomicFutureHandle::new(
506 Some(self.clone()),
507 id.unwrap_or_else(|| self.executor().next_task_id()),
508 fut,
509 )
510 }
511
512 pub(crate) fn new_local_task<'a>(
515 &self,
516 id: Option<usize>,
517 fut: impl Future + 'a,
518 ) -> AtomicFutureHandle<'a> {
519 if !self.executor().is_local() {
521 panic!(
522 "Error: called `new_local_task` on multithreaded executor. \
523 Use `spawn` or a `LocalExecutor` instead."
524 );
525 }
526
527 unsafe {
530 AtomicFutureHandle::new_local(
531 Some(self.clone()),
532 id.unwrap_or_else(|| self.executor().next_task_id()),
533 fut,
534 )
535 }
536 }
537}
538
539impl fmt::Debug for ScopeHandle {
540 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
541 f.debug_struct("Scope").field("name", &self.inner.name).finish()
542 }
543}
544
545pub struct ScopeStream<R> {
555 inner: ScopeHandle,
556 stream: Arc<Mutex<ResultsStreamInner<R>>>,
557}
558
559impl<R: Send + 'static> ScopeStream<R> {
560 pub fn new() -> (Self, ScopeStreamHandle<R>) {
569 Self::new_with_name(String::new())
570 }
571
572 pub fn new_with_name(name: String) -> (Self, ScopeStreamHandle<R>) {
581 let this = ScopeHandle::with_current(|handle| {
582 let mut state = handle.lock();
583 let stream = Arc::default();
584 let child = ScopeHandle {
585 inner: Arc::new(ScopeInner {
586 executor: handle.executor().clone(),
587 state: Condition::new(ScopeState::new(
588 Some(handle.clone()),
589 state.status(),
590 Box::new(ResultsStream { inner: Arc::clone(&stream) }),
591 )),
592 name,
593 }),
594 };
595 let weak = child.downgrade();
596 state.insert_child(weak);
597 ScopeStream { inner: child, stream }
598 });
599 let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
600 (this, handle)
601 }
602}
603
604impl<R> Drop for ScopeStream<R> {
605 fn drop(&mut self) {
606 self.inner.cancel_all_tasks();
615 }
616}
617
618impl<R: Send + 'static> Stream for ScopeStream<R> {
619 type Item = R;
620
621 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
622 let mut stream_inner = self.stream.lock();
623 match stream_inner.results.pop() {
624 Some(result) => Poll::Ready(Some(result)),
625 None => {
626 drop(stream_inner);
629 let state = self.inner.lock();
630 let mut stream_inner = self.stream.lock();
631 match stream_inner.results.pop() {
632 Some(result) => Poll::Ready(Some(result)),
633 None => {
634 if state.has_tasks() {
635 stream_inner.waker = Some(cx.waker().clone());
636 Poll::Pending
637 } else {
638 Poll::Ready(None)
639 }
640 }
641 }
642 }
643 }
644 }
645}
646
647impl<R> Deref for ScopeStream<R> {
648 type Target = ScopeHandle;
649 fn deref(&self) -> &Self::Target {
650 &self.inner
651 }
652}
653
654impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
655 fn borrow(&self) -> &ScopeHandle {
656 &*self
657 }
658}
659
660impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
661 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
662 let (stream, handle) = ScopeStream::new();
663 for fut in iter {
664 handle.push(fut);
665 }
666 stream.close();
667 stream
668 }
669}
670
671#[derive(Clone)]
672pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
673
674impl<R: Send> ScopeStreamHandle<R> {
675 pub fn push(&self, future: impl Spawnable<Output = R>) {
676 self.0.insert_task(future.into_task(self.0.clone()), true);
677 }
678}
679
680#[derive(Clone)]
686struct WeakScopeHandle {
687 inner: Weak<ScopeInner>,
688}
689
690impl WeakScopeHandle {
691 pub fn upgrade(&self) -> Option<ScopeHandle> {
693 self.inner.upgrade().map(|inner| ScopeHandle { inner })
694 }
695}
696
697impl hash::Hash for WeakScopeHandle {
698 fn hash<H: hash::Hasher>(&self, state: &mut H) {
699 Weak::as_ptr(&self.inner).hash(state);
700 }
701}
702
703impl PartialEq for WeakScopeHandle {
704 fn eq(&self, other: &Self) -> bool {
705 Weak::ptr_eq(&self.inner, &other.inner)
706 }
707}
708
709impl Eq for WeakScopeHandle {
710 }
713
714mod state {
717 use super::*;
718
719 pub struct ScopeState {
720 pub parent: Option<ScopeHandle>,
721 children: HashSet<WeakScopeHandle>,
723 all_tasks: HashSet<TaskHandle>,
724 subscopes_with_tasks: u32,
728 status: Status,
729 pub results: Box<dyn Results>,
731 }
732
733 pub enum JoinResult {
734 Waker(Waker),
735 Result(TaskHandle),
736 }
737
738 #[repr(u8)] #[derive(Default, Debug, Clone, Copy)]
740 pub enum Status {
741 #[default]
742 Open,
744 Closed,
746 Finished,
750 }
751
752 impl Status {
753 pub fn can_spawn(&self) -> bool {
754 match self {
755 Status::Open => true,
756 Status::Closed | Status::Finished => false,
757 }
758 }
759
760 pub fn might_have_running_tasks(&self) -> bool {
761 match self {
762 Status::Open | Status::Closed => true,
763 Status::Finished => false,
764 }
765 }
766 }
767
768 impl ScopeState {
769 pub fn new(
770 parent: Option<ScopeHandle>,
771 status: Status,
772 results: Box<impl Results>,
773 ) -> Self {
774 Self {
775 parent,
776 children: Default::default(),
777 all_tasks: Default::default(),
778 subscopes_with_tasks: 0,
779 status,
780 results,
781 }
782 }
783 }
784
785 impl ScopeState {
786 pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
787 &self.all_tasks
788 }
789
790 pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
793 if !self.status.can_spawn() || (!for_stream && !self.results.can_spawn()) {
794 return Some(task);
795 }
796 if self.all_tasks.is_empty() && !self.register_first_task() {
797 return Some(task);
798 }
799 task.wake();
800 assert!(self.all_tasks.insert(task));
801 None
802 }
803
804 pub fn children(&self) -> &HashSet<WeakScopeHandle> {
805 &self.children
806 }
807
808 pub fn insert_child(&mut self, child: WeakScopeHandle) {
809 self.children.insert(child);
810 }
811
812 pub fn remove_child(&mut self, child: &PtrKey) {
813 let found = self.children.remove(child);
814 assert!(found || self.children.is_empty());
817 }
818
819 pub fn status(&self) -> Status {
820 self.status
821 }
822
823 pub fn close(&mut self) {
824 self.status = Status::Closed;
825 }
826
827 pub fn mark_finished(&mut self) {
828 self.status = Status::Finished;
829 }
830
831 pub fn has_tasks(&self) -> bool {
832 self.subscopes_with_tasks > 0
833 }
834
835 pub fn wake_all(&self) {
836 for task in &self.all_tasks {
837 task.wake();
838 }
839 }
840
841 #[must_use]
845 fn register_first_task(&mut self) -> bool {
846 if !self.status.can_spawn() {
847 return false;
848 }
849 let can_spawn = match &self.parent {
850 Some(parent) => {
851 self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
854 }
855 None => true,
856 };
857 if can_spawn {
858 self.subscopes_with_tasks += 1;
859 debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
860 };
861 can_spawn
862 }
863
864 fn on_last_task_removed(
865 this: &mut ConditionGuard<'_, ScopeState>,
866 num_wakers_hint: usize,
867 wakers: &mut Vec<Waker>,
868 ) {
869 debug_assert!(this.subscopes_with_tasks > 0);
870 this.subscopes_with_tasks -= 1;
871 if this.subscopes_with_tasks > 0 {
872 wakers.reserve(num_wakers_hint);
873 return;
874 }
875
876 match &this.parent {
877 Some(parent) => {
878 Self::on_last_task_removed(
879 &mut parent.lock(),
880 num_wakers_hint + this.waker_count(),
881 wakers,
882 );
883 }
884 None => wakers.reserve(num_wakers_hint),
885 };
886 wakers.extend(this.drain_wakers());
887 }
888 }
889
890 #[derive(Default)]
891 struct WakeVec(Vec<Waker>);
892
893 impl Drop for WakeVec {
894 fn drop(&mut self) {
895 for waker in self.0.drain(..) {
896 waker.wake();
897 }
898 }
899 }
900
901 pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
903
904 impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
905 fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
906 Self(value, WakeVec::default())
907 }
908 }
909
910 impl ScopeWaker<'_> {
911 pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
912 let task = self.all_tasks.take(&id);
913 if task.is_some() {
914 self.on_task_removed(0);
915 }
916 task
917 }
918
919 pub fn task_did_finish(&mut self, id: usize) {
920 if let Some(task) = self.all_tasks.take(&id) {
921 self.on_task_removed(1);
922 if !task.is_detached() {
923 let maybe_waker = self.results.task_did_finish(task);
924 self.1 .0.extend(maybe_waker);
925 }
926 }
927 }
928
929 pub fn set_closed_and_drain(
930 &mut self,
931 ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
932 self.close();
933 let all_tasks = std::mem::take(&mut self.all_tasks);
934 let results = self.results.take();
935 if !all_tasks.is_empty() {
936 self.on_task_removed(0)
937 }
938 let children = self.children.drain();
939 (all_tasks, results, children)
940 }
941
942 fn on_task_removed(&mut self, num_wakers_hint: usize) {
943 if self.all_tasks.is_empty() {
944 ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1 .0)
945 }
946 }
947 }
948
949 impl<'a> Deref for ScopeWaker<'a> {
950 type Target = ConditionGuard<'a, ScopeState>;
951
952 fn deref(&self) -> &Self::Target {
953 &self.0
954 }
955 }
956
957 impl DerefMut for ScopeWaker<'_> {
958 fn deref_mut(&mut self) -> &mut Self::Target {
959 &mut self.0
960 }
961 }
962}
963
964struct ScopeInner {
965 executor: Arc<Executor>,
966 state: Condition<ScopeState>,
967 name: String,
968}
969
970impl Drop for ScopeInner {
971 fn drop(&mut self) {
972 let key = unsafe { &*(self as *const _ as *const PtrKey) };
977 if let Some(parent) = &self.state.lock().parent {
978 let mut parent_state = parent.lock();
979 parent_state.remove_child(key);
980 }
981 }
982}
983
984impl ScopeHandle {
985 fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
986 super::common::TaskHandle::with_current(|task| match task {
987 Some(task) => f(task.scope()),
988 None => f(EHandle::local().global_scope()),
989 })
990 }
991
992 fn lock(&self) -> ConditionGuard<'_, ScopeState> {
993 self.inner.state.lock()
994 }
995
996 fn downgrade(&self) -> WeakScopeHandle {
997 WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
998 }
999
1000 #[inline(always)]
1001 pub(crate) fn executor(&self) -> &Arc<Executor> {
1002 &self.inner.executor
1003 }
1004
1005 pub(crate) fn detach(&self, task_id: usize) {
1007 let _maybe_task = {
1008 let mut state = self.lock();
1009 if let Some(task) = state.all_tasks().get(&task_id) {
1010 task.detach();
1011 }
1012 state.results.detach(task_id)
1013 };
1014 }
1015
1016 pub(crate) unsafe fn cancel_task<R>(&self, task_id: usize) -> Option<R> {
1022 let mut state = self.lock();
1023 if let Some(task) = state.results.detach(task_id) {
1024 drop(state);
1025 return task.take_result();
1026 }
1027 state.all_tasks().get(&task_id).and_then(|task| {
1028 if task.cancel() {
1029 self.inner.executor.ready_tasks.push(task.clone());
1030 }
1031 task.take_result()
1032 })
1033 }
1034
1035 pub(crate) fn cancel_and_detach(&self, task_id: usize) {
1037 let _tasks = {
1038 let mut state = ScopeWaker::from(self.lock());
1039 let maybe_task1 = state.results.detach(task_id);
1040 let mut maybe_task2 = None;
1041 if let Some(task) = state.all_tasks().get(&task_id) {
1042 match task.cancel_and_detach() {
1043 CancelAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
1044 CancelAndDetachResult::AddToRunQueue => {
1045 self.inner.executor.ready_tasks.push(task.clone());
1046 }
1047 CancelAndDetachResult::Pending => {}
1048 }
1049 }
1050 (maybe_task1, maybe_task2)
1051 };
1052 }
1053
1054 pub(crate) unsafe fn poll_join_result<R>(
1060 &self,
1061 task_id: usize,
1062 cx: &mut Context<'_>,
1063 ) -> Poll<R> {
1064 let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1065 match task.take_result() {
1066 Some(result) => Poll::Ready(result),
1067 None => {
1068 Poll::Pending
1070 }
1071 }
1072 }
1073
1074 pub(crate) unsafe fn poll_cancelled<R>(
1076 &self,
1077 task_id: usize,
1078 cx: &mut Context<'_>,
1079 ) -> Poll<Option<R>> {
1080 let task = self.lock().results.poll_join_result(task_id, cx);
1081 task.map(|task| task.take_result())
1082 }
1083
1084 pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1085 let returned_task = self.lock().insert_task(task, for_stream);
1086 returned_task.is_none()
1087 }
1088
1089 pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
1100 let mut state = ScopeWaker::from(self.lock());
1101 let task = state.take_task(task_id);
1102 if let Some(task) = task {
1103 task.drop_future_unchecked();
1104 }
1105 }
1106
1107 pub(super) fn task_did_finish(&self, id: usize) {
1108 let mut state = ScopeWaker::from(self.lock());
1109 state.task_did_finish(id);
1110 }
1111
1112 fn cancel_all_tasks(&self) {
1114 let mut scopes = vec![self.clone()];
1115 while let Some(scope) = scopes.pop() {
1116 let mut state = scope.lock();
1117 if !state.status().might_have_running_tasks() {
1118 continue;
1120 }
1121 for task in state.all_tasks() {
1122 if task.cancel() {
1123 task.scope().executor().ready_tasks.push(task.clone());
1124 }
1125 }
1128 scopes.extend(state.children().iter().filter_map(|child| child.upgrade()));
1130 state.mark_finished();
1131 }
1132 }
1133
1134 pub(super) fn drop_all_tasks(&self) {
1141 let mut scopes = vec![self.clone()];
1142 while let Some(scope) = scopes.pop() {
1143 let (tasks, join_results) = {
1144 let mut state = ScopeWaker::from(scope.lock());
1145 let (tasks, join_results, children) = state.set_closed_and_drain();
1146 scopes.extend(children.filter_map(|child| child.upgrade()));
1147 (tasks, join_results)
1148 };
1149 for task in tasks {
1151 task.try_drop().expect("Expected drop to succeed");
1152 }
1153 std::mem::drop(join_results);
1154 }
1155 }
1156}
1157
1158#[repr(transparent)]
1160struct PtrKey;
1161
1162impl Borrow<PtrKey> for WeakScopeHandle {
1163 fn borrow(&self) -> &PtrKey {
1164 unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1166 }
1167}
1168
1169impl PartialEq for PtrKey {
1170 fn eq(&self, other: &Self) -> bool {
1171 self as *const _ == other as *const _
1172 }
1173}
1174
1175impl Eq for PtrKey {}
1176
1177impl hash::Hash for PtrKey {
1178 fn hash<H: hash::Hasher>(&self, state: &mut H) {
1179 (self as *const PtrKey).hash(state);
1180 }
1181}
1182
1183#[derive(Default)]
1184struct JoinResults(HashMap<usize, JoinResult>);
1185
1186trait Results: Send + Sync + 'static {
1187 fn can_spawn(&self) -> bool;
1189
1190 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;
1192
1193 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;
1195
1196 fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;
1198
1199 fn take(&mut self) -> Box<dyn Any>;
1201
1202 #[cfg(test)]
1204 fn is_empty(&self) -> bool;
1205}
1206
1207impl Results for JoinResults {
1208 fn can_spawn(&self) -> bool {
1209 true
1210 }
1211
1212 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1213 match self.0.entry(task_id) {
1214 Entry::Occupied(mut o) => match o.get_mut() {
1215 JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1216 JoinResult::Result(_) => {
1217 let JoinResult::Result(task) = o.remove() else { unreachable!() };
1218 return Poll::Ready(task);
1219 }
1220 },
1221 Entry::Vacant(v) => {
1222 v.insert(JoinResult::Waker(cx.waker().clone()));
1223 }
1224 }
1225 Poll::Pending
1226 }
1227
1228 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1229 match self.0.entry(task.id()) {
1230 Entry::Occupied(mut o) => {
1231 let JoinResult::Waker(waker) =
1232 std::mem::replace(o.get_mut(), JoinResult::Result(task))
1233 else {
1234 unreachable!()
1238 };
1239 Some(waker)
1240 }
1241 Entry::Vacant(v) => {
1242 v.insert(JoinResult::Result(task));
1243 None
1244 }
1245 }
1246 }
1247
1248 fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1249 match self.0.remove(&task_id) {
1250 Some(JoinResult::Result(task)) => Some(task),
1251 _ => None,
1252 }
1253 }
1254
1255 fn take(&mut self) -> Box<dyn Any> {
1256 Box::new(Self(std::mem::take(&mut self.0)))
1257 }
1258
1259 #[cfg(test)]
1260 fn is_empty(&self) -> bool {
1261 self.0.is_empty()
1262 }
1263}
1264
1265#[derive(Default)]
1266struct ResultsStream<R> {
1267 inner: Arc<Mutex<ResultsStreamInner<R>>>,
1268}
1269
1270struct ResultsStreamInner<R> {
1271 results: Vec<R>,
1272 waker: Option<Waker>,
1273}
1274
1275impl<R> Default for ResultsStreamInner<R> {
1276 fn default() -> Self {
1277 Self { results: Vec::new(), waker: None }
1278 }
1279}
1280
1281impl<R: Send + 'static> Results for ResultsStream<R> {
1282 fn can_spawn(&self) -> bool {
1283 false
1284 }
1285
1286 fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1287 Poll::Pending
1288 }
1289
1290 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1291 let mut inner = self.inner.lock();
1292 inner.results.extend(unsafe { task.take_result() });
1295 inner.waker.take()
1296 }
1297
1298 fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1299 None
1300 }
1301
1302 fn take(&mut self) -> Box<dyn Any> {
1303 Box::new(std::mem::take(&mut self.inner.lock().results))
1304 }
1305
1306 #[cfg(test)]
1307 fn is_empty(&self) -> bool {
1308 false
1309 }
1310}
1311
1312#[cfg(test)]
1313mod tests {
1314 use super::*;
1315 use crate::{EHandle, LocalExecutor, SendExecutor, SpawnableFuture, Task, TestExecutor, Timer};
1316 use assert_matches::assert_matches;
1317 use fuchsia_sync::{Condvar, Mutex};
1318 use futures::channel::mpsc;
1319 use futures::future::join_all;
1320 use futures::{FutureExt, StreamExt};
1321 use std::future::{pending, poll_fn};
1322 use std::pin::{pin, Pin};
1323 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1324 use std::sync::Arc;
1325 use std::task::{Context, Poll};
1326 use std::time::Duration;
1327
1328 #[derive(Default)]
1329 struct RemoteControlFuture(Mutex<RCFState>);
1330 #[derive(Default)]
1331 struct RCFState {
1332 resolved: bool,
1333 waker: Option<Waker>,
1334 }
1335
1336 impl Future for &RemoteControlFuture {
1337 type Output = ();
1338 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1339 let mut this = self.0.lock();
1340 if this.resolved {
1341 Poll::Ready(())
1342 } else {
1343 this.waker.replace(cx.waker().clone());
1344 Poll::Pending
1345 }
1346 }
1347 }
1348
1349 impl RemoteControlFuture {
1350 fn new() -> Arc<Self> {
1351 Arc::new(Default::default())
1352 }
1353
1354 fn resolve(&self) {
1355 let mut this = self.0.lock();
1356 this.resolved = true;
1357 if let Some(waker) = this.waker.take() {
1358 waker.wake();
1359 }
1360 }
1361
1362 fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1363 let this = Arc::clone(self);
1364 async move { (&*this).await }
1365 }
1366 }
1367
1368 #[test]
1369 fn compute_works_on_root_scope() {
1370 let mut executor = TestExecutor::new();
1371 let scope = executor.global_scope();
1372 let mut task = pin!(scope.compute(async { 1 }));
1373 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1374 }
1375
1376 #[test]
1377 fn compute_works_on_new_child() {
1378 let mut executor = TestExecutor::new();
1379 let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1380 let mut task = pin!(scope.compute(async { 1 }));
1381 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1382 }
1383
1384 #[test]
1385 fn scope_drop_cancels_tasks() {
1386 let mut executor = TestExecutor::new();
1387 let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1388 let mut task = pin!(scope.compute(async { 1 }));
1389 drop(scope);
1390 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1391 }
1392
1393 #[test]
1394 fn tasks_do_not_spawn_on_cancelled_scopes() {
1395 let mut executor = TestExecutor::new();
1396 let scope =
1397 executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1398 let handle = scope.to_handle();
1399 let mut cancel = pin!(scope.cancel());
1400 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1401 let mut task = pin!(handle.compute(async { 1 }));
1402 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1403 }
1404
1405 #[test]
1406 fn tasks_do_not_spawn_on_closed_empty_scopes() {
1407 let mut executor = TestExecutor::new();
1408 let scope =
1409 executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1410 let handle = scope.to_handle();
1411 let mut close = pin!(scope.cancel());
1412 assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1413 let mut task = pin!(handle.compute(async { 1 }));
1414 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1415 }
1416
1417 #[test]
1418 fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1419 let mut executor = TestExecutor::new();
1420 let scope = executor.global_scope().new_child();
1421 let handle = scope.to_handle();
1422 handle.spawn(pending());
1423 let mut close = pin!(scope.close());
1424 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1425 let mut task = pin!(handle.compute(async { 1 }));
1426 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1427 }
1428
1429 #[test]
1430 fn spawn_works_on_child_and_grandchild() {
1431 let mut executor = TestExecutor::new();
1432 let scope = executor.global_scope().new_child();
1433 let child = scope.new_child();
1434 let grandchild = child.new_child();
1435 let mut child_task = pin!(child.compute(async { 1 }));
1436 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1437 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1438 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1439 }
1440
1441 #[test]
1442 fn spawn_drop_cancels_child_and_grandchild_tasks() {
1443 let mut executor = TestExecutor::new();
1444 let scope = executor.global_scope().new_child();
1445 let child = scope.new_child();
1446 let grandchild = child.new_child();
1447 let mut child_task = pin!(child.compute(async { 1 }));
1448 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1449 drop(scope);
1450 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1451 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1452 }
1453
1454 #[test]
1455 fn completed_tasks_are_cleaned_up_after_cancel() {
1456 let mut executor = TestExecutor::new();
1457 let scope = executor.global_scope().new_child();
1458
1459 let task1 = scope.spawn(pending::<()>());
1460 let task2 = scope.spawn(async {});
1461 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1462 assert_eq!(scope.lock().all_tasks().len(), 1);
1463
1464 assert_eq!(task1.cancel().now_or_never(), None);
1467 assert_eq!(task2.cancel().now_or_never(), Some(Some(())));
1468
1469 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1470 assert_eq!(scope.lock().all_tasks().len(), 0);
1471 assert!(scope.lock().results.is_empty());
1472 }
1473
1474 #[test]
1475 fn join_emtpy_scope() {
1476 let mut executor = TestExecutor::new();
1477 let scope = executor.global_scope().new_child();
1478 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1479 }
1480
1481 #[test]
1482 fn task_handle_preserves_access_to_result_after_join_begins() {
1483 let mut executor = TestExecutor::new();
1484 let scope = executor.global_scope().new_child();
1485 let mut task = scope.compute(async { 1 });
1486 scope.spawn(async {});
1487 let task2 = scope.spawn(pending::<()>());
1488 let mut join = pin!(scope.join().fuse());
1491 let _ = executor.run_until_stalled(&mut join);
1492 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1493 let _ = task2.cancel();
1494 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1495 }
1496
1497 #[test]
1498 fn join_blocks_until_task_is_cancelled() {
1499 let mut executor = TestExecutor::new();
1502 let scope = executor.global_scope().new_child();
1503 let outstanding_task = scope.spawn(pending::<()>());
1504 let cancelled_task = scope.spawn(pending::<()>());
1505 assert_eq!(
1506 executor.run_until_stalled(&mut pin!(cancelled_task.cancel())),
1507 Poll::Ready(None)
1508 );
1509 let mut join = pin!(scope.join());
1510 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1511 assert_eq!(
1512 executor.run_until_stalled(&mut pin!(outstanding_task.cancel())),
1513 Poll::Ready(None)
1514 );
1515 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1516 }
1517
1518 #[test]
1519 fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1520 let mut executor = TestExecutor::new();
1521 let scope = executor.global_scope().new_child();
1522 scope.spawn(pending::<()>());
1524 let mut join = pin!(scope.join());
1525 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1526 let mut cancel = pin!(join.cancel());
1527 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1528 }
1529
1530 #[test]
1531 fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1532 let mut executor = TestExecutor::new();
1533 let scope = executor.global_scope().new_child();
1534 scope.spawn(pending::<()>());
1536 let mut close = pin!(scope.close());
1537 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1538 let mut cancel = pin!(close.cancel());
1539 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1540 }
1541
1542 #[test]
1543 fn join_scope_blocks_until_spawned_task_completes() {
1544 let mut executor = TestExecutor::new();
1545 let scope = executor.global_scope().new_child();
1546 let remote = RemoteControlFuture::new();
1547 let mut task = scope.spawn(remote.as_future());
1548 let mut scope_join = pin!(scope.join());
1549 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1550 remote.resolve();
1551 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1552 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1553 }
1554
1555 #[test]
1556 fn close_scope_blocks_until_spawned_task_completes() {
1557 let mut executor = TestExecutor::new();
1558 let scope = executor.global_scope().new_child();
1559 let remote = RemoteControlFuture::new();
1560 let mut task = scope.spawn(remote.as_future());
1561 let mut scope_close = pin!(scope.close());
1562 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1563 remote.resolve();
1564 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1565 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1566 }
1567
1568 #[test]
1569 fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1570 let mut executor = TestExecutor::new();
1571 let scope = executor.global_scope().new_child();
1572 let child = scope.new_child();
1573 let remote = RemoteControlFuture::new();
1574 child.spawn(remote.as_future());
1575 let mut scope_join = pin!(scope.join());
1576 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1577 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1578 child.detach();
1579 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1580 remote.resolve();
1581 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1582 }
1583
1584 #[test]
1585 fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1586 let mut executor = TestExecutor::new();
1587 let scope = executor.global_scope().new_child();
1588 let remote = RemoteControlFuture::new();
1589 {
1590 let remote = remote.clone();
1591 scope.spawn(async move {
1592 let child = Scope::new_with_name("child");
1593 child.spawn(async move {
1594 Scope::current().spawn(remote.as_future());
1595 });
1596 child.detach();
1597 });
1598 }
1599 let mut scope_join = pin!(scope.join());
1600 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1601 remote.resolve();
1602 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1603 }
1604
1605 #[test]
1606 fn join_scope_blocks_when_blocked_child_is_detached() {
1607 let mut executor = TestExecutor::new();
1608 let scope = executor.global_scope().new_child();
1609 let child = scope.new_child();
1610 child.spawn(pending());
1611 let mut scope_join = pin!(scope.join());
1612 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1613 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1614 child.detach();
1615 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1616 }
1617
1618 #[test]
1619 fn join_scope_completes_when_blocked_child_is_cancelled() {
1620 let mut executor = TestExecutor::new();
1621 let scope = executor.global_scope().new_child();
1622 let child = scope.new_child();
1623 child.spawn(pending());
1624 let mut scope_join = pin!(scope.join());
1625 {
1626 let mut child_join = pin!(child.join());
1627 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1628 assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1629 }
1630 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1631 }
1632
1633 #[test]
1634 fn detached_scope_can_spawn() {
1635 let mut executor = TestExecutor::new();
1636 let scope = executor.global_scope().new_child();
1637 let handle = scope.to_handle();
1638 scope.detach();
1639 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1640 }
1641
1642 #[test]
1643 fn dropped_scope_cannot_spawn() {
1644 let mut executor = TestExecutor::new();
1645 let scope = executor.global_scope().new_child();
1646 let handle = scope.to_handle();
1647 drop(scope);
1648 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1649 }
1650
1651 #[test]
1652 fn dropped_scope_with_running_task_cannot_spawn() {
1653 let mut executor = TestExecutor::new();
1654 let scope = executor.global_scope().new_child();
1655 let handle = scope.to_handle();
1656 let _running_task = handle.spawn(pending::<()>());
1657 drop(scope);
1658 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1659 }
1660
1661 #[test]
1662 fn joined_scope_cannot_spawn() {
1663 let mut executor = TestExecutor::new();
1664 let scope = executor.global_scope().new_child();
1665 let handle = scope.to_handle();
1666 let mut scope_join = pin!(scope.join());
1667 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1668 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1669 }
1670
1671 #[test]
1672 fn joining_scope_with_running_task_can_spawn() {
1673 let mut executor = TestExecutor::new();
1674 let scope = executor.global_scope().new_child();
1675 let handle = scope.to_handle();
1676 let _running_task = handle.spawn(pending::<()>());
1677 let mut scope_join = pin!(scope.join());
1678 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1679 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1680 }
1681
1682 #[test]
1683 fn joined_scope_child_cannot_spawn() {
1684 let mut executor = TestExecutor::new();
1685 let scope = executor.global_scope().new_child();
1686 let handle = scope.to_handle();
1687 let child_before_join = scope.new_child();
1688 assert_eq!(
1689 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1690 Poll::Ready(1)
1691 );
1692 let mut scope_join = pin!(scope.join());
1693 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1694 let child_after_join = handle.new_child();
1695 let grandchild_after_join = child_before_join.new_child();
1696 assert_eq!(
1697 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1698 Poll::Pending
1699 );
1700 assert_eq!(
1701 executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
1702 Poll::Pending
1703 );
1704 assert_eq!(
1705 executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
1706 Poll::Pending
1707 );
1708 }
1709
1710 #[test]
1711 fn closed_scope_child_cannot_spawn() {
1712 let mut executor = TestExecutor::new();
1713 let scope = executor.global_scope().new_child();
1714 let handle = scope.to_handle();
1715 let child_before_close = scope.new_child();
1716 assert_eq!(
1717 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
1718 Poll::Ready(1)
1719 );
1720 let mut scope_close = pin!(scope.close());
1721 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1722 let child_after_close = handle.new_child();
1723 let grandchild_after_close = child_before_close.new_child();
1724 assert_eq!(
1725 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
1726 Poll::Pending
1727 );
1728 assert_eq!(
1729 executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
1730 Poll::Pending
1731 );
1732 assert_eq!(
1733 executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
1734 Poll::Pending
1735 );
1736 }
1737
1738 #[test]
1739 fn can_join_child_first() {
1740 let mut executor = TestExecutor::new();
1741 let scope = executor.global_scope().new_child();
1742 let child = scope.new_child();
1743 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
1744 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
1745 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1746 }
1747
1748 #[test]
1749 fn can_join_parent_first() {
1750 let mut executor = TestExecutor::new();
1751 let scope = executor.global_scope().new_child();
1752 let child = scope.new_child();
1753 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
1754 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1755 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
1756 }
1757
1758 #[test]
1759 fn task_in_parent_scope_can_join_child() {
1760 let mut executor = TestExecutor::new();
1761 let scope = executor.global_scope().new_child();
1762 let child = scope.new_child();
1763 let remote = RemoteControlFuture::new();
1764 child.spawn(remote.as_future());
1765 scope.spawn(async move { child.join().await });
1766 let mut join = pin!(scope.join());
1767 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1768 remote.resolve();
1769 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1770 }
1771
1772 #[test]
1773 fn join_completes_while_completed_task_handle_is_held() {
1774 let mut executor = TestExecutor::new();
1775 let scope = executor.global_scope().new_child();
1776 let mut task = scope.compute(async { 1 });
1777 scope.spawn(async {});
1778 let mut join = pin!(scope.join());
1779 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1780 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1781 }
1782
1783 #[test]
1784 fn cancel_completes_while_task_holds_handle() {
1785 let mut executor = TestExecutor::new();
1786 let scope = executor.global_scope().new_child();
1787 let handle = scope.to_handle();
1788 let mut task = scope.compute(async move {
1789 loop {
1790 pending::<()>().await; handle.spawn(async {});
1792 }
1793 });
1794
1795 let mut join = pin!(scope.join());
1797 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1798
1799 let mut cancel = pin!(join.cancel());
1800 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1801 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1802 }
1803
1804 #[test]
1805 fn cancel_from_handle_inside_task() {
1806 let mut executor = TestExecutor::new();
1807 let scope = executor.global_scope().new_child();
1808 {
1809 scope.spawn(pending::<()>());
1811
1812 let mut no_tasks = pin!(scope.on_no_tasks());
1813 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
1814
1815 let handle = scope.to_handle();
1816 scope.spawn(async move {
1817 handle.cancel().await;
1818 panic!("cancel() should never complete");
1819 });
1820
1821 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
1822 }
1823 assert_eq!(scope.join().now_or_never(), Some(()));
1824 }
1825
1826 #[test]
1827 fn can_spawn_from_non_executor_thread() {
1828 let mut executor = TestExecutor::new();
1829 let scope = executor.global_scope().clone();
1830 let done = Arc::new(AtomicBool::new(false));
1831 let done_clone = done.clone();
1832 let _ = std::thread::spawn(move || {
1833 scope.spawn(async move {
1834 done_clone.store(true, Ordering::Relaxed);
1835 })
1836 })
1837 .join();
1838 let _ = executor.run_until_stalled(&mut pending::<()>());
1839 assert!(done.load(Ordering::Relaxed));
1840 }
1841
1842 #[test]
1843 fn scope_tree() {
1844 let mut executor = TestExecutor::new();
1850 let a = executor.global_scope().new_child();
1851 let b = a.new_child();
1852 let c = b.new_child();
1853 let d = b.new_child();
1854 let a_remote = RemoteControlFuture::new();
1855 let c_remote = RemoteControlFuture::new();
1856 let d_remote = RemoteControlFuture::new();
1857 a.spawn(a_remote.as_future());
1858 c.spawn(c_remote.as_future());
1859 d.spawn(d_remote.as_future());
1860 let mut a_join = pin!(a.join());
1861 let mut b_join = pin!(b.join());
1862 let mut d_join = pin!(d.join());
1863 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
1864 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
1865 assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
1866 d_remote.resolve();
1867 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
1868 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
1869 assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
1870 c_remote.resolve();
1871 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
1872 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
1873 a_remote.resolve();
1874 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
1875 let mut c_join = pin!(c.join());
1876 assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
1877 }
1878
1879 #[test]
1880 fn on_no_tasks() {
1881 let mut executor = TestExecutor::new();
1882 let scope = executor.global_scope().new_child();
1883 let _task1 = scope.spawn(std::future::ready(()));
1884 let task2 = scope.spawn(pending::<()>());
1885
1886 let mut on_no_tasks = pin!(scope.on_no_tasks());
1887
1888 assert!(executor.run_until_stalled(&mut on_no_tasks).is_pending());
1889
1890 let _ = task2.cancel();
1891
1892 let on_no_tasks2 = pin!(scope.on_no_tasks());
1893 let on_no_tasks3 = pin!(scope.on_no_tasks());
1894
1895 assert_matches!(
1896 executor.run_until_stalled(&mut join_all([on_no_tasks, on_no_tasks2, on_no_tasks3])),
1897 Poll::Ready(_)
1898 );
1899 }
1900
1901 #[test]
1902 fn wake_all() {
1903 let mut executor = TestExecutor::new();
1904 let scope = executor.global_scope().new_child();
1905
1906 let poll_count = Arc::new(AtomicU64::new(0));
1907
1908 struct PollCounter(Arc<AtomicU64>);
1909
1910 impl Future for PollCounter {
1911 type Output = ();
1912 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1913 self.0.fetch_add(1, Ordering::Relaxed);
1914 Poll::Pending
1915 }
1916 }
1917
1918 scope.spawn(PollCounter(poll_count.clone()));
1919 scope.spawn(PollCounter(poll_count.clone()));
1920
1921 let _ = executor.run_until_stalled(&mut pending::<()>());
1922
1923 let mut start_count = poll_count.load(Ordering::Relaxed);
1924
1925 for _ in 0..2 {
1926 scope.wake_all();
1927 let _ = executor.run_until_stalled(&mut pending::<()>());
1928 assert_eq!(poll_count.load(Ordering::Relaxed), start_count + 2);
1929 start_count += 2;
1930 }
1931 }
1932
1933 #[test]
1934 fn on_no_tasks_race() {
1935 fn sleep_random() {
1936 use rand::Rng;
1937 std::thread::sleep(std::time::Duration::from_micros(
1938 rand::thread_rng().gen_range(0..10),
1939 ));
1940 }
1941 for _ in 0..2000 {
1942 let mut executor = SendExecutor::new(2);
1943 let scope = executor.root_scope().new_child();
1944 scope.spawn(async {
1945 sleep_random();
1946 });
1947 executor.run(async move {
1948 sleep_random();
1949 scope.on_no_tasks().await;
1950 });
1951 }
1952 }
1953
1954 async fn yield_to_executor() {
1955 let mut done = false;
1956 poll_fn(|cx| {
1957 if done {
1958 Poll::Ready(())
1959 } else {
1960 done = true;
1961 cx.waker().wake_by_ref();
1962 Poll::Pending
1963 }
1964 })
1965 .await;
1966 }
1967
1968 #[test]
1969 fn test_detach() {
1970 let mut e = LocalExecutor::new();
1971 e.run_singlethreaded(async {
1972 let counter = Arc::new(AtomicU32::new(0));
1973
1974 {
1975 let counter = counter.clone();
1976 Task::spawn(async move {
1977 for _ in 0..5 {
1978 yield_to_executor().await;
1979 counter.fetch_add(1, Ordering::Relaxed);
1980 }
1981 })
1982 .detach();
1983 }
1984
1985 while counter.load(Ordering::Relaxed) != 5 {
1986 yield_to_executor().await;
1987 }
1988 });
1989
1990 assert!(e.ehandle.root_scope.lock().results.is_empty());
1991 }
1992
1993 #[test]
1994 fn test_cancel() {
1995 let mut e = LocalExecutor::new();
1996 e.run_singlethreaded(async {
1997 let ref_count = Arc::new(());
1998 {
2000 let ref_count = ref_count.clone();
2001 let _ = Task::spawn(async move {
2002 let _ref_count = ref_count;
2003 let _: () = std::future::pending().await;
2004 });
2005 }
2006
2007 while Arc::strong_count(&ref_count) != 1 {
2008 yield_to_executor().await;
2009 }
2010
2011 let task = {
2013 let ref_count = ref_count.clone();
2014 Task::spawn(async move {
2015 let _ref_count = ref_count;
2016 let _: () = std::future::pending().await;
2017 })
2018 };
2019
2020 assert_eq!(task.cancel().await, None);
2021 while Arc::strong_count(&ref_count) != 1 {
2022 yield_to_executor().await;
2023 }
2024
2025 let task = {
2027 let ref_count = ref_count.clone();
2028 Task::spawn(async move {
2029 let _ref_count = ref_count;
2030 })
2031 };
2032
2033 while Arc::strong_count(&ref_count) != 1 {
2035 yield_to_executor().await;
2036 }
2037
2038 assert_eq!(task.cancel().await, Some(()));
2039 });
2040
2041 assert!(e.ehandle.root_scope.lock().results.is_empty());
2042 }
2043
2044 #[test]
2045 fn test_cancel_waits() {
2046 let mut executor = SendExecutor::new(2);
2047 let running = Arc::new((Mutex::new(false), Condvar::new()));
2048 let task = {
2049 let running = running.clone();
2050 executor.root_scope().compute(async move {
2051 *running.0.lock() = true;
2052 running.1.notify_all();
2053 std::thread::sleep(std::time::Duration::from_millis(10));
2054 *running.0.lock() = false;
2055 "foo"
2056 })
2057 };
2058 executor.run(async move {
2059 {
2060 let mut guard = running.0.lock();
2061 while !*guard {
2062 running.1.wait(&mut guard);
2063 }
2064 }
2065 assert_eq!(task.cancel().await, Some("foo"));
2066 assert!(!*running.0.lock());
2067 });
2068 }
2069
2070 fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2071 let mut executor = SendExecutor::new(2);
2072 let running = Arc::new((Mutex::new(false), Condvar::new()));
2073 let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2074 let task = {
2075 let running = running.clone();
2076 let can_quit = can_quit.clone();
2077 executor.root_scope().compute(async move {
2078 *running.0.lock() = true;
2079 running.1.notify_all();
2080 {
2081 let mut guard = can_quit.0.lock();
2082 while !*guard {
2083 can_quit.1.wait(&mut guard);
2084 }
2085 }
2086 *running.0.lock() = false;
2087 })
2088 };
2089 executor.run(async move {
2090 {
2091 let mut guard = running.0.lock();
2092 while !*guard {
2093 running.1.wait(&mut guard);
2094 }
2095 }
2096
2097 callback(task);
2098
2099 *can_quit.0.lock() = true;
2100 can_quit.1.notify_all();
2101
2102 let ehandle = EHandle::local();
2103 let scope = ehandle.global_scope();
2104
2105 while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2107 Timer::new(std::time::Duration::from_millis(1)).await;
2108 }
2109
2110 assert!(!*running.0.lock());
2111 });
2112 }
2113
2114 #[test]
2115 fn test_dropped_cancel_cleans_up() {
2116 test_clean_up(|task| {
2117 let cancel_fut = std::pin::pin!(task.cancel());
2118 let waker = futures::task::noop_waker();
2119 assert!(cancel_fut.poll(&mut Context::from_waker(&waker)).is_pending());
2120 });
2121 }
2122
2123 #[test]
2124 fn test_dropped_task_cleans_up() {
2125 test_clean_up(|task| {
2126 std::mem::drop(task);
2127 });
2128 }
2129
2130 #[test]
2131 fn test_detach_cleans_up() {
2132 test_clean_up(|task| {
2133 task.detach();
2134 });
2135 }
2136
2137 #[test]
2138 fn test_scope_stream() {
2139 let mut executor = SendExecutor::new(2);
2140 executor.run(async move {
2141 let (stream, handle) = ScopeStream::new();
2142 handle.push(async { 1 });
2143 handle.push(async { 2 });
2144 stream.close();
2145 let results: HashSet<_> = stream.collect().await;
2146 assert_eq!(results, HashSet::from_iter([1, 2]));
2147 });
2148 }
2149
2150 #[test]
2151 fn test_scope_stream_wakes_properly() {
2152 let mut executor = SendExecutor::new(2);
2153 executor.run(async move {
2154 let (stream, handle) = ScopeStream::new();
2155 handle.push(async {
2156 Timer::new(Duration::from_millis(10)).await;
2157 1
2158 });
2159 handle.push(async {
2160 Timer::new(Duration::from_millis(10)).await;
2161 2
2162 });
2163 stream.close();
2164 let results: HashSet<_> = stream.collect().await;
2165 assert_eq!(results, HashSet::from_iter([1, 2]));
2166 });
2167 }
2168
2169 #[test]
2170 fn test_scope_stream_drops_spawned_tasks() {
2171 let mut executor = SendExecutor::new(2);
2172 executor.run(async move {
2173 let (stream, handle) = ScopeStream::new();
2174 handle.push(async { 1 });
2175 let _task = stream.compute(async { "foo" });
2176 stream.close();
2177 let results: HashSet<_> = stream.collect().await;
2178 assert_eq!(results, HashSet::from_iter([1]));
2179 });
2180 }
2181
2182 #[test]
2183 fn test_nested_scope_stream() {
2184 let mut executor = SendExecutor::new(2);
2185 executor.run(async move {
2186 let (mut stream, handle) = ScopeStream::new();
2187 handle.clone().push(async move {
2188 handle.clone().push(async move {
2189 handle.clone().push(async move { 3 });
2190 2
2191 });
2192 1
2193 });
2194 let mut results = HashSet::default();
2195 while let Some(item) = stream.next().await {
2196 results.insert(item);
2197 if results.len() == 3 {
2198 stream.close();
2199 }
2200 }
2201 assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2202 });
2203 }
2204
2205 #[test]
2206 fn test_dropping_scope_stream_cancels_all_tasks() {
2207 let mut executor = SendExecutor::new(2);
2208 executor.run(async move {
2209 let (stream, handle) = ScopeStream::new();
2210 let (tx1, mut rx) = mpsc::unbounded::<()>();
2211 let tx2 = tx1.clone();
2212 handle.push(async move {
2213 let _tx1 = tx1;
2214 let () = pending().await;
2215 });
2216 handle.push(async move {
2217 let _tx2 = tx2;
2218 let () = pending().await;
2219 });
2220 drop(stream);
2221
2222 assert_eq!(rx.next().await, None);
2224 });
2225 }
2226
2227 #[test]
2228 fn test_scope_stream_collect() {
2229 let mut executor = SendExecutor::new(2);
2230 executor.run(async move {
2231 let stream: ScopeStream<_> = (0..10).into_iter().map(|i| async move { i }).collect();
2232 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2233
2234 let stream: ScopeStream<_> =
2235 (0..10).into_iter().map(|i| SpawnableFuture::new(async move { i })).collect();
2236 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2237 });
2238 }
2239}