libasync/dispatcher.rs
1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Safe bindings for the C libasync async dispatcher library
6
7use libasync_sys::*;
8
9use core::cell::UnsafeCell;
10use core::future::Future;
11use core::marker::PhantomData;
12use core::ptr::NonNull;
13use std::sync::{Arc, Weak};
14
15use zx::Status;
16
17mod after_deadline;
18mod task;
19
20pub use after_deadline::*;
21pub use task::*;
22
23/// An unowned reference to a driver runtime dispatcher such as is produced by calling
24/// [`AsyncDispatcher::release`]. When this object goes out of scope it won't shut down the dispatcher,
25/// leaving that up to the driver runtime or another owner.
26#[derive(Debug)]
27pub struct AsyncDispatcherRef<'a>(NonNull<async_dispatcher_t>, PhantomData<&'a async_dispatcher_t>);
28
29unsafe impl<'a> Send for AsyncDispatcherRef<'a> {}
30unsafe impl<'a> Sync for AsyncDispatcherRef<'a> {}
31
32impl<'a> AsyncDispatcherRef<'a> {
33 /// Creates a dispatcher ref from a raw ptr.
34 ///
35 /// # Safety
36 ///
37 /// Caller is responsible for ensuring that the given ptr is valid for
38 /// the lifetime `'a`.
39 pub unsafe fn from_raw(ptr: NonNull<async_dispatcher_t>) -> Self {
40 // SAFETY: Caller promises the ptr is valid.
41 Self(ptr, PhantomData)
42 }
43
44 /// Gets the inner pointer to the dispatcher struct.
45 pub fn inner(&self) -> NonNull<async_dispatcher_t> {
46 self.0
47 }
48}
49
50impl<'a> Clone for AsyncDispatcherRef<'a> {
51 fn clone(&self) -> Self {
52 Self(self.0, PhantomData)
53 }
54}
55
56/// A trait for things that can be represented as an [`AsyncDispatcherRef`].
57pub trait AsyncDispatcher: Send + Sync {
58 /// Gets an [`AsyncDispatcherRef`] corresponding to this object.
59 fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_>;
60
61 /// Schedules the callback [`p`] to be run on this dispatcher later.
62 fn post_task_sync(&self, p: impl TaskCallback) -> Result<(), Status> {
63 #[expect(clippy::arc_with_non_send_sync)]
64 let task_arc = Arc::new(UnsafeCell::new(TaskFunc {
65 task: async_task { handler: Some(TaskFunc::call), ..Default::default() },
66 func: Box::new(p),
67 }));
68
69 let task_cell = Arc::into_raw(task_arc);
70 // SAFETY: we need a raw mut pointer to give to async_post_task. From
71 // when we call that function to when the task is cancelled or the
72 // callback is called, the driver runtime owns the contents of that
73 // object and we will not manipulate it. So even though the Arc only
74 // gives us a shared reference, it's fine to give the runtime a
75 // mutable pointer to it.
76 let res = unsafe {
77 let task_ptr = &raw mut (*UnsafeCell::raw_get(task_cell)).task;
78 Status::ok(async_post_task(self.as_async_dispatcher_ref().0.as_ptr(), task_ptr))
79 };
80 if res.is_err() {
81 // SAFETY: `TaskFunc::call` will never be called now so dispose of
82 // the long-lived reference we just created.
83 unsafe { Arc::decrement_strong_count(task_cell) }
84 }
85 res
86 }
87
88 /// Returns the current time on the dispatcher's timeline
89 fn now(&self) -> zx::MonotonicInstant {
90 let async_dispatcher = self.as_async_dispatcher_ref().0.as_ptr();
91 let now_nanos = unsafe { async_now(async_dispatcher) };
92 zx::MonotonicInstant::from_nanos(now_nanos)
93 }
94}
95
96impl<'a> AsyncDispatcher for AsyncDispatcherRef<'a> {
97 fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
98 self.clone()
99 }
100}
101
102/// A trait that can be used to access a lifetime-constrained dispatcher in a generic way.
103pub trait OnDispatcher: Clone + Send + Sync {
104 /// Runs the function `f` with a lifetime-bound [`AsyncDispatcherRef`] for this object's dispatcher.
105 /// If the dispatcher is no longer valid, the callback will be given [`None`].
106 ///
107 /// Note that it is *very important* that no blocking work be done in this callback to prevent
108 /// long lived strong references to dispatchers that might be shutting down.
109 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R;
110
111 /// Helper version of [`OnDispatcher::on_dispatcher`] that translates an invalidated dispatcher
112 /// handle into a [`Status::BAD_STATE`] error instead of giving the callback [`None`].
113 ///
114 /// Note that it is *very important* that no blocking work be done in this callback to prevent
115 /// long lived strong references to dispatchers that might be shutting down.
116 fn on_maybe_dispatcher<R, E: From<Status>>(
117 &self,
118 f: impl FnOnce(AsyncDispatcherRef<'_>) -> Result<R, E>,
119 ) -> Result<R, E> {
120 self.on_dispatcher(|dispatcher| {
121 let dispatcher = dispatcher.ok_or(Status::BAD_STATE)?;
122 f(dispatcher)
123 })
124 }
125
126 /// Spawn an asynchronous task on this dispatcher. If this returns [`Ok`] then the task has
127 /// successfully been scheduled and will run or be cancelled and dropped when the dispatcher
128 /// shuts down. The returned future's result will be [`Ok`] if the future completed
129 /// successfully, or an [`Err`] if the task did not complete for some reason (like the
130 /// dispatcher shut down).
131 ///
132 /// Returns a [`JoinHandle`] that will detach the future when dropped.
133 fn spawn(
134 &self,
135 future: impl Future<Output = ()> + Send + 'static,
136 ) -> Result<JoinHandle<()>, Status>
137 where
138 Self: 'static,
139 {
140 Task::try_start(future, self.clone()).map(Task::detach_on_drop)
141 }
142
143 /// Spawn an asynchronous task that outputs type 'T' on this dispatcher. The returned future's
144 /// result will be [`Ok`] if the task was started and completed successfully, or an [`Err`] if
145 /// the task couldn't be started or failed to complete (for example because the dispatcher was
146 /// shutting down).
147 ///
148 /// Returns a [`Task`] that will cancel the future when dropped.
149 fn compute<T: Send + 'static>(
150 &self,
151 future: impl Future<Output = T> + Send + 'static,
152 ) -> Task<T>
153 where
154 Self: 'static,
155 {
156 Task::start(future, self.clone())
157 }
158
159 /// Returns a future that will fire when after the given deadline time.
160 ///
161 /// This can be used instead of the fuchsia-async timer primitives in situations where
162 /// there isn't a currently active fuchsia-async executor running on that dispatcher for some
163 /// reason (ie. the rust code does not own the dispatcher) or for cases where the small overhead
164 /// of fuchsia-async compatibility is too much.
165 fn after_deadline(&self, deadline: zx::MonotonicInstant) -> AfterDeadline<Self> {
166 AfterDeadline::new(self, deadline)
167 }
168}
169
170impl<D: AsyncDispatcher> OnDispatcher for &D {
171 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
172 f(Some(D::as_async_dispatcher_ref(*self)))
173 }
174}
175
176impl<'a> OnDispatcher for AsyncDispatcherRef<'a> {
177 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
178 f(Some(self.clone()))
179 }
180}
181
182impl<T: AsyncDispatcher> OnDispatcher for Arc<T> {
183 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
184 f(Some(self.as_async_dispatcher_ref()))
185 }
186}
187
188impl<T: AsyncDispatcher> OnDispatcher for Weak<T> {
189 fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
190 let dispatcher = Weak::upgrade(self);
191 match dispatcher {
192 Some(dispatcher) => f(Some(dispatcher.as_async_dispatcher_ref())),
193 None => f(None),
194 }
195 }
196}
197
198/// A marker trait for a callback that can be used with [`Dispatcher::post_task_sync`].
199pub trait TaskCallback: FnOnce(Status) + 'static + Send {}
200impl<T> TaskCallback for T where T: FnOnce(Status) + 'static + Send {}
201
202#[repr(C)]
203struct TaskFunc {
204 task: async_task,
205 func: Box<dyn TaskCallback>,
206}
207
208impl TaskFunc {
209 extern "C" fn call(_dispatcher: *mut async_dispatcher, task: *mut async_task, status: i32) {
210 // SAFETY: the async api promises that this function will only be called
211 // up to once, so we can reconstitute the `Arc` and let it get dropped.
212 let task = unsafe { Arc::from_raw(task as *const UnsafeCell<Self>) };
213 // SAFETY: if we can't get a mut ref from the arc, then the task is already
214 // being cancelled, so we don't want to call it.
215 if let Ok(task) = Arc::try_unwrap(task) {
216 (task.into_inner().func)(Status::from_raw(status));
217 }
218 }
219}