1use futures::stream::{BoxStream, SelectAll};
6use futures::{FutureExt, Stream, StreamExt};
7use pin_project::pin_project;
8use std::collections::HashMap;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use {fidl_fuchsia_component as fcomponent, fidl_fuchsia_memory_attribution as fattribution};
12
13#[derive(Debug, Clone, Eq, PartialEq, Hash)]
14pub struct PrincipalIdentifier(pub u64);
15
16#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
17pub enum Resource {
18 KernelObject(zx::Koid),
19 Vmar { process: zx::Koid, base: usize, len: usize },
20}
21
22#[derive(Debug, Clone)]
24pub struct Principal {
25 pub identifier: PrincipalIdentifier,
27
28 pub name: String,
30
31 pub resources: Vec<Resource>,
33
34 pub children: Vec<Principal>,
36}
37
38impl Principal {
39 pub fn new(identifier: PrincipalIdentifier, name: String) -> Principal {
40 Principal { identifier, name, resources: vec![], children: vec![] }
41 }
42}
43
44pub fn attribute_memory(
53 identifier: PrincipalIdentifier,
54 name: String,
55 attribution_provider: fattribution::ProviderProxy,
56 introspector: fcomponent::IntrospectorProxy,
57) -> BoxStream<'static, Principal> {
58 futures::stream::unfold(
59 StreamState::new(identifier, name, introspector, attribution_provider),
60 get_next,
61 )
62 .boxed()
63}
64
65async fn get_next(mut state: StreamState) -> Option<(Principal, StreamState)> {
67 let mut node = state
68 .node
69 .clone()
70 .unwrap_or_else(|| Principal::new(state.identifier.clone(), state.name.clone()));
71 let mut children: HashMap<PrincipalIdentifier, Principal> =
72 node.children.clone().into_iter().map(|n| (n.identifier.clone(), n)).collect();
73
74 match state.next().await {
76 Some(event) => {
77 match event {
78 Event::Node(attributions) => {
80 for attribution in attributions {
81 handle_update(attribution, &mut state, &mut children).await;
82 }
83 }
84 Event::Child(child) => {
86 children.insert(child.identifier.clone(), child);
87 }
88 }
89 }
90 None => return None,
91 }
92
93 node.children = children.into_values().collect();
94 state.node = Some(node.clone());
95 Some((node, state))
96}
97
98async fn handle_update(
99 attribution: fattribution::AttributionUpdate,
100 state: &mut StreamState,
101 children: &mut HashMap<PrincipalIdentifier, Principal>,
102) {
103 match attribution {
104 fattribution::AttributionUpdate::Add(new_principal) => {
105 let identifier_id = PrincipalIdentifier(new_principal.identifier.unwrap());
106 let principal_name =
107 get_identifier_string(new_principal.description.unwrap(), &state.introspector)
108 .await;
109
110 if let Some(client) = new_principal.detailed_attribution {
112 state.child_update.push(
113 attribute_memory(
114 identifier_id.clone(),
115 principal_name.clone(),
116 client.into_proxy(),
117 state.introspector.clone(),
118 )
119 .boxed(),
120 );
121 }
122 children.insert(identifier_id.clone(), Principal::new(identifier_id, principal_name));
123 }
124 fattribution::AttributionUpdate::Update(updated_principal) => {
125 let identifier = PrincipalIdentifier(updated_principal.identifier.unwrap());
126
127 let child = children.get_mut(&identifier).unwrap();
128 let raw_resources = match updated_principal.resources.unwrap() {
129 fattribution::Resources::Data(d) => d.resources,
130 fattribution::Resources::Buffer(b) => {
131 let mapping = mapped_vmo::ImmutableMapping::create_from_vmo(&b, false).unwrap();
132 let resource_vector: fattribution::Data = fidl::unpersist(&mapping).unwrap();
133 resource_vector.resources
134 }
135 fattribution::ResourcesUnknown!() => {
136 unimplemented!()
137 }
138 };
139 child.resources = raw_resources
140 .into_iter()
141 .filter_map(|r| match r {
142 fattribution::Resource::KernelObject(koid) => {
143 Some(Resource::KernelObject(zx::Koid::from_raw(koid)))
144 }
145 fattribution::Resource::ProcessMapped(vmar) => Some(Resource::Vmar {
146 process: zx::Koid::from_raw(vmar.process),
147 base: vmar.base as usize,
148 len: vmar.len as usize,
149 }),
150 _ => todo!("unimplemented"),
151 })
152 .collect();
153 }
154 fattribution::AttributionUpdate::Remove(identifier_ref) => {
155 let identifier = PrincipalIdentifier(identifier_ref);
156 children.remove(&identifier);
157 }
158 x @ _ => panic!("unimplemented {x:?}"),
159 }
160}
161
162async fn get_identifier_string(
163 description: fattribution::Description,
164 introspector: &fcomponent::IntrospectorProxy,
165) -> String {
166 match description {
167 fattribution::Description::Component(c) => introspector
168 .get_moniker(c)
169 .await
170 .expect("Inspector call failed")
171 .expect("Inspector::GetMoniker call failed"),
172 fattribution::Description::Part(sc) => sc.clone(),
173 fattribution::DescriptionUnknown!() => todo!(),
174 }
175}
176
177#[pin_project]
186struct StreamState {
187 identifier: PrincipalIdentifier,
189
190 name: String,
192
193 introspector: fcomponent::IntrospectorProxy,
195
196 node: Option<Principal>,
198
199 hanging_get_update: Option<BoxStream<'static, Vec<fattribution::AttributionUpdate>>>,
203
204 #[pin]
209 child_update: SelectAll<BoxStream<'static, Principal>>,
210}
211
212impl StreamState {
213 fn new(
214 identifier: PrincipalIdentifier,
215 name: String,
216 introspector: fcomponent::IntrospectorProxy,
217 attribution_provider: fattribution::ProviderProxy,
218 ) -> Self {
219 Self {
220 identifier,
221 name: name.clone(),
222 introspector,
223 node: None,
224 hanging_get_update: Some(Box::pin(hanging_get_stream(name, attribution_provider))),
225 child_update: SelectAll::new(),
226 }
227 }
228}
229
230fn hanging_get_stream(
231 name: String,
232 proxy: fattribution::ProviderProxy,
233) -> impl Stream<Item = Vec<fattribution::AttributionUpdate>> + 'static {
234 futures::stream::unfold(proxy, move |proxy| {
235 let name = name.clone();
236 proxy.get().map(move |get_result| {
237 let attributions = match get_result {
238 Ok(application_result) => application_result
239 .unwrap_or_else(|e| {
240 panic!("Failed call to AttributionResponse for {name}: {e:?}")
241 })
242 .attributions
243 .unwrap_or_else(|| panic!("Failed memory attribution for {name}")),
244 Err(fidl::Error::ClientChannelClosed {
245 status: zx::Status::PEER_CLOSED, ..
246 }) => {
247 return None;
253 }
254 Err(e) => {
255 panic!("Failed to get AttributionResponse for {name}: {e:?}");
256 }
257 };
258 Some((attributions, proxy))
259 })
260 })
261}
262
263enum Event {
264 Node(Vec<fattribution::AttributionUpdate>),
265 Child(Principal),
266}
267
268impl Stream for StreamState {
269 type Item = Event;
270
271 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
272 let this = self.get_mut();
273 match this.child_update.poll_next_unpin(cx) {
274 Poll::Ready(Some(node)) => {
275 return Poll::Ready(Some(Event::Child(node)));
276 }
277 Poll::Ready(None) => {}
278 Poll::Pending => {}
279 }
280 match this.hanging_get_update.as_mut() {
281 Some(hanging_get_update) => match hanging_get_update.poll_next_unpin(cx) {
282 Poll::Ready(Some(attributions)) => {
283 return Poll::Ready(Some(Event::Node(attributions)));
284 }
285 Poll::Ready(None) => {
286 this.hanging_get_update = None;
287 return Poll::Ready(None);
289 }
290 Poll::Pending => {}
291 },
292 None => {}
293 }
294 return Poll::Pending;
295 }
296}