fuchsia_async/runtime/fuchsia/executor/
local.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, MAIN_TASK_ID, TaskHandle};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use crate::runtime::instrument::TaskInstrument;
12use futures::future::{self, Either};
13use futures::task::AtomicWaker;
14use std::fmt;
15use std::future::{Future, poll_fn};
16use std::pin::pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
19use std::task::{Context, Poll};
20
/// A single-threaded port-based executor for Fuchsia.
///
/// Having a `LocalExecutor` in scope allows the creation and polling of zircon objects, such as
/// [`fuchsia_async::Channel`].
///
/// # Panics
///
/// `LocalExecutor` will panic on drop if any zircon objects attached to it are still alive. In
/// other words, zircon objects backed by a `LocalExecutor` must be dropped before it.
pub struct LocalExecutor {
    // NOTE(review): the LINT markers below indicate that the referenced zxdb source depends on
    // this field; keep both in sync when changing it.
    // LINT.IfChange
    /// The inner executor state.
    ///
    /// Holds the root scope; the port, the underlying `Executor` and the root scope itself are
    /// all reached through this handle.
    pub(crate) ehandle: EHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
36
37impl fmt::Debug for LocalExecutor {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
40    }
41}
42
43impl Default for LocalExecutor {
44    /// Create a new single-threaded executor running with actual time.
45    fn default() -> Self {
46        Self::new_with_port(zx::Port::create(), None)
47    }
48}
49
50impl LocalExecutor {
51    /// Create a new single-threaded executor running with actual time, with a port
52    /// and instrumentation.
53    pub(crate) fn new_with_port(
54        port: zx::Port,
55        instrument: Option<Arc<dyn TaskInstrument>>,
56    ) -> Self {
57        let inner = Arc::new(Executor::new_with_port(
58            ExecutorTime::RealTime,
59            /* is_local */ true,
60            /* num_threads */ 1,
61            port,
62            instrument,
63        ));
64        let root_scope = ScopeHandle::root(inner);
65        Executor::set_local(root_scope.clone());
66        Self { ehandle: EHandle { root_scope } }
67    }
68
69    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
70    pub fn port(&self) -> &zx::Port {
71        self.ehandle.port()
72    }
73
74    /// Run a single future to completion on a single thread, also polling other active tasks.
75    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
76    where
77        F: Future,
78    {
79        assert!(
80            self.ehandle.inner().is_real_time(),
81            "Error: called `run_singlethreaded` on an executor using fake time"
82        );
83
84        let Poll::Ready(result) = self.run::</* UNTIL_STALLED: */ false, _>(main_future) else {
85            unreachable!()
86        };
87        result
88    }
89
    /// Runs `main_future` as the executor's main task and reports its state afterwards.
    ///
    /// The future is inserted into the root scope under `MAIN_TASK_ID`, the executor is driven
    /// with `worker_lifecycle::<UNTIL_STALLED>()`, and finally the main task's join result is
    /// polled once with a no-op waker. The returned `Poll` is the outcome of that single poll.
    ///
    /// The main task is removed again through `drop_main_task` when this function exits.
    fn run<const UNTIL_STALLED: bool, Fut: Future>(
        &mut self,
        main_future: Fut,
    ) -> Poll<Fut::Output> {
        /// Reinterprets a lifetime-bound task handle as a `TaskHandle`.
        ///
        /// # Safety
        ///
        /// See the comment below.
        unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
            std::mem::transmute(obj)
        }

        let scope = &self.ehandle.root_scope;
        let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);

        // SAFETY: Erasing the lifetime is safe because we make sure to drop the main task within
        // the required lifetime.
        unsafe {
            scope.insert_task(remove_lifetime(task), false);
        }

        // Drop guard: removes the main task from the root scope when `run` exits, which upholds
        // the lifetime promise made for `remove_lifetime` above.
        struct DropMainTask<'a>(&'a EHandle);
        impl Drop for DropMainTask<'_> {
            fn drop(&mut self) {
                // SAFETY: drop_main_tasks requires that the executor isn't running
                // i.e. worker_lifecycle isn't running, which will be the case when this runs.
                unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
            }
        }
        let _drop_main_task = DropMainTask(&self.ehandle);

        // Drive the executor; `UNTIL_STALLED` is forwarded unchanged to `worker_lifecycle`.
        self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();

        // SAFETY: We spawned the task earlier, so `R` (the return type) will be the correct type
        // here.
        unsafe {
            // The waker is a no-op: this is a one-off check of the main task's result, not a
            // registration for a later wake-up.
            self.ehandle.global_scope().poll_join_result(
                MAIN_TASK_ID,
                &mut Context::from_waker(&futures::task::noop_waker()),
            )
        }
    }
