overnet_core/router/
service_map.rs1use crate::future_help::{Observable, Observer};
6use crate::labels::NodeId;
7use anyhow::{bail, format_err, Error};
8use fidl::Channel;
9use futures::lock::Mutex;
10use std::collections::{btree_map, BTreeMap};
11
12#[derive(Debug, Clone, PartialEq)]
14pub struct ListablePeer {
15 pub node_id: NodeId,
17 pub is_self: bool,
19 pub services: Vec<String>,
21}
22
23struct ListablePeerSet {
24 listable_peers: Vec<ListablePeer>,
25 peers_with_client_connection: BTreeMap<NodeId, usize>,
26}
27
28impl ListablePeerSet {
29 fn publish(&self) -> Vec<ListablePeer> {
30 let peers_with_client_connection = &self.peers_with_client_connection;
31 self.listable_peers
32 .iter()
33 .filter(move |p| p.is_self || peers_with_client_connection.contains_key(&p.node_id))
34 .cloned()
35 .collect()
36 }
37}
38
39pub struct ServiceMapInner {
40 local_services: BTreeMap<String, Box<dyn Fn(fidl::Channel) -> Result<(), Error> + Send>>,
41 local_service_list: Observable<Vec<String>>,
42 list_peers: Observable<Vec<ListablePeer>>,
43 listable_peer_set: ListablePeerSet,
44}
45
46pub struct ServiceMap {
47 inner: Mutex<ServiceMapInner>,
48 local_node_id: NodeId,
49}
50
51impl ServiceMap {
52 pub fn new(local_node_id: NodeId) -> ServiceMap {
53 let listable_peers =
54 vec![ListablePeer { node_id: local_node_id, is_self: true, services: vec![] }];
55 ServiceMap {
56 local_node_id,
57 inner: Mutex::new(ServiceMapInner {
58 local_services: BTreeMap::new(),
59 local_service_list: Observable::new(Vec::new()),
60 list_peers: Observable::new(listable_peers.clone()),
61 listable_peer_set: ListablePeerSet {
62 listable_peers,
63 peers_with_client_connection: std::iter::once((local_node_id, 1)).collect(),
64 },
65 }),
66 }
67 }
68
69 pub async fn connect(&self, service_name: &str, chan: Channel) -> Result<(), Error> {
70 (self
71 .inner
72 .lock()
73 .await
74 .local_services
75 .get(service_name)
76 .ok_or_else(|| format_err!("Service not found: {}", service_name))?)(chan)?;
77 Ok(())
78 }
79
80 pub async fn register_service(
81 &self,
82 service_name: String,
83 provider: impl Fn(fidl::Channel) -> Result<(), Error> + Send + 'static,
84 ) {
85 log::trace!("Request register_service '{}'", service_name);
86 let mut inner = self.inner.lock().await;
87 if inner.local_services.insert(service_name.clone(), Box::new(provider)).is_none() {
88 log::trace!("Publish new service '{}'", service_name);
89 let services: Vec<String> = inner.local_services.keys().cloned().collect();
90 inner.local_service_list.maybe_push(services).await;
91 }
92 }
93
94 pub async fn update_node(&self, node_id: NodeId, services: Vec<String>) -> Result<(), Error> {
95 if node_id == self.local_node_id {
96 bail!("Attempt to set local services list");
97 }
98 self.update_list_peers(ListablePeer { node_id, is_self: false, services }).await;
99 Ok(())
100 }
101
102 async fn update_list_peers(&self, update_peer: ListablePeer) {
103 let mut inner = self.inner.lock().await;
104 let peers = &mut inner.listable_peer_set.listable_peers;
105 for existing_peer in peers.iter_mut() {
106 if existing_peer.node_id == update_peer.node_id {
107 if *existing_peer == update_peer {
108 return;
109 }
110 *existing_peer = update_peer;
111 inner.list_peers.maybe_push(inner.listable_peer_set.publish()).await;
112 return;
113 }
114 }
115 peers.push(update_peer);
116 inner.list_peers.maybe_push(inner.listable_peer_set.publish()).await;
117 }
118
119 pub async fn add_client_connection(&self, peer_id: NodeId) {
120 let mut inner = self.inner.lock().await;
121 match inner.listable_peer_set.peers_with_client_connection.entry(peer_id) {
122 btree_map::Entry::Occupied(o) => *o.into_mut() += 1,
123 btree_map::Entry::Vacant(v) => {
124 v.insert(1);
125 inner.list_peers.maybe_push(inner.listable_peer_set.publish()).await;
126 }
127 }
128 }
129
130 pub async fn remove_client_connection(&self, peer_id: NodeId) {
131 let mut inner = self.inner.lock().await;
132 match inner.listable_peer_set.peers_with_client_connection.entry(peer_id) {
133 btree_map::Entry::Occupied(mut o) => match *o.get() {
134 0 => unreachable!(),
135 1 => {
136 o.remove();
137 inner.list_peers.maybe_push(inner.listable_peer_set.publish()).await;
138 }
139 n => *o.get_mut() = n - 1,
140 },
141 btree_map::Entry::Vacant(_) => unreachable!(),
142 }
143 }
144
145 pub async fn new_local_service_observer(&self) -> Observer<Vec<String>> {
146 self.inner.lock().await.local_service_list.new_observer()
147 }
148
149 pub async fn new_list_peers_observer(&self) -> Observer<Vec<ListablePeer>> {
150 self.inner.lock().await.list_peers.new_observer()
151 }
152}