// overnet_core/router/mod.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Manages peer<->peer connections and routing packets over links between nodes.
6
7// General structure:
8// A router is a collection of: - streams (each with a StreamId<LinkData>)
9//                                These are streams of information flow between processes.
10//                              - peers (each with a PeerId)
11//                                These are other overnet instances in the overlay network.
12//                                There is a client oriented and a server oriented peer per
13//                                node.
14//                              - links (each with a LinkId)
15//                                These are connections between this instance and other
16//                                instances in the mesh.
17// For each node in the mesh, a routing table tracks which link on which to send data to
18// that node (said link may be a third node that will be requested to forward datagrams
19// on our behalf).
20
21mod service_map;
22
23use self::service_map::ServiceMap;
24use crate::future_help::{log_errors, Observer};
25use crate::handle_info::{handle_info, HandleKey, HandleType};
26use crate::labels::{NodeId, TransferKey};
27use crate::peer::{FramedStreamReader, FramedStreamWriter, Peer, PeerConnRef};
28use crate::proxy::{
29    IntoProxied, ProxyTransferInitiationReceiver, RemoveFromProxyTable, StreamRefSender,
30};
31use anyhow::{bail, format_err, Context as _, Error};
32use async_utils::mutex_ticket::MutexTicket;
33use fidl::{AsHandleRef, Channel, EventPair, Handle, HandleBased, Socket};
34use fidl_fuchsia_overnet_protocol::{
35    ChannelHandle, EventPairHandle, EventPairRights, SocketHandle, SocketType, StreamId, StreamRef,
36    ZirconHandle,
37};
38use fuchsia_async::Task;
39use futures::channel::oneshot;
40use futures::future::poll_fn;
41use futures::lock::Mutex;
42use futures::prelude::*;
43use futures::ready;
44use std::collections::{BTreeMap, HashMap};
45use std::pin::pin;
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::sync::{Arc, Weak};
48use std::task::{Context, Poll, Waker};
49use std::time::Duration;
50
51pub use self::service_map::ListablePeer;
52
/// Tracks one side of a handle transfer until both ends are known.
#[derive(Debug)]
enum PendingTransfer {
    /// The other end of the transfer has been posted.
    Complete(FoundTransfer),
    /// Still waiting; this waker is woken when the transfer completes.
    Waiting(Waker),
}

/// In-flight transfers, keyed by the transfer key shared between nodes.
type PendingTransferMap = BTreeMap<TransferKey, PendingTransfer>;
60
/// Result of locating the other end of a handle transfer.
#[derive(Debug)]
pub(crate) enum FoundTransfer {
    /// Both ends live on this node; Overnet drops out and the raw handle is reunited directly.
    Fused(Handle),
    /// The other end is remote; proxy over this framed stream pair.
    Remote(FramedStreamWriter, FramedStreamReader),
}
66
/// Result of opening a transfer towards a target node (see `Router::open_transfer`).
#[derive(Debug)]
pub(crate) enum OpenedTransfer {
    /// The target was this node, so the transfer was fused locally — no streams needed.
    Fused,
    /// The target is remote: framed stream pair to proxy over, plus the local handle.
    Remote(FramedStreamWriter, FramedStreamReader, Handle),
}
72
/// Connection state for a circuit client peer.
#[derive(Debug)]
#[allow(dead_code)]
enum CircuitState {
    /// No connection established yet; these senders are signalled when a peer appears.
    Waiters(Vec<oneshot::Sender<()>>),
    /// An established client peer.
    Peer(Arc<Peer>),
}
79
80impl CircuitState {
81    fn peer(&self) -> Option<Arc<Peer>> {
82        if let CircuitState::Peer(peer) = self {
83            Some(Arc::clone(peer))
84        } else {
85            None
86        }
87    }
88}
89
/// Collection of all peers known to this router.
#[derive(Debug)]
struct PeerMaps {
    /// Circuit-protocol client peers (or tasks waiting for them), keyed by node id.
    circuit_clients: BTreeMap<NodeId, CircuitState>,
}
94
/// Wrapper to get the right list_peers behavior.
// Holds the observer in an Option so that only one list_peers call can be in
// flight at a time — the observer is taken out for the duration of a call.
#[derive(Debug)]
pub struct ListPeersContext(Mutex<Option<Observer<Vec<ListablePeer>>>>);

