fuchsia_async/runtime/fuchsia/
task.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::EHandle;
6use crate::scope::ScopeHandle;
7use futures::prelude::*;
8use std::future::poll_fn;
9use std::marker::PhantomData;
10use std::mem::ManuallyDrop;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14/// A handle to a future that is owned and polled by the executor.
15///
16/// Once a task is created, the executor will poll it until done, even if the task handle itself is
17/// not polled.
18///
19/// NOTE: When a JoinHandle is dropped, its future will be detached.
20///
21/// Polling (or attempting to extract the value from) a task after the executor is dropped may
22/// trigger a panic.
23#[derive(Debug)]
24// LINT.IfChange
25pub struct JoinHandle<T> {
26    scope: ScopeHandle,
27    task_id: usize,
28    phantom: PhantomData<T>,
29}
30// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
31
32impl<T> Unpin for JoinHandle<T> {}
33
34impl<T> JoinHandle<T> {
35    pub(crate) fn new(scope: ScopeHandle, task_id: usize) -> Self {
36        Self { scope, task_id, phantom: PhantomData }
37    }
38
39    /// Aborts a task and returns a future that resolves once the task is
40    /// aborted. The future can be ignored in which case the task will still be
41    /// aborted.
42    pub fn abort(mut self) -> impl Future<Output = Option<T>> {
43        // SAFETY: We spawned the task so the return type should be correct.
44        let result = unsafe { self.scope.abort_task(self.task_id) };
45        // TODO(https://fxbug.dev/452064816): The compiler throws a false
46        // positive linter warning because it thinks that `self.task_id = 0;` is
47        // never read, even though it is read in the Drop implementation below.
48        #[allow(unused_assignments)]
49        async move {
50            match result {
51                Some(output) => Some(output),
52                None => {
53                    // If we are dropped from here, we'll end up calling `abort_and_detach`.
54                    let result = std::future::poll_fn(|cx| {
55                        // SAFETY: We spawned the task so the return type should be correct.
56                        unsafe { self.scope.poll_aborted(self.task_id, cx) }
57                    })
58                    .await;
59                    self.task_id = 0;
60                    result
61                }
62            }
63        }
64    }
65}
66
67impl<T> Drop for JoinHandle<T> {
68    fn drop(&mut self) {
69        if self.task_id != 0 {
70            self.scope.detach(self.task_id);
71        }
72    }
73}
74
75impl<T: 'static> Future for JoinHandle<T> {
76    type Output = T;
77    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78        // SAFETY: We spawned the task so the return type should be correct.
79        let result = unsafe { self.scope.poll_join_result(self.task_id, cx) };
80        if result.is_ready() {
81            self.task_id = 0;
82        }
83        result
84    }
85}
86
87/// This is the same as a JoinHandle, except that the future will be aborted when the task is
88/// dropped.
89#[must_use]
90#[repr(transparent)]
91#[derive(Debug)]
92// LINT.IfChange
93pub struct Task<T>(JoinHandle<T>);
94// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
95
96impl<T> Task<T> {
97    /// Returns a `JoinHandle` which will have detach-on-drop semantics.
98    pub fn detach_on_drop(self) -> JoinHandle<T> {
99        let this = ManuallyDrop::new(self);
100        // SAFETY: We are bypassing our drop implementation.
101        unsafe { std::ptr::read(&this.0) }
102    }
103}
104
105impl Task<()> {
106    /// Detach this task so that it can run independently in the background.
107    ///
108    /// *Note*: This is usually not what you want. This API severs the control flow from the
109    /// caller. This can result in flaky tests and makes it impossible to return values
110    /// (including errors).
111    ///
112    /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
113    ///
114    /// You can also use other futures combinators such as:
115    ///
116    /// * [`futures::future::join`]
117    /// * [`futures::future::select`]
118    /// * [`futures::select`]
119    ///
120    /// or their error-aware variants
121    ///
122    /// * [`futures::future::try_join`]
123    /// * [`futures::future::try_select`]
124    ///
125    /// or their stream counterparts
126    ///
127    /// * [`futures::stream::StreamExt::for_each`]
128    /// * [`futures::stream::StreamExt::for_each_concurrent`]
129    /// * [`futures::stream::TryStreamExt::try_for_each`]
130    /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
131    ///
132    /// can meet your needs.
133    pub fn detach(mut self) {
134        self.0.scope.detach(self.0.task_id);
135        self.0.task_id = 0;
136    }
137}
138
139impl<T: Send + 'static> Task<T> {
140    /// Spawn a new task on the global scope of the current executor.
141    ///
142    /// The task may be executed on any thread(s) owned by the current executor.
143    /// See [`Task::local`] for an equivalent that ensures locality.
144    ///
145    /// The passed future will live until either (a) the future completes,
146    /// (b) the returned [`Task`] is dropped while the executor is running, or
147    /// (c) the executor is destroyed; whichever comes first.
148    ///
149    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
150    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
151    ///
152    /// # Panics
153    ///
154    /// May panic if not called in the context of an executor (e.g. within a
155    /// call to [`run`][crate::SendExecutor::run]).
156    pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
157        EHandle::local().global_scope().compute(future)
158    }
159}
160
161impl<T: 'static> Task<T> {
162    /// Spawn a new task on the global scope of the thread local executor.
163    ///
164    /// The passed future will live until either (a) the future completes,
165    /// (b) the returned [`Task`] is dropped while the executor is running, or
166    /// (c) the executor is destroyed; whichever comes first.
167    ///
168    /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
169    /// runtime panic. Use [`Task::spawn`] instead.
170    ///
171    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
172    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
173    ///
174    /// # Panics
175    ///
176    /// May panic if not called in the context of an executor (e.g. within a
177    /// call to [`run`][crate::SendExecutor::run]).
178    pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
179        EHandle::local().global_scope().compute_local(future)
180    }
181}
182
183impl<T: 'static> Task<T> {
184    /// Aborts a task and returns a future that resolves once the task is
185    /// aborted. The future can be ignored in which case the task will still be
186    /// aborted.
187    pub fn abort(self) -> impl Future<Output = Option<T>> {
188        self.detach_on_drop().abort()
189    }
190}
191
192impl<T: 'static> Future for Task<T> {
193    type Output = T;
194    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195        // SAFETY: We spawned the task so the return type should be correct.
196        let result = unsafe { self.0.scope.poll_join_result(self.0.task_id, cx) };
197        if result.is_ready() {
198            self.0.task_id = 0;
199        }
200        result
201    }
202}
203
204impl<T> Drop for Task<T> {
205    fn drop(&mut self) {
206        if self.0.task_id != 0 {
207            self.0.scope.abort_and_detach(self.0.task_id);
208            self.0.task_id = 0;
209        }
210    }
211}
212
213impl<T> From<JoinHandle<T>> for Task<T> {
214    fn from(value: JoinHandle<T>) -> Self {
215        Self(value)
216    }
217}
218
219/// Offload a blocking function call onto a different thread.
220///
221/// This function can be called from an asynchronous function without blocking
222/// it, returning a future that can be `.await`ed normally. The provided
223/// function should contain at least one blocking operation, such as:
224///
225/// - A synchronous syscall that does not yet have an async counterpart.
226/// - A compute operation which risks blocking the executor for an unacceptable
227///   amount of time.
228///
229/// If neither of these conditions are satisfied, just call the function normally,
230/// as synchronous functions themselves are allowed within an async context,
231/// as long as they are not blocking.
232///
233/// If you have an async function that may block, refactor the function such that
234/// the blocking operations are offloaded onto the function passed to [`unblock`].
235///
236/// NOTE:
237///
238/// - The input function should not interact with the executor. Attempting to do so
239///   can cause runtime errors. This includes spawning, creating new executors,
240///   passing futures between the input function and the calling context, and
241///   in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
242/// - Synchronous functions cannot be cancelled and may keep running after
243///   the returned future is dropped. As a result, resources held by the function
244///   should be assumed to be held until the returned future completes.
245/// - This function assumes panic=abort semantics, so if the input function panics,
246///   the process aborts. Behavior for panic=unwind is not defined.
247// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
248// spawning new threads if this proves to be a bottleneck.
249pub fn unblock<T: 'static + Send>(
250    f: impl 'static + Send + FnOnce() -> T,
251) -> impl 'static + Send + Future<Output = T> {
252    let (tx, rx) = futures::channel::oneshot::channel();
253    std::thread::spawn(move || {
254        let _ = tx.send(f());
255    });
256    rx.map(|r| r.unwrap())
257}
258
259/// Yields execution back to the runtime.
260pub async fn yield_now() {
261    let mut done = false;
262    poll_fn(|cx| {
263        if done {
264            Poll::Ready(())
265        } else {
266            done = true;
267            cx.waker().wake_by_ref();
268            Poll::Pending
269        }
270    })
271    .await;
272}
273
274#[cfg(test)]
275mod tests {
276    use super::super::executor::{LocalExecutor, SendExecutorBuilder};
277    use super::*;
278    use std::sync::{Arc, Mutex};
279
280    /// This struct holds a thread-safe mutable boolean and
281    /// sets its value to true when dropped.
282    #[derive(Clone)]
283    struct SetsBoolTrueOnDrop {
284        value: Arc<Mutex<bool>>,
285    }
286
287    impl SetsBoolTrueOnDrop {
288        fn new() -> (Self, Arc<Mutex<bool>>) {
289            let value = Arc::new(Mutex::new(false));
290            let sets_bool_true_on_drop = Self { value: value.clone() };
291            (sets_bool_true_on_drop, value)
292        }
293    }
294
295    impl Drop for SetsBoolTrueOnDrop {
296        fn drop(&mut self) {
297            let mut lock = self.value.lock().unwrap();
298            *lock = true;
299        }
300    }
301
302    #[test]
303    #[should_panic]
304    fn spawn_from_unblock_fails() {
305        // no executor in the off-thread, so spawning fails
306        SendExecutorBuilder::new().num_threads(2).build().run(async move {
307            unblock(|| {
308                #[allow(clippy::let_underscore_future)]
309                let _ = Task::spawn(async {});
310            })
311            .await;
312        });
313    }
314
315    #[test]
316    fn future_destroyed_before_await_returns() {
317        LocalExecutor::default().run_singlethreaded(async {
318            let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new();
319
320            // Move the switch into a different thread.
321            // Once we return from this await, that switch should have been dropped.
322            unblock(move || {
323                let lock = sets_bool_true_on_drop.value.lock().unwrap();
324                assert!(!*lock);
325            })
326            .await;
327
328            // Switch moved into the future should have been dropped at this point.
329            // The value of the boolean should now be true.
330            let lock = value.lock().unwrap();
331            assert!(*lock);
332        });
333    }
334}