fuchsia_async/runtime/
mod.rs
1#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(not(target_os = "fuchsia"))]
11mod portable;
12#[cfg(not(target_os = "fuchsia"))]
13use self::portable as implementation;
14
15pub use implementation::executor::{
17 LocalExecutor, MonotonicDuration, MonotonicInstant, SendExecutor, SpawnableFuture, TestExecutor,
18};
19pub use implementation::task::{unblock, JoinHandle, Task};
20pub use implementation::timer::Timer;
21
22mod task_group;
23pub use task_group::*;
24
25#[cfg(target_os = "fuchsia")]
27pub use self::fuchsia::{
28 executor::{BootInstant, EHandle, PacketReceiver, ReceiverRegistration},
29 timer::Interval,
30};
31
32pub mod scope {
36 pub use super::implementation::scope::{Scope, ScopeHandle};
37
38 #[cfg(target_os = "fuchsia")]
39 pub use super::implementation::scope::{Join, ScopeStream, Spawnable};
40}
41
42pub use scope::{Scope, ScopeHandle};
43
44use futures::prelude::*;
45use pin_project_lite::pin_project;
46use std::pin::Pin;
47use std::task::{ready, Context, Poll};
48
49pub trait DurationExt {
51 fn after_now(self) -> MonotonicInstant;
56}
57
58pub trait WakeupTime {
60 fn into_timer(self) -> Timer;
65}
66
67#[cfg(target_os = "fuchsia")]
68impl WakeupTime for std::time::Duration {
69 fn into_timer(self) -> Timer {
70 EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
71 }
72}
73
74#[cfg(not(target_os = "fuchsia"))]
75impl WakeupTime for std::time::Duration {
76 fn into_timer(self) -> Timer {
77 Timer::from(self)
78 }
79}
80
81#[cfg(target_os = "fuchsia")]
82impl WakeupTime for MonotonicDuration {
83 fn into_timer(self) -> Timer {
84 EHandle::local().mono_timers().new_timer(MonotonicInstant::after(self))
85 }
86}
87
88#[cfg(target_os = "fuchsia")]
89impl WakeupTime for zx::BootDuration {
90 fn into_timer(self) -> Timer {
91 EHandle::local().boot_timers().new_timer(BootInstant::after(self))
92 }
93}
94
95impl DurationExt for std::time::Duration {
96 fn after_now(self) -> MonotonicInstant {
97 MonotonicInstant::now() + self.into()
98 }
99}
100
101pub trait TimeoutExt: Future + Sized {
103 fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
106 where
107 WT: WakeupTime,
108 OT: FnOnce() -> Self::Output,
109 {
110 OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
111 }
112
113 fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
118 where
119 OS: FnOnce() -> Self::Output,
120 {
121 OnStalled {
122 timer: timeout.into_timer(),
123 future: self,
124 timeout,
125 on_stalled: Some(on_stalled),
126 }
127 }
128}
129
130impl<F: Future + Sized> TimeoutExt for F {}
131
132pin_project! {
133 #[derive(Debug)]
135 #[must_use = "futures do nothing unless polled"]
136 pub struct OnTimeout<F, OT> {
137 #[pin]
138 timer: Timer,
139 #[pin]
140 future: F,
141 on_timeout: Option<OT>,
142 }
143}
144
145impl<F: Future, OT> Future for OnTimeout<F, OT>
146where
147 OT: FnOnce() -> F::Output,
148{
149 type Output = F::Output;
150
151 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152 let this = self.project();
153 if let Poll::Ready(item) = this.future.poll(cx) {
154 return Poll::Ready(item);
155 }
156 if let Poll::Ready(()) = this.timer.poll(cx) {
157 let ot = this.on_timeout.take().expect("polled withtimeout after completion");
158 let item = (ot)();
159 return Poll::Ready(item);
160 }
161 Poll::Pending
162 }
163}
164
165pin_project! {
166 #[derive(Debug)]
169 #[must_use = "futures do nothing unless polled"]
170 pub struct OnStalled<F, OS> {
171 #[pin]
172 timer: Timer,
173 #[pin]
174 future: F,
175 timeout: std::time::Duration,
176 on_stalled: Option<OS>,
177 }
178}
179
180impl<F: Future, OS> Future for OnStalled<F, OS>
181where
182 OS: FnOnce() -> F::Output,
183{
184 type Output = F::Output;
185
186 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
187 let mut this = self.project();
188 if let Poll::Ready(item) = this.future.poll(cx) {
189 return Poll::Ready(item);
190 }
191 match this.timer.as_mut().poll(cx) {
192 Poll::Ready(()) => {}
193 Poll::Pending => {
194 this.timer.set(this.timeout.into_timer());
195 ready!(this.timer.as_mut().poll(cx));
196 }
197 }
198 Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
199 }
200}
201
202#[cfg(test)]
203mod task_tests {
204
205 use super::*;
206 use futures::channel::oneshot;
207
208 fn run(f: impl Send + 'static + Future<Output = ()>) {
209 const TEST_THREADS: u8 = 2;
210 SendExecutor::new(TEST_THREADS).run(f)
211 }
212
213 #[test]
214 fn can_detach() {
215 run(async move {
216 let (tx_started, rx_started) = oneshot::channel();
217 let (tx_continue, rx_continue) = oneshot::channel();
218 let (tx_done, rx_done) = oneshot::channel();
219 {
220 Task::spawn(async move {
223 tx_started.send(()).unwrap();
224 rx_continue.await.unwrap();
225 tx_done.send(()).unwrap();
226 })
227 .detach();
228 }
229 rx_started.await.unwrap();
231 tx_continue.send(()).unwrap();
232 rx_done.await.unwrap();
233 });
234 }
235
236 #[test]
237 fn can_join() {
238 run(async move {
240 assert_eq!(42, Task::spawn(async move { 42u8 }).await);
241 })
242 }
243
244 #[test]
245 fn can_join_unblock() {
246 run(async move {
248 assert_eq!(42, unblock(|| 42u8).await);
249 })
250 }
251
252 #[test]
253 fn can_join_unblock_local() {
254 LocalExecutor::new().run_singlethreaded(async move {
256 assert_eq!(42, unblock(|| 42u8).await);
257 });
258 }
259
260 #[test]
261 #[should_panic]
262 #[cfg_attr(feature = "variant_asan", ignore)]
264 fn unblock_fn_panics() {
265 run(async move {
266 unblock(|| panic!("bad")).await;
267 })
268 }
269
270 #[test]
271 fn can_join_local() {
272 LocalExecutor::new().run_singlethreaded(async move {
274 assert_eq!(42, Task::local(async move { 42u8 }).await);
275 })
276 }
277
278 #[test]
279 fn can_cancel() {
280 run(async move {
281 let (_tx_start, rx_start) = oneshot::channel::<()>();
282 let (tx_done, rx_done) = oneshot::channel();
283 let _ = Task::spawn(async move {
285 rx_start.await.unwrap();
286 tx_done.send(()).unwrap();
287 });
288 rx_done.await.expect_err("done should not be sent");
290 })
291 }
292}
293
294#[cfg(test)]
295mod timer_tests {
296 use super::*;
297 use futures::future::Either;
298 use std::pin::pin;
299
300 #[test]
301 fn shorter_fires_first_instant() {
302 use std::time::{Duration, Instant};
303 let mut exec = LocalExecutor::new();
304 let now = Instant::now();
305 let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
306 let longer = pin!(Timer::new(now + Duration::from_secs(1)));
307 match exec.run_singlethreaded(future::select(shorter, longer)) {
308 Either::Left((_, _)) => {}
309 Either::Right((_, _)) => panic!("wrong timer fired"),
310 }
311 }
312
313 #[cfg(target_os = "fuchsia")]
314 #[test]
315 fn can_use_zx_duration() {
316 let mut exec = LocalExecutor::new();
317 let start = MonotonicInstant::now();
318 let timer = Timer::new(MonotonicDuration::from_millis(100));
319 exec.run_singlethreaded(timer);
320 let end = MonotonicInstant::now();
321 assert!(end - start > MonotonicDuration::from_millis(100));
322 }
323
324 #[test]
325 fn can_detect_stalls() {
326 use std::sync::atomic::{AtomicU64, Ordering};
327 use std::sync::Arc;
328 use std::time::Duration;
329 let runs = Arc::new(AtomicU64::new(0));
330 assert_eq!(
331 {
332 let runs = runs.clone();
333 LocalExecutor::new().run_singlethreaded(
334 async move {
335 let mut sleep = Duration::from_millis(1);
336 loop {
337 Timer::new(sleep).await;
338 sleep *= 2;
339 runs.fetch_add(1, Ordering::SeqCst);
340 }
341 }
342 .on_stalled(Duration::from_secs(1), || 1u8),
343 )
344 },
345 1u8
346 );
347 assert!(runs.load(Ordering::SeqCst) >= 9);
348 }
349}