fuchsia_async/runtime/
mod.rs1#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(target_os = "fuchsia")]
11pub mod instrument;
12
13#[cfg(not(target_os = "fuchsia"))]
14mod portable;
15#[cfg(not(target_os = "fuchsia"))]
16use self::portable as implementation;
17
18pub use implementation::executor::{
20 LocalExecutor, LocalExecutorBuilder, MonotonicDuration, MonotonicInstant, SendExecutor,
21 SendExecutorBuilder, SpawnableFuture, TestExecutor, TestExecutorBuilder,
22};
23pub use implementation::task::{JoinHandle, Task, unblock, yield_now};
24pub use implementation::timer::Timer;
25
26mod task_group;
27pub use task_group::*;
28
29pub mod epoch;
30
31#[cfg(target_os = "fuchsia")]
33pub use self::fuchsia::{
34 executor::{
35 BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
36 },
37 timer::Interval,
38};
39
40pub mod scope;
44
45pub use scope::{Scope, ScopeHandle};
46
47use futures::prelude::*;
48use pin_project_lite::pin_project;
49use std::pin::Pin;
50use std::task::{Context, Poll, ready};
51
52pub trait DurationExt {
54 fn after_now(self) -> MonotonicInstant;
59}
60
61pub trait WakeupTime {
63 fn into_timer(self) -> Timer;
68}
69
70#[cfg(target_os = "fuchsia")]
71impl WakeupTime for std::time::Duration {
72 fn into_timer(self) -> Timer {
73 EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
74 }
75}
76
77#[cfg(not(target_os = "fuchsia"))]
78impl WakeupTime for std::time::Duration {
79 fn into_timer(self) -> Timer {
80 Timer::from(self)
81 }
82}
83
84#[cfg(target_os = "fuchsia")]
85impl WakeupTime for MonotonicDuration {
86 fn into_timer(self) -> Timer {
87 EHandle::local().mono_timers().new_timer(MonotonicInstant::after(self))
88 }
89}
90
91#[cfg(target_os = "fuchsia")]
92impl WakeupTime for zx::BootDuration {
93 fn into_timer(self) -> Timer {
94 EHandle::local().boot_timers().new_timer(BootInstant::after(self))
95 }
96}
97
98impl DurationExt for std::time::Duration {
99 #[allow(clippy::useless_conversion)] fn after_now(self) -> MonotonicInstant {
101 MonotonicInstant::now() + self.into()
102 }
103}
104
105pub trait TimeoutExt: Future + Sized {
107 fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
110 where
111 WT: WakeupTime,
112 OT: FnOnce() -> Self::Output,
113 {
114 OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
115 }
116
117 fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
122 where
123 OS: FnOnce() -> Self::Output,
124 {
125 OnStalled {
126 timer: timeout.into_timer(),
127 future: self,
128 timeout,
129 on_stalled: Some(on_stalled),
130 }
131 }
132}
133
134impl<F: Future + Sized> TimeoutExt for F {}
135
136pin_project! {
137 #[derive(Debug)]
139 #[must_use = "futures do nothing unless polled"]
140 pub struct OnTimeout<F, OT> {
141 #[pin]
142 timer: Timer,
143 #[pin]
144 future: F,
145 on_timeout: Option<OT>,
146 }
147}
148
149impl<F: Future, OT> Future for OnTimeout<F, OT>
150where
151 OT: FnOnce() -> F::Output,
152{
153 type Output = F::Output;
154
155 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156 let this = self.project();
157 if let Poll::Ready(item) = this.future.poll(cx) {
158 return Poll::Ready(item);
159 }
160 if let Poll::Ready(()) = this.timer.poll(cx) {
161 let ot = this.on_timeout.take().expect("polled withtimeout after completion");
162 let item = (ot)();
163 return Poll::Ready(item);
164 }
165 Poll::Pending
166 }
167}
168
169pin_project! {
170 #[derive(Debug)]
173 #[must_use = "futures do nothing unless polled"]
174 pub struct OnStalled<F, OS> {
175 #[pin]
176 timer: Timer,
177 #[pin]
178 future: F,
179 timeout: std::time::Duration,
180 on_stalled: Option<OS>,
181 }
182}
183
184impl<F: Future, OS> Future for OnStalled<F, OS>
185where
186 OS: FnOnce() -> F::Output,
187{
188 type Output = F::Output;
189
190 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
191 let mut this = self.project();
192 if let Poll::Ready(item) = this.future.poll(cx) {
193 return Poll::Ready(item);
194 }
195 match this.timer.as_mut().poll(cx) {
196 Poll::Ready(()) => {}
197 Poll::Pending => {
198 this.timer.set(this.timeout.into_timer());
199 ready!(this.timer.as_mut().poll(cx));
200 }
201 }
202 Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
203 }
204}
205
206#[cfg(test)]
207mod task_tests {
208
209 use super::*;
210 use futures::channel::oneshot;
211
212 fn run(f: impl Send + 'static + Future<Output = ()>) {
213 const TEST_THREADS: u8 = 2;
214 SendExecutorBuilder::new().num_threads(TEST_THREADS).build().run(f)
215 }
216
217 #[test]
218 fn can_detach() {
219 run(async move {
220 let (tx_started, rx_started) = oneshot::channel();
221 let (tx_continue, rx_continue) = oneshot::channel();
222 let (tx_done, rx_done) = oneshot::channel();
223 {
224 Task::spawn(async move {
227 tx_started.send(()).unwrap();
228 rx_continue.await.unwrap();
229 tx_done.send(()).unwrap();
230 })
231 .detach();
232 }
233 rx_started.await.unwrap();
235 tx_continue.send(()).unwrap();
236 rx_done.await.unwrap();
237 });
238 }
239
240 #[test]
241 fn can_join() {
242 run(async move {
244 assert_eq!(42, Task::spawn(async move { 42u8 }).await);
245 })
246 }
247
248 #[test]
249 fn can_join_unblock() {
250 run(async move {
252 assert_eq!(42, unblock(|| 42u8).await);
253 })
254 }
255
256 #[test]
257 fn can_join_unblock_local() {
258 LocalExecutorBuilder::new().build().run_singlethreaded(async move {
260 assert_eq!(42, unblock(|| 42u8).await);
261 });
262 }
263
264 #[test]
265 #[should_panic]
266 #[cfg_attr(feature = "variant_asan", ignore)]
268 #[cfg_attr(feature = "variant_hwasan", ignore)]
269 fn unblock_fn_panics() {
270 run(async move {
271 unblock(|| panic!("bad")).await;
272 })
273 }
274
275 #[test]
276 fn can_join_local() {
277 LocalExecutorBuilder::new().build().run_singlethreaded(async move {
279 assert_eq!(42, Task::local(async move { 42u8 }).await);
280 })
281 }
282
283 #[test]
284 fn can_cancel() {
285 run(async move {
286 let (_tx_start, rx_start) = oneshot::channel::<()>();
287 let (tx_done, rx_done) = oneshot::channel();
288 drop(Task::spawn(async move {
290 rx_start.await.unwrap();
291 tx_done.send(()).unwrap();
292 }));
293 rx_done.await.expect_err("done should not be sent");
295 })
296 }
297}
298
299#[cfg(test)]
300mod timer_tests {
301 use super::*;
302 use futures::future::Either;
303 use std::pin::pin;
304
305 #[test]
306 fn shorter_fires_first_instant() {
307 use std::time::{Duration, Instant};
308 let mut exec = LocalExecutorBuilder::new().build();
309 let now = Instant::now();
310 let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
311 let longer = pin!(Timer::new(now + Duration::from_secs(1)));
312 match exec.run_singlethreaded(future::select(shorter, longer)) {
313 Either::Left((_, _)) => {}
314 Either::Right((_, _)) => panic!("wrong timer fired"),
315 }
316 }
317
318 #[cfg(target_os = "fuchsia")]
319 #[test]
320 fn can_use_zx_duration() {
321 let mut exec = LocalExecutorBuilder::new().build();
322 let start = MonotonicInstant::now();
323 let timer = Timer::new(MonotonicDuration::from_millis(100));
324 exec.run_singlethreaded(timer);
325 let end = MonotonicInstant::now();
326 assert!(end - start > MonotonicDuration::from_millis(100));
327 }
328
329 #[test]
330 fn can_detect_stalls() {
331 use std::sync::Arc;
332 use std::sync::atomic::{AtomicU64, Ordering};
333 use std::time::Duration;
334 let runs = Arc::new(AtomicU64::new(0));
335 assert_eq!(
336 {
337 let runs = runs.clone();
338 LocalExecutorBuilder::new().build().run_singlethreaded(
339 async move {
340 let mut sleep = Duration::from_millis(1);
341 loop {
342 Timer::new(sleep).await;
343 sleep *= 2;
344 runs.fetch_add(1, Ordering::SeqCst);
345 }
346 }
347 .on_stalled(Duration::from_secs(1), || 1u8),
348 )
349 },
350 1u8
351 );
352 assert!(runs.load(Ordering::SeqCst) >= 9);
353 }
354}