fuchsia_async/runtime/fuchsia/task.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::scope::ScopeHandle;
6use crate::EHandle;
7use futures::prelude::*;
8use std::marker::PhantomData;
9use std::mem::ManuallyDrop;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
/// A handle to a future that is owned and polled by the executor.
///
/// Once a task is created, the executor will poll it until done, even if the task handle itself is
/// not polled.
///
/// NOTE: When a JoinHandle is dropped, its future will be detached.
///
/// Polling (or attempting to extract the value from) a task after the executor is dropped may
/// trigger a panic.
#[derive(Debug)]
// LINT.IfChange
pub struct JoinHandle<T> {
    /// The scope through which the task is joined, cancelled and detached.
    scope: ScopeHandle,
    /// Identifies the task within `scope`. It is reset to 0 once the task's result has been
    /// taken, which tells `Drop` that there is nothing left to detach.
    task_id: usize,
    /// Records the task's output type; the handle itself never stores a `T`.
    phantom: PhantomData<T>,
}
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
30
// The handle never stores a `T` (only `PhantomData<T>`), so it is declared `Unpin` for every `T`.
// The `Future` impls in this file rely on this to mutate the handle through `Pin<&mut Self>`.
impl<T> Unpin for JoinHandle<T> {}
32
33impl<T> JoinHandle<T> {
34 pub(crate) fn new(scope: ScopeHandle, task_id: usize) -> Self {
35 Self { scope, task_id, phantom: PhantomData }
36 }
37
38 /// Cancel a task and returns a future that resolves once the cancellation is complete. The
39 /// future can be ignored in which case the task will still be cancelled.
40 pub fn cancel(mut self) -> impl Future<Output = Option<T>> {
41 // SAFETY: We spawned the task so the return type should be correct.
42 let result = unsafe { self.scope.cancel_task(self.task_id) };
43 async move {
44 match result {
45 Some(output) => Some(output),
46 None => {
47 // If we are dropped from here, we'll end up calling `cancel_and_detach`.
48 let result = std::future::poll_fn(|cx| {
49 // SAFETY: We spawned the task so the return type should be correct.
50 unsafe { self.scope.poll_cancelled(self.task_id, cx) }
51 })
52 .await;
53 self.task_id = 0;
54 result
55 }
56 }
57 }
58 }
59}
60
61impl<T> Drop for JoinHandle<T> {
62 fn drop(&mut self) {
63 if self.task_id != 0 {
64 self.scope.detach(self.task_id);
65 }
66 }
67}
68
69impl<T: 'static> Future for JoinHandle<T> {
70 type Output = T;
71 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72 // SAFETY: We spawned the task so the return type should be correct.
73 let result = unsafe { self.scope.poll_join_result(self.task_id, cx) };
74 if result.is_ready() {
75 self.task_id = 0;
76 }
77 result
78 }
79}
80
/// This is the same as a JoinHandle, except that the future will be cancelled when the task is
/// dropped.
///
/// Use [`Task::detach_on_drop`] to turn a `Task` back into a [`JoinHandle`], and
/// `Task::from(join_handle)` for the opposite conversion.
#[must_use]
#[repr(transparent)]
#[derive(Debug)]
// LINT.IfChange
pub struct Task<T>(JoinHandle<T>);
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
89
90impl<T> Task<T> {
91 /// Returns a `JoinHandle` which will have detach-on-drop semantics.
92 pub fn detach_on_drop(self) -> JoinHandle<T> {
93 let this = ManuallyDrop::new(self);
94 // SAFETY: We are bypassing our drop implementation.
95 unsafe { std::ptr::read(&this.0) }
96 }
97}
98
99impl Task<()> {
100 /// Detach this task so that it can run independently in the background.
101 ///
102 /// *Note*: This is usually not what you want. This API severs the control flow from the
103 /// caller. This can result in flaky tests and makes it impossible to return values
104 /// (including errors).
105 ///
106 /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
107 ///
108 /// You can also use other futures combinators such as:
109 ///
110 /// * [`futures::future::join`]
111 /// * [`futures::future::select`]
112 /// * [`futures::select`]
113 ///
114 /// or their error-aware variants
115 ///
116 /// * [`futures::future::try_join`]
117 /// * [`futures::future::try_select`]
118 ///
119 /// or their stream counterparts
120 ///
121 /// * [`futures::stream::StreamExt::for_each`]
122 /// * [`futures::stream::StreamExt::for_each_concurrent`]
123 /// * [`futures::stream::TryStreamExt::try_for_each`]
124 /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
125 ///
126 /// can meet your needs.
127 pub fn detach(mut self) {
128 self.0.scope.detach(self.0.task_id);
129 self.0.task_id = 0;
130 }
131}
132
133impl<T: Send + 'static> Task<T> {
134 /// Spawn a new task on the global scope of the current executor.
135 ///
136 /// The task may be executed on any thread(s) owned by the current executor.
137 /// See [`Task::local`] for an equivalent that ensures locality.
138 ///
139 /// The passed future will live until either (a) the future completes,
140 /// (b) the returned [`Task`] is dropped while the executor is running, or
141 /// (c) the executor is destroyed; whichever comes first.
142 ///
143 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
144 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
145 ///
146 /// # Panics
147 ///
148 /// May panic if not called in the context of an executor (e.g. within a
149 /// call to [`run`][crate::SendExecutor::run]).
150 pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
151 EHandle::local().global_scope().compute(future)
152 }
153}
154
155impl<T: 'static> Task<T> {
156 /// Spawn a new task on the global scope of the thread local executor.
157 ///
158 /// The passed future will live until either (a) the future completes,
159 /// (b) the returned [`Task`] is dropped while the executor is running, or
160 /// (c) the executor is destroyed; whichever comes first.
161 ///
162 /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
163 /// runtime panic. Use [`Task::spawn`] instead.
164 ///
165 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
166 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
167 ///
168 /// # Panics
169 ///
170 /// May panic if not called in the context of an executor (e.g. within a
171 /// call to [`run`][crate::SendExecutor::run]).
172 pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
173 EHandle::local().global_scope().compute_local(future)
174 }
175}
176
177impl<T: 'static> Task<T> {
178 /// Cancel a task and returns a future that resolves once the cancellation is complete. The
179 /// future can be ignored in which case the task will still be cancelled.
180 pub fn cancel(self) -> impl Future<Output = Option<T>> {
181 self.detach_on_drop().cancel()
182 }
183}
184
185impl<T: 'static> Future for Task<T> {
186 type Output = T;
187 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188 // SAFETY: We spawned the task so the return type should be correct.
189 let result = unsafe { self.0.scope.poll_join_result(self.0.task_id, cx) };
190 if result.is_ready() {
191 self.0.task_id = 0;
192 }
193 result
194 }
195}
196
197impl<T> Drop for Task<T> {
198 fn drop(&mut self) {
199 if self.0.task_id != 0 {
200 self.0.scope.cancel_and_detach(self.0.task_id);
201 self.0.task_id = 0;
202 }
203 }
204}
205
206impl<T> From<JoinHandle<T>> for Task<T> {
207 fn from(value: JoinHandle<T>) -> Self {
208 Self(value)
209 }
210}
211
212/// Offload a blocking function call onto a different thread.
213///
214/// This function can be called from an asynchronous function without blocking
215/// it, returning a future that can be `.await`ed normally. The provided
216/// function should contain at least one blocking operation, such as:
217///
218/// - A synchronous syscall that does not yet have an async counterpart.
219/// - A compute operation which risks blocking the executor for an unacceptable
220/// amount of time.
221///
222/// If neither of these conditions are satisfied, just call the function normally,
223/// as synchronous functions themselves are allowed within an async context,
224/// as long as they are not blocking.
225///
226/// If you have an async function that may block, refactor the function such that
227/// the blocking operations are offloaded onto the function passed to [`unblock`].
228///
229/// NOTE:
230///
231/// - The input function should not interact with the executor. Attempting to do so
232/// can cause runtime errors. This includes spawning, creating new executors,
233/// passing futures between the input function and the calling context, and
234/// in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
235/// - Synchronous functions cannot be cancelled and may keep running after
236/// the returned future is dropped. As a result, resources held by the function
237/// should be assumed to be held until the returned future completes.
238/// - This function assumes panic=abort semantics, so if the input function panics,
239/// the process aborts. Behavior for panic=unwind is not defined.
240// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
241// spawning new threads if this proves to be a bottleneck.
242pub fn unblock<T: 'static + Send>(
243 f: impl 'static + Send + FnOnce() -> T,
244) -> impl 'static + Send + Future<Output = T> {
245 let (tx, rx) = futures::channel::oneshot::channel();
246 std::thread::spawn(move || {
247 let _ = tx.send(f());
248 });
249 rx.map(|r| r.unwrap())
250}
251
#[cfg(test)]
mod tests {
    use super::super::executor::{LocalExecutor, SendExecutor};
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Flips a shared, mutex-protected flag to `true` when dropped.
    #[derive(Clone)]
    struct SetsBoolTrueOnDrop {
        value: Arc<Mutex<bool>>,
    }

