1use std::cmp::min;
6use std::future::Future;
7use std::iter::FusedIterator;
8use std::ops::DerefMut;
9use std::pin::Pin;
10use std::sync::{Mutex, MutexGuard};
11use std::task::{Context, Poll};
12
13use futures::task::AtomicWaker;
14use zerocopy::{little_endian, Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned};
15
16use crate::Address;
17
/// The kind of a usb vsock packet, stored as a single byte in the packet
/// [`Header`].
///
/// Each variant's discriminant is the ASCII byte used to represent it in the
/// header. The zerocopy derives allow the enum to be validated and read
/// directly out of a byte buffer as part of a [`Header`].
#[repr(u8)]
#[derive(
    Debug,
    TryFromBytes,
    IntoBytes,
    KnownLayout,
    Immutable,
    Unaligned,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Clone,
    Copy,
)]
pub enum PacketType {
    /// Encoded as `b'S'`.
    // NOTE(review): presumably a synchronization/handshake packet -- confirm against the protocol spec.
    Sync = b'S',
    /// Encoded as `b'E'`.
    // NOTE(review): presumably an echo request answered with `EchoReply` -- confirm.
    Echo = b'E',
    /// Encoded as `b'e'`.
    EchoReply = b'e',
    /// Encoded as `b'C'`.
    // NOTE(review): presumably a connection request -- confirm.
    Connect = b'C',
    /// Encoded as `b'F'`.
    // NOTE(review): presumably an orderly connection close -- confirm.
    Finish = b'F',
    /// Encoded as `b'R'`.
    // NOTE(review): presumably an abortive connection termination -- confirm.
    Reset = b'R',
    /// Encoded as `b'A'`.
    // NOTE(review): presumably the acceptance of a `Connect` -- confirm.
    Accept = b'A',
    /// Encoded as `b'D'`. This is the type used by
    /// `UsbPacketFiller::write_vsock_data` for packets that carry payload bytes.
    Data = b'D',
}
69
/// The fixed-size header that precedes the payload of every usb vsock packet.
///
/// The layout is `repr(C, packed(1))`, so the struct is exactly the
/// concatenation of its fields with no padding: 3 magic bytes, 1 packet type
/// byte and five little-endian `u32` values (24 bytes in total). This lets it
/// be read from and written to byte buffers in place through zerocopy.
#[repr(C, packed(1))]
#[derive(
    Debug,
    TryFromBytes,
    IntoBytes,
    KnownLayout,
    Immutable,
    Unaligned,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Clone,
)]
pub struct Header {
    /// Marker bytes; `Packet::parse_next` rejects headers where these differ
    /// from `Header::MAGIC`.
    magic: [u8; 3],
    /// The kind of packet this header describes.
    pub packet_type: PacketType,
    /// Context id of the device side of the address (see `set_address`).
    pub device_cid: little_endian::U32,
    /// Context id of the host side of the address (see `set_address`).
    pub host_cid: little_endian::U32,
    /// Port on the device side of the address (see `set_address`).
    pub device_port: little_endian::U32,
    /// Port on the host side of the address (see `set_address`).
    pub host_port: little_endian::U32,
    /// Number of payload bytes that follow this header in the packet.
    pub payload_len: little_endian::U32,
}
115
116impl Header {
117 pub const SIZE: usize = size_of::<Self>();
119 const MAGIC: &'static [u8; 3] = b"ffx";
120
121 pub fn new(packet_type: PacketType) -> Self {
124 let device_cid = 0.into();
125 let host_cid = 0.into();
126 let device_port = 0.into();
127 let host_port = 0.into();
128 let payload_len = 0.into();
129 Header {
130 magic: *Self::MAGIC,
131 packet_type,
132 device_cid,
133 host_cid,
134 device_port,
135 host_port,
136 payload_len,
137 }
138 }
139
140 pub fn packet_size(&self) -> usize {
143 Packet::size_with_payload(self.payload_len.get() as usize)
144 }
145
146 pub fn set_address(&mut self, addr: &Address) {
148 self.device_cid.set(addr.device_cid);
149 self.host_cid.set(addr.host_cid);
150 self.device_port.set(addr.device_port);
151 self.host_port.set(addr.host_port);
152 }
153}
154
/// A borrowed, read-only view of a single usb vsock packet: a header and the
/// payload bytes that belong to it.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct Packet<'a> {
    /// The packet's header.
    pub header: &'a Header,
    /// The packet's payload. `parse_next` guarantees this is exactly
    /// `header.payload_len` bytes long; packets constructed by hand are not
    /// checked.
    pub payload: &'a [u8],
}
163
164impl<'a> Packet<'a> {
165 pub fn size(&self) -> usize {
168 self.header.packet_size()
169 }
170
171 fn size_with_payload(payload_size: usize) -> usize {
172 size_of::<Header>() + payload_size
173 }
174
175 fn parse_next(buf: &'a [u8]) -> Result<(Self, &'a [u8]), std::io::Error> {
176 let Some((header, body)) = buf.split_at_checked(size_of::<Header>()) else {
178 return Err(std::io::Error::other("insufficient data for last packet"));
179 };
180 let header = Header::try_ref_from_bytes(header).map_err(|err| {
181 std::io::Error::other(format!("failed to parse usb vsock header: {err:?}"))
182 })?;
183 if header.magic != *Header::MAGIC {
184 return Err(std::io::Error::other(format!("invalid magic bytes on usb vsock header")));
185 }
186 let payload_len = Into::<u64>::into(header.payload_len) as usize;
188 let body_len = body.len();
189 if payload_len > body_len {
190 return Err(std::io::Error::other(format!("payload length on usb vsock header ({payload_len}) was larger than available in buffer {body_len}")));
191 }
192
193 let (payload, remain) = body.split_at(payload_len);
194 Ok((Packet { header, payload }, remain))
195 }
196
197 pub fn write_to_unchecked(&'a self, buf: &'a mut [u8]) -> &'a mut [u8] {
206 let (packet, remain) = buf.split_at_mut(self.size());
207 self.header.write_to_prefix(packet).unwrap();
208 self.payload.write_to_suffix(packet).unwrap();
209 remain
210 }
211}
212
/// A mutable view of a packet laid out inside a caller-provided buffer, used
/// to build a packet in place (see `PacketMut::new_in` and
/// `PacketMut::finish`).
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct PacketMut<'a> {
    /// The header at the start of the buffer.
    pub header: &'a mut Header,
    /// All of the buffer that follows the header; the caller fills in as much
    /// of it as needed and reports the amount used to `finish`.
    pub payload: &'a mut [u8],
}
221
222impl<'a> PacketMut<'a> {
223 pub fn new_in(packet_type: PacketType, buf: &'a mut [u8]) -> Self {
235 Header::new(packet_type)
236 .write_to_prefix(buf)
237 .expect("not enough room in buffer for packet header");
238 let (header_bytes, payload) = buf.split_at_mut(Header::SIZE);
239 let header = Header::try_mut_from_bytes(header_bytes).unwrap();
240 PacketMut { header, payload }
241 }
242
243 pub fn finish(self, payload_len: usize) -> Result<usize, PacketTooBigError> {
246 if payload_len <= self.payload.len() {
247 self.header.payload_len.set(u32::try_from(payload_len).map_err(|_| PacketTooBigError)?);
248 Ok(Header::SIZE + payload_len)
249 } else {
250 Err(PacketTooBigError)
251 }
252 }
253}
254
/// Iterates over the vsock packets stored back to back in a byte buffer.
pub struct VsockPacketIterator<'a> {
    /// The bytes that have not been parsed yet, or `None` once iteration has
    /// ended (either the buffer was exhausted or a parse error was returned).
    buf: Option<&'a [u8]>,
}
259
260impl<'a> VsockPacketIterator<'a> {
261 pub fn new(buf: &'a [u8]) -> Self {
264 Self { buf: Some(buf) }
265 }
266}
267
268impl<'a> FusedIterator for VsockPacketIterator<'a> {}
269impl<'a> Iterator for VsockPacketIterator<'a> {
270 type Item = Result<Packet<'a>, std::io::Error>;
271
272 fn next(&mut self) -> Option<Self::Item> {
273 let data = self.buf.take()?;
275
276 if data.len() == 0 {
278 return None;
279 }
280
281 match Packet::parse_next(data) {
282 Ok((header, rest)) => {
283 self.buf = Some(rest);
285 Some(Ok(header))
286 }
287 Err(err) => Some(Err(err)),
288 }
289 }
290}
291
/// Accumulates vsock packets back to back in a buffer of type `B` so that
/// they can be handed off together as a single usb packet.
pub struct UsbPacketBuilder<B> {
    /// The backing storage packets are written into.
    buffer: B,
    /// Number of bytes at the start of `buffer` that currently hold packets.
    offset: usize,
    /// Woken by `take_usb_packet` when the buffer is emptied. No code in this
    /// file registers a waker on it.
    space_waker: AtomicWaker,
    /// Woken by `write_vsock_packet` when a packet is added. No code in this
    /// file registers a waker on it.
    packet_waker: AtomicWaker,
}
300
/// Error returned when a packet (or a requested payload length) does not fit
/// in the space available for it.
#[derive(Debug, Copy, Clone)]
pub struct PacketTooBigError;
304
305impl<B> UsbPacketBuilder<B> {
306 pub fn new(buffer: B) -> Self {
310 let offset = 0;
311 let space_waker = AtomicWaker::default();
312 let packet_waker = AtomicWaker::default();
313 Self { buffer, offset, space_waker, packet_waker }
314 }
315
316 pub fn has_data(&self) -> bool {
318 self.offset > 0
319 }
320}
321
322impl<B> UsbPacketBuilder<B>
323where
324 B: std::ops::DerefMut<Target = [u8]>,
325{
326 pub fn available(&self) -> usize {
328 self.buffer.len() - self.offset
329 }
330
331 pub fn write_vsock_packet(&mut self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
334 let packet_size = packet.size();
335 if self.available() >= packet_size {
336 packet.write_to_unchecked(&mut self.buffer[self.offset..]);
337 self.offset += packet_size;
338 self.packet_waker.wake();
339 Ok(())
340 } else {
341 Err(PacketTooBigError)
342 }
343 }
344
345 pub fn take_usb_packet(&mut self) -> Option<&mut [u8]> {
349 let written = self.offset;
350 if written == 0 {
351 return None;
352 }
353 self.offset = 0;
354 self.space_waker.wake();
355 Some(&mut self.buffer[0..written])
356 }
357}
358
/// Coordinates between the side that supplies empty usb packet buffers (via
/// `fill_usb_packet`) and the writers that put vsock packets into them (via
/// the `write_vsock_*` methods).
pub(crate) struct UsbPacketFiller<B> {
    /// The builder currently available for writers to fill, if any.
    current_out_packet: Mutex<Option<UsbPacketBuilder<B>>>,
    /// Woken when a new builder is installed; writers waiting for space
    /// register on it.
    out_packet_waker: AtomicWaker,
    /// Woken when a writer has added data; the future returned by
    /// `fill_usb_packet` registers on it.
    filled_packet_waker: AtomicWaker,
}
364
365impl<B> Default for UsbPacketFiller<B> {
366 fn default() -> Self {
367 let current_out_packet = Mutex::default();
368 let out_packet_waker = AtomicWaker::default();
369 let filled_packet_waker = AtomicWaker::default();
370 Self { current_out_packet, out_packet_waker, filled_packet_waker }
371 }
372}
373
374impl<B: DerefMut<Target = [u8]> + Unpin> UsbPacketFiller<B> {
375 fn wait_for_fillable(&self, min_packet_size: usize) -> WaitForFillable<'_, B> {
376 WaitForFillable { filler: &self, min_packet_size }
377 }
378
379 pub async fn write_vsock_packet(&self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
380 let mut builder = self.wait_for_fillable(packet.size()).await;
381 builder.as_mut().unwrap().write_vsock_packet(packet)?;
382 self.filled_packet_waker.wake();
383 Ok(())
384 }
385
386 pub async fn write_vsock_data(&self, address: &Address, payload: &[u8]) -> usize {
387 let header = &mut Header::new(PacketType::Data);
388 header.set_address(&address);
389 let mut builder = self.wait_for_fillable(1).await;
390 let builder = builder.as_mut().unwrap();
391 let writing = min(payload.len(), builder.available() - Header::SIZE);
392 header.payload_len.set(writing as u32);
393 builder.write_vsock_packet(&Packet { header, payload: &payload[..writing] }).unwrap();
394 self.filled_packet_waker.wake();
395 writing
396 }
397
398 pub async fn write_vsock_data_all(&self, address: &Address, payload: &[u8]) {
399 let mut written = 0;
400 while written < payload.len() {
401 written += self.write_vsock_data(address, &payload[written..]).await;
402 }
403 }
404
405 pub fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> FillUsbPacket<'_, B> {
412 FillUsbPacket(&self, Some(builder))
413 }
414}
415
/// Future returned by `UsbPacketFiller::fill_usb_packet`.
///
/// Field 0 is the filler the builder is offered to. Field 1 holds the builder
/// until the first poll, after which it is `None` because the builder has
/// either been returned or moved into the filler.
pub(crate) struct FillUsbPacket<'a, B>(&'a UsbPacketFiller<B>, Option<UsbPacketBuilder<B>>);
417
418impl<'a, B: Unpin> Future for FillUsbPacket<'a, B> {
419 type Output = UsbPacketBuilder<B>;
420
421 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
422 if let Some(builder) = self.1.take() {
425 if builder.has_data() {
427 return Poll::Ready(builder);
428 }
429
430 let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
431 assert!(current_out_packet.is_none(), "Can't fill more than one packet at a time");
432 current_out_packet.replace(builder);
433 self.0.out_packet_waker.wake();
434 self.0.filled_packet_waker.register(cx.waker());
435 Poll::Pending
436 } else {
437 let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
438 let Some(builder) = current_out_packet.take() else {
439 panic!("Packet builder was somehow removed from connection prematurely");
440 };
441
442 if builder.has_data() {
443 self.0.filled_packet_waker.wake();
444 Poll::Ready(builder)
445 } else {
446 current_out_packet.replace(builder);
449 Poll::Pending
450 }
451 }
452 }
453}
454
/// Future returned by `UsbPacketFiller::wait_for_fillable`; resolves to the
/// locked current builder once it has enough free space.
pub(crate) struct WaitForFillable<'a, B> {
    /// The filler whose current builder is being waited on.
    filler: &'a UsbPacketFiller<B>,
    /// The number of free bytes the builder must have before this resolves.
    min_packet_size: usize,
}
459
460impl<'a, B: DerefMut<Target = [u8]> + Unpin> Future for WaitForFillable<'a, B> {
461 type Output = MutexGuard<'a, Option<UsbPacketBuilder<B>>>;
462
463 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
464 let current_out_packet = self.filler.current_out_packet.lock().unwrap();
465 let Some(builder) = &*current_out_packet else {
466 self.filler.out_packet_waker.register(cx.waker());
467 return Poll::Pending;
468 };
469 if builder.available() >= self.min_packet_size {
470 Poll::Ready(current_out_packet)
471 } else {
472 self.filler.out_packet_waker.register(cx.waker());
473 Poll::Pending
474 }
475 }
476}
477
/// Unit tests for packet encoding/parsing and for the interaction between
/// `UsbPacketFiller`, `FillUsbPacket` and `WaitForFillable`.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use fuchsia_async::Task;
    use futures::poll;

    /// Polls `fut` exactly once and panics if it is already ready.
    async fn assert_pending<F: Future>(fut: F) {
        let fut = std::pin::pin!(fut);
        if let Poll::Ready(_) = poll!(fut) {
            panic!("Future was ready when it shouldn't have been");
        }
    }

    /// Writes a single packet through the filler into a buffer that is exactly
    /// big enough for it, then parses it back out and checks it is unchanged.
    #[fuchsia::test]
    async fn roundtrip_packet() {
        let payload = b"hello world!";
        let packet = Packet {
            payload,
            header: &Header {
                device_cid: 1.into(),
                host_cid: 2.into(),
                device_port: 3.into(),
                host_port: 4.into(),
                payload_len: little_endian::U32::from(payload.len() as u32),
                ..Header::new(PacketType::Data)
            },
        };
        // The buffer has room for exactly one copy of the packet.
        let buffer = vec![0; packet.size()];
        let builder = UsbPacketBuilder::new(buffer);
        let filler = UsbPacketFiller::default();
        let mut filled_fut = filler.fill_usb_packet(builder);
        println!("we should not be ready to pull a usb packet off yet");
        // This first poll is also what installs the builder in the filler.
        assert_pending(&mut filled_fut).await;

        println!("we should be able to write a packet though ({} bytes)", packet.size());
        filler.write_vsock_packet(&packet).await.unwrap();

        println!("we shouldn't have any space for another packet now");
        assert_pending(filler.wait_for_fillable(1)).await;

        println!("but we should have a new usb packet available");
        let mut builder = filled_fut.await;
        let buffer = builder.take_usb_packet().unwrap();

        println!("the packet we get back out should be the same one we put in");
        let (read_packet, remain) = Packet::parse_next(buffer).unwrap();
        assert_eq!(packet, read_packet);
        assert!(remain.is_empty());
    }

    /// Streams 1024 numbered packets from a spawned writer task through a
    /// small buffer and checks that the reader sees all of them, in order.
    #[fuchsia::test]
    async fn many_packets() {
        /// Builds the header and payload for packet number `num`; every
        /// address field is set to `num` so each packet is distinguishable.
        fn make_numbered_packet(num: u32) -> (Header, String) {
            let payload = format!("packet #{num}!");
            let header = Header {
                device_cid: num.into(),
                device_port: num.into(),
                host_cid: num.into(),
                host_port: num.into(),
                payload_len: little_endian::U32::from(payload.len() as u32),
                ..Header::new(PacketType::Data)
            };
            (header, payload)
        }
        const BUFFER_SIZE: usize = 256;
        let mut builder = UsbPacketBuilder::new(vec![0; BUFFER_SIZE]);
        let filler = Arc::new(UsbPacketFiller::default());

        // Writer side: pushes packets as fast as buffer space allows.
        let send_filler = filler.clone();
        let send_task = Task::spawn(async move {
            for packet_num in 0..1024 {
                let next_packet = make_numbered_packet(packet_num);
                let next_packet =
                    Packet { header: &next_packet.0, payload: next_packet.1.as_ref() };
                send_filler.write_vsock_packet(&next_packet).await.unwrap();
            }
        });

        // Reader side: repeatedly offers the same builder for filling, then
        // drains it and checks every packet it contains.
        let mut read_packet_num = 0;
        while read_packet_num < 1024 {
            builder = filler.fill_usb_packet(builder).await;
            let buffer = builder.take_usb_packet().unwrap();
            let mut num_packets = 0;
            for packet in VsockPacketIterator::new(&buffer) {
                let packet_compare = make_numbered_packet(read_packet_num);
                let packet_compare =
                    Packet { header: &packet_compare.0, payload: &packet_compare.1.as_ref() };
                assert_eq!(packet.unwrap(), packet_compare);
                read_packet_num += 1;
                num_packets += 1;
            }
            println!(
                "Read {num_packets} vsock packets from usb packet buffer, had {count} bytes left",
                count = BUFFER_SIZE - buffer.len()
            );
        }
        send_task.await;
        assert_eq!(1024, read_packet_num);
    }

    /// Drives `WaitForFillable` and `FillUsbPacket` by hand through ten
    /// register/fill/drain cycles on the same filler.
    #[fuchsia::test]
    async fn packet_fillable_futures() {
        let filler = UsbPacketFiller::default();

        for _ in 0..10 {
            println!("register an interest in filling a usb packet");
            let mut fillable_fut = filler.wait_for_fillable(1);
            println!("make sure we have nothing to fill");
            assert!(poll!(&mut fillable_fut).is_pending());

            println!("register a packet for filling");
            let mut filled_fut = filler.fill_usb_packet(UsbPacketBuilder::new(vec![0; 1024]));
            println!("make sure we've registered the buffer");
            assert!(poll!(&mut filled_fut).is_pending());

            println!("now put some things in the packet");
            let header = &mut Header::new(PacketType::Data);
            header.payload_len.set(99);
            let Poll::Ready(mut builder) = poll!(fillable_fut) else {
                panic!("should have been ready to fill a packet")
            };
            builder
                .as_mut()
                .unwrap()
                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
                .unwrap();
            // Release the lock guard so the builder can be locked again below.
            drop(builder);
            let Poll::Ready(mut builder) = poll!(filler.wait_for_fillable(1)) else {
                panic!("should have been ready to fill a packet(2)")
            };
            builder
                .as_mut()
                .unwrap()
                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
                .unwrap();
            drop(builder);

            println!("but if we ask for too much space we'll get pending");
            assert!(poll!(filler.wait_for_fillable(1024 - (99 * 2) + 1)).is_pending());

            println!("and now resolve the filled future and get our data back");
            let mut filled = filled_fut.await;
            let packets =
                Vec::from_iter(VsockPacketIterator::new(filled.take_usb_packet().unwrap()));
            assert_eq!(packets.len(), 2);
        }
    }
}