fuchsia_async/runtime/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(target_os = "fuchsia")]
11pub mod instrument;
12
13#[cfg(not(target_os = "fuchsia"))]
14mod portable;
15#[cfg(not(target_os = "fuchsia"))]
16use self::portable as implementation;
17
18// Exports common to all target os.
19pub use implementation::executor::{
20    LocalExecutor, LocalExecutorBuilder, MonotonicDuration, MonotonicInstant, SendExecutor,
21    SendExecutorBuilder, SpawnableFuture, TestExecutor, TestExecutorBuilder,
22};
23pub use implementation::task::{CancelableJoinHandle, JoinHandle, Task, unblock, yield_now};
24pub use implementation::timer::Timer;
25
26mod task_group;
27pub use task_group::*;
28
29pub mod epoch;
30
31// Fuchsia specific exports.
32#[cfg(target_os = "fuchsia")]
33pub use self::fuchsia::{
34    executor::{
35        BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
36    },
37    timer::Interval,
38};
39
40/// Structured concurrency API for fuchsia-async.
41///
42/// See the [`Scope`] documentation for details.
43pub mod scope;
44
45pub use scope::{Scope, ScopeHandle};
46
47use futures::prelude::*;
48use futures::task::AtomicWaker;
49use pin_project_lite::pin_project;
50use std::pin::Pin;
51use std::sync::Arc;
52use std::task::{Context, Poll, Wake, ready};
53
54/// An extension trait to provide `after_now` on `zx::MonotonicDuration`.
55pub trait DurationExt {
56    /// Return a `MonotonicInstant` which is a `MonotonicDuration` after the current time.
57    /// `duration.after_now()` is equivalent to `MonotonicInstant::after(duration)`.
58    ///
59    /// This method requires that an executor has been set up.
60    fn after_now(self) -> MonotonicInstant;
61}
62
63/// The time when a Timer should wakeup.
64pub trait WakeupTime {
65    /// Create a timer based on this time.
66    ///
67    /// This is allowed to be inaccurate, but the inaccuracy must make the wakeup time later,
68    /// never earlier.
69    fn into_timer(self) -> Timer;
70}
71
72#[cfg(target_os = "fuchsia")]
73impl WakeupTime for std::time::Duration {
74    fn into_timer(self) -> Timer {
75        EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
76    }
77}
78
79#[cfg(not(target_os = "fuchsia"))]
80impl WakeupTime for std::time::Duration {
81    fn into_timer(self) -> Timer {
82        Timer::from(self)
83    }
84}
85
#[cfg(target_os = "fuchsia")]
impl WakeupTime for MonotonicDuration {
    /// Creates a timer from `EHandle::local().mono_timers()` whose deadline is
    /// `MonotonicInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let deadline = MonotonicInstant::after(self);
        EHandle::local().mono_timers().new_timer(deadline)
    }
}
92
#[cfg(target_os = "fuchsia")]
impl WakeupTime for zx::BootDuration {
    /// Creates a timer from `EHandle::local().boot_timers()` whose deadline is
    /// `BootInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let deadline = BootInstant::after(self);
        EHandle::local().boot_timers().new_timer(deadline)
    }
}
99
100impl DurationExt for std::time::Duration {
101    #[allow(clippy::useless_conversion)] // Conversion is optionally needed on Fuchsia.
102    fn after_now(self) -> MonotonicInstant {
103        MonotonicInstant::now() + self.into()
104    }
105}
106
107/// A trait which allows futures to be easily wrapped in a timeout.
108pub trait TimeoutExt: Future + Sized {
109    /// Wraps the future in a timeout, calling `on_timeout` to produce a result
110    /// when the timeout occurs.
111    fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
112    where
113        WT: WakeupTime,
114        OT: FnOnce() -> Self::Output,
115    {
116        OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
117    }
118
119    /// Wraps the future in a stall-guard, calling `on_stalled` to produce a result
120    /// when the future hasn't been otherwise polled within the `timeout`.
121    /// This is a heuristic - spurious wakeups will keep the detection from triggering,
122    /// and moving all work to external tasks or threads with force the triggering early.
123    fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
124    where
125        OS: FnOnce() -> Self::Output,
126    {
127        OnStalled {
128            timer: timeout.into_timer(),
129            future: self,
130            timeout,
131            on_stalled: Some(on_stalled),
132            waker: Arc::default(),
133        }
134    }
135}
136
137impl<F: Future + Sized> TimeoutExt for F {}
138
139pin_project! {
140    /// A wrapper for a future which will complete with a provided closure when a timeout occurs.
141    #[derive(Debug)]
142    #[must_use = "futures do nothing unless polled"]
143    pub struct OnTimeout<F, OT> {
144        #[pin]
145        timer: Timer,
146        #[pin]
147        future: F,
148        on_timeout: Option<OT>,
149    }
150}
151
152impl<F: Future, OT> Future for OnTimeout<F, OT>
153where
154    OT: FnOnce() -> F::Output,
155{
156    type Output = F::Output;
157
158    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
159        let this = self.project();
160        if let Poll::Ready(item) = this.future.poll(cx) {
161            return Poll::Ready(item);
162        }
163        if let Poll::Ready(()) = this.timer.poll(cx) {
164            let ot = this.on_timeout.take().expect("polled withtimeout after completion");
165            let item = (ot)();
166            return Poll::Ready(item);
167        }
168        Poll::Pending
169    }
170}
171
172pin_project! {
173    /// A wrapper for a future who's steady progress is monitored and will complete with the
174    /// provided closure if no progress is made before the timeout.
175    #[derive(Debug)]
176    #[must_use = "futures do nothing unless polled"]
177    pub struct OnStalled<F, OS> {
178        #[pin]
179        timer: Timer,
180        #[pin]
181        future: F,
182        timeout: std::time::Duration,
183        on_stalled: Option<OS>,
184        waker: Arc<OnStalledWaker>,
185    }
186}
187
188impl<F: Future, OS> Future for OnStalled<F, OS>
189where
190    OS: FnOnce() -> F::Output,
191{
192    type Output = F::Output;
193
194    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195        let mut this = self.project();
196        // Use a different waker when polling the future so we know whether it was woken or not.
197        let mut woken = this.waker.original_waker.take().is_none();
198        loop {
199            this.waker.original_waker.register(cx.waker());
200            let waker = this.waker.clone().into();
201            let mut cx2 = Context::from_waker(&waker);
202            if let Poll::Ready(item) = this.future.as_mut().poll(&mut cx2) {
203                return Poll::Ready(item);
204            }
205            if woken || this.timer.as_mut().poll(cx).is_pending() {
206                this.timer.set(this.timeout.into_timer());
207                ready!(this.timer.as_mut().poll(cx));
208                // The timer immediately fired.  See if the future was woken.  If it was, we'll
209                // loop around and poll the future again.
210                woken = this.waker.original_waker.take().is_none();
211            }
212            if !woken {
213                return Poll::Ready((this.on_stalled.take().expect("polled after completion"))());
214            }
215        }
216    }
217}
218
219// A different waker is used so we can tell when the future was woken, rather than the timer.
220#[derive(Debug, Default)]
221struct OnStalledWaker {
222    original_waker: AtomicWaker,
223}
224
225impl Wake for OnStalledWaker {
226    fn wake(self: Arc<Self>) {
227        self.original_waker.wake();
228    }
229}
230
#[cfg(test)]
mod task_tests {

