netdevice_client/
client.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice client.
6
7use std::convert::TryInto as _;
8
9use async_utils::hanging_get::client::HangingGetStream;
10use fidl_fuchsia_hardware_network as netdev;
11use fidl_table_validation::ValidFidlTable;
12use futures::{Stream, StreamExt as _, TryStreamExt as _};
13use std::pin::pin;
14
15use crate::error::{Error, Result};
16use crate::session::{Config, DerivableConfig, DeviceInfo, Port, Session, Task};
17
/// A client that communicates with a network device to send and receive packets.
///
/// The client is a thin wrapper around a [`netdev::DeviceProxy`]; every method
/// on it issues its requests through that proxy.
#[derive(Clone)]
pub struct Client {
    // Proxy to the network device that every request is sent through.
    device: netdev::DeviceProxy,
}
23
/// A sensible default port stream buffer size.
///
/// This is the value passed to the device's status watcher request when
/// [`new_port_status_stream`] is called with `buffer == None`, and the value
/// [`Client::port_status_stream`] always uses.
const DEFAULT_PORT_STREAM_BUFFER_SIZE: u32 = 64;
26
27/// Creates a [`Stream`] of [`PortStatus`] from a [`netdev::PortProxy`].
28///
29/// If `buffer` is `None`, a sensible nonzero default buffer size will be used.
30pub fn new_port_status_stream(
31    port_proxy: &netdev::PortProxy,
32    buffer: Option<u32>,
33) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
34    let (watcher_proxy, watcher_server) =
35        fidl::endpoints::create_proxy::<netdev::StatusWatcherMarker>();
36    let () = port_proxy
37        .get_status_watcher(watcher_server, buffer.unwrap_or(DEFAULT_PORT_STREAM_BUFFER_SIZE))?;
38    Ok(HangingGetStream::new(watcher_proxy, netdev::StatusWatcherProxy::watch_status)
39        .err_into()
40        .and_then(|status| futures::future::ready(status.try_into().map_err(Error::PortStatus))))
41}
42
43impl Client {
44    /// Creates a new network device client for the [`netdev::DeviceProxy`].
45    pub fn new(device: netdev::DeviceProxy) -> Self {
46        Client { device }
47    }
48
49    /// Connects to the specified `port`.
50    pub fn connect_port(&self, port: Port) -> Result<netdev::PortProxy> {
51        let (port_proxy, port_server) = fidl::endpoints::create_proxy::<netdev::PortMarker>();
52        self.connect_port_server_end(port, port_server).map(move |()| port_proxy)
53    }
54
55    /// Connects to the specified `port` with the provided `server_end`.
56    pub fn connect_port_server_end(
57        &self,
58        port: Port,
59        server_end: fidl::endpoints::ServerEnd<netdev::PortMarker>,
60    ) -> Result<()> {
61        Ok(self.device.get_port(&port.into(), server_end)?)
62    }
63
64    /// Retrieves information about the underlying network device.
65    pub async fn device_info(&self) -> Result<DeviceInfo> {
66        Ok(self.device.get_info().await?.try_into()?)
67    }
68
69    /// Gets a [`Stream`] of [`PortStatus`] for status changes from the device.
70    ///
71    /// A sensible nonzero default buffer size will be used. It is encouraged
72    /// to use this function.
73    pub fn port_status_stream(
74        &self,
75        port: Port,
76    ) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
77        self.port_status_stream_with_buffer_size(port, DEFAULT_PORT_STREAM_BUFFER_SIZE)
78    }
79
80    /// Gets a [`Stream`] of [`PortStatus`] for status changes from the device.
81    pub fn port_status_stream_with_buffer_size(
82        &self,
83        port: Port,
84        buffer: u32,
85    ) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
86        let port_proxy = self.connect_port(port)?;
87        new_port_status_stream(&port_proxy, Some(buffer))
88    }
89
90    /// Waits for `port` to become online and report the [`PortStatus`].
91    pub async fn wait_online(&self, port: Port) -> Result<PortStatus> {
92        let stream = self
93            .port_status_stream_with_buffer_size(
94                port, 0, /* we are not interested in every event */
95            )?
96            .try_filter(|status| {
97                futures::future::ready(status.flags.contains(netdev::StatusFlags::ONLINE))
98            });
99        let mut stream = pin!(stream);
100        stream.next().await.expect("HangingGetStream should never terminate")
101    }
102
103    /// Gets a [`Stream`] of [`DevicePortEvent`] to monitor port changes from the device.
104    pub fn device_port_event_stream(
105        &self,
106    ) -> Result<impl Stream<Item = Result<DevicePortEvent>> + Unpin> {
107        let (proxy, server) = fidl::endpoints::create_proxy::<netdev::PortWatcherMarker>();
108        let () = self.device.get_port_watcher(server)?;
109        Ok(HangingGetStream::new(proxy, netdev::PortWatcherProxy::watch).err_into())
110    }
111
112    /// Creates a new session with the given the given `name` and `config`.
113    pub async fn new_session(&self, name: &str, config: Config) -> Result<(Session, Task)> {
114        Session::new(&self.device, name, config).await
115    }
116
117    /// Creates a session using the higher level [`DerivableConfig`] instead of
118    /// a raw [`Config`].
119    pub async fn new_session_with_derivable_config(
120        &self,
121        name: &str,
122        config: DerivableConfig,
123    ) -> Result<(Session, Task)> {
124        let device_info = self.device_info().await?;
125        let config = device_info.make_config(config)?;
126        Session::new(&self.device, name, config).await
127    }
128}
129
/// Port base info with all required fields.
///
/// Validated counterpart of the [`netdev::PortBaseInfo`] FIDL table; the
/// conversion is generated by the [`ValidFidlTable`] derive.
// NOTE(review): `fidl_table_strict` presumably controls how FIDL fields not
// listed here are treated — confirm against the fidl_table_validation docs.
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::PortBaseInfo)]
#[fidl_table_strict]
pub struct PortBaseInfo {
    /// Port's class.
    pub port_class: netdev::PortClass,
    /// Supported rx frame types on this port.
    pub rx_types: Vec<netdev::FrameType>,
    /// Supported tx frame types on this port.
    pub tx_types: Vec<netdev::FrameTypeSupport>,
}
142
/// Port information with all required fields.
///
/// Validated counterpart of the [`netdev::PortInfo`] FIDL table; the
/// conversion is generated by the [`ValidFidlTable`] derive.
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::PortInfo)]
#[fidl_table_strict]
pub struct PortInfo {
    /// Port's identifier.
    pub id: Port,
    /// Port's base info.
    pub base_info: PortBaseInfo,
}
153
/// Dynamic port information with all required fields.
///
/// Validated counterpart of the [`netdev::PortStatus`] FIDL table; the
/// conversion is generated by the [`ValidFidlTable`] derive. This is the item
/// type yielded by the port status streams in this module.
#[derive(Debug, Clone, PartialEq, Eq, ValidFidlTable)]
#[fidl_table_src(netdev::PortStatus)]
#[fidl_table_strict]
pub struct PortStatus {
    /// Port status flags.
    pub flags: netdev::StatusFlags,
    /// Maximum transmit unit for this port, in bytes.
    ///
    /// The reported MTU is the size of an entire frame, including any header
    /// and trailer bytes for whatever protocols this port supports.
    pub mtu: u32,
}
167
168pub use netdev::DevicePortEvent;