1use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::EHandle;
9use crate::condition::{Condition, ConditionGuard, WakerEntry};
10use core::{error, fmt};
11use fuchsia_sync::Mutex;
12use futures::Stream;
13use pin_project_lite::pin_project;
14use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
15use state::{JoinResult, ScopeState, ScopeWaker, Status, WakeVec};
16use std::any::Any;
17use std::borrow::Borrow;
18use std::collections::hash_map::Entry;
19use std::collections::hash_set;
20use std::future::{Future, IntoFuture};
21use std::hash;
22use std::marker::PhantomData;
23use std::mem::{self, ManuallyDrop};
24use std::ops::{Deref, DerefMut};
25use std::pin::Pin;
26use std::sync::{Arc, Weak};
27use std::task::{Context, Poll, Waker, ready};
28
/// An owning handle to a scope of tasks.
///
/// Dropping a `Scope` aborts the tasks of the scope and of its child scopes (see the `Drop`
/// impl). Use [`Scope::detach`] to give up ownership without aborting, or [`Scope::join`],
/// [`Scope::close`], [`Scope::cancel`] or [`Scope::abort`] to obtain a future to wait on.
#[must_use = "Scopes should be explicitly awaited or cancelled"]
#[derive(Debug)]
pub struct Scope {
    // The shared handle this owner wraps; also reachable through `Deref`.
    inner: ScopeHandle,
}
80
81impl Default for Scope {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl Scope {
88 pub fn new() -> Scope {
97 ScopeHandle::with_current(|handle| handle.new_child())
98 }
99
100 pub fn new_with_name(name: impl Into<String>) -> Scope {
109 ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
110 }
111
112 pub fn current() -> ScopeHandle {
120 ScopeHandle::with_current(|handle| handle.clone())
121 }
122
123 pub fn global() -> ScopeHandle {
140 EHandle::local().global_scope().clone()
141 }
142
143 pub fn new_child(&self) -> Scope {
145 self.inner.new_child()
146 }
147
148 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
150 self.inner.new_child_with_name(name.into())
151 }
152
153 pub fn name(&self) -> &str {
155 &self.inner.inner.name
156 }
157
158 pub fn to_handle(&self) -> ScopeHandle {
166 self.inner.clone()
167 }
168
169 pub fn as_handle(&self) -> &ScopeHandle {
176 &self.inner
177 }
178
179 pub fn join(self) -> Join {
188 Join::new(self)
189 }
190
191 pub fn close(self) -> Join {
194 self.inner.close();
195 Join::new(self)
196 }
197
198 pub fn cancel(self) -> Join {
216 self.inner.cancel_all_tasks();
217 Join::new(self)
218 }
219
220 pub fn abort(self) -> impl Future<Output = ()> {
235 self.inner.abort_all_tasks();
236 Join::new(self)
237 }
238
239 pub fn detach(self) {
245 let this = ManuallyDrop::new(self);
248 mem::drop(unsafe { std::ptr::read(&this.inner) });
251 }
252}
253
254impl Drop for Scope {
257 fn drop(&mut self) {
258 self.inner.abort_all_tasks();
267 }
268}
269
270impl IntoFuture for Scope {
271 type Output = ();
272 type IntoFuture = Join;
273 fn into_future(self) -> Self::IntoFuture {
274 self.join()
275 }
276}
277
278impl Deref for Scope {
279 type Target = ScopeHandle;
280 fn deref(&self) -> &Self::Target {
281 &self.inner
282 }
283}
284
285impl Borrow<ScopeHandle> for Scope {
286 fn borrow(&self) -> &ScopeHandle {
287 self
288 }
289}
290
pin_project! {
    /// Future that resolves once the scope it holds has no tasks left.
    ///
    /// `S` is anything that can be borrowed as a [`ScopeHandle`]; it defaults to an owned
    /// [`Scope`].
    pub struct Join<S = Scope> {
        // The scope (or handle) being waited on.
        scope: S,
        // Registration slot that `poll` hands to `add_waker` together with the task's waker.
        #[pin]
        waker_entry: WakerEntry<ScopeState>,
    }
}
309
310impl<S: Borrow<ScopeHandle>> Join<S> {
311 fn new(scope: S) -> Self {
312 let waker_entry = scope.borrow().inner.state.waker_entry();
313 Self { scope, waker_entry }
314 }
315
316 pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
321 self.scope.borrow().abort_all_tasks();
322 self
323 }
324
325 pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
330 self.scope.borrow().cancel_all_tasks();
331 self
332 }
333}
334
335impl<S> Future for Join<S>
336where
337 S: Borrow<ScopeHandle>,
338{
339 type Output = ();
340 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341 let this = self.project();
342 let mut state = Borrow::borrow(&*this.scope).lock();
343 if state.has_tasks() {
344 state.add_waker(this.waker_entry, cx.waker().clone());
345 Poll::Pending
346 } else {
347 state.mark_finished();
348 Poll::Ready(())
349 }
350 }
351}
352
/// Something that can be converted into a task for a scope.
pub trait Spawnable {
    /// The value the resulting task produces.
    type Output;

    /// Converts `self` into a [`TaskHandle`] for the given `scope`.
    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
}
362
363impl<F: Future + Send + 'static> Spawnable for F
364where
365 F::Output: Send + 'static,
366{
367 type Output = F::Output;
368
369 fn into_task(self, scope: ScopeHandle) -> TaskHandle {
370 scope.new_task(self)
371 }
372}
373
/// A cloneable reference to a scope. Clones share the same underlying `ScopeInner`.
///
/// Unlike [`Scope`], dropping a handle does not abort the scope's tasks.
#[derive(Clone)]
pub struct ScopeHandle {
    // Reference-counted scope data (executor, state, name, instrumentation data).
    inner: Arc<ScopeInner>,
}
391
392impl ScopeHandle {
393 pub fn new_child(&self) -> Scope {
395 self.new_child_inner(String::new())
396 }
397
398 pub fn instrument_data(&self) -> Option<&(dyn Any + Send + Sync)> {
400 self.inner.instrument_data.as_deref()
401 }
402
403 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
405 self.new_child_inner(name.into())
406 }
407
408 fn new_child_inner(&self, name: String) -> Scope {
409 let mut state = self.lock();
410 let child = ScopeHandle {
411 inner: Arc::new(ScopeInner {
412 executor: self.inner.executor.clone(),
413 state: Condition::new(ScopeState::new_child(
414 self.clone(),
415 &state,
416 JoinResults::default().into(),
417 )),
418
419 instrument_data: self
420 .inner
421 .executor
422 .instrument
423 .as_ref()
424 .map(|value| value.scope_created(&name, Some(self))),
425 name,
426 }),
427 };
428 let weak = child.downgrade();
429 state.insert_child(weak);
430 Scope { inner: child }
431 }
432
433 pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
437 let task = future.into_task(self.clone());
438 let task_handle = task.clone();
439 self.insert_task(task, false);
440 JoinHandle::new(self.clone(), task_handle)
441 }
442
443 pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
448 let task = self.new_local_task(future);
449 self.insert_task(task.clone(), false);
450 JoinHandle::new(self.clone(), task)
451 }
452
453 pub fn compute<T: Send + 'static>(
458 &self,
459 future: impl Spawnable<Output = T> + Send + 'static,
460 ) -> crate::Task<T> {
461 let task = future.into_task(self.clone());
462 let task_handle = task.clone();
463 self.insert_task(task, false);
464 JoinHandle::new(self.clone(), task_handle).into()
465 }
466
467 pub fn compute_local<T: 'static>(
475 &self,
476 future: impl Future<Output = T> + 'static,
477 ) -> crate::Task<T> {
478 let task = self.new_local_task(future);
479 self.insert_task(task.clone(), false);
480 JoinHandle::new(self.clone(), task).into()
481 }
482
483 pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
484 ScopeHandle {
485 inner: Arc::new(ScopeInner {
486 state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
487 name: "root".to_string(),
488 instrument_data: executor
489 .instrument
490 .as_ref()
491 .map(|value| value.scope_created("root", None)),
492 executor,
493 }),
494 }
495 }
496
497 pub fn close(&self) {
505 self.lock().close();
506 }
507
508 pub fn cancel(self) -> Join<Self> {
513 self.cancel_all_tasks();
514 Join::new(self)
515 }
516
517 pub fn abort(self) -> impl Future<Output = ()> {
522 self.abort_all_tasks();
523 Join::new(self)
524 }
525
526 #[must_use]
538 pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
539 ScopeActiveGuard::new(self)
540 }
541
542 pub fn is_cancelled(&self) -> bool {
545 self.lock().status().is_cancelled()
546 }
547
548 pub async fn on_no_tasks(&self) {
555 self.inner
556 .state
557 .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
558 .await;
559 }
560
561 pub async fn on_no_tasks_and_guards(&self) {
564 self.inner
565 .state
566 .when(|state| {
567 if state.has_tasks() || state.guards() > 0 {
568 Poll::Pending
569 } else {
570 Poll::Ready(())
571 }
572 })
573 .await;
574 }
575
576 pub fn wake_all_with_active_guard(&self) {
578 self.lock().wake_all_with_active_guard();
579 }
580
581 pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(&self, fut: Fut) -> AtomicFutureHandle<'a>
584 where
585 Fut::Output: Send,
586 {
587 let mut task = AtomicFutureHandle::new(Some(self.clone()), fut);
588 if let Some(instrument) = &self.executor().instrument {
589 instrument.task_created(self, &mut task);
590 }
591 task
592 }
593
594 pub(crate) fn new_local_task<'a>(&self, fut: impl Future + 'a) -> AtomicFutureHandle<'a> {
597 if !self.executor().is_local() {
599 panic!(
600 "Error: called `new_local_task` on multithreaded executor. \
601 Use `spawn` or a `LocalExecutor` instead."
602 );
603 }
604 assert_eq!(
605 self.executor().first_thread_id.get(),
606 Some(&std::thread::current().id()),
607 "Error: called `new_local_task` on a different thread to the executor",
608 );
609
610 unsafe {
613 let mut task = AtomicFutureHandle::new_local(Some(self.clone()), fut);
614 if let Some(instrument) = &self.executor().instrument {
615 instrument.task_created(self, &mut task);
616 }
617 task
618 }
619 }
620}
621
622impl fmt::Debug for ScopeHandle {
623 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624 f.debug_struct("Scope").field("name", &self.inner.name).finish()
625 }
626}
627
/// A scope whose finished tasks' results are delivered through a [`Stream`].
///
/// Futures are added through the accompanying [`ScopeStreamHandle`]. Dropping the stream
/// aborts the scope's tasks (see the `Drop` impl).
pub struct ScopeStream<R> {
    // The child scope the futures run in.
    inner: ScopeHandle,
    // Result buffer and parked waker, shared with the scope's `ResultsStream` sink.
    stream: Arc<Mutex<ResultsStreamInner<R>>>,
}
641
642impl<R: Send + 'static> ScopeStream<R> {
643 pub fn new() -> (Self, ScopeStreamHandle<R>) {
652 Self::new_inner(String::new())
653 }
654
655 pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
664 Self::new_inner(name.into())
665 }
666
667 fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
668 let this = ScopeHandle::with_current(|handle| {
669 let mut state = handle.lock();
670 let stream = Arc::default();
671 let child = ScopeHandle {
672 inner: Arc::new(ScopeInner {
673 executor: handle.executor().clone(),
674 state: Condition::new(ScopeState::new_child(
675 handle.clone(),
676 &state,
677 Box::new(ResultsStream { inner: Arc::clone(&stream) }),
678 )),
679 instrument_data: handle
680 .executor()
681 .instrument
682 .as_ref()
683 .map(|value| value.scope_created(&name, Some(handle))),
684 name,
685 }),
686 };
687 let weak = child.downgrade();
688 state.insert_child(weak);
689 ScopeStream { inner: child, stream }
690 });
691 let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
692 (this, handle)
693 }
694}
695
696impl<R> Drop for ScopeStream<R> {
697 fn drop(&mut self) {
698 self.inner.abort_all_tasks();
707 }
708}
709
710impl<R: Send + 'static> Stream for ScopeStream<R> {
711 type Item = R;
712
713 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
714 let mut stream_inner = self.stream.lock();
715 match stream_inner.results.pop() {
716 Some(result) => Poll::Ready(Some(result)),
717 None => {
718 drop(stream_inner);
721 let state = self.inner.lock();
722 let mut stream_inner = self.stream.lock();
723 match stream_inner.results.pop() {
724 Some(result) => Poll::Ready(Some(result)),
725 None => {
726 if state.has_tasks() {
727 stream_inner.waker = Some(cx.waker().clone());
728 Poll::Pending
729 } else {
730 Poll::Ready(None)
731 }
732 }
733 }
734 }
735 }
736 }
737}
738
739impl<R> Deref for ScopeStream<R> {
740 type Target = ScopeHandle;
741 fn deref(&self) -> &Self::Target {
742 &self.inner
743 }
744}
745
746impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
747 fn borrow(&self) -> &ScopeHandle {
748 self
749 }
750}
751
752impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
753 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
754 let (stream, handle) = ScopeStream::new();
755 for fut in iter {
756 handle.push(fut);
757 }
758 stream.close();
759 stream
760 }
761}
762
763#[derive(Clone)]
764pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
765
766impl<R: Send> ScopeStreamHandle<R> {
767 pub fn push(&self, future: impl Spawnable<Output = R>) {
768 self.0.insert_task(future.into_task(self.0.clone()), true);
769 }
770}
771
/// A guard held on a scope, created by `ScopeHandle::active_guard`.
///
/// While guards are outstanding, a cancellation request only marks the scope as pending;
/// the tasks are aborted once the last guard is released (see `cancel_all_tasks` and
/// `ScopeState::on_zero_guards`). Cloning acquires another guard; dropping releases one.
#[derive(Debug)]
#[must_use]
pub struct ScopeActiveGuard(ScopeHandle);
784
785impl Deref for ScopeActiveGuard {
786 type Target = ScopeHandle;
787 fn deref(&self) -> &Self::Target {
788 &self.0
789 }
790}
791
792impl Drop for ScopeActiveGuard {
793 fn drop(&mut self) {
794 let Self(scope) = self;
795 scope.release_cancel_guard();
796 }
797}
798
799impl Clone for ScopeActiveGuard {
800 fn clone(&self) -> Self {
801 self.0.lock().acquire_cancel_guard(1);
802 Self(self.0.clone())
803 }
804}
805
806impl ScopeActiveGuard {
807 pub fn as_handle(&self) -> &ScopeHandle {
809 &self.0
810 }
811
812 pub fn to_handle(&self) -> ScopeHandle {
814 self.0.clone()
815 }
816
817 pub async fn on_cancel(&self) {
823 self.0
824 .inner
825 .state
826 .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
827 .await
828 }
829
830 fn new(scope: &ScopeHandle) -> Option<Self> {
831 if scope.lock().acquire_cancel_guard_if_not_finished() {
832 Some(Self(scope.clone()))
833 } else {
834 None
835 }
836 }
837}
838
/// Error reported when a joined task did not run to completion.
pub struct JoinError {
    // Private field: keeps the type from being constructed outside this module.
    _phantom: PhantomData<()>,
}

