fuchsia_async/runtime/fuchsia/executor/
local.rs1use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use crate::runtime::instrument::TaskInstrument;
12use futures::future::{self, Either};
13use futures::task::AtomicWaker;
14use std::fmt;
15use std::future::{Future, poll_fn};
16use std::pin::pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
19use std::task::{Context, Poll};
20
21pub struct LocalExecutor {
31 pub(crate) ehandle: EHandle,
34 }
36
37impl fmt::Debug for LocalExecutor {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
40 }
41}
42
43impl Default for LocalExecutor {
44 fn default() -> Self {
46 Self::new_with_port(zx::Port::create(), None)
47 }
48}
49
50impl LocalExecutor {
51 pub(crate) fn new_with_port(
54 port: zx::Port,
55 instrument: Option<Arc<dyn TaskInstrument>>,
56 ) -> Self {
57 let inner = Arc::new(Executor::new_with_port(
58 ExecutorTime::RealTime,
59 true,
60 1,
61 port,
62 instrument,
63 ));
64 let root_scope = ScopeHandle::root(inner);
65 Executor::set_local(root_scope.clone());
66 Self { ehandle: EHandle { root_scope } }
67 }
68
69 pub fn port(&self) -> &zx::Port {
71 self.ehandle.port()
72 }
73
74 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
76 where
77 F: Future,
78 {
79 assert!(
80 self.ehandle.inner().is_real_time(),
81 "Error: called `run_singlethreaded` on an executor using fake time"
82 );
83
84 let Poll::Ready(result) = self.run(main_future, |executor, task| {
85 executor.ehandle.inner().worker_lifecycle::<false>(Some(task));
86 unsafe { executor.poll_join_result(task) }
88 }) else {
89 unreachable!()
90 };
91 result
92 }
93
94 fn run<Fut: Future>(
95 &mut self, main_future: Fut,
97 runner: impl FnOnce(&Self, &TaskHandle) -> Poll<Fut::Output>,
98 ) -> Poll<Fut::Output> {
99 unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
103 unsafe { std::mem::transmute(obj) }
104 }
105
106 let scope = &self.ehandle.root_scope;
107 let task = scope.new_local_task(main_future);
108 let task_handle = unsafe { remove_lifetime(task) };
111
112 scope.insert_task(task_handle.clone(), false);
113
114 struct DropMainTask<'a>(&'a EHandle, TaskHandle);
115 impl Drop for DropMainTask<'_> {
116 fn drop(&mut self) {
117 unsafe { self.0.inner().drop_main_task(&self.0.root_scope, &self.1) };
120 }
121 }
122 let _drop_main_task = DropMainTask(&self.ehandle, task_handle.clone());
123
124 std::hint::black_box(&self);
129
130 runner(self, &task_handle)
131 }
132
133 unsafe fn poll_join_result<R>(&self, task: &TaskHandle) -> Poll<R> {
139 unsafe {
141 self.ehandle
142 .global_scope()
143 .poll_join_result(task, &mut Context::from_waker(std::task::Waker::noop()))
144 }
145 }
146
147 #[doc(hidden)]
148 pub fn root_scope(&self) -> &ScopeHandle {
150 self.ehandle.global_scope()
151 }
152}
153
154impl Drop for LocalExecutor {
155 fn drop(&mut self) {
156 self.ehandle.inner().mark_done();
157 self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
158 }
159}
160
161#[derive(Default)]
163pub struct LocalExecutorBuilder {
164 port: Option<zx::Port>,
165 instrument: Option<Arc<dyn TaskInstrument>>,
166}
167
168impl LocalExecutorBuilder {
169 pub fn new() -> Self {
171 Self::default()
172 }
173
174 pub fn port(mut self, port: zx::Port) -> Self {
176 self.port = Some(port);
177 self
178 }
179
180 pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
182 self.instrument = instrument;
183 self
184 }
185
186 pub fn build(self) -> LocalExecutor {
188 match self.port {
189 Some(port) => LocalExecutor::new_with_port(port, self.instrument),
190 None => LocalExecutor::default(),
191 }
192 }
193}
194
195pub struct TestExecutor {
200 local: LocalExecutor,
202}
203
204impl Default for TestExecutor {
205 fn default() -> Self {
206 Self::new()
207 }
208}
209
210impl TestExecutor {
211 pub fn new() -> Self {
213 Self::builder().build()
214 }
215
216 pub fn new_with_fake_time() -> Self {
218 Self::builder().fake_time(true).build()
219 }
220
221 pub fn builder() -> TestExecutorBuilder {
223 TestExecutorBuilder::new()
224 }
225
226 pub fn port(&self) -> &zx::Port {
228 self.local.port()
229 }
230
231 pub fn now(&self) -> MonotonicInstant {
233 self.local.ehandle.inner().now()
234 }
235
236 pub fn boot_now(&self) -> BootInstant {
238 self.local.ehandle.inner().boot_now()
239 }
240
241 pub fn set_fake_time(&self, t: MonotonicInstant) {
247 self.local.ehandle.inner().set_fake_time(t)
248 }
249
250 pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
261 self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
262 }
263
264 pub fn global_scope(&self) -> &ScopeHandle {
266 self.local.root_scope()
267 }
268
269 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
271 where
272 F: Future,
273 {
274 self.local.run_singlethreaded(main_future)
275 }
276
277 pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
288 where
289 F: Future + Unpin,
290 {
291 let main_future = pin!(main_future);
292
293 struct Cleanup(Arc<Executor>);
295 impl Drop for Cleanup {
296 fn drop(&mut self) {
297 *self.0.owner_data.lock() = None;
298 }
299 }
300 let _cleanup = Cleanup(self.local.ehandle.inner().clone());
301 *self.local.ehandle.inner().owner_data.lock() =
302 Some(Box::new(UntilStalledData { watcher: None }));
303
304 self.local.run(main_future, |executor, task| {
305 loop {
306 executor.ehandle.inner().worker_lifecycle::<true>(Some(task));
307
308 let result = unsafe { executor.poll_join_result(task) };
310
311 if result.is_ready() {
312 break result;
313 }
314
315 if let Some(watcher) = with_data(|data| data.watcher.take()) {
317 watcher.waker.wake();
318 watcher.done.store(true, Ordering::Relaxed);
321 } else {
322 break Poll::Pending;
323 }
324 }
325 })
326 }
327
328 pub fn wake_expired_timers(&mut self) -> bool {
334 self.local.ehandle.inner().monotonic_timers().wake_timers()
335 || self.local.ehandle.inner().boot_timers().wake_timers()
336 }
337
338 pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
351 self.local.ehandle.inner().monotonic_timers().wake_next_timer()
352 }
353
354 pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
357 self.local.ehandle.inner().boot_timers().wake_next_timer()
358 }
359
360 pub fn next_timer() -> Option<MonotonicInstant> {
362 EHandle::local().inner().monotonic_timers().next_timer()
363 }
364
365 pub fn next_boot_timer() -> Option<BootInstant> {
367 EHandle::local().inner().boot_timers().next_timer()
368 }
369
370 pub async fn advance_to(time: MonotonicInstant) {
379 let ehandle = EHandle::local();
380 loop {
381 let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
382 if let Some(next_timer) = Self::next_timer()
383 && next_timer <= time
384 {
385 ehandle.inner().set_fake_time(next_timer);
386 continue;
387 }
388 ehandle.inner().set_fake_time(time);
389 break;
390 }
391 }
392
393 pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
417 let watcher =
418 Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
419
420 assert!(
421 with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
422 "Error: Another task has called `poll_until_stalled`."
423 );
424
425 struct Watcher(Arc<StalledWatcher>);
426
427 impl Drop for Watcher {
429 fn drop(&mut self) {
430 if !self.0.done.swap(true, Ordering::Relaxed) {
431 with_data(|data| data.watcher = None);
432 }
433 }
434 }
435
436 let watcher = Watcher(watcher);
437
438 let poll_fn = poll_fn(|cx: &mut Context<'_>| {
439 if watcher.0.done.load(Ordering::Relaxed) {
440 Poll::Ready(())
441 } else {
442 watcher.0.waker.register(cx.waker());
443 Poll::Pending
444 }
445 });
446 match future::select(poll_fn, fut).await {
447 Either::Left(_) => Poll::Pending,
448 Either::Right((value, _)) => Poll::Ready(value),
449 }
450 }
451}
452
453#[derive(Default)]
455pub struct TestExecutorBuilder {
456 port: Option<zx::Port>,
457 fake_time: bool,
458 instrument: Option<Arc<dyn TaskInstrument>>,
459}
460
461impl TestExecutorBuilder {
462 pub fn new() -> Self {
464 Self::default()
465 }
466
467 pub fn port(mut self, port: zx::Port) -> Self {
469 self.port = Some(port);
470 self
471 }
472
473 pub fn fake_time(mut self, fake_time: bool) -> Self {
475 self.fake_time = fake_time;
476 self
477 }
478
479 pub fn instrument(mut self, instrument: Arc<dyn TaskInstrument>) -> Self {
481 self.instrument = Some(instrument);
482 self
483 }
484
485 pub fn build(self) -> TestExecutor {
487 let time = if self.fake_time {
488 ExecutorTime::FakeTime {
489 mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
490 mono_to_boot_offset_ns: AtomicI64::new(0),
491 }
492 } else {
493 ExecutorTime::RealTime
494 };
495 let port = self.port.unwrap_or_else(zx::Port::create);
496 let inner = Arc::new(Executor::new_with_port(
497 time,
498 true,
499 1,
500 port,
501 self.instrument,
502 ));
503 let root_scope = ScopeHandle::root(inner);
504 Executor::set_local(root_scope.clone());
505 let local = LocalExecutor { ehandle: EHandle { root_scope } };
506 TestExecutor { local }
507 }
508}
509
510struct StalledWatcher {
511 waker: AtomicWaker,
512 done: AtomicBool,
513}
514
515struct UntilStalledData {
516 watcher: Option<Arc<StalledWatcher>>,
517}
518
519fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
525 const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
526 with TestExecutor::run_until_stalled";
527 f(EHandle::local()
528 .inner()
529 .owner_data
530 .lock()
531 .as_mut()
532 .expect(MESSAGE)
533 .downcast_mut::<UntilStalledData>()
534 .expect(MESSAGE))
535}
536
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle::on_signals::OnSignalsFuture;
    use crate::{Interval, Timer, WakeupTime};
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::task::Waker;

    /// Spawns `future` as a detached task on the current thread's executor.
    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
        crate::EHandle::local().spawn_detached(future);
    }

    /// A locally spawned future is only advanced by `run_until_stalled`, one step per wake-up.
    #[test]
    fn stepwise_two_steps() {
        let step = Rc::new(Cell::new(0));
        let saved_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
        let fut = {
            let step = step.clone();
            let saved_waker = saved_waker.clone();
            Box::new(future::poll_fn(move |cx: &mut Context<'_>| {
                saved_waker.borrow_mut().replace(cx.waker().clone());
                match step.get() {
                    0 => {
                        step.set(1);
                        Poll::Pending
                    }
                    1 => {
                        step.set(2);
                        Poll::Ready(())
                    }
                    _ => panic!("future called after done"),
                }
            }))
        };
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.local.ehandle.spawn_local_detached(fut);

        // Spawning alone must not poll the future.
        assert_eq!(step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 1);

        saved_waker.borrow_mut().take().unwrap().wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 2);
    }

    /// A timer only fires after fake time has been moved to its deadline.
    #[test]
    fn stepwise_timer() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));
        let deadline = MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000));
        let mut timer = pin!(Timer::new(deadline));

        let _ = executor.run_until_stalled(&mut timer);
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));

        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
        assert!(executor.run_until_stalled(&mut timer).is_ready());
    }

    /// A signal wait completes once the signal has been raised on the event.
    #[test]
    fn stepwise_event() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        let event = zx::Event::create();
        let mut on_signal = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));

        let _ = executor.run_until_stalled(&mut on_signal);

        event.signal(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
        assert_matches!(
            executor.run_until_stalled(&mut on_signal),
            Poll::Ready(Ok(zx::Signals::USER_0))
        );
    }

    /// When the main future completes, a spawned task with an earlier timer has completed too.
    #[test]
    fn run_until_stalled_preserves_order() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        let spawned_done = Arc::new(AtomicBool::new(false));
        let spawned_done_writer = spawned_done.clone();
        let spawned_fut = Box::pin(async move {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
            spawned_done_writer.store(true, Ordering::SeqCst);
        });
        let mut main_fut = pin!(async {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
        });
        spawn(spawned_fut);
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
        assert!(spawned_done.load(Ordering::SeqCst));
    }

    /// Pending tasks are dropped when the executor is dropped, and tasks spawned while that
    /// happens are never polled.
    #[test]
    fn task_destruction() {
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let dropped_clone = self.dropped.clone();
                spawn(async {
                    // Hold the clone inside the task so it is released with the task.
                    let _dropped_clone = dropped_clone;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }
        let mut dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: dropped.clone() };
        let mut executor = TestExecutorBuilder::new().build();
        let mut main_fut = pin!(async move {
            spawn(async move {
                // Keep the spawner alive for as long as this never-ending task exists.
                let _drop_spawner = drop_spawner;
                future::pending::<()>().await;
            });
        });
        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
        assert!(
            !dropped.load(Ordering::SeqCst),
            "executor dropped pending task before destruction"
        );

        drop(executor);
        // `get_mut` only succeeds when every other clone of the flag has been released.
        let dropped = Arc::get_mut(&mut dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert!(
            dropped.load(Ordering::SeqCst),
            "executor did not drop pending task during destruction"
        );
    }

    /// With a real-time executor, `MonotonicInstant::now` lies between two kernel readings.
    #[test]
    fn time_now_real_time() {
        let _executor = LocalExecutorBuilder::new().build();
        let before = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        let reading = MonotonicInstant::now().into_zx();
        let after = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        assert!(before <= reading);
        assert!(reading <= after);
    }

    /// `MonotonicInstant::now` follows the fake time set on the executor.
    #[test]
    fn time_now_fake_time() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(MonotonicInstant::now(), t1);

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(MonotonicInstant::now(), t2);
    }

    /// Boot time follows fake monotonic time, shifted by the configured offset.
    #[test]
    fn time_now_fake_time_boot() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(MonotonicInstant::now(), t1);
        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(MonotonicInstant::now(), t2);
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());

        const TEST_BOOT_OFFSET: i64 = 42;

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
    }

    /// Same checks as `time_now_fake_time_boot`.
    #[test]
    fn time_boot_now() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(MonotonicInstant::now(), t1);
        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(MonotonicInstant::now(), t2);
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());

        const TEST_BOOT_OFFSET: i64 = 42;

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
    }

    /// `MonotonicInstant::after` saturates at both ends of the time range.
    #[test]
    fn time_after_overflow() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();

        let near_max = MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100);
        executor.set_fake_time(near_max);
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
            MonotonicInstant::INFINITE
        );

        let near_min = MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100);
        executor.set_fake_time(near_min);
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
            MonotonicInstant::INFINITE_PAST
        );
    }

    /// On its first poll this future wakes itself once per element of `1..n` and returns
    /// `Pending`; it completes on the next poll.
    async fn multi_wake(n: usize) {
        let mut done = false;
        futures::future::poll_fn(|cx| {
            if done {
                return Poll::Ready(());
            }
            for _ in 1..n {
                cx.waker().wake_by_ref()
            }
            done = true;
            Poll::Pending
        })
        .await;
    }

    /// `boot_now` follows fake monotonic time and the boot-to-mono offset.
    #[test]
    fn test_boot_time_tracks_mono_time() {
        const FAKE_TIME: i64 = 42;
        let executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced"
        );

        // Shifting the offset by the same amount doubles the expected boot reading.
        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(2 * FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced again"
        );
    }

    /// A task that wakes itself thousands of times in one poll still runs to completion.
    #[test]
    fn many_wakeups() {
        let mut executor = LocalExecutorBuilder::new().build();
        executor.run_singlethreaded(multi_wake(4096 * 2));
    }

    /// Exercises `TestExecutor::advance_to` with a one-shot timer created from
    /// `timer_duration` running next to a one-second interval timer.
    fn advance_to_with(timer_duration: impl WakeupTime) {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut fut = pin!(async {
            let timer_fired = Arc::new(AtomicBool::new(false));
            futures::join!(
                async {
                    // One-shot timer under test.
                    Timer::new(timer_duration).await;
                    timer_fired.store(true, Ordering::SeqCst);
                },
                async {
                    let mut fired = 0;
                    // Interval timer that must tick several times as time is advanced.
                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
                    while interval.next().await.is_some() {
                        fired += 1;
                        if fired == 3 {
                            break;
                        }
                    }
                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
                },
                async {
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;
                    // Only half of the timer duration has elapsed so far.
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;

                    assert!(
                        timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer should have fired"
                    );

                    // Advance further so the interval timer can reach its third tick.
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_seconds(2),
                    ))
                    .await;
                }
            )
        });
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }

    /// `advance_to` with a monotonic one-shot timer.
    #[test]
    fn test_advance_to() {
        advance_to_with(zx::MonotonicDuration::from_seconds(1));
    }

    /// `advance_to` with a boot one-shot timer.
    #[test]
    fn test_advance_to_boot() {
        advance_to_with(zx::BootDuration::from_seconds(1));
    }
}