1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use fuchsia_async::Timer;
use futures::channel::mpsc::Sender;
use futures::channel::oneshot;
use futures::future::{poll_fn, Either};
use futures::stream::StreamExt as _;
use futures::FutureExt;
use futures::SinkExt as _;
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex, Weak};
use std::task::Poll;
use std::time::Duration;

/// Version of the circuit protocol implemented by this crate.
///
/// While a link is being set up, the `circuit_version` carried in the peer's `Identify` handshake
/// is compared against this value; a mismatch rejects the link with `Error::VersionMismatch`.
pub const CIRCUIT_VERSION: u8 = 0;

mod error;
mod protocol;
#[cfg(test)]
mod test;

mod connection;
pub mod multi_stream;
pub mod stream;

use protocol::{EncodableString, Identify, NodeState};

pub use connection::{Connection, ConnectionNode};
pub use error::{Error, Result};
pub use protocol::Quality;

use crate::protocol::ProtocolMessage;

/// A list of other nodes we can see on the mesh. For each such node we retain a vector of senders
/// which allow us to establish new streams with that node. Each sender in the vector corresponds to
/// a different path we could take to the desired peer. For each sender we also store the quality of
/// the whole path to that peer, obtained by folding the per-hop qualities together with
/// `Quality::combine` (see `protocol::NodeState::Online`).
struct PeerMap {
    /// The actual map of peers itself, keyed by node ID.
    ///
    /// The first entry of each vector is treated as the preferred route: `condense_routes`
    /// advertises its quality and `connect_to_peer` tries it first. A vector may be empty when
    /// every route to that peer has been removed; such entries are skipped by `condense_routes`.
    ///
    /// NOTE(review): `handle_control_stream` re-sorts a vector by quality after changing it, but
    /// `handle_new_streams` appends `Quality::UNKNOWN` routes without re-sorting — confirm that
    /// ordering is intended.
    peers: HashMap<EncodableString, Vec<(Sender<(stream::Reader, stream::Writer)>, Quality)>>,
    /// This value increments once every time the peer map changes. Consequently, we can track
    /// changes in this number to determine when a routing refresh is necessary.
    generation: usize,
    /// This allows the router task to wait for this structure to update. Put simply, any time we
    /// increment `generation`, we should remove this sender from the struct and fire it if it is
    /// present.
    wakeup: Option<oneshot::Sender<()>>,
    /// Control channels for various peer nodes.
    ///
    /// Every time we directly connect to a new node, we start a stream called the control stream,
    /// which we use to send handshake messages and routing updates. Once we've written the
    /// handshake message, the write side of that control stream ends up here, and this is where we
    /// send routing updates.
    ///
    /// If we have a routing task, then any time the peer map updates, we will send updated node
    /// states to each channel contained in this `Vec`. The `HashMap` contains the state of the peer
    /// map the last time we sent info and is used to determine what updates need to be sent.
    ///
    /// If we don't have a routing task, this should be empty. In that case we're a "leaf node" and
    /// we don't send any routing information to peers at all.
    control_channels: Vec<(stream::Writer, HashMap<EncodableString, Quality>)>,
}

impl PeerMap {
    /// Create a new peer map.
    fn new() -> Self {
        PeerMap {
            peers: HashMap::new(),

            // Start with generation 1 so the routing task can start with generation 0. This means
            // the routing task is out of sync as soon as it starts, so it will immediately trigger
            // an update.
            generation: 1,

            wakeup: None,
            control_channels: Vec::new(),
        }
    }

    /// Marks the peer map as modified.
    ///
    /// Bumps `generation` and, if the routing task has parked a wakeup sender here, consumes and
    /// fires it so the routing task can push fresh routing information to neighboring nodes.
    fn increment_generation(&mut self) {
        self.generation += 1;

        // The sender is single-use, so it is taken out of the struct before being fired.
        if let Some(waker) = self.wakeup.take() {
            // The outcome of the send is deliberately ignored.
            let _ = waker.send(());
        }
    }

