fuchsia_async/runtime/fuchsia/
timer.rs1use super::executor::Executor;
11use crate::runtime::{BootInstant, EHandle, MonotonicInstant, WakeupTime};
12use crate::{PacketReceiver, ReceiverRegistration};
13use fuchsia_sync::Mutex;
14
15use futures::future::FusedFuture;
16use futures::stream::FusedStream;
17use futures::task::{AtomicWaker, Context};
18use futures::{FutureExt, Stream};
19use std::cell::UnsafeCell;
20use std::fmt;
21use std::future::Future;
22use std::marker::PhantomPinned;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::sync::atomic::{AtomicU8, Ordering};
26use std::task::{Poll, Waker, ready};
27
28pub trait TimeInterface:
29 Clone + Copy + fmt::Debug + PartialEq + PartialOrd + Ord + Send + Sync + 'static
30{
31 type Timeline: zx::Timeline + Send + Sync + 'static;
32
33 fn from_nanos(nanos: i64) -> Self;
34 fn into_nanos(self) -> i64;
35 fn zx_instant(nanos: i64) -> zx::Instant<Self::Timeline>;
36 fn now() -> i64;
37 fn create_timer() -> zx::Timer<Self::Timeline>;
38}
39
40impl TimeInterface for MonotonicInstant {
41 type Timeline = zx::MonotonicTimeline;
42
43 fn from_nanos(nanos: i64) -> Self {
44 Self::from_nanos(nanos)
45 }
46
47 fn into_nanos(self) -> i64 {
48 self.into_nanos()
49 }
50
51 fn zx_instant(nanos: i64) -> zx::MonotonicInstant {
52 zx::MonotonicInstant::from_nanos(nanos)
53 }
54
55 fn now() -> i64 {
56 EHandle::local().inner().now().into_nanos()
57 }
58
59 fn create_timer() -> zx::Timer<Self::Timeline> {
60 zx::Timer::<Self::Timeline>::create()
61 }
62}
63
64impl TimeInterface for BootInstant {
65 type Timeline = zx::BootTimeline;
66
67 fn from_nanos(nanos: i64) -> Self {
68 Self::from_nanos(nanos)
69 }
70
71 fn into_nanos(self) -> i64 {
72 self.into_nanos()
73 }
74
75 fn zx_instant(nanos: i64) -> zx::BootInstant {
76 zx::BootInstant::from_nanos(nanos)
77 }
78
79 fn now() -> i64 {
80 EHandle::local().inner().boot_now().into_nanos()
81 }
82
83 fn create_timer() -> zx::Timer<Self::Timeline> {
84 zx::Timer::<Self::Timeline>::create()
85 }
86}
87
88impl WakeupTime for std::time::Instant {
89 fn into_timer(self) -> Timer {
90 let now_as_instant = std::time::Instant::now();
91 let now_as_time = MonotonicInstant::now();
92 EHandle::local()
93 .mono_timers()
94 .new_timer(now_as_time + self.saturating_duration_since(now_as_instant).into())
95 }
96}
97
98impl WakeupTime for MonotonicInstant {
99 fn into_timer(self) -> Timer {
100 EHandle::local().mono_timers().new_timer(self)
101 }
102}
103
104impl WakeupTime for BootInstant {
105 fn into_timer(self) -> Timer {
106 EHandle::local().boot_timers().new_timer(self)
107 }
108}
109
110impl WakeupTime for zx::MonotonicInstant {
111 fn into_timer(self) -> Timer {
112 EHandle::local().mono_timers().new_timer(self.into())
113 }
114}
115
116impl WakeupTime for zx::BootInstant {
117 fn into_timer(self) -> Timer {
118 EHandle::local().boot_timers().new_timer(self.into())
119 }
120}
121
/// A future that completes once its deadline has been reached.
///
/// Build one with [`Timer::new`] from any [`WakeupTime`]. The timer only registers itself with
/// its timer set the first time it is polled with a deadline that is still in the future, and it
/// unregisters itself when dropped.
#[must_use = "futures do nothing unless polled"]
pub struct Timer(TimerState);
125
126impl Timer {
127 pub fn new(time: impl WakeupTime) -> Self {
129 time.into_timer()
130 }
131
132 pub fn reset(self: Pin<&mut Self>, time: MonotonicInstant) {
134 let nanos = time.into_nanos();
135 if self.0.state.load(Ordering::Relaxed) == UNREGISTERED
146 || !self.0.timers.try_reset_timer(&self.0, nanos)
147 {
148 unsafe { *self.0.nanos.get() = nanos };
151 self.0.state.store(UNREGISTERED, Ordering::Relaxed);
152 }
153 }
154}
155
156impl fmt::Debug for Timer {
157 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
158 f.debug_struct("Timer").field("time", &self.0.nanos).finish()
159 }
160}
161
162impl Drop for Timer {
163 fn drop(&mut self) {
164 self.0.timers.unregister(&self.0);
165 }
166}
167
168impl Future for Timer {
169 type Output = ();
170 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171 unsafe { self.0.timers.poll(self.as_ref(), cx) }
173 }
174}
175
/// The state behind a [`Timer`]. While the timer is registered, the owning timer set's heap
/// holds a raw pointer to this value (see `StateRef`).
struct TimerState {
    /// The timer set this timer belongs to, type-erased over the timeline.
    timers: Arc<dyn TimersInterface>,

