attribution_testing/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::stream::{BoxStream, SelectAll};
6use futures::{FutureExt, Stream, StreamExt};
7use pin_project::pin_project;
8use std::collections::HashMap;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use {fidl_fuchsia_component as fcomponent, fidl_fuchsia_memory_attribution as fattribution};
12
13#[derive(Debug, Clone, Eq, PartialEq, Hash)]
14pub struct PrincipalIdentifier(pub u64);
15
16#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
17pub enum Resource {
18    KernelObject(zx::Koid),
19    Vmar { process: zx::Koid, base: usize, len: usize },
20}
21
22/// A simple tree breakdown of resource usage useful for tests.
23#[derive(Debug, Clone)]
24pub struct Principal {
25    /// Identifier of the principal.
26    pub identifier: PrincipalIdentifier,
27
28    /// Name of the principal.
29    pub name: String,
30
31    /// Resources used by this principal.
32    pub resources: Vec<Resource>,
33
34    /// Children of the principal.
35    pub children: Vec<Principal>,
36}
37
38impl Principal {
39    pub fn new(identifier: PrincipalIdentifier, name: String) -> Principal {
40        Principal { identifier, name, resources: vec![], children: vec![] }
41    }
42
43    pub fn child_by_name(&self, name: &str) -> Option<&Principal> {
44        self.children.iter().find(|c| c.name == name)
45    }
46}
47
48/// Obtain which resources are used for various activities by an attribution provider.
49///
50/// If one of the children under the attribution provider has detailed attribution
51/// information, this function will recursively visit those children and build a
52/// tree of nodes.
53///
54/// Returns a stream of tree that are momentary snapshots of the memory state.
55/// The tree will evolve over time as principals are added and removed.
56pub fn attribute_memory(
57    identifier: PrincipalIdentifier,
58    name: String,
59    attribution_provider: fattribution::ProviderProxy,
60    introspector: fcomponent::IntrospectorProxy,
61) -> BoxStream<'static, Principal> {
62    futures::stream::unfold(
63        StreamState::new(identifier, name, introspector, attribution_provider),
64        get_next,
65    )
66    .boxed()
67}
68
69/// Wait for the next hanging-get message and recompute the tree.
70async fn get_next(mut state: StreamState) -> Option<(Principal, StreamState)> {
71    let mut node = state
72        .node
73        .clone()
74        .unwrap_or_else(|| Principal::new(state.identifier.clone(), state.name.clone()));
75    let mut children: HashMap<PrincipalIdentifier, Principal> =
76        node.children.clone().into_iter().map(|n| (n.identifier.clone(), n)).collect();
77
78    // Wait for new attribution information.
79    match state.next().await {
80        Some(event) => {
81            match event {
82                // New attribution information for this principal.
83                Event::Node(attributions) => {
84                    for attribution in attributions {
85                        handle_update(attribution, &mut state, &mut children).await;
86                    }
87                }
88                // New attribution information for a child principal.
89                Event::Child(child) => {
90                    children.insert(child.identifier.clone(), child);
91                }
92            }
93        }
94        None => return None,
95    }
96
97    node.children = children.into_values().collect();
98    state.node = Some(node.clone());
99    Some((node, state))
100}
101
102async fn handle_update(
103    attribution: fattribution::AttributionUpdate,
104    state: &mut StreamState,
105    children: &mut HashMap<PrincipalIdentifier, Principal>,
106) {
107    match attribution {
108        fattribution::AttributionUpdate::Add(new_principal) => {
109            let identifier_id = PrincipalIdentifier(new_principal.identifier.unwrap());
110            let principal_name =
111                get_identifier_string(new_principal.description.unwrap(), &state.introspector)
112                    .await;
113
114            // Recursively attribute memory in this child principal if applicable.
115            if let Some(client) = new_principal.detailed_attribution {
116                state.child_update.push(
117                    attribute_memory(
118                        identifier_id.clone(),
119                        principal_name.clone(),
120                        client.into_proxy(),
121                        state.introspector.clone(),
122                    )
123                    .boxed(),
124                );
125            }
126            children.insert(identifier_id.clone(), Principal::new(identifier_id, principal_name));
127        }
128        fattribution::AttributionUpdate::Update(updated_principal) => {
129            let identifier = PrincipalIdentifier(updated_principal.identifier.unwrap());
130
131            let child = children.get_mut(&identifier).unwrap();
132            let raw_resources = match updated_principal.resources.unwrap() {
133                fattribution::Resources::Data(d) => d.resources,
134                fattribution::Resources::Buffer(b) => {
135                    let mapping = mapped_vmo::ImmutableMapping::create_from_vmo(&b, false).unwrap();
136                    let resource_vector: fattribution::Data = fidl::unpersist(&mapping).unwrap();
137                    resource_vector.resources
138                }
139                fattribution::ResourcesUnknown!() => {
140                    unimplemented!()
141                }
142            };
143            child.resources = raw_resources
144                .into_iter()
145                .filter_map(|r| match r {
146                    fattribution::Resource::KernelObject(koid) => {
147                        Some(Resource::KernelObject(zx::Koid::from_raw(koid)))
148                    }
149                    fattribution::Resource::ProcessMapped(vmar) => Some(Resource::Vmar {
150                        process: zx::Koid::from_raw(vmar.process),
151                        base: vmar.base as usize,
152                        len: vmar.len as usize,
153                    }),
154                    _ => todo!("unimplemented"),
155                })
156                .collect();
157        }
158        fattribution::AttributionUpdate::Remove(identifier_ref) => {
159            let identifier = PrincipalIdentifier(identifier_ref);
160            children.remove(&identifier);
161        }
162        x @ _ => panic!("unimplemented {x:?}"),
163    }
164}
165
166async fn get_identifier_string(
167    description: fattribution::Description,
168    introspector: &fcomponent::IntrospectorProxy,
169) -> String {
170    match description {
171        fattribution::Description::Component(c) => introspector
172            .get_moniker(c)
173            .await
174            .expect("Inspector call failed")
175            .expect("Inspector::GetMoniker call failed"),
176        fattribution::Description::Part(sc) => sc.clone(),
177        fattribution::DescriptionUnknown!() => todo!(),
178    }
179}
180
181/// [`StreamState`] holds attribution information for a given tree of principals
182/// rooted at the one identified by `name`.
183///
184/// It implements a [`Stream`] and will yield the next update to the tree when
185/// any of the hanging-gets from principals in this tree returns.
186///
187/// [`get_next`] will poll this stream to process the update, such as adding a new
188/// child principal.
189#[pin_project]
190struct StreamState {
191    /// The identifier of the principal at the root of the tree.
192    identifier: PrincipalIdentifier,
193
194    /// The name of the principal at the root of the tree.
195    name: String,
196
197    /// A capability used to unseal component instance tokens back to monikers.
198    introspector: fcomponent::IntrospectorProxy,
199
200    /// The tree of principals rooted at `node`.
201    node: Option<Principal>,
202
203    /// A stream of `AttributionUpdate` events for the current principal.
204    ///
205    /// If the stream finished, it will be set to `None`.
206    hanging_get_update: Option<BoxStream<'static, Vec<fattribution::AttributionUpdate>>>,
207
208    /// A stream of child principal updates. Each `Principal` element should
209    /// replace the existing child principal if there already is a child with
210    /// the same name. [`SelectAll`] is used to merge the updates from all children
211    /// into a single stream.
212    #[pin]
213    child_update: SelectAll<BoxStream<'static, Principal>>,
214}
215
216impl StreamState {
217    fn new(
218        identifier: PrincipalIdentifier,
219        name: String,
220        introspector: fcomponent::IntrospectorProxy,
221        attribution_provider: fattribution::ProviderProxy,
222    ) -> Self {
223        Self {
224            identifier,
225            name: name.clone(),
226            introspector,
227            node: None,
228            hanging_get_update: Some(Box::pin(hanging_get_stream(name, attribution_provider))),
229            child_update: SelectAll::new(),
230        }
231    }
232}
233
234fn hanging_get_stream(
235    name: String,
236    proxy: fattribution::ProviderProxy,
237) -> impl Stream<Item = Vec<fattribution::AttributionUpdate>> + 'static {
238    futures::stream::unfold(proxy, move |proxy| {
239        let name = name.clone();
240        proxy.get().map(move |get_result| {
241            let attributions = match get_result {
242                Ok(application_result) => application_result
243                    .unwrap_or_else(|e| {
244                        panic!("Failed call to AttributionResponse for {name}: {e:?}")
245                    })
246                    .attributions
247                    .unwrap_or_else(|| panic!("Failed memory attribution for {name}")),
248                Err(fidl::Error::ClientChannelClosed {
249                    status: zx::Status::PEER_CLOSED, ..
250                }) => {
251                    // If the hanging-get failed due to peer closed, consider there are no more
252                    // updates to this principal. The closing of this hanging-get races with the
253                    // parent principal notifying with the `AttributionUpdate::Remove` message, so
254                    // it is possible to observe a peer-closed here first and then get a
255                    // `AttributionUpdate::Remove` for this principal.
256                    return None;
257                }
258                Err(e) => {
259                    panic!("Failed to get AttributionResponse for {name}: {e:?}");
260                }
261            };
262            Some((attributions, proxy))
263        })
264    })
265}
266
267enum Event {
268    Node(Vec<fattribution::AttributionUpdate>),
269    Child(Principal),
270}
271
272impl Stream for StreamState {
273    type Item = Event;
274
275    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
276        let this = self.get_mut();
277        match this.child_update.poll_next_unpin(cx) {
278            Poll::Ready(Some(node)) => {
279                return Poll::Ready(Some(Event::Child(node)));
280            }
281            Poll::Ready(None) => {}
282            Poll::Pending => {}
283        }
284        match this.hanging_get_update.as_mut() {
285            Some(hanging_get_update) => match hanging_get_update.poll_next_unpin(cx) {
286                Poll::Ready(Some(attributions)) => {
287                    return Poll::Ready(Some(Event::Node(attributions)));
288                }
289                Poll::Ready(None) => {
290                    this.hanging_get_update = None;
291                    // Return None to signal that this Principal is done.
292                    return Poll::Ready(None);
293                }
294                Poll::Pending => {}
295            },
296            None => {}
297        }
298        return Poll::Pending;
299    }
300}