libasync_dispatcher/lib.rs
1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Safe bindings for the C libasync async dispatcher library
6
7#![deny(missing_docs, clippy::undocumented_unsafe_blocks)]
8
9use libasync_sys::*;
10
11use core::cell::UnsafeCell;
12use core::future::Future;
13use core::marker::PhantomData;
14use core::ptr::NonNull;
15use std::sync::Arc;
16
17use zx_status::Status;
18use zx_types::zx_time_t;
19
20mod task;
21
22pub use task::*;
23
24/// A reference to a dispatcher that supports the v4 async api's reference counting operations,
25/// and so can be held safely without a lifetime.
26#[derive(Debug)]
27pub struct AsyncDispatcher(NonNull<async_dispatcher_t>);
28
29// SAFETY: It is safe to access an `async_dispatcher_t` from any thread per the libasync C api.
30unsafe impl Send for AsyncDispatcher {}
31// SAFETY: It is safe to access an `async_dispatcher_t` from any thread per the libasync C api.
32unsafe impl Sync for AsyncDispatcher {}
33
34impl AsyncDispatcher {
35 /// Converts from something that implements [`AsAsyncDispatcherRef`] to an [`AsyncDispatcher`]
36 /// if it implements the v4 async api's reference counting.
37 ///
38 /// # Panics
39 ///
40 /// This will panic if the implementation does not support reference counting. If you need to be
41 /// able to deal with a dispatcher that might not implement this api, you can use
42 /// [`AsyncDispatcher::new`].
43 pub fn new(dispatcher: &impl AsAsyncDispatcherRef) -> Self {
44 Self::try_new(dispatcher).expect("Dispatcher does not implement reference counting")
45 }
46
47 /// Converts from something that implements [`AsAsyncDispatcherRef`] to an [`AsyncDispatcher`]
48 /// if it implements the v4 async api's reference counting.
49 ///
50 /// Returns [`Status::UNSUPPORTED`] if the dispatcher does not support refcounting.
51 pub fn try_new(dispatcher: &impl AsAsyncDispatcherRef) -> Result<Self, Status> {
52 let dispatcher = dispatcher.as_async_dispatcher_ref();
53 // SAFETY: The dispatcher is a valid reference to a live dispatcher by construction, and
54 // we will only return a new Self if the call succeeds, so we will not release an invalid
55 // reference.
56 Status::ok(unsafe { libasync_sys::async_acquire_shared_ref(dispatcher.0.as_ptr()) })?;
57 Ok(Self(dispatcher.0))
58 }
59}
60
61impl Clone for AsyncDispatcher {
62 fn clone(&self) -> Self {
63 Self::new(self)
64 }
65}
66
67impl Drop for AsyncDispatcher {
68 fn drop(&mut self) {
69 // SAFETY: The dispatcher is a valid reference to a live dispatcher by construction, and
70 // we have already successfully acquired the shared reference to it in [`Self::try_new`].
71 Status::ok(unsafe { libasync_sys::async_release_shared_ref(self.0.as_ptr()) })
72 .expect("attempted to release shared dispatcher ref that doesn't support refcounting");
73 }
74}
75
76impl AsAsyncDispatcherRef for AsyncDispatcher {
77 fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
78 AsyncDispatcherRef(self.0, PhantomData)
79 }
80}
81
82/// An unowned reference to a driver runtime dispatcher such as is produced by calling
83/// [`AsyncDispatcher::release`]. When this object goes out of scope it won't shut down the dispatcher,
84/// leaving that up to the driver runtime or another owner.
85#[derive(Debug, Copy, Clone)]
86pub struct AsyncDispatcherRef<'a>(NonNull<async_dispatcher_t>, PhantomData<&'a async_dispatcher_t>);
87
88// SAFETY: It is safe to access an `async_dispatcher_t` from any thread per the libasync C api.
89unsafe impl<'a> Send for AsyncDispatcherRef<'a> {}
90// SAFETY: It is safe to access an `async_dispatcher_t` from any thread per the libasync C api.
91unsafe impl<'a> Sync for AsyncDispatcherRef<'a> {}
92
93impl<'a> AsyncDispatcherRef<'a> {
94 /// Creates a dispatcher ref from a raw ptr.
95 ///
96 /// # Safety
97 ///
98 /// Caller is responsible for ensuring that the given ptr is valid for
99 /// the lifetime `'a`.
100 pub unsafe fn from_raw(ptr: NonNull<async_dispatcher_t>) -> Self {
101 // SAFETY: Caller promises the ptr is valid.
102 Self(ptr, PhantomData)
103 }
104
105 /// Gets the inner pointer to the dispatcher struct.
106 pub fn inner(&self) -> NonNull<async_dispatcher_t> {
107 self.0
108 }
109}
110
111/// A trait for things that can be represented as an [`AsyncDispatcherRef`].
112pub trait AsAsyncDispatcherRef: Send + Sync {
113 /// Gets an [`AsyncDispatcherRef`] corresponding to this object.
114 fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_>;
115
116 /// Schedules the callback [`p`] to be run on this dispatcher later.
117 fn post_task_sync(&self, p: impl TaskCallback) -> Result<(), Status> {
118 #[expect(clippy::arc_with_non_send_sync)]
119 let task_arc = Arc::new(UnsafeCell::new(TaskFunc {
120 task: async_task { handler: Some(TaskFunc::call), ..Default::default() },
121 func: Box::new(p),
122 }));
123
124 let task_cell = Arc::into_raw(task_arc);
125 // SAFETY: we need a raw mut pointer to give to async_post_task. From
126 // when we call that function to when the task is cancelled or the
127 // callback is called, the driver runtime owns the contents of that
128 // object and we will not manipulate it. So even though the Arc only
129 // gives us a shared reference, it's fine to give the runtime a
130 // mutable pointer to it.
131 let res = unsafe {
132 let task_ptr = &raw mut (*UnsafeCell::raw_get(task_cell)).task;
133 Status::ok(async_post_task(self.as_async_dispatcher_ref().0.as_ptr(), task_ptr))
134 };
135 if res.is_err() {
136 // SAFETY: `TaskFunc::call` will never be called now so dispose of
137 // the long-lived reference we just created.
138 unsafe { Arc::decrement_strong_count(task_cell) }
139 }
140 res
141 }
142
143 /// Returns the current time on the dispatcher's timeline
144 fn now(&self) -> zx_time_t {
145 let async_dispatcher = self.as_async_dispatcher_ref().0.as_ptr();
146 // SAFETY: The dispatcher returned from self.as_async_dispatcher_ref() is valid by
147 // construction.
148 unsafe { async_now(async_dispatcher) }
149 }
150}
151
152impl<'a> AsAsyncDispatcherRef for AsyncDispatcherRef<'a> {
153 fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
154 *self
155 }
156}
157
158/// A trait that can be used to access a lifetime-constrained dispatcher in a generic way.
159pub trait OnDispatcher: Clone + Send + Sync {
160 /// Runs the function `f` with a lifetime-bound [`AsyncDispatcherRef`] for this object's dispatcher.
161 /// If the dispatcher is no longer valid, the callback will be given [`None`].
162 ///
163 /// Note that it is *very important* that no blocking work be done in this callback to prevent
164 /// long lived strong references to dispatchers that might be shutting down.
165 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R;
166
167 /// Helper version of [`OnDispatcher::on_dispatcher`] that translates an invalidated dispatcher
168 /// handle into a [`Status::BAD_STATE`] error instead of giving the callback [`None`].
169 ///
170 /// Note that it is *very important* that no blocking work be done in this callback to prevent
171 /// long lived strong references to dispatchers that might be shutting down.
172 fn on_maybe_dispatcher<R, E: From<Status>>(
173 &self,
174 f: impl FnOnce(AsyncDispatcherRef<'_>) -> Result<R, E>,
175 ) -> Result<R, E> {
176 self.on_dispatcher(|dispatcher| {
177 let dispatcher = dispatcher.ok_or(Status::BAD_STATE)?;
178 f(dispatcher)
179 })
180 }
181
182 /// Spawn an asynchronous task on this dispatcher. If this returns [`Ok`] then the task has
183 /// successfully been scheduled and will run or be cancelled and dropped when the dispatcher
184 /// shuts down. The returned future's result will be [`Ok`] if the future completed
185 /// successfully, or an [`Err`] if the task did not complete for some reason (like the
186 /// dispatcher shut down).
187 ///
188 /// Returns a [`JoinHandle`] that will detach the future when dropped.
189 fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) -> JoinHandle<()>
190 where
191 Self: 'static,
192 {
193 Task::start(future, self.clone()).detach_on_drop()
194 }
195
196 /// Spawn an asynchronous task that outputs type 'T' on this dispatcher. The returned future's
197 /// result will be [`Ok`] if the task was started and completed successfully, or an [`Err`] if
198 /// the task couldn't be started or failed to complete (for example because the dispatcher was
199 /// shutting down).
200 ///
201 /// Returns a [`Task`] that will cancel the future when dropped.
202 ///
203 /// TODO(470088116): This may be the cause of some flakes, so care should be used with it
204 /// in critical paths for now.
205 fn compute<T: Send + 'static>(
206 &self,
207 future: impl Future<Output = T> + Send + 'static,
208 ) -> Task<T>
209 where
210 Self: 'static,
211 {
212 Task::start(future, self.clone())
213 }
214}
215
216impl<T: AsAsyncDispatcherRef + Clone> OnDispatcher for T {
217 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
218 f(Some(self.as_async_dispatcher_ref()))
219 }
220}
221
222impl<T: AsAsyncDispatcherRef> OnDispatcher for Arc<T> {
223 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
224 f(Some(self.as_async_dispatcher_ref()))
225 }
226}
227
228/// A marker trait for a callback that can be used with [`Dispatcher::post_task_sync`].
229pub trait TaskCallback: FnOnce(Status) + 'static + Send {}
230impl<T> TaskCallback for T where T: FnOnce(Status) + 'static + Send {}
231
232#[repr(C)]
233struct TaskFunc {
234 task: async_task,
235 func: Box<dyn TaskCallback>,
236}
237
238impl TaskFunc {
239 extern "C" fn call(_dispatcher: *mut async_dispatcher, task: *mut async_task, status: i32) {
240 // SAFETY: the async api promises that this function will only be called
241 // up to once, so we can reconstitute the `Arc` and let it get dropped.
242 let task = unsafe { Arc::from_raw(task as *const UnsafeCell<Self>) };
243 // SAFETY: if we can't get a mut ref from the arc, then the task is already
244 // being cancelled, so we don't want to call it.
245 if let Ok(task) = Arc::try_unwrap(task) {
246 (task.into_inner().func)(Status::from_raw(status));
247 }
248 }
249}