usb_vsock/
connection.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::channel::{mpsc, oneshot};
6use futures::lock::Mutex;
7use log::{debug, trace, warn};
8use std::collections::hash_map::Entry;
9use std::collections::HashMap;
10use std::io::{Error, ErrorKind};
11use std::ops::DerefMut;
12use std::sync::Arc;
13
14use fuchsia_async::{Scope, Socket};
15use futures::io::{ReadHalf, WriteHalf};
16use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
17
18use crate::{Address, Header, Packet, PacketType, UsbPacketBuilder, UsbPacketFiller};
19
/// Marker trait for the buffer types a [`Connection`] can be parameterized over.
///
/// It is blanket-implemented for every type that mutably dereferences to a byte slice and is
/// [`Send`], [`Unpin`] and `'static`, so it never has to be implemented by hand.
pub trait PacketBuffer: DerefMut<Target = [u8]> + Send + Unpin + 'static {}
impl<T: DerefMut<Target = [u8]> + Send + Unpin + 'static> PacketBuffer for T {}
23
/// Manages the state of a vsock-over-usb connection and the sockets over which data is being
/// transmitted for them.
///
/// This implementation aims to be agnostic to both the underlying transport and the buffers used
/// to read and write from it. The buffer type must conform to [`PacketBuffer`], which is essentially
/// a type that holds a mutable slice of bytes and is [`Send`] and [`Unpin`].
///
/// The client of this library will:
/// - Use methods on this struct to initiate actions like connecting and accepting
///   connections to the other end.
/// - Provide buffers to be filled and sent to the other end with [`Connection::fill_usb_packet`].
/// - Pump usb packets received into it using [`Connection::handle_vsock_packet`].
pub struct Connection<B> {
    /// Write half of the control socket. Payloads of data packets whose address is all zeros
    /// are written here.
    control_socket_writer: Mutex<WriteHalf<Socket>>,
    /// Shared queue that every outgoing vsock packet is written to; it is also handed to the
    /// socket reader tasks and drained through [`Connection::fill_usb_packet`].
    packet_filler: Arc<UsbPacketFiller<B>>,
    /// Per-address connection state. This is a `std` mutex, so it is only locked for short,
    /// non-awaiting sections.
    connections: std::sync::Mutex<HashMap<Address, VsockConnection>>,
    /// Sender used to hand incoming connect requests to the client of this library.
    incoming_requests_tx: mpsc::Sender<ConnectionRequest>,
    /// Scope the control socket reader task is spawned on.
    // NOTE(review): presumably held so the task lives exactly as long as this struct - confirm
    // against `fuchsia_async::Scope` drop semantics.
    _task_scope: Scope,
}
43
44impl<B: PacketBuffer> Connection<B> {
45    /// Creates a new connection with:
46    /// - a `control_socket`, over which data addressed to and from cid 0, port 0 (a control channel
47    /// between host and device) can be read and written from.
48    /// - An `incoming_requests_tx` that is the sender half of a request queue for incoming
49    /// connection requests from the other side.
50    pub fn new(
51        control_socket: Socket,
52        incoming_requests_tx: mpsc::Sender<ConnectionRequest>,
53    ) -> Self {
54        let (control_socket_reader, control_socket_writer) = control_socket.split();
55        let control_socket_writer = Mutex::new(control_socket_writer);
56        let packet_filler = Arc::new(UsbPacketFiller::default());
57        let connections = Default::default();
58        let task_scope = Scope::new_with_name("vsock_usb");
59        task_scope.spawn(Self::run_socket(
60            control_socket_reader,
61            Address::default(),
62            packet_filler.clone(),
63        ));
64        Self {
65            control_socket_writer,
66            packet_filler,
67            connections,
68            incoming_requests_tx,
69            _task_scope: task_scope,
70        }
71    }
72
73    async fn send_close_packet(address: &Address, usb_packet_filler: &Arc<UsbPacketFiller<B>>) {
74        let header = &mut Header::new(PacketType::Finish);
75        header.set_address(address);
76        usb_packet_filler
77            .write_vsock_packet(&Packet { header, payload: &[] })
78            .await
79            .expect("Finish packet should never be too big");
80    }
81
82    async fn run_socket(
83        mut reader: ReadHalf<Socket>,
84        address: Address,
85        usb_packet_filler: Arc<UsbPacketFiller<B>>,
86    ) {
87        let mut buf = [0; 4096];
88        loop {
89            log::trace!("reading from control socket");
90            let read = match reader.read(&mut buf).await {
91                Ok(0) => {
92                    if !address.is_zeros() {
93                        Self::send_close_packet(&address, &usb_packet_filler).await;
94                    }
95                    return;
96                }
97                Ok(read) => read,
98                Err(err) => {
99                    if address.is_zeros() {
100                        log::error!("Error reading usb socket: {err:?}");
101                    } else {
102                        Self::send_close_packet(&address, &usb_packet_filler).await;
103                    }
104                    return;
105                }
106            };
107            log::trace!("writing {read} bytes to vsock packet");
108            usb_packet_filler.write_vsock_data_all(&address, &buf[..read]).await;
109            log::trace!("wrote {read} bytes to vsock packet");
110        }
111    }
112
113    fn set_connection(&self, address: Address, state: VsockConnectionState) -> Result<(), Error> {
114        let mut connections = self.connections.lock().unwrap();
115        if !connections.contains_key(&address) {
116            connections.insert(address.clone(), VsockConnection { _address: address, state });
117            Ok(())
118        } else {
119            Err(Error::other(format!("connection on address {address:?} already set")))
120        }
121    }
122
123    /// Starts a connection attempt to the other end of the USB connection, and provides a socket
124    /// to read and write from. The function will complete when the other end has accepted or
125    /// rejected the connection, and the returned [`ConnectionState`] handle can be used to wait
126    /// for the connection to be closed.
127    pub async fn connect(&self, addr: Address, socket: Socket) -> Result<ConnectionState, Error> {
128        let (read_socket, write_socket) = socket.split();
129        let write_socket = Arc::new(Mutex::new(write_socket));
130        let (connected_tx, connected_rx) = oneshot::channel();
131
132        self.set_connection(
133            addr.clone(),
134            VsockConnectionState::ConnectingOutgoing(write_socket, read_socket, connected_tx),
135        )?;
136
137        let header = &mut Header::new(PacketType::Connect);
138        header.set_address(&addr);
139        self.packet_filler.write_vsock_packet(&Packet { header, payload: &[] }).await.unwrap();
140        connected_rx.await.map_err(|_| Error::other("Accept was never received for {addr:?}"))?
141    }
142
143    /// Sends a request for the other end to close the connection.
144    pub async fn close(&self, address: &Address) {
145        Self::send_close_packet(address, &self.packet_filler).await
146    }
147
148    /// Resets the named connection without going through a close request.
149    pub async fn reset(&self, address: &Address) -> Result<(), Error> {
150        let mut notify = None;
151        if let Some(conn) = self.connections.lock().unwrap().remove(&address) {
152            if let VsockConnectionState::Connected { notify_closed, .. } = conn.state {
153                notify = Some(notify_closed);
154            }
155        } else {
156            return Err(Error::other(
157                "Client asked to reset connection {address:?} that did not exist",
158            ));
159        }
160
161        if let Some(mut notify) = notify {
162            notify.send(Err(ErrorKind::ConnectionReset.into())).await.ok();
163        }
164
165        let header = &mut Header::new(PacketType::Reset);
166        header.set_address(address);
167        self.packet_filler
168            .write_vsock_packet(&Packet { header, payload: &[] })
169            .await
170            .expect("Reset packet should never be too big");
171        Ok(())
172    }
173
174    /// Accepts a connection for which an outstanding connection request has been made, and
175    /// provides a socket to read and write data packets to and from. The returned [`ConnectionState`]
176    /// can be used to wait for the connection to be closed.
177    pub async fn accept(
178        &self,
179        request: ConnectionRequest,
180        socket: Socket,
181    ) -> Result<ConnectionState, Error> {
182        let address = request.address;
183        let notify_closed_rx;
184        if let Some(conn) = self.connections.lock().unwrap().get_mut(&address) {
185            let VsockConnectionState::ConnectingIncoming = &conn.state else {
186                return Err(Error::other(format!(
187                    "Attempted to accept connection that was not waiting at {address:?}"
188                )));
189            };
190
191            let (read_socket, write_socket) = socket.split();
192            let writer = Arc::new(Mutex::new(write_socket));
193            let notify_closed = mpsc::channel(2);
194            notify_closed_rx = notify_closed.1;
195            let notify_closed = notify_closed.0;
196
197            let reader_task = Scope::new_with_name("connection-reader");
198            reader_task.spawn(Self::run_socket(read_socket, address, self.packet_filler.clone()));
199
200            conn.state = VsockConnectionState::Connected {
201                writer,
202                _reader_scope: reader_task,
203                notify_closed,
204            };
205        } else {
206            return Err(Error::other(format!(
207                "Attempting to accept connection that did not exist at {address:?}"
208            )));
209        }
210        let header = &mut Header::new(PacketType::Accept);
211        header.set_address(&address);
212        self.packet_filler.write_vsock_packet(&Packet { header, payload: &[] }).await.unwrap();
213        Ok(ConnectionState(notify_closed_rx))
214    }
215
216    /// Rejects a pending connection request from the other side.
217    pub async fn reject(&self, request: ConnectionRequest) -> Result<(), Error> {
218        let address = request.address;
219        match self.connections.lock().unwrap().entry(address.clone()) {
220            Entry::Occupied(entry) => {
221                let VsockConnectionState::ConnectingIncoming = &entry.get().state else {
222                    return Err(Error::other(format!(
223                        "Attempted to reject connection that was not waiting at {address:?}"
224                    )));
225                };
226                entry.remove();
227            }
228            Entry::Vacant(_) => {
229                return Err(Error::other(format!(
230                    "Attempted to reject connection that was not waiting at {address:?}"
231                )));
232            }
233        }
234
235        let header = &mut Header::new(PacketType::Reset);
236        header.set_address(&address);
237        self.packet_filler
238            .write_vsock_packet(&Packet { header, payload: &[] })
239            .await
240            .expect("accept packet should never be too large for packet buffer");
241        Ok(())
242    }
243
244    async fn handle_data_packet(&self, address: Address, payload: &[u8]) -> Result<(), Error> {
245        // all zero data packets go to the control channel
246        if address.is_zeros() {
247            let written = self.control_socket_writer.lock().await.write(payload).await?;
248            assert_eq!(written, payload.len());
249            Ok(())
250        } else {
251            let payload_socket;
252            if let Some(conn) = self.connections.lock().unwrap().get_mut(&address) {
253                let VsockConnectionState::Connected { writer, .. } = &conn.state else {
254                    warn!(
255                        "Received data packet for connection in unexpected state for {address:?}"
256                    );
257                    return Ok(());
258                };
259                payload_socket = writer.clone();
260            } else {
261                warn!("Received data packet for connection that didn't exist at {address:?}");
262                return Ok(());
263            }
264            payload_socket.lock().await.write_all(payload).await.expect("BOOM do not submit");
265            Ok(())
266        }
267    }
268
269    async fn handle_accept_packet(&self, address: Address) -> Result<(), Error> {
270        if let Some(conn) = self.connections.lock().unwrap().get_mut(&address) {
271            let state = std::mem::replace(&mut conn.state, VsockConnectionState::Invalid);
272            let VsockConnectionState::ConnectingOutgoing(writer, read_socket, connected_tx) = state
273            else {
274                warn!("Received accept packet for connection in unexpected state for {address:?}");
275                return Ok(());
276            };
277            let (notify_closed, notify_closed_rx) = mpsc::channel(2);
278            if connected_tx.send(Ok(ConnectionState(notify_closed_rx))).is_err() {
279                warn!("Accept packet received for {address:?} but connect caller stopped waiting for it");
280            }
281
282            let reader_task = Scope::new_with_name("connection-reader");
283            reader_task.spawn(Self::run_socket(read_socket, address, self.packet_filler.clone()));
284            conn.state = VsockConnectionState::Connected {
285                writer,
286                _reader_scope: reader_task,
287                notify_closed,
288            };
289        } else {
290            warn!("Got accept packet for connection that was not being made at {address:?}");
291            return Ok(());
292        }
293        Ok(())
294    }
295
296    async fn handle_connect_packet(&self, address: Address) -> Result<(), Error> {
297        trace!("received connect packet for {address:?}");
298        match self.connections.lock().unwrap().entry(address.clone()) {
299            Entry::Vacant(entry) => {
300                debug!("valid connect request for {address:?}");
301                entry.insert(VsockConnection {
302                    _address: address,
303                    state: VsockConnectionState::ConnectingIncoming,
304                });
305            }
306            Entry::Occupied(_) => {
307                warn!("Received connect packet for already existing connection for address {address:?}. Ignoring");
308                return Ok(());
309            }
310        }
311
312        trace!("sending incoming connection request to client for {address:?}");
313        let connection_request = ConnectionRequest { address };
314        self.incoming_requests_tx
315            .clone()
316            .send(connection_request)
317            .await
318            .inspect(|_| trace!("sent incoming request for {address:?}"))
319            .map_err(|_| Error::other("Failed to send connection request"))
320    }
321
322    async fn handle_finish_packet(&self, address: Address) -> Result<(), Error> {
323        trace!("received finish packet for {address:?}");
324        let mut notify;
325        if let Some(conn) = self.connections.lock().unwrap().remove(&address) {
326            let VsockConnectionState::Connected { notify_closed, .. } = conn.state else {
327                warn!("Received finish (close) packet for {address:?} which was not in a connected state. Ignoring and dropping connection state.");
328                return Ok(());
329            };
330            notify = notify_closed;
331        } else {
332            warn!("Received finish (close) packet for connection that didn't exist on address {address:?}. Ignoring");
333            return Ok(());
334        }
335
336        notify.send(Ok(())).await.ok();
337
338        let header = &mut Header::new(PacketType::Reset);
339        header.set_address(&address);
340        self.packet_filler
341            .write_vsock_packet(&Packet { header, payload: &[] })
342            .await
343            .expect("accept packet should never be too large for packet buffer");
344        Ok(())
345    }
346
347    async fn handle_reset_packet(&self, address: Address) -> Result<(), Error> {
348        trace!("received reset packet for {address:?}");
349        let mut notify = None;
350        if let Some(conn) = self.connections.lock().unwrap().remove(&address) {
351            if let VsockConnectionState::Connected { notify_closed, .. } = conn.state {
352                notify = Some(notify_closed);
353            } else {
354                debug!("Received reset packet for connection that wasn't in a connecting or disconnected state on address {address:?}.");
355            }
356        } else {
357            warn!("Received reset packet for connection that didn't exist on address {address:?}. Ignoring");
358        }
359
360        if let Some(mut notify) = notify {
361            notify.send(Ok(())).await.ok();
362        }
363        Ok(())
364    }
365
366    /// Dispatches the given vsock packet type and handles its effect on any outstanding connections
367    /// or the overall state of the connection.
368    pub async fn handle_vsock_packet(&self, packet: Packet<'_>) -> Result<(), Error> {
369        trace!("received vsock packet {header:?}", header = packet.header);
370        let payload_len = packet.header.payload_len.get() as usize;
371        let payload = &packet.payload[..payload_len];
372        let address = Address::from(packet.header);
373        match packet.header.packet_type {
374            PacketType::Sync => Err(Error::other("Received sync packet mid-stream")),
375            PacketType::Data => self.handle_data_packet(address, payload).await,
376            PacketType::Accept => self.handle_accept_packet(address).await,
377            PacketType::Connect => self.handle_connect_packet(address).await,
378            PacketType::Finish => self.handle_finish_packet(address).await,
379            PacketType::Reset => self.handle_reset_packet(address).await,
380        }
381    }
382
383    /// Provides a packet builder for the state machine to write packets to. Returns a future that
384    /// will be fulfilled when there is data available to send on the packet.
385    ///
386    /// # Panics
387    ///
388    /// Panics if called while another [`Self::fill_usb_packet`] future is pending.
389    pub async fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> UsbPacketBuilder<B> {
390        self.packet_filler.fill_usb_packet(builder).await
391    }
392}
393
/// The lifecycle state recorded for one address in [`Connection`]'s connection table.
enum VsockConnectionState {
    /// [`Connection::connect`] has registered the connection and no accept packet has been
    /// processed for it yet. Holds the write half and read half of the caller's socket, and the
    /// sender used to complete the pending `connect` call.
    ConnectingOutgoing(
        Arc<Mutex<WriteHalf<Socket>>>,
        ReadHalf<Socket>,
        oneshot::Sender<Result<ConnectionState, Error>>,
    ),
    /// A connect packet was received from the other end and is waiting for
    /// [`Connection::accept`] or [`Connection::reject`].
    ConnectingIncoming,
    /// The connection is established and data can flow in both directions.
    Connected {
        /// Write half of the socket that received data payloads are written to.
        writer: Arc<Mutex<WriteHalf<Socket>>>,
        /// Sender used to deliver the close result to the [`ConnectionState`] handle.
        notify_closed: mpsc::Sender<Result<(), Error>>,
        /// Scope the task reading from the socket is spawned on.
        _reader_scope: Scope,
    },
    /// Placeholder written by `handle_accept_packet` while it takes ownership of the previous
    /// state.
    Invalid,
}
408
/// An entry in [`Connection`]'s connection table.
struct VsockConnection {
    /// The address the entry was inserted under; it is never read in the visible code, hence
    /// the leading underscore.
    _address: Address,
    /// Where the connection currently is in its lifecycle.
    state: VsockConnectionState,
}
413
414/// A handle for the state of a connection established with either [`Connection::connect`] or
415/// [`Connection::accept`]. Use this to get notified when the connection has been closed without
416/// needing to hold on to the Socket end.
417pub struct ConnectionState(mpsc::Receiver<Result<(), Error>>);
418
419impl ConnectionState {
420    /// Wait for this connection to close. Returns Ok(()) if the connection was closed without error,
421    /// and an error if it closed because of an error.
422    pub async fn wait_for_close(mut self) -> Result<(), Error> {
423        self.0
424            .next()
425            .await
426            .ok_or_else(|| Error::other("Connection state's other end was dropped"))?
427    }
428}
429
430/// An outstanding connection request that needs to be either [`Connection::accept`]ed or
431/// [`Connection::reject`]ed.
432pub struct ConnectionRequest {
433    address: Address,
434}
435
436impl ConnectionRequest {
437    /// Creates a new connection request for the given address.
438    pub fn new(address: Address) -> Self {
439        Self { address }
440    }
441
442    /// The address this connection request is being made for.
443    pub fn address(&self) -> &Address {
444        &self.address
445    }
446}
447
448#[cfg(test)]
449mod test {
450    use std::sync::Arc;
451
452    use crate::VsockPacketIterator;
453
454    use super::*;
455
456    #[cfg(not(target_os = "fuchsia"))]
457    use fuchsia_async::emulated_handle::Socket as SyncSocket;
458    use fuchsia_async::Task;
459    use futures::StreamExt;
460    #[cfg(target_os = "fuchsia")]
461    use zx::Socket as SyncSocket;
462
463    async fn usb_echo_server(echo_connection: Arc<Connection<Vec<u8>>>) {
464        let mut builder = UsbPacketBuilder::new(vec![0; 128]);
465        loop {
466            println!("waiting for usb packet");
467            builder = echo_connection.fill_usb_packet(builder).await;
468            let packets = VsockPacketIterator::new(builder.take_usb_packet().unwrap());
469            println!("got usb packet, echoing it back to the other side");
470            let mut packet_count = 0;
471            for packet in packets {
472                let packet = packet.unwrap();
473                match packet.header.packet_type {
474                    PacketType::Connect => {
475                        // respond with an accept packet
476                        let mut reply_header = packet.header.clone();
477                        reply_header.packet_type = PacketType::Accept;
478                        echo_connection
479                            .handle_vsock_packet(Packet { header: &reply_header, payload: &[] })
480                            .await
481                            .unwrap();
482                    }
483                    PacketType::Accept => {
484                        // just ignore it
485                    }
486                    _ => echo_connection.handle_vsock_packet(packet).await.unwrap(),
487                }
488                packet_count += 1;
489            }
490            println!("handled {packet_count} packets");
491        }
492    }
493
494    #[fuchsia::test]
495    async fn data_over_control_socket() {
496        let (socket, other_socket) = SyncSocket::create_stream();
497        let (incoming_requests_tx, _incoming_requests) = mpsc::channel(5);
498        let mut socket = Socket::from_socket(socket);
499        let connection =
500            Arc::new(Connection::new(Socket::from_socket(other_socket), incoming_requests_tx));
501
502        let echo_task = Task::spawn(usb_echo_server(connection.clone()));
503
504        for size in [1u8, 2, 8, 16, 32, 64, 128, 255] {
505            println!("round tripping packet of size {size}");
506            socket.write_all(&vec![size; size as usize]).await.unwrap();
507            let mut buf = vec![0u8; size as usize];
508            socket.read_exact(&mut buf).await.unwrap();
509            assert_eq!(buf, vec![size; size as usize]);
510        }
511        echo_task.cancel().await;
512    }
513
514    #[fuchsia::test]
515    async fn data_over_normal_outgoing_socket() {
516        let (_control_socket, other_socket) = SyncSocket::create_stream();
517        let (incoming_requests_tx, _incoming_requests) = mpsc::channel(5);
518        let connection =
519            Arc::new(Connection::new(Socket::from_socket(other_socket), incoming_requests_tx));
520
521        let echo_task = Task::spawn(usb_echo_server(connection.clone()));
522
523        let (socket, other_socket) = SyncSocket::create_stream();
524        let mut socket = Socket::from_socket(socket);
525        connection
526            .connect(
527                Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 },
528                Socket::from_socket(other_socket),
529            )
530            .await
531            .unwrap();
532
533        for size in [1u8, 2, 8, 16, 32, 64, 128, 255] {
534            println!("round tripping packet of size {size}");
535            socket.write_all(&vec![size; size as usize]).await.unwrap();
536            let mut buf = vec![0u8; size as usize];
537            socket.read_exact(&mut buf).await.unwrap();
538            assert_eq!(buf, vec![size; size as usize]);
539        }
540        echo_task.cancel().await;
541    }
542
543    #[fuchsia::test]
544    async fn data_over_normal_incoming_socket() {
545        let (_control_socket, other_socket) = SyncSocket::create_stream();
546        let (incoming_requests_tx, mut incoming_requests) = mpsc::channel(5);
547        let connection =
548            Arc::new(Connection::new(Socket::from_socket(other_socket), incoming_requests_tx));
549
550        let echo_task = Task::spawn(usb_echo_server(connection.clone()));
551
552        let header = &mut Header::new(PacketType::Connect);
553        header.set_address(&Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 });
554        connection.handle_vsock_packet(Packet { header, payload: &[] }).await.unwrap();
555
556        let request = incoming_requests.next().await.unwrap();
557        assert_eq!(
558            request.address,
559            Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 }
560        );
561
562        let (socket, other_socket) = SyncSocket::create_stream();
563        let mut socket = Socket::from_socket(socket);
564        connection.accept(request, Socket::from_socket(other_socket)).await.unwrap();
565
566        for size in [1u8, 2, 8, 16, 32, 64, 128, 255] {
567            println!("round tripping packet of size {size}");
568            socket.write_all(&vec![size; size as usize]).await.unwrap();
569            let mut buf = vec![0u8; size as usize];
570            socket.read_exact(&mut buf).await.unwrap();
571            assert_eq!(buf, vec![size; size as usize]);
572        }
573        echo_task.cancel().await;
574    }
575
576    async fn copy_connection(from: &Connection<Vec<u8>>, to: &Connection<Vec<u8>>) {
577        let mut builder = UsbPacketBuilder::new(vec![0; 1024]);
578        loop {
579            builder = from.fill_usb_packet(builder).await;
580            let packets = VsockPacketIterator::new(builder.take_usb_packet().unwrap());
581            for packet in packets {
582                println!("forwarding vsock packet");
583                to.handle_vsock_packet(packet.unwrap()).await.unwrap();
584            }
585        }
586    }
587
588    pub(crate) trait EndToEndTestFn<R>:
589        AsyncFnOnce(Arc<Connection<Vec<u8>>>, mpsc::Receiver<ConnectionRequest>) -> R
590    {
591    }
592    impl<T, R> EndToEndTestFn<R> for T where
593        T: AsyncFnOnce(Arc<Connection<Vec<u8>>>, mpsc::Receiver<ConnectionRequest>) -> R
594    {
595    }
596
597    pub(crate) async fn end_to_end_test<R1, R2>(
598        left_side: impl EndToEndTestFn<R1>,
599        right_side: impl EndToEndTestFn<R2>,
600    ) -> (R1, R2) {
601        type Connection = crate::Connection<Vec<u8>>;
602        let (_control_socket1, other_socket1) = SyncSocket::create_stream();
603        let (_control_socket2, other_socket2) = SyncSocket::create_stream();
604        let (incoming_requests_tx1, incoming_requests1) = mpsc::channel(5);
605        let (incoming_requests_tx2, incoming_requests2) = mpsc::channel(5);
606
607        let connection1 =
608            Arc::new(Connection::new(Socket::from_socket(other_socket1), incoming_requests_tx1));
609        let connection2 =
610            Arc::new(Connection::new(Socket::from_socket(other_socket2), incoming_requests_tx2));
611
612        let conn1 = connection1.clone();
613        let conn2 = connection2.clone();
614        let passthrough_task = Task::spawn(async move {
615            futures::join!(copy_connection(&conn1, &conn2), copy_connection(&conn2, &conn1),);
616            println!("passthrough task loop ended");
617        });
618
619        let res = futures::join!(
620            left_side(connection1, incoming_requests1),
621            right_side(connection2, incoming_requests2)
622        );
623        passthrough_task.cancel().await;
624        res
625    }
626
    /// Sends data one way across two linked connections: the left side connects, writes a
    /// series of buffers and drops its socket; the right side accepts, reads the same buffers
    /// back, then expects end-of-stream and a clean close on both sides.
    #[fuchsia::test]
    async fn data_over_end_to_end() {
        end_to_end_test(
            // Left side: initiates the connection and writes.
            async |conn, _incoming| {
                println!("sending request on connection 1");
                let (socket, other_socket) = SyncSocket::create_stream();
                let mut socket = Socket::from_socket(socket);
                let state = conn
                    .connect(
                        Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 },
                        Socket::from_socket(other_socket),
                    )
                    .await
                    .unwrap();

                for size in [1u8, 2, 8, 16, 32, 64, 128, 255] {
                    println!("round tripping packet of size {size}");
                    socket.write_all(&vec![size; size as usize]).await.unwrap();
                }
                // Dropping the local end is what starts the close sequence.
                drop(socket);
                state.wait_for_close().await.unwrap();
            },
            // Right side: accepts the connection and reads.
            async |conn, mut incoming| {
                println!("accepting request on connection 2");
                let request = incoming.next().await.unwrap();
                assert_eq!(
                    request.address,
                    Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 }
                );

                let (socket, other_socket) = SyncSocket::create_stream();
                let mut socket = Socket::from_socket(socket);
                let state = conn.accept(request, Socket::from_socket(other_socket)).await.unwrap();

                println!("accepted request on connection 2");
                for size in [1u8, 2, 8, 16, 32, 64, 128, 255] {
                    let mut buf = vec![0u8; size as usize];
                    socket.read_exact(&mut buf).await.unwrap();
                    assert_eq!(buf, vec![size; size as usize]);
                }
                // A zero-length read means the socket reached end-of-stream.
                assert_eq!(socket.read(&mut [0u8; 1]).await.unwrap(), 0);
                state.wait_for_close().await.unwrap();
            },
        )
        .await;
    }
