1#![deny(missing_docs)]
6#![allow(clippy::let_unit_value)]
7
8use fuchsia_async::TimeoutExt;
52use futures::channel::{mpsc, oneshot};
53use futures::future::select_all;
54use futures::prelude::*;
55use futures::select;
56use std::collections::VecDeque;
57use std::time::Duration;
58use thiserror::Error;
59
60mod barrier;
61use barrier::{Barrier, BarrierBlock};
62
63const DEFAULT_EVENTS_LIMIT: usize = 10;
64
65pub trait Event: Clone {
71 fn can_merge(&self, other: &Self) -> bool;
73}
74
75#[derive(Debug, Error, PartialEq, Eq)]
77#[error("The client is closed and should be removed from the event queue.")]
78pub struct ClosedClient;
79
80#[derive(Debug, Error, PartialEq, Eq)]
82#[error("The event queue future was dropped before calling control handle functions.")]
83pub struct EventQueueDropped;
84
85#[derive(Debug, Error, PartialEq, Eq)]
87#[error("The flush operation timed out.")]
88pub struct TimedOut;
89
90pub trait Notify {
93 type Event: Event;
95
96 type NotifyFuture: Future<Output = Result<(), ClosedClient>> + Send + Unpin;
99
100 fn notify(&self, event: Self::Event) -> Self::NotifyFuture;
104}
105
106#[derive(Debug)]
107enum Command<N>
108where
109 N: Notify,
110{
111 AddClient(N),
112 Clear,
113 QueueEvent(N::Event),
114 TryFlush(BarrierBlock),
115 Ping(oneshot::Sender<()>),
116}
117
118pub struct ControlHandle<N>
120where
121 N: Notify,
122{
123 sender: mpsc::Sender<Command<N>>,
124}
125
126impl<N> Clone for ControlHandle<N>
127where
128 N: Notify,
129{
130 fn clone(&self) -> Self {
131 ControlHandle { sender: self.sender.clone() }
132 }
133}
134
135impl<N> std::fmt::Debug for ControlHandle<N>
136where
137 N: Notify,
138{
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 f.debug_struct("ControlHandle").finish()
141 }
142}
143
144impl<N> ControlHandle<N>
145where
146 N: Notify,
147{
148 fn new(sender: mpsc::Sender<Command<N>>) -> Self {
149 ControlHandle { sender }
150 }
151
152 pub async fn add_client(&mut self, notifier: N) -> Result<(), EventQueueDropped> {
155 self.sender.send(Command::AddClient(notifier)).await.map_err(|_| EventQueueDropped)
156 }
157
158 pub async fn clear(&mut self) -> Result<(), EventQueueDropped> {
161 self.sender.send(Command::Clear).await.map_err(|_| EventQueueDropped)
162 }
163
164 pub async fn queue_event(&mut self, event: N::Event) -> Result<(), EventQueueDropped> {
166 self.sender.send(Command::QueueEvent(event)).await.map_err(|_| EventQueueDropped)
167 }
168
169 pub async fn try_flush(
172 &mut self,
173 timeout: Duration,
174 ) -> Result<impl Future<Output = Result<(), TimedOut>>, EventQueueDropped> {
175 let (barrier, block) = Barrier::new();
176 let () = self.sender.send(Command::TryFlush(block)).await.map_err(|_| EventQueueDropped)?;
177
178 Ok(barrier.map(Ok).on_timeout(timeout, || Err(TimedOut)))
179 }
180
181 pub async fn ping(&mut self) -> Result<(), EventQueueDropped> {
184 let (sender, receiver) = oneshot::channel();
185 self.sender.send(Command::Ping(sender)).await.map_err(|_| EventQueueDropped)?;
186 let _ = receiver.await;
187 Ok(())
188 }
189}
190
191pub struct EventQueue<N>
194where
195 N: Notify,
196{
197 clients: Vec<Client<N>>,
198 receiver: mpsc::Receiver<Command<N>>,
199 events_limit: usize,
200 prior_events: Vec<N::Event>,
201}
202
203impl<N> EventQueue<N>
204where
205 N: Notify,
206{
207 #[allow(clippy::new_ret_no_self)]
210 pub fn new() -> (impl Future<Output = ()>, ControlHandle<N>) {
211 Self::with_limit(DEFAULT_EVENTS_LIMIT)
212 }
213
214 pub fn with_limit(limit: usize) -> (impl Future<Output = ()>, ControlHandle<N>) {
218 let (sender, receiver) = mpsc::channel(1);
219 let event_queue =
220 EventQueue { clients: Vec::new(), receiver, events_limit: limit, prior_events: vec![] };
221 (event_queue.start(), ControlHandle::new(sender))
222 }
223
224 async fn start(mut self) {
226 loop {
227 let pending = future::pending().right_future();
230 let all_events = self
231 .clients
232 .iter_mut()
233 .filter_map(|c| c.pending_event.as_mut().map(|fut| fut.left_future()))
234 .chain(std::iter::once(pending));
235 let mut select_all_events = select_all(all_events).fuse();
236 select! {
237 (result, index, _) = select_all_events => {
238 let i = self.find_client_index(index);
239 match result {
240 Ok(()) => self.next_event(i),
241 Err(ClosedClient) => {
242 self.clients.swap_remove(i);
243 },
244 }
245 },
246 command = self.receiver.next() => {
247 match command {
248 Some(Command::AddClient(proxy)) => self.add_client(proxy),
249 Some(Command::Clear) => self.clear(),
250 Some(Command::QueueEvent(event)) => self.queue_event(event),
251 Some(Command::TryFlush(block)) => self.try_flush(block),
252 Some(Command::Ping(pong)) => { let _ = pong.send(()); }
253 None => break,
254 }
255 },
256 }
257 }
258 }
259
260 fn add_client(&mut self, notifier: N) {
261 let mut client = Client::new(notifier);
262 for event in &self.prior_events {
263 client.queue_event(event.clone(), self.events_limit);
264 }
265 self.clients.push(client);
266 }
267
268 fn clear(&mut self) {
270 let mut i = 0;
271 while i < self.clients.len() {
272 if self.clients[i].pending_event.is_none() {
273 self.clients.swap_remove(i);
274 } else {
275 self.clients[i].accept_new_events = false;
276 i += 1;
277 }
278 }
279 self.prior_events = vec![];
280 }
281
282 fn queue_event(&mut self, event: N::Event) {
283 let mut i = 0;
284 while i < self.clients.len() {
285 if !self.clients[i].queue_event(event.clone(), self.events_limit) {
286 self.clients.swap_remove(i);
287 } else {
288 i += 1;
289 }
290 }
291
292 if let Some(newest_mergable_event) = self.prior_events.last() {
294 if newest_mergable_event.can_merge(&event) {
295 self.prior_events.pop();
296 }
297 }
298 self.prior_events.push(event);
299 }
300
301 fn try_flush(&mut self, block: BarrierBlock) {
302 for client in self.clients.iter_mut() {
303 client.queue_flush_notify(&block);
304 }
305 }
306
307 fn find_client_index(&self, index: usize) -> usize {
309 let mut j = 0;
310 for i in 0..self.clients.len() {
311 if self.clients[i].pending_event.is_none() {
312 continue;
313 }
314
315 if j == index {
316 return i;
317 }
318
319 j += 1;
320 }
321 panic!("index {index} too large");
322 }
323
324 fn next_event(&mut self, i: usize) {
325 self.clients[i].ack_event();
326 if !self.clients[i].accept_new_events && self.clients[i].pending_event.is_none() {
327 self.clients.swap_remove(i);
328 }
329 }
330}
331
332struct Client<N>
333where
334 N: Notify,
335{
336 notifier: N,
337 pending_event: Option<N::NotifyFuture>,
338 commands: VecDeque<ClientCommand<N::Event>>,
339 accept_new_events: bool,
340}
341
342impl<N> Client<N>
343where
344 N: Notify,
345{
346 fn new(notifier: N) -> Self {
347 Client { notifier, pending_event: None, commands: VecDeque::new(), accept_new_events: true }
348 }
349
350 fn pending_event_count(&self) -> usize {
352 let queued_events = self.commands.iter().filter_map(ClientCommand::event).count();
353 let pending_event = usize::from(self.pending_event.is_some());
354
355 queued_events + pending_event
356 }
357
358 fn newest_queued_event(&mut self) -> Option<&mut N::Event> {
360 self.commands.iter_mut().rev().find_map(ClientCommand::event_mut)
361 }
362
363 fn queue_event(&mut self, event: N::Event, events_limit: usize) -> bool {
364 if !self.accept_new_events {
366 return true;
367 }
368
369 if let Some(newest_mergable_event) =
371 self.newest_queued_event().filter(|last_event| last_event.can_merge(&event))
372 {
373 *newest_mergable_event = event;
374 return true;
375 }
376
377 if self.pending_event_count() + 1 > events_limit {
379 return false;
380 }
381
382 self.queue_command(ClientCommand::SendEvent(event));
384 true
385 }
386
387 fn queue_flush_notify(&mut self, block: &BarrierBlock) {
389 if !self.accept_new_events {
391 return;
392 }
393
394 self.queue_command(ClientCommand::NotifyFlush(block.clone()));
395 }
396
397 fn ack_event(&mut self) {
399 self.pending_event = None;
400 self.process_queue();
401 }
402
403 fn queue_command(&mut self, cmd: ClientCommand<N::Event>) {
405 self.commands.push_back(cmd);
406 if self.pending_event.is_none() {
407 self.process_queue();
408 }
409 }
410
411 fn process_queue(&mut self) {
414 assert!(self.pending_event.is_none());
415
416 while let Some(event) = self.commands.pop_front() {
417 match event {
418 ClientCommand::SendEvent(event) => {
419 self.pending_event = Some(self.notifier.notify(event));
420 return;
421 }
422 ClientCommand::NotifyFlush(block) => {
423 drop(block);
425 }
426 }
427 }
428 }
429}
430
431enum ClientCommand<E> {
432 SendEvent(E),
433 NotifyFlush(BarrierBlock),
434}
435
436impl<E> ClientCommand<E> {
437 fn event(&self) -> Option<&E> {
438 match self {
439 ClientCommand::SendEvent(event) => Some(event),
440 ClientCommand::NotifyFlush(_) => None,
441 }
442 }
443 fn event_mut(&mut self) -> Option<&mut E> {
444 match self {
445 ClientCommand::SendEvent(event) => Some(event),
446 ClientCommand::NotifyFlush(_) => None,
447 }
448 }
449}
450
451#[cfg(test)]
452mod tests {
453 use super::*;
454 use assert_matches::assert_matches;
455 use fidl::endpoints::create_proxy_and_stream;
456 use fidl_test_pkg_eventqueue::{
457 ExampleEventMonitorMarker, ExampleEventMonitorProxy, ExampleEventMonitorProxyInterface,
458 ExampleEventMonitorRequest, ExampleEventMonitorRequestStream,
459 };
460 use fuchsia_async as fasync;
461 use futures::future::BoxFuture;
462 use futures::pin_mut;
463 use futures::task::Poll;
464
465 struct FidlNotifier {
466 proxy: ExampleEventMonitorProxy,
467 }
468
469 impl Notify for FidlNotifier {
470 type Event = String;
471 type NotifyFuture = futures::future::Map<
472 <ExampleEventMonitorProxy as ExampleEventMonitorProxyInterface>::OnEventResponseFut,
473 fn(Result<(), fidl::Error>) -> Result<(), ClosedClient>,
474 >;
475
476 fn notify(&self, event: String) -> Self::NotifyFuture {
477 self.proxy.on_event(&event).map(|res| res.map_err(|_| ClosedClient))
478 }
479 }
480
481 struct MpscNotifier<T> {
482 sender: mpsc::Sender<T>,
483 }
484
485 impl<T> Notify for MpscNotifier<T>
486 where
487 T: Event + Send + 'static,
488 {
489 type Event = T;
490 type NotifyFuture = BoxFuture<'static, Result<(), ClosedClient>>;
491
492 fn notify(&self, event: T) -> BoxFuture<'static, Result<(), ClosedClient>> {
493 let mut sender = self.sender.clone();
494 async move { sender.send(event).map(|result| result.map_err(|_| ClosedClient)).await }
495 .boxed()
496 }
497 }
498
499 impl Event for &'static str {
500 fn can_merge(&self, other: &&'static str) -> bool {
501 self == other
502 }
503 }
504
505 impl Event for String {
506 fn can_merge(&self, other: &String) -> bool {
507 self == other
508 }
509 }
510
511 fn start_event_queue() -> ControlHandle<FidlNotifier> {
512 let (event_queue, handle) = EventQueue::<FidlNotifier>::new();
513 fasync::Task::local(event_queue).detach();
514 handle
515 }
516
517 async fn add_client(
518 handle: &mut ControlHandle<FidlNotifier>,
519 ) -> ExampleEventMonitorRequestStream {
520 let (proxy, stream) = create_proxy_and_stream::<ExampleEventMonitorMarker>();
521 handle.add_client(FidlNotifier { proxy }).await.unwrap();
522 stream
523 }
524
525 async fn assert_events(
526 stream: &mut ExampleEventMonitorRequestStream,
527 expected_events: &[&str],
528 ) {
529 for &expected_event in expected_events {
530 match stream.try_next().await.unwrap().unwrap() {
531 ExampleEventMonitorRequest::OnEvent { event, responder } => {
532 assert_eq!(&event, expected_event);
533 responder.send().unwrap();
534 }
535 }
536 }
537 }
538
539 #[fasync::run_singlethreaded(test)]
540 async fn test_event_queue_simple() {
541 let (event_queue, mut handle) = EventQueue::<MpscNotifier<String>>::new();
542 fasync::Task::local(event_queue).detach();
543 let (sender, mut receiver) = mpsc::channel(1);
544 handle.add_client(MpscNotifier { sender }).await.unwrap();
545 handle.queue_event("event".into()).await.unwrap();
546 assert_matches!(receiver.next().await.as_deref(), Some("event"));
547 drop(handle);
548 assert_matches!(receiver.next().await, None);
549 }
550
551 #[test]
552 fn flush_with_no_clients_completes_immediately() {
553 let mut executor = fasync::TestExecutor::new();
554 let (event_queue, mut handle) = EventQueue::<MpscNotifier<String>>::new();
555 let _event_queue = fasync::Task::local(event_queue);
556
557 let wait_flush =
558 executor.run_singlethreaded(handle.try_flush(Duration::from_secs(1))).unwrap();
559 pin_mut!(wait_flush);
560
561 assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Ok(())));
562 }
563
564 #[test]
565 fn flush_with_no_pending_events_completes_immediately() {
566 let mut executor = fasync::TestExecutor::new();
567 let (event_queue, mut handle) = EventQueue::<MpscNotifier<String>>::new();
568 let _event_queue = fasync::Task::local(event_queue);
569
570 let (sender, _receiver) = mpsc::channel(0);
571 #[allow(clippy::async_yields_async)]
572 let wait_flush = executor.run_singlethreaded(async {
574 handle.add_client(MpscNotifier { sender }).await.unwrap();
575 handle.try_flush(Duration::from_secs(1)).await.unwrap()
576 });
577 pin_mut!(wait_flush);
578
579 assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Ok(())));
580 }
581
582 #[test]
583 fn flush_with_pending_events_completes_once_events_are_flushed() {
584 let mut executor = fasync::TestExecutor::new();
585 let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
586 let _event_queue = fasync::Task::local(event_queue);
587
588 let (sender1, mut receiver1) = mpsc::channel(0);
589 let (sender2, mut receiver2) = mpsc::channel(0);
590 #[allow(clippy::async_yields_async)]
591 let wait_flush = executor.run_singlethreaded(async {
593 handle.add_client(MpscNotifier { sender: sender1 }).await.unwrap();
594 handle.queue_event("first").await.unwrap();
595 handle.queue_event("second").await.unwrap();
596 handle.add_client(MpscNotifier { sender: sender2 }).await.unwrap();
597 let wait_flush = handle.try_flush(Duration::from_secs(1)).await.unwrap();
598 handle.queue_event("third").await.unwrap();
599 wait_flush
600 });
601 pin_mut!(wait_flush);
602
603 assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Pending);
605
606 let () = executor.run_singlethreaded(async {
608 assert_eq!(receiver1.next().await, Some("first"));
609 });
610 assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Pending);
611
612 let () = executor.run_singlethreaded(async {
614 assert_eq!(receiver1.next().await, Some("second"));
615 assert_eq!(receiver2.next().await, Some("first"));
616 assert_eq!(receiver2.next().await, Some("second"));
617 assert_eq!(receiver2.next().await, Some("third"));
618 });
619 assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Ok(())));
620 }
621
622 #[test]
623 fn flush_with_pending_events_fails_at_timeout() {
624 let mut executor = fasync::TestExecutor::new_with_fake_time();
625 let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
626 let _event_queue = fasync::Task::local(event_queue);
627
628 let (sender, mut receiver) = mpsc::channel(0);
629 #[allow(clippy::async_yields_async)]
630 let wait_flush = {
632 let setup = async {
633 handle.queue_event("first").await.unwrap();
634 handle.add_client(MpscNotifier { sender }).await.unwrap();
635 handle.try_flush(Duration::from_secs(1)).await.unwrap()
636 };
637 pin_mut!(setup);
638 match executor.run_until_stalled(&mut setup) {
639 Poll::Ready(res) => res,
640 _ => panic!(),
641 }
642 };
643 pin_mut!(wait_flush);
644
645 assert!(!executor.wake_expired_timers());
646 assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Pending);
647
648 executor.set_fake_time(fasync::MonotonicInstant::after(Duration::from_secs(1).into()));
649 assert!(executor.wake_expired_timers());
650 assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Err(TimedOut)));
651
652 let teardown = async {
654 drop(handle);
655 assert_eq!(receiver.next().await, Some("first"));
656 assert_eq!(receiver.next().await, None);
657 };
658 pin_mut!(teardown);
659 match executor.run_until_stalled(&mut teardown) {
660 Poll::Ready(()) => {}
661 _ => panic!(),
662 }
663 }
664
665 #[fasync::run_singlethreaded(test)]
666 async fn flush_only_applies_to_active_clients() {
667 let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
668 let _event_queue = fasync::Task::local(event_queue);
669
670 let (sender1, mut receiver1) = mpsc::channel(0);
671 let (sender2, mut receiver2) = mpsc::channel(0);
672
673 handle.add_client(MpscNotifier { sender: sender1 }).await.unwrap();
674 handle.queue_event("first").await.unwrap();
675 handle.clear().await.unwrap();
676
677 handle.add_client(MpscNotifier { sender: sender2 }).await.unwrap();
678 handle.queue_event("second").await.unwrap();
679 let flush = handle.try_flush(Duration::from_secs(1)).await.unwrap();
680
681 assert_eq!(receiver2.next().await, Some("second"));
683 flush.await.unwrap();
684
685 assert_eq!(receiver1.next().await, Some("first"));
687 assert_eq!(receiver1.next().await, None);
688 handle.clear().await.unwrap();
689 assert_eq!(receiver2.next().await, None);
690
691 handle.try_flush(Duration::from_secs(1)).await.unwrap().await.unwrap();
693 }
694
695 #[fasync::run_singlethreaded(test)]
696 async fn notify_flush_commands_do_not_count_towards_limit() {
697 let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::with_limit(3);
698 let _event_queue = fasync::Task::local(event_queue);
699
700 let (sender1, mut receiver1) = mpsc::channel(0);
701 let (sender2, mut receiver2) = mpsc::channel(0);
702
703 handle.add_client(MpscNotifier { sender: sender1 }).await.unwrap();
704 handle.queue_event("event1").await.unwrap();
705 handle.queue_event("event2").await.unwrap();
706
707 handle.add_client(MpscNotifier { sender: sender2 }).await.unwrap();
708 let flush2 = handle.try_flush(Duration::from_secs(1)).await.unwrap();
709 handle.queue_event("event3").await.unwrap();
710
711 assert_eq!(receiver1.next().await, Some("event1"));
712 assert_eq!(receiver1.next().await, Some("event2"));
713 assert_eq!(receiver1.next().await, Some("event3"));
714
715 assert_eq!(receiver2.next().await, Some("event1"));
716 assert_eq!(receiver2.next().await, Some("event2"));
717
718 flush2.await.unwrap();
720
721 assert_eq!(receiver2.next().await, Some("event3"));
722 drop(handle);
723 assert_eq!(receiver2.next().await, None);
724 }
725
726 #[fasync::run_singlethreaded(test)]
727 async fn notify_flush_commands_do_not_interfere_with_event_merge() {
728 let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
729 let _event_queue = fasync::Task::local(event_queue);
730
731 let (sender, mut receiver) = mpsc::channel(0);
732
733 handle.add_client(MpscNotifier { sender }).await.unwrap();
734 handle.queue_event("first").await.unwrap();
735 handle.queue_event("second_merge").await.unwrap();
736 let wait_flush = handle.try_flush(Duration::from_secs(1)).await.unwrap();
737 handle.queue_event("second_merge").await.unwrap();
738 handle.queue_event("third").await.unwrap();
739
740 assert_eq!(receiver.next().await, Some("first"));
741 assert_eq!(receiver.next().await, Some("second_merge"));
742 wait_flush.await.unwrap();
743 assert_eq!(receiver.next().await, Some("third"));
744 drop(handle);
745 assert_eq!(receiver.next().await, None);
746 }
747
748 #[fasync::run_singlethreaded(test)]
749 async fn test_event_queue_simple_fidl() {
750 let mut handle = start_event_queue();
751 let mut stream = add_client(&mut handle).await;
752 handle.queue_event("event".into()).await.unwrap();
753 assert_events(&mut stream, &["event"]).await;
754 drop(handle);
755 assert_matches!(stream.next().await, None);
756 }
757
758 #[fasync::run_singlethreaded(test)]
759 async fn test_event_queue_multi_client_multi_event() {
760 let mut handle = start_event_queue();
761 let mut stream1 = add_client(&mut handle).await;
762 handle.queue_event("event1".into()).await.unwrap();
763 handle.queue_event("event2".into()).await.unwrap();
764
765 let mut stream2 = add_client(&mut handle).await;
766 handle.queue_event("event3".into()).await.unwrap();
767
768 assert_events(&mut stream1, &["event1", "event2", "event3"]).await;
769 assert_events(&mut stream2, &["event1", "event2", "event3"]).await;
770
771 drop(handle);
772 assert_matches!(stream1.next().await, None);
773 assert_matches!(stream2.next().await, None);
774 }
775
776 #[fasync::run_singlethreaded(test)]
777 async fn test_event_queue_clear_clients() {
778 let mut handle = start_event_queue();
779 let mut stream1 = add_client(&mut handle).await;
780 handle.queue_event("event1".into()).await.unwrap();
781 handle.queue_event("event2".into()).await.unwrap();
782 handle.clear().await.unwrap();
783 let mut stream2 = add_client(&mut handle).await;
784 handle.queue_event("event3".into()).await.unwrap();
785 assert_events(&mut stream2, &["event3"]).await;
786 assert_events(&mut stream1, &["event1", "event2"]).await;
787 assert_matches!(stream1.next().await, None);
789
790 handle.clear().await.unwrap();
792 assert_matches!(stream2.next().await, None);
793 }
794
795 #[fasync::run_singlethreaded(test)]
796 async fn test_event_queue_drop_unresponsive_clients() {
797 let mut handle = start_event_queue();
798 let mut stream = add_client(&mut handle).await;
799 for i in 1..12 {
800 handle.queue_event(format!("event{i}")).await.unwrap();
801 }
802 assert_events(&mut stream, &["event1"]).await;
803 assert_matches!(stream.next().await, None);
804 }
805
806 #[fasync::run_singlethreaded(test)]
807 async fn test_event_queue_drop_unresponsive_clients_custom_limit() {
808 let (event_queue, mut handle) = EventQueue::<FidlNotifier>::with_limit(2);
809 fasync::Task::local(event_queue).detach();
810 let mut stream = add_client(&mut handle).await;
811
812 handle.queue_event("event1".into()).await.unwrap();
813 handle.queue_event("event2".into()).await.unwrap();
814 handle.queue_event("event3".into()).await.unwrap();
815 assert_events(&mut stream, &["event1"]).await;
816 assert_matches!(stream.next().await, None);
817 }
818
819 #[fasync::run_singlethreaded(test)]
820 async fn test_event_queue_drop_unresponsive_clients_custom_limit_merge() {
821 let (event_queue, mut handle) = EventQueue::<FidlNotifier>::with_limit(2);
822 fasync::Task::local(event_queue).detach();
823 let mut stream = add_client(&mut handle).await;
824
825 handle.queue_event("event1".into()).await.unwrap();
826 handle.queue_event("event2".into()).await.unwrap();
827 handle.queue_event("event2".into()).await.unwrap();
828 handle.queue_event("event2".into()).await.unwrap();
829 handle.ping().await.unwrap();
830 assert_events(&mut stream, &["event1", "event2"]).await;
831 drop(handle);
832 assert_matches!(stream.next().await, None);
833 }
834
835 #[fasync::run_singlethreaded(test)]
836 async fn test_event_queue_drop_failed_clients() {
837 let mut handle = start_event_queue();
838 let mut stream = add_client(&mut handle).await;
839 handle.queue_event("event1".into()).await.unwrap();
840 handle.queue_event("event2".into()).await.unwrap();
841 match stream.try_next().await.unwrap().unwrap() {
842 ExampleEventMonitorRequest::OnEvent { event, .. } => {
843 assert_eq!(event, "event1");
844 }
846 }
847 assert_matches!(stream.next().await, None);
848 }
849
850 #[fasync::run_singlethreaded(test)]
851 async fn test_event_queue_drop_failed_clients_multiple() {
852 let mut handle = start_event_queue();
853 let mut stream1 = add_client(&mut handle).await;
854 let mut stream2 = add_client(&mut handle).await;
855 handle.queue_event("event1".into()).await.unwrap();
856 handle.queue_event("event2".into()).await.unwrap();
857 match stream1.try_next().await.unwrap().unwrap() {
858 ExampleEventMonitorRequest::OnEvent { event, .. } => {
859 assert_eq!(event, "event1");
860 }
862 }
863 assert_matches!(stream1.next().await, None);
864 handle.queue_event("event3".into()).await.unwrap();
866 assert_events(&mut stream2, &["event1", "event2", "event3"]).await;
867 drop(handle);
868 assert_matches!(stream2.next().await, None);
869 }
870
871 #[fasync::run_singlethreaded(test)]
872 async fn test_event_queue_merge_events() {
873 let mut handle = start_event_queue();
874 let mut stream = add_client(&mut handle).await;
875 for i in 0..10 {
876 handle.queue_event(format!("event{}", i / 4)).await.unwrap();
877 }
878 assert_events(&mut stream, &["event0", "event0", "event1", "event2"]).await;
880 drop(handle);
881 assert_matches!(stream.next().await, None);
882 }
883}