// overnet_core/router/mod.rs

// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Manages peer<->peer connections and routing packets over links between nodes.

// General structure:
// A router is a collection of: - streams (each with a StreamId<LinkData>)
//                                These are streams of information flow between processes.
//                              - peers (each with a PeerId)
//                                These are other overnet instances in the overlay network.
//                                There is a client oriented and a server oriented peer per
//                                node.
//                              - links (each with a LinkId)
//                                These are connections between this instance and other
//                                instances in the mesh.
// For each node in the mesh, a routing table tracks which link on which to send data to
// that node (said link may be a third node that will be requested to forward datagrams
// on our behalf).
21mod service_map;
22
23use self::service_map::ServiceMap;
24use crate::future_help::{log_errors, Observer};
25use crate::handle_info::{handle_info, HandleKey, HandleType};
26use crate::labels::{NodeId, TransferKey};
27use crate::peer::{FramedStreamReader, FramedStreamWriter, Peer, PeerConnRef};
28use crate::proxy::{
29    IntoProxied, ProxyTransferInitiationReceiver, RemoveFromProxyTable, StreamRefSender,
30};
31use anyhow::{bail, format_err, Context as _, Error};
32use async_utils::mutex_ticket::MutexTicket;
33use fidl::{AsHandleRef, Channel, EventPair, Handle, HandleBased, Socket};
34use fidl_fuchsia_overnet_protocol::{
35    ChannelHandle, EventPairHandle, EventPairRights, SocketHandle, SocketType, StreamId, StreamRef,
36    ZirconHandle,
37};
38use fuchsia_async::Task;
39use futures::channel::oneshot;
40use futures::future::poll_fn;
41use futures::lock::Mutex;
42use futures::prelude::*;
43use futures::ready;
44use rand::Rng;
45use std::collections::{BTreeMap, HashMap};
46use std::pin::pin;
47use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
48use std::sync::{Arc, Weak};
49use std::task::{Context, Poll, Waker};
50use std::time::Duration;
51
52pub use self::service_map::ListablePeer;
53
/// State of one entry in the transfer table: either the far end has already
/// arrived, or somebody is parked waiting for it.
#[derive(Debug)]
enum PendingTransfer {
    /// The other endpoint of the transfer has been posted.
    Complete(FoundTransfer),
    /// A task is polling for this transfer; wake it when the endpoint arrives.
    Waiting(Waker),
}
59
/// In-flight transfers, keyed by the `TransferKey` the two nodes agreed on.
type PendingTransferMap = BTreeMap<TransferKey, PendingTransfer>;
61
/// Result of looking up the far end of a transfer (see `Router::find_transfer`).
#[derive(Debug)]
pub(crate) enum FoundTransfer {
    /// Both ends turned out to be local; the raw handle is handed back so the
    /// endpoints can be fused together without proxying.
    Fused(Handle),
    /// The other end is remote; traffic flows over this framed stream pair.
    Remote(FramedStreamWriter, FramedStreamReader),
}
67
/// Result of initiating a transfer (see `Router::open_transfer`).
#[derive(Debug)]
pub(crate) enum OpenedTransfer {
    /// The target was this node: the handle was filed in the transfer table
    /// for `find_transfer` to collect, so nothing further is needed.
    Fused,
    /// The target is remote: proxy the returned handle over this stream pair.
    Remote(FramedStreamWriter, FramedStreamReader, Handle),
}
73
/// Per-node connection state: either a connection is still being established
/// (with a list of parties to notify when it is), or the peer is up.
#[derive(Debug)]
#[allow(dead_code)]
enum CircuitState {
    /// Senders to fire when the peer connection gets established.
    Waiters(Vec<oneshot::Sender<()>>),
    /// The established client-oriented peer connection.
    Peer(Arc<Peer>),
}
80
81impl CircuitState {
82    fn peer(&self) -> Option<Arc<Peer>> {
83        if let CircuitState::Peer(peer) = self {
84            Some(Arc::clone(peer))
85        } else {
86            None
87        }
88    }
89}
90
/// Collection of all peers known to this router.
#[derive(Debug)]
struct PeerMaps {
    // Client-oriented connections (or pending connections) keyed by node id.
    circuit_clients: BTreeMap<NodeId, CircuitState>,
}
95
/// Wrapper to get the right list_peers behavior.
///
/// Holds the observer backing `list_peers`; the observer is `take`n for the
/// duration of a call, so at most one `list_peers` per context is in flight.
#[derive(Debug)]
pub struct ListPeersContext(Mutex<Option<Observer<Vec<ListablePeer>>>>);
99
/// Monotonic counter used only to correlate trace lines from concurrent
/// `list_peers` calls.
static LIST_PEERS_CALL: AtomicU64 = AtomicU64::new(0);
101
102impl ListPeersContext {
103    /// Implementation of ListPeers fidl method.
104    pub async fn list_peers(&self) -> Result<Vec<ListablePeer>, Error> {
105        let call_id = LIST_PEERS_CALL.fetch_add(1, Ordering::SeqCst);
106        log::trace!(list_peers_call = call_id; "get observer");
107        let mut obs = self
108            .0
109            .lock()
110            .await
111            .take()
112            .ok_or_else(|| anyhow::format_err!("Already listing peers"))?;
113        log::trace!(list_peers_call = call_id; "wait for value");
114        let r = obs.next().await;
115        log::trace!(list_peers_call = call_id; "replace observer");
116        *self.0.lock().await = Some(obs);
117        log::trace!(list_peers_call = call_id; "return");
118        Ok(r.unwrap_or_else(Vec::new))
119    }
120}
121
/// Whether this node's ascendd clients should be routed to each other.
/// See `Router::set_client_routing` for where this is configured.
pub enum AscenddClientRouting {
    /// Ascendd client routing is allowed
    Enabled,
    /// Ascendd client routing is prevented
    Disabled,
}
129
/// Router maintains global state for one node_id: the peers we are connected
/// to, the services registered locally, and any handle proxying in progress.
pub struct Router {
    /// Our node id
    node_id: NodeId,
    /// All peers.
    peers: Mutex<PeerMaps>,
    /// Registry of local services and peer/service visibility.
    service_map: ServiceMap,
    /// Handles currently being proxied to/from remote nodes, keyed by the
    /// local handle's key.
    proxied_streams: Mutex<HashMap<HandleKey, ProxiedHandle>>,
    /// Transfer endpoints announced but not yet matched with their far end.
    pending_transfers: Mutex<PendingTransferMap>,
    /// Background task driving `run_circuits`; populated in `with_node_id`.
    task: Mutex<Option<Task<()>>>,
    /// Hack to prevent the n^2 scaling of a fully-connected graph of ffxs
    ascendd_client_routing: AtomicBool,
    /// Circuit-protocol node used to accept and create connections.
    circuit_node: circuit::ConnectionNode,
}
147
/// Bookkeeping for one handle currently being proxied.
struct ProxiedHandle {
    // Signals the proxy loop to stop, or to initiate an endpoint transfer.
    remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
    // Key of the handle this one was originally paired with.
    original_paired: HandleKey,
    // The task running the proxy loop for this handle.
    proxy_task: Task<()>,
}
153
154/// Generate a new random node id
155fn generate_node_id() -> NodeId {
156    rand::thread_rng().gen::<u64>().into()
157}
158
/// Sort a vector in place and hand it back — a convenience for producing
/// deterministic (sorted) collections in log output.
fn sorted<T: std::cmp::Ord>(mut items: Vec<T>) -> Vec<T> {
    items.sort();
    items
}
163
164impl std::fmt::Debug for Router {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        write!(f, "Router({:?})", self.node_id)
167    }
168}
169
/// This is sent when initiating connections between circuit nodes and must be identical between all
/// such nodes.
// Note: `'static` is implied on consts; the explicit lifetime was redundant
// (clippy::redundant_static_lifetimes).
const OVERNET_CIRCUIT_PROTOCOL: &str = "Overnet:0";
173
impl Router {
    /// Create a new router. If `router_interval` is given, this router will
    /// behave like an interior node and tell its neighbors about each other.
    pub fn new(router_interval: Option<Duration>) -> Result<Arc<Self>, Error> {
        Router::with_node_id(generate_node_id(), router_interval)
    }

