fuchsia_async/runtime/fuchsia/
timer.rs1use super::executor::Executor;
11use crate::runtime::{BootInstant, EHandle, MonotonicInstant, WakeupTime};
12use crate::{PacketReceiver, ReceiverRegistration};
13use fuchsia_sync::Mutex;
14
15use futures::future::FusedFuture;
16use futures::stream::FusedStream;
17use futures::task::{AtomicWaker, Context};
18use futures::{FutureExt, Stream};
19use std::cell::UnsafeCell;
20use std::fmt;
21use std::future::Future;
22use std::marker::PhantomPinned;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::sync::atomic::{AtomicU8, Ordering};
26use std::task::{Poll, Waker, ready};
27use zx::AsHandleRef as _;
28
/// Abstracts over the timelines that `Timers` can be instantiated for.
///
/// Instants cross this interface as raw `i64` values; the two implementations in this file
/// (`MonotonicInstant` and `BootInstant`) treat them as nanosecond counts.
pub trait TimeInterface:
    Clone + Copy + fmt::Debug + PartialEq + PartialOrd + Ord + Send + Sync + 'static
{
    /// The `zx` timeline used for the kernel timer object and for `zx::Instant` values.
    type Timeline: zx::Timeline + Send + Sync + 'static;

    /// Builds an instant of this type from a raw nanosecond count.
    fn from_nanos(nanos: i64) -> Self;

    /// Converts this instant into its raw nanosecond count.
    fn into_nanos(self) -> i64;

    /// Builds the `zx::Instant` on `Self::Timeline` that corresponds to `nanos`.
    fn zx_instant(nanos: i64) -> zx::Instant<Self::Timeline>;

    /// Returns the current time as a raw `i64`.
    ///
    /// Both implementations in this file read it from `EHandle::local()`;
    /// NOTE(review): presumably this requires a local executor to be set up — confirm.
    fn now() -> i64;

    /// Creates a `zx::Timer` on `Self::Timeline`.
    fn create_timer() -> zx::Timer<Self::Timeline>;
}
40
41impl TimeInterface for MonotonicInstant {
42 type Timeline = zx::MonotonicTimeline;
43
44 fn from_nanos(nanos: i64) -> Self {
45 Self::from_nanos(nanos)
46 }
47
48 fn into_nanos(self) -> i64 {
49 self.into_nanos()
50 }
51
52 fn zx_instant(nanos: i64) -> zx::MonotonicInstant {
53 zx::MonotonicInstant::from_nanos(nanos)
54 }
55
56 fn now() -> i64 {
57 EHandle::local().inner().now().into_nanos()
58 }
59
60 fn create_timer() -> zx::Timer<Self::Timeline> {
61 zx::Timer::<Self::Timeline>::create()
62 }
63}
64
65impl TimeInterface for BootInstant {
66 type Timeline = zx::BootTimeline;
67
68 fn from_nanos(nanos: i64) -> Self {
69 Self::from_nanos(nanos)
70 }
71
72 fn into_nanos(self) -> i64 {
73 self.into_nanos()
74 }
75
76 fn zx_instant(nanos: i64) -> zx::BootInstant {
77 zx::BootInstant::from_nanos(nanos)
78 }
79
80 fn now() -> i64 {
81 EHandle::local().inner().boot_now().into_nanos()
82 }
83
84 fn create_timer() -> zx::Timer<Self::Timeline> {
85 zx::Timer::<Self::Timeline>::create()
86 }
87}
88
89impl WakeupTime for std::time::Instant {
90 fn into_timer(self) -> Timer {
91 let now_as_instant = std::time::Instant::now();
92 let now_as_time = MonotonicInstant::now();
93 EHandle::local()
94 .mono_timers()
95 .new_timer(now_as_time + self.saturating_duration_since(now_as_instant).into())
96 }
97}
98
99impl WakeupTime for MonotonicInstant {
100 fn into_timer(self) -> Timer {
101 EHandle::local().mono_timers().new_timer(self)
102 }
103}
104
105impl WakeupTime for BootInstant {
106 fn into_timer(self) -> Timer {
107 EHandle::local().boot_timers().new_timer(self)
108 }
109}
110
111impl WakeupTime for zx::MonotonicInstant {
112 fn into_timer(self) -> Timer {
113 EHandle::local().mono_timers().new_timer(self.into())
114 }
115}
116
117impl WakeupTime for zx::BootInstant {
118 fn into_timer(self) -> Timer {
119 EHandle::local().boot_timers().new_timer(self.into())
120 }
121}
122
/// A future that completes once its deadline has been reached.
///
/// The wrapped `TimerState` is what the owning timer set refers to while the timer is
/// registered; dropping the `Timer` unregisters it (see the `Drop` impl).
#[must_use = "futures do nothing unless polled"]
pub struct Timer(TimerState);
126
127impl Timer {
128 pub fn new(time: impl WakeupTime) -> Self {
130 time.into_timer()
131 }
132
133 pub fn reset(self: Pin<&mut Self>, time: MonotonicInstant) {
135 let nanos = time.into_nanos();
136 if self.0.state.load(Ordering::Relaxed) == UNREGISTERED
147 || !self.0.timers.try_reset_timer(&self.0, nanos)
148 {
149 unsafe { *self.0.nanos.get() = nanos };
152 self.0.state.store(UNREGISTERED, Ordering::Relaxed);
153 }
154 }
155}
156
157impl fmt::Debug for Timer {
158 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
159 f.debug_struct("Timer").field("time", &self.0.nanos).finish()
160 }
161}
162
163impl Drop for Timer {
164 fn drop(&mut self) {
165 self.0.timers.unregister(&self.0);
166 }
167}
168
169impl Future for Timer {
170 type Output = ();
171 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
172 unsafe { self.0.timers.poll(self.as_ref(), cx) }
174 }
175}
176
/// State shared between a `Timer` and the heap of the timer set it belongs to.
struct TimerState {
    /// The timer set that created this timer (see `Timers::new_timer`).
    timers: Arc<dyn TimersInterface>,