// Monotonic counter used only to correlate the trace messages of one list_peers call.
static LIST_PEERS_CALL: AtomicU64 = AtomicU64::new(0);
100
101impl ListPeersContext {
102    /// Implementation of ListPeers fidl method.
103    pub async fn list_peers(&self) -> Result<Vec<ListablePeer>, Error> {
104        let call_id = LIST_PEERS_CALL.fetch_add(1, Ordering::SeqCst);
105        log::trace!(list_peers_call = call_id; "get observer");
106        let mut obs = self
107            .0
108            .lock()
109            .await
110            .take()
111            .ok_or_else(|| anyhow::format_err!("Already listing peers"))?;
112        log::trace!(list_peers_call = call_id; "wait for value");
113        let r = obs.next().await;
114        log::trace!(list_peers_call = call_id; "replace observer");
115        *self.0.lock().await = Some(obs);
116        log::trace!(list_peers_call = call_id; "return");
117        Ok(r.unwrap_or_else(Vec::new))
118    }
119}
120
/// Whether this node's ascendd clients should be routed to each other
// Standard derives added: this is a public, fieldless two-variant enum, so
// Copy/Clone/Eq are free and Debug aids logging; all are backward-compatible.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AscenddClientRouting {
    /// Ascendd client routing is allowed
    Enabled,
    /// Ascendd client routing is prevented
    Disabled,
}
128
/// Router maintains global state for one node_id.
/// `LinkData` is a token identifying a link for layers above Router.
/// `Time` is a representation of time for the Router, to assist injecting different platforms
/// schemes.
pub struct Router {
    /// Our node id
    node_id: NodeId,
    /// All peers.
    peers: Mutex<PeerMaps>,
    /// Services registered on this node plus peer/service observers.
    service_map: ServiceMap,
    /// Handles currently being proxied by this node, keyed by `HandleKey`.
    proxied_streams: Mutex<HashMap<HandleKey, ProxiedHandle>>,
    /// Transfers awaiting their other end (see `post_transfer`/`find_transfer`).
    pending_transfers: Mutex<PendingTransferMap>,
    /// Background task driving `run_circuits`; populated in `with_node_id`.
    task: Mutex<Option<Task<()>>>,
    /// Hack to prevent the n^2 scaling of a fully-connected graph of ffxs
    ascendd_client_routing: AtomicBool,
    /// Circuit-protocol node used to create and accept connections.
    circuit_node: circuit::ConnectionNode,
}
146
/// Bookkeeping for one proxied handle (one half of a proxied pair).
struct ProxiedHandle {
    /// Tells the proxy loop to stop, or to initiate a transfer to a new endpoint.
    remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
    /// Key of the handle this one was paired with when proxying began.
    original_paired: HandleKey,
    /// Task running the proxy loop for this handle.
    proxy_task: Task<()>,
}
152
153/// Generate a new random node id
154fn generate_node_id() -> NodeId {
155    rand::random::<u64>().into()
156}
157
/// Consume `items` and return it sorted in ascending order.
fn sorted<T>(mut items: Vec<T>) -> Vec<T>
where
    T: std::cmp::Ord,
{
    items.sort();
    items
}
162
163impl std::fmt::Debug for Router {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        write!(f, "Router({:?})", self.node_id)
166    }
167}
168
/// This is sent when initiating connections between circuit nodes and must be identical between all
/// such nodes.
// `&str` rather than `&'static str`: const items already have the 'static
// lifetime implicitly (clippy::redundant_static_lifetimes).
const OVERNET_CIRCUIT_PROTOCOL: &str = "Overnet:0";
172
impl Router {
    /// Create a new router. If `router_interval` is given, this router will
    /// behave like an interior node and tell its neighbors about each other.
    pub fn new(router_interval: Option<Duration>) -> Result<Arc<Self>, Error> {
        Router::with_node_id(generate_node_id(), router_interval)
    }

