usb_vsock/
packet.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_sync::{Mutex, MutexGuard};
6use std::cmp::min;
7use std::future::Future;
8use std::iter::FusedIterator;
9use std::ops::DerefMut;
10use std::pin::Pin;
11use std::sync::atomic;
12use std::task::{Context, Poll, Waker};
13
14use futures::task::AtomicWaker;
15use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned, little_endian};
16
17use crate::Address;
18
/// The serializable enumeration of packet types that can be used over a usb vsock link. These
/// roughly correspond to the state machine described by the `fuchsia.hardware.vsock` fidl library.
///
/// The enum is `#[repr(u8)]` and each variant's discriminant is the ASCII byte shown next to it,
/// so the type occupies exactly one byte inside [`Header`].
#[repr(u8)]
#[derive(
    Debug,
    TryFromBytes,
    IntoBytes,
    KnownLayout,
    Immutable,
    Unaligned,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Clone,
    Copy,
)]
pub enum PacketType {
    /// Synchronizes the connection between host and device. Each side must send and receive a
    /// sync packet with the same payload before any other packet may be recognized on the usb
    /// connection. If this packet is received mid-stream, all connections must be considered
    /// reset to avoid data loss. It should also only ever be the last vsock packet in a given
    /// usb packet.
    Sync = b'S',
    /// An outbound echo request. The other end should reply to this with the same body and all the
    /// same fields in a [`PacketType::EchoReply`] packet, no matter the state of the connection.
    Echo = b'E',
    /// A reply to a [`PacketType::Echo`] request packet. The body and all header fields should be
    /// set the same as the original echo packet's.
    EchoReply = b'e',
    /// Connect to a cid:port pair from a cid:port pair on the other side. The payload must be empty.
    Connect = b'C',
    /// Notify the other end that this connection should be closed. The other end should respond
    /// with a [`PacketType::Reset`] when the connection has been closed on the other end. The
    /// payload must be empty.
    Finish = b'F',
    /// Terminate or refuse a connection on a particular cid:port pair set. There must have been a
    /// previous [`PacketType::Connect`] request for this, and after this that particular set of
    /// pairs must be considered disconnected and no more [`PacketType::Data`] packets may be sent
    /// for it unless a new connection is initiated. The payload must be empty.
    Reset = b'R',
    /// Accepts a connection previously requested with [`PacketType::Connect`] on the given cid:port
    /// pair set. The payload must be empty.
    Accept = b'A',
    /// A data packet for a particular cid:port pair set previously established with a
    /// [`PacketType::Connect`] and [`PacketType::Accept`] message. If all of the cid and port
    /// fields of the packet are zero, this is for a special data stream between the host and
    /// device that does not require an established connection.
    Data = b'D',
    /// Advisory flow control message. Payload indicates flow control state "on"
    /// or "off". If "on", it is recommended that the receiver not send more data
    /// for this connection if possible, until the state becomes "off" again.
    /// State is assumed "off" when no packet has been received.
    Pause = b'X',
}
75
/// The packet header for a vsock packet passed over the usb vsock link. Each usb packet can contain
/// one or more packets, each of which must start with a valid header and correct payload length.
///
/// The struct is `#[repr(C, packed(1))]`, so the fields appear in declaration order with no
/// padding: three magic bytes, one packet type byte, and five little-endian `u32` values.
#[repr(C, packed(1))]
#[derive(
    Debug,
    TryFromBytes,
    IntoBytes,
    KnownLayout,
    Immutable,
    Unaligned,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Clone,
)]
pub struct Header {
    /// Magic bytes marking the start of a header. [`Header::new`] always fills this in, and
    /// parsing rejects any header whose magic bytes differ.
    magic: [u8; 3],
    /// The type of this packet
    pub packet_type: PacketType,
    /// For Connect, Reset, Accept, and Data packets this represents the device side's address.
    /// Usually this will be a special value representing either that it is simply "the device",
    /// or zero along with the rest of the cid and port fields to indicate that it's a control stream
    /// packet. Must be zero for any other packet type.
    pub device_cid: little_endian::U32,
    /// For Connect, Reset, Accept, and Data packets this represents the host side's address.
    /// Usually this will be a special value representing either that it is simply "the host",
    /// or zero along with the rest of the cid and port fields to indicate that it's a control stream
    /// packet. Must be zero for any other packet type.
    pub host_cid: little_endian::U32,
    /// For Connect, Reset, Accept, and Data packets this represents the device side's port.
    /// This must be a valid positive value for any of those packet types, unless all of the cid and
    /// port fields are also zero, in which case it is a control stream packet. Must be zero for any
    /// other packet type.
    pub device_port: little_endian::U32,
    /// For Connect, Reset, Accept, and Data packets this represents the host side's port.
    /// This must be a valid positive value for any of those packet types, unless all of the cid and
    /// port fields are also zero, in which case it is a control stream packet. Must be zero for any
    /// other packet type.
    pub host_port: little_endian::U32,
    /// The length of the packet payload in bytes. This must be zero for any packet type other
    /// than Data, Echo, EchoReply, or Pause.
    pub payload_len: little_endian::U32,
}
121
122impl Header {
123    /// Helper constant for the size of a header on the wire
124    pub const SIZE: usize = size_of::<Self>();
125    const MAGIC: &'static [u8; 3] = b"ffx";
126
127    /// Builds a new packet with correct magic value and packet type and all other fields
128    /// initialized to zero.
129    pub fn new(packet_type: PacketType) -> Self {
130        let device_cid = 0.into();
131        let host_cid = 0.into();
132        let device_port = 0.into();
133        let host_port = 0.into();
134        let payload_len = 0.into();
135        Header {
136            magic: *Self::MAGIC,
137            packet_type,
138            device_cid,
139            host_cid,
140            device_port,
141            host_port,
142            payload_len,
143        }
144    }
145
146    /// Gets the size of this packet on the wire with the header and a payload of length
147    /// [`Self::payload_len`].
148    pub fn packet_size(&self) -> usize {
149        Packet::size_with_payload(self.payload_len.get() as usize)
150    }
151
152    /// Sets the address fields of this packet header based on the normalized address in `addr`.
153    pub fn set_address(&mut self, addr: &Address) {
154        self.device_cid.set(addr.device_cid);
155        self.host_cid.set(addr.host_cid);
156        self.device_port.set(addr.device_port);
157        self.host_port.set(addr.host_port);
158    }
159}
160
/// A typed reference to the contents of a packet in a buffer.
///
/// Both fields borrow from the underlying buffer; nothing is copied.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct Packet<'a> {
    /// The packet's header
    pub header: &'a Header,
    /// The packet's payload. This slice may be longer than the header's
    /// [`Header::payload_len`]; the header value decides how much of it is written out.
    pub payload: &'a [u8],
}
169
170impl<'a> Packet<'a> {
171    /// The size of this packet according to its header (as [`Self::payload`] may have been
172    /// over-allocated for the size of the packet).
173    pub fn size(&self) -> usize {
174        self.header.packet_size()
175    }
176
177    fn size_with_payload(payload_size: usize) -> usize {
178        size_of::<Header>() + payload_size
179    }
180
181    fn parse_next(buf: &'a [u8]) -> Result<(Self, &'a [u8]), std::io::Error> {
182        // split off and validate the header
183        let Some((header, body)) = buf.split_at_checked(size_of::<Header>()) else {
184            return Err(std::io::Error::other("insufficient data for last packet"));
185        };
186        let header = Header::try_ref_from_bytes(header).map_err(|err| {
187            std::io::Error::other(format!("failed to parse usb vsock header: {err:?}"))
188        })?;
189        if header.magic != *Header::MAGIC {
190            return Err(std::io::Error::other(format!("invalid magic bytes on usb vsock header")));
191        }
192        // validate the payload length
193        let payload_len = Into::<u64>::into(header.payload_len) as usize;
194        let body_len = body.len();
195        if payload_len > body_len {
196            return Err(std::io::Error::other(format!(
197                "payload length ({payload_len}) on usb vsock header {header:#?} was larger than available in buffer ({body_len})"
198            )));
199        }
200
201        let (payload, remain) = body.split_at(payload_len);
202        Ok((Packet { header, payload }, remain))
203    }
204
205    /// Writes the packet to a buffer when the buffer is known to be large enough to hold it. Note
206    /// that the packet header's [`Header::payload_len`] must be correct before calling this, it
207    /// does not use the size of [`Self::payload`] to decide how much of the payload buffer is
208    /// valid.
209    ///
210    /// # Panics
211    ///
212    /// Panics if the buffer is not large enough for the packet.
213    pub fn write_to_unchecked(&'a self, buf: &'a mut [u8]) -> &'a mut [u8] {
214        let (packet, remain) = buf.split_at_mut(self.size());
215        let payload_len = u32::from(self.header.payload_len) as usize;
216        self.header.write_to_prefix(packet).unwrap();
217        self.payload[..payload_len].write_to_suffix(packet).unwrap();
218        remain
219    }
220}
221
/// A typed mutable reference to the contents of a packet in a buffer.
///
/// Both fields borrow mutably from the underlying buffer, so edits made through them land
/// directly in that buffer.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct PacketMut<'a> {
    /// The packet's header.
    pub header: &'a mut Header,
    /// The packet's payload.
    pub payload: &'a mut [u8],
}
230
231impl<'a> PacketMut<'a> {
232    /// Creates a new [`PacketMut`] inside the given buffer and initializes the header to the given
233    /// [`PacketType`] before returning it. All other fields in the header will be zeroed, and the
234    /// [`PacketMut::payload`] will be the remaining area of the buffer after the header.
235    ///
236    /// Use [`PacketMut::finish`] to validate and write the proper packet length and return the
237    /// total size of the packet.
238    ///
239    /// # Panics
240    ///
241    /// The buffer must be large enough to hold at least a packet header, and this will panic if
242    /// it's not.
243    pub fn new_in(packet_type: PacketType, buf: &'a mut [u8]) -> Self {
244        Header::new(packet_type)
245            .write_to_prefix(buf)
246            .expect("not enough room in buffer for packet header");
247        let (header_bytes, payload) = buf.split_at_mut(Header::SIZE);
248        let header = Header::try_mut_from_bytes(header_bytes).unwrap();
249        PacketMut { header, payload }
250    }
251
252    /// Validates the correctness of the packet and returns the size of the packet within the
253    /// original buffer.
254    pub fn finish(self, payload_len: usize) -> Result<usize, PacketTooBigError> {
255        if payload_len <= self.payload.len() {
256            self.header.payload_len.set(u32::try_from(payload_len).map_err(|_| PacketTooBigError)?);
257            Ok(Header::SIZE + payload_len)
258        } else {
259            Err(PacketTooBigError)
260        }
261    }
262}
263
/// Reads a sequence of vsock packets from a given usb packet buffer
pub struct VsockPacketIterator<'a> {
    /// The bytes not yet parsed, or `None` once iteration has ended.
    buf: Option<&'a [u8]>,
}

