fuchsia_async/runtime/fuchsia/
task.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::scope::ScopeHandle;
6use crate::EHandle;
7use futures::prelude::*;
8use std::marker::PhantomData;
9use std::mem::ManuallyDrop;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13/// A handle to a future that is owned and polled by the executor.
14///
15/// Once a task is created, the executor will poll it until done, even if the task handle itself is
16/// not polled.
17///
18/// NOTE: When a JoinHandle is dropped, its future will be detached.
19///
20/// Polling (or attempting to extract the value from) a task after the executor is dropped may
21/// trigger a panic.
22#[derive(Debug)]
23// LINT.IfChange
24pub struct JoinHandle<T> {
25    scope: ScopeHandle,
26    task_id: usize,
27    phantom: PhantomData<T>,
28}
29// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
30
31impl<T> Unpin for JoinHandle<T> {}
32
33impl<T> JoinHandle<T> {
34    pub(crate) fn new(scope: ScopeHandle, task_id: usize) -> Self {
35        Self { scope, task_id, phantom: PhantomData }
36    }
37
38    /// Cancel a task and returns a future that resolves once the cancellation is complete.  The
39    /// future can be ignored in which case the task will still be cancelled.
40    pub fn cancel(mut self) -> impl Future<Output = Option<T>> {
41        // SAFETY: We spawned the task so the return type should be correct.
42        let result = unsafe { self.scope.cancel_task(self.task_id) };
43        async move {
44            match result {
45                Some(output) => Some(output),
46                None => {
47                    // If we are dropped from here, we'll end up calling `cancel_and_detach`.
48                    let result = std::future::poll_fn(|cx| {
49                        // SAFETY: We spawned the task so the return type should be correct.
50                        unsafe { self.scope.poll_cancelled(self.task_id, cx) }
51                    })
52                    .await;
53                    self.task_id = 0;
54                    result
55                }
56            }
57        }
58    }
59}
60
61impl<T> Drop for JoinHandle<T> {
62    fn drop(&mut self) {
63        if self.task_id != 0 {
64            self.scope.detach(self.task_id);
65        }
66    }
67}
68
69impl<T: 'static> Future for JoinHandle<T> {
70    type Output = T;
71    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72        // SAFETY: We spawned the task so the return type should be correct.
73        let result = unsafe { self.scope.poll_join_result(self.task_id, cx) };
74        if result.is_ready() {
75            self.task_id = 0;
76        }
77        result
78    }
79}
80
81/// This is the same as a JoinHandle, except that the future will be cancelled when the task is
82/// dropped.
83#[must_use]
84#[repr(transparent)]
85#[derive(Debug)]
86// LINT.IfChange
87pub struct Task<T>(JoinHandle<T>);
88// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
89
90impl<T> Task<T> {
91    /// Returns a `JoinHandle` which will have detach-on-drop semantics.
92    pub fn detach_on_drop(self) -> JoinHandle<T> {
93        let this = ManuallyDrop::new(self);
94        // SAFETY: We are bypassing our drop implementation.
95        unsafe { std::ptr::read(&this.0) }
96    }
97}
98
99impl Task<()> {
100    /// Detach this task so that it can run independently in the background.
101    ///
102    /// *Note*: This is usually not what you want. This API severs the control flow from the
103    /// caller. This can result in flaky tests and makes it impossible to return values
104    /// (including errors).
105    ///
106    /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
107    ///
108    /// You can also use other futures combinators such as:
109    ///
110    /// * [`futures::future::join`]
111    /// * [`futures::future::select`]
112    /// * [`futures::select`]
113    ///
114    /// or their error-aware variants
115    ///
116    /// * [`futures::future::try_join`]
117    /// * [`futures::future::try_select`]
118    ///
119    /// or their stream counterparts
120    ///
121    /// * [`futures::stream::StreamExt::for_each`]
122    /// * [`futures::stream::StreamExt::for_each_concurrent`]
123    /// * [`futures::stream::TryStreamExt::try_for_each`]
124    /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
125    ///
126    /// can meet your needs.
127    pub fn detach(mut self) {
128        self.0.scope.detach(self.0.task_id);
129        self.0.task_id = 0;
130    }
131}
132
133impl<T: Send + 'static> Task<T> {
134    /// Spawn a new task on the global scope of the current executor.
135    ///
136    /// The task may be executed on any thread(s) owned by the current executor.
137    /// See [`Task::local`] for an equivalent that ensures locality.
138    ///
139    /// The passed future will live until either (a) the future completes,
140    /// (b) the returned [`Task`] is dropped while the executor is running, or
141    /// (c) the executor is destroyed; whichever comes first.
142    ///
143    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
144    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
145    ///
146    /// # Panics
147    ///
148    /// May panic if not called in the context of an executor (e.g. within a
149    /// call to [`run`][crate::SendExecutor::run]).
150    pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
151        EHandle::local().global_scope().compute(future)
152    }
153}
154
155impl<T: 'static> Task<T> {
156    /// Spawn a new task on the global scope of the thread local executor.
157    ///
158    /// The passed future will live until either (a) the future completes,
159    /// (b) the returned [`Task`] is dropped while the executor is running, or
160    /// (c) the executor is destroyed; whichever comes first.
161    ///
162    /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
163    /// runtime panic. Use [`Task::spawn`] instead.
164    ///
165    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
166    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
167    ///
168    /// # Panics
169    ///
170    /// May panic if not called in the context of an executor (e.g. within a
171    /// call to [`run`][crate::SendExecutor::run]).
172    pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
173        EHandle::local().global_scope().compute_local(future)
174    }
175}
176
177impl<T: 'static> Task<T> {
178    /// Cancel a task and returns a future that resolves once the cancellation is complete.  The
179    /// future can be ignored in which case the task will still be cancelled.
180    pub fn cancel(self) -> impl Future<Output = Option<T>> {
181        self.detach_on_drop().cancel()
182    }
183}
184
185impl<T: 'static> Future for Task<T> {
186    type Output = T;
187    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188        // SAFETY: We spawned the task so the return type should be correct.
189        let result = unsafe { self.0.scope.poll_join_result(self.0.task_id, cx) };
190        if result.is_ready() {
191            self.0.task_id = 0;
192        }
193        result
194    }
195}
196
197impl<T> Drop for Task<T> {
198    fn drop(&mut self) {
199        if self.0.task_id != 0 {
200            self.0.scope.cancel_and_detach(self.0.task_id);
201            self.0.task_id = 0;
202        }
203    }
204}
205
206impl<T> From<JoinHandle<T>> for Task<T> {
207    fn from(value: JoinHandle<T>) -> Self {
208        Self(value)
209    }
210}
211
212/// Offload a blocking function call onto a different thread.
213///
214/// This function can be called from an asynchronous function without blocking
215/// it, returning a future that can be `.await`ed normally. The provided
216/// function should contain at least one blocking operation, such as:
217///
218/// - A synchronous syscall that does not yet have an async counterpart.
219/// - A compute operation which risks blocking the executor for an unacceptable
220///   amount of time.
221///
222/// If neither of these conditions are satisfied, just call the function normally,
223/// as synchronous functions themselves are allowed within an async context,
224/// as long as they are not blocking.
225///
226/// If you have an async function that may block, refactor the function such that
227/// the blocking operations are offloaded onto the function passed to [`unblock`].
228///
229/// NOTE:
230///
231/// - The input function should not interact with the executor. Attempting to do so
232///   can cause runtime errors. This includes spawning, creating new executors,
233///   passing futures between the input function and the calling context, and
234///   in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
235/// - Synchronous functions cannot be cancelled and may keep running after
236///   the returned future is dropped. As a result, resources held by the function
237///   should be assumed to be held until the returned future completes.
238/// - This function assumes panic=abort semantics, so if the input function panics,
239///   the process aborts. Behavior for panic=unwind is not defined.
240// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
241// spawning new threads if this proves to be a bottleneck.
242pub fn unblock<T: 'static + Send>(
243    f: impl 'static + Send + FnOnce() -> T,
244) -> impl 'static + Send + Future<Output = T> {
245    let (tx, rx) = futures::channel::oneshot::channel();
246    std::thread::spawn(move || {
247        let _ = tx.send(f());
248    });
249    rx.map(|r| r.unwrap())
250}
251
#[cfg(test)]
mod tests {
    use super::super::executor::{LocalExecutor, SendExecutor};
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Test helper that owns a shared, thread-safe boolean flag and flips it to `true` when it
    /// is dropped.
    #[derive(Clone)]
    struct SetsBoolTrueOnDrop {
        value: Arc<Mutex<bool>>,
    }

