1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::super::timer::TimerHeap;
use super::{
    common::{with_local_timer_heap, EHandle, ExecutorTime, Inner, MAIN_TASK_ID},
    time::Time,
};
use crate::atomic_future::AtomicFuture;
use futures::{
    future::{self, Either},
    task::AtomicWaker,
};
use std::{
    fmt,
    future::{poll_fn, Future},
    pin::pin,
    sync::{
        atomic::{AtomicBool, AtomicI64, Ordering},
        Arc,
    },
    task::{Context, Poll},
};

/// A single-threaded port-based executor for Fuchsia OS.
///
/// Having a `LocalExecutor` in scope allows the creation and polling of zircon objects, such as
/// [`fuchsia_async::Channel`].
///
/// Futures are driven with [`LocalExecutor::run_singlethreaded`], which requires the executor to
/// be using real time.
///
/// # Panics
///
/// `LocalExecutor` will panic on drop if any zircon objects attached to it are still alive. In
/// other words, zircon objects backed by a `LocalExecutor` must be dropped before it.
pub struct LocalExecutor {
    /// Handle to the shared executor state (`Inner`). The run and teardown logic of this type
    /// goes through `ehandle.inner`.
    pub(crate) ehandle: EHandle,
}

impl fmt::Debug for LocalExecutor {
    /// Renders the executor as a `LocalExecutor` struct whose only visible field is `port`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("LocalExecutor");
        out.field("port", &self.ehandle.inner.port);
        out.finish()
    }
}

impl LocalExecutor {
    /// Create a new single-threaded executor running with actual time.
    pub fn new() -> Self {
        let inner = Arc::new(Inner::new(
            ExecutorTime::RealTime,
            /* is_local */ true,
            /* num_threads */ 1,
        ));
        inner.clone().set_local(TimerHeap::default());
        Self { ehandle: EHandle { inner } }
    }

    /// Drives `main_future` to completion on the current thread, also polling other active
    /// tasks, and returns the future's output.
    ///
    /// # Panics
    ///
    /// Panics if this executor is using fake time.
    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
    where
        F: Future,
    {
        assert!(
            self.ehandle.inner.is_real_time(),
            "Error: called `run_singlethreaded` on an executor using fake time"
        );

        // SAFETY: This is a singlethreaded executor, so the future will never be sent across
        // threads.
        let main_task = unsafe { AtomicFuture::new_local(main_future, true) };

        match self.run::</* UNTIL_STALLED: */ false, F::Output>(main_task) {
            Poll::Ready(output) => output,
            // NOTE(review): presumably a run that is not "until stalled" only returns once the
            // main task has finished, which is why `Pending` is treated as impossible here.
            Poll::Pending => unreachable!(),
        }
    }

    /// Installs `main_future` as the executor's main task, runs the worker loop, and then polls
    /// the main task's join result once.
    ///
    /// `UNTIL_STALLED` is forwarded unchanged to `worker_lifecycle`. `R` must be the output type
    /// of the future wrapped by `main_future`; the callers in this file pass `F::Output`.
    ///
    /// The main task is dropped again when this function exits (including on unwind), via the
    /// `DropMainTask` guard below. That is what makes erasing the future's lifetime acceptable.
    fn run<const UNTIL_STALLED: bool, R>(&mut self, main_future: AtomicFuture<'_>) -> Poll<R> {
        /// # Safety
        ///
        /// See the comment below.
        unsafe fn remove_lifetime(obj: AtomicFuture<'_>) -> AtomicFuture<'static> {
            std::mem::transmute(obj)
        }

        // SAFETY: Erasing the lifetime is safe because we make sure to drop the main task within
        // the required lifetime.
        self.ehandle.inner.spawn_main(unsafe { remove_lifetime(main_future) });

        // Drop guard: created right after the main task is spawned so that the task is dropped
        // on every exit path of this function.
        struct DropMainTask<'a>(&'a Inner);
        impl Drop for DropMainTask<'_> {
            fn drop(&mut self) {
                // SAFETY: drop_main_task requires that the executor isn't running
                // i.e. worker_lifecycle isn't running, which will be the case when this runs.
                unsafe { self.0.drop_main_task() };
            }
        }
        let _drop_main_task = DropMainTask(&self.ehandle.inner);

        self.ehandle.inner.worker_lifecycle::<UNTIL_STALLED>();

