1use futures::channel::{mpsc, oneshot};
6use futures::lock::Mutex;
7use log::{debug, trace, warn};
8use std::collections::hash_map::Entry;
9use std::collections::HashMap;
10use std::io::{Error, ErrorKind};
11use std::ops::DerefMut;
12use std::sync::Arc;
13
14use fuchsia_async::{Scope, Socket};
15use futures::io::{ReadHalf, WriteHalf};
16use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
17
18use crate::{Address, Header, Packet, PacketType, UsbPacketBuilder, UsbPacketFiller};
19
/// Marker trait for byte buffers usable as USB packet storage: any `Send + Unpin + 'static`
/// type that mutably dereferences to a `[u8]`.
pub trait PacketBuffer: DerefMut<Target = [u8]> + Send + Unpin + 'static {}

/// Blanket implementation so every qualifying buffer type is a [`PacketBuffer`] automatically.
impl<T: DerefMut<Target = [u8]> + Send + Unpin + 'static> PacketBuffer for T {}
23
/// Tracks vsock connections that are multiplexed over USB packets, together with the control
/// socket that carries data for the all-zero address.
///
/// `B` is the buffer type used by the outgoing USB packet filler.
pub struct Connection<B> {
    /// Write half of the control socket; data packets whose address is all zeros are written
    /// here.
    control_socket_writer: Mutex<WriteHalf<Socket>>,
    /// Shared sink for outgoing vsock packets; `fill_usb_packet` pulls from it.
    packet_filler: Arc<UsbPacketFiller<B>>,
    /// Per-address connection state. Guarded by a synchronous mutex, so the guard must never be
    /// held across an `.await`.
    connections: std::sync::Mutex<HashMap<Address, VsockConnection>>,
    /// Channel on which incoming connect packets are announced as [`ConnectionRequest`]s.
    incoming_requests_tx: mpsc::Sender<ConnectionRequest>,
    /// Scope on which the control socket reader task is spawned; kept so it lives as long as
    /// this value.
    _task_scope: Scope,
}
43
44impl<B: PacketBuffer> Connection<B> {
45 pub fn new(
51 control_socket: Socket,
52 incoming_requests_tx: mpsc::Sender<ConnectionRequest>,
53 ) -> Self {
54 let (control_socket_reader, control_socket_writer) = control_socket.split();
55 let control_socket_writer = Mutex::new(control_socket_writer);
56 let packet_filler = Arc::new(UsbPacketFiller::default());
57 let connections = Default::default();
58 let task_scope = Scope::new_with_name("vsock_usb");
59 task_scope.spawn(Self::run_socket(
60 control_socket_reader,
61 Address::default(),
62 packet_filler.clone(),
63 ));
64 Self {
65 control_socket_writer,
66 packet_filler,
67 connections,
68 incoming_requests_tx,
69 _task_scope: task_scope,
70 }
71 }
72
73 async fn send_close_packet(address: &Address, usb_packet_filler: &Arc<UsbPacketFiller<B>>) {
74 let header = &mut Header::new(PacketType::Finish);
75 header.set_address(address);
76 usb_packet_filler
77 .write_vsock_packet(&Packet { header, payload: &[] })
78 .await
79 .expect("Finish packet should never be too big");
80 }
81
82 async fn run_socket(
83 mut reader: ReadHalf<Socket>,
84 address: Address,
85 usb_packet_filler: Arc<UsbPacketFiller<B>>,
86 ) {
87 let mut buf = [0; 4096];
88 loop {
89 log::trace!("reading from control socket");
90 let read = match reader.read(&mut buf).await {
91 Ok(0) => {
92 if !address.is_zeros() {
93 Self::send_close_packet(&address, &usb_packet_filler).await;
94 }
95 return;
96 }
97 Ok(read) => read,
98 Err(err) => {
99 if address.is_zeros() {
100 log::error!("Error reading usb socket: {err:?}");
101 } else {
102 Self::send_close_packet(&address, &usb_packet_filler).await;
103 }
104 return;
105 }
106 };
107 log::trace!("writing {read} bytes to vsock packet");
108 usb_packet_filler.write_vsock_data_all(&address, &buf[..read]).await;
109 log::trace!("wrote {read} bytes to vsock packet");
110 }
111 }
112
113 fn set_connection(&self, address: Address, state: VsockConnectionState) -> Result<(), Error> {
114 let mut connections = self.connections.lock().unwrap();
115 if !connections.contains_key(&address) {
116 connections.insert(address.clone(), VsockConnection { _address: address, state });
117 Ok(())
118 } else {
119 Err(Error::other(format!("connection on address {address:?} already set")))
120 }
121 }
122
123 pub async fn connect(&self, addr: Address, socket: Socket) -> Result<ConnectionState, Error> {
128 let (read_socket, write_socket) = socket.split();
129 let write_socket = Arc::new(Mutex::new(write_socket));
130 let (connected_tx, connected_rx) = oneshot::channel();
131
132 self.set_connection(
133 addr.clone(),
134 VsockConnectionState::ConnectingOutgoing(write_socket, read_socket, connected_tx),
135 )?;
136
137 let header = &mut Header::new(PacketType::Connect);
138 header.set_address(&addr);
139 self.packet_filler.write_vsock_packet(&Packet { header, payload: &[] }).await.unwrap();
140 connected_rx.await.map_err(|_| Error::other("Accept was never received for {addr:?}"))?
141 }
142
143 pub async fn close(&self, address: &Address) {
145 Self::send_close_packet(address, &self.packet_filler).await
146 }
147
148 pub async fn reset(&self, address: &Address) -> Result<(), Error> {
150 let mut notify = None;
151 if let Some(conn) = self.connections.lock().unwrap().remove(&address) {
152 if let VsockConnectionState::Connected { notify_closed, .. } = conn.state {
153 notify = Some(notify_closed);
154 }
155 } else {
156 return Err(Error::other(
157 "Client asked to reset connection {address:?} that did not exist",
158 ));
159 }
160
161 if let Some(mut notify) = notify {
162 notify.send(Err(ErrorKind::ConnectionReset.into())).await.ok();
163 }
164
165 let header = &mut Header::new(PacketType::Reset);
166 header.set_address(address);
167 self.packet_filler
168 .write_vsock_packet(&Packet { header, payload: &[] })
169 .await
170 .expect("Reset packet should never be too big");
171 Ok(())
172 }
173
174 pub async fn accept(
178 &self,
179 request: ConnectionRequest,
180 socket: Socket,
181 ) -> Result<ConnectionState, Error> {
182 let address = request.address;
183 let notify_closed_rx;
184 if let Some(conn) = self.connections.lock().unwrap().get_mut(&address) {
185 let VsockConnectionState::ConnectingIncoming = &conn.state else {
186 return Err(Error::other(format!(
187 "Attempted to accept connection that was not waiting at {address:?}"
188 )));
189 };
190
191 let (read_socket, write_socket) = socket.split();
192 let writer = Arc::new(Mutex::new(write_socket));
193 let notify_closed = mpsc::channel(2);
194 notify_closed_rx = notify_closed.1;
195 let notify_closed = notify_closed.0;
196
197 let reader_task = Scope::new_with_name("connection-reader");
198 reader_task.spawn(Self::run_socket(read_socket, address, self.packet_filler.clone()));
199
200 conn.state = VsockConnectionState::Connected {
201 writer,
202 _reader_scope: reader_task,
203 notify_closed,
204 };
205 } else {
206 return Err(Error::other(format!(
207 "Attempting to accept connection that did not exist at {address:?}"
208 )));
209 }
210 let header = &mut Header::new(PacketType::Accept);
211 header.set_address(&address);
212 self.packet_filler.write_vsock_packet(&Packet { header, payload: &[] }).await.unwrap();
213 Ok(ConnectionState(notify_closed_rx))
214 }
215
216 pub async fn reject(&self, request: ConnectionRequest) -> Result<(), Error> {
218 let address = request.address;
219 match self.connections.lock().unwrap().entry(address.clone()) {
220 Entry::Occupied(entry) => {
221 let VsockConnectionState::ConnectingIncoming = &entry.get().state else {
222 return Err(Error::other(format!(
223 "Attempted to reject connection that was not waiting at {address:?}"
224 )));
225 };
226 entry.remove();
227 }
228 Entry::Vacant(_) => {
229 return Err(Error::other(format!(
230 "Attempted to reject connection that was not waiting at {address:?}"
231 )));
232 }
233 }
234
235 let header = &mut Header::new(PacketType::Reset);
236 header.set_address(&address);
237 self.packet_filler
238 .write_vsock_packet(&Packet { header, payload: &[] })
239 .await
240 .expect("accept packet should never be too large for packet buffer");
241 Ok(())
242 }
243
244 async fn handle_data_packet(&self, address: Address, payload: &[u8]) -> Result<(), Error> {
245 if address.is_zeros() {
247 let written = self.control_socket_writer.lock().await.write(payload).await?;
248 assert_eq!(written, payload.len());
249 Ok(())
250 } else {
251 let payload_socket;
252 if let Some(conn) = self.connections.lock().unwrap().get_mut(&address) {
253 let VsockConnectionState::Connected { writer, .. } = &conn.state else {
254 warn!(
255 "Received data packet for connection in unexpected state for {address:?}"
256 );
257 return Ok(());
258 };
259 payload_socket = writer.clone();
260 } else {
261 warn!("Received data packet for connection that didn't exist at {address:?}");
262 return Ok(());
263 }
264 payload_socket.lock().await.write_all(payload).await.expect("BOOM do not submit");
265 Ok(())
266 }
267 }
268
269 async fn handle_accept_packet(&self, address: Address) -> Result<(), Error> {
270 if let Some(conn) = self.connections.lock().unwrap().get_mut(&address) {
271 let state = std::mem::replace(&mut conn.state, VsockConnectionState::Invalid);
272 let VsockConnectionState::ConnectingOutgoing(writer, read_socket, connected_tx) = state
273 else {
274 warn!("Received accept packet for connection in unexpected state for {address:?}");
275 return Ok(());
276 };
277 let (notify_closed, notify_closed_rx) = mpsc::channel(2);
278 if connected_tx.send(Ok(ConnectionState(notify_closed_rx))).is_err() {
279 warn!("Accept packet received for {address:?} but connect caller stopped waiting for it");
280 }
281
282 let reader_task = Scope::new_with_name("connection-reader");
283 reader_task.spawn(Self::run_socket(read_socket, address, self.packet_filler.clone()));
284 conn.state = VsockConnectionState::Connected {
285 writer,
286 _reader_scope: reader_task,
287 notify_closed,
288 };
289 } else {
290 warn!("Got accept packet for connection that was not being made at {address:?}");
291 return Ok(());
292 }
293 Ok(())
294 }
295
296 async fn handle_connect_packet(&self, address: Address) -> Result<(), Error> {
297 trace!("received connect packet for {address:?}");
298 match self.connections.lock().unwrap().entry(address.clone()) {
299 Entry::Vacant(entry) => {
300 debug!("valid connect request for {address:?}");
301 entry.insert(VsockConnection {
302 _address: address,
303 state: VsockConnectionState::ConnectingIncoming,
304 });
305 }
306 Entry::Occupied(_) => {
307 warn!("Received connect packet for already existing connection for address {address:?}. Ignoring");
308 return Ok(());
309 }
310 }
311
312 trace!("sending incoming connection request to client for {address:?}");
313 let connection_request = ConnectionRequest { address };
314 self.incoming_requests_tx
315 .clone()
316 .send(connection_request)
317 .await
318 .inspect(|_| trace!("sent incoming request for {address:?}"))
319 .map_err(|_| Error::other("Failed to send connection request"))
320 }
321
322 async fn handle_finish_packet(&self, address: Address) -> Result<(), Error> {
323 trace!("received finish packet for {address:?}");
324 let mut notify;
325 if let Some(conn) = self.connections.lock().unwrap().remove(&address) {
326 let VsockConnectionState::Connected { notify_closed, .. } = conn.state else {
327 warn!("Received finish (close) packet for {address:?} which was not in a connected state. Ignoring and dropping connection state.");
328 return Ok(());
329 };
330 notify = notify_closed;
331 } else {
332 warn!("Received finish (close) packet for connection that didn't exist on address {address:?}. Ignoring");
333 return Ok(());
334 }
335
336 notify.send(Ok(())).await.ok();
337
338 let header = &mut Header::new(PacketType::Reset);
339 header.set_address(&address);
340 self.packet_filler
341 .write_vsock_packet(&Packet { header, payload: &[] })
342 .await
343 .expect("accept packet should never be too large for packet buffer");
344 Ok(())
345 }
346
347 async fn handle_reset_packet(&self, address: Address) -> Result<(), Error> {
348 trace!("received reset packet for {address:?}");
349 let mut notify = None;
350 if let Some(conn) = self.connections.lock().unwrap().remove(&address) {
351 if let VsockConnectionState::Connected { notify_closed, .. } = conn.state {
352 notify = Some(notify_closed);
353 } else {
354 debug!("Received reset packet for connection that wasn't in a connecting or disconnected state on address {address:?}.");
355 }
356 } else {
357 warn!("Received reset packet for connection that didn't exist on address {address:?}. Ignoring");
358 }
359
360 if let Some(mut notify) = notify {
361 notify.send(Ok(())).await.ok();
362 }
363 Ok(())
364 }
365
366 pub async fn handle_vsock_packet(&self, packet: Packet<'_>) -> Result<(), Error> {
369 trace!("received vsock packet {header:?}", header = packet.header);
370 let payload_len = packet.header.payload_len.get() as usize;
371 let payload = &packet.payload[..payload_len];
372 let address = Address::from(packet.header);
373 match packet.header.packet_type {
374 PacketType::Sync => Err(Error::other("Received sync packet mid-stream")),
375 PacketType::Data => self.handle_data_packet(address, payload).await,
376 PacketType::Accept => self.handle_accept_packet(address).await,
377 PacketType::Connect => self.handle_connect_packet(address).await,
378 PacketType::Finish => self.handle_finish_packet(address).await,
379 PacketType::Reset => self.handle_reset_packet(address).await,
380 }
381 }
382
383 pub async fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> UsbPacketBuilder<B> {
390 self.packet_filler.fill_usb_packet(builder).await
391 }
392}
393
/// Lifecycle state of a single vsock connection tracked by `Connection`.
enum VsockConnectionState {
    /// A local `connect` call has registered the connection and is waiting for the other
    /// side's `Accept` packet. Holds the socket's write half, its read half, and the sender
    /// used to complete the waiting `connect` call.
    ConnectingOutgoing(
        Arc<Mutex<WriteHalf<Socket>>>,
        ReadHalf<Socket>,
        oneshot::Sender<Result<ConnectionState, Error>>,
    ),
    /// A `Connect` packet was received and the connection is waiting to be accepted or
    /// rejected locally.
    ConnectingIncoming,
    /// The connection is established.
    Connected {
        /// Write half of the local socket; incoming data packets are written here.
        writer: Arc<Mutex<WriteHalf<Socket>>>,
        /// Used to tell the holder of the matching `ConnectionState` how the connection ended.
        notify_closed: mpsc::Sender<Result<(), Error>>,
        /// Scope on which the task reading from the local socket was spawned.
        _reader_scope: Scope,
    },
    /// Placeholder stored while the real state is temporarily moved out with
    /// `std::mem::replace`.
    Invalid,
}
408
/// Entry in the connection table: the address of a connection together with its current state.
struct VsockConnection {
    /// Address this entry was registered under (same as its key in the table).
    _address: Address,
    /// Current lifecycle state of the connection.
    state: VsockConnectionState,
}
413
414pub struct ConnectionState(mpsc::Receiver<Result<(), Error>>);
418
419impl ConnectionState {
420 pub async fn wait_for_close(mut self) -> Result<(), Error> {
423 self.0
424 .next()
425 .await
426 .ok_or_else(|| Error::other("Connection state's other end was dropped"))?
427 }
428}
429
430pub struct ConnectionRequest {
433 address: Address,
434}
435
436impl ConnectionRequest {
437 pub fn new(address: Address) -> Self {
439 Self { address }
440 }
441
442 pub fn address(&self) -> &Address {
444 &self.address
445 }
446}
447
448#[cfg(test)]
449mod test {
450 use std::sync::Arc;
451
452 use crate::VsockPacketIterator;
453
454 use super::*;
455
456 #[cfg(not(target_os = "fuchsia"))]
457 use fuchsia_async::emulated_handle::Socket as SyncSocket;
458 use fuchsia_async::Task;
459 use futures::StreamExt;
460 #[cfg(target_os = "fuchsia")]
461 use zx::Socket as SyncSocket;
462
463 async fn usb_echo_server(echo_connection: Arc<Connection<Vec<u8>>>) {
464 let mut builder = UsbPacketBuilder::new(vec![0; 128]);
465 loop {
466 println!("waiting for usb packet");
467 builder = echo_connection.fill_usb_packet(builder).await;
468 let packets = VsockPacketIterator::new(builder.take_usb_packet().unwrap());
469 println!("got usb packet, echoing it back to the other side");
470 let mut packet_count = 0;
471 for packet in packets {
472 let packet = packet.unwrap();
473 match packet.header.packet_type {
474 PacketType::Connect => {
475 let mut reply_header = packet.header.clone();
477 reply_header.packet_type = PacketType::Accept;
478 echo_connection
479 .handle_vsock_packet(Packet { header: &reply_header, payload: &[] })
480 .await
481 .unwrap();
482 }
483 PacketType::Accept => {
484 }
486 _ => echo_connection.handle_vsock_packet(packet).await.unwrap(),
487 }
488 packet_count += 1;
489 }
490 println!("handled {packet_count} packets");
491 }
492 }
493
494 #[fuchsia::test]
495 async fn data_over_control_socket() {
496 let (socket, other_socket) = SyncSocket::create_stream();
497 let (incoming_requests_tx, _incoming_requests) = mpsc::channel(5);
498 let mut socket = Socket::from_socket(socket);
499 let connection =
500 Arc::new(Connection::new(Socket::from_socket(other_socket), incoming_requests_tx));
501
502 let echo_task = Task::spawn(usb_echo_server(connection.clone()));
503
504 for size in [1u8, 2, 8, 16, 32, 64, 128, 255] {
505 println!("round tripping packet of size {size}");
506 socket.write_all(&vec![size; size as usize]).await.unwrap();
507 let mut buf = vec![0u8; size as usize];
508 socket.read_exact(&mut buf).await.unwrap();
509 assert_eq!(buf, vec![size; size as usize]);
510 }
511 echo_task.cancel().await;
512 }
513
514 #[fuchsia::test]
515 async fn data_over_normal_outgoing_socket() {
516 let (_control_socket, other_socket) = SyncSocket::create_stream();
517 let (incoming_requests_tx, _incoming_requests) = mpsc::channel(5);
518 let connection =
519 Arc::new(Connection::new(Socket::from_socket(other_socket), incoming_requests_tx));
520
521 let echo_task = Task::spawn(usb_echo_server(connection.clone()));
522
523 let (socket, other_socket) = SyncSocket::create_stream();
524 let mut socket = Socket::from_socket(socket);
525 connection
526 .connect(
527 Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 },
528 Socket::from_socket(other_socket),
529 )
530 .await
531 .unwrap();
532
533 for size in [1u8, 2, 8, 16, 32, 64, 128, 255] {
534 println!("round tripping packet of size {size}");
535 socket.write_all(&vec![size; size as usize]).await.unwrap();
536 let mut buf = vec![0u8; size as usize];
537 socket.read_exact(&mut buf).await.unwrap();
538 assert_eq!(buf, vec![size; size as usize]);
539 }
540 echo_task.cancel().await;
541 }
542
543 #[fuchsia::test]
544 async fn data_over_normal_incoming_socket() {
545 let (_control_socket, other_socket) = SyncSocket::create_stream();
546 let (incoming_requests_tx, mut incoming_requests) = mpsc::channel(5);
547 let connection =
548 Arc::new(Connection::new(Socket::from_socket(other_socket), incoming_requests_tx));
549
550 let echo_task = Task::spawn(usb_echo_server(connection.clone()));
551
552 let header = &mut Header::new(PacketType::Connect);
553 header.set_address(&Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 });
554 connection.handle_vsock_packet(Packet { header, payload: &[] }).await.unwrap();
555
556 let request = incoming_requests.next().await.unwrap();
557 assert_eq!(
558 request.address,
559 Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 }
560 );
561
562 let (socket, other_socket) = SyncSocket::create_stream();
563 let mut socket = Socket::from_socket(socket);
564 connection.accept(request, Socket::from_socket(other_socket)).await.unwrap();
565
566 for size in [1u8, 2, 8, 16, 32, 64, 128, 255] {
567 println!("round tripping packet of size {size}");
568 socket.write_all(&vec![size; size as usize]).await.unwrap();
569 let mut buf = vec![0u8; size as usize];
570 socket.read_exact(&mut buf).await.unwrap();
571 assert_eq!(buf, vec![size; size as usize]);
572 }
573 echo_task.cancel().await;
574 }
575
576 async fn copy_connection(from: &Connection<Vec<u8>>, to: &Connection<Vec<u8>>) {
577 let mut builder = UsbPacketBuilder::new(vec![0; 1024]);
578 loop {
579 builder = from.fill_usb_packet(builder).await;
580 let packets = VsockPacketIterator::new(builder.take_usb_packet().unwrap());
581 for packet in packets {
582 println!("forwarding vsock packet");
583 to.handle_vsock_packet(packet.unwrap()).await.unwrap();
584 }
585 }
586 }
587
588 pub(crate) trait EndToEndTestFn<R>:
589 AsyncFnOnce(Arc<Connection<Vec<u8>>>, mpsc::Receiver<ConnectionRequest>) -> R
590 {
591 }
592 impl<T, R> EndToEndTestFn<R> for T where
593 T: AsyncFnOnce(Arc<Connection<Vec<u8>>>, mpsc::Receiver<ConnectionRequest>) -> R
594 {
595 }
596
597 pub(crate) async fn end_to_end_test<R1, R2>(
598 left_side: impl EndToEndTestFn<R1>,
599 right_side: impl EndToEndTestFn<R2>,
600 ) -> (R1, R2) {
601 type Connection = crate::Connection<Vec<u8>>;
602 let (_control_socket1, other_socket1) = SyncSocket::create_stream();
603 let (_control_socket2, other_socket2) = SyncSocket::create_stream();
604 let (incoming_requests_tx1, incoming_requests1) = mpsc::channel(5);
605 let (incoming_requests_tx2, incoming_requests2) = mpsc::channel(5);
606
607 let connection1 =
608 Arc::new(Connection::new(Socket::from_socket(other_socket1), incoming_requests_tx1));
609 let connection2 =
610 Arc::new(Connection::new(Socket::from_socket(other_socket2), incoming_requests_tx2));
611
612 let conn1 = connection1.clone();
613 let conn2 = connection2.clone();
614 let passthrough_task = Task::spawn(async move {
615 futures::join!(copy_connection(&conn1, &conn2), copy_connection(&conn2, &conn1),);
616 println!("passthrough task loop ended");
617 });
618
619 let res = futures::join!(
620 left_side(connection1, incoming_requests1),
621 right_side(connection2, incoming_requests2)
622 );
623 passthrough_task.cancel().await;
624 res
625 }
626
627 #[fuchsia::test]
628 async fn data_over_end_to_end() {
629 end_to_end_test(
630 async |conn, _incoming| {
631 println!("sending request on connection 1");
632 let (socket, other_socket) = SyncSocket::create_stream();
633 let mut socket = Socket::from_socket(socket);
634 let state = conn
635 .connect(
636 Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 },
637 Socket::from_socket(other_socket),
638 )
639 .await
640 .unwrap();
641
642 for size in [1u8, 2, 8, 16, 32, 64, 128, 255] {
643 println!("round tripping packet of size {size}");
644 socket.write_all(&vec![size; size as usize]).await.unwrap();
645 }
646 drop(socket);
647 state.wait_for_close().await.unwrap();
648 },
649 async |conn, mut incoming| {
650 println!("accepting request on connection 2");
651 let request = incoming.next().await.unwrap();
652 assert_eq!(
653 request.address,
654 Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 }
655 );
656
657 let (socket, other_socket) = SyncSocket::create_stream();
658 let mut socket = Socket::from_socket(socket);
659 let state = conn.accept(request, Socket::from_socket(other_socket)).await.unwrap();
660
661 println!("accepted request on connection 2");
662 for size in [1u8, 2, 8, 16, 32, 64, 128, 255] {
663 let mut buf = vec![0u8; size as usize];
664 socket.read_exact(&mut buf).await.unwrap();
665 assert_eq!(buf, vec![size; size as usize]);
666 }
667 assert_eq!(socket.read(&mut [0u8; 1]).await.unwrap(), 0);
668 state.wait_for_close().await.unwrap();
669 },
670 )
671 .await;
672 }
673
674 #[fuchsia::test]
675 async fn normal_close_end_to_end() {
676 let addr = Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 };
677 end_to_end_test(
678 async |conn, _incoming| {
679 let (socket, other_socket) = SyncSocket::create_stream();
680 let mut socket = Socket::from_socket(socket);
681 let state =
682 conn.connect(addr.clone(), Socket::from_socket(other_socket)).await.unwrap();
683 conn.close(&addr).await;
684 assert_eq!(socket.read(&mut [0u8; 1]).await.unwrap(), 0);
685 state.wait_for_close().await.unwrap();
686 },
687 async |conn, mut incoming| {
688 println!("accepting request on connection 2");
689 let request = incoming.next().await.unwrap();
690 assert_eq!(request.address, addr.clone(),);
691
692 let (socket, other_socket) = SyncSocket::create_stream();
693 let mut socket = Socket::from_socket(socket);
694 let state = conn.accept(request, Socket::from_socket(other_socket)).await.unwrap();
695 assert_eq!(socket.read(&mut [0u8; 1]).await.unwrap(), 0);
696 state.wait_for_close().await.unwrap();
697 },
698 )
699 .await;
700 }
701
702 #[fuchsia::test]
703 async fn reset_end_to_end() {
704 let addr = Address { device_cid: 1, host_cid: 2, device_port: 3, host_port: 4 };
705 end_to_end_test(
706 async |conn, _incoming| {
707 let (socket, other_socket) = SyncSocket::create_stream();
708 let mut socket = Socket::from_socket(socket);
709 let state =
710 conn.connect(addr.clone(), Socket::from_socket(other_socket)).await.unwrap();
711 conn.reset(&addr).await.unwrap();
712 assert_eq!(socket.read(&mut [0u8; 1]).await.unwrap(), 0);
713 state.wait_for_close().await.expect_err("expected reset");
714 },
715 async |conn, mut incoming| {
716 println!("accepting request on connection 2");
717 let request = incoming.next().await.unwrap();
718 assert_eq!(request.address, addr.clone(),);
719
720 let (socket, other_socket) = SyncSocket::create_stream();
721 let mut socket = Socket::from_socket(socket);
722 let state = conn.accept(request, Socket::from_socket(other_socket)).await.unwrap();
723 assert_eq!(socket.read(&mut [0u8; 1]).await.unwrap(), 0);
724 state.wait_for_close().await.unwrap();
725 },
726 )
727 .await;
728 }
729}