1use alloc::collections::{binary_heap, hash_map, BinaryHeap, HashMap};
8use core::hash::Hash;
9use core::time::Duration;
10
11use crate::{CoreTimerContext, Instant, InstantBindingsTypes, TimerBindingsTypes, TimerContext};
12
13#[derive(Debug)]
23pub struct LocalTimerHeap<K, V, BT: TimerBindingsTypes + InstantBindingsTypes> {
24 next_wakeup: BT::Timer,
25 heap: KeyedHeap<K, V, BT::Instant>,
26}
27
28impl<K, V, BC> LocalTimerHeap<K, V, BC>
29where
30 K: Hash + Eq + Clone,
31 BC: TimerContext,
32{
33 pub fn new(bindings_ctx: &mut BC, dispatch_id: BC::DispatchId) -> Self {
35 let next_wakeup = bindings_ctx.new_timer(dispatch_id);
36 Self { next_wakeup, heap: KeyedHeap::new() }
37 }
38
39 pub fn new_with_context<D, CC: CoreTimerContext<D, BC>>(
41 bindings_ctx: &mut BC,
42 dispatch_id: D,
43 ) -> Self {
44 Self::new(bindings_ctx, CC::convert_timer(dispatch_id))
45 }
46
47 pub fn schedule_instant(
52 &mut self,
53 bindings_ctx: &mut BC,
54 timer: K,
55 value: V,
56 at: BC::Instant,
57 ) -> Option<(BC::Instant, V)> {
58 let (prev_value, dirty) = self.heap.schedule(timer, value, at);
59 if dirty {
60 self.heal_and_reschedule(bindings_ctx);
61 }
62 prev_value
63 }
64
65 pub fn schedule_after(
72 &mut self,
73 bindings_ctx: &mut BC,
74 timer: K,
75 value: V,
76 after: Duration,
77 ) -> Option<(BC::Instant, V)> {
78 let time = bindings_ctx.now().checked_add(after).unwrap();
79 self.schedule_instant(bindings_ctx, timer, value, time)
80 }
81
82 pub fn pop(&mut self, bindings_ctx: &mut BC) -> Option<(K, V)> {
84 let Self { next_wakeup: _, heap } = self;
85 let (popped, dirty) = heap.pop_if(|t| t <= bindings_ctx.now());
86 if dirty {
87 self.heal_and_reschedule(bindings_ctx);
88 }
89 popped
90 }
91
92 pub fn get(&self, timer: &K) -> Option<(BC::Instant, &V)> {
95 self.heap.map.get(timer).map(|MapEntry { time, value }| (*time, value))
96 }
97
98 pub fn cancel(&mut self, bindings_ctx: &mut BC, timer: &K) -> Option<(BC::Instant, V)> {
101 let (scheduled, dirty) = self.heap.cancel(timer);
102 if dirty {
103 self.heal_and_reschedule(bindings_ctx);
104 }
105 scheduled
106 }
107
108 pub fn iter(&self) -> impl Iterator<Item = (&K, &V, &BC::Instant)> {
110 self.heap.map.iter().map(|(k, MapEntry { time, value })| (k, value, time))
111 }
112
113 fn heal_and_reschedule(&mut self, bindings_ctx: &mut BC) {
114 let Self { next_wakeup, heap } = self;
115 let mut new_top = None;
116 let _ = heap.pop_if(|t| {
117 new_top = Some(t);
121 false
122 });
123 let _: Option<BC::Instant> = match new_top {
124 Some(time) => bindings_ctx.schedule_timer_instant(time, next_wakeup),
125 None => bindings_ctx.cancel_timer(next_wakeup),
126 };
127 }
128
129 pub fn clear(&mut self, bindings_ctx: &mut BC) {
131 let Self { next_wakeup, heap } = self;
132 heap.clear();
133 let _: Option<BC::Instant> = bindings_ctx.cancel_timer(next_wakeup);
134 }
135
136 pub fn is_empty(&self) -> bool {
138 self.heap.map.is_empty()
139 }
140}
141
142#[derive(Debug)]
146struct KeyedHeap<K, V, T> {
147 map: HashMap<K, MapEntry<T, V>>,
154 heap: BinaryHeap<HeapEntry<T, K>>,
155}
156
157impl<K: Hash + Eq + Clone, V, T: Instant> KeyedHeap<K, V, T> {
158 fn new() -> Self {
159 Self { map: HashMap::new(), heap: BinaryHeap::new() }
160 }
161
162 fn schedule(&mut self, key: K, value: V, at: T) -> (Option<(T, V)>, bool) {
168 let Self { map, heap } = self;
169 let dirty = heap
174 .peek()
175 .map(|HeapEntry { time, key: top_key }| top_key == &key || at < *time)
176 .unwrap_or(true);
177 let (heap_entry, prev) = match map.entry(key) {
178 hash_map::Entry::Occupied(mut o) => {
179 let MapEntry { time, value } = o.insert(MapEntry { time: at, value });
180 let heap_entry = (at < time).then(|| HeapEntry { time: at, key: o.key().clone() });
183 (heap_entry, Some((time, value)))
184 }
185 hash_map::Entry::Vacant(v) => {
186 let heap_entry = Some(HeapEntry { time: at, key: v.key().clone() });
187 let _: &mut MapEntry<_, _> = v.insert(MapEntry { time: at, value });
188 (heap_entry, None)
189 }
190 };
191 if let Some(heap_entry) = heap_entry {
192 heap.push(heap_entry);
193 }
194 (prev, dirty)
195 }
196
197 fn cancel(&mut self, key: &K) -> (Option<(T, V)>, bool) {
203 let Self { heap, map } = self;
204 let was_front = heap.peek().is_some_and(|HeapEntry { time: _, key: top }| key == top);
206 let prev = map.remove(key).map(|MapEntry { time, value }| (time, value));
207 (prev, was_front)
208 }
209
210 fn pop_if<F: FnOnce(T) -> bool>(&mut self, f: F) -> (Option<(K, V)>, bool) {
220 let mut changed_heap = false;
221 let popped = loop {
222 let Self { heap, map } = self;
223 let Some(peek_mut) = heap.peek_mut() else {
224 break None;
225 };
226 let HeapEntry { time: heap_time, key } = &*peek_mut;
227 match map.entry(key.clone()) {
233 hash_map::Entry::Vacant(_) => {
234 let _: HeapEntry<_, _> = binary_heap::PeekMut::pop(peek_mut);
236 changed_heap = true;
237 }
238 hash_map::Entry::Occupied(map_entry) => {
239 let MapEntry { time: scheduled_for, value: _ } = map_entry.get();
240
241 match heap_time.cmp(scheduled_for) {
242 core::cmp::Ordering::Equal => {
243 break f(*scheduled_for).then(|| {
246 let HeapEntry { time: _, key } =
247 binary_heap::PeekMut::pop(peek_mut);
248 changed_heap = true;
249 let MapEntry { time: _, value } = map_entry.remove();
250 (key, value)
251 });
252 }
253 core::cmp::Ordering::Less => {
254 let HeapEntry { time: _, key } = binary_heap::PeekMut::pop(peek_mut);
259 heap.push(HeapEntry { time: *scheduled_for, key });
260 changed_heap = true;
261 }
262 core::cmp::Ordering::Greater => {
263 unreachable!(
271 "observed heap time: {:?} later than the scheduled time {:?}",
272 heap_time, scheduled_for
273 );
274 }
275 }
276 }
277 }
278 };
279 (popped, changed_heap)
280 }
281
282 fn clear(&mut self) {
283 let Self { map, heap } = self;
284 map.clear();
285 heap.clear();
286 }
287}
288
/// The state stored per key in the map: when the timer fires and the value it
/// carries.
#[derive(Debug, PartialEq, Eq)]
struct MapEntry<T, V> {
    /// Time the timer is currently scheduled for.
    time: T,
    /// Value associated with the timer.
    value: V,
}
295
/// An element of the binary heap: a firing time and the key it was pushed
/// for.
#[derive(Debug)]
struct HeapEntry<T, K> {
    /// Time this heap entry was pushed with.
    time: T,
    /// Key of the timer this entry refers to.
    key: K,
}
304
305impl<T: Instant, K> PartialEq for HeapEntry<T, K> {
307 fn eq(&self, other: &Self) -> bool {
308 self.time == other.time
309 }
310}
311
312impl<T: Instant, K> Eq for HeapEntry<T, K> {}
313
314impl<T: Instant, K> Ord for HeapEntry<T, K> {
315 fn cmp(&self, other: &Self) -> core::cmp::Ordering {
316 Ord::cmp(&other.time, &self.time)
319 }
320}
321
322impl<T: Instant, K> PartialOrd for HeapEntry<T, K> {
323 fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
324 Some(Ord::cmp(self, other))
325 }
326}
327
#[cfg(any(test, feature = "testutils"))]
mod testutil {
    use core::fmt::Debug;
    use core::ops::RangeBounds;

