fuchsia_async/runtime/
mod.rs1#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(target_os = "fuchsia")]
11pub mod instrument;
12
13#[cfg(not(target_os = "fuchsia"))]
14mod portable;
15#[cfg(not(target_os = "fuchsia"))]
16use self::portable as implementation;
17
18pub use implementation::executor::{
20 LocalExecutor, LocalExecutorBuilder, MonotonicDuration, MonotonicInstant, SendExecutor,
21 SendExecutorBuilder, SpawnableFuture, TestExecutor, TestExecutorBuilder,
22};
23pub use implementation::task::{CancelableJoinHandle, JoinHandle, Task, unblock, yield_now};
24pub use implementation::timer::Timer;
25
26mod task_group;
27pub use task_group::*;
28
29pub mod epoch;
30
31#[cfg(target_os = "fuchsia")]
33pub use self::fuchsia::{
34 executor::{
35 BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
36 },
37 timer::Interval,
38};
39
40pub mod scope;
44
45pub use scope::{Scope, ScopeHandle};
46
47use futures::prelude::*;
48use futures::task::AtomicWaker;
49use pin_project_lite::pin_project;
50use std::pin::Pin;
51use std::sync::Arc;
52use std::task::{Context, Poll, Wake, ready};
53
/// Extension trait for duration values that can be turned into a monotonic deadline.
pub trait DurationExt {
    /// Consumes the duration and returns a `MonotonicInstant`.
    ///
    /// The `std::time::Duration` implementation in this file computes
    /// `MonotonicInstant::now() + self`, i.e. the instant that lies `self` after the
    /// current monotonic time.
    fn after_now(self) -> MonotonicInstant;
}
62
/// A value that can be converted into a [`Timer`].
///
/// Used as the bound on the `time` argument of `TimeoutExt::on_timeout`.
pub trait WakeupTime {
    /// Consumes `self` and returns the `Timer` built from it.
    ///
    /// The Fuchsia implementations in this file obtain the timer from
    /// `EHandle::local()`, so they presumably require a local executor to be
    /// set up on the calling thread — TODO confirm.
    fn into_timer(self) -> Timer;
}
71
// Fuchsia: a `std::time::Duration` is treated as an offset from the current monotonic time
// and scheduled on the local executor's monotonic timers.
#[cfg(target_os = "fuchsia")]
impl WakeupTime for std::time::Duration {
    fn into_timer(self) -> Timer {
        // The handle is looked up first, then the clock is read, matching the order in
        // which a single chained expression would evaluate.
        let executor = EHandle::local();
        executor.mono_timers().new_timer(MonotonicInstant::now() + self.into())
    }
}
78
// Non-Fuchsia targets: the timer is built directly from the duration via `Timer::from`,
// without going through an executor handle.
#[cfg(not(target_os = "fuchsia"))]
impl WakeupTime for std::time::Duration {
    fn into_timer(self) -> Timer {
        Timer::from(self)
    }
}
85
// Fuchsia: a `MonotonicDuration` becomes a timer on the local executor's monotonic timers,
// with the deadline computed by `MonotonicInstant::after`.
#[cfg(target_os = "fuchsia")]
impl WakeupTime for MonotonicDuration {
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        executor.mono_timers().new_timer(MonotonicInstant::after(self))
    }
}
92
// Fuchsia: a `zx::BootDuration` becomes a timer on the local executor's boot timers, with the
// deadline computed by `BootInstant::after`.
#[cfg(target_os = "fuchsia")]
impl WakeupTime for zx::BootDuration {
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        executor.boot_timers().new_timer(BootInstant::after(self))
    }
}
99
/// Lets a `std::time::Duration` be converted into the monotonic instant that far from now.
impl DurationExt for std::time::Duration {
    // NOTE(review): `MonotonicInstant` comes from the target-specific `implementation`
    // module, so the `.into()` below is presumably an identity conversion on one of the
    // two back-ends, which would be why the lint is silenced — confirm.
    #[allow(clippy::useless_conversion)]
    fn after_now(self) -> MonotonicInstant {
        MonotonicInstant::now() + self.into()
    }
}
106
107pub trait TimeoutExt: Future + Sized {
109 fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
112 where
113 WT: WakeupTime,
114 OT: FnOnce() -> Self::Output,
115 {
116 OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
117 }
118
119 fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
124 where
125 OS: FnOnce() -> Self::Output,
126 {
127 OnStalled {
128 timer: timeout.into_timer(),
129 future: self,
130 timeout,
131 on_stalled: Some(on_stalled),
132 waker: Arc::default(),
133 }
134 }
135}
136
// Blanket implementation: every sized `Future` gets the `TimeoutExt` combinators.
impl<F: Future + Sized> TimeoutExt for F {}
138
pin_project! {
    /// Future returned by `TimeoutExt::on_timeout`.
    ///
    /// Resolves to the wrapped future's output, or to the value produced by the stored
    /// closure if the timer completes while the wrapped future is still pending.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub struct OnTimeout<F, OT> {
        // Timer that bounds how long `future` may stay pending.
        #[pin]
        timer: Timer,
        // The wrapped future; polled before the timer on every poll.
        #[pin]
        future: F,
        // Fallback producer; taken (left as `None`) when the timeout path runs.
        on_timeout: Option<OT>,
    }
}
151
152impl<F: Future, OT> Future for OnTimeout<F, OT>
153where
154 OT: FnOnce() -> F::Output,
155{
156 type Output = F::Output;
157
158 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
159 let this = self.project();
160 if let Poll::Ready(item) = this.future.poll(cx) {
161 return Poll::Ready(item);
162 }
163 if let Poll::Ready(()) = this.timer.poll(cx) {
164 let ot = this.on_timeout.take().expect("polled withtimeout after completion");
165 let item = (ot)();
166 return Poll::Ready(item);
167 }
168 Poll::Pending
169 }
170}
171
pin_project! {
    /// Future returned by `TimeoutExt::on_stalled`.
    ///
    /// Resolves to the wrapped future's output, or to the value produced by the stored
    /// closure when a timer fires without the wrapped future having used its waker.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub struct OnStalled<F, OS> {
        // Currently armed stall timer; replaced with a fresh one by `poll`.
        #[pin]
        timer: Timer,
        // The wrapped future; polled with a context backed by `waker`.
        #[pin]
        future: F,
        // Length of each stall window; used to build replacement timers.
        timeout: std::time::Duration,
        // Fallback producer; taken (left as `None`) when the stall path runs.
        on_stalled: Option<OS>,
        // Shared waker handed to `future`; forwards wake-ups to the caller's waker.
        waker: Arc<OnStalledWaker>,
    }
}
187
impl<F: Future, OS> Future for OnStalled<F, OS>
where
    OS: FnOnce() -> F::Output,
{
    type Output = F::Output;

    /// Polls the wrapped future through `OnStalledWaker`, so that wake-ups issued by the
    /// future can be told apart from polls triggered by the stall timer (which is polled
    /// with the caller's context directly).
    ///
    /// Yields the future's output when it is ready, or the result of `on_stalled` when a
    /// timer has fired and no wake-up from the future was observed.
    ///
    /// # Panics
    ///
    /// Panics with "polled after completion" if the stall path is reached after the
    /// closure has already been consumed.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        // Per the `AtomicWaker` docs, `wake()` consumes the registered waker. An empty slot
        // therefore means the future woke us since the last `register` call — or that this
        // is the first poll and nothing has been registered yet.
        let mut woken = this.waker.original_waker.take().is_none();
        loop {
            // Store the caller's waker so a wake-up from the future is forwarded to it.
            this.waker.original_waker.register(cx.waker());
            // Build a `Waker` from the shared `Arc<OnStalledWaker>` (via the `Wake` impl)
            // and poll the future with a context that uses it.
            let waker = this.waker.clone().into();
            let mut cx2 = Context::from_waker(&waker);
            if let Poll::Ready(item) = this.future.as_mut().poll(&mut cx2) {
                return Poll::Ready(item);
            }
            // Either the future was woken, or the current timer has not fired yet: start a
            // fresh timeout window.
            if woken || this.timer.as_mut().poll(cx).is_pending() {
                this.timer.set(this.timeout.into_timer());
                // Normally the new timer is pending and we return `Poll::Pending` here.
                ready!(this.timer.as_mut().poll(cx));
                // The new timer was ready straight away. Re-check whether the future used
                // its waker during this iteration before deciding what to do.
                woken = this.waker.original_waker.take().is_none();
            }
            // A timer has fired and no wake-up from the future was seen: report the stall.
            // Otherwise loop and poll the future again.
            if !woken {
                return Poll::Ready((this.on_stalled.take().expect("polled after completion"))());
            }
        }
    }
}
218
/// Waker state shared between an `OnStalled` future and the future it wraps.
#[derive(Debug, Default)]
struct OnStalledWaker {
    // Holds the waker of the task polling `OnStalled`. `OnStalled::poll` registers it
    // before polling the wrapped future and takes it back to find out whether it was used.
    original_waker: AtomicWaker,
}
224
impl Wake for OnStalledWaker {
    /// Forwards the wake-up to whatever waker is currently stored in `original_waker`.
    fn wake(self: Arc<Self>) {
        self.original_waker.wake();
    }
}
230
/// Tests for spawning, joining, detaching and dropping tasks.
#[cfg(test)]
mod task_tests {

