usb_vsock/
packet.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::cmp::min;
6use std::future::Future;
7use std::iter::FusedIterator;
8use std::ops::DerefMut;
9use std::pin::Pin;
10use std::sync::{Mutex, MutexGuard};
11use std::task::{Context, Poll};
12
13use futures::task::AtomicWaker;
14use zerocopy::{little_endian, Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned};
15
16use crate::Address;
17
18/// The serializable enumeration of packet types that can be used over a usb vsock link. These
19/// roughly correspond to the state machine described by the `fuchsia.hardware.vsock` fidl library.
20#[repr(u8)]
21#[derive(
22    Debug,
23    TryFromBytes,
24    IntoBytes,
25    KnownLayout,
26    Immutable,
27    Unaligned,
28    PartialEq,
29    Eq,
30    PartialOrd,
31    Ord,
32    Hash,
33    Clone,
34    Copy,
35)]
36pub enum PacketType {
37    /// Synchronizes the connection between host and device. Each side must send and receive a
38    /// sync packet with the same payload before any other packet may be recognized on the usb
39    /// connection. If this packet is received mid-stream, all connections must be considered
40    /// reset to avoid data loss. It should also only ever be the last vsock packet in a given
41    /// usb packet.
42    Sync = b'S',
43    /// An outbound echo request. The other end should reply to this with the same body and all the
44    /// same fields in a [`PacketType::EchoReply`] packet, no matter the state of the connection.
45    Echo = b'E',
46    /// A reply to a [`PacketType::Echo`] request packet. The body and all header fields should be
47    /// set the same as the original echo packet's.
48    EchoReply = b'e',
49    /// Connect to a cid:port pair from a cid:port pair on the other side. The payload must be empty.
50    Connect = b'C',
51    /// Notify the other end that this connection should be closed. The other end should respond
52    /// with an [`PacketType::Reset`] when the connection has been closed on the other end. The
53    /// payload must be empty.
54    Finish = b'F',
55    /// Terminate or refuse a connection on a particular cid:port pair set. There must have been a
56    /// previous [`PacketType::Connect`] request for this, and after this that particular set of
57    /// pairs must be considered disconnected and no more [`PacketType::Data`] packets may be sent
58    /// for it unless a new connection is initiated. The payload must be empty.
59    Reset = b'R',
60    /// Accepts a connection previously requested with [`PacketType::Connect`] on the given cid:port
61    /// pair set. The payload must be empty.
62    Accept = b'A',
63    /// A data packet for a particular cid:port pair set previously established with a [`PacketType::Connect`]
64    /// and [`PacketType::Accept`] message. If all of the cid and port fields of the packet are
65    /// zero, this is for a special data stream between the host and device that does not require
66    /// an established connection.
67    Data = b'D',
68}
69
70/// The packet header for a vsock packet passed over the usb vsock link. Each usb packet can contain
71/// one or more packets, each of which must start with a valid header and correct payload length.
72#[repr(C, packed(1))]
73#[derive(
74    Debug,
75    TryFromBytes,
76    IntoBytes,
77    KnownLayout,
78    Immutable,
79    Unaligned,
80    PartialEq,
81    Eq,
82    PartialOrd,
83    Ord,
84    Hash,
85    Clone,
86)]
87pub struct Header {
88    magic: [u8; 3],
89    /// The type of this packet
90    pub packet_type: PacketType,
91    /// For Connect, Reset, Accept, and Data packets this represents the device side's address.
92    /// Usually this will be a special value representing either that it is simply "the device",
93    /// or zero along with the rest of the cid and port fields to indicate that it's a control stream
94    /// packet. Must be zero for any other packet type.
95    pub device_cid: little_endian::U32,
96    /// For Connect, Reset, Accept, and Data packets this represents the host side's address.
97    /// Usually this will be a special value representing either that it is simply "the host",
98    /// or zero along with the rest of the cid and port fields to indicate that it's a control stream
99    /// packet. Must be zero for any other packet type.
100    pub host_cid: little_endian::U32,
101    /// For Connect, Reset, Accept, and Data packets this represents the device side's port.
102    /// This must be a valid positive value for any of those packet types, unless all of the cid and
103    /// port fields are also zero, in which case it is a control stream packet. Must be zero for any
104    /// other packet type.
105    pub device_port: little_endian::U32,
106    /// For Connect, Reset, Accept, and Data packets this represents the host side's port.
107    /// This must be a valid positive value for any of those packet types, unless all of the cid and
108    /// port fields are also zero, in which case it is a control stream packet. Must be zero for any
109    /// other packet type.
110    pub host_port: little_endian::U32,
111    /// The length of the packet payload. This must be zero for any packet type other than Sync or
112    /// Data.
113    pub payload_len: little_endian::U32,
114}
115
116impl Header {
117    /// Helper constant for the size of a header on the wire
118    pub const SIZE: usize = size_of::<Self>();
119    const MAGIC: &'static [u8; 3] = b"ffx";
120
121    /// Builds a new packet with correct magic value and packet type and all other fields
122    /// initialized to zero.
123    pub fn new(packet_type: PacketType) -> Self {
124        let device_cid = 0.into();
125        let host_cid = 0.into();
126        let device_port = 0.into();
127        let host_port = 0.into();
128        let payload_len = 0.into();
129        Header {
130            magic: *Self::MAGIC,
131            packet_type,
132            device_cid,
133            host_cid,
134            device_port,
135            host_port,
136            payload_len,
137        }
138    }
139
140    /// Gets the size of this packet on the wire with the header and a payload of length
141    /// [`Self::payload_len`].
142    pub fn packet_size(&self) -> usize {
143        Packet::size_with_payload(self.payload_len.get() as usize)
144    }
145
146    /// Sets the address fields of this packet header based on the normalized address in `addr`.
147    pub fn set_address(&mut self, addr: &Address) {
148        self.device_cid.set(addr.device_cid);
149        self.host_cid.set(addr.host_cid);
150        self.device_port.set(addr.device_port);
151        self.host_port.set(addr.host_port);
152    }
153}
154
155/// A typed reference to the contents of a packet in a buffer.
156#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
157pub struct Packet<'a> {
158    /// The packet's header
159    pub header: &'a Header,
160    /// The packet's payload
161    pub payload: &'a [u8],
162}
163
164impl<'a> Packet<'a> {
165    /// The size of this packet according to its header (as [`Self::payload`] may have been
166    /// over-allocated for the size of the packet).
167    pub fn size(&self) -> usize {
168        self.header.packet_size()
169    }
170
171    fn size_with_payload(payload_size: usize) -> usize {
172        size_of::<Header>() + payload_size
173    }
174
175    fn parse_next(buf: &'a [u8]) -> Result<(Self, &'a [u8]), std::io::Error> {
176        // split off and validate the header
177        let Some((header, body)) = buf.split_at_checked(size_of::<Header>()) else {
178            return Err(std::io::Error::other("insufficient data for last packet"));
179        };
180        let header = Header::try_ref_from_bytes(header).map_err(|err| {
181            std::io::Error::other(format!("failed to parse usb vsock header: {err:?}"))
182        })?;
183        if header.magic != *Header::MAGIC {
184            return Err(std::io::Error::other(format!("invalid magic bytes on usb vsock header")));
185        }
186        // validate the payload length
187        let payload_len = Into::<u64>::into(header.payload_len) as usize;
188        let body_len = body.len();
189        if payload_len > body_len {
190            return Err(std::io::Error::other(format!("payload length on usb vsock header ({payload_len}) was larger than available in buffer {body_len}")));
191        }
192
193        let (payload, remain) = body.split_at(payload_len);
194        Ok((Packet { header, payload }, remain))
195    }
196
197    /// Writes the packet to a buffer when the buffer is known to be large enough to hold it. Note
198    /// that the packet header's [`Header::payload_len`] must be correct before calling this, it
199    /// does not use the size of [`Self::payload`] to decide how much of the payload buffer is
200    /// valid.
201    ///
202    /// # Panics
203    ///
204    /// Panics if the buffer is not large enough for the packet.
205    pub fn write_to_unchecked(&'a self, buf: &'a mut [u8]) -> &'a mut [u8] {
206        let (packet, remain) = buf.split_at_mut(self.size());
207        self.header.write_to_prefix(packet).unwrap();
208        self.payload.write_to_suffix(packet).unwrap();
209        remain
210    }
211}
212
213/// A typed mutable reference to the contents of a packet in a buffer.
214#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
215pub struct PacketMut<'a> {
216    /// The packet's header.
217    pub header: &'a mut Header,
218    /// The packet's payload.
219    pub payload: &'a mut [u8],
220}
221
222impl<'a> PacketMut<'a> {
223    /// Creates a new [`PacketMut`] inside the given buffer and initializes the header to the given
224    /// [`PacketType`] before returning it. All other fields in the header will be zeroed, and the
225    /// [`PacketMut::payload`] will be the remaining area of the buffer after the header.
226    ///
227    /// Use [`PacketMut::finish`] to validate and write the proper packet length and return the
228    /// total size of the packet.
229    ///
230    /// # Panics
231    ///
232    /// The buffer must be large enough to hold at least a packet header, and this will panic if
233    /// it's not.
234    pub fn new_in(packet_type: PacketType, buf: &'a mut [u8]) -> Self {
235        Header::new(packet_type)
236            .write_to_prefix(buf)
237            .expect("not enough room in buffer for packet header");
238        let (header_bytes, payload) = buf.split_at_mut(Header::SIZE);
239        let header = Header::try_mut_from_bytes(header_bytes).unwrap();
240        PacketMut { header, payload }
241    }
242
243    /// Validates the correctness of the packet and returns the size of the packet within the
244    /// original buffer.
245    pub fn finish(self, payload_len: usize) -> Result<usize, PacketTooBigError> {
246        if payload_len <= self.payload.len() {
247            self.header.payload_len.set(u32::try_from(payload_len).map_err(|_| PacketTooBigError)?);
248            Ok(Header::SIZE + payload_len)
249        } else {
250            Err(PacketTooBigError)
251        }
252    }
253}
254
255/// Reads a sequence of vsock packets from a given usb packet buffer
256pub struct VsockPacketIterator<'a> {
257    buf: Option<&'a [u8]>,
258}
259
260impl<'a> VsockPacketIterator<'a> {
261    /// Creates a new [`PacketStream`] from the contents of `buf`. The returned stream will
262    /// iterate over individual vsock packets.
263    pub fn new(buf: &'a [u8]) -> Self {
264        Self { buf: Some(buf) }
265    }
266}
267
268impl<'a> FusedIterator for VsockPacketIterator<'a> {}
269impl<'a> Iterator for VsockPacketIterator<'a> {
270    type Item = Result<Packet<'a>, std::io::Error>;
271
272    fn next(&mut self) -> Option<Self::Item> {
273        // return immediately if we've already returned `None` or `Some(Err)`
274        let data = self.buf.take()?;
275
276        // also return immediately if there's no more data in the buffer
277        if data.len() == 0 {
278            return None;
279        }
280
281        match Packet::parse_next(data) {
282            Ok((header, rest)) => {
283                // update our pointer for next time
284                self.buf = Some(rest);
285                Some(Ok(header))
286            }
287            Err(err) => Some(Err(err)),
288        }
289    }
290}
291
292/// Builds an aggregate usb packet out of vsock packets and gives readiness
293/// notifications when there is room to add another packet or data available to send.
294pub struct UsbPacketBuilder<B> {
295    buffer: B,
296    offset: usize,
297    space_waker: AtomicWaker,
298    packet_waker: AtomicWaker,
299}
300
301/// the size of the packet would have been too large even if the buffer was empty
302#[derive(Debug, Copy, Clone)]
303pub struct PacketTooBigError;
304
305impl<B> UsbPacketBuilder<B> {
306    /// Creates a new builder from `buffer`, which is a type that can be used as a mutable slice
307    /// with space available for storing vsock packets. The `readable_notify` will have a message
308    /// sent to it whenever a usb packet could be transmitted.
309    pub fn new(buffer: B) -> Self {
310        let offset = 0;
311        let space_waker = AtomicWaker::default();
312        let packet_waker = AtomicWaker::default();
313        Self { buffer, offset, space_waker, packet_waker }
314    }
315
316    /// Returns true if the packet has data in it
317    pub fn has_data(&self) -> bool {
318        self.offset > 0
319    }
320}
321
322impl<B> UsbPacketBuilder<B>
323where
324    B: std::ops::DerefMut<Target = [u8]>,
325{
326    /// Gets the space currently available for another packet in the buffer
327    pub fn available(&self) -> usize {
328        self.buffer.len() - self.offset
329    }
330
331    /// Writes the given packet into the buffer. The packet and header must be able to fit
332    /// within the buffer provided at creation time.
333    pub fn write_vsock_packet(&mut self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
334        let packet_size = packet.size();
335        if self.available() >= packet_size {
336            packet.write_to_unchecked(&mut self.buffer[self.offset..]);
337            self.offset += packet_size;
338            self.packet_waker.wake();
339            Ok(())
340        } else {
341            Err(PacketTooBigError)
342        }
343    }
344
345    /// Takes the current usb packet, if there is one. The returned mutable slice
346    /// will be only the data written to the buffer so far, and packet writing will be reset to the
347    /// beginning of the buffer.
348    pub fn take_usb_packet(&mut self) -> Option<&mut [u8]> {
349        let written = self.offset;
350        if written == 0 {
351            return None;
352        }
353        self.offset = 0;
354        self.space_waker.wake();
355        Some(&mut self.buffer[0..written])
356    }
357}
358
359pub(crate) struct UsbPacketFiller<B> {
360    current_out_packet: Mutex<Option<UsbPacketBuilder<B>>>,
361    out_packet_waker: AtomicWaker,
362    filled_packet_waker: AtomicWaker,
363}
364
365impl<B> Default for UsbPacketFiller<B> {
366    fn default() -> Self {
367        let current_out_packet = Mutex::default();
368        let out_packet_waker = AtomicWaker::default();
369        let filled_packet_waker = AtomicWaker::default();
370        Self { current_out_packet, out_packet_waker, filled_packet_waker }
371    }
372}
373
374impl<B: DerefMut<Target = [u8]> + Unpin> UsbPacketFiller<B> {
375    fn wait_for_fillable(&self, min_packet_size: usize) -> WaitForFillable<'_, B> {
376        WaitForFillable { filler: &self, min_packet_size }
377    }
378
379    pub async fn write_vsock_packet(&self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
380        let mut builder = self.wait_for_fillable(packet.size()).await;
381        builder.as_mut().unwrap().write_vsock_packet(packet)?;
382        self.filled_packet_waker.wake();
383        Ok(())
384    }
385
386    pub async fn write_vsock_data(&self, address: &Address, payload: &[u8]) -> usize {
387        let header = &mut Header::new(PacketType::Data);
388        header.set_address(&address);
389        let mut builder = self.wait_for_fillable(1).await;
390        let builder = builder.as_mut().unwrap();
391        let writing = min(payload.len(), builder.available() - Header::SIZE);
392        header.payload_len.set(writing as u32);
393        builder.write_vsock_packet(&Packet { header, payload: &payload[..writing] }).unwrap();
394        self.filled_packet_waker.wake();
395        writing
396    }
397
398    pub async fn write_vsock_data_all(&self, address: &Address, payload: &[u8]) {
399        let mut written = 0;
400        while written < payload.len() {
401            written += self.write_vsock_data(address, &payload[written..]).await;
402        }
403    }
404
405    /// Provides a packet builder for the state machine to write packets to. Returns a future that
406    /// will be fulfilled when there is data available to send on the packet.
407    ///
408    /// # Panics
409    ///
410    /// Panics if called while another [`Self::fill_usb_packet`] future is pending.
411    pub fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> FillUsbPacket<'_, B> {
412        FillUsbPacket(&self, Some(builder))
413    }
414}
415
416pub(crate) struct FillUsbPacket<'a, B>(&'a UsbPacketFiller<B>, Option<UsbPacketBuilder<B>>);
417
418impl<'a, B: Unpin> Future for FillUsbPacket<'a, B> {
419    type Output = UsbPacketBuilder<B>;
420
421    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
422        // if we're holding a `PacketBuilder` we haven't been waited on yet. Otherwise we want
423        // to return ready when there's a packet and it's got data in it.
424        if let Some(builder) = self.1.take() {
425            // if the packet we were handed for some reason already has data in it, hand it back
426            if builder.has_data() {
427                return Poll::Ready(builder);
428            }
429
430            let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
431            assert!(current_out_packet.is_none(), "Can't fill more than one packet at a time");
432            current_out_packet.replace(builder);
433            self.0.out_packet_waker.wake();
434            self.0.filled_packet_waker.register(cx.waker());
435            Poll::Pending
436        } else {
437            let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
438            let Some(builder) = current_out_packet.take() else {
439                panic!("Packet builder was somehow removed from connection prematurely");
440            };
441
442            if builder.has_data() {
443                self.0.filled_packet_waker.wake();
444                Poll::Ready(builder)
445            } else {
446                // if there hasn't been any data placed in the packet, put the builder back and
447                // return Pending.
448                current_out_packet.replace(builder);
449                Poll::Pending
450            }
451        }
452    }
453}
454
455pub(crate) struct WaitForFillable<'a, B> {
456    filler: &'a UsbPacketFiller<B>,
457    min_packet_size: usize,
458}
459
460impl<'a, B: DerefMut<Target = [u8]> + Unpin> Future for WaitForFillable<'a, B> {
461    type Output = MutexGuard<'a, Option<UsbPacketBuilder<B>>>;
462
463    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
464        let current_out_packet = self.filler.current_out_packet.lock().unwrap();
465        let Some(builder) = &*current_out_packet else {
466            self.filler.out_packet_waker.register(cx.waker());
467            return Poll::Pending;
468        };
469        if builder.available() >= self.min_packet_size {
470            Poll::Ready(current_out_packet)
471        } else {
472            self.filler.out_packet_waker.register(cx.waker());
473            Poll::Pending
474        }
475    }
476}
477
478#[cfg(test)]
479mod tests {
480    use std::sync::Arc;
481
482    use super::*;
483    use fuchsia_async::Task;
484    use futures::poll;
485
486    async fn assert_pending<F: Future>(fut: F) {
487        let fut = std::pin::pin!(fut);
488        if let Poll::Ready(_) = poll!(fut) {
489            panic!("Future was ready when it shouldn't have been");
490        }
491    }
492
493    #[fuchsia::test]
494    async fn roundtrip_packet() {
495        let payload = b"hello world!";
496        let packet = Packet {
497            payload,
498            header: &Header {
499                device_cid: 1.into(),
500                host_cid: 2.into(),
501                device_port: 3.into(),
502                host_port: 4.into(),
503                payload_len: little_endian::U32::from(payload.len() as u32),
504                ..Header::new(PacketType::Data)
505            },
506        };
507        let buffer = vec![0; packet.size()];
508        let builder = UsbPacketBuilder::new(buffer);
509        let filler = UsbPacketFiller::default();
510        let mut filled_fut = filler.fill_usb_packet(builder);
511        println!("we should not be ready to pull a usb packet off yet");
512        assert_pending(&mut filled_fut).await;
513
514        println!("we should be able to write a packet though ({} bytes)", packet.size());
515        filler.write_vsock_packet(&packet).await.unwrap();
516
517        println!("we shouldn't have any space for another packet now");
518        assert_pending(filler.wait_for_fillable(1)).await;
519
520        println!("but we should have a new usb packet available");
521        let mut builder = filled_fut.await;
522        let buffer = builder.take_usb_packet().unwrap();
523
524        println!("the packet we get back out should be the same one we put in");
525        let (read_packet, remain) = Packet::parse_next(buffer).unwrap();
526        assert_eq!(packet, read_packet);
527        assert!(remain.is_empty());
528    }
529
530    #[fuchsia::test]
531    async fn many_packets() {
532        fn make_numbered_packet(num: u32) -> (Header, String) {
533            let payload = format!("packet #{num}!");
534            let header = Header {
535                device_cid: num.into(),
536                device_port: num.into(),
537                host_cid: num.into(),
538                host_port: num.into(),
539                payload_len: little_endian::U32::from(payload.len() as u32),
540                ..Header::new(PacketType::Data)
541            };
542            (header, payload)
543        }
544        const BUFFER_SIZE: usize = 256;
545        let mut builder = UsbPacketBuilder::new(vec![0; BUFFER_SIZE]);
546        let filler = Arc::new(UsbPacketFiller::default());
547
548        let send_filler = filler.clone();
549        let send_task = Task::spawn(async move {
550            for packet_num in 0..1024 {
551                let next_packet = make_numbered_packet(packet_num);
552                let next_packet =
553                    Packet { header: &next_packet.0, payload: next_packet.1.as_ref() };
554                send_filler.write_vsock_packet(&next_packet).await.unwrap();
555            }
556        });
557
558        let mut read_packet_num = 0;
559        while read_packet_num < 1024 {
560            builder = filler.fill_usb_packet(builder).await;
561            let buffer = builder.take_usb_packet().unwrap();
562            let mut num_packets = 0;
563            for packet in VsockPacketIterator::new(&buffer) {
564                let packet_compare = make_numbered_packet(read_packet_num);
565                let packet_compare =
566                    Packet { header: &packet_compare.0, payload: &packet_compare.1.as_ref() };
567                assert_eq!(packet.unwrap(), packet_compare);
568                read_packet_num += 1;
569                num_packets += 1;
570            }
571            println!(
572                "Read {num_packets} vsock packets from usb packet buffer, had {count} bytes left",
573                count = BUFFER_SIZE - buffer.len()
574            );
575        }
576        send_task.await;
577        assert_eq!(1024, read_packet_num);
578    }
579
580    #[fuchsia::test]
581    async fn packet_fillable_futures() {
582        let filler = UsbPacketFiller::default();
583
584        for _ in 0..10 {
585            println!("register an interest in filling a usb packet");
586            let mut fillable_fut = filler.wait_for_fillable(1);
587            println!("make sure we have nothing to fill");
588            assert!(poll!(&mut fillable_fut).is_pending());
589
590            println!("register a packet for filling");
591            let mut filled_fut = filler.fill_usb_packet(UsbPacketBuilder::new(vec![0; 1024]));
592            println!("make sure we've registered the buffer");
593            assert!(poll!(&mut filled_fut).is_pending());
594
595            println!("now put some things in the packet");
596            let header = &mut Header::new(PacketType::Data);
597            header.payload_len.set(99);
598            let Poll::Ready(mut builder) = poll!(fillable_fut) else {
599                panic!("should have been ready to fill a packet")
600            };
601            builder
602                .as_mut()
603                .unwrap()
604                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
605                .unwrap();
606            drop(builder);
607            let Poll::Ready(mut builder) = poll!(filler.wait_for_fillable(1)) else {
608                panic!("should have been ready to fill a packet(2)")
609            };
610            builder
611                .as_mut()
612                .unwrap()
613                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
614                .unwrap();
615            drop(builder);
616
617            println!("but if we ask for too much space we'll get pending");
618            assert!(poll!(filler.wait_for_fillable(1024 - (99 * 2) + 1)).is_pending());
619
620            println!("and now resolve the filled future and get our data back");
621            let mut filled = filled_fut.await;
622            let packets =
623                Vec::from_iter(VsockPacketIterator::new(filled.take_usb_packet().unwrap()));
624            assert_eq!(packets.len(), 2);
625        }
626    }
627}