    /// Make a router with a specific node ID.
    ///
    /// Builds the circuit node (with or without interior routing, depending on
    /// `router_interval`) and spawns the background task that services new
    /// connections and peer announcements.
    pub fn with_node_id(
        node_id: NodeId,
        router_interval: Option<Duration>,
    ) -> Result<Arc<Self>, Error> {
        let service_map = ServiceMap::new(node_id);
        let (new_peer_sender, new_peer_receiver) = futures::channel::mpsc::channel(1);
        let (circuit_node, circuit_connections) = if let Some(interval) = router_interval {
            // Interior node: also forwards knowledge of peers to neighbors.
            let (a, b) = circuit::ConnectionNode::new_with_router(
                &node_id.circuit_string(),
                OVERNET_CIRCUIT_PROTOCOL,
                interval,
                new_peer_sender,
            )?;
            (a, b.boxed())
        } else {
            let (a, b) = circuit::ConnectionNode::new(
                &node_id.circuit_string(),
                OVERNET_CIRCUIT_PROTOCOL,
                new_peer_sender,
            )?;
            (a, b.boxed())
        };
        let router = Arc::new(Router {
            node_id,
            service_map,
            peers: Mutex::new(PeerMaps { circuit_clients: BTreeMap::new() }),
            proxied_streams: Mutex::new(HashMap::new()),
            pending_transfers: Mutex::new(PendingTransferMap::new()),
            task: Mutex::new(None),
            // Default is to route all clients to each other. Ffx daemon disabled client routing.
            ascendd_client_routing: AtomicBool::new(true),
            circuit_node,
        });

        // The support task holds only a weak reference so dropping the router
        // tears it down rather than leaking a cycle.
        let weak_router = Arc::downgrade(&router);
        // The mutex was created just above and is uncontended, so the lock
        // resolves immediately and `now_or_never` cannot fail.
        *router.task.lock().now_or_never().unwrap() = Some(Task::spawn(log_errors(
            run_circuits(weak_router, circuit_connections, new_peer_receiver),
            format!("router {:?} support loop failed", node_id),
        )));

        Ok(router)
    }

