1use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::EHandle;
9use crate::condition::{Condition, ConditionGuard, WakerEntry};
10use core::{error, fmt};
11use fuchsia_sync::Mutex;
12use futures::Stream;
13use pin_project_lite::pin_project;
14use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
15use state::{JoinResult, ScopeState, ScopeWaker, Status, WakeVec};
16use std::any::Any;
17use std::borrow::Borrow;
18use std::collections::hash_map::Entry;
19use std::collections::hash_set;
20use std::future::{Future, IntoFuture};
21use std::hash;
22use std::marker::PhantomData;
23use std::mem::{self, ManuallyDrop};
24use std::ops::{Deref, DerefMut};
25use std::pin::Pin;
26use std::sync::{Arc, Weak};
27use std::task::{Context, Poll, Waker, ready};
28
/// An owning wrapper around a [`ScopeHandle`].
///
/// Dropping a `Scope` aborts every task in it (see the `Drop` impl); use
/// [`Scope::detach`] to release the wrapper without doing so.
#[must_use = "Scopes should be explicitly awaited or cancelled"]
#[derive(Debug)]
pub struct Scope {
    // The handle this scope owns; `Scope` dereferences to it.
    inner: ScopeHandle,
}
80
81impl Default for Scope {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl Scope {
88 pub fn new() -> Scope {
97 ScopeHandle::with_current(|handle| handle.new_child())
98 }
99
100 pub fn new_with_name(name: impl Into<String>) -> Scope {
109 ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
110 }
111
112 pub fn current() -> ScopeHandle {
120 ScopeHandle::with_current(|handle| handle.clone())
121 }
122
123 pub fn global() -> ScopeHandle {
140 EHandle::local().global_scope().clone()
141 }
142
143 pub fn new_child(&self) -> Scope {
145 self.inner.new_child()
146 }
147
148 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
150 self.inner.new_child_with_name(name.into())
151 }
152
153 pub fn name(&self) -> &str {
155 &self.inner.inner.name
156 }
157
158 pub fn to_handle(&self) -> ScopeHandle {
166 self.inner.clone()
167 }
168
169 pub fn as_handle(&self) -> &ScopeHandle {
176 &self.inner
177 }
178
179 pub fn join(self) -> Join {
188 Join::new(self)
189 }
190
191 pub fn close(self) -> Join {
194 self.inner.close();
195 Join::new(self)
196 }
197
198 pub fn cancel(self) -> Join {
216 self.inner.cancel_all_tasks();
217 Join::new(self)
218 }
219
220 pub fn abort(self) -> impl Future<Output = ()> {
235 self.inner.abort_all_tasks();
236 Join::new(self)
237 }
238
239 pub fn detach(self) {
245 let this = ManuallyDrop::new(self);
248 mem::drop(unsafe { std::ptr::read(&this.inner) });
251 }
252}
253
254impl Drop for Scope {
257 fn drop(&mut self) {
258 self.inner.abort_all_tasks();
267 }
268}
269
270impl IntoFuture for Scope {
271 type Output = ();
272 type IntoFuture = Join;
273 fn into_future(self) -> Self::IntoFuture {
274 self.join()
275 }
276}
277
278impl Deref for Scope {
279 type Target = ScopeHandle;
280 fn deref(&self) -> &Self::Target {
281 &self.inner
282 }
283}
284
285impl Borrow<ScopeHandle> for Scope {
286 fn borrow(&self) -> &ScopeHandle {
287 self
288 }
289}
290
pin_project! {
    /// Future that resolves once the wrapped scope has no tasks left.
    ///
    /// `S` is anything that borrows as a `ScopeHandle`; with the default
    /// `S = Scope`, dropping the `Join` drops the `Scope`, whose destructor
    /// aborts its tasks.
    pub struct Join<S = Scope> {
        // The scope (or handle) being joined.
        scope: S,
        // Registration slot used to park this future's waker on the scope's
        // state while tasks remain.
        #[pin]
        waker_entry: WakerEntry<ScopeState>,
    }
}
309
310impl<S: Borrow<ScopeHandle>> Join<S> {
311 fn new(scope: S) -> Self {
312 let waker_entry = scope.borrow().inner.state.waker_entry();
313 Self { scope, waker_entry }
314 }
315
316 pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
321 self.scope.borrow().abort_all_tasks();
322 self
323 }
324
325 pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
330 self.scope.borrow().cancel_all_tasks();
331 self
332 }
333}
334
335impl<S> Future for Join<S>
336where
337 S: Borrow<ScopeHandle>,
338{
339 type Output = ();
340 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341 let this = self.project();
342 let mut state = Borrow::borrow(&*this.scope).lock();
343 if state.has_tasks() {
344 state.add_waker(this.waker_entry, cx.waker().clone());
345 Poll::Pending
346 } else {
347 state.mark_finished();
348 Poll::Ready(())
349 }
350 }
351}
352
/// Something that can be turned into a task on a scope.
///
/// A blanket impl in this file covers every `Send + 'static` future whose
/// output is `Send + 'static`.
pub trait Spawnable {
    /// The value the resulting task produces.
    type Output;

