Skip to main content

guest_cli/
vsockperf.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::platform::PlatformServices;
6use anyhow::{Error, anyhow};
7use fidl::endpoints::{create_proxy, create_request_stream};
8use fidl_fuchsia_virtualization::{
9    GuestManagerProxy, GuestMarker, GuestStatus, HostVsockAcceptorMarker, HostVsockEndpointMarker,
10};
11use fuchsia_async as fasync;
12use futures::{AsyncReadExt, AsyncWriteExt, FutureExt, TryStreamExt, select, try_join};
13use guest_cli_args as arguments;
14use prettytable::format::consts::FORMAT_CLEAN;
15use prettytable::{Table, cell, row};
16use std::collections::{HashMap, HashSet};
17use std::fmt;
18use std::io::Write;
19
// Size of each packet echoed during the data corruption and latency checks. The throughput
// helpers also move their data in segments of this size.
const LATENCY_CHECK_SIZE_BYTES: usize = 4096;
// Amount of data transferred in one direction by a single throughput iteration.
const THROUGHPUT_SIZE_MEBIBYTES: usize = 128;
const THROUGHPUT_SIZE_BYTES: usize = (1 << 20) * THROUGHPUT_SIZE_MEBIBYTES;

// Host vsock port that this tool listens on for connections from the guest utility.
const HOST_PORT: u32 = 8500;
// The remaining `u32` constants are the guest source ports used to tell the expected connections
// apart once they are accepted.
const CONTROL_STREAM: u32 = 8501;
const LATENCY_CHECK_STREAM: u32 = 8502;

// Each `*_MAGIC_NUM` is a single byte read from the control stream; receiving it means the guest
// is ready for the corresponding test.
const SINGLE_STREAM_THROUGHPUT: u32 = 8503;
const SINGLE_STREAM_MAGIC_NUM: u8 = 123;

const MULTI_STREAM_THROUGHPUT1: u32 = 8504;
const MULTI_STREAM_MAGIC_NUM1: u8 = 124;
const MULTI_STREAM_THROUGHPUT2: u32 = 8505;
const MULTI_STREAM_MAGIC_NUM2: u8 = 125;
const MULTI_STREAM_THROUGHPUT3: u32 = 8506;
const MULTI_STREAM_MAGIC_NUM3: u8 = 126;
const MULTI_STREAM_THROUGHPUT4: u32 = 8507;
const MULTI_STREAM_MAGIC_NUM4: u8 = 127;
const MULTI_STREAM_THROUGHPUT5: u32 = 8508;
const MULTI_STREAM_MAGIC_NUM5: u8 = 128;

