fuchsia_async/runtime/fuchsia/task.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::EHandle;
6use crate::scope::ScopeHandle;
7use futures::prelude::*;
8use std::future::poll_fn;
9use std::marker::PhantomData;
10use std::mem::ManuallyDrop;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14/// A handle to a future that is owned and polled by the executor.
15///
16/// Once a task is created, the executor will poll it until done, even if the task handle itself is
17/// not polled.
18///
19/// NOTE: When a JoinHandle is dropped, its future will be detached.
20///
21/// Polling (or attempting to extract the value from) a task after the executor is dropped may
22/// trigger a panic.
23#[derive(Debug)]
24// LINT.IfChange
25pub struct JoinHandle<T> {
26 scope: ScopeHandle,
27 task_id: usize,
28 phantom: PhantomData<T>,
29}
30// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
31
32impl<T> Unpin for JoinHandle<T> {}
33
34impl<T> JoinHandle<T> {
35 pub(crate) fn new(scope: ScopeHandle, task_id: usize) -> Self {
36 Self { scope, task_id, phantom: PhantomData }
37 }
38
39 /// Aborts a task and returns a future that resolves once the task is
40 /// aborted. The future can be ignored in which case the task will still be
41 /// aborted.
42 pub fn abort(mut self) -> impl Future<Output = Option<T>> {
43 // SAFETY: We spawned the task so the return type should be correct.
44 let result = unsafe { self.scope.abort_task(self.task_id) };
45 // TODO(https://fxbug.dev/452064816): The compiler throws a false
46 // positive linter warning because it thinks that `self.task_id = 0;` is
47 // never read, even though it is read in the Drop implementation below.
48 #[allow(unused_assignments)]
49 async move {
50 match result {
51 Some(output) => Some(output),
52 None => {
53 // If we are dropped from here, we'll end up calling `abort_and_detach`.
54 let result = std::future::poll_fn(|cx| {
55 // SAFETY: We spawned the task so the return type should be correct.
56 unsafe { self.scope.poll_aborted(self.task_id, cx) }
57 })
58 .await;
59 self.task_id = 0;
60 result
61 }
62 }
63 }
64 }
65}
66
67impl<T> Drop for JoinHandle<T> {
68 fn drop(&mut self) {
69 if self.task_id != 0 {
70 self.scope.detach(self.task_id);
71 }
72 }
73}
74
75impl<T: 'static> Future for JoinHandle<T> {
76 type Output = T;
77 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78 // SAFETY: We spawned the task so the return type should be correct.
79 let result = unsafe { self.scope.poll_join_result(self.task_id, cx) };
80 if result.is_ready() {
81 self.task_id = 0;
82 }
83 result
84 }
85}
86
87/// This is the same as a JoinHandle, except that the future will be aborted when the task is
88/// dropped.
89#[must_use]
90#[repr(transparent)]
91#[derive(Debug)]
92// LINT.IfChange
93pub struct Task<T>(JoinHandle<T>);
94// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
95
96impl<T> Task<T> {
97 /// Returns a `JoinHandle` which will have detach-on-drop semantics.
98 pub fn detach_on_drop(self) -> JoinHandle<T> {
99 let this = ManuallyDrop::new(self);
100 // SAFETY: We are bypassing our drop implementation.
101 unsafe { std::ptr::read(&this.0) }
102 }
103}
104
105impl Task<()> {
106 /// Detach this task so that it can run independently in the background.
107 ///
108 /// *Note*: This is usually not what you want. This API severs the control flow from the
109 /// caller. This can result in flaky tests and makes it impossible to return values
110 /// (including errors).
111 ///
112 /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
113 ///
114 /// You can also use other futures combinators such as:
115 ///
116 /// * [`futures::future::join`]
117 /// * [`futures::future::select`]
118 /// * [`futures::select`]
119 ///
120 /// or their error-aware variants
121 ///
122 /// * [`futures::future::try_join`]
123 /// * [`futures::future::try_select`]
124 ///
125 /// or their stream counterparts
126 ///
127 /// * [`futures::stream::StreamExt::for_each`]
128 /// * [`futures::stream::StreamExt::for_each_concurrent`]
129 /// * [`futures::stream::TryStreamExt::try_for_each`]
130 /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
131 ///
132 /// can meet your needs.
133 pub fn detach(mut self) {
134 self.0.scope.detach(self.0.task_id);
135 self.0.task_id = 0;
136 }
137}
138
139impl<T: Send + 'static> Task<T> {
140 /// Spawn a new task on the global scope of the current executor.
141 ///
142 /// The task may be executed on any thread(s) owned by the current executor.
143 /// See [`Task::local`] for an equivalent that ensures locality.
144 ///
145 /// The passed future will live until either (a) the future completes,
146 /// (b) the returned [`Task`] is dropped while the executor is running, or
147 /// (c) the executor is destroyed; whichever comes first.
148 ///
149 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
150 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
151 ///
152 /// # Panics
153 ///
154 /// May panic if not called in the context of an executor (e.g. within a
155 /// call to [`run`][crate::SendExecutor::run]).
156 pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
157 EHandle::local().global_scope().compute(future)
158 }
159}
160
161impl<T: 'static> Task<T> {
162 /// Spawn a new task on the global scope of the thread local executor.
163 ///
164 /// The passed future will live until either (a) the future completes,
165 /// (b) the returned [`Task`] is dropped while the executor is running, or
166 /// (c) the executor is destroyed; whichever comes first.
167 ///
168 /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
169 /// runtime panic. Use [`Task::spawn`] instead.
170 ///
171 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
172 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
173 ///
174 /// # Panics
175 ///
176 /// May panic if not called in the context of an executor (e.g. within a
177 /// call to [`run`][crate::SendExecutor::run]).
178 pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
179 EHandle::local().global_scope().compute_local(future)
180 }
181}
182
183impl<T: 'static> Task<T> {
184 /// Aborts a task and returns a future that resolves once the task is
185 /// aborted. The future can be ignored in which case the task will still be
186 /// aborted.
187 pub fn abort(self) -> impl Future<Output = Option<T>> {
188 self.detach_on_drop().abort()
189 }
190}
191
192impl<T: 'static> Future for Task<T> {
193 type Output = T;
194 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195 // SAFETY: We spawned the task so the return type should be correct.
196 let result = unsafe { self.0.scope.poll_join_result(self.0.task_id, cx) };
197 if result.is_ready() {
198 self.0.task_id = 0;
199 }
200 result
201 }
202}
203
204impl<T> Drop for Task<T> {
205 fn drop(&mut self) {
206 if self.0.task_id != 0 {
207 self.0.scope.abort_and_detach(self.0.task_id);
208 self.0.task_id = 0;
209 }
210 }
211}
212
213impl<T> From<JoinHandle<T>> for Task<T> {
214 fn from(value: JoinHandle<T>) -> Self {
215 Self(value)
216 }
217}
218
219/// Offload a blocking function call onto a different thread.
220///
221/// This function can be called from an asynchronous function without blocking
222/// it, returning a future that can be `.await`ed normally. The provided
223/// function should contain at least one blocking operation, such as:
224///
225/// - A synchronous syscall that does not yet have an async counterpart.
226/// - A compute operation which risks blocking the executor for an unacceptable
227/// amount of time.
228///
229/// If neither of these conditions are satisfied, just call the function normally,
230/// as synchronous functions themselves are allowed within an async context,
231/// as long as they are not blocking.
232///
233/// If you have an async function that may block, refactor the function such that
234/// the blocking operations are offloaded onto the function passed to [`unblock`].
235///
236/// NOTE:
237///
238/// - The input function should not interact with the executor. Attempting to do so
239/// can cause runtime errors. This includes spawning, creating new executors,
240/// passing futures between the input function and the calling context, and
241/// in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
242/// - Synchronous functions cannot be cancelled and may keep running after
243/// the returned future is dropped. As a result, resources held by the function
244/// should be assumed to be held until the returned future completes.
245/// - This function assumes panic=abort semantics, so if the input function panics,
246/// the process aborts. Behavior for panic=unwind is not defined.
247// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
248// spawning new threads if this proves to be a bottleneck.
249pub fn unblock<T: 'static + Send>(
250 f: impl 'static + Send + FnOnce() -> T,
251) -> impl 'static + Send + Future<Output = T> {
252 let (tx, rx) = futures::channel::oneshot::channel();
253 std::thread::spawn(move || {
254 let _ = tx.send(f());
255 });
256 rx.map(|r| r.unwrap())
257}
258
259/// Yields execution back to the runtime.
260pub async fn yield_now() {
261 let mut done = false;
262 poll_fn(|cx| {
263 if done {
264 Poll::Ready(())
265 } else {
266 done = true;
267 cx.waker().wake_by_ref();
268 Poll::Pending
269 }
270 })
271 .await;
272}
273
274#[cfg(test)]
275mod tests {
276 use super::super::executor::{LocalExecutor, SendExecutorBuilder};
277 use super::*;
278 use std::sync::{Arc, Mutex};
279
280 /// This struct holds a thread-safe mutable boolean and
281 /// sets its value to true when dropped.
282 #[derive(Clone)]
283 struct SetsBoolTrueOnDrop {
284 value: Arc<Mutex<bool>>,
285 }
286
287 impl SetsBoolTrueOnDrop {
288 fn new() -> (Self, Arc<Mutex<bool>>) {
289 let value = Arc::new(Mutex::new(false));
290 let sets_bool_true_on_drop = Self { value: value.clone() };
291 (sets_bool_true_on_drop, value)
292 }
293 }
294
295 impl Drop for SetsBoolTrueOnDrop {
296 fn drop(&mut self) {
297 let mut lock = self.value.lock().unwrap();
298 *lock = true;
299 }
300 }
301
302 #[test]
303 #[should_panic]
304 fn spawn_from_unblock_fails() {
305 // no executor in the off-thread, so spawning fails
306 SendExecutorBuilder::new().num_threads(2).build().run(async move {
307 unblock(|| {
308 #[allow(clippy::let_underscore_future)]
309 let _ = Task::spawn(async {});
310 })
311 .await;
312 });
313 }
314
315 #[test]
316 fn future_destroyed_before_await_returns() {
317 LocalExecutor::default().run_singlethreaded(async {
318 let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new();
319
320 // Move the switch into a different thread.
321 // Once we return from this await, that switch should have been dropped.
322 unblock(move || {
323 let lock = sets_bool_true_on_drop.value.lock().unwrap();
324 assert!(!*lock);
325 })
326 .await;
327
328 // Switch moved into the future should have been dropped at this point.
329 // The value of the boolean should now be true.
330 let lock = value.lock().unwrap();
331 assert!(*lock);
332 });
333 }
334}