1use std::fmt::{self, Display};
6use std::num::{NonZeroU32, TryFromIntError};
7use std::time::{Duration, Instant};
8
9use flex_client::ProxyHasDomain;
10use flex_fuchsia_developer_ffx_speedtest as fspeedtest;
11use futures::TryFutureExt as _;
12use thiserror::Error;
13
14use crate::throughput::BytesFormatter;
15use crate::{Throughput, socket};
16pub use socket::TransferParams;
17
18pub struct Client {
19 proxy: fspeedtest::SpeedtestProxy,
20}
21
22#[derive(Error, Debug)]
23pub enum ClientError {
24 #[error(transparent)]
25 Fidl(#[from] fidl::Error),
26 #[error("integer conversion error {0}")]
27 Conversion(#[from] TryFromIntError),
28 #[error(transparent)]
29 Transfer(#[from] socket::TransferError),
30 #[error(transparent)]
31 Encoding(#[from] socket::MissingFieldError),
32}
33
34#[derive(Debug)]
35pub struct PingReport {
36 pub min: Duration,
37 pub avg: Duration,
38 pub max: Duration,
39}
40
41impl Display for PingReport {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 let Self { max, avg, min } = self;
44 write!(f, "min = {min:?}, avg = {avg:?}, max = {max:?}")?;
45 Ok(())
46 }
47}
48
49#[derive(Debug, Copy, Clone, Eq, PartialEq)]
50pub enum Direction {
51 Tx,
52 Rx,
53}
54
55impl Direction {
56 pub fn flip(self) -> Self {
57 match self {
58 Self::Tx => Self::Rx,
59 Self::Rx => Self::Tx,
60 }
61 }
62
63 fn local_label(&self) -> &'static str {
64 match self {
65 Direction::Tx => "sender",
66 Direction::Rx => "receiver",
67 }
68 }
69}
70
71#[derive(Debug)]
72pub struct SocketTransferParams {
73 pub direction: Direction,
74 pub params: socket::TransferParams,
75}
76
77#[derive(Debug)]
78pub struct SocketTransferReport {
79 pub direction: Direction,
80 pub client: SocketTransferReportInner,
81 pub server: SocketTransferReportInner,
82}
83
84impl Display for SocketTransferReport {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 let Self { direction, client, server } = self;
87 let local_label = direction.local_label();
88 let remote_label = direction.flip().local_label();
89 writeln!(f, "local({local_label}): {client}")?;
90 write!(f, "remote({remote_label}): {server}")?;
91 Ok(())
92 }
93}
94
95#[derive(Debug)]
96pub struct SocketTransferReportInner {
97 pub transfer_len: NonZeroU32,
98 pub duration: Duration,
99 pub throughput: Throughput,
100}
101
102impl Display for SocketTransferReportInner {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 let Self { transfer_len, duration, throughput } = self;
105 let transfer_len = BytesFormatter(transfer_len.get().into());
106 write!(f, "{transfer_len} in {duration:.1?} => {throughput}")
107 }
108}
109
110impl SocketTransferReportInner {
111 fn new(params: &socket::TransferParams, report: socket::Report) -> Self {
112 let socket::Report { duration } = report;
113 let transfer_len = params.data_len;
114 let throughput = Throughput::from_len_and_duration(transfer_len.get(), duration);
115 Self { transfer_len, duration, throughput }
116 }
117}
118
119impl Client {
120 pub async fn new(proxy: fspeedtest::SpeedtestProxy) -> Result<Self, ClientError> {
121 proxy.ping().await?;
123 Ok(Self { proxy })
124 }
125
126 pub async fn ping(&self, repeat: NonZeroU32) -> Result<PingReport, ClientError> {
127 let mut total = Duration::ZERO;
128 let mut max = Duration::ZERO;
129 let mut min = Duration::MAX;
130 for _ in 0..repeat.get() {
131 let start = Instant::now();
132 self.proxy.ping().await?;
133 let dur = Instant::now() - start;
134 total += dur;
135 max = max.max(dur);
136 min = min.min(dur);
137 }
138
139 Ok(PingReport { max, avg: total / repeat.get(), min })
140 }
141
142 pub async fn socket(
143 &self,
144 params: SocketTransferParams,
145 ) -> Result<SocketTransferReport, ClientError> {
146 let SocketTransferParams { direction, params } = params;
147 let (client, server) = self.proxy.domain().create_stream_socket();
148 let transfer = socket::Transfer { socket: client, params: params.clone() };
149 let (server_report, client_report) = match direction {
150 Direction::Tx => {
151 let server_fut = self
152 .proxy
153 .socket_down(server, ¶ms.clone().try_into()?)
154 .map_err(ClientError::from);
155 let client_fut = transfer.send().map_err(ClientError::from);
156 futures::future::try_join(server_fut, client_fut).await?
157 }
158 Direction::Rx => {
159 let server_fut = self
160 .proxy
161 .socket_up(server, ¶ms.clone().try_into()?)
162 .map_err(ClientError::from);
163 let client_fut = transfer.receive().map_err(ClientError::from);
164 futures::future::try_join(server_fut, client_fut).await?
165 }
166 };
167 Ok(SocketTransferReport {
168 direction,
169 client: SocketTransferReportInner::new(¶ms, client_report),
170 server: SocketTransferReportInner::new(¶ms, server_report.try_into()?),
171 })
172 }
173}