    /// Make a router with a specific node ID.
    ///
    /// Builds the circuit node (with or without interior routing, depending on
    /// `router_interval`) and spawns the background task that services new
    /// connections and peer announcements (`run_circuits`).
    pub fn with_node_id(
        node_id: NodeId,
        router_interval: Option<Duration>,
    ) -> Result<Arc<Self>, Error> {
        let service_map = ServiceMap::new(node_id);
        // New peers discovered by the circuit layer are announced on this
        // channel and consumed by `run_circuits` below.
        let (new_peer_sender, new_peer_receiver) = futures::channel::mpsc::channel(1);
        let (circuit_node, circuit_connections) = if let Some(interval) = router_interval {
            let (a, b) = circuit::ConnectionNode::new_with_router(
                &node_id.circuit_string(),
                OVERNET_CIRCUIT_PROTOCOL,
                interval,
                new_peer_sender,
            )?;
            (a, b.boxed())
        } else {
            let (a, b) = circuit::ConnectionNode::new(
                &node_id.circuit_string(),
                OVERNET_CIRCUIT_PROTOCOL,
                new_peer_sender,
            )?;
            (a, b.boxed())
        };
        let router = Arc::new(Router {
            node_id,
            service_map,
            peers: Mutex::new(PeerMaps { circuit_clients: BTreeMap::new() }),
            proxied_streams: Mutex::new(HashMap::new()),
            pending_transfers: Mutex::new(PendingTransferMap::new()),
            task: Mutex::new(None),
            // Default is to route all clients to each other. Ffx daemon disabled client routing.
            ascendd_client_routing: AtomicBool::new(true),
            circuit_node,
        });

        // The task mutex was created just above and is uncontended, so
        // `now_or_never` cannot fail here.
        let weak_router = Arc::downgrade(&router);
        *router.task.lock().now_or_never().unwrap() = Some(Task::spawn(log_errors(
            run_circuits(weak_router, circuit_connections, new_peer_receiver),
            format!("router {:?} support loop failed", node_id),
        )));

        Ok(router)
    }

    /// Get the circuit protocol node for this router. This will let us create new connections to
    /// other nodes.
    pub fn circuit_node(&self) -> &circuit::Node {
        self.circuit_node.node()
    }

    /// Accessor for the node id of this router.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Accessor for whether to route ascendd clients to each other
    pub fn client_routing(&self) -> AscenddClientRouting {
        if self.ascendd_client_routing.load(std::sync::atomic::Ordering::SeqCst) {
            AscenddClientRouting::Enabled
        } else {
            AscenddClientRouting::Disabled
        }
    }

    /// Setter for whether to route ascendd clients to each other
    pub fn set_client_routing(&self, client_routing: AscenddClientRouting) {
        let client_routing = match client_routing {
            AscenddClientRouting::Enabled => true,
            AscenddClientRouting::Disabled => false,
        };
        self.ascendd_client_routing.store(client_routing, std::sync::atomic::Ordering::SeqCst);
    }

    /// Accessor for this node's service map.
    pub(crate) fn service_map(&self) -> &ServiceMap {
        &self.service_map
    }

    /// Create a new stream to advertised service `service` on remote node id `node`.
    // Local targets short-circuit through the service map; remote ones go via
    // the client peer for that node (waiting for it to connect if necessary).
    pub async fn connect_to_service(
        self: &Arc<Self>,
        node_id: NodeId,
        service_name: &str,
        chan: Channel,
    ) -> Result<(), Error> {
        let is_local = node_id == self.node_id;
        log::trace!(
            service_name:%,
            node_id = node_id.0,
            local = is_local;
            "Request connect_to_service",
        );
        if is_local {
            self.service_map().connect(service_name, chan).await
        } else {
            self.client_peer(node_id)
                .await
                .with_context(|| {
                    format_err!(
                        "Fetching client peer for new stream to {:?} for service {:?}",
                        node_id,
                        service_name,
                    )
                })?
                .new_stream(service_name, chan, self)
                .await
        }
    }

    /// Register a service. The callback should connect the given channel to the
    /// service in question.
    pub async fn register_service(
        &self,
        service_name: String,
        provider: impl Fn(fidl::Channel) -> Result<(), Error> + Send + 'static,
    ) -> Result<(), Error> {
        self.service_map().register_service(service_name, provider).await;
        Ok(())
    }

    /// Create a new list_peers context
    pub async fn new_list_peers_context(&self) -> ListPeersContext {
        ListPeersContext(Mutex::new(Some(self.service_map.new_list_peers_observer().await)))
    }

    /// Retrieve the client peer for `peer_node_id`, waiting for `run_circuits`
    /// to establish it if it is not connected yet.
    async fn client_peer(self: &Arc<Self>, peer_node_id: NodeId) -> Result<Arc<Peer>, Error> {
        loop {
            let mut peers = self.peers.lock().await;
            match peers.circuit_clients.get_mut(&peer_node_id) {
                Some(CircuitState::Peer(peer)) => break Ok(Arc::clone(&peer)),
                Some(CircuitState::Waiters(waiters)) => {
                    // Not connected yet: queue ourselves to be signalled when
                    // the peer is installed, release the lock, then retry.
                    let (sender, receiver) = oneshot::channel();
                    waiters.push(sender);
                    std::mem::drop(peers);
                    let _ = receiver.await;
                }
                None => {
                    // First interest in this peer: create an empty waiter list,
                    // then loop around and register with it.
                    peers.circuit_clients.insert(peer_node_id, CircuitState::Waiters(Vec::new()));
                }
            }
        }
    }