    use super::*;
    use futures::channel::oneshot;

    /// Runs `f` to completion on a `SendExecutor` built with `TEST_THREADS` threads.
    fn run(f: impl Send + 'static + Future<Output = ()>) {
        const TEST_THREADS: u8 = 2;
        SendExecutorBuilder::new().num_threads(TEST_THREADS).build().run(f)
    }

    /// A detached task keeps running: it signals that it started, waits for permission to
    /// continue, then signals completion, all over oneshot channels.
    #[test]
    fn can_detach() {
        run(async move {
            let (tx_started, rx_started) = oneshot::channel();
            let (tx_continue, rx_continue) = oneshot::channel();
            let (tx_done, rx_done) = oneshot::channel();
            {
                // The task handle is given up via `detach()` inside this scope, so nothing
                // in the test holds on to the task afterwards.
                Task::spawn(async move {
                    tx_started.send(()).unwrap();
                    rx_continue.await.unwrap();
                    tx_done.send(()).unwrap();
                })
                .detach();
            }
            // Wait until the task is known to be running before letting it proceed.
            rx_started.await.unwrap();
            tx_continue.send(()).unwrap();
            rx_done.await.unwrap();
        });
    }

    /// Awaiting a spawned task yields the value produced by its future.
    #[test]
    fn can_join() {
        run(async move {
            assert_eq!(42, Task::spawn(async move { 42u8 }).await);
        })
    }

