event_queue/lib.rs

// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![deny(missing_docs)]
#![allow(clippy::let_unit_value)]

//! This is a crate for broadcasting events to multiple clients and waiting for each client to
//! acknowledge an event before sending that client the next one. If an event fails to send, or if
//! the number of events queued for a client exceeds the limit, that client is removed from the
//! event queue.
//!
//! # Example
//!
//! ```
//! #[derive(Clone)]
//! struct FooEvent {
//!     state: String,
//!     progress: u8,
//! }
//!
//! impl Event for FooEvent {
//!     fn can_merge(&self, other: &FooEvent) -> bool {
//!         self.state == other.state
//!     }
//! }
//!
//! struct FooNotifier {
//!     proxy: FooProxy,
//! }
//!
//! impl Notify for FooNotifier {
//!     type Event = FooEvent;
//!     type NotifyFuture = BoxFuture<'static, Result<(), ClosedClient>>;
//!
//!     fn notify(&self, event: FooEvent) -> Self::NotifyFuture {
//!         self.proxy.on_event(&event).map(|result| result.map_err(|_| ClosedClient)).boxed()
//!     }
//! }
//!
//! async fn foo(proxy: FooProxy) {
//!     let (event_queue, mut handle) = EventQueue::<FooNotifier>::new();
//!     let fut = async move {
//!         handle.add_client(FooNotifier { proxy }).await.unwrap();
//!         handle.queue_event(FooEvent { state: "new state".to_string(), progress: 0 }).await.unwrap();
//!     };
//!     future::join(fut, event_queue).await;
//! }
//! ```

use fuchsia_async::TimeoutExt;
use futures::channel::{mpsc, oneshot};
use futures::future::select_all;
use futures::prelude::*;
use futures::select;
use std::collections::VecDeque;
use std::time::Duration;
use thiserror::Error;

mod barrier;
use barrier::{Barrier, BarrierBlock};

const DEFAULT_EVENTS_LIMIT: usize = 10;

/// EventQueue event trait
///
/// The event type needs to implement this trait to tell the event queue whether two consecutive
/// pending events can be merged into a single event. If `can_merge` returns true, the event queue
/// replaces the last event in the queue with the latest event.
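///
/// # Example
///
/// A minimal sketch of a merge policy, reusing the `FooEvent` shape from the crate-level example:
/// consecutive progress updates for the same state collapse into the newest one.
///
/// ```
/// # use event_queue::Event;
/// #[derive(Clone)]
/// struct FooEvent {
///     state: String,
///     progress: u8,
/// }
///
/// impl Event for FooEvent {
///     fn can_merge(&self, other: &FooEvent) -> bool {
///         // Only the latest progress for a given state needs to be delivered.
///         self.state == other.state
///     }
/// }
/// ```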
pub trait Event: Clone {
    /// Returns whether this event can be merged with another event.
    fn can_merge(&self, other: &Self) -> bool;
}

/// The client is closed and should be removed from the event queue.
#[derive(Debug, Error, PartialEq, Eq)]
#[error("The client is closed and should be removed from the event queue.")]
pub struct ClosedClient;

/// The event queue future was dropped before calling control handle functions.
#[derive(Debug, Error, PartialEq, Eq)]
#[error("The event queue future was dropped before calling control handle functions.")]
pub struct EventQueueDropped;

/// The flush operation timed out.
#[derive(Debug, Error, PartialEq, Eq)]
#[error("The flush operation timed out.")]
pub struct TimedOut;

/// This trait defines how a client is notified of an event. The struct that implements this
/// trait can hold client-specific data.
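///
/// # Example
///
/// A minimal sketch of a notifier backed by an `mpsc` channel (the `MpscNotifier` name is just
/// for illustration); a FIDL-backed notifier would hold a proxy instead, as in the crate-level
/// example.
///
/// ```
/// # use event_queue::{ClosedClient, Event, Notify};
/// # use futures::channel::mpsc;
/// # use futures::future::BoxFuture;
/// # use futures::prelude::*;
/// struct MpscNotifier<T> {
///     sender: mpsc::Sender<T>,
/// }
///
/// impl<T: Event + Send + 'static> Notify for MpscNotifier<T> {
///     type Event = T;
///     type NotifyFuture = BoxFuture<'static, Result<(), ClosedClient>>;
///
///     fn notify(&self, event: T) -> Self::NotifyFuture {
///         let mut sender = self.sender.clone();
///         // The send only fails if the receiver was dropped, which maps to a closed client.
///         async move { sender.send(event).await.map_err(|_| ClosedClient) }.boxed()
///     }
/// }
/// ```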
pub trait Notify {
    /// The type of the event that can be sent through this notification channel.
    type Event: Event;

    /// The type of the future that will resolve when the client acknowledges the event or the
    /// client is lost.
    type NotifyFuture: Future<Output = Result<(), ClosedClient>> + Send + Unpin;

    /// If the event was notified successfully, the future should return `Ok(())`. Otherwise, if
    /// the client is closed, the future should return `Err(ClosedClient)`, which results in the
    /// corresponding client being removed from the event queue.
    fn notify(&self, event: Self::Event) -> Self::NotifyFuture;
}

#[derive(Debug)]
enum Command<N>
where
    N: Notify,
{
    AddClient(N),
    Clear,
    QueueEvent(N::Event),
    TryFlush(BarrierBlock),
    Ping(oneshot::Sender<()>),
}

/// A control handle that can control the event queue.
pub struct ControlHandle<N>
where
    N: Notify,
{
    sender: mpsc::Sender<Command<N>>,
}

impl<N> Clone for ControlHandle<N>
where
    N: Notify,
{
    fn clone(&self) -> Self {
        ControlHandle { sender: self.sender.clone() }
    }
}

impl<N> std::fmt::Debug for ControlHandle<N>
where
    N: Notify,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ControlHandle").finish()
    }
}

impl<N> ControlHandle<N>
where
    N: Notify,
{
    fn new(sender: mpsc::Sender<Command<N>>) -> Self {
        ControlHandle { sender }
    }

    /// Add a new client to the event queue.
    /// If any events were queued before, the new client will receive all of them (merged where
    /// possible).
    pub async fn add_client(&mut self, notifier: N) -> Result<(), EventQueueDropped> {
        self.sender.send(Command::AddClient(notifier)).await.map_err(|_| EventQueueDropped)
    }

    /// Stop all existing clients in the event queue from accepting new events; once they have
    /// received all of their already-queued events, they will be dropped.
    pub async fn clear(&mut self) -> Result<(), EventQueueDropped> {
        self.sender.send(Command::Clear).await.map_err(|_| EventQueueDropped)
    }

    /// Queue an event that will be sent to all clients.
    pub async fn queue_event(&mut self, event: N::Event) -> Result<(), EventQueueDropped> {
        self.sender.send(Command::QueueEvent(event)).await.map_err(|_| EventQueueDropped)
    }

    /// Try to flush all pending events to all connected clients, returning a future that completes
    /// once all events are flushed or the given timeout duration is reached.
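    ///
    /// # Example
    ///
    /// A sketch of how the returned future might be awaited (setup of the queue and `handle`
    /// elided, so this snippet is not compiled):
    ///
    /// ```ignore
    /// let flushed = handle.try_flush(Duration::from_secs(1)).await.unwrap();
    /// match flushed.await {
    ///     Ok(()) => println!("all clients have acknowledged their queued events"),
    ///     Err(TimedOut) => println!("some client is still catching up"),
    /// }
    /// ```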
    pub async fn try_flush(
        &mut self,
        timeout: Duration,
    ) -> Result<impl Future<Output = Result<(), TimedOut>>, EventQueueDropped> {
        let (barrier, block) = Barrier::new();
        let () = self.sender.send(Command::TryFlush(block)).await.map_err(|_| EventQueueDropped)?;

        Ok(barrier.map(Ok).on_timeout(timeout, || Err(TimedOut)))
    }

    /// Pings the event queue.  When this returns, you can be sure that all previous actions via
    /// this control handle have been processed (but not necessarily forwarded to all clients yet).
    pub async fn ping(&mut self) -> Result<(), EventQueueDropped> {
        let (sender, receiver) = oneshot::channel();
        self.sender.send(Command::Ping(sender)).await.map_err(|_| EventQueueDropped)?;
        let _ = receiver.await;
        Ok(())
    }
}

/// An event queue for broadcasting events to multiple clients.
/// Clients that fail to receive events or do not receive them fast enough will be dropped.
pub struct EventQueue<N>
where
    N: Notify,
{
    clients: Vec<Client<N>>,
    receiver: mpsc::Receiver<Command<N>>,
    events_limit: usize,
    prior_events: Vec<N::Event>,
}

impl<N> EventQueue<N>
where
    N: Notify,
{
    /// Create a new `EventQueue`, returning a future for the event queue and a control handle
    /// to control the event queue.
    #[allow(clippy::new_ret_no_self)]
    pub fn new() -> (impl Future<Output = ()>, ControlHandle<N>) {
        Self::with_limit(DEFAULT_EVENTS_LIMIT)
    }

    /// Set the maximum number of events each client can have in the queue.
    /// Clients exceeding this limit will be dropped.
    /// The default value if not set is 10.
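    ///
    /// # Example
    ///
    /// A sketch using the `FooNotifier` type from the crate-level example (not compiled here):
    ///
    /// ```ignore
    /// // Allow at most 5 unacknowledged events per client before dropping that client.
    /// let (event_queue, handle) = EventQueue::<FooNotifier>::with_limit(5);
    /// ```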
    pub fn with_limit(limit: usize) -> (impl Future<Output = ()>, ControlHandle<N>) {
        let (sender, receiver) = mpsc::channel(1);
        let event_queue =
            EventQueue { clients: Vec::new(), receiver, events_limit: limit, prior_events: vec![] };
        (event_queue.start(), ControlHandle::new(sender))
    }

    /// Start the event queue; this future completes once all control handle senders are dropped.
    async fn start(mut self) {
        loop {
            // select_all will panic if the iterator has nothing in it, so we chain a
            // future::pending to it.
            let pending = future::pending().right_future();
            let all_events = self
                .clients
                .iter_mut()
                .filter_map(|c| c.pending_event.as_mut().map(|fut| fut.left_future()))
                .chain(std::iter::once(pending));
            let mut select_all_events = select_all(all_events).fuse();
            select! {
                (result, index, _) = select_all_events => {
                    let i = self.find_client_index(index);
                    match result {
                        Ok(()) => self.next_event(i),
                        Err(ClosedClient) => {
                            self.clients.swap_remove(i);
                        },
                    }
                },
                command = self.receiver.next() => {
                    match command {
                        Some(Command::AddClient(proxy)) => self.add_client(proxy),
                        Some(Command::Clear) => self.clear(),
                        Some(Command::QueueEvent(event)) => self.queue_event(event),
                        Some(Command::TryFlush(block)) => self.try_flush(block),
                        Some(Command::Ping(pong)) => { let _ = pong.send(()); }
                        None => break,
                    }
                },
            }
        }
    }

    fn add_client(&mut self, notifier: N) {
        let mut client = Client::new(notifier);
        for event in &self.prior_events {
            client.queue_event(event.clone(), self.events_limit);
        }
        self.clients.push(client);
    }

    // Remove clients that have no pending events; otherwise, tell them not to accept any new events.
    fn clear(&mut self) {
        let mut i = 0;
        while i < self.clients.len() {
            if self.clients[i].pending_event.is_none() {
                self.clients.swap_remove(i);
            } else {
                self.clients[i].accept_new_events = false;
                i += 1;
            }
        }
        self.prior_events = vec![];
    }

    fn queue_event(&mut self, event: N::Event) {
        let mut i = 0;
        while i < self.clients.len() {
            if !self.clients[i].queue_event(event.clone(), self.events_limit) {
                self.clients.swap_remove(i);
            } else {
                i += 1;
            }
        }

        // Merge this new event with the most recent event, if one exists and is mergable.
        if let Some(newest_mergable_event) = self.prior_events.last() {
            if newest_mergable_event.can_merge(&event) {
                self.prior_events.pop();
            }
        }
        self.prior_events.push(event);
    }

    fn try_flush(&mut self, block: BarrierBlock) {
        for client in self.clients.iter_mut() {
            client.queue_flush_notify(&block);
        }
    }

    // Figure out the actual client index based on the filtered index.
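    // E.g. if clients 0 and 2 have no pending event, select_all only polled clients 1 and 3, so a
    // filtered index of 1 maps back to the actual client index 3.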
    fn find_client_index(&self, index: usize) -> usize {
        let mut j = 0;
        for i in 0..self.clients.len() {
            if self.clients[i].pending_event.is_none() {
                continue;
            }

            if j == index {
                return i;
            }

            j += 1;
        }
        panic!("index {index} too large");
    }

    fn next_event(&mut self, i: usize) {
        self.clients[i].ack_event();
        if !self.clients[i].accept_new_events && self.clients[i].pending_event.is_none() {
            self.clients.swap_remove(i);
        }
    }
}

struct Client<N>
where
    N: Notify,
{
    notifier: N,
    pending_event: Option<N::NotifyFuture>,
    commands: VecDeque<ClientCommand<N::Event>>,
    accept_new_events: bool,
}

impl<N> Client<N>
where
    N: Notify,
{
    fn new(notifier: N) -> Self {
        Client { notifier, pending_event: None, commands: VecDeque::new(), accept_new_events: true }
    }

    /// Returns the count of in-flight and queued events.
    fn pending_event_count(&self) -> usize {
        let queued_events = self.commands.iter().filter_map(ClientCommand::event).count();
        let pending_event = usize::from(self.pending_event.is_some());

        queued_events + pending_event
    }

    /// Find the most recently queued event, if one exists.
    fn newest_queued_event(&mut self) -> Option<&mut N::Event> {
        self.commands.iter_mut().rev().find_map(ClientCommand::event_mut)
    }

    fn queue_event(&mut self, event: N::Event, events_limit: usize) -> bool {
        // Silently ignore new events if this client is part of a cleared session.
        if !self.accept_new_events {
            return true;
        }

        // Merge this new event with the most recent event, if one exists and is mergable.
        if let Some(newest_mergable_event) =
            self.newest_queued_event().filter(|last_event| last_event.can_merge(&event))
        {
            *newest_mergable_event = event;
            return true;
        }

        // If the event can't be merged, make sure this event isn't the one that exceeds the limit.
        if self.pending_event_count() + 1 > events_limit {
            return false;
        }

        // Enqueue the event and, if one is not in-flight, dispatch it.
        self.queue_command(ClientCommand::SendEvent(event));
        true
    }

    /// Drop block once all prior events are sent/acked.
    fn queue_flush_notify(&mut self, block: &BarrierBlock) {
        // A cleared client is not part of this flush request.
        if !self.accept_new_events {
            return;
        }

        self.queue_command(ClientCommand::NotifyFlush(block.clone()));
    }

    /// Mark the in-flight event as acknowledged and process items from the queue.
    fn ack_event(&mut self) {
        self.pending_event = None;
        self.process_queue();
    }

    /// Enqueue the command for processing, and, if possible, process items from the queue.
    fn queue_command(&mut self, cmd: ClientCommand<N::Event>) {
        self.commands.push_back(cmd);
        if self.pending_event.is_none() {
            self.process_queue();
        }
    }

    /// Assuming that no event is in-flight, process items from the queue until one is or the queue
    /// is empty.
    fn process_queue(&mut self) {
        assert!(self.pending_event.is_none());

        while let Some(event) = self.commands.pop_front() {
            match event {
                ClientCommand::SendEvent(event) => {
                    self.pending_event = Some(self.notifier.notify(event));
                    return;
                }
                ClientCommand::NotifyFlush(block) => {
                    // drop the block handle to indicate this client has flushed all prior events.
                    drop(block);
                }
            }
        }
    }
}

enum ClientCommand<E> {
    SendEvent(E),
    NotifyFlush(BarrierBlock),
}

impl<E> ClientCommand<E> {
    fn event(&self) -> Option<&E> {
        match self {
            ClientCommand::SendEvent(event) => Some(event),
            ClientCommand::NotifyFlush(_) => None,
        }
    }
    fn event_mut(&mut self) -> Option<&mut E> {
        match self {
            ClientCommand::SendEvent(event) => Some(event),
            ClientCommand::NotifyFlush(_) => None,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_test_pkg_eventqueue::{
        ExampleEventMonitorMarker, ExampleEventMonitorProxy, ExampleEventMonitorProxyInterface,
        ExampleEventMonitorRequest, ExampleEventMonitorRequestStream,
    };
    use fuchsia_async as fasync;
    use futures::future::BoxFuture;
    use futures::pin_mut;
    use futures::task::Poll;

    struct FidlNotifier {
        proxy: ExampleEventMonitorProxy,
    }

    impl Notify for FidlNotifier {
        type Event = String;
        type NotifyFuture = futures::future::Map<
            <ExampleEventMonitorProxy as ExampleEventMonitorProxyInterface>::OnEventResponseFut,
            fn(Result<(), fidl::Error>) -> Result<(), ClosedClient>,
        >;

        fn notify(&self, event: String) -> Self::NotifyFuture {
            self.proxy.on_event(&event).map(|res| res.map_err(|_| ClosedClient))
        }
    }

    struct MpscNotifier<T> {
        sender: mpsc::Sender<T>,
    }

    impl<T> Notify for MpscNotifier<T>
    where
        T: Event + Send + 'static,
    {
        type Event = T;
        type NotifyFuture = BoxFuture<'static, Result<(), ClosedClient>>;

        fn notify(&self, event: T) -> BoxFuture<'static, Result<(), ClosedClient>> {
            let mut sender = self.sender.clone();
            async move { sender.send(event).map(|result| result.map_err(|_| ClosedClient)).await }
                .boxed()
        }
    }

    impl Event for &'static str {
        fn can_merge(&self, other: &&'static str) -> bool {
            self == other
        }
    }

    impl Event for String {
        fn can_merge(&self, other: &String) -> bool {
            self == other
        }
    }

    fn start_event_queue() -> ControlHandle<FidlNotifier> {
        let (event_queue, handle) = EventQueue::<FidlNotifier>::new();
        fasync::Task::local(event_queue).detach();
        handle
    }

    async fn add_client(
        handle: &mut ControlHandle<FidlNotifier>,
    ) -> ExampleEventMonitorRequestStream {
        let (proxy, stream) = create_proxy_and_stream::<ExampleEventMonitorMarker>();
        handle.add_client(FidlNotifier { proxy }).await.unwrap();
        stream
    }

    async fn assert_events(
        stream: &mut ExampleEventMonitorRequestStream,
        expected_events: &[&str],
    ) {
        for &expected_event in expected_events {
            match stream.try_next().await.unwrap().unwrap() {
                ExampleEventMonitorRequest::OnEvent { event, responder } => {
                    assert_eq!(&event, expected_event);
                    responder.send().unwrap();
                }
            }
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_event_queue_simple() {
        let (event_queue, mut handle) = EventQueue::<MpscNotifier<String>>::new();
        fasync::Task::local(event_queue).detach();
        let (sender, mut receiver) = mpsc::channel(1);
        handle.add_client(MpscNotifier { sender }).await.unwrap();
        handle.queue_event("event".into()).await.unwrap();
        assert_matches!(receiver.next().await.as_deref(), Some("event"));
        drop(handle);
        assert_matches!(receiver.next().await, None);
    }

    #[test]
    fn flush_with_no_clients_completes_immediately() {
        let mut executor = fasync::TestExecutor::new();
        let (event_queue, mut handle) = EventQueue::<MpscNotifier<String>>::new();
        let _event_queue = fasync::Task::local(event_queue);

        let wait_flush =
            executor.run_singlethreaded(handle.try_flush(Duration::from_secs(1))).unwrap();
        pin_mut!(wait_flush);

        assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Ok(())));
    }

    #[test]
    fn flush_with_no_pending_events_completes_immediately() {
        let mut executor = fasync::TestExecutor::new();
        let (event_queue, mut handle) = EventQueue::<MpscNotifier<String>>::new();
        let _event_queue = fasync::Task::local(event_queue);

        let (sender, _receiver) = mpsc::channel(0);
        #[allow(clippy::async_yields_async)]
        // We want this future to run on our specific executor, not await it directly.
        let wait_flush = executor.run_singlethreaded(async {
            handle.add_client(MpscNotifier { sender }).await.unwrap();
            handle.try_flush(Duration::from_secs(1)).await.unwrap()
        });
        pin_mut!(wait_flush);

        assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Ok(())));
    }

    #[test]
    fn flush_with_pending_events_completes_once_events_are_flushed() {
        let mut executor = fasync::TestExecutor::new();
        let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
        let _event_queue = fasync::Task::local(event_queue);

        let (sender1, mut receiver1) = mpsc::channel(0);
        let (sender2, mut receiver2) = mpsc::channel(0);
        #[allow(clippy::async_yields_async)]
        // We want this future to run on our specific executor, not await it directly.
        let wait_flush = executor.run_singlethreaded(async {
            handle.add_client(MpscNotifier { sender: sender1 }).await.unwrap();
            handle.queue_event("first").await.unwrap();
            handle.queue_event("second").await.unwrap();
            handle.add_client(MpscNotifier { sender: sender2 }).await.unwrap();
            let wait_flush = handle.try_flush(Duration::from_secs(1)).await.unwrap();
            handle.queue_event("third").await.unwrap();
            wait_flush
        });
        pin_mut!(wait_flush);

        // No events acked yet, so the flush is pending.
        assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Pending);

        // Some, but not all events acked, so the flush is still pending.
        let () = executor.run_singlethreaded(async {
            assert_eq!(receiver1.next().await, Some("first"));
        });
        assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Pending);

        // All events prior to the flush now acked, so the flush is done.
        let () = executor.run_singlethreaded(async {
            assert_eq!(receiver1.next().await, Some("second"));
            assert_eq!(receiver2.next().await, Some("first"));
            assert_eq!(receiver2.next().await, Some("second"));
            assert_eq!(receiver2.next().await, Some("third"));
        });
        assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Ok(())));
    }

    #[test]
    fn flush_with_pending_events_fails_at_timeout() {
        let mut executor = fasync::TestExecutor::new_with_fake_time();
        let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
        let _event_queue = fasync::Task::local(event_queue);

        let (sender, mut receiver) = mpsc::channel(0);
        #[allow(clippy::async_yields_async)]
        // We want this future to run on our specific executor, not await it directly.
        let wait_flush = {
            let setup = async {
                handle.queue_event("first").await.unwrap();
                handle.add_client(MpscNotifier { sender }).await.unwrap();
                handle.try_flush(Duration::from_secs(1)).await.unwrap()
            };
            pin_mut!(setup);
            match executor.run_until_stalled(&mut setup) {
                Poll::Ready(res) => res,
                _ => panic!(),
            }
        };
        pin_mut!(wait_flush);

        assert!(!executor.wake_expired_timers());
        assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Pending);

        executor.set_fake_time(fasync::MonotonicInstant::after(Duration::from_secs(1).into()));
        assert!(executor.wake_expired_timers());
        assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Err(TimedOut)));

        // A flush timing out does not otherwise affect the queue.
        let teardown = async {
            drop(handle);
            assert_eq!(receiver.next().await, Some("first"));
            assert_eq!(receiver.next().await, None);
        };
        pin_mut!(teardown);
        match executor.run_until_stalled(&mut teardown) {
            Poll::Ready(()) => {}
            _ => panic!(),
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn flush_only_applies_to_active_clients() {
        let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
        let _event_queue = fasync::Task::local(event_queue);

        let (sender1, mut receiver1) = mpsc::channel(0);
        let (sender2, mut receiver2) = mpsc::channel(0);

        handle.add_client(MpscNotifier { sender: sender1 }).await.unwrap();
        handle.queue_event("first").await.unwrap();
        handle.clear().await.unwrap();

        handle.add_client(MpscNotifier { sender: sender2 }).await.unwrap();
        handle.queue_event("second").await.unwrap();
        let flush = handle.try_flush(Duration::from_secs(1)).await.unwrap();

        // The flush completes even though the first cleared client hasn't acked its event yet.
        assert_eq!(receiver2.next().await, Some("second"));
        flush.await.unwrap();

        // Clear out all clients.
        assert_eq!(receiver1.next().await, Some("first"));
        assert_eq!(receiver1.next().await, None);
        handle.clear().await.unwrap();
        assert_eq!(receiver2.next().await, None);

        // Nop flush is fast.
        handle.try_flush(Duration::from_secs(1)).await.unwrap().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn notify_flush_commands_do_not_count_towards_limit() {
        let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::with_limit(3);
        let _event_queue = fasync::Task::local(event_queue);

        let (sender1, mut receiver1) = mpsc::channel(0);
        let (sender2, mut receiver2) = mpsc::channel(0);

        handle.add_client(MpscNotifier { sender: sender1 }).await.unwrap();
        handle.queue_event("event1").await.unwrap();
        handle.queue_event("event2").await.unwrap();

        handle.add_client(MpscNotifier { sender: sender2 }).await.unwrap();
        let flush2 = handle.try_flush(Duration::from_secs(1)).await.unwrap();
        handle.queue_event("event3").await.unwrap();

        assert_eq!(receiver1.next().await, Some("event1"));
        assert_eq!(receiver1.next().await, Some("event2"));
        assert_eq!(receiver1.next().await, Some("event3"));

        assert_eq!(receiver2.next().await, Some("event1"));
        assert_eq!(receiver2.next().await, Some("event2"));

        // Flush doesn't count towards client 2's limit.
        flush2.await.unwrap();

        assert_eq!(receiver2.next().await, Some("event3"));
        drop(handle);
        assert_eq!(receiver2.next().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn notify_flush_commands_do_not_interfere_with_event_merge() {
        let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
        let _event_queue = fasync::Task::local(event_queue);

        let (sender, mut receiver) = mpsc::channel(0);

        handle.add_client(MpscNotifier { sender }).await.unwrap();
        handle.queue_event("first").await.unwrap();
        handle.queue_event("second_merge").await.unwrap();
        let wait_flush = handle.try_flush(Duration::from_secs(1)).await.unwrap();
        handle.queue_event("second_merge").await.unwrap();
        handle.queue_event("third").await.unwrap();

        assert_eq!(receiver.next().await, Some("first"));
        assert_eq!(receiver.next().await, Some("second_merge"));
        wait_flush.await.unwrap();
        assert_eq!(receiver.next().await, Some("third"));
        drop(handle);
        assert_eq!(receiver.next().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_event_queue_simple_fidl() {
        let mut handle = start_event_queue();
        let mut stream = add_client(&mut handle).await;
        handle.queue_event("event".into()).await.unwrap();
        assert_events(&mut stream, &["event"]).await;
        drop(handle);
        assert_matches!(stream.next().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_event_queue_multi_client_multi_event() {
        let mut handle = start_event_queue();
        let mut stream1 = add_client(&mut handle).await;
        handle.queue_event("event1".into()).await.unwrap();
        handle.queue_event("event2".into()).await.unwrap();

        let mut stream2 = add_client(&mut handle).await;
        handle.queue_event("event3".into()).await.unwrap();

        assert_events(&mut stream1, &["event1", "event2", "event3"]).await;
        assert_events(&mut stream2, &["event1", "event2", "event3"]).await;

        drop(handle);
        assert_matches!(stream1.next().await, None);
        assert_matches!(stream2.next().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_event_queue_clear_clients() {
        let mut handle = start_event_queue();
        let mut stream1 = add_client(&mut handle).await;
        handle.queue_event("event1".into()).await.unwrap();
        handle.queue_event("event2".into()).await.unwrap();
        handle.clear().await.unwrap();
        let mut stream2 = add_client(&mut handle).await;
        handle.queue_event("event3".into()).await.unwrap();
        assert_events(&mut stream2, &["event3"]).await;
        assert_events(&mut stream1, &["event1", "event2"]).await;
        // No event3 because the event queue was cleared.
        assert_matches!(stream1.next().await, None);

        // client2 should have no pending events and be dropped.
        handle.clear().await.unwrap();
        assert_matches!(stream2.next().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_event_queue_drop_unresponsive_clients() {
        let mut handle = start_event_queue();
        let mut stream = add_client(&mut handle).await;
        for i in 1..12 {
            handle.queue_event(format!("event{i}")).await.unwrap();
        }
        assert_events(&mut stream, &["event1"]).await;
        assert_matches!(stream.next().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_event_queue_drop_unresponsive_clients_custom_limit() {
        let (event_queue, mut handle) = EventQueue::<FidlNotifier>::with_limit(2);
        fasync::Task::local(event_queue).detach();
        let mut stream = add_client(&mut handle).await;

        handle.queue_event("event1".into()).await.unwrap();
        handle.queue_event("event2".into()).await.unwrap();
        handle.queue_event("event3".into()).await.unwrap();
        assert_events(&mut stream, &["event1"]).await;
        assert_matches!(stream.next().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_event_queue_drop_unresponsive_clients_custom_limit_merge() {
        let (event_queue, mut handle) = EventQueue::<FidlNotifier>::with_limit(2);
        fasync::Task::local(event_queue).detach();
        let mut stream = add_client(&mut handle).await;

        handle.queue_event("event1".into()).await.unwrap();
        handle.queue_event("event2".into()).await.unwrap();
        handle.queue_event("event2".into()).await.unwrap();
        handle.queue_event("event2".into()).await.unwrap();
        handle.ping().await.unwrap();
        assert_events(&mut stream, &["event1", "event2"]).await;
        drop(handle);
        assert_matches!(stream.next().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_event_queue_drop_failed_clients() {
        let mut handle = start_event_queue();
        let mut stream = add_client(&mut handle).await;
        handle.queue_event("event1".into()).await.unwrap();
        handle.queue_event("event2".into()).await.unwrap();
        match stream.try_next().await.unwrap().unwrap() {
            ExampleEventMonitorRequest::OnEvent { event, .. } => {
                assert_eq!(event, "event1");
                // Don't respond.
            }
        }
        assert_matches!(stream.next().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_event_queue_drop_failed_clients_multiple() {
        let mut handle = start_event_queue();
        let mut stream1 = add_client(&mut handle).await;
        let mut stream2 = add_client(&mut handle).await;
        handle.queue_event("event1".into()).await.unwrap();
        handle.queue_event("event2".into()).await.unwrap();
        match stream1.try_next().await.unwrap().unwrap() {
            ExampleEventMonitorRequest::OnEvent { event, .. } => {
                assert_eq!(event, "event1");
                // Don't respond.
            }
        }
        assert_matches!(stream1.next().await, None);
        // stream2 can still receive events.
        handle.queue_event("event3".into()).await.unwrap();
        assert_events(&mut stream2, &["event1", "event2", "event3"]).await;
        drop(handle);
        assert_matches!(stream2.next().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_event_queue_merge_events() {
        let mut handle = start_event_queue();
        let mut stream = add_client(&mut handle).await;
        for i in 0..10 {
            handle.queue_event(format!("event{}", i / 4)).await.unwrap();
        }
        // The first event won't be merged because it has already been sent before the second event arrives.
        assert_events(&mut stream, &["event0", "event0", "event1", "event2"]).await;
        drop(handle);
        assert_matches!(stream.next().await, None);
    }
883}