Skip to main content

fuchsia_async/runtime/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(target_os = "fuchsia")]
11pub mod instrument;
12
13#[cfg(not(target_os = "fuchsia"))]
14mod portable;
15#[cfg(not(target_os = "fuchsia"))]
16use self::portable as implementation;
17
18// Exports common to all target os.
19pub use implementation::executor::{
20    LocalExecutor, LocalExecutorBuilder, MonotonicDuration, MonotonicInstant, SendExecutor,
21    SendExecutorBuilder, SpawnableFuture, TestExecutor, TestExecutorBuilder,
22};
23pub use implementation::task::{CancelableJoinHandle, JoinHandle, Task, unblock, yield_now};
24pub use implementation::timer::Timer;
25
26mod task_group;
27pub use task_group::*;
28
29pub mod epoch;
30
31// Fuchsia specific exports.
32#[cfg(target_os = "fuchsia")]
33pub use self::fuchsia::{
34    executor::{
35        BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
36    },
37    task::LowPriorityTask,
38    timer::Interval,
39};
40
41/// Structured concurrency API for fuchsia-async.
42///
43/// See the [`Scope`] documentation for details.
44pub mod scope;
45
46pub use scope::{Scope, ScopeHandle};
47
48use futures::prelude::*;
49use futures::task::AtomicWaker;
50use pin_project_lite::pin_project;
51use std::pin::Pin;
52use std::sync::Arc;
53use std::task::{Context, Poll, Wake, ready};
54
55/// An extension trait to provide `after_now` on `zx::MonotonicDuration`.
56pub trait DurationExt {
57    /// Return a `MonotonicInstant` which is a `MonotonicDuration` after the current time.
58    /// `duration.after_now()` is equivalent to `MonotonicInstant::after(duration)`.
59    ///
60    /// This method requires that an executor has been set up.
61    fn after_now(self) -> MonotonicInstant;
62}
63
64/// The time when a Timer should wakeup.
65pub trait WakeupTime {
66    /// Create a timer based on this time.
67    ///
68    /// This is allowed to be inaccurate, but the inaccuracy must make the wakeup time later,
69    /// never earlier.
70    fn into_timer(self) -> Timer;
71}
72
73#[cfg(target_os = "fuchsia")]
74impl WakeupTime for std::time::Duration {
75    fn into_timer(self) -> Timer {
76        EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
77    }
78}
79
80#[cfg(not(target_os = "fuchsia"))]
81impl WakeupTime for std::time::Duration {
82    fn into_timer(self) -> Timer {
83        Timer::from(self)
84    }
85}
86
87#[cfg(target_os = "fuchsia")]
88impl WakeupTime for MonotonicDuration {
89    fn into_timer(self) -> Timer {
90        EHandle::local().mono_timers().new_timer(MonotonicInstant::after(self))
91    }
92}
93
94#[cfg(target_os = "fuchsia")]
95impl WakeupTime for zx::BootDuration {
96    fn into_timer(self) -> Timer {
97        EHandle::local().boot_timers().new_timer(BootInstant::after(self))
98    }
99}
100
101impl DurationExt for std::time::Duration {
102    #[allow(clippy::useless_conversion)] // Conversion is optionally needed on Fuchsia.
103    fn after_now(self) -> MonotonicInstant {
104        MonotonicInstant::now() + self.into()
105    }
106}
107
108/// A trait which allows futures to be easily wrapped in a timeout.
109pub trait TimeoutExt: Future + Sized {
110    /// Wraps the future in a timeout, calling `on_timeout` to produce a result
111    /// when the timeout occurs.
112    fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
113    where
114        WT: WakeupTime,
115        OT: FnOnce() -> Self::Output,
116    {
117        OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
118    }
119
120    /// Wraps the future in a stall-guard, calling `on_stalled` to produce a result
121    /// when the future hasn't been otherwise polled within the `timeout`.
122    /// This is a heuristic - spurious wakeups will keep the detection from triggering,
123    /// and moving all work to external tasks or threads with force the triggering early.
124    fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
125    where
126        OS: FnOnce() -> Self::Output,
127    {
128        OnStalled {
129            timer: timeout.into_timer(),
130            future: self,
131            timeout,
132            on_stalled: Some(on_stalled),
133            waker: Arc::default(),
134        }
135    }
136}
137
138impl<F: Future + Sized> TimeoutExt for F {}
139
140pin_project! {
141    /// A wrapper for a future which will complete with a provided closure when a timeout occurs.
142    #[derive(Debug)]
143    #[must_use = "futures do nothing unless polled"]
144    pub struct OnTimeout<F, OT> {
145        #[pin]
146        timer: Timer,
147        #[pin]
148        future: F,
149        on_timeout: Option<OT>,
150    }
151}
152
153impl<F: Future, OT> Future for OnTimeout<F, OT>
154where
155    OT: FnOnce() -> F::Output,
156{
157    type Output = F::Output;
158
159    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
160        let this = self.project();
161        if let Poll::Ready(item) = this.future.poll(cx) {
162            return Poll::Ready(item);
163        }
164        if let Poll::Ready(()) = this.timer.poll(cx) {
165            let ot = this.on_timeout.take().expect("polled withtimeout after completion");
166            let item = (ot)();
167            return Poll::Ready(item);
168        }
169        Poll::Pending
170    }
171}
172
173pin_project! {
174    /// A wrapper for a future who's steady progress is monitored and will complete with the
175    /// provided closure if no progress is made before the timeout.
176    #[derive(Debug)]
177    #[must_use = "futures do nothing unless polled"]
178    pub struct OnStalled<F, OS> {
179        #[pin]
180        timer: Timer,
181        #[pin]
182        future: F,
183        timeout: std::time::Duration,
184        on_stalled: Option<OS>,
185        waker: Arc<OnStalledWaker>,
186    }
187}
188
189impl<F: Future, OS> Future for OnStalled<F, OS>
190where
191    OS: FnOnce() -> F::Output,
192{
193    type Output = F::Output;
194
195    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
196        let mut this = self.project();
197        // Use a different waker when polling the future so we know whether it was woken or not.
198        let mut woken = this.waker.original_waker.take().is_none();
199        loop {
200            this.waker.original_waker.register(cx.waker());
201            let waker = this.waker.clone().into();
202            let mut cx2 = Context::from_waker(&waker);
203            if let Poll::Ready(item) = this.future.as_mut().poll(&mut cx2) {
204                return Poll::Ready(item);
205            }
206            if woken || this.timer.as_mut().poll(cx).is_pending() {
207                this.timer.set(this.timeout.into_timer());
208                ready!(this.timer.as_mut().poll(cx));
209                // The timer immediately fired.  See if the future was woken.  If it was, we'll
210                // loop around and poll the future again.
211                woken = this.waker.original_waker.take().is_none();
212            }
213            if !woken {
214                return Poll::Ready((this.on_stalled.take().expect("polled after completion"))());
215            }
216        }
217    }
218}
219
220// A different waker is used so we can tell when the future was woken, rather than the timer.
221#[derive(Debug, Default)]
222struct OnStalledWaker {
223    original_waker: AtomicWaker,
224}
225
226impl Wake for OnStalledWaker {
227    fn wake(self: Arc<Self>) {
228        self.original_waker.wake();
229    }
230}
231
232#[cfg(test)]
233mod task_tests {
234
235    use super::*;
236    use futures::channel::oneshot;
237
238    fn run(f: impl Send + 'static + Future<Output = ()>) {
239        const TEST_THREADS: u8 = 2;
240        SendExecutorBuilder::new().num_threads(TEST_THREADS).build().run(f)
241    }
242
243    #[test]
244    fn can_detach() {
245        run(async move {
246            let (tx_started, rx_started) = oneshot::channel();
247            let (tx_continue, rx_continue) = oneshot::channel();
248            let (tx_done, rx_done) = oneshot::channel();
249            {
250                // spawn a task and detach it
251                // the task will wait for a signal, signal it received it, and then wait for another
252                Task::spawn(async move {
253                    tx_started.send(()).unwrap();
254                    rx_continue.await.unwrap();
255                    tx_done.send(()).unwrap();
256                })
257                .detach();
258            }
259            // task is detached, have a short conversation with it
260            rx_started.await.unwrap();
261            tx_continue.send(()).unwrap();
262            rx_done.await.unwrap();
263        });
264    }
265
266    #[test]
267    fn can_join() {
268        // can we spawn, then join a task
269        run(async move {
270            assert_eq!(42, Task::spawn(async move { 42u8 }).await);
271        })
272    }
273
274    #[test]
275    fn can_join_unblock() {
276        // can we poll a blocked task
277        run(async move {
278            assert_eq!(42, unblock(|| 42u8).await);
279        })
280    }
281
282    #[test]
283    fn can_join_unblock_local() {
284        // can we poll a blocked task in a local executor
285        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
286            assert_eq!(42, unblock(|| 42u8).await);
287        });
288    }
289
290    #[test]
291    #[should_panic]
292    // TODO(https://fxbug.dev/42169733): delete the below
293    #[cfg_attr(feature = "variant_asan", ignore)]
294    #[cfg_attr(feature = "variant_hwasan", ignore)]
295    fn unblock_fn_panics() {
296        run(async move {
297            unblock(|| panic!("bad")).await;
298        })
299    }
300
301    #[test]
302    fn can_join_local() {
303        // can we spawn, then join a task locally
304        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
305            assert_eq!(42, Task::local(async move { 42u8 }).await);
306        })
307    }
308
309    #[test]
310    fn can_cancel() {
311        run(async move {
312            let (_tx_start, rx_start) = oneshot::channel::<()>();
313            let (tx_done, rx_done) = oneshot::channel();
314            // Start and immediately cancel the task (by dropping it).
315            drop(Task::spawn(async move {
316                rx_start.await.unwrap();
317                tx_done.send(()).unwrap();
318            }));
319            // we should see an error on receive
320            rx_done.await.expect_err("done should not be sent");
321        })
322    }
323}
324
325#[cfg(test)]
326mod timer_tests {
327    use super::*;
328    use futures::future::Either;
329    use std::pin::pin;
330
331    #[test]
332    fn shorter_fires_first_instant() {
333        use std::time::{Duration, Instant};
334        let mut exec = LocalExecutorBuilder::new().build();
335        let now = Instant::now();
336        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
337        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
338        match exec.run_singlethreaded(future::select(shorter, longer)) {
339            Either::Left((_, _)) => {}
340            Either::Right((_, _)) => panic!("wrong timer fired"),
341        }
342    }
343
344    #[cfg(target_os = "fuchsia")]
345    #[test]
346    fn can_use_zx_duration() {
347        let mut exec = LocalExecutorBuilder::new().build();
348        let start = MonotonicInstant::now();
349        let timer = Timer::new(MonotonicDuration::from_millis(100));
350        exec.run_singlethreaded(timer);
351        let end = MonotonicInstant::now();
352        assert!(end - start > MonotonicDuration::from_millis(100));
353    }
354
355    #[test]
356    fn can_detect_stalls() {
357        use std::sync::Arc;
358        use std::sync::atomic::{AtomicU64, Ordering};
359        use std::time::Duration;
360        let runs = Arc::new(AtomicU64::new(0));
361        assert_eq!(
362            {
363                let runs = runs.clone();
364                LocalExecutorBuilder::new().build().run_singlethreaded(
365                    async move {
366                        let mut sleep = Duration::from_millis(1);
367                        loop {
368                            Timer::new(sleep).await;
369                            sleep *= 2;
370                            runs.fetch_add(1, Ordering::SeqCst);
371                        }
372                    }
373                    .on_stalled(Duration::from_secs(1), || 1u8),
374                )
375            },
376            1u8
377        );
378        assert!(runs.load(Ordering::SeqCst) >= 9);
379    }
380}