fuchsia_async/runtime/fuchsia/task.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::EHandle;
6use crate::runtime::fuchsia::executor::TaskHandle;
7use crate::runtime::fuchsia::scope::JoinError;
8use crate::scope::ScopeHandle;
9use futures::prelude::*;
10use std::future::poll_fn;
11use std::marker::PhantomData;
12use std::mem::ManuallyDrop;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// A handle to a future that is owned and polled by the executor.
17///
18/// Once a task is created, the executor will poll it until done, even if the task handle itself is
19/// not polled.
20///
21/// NOTE: When a JoinHandle is dropped, its future will be detached.
22///
23/// Polling (or attempting to extract the value from) a task after the executor is dropped may
24/// trigger a panic.
25#[derive(Debug)]
26// LINT.IfChange
27pub struct JoinHandle<T> {
28 scope: ScopeHandle,
29 task: Option<TaskHandle>,
30 phantom: PhantomData<T>,
31}
32// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
33
34impl<T> Unpin for JoinHandle<T> {}
35
36impl<T> JoinHandle<T> {
37 pub(crate) fn new(scope: ScopeHandle, task: TaskHandle) -> Self {
38 Self { scope, task: Some(task), phantom: PhantomData }
39 }
40
41 /// Aborts a task and returns a future that resolves once the task is
42 /// aborted. The future can be ignored in which case the task will still be
43 /// aborted.
44 pub fn abort(mut self) -> impl Future<Output = Option<T>> {
45 // SAFETY: We spawned the task so the return type should be correct.
46 let result = self.task.as_ref().and_then(|t| unsafe { self.scope.abort_task(t) });
47 // TODO(https://fxbug.dev/452064816): The compiler throws a false
48 // positive linter warning because it thinks that `self.task = None;` is
49 // never read, even though it is read in the Drop implementation below.
50 #[allow(unused_assignments)]
51 async move {
52 match result {
53 Some(output) => Some(output),
54 None => {
55 // If we are dropped from here, we'll end up calling `abort_and_detach`.
56 let result = std::future::poll_fn(|cx| {
57 let Some(task) = self.task.as_ref() else {
58 return Poll::Ready(None);
59 };
60 // SAFETY: We spawned the task so the return type should be correct.
61 unsafe { self.scope.poll_aborted(task, cx) }
62 })
63 .await;
64 self.task = None;
65 result
66 }
67 }
68 }
69 }
70}
71
72impl<T> Drop for JoinHandle<T> {
73 fn drop(&mut self) {
74 if let Some(task) = &self.task {
75 self.scope.detach(task);
76 }
77 }
78}
79
80impl<T: 'static> Future for JoinHandle<T> {
81 type Output = T;
82 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83 // SAFETY: We spawned the task so the return type should be correct.
84 let result = unsafe { self.scope.poll_join_result(self.task.as_ref().unwrap(), cx) };
85 if result.is_ready() {
86 self.task = None;
87 }
88 result
89 }
90}
91
92/// A `JoinHandle` which returns `Err` when canceled instead of pending forever.
93#[derive(Debug)]
94pub struct CancelableJoinHandle<T> {
95 inner: JoinHandle<T>,
96}
97
98impl<T> CancelableJoinHandle<T> {
99 /// Aborts a task and returns a future that resolves once the task is
100 /// aborted. The future can be ignored in which case the task will still be
101 /// aborted.
102 pub fn abort(self) -> impl Future<Output = Option<T>> {
103 self.inner.abort()
104 }
105}
106
107impl<T> From<JoinHandle<T>> for CancelableJoinHandle<T> {
108 fn from(inner: JoinHandle<T>) -> Self {
109 Self { inner }
110 }
111}
112
113impl<T: 'static> Future for CancelableJoinHandle<T> {
114 type Output = Result<T, JoinError>;
115
116 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
117 // SAFETY: We spawned the task so the return type should be correct.
118 let result =
119 unsafe { self.inner.scope.try_poll_join_result(self.inner.task.as_ref().unwrap(), cx) };
120 if result.is_ready() {
121 self.inner.task = None;
122 }
123 result
124 }
125}
126
127/// This is the same as a JoinHandle, except that the future will be aborted when the task is
128/// dropped.
129#[must_use]
130#[repr(transparent)]
131#[derive(Debug)]
132// LINT.IfChange
133pub struct Task<T>(JoinHandle<T>);
134// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
135
136impl<T> Task<T> {
137 /// Returns a `JoinHandle` which will have detach-on-drop semantics.
138 pub fn detach_on_drop(self) -> JoinHandle<T> {
139 let this = ManuallyDrop::new(self);
140 // SAFETY: We are bypassing our drop implementation.
141 unsafe { std::ptr::read(&this.0) }
142 }
143}
144
145impl Task<()> {
146 /// Detach this task so that it can run independently in the background.
147 ///
148 /// *Note*: This is usually not what you want. This API severs the control flow from the
149 /// caller. This can result in flaky tests and makes it impossible to return values
150 /// (including errors).
151 ///
152 /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
153 ///
154 /// You can also use other futures combinators such as:
155 ///
156 /// * [`futures::future::join`]
157 /// * [`futures::future::select`]
158 /// * [`futures::select`]
159 ///
160 /// or their error-aware variants
161 ///
162 /// * [`futures::future::try_join`]
163 /// * [`futures::future::try_select`]
164 ///
165 /// or their stream counterparts
166 ///
167 /// * [`futures::stream::StreamExt::for_each`]
168 /// * [`futures::stream::StreamExt::for_each_concurrent`]
169 /// * [`futures::stream::TryStreamExt::try_for_each`]
170 /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
171 ///
172 /// can meet your needs.
173 pub fn detach(mut self) {
174 self.0.scope.detach(self.0.task.as_ref().unwrap());
175 self.0.task = None;
176 }
177}
178
179impl<T: Send + 'static> Task<T> {
180 /// Spawn a new task on the global scope of the current executor.
181 ///
182 /// The task may be executed on any thread(s) owned by the current executor.
183 /// See [`Task::local`] for an equivalent that ensures locality.
184 ///
185 /// The passed future will live until either (a) the future completes,
186 /// (b) the returned [`Task`] is dropped while the executor is running, or
187 /// (c) the executor is destroyed; whichever comes first.
188 ///
189 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
190 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
191 ///
192 /// # Panics
193 ///
194 /// May panic if not called in the context of an executor (e.g. within a
195 /// call to [`run`][crate::SendExecutor::run]).
196 pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
197 EHandle::local().global_scope().compute(future)
198 }
199}
200
201impl<T: 'static> Task<T> {
202 /// Spawn a new task on the global scope of the thread local executor.
203 ///
204 /// The passed future will live until either (a) the future completes,
205 /// (b) the returned [`Task`] is dropped while the executor is running, or
206 /// (c) the executor is destroyed; whichever comes first.
207 ///
208 /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
209 /// runtime panic. Use [`Task::spawn`] instead.
210 ///
211 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
212 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
213 ///
214 /// # Panics
215 ///
216 /// May panic if not called in the context of an executor (e.g. within a
217 /// call to [`run`][crate::SendExecutor::run]).
218 pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
219 EHandle::local().global_scope().compute_local(future)
220 }
221}
222
223impl<T: 'static> Task<T> {
224 /// Aborts a task and returns a future that resolves once the task is
225 /// aborted. The future can be ignored in which case the task will still be
226 /// aborted.
227 pub fn abort(self) -> impl Future<Output = Option<T>> {
228 self.detach_on_drop().abort()
229 }
230}
231
232impl<T: 'static> Future for Task<T> {
233 type Output = T;
234 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
235 // SAFETY: We spawned the task so the return type should be correct.
236 let result = unsafe { self.0.scope.poll_join_result(self.0.task.as_ref().unwrap(), cx) };
237 if result.is_ready() {
238 self.0.task = None;
239 }
240 result
241 }
242}
243
244impl<T> Drop for Task<T> {
245 fn drop(&mut self) {
246 if let Some(task) = &self.0.task {
247 self.0.scope.abort_and_detach(task);
248 self.0.task = None;
249 }
250 }
251}
252
253impl<T> From<JoinHandle<T>> for Task<T> {
254 fn from(value: JoinHandle<T>) -> Self {
255 Self(value)
256 }
257}
258
259/// Offload a blocking function call onto a different thread.
260///
261/// This function can be called from an asynchronous function without blocking
262/// it, returning a future that can be `.await`ed normally. The provided
263/// function should contain at least one blocking operation, such as:
264///
265/// - A synchronous syscall that does not yet have an async counterpart.
266/// - A compute operation which risks blocking the executor for an unacceptable
267/// amount of time.
268///
269/// If neither of these conditions are satisfied, just call the function normally,
270/// as synchronous functions themselves are allowed within an async context,
271/// as long as they are not blocking.
272///
273/// If you have an async function that may block, refactor the function such that
274/// the blocking operations are offloaded onto the function passed to [`unblock`].
275///
276/// NOTE:
277///
278/// - The input function should not interact with the executor. Attempting to do so
279/// can cause runtime errors. This includes spawning, creating new executors,
280/// passing futures between the input function and the calling context, and
281/// in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
282/// - Synchronous functions cannot be cancelled and may keep running after
283/// the returned future is dropped. As a result, resources held by the function
284/// should be assumed to be held until the returned future completes.
285/// - This function assumes panic=abort semantics, so if the input function panics,
286/// the process aborts. Behavior for panic=unwind is not defined.
287// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
288// spawning new threads if this proves to be a bottleneck.
289pub fn unblock<T: 'static + Send>(
290 f: impl 'static + Send + FnOnce() -> T,
291) -> impl 'static + Send + Future<Output = T> {
292 let (tx, rx) = futures::channel::oneshot::channel();
293 std::thread::spawn(move || {
294 let _ = tx.send(f());
295 });
296 rx.map(|r| r.unwrap())
297}
298
299/// Yields execution back to the runtime.
300pub async fn yield_now() {
301 let mut done = false;
302 poll_fn(|cx| {
303 if done {
304 Poll::Ready(())
305 } else {
306 done = true;
307 cx.waker().wake_by_ref();
308 Poll::Pending
309 }
310 })
311 .await;
312}
313
314#[cfg(test)]
315mod tests {
316 use super::super::executor::{LocalExecutor, SendExecutorBuilder};
317 use super::*;
318 use fuchsia_sync::Mutex;
319 use std::sync::Arc;
320
321 /// This struct holds a thread-safe mutable boolean and
322 /// sets its value to true when dropped.
323 #[derive(Clone)]
324 struct SetsBoolTrueOnDrop {
325 value: Arc<Mutex<bool>>,
326 }
327
328 impl SetsBoolTrueOnDrop {
329 fn new() -> (Self, Arc<Mutex<bool>>) {
330 let value = Arc::new(Mutex::new(false));
331 let sets_bool_true_on_drop = Self { value: value.clone() };
332 (sets_bool_true_on_drop, value)
333 }
334 }
335
336 impl Drop for SetsBoolTrueOnDrop {
337 fn drop(&mut self) {
338 let mut lock = self.value.lock();
339 *lock = true;
340 }
341 }
342
343 #[test]
344 #[should_panic]
345 fn spawn_from_unblock_fails() {
346 // no executor in the off-thread, so spawning fails
347 SendExecutorBuilder::new().num_threads(2).build().run(async move {
348 unblock(|| {
349 #[allow(clippy::let_underscore_future)]
350 let _ = Task::spawn(async {});
351 })
352 .await;
353 });
354 }
355
356 #[test]
357 fn future_destroyed_before_await_returns() {
358 LocalExecutor::default().run_singlethreaded(async {
359 let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new();
360
361 // Move the switch into a different thread.
362 // Once we return from this await, that switch should have been dropped.
363 unblock(move || {
364 let lock = sets_bool_true_on_drop.value.lock();
365 assert!(!*lock);
366 })
367 .await;
368
369 // Switch moved into the future should have been dropped at this point.
370 // The value of the boolean should now be true.
371 let lock = value.lock();
372 assert!(*lock);
373 });
374 }
375}