1use futures::channel::mpsc;
6use futures::{FutureExt, Stream, StreamExt};
7use std::sync::{atomic, Arc};
8use {fuchsia_async as fasync, zx};
9
10use crate::sink::UnboundedSink;
11
/// An entry travelling from a [`Timer`] to its event stream: the deadline, the event to deliver
/// at that deadline, and a handle that shares its cancellation flag with the handle returned to
/// the caller that scheduled the event.
pub type ScheduledEvent<E> = (zx::MonotonicInstant, Event<E>, EventHandle);
/// Sink through which a [`Timer`] sends its scheduled events.
pub type EventSender<E> = UnboundedSink<ScheduledEvent<E>>;
/// Receiving end of the unbounded channel that carries scheduled events.
pub type EventStream<E> = mpsc::UnboundedReceiver<ScheduledEvent<E>>;
/// Identifier assigned to a scheduled event.
pub type EventId = u64;
16
17pub fn create_timer<E>() -> (Timer<E>, EventStream<E>) {
21 let (timer_sink, time_stream) = mpsc::unbounded();
22 (Timer::new(UnboundedSink::new(timer_sink)), time_stream)
23}
24
25pub fn make_async_timed_event_stream<E>(
26 time_stream: impl Stream<Item = ScheduledEvent<E>>,
27) -> impl Stream<Item = Event<E>> {
28 Box::pin(
31 time_stream
32 .map(|(deadline, timed_event, handle)| {
33 fasync::Timer::new(fasync::MonotonicInstant::from_zx(deadline))
34 .map(|_| (timed_event, handle))
35 })
36 .buffer_unordered(usize::max_value())
37 .filter_map(|(timed_event, handle)| async move {
38 if handle.is_active() {
39 Some(timed_event)
40 } else {
41 None
42 }
43 }),
44 )
45}
46
47#[derive(Debug)]
48pub struct Event<E> {
49 pub id: EventId,
50 pub event: E,
51}
52
53impl<E: Clone> Clone for Event<E> {
54 fn clone(&self) -> Self {
55 Event { id: self.id, event: self.event.clone() }
56 }
57}
58
/// Schedules events by sending them, together with a deadline, to an [`EventSender`].
#[derive(Debug)]
pub struct Timer<E> {
    /// Sink that receives every scheduled `(deadline, event, handle)` entry.
    sender: EventSender<E>,
    /// Id that will be given to the next scheduled event; starts at 0 and grows by one per
    /// scheduled event.
    next_id: EventId,
}
64
65impl<E> Timer<E> {
66 pub fn new(sender: EventSender<E>) -> Self {
67 Timer { sender, next_id: 0 }
68 }
69
70 pub fn now(&self) -> zx::MonotonicInstant {
76 fasync::MonotonicInstant::now().into_zx()
78 }
79
80 pub fn schedule_at(&mut self, deadline: zx::MonotonicInstant, event: E) -> EventHandle {
81 let id = self.next_id;
82 let timer_handle = EventHandle::new(id);
83 let inner_handle = EventHandle {
84 active: Arc::clone(&timer_handle.active),
85 event_id: id,
86 cancel_on_drop: true,
89 };
90 self.sender.send((deadline, Event { id, event }, inner_handle));
91 self.next_id += 1;
92 timer_handle
93 }
94
95 pub fn schedule_after(&mut self, duration: zx::MonotonicDuration, event: E) -> EventHandle {
96 self.schedule_at(fasync::MonotonicInstant::after(duration).into_zx(), event)
97 }
98
99 pub fn schedule<EV>(&mut self, event: EV) -> EventHandle
100 where
101 EV: TimeoutDuration + Into<E>,
102 {
103 self.schedule_after(event.timeout_duration(), event.into())
104 }
105}
106
/// Implemented by event types that know their own timeout, so that they can be handed to
/// `Timer::schedule` without an explicit duration.
pub trait TimeoutDuration {
    /// Returns how long after being scheduled the event should fire.
    fn timeout_duration(&self) -> zx::MonotonicDuration;
}
110
/// Handle to a scheduled event.
///
/// Dropping a handle whose `cancel_on_drop` flag is set marks the event as no longer active;
/// `drop_without_cancel` disposes of a handle without doing so.
#[derive(Debug)]
pub struct EventHandle {
    /// Flag shared between the handles of one event; `false` once the event was cancelled.
    active: Arc<atomic::AtomicBool>,
    /// Id of the event this handle refers to.
    event_id: EventId,
    /// Whether dropping this handle clears `active`.
    cancel_on_drop: bool,
}
120
121impl EventHandle {
122 fn new(event_id: EventId) -> Self {
123 Self { active: Arc::new(atomic::AtomicBool::new(true)), event_id, cancel_on_drop: true }
124 }
125
126 pub fn new_test(event_id: EventId) -> Self {
129 Self::new(event_id)
130 }
131
132 fn is_active(&self) -> bool {
134 self.active.load(atomic::Ordering::Acquire)
135 }
136
137 pub fn id(&self) -> EventId {
139 self.event_id
140 }
141
142 pub fn drop_without_cancel(mut self) {
146 self.cancel_on_drop = false;
147 }
148}
149
150impl std::ops::Drop for EventHandle {
151 fn drop(&mut self) {
152 if self.cancel_on_drop {
153 self.active.store(false, atomic::Ordering::Release);
154 }
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::assert_variant;
    use fuchsia_async as fasync;

    use futures::channel::mpsc::UnboundedSender;
    use std::pin::pin;
    use std::task::Poll;

    type TestEvent = u32;
    impl TimeoutDuration for TestEvent {
        fn timeout_duration(&self) -> zx::MonotonicDuration {
            zx::MonotonicDuration::from_seconds(10)
        }
    }

    /// Events scheduled with explicit deadlines arrive on the stream in scheduling order with
    /// consecutive ids and unchanged deadlines.
    #[test]
    fn test_timer_schedule_at() {
        let _exec = fasync::TestExecutor::new();
        let (mut timer, mut time_stream) = create_timer::<TestEvent>();
        let timeout1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
        let timeout2 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10));
        let event_handle1 = timer.schedule_at(timeout1, 7);
        let event_handle2 = timer.schedule_at(timeout2, 9);
        assert_eq!(event_handle1.id(), 0);
        assert_eq!(event_handle2.id(), 1);

        let (t1, event1, _) = time_stream.try_next().unwrap().expect("expect time entry");
        assert_eq!(t1, timeout1);
        assert_eq!(event1.id, 0);
        assert_eq!(event1.event, 7);

        let (t2, event2, _) = time_stream.try_next().unwrap().expect("expect time entry");
        assert_eq!(t2, timeout2);
        assert_eq!(event2.id, 1);
        assert_eq!(event2.event, 9);

        assert_variant!(time_stream.try_next(), Err(e) => {
            assert_eq!(e.to_string(), "receiver channel is empty")
        });
    }

    /// Events scheduled with relative timeouts keep their scheduling order on the stream, and
    /// the longer timeout results in the later deadline.
    #[test]
    fn test_timer_schedule_after() {
        let _exec = fasync::TestExecutor::new();
        let (mut timer, mut time_stream) = create_timer::<TestEvent>();
        let timeout1 = zx::MonotonicDuration::from_seconds(1000);
        let timeout2 = zx::MonotonicDuration::from_seconds(5);
        let event_handle1 = timer.schedule_after(timeout1, 7);
        let event_handle2 = timer.schedule_after(timeout2, 9);
        assert_eq!(event_handle1.id(), 0);
        assert_eq!(event_handle2.id(), 1);

        let (t1, event1, _) = time_stream.try_next().unwrap().expect("expect time entry");
        assert_eq!(event1.id, 0);
        assert_eq!(event1.event, 7);

        let (t2, event2, _) = time_stream.try_next().unwrap().expect("expect time entry");
        assert_eq!(event2.id, 1);
        assert_eq!(event2.event, 9);

        // The first event was scheduled with the much longer timeout.
        assert!(t1.into_nanos() > t2.into_nanos());

        assert_variant!(time_stream.try_next(), Err(e) => {
            assert_eq!(e.to_string(), "receiver channel is empty")
        });
    }

    /// `schedule` takes the timeout from the event's `TimeoutDuration` impl (10 seconds for
    /// `TestEvent`).
    #[test]
    fn test_timer_schedule() {
        let _exec = fasync::TestExecutor::new();
        let (mut timer, mut time_stream) = create_timer::<TestEvent>();
        let start = zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(0));

        let event_handle = timer.schedule(5u32);
        assert_eq!(event_handle.id(), 0);

        let (t, event, _) = time_stream.try_next().unwrap().expect("expect time entry");
        assert_eq!(event.id, 0);
        assert_eq!(event.event, 5);
        assert!(start + zx::MonotonicDuration::from_seconds(10) <= t);
    }

    /// The timed stream yields events ordered by deadline, not by scheduling order.
    #[test]
    fn test_timer_stream() {
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let fut = async {
            let (timer, time_stream) = mpsc::unbounded::<ScheduledEvent<TestEvent>>();
            let mut timeout_stream = make_async_timed_event_stream(time_stream);
            let now = zx::MonotonicInstant::get();
            // The handles are kept alive so that none of the events gets cancelled.
            let _handle1 = schedule(&timer, now + zx::MonotonicDuration::from_millis(40), 0);
            let _handle2 = schedule(&timer, now + zx::MonotonicDuration::from_millis(10), 1);
            let _handle3 = schedule(&timer, now + zx::MonotonicDuration::from_millis(20), 2);
            let _handle4 = schedule(&timer, now + zx::MonotonicDuration::from_millis(30), 3);

            let mut events = vec![];
            for _ in 0u32..4 {
                let event = timeout_stream.next().await.expect("timer terminated prematurely");
                events.push(event.event);
            }
            events
        };
        let mut fut = pin!(fut);
        // Each of the four events needs one timer wake-up before the future can make progress.
        for _ in 0u32..4 {
            assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
            assert!(exec.wake_next_timer().is_some());
        }
        assert_variant!(
            exec.run_until_stalled(&mut fut),
            Poll::Ready(events) => assert_eq!(events, vec![1, 2, 3, 0]),
        );
    }

    /// An event whose handle was dropped is not yielded once its deadline is reached.
    #[test]
    fn test_timer_stream_cancel() {
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let (mut timer, time_stream) = create_timer::<TestEvent>();
        let mut timeout_stream = make_async_timed_event_stream(time_stream);

        let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));

        {
            // Leaving this scope drops the handle, which cancels the event.
            let _event_handle = timer.schedule_at(deadline, 0);
        }

        exec.set_fake_time(deadline.into());
        let mut next = timeout_stream.next();
        assert_variant!(exec.run_until_stalled(&mut next), Poll::Pending);
    }

    /// An event whose handle was disposed of via `drop_without_cancel` is still yielded.
    #[test]
    fn test_timer_stream_drop_without_cancel() {
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let (mut timer, time_stream) = create_timer::<TestEvent>();
        let mut timeout_stream = make_async_timed_event_stream(time_stream);

        let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));

        // The handle is consumed right away, but without cancelling the event.
        timer.schedule_at(deadline, 7357).drop_without_cancel();

        exec.set_fake_time(deadline.into());
        let mut next = timeout_stream.next();
        let event =
            assert_variant!(exec.run_until_stalled(&mut next), Poll::Ready(Some(event)) => event);
        assert_eq!(event.event, 7357);
    }

    /// Sends `event` with `deadline` straight into the channel and returns the handle that
    /// shares the cancellation flag with the entry sent. Every event gets id 0.
    fn schedule(
        timer: &UnboundedSender<ScheduledEvent<TestEvent>>,
        deadline: zx::MonotonicInstant,
        event: TestEvent,
    ) -> EventHandle {
        let id = 0;
        let handle = EventHandle::new(id);
        let inner_handle =
            EventHandle { active: Arc::clone(&handle.active), event_id: id, cancel_on_drop: true };
        let entry = (deadline, Event { id, event }, inner_handle);
        timer.unbounded_send(entry).expect("expect send successful");
        handle
    }
}