1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Fuchsia netdevice client.

use std::convert::TryInto as _;

use async_utils::hanging_get::client::HangingGetStream;
use fidl_fuchsia_hardware_network as netdev;
use fidl_table_validation::ValidFidlTable;
use futures::{Stream, StreamExt as _, TryStreamExt as _};
use std::pin::pin;

use crate::error::{Error, Result};
use crate::session::{Config, DeviceInfo, Port, Session, Task};

#[derive(Clone)]
/// A client that communicates with a network device to send and receive packets.
pub struct Client {
    device: netdev::DeviceProxy,
}

/// A sensible default port stream buffer size.
const DEFAULT_PORT_STREAM_BUFFER_SIZE: u32 = 64;

/// Creates a [`Stream`] of [`PortStatus`] from a [`netdev::PortProxy`].
///
/// If `buffer` is `None`, a sensible nonzero default buffer size will be used.
pub fn new_port_status_stream(
    port_proxy: &netdev::PortProxy,
    buffer: Option<u32>,
) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
    let (watcher_proxy, watcher_server) =
        fidl::endpoints::create_proxy::<netdev::StatusWatcherMarker>()?;
    let () = port_proxy
        .get_status_watcher(watcher_server, buffer.unwrap_or(DEFAULT_PORT_STREAM_BUFFER_SIZE))?;
    Ok(HangingGetStream::new(watcher_proxy, netdev::StatusWatcherProxy::watch_status)
        .err_into()
        .and_then(|status| futures::future::ready(status.try_into().map_err(Error::PortStatus))))
}

impl Client {
    /// Creates a new network device client for the [`netdev::DeviceProxy`].
    pub fn new(device: netdev::DeviceProxy) -> Self {
        Client { device }
    }

    /// Connects to the specified `port`.
    pub fn connect_port(&self, port: Port) -> Result<netdev::PortProxy> {
        let (port_proxy, port_server) = fidl::endpoints::create_proxy::<netdev::PortMarker>()?;
        self.connect_port_server_end(port, port_server).map(move |()| port_proxy)
    }

    /// Connects to the specified `port` with the provided `server_end`.
    pub fn connect_port_server_end(
        &self,
        port: Port,
        server_end: fidl::endpoints::ServerEnd<netdev::PortMarker>,
    ) -> Result<()> {
        Ok(self.device.get_port(&port.into(), server_end)?)
    }

    /// Retrieves information about the underlying network device.
    pub async fn device_info(&self) -> Result<DeviceInfo> {
        Ok(self.device.get_info().await?.try_into()?)
    }

    /// Gets a [`Stream`] of [`PortStatus`] for status changes from the device.
    ///
    /// A sensible nonzero default buffer size will be used. It is encouraged
    /// to use this function.
    pub fn port_status_stream(
        &self,
        port: Port,
    ) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
        self.port_status_stream_with_buffer_size(port, DEFAULT_PORT_STREAM_BUFFER_SIZE)
    }

    /// Gets a [`Stream`] of [`PortStatus`] for status changes from the device.
    pub fn port_status_stream_with_buffer_size(
        &self,
        port: Port,
        buffer: u32,
    ) -> Result<impl Stream<Item = Result<PortStatus>> + Unpin> {
        let port_proxy = self.connect_port(port)?;
        new_port_status_stream(&port_proxy, Some(buffer))
    }

    /// Waits for `port` to become online and report the [`PortStatus`].
    pub async fn wait_online(&self, port: Port) -> Result<PortStatus> {
        let stream = self
            .port_status_stream_with_buffer_size(
                port, 0, /* we are not interested in every event */
            )?
            .try_filter(|status| {
                futures::future::ready(status.flags.contains(netdev::StatusFlags::ONLINE))
            });
        let mut stream = pin!(stream);
        stream.next().await.expect("HangingGetStream should never terminate")
    }

    /// Gets a [`Stream`] of [`DevicePortEvent`] to monitor port changes from the device.
    pub fn device_port_event_stream(
        &self,
    ) -> Result<impl Stream<Item = Result<DevicePortEvent>> + Unpin> {
        let (proxy, server) = fidl::endpoints::create_proxy::<netdev::PortWatcherMarker>()?;
        let () = self.device.get_port_watcher(server)?;
        Ok(HangingGetStream::new(proxy, netdev::PortWatcherProxy::watch).err_into())
    }

    /// Creates a new session with the given the given `name` and `config`.
    pub async fn new_session(&self, name: &str, config: Config) -> Result<(Session, Task)> {
        Session::new(&self.device, name, config).await
    }

    /// Creates a primary session.
    pub async fn primary_session(
        &self,
        name: &str,
        default_buffer_length: usize,
    ) -> Result<(Session, Task)> {
        let device_info = self.device_info().await?;
        let primary_config = device_info.primary_config(default_buffer_length)?;
        Session::new(&self.device, name, primary_config).await
    }
}

/// Network device base info with all required fields.
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::PortBaseInfo)]
pub struct PortBaseInfo {
    /// Port's class.
    pub port_class: netdev::DeviceClass,
    /// Supported rx frame types on this port.
    pub rx_types: Vec<netdev::FrameType>,
    /// Supported tx frame types on this port.
    pub tx_types: Vec<netdev::FrameTypeSupport>,
}

/// Network device information with all required fields.
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::PortInfo)]
pub struct PortInfo {
    /// Port's identifier.
    pub id: Port,
    /// Port's base info.
    pub base_info: PortBaseInfo,
}

/// Dynamic port information with all required fields.
#[derive(Debug, Clone, PartialEq, Eq, ValidFidlTable)]
#[fidl_table_src(netdev::PortStatus)]
pub struct PortStatus {
    /// Port status flags.
    pub flags: netdev::StatusFlags,
    /// Maximum transmit unit for this port, in bytes.
    ///
    /// The reported MTU is the size of an entire frame, including any header
    /// and trailer bytes for whatever protocols this port supports.
    pub mtu: u32,
}

pub use netdev::DevicePortEvent;