Skip to main content

input_pipeline_dso/
dispatcher.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::task::Context;
6use fidl_next::ClientEnd;
7use futures::prelude::*;
8use futures::task::Poll;
9use pin_project_lite::pin_project;
10use std::pin::Pin;
11
12#[cfg(feature = "dso")]
13pub use dso::*;
14
15#[cfg(not(feature = "dso"))]
16pub use elf::*;
17
18pin_project! {
19    #[derive(Debug)]
20    #[must_use = "futures do nothing unless polled"]
21    pub struct OnTimeout<F, T, OT> {
22        #[pin]
23        timer: T,
24        #[pin]
25        future: F,
26        on_timeout: Option<OT>,
27    }
28}
29
30impl<F: Future, T, OT> Future for OnTimeout<F, T, OT>
31where
32    T: Future<Output = ()> + 'static,
33    OT: FnOnce() -> F::Output,
34{
35    type Output = F::Output;
36
37    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38        let this = self.project();
39        if let Poll::Ready(item) = this.future.poll(cx) {
40            return Poll::Ready(item);
41        }
42        if let Poll::Ready(()) = this.timer.poll(cx) {
43            let ot = this.on_timeout.take().expect("polled with timeout after completion");
44            let item = (ot)();
45            return Poll::Ready(item);
46        }
47        Poll::Pending
48    }
49}
50
51/// A wrapper for a future which will complete with a provided closure when a timeout occurs. This
52/// is forked from [`fuchsia_async::OnTimeout`] because that has a fixed dependency on
53/// [`fuchsia_async::Timer`] which driver dispatcher does not support.
54pub trait TimeoutExt: Future + Sized {
55    fn on_timeout<T, OT>(self, timer: T, on_timeout: OT) -> OnTimeout<Self, T, OT>
56    where
57        T: Future<Output = ()> + 'static,
58        OT: FnOnce() -> Self::Output,
59    {
60        OnTimeout { timer, future: self, on_timeout: Some(on_timeout) }
61    }
62}
63
64impl<F: Future + Sized> TimeoutExt for F {}
65
/// Field-less handle type whose associated functions spawn local tasks, create deadline
/// timers and convert FIDL client ends.
///
/// The implementation of those functions is selected at compile time by the `dso` feature.
#[derive(Clone, Debug, Default)]
pub struct Dispatcher {}
68
mod dso {
    #![cfg(feature = "dso")]

    pub use super::*;

    use fdf::{OnDispatcher, OnDriverDispatcher};

    /// Transparent newtype around [`zx::MonotonicInstant`] used as the deadline type of
    /// [`Dispatcher::after_deadline`].
    #[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
    #[repr(transparent)]
    pub struct MonotonicInstant(zx::MonotonicInstant);

    impl From<zx::MonotonicInstant> for MonotonicInstant {
        fn from(o: zx::MonotonicInstant) -> Self {
            Self(o)
        }
    }

    impl From<MonotonicInstant> for zx::MonotonicInstant {
        fn from(o: MonotonicInstant) -> Self {
            o.0
        }
    }

    impl MonotonicInstant {
        /// Returns the value of `zx::MonotonicInstant::get()` wrapped in this type.
        pub fn now() -> Self {
            Self(zx::MonotonicInstant::get())
        }

        /// Returns the wrapped instant as nanoseconds, as reported by
        /// `zx::MonotonicInstant::into_nanos`.
        // Takes `&self` despite the `into_` prefix; the signature is kept for compatibility.
        pub fn into_nanos(&self) -> i64 {
            self.0.into_nanos()
        }

        /// Unwraps into the underlying [`zx::MonotonicInstant`].
        pub fn into_zx(self) -> zx::MonotonicInstant {
            self.0
        }

        /// Returns the result of `zx::MonotonicInstant::after(duration)` wrapped in this type.
        pub fn after(duration: zx::MonotonicDuration) -> Self {
            Self(zx::MonotonicInstant::after(duration))
        }
    }

    /// FIDL transport used in this configuration.
    pub type Transport = libasync_fidl::AsyncChannel<Dispatcher>;

    /// Handle to a task started with [`Dispatcher::spawn_local`].
    ///
    /// Dropping the handle aborts the task unless [`TaskHandle::detach`] was called first.
    /// Awaiting the handle yields the task's output.
    #[derive(Debug)]
    pub struct TaskHandle<T> {
        // Join handle of the spawned task; never set to `None` by the code in this module.
        handle: Option<::libasync::JoinHandle<T>>,
        // Set by `detach`; makes `Drop` skip the abort.
        detached: bool,
    }

