circuit/
multi_stream.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::error::{Error, Result};
6use crate::{stream, Node, Quality};
7
8use futures::channel::mpsc::{channel, Receiver, Sender, UnboundedSender};
9use futures::channel::oneshot;
10use futures::future::Either;
11use futures::prelude::*;
12use futures::StreamExt;
13use std::collections::HashMap;
14use std::pin::pin;
15use std::sync::{Arc, Mutex as SyncMutex};
16
17/// Status of an individual stream.
18enum StreamStatus {
19    /// Stream is open to traffic. Incoming traffic should be delivered to the given writer.
20    Open(stream::Writer),
21    /// Stream hasn't been officially closed but the user has stopped listening for incoming data.
22    /// The outgoing side of the stream might still be running.
23    ReadClosed,
24    /// Stream is closed. We never reuse stream IDs so a closed stream should never reopen.
25    Closed,
26}
27
28/// Multiplexes multiple streams into one.
29///
30/// While running, this function serves a protocol on the `reader` and `writer` that multiplexes
31/// multiple streams into a single stream.
32///
33/// The protocol is a series of variable length frames each consisting of:
34/// * 4 bytes - A stream ID in little-endian.
35/// * 2 bytes - a LENGTH in little-endian.
36/// * LENGTH bytes - data.
37///
38/// To start a new stream, an endpoint simply begins sending frames with that stream ID. To close a
39/// stream, an endpoint can send a zero-length frame. Once a stream is closed that stream ID should
40/// never be used again.
41///
42/// We distinguish the two endpoints of the protocol as a client and a server. The only difference
43/// between the two is in which stream IDs they may initiate; Any new streams started by the client
44/// should have odd-numbered IDs, and any new streams initiated by the server should have
45/// even-numbered IDs. The `is_server` argument controls this behavior.
46///
47/// Any new streams initiated by the other end of the connection will be returned to us via the
48/// `streams_out` channel, and we may initiate a stream by sending a reader and writer to the
49/// `streams_in` channel.
50pub async fn multi_stream(
51    reader: stream::Reader,
52    writer: stream::Writer,
53    is_server: bool,
54    streams_out: Sender<(stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>,
55    streams_in: Receiver<(stream::Reader, stream::Writer)>,
56    stream_errors_out: UnboundedSender<Error>,
57    remote_name: String,
58) -> Result<()> {
59    let (new_readers_sender, new_readers) = channel(1);
60    let (stream_errors_sender, stream_errors) = channel(1);
61    let first_stream_id = if is_server { 1 } else { 0 };
62    let mut stream_ids = (first_stream_id..).step_by(2);
63    let writer = Arc::new(SyncMutex::new(writer));
64    let writer_for_reader = Arc::clone(&writer);
65
66    let handle_read = async move {
67        let writer = writer_for_reader;
68        let streams = SyncMutex::new(HashMap::<u32, StreamStatus>::new());
69
70        // Creates a stream (that's Stream in the rust async sense, not in the sense of our
71        // protocol) which when polled will read some data from the reader and attempt to
72        // demultiplex it.
73        //
74        // Each time we call `.next()` on this stream one read operation is performed, and the
75        // stream yields a `Result` indicating if an error occurred. Therefore, to pump read-side
76        // IO, we simply need to pull values from this stream until one of them is an error.
77        let read_result_stream = futures::stream::unfold((), |_| async {
78            let got = reader
79                .read(6, |buf| {
80                    let mut streams = streams.lock().unwrap();
81                    let (size, new_stream) =
82                        handle_one_chunk(&mut *streams, is_server, buf, &remote_name)?;
83
84                    Ok((
85                        new_stream.map(|(id, first_chunk_data)| {
86                            let (reader, remote_writer) = stream::stream();
87                            remote_writer
88                                .write(first_chunk_data.len(), |out| {
89                                    out[..first_chunk_data.len()]
90                                        .copy_from_slice(&first_chunk_data);
91                                    Ok(first_chunk_data.len())
92                                })
93                                .expect("We just created this stream!");
94                            streams.insert(id, StreamStatus::Open(remote_writer));
95                            (id, reader)
96                        }),
97                        size,
98                    ))
99                })
100                .await;
101
102            let got = match got {
103                Ok(Some((id, reader))) => {
104                    // We got a request to initiate a new stream. It's already
105                    // in the streams table, just have to hand the reader over
106                    // to the other task to be dispatched, and the writer out to
107                    // the caller.
108                    let (remote_reader, writer) = stream::stream();
109
110                    if new_readers_sender.clone().send((id, remote_reader)).await.is_err() {
111                        Err(Error::ConnectionClosed(Some(
112                            "New stream reader handler disappeared".to_owned(),
113                        )))
114                    } else {
115                        let (sender, receiver) = oneshot::channel();
116                        if stream_errors_sender.clone().send(receiver).await.is_ok() {
117                            streams_out.clone().send((reader, writer, sender)).await.map_err(|_| {
118                                Error::ConnectionClosed(Some(
119                                    "New stream handler disappeared".to_owned(),
120                                ))
121                            })
122                        } else {
123                            Err(Error::ConnectionClosed(Some(
124                                "Error reporting channel closed".to_owned(),
125                            )))
126                        }
127                    }
128                }
129                Ok(None) => Ok(()),
130                Err(e) => Err(e),
131            };
132
133            Some((got, ()))
134        });
135
136        // Send stream errors to our error output. This functions as a stream that never returns anything
137        // so we can wrap it in a select with read_result_stream and thereby continuously poll it as
138        // we handle errors.
139        let stream_errors = stream_errors
140            .map(move |x| {
141                let stream_errors_out = stream_errors_out.clone();
142                async move {
143                    if let Err(x) = x.await.unwrap_or_else(|_| {
144                        Err(Error::ConnectionClosed(Some(
145                            "Stream handler hung up without returning a status".to_owned(),
146                        )))
147                    }) {
148                        let _ = stream_errors_out.unbounded_send(x);
149                    }
150                }
151            })
152            // Buffer unordered all the stream errors otherwise we'll block the
153            // reader task.
154            .buffer_unordered(usize::MAX)
155            // Never yield anything and match the type of read_result_stream.
156            .filter_map(|()| futures::future::ready(None));
157
158        let read_result_stream = futures::stream::select(stream_errors, read_result_stream);
159
160        // The `futures::stream::select` function requires both of the streams you give it to be the
161        // same type and effectively interleaves the output of the two streams. We want to poll two
162        // streams at once but handle their output differently, so we wrap them in an `Either`.
163        let read_result_stream = read_result_stream.map(Either::Left);
164
165        // `streams_in` will yield any new streams the user wants to create. If the user hangs up
166        // that channel, we don't need to react, as the existing streams may still be in use. So we
167        // make streams_in poll forever once the user stops supplying new streams.
168        let streams_in = streams_in.chain(futures::stream::pending()).map(Either::Right);
169
170        let events = futures::stream::select(read_result_stream, streams_in);
171        let mut events = pin!(events);
172
173        let mut ret = Ok(());
174        while let Some(event) = events.next().await {
175            match event {
176                Either::Left(Ok(())) => (),
177                Either::Left(other) => {
178                    ret = other;
179                    break;
180                }
181                Either::Right((reader, writer)) => {
182                    let id = stream_ids.next().expect("This iterator should be infinite!");
183                    if new_readers_sender.clone().send((id, reader)).await.is_err() {
184                        break;
185                    }
186                    streams.lock().unwrap().insert(id, StreamStatus::Open(writer));
187                }
188            }
189        }
190
191        if matches!(ret, Err(Error::ConnectionClosed(None))) {
192            let writer = writer.lock().unwrap();
193            if writer.is_closed() {
194                ret = Err(Error::ConnectionClosed(writer.closed_reason()))
195            }
196        }
197
198        let mut streams = streams.lock().unwrap();
199
200        for (_, stream) in streams.drain() {
201            if let StreamStatus::Open(writer) = stream {
202                writer.close(format!("Multi-stream terminated: {ret:?}"));
203            }
204        }
205
206        ret
207    };
208
209    let handle_write = new_readers.for_each_concurrent(None, move |(id, stream)| {
210        let writer = Arc::clone(&writer);
211        async move {
212            if let Some(reason) = write_as_chunks(id, &stream, writer).await {
213                stream.close(format!("Stream terminated ({reason})"));
214            } else {
215                stream.close(format!("Stream terminated"));
216            }
217        }
218    });
219
220    let handle_read = pin!(handle_read);
221    let handle_write = pin!(handle_write);
222
223    match futures::future::select(handle_read, handle_write).await {
224        Either::Left((res, _)) => res,
225        Either::Right(((), handle_read)) => handle_read.await,
226    }
227}
228
229/// Handles one chunk of data from the incoming stream.
230///
231/// The incoming stream is an interleaving of several byte streams. They are interleaved by
232/// splitting them into chunks and attaching a header to each. This function assumes `buf` has been
233/// filled with data from the incoming stream, and tries to parse the header for the first chunk and
234/// process the data within.
235///
236/// If the buffer isn't long enough to contain an entire chunk, this will return `BufferTooShort`,
237/// otherwise it will return the size of the chunk processed in the first element of the returned
238/// tuple.
239///
240/// Once a chunk is parsed, the table of individual streams given with the `streams` argument will
241/// be used to route the incoming data. The second element of the returned tuple will be `Some` if
242/// and only if the chunk indicates a new stream is being started, in which case it will contain the
243/// ID of the new stream, and the data portion of the chunk.
244fn handle_one_chunk<'a>(
245    streams: &mut HashMap<u32, StreamStatus>,
246    is_server: bool,
247    buf: &'a [u8],
248    remote_name: &str,
249) -> Result<(usize, Option<(u32, &'a [u8])>)> {
250    if buf.len() < 6 {
251        return Err(Error::BufferTooShort(6));
252    }
253
254    let id = u32::from_le_bytes(buf[..4].try_into().unwrap());
255    let length = u16::from_le_bytes(buf[4..6].try_into().unwrap());
256
257    let length = length as usize;
258    let chunk_length = length + 6;
259    let buf = &buf[6..];
260
261    if length == 0 {
262        if buf.len() < 2 {
263            return Err(Error::BufferTooShort(8));
264        }
265        let length = u16::from_le_bytes(buf[..2].try_into().unwrap());
266        let length = length as usize;
267
268        if buf.len() < length + 2 {
269            return Err(Error::BufferTooShort(8 + length));
270        }
271
272        let epitaph =
273            if length > 0 { Some(String::from_utf8_lossy(&buf[2..][..length])) } else { None };
274
275        if let Some(old) = streams.insert(id, StreamStatus::Closed) {
276            if let StreamStatus::Open(old) = old {
277                if let Some(epitaph) = epitaph {
278                    old.close(format!("{remote_name} reported: {epitaph}"));
279                } else {
280                    old.close(format!("{remote_name} reported no epitaph"));
281                }
282            }
283            Ok((length + 8, None))
284        } else {
285            Err(Error::BadStreamId)
286        }
287    } else if buf.len() < length {
288        Err(Error::BufferTooShort(chunk_length))
289    } else if let Some(stream) = streams.get(&id) {
290        match stream {
291            StreamStatus::Open(stream) => {
292                if stream
293                    .write(length, |out| {
294                        out[..length].copy_from_slice(&buf[..length]);
295                        Ok(length)
296                    })
297                    .is_err()
298                {
299                    // The user isn't listening for incoming data anymore. Don't
300                    // treat that as a fatal error, and don't send a hangup in case
301                    // the user is still sending data. Just quietly ignore it. If
302                    // the user hangs up the other side of the connection that's
303                    // when we can complain.
304                    let _ = streams.insert(id, StreamStatus::ReadClosed);
305                }
306                Ok((chunk_length, None))
307            }
308            StreamStatus::ReadClosed => Ok((chunk_length, None)),
309            StreamStatus::Closed => Err(Error::BadStreamId),
310        }
311    } else if (id & 1 == 0) == is_server {
312        Ok((chunk_length, Some((id, &buf[..length]))))
313    } else {
314        Err(Error::BadStreamId)
315    }
316}
317
318/// Reads data from the given reader, then splits it into chunks of no more than 2^16 - 1 bytes,
319/// attaches a header to each chunk containing the given `id` number and the size of the chunk, then
320/// writes each chunk to the given writer.
321///
322/// The point, of course, is that multiple functions can do this to the same writer, and since the
323/// chunks are labeled, the data can be parsed back out into separate streams on the other end.
324///
325/// If writing stops because the writer is closed, this will return the reported reason for closure.
326async fn write_as_chunks(
327    id: u32,
328    reader: &stream::Reader,
329    writer: Arc<SyncMutex<stream::Writer>>,
330) -> Option<String> {
331    loop {
332        // We want to handle errors with the read and errors with the write differently, so we
333        // return a nested result.
334        //
335        // In short, this will return Ok(Ok(())) if all is well, Ok(Err(...)) if we read data
336        // successfully but then failed to write it, and Err(...) if we failed to read.
337        let got = reader
338            .read(1, |buf| {
339                let mut total_len = 0;
340                while buf.len() > total_len {
341                    let buf = &buf[total_len..];
342                    let buf =
343                        if buf.len() > u16::MAX as usize { &buf[..u16::MAX as usize] } else { buf };
344
345                    let len: u16 = buf
346                        .len()
347                        .try_into()
348                        .expect("We just truncated the length so it would fit!");
349
350                    if let e @ Err(_) = writer.lock().unwrap().write(6 + buf.len(), |out_buf| {
351                        out_buf[..4].copy_from_slice(&id.to_le_bytes());
352                        let out_buf = &mut out_buf[4..];
353                        out_buf[..2].copy_from_slice(&len.to_le_bytes());
354                        out_buf[2..][..buf.len()].copy_from_slice(buf);
355                        Ok(buf.len() + 6)
356                    }) {
357                        return Ok((e, total_len));
358                    } else {
359                        total_len += buf.len();
360                    }
361                }
362
363                Ok((Ok(()), total_len))
364            })
365            .await;
366
367        match got {
368            Err(Error::ConnectionClosed(epitaph)) => {
369                let epitaph = epitaph.as_ref().map(|x| x.as_bytes()).unwrap_or(b"");
370                let length_u16: u16 = epitaph.len().try_into().unwrap_or(u16::MAX);
371                let length = length_u16 as usize;
372
373                // If the stream was closed, send a frame indicating such.
374                let write_result = writer.lock().unwrap().write(8 + length, |out_buf| {
375                    out_buf[..4].copy_from_slice(&id.to_le_bytes());
376                    let out_buf = &mut out_buf[4..];
377                    out_buf[..2].copy_from_slice(&0u16.to_le_bytes());
378                    let out_buf = &mut out_buf[2..];
379                    out_buf[..2].copy_from_slice(&length_u16.to_le_bytes());
380                    let out_buf = &mut out_buf[2..];
381                    out_buf[..length].copy_from_slice(&epitaph[..length]);
382                    Ok(8 + length)
383                });
384
385                match write_result {
386                    Ok(()) | Err(Error::ConnectionClosed(None)) => break None,
387                    Err(Error::ConnectionClosed(Some(s))) => break Some(format!("write: {s}")),
388                    other => unreachable!("Unexpected write error: {other:?}"),
389                }
390            }
391            Ok(Ok(())) => (),
392            Ok(Err(Error::ConnectionClosed(None))) => break None,
393            Ok(Err(Error::ConnectionClosed(Some(s)))) => break Some(format!("read: {s}")),
394            Ok(other) => unreachable!("Unexpected write error: {other:?}"),
395            other => unreachable!("Unexpected read error: {other:?}"),
396        }
397    }
398}
399
400/// Creates a new connection to a circuit node, and merges all streams produced and consumed by that
401/// connection into a multi-stream. In this way you can service a connection between nodes with a
402/// single stream of bytes.
403///
404/// The `is_server` boolean should be `true` at one end of the connection and `false` at the other.
405/// Usually it will be `true` for the node receiving a connection and `false` for the node
406/// initiating one.
407///
408/// Traffic will be written to, and read from, the given `reader` and `writer`.
409///
410/// The `quality` will be used to make routing decisions when establishing streams across multiple
411/// nodes.
412pub fn multi_stream_node_connection(
413    node: &Node,
414    reader: stream::Reader,
415    writer: stream::Writer,
416    is_server: bool,
417    quality: Quality,
418    stream_errors_out: UnboundedSender<Error>,
419    remote_name: String,
420) -> impl Future<Output = Result<()>> + Send {
421    let (mut new_stream_sender, streams_in) = channel(1);
422    let (streams_out, new_stream_receiver) = channel(1);
423
424    let control_stream = if is_server {
425        let (control_reader, control_writer_remote) = stream::stream();
426        let (control_reader_remote, control_writer) = stream::stream();
427
428        new_stream_sender
429            .try_send((control_reader_remote, control_writer_remote))
430            .expect("We just created this channel!");
431        Some((control_reader, control_writer))
432    } else {
433        None
434    };
435
436    let stream_fut = multi_stream(
437        reader,
438        writer,
439        is_server,
440        streams_out,
441        streams_in,
442        stream_errors_out,
443        remote_name,
444    );
445    let node_fut = node.link_node(control_stream, new_stream_sender, new_stream_receiver, quality);
446
447    async move {
448        let node_fut = pin!(node_fut);
449        let stream_fut = pin!(stream_fut);
450
451        // If either the node connection or the multi stream dies we assume the other will also die
452        // shortly after, so we always await both futures to completion.
453        match futures::future::select(node_fut, stream_fut).await {
454            Either::Left((res, stream_fut)) => res.and(stream_fut.await),
455            Either::Right((res, node_fut)) => res.and(node_fut.await),
456        }
457    }
458}
459
460/// Same as `multi_stream_node_connection` but reads and writes to and from implementors of the
461/// standard `AsyncRead` and `AsyncWrite` traits rather than circuit streams.
462pub async fn multi_stream_node_connection_to_async(
463    node: &Node,
464    rx: &mut (dyn AsyncRead + Unpin + Send),
465    tx: &mut (dyn AsyncWrite + Unpin + Send),
466    is_server: bool,
467    quality: Quality,
468    stream_errors_out: UnboundedSender<Error>,
469    remote_name: String,
470) -> Result<()> {
471    let (reader, remote_writer) = stream::stream();
472    let (remote_reader, writer) = stream::stream();
473    let conn_fut = multi_stream_node_connection(
474        node,
475        remote_reader,
476        remote_writer,
477        is_server,
478        quality,
479        stream_errors_out,
480        remote_name.clone(),
481    );
482    let remote_name = &remote_name;
483
484    let read_fut = async move {
485        let mut buf = [0u8; 4096];
486        loop {
487            let n = match rx.read(&mut buf).await {
488                Ok(0) => {
489                    writer
490                        .close(format!("connection closed (either by transport or {remote_name})"));
491                    break Ok(());
492                }
493                Ok(n) => n,
494                Err(e) => {
495                    writer.close(format!("{remote_name} connection failed (read): {e:?}"));
496                    return Err(Error::from(e));
497                }
498            };
499            writer.write(n, |write_buf| {
500                write_buf[..n].copy_from_slice(&buf[..n]);
501                Ok(n)
502            })?
503        }
504    };
505
506    let write_fut = async move {
507        loop {
508            let mut buf = [0u8; 4096];
509            let len = reader
510                .read(1, |read_buf| {
511                    let read_buf = &read_buf[..std::cmp::min(buf.len(), read_buf.len())];
512                    buf[..read_buf.len()].copy_from_slice(read_buf);
513                    Ok((read_buf.len(), read_buf.len()))
514                })
515                .await?;
516            let write_res = async {
517                tx.write_all(&buf[..len]).await?;
518                tx.flush().await?;
519                Result::<_, Error>::Ok(())
520            }
521            .await;
522
523            if let Err(e) = write_res {
524                reader.close(format!("{remote_name} connection failed (write): {e:?}"));
525                return Err(e.into());
526            }
527        }
528    };
529
530    let read_write = futures::future::try_join(read_fut, write_fut);
531
532    let conn_fut = pin!(conn_fut);
533
534    let cleanup = {
535        // We must pin read_write to this scope so the streams are dropped when
536        // we leave this scope and conn_fut can run to completion. If it's
537        // pinned to the stack alongside conn_fut then the streams aren't
538        // dropped and conn_fut doesn't observe the closed status.
539        let read_write = pin!(read_write);
540
541        match futures::future::select(conn_fut, read_write).await {
542            Either::Left((res, _)) => {
543                return res;
544            }
545            Either::Right((read_write_result, conn_fut)) => {
546                conn_fut.map(move |conn_fut_result| match (conn_fut_result, read_write_result) {
547                    (Ok(()), Ok(((), ()))) => Ok(()),
548                    // Report back any errors, preferring the one from the
549                    // connection future.
550                    (Ok(()), Err(e)) | (Err(e), _) => Err(e),
551                })
552            }
553        }
554    };
555
556    // If the read/write future finished first, we need to wait for the
557    // connection future to run to completion to ensure cleanup happens.
558    cleanup.await
559}
560
561#[cfg(test)]
562mod test {
563    use super::*;
564    use fuchsia_async as fasync;
565    use futures::channel::mpsc::unbounded;
566
567    #[fuchsia::test]
568    async fn one_stream() {
569        let (a_reader, b_writer) = stream::stream();
570        let (b_reader, a_writer) = stream::stream();
571        let (mut create_stream_a, a_streams_in) = channel(1);
572        let (_create_stream_b, b_streams_in) = channel(1);
573        let (a_streams_out, _get_stream_a) = channel(100);
574        let (b_streams_out, mut get_stream_b) = channel(1);
575        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
576        // observe them in a test.
577        let (errors_sink_a, _black_hole) = unbounded();
578        let errors_sink_b = errors_sink_a.clone();
579
580        let _a = fasync::Task::spawn(async move {
581            assert!(matches!(
582                multi_stream(
583                    a_reader,
584                    a_writer,
585                    true,
586                    a_streams_out,
587                    a_streams_in,
588                    errors_sink_a,
589                    "b".to_owned()
590                )
591                .await,
592                Ok(()) | Err(Error::ConnectionClosed(None))
593            ))
594        });
595        let _b = fasync::Task::spawn(async move {
596            assert!(matches!(
597                multi_stream(
598                    b_reader,
599                    b_writer,
600                    false,
601                    b_streams_out,
602                    b_streams_in,
603                    errors_sink_b,
604                    "a".to_owned()
605                )
606                .await,
607                Ok(()) | Err(Error::ConnectionClosed(None))
608            ))
609        });
610
611        let (ab_reader_a, ab_reader_write) = stream::stream();
612        let (ab_writer_read, ab_writer_a) = stream::stream();
613
614        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
615
616        ab_writer_a
617            .write(8, |buf| {
618                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
619                Ok(8)
620            })
621            .unwrap();
622
623        let (ab_reader_b, ab_writer_b, err) = get_stream_b.next().await.unwrap();
624        err.send(Ok(())).unwrap();
625
626        ab_writer_b
627            .write(8, |buf| {
628                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
629                Ok(8)
630            })
631            .unwrap();
632
633        ab_reader_b
634            .read(8, |buf| {
635                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
636                Ok(((), 8))
637            })
638            .await
639            .unwrap();
640        ab_reader_a
641            .read(8, |buf| {
642                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
643                Ok(((), 8))
644            })
645            .await
646            .unwrap();
647
648        std::mem::drop(ab_writer_b);
649        assert!(matches!(
650            ab_reader_a.read::<_, ()>(1, |_| unreachable!()).await,
651            Err(Error::ConnectionClosed(_))
652        ));
653
654        std::mem::drop(ab_reader_b);
655        ab_writer_a
656            .write(8, |buf| {
657                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
658                Ok(8)
659            })
660            .unwrap();
661    }
662
663    #[fuchsia::test]
664    async fn fallible_stream() {
665        let (a_reader, b_writer) = stream::stream();
666        let (b_reader, a_writer) = stream::stream();
667        let (mut create_stream_a, a_streams_in) = channel(1);
668        let (_create_stream_b, b_streams_in) = channel(1);
669        let (a_streams_out, _get_stream_a) = channel(100);
670        let (b_streams_out, mut get_stream_b) = channel(1);
671        let (errors_sink_a, _black_hole) = unbounded();
672        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
673        // observe them in a test.
674        let (errors_sink_b, mut b_errors) = unbounded();
675
676        let _a = fasync::Task::spawn(async move {
677            assert!(matches!(
678                multi_stream(
679                    a_reader,
680                    a_writer,
681                    true,
682                    a_streams_out,
683                    a_streams_in,
684                    errors_sink_a,
685                    "b".to_owned()
686                )
687                .await,
688                Ok(()) | Err(Error::ConnectionClosed(None))
689            ))
690        });
691        let _b = fasync::Task::spawn(async move {
692            assert!(matches!(
693                multi_stream(
694                    b_reader,
695                    b_writer,
696                    false,
697                    b_streams_out,
698                    b_streams_in,
699                    errors_sink_b,
700                    "a".to_owned()
701                )
702                .await,
703                Ok(()) | Err(Error::ConnectionClosed(None))
704            ))
705        });
706
707        // The first stream fails to be created.
708        let (fail_reader, fail_reader_write) = stream::stream();
709        let (_ignore, fail_writer) = stream::stream();
710        create_stream_a.send((fail_reader, fail_writer)).await.unwrap();
711
712        // There's a laziness to stream creation in the protocol so we need to send a little data to
713        // actually create the stream.
714        fail_reader_write
715            .write(8, |buf| {
716                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
717                Ok(8)
718            })
719            .unwrap();
720
721        let (_, _, err) = get_stream_b.next().await.unwrap();
722        err.send(Err(Error::ConnectionClosed(Some("Testing".to_owned())))).unwrap();
723
724        loop {
725            if let Some(Error::ConnectionClosed(Some(s))) = b_errors.next().await {
726                if s == "Testing" {
727                    break;
728                }
729            } else {
730                panic!("Error stream closed without reporting our error.");
731            }
732        }
733
734        let (ab_reader_a, ab_reader_write) = stream::stream();
735        let (ab_writer_read, ab_writer_a) = stream::stream();
736
737        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
738
739        ab_writer_a
740            .write(8, |buf| {
741                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
742                Ok(8)
743            })
744            .unwrap();
745
746        let (ab_reader_b, ab_writer_b, err) = get_stream_b.next().await.unwrap();
747        err.send(Ok(())).unwrap();
748
749        ab_writer_b
750            .write(8, |buf| {
751                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
752                Ok(8)
753            })
754            .unwrap();
755
756        ab_reader_b
757            .read(8, |buf| {
758                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
759                Ok(((), 8))
760            })
761            .await
762            .unwrap();
763        ab_reader_a
764            .read(8, |buf| {
765                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
766                Ok(((), 8))
767            })
768            .await
769            .unwrap();
770
771        std::mem::drop(ab_writer_b);
772        assert!(matches!(
773            ab_reader_a.read::<_, ()>(1, |_| unreachable!()).await,
774            Err(Error::ConnectionClosed(_))
775        ));
776
777        std::mem::drop(ab_reader_b);
778        ab_writer_a
779            .write(8, |buf| {
780                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
781                Ok(8)
782            })
783            .unwrap();
784    }
785
786    #[fuchsia::test]
787    async fn two_streams() {
788        let (a_reader, b_writer) = stream::stream();
789        let (b_reader, a_writer) = stream::stream();
790        let (mut create_stream_a, a_streams_in) = channel(1);
791        let (mut create_stream_b, b_streams_in) = channel(1);
792        let (a_streams_out, mut get_stream_a) = channel(1);
793        let (b_streams_out, mut get_stream_b) = channel(1);
794        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
795        // observe them in a test.
796        let (errors_sink, _black_hole) = unbounded();
797
798        let _a = fasync::Task::spawn(multi_stream(
799            a_reader,
800            a_writer,
801            true,
802            a_streams_out,
803            a_streams_in,
804            errors_sink.clone(),
805            "b".to_owned(),
806        ));
807        let _b = fasync::Task::spawn(multi_stream(
808            b_reader,
809            b_writer,
810            false,
811            b_streams_out,
812            b_streams_in,
813            errors_sink.clone(),
814            "a".to_owned(),
815        ));
816
817        let (ab_reader_a, ab_reader_write) = stream::stream();
818        let (ab_writer_read, ab_writer_a) = stream::stream();
819        let (ba_reader_b, ba_reader_write) = stream::stream();
820        let (ba_writer_read, ba_writer_b) = stream::stream();
821
822        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
823        create_stream_b.send((ba_writer_read, ba_reader_write)).await.unwrap();
824
825        ab_writer_a
826            .write(8, |buf| {
827                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
828                Ok(8)
829            })
830            .unwrap();
831        ba_writer_b
832            .write(8, |buf| {
833                buf[..8].copy_from_slice(&[25, 26, 27, 28, 29, 30, 31, 32]);
834                Ok(8)
835            })
836            .unwrap();
837
838        let (ab_reader_b, ab_writer_b, err_ab) = get_stream_b.next().await.unwrap();
839        let (ba_reader_a, ba_writer_a, err_ba) = get_stream_a.next().await.unwrap();
840        err_ab.send(Ok(())).unwrap();
841        err_ba.send(Ok(())).unwrap();
842
843        ab_writer_b
844            .write(8, |buf| {
845                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
846                Ok(8)
847            })
848            .unwrap();
849        ba_writer_a
850            .write(8, |buf| {
851                buf[..8].copy_from_slice(&[17, 18, 19, 20, 21, 22, 23, 24]);
852                Ok(8)
853            })
854            .unwrap();
855
856        ab_reader_b
857            .read(8, |buf| {
858                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
859                Ok(((), 8))
860            })
861            .await
862            .unwrap();
863        ab_reader_a
864            .read(8, |buf| {
865                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
866                Ok(((), 8))
867            })
868            .await
869            .unwrap();
870        ba_reader_b
871            .read(8, |buf| {
872                assert_eq!(&buf[..8], &[17, 18, 19, 20, 21, 22, 23, 24]);
873                Ok(((), 8))
874            })
875            .await
876            .unwrap();
877        ba_reader_a
878            .read(8, |buf| {
879                assert_eq!(&buf[..8], &[25, 26, 27, 28, 29, 30, 31, 32]);
880                Ok(((), 8))
881            })
882            .await
883            .unwrap();
884    }
885
886    #[fuchsia::test]
887    async fn node_connect() {
888        let (new_peer_sender_a, mut new_peers) = channel(1);
889        let (new_peer_sender_b, _new_peers_b) = channel(100);
890        let (incoming_streams_sender_a, _streams_a) = channel(100);
891        let (incoming_streams_sender_b, mut streams) = channel(1);
892        let a = Node::new("a", "test", new_peer_sender_a, incoming_streams_sender_a).unwrap();
893        let b = Node::new("b", "test", new_peer_sender_b, incoming_streams_sender_b).unwrap();
894        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
895        // observe them in a test.
896        let (errors_sink, _black_hole) = unbounded();
897
898        let (ab_reader, ab_writer) = stream::stream();
899        let (ba_reader, ba_writer) = stream::stream();
900
901        let _a_conn = fasync::Task::spawn(multi_stream_node_connection(
902            &a,
903            ba_reader,
904            ab_writer,
905            true,
906            Quality::IN_PROCESS,
907            errors_sink.clone(),
908            "b".to_owned(),
909        ));
910        let _b_conn = fasync::Task::spawn(multi_stream_node_connection(
911            &b,
912            ab_reader,
913            ba_writer,
914            false,
915            Quality::IN_PROCESS,
916            errors_sink.clone(),
917            "a".to_owned(),
918        ));
919
920        let new_peer = new_peers.next().await.unwrap();
921        assert_eq!("b", &new_peer);
922
923        let (_reader, peer_writer) = stream::stream();
924        let (peer_reader, writer) = stream::stream();
925        a.connect_to_peer(peer_reader, peer_writer, "b").await.unwrap();
926
927        writer
928            .write(8, |buf| {
929                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
930                Ok(8)
931            })
932            .unwrap();
933
934        let (reader, _writer, from) = streams.next().await.unwrap();
935        assert_eq!("a", &from);
936
937        reader
938            .read(8, |buf| {
939                assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8], &buf);
940                Ok(((), 8))
941            })
942            .await
943            .unwrap();
944    }
945}