    /// Get the circuit protocol node for this router. This will let us create new connections to
    /// other nodes.
    pub fn circuit_node(&self) -> &circuit::Node {
        self.circuit_node.node()
    }

    /// Accessor for the node id of this router.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Accessor for whether to route ascendd clients to each other
    pub fn client_routing(&self) -> AscenddClientRouting {
        if self.ascendd_client_routing.load(std::sync::atomic::Ordering::SeqCst) {
            AscenddClientRouting::Enabled
        } else {
            AscenddClientRouting::Disabled
        }
    }

    /// Setter for whether to route ascendd clients to each other
    pub fn set_client_routing(&self, client_routing: AscenddClientRouting) {
        let client_routing = match client_routing {
            AscenddClientRouting::Enabled => true,
            AscenddClientRouting::Disabled => false,
        };
        self.ascendd_client_routing.store(client_routing, std::sync::atomic::Ordering::SeqCst);
    }

    /// Accessor for this router's service registry.
    pub(crate) fn service_map(&self) -> &ServiceMap {
        &self.service_map
    }

    /// Create a new stream to advertised service `service` on remote node id `node`.
    ///
    /// If `node_id` is this node, the connection is short-circuited through the
    /// local service map; otherwise a stream is opened via the client peer.
    pub async fn connect_to_service(
        self: &Arc<Self>,
        node_id: NodeId,
        service_name: &str,
        chan: Channel,
    ) -> Result<(), Error> {
        let is_local = node_id == self.node_id;
        log::trace!(
            service_name:%,
            node_id = node_id.0,
            local = is_local;
            "Request connect_to_service",
        );
        if is_local {
            self.service_map().connect(service_name, chan).await
        } else {
            self.client_peer(node_id)
                .await
                .with_context(|| {
                    format_err!(
                        "Fetching client peer for new stream to {:?} for service {:?}",
                        node_id,
                        service_name,
                    )
                })?
                .new_stream(service_name, chan, self)
                .await
        }
    }

    /// Register a service. The callback should connect the given channel to the
    /// service in question.
    pub async fn register_service(
        &self,
        service_name: String,
        provider: impl Fn(fidl::Channel) -> Result<(), Error> + Send + 'static,
    ) -> Result<(), Error> {
        self.service_map().register_service(service_name, provider).await;
        Ok(())
    }

    /// Create a new list_peers context
    pub async fn new_list_peers_context(&self) -> ListPeersContext {
        ListPeersContext(Mutex::new(Some(self.service_map.new_list_peers_observer().await)))
    }

    /// Fetch the client-oriented peer for `peer_node_id`, waiting until the
    /// connection exists.
    ///
    /// The connection itself is established by `run_circuits` when the peer
    /// announces itself; this method only registers interest and waits. An
    /// absent entry is replaced with an empty waiter list, then the loop comes
    /// around again and parks on it.
    async fn client_peer(self: &Arc<Self>, peer_node_id: NodeId) -> Result<Arc<Peer>, Error> {
        loop {
            let mut peers = self.peers.lock().await;
            match peers.circuit_clients.get_mut(&peer_node_id) {
                Some(CircuitState::Peer(peer)) => break Ok(Arc::clone(&peer)),
                Some(CircuitState::Waiters(waiters)) => {
                    let (sender, receiver) = oneshot::channel();
                    waiters.push(sender);
                    // Release the peers lock before awaiting, or the peer
                    // could never be inserted and we would deadlock.
                    std::mem::drop(peers);
                    let _ = receiver.await;
                }
                None => {
                    peers.circuit_clients.insert(peer_node_id, CircuitState::Waiters(Vec::new()));
                }
            }
        }
    }

