fuchsia_async/runtime/fuchsia/task.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::scope::ScopeHandle;
6use crate::EHandle;
7use futures::prelude::*;
8use std::marker::PhantomData;
9use std::mem::ManuallyDrop;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
/// A handle to a future that is owned and polled by the executor.
///
/// Once a task is created, the executor will poll it until done, even if the task handle itself is
/// not polled.
///
/// NOTE: When a JoinHandle is dropped, its future will be detached.
///
/// Polling (or attempting to extract the value from) a task after the executor is dropped may
/// trigger a panic.
#[derive(Debug)]
// LINT.IfChange
pub struct JoinHandle<T> {
    /// Scope through which the task is polled, aborted and detached.
    scope: ScopeHandle,
    /// Identifier handed to `scope` to select the task. It is reset to `0` once the task's
    /// result has been taken, which tells `Drop` that there is nothing left to detach.
    task_id: usize,
    /// Records the task's output type `T`; no `T` value is stored in the handle itself.
    phantom: PhantomData<T>,
}
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
30
// `JoinHandle` only holds a `PhantomData<T>`, never a `T`, so it is declared `Unpin` for every
// `T`. The `Future` impls in this file rely on this to mutate `task_id` through `Pin<&mut Self>`.
impl<T> Unpin for JoinHandle<T> {}
32
33impl<T> JoinHandle<T> {
34 pub(crate) fn new(scope: ScopeHandle, task_id: usize) -> Self {
35 Self { scope, task_id, phantom: PhantomData }
36 }
37
38 /// Aborts a task and returns a future that resolves once the task is
39 /// aborted. The future can be ignored in which case the task will still be
40 /// aborted.
41 pub fn abort(mut self) -> impl Future<Output = Option<T>> {
42 // SAFETY: We spawned the task so the return type should be correct.
43 let result = unsafe { self.scope.abort_task(self.task_id) };
44 async move {
45 match result {
46 Some(output) => Some(output),
47 None => {
48 // If we are dropped from here, we'll end up calling `abort_and_detach`.
49 let result = std::future::poll_fn(|cx| {
50 // SAFETY: We spawned the task so the return type should be correct.
51 unsafe { self.scope.poll_aborted(self.task_id, cx) }
52 })
53 .await;
54 self.task_id = 0;
55 result
56 }
57 }
58 }
59 }
60}
61
62impl<T> Drop for JoinHandle<T> {
63 fn drop(&mut self) {
64 if self.task_id != 0 {
65 self.scope.detach(self.task_id);
66 }
67 }
68}
69
70impl<T: 'static> Future for JoinHandle<T> {
71 type Output = T;
72 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
73 // SAFETY: We spawned the task so the return type should be correct.
74 let result = unsafe { self.scope.poll_join_result(self.task_id, cx) };
75 if result.is_ready() {
76 self.task_id = 0;
77 }
78 result
79 }
80}
81
/// This is the same as a JoinHandle, except that the future will be aborted when the task is
/// dropped.
///
/// The abort happens in this type's `Drop` impl, which is why the type is `#[must_use]`: a
/// `Task` that is discarded immediately aborts the work it represents.
#[must_use]
#[repr(transparent)]
#[derive(Debug)]
// LINT.IfChange
pub struct Task<T>(JoinHandle<T>);
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
90
91impl<T> Task<T> {
92 /// Returns a `JoinHandle` which will have detach-on-drop semantics.
93 pub fn detach_on_drop(self) -> JoinHandle<T> {
94 let this = ManuallyDrop::new(self);
95 // SAFETY: We are bypassing our drop implementation.
96 unsafe { std::ptr::read(&this.0) }
97 }
98}
99
100impl Task<()> {
101 /// Detach this task so that it can run independently in the background.
102 ///
103 /// *Note*: This is usually not what you want. This API severs the control flow from the
104 /// caller. This can result in flaky tests and makes it impossible to return values
105 /// (including errors).
106 ///
107 /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
108 ///
109 /// You can also use other futures combinators such as:
110 ///
111 /// * [`futures::future::join`]
112 /// * [`futures::future::select`]
113 /// * [`futures::select`]
114 ///
115 /// or their error-aware variants
116 ///
117 /// * [`futures::future::try_join`]
118 /// * [`futures::future::try_select`]
119 ///
120 /// or their stream counterparts
121 ///
122 /// * [`futures::stream::StreamExt::for_each`]
123 /// * [`futures::stream::StreamExt::for_each_concurrent`]
124 /// * [`futures::stream::TryStreamExt::try_for_each`]
125 /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
126 ///
127 /// can meet your needs.
128 pub fn detach(mut self) {
129 self.0.scope.detach(self.0.task_id);
130 self.0.task_id = 0;
131 }
132}
133
134impl<T: Send + 'static> Task<T> {
135 /// Spawn a new task on the global scope of the current executor.
136 ///
137 /// The task may be executed on any thread(s) owned by the current executor.
138 /// See [`Task::local`] for an equivalent that ensures locality.
139 ///
140 /// The passed future will live until either (a) the future completes,
141 /// (b) the returned [`Task`] is dropped while the executor is running, or
142 /// (c) the executor is destroyed; whichever comes first.
143 ///
144 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
145 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
146 ///
147 /// # Panics
148 ///
149 /// May panic if not called in the context of an executor (e.g. within a
150 /// call to [`run`][crate::SendExecutor::run]).
151 pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
152 EHandle::local().global_scope().compute(future)
153 }
154}
155
156impl<T: 'static> Task<T> {
157 /// Spawn a new task on the global scope of the thread local executor.
158 ///
159 /// The passed future will live until either (a) the future completes,
160 /// (b) the returned [`Task`] is dropped while the executor is running, or
161 /// (c) the executor is destroyed; whichever comes first.
162 ///
163 /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
164 /// runtime panic. Use [`Task::spawn`] instead.
165 ///
166 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
167 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
168 ///
169 /// # Panics
170 ///
171 /// May panic if not called in the context of an executor (e.g. within a
172 /// call to [`run`][crate::SendExecutor::run]).
173 pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
174 EHandle::local().global_scope().compute_local(future)
175 }
176}
177
178impl<T: 'static> Task<T> {
179 /// Aborts a task and returns a future that resolves once the task is
180 /// aborted. The future can be ignored in which case the task will still be
181 /// aborted.
182 pub fn abort(self) -> impl Future<Output = Option<T>> {
183 self.detach_on_drop().abort()
184 }
185}
186
187impl<T: 'static> Future for Task<T> {
188 type Output = T;
189 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
190 // SAFETY: We spawned the task so the return type should be correct.
191 let result = unsafe { self.0.scope.poll_join_result(self.0.task_id, cx) };
192 if result.is_ready() {
193 self.0.task_id = 0;
194 }
195 result
196 }
197}
198
199impl<T> Drop for Task<T> {
200 fn drop(&mut self) {
201 if self.0.task_id != 0 {
202 self.0.scope.abort_and_detach(self.0.task_id);
203 self.0.task_id = 0;
204 }
205 }
206}
207
208impl<T> From<JoinHandle<T>> for Task<T> {
209 fn from(value: JoinHandle<T>) -> Self {
210 Self(value)
211 }
212}
213
214/// Offload a blocking function call onto a different thread.
215///
216/// This function can be called from an asynchronous function without blocking
217/// it, returning a future that can be `.await`ed normally. The provided
218/// function should contain at least one blocking operation, such as:
219///
220/// - A synchronous syscall that does not yet have an async counterpart.
221/// - A compute operation which risks blocking the executor for an unacceptable
222/// amount of time.
223///
224/// If neither of these conditions are satisfied, just call the function normally,
225/// as synchronous functions themselves are allowed within an async context,
226/// as long as they are not blocking.
227///
228/// If you have an async function that may block, refactor the function such that
229/// the blocking operations are offloaded onto the function passed to [`unblock`].
230///
231/// NOTE:
232///
233/// - The input function should not interact with the executor. Attempting to do so
234/// can cause runtime errors. This includes spawning, creating new executors,
235/// passing futures between the input function and the calling context, and
236/// in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
237/// - Synchronous functions cannot be cancelled and may keep running after
238/// the returned future is dropped. As a result, resources held by the function
239/// should be assumed to be held until the returned future completes.
240/// - This function assumes panic=abort semantics, so if the input function panics,
241/// the process aborts. Behavior for panic=unwind is not defined.
242// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
243// spawning new threads if this proves to be a bottleneck.
244pub fn unblock<T: 'static + Send>(
245 f: impl 'static + Send + FnOnce() -> T,
246) -> impl 'static + Send + Future<Output = T> {
247 let (tx, rx) = futures::channel::oneshot::channel();
248 std::thread::spawn(move || {
249 let _ = tx.send(f());
250 });
251 rx.map(|r| r.unwrap())
252}
253
#[cfg(test)]
mod tests {
    use super::super::executor::{LocalExecutor, SendExecutor};
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Drop probe: owns a shared, mutex-guarded boolean and flips it to `true`
    /// when the probe is dropped.
    #[derive(Clone)]
    struct SetsBoolTrueOnDrop {
        value: Arc<Mutex<bool>>,
    }

