// overnet_core/router/mod.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Manages peer<->peer connections and routing packets over links between nodes.
6
7// General structure:
8// A router is a collection of: - streams (each with a StreamId<LinkData>)
9//                                These are streams of information flow between processes.
10//                              - peers (each with a PeerId)
11//                                These are other overnet instances in the overlay network.
12//                                There is a client oriented and a server oriented peer per
13//                                node.
14//                              - links (each with a LinkId)
15//                                These are connections between this instance and other
16//                                instances in the mesh.
17// For each node in the mesh, a routing table tracks which link on which to send data to
18// that node (said link may be a third node that will be requested to forward datagrams
19// on our behalf).
20
21mod service_map;
22
23use self::service_map::ServiceMap;
24use crate::future_help::{Observer, log_errors};
25use crate::handle_info::{HandleKey, HandleType, handle_info};
26use crate::labels::{NodeId, TransferKey};
27use crate::peer::{FramedStreamReader, FramedStreamWriter, Peer, PeerConnRef};
28use crate::proxy::{
29    IntoProxied, ProxyTransferInitiationReceiver, RemoveFromProxyTable, StreamRefSender,
30};
31use anyhow::{Context as _, Error, bail, format_err};
32use async_utils::mutex_ticket::MutexTicket;
33use fidl::{AsHandleRef, Channel, EventPair, HandleBased, NullableHandle, Socket};
34use fidl_fuchsia_overnet_protocol::{
35    ChannelHandle, EventPairHandle, EventPairRights, SocketHandle, SocketType, StreamId, StreamRef,
36    ZirconHandle,
37};
38use fuchsia_async::Task;
39use futures::channel::oneshot;
40use futures::future::poll_fn;
41use futures::lock::Mutex;
42use futures::prelude::*;
43use futures::ready;
44use std::collections::{BTreeMap, HashMap};
45use std::pin::pin;
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::sync::{Arc, Weak};
48use std::task::{Context, Poll, Waker};
49use std::time::Duration;
50
51pub use self::service_map::ListablePeer;
52
/// A handle transfer that has been announced on this node but may not yet
/// have both ends.
#[derive(Debug)]
enum PendingTransfer {
    /// The other end of the transfer is known.
    Complete(FoundTransfer),
    /// A task is waiting for the other end; wake it when the end arrives.
    Waiting(Waker),
}

/// Pending transfers keyed by the transfer key shared between nodes.
type PendingTransferMap = BTreeMap<TransferKey, PendingTransfer>;
60
/// Result of looking up the other end of a handle transfer (see `find_transfer`).
#[derive(Debug)]
pub(crate) enum FoundTransfer {
    /// Both ends met on this node; Overnet's involvement collapses away and
    /// the raw handle is returned.
    Fused(NullableHandle),
    /// The other end is remote, reachable over this framed stream pair.
    Remote(FramedStreamWriter, FramedStreamReader),
}

/// Result of initiating a handle transfer toward a target node (see `open_transfer`).
#[derive(Debug)]
pub(crate) enum OpenedTransfer {
    /// The target was this node, so the transfer completed locally.
    Fused,
    /// The transfer proceeds over a remote stream; the handle rides along.
    Remote(FramedStreamWriter, FramedStreamReader, NullableHandle),
}
72
/// Client-side connection state for a single remote node.
#[derive(Debug)]
#[allow(dead_code)]
enum CircuitState {
    /// No connection yet; these senders are fired once one is established
    /// (see `run_circuits`).
    Waiters(Vec<oneshot::Sender<()>>),
    /// An established connection to the peer.
    Peer(Arc<Peer>),
}
79
80impl CircuitState {
81    fn peer(&self) -> Option<Arc<Peer>> {
82        if let CircuitState::Peer(peer) = self { Some(Arc::clone(peer)) } else { None }
83    }
84}
85
/// Tables of peers known to this router.
#[derive(Debug)]
struct PeerMaps {
    /// Client-side circuit connections (or waiters for them), keyed by node id.
    circuit_clients: BTreeMap<NodeId, CircuitState>,
}

/// Wrapper to get the right list_peers behavior.
#[derive(Debug)]
pub struct ListPeersContext(Mutex<Option<Observer<Vec<ListablePeer>>>>);