        // The join result is polled with a no-op waker: the caller only wants to know whether the
        // main task has finished by now.
        //
        // SAFETY: We spawned the task earlier, so `R` (the return type) will be the correct type
        // here.
        unsafe {
            self.ehandle.inner.poll_join_result(
                MAIN_TASK_ID,
                &mut Context::from_waker(&futures::task::noop_waker()),
            )
        }
    }

    /// Returns a snapshot taken from the executor's instrumentation collector. Only compiled in
    /// test builds.
    #[cfg(test)]
    pub(crate) fn snapshot(&self) -> super::instrumentation::Snapshot {
        self.ehandle.inner.collector.snapshot()
    }
}

impl Drop for LocalExecutor {
    /// Tears down the executor state: first marks it as done, then notifies it that the owning
    /// executor object is being dropped.
    fn drop(&mut self) {
        // NOTE(review): `mark_done` runs before `on_parent_drop`; presumably the latter relies on
        // the done flag already being set — confirm against `Inner` before reordering.
        self.ehandle.inner.mark_done();
        self.ehandle.inner.on_parent_drop();
    }
}

/// A single-threaded executor for testing. Exposes additional APIs for manipulating executor state
/// and validating behavior of executed tasks.
///
/// Depending on the constructor used, the executor runs with real time ([`TestExecutor::new`]) or
/// with fake time ([`TestExecutor::new_with_fake_time`]).
pub struct TestExecutor {
    /// LocalExecutor used under the hood, since most of the logic is shared.
    local: LocalExecutor,
}

impl TestExecutor {
    /// Creates a test executor backed by a freshly constructed [`LocalExecutor`].
    pub fn new() -> Self {
        let local = LocalExecutor::new();
        Self { local }
    }

    /// Create a new single-threaded executor running with fake time.
    pub fn new_with_fake_time() -> Self {
        let inner = Arc::new(Inner::new(
            ExecutorTime::FakeTime(AtomicI64::new(Time::INFINITE_PAST.into_nanos())),
            /* is_local */ true,
            /* num_threads */ 1,
        ));
        inner.clone().set_local(TimerHeap::default());
        Self { local: LocalExecutor { ehandle: EHandle { inner } } }
    }

    /// Return the current time according to the executor.
    ///
    /// Delegates to the inner executor state's `now()`.
    pub fn now(&self) -> Time {
        self.local.ehandle.inner.now()
    }

    /// Set the fake time to a given value.
    ///
    /// Delegates to the inner executor state's `set_fake_time`.
    ///
    /// # Panics
    ///
    /// If the executor was not created with fake time.
    pub fn set_fake_time(&self, t: Time) {
        self.local.ehandle.inner.set_fake_time(t)
    }

    /// Run a single future to completion on a single thread, also polling other active tasks.
    ///
    /// Forwards to [`LocalExecutor::run_singlethreaded`] on the wrapped executor.
    ///
    /// # Panics
    ///
    /// Panics if the executor is using fake time (asserted by the wrapped executor).
    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
    where
        F: Future,
    {
        self.local.run_singlethreaded(main_future)
    }

    /// Polls the future. If it is not ready, dispatches available packets and possibly tries
    /// again. Timers will only fire if this executor uses fake time. Never blocks.
    ///
    /// Returns `Poll::Ready` with the output once `main_future` completes. Returns
    /// `Poll::Pending` when a run ends with the future still pending and no task is registered
    /// through [`TestExecutor::poll_until_stalled`].
    ///
    /// This function is for testing. DO NOT use this function in tests or applications that
    /// involve any interaction with other threads or processes, as those interactions
    /// may become stalled waiting for signals from "the outside world" which is beyond
    /// the knowledge of the executor.
    ///
    /// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
    /// futures must first be pinned using the `pin!` macro.
    pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
    where
        F: Future + Unpin,
    {
        let mut main_future = pin!(main_future);

        // Set up an instance of UntilStalledData that works with `poll_until_stalled`.
        // `Cleanup` resets `owner_data` to `None` on every exit path of this function.
        struct Cleanup(Arc<Inner>);
        impl Drop for Cleanup {
            fn drop(&mut self) {
                *self.0.owner_data.lock() = None;
            }
        }
        let _cleanup = Cleanup(self.local.ehandle.inner.clone());
        *self.local.ehandle.inner.owner_data.lock() =
            Some(Box::new(UntilStalledData { watcher: None }));

        loop {
            // Each iteration re-submits the same pinned future as the main task.
            let result = self.local.run::</* UNTIL_STALLED: */ true, F::Output>(
                // SAFETY: We don't move the main future across threads.
                unsafe { AtomicFuture::new_local(main_future.as_mut(), true) }
            );
            if result.is_ready() {
                return result;
            }

            // If a waker was set by `poll_until_stalled`, disarm, wake, and loop.
            if let Some(watcher) = with_data(|data| data.watcher.take()) {
                watcher.waker.wake();
                // Relaxed ordering is fine here because this atomic is only ever accessed from
                // the main thread.
                watcher.done.store(true, Ordering::Relaxed);
            } else {
                // Nobody is waiting to be told about the stall, so report it to the caller.
                break;
            }
        }

        Poll::Pending
    }

