circuit/
lib.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async::Timer;
6use fuchsia_sync::Mutex;
7use futures::channel::mpsc::Sender;
8use futures::channel::oneshot;
9use futures::future::{Either, poll_fn};
10use futures::stream::StreamExt as _;
11use futures::{FutureExt, SinkExt as _};
12use std::collections::HashMap;
13use std::future::Future;
14use std::pin::pin;
15use std::sync::{Arc, Weak};
16use std::task::Poll;
17use std::time::Duration;
18
19pub const CIRCUIT_VERSION: u8 = 0;
20
21mod error;
22mod protocol;
23#[cfg(test)]
24mod test;
25
26mod connection;
27pub mod multi_stream;
28pub mod stream;
29
30use protocol::{EncodableString, Identify, NodeState};
31
32pub use connection::{Connection, ConnectionNode};
33pub use error::{Error, Result};
34pub use protocol::Quality;
35
36use crate::protocol::ProtocolMessage;
37
38/// A list of other nodes we can see on the mesh. For each such node we retain a vector of senders
39/// which allow us to establish new streams with that node. Each sender in the vector corresponds to
40/// a different path we could take to the desired peer. For each sender we also store the sum of all
41/// quality values for hops to that peer (see `header::NodeState::Online`).
42struct PeerMap {
43    /// The actual map of peers itself.
44    peers: HashMap<EncodableString, Vec<(Sender<(stream::Reader, stream::Writer)>, Quality)>>,
45    /// This value increments once every time the peer map changes. Consequently, we can track
46    /// changes in this number to determine when a routing refresh is necessary.
47    generation: usize,
48    /// This allows the router task to wait for this structure to update. Put simply, any time we
49    /// increment `generation`, we should remove this sender from the struct and fire it if it is
50    /// present.
51    wakeup: Option<oneshot::Sender<()>>,
52    /// Control channels for various peer nodes.
53    ///
54    /// Every time we directly connect to a new node, we start a stream called the control stream,
55    /// which we use to send handshake messages and routing updates. Once we've written the
56    /// handshake message, the write side of that control stream ends up here, and this is where we
57    /// send routing updates.
58    ///
59    /// If we have a routing task, then any time the peer map updates, we will send updated node
60    /// states to each channel contained in this `Vec`. The `HashMap` contains the state of the peer
61    /// map the last time we sent info and is used to determine what updates need to be sent.
62    ///
63    /// If we don't have a routing task, this should be empty. In that case we're a "leaf node" and
64    /// we don't send any routing information to peers at all.
65    control_channels: Vec<(stream::Writer, HashMap<EncodableString, Quality>)>,
66}
67
68impl PeerMap {
69    /// Create a new peer map.
70    fn new() -> Self {
71        PeerMap {
72            peers: HashMap::new(),
73
74            // Start with generation 1 so the routing task can start with generation 0. This means
75            // the routing task is out of sync as soon as it starts, so it will immediately trigger
76            // an update.
77            generation: 1,
78
79            wakeup: None,
80            control_channels: Vec::new(),
81        }
82    }
83
84    /// Increment the generation field and fire the wakeup sender if present.
85    ///
86    /// In short: this signals to the routing task that the peer map has been modified, so that it
87    /// can update neighboring nodes with new routing information.
88    fn increment_generation(&mut self) {
89        self.generation += 1;
90        self.wakeup.take().map(|x| {
91            let _ = x.send(());
92        });
93    }
94
95    /// Get the list of peers mutably, and signal the routing task that we have modified it.
96    fn peers(
97        &mut self,
98    ) -> &mut HashMap<EncodableString, Vec<(Sender<(stream::Reader, stream::Writer)>, Quality)>>
99    {
100        self.increment_generation();
101        &mut self.peers
102    }
103
104    /// Reduces our peer map to a simpler form which contains only the name of each node we can see
105    /// and the quality rating of the fastest connection we have to that node.
106    fn condense_routes(&self) -> HashMap<EncodableString, Quality> {
107        let mut ret = HashMap::new();
108
109        for (key, value) in &self.peers {
110            if value.is_empty() {
111                continue;
112            }
113
114            ret.insert(key.clone(), value[0].1);
115        }
116
117        ret
118    }
119
120    /// Adds a control channel to `control_channels` and sends an initial route update for that
121    /// channel.
122    fn add_control_channel(&mut self, channel: stream::Writer, quality: Quality) -> Result<()> {
123        let routes = self.condense_routes();
124
125        for (node, &route_quality) in &routes {
126            let quality = quality.combine(route_quality);
127            let state = NodeState::Online(node.clone(), quality);
128
129            channel.write_protocol_message(&state)?;
130        }
131
132        self.control_channels.push((channel, routes));
133
134        Ok(())
135    }
136}
137
138/// Represents a node on the circuit network.
139///
140/// A node can connect to one or more other nodes, and a stream can be established between any two
141/// nodes on the entire connected graph of nodes.
142pub struct Node {
143    /// A unique identifying string for this node.
144    node_id: EncodableString,
145    /// Indicates what protocol we intend to run atop the streams we established, e.g.
146    /// "Overnet-with-FIDL-ABI-1"
147    protocol: EncodableString,
148    /// List of other nodes we can "see" on the network.
149    peers: Arc<Mutex<PeerMap>>,
150    /// If true, we will inform nodes we are connected to of what other nodes we can see, so that
151    /// they might try to forward traffic through us. This also indicates that there is a "router
152    /// process" running for this node which handles said forwarding.
153    has_router: bool,
154
155    /// If another node establishes a connection to this node, we will notify the user by way of
156    /// this sender.
157    incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,
158
159    /// If a new peer becomes available we will send its name through this sender to notify the user.
160    new_peer_sender: Sender<String>,
161}
162
163impl Node {
164    /// Establish a new node.
165    ///
166    /// Any time a new peer becomes visible to this node, the peer's node ID will be sent to
167    /// `new_peer_sender`.
168    ///
169    /// Any time a peer wants to establish a stream with this node, a reader and writer for the new
170    /// stream as well as the peer's node ID will be sent to `incoming_stream_sender`.
171    pub fn new(
172        node_id: &str,
173        protocol: &str,
174        new_peer_sender: Sender<String>,
175        incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,
176    ) -> Result<Node> {
177        let node_id = node_id.to_owned().try_into()?;
178        let protocol = protocol.to_owned().try_into()?;
179
180        Ok(Node {
181            node_id,
182            protocol,
183            new_peer_sender,
184            incoming_stream_sender,
185            peers: Arc::new(Mutex::new(PeerMap::new())),
186            has_router: false,
187        })
188    }
189
190    /// Establish a new node which will forward streams between its peers. The router process
191    /// provided must be polled continuously to provide this forwarding.
192    pub fn new_with_router(
193        node_id: &str,
194        protocol: &str,
195        interval: Duration,
196        new_peer_sender: Sender<String>,
197        incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,
198    ) -> Result<(Node, impl Future<Output = ()> + Send + use<>)> {
199        let mut node = Self::new(node_id, protocol, new_peer_sender, incoming_stream_sender)?;
200        node.has_router = true;
201
202        let weak_peers = Arc::downgrade(&node.peers);
203
204        Ok((node, router(weak_peers, interval)))
205    }
206
207    /// Establish a stream with another node. Data will be sent to the peer with
208    /// `connection_writer`, and received with `connection_reader`.
209    pub async fn connect_to_peer(
210        &self,
211        connection_reader: stream::Reader,
212        connection_writer: stream::Writer,
213        node_id: &str,
214    ) -> Result<()> {
215        if self.node_id == node_id {
216            self.incoming_stream_sender
217                .clone()
218                .send((connection_reader, connection_writer, node_id.to_owned()))
219                .await
220                .map_err(|_| Error::NoSuchPeer(node_id.to_owned()))?;
221        } else {
222            let node_id: EncodableString = node_id.to_owned().try_into()?;
223            // Write the destination node ID and the source node ID (us). Keep in mind because of
224            // the semantics of push_back_protocol_message, these will come off the wire in the
225            // *reverse* of the order we're putting them on here.
226            connection_reader.push_back_protocol_message(&self.node_id)?;
227            connection_reader.push_back_protocol_message(&node_id)?;
228            connect_to_peer(
229                Arc::clone(&self.peers),
230                connection_reader,
231                connection_writer,
232                &node_id,
233            )
234            .await?;
235        }
236
237        Ok(())
238    }
239
240    /// Test function to copy all routes toward one node as routes to another node.
241    #[cfg(test)]
242    pub fn route_via(&self, to: &str, via: &str) {
243        let to = EncodableString::try_from(to.to_owned()).unwrap();
244        let via = EncodableString::try_from(via.to_owned()).unwrap();
245        let mut peers = self.peers.lock();
246        let new_list = peers.peers.get(&via).unwrap().clone();
247        peers.peers.insert(to, new_list);
248    }
249
250    /// Connect to another node.
251    ///
252    /// This establishes the internal state to link this node directly to another one, there by
253    /// joining it to the circuit network. To actually perform the networking necessary to create
254    /// such a link, a back end will have to service the streams given to this function via
255    /// its arguments. To keep the link running, the returned future must also be polled to
256    /// completion. Depending on configuration, it may complete swiftly or may poll for the entire
257    /// lifetime of the link.
258    ///
259    /// When we link to another node, we immediately start a "control stream" that performs a
260    /// handshake and sends routing messages. The reader and writer passed in through
261    /// `control_stream` will be used to service this stream. If `control_stream` is `None` the
262    /// first stream emitted from `new_stream_receiver` will be used.
263    ///
264    /// When the local node needs to create a new stream to the linked node, it will send a reader
265    /// and writer to `new_stream_sender`.
266    ///
267    /// When the linked node wants to create a new stream to this node, the back end may send a
268    /// reader and writer through `new_stream_receiver`, as well as a `oneshot::Sender` which will
269    /// be used to report if the link is established successfully or if an error occurs.
270    ///
271    /// The returned future will continue to poll for the lifetime of the link and return the error
272    /// that terminated it.
273    pub fn link_node<F>(
274        &self,
275        control_stream: Option<(stream::Reader, stream::Writer)>,
276        new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
277        mut new_stream_receiver: F,
278        quality: Quality,
279    ) -> impl Future<Output = Result<()>> + Send + use<F>
280    where
281        F: futures::Stream<Item = (stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>
282            + Unpin
283            + Send,
284    {
285        let has_router = self.has_router;
286        let peers = Arc::clone(&self.peers);
287        let node_id = self.node_id.clone();
288        let protocol = self.protocol.clone();
289        let (new_stream_receiver_sender, new_stream_receiver_receiver) = oneshot::channel();
290        let (control_reader_sender, control_reader_receiver) = oneshot::channel();
291        let new_streams_loop =
292            self.handle_new_streams(new_stream_receiver_receiver, new_stream_sender.clone());
293        let control_stream_loop =
294            self.handle_control_stream(control_reader_receiver, new_stream_sender.clone(), quality);
295
296        // Runs all necessary background processing for a connection. Tasks include:
297        // 1) Fetch a control stream from the new_stream_receiver if we don't have one from
298        //    control_stream already.
299        // 2) Send a handshake message to the other node.
300        // 3) Wait for and validate the handshake from the peer.
301        // 4) Receive updated routing information from the peer and update the peer map accordingly.
302        // 5) When the peer tries to open a new stream to us, either forward that stream to another
303        //    node or, if it's destined for us directly, consume the initial handshake from it and send
304        //    it out to be processed by the user.
305        async move {
306            let (control_reader, control_writer) = if let Some(control_stream) = control_stream {
307                control_stream
308            } else {
309                let (reader, writer, error_sender) =
310                    new_stream_receiver.next().await.ok_or_else(|| {
311                        Error::ConnectionClosed(Some(
312                            "Client stopped listening for new streams".to_owned(),
313                        ))
314                    })?;
315                let _ = error_sender.send(Ok(()));
316                (reader, writer)
317            };
318
319            let _ = new_stream_receiver_sender.send(new_stream_receiver);
320
321            let header = Identify::new(protocol.clone());
322            control_writer.write_protocol_message(&header)?;
323
324            let state = NodeState::Online(node_id.clone(), Quality::SELF);
325            control_writer.write_protocol_message(&state)?;
326
327            if has_router {
328                peers
329                    .lock()
330                    .add_control_channel(control_writer, quality)
331                    .expect("We just created this channel!");
332            } else {
333                // No router means no further routing messages. Just let 'er go.
334                std::mem::drop(control_writer);
335            }
336
337            // Start by reading and validating a handshake message from the stream.
338            let header = control_reader.read_protocol_message::<Identify>().await?;
339
340            if header.circuit_version != CIRCUIT_VERSION {
341                return Err(Error::VersionMismatch);
342            } else if header.protocol != protocol {
343                return Err(Error::ProtocolMismatch);
344            }
345
346            control_reader_sender.send(control_reader).map_err(|_| {
347                Error::ConnectionClosed(Some("Control stream handler disappeared".to_owned()))
348            })?;
349
350            let control_stream_loop = pin!(control_stream_loop);
351            let new_streams_loop = pin!(new_streams_loop);
352
353            let ret = match futures::future::select(control_stream_loop, new_streams_loop).await {
354                Either::Left((result, new_streams)) => {
355                    if matches!(result, Ok(()) | Err(Error::ConnectionClosed(_))) {
356                        new_streams.await;
357                    }
358                    result
359                }
360                Either::Right(((), read_control)) => read_control.now_or_never().unwrap_or(Ok(())),
361            };
362
363            {
364                let mut peers = peers.lock();
365                let peers = peers.peers();
366                for peer_list in peers.values_mut() {
367                    peer_list.retain(|x| !x.0.same_receiver(&new_stream_sender));
368                }
369            }
370
371            ret
372        }
373    }
374
375    /// Handles messages we receive on a control stream, i.e. routing updates. `new_stream_sender`
376    /// is a channel by which streams destined for another peer can be forwarded to the node on the
377    /// other end of this control stream. The routing table will associate that sender with any
378    /// peers that can be reached via that node.
379    ///
380    /// The returned future will poll until the control stream hangs up or a protocol error occurs.
381    fn handle_control_stream(
382        &self,
383        control_reader: oneshot::Receiver<stream::Reader>,
384        new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
385        quality: Quality,
386    ) -> impl Future<Output = Result<()>> + Send + use<> {
387        let peers = Arc::clone(&self.peers);
388        let new_stream_sender = new_stream_sender;
389        let node_id = self.node_id.clone();
390        let mut new_peer_sender = self.new_peer_sender.clone();
391
392        async move {
393            let control_reader = control_reader.await.map_err(|_| {
394                Error::ConnectionClosed(Some(
395                    "Reader never given to control stream handler".to_string(),
396                ))
397            })?;
398            loop {
399                let state = control_reader.read_protocol_message::<NodeState>().await?;
400                match state {
401                    NodeState::Online(peer, path_quality) => {
402                        if peer == node_id {
403                            continue;
404                        }
405
406                        let quality = path_quality.combine(quality);
407                        let peer_string = peer.to_string();
408                        let should_send = {
409                            let mut peers = peers.lock();
410                            let peers = peers.peers();
411                            let peer_list = peers.entry(peer).or_insert_with(Vec::new);
412                            let should_send = peer_list.is_empty();
413                            peer_list.retain(|x| !x.0.same_receiver(&new_stream_sender));
414                            peer_list.push((new_stream_sender.clone(), quality));
415                            peer_list.sort_by_key(|x| x.1);
416                            should_send
417                        };
418                        if should_send {
419                            let _ = new_peer_sender.send(peer_string).await;
420                        }
421                    }
422                    NodeState::Offline(peer) => {
423                        let mut peers = peers.lock();
424                        let peers = peers.peers();
425                        let peer_list = peers.get_mut(&peer);
426
427                        if let Some(peer_list) = peer_list {
428                            peer_list.retain(|x| !x.0.same_receiver(&new_stream_sender));
429                        }
430                    }
431                }
432            }
433        }
434    }
435
436    /// Handles requests for new streams. The `new_stream_receiver` provides a reader and writer for
437    /// every stream that gets established to or through this node, as well as a `Result` sender so
438    /// we can indicate if we have any trouble handling this stream. The `new_stream_sender` allows
439    /// us to connect back to the connecting node. If we don't have routing info for the incoming
440    /// node we will establish a new route via that stream when the connection arrives.
441    ///
442    /// For each incoming stream, we read a bit of protocol header out of it and either accept it or
443    /// forward it to another peer.
444    ///
445    /// The returned future will poll until the back end hangs up the other end of the receiver.
446    fn handle_new_streams<F>(
447        &self,
448        new_stream_receiver_receiver: oneshot::Receiver<F>,
449        new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
450    ) -> impl Future<Output = ()> + use<F>
451    where
452        F: futures::Stream<Item = (stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>
453            + Unpin,
454    {
455        let peers = Arc::clone(&self.peers);
456        let mut incoming_stream_sender = self.incoming_stream_sender.clone();
457        let mut new_peer_sender = self.new_peer_sender.clone();
458        let node_id = self.node_id.clone();
459
460        async move {
461            let mut new_stream_receiver = if let Ok(x) = new_stream_receiver_receiver.await {
462                x
463            } else {
464                return;
465            };
466
467            while let Some((reader, writer, result_sender)) = new_stream_receiver.next().await {
468                let _ = result_sender.send(
469                    async {
470                        let dest = reader
471                            .read(EncodableString::MIN_SIZE, |buf| {
472                                EncodableString::try_from_bytes(buf).map(|(dest, size)| {
473                                    if dest == node_id {
474                                        // If the destination is this node, discard the string
475                                        // itself (we know where we are, so we don't need it)
476                                        // and return as usual.
477                                        (None, size)
478                                    } else {
479                                        // If the destination node is another node, return a
480                                        // size of zero, which tells the reader to leave the
481                                        // destination string in the stream. When we forward the
482                                        // stream, the node we forward it to will be able to
483                                        // read the destination again.
484                                        (Some(dest), 0)
485                                    }
486                                })
487                            })
488                            .await?;
489
490                        if let Some(dest) = dest {
491                            connect_to_peer(Arc::clone(&peers), reader, writer, &dest).await?;
492                        } else {
493                            let src = reader.read_protocol_message::<EncodableString>().await?;
494                            let send_new_peer = {
495                                let mut peers = peers.lock();
496                                let peer_list =
497                                    peers.peers.entry(src.clone()).or_insert_with(Vec::new);
498                                if !peer_list.iter().any(|x| x.0.same_receiver(&new_stream_sender))
499                                {
500                                    peer_list.push((new_stream_sender.clone(), Quality::UNKNOWN));
501                                    peers.increment_generation();
502                                    true
503                                } else {
504                                    false
505                                }
506                            };
507
508                            if send_new_peer {
509                                let _ = new_peer_sender.send(src.to_string()).await;
510                            }
511
512                            incoming_stream_sender
513                                .send((reader, writer, src.to_string()))
514                                .await
515                                .map_err(|_| {
516                                    Error::ConnectionClosed(Some(
517                                        "Incoming stream dispatcher disappeared".to_owned(),
518                                    ))
519                                })?;
520                        }
521                        Ok(())
522                    }
523                    .await,
524                );
525            }
526        }
527    }
528
529    /// Get the node ID of this node.
530    pub fn node_id(&self) -> &str {
531        self.node_id.as_str()
532    }
533}
534
535/// Given the reader and writer for an incoming connection, forward that
536/// connection to another node.
537async fn connect_to_peer(
538    peers: Arc<Mutex<PeerMap>>,
539    peer_reader: stream::Reader,
540    peer_writer: stream::Writer,
541    node_id: &EncodableString,
542) -> Result<()> {
543    let mut peer_channels = Some((peer_reader, peer_writer));
544    let mut peer_sender: Option<Sender<(stream::Reader, stream::Writer)>> = None;
545
546    poll_fn(|ctx| {
547        // If we found a place to send the new stream during the last poll, but
548        // it gave us pending, we have to keep that sender's clone around so our
549        // waker is remembered and we get woken up. We could just drop it here
550        // and repeat the whole discovery again, which would cost compute but
551        // make us more responsive to changes in the network topology by a
552        // smidge. Instead we'll try the same link again, and keep trying until
553        // it succeeds or breaks.
554        if let Some(sender) = &mut peer_sender {
555            match sender.poll_ready(ctx) {
556                Poll::Ready(Ok(())) => {
557                    sender
558                        .start_send(peer_channels.take().unwrap())
559                        .expect("Should be guaranteed to succeed!");
560                    return Poll::Ready(());
561                }
562                Poll::Ready(Err(_)) => peer_sender = None,
563                Poll::Pending => {
564                    return Poll::Pending;
565                }
566            }
567        }
568
569        let mut peers = peers.lock();
570
571        // For each peer we have a list of channels to which we can send our
572        // reader and writer, each representing a connection which will become
573        // the next link in the circuit. The list is sorted by connection
574        // quality, getting worse toward the end of the list, so we want to send
575        // our reader and writer to the first one we can.
576        let Some(peer_list) = peers.peers.get_mut(node_id) else {
577            return Poll::Ready(());
578        };
579
580        let mut changed = false;
581
582        // Go through each potential connection and send to the first one which
583        // will handle the connection. We may discover the first few we try have
584        // hung up and gone away, so we'll delete those from the list and try
585        // the next one.
586        //
587        // If the channel used to send to the fastest available connection is
588        // full, we pause and retry when it is ready. We *could* continue down
589        // the list to find another connection, but we don't; we assume waiting
590        // for a faster connection to be serviceable locally nets better
591        // performance in the long run than sending on a slower connection that
592        // can be serviced right away.
593        peer_list.retain_mut(|x| {
594            if peer_sender.is_none() && peer_channels.is_some() {
595                let mut sender = x.0.clone();
596                match sender.poll_ready(ctx) {
597                    Poll::Ready(Ok(())) => {
598                        sender
599                            .start_send(peer_channels.take().unwrap())
600                            .expect("Should be guaranteed to succeed!");
601                        true
602                    }
603                    Poll::Ready(Err(_)) => {
604                        changed = true;
605                        false
606                    }
607                    Poll::Pending => {
608                        peer_sender = Some(sender);
609                        true
610                    }
611                }
612            } else {
613                true
614            }
615        });
616
617        // If this is true, we cleared out some stale connections from the
618        // routing table. Send a routing update to update our neighbors about
619        // how this might affect connectivity.
620        if changed {
621            peers.increment_generation();
622        }
623
624        // The peer sender is where we register our waker. If we don't have one
625        // we didn't register a waker and should return now.
626        if peer_sender.is_none() { Poll::Ready(()) } else { Poll::Pending }
627    })
628    .await;
629
630    // Our iteration above should have taken channels and sent them along to the
631    // connection that will handle them. If they're still here we didn't find a
632    // channel.
633    if peer_channels.is_none() { Ok(()) } else { Err(Error::NoSuchPeer(node_id.to_string())) }
634}
635
636/// Given an old and a new condensed routing table, create a serialized list of
637/// `NodeState`s which will update a node on what has changed between them.
638fn route_updates(
639    old_routes: &HashMap<EncodableString, Quality>,
640    new_routes: &HashMap<EncodableString, Quality>,
641) -> Vec<u8> {
642    let mut ret = Vec::new();
643
644    for (node, &quality) in new_routes {
645        if let Some(&old_quality) = old_routes.get(node) {
646            if old_quality == quality {
647                continue;
648            }
649        }
650
651        NodeState::Online(node.clone(), quality).write_bytes_vec(&mut ret);
652    }
653
654    for old_node in old_routes.keys() {
655        if !new_routes.contains_key(old_node) {
656            NodeState::Offline(old_node.clone()).write_bytes_vec(&mut ret);
657        }
658    }
659
660    ret
661}
662
663/// Router process. Notifies each peer every time the router table is updated. The given `interval`
664/// allows these updates to be rate-limited; there will always be at least `interval` time between
665/// updates.
666async fn router(peers: Weak<Mutex<PeerMap>>, interval: Duration) {
667    let mut generation = 0;
668
669    loop {
670        let mut wake_receiver = None;
671        {
672            let peers = if let Some(peers) = peers.upgrade() {
673                peers
674            } else {
675                return;
676            };
677            let mut peers = peers.lock();
678
679            if peers.generation <= generation {
680                let (sender, receiver) = oneshot::channel();
681                peers.wakeup = Some(sender);
682                wake_receiver = Some(receiver);
683            } else {
684                let new_routes = peers.condense_routes();
685
686                peers.control_channels.retain_mut(|(sender, routes)| {
687                    let msgs = route_updates(routes, &new_routes);
688
689                    if sender
690                        .write(msgs.len(), |buf| {
691                            buf[..msgs.len()].copy_from_slice(&msgs);
692                            Ok(msgs.len())
693                        })
694                        .is_ok()
695                    {
696                        *routes = new_routes.clone();
697                        true
698                    } else {
699                        false
700                    }
701                });
702
703                generation = peers.generation;
704            }
705        }
706
707        if let Some(receiver) = wake_receiver {
708            let _ = receiver.await;
709        } else {
710            Timer::new(interval).await
711        }
712    }
713}