vfs/
execution_scope.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Values of this type represent "execution scopes" used by the library to give fine grained
6//! control of the lifetimes of the tasks associated with particular connections.  When a new
7//! connection is attached to a pseudo directory tree, an execution scope is provided.  This scope
8//! is then used to start any tasks related to this connection.  All connections opened as a result
9//! of operations on this first connection will also use the same scope, as well as any tasks
10//! related to those connections.
11//!
12//! This way, it is possible to control the lifetime of a group of connections.  All connections
13//! and their tasks can be shutdown by calling `shutdown` method on the scope that is hosting them.
14//! Scope will also shutdown all the tasks when it goes out of scope.
15//!
16//! Implementation wise, execution scope is just a proxy, that forwards all the tasks to an actual
17//! executor, provided as an instance of a [`futures::task::Spawn`] trait.
18
19use crate::token_registry::TokenRegistry;
20
21use fuchsia_async::{JoinHandle, Scope, ScopeHandle, SpawnableFuture};
22use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
23use futures::Future;
24use futures::task::{self, Poll};
25use std::future::poll_fn;
26use std::pin::Pin;
27use std::sync::{Arc, Weak};
28use std::task::Context;
29
30#[cfg(target_os = "fuchsia")]
31use fuchsia_async::EHandle;
32
33pub use fuchsia_async::scope::ScopeActiveGuard as ActiveGuard;
34
35pub type SpawnError = task::SpawnError;
36
37/// An execution scope that is hosting tasks for a group of connections.  See the module level
38/// documentation for details.
39///
40/// Actual execution will be delegated to an "upstream" executor - something that implements
41/// [`futures::task::Spawn`].  In a sense, this is somewhat of an analog of a multithreaded capable
42/// [`futures::stream::FuturesUnordered`], but this some additional functionality specific to the
43/// vfs library.
44///
45/// Use [`ExecutionScope::new()`] or [`ExecutionScope::build()`] to construct new
46/// `ExecutionScope`es.
47#[derive(Clone)]
48pub struct ExecutionScope {
49    executor: Arc<Executor>,
50}
51
52struct Executor {
53    token_registry: TokenRegistry,
54    scope: Mutex<Option<Scope>>,
55}
56
57impl ExecutionScope {
58    /// Constructs an execution scope.  Use [`ExecutionScope::build()`] if you want to specify
59    /// parameters.
60    pub fn new() -> Self {
61        Self::build().new()
62    }
63
64    /// Constructs a new execution scope builder, wrapping the specified executor and optionally
65    /// accepting additional parameters.  Run [`ExecutionScopeParams::new()`] to get an actual
66    /// [`ExecutionScope`] object.
67    pub fn build() -> ExecutionScopeParams {
68        ExecutionScopeParams::default()
69    }
70
71    pub fn as_weak(&self) -> WeakExecutionScope {
72        WeakExecutionScope { executor: Arc::downgrade(&self.executor) }
73    }
74
75    /// Sends a `task` to be executed in this execution scope.  This is very similar to
76    /// [`futures::task::Spawn::spawn_obj()`] with a minor difference that `self` reference is not
77    /// exclusive.
78    ///
79    /// If the task needs to prevent itself from being shutdown, then it should use the
80    /// `try_active_guard` function below.
81    ///
82    /// For the "vfs" library it is more convenient that this method allows non-exclusive
83    /// access.  And as the implementation is employing internal mutability there are no downsides.
84    /// This way `ExecutionScope` can actually also implement [`futures::task::Spawn`] - it just was
85    /// not necessary for now.
86    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) -> JoinHandle<()> {
87        self.executor.scope().spawn(task)
88    }
89
90    /// Returns a task that can be spawned later.  The task can also be polled before spawning.
91    pub fn new_task(self, task: impl Future<Output = ()> + Send + 'static) -> Task {
92        Task(self.executor, SpawnableFuture::new(task))
93    }
94
95    pub fn token_registry(&self) -> &TokenRegistry {
96        &self.executor.token_registry
97    }
98
99    pub fn shutdown(&self) {
100        self.executor.shutdown();
101    }
102
103    /// Forcibly shut down the executor without respecting the active guards.
104    pub fn force_shutdown(&self) {
105        let _ = self.executor.scope().clone().abort();
106    }
107
108    /// Restores the executor so that it is no longer in the shut-down state.  Any tasks
109    /// that are still running will continue to run after calling this.
110    pub fn resurrect(&self) {
111        // After setting the scope to None, a new scope will be created the next time `spawn` is
112        // called.
113        *self.executor.scope.lock() = None;
114    }
115
116    /// Wait for all tasks to complete and for there to be no guards.
117    pub async fn wait(&self) {
118        let scope = self.executor.scope().clone();
119        scope.on_no_tasks_and_guards().await;
120    }
121
122    /// Prevents the executor from shutting down whilst the guard is held. Returns None if the
123    /// executor is shutting down.
124    pub fn try_active_guard(&self) -> Option<ActiveGuard> {
125        self.executor.scope().active_guard()
126    }
127}
128
129impl PartialEq for ExecutionScope {
130    fn eq(&self, other: &Self) -> bool {
131        Arc::as_ptr(&self.executor) == Arc::as_ptr(&other.executor)
132    }
133}
134
135impl Eq for ExecutionScope {}
136
137impl std::fmt::Debug for ExecutionScope {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.write_fmt(format_args!("ExecutionScope {:?}", Arc::as_ptr(&self.executor)))
140    }
141}
142
143#[derive(Default)]
144pub struct ExecutionScopeParams {
145    #[cfg(target_os = "fuchsia")]
146    async_executor: Option<EHandle>,
147}
148
149impl ExecutionScopeParams {
150    #[cfg(target_os = "fuchsia")]
151    pub fn executor(mut self, value: EHandle) -> Self {
152        assert!(self.async_executor.is_none(), "`executor` is already set");
153        self.async_executor = Some(value);
154        self
155    }
156
157    pub fn new(self) -> ExecutionScope {
158        ExecutionScope {
159            executor: Arc::new(Executor {
160                token_registry: TokenRegistry::new(),
161                #[cfg(target_os = "fuchsia")]
162                scope: self.async_executor.map_or_else(
163                    || Mutex::new(None),
164                    |e| Mutex::new(Some(e.global_scope().new_child())),
165                ),
166                #[cfg(not(target_os = "fuchsia"))]
167                scope: Mutex::new(None),
168            }),
169        }
170    }
171}
172
173/// Holds a weak reference to the internal `ExecutionScope`, and can spawn futures on it as long as
174/// the reference is still valid.
175#[derive(Clone)]
176pub struct WeakExecutionScope {
177    executor: Weak<Executor>,
178}
179
180impl WeakExecutionScope {
181    /// Adds a task to the referenced [`ExecutionScope`]. The task is dropped if there are no more
182    /// strong references to the original task group.
183    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
184        let executor = self.executor.upgrade();
185        if let Some(executor) = executor {
186            _ = executor.scope().spawn(task)
187        }
188    }
189}
190
191impl Executor {
192    fn scope(&self) -> MappedMutexGuard<'_, Scope> {
193        // We lazily initialize the executor rather than at construction time as there are currently
194        // a few tests that create the ExecutionScope before the async executor has been initialized
195        // (which means we cannot call EHandle::local()).
196        MutexGuard::map(self.scope.lock(), |s| {
197            s.get_or_insert_with(|| {
198                #[cfg(target_os = "fuchsia")]
199                return Scope::global().new_child();
200                #[cfg(not(target_os = "fuchsia"))]
201                return Scope::new();
202            })
203        })
204    }
205
206    fn shutdown(&self) {
207        if let Some(scope) = &*self.scope.lock() {
208            scope.wake_all_with_active_guard();
209            let _ = ScopeHandle::clone(&*scope).cancel();
210        }
211    }
212}
213
214impl Drop for Executor {
215    fn drop(&mut self) {
216        self.shutdown();
217        // We must detach the scope, because otherwise all the tasks will be aborted and the active
218        // guards will be ignored.
219        if let Some(scope) = self.scope.get_mut().take() {
220            scope.detach();
221        }
222    }
223}
224
225/// Yields to the executor, providing an opportunity for other futures to run.
226pub async fn yield_to_executor() {
227    let mut done = false;
228    poll_fn(|cx| {
229        if done {
230            Poll::Ready(())
231        } else {
232            done = true;
233            cx.waker().wake_by_ref();
234            Poll::Pending
235        }
236    })
237    .await;
238}
239
240pub struct Task(Arc<Executor>, SpawnableFuture<'static, ()>);
241
242impl Task {
243    /// Spawns the task on the scope.
244    pub fn spawn(self) {
245        self.0.scope().spawn(self.1);
246    }
247}
248
249impl Future for Task {
250    type Output = ();
251
252    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253        Pin::new(&mut &mut self.1).poll(cx)
254    }
255}
256
257#[cfg(test)]
258mod tests {
259    use super::{ExecutionScope, yield_to_executor};
260
261    use fuchsia_async::{TestExecutor, Timer};
262    use futures::Future;
263    use futures::channel::oneshot;
264    use std::pin::pin;
265    use std::sync::Arc;
266    #[cfg(target_os = "fuchsia")]
267    use std::sync::atomic::{AtomicBool, Ordering};
268    #[cfg(target_os = "fuchsia")]
269    use std::task::Poll;
270    use std::time::Duration;
271
272    #[cfg(target_os = "fuchsia")]
273    fn run_test<GetTest, GetTestRes>(get_test: GetTest)
274    where
275        GetTest: FnOnce(ExecutionScope) -> GetTestRes,
276        GetTestRes: Future<Output = ()>,
277    {
278        let mut exec = TestExecutor::new();
279
280        let scope = ExecutionScope::new();
281
282        let test = get_test(scope);
283
284        assert_eq!(
285            exec.run_until_stalled(&mut pin!(test)),
286            Poll::Ready(()),
287            "Test did not complete"
288        );
289    }
290
291    #[cfg(not(target_os = "fuchsia"))]
292    fn run_test<GetTest, GetTestRes>(get_test: GetTest)
293    where
294        GetTest: FnOnce(ExecutionScope) -> GetTestRes,
295        GetTestRes: Future<Output = ()>,
296    {
297        use fuchsia_async::TimeoutExt;
298        let mut exec = TestExecutor::new();
299
300        let scope = ExecutionScope::new();
301
302        // This isn't a perfect equivalent to the target version, but Tokio
303        // doesn't have run_until_stalled and it sounds like it's
304        // architecturally impossible.
305        let test =
306            get_test(scope).on_stalled(Duration::from_secs(30), || panic!("Test did not complete"));
307
308        exec.run_singlethreaded(&mut pin!(test));
309    }
310
311    #[test]
312    fn simple() {
313        run_test(|scope| {
314            async move {
315                let (sender, receiver) = oneshot::channel();
316                let (counters, task) = mocks::ImmediateTask::new(sender);
317
318                scope.spawn(task);
319
320                // Make sure our task had a chance to execute.
321                receiver.await.unwrap();
322
323                assert_eq!(counters.drop_call(), 1);
324                assert_eq!(counters.poll_call(), 1);
325            }
326        });
327    }
328
329    #[test]
330    fn simple_drop() {
331        run_test(|scope| {
332            async move {
333                let (poll_sender, poll_receiver) = oneshot::channel();
334                let (processing_done_sender, processing_done_receiver) = oneshot::channel();
335                let (drop_sender, drop_receiver) = oneshot::channel();
336                let (counters, task) =
337                    mocks::ControlledTask::new(poll_sender, processing_done_receiver, drop_sender);
338
339                scope.spawn(task);
340
341                poll_receiver.await.unwrap();
342
343                processing_done_sender.send(()).unwrap();
344
345                scope.shutdown();
346
347                drop_receiver.await.unwrap();
348
349                // poll might be called one or two times depending on the order in which the
350                // executor decides to poll the two tasks (this one and the one we spawned).
351                let poll_count = counters.poll_call();
352                assert!(poll_count >= 1, "poll was not called");
353
354                assert_eq!(counters.drop_call(), 1);
355            }
356        });
357    }
358
359    #[test]
360    fn test_wait_waits_for_tasks_to_finish() {
361        let mut executor = TestExecutor::new();
362        let scope = ExecutionScope::new();
363        executor.run_singlethreaded(async {
364            let (poll_sender, poll_receiver) = oneshot::channel();
365            let (processing_done_sender, processing_done_receiver) = oneshot::channel();
366            let (drop_sender, _drop_receiver) = oneshot::channel();
367            let (_, task) =
368                mocks::ControlledTask::new(poll_sender, processing_done_receiver, drop_sender);
369
370            scope.spawn(task);
371
372            poll_receiver.await.unwrap();
373
374            // We test that wait is working correctly by concurrently waiting and telling the
375            // task to complete, and making sure that the order is correct.
376            let done = fuchsia_sync::Mutex::new(false);
377            futures::join!(
378                async {
379                    scope.wait().await;
380                    assert_eq!(*done.lock(), true);
381                },
382                async {
383                    // This is a Turing halting problem so the sleep is justified.
384                    Timer::new(Duration::from_millis(100)).await;
385                    *done.lock() = true;
386                    processing_done_sender.send(()).unwrap();
387                }
388            );
389        });
390    }
391
392    #[cfg(target_os = "fuchsia")]
393    #[fuchsia::test]
394    async fn test_shutdown_waits_for_channels() {
395        use fuchsia_async as fasync;
396
397        let scope = ExecutionScope::new();
398        let (rx, tx) = zx::Channel::create();
399        let received_msg = Arc::new(AtomicBool::new(false));
400        let (sender, receiver) = futures::channel::oneshot::channel();
401        {
402            let received_msg = received_msg.clone();
403            scope.spawn(async move {
404                let mut msg_buf = zx::MessageBuf::new();
405                msg_buf.ensure_capacity_bytes(64);
406                let _ = sender.send(());
407                let _ = fasync::Channel::from_channel(rx).recv_msg(&mut msg_buf).await;
408                received_msg.store(true, Ordering::Relaxed);
409            });
410        }
411        // Wait until the spawned future has been polled once.
412        let _ = receiver.await;
413
414        tx.write(b"hello", &mut []).expect("write failed");
415        scope.shutdown();
416        scope.wait().await;
417        assert!(received_msg.load(Ordering::Relaxed));
418    }
419
420    #[fuchsia::test]
421    async fn test_force_shutdown() {
422        let scope = ExecutionScope::new();
423        let scope_clone = scope.clone();
424        let ref_count = Arc::new(());
425        let ref_count_clone = ref_count.clone();
426
427        // Spawn a task that holds a reference.  When the task is dropped the reference will get
428        // dropped with it.
429        scope.spawn(async move {
430            let _ref_count_clone = ref_count_clone;
431
432            // Hold an active guard so that only a forced shutdown will work.
433            let _guard = scope_clone.try_active_guard().unwrap();
434
435            let _: () = std::future::pending().await;
436        });
437
438        scope.force_shutdown();
439        scope.wait().await;
440
441        // The task should have been dropped leaving us with the only reference.
442        assert_eq!(Arc::strong_count(&ref_count), 1);
443
444        // Test resurrection...
445        scope.resurrect();
446
447        let ref_count_clone = ref_count.clone();
448        scope.spawn(async move {
449            // Yield so that if the executor is in the shutdown state, it will kill this task.
450            yield_to_executor().await;
451
452            // Take another reference count so that we can check we got here below.
453            let _ref_count = ref_count_clone.clone();
454
455            let _: () = std::future::pending().await;
456        });
457
458        while Arc::strong_count(&ref_count) != 3 {
459            yield_to_executor().await;
460        }
461
462        // Yield some more just to be sure the task isn't killed.
463        for _ in 0..5 {
464            yield_to_executor().await;
465            assert_eq!(Arc::strong_count(&ref_count), 3);
466        }
467    }
468
469    mod mocks {
470        use futures::Future;
471        use futures::channel::oneshot;
472        use futures::task::{Context, Poll};
473        use std::pin::Pin;
474        use std::sync::Arc;
475        use std::sync::atomic::{AtomicUsize, Ordering};
476
477        pub(super) struct TaskCounters {
478            poll_call_count: Arc<AtomicUsize>,
479            drop_call_count: Arc<AtomicUsize>,
480        }
481
482        impl TaskCounters {
483            fn new() -> (Arc<AtomicUsize>, Arc<AtomicUsize>, Self) {
484                let poll_call_count = Arc::new(AtomicUsize::new(0));
485                let drop_call_count = Arc::new(AtomicUsize::new(0));
486
487                (
488                    poll_call_count.clone(),
489                    drop_call_count.clone(),
490                    Self { poll_call_count, drop_call_count },
491                )
492            }
493
494            pub(super) fn poll_call(&self) -> usize {
495                self.poll_call_count.load(Ordering::Relaxed)
496            }
497
498            pub(super) fn drop_call(&self) -> usize {
499                self.drop_call_count.load(Ordering::Relaxed)
500            }
501        }
502
503        pub(super) struct ImmediateTask {
504            poll_call_count: Arc<AtomicUsize>,
505            drop_call_count: Arc<AtomicUsize>,
506            done_sender: Option<oneshot::Sender<()>>,
507        }
508
509        impl ImmediateTask {
510            pub(super) fn new(done_sender: oneshot::Sender<()>) -> (TaskCounters, Self) {
511                let (poll_call_count, drop_call_count, counters) = TaskCounters::new();
512                (
513                    counters,
514                    Self { poll_call_count, drop_call_count, done_sender: Some(done_sender) },
515                )
516            }
517        }
518
519        impl Future for ImmediateTask {
520            type Output = ();
521
522            fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
523                self.poll_call_count.fetch_add(1, Ordering::Relaxed);
524
525                if let Some(sender) = self.done_sender.take() {
526                    sender.send(()).unwrap();
527                }
528
529                Poll::Ready(())
530            }
531        }
532
533        impl Drop for ImmediateTask {
534            fn drop(&mut self) {
535                self.drop_call_count.fetch_add(1, Ordering::Relaxed);
536            }
537        }
538
539        impl Unpin for ImmediateTask {}
540
541        pub(super) struct ControlledTask {
542            poll_call_count: Arc<AtomicUsize>,
543            drop_call_count: Arc<AtomicUsize>,
544
545            drop_sender: Option<oneshot::Sender<()>>,
546            future: Pin<Box<dyn Future<Output = ()> + Send>>,
547        }
548
549        impl ControlledTask {
550            pub(super) fn new(
551                poll_sender: oneshot::Sender<()>,
552                processing_complete: oneshot::Receiver<()>,
553                drop_sender: oneshot::Sender<()>,
554            ) -> (TaskCounters, Self) {
555                let (poll_call_count, drop_call_count, counters) = TaskCounters::new();
556                (
557                    counters,
558                    Self {
559                        poll_call_count,
560                        drop_call_count,
561                        drop_sender: Some(drop_sender),
562                        future: Box::pin(async move {
563                            poll_sender.send(()).unwrap();
564                            processing_complete.await.unwrap();
565                        }),
566                    },
567                )
568            }
569        }
570
571        impl Future for ControlledTask {
572            type Output = ();
573
574            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
575                self.poll_call_count.fetch_add(1, Ordering::Relaxed);
576                self.future.as_mut().poll(cx)
577            }
578        }
579
580        impl Drop for ControlledTask {
581            fn drop(&mut self) {
582                self.drop_call_count.fetch_add(1, Ordering::Relaxed);
583                self.drop_sender.take().unwrap().send(()).unwrap();
584            }
585        }
586    }
587}