    /// Awaiting `unblock` yields the closure's return value on the multi-threaded executor.
    #[test]
    fn can_join_unblock() {
        run(async move {
            assert_eq!(42, unblock(|| 42u8).await);
        })
    }

    /// Same as `can_join_unblock`, but driven by a `LocalExecutor`.
    #[test]
    fn can_join_unblock_local() {
        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
            assert_eq!(42, unblock(|| 42u8).await);
        });
    }

    /// A panic inside the `unblock` closure must make the test panic.
    // NOTE(review): skipped when the `variant_asan` / `variant_hwasan` features are enabled;
    // the reason is not visible in this file — confirm.
    #[test]
    #[should_panic]
    #[cfg_attr(feature = "variant_asan", ignore)]
    #[cfg_attr(feature = "variant_hwasan", ignore)]
    fn unblock_fn_panics() {
        run(async move {
            unblock(|| panic!("bad")).await;
        })
    }

    /// Awaiting a task created with `Task::local` yields the value produced by its future.
    #[test]
    fn can_join_local() {
        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
            assert_eq!(42, Task::local(async move { 42u8 }).await);
        })
    }

    /// Dropping the `Task` handle must stop the task before it sends on `tx_done`.
    #[test]
    fn can_cancel() {
        run(async move {
            // `_tx_start` is kept alive but never used, so the task stays parked on
            // `rx_start` instead of failing its `unwrap()`.
            let (_tx_start, rx_start) = oneshot::channel::<()>();
            let (tx_done, rx_done) = oneshot::channel();
            drop(Task::spawn(async move {
                rx_start.await.unwrap();
                tx_done.send(()).unwrap();
            }));
            // The receiver must observe an error, i.e. `tx_done` went away without a value
            // ever being sent.
            rx_done.await.expect_err("done should not be sent");
        })
    }
}
323
/// Tests for `Timer` and the `on_stalled` combinator.
#[cfg(test)]
mod timer_tests {
    use super::*;
    use futures::future::Either;
    use std::pin::pin;

    /// Of two timers created from `std::time::Instant` deadlines, the one with the earlier
    /// deadline must win a `select`.
    #[test]
    fn shorter_fires_first_instant() {
        use std::time::{Duration, Instant};
        let mut exec = LocalExecutorBuilder::new().build();
        let now = Instant::now();
        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left((_, _)) => {}
            Either::Right((_, _)) => panic!("wrong timer fired"),
        }
    }

    /// A timer built from a `MonotonicDuration` must not complete before that much
    /// monotonic time has elapsed.
    #[cfg(target_os = "fuchsia")]
    #[test]
    fn can_use_zx_duration() {
        let mut exec = LocalExecutorBuilder::new().build();
        let start = MonotonicInstant::now();
        let timer = Timer::new(MonotonicDuration::from_millis(100));
        exec.run_singlethreaded(timer);
        let end = MonotonicInstant::now();
        // The guarantee is "at least the requested duration": a reading that lands exactly
        // on the deadline is still correct, so compare with `>=` rather than `>`.
        assert!(end - start >= MonotonicDuration::from_millis(100));
    }

    /// A future that sleeps for ever-doubling intervals must eventually be reported as
    /// stalled by `on_stalled` once a single sleep outlasts the one-second stall timeout.
    #[test]
    fn can_detect_stalls() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::time::Duration;
        let runs = Arc::new(AtomicU64::new(0));
        assert_eq!(
            {
                let runs = runs.clone();
                LocalExecutorBuilder::new().build().run_singlethreaded(
                    async move {
                        // Sleeps of 1ms, 2ms, 4ms, ...; `runs` counts the completed ones.
                        let mut sleep = Duration::from_millis(1);
                        loop {
                            Timer::new(sleep).await;
                            sleep *= 2;
                            runs.fetch_add(1, Ordering::SeqCst);
                        }
                    }
                    .on_stalled(Duration::from_secs(1), || 1u8),
                )
            },
            1u8
        );
        // Every sleep shorter than the stall timeout should have completed; the bound is
        // kept loose to tolerate scheduling jitter.
        assert!(runs.load(Ordering::SeqCst) >= 9);
    }
}
379}