impl<'a> VsockPacketIterator<'a> {
    /// Creates a new [`VsockPacketIterator`] from the contents of `buf`. The returned iterator
    /// will yield the individual vsock packets found in `buf`.
    pub fn new(buf: &'a [u8]) -> Self {
        Self { buf: Some(buf) }
    }
}
276
277impl<'a> FusedIterator for VsockPacketIterator<'a> {}
278impl<'a> Iterator for VsockPacketIterator<'a> {
279    type Item = Result<Packet<'a>, std::io::Error>;
280
281    fn next(&mut self) -> Option<Self::Item> {
282        // return immediately if we've already returned `None` or `Some(Err)`
283        let data = self.buf.take()?;
284
285        // also return immediately if there's no more data in the buffer
286        if data.len() == 0 {
287            return None;
288        }
289
290        match Packet::parse_next(data) {
291            Ok((header, rest)) => {
292                // update our pointer for next time
293                self.buf = Some(rest);
294                Some(Ok(header))
295            }
296            Err(err) => Some(Err(err)),
297        }
298    }
299}
300
/// Builds an aggregate usb packet out of vsock packets and gives readiness
/// notifications when there is room to add another packet or data available to send.
pub struct UsbPacketBuilder<B> {
    /// Backing storage the vsock packets are written into.
    buffer: B,
    /// Number of bytes of `buffer` already filled with packets.
    offset: usize,
    /// Woken when the buffer is reset by taking the usb packet.
    // NOTE(review): nothing in this file registers a waker here — confirm whether it is used.
    space_waker: AtomicWaker,
    /// Woken when a vsock packet has been written into the buffer.
    // NOTE(review): nothing in this file registers a waker here — confirm whether it is used.
    packet_waker: AtomicWaker,
}
309
/// A packet did not fit in the space available for it: either its payload length exceeded the
/// payload area (or `u32`) in [`PacketMut::finish`], or the whole packet was larger than the
/// room left in the buffer in [`UsbPacketBuilder::write_vsock_packet`].
#[derive(Debug, Copy, Clone)]
pub struct PacketTooBigError;
313
314impl<B> UsbPacketBuilder<B> {
315    /// Creates a new builder from `buffer`, which is a type that can be used as a mutable slice
316    /// with space available for storing vsock packets. The `readable_notify` will have a message
317    /// sent to it whenever a usb packet could be transmitted.
318    pub fn new(buffer: B) -> Self {
319        let offset = 0;
320        let space_waker = AtomicWaker::default();
321        let packet_waker = AtomicWaker::default();
322        Self { buffer, offset, space_waker, packet_waker }
323    }
324
325    /// Returns true if the packet has data in it
326    pub fn has_data(&self) -> bool {
327        self.offset > 0
328    }
329}
330
331impl<B> UsbPacketBuilder<B>
332where
333    B: std::ops::DerefMut<Target = [u8]>,
334{
335    /// Gets the space currently available for another packet in the buffer
336    pub fn available(&self) -> usize {
337        self.buffer.len() - self.offset
338    }
339
340    /// Writes the given packet into the buffer. The packet and header must be able to fit
341    /// within the buffer provided at creation time, and `header.payload_length` must be correctly
342    /// set.
343    pub fn write_vsock_packet(&mut self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
344        let packet_size = packet.size();
345        if self.available() >= packet_size {
346            packet.write_to_unchecked(&mut self.buffer[self.offset..]);
347            self.offset += packet_size;
348            self.packet_waker.wake();
349            Ok(())
350        } else {
351            Err(PacketTooBigError)
352        }
353    }
354
355    /// Takes the current usb packet, if there is one. The returned mutable slice
356    /// will be only the data written to the buffer so far, and packet writing will be reset to the
357    /// beginning of the buffer.
358    pub fn take_usb_packet(&mut self) -> Option<&mut [u8]> {
359        let written = self.offset;
360        if written == 0 {
361            return None;
362        }
363        self.offset = 0;
364        self.space_waker.wake();
365        Some(&mut self.buffer[0..written])
366    }
367}
368
/// Signals that the packet filler has been shut down, so no further work can be done on it.
#[derive(Debug, Copy, Clone)]
pub struct ShutdownError;

