Skip to main content

libasync_dispatcher/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Safe bindings for the C libasync async dispatcher library
6
7#![deny(missing_docs, clippy::undocumented_unsafe_blocks)]
8
9use libasync_sys::*;
10
11use core::cell::UnsafeCell;
12use core::future::Future;
13use core::marker::PhantomData;
14use core::ptr::NonNull;
15use std::sync::Arc;
16
17use zx_status::Status;
18use zx_types::zx_time_t;
19
20mod task;
21
22pub use task::*;
23
24/// A reference to a dispatcher that supports the v4 async api's reference counting operations,
25/// and so can be held safely without a lifetime.
26#[derive(Debug)]
27pub struct AsyncDispatcher(NonNull<async_dispatcher_t>);
28
29// SAFETY: It is safe to access an `async_dispatcher_t` from any thread per the libasync C api.
30unsafe impl Send for AsyncDispatcher {}
31// SAFETY: It is safe to access an `async_dispatcher_t` from any thread per the libasync C api.
32unsafe impl Sync for AsyncDispatcher {}
33
34impl AsyncDispatcher {
35    /// Converts from something that implements [`AsAsyncDispatcherRef`] to an [`AsyncDispatcher`]
36    /// if it implements the v4 async api's reference counting.
37    ///
38    /// # Panics
39    ///
40    /// This will panic if the implementation does not support reference counting. If you need to be
41    /// able to deal with a dispatcher that might not implement this api, you can use
42    /// [`AsyncDispatcher::new`].
43    pub fn new(dispatcher: &impl AsAsyncDispatcherRef) -> Self {
44        Self::try_new(dispatcher).expect("Dispatcher does not implement reference counting")
45    }
46
47    /// Converts from something that implements [`AsAsyncDispatcherRef`] to an [`AsyncDispatcher`]
48    /// if it implements the v4 async api's reference counting.
49    ///
50    /// Returns [`Status::UNSUPPORTED`] if the dispatcher does not support refcounting.
51    pub fn try_new(dispatcher: &impl AsAsyncDispatcherRef) -> Result<Self, Status> {
52        let dispatcher = dispatcher.as_async_dispatcher_ref();
53        // SAFETY: The dispatcher is a valid reference to a live dispatcher by construction, and
54        // we will only return a new Self if the call succeeds, so we will not release an invalid
55        // reference.
56        Status::ok(unsafe { libasync_sys::async_acquire_shared_ref(dispatcher.0.as_ptr()) })?;
57        Ok(Self(dispatcher.0))
58    }
59}
60
61impl Clone for AsyncDispatcher {
62    fn clone(&self) -> Self {
63        Self::new(self)
64    }
65}
66
67impl Drop for AsyncDispatcher {
68    fn drop(&mut self) {
69        // SAFETY: The dispatcher is a valid reference to a live dispatcher by construction, and
70        // we have already successfully acquired the shared reference to it in [`Self::try_new`].
71        Status::ok(unsafe { libasync_sys::async_release_shared_ref(self.0.as_ptr()) })
72            .expect("attempted to release shared dispatcher ref that doesn't support refcounting");
73    }
74}
75
76impl AsAsyncDispatcherRef for AsyncDispatcher {
77    fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
78        AsyncDispatcherRef(self.0, PhantomData)
79    }
80}
81
82/// An unowned reference to a driver runtime dispatcher such as is produced by calling
83/// [`AsyncDispatcher::release`]. When this object goes out of scope it won't shut down the dispatcher,
84/// leaving that up to the driver runtime or another owner.
85#[derive(Debug, Copy, Clone)]
86pub struct AsyncDispatcherRef<'a>(NonNull<async_dispatcher_t>, PhantomData<&'a async_dispatcher_t>);
87
88// SAFETY: It is safe to access an `async_dispatcher_t` from any thread per the libasync C api.
89unsafe impl<'a> Send for AsyncDispatcherRef<'a> {}
90// SAFETY: It is safe to access an `async_dispatcher_t` from any thread per the libasync C api.
91unsafe impl<'a> Sync for AsyncDispatcherRef<'a> {}
92
93impl<'a> AsyncDispatcherRef<'a> {
94    /// Creates a dispatcher ref from a raw ptr.
95    ///
96    /// # Safety
97    ///
98    /// Caller is responsible for ensuring that the given ptr is valid for
99    /// the lifetime `'a`.
100    pub unsafe fn from_raw(ptr: NonNull<async_dispatcher_t>) -> Self {
101        // SAFETY: Caller promises the ptr is valid.
102        Self(ptr, PhantomData)
103    }
104
105    /// Gets the inner pointer to the dispatcher struct.
106    pub fn inner(&self) -> NonNull<async_dispatcher_t> {
107        self.0
108    }
109}
110
111/// A trait for things that can be represented as an [`AsyncDispatcherRef`].
112pub trait AsAsyncDispatcherRef: Send + Sync {
113    /// Gets an [`AsyncDispatcherRef`] corresponding to this object.
114    fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_>;
115
116    /// Schedules the callback [`p`] to be run on this dispatcher later.
117    fn post_task_sync(&self, p: impl TaskCallback) -> Result<(), Status> {
118        #[expect(clippy::arc_with_non_send_sync)]
119        let task_arc = Arc::new(UnsafeCell::new(TaskFunc {
120            task: async_task { handler: Some(TaskFunc::call), ..Default::default() },
121            func: Box::new(p),
122        }));
123
124        let task_cell = Arc::into_raw(task_arc);
125        // SAFETY: we need a raw mut pointer to give to async_post_task. From
126        // when we call that function to when the task is cancelled or the
127        // callback is called, the driver runtime owns the contents of that
128        // object and we will not manipulate it. So even though the Arc only
129        // gives us a shared reference, it's fine to give the runtime a
130        // mutable pointer to it.
131        let res = unsafe {
132            let task_ptr = &raw mut (*UnsafeCell::raw_get(task_cell)).task;
133            Status::ok(async_post_task(self.as_async_dispatcher_ref().0.as_ptr(), task_ptr))
134        };
135        if res.is_err() {
136            // SAFETY: `TaskFunc::call` will never be called now so dispose of
137            // the long-lived reference we just created.
138            unsafe { Arc::decrement_strong_count(task_cell) }
139        }
140        res
141    }
142
143    /// Returns the current time on the dispatcher's timeline
144    fn now(&self) -> zx_time_t {
145        let async_dispatcher = self.as_async_dispatcher_ref().0.as_ptr();
146        // SAFETY: The dispatcher returned from self.as_async_dispatcher_ref() is valid by
147        // construction.
148        unsafe { async_now(async_dispatcher) }
149    }
150}
151
152impl<'a> AsAsyncDispatcherRef for AsyncDispatcherRef<'a> {
153    fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
154        *self
155    }
156}
157
158/// A trait that can be used to access a lifetime-constrained dispatcher in a generic way.
159pub trait OnDispatcher: Clone + Send + Sync {
160    /// Runs the function `f` with a lifetime-bound [`AsyncDispatcherRef`] for this object's dispatcher.
161    /// If the dispatcher is no longer valid, the callback will be given [`None`].
162    ///
163    /// Note that it is *very important* that no blocking work be done in this callback to prevent
164    /// long lived strong references to dispatchers that might be shutting down.
165    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R;
166
167    /// Helper version of [`OnDispatcher::on_dispatcher`] that translates an invalidated dispatcher
168    /// handle into a [`Status::BAD_STATE`] error instead of giving the callback [`None`].
169    ///
170    /// Note that it is *very important* that no blocking work be done in this callback to prevent
171    /// long lived strong references to dispatchers that might be shutting down.
172    fn on_maybe_dispatcher<R, E: From<Status>>(
173        &self,
174        f: impl FnOnce(AsyncDispatcherRef<'_>) -> Result<R, E>,
175    ) -> Result<R, E> {
176        self.on_dispatcher(|dispatcher| {
177            let dispatcher = dispatcher.ok_or(Status::BAD_STATE)?;
178            f(dispatcher)
179        })
180    }
181
182    /// Spawn an asynchronous task on this dispatcher. If this returns [`Ok`] then the task has
183    /// successfully been scheduled and will run or be cancelled and dropped when the dispatcher
184    /// shuts down. The returned future's result will be [`Ok`] if the future completed
185    /// successfully, or an [`Err`] if the task did not complete for some reason (like the
186    /// dispatcher shut down).
187    ///
188    /// Returns a [`JoinHandle`] that will detach the future when dropped.
189    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) -> JoinHandle<()>
190    where
191        Self: 'static,
192    {
193        Task::start(future, self.clone()).detach_on_drop()
194    }
195
196    /// Spawn an asynchronous task that outputs type 'T' on this dispatcher. The returned future's
197    /// result will be [`Ok`] if the task was started and completed successfully, or an [`Err`] if
198    /// the task couldn't be started or failed to complete (for example because the dispatcher was
199    /// shutting down).
200    ///
201    /// Returns a [`Task`] that will cancel the future when dropped.
202    ///
203    /// TODO(470088116): This may be the cause of some flakes, so care should be used with it
204    /// in critical paths for now.
205    fn compute<T: Send + 'static>(
206        &self,
207        future: impl Future<Output = T> + Send + 'static,
208    ) -> Task<T>
209    where
210        Self: 'static,
211    {
212        Task::start(future, self.clone())
213    }
214}
215
216impl<T: AsAsyncDispatcherRef + Clone> OnDispatcher for T {
217    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
218        f(Some(self.as_async_dispatcher_ref()))
219    }
220}
221
222impl<T: AsAsyncDispatcherRef> OnDispatcher for Arc<T> {
223    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
224        f(Some(self.as_async_dispatcher_ref()))
225    }
226}
227
228/// A marker trait for a callback that can be used with [`Dispatcher::post_task_sync`].
229pub trait TaskCallback: FnOnce(Status) + 'static + Send {}
230impl<T> TaskCallback for T where T: FnOnce(Status) + 'static + Send {}
231
232#[repr(C)]
233struct TaskFunc {
234    task: async_task,
235    func: Box<dyn TaskCallback>,
236}
237
238impl TaskFunc {
239    extern "C" fn call(_dispatcher: *mut async_dispatcher, task: *mut async_task, status: i32) {
240        // SAFETY: the async api promises that this function will only be called
241        // up to once, so we can reconstitute the `Arc` and let it get dropped.
242        let task = unsafe { Arc::from_raw(task as *const UnsafeCell<Self>) };
243        // SAFETY: if we can't get a mut ref from the arc, then the task is already
244        // being cancelled, so we don't want to call it.
245        if let Ok(task) = Arc::try_unwrap(task) {
246            (task.into_inner().func)(Status::from_raw(status));
247        }
248    }
249}