fuchsia_async/runtime/fuchsia/executor/
local.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use crate::runtime::instrument::TaskInstrument;
12use futures::future::{self, Either};
13use futures::task::AtomicWaker;
14use std::fmt;
15use std::future::{Future, poll_fn};
16use std::pin::pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
19use std::task::{Context, Poll};
20
21/// A single-threaded port-based executor for Fuchsia.
22///
23/// Having a `LocalExecutor` in scope allows the creation and polling of zircon objects, such as
24/// [`fuchsia_async::Channel`].
25///
26/// # Panics
27///
28/// `LocalExecutor` will panic on drop if any zircon objects attached to it are still alive. In
29/// other words, zircon objects backed by a `LocalExecutor` must be dropped before it.
30pub struct LocalExecutor {
31    // LINT.IfChange
32    /// The inner executor state.
33    pub(crate) ehandle: EHandle,
34    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
35}
36
37impl fmt::Debug for LocalExecutor {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
40    }
41}
42
43impl Default for LocalExecutor {
44    /// Create a new single-threaded executor running with actual time.
45    fn default() -> Self {
46        Self::new_with_port(zx::Port::create(), None)
47    }
48}
49
50impl LocalExecutor {
51    /// Create a new single-threaded executor running with actual time, with a port
52    /// and instrumentation.
53    pub(crate) fn new_with_port(
54        port: zx::Port,
55        instrument: Option<Arc<dyn TaskInstrument>>,
56    ) -> Self {
57        let inner = Arc::new(Executor::new_with_port(
58            ExecutorTime::RealTime,
59            /* is_local */ true,
60            /* num_threads */ 1,
61            port,
62            instrument,
63        ));
64        let root_scope = ScopeHandle::root(inner);
65        Executor::set_local(root_scope.clone());
66        Self { ehandle: EHandle { root_scope } }
67    }
68
69    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
70    pub fn port(&self) -> &zx::Port {
71        self.ehandle.port()
72    }
73
74    /// Run a single future to completion on a single thread, also polling other active tasks.
75    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
76    where
77        F: Future,
78    {
79        assert!(
80            self.ehandle.inner().is_real_time(),
81            "Error: called `run_singlethreaded` on an executor using fake time"
82        );
83
84        let Poll::Ready(result) = self.run::</* UNTIL_STALLED: */ false, _>(main_future) else {
85            unreachable!()
86        };
87        result
88    }
89
90    fn run<const UNTIL_STALLED: bool, Fut: Future>(
91        &mut self,
92        main_future: Fut,
93    ) -> Poll<Fut::Output> {
94        /// # Safety
95        ///
96        /// See the comment below.
97        unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
98            unsafe { std::mem::transmute(obj) }
99        }
100
101        let scope = &self.ehandle.root_scope;
102        let task = scope.new_local_task(main_future);
103        // SAFETY: Erasing the lifetime is safe because we make sure to drop the main task within
104        // the required lifetime.
105        let task_handle = unsafe { remove_lifetime(task) };
106
107        scope.insert_task(task_handle.clone(), false);
108
109        struct DropMainTask<'a>(&'a EHandle, TaskHandle);
110        impl Drop for DropMainTask<'_> {
111            fn drop(&mut self) {
112                // SAFETY: drop_main_tasks requires that the executor isn't running
113                // i.e. worker_lifecycle isn't running, which will be the case when this runs.
114                unsafe { self.0.inner().drop_main_task(&self.0.root_scope, &self.1) };
115            }
116        }
117        let _drop_main_task = DropMainTask(&self.ehandle, task_handle.clone());
118
119        // Ensure that this object is available for zxdb to find in this stack frame (even if it is
120        // inlined). With certain optimization settings, `self` might get optimized such that there
121        // is insufficient metadata for the debugger to recover this symbol, which it needs in
122        // order to walk the async task tree.
123        std::hint::black_box(&self);
124
125        self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>(Some(&task_handle));
126
127        // SAFETY: We spawned the task earlier, so `R` (the return type) will be the correct type
128        // here.
129        unsafe {
130            self.ehandle
131                .global_scope()
132                .poll_join_result(&task_handle, &mut Context::from_waker(std::task::Waker::noop()))
133        }
134    }
135
136    #[doc(hidden)]
137    /// Returns the root scope of the executor.
138    pub fn root_scope(&self) -> &ScopeHandle {
139        self.ehandle.global_scope()
140    }
141}
142
143impl Drop for LocalExecutor {
144    fn drop(&mut self) {
145        self.ehandle.inner().mark_done();
146        self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
147    }
148}
149
150/// A builder for `LocalExecutor`.
151#[derive(Default)]
152pub struct LocalExecutorBuilder {
153    port: Option<zx::Port>,
154    instrument: Option<Arc<dyn TaskInstrument>>,
155}
156
157impl LocalExecutorBuilder {
158    /// Creates a new builder used for constructing a `LocalExecutor`.
159    pub fn new() -> Self {
160        Self::default()
161    }
162
163    /// Sets the port for the executor.
164    pub fn port(mut self, port: zx::Port) -> Self {
165        self.port = Some(port);
166        self
167    }
168
169    /// Sets the instrumentation hook.
170    pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
171        self.instrument = instrument;
172        self
173    }
174
175    /// Builds the `LocalExecutor`, consuming this `LocalExecutorBuilder`.
176    pub fn build(self) -> LocalExecutor {
177        match self.port {
178            Some(port) => LocalExecutor::new_with_port(port, self.instrument),
179            None => LocalExecutor::default(),
180        }
181    }
182}
183
184/// A single-threaded executor for testing. Exposes additional APIs for manipulating executor state
185/// and validating behavior of executed tasks.
186///
187/// TODO(https://fxbug.dev/375631801): This is lack of BootInstant support.
188pub struct TestExecutor {
189    /// LocalExecutor used under the hood, since most of the logic is shared.
190    local: LocalExecutor,
191}
192
193impl Default for TestExecutor {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
199impl TestExecutor {
200    /// Create a new executor for testing.
201    pub fn new() -> Self {
202        Self::builder().build()
203    }
204
205    /// Create a new single-threaded executor running with fake time.
206    pub fn new_with_fake_time() -> Self {
207        Self::builder().fake_time(true).build()
208    }
209
210    /// Creates a new builder for a `TestExecutor`.
211    pub fn builder() -> TestExecutorBuilder {
212        TestExecutorBuilder::new()
213    }
214
215    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
216    pub fn port(&self) -> &zx::Port {
217        self.local.port()
218    }
219
220    /// Return the current time according to the executor.
221    pub fn now(&self) -> MonotonicInstant {
222        self.local.ehandle.inner().now()
223    }
224
225    /// Return the current time on the boot timeline, according to the executor.
226    pub fn boot_now(&self) -> BootInstant {
227        self.local.ehandle.inner().boot_now()
228    }
229
230    /// Set the fake time to a given value.
231    ///
232    /// # Panics
233    ///
234    /// If the executor was not created with fake time.
235    pub fn set_fake_time(&self, t: MonotonicInstant) {
236        self.local.ehandle.inner().set_fake_time(t)
237    }
238
239    /// Set the offset between the reading of the monotonic and the boot
240    /// clocks.
241    ///
242    /// This is useful to test the situations in which the boot and monotonic
243    /// offsets diverge.  In realistic scenarios, the offset can only grow,
244    /// and testers should keep that in view when setting duration.
245    ///
246    /// # Panics
247    ///
248    /// If the executor was not created with fake time.
249    pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
250        self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
251    }
252
253    /// Get the global scope of the executor.
254    pub fn global_scope(&self) -> &ScopeHandle {
255        self.local.root_scope()
256    }
257
258    /// Run a single future to completion on a single thread, also polling other active tasks.
259    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
260    where
261        F: Future,
262    {
263        self.local.run_singlethreaded(main_future)
264    }
265
266    /// Poll the future. If it is not ready, dispatch available packets and possibly try
267    /// again. Timers will only fire if this executor uses fake time. Never blocks.
268    ///
269    /// This function is for testing. DO NOT use this function in tests or applications that
270    /// involve any interaction with other threads or processes, as those interactions
271    /// may become stalled waiting for signals from "the outside world" which is beyond
272    /// the knowledge of the executor.
273    ///
274    /// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
275    /// futures must first be pinned using the `pin!` macro.
276    pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
277    where
278        F: Future + Unpin,
279    {
280        let mut main_future = pin!(main_future);
281
282        // Set up an instance of UntilStalledData that works with `poll_until_stalled`.
283        struct Cleanup(Arc<Executor>);
284        impl Drop for Cleanup {
285            fn drop(&mut self) {
286                *self.0.owner_data.lock() = None;
287            }
288        }
289        let _cleanup = Cleanup(self.local.ehandle.inner().clone());
290        *self.local.ehandle.inner().owner_data.lock() =
291            Some(Box::new(UntilStalledData { watcher: None }));
292
293        loop {
294            let result = self.local.run::</* UNTIL_STALLED: */ true, _>(main_future.as_mut());
295            if result.is_ready() {
296                return result;
297            }
298
299            // If a waker was set by `poll_until_stalled`, disarm, wake, and loop.
300            if let Some(watcher) = with_data(|data| data.watcher.take()) {
301                watcher.waker.wake();
302                // Relaxed ordering is fine here because this atomic is only ever access from the
303                // main thread.
304                watcher.done.store(true, Ordering::Relaxed);
305            } else {
306                break;
307            }
308        }
309
310        Poll::Pending
311    }
312
313    /// Wake all tasks waiting for expired timers, and return `true` if any task was woken.
314    ///
315    /// This is intended for use in test code in conjunction with fake time.
316    ///
317    /// The wake will have effect on both the monotonic and the boot timers.
318    pub fn wake_expired_timers(&mut self) -> bool {
319        self.local.ehandle.inner().monotonic_timers().wake_timers()
320            || self.local.ehandle.inner().boot_timers().wake_timers()
321    }
322
323    /// Wake up the next task waiting for a timer, if any, and return the time for which the
324    /// timer was scheduled.
325    ///
326    /// This is intended for use in test code in conjunction with `run_until_stalled`.
327    /// For example, here is how one could test that the Timer future fires after the given
328    /// timeout:
329    ///
330    ///     let deadline = zx::MonotonicDuration::from_seconds(5).after_now();
331    ///     let mut future = Timer::<Never>::new(deadline);
332    ///     assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
333    ///     assert_eq!(Some(deadline), exec.wake_next_timer());
334    ///     assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
335    pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
336        self.local.ehandle.inner().monotonic_timers().wake_next_timer()
337    }
338
339    /// Similar to [wake_next_timer], but operates on the timers on the boot
340    /// timeline.
341    pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
342        self.local.ehandle.inner().boot_timers().wake_next_timer()
343    }
344
345    /// Returns the deadline for the next timer due to expire.
346    pub fn next_timer() -> Option<MonotonicInstant> {
347        EHandle::local().inner().monotonic_timers().next_timer()
348    }
349
350    /// Returns the deadline for the next boot timeline timer due to expire.
351    pub fn next_boot_timer() -> Option<BootInstant> {
352        EHandle::local().inner().boot_timers().next_timer()
353    }
354
355    /// Advances fake time to the specified time.  This will only work if the executor is being run
356    /// via `TestExecutor::run_until_stalled` and can only be called by one task at a time.  This
357    /// will make sure that repeating timers fire as expected.
358    ///
359    /// # Panics
360    ///
361    /// Panics if the executor was not created with fake time, and for the same reasons
362    /// `poll_until_stalled` can below.
363    pub async fn advance_to(time: MonotonicInstant) {
364        let ehandle = EHandle::local();
365        loop {
366            let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
367            if let Some(next_timer) = Self::next_timer()
368                && next_timer <= time
369            {
370                ehandle.inner().set_fake_time(next_timer);
371                continue;
372            }
373            ehandle.inner().set_fake_time(time);
374            break;
375        }
376    }
377
378    /// Runs the future until it is ready or the executor is stalled. Returns the state of the
379    /// future.
380    ///
381    /// This will only work if the executor is being run via `TestExecutor::run_until_stalled` and
382    /// can only be called by one task at a time.
383    ///
384    /// This can be used in tests to assert that a future should be pending:
385    /// ```
386    /// assert!(
387    ///     TestExecutor::poll_until_stalled(my_fut).await.is_pending(),
388    ///     "my_fut should not be ready!"
389    /// );
390    /// ```
391    ///
392    /// If you just want to know when the executor is stalled, you can do:
393    /// ```
394    /// let _: Poll<()> = TestExecutor::poll_until_stalled(future::pending::<()>()).await;
395    /// ```
396    ///
397    /// # Panics
398    ///
399    /// Panics if another task is currently trying to use `run_until_stalled`, or the executor is
400    /// not using `TestExecutor::run_until_stalled`.
401    pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
402        let watcher =
403            Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
404
405        assert!(
406            with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
407            "Error: Another task has called `poll_until_stalled`."
408        );
409
410        struct Watcher(Arc<StalledWatcher>);
411
412        // Make sure we clean up if we're dropped.
413        impl Drop for Watcher {
414            fn drop(&mut self) {
415                if !self.0.done.swap(true, Ordering::Relaxed) {
416                    with_data(|data| data.watcher = None);
417                }
418            }
419        }
420
421        let watcher = Watcher(watcher);
422
423        let poll_fn = poll_fn(|cx: &mut Context<'_>| {
424            if watcher.0.done.load(Ordering::Relaxed) {
425                Poll::Ready(())
426            } else {
427                watcher.0.waker.register(cx.waker());
428                Poll::Pending
429            }
430        });
431        match future::select(poll_fn, fut).await {
432            Either::Left(_) => Poll::Pending,
433            Either::Right((value, _)) => Poll::Ready(value),
434        }
435    }
436}
437
438/// A builder for `TestExecutor`.
439#[derive(Default)]
440pub struct TestExecutorBuilder {
441    port: Option<zx::Port>,
442    fake_time: bool,
443    instrument: Option<Arc<dyn TaskInstrument>>,
444}
445
446impl TestExecutorBuilder {
447    /// Creates a new builder used for constructing a `TestExecutor`.
448    pub fn new() -> Self {
449        Self::default()
450    }
451
452    /// Sets the port for the executor.
453    pub fn port(mut self, port: zx::Port) -> Self {
454        self.port = Some(port);
455        self
456    }
457
458    /// Sets whether the executor should use fake time.
459    pub fn fake_time(mut self, fake_time: bool) -> Self {
460        self.fake_time = fake_time;
461        self
462    }
463
464    /// Sets the task instrumentation.
465    pub fn instrument(mut self, instrument: Arc<dyn TaskInstrument>) -> Self {
466        self.instrument = Some(instrument);
467        self
468    }
469
470    /// Builds the `TestExecutor`, consuming this `TestExecutorBuilder`.
471    pub fn build(self) -> TestExecutor {
472        let time = if self.fake_time {
473            ExecutorTime::FakeTime {
474                mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
475                mono_to_boot_offset_ns: AtomicI64::new(0),
476            }
477        } else {
478            ExecutorTime::RealTime
479        };
480        let port = self.port.unwrap_or_else(zx::Port::create);
481        let inner = Arc::new(Executor::new_with_port(
482            time,
483            /* is_local */ true,
484            /* num_threads */ 1,
485            port,
486            self.instrument,
487        ));
488        let root_scope = ScopeHandle::root(inner);
489        Executor::set_local(root_scope.clone());
490        let local = LocalExecutor { ehandle: EHandle { root_scope } };
491        TestExecutor { local }
492    }
493}
494
495struct StalledWatcher {
496    waker: AtomicWaker,
497    done: AtomicBool,
498}
499
500struct UntilStalledData {
501    watcher: Option<Arc<StalledWatcher>>,
502}
503
504/// Calls `f` with `&mut UntilStalledData` that is stored in `owner_data`.
505///
506/// # Panics
507///
508/// Panics if `owner_data` isn't an instance of `UntilStalledData`.
509fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
510    const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
511                           with TestExecutor::run_until_stalled";
512    f(EHandle::local()
513        .inner()
514        .owner_data
515        .lock()
516        .as_mut()
517        .expect(MESSAGE)
518        .downcast_mut::<UntilStalledData>()
519        .expect(MESSAGE))
520}
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525    use crate::handle::on_signals::OnSignalsFuture;
526    use crate::{Interval, Timer, WakeupTime};
527    use assert_matches::assert_matches;
528    use futures::StreamExt;
529    use std::cell::{Cell, RefCell};
530    use std::rc::Rc;
531    use std::task::Waker;
532
533    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
534        crate::EHandle::local().spawn_detached(future);
535    }
536
537    // Runs a future that suspends and returns after being resumed.
538    #[test]
539    fn stepwise_two_steps() {
540        let fut_step = Rc::new(Cell::new(0));
541        let fut_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
542        let fut_waker_clone = fut_waker.clone();
543        let fut_step_clone = fut_step.clone();
544        let fut_fn = move |cx: &mut Context<'_>| {
545            fut_waker_clone.borrow_mut().replace(cx.waker().clone());
546            match fut_step_clone.get() {
547                0 => {
548                    fut_step_clone.set(1);
549                    Poll::Pending
550                }
551                1 => {
552                    fut_step_clone.set(2);
553                    Poll::Ready(())
554                }
555                _ => panic!("future called after done"),
556            }
557        };
558        let fut = Box::new(future::poll_fn(fut_fn));
559        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
560        // Spawn the future rather than waking it the main task because run_until_stalled will wake
561        // the main future on every call, and we want to wake it ourselves using the waker.
562        executor.local.ehandle.spawn_local_detached(fut);
563        assert_eq!(fut_step.get(), 0);
564        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
565        assert_eq!(fut_step.get(), 1);
566
567        fut_waker.borrow_mut().take().unwrap().wake();
568        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
569        assert_eq!(fut_step.get(), 2);
570    }
571
572    #[test]
573    // Runs a future that waits on a timer.
574    fn stepwise_timer() {
575        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
576        executor.set_fake_time(MonotonicInstant::from_nanos(0));
577        let mut fut =
578            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));
579
580        let _ = executor.run_until_stalled(&mut fut);
581        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));
582
583        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
584        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
585        assert!(executor.run_until_stalled(&mut fut).is_ready());
586    }
587
588    // Runs a future that waits on an event.
589    #[test]
590    fn stepwise_event() {
591        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
592        let event = zx::Event::create();
593        let mut fut = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));
594
595        let _ = executor.run_until_stalled(&mut fut);
596
597        event.signal(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
598        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
599    }
600
601    // Using `run_until_stalled` does not modify the order of events
602    // compared to normal execution.
603    #[test]
604    fn run_until_stalled_preserves_order() {
605        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
606        let spawned_fut_completed = Arc::new(AtomicBool::new(false));
607        let spawned_fut_completed_writer = spawned_fut_completed.clone();
608        let spawned_fut = Box::pin(async move {
609            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
610            spawned_fut_completed_writer.store(true, Ordering::SeqCst);
611        });
612        let mut main_fut = pin!(async {
613            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
614        });
615        spawn(spawned_fut);
616        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
617        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
618        // The timer in `spawned_fut` should fire first, then the
619        // timer in `main_fut`.
620        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
621        assert!(spawned_fut_completed.load(Ordering::SeqCst));
622    }
623
624    #[test]
625    fn task_destruction() {
626        struct DropSpawner {
627            dropped: Arc<AtomicBool>,
628        }
629        impl Drop for DropSpawner {
630            fn drop(&mut self) {
631                self.dropped.store(true, Ordering::SeqCst);
632                let dropped_clone = self.dropped.clone();
633                spawn(async {
634                    // Hold on to a reference here to verify that it, too, is destroyed later
635                    let _dropped_clone = dropped_clone;
636                    panic!("task spawned in drop shouldn't be polled");
637                });
638            }
639        }
640        let mut dropped = Arc::new(AtomicBool::new(false));
641        let drop_spawner = DropSpawner { dropped: dropped.clone() };
642        let mut executor = TestExecutorBuilder::new().build();
643        let mut main_fut = pin!(async move {
644            spawn(async move {
645                // Take ownership of the drop spawner
646                let _drop_spawner = drop_spawner;
647                future::pending::<()>().await;
648            });
649        });
650        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
651        assert!(
652            !dropped.load(Ordering::SeqCst),
653            "executor dropped pending task before destruction"
654        );
655
656        // Should drop the pending task and it's owned drop spawner,
657        // as well as gracefully drop the future spawned from the drop spawner.
658        drop(executor);
659        let dropped = Arc::get_mut(&mut dropped)
660            .expect("someone else is unexpectedly still holding on to a reference");
661        assert!(
662            dropped.load(Ordering::SeqCst),
663            "executor did not drop pending task during destruction"
664        );
665    }
666
667    #[test]
668    fn time_now_real_time() {
669        let _executor = LocalExecutorBuilder::new().build();
670        let t1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
671        let t2 = MonotonicInstant::now().into_zx();
672        let t3 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
673        assert!(t1 <= t2);
674        assert!(t2 <= t3);
675    }
676
677    #[test]
678    fn time_now_fake_time() {
679        let executor = TestExecutorBuilder::new().fake_time(true).build();
680        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
681        executor.set_fake_time(t1);
682        assert_eq!(MonotonicInstant::now(), t1);
683
684        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
685        executor.set_fake_time(t2);
686        assert_eq!(MonotonicInstant::now(), t2);
687    }
688
689    #[test]
690    fn time_now_fake_time_boot() {
691        let executor = TestExecutorBuilder::new().fake_time(true).build();
692        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
693        executor.set_fake_time(t1);
694        assert_eq!(MonotonicInstant::now(), t1);
695        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
696
697        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
698        executor.set_fake_time(t2);
699        assert_eq!(MonotonicInstant::now(), t2);
700        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
701
702        const TEST_BOOT_OFFSET: i64 = 42;
703
704        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
705        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
706    }
707
708    #[test]
709    fn time_boot_now() {
710        let executor = TestExecutorBuilder::new().fake_time(true).build();
711        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
712        executor.set_fake_time(t1);
713        assert_eq!(MonotonicInstant::now(), t1);
714        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
715
716        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
717        executor.set_fake_time(t2);
718        assert_eq!(MonotonicInstant::now(), t2);
719        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
720
721        const TEST_BOOT_OFFSET: i64 = 42;
722
723        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
724        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
725    }
726
727    #[test]
728    fn time_after_overflow() {
729        let executor = TestExecutorBuilder::new().fake_time(true).build();
730
731        executor.set_fake_time(MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100));
732        assert_eq!(
733            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
734            MonotonicInstant::INFINITE
735        );
736
737        executor.set_fake_time(
738            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
739        );
740        assert_eq!(
741            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
742            MonotonicInstant::INFINITE_PAST
743        );
744    }
745
746    // This future wakes itself up a number of times during the same cycle
747    async fn multi_wake(n: usize) {
748        let mut done = false;
749        futures::future::poll_fn(|cx| {
750            if done {
751                return Poll::Ready(());
752            }
753            for _ in 1..n {
754                cx.waker().wake_by_ref()
755            }
756            done = true;
757            Poll::Pending
758        })
759        .await;
760    }
761
762    #[test]
763    fn test_boot_time_tracks_mono_time() {
764        const FAKE_TIME: i64 = 42;
765        let executor = TestExecutorBuilder::new().fake_time(true).build();
766        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
767        assert_eq!(
768            BootInstant::from_nanos(FAKE_TIME),
769            executor.boot_now(),
770            "boot time should have advanced"
771        );
772
773        // Now advance boot without mono.
774        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
775        assert_eq!(
776            BootInstant::from_nanos(2 * FAKE_TIME),
777            executor.boot_now(),
778            "boot time should have advanced again"
779        );
780    }
781
782    // Ensure that a large amount of wakeups does not exhaust kernel resources,
783    // such as the zx port queue limit.
784    #[test]
785    fn many_wakeups() {
786        let mut executor = LocalExecutorBuilder::new().build();
787        executor.run_singlethreaded(multi_wake(4096 * 2));
788    }
789
790    fn advance_to_with(timer_duration: impl WakeupTime) {
791        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
792        executor.set_fake_time(MonotonicInstant::from_nanos(0));
793
794        let mut fut = pin!(async {
795            let timer_fired = Arc::new(AtomicBool::new(false));
796            futures::join!(
797                async {
798                    // Oneshot timer.
799                    Timer::new(timer_duration).await;
800                    timer_fired.store(true, Ordering::SeqCst);
801                },
802                async {
803                    // Interval timer, fires periodically.
804                    let mut fired = 0;
805                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
806                    while interval.next().await.is_some() {
807                        fired += 1;
808                        if fired == 3 {
809                            break;
810                        }
811                    }
812                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
813                },
814                async {
815                    assert!(
816                        !timer_fired.load(Ordering::SeqCst),
817                        "the oneshot timer shouldn't be fired"
818                    );
819                    TestExecutor::advance_to(MonotonicInstant::after(
820                        zx::MonotonicDuration::from_millis(500),
821                    ))
822                    .await;
823                    // Timer still shouldn't be fired.
824                    assert!(
825                        !timer_fired.load(Ordering::SeqCst),
826                        "the oneshot timer shouldn't be fired"
827                    );
828                    TestExecutor::advance_to(MonotonicInstant::after(
829                        zx::MonotonicDuration::from_millis(500),
830                    ))
831                    .await;
832
833                    assert!(
834                        timer_fired.load(Ordering::SeqCst),
835                        "the oneshot timer should have fired"
836                    );
837
838                    // The interval timer should have fired once.  Make it fire twice more.
839                    TestExecutor::advance_to(MonotonicInstant::after(
840                        zx::MonotonicDuration::from_seconds(2),
841                    ))
842                    .await;
843                }
844            )
845        });
846        assert!(executor.run_until_stalled(&mut fut).is_ready());
847    }
848
849    #[test]
850    fn test_advance_to() {
851        advance_to_with(zx::MonotonicDuration::from_seconds(1));
852    }
853
854    #[test]
855    fn test_advance_to_boot() {
856        advance_to_with(zx::BootDuration::from_seconds(1));
857    }
858}