fuchsia_async/runtime/fuchsia/
task.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::EHandle;
6use crate::runtime::fuchsia::scope::JoinError;
7use crate::scope::ScopeHandle;
8use futures::prelude::*;
9use std::future::poll_fn;
10use std::marker::PhantomData;
11use std::mem::ManuallyDrop;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15/// A handle to a future that is owned and polled by the executor.
16///
17/// Once a task is created, the executor will poll it until done, even if the task handle itself is
18/// not polled.
19///
20/// NOTE: When a JoinHandle is dropped, its future will be detached.
21///
22/// Polling (or attempting to extract the value from) a task after the executor is dropped may
23/// trigger a panic.
24#[derive(Debug)]
25// LINT.IfChange
26pub struct JoinHandle<T> {
27    scope: ScopeHandle,
28    task_id: usize,
29    phantom: PhantomData<T>,
30}
31// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
32
33impl<T> Unpin for JoinHandle<T> {}
34
35impl<T> JoinHandle<T> {
36    pub(crate) fn new(scope: ScopeHandle, task_id: usize) -> Self {
37        Self { scope, task_id, phantom: PhantomData }
38    }
39
40    /// Aborts a task and returns a future that resolves once the task is
41    /// aborted. The future can be ignored in which case the task will still be
42    /// aborted.
43    pub fn abort(mut self) -> impl Future<Output = Option<T>> {
44        // SAFETY: We spawned the task so the return type should be correct.
45        let result = unsafe { self.scope.abort_task(self.task_id) };
46        // TODO(https://fxbug.dev/452064816): The compiler throws a false
47        // positive linter warning because it thinks that `self.task_id = 0;` is
48        // never read, even though it is read in the Drop implementation below.
49        #[allow(unused_assignments)]
50        async move {
51            match result {
52                Some(output) => Some(output),
53                None => {
54                    // If we are dropped from here, we'll end up calling `abort_and_detach`.
55                    let result = std::future::poll_fn(|cx| {
56                        // SAFETY: We spawned the task so the return type should be correct.
57                        unsafe { self.scope.poll_aborted(self.task_id, cx) }
58                    })
59                    .await;
60                    self.task_id = 0;
61                    result
62                }
63            }
64        }
65    }
66}
67
68impl<T> Drop for JoinHandle<T> {
69    fn drop(&mut self) {
70        if self.task_id != 0 {
71            self.scope.detach(self.task_id);
72        }
73    }
74}
75
76impl<T: 'static> Future for JoinHandle<T> {
77    type Output = T;
78    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79        // SAFETY: We spawned the task so the return type should be correct.
80        let result = unsafe { self.scope.poll_join_result(self.task_id, cx) };
81        if result.is_ready() {
82            self.task_id = 0;
83        }
84        result
85    }
86}
87
88/// A `JoinHandle` which returns `Err` when canceled instead of pending forever.
89#[derive(Debug)]
90pub struct CancelableJoinHandle<T> {
91    inner: JoinHandle<T>,
92}
93
94impl<T> CancelableJoinHandle<T> {
95    /// Aborts a task and returns a future that resolves once the task is
96    /// aborted. The future can be ignored in which case the task will still be
97    /// aborted.
98    pub fn abort(self) -> impl Future<Output = Option<T>> {
99        self.inner.abort()
100    }
101}
102
103impl<T> From<JoinHandle<T>> for CancelableJoinHandle<T> {
104    fn from(inner: JoinHandle<T>) -> Self {
105        Self { inner }
106    }
107}
108
109impl<T: 'static> Future for CancelableJoinHandle<T> {
110    type Output = Result<T, JoinError>;
111
112    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113        // SAFETY: We spawned the task so the return type should be correct.
114        let result = unsafe { self.inner.scope.try_poll_join_result(self.inner.task_id, cx) };
115        if result.is_ready() {
116            self.inner.task_id = 0;
117        }
118        result
119    }
120}
121
122/// This is the same as a JoinHandle, except that the future will be aborted when the task is
123/// dropped.
124#[must_use]
125#[repr(transparent)]
126#[derive(Debug)]
127// LINT.IfChange
128pub struct Task<T>(JoinHandle<T>);
129// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
130
131impl<T> Task<T> {
132    /// Returns a `JoinHandle` which will have detach-on-drop semantics.
133    pub fn detach_on_drop(self) -> JoinHandle<T> {
134        let this = ManuallyDrop::new(self);
135        // SAFETY: We are bypassing our drop implementation.
136        unsafe { std::ptr::read(&this.0) }
137    }
138}
139
140impl Task<()> {
141    /// Detach this task so that it can run independently in the background.
142    ///
143    /// *Note*: This is usually not what you want. This API severs the control flow from the
144    /// caller. This can result in flaky tests and makes it impossible to return values
145    /// (including errors).
146    ///
147    /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
148    ///
149    /// You can also use other futures combinators such as:
150    ///
151    /// * [`futures::future::join`]
152    /// * [`futures::future::select`]
153    /// * [`futures::select`]
154    ///
155    /// or their error-aware variants
156    ///
157    /// * [`futures::future::try_join`]
158    /// * [`futures::future::try_select`]
159    ///
160    /// or their stream counterparts
161    ///
162    /// * [`futures::stream::StreamExt::for_each`]
163    /// * [`futures::stream::StreamExt::for_each_concurrent`]
164    /// * [`futures::stream::TryStreamExt::try_for_each`]
165    /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
166    ///
167    /// can meet your needs.
168    pub fn detach(mut self) {
169        self.0.scope.detach(self.0.task_id);
170        self.0.task_id = 0;
171    }
172}
173
174impl<T: Send + 'static> Task<T> {
175    /// Spawn a new task on the global scope of the current executor.
176    ///
177    /// The task may be executed on any thread(s) owned by the current executor.
178    /// See [`Task::local`] for an equivalent that ensures locality.
179    ///
180    /// The passed future will live until either (a) the future completes,
181    /// (b) the returned [`Task`] is dropped while the executor is running, or
182    /// (c) the executor is destroyed; whichever comes first.
183    ///
184    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
185    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
186    ///
187    /// # Panics
188    ///
189    /// May panic if not called in the context of an executor (e.g. within a
190    /// call to [`run`][crate::SendExecutor::run]).
191    pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
192        EHandle::local().global_scope().compute(future)
193    }
194}
195
196impl<T: 'static> Task<T> {
197    /// Spawn a new task on the global scope of the thread local executor.
198    ///
199    /// The passed future will live until either (a) the future completes,
200    /// (b) the returned [`Task`] is dropped while the executor is running, or
201    /// (c) the executor is destroyed; whichever comes first.
202    ///
203    /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
204    /// runtime panic. Use [`Task::spawn`] instead.
205    ///
206    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
207    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
208    ///
209    /// # Panics
210    ///
211    /// May panic if not called in the context of an executor (e.g. within a
212    /// call to [`run`][crate::SendExecutor::run]).
213    pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
214        EHandle::local().global_scope().compute_local(future)
215    }
216}
217
218impl<T: 'static> Task<T> {
219    /// Aborts a task and returns a future that resolves once the task is
220    /// aborted. The future can be ignored in which case the task will still be
221    /// aborted.
222    pub fn abort(self) -> impl Future<Output = Option<T>> {
223        self.detach_on_drop().abort()
224    }
225}
226
227impl<T: 'static> Future for Task<T> {
228    type Output = T;
229    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230        // SAFETY: We spawned the task so the return type should be correct.
231        let result = unsafe { self.0.scope.poll_join_result(self.0.task_id, cx) };
232        if result.is_ready() {
233            self.0.task_id = 0;
234        }
235        result
236    }
237}
238
239impl<T> Drop for Task<T> {
240    fn drop(&mut self) {
241        if self.0.task_id != 0 {
242            self.0.scope.abort_and_detach(self.0.task_id);
243            self.0.task_id = 0;
244        }
245    }
246}
247
248impl<T> From<JoinHandle<T>> for Task<T> {
249    fn from(value: JoinHandle<T>) -> Self {
250        Self(value)
251    }
252}
253
254/// Offload a blocking function call onto a different thread.
255///
256/// This function can be called from an asynchronous function without blocking
257/// it, returning a future that can be `.await`ed normally. The provided
258/// function should contain at least one blocking operation, such as:
259///
260/// - A synchronous syscall that does not yet have an async counterpart.
261/// - A compute operation which risks blocking the executor for an unacceptable
262///   amount of time.
263///
264/// If neither of these conditions are satisfied, just call the function normally,
265/// as synchronous functions themselves are allowed within an async context,
266/// as long as they are not blocking.
267///
268/// If you have an async function that may block, refactor the function such that
269/// the blocking operations are offloaded onto the function passed to [`unblock`].
270///
271/// NOTE:
272///
273/// - The input function should not interact with the executor. Attempting to do so
274///   can cause runtime errors. This includes spawning, creating new executors,
275///   passing futures between the input function and the calling context, and
276///   in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
277/// - Synchronous functions cannot be cancelled and may keep running after
278///   the returned future is dropped. As a result, resources held by the function
279///   should be assumed to be held until the returned future completes.
280/// - This function assumes panic=abort semantics, so if the input function panics,
281///   the process aborts. Behavior for panic=unwind is not defined.
282// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
283// spawning new threads if this proves to be a bottleneck.
284pub fn unblock<T: 'static + Send>(
285    f: impl 'static + Send + FnOnce() -> T,
286) -> impl 'static + Send + Future<Output = T> {
287    let (tx, rx) = futures::channel::oneshot::channel();
288    std::thread::spawn(move || {
289        let _ = tx.send(f());
290    });
291    rx.map(|r| r.unwrap())
292}
293
294/// Yields execution back to the runtime.
295pub async fn yield_now() {
296    let mut done = false;
297    poll_fn(|cx| {
298        if done {
299            Poll::Ready(())
300        } else {
301            done = true;
302            cx.waker().wake_by_ref();
303            Poll::Pending
304        }
305    })
306    .await;
307}
308
309#[cfg(test)]
310mod tests {
311    use super::super::executor::{LocalExecutor, SendExecutorBuilder};
312    use super::*;
313    use fuchsia_sync::Mutex;
314    use std::sync::Arc;
315
316    /// This struct holds a thread-safe mutable boolean and
317    /// sets its value to true when dropped.
318    #[derive(Clone)]
319    struct SetsBoolTrueOnDrop {
320        value: Arc<Mutex<bool>>,
321    }
322
323    impl SetsBoolTrueOnDrop {
324        fn new() -> (Self, Arc<Mutex<bool>>) {
325            let value = Arc::new(Mutex::new(false));
326            let sets_bool_true_on_drop = Self { value: value.clone() };
327            (sets_bool_true_on_drop, value)
328        }
329    }
330
331    impl Drop for SetsBoolTrueOnDrop {
332        fn drop(&mut self) {
333            let mut lock = self.value.lock();
334            *lock = true;
335        }
336    }
337
338    #[test]
339    #[should_panic]
340    fn spawn_from_unblock_fails() {
341        // no executor in the off-thread, so spawning fails
342        SendExecutorBuilder::new().num_threads(2).build().run(async move {
343            unblock(|| {
344                #[allow(clippy::let_underscore_future)]
345                let _ = Task::spawn(async {});
346            })
347            .await;
348        });
349    }
350
351    #[test]
352    fn future_destroyed_before_await_returns() {
353        LocalExecutor::default().run_singlethreaded(async {
354            let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new();
355
356            // Move the switch into a different thread.
357            // Once we return from this await, that switch should have been dropped.
358            unblock(move || {
359                let lock = sets_bool_true_on_drop.value.lock();
360                assert!(!*lock);
361            })
362            .await;
363
364            // Switch moved into the future should have been dropped at this point.
365            // The value of the boolean should now be true.
366            let lock = value.lock();
367            assert!(*lock);
368        });
369    }
370}