    /// Deadline as a raw nanosecond count. Written by `Timer::reset` and, via
    /// `StateRef::nanos_mut`, by `Heap::reset`; read by the heap to order timers.
    nanos: UnsafeCell<i64>,

    /// Waker registered by the most recent pending poll; taken when the timer fires.
    waker: AtomicWaker,
    /// One of `UNREGISTERED`, `REGISTERED`, `FIRED` or `TERMINATED`.
    state: AtomicU8,

    /// Position of this timer in the heap, or `HeapIndex::NULL` when it is not in the heap.
    /// Maintained by `Heap` through `StateRef::set_index`.
    index: UnsafeCell<HeapIndex>,

    /// The heap stores raw pointers to this struct (`StateRef`), so it must not move while
    /// registered; this marker makes the type `!Unpin`.
    _pinned: PhantomPinned,
}
193
// SAFETY: NOTE(review): the `UnsafeCell` fields are what prevent the automatic impls. In the
// visible code `index` is only touched with the `Inner` lock held, and `nanos` is touched
// either under that lock (heap operations) or by the timer's owner while the timer is not in
// the heap — confirm that this is the complete invariant.
unsafe impl Send for TimerState {}
unsafe impl Sync for TimerState {}
197
/// The timer is not in the heap: initial state, and the state after `Timer::reset` falls back
/// to the local update or after `unregister` removes the timer.
const UNREGISTERED: u8 = 0;

/// The timer has been pushed onto the heap by `poll` (or re-armed by `try_reset_timer`).
const REGISTERED: u8 = 1;

/// The deadline has passed: set by `StateRef::into_waker` when the timer is popped, or by
/// `poll` when the deadline is already in the past.
const FIRED: u8 = 2;

/// `poll` has observed `FIRED` and returned `Poll::Ready`; reported by `is_terminated`.
const TERMINATED: u8 = 3;
209
/// Position of a timer inside the heap, with `usize::MAX` reserved to mean "not in the heap".
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct HeapIndex(usize);

impl HeapIndex {
    /// Sentinel for a timer that is not currently stored in the heap.
    const NULL: HeapIndex = HeapIndex(usize::MAX);

    /// Returns the heap position, or `None` for the `NULL` sentinel.
    fn get(&self) -> Option<usize> {
        match *self {
            HeapIndex::NULL => None,
            HeapIndex(position) => Some(position),
        }
    }
}

