#![deny(missing_docs)]
#![allow(clippy::let_unit_value)]
use fuchsia_async::TimeoutExt;
use futures::channel::{mpsc, oneshot};
use futures::future::select_all;
use futures::prelude::*;
use futures::select;
use std::collections::VecDeque;
use std::time::Duration;
use thiserror::Error;
mod barrier;
use barrier::{Barrier, BarrierBlock};
/// Per-client event limit used by `EventQueue::new`.
///
/// A client whose in-flight notification plus queued events would exceed this
/// limit when a new event arrives is dropped from the queue.
const DEFAULT_EVENTS_LIMIT: usize = 10;
/// The event type delivered to clients of an event queue.
///
/// Events must be `Clone` because every event is cloned once per client and
/// kept for replay to clients that are added later.
pub trait Event: Clone {
/// Returns whether `other` (the newer event) may take the place of `self`
/// (the older event).
///
/// When this returns `true` for the newest event still waiting in a queue,
/// that queued event is overwritten by `other` instead of `other` being
/// appended.
fn can_merge(&self, other: &Self) -> bool;
}
/// Error returned by a [`Notify::NotifyFuture`] when the client can no longer
/// be notified; the event queue removes the client when it sees this error.
#[derive(Debug, Error, PartialEq, Eq)]
#[error("The client is closed and should be removed from the event queue.")]
pub struct ClosedClient;
/// Error returned by [`ControlHandle`] methods when the command could not be
/// sent to the event queue.
#[derive(Debug, Error, PartialEq, Eq)]
#[error("The event queue future was dropped before calling control handle functions.")]
pub struct EventQueueDropped;
/// Error produced by the future returned from [`ControlHandle::try_flush`]
/// when the timeout elapses before the flush completes.
#[derive(Debug, Error, PartialEq, Eq)]
#[error("The flush operation timed out.")]
pub struct TimedOut;
/// A client of the event queue: something that can be told about an event.
pub trait Notify {
/// The type of event this client receives.
type Event: Event;
/// The future returned by [`Notify::notify`].
///
/// It resolves to `Ok(())` once the notification has completed, or to
/// `Err(ClosedClient)` if the client should be removed from the queue.
type NotifyFuture: Future<Output = Result<(), ClosedClient>> + Send + Unpin;
/// Starts delivering `event` to the client.
///
/// The queue keeps at most one such future in flight per client and only
/// starts the next notification after the previous one resolved to `Ok(())`.
fn notify(&self, event: Self::Event) -> Self::NotifyFuture;
}
/// Messages sent from a `ControlHandle` to the running event queue.
#[derive(Debug)]
enum Command<N>
where
N: Notify,
{
/// Register a new client.
AddClient(N),
/// Stop feeding new events to existing clients and forget prior events.
Clear,
/// Deliver an event to every client.
QueueEvent(N::Event),
/// Hand a clone of the barrier block to every client that still accepts events.
TryFlush(BarrierBlock),
/// Reply on the given channel once this command is processed.
Ping(oneshot::Sender<()>),
}
/// A cloneable handle used to send commands to a running [`EventQueue`].
pub struct ControlHandle<N>
where
N: Notify,
{
// Sending half of the command channel whose receiver is owned by the queue.
sender: mpsc::Sender<Command<N>>,
}
impl<N> Clone for ControlHandle<N>
where
    N: Notify,
{
    /// Produces another handle that feeds the same command channel.
    ///
    /// Written by hand so that `N` is not required to be `Clone`.
    fn clone(&self) -> Self {
        let sender = self.sender.clone();
        Self { sender }
    }
}
impl<N> std::fmt::Debug for ControlHandle<N>
where
    N: Notify,
{
    /// Formats the handle as just its type name; the inner sender is not shown,
    /// so `N` does not need to implement `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ControlHandle");
        builder.finish()
    }
}
impl<N> ControlHandle<N>
where
    N: Notify,
{
    /// Wraps the sending half of the queue's command channel.
    fn new(sender: mpsc::Sender<Command<N>>) -> Self {
        ControlHandle { sender }
    }

    /// Registers `notifier` as a new client of the event queue.
    ///
    /// The queue replays the events it has retained since the last `clear` to
    /// the new client before delivering newer ones.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueDropped`] if the command cannot be sent to the queue.
    pub async fn add_client(&mut self, notifier: N) -> Result<(), EventQueueDropped> {
        self.sender.send(Command::AddClient(notifier)).await.map_err(|_| EventQueueDropped)
    }

    /// Makes every current client stop accepting new events.
    ///
    /// Clients with nothing in flight are removed immediately; the others are
    /// removed once everything already queued for them has been delivered.
    /// The events retained for replay to future clients are discarded as well.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueDropped`] if the command cannot be sent to the queue.
    pub async fn clear(&mut self) -> Result<(), EventQueueDropped> {
        self.sender.send(Command::Clear).await.map_err(|_| EventQueueDropped)
    }

    /// Queues `event` for delivery to every client.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueDropped`] if the command cannot be sent to the queue.
    pub async fn queue_event(&mut self, event: N::Event) -> Result<(), EventQueueDropped> {
        self.sender.send(Command::QueueEvent(event)).await.map_err(|_| EventQueueDropped)
    }

    /// Asks the queue to flush and returns a future that tracks the flush.
    ///
    /// The returned future resolves to `Ok(())` when the barrier created for
    /// this flush completes, or to `Err(TimedOut)` if `timeout` elapses first.
    /// The barrier block is handed to every client that still accepts events
    /// and is dropped by each client when it reaches that point in its queue;
    /// presumably the barrier completes once all blocks are dropped (see the
    /// `barrier` module).
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueDropped`] if the command cannot be sent to the queue.
    pub async fn try_flush(
        &mut self,
        timeout: Duration,
    ) -> Result<impl Future<Output = Result<(), TimedOut>>, EventQueueDropped> {
        let (barrier, block) = Barrier::new();
        let () = self.sender.send(Command::TryFlush(block)).await.map_err(|_| EventQueueDropped)?;
        Ok(barrier.map(Ok).on_timeout(timeout, || Err(TimedOut)))
    }

    /// Waits until the queue has processed every command sent on this handle
    /// before this call.
    ///
    /// # Errors
    ///
    /// Returns [`EventQueueDropped`] if the command cannot be sent to the
    /// queue, or if the queue goes away before it answers the ping.
    pub async fn ping(&mut self) -> Result<(), EventQueueDropped> {
        let (sender, receiver) = oneshot::channel();
        self.sender.send(Command::Ping(sender)).await.map_err(|_| EventQueueDropped)?;
        // The queue always answers a ping it processes, so a cancelled reply
        // channel means the command was dropped unprocessed together with the
        // queue. Report that instead of pretending the ping succeeded.
        receiver.await.map_err(|_| EventQueueDropped)
    }
}
/// An event queue that fans events out to a set of clients.
///
/// Each client has at most one notification in flight at a time; further
/// events wait in a per-client queue.
pub struct EventQueue<N>
where
N: Notify,
{
// Currently registered clients. Order is not stable: removals use `swap_remove`.
clients: Vec<Client<N>>,
// Receiving half of the command channel fed by `ControlHandle`s.
receiver: mpsc::Receiver<Command<N>>,
// Maximum number of in-flight plus queued events allowed per client.
events_limit: usize,
// Events seen since the last clear, replayed to clients added later.
prior_events: Vec<N::Event>,
}
impl<N> EventQueue<N>
where
    N: Notify,
{
    /// Creates an event queue that uses the default per-client event limit.
    ///
    /// Returns the future that runs the queue together with a handle for
    /// controlling it. The future must be polled for the queue to make
    /// progress, and it finishes once the command channel is closed.
    // `new` hands back a (future, handle) pair rather than `Self`.
    #[allow(clippy::new_ret_no_self)]
    pub fn new() -> (impl Future<Output = ()>, ControlHandle<N>) {
        Self::with_limit(DEFAULT_EVENTS_LIMIT)
    }

    /// Creates an event queue with a custom per-client event limit.
    ///
    /// A client is dropped when a new event would push its in-flight plus
    /// queued events above `limit`. Returns the same (future, handle) pair as
    /// [`EventQueue::new`].
    pub fn with_limit(limit: usize) -> (impl Future<Output = ()>, ControlHandle<N>) {
        let (sender, receiver) = mpsc::channel(1);
        let handle = ControlHandle::new(sender);
        let queue =
            Self { clients: Vec::new(), receiver, events_limit: limit, prior_events: Vec::new() };
        (queue.start(), handle)
    }

    /// Runs the queue until the command channel is closed.
    ///
    /// Each iteration waits for either one in-flight notification to finish or
    /// one command to arrive, and handles whichever comes first.
    async fn start(mut self) {
        loop {
            // `select_all` panics on an empty iterator, so a future that never
            // completes is appended to keep the set non-empty even when no
            // client has a notification in flight.
            let never_ready = future::pending().right_future();
            let in_flight = self
                .clients
                .iter_mut()
                .filter_map(|client| {
                    client.pending_event.as_mut().map(|notification| notification.left_future())
                })
                .chain(std::iter::once(never_ready));
            let mut next_ack = select_all(in_flight).fuse();
            select! {
                (outcome, ready_position, _) = next_ack => {
                    // `ready_position` counts only clients with something in
                    // flight; translate it into an index into `self.clients`.
                    let client_index = self.find_client_index(ready_position);
                    match outcome {
                        Err(ClosedClient) => {
                            self.clients.swap_remove(client_index);
                        },
                        Ok(()) => self.next_event(client_index),
                    }
                },
                command = self.receiver.next() => {
                    match command {
                        // Channel closed: stop running.
                        None => break,
                        Some(Command::Ping(pong)) => { let _ = pong.send(()); }
                        Some(Command::TryFlush(block)) => self.try_flush(block),
                        Some(Command::QueueEvent(event)) => self.queue_event(event),
                        Some(Command::Clear) => self.clear(),
                        Some(Command::AddClient(proxy)) => self.add_client(proxy),
                    }
                },
            }
        }
    }

    /// Registers a client and replays all retained prior events to it.
    ///
    /// The result of queueing each replayed event is ignored, so the client is
    /// added even if the replay hits the event limit.
    fn add_client(&mut self, notifier: N) {
        let limit = self.events_limit;
        let mut client = Client::new(notifier);
        self.prior_events.iter().cloned().for_each(|event| {
            client.queue_event(event, limit);
        });
        self.clients.push(client);
    }

    /// Removes idle clients, marks busy ones as no longer accepting events,
    /// and forgets the retained prior events.
    fn clear(&mut self) {
        let mut index = 0;
        while let Some(client) = self.clients.get_mut(index) {
            if client.pending_event.is_some() {
                // Still delivering: let it drain, it is removed in `next_event`.
                client.accept_new_events = false;
                index += 1;
            } else {
                // `swap_remove` moved another client into `index`; revisit it.
                self.clients.swap_remove(index);
            }
        }
        self.prior_events.clear();
    }

    /// Queues `event` on every client, dropping clients that refuse it, then
    /// records it for replay to clients added later.
    fn queue_event(&mut self, event: N::Event) {
        let limit = self.events_limit;
        let mut index = 0;
        while index < self.clients.len() {
            let accepted = self.clients[index].queue_event(event.clone(), limit);
            if accepted {
                index += 1;
            } else {
                self.clients.swap_remove(index);
            }
        }
        // A mergeable newest prior event is superseded rather than kept.
        let supersedes_newest =
            self.prior_events.last().map_or(false, |newest| newest.can_merge(&event));
        if supersedes_newest {
            self.prior_events.pop();
        }
        self.prior_events.push(event);
    }

    /// Gives every client a chance to enqueue a clone of the flush block.
    fn try_flush(&mut self, block: BarrierBlock) {
        self.clients.iter_mut().for_each(|client| client.queue_flush_notify(&block));
    }

    /// Maps a position among clients that have a notification in flight to
    /// that client's index in `self.clients`.
    ///
    /// # Panics
    ///
    /// Panics if fewer than `index + 1` clients have a notification in flight.
    fn find_client_index(&self, index: usize) -> usize {
        self.clients
            .iter()
            .enumerate()
            .filter(|(_, client)| client.pending_event.is_some())
            .map(|(position, _)| position)
            .nth(index)
            .unwrap_or_else(|| panic!("index {index} too large"))
    }

    /// Handles a successful notification for client `i`: starts its next
    /// queued event, and removes the client if it was cleared and is now idle.
    fn next_event(&mut self, i: usize) {
        let client = &mut self.clients[i];
        client.ack_event();
        let drained_after_clear = !client.accept_new_events && client.pending_event.is_none();
        if drained_after_clear {
            self.clients.swap_remove(i);
        }
    }
}
/// Per-client state tracked by the event queue.
struct Client<N>
where
N: Notify,
{
// Used to start notifications for this client.
notifier: N,
// The notification currently in flight, if any.
pending_event: Option<N::NotifyFuture>,
// Commands waiting behind the in-flight notification, oldest first.
commands: VecDeque<ClientCommand<N::Event>>,
// Set to false by `EventQueue::clear`; new events and flushes are then ignored.
accept_new_events: bool,
}
impl<N> Client<N>
where
    N: Notify,
{
    /// Creates an idle client that accepts new events.
    fn new(notifier: N) -> Self {
        Self { notifier, pending_event: None, commands: VecDeque::new(), accept_new_events: true }
    }

    /// Number of events this client still has outstanding: the one in flight
    /// (if any) plus the queued `SendEvent` commands. Flush markers are not
    /// counted.
    fn pending_event_count(&self) -> usize {
        let in_flight = if self.pending_event.is_some() { 1 } else { 0 };
        let queued = self.commands.iter().filter(|command| command.event().is_some()).count();
        queued + in_flight
    }

    /// Returns the most recently queued event that has not been sent yet,
    /// skipping over flush markers.
    fn newest_queued_event(&mut self) -> Option<&mut N::Event> {
        self.commands.iter_mut().rev().find_map(|command| command.event_mut())
    }

    /// Queues `event` for this client.
    ///
    /// Returns `false` when accepting the event would exceed `events_limit`,
    /// which tells the caller to drop the client. Returns `true` otherwise,
    /// including when the client no longer accepts events and `event` is
    /// ignored.
    fn queue_event(&mut self, event: N::Event, events_limit: usize) -> bool {
        if !self.accept_new_events {
            return true;
        }
        // Overwrite the newest unsent event when it can be merged; this does
        // not grow the queue, so the limit check is skipped.
        if let Some(newest) = self.newest_queued_event() {
            if newest.can_merge(&event) {
                *newest = event;
                return true;
            }
        }
        if self.pending_event_count() >= events_limit {
            return false;
        }
        self.queue_command(ClientCommand::SendEvent(event));
        true
    }

    /// Queues a clone of `block` behind the events already queued, unless the
    /// client no longer accepts new events.
    fn queue_flush_notify(&mut self, block: &BarrierBlock) {
        if self.accept_new_events {
            self.queue_command(ClientCommand::NotifyFlush(block.clone()));
        }
    }

    /// Marks the in-flight notification as finished and starts the next one.
    fn ack_event(&mut self) {
        drop(self.pending_event.take());
        self.process_queue();
    }

    /// Appends `cmd` and, if nothing is in flight, processes the queue now.
    fn queue_command(&mut self, cmd: ClientCommand<N::Event>) {
        self.commands.push_back(cmd);
        let idle = self.pending_event.is_none();
        if idle {
            self.process_queue();
        }
    }

    /// Pops commands until an event notification has been started or the
    /// queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if a notification is already in flight.
    fn process_queue(&mut self) {
        assert!(self.pending_event.is_none());
        loop {
            match self.commands.pop_front() {
                None => break,
                // Everything queued before this marker has been sent, so the
                // block is released by dropping it.
                Some(ClientCommand::NotifyFlush(block)) => drop(block),
                Some(ClientCommand::SendEvent(event)) => {
                    self.pending_event = Some(self.notifier.notify(event));
                    break;
                }
            }
        }
    }
}
/// An entry in a client's command queue.
enum ClientCommand<E> {
/// Notify the client of this event.
SendEvent(E),
/// Flush marker: the block is dropped once the queue is processed up to here.
NotifyFlush(BarrierBlock),
}
impl<E> ClientCommand<E> {
fn event(&self) -> Option<&E> {
match self {
ClientCommand::SendEvent(event) => Some(event),
ClientCommand::NotifyFlush(_) => None,
}
}
fn event_mut(&mut self) -> Option<&mut E> {
match self {
ClientCommand::SendEvent(event) => Some(event),
ClientCommand::NotifyFlush(_) => None,
}
}
}
// Unit tests for the event queue, using both a FIDL-backed client and an
// in-process mpsc-backed client.
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use fidl::endpoints::create_proxy_and_stream;
use fidl_test_pkg_eventqueue::{
ExampleEventMonitorMarker, ExampleEventMonitorProxy, ExampleEventMonitorProxyInterface,
ExampleEventMonitorRequest, ExampleEventMonitorRequestStream,
};
use fuchsia_async as fasync;
use futures::future::BoxFuture;
use futures::pin_mut;
use futures::task::Poll;
// Client that notifies through an `ExampleEventMonitor` FIDL proxy.
struct FidlNotifier {
proxy: ExampleEventMonitorProxy,
}
impl Notify for FidlNotifier {
type Event = String;
type NotifyFuture = futures::future::Map<
<ExampleEventMonitorProxy as ExampleEventMonitorProxyInterface>::OnEventResponseFut,
fn(Result<(), fidl::Error>) -> Result<(), ClosedClient>,
>;
// Calls `on_event` on the proxy; any FIDL error is reported as `ClosedClient`.
fn notify(&self, event: String) -> Self::NotifyFuture {
self.proxy.on_event(&event).map(|res| res.map_err(|_| ClosedClient))
}
}
// Client that notifies by sending the event on an mpsc channel.
struct MpscNotifier<T> {
sender: mpsc::Sender<T>,
}
impl<T> Notify for MpscNotifier<T>
where
T: Event + Send + 'static,
{
type Event = T;
type NotifyFuture = BoxFuture<'static, Result<(), ClosedClient>>;
// Sends on a clone of the sender; a send error is reported as `ClosedClient`.
fn notify(&self, event: T) -> BoxFuture<'static, Result<(), ClosedClient>> {
let mut sender = self.sender.clone();
async move { sender.send(event).map(|result| result.map_err(|_| ClosedClient)).await }
.boxed()
}
}
// For the tests, two events can merge exactly when they are equal.
impl Event for &'static str {
fn can_merge(&self, other: &&'static str) -> bool {
self == other
}
}
impl Event for String {
fn can_merge(&self, other: &String) -> bool {
self == other
}
}
// Spawns a default-limit queue as a detached local task and returns its handle.
fn start_event_queue() -> ControlHandle<FidlNotifier> {
let (event_queue, handle) = EventQueue::<FidlNotifier>::new();
fasync::Task::local(event_queue).detach();
handle
}
// Registers a new FIDL client and returns the request stream that receives
// its notifications.
async fn add_client(
handle: &mut ControlHandle<FidlNotifier>,
) -> ExampleEventMonitorRequestStream {
let (proxy, stream) = create_proxy_and_stream::<ExampleEventMonitorMarker>().unwrap();
handle.add_client(FidlNotifier { proxy }).await.unwrap();
stream
}
// Reads one request per expected event, checks its payload, and acknowledges
// it so the queue can send the next one.
async fn assert_events(
stream: &mut ExampleEventMonitorRequestStream,
expected_events: &[&str],
) {
for &expected_event in expected_events {
match stream.try_next().await.unwrap().unwrap() {
ExampleEventMonitorRequest::OnEvent { event, responder } => {
assert_eq!(&event, expected_event);
responder.send().unwrap();
}
}
}
}
// One mpsc client receives one event; dropping the handle ends its stream.
#[fasync::run_singlethreaded(test)]
async fn test_event_queue_simple() {
let (event_queue, mut handle) = EventQueue::<MpscNotifier<String>>::new();
fasync::Task::local(event_queue).detach();
let (sender, mut receiver) = mpsc::channel(1);
handle.add_client(MpscNotifier { sender }).await.unwrap();
handle.queue_event("event".into()).await.unwrap();
assert_matches!(receiver.next().await.as_deref(), Some("event"));
drop(handle);
assert_matches!(receiver.next().await, None);
}
// With no clients, the flush future is ready as soon as it is polled.
#[test]
fn flush_with_no_clients_completes_immediately() {
let mut executor = fasync::TestExecutor::new();
let (event_queue, mut handle) = EventQueue::<MpscNotifier<String>>::new();
let _event_queue = fasync::Task::local(event_queue);
let wait_flush =
executor.run_singlethreaded(handle.try_flush(Duration::from_secs(1))).unwrap();
pin_mut!(wait_flush);
assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Ok(())));
}
// With a client that has nothing queued, the flush is also ready immediately.
#[test]
fn flush_with_no_pending_events_completes_immediately() {
let mut executor = fasync::TestExecutor::new();
let (event_queue, mut handle) = EventQueue::<MpscNotifier<String>>::new();
let _event_queue = fasync::Task::local(event_queue);
let (sender, _receiver) = mpsc::channel(0);
// The async block intentionally yields the (un-awaited) flush future.
#[allow(clippy::async_yields_async)]
let wait_flush = executor.run_singlethreaded(async {
handle.add_client(MpscNotifier { sender }).await.unwrap();
handle.try_flush(Duration::from_secs(1)).await.unwrap()
});
pin_mut!(wait_flush);
assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Ok(())));
}
// The flush stays pending until both clients have received everything that
// was queued before the flush was requested.
#[test]
fn flush_with_pending_events_completes_once_events_are_flushed() {
let mut executor = fasync::TestExecutor::new();
let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
let _event_queue = fasync::Task::local(event_queue);
let (sender1, mut receiver1) = mpsc::channel(0);
let (sender2, mut receiver2) = mpsc::channel(0);
#[allow(clippy::async_yields_async)]
let wait_flush = executor.run_singlethreaded(async {
handle.add_client(MpscNotifier { sender: sender1 }).await.unwrap();
handle.queue_event("first").await.unwrap();
handle.queue_event("second").await.unwrap();
// The second client is added late and gets the prior events replayed.
handle.add_client(MpscNotifier { sender: sender2 }).await.unwrap();
let wait_flush = handle.try_flush(Duration::from_secs(1)).await.unwrap();
// Queued after the flush request.
handle.queue_event("third").await.unwrap();
wait_flush
});
pin_mut!(wait_flush);
assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Pending);
let () = executor.run_singlethreaded(async {
assert_eq!(receiver1.next().await, Some("first"));
});
// Only part of the backlog has been consumed, so the flush is still pending.
assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Pending);
let () = executor.run_singlethreaded(async {
assert_eq!(receiver1.next().await, Some("second"));
assert_eq!(receiver2.next().await, Some("first"));
assert_eq!(receiver2.next().await, Some("second"));
assert_eq!(receiver2.next().await, Some("third"));
});
assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Ok(())));
}
// With fake time, the flush future reports `TimedOut` once the timeout has
// elapsed while the client still has an undelivered event.
#[test]
fn flush_with_pending_events_fails_at_timeout() {
let mut executor = fasync::TestExecutor::new_with_fake_time();
let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
let _event_queue = fasync::Task::local(event_queue);
let (sender, mut receiver) = mpsc::channel(0);
#[allow(clippy::async_yields_async)]
let wait_flush = {
let setup = async {
handle.queue_event("first").await.unwrap();
handle.add_client(MpscNotifier { sender }).await.unwrap();
handle.try_flush(Duration::from_secs(1)).await.unwrap()
};
pin_mut!(setup);
// Setup must finish without waiting on any timer.
match executor.run_until_stalled(&mut setup) {
Poll::Ready(res) => res,
_ => panic!(),
}
};
pin_mut!(wait_flush);
// Before time advances, no timer fires and the flush is pending.
assert!(!executor.wake_expired_timers());
assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Pending);
// Advance fake time by the flush timeout.
executor.set_fake_time(fasync::MonotonicInstant::after(Duration::from_secs(1).into()));
assert!(executor.wake_expired_timers());
assert_eq!(executor.run_until_stalled(&mut wait_flush), Poll::Ready(Err(TimedOut)));
// The event is still delivered after the timeout; dropping the handle then
// ends the client's stream.
let teardown = async {
drop(handle);
assert_eq!(receiver.next().await, Some("first"));
assert_eq!(receiver.next().await, None);
};
pin_mut!(teardown);
match executor.run_until_stalled(&mut teardown) {
Poll::Ready(()) => {}
_ => panic!(),
}
}
// A client that was cleared before the flush request does not hold the flush up.
#[fasync::run_singlethreaded(test)]
async fn flush_only_applies_to_active_clients() {
let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
let _event_queue = fasync::Task::local(event_queue);
let (sender1, mut receiver1) = mpsc::channel(0);
let (sender2, mut receiver2) = mpsc::channel(0);
handle.add_client(MpscNotifier { sender: sender1 }).await.unwrap();
handle.queue_event("first").await.unwrap();
handle.clear().await.unwrap();
handle.add_client(MpscNotifier { sender: sender2 }).await.unwrap();
handle.queue_event("second").await.unwrap();
let flush = handle.try_flush(Duration::from_secs(1)).await.unwrap();
assert_eq!(receiver2.next().await, Some("second"));
// The flush completes even though client 1 has not consumed "first" yet.
flush.await.unwrap();
assert_eq!(receiver1.next().await, Some("first"));
assert_eq!(receiver1.next().await, None);
handle.clear().await.unwrap();
assert_eq!(receiver2.next().await, None);
handle.try_flush(Duration::from_secs(1)).await.unwrap().await.unwrap();
}
// With a limit of 3, a queued flush marker does not use up one of the event
// slots: both clients still receive all three events.
#[fasync::run_singlethreaded(test)]
async fn notify_flush_commands_do_not_count_towards_limit() {
let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::with_limit(3);
let _event_queue = fasync::Task::local(event_queue);
let (sender1, mut receiver1) = mpsc::channel(0);
let (sender2, mut receiver2) = mpsc::channel(0);
handle.add_client(MpscNotifier { sender: sender1 }).await.unwrap();
handle.queue_event("event1").await.unwrap();
handle.queue_event("event2").await.unwrap();
handle.add_client(MpscNotifier { sender: sender2 }).await.unwrap();
let flush2 = handle.try_flush(Duration::from_secs(1)).await.unwrap();
handle.queue_event("event3").await.unwrap();
assert_eq!(receiver1.next().await, Some("event1"));
assert_eq!(receiver1.next().await, Some("event2"));
assert_eq!(receiver1.next().await, Some("event3"));
assert_eq!(receiver2.next().await, Some("event1"));
assert_eq!(receiver2.next().await, Some("event2"));
flush2.await.unwrap();
assert_eq!(receiver2.next().await, Some("event3"));
drop(handle);
assert_eq!(receiver2.next().await, None);
}
// An event queued after a flush marker still merges with the equal event
// queued before the marker, so "second_merge" is delivered only once.
#[fasync::run_singlethreaded(test)]
async fn notify_flush_commands_do_not_interfere_with_event_merge() {
let (event_queue, mut handle) = EventQueue::<MpscNotifier<&'static str>>::new();
let _event_queue = fasync::Task::local(event_queue);
let (sender, mut receiver) = mpsc::channel(0);
handle.add_client(MpscNotifier { sender }).await.unwrap();
handle.queue_event("first").await.unwrap();
handle.queue_event("second_merge").await.unwrap();
let wait_flush = handle.try_flush(Duration::from_secs(1)).await.unwrap();
handle.queue_event("second_merge").await.unwrap();
handle.queue_event("third").await.unwrap();
assert_eq!(receiver.next().await, Some("first"));
assert_eq!(receiver.next().await, Some("second_merge"));
wait_flush.await.unwrap();
assert_eq!(receiver.next().await, Some("third"));
drop(handle);
assert_eq!(receiver.next().await, None);
}
// Same as `test_event_queue_simple`, but with a FIDL client.
#[fasync::run_singlethreaded(test)]
async fn test_event_queue_simple_fidl() {
let mut handle = start_event_queue();
let mut stream = add_client(&mut handle).await;
handle.queue_event("event".into()).await.unwrap();
assert_events(&mut stream, &["event"]).await;
drop(handle);
assert_matches!(stream.next().await, None);
}
// A client added after two events were queued still receives them, followed
// by the event queued after it joined.
#[fasync::run_singlethreaded(test)]
async fn test_event_queue_multi_client_multi_event() {
let mut handle = start_event_queue();
let mut stream1 = add_client(&mut handle).await;
handle.queue_event("event1".into()).await.unwrap();
handle.queue_event("event2".into()).await.unwrap();
let mut stream2 = add_client(&mut handle).await;
handle.queue_event("event3".into()).await.unwrap();
assert_events(&mut stream1, &["event1", "event2", "event3"]).await;
assert_events(&mut stream2, &["event1", "event2", "event3"]).await;
drop(handle);
assert_matches!(stream1.next().await, None);
assert_matches!(stream2.next().await, None);
}
// After `clear`, the old client gets only what was already queued and is then
// closed; a client added afterwards sees only events queued after the clear.
#[fasync::run_singlethreaded(test)]
async fn test_event_queue_clear_clients() {
let mut handle = start_event_queue();
let mut stream1 = add_client(&mut handle).await;
handle.queue_event("event1".into()).await.unwrap();
handle.queue_event("event2".into()).await.unwrap();
handle.clear().await.unwrap();
let mut stream2 = add_client(&mut handle).await;
handle.queue_event("event3".into()).await.unwrap();
assert_events(&mut stream2, &["event3"]).await;
assert_events(&mut stream1, &["event1", "event2"]).await;
assert_matches!(stream1.next().await, None);
handle.clear().await.unwrap();
assert_matches!(stream2.next().await, None);
}
// Eleven events with no acknowledgement exceed the default limit of 10, so
// the client is dropped after the first event was sent.
#[fasync::run_singlethreaded(test)]
async fn test_event_queue_drop_unresponsive_clients() {
let mut handle = start_event_queue();
let mut stream = add_client(&mut handle).await;
for i in 1..12 {
handle.queue_event(format!("event{i}")).await.unwrap();
}
assert_events(&mut stream, &["event1"]).await;
assert_matches!(stream.next().await, None);
}
// Same as above with a limit of 2: the third unacknowledged event drops the client.
#[fasync::run_singlethreaded(test)]
async fn test_event_queue_drop_unresponsive_clients_custom_limit() {
let (event_queue, mut handle) = EventQueue::<FidlNotifier>::with_limit(2);
fasync::Task::local(event_queue).detach();
let mut stream = add_client(&mut handle).await;
handle.queue_event("event1".into()).await.unwrap();
handle.queue_event("event2".into()).await.unwrap();
handle.queue_event("event3".into()).await.unwrap();
assert_events(&mut stream, &["event1"]).await;
assert_matches!(stream.next().await, None);
}
// Repeated equal events merge into one queued event, so the limit of 2 is
// not exceeded and the client is kept.
#[fasync::run_singlethreaded(test)]
async fn test_event_queue_drop_unresponsive_clients_custom_limit_merge() {
let (event_queue, mut handle) = EventQueue::<FidlNotifier>::with_limit(2);
fasync::Task::local(event_queue).detach();
let mut stream = add_client(&mut handle).await;
handle.queue_event("event1".into()).await.unwrap();
handle.queue_event("event2".into()).await.unwrap();
handle.queue_event("event2".into()).await.unwrap();
handle.queue_event("event2".into()).await.unwrap();
// Wait for the queue to process all of the commands above.
handle.ping().await.unwrap();
assert_events(&mut stream, &["event1", "event2"]).await;
drop(handle);
assert_matches!(stream.next().await, None);
}
// The responder is dropped without replying, so the notification fails and
// the client is removed; "event2" is never delivered.
#[fasync::run_singlethreaded(test)]
async fn test_event_queue_drop_failed_clients() {
let mut handle = start_event_queue();
let mut stream = add_client(&mut handle).await;
handle.queue_event("event1".into()).await.unwrap();
handle.queue_event("event2".into()).await.unwrap();
match stream.try_next().await.unwrap().unwrap() {
ExampleEventMonitorRequest::OnEvent { event, .. } => {
assert_eq!(event, "event1");
}
}
assert_matches!(stream.next().await, None);
}
// One client failing does not affect delivery to the other client.
#[fasync::run_singlethreaded(test)]
async fn test_event_queue_drop_failed_clients_multiple() {
let mut handle = start_event_queue();
let mut stream1 = add_client(&mut handle).await;
let mut stream2 = add_client(&mut handle).await;
handle.queue_event("event1".into()).await.unwrap();
handle.queue_event("event2".into()).await.unwrap();
match stream1.try_next().await.unwrap().unwrap() {
ExampleEventMonitorRequest::OnEvent { event, .. } => {
assert_eq!(event, "event1");
}
}
assert_matches!(stream1.next().await, None);
handle.queue_event("event3".into()).await.unwrap();
assert_events(&mut stream2, &["event1", "event2", "event3"]).await;
drop(handle);
assert_matches!(stream2.next().await, None);
}
// Ten events named event0 x4, event1 x4, event2 x2 are queued. The first
// "event0" is sent right away and cannot be merged into; the remaining equal
// events collapse in the queue.
#[fasync::run_singlethreaded(test)]
async fn test_event_queue_merge_events() {
let mut handle = start_event_queue();
let mut stream = add_client(&mut handle).await;
for i in 0..10 {
handle.queue_event(format!("event{}", i / 4)).await.unwrap();
}
assert_events(&mut stream, &["event0", "event0", "event1", "event2"]).await;
drop(handle);
assert_matches!(stream.next().await, None);
}
}