use fidl::endpoints::ControlHandle;
use fidl::Error::ClientChannelClosed;
use fidl_fuchsia_memory_attribution as fattribution;
use fuchsia_sync::Mutex;
use measure_tape_for_attribution::Measurable;
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use tracing::error;
/// Subscription keys and the generator that hands them out.
mod key {
    /// Opaque identifier; two keys are equal only when they wrap the same counter value.
    #[derive(PartialEq, Eq, Clone, Copy)]
    pub struct Key(u64);

    /// Produces [`Key`]s from an incrementing counter.
    pub struct KeyGenerator {
        next: Key,
    }

    impl Default for KeyGenerator {
        /// Creates a generator whose first key wraps `0`.
        fn default() -> Self {
            KeyGenerator { next: Key(0) }
        }
    }

    impl KeyGenerator {
        /// Returns the current key and advances the generator by one.
        ///
        /// # Panics
        ///
        /// Panics with "Key generator overflow" when the counter cannot be incremented.
        pub fn next(&mut self) -> Key {
            let Key(current) = self.next;
            // Compute the successor first so a failed increment leaves the generator untouched.
            let successor = current.checked_add(1).expect("Key generator overflow");
            std::mem::replace(&mut self.next, Key(successor))
        }
    }
}
/// Callback that produces a list of attribution updates on demand.
///
/// The server stores one such callback and invokes it when an observer issues its first
/// `Get` request, to generate the state sent to that observer.
type GetAttributionFn = dyn Fn() -> Vec<fattribution::AttributionUpdate> + Send;
/// Errors related to observing the attribution server.
///
/// NOTE(review): not referenced by any other code visible in this file.
#[derive(Error, Debug)]
pub enum AttributionServerObservationError {
/// More than one observation is pending for the same observer.
#[error("multiple pending observations for the same Observer")]
GetUpdateAlreadyPending,
}
/// Numeric identifier of a principal; used as the map key when coalescing pending updates.
#[derive(Clone, PartialEq, Eq, Hash)]
struct PrincipalIdentifier(u64);
/// Cloneable handle to a shared [`AttributionServer`].
///
/// Observers and publishers are created from this handle.
#[derive(Clone)]
pub struct AttributionServerHandle {
/// Server state shared with every `Observer` and `Publisher` created from this handle.
inner: Arc<Mutex<AttributionServer>>,
}
impl AttributionServerHandle {
pub fn new_observer(&self, control_handle: fattribution::ProviderControlHandle) -> Observer {
AttributionServer::register(&self.inner, control_handle)
}
pub fn new_publisher(&self) -> Publisher {
Publisher { inner: self.inner.clone() }
}
}
/// Observer side of the attribution server.
///
/// Forwards `Get` responders to the shared server and unregisters its subscription when
/// dropped.
pub struct Observer {
/// Shared server state.
inner: Arc<Mutex<AttributionServer>>,
/// Key identifying this observer's registration; passed to `unregister` on drop.
subscription_id: key::Key,
}
impl Observer {
    /// Hands the responder of a `Get` request to the shared server, which answers it
    /// according to its current state.
    pub fn next(&self, responder: fattribution::ProviderGetResponder) {
        let mut server = self.inner.lock();
        server.next(responder);
    }
}
impl Drop for Observer {
    /// Removes this observer's registration from the shared server.
    fn drop(&mut self) {
        let key = self.subscription_id;
        let mut server = self.inner.lock();
        // `unregister` only clears the consumer when it still belongs to this key.
        server.unregister(key);
    }
}
/// Publisher side of the attribution server: pushes attribution updates to the shared server.
pub struct Publisher {
/// Shared server state.
inner: Arc<Mutex<AttributionServer>>,
}
impl Publisher {
    /// Forwards `updates` to the shared server, which passes them on to the registered
    /// consumer, if there is one.
    pub fn on_update(&self, updates: Vec<fattribution::AttributionUpdate>) {
        let mut server = self.inner.lock();
        server.on_update(updates);
    }
}
/// State shared between the handle, the observers and the publishers.
pub struct AttributionServer {
/// Callback producing the attribution state sent in answer to an observer's first request.
state: Box<GetAttributionFn>,
/// The currently registered consumer, if any; a new registration replaces the previous one.
consumer: Option<AttributionConsumer>,
/// Source of the subscription keys handed to observers.
key_generator: key::KeyGenerator,
}
impl AttributionServer {
    /// Creates a server and returns a handle to it.
    ///
    /// `state` is invoked to produce the full attribution state when an observer issues
    /// its first `Get` request.
    pub fn new(state: Box<GetAttributionFn>) -> AttributionServerHandle {
        AttributionServerHandle {
            inner: Arc::new(Mutex::new(AttributionServer {
                state,
                consumer: None,
                key_generator: Default::default(),
            })),
        }
    }

