1use crate::error::{Error, Result};
6use crate::{Node, Quality, stream};
7
8use fuchsia_sync::Mutex as SyncMutex;
9use futures::StreamExt;
10use futures::channel::mpsc::{Receiver, Sender, UnboundedSender, channel};
11use futures::channel::oneshot;
12use futures::future::Either;
13use futures::prelude::*;
14use std::collections::HashMap;
15use std::pin::pin;
16use std::sync::Arc;
17
/// State tracked for each stream ID seen on a multiplexed connection.
enum StreamStatus {
    /// The stream is live; payloads of incoming chunks for its ID are written to this writer.
    Open(stream::Writer),
    /// Writing incoming data to the stream's writer failed. Further data chunks for the ID are
    /// still consumed from the connection but their payload is discarded.
    ReadClosed,
    /// A zero-length (close) chunk was received for the ID. Any further data chunk for it is
    /// rejected with `Error::BadStreamId`.
    Closed,
}
28
/// Multiplexes several logical streams over the single `reader`/`writer` pair.
///
/// Data arriving on `reader` is parsed chunk by chunk with `handle_one_chunk` and routed to
/// per-stream writers keyed by stream ID. Data for every outgoing stream is framed by
/// `write_as_chunks` and written to the shared `writer`.
///
/// * `is_server` selects the ID parity: locally created streams get odd IDs when `true` and
///   even IDs otherwise.
/// * `streams_out` receives every stream first seen from the remote side, as a reader, a
///   writer, and a oneshot sender through which the stream's handler reports its final status.
/// * `streams_in` supplies locally created streams to be multiplexed.
/// * `stream_errors_out` receives the errors reported through those oneshot senders, including
///   a `ConnectionClosed` error when a sender is dropped without reporting.
/// * `remote_name` is passed to `handle_one_chunk`, which uses it in close messages.
///
/// Returns the result of the read-side loop (`handle_read` below).
pub async fn multi_stream(
    reader: stream::Reader,
    writer: stream::Writer,
    is_server: bool,
    streams_out: Sender<(stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>,
    streams_in: Receiver<(stream::Reader, stream::Writer)>,
    stream_errors_out: UnboundedSender<Error>,
    remote_name: String,
) -> Result<()> {
    // Readers whose contents must be chunked onto the shared writer, tagged with a stream ID.
    let (new_readers_sender, new_readers) = channel(1);
    // Oneshot receivers on which stream handlers report their final status.
    let (stream_errors_sender, stream_errors) = channel(1);
    // Locally created stream IDs start at 1 (server) or 0 (client) and advance by 2, so the two
    // ends never allocate the same ID.
    let first_stream_id = if is_server { 1 } else { 0 };
    let mut stream_ids = (first_stream_id..).step_by(2);
    // The outgoing writer is shared by every `write_as_chunks` task and by the read side.
    let writer = Arc::new(SyncMutex::new(writer));
    let writer_for_reader = Arc::clone(&writer);

    let handle_read = async move {
        let writer = writer_for_reader;
        // Status of every stream ID seen so far.
        let streams = SyncMutex::new(HashMap::<u32, StreamStatus>::new());

        // An endless stream where each item is the outcome of processing one incoming chunk.
        let read_result_stream = futures::stream::unfold((), |_| async {
            // 6 is the size of a chunk header.
            // NOTE(review): presumably `read` waits for at least that many bytes and retries with
            // more data when the callback returns `BufferTooShort` — confirm against
            // `stream::Reader::read`.
            let got = reader
                .read(6, |buf| {
                    let mut streams = streams.lock();
                    let (size, new_stream) =
                        handle_one_chunk(&mut *streams, is_server, buf, &remote_name)?;

                    Ok((
                        // A chunk for an unknown ID opens a new stream: create the local pipe,
                        // seed it with the chunk's payload and register its writer.
                        new_stream.map(|(id, first_chunk_data)| {
                            let (reader, remote_writer) = stream::stream();
                            remote_writer
                                .write(first_chunk_data.len(), |out| {
                                    out[..first_chunk_data.len()]
                                        .copy_from_slice(&first_chunk_data);
                                    Ok(first_chunk_data.len())
                                })
                                .expect("We just created this stream!");
                            streams.insert(id, StreamStatus::Open(remote_writer));
                            (id, reader)
                        }),
                        size,
                    ))
                })
                .await;

            let got = match got {
                Ok(Some((id, reader))) => {
                    // Second pipe for the new stream: `writer` goes to the local consumer and
                    // `remote_reader` is chunked onto the connection under `id`.
                    let (remote_reader, writer) = stream::stream();

                    if new_readers_sender.clone().send((id, remote_reader)).await.is_err() {
                        Err(Error::ConnectionClosed(Some(
                            "New stream reader handler disappeared".to_owned(),
                        )))
                    } else {
                        // Register the status receiver before handing out the stream, so the
                        // handler's report is always observed.
                        let (sender, receiver) = oneshot::channel();
                        if stream_errors_sender.clone().send(receiver).await.is_ok() {
                            streams_out.clone().send((reader, writer, sender)).await.map_err(|_| {
                                Error::ConnectionClosed(Some(
                                    "New stream handler disappeared".to_owned(),
                                ))
                            })
                        } else {
                            Err(Error::ConnectionClosed(Some(
                                "Error reporting channel closed".to_owned(),
                            )))
                        }
                    }
                }
                Ok(None) => Ok(()),
                Err(e) => Err(e),
            };

            Some((got, ()))
        });

        // Await every status receiver concurrently and forward any error, including a handler
        // that hung up without reporting, to `stream_errors_out`.
        let stream_errors = stream_errors
            .map(move |x| {
                let stream_errors_out = stream_errors_out.clone();
                async move {
                    if let Err(x) = x.await.unwrap_or_else(|_| {
                        Err(Error::ConnectionClosed(Some(
                            "Stream handler hung up without returning a status".to_owned(),
                        )))
                    }) {
                        let _ = stream_errors_out.unbounded_send(x);
                    }
                }
            })
            .buffer_unordered(usize::MAX)
            // Never yields an item; the stream exists only so that it gets polled when merged
            // with the read results below.
            .filter_map(|()| futures::future::ready(None));

        let read_result_stream = futures::stream::select(stream_errors, read_result_stream);

        let read_result_stream = read_result_stream.map(Either::Left);

        // Chaining a pending stream keeps this side from ever terminating once `streams_in` is
        // exhausted.
        let streams_in = streams_in.chain(futures::stream::pending()).map(Either::Right);

        let events = futures::stream::select(read_result_stream, streams_in);
        let mut events = pin!(events);

        let mut ret = Ok(());
        while let Some(event) = events.next().await {
            match event {
                // A chunk was processed successfully; keep going.
                Either::Left(Ok(())) => (),
                // Any read-side error ends the loop and becomes the result.
                Either::Left(other) => {
                    ret = other;
                    break;
                }
                // A locally created stream: allocate an ID, start chunking its reader onto the
                // connection and register its writer for incoming data.
                Either::Right((reader, writer)) => {
                    let id = stream_ids.next().expect("This iterator should be infinite!");
                    if new_readers_sender.clone().send((id, reader)).await.is_err() {
                        break;
                    }
                    streams.lock().insert(id, StreamStatus::Open(writer));
                }
            }
        }

        // If the error carries no reason but the outgoing writer is closed, report the writer's
        // close reason instead.
        if matches!(ret, Err(Error::ConnectionClosed(None))) {
            let writer = writer.lock();
            if writer.is_closed() {
                ret = Err(Error::ConnectionClosed(writer.closed_reason()))
            }
        }

        let mut streams = streams.lock();

        // Close every stream that is still open, recording why the multiplexer stopped.
        for (_, stream) in streams.drain() {
            if let StreamStatus::Open(writer) = stream {
                writer.close(format!("Multi-stream terminated: {ret:?}"));
            }
        }

        ret
    };

    // Chunk every registered reader onto the shared writer, with no limit on concurrency, and
    // close the reader once `write_as_chunks` finishes.
    let handle_write = new_readers.for_each_concurrent(None, move |(id, stream)| {
        let writer = Arc::clone(&writer);
        async move {
            if let Some(reason) = write_as_chunks(id, &stream, writer).await {
                stream.close(format!("Stream terminated ({reason})"));
            } else {
                stream.close(format!("Stream terminated"));
            }
        }
    });

    let handle_read = pin!(handle_read);
    let handle_write = pin!(handle_write);

    // The read side decides the result: if the write side finishes first, keep waiting for the
    // read side.
    match futures::future::select(handle_read, handle_write).await {
        Either::Left((res, _)) => res,
        Either::Right(((), handle_read)) => handle_read.await,
    }
}
229
230fn handle_one_chunk<'a>(
246 streams: &mut HashMap<u32, StreamStatus>,
247 is_server: bool,
248 buf: &'a [u8],
249 remote_name: &str,
250) -> Result<(usize, Option<(u32, &'a [u8])>)> {
251 if buf.len() < 6 {
252 return Err(Error::BufferTooShort(6));
253 }
254
255 let id = u32::from_le_bytes(buf[..4].try_into().unwrap());
256 let length = u16::from_le_bytes(buf[4..6].try_into().unwrap());
257
258 let length = length as usize;
259 let chunk_length = length + 6;
260 let buf = &buf[6..];
261
262 if length == 0 {
263 if buf.len() < 2 {
264 return Err(Error::BufferTooShort(8));
265 }
266 let length = u16::from_le_bytes(buf[..2].try_into().unwrap());
267 let length = length as usize;
268
269 if buf.len() < length + 2 {
270 return Err(Error::BufferTooShort(8 + length));
271 }
272
273 let epitaph =
274 if length > 0 { Some(String::from_utf8_lossy(&buf[2..][..length])) } else { None };
275
276 if let Some(old) = streams.insert(id, StreamStatus::Closed) {
277 if let StreamStatus::Open(old) = old {
278 if let Some(epitaph) = epitaph {
279 old.close(format!("{remote_name} reported: {epitaph}"));
280 } else {
281 old.close(format!("{remote_name} reported no epitaph"));
282 }
283 }
284 Ok((length + 8, None))
285 } else {
286 Err(Error::BadStreamId)
287 }
288 } else if buf.len() < length {
289 Err(Error::BufferTooShort(chunk_length))
290 } else if let Some(stream) = streams.get(&id) {
291 match stream {
292 StreamStatus::Open(stream) => {
293 if stream
294 .write(length, |out| {
295 out[..length].copy_from_slice(&buf[..length]);
296 Ok(length)
297 })
298 .is_err()
299 {
300 let _ = streams.insert(id, StreamStatus::ReadClosed);
306 }
307 Ok((chunk_length, None))
308 }
309 StreamStatus::ReadClosed => Ok((chunk_length, None)),
310 StreamStatus::Closed => Err(Error::BadStreamId),
311 }
312 } else if (id & 1 == 0) == is_server {
313 Ok((chunk_length, Some((id, &buf[..length]))))
314 } else {
315 Err(Error::BadStreamId)
316 }
317}
318
319async fn write_as_chunks(
328 id: u32,
329 reader: &stream::Reader,
330 writer: Arc<SyncMutex<stream::Writer>>,
331) -> Option<String> {
332 loop {
333 let got = reader
339 .read(1, |buf| {
340 let mut total_len = 0;
341 while buf.len() > total_len {
342 let buf = &buf[total_len..];
343 let buf =
344 if buf.len() > u16::MAX as usize { &buf[..u16::MAX as usize] } else { buf };
345
346 let len: u16 = buf
347 .len()
348 .try_into()
349 .expect("We just truncated the length so it would fit!");
350
351 if let e @ Err(_) = writer.lock().write(6 + buf.len(), |out_buf| {
352 out_buf[..4].copy_from_slice(&id.to_le_bytes());
353 let out_buf = &mut out_buf[4..];
354 out_buf[..2].copy_from_slice(&len.to_le_bytes());
355 out_buf[2..][..buf.len()].copy_from_slice(buf);
356 Ok(buf.len() + 6)
357 }) {
358 return Ok((e, total_len));
359 } else {
360 total_len += buf.len();
361 }
362 }
363
364 Ok((Ok(()), total_len))
365 })
366 .await;
367
368 match got {
369 Err(Error::ConnectionClosed(epitaph)) => {
370 let epitaph = epitaph.as_ref().map(|x| x.as_bytes()).unwrap_or(b"");
371 let length_u16: u16 = epitaph.len().try_into().unwrap_or(u16::MAX);
372 let length = length_u16 as usize;
373
374 let write_result = writer.lock().write(8 + length, |out_buf| {
376 out_buf[..4].copy_from_slice(&id.to_le_bytes());
377 let out_buf = &mut out_buf[4..];
378 out_buf[..2].copy_from_slice(&0u16.to_le_bytes());
379 let out_buf = &mut out_buf[2..];
380 out_buf[..2].copy_from_slice(&length_u16.to_le_bytes());
381 let out_buf = &mut out_buf[2..];
382 out_buf[..length].copy_from_slice(&epitaph[..length]);
383 Ok(8 + length)
384 });
385
386 match write_result {
387 Ok(()) | Err(Error::ConnectionClosed(None)) => break None,
388 Err(Error::ConnectionClosed(Some(s))) => break Some(format!("write: {s}")),
389 other => unreachable!("Unexpected write error: {other:?}"),
390 }
391 }
392 Ok(Ok(())) => (),
393 Ok(Err(Error::ConnectionClosed(None))) => break None,
394 Ok(Err(Error::ConnectionClosed(Some(s)))) => break Some(format!("read: {s}")),
395 Ok(other) => unreachable!("Unexpected write error: {other:?}"),
396 other => unreachable!("Unexpected read error: {other:?}"),
397 }
398 }
399}
400
401pub fn multi_stream_node_connection(
414 node: &Node,
415 reader: stream::Reader,
416 writer: stream::Writer,
417 is_server: bool,
418 quality: Quality,
419 stream_errors_out: UnboundedSender<Error>,
420 remote_name: String,
421) -> impl Future<Output = Result<()>> + Send + use<> {
422 let (mut new_stream_sender, streams_in) = channel(1);
423 let (streams_out, new_stream_receiver) = channel(1);
424
425 let control_stream = if is_server {
426 let (control_reader, control_writer_remote) = stream::stream();
427 let (control_reader_remote, control_writer) = stream::stream();
428
429 new_stream_sender
430 .try_send((control_reader_remote, control_writer_remote))
431 .expect("We just created this channel!");
432 Some((control_reader, control_writer))
433 } else {
434 None
435 };
436
437 let stream_fut = multi_stream(
438 reader,
439 writer,
440 is_server,
441 streams_out,
442 streams_in,
443 stream_errors_out,
444 remote_name,
445 );
446 let node_fut = node.link_node(control_stream, new_stream_sender, new_stream_receiver, quality);
447
448 async move {
449 let node_fut = pin!(node_fut);
450 let stream_fut = pin!(stream_fut);
451
452 match futures::future::select(node_fut, stream_fut).await {
455 Either::Left((res, stream_fut)) => res.and(stream_fut.await),
456 Either::Right((res, node_fut)) => res.and(node_fut.await),
457 }
458 }
459}
460
461pub async fn multi_stream_node_connection_to_async(
464 node: &Node,
465 rx: &mut (dyn AsyncRead + Unpin + Send),
466 tx: &mut (dyn AsyncWrite + Unpin + Send),
467 is_server: bool,
468 quality: Quality,
469 stream_errors_out: UnboundedSender<Error>,
470 remote_name: String,
471) -> Result<()> {
472 let (reader, remote_writer) = stream::stream();
473 let (remote_reader, writer) = stream::stream();
474 let conn_fut = multi_stream_node_connection(
475 node,
476 remote_reader,
477 remote_writer,
478 is_server,
479 quality,
480 stream_errors_out,
481 remote_name.clone(),
482 );
483 let remote_name = &remote_name;
484
485 let read_fut = async move {
486 let mut buf = [0u8; 4096];
487 loop {
488 let n = match rx.read(&mut buf).await {
489 Ok(0) => {
490 writer
491 .close(format!("connection closed (either by transport or {remote_name})"));
492 break Ok(());
493 }
494 Ok(n) => n,
495 Err(e) => {
496 writer.close(format!("{remote_name} connection failed (read): {e:?}"));
497 return Err(Error::from(e));
498 }
499 };
500 writer.write(n, |write_buf| {
501 write_buf[..n].copy_from_slice(&buf[..n]);
502 Ok(n)
503 })?
504 }
505 };
506
507 let write_fut = async move {
508 let mut tx = std::pin::pin!(tx);
509 loop {
510 let res = futures::future::poll_fn(|ctx| {
511 reader.poll_read(ctx, &mut 1, |ctx, read_buf| {
512 let res = match std::task::ready!(tx.as_mut().poll_write(ctx, read_buf)) {
513 Ok(x) => Ok(((), x)),
514 Err(e) => Err(Error::IO(e)),
515 };
516 std::task::Poll::Ready(res)
517 })
518 })
519 .await;
520
521 if let Err(Error::IO(e)) = &res {
522 reader.close(format!("{remote_name} connection failed (write): {e:?}"));
523 return res;
524 }
525
526 res?;
527 }
528 };
529
530 let read_write = futures::future::try_join(read_fut, write_fut);
531
532 let conn_fut = pin!(conn_fut);
533
534 let cleanup = {
535 let read_write = pin!(read_write);
540
541 match futures::future::select(conn_fut, read_write).await {
542 Either::Left((res, _)) => {
543 return res;
544 }
545 Either::Right((read_write_result, conn_fut)) => {
546 conn_fut.map(move |conn_fut_result| match (conn_fut_result, read_write_result) {
547 (Ok(()), Ok(((), ()))) => Ok(()),
548 (Ok(()), Err(e)) | (Err(e), _) => Err(e),
551 })
552 }
553 }
554 };
555
556 cleanup.await
559}
560
#[cfg(test)]
mod test {
    use super::*;
    use fuchsia_async as fasync;
    use futures::channel::mpsc::unbounded;

    /// Writes `bytes` to `writer` in one call, panicking if the write fails.
    fn send_bytes(writer: &stream::Writer, bytes: &[u8]) {
        writer
            .write(bytes.len(), |buf| {
                buf[..bytes.len()].copy_from_slice(bytes);
                Ok(bytes.len())
            })
            .unwrap();
    }

    /// Reads `expected.len()` bytes from `reader` and asserts they equal `expected`.
    async fn expect_bytes(reader: &stream::Reader, expected: &[u8]) {
        reader
            .read(expected.len(), |buf| {
                assert_eq!(&buf[..expected.len()], expected);
                Ok(((), expected.len()))
            })
            .await
            .unwrap();
    }

    /// Asserts that reading from `reader` reports a closed connection.
    async fn expect_closed(reader: &stream::Reader) {
        assert!(matches!(
            reader.read::<_, ()>(1, |_| unreachable!()).await,
            Err(Error::ConnectionClosed(_))
        ));
    }

    /// Spawns one `multi_stream` endpoint and asserts that it ends either successfully or with
    /// a reason-less `ConnectionClosed`.
    fn spawn_checked_endpoint(
        reader: stream::Reader,
        writer: stream::Writer,
        is_server: bool,
        streams_out: Sender<(stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>,
        streams_in: Receiver<(stream::Reader, stream::Writer)>,
        errors: UnboundedSender<Error>,
        remote_name: &str,
    ) -> fasync::Task<()> {
        let remote_name = remote_name.to_owned();
        fasync::Task::spawn(async move {
            assert!(matches!(
                multi_stream(
                    reader,
                    writer,
                    is_server,
                    streams_out,
                    streams_in,
                    errors,
                    remote_name
                )
                .await,
                Ok(()) | Err(Error::ConnectionClosed(None))
            ))
        })
    }

    /// One stream opened from the server side carries data in both directions and observes
    /// the remote end closing.
    #[fuchsia::test]
    async fn one_stream() {
        let (a_reader, b_writer) = stream::stream();
        let (b_reader, a_writer) = stream::stream();
        let (mut create_stream_a, a_streams_in) = channel(1);
        let (_create_stream_b, b_streams_in) = channel(1);
        let (a_streams_out, _get_stream_a) = channel(100);
        let (b_streams_out, mut get_stream_b) = channel(1);
        let (errors_sink_a, _black_hole) = unbounded();
        let errors_sink_b = errors_sink_a.clone();

        let _a = spawn_checked_endpoint(
            a_reader,
            a_writer,
            true,
            a_streams_out,
            a_streams_in,
            errors_sink_a,
            "b",
        );
        let _b = spawn_checked_endpoint(
            b_reader,
            b_writer,
            false,
            b_streams_out,
            b_streams_in,
            errors_sink_b,
            "a",
        );

        let (ab_reader_a, ab_reader_write) = stream::stream();
        let (ab_writer_read, ab_writer_a) = stream::stream();

        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();

        send_bytes(&ab_writer_a, &[1, 2, 3, 4, 5, 6, 7, 8]);

        let (ab_reader_b, ab_writer_b, err) = get_stream_b.next().await.unwrap();
        err.send(Ok(())).unwrap();

        send_bytes(&ab_writer_b, &[9, 10, 11, 12, 13, 14, 15, 16]);

        expect_bytes(&ab_reader_b, &[1, 2, 3, 4, 5, 6, 7, 8]).await;
        expect_bytes(&ab_reader_a, &[9, 10, 11, 12, 13, 14, 15, 16]).await;

        std::mem::drop(ab_writer_b);
        expect_closed(&ab_reader_a).await;

        // Writing after the remote reader is gone must still succeed locally.
        std::mem::drop(ab_reader_b);
        send_bytes(&ab_writer_a, &[1, 2, 3, 4, 5, 6, 7, 8]);
    }

    /// A stream whose handler reports an error surfaces that error on the error channel, and
    /// the connection keeps working for later streams.
    #[fuchsia::test]
    async fn fallible_stream() {
        let (a_reader, b_writer) = stream::stream();
        let (b_reader, a_writer) = stream::stream();
        let (mut create_stream_a, a_streams_in) = channel(1);
        let (_create_stream_b, b_streams_in) = channel(1);
        let (a_streams_out, _get_stream_a) = channel(100);
        let (b_streams_out, mut get_stream_b) = channel(1);
        let (errors_sink_a, _black_hole) = unbounded();
        let (errors_sink_b, mut b_errors) = unbounded();

        let _a = spawn_checked_endpoint(
            a_reader,
            a_writer,
            true,
            a_streams_out,
            a_streams_in,
            errors_sink_a,
            "b",
        );
        let _b = spawn_checked_endpoint(
            b_reader,
            b_writer,
            false,
            b_streams_out,
            b_streams_in,
            errors_sink_b,
            "a",
        );

        let (fail_reader, fail_reader_write) = stream::stream();
        let (_ignore, fail_writer) = stream::stream();
        create_stream_a.send((fail_reader, fail_writer)).await.unwrap();

        send_bytes(&fail_reader_write, &[1, 2, 3, 4, 5, 6, 7, 8]);

        let (_, _, err) = get_stream_b.next().await.unwrap();
        err.send(Err(Error::ConnectionClosed(Some("Testing".to_owned())))).unwrap();

        // Skip unrelated reports until the injected error shows up.
        loop {
            match b_errors.next().await {
                Some(Error::ConnectionClosed(Some(s))) if s == "Testing" => break,
                Some(Error::ConnectionClosed(Some(_))) => continue,
                _ => panic!("Error stream closed without reporting our error."),
            }
        }

        let (ab_reader_a, ab_reader_write) = stream::stream();
        let (ab_writer_read, ab_writer_a) = stream::stream();

        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();

        send_bytes(&ab_writer_a, &[1, 2, 3, 4, 5, 6, 7, 8]);

        let (ab_reader_b, ab_writer_b, err) = get_stream_b.next().await.unwrap();
        err.send(Ok(())).unwrap();

        send_bytes(&ab_writer_b, &[9, 10, 11, 12, 13, 14, 15, 16]);

        expect_bytes(&ab_reader_b, &[1, 2, 3, 4, 5, 6, 7, 8]).await;
        expect_bytes(&ab_reader_a, &[9, 10, 11, 12, 13, 14, 15, 16]).await;

        std::mem::drop(ab_writer_b);
        expect_closed(&ab_reader_a).await;

        std::mem::drop(ab_reader_b);
        send_bytes(&ab_writer_a, &[1, 2, 3, 4, 5, 6, 7, 8]);
    }

    /// Streams opened from both ends at once are kept apart and each carries data both ways.
    #[fuchsia::test]
    async fn two_streams() {
        let (a_reader, b_writer) = stream::stream();
        let (b_reader, a_writer) = stream::stream();
        let (mut create_stream_a, a_streams_in) = channel(1);
        let (mut create_stream_b, b_streams_in) = channel(1);
        let (a_streams_out, mut get_stream_a) = channel(1);
        let (b_streams_out, mut get_stream_b) = channel(1);
        let (errors_sink, _black_hole) = unbounded();

        let _a = fasync::Task::spawn(multi_stream(
            a_reader,
            a_writer,
            true,
            a_streams_out,
            a_streams_in,
            errors_sink.clone(),
            "b".to_owned(),
        ));
        let _b = fasync::Task::spawn(multi_stream(
            b_reader,
            b_writer,
            false,
            b_streams_out,
            b_streams_in,
            errors_sink.clone(),
            "a".to_owned(),
        ));

        let (ab_reader_a, ab_reader_write) = stream::stream();
        let (ab_writer_read, ab_writer_a) = stream::stream();
        let (ba_reader_b, ba_reader_write) = stream::stream();
        let (ba_writer_read, ba_writer_b) = stream::stream();

        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
        create_stream_b.send((ba_writer_read, ba_reader_write)).await.unwrap();

        send_bytes(&ab_writer_a, &[1, 2, 3, 4, 5, 6, 7, 8]);
        send_bytes(&ba_writer_b, &[25, 26, 27, 28, 29, 30, 31, 32]);

        let (ab_reader_b, ab_writer_b, err_ab) = get_stream_b.next().await.unwrap();
        let (ba_reader_a, ba_writer_a, err_ba) = get_stream_a.next().await.unwrap();
        err_ab.send(Ok(())).unwrap();
        err_ba.send(Ok(())).unwrap();

        send_bytes(&ab_writer_b, &[9, 10, 11, 12, 13, 14, 15, 16]);
        send_bytes(&ba_writer_a, &[17, 18, 19, 20, 21, 22, 23, 24]);

        expect_bytes(&ab_reader_b, &[1, 2, 3, 4, 5, 6, 7, 8]).await;
        expect_bytes(&ab_reader_a, &[9, 10, 11, 12, 13, 14, 15, 16]).await;
        expect_bytes(&ba_reader_b, &[17, 18, 19, 20, 21, 22, 23, 24]).await;
        expect_bytes(&ba_reader_a, &[25, 26, 27, 28, 29, 30, 31, 32]).await;
    }

    /// Two nodes linked through `multi_stream_node_connection` discover each other and can
    /// open a stream between them.
    #[fuchsia::test]
    async fn node_connect() {
        let (new_peer_sender_a, mut new_peers) = channel(1);
        let (new_peer_sender_b, _new_peers_b) = channel(100);
        let (incoming_streams_sender_a, _streams_a) = channel(100);
        let (incoming_streams_sender_b, mut streams) = channel(1);
        let a = Node::new("a", "test", new_peer_sender_a, incoming_streams_sender_a).unwrap();
        let b = Node::new("b", "test", new_peer_sender_b, incoming_streams_sender_b).unwrap();
        let (errors_sink, _black_hole) = unbounded();

        let (ab_reader, ab_writer) = stream::stream();
        let (ba_reader, ba_writer) = stream::stream();

        let _a_conn = fasync::Task::spawn(multi_stream_node_connection(
            &a,
            ba_reader,
            ab_writer,
            true,
            Quality::IN_PROCESS,
            errors_sink.clone(),
            "b".to_owned(),
        ));
        let _b_conn = fasync::Task::spawn(multi_stream_node_connection(
            &b,
            ab_reader,
            ba_writer,
            false,
            Quality::IN_PROCESS,
            errors_sink.clone(),
            "a".to_owned(),
        ));

        let new_peer = new_peers.next().await.unwrap();
        assert_eq!("b", &new_peer);

        let (_reader, peer_writer) = stream::stream();
        let (peer_reader, writer) = stream::stream();
        a.connect_to_peer(peer_reader, peer_writer, "b").await.unwrap();

        send_bytes(&writer, &[1, 2, 3, 4, 5, 6, 7, 8]);

        let (reader, _writer, from) = streams.next().await.unwrap();
        assert_eq!("a", &from);

        // Compares the whole buffer, so this also checks that exactly eight bytes arrived.
        reader
            .read(8, |buf| {
                assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8], &buf);
                Ok(((), 8))
            })
            .await
            .unwrap();
    }
}