wlan_common/
timer.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::channel::mpsc;
6use futures::{FutureExt, Stream, StreamExt};
7use std::sync::{atomic, Arc};
8use {fuchsia_async as fasync, zx};
9
10use crate::sink::UnboundedSink;
11
12pub type ScheduledEvent<E> = (zx::MonotonicInstant, Event<E>, EventHandle);
13pub type EventSender<E> = UnboundedSink<ScheduledEvent<E>>;
14pub type EventStream<E> = mpsc::UnboundedReceiver<ScheduledEvent<E>>;
15pub type EventId = u64;
16
17// The returned timer will send scheduled timeouts to the returned EventStream.
18// Note that this will not actually have any timed behavior unless events are pulled off
19// the EventStream and handled asynchronously.
20pub fn create_timer<E>() -> (Timer<E>, EventStream<E>) {
21    let (timer_sink, time_stream) = mpsc::unbounded();
22    (Timer::new(UnboundedSink::new(timer_sink)), time_stream)
23}
24
25pub fn make_async_timed_event_stream<E>(
26    time_stream: impl Stream<Item = ScheduledEvent<E>>,
27) -> impl Stream<Item = Event<E>> {
28    // Timer firings are not correctly ordered if we
29    // filter_map before buffered_unordered.
30    Box::pin(
31        time_stream
32            .map(|(deadline, timed_event, handle)| {
33                fasync::Timer::new(fasync::MonotonicInstant::from_zx(deadline))
34                    .map(|_| (timed_event, handle))
35            })
36            .buffer_unordered(usize::max_value())
37            .filter_map(|(timed_event, handle)| async move {
38                if handle.is_active() {
39                    Some(timed_event)
40                } else {
41                    None
42                }
43            }),
44    )
45}
46
47#[derive(Debug)]
48pub struct Event<E> {
49    pub id: EventId,
50    pub event: E,
51}
52
53impl<E: Clone> Clone for Event<E> {
54    fn clone(&self) -> Self {
55        Event { id: self.id, event: self.event.clone() }
56    }
57}
58
59#[derive(Debug)]
60pub struct Timer<E> {
61    sender: EventSender<E>,
62    next_id: EventId,
63}
64
65impl<E> Timer<E> {
66    pub fn new(sender: EventSender<E>) -> Self {
67        Timer { sender, next_id: 0 }
68    }
69
70    /// Returns the current time according to the global executor.
71    ///
72    /// # Panics
73    ///
74    /// This function will panic if it's called when no executor is set up.
75    pub fn now(&self) -> zx::MonotonicInstant {
76        // We use fasync to support time manipulation in tests.
77        fasync::MonotonicInstant::now().into_zx()
78    }
79
80    pub fn schedule_at(&mut self, deadline: zx::MonotonicInstant, event: E) -> EventHandle {
81        let id = self.next_id;
82        let timer_handle = EventHandle::new(id);
83        let inner_handle = EventHandle {
84            active: Arc::clone(&timer_handle.active),
85            event_id: id,
86            // This field is only used in the timer handle returned by this fn, so the value
87            // here does not matter.
88            cancel_on_drop: true,
89        };
90        self.sender.send((deadline, Event { id, event }, inner_handle));
91        self.next_id += 1;
92        timer_handle
93    }
94
95    pub fn schedule_after(&mut self, duration: zx::MonotonicDuration, event: E) -> EventHandle {
96        self.schedule_at(fasync::MonotonicInstant::after(duration).into_zx(), event)
97    }
98
99    pub fn schedule<EV>(&mut self, event: EV) -> EventHandle
100    where
101        EV: TimeoutDuration + Into<E>,
102    {
103        self.schedule_after(event.timeout_duration(), event.into())
104    }
105}
106
107pub trait TimeoutDuration {
108    fn timeout_duration(&self) -> zx::MonotonicDuration;
109}
110
111/// An EventHandle is used to manage a single scheduled timer. If a handle is
112/// dropped, the corresponding timeout will not fire. This behavior may be
113/// bypassed via `EventHandle::drop_without_cancel`.
114#[derive(Debug)]
115pub struct EventHandle {
116    active: Arc<atomic::AtomicBool>,
117    event_id: EventId,
118    cancel_on_drop: bool,
119}
120
121impl EventHandle {
122    fn new(event_id: EventId) -> Self {
123        Self { active: Arc::new(atomic::AtomicBool::new(true)), event_id, cancel_on_drop: true }
124    }
125
126    /// Helper fn to construct an EventHandle with a specific event ID.
127    /// For tests only.
128    pub fn new_test(event_id: EventId) -> Self {
129        Self::new(event_id)
130    }
131
132    /// Returns true if the event is still scheduled to fire.
133    fn is_active(&self) -> bool {
134        self.active.load(atomic::Ordering::Acquire)
135    }
136
137    /// The unique ID assigned to this event.
138    pub fn id(&self) -> EventId {
139        self.event_id
140    }
141
142    /// Drop this event handle, but still fire the underlying timer when expired.
143    /// If we will never cancel a scheduled timer, this fn can be used to avoid
144    /// unnecessary bookkeeping.
145    pub fn drop_without_cancel(mut self) {
146        self.cancel_on_drop = false;
147    }
148}
149
150impl std::ops::Drop for EventHandle {
151    fn drop(&mut self) {
152        if self.cancel_on_drop {
153            self.active.store(false, atomic::Ordering::Release);
154        }
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161    use crate::assert_variant;
162    use fuchsia_async as fasync;
163
164    use futures::channel::mpsc::UnboundedSender;
165    use std::pin::pin;
166    use std::task::Poll;
167
168    type TestEvent = u32;
169    impl TimeoutDuration for TestEvent {
170        fn timeout_duration(&self) -> zx::MonotonicDuration {
171            zx::MonotonicDuration::from_seconds(10)
172        }
173    }
174
175    #[test]
176    fn test_timer_schedule_at() {
177        let _exec = fasync::TestExecutor::new();
178        let (mut timer, mut time_stream) = create_timer::<TestEvent>();
179        let timeout1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
180        let timeout2 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10));
181        let event_handle1 = timer.schedule_at(timeout1, 7);
182        let event_handle2 = timer.schedule_at(timeout2, 9);
183        assert_eq!(event_handle1.id(), 0);
184        assert_eq!(event_handle2.id(), 1);
185
186        let (t1, event1, _) = time_stream.try_next().unwrap().expect("expect time entry");
187        assert_eq!(t1, timeout1);
188        assert_eq!(event1.id, 0);
189        assert_eq!(event1.event, 7);
190
191        let (t2, event2, _) = time_stream.try_next().unwrap().expect("expect time entry");
192        assert_eq!(t2, timeout2);
193        assert_eq!(event2.id, 1);
194        assert_eq!(event2.event, 9);
195
196        assert_variant!(time_stream.try_next(), Err(e) => {
197            assert_eq!(e.to_string(), "receiver channel is empty")
198        });
199    }
200
201    #[test]
202    fn test_timer_schedule_after() {
203        let _exec = fasync::TestExecutor::new();
204        let (mut timer, mut time_stream) = create_timer::<TestEvent>();
205        let timeout1 = zx::MonotonicDuration::from_seconds(1000);
206        let timeout2 = zx::MonotonicDuration::from_seconds(5);
207        let event_handle1 = timer.schedule_after(timeout1, 7);
208        let event_handle2 = timer.schedule_after(timeout2, 9);
209        assert_eq!(event_handle1.id(), 0);
210        assert_eq!(event_handle2.id(), 1);
211
212        let (t1, event1, _) = time_stream.try_next().unwrap().expect("expect time entry");
213        assert_eq!(event1.id, 0);
214        assert_eq!(event1.event, 7);
215
216        let (t2, event2, _) = time_stream.try_next().unwrap().expect("expect time entry");
217        assert_eq!(event2.id, 1);
218        assert_eq!(event2.event, 9);
219
220        // Confirm that the ordering of timeouts is expected. We can't check the actual
221        // values since they're dependent on the system clock.
222        assert!(t1.into_nanos() > t2.into_nanos());
223
224        assert_variant!(time_stream.try_next(), Err(e) => {
225            assert_eq!(e.to_string(), "receiver channel is empty")
226        });
227    }
228
229    #[test]
230    fn test_timer_schedule() {
231        let _exec = fasync::TestExecutor::new();
232        let (mut timer, mut time_stream) = create_timer::<TestEvent>();
233        let start = zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(0));
234
235        let event_handle = timer.schedule(5u32);
236        assert_eq!(event_handle.id(), 0);
237
238        let (t, event, _) = time_stream.try_next().unwrap().expect("expect time entry");
239        assert_eq!(event.id, 0);
240        assert_eq!(event.event, 5);
241        assert!(start + zx::MonotonicDuration::from_seconds(10) <= t);
242    }
243
244    #[test]
245    fn test_timer_stream() {
246        let mut exec = fasync::TestExecutor::new_with_fake_time();
247        let fut = async {
248            let (timer, time_stream) = mpsc::unbounded::<ScheduledEvent<TestEvent>>();
249            let mut timeout_stream = make_async_timed_event_stream(time_stream);
250            let now = zx::MonotonicInstant::get();
251            let _handle1 = schedule(&timer, now + zx::MonotonicDuration::from_millis(40), 0);
252            let _handle2 = schedule(&timer, now + zx::MonotonicDuration::from_millis(10), 1);
253            let _handle3 = schedule(&timer, now + zx::MonotonicDuration::from_millis(20), 2);
254            let _handle4 = schedule(&timer, now + zx::MonotonicDuration::from_millis(30), 3);
255
256            let mut events = vec![];
257            for _ in 0u32..4 {
258                let event = timeout_stream.next().await.expect("timer terminated prematurely");
259                events.push(event.event);
260            }
261            events
262        };
263        let mut fut = pin!(fut);
264        for _ in 0u32..4 {
265            assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
266            assert!(exec.wake_next_timer().is_some());
267        }
268        assert_variant!(
269            exec.run_until_stalled(&mut fut),
270            Poll::Ready(events) => assert_eq!(events, vec![1, 2, 3, 0]),
271        );
272    }
273
274    #[test]
275    fn test_timer_stream_cancel() {
276        let mut exec = fasync::TestExecutor::new_with_fake_time();
277        let (mut timer, time_stream) = create_timer::<TestEvent>();
278        let mut timeout_stream = make_async_timed_event_stream(time_stream);
279
280        let deadline = zx::MonotonicInstant::after(zx::Duration::from_seconds(5));
281
282        {
283            // Schedule an event and then drop the handle.
284            let _event_handle = timer.schedule_at(deadline, 0);
285        }
286
287        exec.set_fake_time(deadline.into());
288        let mut next = timeout_stream.next();
289        assert_variant!(exec.run_until_stalled(&mut next), Poll::Pending);
290    }
291
292    #[test]
293    fn test_timer_stream_drop_without_cancel() {
294        let mut exec = fasync::TestExecutor::new_with_fake_time();
295        let (mut timer, time_stream) = create_timer::<TestEvent>();
296        let mut timeout_stream = make_async_timed_event_stream(time_stream);
297
298        let deadline = zx::MonotonicInstant::after(zx::Duration::from_seconds(5));
299
300        {
301            // Schedule an event and then drop the handle.
302            timer.schedule_at(deadline, 7357).drop_without_cancel();
303        }
304
305        exec.set_fake_time(deadline.into());
306        let mut next = timeout_stream.next();
307        // The event still appears.
308        let event =
309            assert_variant!(exec.run_until_stalled(&mut next), Poll::Ready(Some(event)) => event);
310        assert_eq!(event.event, 7357);
311    }
312
313    fn schedule(
314        timer: &UnboundedSender<ScheduledEvent<TestEvent>>,
315        deadline: zx::MonotonicInstant,
316        event: TestEvent,
317    ) -> EventHandle {
318        let id = 0;
319        let handle = EventHandle::new(id);
320        let inner_handle =
321            EventHandle { active: Arc::clone(&handle.active), event_id: id, cancel_on_drop: true };
322        let entry = (deadline, Event { id, event }, inner_handle);
323        timer.unbounded_send(entry).expect("expect send successful");
324        handle
325    }
326}