    /// Get the list of peers mutably, and signal the routing task that we have modified it.
    fn peers(
        &mut self,
    ) -> &mut HashMap<EncodableString, Vec<(Sender<(stream::Reader, stream::Writer)>, Quality)>>
    {
        self.increment_generation();
        &mut self.peers
    }

    /// Produces a compact view of the peer map: one entry per reachable node, mapping its name to
    /// the quality of the first (preferred) route we hold for it.
    ///
    /// Nodes whose route list is empty are left out of the result.
    fn condense_routes(&self) -> HashMap<EncodableString, Quality> {
        self.peers
            .iter()
            .filter_map(|(node, routes)| routes.first().map(|best| (node.clone(), best.1)))
            .collect()
    }

    /// Registers `channel` in `control_channels` after sending it an initial route update.
    ///
    /// Every route in the current condensed table is written to the channel as a
    /// `NodeState::Online` message whose quality is `quality` (the quality of the link the channel
    /// belongs to) combined with our own route quality. The same condensed table is stored next to
    /// the channel as the baseline for later incremental updates.
    ///
    /// # Errors
    ///
    /// Returns the first write error encountered. In that case the channel is dropped and is not
    /// registered.
    fn add_control_channel(&mut self, channel: stream::Writer, quality: Quality) -> Result<()> {
        let snapshot = self.condense_routes();

        for (peer, route_quality) in snapshot.iter() {
            let advertised = NodeState::Online(peer.clone(), quality.combine(*route_quality));
            channel.write_protocol_message(&advertised)?;
        }

        self.control_channels.push((channel, snapshot));
        Ok(())
    }
}

/// Represents a node on the circuit network.
///
/// A node can connect to one or more other nodes, and a stream can be established between any two
/// nodes on the entire connected graph of nodes.
///
/// Nodes are created with `Node::new` (leaf node) or `Node::new_with_router` (forwarding node),
/// and joined to other nodes with `Node::link_node`.
pub struct Node {
    /// A unique identifying string for this node.
    node_id: EncodableString,
    /// Indicates what protocol we intend to run atop the streams we established, e.g.
    /// "Overnet-with-FIDL-ABI-1". A link is rejected if the peer's handshake names a different
    /// protocol.
    protocol: EncodableString,
    /// List of other nodes we can "see" on the network. Shared with the futures returned by
    /// `link_node` and, weakly, with the router process.
    peers: Arc<Mutex<PeerMap>>,
    /// If true, we will inform nodes we are connected to of what other nodes we can see, so that
    /// they might try to forward traffic through us. This also indicates that there is a "router
    /// process" running for this node which handles said forwarding.
    has_router: bool,

    /// If another node establishes a connection to this node, we will notify the user by way of
    /// this sender. Each item carries the stream's reader and writer plus the originating node's
    /// ID.
    incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,

    /// If a new peer becomes available we will send its name through this sender to notify the user.
    new_peer_sender: Sender<String>,
}

impl Node {
    /// Establish a new node.
    ///
    /// Any time a new peer becomes visible to this node, the peer's node ID will be sent to
    /// `new_peer_sender`.
    ///
    /// Any time a peer wants to establish a stream with this node, a reader and writer for the new
    /// stream as well as the peer's node ID will be sent to `incoming_stream_sender`.
    pub fn new(
        node_id: &str,
        protocol: &str,
        new_peer_sender: Sender<String>,
        incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,
    ) -> Result<Node> {
        let node_id = node_id.to_owned().try_into()?;
        let protocol = protocol.to_owned().try_into()?;

        Ok(Node {
            node_id,
            protocol,
            new_peer_sender,
            incoming_stream_sender,
            peers: Arc::new(Mutex::new(PeerMap::new())),
            has_router: false,
        })
    }

    /// Creates a node that forwards streams between its peers, together with its router process.
    ///
    /// The arguments shared with `Node::new` have the same meaning as there. `interval` is the
    /// pause the router process inserts after each round of routing updates.
    ///
    /// The returned future is the router process and must be polled continuously for forwarding
    /// to work. It only holds a weak reference to the node's peer map and finishes once that map
    /// has been dropped.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `Node::new`.
    pub fn new_with_router(
        node_id: &str,
        protocol: &str,
        interval: Duration,
        new_peer_sender: Sender<String>,
        incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,
    ) -> Result<(Node, impl Future<Output = ()> + Send)> {
        Self::new(node_id, protocol, new_peer_sender, incoming_stream_sender).map(|mut node| {
            node.has_router = true;
            let routing_task = router(Arc::downgrade(&node.peers), interval);
            (node, routing_task)
        })
    }

