circuit/lib.rs
1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async::Timer;
6use futures::channel::mpsc::Sender;
7use futures::channel::oneshot;
8use futures::future::{poll_fn, Either};
9use futures::stream::StreamExt as _;
10use futures::{FutureExt, SinkExt as _};
11use std::collections::HashMap;
12use std::future::Future;
13use std::pin::pin;
14use std::sync::{Arc, Mutex, Weak};
15use std::task::Poll;
16use std::time::Duration;
17
/// Version of the circuit protocol spoken by this implementation.
///
/// During the control-stream handshake in `Node::link_node`, the `circuit_version` reported in
/// the peer's `Identify` message is compared against this value, and the link is terminated
/// with `Error::VersionMismatch` if the two differ.
pub const CIRCUIT_VERSION: u8 = 0;
19
20mod error;
21mod protocol;
22#[cfg(test)]
23mod test;
24
25mod connection;
26pub mod multi_stream;
27pub mod stream;
28
29use protocol::{EncodableString, Identify, NodeState};
30
31pub use connection::{Connection, ConnectionNode};
32pub use error::{Error, Result};
33pub use protocol::Quality;
34
35use crate::protocol::ProtocolMessage;
36
/// A list of other nodes we can see on the mesh. For each such node we retain a vector of senders
/// which allow us to establish new streams with that node. Each sender in the vector corresponds to
/// a different path we could take to the desired peer. For each sender we also store the combined
/// quality (see `Quality::combine`) of the hops to that peer (see `protocol::NodeState::Online`).
struct PeerMap {
    /// The actual map of peers itself, keyed by node ID.
    ///
    /// `condense_routes` and the free function `connect_to_peer` both treat the first entry of
    /// each list as the preferred route, so code that inserts routes is expected to keep the
    /// lists ordered best-first. Mutating this field directly does not signal the routing task;
    /// go through `PeerMap::peers` (or call `increment_generation`) when the change should be
    /// advertised.
    peers: HashMap<EncodableString, Vec<(Sender<(stream::Reader, stream::Writer)>, Quality)>>,
    /// This value increments once every time the peer map changes. Consequently, we can track
    /// changes in this number to determine when a routing refresh is necessary.
    generation: usize,
    /// This allows the router task to wait for this structure to update. Put simply, any time we
    /// increment `generation`, we should remove this sender from the struct and fire it if it is
    /// present.
    wakeup: Option<oneshot::Sender<()>>,
    /// Control channels for various peer nodes.
    ///
    /// Every time we directly connect to a new node, we start a stream called the control stream,
    /// which we use to send handshake messages and routing updates. Once we've written the
    /// handshake message, the write side of that control stream ends up here, and this is where we
    /// send routing updates.
    ///
    /// If we have a routing task, then any time the peer map updates, we will send updated node
    /// states to each channel contained in this `Vec`. The `HashMap` stored next to each writer
    /// holds the condensed routes as of the last time we sent info on that channel and is used to
    /// determine what updates need to be sent.
    ///
    /// If we don't have a routing task, this should be empty. In that case we're a "leaf node" and
    /// we don't send any routing information to peers at all.
    control_channels: Vec<(stream::Writer, HashMap<EncodableString, Quality>)>,
}
66
67impl PeerMap {
68 /// Create a new peer map.
69 fn new() -> Self {
70 PeerMap {
71 peers: HashMap::new(),
72
73 // Start with generation 1 so the routing task can start with generation 0. This means
74 // the routing task is out of sync as soon as it starts, so it will immediately trigger
75 // an update.
76 generation: 1,
77
78 wakeup: None,
79 control_channels: Vec::new(),
80 }
81 }
82
83 /// Increment the generation field and fire the wakeup sender if present.
84 ///
85 /// In short: this signals to the routing task that the peer map has been modified, so that it
86 /// can update neighboring nodes with new routing information.
87 fn increment_generation(&mut self) {
88 self.generation += 1;
89 self.wakeup.take().map(|x| {
90 let _ = x.send(());
91 });
92 }
93
94 /// Get the list of peers mutably, and signal the routing task that we have modified it.
95 fn peers(
96 &mut self,
97 ) -> &mut HashMap<EncodableString, Vec<(Sender<(stream::Reader, stream::Writer)>, Quality)>>
98 {
99 self.increment_generation();
100 &mut self.peers
101 }
102
103 /// Reduces our peer map to a simpler form which contains only the name of each node we can see
104 /// and the quality rating of the fastest connection we have to that node.
105 fn condense_routes(&self) -> HashMap<EncodableString, Quality> {
106 let mut ret = HashMap::new();
107
108 for (key, value) in &self.peers {
109 if value.is_empty() {
110 continue;
111 }
112
113 ret.insert(key.clone(), value[0].1);
114 }
115
116 ret
117 }
118
119 /// Adds a control channel to `control_channels` and sends an initial route update for that
120 /// channel.
121 fn add_control_channel(&mut self, channel: stream::Writer, quality: Quality) -> Result<()> {
122 let routes = self.condense_routes();
123
124 for (node, &route_quality) in &routes {
125 let quality = quality.combine(route_quality);
126 let state = NodeState::Online(node.clone(), quality);
127
128 channel.write_protocol_message(&state)?;
129 }
130
131 self.control_channels.push((channel, routes));
132
133 Ok(())
134 }
135}
136
/// Represents a node on the circuit network.
///
/// A node can connect to one or more other nodes, and a stream can be established between any two
/// nodes on the entire connected graph of nodes.
pub struct Node {
    /// A unique identifying string for this node.
    node_id: EncodableString,
    /// Indicates what protocol we intend to run atop the streams we established, e.g.
    /// "Overnet-with-FIDL-ABI-1". Linked nodes must report the same protocol in their handshake.
    protocol: EncodableString,
    /// List of other nodes we can "see" on the network. Shared with the futures returned by
    /// `link_node` and, when present, with the router task (which only holds a weak reference).
    peers: Arc<Mutex<PeerMap>>,
    /// If true, we will inform nodes we are connected to of what other nodes we can see, so that
    /// they might try to forward traffic through us. This also indicates that there is a "router
    /// process" running for this node which handles said forwarding.
    has_router: bool,

