1use crate::platform::PlatformServices;
6use anyhow::{Error, anyhow};
7use fidl::endpoints::{create_proxy, create_request_stream};
8use fidl_fuchsia_virtualization::{
9 GuestManagerProxy, GuestMarker, GuestStatus, HostVsockAcceptorMarker, HostVsockEndpointMarker,
10};
11use fuchsia_async as fasync;
12use futures::{AsyncReadExt, AsyncWriteExt, FutureExt, TryStreamExt, select, try_join};
13use guest_cli_args as arguments;
14use prettytable::format::consts::FORMAT_CLEAN;
15use prettytable::{Table, cell, row};
16use std::collections::{HashMap, HashSet};
17use std::fmt;
18use std::io::Write;
19
/// Size in bytes of each packet echoed during the warmup and latency tests; also the
/// segment size used when streaming throughput data.
const LATENCY_CHECK_SIZE_BYTES: usize = 4096;
/// Amount of data, in MiB, written (and read back) per throughput iteration on a stream.
const THROUGHPUT_SIZE_MEBIBYTES: usize = 128;
/// [`THROUGHPUT_SIZE_MEBIBYTES`] expressed in bytes.
const THROUGHPUT_SIZE_BYTES: usize = THROUGHPUT_SIZE_MEBIBYTES << 20;

/// Host vsock port the benchmark listens on for incoming guest connections.
const HOST_PORT: u32 = 8500;
/// Guest source port of the connection over which readiness magic numbers arrive.
const CONTROL_STREAM: u32 = 8501;
/// Guest source port of the connection used for the corruption check and latency test.
const LATENCY_CHECK_STREAM: u32 = 8502;

/// Guest source port of the single stream round trip throughput connection.
const SINGLE_STREAM_THROUGHPUT: u32 = 8503;
/// Byte awaited on the control stream before the single stream round trip test starts.
const SINGLE_STREAM_MAGIC_NUM: u8 = 123;

// Guest source ports of the five multistream connections, each paired with the byte
// awaited on the control stream before the multistream test starts.
const MULTI_STREAM_THROUGHPUT1: u32 = 8504;
const MULTI_STREAM_MAGIC_NUM1: u8 = 124;
const MULTI_STREAM_THROUGHPUT2: u32 = 8505;
const MULTI_STREAM_MAGIC_NUM2: u8 = 125;
const MULTI_STREAM_THROUGHPUT3: u32 = 8506;
const MULTI_STREAM_MAGIC_NUM3: u8 = 126;
const MULTI_STREAM_THROUGHPUT4: u32 = 8507;
const MULTI_STREAM_MAGIC_NUM4: u8 = 127;
const MULTI_STREAM_THROUGHPUT5: u32 = 8508;
const MULTI_STREAM_MAGIC_NUM5: u8 = 128;