// Monotonic counter used only to correlate trace messages across concurrent
// `list_peers` calls.
static LIST_PEERS_CALL: AtomicU64 = AtomicU64::new(0);
96
97impl ListPeersContext {
98    /// Implementation of ListPeers fidl method.
99    pub async fn list_peers(&self) -> Result<Vec<ListablePeer>, Error> {
100        let call_id = LIST_PEERS_CALL.fetch_add(1, Ordering::SeqCst);
101        log::trace!(list_peers_call = call_id; "get observer");
102        let mut obs = self
103            .0
104            .lock()
105            .await
106            .take()
107            .ok_or_else(|| anyhow::format_err!("Already listing peers"))?;
108        log::trace!(list_peers_call = call_id; "wait for value");
109        let r = obs.next().await;
110        log::trace!(list_peers_call = call_id; "replace observer");
111        *self.0.lock().await = Some(obs);
112        log::trace!(list_peers_call = call_id; "return");
113        Ok(r.unwrap_or_else(Vec::new))
114    }
115}
116
/// Whether this node's ascendd clients should be routed to each other.
///
/// Disabled by the ffx daemon to avoid n^2 connection scaling between fully
/// connected clients (see `Router::ascendd_client_routing`).
pub enum AscenddClientRouting {
    /// Ascendd client routing is allowed
    Enabled,
    /// Ascendd client routing is prevented
    Disabled,
}
124
/// Router maintains global state for one node_id.
///
/// It tracks the peers reachable over circuit connections, the services
/// registered locally, every handle currently being proxied by this node,
/// and pending handle transfers between nodes.
pub struct Router {
    /// Our node id
    node_id: NodeId,
    /// All peers.
    peers: Mutex<PeerMaps>,
    /// Locally registered services and peer/service advertisement state.
    service_map: ServiceMap,
    /// Handles currently proxied by this node, keyed per handle end.
    proxied_streams: Mutex<HashMap<HandleKey, ProxiedHandle>>,
    /// Handle transfers announced but not yet matched with their other end.
    pending_transfers: Mutex<PendingTransferMap>,
    /// Background task running `run_circuits` for this router's lifetime.
    task: Mutex<Option<Task<()>>>,
    /// Hack to prevent the n^2 scaling of a fully-connected graph of ffxs
    ascendd_client_routing: AtomicBool,
    /// The circuit protocol node backing all connections.
    circuit_node: circuit::ConnectionNode,
}

