use futures::stream::{BoxStream, SelectAll};
use futures::{FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use {fidl_fuchsia_component as fcomponent, fidl_fuchsia_memory_attribution as fattribution};
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct PrincipalIdentifier(pub u64);
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub enum Resource {
KernelObject(zx::Koid),
Vmar { process: zx::Koid, base: usize, len: usize },
}
#[derive(Debug, Clone)]
pub struct Principal {
pub identifier: PrincipalIdentifier,
pub name: String,
pub resources: Vec<Resource>,
pub children: Vec<Principal>,
}
impl Principal {
pub fn new(identifier: PrincipalIdentifier, name: String) -> Principal {
Principal { identifier, name, resources: vec![], children: vec![] }
}
}
pub fn attribute_memory(
identifier: PrincipalIdentifier,
name: String,
attribution_provider: fattribution::ProviderProxy,
introspector: fcomponent::IntrospectorProxy,
) -> BoxStream<'static, Principal> {
futures::stream::unfold(
StreamState::new(identifier, name, introspector, attribution_provider),
get_next,
)
.boxed()
}
async fn get_next(mut state: StreamState) -> Option<(Principal, StreamState)> {
let mut node = state
.node
.clone()
.unwrap_or_else(|| Principal::new(state.identifier.clone(), state.name.clone()));
let mut children: HashMap<PrincipalIdentifier, Principal> =
node.children.clone().into_iter().map(|n| (n.identifier.clone(), n)).collect();
match state.next().await {
Some(event) => {
match event {
Event::Node(attributions) => {
for attribution in attributions {
handle_update(attribution, &mut state, &mut children).await;
}
}
Event::Child(child) => {
children.insert(child.identifier.clone(), child);
}
}
}
None => return None,
}
node.children = children.into_values().collect();
state.node = Some(node.clone());
Some((node, state))
}
async fn handle_update(
attribution: fattribution::AttributionUpdate,
state: &mut StreamState,
children: &mut HashMap<PrincipalIdentifier, Principal>,
) {
match attribution {
fattribution::AttributionUpdate::Add(new_principal) => {
let identifier_id = PrincipalIdentifier(new_principal.identifier.unwrap());
let principal_name =
get_identifier_string(new_principal.description.unwrap(), &state.introspector)
.await;
if let Some(client) = new_principal.detailed_attribution {
state.child_update.push(
attribute_memory(
identifier_id.clone(),
principal_name.clone(),
client.into_proxy(),
state.introspector.clone(),
)
.boxed(),
);
}
children.insert(identifier_id.clone(), Principal::new(identifier_id, principal_name));
}
fattribution::AttributionUpdate::Update(updated_principal) => {
let identifier = PrincipalIdentifier(updated_principal.identifier.unwrap());
let child = children.get_mut(&identifier).unwrap();
let raw_resources = match updated_principal.resources.unwrap() {
fattribution::Resources::Data(d) => d.resources,
fattribution::Resources::Buffer(b) => {
let mapping = mapped_vmo::ImmutableMapping::create_from_vmo(&b, false).unwrap();
let resource_vector: fattribution::Data = fidl::unpersist(&mapping).unwrap();
resource_vector.resources
}
fattribution::ResourcesUnknown!() => {
unimplemented!()
}
};
child.resources = raw_resources
.into_iter()
.filter_map(|r| match r {
fattribution::Resource::KernelObject(koid) => {
Some(Resource::KernelObject(zx::Koid::from_raw(koid)))
}
fattribution::Resource::ProcessMapped(vmar) => Some(Resource::Vmar {
process: zx::Koid::from_raw(vmar.process),
base: vmar.base as usize,
len: vmar.len as usize,
}),
_ => todo!("unimplemented"),
})
.collect();
}
fattribution::AttributionUpdate::Remove(identifier_ref) => {
let identifier = PrincipalIdentifier(identifier_ref);
children.remove(&identifier);
}
x @ _ => panic!("unimplemented {x:?}"),
}
}
async fn get_identifier_string(
description: fattribution::Description,
introspector: &fcomponent::IntrospectorProxy,
) -> String {
match description {
fattribution::Description::Component(c) => introspector
.get_moniker(c)
.await
.expect("Inspector call failed")
.expect("Inspector::GetMoniker call failed"),
fattribution::Description::Part(sc) => sc.clone(),
fattribution::DescriptionUnknown!() => todo!(),
}
}
#[pin_project]
struct StreamState {
identifier: PrincipalIdentifier,
name: String,
introspector: fcomponent::IntrospectorProxy,
node: Option<Principal>,
hanging_get_update: Option<BoxStream<'static, Vec<fattribution::AttributionUpdate>>>,
#[pin]
child_update: SelectAll<BoxStream<'static, Principal>>,
}
impl StreamState {
fn new(
identifier: PrincipalIdentifier,
name: String,
introspector: fcomponent::IntrospectorProxy,
attribution_provider: fattribution::ProviderProxy,
) -> Self {
Self {
identifier,
name: name.clone(),
introspector,
node: None,
hanging_get_update: Some(Box::pin(hanging_get_stream(name, attribution_provider))),
child_update: SelectAll::new(),
}
}
}
fn hanging_get_stream(
name: String,
proxy: fattribution::ProviderProxy,
) -> impl Stream<Item = Vec<fattribution::AttributionUpdate>> + 'static {
futures::stream::unfold(proxy, move |proxy| {
let name = name.clone();
proxy.get().map(move |get_result| {
let attributions = match get_result {
Ok(application_result) => application_result
.unwrap_or_else(|e| {
panic!("Failed call to AttributionResponse for {name}: {e:?}")
})
.attributions
.unwrap_or_else(|| panic!("Failed memory attribution for {name}")),
Err(fidl::Error::ClientChannelClosed {
status: zx::Status::PEER_CLOSED, ..
}) => {
return None;
}
Err(e) => {
panic!("Failed to get AttributionResponse for {name}: {e:?}");
}
};
Some((attributions, proxy))
})
})
}
enum Event {
Node(Vec<fattribution::AttributionUpdate>),
Child(Principal),
}
impl Stream for StreamState {
type Item = Event;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.child_update.poll_next_unpin(cx) {
Poll::Ready(Some(node)) => {
return Poll::Ready(Some(Event::Child(node)));
}
Poll::Ready(None) => {}
Poll::Pending => {}
}
match this.hanging_get_update.as_mut() {
Some(hanging_get_update) => match hanging_get_update.poll_next_unpin(cx) {
Poll::Ready(Some(attributions)) => {
return Poll::Ready(Some(Event::Node(attributions)));
}
Poll::Ready(None) => {
this.hanging_get_update = None;
return Poll::Ready(None);
}
Poll::Pending => {}
},
None => {}
}
return Poll::Pending;
}
}