    /// Converts `self` into a task handle associated with `scope`.
    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
}
362
363impl<F: Future + Send + 'static> Spawnable for F
364where
365 F::Output: Send + 'static,
366{
367 type Output = F::Output;
368
369 fn into_task(self, scope: ScopeHandle) -> TaskHandle {
370 scope.new_task(self)
371 }
372}
373
/// A cloneable, reference-counted handle to a scope.
///
/// Unlike [`Scope`], this type has no `Drop` impl of its own, so dropping a
/// handle does not abort the scope's tasks.
#[derive(Clone)]
pub struct ScopeHandle {
    // Shared scope data; released when the last handle is dropped.
    inner: Arc<ScopeInner>,
}
391
392impl ScopeHandle {
393 pub fn new_child(&self) -> Scope {
395 self.new_child_inner(String::new())
396 }
397
398 pub fn instrument_data(&self) -> Option<&(dyn Any + Send + Sync)> {
400 self.inner.instrument_data.as_deref()
401 }
402
403 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
405 self.new_child_inner(name.into())
406 }
407
408 fn new_child_inner(&self, name: String) -> Scope {
409 let mut state = self.lock();
410 let child = ScopeHandle {
411 inner: Arc::new(ScopeInner {
412 executor: self.inner.executor.clone(),
413 state: Condition::new(ScopeState::new_child(
414 self.clone(),
415 &state,
416 JoinResults::default().into(),
417 )),
418
419 instrument_data: self
420 .inner
421 .executor
422 .instrument
423 .as_ref()
424 .map(|value| value.scope_created(&name, Some(self))),
425 name,
426 }),
427 };
428 let weak = child.downgrade();
429 state.insert_child(weak);
430 Scope { inner: child }
431 }
432
433 pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
437 let task = future.into_task(self.clone());
438 let task_handle = task.clone();
439 self.insert_task(task, false);
440 JoinHandle::new(self.clone(), task_handle)
441 }
442
443 pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
448 let task = self.new_local_task(future);
449 self.insert_task(task.clone(), false);
450 JoinHandle::new(self.clone(), task)
451 }
452
453 pub fn compute<T: Send + 'static>(
458 &self,
459 future: impl Spawnable<Output = T> + Send + 'static,
460 ) -> crate::Task<T> {
461 let task = future.into_task(self.clone());
462 let task_handle = task.clone();
463 self.insert_task(task, false);
464 JoinHandle::new(self.clone(), task_handle).into()
465 }
466
467 pub fn compute_local<T: 'static>(
475 &self,
476 future: impl Future<Output = T> + 'static,
477 ) -> crate::Task<T> {
478 let task = self.new_local_task(future);
479 self.insert_task(task.clone(), false);
480 JoinHandle::new(self.clone(), task).into()
481 }
482
483 pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
484 ScopeHandle {
485 inner: Arc::new(ScopeInner {
486 state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
487 name: "root".to_string(),
488 instrument_data: executor
489 .instrument
490 .as_ref()
491 .map(|value| value.scope_created("root", None)),
492 executor,
493 }),
494 }
495 }
496
497 pub fn close(&self) {
505 self.lock().close();
506 }
507
508 pub fn cancel(self) -> Join<Self> {
513 self.cancel_all_tasks();
514 Join::new(self)
515 }
516
517 pub fn abort(self) -> impl Future<Output = ()> {
522 self.abort_all_tasks();
523 Join::new(self)
524 }
525
526 #[must_use]
538 pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
539 ScopeActiveGuard::new(self)
540 }
541
542 pub fn is_cancelled(&self) -> bool {
545 self.lock().status().is_cancelled()
546 }
547
548 pub async fn on_no_tasks(&self) {
555 self.inner
556 .state
557 .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
558 .await;
559 }
560
561 pub async fn on_no_tasks_and_guards(&self) {
564 self.inner
565 .state
566 .when(|state| {
567 if state.has_tasks() || state.guards() > 0 {
568 Poll::Pending
569 } else {
570 Poll::Ready(())
571 }
572 })
573 .await;
574 }
575
576 pub fn wake_all_with_active_guard(&self) {
578 self.lock().wake_all_with_active_guard();
579 }
580
581 pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(&self, fut: Fut) -> AtomicFutureHandle<'a>
584 where
585 Fut::Output: Send,
586 {
587 let mut task = AtomicFutureHandle::new(Some(self.clone()), fut);
588 if let Some(instrument) = &self.executor().instrument {
589 instrument.task_created(self, &mut task);
590 }
591 task
592 }
593
594 pub(crate) fn new_local_task<'a>(&self, fut: impl Future + 'a) -> AtomicFutureHandle<'a> {
597 if !self.executor().is_local() {
599 panic!(
600 "Error: called `new_local_task` on multithreaded executor. \
601 Use `spawn` or a `LocalExecutor` instead."
602 );
603 }
604 assert_eq!(
605 self.executor().first_thread_id.get(),
606 Some(&std::thread::current().id()),
607 "Error: called `new_local_task` on a different thread to the executor",
608 );
609
610 unsafe {
613 let mut task = AtomicFutureHandle::new_local(Some(self.clone()), fut);
614 if let Some(instrument) = &self.executor().instrument {
615 instrument.task_created(self, &mut task);
616 }
617 task
618 }
619 }
620}
621
622impl fmt::Debug for ScopeHandle {
623 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624 f.debug_struct("Scope").field("name", &self.inner.name).finish()
625 }
626}
627
/// A scope whose finished tasks' results are collected and yielded through
/// the `Stream` trait.
///
/// Dropping the stream aborts the scope's tasks (see the `Drop` impl).
pub struct ScopeStream<R> {
    // The scope that runs the tasks.
    inner: ScopeHandle,
    // Result buffer and parked waker, shared with the scope's `ResultsStream`.
    stream: Arc<Mutex<ResultsStreamInner<R>>>,
}
641
642impl<R: Send + 'static> ScopeStream<R> {
643 pub fn new() -> (Self, ScopeStreamHandle<R>) {
652 Self::new_inner(String::new())
653 }
654
655 pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
664 Self::new_inner(name.into())
665 }
666
667 fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
668 let this = ScopeHandle::with_current(|handle| {
669 let mut state = handle.lock();
670 let stream = Arc::default();
671 let child = ScopeHandle {
672 inner: Arc::new(ScopeInner {
673 executor: handle.executor().clone(),
674 state: Condition::new(ScopeState::new_child(
675 handle.clone(),
676 &state,
677 Box::new(ResultsStream { inner: Arc::clone(&stream) }),
678 )),
679 instrument_data: handle
680 .executor()
681 .instrument
682 .as_ref()
683 .map(|value| value.scope_created(&name, Some(handle))),
684 name,
685 }),
686 };
687 let weak = child.downgrade();
688 state.insert_child(weak);
689 ScopeStream { inner: child, stream }
690 });
691 let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
692 (this, handle)
693 }
694}
695
696impl<R> Drop for ScopeStream<R> {
697 fn drop(&mut self) {
698 self.inner.abort_all_tasks();
707 }
708}
709
710impl<R: Send + 'static> Stream for ScopeStream<R> {
711 type Item = R;
712
713 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
714 let mut stream_inner = self.stream.lock();
715 match stream_inner.results.pop() {
716 Some(result) => Poll::Ready(Some(result)),
717 None => {
718 drop(stream_inner);
721 let state = self.inner.lock();
722 let mut stream_inner = self.stream.lock();
723 match stream_inner.results.pop() {
724 Some(result) => Poll::Ready(Some(result)),
725 None => {
726 if state.has_tasks() {
727 stream_inner.waker = Some(cx.waker().clone());
728 Poll::Pending
729 } else {
730 Poll::Ready(None)
731 }
732 }
733 }
734 }
735 }
736 }
737}
738
739impl<R> Deref for ScopeStream<R> {
740 type Target = ScopeHandle;
741 fn deref(&self) -> &Self::Target {
742 &self.inner
743 }
744}
745
746impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
747 fn borrow(&self) -> &ScopeHandle {
748 self
749 }
750}
751
752impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
753 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
754 let (stream, handle) = ScopeStream::new();
755 for fut in iter {
756 handle.push(fut);
757 }
758 stream.close();
759 stream
760 }
761}
762
/// Handle used to push futures onto a [`ScopeStream`].
///
/// Field 0 is the stream's scope; the `PhantomData` only ties the handle to
/// the stream's result type `R`.
#[derive(Clone)]
pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
765
766impl<R: Send> ScopeStreamHandle<R> {
767 pub fn push(&self, future: impl Spawnable<Output = R>) {
768 self.0.insert_task(future.into_task(self.0.clone()), true);
769 }
770}
771
/// A guard that holds one cancel-guard count on a scope.
///
/// The count is acquired on creation/clone and released on drop. While any
/// guard is outstanding, cancelling the scope only marks it as pending
/// cancellation.
#[derive(Debug)]
#[must_use]
pub struct ScopeActiveGuard(ScopeHandle);
784
785impl Deref for ScopeActiveGuard {
786 type Target = ScopeHandle;
787 fn deref(&self) -> &Self::Target {
788 &self.0
789 }
790}
791
792impl Drop for ScopeActiveGuard {
793 fn drop(&mut self) {
794 let Self(scope) = self;
795 scope.release_cancel_guard();
796 }
797}
798
799impl Clone for ScopeActiveGuard {
800 fn clone(&self) -> Self {
801 self.0.lock().acquire_cancel_guard(1);
802 Self(self.0.clone())
803 }
804}
805
806impl ScopeActiveGuard {
807 pub fn as_handle(&self) -> &ScopeHandle {
809 &self.0
810 }
811
812 pub fn to_handle(&self) -> ScopeHandle {
814 self.0.clone()
815 }
816
817 pub async fn on_cancel(&self) {
823 self.0
824 .inner
825 .state
826 .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
827 .await
828 }
829
830 fn new(scope: &ScopeHandle) -> Option<Self> {
831 if scope.lock().acquire_cancel_guard_if_not_finished() {
832 Some(Self(scope.clone()))
833 } else {
834 None
835 }
836 }
837}
838
/// Error returned when joining a task that was aborted before it produced a
/// result (see `try_poll_join_result`).
pub struct JoinError {
    // Private field so the type cannot be constructed outside this module.
    _phantom: PhantomData<()>,
}
843
844impl fmt::Debug for JoinError {
845 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
846 f.debug_tuple("JoinError").finish()
847 }
848}
849
850impl fmt::Display for JoinError {
851 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
852 write!(f, "JoinError: a task failed to execute to completion")
853 }
854}
855
// `JoinError` carries no source error, so the default `Error` methods suffice.
impl error::Error for JoinError {}
857
/// A non-owning reference to a scope, hashed and compared by pointer
/// identity (see the `Hash` and `PartialEq` impls).
#[derive(Clone)]
struct WeakScopeHandle {
    inner: Weak<ScopeInner>,
}
867
868impl WeakScopeHandle {
869 pub fn upgrade(&self) -> Option<ScopeHandle> {
871 self.inner.upgrade().map(|inner| ScopeHandle { inner })
872 }
873}
874
875impl hash::Hash for WeakScopeHandle {
876 fn hash<H: hash::Hasher>(&self, state: &mut H) {
877 Weak::as_ptr(&self.inner).hash(state);
878 }
879}
880
881impl PartialEq for WeakScopeHandle {
882 fn eq(&self, other: &Self) -> bool {
883 Weak::ptr_eq(&self.inner, &other.inner)
884 }
885}
886
// Pointer equality is reflexive, symmetric and transitive, so `Eq` holds.
impl Eq for WeakScopeHandle {
}
891
892mod state {
895 use super::*;
896
    /// Mutable state of a scope, kept behind the scope's `Condition` lock.
    pub struct ScopeState {
        /// The parent scope; `None` for a root scope.
        pub parent: Option<ScopeHandle>,
        // Weak references to child scopes.
        children: HashSet<WeakScopeHandle>,
        // Tasks registered directly on this scope.
        all_tasks: HashSet<TaskHandle>,
        // Incremented when this scope registers its first task and when a
        // child without a prior count registers its first task through us;
        // `has_tasks` is `subscopes_with_tasks > 0`.
        subscopes_with_tasks: u32,
        // Whether new tasks may be inserted.
        can_spawn: bool,
        // Number of outstanding cancel guards.
        guards: u32,
        // Lifecycle status of the scope.
        status: Status,
        /// Collector for results of finished tasks.
        pub results: Box<dyn Results>,
    }
912
    /// Per-task entry in `JoinResults`.
    pub enum JoinResult {
        /// The task has not finished; holds the waker of whoever is joining it.
        Waker(Waker),
        /// The task has finished and nobody has collected that fact yet.
        Ready,
    }
917
918 #[repr(u8)] #[derive(Default, Debug, Clone, Copy)]
920 pub enum Status {
921 #[default]
922 Active,
924 PendingCancellation,
927 Finished,
930 }
931
932 impl Status {
933 pub fn is_cancelled(&self) -> bool {
935 match self {
936 Self::Active => false,
937 Self::PendingCancellation | Self::Finished => true,
938 }
939 }
940 }
941
942 impl ScopeState {
943 pub fn new_root(results: Box<impl Results>) -> Self {
944 Self {
945 parent: None,
946 children: Default::default(),
947 all_tasks: Default::default(),
948 subscopes_with_tasks: 0,
949 can_spawn: true,
950 guards: 0,
951 status: Default::default(),
952 results,
953 }
954 }
955
956 pub fn new_child(
957 parent_handle: ScopeHandle,
958 parent_state: &Self,
959 results: Box<impl Results>,
960 ) -> Self {
961 let (status, can_spawn) = match parent_state.status {
962 Status::Active => (Status::Active, parent_state.can_spawn),
963 Status::Finished | Status::PendingCancellation => (Status::Finished, false),
964 };
965 Self {
966 parent: Some(parent_handle),
967 children: Default::default(),
968 all_tasks: Default::default(),
969 subscopes_with_tasks: 0,
970 can_spawn,
971 guards: 0,
972 status,
973 results,
974 }
975 }
976 }
977
    impl ScopeState {
        /// Returns the tasks registered directly on this scope.
        pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
            &self.all_tasks
        }

        /// Tries to add `task` to this scope and wake it.
        ///
        /// Returns `None` on success. Returns the task back (`Some`) when the
        /// scope cannot spawn, when a non-stream task is offered to a
        /// collector that refuses spawns, or when registering the first task
        /// with the ancestors fails.
        ///
        /// # Panics
        ///
        /// Panics if the task is already in the set.
        pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
            if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
                return Some(task);
            }
            // Going from zero to one task requires telling the ancestors.
            if self.all_tasks.is_empty() && !self.register_first_task() {
                return Some(task);
            }
            task.wake();
            assert!(self.all_tasks.insert(task), "Task must be new");
            None
        }

        /// Returns the weak references to this scope's children.
        pub fn children(&self) -> &HashSet<WeakScopeHandle> {
            &self.children
        }

        /// Records `child` as a child of this scope.
        pub fn insert_child(&mut self, child: WeakScopeHandle) {
            self.children.insert(child);
        }

        /// Removes the child identified by `child`.
        ///
        /// # Panics
        ///
        /// Panics if the child is missing while other children are still
        /// recorded; a miss is tolerated only when the set is empty (e.g.
        /// after it has been drained).
        pub fn remove_child(&mut self, child: &PtrKey) {
            let found = self.children.remove(child);
            assert!(found || self.children.is_empty());
        }

        /// Returns the current lifecycle status.
        pub fn status(&self) -> Status {
            self.status
        }

        /// Returns the number of outstanding cancel guards.
        pub fn guards(&self) -> u32 {
            self.guards
        }

        /// Disallows further spawning; the status is left unchanged.
        pub fn close(&mut self) {
            self.can_spawn = false;
        }

        /// Disallows further spawning and sets the status to `Finished`.
        pub fn mark_finished(&mut self) {
            self.can_spawn = false;
            self.status = Status::Finished;
        }

        /// Returns whether this scope's subtree is counted as having tasks.
        pub fn has_tasks(&self) -> bool {
            self.subscopes_with_tasks > 0
        }

        /// Wakes every task on this scope via `wake_with_active_guard`, then
        /// acquires one guard for each call that returned `true`.
        pub fn wake_all_with_active_guard(&mut self) {
            let mut count = 0;
            for task in &self.all_tasks {
                if task.wake_with_active_guard() {
                    count += 1;
                }
            }
            self.acquire_cancel_guard(count);
        }

        /// Aborts every task on this scope, queueing on the executor's ready
        /// queue each task whose `abort()` returns `true`, then marks the
        /// scope finished.
        pub fn abort_tasks_and_mark_finished(&mut self) {
            for task in self.all_tasks() {
                if task.abort() {
                    task.scope().executor().ready_tasks.push(task.clone());
                }
            }
            self.mark_finished();
        }

        /// Moves all wakers registered on the condition into `wakers` (for the
        /// caller to wake) and sets the status to `PendingCancellation`.
        pub fn wake_wakers_and_mark_pending(
            this: &mut ConditionGuard<'_, ScopeState>,
            wakers: &mut Vec<Waker>,
        ) {
            wakers.extend(this.drain_wakers());
            this.status = Status::PendingCancellation;
        }

        /// Registers that this scope is about to have tasks, recursing into
        /// the parent (taking its lock) when this scope was not counted yet.
        ///
        /// Returns `false`, without changing any count, if this scope or an
        /// ancestor that had to be consulted cannot spawn.
        #[must_use]
        fn register_first_task(&mut self) -> bool {
            if !self.can_spawn {
                return false;
            }
            let can_spawn = match &self.parent {
                Some(parent) => {
                    // A non-zero count means the parent has already been told
                    // about us, so there is no need to recurse.
                    self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
                }
                None => true,
            };
            if can_spawn {
                self.subscopes_with_tasks += 1;
                debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
            };
            can_spawn
        }

        /// Undoes one `register_first_task`: decrements the count and, when it
        /// reaches zero, recurses into the parent and then drains this scope's
        /// wakers into `wakers`.
        ///
        /// `num_wakers_hint` accumulates the number of wakers expected so the
        /// vector is reserved once, at the point where the recursion stops.
        fn on_last_task_removed(
            this: &mut ConditionGuard<'_, ScopeState>,
            num_wakers_hint: usize,
            wakers: &mut Vec<Waker>,
        ) {
            debug_assert!(this.subscopes_with_tasks > 0);
            this.subscopes_with_tasks -= 1;
            if this.subscopes_with_tasks > 0 {
                wakers.reserve(num_wakers_hint);
                return;
            }

            match &this.parent {
                Some(parent) => {
                    Self::on_last_task_removed(
                        &mut parent.lock(),
                        num_wakers_hint + this.waker_count(),
                        wakers,
                    );
                }
                None => wakers.reserve(num_wakers_hint),
            };
            wakers.extend(this.drain_wakers());
        }

        /// Acquires one guard unless the scope is finished; returns whether a
        /// guard was acquired.
        pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
            match self.status {
                Status::Active | Status::PendingCancellation => {
                    self.acquire_cancel_guard(1);
                    true
                }
                Status::Finished => false,
            }
        }

        /// Adds `count` guards. When the count goes from zero to non-zero,
        /// a single guard is also acquired on the parent, so a scope holds at
        /// most one guard on its parent regardless of its own count.
        pub fn acquire_cancel_guard(&mut self, count: u32) {
            if count == 0 {
                return;
            }
            if self.guards == 0
                && let Some(parent) = self.parent.as_ref()
            {
                parent.acquire_cancel_guard();
            }
            self.guards += count;
        }

        /// Releases one guard. When the count reaches zero, runs
        /// `on_zero_guards` (which may abort tasks and releases the parent's
        /// guard) and moves this scope's wakers into `wake_vec`.
        ///
        /// `waker_count` accumulates the expected number of wakers so the
        /// vector is reserved where the recursion stops.
        ///
        /// # Panics
        ///
        /// Panics if no guard is currently held.
        pub fn release_cancel_guard(
            this: &mut ConditionGuard<'_, Self>,
            wake_vec: &mut WakeVec,
            mut waker_count: usize,
        ) {
            this.guards = this.guards.checked_sub(1).expect("released non-acquired guard");
            if this.guards == 0 {
                waker_count += this.waker_count();
                this.on_zero_guards(wake_vec, waker_count);
                wake_vec.0.extend(this.drain_wakers())
            } else {
                wake_vec.0.reserve_exact(waker_count);
            }
        }

        /// Called when the guard count drops to zero: completes a pending
        /// cancellation, then releases the guard held on the parent.
        fn on_zero_guards(&mut self, wake_vec: &mut WakeVec, waker_count: usize) {
            match self.status {
                Status::Active => {}
                Status::PendingCancellation => {
                    self.abort_tasks_and_mark_finished();
                }
                // Nothing left to do for a scope that is already finished.
                Status::Finished => {}
            }
            if let Some(parent) = &self.parent {
                ScopeState::release_cancel_guard(&mut parent.lock(), wake_vec, waker_count);
            }
        }
    }