impl From<ShutdownError> for std::io::Error {
    /// Converts the shutdown into a `BrokenPipe` I/O error.
    fn from(_shutdown: ShutdownError) -> Self {
        let kind = std::io::ErrorKind::BrokenPipe;
        Self::new(kind, "Transport shut down")
    }
}
378
/// Errors returned from [`UsbPacketFiller::write_vsock_packet`].
#[derive(Debug, Copy, Clone)]
pub(crate) enum WritePacketError {
    /// The packet did not fit. See [`PacketTooBigError`]
    PacketTooBig(PacketTooBigError),
    /// The filler was shut down. See [`ShutdownError`]
    Shutdown(ShutdownError),
}
387
388impl From<PacketTooBigError> for WritePacketError {
389    fn from(value: PacketTooBigError) -> Self {
390        WritePacketError::PacketTooBig(value)
391    }
392}
393
394impl From<ShutdownError> for WritePacketError {
395    fn from(value: ShutdownError) -> Self {
396        WritePacketError::Shutdown(value)
397    }
398}
399
/// Extension for [`WritePacketError`] to allow asserting [`PacketTooBigError`]
/// while extracting [`ShutdownError`].
pub(crate) trait WritePacketErrorExt {
    /// The type we return when we've extracted and handled any [`PacketTooBigError`]
    type Residue;

