fuchsia_async/runtime/
mod.rs1#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(target_os = "fuchsia")]
11pub mod instrument;
12
13#[cfg(not(target_os = "fuchsia"))]
14mod portable;
15#[cfg(not(target_os = "fuchsia"))]
16use self::portable as implementation;
17
18pub use implementation::executor::{
20 LocalExecutor, LocalExecutorBuilder, MonotonicDuration, MonotonicInstant, SendExecutor,
21 SendExecutorBuilder, SpawnableFuture, TestExecutor, TestExecutorBuilder,
22};
23pub use implementation::task::{CancelableJoinHandle, JoinHandle, Task, unblock, yield_now};
24pub use implementation::timer::Timer;
25
26mod task_group;
27pub use task_group::*;
28
29pub mod epoch;
30
31#[cfg(target_os = "fuchsia")]
33pub use self::fuchsia::{
34 executor::{
35 BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
36 },
37 task::LowPriorityTask,
38 timer::Interval,
39};
40
41pub mod scope;
45
46pub use scope::{Scope, ScopeHandle};
47
48use futures::prelude::*;
49use futures::task::AtomicWaker;
50use pin_project_lite::pin_project;
51use std::pin::Pin;
52use std::sync::Arc;
53use std::task::{Context, Poll, Wake, ready};
54
55pub trait DurationExt {
57 fn after_now(self) -> MonotonicInstant;
62}
63
64pub trait WakeupTime {
66 fn into_timer(self) -> Timer;
71}
72
73#[cfg(target_os = "fuchsia")]
74impl WakeupTime for std::time::Duration {
75 fn into_timer(self) -> Timer {
76 EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
77 }
78}
79
80#[cfg(not(target_os = "fuchsia"))]
81impl WakeupTime for std::time::Duration {
82 fn into_timer(self) -> Timer {
83 Timer::from(self)
84 }
85}
86
87#[cfg(target_os = "fuchsia")]
88impl WakeupTime for MonotonicDuration {
89 fn into_timer(self) -> Timer {
90 EHandle::local().mono_timers().new_timer(MonotonicInstant::after(self))
91 }
92}
93
94#[cfg(target_os = "fuchsia")]
95impl WakeupTime for zx::BootDuration {
96 fn into_timer(self) -> Timer {
97 EHandle::local().boot_timers().new_timer(BootInstant::after(self))
98 }
99}
100
101impl DurationExt for std::time::Duration {
102 #[allow(clippy::useless_conversion)] fn after_now(self) -> MonotonicInstant {
104 MonotonicInstant::now() + self.into()
105 }
106}
107
108pub trait TimeoutExt: Future + Sized {
110 fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
113 where
114 WT: WakeupTime,
115 OT: FnOnce() -> Self::Output,
116 {
117 OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
118 }
119
120 fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
125 where
126 OS: FnOnce() -> Self::Output,
127 {
128 OnStalled {
129 timer: timeout.into_timer(),
130 future: self,
131 timeout,
132 on_stalled: Some(on_stalled),
133 waker: Arc::default(),
134 }
135 }
136}
137
138impl<F: Future + Sized> TimeoutExt for F {}
139
140pin_project! {
141 #[derive(Debug)]
143 #[must_use = "futures do nothing unless polled"]
144 pub struct OnTimeout<F, OT> {
145 #[pin]
146 timer: Timer,
147 #[pin]
148 future: F,
149 on_timeout: Option<OT>,
150 }
151}
152
153impl<F: Future, OT> Future for OnTimeout<F, OT>
154where
155 OT: FnOnce() -> F::Output,
156{
157 type Output = F::Output;
158
159 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
160 let this = self.project();
161 if let Poll::Ready(item) = this.future.poll(cx) {
162 return Poll::Ready(item);
163 }
164 if let Poll::Ready(()) = this.timer.poll(cx) {
165 let ot = this.on_timeout.take().expect("polled withtimeout after completion");
166 let item = (ot)();
167 return Poll::Ready(item);
168 }
169 Poll::Pending
170 }
171}
172
173pin_project! {
174 #[derive(Debug)]
177 #[must_use = "futures do nothing unless polled"]
178 pub struct OnStalled<F, OS> {
179 #[pin]
180 timer: Timer,
181 #[pin]
182 future: F,
183 timeout: std::time::Duration,
184 on_stalled: Option<OS>,
185 waker: Arc<OnStalledWaker>,
186 }
187}
188
189impl<F: Future, OS> Future for OnStalled<F, OS>
190where
191 OS: FnOnce() -> F::Output,
192{
193 type Output = F::Output;
194
195 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
196 let mut this = self.project();
197 let mut woken = this.waker.original_waker.take().is_none();
199 loop {
200 this.waker.original_waker.register(cx.waker());
201 let waker = this.waker.clone().into();
202 let mut cx2 = Context::from_waker(&waker);
203 if let Poll::Ready(item) = this.future.as_mut().poll(&mut cx2) {
204 return Poll::Ready(item);
205 }
206 if woken || this.timer.as_mut().poll(cx).is_pending() {
207 this.timer.set(this.timeout.into_timer());
208 ready!(this.timer.as_mut().poll(cx));
209 woken = this.waker.original_waker.take().is_none();
212 }
213 if !woken {
214 return Poll::Ready((this.on_stalled.take().expect("polled after completion"))());
215 }
216 }
217 }
218}
219
220#[derive(Debug, Default)]
222struct OnStalledWaker {
223 original_waker: AtomicWaker,
224}
225
226impl Wake for OnStalledWaker {
227 fn wake(self: Arc<Self>) {
228 self.original_waker.wake();
229 }
230}
231
232#[cfg(test)]
233mod task_tests {
234
235 use super::*;
236 use futures::channel::oneshot;
237
238 fn run(f: impl Send + 'static + Future<Output = ()>) {
239 const TEST_THREADS: u8 = 2;
240 SendExecutorBuilder::new().num_threads(TEST_THREADS).build().run(f)
241 }
242
243 #[test]
244 fn can_detach() {
245 run(async move {
246 let (tx_started, rx_started) = oneshot::channel();
247 let (tx_continue, rx_continue) = oneshot::channel();
248 let (tx_done, rx_done) = oneshot::channel();
249 {
250 Task::spawn(async move {
253 tx_started.send(()).unwrap();
254 rx_continue.await.unwrap();
255 tx_done.send(()).unwrap();
256 })
257 .detach();
258 }
259 rx_started.await.unwrap();
261 tx_continue.send(()).unwrap();
262 rx_done.await.unwrap();
263 });
264 }
265
266 #[test]
267 fn can_join() {
268 run(async move {
270 assert_eq!(42, Task::spawn(async move { 42u8 }).await);
271 })
272 }
273
274 #[test]
275 fn can_join_unblock() {
276 run(async move {
278 assert_eq!(42, unblock(|| 42u8).await);
279 })
280 }
281
282 #[test]
283 fn can_join_unblock_local() {
284 LocalExecutorBuilder::new().build().run_singlethreaded(async move {
286 assert_eq!(42, unblock(|| 42u8).await);
287 });
288 }
289
290 #[test]
291 #[should_panic]
292 #[cfg_attr(feature = "variant_asan", ignore)]
294 #[cfg_attr(feature = "variant_hwasan", ignore)]
295 fn unblock_fn_panics() {
296 run(async move {
297 unblock(|| panic!("bad")).await;
298 })
299 }
300
301 #[test]
302 fn can_join_local() {
303 LocalExecutorBuilder::new().build().run_singlethreaded(async move {
305 assert_eq!(42, Task::local(async move { 42u8 }).await);
306 })
307 }
308
309 #[test]
310 fn can_cancel() {
311 run(async move {
312 let (_tx_start, rx_start) = oneshot::channel::<()>();
313 let (tx_done, rx_done) = oneshot::channel();
314 drop(Task::spawn(async move {
316 rx_start.await.unwrap();
317 tx_done.send(()).unwrap();
318 }));
319 rx_done.await.expect_err("done should not be sent");
321 })
322 }
323}
324
325#[cfg(test)]
326mod timer_tests {
327 use super::*;
328 use futures::future::Either;
329 use std::pin::pin;
330
331 #[test]
332 fn shorter_fires_first_instant() {
333 use std::time::{Duration, Instant};
334 let mut exec = LocalExecutorBuilder::new().build();
335 let now = Instant::now();
336 let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
337 let longer = pin!(Timer::new(now + Duration::from_secs(1)));
338 match exec.run_singlethreaded(future::select(shorter, longer)) {
339 Either::Left((_, _)) => {}
340 Either::Right((_, _)) => panic!("wrong timer fired"),
341 }
342 }
343
344 #[cfg(target_os = "fuchsia")]
345 #[test]
346 fn can_use_zx_duration() {
347 let mut exec = LocalExecutorBuilder::new().build();
348 let start = MonotonicInstant::now();
349 let timer = Timer::new(MonotonicDuration::from_millis(100));
350 exec.run_singlethreaded(timer);
351 let end = MonotonicInstant::now();
352 assert!(end - start > MonotonicDuration::from_millis(100));
353 }
354
355 #[test]
356 fn can_detect_stalls() {
357 use std::sync::Arc;
358 use std::sync::atomic::{AtomicU64, Ordering};
359 use std::time::Duration;
360 let runs = Arc::new(AtomicU64::new(0));
361 assert_eq!(
362 {
363 let runs = runs.clone();
364 LocalExecutorBuilder::new().build().run_singlethreaded(
365 async move {
366 let mut sleep = Duration::from_millis(1);
367 loop {
368 Timer::new(sleep).await;
369 sleep *= 2;
370 runs.fetch_add(1, Ordering::SeqCst);
371 }
372 }
373 .on_stalled(Duration::from_secs(1), || 1u8),
374 )
375 },
376 1u8
377 );
378 assert!(runs.load(Ordering::SeqCst) >= 9);
379 }
380}