Skip to main content

fuchsia_async/runtime/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(target_os = "fuchsia")]
11pub mod instrument;
12
13#[cfg(not(target_os = "fuchsia"))]
14mod portable;
15#[cfg(not(target_os = "fuchsia"))]
16use self::portable as implementation;
17
18// Exports common to all target os.
19pub use implementation::executor::{
20    LocalExecutor, LocalExecutorBuilder, MonotonicDuration, MonotonicInstant, SendExecutor,
21    SendExecutorBuilder, SpawnableFuture, TestExecutor, TestExecutorBuilder,
22};
23pub use implementation::task::{CancelableJoinHandle, JoinHandle, Task, unblock, yield_now};
24pub use implementation::timer::Timer;
25
26pub mod epoch;
27
28// Fuchsia specific exports.
29#[cfg(target_os = "fuchsia")]
30pub use self::fuchsia::{
31    executor::{
32        BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
33    },
34    task::LowPriorityTask,
35    timer::Interval,
36};
37
38/// Structured concurrency API for fuchsia-async.
39///
40/// See the [`Scope`] documentation for details.
41pub mod scope;
42
43pub use scope::{Scope, ScopeHandle};
44
45use futures::prelude::*;
46use futures::task::AtomicWaker;
47use pin_project_lite::pin_project;
48use std::pin::Pin;
49use std::sync::Arc;
50use std::task::{Context, Poll, Wake, ready};
51
52/// An extension trait to provide `after_now` on `zx::MonotonicDuration`.
53pub trait DurationExt {
54    /// Return a `MonotonicInstant` which is a `MonotonicDuration` after the current time.
55    /// `duration.after_now()` is equivalent to `MonotonicInstant::after(duration)`.
56    ///
57    /// This method requires that an executor has been set up.
58    fn after_now(self) -> MonotonicInstant;
59}
60
61/// The time when a Timer should wakeup.
62pub trait WakeupTime {
63    /// Create a timer based on this time.
64    ///
65    /// This is allowed to be inaccurate, but the inaccuracy must make the wakeup time later,
66    /// never earlier.
67    fn into_timer(self) -> Timer;
68}
69
70#[cfg(target_os = "fuchsia")]
71impl WakeupTime for std::time::Duration {
72    fn into_timer(self) -> Timer {
73        EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
74    }
75}
76
77#[cfg(not(target_os = "fuchsia"))]
78impl WakeupTime for std::time::Duration {
79    fn into_timer(self) -> Timer {
80        Timer::from(self)
81    }
82}
83
84#[cfg(target_os = "fuchsia")]
85impl WakeupTime for MonotonicDuration {
86    fn into_timer(self) -> Timer {
87        EHandle::local().mono_timers().new_timer(MonotonicInstant::after(self))
88    }
89}
90
91#[cfg(target_os = "fuchsia")]
92impl WakeupTime for zx::BootDuration {
93    fn into_timer(self) -> Timer {
94        EHandle::local().boot_timers().new_timer(BootInstant::after(self))
95    }
96}
97
98impl DurationExt for std::time::Duration {
99    #[allow(clippy::useless_conversion)] // Conversion is optionally needed on Fuchsia.
100    fn after_now(self) -> MonotonicInstant {
101        MonotonicInstant::now() + self.into()
102    }
103}
104
105/// A trait which allows futures to be easily wrapped in a timeout.
106pub trait TimeoutExt: Future + Sized {
107    /// Wraps the future in a timeout, calling `on_timeout` to produce a result
108    /// when the timeout occurs.
109    fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
110    where
111        WT: WakeupTime,
112        OT: FnOnce() -> Self::Output,
113    {
114        OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
115    }
116
117    /// Wraps the future in a stall-guard, calling `on_stalled` to produce a result
118    /// when the future hasn't been otherwise polled within the `timeout`.
119    /// This is a heuristic - spurious wakeups will keep the detection from triggering,
120    /// and moving all work to external tasks or threads with force the triggering early.
121    fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
122    where
123        OS: FnOnce() -> Self::Output,
124    {
125        OnStalled {
126            timer: timeout.into_timer(),
127            future: self,
128            timeout,
129            on_stalled: Some(on_stalled),
130            waker: Arc::default(),
131        }
132    }
133}
134
135impl<F: Future + Sized> TimeoutExt for F {}
136
137pin_project! {
138    /// A wrapper for a future which will complete with a provided closure when a timeout occurs.
139    #[derive(Debug)]
140    #[must_use = "futures do nothing unless polled"]
141    pub struct OnTimeout<F, OT> {
142        #[pin]
143        timer: Timer,
144        #[pin]
145        future: F,
146        on_timeout: Option<OT>,
147    }
148}
149
150impl<F: Future, OT> Future for OnTimeout<F, OT>
151where
152    OT: FnOnce() -> F::Output,
153{
154    type Output = F::Output;
155
156    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
157        let this = self.project();
158        if let Poll::Ready(item) = this.future.poll(cx) {
159            return Poll::Ready(item);
160        }
161        if let Poll::Ready(()) = this.timer.poll(cx) {
162            let ot = this.on_timeout.take().expect("polled withtimeout after completion");
163            let item = (ot)();
164            return Poll::Ready(item);
165        }
166        Poll::Pending
167    }
168}
169
170pin_project! {
171    /// A wrapper for a future who's steady progress is monitored and will complete with the
172    /// provided closure if no progress is made before the timeout.
173    #[derive(Debug)]
174    #[must_use = "futures do nothing unless polled"]
175    pub struct OnStalled<F, OS> {
176        #[pin]
177        timer: Timer,
178        #[pin]
179        future: F,
180        timeout: std::time::Duration,
181        on_stalled: Option<OS>,
182        waker: Arc<OnStalledWaker>,
183    }
184}
185
186impl<F: Future, OS> Future for OnStalled<F, OS>
187where
188    OS: FnOnce() -> F::Output,
189{
190    type Output = F::Output;
191
192    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193        let mut this = self.project();
194        // Use a different waker when polling the future so we know whether it was woken or not.
195        let mut woken = this.waker.original_waker.take().is_none();
196        loop {
197            this.waker.original_waker.register(cx.waker());
198            let waker = this.waker.clone().into();
199            let mut cx2 = Context::from_waker(&waker);
200            if let Poll::Ready(item) = this.future.as_mut().poll(&mut cx2) {
201                return Poll::Ready(item);
202            }
203            if woken || this.timer.as_mut().poll(cx).is_pending() {
204                this.timer.set(this.timeout.into_timer());
205                ready!(this.timer.as_mut().poll(cx));
206                // The timer immediately fired.  See if the future was woken.  If it was, we'll
207                // loop around and poll the future again.
208                woken = this.waker.original_waker.take().is_none();
209            }
210            if !woken {
211                return Poll::Ready((this.on_stalled.take().expect("polled after completion"))());
212            }
213        }
214    }
215}
216
217// A different waker is used so we can tell when the future was woken, rather than the timer.
218#[derive(Debug, Default)]
219struct OnStalledWaker {
220    original_waker: AtomicWaker,
221}
222
223impl Wake for OnStalledWaker {
224    fn wake(self: Arc<Self>) {
225        self.original_waker.wake();
226    }
227}
228
229#[cfg(test)]
230mod task_tests {
231
232    use super::*;
233    use futures::channel::oneshot;
234
235    fn run(f: impl Send + 'static + Future<Output = ()>) {
236        const TEST_THREADS: u8 = 2;
237        SendExecutorBuilder::new().num_threads(TEST_THREADS).build().run(f)
238    }
239
240    #[test]
241    fn can_detach() {
242        run(async move {
243            let (tx_started, rx_started) = oneshot::channel();
244            let (tx_continue, rx_continue) = oneshot::channel();
245            let (tx_done, rx_done) = oneshot::channel();
246            {
247                // spawn a task and detach it
248                // the task will wait for a signal, signal it received it, and then wait for another
249                Task::spawn(async move {
250                    tx_started.send(()).unwrap();
251                    rx_continue.await.unwrap();
252                    tx_done.send(()).unwrap();
253                })
254                .detach();
255            }
256            // task is detached, have a short conversation with it
257            rx_started.await.unwrap();
258            tx_continue.send(()).unwrap();
259            rx_done.await.unwrap();
260        });
261    }
262
263    #[test]
264    fn can_join() {
265        // can we spawn, then join a task
266        run(async move {
267            assert_eq!(42, Task::spawn(async move { 42u8 }).await);
268        })
269    }
270
271    #[test]
272    fn can_join_unblock() {
273        // can we poll a blocked task
274        run(async move {
275            assert_eq!(42, unblock(|| 42u8).await);
276        })
277    }
278
279    #[test]
280    fn can_join_unblock_local() {
281        // can we poll a blocked task in a local executor
282        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
283            assert_eq!(42, unblock(|| 42u8).await);
284        });
285    }
286
287    #[test]
288    #[should_panic]
289    // TODO(https://fxbug.dev/42169733): delete the below
290    #[cfg_attr(feature = "variant_asan", ignore)]
291    #[cfg_attr(feature = "variant_hwasan", ignore)]
292    fn unblock_fn_panics() {
293        run(async move {
294            unblock(|| panic!("bad")).await;
295        })
296    }
297
298    #[test]
299    fn can_join_local() {
300        // can we spawn, then join a task locally
301        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
302            assert_eq!(42, Task::local(async move { 42u8 }).await);
303        })
304    }
305
306    #[test]
307    fn can_cancel() {
308        run(async move {
309            let (_tx_start, rx_start) = oneshot::channel::<()>();
310            let (tx_done, rx_done) = oneshot::channel();
311            // Start and immediately cancel the task (by dropping it).
312            drop(Task::spawn(async move {
313                rx_start.await.unwrap();
314                tx_done.send(()).unwrap();
315            }));
316            // we should see an error on receive
317            rx_done.await.expect_err("done should not be sent");
318        })
319    }
320}
321
322#[cfg(test)]
323mod timer_tests {
324    use super::*;
325    use futures::future::Either;
326    use std::pin::pin;
327
328    #[test]
329    fn shorter_fires_first_instant() {
330        use std::time::{Duration, Instant};
331        let mut exec = LocalExecutorBuilder::new().build();
332        let now = Instant::now();
333        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
334        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
335        match exec.run_singlethreaded(future::select(shorter, longer)) {
336            Either::Left((_, _)) => {}
337            Either::Right((_, _)) => panic!("wrong timer fired"),
338        }
339    }
340
341    #[cfg(target_os = "fuchsia")]
342    #[test]
343    fn can_use_zx_duration() {
344        let mut exec = LocalExecutorBuilder::new().build();
345        let start = MonotonicInstant::now();
346        let timer = Timer::new(MonotonicDuration::from_millis(100));
347        exec.run_singlethreaded(timer);
348        let end = MonotonicInstant::now();
349        assert!(end - start > MonotonicDuration::from_millis(100));
350    }
351
352    #[test]
353    fn can_detect_stalls() {
354        use std::sync::Arc;
355        use std::sync::atomic::{AtomicU64, Ordering};
356        use std::time::Duration;
357        let runs = Arc::new(AtomicU64::new(0));
358        assert_eq!(
359            {
360                let runs = runs.clone();
361                LocalExecutorBuilder::new().build().run_singlethreaded(
362                    async move {
363                        let mut sleep = Duration::from_millis(1);
364                        loop {
365                            Timer::new(sleep).await;
366                            sleep *= 2;
367                            runs.fetch_add(1, Ordering::SeqCst);
368                        }
369                    }
370                    .on_stalled(Duration::from_secs(1), || 1u8),
371                )
372            },
373            1u8
374        );
375        assert!(runs.load(Ordering::SeqCst) >= 9);
376    }
377}