131
132    #[doc(hidden)]
133    /// Returns the root scope of the executor.
134    pub fn root_scope(&self) -> &ScopeHandle {
135        self.ehandle.global_scope()
136    }
137}
138
139impl Drop for LocalExecutor {
140    fn drop(&mut self) {
141        self.ehandle.inner().mark_done();
142        self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
143    }
144}
145
/// A builder for `LocalExecutor`.
///
/// Both settings are optional; `Default` leaves them unset.
#[derive(Default)]
pub struct LocalExecutorBuilder {
    /// Port the executor should listen on, set through `port()`.
    port: Option<zx::Port>,
    /// Task instrumentation hook, set through `instrument()`.
    instrument: Option<Arc<dyn TaskInstrument>>,
}
152
153impl LocalExecutorBuilder {
154    /// Creates a new builder used for constructing a `LocalExecutor`.
155    pub fn new() -> Self {
156        Self::default()
157    }
158
159    /// Sets the port for the executor.
160    pub fn port(mut self, port: zx::Port) -> Self {
161        self.port = Some(port);
162        self
163    }
164
165    /// Sets the instrumentation hook.
166    pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
167        self.instrument = instrument;
168        self
169    }
170
171    /// Builds the `LocalExecutor`, consuming this `LocalExecutorBuilder`.
172    pub fn build(self) -> LocalExecutor {
173        match self.port {
174            Some(port) => LocalExecutor::new_with_port(port, self.instrument),
175            None => LocalExecutor::default(),
176        }
177    }
178}
179
/// A single-threaded executor for testing. Exposes additional APIs for manipulating executor state
/// and validating behavior of executed tasks.
///
/// TODO(https://fxbug.dev/375631801): `BootInstant` support is still lacking.
pub struct TestExecutor {
    /// LocalExecutor used under the hood, since most of the logic is shared.
    local: LocalExecutor,
}
188
189impl Default for TestExecutor {
190    fn default() -> Self {
191        Self::new()
192    }
193}
194
195impl TestExecutor {
196    /// Create a new executor for testing.
197    pub fn new() -> Self {
198        Self::builder().build()
199    }
200
201    /// Create a new single-threaded executor running with fake time.
202    pub fn new_with_fake_time() -> Self {
203        Self::builder().fake_time(true).build()
204    }
205
206    /// Creates a new builder for a `TestExecutor`.
207    pub fn builder() -> TestExecutorBuilder {
208        TestExecutorBuilder::new()
209    }
210
211    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
212    pub fn port(&self) -> &zx::Port {
213        self.local.port()
214    }
215
216    /// Return the current time according to the executor.
217    pub fn now(&self) -> MonotonicInstant {
218        self.local.ehandle.inner().now()
219    }
220
221    /// Return the current time on the boot timeline, according to the executor.
222    pub fn boot_now(&self) -> BootInstant {
223        self.local.ehandle.inner().boot_now()
224    }
225
226    /// Set the fake time to a given value.
227    ///
228    /// # Panics
229    ///
230    /// If the executor was not created with fake time.
231    pub fn set_fake_time(&self, t: MonotonicInstant) {
232        self.local.ehandle.inner().set_fake_time(t)
233    }
234
235    /// Set the offset between the reading of the monotonic and the boot
236    /// clocks.
237    ///
238    /// This is useful to test the situations in which the boot and monotonic
239    /// offsets diverge.  In realistic scenarios, the offset can only grow,
240    /// and testers should keep that in view when setting duration.
241    ///
242    /// # Panics
243    ///
244    /// If the executor was not created with fake time.
245    pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
246        self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
247    }
248
249    /// Get the global scope of the executor.
250    pub fn global_scope(&self) -> &ScopeHandle {
251        self.local.root_scope()
252    }
253
254    /// Run a single future to completion on a single thread, also polling other active tasks.
255    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
256    where
257        F: Future,
258    {
259        self.local.run_singlethreaded(main_future)
260    }
261
262    /// Poll the future. If it is not ready, dispatch available packets and possibly try
263    /// again. Timers will only fire if this executor uses fake time. Never blocks.
264    ///
265    /// This function is for testing. DO NOT use this function in tests or applications that
266    /// involve any interaction with other threads or processes, as those interactions
267    /// may become stalled waiting for signals from "the outside world" which is beyond
268    /// the knowledge of the executor.
269    ///
270    /// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
271    /// futures must first be pinned using the `pin!` macro.
272    pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
273    where
274        F: Future + Unpin,
275    {
276        let mut main_future = pin!(main_future);
277
278        // Set up an instance of UntilStalledData that works with `poll_until_stalled`.
279        struct Cleanup(Arc<Executor>);
280        impl Drop for Cleanup {
281            fn drop(&mut self) {
282                *self.0.owner_data.lock() = None;
283            }
284        }
285        let _cleanup = Cleanup(self.local.ehandle.inner().clone());
286        *self.local.ehandle.inner().owner_data.lock() =
287            Some(Box::new(UntilStalledData { watcher: None }));
288
289        loop {
290            let result = self.local.run::</* UNTIL_STALLED: */ true, _>(main_future.as_mut());
291            if result.is_ready() {
292                return result;
293            }
294
295            // If a waker was set by `poll_until_stalled`, disarm, wake, and loop.
296            if let Some(watcher) = with_data(|data| data.watcher.take()) {
297                watcher.waker.wake();
298                // Relaxed ordering is fine here because this atomic is only ever access from the
299                // main thread.
300                watcher.done.store(true, Ordering::Relaxed);
301            } else {
302                break;
303            }
304        }
305
306        Poll::Pending
307    }
308
309    /// Wake all tasks waiting for expired timers, and return `true` if any task was woken.
310    ///
311    /// This is intended for use in test code in conjunction with fake time.
312    ///
313    /// The wake will have effect on both the monotonic and the boot timers.
314    pub fn wake_expired_timers(&mut self) -> bool {
315        self.local.ehandle.inner().monotonic_timers().wake_timers()
316            || self.local.ehandle.inner().boot_timers().wake_timers()
317    }
318
319    /// Wake up the next task waiting for a timer, if any, and return the time for which the
320    /// timer was scheduled.
321    ///
322    /// This is intended for use in test code in conjunction with `run_until_stalled`.
323    /// For example, here is how one could test that the Timer future fires after the given
324    /// timeout:
325    ///
326    ///     let deadline = zx::MonotonicDuration::from_seconds(5).after_now();
327    ///     let mut future = Timer::<Never>::new(deadline);
328    ///     assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
329    ///     assert_eq!(Some(deadline), exec.wake_next_timer());
330    ///     assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
331    pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
332        self.local.ehandle.inner().monotonic_timers().wake_next_timer()
333    }
334
335    /// Similar to [wake_next_timer], but operates on the timers on the boot
336    /// timeline.
337    pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
338        self.local.ehandle.inner().boot_timers().wake_next_timer()
339    }
340
341    /// Returns the deadline for the next timer due to expire.
342    pub fn next_timer() -> Option<MonotonicInstant> {
343        EHandle::local().inner().monotonic_timers().next_timer()
344    }
345
346    /// Returns the deadline for the next boot timeline timer due to expire.
347    pub fn next_boot_timer() -> Option<BootInstant> {
348        EHandle::local().inner().boot_timers().next_timer()
349    }
350
351    /// Advances fake time to the specified time.  This will only work if the executor is being run
352    /// via `TestExecutor::run_until_stalled` and can only be called by one task at a time.  This
353    /// will make sure that repeating timers fire as expected.
354    ///
355    /// # Panics
356    ///
357    /// Panics if the executor was not created with fake time, and for the same reasons
358    /// `poll_until_stalled` can below.
359    pub async fn advance_to(time: MonotonicInstant) {
360        let ehandle = EHandle::local();
361        loop {
362            let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
363            if let Some(next_timer) = Self::next_timer() {
364                if next_timer <= time {
365                    ehandle.inner().set_fake_time(next_timer);
366                    continue;
367                }
368            }
369            ehandle.inner().set_fake_time(time);
370            break;
371        }
372    }
373
374    /// Runs the future until it is ready or the executor is stalled. Returns the state of the
375    /// future.
376    ///
377    /// This will only work if the executor is being run via `TestExecutor::run_until_stalled` and
378    /// can only be called by one task at a time.
379    ///
380    /// This can be used in tests to assert that a future should be pending:
381    /// ```
382    /// assert!(
383    ///     TestExecutor::poll_until_stalled(my_fut).await.is_pending(),
384    ///     "my_fut should not be ready!"
385    /// );
386    /// ```
387    ///
388    /// If you just want to know when the executor is stalled, you can do:
389    /// ```
390    /// let _: Poll<()> = TestExecutor::poll_until_stalled(future::pending::<()>()).await;
391    /// ```
392    ///
393    /// # Panics
394    ///
395    /// Panics if another task is currently trying to use `run_until_stalled`, or the executor is
396    /// not using `TestExecutor::run_until_stalled`.
397    pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
398        let watcher =
399            Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
400
401        assert!(
402            with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
403            "Error: Another task has called `poll_until_stalled`."
404        );
405
406        struct Watcher(Arc<StalledWatcher>);
407
408        // Make sure we clean up if we're dropped.
409        impl Drop for Watcher {
410            fn drop(&mut self) {
411                if !self.0.done.swap(true, Ordering::Relaxed) {
412                    with_data(|data| data.watcher = None);
413                }
414            }
415        }
416
417        let watcher = Watcher(watcher);
418
419        let poll_fn = poll_fn(|cx: &mut Context<'_>| {
420            if watcher.0.done.load(Ordering::Relaxed) {
421                Poll::Ready(())
422            } else {
423                watcher.0.waker.register(cx.waker());
424                Poll::Pending
425            }
426        });
427        match future::select(poll_fn, fut).await {
428            Either::Left(_) => Poll::Pending,
429            Either::Right((value, _)) => Poll::Ready(value),
430        }
431    }
432}
433
/// A builder for `TestExecutor`.
///
/// `Default` yields a builder with no port, real time, and no instrumentation.
#[derive(Default)]
pub struct TestExecutorBuilder {
    /// Port the executor should listen on; `build()` creates one when this is unset.
    port: Option<zx::Port>,
    /// Whether the executor runs on fake time instead of real time.
    fake_time: bool,
    /// Task instrumentation passed to the executor, if any.
    instrument: Option<Arc<dyn TaskInstrument>>,
}
441
442impl TestExecutorBuilder {
443    /// Creates a new builder used for constructing a `TestExecutor`.
444    pub fn new() -> Self {
445        Self::default()
446    }
447
448    /// Sets the port for the executor.
449    pub fn port(mut self, port: zx::Port) -> Self {
450        self.port = Some(port);
451        self
452    }
453
454    /// Sets whether the executor should use fake time.
455    pub fn fake_time(mut self, fake_time: bool) -> Self {
456        self.fake_time = fake_time;
457        self
458    }
459
460    /// Sets the task instrumentation.
461    pub fn instrument(mut self, instrument: Arc<dyn TaskInstrument>) -> Self {
462        self.instrument = Some(instrument);
463        self
464    }
465
466    /// Builds the `TestExecutor`, consuming this `TestExecutorBuilder`.
467    pub fn build(self) -> TestExecutor {
468        let time = if self.fake_time {
469            ExecutorTime::FakeTime {
470                mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
471                mono_to_boot_offset_ns: AtomicI64::new(0),
472            }
473        } else {
474            ExecutorTime::RealTime
475        };
476        let port = self.port.unwrap_or_else(zx::Port::create);
477        let inner = Arc::new(Executor::new_with_port(
478            time,
479            /* is_local */ true,
480            /* num_threads */ 1,
481            port,
482            self.instrument,
483        ));
484        let root_scope = ScopeHandle::root(inner);
485        Executor::set_local(root_scope.clone());
486        let local = LocalExecutor { ehandle: EHandle { root_scope } };
487        TestExecutor { local }
488    }
489}
490
/// State shared between a task inside `TestExecutor::poll_until_stalled` and
/// `TestExecutor::run_until_stalled`.
struct StalledWatcher {
    /// Waker registered by the `poll_until_stalled` future; `run_until_stalled` wakes it once
    /// the executor has stalled.
    waker: AtomicWaker,
    /// Set to `true` by `run_until_stalled` after waking the watcher, or by the
    /// `poll_until_stalled` drop guard when that future is dropped first.
    done: AtomicBool,
}
495
/// Data that `TestExecutor::run_until_stalled` stores in the executor's `owner_data` slot for
/// the duration of the call.
struct UntilStalledData {
    /// Watcher installed by the task currently inside `poll_until_stalled`, if any.
    watcher: Option<Arc<StalledWatcher>>,
}
499
500/// Calls `f` with `&mut UntilStalledData` that is stored in `owner_data`.
501///
502/// # Panics
503///
504/// Panics if `owner_data` isn't an instance of `UntilStalledData`.
505fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
506    const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
507                           with TestExecutor::run_until_stalled";
508    f(EHandle::local()
509        .inner()
510        .owner_data
511        .lock()
512        .as_mut()
513        .expect(MESSAGE)
514        .downcast_mut::<UntilStalledData>()
515        .expect(MESSAGE))
516}
517
518#[cfg(test)]
519mod tests {
520    use super::*;
521    use crate::handle::on_signals::OnSignalsFuture;
522    use crate::{Interval, Timer, WakeupTime};
523    use assert_matches::assert_matches;
524    use futures::StreamExt;
525    use std::cell::{Cell, RefCell};
526    use std::rc::Rc;
527    use std::task::Waker;
528    use zx::{self as zx, AsHandleRef};
529
530    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
531        crate::EHandle::local().spawn_detached(future);
532    }
533
534    // Runs a future that suspends and returns after being resumed.
535    #[test]
536    fn stepwise_two_steps() {
537        let fut_step = Rc::new(Cell::new(0));
538        let fut_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
539        let fut_waker_clone = fut_waker.clone();
540        let fut_step_clone = fut_step.clone();
541        let fut_fn = move |cx: &mut Context<'_>| {
542            fut_waker_clone.borrow_mut().replace(cx.waker().clone());
543            match fut_step_clone.get() {
544                0 => {
545                    fut_step_clone.set(1);
546                    Poll::Pending
547                }
548                1 => {
549                    fut_step_clone.set(2);
550                    Poll::Ready(())
551                }
552                _ => panic!("future called after done"),
553            }
554        };
555        let fut = Box::new(future::poll_fn(fut_fn));
556        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
557        // Spawn the future rather than waking it the main task because run_until_stalled will wake
558        // the main future on every call, and we want to wake it ourselves using the waker.
559        executor.local.ehandle.spawn_local_detached(fut);
560        assert_eq!(fut_step.get(), 0);
561        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
562        assert_eq!(fut_step.get(), 1);
563
564        fut_waker.borrow_mut().take().unwrap().wake();
565        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
566        assert_eq!(fut_step.get(), 2);
567    }
568
569    #[test]
570    // Runs a future that waits on a timer.
571    fn stepwise_timer() {
572        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
573        executor.set_fake_time(MonotonicInstant::from_nanos(0));
574        let mut fut =
575            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));
576
577        let _ = executor.run_until_stalled(&mut fut);
578        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));
579
580        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
581        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
582        assert!(executor.run_until_stalled(&mut fut).is_ready());
583    }
584
585    // Runs a future that waits on an event.
586    #[test]
587    fn stepwise_event() {
588        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
589        let event = zx::Event::create();
590        let mut fut = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));
591
592        let _ = executor.run_until_stalled(&mut fut);
593
594        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
595        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
596    }
597
598    // Using `run_until_stalled` does not modify the order of events
599    // compared to normal execution.
600    #[test]
601    fn run_until_stalled_preserves_order() {
602        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
603        let spawned_fut_completed = Arc::new(AtomicBool::new(false));
604        let spawned_fut_completed_writer = spawned_fut_completed.clone();
605        let spawned_fut = Box::pin(async move {
606            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
607            spawned_fut_completed_writer.store(true, Ordering::SeqCst);
608        });
609        let mut main_fut = pin!(async {
610            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
611        });
612        spawn(spawned_fut);
613        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
614        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
615        // The timer in `spawned_fut` should fire first, then the
616        // timer in `main_fut`.
617        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
618        assert!(spawned_fut_completed.load(Ordering::SeqCst));
619    }
620
621    #[test]
622    fn task_destruction() {
623        struct DropSpawner {
624            dropped: Arc<AtomicBool>,
625        }
626        impl Drop for DropSpawner {
627            fn drop(&mut self) {
628                self.dropped.store(true, Ordering::SeqCst);
629                let dropped_clone = self.dropped.clone();
630                spawn(async {
631                    // Hold on to a reference here to verify that it, too, is destroyed later
632                    let _dropped_clone = dropped_clone;
633                    panic!("task spawned in drop shouldn't be polled");
634                });
635            }
636        }
637        let mut dropped = Arc::new(AtomicBool::new(false));
638        let drop_spawner = DropSpawner { dropped: dropped.clone() };
639        let mut executor = TestExecutorBuilder::new().build();
640        let mut main_fut = pin!(async move {
641            spawn(async move {
642                // Take ownership of the drop spawner
643                let _drop_spawner = drop_spawner;
644                future::pending::<()>().await;
645            });
646        });
647        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
648        assert!(
649            !dropped.load(Ordering::SeqCst),
650            "executor dropped pending task before destruction"
651        );
652
653        // Should drop the pending task and it's owned drop spawner,
654        // as well as gracefully drop the future spawned from the drop spawner.
655        drop(executor);
656        let dropped = Arc::get_mut(&mut dropped)
657            .expect("someone else is unexpectedly still holding on to a reference");
658        assert!(
659            dropped.load(Ordering::SeqCst),
660            "executor did not drop pending task during destruction"
661        );
662    }
663
664    #[test]
665    fn time_now_real_time() {
666        let _executor = LocalExecutorBuilder::new().build();
667        let t1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
668        let t2 = MonotonicInstant::now().into_zx();
669        let t3 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
670        assert!(t1 <= t2);
671        assert!(t2 <= t3);
672    }
673
674    #[test]
675    fn time_now_fake_time() {
676        let executor = TestExecutorBuilder::new().fake_time(true).build();
677        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
678        executor.set_fake_time(t1);
679        assert_eq!(MonotonicInstant::now(), t1);
680
681        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
682        executor.set_fake_time(t2);
683        assert_eq!(MonotonicInstant::now(), t2);
684    }
685
686    #[test]
687    fn time_now_fake_time_boot() {
688        let executor = TestExecutorBuilder::new().fake_time(true).build();
689        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
690        executor.set_fake_time(t1);
691        assert_eq!(MonotonicInstant::now(), t1);
692        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
693
694        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
695        executor.set_fake_time(t2);
696        assert_eq!(MonotonicInstant::now(), t2);
697        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
698
699        const TEST_BOOT_OFFSET: i64 = 42;
700
701        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
702        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
703    }
704
705    #[test]
706    fn time_boot_now() {
707        let executor = TestExecutorBuilder::new().fake_time(true).build();
708        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
709        executor.set_fake_time(t1);
710        assert_eq!(MonotonicInstant::now(), t1);
711        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
712
713        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
714        executor.set_fake_time(t2);
715        assert_eq!(MonotonicInstant::now(), t2);
716        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
717
718        const TEST_BOOT_OFFSET: i64 = 42;
719
720        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
721        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
722    }
723
724    #[test]
725    fn time_after_overflow() {
726        let executor = TestExecutorBuilder::new().fake_time(true).build();
727
728        executor.set_fake_time(MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100));
729        assert_eq!(
730            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
731            MonotonicInstant::INFINITE
732        );
733
734        executor.set_fake_time(
735            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
736        );
737        assert_eq!(
738            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
739            MonotonicInstant::INFINITE_PAST
740        );
741    }
742
743    // This future wakes itself up a number of times during the same cycle
744    async fn multi_wake(n: usize) {
745        let mut done = false;
746        futures::future::poll_fn(|cx| {
747            if done {
748                return Poll::Ready(());
749            }
750            for _ in 1..n {
751                cx.waker().wake_by_ref()
752            }
753            done = true;
754            Poll::Pending
755        })
756        .await;
757    }
758
759    #[test]
760    fn test_boot_time_tracks_mono_time() {
761        const FAKE_TIME: i64 = 42;
762        let executor = TestExecutorBuilder::new().fake_time(true).build();
763        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
764        assert_eq!(
765            BootInstant::from_nanos(FAKE_TIME),
766            executor.boot_now(),
767            "boot time should have advanced"
768        );
769
770        // Now advance boot without mono.
771        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
772        assert_eq!(
773            BootInstant::from_nanos(2 * FAKE_TIME),
774            executor.boot_now(),
775            "boot time should have advanced again"
776        );
777    }
778
779    // Ensure that a large amount of wakeups does not exhaust kernel resources,
780    // such as the zx port queue limit.
781    #[test]
782    fn many_wakeups() {
783        let mut executor = LocalExecutorBuilder::new().build();
784        executor.run_singlethreaded(multi_wake(4096 * 2));
785    }
786
787    fn advance_to_with(timer_duration: impl WakeupTime) {
788        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
789        executor.set_fake_time(MonotonicInstant::from_nanos(0));
790
791        let mut fut = pin!(async {
792            let timer_fired = Arc::new(AtomicBool::new(false));
793            futures::join!(
794                async {
795                    // Oneshot timer.
796                    Timer::new(timer_duration).await;
797                    timer_fired.store(true, Ordering::SeqCst);
798                },
799                async {
800                    // Interval timer, fires periodically.
801                    let mut fired = 0;
802                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
803                    while interval.next().await.is_some() {
804                        fired += 1;
805                        if fired == 3 {
806                            break;
807                        }
808                    }
809                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
810                },
811                async {
812                    assert!(
813                        !timer_fired.load(Ordering::SeqCst),
814                        "the oneshot timer shouldn't be fired"
815                    );
816                    TestExecutor::advance_to(MonotonicInstant::after(
817                        zx::MonotonicDuration::from_millis(500),
818                    ))
819                    .await;
820                    // Timer still shouldn't be fired.
821                    assert!(
822                        !timer_fired.load(Ordering::SeqCst),
823                        "the oneshot timer shouldn't be fired"
824                    );
825                    TestExecutor::advance_to(MonotonicInstant::after(
826                        zx::MonotonicDuration::from_millis(500),
827                    ))
828                    .await;
829
830                    assert!(
831                        timer_fired.load(Ordering::SeqCst),
832                        "the oneshot timer should have fired"
833                    );
834
835                    // The interval timer should have fired once.  Make it fire twice more.
836                    TestExecutor::advance_to(MonotonicInstant::after(
837                        zx::MonotonicDuration::from_seconds(2),
838                    ))
839                    .await;
840                }
841            )
842        });
843        assert!(executor.run_until_stalled(&mut fut).is_ready());
844    }
845
846    #[test]
847    fn test_advance_to() {
848        advance_to_with(zx::MonotonicDuration::from_seconds(1));
849    }
850
851    #[test]
852    fn test_advance_to_boot() {
853        advance_to_with(zx::BootDuration::from_seconds(1));
854    }
855}