    /// Assert that this type is not a [`PacketTooBigError`] and map it to a new
    /// type that cannot represent `PacketTooBigError`.
    ///
    /// # Panics
    ///
    /// Panics if the value holds a [`PacketTooBigError`].
    fn assert_right_size(self) -> Self::Residue;

    /// Same as `assert_right_size` but allow a custom error message if we find
    /// a [`PacketTooBigError`].
    ///
    /// # Panics
    ///
    /// Panics with a message containing `err` if the value holds a [`PacketTooBigError`].
    fn expect_right_size(self, err: &str) -> Self::Residue;
}
414
415impl WritePacketErrorExt for WritePacketError {
416    type Residue = ShutdownError;
417
418    fn assert_right_size(self) -> ShutdownError {
419        self.expect_right_size("Unexpected PacketTooBigError in WritePacketError")
420    }
421
422    fn expect_right_size(self, err: &str) -> ShutdownError {
423        return match self {
424            WritePacketError::PacketTooBig(_) => panic!("{err}"),
425            WritePacketError::Shutdown(shutdown_error) => shutdown_error,
426        };
427    }
428}
429
430impl<T> WritePacketErrorExt for Result<T, WritePacketError> {
431    type Residue = Result<T, ShutdownError>;
432
433    fn assert_right_size(self) -> Result<T, ShutdownError> {
434        match self {
435            Ok(x) => Ok(Ok(x)),
436            Err(WritePacketError::Shutdown(s)) => Ok(Err(s)),
437            Err(WritePacketError::PacketTooBig(p)) => Err(p),
438        }
439        .unwrap()
440    }
441
442    fn expect_right_size(self, err: &str) -> Result<T, ShutdownError> {
443        match self {
444            Ok(x) => Ok(Ok(x)),
445            Err(WritePacketError::Shutdown(s)) => Ok(Err(s)),
446            Err(WritePacketError::PacketTooBig(p)) => Err(p),
447        }
448        .expect(err)
449    }
450}
451
/// Coordinates tasks that write vsock packets with the task that supplies usb packet buffers
/// (via [`UsbPacketFiller::fill_usb_packet`]) and collects them once they hold data.
pub(crate) struct UsbPacketFiller<B> {
    /// The builder currently available for writers to fill, if one has been supplied.
    current_out_packet: Mutex<Option<UsbPacketBuilder<B>>>,
    /// Wakers of writers waiting for a builder with enough room.
    out_packet_wakers: Mutex<Vec<Waker>>,
    /// Waker of the task waiting for the current builder to receive data.
    filled_packet_waker: AtomicWaker,
    /// Set once [`UsbPacketFiller::shutdown`] has been called.
    is_shutdown: atomic::AtomicBool,
}
458
459impl<B> Default for UsbPacketFiller<B> {
460    fn default() -> Self {
461        let current_out_packet = Mutex::default();
462        let out_packet_wakers = Mutex::default();
463        let filled_packet_waker = AtomicWaker::default();
464        let is_shutdown = atomic::AtomicBool::new(false);
465        Self { current_out_packet, out_packet_wakers, filled_packet_waker, is_shutdown }
466    }
467}
468
impl<B> UsbPacketFiller<B> {
    /// Returns true once [`UsbPacketFiller::shutdown`] has set the shutdown flag.
    fn is_shutdown(&self) -> bool {
        // Acquire load pairs with the Release store in `shutdown`.
        self.is_shutdown.load(atomic::Ordering::Acquire)
    }
}
474
475impl<B: DerefMut<Target = [u8]> + Unpin> UsbPacketFiller<B> {
476    fn wait_for_fillable(&self, min_packet_size: usize) -> WaitForFillable<'_, B> {
477        WaitForFillable { filler: &self, min_packet_size }
478    }
479
480    pub fn shutdown(&self) {
481        let mut out_packets = self.current_out_packet.lock();
482        let mut out_packet_wakers = self.out_packet_wakers.lock();
483        self.is_shutdown.store(true, atomic::Ordering::Release);
484        *out_packets = None;
485        out_packet_wakers.drain(..).for_each(Waker::wake);
486        self.filled_packet_waker.wake();
487    }
488
489    pub async fn write_vsock_packet(&self, packet: &Packet<'_>) -> Result<(), WritePacketError> {
490        let mut builder = self.wait_for_fillable(packet.size()).await?;
491        builder.as_mut().unwrap().write_vsock_packet(packet)?;
492        self.filled_packet_waker.wake();
493        Ok(())
494    }
495
496    pub async fn write_vsock_data(
497        &self,
498        address: &Address,
499        payload: &[u8],
500    ) -> Result<usize, ShutdownError> {
501        let header = &mut Header::new(PacketType::Data);
502        header.set_address(&address);
503        let mut builder = self.wait_for_fillable(Header::SIZE + 1).await?;
504        let builder = builder.as_mut().unwrap();
505        let writing = min(payload.len(), builder.available() - Header::SIZE);
506        header.payload_len.set(writing as u32);
507        builder.write_vsock_packet(&Packet { header, payload: &payload[..writing] }).unwrap();
508        self.filled_packet_waker.wake();
509        Ok(writing)
510    }
511
512    pub async fn write_vsock_data_all(
513        &self,
514        address: &Address,
515        payload: &[u8],
516    ) -> Result<(), ShutdownError> {
517        let mut written = 0;
518        while written < payload.len() {
519            written += self.write_vsock_data(address, &payload[written..]).await?;
520        }
521        Ok(())
522    }
523
524    /// Provides a packet builder for the state machine to write packets to. Returns a future that
525    /// will be fulfilled when there is data available to send on the packet.
526    ///
527    /// # Panics
528    ///
529    /// Panics if called while another [`Self::fill_usb_packet`] future is pending.
530    pub fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> FillUsbPacket<'_, B> {
531        FillUsbPacket(&self, Some(builder))
532    }
533}
534
535pub(crate) struct FillUsbPacket<'a, B>(&'a UsbPacketFiller<B>, Option<UsbPacketBuilder<B>>);
536
537impl<'a, B: Unpin> Future for FillUsbPacket<'a, B> {
538    type Output = Result<UsbPacketBuilder<B>, ShutdownError>;
539
540    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
541        // if we're holding a `PacketBuilder` we haven't been waited on yet. Otherwise we want
542        // to return ready when there's a packet and it's got data in it.
543        if let Some(builder) = self.1.take() {
544            // if the packet we were handed for some reason already has data in it, hand it back
545            if builder.has_data() {
546                return Poll::Ready(Ok(builder));
547            }
548
549            let mut current_out_packet = self.0.current_out_packet.lock();
550            if self.0.is_shutdown() {
551                return Poll::Ready(Err(ShutdownError));
552            }
553            assert!(current_out_packet.is_none(), "Can't fill more than one packet at a time");
554            current_out_packet.replace(builder);
555
556            // before we drop the lock, register our interest in packets being filled in.
557            self.0.filled_packet_waker.register(cx.waker());
558            // drop the lock so that we aren't immediately stalling any tasks we wake below.
559            drop(current_out_packet);
560
561            // wake all pending vsock packet writes so that we can fill the buffer as quickly as
562            // possible.
563            let mut wakers = self.0.out_packet_wakers.lock();
564            for waker in wakers.drain(..) {
565                waker.wake();
566            }
567            if self.0.is_shutdown() { Poll::Ready(Err(ShutdownError)) } else { Poll::Pending }
568        } else {
569            let mut current_out_packet = self.0.current_out_packet.lock();
570            if self.0.is_shutdown() {
571                return Poll::Ready(Err(ShutdownError));
572            }
573            let Some(builder) = current_out_packet.take() else {
574                panic!("Packet builder was somehow removed from connection prematurely");
575            };
576
577            if builder.has_data() {
578                self.0.filled_packet_waker.wake();
579                Poll::Ready(Ok(builder))
580            } else {
581                // if there hasn't been any data placed in the packet, put the builder back and
582                // return Pending.
583                current_out_packet.replace(builder);
584                Poll::Pending
585            }
586        }
587    }
588}
589
590pub(crate) struct WaitForFillable<'a, B> {
591    filler: &'a UsbPacketFiller<B>,
592    min_packet_size: usize,
593}
594
595impl<'a, B: DerefMut<Target = [u8]> + Unpin> Future for WaitForFillable<'a, B> {
596    type Output = Result<MutexGuard<'a, Option<UsbPacketBuilder<B>>>, ShutdownError>;
597
598    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
599        let current_out_packet = self.filler.current_out_packet.lock();
600        if self.filler.is_shutdown() {
601            return Poll::Ready(Err(ShutdownError));
602        }
603        let Some(builder) = &*current_out_packet else {
604            self.filler.out_packet_wakers.lock().push(cx.waker().clone());
605            return Poll::Pending;
606        };
607        if builder.available() >= self.min_packet_size {
608            Poll::Ready(Ok(current_out_packet))
609        } else {
610            self.filler.out_packet_wakers.lock().push(cx.waker().clone());
611            Poll::Pending
612        }
613    }
614}
615
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use fuchsia_async::Task;
    use futures::poll;

    /// Polls `fut` once and panics if it is already ready.
    async fn assert_pending<F: Future>(fut: F) {
        let fut = std::pin::pin!(fut);
        if let Poll::Ready(_) = poll!(fut) {
            panic!("Future was ready when it shouldn't have been");
        }
    }
    /// Writes a packet whose header says the payload is empty while the payload slice holds a
    /// byte, and checks that the header read back equals the one written.
    #[test]
    fn packet_write_doesnt_clobber_header_with_incorrect_len() {
        let header = &mut Header::new(PacketType::Pause);
        let payload = &[1];
        let packet = Packet { header, payload };
        let mut buf = vec![0; packet.size()];
        packet.write_to_unchecked(&mut buf);

        let read_packet = Packet::parse_next(&buf).unwrap().0;
        assert_eq!(&read_packet.header, &packet.header);
    }

    /// Sends one packet through a filler whose buffer is exactly packet-sized and checks that
    /// the same packet is parsed back out of the resulting usb packet.
    #[fuchsia::test]
    async fn roundtrip_packet() {
        let payload = b"hello world!";
        let packet = Packet {
            payload,
            header: &Header {
                device_cid: 1.into(),
                host_cid: 2.into(),
                device_port: 3.into(),
                host_port: 4.into(),
                payload_len: little_endian::U32::from(payload.len() as u32),
                ..Header::new(PacketType::Data)
            },
        };
        let buffer = vec![0; packet.size()];
        let builder = UsbPacketBuilder::new(buffer);
        let filler = UsbPacketFiller::default();
        let mut filled_fut = filler.fill_usb_packet(builder);
        println!("we should not be ready to pull a usb packet off yet");
        assert_pending(&mut filled_fut).await;

        println!("we should be able to write a packet though ({} bytes)", packet.size());
        filler.write_vsock_packet(&packet).await.unwrap();

        println!("we shouldn't have any space for another packet now");
        assert_pending(filler.wait_for_fillable(1)).await;

        println!("but we should have a new usb packet available");
        let mut builder = filled_fut.await.unwrap();
        let buffer = builder.take_usb_packet().unwrap();

        println!("the packet we get back out should be the same one we put in");
        let (read_packet, remain) = Packet::parse_next(buffer).unwrap();
        assert_eq!(packet, read_packet);
        assert!(remain.is_empty());
    }

    /// Streams 1024 numbered packets from a spawned task through a 256-byte buffer and checks
    /// that they are read back in order.
    #[fuchsia::test]
    async fn many_packets() {
        fn make_numbered_packet(num: u32) -> (Header, String) {
            let payload = format!("packet #{num}!");
            let header = Header {
                device_cid: num.into(),
                device_port: num.into(),
                host_cid: num.into(),
                host_port: num.into(),
                payload_len: little_endian::U32::from(payload.len() as u32),
                ..Header::new(PacketType::Data)
            };
            (header, payload)
        }
        const BUFFER_SIZE: usize = 256;
        let mut builder = UsbPacketBuilder::new(vec![0; BUFFER_SIZE]);
        let filler = Arc::new(UsbPacketFiller::default());

        let send_filler = filler.clone();
        let send_task = Task::spawn(async move {
            for packet_num in 0..1024 {
                let next_packet = make_numbered_packet(packet_num);
                let next_packet =
                    Packet { header: &next_packet.0, payload: next_packet.1.as_ref() };
                send_filler.write_vsock_packet(&next_packet).await.unwrap();
            }
        });

        let mut read_packet_num = 0;
        while read_packet_num < 1024 {
            // hand the (now empty) builder back and wait for the sender to put data in it
            builder = filler.fill_usb_packet(builder).await.unwrap();
            let buffer = builder.take_usb_packet().unwrap();
            let mut num_packets = 0;
            for packet in VsockPacketIterator::new(&buffer) {
                let packet_compare = make_numbered_packet(read_packet_num);
                let packet_compare =
                    Packet { header: &packet_compare.0, payload: &packet_compare.1.as_ref() };
                assert_eq!(packet.unwrap(), packet_compare);
                read_packet_num += 1;
                num_packets += 1;
            }
            println!(
                "Read {num_packets} vsock packets from usb packet buffer, had {count} bytes left",
                count = BUFFER_SIZE - buffer.len()
            );
        }
        send_task.await;
        assert_eq!(1024, read_packet_num);
    }

    /// Exercises the interplay of `wait_for_fillable` and `fill_usb_packet` over ten rounds:
    /// pending with no builder, ready once one is supplied, pending again when asking for more
    /// room than is left, and finally yielding both written packets.
    #[fuchsia::test]
    async fn packet_fillable_futures() {
        let filler = UsbPacketFiller::default();

        for _ in 0..10 {
            println!("register an interest in filling a usb packet");
            let mut fillable_fut = filler.wait_for_fillable(1);
            println!("make sure we have nothing to fill");
            assert!(poll!(&mut fillable_fut).is_pending());

            println!("register a packet for filling");
            let mut filled_fut = filler.fill_usb_packet(UsbPacketBuilder::new(vec![0; 1024]));
            println!("make sure we've registered the buffer");
            assert!(poll!(&mut filled_fut).is_pending());

            println!("now put some things in the packet");
            let header = &mut Header::new(PacketType::Data);
            header.payload_len.set(99);
            let Poll::Ready(Ok(mut builder)) = poll!(fillable_fut) else {
                panic!("should have been ready to fill a packet")
            };
            builder
                .as_mut()
                .unwrap()
                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
                .unwrap();
            // release the lock guard before asking for the builder again
            drop(builder);
            let Poll::Ready(Ok(mut builder)) = poll!(filler.wait_for_fillable(1)) else {
                panic!("should have been ready to fill a packet(2)")
            };
            builder
                .as_mut()
                .unwrap()
                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
                .unwrap();
            drop(builder);

            println!("but if we ask for too much space we'll get pending");
            assert!(poll!(filler.wait_for_fillable(1024 - (99 * 2) + 1)).is_pending());

            println!("and now resolve the filled future and get our data back");
            let mut filled = filled_fut.await.unwrap();
            let packets =
                Vec::from_iter(VsockPacketIterator::new(filled.take_usb_packet().unwrap()));
            assert_eq!(packets.len(), 2);
        }
    }
}