circuit/
lib.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async::Timer;
6use futures::channel::mpsc::Sender;
7use futures::channel::oneshot;
8use futures::future::{poll_fn, Either};
9use futures::stream::StreamExt as _;
10use futures::{FutureExt, SinkExt as _};
11use std::collections::HashMap;
12use std::future::Future;
13use std::pin::pin;
14use std::sync::{Arc, Mutex, Weak};
15use std::task::Poll;
16use std::time::Duration;
17
18pub const CIRCUIT_VERSION: u8 = 0;
19
20mod error;
21mod protocol;
22#[cfg(test)]
23mod test;
24
25mod connection;
26pub mod multi_stream;
27pub mod stream;
28
29use protocol::{EncodableString, Identify, NodeState};
30
31pub use connection::{Connection, ConnectionNode};
32pub use error::{Error, Result};
33pub use protocol::Quality;
34
35use crate::protocol::ProtocolMessage;
36
37/// A list of other nodes we can see on the mesh. For each such node we retain a vector of senders
38/// which allow us to establish new streams with that node. Each sender in the vector corresponds to
39/// a different path we could take to the desired peer. For each sender we also store the sum of all
40/// quality values for hops to that peer (see `header::NodeState::Online`).
41struct PeerMap {
42    /// The actual map of peers itself.
43    peers: HashMap<EncodableString, Vec<(Sender<(stream::Reader, stream::Writer)>, Quality)>>,
44    /// This value increments once every time the peer map changes. Consequently, we can track
45    /// changes in this number to determine when a routing refresh is necessary.
46    generation: usize,
47    /// This allows the router task to wait for this structure to update. Put simply, any time we
48    /// increment `generation`, we should remove this sender from the struct and fire it if it is
49    /// present.
50    wakeup: Option<oneshot::Sender<()>>,
51    /// Control channels for various peer nodes.
52    ///
53    /// Every time we directly connect to a new node, we start a stream called the control stream,
54    /// which we use to send handshake messages and routing updates. Once we've written the
55    /// handshake message, the write side of that control stream ends up here, and this is where we
56    /// send routing updates.
57    ///
58    /// If we have a routing task, then any time the peer map updates, we will send updated node
59    /// states to each channel contained in this `Vec`. The `HashMap` contains the state of the peer
60    /// map the last time we sent info and is used to determine what updates need to be sent.
61    ///
62    /// If we don't have a routing task, this should be empty. In that case we're a "leaf node" and
63    /// we don't send any routing information to peers at all.
64    control_channels: Vec<(stream::Writer, HashMap<EncodableString, Quality>)>,
65}
66
67impl PeerMap {
68    /// Create a new peer map.
69    fn new() -> Self {
70        PeerMap {
71            peers: HashMap::new(),
72
73            // Start with generation 1 so the routing task can start with generation 0. This means
74            // the routing task is out of sync as soon as it starts, so it will immediately trigger
75            // an update.
76            generation: 1,
77
78            wakeup: None,
79            control_channels: Vec::new(),
80        }
81    }
82
83    /// Increment the generation field and fire the wakeup sender if present.
84    ///
85    /// In short: this signals to the routing task that the peer map has been modified, so that it
86    /// can update neighboring nodes with new routing information.
87    fn increment_generation(&mut self) {
88        self.generation += 1;
89        self.wakeup.take().map(|x| {
90            let _ = x.send(());
91        });
92    }
93
94    /// Get the list of peers mutably, and signal the routing task that we have modified it.
95    fn peers(
96        &mut self,
97    ) -> &mut HashMap<EncodableString, Vec<(Sender<(stream::Reader, stream::Writer)>, Quality)>>
98    {
99        self.increment_generation();
100        &mut self.peers
101    }
102
103    /// Reduces our peer map to a simpler form which contains only the name of each node we can see
104    /// and the quality rating of the fastest connection we have to that node.
105    fn condense_routes(&self) -> HashMap<EncodableString, Quality> {
106        let mut ret = HashMap::new();
107
108        for (key, value) in &self.peers {
109            if value.is_empty() {
110                continue;
111            }
112
113            ret.insert(key.clone(), value[0].1);
114        }
115
116        ret
117    }
118
119    /// Adds a control channel to `control_channels` and sends an initial route update for that
120    /// channel.
121    fn add_control_channel(&mut self, channel: stream::Writer, quality: Quality) -> Result<()> {
122        let routes = self.condense_routes();
123
124        for (node, &route_quality) in &routes {
125            let quality = quality.combine(route_quality);
126            let state = NodeState::Online(node.clone(), quality);
127
128            channel.write_protocol_message(&state)?;
129        }
130
131        self.control_channels.push((channel, routes));
132
133        Ok(())
134    }
135}
136
137/// Represents a node on the circuit network.
138///
139/// A node can connect to one or more other nodes, and a stream can be established between any two
140/// nodes on the entire connected graph of nodes.
141pub struct Node {
142    /// A unique identifying string for this node.
143    node_id: EncodableString,
144    /// Indicates what protocol we intend to run atop the streams we established, e.g.
145    /// "Overnet-with-FIDL-ABI-1"
146    protocol: EncodableString,
147    /// List of other nodes we can "see" on the network.
148    peers: Arc<Mutex<PeerMap>>,
149    /// If true, we will inform nodes we are connected to of what other nodes we can see, so that
150    /// they might try to forward traffic through us. This also indicates that there is a "router
151    /// process" running for this node which handles said forwarding.
152    has_router: bool,
153
154    /// If another node establishes a connection to this node, we will notify the user by way of
155    /// this sender.
156    incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,
157
158    /// If a new peer becomes available we will send its name through this sender to notify the user.
159    new_peer_sender: Sender<String>,
160}
161
162impl Node {
163    /// Establish a new node.
164    ///
165    /// Any time a new peer becomes visible to this node, the peer's node ID will be sent to
166    /// `new_peer_sender`.
167    ///
168    /// Any time a peer wants to establish a stream with this node, a reader and writer for the new
169    /// stream as well as the peer's node ID will be sent to `incoming_stream_sender`.
170    pub fn new(
171        node_id: &str,
172        protocol: &str,
173        new_peer_sender: Sender<String>,
174        incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,
175    ) -> Result<Node> {
176        let node_id = node_id.to_owned().try_into()?;
177        let protocol = protocol.to_owned().try_into()?;
178
179        Ok(Node {
180            node_id,
181            protocol,
182            new_peer_sender,
183            incoming_stream_sender,
184            peers: Arc::new(Mutex::new(PeerMap::new())),
185            has_router: false,
186        })
187    }
188
189    /// Establish a new node which will forward streams between its peers. The router process
190    /// provided must be polled continuously to provide this forwarding.
191    pub fn new_with_router(
192        node_id: &str,
193        protocol: &str,
194        interval: Duration,
195        new_peer_sender: Sender<String>,
196        incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,
197    ) -> Result<(Node, impl Future<Output = ()> + Send)> {
198        let mut node = Self::new(node_id, protocol, new_peer_sender, incoming_stream_sender)?;
199        node.has_router = true;
200
201        let weak_peers = Arc::downgrade(&node.peers);
202
203        Ok((node, router(weak_peers, interval)))
204    }
205
206    /// Establish a stream with another node. Data will be sent to the peer with
207    /// `connection_writer`, and received with `connection_reader`.
208    pub async fn connect_to_peer(
209        &self,
210        connection_reader: stream::Reader,
211        connection_writer: stream::Writer,
212        node_id: &str,
213    ) -> Result<()> {
214        if self.node_id == node_id {
215            self.incoming_stream_sender
216                .clone()
217                .send((connection_reader, connection_writer, node_id.to_owned()))
218                .await
219                .map_err(|_| Error::NoSuchPeer(node_id.to_owned()))?;
220        } else {
221            let node_id: EncodableString = node_id.to_owned().try_into()?;
222            // Write the destination node ID and the source node ID (us). Keep in mind because of
223            // the semantics of push_back_protocol_message, these will come off the wire in the
224            // *reverse* of the order we're putting them on here.
225            connection_reader.push_back_protocol_message(&self.node_id)?;
226            connection_reader.push_back_protocol_message(&node_id)?;
227            connect_to_peer(
228                Arc::clone(&self.peers),
229                connection_reader,
230                connection_writer,
231                &node_id,
232            )
233            .await?;
234        }
235
236        Ok(())
237    }
238
239    /// Test function to copy all routes toward one node as routes to another node.
240    #[cfg(test)]
241    pub fn route_via(&self, to: &str, via: &str) {
242        let to = EncodableString::try_from(to.to_owned()).unwrap();
243        let via = EncodableString::try_from(via.to_owned()).unwrap();
244        let mut peers = self.peers.lock().unwrap();
245        let new_list = peers.peers.get(&via).unwrap().clone();
246        peers.peers.insert(to, new_list);
247    }
248
249    /// Connect to another node.
250    ///
251    /// This establishes the internal state to link this node directly to another one, there by
252    /// joining it to the circuit network. To actually perform the networking necessary to create
253    /// such a link, a back end will have to service the streams given to this function via
254    /// its arguments. To keep the link running, the returned future must also be polled to
255    /// completion. Depending on configuration, it may complete swiftly or may poll for the entire
256    /// lifetime of the link.
257    ///
258    /// When we link to another node, we immediately start a "control stream" that performs a
259    /// handshake and sends routing messages. The reader and writer passed in through
260    /// `control_stream` will be used to service this stream. If `control_stream` is `None` the
261    /// first stream emitted from `new_stream_receiver` will be used.
262    ///
263    /// When the local node needs to create a new stream to the linked node, it will send a reader
264    /// and writer to `new_stream_sender`.
265    ///
266    /// When the linked node wants to create a new stream to this node, the back end may send a
267    /// reader and writer through `new_stream_receiver`, as well as a `oneshot::Sender` which will
268    /// be used to report if the link is established successfully or if an error occurs.
269    ///
270    /// The returned future will continue to poll for the lifetime of the link and return the error
271    /// that terminated it.
272    pub fn link_node(
273        &self,
274        control_stream: Option<(stream::Reader, stream::Writer)>,
275        new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
276        mut new_stream_receiver: impl futures::Stream<Item = (stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>
277            + Unpin
278            + Send,
279        quality: Quality,
280    ) -> impl Future<Output = Result<()>> + Send {
281        let has_router = self.has_router;
282        let peers = Arc::clone(&self.peers);
283        let node_id = self.node_id.clone();
284        let protocol = self.protocol.clone();
285        let (new_stream_receiver_sender, new_stream_receiver_receiver) = oneshot::channel();
286        let (control_reader_sender, control_reader_receiver) = oneshot::channel();
287        let new_streams_loop =
288            self.handle_new_streams(new_stream_receiver_receiver, new_stream_sender.clone());
289        let control_stream_loop =
290            self.handle_control_stream(control_reader_receiver, new_stream_sender.clone(), quality);
291
292        // Runs all necessary background processing for a connection. Tasks include:
293        // 1) Fetch a control stream from the new_stream_receiver if we don't have one from
294        //    control_stream already.
295        // 2) Send a handshake message to the other node.
296        // 3) Wait for and validate the handshake from the peer.
297        // 4) Receive updated routing information from the peer and update the peer map accordingly.
298        // 5) When the peer tries to open a new stream to us, either forward that stream to another
299        //    node or, if it's destined for us directly, consume the initial handshake from it and send
300        //    it out to be processed by the user.
301        async move {
302            let (control_reader, control_writer) = if let Some(control_stream) = control_stream {
303                control_stream
304            } else {
305                let (reader, writer, error_sender) =
306                    new_stream_receiver.next().await.ok_or_else(|| {
307                        Error::ConnectionClosed(Some(
308                            "Client stopped listening for new streams".to_owned(),
309                        ))
310                    })?;
311                let _ = error_sender.send(Ok(()));
312                (reader, writer)
313            };
314
315            let _ = new_stream_receiver_sender.send(new_stream_receiver);
316
317            let header = Identify::new(protocol.clone());
318            control_writer.write_protocol_message(&header)?;
319
320            let state = NodeState::Online(node_id.clone(), Quality::SELF);
321            control_writer.write_protocol_message(&state)?;
322
323            if has_router {
324                peers
325                    .lock()
326                    .unwrap()
327                    .add_control_channel(control_writer, quality)
328                    .expect("We just created this channel!");
329            } else {
330                // No router means no further routing messages. Just let 'er go.
331                std::mem::drop(control_writer);
332            }
333
334            // Start by reading and validating a handshake message from the stream.
335            let header = control_reader.read_protocol_message::<Identify>().await?;
336
337            if header.circuit_version != CIRCUIT_VERSION {
338                return Err(Error::VersionMismatch);
339            } else if header.protocol != protocol {
340                return Err(Error::ProtocolMismatch);
341            }
342
343            control_reader_sender.send(control_reader).map_err(|_| {
344                Error::ConnectionClosed(Some("Control stream handler disappeared".to_owned()))
345            })?;
346
347            let control_stream_loop = pin!(control_stream_loop);
348            let new_streams_loop = pin!(new_streams_loop);
349
350            let ret = match futures::future::select(control_stream_loop, new_streams_loop).await {
351                Either::Left((result, new_streams)) => {
352                    if matches!(result, Ok(()) | Err(Error::ConnectionClosed(_))) {
353                        new_streams.await;
354                    }
355                    result
356                }
357                Either::Right(((), read_control)) => read_control.now_or_never().unwrap_or(Ok(())),
358            };
359
360            {
361                let mut peers = peers.lock().unwrap();
362                let peers = peers.peers();
363                for peer_list in peers.values_mut() {
364                    peer_list.retain(|x| !x.0.same_receiver(&new_stream_sender));
365                }
366            }
367
368            ret
369        }
370    }
371
372    /// Handles messages we receive on a control stream, i.e. routing updates. `new_stream_sender`
373    /// is a channel by which streams destined for another peer can be forwarded to the node on the
374    /// other end of this control stream. The routing table will associate that sender with any
375    /// peers that can be reached via that node.
376    ///
377    /// The returned future will poll until the control stream hangs up or a protocol error occurs.
378    fn handle_control_stream(
379        &self,
380        control_reader: oneshot::Receiver<stream::Reader>,
381        new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
382        quality: Quality,
383    ) -> impl Future<Output = Result<()>> + Send {
384        let peers = Arc::clone(&self.peers);
385        let new_stream_sender = new_stream_sender;
386        let node_id = self.node_id.clone();
387        let mut new_peer_sender = self.new_peer_sender.clone();
388
389        async move {
390            let control_reader = control_reader.await.map_err(|_| {
391                Error::ConnectionClosed(Some(
392                    "Reader never given to control stream handler".to_string(),
393                ))
394            })?;
395            loop {
396                let state = control_reader.read_protocol_message::<NodeState>().await?;
397                match state {
398                    NodeState::Online(peer, path_quality) => {
399                        if peer == node_id {
400                            continue;
401                        }
402
403                        let quality = path_quality.combine(quality);
404                        let peer_string = peer.to_string();
405                        let should_send = {
406                            let mut peers = peers.lock().unwrap();
407                            let peers = peers.peers();
408                            let peer_list = peers.entry(peer).or_insert_with(Vec::new);
409                            let should_send = peer_list.is_empty();
410                            peer_list.retain(|x| !x.0.same_receiver(&new_stream_sender));
411                            peer_list.push((new_stream_sender.clone(), quality));
412                            peer_list.sort_by_key(|x| x.1);
413                            should_send
414                        };
415                        if should_send {
416                            let _ = new_peer_sender.send(peer_string).await;
417                        }
418                    }
419                    NodeState::Offline(peer) => {
420                        let mut peers = peers.lock().unwrap();
421                        let peers = peers.peers();
422                        let peer_list = peers.get_mut(&peer);
423
424                        if let Some(peer_list) = peer_list {
425                            peer_list.retain(|x| !x.0.same_receiver(&new_stream_sender));
426                        }
427                    }
428                }
429            }
430        }
431    }
432
433    /// Handles requests for new streams. The `new_stream_receiver` provides a reader and writer for
434    /// every stream that gets established to or through this node, as well as a `Result` sender so
435    /// we can indicate if we have any trouble handling this stream. The `new_stream_sender` allows
436    /// us to connect back to the connecting node. If we don't have routing info for the incoming
437    /// node we will establish a new route via that stream when the connection arrives.
438    ///
439    /// For each incoming stream, we read a bit of protocol header out of it and either accept it or
440    /// forward it to another peer.
441    ///
442    /// The returned future will poll until the back end hangs up the other end of the receiver.
443    fn handle_new_streams(
444        &self,
445        new_stream_receiver_receiver: oneshot::Receiver<
446            impl futures::Stream<
447                    Item = (stream::Reader, stream::Writer, oneshot::Sender<Result<()>>),
448                > + Unpin,
449        >,
450        new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
451    ) -> impl Future<Output = ()> {
452        let peers = Arc::clone(&self.peers);
453        let mut incoming_stream_sender = self.incoming_stream_sender.clone();
454        let mut new_peer_sender = self.new_peer_sender.clone();
455        let node_id = self.node_id.clone();
456
457        async move {
458            let mut new_stream_receiver = if let Ok(x) = new_stream_receiver_receiver.await {
459                x
460            } else {
461                return;
462            };
463
464            while let Some((reader, writer, result_sender)) = new_stream_receiver.next().await {
465                let _ = result_sender.send(
466                    async {
467                        let dest = reader
468                            .read(EncodableString::MIN_SIZE, |buf| {
469                                EncodableString::try_from_bytes(buf).map(|(dest, size)| {
470                                    if dest == node_id {
471                                        // If the destination is this node, discard the string
472                                        // itself (we know where we are, so we don't need it)
473                                        // and return as usual.
474                                        (None, size)
475                                    } else {
476                                        // If the destination node is another node, return a
477                                        // size of zero, which tells the reader to leave the
478                                        // destination string in the stream. When we forward the
479                                        // stream, the node we forward it to will be able to
480                                        // read the destination again.
481                                        (Some(dest), 0)
482                                    }
483                                })
484                            })
485                            .await?;
486
487                        if let Some(dest) = dest {
488                            connect_to_peer(Arc::clone(&peers), reader, writer, &dest).await?;
489                        } else {
490                            let src = reader.read_protocol_message::<EncodableString>().await?;
491                            let send_new_peer = {
492                                let mut peers = peers.lock().unwrap();
493                                let peer_list =
494                                    peers.peers.entry(src.clone()).or_insert_with(Vec::new);
495                                if !peer_list.iter().any(|x| x.0.same_receiver(&new_stream_sender))
496                                {
497                                    peer_list.push((new_stream_sender.clone(), Quality::UNKNOWN));
498                                    peers.increment_generation();
499                                    true
500                                } else {
501                                    false
502                                }
503                            };
504
505                            if send_new_peer {
506                                let _ = new_peer_sender.send(src.to_string()).await;
507                            }
508
509                            incoming_stream_sender
510                                .send((reader, writer, src.to_string()))
511                                .await
512                                .map_err(|_| {
513                                    Error::ConnectionClosed(Some(
514                                        "Incoming stream dispatcher disappeared".to_owned(),
515                                    ))
516                                })?;
517                        }
518                        Ok(())
519                    }
520                    .await,
521                );
522            }
523        }
524    }
525
526    /// Get the node ID of this node.
527    pub fn node_id(&self) -> &str {
528        self.node_id.as_str()
529    }
530}
531
532/// Given the reader and writer for an incoming connection, forward that
533/// connection to another node.
534async fn connect_to_peer(
535    peers: Arc<Mutex<PeerMap>>,
536    peer_reader: stream::Reader,
537    peer_writer: stream::Writer,
538    node_id: &EncodableString,
539) -> Result<()> {
540    let mut peer_channels = Some((peer_reader, peer_writer));
541    let mut peer_sender: Option<Sender<(stream::Reader, stream::Writer)>> = None;
542
543    poll_fn(|ctx| {
544        // If we found a place to send the new stream during the last poll, but
545        // it gave us pending, we have to keep that sender's clone around so our
546        // waker is remembered and we get woken up. We could just drop it here
547        // and repeat the whole discovery again, which would cost compute but
548        // make us more responsive to changes in the network topology by a
549        // smidge. Instead we'll try the same link again, and keep trying until
550        // it succeeds or breaks.
551        if let Some(sender) = &mut peer_sender {
552            match sender.poll_ready(ctx) {
553                Poll::Ready(Ok(())) => {
554                    sender
555                        .start_send(peer_channels.take().unwrap())
556                        .expect("Should be guaranteed to succeed!");
557                    return Poll::Ready(());
558                }
559                Poll::Ready(Err(_)) => peer_sender = None,
560                Poll::Pending => {
561                    return Poll::Pending;
562                }
563            }
564        }
565
566        let mut peers = peers.lock().unwrap();
567
568        // For each peer we have a list of channels to which we can send our
569        // reader and writer, each representing a connection which will become
570        // the next link in the circuit. The list is sorted by connection
571        // quality, getting worse toward the end of the list, so we want to send
572        // our reader and writer to the first one we can.
573        let Some(peer_list) = peers.peers.get_mut(node_id) else {
574            return Poll::Ready(());
575        };
576
577        let mut changed = false;
578
579        // Go through each potential connection and send to the first one which
580        // will handle the connection. We may discover the first few we try have
581        // hung up and gone away, so we'll delete those from the list and try
582        // the next one.
583        //
584        // If the channel used to send to the fastest available connection is
585        // full, we pause and retry when it is ready. We *could* continue down
586        // the list to find another connection, but we don't; we assume waiting
587        // for a faster connection to be serviceable locally nets better
588        // performance in the long run than sending on a slower connection that
589        // can be serviced right away.
590        peer_list.retain_mut(|x| {
591            if peer_sender.is_none() && peer_channels.is_some() {
592                let mut sender = x.0.clone();
593                match sender.poll_ready(ctx) {
594                    Poll::Ready(Ok(())) => {
595                        sender
596                            .start_send(peer_channels.take().unwrap())
597                            .expect("Should be guaranteed to succeed!");
598                        true
599                    }
600                    Poll::Ready(Err(_)) => {
601                        changed = true;
602                        false
603                    }
604                    Poll::Pending => {
605                        peer_sender = Some(sender);
606                        true
607                    }
608                }
609            } else {
610                true
611            }
612        });
613
614        // If this is true, we cleared out some stale connections from the
615        // routing table. Send a routing update to update our neighbors about
616        // how this might affect connectivity.
617        if changed {
618            peers.increment_generation();
619        }
620
621        // The peer sender is where we register our waker. If we don't have one
622        // we didn't register a waker and should return now.
623        if peer_sender.is_none() {
624            Poll::Ready(())
625        } else {
626            Poll::Pending
627        }
628    })
629    .await;
630
631    // Our iteration above should have taken channels and sent them along to the
632    // connection that will handle them. If they're still here we didn't find a
633    // channel.
634    if peer_channels.is_none() {
635        Ok(())
636    } else {
637        Err(Error::NoSuchPeer(node_id.to_string()))
638    }
639}
640
641/// Given an old and a new condensed routing table, create a serialized list of
642/// `NodeState`s which will update a node on what has changed between them.
643fn route_updates(
644    old_routes: &HashMap<EncodableString, Quality>,
645    new_routes: &HashMap<EncodableString, Quality>,
646) -> Vec<u8> {
647    let mut ret = Vec::new();
648
649    for (node, &quality) in new_routes {
650        if let Some(&old_quality) = old_routes.get(node) {
651            if old_quality == quality {
652                continue;
653            }
654        }
655
656        NodeState::Online(node.clone(), quality).write_bytes_vec(&mut ret);
657    }
658
659    for old_node in old_routes.keys() {
660        if !new_routes.contains_key(old_node) {
661            NodeState::Offline(old_node.clone()).write_bytes_vec(&mut ret);
662        }
663    }
664
665    ret
666}
667
668/// Router process. Notifies each peer every time the router table is updated. The given `interval`
669/// allows these updates to be rate-limited; there will always be at least `interval` time between
670/// updates.
671async fn router(peers: Weak<Mutex<PeerMap>>, interval: Duration) {
672    let mut generation = 0;
673
674    loop {
675        let mut wake_receiver = None;
676        {
677            let peers = if let Some(peers) = peers.upgrade() {
678                peers
679            } else {
680                return;
681            };
682            let mut peers = peers.lock().unwrap();
683
684            if peers.generation <= generation {
685                let (sender, receiver) = oneshot::channel();
686                peers.wakeup = Some(sender);
687                wake_receiver = Some(receiver);
688            } else {
689                let new_routes = peers.condense_routes();
690
691                peers.control_channels.retain_mut(|(sender, routes)| {
692                    let msgs = route_updates(routes, &new_routes);
693
694                    if sender
695                        .write(msgs.len(), |buf| {
696                            buf[..msgs.len()].copy_from_slice(&msgs);
697                            Ok(msgs.len())
698                        })
699                        .is_ok()
700                    {
701                        *routes = new_routes.clone();
702                        true
703                    } else {
704                        false
705                    }
706                });
707
708                generation = peers.generation;
709            }
710        }
711
712        if let Some(receiver) = wake_receiver {
713            let _ = receiver.await;
714        } else {
715            Timer::new(interval).await
716        }
717    }
718}