    impl SetsBoolTrueOnDrop {
        /// Returns a helper whose flag starts out `false`, plus a second handle to that flag.
        fn new() -> (Self, Arc<Mutex<bool>>) {
            let flag = Arc::new(Mutex::new(false));
            let helper = Self { value: Arc::clone(&flag) };
            (helper, flag)
        }
    }

    impl Drop for SetsBoolTrueOnDrop {
        fn drop(&mut self) {
            *self.value.lock().unwrap() = true;
        }
    }

    #[test]
    #[should_panic]
    fn spawn_from_unblock_fails() {
        // The thread created by `unblock` has no executor, so spawning from it fails.
        let mut executor = SendExecutor::new(2);
        executor.run(async move {
            let offloaded = unblock(|| {
                drop(Task::spawn(async {}));
            });
            offloaded.await;
        });
    }

    #[test]
    fn future_destroyed_before_await_returns() {
        let mut executor = LocalExecutor::new();
        executor.run_singlethreaded(async {
            let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new();

            // Hand the helper over to the worker thread. By the time the await below returns,
            // the helper must have been dropped.
            let offloaded = unblock(move || {
                let flag = sets_bool_true_on_drop.value.lock().unwrap();
                assert!(!*flag);
            });
            offloaded.await;

            // The helper that was moved into the closure is gone now, so the flag must be set.
            let flag = value.lock().unwrap();
            assert!(*flag);
        });
    }
}
311}