    use super::*;
    use futures::channel::oneshot;

    /// Runs `f` to completion on a `SendExecutor` built with `TEST_THREADS` threads.
    fn run(f: impl Send + 'static + Future<Output = ()>) {
        const TEST_THREADS: u8 = 2;
        SendExecutorBuilder::new().num_threads(TEST_THREADS).build().run(f)
    }

    /// A detached task keeps running and can still be talked to over channels.
    #[test]
    fn can_detach() {
        run(async move {
            let (started_tx, started_rx) = oneshot::channel();
            let (continue_tx, continue_rx) = oneshot::channel();
            let (done_tx, done_rx) = oneshot::channel();
            // The task reports that it started, waits for the go-ahead, then reports that it
            // finished.
            let detached_work = async move {
                started_tx.send(()).unwrap();
                continue_rx.await.unwrap();
                done_tx.send(()).unwrap();
            };
            // Spawn it and give up the handle immediately.
            Task::spawn(detached_work).detach();
            // The task is detached; walk through the handshake with it.
            started_rx.await.unwrap();
            continue_tx.send(()).unwrap();
            done_rx.await.unwrap();
        });
    }

    /// Awaiting a spawned task yields the value its future produced.
    #[test]
    fn can_join() {
        run(async move {
            let joined = Task::spawn(async move { 42u8 }).await;
            assert_eq!(42, joined);
        });
    }