    /// Record a new proxied handle in `proxied_streams` and spawn the task
    /// `f` that runs its proxy loop; on completion (or failure) the entry is
    /// removed again.
    ///
    /// Panics if `this_handle_key` is already being proxied — that would mean
    /// the same handle end was registered twice.
    fn add_proxied(
        self: &Arc<Self>,
        proxied_streams: &mut HashMap<HandleKey, ProxiedHandle>,
        this_handle_key: HandleKey,
        pair_handle_key: HandleKey,
        remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
        f: impl 'static + Send + Future<Output = Result<(), Error>>,
    ) {
        let router = Arc::downgrade(&self);
        let proxy_task = Task::spawn(async move {
            if let Err(e) = f.await {
                log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy failed: {:?}", e);
            } else {
                log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy completed successfully",);
            }
            // Clean up our table entry unless the router is already gone.
            if let Some(router) = Weak::upgrade(&router) {
                router.remove_proxied(this_handle_key, pair_handle_key).await;
            }
        });
        assert!(proxied_streams
            .insert(
                this_handle_key,
                ProxiedHandle { remove_sender, original_paired: pair_handle_key, proxy_task },
            )
            .is_none());
    }

    // Remove a proxied handle from our table.
    // Called by proxy::Proxy::drop.
    async fn remove_proxied(
        self: &Arc<Self>,
        this_handle_key: HandleKey,
        pair_handle_key: HandleKey,
    ) {
        let mut proxied_streams = self.proxied_streams.lock().await;
        log::trace!(
            node_id = self.node_id.0,
            this_handle_key:?,
            pair_handle_key:?,
            all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
            "REMOVE_PROXIED",
        );
        if let Some(removed) = proxied_streams.remove(&this_handle_key) {
            assert_eq!(removed.original_paired, pair_handle_key);
            // Best effort: the proxy loop may already have exited.
            let _ = removed.remove_sender.send(RemoveFromProxyTable::Dropped);
            removed.proxy_task.detach();
        }
    }