    impl<T> Drop for TaskHandle<T> {
        fn drop(&mut self) {
            if self.detached {
                return;
            }
            // Borrow the handle in place. The previous `as_mut().take()` only took from a
            // temporary `Option<&mut _>` and never cleared `self.handle`, so this is equivalent.
            if let Some(handle) = self.handle.as_mut() {
                // Best effort: whatever `abort` returns is deliberately ignored.
                _ = handle.abort();
            }
        }
    }

    impl TaskHandle<()> {
        /// Consumes the handle without aborting the task when it is dropped.
        pub fn detach(mut self) {
            // `self` is dropped when this function returns; the flag disables the abort there.
            self.detached = true;
        }
    }

    impl<T: 'static> Future for TaskHandle<T> {
        type Output = T;

        /// Polls the underlying join handle and forwards the task's output.
        ///
        /// # Panics
        ///
        /// Panics if the join handle resolves to an error, or if the handle is missing.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let handle = self.handle.as_mut().unwrap();
            match handle.poll_unpin(cx) {
                Poll::Ready(Ok(output)) => Poll::Ready(output),
                Poll::Ready(Err(e)) => panic!("TaskHandle: polled unexpected error {e:?}"),
                Poll::Pending => Poll::Pending,
            }
        }
    }

    impl Dispatcher {
        /// Spawns `future` on `fdf::CurrentDispatcher` and returns a handle to it.
        ///
        /// The task is aborted when the returned handle is dropped, unless it is detached.
        ///
        /// # Panics
        ///
        /// Panics if spawning on the current dispatcher fails.
        #[must_use]
        pub fn spawn_local(future: impl Future<Output = ()> + 'static) -> TaskHandle<()>
        where
            Self: 'static,
        {
            // This should never panic if the dispatcher is valid.
            let join_handle =
                fdf::CurrentDispatcher.spawn_local(future).expect("Dispatcher::spawn_local");
            TaskHandle { handle: Some(join_handle), detached: false }
        }

        /// Returns a future that completes once the timer for `deadline`, created on
        /// `fdf::CurrentDispatcher`, completes.
        ///
        /// The timer is created eagerly, when this function is called, not on first poll.
        /// The returned future panics if the timer resolves to an error.
        pub fn after_deadline(deadline: MonotonicInstant) -> impl Future<Output = ()> + 'static {
            let timer = fdf::CurrentDispatcher.after_deadline(deadline.into());
            async move {
                // This should never panic if the dispatcher is valid.
                timer.await.expect("Dispatcher::after_deadline");
            }
        }

        /// Converts a client end backed by a `zx::Channel` into one backed by [`Transport`].
        pub fn client_from_zx_channel<P>(
            client_end: ClientEnd<P, zx::Channel>,
        ) -> ClientEnd<P, Transport> {
            libasync_fidl::AsyncChannel::<Dispatcher>::client_from_zx_channel(client_end)
        }
    }

    impl fdf::OnDispatcher for Dispatcher {
        /// Delegates to `fdf::CurrentDispatcher`.
        fn on_dispatcher<R>(&self, f: impl FnOnce(Option<fdf::AsyncDispatcherRef<'_>>) -> R) -> R {
            fdf::CurrentDispatcher.on_dispatcher(f)
        }
    }
}
182
183mod elf {
184    #![cfg(not(feature = "dso"))]
185
186    pub use super::*;
187
188    pub type MonotonicInstant = fuchsia_async::MonotonicInstant;
189
190    pub type Transport = zx::Channel;
191
192    #[derive(Debug)]
193    pub struct TaskHandle<T>(fuchsia_async::Task<T>);
194
195    impl TaskHandle<()> {
196        pub fn detach(self) {
197            self.0.detach();
198        }
199    }
200
201    #[cfg(test)]
202    impl<T: 'static> From<fuchsia_async::Task<T>> for TaskHandle<T> {
203        fn from(task: fuchsia_async::Task<T>) -> Self {
204            Self(task)
205        }
206    }
207
208    impl<T: 'static> Future for TaskHandle<T> {
209        type Output = T;
210
211        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
212            match self.0.poll_unpin(cx) {
213                Poll::Ready(t) => Poll::Ready(t),
214                Poll::Pending => Poll::Pending,
215            }
216        }
217    }
218
219    impl Dispatcher {
220        #[must_use]
221        pub fn spawn_local(future: impl Future<Output = ()> + 'static) -> TaskHandle<()>
222        where
223            Self: 'static,
224        {
225            TaskHandle(fuchsia_async::Task::local(future))
226        }
227
228        pub fn after_deadline(deadline: MonotonicInstant) -> impl Future<Output = ()> + 'static {
229            fuchsia_async::Timer::new(deadline)
230        }
231
232        pub fn client_from_zx_channel<P>(
233            client_end: fidl_next::ClientEnd<P, zx::Channel>,
234        ) -> ClientEnd<P, Transport> {
235            client_end
236        }
237    }
238}