fuchsia_async/runtime/fuchsia/timer.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Support for creating futures that represent timers.
//!
//! This module contains the `Timer` type which is a future that will resolve
//! at a particular point in the future.
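//!
//! A minimal usage sketch (assumes an executor such as `LocalExecutor` is
//! available to drive the future):
//!
//! ```no_run
//! use fuchsia_async::{LocalExecutor, MonotonicInstant, Timer};
//!
//! let mut executor = LocalExecutor::new();
//! executor.run_singlethreaded(async {
//!     // Resolves once the monotonic clock reaches the deadline.
//!     Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(1))).await;
//! });
//! ```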

use super::executor::Executor;
use crate::runtime::{BootInstant, EHandle, MonotonicInstant, WakeupTime};
use crate::{PacketReceiver, ReceiverRegistration};
use fuchsia_sync::Mutex;

use futures::future::FusedFuture;
use futures::stream::FusedStream;
use futures::task::{AtomicWaker, Context};
use futures::{FutureExt, Stream};
use std::cell::UnsafeCell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};
use std::task::{Poll, Waker, ready};

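/// Abstraction over the timeline (monotonic or boot) that a set of timers runs against,
/// allowing `Timers` to be instantiated once per timeline.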
pub trait TimeInterface:
    Clone + Copy + fmt::Debug + PartialEq + PartialOrd + Ord + Send + Sync + 'static
{
    type Timeline: zx::Timeline + Send + Sync + 'static;

    fn from_nanos(nanos: i64) -> Self;
    fn into_nanos(self) -> i64;
    fn zx_instant(nanos: i64) -> zx::Instant<Self::Timeline>;
    fn now() -> i64;
    fn create_timer() -> zx::Timer<Self::Timeline>;
}

impl TimeInterface for MonotonicInstant {
    type Timeline = zx::MonotonicTimeline;

    fn from_nanos(nanos: i64) -> Self {
        Self::from_nanos(nanos)
    }

    fn into_nanos(self) -> i64 {
        self.into_nanos()
    }

    fn zx_instant(nanos: i64) -> zx::MonotonicInstant {
        zx::MonotonicInstant::from_nanos(nanos)
    }

    fn now() -> i64 {
        EHandle::local().inner().now().into_nanos()
    }

    fn create_timer() -> zx::Timer<Self::Timeline> {
        zx::Timer::<Self::Timeline>::create()
    }
}

impl TimeInterface for BootInstant {
    type Timeline = zx::BootTimeline;

    fn from_nanos(nanos: i64) -> Self {
        Self::from_nanos(nanos)
    }

    fn into_nanos(self) -> i64 {
        self.into_nanos()
    }

    fn zx_instant(nanos: i64) -> zx::BootInstant {
        zx::BootInstant::from_nanos(nanos)
    }

    fn now() -> i64 {
        EHandle::local().inner().boot_now().into_nanos()
    }

    fn create_timer() -> zx::Timer<Self::Timeline> {
        zx::Timer::<Self::Timeline>::create()
    }
}

impl WakeupTime for std::time::Instant {
    fn into_timer(self) -> Timer {
        let now_as_instant = std::time::Instant::now();
        let now_as_time = MonotonicInstant::now();
        EHandle::local()
            .mono_timers()
            .new_timer(now_as_time + self.saturating_duration_since(now_as_instant).into())
    }
}

impl WakeupTime for MonotonicInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().mono_timers().new_timer(self)
    }
}

impl WakeupTime for BootInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().boot_timers().new_timer(self)
    }
}

impl WakeupTime for zx::MonotonicInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().mono_timers().new_timer(self.into())
    }
}

impl WakeupTime for zx::BootInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().boot_timers().new_timer(self.into())
    }
}

/// An asynchronous timer.
#[must_use = "futures do nothing unless polled"]
pub struct Timer(TimerState);

impl Timer {
    /// Create a new timer scheduled to fire at `time`.
    pub fn new(time: impl WakeupTime) -> Self {
        time.into_timer()
    }

    /// Reset the `Timer` to fire at a new time.
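    ///
    /// A usage sketch (the timer must be pinned to call `reset`; the durations are
    /// illustrative):
    ///
    /// ```no_run
    /// use fuchsia_async::{MonotonicInstant, Timer};
    /// use std::pin::pin;
    ///
    /// # async fn example() {
    /// let mut timer = pin!(Timer::new(MonotonicInstant::after(
    ///     zx::MonotonicDuration::from_seconds(1),
    /// )));
    /// // Push the deadline out before the timer fires.
    /// timer.as_mut().reset(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(2)));
    /// timer.await;
    /// # }
    /// ```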
    pub fn reset(self: Pin<&mut Self>, time: MonotonicInstant) {
        let nanos = time.into_nanos();
        // If in the UNREGISTERED state, we can skip the call to `try_reset_timer` because the timer
        // has *never* been registered and so there's no danger of another thread having access to
        // this timer. In all other states, including the FIRED and TERMINATED states, we must use
        // `try_reset_timer` as that will take a lock and guarantee that other threads are not
        // concurrently accessing the timer.
        //
        // This can be Relaxed because there are no loads or stores that follow that could possibly
        // be reordered before here that matter: the first thing `try_reset_timer` does is take a
        // lock which will have its own memory barriers, and the store to the time is next going to
        // be read by this same task prior to taking the lock in `Timers::inner`.
        if self.0.state.load(Ordering::Relaxed) == UNREGISTERED
            || !self.0.timers.try_reset_timer(&self.0, nanos)
        {
            // SAFETY: This is safe because we know the timer isn't registered which means we truly
            // have exclusive access to TimerState.
            unsafe { *self.0.nanos.get() = nanos };
            self.0.state.store(UNREGISTERED, Ordering::Relaxed);
        }
    }
}

impl fmt::Debug for Timer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("Timer").field("time", &self.0.nanos).finish()
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        self.0.timers.unregister(&self.0);
    }
}

impl Future for Timer {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: We call `unregister` when `Timer` is dropped.
        unsafe { self.0.timers.poll(self.as_ref(), cx) }
    }
}

struct TimerState {
    timers: Arc<dyn TimersInterface>,

    // This is safe to access/mutate if the lock on `Timers::inner` is held.
    nanos: UnsafeCell<i64>,

    waker: AtomicWaker,
    state: AtomicU8,

    // Holds the index in the heap.  This is safe to access/mutate if the lock on `Timers::inner` is
    // held.
    index: UnsafeCell<HeapIndex>,

    // StateRef stores a pointer to `TimerState`, so this must be pinned.
    _pinned: PhantomPinned,
}

// SAFETY: TimerState is thread-safe.  See the safety comments elsewhere.
unsafe impl Send for TimerState {}
unsafe impl Sync for TimerState {}

// Set when the timer is not registered in the heap.
const UNREGISTERED: u8 = 0;

// Set when the timer is in the heap.
const REGISTERED: u8 = 1;

// Set when the timer has fired.
const FIRED: u8 = 2;

// Set when the timer is terminated.
const TERMINATED: u8 = 3;
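
// Typical state transitions: a pending timer goes UNREGISTERED -> REGISTERED when first polled,
// REGISTERED -> FIRED when woken from the heap (or UNREGISTERED -> FIRED directly if the deadline
// has already passed when polled), and FIRED -> TERMINATED when the fired timer is polled again.
// `Timer::reset` returns a timer to the UNREGISTERED or REGISTERED state.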

/// An index in the heap.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct HeapIndex(usize);

impl HeapIndex {
    const NULL: HeapIndex = HeapIndex(usize::MAX);

    fn get(&self) -> Option<usize> {
        if *self == HeapIndex::NULL { None } else { Some(self.0) }
    }
}

impl From<usize> for HeapIndex {
    fn from(value: usize) -> Self {
        Self(value)
    }
}

impl FusedFuture for Timer {
    fn is_terminated(&self) -> bool {
        self.0.state.load(Ordering::Relaxed) == TERMINATED
    }
}

// A note on safety:
//
//  1. We remove the timer from the heap before we drop TimerState, and TimerState is pinned, so
//     it's safe to store pointers in the heap i.e. the pointers are live since we make sure we
//     remove them before dropping `TimerState`.
//
//  2. Provided we do #1, it is always safe to access the atomic fields of TimerState.
//
//  3. Once the timer has been registered, it is safe to access the non-atomic fields of TimerState
//     whilst holding the lock on `Timers::inner`.
#[derive(Copy, Clone, Debug)]
struct StateRef(*const TimerState);

// SAFETY: See the notes above regarding safety.
unsafe impl Send for StateRef {}
unsafe impl Sync for StateRef {}

impl StateRef {
    fn into_waker(self, _inner: &mut Inner) -> Option<Waker> {
        // SAFETY: `inner` is locked.
        unsafe {
            // As soon as we set the state to FIRED, the heap no longer owns the timer and it might
            // be re-registered.  This store is safe to be Relaxed because `AtomicWaker::take` has a
            // Release barrier, so the store can't be reordered after it, and therefore we can be
            // certain that another thread which re-registers the waker will see the state is FIRED
            // (and will interpret that as meaning that the task should not block and instead
            // immediately complete; see `Timers::poll`).
            (*self.0).state.store(FIRED, Ordering::Relaxed);
            (*self.0).waker.take()
        }
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn nanos(&self) -> i64 {
        unsafe { *(*self.0).nanos.get() }
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn nanos_mut(&mut self) -> &mut i64 {
        unsafe { &mut *(*self.0).nanos.get() }
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn set_index(&mut self, index: HeapIndex) -> HeapIndex {
        std::mem::replace(unsafe { &mut *(*self.0).index.get() }, index)
    }
}

/// An asynchronous interval timer.
///
/// This is a stream of events resolving at a rate of once per interval.  This generates an event
/// for *every* elapsed duration, even if multiple have elapsed since last polled.
///
/// TODO(https://fxbug.dev/375632319): This lacks `BootInstant` support.
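///
/// A usage sketch (assumes a running executor; `futures::StreamExt` provides `next`):
///
/// ```no_run
/// use fuchsia_async::Interval;
/// use futures::StreamExt;
///
/// # async fn example() {
/// let mut ticks = Interval::new(zx::MonotonicDuration::from_seconds(1));
/// // Each `next()` yields `()` once the next interval has elapsed.
/// ticks.next().await;
/// # }
/// ```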
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Interval {
    timer: Pin<Box<Timer>>,
    next: MonotonicInstant,
    duration: zx::MonotonicDuration,
}

impl Interval {
    /// Create a new `Interval` which yields every `duration`.
    pub fn new(duration: zx::MonotonicDuration) -> Self {
        let next = MonotonicInstant::after(duration);
        Interval { timer: Box::pin(Timer::new(next)), next, duration }
    }
}

impl FusedStream for Interval {
    fn is_terminated(&self) -> bool {
        // `Interval` never yields `None`
        false
    }
}

impl Stream for Interval {
    type Item = ();
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        ready!(self.timer.poll_unpin(cx));
        let next = self.next + self.duration;
        self.timer.as_mut().reset(next);
        self.next = next;
        Poll::Ready(Some(()))
    }
}

pub(crate) struct Timers<T: TimeInterface> {
    inner: Mutex<Inner>,

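    // True when the executor uses fake time (e.g. `TestExecutor`).  Fake timers are triggered by
    // signalling `USER_0` on `timer` (see `maybe_notify`) rather than by the kernel timer firing.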
    fake: bool,

    timer: zx::Timer<T::Timeline>,

    // This will form a reference cycle which the caller *must* break by calling `deregister`.
    receiver_registration: Mutex<Option<ReceiverRegistration<Arc<Self>>>>,
}

struct Inner {
    // The queue of active timer objects.
    timers: Heap,

    // The last deadline we set on the zircon timer, or None if the queue was empty and the timer
    // was canceled most recently.
    last_deadline: Option<i64>,

    // True if there's a pending async_wait.
    async_wait: bool,

    // The port key.
    port_key: u64,
}

impl<T: TimeInterface> Timers<T> {
    pub fn new(fake: bool) -> Self {
        Self {
            inner: Mutex::new(Inner {
                timers: Heap::default(),
                last_deadline: None,
                async_wait: false,
                port_key: 0,
            }),
            fake,
            timer: T::create_timer(),
            receiver_registration: Mutex::default(),
        }
    }

    pub fn new_timer(self: &Arc<Self>, time: T) -> Timer {
        let nanos = time.into_nanos();
        Timer(TimerState {
            timers: self.clone(),
            nanos: UnsafeCell::new(nanos),
            waker: AtomicWaker::new(),
            state: AtomicU8::new(UNREGISTERED),
            index: UnsafeCell::new(HeapIndex::NULL),
            _pinned: PhantomPinned,
        })
    }

    /// Registers the timers to receive packets.  This will establish a reference cycle that
    /// the caller must break by calling `deregister`.
    pub fn register(self: &Arc<Self>, executor: &Arc<Executor>) {
        let key = self
            .receiver_registration
            .lock()
            .get_or_insert_with(|| executor.receivers.register(executor.clone(), self.clone()))
            .key();
        self.inner.lock().port_key = key;
    }

    /// Deregisters the timers and breaks the reference cycle.
    pub fn deregister(&self) {
        *self.receiver_registration.lock() = None;
    }

    /// Ensures the underlying Zircon timer has been correctly set or canceled after
    /// the queue has been updated.
    ///
    /// # Safety
    ///
    /// Callers must ensure that `self.inner` is locked before calling this method.
    fn setup_zircon_timer(&self, inner: &mut Inner, from_receive_packet: bool) {
        // Our new deadline is the deadline for the front of the queue, or no deadline (infinite) if
        // there is no front of the queue.
        let new_deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });

        // If the effective deadline of the queue has changed, reprogram the zircon timer's
        // deadline.
        if new_deadline != inner.last_deadline {
            inner.last_deadline = new_deadline;
            match inner.last_deadline {
                Some(deadline) => {
                    self.timer.set(T::zx_instant(deadline), zx::Duration::ZERO).unwrap()
                }
                None => self.timer.cancel().unwrap(),
            }
        }

        // If this is being called while processing the timer packet from a previous async wait,
        // then clear the async wait flag.  This is the very last thing we need to do, so this async
        // wait operation is effectively over.
        if from_receive_packet {
            inner.async_wait = false;
        }

        // If we have a valid timeout, but we have no in-flight async wait operation, post a new
        // one.
        if inner.last_deadline.is_some() && !inner.async_wait {
            if self.fake {
                // Clear the signal used for fake timers so that we can use it to trigger
                // next time.
                self.timer.signal(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
            }

            self.timer
                .wait_async(
                    EHandle::local().port(),
                    inner.port_key,
                    if self.fake { zx::Signals::USER_0 } else { zx::Signals::TIMER_SIGNALED },
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();

            inner.async_wait = true;
        }
    }

447
448    /// Wakes timers that should be firing now.  Returns true if any timers were woken.
449    pub fn wake_timers(&self) -> bool {
450        self.wake_timers_impl(false)
451    }
452
453    fn wake_timers_impl(&self, from_receive_packet: bool) -> bool {
454        let now = T::now();
455        let mut timers_woken = false;
456
457        loop {
458            let waker = {
459                let mut inner = self.inner.lock();
460
461                // SAFETY: `inner` is locked.
462                let deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
463                if deadline.is_some_and(|d| d <= now) {
464                    let timer = inner.timers.pop().unwrap();
465                    timer.into_waker(&mut inner)
466                } else {
                    // We are now finished (one way or the other). Set up the underlying Zircon
                    // timer to reflect the new state of the queue, and break out of the loop.
                    //
                    // When processing a timer packet (from_receive_packet is true), it is very
                    // important that this always be the last thing we do before breaking out of the
                    // loop (dropping the lock in the process) for good.
                    //
                    // Failing to do this at the end can lead to the timer queue stalling.  Doing it
                    // early (when we are dispatching expired timers) can lead to an ever
                    // multiplying army of posted async waits.
                    //
                    // See https://g-issues.fuchsia.dev/issues/396173066 for details.
                    self.setup_zircon_timer(&mut inner, from_receive_packet);
                    break;
                }
            };
            if let Some(waker) = waker {
                waker.wake()
            }
            timers_woken = true;
        }
        timers_woken
    }

    /// Wakes the next timer and returns its time.
    pub fn wake_next_timer(&self) -> Option<T> {
        let (nanos, waker) = {
            let mut inner = self.inner.lock();
            let timer = inner.timers.pop()?;
            // SAFETY: `inner` is locked.
            let nanos = unsafe { timer.nanos() };
            (nanos, timer.into_waker(&mut inner))
        };
        if let Some(waker) = waker {
            waker.wake();
        }
        Some(T::from_nanos(nanos))
    }

    /// Returns the next timer due to expire.
    pub fn next_timer(&self) -> Option<T> {
        // SAFETY: `inner` is locked.
        self.inner.lock().timers.peek().map(|state| T::from_nanos(unsafe { state.nanos() }))
    }

    /// If there's a timer ready, sends a notification to wake up the receiver.
    ///
    /// # Panics
    ///
    /// This will panic if we are not using fake time.
    pub fn maybe_notify(&self, now: T) {
        assert!(self.fake, "calling this function requires using fake time.");
        // SAFETY: `inner` is locked.
        if self
            .inner
            .lock()
            .timers
            .peek()
            .is_some_and(|state| unsafe { state.nanos() } <= now.into_nanos())
        {
            self.timer.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        }
    }
}

impl<T: TimeInterface> PacketReceiver for Timers<T> {
    fn receive_packet(&self, _packet: zx::Packet) {
        self.wake_timers_impl(true);
    }
}

// See comments on the implementation below.
trait TimersInterface: Send + Sync + 'static {
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()>;
    fn unregister(&self, state: &TimerState);
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool;
}

impl<T: TimeInterface> TimersInterface for Timers<T> {
    // # Safety
    //
    // `unregister` must be called before `Timer` is dropped.
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()> {
        // See https://docs.rs/futures/0.3.5/futures/task/struct.AtomicWaker.html
        // for more information.
        // Quick check to avoid registration if already done.
        //
        // This is safe to be Relaxed because `AtomicWaker::register` has barriers that prevent
        // the load further down from being moved before the waker is registered, so we can't miss
        // the timer firing.  If the timer isn't registered, the time might have been reset, but
        // that would have been done by this same task, so there should be no ordering issue there.
        // If we then try to register the timer, we take the lock on `inner`, so there will be
        // barriers there.
        let state = timer.0.state.load(Ordering::Relaxed);

        if state == TERMINATED {
            return Poll::Ready(());
        }

        if state == FIRED {
            timer.0.state.store(TERMINATED, Ordering::Relaxed);
            return Poll::Ready(());
        }

        if state == UNREGISTERED {
            // SAFETY: The state is UNREGISTERED, so we have exclusive access.
            let nanos = unsafe { *timer.0.nanos.get() };
            if nanos <= T::now() {
                timer.0.state.store(FIRED, Ordering::Relaxed);
                return Poll::Ready(());
            }
            let mut inner = self.inner.lock();

            // We store a pointer to `timer` here. This is safe to do because `timer` is pinned, and
            // we always make sure we call `unregister` before `timer` is dropped.
            inner.timers.push(StateRef(&timer.0));

            // Now that we have added a new timer to the queue, set up the
            // underlying zircon timer to reflect the new state of the queue.
            self.setup_zircon_timer(&mut inner, false);

            timer.0.state.store(REGISTERED, Ordering::Relaxed);
        }

        timer.0.waker.register(cx.waker());

        // Now that we've registered a waker, we need to check to see if the timer has been marked
        // as FIRED by another thread in the meantime (e.g. in StateRef::into_waker).  In that case
        // the timer is never going to fire again as it is no longer managed by the timer heap, so
        // the timer's task would become Pending but nothing would wake it up later.
        // Loading the state *must* happen after the above `AtomicWaker::register` (which
        // establishes an Acquire barrier, preventing the below load from being reordered above it),
        // or else we could racily hit the above scenario.
        let state = timer.0.state.load(Ordering::Relaxed);
        match state {
            FIRED => {
                timer.0.state.store(TERMINATED, Ordering::Relaxed);
                Poll::Ready(())
            }
            REGISTERED => Poll::Pending,
            // TERMINATED is only set in `poll` which has exclusive access to the task (&mut
            // Context).
            // UNREGISTERED would indicate a logic bug somewhere.
            _ => {
                unreachable!();
            }
        }
    }

    fn unregister(&self, timer: &TimerState) {
        if timer.state.load(Ordering::Relaxed) == UNREGISTERED {
            // If the timer was never registered, then we have exclusive access and we can skip the
            // rest of this (avoiding the lock on `inner`).
            // We cannot early-exit if the timer is FIRED or TERMINATED because then we could race
            // with another thread that is actively using the timer object, and if this call
            // completes before it blocks on `inner`, then the timer's resources could be
            // deallocated, which would result in a use-after-free on the other thread.
            return;
        }
        let mut inner = self.inner.lock();
        // SAFETY: `inner` is locked.
        let index = unsafe { *timer.index.get() };
        if let Some(index) = index.get() {
            inner.timers.remove(index);
            if index == 0 {
                // The front of the queue just changed.  Make sure to update the underlying zircon
                // timer state to match the new queue state.
                self.setup_zircon_timer(&mut inner, false);
            }
            timer.state.store(UNREGISTERED, Ordering::Relaxed);
        }
    }

    /// Returns true if the timer was successfully reset.
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool {
        let mut inner = self.inner.lock();
        // SAFETY: `inner` is locked.
        let index = unsafe { *timer.index.get() };
        if let Some(old_index) = index.get() {
            if (inner.timers.reset(old_index, nanos) == 0) || (old_index == 0) {
                // If the timer has moved into or out of the front queue position, update the
                // underlying zircon timer to reflect the new queue state.
                self.setup_zircon_timer(&mut inner, false);
            }
            timer.state.store(REGISTERED, Ordering::Relaxed);
            true
        } else {
            false
        }
    }
}

#[derive(Default)]
struct Heap(Vec<StateRef>);

// BinaryHeap doesn't support removal, and BTreeSet ends up increasing binary size significantly,
// so we roll our own binary heap.
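//
// The heap is an array-backed min-heap keyed on each timer's deadline: the parent of slot `i`
// is at `(i - 1) / 2` and its children are at `2 * i + 1` and `2 * i + 2`.  Each `TimerState`
// mirrors its current slot in its `index` field, which is what makes removal and reset of an
// arbitrary timer possible in O(log n) (a `BinaryHeap` only supports popping the front).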
impl Heap {
    fn push(&mut self, mut timer: StateRef) {
        let index = self.0.len();
        self.0.push(timer);
        // SAFETY: `inner` is locked.
        unsafe {
            timer.set_index(index.into());
        }
        self.fix_up(index);
    }

    fn peek(&self) -> Option<&StateRef> {
        self.0.first()
    }

    fn pop(&mut self) -> Option<StateRef> {
        if let Some(&first) = self.0.first() {
            self.remove(0);
            Some(first)
        } else {
            None
        }
    }

    fn swap(&mut self, a: usize, b: usize) {
        self.0.swap(a, b);
        // SAFETY: `inner` is locked.
        unsafe {
            self.0[a].set_index(a.into());
            self.0[b].set_index(b.into());
        }
    }

    /// Resets the timer at the given index to the new time and returns the new index.
    fn reset(&mut self, index: usize, nanos: i64) -> usize {
        // SAFETY: `inner` is locked.
        if nanos < std::mem::replace(unsafe { self.0[index].nanos_mut() }, nanos) {
            self.fix_up(index)
        } else {
            self.fix_down(index)
        }
    }

    fn remove(&mut self, index: usize) {
        // SAFETY: `inner` is locked.
        unsafe {
            let old_index = self.0[index].set_index(HeapIndex::NULL);
            debug_assert_eq!(old_index, index.into());
        }

        // Swap the item at slot `index` to the end of the vector so we can truncate it away, and
        // then swap the previously last item into the correct spot.
        let last = self.0.len() - 1;
        if index < last {
            let fix_up;
            unsafe {
                // SAFETY: `inner` is locked.
                fix_up = self.0[last].nanos() < self.0[index].nanos();
                self.0[index] = self.0[last];
                self.0[index].set_index(index.into());
            };
            self.0.truncate(last);
            if fix_up {
                self.fix_up(index);
            } else {
                self.fix_down(index);
            }
        } else {
            self.0.truncate(last);
        };
    }

    /// Returns the new index.
    fn fix_up(&mut self, mut index: usize) -> usize {
        while index > 0 {
            let parent = (index - 1) / 2;
            // SAFETY: `inner` is locked.
            if unsafe { self.0[parent].nanos() <= self.0[index].nanos() } {
                return index;
            }
            self.swap(parent, index);
            index = parent;
        }
        index
    }

    /// Returns the new index.
    fn fix_down(&mut self, mut index: usize) -> usize {
        let len = self.0.len();
        loop {
            let left = index * 2 + 1;
            if left >= len {
                return index;
            }

            let mut swap_with = None;

            // SAFETY: `inner` is locked.
            unsafe {
                let mut nanos = self.0[index].nanos();
                let left_nanos = self.0[left].nanos();
                if left_nanos < nanos {
                    swap_with = Some(left);
                    nanos = left_nanos;
                }
                let right = left + 1;
                if right < len && self.0[right].nanos() < nanos {
                    swap_with = Some(right);
                }
            }

            let Some(swap_with) = swap_with else { return index };
            self.swap(index, swap_with);
            index = swap_with;
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{LocalExecutor, SendExecutorBuilder, Task, TestExecutor};
    use assert_matches::assert_matches;
    use futures::channel::oneshot::channel;
    use futures::future::Either;
    use futures::prelude::*;
    use rand::seq::SliceRandom;
    use rand::{Rng, rng};
    use std::future::poll_fn;
    use std::pin::pin;
    use zx::MonotonicDuration;

    trait TestTimeInterface:
        TimeInterface
        + WakeupTime
        + std::ops::Sub<zx::Duration<Self::Timeline>, Output = Self>
        + std::ops::Add<zx::Duration<Self::Timeline>, Output = Self>
    {
        fn after(duration: zx::Duration<Self::Timeline>) -> Self;
    }

    impl TestTimeInterface for MonotonicInstant {
        fn after(duration: zx::MonotonicDuration) -> Self {
            Self::after(duration)
        }
    }

    impl TestTimeInterface for BootInstant {
        fn after(duration: zx::BootDuration) -> Self {
            Self::after(duration)
        }
    }

    fn test_shorter_fires_first<T: TestTimeInterface>() {
        let mut exec = LocalExecutor::default();
        let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
        let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left(_) => {}
            Either::Right(_) => panic!("wrong timer fired"),
        }
    }

    #[test]
    fn shorter_fires_first() {
        test_shorter_fires_first::<MonotonicInstant>();
        test_shorter_fires_first::<BootInstant>();
    }

    fn test_shorter_fires_first_multithreaded<T: TestTimeInterface>() {
        SendExecutorBuilder::new().num_threads(4).build().run(async {
            let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
            let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
            match future::select(shorter, longer).await {
                Either::Left(_) => {}
                Either::Right(_) => panic!("wrong timer fired"),
            }
        });
    }

    #[test]
    fn shorter_fires_first_multithreaded() {
        test_shorter_fires_first_multithreaded::<MonotonicInstant>();
        test_shorter_fires_first_multithreaded::<BootInstant>();
    }

    fn test_timer_before_now_fires_immediately<T: TestTimeInterface>() {
        let mut exec = TestExecutor::new();
        let now = T::now();
        let before = pin!(Timer::new(T::from_nanos(now - 1)));
        let after = pin!(Timer::new(T::from_nanos(now + 1)));
        assert_matches!(
            exec.run_singlethreaded(futures::future::select(before, after)),
            Either::Left(_),
            "Timer in the past should fire first"
        );
    }

    #[test]
    fn timer_before_now_fires_immediately() {
        test_timer_before_now_fires_immediately::<MonotonicInstant>();
        test_timer_before_now_fires_immediately::<BootInstant>();
    }

    #[test]
    fn fires_after_timeout() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));
        let deadline = MonotonicInstant::after(MonotonicDuration::from_seconds(5));
        let mut future = pin!(Timer::new(deadline));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        exec.set_fake_time(deadline);
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
    }

    #[test]
    fn interval() {
        let mut exec = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        exec.set_fake_time(start);

        let counter = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
        let mut future = pin!({
            let counter = counter.clone();
            Interval::new(MonotonicDuration::from_seconds(5))
                .map(move |()| {
                    counter.fetch_add(1, Ordering::SeqCst);
                })
                .collect::<()>()
        });

        // Poll for the first time before the timer runs
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(0, counter.load(Ordering::SeqCst));

        // Pretend to wait until the next timer
        let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
        assert!(first_deadline >= MonotonicDuration::from_seconds(5) + start);
        exec.set_fake_time(first_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Polling again before the timer runs shouldn't produce another item from the stream
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // "Wait" until the next timeout and poll again: expect another item from the stream
        let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
        exec.set_fake_time(second_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(2, counter.load(Ordering::SeqCst));

        assert_eq!(second_deadline, first_deadline + MonotonicDuration::from_seconds(5));
    }

    #[test]
    fn timer_fake_time() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(MonotonicDuration::from_seconds(1))));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));

        exec.set_fake_time(MonotonicInstant::after(MonotonicDuration::from_seconds(1)));
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
    }

    fn create_timers(
        timers: &Arc<Timers<MonotonicInstant>>,
        nanos: &[i64],
        timer_futures: &mut Vec<Pin<Box<Timer>>>,
    ) {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        for &n in nanos {
            let mut timer = Box::pin(timers.new_timer(MonotonicInstant::from_nanos(n)));
            let _ = timer.poll_unpin(&mut cx);
            timer_futures.push(timer);
        }
    }

    #[test]
    fn timer_heap() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
        timers.register(EHandle::local().inner());

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (0..1000).collect();
        let mut rng = rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Make sure the timers fire in the correct order.
        for i in 0..1000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timer_futures.clear();
        create_timers(&timers, &nanos, &mut timer_futures);

        // Remove half of them in random order, and ensure the remaining timers are correctly
        // ordered.
        timer_futures.shuffle(&mut rng);
        timer_futures.truncate(500);
        let mut last_time = None;
        for _ in 0..500 {
            let time = timers.wake_next_timer().unwrap();
            if let Some(last_time) = last_time {
                assert!(last_time <= time);
            }
            last_time = Some(time);
        }
        assert_eq!(timers.wake_next_timer(), None);

        timer_futures = vec![];
        create_timers(&timers, &nanos, &mut timer_futures);

        // Replace them all in random order.
        timer_futures.shuffle(&mut rng);
        let mut nanos: Vec<_> = (1000..2000).collect();
        nanos.shuffle(&mut rng);

        for (fut, n) in timer_futures.iter_mut().zip(nanos) {
            fut.as_mut().reset(MonotonicInstant::from_nanos(n));
        }

        // Check they all get changed and now fire in the correct order.
        for i in 1000..2000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timers.deregister();
    }

    #[test]
    fn timer_heap_with_same_time() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
        timers.register(EHandle::local().inner());

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (1..100).collect();
        let mut rng = rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Create some timers with the same time.
        let time = rng.random_range(0..101);
        let same_time = [time; 100];
        create_timers(&timers, &same_time, &mut timer_futures);

        nanos.extend(&same_time);
        nanos.sort();

        for n in nanos {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(n)));
        }

        timers.deregister();
    }

    #[test]
    fn timer_reset_to_earlier_time() {
        let mut exec = LocalExecutor::default();

        for _ in 0..100 {
            let instant = MonotonicInstant::after(MonotonicDuration::from_millis(100));
            let (sender, receiver) = channel();
            let task = Task::spawn(async move {
                let mut timer = pin!(Timer::new(instant));
                let mut receiver = pin!(receiver.fuse());
                poll_fn(|cx| {
                    loop {
                        if timer.as_mut().poll_unpin(cx).is_ready() {
                            return Poll::Ready(());
                        }
                        if !receiver.is_terminated() && receiver.poll_unpin(cx).is_ready() {
                            timer
                                .as_mut()
                                .reset(MonotonicInstant::after(MonotonicDuration::from_millis(1)));
                        } else {
                            return Poll::Pending;
                        }
                    }
                })
                .await;
            });
            sender.send(()).unwrap();

            exec.run_singlethreaded(task);

            if MonotonicInstant::after(MonotonicDuration::from_millis(1)) < instant {
                return;
            }
        }

        panic!("Timer fired late in all 100 attempts");
    }

    #[test]
    fn test_reset() {
        // This is a test for https://fxbug.dev/418235546.
        SendExecutorBuilder::new().num_threads(2).build().run(async {
            const TIMER_DELAY: zx::MonotonicDuration = zx::Duration::from_micros(100);
            let mut timer = pin!(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            for _ in 0..10000 {
                let _ = futures::poll!(timer.as_mut());
                std::thread::sleep(std::time::Duration::from_micros(rand::random_range(80..120)));
                timer.as_mut().reset(MonotonicInstant::after(TIMER_DELAY));
                timer.set(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            }
        });
    }
}