1165
    /// A batch of wakers that are all woken when the value is dropped (see
    /// the `Drop` impl).
    #[derive(Default)]
    pub struct WakeVec(Vec<Waker>);
1168
1169 impl Drop for WakeVec {
1170 fn drop(&mut self) {
1171 for waker in self.0.drain(..) {
1172 waker.wake();
1173 }
1174 }
1175 }
1176
    /// A locked scope state paired with a `WakeVec`.
    ///
    /// Fields drop in declaration order, so the lock guard is released before
    /// the `WakeVec` wakes the collected wakers.
    pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1179
1180 impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1181 fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1182 Self(value, WakeVec::default())
1183 }
1184 }
1185
1186 impl ScopeWaker<'_> {
1187 pub fn take_task(&mut self, task: &TaskHandle) -> Option<TaskHandle> {
1188 let task = self.all_tasks.take(task);
1189 if task.is_some() {
1190 self.on_task_removed(0);
1191 }
1192 task
1193 }
1194
1195 pub fn task_did_finish(&mut self, task: &TaskHandle) {
1196 if let Some(task) = self.all_tasks.take(task) {
1197 self.on_task_removed(1);
1198 if !task.is_detached() {
1199 let maybe_waker = self.results.task_did_finish(task);
1200 self.1.0.extend(maybe_waker);
1201 }
1202 }
1203 }
1204
1205 pub fn set_closed_and_drain(
1206 &mut self,
1207 ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
1208 self.close();
1209 let all_tasks = std::mem::take(&mut self.all_tasks);
1210 let results = self.results.take();
1211 if !all_tasks.is_empty() {
1212 self.on_task_removed(0)
1213 }
1214 let children = self.children.drain();
1215 (all_tasks, results, children)
1216 }
1217
1218 fn on_task_removed(&mut self, num_wakers_hint: usize) {
1219 if self.all_tasks.is_empty() {
1220 ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1.0)
1221 }
1222 }
1223
1224 pub fn wake_wakers_and_mark_pending(&mut self) {
1225 let Self(state, wakers) = self;
1226 ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
1227 }
1228 }
1229
1230 impl<'a> Deref for ScopeWaker<'a> {
1231 type Target = ConditionGuard<'a, ScopeState>;
1232
1233 fn deref(&self) -> &Self::Target {
1234 &self.0
1235 }
1236 }
1237
1238 impl DerefMut for ScopeWaker<'_> {
1239 fn deref_mut(&mut self) -> &mut Self::Target {
1240 &mut self.0
1241 }
1242 }
1243}
1244
/// The shared data behind every `ScopeHandle` to a given scope.
struct ScopeInner {
    // The executor this scope's tasks run on.
    executor: Arc<Executor>,
    // The scope's mutable state, with the wakers waiting on it.
    state: Condition<ScopeState>,
    // The scope's name (empty for unnamed scopes, "root" for the root).
    name: String,
    // Value returned by the executor's `scope_created` instrumentation hook,
    // if a hook is installed.
    instrument_data: Option<Box<dyn Any + Send + Sync>>,
}
1251
impl Drop for ScopeInner {
    /// Detaches a dying scope from its parent: releases the guard it holds on
    /// the parent (if it still has guards) and removes itself from the
    /// parent's child set.
    fn drop(&mut self) {
        // SAFETY: `PtrKey` is a zero-sized unit struct, so the reference is
        // only ever used for its address. NOTE(review): lookup relies on this
        // address being the one `Weak::as_ptr` yields for the same scope.
        let key = unsafe { &*(self as *const _ as *const PtrKey) };
        let state = self.state.lock();
        if let Some(parent) = &state.parent {
            // Declared before `parent_state` so it is dropped (and its wakers
            // woken) after the parent's lock has been released.
            let mut wake_vec = WakeVec::default();
            let mut parent_state = parent.lock();
            // A scope with outstanding guards holds one guard on its parent.
            if state.guards() != 0 {
                ScopeState::release_cancel_guard(&mut parent_state, &mut wake_vec, 0);
            }
            parent_state.remove_child(key);
        }
    }
}
1270
1271impl ScopeHandle {
1272 fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
1273 super::common::TaskHandle::with_current(|task| match task {
1274 Some(task) => f(task.scope()),
1275 None => f(EHandle::local().global_scope()),
1276 })
1277 }
1278
1279 fn lock(&self) -> ConditionGuard<'_, ScopeState> {
1280 self.inner.state.lock()
1281 }
1282
1283 fn downgrade(&self) -> WeakScopeHandle {
1284 WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
1285 }
1286
1287 #[inline(always)]
1288 pub(crate) fn executor(&self) -> &Arc<Executor> {
1289 &self.inner.executor
1290 }
1291
1292 pub(crate) fn detach(&self, task: &TaskHandle) {
1294 let _maybe_task = {
1295 let mut state = self.lock();
1296 if let Some(task) = state.all_tasks().get(task) {
1297 task.detach();
1298 }
1299 state.results.detach(task)
1300 };
1301 }
1302
1303 pub(crate) unsafe fn abort_task<R>(&self, task: &TaskHandle) -> Option<R> {
1309 let mut state = self.lock();
1310 if state.results.detach(task) {
1311 drop(state);
1312 return unsafe { task.take_result() };
1313 }
1314 state.all_tasks().get(task).and_then(|task| {
1315 if task.abort() {
1316 self.inner.executor.ready_tasks.push(task.clone());
1317 }
1318 unsafe { task.take_result() }
1319 })
1320 }
1321
1322 pub(crate) fn abort_and_detach(&self, task: &TaskHandle) {
1324 let _tasks = {
1325 let mut state = ScopeWaker::from(self.lock());
1326 let maybe_task1 = state.results.detach(task);
1327 let mut maybe_task2 = None;
1328 if state.all_tasks().contains(task) {
1329 match task.abort_and_detach() {
1330 AbortAndDetachResult::Done => maybe_task2 = state.take_task(task),
1331 AbortAndDetachResult::AddToRunQueue => {
1332 self.inner.executor.ready_tasks.push(task.clone());
1333 }
1334 AbortAndDetachResult::Pending => {}
1335 }
1336 }
1337 (maybe_task1, maybe_task2)
1338 };
1339 }
1340
1341 pub(crate) unsafe fn poll_join_result<R>(
1347 &self,
1348 task: &TaskHandle,
1349 cx: &mut Context<'_>,
1350 ) -> Poll<R> {
1351 let task = ready!(self.lock().results.poll_join_result(task, cx));
1352 match unsafe { task.take_result() } {
1353 Some(result) => Poll::Ready(result),
1354 None => {
1355 Poll::Pending
1357 }
1358 }
1359 }
1360
1361 pub(crate) unsafe fn try_poll_join_result<R>(
1368 &self,
1369 task: &TaskHandle,
1370 cx: &mut Context<'_>,
1371 ) -> Poll<Result<R, JoinError>> {
1372 let task = ready!(self.lock().results.poll_join_result(task, cx));
1373 match unsafe { task.take_result() } {
1374 Some(result) => Poll::Ready(Ok(result)),
1375 None => {
1376 if task.is_aborted() {
1377 Poll::Ready(Err(JoinError { _phantom: PhantomData }))
1378 } else {
1379 Poll::Pending
1380 }
1381 }
1382 }
1383 }
1384
1385 pub(crate) unsafe fn poll_aborted<R>(
1387 &self,
1388 task: &TaskHandle,
1389 cx: &mut Context<'_>,
1390 ) -> Poll<Option<R>> {
1391 let task = self.lock().results.poll_join_result(task, cx);
1392 task.map(|task| unsafe { task.take_result() })
1393 }
1394
1395 pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1397 let returned_task = self.lock().insert_task(task, for_stream);
1398 if let Some(t) = returned_task {
1399 t.try_drop().unwrap();
1404 false
1405 } else {
1406 true
1407 }
1408 }
1409
1410 pub(super) unsafe fn drop_task_unchecked(&self, task: &TaskHandle) {
1421 let mut state = ScopeWaker::from(self.lock());
1422 let task = state.take_task(task);
1423 if let Some(task) = task {
1424 unsafe { task.drop_future_unchecked() };
1425 }
1426 }
1427
1428 pub(super) fn task_did_finish(&self, task: &TaskHandle) {
1429 let mut state = ScopeWaker::from(self.lock());
1430 state.task_did_finish(task);
1431 }
1432
1433 fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
1436 let mut scopes = vec![self.clone()];
1437 while let Some(scope) = scopes.pop() {
1438 let mut scope_waker = ScopeWaker::from(scope.lock());
1439 if callback(&mut scope_waker) {
1440 scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
1441 }
1442 }
1443 }
1444
1445 fn acquire_cancel_guard(&self) {
1446 self.lock().acquire_cancel_guard(1)
1447 }
1448
1449 pub(crate) fn release_cancel_guard(&self) {
1450 let mut wake_vec = WakeVec::default();
1451 ScopeState::release_cancel_guard(&mut self.lock(), &mut wake_vec, 0);
1452 }
1453
1454 fn cancel_all_tasks(&self) {
1456 self.visit_scopes_locked(|state| {
1457 match state.status() {
1458 Status::Active => {
1459 if state.guards() == 0 {
1460 state.abort_tasks_and_mark_finished();
1461 } else {
1462 state.wake_wakers_and_mark_pending();
1463 }
1464 true
1465 }
1466 Status::PendingCancellation => {
1467 true
1471 }
1472 Status::Finished => {
1473 false
1475 }
1476 }
1477 });
1478 }
1479
1480 fn abort_all_tasks(&self) {
1482 self.visit_scopes_locked(|state| match state.status() {
1483 Status::Active | Status::PendingCancellation => {
1484 state.abort_tasks_and_mark_finished();
1485 true
1486 }
1487 Status::Finished => false,
1488 });
1489 }
1490
1491 pub(super) fn drop_all_tasks(&self) {
1498 let mut scopes = vec![self.clone()];
1499 while let Some(scope) = scopes.pop() {
1500 let (tasks, join_results) = {
1501 let mut state = ScopeWaker::from(scope.lock());
1502 let (tasks, join_results, children) = state.set_closed_and_drain();
1503 scopes.extend(children.filter_map(|child| child.upgrade()));
1504 (tasks, join_results)
1505 };
1506 for task in tasks {
1508 task.try_drop().expect("Expected drop to succeed");
1509 }
1510 std::mem::drop(join_results);
1511 }
1512 }
1513}
1514
/// Zero-sized lookup key that stands for "the scope at this address".
///
/// Only the address of a `&PtrKey` is meaningful; it lets a child be looked
/// up in a `HashSet<WeakScopeHandle>` without building a `Weak`.
#[repr(transparent)]
struct PtrKey;
1518
1519impl Borrow<PtrKey> for WeakScopeHandle {
1520 fn borrow(&self) -> &PtrKey {
1521 unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1523 }
1524}
1525
1526impl PartialEq for PtrKey {
1527 fn eq(&self, other: &Self) -> bool {
1528 std::ptr::eq(self, other)
1529 }
1530}
1531
// Address equality is a full equivalence relation.
impl Eq for PtrKey {}
1533
1534impl hash::Hash for PtrKey {
1535 fn hash<H: hash::Hasher>(&self, state: &mut H) {
1536 (self as *const PtrKey).hash(state);
1537 }
1538}
1539
/// Default results collector: tracks, per task, either the waker of a pending
/// join or the fact that the task has finished.
#[derive(Default)]
struct JoinResults(HashMap<TaskHandle, JoinResult>);
1542
/// Strategy for collecting the results of a scope's finished tasks.
trait Results: Send + Sync + 'static {
    /// Whether ordinary (non-stream) tasks may be spawned on a scope that
    /// uses this collector.
    fn can_spawn(&self) -> bool;

    /// Polls for `task` to have finished, registering `cx`'s waker if not.
    fn poll_join_result(&mut self, task: &TaskHandle, cx: &mut Context<'_>) -> Poll<TaskHandle>;

    /// Records that `task` finished; returns a waker to wake, if any.
    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;

    /// Forgets any bookkeeping for `task`; returns `true` if the task had
    /// finished with its result still uncollected.
    fn detach(&mut self, task: &TaskHandle) -> bool;

    /// Takes out all collected results, type-erased.
    fn take(&mut self) -> Box<dyn Any>;

    /// Whether no results are being tracked (test-only).
    #[cfg(test)]
    fn is_empty(&self) -> bool;
}
1564
1565impl Results for JoinResults {
1566 fn can_spawn(&self) -> bool {
1567 true
1568 }
1569
1570 fn poll_join_result(&mut self, task: &TaskHandle, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1571 match self.0.entry(task.clone()) {
1572 Entry::Occupied(mut o) => match o.get_mut() {
1573 JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1574 JoinResult::Ready => {
1575 o.remove();
1576 return Poll::Ready(task.clone());
1577 }
1578 },
1579 Entry::Vacant(v) => {
1580 v.insert(JoinResult::Waker(cx.waker().clone()));
1581 }
1582 }
1583 Poll::Pending
1584 }
1585
1586 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1587 match self.0.entry(task) {
1588 Entry::Occupied(mut o) => {
1589 let JoinResult::Waker(waker) = std::mem::replace(o.get_mut(), JoinResult::Ready)
1590 else {
1591 unreachable!()
1595 };
1596 Some(waker)
1597 }
1598 Entry::Vacant(v) => {
1599 v.insert(JoinResult::Ready);
1600 None
1601 }
1602 }
1603 }
1604
1605 fn detach(&mut self, task: &TaskHandle) -> bool {
1606 matches!(self.0.remove(task), Some(JoinResult::Ready))
1607 }
1608
1609 fn take(&mut self) -> Box<dyn Any> {
1610 Box::new(Self(std::mem::take(&mut self.0)))
1611 }
1612
1613 #[cfg(test)]
1614 fn is_empty(&self) -> bool {
1615 self.0.is_empty()
1616 }
1617}
1618
/// Results collector used by `ScopeStream`: finished tasks' results are
/// pushed into a buffer shared with the stream.
#[derive(Default)]
struct ResultsStream<R> {
    // Shared with the `ScopeStream` that reads the results.
    inner: Arc<Mutex<ResultsStreamInner<R>>>,
}
1623
/// Buffer shared between a `ScopeStream` and its `ResultsStream` collector.
struct ResultsStreamInner<R> {
    // Results not yet yielded by the stream; the stream pops from the back.
    results: Vec<R>,
    // Waker of the stream consumer, set when a poll found nothing to yield.
    waker: Option<Waker>,
}
1628
1629impl<R> Default for ResultsStreamInner<R> {
1630 fn default() -> Self {
1631 Self { results: Vec::new(), waker: None }
1632 }
1633}
1634
1635impl<R: Send + 'static> Results for ResultsStream<R> {
1636 fn can_spawn(&self) -> bool {
1637 false
1638 }
1639
1640 fn poll_join_result(&mut self, _task: &TaskHandle, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1641 Poll::Pending
1642 }
1643
1644 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1645 let mut inner = self.inner.lock();
1646 inner.results.extend(unsafe { task.take_result() });
1649 inner.waker.take()
1650 }
1651
1652 fn detach(&mut self, _task: &TaskHandle) -> bool {
1653 false
1654 }
1655
1656 fn take(&mut self) -> Box<dyn Any> {
1657 Box::new(std::mem::take(&mut self.inner.lock().results))
1658 }
1659
1660 #[cfg(test)]
1661 fn is_empty(&self) -> bool {
1662 false
1663 }
1664}
1665
1666#[cfg(test)]
1667mod tests {
1668 use super::super::super::task::CancelableJoinHandle;
1672 use super::*;
1673 use crate::{
1674 EHandle, LocalExecutor, SendExecutorBuilder, SpawnableFuture, Task, TestExecutor, Timer,
1675 yield_now,
1676 };
1677 use fuchsia_sync::{Condvar, Mutex};
1678 use futures::channel::mpsc;
1679 use futures::{FutureExt, StreamExt};
1680 use std::future::pending;
1681 use std::pin::{Pin, pin};
1682 use std::sync::Arc;
1683 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1684 use std::task::{Context, Poll};
1685 use std::time::Duration;
1686
    /// Test helper: a future that stays pending until `resolve` is called on
    /// it. All state sits behind a mutex.
    #[derive(Default)]
    struct RemoteControlFuture(Mutex<RCFState>);
    /// Mutable state of a `RemoteControlFuture`.
    #[derive(Default)]
    struct RCFState {
        /// Set by `resolve`; once `true`, polling returns `Poll::Ready(())`.
        resolved: bool,
        /// Waker recorded by the most recent pending poll; taken by `resolve`.
        waker: Option<Waker>,
    }
1694
1695 impl Future for &RemoteControlFuture {
1696 type Output = ();
1697 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1698 let mut this = self.0.lock();
1699 if this.resolved {
1700 Poll::Ready(())
1701 } else {
1702 this.waker.replace(cx.waker().clone());
1703 Poll::Pending
1704 }
1705 }
1706 }
1707
1708 impl RemoteControlFuture {
1709 fn new() -> Arc<Self> {
1710 Arc::new(Default::default())
1711 }
1712
1713 fn resolve(&self) {
1714 let mut this = self.0.lock();
1715 this.resolved = true;
1716 if let Some(waker) = this.waker.take() {
1717 waker.wake();
1718 }
1719 }
1720
1721 fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> + use<> {
1722 let this = Arc::clone(self);
1723 #[allow(clippy::redundant_async_block)] async move {
1725 (&*this).await
1726 }
1727 }
1728 }
1729
1730 #[test]
1731 fn compute_works_on_root_scope() {
1732 let mut executor = TestExecutor::new();
1733 let scope = executor.global_scope();
1734 let mut task = pin!(scope.compute(async { 1 }));
1735 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1736 }
1737
1738 #[test]
1739 fn compute_works_on_new_child() {
1740 let mut executor = TestExecutor::new();
1741 let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1742 let mut task = pin!(scope.compute(async { 1 }));
1743 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1744 }
1745
1746 #[test]
1747 fn scope_drop_cancels_tasks() {
1748 let mut executor = TestExecutor::new();
1749 let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1750 let mut task = pin!(scope.compute(async { 1 }));
1751 drop(scope);
1752 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1753 }
1754
1755 #[test]
1756 fn tasks_do_not_spawn_on_cancelled_scopes() {
1757 let mut executor = TestExecutor::new();
1758 let scope =
1759 executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1760 let handle = scope.to_handle();
1761 let mut cancel = pin!(scope.cancel());
1762 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1763 let mut task = pin!(handle.compute(async { 1 }));
1764 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1765 }
1766
1767 #[test]
1768 fn tasks_do_not_spawn_on_closed_empty_scopes() {
1769 let mut executor = TestExecutor::new();
1770 let scope =
1771 executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1772 let handle = scope.to_handle();
1773 let mut close = pin!(scope.cancel());
1774 assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1775 let mut task = pin!(handle.compute(async { 1 }));
1776 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1777 }
1778
1779 #[test]
1780 fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1781 let mut executor = TestExecutor::new();
1782 let scope = executor.global_scope().new_child();
1783 let handle = scope.to_handle();
1784 handle.spawn(pending());
1785 let mut close = pin!(scope.close());
1786 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1787 let mut task = pin!(handle.compute(async { 1 }));
1788 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1789 }
1790
1791 #[test]
1792 fn spawn_works_on_child_and_grandchild() {
1793 let mut executor = TestExecutor::new();
1794 let scope = executor.global_scope().new_child();
1795 let child = scope.new_child();
1796 let grandchild = child.new_child();
1797 let mut child_task = pin!(child.compute(async { 1 }));
1798 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1799 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1800 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1801 }
1802
1803 #[test]
1804 fn spawn_drop_cancels_child_and_grandchild_tasks() {
1805 let mut executor = TestExecutor::new();
1806 let scope = executor.global_scope().new_child();
1807 let child = scope.new_child();
1808 let grandchild = child.new_child();
1809 let mut child_task = pin!(child.compute(async { 1 }));
1810 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1811 drop(scope);
1812 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1813 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1814 }
1815
1816 #[test]
1817 fn completed_tasks_are_cleaned_up_after_cancel() {
1818 let mut executor = TestExecutor::new();
1819 let scope = executor.global_scope().new_child();
1820
1821 let task1 = scope.spawn(pending::<()>());
1822 let task2 = scope.spawn(async {});
1823 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1824 assert_eq!(scope.lock().all_tasks().len(), 1);
1825
1826 assert_eq!(task1.abort().now_or_never(), None);
1829 assert_eq!(task2.abort().now_or_never(), Some(Some(())));
1830
1831 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1832 assert_eq!(scope.lock().all_tasks().len(), 0);
1833 assert!(scope.lock().results.is_empty());
1834 }
1835
1836 #[test]
1837 fn join_emtpy_scope() {
1838 let mut executor = TestExecutor::new();
1839 let scope = executor.global_scope().new_child();
1840 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1841 }
1842
1843 #[test]
1844 fn task_handle_preserves_access_to_result_after_join_begins() {
1845 let mut executor = TestExecutor::new();
1846 let scope = executor.global_scope().new_child();
1847 let mut task = scope.compute(async { 1 });
1848 scope.spawn(async {});
1849 let task2 = scope.spawn(pending::<()>());
1850 let mut join = pin!(scope.join().fuse());
1853 let _ = executor.run_until_stalled(&mut join);
1854 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1855 drop(task2.abort());
1856 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1857 }
1858
1859 #[test]
1860 fn join_blocks_until_task_is_cancelled() {
1861 let mut executor = TestExecutor::new();
1864 let scope = executor.global_scope().new_child();
1865 let outstanding_task = scope.spawn(pending::<()>());
1866 let cancelled_task = scope.spawn(pending::<()>());
1867 assert_eq!(
1868 executor.run_until_stalled(&mut pin!(cancelled_task.abort())),
1869 Poll::Ready(None)
1870 );
1871 let mut join = pin!(scope.join());
1872 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1873 assert_eq!(
1874 executor.run_until_stalled(&mut pin!(outstanding_task.abort())),
1875 Poll::Ready(None)
1876 );
1877 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1878 }
1879
1880 #[test]
1881 fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1882 let mut executor = TestExecutor::new();
1883 let scope = executor.global_scope().new_child();
1884 scope.spawn(pending::<()>());
1886 let mut join = pin!(scope.join());
1887 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1888 let mut cancel = pin!(join.cancel());
1889 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1890 }
1891
1892 #[test]
1893 fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1894 let mut executor = TestExecutor::new();
1895 let scope = executor.global_scope().new_child();
1896 scope.spawn(pending::<()>());
1898 let mut close = pin!(scope.close());
1899 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1900 let mut cancel = pin!(close.cancel());
1901 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1902 }
1903
1904 #[test]
1905 fn join_scope_blocks_until_spawned_task_completes() {
1906 let mut executor = TestExecutor::new();
1907 let scope = executor.global_scope().new_child();
1908 let remote = RemoteControlFuture::new();
1909 let mut task = scope.spawn(remote.as_future());
1910 let mut scope_join = pin!(scope.join());
1911 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1912 remote.resolve();
1913 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1914 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1915 }
1916
1917 #[test]
1918 fn close_scope_blocks_until_spawned_task_completes() {
1919 let mut executor = TestExecutor::new();
1920 let scope = executor.global_scope().new_child();
1921 let remote = RemoteControlFuture::new();
1922 let mut task = scope.spawn(remote.as_future());
1923 let mut scope_close = pin!(scope.close());
1924 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1925 remote.resolve();
1926 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1927 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1928 }
1929
1930 #[test]
1931 fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1932 let mut executor = TestExecutor::new();
1933 let scope = executor.global_scope().new_child();
1934 let child = scope.new_child();
1935 let remote = RemoteControlFuture::new();
1936 child.spawn(remote.as_future());
1937 let mut scope_join = pin!(scope.join());
1938 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1939 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1940 child.detach();
1941 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1942 remote.resolve();
1943 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1944 }
1945
1946 #[test]
1947 fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1948 let mut executor = TestExecutor::new();
1949 let scope = executor.global_scope().new_child();
1950 let remote = RemoteControlFuture::new();
1951 {
1952 let remote = remote.clone();
1953 scope.spawn(async move {
1954 let child = Scope::new_with_name("child");
1955 child.spawn(async move {
1956 Scope::current().spawn(remote.as_future());
1957 });
1958 child.detach();
1959 });
1960 }
1961 let mut scope_join = pin!(scope.join());
1962 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1963 remote.resolve();
1964 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1965 }
1966
1967 #[test]
1968 fn join_scope_blocks_when_blocked_child_is_detached() {
1969 let mut executor = TestExecutor::new();
1970 let scope = executor.global_scope().new_child();
1971 let child = scope.new_child();
1972 child.spawn(pending());
1973 let mut scope_join = pin!(scope.join());
1974 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1975 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1976 child.detach();
1977 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1978 }
1979
1980 #[test]
1981 fn join_scope_completes_when_blocked_child_is_cancelled() {
1982 let mut executor = TestExecutor::new();
1983 let scope = executor.global_scope().new_child();
1984 let child = scope.new_child();
1985 child.spawn(pending());
1986 let mut scope_join = pin!(scope.join());
1987 {
1988 let mut child_join = pin!(child.join());
1989 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1990 assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1991 }
1992 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1993 }
1994
1995 #[test]
1996 fn detached_scope_can_spawn() {
1997 let mut executor = TestExecutor::new();
1998 let scope = executor.global_scope().new_child();
1999 let handle = scope.to_handle();
2000 scope.detach();
2001 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
2002 }
2003
2004 #[test]
2005 fn dropped_scope_cannot_spawn() {
2006 let mut executor = TestExecutor::new();
2007 let scope = executor.global_scope().new_child();
2008 let handle = scope.to_handle();
2009 drop(scope);
2010 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
2011 }
2012
2013 #[test]
2014 fn dropped_scope_with_running_task_cannot_spawn() {
2015 let mut executor = TestExecutor::new();
2016 let scope = executor.global_scope().new_child();
2017 let handle = scope.to_handle();
2018 let _running_task = handle.spawn(pending::<()>());
2019 drop(scope);
2020 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
2021 }
2022
2023 #[test]
2024 fn joined_scope_cannot_spawn() {
2025 let mut executor = TestExecutor::new();
2026 let scope = executor.global_scope().new_child();
2027 let handle = scope.to_handle();
2028 let mut scope_join = pin!(scope.join());
2029 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
2030 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
2031 }
2032
2033 #[test]
2034 fn joining_scope_with_running_task_can_spawn() {
2035 let mut executor = TestExecutor::new();
2036 let scope = executor.global_scope().new_child();
2037 let handle = scope.to_handle();
2038 let _running_task = handle.spawn(pending::<()>());
2039 let mut scope_join = pin!(scope.join());
2040 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
2041 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
2042 }
2043
2044 #[test]
2045 fn joined_scope_child_cannot_spawn() {
2046 let mut executor = TestExecutor::new();
2047 let scope = executor.global_scope().new_child();
2048 let handle = scope.to_handle();
2049 let child_before_join = scope.new_child();
2050 assert_eq!(
2051 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
2052 Poll::Ready(1)
2053 );
2054 let mut scope_join = pin!(scope.join());
2055 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
2056 let child_after_join = handle.new_child();
2057 let grandchild_after_join = child_before_join.new_child();
2058 assert_eq!(
2059 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
2060 Poll::Pending
2061 );
2062 assert_eq!(
2063 executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
2064 Poll::Pending
2065 );
2066 assert_eq!(
2067 executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
2068 Poll::Pending
2069 );
2070 }
2071
2072 #[test]
2073 fn closed_scope_child_cannot_spawn() {
2074 let mut executor = TestExecutor::new();
2075 let scope = executor.global_scope().new_child();
2076 let handle = scope.to_handle();
2077 let child_before_close = scope.new_child();
2078 assert_eq!(
2079 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2080 Poll::Ready(1)
2081 );
2082 let mut scope_close = pin!(scope.close());
2083 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
2084 let child_after_close = handle.new_child();
2085 let grandchild_after_close = child_before_close.new_child();
2086 assert_eq!(
2087 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2088 Poll::Pending
2089 );
2090 assert_eq!(
2091 executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
2092 Poll::Pending
2093 );
2094 assert_eq!(
2095 executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
2096 Poll::Pending
2097 );
2098 }
2099
2100 #[test]
2101 fn can_join_child_first() {
2102 let mut executor = TestExecutor::new();
2103 let scope = executor.global_scope().new_child();
2104 let child = scope.new_child();
2105 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2106 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2107 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2108 }
2109
2110 #[test]
2111 fn can_join_parent_first() {
2112 let mut executor = TestExecutor::new();
2113 let scope = executor.global_scope().new_child();
2114 let child = scope.new_child();
2115 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2116 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2117 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2118 }
2119
2120 #[test]
2121 fn task_in_parent_scope_can_join_child() {
2122 let mut executor = TestExecutor::new();
2123 let scope = executor.global_scope().new_child();
2124 let child = scope.new_child();
2125 let remote = RemoteControlFuture::new();
2126 child.spawn(remote.as_future());
2127 scope.spawn(async move { child.join().await });
2128 let mut join = pin!(scope.join());
2129 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2130 remote.resolve();
2131 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2132 }
2133
2134 #[test]
2135 fn join_completes_while_completed_task_handle_is_held() {
2136 let mut executor = TestExecutor::new();
2137 let scope = executor.global_scope().new_child();
2138 let mut task = scope.compute(async { 1 });
2139 scope.spawn(async {});
2140 let mut join = pin!(scope.join());
2141 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2142 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
2143 }
2144
2145 #[test]
2146 fn cancel_completes_while_task_holds_handle() {
2147 let mut executor = TestExecutor::new();
2148 let scope = executor.global_scope().new_child();
2149 let handle = scope.to_handle();
2150 let mut task = scope.compute(async move {
2151 loop {
2152 pending::<()>().await; handle.spawn(async {});
2154 }
2155 });
2156
2157 let mut join = pin!(scope.join());
2159 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2160
2161 let mut cancel = pin!(join.cancel());
2162 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
2163 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
2164 }
2165
2166 #[test]
2167 fn cancel_from_handle_inside_task() {
2168 let mut executor = TestExecutor::new();
2169 let scope = executor.global_scope().new_child();
2170 {
2171 scope.spawn(pending::<()>());
2173
2174 let mut no_tasks = pin!(scope.on_no_tasks());
2175 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
2176
2177 let handle = scope.to_handle();
2178 scope.spawn(async move {
2179 handle.cancel().await;
2180 panic!("cancel() should never complete");
2181 });
2182
2183 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
2184 }
2185 assert_eq!(scope.join().now_or_never(), Some(()));
2186 }
2187
2188 #[test]
2189 fn can_spawn_from_non_executor_thread() {
2190 let mut executor = TestExecutor::new();
2191 let scope = executor.global_scope().clone();
2192 let done = Arc::new(AtomicBool::new(false));
2193 let done_clone = done.clone();
2194 let _ = std::thread::spawn(move || {
2195 scope.spawn(async move {
2196 done_clone.store(true, Ordering::Relaxed);
2197 })
2198 })
2199 .join();
2200 let _ = executor.run_until_stalled(&mut pending::<()>());
2201 assert!(done.load(Ordering::Relaxed));
2202 }
2203
    /// Exercises joins across a small scope tree: `a` is the parent of `b`,
    /// and `b` is the parent of both `c` and `d`. Scopes `a`, `c` and `d`
    /// each hold one remotely controlled task.
    #[test]
    fn scope_tree() {
        let mut executor = TestExecutor::new();
        let a = executor.global_scope().new_child();
        let b = a.new_child();
        let c = b.new_child();
        let d = b.new_child();
        let a_remote = RemoteControlFuture::new();
        let c_remote = RemoteControlFuture::new();
        let d_remote = RemoteControlFuture::new();
        a.spawn(a_remote.as_future());
        c.spawn(c_remote.as_future());
        d.spawn(d_remote.as_future());
        let mut a_join = pin!(a.join());
        let mut b_join = pin!(b.join());
        let mut d_join = pin!(d.join());
        // Nothing has resolved yet, so every join is pending.
        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
        // Resolving d's task finishes only d's join.
        d_remote.resolve();
        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
        // Resolving c's task lets b's join finish; a still has its own task.
        c_remote.resolve();
        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
        // Resolving a's task finishes a's join.
        a_remote.resolve();
        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
        // Joining c only now still completes immediately.
        let mut c_join = pin!(c.join());
        assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
    }