impl From<usize> for HeapIndex {
    /// Wraps a raw heap position without any validation.
    fn from(value: usize) -> Self {
        HeapIndex(value)
    }
}
227
228impl FusedFuture for Timer {
229 fn is_terminated(&self) -> bool {
230 self.0.state.load(Ordering::Relaxed) == TERMINATED
231 }
232}
233
/// Raw pointer to a `TimerState`, as stored in the heap.
///
/// `Timers::poll` creates one from a pinned timer when it pushes the timer onto the heap, and
/// `Timer::drop` unregisters the timer before the state goes away.
#[derive(Copy, Clone, Debug)]
struct StateRef(*const TimerState);

// SAFETY: NOTE(review): the raw pointer is what prevents the automatic impls; this relies on
// `TimerState` itself being `Send + Sync` and on every dereference following the rules
// documented on the `StateRef` methods — confirm.
unsafe impl Send for StateRef {}
unsafe impl Sync for StateRef {}
250
impl StateRef {
    /// Marks the timer as `FIRED` and takes its registered waker, if there is one.
    ///
    /// The `&mut Inner` argument is unused; NOTE(review): presumably it serves as evidence that
    /// the caller holds the `Inner` lock. Every caller in this file pops the timer from the
    /// heap under that lock immediately before calling this.
    fn into_waker(self, _inner: &mut Inner) -> Option<Waker> {
        // SAFETY: NOTE(review): relies on the pointed-to `TimerState` still being alive, which
        // appears to hold because `Timer::drop` must take the same lock to unregister.
        unsafe {
            // The state is stored before the waker is taken, so that order must be kept.
            (*self.0).state.store(FIRED, Ordering::Relaxed);
            (*self.0).waker.take()
        }
    }

    /// Reads the timer's deadline in nanoseconds.
    ///
    /// # Safety
    ///
    /// NOTE(review): the pointer must reference a live `TimerState` and nothing may be writing
    /// `nanos` concurrently; callers in this file hold the `Inner` lock — confirm.
    unsafe fn nanos(&self) -> i64 {
        *(*self.0).nanos.get()
    }

    /// Returns a mutable reference to the timer's deadline.
    ///
    /// # Safety
    ///
    /// NOTE(review): same requirements as `nanos`, plus no other reference to the deadline may
    /// exist for as long as the returned reference is alive — confirm.
    unsafe fn nanos_mut(&mut self) -> &mut i64 {
        &mut *(*self.0).nanos.get()
    }

    /// Stores `index` as the timer's heap position and returns the previous value.
    ///
    /// # Safety
    ///
    /// NOTE(review): the pointer must reference a live `TimerState` and the caller must have
    /// exclusive access to `index`; it is only called from `Heap` methods in this file.
    unsafe fn set_index(&mut self, index: HeapIndex) -> HeapIndex {
        std::mem::replace(&mut *(*self.0).index.get(), index)
    }
}
287
/// A stream that yields `()` every `duration`, measured on the monotonic timeline.
///
/// Each deadline is computed from the previous deadline rather than from the time the item
/// was delivered (see `poll_next`).
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Interval {
    /// The timer that is re-armed for every tick; boxed and pinned so it can be polled in place.
    timer: Pin<Box<Timer>>,
    /// Deadline of the tick that `timer` is currently armed for.
    next: MonotonicInstant,
    /// Distance between consecutive deadlines.
    duration: zx::MonotonicDuration,
}
301
302impl Interval {
303 pub fn new(duration: zx::MonotonicDuration) -> Self {
305 let next = MonotonicInstant::after(duration);
306 Interval { timer: Box::pin(Timer::new(next)), next, duration }
307 }
308}
309
impl FusedStream for Interval {
    /// Always `false`: `poll_next` never returns `None`, so the stream never terminates.
    fn is_terminated(&self) -> bool {
        false
    }
}
316
317impl Stream for Interval {
318 type Item = ();
319 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
320 ready!(self.timer.poll_unpin(cx));
321 let next = self.next + self.duration;
322 self.timer.as_mut().reset(next);
323 self.next = next;
324 Poll::Ready(Some(()))
325 }
326}
327
/// The set of timers for one timeline, backed by a single kernel timer object.
pub(crate) struct Timers<T: TimeInterface> {
    /// Heap of registered timers plus the bookkeeping for the kernel timer.
    inner: Mutex<Inner>,