const SINGLE_STREAM_BIDIRECTIONAL: u32 = 8509;
// Only referenced by the bidirectional test, which is compiled solely for Fuchsia targets, so
// the constant is unused everywhere else.
#[allow(dead_code)]
const SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM: u8 = 129;
45
/// Unit in which the values of a [`Percentiles`] are expressed; it selects how they are
/// rendered by `Display`.
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize)]
enum PercentileUnit {
    /// Durations in nanoseconds (also shown in milliseconds when displayed).
    Nanoseconds,
    /// Throughput in MiB per second.
    MebibytesPerSecond,
}
51
/// Summary statistics for one benchmark, picked from a sorted list of samples.
///
/// Every value is expressed in `unit`. Each field holds the sample found at the matching
/// position of the sorted list (position 0 for `min`, the last position for `max`), so the
/// meaning of "min" and "max" follows the order in which the caller sorted the samples.
#[derive(serde::Serialize, serde::Deserialize)]
pub struct Percentiles {
    min: f64,
    p_25th: f64,
    p_50th: f64,
    p_75th: f64,
    p_99th: f64,
    max: f64,
    // Unit shared by all six values above.
    unit: PercentileUnit,
}
62
63impl fmt::Display for Percentiles {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        let get_units = |val: f64, unit: PercentileUnit| -> String {
66            match unit {
67                PercentileUnit::Nanoseconds => {
68                    format!("{}ns ({:.3}ms)", val as u64, val / 1_000_000.0)
69                }
70                PercentileUnit::MebibytesPerSecond => {
71                    format!("{:.2}MiB/s", val)
72                }
73            }
74        };
75
76        let mut table = Table::new();
77        table.set_format(*FORMAT_CLEAN);
78
79        table.add_row(row!["\tMin:", get_units(self.min, self.unit)]);
80        table.add_row(row!["\t25th percentile:", get_units(self.p_25th, self.unit)]);
81        table.add_row(row!["\t50th percentile:", get_units(self.p_50th, self.unit)]);
82        table.add_row(row!["\t75th percentile:", get_units(self.p_75th, self.unit)]);
83        table.add_row(row!["\t99th percentile:", get_units(self.p_99th, self.unit)]);
84        table.add_row(row!["\tMax:", get_units(self.max, self.unit)]);
85
86        write!(f, "\n{}", table)
87    }
88}
89
/// Results gathered by the micro benchmark. A field left as `None` means the corresponding
/// test did not run; `Display` reports it as "NOT RUN".
#[derive(Default, serde::Serialize, serde::Deserialize)]
pub struct Measurements {
    // `Some(true)` when every echoed warmup packet matched what was sent ("PASSED").
    data_corruption: Option<bool>,
    // Round trip latency of a `LATENCY_CHECK_SIZE_BYTES` packet.
    round_trip_page: Option<Percentiles>,
    // Guest -> host and host -> guest throughput; filled in only by the bidirectional test.
    tx_throughput: Option<Percentiles>,
    rx_throughput: Option<Percentiles>,
    // Round trip (write, then read back) throughput over one connection.
    single_stream_unidirectional: Option<Percentiles>,
    // Round trip throughput with five connections active at once.
    multi_stream_unidirectional: Option<Percentiles>,
}
99
100impl fmt::Display for Measurements {
101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102        let format_percentiles = |percentiles: &Option<Percentiles>| -> String {
103            match percentiles {
104                None => " NOT RUN".to_owned(),
105                Some(percentile) => percentile.to_string(),
106            }
107        };
108
109        writeln!(f, "\n\nMicrobenchmark Results\n------------------------")?;
110
111        writeln!(
112            f,
113            "* Data corruption check: {}",
114            match self.data_corruption {
115                None => "NOT RUN",
116                Some(result) =>
117                    if result {
118                        "PASSED"
119                    } else {
120                        "FAILED"
121                    },
122            }
123        )?;
124
125        writeln!(
126            f,
127            "* Round trip latency of {LATENCY_CHECK_SIZE_BYTES} bytes:{}",
128            format_percentiles(&self.round_trip_page)
129        )?;
130        writeln!(
131            f,
132            "* TX (guest -> host, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
133            format_percentiles(&self.tx_throughput)
134        )?;
135        writeln!(
136            f,
137            "* RX (host -> guest, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
138            format_percentiles(&self.rx_throughput)
139        )?;
140        writeln!(
141            f,
142            "* Single stream unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
143            format_percentiles(&self.single_stream_unidirectional)
144        )?;
145        writeln!(
146            f,
147            "* Multistream (5 connections) unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
148            format_percentiles(&self.multi_stream_unidirectional)
149        )
150    }
151}
152
/// Outcome of the `vsockperf` command, as reported to the user.
#[derive(serde::Serialize, serde::Deserialize)]
pub enum VsockPerfResult {
    /// The benchmark ran to completion; carries the collected measurements.
    BenchmarkComplete(Box<Measurements>),
    /// The requested guest type is not one the benchmark supports.
    UnsupportedGuest(arguments::GuestType),
    /// The benchmark failed; carries the formatted error.
    Internal(String),
}
159
160impl fmt::Display for VsockPerfResult {
161    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162        match self {
163            VsockPerfResult::BenchmarkComplete(result) => write!(f, "{}", result),
164            VsockPerfResult::UnsupportedGuest(guest) => {
165                write!(f, "VsockPerf is not supported for '{}'. Only 'debian' is supported", guest)
166            }
167            VsockPerfResult::Internal(context) => {
168                write!(f, "Internal error: {}", context)
169            }
170        }
171    }
172}
173
174fn get_time_delta_nanos(before: fasync::MonotonicInstant, after: fasync::MonotonicInstant) -> i64 {
175    #[cfg(target_os = "fuchsia")]
176    {
177        (after - before).into_nanos()
178    }
179
180    #[cfg(not(target_os = "fuchsia"))]
181    {
182        (after - before).as_nanos().try_into().unwrap()
183    }
184}
185
186pub async fn handle_vsockperf<P: PlatformServices>(
187    services: &P,
188    args: &arguments::vsockperf_args::VsockPerfArgs,
189) -> Result<VsockPerfResult, Error> {
190    if args.guest_type != arguments::GuestType::Debian {
191        return Ok(VsockPerfResult::UnsupportedGuest(args.guest_type));
192    }
193
194    let guest_manager = services.connect_to_manager(args.guest_type).await?;
195    #[allow(clippy::large_futures)]
196    Ok(match run_micro_benchmark(guest_manager).await {
197        Err(err) => VsockPerfResult::Internal(format!("{}", err)),
198        Ok(result) => VsockPerfResult::BenchmarkComplete(Box::new(result)),
199    })
200}
201
/// Returns the element of `durations` located at the given percentile.
///
/// `durations` is expected to be sorted already; the function only selects an index. The
/// index is `floor((len - 1) * percentile / 100)`, so no interpolation happens between
/// neighbouring samples: 0 selects the first element and 100 the last.
///
/// # Panics
///
/// Panics if `percentile` is greater than 100 or `durations` is empty.
fn percentile(durations: &[u64], percentile: u8) -> u64 {
    assert!(percentile <= 100 && !durations.is_empty());
    // Compute the floor with integers. Doing this in floating point can land one slot too low
    // (for example 29.0 / 100.0 * 100.0 evaluates to 28.999999999999996). Widening to u128
    // rules out overflow in the multiplication; the quotient never exceeds `len - 1`.
    let last_index = (durations.len() - 1) as u128;
    let location = (last_index * u128::from(percentile) / 100) as usize;
    durations[location]
}
208
209fn latency_percentile(durations: &[u64]) -> Percentiles {
210    Percentiles {
211        min: percentile(&durations, 0) as f64,
212        p_25th: percentile(&durations, 25) as f64,
213        p_50th: percentile(&durations, 50) as f64,
214        p_75th: percentile(&durations, 75) as f64,
215        p_99th: percentile(&durations, 99) as f64,
216        max: percentile(&durations, 100) as f64,
217        unit: PercentileUnit::Nanoseconds,
218    }
219}
220
221fn throughput_percentile(durations: &[u64], bytes: usize) -> Percentiles {
222    let to_mebibytes_per_second = |nanos: u64| -> f64 {
223        let seconds = nanos as f64 / (1000.0 * 1000.0 * 1000.0);
224        let bytes_per_second = (bytes as f64) / seconds;
225        bytes_per_second / (1 << 20) as f64
226    };
227
228    Percentiles {
229        min: to_mebibytes_per_second(percentile(&durations, 0)),
230        p_25th: to_mebibytes_per_second(percentile(&durations, 25)),
231        p_50th: to_mebibytes_per_second(percentile(&durations, 50)),
232        p_75th: to_mebibytes_per_second(percentile(&durations, 75)),
233        p_99th: to_mebibytes_per_second(percentile(&durations, 99)),
234        max: to_mebibytes_per_second(percentile(&durations, 100)),
235        unit: PercentileUnit::MebibytesPerSecond,
236    }
237}
238
239async fn warmup_and_data_corruption_check(socket: &mut fasync::Socket) -> Result<bool, Error> {
240    // Send and receive 100 messages, checking for a known but changing pattern.
241    let mut buffer = vec![0u8; LATENCY_CHECK_SIZE_BYTES];
242    for i in 0..100 {
243        let pattern = format!("DAVID{:0>3}", i).repeat(512);
244        let packet = pattern.as_bytes();
245        assert_eq!(packet.len(), buffer.len());
246
247        if packet.len() != socket.as_ref().write(&packet)? {
248            return Err(anyhow!("failed to write full packet"));
249        }
250
251        let timeout =
252            fasync::MonotonicInstant::now() + std::time::Duration::from_millis(100).into();
253        select! {
254            () = fasync::Timer::new(timeout).fuse() => {
255                return Err(anyhow!("warmup timed out waiting 100ms for a packet echoed"));
256            }
257            result = socket.read_exact(&mut buffer).fuse() => {
258                result.map_err(|err| anyhow!("failed to read from socket during warmup: {}", err))?;
259            }
260        }
261
262        if buffer != packet {
263            return Ok(false);
264        }
265    }
266
267    Ok(true)
268}
269
270// Get the magic numbers for a test case from the guest to know that it's ready.
271async fn wait_for_magic_numbers(
272    mut numbers: HashSet<u8>,
273    control_socket: &mut fasync::Socket,
274) -> Result<(), Error> {
275    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(5).into();
276    let mut magic_buf = [0u8];
277    loop {
278        select! {
279            () = fasync::Timer::new(timeout).fuse() => {
280                return Err(anyhow!("timeout waiting 5s to get the test ready"));
281            }
282            result = control_socket.read_exact(&mut magic_buf).fuse() => {
283                result.map_err(|err| anyhow!("failed to read magic value from socket: {}", err))?;
284                match numbers.contains(&magic_buf[0]) {
285                    false => Err(anyhow!("unexpected magic number from guest: {}", magic_buf[0])),
286                    true => {
287                        numbers.remove(&magic_buf[0]);
288                        Ok(())
289                    }
290                }?;
291
292                if numbers.is_empty() {
293                    break;
294                }
295            }
296        }
297    }
298
299    Ok(())
300}
301
302async fn read_single_stream(
303    total_size: usize,
304    socket: &mut fasync::Socket,
305) -> Result<fasync::MonotonicInstant, Error> {
306    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
307    let mut buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; // 4 KiB
308    let segments = total_size / buffer.len();
309
310    for _ in 0..segments {
311        select! {
312            () = fasync::Timer::new(timeout).fuse() => {
313                return Err(anyhow!("timeout waiting 10s for test iteration read to finish"));
314            }
315            result = socket.read_exact(&mut buffer).fuse() => {
316                result.map_err(|err| anyhow!("failed to read segment from socket: {}", err))?;
317            }
318        }
319    }
320
321    Ok(fasync::MonotonicInstant::now())
322}
323
324async fn write_single_stream(
325    total_size: usize,
326    socket: &mut fasync::Socket,
327) -> Result<fasync::MonotonicInstant, Error> {
328    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
329    let buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; // 4 KiB
330    let segments = total_size / buffer.len();
331
332    for _ in 0..segments {
333        select! {
334            () = fasync::Timer::new(timeout).fuse() => {
335                return Err(anyhow!("timeout waiting 10s for test iteration write to finish"));
336            }
337            result = socket.write_all(&buffer).fuse() => {
338                result.map_err(
339                    |err| anyhow!("failed to write segment to socket: {}", err))?;
340            }
341        }
342    }
343
344    Ok(fasync::MonotonicInstant::now())
345}
346
347async fn write_read_high_throughput(
348    total_size: usize,
349    socket: &mut fasync::Socket,
350) -> Result<(), Error> {
351    // This is intentionally sequential to measure roundtrip throughput from the perspective of
352    // the host.
353    write_single_stream(total_size, socket).await?;
354    read_single_stream(total_size, socket).await?;
355    Ok(())
356}
357
/// Measures throughput while writing to and reading from the same connection concurrently,
/// storing the results in `measurements.tx_throughput` and `measurements.rx_throughput`.
///
/// The socket handle is duplicated so one handle can write while the other reads. TX and RX
/// follow the labels of the printed report, i.e. the guest's point of view: the time the host
/// needs to finish writing is recorded as RX, and the time it needs to finish reading as TX.
#[cfg(target_os = "fuchsia")]
async fn run_single_stream_bidirectional_test(
    mut read_socket: fasync::Socket,
    control_socket: &mut fasync::Socket,
    measurements: &mut Measurements,
) -> Result<(), Error> {
    println!("Starting single stream bidirectional round trip throughput test...");

    let duplicate = read_socket.as_ref().duplicate_handle(fidl::Rights::SAME_RIGHTS)?;
    let mut write_socket = fasync::Socket::from_socket(duplicate);

    let ready_signal = HashSet::from([SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM]);
    wait_for_magic_numbers(ready_signal, control_socket).await?;

    let total_size = THROUGHPUT_SIZE_BYTES;
    let mut rx_durations: Vec<u64> = Vec::new();
    let mut tx_durations: Vec<u64> = Vec::new();

    for completed in 1..=100 {
        let start = fasync::MonotonicInstant::now();

        let (write_finish, read_finish) = try_join!(
            write_single_stream(total_size, &mut write_socket),
            read_single_stream(total_size, &mut read_socket)
        )?;

        let rx_nanos = get_time_delta_nanos(start, write_finish)
            .try_into()
            .expect("durations measured by the same thread must be greater than zero");
        rx_durations.push(rx_nanos);

        let tx_nanos = get_time_delta_nanos(start, read_finish)
            .try_into()
            .expect("durations measured by the same thread must be greater than zero");
        tx_durations.push(tx_nanos);

        print!("\rFinished {} bidirectional throughput measurements", completed);
        std::io::stdout().flush().expect("failed to flush stdout");
    }

    // Longest duration first, so that the lowest throughput is reported as the minimum.
    rx_durations.sort_unstable_by(|a, b| b.cmp(a));
    tx_durations.sort_unstable_by(|a, b| b.cmp(a));

    assert_eq!(rx_durations.len(), tx_durations.len());
    println!("\rFinished {} bidirectional throughput measurements", rx_durations.len());

    measurements.tx_throughput = Some(throughput_percentile(&tx_durations, total_size));
    measurements.rx_throughput = Some(throughput_percentile(&rx_durations, total_size));

    Ok(())
}
415
416async fn run_single_stream_unidirectional_round_trip_test(
417    mut data_socket: fasync::Socket,
418    control_socket: &mut fasync::Socket,
419    measurements: &mut Measurements,
420) -> Result<(), Error> {
421    println!("Starting single stream unidirectional round trip throughput test...");
422
423    wait_for_magic_numbers(HashSet::from([SINGLE_STREAM_MAGIC_NUM]), control_socket).await?;
424
425    let total_size = THROUGHPUT_SIZE_BYTES;
426    let mut durations: Vec<u64> = Vec::new();
427
428    for i in 0..100 {
429        let before = fasync::MonotonicInstant::now();
430
431        write_read_high_throughput(total_size, &mut data_socket).await?;
432
433        let after = fasync::MonotonicInstant::now();
434        durations.push(
435            get_time_delta_nanos(before, after)
436                .try_into()
437                .expect("durations measured by the same thread must be greater than zero"),
438        );
439
440        print!("\rFinished {} round trip throughput measurements", i + 1);
441        std::io::stdout().flush().expect("failed to flush stdout");
442    }
443
444    durations.sort();
445    durations.reverse();
446    println!("\rFinished {} single stream round trip throughput measurements", durations.len());
447
448    measurements.single_stream_unidirectional =
449        Some(throughput_percentile(&durations, total_size * 2));
450
451    Ok(())
452}
453
454async fn run_multi_stream_unidirectional_round_trip_test(
455    mut data_socket1: fasync::Socket,
456    mut data_socket2: fasync::Socket,
457    mut data_socket3: fasync::Socket,
458    mut data_socket4: fasync::Socket,
459    mut data_socket5: fasync::Socket,
460    control_socket: &mut fasync::Socket,
461    measurements: &mut Measurements,
462) -> Result<(), Error> {
463    println!("Starting multistream unidirectional round trip throughput test...");
464
465    wait_for_magic_numbers(
466        HashSet::from([
467            MULTI_STREAM_MAGIC_NUM1,
468            MULTI_STREAM_MAGIC_NUM2,
469            MULTI_STREAM_MAGIC_NUM3,
470            MULTI_STREAM_MAGIC_NUM4,
471            MULTI_STREAM_MAGIC_NUM5,
472        ]),
473        control_socket,
474    )
475    .await?;
476
477    let total_size = THROUGHPUT_SIZE_BYTES;
478    let mut durations: Vec<u64> = Vec::new();
479
480    for i in 0..50 {
481        let before = fasync::MonotonicInstant::now();
482
483        try_join!(
484            write_read_high_throughput(total_size, &mut data_socket1),
485            write_read_high_throughput(total_size, &mut data_socket2),
486            write_read_high_throughput(total_size, &mut data_socket3),
487            write_read_high_throughput(total_size, &mut data_socket4),
488            write_read_high_throughput(total_size, &mut data_socket5)
489        )?;
490
491        let after = fasync::MonotonicInstant::now();
492        durations.push(
493            get_time_delta_nanos(before, after)
494                .try_into()
495                .expect("durations measured by the same thread must be greater than zero"),
496        );
497
498        print!("\rFinished {} multistream round trip throughput measurements", i + 1);
499        std::io::stdout().flush().expect("failed to flush stdout");
500    }
501
502    durations.sort();
503    durations.reverse();
504    println!("\rFinished {} multistream round trip throughput measurements", durations.len());
505
506    measurements.multi_stream_unidirectional =
507        Some(throughput_percentile(&durations, total_size * 2));
508
509    Ok(())
510}
511
512async fn run_latency_test(
513    mut socket: fasync::Socket,
514    measurements: &mut Measurements,
515) -> Result<(), Error> {
516    println!("Checking for data corruption...");
517    measurements.data_corruption = Some(warmup_and_data_corruption_check(&mut socket).await?);
518    println!("Finished data corruption check");
519
520    let packet = [42u8; LATENCY_CHECK_SIZE_BYTES];
521    let mut buffer = vec![0u8; packet.len()];
522    let mut latencies: Vec<u64> = Vec::new();
523
524    println!("Starting latency test...");
525    for i in 0..10000 {
526        let before = fasync::MonotonicInstant::now();
527        let timeout = before + std::time::Duration::from_millis(100).into();
528
529        if packet.len() != socket.as_ref().write(&packet)? {
530            return Err(anyhow!("failed to write full packet"));
531        }
532
533        select! {
534            () = fasync::Timer::new(timeout).fuse() => {
535                return Err(anyhow!("latency test timed out waiting 100ms for a packet echoed"));
536            }
537            result = socket.read_exact(&mut buffer).fuse() => {
538                result.map_err(
539                    |err| anyhow!("failed to read from socket during latency test: {}", err))?;
540            }
541        }
542
543        let after = fasync::MonotonicInstant::now();
544        latencies.push(
545            get_time_delta_nanos(before, after)
546                .try_into()
547                .expect("durations measured by the same thread must be greater than zero"),
548        );
549
550        if (i + 1) % 50 == 0 {
551            print!("\rFinished measuring round trip latency for {} packets", i + 1);
552            std::io::stdout().flush().expect("failed to flush stdout");
553        }
554    }
555
556    latencies.sort();
557    println!("\rFinished measuring round trip latency for {} packets", latencies.len());
558
559    measurements.round_trip_page = Some(latency_percentile(&latencies));
560
561    Ok(())
562}
563
564async fn run_micro_benchmark(guest_manager: GuestManagerProxy) -> Result<Measurements, Error> {
565    let guest_info = guest_manager.get_info().await?;
566    if guest_info.guest_status.unwrap() != GuestStatus::Running {
567        return Err(anyhow!(zx_status::Status::NOT_CONNECTED));
568    }
569
570    let (guest_endpoint, guest_server_end) = create_proxy::<GuestMarker>();
571    guest_manager
572        .connect(guest_server_end)
573        .await
574        .map_err(|err| anyhow!("failed to get a connect response: {}", err))?
575        .map_err(|err| anyhow!("connect failed with: {:?}", err))?;
576
577    let (vsock_endpoint, vsock_server_end) = create_proxy::<HostVsockEndpointMarker>();
578    guest_endpoint
579        .get_host_vsock_endpoint(vsock_server_end)
580        .await?
581        .map_err(|err| anyhow!("failed to get HostVsockEndpoint: {:?}", err))?;
582
583    let (acceptor, mut client_stream) = create_request_stream::<HostVsockAcceptorMarker>();
584    vsock_endpoint
585        .listen(HOST_PORT, acceptor)
586        .await
587        .map_err(|err| anyhow!("failed to get a listen response: {}", err))?
588        .map_err(|err| anyhow!("listen failed with: {}", zx_status::Status::from_raw(err)))?;
589
590    let socket = guest_endpoint
591        .get_console()
592        .await
593        .map_err(|err| anyhow!("failed to get a get_console response: {}", err))?
594        .map_err(|err| anyhow!("get_console failed with: {:?}", err))?;
595
596    // Start the micro benchmark utility on the guest which will begin by opening the necessary
597    // connections.
598    let command = b"../test_utils/virtio_vsock_test_util micro_benchmark\n";
599    let bytes_written = socket
600        .write(command)
601        .map_err(|err| anyhow!("failed to write command to socket: {}", err))?;
602    if bytes_written != command.len() {
603        return Err(anyhow!(
604            "attempted to send command '{}', but only managed to write '{}'",
605            std::str::from_utf8(command).expect("failed to parse as utf-8"),
606            std::str::from_utf8(&command[0..bytes_written]).expect("failed to parse as utf-8")
607        ));
608    }
609
610    let mut expected_connections = HashSet::from([
611        CONTROL_STREAM,
612        LATENCY_CHECK_STREAM,
613        SINGLE_STREAM_THROUGHPUT,
614        MULTI_STREAM_THROUGHPUT1,
615        MULTI_STREAM_THROUGHPUT2,
616        MULTI_STREAM_THROUGHPUT3,
617        MULTI_STREAM_THROUGHPUT4,
618        MULTI_STREAM_THROUGHPUT5,
619        SINGLE_STREAM_BIDIRECTIONAL,
620    ]);
621    let mut active_connections = HashMap::new();
622
623    // Give the utility 15s to open all the expected connections.
624    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(15).into();
625    loop {
626        select! {
627            () = fasync::Timer::new(timeout).fuse() => {
628                return Err(anyhow!("vsockperf timed out waiting 15s for vsock connections"));
629            }
630            request = client_stream.try_next() => {
631                let request = request
632                    .map_err(|err| anyhow!("failed to get acceptor request: {}", err))?
633                    .ok_or_else(|| anyhow!("unexpected end of Listener stream"))?;
634                let (_src_cid, src_port, _port, responder) = request
635                    .into_accept().ok_or_else(|| anyhow!("failed to parse message as Accept"))?;
636
637                match expected_connections.contains(&src_port) {
638                    false => Err(anyhow!("unexpected connection from guest port: {}", src_port)),
639                    true => {
640                        expected_connections.remove(&src_port);
641                        Ok(())
642                    }
643                }?;
644
645                let (client_socket, device_socket) = fidl::Socket::create_stream();
646                let client_socket = fasync::Socket::from_socket(client_socket);
647
648                responder.send(Ok(device_socket))
649                    .map_err(|err| anyhow!("failed to send response to device: {}", err))?;
650
651                if let Some(_) = active_connections.insert(src_port, client_socket) {
652                    panic!("Connections must be unique");
653                }
654
655                if expected_connections.is_empty() {
656                    break;
657                }
658            }
659        }
660    }
661
662    let mut measurements = Measurements::default();
663
664    run_latency_test(
665        active_connections.remove(&LATENCY_CHECK_STREAM).expect("socket should exist"),
666        &mut measurements,
667    )
668    .await?;
669
670    // TODO(https://fxbug.dev/42068091): Re-enable when overnet supports duplicated socket handles.
671    #[cfg(target_os = "fuchsia")]
672    run_single_stream_bidirectional_test(
673        active_connections.remove(&SINGLE_STREAM_BIDIRECTIONAL).expect("socket should exist"),
674        active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
675        &mut measurements,
676    )
677    .await?;
678
679    run_single_stream_unidirectional_round_trip_test(
680        active_connections.remove(&SINGLE_STREAM_THROUGHPUT).expect("socket should exist"),
681        active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
682        &mut measurements,
683    )
684    .await?;
685
686    #[allow(clippy::large_futures)]
687    run_multi_stream_unidirectional_round_trip_test(
688        active_connections.remove(&MULTI_STREAM_THROUGHPUT1).expect("socket should exist"),
689        active_connections.remove(&MULTI_STREAM_THROUGHPUT2).expect("socket should exist"),
690        active_connections.remove(&MULTI_STREAM_THROUGHPUT3).expect("socket should exist"),
691        active_connections.remove(&MULTI_STREAM_THROUGHPUT4).expect("socket should exist"),
692        active_connections.remove(&MULTI_STREAM_THROUGHPUT5).expect("socket should exist"),
693        active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
694        &mut measurements,
695    )
696    .await?;
697
698    return Ok(measurements);
699}