fuchsia_async/runtime/fuchsia/
task.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::scope::ScopeHandle;
6use crate::EHandle;
7use futures::prelude::*;
8use std::marker::PhantomData;
9use std::mem::ManuallyDrop;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13/// A handle to a future that is owned and polled by the executor.
14///
15/// Once a task is created, the executor will poll it until done, even if the task handle itself is
16/// not polled.
17///
18/// NOTE: When a JoinHandle is dropped, its future will be detached.
19///
20/// Polling (or attempting to extract the value from) a task after the executor is dropped may
21/// trigger a panic.
22#[derive(Debug)]
23// LINT.IfChange
24pub struct JoinHandle<T> {
25    scope: ScopeHandle,
26    task_id: usize,
27    phantom: PhantomData<T>,
28}
29// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
30
31impl<T> Unpin for JoinHandle<T> {}
32
33impl<T> JoinHandle<T> {
34    pub(crate) fn new(scope: ScopeHandle, task_id: usize) -> Self {
35        Self { scope, task_id, phantom: PhantomData }
36    }
37
38    /// Aborts a task and returns a future that resolves once the task is
39    /// aborted. The future can be ignored in which case the task will still be
40    /// aborted.
41    pub fn abort(mut self) -> impl Future<Output = Option<T>> {
42        // SAFETY: We spawned the task so the return type should be correct.
43        let result = unsafe { self.scope.abort_task(self.task_id) };
44        async move {
45            match result {
46                Some(output) => Some(output),
47                None => {
48                    // If we are dropped from here, we'll end up calling `abort_and_detach`.
49                    let result = std::future::poll_fn(|cx| {
50                        // SAFETY: We spawned the task so the return type should be correct.
51                        unsafe { self.scope.poll_aborted(self.task_id, cx) }
52                    })
53                    .await;
54                    self.task_id = 0;
55                    result
56                }
57            }
58        }
59    }
60}
61
62impl<T> Drop for JoinHandle<T> {
63    fn drop(&mut self) {
64        if self.task_id != 0 {
65            self.scope.detach(self.task_id);
66        }
67    }
68}
69
70impl<T: 'static> Future for JoinHandle<T> {
71    type Output = T;
72    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
73        // SAFETY: We spawned the task so the return type should be correct.
74        let result = unsafe { self.scope.poll_join_result(self.task_id, cx) };
75        if result.is_ready() {
76            self.task_id = 0;
77        }
78        result
79    }
80}
81
82/// This is the same as a JoinHandle, except that the future will be aborted when the task is
83/// dropped.
84#[must_use]
85#[repr(transparent)]
86#[derive(Debug)]
87// LINT.IfChange
88pub struct Task<T>(JoinHandle<T>);
89// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
90
91impl<T> Task<T> {
92    /// Returns a `JoinHandle` which will have detach-on-drop semantics.
93    pub fn detach_on_drop(self) -> JoinHandle<T> {
94        let this = ManuallyDrop::new(self);
95        // SAFETY: We are bypassing our drop implementation.
96        unsafe { std::ptr::read(&this.0) }
97    }
98}
99
100impl Task<()> {
101    /// Detach this task so that it can run independently in the background.
102    ///
103    /// *Note*: This is usually not what you want. This API severs the control flow from the
104    /// caller. This can result in flaky tests and makes it impossible to return values
105    /// (including errors).
106    ///
107    /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
108    ///
109    /// You can also use other futures combinators such as:
110    ///
111    /// * [`futures::future::join`]
112    /// * [`futures::future::select`]
113    /// * [`futures::select`]
114    ///
115    /// or their error-aware variants
116    ///
117    /// * [`futures::future::try_join`]
118    /// * [`futures::future::try_select`]
119    ///
120    /// or their stream counterparts
121    ///
122    /// * [`futures::stream::StreamExt::for_each`]
123    /// * [`futures::stream::StreamExt::for_each_concurrent`]
124    /// * [`futures::stream::TryStreamExt::try_for_each`]
125    /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
126    ///
127    /// can meet your needs.
128    pub fn detach(mut self) {
129        self.0.scope.detach(self.0.task_id);
130        self.0.task_id = 0;
131    }
132}
133
134impl<T: Send + 'static> Task<T> {
135    /// Spawn a new task on the global scope of the current executor.
136    ///
137    /// The task may be executed on any thread(s) owned by the current executor.
138    /// See [`Task::local`] for an equivalent that ensures locality.
139    ///
140    /// The passed future will live until either (a) the future completes,
141    /// (b) the returned [`Task`] is dropped while the executor is running, or
142    /// (c) the executor is destroyed; whichever comes first.
143    ///
144    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
145    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
146    ///
147    /// # Panics
148    ///
149    /// May panic if not called in the context of an executor (e.g. within a
150    /// call to [`run`][crate::SendExecutor::run]).
151    pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
152        EHandle::local().global_scope().compute(future)
153    }
154}
155
156impl<T: 'static> Task<T> {
157    /// Spawn a new task on the global scope of the thread local executor.
158    ///
159    /// The passed future will live until either (a) the future completes,
160    /// (b) the returned [`Task`] is dropped while the executor is running, or
161    /// (c) the executor is destroyed; whichever comes first.
162    ///
163    /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
164    /// runtime panic. Use [`Task::spawn`] instead.
165    ///
166    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
167    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
168    ///
169    /// # Panics
170    ///
171    /// May panic if not called in the context of an executor (e.g. within a
172    /// call to [`run`][crate::SendExecutor::run]).
173    pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
174        EHandle::local().global_scope().compute_local(future)
175    }
176}
177
178impl<T: 'static> Task<T> {
179    /// Aborts a task and returns a future that resolves once the task is
180    /// aborted. The future can be ignored in which case the task will still be
181    /// aborted.
182    pub fn abort(self) -> impl Future<Output = Option<T>> {
183        self.detach_on_drop().abort()
184    }
185}
186
187impl<T: 'static> Future for Task<T> {
188    type Output = T;
189    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
190        // SAFETY: We spawned the task so the return type should be correct.
191        let result = unsafe { self.0.scope.poll_join_result(self.0.task_id, cx) };
192        if result.is_ready() {
193            self.0.task_id = 0;
194        }
195        result
196    }
197}
198
199impl<T> Drop for Task<T> {
200    fn drop(&mut self) {
201        if self.0.task_id != 0 {
202            self.0.scope.abort_and_detach(self.0.task_id);
203            self.0.task_id = 0;
204        }
205    }
206}
207
208impl<T> From<JoinHandle<T>> for Task<T> {
209    fn from(value: JoinHandle<T>) -> Self {
210        Self(value)
211    }
212}
213
214/// Offload a blocking function call onto a different thread.
215///
216/// This function can be called from an asynchronous function without blocking
217/// it, returning a future that can be `.await`ed normally. The provided
218/// function should contain at least one blocking operation, such as:
219///
220/// - A synchronous syscall that does not yet have an async counterpart.
221/// - A compute operation which risks blocking the executor for an unacceptable
222///   amount of time.
223///
224/// If neither of these conditions are satisfied, just call the function normally,
225/// as synchronous functions themselves are allowed within an async context,
226/// as long as they are not blocking.
227///
228/// If you have an async function that may block, refactor the function such that
229/// the blocking operations are offloaded onto the function passed to [`unblock`].
230///
231/// NOTE:
232///
233/// - The input function should not interact with the executor. Attempting to do so
234///   can cause runtime errors. This includes spawning, creating new executors,
235///   passing futures between the input function and the calling context, and
236///   in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
237/// - Synchronous functions cannot be cancelled and may keep running after
238///   the returned future is dropped. As a result, resources held by the function
239///   should be assumed to be held until the returned future completes.
240/// - This function assumes panic=abort semantics, so if the input function panics,
241///   the process aborts. Behavior for panic=unwind is not defined.
242// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
243// spawning new threads if this proves to be a bottleneck.
244pub fn unblock<T: 'static + Send>(
245    f: impl 'static + Send + FnOnce() -> T,
246) -> impl 'static + Send + Future<Output = T> {
247    let (tx, rx) = futures::channel::oneshot::channel();
248    std::thread::spawn(move || {
249        let _ = tx.send(f());
250    });
251    rx.map(|r| r.unwrap())
252}
253
254#[cfg(test)]
255mod tests {
256    use super::super::executor::{LocalExecutor, SendExecutor};
257    use super::*;
258    use std::sync::{Arc, Mutex};
259
260    /// This struct holds a thread-safe mutable boolean and
261    /// sets its value to true when dropped.
262    #[derive(Clone)]
263    struct SetsBoolTrueOnDrop {
264        value: Arc<Mutex<bool>>,
265    }
266
267    impl SetsBoolTrueOnDrop {
268        fn new() -> (Self, Arc<Mutex<bool>>) {
269            let value = Arc::new(Mutex::new(false));
270            let sets_bool_true_on_drop = Self { value: value.clone() };
271            (sets_bool_true_on_drop, value)
272        }
273    }
274
275    impl Drop for SetsBoolTrueOnDrop {
276        fn drop(&mut self) {
277            let mut lock = self.value.lock().unwrap();
278            *lock = true;
279        }
280    }
281
282    #[test]
283    #[should_panic]
284    fn spawn_from_unblock_fails() {
285        // no executor in the off-thread, so spawning fails
286        SendExecutor::new(2).run(async move {
287            unblock(|| {
288                #[allow(clippy::let_underscore_future)]
289                let _ = Task::spawn(async {});
290            })
291            .await;
292        });
293    }
294
295    #[test]
296    fn future_destroyed_before_await_returns() {
297        LocalExecutor::new().run_singlethreaded(async {
298            let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new();
299
300            // Move the switch into a different thread.
301            // Once we return from this await, that switch should have been dropped.
302            unblock(move || {
303                let lock = sets_bool_true_on_drop.value.lock().unwrap();
304                assert!(!*lock);
305            })
306            .await;
307
308            // Switch moved into the future should have been dropped at this point.
309            // The value of the boolean should now be true.
310            let lock = value.lock().unwrap();
311            assert!(*lock);
312        });
313    }
314}