Skip to main content

libasync_dispatcher/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Safe bindings for the C libasync async dispatcher library
6
7use libasync_sys::*;
8
9use core::cell::UnsafeCell;
10use core::future::Future;
11use core::marker::PhantomData;
12use core::ptr::NonNull;
13use std::sync::{Arc, Weak};
14
15use zx_status::Status;
16use zx_types::zx_time_t;
17
18mod task;
19
20pub use task::*;
21
22/// An unowned reference to a driver runtime dispatcher such as is produced by calling
23/// [`AsyncDispatcher::release`]. When this object goes out of scope it won't shut down the dispatcher,
24/// leaving that up to the driver runtime or another owner.
25#[derive(Debug)]
26pub struct AsyncDispatcherRef<'a>(NonNull<async_dispatcher_t>, PhantomData<&'a async_dispatcher_t>);
27
28unsafe impl<'a> Send for AsyncDispatcherRef<'a> {}
29unsafe impl<'a> Sync for AsyncDispatcherRef<'a> {}
30
31impl<'a> AsyncDispatcherRef<'a> {
32    /// Creates a dispatcher ref from a raw ptr.
33    ///
34    /// # Safety
35    ///
36    /// Caller is responsible for ensuring that the given ptr is valid for
37    /// the lifetime `'a`.
38    pub unsafe fn from_raw(ptr: NonNull<async_dispatcher_t>) -> Self {
39        // SAFETY: Caller promises the ptr is valid.
40        Self(ptr, PhantomData)
41    }
42
43    /// Gets the inner pointer to the dispatcher struct.
44    pub fn inner(&self) -> NonNull<async_dispatcher_t> {
45        self.0
46    }
47}
48
49impl<'a> Clone for AsyncDispatcherRef<'a> {
50    fn clone(&self) -> Self {
51        Self(self.0, PhantomData)
52    }
53}
54
55/// A trait for things that can be represented as an [`AsyncDispatcherRef`].
56pub trait AsyncDispatcher: Send + Sync {
57    /// Gets an [`AsyncDispatcherRef`] corresponding to this object.
58    fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_>;
59
60    /// Schedules the callback [`p`] to be run on this dispatcher later.
61    fn post_task_sync(&self, p: impl TaskCallback) -> Result<(), Status> {
62        #[expect(clippy::arc_with_non_send_sync)]
63        let task_arc = Arc::new(UnsafeCell::new(TaskFunc {
64            task: async_task { handler: Some(TaskFunc::call), ..Default::default() },
65            func: Box::new(p),
66        }));
67
68        let task_cell = Arc::into_raw(task_arc);
69        // SAFETY: we need a raw mut pointer to give to async_post_task. From
70        // when we call that function to when the task is cancelled or the
71        // callback is called, the driver runtime owns the contents of that
72        // object and we will not manipulate it. So even though the Arc only
73        // gives us a shared reference, it's fine to give the runtime a
74        // mutable pointer to it.
75        let res = unsafe {
76            let task_ptr = &raw mut (*UnsafeCell::raw_get(task_cell)).task;
77            Status::ok(async_post_task(self.as_async_dispatcher_ref().0.as_ptr(), task_ptr))
78        };
79        if res.is_err() {
80            // SAFETY: `TaskFunc::call` will never be called now so dispose of
81            // the long-lived reference we just created.
82            unsafe { Arc::decrement_strong_count(task_cell) }
83        }
84        res
85    }
86
87    /// Returns the current time on the dispatcher's timeline
88    fn now(&self) -> zx_time_t {
89        let async_dispatcher = self.as_async_dispatcher_ref().0.as_ptr();
90        unsafe { async_now(async_dispatcher) }
91    }
92}
93
94impl<'a> AsyncDispatcher for AsyncDispatcherRef<'a> {
95    fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
96        self.clone()
97    }
98}
99
100/// A trait that can be used to access a lifetime-constrained dispatcher in a generic way.
101pub trait OnDispatcher: Clone + Send + Sync {
102    /// Runs the function `f` with a lifetime-bound [`AsyncDispatcherRef`] for this object's dispatcher.
103    /// If the dispatcher is no longer valid, the callback will be given [`None`].
104    ///
105    /// Note that it is *very important* that no blocking work be done in this callback to prevent
106    /// long lived strong references to dispatchers that might be shutting down.
107    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R;
108
109    /// Helper version of [`OnDispatcher::on_dispatcher`] that translates an invalidated dispatcher
110    /// handle into a [`Status::BAD_STATE`] error instead of giving the callback [`None`].
111    ///
112    /// Note that it is *very important* that no blocking work be done in this callback to prevent
113    /// long lived strong references to dispatchers that might be shutting down.
114    fn on_maybe_dispatcher<R, E: From<Status>>(
115        &self,
116        f: impl FnOnce(AsyncDispatcherRef<'_>) -> Result<R, E>,
117    ) -> Result<R, E> {
118        self.on_dispatcher(|dispatcher| {
119            let dispatcher = dispatcher.ok_or(Status::BAD_STATE)?;
120            f(dispatcher)
121        })
122    }
123
124    /// Spawn an asynchronous task on this dispatcher. If this returns [`Ok`] then the task has
125    /// successfully been scheduled and will run or be cancelled and dropped when the dispatcher
126    /// shuts down. The returned future's result will be [`Ok`] if the future completed
127    /// successfully, or an [`Err`] if the task did not complete for some reason (like the
128    /// dispatcher shut down).
129    ///
130    /// Returns a [`JoinHandle`] that will detach the future when dropped.
131    fn spawn(
132        &self,
133        future: impl Future<Output = ()> + Send + 'static,
134    ) -> Result<JoinHandle<()>, Status>
135    where
136        Self: 'static,
137    {
138        Task::try_start(future, self.clone()).map(Task::detach_on_drop)
139    }
140
141    /// Spawn an asynchronous task that outputs type 'T' on this dispatcher. The returned future's
142    /// result will be [`Ok`] if the task was started and completed successfully, or an [`Err`] if
143    /// the task couldn't be started or failed to complete (for example because the dispatcher was
144    /// shutting down).
145    ///
146    /// Returns a [`Task`] that will cancel the future when dropped.
147    ///
148    /// TODO(470088116): This may be the cause of some flakes, so care should be used with it
149    /// in critical paths for now.
150    fn compute<T: Send + 'static>(
151        &self,
152        future: impl Future<Output = T> + Send + 'static,
153    ) -> Task<T>
154    where
155        Self: 'static,
156    {
157        Task::start(future, self.clone())
158    }
159}
160
161impl<D: AsyncDispatcher> OnDispatcher for &D {
162    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
163        f(Some(D::as_async_dispatcher_ref(*self)))
164    }
165}
166
167impl<'a> OnDispatcher for AsyncDispatcherRef<'a> {
168    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
169        f(Some(self.clone()))
170    }
171}
172
173impl<T: AsyncDispatcher> OnDispatcher for Arc<T> {
174    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
175        f(Some(self.as_async_dispatcher_ref()))
176    }
177}
178
179impl<T: AsyncDispatcher> OnDispatcher for Weak<T> {
180    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
181        let dispatcher = Weak::upgrade(self);
182        match dispatcher {
183            Some(dispatcher) => f(Some(dispatcher.as_async_dispatcher_ref())),
184            None => f(None),
185        }
186    }
187}
188
189/// A marker trait for a callback that can be used with [`Dispatcher::post_task_sync`].
190pub trait TaskCallback: FnOnce(Status) + 'static + Send {}
191impl<T> TaskCallback for T where T: FnOnce(Status) + 'static + Send {}
192
193#[repr(C)]
194struct TaskFunc {
195    task: async_task,
196    func: Box<dyn TaskCallback>,
197}
198
199impl TaskFunc {
200    extern "C" fn call(_dispatcher: *mut async_dispatcher, task: *mut async_task, status: i32) {
201        // SAFETY: the async api promises that this function will only be called
202        // up to once, so we can reconstitute the `Arc` and let it get dropped.
203        let task = unsafe { Arc::from_raw(task as *const UnsafeCell<Self>) };
204        // SAFETY: if we can't get a mut ref from the arc, then the task is already
205        // being cancelled, so we don't want to call it.
206        if let Ok(task) = Arc::try_unwrap(task) {
207            (task.into_inner().func)(Status::from_raw(status));
208        }
209    }
210}