impl fmt::Debug for JoinError {
    /// Writes just the type name, `JoinError`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("JoinError")
    }
}

impl fmt::Display for JoinError {
    /// Writes a fixed, human-readable description.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("JoinError: a task failed to execute to completion")
    }
}

impl error::Error for JoinError {}
857
/// A weak counterpart of `ScopeHandle`: it does not keep the scope's data alive.
///
/// Equality and hashing are based on the address of the pointed-to `ScopeInner` (see the
/// `Hash` and `PartialEq` impls).
#[derive(Clone)]
struct WeakScopeHandle {
    // Weak reference to the scope data shared by the strong handles.
    inner: Weak<ScopeInner>,
}
867
868impl WeakScopeHandle {
869 pub fn upgrade(&self) -> Option<ScopeHandle> {
871 self.inner.upgrade().map(|inner| ScopeHandle { inner })
872 }
873}
874
875impl hash::Hash for WeakScopeHandle {
876 fn hash<H: hash::Hasher>(&self, state: &mut H) {
877 Weak::as_ptr(&self.inner).hash(state);
878 }
879}
880
881impl PartialEq for WeakScopeHandle {
882 fn eq(&self, other: &Self) -> bool {
883 Weak::ptr_eq(&self.inner, &other.inner)
884 }
885}
886
887impl Eq for WeakScopeHandle {
888 }
891
892mod state {
895 use super::*;
896
/// The mutable state of one scope, kept behind the scope's `Condition`.
pub struct ScopeState {
    /// The parent scope; `None` for the root scope.
    pub parent: Option<ScopeHandle>,
    // Weak handles to the child scopes registered through `insert_child`.
    children: HashSet<WeakScopeHandle>,
    // The tasks that currently belong to this scope.
    all_tasks: HashSet<TaskHandle>,
    // Counts this scope (if it has tasks) plus the child scopes that have registered
    // themselves through `register_first_task`; `has_tasks` is `subscopes_with_tasks > 0`.
    subscopes_with_tasks: u32,
    // False once the scope has been closed or finished; `insert_task` then refuses tasks.
    can_spawn: bool,
    // Number of cancel guards currently held on this scope.
    guards: u32,
    // Where the scope is in its lifecycle.
    status: Status,
    /// The sink that receives finished tasks (a `JoinResults` or a `ResultsStream`).
    pub results: Box<dyn Results>,
}
912
/// Per-task entry kept by `JoinResults`.
pub enum JoinResult {
    /// Someone polled for the task's result before it finished; this is their waker.
    Waker(Waker),
    /// The task finished and its result has not been collected yet.
    Ready,
}
917
/// Lifecycle status of a scope.
#[repr(u8)]
#[derive(Default, Debug, Clone, Copy)]
pub enum Status {
    /// The scope has not been cancelled or finished. This is the default.
    #[default]
    Active,
    /// Cancellation was requested but has not completed yet.
    PendingCancellation,
    /// The scope has finished.
    Finished,
}

impl Status {
    /// Returns true for every status other than `Active`.
    pub fn is_cancelled(&self) -> bool {
        match *self {
            Self::PendingCancellation | Self::Finished => true,
            Self::Active => false,
        }
    }
}
941
942 impl ScopeState {
943 pub fn new_root(results: Box<impl Results>) -> Self {
944 Self {
945 parent: None,
946 children: Default::default(),
947 all_tasks: Default::default(),
948 subscopes_with_tasks: 0,
949 can_spawn: true,
950 guards: 0,
951 status: Default::default(),
952 results,
953 }
954 }
955
956 pub fn new_child(
957 parent_handle: ScopeHandle,
958 parent_state: &Self,
959 results: Box<impl Results>,
960 ) -> Self {
961 let (status, can_spawn) = match parent_state.status {
962 Status::Active => (Status::Active, parent_state.can_spawn),
963 Status::Finished | Status::PendingCancellation => (Status::Finished, false),
964 };
965 Self {
966 parent: Some(parent_handle),
967 children: Default::default(),
968 all_tasks: Default::default(),
969 subscopes_with_tasks: 0,
970 can_spawn,
971 guards: 0,
972 status,
973 results,
974 }
975 }
976 }
977
impl ScopeState {
    /// The tasks that currently belong to this scope.
    pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
        &self.all_tasks
    }

    /// Adds `task` to the scope and wakes it.
    ///
    /// Returns `None` on success. Returns `Some(task)`, handing the task back to the caller,
    /// when the scope refuses it: the scope cannot spawn, the results sink does not allow
    /// spawning and `for_stream` is false, or registering the first task fails.
    ///
    /// # Panics
    ///
    /// Panics if `task` is already in the scope.
    pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
        if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
            return Some(task);
        }
        // Going from zero tasks to one has to be registered up the parent chain.
        if self.all_tasks.is_empty() && !self.register_first_task() {
            return Some(task);
        }
        task.wake();
        assert!(self.all_tasks.insert(task), "Task must be new");
        None
    }

    /// The weak handles of the registered child scopes.
    pub fn children(&self) -> &HashSet<WeakScopeHandle> {
        &self.children
    }

    /// Registers `child` as a child scope.
    pub fn insert_child(&mut self, child: WeakScopeHandle) {
        self.children.insert(child);
    }

    /// Removes the child identified by `child` (the address of its `ScopeInner`).
    ///
    /// # Panics
    ///
    /// Panics if the child is not present while other children are still registered. An
    /// empty set is tolerated (for instance, `set_closed_and_drain` empties it).
    pub fn remove_child(&mut self, child: &PtrKey) {
        let found = self.children.remove(child);
        assert!(found || self.children.is_empty());
    }

    /// The current lifecycle status.
    pub fn status(&self) -> Status {
        self.status
    }

    /// The number of cancel guards currently held.
    pub fn guards(&self) -> u32 {
        self.guards
    }

    /// Stops the scope from accepting new tasks.
    pub fn close(&mut self) {
        self.can_spawn = false;
    }

    /// Stops the scope from accepting new tasks and marks it finished.
    pub fn mark_finished(&mut self) {
        self.can_spawn = false;
        self.status = Status::Finished;
    }

    /// True while this scope, or a child scope registered with it, has tasks.
    pub fn has_tasks(&self) -> bool {
        self.subscopes_with_tasks > 0
    }

    /// Calls `wake_with_active_guard` on every task and acquires one guard for each call
    /// that returned true.
    pub fn wake_all_with_active_guard(&mut self) {
        let mut count = 0;
        for task in &self.all_tasks {
            if task.wake_with_active_guard() {
                count += 1;
            }
        }
        self.acquire_cancel_guard(count);
    }

    /// Aborts every task and marks the scope finished. Tasks whose `abort` returns true are
    /// pushed onto the executor's ready queue.
    pub fn abort_tasks_and_mark_finished(&mut self) {
        for task in self.all_tasks() {
            if task.abort() {
                task.scope().executor().ready_tasks.push(task.clone());
            }
        }
        self.mark_finished();
    }

    /// Moves all wakers registered on the condition into `wakers` and marks the scope as
    /// pending cancellation. The caller is responsible for waking them.
    pub fn wake_wakers_and_mark_pending(
        this: &mut ConditionGuard<'_, ScopeState>,
        wakers: &mut Vec<Waker>,
    ) {
        wakers.extend(this.drain_wakers());
        this.status = Status::PendingCancellation;
    }

    /// Records that this scope (or, when called by a child, that child) now has tasks.
    ///
    /// Returns false, changing nothing, if this scope or an ancestor that has to be consulted
    /// cannot spawn. The parent is locked and consulted only when this scope's counter is
    /// still zero.
    #[must_use]
    fn register_first_task(&mut self) -> bool {
        if !self.can_spawn {
            return false;
        }
        let can_spawn = match &self.parent {
            Some(parent) => {
                // A non-zero counter means the parent has already been told about us.
                self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
            }
            None => true,
        };
        if can_spawn {
            self.subscopes_with_tasks += 1;
            // At most one count for this scope itself plus one per child.
            debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
        };
        can_spawn
    }

    /// Undoes one `register_first_task`: decrements the counter and, when it reaches zero,
    /// propagates to the parent and collects this scope's wakers into `wakers`.
    ///
    /// `num_wakers_hint` is the number of wakers the callers further down expect to add. It
    /// is accumulated on the way up so that `wakers` is reserved once, at the level where the
    /// propagation stops, before each level appends its own wakers on the way back.
    fn on_last_task_removed(
        this: &mut ConditionGuard<'_, ScopeState>,
        num_wakers_hint: usize,
        wakers: &mut Vec<Waker>,
    ) {
        debug_assert!(this.subscopes_with_tasks > 0);
        this.subscopes_with_tasks -= 1;
        if this.subscopes_with_tasks > 0 {
            // Still busy: nothing is woken at this level, and propagation stops here.
            wakers.reserve(num_wakers_hint);
            return;
        }

        match &this.parent {
            Some(parent) => {
                // The parent is locked while this scope's lock is still held.
                Self::on_last_task_removed(
                    &mut parent.lock(),
                    num_wakers_hint + this.waker_count(),
                    wakers,
                );
            }
            None => wakers.reserve(num_wakers_hint),
        };
        wakers.extend(this.drain_wakers());
    }

    /// Acquires one guard unless the scope is finished. Returns whether a guard was acquired.
    pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
        match self.status {
            Status::Active | Status::PendingCancellation => {
                self.acquire_cancel_guard(1);
                true
            }
            Status::Finished => false,
        }
    }

    /// Acquires `count` guards. When the scope goes from zero guards to some, one guard is
    /// also acquired on the parent; a scope therefore holds at most one guard on its parent.
    pub fn acquire_cancel_guard(&mut self, count: u32) {
        if count == 0 {
            return;
        }
        if self.guards == 0
            && let Some(parent) = self.parent.as_ref()
        {
            parent.acquire_cancel_guard();
        }
        self.guards += count;
    }

    /// Releases one guard. When the last guard goes away, `on_zero_guards` runs (finishing a
    /// pending cancellation and releasing the guard held on the parent) and this scope's
    /// wakers are moved into `wake_vec`.
    ///
    /// `waker_count` is the number of wakers collected so far by callers further down; it is
    /// used to size `wake_vec` once, at the level where the propagation stops.
    ///
    /// # Panics
    ///
    /// Panics if no guard is held.
    pub fn release_cancel_guard(
        this: &mut ConditionGuard<'_, Self>,
        wake_vec: &mut WakeVec,
        mut waker_count: usize,
    ) {
        this.guards = this.guards.checked_sub(1).expect("released non-acquired guard");
        if this.guards == 0 {
            waker_count += this.waker_count();
            this.on_zero_guards(wake_vec, waker_count);
            wake_vec.0.extend(this.drain_wakers())
        } else {
            wake_vec.0.reserve_exact(waker_count);
        }
    }

    /// Runs when the guard count reaches zero: completes a pending cancellation by aborting
    /// the tasks, then releases the guard this scope held on its parent.
    fn on_zero_guards(&mut self, wake_vec: &mut WakeVec, waker_count: usize) {
        match self.status {
            Status::Active => {}
            Status::PendingCancellation => {
                self.abort_tasks_and_mark_finished();
            }
            Status::Finished => {}
        }
        if let Some(parent) = &self.parent {
            ScopeState::release_cancel_guard(&mut parent.lock(), wake_vec, waker_count);
        }
    }
}
1165
/// A batch of wakers that are woken when the batch is dropped.
#[derive(Default)]
pub struct WakeVec(Vec<Waker>);

