libasync_dispatcher/lib.rs
1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Safe bindings for the C libasync async dispatcher library
6
7use libasync_sys::*;
8
9use core::cell::UnsafeCell;
10use core::future::Future;
11use core::marker::PhantomData;
12use core::ptr::NonNull;
13use std::sync::{Arc, Weak};
14
15use zx_status::Status;
16use zx_types::zx_time_t;
17
18mod task;
19
20pub use task::*;
21
22/// An unowned reference to a driver runtime dispatcher such as is produced by calling
23/// [`AsyncDispatcher::release`]. When this object goes out of scope it won't shut down the dispatcher,
24/// leaving that up to the driver runtime or another owner.
25#[derive(Debug)]
26pub struct AsyncDispatcherRef<'a>(NonNull<async_dispatcher_t>, PhantomData<&'a async_dispatcher_t>);
27
28unsafe impl<'a> Send for AsyncDispatcherRef<'a> {}
29unsafe impl<'a> Sync for AsyncDispatcherRef<'a> {}
30
31impl<'a> AsyncDispatcherRef<'a> {
32 /// Creates a dispatcher ref from a raw ptr.
33 ///
34 /// # Safety
35 ///
36 /// Caller is responsible for ensuring that the given ptr is valid for
37 /// the lifetime `'a`.
38 pub unsafe fn from_raw(ptr: NonNull<async_dispatcher_t>) -> Self {
39 // SAFETY: Caller promises the ptr is valid.
40 Self(ptr, PhantomData)
41 }
42
43 /// Gets the inner pointer to the dispatcher struct.
44 pub fn inner(&self) -> NonNull<async_dispatcher_t> {
45 self.0
46 }
47}
48
49impl<'a> Clone for AsyncDispatcherRef<'a> {
50 fn clone(&self) -> Self {
51 Self(self.0, PhantomData)
52 }
53}
54
55/// A trait for things that can be represented as an [`AsyncDispatcherRef`].
56pub trait AsyncDispatcher: Send + Sync {
57 /// Gets an [`AsyncDispatcherRef`] corresponding to this object.
58 fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_>;
59
60 /// Schedules the callback [`p`] to be run on this dispatcher later.
61 fn post_task_sync(&self, p: impl TaskCallback) -> Result<(), Status> {
62 #[expect(clippy::arc_with_non_send_sync)]
63 let task_arc = Arc::new(UnsafeCell::new(TaskFunc {
64 task: async_task { handler: Some(TaskFunc::call), ..Default::default() },
65 func: Box::new(p),
66 }));
67
68 let task_cell = Arc::into_raw(task_arc);
69 // SAFETY: we need a raw mut pointer to give to async_post_task. From
70 // when we call that function to when the task is cancelled or the
71 // callback is called, the driver runtime owns the contents of that
72 // object and we will not manipulate it. So even though the Arc only
73 // gives us a shared reference, it's fine to give the runtime a
74 // mutable pointer to it.
75 let res = unsafe {
76 let task_ptr = &raw mut (*UnsafeCell::raw_get(task_cell)).task;
77 Status::ok(async_post_task(self.as_async_dispatcher_ref().0.as_ptr(), task_ptr))
78 };
79 if res.is_err() {
80 // SAFETY: `TaskFunc::call` will never be called now so dispose of
81 // the long-lived reference we just created.
82 unsafe { Arc::decrement_strong_count(task_cell) }
83 }
84 res
85 }
86
87 /// Returns the current time on the dispatcher's timeline
88 fn now(&self) -> zx_time_t {
89 let async_dispatcher = self.as_async_dispatcher_ref().0.as_ptr();
90 unsafe { async_now(async_dispatcher) }
91 }
92}
93
94impl<'a> AsyncDispatcher for AsyncDispatcherRef<'a> {
95 fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
96 self.clone()
97 }
98}
99
100/// A trait that can be used to access a lifetime-constrained dispatcher in a generic way.
101pub trait OnDispatcher: Clone + Send + Sync {
102 /// Runs the function `f` with a lifetime-bound [`AsyncDispatcherRef`] for this object's dispatcher.
103 /// If the dispatcher is no longer valid, the callback will be given [`None`].
104 ///
105 /// Note that it is *very important* that no blocking work be done in this callback to prevent
106 /// long lived strong references to dispatchers that might be shutting down.
107 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R;
108
109 /// Helper version of [`OnDispatcher::on_dispatcher`] that translates an invalidated dispatcher
110 /// handle into a [`Status::BAD_STATE`] error instead of giving the callback [`None`].
111 ///
112 /// Note that it is *very important* that no blocking work be done in this callback to prevent
113 /// long lived strong references to dispatchers that might be shutting down.
114 fn on_maybe_dispatcher<R, E: From<Status>>(
115 &self,
116 f: impl FnOnce(AsyncDispatcherRef<'_>) -> Result<R, E>,
117 ) -> Result<R, E> {
118 self.on_dispatcher(|dispatcher| {
119 let dispatcher = dispatcher.ok_or(Status::BAD_STATE)?;
120 f(dispatcher)
121 })
122 }
123
124 /// Spawn an asynchronous task on this dispatcher. If this returns [`Ok`] then the task has
125 /// successfully been scheduled and will run or be cancelled and dropped when the dispatcher
126 /// shuts down. The returned future's result will be [`Ok`] if the future completed
127 /// successfully, or an [`Err`] if the task did not complete for some reason (like the
128 /// dispatcher shut down).
129 ///
130 /// Returns a [`JoinHandle`] that will detach the future when dropped.
131 fn spawn(
132 &self,
133 future: impl Future<Output = ()> + Send + 'static,
134 ) -> Result<JoinHandle<()>, Status>
135 where
136 Self: 'static,
137 {
138 Task::try_start(future, self.clone()).map(Task::detach_on_drop)
139 }
140
141 /// Spawn an asynchronous task that outputs type 'T' on this dispatcher. The returned future's
142 /// result will be [`Ok`] if the task was started and completed successfully, or an [`Err`] if
143 /// the task couldn't be started or failed to complete (for example because the dispatcher was
144 /// shutting down).
145 ///
146 /// Returns a [`Task`] that will cancel the future when dropped.
147 ///
148 /// TODO(470088116): This may be the cause of some flakes, so care should be used with it
149 /// in critical paths for now.
150 fn compute<T: Send + 'static>(
151 &self,
152 future: impl Future<Output = T> + Send + 'static,
153 ) -> Task<T>
154 where
155 Self: 'static,
156 {
157 Task::start(future, self.clone())
158 }
159}
160
161impl<D: AsyncDispatcher> OnDispatcher for &D {
162 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
163 f(Some(D::as_async_dispatcher_ref(*self)))
164 }
165}
166
167impl<'a> OnDispatcher for AsyncDispatcherRef<'a> {
168 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
169 f(Some(self.clone()))
170 }
171}
172
173impl<T: AsyncDispatcher> OnDispatcher for Arc<T> {
174 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
175 f(Some(self.as_async_dispatcher_ref()))
176 }
177}
178
179impl<T: AsyncDispatcher> OnDispatcher for Weak<T> {
180 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
181 let dispatcher = Weak::upgrade(self);
182 match dispatcher {
183 Some(dispatcher) => f(Some(dispatcher.as_async_dispatcher_ref())),
184 None => f(None),
185 }
186 }
187}
188
189/// A marker trait for a callback that can be used with [`Dispatcher::post_task_sync`].
190pub trait TaskCallback: FnOnce(Status) + 'static + Send {}
191impl<T> TaskCallback for T where T: FnOnce(Status) + 'static + Send {}
192
193#[repr(C)]
194struct TaskFunc {
195 task: async_task,
196 func: Box<dyn TaskCallback>,
197}
198
199impl TaskFunc {
200 extern "C" fn call(_dispatcher: *mut async_dispatcher, task: *mut async_task, status: i32) {
201 // SAFETY: the async api promises that this function will only be called
202 // up to once, so we can reconstitute the `Arc` and let it get dropped.
203 let task = unsafe { Arc::from_raw(task as *const UnsafeCell<Self>) };
204 // SAFETY: if we can't get a mut ref from the arc, then the task is already
205 // being cancelled, so we don't want to call it.
206 if let Ok(task) = Arc::try_unwrap(task) {
207 (task.into_inner().func)(Status::from_raw(status));
208 }
209 }
210}