/// Book-keeping for one proxied handle, stored in `Router::proxied_streams`.
struct ProxiedHandle {
    /// Tells the proxy loop to stop, or to hand the handle off in a transfer.
    remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
    /// Key of the other end of this handle's pair.
    original_paired: HandleKey,
    /// The task running the proxy loop for this handle.
    proxy_task: Task<()>,
}
148
149/// Generate a new random node id
150fn generate_node_id() -> NodeId {
151    rand::random::<u64>().into()
152}
153
/// Consume `v` and return it sorted in ascending order.
fn sorted<T: Ord>(mut v: Vec<T>) -> Vec<T> {
    v.sort();
    v
}
158
impl std::fmt::Debug for Router {
    // Router holds large tables; show only the node id for diagnostics.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Router({:?})", self.node_id)
    }
}
164
/// This is sent when initiating connections between circuit nodes and must be identical between all
/// such nodes.
// `'static` is implied on `const` items, so the explicit lifetime is dropped
// (clippy::redundant_static_lifetimes).
const OVERNET_CIRCUIT_PROTOCOL: &str = "Overnet:0";
168
169impl Router {
170    /// Create a new router. If `router_interval` is given, this router will
171    /// behave like an interior node and tell its neighbors about each other.
172    pub fn new(router_interval: Option<Duration>) -> Result<Arc<Self>, Error> {
173        Router::with_node_id(generate_node_id(), router_interval)
174    }
175
    /// Make a router with a specific node ID.
    ///
    /// Builds the circuit `ConnectionNode` (with neighbor-advertising routing
    /// when `router_interval` is given, plain otherwise) and spawns the
    /// background task that runs `run_circuits` for the router's lifetime.
    pub fn with_node_id(
        node_id: NodeId,
        router_interval: Option<Duration>,
    ) -> Result<Arc<Self>, Error> {
        let service_map = ServiceMap::new(node_id);
        // Peers announced by the circuit node arrive on this channel and are
        // consumed by `run_circuits` below.
        let (new_peer_sender, new_peer_receiver) = futures::channel::mpsc::channel(1);
        let (circuit_node, circuit_connections) = if let Some(interval) = router_interval {
            // Interior node: participates in routing between its neighbors.
            let (a, b) = circuit::ConnectionNode::new_with_router(
                &node_id.circuit_string(),
                OVERNET_CIRCUIT_PROTOCOL,
                interval,
                new_peer_sender,
            )?;
            (a, b.boxed())
        } else {
            let (a, b) = circuit::ConnectionNode::new(
                &node_id.circuit_string(),
                OVERNET_CIRCUIT_PROTOCOL,
                new_peer_sender,
            )?;
            (a, b.boxed())
        };
        let router = Arc::new(Router {
            node_id,
            service_map,
            peers: Mutex::new(PeerMaps { circuit_clients: BTreeMap::new() }),
            proxied_streams: Mutex::new(HashMap::new()),
            pending_transfers: Mutex::new(PendingTransferMap::new()),
            task: Mutex::new(None),
            // Default is to route all clients to each other. Ffx daemon disabled client routing.
            ascendd_client_routing: AtomicBool::new(true),
            circuit_node,
        });

        // The support task holds only a Weak ref so dropping the router can
        // tear everything down. The mutex was just created and is uncontended,
        // so `now_or_never` cannot fail here.
        let weak_router = Arc::downgrade(&router);
        *router.task.lock().now_or_never().unwrap() = Some(Task::spawn(log_errors(
            run_circuits(weak_router, circuit_connections, new_peer_receiver),
            format!("router {:?} support loop failed", node_id),
        )));

        Ok(router)
    }
219
    /// Get the circuit protocol node for this router. This will let us create new connections to
    /// other nodes.
    pub fn circuit_node(&self) -> &circuit::Node {
        self.circuit_node.node()
    }

    /// Accessor for the node id of this router.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }
230
231    /// Accessor for whether to route ascendd clients to each other
232    pub fn client_routing(&self) -> AscenddClientRouting {
233        if self.ascendd_client_routing.load(std::sync::atomic::Ordering::SeqCst) {
234            AscenddClientRouting::Enabled
235        } else {
236            AscenddClientRouting::Disabled
237        }
238    }
239
240    /// Setter for whether to route ascendd clients to each other
241    pub fn set_client_routing(&self, client_routing: AscenddClientRouting) {
242        let client_routing = match client_routing {
243            AscenddClientRouting::Enabled => true,
244            AscenddClientRouting::Disabled => false,
245        };
246        self.ascendd_client_routing.store(client_routing, std::sync::atomic::Ordering::SeqCst);
247    }
248
    /// Accessor for this node's service map.
    pub(crate) fn service_map(&self) -> &ServiceMap {
        &self.service_map
    }
252
    /// Create a new stream to advertised service `service` on remote node id `node`.
    ///
    /// If `node_id` is this node, the channel is handed directly to the local
    /// service; otherwise a new stream is opened through the client peer for
    /// that node (which may wait until a connection is established).
    pub async fn connect_to_service(
        self: &Arc<Self>,
        node_id: NodeId,
        service_name: &str,
        chan: Channel,
    ) -> Result<(), Error> {
        let is_local = node_id == self.node_id;
        log::trace!(
            service_name:%,
            node_id = node_id.0,
            local = is_local;
            "Request connect_to_service",
        );
        if is_local {
            self.service_map().connect(service_name, chan).await
        } else {
            self.client_peer(node_id)
                .await
                .with_context(|| {
                    format_err!(
                        "Fetching client peer for new stream to {:?} for service {:?}",
                        node_id,
                        service_name,
                    )
                })?
                .new_stream(service_name, chan, self)
                .await
        }
    }
283
    /// Register a service. The callback should connect the given channel to the
    /// service in question.
    pub async fn register_service(
        &self,
        service_name: String,
        provider: impl Fn(fidl::Channel) -> Result<(), Error> + Send + 'static,
    ) -> Result<(), Error> {
        self.service_map().register_service(service_name, provider).await;
        // Registration itself cannot fail today; the Result return leaves
        // room for future validation without breaking callers.
        Ok(())
    }
294
    /// Create a new list_peers context
    ///
    /// Each context carries its own observer of the peer list, so multiple
    /// contexts can watch independently.
    pub async fn new_list_peers_context(&self) -> ListPeersContext {
        ListPeersContext(Mutex::new(Some(self.service_map.new_list_peers_observer().await)))
    }
299
    /// Get (or wait for) the client peer connected to `peer_node_id`.
    ///
    /// If no connection exists yet this registers interest (an empty
    /// `Waiters` entry) and parks on a oneshot that `run_circuits` fires when
    /// the peer announces itself, then retries. There is no timeout here.
    async fn client_peer(self: &Arc<Self>, peer_node_id: NodeId) -> Result<Arc<Peer>, Error> {
        loop {
            let mut peers = self.peers.lock().await;
            match peers.circuit_clients.get_mut(&peer_node_id) {
                Some(CircuitState::Peer(peer)) => break Ok(Arc::clone(&peer)),
                Some(CircuitState::Waiters(waiters)) => {
                    // Queue ourselves, then release the peers lock before
                    // awaiting so the connection task can make progress.
                    let (sender, receiver) = oneshot::channel();
                    waiters.push(sender);
                    std::mem::drop(peers);
                    let _ = receiver.await;
                }
                None => {
                    peers.circuit_clients.insert(peer_node_id, CircuitState::Waiters(Vec::new()));
                }
            }
        }
    }
317
    /// Record a newly proxied handle and spawn its proxy loop.
    ///
    /// `f` is the future that runs the proxy; when it completes (with or
    /// without error) the entry removes itself from the table via
    /// `remove_proxied`. Panics if `this_handle_key` is already proxied.
    fn add_proxied(
        self: &Arc<Self>,
        proxied_streams: &mut HashMap<HandleKey, ProxiedHandle>,
        this_handle_key: HandleKey,
        pair_handle_key: HandleKey,
        remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
        f: impl 'static + Send + Future<Output = Result<(), Error>>,
    ) {
        // Weak reference: the proxy task must not keep the router alive.
        let router = Arc::downgrade(&self);
        let proxy_task = Task::spawn(async move {
            if let Err(e) = f.await {
                log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy failed: {:?}", e);
            } else {
                log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy completed successfully",);
            }
            if let Some(router) = Weak::upgrade(&router) {
                router.remove_proxied(this_handle_key, pair_handle_key).await;
            }
        });
        // A handle end must never be proxied twice.
        assert!(
            proxied_streams
                .insert(
                    this_handle_key,
                    ProxiedHandle { remove_sender, original_paired: pair_handle_key, proxy_task },
                )
                .is_none()
        );
    }
346
347    // Remove a proxied handle from our table.
348    // Called by proxy::Proxy::drop.
349    async fn remove_proxied(
350        self: &Arc<Self>,
351        this_handle_key: HandleKey,
352        pair_handle_key: HandleKey,
353    ) {
354        let mut proxied_streams = self.proxied_streams.lock().await;
355        log::trace!(
356            node_id = self.node_id.0,
357            this_handle_key:?,
358            pair_handle_key:?,
359            all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
360            "REMOVE_PROXIED",
361        );
362        if let Some(removed) = proxied_streams.remove(&this_handle_key) {
363            assert_eq!(removed.original_paired, pair_handle_key);
364            let _ = removed.remove_sender.send(RemoveFromProxyTable::Dropped);
365            removed.proxy_task.detach();
366        }
367    }
368
    // Prepare a handle to be sent to another machine.
    // Returns a ZirconHandle describing the established proxy.
    //
    // Two cases:
    // 1. The handle's pair is already proxied by this node: tell that proxy
    //    loop to initiate a transfer and return a stream ref describing it.
    // 2. The pair is unseen: allocate a new bidirectional stream and spawn a
    //    fresh proxy loop for this handle.
    pub(crate) async fn send_proxied(
        self: &Arc<Self>,
        handle: NullableHandle,
        conn: PeerConnRef<'_>,
    ) -> Result<ZirconHandle, Error> {
        let raw_handle = handle.raw_handle(); // for debugging
        let info = handle_info(handle.as_handle_ref())
            .with_context(|| format!("Getting handle information for {}", raw_handle))?;
        let mut proxied_streams = self.proxied_streams.lock().await;
        log::trace!(
            node_id = self.node_id.0,
            handle:?,
            info:?,
            all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
            "SEND_PROXIED",
        );
        if let Some(pair) = proxied_streams.remove(&info.pair_handle_key) {
            // This handle is the other end of an already proxied object...
            // Here we need to inform the existing proxy loop that a transfer is going to be
            // initiated, and to where.
            drop(proxied_streams);
            assert_eq!(info.this_handle_key, pair.original_paired);
            log::trace!(
                handle:?,
                orig_pair:? = pair.original_paired;
                "Send paired proxied"
            );
            // We allocate a drain stream to flush any messages we've buffered locally to the new
            // endpoint.
            let drain_stream = conn.alloc_uni().await?.into();
            let (stream_ref_sender, stream_ref_receiver) = StreamRefSender::new();
            pair.remove_sender
                .send(RemoveFromProxyTable::InitiateTransfer {
                    paired_handle: handle,
                    drain_stream,
                    stream_ref_sender,
                })
                .map_err(|_| format_err!("Failed to initiate transfer"))?;
            // The proxy loop answers with the stream ref describing the transfer.
            let stream_ref = stream_ref_receiver
                .await
                .with_context(|| format!("waiting for stream_ref for {:?}", raw_handle))?;
            pair.proxy_task.detach();
            match info.handle_type {
                HandleType::Channel(rights) => {
                    Ok(ZirconHandle::Channel(ChannelHandle { stream_ref, rights }))
                }
                HandleType::Socket(socket_type, rights) => {
                    Ok(ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }))
                }
                HandleType::EventPair => Ok(ZirconHandle::EventPair(EventPairHandle {
                    stream_ref,
                    rights: EventPairRights::empty(),
                })),
            }
        } else {
            // This handle (and its pair) is previously unseen... establish a proxy stream for it
            log::trace!(handle:?; "Send proxied");
            let (tx, rx) = futures::channel::oneshot::channel();
            let rx = ProxyTransferInitiationReceiver::new(rx.map_err(move |_| {
                format_err!(
                    "cancelled transfer via send_proxied {:?}\n{}",
                    info,
                    // NOTE(review): "111" is a placeholder left where a
                    // backtrace capture was commented out.
                    111 //std::backtrace::Backtrace::force_capture()
                )
            }));
            let (stream_writer, stream_reader) = conn.alloc_bidi().await?;
            let stream_ref = StreamRef::Creating(StreamId { id: stream_writer.id() });
            // Spawn a proxy loop appropriate to the concrete handle type and
            // register it in the table (still holding the lock taken above).
            Ok(match info.handle_type {
                HandleType::Channel(rights) => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            Channel::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::Channel(ChannelHandle { stream_ref, rights })
                }
                HandleType::Socket(socket_type, rights) => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            Socket::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights })
                }
                HandleType::EventPair => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            EventPair::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::EventPair(EventPairHandle {
                        stream_ref,
                        rights: EventPairRights::empty(),
                    })
                }
            })
        }
    }
