fuchsia_async/runtime/fuchsia/executor/
local.rs1use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, MAIN_TASK_ID, TaskHandle};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use crate::runtime::instrument::TaskInstrument;
12use futures::future::{self, Either};
13use futures::task::AtomicWaker;
14use std::fmt;
15use std::future::{Future, poll_fn};
16use std::pin::pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
19use std::task::{Context, Poll};
20
21pub struct LocalExecutor {
31 pub(crate) ehandle: EHandle,
34 }
36
37impl fmt::Debug for LocalExecutor {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
40 }
41}
42
43impl Default for LocalExecutor {
44 fn default() -> Self {
46 Self::new_with_port(zx::Port::create(), None)
47 }
48}
49
50impl LocalExecutor {
51 pub(crate) fn new_with_port(
54 port: zx::Port,
55 instrument: Option<Arc<dyn TaskInstrument>>,
56 ) -> Self {
57 let inner = Arc::new(Executor::new_with_port(
58 ExecutorTime::RealTime,
59 true,
60 1,
61 port,
62 instrument,
63 ));
64 let root_scope = ScopeHandle::root(inner);
65 Executor::set_local(root_scope.clone());
66 Self { ehandle: EHandle { root_scope } }
67 }
68
69 pub fn port(&self) -> &zx::Port {
71 self.ehandle.port()
72 }
73
74 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
76 where
77 F: Future,
78 {
79 assert!(
80 self.ehandle.inner().is_real_time(),
81 "Error: called `run_singlethreaded` on an executor using fake time"
82 );
83
84 let Poll::Ready(result) = self.run::<false, _>(main_future) else {
85 unreachable!()
86 };
87 result
88 }
89
90 fn run<const UNTIL_STALLED: bool, Fut: Future>(
91 &mut self,
92 main_future: Fut,
93 ) -> Poll<Fut::Output> {
94 unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
98 std::mem::transmute(obj)
99 }
100
101 let scope = &self.ehandle.root_scope;
102 let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);
103
104 unsafe {
107 scope.insert_task(remove_lifetime(task), false);
108 }
109
110 struct DropMainTask<'a>(&'a EHandle);
111 impl Drop for DropMainTask<'_> {
112 fn drop(&mut self) {
113 unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
116 }
117 }
118 let _drop_main_task = DropMainTask(&self.ehandle);
119
120 self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();
121
122 unsafe {
125 self.ehandle.global_scope().poll_join_result(
126 MAIN_TASK_ID,
127 &mut Context::from_waker(&futures::task::noop_waker()),
128 )
129 }
130 }
131
132 #[doc(hidden)]
133 pub fn root_scope(&self) -> &ScopeHandle {
135 self.ehandle.global_scope()
136 }
137}
138
139impl Drop for LocalExecutor {
140 fn drop(&mut self) {
141 self.ehandle.inner().mark_done();
142 self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
143 }
144}
145
146#[derive(Default)]
148pub struct LocalExecutorBuilder {
149 port: Option<zx::Port>,
150 instrument: Option<Arc<dyn TaskInstrument>>,
151}
152
153impl LocalExecutorBuilder {
154 pub fn new() -> Self {
156 Self::default()
157 }
158
159 pub fn port(mut self, port: zx::Port) -> Self {
161 self.port = Some(port);
162 self
163 }
164
165 pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
167 self.instrument = instrument;
168 self
169 }
170
171 pub fn build(self) -> LocalExecutor {
173 match self.port {
174 Some(port) => LocalExecutor::new_with_port(port, self.instrument),
175 None => LocalExecutor::default(),
176 }
177 }
178}
179
180pub struct TestExecutor {
185 local: LocalExecutor,
187}
188
189impl Default for TestExecutor {
190 fn default() -> Self {
191 Self::new()
192 }
193}
194
195impl TestExecutor {
196 pub fn new() -> Self {
198 Self::builder().build()
199 }
200
201 pub fn new_with_fake_time() -> Self {
203 Self::builder().fake_time(true).build()
204 }
205
206 pub fn builder() -> TestExecutorBuilder {
208 TestExecutorBuilder::new()
209 }
210
211 pub fn port(&self) -> &zx::Port {
213 self.local.port()
214 }
215
216 pub fn now(&self) -> MonotonicInstant {
218 self.local.ehandle.inner().now()
219 }
220
221 pub fn boot_now(&self) -> BootInstant {
223 self.local.ehandle.inner().boot_now()
224 }
225
226 pub fn set_fake_time(&self, t: MonotonicInstant) {
232 self.local.ehandle.inner().set_fake_time(t)
233 }
234
235 pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
246 self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
247 }
248
249 pub fn global_scope(&self) -> &ScopeHandle {
251 self.local.root_scope()
252 }
253
254 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
256 where
257 F: Future,
258 {
259 self.local.run_singlethreaded(main_future)
260 }
261
262 pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
273 where
274 F: Future + Unpin,
275 {
276 let mut main_future = pin!(main_future);
277
278 struct Cleanup(Arc<Executor>);
280 impl Drop for Cleanup {
281 fn drop(&mut self) {
282 *self.0.owner_data.lock() = None;
283 }
284 }
285 let _cleanup = Cleanup(self.local.ehandle.inner().clone());
286 *self.local.ehandle.inner().owner_data.lock() =
287 Some(Box::new(UntilStalledData { watcher: None }));
288
289 loop {
290 let result = self.local.run::<true, _>(main_future.as_mut());
291 if result.is_ready() {
292 return result;
293 }
294
295 if let Some(watcher) = with_data(|data| data.watcher.take()) {
297 watcher.waker.wake();
298 watcher.done.store(true, Ordering::Relaxed);
301 } else {
302 break;
303 }
304 }
305
306 Poll::Pending
307 }
308
309 pub fn wake_expired_timers(&mut self) -> bool {
315 self.local.ehandle.inner().monotonic_timers().wake_timers()
316 || self.local.ehandle.inner().boot_timers().wake_timers()
317 }
318
319 pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
332 self.local.ehandle.inner().monotonic_timers().wake_next_timer()
333 }
334
335 pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
338 self.local.ehandle.inner().boot_timers().wake_next_timer()
339 }
340
341 pub fn next_timer() -> Option<MonotonicInstant> {
343 EHandle::local().inner().monotonic_timers().next_timer()
344 }
345
346 pub fn next_boot_timer() -> Option<BootInstant> {
348 EHandle::local().inner().boot_timers().next_timer()
349 }
350
351 pub async fn advance_to(time: MonotonicInstant) {
360 let ehandle = EHandle::local();
361 loop {
362 let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
363 if let Some(next_timer) = Self::next_timer() {
364 if next_timer <= time {
365 ehandle.inner().set_fake_time(next_timer);
366 continue;
367 }
368 }
369 ehandle.inner().set_fake_time(time);
370 break;
371 }
372 }
373
374 pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
398 let watcher =
399 Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
400
401 assert!(
402 with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
403 "Error: Another task has called `poll_until_stalled`."
404 );
405
406 struct Watcher(Arc<StalledWatcher>);
407
408 impl Drop for Watcher {
410 fn drop(&mut self) {
411 if !self.0.done.swap(true, Ordering::Relaxed) {
412 with_data(|data| data.watcher = None);
413 }
414 }
415 }
416
417 let watcher = Watcher(watcher);
418
419 let poll_fn = poll_fn(|cx: &mut Context<'_>| {
420 if watcher.0.done.load(Ordering::Relaxed) {
421 Poll::Ready(())
422 } else {
423 watcher.0.waker.register(cx.waker());
424 Poll::Pending
425 }
426 });
427 match future::select(poll_fn, fut).await {
428 Either::Left(_) => Poll::Pending,
429 Either::Right((value, _)) => Poll::Ready(value),
430 }
431 }
432}
433
434#[derive(Default)]
436pub struct TestExecutorBuilder {
437 port: Option<zx::Port>,
438 fake_time: bool,
439 instrument: Option<Arc<dyn TaskInstrument>>,
440}
441
442impl TestExecutorBuilder {
443 pub fn new() -> Self {
445 Self::default()
446 }
447
448 pub fn port(mut self, port: zx::Port) -> Self {
450 self.port = Some(port);
451 self
452 }
453
454 pub fn fake_time(mut self, fake_time: bool) -> Self {
456 self.fake_time = fake_time;
457 self
458 }
459
460 pub fn instrument(mut self, instrument: Arc<dyn TaskInstrument>) -> Self {
462 self.instrument = Some(instrument);
463 self
464 }
465
466 pub fn build(self) -> TestExecutor {
468 let time = if self.fake_time {
469 ExecutorTime::FakeTime {
470 mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
471 mono_to_boot_offset_ns: AtomicI64::new(0),
472 }
473 } else {
474 ExecutorTime::RealTime
475 };
476 let port = self.port.unwrap_or_else(zx::Port::create);
477 let inner = Arc::new(Executor::new_with_port(
478 time,
479 true,
480 1,
481 port,
482 self.instrument,
483 ));
484 let root_scope = ScopeHandle::root(inner);
485 Executor::set_local(root_scope.clone());
486 let local = LocalExecutor { ehandle: EHandle { root_scope } };
487 TestExecutor { local }
488 }
489}
490
491struct StalledWatcher {
492 waker: AtomicWaker,
493 done: AtomicBool,
494}
495
496struct UntilStalledData {
497 watcher: Option<Arc<StalledWatcher>>,
498}
499
500fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
506 const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
507 with TestExecutor::run_until_stalled";
508 f(EHandle::local()
509 .inner()
510 .owner_data
511 .lock()
512 .as_mut()
513 .expect(MESSAGE)
514 .downcast_mut::<UntilStalledData>()
515 .expect(MESSAGE))
516}
517
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle::on_signals::OnSignalsFuture;
    use crate::{Interval, Timer, WakeupTime};
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::task::Waker;
    use zx::{self as zx, AsHandleRef};

    /// Spawns `future` as a detached task on the executor returned by `EHandle::local()`.
    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
        let ehandle = crate::EHandle::local();
        ehandle.spawn_detached(future);
    }

    /// A spawned future advances exactly one step per `run_until_stalled` call when it is woken
    /// in between.
    #[test]
    fn stepwise_two_steps() {
        let step = Rc::new(Cell::new(0));
        let last_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));

        let fut = Box::new(future::poll_fn({
            let step = step.clone();
            let last_waker = last_waker.clone();
            move |cx: &mut Context<'_>| {
                // Remember the waker so the test can wake the task manually later on.
                last_waker.borrow_mut().replace(cx.waker().clone());
                match step.get() {
                    0 => {
                        step.set(1);
                        Poll::Pending
                    }
                    1 => {
                        step.set(2);
                        Poll::Ready(())
                    }
                    _ => panic!("future called after done"),
                }
            }
        }));

        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.local.ehandle.spawn_local_detached(fut);

        // Spawning alone must not poll the future.
        assert_eq!(step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 1);

        let waker = last_waker.borrow_mut().take().unwrap();
        waker.wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 2);
    }

    /// A timer completes only after fake time has been moved to its deadline.
    #[test]
    fn stepwise_timer() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        let start = MonotonicInstant::from_nanos(0);
        let deadline = MonotonicInstant::from_nanos(1000);

        executor.set_fake_time(start);
        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));

        let _ = executor.run_until_stalled(&mut timer);
        assert_eq!(MonotonicInstant::now(), start);

        executor.set_fake_time(deadline);
        assert_eq!(MonotonicInstant::now(), deadline);
        assert!(executor.run_until_stalled(&mut timer).is_ready());
    }

    /// A signal wait completes once the event has been signalled.
    #[test]
    fn stepwise_event() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        let event = zx::Event::create();
        let mut signalled = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));

        let _ = executor.run_until_stalled(&mut signalled);

        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
        assert_matches!(
            executor.run_until_stalled(&mut signalled),
            Poll::Ready(Ok(zx::Signals::USER_0))
        );
    }

    /// When fake time jumps past both deadlines, the spawned task with the earlier deadline has
    /// completed by the time the main future is reported ready.
    #[test]
    fn run_until_stalled_preserves_order() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        let spawned_done = Arc::new(AtomicBool::new(false));

        let spawned_fut = Box::pin({
            let spawned_done = spawned_done.clone();
            async move {
                Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
                spawned_done.store(true, Ordering::SeqCst);
            }
        });
        let mut main_fut = pin!(async {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
        });

        spawn(spawned_fut);
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);

        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
        assert!(spawned_done.load(Ordering::SeqCst));
    }

    /// Pending tasks are dropped when the executor is dropped, and a task spawned from a
    /// destructor at that point is never polled.
    #[test]
    fn task_destruction() {
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let flag = self.dropped.clone();
                spawn(async {
                    // Keep a reference alive inside the task so the final `Arc::get_mut` check
                    // also proves that this task was destroyed.
                    let _flag = flag;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }

        let mut dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: dropped.clone() };
        let mut executor = TestExecutorBuilder::new().build();
        let mut main_fut = pin!(async move {
            spawn(async move {
                // Hold the spawner forever; it is only dropped together with the task.
                let _drop_spawner = drop_spawner;
                future::pending::<()>().await;
            });
        });

        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
        assert!(
            !dropped.load(Ordering::SeqCst),
            "executor dropped pending task before destruction"
        );

        drop(executor);

        let flag = Arc::get_mut(&mut dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert!(
            flag.load(Ordering::SeqCst),
            "executor did not drop pending task during destruction"
        );
    }

    /// With a real-time executor, `MonotonicInstant::now` lies between two kernel readings.
    #[test]
    fn time_now_real_time() {
        let _executor = LocalExecutorBuilder::new().build();
        let zero = zx::MonotonicDuration::from_seconds(0);

        let before = zx::MonotonicInstant::after(zero);
        let reported = MonotonicInstant::now().into_zx();
        let after = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));

        assert!(before <= reported);
        assert!(reported <= after);
    }

    /// `MonotonicInstant::now` reflects the fake time that was set.
    #[test]
    fn time_now_fake_time() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();

        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(first);
        assert_eq!(MonotonicInstant::now(), first);

        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(second);
        assert_eq!(MonotonicInstant::now(), second);
    }

    /// Boot time follows fake monotonic time and is shifted by the configured offset.
    #[test]
    fn time_now_fake_time_boot() {
        const TEST_BOOT_OFFSET: i64 = 42;

        let executor = TestExecutorBuilder::new().fake_time(true).build();

        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(first);
        assert_eq!(MonotonicInstant::now(), first);
        assert_eq!(BootInstant::now().into_nanos(), first.into_nanos());

        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(second);
        assert_eq!(MonotonicInstant::now(), second);
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos());

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos() + TEST_BOOT_OFFSET);
    }

    /// Same checks as `time_now_fake_time_boot`.
    #[test]
    fn time_boot_now() {
        const TEST_BOOT_OFFSET: i64 = 42;

        let executor = TestExecutorBuilder::new().fake_time(true).build();

        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(first);
        assert_eq!(MonotonicInstant::now(), first);
        assert_eq!(BootInstant::now().into_nanos(), first.into_nanos());

        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(second);
        assert_eq!(MonotonicInstant::now(), second);
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos());

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos() + TEST_BOOT_OFFSET);
    }

    /// `MonotonicInstant::after` saturates at the infinite bounds instead of overflowing.
    #[test]
    fn time_after_overflow() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();
        let margin = zx::MonotonicDuration::from_nanos(100);

        executor.set_fake_time(MonotonicInstant::INFINITE - margin);
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
            MonotonicInstant::INFINITE
        );

        executor.set_fake_time(
            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
        );
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
            MonotonicInstant::INFINITE_PAST
        );
    }

    /// Future that wakes itself once per element of `1..n` during its first poll and completes
    /// on the next poll.
    async fn multi_wake(n: usize) {
        let mut already_woken = false;
        futures::future::poll_fn(|cx| {
            if already_woken {
                return Poll::Ready(());
            }
            (1..n).for_each(|_| cx.waker().wake_by_ref());
            already_woken = true;
            Poll::Pending
        })
        .await;
    }

    /// Boot time follows fake monotonic time and the boot-to-mono offset.
    #[test]
    fn test_boot_time_tracks_mono_time() {
        const FAKE_TIME: i64 = 42;

        let executor = TestExecutorBuilder::new().fake_time(true).build();

        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced"
        );

        // Adding an offset of the same size doubles the boot reading.
        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(2 * FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced again"
        );
    }

    /// A task that wakes itself many times within one poll still runs to completion.
    #[test]
    fn many_wakeups() {
        let wake_count = 4096 * 2;
        let mut executor = LocalExecutorBuilder::new().build();
        executor.run_singlethreaded(multi_wake(wake_count));
    }

    /// Drives a one-shot timer, an interval, and `TestExecutor::advance_to` side by side and
    /// checks that the one-shot timer fires only once fake time has reached it.
    fn advance_to_with(timer_duration: impl WakeupTime) {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut fut = pin!(async {
            let timer_fired = Arc::new(AtomicBool::new(false));
            futures::join!(
                // One-shot timer under test.
                async {
                    Timer::new(timer_duration).await;
                    timer_fired.store(true, Ordering::SeqCst);
                },
                // Interval that has to tick three times while time is being advanced.
                async {
                    let mut ticks = 0;
                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
                    while interval.next().await.is_some() {
                        ticks += 1;
                        if ticks == 3 {
                            break;
                        }
                    }
                    assert_eq!(ticks, 3, "interval timer should have fired multiple times.");
                },
                // Driver: advance in two 500ms steps, then by another two seconds.
                async {
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;

                    assert!(
                        timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer should have fired"
                    );

                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_seconds(2),
                    ))
                    .await;
                }
            )
        });
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }

    /// `advance_to` with a monotonic-duration timer.
    #[test]
    fn test_advance_to() {
        advance_to_with(zx::MonotonicDuration::from_seconds(1));
    }

    /// `advance_to` with a boot-duration timer.
    #[test]
    fn test_advance_to_boot() {
        advance_to_with(zx::BootDuration::from_seconds(1));
    }
}