1use crate::platform::PlatformServices;
6use anyhow::{Error, anyhow};
7use fidl::endpoints::{create_proxy, create_request_stream};
8use fidl_fuchsia_virtualization::{
9 GuestManagerProxy, GuestMarker, GuestStatus, HostVsockAcceptorMarker, HostVsockEndpointMarker,
10};
11use futures::{AsyncReadExt, AsyncWriteExt, FutureExt, TryStreamExt, select, try_join};
12use prettytable::format::consts::FORMAT_CLEAN;
13use prettytable::{Table, cell, row};
14use std::collections::{HashMap, HashSet};
15use std::fmt;
16use std::io::Write;
17use {fuchsia_async as fasync, guest_cli_args as arguments, zx_status};
18
/// Size in bytes of each packet echoed during the warmup and latency checks. It is also the
/// size of the segment buffer used by the throughput read/write helpers.
const LATENCY_CHECK_SIZE_BYTES: usize = 4096;
/// Transfer size, in MiB, used for every throughput iteration.
const THROUGHPUT_SIZE_MEBIBYTES: usize = 128;
/// `THROUGHPUT_SIZE_MEBIBYTES` expressed in bytes.
const THROUGHPUT_SIZE_BYTES: usize = (1 << 20) * THROUGHPUT_SIZE_MEBIBYTES;

/// Host port passed to `listen` on the host vsock endpoint.
const HOST_PORT: u32 = 8500;
/// Guest source port of the connection whose socket is used to read "ready" magic numbers.
const CONTROL_STREAM: u32 = 8501;
/// Guest source port of the connection used by the data corruption and latency checks.
const LATENCY_CHECK_STREAM: u32 = 8502;

/// Guest source port of the connection used by the single stream round trip test.
const SINGLE_STREAM_THROUGHPUT: u32 = 8503;
/// Byte awaited on the control socket before the single stream round trip test starts.
const SINGLE_STREAM_MAGIC_NUM: u8 = 123;

// Guest source ports of the five multistream connections, each paired with the byte awaited
// on the control socket before the multistream test starts.
const MULTI_STREAM_THROUGHPUT1: u32 = 8504;
const MULTI_STREAM_MAGIC_NUM1: u8 = 124;
const MULTI_STREAM_THROUGHPUT2: u32 = 8505;
const MULTI_STREAM_MAGIC_NUM2: u8 = 125;
const MULTI_STREAM_THROUGHPUT3: u32 = 8506;
const MULTI_STREAM_MAGIC_NUM3: u8 = 126;
const MULTI_STREAM_THROUGHPUT4: u32 = 8507;
const MULTI_STREAM_MAGIC_NUM4: u8 = 127;
const MULTI_STREAM_THROUGHPUT5: u32 = 8508;
const MULTI_STREAM_MAGIC_NUM5: u8 = 128;

