netemul_sync/
lib.rs
1#![warn(missing_docs)]
6
7use fidl_fuchsia_netemul_sync as fnetemul_sync;
10use futures_util::TryStreamExt as _;
11
12#[allow(missing_docs)]
13#[derive(thiserror::Error, Debug)]
14pub enum Error {
15 #[error("clients not present on bus: {0:?}")]
16 ClientsNotPresentOnBus(Vec<String>),
17 #[error("failed to observe expected events: {0:?}")]
18 FailedToObserveEvents(Vec<Event>),
19 #[error("error communicating with the sync-manager: {0:?}")]
20 Fidl(#[from] fidl::Error),
21 #[error("error connecting to protocol: {0:?}")]
22 ConnectToProtocol(anyhow::Error),
23}
24
25type Result<T = ()> = std::result::Result<T, Error>;
26
27#[derive(Debug, PartialEq)]
29pub struct Event {
30 pub code: i32,
32 pub message: Option<String>,
34 pub arguments: Option<Vec<u8>>,
36}
37
38impl Event {
39 pub fn from_code(code: i32) -> Self {
41 Self { code, message: None, arguments: None }
42 }
43}
44
45impl From<Event> for fnetemul_sync::Event {
46 fn from(Event { code, message, arguments }: Event) -> Self {
47 Self { code: Some(code), message, arguments, ..Default::default() }
48 }
49}
50
51impl From<fnetemul_sync::Event> for Event {
52 fn from(event: fnetemul_sync::Event) -> Self {
53 let fnetemul_sync::Event { code, message, arguments, .. } = event;
54 Self { code: code.expect("code not set in event"), message, arguments }
55 }
56}
57
58pub struct Bus {
62 bus: fnetemul_sync::BusProxy,
63}
64
65impl Bus {
66 pub fn subscribe(name: &str, client: &str) -> Result<Self> {
68 let sync_manager =
69 fuchsia_component::client::connect_to_protocol::<fnetemul_sync::SyncManagerMarker>()
70 .map_err(Error::ConnectToProtocol)?;
71 let (bus, server_end) = fidl::endpoints::create_proxy::<fnetemul_sync::BusMarker>();
72 sync_manager.bus_subscribe(name, client, server_end)?;
73 Ok(Bus { bus })
74 }
75
76 pub fn publish(&self, event: Event) -> Result {
78 self.bus.publish(&event.into())?;
79 Ok(())
80 }
81
82 pub async fn wait_for_client(&self, client: &str) -> Result {
84 let (success, absent) =
85 self.bus.wait_for_clients(&[client.to_owned()], 0).await?;
86 if !success {
87 let absent = absent.expect("absent clients not set in response");
88 return Err(Error::ClientsNotPresentOnBus(absent));
89 }
90 Ok(())
91 }
92
93 pub async fn wait_for_events(&self, mut events: Vec<Event>) -> Result {
95 let mut stream = self.bus.take_event_stream();
96 while let Some(event) = stream.try_next().await? {
97 match event {
98 fnetemul_sync::BusEvent::OnBusData { data } => {
99 let received_event = data.into();
100 if events.contains(&received_event) {
101 events.retain(|event| event != &received_event);
102 if events.is_empty() {
103 return Ok(());
104 }
105 }
106 }
107 _ => {}
108 }
109 }
110 if !events.is_empty() {
111 return Err(Error::FailedToObserveEvents(events));
112 }
113 Ok(())
114 }
115}