
1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
5use futures::stream::{BoxStream, SelectAll};
6use futures::{FutureExt, Stream, StreamExt};
7use pin_project::pin_project;
8use std::collections::HashMap;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use {fidl_fuchsia_component as fcomponent, fidl_fuchsia_memory_attribution as fattribution};
/// Numeric identifier of a principal.
///
/// Attribution updates refer to principals by this identifier, and it is the key under which
/// child principals are tracked while the tree is being rebuilt.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct PrincipalIdentifier(pub u64);
16#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
17pub enum Resource {
18    KernelObject(zx::Koid),
19    Vmar { process: zx::Koid, base: usize, len: usize },
22/// A simple tree breakdown of resource usage useful for tests.
23#[derive(Debug, Clone)]
24pub struct Principal {
25    /// Identifier of the principal.
26    pub identifier: PrincipalIdentifier,
28    /// Name of the principal.
29    pub name: String,
31    /// Resources used by this principal.
32    pub resources: Vec<Resource>,
34    /// Children of the principal.
35    pub children: Vec<Principal>,
38impl Principal {
39    pub fn new(identifier: PrincipalIdentifier, name: String) -> Principal {
40        Principal { identifier, name, resources: vec![], children: vec![] }
41    }
44/// Obtain which resources are used for various activities by an attribution provider.
46/// If one of the children under the attribution provider has detailed attribution
47/// information, this function will recursively visit those children and build a
48/// tree of nodes.
50/// Returns a stream of tree that are momentary snapshots of the memory state.
51/// The tree will evolve over time as principals are added and removed.
52pub fn attribute_memory(
53    identifier: PrincipalIdentifier,
54    name: String,
55    attribution_provider: fattribution::ProviderProxy,
56    introspector: fcomponent::IntrospectorProxy,
57) -> BoxStream<'static, Principal> {
58    futures::stream::unfold(
59        StreamState::new(identifier, name, introspector, attribution_provider),
60        get_next,
61    )
62    .boxed()
65/// Wait for the next hanging-get message and recompute the tree.
66async fn get_next(mut state: StreamState) -> Option<(Principal, StreamState)> {
67    let mut node = state
68        .node
69        .clone()
70        .unwrap_or_else(|| Principal::new(state.identifier.clone(),;
71    let mut children: HashMap<PrincipalIdentifier, Principal> =
72        node.children.clone().into_iter().map(|n| (n.identifier.clone(), n)).collect();
74    // Wait for new attribution information.
75    match {
76        Some(event) => {
77            match event {
78                // New attribution information for this principal.
79                Event::Node(attributions) => {
80                    for attribution in attributions {
81                        handle_update(attribution, &mut state, &mut children).await;
82                    }
83                }
84                // New attribution information for a child principal.
85                Event::Child(child) => {
86                    children.insert(child.identifier.clone(), child);
87                }
88            }
89        }
90        None => return None,
91    }
93    node.children = children.into_values().collect();
94    state.node = Some(node.clone());
95    Some((node, state))
98async fn handle_update(
99    attribution: fattribution::AttributionUpdate,
100    state: &mut StreamState,
101    children: &mut HashMap<PrincipalIdentifier, Principal>,
102) {
103    match attribution {
104        fattribution::AttributionUpdate::Add(new_principal) => {
105            let identifier_id = PrincipalIdentifier(new_principal.identifier.unwrap());
106            let principal_name =
107                get_identifier_string(new_principal.description.unwrap(), &state.introspector)
108                    .await;
110            // Recursively attribute memory in this child principal if applicable.
111            if let Some(client) = new_principal.detailed_attribution {
112                state.child_update.push(
113                    attribute_memory(
114                        identifier_id.clone(),
115                        principal_name.clone(),
116                        client.into_proxy(),
117                        state.introspector.clone(),
118                    )
119                    .boxed(),
120                );
121            }
122            children.insert(identifier_id.clone(), Principal::new(identifier_id, principal_name));
123        }
124        fattribution::AttributionUpdate::Update(updated_principal) => {
125            let identifier = PrincipalIdentifier(updated_principal.identifier.unwrap());
127            let child = children.get_mut(&identifier).unwrap();
128            let raw_resources = match updated_principal.resources.unwrap() {
129                fattribution::Resources::Data(d) => d.resources,
130                fattribution::Resources::Buffer(b) => {
131                    let mapping = mapped_vmo::ImmutableMapping::create_from_vmo(&b, false).unwrap();
132                    let resource_vector: fattribution::Data = fidl::unpersist(&mapping).unwrap();
133                    resource_vector.resources
134                }
135                fattribution::ResourcesUnknown!() => {
136                    unimplemented!()
137                }
138            };
139            child.resources = raw_resources
140                .into_iter()
141                .filter_map(|r| match r {
142                    fattribution::Resource::KernelObject(koid) => {
143                        Some(Resource::KernelObject(zx::Koid::from_raw(koid)))
144                    }
145                    fattribution::Resource::ProcessMapped(vmar) => Some(Resource::Vmar {
146                        process: zx::Koid::from_raw(vmar.process),
147                        base: vmar.base as usize,
148                        len: vmar.len as usize,
149                    }),
150                    _ => todo!("unimplemented"),
151                })
152                .collect();
153        }
154        fattribution::AttributionUpdate::Remove(identifier_ref) => {
155            let identifier = PrincipalIdentifier(identifier_ref);
156            children.remove(&identifier);
157        }
158        x @ _ => panic!("unimplemented {x:?}"),
159    }
162async fn get_identifier_string(
163    description: fattribution::Description,
164    introspector: &fcomponent::IntrospectorProxy,
165) -> String {
166    match description {
167        fattribution::Description::Component(c) => introspector
168            .get_moniker(c)
169            .await
170            .expect("Inspector call failed")
171            .expect("Inspector::GetMoniker call failed"),
172        fattribution::Description::Part(sc) => sc.clone(),
173        fattribution::DescriptionUnknown!() => todo!(),
174    }
177/// [`StreamState`] holds attribution information for a given tree of principals
178/// rooted at the one identified by `name`.
180/// It implements a [`Stream`] and will yield the next update to the tree when
181/// any of the hanging-gets from principals in this tree returns.
183/// [`get_next`] will poll this stream to process the update, such as adding a new
184/// child principal.
186struct StreamState {
187    /// The identifier of the principal at the root of the tree.
188    identifier: PrincipalIdentifier,
190    /// The name of the principal at the root of the tree.
191    name: String,
193    /// A capability used to unseal component instance tokens back to monikers.
194    introspector: fcomponent::IntrospectorProxy,
196    /// The tree of principals rooted at `node`.
197    node: Option<Principal>,
199    /// A stream of `AttributionUpdate` events for the current principal.
200    ///
201    /// If the stream finished, it will be set to `None`.
202    hanging_get_update: Option<BoxStream<'static, Vec<fattribution::AttributionUpdate>>>,
204    /// A stream of child principal updates. Each `Principal` element should
205    /// replace the existing child principal if there already is a child with
206    /// the same name. [`SelectAll`] is used to merge the updates from all children
207    /// into a single stream.
208    #[pin]
209    child_update: SelectAll<BoxStream<'static, Principal>>,
212impl StreamState {
213    fn new(
214        identifier: PrincipalIdentifier,
215        name: String,
216        introspector: fcomponent::IntrospectorProxy,
217        attribution_provider: fattribution::ProviderProxy,
218    ) -> Self {
219        Self {
220            identifier,
221            name: name.clone(),
222            introspector,
223            node: None,
224            hanging_get_update: Some(Box::pin(hanging_get_stream(name, attribution_provider))),
225            child_update: SelectAll::new(),
226        }
227    }
230fn hanging_get_stream(
231    name: String,
232    proxy: fattribution::ProviderProxy,
233) -> impl Stream<Item = Vec<fattribution::AttributionUpdate>> + 'static {
234    futures::stream::unfold(proxy, move |proxy| {
235        let name = name.clone();
236        proxy.get().map(move |get_result| {
237            let attributions = match get_result {
238                Ok(application_result) => application_result
239                    .unwrap_or_else(|e| {
240                        panic!("Failed call to AttributionResponse for {name}: {e:?}")
241                    })
242                    .attributions
243                    .unwrap_or_else(|| panic!("Failed memory attribution for {name}")),
244                Err(fidl::Error::ClientChannelClosed {
245                    status: zx::Status::PEER_CLOSED, ..
246                }) => {
247                    // If the hanging-get failed due to peer closed, consider there are no more
248                    // updates to this principal. The closing of this hanging-get races with the
249                    // parent principal notifying with the `AttributionUpdate::Remove` message, so
250                    // it is possible to observe a peer-closed here first and then get a
251                    // `AttributionUpdate::Remove` for this principal.
252                    return None;
253                }
254                Err(e) => {
255                    panic!("Failed to get AttributionResponse for {name}: {e:?}");
256                }
257            };
258            Some((attributions, proxy))
259        })
260    })
263enum Event {
264    Node(Vec<fattribution::AttributionUpdate>),
265    Child(Principal),
268impl Stream for StreamState {
269    type Item = Event;
271    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
272        let this = self.get_mut();
273        match this.child_update.poll_next_unpin(cx) {
274            Poll::Ready(Some(node)) => {
275                return Poll::Ready(Some(Event::Child(node)));
276            }
277            Poll::Ready(None) => {}
278            Poll::Pending => {}
279        }
280        match this.hanging_get_update.as_mut() {
281            Some(hanging_get_update) => match hanging_get_update.poll_next_unpin(cx) {
282                Poll::Ready(Some(attributions)) => {
283                    return Poll::Ready(Some(Event::Node(attributions)));
284                }
285                Poll::Ready(None) => {
286                    this.hanging_get_update = None;
287                    // Return None to signal that this Principal is done.
288                    return Poll::Ready(None);
289                }
290                Poll::Pending => {}
291            },
292            None => {}
293        }
294        return Poll::Pending;
295    }