    /// If another node establishes a connection to this node, we will notify the user by way of
    /// this sender. Each item is the stream's reader and writer plus the source node's ID.
    incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,

    /// If a new peer becomes available we will send its name through this sender to notify the user.
    new_peer_sender: Sender<String>,
}
161
162impl Node {
163 /// Establish a new node.
164 ///
165 /// Any time a new peer becomes visible to this node, the peer's node ID will be sent to
166 /// `new_peer_sender`.
167 ///
168 /// Any time a peer wants to establish a stream with this node, a reader and writer for the new
169 /// stream as well as the peer's node ID will be sent to `incoming_stream_sender`.
170 pub fn new(
171 node_id: &str,
172 protocol: &str,
173 new_peer_sender: Sender<String>,
174 incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,
175 ) -> Result<Node> {
176 let node_id = node_id.to_owned().try_into()?;
177 let protocol = protocol.to_owned().try_into()?;
178
179 Ok(Node {
180 node_id,
181 protocol,
182 new_peer_sender,
183 incoming_stream_sender,
184 peers: Arc::new(Mutex::new(PeerMap::new())),
185 has_router: false,
186 })
187 }
188
189 /// Establish a new node which will forward streams between its peers. The router process
190 /// provided must be polled continuously to provide this forwarding.
191 pub fn new_with_router(
192 node_id: &str,
193 protocol: &str,
194 interval: Duration,
195 new_peer_sender: Sender<String>,
196 incoming_stream_sender: Sender<(stream::Reader, stream::Writer, String)>,
197 ) -> Result<(Node, impl Future<Output = ()> + Send + use<>)> {
198 let mut node = Self::new(node_id, protocol, new_peer_sender, incoming_stream_sender)?;
199 node.has_router = true;
200
201 let weak_peers = Arc::downgrade(&node.peers);
202
203 Ok((node, router(weak_peers, interval)))
204 }
205
206 /// Establish a stream with another node. Data will be sent to the peer with
207 /// `connection_writer`, and received with `connection_reader`.
208 pub async fn connect_to_peer(
209 &self,
210 connection_reader: stream::Reader,
211 connection_writer: stream::Writer,
212 node_id: &str,
213 ) -> Result<()> {
214 if self.node_id == node_id {
215 self.incoming_stream_sender
216 .clone()
217 .send((connection_reader, connection_writer, node_id.to_owned()))
218 .await
219 .map_err(|_| Error::NoSuchPeer(node_id.to_owned()))?;
220 } else {
221 let node_id: EncodableString = node_id.to_owned().try_into()?;
222 // Write the destination node ID and the source node ID (us). Keep in mind because of
223 // the semantics of push_back_protocol_message, these will come off the wire in the
224 // *reverse* of the order we're putting them on here.
225 connection_reader.push_back_protocol_message(&self.node_id)?;
226 connection_reader.push_back_protocol_message(&node_id)?;
227 connect_to_peer(
228 Arc::clone(&self.peers),
229 connection_reader,
230 connection_writer,
231 &node_id,
232 )
233 .await?;
234 }
235
236 Ok(())
237 }
238
/// Test helper: makes `to` reachable through exactly the routes currently known for `via` by
/// copying `via`'s route list over `to`'s entry.
///
/// Panics if either name cannot be encoded or if `via` has no entry in the peer map.
#[cfg(test)]
pub fn route_via(&self, to: &str, via: &str) {
    let to: EncodableString = to.to_owned().try_into().unwrap();
    let via: EncodableString = via.to_owned().try_into().unwrap();

    let mut map = self.peers.lock().unwrap();
    let copied_routes = map.peers.get(&via).cloned().unwrap();
    map.peers.insert(to, copied_routes);
}
248
249 /// Connect to another node.
250 ///
251 /// This establishes the internal state to link this node directly to another one, there by
252 /// joining it to the circuit network. To actually perform the networking necessary to create
253 /// such a link, a back end will have to service the streams given to this function via
254 /// its arguments. To keep the link running, the returned future must also be polled to
255 /// completion. Depending on configuration, it may complete swiftly or may poll for the entire
256 /// lifetime of the link.
257 ///
258 /// When we link to another node, we immediately start a "control stream" that performs a
259 /// handshake and sends routing messages. The reader and writer passed in through
260 /// `control_stream` will be used to service this stream. If `control_stream` is `None` the
261 /// first stream emitted from `new_stream_receiver` will be used.
262 ///
263 /// When the local node needs to create a new stream to the linked node, it will send a reader
264 /// and writer to `new_stream_sender`.
265 ///
266 /// When the linked node wants to create a new stream to this node, the back end may send a
267 /// reader and writer through `new_stream_receiver`, as well as a `oneshot::Sender` which will
268 /// be used to report if the link is established successfully or if an error occurs.
269 ///
270 /// The returned future will continue to poll for the lifetime of the link and return the error
271 /// that terminated it.
272 pub fn link_node<F>(
273 &self,
274 control_stream: Option<(stream::Reader, stream::Writer)>,
275 new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
276 mut new_stream_receiver: F,
277 quality: Quality,
278 ) -> impl Future<Output = Result<()>> + Send + use<F>
279 where
280 F: futures::Stream<Item = (stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>
281 + Unpin
282 + Send,
283 {
284 let has_router = self.has_router;
285 let peers = Arc::clone(&self.peers);
286 let node_id = self.node_id.clone();
287 let protocol = self.protocol.clone();
288 let (new_stream_receiver_sender, new_stream_receiver_receiver) = oneshot::channel();
289 let (control_reader_sender, control_reader_receiver) = oneshot::channel();
290 let new_streams_loop =
291 self.handle_new_streams(new_stream_receiver_receiver, new_stream_sender.clone());
292 let control_stream_loop =
293 self.handle_control_stream(control_reader_receiver, new_stream_sender.clone(), quality);
294
295 // Runs all necessary background processing for a connection. Tasks include:
296 // 1) Fetch a control stream from the new_stream_receiver if we don't have one from
297 // control_stream already.
298 // 2) Send a handshake message to the other node.
299 // 3) Wait for and validate the handshake from the peer.
300 // 4) Receive updated routing information from the peer and update the peer map accordingly.
301 // 5) When the peer tries to open a new stream to us, either forward that stream to another
302 // node or, if it's destined for us directly, consume the initial handshake from it and send
303 // it out to be processed by the user.
304 async move {
305 let (control_reader, control_writer) = if let Some(control_stream) = control_stream {
306 control_stream
307 } else {
308 let (reader, writer, error_sender) =
309 new_stream_receiver.next().await.ok_or_else(|| {
310 Error::ConnectionClosed(Some(
311 "Client stopped listening for new streams".to_owned(),
312 ))
313 })?;
314 let _ = error_sender.send(Ok(()));
315 (reader, writer)
316 };
317
318 let _ = new_stream_receiver_sender.send(new_stream_receiver);
319
320 let header = Identify::new(protocol.clone());
321 control_writer.write_protocol_message(&header)?;
322
323 let state = NodeState::Online(node_id.clone(), Quality::SELF);
324 control_writer.write_protocol_message(&state)?;
325
326 if has_router {
327 peers
328 .lock()
329 .unwrap()
330 .add_control_channel(control_writer, quality)
331 .expect("We just created this channel!");
332 } else {
333 // No router means no further routing messages. Just let 'er go.
334 std::mem::drop(control_writer);
335 }
336
337 // Start by reading and validating a handshake message from the stream.
338 let header = control_reader.read_protocol_message::<Identify>().await?;
339
340 if header.circuit_version != CIRCUIT_VERSION {
341 return Err(Error::VersionMismatch);
342 } else if header.protocol != protocol {
343 return Err(Error::ProtocolMismatch);
344 }
345
346 control_reader_sender.send(control_reader).map_err(|_| {
347 Error::ConnectionClosed(Some("Control stream handler disappeared".to_owned()))
348 })?;
349
350 let control_stream_loop = pin!(control_stream_loop);
351 let new_streams_loop = pin!(new_streams_loop);
352
353 let ret = match futures::future::select(control_stream_loop, new_streams_loop).await {
354 Either::Left((result, new_streams)) => {
355 if matches!(result, Ok(()) | Err(Error::ConnectionClosed(_))) {
356 new_streams.await;
357 }
358 result
359 }
360 Either::Right(((), read_control)) => read_control.now_or_never().unwrap_or(Ok(())),
361 };
362
363 {
364 let mut peers = peers.lock().unwrap();
365 let peers = peers.peers();
366 for peer_list in peers.values_mut() {
367 peer_list.retain(|x| !x.0.same_receiver(&new_stream_sender));
368 }
369 }
370
371 ret
372 }
373 }
374
375 /// Handles messages we receive on a control stream, i.e. routing updates. `new_stream_sender`
376 /// is a channel by which streams destined for another peer can be forwarded to the node on the
377 /// other end of this control stream. The routing table will associate that sender with any
378 /// peers that can be reached via that node.
379 ///
380 /// The returned future will poll until the control stream hangs up or a protocol error occurs.
381 fn handle_control_stream(
382 &self,
383 control_reader: oneshot::Receiver<stream::Reader>,
384 new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
385 quality: Quality,
386 ) -> impl Future<Output = Result<()>> + Send + use<> {
387 let peers = Arc::clone(&self.peers);
388 let new_stream_sender = new_stream_sender;
389 let node_id = self.node_id.clone();
390 let mut new_peer_sender = self.new_peer_sender.clone();
391
392 async move {
393 let control_reader = control_reader.await.map_err(|_| {
394 Error::ConnectionClosed(Some(
395 "Reader never given to control stream handler".to_string(),
396 ))
397 })?;
398 loop {
399 let state = control_reader.read_protocol_message::<NodeState>().await?;
400 match state {
401 NodeState::Online(peer, path_quality) => {
402 if peer == node_id {
403 continue;
404 }
405
406 let quality = path_quality.combine(quality);
407 let peer_string = peer.to_string();
408 let should_send = {
409 let mut peers = peers.lock().unwrap();
410 let peers = peers.peers();
411 let peer_list = peers.entry(peer).or_insert_with(Vec::new);
412 let should_send = peer_list.is_empty();
413 peer_list.retain(|x| !x.0.same_receiver(&new_stream_sender));
414 peer_list.push((new_stream_sender.clone(), quality));
415 peer_list.sort_by_key(|x| x.1);
416 should_send
417 };
418 if should_send {
419 let _ = new_peer_sender.send(peer_string).await;
420 }
421 }
422 NodeState::Offline(peer) => {
423 let mut peers = peers.lock().unwrap();
424 let peers = peers.peers();
425 let peer_list = peers.get_mut(&peer);
426
427 if let Some(peer_list) = peer_list {
428 peer_list.retain(|x| !x.0.same_receiver(&new_stream_sender));
429 }
430 }
431 }
432 }
433 }
434 }
435
436 /// Handles requests for new streams. The `new_stream_receiver` provides a reader and writer for
437 /// every stream that gets established to or through this node, as well as a `Result` sender so
438 /// we can indicate if we have any trouble handling this stream. The `new_stream_sender` allows
439 /// us to connect back to the connecting node. If we don't have routing info for the incoming
440 /// node we will establish a new route via that stream when the connection arrives.
441 ///
442 /// For each incoming stream, we read a bit of protocol header out of it and either accept it or
443 /// forward it to another peer.
444 ///
445 /// The returned future will poll until the back end hangs up the other end of the receiver.
446 fn handle_new_streams<F>(
447 &self,
448 new_stream_receiver_receiver: oneshot::Receiver<F>,
449 new_stream_sender: Sender<(stream::Reader, stream::Writer)>,
450 ) -> impl Future<Output = ()> + use<F>
451 where
452 F: futures::Stream<Item = (stream::Reader, stream::Writer, oneshot::Sender<Result<()>>)>
453 + Unpin,
454 {
455 let peers = Arc::clone(&self.peers);
456 let mut incoming_stream_sender = self.incoming_stream_sender.clone();
457 let mut new_peer_sender = self.new_peer_sender.clone();
458 let node_id = self.node_id.clone();
459
460 async move {
461 let mut new_stream_receiver = if let Ok(x) = new_stream_receiver_receiver.await {
462 x
463 } else {
464 return;
465 };
466
467 while let Some((reader, writer, result_sender)) = new_stream_receiver.next().await {
468 let _ = result_sender.send(
469 async {
470 let dest = reader
471 .read(EncodableString::MIN_SIZE, |buf| {
472 EncodableString::try_from_bytes(buf).map(|(dest, size)| {
473 if dest == node_id {
474 // If the destination is this node, discard the string
475 // itself (we know where we are, so we don't need it)
476 // and return as usual.
477 (None, size)
478 } else {
479 // If the destination node is another node, return a
480 // size of zero, which tells the reader to leave the
481 // destination string in the stream. When we forward the
482 // stream, the node we forward it to will be able to
483 // read the destination again.
484 (Some(dest), 0)
485 }
486 })
487 })
488 .await?;
489
490 if let Some(dest) = dest {
491 connect_to_peer(Arc::clone(&peers), reader, writer, &dest).await?;
492 } else {
493 let src = reader.read_protocol_message::<EncodableString>().await?;
494 let send_new_peer = {
495 let mut peers = peers.lock().unwrap();
496 let peer_list =
497 peers.peers.entry(src.clone()).or_insert_with(Vec::new);
498 if !peer_list.iter().any(|x| x.0.same_receiver(&new_stream_sender))
499 {
500 peer_list.push((new_stream_sender.clone(), Quality::UNKNOWN));
501 peers.increment_generation();
502 true
503 } else {
504 false
505 }
506 };
507
508 if send_new_peer {
509 let _ = new_peer_sender.send(src.to_string()).await;
510 }
511
512 incoming_stream_sender
513 .send((reader, writer, src.to_string()))
514 .await
515 .map_err(|_| {
516 Error::ConnectionClosed(Some(
517 "Incoming stream dispatcher disappeared".to_owned(),
518 ))
519 })?;
520 }
521 Ok(())
522 }
523 .await,
524 );
525 }
526 }
527 }
528
529 /// Get the node ID of this node.
530 pub fn node_id(&self) -> &str {
531 self.node_id.as_str()
532 }
533}
534
/// Given the reader and writer for an incoming connection, forward that
/// connection to another node.
///
/// The stream is handed to the first usable sender in the route list stored
/// for `node_id` in `peers`. Senders that report an error while being polled
/// are removed from the list, and the peer map generation is bumped when that
/// happens.
///
/// # Errors
///
/// Returns `Error::NoSuchPeer` if `node_id` has no entry in the peer map or
/// none of its senders accepted the stream.
async fn connect_to_peer(
    peers: Arc<Mutex<PeerMap>>,
    peer_reader: stream::Reader,
    peer_writer: stream::Writer,
    node_id: &EncodableString,
) -> Result<()> {
    // Holds the stream until it has been handed to a sender; `None` afterwards.
    let mut peer_channels = Some((peer_reader, peer_writer));
    // The sender we are currently waiting on, if the last poll returned pending.
    let mut peer_sender: Option<Sender<(stream::Reader, stream::Writer)>> = None;

    poll_fn(|ctx| {
        // If we found a place to send the new stream during the last poll, but
        // it gave us pending, we have to keep that sender's clone around so our
        // waker is remembered and we get woken up. We could just drop it here
        // and repeat the whole discovery again, which would cost compute but
        // make us more responsive to changes in the network topology by a
        // smidge. Instead we'll try the same link again, and keep trying until
        // it succeeds or breaks.
        if let Some(sender) = &mut peer_sender {
            match sender.poll_ready(ctx) {
                Poll::Ready(Ok(())) => {
                    sender
                        .start_send(peer_channels.take().unwrap())
                        .expect("Should be guaranteed to succeed!");
                    return Poll::Ready(());
                }
                // The link we were waiting on broke; fall through and search
                // the route list again.
                Poll::Ready(Err(_)) => peer_sender = None,
                Poll::Pending => {
                    return Poll::Pending;
                }
            }
        }

        let mut peers = peers.lock().unwrap();

        // For each peer we have a list of channels to which we can send our
        // reader and writer, each representing a connection which will become
        // the next link in the circuit. The list is sorted by connection
        // quality, getting worse toward the end of the list, so we want to send
        // our reader and writer to the first one we can.
        //
        // With no entry at all we finish right away; `peer_channels` is still
        // populated, which the code after the poll loop reports as an error.
        let Some(peer_list) = peers.peers.get_mut(node_id) else {
            return Poll::Ready(());
        };

        // Set when at least one stale sender was dropped from the list.
        let mut changed = false;

        // Go through each potential connection and send to the first one which
        // will handle the connection. We may discover the first few we try have
        // hung up and gone away, so we'll delete those from the list and try
        // the next one.
        //
        // If the channel used to send to the fastest available connection is
        // full, we pause and retry when it is ready. We *could* continue down
        // the list to find another connection, but we don't; we assume waiting
        // for a faster connection to be serviceable locally nets better
        // performance in the long run than sending on a slower connection that
        // can be serviced right away.
        peer_list.retain_mut(|x| {
            // Only probe entries while we still hold the stream and are not
            // already waiting on a sender; everything after that is kept as is.
            if peer_sender.is_none() && peer_channels.is_some() {
                let mut sender = x.0.clone();
                match sender.poll_ready(ctx) {
                    Poll::Ready(Ok(())) => {
                        sender
                            .start_send(peer_channels.take().unwrap())
                            .expect("Should be guaranteed to succeed!");
                        true
                    }
                    Poll::Ready(Err(_)) => {
                        changed = true;
                        false
                    }
                    Poll::Pending => {
                        // Keep the clone that registered our waker so the next
                        // poll retries this same link.
                        peer_sender = Some(sender);
                        true
                    }
                }
            } else {
                true
            }
        });

        // If this is true, we cleared out some stale connections from the
        // routing table. Send a routing update to update our neighbors about
        // how this might affect connectivity.
        if changed {
            peers.increment_generation();
        }

        // The peer sender is where we register our waker. If we don't have one
        // we didn't register a waker and should return now.
        if peer_sender.is_none() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    })
    .await;

    // Our iteration above should have taken channels and sent them along to the
    // connection that will handle them. If they're still here we didn't find a
    // channel.
    if peer_channels.is_none() {
        Ok(())
    } else {
        Err(Error::NoSuchPeer(node_id.to_string()))
    }
}
643
644/// Given an old and a new condensed routing table, create a serialized list of
645/// `NodeState`s which will update a node on what has changed between them.
646fn route_updates(
647 old_routes: &HashMap<EncodableString, Quality>,
648 new_routes: &HashMap<EncodableString, Quality>,
649) -> Vec<u8> {
650 let mut ret = Vec::new();
651
652 for (node, &quality) in new_routes {
653 if let Some(&old_quality) = old_routes.get(node) {
654 if old_quality == quality {
655 continue;
656 }
657 }
658
659 NodeState::Online(node.clone(), quality).write_bytes_vec(&mut ret);
660 }
661
662 for old_node in old_routes.keys() {
663 if !new_routes.contains_key(old_node) {
664 NodeState::Offline(old_node.clone()).write_bytes_vec(&mut ret);
665 }
666 }
667
668 ret
669}
670
671/// Router process. Notifies each peer every time the router table is updated. The given `interval`
672/// allows these updates to be rate-limited; there will always be at least `interval` time between
673/// updates.
674async fn router(peers: Weak<Mutex<PeerMap>>, interval: Duration) {
675 let mut generation = 0;
676
677 loop {
678 let mut wake_receiver = None;
679 {
680 let peers = if let Some(peers) = peers.upgrade() {
681 peers
682 } else {
683 return;
684 };
685 let mut peers = peers.lock().unwrap();
686
687 if peers.generation <= generation {
688 let (sender, receiver) = oneshot::channel();
689 peers.wakeup = Some(sender);
690 wake_receiver = Some(receiver);
691 } else {
692 let new_routes = peers.condense_routes();
693
694 peers.control_channels.retain_mut(|(sender, routes)| {
695 let msgs = route_updates(routes, &new_routes);
696
697 if sender
698 .write(msgs.len(), |buf| {
699 buf[..msgs.len()].copy_from_slice(&msgs);
700 Ok(msgs.len())
701 })
702 .is_ok()
703 {
704 *routes = new_routes.clone();
705 true
706 } else {
707 false
708 }
709 });
710
711 generation = peers.generation;
712 }
713 }
714
715 if let Some(receiver) = wake_receiver {
716 let _ = receiver.await;
717 } else {
718 Timer::new(interval).await
719 }
720 }
721}