fuchsia_async/runtime/fuchsia/
timer.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Support for creating futures that represent timers.
6//!
7//! This module contains the `Timer` type which is a future that will resolve
8//! at a particular point in the future.
9
10use super::executor::Executor;
11use crate::runtime::{BootInstant, EHandle, MonotonicInstant, WakeupTime};
12use crate::{PacketReceiver, ReceiverRegistration};
13use fuchsia_sync::Mutex;
14
15use futures::future::FusedFuture;
16use futures::stream::FusedStream;
17use futures::task::{AtomicWaker, Context};
18use futures::{FutureExt, Stream};
19use std::cell::UnsafeCell;
20use std::fmt;
21use std::future::Future;
22use std::marker::PhantomPinned;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::sync::atomic::{AtomicU8, Ordering};
26use std::task::{Poll, Waker, ready};
27use zx::AsHandleRef as _;
28
29pub trait TimeInterface:
30    Clone + Copy + fmt::Debug + PartialEq + PartialOrd + Ord + Send + Sync + 'static
31{
32    type Timeline: zx::Timeline + Send + Sync + 'static;
33
34    fn from_nanos(nanos: i64) -> Self;
35    fn into_nanos(self) -> i64;
36    fn zx_instant(nanos: i64) -> zx::Instant<Self::Timeline>;
37    fn now() -> i64;
38    fn create_timer() -> zx::Timer<Self::Timeline>;
39}
40
41impl TimeInterface for MonotonicInstant {
42    type Timeline = zx::MonotonicTimeline;
43
44    fn from_nanos(nanos: i64) -> Self {
45        Self::from_nanos(nanos)
46    }
47
48    fn into_nanos(self) -> i64 {
49        self.into_nanos()
50    }
51
52    fn zx_instant(nanos: i64) -> zx::MonotonicInstant {
53        zx::MonotonicInstant::from_nanos(nanos)
54    }
55
56    fn now() -> i64 {
57        EHandle::local().inner().now().into_nanos()
58    }
59
60    fn create_timer() -> zx::Timer<Self::Timeline> {
61        zx::Timer::<Self::Timeline>::create()
62    }
63}
64
65impl TimeInterface for BootInstant {
66    type Timeline = zx::BootTimeline;
67
68    fn from_nanos(nanos: i64) -> Self {
69        Self::from_nanos(nanos)
70    }
71
72    fn into_nanos(self) -> i64 {
73        self.into_nanos()
74    }
75
76    fn zx_instant(nanos: i64) -> zx::BootInstant {
77        zx::BootInstant::from_nanos(nanos)
78    }
79
80    fn now() -> i64 {
81        EHandle::local().inner().boot_now().into_nanos()
82    }
83
84    fn create_timer() -> zx::Timer<Self::Timeline> {
85        zx::Timer::<Self::Timeline>::create()
86    }
87}
88
89impl WakeupTime for std::time::Instant {
90    fn into_timer(self) -> Timer {
91        let now_as_instant = std::time::Instant::now();
92        let now_as_time = MonotonicInstant::now();
93        EHandle::local()
94            .mono_timers()
95            .new_timer(now_as_time + self.saturating_duration_since(now_as_instant).into())
96    }
97}
98
99impl WakeupTime for MonotonicInstant {
100    fn into_timer(self) -> Timer {
101        EHandle::local().mono_timers().new_timer(self)
102    }
103}
104
105impl WakeupTime for BootInstant {
106    fn into_timer(self) -> Timer {
107        EHandle::local().boot_timers().new_timer(self)
108    }
109}
110
111impl WakeupTime for zx::MonotonicInstant {
112    fn into_timer(self) -> Timer {
113        EHandle::local().mono_timers().new_timer(self.into())
114    }
115}
116
117impl WakeupTime for zx::BootInstant {
118    fn into_timer(self) -> Timer {
119        EHandle::local().boot_timers().new_timer(self.into())
120    }
121}
122
123/// An asynchronous timer.
124#[must_use = "futures do nothing unless polled"]
125pub struct Timer(TimerState);
126
127impl Timer {
128    /// Create a new timer scheduled to fire at `time`.
129    pub fn new(time: impl WakeupTime) -> Self {
130        time.into_timer()
131    }
132
133    /// Reset the `Timer` to a fire at a new time.
134    pub fn reset(self: Pin<&mut Self>, time: MonotonicInstant) {
135        let nanos = time.into_nanos();
136        // If in the UNREGISTERED state, we can skip the call to `try_reset_timer` because the timer
137        // has *never* been registered and so there's no danger of another thread having access to
138        // this timer. In all other states, including the FIRED and TERMINATED states, we must use
139        // `try_reset_timer` as that will take a lock and guarantee that other threads are not
140        // concurrently accessing the timer.
141        //
142        // This can be Relaxed because because there are no loads or stores that follow that could
143        // possibly be reordered before here that matter: the first thing `try_reset_timer` does is
144        // take a lock which will have its own memory barriers, and the store to the time is next
145        // going to be read by this same task prior to taking the lock in `Timers::inner`.
146        if self.0.state.load(Ordering::Relaxed) == UNREGISTERED
147            || !self.0.timers.try_reset_timer(&self.0, nanos)
148        {
149            // SAFETY: This is safe because we know the timer isn't registered which means we truly
150            // have exclusive access to TimerState.
151            unsafe { *self.0.nanos.get() = nanos };
152            self.0.state.store(UNREGISTERED, Ordering::Relaxed);
153        }
154    }
155}
156
157impl fmt::Debug for Timer {
158    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
159        f.debug_struct("Timer").field("time", &self.0.nanos).finish()
160    }
161}
162
163impl Drop for Timer {
164    fn drop(&mut self) {
165        self.0.timers.unregister(&self.0);
166    }
167}
168
169impl Future for Timer {
170    type Output = ();
171    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
172        // SAFETY: We call `unregister` when `Timer` is dropped.
173        unsafe { self.0.timers.poll(self.as_ref(), cx) }
174    }
175}
176
177struct TimerState {
178    timers: Arc<dyn TimersInterface>,
179
180    // This is safe to access/mutate if the lock on `Timers::inner` is held.
181    nanos: UnsafeCell<i64>,
182
183    waker: AtomicWaker,
184    state: AtomicU8,
185
186    // Holds the index in the heap.  This is safe to access/mutate if the lock on `Timers::inner` is
187    // held.
188    index: UnsafeCell<HeapIndex>,
189
190    // StateRef stores a pointer to `TimerState`, so this must be pinned.
191    _pinned: PhantomPinned,
192}
193
194// SAFETY: TimerState is thread-safe.  See the safety comments elsewhere.
195unsafe impl Send for TimerState {}
196unsafe impl Sync for TimerState {}
197
/// State of a timer that is not in the heap.
const UNREGISTERED: u8 = 0;