    /// The deadline in nanoseconds. Written by `Timer::reset` and by `Heap::reset`, read by the
    /// heap when ordering timers.
    nanos: UnsafeCell<i64>,

    /// The waker registered by the most recent pending poll; taken when the timer fires.
    waker: AtomicWaker,
    /// One of `UNREGISTERED`, `REGISTERED`, `FIRED` or `TERMINATED`.
    state: AtomicU8,

    /// Position of this timer in the heap, or `HeapIndex::NULL` when it is not in the heap.
    /// Maintained by `Heap` through `StateRef::set_index`.
    index: UnsafeCell<HeapIndex>,

    /// Makes the type `!Unpin`, since the heap stores the address of this state.
    _pinned: PhantomPinned,
}

// SAFETY: NOTE(review): the `UnsafeCell` fields would otherwise make this type `!Sync`.
// Soundness presumably relies on `nanos` and `index` only being touched with the timer set's
// lock held or while the timer is not in the heap — confirm.
unsafe impl Send for TimerState {}
unsafe impl Sync for TimerState {}
196
/// The timer is not in the heap: it is freshly created, was reset while out of the heap, or was
/// removed by `unregister`.
const UNREGISTERED: u8 = 0;

/// The timer has been pushed onto the heap and is waiting for its deadline.
const REGISTERED: u8 = 1;

/// The timer's deadline was reached (set by `StateRef::into_waker` when it is popped from the
/// heap) but a poll has not yet observed it.
const FIRED: u8 = 2;

/// A poll has observed the firing and returned `Poll::Ready`.
const TERMINATED: u8 = 3;
208
/// Position of a timer inside the `Heap`'s backing vector, with a reserved value meaning "not in
/// the heap".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HeapIndex(usize);
212
213impl HeapIndex {
214 const NULL: HeapIndex = HeapIndex(usize::MAX);
215
216 fn get(&self) -> Option<usize> {
217 if *self == HeapIndex::NULL { None } else { Some(self.0) }
218 }
219}
220
221impl From<usize> for HeapIndex {
222 fn from(value: usize) -> Self {
223 Self(value)
224 }
225}
226
227impl FusedFuture for Timer {
228 fn is_terminated(&self) -> bool {
229 self.0.state.load(Ordering::Relaxed) == TERMINATED
230 }
231}
232
233#[derive(Copy, Clone, Debug)]
244struct StateRef(*const TimerState);
245
246unsafe impl Send for StateRef {}
248unsafe impl Sync for StateRef {}
249
250impl StateRef {
251 fn into_waker(self, _inner: &mut Inner) -> Option<Waker> {
252 unsafe {
254 (*self.0).state.store(FIRED, Ordering::Relaxed);
261 (*self.0).waker.take()
262 }
263 }
264
265 unsafe fn nanos(&self) -> i64 {
269 unsafe { *(*self.0).nanos.get() }
270 }
271
272 unsafe fn nanos_mut(&mut self) -> &mut i64 {
276 unsafe { &mut *(*self.0).nanos.get() }
277 }
278
279 unsafe fn set_index(&mut self, index: HeapIndex) -> HeapIndex {
283 std::mem::replace(unsafe { &mut *(*self.0).index.get() }, index)
284 }
285}
286
/// A stream that yields `()` every `duration`, driven by a single re-armed [`Timer`] on the
/// monotonic timeline.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Interval {
    /// The timer waiting for the next tick; reset after every tick.
    timer: Pin<Box<Timer>>,
    /// The deadline `timer` is currently armed with.
    next: MonotonicInstant,
    /// The spacing between consecutive deadlines.
    duration: zx::MonotonicDuration,
}
300
301impl Interval {
302 pub fn new(duration: zx::MonotonicDuration) -> Self {
304 let next = MonotonicInstant::after(duration);
305 Interval { timer: Box::pin(Timer::new(next)), next, duration }
306 }
307}
308
/// An interval never terminates: `poll_next` only ever returns `Pending` or `Ready(Some(()))`.
impl FusedStream for Interval {
    fn is_terminated(&self) -> bool {
        false
    }
}
315
316impl Stream for Interval {
317 type Item = ();
318 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
319 ready!(self.timer.poll_unpin(cx));
320 let next = self.next + self.duration;
321 self.timer.as_mut().reset(next);
322 self.next = next;
323 Poll::Ready(Some(()))
324 }
325}
326
/// The set of pending timers for one timeline `T`, backed by a single Zircon timer object that
/// is kept programmed with the earliest deadline.
pub(crate) struct Timers<T: TimeInterface> {
    /// The heap of registered timers plus bookkeeping for the Zircon timer; see [`Inner`].
    inner: Mutex<Inner>,

