routing/bedrock/
use_dictionary_router.rs1use crate::capability_source::CapabilitySource;
6use crate::error::RoutingError;
7use async_trait::async_trait;
8use futures::StreamExt;
9use futures::channel::oneshot;
10use futures::stream::FuturesUnordered;
11use router_error::RouterError;
12use sandbox::{
13 Capability, Dict, EntryUpdate, Request, Routable, Router, RouterResponse,
14 UpdateNotifierRetention, WeakInstanceToken,
15};
16
17pub struct UseDictionaryRouter {
21 path: cm_types::Path,
22 moniker: moniker::Moniker,
23 original_dictionary: Dict,
24 dictionary_routers: Vec<Router<Dict>>,
25 capability_source: CapabilitySource,
26}
27
28impl UseDictionaryRouter {
29 pub fn new(
30 path: cm_types::Path,
31 moniker: moniker::Moniker,
32 original_dictionary: Dict,
33 dictionary_routers: Vec<Router<Dict>>,
34 capability_source: CapabilitySource,
35 ) -> Router<Dict> {
36 Router::new(Self {
37 path,
38 moniker,
39 original_dictionary,
40 dictionary_routers,
41 capability_source,
42 })
43 }
44
45 async fn dictionary_follow_updates_from(
52 &self,
53 self_dictionary: Dict,
54 other_dict: Dict,
55 ) -> Vec<(cm_types::Name, Capability, Capability)> {
56 let self_clone = self_dictionary;
57 let (sender, receiver) = oneshot::channel();
58 let mut sender = Some(sender);
59 let mut initial_conflicts = vec![];
60 other_dict.register_update_notifier(Box::new(move |entry_update| {
61 match entry_update {
62 EntryUpdate::Add(key, capability) => {
63 if let Some(preexisting_value) = self_clone.get(key).ok().flatten() {
64 initial_conflicts.push((
67 key.into(),
68 capability.try_clone().unwrap(),
69 preexisting_value,
70 ));
71 } else {
72 let _ = self_clone.insert(key.into(), capability.try_clone().unwrap());
73 }
74 }
75 EntryUpdate::Remove(key) => {
76 let _ = self_clone.remove(key);
77 }
78 EntryUpdate::Idle => {
79 if let Some(sender) = sender.take() {
80 let _ = sender.send(std::mem::take(&mut initial_conflicts));
81 }
82 }
83 }
84 UpdateNotifierRetention::Retain
85 }));
86
87 receiver.await.expect("sender was dropped unexpectedly")
88 }
89}
90
91#[async_trait]
92impl Routable<Dict> for UseDictionaryRouter {
93 async fn route(
94 &self,
95 request: Option<Request>,
96 debug: bool,
97 target: WeakInstanceToken,
98 ) -> Result<RouterResponse<Dict>, RouterError> {
99 if debug {
100 return Ok(RouterResponse::Debug(
101 self.capability_source
102 .clone()
103 .try_into()
104 .expect("failed to serialize capability source"),
105 ));
106 }
107 let mut futures_unordered = FuturesUnordered::new();
108 for dictionary_router in self.dictionary_routers.iter() {
109 let request = request.as_ref().and_then(|r| r.try_clone().ok());
110 futures_unordered.push(dictionary_router.route(request, false, target.clone()));
111 }
112 let resulting_dictionary = self.original_dictionary.shallow_copy().unwrap();
113 while let Some(route_result) = futures_unordered.next().await {
114 match route_result {
115 Ok(RouterResponse::Capability(other_dictionary)) => {
116 let initial_conflicts = self
117 .dictionary_follow_updates_from(
118 resulting_dictionary.clone(),
119 other_dictionary,
120 )
121 .await;
122 let mut conflicting_names = vec![];
123 for (key, capability, preexisting_value) in initial_conflicts {
124 log::warn!(
125 "{}: unable to add {key} from source {} to merged dictionary for path \
126 {} because the dictionary already contains an item with the same name \
127 from source {}",
128 &self.moniker,
129 try_get_router_source(&capability, target.clone())
130 .await
131 .unwrap_or_else(|| "<unknown>".to_string()),
132 &self.path,
133 try_get_router_source(&preexisting_value, target.clone())
134 .await
135 .unwrap_or_else(|| "<unknown>".to_string()),
136 );
137 conflicting_names.push(key);
138 }
139 if !conflicting_names.is_empty() {
140 return Err(RoutingError::ConflictingDictionaryEntries {
141 moniker: self.capability_source.source_moniker(),
142 conflicting_names,
143 }
144 .into());
145 }
146 }
147 Ok(RouterResponse::Unavailable) => (),
148 Ok(RouterResponse::Debug(_)) => {
149 panic!("got debug response when we didn't request one")
150 }
151 Err(_e) => {
152 }
157 }
158 }
159 Ok(resulting_dictionary.into())
160 }
161}
162
163async fn try_get_router_source(
164 capability: &Capability,
165 target: WeakInstanceToken,
166) -> Option<String> {
167 let source: crate::capability_source::CapabilitySource = match capability {
168 Capability::DictionaryRouter(router) => match router.route(None, true, target).await {
169 Ok(RouterResponse::Debug(data)) => data.try_into().ok()?,
170 _ => return None,
171 },
172 Capability::ConnectorRouter(router) => match router.route(None, true, target).await {
173 Ok(RouterResponse::Debug(data)) => data.try_into().ok()?,
174 _ => return None,
175 },
176 Capability::DirConnectorRouter(router) => match router.route(None, true, target).await {
177 Ok(RouterResponse::Debug(data)) => data.try_into().ok()?,
178 _ => return None,
179 },
180 Capability::DataRouter(router) => match router.route(None, true, target).await {
181 Ok(RouterResponse::Debug(data)) => data.try_into().ok()?,
182 _ => return None,
183 },
184 _ => return None,
185 };
186 Some(format!("{}", source))
187}