    /// Wakes every task whose timer deadline is at or before the executor's current time, and
    /// returns `true` if at least one task was woken.
    ///
    /// This is intended for use in test code in conjunction with fake time.
    pub fn wake_expired_timers(&mut self) -> bool {
        let now = self.now();
        with_local_timer_heap(|timer_heap| {
            let mut woke_any = false;
            loop {
                match timer_heap.next_deadline() {
                    Some(waker) if waker.time() <= now => {
                        waker.wake();
                    }
                    // Either the heap is empty or the next deadline is still in the future.
                    _ => break woke_any,
                }
                // Discard the entry that was just woken.
                timer_heap.pop();
                woke_any = true;
            }
        })
    }

    /// Wakes the task waiting on the next timer, if there is one, removes that timer from the
    /// local timer heap, and returns the time it was scheduled for. Returns `None` when no timer
    /// is pending.
    ///
    /// This is intended for use in test code in conjunction with `run_until_stalled`.
    /// For example, here is how one could test that the Timer future fires after the given
    /// timeout:
    ///
    /// ```ignore
    /// let deadline = 5.seconds().after_now();
    /// let mut future = Timer::<Never>::new(deadline);
    /// assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
    /// assert_eq!(Some(deadline), exec.wake_next_timer());
    /// assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
    /// ```
    pub fn wake_next_timer(&mut self) -> Option<Time> {
        with_local_timer_heap(|timer_heap| {
            let waker = timer_heap.next_deadline()?;
            waker.wake();
            let deadline = waker.time();
            timer_heap.pop();
            Some(deadline)
        })
    }

    /// Returns the deadline of the next timer due to expire on the local timer heap, or `None`
    /// if no timer is pending. The timer itself is left untouched.
    pub fn next_timer() -> Option<Time> {
        with_local_timer_heap(|timer_heap| {
            let upcoming = timer_heap.next_deadline()?;
            Some(upcoming.time())
        })
    }

    /// Returns a snapshot taken from the wrapped executor's instrumentation collector. Only
    /// compiled in test builds.
    #[cfg(test)]
    pub(crate) fn snapshot(&self) -> super::instrumentation::Snapshot {
        self.local.ehandle.inner.collector.snapshot()
    }

    /// Advances fake time to `time`, stopping at every intermediate timer deadline on the way.
    ///
    /// On each step the executor is first run until it stalls. If the next pending timer is due
    /// at or before `time`, fake time is moved to that deadline and the step repeats; otherwise
    /// fake time is set to `time` and the function returns. This makes sure that repeating
    /// timers fire as expected.
    ///
    /// This will only work if the executor is being run via `TestExecutor::run_until_stalled`
    /// and can only be called by one task at a time.
    ///
    /// # Panics
    ///
    /// Panics if the executor was not created with fake time, and for the same reasons
    /// `poll_until_stalled` can.
    pub async fn advance_to(time: Time) {
        let ehandle = EHandle::local();
        loop {
            // Let every task that can currently make progress do so before touching the clock.
            let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
            match Self::next_timer() {
                Some(next_timer) if next_timer <= time => {
                    ehandle.inner.set_fake_time(next_timer);
                }
                _ => {
                    ehandle.inner.set_fake_time(time);
                    break;
                }
            }
        }
    }