2240
    /// Checks that `wake_all_with_active_guard` causes every task in the scope
    /// to be polled again on a multi-threaded executor, including when a
    /// cancel is requested right after the wake.
    #[test]
    fn wake_all_with_active_guard_on_send_executor() {
        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
        let scope = executor.root_scope().new_child();

        let (tx, mut rx) = mpsc::unbounded();
        // Packed counter: the low 32 bits count polls so far; the high 32 bits
        // hold the poll count the test is currently waiting for (0 if none).
        let state = Arc::new(AtomicU64::new(0));

        // A future that never completes; each poll bumps the shared counter.
        struct PollCounter(Arc<AtomicU64>, mpsc::UnboundedSender<()>);

        impl Future for PollCounter {
            type Output = ();
            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                let old = self.0.fetch_add(1, Ordering::Relaxed);
                // Notify the waiter when this poll reaches the awaited count.
                if old >> 32 == (old + 1) & u32::MAX as u64 {
                    let _ = self.1.unbounded_send(());
                }
                Poll::Pending
            }
        }

        scope.spawn(PollCounter(state.clone(), tx.clone()));
        scope.spawn(PollCounter(state.clone(), tx.clone()));

        executor.run(async move {
            // Publishes the target count in the high bits, waits for a
            // notification unless the count was already reached, then clears
            // the target again.
            let mut wait_for_poll_count = async |count| {
                let old = state.fetch_or(count << 32, Ordering::Relaxed);
                if old & u32::MAX as u64 != count {
                    rx.next().await.unwrap();
                }
                state.fetch_and(u32::MAX as u64, Ordering::Relaxed);
            };

            // Both tasks get their initial poll.
            wait_for_poll_count(2).await;

            let mut start_count = 2;
            for _ in 0..2 {
                scope.wake_all_with_active_guard();

                // Each wake should add one more poll per task.
                wait_for_poll_count(start_count + 2).await;
                start_count += 2;
            }

            // Wake once more and request cancellation immediately afterwards;
            // both tasks are still expected to be polled again.
            scope.wake_all_with_active_guard();
            let done = scope.cancel();

            wait_for_poll_count(start_count + 2).await;

            done.await;
        });
    }