impl Drop for WakeVec {
    /// Wakes every collected waker, in insertion order.
    fn drop(&mut self) {
        self.0.drain(..).for_each(Waker::wake);
    }
}
1176
1177 pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1179
1180 impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1181 fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1182 Self(value, WakeVec::default())
1183 }
1184 }
1185
1186 impl ScopeWaker<'_> {
1187 pub fn take_task(&mut self, task: &TaskHandle) -> Option<TaskHandle> {
1188 let task = self.all_tasks.take(task);
1189 if task.is_some() {
1190 self.on_task_removed(0);
1191 }
1192 task
1193 }
1194
1195 pub fn task_did_finish(&mut self, task: &TaskHandle) {
1196 if let Some(task) = self.all_tasks.take(task) {
1197 self.on_task_removed(1);
1198 if !task.is_detached() {
1199 let maybe_waker = self.results.task_did_finish(task);
1200 self.1.0.extend(maybe_waker);
1201 }
1202 }
1203 }
1204
1205 pub fn set_closed_and_drain(
1206 &mut self,
1207 ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
1208 self.close();
1209 let all_tasks = std::mem::take(&mut self.all_tasks);
1210 let results = self.results.take();
1211 if !all_tasks.is_empty() {
1212 self.on_task_removed(0)
1213 }
1214 let children = self.children.drain();
1215 (all_tasks, results, children)
1216 }
1217
1218 fn on_task_removed(&mut self, num_wakers_hint: usize) {
1219 if self.all_tasks.is_empty() {
1220 ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1.0)
1221 }
1222 }
1223
1224 pub fn wake_wakers_and_mark_pending(&mut self) {
1225 let Self(state, wakers) = self;
1226 ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
1227 }
1228 }
1229
1230 impl<'a> Deref for ScopeWaker<'a> {
1231 type Target = ConditionGuard<'a, ScopeState>;
1232
1233 fn deref(&self) -> &Self::Target {
1234 &self.0
1235 }
1236 }
1237
1238 impl DerefMut for ScopeWaker<'_> {
1239 fn deref_mut(&mut self) -> &mut Self::Target {
1240 &mut self.0
1241 }
1242 }
1243}
1244
/// The data shared by all handles to one scope.
struct ScopeInner {
    // The executor the scope's tasks run on.
    executor: Arc<Executor>,
    // The scope's mutable state, behind a lock that also holds registered wakers.
    state: Condition<ScopeState>,
    // The scope's name: empty for unnamed scopes, "root" for the root scope.
    name: String,
    // Whatever the executor's instrumentation hook returned from `scope_created`, if the
    // executor has a hook.
    instrument_data: Option<Box<dyn Any + Send + Sync>>,
}
1251
impl Drop for ScopeInner {
    /// Unregisters the scope from its parent once the last strong handle is gone, releasing
    /// the guard it held on the parent if it still has guards of its own.
    fn drop(&mut self) {
        // SAFETY: `PtrKey` is a zero-sized unit struct, so the reference never reads memory;
        // only its address is used. That address is the address of this `ScopeInner`, which
        // is what `WeakScopeHandle` hashes and compares by, so the key finds this scope's
        // entry in the parent's `children` set.
        let key = unsafe { &*(self as *const _ as *const PtrKey) };
        let state = self.state.lock();
        if let Some(parent) = &state.parent {
            // Declared before `parent_state`, hence dropped after it: collected wakers are
            // woken once the parent's lock has been released.
            let mut wake_vec = WakeVec::default();
            let mut parent_state = parent.lock();
            // A scope with outstanding guards holds exactly one guard on its parent.
            if state.guards() != 0 {
                ScopeState::release_cancel_guard(&mut parent_state, &mut wake_vec, 0);
            }
            parent_state.remove_child(key);
        }
    }
}
1270
impl ScopeHandle {
    /// Calls `f` with the scope of the currently running task or, when no task is running,
    /// with the global scope of the local executor.
    fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
        super::common::TaskHandle::with_current(|task| match task {
            Some(task) => f(task.scope()),
            None => f(EHandle::local().global_scope()),
        })
    }

    /// Locks the scope's state.
    fn lock(&self) -> ConditionGuard<'_, ScopeState> {
        self.inner.state.lock()
    }

    /// Creates a weak handle to this scope.
    fn downgrade(&self) -> WeakScopeHandle {
        WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
    }

    /// The executor this scope belongs to.
    #[inline(always)]
    pub(crate) fn executor(&self) -> &Arc<Executor> {
        &self.inner.executor
    }

    /// Marks `task` as detached (if it is still in the scope) and drops any join bookkeeping
    /// kept for it by the results sink.
    pub(crate) fn detach(&self, task: &TaskHandle) {
        // The block's value is bound outside the block so that it is dropped only after the
        // scope lock has been released.
        let _maybe_task = {
            let mut state = self.lock();
            if let Some(task) = state.all_tasks().get(task) {
                task.detach();
            }
            state.results.detach(task)
        };
    }

    /// Aborts `task` and returns its result if it had already produced one.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `R` must be the task's actual output type, since the result
    /// is extracted with `take_result`; confirm against that function's contract.
    pub(crate) unsafe fn abort_task<R>(&self, task: &TaskHandle) -> Option<R> {
        let mut state = self.lock();
        // The task already finished and its result was waiting to be collected.
        if state.results.detach(task) {
            drop(state);
            return unsafe { task.take_result() };
        }
        state.all_tasks().get(task).and_then(|task| {
            // `abort` returning true means the task must be put on the ready queue.
            if task.abort() {
                self.inner.executor.ready_tasks.push(task.clone());
            }
            unsafe { task.take_result() }
        })
    }

    /// Aborts and detaches `task`, removing it from the scope right away when the abort
    /// completes immediately.
    pub(crate) fn abort_and_detach(&self, task: &TaskHandle) {
        // As in `detach`, the values are bound outside the block so that they are dropped
        // after the lock guard.
        let _tasks = {
            let mut state = ScopeWaker::from(self.lock());
            let maybe_task1 = state.results.detach(task);
            let mut maybe_task2 = None;
            if state.all_tasks().contains(task) {
                match task.abort_and_detach() {
                    AbortAndDetachResult::Done => maybe_task2 = state.take_task(task),
                    AbortAndDetachResult::AddToRunQueue => {
                        self.inner.executor.ready_tasks.push(task.clone());
                    }
                    AbortAndDetachResult::Pending => {}
                }
            }
            (maybe_task1, maybe_task2)
        };
    }

    /// Polls for the result of `task`. Stays pending if the task finished without leaving a
    /// result to take.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `R` must be the task's actual output type; confirm against
    /// `take_result`'s contract.
    pub(crate) unsafe fn poll_join_result<R>(
        &self,
        task: &TaskHandle,
        cx: &mut Context<'_>,
    ) -> Poll<R> {
        let task = ready!(self.lock().results.poll_join_result(task, cx));
        match unsafe { task.take_result() } {
            Some(result) => Poll::Ready(result),
            None => {
                // No result is available; nothing re-registers the waker here.
                Poll::Pending
            }
        }
    }

    /// Like `poll_join_result`, but reports an aborted task as `Err(JoinError)` instead of
    /// staying pending.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `R` must be the task's actual output type; confirm against
    /// `take_result`'s contract.
    pub(crate) unsafe fn try_poll_join_result<R>(
        &self,
        task: &TaskHandle,
        cx: &mut Context<'_>,
    ) -> Poll<Result<R, JoinError>> {
        let task = ready!(self.lock().results.poll_join_result(task, cx));
        match unsafe { task.take_result() } {
            Some(result) => Poll::Ready(Ok(result)),
            None => {
                if task.is_aborted() {
                    Poll::Ready(Err(JoinError { _phantom: PhantomData }))
                } else {
                    Poll::Pending
                }
            }
        }
    }

    /// Polls for `task` having finished and yields its result if there is one, or `None` if
    /// it finished without a result to take.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `R` must be the task's actual output type; confirm against
    /// `take_result`'s contract.
    pub(crate) unsafe fn poll_aborted<R>(
        &self,
        task: &TaskHandle,
        cx: &mut Context<'_>,
    ) -> Poll<Option<R>> {
        let task = self.lock().results.poll_join_result(task, cx);
        task.map(|task| unsafe { task.take_result() })
    }

    /// Tries to insert `task` into the scope. Returns whether the scope accepted it.
    pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
        // The lock guard is a temporary of this statement, so a rejected task is dropped
        // after the lock has been released.
        let returned_task = self.lock().insert_task(task, for_stream);
        returned_task.is_none()
    }

    /// Removes `task` from the scope and drops its future via `drop_future_unchecked`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller must uphold whatever `drop_future_unchecked` requires; that
    /// contract is not visible here, so confirm it at the definition.
    pub(super) unsafe fn drop_task_unchecked(&self, task: &TaskHandle) {
        let mut state = ScopeWaker::from(self.lock());
        let task = state.take_task(task);
        if let Some(task) = task {
            unsafe { task.drop_future_unchecked() };
        }
    }

    /// Records that `task` has finished (see `ScopeWaker::task_did_finish`).
    pub(super) fn task_did_finish(&self, task: &TaskHandle) {
        let mut state = ScopeWaker::from(self.lock());
        state.task_did_finish(task);
    }

    /// Visits this scope and its descendants depth-first, one at a time. Each scope is locked
    /// while `callback` runs; its children are visited only if `callback` returns true.
    fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
        let mut scopes = vec![self.clone()];
        while let Some(scope) = scopes.pop() {
            let mut scope_waker = ScopeWaker::from(scope.lock());
            if callback(&mut scope_waker) {
                // Children whose data has already been dropped are skipped.
                scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
            }
        }
    }

    /// Acquires one cancel guard on this scope.
    fn acquire_cancel_guard(&self) {
        self.lock().acquire_cancel_guard(1)
    }

    /// Releases one cancel guard on this scope. Wakers collected in the process are woken
    /// when the local `WakeVec` is dropped, after the lock guard temporary is gone.
    pub(crate) fn release_cancel_guard(&self) {
        let mut wake_vec = WakeVec::default();
        ScopeState::release_cancel_guard(&mut self.lock(), &mut wake_vec, 0);
    }

    /// Requests cancellation of this scope and its descendants.
    ///
    /// An active scope without guards has its tasks aborted immediately. An active scope with
    /// guards is marked as pending cancellation and its waiters are woken; the abort happens
    /// when the last guard is released.
    fn cancel_all_tasks(&self) {
        self.visit_scopes_locked(|state| {
            match state.status() {
                Status::Active => {
                    if state.guards() == 0 {
                        state.abort_tasks_and_mark_finished();
                    } else {
                        state.wake_wakers_and_mark_pending();
                    }
                    true
                }
                Status::PendingCancellation => {
                    // Already pending: leave this scope alone but still visit its children.
                    true
                }
                Status::Finished => {
                    // Already finished: skip this scope and its children.
                    false
                }
            }
        });
    }

    /// Aborts the tasks of this scope and of its descendants, regardless of guards. Scopes
    /// that are already finished, and their children, are skipped.
    fn abort_all_tasks(&self) {
        self.visit_scopes_locked(|state| match state.status() {
            Status::Active | Status::PendingCancellation => {
                state.abort_tasks_and_mark_finished();
                true
            }
            Status::Finished => false,
        });
    }

    /// Closes this scope and every descendant and drops all of their tasks and accumulated
    /// results.
    ///
    /// # Panics
    ///
    /// Panics if a task's `try_drop` fails.
    pub(super) fn drop_all_tasks(&self) {
        let mut scopes = vec![self.clone()];
        while let Some(scope) = scopes.pop() {
            // Tasks and results are moved out of the locked block so that they are dropped
            // without the scope lock held.
            let (tasks, join_results) = {
                let mut state = ScopeWaker::from(scope.lock());
                let (tasks, join_results, children) = state.set_closed_and_drain();
                scopes.extend(children.filter_map(|child| child.upgrade()));
                (tasks, join_results)
            };
            for task in tasks {
                task.try_drop().expect("Expected drop to succeed");
            }
            std::mem::drop(join_results);
        }
    }
}
1504
1505#[repr(transparent)]
1507struct PtrKey;
1508
1509impl Borrow<PtrKey> for WeakScopeHandle {
1510 fn borrow(&self) -> &PtrKey {
1511 unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1513 }
1514}
1515
1516impl PartialEq for PtrKey {
1517 fn eq(&self, other: &Self) -> bool {
1518 std::ptr::eq(self, other)
1519 }
1520}
1521
1522impl Eq for PtrKey {}
1523
1524impl hash::Hash for PtrKey {
1525 fn hash<H: hash::Hasher>(&self, state: &mut H) {
1526 (self as *const PtrKey).hash(state);
1527 }
1528}
1529
/// Results sink for ordinary scopes: one `JoinResult` entry per task that has either been
/// polled for its result or has finished without the result being collected yet.
#[derive(Default)]
struct JoinResults(HashMap<TaskHandle, JoinResult>);
1532
/// Sink for finished tasks. A scope stores one behind `Box<dyn Results>`; the
/// implementations in this file are `JoinResults` and `ResultsStream`.
trait Results: Send + Sync + 'static {
    /// Whether tasks may be inserted without the `for_stream` flag (consulted by
    /// `ScopeState::insert_task`).
    fn can_spawn(&self) -> bool;

    /// Polls for `task` having finished. Implementations may keep `cx`'s waker to wake the
    /// caller later.
    fn poll_join_result(&mut self, task: &TaskHandle, cx: &mut Context<'_>) -> Poll<TaskHandle>;

    /// Records that `task` has finished. Returns a waker that should be woken, if any.
    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;

    /// Drops any bookkeeping kept for `task`. `JoinResults` returns true when the removed
    /// entry was a finished, uncollected result; `ResultsStream` always returns false.
    fn detach(&mut self, task: &TaskHandle) -> bool;

    /// Moves everything accumulated so far out of the sink as an opaque box.
    fn take(&mut self) -> Box<dyn Any>;

    /// Test-only emptiness check.
    #[cfg(test)]
    fn is_empty(&self) -> bool;
}
1554
1555impl Results for JoinResults {
1556 fn can_spawn(&self) -> bool {
1557 true
1558 }
1559
1560 fn poll_join_result(&mut self, task: &TaskHandle, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1561 match self.0.entry(task.clone()) {
1562 Entry::Occupied(mut o) => match o.get_mut() {
1563 JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1564 JoinResult::Ready => {
1565 o.remove();
1566 return Poll::Ready(task.clone());
1567 }
1568 },
1569 Entry::Vacant(v) => {
1570 v.insert(JoinResult::Waker(cx.waker().clone()));
1571 }
1572 }
1573 Poll::Pending
1574 }
1575
1576 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1577 match self.0.entry(task) {
1578 Entry::Occupied(mut o) => {
1579 let JoinResult::Waker(waker) = std::mem::replace(o.get_mut(), JoinResult::Ready)
1580 else {
1581 unreachable!()
1585 };
1586 Some(waker)
1587 }
1588 Entry::Vacant(v) => {
1589 v.insert(JoinResult::Ready);
1590 None
1591 }
1592 }
1593 }
1594
1595 fn detach(&mut self, task: &TaskHandle) -> bool {
1596 matches!(self.0.remove(task), Some(JoinResult::Ready))
1597 }
1598
1599 fn take(&mut self) -> Box<dyn Any> {
1600 Box::new(Self(std::mem::take(&mut self.0)))
1601 }
1602
1603 #[cfg(test)]
1604 fn is_empty(&self) -> bool {
1605 self.0.is_empty()
1606 }
1607}
1608
/// Results sink for `ScopeStream`: finished tasks' results are pushed into a buffer shared
/// with the stream.
///
/// NOTE(review): the derived `Default` is only available when `R: Default`, because the
/// derive adds that bound. The constructor in this file builds the struct directly.
#[derive(Default)]
struct ResultsStream<R> {
    // Buffer and parked waker shared with the owning `ScopeStream`.
    inner: Arc<Mutex<ResultsStreamInner<R>>>,
}
1613
/// State shared between a `ScopeStream` and its results sink.
struct ResultsStreamInner<R> {
    // Results of finished tasks that the stream has not yielded yet.
    results: Vec<R>,
    // Waker of the stream's last pending poll, if any.
    waker: Option<Waker>,
}