    use super::*;

    impl<K, V, BC> LocalTimerHeap<K, V, BC>
    where
        K: Hash + Eq + Clone + Debug,
        V: Debug + Eq + PartialEq,
        BC: TimerContext,
    {
        /// Asserts that the installed timers are exactly `timers`, given as
        /// `(key, value, instant)` triples.
        #[track_caller]
        pub fn assert_timers(&self, timers: impl IntoIterator<Item = (K, V, BC::Instant)>) {
            let expected: HashMap<_, _> = timers
                .into_iter()
                .map(|(key, value, time)| (key, MapEntry { time, value }))
                .collect();
            assert_eq!(&self.heap.map, &expected);
        }

        /// Like [`LocalTimerHeap::assert_timers`], but each timer is given as
        /// a duration that is added to `bindings_ctx.now()`.
        #[track_caller]
        pub fn assert_timers_after(
            &self,
            bindings_ctx: &mut BC,
            timers: impl IntoIterator<Item = (K, V, Duration)>,
        ) {
            let now = bindings_ctx.now();
            let absolute =
                timers.into_iter().map(|(key, value, after)| (key, value, now.panicking_add(after)));
            self.assert_timers(absolute)
        }

        /// Asserts that the timer with the earliest instant has `key` and
        /// `value`.
        #[track_caller]
        pub fn assert_top(&mut self, key: &K, value: &V) {
            // The map is scanned rather than peeking the heap, because the
            // heap's front entry may be outdated.
            let earliest = self
                .heap
                .map
                .iter()
                .min_by_key(|(_, entry)| entry.time)
                .map(|(found_key, entry)| (found_key, &entry.value));
            assert_eq!(earliest, Some((key, value)));
        }

        /// Asserts that every timer in `expect` is installed and scheduled
        /// within its paired range.
        #[track_caller]
        pub fn assert_range<
            'a,
            R: RangeBounds<BC::Instant> + Debug,
            I: IntoIterator<Item = (&'a K, R)>,
        >(
            &'a self,
            expect: I,
        ) {
            for (timer, range) in expect {
                let (time, _value) =
                    self.get(timer).unwrap_or_else(|| panic!("timer {timer:?} not present"));
                assert!(range.contains(&time), "timer {timer:?} is at {time:?} not in {range:?}");
            }
        }

        /// Asserts that `timer` is installed and scheduled within `range`,
        /// returning its instant and value.
        #[track_caller]
        pub fn assert_range_single<'a, R: RangeBounds<BC::Instant> + Debug>(
            &'a self,
            timer: &K,
            range: R,
        ) -> (BC::Instant, &V) {
            let found = self.get(timer).unwrap_or_else(|| panic!("timer {timer:?} not present"));
            let (time, _) = found;
            assert!(range.contains(&time), "timer {timer:?} is at {time:?} not in {range:?}");
            found
        }
    }
}
414
#[cfg(test)]
mod tests {
    use alloc::vec::Vec;
    use core::convert::Infallible as Never;

