fuchsia_async/runtime/
mod.rs1#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(target_os = "fuchsia")]
11pub mod instrument;
12
13#[cfg(not(target_os = "fuchsia"))]
14mod portable;
15#[cfg(not(target_os = "fuchsia"))]
16use self::portable as implementation;
17
18pub use implementation::executor::{
20 LocalExecutor, LocalExecutorBuilder, MonotonicDuration, MonotonicInstant, SendExecutor,
21 SendExecutorBuilder, SpawnableFuture, TestExecutor, TestExecutorBuilder,
22};
23pub use implementation::task::{CancelableJoinHandle, JoinHandle, Task, unblock, yield_now};
24pub use implementation::timer::Timer;
25
26pub mod epoch;
27
28#[cfg(target_os = "fuchsia")]
30pub use self::fuchsia::{
31 executor::{
32 BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
33 },
34 task::LowPriorityTask,
35 timer::Interval,
36};
37
38pub mod scope;
42
43pub use scope::{Scope, ScopeHandle};
44
45use futures::prelude::*;
46use futures::task::AtomicWaker;
47use pin_project_lite::pin_project;
48use std::pin::Pin;
49use std::sync::Arc;
50use std::task::{Context, Poll, Wake, ready};
51
52pub trait DurationExt {
54 fn after_now(self) -> MonotonicInstant;
59}
60
61pub trait WakeupTime {
63 fn into_timer(self) -> Timer;
68}
69
#[cfg(target_os = "fuchsia")]
impl WakeupTime for std::time::Duration {
    /// Arms a monotonic timer on the current thread's executor that fires `self` from now.
    fn into_timer(self) -> Timer {
        // Look up the executor first, then read the clock, matching the original
        // evaluation order.
        let executor = EHandle::local();
        executor.mono_timers().new_timer(MonotonicInstant::now() + self.into())
    }
}
76
77#[cfg(not(target_os = "fuchsia"))]
78impl WakeupTime for std::time::Duration {
79 fn into_timer(self) -> Timer {
80 Timer::from(self)
81 }
82}
83
#[cfg(target_os = "fuchsia")]
impl WakeupTime for MonotonicDuration {
    /// Arms a monotonic timer on the current thread's executor for
    /// `MonotonicInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        executor.mono_timers().new_timer(MonotonicInstant::after(self))
    }
}
90
#[cfg(target_os = "fuchsia")]
impl WakeupTime for zx::BootDuration {
    /// Arms a boot-timeline timer on the current thread's executor for
    /// `BootInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        executor.boot_timers().new_timer(BootInstant::after(self))
    }
}
97
98impl DurationExt for std::time::Duration {
99 #[allow(clippy::useless_conversion)] fn after_now(self) -> MonotonicInstant {
101 MonotonicInstant::now() + self.into()
102 }
103}
104
105pub trait TimeoutExt: Future + Sized {
107 fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
110 where
111 WT: WakeupTime,
112 OT: FnOnce() -> Self::Output,
113 {
114 OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
115 }
116
117 fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
122 where
123 OS: FnOnce() -> Self::Output,
124 {
125 OnStalled {
126 timer: timeout.into_timer(),
127 future: self,
128 timeout,
129 on_stalled: Some(on_stalled),
130 waker: Arc::default(),
131 }
132 }
133}
134
135impl<F: Future + Sized> TimeoutExt for F {}
136
137pin_project! {
138 #[derive(Debug)]
140 #[must_use = "futures do nothing unless polled"]
141 pub struct OnTimeout<F, OT> {
142 #[pin]
143 timer: Timer,
144 #[pin]
145 future: F,
146 on_timeout: Option<OT>,
147 }
148}
149
150impl<F: Future, OT> Future for OnTimeout<F, OT>
151where
152 OT: FnOnce() -> F::Output,
153{
154 type Output = F::Output;
155
156 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
157 let this = self.project();
158 if let Poll::Ready(item) = this.future.poll(cx) {
159 return Poll::Ready(item);
160 }
161 if let Poll::Ready(()) = this.timer.poll(cx) {
162 let ot = this.on_timeout.take().expect("polled withtimeout after completion");
163 let item = (ot)();
164 return Poll::Ready(item);
165 }
166 Poll::Pending
167 }
168}
169
170pin_project! {
171 #[derive(Debug)]
174 #[must_use = "futures do nothing unless polled"]
175 pub struct OnStalled<F, OS> {
176 #[pin]
177 timer: Timer,
178 #[pin]
179 future: F,
180 timeout: std::time::Duration,
181 on_stalled: Option<OS>,
182 waker: Arc<OnStalledWaker>,
183 }
184}
185
186impl<F: Future, OS> Future for OnStalled<F, OS>
187where
188 OS: FnOnce() -> F::Output,
189{
190 type Output = F::Output;
191
192 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193 let mut this = self.project();
194 let mut woken = this.waker.original_waker.take().is_none();
196 loop {
197 this.waker.original_waker.register(cx.waker());
198 let waker = this.waker.clone().into();
199 let mut cx2 = Context::from_waker(&waker);
200 if let Poll::Ready(item) = this.future.as_mut().poll(&mut cx2) {
201 return Poll::Ready(item);
202 }
203 if woken || this.timer.as_mut().poll(cx).is_pending() {
204 this.timer.set(this.timeout.into_timer());
205 ready!(this.timer.as_mut().poll(cx));
206 woken = this.waker.original_waker.take().is_none();
209 }
210 if !woken {
211 return Poll::Ready((this.on_stalled.take().expect("polled after completion"))());
212 }
213 }
214 }
215}
216
217#[derive(Debug, Default)]
219struct OnStalledWaker {
220 original_waker: AtomicWaker,
221}
222
223impl Wake for OnStalledWaker {
224 fn wake(self: Arc<Self>) {
225 self.original_waker.wake();
226 }
227}
228
#[cfg(test)]
mod task_tests {