// Written by hand so that `Default` does not require `R: Default`.
impl<R> Default for ResultsStreamInner<R> {
    /// An empty buffer with no parked waker.
    fn default() -> Self {
        ResultsStreamInner { waker: None, results: Vec::new() }
    }
}
1624
1625impl<R: Send + 'static> Results for ResultsStream<R> {
1626 fn can_spawn(&self) -> bool {
1627 false
1628 }
1629
1630 fn poll_join_result(&mut self, _task: &TaskHandle, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1631 Poll::Pending
1632 }
1633
1634 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1635 let mut inner = self.inner.lock();
1636 inner.results.extend(unsafe { task.take_result() });
1639 inner.waker.take()
1640 }
1641
1642 fn detach(&mut self, _task: &TaskHandle) -> bool {
1643 false
1644 }
1645
1646 fn take(&mut self) -> Box<dyn Any> {
1647 Box::new(std::mem::take(&mut self.inner.lock().results))
1648 }
1649
1650 #[cfg(test)]
1651 fn is_empty(&self) -> bool {
1652 false
1653 }
1654}
1655
1656#[cfg(test)]
1657mod tests {
1658 use super::super::super::task::CancelableJoinHandle;
1662 use super::*;
1663 use crate::{
1664 EHandle, LocalExecutor, SendExecutorBuilder, SpawnableFuture, Task, TestExecutor, Timer,
1665 yield_now,
1666 };
1667 use fuchsia_sync::{Condvar, Mutex};
1668 use futures::channel::mpsc;
1669 use futures::{FutureExt, StreamExt};
1670 use std::future::pending;
1671 use std::pin::{Pin, pin};
1672 use std::sync::Arc;
1673 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1674 use std::task::{Context, Poll};
1675 use std::time::Duration;
1676
/// Test helper: a future that stays pending until `resolve` is called on it.
#[derive(Default)]
struct RemoteControlFuture(Mutex<RCFState>);

/// State behind a `RemoteControlFuture`'s lock.
#[derive(Default)]
struct RCFState {
    // Set by `resolve`; once true, polling returns ready.
    resolved: bool,
    // Waker of the most recent pending poll, if any.
    waker: Option<Waker>,
}
1684
1685 impl Future for &RemoteControlFuture {
1686 type Output = ();
1687 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1688 let mut this = self.0.lock();
1689 if this.resolved {
1690 Poll::Ready(())
1691 } else {
1692 this.waker.replace(cx.waker().clone());
1693 Poll::Pending
1694 }
1695 }
1696 }
1697
1698 impl RemoteControlFuture {
1699 fn new() -> Arc<Self> {
1700 Arc::new(Default::default())
1701 }
1702
1703 fn resolve(&self) {
1704 let mut this = self.0.lock();
1705 this.resolved = true;
1706 if let Some(waker) = this.waker.take() {
1707 waker.wake();
1708 }
1709 }
1710
1711 fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> + use<> {
1712 let this = Arc::clone(self);
1713 #[allow(clippy::redundant_async_block)] async move {
1715 (&*this).await
1716 }
1717 }
1718 }
1719
/// A future passed to `compute` on the executor's global scope runs to completion.
#[test]
fn compute_works_on_root_scope() {
    let mut exec = TestExecutor::new();
    let root = exec.global_scope();
    let mut computation = pin!(root.compute(async { 1 }));
    let outcome = exec.run_until_stalled(&mut computation);
    assert_eq!(outcome, Poll::Ready(1));
}
1727
/// A future passed to `compute` on a freshly created, named child scope runs to completion.
#[test]
fn compute_works_on_new_child() {
    let mut exec = TestExecutor::new();
    let child = exec.global_scope().new_child_with_name("compute_works_on_new_child");
    let mut computation = pin!(child.compute(async { 1 }));
    let outcome = exec.run_until_stalled(&mut computation);
    assert_eq!(outcome, Poll::Ready(1));
}
1735
/// Dropping a scope before its task has run leaves the task's join handle pending.
#[test]
fn scope_drop_cancels_tasks() {
    let mut exec = TestExecutor::new();
    let doomed = exec.global_scope().new_child_with_name("scope_drop_cancels_tasks");
    let mut computation = pin!(doomed.compute(async { 1 }));
    drop(doomed);
    let outcome = exec.run_until_stalled(&mut computation);
    assert_eq!(outcome, Poll::Pending);
}
1744
/// Once a scope has been cancelled, work submitted through a surviving handle never runs.
#[test]
fn tasks_do_not_spawn_on_cancelled_scopes() {
    let mut exec = TestExecutor::new();
    let cancelled_scope =
        exec.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
    let survivor = cancelled_scope.to_handle();

    // Cancelling an empty scope finishes right away.
    let mut cancellation = pin!(cancelled_scope.cancel());
    let cancel_outcome = exec.run_until_stalled(&mut cancellation);
    assert_eq!(cancel_outcome, Poll::Ready(()));

    // A computation submitted afterwards is never driven.
    let mut late = pin!(survivor.compute(async { 1 }));
    let late_outcome = exec.run_until_stalled(&mut late);
    assert_eq!(late_outcome, Poll::Pending);
}
1756
/// Closing an empty scope completes immediately, and the closed scope then refuses new work:
/// a computation submitted through a surviving handle stays pending.
#[test]
fn tasks_do_not_spawn_on_closed_empty_scopes() {
    let mut executor = TestExecutor::new();
    let scope =
        executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
    let handle = scope.to_handle();
    // This test is about `close()`. It previously called `cancel()`, which merely duplicated
    // `tasks_do_not_spawn_on_cancelled_scopes` and left the closed-empty path untested.
    let mut close = pin!(scope.close());
    assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
    let mut task = pin!(handle.compute(async { 1 }));
    assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
}
1768
/// A scope that is being closed while it still has a running task accepts no further work.
#[test]
fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
    let mut exec = TestExecutor::new();
    let closing_scope = exec.global_scope().new_child();
    let survivor = closing_scope.to_handle();
    survivor.spawn(pending());

    // The never-finishing task keeps `close` from completing.
    let mut closing = pin!(closing_scope.close());
    let close_outcome = exec.run_until_stalled(&mut closing);
    assert_eq!(close_outcome, Poll::Pending);

    // New work is still rejected, so it never makes progress.
    let mut late = pin!(survivor.compute(async { 1 }));
    let late_outcome = exec.run_until_stalled(&mut late);
    assert_eq!(late_outcome, Poll::Pending);
}
1780
/// Computations on a child scope and on a grandchild scope both run to completion.
#[test]
fn spawn_works_on_child_and_grandchild() {
    let mut exec = TestExecutor::new();
    let top = exec.global_scope().new_child();
    let middle = top.new_child();
    let bottom = middle.new_child();
    let mut middle_task = pin!(middle.compute(async { 1 }));
    let mut bottom_task = pin!(bottom.compute(async { 1 }));

    let middle_outcome = exec.run_until_stalled(&mut middle_task);
    assert_eq!(middle_outcome, Poll::Ready(1));
    let bottom_outcome = exec.run_until_stalled(&mut bottom_task);
    assert_eq!(bottom_outcome, Poll::Ready(1));
}
1792
/// Dropping the top-level scope prevents tasks on its child and grandchild from ever running.
#[test]
fn spawn_drop_cancels_child_and_grandchild_tasks() {
    let mut exec = TestExecutor::new();
    let top = exec.global_scope().new_child();
    let middle = top.new_child();
    let bottom = middle.new_child();
    let mut middle_task = pin!(middle.compute(async { 1 }));
    let mut bottom_task = pin!(bottom.compute(async { 1 }));

    drop(top);

    let middle_outcome = exec.run_until_stalled(&mut middle_task);
    assert_eq!(middle_outcome, Poll::Pending);
    let bottom_outcome = exec.run_until_stalled(&mut bottom_task);
    assert_eq!(bottom_outcome, Poll::Pending);
}
1805
/// After aborting both a still-running task and an already-finished one, the scope keeps
/// neither task entries nor stored results.
#[test]
fn completed_tasks_are_cleaned_up_after_cancel() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();

    let never_done = scope.spawn(pending::<()>());
    let already_done = scope.spawn(async {});

    // Let the executor run everything it can; only the never-finishing task stays registered.
    let idle = exec.run_until_stalled(&mut pending::<()>());
    assert_eq!(idle, Poll::Pending);
    assert_eq!(scope.lock().all_tasks().len(), 1);

    // The running task's abort cannot resolve synchronously; the finished task hands over its
    // result immediately.
    assert_eq!(never_done.abort().now_or_never(), None);
    assert_eq!(already_done.abort().now_or_never(), Some(Some(())));

    let idle = exec.run_until_stalled(&mut pending::<()>());
    assert_eq!(idle, Poll::Pending);
    assert_eq!(scope.lock().all_tasks().len(), 0);
    assert!(scope.lock().results.is_empty());
}
1825
/// Joining a scope that never had any tasks completes on the first poll.
#[test]
fn join_emtpy_scope() {
    let mut exec = TestExecutor::new();
    let empty = exec.global_scope().new_child();
    let outcome = exec.run_until_stalled(&mut pin!(empty.join()));
    assert_eq!(outcome, Poll::Ready(()));
}
1832
/// A join handle can still read its task's result after the scope join has started, and the
/// join itself finishes once the last outstanding task is aborted.
#[test]
fn task_handle_preserves_access_to_result_after_join_begins() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let mut computed = scope.compute(async { 1 });
    scope.spawn(async {});
    let blocker = scope.spawn(pending::<()>());
    let mut joining = pin!(scope.join().fuse());

    // Drive the join once; its outcome at this point is deliberately not asserted.
    let _ = exec.run_until_stalled(&mut joining);

    let computed_outcome = exec.run_until_stalled(&mut computed);
    assert_eq!(computed_outcome, Poll::Ready(1));

    drop(blocker.abort());
    let join_outcome = exec.run_until_stalled(&mut joining);
    assert_eq!(join_outcome, Poll::Ready(()));
}
1848
/// A scope join stays pending while any task is outstanding and completes once the last
/// remaining task has been aborted.
#[test]
fn join_blocks_until_task_is_cancelled() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let survivor = scope.spawn(pending::<()>());
    let aborted_early = scope.spawn(pending::<()>());

    // Abort one task before the join starts.
    assert_eq!(exec.run_until_stalled(&mut pin!(aborted_early.abort())), Poll::Ready(None));

    // The other task still blocks the join.
    let mut joining = pin!(scope.join());
    let blocked = exec.run_until_stalled(&mut joining);
    assert_eq!(blocked, Poll::Pending);

    // Aborting it as well releases the join.
    assert_eq!(exec.run_until_stalled(&mut pin!(survivor.abort())), Poll::Ready(None));
    let released = exec.run_until_stalled(&mut joining);
    assert_eq!(released, Poll::Ready(()));
}
1869
/// A never-finishing task blocks `join`, but converting that join into a cancel completes.
#[test]
fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    // The returned handle is discarded immediately.
    scope.spawn(pending::<()>());

    let mut joining = pin!(scope.join());
    let blocked = exec.run_until_stalled(&mut joining);
    assert_eq!(blocked, Poll::Pending);

    let mut cancelling = pin!(joining.cancel());
    let cancelled = exec.run_until_stalled(&mut cancelling);
    assert_eq!(cancelled, Poll::Ready(()));
}
1881
/// A never-finishing task blocks `close`, but converting that close into a cancel completes.
#[test]
fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    // The returned handle is discarded immediately.
    scope.spawn(pending::<()>());

    let mut closing = pin!(scope.close());
    let blocked = exec.run_until_stalled(&mut closing);
    assert_eq!(blocked, Poll::Pending);

    let mut cancelling = pin!(closing.cancel());
    let cancelled = exec.run_until_stalled(&mut cancelling);
    assert_eq!(cancelled, Poll::Ready(()));
}
1893
/// `join` waits for a spawned task to finish, and the task's own handle is then ready too.
#[test]
fn join_scope_blocks_until_spawned_task_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    let mut spawned = scope.spawn(trigger.as_future());
    let mut joining = pin!(scope.join());

    let before = exec.run_until_stalled(&mut joining);
    assert_eq!(before, Poll::Pending);

    trigger.resolve();

    let after = exec.run_until_stalled(&mut joining);
    assert_eq!(after, Poll::Ready(()));
    let task_outcome = exec.run_until_stalled(&mut spawned);
    assert_eq!(task_outcome, Poll::Ready(()));
}
1906
/// `close` waits for a spawned task to finish, and the task's own handle is then ready too.
#[test]
fn close_scope_blocks_until_spawned_task_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    let mut spawned = scope.spawn(trigger.as_future());
    let mut closing = pin!(scope.close());

    let before = exec.run_until_stalled(&mut closing);
    assert_eq!(before, Poll::Pending);

    trigger.resolve();

    let after = exec.run_until_stalled(&mut closing);
    assert_eq!(after, Poll::Ready(()));
    let task_outcome = exec.run_until_stalled(&mut spawned);
    assert_eq!(task_outcome, Poll::Ready(()));
}
1919
/// Detaching a child scope does not release the parent's join: the parent keeps waiting until
/// the child's task has actually finished.
#[test]
fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    let trigger = RemoteControlFuture::new();
    child.spawn(trigger.as_future());
    let mut parent_join = pin!(parent.join());

    // Both the parent's join and the child's "no tasks" notification are blocked.
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);

    // Detaching the child changes nothing for the parent.
    child.detach();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);

    // Only completing the task lets the join finish.
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
1935
/// A task spawned via `Scope::current()` from inside a detached, nested child scope still
/// holds up the outer scope's join until it finishes.
#[test]
fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
    let mut exec = TestExecutor::new();
    let outer = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();

    let trigger_for_task = trigger.clone();
    outer.spawn(async move {
        let nested = Scope::new_with_name("child");
        nested.spawn(async move {
            Scope::current().spawn(trigger_for_task.as_future());
        });
        nested.detach();
    });

    let mut outer_join = pin!(outer.join());
    let before = exec.run_until_stalled(&mut outer_join);
    assert_eq!(before, Poll::Pending);

    trigger.resolve();

    let after = exec.run_until_stalled(&mut outer_join);
    assert_eq!(after, Poll::Ready(()));
}
1956
/// A detached child whose task never finishes keeps the parent's join pending.
#[test]
fn join_scope_blocks_when_blocked_child_is_detached() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    child.spawn(pending());
    let mut parent_join = pin!(parent.join());

    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);

    child.detach();

    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
}
1969
/// Once the child's pending join future goes out of scope, the parent's join is able to
/// complete even though the child's task never finished on its own.
#[test]
fn join_scope_completes_when_blocked_child_is_cancelled() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    child.spawn(pending());
    let mut parent_join = pin!(parent.join());
    {
        // The child's join lives only inside this block and is dropped at its end.
        let mut child_join = pin!(child.join());
        let parent_blocked = exec.run_until_stalled(&mut parent_join);
        assert_eq!(parent_blocked, Poll::Pending);
        let child_blocked = exec.run_until_stalled(&mut child_join);
        assert_eq!(child_blocked, Poll::Pending);
    }
    let parent_done = exec.run_until_stalled(&mut parent_join);
    assert_eq!(parent_done, Poll::Ready(()));
}
1984
/// A handle to a detached scope can still run new work.
#[test]
fn detached_scope_can_spawn() {
    let mut exec = TestExecutor::new();
    let detached = exec.global_scope().new_child();
    let survivor = detached.to_handle();
    detached.detach();
    let outcome = exec.run_until_stalled(&mut survivor.compute(async { 1 }));
    assert_eq!(outcome, Poll::Ready(1));
}
1993
/// After the owning `Scope` is dropped, work submitted through a leftover handle never runs.
#[test]
fn dropped_scope_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let dropped = exec.global_scope().new_child();
    let survivor = dropped.to_handle();
    drop(dropped);
    let outcome = exec.run_until_stalled(&mut survivor.compute(async { 1 }));
    assert_eq!(outcome, Poll::Pending);
}
2002
/// Dropping a scope that still had a running task also blocks later work on its handle.
#[test]
fn dropped_scope_with_running_task_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let dropped = exec.global_scope().new_child();
    let survivor = dropped.to_handle();
    // Keep the join handle alive for the rest of the test.
    let _still_running = survivor.spawn(pending::<()>());
    drop(dropped);
    let outcome = exec.run_until_stalled(&mut survivor.compute(async { 1 }));
    assert_eq!(outcome, Poll::Pending);
}
2012
/// Once a scope has been fully joined, work submitted through a leftover handle never runs.
#[test]
fn joined_scope_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let joined = exec.global_scope().new_child();
    let survivor = joined.to_handle();

    let mut joining = pin!(joined.join());
    let join_outcome = exec.run_until_stalled(&mut joining);
    assert_eq!(join_outcome, Poll::Ready(()));

    let late_outcome = exec.run_until_stalled(&mut survivor.compute(async { 1 }));
    assert_eq!(late_outcome, Poll::Pending);
}
2022
/// While a join is still waiting on a running task, the scope keeps accepting new work.
#[test]
fn joining_scope_with_running_task_can_spawn() {
    let mut exec = TestExecutor::new();
    let joining_scope = exec.global_scope().new_child();
    let survivor = joining_scope.to_handle();
    // Keep the join handle alive for the rest of the test.
    let _still_running = survivor.spawn(pending::<()>());

    let mut joining = pin!(joining_scope.join());
    let join_outcome = exec.run_until_stalled(&mut joining);
    assert_eq!(join_outcome, Poll::Pending);

    let late_outcome = exec.run_until_stalled(&mut survivor.compute(async { 1 }));
    assert_eq!(late_outcome, Poll::Ready(1));
}
2033
/// After a scope has been joined, none of its descendants can run work, whether they were
/// created before or after the join.
#[test]
fn joined_scope_child_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let parent_handle = parent.to_handle();
    let early_child = parent.new_child();

    // Before the join, the child works normally.
    let early_outcome = exec.run_until_stalled(&mut early_child.compute(async { 1 }));
    assert_eq!(early_outcome, Poll::Ready(1));

    let mut parent_join = pin!(parent.join());
    let join_outcome = exec.run_until_stalled(&mut parent_join);
    assert_eq!(join_outcome, Poll::Ready(()));

    let late_child = parent_handle.new_child();
    let late_grandchild = early_child.new_child();

    // After the join, every descendant refuses to make progress.
    let from_early_child = exec.run_until_stalled(&mut early_child.compute(async { 1 }));
    assert_eq!(from_early_child, Poll::Pending);
    let from_late_child = exec.run_until_stalled(&mut late_child.compute(async { 1 }));
    assert_eq!(from_late_child, Poll::Pending);
    let from_late_grandchild = exec.run_until_stalled(&mut late_grandchild.compute(async { 1 }));
    assert_eq!(from_late_grandchild, Poll::Pending);
}
2061
/// After a scope has been closed, none of its descendants can run work, whether they were
/// created before or after the close.
#[test]
fn closed_scope_child_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let parent_handle = parent.to_handle();
    let early_child = parent.new_child();

    // Before the close, the child works normally.
    let early_outcome = exec.run_until_stalled(&mut early_child.compute(async { 1 }));
    assert_eq!(early_outcome, Poll::Ready(1));

    let mut parent_close = pin!(parent.close());
    let close_outcome = exec.run_until_stalled(&mut parent_close);
    assert_eq!(close_outcome, Poll::Ready(()));

    let late_child = parent_handle.new_child();
    let late_grandchild = early_child.new_child();

    // After the close, every descendant refuses to make progress.
    let from_early_child = exec.run_until_stalled(&mut early_child.compute(async { 1 }));
    assert_eq!(from_early_child, Poll::Pending);
    let from_late_child = exec.run_until_stalled(&mut late_child.compute(async { 1 }));
    assert_eq!(from_late_child, Poll::Pending);
    let from_late_grandchild = exec.run_until_stalled(&mut late_grandchild.compute(async { 1 }));
    assert_eq!(from_late_grandchild, Poll::Pending);
}
2089
/// Joining the child before the parent works: both joins complete.
#[test]
fn can_join_child_first() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();

    let computed = exec.run_until_stalled(&mut child.compute(async { 1 }));
    assert_eq!(computed, Poll::Ready(1));

    let child_joined = exec.run_until_stalled(&mut pin!(child.join()));
    assert_eq!(child_joined, Poll::Ready(()));
    let parent_joined = exec.run_until_stalled(&mut pin!(parent.join()));
    assert_eq!(parent_joined, Poll::Ready(()));
}
2099
/// Joining the parent before the child works: both joins complete.
#[test]
fn can_join_parent_first() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();

    let computed = exec.run_until_stalled(&mut child.compute(async { 1 }));
    assert_eq!(computed, Poll::Ready(1));

    let parent_joined = exec.run_until_stalled(&mut pin!(parent.join()));
    assert_eq!(parent_joined, Poll::Ready(()));
    let child_joined = exec.run_until_stalled(&mut pin!(child.join()));
    assert_eq!(child_joined, Poll::Ready(()));
}
2109
/// A task running in the parent scope may itself await the join of a child scope; the
/// parent's join completes once the child's task has finished.
#[test]
fn task_in_parent_scope_can_join_child() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    let trigger = RemoteControlFuture::new();
    child.spawn(trigger.as_future());
    // The child scope is moved into a task owned by the parent.
    parent.spawn(async move { child.join().await });

    let mut parent_join = pin!(parent.join());
    let before = exec.run_until_stalled(&mut parent_join);
    assert_eq!(before, Poll::Pending);

    trigger.resolve();

    let after = exec.run_until_stalled(&mut parent_join);
    assert_eq!(after, Poll::Ready(()));
}
2123
/// Holding on to the join handle of a finished task does not block the scope's join, and the
/// handle still yields the result afterwards.
#[test]
fn join_completes_while_completed_task_handle_is_held() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let mut computed = scope.compute(async { 1 });
    scope.spawn(async {});

    let mut joining = pin!(scope.join());
    let join_outcome = exec.run_until_stalled(&mut joining);
    assert_eq!(join_outcome, Poll::Ready(()));

    let computed_outcome = exec.run_until_stalled(&mut computed);
    assert_eq!(computed_outcome, Poll::Ready(1));
}
2134
/// A task that owns a handle to its own scope does not prevent cancellation from completing;
/// the task's join handle simply never resolves.
#[test]
fn cancel_completes_while_task_holds_handle() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let own_handle = scope.to_handle();
    let mut looping = scope.compute(async move {
        loop {
            // Parks on a never-ready future before the handle is ever used.
            pending::<()>().await;
            own_handle.spawn(async {});
        }
    });

    let mut joining = pin!(scope.join());
    let blocked = exec.run_until_stalled(&mut joining);
    assert_eq!(blocked, Poll::Pending);

    let mut cancelling = pin!(joining.cancel());
    let cancelled = exec.run_until_stalled(&mut cancelling);
    assert_eq!(cancelled, Poll::Ready(()));

    let task_outcome = exec.run_until_stalled(&mut looping);
    assert_eq!(task_outcome, Poll::Pending);
}
2155
/// A task may cancel its own scope through a handle: the scope ends up with no tasks, and
/// the cancelling task never gets to run past its `cancel().await`.
#[test]
fn cancel_from_handle_inside_task() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    {
        scope.spawn(pending::<()>());

        // With a never-finishing task present, the scope is not yet empty.
        let mut emptied = pin!(scope.on_no_tasks());
        let before = exec.run_until_stalled(&mut emptied);
        assert_eq!(before, Poll::Pending);

        let own_handle = scope.to_handle();
        scope.spawn(async move {
            own_handle.cancel().await;
            panic!("cancel() should never complete");
        });

        let after = exec.run_until_stalled(&mut emptied);
        assert_eq!(after, Poll::Ready(()));
    }
    // Joining the now-empty scope resolves without needing the executor.
    assert_eq!(scope.join().now_or_never(), Some(()));
}
2177
/// A scope handle can be moved to a plain OS thread and used there to spawn a task, which the
/// executor then runs.
#[test]
fn can_spawn_from_non_executor_thread() {
    let mut exec = TestExecutor::new();
    let root = exec.global_scope().clone();
    let ran = Arc::new(AtomicBool::new(false));
    let ran_in_task = Arc::clone(&ran);

    // Spawn from a helper thread and wait for it; the thread's return value (the handle
    // returned by `spawn`) is discarded here.
    let spawner = std::thread::spawn(move || {
        root.spawn(async move {
            ran_in_task.store(true, Ordering::Relaxed);
        })
    });
    let _ = spawner.join();

    let _ = exec.run_until_stalled(&mut pending::<()>());
    assert!(ran.load(Ordering::Relaxed));
}
2193
/// Joins in a small scope tree complete bottom-up as the tasks in each subtree finish.
///
/// Layout: `a` is the parent of `b`, and `b` is the parent of both `c` and `d`. Tasks run on
/// `a`, `c`, and `d`.
#[test]
fn scope_tree() {
    let mut exec = TestExecutor::new();
    let a = exec.global_scope().new_child();
    let b = a.new_child();
    let c = b.new_child();
    let d = b.new_child();
    let a_trigger = RemoteControlFuture::new();
    let c_trigger = RemoteControlFuture::new();
    let d_trigger = RemoteControlFuture::new();
    a.spawn(a_trigger.as_future());
    c.spawn(c_trigger.as_future());
    d.spawn(d_trigger.as_future());
    let mut join_a = pin!(a.join());
    let mut join_b = pin!(b.join());
    let mut join_d = pin!(d.join());

    // Nothing has finished yet, so every join is blocked.
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_d), Poll::Pending);

    // Finishing d's task releases only d.
    d_trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_d), Poll::Ready(()));

    // Finishing c's task releases b, whose whole subtree is now done.
    c_trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Ready(()));

    // Finishing a's own task releases a.
    a_trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Ready(()));

    // c was never joined explicitly; doing so now completes immediately.
    let mut join_c = pin!(c.join());
    assert_eq!(exec.run_until_stalled(&mut join_c), Poll::Ready(()));
}
2230
/// `wake_all_with_active_guard` causes every task in the scope to be polled again on a
/// multi-threaded executor, including when the scope is cancelled right after the wake-up.
///
/// A single `AtomicU64` carries two counters: the low 32 bits count polls, and the high 32
/// bits hold the poll count the driver is currently waiting for (zero when it is not waiting).
#[test]
fn wake_all_with_active_guard_on_send_executor() {
    /// Selects the low 32 bits (the poll counter) of the shared state word.
    const POLL_MASK: u64 = u32::MAX as u64;

    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    let scope = exec.root_scope().new_child();

    let (notify_tx, mut notify_rx) = mpsc::unbounded();
    let shared = Arc::new(AtomicU64::new(0));

    /// Never-completing future that counts its polls and signals when the target is reached.
    struct PollCounter(Arc<AtomicU64>, mpsc::UnboundedSender<()>);

    impl Future for PollCounter {
        type Output = ();
        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            let before = self.0.fetch_add(1, Ordering::Relaxed);
            // Signal when the new poll count equals the target stored in the high half.
            if before >> 32 == (before + 1) & POLL_MASK {
                let _ = self.1.unbounded_send(());
            }
            Poll::Pending
        }
    }

    scope.spawn(PollCounter(shared.clone(), notify_tx.clone()));
    scope.spawn(PollCounter(shared.clone(), notify_tx.clone()));

    exec.run(async move {
        let mut wait_for_polls = async |target| {
            // Publish the target; if it has not been reached yet, wait for the signal.
            let before = shared.fetch_or(target << 32, Ordering::Relaxed);
            if before & POLL_MASK != target {
                notify_rx.next().await.unwrap();
            }
            // Clear the target again, keeping only the poll counter.
            shared.fetch_and(POLL_MASK, Ordering::Relaxed);
        };

        // Each of the two tasks is polled once after being spawned.
        wait_for_polls(2).await;

        let mut polls_so_far = 2;
        for _ in 0..2 {
            scope.wake_all_with_active_guard();

            wait_for_polls(polls_so_far + 2).await;
            polls_so_far += 2;
        }

        // Wake once more, then cancel before waiting for the resulting polls.
        scope.wake_all_with_active_guard();
        let cancelled = scope.cancel();

        wait_for_polls(polls_so_far + 2).await;

        cancelled.await;
    });
}
2285
/// Stress test: `on_no_tasks` must complete no matter how its registration interleaves with
/// the scope's only task finishing on another executor thread.
#[test]
fn on_no_tasks_race() {
    /// Blocks the current thread for a random duration drawn from 0..10 microseconds.
    fn sleep_random() {
        let micros = rand::random_range(0..10);
        std::thread::sleep(Duration::from_micros(micros));
    }

    for _ in 0..2000 {
        let mut exec = SendExecutorBuilder::new().num_threads(2).build();
        let scope = exec.root_scope().new_child();
        scope.spawn(async {
            sleep_random();
        });
        exec.run(async move {
            sleep_random();
            scope.on_no_tasks().await;
        });
    }
}
2303
/// A detached task keeps running to completion, and no result is left behind in the root
/// scope afterwards.
#[test]
fn test_detach() {
    let mut exec = LocalExecutor::default();
    exec.run_singlethreaded(async {
        let counter = Arc::new(AtomicU32::new(0));

        // The detached task bumps the counter five times, yielding before each increment.
        let task_counter = Arc::clone(&counter);
        Task::spawn(async move {
            for _ in 0..5 {
                yield_now().await;
                task_counter.fetch_add(1, Ordering::Relaxed);
            }
        })
        .detach();

        // Keep yielding until the detached task has done all of its work.
        while counter.load(Ordering::Relaxed) != 5 {
            yield_now().await;
        }
    });

    assert!(exec.ehandle.root_scope.lock().results.is_empty());
}
2328
/// Checks three ways a task can end: dropping it, aborting it while it is still pending, and
/// aborting it after it has finished. In each case the future's captured `Arc` must be
/// released, and no result may be left behind in the root scope.
#[test]
fn test_cancel() {
    let mut exec = LocalExecutor::default();
    exec.run_singlethreaded(async {
        let ref_count = Arc::new(());

        // Case 1: drop the task outright.
        let held = Arc::clone(&ref_count);
        drop(Task::spawn(async move {
            let _held = held;
            pending::<()>().await;
        }));

        while Arc::strong_count(&ref_count) != 1 {
            yield_now().await;
        }

        // Case 2: abort a task that never finishes; there is no result to return.
        let held = Arc::clone(&ref_count);
        let never_done = Task::spawn(async move {
            let _held = held;
            pending::<()>().await;
        });

        assert_eq!(never_done.abort().await, None);
        while Arc::strong_count(&ref_count) != 1 {
            yield_now().await;
        }

        // Case 3: abort a task that has already run to completion; its result is returned.
        let held = Arc::clone(&ref_count);
        let finished = Task::spawn(async move {
            let _held = held;
        });

        // Wait until the task body has run and released its clone.
        while Arc::strong_count(&ref_count) != 1 {
            yield_now().await;
        }

        assert_eq!(finished.abort().await, Some(()));
    });

    assert!(exec.ehandle.root_scope.lock().results.is_empty());
}
2379
/// `abort().await` on a task that is in the middle of being polled waits for that poll to
/// finish and returns the task's result.
///
/// The shared integer steps through phases: 1 = task is running, 2 = the driver has seen it
/// running, 3 = the task has finished its work.
#[test]
fn test_cancel_waits() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    let shared = Arc::new((Mutex::new(0), Condvar::new()));
    let task = {
        let shared = Arc::clone(&shared);
        exec.root_scope().compute(async move {
            // Announce that the task is running, then block until the driver moves the phase on.
            *shared.0.lock() = 1;
            shared.1.notify_all();
            shared.1.wait_while(&mut shared.0.lock(), |phase| *phase == 1);
            // Linger briefly before finishing.
            std::thread::sleep(Duration::from_millis(10));
            *shared.0.lock() = 3;
            "foo"
        })
    };
    exec.run(async move {
        // Wait until the task reports phase 1, then advance to phase 2 and stop waiting.
        shared.1.wait_while(&mut shared.0.lock(), |phase| {
            if *phase == 1 {
                *phase = 2;
                false
            } else {
                true
            }
        });
        shared.1.notify_all();
        assert_eq!(task.abort().await, Some("foo"));
        // By the time `abort` returns, the task must have reached its final phase.
        assert_eq!(*shared.0.lock(), 3);
    });
}
2412
2413 fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2414 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2415 let running = Arc::new((Mutex::new(false), Condvar::new()));
2416 let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2417 let task = {
2418 let running = running.clone();
2419 let can_quit = can_quit.clone();
2420 executor.root_scope().compute(async move {
2421 *running.0.lock() = true;
2422 running.1.notify_all();
2423 {
2424 let mut guard = can_quit.0.lock();
2425 while !*guard {
2426 can_quit.1.wait(&mut guard);
2427 }
2428 }
2429 *running.0.lock() = false;
2430 })
2431 };
2432 executor.run(async move {
2433 {
2434 let mut guard = running.0.lock();
2435 while !*guard {
2436 running.1.wait(&mut guard);
2437 }
2438 }
2439
2440 callback(task);
2441
2442 *can_quit.0.lock() = true;
2443 can_quit.1.notify_all();
2444
2445 let ehandle = EHandle::local();
2446 let scope = ehandle.global_scope();
2447
2448 while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2450 Timer::new(std::time::Duration::from_millis(1)).await;
2451 }
2452
2453 assert!(!*running.0.lock());
2454 });
2455 }
2456
/// Polling the abort future once and then dropping it still lets the task be cleaned up.
#[test]
fn test_dropped_cancel_cleans_up() {
    test_clean_up(|task| {
        let abort = pin!(task.abort());
        let mut cx = Context::from_waker(Waker::noop());
        // The task is still blocked inside its poll, so the abort cannot finish yet.
        assert!(abort.poll(&mut cx).is_pending());
    });
}
2465
/// Dropping the task handle while the task is mid-poll still results in full clean-up.
#[test]
fn test_dropped_task_cleans_up() {
    test_clean_up(|task| drop(task));
}
2472
/// Detaching the task while it is mid-poll still results in full clean-up.
#[test]
fn test_detach_cleans_up() {
    test_clean_up(|task| task.detach());
}
2479
/// Futures pushed onto a `ScopeStream` have their outputs delivered through the stream, which
/// ends after `close`.
#[test]
fn test_scope_stream() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let (stream, pusher) = ScopeStream::new();
        pusher.push(async { 1 });
        pusher.push(async { 2 });
        stream.close();
        let collected: HashSet<_> = stream.collect().await;
        let expected = HashSet::from_iter([1, 2]);
        assert_eq!(collected, expected);
    });
}
2492
/// The stream consumer is woken for results that only become available after a delay.
#[test]
fn test_scope_stream_wakes_properly() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let (stream, pusher) = ScopeStream::new();
        // Both futures finish only after a short timer, i.e. after collection has begun.
        pusher.push(async {
            Timer::new(Duration::from_millis(10)).await;
            1
        });
        pusher.push(async {
            Timer::new(Duration::from_millis(10)).await;
            2
        });
        stream.close();
        let collected: HashSet<_> = stream.collect().await;
        let expected = HashSet::from_iter([1, 2]);
        assert_eq!(collected, expected);
    });
}
2511
/// Only pushed futures contribute items to the stream; the output of a task started with
/// `compute` on the same scope does not show up.
#[test]
fn test_scope_stream_drops_spawned_tasks() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let (stream, pusher) = ScopeStream::new();
        pusher.push(async { 1 });
        // Keep the handle alive until the end of the block.
        let _computed = stream.compute(async { "foo" });
        stream.close();
        let collected: HashSet<_> = stream.collect().await;
        let expected = HashSet::from_iter([1]);
        assert_eq!(collected, expected);
    });
}
2524
/// Futures running on the stream can push further futures onto it; all three nested outputs
/// arrive, and closing the stream once they have been seen ends the iteration.
#[test]
fn test_nested_scope_stream() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let (mut stream, handle) = ScopeStream::new();
        handle.clone().push(async move {
            handle.clone().push(async move {
                handle.clone().push(async move { 3 });
                2
            });
            1
        });

        let mut seen = HashSet::default();
        while let Some(value) = stream.next().await {
            seen.insert(value);
            // Close only after every expected value has arrived so the loop can terminate.
            if seen.len() == 3 {
                stream.close();
            }
        }
        let expected = HashSet::from_iter([1, 2, 3]);
        assert_eq!(seen, expected);
    });
}
2547
/// Dropping the `ScopeStream` drops the futures pushed onto it: once both captured senders
/// are gone, the receiver sees the end of the channel.
#[test]
fn test_dropping_scope_stream_cancels_all_tasks() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let (stream, pusher) = ScopeStream::new();
        let (first_tx, mut rx) = mpsc::unbounded::<()>();
        let second_tx = first_tx.clone();
        // Each future only holds on to its sender and never completes.
        pusher.push(async move {
            let _first_tx = first_tx;
            let () = pending().await;
        });
        pusher.push(async move {
            let _second_tx = second_tx;
            let () = pending().await;
        });
        drop(stream);

        // `None` is only produced once every sender has been dropped.
        assert_eq!(rx.next().await, None);
    });
}
2569
/// A `ScopeStream` can be built with `collect()` from an iterator of plain futures as well as
/// from an iterator of `SpawnableFuture`s, and yields every output in both cases.
#[test]
fn test_scope_stream_collect() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        // From plain async blocks.
        let from_futures: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
        let collected = from_futures.collect::<HashSet<u32>>().await;
        assert_eq!(collected, HashSet::from_iter(0..10));

        // From pre-wrapped `SpawnableFuture`s.
        let from_spawnable: ScopeStream<_> =
            (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
        let collected = from_spawnable.collect::<HashSet<u32>>().await;
        assert_eq!(collected, HashSet::from_iter(0..10));
    });
}
2582
/// Raises the shared flag when dropped; move it into a future to observe that future's drop.
struct DropSignal(Arc<AtomicBool>);