2295
    /// Stress test: repeatedly awaits `on_no_tasks` while a short task
    /// finishes at a random moment, on a two-thread executor. Passing means
    /// every iteration's `on_no_tasks().await` returned.
    #[test]
    fn on_no_tasks_race() {
        // Blocks the current thread for a random 0..10 microseconds to vary
        // the interleaving between the task and the waiter.
        fn sleep_random() {
            std::thread::sleep(std::time::Duration::from_micros(rand::random_range(0..10)));
        }
        for _ in 0..2000 {
            let mut executor = SendExecutorBuilder::new().num_threads(2).build();
            let scope = executor.root_scope().new_child();
            scope.spawn(async {
                sleep_random();
            });
            executor.run(async move {
                sleep_random();
                scope.on_no_tasks().await;
            });
        }
    }
2313
2314 #[test]
2315 fn test_detach() {
2316 let mut e = LocalExecutor::default();
2317 e.run_singlethreaded(async {
2318 let counter = Arc::new(AtomicU32::new(0));
2319
2320 {
2321 let counter = counter.clone();
2322 Task::spawn(async move {
2323 for _ in 0..5 {
2324 yield_now().await;
2325 counter.fetch_add(1, Ordering::Relaxed);
2326 }
2327 })
2328 .detach();
2329 }
2330
2331 while counter.load(Ordering::Relaxed) != 5 {
2332 yield_now().await;
2333 }
2334 });
2335
2336 assert!(e.ehandle.root_scope.lock().results.is_empty());
2337 }
2338
    /// Covers three ways a `Task` ends: dropped while pending, aborted while
    /// pending, and aborted after finishing. `ref_count`'s strong count
    /// returning to 1 shows the task's future (which owns a clone) is gone.
    #[test]
    fn test_cancel() {
        let mut e = LocalExecutor::default();
        e.run_singlethreaded(async {
            let ref_count = Arc::new(());
            {
                // Case 1: drop the `Task` of a never-finishing future.
                let ref_count = ref_count.clone();
                drop(Task::spawn(async move {
                    let _ref_count = ref_count;
                    let _: () = std::future::pending().await;
                }));
            }

            // Wait until the dropped task has released its clone.
            while Arc::strong_count(&ref_count) != 1 {
                yield_now().await;
            }

            let task = {
                // Case 2: abort a never-finishing task explicitly.
                let ref_count = ref_count.clone();
                Task::spawn(async move {
                    let _ref_count = ref_count;
                    let _: () = std::future::pending().await;
                })
            };

            // The task never produced a value, so abort yields `None`.
            assert_eq!(task.abort().await, None);
            while Arc::strong_count(&ref_count) != 1 {
                yield_now().await;
            }

            let task = {
                // Case 3: a task that finishes on its own.
                let ref_count = ref_count.clone();
                Task::spawn(async move {
                    let _ref_count = ref_count;
                })
            };

            // Wait for the task to finish and release its clone.
            while Arc::strong_count(&ref_count) != 1 {
                yield_now().await;
            }

            // Aborting after completion still returns the task's result.
            assert_eq!(task.abort().await, Some(()));
        });

        assert!(e.ehandle.root_scope.lock().results.is_empty());
    }