    /// Passes `updates` to the registered consumer.
    ///
    /// When no consumer is registered the updates are dropped.
    pub fn on_update(&mut self, updates: Vec<fattribution::AttributionUpdate>) {
        if let Some(consumer) = &mut self.consumer {
            consumer.update_and_notify(updates);
        }
    }

    /// Handles a `Get` request by handing `responder` to the registered consumer.
    ///
    /// When no consumer is registered (for example when the request comes from an observer
    /// whose registration was replaced and the replacement has since been unregistered),
    /// the request is logged and `responder` is dropped without a reply instead of
    /// panicking.
    pub fn next(&mut self, responder: fattribution::ProviderGetResponder) {
        let Some(consumer) = self.consumer.as_mut() else {
            // NOTE(review): dropping a responder without replying is expected to close the
            // connection in the Rust FIDL bindings — confirm.
            tracing::warn!("Get request received without a registered attribution consumer");
            return;
        };
        consumer.get_update(responder, self.state.as_ref());
    }

    /// Registers a consumer for the connection controlled by `control_handle` and returns
    /// the matching [`Observer`].
    ///
    /// Only one consumer is kept. Registering while another one exists logs a warning and
    /// replaces it; the replaced consumer closes its channel with a `CANCELED` epitaph when
    /// it is dropped.
    pub fn register(
        inner: &Arc<Mutex<Self>>,
        control_handle: fattribution::ProviderControlHandle,
    ) -> Observer {
        let mut locked_inner = inner.lock();
        if locked_inner.consumer.is_some() {
            tracing::warn!("Multiple connection requests to AttributionProvider");
        }
        let key = locked_inner.key_generator.next();
        // `Key` is `Copy`, so the same value is stored in the consumer and in the observer.
        locked_inner.consumer = Some(AttributionConsumer::new(control_handle, key));
        Observer { inner: Arc::clone(inner), subscription_id: key }
    }

    /// Removes the registered consumer if it was registered under `key`.
    ///
    /// A stale key (one whose consumer has already been replaced) leaves the current
    /// consumer in place.
    pub fn unregister(&mut self, key: key::Key) {
        if matches!(&self.consumer, Some(consumer) if consumer.subscription_id == key) {
            self.consumer = None;
        }
    }
}
/// Pending updates for a single principal, keeping at most one update of each kind.
#[derive(Default)]
struct CoalescedUpdate {
/// Pending `Add` update, if any.
add: Option<fattribution::AttributionUpdate>,
/// Most recent pending `Update` update, if any.
update: Option<fattribution::AttributionUpdate>,
/// Pending `Remove` update, if any.
remove: Option<fattribution::AttributionUpdate>,
}
/// Outcome of merging an update into a [`CoalescedUpdate`].
#[derive(PartialEq)]
enum ShouldKeepUpdate {
/// The coalesced entry should stay pending.
KEEP,
/// The merged updates cancel each other out; the caller removes the entry.
DISCARD,
}
impl CoalescedUpdate {
    /// Merges `u` into the pending updates of this principal.
    ///
    /// * `Add` replaces any pending add and clears the pending update and remove.
    /// * `Update` replaces any pending update.
    /// * `Remove` cancels out a pending add (reported as `DISCARD`); otherwise it is stored.
    /// * Unknown variants are logged and otherwise ignored.
    ///
    /// Returns `DISCARD` when the entry no longer carries information and should be
    /// dropped by the caller, `KEEP` otherwise.
    pub fn update(&mut self, u: fattribution::AttributionUpdate) -> ShouldKeepUpdate {
        match u {
            added @ fattribution::AttributionUpdate::Add(_) => {
                self.add = Some(added);
                self.update = None;
                self.remove = None;
                ShouldKeepUpdate::KEEP
            }
            updated @ fattribution::AttributionUpdate::Update(_) => {
                self.update = Some(updated);
                ShouldKeepUpdate::KEEP
            }
            // The principal was added and removed before anyone was told about it.
            fattribution::AttributionUpdate::Remove(_) if self.add.is_some() => {
                ShouldKeepUpdate::DISCARD
            }
            removed @ fattribution::AttributionUpdate::Remove(_) => {
                self.remove = Some(removed);
                ShouldKeepUpdate::KEEP
            }
            fattribution::AttributionUpdateUnknown!() => {
                error!("Unknown attribution update type");
                ShouldKeepUpdate::KEEP
            }
        }
    }