    /// Whether fake time is in use. When set, the port wait observes `USER_0` (raised by
    /// `maybe_notify`) instead of `TIMER_SIGNALED`.
    fake: bool,

    /// The Zircon timer that is set to, or cancelled for, the earliest deadline in the heap.
    timer: zx::Timer<T::Timeline>,

    /// The port registration created by `register` and dropped by `deregister`.
    receiver_registration: Mutex<Option<ReceiverRegistration<Arc<Self>>>>,
}
337
/// The lock-protected part of [`Timers`].
struct Inner {
    /// Min-heap of registered timers, ordered by deadline.
    timers: Heap,

    /// The deadline most recently programmed into the Zircon timer, or `None` if the timer is
    /// cancelled or was never set.
    last_deadline: Option<i64>,

    /// True while an async wait on the Zircon timer is outstanding; cleared when its packet is
    /// received.
    async_wait: bool,

    /// The port key used for async waits, taken from the receiver registration.
    port_key: u64,
}
352
353impl<T: TimeInterface> Timers<T> {
354 pub fn new(fake: bool) -> Self {
355 Self {
356 inner: Mutex::new(Inner {
357 timers: Heap::default(),
358 last_deadline: None,
359 async_wait: false,
360 port_key: 0,
361 }),
362 fake,
363 timer: T::create_timer(),
364 receiver_registration: Mutex::default(),
365 }
366 }
367
368 pub fn new_timer(self: &Arc<Self>, time: T) -> Timer {
369 let nanos = time.into_nanos();
370 Timer(TimerState {
371 timers: self.clone(),
372 nanos: UnsafeCell::new(nanos),
373 waker: AtomicWaker::new(),
374 state: AtomicU8::new(UNREGISTERED),
375 index: UnsafeCell::new(HeapIndex::NULL),
376 _pinned: PhantomPinned,
377 })
378 }
379
380 pub fn register(self: &Arc<Self>, executor: &Arc<Executor>) {
383 let key = self
384 .receiver_registration
385 .lock()
386 .get_or_insert_with(|| executor.receivers.register(executor.clone(), self.clone()))
387 .key();
388 self.inner.lock().port_key = key;
389 }
390
391 pub fn deregister(&self) {
393 *self.receiver_registration.lock() = None;
394 }
395
396 fn setup_zircon_timer(&self, inner: &mut Inner, from_receive_packet: bool) {
403 let new_deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
406
407 if new_deadline != inner.last_deadline {
410 inner.last_deadline = new_deadline;
411 match inner.last_deadline {
412 Some(deadline) => {
413 self.timer.set(T::zx_instant(deadline), zx::Duration::ZERO).unwrap()
414 }
415 None => self.timer.cancel().unwrap(),
416 }
417 }
418
419 if from_receive_packet {
423 inner.async_wait = false;
424 }
425
426 if inner.last_deadline.is_some() && !inner.async_wait {
429 if self.fake {
430 self.timer.signal(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
433 }
434
435 self.timer
436 .wait_async(
437 EHandle::local().port(),
438 inner.port_key,
439 if self.fake { zx::Signals::USER_0 } else { zx::Signals::TIMER_SIGNALED },
440 zx::WaitAsyncOpts::empty(),
441 )
442 .unwrap();
443
444 inner.async_wait = true;
445 }
446 }
447
448 pub fn wake_timers(&self) -> bool {
450 self.wake_timers_impl(false)
451 }
452
453 fn wake_timers_impl(&self, from_receive_packet: bool) -> bool {
454 let now = T::now();
455 let mut timers_woken = false;
456
457 loop {
458 let waker = {
459 let mut inner = self.inner.lock();
460
461 let deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
463 if deadline.is_some_and(|d| d <= now) {
464 let timer = inner.timers.pop().unwrap();
465 timer.into_waker(&mut inner)
466 } else {
467 self.setup_zircon_timer(&mut inner, from_receive_packet);
480 break;
481 }
482 };
483 if let Some(waker) = waker {
484 waker.wake()
485 }
486 timers_woken = true;
487 }
488 timers_woken
489 }
490
491 pub fn wake_next_timer(&self) -> Option<T> {
493 let (nanos, waker) = {
494 let mut inner = self.inner.lock();
495 let timer = inner.timers.pop()?;
496 let nanos = unsafe { timer.nanos() };
498 (nanos, timer.into_waker(&mut inner))
499 };
500 if let Some(waker) = waker {
501 waker.wake();
502 }
503 Some(T::from_nanos(nanos))
504 }
505
506 pub fn next_timer(&self) -> Option<T> {
508 self.inner.lock().timers.peek().map(|state| T::from_nanos(unsafe { state.nanos() }))
510 }
511
512 pub fn maybe_notify(&self, now: T) {
518 assert!(self.fake, "calling this function requires using fake time.");
519 if self
521 .inner
522 .lock()
523 .timers
524 .peek()
525 .is_some_and(|state| unsafe { state.nanos() } <= now.into_nanos())
526 {
527 self.timer.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
528 }
529 }
530}
531
532impl<T: TimeInterface> PacketReceiver for Timers<T> {
533 fn receive_packet(&self, _packet: zx::Packet) {
534 self.wake_timers_impl(true);
535 }
536}
537
/// Timeline-erased operations that a [`Timer`] needs from the timer set that created it.
trait TimersInterface: Send + Sync + 'static {
    /// Polls `timer`, registering it in the heap if necessary.
    ///
    /// # Safety
    ///
    /// NOTE(review): the implementation stores a raw pointer to the timer's state, so the timer
    /// presumably must be unregistered before it is dropped — confirm.
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()>;
    /// Removes the timer from the heap if it is present.
    fn unregister(&self, state: &TimerState);
    /// Updates the deadline of a timer that is in the heap. Returns false, leaving the timer
    /// untouched, if it is not in the heap.
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool;
}
544
545impl<T: TimeInterface> TimersInterface for Timers<T> {
546 unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()> {
550 let state = timer.0.state.load(Ordering::Relaxed);
561
562 if state == TERMINATED {
563 return Poll::Ready(());
564 }
565
566 if state == FIRED {
567 timer.0.state.store(TERMINATED, Ordering::Relaxed);
568 return Poll::Ready(());
569 }
570
571 if state == UNREGISTERED {
572 let nanos = unsafe { *timer.0.nanos.get() };
574 if nanos <= T::now() {
575 timer.0.state.store(FIRED, Ordering::Relaxed);
576 return Poll::Ready(());
577 }
578 let mut inner = self.inner.lock();
579
580 inner.timers.push(StateRef(&timer.0));
583
584 self.setup_zircon_timer(&mut inner, false);
587
588 timer.0.state.store(REGISTERED, Ordering::Relaxed);
589 }
590
591 timer.0.waker.register(cx.waker());
592
593 let state = timer.0.state.load(Ordering::Relaxed);
601 match state {
602 FIRED => {
603 timer.0.state.store(TERMINATED, Ordering::Relaxed);
604 Poll::Ready(())
605 }
606 REGISTERED => Poll::Pending,
607 _ => {
611 unreachable!();
612 }
613 }
614 }
615
616 fn unregister(&self, timer: &TimerState) {
617 if timer.state.load(Ordering::Relaxed) == UNREGISTERED {
618 return;
625 }
626 let mut inner = self.inner.lock();
627 let index = unsafe { *timer.index.get() };
629 if let Some(index) = index.get() {
630 inner.timers.remove(index);
631 if index == 0 {
632 self.setup_zircon_timer(&mut inner, false);
635 }
636 timer.state.store(UNREGISTERED, Ordering::Relaxed);
637 }
638 }
639
640 fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool {
642 let mut inner = self.inner.lock();
643 let index = unsafe { *timer.index.get() };
645 if let Some(old_index) = index.get() {
646 if (inner.timers.reset(old_index, nanos) == 0) || (old_index == 0) {
647 self.setup_zircon_timer(&mut inner, false);
650 }
651 timer.state.store(REGISTERED, Ordering::Relaxed);
652 true
653 } else {
654 false
655 }
656 }
657}
658
/// A binary min-heap of timers ordered by deadline. Each timer records its own position (see
/// `StateRef::set_index`) so it can be removed or reset by index.
#[derive(Default)]
struct Heap(Vec<StateRef>);
661
662impl Heap {
665 fn push(&mut self, mut timer: StateRef) {
666 let index = self.0.len();
667 self.0.push(timer);
668 unsafe {
670 timer.set_index(index.into());
671 }
672 self.fix_up(index);
673 }
674
675 fn peek(&self) -> Option<&StateRef> {
676 self.0.first()
677 }
678
679 fn pop(&mut self) -> Option<StateRef> {
680 if let Some(&first) = self.0.first() {
681 self.remove(0);
682 Some(first)
683 } else {
684 None
685 }
686 }
687
688 fn swap(&mut self, a: usize, b: usize) {
689 self.0.swap(a, b);
690 unsafe {
692 self.0[a].set_index(a.into());
693 self.0[b].set_index(b.into());
694 }
695 }
696
697 fn reset(&mut self, index: usize, nanos: i64) -> usize {
699 if nanos < std::mem::replace(unsafe { self.0[index].nanos_mut() }, nanos) {
701 self.fix_up(index)
702 } else {
703 self.fix_down(index)
704 }
705 }
706
707 fn remove(&mut self, index: usize) {
708 unsafe {
710 let old_index = self.0[index].set_index(HeapIndex::NULL);
711 debug_assert_eq!(old_index, index.into());
712 }
713
714 let last = self.0.len() - 1;
717 if index < last {
718 let fix_up;
719 unsafe {
720 fix_up = self.0[last].nanos() < self.0[index].nanos();
722 self.0[index] = self.0[last];
723 self.0[index].set_index(index.into());
724 };
725 self.0.truncate(last);
726 if fix_up {
727 self.fix_up(index);
728 } else {
729 self.fix_down(index);
730 }
731 } else {
732 self.0.truncate(last);
733 };
734 }
735
736 fn fix_up(&mut self, mut index: usize) -> usize {
738 while index > 0 {
739 let parent = (index - 1) / 2;
740 if unsafe { self.0[parent].nanos() <= self.0[index].nanos() } {
742 return index;
743 }
744 self.swap(parent, index);
745 index = parent;
746 }
747 index
748 }
749
750 fn fix_down(&mut self, mut index: usize) -> usize {
752 let len = self.0.len();
753 loop {
754 let left = index * 2 + 1;
755 if left >= len {
756 return index;
757 }
758
759 let mut swap_with = None;
760
761 unsafe {
763 let mut nanos = self.0[index].nanos();
764 let left_nanos = self.0[left].nanos();
765 if left_nanos < nanos {
766 swap_with = Some(left);
767 nanos = left_nanos;
768 }
769 let right = left + 1;
770 if right < len && self.0[right].nanos() < nanos {
771 swap_with = Some(right);
772 }
773 }
774
775 let Some(swap_with) = swap_with else { return index };
776 self.swap(index, swap_with);
777 index = swap_with;
778 }
779 }
780}
781
#[cfg(test)]
mod test {
    use super::*;
    use crate::{LocalExecutor, SendExecutorBuilder, Task, TestExecutor};
    use assert_matches::assert_matches;
    use futures::channel::oneshot::channel;
    use futures::future::Either;
    use futures::prelude::*;
    use rand::seq::SliceRandom;
    use rand::{Rng, rng};
    use std::future::poll_fn;
    use std::pin::pin;
    use zx::MonotonicDuration;