    use crate::testutil::{FakeAtomicInstant, FakeInstant, FakeInstantCtx};
    use crate::InstantContext;

    use super::*;

    /// Bindings context for these tests: a fake clock, with all timer state
    /// kept inside each [`FakeTimer`].
    #[derive(Default)]
    struct FakeTimerCtx {
        instant: FakeInstantCtx,
    }

    impl InstantBindingsTypes for FakeTimerCtx {
        type Instant = FakeInstant;
        type AtomicInstant = FakeAtomicInstant;
    }

    impl InstantContext for FakeTimerCtx {
        fn now(&self) -> Self::Instant {
            self.instant.now()
        }
    }

    impl TimerBindingsTypes for FakeTimerCtx {
        type Timer = FakeTimer;
        type DispatchId = ();
        type UniqueTimerId = Never;
    }

    impl TimerContext for FakeTimerCtx {
        fn new_timer(&mut self, (): Self::DispatchId) -> Self::Timer {
            FakeTimer::default()
        }

        fn schedule_timer_instant(
            &mut self,
            time: Self::Instant,
            timer: &mut Self::Timer,
        ) -> Option<Self::Instant> {
            // Store the new instant and hand back whatever was there before.
            timer.scheduled.replace(time)
        }

        fn cancel_timer(&mut self, timer: &mut Self::Timer) -> Option<Self::Instant> {
            timer.scheduled.take()
        }

