routing/bedrock/
use_dictionary_router.rs1use crate::error::RoutingError;
6use async_trait::async_trait;
7use capability_source::CapabilitySource;
8use fidl_fuchsia_component_runtime::RouteRequest;
9use futures::StreamExt;
10use futures::channel::oneshot;
11use futures::stream::FuturesUnordered;
12use router_error::RouterError;
13use runtime_capabilities::{
14 Capability, Dictionary, EntryUpdate, Routable, Router, UpdateNotifierRetention,
15 WeakInstanceToken,
16};
17use std::sync::Arc;
18
19pub struct UseDictionaryRouter {
23 path: cm_types::Path,
24 moniker: moniker::Moniker,
25 original_dictionary: Arc<Dictionary>,
26 dictionary_routers: Vec<Arc<Router<Dictionary>>>,
27 capability_source: CapabilitySource,
28}
29
30impl UseDictionaryRouter {
31 pub fn new(
32 path: cm_types::Path,
33 moniker: moniker::Moniker,
34 original_dictionary: Arc<Dictionary>,
35 dictionary_routers: Vec<Arc<Router<Dictionary>>>,
36 capability_source: CapabilitySource,
37 ) -> Arc<Router<Dictionary>> {
38 Router::new(Self {
39 path,
40 moniker,
41 original_dictionary,
42 dictionary_routers,
43 capability_source,
44 })
45 }
46
47 async fn dictionary_follow_updates_from(
54 &self,
55 self_dictionary: Arc<Dictionary>,
56 other_dict: Arc<Dictionary>,
57 ) -> Vec<(cm_types::Name, Capability, Capability)> {
58 let self_clone = self_dictionary;
59 let (sender, receiver) = oneshot::channel();
60 let mut sender = Some(sender);
61 let mut initial_conflicts = vec![];
62 other_dict.register_update_notifier(Box::new(move |entry_update| {
63 match entry_update {
64 EntryUpdate::Add(key, capability) => {
65 if let Some(preexisting_value) = self_clone.get(key) {
66 initial_conflicts.push((key.into(), capability.clone(), preexisting_value));
69 } else {
70 let _ = self_clone.insert(key.into(), capability.clone());
71 }
72 }
73 EntryUpdate::Remove(key) => {
74 let _ = self_clone.remove(key);
75 }
76 EntryUpdate::Idle => {
77 if let Some(sender) = sender.take() {
78 let _ = sender.send(std::mem::take(&mut initial_conflicts));
79 }
80 }
81 }
82 UpdateNotifierRetention::Retain
83 }));
84
85 receiver.await.expect("sender was dropped unexpectedly")
86 }
87}
88
89#[async_trait]
90impl Routable<Dictionary> for UseDictionaryRouter {
91 async fn route(
92 &self,
93 request: RouteRequest,
94 target: Arc<WeakInstanceToken>,
95 ) -> Result<Option<Arc<Dictionary>>, RouterError> {
96 let mut futures_unordered = FuturesUnordered::new();
97 for dictionary_router in self.dictionary_routers.iter() {
98 futures_unordered.push(dictionary_router.route(request.clone(), target.clone()));
99 }
100 let resulting_dictionary = self.original_dictionary.shallow_copy();
101 while let Some(route_result) = futures_unordered.next().await {
102 match route_result {
103 Ok(Some(other_dictionary)) => {
104 let initial_conflicts = self
105 .dictionary_follow_updates_from(
106 resulting_dictionary.clone(),
107 other_dictionary,
108 )
109 .await;
110 let mut conflicting_names = vec![];
111 for (key, capability, preexisting_value) in initial_conflicts {
112 log::warn!(
113 "{}: unable to add {key} from source {} to merged dictionary for path \
114 {} because the dictionary already contains an item with the same name \
115 from source {}",
116 self.moniker,
117 try_get_router_source(&capability, target.clone())
118 .await
119 .unwrap_or_else(|| "<unknown>".to_string()),
120 self.path,
121 try_get_router_source(&preexisting_value, target.clone())
122 .await
123 .unwrap_or_else(|| "<unknown>".to_string()),
124 );
125 conflicting_names.push(key);
126 }
127 if !conflicting_names.is_empty() {
128 return Err(RoutingError::ConflictingDictionaryEntries {
129 moniker: self.capability_source.source_moniker(),
130 conflicting_names,
131 }
132 .into());
133 }
134 }
135 Ok(None) => (),
136 Err(_e) => {
137 }
142 }
143 }
144 Ok(resulting_dictionary.into())
145 }
146
147 async fn route_debug(
148 &self,
149 _request: RouteRequest,
150 _target: Arc<WeakInstanceToken>,
151 ) -> Result<CapabilitySource, RouterError> {
152 Ok(self.capability_source.clone())
153 }
154}
155
156async fn try_get_router_source(
157 capability: &Capability,
158 target: Arc<WeakInstanceToken>,
159) -> Option<String> {
160 let source: CapabilitySource = match capability {
161 Capability::DictionaryRouter(router) => {
162 router.route_debug(RouteRequest::default(), target).await.ok()?
163 }
164 Capability::ConnectorRouter(router) => {
165 router.route_debug(RouteRequest::default(), target).await.ok()?
166 }
167 Capability::DirConnectorRouter(router) => {
168 router.route_debug(RouteRequest::default(), target).await.ok()?
169 }
170 Capability::DataRouter(router) => {
171 router.route_debug(RouteRequest::default(), target).await.ok()?
172 }
173 _ => return None,
174 };
175 Some(format!("{}", source))
176}