1use crate::error::{Error, Result};
6use crate::{stream, Node, Quality};
7
8use futures::channel::mpsc::{channel, Receiver, Sender, UnboundedSender};
9use futures::channel::oneshot;
10use futures::future::Either;
11use futures::prelude::*;
12use futures::StreamExt;
13use std::collections::HashMap;
14use std::pin::pin;
15use std::sync::{Arc, Mutex as SyncMutex};
16
/// Per-stream bookkeeping state, stored in the stream table keyed by stream ID.
enum StreamStatus {
    /// The stream is live: payload from incoming data chunks is written to this writer.
    Open(stream::Writer),
    /// A write to the stream's local writer failed, so payload that still arrives for this ID
    /// is accepted and discarded. The remote has not sent a close message yet.
    ReadClosed,
    /// A close message was received for this ID. Any later data chunk for the same ID is
    /// rejected with `Error::BadStreamId`.
    Closed,
}
27
/// Multiplexes any number of logical streams over a single `reader`/`writer` pair.
///
/// Framing, as implemented by `handle_one_chunk` and `write_as_chunks`: every chunk starts with
/// a little-endian `u32` stream ID and a little-endian `u16` payload length. A non-zero length
/// is followed by that many payload bytes. A zero length marks a close message and is followed
/// by a little-endian `u16` epitaph length and the epitaph bytes.
///
/// # Parameters
/// * `reader` - incoming chunks from the remote are read from here.
/// * `writer` - outgoing chunks for all streams are written here.
/// * `is_server` - selects the stream ID parity: the server allocates odd IDs (starting at 1) and
///   accepts new even IDs from the remote; the client does the opposite.
/// * `streams_out` - every stream opened by the remote is delivered here as a reader, a writer,
///   and a oneshot sender on which the recipient reports the stream's final status.
/// * `streams_in` - streams opened locally. Data from the given reader is sent to the remote and
///   data from the remote is written to the given writer.
/// * `stream_errors_out` - receives every `Err` status reported through the oneshot senders.
/// * `remote_name` - used when composing the epitaph handed to a stream the remote closes.
///
/// # Returns
/// The outcome of the read side: the first error produced while reading or dispatching chunks,
/// or `Ok(())` if the loop stopped because the outgoing-reader channel was gone. If the write
/// side completes first, the read side is still awaited and its outcome returned.
pub async fn multi_stream(
    reader: stream::Reader,
    writer: stream::Writer,
    is_server: bool,
    streams_out: Sender<(stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>,
    streams_in: Receiver<(stream::Reader, stream::Writer)>,
    stream_errors_out: UnboundedSender<Error>,
    remote_name: String,
) -> Result<()> {
    // Readers (tagged with their stream ID) whose data must be chunked onto the connection.
    let (new_readers_sender, new_readers) = channel(1);
    // Oneshot receivers on which handlers of remote-initiated streams report their status.
    let (stream_errors_sender, stream_errors) = channel(1);
    // Locally allocated IDs: odd for the server, even for the client.
    let first_stream_id = if is_server { 1 } else { 0 };
    let mut stream_ids = (first_stream_id..).step_by(2);
    // The connection writer is shared by every per-stream write task and by the read side.
    let writer = Arc::new(SyncMutex::new(writer));
    let writer_for_reader = Arc::clone(&writer);

    let handle_read = async move {
        let writer = writer_for_reader;
        // Table of every stream ID seen so far and its current state.
        let streams = SyncMutex::new(HashMap::<u32, StreamStatus>::new());

        // A `futures::Stream` that, each time it is polled, reads and dispatches one chunk from
        // the connection and yields the outcome. It never yields `None`.
        let read_result_stream = futures::stream::unfold((), |_| async {
            let got = reader
                .read(6, |buf| {
                    let mut streams = streams.lock().unwrap();
                    let (size, new_stream) =
                        handle_one_chunk(&mut *streams, is_server, buf, &remote_name)?;

                    Ok((
                        // A chunk for a not-yet-seen ID opens a new stream: create its local
                        // pipe, seed it with the chunk's payload and register it as open.
                        new_stream.map(|(id, first_chunk_data)| {
                            let (reader, remote_writer) = stream::stream();
                            remote_writer
                                .write(first_chunk_data.len(), |out| {
                                    out[..first_chunk_data.len()]
                                        .copy_from_slice(&first_chunk_data);
                                    Ok(first_chunk_data.len())
                                })
                                .expect("We just created this stream!");
                            streams.insert(id, StreamStatus::Open(remote_writer));
                            (id, reader)
                        }),
                        // Size reported by `handle_one_chunk` for the chunk just handled.
                        size,
                    ))
                })
                .await;

            let got = match got {
                Ok(Some((id, reader))) => {
                    // Second pipe for the new stream: what the handler writes to `writer` is
                    // read from `remote_reader` and chunked back onto the connection.
                    let (remote_reader, writer) = stream::stream();

                    if new_readers_sender.clone().send((id, remote_reader)).await.is_err() {
                        Err(Error::ConnectionClosed(Some(
                            "New stream reader handler disappeared".to_owned(),
                        )))
                    } else {
                        // Register the status receiver before handing the stream out, so the
                        // handler's report always has somewhere to go.
                        let (sender, receiver) = oneshot::channel();
                        if stream_errors_sender.clone().send(receiver).await.is_ok() {
                            streams_out.clone().send((reader, writer, sender)).await.map_err(|_| {
                                Error::ConnectionClosed(Some(
                                    "New stream handler disappeared".to_owned(),
                                ))
                            })
                        } else {
                            Err(Error::ConnectionClosed(Some(
                                "Error reporting channel closed".to_owned(),
                            )))
                        }
                    }
                }
                Ok(None) => Ok(()),
                Err(e) => Err(e),
            };

            Some((got, ()))
        });

        // Awaits every registered status receiver concurrently and forwards any error to
        // `stream_errors_out`. A handler that drops its sender without reporting is treated as
        // an error. The trailing `filter_map` discards every item, so this stream never yields;
        // it exists only so that polling the merged stream below drives these futures.
        let stream_errors = stream_errors
            .map(move |x| {
                let stream_errors_out = stream_errors_out.clone();
                async move {
                    if let Err(x) = x.await.unwrap_or_else(|_| {
                        Err(Error::ConnectionClosed(Some(
                            "Stream handler hung up without returning a status".to_owned(),
                        )))
                    }) {
                        let _ = stream_errors_out.unbounded_send(x);
                    }
                }
            })
            .buffer_unordered(usize::MAX)
            .filter_map(|()| futures::future::ready(None));

        let read_result_stream = futures::stream::select(stream_errors, read_result_stream);

        // Tag the two event sources so a single loop can tell them apart.
        let read_result_stream = read_result_stream.map(Either::Left);

        // Chaining `pending()` means this side never reports end-of-stream, even after every
        // sender for `streams_in` has been dropped.
        let streams_in = streams_in.chain(futures::stream::pending()).map(Either::Right);

        let events = futures::stream::select(read_result_stream, streams_in);
        let mut events = pin!(events);

        let mut ret = Ok(());
        while let Some(event) = events.next().await {
            match event {
                // A chunk was handled successfully; keep going.
                Either::Left(Ok(())) => (),
                // Any read/dispatch error ends the whole multiplexer.
                Either::Left(other) => {
                    ret = other;
                    break;
                }
                // A locally opened stream: allocate an ID, start chunking its reader onto the
                // connection, then register its writer to receive the remote's data.
                Either::Right((reader, writer)) => {
                    let id = stream_ids.next().expect("This iterator should be infinite!");
                    if new_readers_sender.clone().send((id, reader)).await.is_err() {
                        break;
                    }
                    streams.lock().unwrap().insert(id, StreamStatus::Open(writer));
                }
            }
        }

        // A reason-less close from the read side is replaced by the connection writer's own
        // close reason when that writer is closed too.
        if matches!(ret, Err(Error::ConnectionClosed(None))) {
            let writer = writer.lock().unwrap();
            if writer.is_closed() {
                ret = Err(Error::ConnectionClosed(writer.closed_reason()))
            }
        }

        let mut streams = streams.lock().unwrap();

        // Close every stream that is still open, passing the final result along as the reason.
        for (_, stream) in streams.drain() {
            if let StreamStatus::Open(writer) = stream {
                writer.close(format!("Multi-stream terminated: {ret:?}"));
            }
        }

        ret
    };

    // One concurrent task per stream reader: chunk its data onto the connection until it ends,
    // then close the reader with the termination reason, if any.
    let handle_write = new_readers.for_each_concurrent(None, move |(id, stream)| {
        let writer = Arc::clone(&writer);
        async move {
            if let Some(reason) = write_as_chunks(id, &stream, writer).await {
                stream.close(format!("Stream terminated ({reason})"));
            } else {
                stream.close(format!("Stream terminated"));
            }
        }
    });

    let handle_read = pin!(handle_read);
    let handle_write = pin!(handle_write);

    // The read side decides the result. If the write side happens to finish first, keep
    // waiting for the read side.
    match futures::future::select(handle_read, handle_write).await {
        Either::Left((res, _)) => res,
        Either::Right(((), handle_read)) => handle_read.await,
    }
}
228
229fn handle_one_chunk<'a>(
245 streams: &mut HashMap<u32, StreamStatus>,
246 is_server: bool,
247 buf: &'a [u8],
248 remote_name: &str,
249) -> Result<(usize, Option<(u32, &'a [u8])>)> {
250 if buf.len() < 6 {
251 return Err(Error::BufferTooShort(6));
252 }
253
254 let id = u32::from_le_bytes(buf[..4].try_into().unwrap());
255 let length = u16::from_le_bytes(buf[4..6].try_into().unwrap());
256
257 let length = length as usize;
258 let chunk_length = length + 6;
259 let buf = &buf[6..];
260
261 if length == 0 {
262 if buf.len() < 2 {
263 return Err(Error::BufferTooShort(8));
264 }
265 let length = u16::from_le_bytes(buf[..2].try_into().unwrap());
266 let length = length as usize;
267
268 if buf.len() < length + 2 {
269 return Err(Error::BufferTooShort(8 + length));
270 }
271
272 let epitaph =
273 if length > 0 { Some(String::from_utf8_lossy(&buf[2..][..length])) } else { None };
274
275 if let Some(old) = streams.insert(id, StreamStatus::Closed) {
276 if let StreamStatus::Open(old) = old {
277 if let Some(epitaph) = epitaph {
278 old.close(format!("{remote_name} reported: {epitaph}"));
279 } else {
280 old.close(format!("{remote_name} reported no epitaph"));
281 }
282 }
283 Ok((length + 8, None))
284 } else {
285 Err(Error::BadStreamId)
286 }
287 } else if buf.len() < length {
288 Err(Error::BufferTooShort(chunk_length))
289 } else if let Some(stream) = streams.get(&id) {
290 match stream {
291 StreamStatus::Open(stream) => {
292 if stream
293 .write(length, |out| {
294 out[..length].copy_from_slice(&buf[..length]);
295 Ok(length)
296 })
297 .is_err()
298 {
299 let _ = streams.insert(id, StreamStatus::ReadClosed);
305 }
306 Ok((chunk_length, None))
307 }
308 StreamStatus::ReadClosed => Ok((chunk_length, None)),
309 StreamStatus::Closed => Err(Error::BadStreamId),
310 }
311 } else if (id & 1 == 0) == is_server {
312 Ok((chunk_length, Some((id, &buf[..length]))))
313 } else {
314 Err(Error::BadStreamId)
315 }
316}
317
318async fn write_as_chunks(
327 id: u32,
328 reader: &stream::Reader,
329 writer: Arc<SyncMutex<stream::Writer>>,
330) -> Option<String> {
331 loop {
332 let got = reader
338 .read(1, |buf| {
339 let mut total_len = 0;
340 while buf.len() > total_len {
341 let buf = &buf[total_len..];
342 let buf =
343 if buf.len() > u16::MAX as usize { &buf[..u16::MAX as usize] } else { buf };
344
345 let len: u16 = buf
346 .len()
347 .try_into()
348 .expect("We just truncated the length so it would fit!");
349
350 if let e @ Err(_) = writer.lock().unwrap().write(6 + buf.len(), |out_buf| {
351 out_buf[..4].copy_from_slice(&id.to_le_bytes());
352 let out_buf = &mut out_buf[4..];
353 out_buf[..2].copy_from_slice(&len.to_le_bytes());
354 out_buf[2..][..buf.len()].copy_from_slice(buf);
355 Ok(buf.len() + 6)
356 }) {
357 return Ok((e, total_len));
358 } else {
359 total_len += buf.len();
360 }
361 }
362
363 Ok((Ok(()), total_len))
364 })
365 .await;
366
367 match got {
368 Err(Error::ConnectionClosed(epitaph)) => {
369 let epitaph = epitaph.as_ref().map(|x| x.as_bytes()).unwrap_or(b"");
370 let length_u16: u16 = epitaph.len().try_into().unwrap_or(u16::MAX);
371 let length = length_u16 as usize;
372
373 let write_result = writer.lock().unwrap().write(8 + length, |out_buf| {
375 out_buf[..4].copy_from_slice(&id.to_le_bytes());
376 let out_buf = &mut out_buf[4..];
377 out_buf[..2].copy_from_slice(&0u16.to_le_bytes());
378 let out_buf = &mut out_buf[2..];
379 out_buf[..2].copy_from_slice(&length_u16.to_le_bytes());
380 let out_buf = &mut out_buf[2..];
381 out_buf[..length].copy_from_slice(&epitaph[..length]);
382 Ok(8 + length)
383 });
384
385 match write_result {
386 Ok(()) | Err(Error::ConnectionClosed(None)) => break None,
387 Err(Error::ConnectionClosed(Some(s))) => break Some(format!("write: {s}")),
388 other => unreachable!("Unexpected write error: {other:?}"),
389 }
390 }
391 Ok(Ok(())) => (),
392 Ok(Err(Error::ConnectionClosed(None))) => break None,
393 Ok(Err(Error::ConnectionClosed(Some(s)))) => break Some(format!("read: {s}")),
394 Ok(other) => unreachable!("Unexpected write error: {other:?}"),
395 other => unreachable!("Unexpected read error: {other:?}"),
396 }
397 }
398}
399
400pub fn multi_stream_node_connection(
413 node: &Node,
414 reader: stream::Reader,
415 writer: stream::Writer,
416 is_server: bool,
417 quality: Quality,
418 stream_errors_out: UnboundedSender<Error>,
419 remote_name: String,
420) -> impl Future<Output = Result<()>> + Send {
421 let (mut new_stream_sender, streams_in) = channel(1);
422 let (streams_out, new_stream_receiver) = channel(1);
423
424 let control_stream = if is_server {
425 let (control_reader, control_writer_remote) = stream::stream();
426 let (control_reader_remote, control_writer) = stream::stream();
427
428 new_stream_sender
429 .try_send((control_reader_remote, control_writer_remote))
430 .expect("We just created this channel!");
431 Some((control_reader, control_writer))
432 } else {
433 None
434 };
435
436 let stream_fut = multi_stream(
437 reader,
438 writer,
439 is_server,
440 streams_out,
441 streams_in,
442 stream_errors_out,
443 remote_name,
444 );
445 let node_fut = node.link_node(control_stream, new_stream_sender, new_stream_receiver, quality);
446
447 async move {
448 let node_fut = pin!(node_fut);
449 let stream_fut = pin!(stream_fut);
450
451 match futures::future::select(node_fut, stream_fut).await {
454 Either::Left((res, stream_fut)) => res.and(stream_fut.await),
455 Either::Right((res, node_fut)) => res.and(node_fut.await),
456 }
457 }
458}
459
/// Runs `multi_stream_node_connection` over a pair of `AsyncRead`/`AsyncWrite` byte streams.
///
/// Two internal pipes bridge the byte streams to the connection: bytes read from `rx` are
/// written into one pipe for the connection to read, and bytes the connection writes into the
/// other pipe are copied out to `tx`.
///
/// # Parameters
/// * `node`, `is_server`, `quality`, `stream_errors_out` - forwarded to
///   `multi_stream_node_connection`.
/// * `rx` - source of bytes from the peer. A zero-length read is treated as end of connection.
/// * `tx` - sink for bytes to the peer; flushed after every write.
/// * `remote_name` - forwarded to the connection and used in close reasons.
///
/// # Returns
/// If the connection future finishes first, its result. Otherwise the connection future is
/// awaited afterwards; its error takes precedence, then the error from the copy loops, and
/// `Ok(())` only if both succeeded.
pub async fn multi_stream_node_connection_to_async(
    node: &Node,
    rx: &mut (dyn AsyncRead + Unpin + Send),
    tx: &mut (dyn AsyncWrite + Unpin + Send),
    is_server: bool,
    quality: Quality,
    stream_errors_out: UnboundedSender<Error>,
    remote_name: String,
) -> Result<()> {
    // `reader`/`writer` stay on this side; the `remote_*` halves go to the connection.
    let (reader, remote_writer) = stream::stream();
    let (remote_reader, writer) = stream::stream();
    let conn_fut = multi_stream_node_connection(
        node,
        remote_reader,
        remote_writer,
        is_server,
        quality,
        stream_errors_out,
        remote_name.clone(),
    );
    // Borrow the name so both `async move` blocks below can use it.
    let remote_name = &remote_name;

    // Copies bytes from `rx` into the connection until EOF or an error.
    let read_fut = async move {
        let mut buf = [0u8; 4096];
        loop {
            let n = match rx.read(&mut buf).await {
                Ok(0) => {
                    writer
                        .close(format!("connection closed (either by transport or {remote_name})"));
                    break Ok(());
                }
                Ok(n) => n,
                Err(e) => {
                    writer.close(format!("{remote_name} connection failed (read): {e:?}"));
                    return Err(Error::from(e));
                }
            };
            writer.write(n, |write_buf| {
                write_buf[..n].copy_from_slice(&buf[..n]);
                Ok(n)
            })?
        }
    };

    // Copies bytes produced by the connection out to `tx`, at most one buffer at a time.
    let write_fut = async move {
        loop {
            let mut buf = [0u8; 4096];
            let len = reader
                .read(1, |read_buf| {
                    // Take no more than fits in `buf`; the same count is reported as the
                    // value and as the size handled.
                    let read_buf = &read_buf[..std::cmp::min(buf.len(), read_buf.len())];
                    buf[..read_buf.len()].copy_from_slice(read_buf);
                    Ok((read_buf.len(), read_buf.len()))
                })
                .await?;
            let write_res = async {
                tx.write_all(&buf[..len]).await?;
                tx.flush().await?;
                Result::<_, Error>::Ok(())
            }
            .await;

            if let Err(e) = write_res {
                reader.close(format!("{remote_name} connection failed (write): {e:?}"));
                return Err(e.into());
            }
        }
    };

    let read_write = futures::future::try_join(read_fut, write_fut);

    let conn_fut = pin!(conn_fut);

    let cleanup = {
        // `read_write` is pinned inside this block on purpose: it is dropped when the block
        // ends, before `cleanup` is awaited, which also drops the `reader` and `writer` that
        // the copy loops captured.
        // NOTE(review): presumably dropping those halves is what lets `conn_fut` finish —
        // confirm against the `stream` module.
        let read_write = pin!(read_write);

        match futures::future::select(conn_fut, read_write).await {
            Either::Left((res, _)) => {
                return res;
            }
            Either::Right((read_write_result, conn_fut)) => {
                conn_fut.map(move |conn_fut_result| match (conn_fut_result, read_write_result) {
                    (Ok(()), Ok(((), ()))) => Ok(()),
                    (Ok(()), Err(e)) | (Err(e), _) => Err(e),
                })
            }
        }
    };

    cleanup.await
}
560
#[cfg(test)]
mod test {
    use super::*;
    use fuchsia_async as fasync;
    use futures::channel::mpsc::unbounded;

    /// Opens one stream from the server side (`a`) and checks that data flows in both
    /// directions, that dropping the remote writer closes the local reader, and that writing
    /// still succeeds locally after the remote reader is dropped.
    #[fuchsia::test]
    async fn one_stream() {
        // Two pipes form the connection between multiplexer `a` and multiplexer `b`.
        let (a_reader, b_writer) = stream::stream();
        let (b_reader, a_writer) = stream::stream();
        let (mut create_stream_a, a_streams_in) = channel(1);
        let (_create_stream_b, b_streams_in) = channel(1);
        let (a_streams_out, _get_stream_a) = channel(100);
        let (b_streams_out, mut get_stream_b) = channel(1);
        // The receiver is kept bound so it stays alive for the duration of the test.
        let (errors_sink_a, _black_hole) = unbounded();
        let errors_sink_b = errors_sink_a.clone();

        let _a = fasync::Task::spawn(async move {
            assert!(matches!(
                multi_stream(
                    a_reader,
                    a_writer,
                    true,
                    a_streams_out,
                    a_streams_in,
                    errors_sink_a,
                    "b".to_owned()
                )
                .await,
                Ok(()) | Err(Error::ConnectionClosed(None))
            ))
        });
        let _b = fasync::Task::spawn(async move {
            assert!(matches!(
                multi_stream(
                    b_reader,
                    b_writer,
                    false,
                    b_streams_out,
                    b_streams_in,
                    errors_sink_b,
                    "a".to_owned()
                )
                .await,
                Ok(()) | Err(Error::ConnectionClosed(None))
            ))
        });

        let (ab_reader_a, ab_reader_write) = stream::stream();
        let (ab_writer_read, ab_writer_a) = stream::stream();

        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();

        // The remote only learns about the stream once its first data chunk arrives.
        ab_writer_a
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
                Ok(8)
            })
            .unwrap();

        let (ab_reader_b, ab_writer_b, err) = get_stream_b.next().await.unwrap();
        err.send(Ok(())).unwrap();

        ab_writer_b
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
                Ok(8)
            })
            .unwrap();

        ab_reader_b
            .read(8, |buf| {
                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
                Ok(((), 8))
            })
            .await
            .unwrap();
        ab_reader_a
            .read(8, |buf| {
                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
                Ok(((), 8))
            })
            .await
            .unwrap();

        // Dropping b's writer must surface on a's reader as a closed connection.
        std::mem::drop(ab_writer_b);
        assert!(matches!(
            ab_reader_a.read::<_, ()>(1, |_| unreachable!()).await,
            Err(Error::ConnectionClosed(_))
        ));

        // With b's reader gone, a local write on a's side must still succeed.
        std::mem::drop(ab_reader_b);
        ab_writer_a
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
                Ok(8)
            })
            .unwrap();
    }

    /// Checks that an error reported by a stream handler is forwarded to the error sink, and
    /// that the multiplexer keeps working for a subsequent stream.
    #[fuchsia::test]
    async fn fallible_stream() {
        let (a_reader, b_writer) = stream::stream();
        let (b_reader, a_writer) = stream::stream();
        let (mut create_stream_a, a_streams_in) = channel(1);
        let (_create_stream_b, b_streams_in) = channel(1);
        let (a_streams_out, _get_stream_a) = channel(100);
        let (b_streams_out, mut get_stream_b) = channel(1);
        let (errors_sink_a, _black_hole) = unbounded();
        // b's errors are observed by the test.
        let (errors_sink_b, mut b_errors) = unbounded();

        let _a = fasync::Task::spawn(async move {
            assert!(matches!(
                multi_stream(
                    a_reader,
                    a_writer,
                    true,
                    a_streams_out,
                    a_streams_in,
                    errors_sink_a,
                    "b".to_owned()
                )
                .await,
                Ok(()) | Err(Error::ConnectionClosed(None))
            ))
        });
        let _b = fasync::Task::spawn(async move {
            assert!(matches!(
                multi_stream(
                    b_reader,
                    b_writer,
                    false,
                    b_streams_out,
                    b_streams_in,
                    errors_sink_b,
                    "a".to_owned()
                )
                .await,
                Ok(()) | Err(Error::ConnectionClosed(None))
            ))
        });

        // First stream: its handler on b's side will report a failure.
        let (fail_reader, fail_reader_write) = stream::stream();
        let (_ignore, fail_writer) = stream::stream();
        create_stream_a.send((fail_reader, fail_writer)).await.unwrap();

        fail_reader_write
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
                Ok(8)
            })
            .unwrap();

        let (_, _, err) = get_stream_b.next().await.unwrap();
        err.send(Err(Error::ConnectionClosed(Some("Testing".to_owned())))).unwrap();

        // Wait until the reported error shows up on b's error sink; other
        // `ConnectionClosed(Some(..))` errors arriving first are skipped.
        loop {
            if let Some(Error::ConnectionClosed(Some(s))) = b_errors.next().await {
                if s == "Testing" {
                    break;
                }
            } else {
                panic!("Error stream closed without reporting our error.");
            }
        }

        // Second stream: must work exactly as in `one_stream`.
        let (ab_reader_a, ab_reader_write) = stream::stream();
        let (ab_writer_read, ab_writer_a) = stream::stream();

        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();

        ab_writer_a
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
                Ok(8)
            })
            .unwrap();

        let (ab_reader_b, ab_writer_b, err) = get_stream_b.next().await.unwrap();
        err.send(Ok(())).unwrap();

        ab_writer_b
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
                Ok(8)
            })
            .unwrap();

        ab_reader_b
            .read(8, |buf| {
                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
                Ok(((), 8))
            })
            .await
            .unwrap();
        ab_reader_a
            .read(8, |buf| {
                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
                Ok(((), 8))
            })
            .await
            .unwrap();

        std::mem::drop(ab_writer_b);
        assert!(matches!(
            ab_reader_a.read::<_, ()>(1, |_| unreachable!()).await,
            Err(Error::ConnectionClosed(_))
        ));

        std::mem::drop(ab_reader_b);
        ab_writer_a
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
                Ok(8)
            })
            .unwrap();
    }

    /// Opens one stream from each side at the same time and checks that all four directions
    /// carry their own data.
    #[fuchsia::test]
    async fn two_streams() {
        let (a_reader, b_writer) = stream::stream();
        let (b_reader, a_writer) = stream::stream();
        let (mut create_stream_a, a_streams_in) = channel(1);
        let (mut create_stream_b, b_streams_in) = channel(1);
        let (a_streams_out, mut get_stream_a) = channel(1);
        let (b_streams_out, mut get_stream_b) = channel(1);
        let (errors_sink, _black_hole) = unbounded();

        let _a = fasync::Task::spawn(multi_stream(
            a_reader,
            a_writer,
            true,
            a_streams_out,
            a_streams_in,
            errors_sink.clone(),
            "b".to_owned(),
        ));
        let _b = fasync::Task::spawn(multi_stream(
            b_reader,
            b_writer,
            false,
            b_streams_out,
            b_streams_in,
            errors_sink.clone(),
            "a".to_owned(),
        ));

        // `ab_*` is the stream opened by a, `ba_*` the stream opened by b.
        let (ab_reader_a, ab_reader_write) = stream::stream();
        let (ab_writer_read, ab_writer_a) = stream::stream();
        let (ba_reader_b, ba_reader_write) = stream::stream();
        let (ba_writer_read, ba_writer_b) = stream::stream();

        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
        create_stream_b.send((ba_writer_read, ba_reader_write)).await.unwrap();

        ab_writer_a
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
                Ok(8)
            })
            .unwrap();
        ba_writer_b
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[25, 26, 27, 28, 29, 30, 31, 32]);
                Ok(8)
            })
            .unwrap();

        let (ab_reader_b, ab_writer_b, err_ab) = get_stream_b.next().await.unwrap();
        let (ba_reader_a, ba_writer_a, err_ba) = get_stream_a.next().await.unwrap();
        err_ab.send(Ok(())).unwrap();
        err_ba.send(Ok(())).unwrap();

        ab_writer_b
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
                Ok(8)
            })
            .unwrap();
        ba_writer_a
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[17, 18, 19, 20, 21, 22, 23, 24]);
                Ok(8)
            })
            .unwrap();

        ab_reader_b
            .read(8, |buf| {
                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
                Ok(((), 8))
            })
            .await
            .unwrap();
        ab_reader_a
            .read(8, |buf| {
                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
                Ok(((), 8))
            })
            .await
            .unwrap();
        ba_reader_b
            .read(8, |buf| {
                assert_eq!(&buf[..8], &[17, 18, 19, 20, 21, 22, 23, 24]);
                Ok(((), 8))
            })
            .await
            .unwrap();
        ba_reader_a
            .read(8, |buf| {
                assert_eq!(&buf[..8], &[25, 26, 27, 28, 29, 30, 31, 32]);
                Ok(((), 8))
            })
            .await
            .unwrap();
    }

    /// Links two nodes through `multi_stream_node_connection` and checks that node `a`
    /// discovers `b` and can open a stream to it that delivers data.
    #[fuchsia::test]
    async fn node_connect() {
        let (new_peer_sender_a, mut new_peers) = channel(1);
        let (new_peer_sender_b, _new_peers_b) = channel(100);
        let (incoming_streams_sender_a, _streams_a) = channel(100);
        let (incoming_streams_sender_b, mut streams) = channel(1);
        let a = Node::new("a", "test", new_peer_sender_a, incoming_streams_sender_a).unwrap();
        let b = Node::new("b", "test", new_peer_sender_b, incoming_streams_sender_b).unwrap();
        let (errors_sink, _black_hole) = unbounded();

        // The two pipes that make up the connection between the nodes.
        let (ab_reader, ab_writer) = stream::stream();
        let (ba_reader, ba_writer) = stream::stream();

        let _a_conn = fasync::Task::spawn(multi_stream_node_connection(
            &a,
            ba_reader,
            ab_writer,
            true,
            Quality::IN_PROCESS,
            errors_sink.clone(),
            "b".to_owned(),
        ));
        let _b_conn = fasync::Task::spawn(multi_stream_node_connection(
            &b,
            ab_reader,
            ba_writer,
            false,
            Quality::IN_PROCESS,
            errors_sink.clone(),
            "a".to_owned(),
        ));

        let new_peer = new_peers.next().await.unwrap();
        assert_eq!("b", &new_peer);

        let (_reader, peer_writer) = stream::stream();
        let (peer_reader, writer) = stream::stream();
        a.connect_to_peer(peer_reader, peer_writer, "b").await.unwrap();

        writer
            .write(8, |buf| {
                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
                Ok(8)
            })
            .unwrap();

        let (reader, _writer, from) = streams.next().await.unwrap();
        assert_eq!("a", &from);

        reader
            .read(8, |buf| {
                assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8], &buf);
                Ok(((), 8))
            })
            .await
            .unwrap();
    }
}