493
    // Take a received handle description and construct a fidl::NullableHandle that represents it
    // whilst establishing proxies as required
    //
    // Dispatches on the wire handle type, creating the matching local handle
    // pair in `recv_proxied_handle`.
    pub(crate) async fn recv_proxied(
        self: &Arc<Self>,
        handle: ZirconHandle,
        conn: PeerConnRef<'_>,
    ) -> Result<NullableHandle, Error> {
        match handle {
            ZirconHandle::Channel(ChannelHandle { stream_ref, rights }) => {
                self.recv_proxied_handle(conn, stream_ref, move || Ok(Channel::create()), rights)
                    .await
            }
            ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }) => {
                self.recv_proxied_handle(
                    conn,
                    stream_ref,
                    // Socket flavor (stream vs datagram) must match the sender's.
                    move || {
                        Ok(match socket_type {
                            SocketType::Stream => Socket::create_stream(),
                            SocketType::Datagram => Socket::create_datagram(),
                        })
                    },
                    rights,
                )
                .await
            }
            ZirconHandle::EventPair(EventPairHandle { stream_ref, rights }) => {
                self.recv_proxied_handle(conn, stream_ref, move || Ok(EventPair::create()), rights)
                    .await
            }
        }
    }
526
    /// Common implementation for `recv_proxied`: create a local handle pair
    /// via `create_handles`, spawn the receiving proxy loop, and register it
    /// in the proxied-streams table when one was actually spawned.
    ///
    /// Returns the local handle to hand to the application.
    async fn recv_proxied_handle<Hdl, CreateType>(
        self: &Arc<Self>,
        conn: PeerConnRef<'_>,
        stream_ref: StreamRef,
        create_handles: impl FnOnce() -> Result<(CreateType, CreateType), Error> + 'static,
        rights: CreateType::Rights,
    ) -> Result<NullableHandle, Error>
    where
        Hdl: 'static + for<'a> crate::proxy::ProxyableRW<'a>,
        CreateType: 'static
            + fidl::HandleBased
            + IntoProxied<Proxied = Hdl>
            + std::fmt::Debug
            + crate::handle_info::WithRights,
    {
        let (tx, rx) = futures::channel::oneshot::channel();
        let rx = ProxyTransferInitiationReceiver::new(
            rx.map_err(move |_| format_err!("cancelled transfer via recv_proxied")),
        );
        let (h, p) = crate::proxy::spawn_recv(
            create_handles,
            rights,
            rx,
            stream_ref,
            conn,
            Arc::downgrade(&self),
        )?
        .await?;
        if let Some(p) = p {
            // A proxy loop was spawned for the pair end; track it so it can be
            // torn down or transferred later. Note the key order is reversed
            // relative to `send_proxied`: we proxy the pair end here.
            let info = handle_info(h.as_handle_ref())?;
            self.add_proxied(
                &mut *self.proxied_streams.lock().await,
                info.pair_handle_key,
                info.this_handle_key,
                tx,
                p,
            );
        }
        Ok(h)
    }