/// State of a timer that is currently in the heap.
const REGISTERED: u8 = 1;

/// State of a timer that has fired.
const FIRED: u8 = 2;

/// State of a timer that is terminated.
const TERMINATED: u8 = 3;
209
/// The position of a timer in the heap's backing vector.
///
/// `usize::MAX` is reserved as the "not in the heap" sentinel, [`HeapIndex::NULL`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct HeapIndex(usize);

impl HeapIndex {
    /// Sentinel meaning "no index".
    const NULL: HeapIndex = HeapIndex(usize::MAX);

    /// Returns the wrapped position, or `None` for the `NULL` sentinel.
    fn get(&self) -> Option<usize> {
        match *self {
            HeapIndex::NULL => None,
            HeapIndex(position) => Some(position),
        }
    }
}

/// Wraps a raw position without any checking; `usize::MAX` therefore maps onto `NULL`.
impl From<usize> for HeapIndex {
    fn from(value: usize) -> Self {
        HeapIndex(value)
    }
}
227
228impl FusedFuture for Timer {
229    fn is_terminated(&self) -> bool {
230        self.0.state.load(Ordering::Relaxed) == TERMINATED
231    }
232}
233
234// A note on safety:
235//
236//  1. We remove the timer from the heap before we drop TimerState, and TimerState is pinned, so
237//     it's safe to store pointers in the heap i.e. the pointers are live since we make sure we
238//     remove them before dropping `TimerState`.
239//
240//  2. Provided we do #1, it is always safe to access the atomic fields of TimerState.
241//
242//  3. Once the timer has been registered, it is safe to access the non-atomic fields of TimerState
243//     whilst holding the lock on `Timers::inner`.
244#[derive(Copy, Clone, Debug)]
245struct StateRef(*const TimerState);
246
247// SAFETY: See the notes above regarding safety.
248unsafe impl Send for StateRef {}
249unsafe impl Sync for StateRef {}
250
251impl StateRef {
252    fn into_waker(self, _inner: &mut Inner) -> Option<Waker> {
253        // SAFETY: `inner` is locked.
254        unsafe {
255            // As soon as we set the state to FIRED, the heap no longer owns the timer and it might
256            // be re-registered.  This store is safe to be Relaxed because `AtomicWaker::take` has a
257            // Release barrier, so the store can't be reordered after it, and therefore we can be
258            // certain that another thread which re-registers the waker will see the state is FIRED
259            // (and will interpret that as meaning that the task should not block and instead
260            // immediately complete; see `Timers::poll`).
261            (*self.0).state.store(FIRED, Ordering::Relaxed);
262            (*self.0).waker.take()
263        }
264    }
265
266    // # Safety
267    //
268    // `Timers::inner` must be locked.
269    unsafe fn nanos(&self) -> i64 {
270        *(*self.0).nanos.get()
271    }
272
273    // # Safety
274    //
275    // `Timers::inner` must be locked.
276    unsafe fn nanos_mut(&mut self) -> &mut i64 {
277        &mut *(*self.0).nanos.get()
278    }
279
280    // # Safety
281    //
282    // `Timers::inner` must be locked.
283    unsafe fn set_index(&mut self, index: HeapIndex) -> HeapIndex {
284        std::mem::replace(&mut *(*self.0).index.get(), index)
285    }
286}
287
288/// An asynchronous interval timer.
289///
290/// This is a stream of events resolving at a rate of once-per interval.  This generates an event
291/// for *every* elapsed duration, even if multiple have elapsed since last polled.
292///
293/// TODO(https://fxbug.dev/375632319): This is lack of BootInstant support.
294#[derive(Debug)]
295#[must_use = "streams do nothing unless polled"]
296pub struct Interval {
297    timer: Pin<Box<Timer>>,
298    next: MonotonicInstant,
299    duration: zx::MonotonicDuration,
300}
301
302impl Interval {
303    /// Create a new `Interval` which yields every `duration`.
304    pub fn new(duration: zx::MonotonicDuration) -> Self {
305        let next = MonotonicInstant::after(duration);
306        Interval { timer: Box::pin(Timer::new(next)), next, duration }
307    }
308}
309
310impl FusedStream for Interval {
311    fn is_terminated(&self) -> bool {
312        // `Interval` never yields `None`
313        false
314    }
315}
316
317impl Stream for Interval {
318    type Item = ();
319    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
320        ready!(self.timer.poll_unpin(cx));
321        let next = self.next + self.duration;
322        self.timer.as_mut().reset(next);
323        self.next = next;
324        Poll::Ready(Some(()))
325    }
326}
327
328pub(crate) struct Timers<T: TimeInterface> {
329    inner: Mutex<Inner>,
330
331    fake: bool,
332
333    timer: zx::Timer<T::Timeline>,
334
335    // This will form a reference cycle which the caller *must* break by calling `deregister`.
336    receiver_registration: Mutex<Option<ReceiverRegistration<Arc<Self>>>>,
337}
338
339struct Inner {
340    // The queue of active timer objects.
341    timers: Heap,
342
343    // The last deadline we set on the zircon timer, or None if the queue was empty and the timer
344    // was canceled most recently.
345    last_deadline: Option<i64>,
346
347    // True if there's a pending async_wait.
348    async_wait: bool,
349
350    // The port key.
351    port_key: u64,
352}
353
354impl<T: TimeInterface> Timers<T> {
355    pub fn new(fake: bool) -> Self {
356        Self {
357            inner: Mutex::new(Inner {
358                timers: Heap::default(),
359                last_deadline: None,
360                async_wait: false,
361                port_key: 0,
362            }),
363            fake,
364            timer: T::create_timer(),
365            receiver_registration: Mutex::default(),
366        }
367    }
368
369    pub fn new_timer(self: &Arc<Self>, time: T) -> Timer {
370        let nanos = time.into_nanos();
371        Timer(TimerState {
372            timers: self.clone(),
373            nanos: UnsafeCell::new(nanos),
374            waker: AtomicWaker::new(),
375            state: AtomicU8::new(UNREGISTERED),
376            index: UnsafeCell::new(HeapIndex::NULL),
377            _pinned: PhantomPinned,
378        })
379    }
380
381    /// Registers the timers to receive packets.  This will establish a reference cycle that
382    /// the caller must break by calling `deregister`.
383    pub fn register(self: &Arc<Self>, executor: &Arc<Executor>) {
384        let key = self
385            .receiver_registration
386            .lock()
387            .get_or_insert_with(|| executor.receivers.register(executor.clone(), self.clone()))
388            .key();
389        self.inner.lock().port_key = key;
390    }
391
392    /// Deregisters the timers and breaks the reference cycle.
393    pub fn deregister(&self) {
394        *self.receiver_registration.lock() = None;
395    }
396
397    /// Ensures the underlying Zircon timer has been correctly set or canceled after
398    /// the queue has been updated.
399    ///
400    /// # Safety
401    ///
402    /// Callers must ensure that `self.inner` is locked before calling this method.
403    fn setup_zircon_timer(&self, inner: &mut Inner, from_receive_packet: bool) {
404        // Our new deadline is the deadline for the front of the queue, or no deadline (infinite) if
405        // there is no front of the queue.
406        let new_deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
407
408        // If the effective deadline of the queue has changed, reprogram the zircon timer's
409        // deadline.
410        if new_deadline != inner.last_deadline {
411            inner.last_deadline = new_deadline;
412            match inner.last_deadline {
413                Some(deadline) => {
414                    self.timer.set(T::zx_instant(deadline), zx::Duration::ZERO).unwrap()
415                }
416                None => self.timer.cancel().unwrap(),
417            }
418        }
419
420        // If this is being called while processing the timer packet from a previous async wait,
421        // then clear the async wait flag.  This is the very last thing we need to do, so this async
422        // wait operation is effectively over.
423        if from_receive_packet {
424            inner.async_wait = false;
425        }
426
427        // If we have a valid timeout, but we have no in-flight async wait operation, post a new
428        // one.
429        if inner.last_deadline.is_some() && !inner.async_wait {
430            if self.fake {
431                // Clear the signal used for fake timers so that we can use it to trigger
432                // next time.
433                self.timer.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
434            }
435
436            self.timer
437                .wait_async_handle(
438                    EHandle::local().port(),
439                    inner.port_key,
440                    if self.fake { zx::Signals::USER_0 } else { zx::Signals::TIMER_SIGNALED },
441                    zx::WaitAsyncOpts::empty(),
442                )
443                .unwrap();
444
445            inner.async_wait = true;
446        }
447    }
448
449    /// Wakes timers that should be firing now.  Returns true if any timers were woken.
450    pub fn wake_timers(&self) -> bool {
451        self.wake_timers_impl(false)
452    }
453
454    fn wake_timers_impl(&self, from_receive_packet: bool) -> bool {
455        let now = T::now();
456        let mut timers_woken = false;
457
458        loop {
459            let waker = {
460                let mut inner = self.inner.lock();
461
462                // SAFETY: `inner` is locked.
463                let deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
464                if deadline.is_some_and(|d| d <= now) {
465                    let timer = inner.timers.pop().unwrap();
466                    timer.into_waker(&mut inner)
467                } else {
468                    // We are now finished (one way or the other). Setup the underlying Zircon timer
469                    // to reflect the new state of the queue, and break out of the loop.
470                    //
471                    // When processing a timer packet (from_receive_packet is true), it is very
472                    // important that this always be the last thing we do before breaking out of the
473                    // loop (dropping the lock in the process) for good.
474                    //
475                    // Failing to do this at the end can lead to the timer queue stalling.  Doing it
476                    // early (when we are dispatching expired timers) can lead to an ever
477                    // multiplying army of posted async waits.
478                    //
479                    // See https://g-issues.fuchsia.dev/issues/396173066 for details.
480                    self.setup_zircon_timer(&mut inner, from_receive_packet);
481                    break;
482                }
483            };
484            if let Some(waker) = waker {
485                waker.wake()
486            }
487            timers_woken = true;
488        }
489        timers_woken
490    }
491
492    /// Wakes the next timer and returns its time.
493    pub fn wake_next_timer(&self) -> Option<T> {
494        let (nanos, waker) = {
495            let mut inner = self.inner.lock();
496            let timer = inner.timers.pop()?;
497            // SAFETY: `inner` is locked.
498            let nanos = unsafe { timer.nanos() };
499            (nanos, timer.into_waker(&mut inner))
500        };
501        if let Some(waker) = waker {
502            waker.wake();
503        }
504        Some(T::from_nanos(nanos))
505    }
506
507    /// Returns the next timer due to expire.
508    pub fn next_timer(&self) -> Option<T> {
509        // SAFETY: `inner` is locked.
510        self.inner.lock().timers.peek().map(|state| T::from_nanos(unsafe { state.nanos() }))
511    }
512
513    /// If there's a timer ready, sends a notification to wake up the receiver.
514    ///
515    /// # Panics
516    ///
517    /// This will panic if we are not using fake time.
518    pub fn maybe_notify(&self, now: T) {
519        assert!(self.fake, "calling this function requires using fake time.");
520        // SAFETY: `inner` is locked.
521        if self
522            .inner
523            .lock()
524            .timers
525            .peek()
526            .is_some_and(|state| unsafe { state.nanos() } <= now.into_nanos())
527        {
528            self.timer.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
529        }
530    }
531}
532
533impl<T: TimeInterface> PacketReceiver for Timers<T> {
534    fn receive_packet(&self, _packet: zx::Packet) {
535        self.wake_timers_impl(true);
536    }
537}
538
539// See comments on the implementation below.
540trait TimersInterface: Send + Sync + 'static {
541    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()>;
542    fn unregister(&self, state: &TimerState);
543    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool;
544}
545
546impl<T: TimeInterface> TimersInterface for Timers<T> {
547    // # Safety
548    //
549    // `unregister` must be called before `Timer` is dropped.
550    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()> {
551        // See https://docs.rs/futures/0.3.5/futures/task/struct.AtomicWaker.html
552        // for more information.
553        // Quick check to avoid registration if already done.
554        //
555        // This is safe to be Relaxed because `AtomicWaker::register` has barriers which means that
556        // the load further down can't be moved before registering the waker, which means we can't
557        // miss the timer firing.  If the timer isn't registered, the time might have been reset but
558        // that would have been by the same task, so there should be no ordering issue there.  If we
559        // then try and register the timer, we take the lock on `inner` so there will be barriers
560        // there.
561        let state = timer.0.state.load(Ordering::Relaxed);
562
563        if state == TERMINATED {
564            return Poll::Ready(());
565        }
566
567        if state == FIRED {
568            timer.0.state.store(TERMINATED, Ordering::Relaxed);
569            return Poll::Ready(());
570        }
571
572        if state == UNREGISTERED {
573            // SAFETY: The state is UNREGISTERED, so we have exclusive access.
574            let nanos = unsafe { *timer.0.nanos.get() };
575            if nanos <= T::now() {
576                timer.0.state.store(FIRED, Ordering::Relaxed);
577                return Poll::Ready(());
578            }
579            let mut inner = self.inner.lock();
580
581            // We store a pointer to `timer` here. This is safe to do because `timer` is pinned, and
582            // we always make sure we call `unregister` before `timer` is dropped.
583            inner.timers.push(StateRef(&timer.0));
584
585            // Now that we have added a new timer to the queue, setup the
586            // underlying zircon timer to reflect the new state of the queue.
587            self.setup_zircon_timer(&mut inner, false);
588
589            timer.0.state.store(REGISTERED, Ordering::Relaxed);
590        }
591
592        timer.0.waker.register(cx.waker());
593
594        // Now that we've registered a waker, we need to check to see if the timer has been marked
595        // as FIRED by another thread in the meantime (e.g. in StateRef::into_waker).  In that case
596        // the timer is never going to fire again as it is no longer managed by the timer heap, so
597        // the timer's task would become Pending but nothing would wake it up later.
598        // Loading the state *must* happen after the above `AtomicWaker::register` (which
599        // establishes an Acquire barrier, preventing the below load from being reordered above it),
600        // or else we could racily hit the above scenario.
601        let state = timer.0.state.load(Ordering::Relaxed);
602        match state {
603            FIRED => {
604                timer.0.state.store(TERMINATED, Ordering::Relaxed);
605                Poll::Ready(())
606            }
607            REGISTERED => Poll::Pending,
608            // TERMINATED is only set in `poll` which has exclusive access to the task (&mut
609            // Context).
610            // UNREGISTERED would indicate a logic bug somewhere.
611            _ => {
612                unreachable!();
613            }
614        }
615    }
616
617    fn unregister(&self, timer: &TimerState) {
618        if timer.state.load(Ordering::Relaxed) == UNREGISTERED {
619            // If the timer was never registered, then we have exclusive access and we can skip the
620            // rest of this (avoiding the lock on `inner`).
621            // We cannot early-exit if the timer is FIRED or TERMINATED because then we could race
622            // with another thread that is actively using the timer object, and if this call
623            // completes before it blocks on `inner`, then the timer's resources could be
624            // deallocated, which would result in a use-after-free on the other thread.
625            return;
626        }
627        let mut inner = self.inner.lock();
628        // SAFETY: `inner` is locked.
629        let index = unsafe { *timer.index.get() };
630        if let Some(index) = index.get() {
631            inner.timers.remove(index);
632            if index == 0 {
633                // The front of the queue just changed.  Make sure to update the underlying zircon
634                // timer state to match the new queue state.
635                self.setup_zircon_timer(&mut inner, false);
636            }
637            timer.state.store(UNREGISTERED, Ordering::Relaxed);
638        }
639    }
640
641    /// Returns true if the timer was successfully reset.
642    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool {
643        let mut inner = self.inner.lock();
644        // SAFETY: `inner` is locked.
645        let index = unsafe { *timer.index.get() };
646        if let Some(old_index) = index.get() {
647            if (inner.timers.reset(old_index, nanos) == 0) || (old_index == 0) {
648                // If the timer has moved into or out-of the front queue position, update the
649                // underlying zircon timer to reflect the new queue state.
650                self.setup_zircon_timer(&mut inner, false);
651            }
652            timer.state.store(REGISTERED, Ordering::Relaxed);
653            true
654        } else {
655            false
656        }
657    }
658}
659
660#[derive(Default)]
661struct Heap(Vec<StateRef>);
662
663// BinaryHeap doesn't support removal, and BTreeSet ends up increasing binary size significantly,
664// so we roll our own binary heap.
665impl Heap {
666    fn push(&mut self, mut timer: StateRef) {
667        let index = self.0.len();
668        self.0.push(timer);
669        // SAFETY: `inner` is locked.
670        unsafe {
671            timer.set_index(index.into());
672        }
673        self.fix_up(index);
674    }
675
676    fn peek(&self) -> Option<&StateRef> {
677        self.0.first()
678    }
679
680    fn pop(&mut self) -> Option<StateRef> {
681        if let Some(&first) = self.0.first() {
682            self.remove(0);
683            Some(first)
684        } else {
685            None
686        }
687    }
688
689    fn swap(&mut self, a: usize, b: usize) {
690        self.0.swap(a, b);
691        // SAFETY: `inner` is locked.
692        unsafe {
693            self.0[a].set_index(a.into());
694            self.0[b].set_index(b.into());
695        }
696    }
697
698    /// Resets the timer at the given index to the new time and returns the new index.
699    fn reset(&mut self, index: usize, nanos: i64) -> usize {
700        // SAFETY: `inner` is locked.
701        if nanos < std::mem::replace(unsafe { self.0[index].nanos_mut() }, nanos) {
702            self.fix_up(index)
703        } else {
704            self.fix_down(index)
705        }
706    }
707
708    fn remove(&mut self, index: usize) {
709        // SAFETY: `inner` is locked.
710        unsafe {
711            let old_index = self.0[index].set_index(HeapIndex::NULL);
712            debug_assert_eq!(old_index, index.into());
713        }
714
715        // Swap the item at slot `index` to the end of the vector so we can truncate it away, and
716        // then swap the previously last item into the correct spot.
717        let last = self.0.len() - 1;
718        if index < last {
719            let fix_up;
720            unsafe {
721                // SAFETY: `inner` is locked.
722                fix_up = self.0[last].nanos() < self.0[index].nanos();
723                self.0[index] = self.0[last];
724                self.0[index].set_index(index.into());
725            };
726            self.0.truncate(last);
727            if fix_up {
728                self.fix_up(index);
729            } else {
730                self.fix_down(index);
731            }
732        } else {
733            self.0.truncate(last);
734        };
735    }
736
737    /// Returns the new index
738    fn fix_up(&mut self, mut index: usize) -> usize {
739        while index > 0 {
740            let parent = (index - 1) / 2;
741            // SAFETY: `inner` is locked.
742            if unsafe { self.0[parent].nanos() <= self.0[index].nanos() } {
743                return index;
744            }
745            self.swap(parent, index);
746            index = parent;
747        }
748        index
749    }
750
751    /// Returns the new index
752    fn fix_down(&mut self, mut index: usize) -> usize {
753        let len = self.0.len();
754        loop {
755            let left = index * 2 + 1;
756            if left >= len {
757                return index;
758            }
759
760            let mut swap_with = None;
761
762            // SAFETY: `inner` is locked.
763            unsafe {
764                let mut nanos = self.0[index].nanos();
765                let left_nanos = self.0[left].nanos();
766                if left_nanos < nanos {
767                    swap_with = Some(left);
768                    nanos = left_nanos;
769                }
770                let right = left + 1;
771                if right < len && self.0[right].nanos() < nanos {
772                    swap_with = Some(right);
773                }
774            }
775
776            let Some(swap_with) = swap_with else { return index };
777            self.swap(index, swap_with);
778            index = swap_with;
779        }
780    }
781}
782
783#[cfg(test)]
784mod test {
785    use super::*;
786    use crate::{LocalExecutor, SendExecutorBuilder, Task, TestExecutor};
787    use assert_matches::assert_matches;
788    use futures::channel::oneshot::channel;
789    use futures::future::Either;
790    use futures::prelude::*;
791    use rand::seq::SliceRandom;
792    use rand::{Rng, rng};
793    use std::future::poll_fn;
794    use std::pin::pin;
795    use zx::MonotonicDuration;
796
797    trait TestTimeInterface:
798        TimeInterface
799        + WakeupTime
800        + std::ops::Sub<zx::Duration<Self::Timeline>, Output = Self>
801        + std::ops::Add<zx::Duration<Self::Timeline>, Output = Self>
802    {
803        fn after(duration: zx::Duration<Self::Timeline>) -> Self;
804    }
805
806    impl TestTimeInterface for MonotonicInstant {
807        fn after(duration: zx::MonotonicDuration) -> Self {
808            Self::after(duration)
809        }
810    }
811
812    impl TestTimeInterface for BootInstant {
813        fn after(duration: zx::BootDuration) -> Self {
814            Self::after(duration)
815        }
816    }
817
818    fn test_shorter_fires_first<T: TestTimeInterface>() {
819        let mut exec = LocalExecutor::default();
820        let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
821        let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
822        match exec.run_singlethreaded(future::select(shorter, longer)) {
823            Either::Left(_) => {}
824            Either::Right(_) => panic!("wrong timer fired"),
825        }
826    }
827
828    #[test]
829    fn shorter_fires_first() {
830        test_shorter_fires_first::<MonotonicInstant>();
831        test_shorter_fires_first::<BootInstant>();
832    }
833
834    fn test_shorter_fires_first_multithreaded<T: TestTimeInterface>() {
835        SendExecutorBuilder::new().num_threads(4).build().run(async {
836            let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
837            let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
838            match future::select(shorter, longer).await {
839                Either::Left(_) => {}
840                Either::Right(_) => panic!("wrong timer fired"),
841            }
842        });
843    }
844
845    #[test]
846    fn shorter_fires_first_multithreaded() {
847        test_shorter_fires_first_multithreaded::<MonotonicInstant>();
848        test_shorter_fires_first_multithreaded::<BootInstant>();
849    }
850
851    fn test_timer_before_now_fires_immediately<T: TestTimeInterface>() {
852        let mut exec = TestExecutor::new();
853        let now = T::now();
854        let before = pin!(Timer::new(T::from_nanos(now - 1)));
855        let after = pin!(Timer::new(T::from_nanos(now + 1)));
856        assert_matches!(
857            exec.run_singlethreaded(futures::future::select(before, after)),
858            Either::Left(_),
859            "Timer in the past should fire first"
860        );
861    }
862
863    #[test]
864    fn timer_before_now_fires_immediately() {
865        test_timer_before_now_fires_immediately::<MonotonicInstant>();
866        test_timer_before_now_fires_immediately::<BootInstant>();
867    }
868
869    #[test]
870    fn fires_after_timeout() {
871        let mut exec = TestExecutor::new_with_fake_time();
872        exec.set_fake_time(MonotonicInstant::from_nanos(0));
873        let deadline = MonotonicInstant::after(MonotonicDuration::from_seconds(5));
874        let mut future = pin!(Timer::new(deadline));
875        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
876        exec.set_fake_time(deadline);
877        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
878    }
879
880    #[test]
881    fn interval() {
882        let mut exec = TestExecutor::new_with_fake_time();
883        let start = MonotonicInstant::from_nanos(0);
884        exec.set_fake_time(start);
885
886        let counter = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
887        let mut future = pin!({
888            let counter = counter.clone();
889            Interval::new(MonotonicDuration::from_seconds(5))
890                .map(move |()| {
891                    counter.fetch_add(1, Ordering::SeqCst);
892                })
893                .collect::<()>()
894        });
895
896        // PollResult for the first time before the timer runs
897        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
898        assert_eq!(0, counter.load(Ordering::SeqCst));
899
900        // Pretend to wait until the next timer
901        let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
902        assert!(first_deadline >= MonotonicDuration::from_seconds(5) + start);
903        exec.set_fake_time(first_deadline);
904        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
905        assert_eq!(1, counter.load(Ordering::SeqCst));
906
907        // PollResulting again before the timer runs shouldn't produce another item from the stream
908        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
909        assert_eq!(1, counter.load(Ordering::SeqCst));
910
911        // "Wait" until the next timeout and poll again: expect another item from the stream
912        let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
913        exec.set_fake_time(second_deadline);
914        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
915        assert_eq!(2, counter.load(Ordering::SeqCst));
916
917        assert_eq!(second_deadline, first_deadline + MonotonicDuration::from_seconds(5));
918    }
919
920    #[test]
921    fn timer_fake_time() {
922        let mut exec = TestExecutor::new_with_fake_time();
923        exec.set_fake_time(MonotonicInstant::from_nanos(0));
924
925        let mut timer =
926            pin!(Timer::new(MonotonicInstant::after(MonotonicDuration::from_seconds(1))));
927        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));
928
929        exec.set_fake_time(MonotonicInstant::after(MonotonicDuration::from_seconds(1)));
930        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
931    }
932
933    fn create_timers(
934        timers: &Arc<Timers<MonotonicInstant>>,
935        nanos: &[i64],
936        timer_futures: &mut Vec<Pin<Box<Timer>>>,
937    ) {
938        let waker = futures::task::noop_waker();
939        let mut cx = Context::from_waker(&waker);
940        for &n in nanos {
941            let mut timer = Box::pin(timers.new_timer(MonotonicInstant::from_nanos(n)));
942            let _ = timer.poll_unpin(&mut cx);
943            timer_futures.push(timer);
944        }
945    }
946
947    #[test]
948    fn timer_heap() {
949        let _exec = TestExecutor::new_with_fake_time();
950        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
951        timers.register(EHandle::local().inner());
952
953        let mut timer_futures = Vec::new();
954        let mut nanos: Vec<_> = (0..1000).collect();
955        let mut rng = rng();
956        nanos.shuffle(&mut rng);
957
958        create_timers(&timers, &nanos, &mut timer_futures);
959
960        // Make sure the timers fire in the correct order.
961        for i in 0..1000 {
962            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
963        }
964
965        timer_futures.clear();
966        create_timers(&timers, &nanos, &mut timer_futures);
967
968        // Remove half of them in random order, and ensure the remaining timers are correctly
969        // ordered.
970        timer_futures.shuffle(&mut rng);
971        timer_futures.truncate(500);
972        let mut last_time = None;
973        for _ in 0..500 {
974            let time = timers.wake_next_timer().unwrap();
975            if let Some(last_time) = last_time {
976                assert!(last_time <= time);
977            }
978            last_time = Some(time);
979        }
980        assert_eq!(timers.wake_next_timer(), None);
981
982        timer_futures = vec![];
983        create_timers(&timers, &nanos, &mut timer_futures);
984
985        // Replace them all in random order.
986        timer_futures.shuffle(&mut rng);
987        let mut nanos: Vec<_> = (1000..2000).collect();
988        nanos.shuffle(&mut rng);
989
990        for (fut, n) in timer_futures.iter_mut().zip(nanos) {
991            fut.as_mut().reset(MonotonicInstant::from_nanos(n));
992        }
993
994        // Check they all get changed and now fire in the correct order.
995        for i in 1000..2000 {
996            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
997        }
998
999        timers.deregister();
1000    }
1001
1002    #[test]
1003    fn timer_heap_with_same_time() {
1004        let _exec = TestExecutor::new_with_fake_time();
1005        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
1006        timers.register(EHandle::local().inner());
1007
1008        let mut timer_futures = Vec::new();
1009        let mut nanos: Vec<_> = (1..100).collect();
1010        let mut rng = rng();
1011        nanos.shuffle(&mut rng);
1012
1013        create_timers(&timers, &nanos, &mut timer_futures);
1014
1015        // Create some timers with the same time.
1016        let time = rng.random_range(0..101);
1017        let same_time = [time; 100];
1018        create_timers(&timers, &same_time, &mut timer_futures);
1019
1020        nanos.extend(&same_time);
1021        nanos.sort();
1022
1023        for n in nanos {
1024            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(n)));
1025        }
1026
1027        timers.deregister();
1028    }
1029
1030    #[test]
1031    fn timer_reset_to_earlier_time() {
1032        let mut exec = LocalExecutor::default();
1033
1034        for _ in 0..100 {
1035            let instant = MonotonicInstant::after(MonotonicDuration::from_millis(100));
1036            let (sender, receiver) = channel();
1037            let task = Task::spawn(async move {
1038                let mut timer = pin!(Timer::new(instant));
1039                let mut receiver = pin!(receiver.fuse());
1040                poll_fn(|cx| {
1041                    loop {
1042                        if timer.as_mut().poll_unpin(cx).is_ready() {
1043                            return Poll::Ready(());
1044                        }
1045                        if !receiver.is_terminated() && receiver.poll_unpin(cx).is_ready() {
1046                            timer
1047                                .as_mut()
1048                                .reset(MonotonicInstant::after(MonotonicDuration::from_millis(1)));
1049                        } else {
1050                            return Poll::Pending;
1051                        }
1052                    }
1053                })
1054                .await;
1055            });
1056            sender.send(()).unwrap();
1057
1058            exec.run_singlethreaded(task);
1059
1060            if MonotonicInstant::after(MonotonicDuration::from_millis(1)) < instant {
1061                return;
1062            }
1063        }
1064
1065        panic!("Timer fired late in all 100 attempts");
1066    }
1067
1068    #[test]
1069    fn test_reset() {
1070        // This is a test for https://fxbug.dev/418235546.
1071        SendExecutorBuilder::new().num_threads(2).build().run(async {
1072            const TIMER_DELAY: zx::MonotonicDuration = zx::Duration::from_micros(100);
1073            let mut timer = pin!(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
1074            for _ in 0..10000 {
1075                let _ = futures::poll!(timer.as_mut());
1076                std::thread::sleep(std::time::Duration::from_micros(rand::random_range(80..120)));
1077                timer.as_mut().reset(MonotonicInstant::after(TIMER_DELAY));
1078                timer.set(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
1079            }
1080        });
1081    }
1082}