    /// Establish a stream with another node. Data will be sent to the peer with
    /// `connection_writer`, and received with `connection_reader`.
    pub async fn connect_to_peer(
        &self,
        connection_reader: stream::Reader,
        connection_writer: stream::Writer,
        node_id: &str,
    ) -> Result<()> {
        if self.node_id == node_id {
            self.incoming_stream_sender
                .clone()
                .send((connection_reader, connection_writer, node_id.to_owned()))
                .await
                .map_err(|_| Error::NoSuchPeer(node_id.to_owned()))?;
        } else {
            let node_id: EncodableString = node_id.to_owned().try_into()?;
            // Write the destination node ID and the source node ID (us). Keep in mind because of
            // the semantics of push_back_protocol_message, these will come off the wire in the
            // *reverse* of the order we're putting them on here.
            connection_reader.push_back_protocol_message(&self.node_id)?;
            connection_reader.push_back_protocol_message(&node_id)?;
            connect_to_peer(
                Arc::clone(&self.peers),
                connection_reader,
                connection_writer,
                &node_id,
            )
            .await?;
        }

        Ok(())
    }

    /// Test helper: makes `to` reachable through exactly the routes currently known for `via`,
    /// replacing any routes `to` already had.
    ///
    /// The peer table is edited directly, so the routing generation is not bumped.
    ///
    /// # Panics
    ///
    /// Panics if either name cannot be encoded or if `via` is not in the peer table.
    #[cfg(test)]
    pub fn route_via(&self, to: &str, via: &str) {
        let to: EncodableString = to.to_owned().try_into().unwrap();
        let via: EncodableString = via.to_owned().try_into().unwrap();

        let mut guard = self.peers.lock().unwrap();
        let copied_routes = guard.peers.get(&via).cloned().unwrap();
        guard.peers.insert(to, copied_routes);
    }