    /// Consumes the entry and returns its pending updates in add, update, remove order.
    pub fn get_updates(self) -> Vec<fattribution::AttributionUpdate> {
        self.add.into_iter().chain(self.update).chain(self.remove).collect()
    }

    /// Returns the total `(bytes, handles)` measured over all pending updates.
    pub fn size(&self) -> (usize, usize) {
        [&self.add, &self.update, &self.remove]
            .iter()
            .filter_map(|slot| slot.as_ref())
            .map(|pending| pending.measure())
            .fold((0usize, 0usize), |(bytes, handles), measured| {
                (bytes + measured.num_bytes, handles + measured.num_handles)
            })
    }
}
/// State kept for the registered observer connection.
struct AttributionConsumer {
/// True until the first `Get` request has been handled; that request is answered with the
/// state produced by the server's callback.
first: bool,
/// Updates not yet sent to the observer, coalesced per principal.
pending: HashMap<PrincipalIdentifier, CoalescedUpdate>,
/// Control handle of the observer's channel, used to close it with an epitaph.
observer_control_handle: fattribution::ProviderControlHandle,
/// Responder of the outstanding `Get` request, if one is waiting for updates.
responder: Option<fattribution::ProviderGetResponder>,
/// Key of the registration this consumer belongs to.
subscription_id: key::Key,
}
impl Drop for AttributionConsumer {
    /// Closes the observer's channel with a `CANCELED` epitaph when the consumer goes away.
    fn drop(&mut self) {
        let epitaph = zx::Status::CANCELED;
        self.observer_control_handle.shutdown_with_epitaph(epitaph);
    }
}
impl AttributionConsumer {
pub fn new(
observer_control_handle: fattribution::ProviderControlHandle,
key: key::Key,
) -> Self {
AttributionConsumer {
first: true,
pending: HashMap::new(),
observer_control_handle: observer_control_handle,
responder: None,
subscription_id: key,
}
}
pub fn get_update(
&mut self,
responder: fattribution::ProviderGetResponder,
gen_state: &GetAttributionFn,
) {
if self.responder.is_some() {
self.observer_control_handle.shutdown_with_epitaph(zx::Status::BAD_STATE);
return;
}
if self.first {
self.first = false;
self.pending.clear();
self.responder = Some(responder);
self.update_and_notify(gen_state());
return;
}
self.responder = Some(responder);
self.maybe_notify();
}
pub fn update_and_notify(&mut self, updated_state: Vec<fattribution::AttributionUpdate>) {
for update in updated_state {
let principal: PrincipalIdentifier = match &update {
fattribution::AttributionUpdate::Add(added_attribution) => {
PrincipalIdentifier(added_attribution.identifier.unwrap())
}
fattribution::AttributionUpdate::Update(update_attribution) => {
PrincipalIdentifier(update_attribution.identifier.unwrap())
}
fattribution::AttributionUpdate::Remove(remove_attribution) => {
PrincipalIdentifier(*remove_attribution)
}
&fattribution::AttributionUpdateUnknown!() => {
unimplemented!()
}
};
if self.pending.entry(principal.clone()).or_insert(Default::default()).update(update)
== ShouldKeepUpdate::DISCARD
{
self.pending.remove(&principal);
}
}
self.maybe_notify();
}
fn maybe_notify(&mut self) {
if self.pending.is_empty() {
return;
}
match self.responder.take() {
Some(observer) => {
let mut iterator = self.pending.drain().peekable();
let mut current_size: usize = 32;
let mut current_handles: usize = 0;
let mut update = Vec::new();
while let Some((_, next)) = iterator.peek() {
let (update_size, update_handles) = next.size();
if current_size + update_size > zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize {
break;
}
if current_handles + update_handles
> zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize
{
break;
}
current_size += update_size;
current_handles += update_handles;
update.extend(iterator.next().unwrap().1.get_updates().into_iter());
}
self.pending = iterator.collect();
Self::send_update(update, observer)
}
None => {}
}
}
fn send_update(
state: Vec<fattribution::AttributionUpdate>,
responder: fattribution::ProviderGetResponder,
) {
match responder.send(Ok(fattribution::ProviderGetResponse {
attributions: Some(state),
..Default::default()
})) {
Ok(()) => {} Err(e) => {
if let ClientChannelClosed { status: zx::Status::PEER_CLOSED, .. } = e {
return;
}
error!("Failed to send memory state to observer: {}", e);
}
}
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use super::*;
use fidl::endpoints::RequestStream;
use fuchsia_async as fasync;
use futures::TryStreamExt;
/// The first `Get` returns the generated state; a later update is delivered by the next `Get`.
#[test]
fn test_attribute_memory() {
    let mut exec = fasync::TestExecutor::new();
    let server = AttributionServer::new(Box::new(|| {
        // FIDL tables are built with `..Default::default()`, like in the other tests,
        // rather than by naming the hidden `__source_breaking` field.
        let new_principal = fattribution::NewPrincipal {
            identifier: Some(0),
            description: Some(fattribution::Description::Part("part".to_owned())),
            principal_type: Some(fattribution::PrincipalType::Runnable),
            detailed_attribution: None,
            ..Default::default()
        };
        vec![fattribution::AttributionUpdate::Add(new_principal)]
    }));
    let (snapshot_provider, snapshot_request_stream) =
        fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();
    let observer = server.new_observer(snapshot_request_stream.control_handle());
    fasync::Task::spawn(async move {
        serve(observer, snapshot_request_stream).await.unwrap();
    })
    .detach();

    // First request: the state produced by the callback.
    let attributions_vec = exec
        .run_singlethreaded(snapshot_provider.get())
        .unwrap()
        .unwrap()
        .attributions
        .expect("first response carries attributions");
    assert_eq!(attributions_vec.len(), 1);
    let fattribution::AttributionUpdate::Add(added_principal) = &attributions_vec[0] else {
        panic!("Not a new principal");
    };
    assert_eq!(added_principal.identifier, Some(0));
    assert_eq!(added_principal.principal_type, Some(fattribution::PrincipalType::Runnable));

    // Publish an update; the next request must deliver exactly that update.
    server.new_publisher().on_update(vec![fattribution::AttributionUpdate::Update(
        fattribution::UpdatedPrincipal { identifier: Some(0), ..Default::default() },
    )]);
    let attributions_vec = exec
        .run_singlethreaded(snapshot_provider.get())
        .unwrap()
        .unwrap()
        .attributions
        .expect("second response carries attributions");
    assert_eq!(attributions_vec.len(), 1);
    let fattribution::AttributionUpdate::Update(updated_principal) = &attributions_vec[0] else {
        panic!("Not an updated principal");
    };
    assert_eq!(updated_principal.identifier, Some(0));
}
/// Test helper: forwards every `Get` request received on `stream` to `observer`.
///
/// Returns when the stream ends, or with the first stream error.
///
/// # Panics
///
/// Panics if an unknown method is received.
pub async fn serve(
    observer: Observer,
    mut stream: fattribution::ProviderRequestStream,
) -> Result<(), fidl::Error> {
    while let Some(request) = stream.try_next().await? {
        match request {
            fattribution::ProviderRequest::Get { responder } => {
                observer.next(responder);
            }
            fattribution::ProviderRequest::_UnknownMethod { .. } => {
                // An explicit panic states the intent, unlike `assert!(false)`.
                panic!("Unexpected unknown method on the Provider protocol");
            }
        }
    }
    Ok(())
}
/// A second registration disconnects the first observer with `CANCELED`, while the new
/// observer keeps receiving the initial state and later updates.
#[test]
fn test_disconnect_on_new_connection() {
    let mut executor = fasync::TestExecutor::new();
    let initial_state = || {
        let principal = fattribution::NewPrincipal {
            identifier: Some(1),
            description: Some(fattribution::Description::Part("part1".to_owned())),
            principal_type: Some(fattribution::PrincipalType::Runnable),
            detailed_attribution: None,
            ..Default::default()
        };
        vec![fattribution::AttributionUpdate::Add(principal)]
    };
    let server = AttributionServer::new(Box::new(initial_state));

    // First connection: registered but never served.
    let (old_proxy, old_stream) =
        fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();
    let old_observer = server.new_observer(old_stream.control_handle());

    // Second connection: replaces the first one and is actually served.
    let (new_proxy, new_stream) =
        fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();
    let new_observer = server.new_observer(new_stream.control_handle());
    fasync::Task::spawn(async move {
        serve(new_observer, new_stream).await.unwrap();
    })
    .detach();
    drop(old_observer);

    // The replaced connection was closed with a CANCELED epitaph.
    let old_result = executor.run_singlethreaded(old_proxy.get());
    assert_matches!(old_result, Err(ClientChannelClosed { status: zx::Status::CANCELED, .. }));

    // The new connection receives the initial state.
    let initial_result = executor.run_singlethreaded(new_proxy.get());
    assert!(initial_result.is_ok());

    // The new connection also receives subsequent updates.
    let second_principal = fattribution::NewPrincipal {
        identifier: Some(2),
        description: Some(fattribution::Description::Part("part2".to_owned())),
        principal_type: Some(fattribution::PrincipalType::Runnable),
        detailed_attribution: None,
        ..Default::default()
    };
    let publisher = server.new_publisher();
    publisher.on_update(vec![fattribution::AttributionUpdate::Add(second_principal)]);
    let update_result = executor.run_singlethreaded(new_proxy.get());
    assert!(update_result.is_ok());
}
/// Issuing a second `Get` while one is still outstanding closes the connection with a
/// `BAD_STATE` epitaph, failing both calls.
#[test]
fn test_disconnect_on_two_pending_gets() {
let mut exec = fasync::TestExecutor::new();
let server = AttributionServer::new(Box::new(|| {
let new_principal = fattribution::NewPrincipal {
identifier: Some(0),
principal_type: Some(fattribution::PrincipalType::Runnable),
..Default::default()
};
vec![fattribution::AttributionUpdate::Add(new_principal)]
}));
let (snapshot_provider, snapshot_request_stream) =
fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();
let observer = server.new_observer(snapshot_request_stream.control_handle());
fasync::Task::spawn(async move {
serve(observer, snapshot_request_stream).await.unwrap();
})
.detach();
// The first call is answered with the generated state, so nothing is left pending.
exec.run_singlethreaded(snapshot_provider.get())
.expect("Connection dropped")
.expect("Get call failed");
// This call has no update to return; it is driven until it stalls and stays outstanding.
let mut future = snapshot_provider.get();
let _ = exec.run_until_stalled(&mut future);
// A further call while the previous one is outstanding makes the server close the channel.
let result = exec.run_singlethreaded(snapshot_provider.get());
let result2 = exec.run_singlethreaded(future);
// Both the outstanding call and the offending call observe the BAD_STATE epitaph.
assert_matches!(result2, Err(ClientChannelClosed { status: zx::Status::BAD_STATE, .. }));
assert_matches!(result, Err(ClientChannelClosed { status: zx::Status::BAD_STATE, .. }));
}
/// Updates published before the first `Get` are superseded by the generated state: the
/// first response contains only the `Add` produced by the callback.
#[test]
fn test_no_update_on_first_call() {
    let mut executor = fasync::TestExecutor::new();
    let initial_state = || {
        let principal = fattribution::NewPrincipal {
            identifier: Some(0),
            principal_type: Some(fattribution::PrincipalType::Runnable),
            ..Default::default()
        };
        vec![fattribution::AttributionUpdate::Add(principal)]
    };
    let server = AttributionServer::new(Box::new(initial_state));
    let (proxy, request_stream) =
        fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();
    let observer = server.new_observer(request_stream.control_handle());
    fasync::Task::spawn(async move {
        serve(observer, request_stream).await.unwrap();
    })
    .detach();

    // Published before the observer ever asked for anything.
    let early_update = fattribution::AttributionUpdate::Update(fattribution::UpdatedPrincipal {
        identifier: Some(0),
        ..Default::default()
    });
    server.new_publisher().on_update(vec![early_update]);

    let response = executor.run_singlethreaded(proxy.get()).unwrap().unwrap();
    let attributions = response.attributions;
    assert!(attributions.is_some());
    let received = attributions.unwrap();
    assert_eq!(received.len(), 1);
    let first_entry = received.get(0).unwrap();
    let fattribution::AttributionUpdate::Add(added_principal) = first_entry else {
        panic!("Not a new principal");
    };
    assert_eq!(added_principal.identifier, Some(0));
    assert_eq!(added_principal.principal_type, Some(fattribution::PrincipalType::Runnable));
}
}