circuit/
multi_stream.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::error::{Error, Result};
6use crate::{stream, Node, Quality};
7
8use futures::channel::mpsc::{channel, Receiver, Sender, UnboundedSender};
9use futures::channel::oneshot;
10use futures::future::Either;
11use futures::prelude::*;
12use futures::StreamExt;
13use std::collections::HashMap;
14use std::pin::pin;
15use std::sync::{Arc, Mutex as SyncMutex};
16
17/// Status of an individual stream.
18enum StreamStatus {
19    /// Stream is open to traffic. Incoming traffic should be delivered to the given writer.
20    Open(stream::Writer),
21    /// Stream hasn't been officially closed but the user has stopped listening for incoming data.
22    /// The outgoing side of the stream might still be running.
23    ReadClosed,
24    /// Stream is closed. We never reuse stream IDs so a closed stream should never reopen.
25    Closed,
26}
27
28/// Multiplexes multiple streams into one.
29///
30/// While running, this function serves a protocol on the `reader` and `writer` that multiplexes
31/// multiple streams into a single stream.
32///
33/// The protocol is a series of variable length frames each consisting of:
34/// * 4 bytes - A stream ID in little-endian.
35/// * 2 bytes - a LENGTH in little-endian.
36/// * LENGTH bytes - data.
37///
38/// To start a new stream, an endpoint simply begins sending frames with that stream ID. To close a
39/// stream, an endpoint can send a zero-length frame. Once a stream is closed that stream ID should
40/// never be used again.
41///
42/// We distinguish the two endpoints of the protocol as a client and a server. The only difference
43/// between the two is in which stream IDs they may initiate; Any new streams started by the client
44/// should have odd-numbered IDs, and any new streams initiated by the server should have
45/// even-numbered IDs. The `is_server` argument controls this behavior.
46///
47/// Any new streams initiated by the other end of the connection will be returned to us via the
48/// `streams_out` channel, and we may initiate a stream by sending a reader and writer to the
49/// `streams_in` channel.
50pub async fn multi_stream(
51    reader: stream::Reader,
52    writer: stream::Writer,
53    is_server: bool,
54    streams_out: Sender<(stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>,
55    streams_in: Receiver<(stream::Reader, stream::Writer)>,
56    stream_errors_out: UnboundedSender<Error>,
57    remote_name: String,
58) -> Result<()> {
59    let (new_readers_sender, new_readers) = channel(1);
60    let (stream_errors_sender, stream_errors) = channel(1);
61    let first_stream_id = if is_server { 1 } else { 0 };
62    let mut stream_ids = (first_stream_id..).step_by(2);
63    let writer = Arc::new(SyncMutex::new(writer));
64    let writer_for_reader = Arc::clone(&writer);
65
66    let handle_read = async move {
67        let writer = writer_for_reader;
68        let streams = SyncMutex::new(HashMap::<u32, StreamStatus>::new());
69
70        // Creates a stream (that's Stream in the rust async sense, not in the sense of our
71        // protocol) which when polled will read some data from the reader and attempt to
72        // demultiplex it.
73        //
74        // Each time we call `.next()` on this stream one read operation is performed, and the
75        // stream yields a `Result` indicating if an error occurred. Therefore, to pump read-side
76        // IO, we simply need to pull values from this stream until one of them is an error.
77        let read_result_stream = futures::stream::unfold((), |_| async {
78            let got = reader
79                .read(6, |buf| {
80                    let mut streams = streams.lock().unwrap();
81                    let (size, new_stream) =
82                        handle_one_chunk(&mut *streams, is_server, buf, &remote_name)?;
83
84                    Ok((
85                        new_stream.map(|(id, first_chunk_data)| {
86                            let (reader, remote_writer) = stream::stream();
87                            remote_writer
88                                .write(first_chunk_data.len(), |out| {
89                                    out[..first_chunk_data.len()]
90                                        .copy_from_slice(&first_chunk_data);
91                                    Ok(first_chunk_data.len())
92                                })
93                                .expect("We just created this stream!");
94                            streams.insert(id, StreamStatus::Open(remote_writer));
95                            (id, reader)
96                        }),
97                        size,
98                    ))
99                })
100                .await;
101
102            let got = match got {
103                Ok(Some((id, reader))) => {
104                    // We got a request to initiate a new stream. It's already
105                    // in the streams table, just have to hand the reader over
106                    // to the other task to be dispatched, and the writer out to
107                    // the caller.
108                    let (remote_reader, writer) = stream::stream();
109
110                    if new_readers_sender.clone().send((id, remote_reader)).await.is_err() {
111                        Err(Error::ConnectionClosed(Some(
112                            "New stream reader handler disappeared".to_owned(),
113                        )))
114                    } else {
115                        let (sender, receiver) = oneshot::channel();
116                        if stream_errors_sender.clone().send(receiver).await.is_ok() {
117                            streams_out.clone().send((reader, writer, sender)).await.map_err(|_| {
118                                Error::ConnectionClosed(Some(
119                                    "New stream handler disappeared".to_owned(),
120                                ))
121                            })
122                        } else {
123                            Err(Error::ConnectionClosed(Some(
124                                "Error reporting channel closed".to_owned(),
125                            )))
126                        }
127                    }
128                }
129                Ok(None) => Ok(()),
130                Err(e) => Err(e),
131            };
132
133            Some((got, ()))
134        });
135
136        // Send stream errors to our error output. This functions as a stream that never returns anything
137        // so we can wrap it in a select with read_result_stream and thereby continuously poll it as
138        // we handle errors.
139        let stream_errors = stream_errors
140            .map(move |x| {
141                let stream_errors_out = stream_errors_out.clone();
142                async move {
143                    if let Err(x) = x.await.unwrap_or_else(|_| {
144                        Err(Error::ConnectionClosed(Some(
145                            "Stream handler hung up without returning a status".to_owned(),
146                        )))
147                    }) {
148                        let _ = stream_errors_out.unbounded_send(x);
149                    }
150                }
151            })
152            // Buffer unordered all the stream errors otherwise we'll block the
153            // reader task.
154            .buffer_unordered(usize::MAX)
155            // Never yield anything and match the type of read_result_stream.
156            .filter_map(|()| futures::future::ready(None));
157
158        let read_result_stream = futures::stream::select(stream_errors, read_result_stream);
159
160        // The `futures::stream::select` function requires both of the streams you give it to be the
161        // same type and effectively interleaves the output of the two streams. We want to poll two
162        // streams at once but handle their output differently, so we wrap them in an `Either`.
163        let read_result_stream = read_result_stream.map(Either::Left);
164
165        // `streams_in` will yield any new streams the user wants to create. If the user hangs up
166        // that channel, we don't need to react, as the existing streams may still be in use. So we
167        // make streams_in poll forever once the user stops supplying new streams.
168        let streams_in = streams_in.chain(futures::stream::pending()).map(Either::Right);
169
170        let events = futures::stream::select(read_result_stream, streams_in);
171        let mut events = pin!(events);
172
173        let mut ret = Ok(());
174        while let Some(event) = events.next().await {
175            match event {
176                Either::Left(Ok(())) => (),
177                Either::Left(other) => {
178                    ret = other;
179                    break;
180                }
181                Either::Right((reader, writer)) => {
182                    let id = stream_ids.next().expect("This iterator should be infinite!");
183                    if new_readers_sender.clone().send((id, reader)).await.is_err() {
184                        break;
185                    }
186                    streams.lock().unwrap().insert(id, StreamStatus::Open(writer));
187                }
188            }
189        }
190
191        if matches!(ret, Err(Error::ConnectionClosed(None))) {
192            let writer = writer.lock().unwrap();
193            if writer.is_closed() {
194                ret = Err(Error::ConnectionClosed(writer.closed_reason()))
195            }
196        }
197
198        let mut streams = streams.lock().unwrap();
199
200        for (_, stream) in streams.drain() {
201            if let StreamStatus::Open(writer) = stream {
202                writer.close(format!("Multi-stream terminated: {ret:?}"));
203            }
204        }
205
206        ret
207    };
208
209    let handle_write = new_readers.for_each_concurrent(None, move |(id, stream)| {
210        let writer = Arc::clone(&writer);
211        async move {
212            if let Some(reason) = write_as_chunks(id, &stream, writer).await {
213                stream.close(format!("Stream terminated ({reason})"));
214            } else {
215                stream.close(format!("Stream terminated"));
216            }
217        }
218    });
219
220    let handle_read = pin!(handle_read);
221    let handle_write = pin!(handle_write);
222
223    match futures::future::select(handle_read, handle_write).await {
224        Either::Left((res, _)) => res,
225        Either::Right(((), handle_read)) => handle_read.await,
226    }
227}
228
229/// Handles one chunk of data from the incoming stream.
230///
231/// The incoming stream is an interleaving of several byte streams. They are interleaved by
232/// splitting them into chunks and attaching a header to each. This function assumes `buf` has been
233/// filled with data from the incoming stream, and tries to parse the header for the first chunk and
234/// process the data within.
235///
236/// If the buffer isn't long enough to contain an entire chunk, this will return `BufferTooShort`,
237/// otherwise it will return the size of the chunk processed in the first element of the returned
238/// tuple.
239///
240/// Once a chunk is parsed, the table of individual streams given with the `streams` argument will
241/// be used to route the incoming data. The second element of the returned tuple will be `Some` if
242/// and only if the chunk indicates a new stream is being started, in which case it will contain the
243/// ID of the new stream, and the data portion of the chunk.
244fn handle_one_chunk<'a>(
245    streams: &mut HashMap<u32, StreamStatus>,
246    is_server: bool,
247    buf: &'a [u8],
248    remote_name: &str,
249) -> Result<(usize, Option<(u32, &'a [u8])>)> {
250    if buf.len() < 6 {
251        return Err(Error::BufferTooShort(6));
252    }
253
254    let id = u32::from_le_bytes(buf[..4].try_into().unwrap());
255    let length = u16::from_le_bytes(buf[4..6].try_into().unwrap());
256
257    let length = length as usize;
258    let chunk_length = length + 6;
259    let buf = &buf[6..];
260
261    if length == 0 {
262        if buf.len() < 2 {
263            return Err(Error::BufferTooShort(8));
264        }
265        let length = u16::from_le_bytes(buf[..2].try_into().unwrap());
266        let length = length as usize;
267
268        if buf.len() < length + 2 {
269            return Err(Error::BufferTooShort(8 + length));
270        }
271
272        let epitaph =
273            if length > 0 { Some(String::from_utf8_lossy(&buf[2..][..length])) } else { None };
274
275        if let Some(old) = streams.insert(id, StreamStatus::Closed) {
276            if let StreamStatus::Open(old) = old {
277                if let Some(epitaph) = epitaph {
278                    old.close(format!("{remote_name} reported: {epitaph}"));
279                } else {
280                    old.close(format!("{remote_name} reported no epitaph"));
281                }
282            }
283            Ok((length + 8, None))
284        } else {
285            Err(Error::BadStreamId)
286        }
287    } else if buf.len() < length {
288        Err(Error::BufferTooShort(chunk_length))
289    } else if let Some(stream) = streams.get(&id) {
290        match stream {
291            StreamStatus::Open(stream) => {
292                if stream
293                    .write(length, |out| {
294                        out[..length].copy_from_slice(&buf[..length]);
295                        Ok(length)
296                    })
297                    .is_err()
298                {
299                    // The user isn't listening for incoming data anymore. Don't
300                    // treat that as a fatal error, and don't send a hangup in case
301                    // the user is still sending data. Just quietly ignore it. If
302                    // the user hangs up the other side of the connection that's
303                    // when we can complain.
304                    let _ = streams.insert(id, StreamStatus::ReadClosed);
305                }
306                Ok((chunk_length, None))
307            }
308            StreamStatus::ReadClosed => Ok((chunk_length, None)),
309            StreamStatus::Closed => Err(Error::BadStreamId),
310        }
311    } else if (id & 1 == 0) == is_server {
312        Ok((chunk_length, Some((id, &buf[..length]))))
313    } else {
314        Err(Error::BadStreamId)
315    }
316}
317
318/// Reads data from the given reader, then splits it into chunks of no more than 2^16 - 1 bytes,
319/// attaches a header to each chunk containing the given `id` number and the size of the chunk, then
320/// writes each chunk to the given writer.
321///
322/// The point, of course, is that multiple functions can do this to the same writer, and since the
323/// chunks are labeled, the data can be parsed back out into separate streams on the other end.
324///
325/// If writing stops because the writer is closed, this will return the reported reason for closure.
326async fn write_as_chunks(
327    id: u32,
328    reader: &stream::Reader,
329    writer: Arc<SyncMutex<stream::Writer>>,
330) -> Option<String> {
331    loop {
332        // We want to handle errors with the read and errors with the write differently, so we
333        // return a nested result.
334        //
335        // In short, this will return Ok(Ok(())) if all is well, Ok(Err(...)) if we read data
336        // successfully but then failed to write it, and Err(...) if we failed to read.
337        let got = reader
338            .read(1, |buf| {
339                let mut total_len = 0;
340                while buf.len() > total_len {
341                    let buf = &buf[total_len..];
342                    let buf =
343                        if buf.len() > u16::MAX as usize { &buf[..u16::MAX as usize] } else { buf };
344
345                    let len: u16 = buf
346                        .len()
347                        .try_into()
348                        .expect("We just truncated the length so it would fit!");
349
350                    if let e @ Err(_) = writer.lock().unwrap().write(6 + buf.len(), |out_buf| {
351                        out_buf[..4].copy_from_slice(&id.to_le_bytes());
352                        let out_buf = &mut out_buf[4..];
353                        out_buf[..2].copy_from_slice(&len.to_le_bytes());
354                        out_buf[2..][..buf.len()].copy_from_slice(buf);
355                        Ok(buf.len() + 6)
356                    }) {
357                        return Ok((e, total_len));
358                    } else {
359                        total_len += buf.len();
360                    }
361                }
362
363                Ok((Ok(()), total_len))
364            })
365            .await;
366
367        match got {
368            Err(Error::ConnectionClosed(epitaph)) => {
369                let epitaph = epitaph.as_ref().map(|x| x.as_bytes()).unwrap_or(b"");
370                let length_u16: u16 = epitaph.len().try_into().unwrap_or(u16::MAX);
371                let length = length_u16 as usize;
372
373                // If the stream was closed, send a frame indicating such.
374                let write_result = writer.lock().unwrap().write(8 + length, |out_buf| {
375                    out_buf[..4].copy_from_slice(&id.to_le_bytes());
376                    let out_buf = &mut out_buf[4..];
377                    out_buf[..2].copy_from_slice(&0u16.to_le_bytes());
378                    let out_buf = &mut out_buf[2..];
379                    out_buf[..2].copy_from_slice(&length_u16.to_le_bytes());
380                    let out_buf = &mut out_buf[2..];
381                    out_buf[..length].copy_from_slice(&epitaph[..length]);
382                    Ok(8 + length)
383                });
384
385                match write_result {
386                    Ok(()) | Err(Error::ConnectionClosed(None)) => break None,
387                    Err(Error::ConnectionClosed(Some(s))) => break Some(format!("write: {s}")),
388                    other => unreachable!("Unexpected write error: {other:?}"),
389                }
390            }
391            Ok(Ok(())) => (),
392            Ok(Err(Error::ConnectionClosed(None))) => break None,
393            Ok(Err(Error::ConnectionClosed(Some(s)))) => break Some(format!("read: {s}")),
394            Ok(other) => unreachable!("Unexpected write error: {other:?}"),
395            other => unreachable!("Unexpected read error: {other:?}"),
396        }
397    }
398}
399
400/// Creates a new connection to a circuit node, and merges all streams produced and consumed by that
401/// connection into a multi-stream. In this way you can service a connection between nodes with a
402/// single stream of bytes.
403///
404/// The `is_server` boolean should be `true` at one end of the connection and `false` at the other.
405/// Usually it will be `true` for the node receiving a connection and `false` for the node
406/// initiating one.
407///
408/// Traffic will be written to, and read from, the given `reader` and `writer`.
409///
410/// The `quality` will be used to make routing decisions when establishing streams across multiple
411/// nodes.
412pub fn multi_stream_node_connection(
413    node: &Node,
414    reader: stream::Reader,
415    writer: stream::Writer,
416    is_server: bool,
417    quality: Quality,
418    stream_errors_out: UnboundedSender<Error>,
419    remote_name: String,
420) -> impl Future<Output = Result<()>> + Send + use<> {
421    let (mut new_stream_sender, streams_in) = channel(1);
422    let (streams_out, new_stream_receiver) = channel(1);
423
424    let control_stream = if is_server {
425        let (control_reader, control_writer_remote) = stream::stream();
426        let (control_reader_remote, control_writer) = stream::stream();
427
428        new_stream_sender
429            .try_send((control_reader_remote, control_writer_remote))
430            .expect("We just created this channel!");
431        Some((control_reader, control_writer))
432    } else {
433        None
434    };
435
436    let stream_fut = multi_stream(
437        reader,
438        writer,
439        is_server,
440        streams_out,
441        streams_in,
442        stream_errors_out,
443        remote_name,
444    );
445    let node_fut = node.link_node(control_stream, new_stream_sender, new_stream_receiver, quality);
446
447    async move {
448        let node_fut = pin!(node_fut);
449        let stream_fut = pin!(stream_fut);
450
451        // If either the node connection or the multi stream dies we assume the other will also die
452        // shortly after, so we always await both futures to completion.
453        match futures::future::select(node_fut, stream_fut).await {
454            Either::Left((res, stream_fut)) => res.and(stream_fut.await),
455            Either::Right((res, node_fut)) => res.and(node_fut.await),
456        }
457    }
458}
459
460/// Same as `multi_stream_node_connection` but reads and writes to and from implementors of the
461/// standard `AsyncRead` and `AsyncWrite` traits rather than circuit streams.
462pub async fn multi_stream_node_connection_to_async(
463    node: &Node,
464    rx: &mut (dyn AsyncRead + Unpin + Send),
465    tx: &mut (dyn AsyncWrite + Unpin + Send),
466    is_server: bool,
467    quality: Quality,
468    stream_errors_out: UnboundedSender<Error>,
469    remote_name: String,
470) -> Result<()> {
471    let (reader, remote_writer) = stream::stream();
472    let (remote_reader, writer) = stream::stream();
473    let conn_fut = multi_stream_node_connection(
474        node,
475        remote_reader,
476        remote_writer,
477        is_server,
478        quality,
479        stream_errors_out,
480        remote_name.clone(),
481    );
482    let remote_name = &remote_name;
483
484    let read_fut = async move {
485        let mut buf = [0u8; 4096];
486        loop {
487            let n = match rx.read(&mut buf).await {
488                Ok(0) => {
489                    writer
490                        .close(format!("connection closed (either by transport or {remote_name})"));
491                    break Ok(());
492                }
493                Ok(n) => n,
494                Err(e) => {
495                    writer.close(format!("{remote_name} connection failed (read): {e:?}"));
496                    return Err(Error::from(e));
497                }
498            };
499            writer.write(n, |write_buf| {
500                write_buf[..n].copy_from_slice(&buf[..n]);
501                Ok(n)
502            })?
503        }
504    };
505
506    let write_fut = async move {
507        let mut tx = std::pin::pin!(tx);
508        loop {
509            let res = futures::future::poll_fn(|ctx| {
510                reader.poll_read(ctx, &mut 1, |ctx, read_buf| {
511                    let res = match std::task::ready!(tx.as_mut().poll_write(ctx, read_buf)) {
512                        Ok(x) => Ok(((), x)),
513                        Err(e) => Err(Error::IO(e)),
514                    };
515                    std::task::Poll::Ready(res)
516                })
517            })
518            .await;
519
520            if let Err(Error::IO(e)) = &res {
521                reader.close(format!("{remote_name} connection failed (write): {e:?}"));
522                return res;
523            }
524
525            res?;
526        }
527    };
528
529    let read_write = futures::future::try_join(read_fut, write_fut);
530
531    let conn_fut = pin!(conn_fut);
532
533    let cleanup = {
534        // We must pin read_write to this scope so the streams are dropped when
535        // we leave this scope and conn_fut can run to completion. If it's
536        // pinned to the stack alongside conn_fut then the streams aren't
537        // dropped and conn_fut doesn't observe the closed status.
538        let read_write = pin!(read_write);
539
540        match futures::future::select(conn_fut, read_write).await {
541            Either::Left((res, _)) => {
542                return res;
543            }
544            Either::Right((read_write_result, conn_fut)) => {
545                conn_fut.map(move |conn_fut_result| match (conn_fut_result, read_write_result) {
546                    (Ok(()), Ok(((), ()))) => Ok(()),
547                    // Report back any errors, preferring the one from the
548                    // connection future.
549                    (Ok(()), Err(e)) | (Err(e), _) => Err(e),
550                })
551            }
552        }
553    };
554
555    // If the read/write future finished first, we need to wait for the
556    // connection future to run to completion to ensure cleanup happens.
557    cleanup.await
558}
559
560#[cfg(test)]
561mod test {
562    use super::*;
563    use fuchsia_async as fasync;
564    use futures::channel::mpsc::unbounded;
565
566    #[fuchsia::test]
567    async fn one_stream() {
568        let (a_reader, b_writer) = stream::stream();
569        let (b_reader, a_writer) = stream::stream();
570        let (mut create_stream_a, a_streams_in) = channel(1);
571        let (_create_stream_b, b_streams_in) = channel(1);
572        let (a_streams_out, _get_stream_a) = channel(100);
573        let (b_streams_out, mut get_stream_b) = channel(1);
574        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
575        // observe them in a test.
576        let (errors_sink_a, _black_hole) = unbounded();
577        let errors_sink_b = errors_sink_a.clone();
578
579        let _a = fasync::Task::spawn(async move {
580            assert!(matches!(
581                multi_stream(
582                    a_reader,
583                    a_writer,
584                    true,
585                    a_streams_out,
586                    a_streams_in,
587                    errors_sink_a,
588                    "b".to_owned()
589                )
590                .await,
591                Ok(()) | Err(Error::ConnectionClosed(None))
592            ))
593        });
594        let _b = fasync::Task::spawn(async move {
595            assert!(matches!(
596                multi_stream(
597                    b_reader,
598                    b_writer,
599                    false,
600                    b_streams_out,
601                    b_streams_in,
602                    errors_sink_b,
603                    "a".to_owned()
604                )
605                .await,
606                Ok(()) | Err(Error::ConnectionClosed(None))
607            ))
608        });
609
610        let (ab_reader_a, ab_reader_write) = stream::stream();
611        let (ab_writer_read, ab_writer_a) = stream::stream();
612
613        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
614
615        ab_writer_a
616            .write(8, |buf| {
617                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
618                Ok(8)
619            })
620            .unwrap();
621
622        let (ab_reader_b, ab_writer_b, err) = get_stream_b.next().await.unwrap();
623        err.send(Ok(())).unwrap();
624
625        ab_writer_b
626            .write(8, |buf| {
627                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
628                Ok(8)
629            })
630            .unwrap();
631
632        ab_reader_b
633            .read(8, |buf| {
634                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
635                Ok(((), 8))
636            })
637            .await
638            .unwrap();
639        ab_reader_a
640            .read(8, |buf| {
641                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
642                Ok(((), 8))
643            })
644            .await
645            .unwrap();
646
647        std::mem::drop(ab_writer_b);
648        assert!(matches!(
649            ab_reader_a.read::<_, ()>(1, |_| unreachable!()).await,
650            Err(Error::ConnectionClosed(_))
651        ));
652
653        std::mem::drop(ab_reader_b);
654        ab_writer_a
655            .write(8, |buf| {
656                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
657                Ok(8)
658            })
659            .unwrap();
660    }
661
662    #[fuchsia::test]
663    async fn fallible_stream() {
664        let (a_reader, b_writer) = stream::stream();
665        let (b_reader, a_writer) = stream::stream();
666        let (mut create_stream_a, a_streams_in) = channel(1);
667        let (_create_stream_b, b_streams_in) = channel(1);
668        let (a_streams_out, _get_stream_a) = channel(100);
669        let (b_streams_out, mut get_stream_b) = channel(1);
670        let (errors_sink_a, _black_hole) = unbounded();
671        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
672        // observe them in a test.
673        let (errors_sink_b, mut b_errors) = unbounded();
674
675        let _a = fasync::Task::spawn(async move {
676            assert!(matches!(
677                multi_stream(
678                    a_reader,
679                    a_writer,
680                    true,
681                    a_streams_out,
682                    a_streams_in,
683                    errors_sink_a,
684                    "b".to_owned()
685                )
686                .await,
687                Ok(()) | Err(Error::ConnectionClosed(None))
688            ))
689        });
690        let _b = fasync::Task::spawn(async move {
691            assert!(matches!(
692                multi_stream(
693                    b_reader,
694                    b_writer,
695                    false,
696                    b_streams_out,
697                    b_streams_in,
698                    errors_sink_b,
699                    "a".to_owned()
700                )
701                .await,
702                Ok(()) | Err(Error::ConnectionClosed(None))
703            ))
704        });
705
706        // The first stream fails to be created.
707        let (fail_reader, fail_reader_write) = stream::stream();
708        let (_ignore, fail_writer) = stream::stream();
709        create_stream_a.send((fail_reader, fail_writer)).await.unwrap();
710
711        // There's a laziness to stream creation in the protocol so we need to send a little data to
712        // actually create the stream.
713        fail_reader_write
714            .write(8, |buf| {
715                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
716                Ok(8)
717            })
718            .unwrap();
719
720        let (_, _, err) = get_stream_b.next().await.unwrap();
721        err.send(Err(Error::ConnectionClosed(Some("Testing".to_owned())))).unwrap();
722
723        loop {
724            if let Some(Error::ConnectionClosed(Some(s))) = b_errors.next().await {
725                if s == "Testing" {
726                    break;
727                }
728            } else {
729                panic!("Error stream closed without reporting our error.");
730            }
731        }
732
733        let (ab_reader_a, ab_reader_write) = stream::stream();
734        let (ab_writer_read, ab_writer_a) = stream::stream();
735
736        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
737
738        ab_writer_a
739            .write(8, |buf| {
740                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
741                Ok(8)
742            })
743            .unwrap();
744
745        let (ab_reader_b, ab_writer_b, err) = get_stream_b.next().await.unwrap();
746        err.send(Ok(())).unwrap();
747
748        ab_writer_b
749            .write(8, |buf| {
750                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
751                Ok(8)
752            })
753            .unwrap();
754
755        ab_reader_b
756            .read(8, |buf| {
757                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
758                Ok(((), 8))
759            })
760            .await
761            .unwrap();
762        ab_reader_a
763            .read(8, |buf| {
764                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
765                Ok(((), 8))
766            })
767            .await
768            .unwrap();
769
770        std::mem::drop(ab_writer_b);
771        assert!(matches!(
772            ab_reader_a.read::<_, ()>(1, |_| unreachable!()).await,
773            Err(Error::ConnectionClosed(_))
774        ));
775
776        std::mem::drop(ab_reader_b);
777        ab_writer_a
778            .write(8, |buf| {
779                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
780                Ok(8)
781            })
782            .unwrap();
783    }
784
785    #[fuchsia::test]
786    async fn two_streams() {
787        let (a_reader, b_writer) = stream::stream();
788        let (b_reader, a_writer) = stream::stream();
789        let (mut create_stream_a, a_streams_in) = channel(1);
790        let (mut create_stream_b, b_streams_in) = channel(1);
791        let (a_streams_out, mut get_stream_a) = channel(1);
792        let (b_streams_out, mut get_stream_b) = channel(1);
793        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
794        // observe them in a test.
795        let (errors_sink, _black_hole) = unbounded();
796
797        let _a = fasync::Task::spawn(multi_stream(
798            a_reader,
799            a_writer,
800            true,
801            a_streams_out,
802            a_streams_in,
803            errors_sink.clone(),
804            "b".to_owned(),
805        ));
806        let _b = fasync::Task::spawn(multi_stream(
807            b_reader,
808            b_writer,
809            false,
810            b_streams_out,
811            b_streams_in,
812            errors_sink.clone(),
813            "a".to_owned(),
814        ));
815
816        let (ab_reader_a, ab_reader_write) = stream::stream();
817        let (ab_writer_read, ab_writer_a) = stream::stream();
818        let (ba_reader_b, ba_reader_write) = stream::stream();
819        let (ba_writer_read, ba_writer_b) = stream::stream();
820
821        create_stream_a.send((ab_writer_read, ab_reader_write)).await.unwrap();
822        create_stream_b.send((ba_writer_read, ba_reader_write)).await.unwrap();
823
824        ab_writer_a
825            .write(8, |buf| {
826                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
827                Ok(8)
828            })
829            .unwrap();
830        ba_writer_b
831            .write(8, |buf| {
832                buf[..8].copy_from_slice(&[25, 26, 27, 28, 29, 30, 31, 32]);
833                Ok(8)
834            })
835            .unwrap();
836
837        let (ab_reader_b, ab_writer_b, err_ab) = get_stream_b.next().await.unwrap();
838        let (ba_reader_a, ba_writer_a, err_ba) = get_stream_a.next().await.unwrap();
839        err_ab.send(Ok(())).unwrap();
840        err_ba.send(Ok(())).unwrap();
841
842        ab_writer_b
843            .write(8, |buf| {
844                buf[..8].copy_from_slice(&[9, 10, 11, 12, 13, 14, 15, 16]);
845                Ok(8)
846            })
847            .unwrap();
848        ba_writer_a
849            .write(8, |buf| {
850                buf[..8].copy_from_slice(&[17, 18, 19, 20, 21, 22, 23, 24]);
851                Ok(8)
852            })
853            .unwrap();
854
855        ab_reader_b
856            .read(8, |buf| {
857                assert_eq!(&buf[..8], &[1, 2, 3, 4, 5, 6, 7, 8]);
858                Ok(((), 8))
859            })
860            .await
861            .unwrap();
862        ab_reader_a
863            .read(8, |buf| {
864                assert_eq!(&buf[..8], &[9, 10, 11, 12, 13, 14, 15, 16]);
865                Ok(((), 8))
866            })
867            .await
868            .unwrap();
869        ba_reader_b
870            .read(8, |buf| {
871                assert_eq!(&buf[..8], &[17, 18, 19, 20, 21, 22, 23, 24]);
872                Ok(((), 8))
873            })
874            .await
875            .unwrap();
876        ba_reader_a
877            .read(8, |buf| {
878                assert_eq!(&buf[..8], &[25, 26, 27, 28, 29, 30, 31, 32]);
879                Ok(((), 8))
880            })
881            .await
882            .unwrap();
883    }
884
885    #[fuchsia::test]
886    async fn node_connect() {
887        let (new_peer_sender_a, mut new_peers) = channel(1);
888        let (new_peer_sender_b, _new_peers_b) = channel(100);
889        let (incoming_streams_sender_a, _streams_a) = channel(100);
890        let (incoming_streams_sender_b, mut streams) = channel(1);
891        let a = Node::new("a", "test", new_peer_sender_a, incoming_streams_sender_a).unwrap();
892        let b = Node::new("b", "test", new_peer_sender_b, incoming_streams_sender_b).unwrap();
893        // Connection closure errors are very timing-dependent so we'll tend to be flaky if we
894        // observe them in a test.
895        let (errors_sink, _black_hole) = unbounded();
896
897        let (ab_reader, ab_writer) = stream::stream();
898        let (ba_reader, ba_writer) = stream::stream();
899
900        let _a_conn = fasync::Task::spawn(multi_stream_node_connection(
901            &a,
902            ba_reader,
903            ab_writer,
904            true,
905            Quality::IN_PROCESS,
906            errors_sink.clone(),
907            "b".to_owned(),
908        ));
909        let _b_conn = fasync::Task::spawn(multi_stream_node_connection(
910            &b,
911            ab_reader,
912            ba_writer,
913            false,
914            Quality::IN_PROCESS,
915            errors_sink.clone(),
916            "a".to_owned(),
917        ));
918
919        let new_peer = new_peers.next().await.unwrap();
920        assert_eq!("b", &new_peer);
921
922        let (_reader, peer_writer) = stream::stream();
923        let (peer_reader, writer) = stream::stream();
924        a.connect_to_peer(peer_reader, peer_writer, "b").await.unwrap();
925
926        writer
927            .write(8, |buf| {
928                buf[..8].copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
929                Ok(8)
930            })
931            .unwrap();
932
933        let (reader, _writer, from) = streams.next().await.unwrap();
934        assert_eq!("a", &from);
935
936        reader
937            .read(8, |buf| {
938                assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8], &buf);
939                Ok(((), 8))
940            })
941            .await
942            .unwrap();
943    }
944}