    impl SetsBoolTrueOnDrop {
        /// Returns a probe whose flag starts out `false`, together with a second handle to the
        /// same flag for the test to inspect.
        fn new() -> (Self, Arc<Mutex<bool>>) {
            let flag = Arc::new(Mutex::new(false));
            let probe = Self { value: Arc::clone(&flag) };
            (probe, flag)
        }
    }

    impl Drop for SetsBoolTrueOnDrop {
        fn drop(&mut self) {
            *self.value.lock().unwrap() = true;
        }
    }

    #[test]
    #[should_panic]
    fn spawn_from_unblock_fails() {
        // The closure runs on the offload thread, which has no executor, so spawning fails.
        let offloaded_spawn = async move {
            unblock(|| {
                #[allow(clippy::let_underscore_future)]
                let _ = Task::spawn(async {});
            })
            .await;
        };
        SendExecutor::new(2).run(offloaded_spawn);
    }

    #[test]
    fn future_destroyed_before_await_returns() {
        LocalExecutor::new().run_singlethreaded(async {
            let (probe, flag) = SetsBoolTrueOnDrop::new();

            // The probe is moved into the closure, which runs on another thread. While the
            // closure is still running the probe is alive, so the flag must still be false.
            unblock(move || {
                let guard = probe.value.lock().unwrap();
                assert!(!*guard);
            })
            .await;

            // By the time the await returns, the closure (and the probe it owned) must have
            // been dropped, so the flag is now true.
            let guard = flag.lock().unwrap();
            assert!(*guard);
        });
    }
}
314}