input_pipeline_dso/
dispatcher.rs1use core::task::Context;
6use fidl_next::ClientEnd;
7use futures::prelude::*;
8use futures::task::Poll;
9use pin_project_lite::pin_project;
10use std::pin::Pin;
11
12#[cfg(feature = "dso")]
13pub use dso::*;
14
15#[cfg(not(feature = "dso"))]
16pub use elf::*;
17
pin_project! {
    /// Future that resolves with the output of the wrapped `future`, or with the value returned
    /// by the `on_timeout` closure if `timer` completes before `future` does.
    ///
    /// Normally built through `TimeoutExt::on_timeout`.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub struct OnTimeout<F, T, OT> {
        // Timer future; once it is ready the `on_timeout` fallback is invoked.
        #[pin]
        timer: T,
        // The wrapped future; it is polled before the timer on every poll.
        #[pin]
        future: F,
        // Fallback closure. Held in an `Option` so the `FnOnce` can be moved out exactly once.
        on_timeout: Option<OT>,
    }
}
29
30impl<F: Future, T, OT> Future for OnTimeout<F, T, OT>
31where
32 T: Future<Output = ()> + 'static,
33 OT: FnOnce() -> F::Output,
34{
35 type Output = F::Output;
36
37 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38 let this = self.project();
39 if let Poll::Ready(item) = this.future.poll(cx) {
40 return Poll::Ready(item);
41 }
42 if let Poll::Ready(()) = this.timer.poll(cx) {
43 let ot = this.on_timeout.take().expect("polled with timeout after completion");
44 let item = (ot)();
45 return Poll::Ready(item);
46 }
47 Poll::Pending
48 }
49}
50
51pub trait TimeoutExt: Future + Sized {
55 fn on_timeout<T, OT>(self, timer: T, on_timeout: OT) -> OnTimeout<Self, T, OT>
56 where
57 T: Future<Output = ()> + 'static,
58 OT: FnOnce() -> Self::Output,
59 {
60 OnTimeout { timer, future: self, on_timeout: Some(on_timeout) }
61 }
62}
63
64impl<F: Future + Sized> TimeoutExt for F {}
65
/// Field-less handle type for the executor this crate runs on.
///
/// All functionality is exposed as associated functions (`spawn_local`, `after_deadline`,
/// `client_from_zx_channel`) that are defined in the feature-selected `dso` / `elf` module.
#[derive(Debug, Clone, Default)]
pub struct Dispatcher {}
68
/// Implementation used when the `dso` feature is enabled: tasks and timers are delegated to
/// `fdf::CurrentDispatcher`.
mod dso {
    #![cfg(feature = "dso")]

    pub use super::*;

    // Dispatcher traits, brought into scope for the method calls on `fdf::CurrentDispatcher`
    // below.
    use fdf::{OnDispatcher, OnDriverDispatcher};

    /// Transparent newtype around `zx::MonotonicInstant`.
    #[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
    #[repr(transparent)]
    pub struct MonotonicInstant(zx::MonotonicInstant);

    impl From<zx::MonotonicInstant> for MonotonicInstant {
        fn from(o: zx::MonotonicInstant) -> Self {
            Self(o)
        }
    }

    impl From<MonotonicInstant> for zx::MonotonicInstant {
        fn from(o: MonotonicInstant) -> Self {
            o.0
        }
    }

    impl MonotonicInstant {
        /// Returns the value of `zx::MonotonicInstant::get()`, wrapped.
        pub fn now() -> Self {
            Self(zx::MonotonicInstant::get())
        }

        /// Returns the wrapped instant's `into_nanos()` value.
        pub fn into_nanos(&self) -> i64 {
            self.0.into_nanos()
        }

        /// Unwraps into the underlying `zx::MonotonicInstant`.
        pub fn into_zx(self) -> zx::MonotonicInstant {
            self.0
        }

        /// Returns `zx::MonotonicInstant::after(duration)`, wrapped.
        pub fn after(duration: zx::MonotonicDuration) -> Self {
            Self(zx::MonotonicInstant::after(duration))
        }
    }

    /// FIDL transport used in this configuration.
    pub type Transport = libasync_fidl::AsyncChannel<Dispatcher>;

    /// Handle to a task spawned with [`Dispatcher::spawn_local`].
    ///
    /// Dropping the handle calls `abort()` on the underlying join handle unless
    /// [`TaskHandle::detach`] was called first. Awaiting the handle yields the task's output.
    #[derive(Debug)]
    pub struct TaskHandle<T> {
        // Always `Some` in the code visible here: it is set at construction and never cleared.
        handle: Option<::libasync::JoinHandle<T>>,
        // Set by `detach`; makes `Drop` skip the abort.
        detached: bool,
    }