2389
    /// Aborting a task that is in the middle of a poll on another thread
    /// waits for that poll; here the poll completes, so abort returns the
    /// task's value. The shared integer is a handshake: 0 = initial,
    /// 1 = task is running, 2 = main future saw it running, 3 = task finished.
    #[test]
    fn test_cancel_waits() {
        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
        let state = Arc::new((Mutex::new(0), Condvar::new()));
        let task = {
            let state = state.clone();
            executor.root_scope().compute(async move {
                // Announce that the task is running, then block (inside the
                // poll) until the main future moves the state off 1.
                *state.0.lock() = 1;
                state.1.notify_all();
                state.1.wait_while(&mut state.0.lock(), |state| *state == 1);
                // Give the main future time to call `abort` before finishing.
                std::thread::sleep(std::time::Duration::from_millis(10));
                *state.0.lock() = 3;
                "foo"
            })
        };
        executor.run(async move {
            // Wait until the task reports 1, then flip the state to 2.
            state.1.wait_while(&mut state.0.lock(), |state| {
                if *state == 1 {
                    *state = 2;
                    false
                } else {
                    true
                }
            });
            state.1.notify_all();
            assert_eq!(task.abort().await, Some("foo"));
            // The task's poll must have run to its end before abort returned.
            assert_eq!(*state.0.lock(), 3);
        });
    }