    /// When `true`, the timer set is driven by fake time: readiness is signalled with
    /// `USER_0` (see `maybe_notify`) instead of `TIMER_SIGNALED`.
    fake: bool,

    /// The kernel timer that is set to the earliest deadline in the heap.
    timer: zx::Timer<T::Timeline>,

    /// Registration with the executor's packet receivers; `None` until `register` is called
    /// and again after `deregister`.
    receiver_registration: Mutex<Option<ReceiverRegistration<Arc<Self>>>>,
}
338
/// Lock-protected part of `Timers`.
struct Inner {
    /// Min-heap of registered timers, ordered by deadline.
    timers: Heap,

    /// The deadline the kernel timer was last set to, or `None` if it is cancelled or was
    /// never set. Used by `setup_zircon_timer` to skip redundant set/cancel calls.
    last_deadline: Option<i64>,

    /// `true` while an async wait on the kernel timer is outstanding.
    async_wait: bool,

    /// Port key obtained from the receiver registration in `Timers::register`.
    port_key: u64,
}
353
354impl<T: TimeInterface> Timers<T> {
355 pub fn new(fake: bool) -> Self {
356 Self {
357 inner: Mutex::new(Inner {
358 timers: Heap::default(),
359 last_deadline: None,
360 async_wait: false,
361 port_key: 0,
362 }),
363 fake,
364 timer: T::create_timer(),
365 receiver_registration: Mutex::default(),
366 }
367 }
368
369 pub fn new_timer(self: &Arc<Self>, time: T) -> Timer {
370 let nanos = time.into_nanos();
371 Timer(TimerState {
372 timers: self.clone(),
373 nanos: UnsafeCell::new(nanos),
374 waker: AtomicWaker::new(),
375 state: AtomicU8::new(UNREGISTERED),
376 index: UnsafeCell::new(HeapIndex::NULL),
377 _pinned: PhantomPinned,
378 })
379 }
380
381 pub fn register(self: &Arc<Self>, executor: &Arc<Executor>) {
384 let key = self
385 .receiver_registration
386 .lock()
387 .get_or_insert_with(|| executor.receivers.register(executor.clone(), self.clone()))
388 .key();
389 self.inner.lock().port_key = key;
390 }
391
392 pub fn deregister(&self) {
394 *self.receiver_registration.lock() = None;
395 }
396
397 fn setup_zircon_timer(&self, inner: &mut Inner, from_receive_packet: bool) {
404 let new_deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
407
408 if new_deadline != inner.last_deadline {
411 inner.last_deadline = new_deadline;
412 match inner.last_deadline {
413 Some(deadline) => {
414 self.timer.set(T::zx_instant(deadline), zx::Duration::ZERO).unwrap()
415 }
416 None => self.timer.cancel().unwrap(),
417 }
418 }
419
420 if from_receive_packet {
424 inner.async_wait = false;
425 }
426
427 if inner.last_deadline.is_some() && !inner.async_wait {
430 if self.fake {
431 self.timer.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
434 }
435
436 self.timer
437 .wait_async_handle(
438 EHandle::local().port(),
439 inner.port_key,
440 if self.fake { zx::Signals::USER_0 } else { zx::Signals::TIMER_SIGNALED },
441 zx::WaitAsyncOpts::empty(),
442 )
443 .unwrap();
444
445 inner.async_wait = true;
446 }
447 }
448
449 pub fn wake_timers(&self) -> bool {
451 self.wake_timers_impl(false)
452 }
453
454 fn wake_timers_impl(&self, from_receive_packet: bool) -> bool {
455 let now = T::now();
456 let mut timers_woken = false;
457
458 loop {
459 let waker = {
460 let mut inner = self.inner.lock();
461
462 let deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
464 if deadline.is_some_and(|d| d <= now) {
465 let timer = inner.timers.pop().unwrap();
466 timer.into_waker(&mut inner)
467 } else {
468 self.setup_zircon_timer(&mut inner, from_receive_packet);
481 break;
482 }
483 };
484 if let Some(waker) = waker {
485 waker.wake()
486 }
487 timers_woken = true;
488 }
489 timers_woken
490 }
491
492 pub fn wake_next_timer(&self) -> Option<T> {
494 let (nanos, waker) = {
495 let mut inner = self.inner.lock();
496 let timer = inner.timers.pop()?;
497 let nanos = unsafe { timer.nanos() };
499 (nanos, timer.into_waker(&mut inner))
500 };
501 if let Some(waker) = waker {
502 waker.wake();
503 }
504 Some(T::from_nanos(nanos))
505 }
506
507 pub fn next_timer(&self) -> Option<T> {
509 self.inner.lock().timers.peek().map(|state| T::from_nanos(unsafe { state.nanos() }))
511 }
512
513 pub fn maybe_notify(&self, now: T) {
519 assert!(self.fake, "calling this function requires using fake time.");
520 if self
522 .inner
523 .lock()
524 .timers
525 .peek()
526 .is_some_and(|state| unsafe { state.nanos() } <= now.into_nanos())
527 {
528 self.timer.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
529 }
530 }
531}
532
533impl<T: TimeInterface> PacketReceiver for Timers<T> {
534 fn receive_packet(&self, _packet: zx::Packet) {
535 self.wake_timers_impl(true);
536 }
537}
538
/// Timeline-erased view of `Timers<T>`, so that `TimerState` can hold an
/// `Arc<dyn TimersInterface>` without being generic over the timeline.
trait TimersInterface: Send + Sync + 'static {
    /// Polls `timer`, registering it with the timer set if necessary.
    ///
    /// # Safety
    ///
    /// NOTE(review): the exact contract is not visible here; the only caller in this file is
    /// `Timer::poll`, which holds `Pin<&mut Timer>` — presumably exclusive access is required.
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()>;

    /// Removes the timer from the timer set if it is currently registered.
    fn unregister(&self, state: &TimerState);

    /// Re-arms a timer that is currently in the heap; returns `false` if it is not in the heap.
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool;
}
545
546impl<T: TimeInterface> TimersInterface for Timers<T> {
547 unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()> {
551 let state = timer.0.state.load(Ordering::Relaxed);
562
563 if state == TERMINATED {
564 return Poll::Ready(());
565 }
566
567 if state == FIRED {
568 timer.0.state.store(TERMINATED, Ordering::Relaxed);
569 return Poll::Ready(());
570 }
571
572 if state == UNREGISTERED {
573 let nanos = unsafe { *timer.0.nanos.get() };
575 if nanos <= T::now() {
576 timer.0.state.store(FIRED, Ordering::Relaxed);
577 return Poll::Ready(());
578 }
579 let mut inner = self.inner.lock();
580
581 inner.timers.push(StateRef(&timer.0));
584
585 self.setup_zircon_timer(&mut inner, false);
588
589 timer.0.state.store(REGISTERED, Ordering::Relaxed);
590 }
591
592 timer.0.waker.register(cx.waker());
593
594 let state = timer.0.state.load(Ordering::Relaxed);
602 match state {
603 FIRED => {
604 timer.0.state.store(TERMINATED, Ordering::Relaxed);
605 Poll::Ready(())
606 }
607 REGISTERED => Poll::Pending,
608 _ => {
612 unreachable!();
613 }
614 }
615 }
616
617 fn unregister(&self, timer: &TimerState) {
618 if timer.state.load(Ordering::Relaxed) == UNREGISTERED {
619 return;
626 }
627 let mut inner = self.inner.lock();
628 let index = unsafe { *timer.index.get() };
630 if let Some(index) = index.get() {
631 inner.timers.remove(index);
632 if index == 0 {
633 self.setup_zircon_timer(&mut inner, false);
636 }
637 timer.state.store(UNREGISTERED, Ordering::Relaxed);
638 }
639 }
640
641 fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool {
643 let mut inner = self.inner.lock();
644 let index = unsafe { *timer.index.get() };
646 if let Some(old_index) = index.get() {
647 if (inner.timers.reset(old_index, nanos) == 0) || (old_index == 0) {
648 self.setup_zircon_timer(&mut inner, false);
651 }
652 timer.state.store(REGISTERED, Ordering::Relaxed);
653 true
654 } else {
655 false
656 }
657 }
658}
659
660#[derive(Default)]
661struct Heap(Vec<StateRef>);
662
663impl Heap {
666 fn push(&mut self, mut timer: StateRef) {
667 let index = self.0.len();
668 self.0.push(timer);
669 unsafe {
671 timer.set_index(index.into());
672 }
673 self.fix_up(index);
674 }
675
676 fn peek(&self) -> Option<&StateRef> {
677 self.0.first()
678 }
679
680 fn pop(&mut self) -> Option<StateRef> {
681 if let Some(&first) = self.0.first() {
682 self.remove(0);
683 Some(first)
684 } else {
685 None
686 }
687 }
688
689 fn swap(&mut self, a: usize, b: usize) {
690 self.0.swap(a, b);
691 unsafe {
693 self.0[a].set_index(a.into());
694 self.0[b].set_index(b.into());
695 }
696 }
697
698 fn reset(&mut self, index: usize, nanos: i64) -> usize {
700 if nanos < std::mem::replace(unsafe { self.0[index].nanos_mut() }, nanos) {
702 self.fix_up(index)
703 } else {
704 self.fix_down(index)
705 }
706 }
707
708 fn remove(&mut self, index: usize) {
709 unsafe {
711 let old_index = self.0[index].set_index(HeapIndex::NULL);
712 debug_assert_eq!(old_index, index.into());
713 }
714
715 let last = self.0.len() - 1;
718 if index < last {
719 let fix_up;
720 unsafe {
721 fix_up = self.0[last].nanos() < self.0[index].nanos();
723 self.0[index] = self.0[last];
724 self.0[index].set_index(index.into());
725 };
726 self.0.truncate(last);
727 if fix_up {
728 self.fix_up(index);
729 } else {
730 self.fix_down(index);
731 }
732 } else {
733 self.0.truncate(last);
734 };
735 }
736
737 fn fix_up(&mut self, mut index: usize) -> usize {
739 while index > 0 {
740 let parent = (index - 1) / 2;
741 if unsafe { self.0[parent].nanos() <= self.0[index].nanos() } {
743 return index;
744 }
745 self.swap(parent, index);
746 index = parent;
747 }
748 index
749 }
750
751 fn fix_down(&mut self, mut index: usize) -> usize {
753 let len = self.0.len();
754 loop {
755 let left = index * 2 + 1;
756 if left >= len {
757 return index;
758 }
759
760 let mut swap_with = None;
761
762 unsafe {
764 let mut nanos = self.0[index].nanos();
765 let left_nanos = self.0[left].nanos();
766 if left_nanos < nanos {
767 swap_with = Some(left);
768 nanos = left_nanos;
769 }
770 let right = left + 1;
771 if right < len && self.0[right].nanos() < nanos {
772 swap_with = Some(right);
773 }
774 }
775
776 let Some(swap_with) = swap_with else { return index };
777 self.swap(index, swap_with);
778 index = swap_with;
779 }
780 }
781}
782
#[cfg(test)]
mod test {
    use super::*;
    use crate::{LocalExecutor, SendExecutorBuilder, Task, TestExecutor};
    use assert_matches::assert_matches;
    use futures::channel::oneshot::channel;
    use futures::future::Either;
    use futures::prelude::*;
    use rand::seq::SliceRandom;
    use rand::{Rng, rng};
    use std::future::poll_fn;
    use std::pin::pin;
    use zx::MonotonicDuration;