567
    // Note the endpoint of a transfer that we know about (may complete a transfer operation)
    //
    // Errors if a completed endpoint already exists for `transfer_key`; wakes
    // the task parked by `poll_find_transfer`, if any.
    pub(crate) async fn post_transfer(
        &self,
        transfer_key: TransferKey,
        other_end: FoundTransfer,
    ) -> Result<(), Error> {
        let mut pending_transfers = self.pending_transfers.lock().await;
        match pending_transfers.insert(transfer_key, PendingTransfer::Complete(other_end)) {
            Some(PendingTransfer::Complete(_)) => bail!("Duplicate transfer received"),
            Some(PendingTransfer::Waiting(w)) => w.wake(),
            None => (),
        }
        Ok(())
    }
582
    // Poll helper for `find_transfer`: take the completed endpoint for
    // `transfer_key` if present, otherwise park this task's waker under the
    // key and report Pending.
    fn poll_find_transfer(
        &self,
        ctx: &mut Context<'_>,
        transfer_key: TransferKey,
        lock: &mut MutexTicket<'_, PendingTransferMap>,
    ) -> Poll<Result<FoundTransfer, Error>> {
        let mut pending_transfers = ready!(lock.poll(ctx));
        if let Some(PendingTransfer::Complete(other_end)) = pending_transfers.remove(&transfer_key)
        {
            Poll::Ready(Ok(other_end))
        } else {
            // NOTE(review): if a `Waiting` entry was removed above it is
            // replaced by this task's waker — this assumes at most one task
            // ever waits per transfer key; confirm with callers.
            pending_transfers.insert(transfer_key, PendingTransfer::Waiting(ctx.waker().clone()));
            Poll::Pending
        }
    }
