1use futures::stream::{BoxStream, SelectAll};
6use futures::{FutureExt, Stream, StreamExt};
7use pin_project::pin_project;
8use std::collections::HashMap;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use {fidl_fuchsia_component as fcomponent, fidl_fuchsia_memory_attribution as fattribution};
12
/// Identifier of a principal, wrapping the raw `u64` value.
///
/// Equality and hashing are derived from the wrapped value, so identifiers can be used as
/// hash-map keys.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PrincipalIdentifier(pub u64);
15
16#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
17pub enum Resource {
18 KernelObject(zx::Koid),
19 Vmar { process: zx::Koid, base: usize, len: usize },
20}
21
22#[derive(Debug, Clone)]
24pub struct Principal {
25 pub identifier: PrincipalIdentifier,
27
28 pub name: String,
30
31 pub resources: Vec<Resource>,
33
34 pub children: Vec<Principal>,
36}
37
38impl Principal {
39 pub fn new(identifier: PrincipalIdentifier, name: String) -> Principal {
40 Principal { identifier, name, resources: vec![], children: vec![] }
41 }
42
43 pub fn child_by_name(&self, name: &str) -> Option<&Principal> {
44 self.children.iter().find(|c| c.name == name)
45 }
46}
47
48pub fn attribute_memory(
57 identifier: PrincipalIdentifier,
58 name: String,
59 attribution_provider: fattribution::ProviderProxy,
60 introspector: fcomponent::IntrospectorProxy,
61) -> BoxStream<'static, Principal> {
62 futures::stream::unfold(
63 StreamState::new(identifier, name, introspector, attribution_provider),
64 get_next,
65 )
66 .boxed()
67}
68
69async fn get_next(mut state: StreamState) -> Option<(Principal, StreamState)> {
71 let mut node = state
72 .node
73 .clone()
74 .unwrap_or_else(|| Principal::new(state.identifier.clone(), state.name.clone()));
75 let mut children: HashMap<PrincipalIdentifier, Principal> =
76 node.children.clone().into_iter().map(|n| (n.identifier.clone(), n)).collect();
77
78 match state.next().await {
80 Some(event) => {
81 match event {
82 Event::Node(attributions) => {
84 for attribution in attributions {
85 handle_update(attribution, &mut state, &mut children).await;
86 }
87 }
88 Event::Child(child) => {
90 children.insert(child.identifier.clone(), child);
91 }
92 }
93 }
94 None => return None,
95 }
96
97 node.children = children.into_values().collect();
98 state.node = Some(node.clone());
99 Some((node, state))
100}
101
102async fn handle_update(
103 attribution: fattribution::AttributionUpdate,
104 state: &mut StreamState,
105 children: &mut HashMap<PrincipalIdentifier, Principal>,
106) {
107 match attribution {
108 fattribution::AttributionUpdate::Add(new_principal) => {
109 let identifier_id = PrincipalIdentifier(new_principal.identifier.unwrap());
110 let principal_name =
111 get_identifier_string(new_principal.description.unwrap(), &state.introspector)
112 .await;
113
114 if let Some(client) = new_principal.detailed_attribution {
116 state.child_update.push(
117 attribute_memory(
118 identifier_id.clone(),
119 principal_name.clone(),
120 client.into_proxy(),
121 state.introspector.clone(),
122 )
123 .boxed(),
124 );
125 }
126 children.insert(identifier_id.clone(), Principal::new(identifier_id, principal_name));
127 }
128 fattribution::AttributionUpdate::Update(updated_principal) => {
129 let identifier = PrincipalIdentifier(updated_principal.identifier.unwrap());
130
131 let child = children.get_mut(&identifier).unwrap();
132 let raw_resources = match updated_principal.resources.unwrap() {
133 fattribution::Resources::Data(d) => d.resources,
134 fattribution::Resources::Buffer(b) => {
135 let mapping = mapped_vmo::ImmutableMapping::create_from_vmo(&b, false).unwrap();
136 let resource_vector: fattribution::Data = fidl::unpersist(&mapping).unwrap();
137 resource_vector.resources
138 }
139 fattribution::ResourcesUnknown!() => {
140 unimplemented!()
141 }
142 };
143 child.resources = raw_resources
144 .into_iter()
145 .filter_map(|r| match r {
146 fattribution::Resource::KernelObject(koid) => {
147 Some(Resource::KernelObject(zx::Koid::from_raw(koid)))
148 }
149 fattribution::Resource::ProcessMapped(vmar) => Some(Resource::Vmar {
150 process: zx::Koid::from_raw(vmar.process),
151 base: vmar.base as usize,
152 len: vmar.len as usize,
153 }),
154 _ => todo!("unimplemented"),
155 })
156 .collect();
157 }
158 fattribution::AttributionUpdate::Remove(identifier_ref) => {
159 let identifier = PrincipalIdentifier(identifier_ref);
160 children.remove(&identifier);
161 }
162 x @ _ => panic!("unimplemented {x:?}"),
163 }
164}
165
166async fn get_identifier_string(
167 description: fattribution::Description,
168 introspector: &fcomponent::IntrospectorProxy,
169) -> String {
170 match description {
171 fattribution::Description::Component(c) => introspector
172 .get_moniker(c)
173 .await
174 .expect("Inspector call failed")
175 .expect("Inspector::GetMoniker call failed"),
176 fattribution::Description::Part(sc) => sc.clone(),
177 fattribution::DescriptionUnknown!() => todo!(),
178 }
179}
180
181#[pin_project]
190struct StreamState {
191 identifier: PrincipalIdentifier,
193
194 name: String,
196
197 introspector: fcomponent::IntrospectorProxy,
199
200 node: Option<Principal>,
202
203 hanging_get_update: Option<BoxStream<'static, Vec<fattribution::AttributionUpdate>>>,
207
208 #[pin]
213 child_update: SelectAll<BoxStream<'static, Principal>>,
214}
215
216impl StreamState {
217 fn new(
218 identifier: PrincipalIdentifier,
219 name: String,
220 introspector: fcomponent::IntrospectorProxy,
221 attribution_provider: fattribution::ProviderProxy,
222 ) -> Self {
223 Self {
224 identifier,
225 name: name.clone(),
226 introspector,
227 node: None,
228 hanging_get_update: Some(Box::pin(hanging_get_stream(name, attribution_provider))),
229 child_update: SelectAll::new(),
230 }
231 }
232}
233
234fn hanging_get_stream(
235 name: String,
236 proxy: fattribution::ProviderProxy,
237) -> impl Stream<Item = Vec<fattribution::AttributionUpdate>> + 'static {
238 futures::stream::unfold(proxy, move |proxy| {
239 let name = name.clone();
240 proxy.get().map(move |get_result| {
241 let attributions = match get_result {
242 Ok(application_result) => application_result
243 .unwrap_or_else(|e| {
244 panic!("Failed call to AttributionResponse for {name}: {e:?}")
245 })
246 .attributions
247 .unwrap_or_else(|| panic!("Failed memory attribution for {name}")),
248 Err(fidl::Error::ClientChannelClosed {
249 status: zx::Status::PEER_CLOSED, ..
250 }) => {
251 return None;
257 }
258 Err(e) => {
259 panic!("Failed to get AttributionResponse for {name}: {e:?}");
260 }
261 };
262 Some((attributions, proxy))
263 })
264 })
265}
266
267enum Event {
268 Node(Vec<fattribution::AttributionUpdate>),
269 Child(Principal),
270}
271
272impl Stream for StreamState {
273 type Item = Event;
274
275 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
276 let this = self.get_mut();
277 match this.child_update.poll_next_unpin(cx) {
278 Poll::Ready(Some(node)) => {
279 return Poll::Ready(Some(Event::Child(node)));
280 }
281 Poll::Ready(None) => {}
282 Poll::Pending => {}
283 }
284 match this.hanging_get_update.as_mut() {
285 Some(hanging_get_update) => match hanging_get_update.poll_next_unpin(cx) {
286 Poll::Ready(Some(attributions)) => {
287 return Poll::Ready(Some(Event::Node(attributions)));
288 }
289 Poll::Ready(None) => {
290 this.hanging_get_update = None;
291 return Poll::Ready(None);
293 }
294 Poll::Pending => {}
295 },
296 None => {}
297 }
298 return Poll::Pending;
299 }
300}