fuchsia_async/runtime/fuchsia/executor/
local.rs1use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use crate::runtime::instrument::TaskInstrument;
12use futures::future::{self, Either};
13use futures::task::AtomicWaker;
14use std::fmt;
15use std::future::{Future, poll_fn};
16use std::pin::pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
19use std::task::{Context, Poll};
20
21pub struct LocalExecutor {
31 pub(crate) ehandle: EHandle,
34 }
36
37impl fmt::Debug for LocalExecutor {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
40 }
41}
42
43impl Default for LocalExecutor {
44 fn default() -> Self {
46 Self::new_with_port(zx::Port::create(), None)
47 }
48}
49
50impl LocalExecutor {
51 pub(crate) fn new_with_port(
54 port: zx::Port,
55 instrument: Option<Arc<dyn TaskInstrument>>,
56 ) -> Self {
57 let inner = Arc::new(Executor::new_with_port(
58 ExecutorTime::RealTime,
59 true,
60 1,
61 port,
62 instrument,
63 ));
64 let root_scope = ScopeHandle::root(inner);
65 Executor::set_local(root_scope.clone());
66 Self { ehandle: EHandle { root_scope } }
67 }
68
69 pub fn port(&self) -> &zx::Port {
71 self.ehandle.port()
72 }
73
74 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
76 where
77 F: Future,
78 {
79 assert!(
80 self.ehandle.inner().is_real_time(),
81 "Error: called `run_singlethreaded` on an executor using fake time"
82 );
83
84 let Poll::Ready(result) = self.run::<false, _>(main_future) else {
85 unreachable!()
86 };
87 result
88 }
89
90 fn run<const UNTIL_STALLED: bool, Fut: Future>(
91 &mut self,
92 main_future: Fut,
93 ) -> Poll<Fut::Output> {
94 unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
98 unsafe { std::mem::transmute(obj) }
99 }
100
101 let scope = &self.ehandle.root_scope;
102 let task = scope.new_local_task(main_future);
103 let task_handle = unsafe { remove_lifetime(task) };
106
107 scope.insert_task(task_handle.clone(), false);
108
109 struct DropMainTask<'a>(&'a EHandle, TaskHandle);
110 impl Drop for DropMainTask<'_> {
111 fn drop(&mut self) {
112 unsafe { self.0.inner().drop_main_task(&self.0.root_scope, &self.1) };
115 }
116 }
117 let _drop_main_task = DropMainTask(&self.ehandle, task_handle.clone());
118
119 std::hint::black_box(&self);
124
125 self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>(Some(&task_handle));
126
127 unsafe {
130 self.ehandle
131 .global_scope()
132 .poll_join_result(&task_handle, &mut Context::from_waker(std::task::Waker::noop()))
133 }
134 }
135
136 #[doc(hidden)]
137 pub fn root_scope(&self) -> &ScopeHandle {
139 self.ehandle.global_scope()
140 }
141}
142
143impl Drop for LocalExecutor {
144 fn drop(&mut self) {
145 self.ehandle.inner().mark_done();
146 self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
147 }
148}
149
150#[derive(Default)]
152pub struct LocalExecutorBuilder {
153 port: Option<zx::Port>,
154 instrument: Option<Arc<dyn TaskInstrument>>,
155}
156
157impl LocalExecutorBuilder {
158 pub fn new() -> Self {
160 Self::default()
161 }
162
163 pub fn port(mut self, port: zx::Port) -> Self {
165 self.port = Some(port);
166 self
167 }
168
169 pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
171 self.instrument = instrument;
172 self
173 }
174
175 pub fn build(self) -> LocalExecutor {
177 match self.port {
178 Some(port) => LocalExecutor::new_with_port(port, self.instrument),
179 None => LocalExecutor::default(),
180 }
181 }
182}
183
184pub struct TestExecutor {
189 local: LocalExecutor,
191}
192
193impl Default for TestExecutor {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
199impl TestExecutor {
200 pub fn new() -> Self {
202 Self::builder().build()
203 }
204
205 pub fn new_with_fake_time() -> Self {
207 Self::builder().fake_time(true).build()
208 }
209
210 pub fn builder() -> TestExecutorBuilder {
212 TestExecutorBuilder::new()
213 }
214
215 pub fn port(&self) -> &zx::Port {
217 self.local.port()
218 }
219
220 pub fn now(&self) -> MonotonicInstant {
222 self.local.ehandle.inner().now()
223 }
224
225 pub fn boot_now(&self) -> BootInstant {
227 self.local.ehandle.inner().boot_now()
228 }
229
230 pub fn set_fake_time(&self, t: MonotonicInstant) {
236 self.local.ehandle.inner().set_fake_time(t)
237 }
238
239 pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
250 self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
251 }
252
253 pub fn global_scope(&self) -> &ScopeHandle {
255 self.local.root_scope()
256 }
257
258 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
260 where
261 F: Future,
262 {
263 self.local.run_singlethreaded(main_future)
264 }
265
266 pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
277 where
278 F: Future + Unpin,
279 {
280 let mut main_future = pin!(main_future);
281
282 struct Cleanup(Arc<Executor>);
284 impl Drop for Cleanup {
285 fn drop(&mut self) {
286 *self.0.owner_data.lock() = None;
287 }
288 }
289 let _cleanup = Cleanup(self.local.ehandle.inner().clone());
290 *self.local.ehandle.inner().owner_data.lock() =
291 Some(Box::new(UntilStalledData { watcher: None }));
292
293 loop {
294 let result = self.local.run::<true, _>(main_future.as_mut());
295 if result.is_ready() {
296 return result;
297 }
298
299 if let Some(watcher) = with_data(|data| data.watcher.take()) {
301 watcher.waker.wake();
302 watcher.done.store(true, Ordering::Relaxed);
305 } else {
306 break;
307 }
308 }
309
310 Poll::Pending
311 }
312
313 pub fn wake_expired_timers(&mut self) -> bool {
319 self.local.ehandle.inner().monotonic_timers().wake_timers()
320 || self.local.ehandle.inner().boot_timers().wake_timers()
321 }
322
323 pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
336 self.local.ehandle.inner().monotonic_timers().wake_next_timer()
337 }
338
339 pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
342 self.local.ehandle.inner().boot_timers().wake_next_timer()
343 }
344
345 pub fn next_timer() -> Option<MonotonicInstant> {
347 EHandle::local().inner().monotonic_timers().next_timer()
348 }
349
350 pub fn next_boot_timer() -> Option<BootInstant> {
352 EHandle::local().inner().boot_timers().next_timer()
353 }
354
355 pub async fn advance_to(time: MonotonicInstant) {
364 let ehandle = EHandle::local();
365 loop {
366 let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
367 if let Some(next_timer) = Self::next_timer()
368 && next_timer <= time
369 {
370 ehandle.inner().set_fake_time(next_timer);
371 continue;
372 }
373 ehandle.inner().set_fake_time(time);
374 break;
375 }
376 }
377
378 pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
402 let watcher =
403 Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
404
405 assert!(
406 with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
407 "Error: Another task has called `poll_until_stalled`."
408 );
409
410 struct Watcher(Arc<StalledWatcher>);
411
412 impl Drop for Watcher {
414 fn drop(&mut self) {
415 if !self.0.done.swap(true, Ordering::Relaxed) {
416 with_data(|data| data.watcher = None);
417 }
418 }
419 }
420
421 let watcher = Watcher(watcher);
422
423 let poll_fn = poll_fn(|cx: &mut Context<'_>| {
424 if watcher.0.done.load(Ordering::Relaxed) {
425 Poll::Ready(())
426 } else {
427 watcher.0.waker.register(cx.waker());
428 Poll::Pending
429 }
430 });
431 match future::select(poll_fn, fut).await {
432 Either::Left(_) => Poll::Pending,
433 Either::Right((value, _)) => Poll::Ready(value),
434 }
435 }
436}
437
438#[derive(Default)]
440pub struct TestExecutorBuilder {
441 port: Option<zx::Port>,
442 fake_time: bool,
443 instrument: Option<Arc<dyn TaskInstrument>>,
444}
445
446impl TestExecutorBuilder {
447 pub fn new() -> Self {
449 Self::default()
450 }
451
452 pub fn port(mut self, port: zx::Port) -> Self {
454 self.port = Some(port);
455 self
456 }
457
458 pub fn fake_time(mut self, fake_time: bool) -> Self {
460 self.fake_time = fake_time;
461 self
462 }
463
464 pub fn instrument(mut self, instrument: Arc<dyn TaskInstrument>) -> Self {
466 self.instrument = Some(instrument);
467 self
468 }
469
470 pub fn build(self) -> TestExecutor {
472 let time = if self.fake_time {
473 ExecutorTime::FakeTime {
474 mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
475 mono_to_boot_offset_ns: AtomicI64::new(0),
476 }
477 } else {
478 ExecutorTime::RealTime
479 };
480 let port = self.port.unwrap_or_else(zx::Port::create);
481 let inner = Arc::new(Executor::new_with_port(
482 time,
483 true,
484 1,
485 port,
486 self.instrument,
487 ));
488 let root_scope = ScopeHandle::root(inner);
489 Executor::set_local(root_scope.clone());
490 let local = LocalExecutor { ehandle: EHandle { root_scope } };
491 TestExecutor { local }
492 }
493}
494
495struct StalledWatcher {
496 waker: AtomicWaker,
497 done: AtomicBool,
498}
499
500struct UntilStalledData {
501 watcher: Option<Arc<StalledWatcher>>,
502}
503
504fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
510 const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
511 with TestExecutor::run_until_stalled";
512 f(EHandle::local()
513 .inner()
514 .owner_data
515 .lock()
516 .as_mut()
517 .expect(MESSAGE)
518 .downcast_mut::<UntilStalledData>()
519 .expect(MESSAGE))
520}
521
522#[cfg(test)]
523mod tests {
524 use super::*;
525 use crate::handle::on_signals::OnSignalsFuture;
526 use crate::{Interval, Timer, WakeupTime};
527 use assert_matches::assert_matches;
528 use futures::StreamExt;
529 use std::cell::{Cell, RefCell};
530 use std::rc::Rc;
531 use std::task::Waker;
532
533 fn spawn(future: impl Future<Output = ()> + Send + 'static) {
534 crate::EHandle::local().spawn_detached(future);
535 }
536
537 #[test]
539 fn stepwise_two_steps() {
540 let fut_step = Rc::new(Cell::new(0));
541 let fut_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
542 let fut_waker_clone = fut_waker.clone();
543 let fut_step_clone = fut_step.clone();
544 let fut_fn = move |cx: &mut Context<'_>| {
545 fut_waker_clone.borrow_mut().replace(cx.waker().clone());
546 match fut_step_clone.get() {
547 0 => {
548 fut_step_clone.set(1);
549 Poll::Pending
550 }
551 1 => {
552 fut_step_clone.set(2);
553 Poll::Ready(())
554 }
555 _ => panic!("future called after done"),
556 }
557 };
558 let fut = Box::new(future::poll_fn(fut_fn));
559 let mut executor = TestExecutorBuilder::new().fake_time(true).build();
560 executor.local.ehandle.spawn_local_detached(fut);
563 assert_eq!(fut_step.get(), 0);
564 assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
565 assert_eq!(fut_step.get(), 1);
566
567 fut_waker.borrow_mut().take().unwrap().wake();
568 assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
569 assert_eq!(fut_step.get(), 2);
570 }
571
572 #[test]
573 fn stepwise_timer() {
575 let mut executor = TestExecutorBuilder::new().fake_time(true).build();
576 executor.set_fake_time(MonotonicInstant::from_nanos(0));
577 let mut fut =
578 pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));
579
580 let _ = executor.run_until_stalled(&mut fut);
581 assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));
582
583 executor.set_fake_time(MonotonicInstant::from_nanos(1000));
584 assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
585 assert!(executor.run_until_stalled(&mut fut).is_ready());
586 }
587
588 #[test]
590 fn stepwise_event() {
591 let mut executor = TestExecutorBuilder::new().fake_time(true).build();
592 let event = zx::Event::create();
593 let mut fut = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));
594
595 let _ = executor.run_until_stalled(&mut fut);
596
597 event.signal(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
598 assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
599 }
600
601 #[test]
604 fn run_until_stalled_preserves_order() {
605 let mut executor = TestExecutorBuilder::new().fake_time(true).build();
606 let spawned_fut_completed = Arc::new(AtomicBool::new(false));
607 let spawned_fut_completed_writer = spawned_fut_completed.clone();
608 let spawned_fut = Box::pin(async move {
609 Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
610 spawned_fut_completed_writer.store(true, Ordering::SeqCst);
611 });
612 let mut main_fut = pin!(async {
613 Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
614 });
615 spawn(spawned_fut);
616 assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
617 executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
618 assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
621 assert!(spawned_fut_completed.load(Ordering::SeqCst));
622 }
623
624 #[test]
625 fn task_destruction() {
626 struct DropSpawner {
627 dropped: Arc<AtomicBool>,
628 }
629 impl Drop for DropSpawner {
630 fn drop(&mut self) {
631 self.dropped.store(true, Ordering::SeqCst);
632 let dropped_clone = self.dropped.clone();
633 spawn(async {
634 let _dropped_clone = dropped_clone;
636 panic!("task spawned in drop shouldn't be polled");
637 });
638 }
639 }
640 let mut dropped = Arc::new(AtomicBool::new(false));
641 let drop_spawner = DropSpawner { dropped: dropped.clone() };
642 let mut executor = TestExecutorBuilder::new().build();
643 let mut main_fut = pin!(async move {
644 spawn(async move {
645 let _drop_spawner = drop_spawner;
647 future::pending::<()>().await;
648 });
649 });
650 assert!(executor.run_until_stalled(&mut main_fut).is_ready());
651 assert!(
652 !dropped.load(Ordering::SeqCst),
653 "executor dropped pending task before destruction"
654 );
655
656 drop(executor);
659 let dropped = Arc::get_mut(&mut dropped)
660 .expect("someone else is unexpectedly still holding on to a reference");
661 assert!(
662 dropped.load(Ordering::SeqCst),
663 "executor did not drop pending task during destruction"
664 );
665 }
666
667 #[test]
668 fn time_now_real_time() {
669 let _executor = LocalExecutorBuilder::new().build();
670 let t1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
671 let t2 = MonotonicInstant::now().into_zx();
672 let t3 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
673 assert!(t1 <= t2);
674 assert!(t2 <= t3);
675 }
676
677 #[test]
678 fn time_now_fake_time() {
679 let executor = TestExecutorBuilder::new().fake_time(true).build();
680 let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
681 executor.set_fake_time(t1);
682 assert_eq!(MonotonicInstant::now(), t1);
683
684 let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
685 executor.set_fake_time(t2);
686 assert_eq!(MonotonicInstant::now(), t2);
687 }
688
689 #[test]
690 fn time_now_fake_time_boot() {
691 let executor = TestExecutorBuilder::new().fake_time(true).build();
692 let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
693 executor.set_fake_time(t1);
694 assert_eq!(MonotonicInstant::now(), t1);
695 assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
696
697 let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
698 executor.set_fake_time(t2);
699 assert_eq!(MonotonicInstant::now(), t2);
700 assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
701
702 const TEST_BOOT_OFFSET: i64 = 42;
703
704 executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
705 assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
706 }
707
708 #[test]
709 fn time_boot_now() {
710 let executor = TestExecutorBuilder::new().fake_time(true).build();
711 let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
712 executor.set_fake_time(t1);
713 assert_eq!(MonotonicInstant::now(), t1);
714 assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
715
716 let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
717 executor.set_fake_time(t2);
718 assert_eq!(MonotonicInstant::now(), t2);
719 assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
720
721 const TEST_BOOT_OFFSET: i64 = 42;
722
723 executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
724 assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
725 }
726
727 #[test]
728 fn time_after_overflow() {
729 let executor = TestExecutorBuilder::new().fake_time(true).build();
730
731 executor.set_fake_time(MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100));
732 assert_eq!(
733 MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
734 MonotonicInstant::INFINITE
735 );
736
737 executor.set_fake_time(
738 MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
739 );
740 assert_eq!(
741 MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
742 MonotonicInstant::INFINITE_PAST
743 );
744 }
745
746 async fn multi_wake(n: usize) {
748 let mut done = false;
749 futures::future::poll_fn(|cx| {
750 if done {
751 return Poll::Ready(());
752 }
753 for _ in 1..n {
754 cx.waker().wake_by_ref()
755 }
756 done = true;
757 Poll::Pending
758 })
759 .await;
760 }
761
762 #[test]
763 fn test_boot_time_tracks_mono_time() {
764 const FAKE_TIME: i64 = 42;
765 let executor = TestExecutorBuilder::new().fake_time(true).build();
766 executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
767 assert_eq!(
768 BootInstant::from_nanos(FAKE_TIME),
769 executor.boot_now(),
770 "boot time should have advanced"
771 );
772
773 executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
775 assert_eq!(
776 BootInstant::from_nanos(2 * FAKE_TIME),
777 executor.boot_now(),
778 "boot time should have advanced again"
779 );
780 }
781
782 #[test]
785 fn many_wakeups() {
786 let mut executor = LocalExecutorBuilder::new().build();
787 executor.run_singlethreaded(multi_wake(4096 * 2));
788 }
789
790 fn advance_to_with(timer_duration: impl WakeupTime) {
791 let mut executor = TestExecutorBuilder::new().fake_time(true).build();
792 executor.set_fake_time(MonotonicInstant::from_nanos(0));
793
794 let mut fut = pin!(async {
795 let timer_fired = Arc::new(AtomicBool::new(false));
796 futures::join!(
797 async {
798 Timer::new(timer_duration).await;
800 timer_fired.store(true, Ordering::SeqCst);
801 },
802 async {
803 let mut fired = 0;
805 let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
806 while interval.next().await.is_some() {
807 fired += 1;
808 if fired == 3 {
809 break;
810 }
811 }
812 assert_eq!(fired, 3, "interval timer should have fired multiple times.");
813 },
814 async {
815 assert!(
816 !timer_fired.load(Ordering::SeqCst),
817 "the oneshot timer shouldn't be fired"
818 );
819 TestExecutor::advance_to(MonotonicInstant::after(
820 zx::MonotonicDuration::from_millis(500),
821 ))
822 .await;
823 assert!(
825 !timer_fired.load(Ordering::SeqCst),
826 "the oneshot timer shouldn't be fired"
827 );
828 TestExecutor::advance_to(MonotonicInstant::after(
829 zx::MonotonicDuration::from_millis(500),
830 ))
831 .await;
832
833 assert!(
834 timer_fired.load(Ordering::SeqCst),
835 "the oneshot timer should have fired"
836 );
837
838 TestExecutor::advance_to(MonotonicInstant::after(
840 zx::MonotonicDuration::from_seconds(2),
841 ))
842 .await;
843 }
844 )
845 });
846 assert!(executor.run_until_stalled(&mut fut).is_ready());
847 }
848
849 #[test]
850 fn test_advance_to() {
851 advance_to_with(zx::MonotonicDuration::from_seconds(1));
852 }
853
854 #[test]
855 fn test_advance_to_boot() {
856 advance_to_with(zx::BootDuration::from_seconds(1));
857 }
858}