use alloc::collections::{binary_heap, hash_map, BinaryHeap, HashMap};
use core::hash::Hash;
use core::time::Duration;
use crate::{CoreTimerContext, Instant, InstantBindingsTypes, TimerBindingsTypes, TimerContext};
#[derive(Debug)]
pub struct LocalTimerHeap<K, V, BT: TimerBindingsTypes + InstantBindingsTypes> {
next_wakeup: BT::Timer,
heap: KeyedHeap<K, V, BT::Instant>,
}
impl<K, V, BC> LocalTimerHeap<K, V, BC>
where
K: Hash + Eq + Clone,
BC: TimerContext,
{
pub fn new(bindings_ctx: &mut BC, dispatch_id: BC::DispatchId) -> Self {
let next_wakeup = bindings_ctx.new_timer(dispatch_id);
Self { next_wakeup, heap: KeyedHeap::new() }
}
pub fn new_with_context<D, CC: CoreTimerContext<D, BC>>(
bindings_ctx: &mut BC,
dispatch_id: D,
) -> Self {
Self::new(bindings_ctx, CC::convert_timer(dispatch_id))
}
pub fn schedule_instant(
&mut self,
bindings_ctx: &mut BC,
timer: K,
value: V,
at: BC::Instant,
) -> Option<(BC::Instant, V)> {
let (prev_value, dirty) = self.heap.schedule(timer, value, at);
if dirty {
self.heal_and_reschedule(bindings_ctx);
}
prev_value
}
pub fn schedule_after(
&mut self,
bindings_ctx: &mut BC,
timer: K,
value: V,
after: Duration,
) -> Option<(BC::Instant, V)> {
let time = bindings_ctx.now().checked_add(after).unwrap();
self.schedule_instant(bindings_ctx, timer, value, time)
}
pub fn pop(&mut self, bindings_ctx: &mut BC) -> Option<(K, V)> {
let Self { next_wakeup: _, heap } = self;
let (popped, dirty) = heap.pop_if(|t| t <= bindings_ctx.now());
if dirty {
self.heal_and_reschedule(bindings_ctx);
}
popped
}
pub fn get(&self, timer: &K) -> Option<(BC::Instant, &V)> {
self.heap.map.get(timer).map(|MapEntry { time, value }| (*time, value))
}
pub fn cancel(&mut self, bindings_ctx: &mut BC, timer: &K) -> Option<(BC::Instant, V)> {
let (scheduled, dirty) = self.heap.cancel(timer);
if dirty {
self.heal_and_reschedule(bindings_ctx);
}
scheduled
}
pub fn iter(&self) -> impl Iterator<Item = (&K, &V, &BC::Instant)> {
self.heap.map.iter().map(|(k, MapEntry { time, value })| (k, value, time))
}
fn heal_and_reschedule(&mut self, bindings_ctx: &mut BC) {
let Self { next_wakeup, heap } = self;
let mut new_top = None;
let _ = heap.pop_if(|t| {
new_top = Some(t);
false
});
let _: Option<BC::Instant> = match new_top {
Some(time) => bindings_ctx.schedule_timer_instant(time, next_wakeup),
None => bindings_ctx.cancel_timer(next_wakeup),
};
}
pub fn clear(&mut self, bindings_ctx: &mut BC) {
let Self { next_wakeup, heap } = self;
heap.clear();
let _: Option<BC::Instant> = bindings_ctx.cancel_timer(next_wakeup);
}
pub fn is_empty(&self) -> bool {
self.heap.map.is_empty()
}
}
#[derive(Debug)]
struct KeyedHeap<K, V, T> {
map: HashMap<K, MapEntry<T, V>>,
heap: BinaryHeap<HeapEntry<T, K>>,
}
impl<K: Hash + Eq + Clone, V, T: Instant> KeyedHeap<K, V, T> {
fn new() -> Self {
Self { map: HashMap::new(), heap: BinaryHeap::new() }
}
fn schedule(&mut self, key: K, value: V, at: T) -> (Option<(T, V)>, bool) {
let Self { map, heap } = self;
let dirty = heap
.peek()
.map(|HeapEntry { time, key: top_key }| top_key == &key || at < *time)
.unwrap_or(true);
let (heap_entry, prev) = match map.entry(key) {
hash_map::Entry::Occupied(mut o) => {
let MapEntry { time, value } = o.insert(MapEntry { time: at, value });
let heap_entry = (at < time).then(|| HeapEntry { time: at, key: o.key().clone() });
(heap_entry, Some((time, value)))
}
hash_map::Entry::Vacant(v) => {
let heap_entry = Some(HeapEntry { time: at, key: v.key().clone() });
let _: &mut MapEntry<_, _> = v.insert(MapEntry { time: at, value });
(heap_entry, None)
}
};
if let Some(heap_entry) = heap_entry {
heap.push(heap_entry);
}
(prev, dirty)
}
fn cancel(&mut self, key: &K) -> (Option<(T, V)>, bool) {
let Self { heap, map } = self;
let was_front = heap.peek().is_some_and(|HeapEntry { time: _, key: top }| key == top);
let prev = map.remove(key).map(|MapEntry { time, value }| (time, value));
(prev, was_front)
}
fn pop_if<F: FnOnce(T) -> bool>(&mut self, f: F) -> (Option<(K, V)>, bool) {
let mut changed_heap = false;
let popped = loop {
let Self { heap, map } = self;
let Some(peek_mut) = heap.peek_mut() else {
break None;
};
let HeapEntry { time: heap_time, key } = &*peek_mut;
match map.entry(key.clone()) {
hash_map::Entry::Vacant(_) => {
let _: HeapEntry<_, _> = binary_heap::PeekMut::pop(peek_mut);
changed_heap = true;
}
hash_map::Entry::Occupied(map_entry) => {
let MapEntry { time: scheduled_for, value: _ } = map_entry.get();
match heap_time.cmp(scheduled_for) {
core::cmp::Ordering::Equal => {
break f(*scheduled_for).then(|| {
let HeapEntry { time: _, key } =
binary_heap::PeekMut::pop(peek_mut);
changed_heap = true;
let MapEntry { time: _, value } = map_entry.remove();
(key, value)
});
}
core::cmp::Ordering::Less => {
let HeapEntry { time: _, key } = binary_heap::PeekMut::pop(peek_mut);
heap.push(HeapEntry { time: *scheduled_for, key });
changed_heap = true;
}
core::cmp::Ordering::Greater => {
unreachable!(
"observed heap time: {:?} later than the scheduled time {:?}",
heap_time, scheduled_for
);
}
}
}
}
};
(popped, changed_heap)
}
fn clear(&mut self) {
let Self { map, heap } = self;
map.clear();
heap.clear();
}
}
#[derive(Debug, Eq, PartialEq)]
struct MapEntry<T, V> {
time: T,
value: V,
}
#[derive(Debug)]
struct HeapEntry<T, K> {
time: T,
key: K,
}
impl<T: Instant, K> PartialEq for HeapEntry<T, K> {
fn eq(&self, other: &Self) -> bool {
self.time == other.time
}
}
impl<T: Instant, K> Eq for HeapEntry<T, K> {}
impl<T: Instant, K> Ord for HeapEntry<T, K> {
fn cmp(&self, other: &Self) -> core::cmp::Ordering {
Ord::cmp(&other.time, &self.time)
}
}
impl<T: Instant, K> PartialOrd for HeapEntry<T, K> {
fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
Some(Ord::cmp(self, other))
}
}
#[cfg(any(test, feature = "testutils"))]
mod testutil {
use core::fmt::Debug;
use core::ops::RangeBounds;
use super::*;
impl<K, V, BC> LocalTimerHeap<K, V, BC>
where
K: Hash + Eq + Clone + Debug,
V: Debug + Eq + PartialEq,
BC: TimerContext,
{
#[track_caller]
pub fn assert_timers(&self, timers: impl IntoIterator<Item = (K, V, BC::Instant)>) {
let map = timers
.into_iter()
.map(|(k, value, time)| (k, MapEntry { value, time }))
.collect::<HashMap<_, _>>();
assert_eq!(&self.heap.map, &map);
}
#[track_caller]
pub fn assert_timers_after(
&self,
bindings_ctx: &mut BC,
timers: impl IntoIterator<Item = (K, V, Duration)>,
) {
let now = bindings_ctx.now();
self.assert_timers(timers.into_iter().map(|(k, v, d)| (k, v, now.panicking_add(d))))
}
#[track_caller]
pub fn assert_top(&mut self, key: &K, value: &V) {
let top = self
.heap
.map
.iter()
.min_by_key(|(_key, MapEntry { time, .. })| time)
.map(|(key, MapEntry { time: _, value })| (key, value));
assert_eq!(top, Some((key, value)));
}
#[track_caller]
pub fn assert_range<
'a,
R: RangeBounds<BC::Instant> + Debug,
I: IntoIterator<Item = (&'a K, R)>,
>(
&'a self,
expect: I,
) {
for (timer, range) in expect {
let time = self
.get(timer)
.map(|(t, _)| t)
.unwrap_or_else(|| panic!("timer {timer:?} not present"));
assert!(range.contains(&time), "timer {timer:?} is at {time:?} not in {range:?}");
}
}
#[track_caller]
pub fn assert_range_single<'a, R: RangeBounds<BC::Instant> + Debug>(
&'a self,
timer: &K,
range: R,
) -> (BC::Instant, &V) {
let (time, value) =
self.get(timer).unwrap_or_else(|| panic!("timer {timer:?} not present"));
assert!(range.contains(&time), "timer {timer:?} is at {time:?} not in {range:?}");
(time, value)
}
}
}
#[cfg(test)]
mod tests {
use alloc::vec::Vec;
use core::convert::Infallible as Never;
use crate::testutil::{FakeAtomicInstant, FakeInstant, FakeInstantCtx};
use crate::InstantContext;
use super::*;
#[derive(Default)]
struct FakeTimerCtx {
instant: FakeInstantCtx,
}
impl InstantBindingsTypes for FakeTimerCtx {
type Instant = FakeInstant;
type AtomicInstant = FakeAtomicInstant;
}
impl InstantContext for FakeTimerCtx {
fn now(&self) -> Self::Instant {
self.instant.now()
}
}
impl TimerBindingsTypes for FakeTimerCtx {
type Timer = FakeTimer;
type DispatchId = ();
type UniqueTimerId = Never;
}
impl TimerContext for FakeTimerCtx {
fn new_timer(&mut self, (): Self::DispatchId) -> Self::Timer {
FakeTimer::default()
}
fn schedule_timer_instant(
&mut self,
time: Self::Instant,
timer: &mut Self::Timer,
) -> Option<Self::Instant> {
timer.scheduled.replace(time)
}
fn cancel_timer(&mut self, timer: &mut Self::Timer) -> Option<Self::Instant> {
timer.scheduled.take()
}
fn scheduled_instant(&self, timer: &mut Self::Timer) -> Option<Self::Instant> {
timer.scheduled.clone()
}
fn unique_timer_id(&self, _: &Self::Timer) -> Self::UniqueTimerId {
unimplemented!()
}
}
#[derive(Default, Debug)]
struct FakeTimer {
scheduled: Option<FakeInstant>,
}
#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Copy, Clone, Hash)]
struct TimerId(usize);
type LocalTimerHeap = super::LocalTimerHeap<TimerId, (), FakeTimerCtx>;
impl LocalTimerHeap {
#[track_caller]
fn assert_heap_entries<I: IntoIterator<Item = (FakeInstant, TimerId)>>(&self, i: I) {
let mut want = i.into_iter().collect::<Vec<_>>();
want.sort();
let mut got = self
.heap
.heap
.iter()
.map(|HeapEntry { time, key }| (*time, *key))
.collect::<Vec<_>>();
got.sort();
assert_eq!(got, want);
}
#[track_caller]
fn assert_map_entries<I: IntoIterator<Item = (FakeInstant, TimerId)>>(&self, i: I) {
let want = i.into_iter().map(|(t, k)| (k, t)).collect::<HashMap<_, _>>();
let got = self
.heap
.map
.iter()
.map(|(k, MapEntry { time, value: () })| (*k, *time))
.collect::<HashMap<_, _>>();
assert_eq!(got, want);
}
}
const TIMER1: TimerId = TimerId(1);
const TIMER2: TimerId = TimerId(2);
const TIMER3: TimerId = TimerId(3);
const T1: FakeInstant = FakeInstant { offset: Duration::from_secs(1) };
const T2: FakeInstant = FakeInstant { offset: Duration::from_secs(2) };
const T3: FakeInstant = FakeInstant { offset: Duration::from_secs(3) };
const T4: FakeInstant = FakeInstant { offset: Duration::from_secs(4) };
#[test]
fn schedule_instant() {
let mut ctx = FakeTimerCtx::default();
let mut heap = LocalTimerHeap::new(&mut ctx, ());
assert_eq!(heap.next_wakeup.scheduled, None);
heap.assert_heap_entries([]);
assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T2), None);
heap.assert_heap_entries([(T2, TIMER2)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T2));
assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T1));
assert_eq!(heap.schedule_instant(&mut ctx, TIMER3, (), T3), None);
heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T1));
}
#[test]
fn schedule_after() {
let mut ctx = FakeTimerCtx::default();
let mut heap = LocalTimerHeap::new(&mut ctx, ());
assert_eq!(heap.next_wakeup.scheduled, None);
let long_duration = Duration::from_secs(5);
let short_duration = Duration::from_secs(1);
let long_instant = ctx.now().checked_add(long_duration).unwrap();
let short_instant = ctx.now().checked_add(short_duration).unwrap();
assert_eq!(heap.schedule_after(&mut ctx, TIMER1, (), long_duration), None);
assert_eq!(heap.next_wakeup.scheduled, Some(long_instant));
heap.assert_heap_entries([(long_instant, TIMER1)]);
heap.assert_map_entries([(long_instant, TIMER1)]);
assert_eq!(
heap.schedule_after(&mut ctx, TIMER1, (), short_duration),
Some((long_instant, ()))
);
assert_eq!(heap.next_wakeup.scheduled, Some(short_instant));
heap.assert_heap_entries([(short_instant, TIMER1), (long_instant, TIMER1)]);
heap.assert_map_entries([(short_instant, TIMER1)]);
}
#[test]
fn cancel() {
let mut ctx = FakeTimerCtx::default();
let mut heap = LocalTimerHeap::new(&mut ctx, ());
assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T2), None);
assert_eq!(heap.schedule_instant(&mut ctx, TIMER3, (), T3), None);
heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T1));
assert_eq!(heap.cancel(&mut ctx, &TIMER1), Some((T1, ())));
heap.assert_heap_entries([(T2, TIMER2), (T3, TIMER3)]);
heap.assert_map_entries([(T2, TIMER2), (T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T2));
assert_eq!(heap.cancel(&mut ctx, &TIMER1), None);
assert_eq!(heap.cancel(&mut ctx, &TIMER3), Some((T3, ())));
heap.assert_heap_entries([(T2, TIMER2), (T3, TIMER3)]);
heap.assert_map_entries([(T2, TIMER2)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T2));
assert_eq!(heap.cancel(&mut ctx, &TIMER2), Some((T2, ())));
heap.assert_heap_entries([]);
heap.assert_map_entries([]);
assert_eq!(heap.next_wakeup.scheduled, None);
}
#[test]
fn pop() {
let mut ctx = FakeTimerCtx::default();
let mut heap = LocalTimerHeap::new(&mut ctx, ());
assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T2), None);
assert_eq!(heap.schedule_instant(&mut ctx, TIMER3, (), T3), None);
heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
heap.assert_map_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T1));
assert_eq!(heap.pop(&mut ctx), None);
heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
heap.assert_map_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T1));
ctx.instant.time = T1;
assert_eq!(heap.pop(&mut ctx), Some((TIMER1, ())));
heap.assert_heap_entries([(T2, TIMER2), (T3, TIMER3)]);
heap.assert_map_entries([(T2, TIMER2), (T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T2));
assert_eq!(heap.pop(&mut ctx), None);
assert_eq!(heap.next_wakeup.scheduled, Some(T2));
ctx.instant.time = T3;
assert_eq!(heap.pop(&mut ctx), Some((TIMER2, ())));
heap.assert_heap_entries([(T3, TIMER3)]);
heap.assert_map_entries([(T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T3));
assert_eq!(heap.pop(&mut ctx), Some((TIMER3, ())));
heap.assert_heap_entries([]);
heap.assert_map_entries([]);
assert_eq!(heap.next_wakeup.scheduled, None);
assert_eq!(heap.pop(&mut ctx), None);
}
#[test]
fn reschedule() {
let mut ctx = FakeTimerCtx::default();
let mut heap = LocalTimerHeap::new(&mut ctx, ());
assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T2), None);
assert_eq!(heap.schedule_instant(&mut ctx, TIMER3, (), T3), None);
heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
heap.assert_map_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T1));
assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T4), Some((T2, ())));
heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
heap.assert_map_entries([(T1, TIMER1), (T4, TIMER2), (T3, TIMER3)]);
ctx.instant.time = T4;
assert_eq!(heap.pop(&mut ctx), Some((TIMER1, ())));
heap.assert_heap_entries([(T4, TIMER2), (T3, TIMER3)]);
heap.assert_map_entries([(T4, TIMER2), (T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T3));
assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T2), Some((T4, ())));
heap.assert_heap_entries([(T2, TIMER2), (T4, TIMER2), (T3, TIMER3)]);
heap.assert_map_entries([(T2, TIMER2), (T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T2));
assert_eq!(heap.pop(&mut ctx), Some((TIMER2, ())));
heap.assert_heap_entries([(T4, TIMER2), (T3, TIMER3)]);
heap.assert_map_entries([(T3, TIMER3)]);
assert_eq!(heap.next_wakeup.scheduled, Some(T3));
assert_eq!(heap.pop(&mut ctx), Some((TIMER3, ())));
heap.assert_heap_entries([]);
heap.assert_map_entries([]);
assert_eq!(heap.next_wakeup.scheduled, None);
assert_eq!(heap.pop(&mut ctx), None);
}
#[test]
fn multiple_timers_same_instant() {
let mut ctx = FakeTimerCtx::default();
let mut heap = LocalTimerHeap::new(&mut ctx, ());
assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T1), None);
assert_eq!(heap.next_wakeup.scheduled.take(), Some(T1));
ctx.instant.time = T1;
assert!(heap.pop(&mut ctx).is_some());
assert_eq!(heap.next_wakeup.scheduled, Some(T1));
assert!(heap.pop(&mut ctx).is_some());
assert_eq!(heap.next_wakeup.scheduled, None);
assert_eq!(heap.pop(&mut ctx), None);
}
#[test]
fn clear() {
let mut ctx = FakeTimerCtx::default();
let mut heap = LocalTimerHeap::new(&mut ctx, ());
assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T1), None);
heap.clear(&mut ctx);
heap.assert_map_entries([]);
assert_eq!(heap.next_wakeup.scheduled, None);
}
}