    // Prepare a handle to be sent to another machine.
    // Returns a ZirconHandle describing the established proxy.
    //
    // Two cases:
    //  - the pair of this handle is itself already proxied: initiate a
    //    transfer so the remote ends can be connected directly;
    //  - otherwise: start a fresh proxy loop for this handle.
    pub(crate) async fn send_proxied(
        self: &Arc<Self>,
        handle: Handle,
        conn: PeerConnRef<'_>,
    ) -> Result<ZirconHandle, Error> {
        let raw_handle = handle.raw_handle(); // for debugging
        let info = handle_info(handle.as_handle_ref())
            .with_context(|| format!("Getting handle information for {}", raw_handle))?;
        let mut proxied_streams = self.proxied_streams.lock().await;
        log::trace!(
            node_id = self.node_id.0,
            handle:?,
            info:?,
            all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
            "SEND_PROXIED",
        );
        if let Some(pair) = proxied_streams.remove(&info.pair_handle_key) {
            // This handle is the other end of an already proxied object...
            // Here we need to inform the existing proxy loop that a transfer is going to be
            // initiated, and to where.
            drop(proxied_streams);
            assert_eq!(info.this_handle_key, pair.original_paired);
            log::trace!(
                handle:?,
                orig_pair:? = pair.original_paired;
                "Send paired proxied"
            );
            // We allocate a drain stream to flush any messages we've buffered locally to the new
            // endpoint.
            let drain_stream = conn.alloc_uni().await?.into();
            let (stream_ref_sender, stream_ref_receiver) = StreamRefSender::new();
            pair.remove_sender
                .send(RemoveFromProxyTable::InitiateTransfer {
                    paired_handle: handle,
                    drain_stream,
                    stream_ref_sender,
                })
                .map_err(|_| format_err!("Failed to initiate transfer"))?;
            let stream_ref = stream_ref_receiver
                .await
                .with_context(|| format!("waiting for stream_ref for {:?}", raw_handle))?;
            // The proxy loop finishes the transfer on its own; let it run.
            pair.proxy_task.detach();
            match info.handle_type {
                HandleType::Channel(rights) => {
                    Ok(ZirconHandle::Channel(ChannelHandle { stream_ref, rights }))
                }
                HandleType::Socket(socket_type, rights) => {
                    Ok(ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }))
                }
                HandleType::EventPair => Ok(ZirconHandle::EventPair(EventPairHandle {
                    stream_ref,
                    rights: EventPairRights::empty(),
                })),
            }
        } else {
            // This handle (and its pair) is previously unseen... establish a proxy stream for it
            log::trace!(handle:?; "Send proxied");
            let (tx, rx) = futures::channel::oneshot::channel();
            let rx = ProxyTransferInitiationReceiver::new(rx.map_err(move |_| {
                format_err!(
                    "cancelled transfer via send_proxied {:?}\n{}",
                    info,
                    // NOTE(review): looks like a leftover debug placeholder for
                    // a backtrace capture — confirm before relying on this
                    // error text.
                    111 //std::backtrace::Backtrace::force_capture()
                )
            }));
            let (stream_writer, stream_reader) = conn.alloc_bidi().await?;
            let stream_ref = StreamRef::Creating(StreamId { id: stream_writer.id() });
            Ok(match info.handle_type {
                HandleType::Channel(rights) => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            Channel::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::Channel(ChannelHandle { stream_ref, rights })
                }
                HandleType::Socket(socket_type, rights) => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            Socket::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights })
                }
                HandleType::EventPair => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            EventPair::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::EventPair(EventPairHandle {
                        stream_ref,
                        rights: EventPairRights::empty(),
                    })
                }
            })
        }
    }

    // Take a received handle description and construct a fidl::Handle that represents it
    // whilst establishing proxies as required
    pub(crate) async fn recv_proxied(
        self: &Arc<Self>,
        handle: ZirconHandle,
        conn: PeerConnRef<'_>,
    ) -> Result<Handle, Error> {
        match handle {
            ZirconHandle::Channel(ChannelHandle { stream_ref, rights }) => {
                self.recv_proxied_handle(conn, stream_ref, move || Ok(Channel::create()), rights)
                    .await
            }
            ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }) => {
                self.recv_proxied_handle(
                    conn,
                    stream_ref,
                    move || {
                        Ok(match socket_type {
                            SocketType::Stream => Socket::create_stream(),
                            SocketType::Datagram => Socket::create_datagram(),
                        })
                    },
                    rights,
                )
                .await
            }
            ZirconHandle::EventPair(EventPairHandle { stream_ref, rights }) => {
                self.recv_proxied_handle(conn, stream_ref, move || Ok(EventPair::create()), rights)
                    .await
            }
        }
    }

    /// Shared implementation of `recv_proxied` for one concrete handle type:
    /// create a local handle pair, spawn the receive-side proxy, and register
    /// it in the proxy table (unless the proxy collapsed immediately, in which
    /// case `spawn_recv` returns no task to register).
    async fn recv_proxied_handle<Hdl, CreateType>(
        self: &Arc<Self>,
        conn: PeerConnRef<'_>,
        stream_ref: StreamRef,
        create_handles: impl FnOnce() -> Result<(CreateType, CreateType), Error> + 'static,
        rights: CreateType::Rights,
    ) -> Result<Handle, Error>
    where
        Hdl: 'static + for<'a> crate::proxy::ProxyableRW<'a>,
        CreateType: 'static
            + fidl::HandleBased
            + IntoProxied<Proxied = Hdl>
            + std::fmt::Debug
            + crate::handle_info::WithRights,
    {
        let (tx, rx) = futures::channel::oneshot::channel();
        let rx = ProxyTransferInitiationReceiver::new(
            rx.map_err(move |_| format_err!("cancelled transfer via recv_proxied")),
        );
        let (h, p) = crate::proxy::spawn_recv(
            create_handles,
            rights,
            rx,
            stream_ref,
            conn,
            Arc::downgrade(&self),
        )
        .await?;
        if let Some(p) = p {
            let info = handle_info(h.as_handle_ref())?;
            self.add_proxied(
                &mut *self.proxied_streams.lock().await,
                info.pair_handle_key,
                info.this_handle_key,
                tx,
                p,
            );
        }
        Ok(h)
    }

    // Note the endpoint of a transfer that we know about (may complete a transfer operation)
    pub(crate) async fn post_transfer(
        &self,
        transfer_key: TransferKey,
        other_end: FoundTransfer,
    ) -> Result<(), Error> {
        let mut pending_transfers = self.pending_transfers.lock().await;
        match pending_transfers.insert(transfer_key, PendingTransfer::Complete(other_end)) {
            Some(PendingTransfer::Complete(_)) => bail!("Duplicate transfer received"),
            // A task was already waiting on this key; wake it to pick up the
            // completed entry.
            Some(PendingTransfer::Waiting(w)) => w.wake(),
            None => (),
        }
        Ok(())
    }

    /// Poll step for `find_transfer`: take the completed endpoint if present,
    /// otherwise park the current waker in the table.
    fn poll_find_transfer(
        &self,
        ctx: &mut Context<'_>,
        transfer_key: TransferKey,
        lock: &mut MutexTicket<'_, PendingTransferMap>,
    ) -> Poll<Result<FoundTransfer, Error>> {
        let mut pending_transfers = ready!(lock.poll(ctx));
        if let Some(PendingTransfer::Complete(other_end)) = pending_transfers.remove(&transfer_key)
        {
            Poll::Ready(Ok(other_end))
        } else {
            pending_transfers.insert(transfer_key, PendingTransfer::Waiting(ctx.waker().clone()));
            Poll::Pending
        }
    }

    // Lookup a transfer that we're expected to eventually know about
    pub(crate) async fn find_transfer(
        &self,
        transfer_key: TransferKey,
    ) -> Result<FoundTransfer, Error> {
        let mut lock = MutexTicket::new(&self.pending_transfers);
        poll_fn(|ctx| self.poll_find_transfer(ctx, transfer_key, &mut lock)).await
    }

    // Begin a transfer operation (opposite of find_transfer), publishing an endpoint on the remote
    // nodes transfer table.
    pub(crate) async fn open_transfer(
        self: &Arc<Router>,
        target: NodeId,
        transfer_key: TransferKey,
        handle: Handle,
    ) -> Result<OpenedTransfer, Error> {
        if target == self.node_id {
            // The target is local: we just file away the handle.
            // Later, find_transfer will find this and we'll collapse away Overnet's involvement and
            // reunite the channel ends.
            let info = handle_info(handle.as_handle_ref())?;
            let mut proxied_streams = self.proxied_streams.lock().await;
            log::trace!(
                node_id = self.node_id.0,
                key:? = transfer_key,
                info:? = info,
                all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
                "OPEN_TRANSFER_REMOVE_PROXIED",
            );
            // Tear down any existing proxying of either end of the pair, since
            // the ends are about to be fused back together locally.
            if let Some(removed) = proxied_streams.remove(&info.this_handle_key) {
                assert_eq!(removed.original_paired, info.pair_handle_key);
                assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
                removed.proxy_task.detach();
            }
            if let Some(removed) = proxied_streams.remove(&info.pair_handle_key) {
                assert_eq!(removed.original_paired, info.this_handle_key);
                assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
                removed.proxy_task.detach();
            }
            self.post_transfer(transfer_key, FoundTransfer::Fused(handle)).await?;
            Ok(OpenedTransfer::Fused)
        } else {
            if let Some((writer, reader)) =
                self.client_peer(target).await?.send_open_transfer(transfer_key).await
            {
                Ok(OpenedTransfer::Remote(writer, reader, handle))
            } else {
                bail!("{:?} failed sending open transfer to {:?}", self.node_id, target)
            }
        }
    }

    /// Forget the client connection to `peer_node_id` (called when it closes).
    pub(crate) async fn client_closed(&self, peer_node_id: NodeId) {
        self.peers.lock().await.circuit_clients.remove(&peer_node_id);
    }
}
659
/// Runs our `ConnectionNode` to set up circuit-based peers.
///
/// Two concurrent loops, joined at the end:
///  - `new_peer_fut` reacts to peer announcements by dialing a client
///    connection and waking any `client_peer` waiters;
///  - `new_conn_fut` accepts incoming connections and starts server peers.
/// Both loops hold only a `Weak<Router>` so the router can be dropped while
/// they run.
async fn run_circuits(
    router: Weak<Router>,
    connections: impl futures::Stream<Item = circuit::Connection> + Send,
    peers: impl futures::Stream<Item = String> + Send,
) -> Result<(), Error> {
    // Notes whenever a new peer announces itself on the network.
    let new_peer_fut = {
        let router = router.clone();
        async move {
            let mut peers = pin!(peers);
            while let Some(peer_node_id) = peers.next().await {
                let router = match router.upgrade() {
                    Some(x) => x,
                    None => {
                        log::warn!("Router disappeared from under circuit runner");
                        break;
                    }
                };

                // Inner async block so any failure is logged per-peer without
                // aborting the whole loop.
                let res = async {
                    let peer_node_id_num = NodeId::from_circuit_string(&peer_node_id)
                        .map_err(|_| format_err!("Invalid node id: {:?}", peer_node_id))?;
                    let mut peers = router.peers.lock().await;

                    if peers.circuit_clients.get(&peer_node_id_num).and_then(|x| x.peer()).is_some()
                    {
                        log::warn!(peer:? = peer_node_id; "Re-establishing connection");
                    }

                    // Two unidirectional streams form the bidirectional link.
                    let (reader, writer_remote) = circuit::stream::stream();
                    let (reader_remote, writer) = circuit::stream::stream();
                    let conn = router
                        .circuit_node
                        .connect_to_peer(&peer_node_id, reader_remote, writer_remote)
                        .await?;
                    let peer = Peer::new_circuit_client(
                        conn,
                        writer,
                        reader,
                        router.service_map.new_local_service_observer().await,
                        &router,
                        peer_node_id_num,
                    )?;

                    // Publish the peer and wake everyone parked in
                    // `Router::client_peer` waiting for it.
                    if let Some(CircuitState::Waiters(waiters)) =
                        peers.circuit_clients.insert(peer_node_id_num, CircuitState::Peer(peer))
                    {
                        for waiter in waiters {
                            let _ = waiter.send(());
                        }
                    }

                    Result::<_, Error>::Ok(())
                }
                .await;

                if let Err(e) = res {
                    log::warn!(peer:? = peer_node_id; "Attempt to connect to peer failed: {:?}", e);
                }
            }
        }
    };

    // Loops over incoming connections and starts servers.
    let new_conn_fut = async move {
        let mut connections = pin!(connections);
        while let Some(conn) = connections.next().await {
            let peer_name = conn.from().to_owned();
            let res = async {
                let router = router.upgrade().ok_or_else(|| format_err!("router gone"))?;
                Peer::new_circuit_server(conn, &router).await?;
                Result::<_, Error>::Ok(())
            }
            .await;

            if let Err(e) = res {
                log::warn!(
                    peer:? = peer_name;
                    "Attempt to receive connection from peer failed: {:?}",
                    e
                );
            }
        }
    };

    futures::future::join(new_peer_fut, new_conn_fut).await;
    Ok(())
}
749
750#[cfg(test)]
751mod tests {
752
753    use super::*;
754    use crate::test_util::*;
755    use circuit::multi_stream::multi_stream_node_connection;
756    use circuit::stream::stream;
757    use circuit::Quality;
758
    /// Smoke test: routers can be built both via the id generator and with an
    /// explicit node id, and the explicit id is reflected by the router.
    #[fuchsia::test]
    async fn no_op(run: usize) {
        let mut node_id_gen = NodeIdGenerator::new("router::no_op", run);
        node_id_gen.new_router().unwrap();
        let id = node_id_gen.next().unwrap();
        assert_eq!(Router::with_node_id(id, None).unwrap().node_id, id);
    }
766
    /// Registers `service` on `serving_router` and blocks until `client_router`
    /// can see it advertised via list_peers.
    ///
    /// Returns a receiver that yields the channel passed to the service when a
    /// connection request eventually arrives.
    async fn register_test_service(
        serving_router: Arc<Router>,
        client_router: Arc<Router>,
        service: &'static str,
    ) -> futures::channel::oneshot::Receiver<Channel> {
        use fuchsia_sync::Mutex;
        let (send, recv) = futures::channel::oneshot::channel();
        serving_router
            .service_map()
            .register_service(service.to_string(), {
                // The oneshot sender is single-use, but the provider closure is
                // Fn; stash the sender behind a mutex and take it on first use.
                let sender = Mutex::new(Some(send));
                move |chan| {
                    println!("{} got request", service);
                    sender.lock().take().unwrap().send(chan).unwrap();
                    println!("{} forwarded channel", service);
                    Ok(())
                }
            })
            .await;
        let serving_node_id = serving_router.node_id();
        println!("{} wait for service to appear @ client", service);
        let lpc = client_router.new_list_peers_context().await;
        // Poll list_peers until the serving node advertises the service.
        loop {
            let peers = lpc.list_peers().await.unwrap();
            println!("{} got peers {:?}", service, peers);
            if peers
                .iter()
                .find(move |peer| {
                    serving_node_id == peer.node_id
                        && peer
                            .services
                            .iter()
                            .find(move |&advertised_service| advertised_service == service)
                            .is_some()
                })
                .is_some()
            {
                break;
            }
        }
        recv
    }
809
810    async fn run_two_node<
811        F: 'static + Clone + Sync + Send + Fn(Arc<Router>, Arc<Router>) -> Fut,
812        Fut: 'static + Send + Future<Output = Result<(), Error>>,
813    >(
814        name: &'static str,
815        run: usize,
816        f: F,
817    ) -> Result<(), Error> {
818        let mut node_id_gen = NodeIdGenerator::new(name, run);
819        let router1 = node_id_gen.new_router()?;
820        let router2 = node_id_gen.new_router()?;
821        let (circuit1_reader, circuit1_writer) = stream();
822        let (circuit2_reader, circuit2_writer) = stream();
823        let (out_1, _) = futures::channel::mpsc::unbounded();
824        let (out_2, _) = futures::channel::mpsc::unbounded();
825
826        let conn_1 = multi_stream_node_connection(
827            router1.circuit_node(),
828            circuit1_reader,
829            circuit2_writer,
830            true,
831            Quality::IN_PROCESS,
832            out_1,
833            "router1".to_owned(),
834        );
835        let conn_2 = multi_stream_node_connection(
836            router2.circuit_node(),
837            circuit2_reader,
838            circuit1_writer,
839            false,
840            Quality::IN_PROCESS,
841            out_2,
842            "router2".to_owned(),
843        );
844        let _fwd = Task::spawn(async move {
845            if let Err(e) = futures::future::try_join(conn_1, conn_2).await {
846                log::trace!("forwarding failed: {:?}", e)
847            }
848        });
849        f(router1, router2).await
850    }
851
852    #[fuchsia::test]
853    async fn no_op_env(run: usize) -> Result<(), Error> {
854        run_two_node("router::no_op_env", run, |_router1, _router2| async { Ok(()) }).await
855    }
856
857    #[fuchsia::test]
858    async fn create_stream(run: usize) -> Result<(), Error> {
859        run_two_node("create_stream", run, |router1, router2| async move {
860            let (_, p) = fidl::Channel::create();
861            println!("create_stream: register service");
862            let s = register_test_service(router2.clone(), router1.clone(), "create_stream").await;
863            println!("create_stream: connect to service");
864            router1.connect_to_service(router2.node_id, "create_stream", p).await?;
865            println!("create_stream: wait for connection");
866            let _ = s.await?;
867            Ok(())
868        })
869        .await
870    }
871
872    #[fuchsia::test]
873    async fn send_datagram_immediately(run: usize) -> Result<(), Error> {
874        run_two_node("send_datagram_immediately", run, |router1, router2| async move {
875            let (c, p) = fidl::Channel::create();
876            println!("send_datagram_immediately: register service");
877            let s = register_test_service(
878                router2.clone(),
879                router1.clone(),
880                "send_datagram_immediately",
881            )
882            .await;
883            println!("send_datagram_immediately: connect to service");
884            router1.connect_to_service(router2.node_id, "send_datagram_immediately", p).await?;
885            println!("send_datagram_immediately: wait for connection");
886            let s = s.await?;
887            let c = fidl::AsyncChannel::from_channel(c);
888            let s = fidl::AsyncChannel::from_channel(s);
889            c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
890            let mut buf = fidl::MessageBufEtc::new();
891            println!("send_datagram_immediately: wait for datagram");
892            s.recv_etc_msg(&mut buf).await?;
893            assert_eq!(buf.n_handle_infos(), 0);
894            assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
895            Ok(())
896        })
897        .await
898    }
899
900    #[fuchsia::test]
901    async fn ping_pong(run: usize) -> Result<(), Error> {
902        run_two_node("ping_pong", run, |router1, router2| async move {
903            let (c, p) = fidl::Channel::create();
904            println!("ping_pong: register service");
905            let s = register_test_service(router2.clone(), router1.clone(), "ping_pong").await;
906            println!("ping_pong: connect to service");
907            router1.connect_to_service(router2.node_id, "ping_pong", p).await?;
908            println!("ping_pong: wait for connection");
909            let s = s.await?;
910            let c = fidl::AsyncChannel::from_channel(c);
911            let s = fidl::AsyncChannel::from_channel(s);
912            println!("ping_pong: send ping");
913            c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
914            println!("ping_pong: receive ping");
915            let mut buf = fidl::MessageBufEtc::new();
916            s.recv_etc_msg(&mut buf).await?;
917            assert_eq!(buf.n_handle_infos(), 0);
918            assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
919            println!("ping_pong: send pong");
920            s.write(&[9, 8, 7, 6, 5, 4, 3, 2, 1], &mut Vec::new())?;
921            println!("ping_pong: receive pong");
922            let mut buf = fidl::MessageBufEtc::new();
923            c.recv_etc_msg(&mut buf).await?;
924            assert_eq!(buf.n_handle_infos(), 0);
925            assert_eq!(buf.bytes(), &[9, 8, 7, 6, 5, 4, 3, 2, 1]);
926            Ok(())
927        })
928        .await
929    }
930
931    fn ensure_pending(f: &mut (impl Send + Unpin + Future<Output = ()>)) {
932        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
933        // Poll a bunch of times to convince ourselves the future is pending forever...
934        for _ in 0..1000 {
935            assert!(f.poll_unpin(&mut ctx).is_pending());
936        }
937    }
938
939    #[fuchsia::test]
940    async fn concurrent_list_peer_calls_will_error(run: usize) -> Result<(), Error> {
941        let mut node_id_gen = NodeIdGenerator::new("concurrent_list_peer_calls_will_error", run);
942        let n = node_id_gen.new_router().unwrap();
943        let lp = n.new_list_peers_context().await;
944        lp.list_peers().await.unwrap();
945        let mut never_completes = async {
946            lp.list_peers().await.unwrap();
947        }
948        .boxed();
949        ensure_pending(&mut never_completes);
950        lp.list_peers().await.expect_err("Concurrent list peers should fail");
951        ensure_pending(&mut never_completes);
952        Ok(())
953    }
954}