libasync/
dispatcher.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Safe bindings for the C libasync async dispatcher library
6
7use libasync_sys::*;
8
9use core::cell::UnsafeCell;
10use core::future::Future;
11use core::marker::PhantomData;
12use core::ptr::NonNull;
13use std::sync::{Arc, Weak};
14
15use zx::Status;
16
17mod after_deadline;
18mod task;
19
20pub use after_deadline::*;
21pub use task::*;
22
/// An unowned reference to a driver runtime dispatcher such as is produced by calling
/// [`AsyncDispatcher::as_async_dispatcher_ref`]. When this object goes out of scope it won't shut
/// down the dispatcher, leaving that up to the driver runtime or another owner.
///
/// The first field is the raw dispatcher pointer. The [`PhantomData`] field carries the lifetime
/// `'a` for which that pointer may be used, without storing an actual reference.
#[derive(Debug)]
pub struct AsyncDispatcherRef<'a>(NonNull<async_dispatcher_t>, PhantomData<&'a async_dispatcher_t>);
28
29unsafe impl<'a> Send for AsyncDispatcherRef<'a> {}
30unsafe impl<'a> Sync for AsyncDispatcherRef<'a> {}
31
32impl<'a> AsyncDispatcherRef<'a> {
33    /// Creates a dispatcher ref from a raw ptr.
34    ///
35    /// # Safety
36    ///
37    /// Caller is responsible for ensuring that the given ptr is valid for
38    /// the lifetime `'a`.
39    pub unsafe fn from_raw(ptr: NonNull<async_dispatcher_t>) -> Self {
40        // SAFETY: Caller promises the ptr is valid.
41        Self(ptr, PhantomData)
42    }
43
44    /// Gets the inner pointer to the dispatcher struct.
45    pub fn inner(&self) -> NonNull<async_dispatcher_t> {
46        self.0
47    }
48}
49
50impl<'a> Clone for AsyncDispatcherRef<'a> {
51    fn clone(&self) -> Self {
52        Self(self.0, PhantomData)
53    }
54}
55
56/// A trait for things that can be represented as an [`AsyncDispatcherRef`].
57pub trait AsyncDispatcher: Send + Sync {
58    /// Gets an [`AsyncDispatcherRef`] corresponding to this object.
59    fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_>;
60
61    /// Schedules the callback [`p`] to be run on this dispatcher later.
62    fn post_task_sync(&self, p: impl TaskCallback) -> Result<(), Status> {
63        #[expect(clippy::arc_with_non_send_sync)]
64        let task_arc = Arc::new(UnsafeCell::new(TaskFunc {
65            task: async_task { handler: Some(TaskFunc::call), ..Default::default() },
66            func: Box::new(p),
67        }));
68
69        let task_cell = Arc::into_raw(task_arc);
70        // SAFETY: we need a raw mut pointer to give to async_post_task. From
71        // when we call that function to when the task is cancelled or the
72        // callback is called, the driver runtime owns the contents of that
73        // object and we will not manipulate it. So even though the Arc only
74        // gives us a shared reference, it's fine to give the runtime a
75        // mutable pointer to it.
76        let res = unsafe {
77            let task_ptr = &raw mut (*UnsafeCell::raw_get(task_cell)).task;
78            Status::ok(async_post_task(self.as_async_dispatcher_ref().0.as_ptr(), task_ptr))
79        };
80        if res.is_err() {
81            // SAFETY: `TaskFunc::call` will never be called now so dispose of
82            // the long-lived reference we just created.
83            unsafe { Arc::decrement_strong_count(task_cell) }
84        }
85        res
86    }
87
88    /// Returns the current time on the dispatcher's timeline
89    fn now(&self) -> zx::MonotonicInstant {
90        let async_dispatcher = self.as_async_dispatcher_ref().0.as_ptr();
91        let now_nanos = unsafe { async_now(async_dispatcher) };
92        zx::MonotonicInstant::from_nanos(now_nanos)
93    }
94}
95
96impl<'a> AsyncDispatcher for AsyncDispatcherRef<'a> {
97    fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
98        self.clone()
99    }
100}
101
102/// A trait that can be used to access a lifetime-constrained dispatcher in a generic way.
103pub trait OnDispatcher: Clone + Send + Sync {
104    /// Runs the function `f` with a lifetime-bound [`AsyncDispatcherRef`] for this object's dispatcher.
105    /// If the dispatcher is no longer valid, the callback will be given [`None`].
106    ///
107    /// Note that it is *very important* that no blocking work be done in this callback to prevent
108    /// long lived strong references to dispatchers that might be shutting down.
109    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R;
110
111    /// Helper version of [`OnDispatcher::on_dispatcher`] that translates an invalidated dispatcher
112    /// handle into a [`Status::BAD_STATE`] error instead of giving the callback [`None`].
113    ///
114    /// Note that it is *very important* that no blocking work be done in this callback to prevent
115    /// long lived strong references to dispatchers that might be shutting down.
116    fn on_maybe_dispatcher<R, E: From<Status>>(
117        &self,
118        f: impl FnOnce(AsyncDispatcherRef<'_>) -> Result<R, E>,
119    ) -> Result<R, E> {
120        self.on_dispatcher(|dispatcher| {
121            let dispatcher = dispatcher.ok_or(Status::BAD_STATE)?;
122            f(dispatcher)
123        })
124    }
125
126    /// Spawn an asynchronous task on this dispatcher. If this returns [`Ok`] then the task has
127    /// successfully been scheduled and will run or be cancelled and dropped when the dispatcher
128    /// shuts down. The returned future's result will be [`Ok`] if the future completed
129    /// successfully, or an [`Err`] if the task did not complete for some reason (like the
130    /// dispatcher shut down).
131    ///
132    /// Returns a [`JoinHandle`] that will detach the future when dropped.
133    fn spawn(
134        &self,
135        future: impl Future<Output = ()> + Send + 'static,
136    ) -> Result<JoinHandle<()>, Status>
137    where
138        Self: 'static,
139    {
140        Task::try_start(future, self.clone()).map(Task::detach_on_drop)
141    }
142
143    /// Spawn an asynchronous task that outputs type 'T' on this dispatcher. The returned future's
144    /// result will be [`Ok`] if the task was started and completed successfully, or an [`Err`] if
145    /// the task couldn't be started or failed to complete (for example because the dispatcher was
146    /// shutting down).
147    ///
148    /// Returns a [`Task`] that will cancel the future when dropped.
149    fn compute<T: Send + 'static>(
150        &self,
151        future: impl Future<Output = T> + Send + 'static,
152    ) -> Task<T>
153    where
154        Self: 'static,
155    {
156        Task::start(future, self.clone())
157    }
158
159    /// Returns a future that will fire when after the given deadline time.
160    ///
161    /// This can be used instead of the fuchsia-async timer primitives in situations where
162    /// there isn't a currently active fuchsia-async executor running on that dispatcher for some
163    /// reason (ie. the rust code does not own the dispatcher) or for cases where the small overhead
164    /// of fuchsia-async compatibility is too much.
165    fn after_deadline(&self, deadline: zx::MonotonicInstant) -> AfterDeadline<Self> {
166        AfterDeadline::new(self, deadline)
167    }
168}
169
170impl<D: AsyncDispatcher> OnDispatcher for &D {
171    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
172        f(Some(D::as_async_dispatcher_ref(*self)))
173    }
174}
175
176impl<'a> OnDispatcher for AsyncDispatcherRef<'a> {
177    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
178        f(Some(self.clone()))
179    }
180}
181
182impl<T: AsyncDispatcher> OnDispatcher for Arc<T> {
183    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
184        f(Some(self.as_async_dispatcher_ref()))
185    }
186}
187
188impl<T: AsyncDispatcher> OnDispatcher for Weak<T> {
189    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
190        let dispatcher = Weak::upgrade(self);
191        match dispatcher {
192            Some(dispatcher) => f(Some(dispatcher.as_async_dispatcher_ref())),
193            None => f(None),
194        }
195    }
196}
197
198/// A marker trait for a callback that can be used with [`Dispatcher::post_task_sync`].
199pub trait TaskCallback: FnOnce(Status) + 'static + Send {}
200impl<T> TaskCallback for T where T: FnOnce(Status) + 'static + Send {}
201
/// Pairing of a C `async_task` record with the Rust callback to run for it.
///
/// `#[repr(C)]` with `task` as the first field matters: `post_task_sync` hands the dispatcher a
/// pointer to `task`, and [`TaskFunc::call`] casts that pointer back to the enclosing allocation.
#[repr(C)]
struct TaskFunc {
    /// The task record passed to `async_post_task`; its handler is set to [`TaskFunc::call`].
    task: async_task,
    /// The user callback, moved out and invoked from [`TaskFunc::call`].
    func: Box<dyn TaskCallback>,
}
207
208impl TaskFunc {
209    extern "C" fn call(_dispatcher: *mut async_dispatcher, task: *mut async_task, status: i32) {
210        // SAFETY: the async api promises that this function will only be called
211        // up to once, so we can reconstitute the `Arc` and let it get dropped.
212        let task = unsafe { Arc::from_raw(task as *const UnsafeCell<Self>) };
213        // SAFETY: if we can't get a mut ref from the arc, then the task is already
214        // being cancelled, so we don't want to call it.
215        if let Ok(task) = Arc::try_unwrap(task) {
216            (task.into_inner().func)(Status::from_raw(status));
217        }
218    }
219}