fuchsia_async/runtime/fuchsia/task.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::EHandle;
6use crate::runtime::fuchsia::scope::JoinError;
7use crate::scope::ScopeHandle;
8use futures::prelude::*;
9use std::future::poll_fn;
10use std::marker::PhantomData;
11use std::mem::ManuallyDrop;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15/// A handle to a future that is owned and polled by the executor.
16///
17/// Once a task is created, the executor will poll it until done, even if the task handle itself is
18/// not polled.
19///
20/// NOTE: When a JoinHandle is dropped, its future will be detached.
21///
22/// Polling (or attempting to extract the value from) a task after the executor is dropped may
23/// trigger a panic.
24#[derive(Debug)]
25// LINT.IfChange
26pub struct JoinHandle<T> {
27 scope: ScopeHandle,
28 task_id: usize,
29 phantom: PhantomData<T>,
30}
31// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
32
33impl<T> Unpin for JoinHandle<T> {}
34
35impl<T> JoinHandle<T> {
36 pub(crate) fn new(scope: ScopeHandle, task_id: usize) -> Self {
37 Self { scope, task_id, phantom: PhantomData }
38 }
39
40 /// Aborts a task and returns a future that resolves once the task is
41 /// aborted. The future can be ignored in which case the task will still be
42 /// aborted.
43 pub fn abort(mut self) -> impl Future<Output = Option<T>> {
44 // SAFETY: We spawned the task so the return type should be correct.
45 let result = unsafe { self.scope.abort_task(self.task_id) };
46 // TODO(https://fxbug.dev/452064816): The compiler throws a false
47 // positive linter warning because it thinks that `self.task_id = 0;` is
48 // never read, even though it is read in the Drop implementation below.
49 #[allow(unused_assignments)]
50 async move {
51 match result {
52 Some(output) => Some(output),
53 None => {
54 // If we are dropped from here, we'll end up calling `abort_and_detach`.
55 let result = std::future::poll_fn(|cx| {
56 // SAFETY: We spawned the task so the return type should be correct.
57 unsafe { self.scope.poll_aborted(self.task_id, cx) }
58 })
59 .await;
60 self.task_id = 0;
61 result
62 }
63 }
64 }
65 }
66}
67
68impl<T> Drop for JoinHandle<T> {
69 fn drop(&mut self) {
70 if self.task_id != 0 {
71 self.scope.detach(self.task_id);
72 }
73 }
74}
75
76impl<T: 'static> Future for JoinHandle<T> {
77 type Output = T;
78 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79 // SAFETY: We spawned the task so the return type should be correct.
80 let result = unsafe { self.scope.poll_join_result(self.task_id, cx) };
81 if result.is_ready() {
82 self.task_id = 0;
83 }
84 result
85 }
86}
87
88/// A `JoinHandle` which returns `Err` when canceled instead of pending forever.
89#[derive(Debug)]
90pub struct CancelableJoinHandle<T> {
91 inner: JoinHandle<T>,
92}
93
94impl<T> CancelableJoinHandle<T> {
95 /// Aborts a task and returns a future that resolves once the task is
96 /// aborted. The future can be ignored in which case the task will still be
97 /// aborted.
98 pub fn abort(self) -> impl Future<Output = Option<T>> {
99 self.inner.abort()
100 }
101}
102
103impl<T> From<JoinHandle<T>> for CancelableJoinHandle<T> {
104 fn from(inner: JoinHandle<T>) -> Self {
105 Self { inner }
106 }
107}
108
109impl<T: 'static> Future for CancelableJoinHandle<T> {
110 type Output = Result<T, JoinError>;
111
112 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113 // SAFETY: We spawned the task so the return type should be correct.
114 let result = unsafe { self.inner.scope.try_poll_join_result(self.inner.task_id, cx) };
115 if result.is_ready() {
116 self.inner.task_id = 0;
117 }
118 result
119 }
120}
121
122/// This is the same as a JoinHandle, except that the future will be aborted when the task is
123/// dropped.
124#[must_use]
125#[repr(transparent)]
126#[derive(Debug)]
127// LINT.IfChange
128pub struct Task<T>(JoinHandle<T>);
129// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
130
131impl<T> Task<T> {
132 /// Returns a `JoinHandle` which will have detach-on-drop semantics.
133 pub fn detach_on_drop(self) -> JoinHandle<T> {
134 let this = ManuallyDrop::new(self);
135 // SAFETY: We are bypassing our drop implementation.
136 unsafe { std::ptr::read(&this.0) }
137 }
138}
139
140impl Task<()> {
141 /// Detach this task so that it can run independently in the background.
142 ///
143 /// *Note*: This is usually not what you want. This API severs the control flow from the
144 /// caller. This can result in flaky tests and makes it impossible to return values
145 /// (including errors).
146 ///
147 /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
148 ///
149 /// You can also use other futures combinators such as:
150 ///
151 /// * [`futures::future::join`]
152 /// * [`futures::future::select`]
153 /// * [`futures::select`]
154 ///
155 /// or their error-aware variants
156 ///
157 /// * [`futures::future::try_join`]
158 /// * [`futures::future::try_select`]
159 ///
160 /// or their stream counterparts
161 ///
162 /// * [`futures::stream::StreamExt::for_each`]
163 /// * [`futures::stream::StreamExt::for_each_concurrent`]
164 /// * [`futures::stream::TryStreamExt::try_for_each`]
165 /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
166 ///
167 /// can meet your needs.
168 pub fn detach(mut self) {
169 self.0.scope.detach(self.0.task_id);
170 self.0.task_id = 0;
171 }
172}
173
174impl<T: Send + 'static> Task<T> {
175 /// Spawn a new task on the global scope of the current executor.
176 ///
177 /// The task may be executed on any thread(s) owned by the current executor.
178 /// See [`Task::local`] for an equivalent that ensures locality.
179 ///
180 /// The passed future will live until either (a) the future completes,
181 /// (b) the returned [`Task`] is dropped while the executor is running, or
182 /// (c) the executor is destroyed; whichever comes first.
183 ///
184 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
185 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
186 ///
187 /// # Panics
188 ///
189 /// May panic if not called in the context of an executor (e.g. within a
190 /// call to [`run`][crate::SendExecutor::run]).
191 pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
192 EHandle::local().global_scope().compute(future)
193 }
194}
195
196impl<T: 'static> Task<T> {
197 /// Spawn a new task on the global scope of the thread local executor.
198 ///
199 /// The passed future will live until either (a) the future completes,
200 /// (b) the returned [`Task`] is dropped while the executor is running, or
201 /// (c) the executor is destroyed; whichever comes first.
202 ///
203 /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
204 /// runtime panic. Use [`Task::spawn`] instead.
205 ///
206 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
207 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
208 ///
209 /// # Panics
210 ///
211 /// May panic if not called in the context of an executor (e.g. within a
212 /// call to [`run`][crate::SendExecutor::run]).
213 pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
214 EHandle::local().global_scope().compute_local(future)
215 }
216}
217
218impl<T: 'static> Task<T> {
219 /// Aborts a task and returns a future that resolves once the task is
220 /// aborted. The future can be ignored in which case the task will still be
221 /// aborted.
222 pub fn abort(self) -> impl Future<Output = Option<T>> {
223 self.detach_on_drop().abort()
224 }
225}
226
227impl<T: 'static> Future for Task<T> {
228 type Output = T;
229 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230 // SAFETY: We spawned the task so the return type should be correct.
231 let result = unsafe { self.0.scope.poll_join_result(self.0.task_id, cx) };
232 if result.is_ready() {
233 self.0.task_id = 0;
234 }
235 result
236 }
237}
238
239impl<T> Drop for Task<T> {
240 fn drop(&mut self) {
241 if self.0.task_id != 0 {
242 self.0.scope.abort_and_detach(self.0.task_id);
243 self.0.task_id = 0;
244 }
245 }
246}
247
248impl<T> From<JoinHandle<T>> for Task<T> {
249 fn from(value: JoinHandle<T>) -> Self {
250 Self(value)
251 }
252}
253
254/// Offload a blocking function call onto a different thread.
255///
256/// This function can be called from an asynchronous function without blocking
257/// it, returning a future that can be `.await`ed normally. The provided
258/// function should contain at least one blocking operation, such as:
259///
260/// - A synchronous syscall that does not yet have an async counterpart.
261/// - A compute operation which risks blocking the executor for an unacceptable
262/// amount of time.
263///
264/// If neither of these conditions are satisfied, just call the function normally,
265/// as synchronous functions themselves are allowed within an async context,
266/// as long as they are not blocking.
267///
268/// If you have an async function that may block, refactor the function such that
269/// the blocking operations are offloaded onto the function passed to [`unblock`].
270///
271/// NOTE:
272///
273/// - The input function should not interact with the executor. Attempting to do so
274/// can cause runtime errors. This includes spawning, creating new executors,
275/// passing futures between the input function and the calling context, and
276/// in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
277/// - Synchronous functions cannot be cancelled and may keep running after
278/// the returned future is dropped. As a result, resources held by the function
279/// should be assumed to be held until the returned future completes.
280/// - This function assumes panic=abort semantics, so if the input function panics,
281/// the process aborts. Behavior for panic=unwind is not defined.
282// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
283// spawning new threads if this proves to be a bottleneck.
284pub fn unblock<T: 'static + Send>(
285 f: impl 'static + Send + FnOnce() -> T,
286) -> impl 'static + Send + Future<Output = T> {
287 let (tx, rx) = futures::channel::oneshot::channel();
288 std::thread::spawn(move || {
289 let _ = tx.send(f());
290 });
291 rx.map(|r| r.unwrap())
292}
293
294/// Yields execution back to the runtime.
295pub async fn yield_now() {
296 let mut done = false;
297 poll_fn(|cx| {
298 if done {
299 Poll::Ready(())
300 } else {
301 done = true;
302 cx.waker().wake_by_ref();
303 Poll::Pending
304 }
305 })
306 .await;
307}
308
309#[cfg(test)]
310mod tests {
311 use super::super::executor::{LocalExecutor, SendExecutorBuilder};
312 use super::*;
313 use fuchsia_sync::Mutex;
314 use std::sync::Arc;
315
316 /// This struct holds a thread-safe mutable boolean and
317 /// sets its value to true when dropped.
318 #[derive(Clone)]
319 struct SetsBoolTrueOnDrop {
320 value: Arc<Mutex<bool>>,
321 }
322
323 impl SetsBoolTrueOnDrop {
324 fn new() -> (Self, Arc<Mutex<bool>>) {
325 let value = Arc::new(Mutex::new(false));
326 let sets_bool_true_on_drop = Self { value: value.clone() };
327 (sets_bool_true_on_drop, value)
328 }
329 }
330
331 impl Drop for SetsBoolTrueOnDrop {
332 fn drop(&mut self) {
333 let mut lock = self.value.lock();
334 *lock = true;
335 }
336 }
337
338 #[test]
339 #[should_panic]
340 fn spawn_from_unblock_fails() {
341 // no executor in the off-thread, so spawning fails
342 SendExecutorBuilder::new().num_threads(2).build().run(async move {
343 unblock(|| {
344 #[allow(clippy::let_underscore_future)]
345 let _ = Task::spawn(async {});
346 })
347 .await;
348 });
349 }
350
351 #[test]
352 fn future_destroyed_before_await_returns() {
353 LocalExecutor::default().run_singlethreaded(async {
354 let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new();
355
356 // Move the switch into a different thread.
357 // Once we return from this await, that switch should have been dropped.
358 unblock(move || {
359 let lock = sets_bool_true_on_drop.value.lock();
360 assert!(!*lock);
361 })
362 .await;
363
364 // Switch moved into the future should have been dropped at this point.
365 // The value of the boolean should now be true.
366 let lock = value.lock();
367 assert!(*lock);
368 });
369 }
370}