1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45//! Fuchsia netdevice client.
67use std::convert::TryInto as _;
89use async_utils::hanging_get::client::HangingGetStream;
10use fidl_fuchsia_hardware_network as netdev;
11use fidl_table_validation::ValidFidlTable;
12use futures::{Stream, StreamExt as _, TryStreamExt as _};
13use std::pin::pin;
1415use crate::error::{Error, Result};
16use crate::session::{Config, DerivableConfig, DeviceInfo, Port, Session, Task};
1718#[derive(Clone)]
19/// A client that communicates with a network device to send and receive packets.
20pub struct Client {
21 device: netdev::DeviceProxy,
22}
2324/// A sensible default port stream buffer size.
25const DEFAULT_PORT_STREAM_BUFFER_SIZE: u32 = 64;
2627/// Creates a [`Stream`] of [`PortStatus`] from a [`netdev::PortProxy`].
28///
29/// If `buffer` is `None`, a sensible nonzero default buffer size will be used.
30pub fn new_port_status_stream(
31 port_proxy: &netdev::PortProxy,
32 buffer: Option<u32>,
33) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
34let (watcher_proxy, watcher_server) =
35 fidl::endpoints::create_proxy::<netdev::StatusWatcherMarker>();
36let () = port_proxy
37 .get_status_watcher(watcher_server, buffer.unwrap_or(DEFAULT_PORT_STREAM_BUFFER_SIZE))?;
38Ok(HangingGetStream::new(watcher_proxy, netdev::StatusWatcherProxy::watch_status)
39 .err_into()
40 .and_then(|status| futures::future::ready(status.try_into().map_err(Error::PortStatus))))
41}
4243impl Client {
44/// Creates a new network device client for the [`netdev::DeviceProxy`].
45pub fn new(device: netdev::DeviceProxy) -> Self {
46 Client { device }
47 }
4849/// Connects to the specified `port`.
50pub fn connect_port(&self, port: Port) -> Result<netdev::PortProxy> {
51let (port_proxy, port_server) = fidl::endpoints::create_proxy::<netdev::PortMarker>();
52self.connect_port_server_end(port, port_server).map(move |()| port_proxy)
53 }
5455/// Connects to the specified `port` with the provided `server_end`.
56pub fn connect_port_server_end(
57&self,
58 port: Port,
59 server_end: fidl::endpoints::ServerEnd<netdev::PortMarker>,
60 ) -> Result<()> {
61Ok(self.device.get_port(&port.into(), server_end)?)
62 }
6364/// Retrieves information about the underlying network device.
65pub async fn device_info(&self) -> Result<DeviceInfo> {
66Ok(self.device.get_info().await?.try_into()?)
67 }
6869/// Gets a [`Stream`] of [`PortStatus`] for status changes from the device.
70 ///
71 /// A sensible nonzero default buffer size will be used. It is encouraged
72 /// to use this function.
73pub fn port_status_stream(
74&self,
75 port: Port,
76 ) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
77self.port_status_stream_with_buffer_size(port, DEFAULT_PORT_STREAM_BUFFER_SIZE)
78 }
7980/// Gets a [`Stream`] of [`PortStatus`] for status changes from the device.
81pub fn port_status_stream_with_buffer_size(
82&self,
83 port: Port,
84 buffer: u32,
85 ) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
86let port_proxy = self.connect_port(port)?;
87 new_port_status_stream(&port_proxy, Some(buffer))
88 }
8990/// Waits for `port` to become online and report the [`PortStatus`].
91pub async fn wait_online(&self, port: Port) -> Result<PortStatus> {
92let stream = self
93.port_status_stream_with_buffer_size(
94 port, 0, /* we are not interested in every event */
95)?
96.try_filter(|status| {
97 futures::future::ready(status.flags.contains(netdev::StatusFlags::ONLINE))
98 });
99let mut stream = pin!(stream);
100 stream.next().await.expect("HangingGetStream should never terminate")
101 }
102103/// Gets a [`Stream`] of [`DevicePortEvent`] to monitor port changes from the device.
104pub fn device_port_event_stream(
105&self,
106 ) -> Result<impl Stream<Item = Result<DevicePortEvent>> + Unpin> {
107let (proxy, server) = fidl::endpoints::create_proxy::<netdev::PortWatcherMarker>();
108let () = self.device.get_port_watcher(server)?;
109Ok(HangingGetStream::new(proxy, netdev::PortWatcherProxy::watch).err_into())
110 }
111112/// Creates a new session with the given the given `name` and `config`.
113pub async fn new_session(&self, name: &str, config: Config) -> Result<(Session, Task)> {
114 Session::new(&self.device, name, config).await
115}
116117/// Creates a session using the higher level [`DerivableConfig`] instead of
118 /// a raw [`Config`].
119pub async fn new_session_with_derivable_config(
120&self,
121 name: &str,
122 config: DerivableConfig,
123 ) -> Result<(Session, Task)> {
124let device_info = self.device_info().await?;
125let config = device_info.make_config(config)?;
126 Session::new(&self.device, name, config).await
127}
128}
129130/// Network device base info with all required fields.
131#[derive(Debug, Clone, ValidFidlTable)]
132#[fidl_table_src(netdev::PortBaseInfo)]
133#[fidl_table_strict]
134pub struct PortBaseInfo {
135/// Port's class.
136pub port_class: netdev::PortClass,
137/// Supported rx frame types on this port.
138pub rx_types: Vec<netdev::FrameType>,
139/// Supported tx frame types on this port.
140pub tx_types: Vec<netdev::FrameTypeSupport>,
141}
142143/// Network device information with all required fields.
144#[derive(Debug, Clone, ValidFidlTable)]
145#[fidl_table_src(netdev::PortInfo)]
146#[fidl_table_strict]
147pub struct PortInfo {
148/// Port's identifier.
149pub id: Port,
150/// Port's base info.
151pub base_info: PortBaseInfo,
152}
153154/// Dynamic port information with all required fields.
155#[derive(Debug, Clone, PartialEq, Eq, ValidFidlTable)]
156#[fidl_table_src(netdev::PortStatus)]
157#[fidl_table_strict]
158pub struct PortStatus {
159/// Port status flags.
160pub flags: netdev::StatusFlags,
161/// Maximum transmit unit for this port, in bytes.
162 ///
163 /// The reported MTU is the size of an entire frame, including any header
164 /// and trailer bytes for whatever protocols this port supports.
165pub mtu: u32,
166}
167168pub use netdev::DevicePortEvent;