routing/bedrock/
use_dictionary_router.rs1use crate::error::RoutingError;
6use async_trait::async_trait;
7use capability_source::CapabilitySource;
8use fidl_fuchsia_component_runtime::RouteRequest;
9use futures::StreamExt;
10use futures::channel::oneshot;
11use futures::stream::FuturesUnordered;
12use router_error::RouterError;
13use runtime_capabilities::{
14 Capability, Dictionary, EntryUpdate, Routable, Router, UpdateNotifierRetention,
15 WeakInstanceToken,
16};
17
18pub struct UseDictionaryRouter {
22 path: cm_types::Path,
23 moniker: moniker::Moniker,
24 original_dictionary: Dictionary,
25 dictionary_routers: Vec<Router<Dictionary>>,
26 capability_source: CapabilitySource,
27}
28
29impl UseDictionaryRouter {
30 pub fn new(
31 path: cm_types::Path,
32 moniker: moniker::Moniker,
33 original_dictionary: Dictionary,
34 dictionary_routers: Vec<Router<Dictionary>>,
35 capability_source: CapabilitySource,
36 ) -> Router<Dictionary> {
37 Router::new(Self {
38 path,
39 moniker,
40 original_dictionary,
41 dictionary_routers,
42 capability_source,
43 })
44 }
45
46 async fn dictionary_follow_updates_from(
53 &self,
54 self_dictionary: Dictionary,
55 other_dict: Dictionary,
56 ) -> Vec<(cm_types::Name, Capability, Capability)> {
57 let self_clone = self_dictionary;
58 let (sender, receiver) = oneshot::channel();
59 let mut sender = Some(sender);
60 let mut initial_conflicts = vec![];
61 other_dict.register_update_notifier(Box::new(move |entry_update| {
62 match entry_update {
63 EntryUpdate::Add(key, capability) => {
64 if let Some(preexisting_value) = self_clone.get(key) {
65 initial_conflicts.push((key.into(), capability.clone(), preexisting_value));
68 } else {
69 let _ = self_clone.insert(key.into(), capability.clone());
70 }
71 }
72 EntryUpdate::Remove(key) => {
73 let _ = self_clone.remove(key);
74 }
75 EntryUpdate::Idle => {
76 if let Some(sender) = sender.take() {
77 let _ = sender.send(std::mem::take(&mut initial_conflicts));
78 }
79 }
80 }
81 UpdateNotifierRetention::Retain
82 }));
83
84 receiver.await.expect("sender was dropped unexpectedly")
85 }
86}
87
88#[async_trait]
89impl Routable<Dictionary> for UseDictionaryRouter {
90 async fn route(
91 &self,
92 request: RouteRequest,
93 target: WeakInstanceToken,
94 ) -> Result<Option<Dictionary>, RouterError> {
95 let mut futures_unordered = FuturesUnordered::new();
96 for dictionary_router in self.dictionary_routers.iter() {
97 futures_unordered.push(dictionary_router.route(request.clone(), target.clone()));
98 }
99 let resulting_dictionary = self.original_dictionary.shallow_copy();
100 while let Some(route_result) = futures_unordered.next().await {
101 match route_result {
102 Ok(Some(other_dictionary)) => {
103 let initial_conflicts = self
104 .dictionary_follow_updates_from(
105 resulting_dictionary.clone(),
106 other_dictionary,
107 )
108 .await;
109 let mut conflicting_names = vec![];
110 for (key, capability, preexisting_value) in initial_conflicts {
111 log::warn!(
112 "{}: unable to add {key} from source {} to merged dictionary for path \
113 {} because the dictionary already contains an item with the same name \
114 from source {}",
115 &self.moniker,
116 try_get_router_source(&capability, target.clone())
117 .await
118 .unwrap_or_else(|| "<unknown>".to_string()),
119 &self.path,
120 try_get_router_source(&preexisting_value, target.clone())
121 .await
122 .unwrap_or_else(|| "<unknown>".to_string()),
123 );
124 conflicting_names.push(key);
125 }
126 if !conflicting_names.is_empty() {
127 return Err(RoutingError::ConflictingDictionaryEntries {
128 moniker: self.capability_source.source_moniker(),
129 conflicting_names,
130 }
131 .into());
132 }
133 }
134 Ok(None) => (),
135 Err(_e) => {
136 }
141 }
142 }
143 Ok(resulting_dictionary.into())
144 }
145
146 async fn route_debug(
147 &self,
148 _request: RouteRequest,
149 _target: WeakInstanceToken,
150 ) -> Result<CapabilitySource, RouterError> {
151 Ok(self.capability_source.clone())
152 }
153}
154
155async fn try_get_router_source(
156 capability: &Capability,
157 target: WeakInstanceToken,
158) -> Option<String> {
159 let source: CapabilitySource = match capability {
160 Capability::DictionaryRouter(router) => {
161 router.route_debug(RouteRequest::default(), target).await.ok()?
162 }
163 Capability::ConnectorRouter(router) => {
164 router.route_debug(RouteRequest::default(), target).await.ok()?
165 }
166 Capability::DirConnectorRouter(router) => {
167 router.route_debug(RouteRequest::default(), target).await.ok()?
168 }
169 Capability::DataRouter(router) => {
170 router.route_debug(RouteRequest::default(), target).await.ok()?
171 }
172 _ => return None,
173 };
174 Some(format!("{}", source))
175}