    impl SetsBoolTrueOnDrop {
        /// Returns a new instance together with a second handle to its flag, which starts out
        /// `false`.
        fn new() -> (Self, Arc<Mutex<bool>>) {
            let flag = Arc::new(Mutex::new(false));
            (Self { value: Arc::clone(&flag) }, flag)
        }
    }

    impl Drop for SetsBoolTrueOnDrop {
        fn drop(&mut self) {
            *self.value.lock().unwrap() = true;
        }
    }

    #[test]
    #[should_panic]
    fn spawn_from_unblock_fails() {
        // The thread created by `unblock` has no executor, so spawning from it fails.
        let attempt = async move {
            unblock(|| {
                let _ = Task::spawn(async {});
            })
            .await;
        };
        SendExecutor::new(2).run(attempt);
    }

    #[test]
    fn future_destroyed_before_await_returns() {
        let scenario = async {
            let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new();

            // Hand the switch over to the other thread. By the time the await returns, the
            // closure (and the switch it owns) should have been dropped.
            unblock(move || {
                assert_eq!(*sets_bool_true_on_drop.value.lock().unwrap(), false);
            })
            .await;

            // Dropping the switch should have flipped the flag to true.
            assert_eq!(*value.lock().unwrap(), true);
        };
        LocalExecutor::new().run_singlethreaded(scenario);
    }
}