// netdevice_client/client.rs
use std::convert::TryInto as _;
use async_utils::hanging_get::client::HangingGetStream;
use fidl_fuchsia_hardware_network as netdev;
use fidl_table_validation::ValidFidlTable;
use futures::{Stream, StreamExt as _, TryStreamExt as _};
use std::pin::pin;
use crate::error::{Error, Result};
use crate::session::{Config, DerivableConfig, DeviceInfo, Port, Session, Task};
/// Client-side handle to a network device.
///
/// Wraps a `fuchsia.hardware.network` `Device` proxy; every method on this
/// type issues its requests through that proxy. Cloning the client clones
/// the underlying proxy.
#[derive(Clone)]
pub struct Client {
    /// Proxy used for all device-level FIDL requests.
    device: netdev::DeviceProxy,
}
/// Buffer size passed to the port status watcher when the caller does not
/// provide one.
const DEFAULT_PORT_STREAM_BUFFER_SIZE: u32 = 64;
pub fn new_port_status_stream(
port_proxy: &netdev::PortProxy,
buffer: Option<u32>,
) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
let (watcher_proxy, watcher_server) =
fidl::endpoints::create_proxy::<netdev::StatusWatcherMarker>();
let () = port_proxy
.get_status_watcher(watcher_server, buffer.unwrap_or(DEFAULT_PORT_STREAM_BUFFER_SIZE))?;
Ok(HangingGetStream::new(watcher_proxy, netdev::StatusWatcherProxy::watch_status)
.err_into()
.and_then(|status| futures::future::ready(status.try_into().map_err(Error::PortStatus))))
}
impl Client {
    /// Creates a new client that talks to the device behind `device`.
    pub fn new(device: netdev::DeviceProxy) -> Self {
        Self { device }
    }

    /// Connects to the port identified by `port` and returns a proxy to it.
    ///
    /// A fresh port channel is created here; its server end is sent to the
    /// device through [`Client::connect_port_server_end`].
    ///
    /// # Errors
    ///
    /// Returns an error if the `get_port` request cannot be issued.
    pub fn connect_port(&self, port: Port) -> Result<netdev::PortProxy> {
        let (proxy, server_end) = fidl::endpoints::create_proxy::<netdev::PortMarker>();
        self.connect_port_server_end(port, server_end)?;
        Ok(proxy)
    }

    /// Connects the caller-provided `server_end` to the port identified by
    /// `port`.
    ///
    /// # Errors
    ///
    /// Returns an error if the `get_port` request cannot be issued.
    pub fn connect_port_server_end(
        &self,
        port: Port,
        server_end: fidl::endpoints::ServerEnd<netdev::PortMarker>,
    ) -> Result<()> {
        self.device.get_port(&port.into(), server_end)?;
        Ok(())
    }

    /// Fetches the device's information and validates it into a
    /// [`DeviceInfo`].
    ///
    /// # Errors
    ///
    /// Returns an error if the `get_info` call fails or if the response does
    /// not convert into a [`DeviceInfo`].
    pub async fn device_info(&self) -> Result<DeviceInfo> {
        let raw_info = self.device.get_info().await?;
        let info: DeviceInfo = raw_info.try_into()?;
        Ok(info)
    }