/// Guest source port of the bidirectional throughput connection.
const SINGLE_STREAM_BIDIRECTIONAL: u32 = 8509;
// Only referenced by the bidirectional test, which is compiled for Fuchsia targets only.
#[allow(dead_code)]
const SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM: u8 = 129;
45
46#[derive(Clone, Copy, serde::Serialize, serde::Deserialize)]
47enum PercentileUnit {
48 Nanoseconds,
49 MebibytesPerSecond,
50}
51
52#[derive(serde::Serialize, serde::Deserialize)]
53pub struct Percentiles {
54 min: f64,
55 p_25th: f64,
56 p_50th: f64,
57 p_75th: f64,
58 p_99th: f64,
59 max: f64,
60 unit: PercentileUnit,
61}
62
63impl fmt::Display for Percentiles {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 let get_units = |val: f64, unit: PercentileUnit| -> String {
66 match unit {
67 PercentileUnit::Nanoseconds => {
68 format!("{}ns ({:.3}ms)", val as u64, val / 1_000_000.0)
69 }
70 PercentileUnit::MebibytesPerSecond => {
71 format!("{:.2}MiB/s", val)
72 }
73 }
74 };
75
76 let mut table = Table::new();
77 table.set_format(*FORMAT_CLEAN);
78
79 table.add_row(row!["\tMin:", get_units(self.min, self.unit)]);
80 table.add_row(row!["\t25th percentile:", get_units(self.p_25th, self.unit)]);
81 table.add_row(row!["\t50th percentile:", get_units(self.p_50th, self.unit)]);
82 table.add_row(row!["\t75th percentile:", get_units(self.p_75th, self.unit)]);
83 table.add_row(row!["\t99th percentile:", get_units(self.p_99th, self.unit)]);
84 table.add_row(row!["\tMax:", get_units(self.max, self.unit)]);
85
86 write!(f, "\n{}", table)
87 }
88}
89
90#[derive(Default, serde::Serialize, serde::Deserialize)]
91pub struct Measurements {
92 data_corruption: Option<bool>,
93 round_trip_page: Option<Percentiles>,
94 tx_throughput: Option<Percentiles>,
95 rx_throughput: Option<Percentiles>,
96 single_stream_unidirectional: Option<Percentiles>,
97 multi_stream_unidirectional: Option<Percentiles>,
98}
99
100impl fmt::Display for Measurements {
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 let format_percentiles = |percentiles: &Option<Percentiles>| -> String {
103 match percentiles {
104 None => " NOT RUN".to_owned(),
105 Some(percentile) => percentile.to_string(),
106 }
107 };
108
109 writeln!(f, "\n\nMicrobenchmark Results\n------------------------")?;
110
111 writeln!(
112 f,
113 "* Data corruption check: {}",
114 match self.data_corruption {
115 None => "NOT RUN",
116 Some(result) =>
117 if result {
118 "PASSED"
119 } else {
120 "FAILED"
121 },
122 }
123 )?;
124
125 writeln!(
126 f,
127 "* Round trip latency of {LATENCY_CHECK_SIZE_BYTES} bytes:{}",
128 format_percentiles(&self.round_trip_page)
129 )?;
130 writeln!(
131 f,
132 "* TX (guest -> host, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
133 format_percentiles(&self.tx_throughput)
134 )?;
135 writeln!(
136 f,
137 "* RX (host -> guest, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
138 format_percentiles(&self.rx_throughput)
139 )?;
140 writeln!(
141 f,
142 "* Single stream unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
143 format_percentiles(&self.single_stream_unidirectional)
144 )?;
145 writeln!(
146 f,
147 "* Multistream (5 connections) unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
148 format_percentiles(&self.multi_stream_unidirectional)
149 )
150 }
151}
152
153#[derive(serde::Serialize, serde::Deserialize)]
154pub enum VsockPerfResult {
155 BenchmarkComplete(Box<Measurements>),
156 UnsupportedGuest(arguments::GuestType),
157 Internal(String),
158}
159
160impl fmt::Display for VsockPerfResult {
161 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162 match self {
163 VsockPerfResult::BenchmarkComplete(result) => write!(f, "{}", result),
164 VsockPerfResult::UnsupportedGuest(guest) => {
165 write!(f, "VsockPerf is not supported for '{}'. Only 'debian' is supported", guest)
166 }
167 VsockPerfResult::Internal(context) => {
168 write!(f, "Internal error: {}", context)
169 }
170 }
171 }
172}
173
174fn get_time_delta_nanos(before: fasync::MonotonicInstant, after: fasync::MonotonicInstant) -> i64 {
175 #[cfg(target_os = "fuchsia")]
176 {
177 (after - before).into_nanos()
178 }
179
180 #[cfg(not(target_os = "fuchsia"))]
181 {
182 (after - before).as_nanos().try_into().unwrap()
183 }
184}
185
186pub async fn handle_vsockperf<P: PlatformServices>(
187 services: &P,
188 args: &arguments::vsockperf_args::VsockPerfArgs,
189) -> Result<VsockPerfResult, Error> {
190 if args.guest_type != arguments::GuestType::Debian {
191 return Ok(VsockPerfResult::UnsupportedGuest(args.guest_type));
192 }
193
194 let guest_manager = services.connect_to_manager(args.guest_type).await?;
195 #[allow(clippy::large_futures)]
196 Ok(match run_micro_benchmark(guest_manager).await {
197 Err(err) => VsockPerfResult::Internal(format!("{}", err)),
198 Ok(result) => VsockPerfResult::BenchmarkComplete(Box::new(result)),
199 })
200}
201
/// Returns the sample at the given percentile of `durations`.
///
/// `durations` must already be sorted by the caller. No interpolation is performed: the
/// fractional index `percentile / 100 * (len - 1)` is floored.
///
/// # Panics
///
/// Panics if `percentile` is greater than 100 or if `durations` is empty.
fn percentile(durations: &[u64], percentile: u8) -> u64 {
    assert!(percentile <= 100, "percentile must be at most 100");
    assert!(!durations.is_empty(), "cannot take a percentile of an empty slice");

    let last = durations.len() - 1;
    let rank = usize::from(percentile);
    // Exact integer floor of `rank * last / 100`. Floating point would truncate
    // incorrectly here, e.g. (29.0 / 100.0) * 100.0 evaluates to 28.999999999999996.
    // Splitting `last` into its quotient and remainder by 100 keeps the intermediate
    // products from overflowing.
    let location = (last / 100) * rank + (last % 100) * rank / 100;
    durations[location]
}
208
209fn latency_percentile(durations: &[u64]) -> Percentiles {
210 Percentiles {
211 min: percentile(&durations, 0) as f64,
212 p_25th: percentile(&durations, 25) as f64,
213 p_50th: percentile(&durations, 50) as f64,
214 p_75th: percentile(&durations, 75) as f64,
215 p_99th: percentile(&durations, 99) as f64,
216 max: percentile(&durations, 100) as f64,
217 unit: PercentileUnit::Nanoseconds,
218 }
219}
220
221fn throughput_percentile(durations: &[u64], bytes: usize) -> Percentiles {
222 let to_mebibytes_per_second = |nanos: u64| -> f64 {
223 let seconds = nanos as f64 / (1000.0 * 1000.0 * 1000.0);
224 let bytes_per_second = (bytes as f64) / seconds;
225 bytes_per_second / (1 << 20) as f64
226 };
227
228 Percentiles {
229 min: to_mebibytes_per_second(percentile(&durations, 0)),
230 p_25th: to_mebibytes_per_second(percentile(&durations, 25)),
231 p_50th: to_mebibytes_per_second(percentile(&durations, 50)),
232 p_75th: to_mebibytes_per_second(percentile(&durations, 75)),
233 p_99th: to_mebibytes_per_second(percentile(&durations, 99)),
234 max: to_mebibytes_per_second(percentile(&durations, 100)),
235 unit: PercentileUnit::MebibytesPerSecond,
236 }
237}
238
239async fn warmup_and_data_corruption_check(socket: &mut fasync::Socket) -> Result<bool, Error> {
240 let mut buffer = vec![0u8; LATENCY_CHECK_SIZE_BYTES];
242 for i in 0..100 {
243 let pattern = format!("DAVID{:0>3}", i).repeat(512);
244 let packet = pattern.as_bytes();
245 assert_eq!(packet.len(), buffer.len());
246
247 if packet.len() != socket.as_ref().write(&packet)? {
248 return Err(anyhow!("failed to write full packet"));
249 }
250
251 let timeout =
252 fasync::MonotonicInstant::now() + std::time::Duration::from_millis(100).into();
253 select! {
254 () = fasync::Timer::new(timeout).fuse() => {
255 return Err(anyhow!("warmup timed out waiting 100ms for a packet echoed"));
256 }
257 result = socket.read_exact(&mut buffer).fuse() => {
258 result.map_err(|err| anyhow!("failed to read from socket during warmup: {}", err))?;
259 }
260 }
261
262 if buffer != packet {
263 return Ok(false);
264 }
265 }
266
267 Ok(true)
268}
269
270async fn wait_for_magic_numbers(
272 mut numbers: HashSet<u8>,
273 control_socket: &mut fasync::Socket,
274) -> Result<(), Error> {
275 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(5).into();
276 let mut magic_buf = [0u8];
277 loop {
278 select! {
279 () = fasync::Timer::new(timeout).fuse() => {
280 return Err(anyhow!("timeout waiting 5s to get the test ready"));
281 }
282 result = control_socket.read_exact(&mut magic_buf).fuse() => {
283 result.map_err(|err| anyhow!("failed to read magic value from socket: {}", err))?;
284 match numbers.contains(&magic_buf[0]) {
285 false => Err(anyhow!("unexpected magic number from guest: {}", magic_buf[0])),
286 true => {
287 numbers.remove(&magic_buf[0]);
288 Ok(())
289 }
290 }?;
291
292 if numbers.is_empty() {
293 break;
294 }
295 }
296 }
297 }
298
299 Ok(())
300}
301
302async fn read_single_stream(
303 total_size: usize,
304 socket: &mut fasync::Socket,
305) -> Result<fasync::MonotonicInstant, Error> {
306 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
307 let mut buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; let segments = total_size / buffer.len();
309
310 for _ in 0..segments {
311 select! {
312 () = fasync::Timer::new(timeout).fuse() => {
313 return Err(anyhow!("timeout waiting 10s for test iteration read to finish"));
314 }
315 result = socket.read_exact(&mut buffer).fuse() => {
316 result.map_err(|err| anyhow!("failed to read segment from socket: {}", err))?;
317 }
318 }
319 }
320
321 Ok(fasync::MonotonicInstant::now())
322}
323
324async fn write_single_stream(
325 total_size: usize,
326 socket: &mut fasync::Socket,
327) -> Result<fasync::MonotonicInstant, Error> {
328 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
329 let buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; let segments = total_size / buffer.len();
331
332 for _ in 0..segments {
333 select! {
334 () = fasync::Timer::new(timeout).fuse() => {
335 return Err(anyhow!("timeout waiting 10s for test iteration write to finish"));
336 }
337 result = socket.write_all(&buffer).fuse() => {
338 result.map_err(
339 |err| anyhow!("failed to write segment to socket: {}", err))?;
340 }
341 }
342 }
343
344 Ok(fasync::MonotonicInstant::now())
345}
346
347async fn write_read_high_throughput(
348 total_size: usize,
349 socket: &mut fasync::Socket,
350) -> Result<(), Error> {
351 write_single_stream(total_size, socket).await?;
354 read_single_stream(total_size, socket).await?;
355 Ok(())
356}
357
/// Measures throughput while writing to and reading from one connection concurrently,
/// and stores the results in `measurements.tx_throughput` and
/// `measurements.rx_throughput`.
///
/// The socket handle is duplicated so one handle can be written while the other is
/// read. Runs 100 iterations of `THROUGHPUT_SIZE_BYTES` in each direction, after the
/// bidirectional magic number has arrived on `control_socket`.
///
/// # Errors
///
/// Fails if the handle cannot be duplicated, if the magic number wait fails, or if any
/// transfer fails or times out.
#[cfg(target_os = "fuchsia")]
async fn run_single_stream_bidirectional_test(
    mut read_socket: fasync::Socket,
    control_socket: &mut fasync::Socket,
    measurements: &mut Measurements,
) -> Result<(), Error> {
    const ITERATIONS: usize = 100;

    println!("Starting single stream bidirectional round trip throughput test...");

    let duplicate = read_socket.as_ref().duplicate_handle(fidl::Rights::SAME_RIGHTS)?;
    let mut write_socket = fasync::Socket::from_socket(duplicate);

    let ready_signal = HashSet::from([SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM]);
    wait_for_magic_numbers(ready_signal, control_socket).await?;

    let mut rx_durations: Vec<u64> = Vec::with_capacity(ITERATIONS);
    let mut tx_durations: Vec<u64> = Vec::with_capacity(ITERATIONS);

    for completed in 1..=ITERATIONS {
        let started = fasync::MonotonicInstant::now();

        let (write_finished, read_finished) = try_join!(
            write_single_stream(THROUGHPUT_SIZE_BYTES, &mut write_socket),
            read_single_stream(THROUGHPUT_SIZE_BYTES, &mut read_socket)
        )?;

        let nanos_since_start = |finished: fasync::MonotonicInstant| -> u64 {
            get_time_delta_nanos(started, finished)
                .try_into()
                .expect("durations measured by the same thread must be greater than zero")
        };
        // Host writes are the guest's receive path; host reads are its transmit path.
        rx_durations.push(nanos_since_start(write_finished));
        tx_durations.push(nanos_since_start(read_finished));

        print!("\rFinished {} bidirectional throughput measurements", completed);
        std::io::stdout().flush().expect("failed to flush stdout");
    }

    // Longest duration first, so the slowest transfer becomes the minimum throughput.
    rx_durations.sort_unstable_by(|a, b| b.cmp(a));
    tx_durations.sort_unstable_by(|a, b| b.cmp(a));

    assert_eq!(rx_durations.len(), tx_durations.len());
    println!("\rFinished {} bidirectional throughput measurements", rx_durations.len());

    measurements.tx_throughput = Some(throughput_percentile(&tx_durations, THROUGHPUT_SIZE_BYTES));
    measurements.rx_throughput = Some(throughput_percentile(&rx_durations, THROUGHPUT_SIZE_BYTES));

    Ok(())
}
415
416async fn run_single_stream_unidirectional_round_trip_test(
417 mut data_socket: fasync::Socket,
418 control_socket: &mut fasync::Socket,
419 measurements: &mut Measurements,
420) -> Result<(), Error> {
421 println!("Starting single stream unidirectional round trip throughput test...");
422
423 wait_for_magic_numbers(HashSet::from([SINGLE_STREAM_MAGIC_NUM]), control_socket).await?;
424
425 let total_size = THROUGHPUT_SIZE_BYTES;
426 let mut durations: Vec<u64> = Vec::new();
427
428 for i in 0..100 {
429 let before = fasync::MonotonicInstant::now();
430
431 write_read_high_throughput(total_size, &mut data_socket).await?;
432
433 let after = fasync::MonotonicInstant::now();
434 durations.push(
435 get_time_delta_nanos(before, after)
436 .try_into()
437 .expect("durations measured by the same thread must be greater than zero"),
438 );
439
440 print!("\rFinished {} round trip throughput measurements", i + 1);
441 std::io::stdout().flush().expect("failed to flush stdout");
442 }
443
444 durations.sort();
445 durations.reverse();
446 println!("\rFinished {} single stream round trip throughput measurements", durations.len());
447
448 measurements.single_stream_unidirectional =
449 Some(throughput_percentile(&durations, total_size * 2));
450
451 Ok(())
452}
453
454async fn run_multi_stream_unidirectional_round_trip_test(
455 mut data_socket1: fasync::Socket,
456 mut data_socket2: fasync::Socket,
457 mut data_socket3: fasync::Socket,
458 mut data_socket4: fasync::Socket,
459 mut data_socket5: fasync::Socket,
460 control_socket: &mut fasync::Socket,
461 measurements: &mut Measurements,
462) -> Result<(), Error> {
463 println!("Starting multistream unidirectional round trip throughput test...");
464
465 wait_for_magic_numbers(
466 HashSet::from([
467 MULTI_STREAM_MAGIC_NUM1,
468 MULTI_STREAM_MAGIC_NUM2,
469 MULTI_STREAM_MAGIC_NUM3,
470 MULTI_STREAM_MAGIC_NUM4,
471 MULTI_STREAM_MAGIC_NUM5,
472 ]),
473 control_socket,
474 )
475 .await?;
476
477 let total_size = THROUGHPUT_SIZE_BYTES;
478 let mut durations: Vec<u64> = Vec::new();
479
480 for i in 0..50 {
481 let before = fasync::MonotonicInstant::now();
482
483 try_join!(
484 write_read_high_throughput(total_size, &mut data_socket1),
485 write_read_high_throughput(total_size, &mut data_socket2),
486 write_read_high_throughput(total_size, &mut data_socket3),
487 write_read_high_throughput(total_size, &mut data_socket4),
488 write_read_high_throughput(total_size, &mut data_socket5)
489 )?;
490
491 let after = fasync::MonotonicInstant::now();
492 durations.push(
493 get_time_delta_nanos(before, after)
494 .try_into()
495 .expect("durations measured by the same thread must be greater than zero"),
496 );
497
498 print!("\rFinished {} multistream round trip throughput measurements", i + 1);
499 std::io::stdout().flush().expect("failed to flush stdout");
500 }
501
502 durations.sort();
503 durations.reverse();
504 println!("\rFinished {} multistream round trip throughput measurements", durations.len());
505
506 measurements.multi_stream_unidirectional =
507 Some(throughput_percentile(&durations, total_size * 2));
508
509 Ok(())
510}
511
512async fn run_latency_test(
513 mut socket: fasync::Socket,
514 measurements: &mut Measurements,
515) -> Result<(), Error> {
516 println!("Checking for data corruption...");
517 measurements.data_corruption = Some(warmup_and_data_corruption_check(&mut socket).await?);
518 println!("Finished data corruption check");
519
520 let packet = [42u8; LATENCY_CHECK_SIZE_BYTES];
521 let mut buffer = vec![0u8; packet.len()];
522 let mut latencies: Vec<u64> = Vec::new();
523
524 println!("Starting latency test...");
525 for i in 0..10000 {
526 let before = fasync::MonotonicInstant::now();
527 let timeout = before + std::time::Duration::from_millis(100).into();
528
529 if packet.len() != socket.as_ref().write(&packet)? {
530 return Err(anyhow!("failed to write full packet"));
531 }
532
533 select! {
534 () = fasync::Timer::new(timeout).fuse() => {
535 return Err(anyhow!("latency test timed out waiting 100ms for a packet echoed"));
536 }
537 result = socket.read_exact(&mut buffer).fuse() => {
538 result.map_err(
539 |err| anyhow!("failed to read from socket during latency test: {}", err))?;
540 }
541 }
542
543 let after = fasync::MonotonicInstant::now();
544 latencies.push(
545 get_time_delta_nanos(before, after)
546 .try_into()
547 .expect("durations measured by the same thread must be greater than zero"),
548 );
549
550 if (i + 1) % 50 == 0 {
551 print!("\rFinished measuring round trip latency for {} packets", i + 1);
552 std::io::stdout().flush().expect("failed to flush stdout");
553 }
554 }
555
556 latencies.sort();
557 println!("\rFinished measuring round trip latency for {} packets", latencies.len());
558
559 measurements.round_trip_page = Some(latency_percentile(&latencies));
560
561 Ok(())
562}
563
564async fn run_micro_benchmark(guest_manager: GuestManagerProxy) -> Result<Measurements, Error> {
565 let guest_info = guest_manager.get_info().await?;
566 if guest_info.guest_status.unwrap() != GuestStatus::Running {
567 return Err(anyhow!(zx_status::Status::NOT_CONNECTED));
568 }
569
570 let (guest_endpoint, guest_server_end) = create_proxy::<GuestMarker>();
571 guest_manager
572 .connect(guest_server_end)
573 .await
574 .map_err(|err| anyhow!("failed to get a connect response: {}", err))?
575 .map_err(|err| anyhow!("connect failed with: {:?}", err))?;
576
577 let (vsock_endpoint, vsock_server_end) = create_proxy::<HostVsockEndpointMarker>();
578 guest_endpoint
579 .get_host_vsock_endpoint(vsock_server_end)
580 .await?
581 .map_err(|err| anyhow!("failed to get HostVsockEndpoint: {:?}", err))?;
582
583 let (acceptor, mut client_stream) = create_request_stream::<HostVsockAcceptorMarker>();
584 vsock_endpoint
585 .listen(HOST_PORT, acceptor)
586 .await
587 .map_err(|err| anyhow!("failed to get a listen response: {}", err))?
588 .map_err(|err| anyhow!("listen failed with: {}", zx_status::Status::from_raw(err)))?;
589
590 let socket = guest_endpoint
591 .get_console()
592 .await
593 .map_err(|err| anyhow!("failed to get a get_console response: {}", err))?
594 .map_err(|err| anyhow!("get_console failed with: {:?}", err))?;
595
596 let command = b"../test_utils/virtio_vsock_test_util micro_benchmark\n";
599 let bytes_written = socket
600 .write(command)
601 .map_err(|err| anyhow!("failed to write command to socket: {}", err))?;
602 if bytes_written != command.len() {
603 return Err(anyhow!(
604 "attempted to send command '{}', but only managed to write '{}'",
605 std::str::from_utf8(command).expect("failed to parse as utf-8"),
606 std::str::from_utf8(&command[0..bytes_written]).expect("failed to parse as utf-8")
607 ));
608 }
609
610 let mut expected_connections = HashSet::from([
611 CONTROL_STREAM,
612 LATENCY_CHECK_STREAM,
613 SINGLE_STREAM_THROUGHPUT,
614 MULTI_STREAM_THROUGHPUT1,
615 MULTI_STREAM_THROUGHPUT2,
616 MULTI_STREAM_THROUGHPUT3,
617 MULTI_STREAM_THROUGHPUT4,
618 MULTI_STREAM_THROUGHPUT5,
619 SINGLE_STREAM_BIDIRECTIONAL,
620 ]);
621 let mut active_connections = HashMap::new();
622
623 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(15).into();
625 loop {
626 select! {
627 () = fasync::Timer::new(timeout).fuse() => {
628 return Err(anyhow!("vsockperf timed out waiting 15s for vsock connections"));
629 }
630 request = client_stream.try_next() => {
631 let request = request
632 .map_err(|err| anyhow!("failed to get acceptor request: {}", err))?
633 .ok_or_else(|| anyhow!("unexpected end of Listener stream"))?;
634 let (_src_cid, src_port, _port, responder) = request
635 .into_accept().ok_or_else(|| anyhow!("failed to parse message as Accept"))?;
636
637 match expected_connections.contains(&src_port) {
638 false => Err(anyhow!("unexpected connection from guest port: {}", src_port)),
639 true => {
640 expected_connections.remove(&src_port);
641 Ok(())
642 }
643 }?;
644
645 let (client_socket, device_socket) = fidl::Socket::create_stream();
646 let client_socket = fasync::Socket::from_socket(client_socket);
647
648 responder.send(Ok(device_socket))
649 .map_err(|err| anyhow!("failed to send response to device: {}", err))?;
650
651 if let Some(_) = active_connections.insert(src_port, client_socket) {
652 panic!("Connections must be unique");
653 }
654
655 if expected_connections.is_empty() {
656 break;
657 }
658 }
659 }
660 }
661
662 let mut measurements = Measurements::default();
663
664 run_latency_test(
665 active_connections.remove(&LATENCY_CHECK_STREAM).expect("socket should exist"),
666 &mut measurements,
667 )
668 .await?;
669
670 #[cfg(target_os = "fuchsia")]
672 run_single_stream_bidirectional_test(
673 active_connections.remove(&SINGLE_STREAM_BIDIRECTIONAL).expect("socket should exist"),
674 active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
675 &mut measurements,
676 )
677 .await?;
678
679 run_single_stream_unidirectional_round_trip_test(
680 active_connections.remove(&SINGLE_STREAM_THROUGHPUT).expect("socket should exist"),
681 active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
682 &mut measurements,
683 )
684 .await?;
685
686 #[allow(clippy::large_futures)]
687 run_multi_stream_unidirectional_round_trip_test(
688 active_connections.remove(&MULTI_STREAM_THROUGHPUT1).expect("socket should exist"),
689 active_connections.remove(&MULTI_STREAM_THROUGHPUT2).expect("socket should exist"),
690 active_connections.remove(&MULTI_STREAM_THROUGHPUT3).expect("socket should exist"),
691 active_connections.remove(&MULTI_STREAM_THROUGHPUT4).expect("socket should exist"),
692 active_connections.remove(&MULTI_STREAM_THROUGHPUT5).expect("socket should exist"),
693 active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
694 &mut measurements,
695 )
696 .await?;
697
698 return Ok(measurements);
699}