598
    // Lookup a transfer that we're expected to eventually know about
    //
    // Completes once `post_transfer` publishes the other end under
    // `transfer_key`.
    pub(crate) async fn find_transfer(
        &self,
        transfer_key: TransferKey,
    ) -> Result<FoundTransfer, Error> {
        // MutexTicket re-acquires the lock on each poll instead of holding it
        // across wakeups.
        let mut lock = MutexTicket::new(&self.pending_transfers);
        poll_fn(|ctx| self.poll_find_transfer(ctx, transfer_key, &mut lock)).await
    }
607
608    // Begin a transfer operation (opposite of find_transfer), publishing an endpoint on the remote
609    // nodes transfer table.
610    pub(crate) async fn open_transfer(
611        self: &Arc<Router>,
612        target: NodeId,
613        transfer_key: TransferKey,
614        handle: NullableHandle,
615    ) -> Result<OpenedTransfer, Error> {
616        if target == self.node_id {
617            // The target is local: we just file away the handle.
618            // Later, find_transfer will find this and we'll collapse away Overnet's involvement and
619            // reunite the channel ends.
620            let info = handle_info(handle.as_handle_ref())?;
621            let mut proxied_streams = self.proxied_streams.lock().await;
622            log::trace!(
623                node_id = self.node_id.0,
624                key:? = transfer_key,
625                info:? = info,
626                all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
627                "OPEN_TRANSFER_REMOVE_PROXIED",
628            );
629            if let Some(removed) = proxied_streams.remove(&info.this_handle_key) {
630                assert_eq!(removed.original_paired, info.pair_handle_key);
631                assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
632                removed.proxy_task.detach();
633            }
634            if let Some(removed) = proxied_streams.remove(&info.pair_handle_key) {
635                assert_eq!(removed.original_paired, info.this_handle_key);
636                assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
637                removed.proxy_task.detach();
638            }
639            self.post_transfer(transfer_key, FoundTransfer::Fused(handle)).await?;
640            Ok(OpenedTransfer::Fused)
641        } else {
642            if let Some((writer, reader)) =
643                self.client_peer(target).await?.send_open_transfer(transfer_key).await
644            {
645                Ok(OpenedTransfer::Remote(writer, reader, handle))
646            } else {
647                bail!("{:?} failed sending open transfer to {:?}", self.node_id, target)
648            }
649        }
650    }
651
    /// Forget the client-side connection state for `peer_node_id`.
    /// NOTE(review): presumably invoked when the peer's connection closes —
    /// verify against callers in the peer module.
    pub(crate) async fn client_closed(&self, peer_node_id: NodeId) {
        self.peers.lock().await.circuit_clients.remove(&peer_node_id);
    }
