overnet_core/router/
service_map.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::future_help::{Observable, Observer};
6use crate::labels::NodeId;
7use anyhow::{bail, format_err, Error};
8use fidl::Channel;
9use futures::lock::Mutex;
10use std::collections::{btree_map, BTreeMap};
11
/// A type that can be converted into a `super::Peer`.
///
/// One entry of the peer list that `ServiceMap` publishes through its `list_peers` observable.
/// Two values compare equal only when the node ID, the `is_self` flag and the service list all
/// match.
#[derive(Debug, Clone, PartialEq)]
pub struct ListablePeer {
    /// Node ID of this peer.
    pub node_id: NodeId,
    /// Whether this peer is the node producing the list.
    pub is_self: bool,
    /// Services offered by this peer.
    pub services: Vec<String>,
}
22
/// The peers known to a `ServiceMap`, together with the bookkeeping that decides which of them
/// are currently published to list-peers observers.
struct ListablePeerSet {
    /// Known peers: the local node's own entry (added by `ServiceMap::new`) followed by peers
    /// added through `ServiceMap::update_list_peers`, in insertion order.
    listable_peers: Vec<ListablePeer>,
    /// Number of client connections currently recorded per node ID. A peer that is not the
    /// local node is only published while it has an entry in this map.
    peers_with_client_connection: BTreeMap<NodeId, usize>,
}
27
28impl ListablePeerSet {
29    fn publish(&self) -> Vec<ListablePeer> {
30        let peers_with_client_connection = &self.peers_with_client_connection;
31        self.listable_peers
32            .iter()
33            .filter(move |p| p.is_self || peers_with_client_connection.contains_key(&p.node_id))
34            .cloned()
35            .collect()
36    }
37}
38
/// Mutable state of a `ServiceMap`; the map keeps it behind its mutex.
pub struct ServiceMapInner {
    /// Locally registered services, keyed by service name. `ServiceMap::connect` hands the
    /// incoming channel to the provider stored under the requested name.
    local_services: BTreeMap<String, Box<dyn Fn(fidl::Channel) -> Result<(), Error> + Send>>,
    /// Observable list of the names in `local_services`; updated when a new name is registered.
    local_service_list: Observable<Vec<String>>,
    /// Observable peer list; it is fed the output of `ListablePeerSet::publish`.
    list_peers: Observable<Vec<ListablePeer>>,
    /// Backing data from which the contents of `list_peers` are computed.
    listable_peer_set: ListablePeerSet,
}
45
/// Tracks the services registered on the local node and the peers (with their service lists)
/// that can be listed, exposing both through observables.
pub struct ServiceMap {
    /// All mutable state, guarded by an async mutex (`futures::lock::Mutex`).
    inner: Mutex<ServiceMapInner>,
    /// ID of the node this map belongs to; `update_node` rejects updates for this ID.
    local_node_id: NodeId,
}
50
51impl ServiceMap {
52    pub fn new(local_node_id: NodeId) -> ServiceMap {
53        let listable_peers =
54            vec![ListablePeer { node_id: local_node_id, is_self: true, services: vec![] }];
55        ServiceMap {
56            local_node_id,
57            inner: Mutex::new(ServiceMapInner {
58                local_services: BTreeMap::new(),
59                local_service_list: Observable::new(Vec::new()),
60                list_peers: Observable::new(listable_peers.clone()),
61                listable_peer_set: ListablePeerSet {
62                    listable_peers,
63                    peers_with_client_connection: std::iter::once((local_node_id, 1)).collect(),
64                },
65            }),
66        }
67    }
68
69    pub async fn connect(&self, service_name: &str, chan: Channel) -> Result<(), Error> {
70        (self
71            .inner
72            .lock()
73            .await
74            .local_services
75            .get(service_name)
76            .ok_or_else(|| format_err!("Service not found: {}", service_name))?)(chan)?;
77        Ok(())
78    }
79
80    pub async fn register_service(
81        &self,
82        service_name: String,
83        provider: impl Fn(fidl::Channel) -> Result<(), Error> + Send + 'static,
84    ) {
85        log::trace!("Request register_service '{}'", service_name);
86        let mut inner = self.inner.lock().await;
87        if inner.local_services.insert(service_name.clone(), Box::new(provider)).is_none() {
88            log::trace!("Publish new service '{}'", service_name);
89            let services: Vec<String> = inner.local_services.keys().cloned().collect();
90            inner.local_service_list.maybe_push(services).await;
91        }
92    }
93
94    pub async fn update_node(&self, node_id: NodeId, services: Vec<String>) -> Result<(), Error> {
95        if node_id == self.local_node_id {
96            bail!("Attempt to set local services list");
97        }
98        self.update_list_peers(ListablePeer { node_id, is_self: false, services }).await;
99        Ok(())
100    }
101
102    async fn update_list_peers(&self, update_peer: ListablePeer) {
103        let mut inner = self.inner.lock().await;
104        let peers = &mut inner.listable_peer_set.listable_peers;
105        for existing_peer in peers.iter_mut() {
106            if existing_peer.node_id == update_peer.node_id {
107                if *existing_peer == update_peer {
108                    return;
109                }
110                *existing_peer = update_peer;
111                inner.list_peers.maybe_push(inner.listable_peer_set.publish()).await;
112                return;
113            }
114        }
115        peers.push(update_peer);
116        inner.list_peers.maybe_push(inner.listable_peer_set.publish()).await;
117    }
118
119    pub async fn add_client_connection(&self, peer_id: NodeId) {
120        let mut inner = self.inner.lock().await;
121        match inner.listable_peer_set.peers_with_client_connection.entry(peer_id) {
122            btree_map::Entry::Occupied(o) => *o.into_mut() += 1,
123            btree_map::Entry::Vacant(v) => {
124                v.insert(1);
125                inner.list_peers.maybe_push(inner.listable_peer_set.publish()).await;
126            }
127        }
128    }
129
130    pub async fn remove_client_connection(&self, peer_id: NodeId) {
131        let mut inner = self.inner.lock().await;
132        match inner.listable_peer_set.peers_with_client_connection.entry(peer_id) {
133            btree_map::Entry::Occupied(mut o) => match *o.get() {
134                0 => unreachable!(),
135                1 => {
136                    o.remove();
137                    inner.list_peers.maybe_push(inner.listable_peer_set.publish()).await;
138                }
139                n => *o.get_mut() = n - 1,
140            },
141            btree_map::Entry::Vacant(_) => unreachable!(),
142        }
143    }
144
145    pub async fn new_local_service_observer(&self) -> Observer<Vec<String>> {
146        self.inner.lock().await.local_service_list.new_observer()
147    }
148
149    pub async fn new_list_peers_observer(&self) -> Observer<Vec<ListablePeer>> {
150        self.inner.lock().await.list_peers.new_observer()
151    }
152}