impl Drop for DropSignal {
    /// Records the drop by setting the shared flag to `true`.
    fn drop(&mut self) {
        let Self(flag) = self;
        flag.store(true, Ordering::SeqCst);
    }
}

/// Observer half of a [`DropSignal`] pair.
struct DropChecker(Arc<AtomicBool>);

impl DropChecker {
    /// Creates a linked checker/signal pair sharing one flag that starts out `false`.
    fn new() -> (Self, DropSignal) {
        let flag = Arc::new(AtomicBool::new(false));
        let checker = Self(Arc::clone(&flag));
        (checker, DropSignal(flag))
    }

    /// Returns `true` once the paired [`DropSignal`] has been dropped.
    fn is_dropped(&self) -> bool {
        let Self(flag) = self;
        flag.load(Ordering::SeqCst)
    }
}
2603
/// While the parent's cancellation is still outstanding (a guard is held), a newly created
/// child scope is already unusable: futures spawned on it are dropped at once and it hands
/// out no active guard.
#[test]
fn child_finished_when_parent_pending() {
    let mut exec = LocalExecutor::default();
    exec.run_singlethreaded(async {
        let parent = Scope::new();
        let _guard = parent.active_guard().expect("acquire guard");
        let cancellation = parent.to_handle().cancel();
        let child = parent.new_child();

        let (checker, signal) = DropChecker::new();
        child.spawn(async move {
            let _signal = signal;
            pending::<()>().await
        });

        // The future never ran; it was dropped as soon as it was spawned.
        assert!(checker.is_dropped());
        assert!(child.active_guard().is_none());
        cancellation.await;
    })
}
2622
/// Even while an active guard is held, a scope that has been closed, and later cancelled,
/// immediately drops any future spawned on it.
#[test]
fn guarded_scopes_observe_closed() {
    let mut exec = LocalExecutor::default();
    exec.run_singlethreaded(async {
        let scope = Scope::new();
        let handle = scope.to_handle();
        let _guard = scope.active_guard().expect("acquire guard");

        // After `close`, a spawned future is dropped without running.
        handle.close();
        let (checker_after_close, signal) = DropChecker::new();
        handle.spawn(async move {
            let _signal = signal;
            pending::<()>().await
        });
        assert!(checker_after_close.is_dropped());

        // The same holds once cancellation has been requested.
        let (checker_after_cancel, signal) = DropChecker::new();
        let cancellation = handle.clone().cancel();
        handle.spawn(async move {
            let _signal = signal;
            pending::<()>().await
        });
        assert!(checker_after_cancel.is_dropped());

        scope.join().await;
        cancellation.await;
    })
}
2648
/// An active guard taken on a child scope delays the parent's cancellation until the guard
/// is released.
#[test]
fn child_guard_holds_parent_cancellation() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    let child_guard = child.active_guard().expect("acquire guard");
    parent.spawn(pending());

    let mut cancelling = pin!(parent.cancel());
    let held_back = exec.run_until_stalled(&mut cancelling);
    assert_eq!(held_back, Poll::Pending);

    drop(child_guard);

    let finished = exec.run_until_stalled(&mut cancelling);
    assert_eq!(finished, Poll::Ready(()));
}
2661
/// Checks `on_cancel()` on clones of a guard taken from `child1`, awaited both from a task on
/// `child1` itself and from a task on its sibling `child2`: joining the parent is expected to
/// stall until cancellation is requested through a handle, and to finish afterwards.
#[test]
fn active_guard_on_cancel() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child1 = parent.new_child();
    let child2 = parent.new_child();

    let guard = child1.active_guard().expect("acquire guard");
    let same_scope_guard = guard.clone();
    let other_scope_guard = guard.clone();
    // Task living on the scope the guard was taken from.
    child1.spawn(async move { same_scope_guard.on_cancel().await });
    // Task living on a sibling scope, holding a guard that belongs to `child1`.
    child2.spawn(async move {
        other_scope_guard.on_cancel().await;
    });

    let parent_handle = parent.to_handle();
    let mut join_fut = pin!(parent.join());
    let before_cancel = exec.run_until_stalled(&mut join_fut);
    assert_eq!(before_cancel, Poll::Pending);

    // Only the request matters here; the returned future is discarded without being polled.
    let cancel_fut: Join<_> = parent_handle.cancel();
    drop(cancel_fut);

    let after_cancel = exec.run_until_stalled(&mut join_fut);
    assert_eq!(after_cancel, Poll::Ready(()));
}
2683
/// Checks upgrading a stalled cancel to an abort: while a child guard is held the parent's tasks
/// are expected to survive cancellation, and after `abort()` resolves both are expected to have
/// been dropped.
#[test]
fn abort_join() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    // Never released explicitly; it lives until the end of the test.
    let _child_guard = child.active_guard().expect("acquire guard");

    let (first_checker, first_signal) = DropChecker::new();
    parent.spawn(async move {
        let _held = first_signal;
        futures::future::pending::<()>().await
    });
    let (second_checker, second_signal) = DropChecker::new();
    parent.spawn(async move {
        let _held = second_signal;
        futures::future::pending::<()>().await
    });

    let mut cancel_fut = pin!(parent.cancel());
    let cancel_poll = exec.run_until_stalled(&mut cancel_fut);
    assert_eq!(cancel_poll, Poll::Pending);
    assert!(!first_checker.is_dropped());
    assert!(!second_checker.is_dropped());

    // Escalate from cancel to abort on the same join future.
    let mut abort_fut = cancel_fut.abort();
    let abort_poll = exec.run_until_stalled(&mut abort_fut);
    assert_eq!(abort_poll, Poll::Ready(()));
    assert!(first_checker.is_dropped());
    assert!(second_checker.is_dropped());
}
2712
/// Checks cancelling a scope while a guard on that same scope is held.
///
/// The test expects the unguarded child's task to be dropped by the time cancellation first
/// stalls, the guarded scope's own task to survive until the guard is released, and both tasks
/// to be gone once the cancel future has resolved.
#[test]
fn child_without_guard_aborts_immediately_on_cancel() {
    let mut executor = TestExecutor::new();
    let scope = executor.global_scope().new_child();
    let child = scope.new_child();
    // The guard is taken on the parent scope only; the child has no guard of its own.
    let guard = scope.active_guard().expect("acquire guard");

    let (checker_scope, signal) = DropChecker::new();
    scope.spawn(async move {
        let _signal = signal;
        futures::future::pending::<()>().await
    });
    let (checker_child, signal) = DropChecker::new();
    child.spawn(async move {
        let _signal = signal;
        futures::future::pending::<()>().await
    });

    let mut join = pin!(scope.cancel());
    assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
    // The guarded scope's task is still alive; the unguarded child's task is already gone.
    assert!(!checker_scope.is_dropped());
    assert!(checker_child.is_dropped());

    drop(guard);
    assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
    // Once cancellation has completed, the scope's own task must have been dropped as well.
    assert!(checker_scope.is_dropped());
    assert!(checker_child.is_dropped());
}
2740
/// Checks that awaiting a plain join handle after its scope has been dropped never resolves:
/// the driving future is expected to stay pending.
#[test]
fn await_canceled_task_pends_forever() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();

    let join_handle = scope.spawn(pending::<()>());
    let mut driver = pin!(async move {
        // Drop the scope first, then wait on the handle of the task it owned.
        drop(scope);
        join_handle.await;
    });

    let outcome = exec.run_until_stalled(&mut driver);
    assert_eq!(outcome, Poll::Pending);
}
2753
/// Checks that awaiting a `CancelableJoinHandle` after its scope has been dropped resolves.
///
/// Only completion is checked; the value produced by the handle is discarded.
#[test]
fn await_canceled_abortable_task_finishes_with_error() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();

    let join_handle = scope.spawn(pending::<()>());
    let cancelable = CancelableJoinHandle::from(join_handle);
    let mut driver = pin!(async move {
        // Drop the scope first, then wait on the handle of the task it owned.
        drop(scope);
        let _ = cancelable.await;
    });

    let outcome = exec.run_until_stalled(&mut driver);
    assert_eq!(outcome, Poll::Ready(()));
}
2766}