        fn scheduled_instant(&self, timer: &mut Self::Timer) -> Option<Self::Instant> {
            timer.scheduled.clone()
        }

        fn unique_timer_id(&self, _: &Self::Timer) -> Self::UniqueTimerId {
            unimplemented!()
        }
    }

    /// A timer that only remembers the instant it is scheduled for, if any.
    #[derive(Default, Debug)]
    struct FakeTimer {
        scheduled: Option<FakeInstant>,
    }

    #[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Copy, Clone, Hash)]
    struct TimerId(usize);

    type LocalTimerHeap = super::LocalTimerHeap<TimerId, (), FakeTimerCtx>;

    impl LocalTimerHeap {
        /// Asserts on the raw contents of the binary heap, regardless of
        /// order, including any outdated entries.
        #[track_caller]
        fn assert_heap_entries<I: IntoIterator<Item = (FakeInstant, TimerId)>>(&self, i: I) {
            let mut expected: Vec<_> = i.into_iter().collect();
            expected.sort();
            let mut actual: Vec<_> =
                self.heap.heap.iter().map(|entry| (entry.time, entry.key)).collect();
            actual.sort();
            assert_eq!(actual, expected);
        }

        /// Asserts on the contents of the key map.
        #[track_caller]
        fn assert_map_entries<I: IntoIterator<Item = (FakeInstant, TimerId)>>(&self, i: I) {
            let expected: HashMap<_, _> = i.into_iter().map(|(time, key)| (key, time)).collect();
            let actual: HashMap<_, _> = self
                .heap
                .map
                .iter()
                .map(|(key, MapEntry { time, value: () })| (*key, *time))
                .collect();
            assert_eq!(actual, expected);
        }
    }

    const TIMER1: TimerId = TimerId(1);
    const TIMER2: TimerId = TimerId(2);
    const TIMER3: TimerId = TimerId(3);

    const T1: FakeInstant = FakeInstant { offset: Duration::from_secs(1) };
    const T2: FakeInstant = FakeInstant { offset: Duration::from_secs(2) };
    const T3: FakeInstant = FakeInstant { offset: Duration::from_secs(3) };
    const T4: FakeInstant = FakeInstant { offset: Duration::from_secs(4) };

