netemul_sync/
lib.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![warn(missing_docs)]
6
7//! Client library for the `fuchsia.netemul.sync` FIDL library.
8
9use fidl_fuchsia_netemul_sync as fnetemul_sync;
10use futures_util::TryStreamExt as _;
11
12#[allow(missing_docs)]
13#[derive(thiserror::Error, Debug)]
14pub enum Error {
15    #[error("clients not present on bus: {0:?}")]
16    ClientsNotPresentOnBus(Vec<String>),
17    #[error("failed to observe expected events: {0:?}")]
18    FailedToObserveEvents(Vec<Event>),
19    #[error("error communicating with the sync-manager: {0:?}")]
20    Fidl(#[from] fidl::Error),
21    #[error("error connecting to protocol: {0:?}")]
22    ConnectToProtocol(anyhow::Error),
23}
24
25type Result<T = ()> = std::result::Result<T, Error>;
26
/// An event published on a bus.
///
/// Only `code` is mandatory; `message` and `arguments` may be left as `None`.
/// The type is `Clone` so that callers can keep a copy of an event after
/// handing it to an API that consumes it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    /// User-defined event code.
    pub code: i32,
    /// An optional description of the event.
    pub message: Option<String>,
    /// An optional collection of serialized arguments.
    pub arguments: Option<Vec<u8>>,
}
37
38impl Event {
39    /// Creates an event with the specified event code.
40    pub fn from_code(code: i32) -> Self {
41        Self { code, message: None, arguments: None }
42    }
43}
44
45impl From<Event> for fnetemul_sync::Event {
46    fn from(Event { code, message, arguments }: Event) -> Self {
47        Self { code: Some(code), message, arguments, ..Default::default() }
48    }
49}
50
51impl From<fnetemul_sync::Event> for Event {
52    fn from(event: fnetemul_sync::Event) -> Self {
53        let fnetemul_sync::Event { code, message, arguments, .. } = event;
54        Self { code: code.expect("code not set in event"), message, arguments }
55    }
56}
57
/// A connection to a named bus.
///
/// A bus is a broadcast pub/sub network that distributes events.
pub struct Bus {
    // Proxy for the FIDL `Bus` protocol; `publish`, `wait_for_client` and
    // `wait_for_events` all go through it.
    bus: fnetemul_sync::BusProxy,
}
64
65impl Bus {
66    /// Subscribes to bus `name` with client name `client`.
67    pub fn subscribe(name: &str, client: &str) -> Result<Self> {
68        let sync_manager =
69            fuchsia_component::client::connect_to_protocol::<fnetemul_sync::SyncManagerMarker>()
70                .map_err(Error::ConnectToProtocol)?;
71        let (bus, server_end) = fidl::endpoints::create_proxy::<fnetemul_sync::BusMarker>();
72        sync_manager.bus_subscribe(name, client, server_end)?;
73        Ok(Bus { bus })
74    }
75
76    /// Publishes an event on the bus.
77    pub fn publish(&self, event: Event) -> Result {
78        self.bus.publish(&event.into())?;
79        Ok(())
80    }
81
82    /// Waits for the specified client to join the bus.
83    pub async fn wait_for_client(&self, client: &str) -> Result {
84        let (success, absent) =
85            self.bus.wait_for_clients(&[client.to_owned()], /* no timeout */ 0).await?;
86        if !success {
87            let absent = absent.expect("absent clients not set in response");
88            return Err(Error::ClientsNotPresentOnBus(absent));
89        }
90        Ok(())
91    }
92
93    /// Waits for the specified events to be observed on the bus.
94    pub async fn wait_for_events(&self, mut events: Vec<Event>) -> Result {
95        let mut stream = self.bus.take_event_stream();
96        while let Some(event) = stream.try_next().await? {
97            match event {
98                fnetemul_sync::BusEvent::OnBusData { data } => {
99                    let received_event = data.into();
100                    if events.contains(&received_event) {
101                        events.retain(|event| event != &received_event);
102                        if events.is_empty() {
103                            return Ok(());
104                        }
105                    }
106                }
107                _ => {}
108            }
109        }
110        if !events.is_empty() {
111            return Err(Error::FailedToObserveEvents(events));
112        }
113        Ok(())
114    }
115}