Skip to main content

fuchsia_async/runtime/fuchsia/executor/
local.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use crate::runtime::instrument::TaskInstrument;
12use futures::future::{self, Either};
13use futures::task::AtomicWaker;
14use std::fmt;
15use std::future::{Future, poll_fn};
16use std::pin::pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
19use std::task::{Context, Poll};
20
21/// A single-threaded port-based executor for Fuchsia.
22///
23/// Having a `LocalExecutor` in scope allows the creation and polling of zircon objects, such as
24/// [`fuchsia_async::Channel`].
25///
26/// # Panics
27///
28/// `LocalExecutor` will panic on drop if any zircon objects attached to it are still alive. In
29/// other words, zircon objects backed by a `LocalExecutor` must be dropped before it.
30pub struct LocalExecutor {
31    // LINT.IfChange
32    /// The inner executor state.
33    pub(crate) ehandle: EHandle,
34    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
35}
36
37impl fmt::Debug for LocalExecutor {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
40    }
41}
42
43impl Default for LocalExecutor {
44    /// Create a new single-threaded executor running with actual time.
45    fn default() -> Self {
46        Self::new_with_port(zx::Port::create(), None)
47    }
48}
49
50impl LocalExecutor {
51    /// Create a new single-threaded executor running with actual time, with a port
52    /// and instrumentation.
53    pub(crate) fn new_with_port(
54        port: zx::Port,
55        instrument: Option<Arc<dyn TaskInstrument>>,
56    ) -> Self {
57        let inner = Arc::new(Executor::new_with_port(
58            ExecutorTime::RealTime,
59            /* is_local */ true,
60            /* num_threads */ 1,
61            port,
62            instrument,
63        ));
64        let root_scope = ScopeHandle::root(inner);
65        Executor::set_local(root_scope.clone());
66        Self { ehandle: EHandle { root_scope } }
67    }
68
69    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
70    pub fn port(&self) -> &zx::Port {
71        self.ehandle.port()
72    }
73
74    /// Run a single future to completion on a single thread, also polling other active tasks.
75    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
76    where
77        F: Future,
78    {
79        assert!(
80            self.ehandle.inner().is_real_time(),
81            "Error: called `run_singlethreaded` on an executor using fake time"
82        );
83
84        let Poll::Ready(result) = self.run(main_future, |executor, task| {
85            executor.ehandle.inner().worker_lifecycle::</*UNTIL_STALLED=*/false>(Some(task));
86            // SAFETY: This is the correct type for the main future.
87            unsafe { executor.poll_join_result(task) }
88        }) else {
89            unreachable!()
90        };
91        result
92    }
93
94    fn run<Fut: Future>(
95        &mut self, // &mut ensures exclusive access
96        main_future: Fut,
97        runner: impl FnOnce(&Self, &TaskHandle) -> Poll<Fut::Output>,
98    ) -> Poll<Fut::Output> {
99        /// # Safety
100        ///
101        /// See the comment below.
102        unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
103            unsafe { std::mem::transmute(obj) }
104        }
105
106        let scope = &self.ehandle.root_scope;
107        let task = scope.new_local_task(main_future);
108        // SAFETY: Erasing the lifetime is safe because we make sure to drop the main task within
109        // the required lifetime.
110        let task_handle = unsafe { remove_lifetime(task) };
111
112        scope.insert_task(task_handle.clone(), false);
113
114        struct DropMainTask<'a>(&'a EHandle, TaskHandle);
115        impl Drop for DropMainTask<'_> {
116            fn drop(&mut self) {
117                // SAFETY: drop_main_tasks requires that the executor isn't running
118                // i.e. worker_lifecycle isn't running, which will be the case when this runs.
119                unsafe { self.0.inner().drop_main_task(&self.0.root_scope, &self.1) };
120            }
121        }
122        let _drop_main_task = DropMainTask(&self.ehandle, task_handle.clone());
123
124        // Ensure that this object is available for zxdb to find in this stack frame (even if it is
125        // inlined). With certain optimization settings, `self` might get optimized such that there
126        // is insufficient metadata for the debugger to recover this symbol, which it needs in
127        // order to walk the async task tree.
128        std::hint::black_box(&self);
129
130        runner(self, &task_handle)
131    }
132
133    /// Polls the join result for the task.
134    ///
135    /// # Safety
136    ///
137    /// `R` must be the correct type.
138    unsafe fn poll_join_result<R>(&self, task: &TaskHandle) -> Poll<R> {
139        // SAFETY: See function comment.
140        unsafe {
141            self.ehandle
142                .global_scope()
143                .poll_join_result(task, &mut Context::from_waker(std::task::Waker::noop()))
144        }
145    }
146
147    #[doc(hidden)]
148    /// Returns the root scope of the executor.
149    pub fn root_scope(&self) -> &ScopeHandle {
150        self.ehandle.global_scope()
151    }
152}
153
154impl Drop for LocalExecutor {
155    fn drop(&mut self) {
156        self.ehandle.inner().mark_done();
157        self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
158    }
159}
160
161/// A builder for `LocalExecutor`.
162#[derive(Default)]
163pub struct LocalExecutorBuilder {
164    port: Option<zx::Port>,
165    instrument: Option<Arc<dyn TaskInstrument>>,
166}
167
168impl LocalExecutorBuilder {
169    /// Creates a new builder used for constructing a `LocalExecutor`.
170    pub fn new() -> Self {
171        Self::default()
172    }
173
174    /// Sets the port for the executor.
175    pub fn port(mut self, port: zx::Port) -> Self {
176        self.port = Some(port);
177        self
178    }
179
180    /// Sets the instrumentation hook.
181    pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
182        self.instrument = instrument;
183        self
184    }
185
186    /// Builds the `LocalExecutor`, consuming this `LocalExecutorBuilder`.
187    pub fn build(self) -> LocalExecutor {
188        match self.port {
189            Some(port) => LocalExecutor::new_with_port(port, self.instrument),
190            None => LocalExecutor::default(),
191        }
192    }
193}
194
195/// A single-threaded executor for testing. Exposes additional APIs for manipulating executor state
196/// and validating behavior of executed tasks.
197///
198/// TODO(https://fxbug.dev/375631801): This is lack of BootInstant support.
199pub struct TestExecutor {
200    /// LocalExecutor used under the hood, since most of the logic is shared.
201    local: LocalExecutor,
202}
203
204impl Default for TestExecutor {
205    fn default() -> Self {
206        Self::new()
207    }
208}
209
210impl TestExecutor {
211    /// Create a new executor for testing.
212    pub fn new() -> Self {
213        Self::builder().build()
214    }
215
216    /// Create a new single-threaded executor running with fake time.
217    pub fn new_with_fake_time() -> Self {
218        Self::builder().fake_time(true).build()
219    }
220
221    /// Creates a new builder for a `TestExecutor`.
222    pub fn builder() -> TestExecutorBuilder {
223        TestExecutorBuilder::new()
224    }
225
226    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
227    pub fn port(&self) -> &zx::Port {
228        self.local.port()
229    }
230
231    /// Return the current time according to the executor.
232    pub fn now(&self) -> MonotonicInstant {
233        self.local.ehandle.inner().now()
234    }
235
236    /// Return the current time on the boot timeline, according to the executor.
237    pub fn boot_now(&self) -> BootInstant {
238        self.local.ehandle.inner().boot_now()
239    }
240
241    /// Set the fake time to a given value.
242    ///
243    /// # Panics
244    ///
245    /// If the executor was not created with fake time.
246    pub fn set_fake_time(&self, t: MonotonicInstant) {
247        self.local.ehandle.inner().set_fake_time(t)
248    }
249
250    /// Set the offset between the reading of the monotonic and the boot
251    /// clocks.
252    ///
253    /// This is useful to test the situations in which the boot and monotonic
254    /// offsets diverge.  In realistic scenarios, the offset can only grow,
255    /// and testers should keep that in view when setting duration.
256    ///
257    /// # Panics
258    ///
259    /// If the executor was not created with fake time.
260    pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
261        self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
262    }
263
264    /// Get the global scope of the executor.
265    pub fn global_scope(&self) -> &ScopeHandle {
266        self.local.root_scope()
267    }
268
269    /// Run a single future to completion on a single thread, also polling other active tasks.
270    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
271    where
272        F: Future,
273    {
274        self.local.run_singlethreaded(main_future)
275    }
276
277    /// Poll the future. If it is not ready, dispatch available packets and possibly try
278    /// again. Timers will only fire if this executor uses fake time. Never blocks.
279    ///
280    /// This function is for testing. DO NOT use this function in tests or applications that
281    /// involve any interaction with other threads or processes, as those interactions
282    /// may become stalled waiting for signals from "the outside world" which is beyond
283    /// the knowledge of the executor.
284    ///
285    /// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
286    /// futures must first be pinned using the `pin!` macro.
287    pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
288    where
289        F: Future + Unpin,
290    {
291        let main_future = pin!(main_future);
292
293        // Set up an instance of UntilStalledData that works with `poll_until_stalled`.
294        struct Cleanup(Arc<Executor>);
295        impl Drop for Cleanup {
296            fn drop(&mut self) {
297                *self.0.owner_data.lock() = None;
298            }
299        }
300        let _cleanup = Cleanup(self.local.ehandle.inner().clone());
301        *self.local.ehandle.inner().owner_data.lock() =
302            Some(Box::new(UntilStalledData { watcher: None }));
303
304        self.local.run(main_future, |executor, task| {
305            loop {
306                executor.ehandle.inner().worker_lifecycle::</*UNTIL_STALLED=*/true>(Some(task));
307
308                // SAFETY: This is the correct type for the main future.
309                let result = unsafe { executor.poll_join_result(task) };
310
311                if result.is_ready() {
312                    break result;
313                }
314
315                // If a waker was set by `poll_until_stalled`, disarm, wake, and loop.
316                if let Some(watcher) = with_data(|data| data.watcher.take()) {
317                    watcher.waker.wake();
318                    // Relaxed ordering is fine here because this atomic is only ever access from
319                    // the main thread.
320                    watcher.done.store(true, Ordering::Relaxed);
321                } else {
322                    break Poll::Pending;
323                }
324            }
325        })
326    }
327
328    /// Wake all tasks waiting for expired timers, and return `true` if any task was woken.
329    ///
330    /// This is intended for use in test code in conjunction with fake time.
331    ///
332    /// The wake will have effect on both the monotonic and the boot timers.
333    pub fn wake_expired_timers(&mut self) -> bool {
334        self.local.ehandle.inner().monotonic_timers().wake_timers()
335            || self.local.ehandle.inner().boot_timers().wake_timers()
336    }
337
338    /// Wake up the next task waiting for a timer, if any, and return the time for which the
339    /// timer was scheduled.
340    ///
341    /// This is intended for use in test code in conjunction with `run_until_stalled`.
342    /// For example, here is how one could test that the Timer future fires after the given
343    /// timeout:
344    ///
345    ///     let deadline = zx::MonotonicDuration::from_seconds(5).after_now();
346    ///     let mut future = Timer::<Never>::new(deadline);
347    ///     assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
348    ///     assert_eq!(Some(deadline), exec.wake_next_timer());
349    ///     assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
350    pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
351        self.local.ehandle.inner().monotonic_timers().wake_next_timer()
352    }
353
354    /// Similar to [wake_next_timer], but operates on the timers on the boot
355    /// timeline.
356    pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
357        self.local.ehandle.inner().boot_timers().wake_next_timer()
358    }
359
360    /// Returns the deadline for the next timer due to expire.
361    pub fn next_timer() -> Option<MonotonicInstant> {
362        EHandle::local().inner().monotonic_timers().next_timer()
363    }
364
365    /// Returns the deadline for the next boot timeline timer due to expire.
366    pub fn next_boot_timer() -> Option<BootInstant> {
367        EHandle::local().inner().boot_timers().next_timer()
368    }
369
370    /// Advances fake time to the specified time.  This will only work if the executor is being run
371    /// via `TestExecutor::run_until_stalled` and can only be called by one task at a time.  This
372    /// will make sure that repeating timers fire as expected.
373    ///
374    /// # Panics
375    ///
376    /// Panics if the executor was not created with fake time, and for the same reasons
377    /// `poll_until_stalled` can below.
378    pub async fn advance_to(time: MonotonicInstant) {
379        let ehandle = EHandle::local();
380        loop {
381            let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
382            if let Some(next_timer) = Self::next_timer()
383                && next_timer <= time
384            {
385                ehandle.inner().set_fake_time(next_timer);
386                continue;
387            }
388            ehandle.inner().set_fake_time(time);
389            break;
390        }
391    }
392
393    /// Runs the future until it is ready or the executor is stalled. Returns the state of the
394    /// future.
395    ///
396    /// This will only work if the executor is being run via `TestExecutor::run_until_stalled` and
397    /// can only be called by one task at a time.
398    ///
399    /// This can be used in tests to assert that a future should be pending:
400    /// ```
401    /// assert!(
402    ///     TestExecutor::poll_until_stalled(my_fut).await.is_pending(),
403    ///     "my_fut should not be ready!"
404    /// );
405    /// ```
406    ///
407    /// If you just want to know when the executor is stalled, you can do:
408    /// ```
409    /// let _: Poll<()> = TestExecutor::poll_until_stalled(future::pending::<()>()).await;
410    /// ```
411    ///
412    /// # Panics
413    ///
414    /// Panics if another task is currently trying to use `poll_until_stalled`, or the executor is
415    /// not using `TestExecutor::run_until_stalled`.
416    pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
417        let watcher =
418            Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
419
420        assert!(
421            with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
422            "Error: Another task has called `poll_until_stalled`."
423        );
424
425        struct Watcher(Arc<StalledWatcher>);
426
427        // Make sure we clean up if we're dropped.
428        impl Drop for Watcher {
429            fn drop(&mut self) {
430                if !self.0.done.swap(true, Ordering::Relaxed) {
431                    with_data(|data| data.watcher = None);
432                }
433            }
434        }
435
436        let watcher = Watcher(watcher);
437
438        let poll_fn = poll_fn(|cx: &mut Context<'_>| {
439            if watcher.0.done.load(Ordering::Relaxed) {
440                Poll::Ready(())
441            } else {
442                watcher.0.waker.register(cx.waker());
443                Poll::Pending
444            }
445        });
446        match future::select(poll_fn, fut).await {
447            Either::Left(_) => Poll::Pending,
448            Either::Right((value, _)) => Poll::Ready(value),
449        }
450    }
451}
452
453/// A builder for `TestExecutor`.
454#[derive(Default)]
455pub struct TestExecutorBuilder {
456    port: Option<zx::Port>,
457    fake_time: bool,
458    instrument: Option<Arc<dyn TaskInstrument>>,
459}
460
461impl TestExecutorBuilder {
462    /// Creates a new builder used for constructing a `TestExecutor`.
463    pub fn new() -> Self {
464        Self::default()
465    }
466
467    /// Sets the port for the executor.
468    pub fn port(mut self, port: zx::Port) -> Self {
469        self.port = Some(port);
470        self
471    }
472
473    /// Sets whether the executor should use fake time.
474    pub fn fake_time(mut self, fake_time: bool) -> Self {
475        self.fake_time = fake_time;
476        self
477    }
478
479    /// Sets the task instrumentation.
480    pub fn instrument(mut self, instrument: Arc<dyn TaskInstrument>) -> Self {
481        self.instrument = Some(instrument);
482        self
483    }
484
485    /// Builds the `TestExecutor`, consuming this `TestExecutorBuilder`.
486    pub fn build(self) -> TestExecutor {
487        let time = if self.fake_time {
488            ExecutorTime::FakeTime {
489                mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
490                mono_to_boot_offset_ns: AtomicI64::new(0),
491            }
492        } else {
493            ExecutorTime::RealTime
494        };
495        let port = self.port.unwrap_or_else(zx::Port::create);
496        let inner = Arc::new(Executor::new_with_port(
497            time,
498            /* is_local */ true,
499            /* num_threads */ 1,
500            port,
501            self.instrument,
502        ));
503        let root_scope = ScopeHandle::root(inner);
504        Executor::set_local(root_scope.clone());
505        let local = LocalExecutor { ehandle: EHandle { root_scope } };
506        TestExecutor { local }
507    }
508}
509
510struct StalledWatcher {
511    waker: AtomicWaker,
512    done: AtomicBool,
513}
514
515struct UntilStalledData {
516    watcher: Option<Arc<StalledWatcher>>,
517}
518
519/// Calls `f` with `&mut UntilStalledData` that is stored in `owner_data`.
520///
521/// # Panics
522///
523/// Panics if `owner_data` isn't an instance of `UntilStalledData`.
524fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
525    const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
526                           with TestExecutor::run_until_stalled";
527    f(EHandle::local()
528        .inner()
529        .owner_data
530        .lock()
531        .as_mut()
532        .expect(MESSAGE)
533        .downcast_mut::<UntilStalledData>()
534        .expect(MESSAGE))
535}
536
537#[cfg(test)]
538mod tests {
539    use super::*;
540    use crate::handle::on_signals::OnSignalsFuture;
541    use crate::{Interval, Timer, WakeupTime};
542    use assert_matches::assert_matches;
543    use futures::StreamExt;
544    use std::cell::{Cell, RefCell};
545    use std::rc::Rc;
546    use std::task::Waker;
547
548    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
549        crate::EHandle::local().spawn_detached(future);
550    }
551
552    // Runs a future that suspends and returns after being resumed.
553    #[test]
554    fn stepwise_two_steps() {
555        let fut_step = Rc::new(Cell::new(0));
556        let fut_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
557        let fut_waker_clone = fut_waker.clone();
558        let fut_step_clone = fut_step.clone();
559        let fut_fn = move |cx: &mut Context<'_>| {
560            fut_waker_clone.borrow_mut().replace(cx.waker().clone());
561            match fut_step_clone.get() {
562                0 => {
563                    fut_step_clone.set(1);
564                    Poll::Pending
565                }
566                1 => {
567                    fut_step_clone.set(2);
568                    Poll::Ready(())
569                }
570                _ => panic!("future called after done"),
571            }
572        };
573        let fut = Box::new(future::poll_fn(fut_fn));
574        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
575        // Spawn the future rather than waking it the main task because run_until_stalled will wake
576        // the main future on every call, and we want to wake it ourselves using the waker.
577        executor.local.ehandle.spawn_local_detached(fut);
578        assert_eq!(fut_step.get(), 0);
579        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
580        assert_eq!(fut_step.get(), 1);
581
582        fut_waker.borrow_mut().take().unwrap().wake();
583        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
584        assert_eq!(fut_step.get(), 2);
585    }
586
587    #[test]
588    // Runs a future that waits on a timer.
589    fn stepwise_timer() {
590        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
591        executor.set_fake_time(MonotonicInstant::from_nanos(0));
592        let mut fut =
593            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));
594
595        let _ = executor.run_until_stalled(&mut fut);
596        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));
597
598        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
599        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
600        assert!(executor.run_until_stalled(&mut fut).is_ready());
601    }
602
603    // Runs a future that waits on an event.
604    #[test]
605    fn stepwise_event() {
606        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
607        let event = zx::Event::create();
608        let mut fut = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));
609
610        let _ = executor.run_until_stalled(&mut fut);
611
612        event.signal(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
613        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
614    }
615
616    // Using `run_until_stalled` does not modify the order of events
617    // compared to normal execution.
618    #[test]
619    fn run_until_stalled_preserves_order() {
620        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
621        let spawned_fut_completed = Arc::new(AtomicBool::new(false));
622        let spawned_fut_completed_writer = spawned_fut_completed.clone();
623        let spawned_fut = Box::pin(async move {
624            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
625            spawned_fut_completed_writer.store(true, Ordering::SeqCst);
626        });
627        let mut main_fut = pin!(async {
628            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
629        });
630        spawn(spawned_fut);
631        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
632        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
633        // The timer in `spawned_fut` should fire first, then the
634        // timer in `main_fut`.
635        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
636        assert!(spawned_fut_completed.load(Ordering::SeqCst));
637    }
638
639    #[test]
640    fn task_destruction() {
641        struct DropSpawner {
642            dropped: Arc<AtomicBool>,
643        }
644        impl Drop for DropSpawner {
645            fn drop(&mut self) {
646                self.dropped.store(true, Ordering::SeqCst);
647                let dropped_clone = self.dropped.clone();
648                spawn(async {
649                    // Hold on to a reference here to verify that it, too, is destroyed later
650                    let _dropped_clone = dropped_clone;
651                    panic!("task spawned in drop shouldn't be polled");
652                });
653            }
654        }
655        let mut dropped = Arc::new(AtomicBool::new(false));
656        let drop_spawner = DropSpawner { dropped: dropped.clone() };
657        let mut executor = TestExecutorBuilder::new().build();
658        let mut main_fut = pin!(async move {
659            spawn(async move {
660                // Take ownership of the drop spawner
661                let _drop_spawner = drop_spawner;
662                future::pending::<()>().await;
663            });
664        });
665        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
666        assert!(
667            !dropped.load(Ordering::SeqCst),
668            "executor dropped pending task before destruction"
669        );
670
671        // Should drop the pending task and it's owned drop spawner,
672        // as well as gracefully drop the future spawned from the drop spawner.
673        drop(executor);
674        let dropped = Arc::get_mut(&mut dropped)
675            .expect("someone else is unexpectedly still holding on to a reference");
676        assert!(
677            dropped.load(Ordering::SeqCst),
678            "executor did not drop pending task during destruction"
679        );
680    }
681
682    #[test]
683    fn time_now_real_time() {
684        let _executor = LocalExecutorBuilder::new().build();
685        let t1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
686        let t2 = MonotonicInstant::now().into_zx();
687        let t3 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
688        assert!(t1 <= t2);
689        assert!(t2 <= t3);
690    }
691
692    #[test]
693    fn time_now_fake_time() {
694        let executor = TestExecutorBuilder::new().fake_time(true).build();
695        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
696        executor.set_fake_time(t1);
697        assert_eq!(MonotonicInstant::now(), t1);
698
699        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
700        executor.set_fake_time(t2);
701        assert_eq!(MonotonicInstant::now(), t2);
702    }
703
704    #[test]
705    fn time_now_fake_time_boot() {
706        let executor = TestExecutorBuilder::new().fake_time(true).build();
707        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
708        executor.set_fake_time(t1);
709        assert_eq!(MonotonicInstant::now(), t1);
710        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
711
712        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
713        executor.set_fake_time(t2);
714        assert_eq!(MonotonicInstant::now(), t2);
715        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
716
717        const TEST_BOOT_OFFSET: i64 = 42;
718
719        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
720        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
721    }
722
723    #[test]
724    fn time_boot_now() {
725        let executor = TestExecutorBuilder::new().fake_time(true).build();
726        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
727        executor.set_fake_time(t1);
728        assert_eq!(MonotonicInstant::now(), t1);
729        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
730
731        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
732        executor.set_fake_time(t2);
733        assert_eq!(MonotonicInstant::now(), t2);
734        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
735
736        const TEST_BOOT_OFFSET: i64 = 42;
737
738        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
739        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
740    }
741
742    #[test]
743    fn time_after_overflow() {
744        let executor = TestExecutorBuilder::new().fake_time(true).build();
745
746        executor.set_fake_time(MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100));
747        assert_eq!(
748            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
749            MonotonicInstant::INFINITE
750        );
751
752        executor.set_fake_time(
753            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
754        );
755        assert_eq!(
756            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
757            MonotonicInstant::INFINITE_PAST
758        );
759    }
760
761    // This future wakes itself up a number of times during the same cycle
762    async fn multi_wake(n: usize) {
763        let mut done = false;
764        futures::future::poll_fn(|cx| {
765            if done {
766                return Poll::Ready(());
767            }
768            for _ in 1..n {
769                cx.waker().wake_by_ref()
770            }
771            done = true;
772            Poll::Pending
773        })
774        .await;
775    }
776
777    #[test]
778    fn test_boot_time_tracks_mono_time() {
779        const FAKE_TIME: i64 = 42;
780        let executor = TestExecutorBuilder::new().fake_time(true).build();
781        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
782        assert_eq!(
783            BootInstant::from_nanos(FAKE_TIME),
784            executor.boot_now(),
785            "boot time should have advanced"
786        );
787
788        // Now advance boot without mono.
789        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
790        assert_eq!(
791            BootInstant::from_nanos(2 * FAKE_TIME),
792            executor.boot_now(),
793            "boot time should have advanced again"
794        );
795    }
796
797    // Ensure that a large amount of wakeups does not exhaust kernel resources,
798    // such as the zx port queue limit.
799    #[test]
800    fn many_wakeups() {
801        let mut executor = LocalExecutorBuilder::new().build();
802        executor.run_singlethreaded(multi_wake(4096 * 2));
803    }
804
805    fn advance_to_with(timer_duration: impl WakeupTime) {
806        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
807        executor.set_fake_time(MonotonicInstant::from_nanos(0));
808
809        let mut fut = pin!(async {
810            let timer_fired = Arc::new(AtomicBool::new(false));
811            futures::join!(
812                async {
813                    // Oneshot timer.
814                    Timer::new(timer_duration).await;
815                    timer_fired.store(true, Ordering::SeqCst);
816                },
817                async {
818                    // Interval timer, fires periodically.
819                    let mut fired = 0;
820                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
821                    while interval.next().await.is_some() {
822                        fired += 1;
823                        if fired == 3 {
824                            break;
825                        }
826                    }
827                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
828                },
829                async {
830                    assert!(
831                        !timer_fired.load(Ordering::SeqCst),
832                        "the oneshot timer shouldn't be fired"
833                    );
834                    TestExecutor::advance_to(MonotonicInstant::after(
835                        zx::MonotonicDuration::from_millis(500),
836                    ))
837                    .await;
838                    // Timer still shouldn't be fired.
839                    assert!(
840                        !timer_fired.load(Ordering::SeqCst),
841                        "the oneshot timer shouldn't be fired"
842                    );
843                    TestExecutor::advance_to(MonotonicInstant::after(
844                        zx::MonotonicDuration::from_millis(500),
845                    ))
846                    .await;
847
848                    assert!(
849                        timer_fired.load(Ordering::SeqCst),
850                        "the oneshot timer should have fired"
851                    );
852
853                    // The interval timer should have fired once.  Make it fire twice more.
854                    TestExecutor::advance_to(MonotonicInstant::after(
855                        zx::MonotonicDuration::from_seconds(2),
856                    ))
857                    .await;
858                }
859            )
860        });
861        assert!(executor.run_until_stalled(&mut fut).is_ready());
862    }
863
864    #[test]
865    fn test_advance_to() {
866        advance_to_with(zx::MonotonicDuration::from_seconds(1));
867    }
868
869    #[test]
870    fn test_advance_to_boot() {
871        advance_to_with(zx::BootDuration::from_seconds(1));
872    }
873}