circuit/
multi_stream.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::error::{Error, Result};
6use crate::{Node, Quality, stream};
7
8use fuchsia_sync::Mutex as SyncMutex;
9use futures::StreamExt;
10use futures::channel::mpsc::{Receiver, Sender, UnboundedSender, channel};
11use futures::channel::oneshot;
12use futures::future::Either;
13use futures::prelude::*;
14use std::collections::HashMap;
15use std::pin::pin;
16use std::sync::Arc;
17
18/// Status of an individual stream.
19enum StreamStatus {
20    /// Stream is open to traffic. Incoming traffic should be delivered to the given writer.
21    Open(stream::Writer),
22    /// Stream hasn't been officially closed but the user has stopped listening for incoming data.
23    /// The outgoing side of the stream might still be running.
24    ReadClosed,
25    /// Stream is closed. We never reuse stream IDs so a closed stream should never reopen.
26    Closed,
27}
28
29/// Multiplexes multiple streams into one.
30///
31/// While running, this function serves a protocol on the `reader` and `writer` that multiplexes
32/// multiple streams into a single stream.
33///
34/// The protocol is a series of variable length frames each consisting of:
35/// * 4 bytes - A stream ID in little-endian.
36/// * 2 bytes - a LENGTH in little-endian.
37/// * LENGTH bytes - data.
38///
39/// To start a new stream, an endpoint simply begins sending frames with that stream ID. To close a
40/// stream, an endpoint can send a zero-length frame. Once a stream is closed that stream ID should
41/// never be used again.
42///
43/// We distinguish the two endpoints of the protocol as a client and a server. The only difference
44/// between the two is in which stream IDs they may initiate; Any new streams started by the client
45/// should have odd-numbered IDs, and any new streams initiated by the server should have
46/// even-numbered IDs. The `is_server` argument controls this behavior.
47///
48/// Any new streams initiated by the other end of the connection will be returned to us via the
49/// `streams_out` channel, and we may initiate a stream by sending a reader and writer to the
50/// `streams_in` channel.
51pub async fn multi_stream(
52    reader: stream::Reader,
53    writer: stream::Writer,
54    is_server: bool,
55    streams_out: Sender<(stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>,
56    streams_in: Receiver<(stream::Reader, stream::Writer)>,
57    stream_errors_out: UnboundedSender<Error>,
58    remote_name: String,
59) -> Result<()> {
60    let (new_readers_sender, new_readers) = channel(1);
61    let (stream_errors_sender, stream_errors) = channel(1);
62    let first_stream_id = if is_server { 1 } else { 0 };
63    let mut stream_ids = (first_stream_id..).step_by(2);
64    let writer = Arc::new(SyncMutex::new(writer));
65    let writer_for_reader = Arc::clone(&writer);
66
67    let handle_read = async move {
68        let writer = writer_for_reader;
69        let streams = SyncMutex::new(HashMap::<u32, StreamStatus>::new());
70
71        // Creates a stream (that's Stream in the rust async sense, not in the sense of our
72        // protocol) which when polled will read some data from the reader and attempt to
73        // demultiplex it.
74        //
75        // Each time we call `.next()` on this stream one read operation is performed, and the
76        // stream yields a `Result` indicating if an error occurred. Therefore, to pump read-side
77        // IO, we simply need to pull values from this stream until one of them is an error.
78        let read_result_stream = futures::stream::unfold((), |_| async {
79            let got = reader
80                .read(6, |buf| {
81                    let mut streams = streams.lock();
82                    let (size, new_stream) =
83                        handle_one_chunk(&mut *streams, is_server, buf, &remote_name)?;
84
85                    Ok((
86                        new_stream.map(|(id, first_chunk_data)| {
87                            let (reader, remote_writer) = stream::stream();
88                            remote_writer
89                                .write(first_chunk_data.len(), |out| {
90                                    out[..first_chunk_data.len()]
91                                        .copy_from_slice(&first_chunk_data);
92                                    Ok(first_chunk_data.len())
93                                })
94                                .expect("We just created this stream!");
95                            streams.insert(id, StreamStatus::Open(remote_writer));
96                            (id, reader)
97                        }),
98                        size,
99                    ))
100                })
101                .await;
102
103            let got = match got {
104                Ok(Some((id, reader))) => {
105                    // We got a request to initiate a new stream. It's already
106                    // in the streams table, just have to hand the reader over
107                    // to the other task to be dispatched, and the writer out to
108                    // the caller.
109                    let (remote_reader, writer) = stream::stream();
110
111                    if new_readers_sender.clone().send((id, remote_reader)).await.is_err() {
112                        Err(Error::ConnectionClosed(Some(
113                            "New stream reader handler disappeared".to_owned(),
114                        )))
115                    } else {
116                        let (sender, receiver) = oneshot::channel();
117                        if stream_errors_sender.clone().send(receiver).await.is_ok() {
118                            streams_out.clone().send((reader, writer, sender)).await.map_err(|_| {
119                                Error::ConnectionClosed(Some(
120                                    "New stream handler disappeared".to_owned(),
121                                ))
122                            })
123                        } else {
124                            Err(Error::ConnectionClosed(Some(
125                                "Error reporting channel closed".to_owned(),
126                            )))
127                        }
128                    }
129                }
130                Ok(None) => Ok(()),
131                Err(e) => Err(e),
132            };
133
134            Some((got, ()))
135        });
136
137        // Send stream errors to our error output. This functions as a stream that never returns anything
138        // so we can wrap it in a select with read_result_stream and thereby continuously poll it as
139        // we handle errors.
140        let stream_errors = stream_errors
141            .map(move |x| {
142                let stream_errors_out = stream_errors_out.clone();
143                async move {
144                    if let Err(x) = x.await.unwrap_or_else(|_| {
145                        Err(Error::ConnectionClosed(Some(
146                            "Stream handler hung up without returning a status".to_owned(),
147                        )))
148                    }) {
149                        let _ = stream_errors_out.unbounded_send(x);
150                    }
151                }
152            })
153            // Buffer unordered all the stream errors otherwise we'll block the
154            // reader task.
155            .buffer_unordered(usize::MAX)
156            // Never yield anything and match the type of read_result_stream.
157            .filter_map(|()| futures::future::ready(None));
158
159        let read_result_stream = futures::stream::select(stream_errors, read_result_stream);
160
161        // The `futures::stream::select` function requires both of the streams you give it to be the
162        // same type and effectively interleaves the output of the two streams. We want to poll two
163        // streams at once but handle their output differently, so we wrap them in an `Either`.
164        let read_result_stream = read_result_stream.map(Either::Left);
165
166        // `streams_in` will yield any new streams the user wants to create. If the user hangs up
167        // that channel, we don't need to react, as the existing streams may still be in use. So we
168        // make streams_in poll forever once the user stops supplying new streams.
169        let streams_in = streams_in.chain(futures::stream::pending()).map(Either::Right);
170
171        let events = futures::stream::select(read_result_stream, streams_in);
172        let mut events = pin!(events);
173
174        let mut ret = Ok(());
175        while let Some(event) = events.next().await {
176            match event {
177                Either::Left(Ok(())) => (),
178                Either::Left(other) => {
179                    ret = other;
180                    break;
181                }
182                Either::Right((reader, writer)) => {
183                    let id = stream_ids.next().expect("This iterator should be infinite!");
184                    if new_readers_sender.clone().send((id, reader)).await.is_err() {
185                        break;
186                    }
187                    streams.lock().insert(id, StreamStatus::Open(writer));
188                }
189            }
190        }
191
192        if matches!(ret, Err(Error::ConnectionClosed(None))) {
193            let writer = writer.lock();
194            if writer.is_closed() {
195                ret = Err(Error::ConnectionClosed(writer.closed_reason()))
196            }
197        }
198
199        let mut streams = streams.lock();
200
201        for (_, stream) in streams.drain() {
202            if let StreamStatus::Open(writer) = stream {
203                writer.close(format!("Multi-stream terminated: {ret:?}"));
204            }
205        }
206
207        ret
208    };
209
210    let handle_write = new_readers.for_each_concurrent(None, move |(id, stream)| {
211        let writer = Arc::clone(&writer);
212        async move {
213            if let Some(reason) = write_as_chunks(id, &stream, writer).await {
214                stream.close(format!("Stream terminated ({reason})"));
215            } else {
216                stream.close(format!("Stream terminated"));
217            }
218        }
219    });
220
221    let handle_read = pin!(handle_read);
222    let handle_write = pin!(handle_write);
223
224    match futures::future::select(handle_read, handle_write).await {
225        Either::Left((res, _)) => res,
226        Either::Right(((), handle_read)) => handle_read.await,
227    }
228}
229
230/// Handles one chunk of data from the incoming stream.
231///
232/// The incoming stream is an interleaving of several byte streams. They are interleaved by
233/// splitting them into chunks and attaching a header to each. This function assumes `buf` has been
234/// filled with data from the incoming stream, and tries to parse the header for the first chunk and
235/// process the data within.
236///
237/// If the buffer isn't long enough to contain an entire chunk, this will return `BufferTooShort`,
238/// otherwise it will return the size of the chunk processed in the first element of the returned
239/// tuple.
240///
241/// Once a chunk is parsed, the table of individual streams given with the `streams` argument will
242/// be used to route the incoming data. The second element of the returned tuple will be `Some` if
243/// and only if the chunk indicates a new stream is being started, in which case it will contain the
244/// ID of the new stream, and the data portion of the chunk.
245fn handle_one_chunk<'a>(
246    streams: &mut HashMap<u32, StreamStatus>,
247    is_server: bool,
248    buf: &'a [u8],
249    remote_name: &str,
250) -> Result<(usize, Option<(u32, &'a [u8])>)> {
251    if buf.len() < 6 {
252        return Err(Error::BufferTooShort(6));
253    }
254
255    let id = u32::from_le_bytes(buf[..4].try_into().unwrap());
256    let length = u16::from_le_bytes(buf[4..6].try_into().unwrap());
257
258    let length = length as usize;
259    let chunk_length = length + 6;
260    let buf = &buf[6..];
261
262    if length == 0 {
263        if buf.len() < 2 {
264            return Err(Error::BufferTooShort(8));
265        }
266        let length = u16::from_le_bytes(buf[..2].try_into().unwrap());
267        let length = length as usize;
268
269        if buf.len() < length + 2 {
270            return Err(Error::BufferTooShort(8 + length));
271        }
272
273        let epitaph =
274            if length > 0 { Some(String::from_utf8_lossy(&buf[2..][..length])) } else { None };
275
276        if let Some(old) = streams.insert(id, StreamStatus::Closed) {
277            if let StreamStatus::Open(old) = old {
278                if let Some(epitaph) = epitaph {
279                    old.close(format!("{remote_name} reported: {epitaph}"));
280                } else {
281                    old.close(format!("{remote_name} reported no epitaph"));
282                }
283            }
284            Ok((length + 8, None))
285        } else {
286            Err(Error::BadStreamId)
287        }
288    } else if buf.len() < length {
289        Err(Error::BufferTooShort(chunk_length))
290    } else if let Some(stream) = streams.get(&id) {
291        match stream {
292            StreamStatus::Open(stream) => {
293                if stream
294                    .write(length, |out| {
295                        out[..length].copy_from_slice(&buf[..length]);
296                        Ok(length)
297                    })
298                    .is_err()
299                {
300                    // The user isn't listening for incoming data anymore. Don't
301                    // treat that as a fatal error, and don't send a hangup in case
302                    // the user is still sending data. Just quietly ignore it. If
303                    // the user hangs up the other side of the connection that's
304                    // when we can complain.
305                    let _ = streams.insert(id, StreamStatus::ReadClosed);
306                }
307                Ok((chunk_length, None))
308            }
309            StreamStatus::ReadClosed => Ok((chunk_length, None)),
310            StreamStatus::Closed => Err(Error::BadStreamId),
311        }
312    } else if (id & 1 == 0) == is_server {
313        Ok((chunk_length, Some((id, &buf[..length]))))
314    } else {
315        Err(Error::BadStreamId)
316    }
317}
318
319/// Reads data from the given reader, then splits it into chunks of no more than 2^16 - 1 bytes,
320/// attaches a header to each chunk containing the given `id` number and the size of the chunk, then
321/// writes each chunk to the given writer.
322///
323/// The point, of course, is that multiple functions can do this to the same writer, and since the
324/// chunks are labeled, the data can be parsed back out into separate streams on the other end.
325///
326/// If writing stops because the writer is closed, this will return the reported reason for closure.
327async fn write_as_chunks(
328    id: u32,
329    reader: &stream::Reader,
330    writer: Arc<SyncMutex<stream::Writer>>,
331) -> Option<String> {
332    loop {
333        // We want to handle errors with the read and errors with the write differently, so we
334        // return a nested result.
335        //
336        // In short, this will return Ok(Ok(())) if all is well, Ok(Err(...)) if we read data
337        // successfully but then failed to write it, and Err(...) if we failed to read.
338        let got = reader
339            .read(1, |buf| {
340                let mut total_len = 0;
341                while buf.len() > total_len {
342                    let buf = &buf[total_len..];
343                    let buf =
344                        if buf.len() > u16::MAX as usize { &buf[..u16::MAX as usize] } else { buf };
345
346                    let len: u16 = buf
347                        .len()
348                        .try_into()
349                        .expect("We just truncated the length so it would fit!");
350
351                    if let e @ Err(_) = writer.lock().write(6 + buf.len(), |out_buf| {
352                        out_buf[..4].copy_from_slice(&id.to_le_bytes());
353                        let out_buf = &mut out_buf[4..];
354                        out_buf[..2].copy_from_slice(&len.to_le_bytes());
355                        out_buf[2..][..buf.len()].copy_from_slice(buf);
356                        Ok(buf.len() + 6)
357                    }) {
358                        return Ok((e, total_len));
359                    } else {
360                        total_len += buf.len();
361                    }
362                }
363
364                Ok((Ok(()), total_len))
365            })
366            .await;
367
368        match got {
369            Err(Error::ConnectionClosed(epitaph)) => {
370                let epitaph = epitaph.as_ref().map(|x| x.as_bytes()).unwrap_or(b"");
371                let length_u16: u16 = epitaph.len().try_into().unwrap_or(u16::MAX);
372                let length = length_u16 as usize;
373
374                // If the stream was closed, send a frame indicating such.
375                let write_result = writer.lock().write(8 + length, |out_buf| {
376                    out_buf[..4].copy_from_slice(&id.to_le_bytes());
377                    let out_buf = &mut out_buf[4..];
378                    out_buf[..2].copy_from_slice(&0u16.to_le_bytes());
379                    let out_buf = &mut out_buf[2..];
380                    out_buf[..2].copy_from_slice(&length_u16.to_le_bytes());
381                    let out_buf = &mut out_buf[2..];
382                    out_buf[..length].copy_from_slice(&epitaph[..length]);
383                    Ok(8 + length)
384                });
385
386                match write_result {
387                    Ok(()) | Err(Error::ConnectionClosed(None)) => break None,
388                    Err(Error::ConnectionClosed(Some(s))) => break Some(format!("write: {s}")),
389                    other => unreachable!("Unexpected write error: {other:?}"),
390                }
391            }
392            Ok(Ok(())) => (),
393            Ok(Err(Error::ConnectionClosed(None))) => break None,
394            Ok(Err(Error::ConnectionClosed(Some(s)))) => break Some(format!("read: {s}")),
395            Ok(other) => unreachable!("Unexpected write error: {other:?}"),
396            other => unreachable!("Unexpected read error: {other:?}"),
397        }
398    }
399}
400
401/// Creates a new connection to a circuit node, and merges all streams produced and consumed by that
402/// connection into a multi-stream. In this way you can service a connection between nodes with a
403/// single stream of bytes.
404///
405/// The `is_server` boolean should be `true` at one end of the connection and `false` at the other.
406/// Usually it will be `true` for the node receiving a connection and `false` for the node
407/// initiating one.
408///
409/// Traffic will be written to, and read from, the given `reader` and `writer`.
410///
411/// The `quality` will be used to make routing decisions when establishing streams across multiple
412/// nodes.
413pub fn multi_stream_node_connection(
414    node: &Node,
415    reader: stream::Reader,
416    writer: stream::Writer,
417    is_server: bool,
418    quality: Quality,
419    stream_errors_out: UnboundedSender<Error>,
420    remote_name: String,
421) -> impl Future<Output = Result<()>> + Send + use<> {
422    let (mut new_stream_sender, streams_in) = channel(1);
423    let (streams_out, new_stream_receiver) = channel(1);
424
425    let control_stream = if is_server {
426        let (control_reader, control_writer_remote) = stream::stream();
427        let (control_reader_remote, control_writer) = stream::stream();
428
429        new_stream_sender
430            .try_send((control_reader_remote, control_writer_remote))
431            .expect("We just created this channel!");
432        Some((control_reader, control_writer))
433    } else {
434        None
435    };
436
437    let stream_fut = multi_stream(
438        reader,
439        writer,
440        is_server,
441        streams_out,
442        streams_in,
443        stream_errors_out,
444        remote_name,
445    );
446    let node_fut = node.link_node(control_stream, new_stream_sender, new_stream_receiver, quality);
447
448    async move {
449        let node_fut = pin!(node_fut);
450        let stream_fut = pin!(stream_fut);
451
452        // If either the node connection or the multi stream dies we assume the other will also die
453        // shortly after, so we always await both futures to completion.
454        match futures::future::select(node_fut, stream_fut).await {
455            Either::Left((res, stream_fut)) => res.and(stream_fut.await),
456            Either::Right((res, node_fut)) => res.and(node_fut.await),
457        }
458    }
459}
460
461/// Same as `multi_stream_node_connection` but reads and writes to and from implementors of the
462/// standard `AsyncRead` and `AsyncWrite` traits rather than circuit streams.
463pub async fn multi_stream_node_connection_to_async(
464    node: &Node,
465    rx: &mut (dyn AsyncRead + Unpin + Send),
466    tx: &mut (dyn AsyncWrite + Unpin + Send),
467    is_server: bool,
468    quality: Quality,
469    stream_errors_out: UnboundedSender<Error>,
470    remote_name: String,
471) -> Result<()> {
472    let (reader, remote_writer) = stream::stream();
473    let (remote_reader, writer) = stream::stream();
474    let conn_fut = multi_stream_node_connection(
475        node,
476        remote_reader,
477        remote_writer,
478        is_server,
479        quality,
480        stream_errors_out,
481        remote_name.clone(),
482    );
483    let remote_name = &remote_name;
484
485    let read_fut = async move {
486        let mut buf = [0u8; 4096];
487        loop {
488            let n = match rx.read(&mut buf).await {
489                Ok(0) => {
490                    writer
491                        .close(format!("connection closed (either by transport or {remote_name})"));
492                    break Ok(());
493                }
494                Ok(n) => n,
495                Err(e) => {
496                    writer.close(format!("{remote_name} connection failed (read): {e:?}"));
497                    return Err(Error::from(e));
498                }
499            };
500            writer.write(n, |write_buf| {
501                write_buf[..n].copy_from_slice(&buf[..n]);
502                Ok(n)
503            })?
504        }
505    };
506
507    let write_fut = async move {
508        let mut tx = std::pin::pin!(tx);
509        loop {
510            let res = futures::future::poll_fn(|ctx| {
511                reader.poll_read(ctx, &mut 1, |ctx, read_buf| {
512                    let res = match std::task::ready!(tx.as_mut().poll_write(ctx, read_buf)) {
513                        Ok(x) => Ok(((), x)),
514                        Err(e) => Err(Error::IO(e)),
515                    };
516                    std::task::Poll::Ready(res)
517                })
518            })
519            .await;
520
521            if let Err(Error::IO(e)) = &res {
522                reader.close(format!("{remote_name} connection failed (write): {e:?}"));
523                return res;
524            }
525
526            res?;
527        }
528    };
529
530    let read_write = futures::future::try_join(read_fut, write_fut);
531
532    let conn_fut = pin!(conn_fut);
533
534    let cleanup = {
535        // We must pin read_write to this scope so the streams are dropped when
536        // we leave this scope and conn_fut can run to completion. If it's
537        // pinned to the stack alongside conn_fut then the streams aren't
538        // dropped and conn_fut doesn't observe the closed status.
539        let read_write = pin!(read_write);
540
541        match futures::future::select(conn_fut, read_write).await {
542            Either::Left((res, _)) => {
543                return res;
544            }
545            Either::Right((read_write_result, conn_fut)) => {
546                conn_fut.map(move |conn_fut_result| match (conn_fut_result, read_write_result) {
547                    (Ok(()), Ok(((), ()))) => Ok(()),
548                    // Report back any errors, preferring the one from the
549                    // connection future.
550                    (Ok(()), Err(e)) | (Err(e), _) => Err(e),
551                })
552            }
553        }
554    };
555
556    // If the read/write future finished first, we need to wait for the
557    // connection future to run to completion to ensure cleanup happens.
558    cleanup.await
559}
560
561#[cfg(test)]
562mod test {
563    use super::*;
564    use fuchsia_async as fasync;
565    use futures::channel::mpsc::unbounded;
566
567    #[fuchsia::test]
568    async fn one_stream() {
569        let (a_reader, b_writer) = stream::stream();
570        let (b_reader, a_writer) = stream::stream();
571        let (mut create_stream_a, a_streams_in) = channel(1);
572        let (_create_stream_b, b_streams_in) = channel(1);
573        let (a_streams_out, _get_stream_a) = channel(100);
574        let (b_streams_out, mut get_stream_b) = channel(1);
575        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
576        // observe them in a test.
577        let (errors_sink_a, _black_hole) = unbounded();
578        let errors_sink_b = errors_sink_a.clone();
579
580        let _a = fasync::Task::spawn(async move {
581            assert!(matches!(
582                multi_stream(
583                    a_reader,
584                    a_writer,
585                    true,
586                    a_streams_out,
587                    a_streams_in,
588                    errors_sink_a,
589                    "b".to_owned()
590                )
591                .await,
592                Ok(()) | Err(Error::ConnectionClosed(None))
593            ))
594        });
595        let _b = fasync::Task::spawn(async move {
596            assert!(matches!(
597                multi_stream(
598                    b_reader,
599                    b_writer,
600                    false,
601                    b_streams_out,
602                    b_streams_in,
603                    errors_sink_b,
604                    "a".to_owned()
605                )
606                .await,
607                Ok(()) | Err(Error::ConnectionClosed(None))
608            ))
609        });
610
611        let (ab_reader_a, ab_reader_write) = stream::stream();
612        let (ab_writer_read, ab_writer_a) = stream::stream();
613
614        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
615
616        ab_writer_a
617            .write(8, |buf| {
618                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
619                Ok(8)
620            })
621            .unwrap();
622
623        let (ab_reader_b, ab_writer_b, err) = get_stream_b.next().await.unwrap();
624        err.send(Ok(())).unwrap();
625
626        ab_writer_b
627            .write(8, |buf| {
628                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
629                Ok(8)
630            })
631            .unwrap();
632
633        ab_reader_b
634            .read(8, |buf| {
635                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
636                Ok(((), 8))
637            })
638            .await
639            .unwrap();
640        ab_reader_a
641            .read(8, |buf| {
642                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
643                Ok(((), 8))
644            })
645            .await
646            .unwrap();
647
648        std::mem::drop(ab_writer_b);
649        assert!(matches!(
650            ab_reader_a.read::<_, ()>(1, |_| unreachable!()).await,
651            Err(Error::ConnectionClosed(_))
652        ));
653
654        std::mem::drop(ab_reader_b);
655        ab_writer_a
656            .write(8, |buf| {
657                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
658                Ok(8)
659            })
660            .unwrap();
661    }
662
663    #[fuchsia::test]
664    async fn fallible_stream() {
665        let (a_reader, b_writer) = stream::stream();
666        let (b_reader, a_writer) = stream::stream();
667        let (mut create_stream_a, a_streams_in) = channel(1);
668        let (_create_stream_b, b_streams_in) = channel(1);
669        let (a_streams_out, _get_stream_a) = channel(100);
670        let (b_streams_out, mut get_stream_b) = channel(1);
671        let (errors_sink_a, _black_hole) = unbounded();
672        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
673        // observe them in a test.
674        let (errors_sink_b, mut b_errors) = unbounded();
675
676        let _a = fasync::Task::spawn(async move {
677            assert!(matches!(
678                multi_stream(
679                    a_reader,
680                    a_writer,
681                    true,
682                    a_streams_out,
683                    a_streams_in,
684                    errors_sink_a,
685                    "b".to_owned()
686                )
687                .await,
688                Ok(()) | Err(Error::ConnectionClosed(None))
689            ))
690        });
691        let _b = fasync::Task::spawn(async move {
692            assert!(matches!(
693                multi_stream(
694                    b_reader,
695                    b_writer,
696                    false,
697                    b_streams_out,
698                    b_streams_in,
699                    errors_sink_b,
700                    "a".to_owned()
701                )
702                .await,
703                Ok(()) | Err(Error::ConnectionClosed(None))
704            ))
705        });
706
707        // The first stream fails to be created.
708        let (fail_reader, fail_reader_write) = stream::stream();
709        let (_ignore, fail_writer) = stream::stream();
710        create_stream_a.send((fail_reader, fail_writer)).await.unwrap();
711
712        // There's a laziness to stream creation in the protocol so we need to send a little data to
713        // actually create the stream.
714        fail_reader_write
715            .write(8, |buf| {
716                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
717                Ok(8)
718            })
719            .unwrap();
720
721        let (_, _, err) = get_stream_b.next().await.unwrap();
722        err.send(Err(Error::ConnectionClosed(Some("Testing".to_owned())))).unwrap();
723
724        loop {
725            if let Some(Error::ConnectionClosed(Some(s))) = b_errors.next().await {
726                if s == "Testing" {
727                    break;
728                }
729            } else {
730                panic!("Error stream closed without reporting our error.");
731            }
732        }
733
734        let (ab_reader_a, ab_reader_write) = stream::stream();
735        let (ab_writer_read, ab_writer_a) = stream::stream();
736
737        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
738
739        ab_writer_a
740            .write(8, |buf| {
741                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
742                Ok(8)
743            })
744            .unwrap();
745
746        let (ab_reader_b, ab_writer_b, err) = get_stream_b.next().await.unwrap();
747        err.send(Ok(())).unwrap();
748
749        ab_writer_b
750            .write(8, |buf| {
751                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
752                Ok(8)
753            })
754            .unwrap();
755
756        ab_reader_b
757            .read(8, |buf| {
758                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
759                Ok(((), 8))
760            })
761            .await
762            .unwrap();
763        ab_reader_a
764            .read(8, |buf| {
765                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
766                Ok(((), 8))
767            })
768            .await
769            .unwrap();
770
771        std::mem::drop(ab_writer_b);
772        assert!(matches!(
773            ab_reader_a.read::<_, ()>(1, |_| unreachable!()).await,
774            Err(Error::ConnectionClosed(_))
775        ));
776
777        std::mem::drop(ab_reader_b);
778        ab_writer_a
779            .write(8, |buf| {
780                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
781                Ok(8)
782            })
783            .unwrap();
784    }
785
786    #[fuchsia::test]
787    async fn two_streams() {
788        let (a_reader, b_writer) = stream::stream();
789        let (b_reader, a_writer) = stream::stream();
790        let (mut create_stream_a, a_streams_in) = channel(1);
791        let (mut create_stream_b, b_streams_in) = channel(1);
792        let (a_streams_out, mut get_stream_a) = channel(1);
793        let (b_streams_out, mut get_stream_b) = channel(1);
794        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
795        // observe them in a test.
796        let (errors_sink, _black_hole) = unbounded();
797
798        let _a = fasync::Task::spawn(multi_stream(
799            a_reader,
800            a_writer,
801            true,
802            a_streams_out,
803            a_streams_in,
804            errors_sink.clone(),
805            "b".to_owned(),
806        ));
807        let _b = fasync::Task::spawn(multi_stream(
808            b_reader,
809            b_writer,
810            false,
811            b_streams_out,
812            b_streams_in,
813            errors_sink.clone(),
814            "a".to_owned(),
815        ));
816
817        let (ab_reader_a, ab_reader_write) = stream::stream();
818        let (ab_writer_read, ab_writer_a) = stream::stream();
819        let (ba_reader_b, ba_reader_write) = stream::stream();
820        let (ba_writer_read, ba_writer_b) = stream::stream();
821
822        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
823        create_stream_b.send((ba_writer_read, ba_reader_write)).await.unwrap();
824
825        ab_writer_a
826            .write(8, |buf| {
827                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
828                Ok(8)
829            })
830            .unwrap();
831        ba_writer_b
832            .write(8, |buf| {
833                buf[..8].copy_from_slice(&[25, 26, 27, 28, 29, 30, 31, 32]);
834                Ok(8)
835            })
836            .unwrap();
837
838        let (ab_reader_b, ab_writer_b, err_ab) = get_stream_b.next().await.unwrap();
839        let (ba_reader_a, ba_writer_a, err_ba) = get_stream_a.next().await.unwrap();
840        err_ab.send(Ok(())).unwrap();
841        err_ba.send(Ok(())).unwrap();
842
843        ab_writer_b
844            .write(8, |buf| {
845                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
846                Ok(8)
847            })
848            .unwrap();
849        ba_writer_a
850            .write(8, |buf| {
851                buf[..8].copy_from_slice(&[17, 18, 19, 20, 21, 22, 23, 24]);
852                Ok(8)
853            })
854            .unwrap();
855
856        ab_reader_b
857            .read(8, |buf| {
858                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
859                Ok(((), 8))
860            })
861            .await
862            .unwrap();
863        ab_reader_a
864            .read(8, |buf| {
865                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
866                Ok(((), 8))
867            })
868            .await
869            .unwrap();
870        ba_reader_b
871            .read(8, |buf| {
872                assert_eq!(&buf[..8], &[17, 18, 19, 20, 21, 22, 23, 24]);
873                Ok(((), 8))
874            })
875            .await
876            .unwrap();
877        ba_reader_a
878            .read(8, |buf| {
879                assert_eq!(&buf[..8], &[25, 26, 27, 28, 29, 30, 31, 32]);
880                Ok(((), 8))
881            })
882            .await
883            .unwrap();
884    }
885
886    #[fuchsia::test]
887    async fn node_connect() {
888        let (new_peer_sender_a, mut new_peers) = channel(1);
889        let (new_peer_sender_b, _new_peers_b) = channel(100);
890        let (incoming_streams_sender_a, _streams_a) = channel(100);
891        let (incoming_streams_sender_b, mut streams) = channel(1);
892        let a = Node::new("a", "test", new_peer_sender_a, incoming_streams_sender_a).unwrap();
893        let b = Node::new("b", "test", new_peer_sender_b, incoming_streams_sender_b).unwrap();
894        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
895        // observe them in a test.
896        let (errors_sink, _black_hole) = unbounded();
897
898        let (ab_reader, ab_writer) = stream::stream();
899        let (ba_reader, ba_writer) = stream::stream();
900
901        let _a_conn = fasync::Task::spawn(multi_stream_node_connection(
902            &a,
903            ba_reader,
904            ab_writer,
905            true,
906            Quality::IN_PROCESS,
907            errors_sink.clone(),
908            "b".to_owned(),
909        ));
910        let _b_conn = fasync::Task::spawn(multi_stream_node_connection(
911            &b,
912            ab_reader,
913            ba_writer,
914            false,
915            Quality::IN_PROCESS,
916            errors_sink.clone(),
917            "a".to_owned(),
918        ));
919
920        let new_peer = new_peers.next().await.unwrap();
921        assert_eq!("b", &new_peer);
922
923        let (_reader, peer_writer) = stream::stream();
924        let (peer_reader, writer) = stream::stream();
925        a.connect_to_peer(peer_reader, peer_writer, "b").await.unwrap();
926
927        writer
928            .write(8, |buf| {
929                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
930                Ok(8)
931            })
932            .unwrap();
933
934        let (reader, _writer, from) = streams.next().await.unwrap();
935        assert_eq!("a", &from);
936
937        reader
938            .read(8, |buf| {
939                assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8], &buf);
940                Ok(((), 8))
941            })
942            .await
943            .unwrap();
944    }
945}