    impl<T> Drop for TaskHandle<T> {
        fn drop(&mut self) {
            if self.detached {
                return;
            }
            // Abort through a mutable borrow; the join handle itself is dropped together with
            // the struct's fields right after this function returns.
            if let Some(handle) = self.handle.as_mut() {
                _ = handle.abort();
            }
        }
    }

    impl TaskHandle<()> {
        /// Consumes the handle without aborting the task.
        ///
        /// `self` is dropped at the end of this call; the flag set here makes `Drop` skip the
        /// `abort()` call.
        pub fn detach(mut self) {
            self.detached = true;
        }
    }

    impl<T: 'static> Future for TaskHandle<T> {
        type Output = T;

        /// Polls the underlying join handle.
        ///
        /// # Panics
        ///
        /// Panics if the join handle resolves to an `Err`.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let handle = self.handle.as_mut().unwrap();
            match handle.poll_unpin(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Ok(t)) => Poll::Ready(t),
                Poll::Ready(Err(e)) => panic!("TaskHandle: polled unexpected error {e:?}"),
            }
        }
    }

    impl Dispatcher {
        /// Spawns `future` via `fdf::CurrentDispatcher` and returns a handle to it.
        ///
        /// The task is aborted when the returned handle is dropped unless it is detached.
        ///
        /// # Panics
        ///
        /// Panics if `fdf::CurrentDispatcher.spawn_local` returns an error.
        #[must_use]
        pub fn spawn_local(future: impl Future<Output = ()> + 'static) -> TaskHandle<()>
        where
            Self: 'static,
        {
            TaskHandle {
                handle: Some(
                    fdf::CurrentDispatcher.spawn_local(future).expect("Dispatcher::spawn_local"),
                ),
                detached: false,
            }
        }

        /// Returns a future that completes when the dispatcher's deadline future for
        /// `deadline` completes.
        ///
        /// `fdf::CurrentDispatcher.after_deadline` is invoked eagerly by this call, not on the
        /// first poll of the returned future.
        ///
        /// # Panics
        ///
        /// The returned future panics if the dispatcher's future resolves to an `Err`.
        pub fn after_deadline(deadline: MonotonicInstant) -> impl Future<Output = ()> + 'static {
            let f = fdf::CurrentDispatcher.after_deadline(deadline.into());
            async move {
                f.await.expect("Dispatcher::after_deadline");
            }
        }

        /// Converts a `zx::Channel`-backed client end into one backed by [`Transport`].
        pub fn client_from_zx_channel<P>(
            client_end: ClientEnd<P, zx::Channel>,
        ) -> ClientEnd<P, Transport> {
            libasync_fidl::AsyncChannel::<Dispatcher>::client_from_zx_channel(client_end)
        }
    }

    impl fdf::OnDispatcher for Dispatcher {
        /// Forwards to `fdf::CurrentDispatcher`.
        fn on_dispatcher<R>(&self, f: impl FnOnce(Option<fdf::AsyncDispatcherRef<'_>>) -> R) -> R {
            fdf::CurrentDispatcher.on_dispatcher(f)
        }
    }
}
182
183mod elf {
184 #![cfg(not(feature = "dso"))]
185
186 pub use super::*;
187
188 pub type MonotonicInstant = fuchsia_async::MonotonicInstant;
189
190 pub type Transport = zx::Channel;
191
192 #[derive(Debug)]
193 pub struct TaskHandle<T>(fuchsia_async::Task<T>);
194
195 impl TaskHandle<()> {
196 pub fn detach(self) {
197 self.0.detach();
198 }
199 }
200
201 #[cfg(test)]
202 impl<T: 'static> From<fuchsia_async::Task<T>> for TaskHandle<T> {
203 fn from(task: fuchsia_async::Task<T>) -> Self {
204 Self(task)
205 }
206 }
207
208 impl<T: 'static> Future for TaskHandle<T> {
209 type Output = T;
210
211 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
212 match self.0.poll_unpin(cx) {
213 Poll::Ready(t) => Poll::Ready(t),
214 Poll::Pending => Poll::Pending,
215 }
216 }
217 }
218
219 impl Dispatcher {
220 #[must_use]
221 pub fn spawn_local(future: impl Future<Output = ()> + 'static) -> TaskHandle<()>
222 where
223 Self: 'static,
224 {
225 TaskHandle(fuchsia_async::Task::local(future))
226 }
227
228 pub fn after_deadline(deadline: MonotonicInstant) -> impl Future<Output = ()> + 'static {
229 fuchsia_async::Timer::new(deadline)
230 }
231
232 pub fn client_from_zx_channel<P>(
233 client_end: fidl_next::ClientEnd<P, zx::Channel>,
234 ) -> ClientEnd<P, Transport> {
235 client_end
236 }
237 }
238}