1use fuchsia_sync::{Mutex, MutexGuard};
6use std::cmp::min;
7use std::future::Future;
8use std::iter::FusedIterator;
9use std::ops::DerefMut;
10use std::pin::Pin;
11use std::sync::atomic;
12use std::task::{Context, Poll, Waker};
13
14use futures::task::AtomicWaker;
15use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned, little_endian};
16
17use crate::Address;
18
19#[repr(u8)]
22#[derive(
23 Debug,
24 TryFromBytes,
25 IntoBytes,
26 KnownLayout,
27 Immutable,
28 Unaligned,
29 PartialEq,
30 Eq,
31 PartialOrd,
32 Ord,
33 Hash,
34 Clone,
35 Copy,
36)]
37pub enum PacketType {
38 Sync = b'S',
44 Echo = b'E',
47 EchoReply = b'e',
50 Connect = b'C',
52 Finish = b'F',
56 Reset = b'R',
61 Accept = b'A',
64 Data = b'D',
69 Pause = b'X',
74}
75
76#[repr(C, packed(1))]
79#[derive(
80 Debug,
81 TryFromBytes,
82 IntoBytes,
83 KnownLayout,
84 Immutable,
85 Unaligned,
86 PartialEq,
87 Eq,
88 PartialOrd,
89 Ord,
90 Hash,
91 Clone,
92)]
93pub struct Header {
94 magic: [u8; 3],
95 pub packet_type: PacketType,
97 pub device_cid: little_endian::U32,
102 pub host_cid: little_endian::U32,
107 pub device_port: little_endian::U32,
112 pub host_port: little_endian::U32,
117 pub payload_len: little_endian::U32,
120}
121
122impl Header {
123 pub const SIZE: usize = size_of::<Self>();
125 const MAGIC: &'static [u8; 3] = b"ffx";
126
127 pub fn new(packet_type: PacketType) -> Self {
130 let device_cid = 0.into();
131 let host_cid = 0.into();
132 let device_port = 0.into();
133 let host_port = 0.into();
134 let payload_len = 0.into();
135 Header {
136 magic: *Self::MAGIC,
137 packet_type,
138 device_cid,
139 host_cid,
140 device_port,
141 host_port,
142 payload_len,
143 }
144 }
145
146 pub fn packet_size(&self) -> usize {
149 Packet::size_with_payload(self.payload_len.get() as usize)
150 }
151
152 pub fn set_address(&mut self, addr: &Address) {
154 self.device_cid.set(addr.device_cid);
155 self.host_cid.set(addr.host_cid);
156 self.device_port.set(addr.device_port);
157 self.host_port.set(addr.host_port);
158 }
159}
160
161#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
163pub struct Packet<'a> {
164 pub header: &'a Header,
166 pub payload: &'a [u8],
168}
169
170impl<'a> Packet<'a> {
171 pub fn size(&self) -> usize {
174 self.header.packet_size()
175 }
176
177 fn size_with_payload(payload_size: usize) -> usize {
178 size_of::<Header>() + payload_size
179 }
180
181 fn parse_next(buf: &'a [u8]) -> Result<(Self, &'a [u8]), std::io::Error> {
182 let Some((header, body)) = buf.split_at_checked(size_of::<Header>()) else {
184 return Err(std::io::Error::other("insufficient data for last packet"));
185 };
186 let header = Header::try_ref_from_bytes(header).map_err(|err| {
187 std::io::Error::other(format!("failed to parse usb vsock header: {err:?}"))
188 })?;
189 if header.magic != *Header::MAGIC {
190 return Err(std::io::Error::other(format!("invalid magic bytes on usb vsock header")));
191 }
192 let payload_len = Into::<u64>::into(header.payload_len) as usize;
194 let body_len = body.len();
195 if payload_len > body_len {
196 return Err(std::io::Error::other(format!(
197 "payload length ({payload_len}) on usb vsock header {header:#?} was larger than available in buffer ({body_len})"
198 )));
199 }
200
201 let (payload, remain) = body.split_at(payload_len);
202 Ok((Packet { header, payload }, remain))
203 }
204
205 pub fn write_to_unchecked(&'a self, buf: &'a mut [u8]) -> &'a mut [u8] {
214 let (packet, remain) = buf.split_at_mut(self.size());
215 let payload_len = u32::from(self.header.payload_len) as usize;
216 self.header.write_to_prefix(packet).unwrap();
217 self.payload[..payload_len].write_to_suffix(packet).unwrap();
218 remain
219 }
220}
221
222#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
224pub struct PacketMut<'a> {
225 pub header: &'a mut Header,
227 pub payload: &'a mut [u8],
229}
230
231impl<'a> PacketMut<'a> {
232 pub fn new_in(packet_type: PacketType, buf: &'a mut [u8]) -> Self {
244 Header::new(packet_type)
245 .write_to_prefix(buf)
246 .expect("not enough room in buffer for packet header");
247 let (header_bytes, payload) = buf.split_at_mut(Header::SIZE);
248 let header = Header::try_mut_from_bytes(header_bytes).unwrap();
249 PacketMut { header, payload }
250 }
251
252 pub fn finish(self, payload_len: usize) -> Result<usize, PacketTooBigError> {
255 if payload_len <= self.payload.len() {
256 self.header.payload_len.set(u32::try_from(payload_len).map_err(|_| PacketTooBigError)?);
257 Ok(Header::SIZE + payload_len)
258 } else {
259 Err(PacketTooBigError)
260 }
261 }
262}
263
264pub struct VsockPacketIterator<'a> {
266 buf: Option<&'a [u8]>,
267}
268
269impl<'a> VsockPacketIterator<'a> {
270 pub fn new(buf: &'a [u8]) -> Self {
273 Self { buf: Some(buf) }
274 }
275}
276
277impl<'a> FusedIterator for VsockPacketIterator<'a> {}
278impl<'a> Iterator for VsockPacketIterator<'a> {
279 type Item = Result<Packet<'a>, std::io::Error>;
280
281 fn next(&mut self) -> Option<Self::Item> {
282 let data = self.buf.take()?;
284
285 if data.len() == 0 {
287 return None;
288 }
289
290 match Packet::parse_next(data) {
291 Ok((header, rest)) => {
292 self.buf = Some(rest);
294 Some(Ok(header))
295 }
296 Err(err) => Some(Err(err)),
297 }
298 }
299}
300
301pub struct UsbPacketBuilder<B> {
304 buffer: B,
305 offset: usize,
306 space_waker: AtomicWaker,
307 packet_waker: AtomicWaker,
308}
309
310#[derive(Debug, Copy, Clone)]
312pub struct PacketTooBigError;
313
314impl<B> UsbPacketBuilder<B> {
315 pub fn new(buffer: B) -> Self {
319 let offset = 0;
320 let space_waker = AtomicWaker::default();
321 let packet_waker = AtomicWaker::default();
322 Self { buffer, offset, space_waker, packet_waker }
323 }
324
325 pub fn has_data(&self) -> bool {
327 self.offset > 0
328 }
329}
330
331impl<B> UsbPacketBuilder<B>
332where
333 B: std::ops::DerefMut<Target = [u8]>,
334{
335 pub fn available(&self) -> usize {
337 self.buffer.len() - self.offset
338 }
339
340 pub fn write_vsock_packet(&mut self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
344 let packet_size = packet.size();
345 if self.available() >= packet_size {
346 packet.write_to_unchecked(&mut self.buffer[self.offset..]);
347 self.offset += packet_size;
348 self.packet_waker.wake();
349 Ok(())
350 } else {
351 Err(PacketTooBigError)
352 }
353 }
354
355 pub fn take_usb_packet(&mut self) -> Option<&mut [u8]> {
359 let written = self.offset;
360 if written == 0 {
361 return None;
362 }
363 self.offset = 0;
364 self.space_waker.wake();
365 Some(&mut self.buffer[0..written])
366 }
367}
368
369#[derive(Debug, Copy, Clone)]
371pub struct ShutdownError;
372
373impl From<ShutdownError> for std::io::Error {
374 fn from(_: ShutdownError) -> Self {
375 std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Transport shut down")
376 }
377}
378
379#[derive(Debug, Copy, Clone)]
381pub(crate) enum WritePacketError {
382 PacketTooBig(PacketTooBigError),
384 Shutdown(ShutdownError),
386}
387
388impl From<PacketTooBigError> for WritePacketError {
389 fn from(value: PacketTooBigError) -> Self {
390 WritePacketError::PacketTooBig(value)
391 }
392}
393
394impl From<ShutdownError> for WritePacketError {
395 fn from(value: ShutdownError) -> Self {
396 WritePacketError::Shutdown(value)
397 }
398}
399
400pub(crate) trait WritePacketErrorExt {
403 type Residue;
405
406 fn assert_right_size(self) -> Self::Residue;
409
410 fn expect_right_size(self, err: &str) -> Self::Residue;
413}
414
415impl WritePacketErrorExt for WritePacketError {
416 type Residue = ShutdownError;
417
418 fn assert_right_size(self) -> ShutdownError {
419 self.expect_right_size("Unexpected PacketTooBigError in WritePacketError")
420 }
421
422 fn expect_right_size(self, err: &str) -> ShutdownError {
423 return match self {
424 WritePacketError::PacketTooBig(_) => panic!("{err}"),
425 WritePacketError::Shutdown(shutdown_error) => shutdown_error,
426 };
427 }
428}
429
430impl<T> WritePacketErrorExt for Result<T, WritePacketError> {
431 type Residue = Result<T, ShutdownError>;
432
433 fn assert_right_size(self) -> Result<T, ShutdownError> {
434 match self {
435 Ok(x) => Ok(Ok(x)),
436 Err(WritePacketError::Shutdown(s)) => Ok(Err(s)),
437 Err(WritePacketError::PacketTooBig(p)) => Err(p),
438 }
439 .unwrap()
440 }
441
442 fn expect_right_size(self, err: &str) -> Result<T, ShutdownError> {
443 match self {
444 Ok(x) => Ok(Ok(x)),
445 Err(WritePacketError::Shutdown(s)) => Ok(Err(s)),
446 Err(WritePacketError::PacketTooBig(p)) => Err(p),
447 }
448 .expect(err)
449 }
450}
451
452pub(crate) struct UsbPacketFiller<B> {
453 current_out_packet: Mutex<Option<UsbPacketBuilder<B>>>,
454 out_packet_wakers: Mutex<Vec<Waker>>,
455 filled_packet_waker: AtomicWaker,
456 is_shutdown: atomic::AtomicBool,
457}
458
459impl<B> Default for UsbPacketFiller<B> {
460 fn default() -> Self {
461 let current_out_packet = Mutex::default();
462 let out_packet_wakers = Mutex::default();
463 let filled_packet_waker = AtomicWaker::default();
464 let is_shutdown = atomic::AtomicBool::new(false);
465 Self { current_out_packet, out_packet_wakers, filled_packet_waker, is_shutdown }
466 }
467}
468
469impl<B> UsbPacketFiller<B> {
470 fn is_shutdown(&self) -> bool {
471 self.is_shutdown.load(atomic::Ordering::Acquire)
472 }
473}
474
475impl<B: DerefMut<Target = [u8]> + Unpin> UsbPacketFiller<B> {
476 fn wait_for_fillable(&self, min_packet_size: usize) -> WaitForFillable<'_, B> {
477 WaitForFillable { filler: &self, min_packet_size }
478 }
479
480 pub fn shutdown(&self) {
481 let mut out_packets = self.current_out_packet.lock();
482 let mut out_packet_wakers = self.out_packet_wakers.lock();
483 self.is_shutdown.store(true, atomic::Ordering::Release);
484 *out_packets = None;
485 out_packet_wakers.drain(..).for_each(Waker::wake);
486 self.filled_packet_waker.wake();
487 }
488
489 pub async fn write_vsock_packet(&self, packet: &Packet<'_>) -> Result<(), WritePacketError> {
490 let mut builder = self.wait_for_fillable(packet.size()).await?;
491 builder.as_mut().unwrap().write_vsock_packet(packet)?;
492 self.filled_packet_waker.wake();
493 Ok(())
494 }
495
496 pub async fn write_vsock_data(
497 &self,
498 address: &Address,
499 payload: &[u8],
500 ) -> Result<usize, ShutdownError> {
501 let header = &mut Header::new(PacketType::Data);
502 header.set_address(&address);
503 let mut builder = self.wait_for_fillable(Header::SIZE + 1).await?;
504 let builder = builder.as_mut().unwrap();
505 let writing = min(payload.len(), builder.available() - Header::SIZE);
506 header.payload_len.set(writing as u32);
507 builder.write_vsock_packet(&Packet { header, payload: &payload[..writing] }).unwrap();
508 self.filled_packet_waker.wake();
509 Ok(writing)
510 }
511
512 pub async fn write_vsock_data_all(
513 &self,
514 address: &Address,
515 payload: &[u8],
516 ) -> Result<(), ShutdownError> {
517 let mut written = 0;
518 while written < payload.len() {
519 written += self.write_vsock_data(address, &payload[written..]).await?;
520 }
521 Ok(())
522 }
523
524 pub fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> FillUsbPacket<'_, B> {
531 FillUsbPacket(&self, Some(builder))
532 }
533}
534
535pub(crate) struct FillUsbPacket<'a, B>(&'a UsbPacketFiller<B>, Option<UsbPacketBuilder<B>>);
536
537impl<'a, B: Unpin> Future for FillUsbPacket<'a, B> {
538 type Output = Result<UsbPacketBuilder<B>, ShutdownError>;
539
540 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
541 if let Some(builder) = self.1.take() {
544 if builder.has_data() {
546 return Poll::Ready(Ok(builder));
547 }
548
549 let mut current_out_packet = self.0.current_out_packet.lock();
550 if self.0.is_shutdown() {
551 return Poll::Ready(Err(ShutdownError));
552 }
553 assert!(current_out_packet.is_none(), "Can't fill more than one packet at a time");
554 current_out_packet.replace(builder);
555
556 self.0.filled_packet_waker.register(cx.waker());
558 drop(current_out_packet);
560
561 let mut wakers = self.0.out_packet_wakers.lock();
564 for waker in wakers.drain(..) {
565 waker.wake();
566 }
567 if self.0.is_shutdown() { Poll::Ready(Err(ShutdownError)) } else { Poll::Pending }
568 } else {
569 let mut current_out_packet = self.0.current_out_packet.lock();
570 if self.0.is_shutdown() {
571 return Poll::Ready(Err(ShutdownError));
572 }
573 let Some(builder) = current_out_packet.take() else {
574 panic!("Packet builder was somehow removed from connection prematurely");
575 };
576
577 if builder.has_data() {
578 self.0.filled_packet_waker.wake();
579 Poll::Ready(Ok(builder))
580 } else {
581 current_out_packet.replace(builder);
584 Poll::Pending
585 }
586 }
587 }
588}
589
590pub(crate) struct WaitForFillable<'a, B> {
591 filler: &'a UsbPacketFiller<B>,
592 min_packet_size: usize,
593}
594
595impl<'a, B: DerefMut<Target = [u8]> + Unpin> Future for WaitForFillable<'a, B> {
596 type Output = Result<MutexGuard<'a, Option<UsbPacketBuilder<B>>>, ShutdownError>;
597
598 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
599 let current_out_packet = self.filler.current_out_packet.lock();
600 if self.filler.is_shutdown() {
601 return Poll::Ready(Err(ShutdownError));
602 }
603 let Some(builder) = &*current_out_packet else {
604 self.filler.out_packet_wakers.lock().push(cx.waker().clone());
605 return Poll::Pending;
606 };
607 if builder.available() >= self.min_packet_size {
608 Poll::Ready(Ok(current_out_packet))
609 } else {
610 self.filler.out_packet_wakers.lock().push(cx.waker().clone());
611 Poll::Pending
612 }
613 }
614}
615
616#[cfg(test)]
617mod tests {
618 use std::sync::Arc;
619
620 use super::*;
621 use fuchsia_async::Task;
622 use futures::poll;
623
624 async fn assert_pending<F: Future>(fut: F) {
625 let fut = std::pin::pin!(fut);
626 if let Poll::Ready(_) = poll!(fut) {
627 panic!("Future was ready when it shouldn't have been");
628 }
629 }
630 #[test]
631 fn packet_write_doesnt_clobber_header_with_incorrect_len() {
632 let header = &mut Header::new(PacketType::Pause);
633 let payload = &[1];
634 let packet = Packet { header, payload };
635 let mut buf = vec![0; packet.size()];
636 packet.write_to_unchecked(&mut buf);
637
638 let read_packet = Packet::parse_next(&buf).unwrap().0;
639 assert_eq!(&read_packet.header, &packet.header);
640 }
641
642 #[fuchsia::test]
643 async fn roundtrip_packet() {
644 let payload = b"hello world!";
645 let packet = Packet {
646 payload,
647 header: &Header {
648 device_cid: 1.into(),
649 host_cid: 2.into(),
650 device_port: 3.into(),
651 host_port: 4.into(),
652 payload_len: little_endian::U32::from(payload.len() as u32),
653 ..Header::new(PacketType::Data)
654 },
655 };
656 let buffer = vec![0; packet.size()];
657 let builder = UsbPacketBuilder::new(buffer);
658 let filler = UsbPacketFiller::default();
659 let mut filled_fut = filler.fill_usb_packet(builder);
660 println!("we should not be ready to pull a usb packet off yet");
661 assert_pending(&mut filled_fut).await;
662
663 println!("we should be able to write a packet though ({} bytes)", packet.size());
664 filler.write_vsock_packet(&packet).await.unwrap();
665
666 println!("we shouldn't have any space for another packet now");
667 assert_pending(filler.wait_for_fillable(1)).await;
668
669 println!("but we should have a new usb packet available");
670 let mut builder = filled_fut.await.unwrap();
671 let buffer = builder.take_usb_packet().unwrap();
672
673 println!("the packet we get back out should be the same one we put in");
674 let (read_packet, remain) = Packet::parse_next(buffer).unwrap();
675 assert_eq!(packet, read_packet);
676 assert!(remain.is_empty());
677 }
678
679 #[fuchsia::test]
680 async fn many_packets() {
681 fn make_numbered_packet(num: u32) -> (Header, String) {
682 let payload = format!("packet #{num}!");
683 let header = Header {
684 device_cid: num.into(),
685 device_port: num.into(),
686 host_cid: num.into(),
687 host_port: num.into(),
688 payload_len: little_endian::U32::from(payload.len() as u32),
689 ..Header::new(PacketType::Data)
690 };
691 (header, payload)
692 }
693 const BUFFER_SIZE: usize = 256;
694 let mut builder = UsbPacketBuilder::new(vec![0; BUFFER_SIZE]);
695 let filler = Arc::new(UsbPacketFiller::default());
696
697 let send_filler = filler.clone();
698 let send_task = Task::spawn(async move {
699 for packet_num in 0..1024 {
700 let next_packet = make_numbered_packet(packet_num);
701 let next_packet =
702 Packet { header: &next_packet.0, payload: next_packet.1.as_ref() };
703 send_filler.write_vsock_packet(&next_packet).await.unwrap();
704 }
705 });
706
707 let mut read_packet_num = 0;
708 while read_packet_num < 1024 {
709 builder = filler.fill_usb_packet(builder).await.unwrap();
710 let buffer = builder.take_usb_packet().unwrap();
711 let mut num_packets = 0;
712 for packet in VsockPacketIterator::new(&buffer) {
713 let packet_compare = make_numbered_packet(read_packet_num);
714 let packet_compare =
715 Packet { header: &packet_compare.0, payload: &packet_compare.1.as_ref() };
716 assert_eq!(packet.unwrap(), packet_compare);
717 read_packet_num += 1;
718 num_packets += 1;
719 }
720 println!(
721 "Read {num_packets} vsock packets from usb packet buffer, had {count} bytes left",
722 count = BUFFER_SIZE - buffer.len()
723 );
724 }
725 send_task.await;
726 assert_eq!(1024, read_packet_num);
727 }
728
729 #[fuchsia::test]
730 async fn packet_fillable_futures() {
731 let filler = UsbPacketFiller::default();
732
733 for _ in 0..10 {
734 println!("register an interest in filling a usb packet");
735 let mut fillable_fut = filler.wait_for_fillable(1);
736 println!("make sure we have nothing to fill");
737 assert!(poll!(&mut fillable_fut).is_pending());
738
739 println!("register a packet for filling");
740 let mut filled_fut = filler.fill_usb_packet(UsbPacketBuilder::new(vec![0; 1024]));
741 println!("make sure we've registered the buffer");
742 assert!(poll!(&mut filled_fut).is_pending());
743
744 println!("now put some things in the packet");
745 let header = &mut Header::new(PacketType::Data);
746 header.payload_len.set(99);
747 let Poll::Ready(Ok(mut builder)) = poll!(fillable_fut) else {
748 panic!("should have been ready to fill a packet")
749 };
750 builder
751 .as_mut()
752 .unwrap()
753 .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
754 .unwrap();
755 drop(builder);
756 let Poll::Ready(Ok(mut builder)) = poll!(filler.wait_for_fillable(1)) else {
757 panic!("should have been ready to fill a packet(2)")
758 };
759 builder
760 .as_mut()
761 .unwrap()
762 .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
763 .unwrap();
764 drop(builder);
765
766 println!("but if we ask for too much space we'll get pending");
767 assert!(poll!(filler.wait_for_fillable(1024 - (99 * 2) + 1)).is_pending());
768
769 println!("and now resolve the filled future and get our data back");
770 let mut filled = filled_fut.await.unwrap();
771 let packets =
772 Vec::from_iter(VsockPacketIterator::new(filled.take_usb_packet().unwrap()));
773 assert_eq!(packets.len(), 2);
774 }
775 }
776}