    /// Record a proxied handle in `proxied_streams` and spawn its proxy loop
    /// `f`; when `f` finishes (success or failure), the entry is removed again.
    fn add_proxied(
        self: &Arc<Self>,
        proxied_streams: &mut HashMap<HandleKey, ProxiedHandle>,
        this_handle_key: HandleKey,
        pair_handle_key: HandleKey,
        remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
        f: impl 'static + Send + Future<Output = Result<(), Error>>,
    ) {
        let router = Arc::downgrade(&self);
        let proxy_task = Task::spawn(async move {
            if let Err(e) = f.await {
                log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy failed: {:?}", e);
            } else {
                log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy completed successfully",);
            }
            if let Some(router) = Weak::upgrade(&router) {
                router.remove_proxied(this_handle_key, pair_handle_key).await;
            }
        });
        // Registering the same handle twice is a bug.
        assert!(proxied_streams
            .insert(
                this_handle_key,
                ProxiedHandle { remove_sender, original_paired: pair_handle_key, proxy_task },
            )
            .is_none());
    }

    // Remove a proxied handle from our table.
    // Called by proxy::Proxy::drop.
    async fn remove_proxied(
        self: &Arc<Self>,
        this_handle_key: HandleKey,
        pair_handle_key: HandleKey,
    ) {
        let mut proxied_streams = self.proxied_streams.lock().await;
        log::trace!(
            node_id = self.node_id.0,
            this_handle_key:?,
            pair_handle_key:?,
            all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
            "REMOVE_PROXIED",
        );
        if let Some(removed) = proxied_streams.remove(&this_handle_key) {
            assert_eq!(removed.original_paired, pair_handle_key);
            // The proxy loop may already have exited, so a send failure is fine.
            let _ = removed.remove_sender.send(RemoveFromProxyTable::Dropped);
            removed.proxy_task.detach();
        }
    }