2422
    /// Shared driver for the clean-up tests. Starts a task that blocks inside
    /// its poll, hands the `Task` to `callback` while it is blocked, then
    /// unblocks it and waits for the global scope to shed the task and any
    /// stored result. Finally asserts that the task ran to its last statement.
    fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
        // `running`: set to true once the task has started, false at its end.
        let running = Arc::new((Mutex::new(false), Condvar::new()));
        // `can_quit`: set by the driver to let the blocked task proceed.
        let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
        let task = {
            let running = running.clone();
            let can_quit = can_quit.clone();
            executor.root_scope().compute(async move {
                *running.0.lock() = true;
                running.1.notify_all();
                {
                    // Block (inside the poll) until the driver allows exit.
                    let mut guard = can_quit.0.lock();
                    while !*guard {
                        can_quit.1.wait(&mut guard);
                    }
                }
                *running.0.lock() = false;
            })
        };
        executor.run(async move {
            {
                // Wait for the task to report that it is running.
                let mut guard = running.0.lock();
                while !*guard {
                    running.1.wait(&mut guard);
                }
            }

            // The task is now mid-poll; let the caller dispose of the handle.
            callback(task);

            *can_quit.0.lock() = true;
            can_quit.1.notify_all();

            let ehandle = EHandle::local();
            let scope = ehandle.global_scope();

            // Poll until at most one task is left in the global scope
            // (NOTE(review): presumably this main future itself — confirm)
            // and no results are stored.
            while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
                Timer::new(std::time::Duration::from_millis(1)).await;
            }

            assert!(!*running.0.lock());
        });
    }
