fuchsia_async/runtime/fuchsia/
task.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::EHandle;
6use crate::runtime::fuchsia::executor::TaskHandle;
7use crate::runtime::fuchsia::scope::JoinError;
8use crate::scope::ScopeHandle;
9use futures::prelude::*;
10use std::future::poll_fn;
11use std::marker::PhantomData;
12use std::mem::ManuallyDrop;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// A handle to a future that is owned and polled by the executor.
17///
18/// Once a task is created, the executor will poll it until done, even if the task handle itself is
19/// not polled.
20///
21/// NOTE: When a JoinHandle is dropped, its future will be detached.
22///
23/// Polling (or attempting to extract the value from) a task after the executor is dropped may
24/// trigger a panic.
25#[derive(Debug)]
26// LINT.IfChange
27pub struct JoinHandle<T> {
28    scope: ScopeHandle,
29    task: Option<TaskHandle>,
30    phantom: PhantomData<T>,
31}
32// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
33
34impl<T> Unpin for JoinHandle<T> {}
35
36impl<T> JoinHandle<T> {
37    pub(crate) fn new(scope: ScopeHandle, task: TaskHandle) -> Self {
38        Self { scope, task: Some(task), phantom: PhantomData }
39    }
40
41    /// Aborts a task and returns a future that resolves once the task is
42    /// aborted. The future can be ignored in which case the task will still be
43    /// aborted.
44    pub fn abort(mut self) -> impl Future<Output = Option<T>> {
45        // SAFETY: We spawned the task so the return type should be correct.
46        let result = self.task.as_ref().and_then(|t| unsafe { self.scope.abort_task(t) });
47        // TODO(https://fxbug.dev/452064816): The compiler throws a false
48        // positive linter warning because it thinks that `self.task = None;` is
49        // never read, even though it is read in the Drop implementation below.
50        #[allow(unused_assignments)]
51        async move {
52            match result {
53                Some(output) => Some(output),
54                None => {
55                    // If we are dropped from here, we'll end up calling `abort_and_detach`.
56                    let result = std::future::poll_fn(|cx| {
57                        let Some(task) = self.task.as_ref() else {
58                            return Poll::Ready(None);
59                        };
60                        // SAFETY: We spawned the task so the return type should be correct.
61                        unsafe { self.scope.poll_aborted(task, cx) }
62                    })
63                    .await;
64                    self.task = None;
65                    result
66                }
67            }
68        }
69    }
70}
71
72impl<T> Drop for JoinHandle<T> {
73    fn drop(&mut self) {
74        if let Some(task) = &self.task {
75            self.scope.detach(task);
76        }
77    }
78}
79
80impl<T: 'static> Future for JoinHandle<T> {
81    type Output = T;
82    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83        // SAFETY: We spawned the task so the return type should be correct.
84        let result = unsafe { self.scope.poll_join_result(self.task.as_ref().unwrap(), cx) };
85        if result.is_ready() {
86            self.task = None;
87        }
88        result
89    }
90}
91
92/// A `JoinHandle` which returns `Err` when canceled instead of pending forever.
93#[derive(Debug)]
94pub struct CancelableJoinHandle<T> {
95    inner: JoinHandle<T>,
96}
97
98impl<T> CancelableJoinHandle<T> {
99    /// Aborts a task and returns a future that resolves once the task is
100    /// aborted. The future can be ignored in which case the task will still be
101    /// aborted.
102    pub fn abort(self) -> impl Future<Output = Option<T>> {
103        self.inner.abort()
104    }
105}
106
107impl<T> From<JoinHandle<T>> for CancelableJoinHandle<T> {
108    fn from(inner: JoinHandle<T>) -> Self {
109        Self { inner }
110    }
111}
112
113impl<T: 'static> Future for CancelableJoinHandle<T> {
114    type Output = Result<T, JoinError>;
115
116    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
117        // SAFETY: We spawned the task so the return type should be correct.
118        let result =
119            unsafe { self.inner.scope.try_poll_join_result(self.inner.task.as_ref().unwrap(), cx) };
120        if result.is_ready() {
121            self.inner.task = None;
122        }
123        result
124    }
125}
126
127/// This is the same as a JoinHandle, except that the future will be aborted when the task is
128/// dropped.
129#[must_use]
130#[repr(transparent)]
131#[derive(Debug)]
132// LINT.IfChange
133pub struct Task<T>(JoinHandle<T>);
134// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
135
136impl<T> Task<T> {
137    /// Returns a `JoinHandle` which will have detach-on-drop semantics.
138    pub fn detach_on_drop(self) -> JoinHandle<T> {
139        let this = ManuallyDrop::new(self);
140        // SAFETY: We are bypassing our drop implementation.
141        unsafe { std::ptr::read(&this.0) }
142    }
143}
144
145impl Task<()> {
146    /// Detach this task so that it can run independently in the background.
147    ///
148    /// *Note*: This is usually not what you want. This API severs the control flow from the
149    /// caller. This can result in flaky tests and makes it impossible to return values
150    /// (including errors).
151    ///
152    /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
153    ///
154    /// You can also use other futures combinators such as:
155    ///
156    /// * [`futures::future::join`]
157    /// * [`futures::future::select`]
158    /// * [`futures::select`]
159    ///
160    /// or their error-aware variants
161    ///
162    /// * [`futures::future::try_join`]
163    /// * [`futures::future::try_select`]
164    ///
165    /// or their stream counterparts
166    ///
167    /// * [`futures::stream::StreamExt::for_each`]
168    /// * [`futures::stream::StreamExt::for_each_concurrent`]
169    /// * [`futures::stream::TryStreamExt::try_for_each`]
170    /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
171    ///
172    /// can meet your needs.
173    pub fn detach(mut self) {
174        self.0.scope.detach(self.0.task.as_ref().unwrap());
175        self.0.task = None;
176    }
177}
178
179impl<T: Send + 'static> Task<T> {
180    /// Spawn a new task on the global scope of the current executor.
181    ///
182    /// The task may be executed on any thread(s) owned by the current executor.
183    /// See [`Task::local`] for an equivalent that ensures locality.
184    ///
185    /// The passed future will live until either (a) the future completes,
186    /// (b) the returned [`Task`] is dropped while the executor is running, or
187    /// (c) the executor is destroyed; whichever comes first.
188    ///
189    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
190    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
191    ///
192    /// # Panics
193    ///
194    /// May panic if not called in the context of an executor (e.g. within a
195    /// call to [`run`][crate::SendExecutor::run]).
196    pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
197        EHandle::local().global_scope().compute(future)
198    }
199}
200
201impl<T: 'static> Task<T> {
202    /// Spawn a new task on the global scope of the thread local executor.
203    ///
204    /// The passed future will live until either (a) the future completes,
205    /// (b) the returned [`Task`] is dropped while the executor is running, or
206    /// (c) the executor is destroyed; whichever comes first.
207    ///
208    /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
209    /// runtime panic. Use [`Task::spawn`] instead.
210    ///
211    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
212    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
213    ///
214    /// # Panics
215    ///
216    /// May panic if not called in the context of an executor (e.g. within a
217    /// call to [`run`][crate::SendExecutor::run]).
218    pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
219        EHandle::local().global_scope().compute_local(future)
220    }
221}
222
223impl<T: 'static> Task<T> {
224    /// Aborts a task and returns a future that resolves once the task is
225    /// aborted. The future can be ignored in which case the task will still be
226    /// aborted.
227    pub fn abort(self) -> impl Future<Output = Option<T>> {
228        self.detach_on_drop().abort()
229    }
230}
231
232impl<T: 'static> Future for Task<T> {
233    type Output = T;
234    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
235        // SAFETY: We spawned the task so the return type should be correct.
236        let result = unsafe { self.0.scope.poll_join_result(self.0.task.as_ref().unwrap(), cx) };
237        if result.is_ready() {
238            self.0.task = None;
239        }
240        result
241    }
242}
243
244impl<T> Drop for Task<T> {
245    fn drop(&mut self) {
246        if let Some(task) = &self.0.task {
247            self.0.scope.abort_and_detach(task);
248            self.0.task = None;
249        }
250    }
251}
252
253impl<T> From<JoinHandle<T>> for Task<T> {
254    fn from(value: JoinHandle<T>) -> Self {
255        Self(value)
256    }
257}
258
259/// Offload a blocking function call onto a different thread.
260///
261/// This function can be called from an asynchronous function without blocking
262/// it, returning a future that can be `.await`ed normally. The provided
263/// function should contain at least one blocking operation, such as:
264///
265/// - A synchronous syscall that does not yet have an async counterpart.
266/// - A compute operation which risks blocking the executor for an unacceptable
267///   amount of time.
268///
269/// If neither of these conditions are satisfied, just call the function normally,
270/// as synchronous functions themselves are allowed within an async context,
271/// as long as they are not blocking.
272///
273/// If you have an async function that may block, refactor the function such that
274/// the blocking operations are offloaded onto the function passed to [`unblock`].
275///
276/// NOTE:
277///
278/// - The input function should not interact with the executor. Attempting to do so
279///   can cause runtime errors. This includes spawning, creating new executors,
280///   passing futures between the input function and the calling context, and
281///   in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
282/// - Synchronous functions cannot be cancelled and may keep running after
283///   the returned future is dropped. As a result, resources held by the function
284///   should be assumed to be held until the returned future completes.
285/// - This function assumes panic=abort semantics, so if the input function panics,
286///   the process aborts. Behavior for panic=unwind is not defined.
287// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
288// spawning new threads if this proves to be a bottleneck.
289pub fn unblock<T: 'static + Send>(
290    f: impl 'static + Send + FnOnce() -> T,
291) -> impl 'static + Send + Future<Output = T> {
292    let (tx, rx) = futures::channel::oneshot::channel();
293    std::thread::spawn(move || {
294        let _ = tx.send(f());
295    });
296    rx.map(|r| r.unwrap())
297}
298
299/// Yields execution back to the runtime.
300pub async fn yield_now() {
301    let mut done = false;
302    poll_fn(|cx| {
303        if done {
304            Poll::Ready(())
305        } else {
306            done = true;
307            cx.waker().wake_by_ref();
308            Poll::Pending
309        }
310    })
311    .await;
312}
313
314#[cfg(test)]
315mod tests {
316    use super::super::executor::{LocalExecutor, SendExecutorBuilder};
317    use super::*;
318    use fuchsia_sync::Mutex;
319    use std::sync::Arc;
320
321    /// This struct holds a thread-safe mutable boolean and
322    /// sets its value to true when dropped.
323    #[derive(Clone)]
324    struct SetsBoolTrueOnDrop {
325        value: Arc<Mutex<bool>>,
326    }
327
328    impl SetsBoolTrueOnDrop {
329        fn new() -> (Self, Arc<Mutex<bool>>) {
330            let value = Arc::new(Mutex::new(false));
331            let sets_bool_true_on_drop = Self { value: value.clone() };
332            (sets_bool_true_on_drop, value)
333        }
334    }
335
336    impl Drop for SetsBoolTrueOnDrop {
337        fn drop(&mut self) {
338            let mut lock = self.value.lock();
339            *lock = true;
340        }
341    }
342
343    #[test]
344    #[should_panic]
345    fn spawn_from_unblock_fails() {
346        // no executor in the off-thread, so spawning fails
347        SendExecutorBuilder::new().num_threads(2).build().run(async move {
348            unblock(|| {
349                #[allow(clippy::let_underscore_future)]
350                let _ = Task::spawn(async {});
351            })
352            .await;
353        });
354    }
355
356    #[test]
357    fn future_destroyed_before_await_returns() {
358        LocalExecutor::default().run_singlethreaded(async {
359            let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new();
360
361            // Move the switch into a different thread.
362            // Once we return from this await, that switch should have been dropped.
363            unblock(move || {
364                let lock = sets_bool_true_on_drop.value.lock();
365                assert!(!*lock);
366            })
367            .await;
368
369            // Switch moved into the future should have been dropped at this point.
370            // The value of the boolean should now be true.
371            let lock = value.lock();
372            assert!(*lock);
373        });
374    }
375}