fuchsia_async/runtime/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(target_os = "fuchsia")]
11pub mod instrument;
12
13#[cfg(not(target_os = "fuchsia"))]
14mod portable;
15#[cfg(not(target_os = "fuchsia"))]
16use self::portable as implementation;
17
18// Exports common to all target os.
19pub use implementation::executor::{
20    LocalExecutor, LocalExecutorBuilder, MonotonicDuration, MonotonicInstant, SendExecutor,
21    SendExecutorBuilder, SpawnableFuture, TestExecutor, TestExecutorBuilder,
22};
23pub use implementation::task::{JoinHandle, Task, unblock, yield_now};
24pub use implementation::timer::Timer;
25
26mod task_group;
27pub use task_group::*;
28
29pub mod epoch;
30
31// Fuchsia specific exports.
32#[cfg(target_os = "fuchsia")]
33pub use self::fuchsia::{
34    executor::{
35        BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
36    },
37    timer::Interval,
38};
39
40/// Structured concurrency API for fuchsia-async.
41///
42/// See the [`Scope`] documentation for details.
43pub mod scope;
44
45pub use scope::{Scope, ScopeHandle};
46
47use futures::prelude::*;
48use pin_project_lite::pin_project;
49use std::pin::Pin;
50use std::task::{Context, Poll, ready};
51
52/// An extension trait to provide `after_now` on `zx::MonotonicDuration`.
53pub trait DurationExt {
54    /// Return a `MonotonicInstant` which is a `MonotonicDuration` after the current time.
55    /// `duration.after_now()` is equivalent to `MonotonicInstant::after(duration)`.
56    ///
57    /// This method requires that an executor has been set up.
58    fn after_now(self) -> MonotonicInstant;
59}
60
61/// The time when a Timer should wakeup.
62pub trait WakeupTime {
63    /// Create a timer based on this time.
64    ///
65    /// This is allowed to be inaccurate, but the inaccuracy must make the wakeup time later,
66    /// never earlier.
67    fn into_timer(self) -> Timer;
68}
69
70#[cfg(target_os = "fuchsia")]
71impl WakeupTime for std::time::Duration {
72    fn into_timer(self) -> Timer {
73        EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
74    }
75}
76
77#[cfg(not(target_os = "fuchsia"))]
78impl WakeupTime for std::time::Duration {
79    fn into_timer(self) -> Timer {
80        Timer::from(self)
81    }
82}
83
84#[cfg(target_os = "fuchsia")]
85impl WakeupTime for MonotonicDuration {
86    fn into_timer(self) -> Timer {
87        EHandle::local().mono_timers().new_timer(MonotonicInstant::after(self))
88    }
89}
90
91#[cfg(target_os = "fuchsia")]
92impl WakeupTime for zx::BootDuration {
93    fn into_timer(self) -> Timer {
94        EHandle::local().boot_timers().new_timer(BootInstant::after(self))
95    }
96}
97
98impl DurationExt for std::time::Duration {
99    #[allow(clippy::useless_conversion)] // Conversion is optionally needed on Fuchsia.
100    fn after_now(self) -> MonotonicInstant {
101        MonotonicInstant::now() + self.into()
102    }
103}
104
105/// A trait which allows futures to be easily wrapped in a timeout.
106pub trait TimeoutExt: Future + Sized {
107    /// Wraps the future in a timeout, calling `on_timeout` to produce a result
108    /// when the timeout occurs.
109    fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
110    where
111        WT: WakeupTime,
112        OT: FnOnce() -> Self::Output,
113    {
114        OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
115    }
116
117    /// Wraps the future in a stall-guard, calling `on_stalled` to produce a result
118    /// when the future hasn't been otherwise polled within the `timeout`.
119    /// This is a heuristic - spurious wakeups will keep the detection from triggering,
120    /// and moving all work to external tasks or threads with force the triggering early.
121    fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
122    where
123        OS: FnOnce() -> Self::Output,
124    {
125        OnStalled {
126            timer: timeout.into_timer(),
127            future: self,
128            timeout,
129            on_stalled: Some(on_stalled),
130        }
131    }
132}
133
134impl<F: Future + Sized> TimeoutExt for F {}
135
136pin_project! {
137    /// A wrapper for a future which will complete with a provided closure when a timeout occurs.
138    #[derive(Debug)]
139    #[must_use = "futures do nothing unless polled"]
140    pub struct OnTimeout<F, OT> {
141        #[pin]
142        timer: Timer,
143        #[pin]
144        future: F,
145        on_timeout: Option<OT>,
146    }
147}
148
149impl<F: Future, OT> Future for OnTimeout<F, OT>
150where
151    OT: FnOnce() -> F::Output,
152{
153    type Output = F::Output;
154
155    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156        let this = self.project();
157        if let Poll::Ready(item) = this.future.poll(cx) {
158            return Poll::Ready(item);
159        }
160        if let Poll::Ready(()) = this.timer.poll(cx) {
161            let ot = this.on_timeout.take().expect("polled withtimeout after completion");
162            let item = (ot)();
163            return Poll::Ready(item);
164        }
165        Poll::Pending
166    }
167}
168
169pin_project! {
170    /// A wrapper for a future who's steady progress is monitored and will complete with the
171    /// provided closure if no progress is made before the timeout.
172    #[derive(Debug)]
173    #[must_use = "futures do nothing unless polled"]
174    pub struct OnStalled<F, OS> {
175        #[pin]
176        timer: Timer,
177        #[pin]
178        future: F,
179        timeout: std::time::Duration,
180        on_stalled: Option<OS>,
181    }
182}
183
184impl<F: Future, OS> Future for OnStalled<F, OS>
185where
186    OS: FnOnce() -> F::Output,
187{
188    type Output = F::Output;
189
190    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
191        let mut this = self.project();
192        if let Poll::Ready(item) = this.future.poll(cx) {
193            return Poll::Ready(item);
194        }
195        match this.timer.as_mut().poll(cx) {
196            Poll::Ready(()) => {}
197            Poll::Pending => {
198                this.timer.set(this.timeout.into_timer());
199                ready!(this.timer.as_mut().poll(cx));
200            }
201        }
202        Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
203    }
204}
205
206#[cfg(test)]
207mod task_tests {
208
209    use super::*;
210    use futures::channel::oneshot;
211
212    fn run(f: impl Send + 'static + Future<Output = ()>) {
213        const TEST_THREADS: u8 = 2;
214        SendExecutorBuilder::new().num_threads(TEST_THREADS).build().run(f)
215    }
216
217    #[test]
218    fn can_detach() {
219        run(async move {
220            let (tx_started, rx_started) = oneshot::channel();
221            let (tx_continue, rx_continue) = oneshot::channel();
222            let (tx_done, rx_done) = oneshot::channel();
223            {
224                // spawn a task and detach it
225                // the task will wait for a signal, signal it received it, and then wait for another
226                Task::spawn(async move {
227                    tx_started.send(()).unwrap();
228                    rx_continue.await.unwrap();
229                    tx_done.send(()).unwrap();
230                })
231                .detach();
232            }
233            // task is detached, have a short conversation with it
234            rx_started.await.unwrap();
235            tx_continue.send(()).unwrap();
236            rx_done.await.unwrap();
237        });
238    }
239
240    #[test]
241    fn can_join() {
242        // can we spawn, then join a task
243        run(async move {
244            assert_eq!(42, Task::spawn(async move { 42u8 }).await);
245        })
246    }
247
248    #[test]
249    fn can_join_unblock() {
250        // can we poll a blocked task
251        run(async move {
252            assert_eq!(42, unblock(|| 42u8).await);
253        })
254    }
255
256    #[test]
257    fn can_join_unblock_local() {
258        // can we poll a blocked task in a local executor
259        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
260            assert_eq!(42, unblock(|| 42u8).await);
261        });
262    }
263
264    #[test]
265    #[should_panic]
266    // TODO(https://fxbug.dev/42169733): delete the below
267    #[cfg_attr(feature = "variant_asan", ignore)]
268    #[cfg_attr(feature = "variant_hwasan", ignore)]
269    fn unblock_fn_panics() {
270        run(async move {
271            unblock(|| panic!("bad")).await;
272        })
273    }
274
275    #[test]
276    fn can_join_local() {
277        // can we spawn, then join a task locally
278        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
279            assert_eq!(42, Task::local(async move { 42u8 }).await);
280        })
281    }
282
283    #[test]
284    fn can_cancel() {
285        run(async move {
286            let (_tx_start, rx_start) = oneshot::channel::<()>();
287            let (tx_done, rx_done) = oneshot::channel();
288            // Start and immediately cancel the task (by dropping it).
289            drop(Task::spawn(async move {
290                rx_start.await.unwrap();
291                tx_done.send(()).unwrap();
292            }));
293            // we should see an error on receive
294            rx_done.await.expect_err("done should not be sent");
295        })
296    }
297}
298
299#[cfg(test)]
300mod timer_tests {
301    use super::*;
302    use futures::future::Either;
303    use std::pin::pin;
304
305    #[test]
306    fn shorter_fires_first_instant() {
307        use std::time::{Duration, Instant};
308        let mut exec = LocalExecutorBuilder::new().build();
309        let now = Instant::now();
310        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
311        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
312        match exec.run_singlethreaded(future::select(shorter, longer)) {
313            Either::Left((_, _)) => {}
314            Either::Right((_, _)) => panic!("wrong timer fired"),
315        }
316    }
317
318    #[cfg(target_os = "fuchsia")]
319    #[test]
320    fn can_use_zx_duration() {
321        let mut exec = LocalExecutorBuilder::new().build();
322        let start = MonotonicInstant::now();
323        let timer = Timer::new(MonotonicDuration::from_millis(100));
324        exec.run_singlethreaded(timer);
325        let end = MonotonicInstant::now();
326        assert!(end - start > MonotonicDuration::from_millis(100));
327    }
328
329    #[test]
330    fn can_detect_stalls() {
331        use std::sync::Arc;
332        use std::sync::atomic::{AtomicU64, Ordering};
333        use std::time::Duration;
334        let runs = Arc::new(AtomicU64::new(0));
335        assert_eq!(
336            {
337                let runs = runs.clone();
338                LocalExecutorBuilder::new().build().run_singlethreaded(
339                    async move {
340                        let mut sleep = Duration::from_millis(1);
341                        loop {
342                            Timer::new(sleep).await;
343                            sleep *= 2;
344                            runs.fetch_add(1, Ordering::SeqCst);
345                        }
346                    }
347                    .on_stalled(Duration::from_secs(1), || 1u8),
348                )
349            },
350            1u8
351        );
352        assert!(runs.load(Ordering::SeqCst) >= 9);
353    }
354}