655}
656
/// Runs our `ConnectionNode` to set up circuit-based peers.
///
/// Drives two loops concurrently until both input streams end:
/// * `new_peer_fut` — reacts to peer announcements by dialing a client
///   connection and installing it in `circuit_clients` (waking any waiters).
/// * `new_conn_fut` — accepts incoming connections and starts server peers.
async fn run_circuits(
    router: Weak<Router>,
    connections: impl futures::Stream<Item = circuit::Connection> + Send,
    peers: impl futures::Stream<Item = String> + Send,
) -> Result<(), Error> {
    // Notes whenever a new peer announces itself on the network.
    let new_peer_fut = {
        let router = router.clone();
        async move {
            let mut peers = pin!(peers);
            while let Some(peer_node_id) = peers.next().await {
                // Only hold a strong reference for the duration of one
                // iteration so the router can be dropped while we run.
                let router = match router.upgrade() {
                    Some(x) => x,
                    None => {
                        log::warn!("Router disappeared from under circuit runner");
                        break;
                    }
                };

                let res = async {
                    let peer_node_id_num = NodeId::from_circuit_string(&peer_node_id)
                        .map_err(|_| format_err!("Invalid node id: {:?}", peer_node_id))?;
                    let mut peers = router.peers.lock().await;

                    if peers.circuit_clients.get(&peer_node_id_num).and_then(|x| x.peer()).is_some()
                    {
                        log::warn!(peer:? = peer_node_id; "Re-establishing connection");
                    }

                    // Two unidirectional streams form the bidirectional link:
                    // our writer feeds their reader and vice versa.
                    let (reader, writer_remote) = circuit::stream::stream();
                    let (reader_remote, writer) = circuit::stream::stream();
                    let conn = router
                        .circuit_node
                        .connect_to_peer(&peer_node_id, reader_remote, writer_remote)
                        .await?;
                    let peer = Peer::new_circuit_client(
                        conn,
                        writer,
                        reader,
                        router.service_map.new_local_service_observer().await,
                        &router,
                        peer_node_id_num,
                    )?;

                    // Replace any Waiters entry with the live peer and wake
                    // everyone parked in `client_peer`.
                    if let Some(CircuitState::Waiters(waiters)) =
                        peers.circuit_clients.insert(peer_node_id_num, CircuitState::Peer(peer))
                    {
                        for waiter in waiters {
                            let _ = waiter.send(());
                        }
                    }

                    Result::<_, Error>::Ok(())
                }
                .await;

                // A failed connection attempt is logged but does not stop the loop.
                if let Err(e) = res {
                    log::warn!(peer:? = peer_node_id; "Attempt to connect to peer failed: {:?}", e);
                }
            }
        }
    };

    // Loops over incoming connections and starts servers.
    let new_conn_fut = async move {
        let mut connections = pin!(connections);
        while let Some(conn) = connections.next().await {
            let peer_name = conn.from().to_owned();
            let res = async {
                let router = router.upgrade().ok_or_else(|| format_err!("router gone"))?;
                Peer::new_circuit_server(conn, &router).await?;
                Result::<_, Error>::Ok(())
            }
            .await;

            if let Err(e) = res {
                log::warn!(
                    peer:? = peer_name;
                    "Attempt to receive connection from peer failed: {:?}",
                    e
                );
            }
        }
    };

    futures::future::join(new_peer_fut, new_conn_fut).await;
    Ok(())
}
746
747#[cfg(test)]
748mod tests {
749
750    use super::*;
751    use crate::test_util::*;
752    use circuit::Quality;
753    use circuit::multi_stream::multi_stream_node_connection;
754    use circuit::stream::stream;
755
    #[fuchsia::test]
    async fn no_op(run: usize) {
        let mut node_id_gen = NodeIdGenerator::new("router::no_op", run);
        // Construct-and-drop: router creation alone must not panic or leak.
        node_id_gen.new_router().unwrap();
        // A router built with an explicit node id must report that id back.
        let id = node_id_gen.next().unwrap();
        assert_eq!(Router::with_node_id(id, None).unwrap().node_id, id);
    }
763
764    async fn register_test_service(
765        serving_router: Arc<Router>,
766        client_router: Arc<Router>,
767        service: &'static str,
768    ) -> futures::channel::oneshot::Receiver<Channel> {
769        use fuchsia_sync::Mutex;
770        let (send, recv) = futures::channel::oneshot::channel();
771        serving_router
772            .service_map()
773            .register_service(service.to_string(), {
774                let sender = Mutex::new(Some(send));
775                move |chan| {
776                    println!("{} got request", service);
777                    sender.lock().take().unwrap().send(chan).unwrap();
778                    println!("{} forwarded channel", service);
779                    Ok(())
780                }
781            })
782            .await;
783        let serving_node_id = serving_router.node_id();
784        println!("{} wait for service to appear @ client", service);
785        let lpc = client_router.new_list_peers_context().await;
786        loop {
787            let peers = lpc.list_peers().await.unwrap();
788            println!("{} got peers {:?}", service, peers);
789            if peers
790                .iter()
791                .find(move |peer| {
792                    serving_node_id == peer.node_id
793                        && peer
794                            .services
795                            .iter()
796                            .find(move |&advertised_service| advertised_service == service)
797                            .is_some()
798                })
799                .is_some()
800            {
801                break;
802            }
803        }
804        recv
805    }
806
807    async fn run_two_node<
808        F: 'static + Clone + Sync + Send + Fn(Arc<Router>, Arc<Router>) -> Fut,
809        Fut: 'static + Send + Future<Output = Result<(), Error>>,
810    >(
811        name: &'static str,
812        run: usize,
813        f: F,
814    ) -> Result<(), Error> {
815        let mut node_id_gen = NodeIdGenerator::new(name, run);
816        let router1 = node_id_gen.new_router()?;
817        let router2 = node_id_gen.new_router()?;
818        let (circuit1_reader, circuit1_writer) = stream();
819        let (circuit2_reader, circuit2_writer) = stream();
820        let (out_1, _) = futures::channel::mpsc::unbounded();
821        let (out_2, _) = futures::channel::mpsc::unbounded();
822
823        let conn_1 = multi_stream_node_connection(
824            router1.circuit_node(),
825            circuit1_reader,
826            circuit2_writer,
827            true,
828            Quality::IN_PROCESS,
829            out_1,
830            "router1".to_owned(),
831        );
832        let conn_2 = multi_stream_node_connection(
833            router2.circuit_node(),
834            circuit2_reader,
835            circuit1_writer,
836            false,
837            Quality::IN_PROCESS,
838            out_2,
839            "router2".to_owned(),
840        );
841        let _fwd = Task::spawn(async move {
842            if let Err(e) = futures::future::try_join(conn_1, conn_2).await {
843                log::trace!("forwarding failed: {:?}", e)
844            }
845        });
846        f(router1, router2).await
847    }
848
849    #[fuchsia::test]
850    async fn no_op_env(run: usize) -> Result<(), Error> {
851        run_two_node("router::no_op_env", run, |_router1, _router2| async { Ok(()) }).await
852    }
853
854    #[fuchsia::test]
855    async fn create_stream(run: usize) -> Result<(), Error> {
856        run_two_node("create_stream", run, |router1, router2| async move {
857            let (_, p) = fidl::Channel::create();
858            println!("create_stream: register service");
859            let s = register_test_service(router2.clone(), router1.clone(), "create_stream").await;
860            println!("create_stream: connect to service");
861            router1.connect_to_service(router2.node_id, "create_stream", p).await?;
862            println!("create_stream: wait for connection");
863            let _ = s.await?;
864            Ok(())
865        })
866        .await
867    }
868
869    #[fuchsia::test]
870    async fn send_datagram_immediately(run: usize) -> Result<(), Error> {
871        run_two_node("send_datagram_immediately", run, |router1, router2| async move {
872            let (c, p) = fidl::Channel::create();
873            println!("send_datagram_immediately: register service");
874            let s = register_test_service(
875                router2.clone(),
876                router1.clone(),
877                "send_datagram_immediately",
878            )
879            .await;
880            println!("send_datagram_immediately: connect to service");
881            router1.connect_to_service(router2.node_id, "send_datagram_immediately", p).await?;
882            println!("send_datagram_immediately: wait for connection");
883            let s = s.await?;
884            let c = fidl::AsyncChannel::from_channel(c);
885            let s = fidl::AsyncChannel::from_channel(s);
886            c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
887            let mut buf = fidl::MessageBufEtc::new();
888            println!("send_datagram_immediately: wait for datagram");
889            s.recv_etc_msg(&mut buf).await?;
890            assert_eq!(buf.n_handle_infos(), 0);
891            assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
892            Ok(())
893        })
894        .await
895    }
896
897    #[fuchsia::test]
898    async fn ping_pong(run: usize) -> Result<(), Error> {
899        run_two_node("ping_pong", run, |router1, router2| async move {
900            let (c, p) = fidl::Channel::create();
901            println!("ping_pong: register service");
902            let s = register_test_service(router2.clone(), router1.clone(), "ping_pong").await;
903            println!("ping_pong: connect to service");
904            router1.connect_to_service(router2.node_id, "ping_pong", p).await?;
905            println!("ping_pong: wait for connection");
906            let s = s.await?;
907            let c = fidl::AsyncChannel::from_channel(c);
908            let s = fidl::AsyncChannel::from_channel(s);
909            println!("ping_pong: send ping");
910            c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
911            println!("ping_pong: receive ping");
912            let mut buf = fidl::MessageBufEtc::new();
913            s.recv_etc_msg(&mut buf).await?;
914            assert_eq!(buf.n_handle_infos(), 0);
915            assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
916            println!("ping_pong: send pong");
917            s.write(&[9, 8, 7, 6, 5, 4, 3, 2, 1], &mut Vec::new())?;
918            println!("ping_pong: receive pong");
919            let mut buf = fidl::MessageBufEtc::new();
920            c.recv_etc_msg(&mut buf).await?;
921            assert_eq!(buf.n_handle_infos(), 0);
922            assert_eq!(buf.bytes(), &[9, 8, 7, 6, 5, 4, 3, 2, 1]);
923            Ok(())
924        })
925        .await
926    }
927
928    fn ensure_pending(f: &mut (impl Send + Unpin + Future<Output = ()>)) {
929        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
930        // Poll a bunch of times to convince ourselves the future is pending forever...
931        for _ in 0..1000 {
932            assert!(f.poll_unpin(&mut ctx).is_pending());
933        }
934    }
935
936    #[fuchsia::test]
937    async fn concurrent_list_peer_calls_will_error(run: usize) -> Result<(), Error> {
938        let mut node_id_gen = NodeIdGenerator::new("concurrent_list_peer_calls_will_error", run);
939        let n = node_id_gen.new_router().unwrap();
940        let lp = n.new_list_peers_context().await;
941        lp.list_peers().await.unwrap();
942        let mut never_completes = async {
943            lp.list_peers().await.unwrap();
944        }
945        .boxed();
946        ensure_pending(&mut never_completes);
947        lp.list_peers().await.expect_err("Concurrent list peers should fail");
948        ensure_pending(&mut never_completes);
949        Ok(())
950    }
951}