    #[test]
    fn schedule_instant() {
        let mut bindings_ctx = FakeTimerCtx::default();
        let mut timers = LocalTimerHeap::new(&mut bindings_ctx, ());
        assert_eq!(timers.next_wakeup.scheduled, None);
        timers.assert_heap_entries([]);

        // First timer: the wakeup follows it.
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER2, (), T2), None);
        timers.assert_heap_entries([(T2, TIMER2)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T2));

        // An earlier timer moves the wakeup forward.
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER1, (), T1), None);
        timers.assert_heap_entries([(T1, TIMER1), (T2, TIMER2)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T1));

        // A later timer leaves the wakeup untouched.
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER3, (), T3), None);
        timers.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T1));
    }

    #[test]
    fn schedule_after() {
        let mut bindings_ctx = FakeTimerCtx::default();
        let mut timers = LocalTimerHeap::new(&mut bindings_ctx, ());
        assert_eq!(timers.next_wakeup.scheduled, None);
        let long = Duration::from_secs(5);
        let short = Duration::from_secs(1);

        let long_deadline = bindings_ctx.now().checked_add(long).unwrap();
        let short_deadline = bindings_ctx.now().checked_add(short).unwrap();

        assert_eq!(timers.schedule_after(&mut bindings_ctx, TIMER1, (), long), None);
        assert_eq!(timers.next_wakeup.scheduled, Some(long_deadline));
        timers.assert_heap_entries([(long_deadline, TIMER1)]);
        timers.assert_map_entries([(long_deadline, TIMER1)]);

        // Rescheduling earlier pushes a second heap entry; the old one stays.
        assert_eq!(
            timers.schedule_after(&mut bindings_ctx, TIMER1, (), short),
            Some((long_deadline, ()))
        );
        assert_eq!(timers.next_wakeup.scheduled, Some(short_deadline));
        timers.assert_heap_entries([(short_deadline, TIMER1), (long_deadline, TIMER1)]);
        timers.assert_map_entries([(short_deadline, TIMER1)]);
    }

    #[test]
    fn cancel() {
        let mut bindings_ctx = FakeTimerCtx::default();
        let mut timers = LocalTimerHeap::new(&mut bindings_ctx, ());
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER1, (), T1), None);
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER2, (), T2), None);
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER3, (), T3), None);
        timers.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T1));

        // Canceling the front timer removes it and moves the wakeup.
        assert_eq!(timers.cancel(&mut bindings_ctx, &TIMER1), Some((T1, ())));
        timers.assert_heap_entries([(T2, TIMER2), (T3, TIMER3)]);
        timers.assert_map_entries([(T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T2));

        // Canceling it again finds nothing.
        assert_eq!(timers.cancel(&mut bindings_ctx, &TIMER1), None);

        // Canceling a timer that is not at the front only touches the map.
        assert_eq!(timers.cancel(&mut bindings_ctx, &TIMER3), Some((T3, ())));
        timers.assert_heap_entries([(T2, TIMER2), (T3, TIMER3)]);
        timers.assert_map_entries([(T2, TIMER2)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T2));

        // Canceling the last live timer empties the heap as well.
        assert_eq!(timers.cancel(&mut bindings_ctx, &TIMER2), Some((T2, ())));
        timers.assert_heap_entries([]);
        timers.assert_map_entries([]);
        assert_eq!(timers.next_wakeup.scheduled, None);
    }

    #[test]
    fn pop() {
        let mut bindings_ctx = FakeTimerCtx::default();
        let mut timers = LocalTimerHeap::new(&mut bindings_ctx, ());
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER1, (), T1), None);
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER2, (), T2), None);
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER3, (), T3), None);
        timers.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        timers.assert_map_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T1));

        // Nothing is due yet, so nothing changes.
        assert_eq!(timers.pop(&mut bindings_ctx), None);
        timers.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        timers.assert_map_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T1));

        bindings_ctx.instant.time = T1;
        assert_eq!(timers.pop(&mut bindings_ctx), Some((TIMER1, ())));
        timers.assert_heap_entries([(T2, TIMER2), (T3, TIMER3)]);
        timers.assert_map_entries([(T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T2));
        assert_eq!(timers.pop(&mut bindings_ctx), None);
        assert_eq!(timers.next_wakeup.scheduled, Some(T2));

        // Jump past both remaining timers; they pop one at a time.
        bindings_ctx.instant.time = T3;
        assert_eq!(timers.pop(&mut bindings_ctx), Some((TIMER2, ())));
        timers.assert_heap_entries([(T3, TIMER3)]);
        timers.assert_map_entries([(T3, TIMER3)]);

        assert_eq!(timers.next_wakeup.scheduled, Some(T3));
        assert_eq!(timers.pop(&mut bindings_ctx), Some((TIMER3, ())));
        timers.assert_heap_entries([]);
        timers.assert_map_entries([]);
        assert_eq!(timers.next_wakeup.scheduled, None);

        assert_eq!(timers.pop(&mut bindings_ctx), None);
    }

    #[test]
    fn reschedule() {
        let mut bindings_ctx = FakeTimerCtx::default();
        let mut timers = LocalTimerHeap::new(&mut bindings_ctx, ());
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER1, (), T1), None);
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER2, (), T2), None);
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER3, (), T3), None);
        timers.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        timers.assert_map_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T1));

        // Moving TIMER2 later only updates the map.
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER2, (), T4), Some((T2, ())));
        timers.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        timers.assert_map_entries([(T1, TIMER1), (T4, TIMER2), (T3, TIMER3)]);

        // Popping TIMER1 brings TIMER2's old heap entry to the front, where
        // it is moved to T4.
        bindings_ctx.instant.time = T4;
        assert_eq!(timers.pop(&mut bindings_ctx), Some((TIMER1, ())));
        timers.assert_heap_entries([(T4, TIMER2), (T3, TIMER3)]);
        timers.assert_map_entries([(T4, TIMER2), (T3, TIMER3)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T3));

        // Moving TIMER2 earlier adds a heap entry next to the old one.
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER2, (), T2), Some((T4, ())));
        timers.assert_heap_entries([(T2, TIMER2), (T4, TIMER2), (T3, TIMER3)]);
        timers.assert_map_entries([(T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T2));

        assert_eq!(timers.pop(&mut bindings_ctx), Some((TIMER2, ())));
        timers.assert_heap_entries([(T4, TIMER2), (T3, TIMER3)]);
        timers.assert_map_entries([(T3, TIMER3)]);
        assert_eq!(timers.next_wakeup.scheduled, Some(T3));

        // The leftover TIMER2 entry is discarded once TIMER3 is popped.
        assert_eq!(timers.pop(&mut bindings_ctx), Some((TIMER3, ())));
        timers.assert_heap_entries([]);
        timers.assert_map_entries([]);
        assert_eq!(timers.next_wakeup.scheduled, None);
        assert_eq!(timers.pop(&mut bindings_ctx), None);
    }

    #[test]
    fn multiple_timers_same_instant() {
        let mut bindings_ctx = FakeTimerCtx::default();
        let mut timers = LocalTimerHeap::new(&mut bindings_ctx, ());
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER1, (), T1), None);
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER2, (), T1), None);
        // Take the scheduled instant out of the fake timer, leaving it unset.
        assert_eq!(timers.next_wakeup.scheduled.take(), Some(T1));

        bindings_ctx.instant.time = T1;

        // After the first pop the wakeup must be armed again for the second
        // timer at the same instant.
        assert!(timers.pop(&mut bindings_ctx).is_some());
        assert_eq!(timers.next_wakeup.scheduled, Some(T1));
        assert!(timers.pop(&mut bindings_ctx).is_some());
        assert_eq!(timers.next_wakeup.scheduled, None);
        assert_eq!(timers.pop(&mut bindings_ctx), None);
    }

    #[test]
    fn clear() {
        let mut bindings_ctx = FakeTimerCtx::default();
        let mut timers = LocalTimerHeap::new(&mut bindings_ctx, ());
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER1, (), T1), None);
        assert_eq!(timers.schedule_instant(&mut bindings_ctx, TIMER2, (), T1), None);
        timers.clear(&mut bindings_ctx);
        timers.assert_map_entries([]);
        assert_eq!(timers.next_wakeup.scheduled, None);
    }
}