    /// Extra operations the generic tests need on top of `TimeInterface`, so that each test can
    /// run against both the monotonic and the boot timeline.
    trait TestTimeInterface:
        TimeInterface
        + WakeupTime
        + std::ops::Sub<zx::Duration<Self::Timeline>, Output = Self>
        + std::ops::Add<zx::Duration<Self::Timeline>, Output = Self>
    {
        /// Returns the instant `duration` from now.
        fn after(duration: zx::Duration<Self::Timeline>) -> Self;
    }

    impl TestTimeInterface for MonotonicInstant {
        fn after(duration: zx::MonotonicDuration) -> Self {
            Self::after(duration)
        }
    }

    impl TestTimeInterface for BootInstant {
        fn after(duration: zx::BootDuration) -> Self {
            Self::after(duration)
        }
    }

    // Races a 100ms timer against a 1s timer on a single-threaded executor; the shorter one must
    // win.
    fn test_shorter_fires_first<T: TestTimeInterface>() {
        let mut exec = LocalExecutor::default();
        let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
        let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left(_) => {}
            Either::Right(_) => panic!("wrong timer fired"),
        }
    }

    #[test]
    fn shorter_fires_first() {
        test_shorter_fires_first::<MonotonicInstant>();
        test_shorter_fires_first::<BootInstant>();
    }

    // Same race as above, but on a four-thread executor.
    fn test_shorter_fires_first_multithreaded<T: TestTimeInterface>() {
        SendExecutorBuilder::new().num_threads(4).build().run(async {
            let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
            let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
            match future::select(shorter, longer).await {
                Either::Left(_) => {}
                Either::Right(_) => panic!("wrong timer fired"),
            }
        });
    }

    #[test]
    fn shorter_fires_first_multithreaded() {
        test_shorter_fires_first_multithreaded::<MonotonicInstant>();
        test_shorter_fires_first_multithreaded::<BootInstant>();
    }

    // A timer whose deadline is already in the past must complete ahead of one whose deadline is
    // just after `now`.
    fn test_timer_before_now_fires_immediately<T: TestTimeInterface>() {
        let mut exec = TestExecutor::new();
        let now = T::now();
        let before = pin!(Timer::new(T::from_nanos(now - 1)));
        let after = pin!(Timer::new(T::from_nanos(now + 1)));
        assert_matches!(
            exec.run_singlethreaded(futures::future::select(before, after)),
            Either::Left(_),
            "Timer in the past should fire first"
        );
    }

    #[test]
    fn timer_before_now_fires_immediately() {
        test_timer_before_now_fires_immediately::<MonotonicInstant>();
        test_timer_before_now_fires_immediately::<BootInstant>();
    }

    // With fake time, the timer stays pending until fake time is advanced to its deadline.
    #[test]
    fn fires_after_timeout() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));
        let deadline = MonotonicInstant::after(MonotonicDuration::from_seconds(5));
        let mut future = pin!(Timer::new(deadline));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        exec.set_fake_time(deadline);
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
    }

    // Drives an `Interval` with fake time and checks that it ticks exactly once per deadline and
    // that consecutive deadlines are one period apart.
    #[test]
    fn interval() {
        let mut exec = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        exec.set_fake_time(start);

        let counter = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
        let mut future = pin!({
            let counter = counter.clone();
            Interval::new(MonotonicDuration::from_seconds(5))
                .map(move |()| {
                    counter.fetch_add(1, Ordering::SeqCst);
                })
                .collect::<()>()
        });

        // Nothing has ticked before the first deadline.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(0, counter.load(Ordering::SeqCst));

        // Advance to the first deadline: exactly one tick.
        let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
        assert!(first_deadline >= MonotonicDuration::from_seconds(5) + start);
        exec.set_fake_time(first_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Polling again without advancing time must not produce another tick.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Advance to the second deadline: a second tick.
        let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
        exec.set_fake_time(second_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(2, counter.load(Ordering::SeqCst));

        assert_eq!(second_deadline, first_deadline + MonotonicDuration::from_seconds(5));
    }

    // `MonotonicInstant::after` is expected to follow fake time here, so the timer fires once
    // fake time is moved forward by its delay.
    #[test]
    fn timer_fake_time() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(MonotonicDuration::from_seconds(1))));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));

        exec.set_fake_time(MonotonicInstant::after(MonotonicDuration::from_seconds(1)));
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
    }

    // Creates one timer per entry of `nanos` and polls each once with a no-op waker, so that
    // timers with a future deadline get registered in the heap. The timers are appended to
    // `timer_futures` to keep them alive.
    fn create_timers(
        timers: &Arc<Timers<MonotonicInstant>>,
        nanos: &[i64],
        timer_futures: &mut Vec<Pin<Box<Timer>>>,
    ) {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        for &n in nanos {
            let mut timer = Box::pin(timers.new_timer(MonotonicInstant::from_nanos(n)));
            let _ = timer.poll_unpin(&mut cx);
            timer_futures.push(timer);
        }
    }

    // Exercises heap ordering, removal on drop, and in-place reset.
    #[test]
    fn timer_heap() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
        timers.register(EHandle::local().inner());

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (0..1000).collect();
        let mut rng = rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Timers inserted in random order must come out in deadline order.
        for i in 0..1000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timer_futures.clear();
        create_timers(&timers, &nanos, &mut timer_futures);

        // Drop a random half of the timers; the remaining ones must still come out in order and
        // the dropped ones must be gone from the heap.
        timer_futures.shuffle(&mut rng);
        timer_futures.truncate(500);
        let mut last_time = None;
        for _ in 0..500 {
            let time = timers.wake_next_timer().unwrap();
            if let Some(last_time) = last_time {
                assert!(last_time <= time);
            }
            last_time = Some(time);
        }
        assert_eq!(timers.wake_next_timer(), None);

        timer_futures = vec![];
        create_timers(&timers, &nanos, &mut timer_futures);

        // Reset every timer to a distinct deadline in 1000..2000, in random order.
        timer_futures.shuffle(&mut rng);
        let mut nanos: Vec<_> = (1000..2000).collect();
        nanos.shuffle(&mut rng);

        for (fut, n) in timer_futures.iter_mut().zip(nanos) {
            fut.as_mut().reset(MonotonicInstant::from_nanos(n));
        }

        // The reset deadlines must come out in order.
        for i in 1000..2000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timers.deregister();
    }

    // Checks heap ordering when many timers share the same deadline.
    #[test]
    fn timer_heap_with_same_time() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
        timers.register(EHandle::local().inner());

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (1..100).collect();
        let mut rng = rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Add 100 timers that all share one randomly chosen deadline.
        let time = rng.random_range(0..101);
        let same_time = [time; 100];
        create_timers(&timers, &same_time, &mut timer_futures);

        nanos.extend(&same_time);
        nanos.sort();

        for n in nanos {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(n)));
        }

        timers.deregister();
    }

    // Resets a registered 100ms timer to 1ms and checks that it fires at the earlier deadline.
    // The check depends on real time, so it is retried up to 100 times.
    #[test]
    fn timer_reset_to_earlier_time() {
        let mut exec = LocalExecutor::default();

        for _ in 0..100 {
            let instant = MonotonicInstant::after(MonotonicDuration::from_millis(100));
            let (sender, receiver) = channel();
            let task = Task::spawn(async move {
                let mut timer = pin!(Timer::new(instant));
                let mut receiver = pin!(receiver.fuse());
                poll_fn(|cx| {
                    loop {
                        if timer.as_mut().poll_unpin(cx).is_ready() {
                            return Poll::Ready(());
                        }
                        // Once the message arrives, move the deadline earlier and poll again.
                        if !receiver.is_terminated() && receiver.poll_unpin(cx).is_ready() {
                            timer
                                .as_mut()
                                .reset(MonotonicInstant::after(MonotonicDuration::from_millis(1)));
                        } else {
                            return Poll::Pending;
                        }
                    }
                })
                .await;
            });
            sender.send(()).unwrap();

            exec.run_singlethreaded(task);

            // Success if the task finished well before the original deadline.
            if MonotonicInstant::after(MonotonicDuration::from_millis(1)) < instant {
                return;
            }
        }

        panic!("Timer fired late in all 100 attempts");
    }

    // Stress test: on a two-thread executor, repeatedly polls, resets and replaces a timer whose
    // delay is close to the sleep between iterations.
    // NOTE(review): presumably a regression test for a race between reset and firing — confirm.
    #[test]
    fn test_reset() {
        SendExecutorBuilder::new().num_threads(2).build().run(async {
            const TIMER_DELAY: zx::MonotonicDuration = zx::Duration::from_micros(100);
            let mut timer = pin!(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            for _ in 0..10000 {
                let _ = futures::poll!(timer.as_mut());
                std::thread::sleep(std::time::Duration::from_micros(rand::random_range(80..120)));
                timer.as_mut().reset(MonotonicInstant::after(TIMER_DELAY));
                timer.set(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            }
        });
    }
}