1use crate::token_registry::TokenRegistry;
20
21use fuchsia_async::{JoinHandle, Scope, ScopeHandle, SpawnableFuture};
22use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
23use futures::Future;
24use futures::task::{self, Poll};
25use std::future::poll_fn;
26use std::pin::Pin;
27use std::sync::{Arc, Weak};
28use std::task::Context;
29
30#[cfg(target_os = "fuchsia")]
31use fuchsia_async::EHandle;
32
33pub use fuchsia_async::scope::ScopeActiveGuard as ActiveGuard;
34
35pub type SpawnError = task::SpawnError;
36
37#[derive(Clone)]
48pub struct ExecutionScope {
49 executor: Arc<Executor>,
50}
51
52struct Executor {
53 token_registry: TokenRegistry,
54 scope: Mutex<Option<Scope>>,
55}
56
57impl ExecutionScope {
58 pub fn new() -> Self {
61 Self::build().new()
62 }
63
64 pub fn build() -> ExecutionScopeParams {
68 ExecutionScopeParams::default()
69 }
70
71 pub fn as_weak(&self) -> WeakExecutionScope {
72 WeakExecutionScope { executor: Arc::downgrade(&self.executor) }
73 }
74
75 pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) -> JoinHandle<()> {
87 self.executor.scope().spawn(task)
88 }
89
90 pub fn new_task(self, task: impl Future<Output = ()> + Send + 'static) -> Task {
92 Task(self.executor, SpawnableFuture::new(task))
93 }
94
95 pub fn token_registry(&self) -> &TokenRegistry {
96 &self.executor.token_registry
97 }
98
99 pub fn shutdown(&self) {
100 self.executor.shutdown();
101 }
102
103 pub fn force_shutdown(&self) {
105 let _ = self.executor.scope().clone().abort();
106 }
107
108 pub fn resurrect(&self) {
111 *self.executor.scope.lock() = None;
114 }
115
116 pub async fn wait(&self) {
118 let scope = self.executor.scope().clone();
119 scope.on_no_tasks_and_guards().await;
120 }
121
122 pub fn try_active_guard(&self) -> Option<ActiveGuard> {
125 self.executor.scope().active_guard()
126 }
127}
128
129impl PartialEq for ExecutionScope {
130 fn eq(&self, other: &Self) -> bool {
131 Arc::as_ptr(&self.executor) == Arc::as_ptr(&other.executor)
132 }
133}
134
135impl Eq for ExecutionScope {}
136
137impl std::fmt::Debug for ExecutionScope {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.write_fmt(format_args!("ExecutionScope {:?}", Arc::as_ptr(&self.executor)))
140 }
141}
142
143#[derive(Default)]
144pub struct ExecutionScopeParams {
145 #[cfg(target_os = "fuchsia")]
146 async_executor: Option<EHandle>,
147}
148
149impl ExecutionScopeParams {
150 #[cfg(target_os = "fuchsia")]
151 pub fn executor(mut self, value: EHandle) -> Self {
152 assert!(self.async_executor.is_none(), "`executor` is already set");
153 self.async_executor = Some(value);
154 self
155 }
156
157 pub fn new(self) -> ExecutionScope {
158 ExecutionScope {
159 executor: Arc::new(Executor {
160 token_registry: TokenRegistry::new(),
161 #[cfg(target_os = "fuchsia")]
162 scope: self.async_executor.map_or_else(
163 || Mutex::new(None),
164 |e| Mutex::new(Some(e.global_scope().new_child())),
165 ),
166 #[cfg(not(target_os = "fuchsia"))]
167 scope: Mutex::new(None),
168 }),
169 }
170 }
171}
172
173#[derive(Clone)]
176pub struct WeakExecutionScope {
177 executor: Weak<Executor>,
178}
179
180impl WeakExecutionScope {
181 pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
184 let executor = self.executor.upgrade();
185 if let Some(executor) = executor {
186 _ = executor.scope().spawn(task)
187 }
188 }
189}
190
191impl Executor {
192 fn scope(&self) -> MappedMutexGuard<'_, Scope> {
193 MutexGuard::map(self.scope.lock(), |s| {
197 s.get_or_insert_with(|| {
198 #[cfg(target_os = "fuchsia")]
199 return Scope::global().new_child();
200 #[cfg(not(target_os = "fuchsia"))]
201 return Scope::new();
202 })
203 })
204 }
205
206 fn shutdown(&self) {
207 if let Some(scope) = &*self.scope.lock() {
208 scope.wake_all_with_active_guard();
209 let _ = ScopeHandle::clone(&*scope).cancel();
210 }
211 }
212}
213
214impl Drop for Executor {
215 fn drop(&mut self) {
216 self.shutdown();
217 if let Some(scope) = self.scope.get_mut().take() {
220 scope.detach();
221 }
222 }
223}
224
225pub async fn yield_to_executor() {
227 let mut done = false;
228 poll_fn(|cx| {
229 if done {
230 Poll::Ready(())
231 } else {
232 done = true;
233 cx.waker().wake_by_ref();
234 Poll::Pending
235 }
236 })
237 .await;
238}
239
240pub struct Task(Arc<Executor>, SpawnableFuture<'static, ()>);
241
242impl Task {
243 pub fn spawn(self) {
245 self.0.scope().spawn(self.1);
246 }
247}
248
249impl Future for Task {
250 type Output = ();
251
252 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253 Pin::new(&mut &mut self.1).poll(cx)
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use super::{ExecutionScope, yield_to_executor};
260
261 use fuchsia_async::{TestExecutor, Timer};
262 use futures::Future;
263 use futures::channel::oneshot;
264 use std::pin::pin;
265 use std::sync::Arc;
266 #[cfg(target_os = "fuchsia")]
267 use std::sync::atomic::{AtomicBool, Ordering};
268 #[cfg(target_os = "fuchsia")]
269 use std::task::Poll;
270 use std::time::Duration;
271
272 #[cfg(target_os = "fuchsia")]
273 fn run_test<GetTest, GetTestRes>(get_test: GetTest)
274 where
275 GetTest: FnOnce(ExecutionScope) -> GetTestRes,
276 GetTestRes: Future<Output = ()>,
277 {
278 let mut exec = TestExecutor::new();
279
280 let scope = ExecutionScope::new();
281
282 let test = get_test(scope);
283
284 assert_eq!(
285 exec.run_until_stalled(&mut pin!(test)),
286 Poll::Ready(()),
287 "Test did not complete"
288 );
289 }
290
291 #[cfg(not(target_os = "fuchsia"))]
292 fn run_test<GetTest, GetTestRes>(get_test: GetTest)
293 where
294 GetTest: FnOnce(ExecutionScope) -> GetTestRes,
295 GetTestRes: Future<Output = ()>,
296 {
297 use fuchsia_async::TimeoutExt;
298 let mut exec = TestExecutor::new();
299
300 let scope = ExecutionScope::new();
301
302 let test =
306 get_test(scope).on_stalled(Duration::from_secs(30), || panic!("Test did not complete"));
307
308 exec.run_singlethreaded(&mut pin!(test));
309 }
310
311 #[test]
312 fn simple() {
313 run_test(|scope| {
314 async move {
315 let (sender, receiver) = oneshot::channel();
316 let (counters, task) = mocks::ImmediateTask::new(sender);
317
318 scope.spawn(task);
319
320 receiver.await.unwrap();
322
323 assert_eq!(counters.drop_call(), 1);
324 assert_eq!(counters.poll_call(), 1);
325 }
326 });
327 }
328
329 #[test]
330 fn simple_drop() {
331 run_test(|scope| {
332 async move {
333 let (poll_sender, poll_receiver) = oneshot::channel();
334 let (processing_done_sender, processing_done_receiver) = oneshot::channel();
335 let (drop_sender, drop_receiver) = oneshot::channel();
336 let (counters, task) =
337 mocks::ControlledTask::new(poll_sender, processing_done_receiver, drop_sender);
338
339 scope.spawn(task);
340
341 poll_receiver.await.unwrap();
342
343 processing_done_sender.send(()).unwrap();
344
345 scope.shutdown();
346
347 drop_receiver.await.unwrap();
348
349 let poll_count = counters.poll_call();
352 assert!(poll_count >= 1, "poll was not called");
353
354 assert_eq!(counters.drop_call(), 1);
355 }
356 });
357 }
358
359 #[test]
360 fn test_wait_waits_for_tasks_to_finish() {
361 let mut executor = TestExecutor::new();
362 let scope = ExecutionScope::new();
363 executor.run_singlethreaded(async {
364 let (poll_sender, poll_receiver) = oneshot::channel();
365 let (processing_done_sender, processing_done_receiver) = oneshot::channel();
366 let (drop_sender, _drop_receiver) = oneshot::channel();
367 let (_, task) =
368 mocks::ControlledTask::new(poll_sender, processing_done_receiver, drop_sender);
369
370 scope.spawn(task);
371
372 poll_receiver.await.unwrap();
373
374 let done = fuchsia_sync::Mutex::new(false);
377 futures::join!(
378 async {
379 scope.wait().await;
380 assert_eq!(*done.lock(), true);
381 },
382 async {
383 Timer::new(Duration::from_millis(100)).await;
385 *done.lock() = true;
386 processing_done_sender.send(()).unwrap();
387 }
388 );
389 });
390 }
391
392 #[cfg(target_os = "fuchsia")]
393 #[fuchsia::test]
394 async fn test_shutdown_waits_for_channels() {
395 use fuchsia_async as fasync;
396
397 let scope = ExecutionScope::new();
398 let (rx, tx) = zx::Channel::create();
399 let received_msg = Arc::new(AtomicBool::new(false));
400 let (sender, receiver) = futures::channel::oneshot::channel();
401 {
402 let received_msg = received_msg.clone();
403 scope.spawn(async move {
404 let mut msg_buf = zx::MessageBuf::new();
405 msg_buf.ensure_capacity_bytes(64);
406 let _ = sender.send(());
407 let _ = fasync::Channel::from_channel(rx).recv_msg(&mut msg_buf).await;
408 received_msg.store(true, Ordering::Relaxed);
409 });
410 }
411 let _ = receiver.await;
413
414 tx.write(b"hello", &mut []).expect("write failed");
415 scope.shutdown();
416 scope.wait().await;
417 assert!(received_msg.load(Ordering::Relaxed));
418 }
419
420 #[fuchsia::test]
421 async fn test_force_shutdown() {
422 let scope = ExecutionScope::new();
423 let scope_clone = scope.clone();
424 let ref_count = Arc::new(());
425 let ref_count_clone = ref_count.clone();
426
427 scope.spawn(async move {
430 let _ref_count_clone = ref_count_clone;
431
432 let _guard = scope_clone.try_active_guard().unwrap();
434
435 let _: () = std::future::pending().await;
436 });
437
438 scope.force_shutdown();
439 scope.wait().await;
440
441 assert_eq!(Arc::strong_count(&ref_count), 1);
443
444 scope.resurrect();
446
447 let ref_count_clone = ref_count.clone();
448 scope.spawn(async move {
449 yield_to_executor().await;
451
452 let _ref_count = ref_count_clone.clone();
454
455 let _: () = std::future::pending().await;
456 });
457
458 while Arc::strong_count(&ref_count) != 3 {
459 yield_to_executor().await;
460 }
461
462 for _ in 0..5 {
464 yield_to_executor().await;
465 assert_eq!(Arc::strong_count(&ref_count), 3);
466 }
467 }
468
469 mod mocks {
470 use futures::Future;
471 use futures::channel::oneshot;
472 use futures::task::{Context, Poll};
473 use std::pin::Pin;
474 use std::sync::Arc;
475 use std::sync::atomic::{AtomicUsize, Ordering};
476
477 pub(super) struct TaskCounters {
478 poll_call_count: Arc<AtomicUsize>,
479 drop_call_count: Arc<AtomicUsize>,
480 }
481
482 impl TaskCounters {
483 fn new() -> (Arc<AtomicUsize>, Arc<AtomicUsize>, Self) {
484 let poll_call_count = Arc::new(AtomicUsize::new(0));
485 let drop_call_count = Arc::new(AtomicUsize::new(0));
486
487 (
488 poll_call_count.clone(),
489 drop_call_count.clone(),
490 Self { poll_call_count, drop_call_count },
491 )
492 }
493
494 pub(super) fn poll_call(&self) -> usize {
495 self.poll_call_count.load(Ordering::Relaxed)
496 }
497
498 pub(super) fn drop_call(&self) -> usize {
499 self.drop_call_count.load(Ordering::Relaxed)
500 }
501 }
502
503 pub(super) struct ImmediateTask {
504 poll_call_count: Arc<AtomicUsize>,
505 drop_call_count: Arc<AtomicUsize>,
506 done_sender: Option<oneshot::Sender<()>>,
507 }
508
509 impl ImmediateTask {
510 pub(super) fn new(done_sender: oneshot::Sender<()>) -> (TaskCounters, Self) {
511 let (poll_call_count, drop_call_count, counters) = TaskCounters::new();
512 (
513 counters,
514 Self { poll_call_count, drop_call_count, done_sender: Some(done_sender) },
515 )
516 }
517 }
518
519 impl Future for ImmediateTask {
520 type Output = ();
521
522 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
523 self.poll_call_count.fetch_add(1, Ordering::Relaxed);
524
525 if let Some(sender) = self.done_sender.take() {
526 sender.send(()).unwrap();
527 }
528
529 Poll::Ready(())
530 }
531 }
532
533 impl Drop for ImmediateTask {
534 fn drop(&mut self) {
535 self.drop_call_count.fetch_add(1, Ordering::Relaxed);
536 }
537 }
538
539 impl Unpin for ImmediateTask {}
540
541 pub(super) struct ControlledTask {
542 poll_call_count: Arc<AtomicUsize>,
543 drop_call_count: Arc<AtomicUsize>,
544
545 drop_sender: Option<oneshot::Sender<()>>,
546 future: Pin<Box<dyn Future<Output = ()> + Send>>,
547 }
548
549 impl ControlledTask {
550 pub(super) fn new(
551 poll_sender: oneshot::Sender<()>,
552 processing_complete: oneshot::Receiver<()>,
553 drop_sender: oneshot::Sender<()>,
554 ) -> (TaskCounters, Self) {
555 let (poll_call_count, drop_call_count, counters) = TaskCounters::new();
556 (
557 counters,
558 Self {
559 poll_call_count,
560 drop_call_count,
561 drop_sender: Some(drop_sender),
562 future: Box::pin(async move {
563 poll_sender.send(()).unwrap();
564 processing_complete.await.unwrap();
565 }),
566 },
567 )
568 }
569 }
570
571 impl Future for ControlledTask {
572 type Output = ();
573
574 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
575 self.poll_call_count.fetch_add(1, Ordering::Relaxed);
576 self.future.as_mut().poll(cx)
577 }
578 }
579
580 impl Drop for ControlledTask {
581 fn drop(&mut self) {
582 self.drop_call_count.fetch_add(1, Ordering::Relaxed);
583 self.drop_sender.take().unwrap().send(()).unwrap();
584 }
585 }
586 }
587}