fuchsia_async/runtime/fuchsia/executor/
local.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle, MAIN_TASK_ID};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use futures::future::{self, Either};
12use futures::task::AtomicWaker;
13use std::fmt;
14use std::future::{poll_fn, Future};
15use std::pin::pin;
16use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20/// A single-threaded port-based executor for Fuchsia.
21///
22/// Having a `LocalExecutor` in scope allows the creation and polling of zircon objects, such as
23/// [`fuchsia_async::Channel`].
24///
25/// # Panics
26///
27/// `LocalExecutor` will panic on drop if any zircon objects attached to it are still alive. In
28/// other words, zircon objects backed by a `LocalExecutor` must be dropped before it.
29pub struct LocalExecutor {
30    // LINT.IfChange
31    /// The inner executor state.
32    pub(crate) ehandle: EHandle,
33    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
34}
35
36impl fmt::Debug for LocalExecutor {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
39    }
40}
41
42impl LocalExecutor {
43    /// Create a new single-threaded executor running with actual time.
44    pub fn new() -> Self {
45        let inner = Arc::new(Executor::new(
46            ExecutorTime::RealTime,
47            /* is_local */ true,
48            /* num_threads */ 1,
49        ));
50        let root_scope = ScopeHandle::root(inner);
51        Executor::set_local(root_scope.clone());
52        Self { ehandle: EHandle { root_scope } }
53    }
54
55    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
56    pub fn port(&self) -> &zx::Port {
57        self.ehandle.port()
58    }
59
60    /// Run a single future to completion on a single thread, also polling other active tasks.
61    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
62    where
63        F: Future,
64    {
65        assert!(
66            self.ehandle.inner().is_real_time(),
67            "Error: called `run_singlethreaded` on an executor using fake time"
68        );
69
70        let Poll::Ready(result) = self.run::</* UNTIL_STALLED: */ false, _>(main_future) else {
71            unreachable!()
72        };
73        result
74    }
75
76    fn run<const UNTIL_STALLED: bool, Fut: Future>(
77        &mut self,
78        main_future: Fut,
79    ) -> Poll<Fut::Output> {
80        /// # Safety
81        ///
82        /// See the comment below.
83        unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
84            std::mem::transmute(obj)
85        }
86
87        let scope = &self.ehandle.root_scope;
88        let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);
89
90        // SAFETY: Erasing the lifetime is safe because we make sure to drop the main task within
91        // the required lifetime.
92        unsafe {
93            scope.insert_task(remove_lifetime(task), false);
94        }
95
96        struct DropMainTask<'a>(&'a EHandle);
97        impl Drop for DropMainTask<'_> {
98            fn drop(&mut self) {
99                // SAFETY: drop_main_tasks requires that the executor isn't running
100                // i.e. worker_lifecycle isn't running, which will be the case when this runs.
101                unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
102            }
103        }
104        let _drop_main_task = DropMainTask(&self.ehandle);
105
106        self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();
107
108        // SAFETY: We spawned the task earlier, so `R` (the return type) will be the correct type
109        // here.
110        unsafe {
111            self.ehandle.global_scope().poll_join_result(
112                MAIN_TASK_ID,
113                &mut Context::from_waker(&futures::task::noop_waker()),
114            )
115        }
116    }
117
118    #[doc(hidden)]
119    /// Returns the root scope of the executor.
120    pub fn root_scope(&self) -> &ScopeHandle {
121        self.ehandle.global_scope()
122    }
123}
124
125impl Drop for LocalExecutor {
126    fn drop(&mut self) {
127        self.ehandle.inner().mark_done();
128        self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
129    }
130}
131
132/// A single-threaded executor for testing. Exposes additional APIs for manipulating executor state
133/// and validating behavior of executed tasks.
134///
135/// TODO(https://fxbug.dev/375631801): This is lack of BootInstant support.
136pub struct TestExecutor {
137    /// LocalExecutor used under the hood, since most of the logic is shared.
138    local: LocalExecutor,
139}
140
141impl TestExecutor {
142    /// Create a new executor for testing.
143    pub fn new() -> Self {
144        Self { local: LocalExecutor::new() }
145    }
146
147    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
148    pub fn port(&self) -> &zx::Port {
149        self.local.port()
150    }
151
152    /// Create a new single-threaded executor running with fake time.
153    pub fn new_with_fake_time() -> Self {
154        let inner = Arc::new(Executor::new(
155            ExecutorTime::FakeTime {
156                mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
157                mono_to_boot_offset_ns: AtomicI64::new(0),
158            },
159            /* is_local */ true,
160            /* num_threads */ 1,
161        ));
162        let root_scope = ScopeHandle::root(inner);
163        Executor::set_local(root_scope.clone());
164        Self { local: LocalExecutor { ehandle: EHandle { root_scope } } }
165    }
166
167    /// Return the current time according to the executor.
168    pub fn now(&self) -> MonotonicInstant {
169        self.local.ehandle.inner().now()
170    }
171
172    /// Return the current time on the boot timeline, according to the executor.
173    pub fn boot_now(&self) -> BootInstant {
174        self.local.ehandle.inner().boot_now()
175    }
176
177    /// Set the fake time to a given value.
178    ///
179    /// # Panics
180    ///
181    /// If the executor was not created with fake time.
182    pub fn set_fake_time(&self, t: MonotonicInstant) {
183        self.local.ehandle.inner().set_fake_time(t)
184    }
185
186    /// Set the offset between the reading of the monotonic and the boot
187    /// clocks.
188    ///
189    /// This is useful to test the situations in which the boot and monotonic
190    /// offsets diverge.  In realistic scenarios, the offset can only grow,
191    /// and testers should keep that in view when setting duration.
192    ///
193    /// # Panics
194    ///
195    /// If the executor was not created with fake time.
196    pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
197        self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
198    }
199
200    /// Get the global scope of the executor.
201    pub fn global_scope(&self) -> &ScopeHandle {
202        self.local.root_scope()
203    }
204
205    /// Run a single future to completion on a single thread, also polling other active tasks.
206    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
207    where
208        F: Future,
209    {
210        self.local.run_singlethreaded(main_future)
211    }
212
213    /// Poll the future. If it is not ready, dispatch available packets and possibly try
214    /// again. Timers will only fire if this executor uses fake time. Never blocks.
215    ///
216    /// This function is for testing. DO NOT use this function in tests or applications that
217    /// involve any interaction with other threads or processes, as those interactions
218    /// may become stalled waiting for signals from "the outside world" which is beyond
219    /// the knowledge of the executor.
220    ///
221    /// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
222    /// futures must first be pinned using the `pin!` macro.
223    pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
224    where
225        F: Future + Unpin,
226    {
227        let mut main_future = pin!(main_future);
228
229        // Set up an instance of UntilStalledData that works with `poll_until_stalled`.
230        struct Cleanup(Arc<Executor>);
231        impl Drop for Cleanup {
232            fn drop(&mut self) {
233                *self.0.owner_data.lock() = None;
234            }
235        }
236        let _cleanup = Cleanup(self.local.ehandle.inner().clone());
237        *self.local.ehandle.inner().owner_data.lock() =
238            Some(Box::new(UntilStalledData { watcher: None }));
239
240        loop {
241            let result = self.local.run::</* UNTIL_STALLED: */ true, _>(main_future.as_mut());
242            if result.is_ready() {
243                return result;
244            }
245
246            // If a waker was set by `poll_until_stalled`, disarm, wake, and loop.
247            if let Some(watcher) = with_data(|data| data.watcher.take()) {
248                watcher.waker.wake();
249                // Relaxed ordering is fine here because this atomic is only ever access from the
250                // main thread.
251                watcher.done.store(true, Ordering::Relaxed);
252            } else {
253                break;
254            }
255        }
256
257        Poll::Pending
258    }
259
260    /// Wake all tasks waiting for expired timers, and return `true` if any task was woken.
261    ///
262    /// This is intended for use in test code in conjunction with fake time.
263    ///
264    /// The wake will have effect on both the monotonic and the boot timers.
265    pub fn wake_expired_timers(&mut self) -> bool {
266        self.local.ehandle.inner().monotonic_timers().wake_timers()
267            || self.local.ehandle.inner().boot_timers().wake_timers()
268    }
269
270    /// Wake up the next task waiting for a timer, if any, and return the time for which the
271    /// timer was scheduled.
272    ///
273    /// This is intended for use in test code in conjunction with `run_until_stalled`.
274    /// For example, here is how one could test that the Timer future fires after the given
275    /// timeout:
276    ///
277    ///     let deadline = zx::MonotonicDuration::from_seconds(5).after_now();
278    ///     let mut future = Timer::<Never>::new(deadline);
279    ///     assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
280    ///     assert_eq!(Some(deadline), exec.wake_next_timer());
281    ///     assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
282    pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
283        self.local.ehandle.inner().monotonic_timers().wake_next_timer()
284    }
285
286    /// Similar to [wake_next_timer], but operates on the timers on the boot
287    /// timeline.
288    pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
289        self.local.ehandle.inner().boot_timers().wake_next_timer()
290    }
291
292    /// Returns the deadline for the next timer due to expire.
293    pub fn next_timer() -> Option<MonotonicInstant> {
294        EHandle::local().inner().monotonic_timers().next_timer()
295    }
296
297    /// Returns the deadline for the next boot timeline timer due to expire.
298    pub fn next_boot_timer() -> Option<BootInstant> {
299        EHandle::local().inner().boot_timers().next_timer()
300    }
301
302    /// Advances fake time to the specified time.  This will only work if the executor is being run
303    /// via `TestExecutor::run_until_stalled` and can only be called by one task at a time.  This
304    /// will make sure that repeating timers fire as expected.
305    ///
306    /// # Panics
307    ///
308    /// Panics if the executor was not created with fake time, and for the same reasons
309    /// `poll_until_stalled` can below.
310    pub async fn advance_to(time: MonotonicInstant) {
311        let ehandle = EHandle::local();
312        loop {
313            let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
314            if let Some(next_timer) = Self::next_timer() {
315                if next_timer <= time {
316                    ehandle.inner().set_fake_time(next_timer);
317                    continue;
318                }
319            }
320            ehandle.inner().set_fake_time(time);
321            break;
322        }
323    }
324
325    /// Runs the future until it is ready or the executor is stalled. Returns the state of the
326    /// future.
327    ///
328    /// This will only work if the executor is being run via `TestExecutor::run_until_stalled` and
329    /// can only be called by one task at a time.
330    ///
331    /// This can be used in tests to assert that a future should be pending:
332    /// ```
333    /// assert!(
334    ///     TestExecutor::poll_until_stalled(my_fut).await.is_pending(),
335    ///     "my_fut should not be ready!"
336    /// );
337    /// ```
338    ///
339    /// If you just want to know when the executor is stalled, you can do:
340    /// ```
341    /// let _: Poll<()> = TestExecutor::poll_until_stalled(future::pending::<()>()).await;
342    /// ```
343    ///
344    /// # Panics
345    ///
346    /// Panics if another task is currently trying to use `run_until_stalled`, or the executor is
347    /// not using `TestExecutor::run_until_stalled`.
348    pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
349        let watcher =
350            Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
351
352        assert!(
353            with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
354            "Error: Another task has called `poll_until_stalled`."
355        );
356
357        struct Watcher(Arc<StalledWatcher>);
358
359        // Make sure we clean up if we're dropped.
360        impl Drop for Watcher {
361            fn drop(&mut self) {
362                if !self.0.done.swap(true, Ordering::Relaxed) {
363                    with_data(|data| data.watcher = None);
364                }
365            }
366        }
367
368        let watcher = Watcher(watcher);
369
370        let poll_fn = poll_fn(|cx: &mut Context<'_>| {
371            if watcher.0.done.load(Ordering::Relaxed) {
372                Poll::Ready(())
373            } else {
374                watcher.0.waker.register(cx.waker());
375                Poll::Pending
376            }
377        });
378        match future::select(poll_fn, fut).await {
379            Either::Left(_) => Poll::Pending,
380            Either::Right((value, _)) => Poll::Ready(value),
381        }
382    }
383}
384
385struct StalledWatcher {
386    waker: AtomicWaker,
387    done: AtomicBool,
388}
389
390struct UntilStalledData {
391    watcher: Option<Arc<StalledWatcher>>,
392}
393
394/// Calls `f` with `&mut UntilStalledData` that is stored in `owner_data`.
395///
396/// # Panics
397///
398/// Panics if `owner_data` isn't an instance of `UntilStalledData`.
399fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
400    const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
401                           with TestExecutor::run_until_stalled";
402    f(&mut EHandle::local()
403        .inner()
404        .owner_data
405        .lock()
406        .as_mut()
407        .expect(MESSAGE)
408        .downcast_mut::<UntilStalledData>()
409        .expect(MESSAGE))
410}
411
412#[cfg(test)]
413mod tests {
414    use super::*;
415    use crate::handle::on_signals::OnSignals;
416    use crate::{Interval, Timer, WakeupTime};
417    use assert_matches::assert_matches;
418    use futures::StreamExt;
419    use std::cell::{Cell, RefCell};
420    use std::task::Waker;
421    use zx::{self as zx, AsHandleRef};
422
423    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
424        crate::EHandle::local().spawn_detached(future);
425    }
426
427    // Runs a future that suspends and returns after being resumed.
428    #[test]
429    fn stepwise_two_steps() {
430        let fut_step = Arc::new(Cell::new(0));
431        let fut_waker: Arc<RefCell<Option<Waker>>> = Arc::new(RefCell::new(None));
432        let fut_waker_clone = fut_waker.clone();
433        let fut_step_clone = fut_step.clone();
434        let fut_fn = move |cx: &mut Context<'_>| {
435            fut_waker_clone.borrow_mut().replace(cx.waker().clone());
436            match fut_step_clone.get() {
437                0 => {
438                    fut_step_clone.set(1);
439                    Poll::Pending
440                }
441                1 => {
442                    fut_step_clone.set(2);
443                    Poll::Ready(())
444                }
445                _ => panic!("future called after done"),
446            }
447        };
448        let fut = Box::new(future::poll_fn(fut_fn));
449        let mut executor = TestExecutor::new_with_fake_time();
450        // Spawn the future rather than waking it the main task because run_until_stalled will wake
451        // the main future on every call, and we want to wake it ourselves using the waker.
452        executor.local.ehandle.spawn_local_detached(fut);
453        assert_eq!(fut_step.get(), 0);
454        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
455        assert_eq!(fut_step.get(), 1);
456
457        fut_waker.borrow_mut().take().unwrap().wake();
458        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
459        assert_eq!(fut_step.get(), 2);
460    }
461
462    #[test]
463    // Runs a future that waits on a timer.
464    fn stepwise_timer() {
465        let mut executor = TestExecutor::new_with_fake_time();
466        executor.set_fake_time(MonotonicInstant::from_nanos(0));
467        let mut fut =
468            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));
469
470        let _ = executor.run_until_stalled(&mut fut);
471        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));
472
473        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
474        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
475        assert!(executor.run_until_stalled(&mut fut).is_ready());
476    }
477
478    // Runs a future that waits on an event.
479    #[test]
480    fn stepwise_event() {
481        let mut executor = TestExecutor::new_with_fake_time();
482        let event = zx::Event::create();
483        let mut fut = pin!(OnSignals::new(&event, zx::Signals::USER_0));
484
485        let _ = executor.run_until_stalled(&mut fut);
486
487        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
488        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
489    }
490
491    // Using `run_until_stalled` does not modify the order of events
492    // compared to normal execution.
493    #[test]
494    fn run_until_stalled_preserves_order() {
495        let mut executor = TestExecutor::new_with_fake_time();
496        let spawned_fut_completed = Arc::new(AtomicBool::new(false));
497        let spawned_fut_completed_writer = spawned_fut_completed.clone();
498        let spawned_fut = Box::pin(async move {
499            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
500            spawned_fut_completed_writer.store(true, Ordering::SeqCst);
501        });
502        let mut main_fut = pin!(async {
503            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
504        });
505        spawn(spawned_fut);
506        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
507        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
508        // The timer in `spawned_fut` should fire first, then the
509        // timer in `main_fut`.
510        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
511        assert_eq!(spawned_fut_completed.load(Ordering::SeqCst), true);
512    }
513
514    #[test]
515    fn task_destruction() {
516        struct DropSpawner {
517            dropped: Arc<AtomicBool>,
518        }
519        impl Drop for DropSpawner {
520            fn drop(&mut self) {
521                self.dropped.store(true, Ordering::SeqCst);
522                let dropped_clone = self.dropped.clone();
523                spawn(async {
524                    // Hold on to a reference here to verify that it, too, is destroyed later
525                    let _dropped_clone = dropped_clone;
526                    panic!("task spawned in drop shouldn't be polled");
527                });
528            }
529        }
530        let mut dropped = Arc::new(AtomicBool::new(false));
531        let drop_spawner = DropSpawner { dropped: dropped.clone() };
532        let mut executor = TestExecutor::new();
533        let mut main_fut = pin!(async move {
534            spawn(async move {
535                // Take ownership of the drop spawner
536                let _drop_spawner = drop_spawner;
537                future::pending::<()>().await;
538            });
539        });
540        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
541        assert_eq!(
542            dropped.load(Ordering::SeqCst),
543            false,
544            "executor dropped pending task before destruction"
545        );
546
547        // Should drop the pending task and it's owned drop spawner,
548        // as well as gracefully drop the future spawned from the drop spawner.
549        drop(executor);
550        let dropped = Arc::get_mut(&mut dropped)
551            .expect("someone else is unexpectedly still holding on to a reference");
552        assert_eq!(
553            dropped.load(Ordering::SeqCst),
554            true,
555            "executor did not drop pending task during destruction"
556        );
557    }
558
559    #[test]
560    fn time_now_real_time() {
561        let _executor = LocalExecutor::new();
562        let t1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
563        let t2 = MonotonicInstant::now().into_zx();
564        let t3 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
565        assert!(t1 <= t2);
566        assert!(t2 <= t3);
567    }
568
569    #[test]
570    fn time_now_fake_time() {
571        let executor = TestExecutor::new_with_fake_time();
572        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
573        executor.set_fake_time(t1);
574        assert_eq!(MonotonicInstant::now(), t1);
575
576        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
577        executor.set_fake_time(t2);
578        assert_eq!(MonotonicInstant::now(), t2);
579    }
580
581    #[test]
582    fn time_now_fake_time_boot() {
583        let executor = TestExecutor::new_with_fake_time();
584        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
585        executor.set_fake_time(t1);
586        assert_eq!(MonotonicInstant::now(), t1);
587        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
588
589        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
590        executor.set_fake_time(t2);
591        assert_eq!(MonotonicInstant::now(), t2);
592        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
593
594        const TEST_BOOT_OFFSET: i64 = 42;
595
596        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
597        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
598    }
599
600    #[test]
601    fn time_boot_now() {
602        let executor = TestExecutor::new_with_fake_time();
603        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
604        executor.set_fake_time(t1);
605        assert_eq!(MonotonicInstant::now(), t1);
606        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
607
608        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
609        executor.set_fake_time(t2);
610        assert_eq!(MonotonicInstant::now(), t2);
611        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
612
613        const TEST_BOOT_OFFSET: i64 = 42;
614
615        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
616        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
617    }
618
619    #[test]
620    fn time_after_overflow() {
621        let executor = TestExecutor::new_with_fake_time();
622
623        executor.set_fake_time(MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100));
624        assert_eq!(
625            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
626            MonotonicInstant::INFINITE
627        );
628
629        executor.set_fake_time(
630            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
631        );
632        assert_eq!(
633            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
634            MonotonicInstant::INFINITE_PAST
635        );
636    }
637
638    // This future wakes itself up a number of times during the same cycle
639    async fn multi_wake(n: usize) {
640        let mut done = false;
641        futures::future::poll_fn(|cx| {
642            if done {
643                return Poll::Ready(());
644            }
645            for _ in 1..n {
646                cx.waker().wake_by_ref()
647            }
648            done = true;
649            Poll::Pending
650        })
651        .await;
652    }
653
654    #[test]
655    fn test_boot_time_tracks_mono_time() {
656        const FAKE_TIME: i64 = 42;
657        let executor = TestExecutor::new_with_fake_time();
658        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
659        assert_eq!(
660            BootInstant::from_nanos(FAKE_TIME),
661            executor.boot_now(),
662            "boot time should have advanced"
663        );
664
665        // Now advance boot without mono.
666        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
667        assert_eq!(
668            BootInstant::from_nanos(2 * FAKE_TIME),
669            executor.boot_now(),
670            "boot time should have advanced again"
671        );
672    }
673
674    // Ensure that a large amount of wakeups does not exhaust kernel resources,
675    // such as the zx port queue limit.
676    #[test]
677    fn many_wakeups() {
678        let mut executor = LocalExecutor::new();
679        executor.run_singlethreaded(multi_wake(4096 * 2));
680    }
681
682    fn advance_to_with(timer_duration: impl WakeupTime) {
683        let mut executor = TestExecutor::new_with_fake_time();
684        executor.set_fake_time(MonotonicInstant::from_nanos(0));
685
686        let mut fut = pin!(async {
687            let timer_fired = Arc::new(AtomicBool::new(false));
688            futures::join!(
689                async {
690                    // Oneshot timer.
691                    Timer::new(timer_duration).await;
692                    timer_fired.store(true, Ordering::SeqCst);
693                },
694                async {
695                    // Interval timer, fires periodically.
696                    let mut fired = 0;
697                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
698                    while let Some(_) = interval.next().await {
699                        fired += 1;
700                        if fired == 3 {
701                            break;
702                        }
703                    }
704                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
705                },
706                async {
707                    assert!(
708                        !timer_fired.load(Ordering::SeqCst),
709                        "the oneshot timer shouldn't be fired"
710                    );
711                    TestExecutor::advance_to(MonotonicInstant::after(
712                        zx::MonotonicDuration::from_millis(500),
713                    ))
714                    .await;
715                    // Timer still shouldn't be fired.
716                    assert!(
717                        !timer_fired.load(Ordering::SeqCst),
718                        "the oneshot timer shouldn't be fired"
719                    );
720                    TestExecutor::advance_to(MonotonicInstant::after(
721                        zx::MonotonicDuration::from_millis(500),
722                    ))
723                    .await;
724
725                    assert!(
726                        timer_fired.load(Ordering::SeqCst),
727                        "the oneshot timer should have fired"
728                    );
729
730                    // The interval timer should have fired once.  Make it fire twice more.
731                    TestExecutor::advance_to(MonotonicInstant::after(
732                        zx::MonotonicDuration::from_seconds(2),
733                    ))
734                    .await;
735                }
736            )
737        });
738        assert!(executor.run_until_stalled(&mut fut).is_ready());
739    }
740
741    #[test]
742    fn test_advance_to() {
743        advance_to_with(zx::MonotonicDuration::from_seconds(1));
744    }
745
746    #[test]
747    fn test_advance_to_boot() {
748        advance_to_with(zx::BootDuration::from_seconds(1));
749    }
750}