    /// Awaiting `unblock` yields the closure's return value.
    #[test]
    fn can_join_unblock() {
        run(async move {
            let value = unblock(|| 42u8).await;
            assert_eq!(42, value);
        });
    }

    /// Same as `can_join_unblock`, but driven by a local executor.
    #[test]
    fn can_join_unblock_local() {
        let mut executor = LocalExecutorBuilder::new().build();
        executor.run_singlethreaded(async move {
            let value = unblock(|| 42u8).await;
            assert_eq!(42, value);
        });
    }

    /// A panic inside the closure passed to `unblock` makes the test panic.
    #[test]
    #[should_panic]
    // TODO(https://fxbug.dev/42169733): delete the below
    #[cfg_attr(feature = "variant_asan", ignore)]
    #[cfg_attr(feature = "variant_hwasan", ignore)]
    fn unblock_fn_panics() {
        run(async move {
            unblock(|| panic!("bad")).await;
        })
    }

    /// A task spawned with `Task::local` can be joined on a local executor.
    #[test]
    fn can_join_local() {
        let mut executor = LocalExecutorBuilder::new().build();
        executor.run_singlethreaded(async move {
            let joined = Task::local(async move { 42u8 }).await;
            assert_eq!(42, joined);
        });
    }

    /// Dropping a task handle cancels the task before it can report completion.
    #[test]
    fn can_cancel() {
        run(async move {
            // The start sender is kept alive but nothing is ever sent on it.
            let (_start_tx, start_rx) = oneshot::channel::<()>();
            let (done_tx, done_rx) = oneshot::channel();
            let task = Task::spawn(async move {
                start_rx.await.unwrap();
                done_tx.send(()).unwrap();
            });
            // Cancel the task right after starting it by dropping its handle.
            drop(task);
            // The receive must fail rather than deliver a value.
            done_rx.await.expect_err("done should not be sent");
        });
    }
}
323
#[cfg(test)]
mod timer_tests {
    use super::*;
    use futures::future::Either;
    use std::pin::pin;

    /// Of two timers built from `std::time::Instant` deadlines, the earlier one wins a `select`.
    #[test]
    fn shorter_fires_first_instant() {
        use std::time::{Duration, Instant};
        let mut exec = LocalExecutorBuilder::new().build();
        let start = Instant::now();
        let short_timer = pin!(Timer::new(start + Duration::from_millis(100)));
        let long_timer = pin!(Timer::new(start + Duration::from_secs(1)));
        let winner = exec.run_singlethreaded(future::select(short_timer, long_timer));
        assert!(matches!(winner, Either::Left(_)), "wrong timer fired");
    }

    /// A timer built from a `MonotonicDuration` takes more than that duration to fire.
    #[cfg(target_os = "fuchsia")]
    #[test]
    fn can_use_zx_duration() {
        let mut exec = LocalExecutorBuilder::new().build();
        let start = MonotonicInstant::now();
        exec.run_singlethreaded(Timer::new(MonotonicDuration::from_millis(100)));
        let elapsed = MonotonicInstant::now() - start;
        assert!(elapsed > MonotonicDuration::from_millis(100));
    }

    /// A loop whose sleeps double each round (starting at 1ms) is eventually reported as
    /// stalled by a one-second `on_stalled` guard, after at least nine completed rounds.
    #[test]
    fn can_detect_stalls() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::time::Duration;
        let runs = Arc::new(AtomicU64::new(0));
        let counter = runs.clone();
        let mut executor = LocalExecutorBuilder::new().build();
        let outcome = executor.run_singlethreaded(
            async move {
                let mut sleep = Duration::from_millis(1);
                loop {
                    Timer::new(sleep).await;
                    sleep *= 2;
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            }
            .on_stalled(Duration::from_secs(1), || 1u8),
        );
        assert_eq!(outcome, 1u8);
        assert!(runs.load(Ordering::SeqCst) >= 9);
    }
}