    // Prepare a handle to be sent to another machine.
    // Returns a ZirconHandle describing the established proxy.
    pub(crate) async fn send_proxied(
        self: &Arc<Self>,
        handle: Handle,
        conn: PeerConnRef<'_>,
    ) -> Result<ZirconHandle, Error> {
        let raw_handle = handle.raw_handle(); // for debugging
        let info = handle_info(handle.as_handle_ref())
            .with_context(|| format!("Getting handle information for {}", raw_handle))?;
        let mut proxied_streams = self.proxied_streams.lock().await;
        log::trace!(
            node_id = self.node_id.0,
            handle:?,
            info:?,
            all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
            "SEND_PROXIED",
        );
        if let Some(pair) = proxied_streams.remove(&info.pair_handle_key) {
            // This handle is the other end of an already proxied object...
            // Here we need to inform the existing proxy loop that a transfer is going to be
            // initiated, and to where.
            drop(proxied_streams);
            assert_eq!(info.this_handle_key, pair.original_paired);
            log::trace!(
                handle:?,
                orig_pair:? = pair.original_paired;
                "Send paired proxied"
            );
            // We allocate a drain stream to flush any messages we've buffered locally to the new
            // endpoint.
            let drain_stream = conn.alloc_uni().await?.into();
            let (stream_ref_sender, stream_ref_receiver) = StreamRefSender::new();
            pair.remove_sender
                .send(RemoveFromProxyTable::InitiateTransfer {
                    paired_handle: handle,
                    drain_stream,
                    stream_ref_sender,
                })
                .map_err(|_| format_err!("Failed to initiate transfer"))?;
            let stream_ref = stream_ref_receiver
                .await
                .with_context(|| format!("waiting for stream_ref for {:?}", raw_handle))?;
            pair.proxy_task.detach();
            match info.handle_type {
                HandleType::Channel(rights) => {
                    Ok(ZirconHandle::Channel(ChannelHandle { stream_ref, rights }))
                }
                HandleType::Socket(socket_type, rights) => {
                    Ok(ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }))
                }
                HandleType::EventPair => Ok(ZirconHandle::EventPair(EventPairHandle {
                    stream_ref,
                    rights: EventPairRights::empty(),
                })),
            }
        } else {
            // This handle (and its pair) is previously unseen... establish a proxy stream for it
            log::trace!(handle:?; "Send proxied");
            let (tx, rx) = futures::channel::oneshot::channel();
            let rx = ProxyTransferInitiationReceiver::new(rx.map_err(move |_| {
                format_err!(
                    "cancelled transfer via send_proxied {:?}\n{}",
                    info,
                    // NOTE(review): "111" is a placeholder where a backtrace
                    // was once captured (see the disabled code alongside).
                    111 //std::backtrace::Backtrace::force_capture()
                )
            }));
            let (stream_writer, stream_reader) = conn.alloc_bidi().await?;
            let stream_ref = StreamRef::Creating(StreamId { id: stream_writer.id() });
            Ok(match info.handle_type {
                HandleType::Channel(rights) => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            Channel::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::Channel(ChannelHandle { stream_ref, rights })
                }
                HandleType::Socket(socket_type, rights) => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            Socket::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights })
                }
                HandleType::EventPair => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            EventPair::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::EventPair(EventPairHandle {
                        stream_ref,
                        rights: EventPairRights::empty(),
                    })
                }
            })
        }
    }

    // Take a received handle description and construct a fidl::Handle that represents it
    // whilst establishing proxies as required
    pub(crate) async fn recv_proxied(
        self: &Arc<Self>,
        handle: ZirconHandle,
        conn: PeerConnRef<'_>,
    ) -> Result<Handle, Error> {
        match handle {
            ZirconHandle::Channel(ChannelHandle { stream_ref, rights }) => {
                self.recv_proxied_handle(conn, stream_ref, move || Ok(Channel::create()), rights)
                    .await
            }
            ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }) => {
                self.recv_proxied_handle(
                    conn,
                    stream_ref,
                    move || {
                        Ok(match socket_type {
                            SocketType::Stream => Socket::create_stream(),
                            SocketType::Datagram => Socket::create_datagram(),
                        })
                    },
                    rights,
                )
                .await
            }
            ZirconHandle::EventPair(EventPairHandle { stream_ref, rights }) => {
                self.recv_proxied_handle(conn, stream_ref, move || Ok(EventPair::create()), rights)
                    .await
            }
        }
    }

    // Shared implementation of recv_proxied for one concrete handle type:
    // spawns the receive proxy and, if a proxy loop was created, registers it.
    async fn recv_proxied_handle<Hdl, CreateType>(
        self: &Arc<Self>,
        conn: PeerConnRef<'_>,
        stream_ref: StreamRef,
        create_handles: impl FnOnce() -> Result<(CreateType, CreateType), Error> + 'static,
        rights: CreateType::Rights,
    ) -> Result<Handle, Error>
    where
        Hdl: 'static + for<'a> crate::proxy::ProxyableRW<'a>,
        CreateType: 'static
            + fidl::HandleBased
            + IntoProxied<Proxied = Hdl>
            + std::fmt::Debug
            + crate::handle_info::WithRights,
    {
        let (tx, rx) = futures::channel::oneshot::channel();
        let rx = ProxyTransferInitiationReceiver::new(
            rx.map_err(move |_| format_err!("cancelled transfer via recv_proxied")),
        );
        let (h, p) = crate::proxy::spawn_recv(
            create_handles,
            rights,
            rx,
            stream_ref,
            conn,
            Arc::downgrade(&self),
        )
        .await?;
        if let Some(p) = p {
            let info = handle_info(h.as_handle_ref())?;
            self.add_proxied(
                &mut *self.proxied_streams.lock().await,
                info.pair_handle_key,
                info.this_handle_key,
                tx,
                p,
            );
        }
        Ok(h)
    }

    // Note the endpoint of a transfer that we know about (may complete a transfer operation)
    pub(crate) async fn post_transfer(
        &self,
        transfer_key: TransferKey,
        other_end: FoundTransfer,
    ) -> Result<(), Error> {
        let mut pending_transfers = self.pending_transfers.lock().await;
        match pending_transfers.insert(transfer_key, PendingTransfer::Complete(other_end)) {
            Some(PendingTransfer::Complete(_)) => bail!("Duplicate transfer received"),
            // A task was already waiting on this key: wake it to pick up the result.
            Some(PendingTransfer::Waiting(w)) => w.wake(),
            None => (),
        }
        Ok(())
    }

    // Poll for the other end of `transfer_key`, parking our waker in the
    // pending map if it has not been posted yet (see `post_transfer`).
    fn poll_find_transfer(
        &self,
        ctx: &mut Context<'_>,
        transfer_key: TransferKey,
        lock: &mut MutexTicket<'_, PendingTransferMap>,
    ) -> Poll<Result<FoundTransfer, Error>> {
        let mut pending_transfers = ready!(lock.poll(ctx));
        if let Some(PendingTransfer::Complete(other_end)) = pending_transfers.remove(&transfer_key)
        {
            Poll::Ready(Ok(other_end))
        } else {
            pending_transfers.insert(transfer_key, PendingTransfer::Waiting(ctx.waker().clone()));
            Poll::Pending
        }
    }

    // Lookup a transfer that we're expected to eventually know about
    pub(crate) async fn find_transfer(
        &self,
        transfer_key: TransferKey,
    ) -> Result<FoundTransfer, Error> {
        let mut lock = MutexTicket::new(&self.pending_transfers);
        poll_fn(|ctx| self.poll_find_transfer(ctx, transfer_key, &mut lock)).await
    }

    // Begin a transfer operation (opposite of find_transfer), publishing an endpoint on the remote
    // nodes transfer table.
    pub(crate) async fn open_transfer(
        self: &Arc<Router>,
        target: NodeId,
        transfer_key: TransferKey,
        handle: Handle,
    ) -> Result<OpenedTransfer, Error> {
        if target == self.node_id {
            // The target is local: we just file away the handle.
            // Later, find_transfer will find this and we'll collapse away Overnet's involvement and
            // reunite the channel ends.
            let info = handle_info(handle.as_handle_ref())?;
            let mut proxied_streams = self.proxied_streams.lock().await;
            log::trace!(
                node_id = self.node_id.0,
                key:? = transfer_key,
                info:? = info,
                all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
                "OPEN_TRANSFER_REMOVE_PROXIED",
            );
            // Tear down any proxy loops for both halves of the pair before fusing.
            if let Some(removed) = proxied_streams.remove(&info.this_handle_key) {
                assert_eq!(removed.original_paired, info.pair_handle_key);
                assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
                removed.proxy_task.detach();
            }
            if let Some(removed) = proxied_streams.remove(&info.pair_handle_key) {
                assert_eq!(removed.original_paired, info.this_handle_key);
                assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
                removed.proxy_task.detach();
            }
            self.post_transfer(transfer_key, FoundTransfer::Fused(handle)).await?;
            Ok(OpenedTransfer::Fused)
        } else {
            if let Some((writer, reader)) =
                self.client_peer(target).await?.send_open_transfer(transfer_key).await
            {
                Ok(OpenedTransfer::Remote(writer, reader, handle))
            } else {
                bail!("{:?} failed sending open transfer to {:?}", self.node_id, target)
            }
        }
    }

    // Forget the client peer for `peer_node_id` (called when its connection closes).
    pub(crate) async fn client_closed(&self, peer_node_id: NodeId) {
        self.peers.lock().await.circuit_clients.remove(&peer_node_id);
    }
}
658
659/// Runs our `ConnectionNode` to set up circuit-based peers.
/// Runs our `ConnectionNode` to set up circuit-based peers.
///
/// Drives two concurrent loops until both input streams end:
/// - `peers`: node-id announcements, for which we establish client peers;
/// - `connections`: incoming connections, for which we start server peers.
async fn run_circuits(
    router: Weak<Router>,
    connections: impl futures::Stream<Item = circuit::Connection> + Send,
    peers: impl futures::Stream<Item = String> + Send,
) -> Result<(), Error> {
    // Notes whenever a new peer announces itself on the network.
    let new_peer_fut = {
        let router = router.clone();
        async move {
            let mut peers = pin!(peers);
            while let Some(peer_node_id) = peers.next().await {
                // Only a weak reference is held so the router can be dropped;
                // stop the loop once it is gone.
                let router = match router.upgrade() {
                    Some(x) => x,
                    None => {
                        log::warn!("Router disappeared from under circuit runner");
                        break;
                    }
                };

                // Per-peer connection setup; failures are logged, not fatal to the loop.
                let res = async {
                    let peer_node_id_num = NodeId::from_circuit_string(&peer_node_id)
                        .map_err(|_| format_err!("Invalid node id: {:?}", peer_node_id))?;
                    let mut peers = router.peers.lock().await;

                    if peers.circuit_clients.get(&peer_node_id_num).and_then(|x| x.peer()).is_some()
                    {
                        log::warn!(peer:? = peer_node_id; "Re-establishing connection");
                    }

                    // Two unidirectional streams form the bidirectional link to the peer.
                    let (reader, writer_remote) = circuit::stream::stream();
                    let (reader_remote, writer) = circuit::stream::stream();
                    let conn = router
                        .circuit_node
                        .connect_to_peer(&peer_node_id, reader_remote, writer_remote)
                        .await?;
                    let peer = Peer::new_circuit_client(
                        conn,
                        writer,
                        reader,
                        router.service_map.new_local_service_observer().await,
                        &router,
                        peer_node_id_num,
                    )?;

                    // Install the peer and signal any tasks blocked in `client_peer`.
                    if let Some(CircuitState::Waiters(waiters)) =
                        peers.circuit_clients.insert(peer_node_id_num, CircuitState::Peer(peer))
                    {
                        for waiter in waiters {
                            let _ = waiter.send(());
                        }
                    }

                    Result::<_, Error>::Ok(())
                }
                .await;

                if let Err(e) = res {
                    log::warn!(peer:? = peer_node_id; "Attempt to connect to peer failed: {:?}", e);
                }
            }
        }
    };

    // Loops over incoming connections and starts servers.
    let new_conn_fut = async move {
        let mut connections = pin!(connections);
        while let Some(conn) = connections.next().await {
            let peer_name = conn.from().to_owned();
            let res = async {
                let router = router.upgrade().ok_or_else(|| format_err!("router gone"))?;
                Peer::new_circuit_server(conn, &router).await?;
                Result::<_, Error>::Ok(())
            }
            .await;

            if let Err(e) = res {
                log::warn!(
                    peer:? = peer_name;
                    "Attempt to receive connection from peer failed: {:?}",
                    e
                );
            }
        }
    };

    // Run both loops to completion; neither returns an error itself.
    futures::future::join(new_peer_fut, new_conn_fut).await;
    Ok(())
}
748
749#[cfg(test)]
750mod tests {
751
752    use super::*;
753    use crate::test_util::*;
754    use circuit::multi_stream::multi_stream_node_connection;
755    use circuit::stream::stream;
756    use circuit::Quality;
757
    #[fuchsia::test]
    async fn no_op(run: usize) {
        let mut node_id_gen = NodeIdGenerator::new("router::no_op", run);
        // Constructing (and immediately dropping) a router must not panic.
        node_id_gen.new_router().unwrap();
        let id = node_id_gen.next().unwrap();
        // A router built with an explicit id reports that id back.
        assert_eq!(Router::with_node_id(id, None).unwrap().node_id, id);
    }
765
    /// Registers `service` on `serving_router`, waits until `client_router`
    /// can see it advertised, and returns a receiver yielding the channel of
    /// the first incoming connection request.
    async fn register_test_service(
        serving_router: Arc<Router>,
        client_router: Arc<Router>,
        service: &'static str,
    ) -> futures::channel::oneshot::Receiver<Channel> {
        use fuchsia_sync::Mutex;
        let (send, recv) = futures::channel::oneshot::channel();
        serving_router
            .service_map()
            .register_service(service.to_string(), {
                let sender = Mutex::new(Some(send));
                move |chan| {
                    println!("{} got request", service);
                    // Only the first request is forwarded; a second request
                    // would panic on the unwrap of the taken sender.
                    sender.lock().take().unwrap().send(chan).unwrap();
                    println!("{} forwarded channel", service);
                    Ok(())
                }
            })
            .await;
        let serving_node_id = serving_router.node_id();
        println!("{} wait for service to appear @ client", service);
        let lpc = client_router.new_list_peers_context().await;
        // Poll list_peers until the serving node advertises the service.
        loop {
            let peers = lpc.list_peers().await.unwrap();
            println!("{} got peers {:?}", service, peers);
            if peers
                .iter()
                .find(move |peer| {
                    serving_node_id == peer.node_id
                        && peer
                            .services
                            .iter()
                            .find(move |&advertised_service| advertised_service == service)
                            .is_some()
                })
                .is_some()
            {
                break;
            }
        }
        recv
    }
808
809    async fn run_two_node<
810        F: 'static + Clone + Sync + Send + Fn(Arc<Router>, Arc<Router>) -> Fut,
811        Fut: 'static + Send + Future<Output = Result<(), Error>>,
812    >(
813        name: &'static str,
814        run: usize,
815        f: F,
816    ) -> Result<(), Error> {
817        let mut node_id_gen = NodeIdGenerator::new(name, run);
818        let router1 = node_id_gen.new_router()?;
819        let router2 = node_id_gen.new_router()?;
820        let (circuit1_reader, circuit1_writer) = stream();
821        let (circuit2_reader, circuit2_writer) = stream();
822        let (out_1, _) = futures::channel::mpsc::unbounded();
823        let (out_2, _) = futures::channel::mpsc::unbounded();
824
825        let conn_1 = multi_stream_node_connection(
826            router1.circuit_node(),
827            circuit1_reader,
828            circuit2_writer,
829            true,
830            Quality::IN_PROCESS,
831            out_1,
832            "router1".to_owned(),
833        );
834        let conn_2 = multi_stream_node_connection(
835            router2.circuit_node(),
836            circuit2_reader,
837            circuit1_writer,
838            false,
839            Quality::IN_PROCESS,
840            out_2,
841            "router2".to_owned(),
842        );
843        let _fwd = Task::spawn(async move {
844            if let Err(e) = futures::future::try_join(conn_1, conn_2).await {
845                log::trace!("forwarding failed: {:?}", e)
846            }
847        });
848        f(router1, router2).await
849    }
850
851    #[fuchsia::test]
852    async fn no_op_env(run: usize) -> Result<(), Error> {
853        run_two_node("router::no_op_env", run, |_router1, _router2| async { Ok(()) }).await
854    }
855
856    #[fuchsia::test]
857    async fn create_stream(run: usize) -> Result<(), Error> {
858        run_two_node("create_stream", run, |router1, router2| async move {
859            let (_, p) = fidl::Channel::create();
860            println!("create_stream: register service");
861            let s = register_test_service(router2.clone(), router1.clone(), "create_stream").await;
862            println!("create_stream: connect to service");
863            router1.connect_to_service(router2.node_id, "create_stream", p).await?;
864            println!("create_stream: wait for connection");
865            let _ = s.await?;
866            Ok(())
867        })
868        .await
869    }
870
871    #[fuchsia::test]
872    async fn send_datagram_immediately(run: usize) -> Result<(), Error> {
873        run_two_node("send_datagram_immediately", run, |router1, router2| async move {
874            let (c, p) = fidl::Channel::create();
875            println!("send_datagram_immediately: register service");
876            let s = register_test_service(
877                router2.clone(),
878                router1.clone(),
879                "send_datagram_immediately",
880            )
881            .await;
882            println!("send_datagram_immediately: connect to service");
883            router1.connect_to_service(router2.node_id, "send_datagram_immediately", p).await?;
884            println!("send_datagram_immediately: wait for connection");
885            let s = s.await?;
886            let c = fidl::AsyncChannel::from_channel(c);
887            let s = fidl::AsyncChannel::from_channel(s);
888            c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
889            let mut buf = fidl::MessageBufEtc::new();
890            println!("send_datagram_immediately: wait for datagram");
891            s.recv_etc_msg(&mut buf).await?;
892            assert_eq!(buf.n_handle_infos(), 0);
893            assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
894            Ok(())
895        })
896        .await
897    }
898
899    #[fuchsia::test]
900    async fn ping_pong(run: usize) -> Result<(), Error> {
901        run_two_node("ping_pong", run, |router1, router2| async move {
902            let (c, p) = fidl::Channel::create();
903            println!("ping_pong: register service");
904            let s = register_test_service(router2.clone(), router1.clone(), "ping_pong").await;
905            println!("ping_pong: connect to service");
906            router1.connect_to_service(router2.node_id, "ping_pong", p).await?;
907            println!("ping_pong: wait for connection");
908            let s = s.await?;
909            let c = fidl::AsyncChannel::from_channel(c);
910            let s = fidl::AsyncChannel::from_channel(s);
911            println!("ping_pong: send ping");
912            c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
913            println!("ping_pong: receive ping");
914            let mut buf = fidl::MessageBufEtc::new();
915            s.recv_etc_msg(&mut buf).await?;
916            assert_eq!(buf.n_handle_infos(), 0);
917            assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
918            println!("ping_pong: send pong");
919            s.write(&[9, 8, 7, 6, 5, 4, 3, 2, 1], &mut Vec::new())?;
920            println!("ping_pong: receive pong");
921            let mut buf = fidl::MessageBufEtc::new();
922            c.recv_etc_msg(&mut buf).await?;
923            assert_eq!(buf.n_handle_infos(), 0);
924            assert_eq!(buf.bytes(), &[9, 8, 7, 6, 5, 4, 3, 2, 1]);
925            Ok(())
926        })
927        .await
928    }
929
930    fn ensure_pending(f: &mut (impl Send + Unpin + Future<Output = ()>)) {
931        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
932        // Poll a bunch of times to convince ourselves the future is pending forever...
933        for _ in 0..1000 {
934            assert!(f.poll_unpin(&mut ctx).is_pending());
935        }
936    }
937
938    #[fuchsia::test]
939    async fn concurrent_list_peer_calls_will_error(run: usize) -> Result<(), Error> {
940        let mut node_id_gen = NodeIdGenerator::new("concurrent_list_peer_calls_will_error", run);
941        let n = node_id_gen.new_router().unwrap();
942        let lp = n.new_list_peers_context().await;
943        lp.list_peers().await.unwrap();
944        let mut never_completes = async {
945            lp.list_peers().await.unwrap();
946        }
947        .boxed();
948        ensure_pending(&mut never_completes);
949        lp.list_peers().await.expect_err("Concurrent list peers should fail");
950        ensure_pending(&mut never_completes);
951        Ok(())
952    }
953}