    /// Connect to another node.
    ///
    /// This establishes the internal state to link this node directly to another one, there by
    /// joining it to the circuit network. To actually perform the networking necessary to create
    /// such a link, a back end will have to service the streams given to this function via
    /// its arguments. To keep the link running, the returned future must also be polled to
    /// completion. Depending on configuration, it may complete swiftly or may poll for the entire
    /// lifetime of the link.
    ///
    /// When we link to another node, we immediately start a "control stream" that performs a
    /// handshake and sends routing messages. The reader and writer passed in through
    /// `control_stream` will be used to service this stream. If `control_stream` is `None` the
    /// first stream emitted from `new_stream_receiver` will be used.
    ///
    /// When the local node needs to create a new stream to the linked node, it will send a reader
    /// and writer to `new_stream_sender`.
    ///
    /// When the linked node wants to create a new stream to this node, the back end may send a
    /// reader and writer through `new_stream_receiver`, as well as a `oneshot::Sender` which will
    /// be used to report if the link is established successfully or if an error occurs.
    ///
    /// The returned future will continue to poll for the lifetime of the link and return the error
    /// that terminated it.
    pub fn link_node(
        &self,
        control_stream: Option<(stream::Reader, stream::Writer)>,
        new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
        mut new_stream_receiver: impl futures::Stream<Item = (stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>
            + Unpin
            + Send,
        quality: Quality,
    ) -> impl Future<Output = Result<()>> + Send {
        let has_router = self.has_router;
        let peers = Arc::clone(&self.peers);
        let node_id = self.node_id.clone();
        let protocol = self.protocol.clone();
        let (new_stream_receiver_sender, new_stream_receiver_receiver) = oneshot::channel();
        let (control_reader_sender, control_reader_receiver) = oneshot::channel();
        let new_streams_loop =
            self.handle_new_streams(new_stream_receiver_receiver, new_stream_sender.clone());
        let control_stream_loop =
            self.handle_control_stream(control_reader_receiver, new_stream_sender.clone(), quality);

        // Runs all necessary background processing for a connection. Tasks include:
        // 1) Fetch a control stream from the new_stream_receiver if we don't have one from
        //    control_stream already.
        // 2) Send a handshake message to the other node.
        // 3) Wait for and validate the handshake from the peer.
        // 4) Receive updated routing information from the peer and update the peer map accordingly.
        // 5) When the peer tries to open a new stream to us, either forward that stream to another
        //    node or, if it's destined for us directly, consume the initial handshake from it and send
        //    it out to be processed by the user.
        async move {
            let (control_reader, control_writer) = if let Some(control_stream) = control_stream {
                control_stream
            } else {
                let (reader, writer, error_sender) =
                    new_stream_receiver.next().await.ok_or_else(|| {
                        Error::ConnectionClosed(Some(
                            "Client stopped listening for new streams".to_owned(),
                        ))
                    })?;
                let _ = error_sender.send(Ok(()));
                (reader, writer)
            };

            let _ = new_stream_receiver_sender.send(new_stream_receiver);

            let header = Identify::new(protocol.clone());
            control_writer.write_protocol_message(&header)?;

            let state = NodeState::Online(node_id.clone(), Quality::SELF);
            control_writer.write_protocol_message(&state)?;

            if has_router {
                peers
                    .lock()
                    .unwrap()
                    .add_control_channel(control_writer, quality)
                    .expect("We just created this channel!");
            } else {
                // No router means no further routing messages. Just let 'er go.
                std::mem::drop(control_writer);
            }

            // Start by reading and validating a handshake message from the stream.
            let header = control_reader.read_protocol_message::<Identify>().await?;

            if header.circuit_version != CIRCUIT_VERSION {
                return Err(Error::VersionMismatch);
            } else if header.protocol != protocol {
                return Err(Error::ProtocolMismatch);
            }

            control_reader_sender.send(control_reader).map_err(|_| {
                Error::ConnectionClosed(Some("Control stream handler disappeared".to_owned()))
            })?;

            let control_stream_loop = pin!(control_stream_loop);
            let new_streams_loop = pin!(new_streams_loop);

            let ret = match futures::future::select(control_stream_loop, new_streams_loop).await {
                Either::Left((result, new_streams)) => {
                    if matches!(result, Ok(()) | Err(Error::ConnectionClosed(_))) {
                        new_streams.await;
                    }
                    result
                }
                Either::Right(((), read_control)) => read_control.now_or_never().unwrap_or(Ok(())),
            };

            {
                let mut peers = peers.lock().unwrap();
                let peers = peers.peers();
                for peer_list in peers.values_mut() {
                    peer_list.retain(|x| !x.0.same_receiver(&new_stream_sender));
                }
            }

            ret
        }
    }

    /// Processes routing updates arriving on a control stream.
    ///
    /// `control_reader` yields the read side of the control stream once the handshake has been
    /// validated. `new_stream_sender` is the channel through which streams can be forwarded to
    /// the node at the other end of this control stream; the routing table associates it with
    /// every peer that node reports as reachable. `quality` is the quality of the direct link and
    /// is combined with each reported path quality.
    ///
    /// The returned future polls until the control stream hangs up or a protocol error occurs,
    /// and resolves to that error.
    fn handle_control_stream(
        &self,
        control_reader: oneshot::Receiver<stream::Reader>,
        new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
        quality: Quality,
    ) -> impl Future<Output = Result<()>> + Send {
        let peer_map = Arc::clone(&self.peers);
        let link_sender = new_stream_sender;
        let local_id = self.node_id.clone();
        let mut peer_notifier = self.new_peer_sender.clone();

        async move {
            let control_reader = control_reader.await.map_err(|_| {
                Error::ConnectionClosed(Some(
                    "Reader never given to control stream handler".to_string(),
                ))
            })?;

            loop {
                let update = control_reader.read_protocol_message::<NodeState>().await?;

                match update {
                    NodeState::Offline(peer) => {
                        // The peer is no longer reachable over this link; drop only our route.
                        let mut guard = peer_map.lock().unwrap();
                        if let Some(routes) = guard.peers().get_mut(&peer) {
                            routes.retain(|route| !route.0.same_receiver(&link_sender));
                        }
                    }
                    NodeState::Online(peer, path_quality) => {
                        // Routes back to ourselves are of no use.
                        if peer == local_id {
                            continue;
                        }

                        let route_quality = path_quality.combine(quality);
                        let peer_name = peer.to_string();

                        // The lock is released before the notification below is awaited.
                        let newly_reachable = {
                            let mut guard = peer_map.lock().unwrap();
                            let routes = guard.peers().entry(peer).or_default();
                            let was_unreachable = routes.is_empty();

                            // Replace any previous route over this link, then restore ordering.
                            routes.retain(|route| !route.0.same_receiver(&link_sender));
                            routes.push((link_sender.clone(), route_quality));
                            routes.sort_by_key(|route| route.1);

                            was_unreachable
                        };

                        if newly_reachable {
                            let _ = peer_notifier.send(peer_name).await;
                        }
                    }
                }
            }
        }
    }

    /// Services streams that the linked node opens to or through this node.
    ///
    /// `new_stream_receiver_receiver` eventually yields the stream of incoming streams. Each item
    /// carries a reader, a writer and a `Result` sender used to report whether the stream was
    /// handled successfully. `new_stream_sender` lets us connect back to the linked node; if an
    /// incoming stream names a source that has no route over this link yet, one is added.
    ///
    /// For every incoming stream the destination node ID is read from its header. Streams
    /// addressed to another node are forwarded; streams addressed to this node are delivered to
    /// the user together with the source node ID.
    ///
    /// The returned future polls until the back end hangs up the other end of the receiver.
    fn handle_new_streams(
        &self,
        new_stream_receiver_receiver: oneshot::Receiver<
            impl futures::Stream<
                    Item = (stream::Reader, stream::Writer, oneshot::Sender<Result<()>>),
                > + Unpin,
        >,
        new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
    ) -> impl Future<Output = ()> {
        let peer_map = Arc::clone(&self.peers);
        let mut stream_dispatcher = self.incoming_stream_sender.clone();
        let mut peer_notifier = self.new_peer_sender.clone();
        let local_id = self.node_id.clone();

        async move {
            let Ok(mut incoming) = new_stream_receiver_receiver.await else {
                return;
            };

            while let Some((reader, writer, result_sender)) = incoming.next().await {
                let outcome: Result<()> = async {
                    let forward_to = reader
                        .read(EncodableString::MIN_SIZE, |buf| {
                            EncodableString::try_from_bytes(buf).map(|(dest, size)| {
                                if dest != local_id {
                                    // Bound for another node: report a consumed size of zero so
                                    // the destination string stays in the stream and the next
                                    // hop can read it again.
                                    (Some(dest), 0)
                                } else {
                                    // Bound for us: consume the destination string, we have no
                                    // further use for it.
                                    (None, size)
                                }
                            })
                        })
                        .await?;

                    match forward_to {
                        Some(dest) => {
                            connect_to_peer(Arc::clone(&peer_map), reader, writer, &dest).await
                        }
                        None => {
                            let src = reader.read_protocol_message::<EncodableString>().await?;

                            // Make sure the source is routable over this link. The lock is
                            // released before any of the sends below are awaited.
                            let route_added = {
                                let mut guard = peer_map.lock().unwrap();
                                let routes = guard.peers.entry(src.clone()).or_default();
                                let already_routed = routes
                                    .iter()
                                    .any(|route| route.0.same_receiver(&new_stream_sender));

                                if !already_routed {
                                    routes.push((new_stream_sender.clone(), Quality::UNKNOWN));
                                    guard.increment_generation();
                                }

                                !already_routed
                            };

                            if route_added {
                                let _ = peer_notifier.send(src.to_string()).await;
                            }

                            stream_dispatcher.send((reader, writer, src.to_string())).await.map_err(
                                |_| {
                                    Error::ConnectionClosed(Some(
                                        "Incoming stream dispatcher disappeared".to_owned(),
                                    ))
                                },
                            )
                        }
                    }
                }
                .await;

                let _ = result_sender.send(outcome);
            }
        }
    }

    /// Get the node ID of this node.
    ///
    /// This is the identifier that was supplied when the node was created, borrowed as a string
    /// slice.
    pub fn node_id(&self) -> &str {
        self.node_id.as_str()
    }
}

/// Given the reader and writer for an incoming connection, forward that
/// connection to another node.
///
/// `peers` is the shared routing table, `peer_reader`/`peer_writer` are the two
/// halves of the stream being forwarded, and `node_id` names the destination.
/// The pair is sent to the first route in the destination's route list that
/// accepts it. Routes whose channel turns out to be closed are removed from the
/// table along the way, and the routing generation is bumped if any were
/// removed.
///
/// # Errors
///
/// Returns `Error::NoSuchPeer` if the destination is not in the routing table
/// or none of its routes accepted the stream.
async fn connect_to_peer(
    peers: Arc<Mutex<PeerMap>>,
    peer_reader: stream::Reader,
    peer_writer: stream::Writer,
    node_id: &EncodableString,
) -> Result<()> {
    // `peer_channels` holds the stream until some route accepts it; it being
    // `None` after the poll loop is how success is detected.
    let mut peer_channels = Some((peer_reader, peer_writer));
    // The sender we are currently parked on, if the preferred route was full.
    let mut peer_sender: Option<Sender<(stream::Reader, stream::Writer)>> = None;

    poll_fn(|ctx| {
        // If we found a place to send the new stream during the last poll, but
        // it gave us pending, we have to keep that sender's clone around so our
        // waker is remembered and we get woken up. We could just drop it here
        // and repeat the whole discovery again, which would cost compute but
        // make us more responsive to changes in the network topology by a
        // smidge. Instead we'll try the same link again, and keep trying until
        // it succeeds or breaks.
        if let Some(sender) = &mut peer_sender {
            match sender.poll_ready(ctx) {
                Poll::Ready(Ok(())) => {
                    // NOTE(review): this relies on start_send never failing
                    // directly after poll_ready reported readiness — confirm
                    // that holds if the receiver is dropped concurrently.
                    sender
                        .start_send(peer_channels.take().unwrap())
                        .expect("Should be guaranteed to succeed!");
                    return Poll::Ready(());
                }
                // The link we were waiting on broke; fall through and search
                // the routing table again.
                Poll::Ready(Err(_)) => peer_sender = None,
                Poll::Pending => {
                    return Poll::Pending;
                }
            }
        }

        // The lock is only held for the remainder of this poll; nothing below
        // awaits.
        let mut peers = peers.lock().unwrap();

        // For each peer we have a list of channels to which we can send our
        // reader and writer, each representing a connection which will become
        // the next link in the circuit. The list is sorted by connection
        // quality, getting worse toward the end of the list, so we want to send
        // our reader and writer to the first one we can.
        //
        // The table is accessed through the field rather than `PeerMap::peers`
        // so that merely looking up a route does not bump the generation.
        let Some(peer_list) = peers.peers.get_mut(node_id) else {
            // Unknown destination: finish with `peer_channels` still populated,
            // which is reported as `NoSuchPeer` below.
            return Poll::Ready(());
        };

        let mut changed = false;

        // Go through each potential connection and send to the first one which
        // will handle the connection. We may discover the first few we try have
        // hung up and gone away, so we'll delete those from the list and try
        // the next one.
        //
        // If the channel used to send to the fastest available connection is
        // full, we pause and retry when it is ready. We *could* continue down
        // the list to find another connection, but we don't; we assume waiting
        // for a faster connection to be serviceable locally nets better
        // performance in the long run than sending on a slower connection that
        // can be serviced right away.
        peer_list.retain_mut(|x| {
            // Only probe routes while the stream is still unsent and we have
            // not parked on a full route; everything after that is kept as is.
            if peer_sender.is_none() && peer_channels.is_some() {
                // Poll a clone so that, if the route is full, the clone that
                // registered our waker can be kept in `peer_sender`.
                let mut sender = x.0.clone();
                match sender.poll_ready(ctx) {
                    Poll::Ready(Ok(())) => {
                        sender
                            .start_send(peer_channels.take().unwrap())
                            .expect("Should be guaranteed to succeed!");
                        true
                    }
                    Poll::Ready(Err(_)) => {
                        // Dead route: drop it from the table.
                        changed = true;
                        false
                    }
                    Poll::Pending => {
                        peer_sender = Some(sender);
                        true
                    }
                }
            } else {
                true
            }
        });

        // If this is true, we cleared out some stale connections from the
        // routing table. Send a routing update to update our neighbors about
        // how this might affect connectivity.
        if changed {
            peers.increment_generation();
        }

        // The peer sender is where we register our waker. If we don't have one
        // we didn't register a waker and should return now.
        if peer_sender.is_none() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    })
    .await;

    // Our iteration above should have taken channels and sent them along to the
    // connection that will handle them. If they're still here we didn't find a
    // channel.
    if peer_channels.is_none() {
        Ok(())
    } else {
        Err(Error::NoSuchPeer(node_id.to_string()))
    }
}

/// Computes the difference between two condensed routing tables and
/// serializes it as a sequence of `NodeState` messages.
///
/// The output first contains a `NodeState::Online` message for every node in
/// `new_routes` that is either absent from `old_routes` or has a different
/// quality there, followed by a `NodeState::Offline` message for every node
/// that is only present in `old_routes`. Identical tables produce an empty
/// buffer.
fn route_updates(
    old_routes: &HashMap<EncodableString, Quality>,
    new_routes: &HashMap<EncodableString, Quality>,
) -> Vec<u8> {
    let mut encoded = Vec::new();

    let added_or_changed =
        new_routes.iter().filter(|&(node, quality)| old_routes.get(node) != Some(quality));
    for (node, &quality) in added_or_changed {
        NodeState::Online(node.clone(), quality).write_bytes_vec(&mut encoded);
    }

    let withdrawn = old_routes.keys().filter(|&node| !new_routes.contains_key(node));
    for node in withdrawn {
        NodeState::Offline(node.clone()).write_bytes_vec(&mut encoded);
    }

    encoded
}

/// Router process. Pushes routing updates to every control channel whenever the peer map changes.
///
/// `peers` is a weak handle to the node's peer map; the process ends as soon as the map has been
/// dropped. `interval` rate-limits the updates: after each round of updates the process sleeps for
/// `interval` before looking at the peer map again, so there is always at least that much time
/// between two rounds.
async fn router(peers: Weak<Mutex<PeerMap>>, interval: Duration) {
    // Last peer map generation that has been pushed out to the control channels.
    let mut synced_generation = 0;

    loop {
        // All work on the peer map happens inside this block so that the lock is released before
        // anything is awaited. The block yields a receiver to wait on when there was nothing to do.
        let wake_receiver = {
            let Some(strong_peers) = peers.upgrade() else {
                return;
            };
            let mut peer_map = strong_peers.lock().unwrap();

            if peer_map.generation > synced_generation {
                let latest_routes = peer_map.condense_routes();

                // Send each channel the difference between what it was last told and the current
                // routes. Channels that can no longer be written to are dropped.
                peer_map.control_channels.retain_mut(|(writer, sent_routes)| {
                    let payload = route_updates(sent_routes, &latest_routes);
                    let delivered = writer
                        .write(payload.len(), |buf| {
                            buf[..payload.len()].copy_from_slice(&payload);
                            Ok(payload.len())
                        })
                        .is_ok();

                    if delivered {
                        *sent_routes = latest_routes.clone();
                    }
                    delivered
                });

                synced_generation = peer_map.generation;
                None
            } else {
                // Up to date: leave a wakeup sender behind so the next modification wakes us.
                let (sender, receiver) = oneshot::channel();
                peer_map.wakeup = Some(sender);
                Some(receiver)
            }
        };

        match wake_receiver {
            Some(receiver) => {
                let _ = receiver.await;
            }
            None => Timer::new(interval).await,
        }
    }
}