Skip to main content

guest_cli/
vsockperf.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::platform::PlatformServices;
6use anyhow::{Error, anyhow};
7use fidl::endpoints::{create_proxy, create_request_stream};
8use fidl_fuchsia_virtualization::{
9    GuestManagerProxy, GuestMarker, GuestStatus, HostVsockAcceptorMarker, HostVsockEndpointMarker,
10};
11use futures::{AsyncReadExt, AsyncWriteExt, FutureExt, TryStreamExt, select, try_join};
12use prettytable::format::consts::FORMAT_CLEAN;
13use prettytable::{Table, cell, row};
14use std::collections::{HashMap, HashSet};
15use std::fmt;
16use std::io::Write;
17use {fuchsia_async as fasync, guest_cli_args as arguments, zx_status};
18
19const LATENCY_CHECK_SIZE_BYTES: usize = 4096;
20const THROUGHPUT_SIZE_MEBIBYTES: usize = 128;
21const THROUGHPUT_SIZE_BYTES: usize = (1 << 20) * THROUGHPUT_SIZE_MEBIBYTES;
22
23const HOST_PORT: u32 = 8500;
24const CONTROL_STREAM: u32 = 8501;
25const LATENCY_CHECK_STREAM: u32 = 8502;
26
27const SINGLE_STREAM_THROUGHPUT: u32 = 8503;
28const SINGLE_STREAM_MAGIC_NUM: u8 = 123;
29
30const MULTI_STREAM_THROUGHPUT1: u32 = 8504;
31const MULTI_STREAM_MAGIC_NUM1: u8 = 124;
32const MULTI_STREAM_THROUGHPUT2: u32 = 8505;
33const MULTI_STREAM_MAGIC_NUM2: u8 = 125;
34const MULTI_STREAM_THROUGHPUT3: u32 = 8506;
35const MULTI_STREAM_MAGIC_NUM3: u8 = 126;
36const MULTI_STREAM_THROUGHPUT4: u32 = 8507;
37const MULTI_STREAM_MAGIC_NUM4: u8 = 127;
38const MULTI_STREAM_THROUGHPUT5: u32 = 8508;
39const MULTI_STREAM_MAGIC_NUM5: u8 = 128;
40
41const SINGLE_STREAM_BIDIRECTIONAL: u32 = 8509;
42#[allow(dead_code)]
43const SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM: u8 = 129;
44
45#[derive(Clone, Copy, serde::Serialize, serde::Deserialize)]
46enum PercentileUnit {
47    Nanoseconds,
48    MebibytesPerSecond,
49}
50
51#[derive(serde::Serialize, serde::Deserialize)]
52pub struct Percentiles {
53    min: f64,
54    p_25th: f64,
55    p_50th: f64,
56    p_75th: f64,
57    p_99th: f64,
58    max: f64,
59    unit: PercentileUnit,
60}
61
62impl fmt::Display for Percentiles {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        let get_units = |val: f64, unit: PercentileUnit| -> String {
65            match unit {
66                PercentileUnit::Nanoseconds => {
67                    format!("{}ns ({:.3}ms)", val as u64, val / 1_000_000.0)
68                }
69                PercentileUnit::MebibytesPerSecond => {
70                    format!("{:.2}MiB/s", val)
71                }
72            }
73        };
74
75        let mut table = Table::new();
76        table.set_format(*FORMAT_CLEAN);
77
78        table.add_row(row!["\tMin:", get_units(self.min, self.unit)]);
79        table.add_row(row!["\t25th percentile:", get_units(self.p_25th, self.unit)]);
80        table.add_row(row!["\t50th percentile:", get_units(self.p_50th, self.unit)]);
81        table.add_row(row!["\t75th percentile:", get_units(self.p_75th, self.unit)]);
82        table.add_row(row!["\t99th percentile:", get_units(self.p_99th, self.unit)]);
83        table.add_row(row!["\tMax:", get_units(self.max, self.unit)]);
84
85        write!(f, "\n{}", table)
86    }
87}
88
89#[derive(Default, serde::Serialize, serde::Deserialize)]
90pub struct Measurements {
91    data_corruption: Option<bool>,
92    round_trip_page: Option<Percentiles>,
93    tx_throughput: Option<Percentiles>,
94    rx_throughput: Option<Percentiles>,
95    single_stream_unidirectional: Option<Percentiles>,
96    multi_stream_unidirectional: Option<Percentiles>,
97}
98
99impl fmt::Display for Measurements {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        let format_percentiles = |percentiles: &Option<Percentiles>| -> String {
102            match percentiles {
103                None => " NOT RUN".to_owned(),
104                Some(percentile) => percentile.to_string(),
105            }
106        };
107
108        writeln!(f, "\n\nMicrobenchmark Results\n------------------------")?;
109
110        writeln!(
111            f,
112            "* Data corruption check: {}",
113            match self.data_corruption {
114                None => "NOT RUN",
115                Some(result) =>
116                    if result {
117                        "PASSED"
118                    } else {
119                        "FAILED"
120                    },
121            }
122        )?;
123
124        writeln!(
125            f,
126            "* Round trip latency of {LATENCY_CHECK_SIZE_BYTES} bytes:{}",
127            format_percentiles(&self.round_trip_page)
128        )?;
129        writeln!(
130            f,
131            "* TX (guest -> host, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
132            format_percentiles(&self.tx_throughput)
133        )?;
134        writeln!(
135            f,
136            "* RX (host -> guest, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
137            format_percentiles(&self.rx_throughput)
138        )?;
139        writeln!(
140            f,
141            "* Single stream unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
142            format_percentiles(&self.single_stream_unidirectional)
143        )?;
144        writeln!(
145            f,
146            "* Multistream (5 connections) unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
147            format_percentiles(&self.multi_stream_unidirectional)
148        )
149    }
150}
151
152#[derive(serde::Serialize, serde::Deserialize)]
153pub enum VsockPerfResult {
154    BenchmarkComplete(Box<Measurements>),
155    UnsupportedGuest(arguments::GuestType),
156    Internal(String),
157}
158
159impl fmt::Display for VsockPerfResult {
160    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161        match self {
162            VsockPerfResult::BenchmarkComplete(result) => write!(f, "{}", result),
163            VsockPerfResult::UnsupportedGuest(guest) => {
164                write!(f, "VsockPerf is not supported for '{}'. Only 'debian' is supported", guest)
165            }
166            VsockPerfResult::Internal(context) => {
167                write!(f, "Internal error: {}", context)
168            }
169        }
170    }
171}
172
173fn get_time_delta_nanos(before: fasync::MonotonicInstant, after: fasync::MonotonicInstant) -> i64 {
174    #[cfg(target_os = "fuchsia")]
175    {
176        (after - before).into_nanos()
177    }
178
179    #[cfg(not(target_os = "fuchsia"))]
180    {
181        (after - before).as_nanos().try_into().unwrap()
182    }
183}
184
185pub async fn handle_vsockperf<P: PlatformServices>(
186    services: &P,
187    args: &arguments::vsockperf_args::VsockPerfArgs,
188) -> Result<VsockPerfResult, Error> {
189    if args.guest_type != arguments::GuestType::Debian {
190        return Ok(VsockPerfResult::UnsupportedGuest(args.guest_type));
191    }
192
193    let guest_manager = services.connect_to_manager(args.guest_type).await?;
194    #[allow(clippy::large_futures)]
195    Ok(match run_micro_benchmark(guest_manager).await {
196        Err(err) => VsockPerfResult::Internal(format!("{}", err)),
197        Ok(result) => VsockPerfResult::BenchmarkComplete(Box::new(result)),
198    })
199}
200
201fn percentile(durations: &[u64], percentile: u8) -> u64 {
202    assert!(percentile <= 100 && !durations.is_empty());
203    // Don't bother interpolating between two points if this isn't a whole number, just floor it.
204    let location = (((percentile as f64) / 100.0) * ((durations.len() - 1) as f64)) as usize;
205    durations[location]
206}
207
208fn latency_percentile(durations: &[u64]) -> Percentiles {
209    Percentiles {
210        min: percentile(&durations, 0) as f64,
211        p_25th: percentile(&durations, 25) as f64,
212        p_50th: percentile(&durations, 50) as f64,
213        p_75th: percentile(&durations, 75) as f64,
214        p_99th: percentile(&durations, 99) as f64,
215        max: percentile(&durations, 100) as f64,
216        unit: PercentileUnit::Nanoseconds,
217    }
218}
219
220fn throughput_percentile(durations: &[u64], bytes: usize) -> Percentiles {
221    let to_mebibytes_per_second = |nanos: u64| -> f64 {
222        let seconds = nanos as f64 / (1000.0 * 1000.0 * 1000.0);
223        let bytes_per_second = (bytes as f64) / seconds;
224        bytes_per_second / (1 << 20) as f64
225    };
226
227    Percentiles {
228        min: to_mebibytes_per_second(percentile(&durations, 0)),
229        p_25th: to_mebibytes_per_second(percentile(&durations, 25)),
230        p_50th: to_mebibytes_per_second(percentile(&durations, 50)),
231        p_75th: to_mebibytes_per_second(percentile(&durations, 75)),
232        p_99th: to_mebibytes_per_second(percentile(&durations, 99)),
233        max: to_mebibytes_per_second(percentile(&durations, 100)),
234        unit: PercentileUnit::MebibytesPerSecond,
235    }
236}
237
238async fn warmup_and_data_corruption_check(socket: &mut fasync::Socket) -> Result<bool, Error> {
239    // Send and receive 100 messages, checking for a known but changing pattern.
240    let mut buffer = vec![0u8; LATENCY_CHECK_SIZE_BYTES];
241    for i in 0..100 {
242        let pattern = format!("DAVID{:0>3}", i).repeat(512);
243        let packet = pattern.as_bytes();
244        assert_eq!(packet.len(), buffer.len());
245
246        if packet.len() != socket.as_ref().write(&packet)? {
247            return Err(anyhow!("failed to write full packet"));
248        }
249
250        let timeout =
251            fasync::MonotonicInstant::now() + std::time::Duration::from_millis(100).into();
252        select! {
253            () = fasync::Timer::new(timeout).fuse() => {
254                return Err(anyhow!("warmup timed out waiting 100ms for a packet echoed"));
255            }
256            result = socket.read_exact(&mut buffer).fuse() => {
257                result.map_err(|err| anyhow!("failed to read from socket during warmup: {}", err))?;
258            }
259        }
260
261        if buffer != packet {
262            return Ok(false);
263        }
264    }
265
266    Ok(true)
267}
268
269// Get the magic numbers for a test case from the guest to know that it's ready.
270async fn wait_for_magic_numbers(
271    mut numbers: HashSet<u8>,
272    control_socket: &mut fasync::Socket,
273) -> Result<(), Error> {
274    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(5).into();
275    let mut magic_buf = [0u8];
276    loop {
277        select! {
278            () = fasync::Timer::new(timeout).fuse() => {
279                return Err(anyhow!("timeout waiting 5s to get the test ready"));
280            }
281            result = control_socket.read_exact(&mut magic_buf).fuse() => {
282                result.map_err(|err| anyhow!("failed to read magic value from socket: {}", err))?;
283                match numbers.contains(&magic_buf[0]) {
284                    false => Err(anyhow!("unexpected magic number from guest: {}", magic_buf[0])),
285                    true => {
286                        numbers.remove(&magic_buf[0]);
287                        Ok(())
288                    }
289                }?;
290
291                if numbers.is_empty() {
292                    break;
293                }
294            }
295        }
296    }
297
298    Ok(())
299}
300
301async fn read_single_stream(
302    total_size: usize,
303    socket: &mut fasync::Socket,
304) -> Result<fasync::MonotonicInstant, Error> {
305    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
306    let mut buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; // 4 KiB
307    let segments = total_size / buffer.len();
308
309    for _ in 0..segments {
310        select! {
311            () = fasync::Timer::new(timeout).fuse() => {
312                return Err(anyhow!("timeout waiting 10s for test iteration read to finish"));
313            }
314            result = socket.read_exact(&mut buffer).fuse() => {
315                result.map_err(|err| anyhow!("failed to read segment from socket: {}", err))?;
316            }
317        }
318    }
319
320    Ok(fasync::MonotonicInstant::now())
321}
322
323async fn write_single_stream(
324    total_size: usize,
325    socket: &mut fasync::Socket,
326) -> Result<fasync::MonotonicInstant, Error> {
327    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
328    let buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; // 4 KiB
329    let segments = total_size / buffer.len();
330
331    for _ in 0..segments {
332        select! {
333            () = fasync::Timer::new(timeout).fuse() => {
334                return Err(anyhow!("timeout waiting 10s for test iteration write to finish"));
335            }
336            result = socket.write_all(&buffer).fuse() => {
337                result.map_err(
338                    |err| anyhow!("failed to write segment to socket: {}", err))?;
339            }
340        }
341    }
342
343    Ok(fasync::MonotonicInstant::now())
344}
345
346async fn write_read_high_throughput(
347    total_size: usize,
348    socket: &mut fasync::Socket,
349) -> Result<(), Error> {
350    // This is intentionally sequential to measure roundtrip throughput from the perspective of
351    // the host.
352    write_single_stream(total_size, socket).await?;
353    read_single_stream(total_size, socket).await?;
354    Ok(())
355}
356
357#[cfg(target_os = "fuchsia")]
358async fn run_single_stream_bidirectional_test(
359    mut read_socket: fasync::Socket,
360    control_socket: &mut fasync::Socket,
361    measurements: &mut Measurements,
362) -> Result<(), Error> {
363    use fidl::HandleBased;
364
365    println!("Starting single stream bidirectional round trip throughput test...");
366
367    let mut write_socket = fasync::Socket::from_socket(
368        read_socket.as_ref().duplicate_handle(fidl::Rights::SAME_RIGHTS)?,
369    );
370
371    wait_for_magic_numbers(HashSet::from([SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM]), control_socket)
372        .await?;
373
374    let total_size = THROUGHPUT_SIZE_BYTES;
375    let mut rx_durations: Vec<u64> = Vec::new();
376    let mut tx_durations: Vec<u64> = Vec::new();
377
378    for i in 0..100 {
379        let before = fasync::MonotonicInstant::now();
380
381        let (write_finish, read_finish) = try_join!(
382            write_single_stream(total_size, &mut write_socket),
383            read_single_stream(total_size, &mut read_socket)
384        )?;
385
386        rx_durations.push(
387            get_time_delta_nanos(before, write_finish)
388                .try_into()
389                .expect("durations measured by the same thread must be greater than zero"),
390        );
391
392        tx_durations.push(
393            get_time_delta_nanos(before, read_finish)
394                .try_into()
395                .expect("durations measured by the same thread must be greater than zero"),
396        );
397
398        print!("\rFinished {} bidirectional throughput measurements", i + 1);
399        std::io::stdout().flush().expect("failed to flush stdout");
400    }
401
402    rx_durations.sort();
403    rx_durations.reverse();
404
405    tx_durations.sort();
406    tx_durations.reverse();
407
408    assert_eq!(rx_durations.len(), tx_durations.len());
409    println!("\rFinished {} bidirectional throughput measurements", rx_durations.len());
410
411    measurements.tx_throughput = Some(throughput_percentile(&tx_durations, total_size));
412    measurements.rx_throughput = Some(throughput_percentile(&rx_durations, total_size));
413
414    Ok(())
415}
416
417async fn run_single_stream_unidirectional_round_trip_test(
418    mut data_socket: fasync::Socket,
419    control_socket: &mut fasync::Socket,
420    measurements: &mut Measurements,
421) -> Result<(), Error> {
422    println!("Starting single stream unidirectional round trip throughput test...");
423
424    wait_for_magic_numbers(HashSet::from([SINGLE_STREAM_MAGIC_NUM]), control_socket).await?;
425
426    let total_size = THROUGHPUT_SIZE_BYTES;
427    let mut durations: Vec<u64> = Vec::new();
428
429    for i in 0..100 {
430        let before = fasync::MonotonicInstant::now();
431
432        write_read_high_throughput(total_size, &mut data_socket).await?;
433
434        let after = fasync::MonotonicInstant::now();
435        durations.push(
436            get_time_delta_nanos(before, after)
437                .try_into()
438                .expect("durations measured by the same thread must be greater than zero"),
439        );
440
441        print!("\rFinished {} round trip throughput measurements", i + 1);
442        std::io::stdout().flush().expect("failed to flush stdout");
443    }
444
445    durations.sort();
446    durations.reverse();
447    println!("\rFinished {} single stream round trip throughput measurements", durations.len());
448
449    measurements.single_stream_unidirectional =
450        Some(throughput_percentile(&durations, total_size * 2));
451
452    Ok(())
453}
454
455async fn run_multi_stream_unidirectional_round_trip_test(
456    mut data_socket1: fasync::Socket,
457    mut data_socket2: fasync::Socket,
458    mut data_socket3: fasync::Socket,
459    mut data_socket4: fasync::Socket,
460    mut data_socket5: fasync::Socket,
461    control_socket: &mut fasync::Socket,
462    measurements: &mut Measurements,
463) -> Result<(), Error> {
464    println!("Starting multistream unidirectional round trip throughput test...");
465
466    wait_for_magic_numbers(
467        HashSet::from([
468            MULTI_STREAM_MAGIC_NUM1,
469            MULTI_STREAM_MAGIC_NUM2,
470            MULTI_STREAM_MAGIC_NUM3,
471            MULTI_STREAM_MAGIC_NUM4,
472            MULTI_STREAM_MAGIC_NUM5,
473        ]),
474        control_socket,
475    )
476    .await?;
477
478    let total_size = THROUGHPUT_SIZE_BYTES;
479    let mut durations: Vec<u64> = Vec::new();
480
481    for i in 0..50 {
482        let before = fasync::MonotonicInstant::now();
483
484        try_join!(
485            write_read_high_throughput(total_size, &mut data_socket1),
486            write_read_high_throughput(total_size, &mut data_socket2),
487            write_read_high_throughput(total_size, &mut data_socket3),
488            write_read_high_throughput(total_size, &mut data_socket4),
489            write_read_high_throughput(total_size, &mut data_socket5)
490        )?;
491
492        let after = fasync::MonotonicInstant::now();
493        durations.push(
494            get_time_delta_nanos(before, after)
495                .try_into()
496                .expect("durations measured by the same thread must be greater than zero"),
497        );
498
499        print!("\rFinished {} multistream round trip throughput measurements", i + 1);
500        std::io::stdout().flush().expect("failed to flush stdout");
501    }
502
503    durations.sort();
504    durations.reverse();
505    println!("\rFinished {} multistream round trip throughput measurements", durations.len());
506
507    measurements.multi_stream_unidirectional =
508        Some(throughput_percentile(&durations, total_size * 2));
509
510    Ok(())
511}
512
513async fn run_latency_test(
514    mut socket: fasync::Socket,
515    measurements: &mut Measurements,
516) -> Result<(), Error> {
517    println!("Checking for data corruption...");
518    measurements.data_corruption = Some(warmup_and_data_corruption_check(&mut socket).await?);
519    println!("Finished data corruption check");
520
521    let packet = [42u8; LATENCY_CHECK_SIZE_BYTES];
522    let mut buffer = vec![0u8; packet.len()];
523    let mut latencies: Vec<u64> = Vec::new();
524
525    println!("Starting latency test...");
526    for i in 0..10000 {
527        let before = fasync::MonotonicInstant::now();
528        let timeout = before + std::time::Duration::from_millis(100).into();
529
530        if packet.len() != socket.as_ref().write(&packet)? {
531            return Err(anyhow!("failed to write full packet"));
532        }
533
534        select! {
535            () = fasync::Timer::new(timeout).fuse() => {
536                return Err(anyhow!("latency test timed out waiting 100ms for a packet echoed"));
537            }
538            result = socket.read_exact(&mut buffer).fuse() => {
539                result.map_err(
540                    |err| anyhow!("failed to read from socket during latency test: {}", err))?;
541            }
542        }
543
544        let after = fasync::MonotonicInstant::now();
545        latencies.push(
546            get_time_delta_nanos(before, after)
547                .try_into()
548                .expect("durations measured by the same thread must be greater than zero"),
549        );
550
551        if (i + 1) % 50 == 0 {
552            print!("\rFinished measuring round trip latency for {} packets", i + 1);
553            std::io::stdout().flush().expect("failed to flush stdout");
554        }
555    }
556
557    latencies.sort();
558    println!("\rFinished measuring round trip latency for {} packets", latencies.len());
559
560    measurements.round_trip_page = Some(latency_percentile(&latencies));
561
562    Ok(())
563}
564
565async fn run_micro_benchmark(guest_manager: GuestManagerProxy) -> Result<Measurements, Error> {
566    let guest_info = guest_manager.get_info().await?;
567    if guest_info.guest_status.unwrap() != GuestStatus::Running {
568        return Err(anyhow!(zx_status::Status::NOT_CONNECTED));
569    }
570
571    let (guest_endpoint, guest_server_end) = create_proxy::<GuestMarker>();
572    guest_manager
573        .connect(guest_server_end)
574        .await
575        .map_err(|err| anyhow!("failed to get a connect response: {}", err))?
576        .map_err(|err| anyhow!("connect failed with: {:?}", err))?;
577
578    let (vsock_endpoint, vsock_server_end) = create_proxy::<HostVsockEndpointMarker>();
579    guest_endpoint
580        .get_host_vsock_endpoint(vsock_server_end)
581        .await?
582        .map_err(|err| anyhow!("failed to get HostVsockEndpoint: {:?}", err))?;
583
584    let (acceptor, mut client_stream) = create_request_stream::<HostVsockAcceptorMarker>();
585    vsock_endpoint
586        .listen(HOST_PORT, acceptor)
587        .await
588        .map_err(|err| anyhow!("failed to get a listen response: {}", err))?
589        .map_err(|err| anyhow!("listen failed with: {}", zx_status::Status::from_raw(err)))?;
590
591    let socket = guest_endpoint
592        .get_console()
593        .await
594        .map_err(|err| anyhow!("failed to get a get_console response: {}", err))?
595        .map_err(|err| anyhow!("get_console failed with: {:?}", err))?;
596
597    // Start the micro benchmark utility on the guest which will begin by opening the necessary
598    // connections.
599    let command = b"../test_utils/virtio_vsock_test_util micro_benchmark\n";
600    let bytes_written = socket
601        .write(command)
602        .map_err(|err| anyhow!("failed to write command to socket: {}", err))?;
603    if bytes_written != command.len() {
604        return Err(anyhow!(
605            "attempted to send command '{}', but only managed to write '{}'",
606            std::str::from_utf8(command).expect("failed to parse as utf-8"),
607            std::str::from_utf8(&command[0..bytes_written]).expect("failed to parse as utf-8")
608        ));
609    }
610
611    let mut expected_connections = HashSet::from([
612        CONTROL_STREAM,
613        LATENCY_CHECK_STREAM,
614        SINGLE_STREAM_THROUGHPUT,
615        MULTI_STREAM_THROUGHPUT1,
616        MULTI_STREAM_THROUGHPUT2,
617        MULTI_STREAM_THROUGHPUT3,
618        MULTI_STREAM_THROUGHPUT4,
619        MULTI_STREAM_THROUGHPUT5,
620        SINGLE_STREAM_BIDIRECTIONAL,
621    ]);
622    let mut active_connections = HashMap::new();
623
624    // Give the utility 15s to open all the expected connections.
625    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(15).into();
626    loop {
627        select! {
628            () = fasync::Timer::new(timeout).fuse() => {
629                return Err(anyhow!("vsockperf timed out waiting 15s for vsock connections"));
630            }
631            request = client_stream.try_next() => {
632                let request = request
633                    .map_err(|err| anyhow!("failed to get acceptor request: {}", err))?
634                    .ok_or_else(|| anyhow!("unexpected end of Listener stream"))?;
635                let (_src_cid, src_port, _port, responder) = request
636                    .into_accept().ok_or_else(|| anyhow!("failed to parse message as Accept"))?;
637
638                match expected_connections.contains(&src_port) {
639                    false => Err(anyhow!("unexpected connection from guest port: {}", src_port)),
640                    true => {
641                        expected_connections.remove(&src_port);
642                        Ok(())
643                    }
644                }?;
645
646                let (client_socket, device_socket) = fidl::Socket::create_stream();
647                let client_socket = fasync::Socket::from_socket(client_socket);
648
649                responder.send(Ok(device_socket))
650                    .map_err(|err| anyhow!("failed to send response to device: {}", err))?;
651
652                if let Some(_) = active_connections.insert(src_port, client_socket) {
653                    panic!("Connections must be unique");
654                }
655
656                if expected_connections.is_empty() {
657                    break;
658                }
659            }
660        }
661    }
662
663    let mut measurements = Measurements::default();
664
665    run_latency_test(
666        active_connections.remove(&LATENCY_CHECK_STREAM).expect("socket should exist"),
667        &mut measurements,
668    )
669    .await?;
670
671    // TODO(https://fxbug.dev/42068091): Re-enable when overnet supports duplicated socket handles.
672    #[cfg(target_os = "fuchsia")]
673    run_single_stream_bidirectional_test(
674        active_connections.remove(&SINGLE_STREAM_BIDIRECTIONAL).expect("socket should exist"),
675        active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
676        &mut measurements,
677    )
678    .await?;
679
680    run_single_stream_unidirectional_round_trip_test(
681        active_connections.remove(&SINGLE_STREAM_THROUGHPUT).expect("socket should exist"),
682        active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
683        &mut measurements,
684    )
685    .await?;
686
687    #[allow(clippy::large_futures)]
688    run_multi_stream_unidirectional_round_trip_test(
689        active_connections.remove(&MULTI_STREAM_THROUGHPUT1).expect("socket should exist"),
690        active_connections.remove(&MULTI_STREAM_THROUGHPUT2).expect("socket should exist"),
691        active_connections.remove(&MULTI_STREAM_THROUGHPUT3).expect("socket should exist"),
692        active_connections.remove(&MULTI_STREAM_THROUGHPUT4).expect("socket should exist"),
693        active_connections.remove(&MULTI_STREAM_THROUGHPUT5).expect("socket should exist"),
694        active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
695        &mut measurements,
696    )
697    .await?;
698
699    return Ok(measurements);
700}