673
    /// After the connecting side calls [`Connection::close`], both sides expect their socket to
    /// reach end-of-stream and their [`ConnectionState`] to report a clean close.
    #[fuchsia::test]
    async fn normal_close_end_to_end() {
        let addr = Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 };
        end_to_end_test(
            // Left side: connects, then requests the close.
            async |conn, _incoming| {
                let (socket, other_socket) = SyncSocket::create_stream();
                let mut socket = Socket::from_socket(socket);
                let state =
                    conn.connect(addr.clone(), Socket::from_socket(other_socket)).await.unwrap();
                conn.close(&addr).await;
                // A zero-length read means the socket reached end-of-stream.
                assert_eq!(socket.read(&mut [0u8; 1]).await.unwrap(), 0);
                state.wait_for_close().await.unwrap();
            },
            // Right side: accepts and waits to be closed.
            async |conn, mut incoming| {
                println!("accepting request on connection 2");
                let request = incoming.next().await.unwrap();
                assert_eq!(request.address, addr.clone(),);

                let (socket, other_socket) = SyncSocket::create_stream();
                let mut socket = Socket::from_socket(socket);
                let state = conn.accept(request, Socket::from_socket(other_socket)).await.unwrap();
                assert_eq!(socket.read(&mut [0u8; 1]).await.unwrap(), 0);
                state.wait_for_close().await.unwrap();
            },
        )
        .await;
    }