    /// Extra operations the generic tests need on top of `TimeInterface`: usable as a timer
    /// deadline, duration arithmetic, and "now plus duration".
    trait TestTimeInterface:
        TimeInterface
        + WakeupTime
        + std::ops::Sub<zx::Duration<Self::Timeline>, Output = Self>
        + std::ops::Add<zx::Duration<Self::Timeline>, Output = Self>
    {
        /// Returns the instant `duration` after now.
        fn after(duration: zx::Duration<Self::Timeline>) -> Self;
    }

    impl TestTimeInterface for MonotonicInstant {
        fn after(duration: zx::MonotonicDuration) -> Self {
            Self::after(duration)
        }
    }

    impl TestTimeInterface for BootInstant {
        fn after(duration: zx::BootDuration) -> Self {
            Self::after(duration)
        }
    }

    /// A 100 ms timer must complete before a 1 s timer on a single-threaded executor.
    fn test_shorter_fires_first<T: TestTimeInterface>() {
        let mut exec = LocalExecutor::default();
        let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
        let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left(_) => {}
            Either::Right(_) => panic!("wrong timer fired"),
        }
    }

    #[test]
    fn shorter_fires_first() {
        test_shorter_fires_first::<MonotonicInstant>();
        test_shorter_fires_first::<BootInstant>();
    }

    /// Same as `test_shorter_fires_first`, but on a four-thread executor.
    fn test_shorter_fires_first_multithreaded<T: TestTimeInterface>() {
        SendExecutorBuilder::new().num_threads(4).build().run(async {
            let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
            let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
            match future::select(shorter, longer).await {
                Either::Left(_) => {}
                Either::Right(_) => panic!("wrong timer fired"),
            }
        });
    }

    #[test]
    fn shorter_fires_first_multithreaded() {
        test_shorter_fires_first_multithreaded::<MonotonicInstant>();
        test_shorter_fires_first_multithreaded::<BootInstant>();
    }

    /// A timer whose deadline is already in the past must win against one in the future.
    fn test_timer_before_now_fires_immediately<T: TestTimeInterface>() {
        let mut exec = TestExecutor::new();
        let now = T::now();
        let before = pin!(Timer::new(T::from_nanos(now - 1)));
        let after = pin!(Timer::new(T::from_nanos(now + 1)));
        assert_matches!(
            exec.run_singlethreaded(futures::future::select(before, after)),
            Either::Left(_),
            "Timer in the past should fire first"
        );
    }

    #[test]
    fn timer_before_now_fires_immediately() {
        test_timer_before_now_fires_immediately::<MonotonicInstant>();
        test_timer_before_now_fires_immediately::<BootInstant>();
    }

    /// With fake time, a timer stays pending until the fake clock reaches its deadline.
    #[test]
    fn fires_after_timeout() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));
        let deadline = MonotonicInstant::after(MonotonicDuration::from_seconds(5));
        let mut future = pin!(Timer::new(deadline));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        exec.set_fake_time(deadline);
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
    }

    /// With fake time, an `Interval` yields once per deadline and schedules consecutive
    /// deadlines exactly one duration apart.
    #[test]
    fn interval() {
        let mut exec = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        exec.set_fake_time(start);

        // Counts how many items the interval has yielded.
        let counter = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
        let mut future = pin!({
            let counter = counter.clone();
            Interval::new(MonotonicDuration::from_seconds(5))
                .map(move |()| {
                    counter.fetch_add(1, Ordering::SeqCst);
                })
                .collect::<()>()
        });

        // Nothing is yielded before the first deadline.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(0, counter.load(Ordering::SeqCst));

        // Advance to the first deadline: exactly one item.
        let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
        assert!(first_deadline >= MonotonicDuration::from_seconds(5) + start);
        exec.set_fake_time(first_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Polling again without advancing time yields nothing more.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Advance to the second deadline: one more item.
        let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
        exec.set_fake_time(second_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(2, counter.load(Ordering::SeqCst));

        assert_eq!(second_deadline, first_deadline + MonotonicDuration::from_seconds(5));
    }

    /// With fake time, `MonotonicInstant::after` is relative to the fake clock.
    #[test]
    fn timer_fake_time() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(MonotonicDuration::from_seconds(1))));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));

        exec.set_fake_time(MonotonicInstant::after(MonotonicDuration::from_seconds(1)));
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
    }

    /// Creates one timer per entry of `nanos`, polls each once with a no-op waker (which is
    /// what registers a not-yet-due timer in the heap), and appends them to `timer_futures`.
    fn create_timers(
        timers: &Arc<Timers<MonotonicInstant>>,
        nanos: &[i64],
        timer_futures: &mut Vec<Pin<Box<Timer>>>,
    ) {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        for &n in nanos {
            let mut timer = Box::pin(timers.new_timer(MonotonicInstant::from_nanos(n)));
            let _ = timer.poll_unpin(&mut cx);
            timer_futures.push(timer);
        }
    }

    /// Exercises the heap through push, pop, removal on drop, and reset.
    #[test]
    fn timer_heap() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
        timers.register(EHandle::local().inner());

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (0..1000).collect();
        let mut rng = rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Timers inserted in random order must come out in deadline order.
        for i in 0..1000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timer_futures.clear();
        create_timers(&timers, &nanos, &mut timer_futures);

        // Drop a random half of the timers; dropping unregisters them from the heap.
        timer_futures.shuffle(&mut rng);
        timer_futures.truncate(500);
        let mut last_time = None;
        for _ in 0..500 {
            let time = timers.wake_next_timer().unwrap();
            if let Some(last_time) = last_time {
                assert!(last_time <= time);
            }
            last_time = Some(time);
        }
        assert_eq!(timers.wake_next_timer(), None);

        timer_futures = vec![];
        create_timers(&timers, &nanos, &mut timer_futures);

        // Reset every timer, in random order, to a new random deadline.
        timer_futures.shuffle(&mut rng);
        let mut nanos: Vec<_> = (1000..2000).collect();
        nanos.shuffle(&mut rng);

        for (fut, n) in timer_futures.iter_mut().zip(nanos) {
            fut.as_mut().reset(MonotonicInstant::from_nanos(n));
        }

        // The heap must now yield the new deadlines in order.
        for i in 1000..2000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timers.deregister();
    }

    /// The heap must keep its ordering when many timers share the same deadline.
    #[test]
    fn timer_heap_with_same_time() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
        timers.register(EHandle::local().inner());

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (1..100).collect();
        let mut rng = rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Add 100 timers that all share one randomly chosen deadline.
        let time = rng.random_range(0..101);
        let same_time = [time; 100];
        create_timers(&timers, &same_time, &mut timer_futures);

        nanos.extend(&same_time);
        nanos.sort();

        for n in nanos {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(n)));
        }

        timers.deregister();
    }

    /// Resetting a registered timer to an earlier deadline must make it fire at the earlier
    /// time. Real time is used, so the scenario is retried up to 100 times and passes as soon
    /// as one attempt finishes more than 1 ms before the original deadline.
    #[test]
    fn timer_reset_to_earlier_time() {
        let mut exec = LocalExecutor::default();

        for _ in 0..100 {
            let instant = MonotonicInstant::after(MonotonicDuration::from_millis(100));
            let (sender, receiver) = channel();
            let task = Task::spawn(async move {
                let mut timer = pin!(Timer::new(instant));
                let mut receiver = pin!(receiver.fuse());
                poll_fn(|cx| {
                    loop {
                        if timer.as_mut().poll_unpin(cx).is_ready() {
                            return Poll::Ready(());
                        }
                        // Once the message arrives, pull the deadline in to 1 ms from now and
                        // loop round to poll the timer again.
                        if !receiver.is_terminated() && receiver.poll_unpin(cx).is_ready() {
                            timer
                                .as_mut()
                                .reset(MonotonicInstant::after(MonotonicDuration::from_millis(1)));
                        } else {
                            return Poll::Pending;
                        }
                    }
                })
                .await;
            });
            sender.send(()).unwrap();

            exec.run_singlethreaded(task);

            if MonotonicInstant::after(MonotonicDuration::from_millis(1)) < instant {
                return;
            }
        }

        panic!("Timer fired late in all 100 attempts");
    }

    /// Repeatedly polls, resets and replaces a short timer on a two-thread executor.
    ///
    /// There are no assertions; NOTE(review): presumably this is a stress test for races
    /// between a timer firing and it being reset or dropped — confirm.
    #[test]
    fn test_reset() {
        SendExecutorBuilder::new().num_threads(2).build().run(async {
            // The sleep below is chosen around this delay so that firing and resetting overlap.
            const TIMER_DELAY: zx::MonotonicDuration = zx::Duration::from_micros(100);
            let mut timer = pin!(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            for _ in 0..10000 {
                let _ = futures::poll!(timer.as_mut());
                std::thread::sleep(std::time::Duration::from_micros(rand::random_range(80..120)));
                timer.as_mut().reset(MonotonicInstant::after(TIMER_DELAY));
                timer.set(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            }
        });
    }
}