/// Guest source port of the connection used by the single stream bidirectional test.
const SINGLE_STREAM_BIDIRECTIONAL: u32 = 8509;
// Only referenced by the bidirectional test, which is compiled for Fuchsia targets only; the
// allow silences the unused warning on other targets.
#[allow(dead_code)]
const SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM: u8 = 129;
44
/// Unit attached to a `Percentiles` summary; it selects how the values are rendered by
/// `Percentiles`' `Display` implementation.
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize)]
enum PercentileUnit {
    /// Values are durations in nanoseconds (also rendered in milliseconds).
    Nanoseconds,
    /// Values are throughputs in MiB per second.
    MebibytesPerSecond,
}
50
/// Summary of a sample set at six fixed percentile points, tagged with the unit of the values.
///
/// Each field holds the sample picked by `percentile` at the named rank (`min` is rank 0 and
/// `max` is rank 100), optionally converted to a throughput.
#[derive(serde::Serialize, serde::Deserialize)]
pub struct Percentiles {
    min: f64,
    p_25th: f64,
    p_50th: f64,
    p_75th: f64,
    p_99th: f64,
    max: f64,
    // Unit shared by all six values above.
    unit: PercentileUnit,
}
61
62impl fmt::Display for Percentiles {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 let get_units = |val: f64, unit: PercentileUnit| -> String {
65 match unit {
66 PercentileUnit::Nanoseconds => {
67 format!("{}ns ({:.3}ms)", val as u64, val / 1_000_000.0)
68 }
69 PercentileUnit::MebibytesPerSecond => {
70 format!("{:.2}MiB/s", val)
71 }
72 }
73 };
74
75 let mut table = Table::new();
76 table.set_format(*FORMAT_CLEAN);
77
78 table.add_row(row!["\tMin:", get_units(self.min, self.unit)]);
79 table.add_row(row!["\t25th percentile:", get_units(self.p_25th, self.unit)]);
80 table.add_row(row!["\t50th percentile:", get_units(self.p_50th, self.unit)]);
81 table.add_row(row!["\t75th percentile:", get_units(self.p_75th, self.unit)]);
82 table.add_row(row!["\t99th percentile:", get_units(self.p_99th, self.unit)]);
83 table.add_row(row!["\tMax:", get_units(self.max, self.unit)]);
84
85 write!(f, "\n{}", table)
86 }
87}
88
/// Results collected by the micro benchmark. Every field starts as `None` (via `Default`) and
/// is rendered as "NOT RUN" until the corresponding test stores a value.
#[derive(Default, serde::Serialize, serde::Deserialize)]
pub struct Measurements {
    // `Some(true)` when every warmup packet was echoed back unchanged, `Some(false)` otherwise.
    data_corruption: Option<bool>,
    // Round trip latency of one `LATENCY_CHECK_SIZE_BYTES` packet, in nanoseconds.
    round_trip_page: Option<Percentiles>,
    // Set by the bidirectional test from the time taken until the host finished reading.
    tx_throughput: Option<Percentiles>,
    // Set by the bidirectional test from the time taken until the host finished writing.
    rx_throughput: Option<Percentiles>,
    // Set by the single stream write-then-read round trip test.
    single_stream_unidirectional: Option<Percentiles>,
    // Set by the five-connection write-then-read round trip test.
    multi_stream_unidirectional: Option<Percentiles>,
}
98
99impl fmt::Display for Measurements {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 let format_percentiles = |percentiles: &Option<Percentiles>| -> String {
102 match percentiles {
103 None => " NOT RUN".to_owned(),
104 Some(percentile) => percentile.to_string(),
105 }
106 };
107
108 writeln!(f, "\n\nMicrobenchmark Results\n------------------------")?;
109
110 writeln!(
111 f,
112 "* Data corruption check: {}",
113 match self.data_corruption {
114 None => "NOT RUN",
115 Some(result) =>
116 if result {
117 "PASSED"
118 } else {
119 "FAILED"
120 },
121 }
122 )?;
123
124 writeln!(
125 f,
126 "* Round trip latency of {LATENCY_CHECK_SIZE_BYTES} bytes:{}",
127 format_percentiles(&self.round_trip_page)
128 )?;
129 writeln!(
130 f,
131 "* TX (guest -> host, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
132 format_percentiles(&self.tx_throughput)
133 )?;
134 writeln!(
135 f,
136 "* RX (host -> guest, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
137 format_percentiles(&self.rx_throughput)
138 )?;
139 writeln!(
140 f,
141 "* Single stream unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
142 format_percentiles(&self.single_stream_unidirectional)
143 )?;
144 writeln!(
145 f,
146 "* Multistream (5 connections) unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
147 format_percentiles(&self.multi_stream_unidirectional)
148 )
149 }
150}
151
/// Outcome of a `vsockperf` invocation, as returned by `handle_vsockperf`.
#[derive(serde::Serialize, serde::Deserialize)]
pub enum VsockPerfResult {
    /// The benchmark ran to completion; carries the collected measurements.
    BenchmarkComplete(Box<Measurements>),
    /// The requested guest type is not `Debian`; carries the rejected guest type.
    UnsupportedGuest(arguments::GuestType),
    /// The benchmark failed; carries the formatted error.
    Internal(String),
}
158
159impl fmt::Display for VsockPerfResult {
160 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161 match self {
162 VsockPerfResult::BenchmarkComplete(result) => write!(f, "{}", result),
163 VsockPerfResult::UnsupportedGuest(guest) => {
164 write!(f, "VsockPerf is not supported for '{}'. Only 'debian' is supported", guest)
165 }
166 VsockPerfResult::Internal(context) => {
167 write!(f, "Internal error: {}", context)
168 }
169 }
170 }
171}
172
173fn get_time_delta_nanos(before: fasync::MonotonicInstant, after: fasync::MonotonicInstant) -> i64 {
174 #[cfg(target_os = "fuchsia")]
175 {
176 (after - before).into_nanos()
177 }
178
179 #[cfg(not(target_os = "fuchsia"))]
180 {
181 (after - before).as_nanos().try_into().unwrap()
182 }
183}
184
185pub async fn handle_vsockperf<P: PlatformServices>(
186 services: &P,
187 args: &arguments::vsockperf_args::VsockPerfArgs,
188) -> Result<VsockPerfResult, Error> {
189 if args.guest_type != arguments::GuestType::Debian {
190 return Ok(VsockPerfResult::UnsupportedGuest(args.guest_type));
191 }
192
193 let guest_manager = services.connect_to_manager(args.guest_type).await?;
194 #[allow(clippy::large_futures)]
195 Ok(match run_micro_benchmark(guest_manager).await {
196 Err(err) => VsockPerfResult::Internal(format!("{}", err)),
197 Ok(result) => VsockPerfResult::BenchmarkComplete(Box::new(result)),
198 })
199}
200
/// Picks the sample at the given percentile rank from `durations`.
///
/// The index is `percentile / 100` of the last valid index, truncated towards zero. The slice
/// is indexed as-is, so callers must pass it already sorted.
///
/// # Panics
///
/// Panics if `percentile` is greater than 100 or `durations` is empty.
fn percentile(durations: &[u64], percentile: u8) -> u64 {
    assert!(percentile <= 100 && !durations.is_empty());
    let fraction = f64::from(percentile) / 100.0;
    let last_index = (durations.len() - 1) as f64;
    let location = (fraction * last_index) as usize;
    durations[location]
}
207
208fn latency_percentile(durations: &[u64]) -> Percentiles {
209 Percentiles {
210 min: percentile(&durations, 0) as f64,
211 p_25th: percentile(&durations, 25) as f64,
212 p_50th: percentile(&durations, 50) as f64,
213 p_75th: percentile(&durations, 75) as f64,
214 p_99th: percentile(&durations, 99) as f64,
215 max: percentile(&durations, 100) as f64,
216 unit: PercentileUnit::Nanoseconds,
217 }
218}
219
220fn throughput_percentile(durations: &[u64], bytes: usize) -> Percentiles {
221 let to_mebibytes_per_second = |nanos: u64| -> f64 {
222 let seconds = nanos as f64 / (1000.0 * 1000.0 * 1000.0);
223 let bytes_per_second = (bytes as f64) / seconds;
224 bytes_per_second / (1 << 20) as f64
225 };
226
227 Percentiles {
228 min: to_mebibytes_per_second(percentile(&durations, 0)),
229 p_25th: to_mebibytes_per_second(percentile(&durations, 25)),
230 p_50th: to_mebibytes_per_second(percentile(&durations, 50)),
231 p_75th: to_mebibytes_per_second(percentile(&durations, 75)),
232 p_99th: to_mebibytes_per_second(percentile(&durations, 99)),
233 max: to_mebibytes_per_second(percentile(&durations, 100)),
234 unit: PercentileUnit::MebibytesPerSecond,
235 }
236}
237
238async fn warmup_and_data_corruption_check(socket: &mut fasync::Socket) -> Result<bool, Error> {
239 let mut buffer = vec![0u8; LATENCY_CHECK_SIZE_BYTES];
241 for i in 0..100 {
242 let pattern = format!("DAVID{:0>3}", i).repeat(512);
243 let packet = pattern.as_bytes();
244 assert_eq!(packet.len(), buffer.len());
245
246 if packet.len() != socket.as_ref().write(&packet)? {
247 return Err(anyhow!("failed to write full packet"));
248 }
249
250 let timeout =
251 fasync::MonotonicInstant::now() + std::time::Duration::from_millis(100).into();
252 select! {
253 () = fasync::Timer::new(timeout).fuse() => {
254 return Err(anyhow!("warmup timed out waiting 100ms for a packet echoed"));
255 }
256 result = socket.read_exact(&mut buffer).fuse() => {
257 result.map_err(|err| anyhow!("failed to read from socket during warmup: {}", err))?;
258 }
259 }
260
261 if buffer != packet {
262 return Ok(false);
263 }
264 }
265
266 Ok(true)
267}
268
269async fn wait_for_magic_numbers(
271 mut numbers: HashSet<u8>,
272 control_socket: &mut fasync::Socket,
273) -> Result<(), Error> {
274 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(5).into();
275 let mut magic_buf = [0u8];
276 loop {
277 select! {
278 () = fasync::Timer::new(timeout).fuse() => {
279 return Err(anyhow!("timeout waiting 5s to get the test ready"));
280 }
281 result = control_socket.read_exact(&mut magic_buf).fuse() => {
282 result.map_err(|err| anyhow!("failed to read magic value from socket: {}", err))?;
283 match numbers.contains(&magic_buf[0]) {
284 false => Err(anyhow!("unexpected magic number from guest: {}", magic_buf[0])),
285 true => {
286 numbers.remove(&magic_buf[0]);
287 Ok(())
288 }
289 }?;
290
291 if numbers.is_empty() {
292 break;
293 }
294 }
295 }
296 }
297
298 Ok(())
299}
300
301async fn read_single_stream(
302 total_size: usize,
303 socket: &mut fasync::Socket,
304) -> Result<fasync::MonotonicInstant, Error> {
305 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
306 let mut buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; let segments = total_size / buffer.len();
308
309 for _ in 0..segments {
310 select! {
311 () = fasync::Timer::new(timeout).fuse() => {
312 return Err(anyhow!("timeout waiting 10s for test iteration read to finish"));
313 }
314 result = socket.read_exact(&mut buffer).fuse() => {
315 result.map_err(|err| anyhow!("failed to read segment from socket: {}", err))?;
316 }
317 }
318 }
319
320 Ok(fasync::MonotonicInstant::now())
321}
322
323async fn write_single_stream(
324 total_size: usize,
325 socket: &mut fasync::Socket,
326) -> Result<fasync::MonotonicInstant, Error> {
327 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
328 let buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; let segments = total_size / buffer.len();
330
331 for _ in 0..segments {
332 select! {
333 () = fasync::Timer::new(timeout).fuse() => {
334 return Err(anyhow!("timeout waiting 10s for test iteration write to finish"));
335 }
336 result = socket.write_all(&buffer).fuse() => {
337 result.map_err(
338 |err| anyhow!("failed to write segment to socket: {}", err))?;
339 }
340 }
341 }
342
343 Ok(fasync::MonotonicInstant::now())
344}
345
346async fn write_read_high_throughput(
347 total_size: usize,
348 socket: &mut fasync::Socket,
349) -> Result<(), Error> {
350 write_single_stream(total_size, socket).await?;
353 read_single_stream(total_size, socket).await?;
354 Ok(())
355}
356
/// Runs 100 iterations in which `THROUGHPUT_SIZE_BYTES` are written and read concurrently on
/// the same connection, then stores the results in `measurements.rx_throughput` (time until
/// the write finished) and `measurements.tx_throughput` (time until the read finished).
///
/// The connection is used through two handles: `read_socket` and a duplicate of it for writes.
/// The test starts once `SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM` is read from `control_socket`.
///
/// # Errors
///
/// Fails if the handle cannot be duplicated, if the magic number wait fails, or if any write
/// or read fails or times out.
#[cfg(target_os = "fuchsia")]
async fn run_single_stream_bidirectional_test(
    mut read_socket: fasync::Socket,
    control_socket: &mut fasync::Socket,
    measurements: &mut Measurements,
) -> Result<(), Error> {
    use fidl::HandleBased;

    println!("Starting single stream bidirectional round trip throughput test...");

    let duplicate = read_socket.as_ref().duplicate_handle(fidl::Rights::SAME_RIGHTS)?;
    let mut write_socket = fasync::Socket::from_socket(duplicate);

    let ready_signal = HashSet::from([SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM]);
    wait_for_magic_numbers(ready_signal, control_socket).await?;

    let total_size = THROUGHPUT_SIZE_BYTES;
    let mut rx_durations = Vec::<u64>::new();
    let mut tx_durations = Vec::<u64>::new();

    for completed in 1..=100 {
        let start = fasync::MonotonicInstant::now();

        let (write_finish, read_finish) = try_join!(
            write_single_stream(total_size, &mut write_socket),
            read_single_stream(total_size, &mut read_socket)
        )?;

        // Host writes are what the guest receives, so they count towards RX.
        let write_nanos: u64 = get_time_delta_nanos(start, write_finish)
            .try_into()
            .expect("durations measured by the same thread must be greater than zero");
        rx_durations.push(write_nanos);

        // Host reads are what the guest transmits, so they count towards TX.
        let read_nanos: u64 = get_time_delta_nanos(start, read_finish)
            .try_into()
            .expect("durations measured by the same thread must be greater than zero");
        tx_durations.push(read_nanos);

        print!("\rFinished {} bidirectional throughput measurements", completed);
        std::io::stdout().flush().expect("failed to flush stdout");
    }

    // Longest durations first, so the lowest throughput lands at the `min` end.
    rx_durations.sort_unstable_by(|a, b| b.cmp(a));
    tx_durations.sort_unstable_by(|a, b| b.cmp(a));

    assert_eq!(rx_durations.len(), tx_durations.len());
    println!("\rFinished {} bidirectional throughput measurements", rx_durations.len());

    measurements.tx_throughput = Some(throughput_percentile(&tx_durations, total_size));
    measurements.rx_throughput = Some(throughput_percentile(&rx_durations, total_size));

    Ok(())
}
416
417async fn run_single_stream_unidirectional_round_trip_test(
418 mut data_socket: fasync::Socket,
419 control_socket: &mut fasync::Socket,
420 measurements: &mut Measurements,
421) -> Result<(), Error> {
422 println!("Starting single stream unidirectional round trip throughput test...");
423
424 wait_for_magic_numbers(HashSet::from([SINGLE_STREAM_MAGIC_NUM]), control_socket).await?;
425
426 let total_size = THROUGHPUT_SIZE_BYTES;
427 let mut durations: Vec<u64> = Vec::new();
428
429 for i in 0..100 {
430 let before = fasync::MonotonicInstant::now();
431
432 write_read_high_throughput(total_size, &mut data_socket).await?;
433
434 let after = fasync::MonotonicInstant::now();
435 durations.push(
436 get_time_delta_nanos(before, after)
437 .try_into()
438 .expect("durations measured by the same thread must be greater than zero"),
439 );
440
441 print!("\rFinished {} round trip throughput measurements", i + 1);
442 std::io::stdout().flush().expect("failed to flush stdout");
443 }
444
445 durations.sort();
446 durations.reverse();
447 println!("\rFinished {} single stream round trip throughput measurements", durations.len());
448
449 measurements.single_stream_unidirectional =
450 Some(throughput_percentile(&durations, total_size * 2));
451
452 Ok(())
453}
454
455async fn run_multi_stream_unidirectional_round_trip_test(
456 mut data_socket1: fasync::Socket,
457 mut data_socket2: fasync::Socket,
458 mut data_socket3: fasync::Socket,
459 mut data_socket4: fasync::Socket,
460 mut data_socket5: fasync::Socket,
461 control_socket: &mut fasync::Socket,
462 measurements: &mut Measurements,
463) -> Result<(), Error> {
464 println!("Starting multistream unidirectional round trip throughput test...");
465
466 wait_for_magic_numbers(
467 HashSet::from([
468 MULTI_STREAM_MAGIC_NUM1,
469 MULTI_STREAM_MAGIC_NUM2,
470 MULTI_STREAM_MAGIC_NUM3,
471 MULTI_STREAM_MAGIC_NUM4,
472 MULTI_STREAM_MAGIC_NUM5,
473 ]),
474 control_socket,
475 )
476 .await?;
477
478 let total_size = THROUGHPUT_SIZE_BYTES;
479 let mut durations: Vec<u64> = Vec::new();
480
481 for i in 0..50 {
482 let before = fasync::MonotonicInstant::now();
483
484 try_join!(
485 write_read_high_throughput(total_size, &mut data_socket1),
486 write_read_high_throughput(total_size, &mut data_socket2),
487 write_read_high_throughput(total_size, &mut data_socket3),
488 write_read_high_throughput(total_size, &mut data_socket4),
489 write_read_high_throughput(total_size, &mut data_socket5)
490 )?;
491
492 let after = fasync::MonotonicInstant::now();
493 durations.push(
494 get_time_delta_nanos(before, after)
495 .try_into()
496 .expect("durations measured by the same thread must be greater than zero"),
497 );
498
499 print!("\rFinished {} multistream round trip throughput measurements", i + 1);
500 std::io::stdout().flush().expect("failed to flush stdout");
501 }
502
503 durations.sort();
504 durations.reverse();
505 println!("\rFinished {} multistream round trip throughput measurements", durations.len());
506
507 measurements.multi_stream_unidirectional =
508 Some(throughput_percentile(&durations, total_size * 2));
509
510 Ok(())
511}
512
513async fn run_latency_test(
514 mut socket: fasync::Socket,
515 measurements: &mut Measurements,
516) -> Result<(), Error> {
517 println!("Checking for data corruption...");
518 measurements.data_corruption = Some(warmup_and_data_corruption_check(&mut socket).await?);
519 println!("Finished data corruption check");
520
521 let packet = [42u8; LATENCY_CHECK_SIZE_BYTES];
522 let mut buffer = vec![0u8; packet.len()];
523 let mut latencies: Vec<u64> = Vec::new();
524
525 println!("Starting latency test...");
526 for i in 0..10000 {
527 let before = fasync::MonotonicInstant::now();
528 let timeout = before + std::time::Duration::from_millis(100).into();
529
530 if packet.len() != socket.as_ref().write(&packet)? {
531 return Err(anyhow!("failed to write full packet"));
532 }
533
534 select! {
535 () = fasync::Timer::new(timeout).fuse() => {
536 return Err(anyhow!("latency test timed out waiting 100ms for a packet echoed"));
537 }
538 result = socket.read_exact(&mut buffer).fuse() => {
539 result.map_err(
540 |err| anyhow!("failed to read from socket during latency test: {}", err))?;
541 }
542 }
543
544 let after = fasync::MonotonicInstant::now();
545 latencies.push(
546 get_time_delta_nanos(before, after)
547 .try_into()
548 .expect("durations measured by the same thread must be greater than zero"),
549 );
550
551 if (i + 1) % 50 == 0 {
552 print!("\rFinished measuring round trip latency for {} packets", i + 1);
553 std::io::stdout().flush().expect("failed to flush stdout");
554 }
555 }
556
557 latencies.sort();
558 println!("\rFinished measuring round trip latency for {} packets", latencies.len());
559
560 measurements.round_trip_page = Some(latency_percentile(&latencies));
561
562 Ok(())
563}
564
565async fn run_micro_benchmark(guest_manager: GuestManagerProxy) -> Result<Measurements, Error> {
566 let guest_info = guest_manager.get_info().await?;
567 if guest_info.guest_status.unwrap() != GuestStatus::Running {
568 return Err(anyhow!(zx_status::Status::NOT_CONNECTED));
569 }
570
571 let (guest_endpoint, guest_server_end) = create_proxy::<GuestMarker>();
572 guest_manager
573 .connect(guest_server_end)
574 .await
575 .map_err(|err| anyhow!("failed to get a connect response: {}", err))?
576 .map_err(|err| anyhow!("connect failed with: {:?}", err))?;
577
578 let (vsock_endpoint, vsock_server_end) = create_proxy::<HostVsockEndpointMarker>();
579 guest_endpoint
580 .get_host_vsock_endpoint(vsock_server_end)
581 .await?
582 .map_err(|err| anyhow!("failed to get HostVsockEndpoint: {:?}", err))?;
583
584 let (acceptor, mut client_stream) = create_request_stream::<HostVsockAcceptorMarker>();
585 vsock_endpoint
586 .listen(HOST_PORT, acceptor)
587 .await
588 .map_err(|err| anyhow!("failed to get a listen response: {}", err))?
589 .map_err(|err| anyhow!("listen failed with: {}", zx_status::Status::from_raw(err)))?;
590
591 let socket = guest_endpoint
592 .get_console()
593 .await
594 .map_err(|err| anyhow!("failed to get a get_console response: {}", err))?
595 .map_err(|err| anyhow!("get_console failed with: {:?}", err))?;
596
597 let command = b"../test_utils/virtio_vsock_test_util micro_benchmark\n";
600 let bytes_written = socket
601 .write(command)
602 .map_err(|err| anyhow!("failed to write command to socket: {}", err))?;
603 if bytes_written != command.len() {
604 return Err(anyhow!(
605 "attempted to send command '{}', but only managed to write '{}'",
606 std::str::from_utf8(command).expect("failed to parse as utf-8"),
607 std::str::from_utf8(&command[0..bytes_written]).expect("failed to parse as utf-8")
608 ));
609 }
610
611 let mut expected_connections = HashSet::from([
612 CONTROL_STREAM,
613 LATENCY_CHECK_STREAM,
614 SINGLE_STREAM_THROUGHPUT,
615 MULTI_STREAM_THROUGHPUT1,
616 MULTI_STREAM_THROUGHPUT2,
617 MULTI_STREAM_THROUGHPUT3,
618 MULTI_STREAM_THROUGHPUT4,
619 MULTI_STREAM_THROUGHPUT5,
620 SINGLE_STREAM_BIDIRECTIONAL,
621 ]);
622 let mut active_connections = HashMap::new();
623
624 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(15).into();
626 loop {
627 select! {
628 () = fasync::Timer::new(timeout).fuse() => {
629 return Err(anyhow!("vsockperf timed out waiting 15s for vsock connections"));
630 }
631 request = client_stream.try_next() => {
632 let request = request
633 .map_err(|err| anyhow!("failed to get acceptor request: {}", err))?
634 .ok_or_else(|| anyhow!("unexpected end of Listener stream"))?;
635 let (_src_cid, src_port, _port, responder) = request
636 .into_accept().ok_or_else(|| anyhow!("failed to parse message as Accept"))?;
637
638 match expected_connections.contains(&src_port) {
639 false => Err(anyhow!("unexpected connection from guest port: {}", src_port)),
640 true => {
641 expected_connections.remove(&src_port);
642 Ok(())
643 }
644 }?;
645
646 let (client_socket, device_socket) = fidl::Socket::create_stream();
647 let client_socket = fasync::Socket::from_socket(client_socket);
648
649 responder.send(Ok(device_socket))
650 .map_err(|err| anyhow!("failed to send response to device: {}", err))?;
651
652 if let Some(_) = active_connections.insert(src_port, client_socket) {
653 panic!("Connections must be unique");
654 }
655
656 if expected_connections.is_empty() {
657 break;
658 }
659 }
660 }
661 }
662
663 let mut measurements = Measurements::default();
664
665 run_latency_test(
666 active_connections.remove(&LATENCY_CHECK_STREAM).expect("socket should exist"),
667 &mut measurements,
668 )
669 .await?;
670
671 #[cfg(target_os = "fuchsia")]
673 run_single_stream_bidirectional_test(
674 active_connections.remove(&SINGLE_STREAM_BIDIRECTIONAL).expect("socket should exist"),
675 active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
676 &mut measurements,
677 )
678 .await?;
679
680 run_single_stream_unidirectional_round_trip_test(
681 active_connections.remove(&SINGLE_STREAM_THROUGHPUT).expect("socket should exist"),
682 active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
683 &mut measurements,
684 )
685 .await?;
686
687 #[allow(clippy::large_futures)]
688 run_multi_stream_unidirectional_round_trip_test(
689 active_connections.remove(&MULTI_STREAM_THROUGHPUT1).expect("socket should exist"),
690 active_connections.remove(&MULTI_STREAM_THROUGHPUT2).expect("socket should exist"),
691 active_connections.remove(&MULTI_STREAM_THROUGHPUT3).expect("socket should exist"),
692 active_connections.remove(&MULTI_STREAM_THROUGHPUT4).expect("socket should exist"),
693 active_connections.remove(&MULTI_STREAM_THROUGHPUT5).expect("socket should exist"),
694 active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
695 &mut measurements,
696 )
697 .await?;
698
699 return Ok(measurements);
700}