701
    /// After the connecting side calls [`Connection::reset`], its own [`ConnectionState`] is
    /// expected to report an error while the accepting side expects a clean close; both
    /// sockets are expected to reach end-of-stream.
    #[fuchsia::test]
    async fn reset_end_to_end() {
        let addr = Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 };
        end_to_end_test(
            // Left side: connects, then resets the connection locally.
            async |conn, _incoming| {
                let (socket, other_socket) = SyncSocket::create_stream();
                let mut socket = Socket::from_socket(socket);
                let state =
                    conn.connect(addr.clone(), Socket::from_socket(other_socket)).await.unwrap();
                conn.reset(&addr).await.unwrap();
                // A zero-length read means the socket reached end-of-stream.
                assert_eq!(socket.read(&mut [0u8; 1]).await.unwrap(), 0);
                state.wait_for_close().await.expect_err("expected reset");
            },
            // Right side: accepts and waits for the reset to arrive.
            async |conn, mut incoming| {
                println!("accepting request on connection 2");
                let request = incoming.next().await.unwrap();
                assert_eq!(request.address, addr.clone(),);

                let (socket, other_socket) = SyncSocket::create_stream();
                let mut socket = Socket::from_socket(socket);
                let state = conn.accept(request, Socket::from_socket(other_socket)).await.unwrap();
                assert_eq!(socket.read(&mut [0u8; 1]).await.unwrap(), 0);
                state.wait_for_close().await.unwrap();
            },
        )
        .await;
    }
729}