    /// Runs the future until it is ready or the executor is stalled. Returns the state of the
    /// future: `Poll::Ready(value)` if `fut` completed first, `Poll::Pending` if the executor
    /// stalled first.
    ///
    /// This will only work if the executor is being run via `TestExecutor::run_until_stalled` and
    /// can only be called by one task at a time.
    ///
    /// This can be used in tests to assert that a future should be pending:
    /// ```
    /// assert!(
    ///     TestExecutor::poll_until_stalled(my_fut).await.is_pending(),
    ///     "my_fut should not be ready!"
    /// );
    /// ```
    ///
    /// If you just want to know when the executor is stalled, you can do:
    /// ```
    /// let _: Poll<()> = TestExecutor::poll_until_stalled(future::pending::<()>()).await;
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if another task is currently using `poll_until_stalled`, or the executor is
    /// not being run via `TestExecutor::run_until_stalled`.
    pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
        let watcher =
            Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });

        // Register the watcher with the running `run_until_stalled` call; a previously registered
        // watcher means another task is already waiting.
        assert!(
            with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
            "Error: Another task has called `poll_until_stalled`."
        );

        struct Watcher(Arc<StalledWatcher>);

        // Make sure we clean up if we're dropped.
        impl Drop for Watcher {
            fn drop(&mut self) {
                // If `done` was already set, `run_until_stalled` has taken the watcher out of the
                // shared slot itself and there is nothing left to clear.
                if !self.0.done.swap(true, Ordering::Relaxed) {
                    with_data(|data| data.watcher = None);
                }
            }
        }

        let watcher = Watcher(watcher);

        // Completes once `run_until_stalled` has flagged the stall; until then it keeps the
        // current task's waker registered so it can be woken at that point.
        let poll_fn = poll_fn(|cx: &mut Context<'_>| {
            if watcher.0.done.load(Ordering::Relaxed) {
                Poll::Ready(())
            } else {
                watcher.0.waker.register(cx.waker());
                Poll::Pending
            }
        });
        // Race the stall notification against `fut`.
        match future::select(poll_fn, fut).await {
            Either::Left(_) => Poll::Pending,
            Either::Right((value, _)) => Poll::Ready(value),
        }
    }
}

/// State shared between a task waiting in `TestExecutor::poll_until_stalled` and the
/// `TestExecutor::run_until_stalled` call that is driving the executor.
struct StalledWatcher {
    /// Waker of the waiting task; woken by `run_until_stalled` when the executor stalls.
    waker: AtomicWaker,
    /// Set once the stall has been reported, or once the waiting side has been dropped.
    done: AtomicBool,
}

/// Data stored in the executor's `owner_data` slot for the duration of a
/// `TestExecutor::run_until_stalled` call.
struct UntilStalledData {
    /// The watcher registered by the task currently inside `poll_until_stalled`, if any.
    watcher: Option<Arc<StalledWatcher>>,
}