2466
2467 #[test]
2468 fn test_dropped_cancel_cleans_up() {
2469 test_clean_up(|task| {
2470 let abort_fut = std::pin::pin!(task.abort());
2471 let waker = std::task::Waker::noop();
2472 assert!(abort_fut.poll(&mut Context::from_waker(waker)).is_pending());
2473 });
2474 }
2475
2476 #[test]
2477 fn test_dropped_task_cleans_up() {
2478 test_clean_up(|task| {
2479 std::mem::drop(task);
2480 });
2481 }
2482
2483 #[test]
2484 fn test_detach_cleans_up() {
2485 test_clean_up(|task| {
2486 task.detach();
2487 });
2488 }
2489
2490 #[test]
2491 fn test_scope_stream() {
2492 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2493 executor.run(async move {
2494 let (stream, handle) = ScopeStream::new();
2495 handle.push(async { 1 });
2496 handle.push(async { 2 });
2497 stream.close();
2498 let results: HashSet<_> = stream.collect().await;
2499 assert_eq!(results, HashSet::from_iter([1, 2]));
2500 });
2501 }
2502
2503 #[test]
2504 fn test_scope_stream_wakes_properly() {
2505 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2506 executor.run(async move {
2507 let (stream, handle) = ScopeStream::new();
2508 handle.push(async {
2509 Timer::new(Duration::from_millis(10)).await;
2510 1
2511 });
2512 handle.push(async {
2513 Timer::new(Duration::from_millis(10)).await;
2514 2
2515 });
2516 stream.close();
2517 let results: HashSet<_> = stream.collect().await;
2518 assert_eq!(results, HashSet::from_iter([1, 2]));
2519 });
2520 }
2521
2522 #[test]
2523 fn test_scope_stream_drops_spawned_tasks() {
2524 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2525 executor.run(async move {
2526 let (stream, handle) = ScopeStream::new();
2527 handle.push(async { 1 });
2528 let _task = stream.compute(async { "foo" });
2529 stream.close();
2530 let results: HashSet<_> = stream.collect().await;
2531 assert_eq!(results, HashSet::from_iter([1]));
2532 });
2533 }
2534
2535 #[test]
2536 fn test_nested_scope_stream() {
2537 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2538 executor.run(async move {
2539 let (mut stream, handle) = ScopeStream::new();
2540 handle.clone().push(async move {
2541 handle.clone().push(async move {
2542 handle.clone().push(async move { 3 });
2543 2
2544 });
2545 1
2546 });
2547 let mut results = HashSet::default();
2548 while let Some(item) = stream.next().await {
2549 results.insert(item);
2550 if results.len() == 3 {
2551 stream.close();
2552 }
2553 }
2554 assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2555 });
2556 }
2557
2558 #[test]
2559 fn test_dropping_scope_stream_cancels_all_tasks() {
2560 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2561 executor.run(async move {
2562 let (stream, handle) = ScopeStream::new();
2563 let (tx1, mut rx) = mpsc::unbounded::<()>();
2564 let tx2 = tx1.clone();
2565 handle.push(async move {
2566 let _tx1 = tx1;
2567 let () = pending().await;
2568 });
2569 handle.push(async move {
2570 let _tx2 = tx2;
2571 let () = pending().await;
2572 });
2573 drop(stream);
2574
2575 assert_eq!(rx.next().await, None);
2577 });
2578 }
2579
2580 #[test]
2581 fn test_scope_stream_collect() {
2582 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2583 executor.run(async move {
2584 let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
2585 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2586
2587 let stream: ScopeStream<_> =
2588 (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
2589 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2590 });
2591 }
2592
2593 struct DropSignal(Arc<AtomicBool>);
2594
2595 impl Drop for DropSignal {
2596 fn drop(&mut self) {
2597 self.0.store(true, Ordering::SeqCst);
2598 }
2599 }
2600
2601 struct DropChecker(Arc<AtomicBool>);
2602
2603 impl DropChecker {
2604 fn new() -> (Self, DropSignal) {
2605 let inner = Arc::new(AtomicBool::new(false));
2606 (Self(inner.clone()), DropSignal(inner))
2607 }
2608
2609 fn is_dropped(&self) -> bool {
2610 self.0.load(Ordering::SeqCst)
2611 }
2612 }
2613
2614 #[test]
2615 fn child_finished_when_parent_pending() {
2616 let mut executor = LocalExecutor::default();
2617 executor.run_singlethreaded(async {
2618 let scope = Scope::new();
2619 let _guard = scope.active_guard().expect("acquire guard");
2620 let cancel = scope.to_handle().cancel();
2621 let child = scope.new_child();
2622 let (checker, signal) = DropChecker::new();
2623 child.spawn(async move {
2624 let _signal = signal;
2625 futures::future::pending::<()>().await
2626 });
2627 assert!(checker.is_dropped());
2628 assert!(child.active_guard().is_none());
2629 cancel.await;
2630 })
2631 }
2632
2633 #[test]
2634 fn guarded_scopes_observe_closed() {
2635 let mut executor = LocalExecutor::default();
2636 executor.run_singlethreaded(async {
2637 let scope = Scope::new();
2638 let handle = scope.to_handle();
2639 let _guard = scope.active_guard().expect("acquire guard");
2640 handle.close();
2641 let (checker, signal) = DropChecker::new();
2642 handle.spawn(async move {
2643 let _signal = signal;
2644 futures::future::pending::<()>().await
2645 });
2646 assert!(checker.is_dropped());
2647 let (checker, signal) = DropChecker::new();
2648 let cancel = handle.clone().cancel();
2649 handle.spawn(async move {
2650 let _signal = signal;
2651 futures::future::pending::<()>().await
2652 });
2653 assert!(checker.is_dropped());
2654 scope.join().await;
2655 cancel.await;
2656 })
2657 }
2658
/// Checks that cancelling a parent scope stays pending while a guard on one of
/// its children is alive, and completes once that guard is dropped.
#[test]
fn child_guard_holds_parent_cancellation() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    let child_guard = child.active_guard().expect("acquire guard");
    parent.spawn(futures::future::pending());

    let mut cancelling = pin!(parent.cancel());

    // Guard still held: cancellation must not finish.
    let while_guarded = exec.run_until_stalled(&mut cancelling);
    assert_eq!(while_guarded, Poll::Pending);

    // Guard released: cancellation must finish.
    drop(child_guard);
    let after_release = exec.run_until_stalled(&mut cancelling);
    assert_eq!(after_release, Poll::Ready(()));
}
2671
/// Exercises `on_cancel()` on clones of one guard (taken from `child1`) that
/// are awaited from tasks in `child1` and in its sibling `child2`, and checks
/// that joining the parent completes after a cancel request is issued.
#[test]
fn active_guard_on_cancel() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child1 = parent.new_child();
    let child2 = parent.new_child();

    // One guard from `child1`, cloned once per task below.
    let guard = child1.active_guard().expect("acquire guard");
    let guard_in_own_scope = guard.clone();
    let guard_in_sibling_scope = guard.clone();
    child1.spawn(async move { guard_in_own_scope.on_cancel().await });
    child2.spawn(async move {
        guard_in_sibling_scope.on_cancel().await;
    });

    let parent_handle = parent.to_handle();
    let mut joining = pin!(parent.join());
    // No cancel request yet, so the join cannot finish.
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);

    // Request cancellation, discarding the returned future immediately.
    let cancel_request: Join<_> = parent_handle.cancel();
    drop(cancel_request);
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
}
2693
/// Checks that calling `abort()` on a pending cancel future finishes the scope
/// and drops its tasks even though a guard on a child scope is still held.
#[test]
fn abort_join() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    // Held for the whole test; never explicitly released.
    let _child_guard = child.active_guard().expect("acquire guard");

    let (first_task, first_signal) = DropChecker::new();
    parent.spawn(async move {
        let _first_signal = first_signal;
        futures::future::pending::<()>().await
    });
    let (second_task, second_signal) = DropChecker::new();
    parent.spawn(async move {
        let _second_signal = second_signal;
        futures::future::pending::<()>().await
    });

    // Plain cancellation stays pending and leaves both tasks alive.
    let mut cancelling = pin!(parent.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Pending);
    assert!(!first_task.is_dropped());
    assert!(!second_task.is_dropped());

    // Escalating to abort completes and drops both tasks.
    let mut aborting = cancelling.abort();
    assert_eq!(exec.run_until_stalled(&mut aborting), Poll::Ready(()));
    assert!(first_task.is_dropped());
    assert!(second_task.is_dropped());
}
2722
/// Checks that cancelling a guarded scope aborts an unguarded child right away
/// while the guarded scope's own task survives until the guard is released.
///
/// After the guard is dropped the cancellation must complete, and at that
/// point *both* tasks are expected to have been dropped.
#[test]
fn child_without_guard_aborts_immediately_on_cancel() {
    let mut executor = TestExecutor::new();
    let scope = executor.global_scope().new_child();
    let child = scope.new_child();
    // The guard is on the parent only; the child has none.
    let guard = scope.active_guard().expect("acquire guard");

    let (checker_scope, signal) = DropChecker::new();
    scope.spawn(async move {
        let _signal = signal;
        futures::future::pending::<()>().await
    });
    let (checker_child, signal) = DropChecker::new();
    child.spawn(async move {
        let _signal = signal;
        futures::future::pending::<()>().await
    });

    let mut join = pin!(scope.cancel());
    assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
    // Parent task is still alive; the child's task is already gone.
    assert!(!checker_scope.is_dropped());
    assert!(checker_child.is_dropped());

    drop(guard);
    assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
    // Once cancellation has completed, the parent's task must be gone as well.
    // Previously only the child was re-checked here, which proved nothing new.
    assert!(checker_scope.is_dropped());
    assert!(checker_child.is_dropped());
}
2750
/// Checks that awaiting a plain task handle after its scope has been dropped
/// never resolves.
#[test]
fn await_canceled_task_pends_forever() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let handle = scope.spawn(pending::<()>());

    let mut driver = pin!(async move {
        // Drop the scope first, then wait on the task that lived in it.
        drop(scope);
        handle.await;
    });

    let outcome = exec.run_until_stalled(&mut driver);
    assert_eq!(outcome, Poll::Pending);
}
2763
/// Checks that awaiting a `CancelableJoinHandle` after its scope has been
/// dropped resolves instead of pending; the awaited value is discarded.
#[test]
fn await_canceled_abortable_task_finishes_with_error() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let handle = CancelableJoinHandle::from(scope.spawn(pending::<()>()));

    let mut driver = pin!(async move {
        // Drop the scope first, then wait on the task that lived in it.
        drop(scope);
        let _ = handle.await;
    });

    let outcome = exec.run_until_stalled(&mut driver);
    assert_eq!(outcome, Poll::Ready(()));
}
2776
/// Checks that a future spawned on a closed scope is dropped during `spawn`
/// itself, by verifying that a value it captured has no other owner left.
#[test]
fn closed_scope_drops_task_immediately() {
    let exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    scope.clone().close();

    let witness = Arc::new(());
    let captured = Arc::clone(&witness);
    let _task = scope.spawn(async move {
        // Keep the clone alive for as long as the future exists.
        let _captured = captured;
        let () = std::future::pending().await;
    });

    // `Arc::into_inner` yields `Some` only when `witness` is the sole remaining
    // strong reference, i.e. the clone moved into the future has been dropped.
    let sole_owner = Arc::into_inner(witness);
    assert!(sole_owner.is_some());
}
2793}