    use super::*;
    use futures::channel::oneshot;

    /// Drives `f` to completion on a freshly built `SendExecutor` with `TEST_THREADS` threads.
    fn run<F>(f: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        const TEST_THREADS: u8 = 2;
        SendExecutorBuilder::new().num_threads(TEST_THREADS).build().run(f)
    }

    /// A detached task keeps running after its handle is gone.
    #[test]
    fn can_detach() {
        run(async move {
            let (started_tx, started_rx) = oneshot::channel();
            let (resume_tx, resume_rx) = oneshot::channel();
            let (finished_tx, finished_rx) = oneshot::channel();
            {
                let background = Task::spawn(async move {
                    started_tx.send(()).unwrap();
                    resume_rx.await.unwrap();
                    finished_tx.send(()).unwrap();
                });
                background.detach();
            }
            // The handshake below only completes if the detached task is still being polled.
            started_rx.await.unwrap();
            resume_tx.send(()).unwrap();
            finished_rx.await.unwrap();
        });
    }

    /// Awaiting a spawned task yields its output.
    #[test]
    fn can_join() {
        run(async move {
            let joined = Task::spawn(async move { 42u8 }).await;
            assert_eq!(42, joined);
        })
    }

    /// Awaiting `unblock` yields the closure's return value on a multi-threaded executor.
    #[test]
    fn can_join_unblock() {
        run(async move {
            let joined = unblock(|| 42u8).await;
            assert_eq!(42, joined);
        })
    }

    /// Awaiting `unblock` yields the closure's return value on a local executor.
    #[test]
    fn can_join_unblock_local() {
        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
            let joined = unblock(|| 42u8).await;
            assert_eq!(42, joined);
        });
    }

    /// A panic inside the `unblock` closure fails the test; skipped under the sanitizer
    /// variants named in the `cfg_attr` lines.
    #[test]
    #[should_panic]
    #[cfg_attr(feature = "variant_asan", ignore)]
    #[cfg_attr(feature = "variant_hwasan", ignore)]
    fn unblock_fn_panics() {
        run(async move {
            unblock(|| panic!("bad")).await;
        })
    }

    /// Awaiting a local task yields its output.
    #[test]
    fn can_join_local() {
        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
            let joined = Task::local(async move { 42u8 }).await;
            assert_eq!(42, joined);
        })
    }

    /// Dropping a task handle stops the task before it can report completion.
    #[test]
    fn can_cancel() {
        run(async move {
            // The start sender is kept alive but never used, so the task can only end by
            // being dropped.
            let (_start_tx, start_rx) = oneshot::channel::<()>();
            let (finished_tx, finished_rx) = oneshot::channel();
            let task = Task::spawn(async move {
                start_rx.await.unwrap();
                finished_tx.send(()).unwrap();
            });
            drop(task);
            // The receiver must observe an error rather than a value.
            finished_rx.await.expect_err("done should not be sent");
        })
    }
}
321
#[cfg(test)]
mod timer_tests {
    use super::*;
    use futures::future::Either;
    use std::pin::pin;

    /// Of two timers created from `Instant`s, the one with the earlier deadline wins a select.
    #[test]
    fn shorter_fires_first_instant() {
        use std::time::{Duration, Instant};
        let mut exec = LocalExecutorBuilder::new().build();
        let now = Instant::now();
        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
        let winner = exec.run_singlethreaded(future::select(shorter, longer));
        if let Either::Right((_, _)) = winner {
            panic!("wrong timer fired");
        }
    }

    /// A timer built from a `MonotonicDuration` does not complete before that much
    /// monotonic time has passed.
    #[cfg(target_os = "fuchsia")]
    #[test]
    fn can_use_zx_duration() {
        let mut exec = LocalExecutorBuilder::new().build();
        let start = MonotonicInstant::now();
        let timer = Timer::new(MonotonicDuration::from_millis(100));
        exec.run_singlethreaded(timer);
        let end = MonotonicInstant::now();
        let elapsed = end - start;
        assert!(elapsed > MonotonicDuration::from_millis(100));
    }

    /// A future that sleeps for ever-doubling intervals is reported as stalled once a sleep
    /// outlasts the one-second stall window, after having made progress several times.
    #[test]
    fn can_detect_stalls() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::time::Duration;
        let runs = Arc::new(AtomicU64::new(0));
        let outcome = {
            let runs = runs.clone();
            let stalling = async move {
                let mut sleep = Duration::from_millis(1);
                loop {
                    Timer::new(sleep).await;
                    sleep *= 2;
                    runs.fetch_add(1, Ordering::SeqCst);
                }
            }
            .on_stalled(Duration::from_secs(1), || 1u8);
            LocalExecutorBuilder::new().build().run_singlethreaded(stalling)
        };
        assert_eq!(outcome, 1u8);
        assert!(runs.load(Ordering::SeqCst) >= 9);
    }
}
377}