#![warn(missing_docs)]
use fidl_fuchsia_netemul_sync as fnetemul_sync;
use futures_util::TryStreamExt as _;
/// Errors returned by the [`Bus`] helpers in this file.
// NOTE(review): `missing_docs` is allowed on this enum, presumably because each
// variant's `#[error(...)]` message already describes it — confirm the allow is
// still wanted now that the variants carry doc comments.
#[allow(missing_docs)]
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// The bus reported that the waited-for clients were not present; carries
/// the names of the absent clients as returned by the bus.
#[error("clients not present on bus: {0:?}")]
ClientsNotPresentOnBus(Vec<String>),
/// The bus event stream ended before every expected event was seen;
/// carries the events that were still outstanding.
#[error("failed to observe expected events: {0:?}")]
FailedToObserveEvents(Vec<Event>),
/// A FIDL call or event-stream read failed. Converted automatically from
/// `fidl::Error` by the `?` operator.
#[error("error communicating with the sync-manager: {0:?}")]
Fidl(#[from] fidl::Error),
/// Connecting to the sync-manager protocol failed.
#[error("error connecting to protocol: {0:?}")]
ConnectToProtocol(anyhow::Error),
}
/// Result alias used by this file: the error type is always [`Error`] and the
/// success type defaults to `()`.
type Result<T = ()> = std::result::Result<T, Error>;
/// An event that can be published to, or waited for on, a `Bus`.
///
/// Unlike the FIDL event type it converts to and from, `code` is mandatory
/// here; `message` and `arguments` remain optional.
///
/// Two events compare equal only when all three fields are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    /// Numeric code identifying the event.
    pub code: i32,
    /// Optional text attached to the event.
    pub message: Option<String>,
    /// Optional raw bytes attached to the event.
    pub arguments: Option<Vec<u8>>,
}

impl Event {
    /// Creates an event that carries only `code`, with no message and no
    /// arguments.
    pub fn from_code(code: i32) -> Self {
        Self { code, message: None, arguments: None }
    }
}
impl From<Event> for fnetemul_sync::Event {
    /// Converts into the FIDL event, always populating `code` and passing
    /// `message` and `arguments` through unchanged. Any remaining FIDL fields
    /// take their default values.
    fn from(event: Event) -> Self {
        let Event { code, message, arguments } = event;
        fnetemul_sync::Event { code: Some(code), message, arguments, ..Default::default() }
    }
}
impl From<fnetemul_sync::Event> for Event {
fn from(event: fnetemul_sync::Event) -> Self {
let fnetemul_sync::Event { code, message, arguments, .. } = event;
Self { code: code.expect("code not set in event"), message, arguments }
}
}
/// A subscription to a named sync bus, wrapping a `fnetemul_sync::BusProxy`.
///
/// Obtain one with [`Bus::subscribe`].
pub struct Bus {
// Proxy used for every request sent to, and every event read from, the bus.
bus: fnetemul_sync::BusProxy,
}
impl Bus {
pub fn subscribe(name: &str, client: &str) -> Result<Self> {
let sync_manager =
fuchsia_component::client::connect_to_protocol::<fnetemul_sync::SyncManagerMarker>()
.map_err(Error::ConnectToProtocol)?;
let (bus, server_end) = fidl::endpoints::create_proxy::<fnetemul_sync::BusMarker>();
sync_manager.bus_subscribe(name, client, server_end)?;
Ok(Bus { bus })
}
pub fn publish(&self, event: Event) -> Result {
self.bus.publish(&event.into())?;
Ok(())
}
pub async fn wait_for_client(&self, client: &str) -> Result {
let (success, absent) =
self.bus.wait_for_clients(&[client.to_owned()], 0).await?;
if !success {
let absent = absent.expect("absent clients not set in response");
return Err(Error::ClientsNotPresentOnBus(absent));
}
Ok(())
}
pub async fn wait_for_events(&self, mut events: Vec<Event>) -> Result {
let mut stream = self.bus.take_event_stream();
while let Some(event) = stream.try_next().await? {
match event {
fnetemul_sync::BusEvent::OnBusData { data } => {
let received_event = data.into();
if events.contains(&received_event) {
events.retain(|event| event != &received_event);
if events.is_empty() {
return Ok(());
}
}
}
_ => {}
}
}
if !events.is_empty() {
return Err(Error::FailedToObserveEvents(events));
}
Ok(())
}
}