/// Calls `f` with `&mut UntilStalledData` that is stored in `owner_data` of the local executor,
/// and returns whatever `f` returns.
///
/// `f` is invoked exactly once, while the `owner_data` lock is held, so it is only required to be
/// `FnOnce`.
///
/// # Panics
///
/// Panics if `owner_data` is unset or isn't an instance of `UntilStalledData`.
fn with_data<R>(f: impl FnOnce(&mut UntilStalledData) -> R) -> R {
    const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
                           with TestExecutor::run_until_stalled";
    let ehandle = EHandle::local();
    // The guard is declared after `ehandle`, so it is released first when this function returns.
    let mut owner_data = ehandle.inner.owner_data.lock();
    let data = owner_data
        .as_mut()
        .expect(MESSAGE)
        .downcast_mut::<UntilStalledData>()
        .expect(MESSAGE);
    f(data)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{handle::on_signals::OnSignals, Interval, Timer};
    use assert_matches::assert_matches;
    use fuchsia_zircon::{self as zx, AsHandleRef, DurationNum};
    use futures::StreamExt;
    use std::{
        cell::{Cell, RefCell},
        task::Waker,
    };

    /// Test helper: spawns `future` as a detached task on the executor local to this thread.
    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
        crate::EHandle::local().spawn_detached(future);
    }

    // Runs a future that suspends and returns after being resumed.
    #[test]
    fn stepwise_two_steps() {
        // `fut_step` records how far the future has progressed; `fut_waker` captures the waker it
        // was last polled with so the test can resume it by hand.
        let fut_step = Arc::new(Cell::new(0));
        let fut_waker: Arc<RefCell<Option<Waker>>> = Arc::new(RefCell::new(None));
        let fut_waker_clone = fut_waker.clone();
        let fut_step_clone = fut_step.clone();
        let fut_fn = move |cx: &mut Context<'_>| {
            fut_waker_clone.borrow_mut().replace(cx.waker().clone());
            match fut_step_clone.get() {
                0 => {
                    fut_step_clone.set(1);
                    Poll::Pending
                }
                1 => {
                    fut_step_clone.set(2);
                    Poll::Ready(())
                }
                _ => panic!("future called after done"),
            }
        };
        let fut = Box::new(future::poll_fn(fut_fn));
        let mut executor = TestExecutor::new_with_fake_time();
        // Spawn the future rather than making it the main task because run_until_stalled will
        // wake the main future on every call, and we want to wake it ourselves using the waker.
        executor.local.ehandle.spawn_local_detached(fut);
        // Spawning alone must not poll the future.
        assert_eq!(fut_step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(fut_step.get(), 1);

        // Resume the future through its captured waker and let it finish.
        fut_waker.borrow_mut().take().unwrap().wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(fut_step.get(), 2);
    }

    #[test]
    // Runs a future that waits on a timer.
    fn stepwise_timer() {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(Time::from_nanos(0));
        let mut fut = pin!(Timer::new(Time::after(1000.nanos())));

        // Running the executor must not move fake time forward by itself.
        let _ = executor.run_until_stalled(&mut fut);
        assert_eq!(Time::now(), Time::from_nanos(0));

        // Once fake time reaches the deadline, the timer future completes.
        executor.set_fake_time(Time::from_nanos(1000));
        assert_eq!(Time::now(), Time::from_nanos(1000));
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }

    // Runs a future that waits on an event.
    #[test]
    fn stepwise_event() {
        let mut executor = TestExecutor::new_with_fake_time();
        let event = zx::Event::create();
        let mut fut = pin!(OnSignals::new(&event, zx::Signals::USER_0));

        // First run: the signal has not been raised yet; the result is intentionally ignored.
        let _ = executor.run_until_stalled(&mut fut);

        // Raise USER_0 and check that the future now resolves with exactly that signal.
        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
    }

    // Using `run_until_stalled` does not modify the order of events
    // compared to normal execution.
    #[test]
    fn run_until_stalled_preserves_order() {
        let mut executor = TestExecutor::new_with_fake_time();
        // Flag flipped by the spawned task once its 5 second timer has fired.
        let spawned_fut_completed = Arc::new(AtomicBool::new(false));
        let spawned_fut_completed_writer = spawned_fut_completed.clone();
        let spawned_fut = Box::pin(async move {
            Timer::new(Time::after(5.seconds())).await;
            spawned_fut_completed_writer.store(true, Ordering::SeqCst);
        });
        let mut main_fut = pin!(async {
            Timer::new(Time::after(10.seconds())).await;
        });
        spawn(spawned_fut);
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
        // Jump past both deadlines in a single step.
        executor.set_fake_time(Time::after(15.seconds()));
        // The timer in `spawned_fut` should fire first, then the
        // timer in `main_fut`.
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
        assert_eq!(spawned_fut_completed.load(Ordering::SeqCst), true);
    }

    // Pending tasks must stay alive until the executor is dropped, and dropping the executor must
    // also dispose of tasks that are spawned while it is tearing down, without polling them.
    #[test]
    fn task_destruction() {
        // Records its own destruction and spawns one more task from inside `drop`.
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let dropped_clone = self.dropped.clone();
                spawn(async {
                    // Hold on to a reference here to verify that it, too, is destroyed later
                    let _dropped_clone = dropped_clone;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }
        let mut dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: dropped.clone() };
        let mut executor = TestExecutor::new();
        let mut main_fut = pin!(async move {
            spawn(async move {
                // Take ownership of the drop spawner
                let _drop_spawner = drop_spawner;
                future::pending::<()>().await;
            });
        });
        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
        assert_eq!(
            dropped.load(Ordering::SeqCst),
            false,
            "executor dropped pending task before destruction"
        );

        // Should drop the pending task and its owned drop spawner,
        // as well as gracefully drop the future spawned from the drop spawner.
        drop(executor);
        // `Arc::get_mut` only succeeds when no other strong reference is left, which proves that
        // every task holding a clone has been destroyed.
        let dropped = Arc::get_mut(&mut dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert_eq!(
            dropped.load(Ordering::SeqCst),
            true,
            "executor did not drop pending task during destruction"
        );
    }

    // With a real-time executor in scope, `Time::now()` must lie between two kernel clock
    // readings taken immediately before and after it.
    #[test]
    fn time_now_real_time() {
        let _executor = LocalExecutor::new();
        let t1 = zx::Time::after(0.seconds());
        let t2 = Time::now().into_zx();
        let t3 = zx::Time::after(0.seconds());
        assert!(t1 <= t2);
        assert!(t2 <= t3);
    }

    // With a fake-time executor, `Time::now()` reports exactly what was last set.
    #[test]
    fn time_now_fake_time() {
        let executor = TestExecutor::new_with_fake_time();
        for nanos in [0, 1000] {
            let expected = Time::from_zx(zx::Time::from_nanos(nanos));
            executor.set_fake_time(expected);
            assert_eq!(Time::now(), expected);
        }
    }

    // `Time::after` saturates at the infinite bounds instead of overflowing.
    #[test]
    fn time_after_overflow() {
        let executor = TestExecutor::new_with_fake_time();

        // 100ns below the upper bound: adding 200s must clamp to `INFINITE`.
        let just_below_max = Time::INFINITE - 100.nanos();
        executor.set_fake_time(just_below_max);
        let clamped_high = Time::after(200.seconds());
        assert_eq!(clamped_high, Time::INFINITE);

        // 100ns above the lower bound: subtracting 200s must clamp to `INFINITE_PAST`.
        let just_above_min = Time::INFINITE_PAST + 100.nanos();
        executor.set_fake_time(just_above_min);
        let clamped_low = Time::after((-200).seconds());
        assert_eq!(clamped_low, Time::INFINITE_PAST);
    }

    // On its first poll this future wakes itself once for every value in `1..n` and returns
    // `Pending`; on the next poll it completes.
    async fn multi_wake(n: usize) {
        let mut woken_once = false;
        let wake_burst = futures::future::poll_fn(|cx| {
            if woken_once {
                return Poll::Ready(());
            }
            (1..n).for_each(|_| cx.waker().wake_by_ref());
            woken_once = true;
            Poll::Pending
        });
        wake_burst.await;
    }

    // Repeated wakeups of the same task within one cycle must collapse into the same number of
    // notifications, no matter how often the waker was invoked.
    #[test]
    fn dedup_wakeups() {
        let notifications_for = |wake_count| {
            let mut executor = LocalExecutor::new();
            executor.run_singlethreaded(multi_wake(wake_count));
            executor.ehandle.inner.collector.snapshot().wakeups_notification
        };
        let few = notifications_for(5);
        let many = notifications_for(10);
        assert_eq!(few, many); // Same number of notifications independent of wakeup calls
    }

    // Ensure that a large amount of wakeups does not exhaust kernel resources,
    // such as the zx port queue limit.
    #[test]
    fn many_wakeups() {
        let mut executor = LocalExecutor::new();
        // The test passes if the run completes without panicking; there is nothing to assert.
        executor.run_singlethreaded(multi_wake(4096 * 2));
    }

    // Exercises `TestExecutor::advance_to` with a one-shot timer and a repeating interval that
    // run concurrently with the task advancing the clock.
    #[test]
    fn test_advance_to() {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(Time::from_nanos(0));

        let mut fut = pin!(async {
            let timer_fired = Arc::new(AtomicBool::new(false));
            futures::join!(
                // One-shot timer: records when it fires.
                async {
                    Timer::new(1.seconds()).await;
                    timer_fired.store(true, Ordering::SeqCst);
                },
                // Repeating interval: must tick three times before the test can finish.
                async {
                    let mut fired = 0;
                    let mut interval = Interval::new(1.seconds());
                    while let Some(_) = interval.next().await {
                        fired += 1;
                        if fired == 3 {
                            break;
                        }
                    }
                    assert_eq!(fired, 3);
                },
                // Driver: moves fake time forward in steps and checks the one-shot timer.
                async {
                    assert!(!timer_fired.load(Ordering::SeqCst));
                    TestExecutor::advance_to(Time::after(500.millis())).await;
                    // Timer still shouldn't be fired.
                    assert!(!timer_fired.load(Ordering::SeqCst));
                    TestExecutor::advance_to(Time::after(500.millis())).await;

                    // The timer should have fired.
                    assert!(timer_fired.load(Ordering::SeqCst));

                    // The interval timer should have fired once.  Make it fire twice more.
                    TestExecutor::advance_to(Time::after(2.seconds())).await;
                }
            )
        });
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }
}