    /// Creates a stream of status updates for `port`, using
    /// [`DEFAULT_PORT_STREAM_BUFFER_SIZE`] as the watcher buffer size.
    ///
    /// See [`Client::port_status_stream_with_buffer_size`].
    pub fn port_status_stream(
        &self,
        port: Port,
    ) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
        self.port_status_stream_with_buffer_size(port, DEFAULT_PORT_STREAM_BUFFER_SIZE)
    }

    /// Creates a stream of status updates for `port`, requesting a status
    /// watcher with the given `buffer` size.
    ///
    /// Connects to the port first and then builds the stream with
    /// [`new_port_status_stream`].
    ///
    /// # Errors
    ///
    /// Returns an error if connecting to the port or creating the status
    /// watcher fails.
    pub fn port_status_stream_with_buffer_size(
        &self,
        port: Port,
        buffer: u32,
    ) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
        self.connect_port(port)
            .and_then(|port_proxy| new_port_status_stream(&port_proxy, Some(buffer)))
    }

    /// Waits until `port` reports a status with the `ONLINE` flag set and
    /// returns that status.
    ///
    /// # Errors
    ///
    /// Returns an error if the status stream cannot be created, or the first
    /// error the stream yields before an online status is observed.
    ///
    /// # Panics
    ///
    /// Panics if the status stream terminates.
    pub async fn wait_online(&self, port: Port) -> Result<PortStatus> {
        // Only the first online status is consumed here, so a buffer size of
        // 0 is requested rather than the default.
        let statuses = self.port_status_stream_with_buffer_size(port, 0)?;
        let online_statuses = statuses.try_filter(|status| {
            let is_online = status.flags.contains(netdev::StatusFlags::ONLINE);
            futures::future::ready(is_online)
        });
        let mut online_statuses = pin!(online_statuses);
        online_statuses.next().await.expect("HangingGetStream should never terminate")
    }

    /// Creates a stream of [`DevicePortEvent`]s for this device.
    ///
    /// A port watcher is requested from the device and the returned stream
    /// repeatedly calls `watch` on it.
    ///
    /// # Errors
    ///
    /// Returns an error if the `get_port_watcher` request cannot be issued.
    /// Items of the stream are errors when a `watch` call fails.
    pub fn device_port_event_stream(
        &self,
    ) -> Result<impl Stream<Item = Result<DevicePortEvent>> + Unpin> {
        let (watcher, watcher_server_end) =
            fidl::endpoints::create_proxy::<netdev::PortWatcherMarker>();
        self.device.get_port_watcher(watcher_server_end)?;
        let events = HangingGetStream::new(watcher, netdev::PortWatcherProxy::watch).err_into();
        Ok(events)
    }

    /// Opens a new session named `name` on this device with an explicit
    /// `config`.
    ///
    /// Delegates to [`Session::new`] and returns the session together with
    /// its associated [`Task`].
    pub async fn new_session(&self, name: &str, config: Config) -> Result<(Session, Task)> {
        Session::new(&self.device, name, config).await
    }

    /// Opens a new session named `name`, deriving the session configuration
    /// from the device's information.
    ///
    /// The device info is fetched with [`Client::device_info`] and combined
    /// with `config` through `make_config` to produce the final [`Config`].
    ///
    /// # Errors
    ///
    /// Returns an error if fetching the device info, deriving the
    /// configuration, or creating the session fails.
    pub async fn new_session_with_derivable_config(
        &self,
        name: &str,
        config: DerivableConfig,
    ) -> Result<(Session, Task)> {
        let device_info = self.device_info().await?;
        let derived_config = device_info.make_config(config)?;
        self.new_session(name, derived_config).await
    }
}
/// Validated counterpart of the `netdev::PortBaseInfo` FIDL table.
///
/// Conversions to and from the FIDL table are generated by the
/// `ValidFidlTable` derive.
// NOTE(review): presumably conversion fails when any of these fields is
// absent from the FIDL table — confirm against `fidl_table_validation`.
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::PortBaseInfo)]
#[fidl_table_strict]
pub struct PortBaseInfo {
    /// The port's class.
    pub port_class: netdev::PortClass,
    /// Frame types listed for the rx direction.
    pub rx_types: Vec<netdev::FrameType>,
    /// Frame type support entries listed for the tx direction.
    pub tx_types: Vec<netdev::FrameTypeSupport>,
}
/// Validated counterpart of the `netdev::PortInfo` FIDL table.
///
/// Conversions to and from the FIDL table are generated by the
/// `ValidFidlTable` derive.
// NOTE(review): presumably conversion fails when any of these fields is
// absent from the FIDL table — confirm against `fidl_table_validation`.
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::PortInfo)]
#[fidl_table_strict]
pub struct PortInfo {
    /// The port's identifier.
    pub id: Port,
    /// The port's validated base information.
    pub base_info: PortBaseInfo,
}
/// Validated counterpart of the `netdev::PortStatus` FIDL table.
///
/// Conversions to and from the FIDL table are generated by the
/// `ValidFidlTable` derive.
// NOTE(review): presumably conversion fails when any of these fields is
// absent from the FIDL table — confirm against `fidl_table_validation`.
#[derive(Debug, Clone, PartialEq, Eq, ValidFidlTable)]
#[fidl_table_src(netdev::PortStatus)]
#[fidl_table_strict]
pub struct PortStatus {
    /// Status flags reported for the port (e.g. `ONLINE`).
    pub flags: netdev::StatusFlags,
    /// MTU reported for the port.
    // NOTE(review): units and exact frame-size semantics are defined by the
    // FIDL protocol, not here — presumably bytes; confirm.
    pub mtu: u32,
}
// Re-export of the FIDL port event type; it is the item type yielded by
// `Client::device_port_event_stream`.
pub use netdev::DevicePortEvent;