1use fidl::endpoints::ControlHandle;
6use fidl::Error::ClientChannelClosed;
7use fidl_fuchsia_memory_attribution as fattribution;
8use fuchsia_sync::Mutex;
9use log::error;
10use measure_tape_for_attribution::Measurable;
11use std::collections::HashMap;
12use std::sync::Arc;
13use thiserror::Error;
14
/// Opaque subscription keys and the generator that hands them out.
mod key {
    /// Identifier for one registration.
    ///
    /// The wrapped counter is private to this module, so outside code can only
    /// copy keys and compare them for equality.
    #[derive(PartialEq, Eq, Clone, Copy)]
    pub struct Key(u64);

    /// Hands out a sequence of distinct [`Key`]s.
    pub struct KeyGenerator {
        /// The key that the following call to [`KeyGenerator::next`] will return.
        next: Key,
    }

    impl Default for KeyGenerator {
        /// Creates a generator whose first key wraps `0`.
        fn default() -> Self {
            KeyGenerator { next: Key(0) }
        }
    }

    impl KeyGenerator {
        /// Returns the current key and advances the generator by one.
        ///
        /// # Panics
        ///
        /// Panics with "Key generator overflow" when the internal counter
        /// cannot be incremented without overflowing `u64`.
        pub fn next(&mut self) -> Key {
            let Key(current) = self.next;
            // Compute the successor first so an overflow panics before any state changes.
            let successor = current.checked_add(1).expect("Key generator overflow");
            std::mem::replace(&mut self.next, Key(successor))
        }
    }
}
40
/// Callback that produces a list of attribution updates on demand.
///
/// The server stores one boxed instance of this callback and invokes it when a
/// consumer issues its first request, using the returned updates as the
/// initial batch for that consumer.
type GetAttributionFn = dyn Fn() -> Vec<fattribution::AttributionUpdate> + Send;
44
/// Errors that can arise while observing attribution updates.
#[derive(Error, Debug)]
pub enum AttributionServerObservationError {
    /// More than one observation was pending for the same `Observer`.
    #[error("multiple pending observations for the same Observer")]
    GetUpdateAlreadyPending,
}
51
/// Hashable wrapper around the numeric identifier of a principal.
///
/// Used as the map key under which pending updates for one principal are
/// coalesced.
#[derive(Clone, PartialEq, Eq, Hash)]
struct PrincipalIdentifier(u64);
54
55#[derive(Clone)]
64pub struct AttributionServerHandle {
65 inner: Arc<Mutex<AttributionServer>>,
66}
67
68impl AttributionServerHandle {
69 pub fn new_observer(&self, control_handle: fattribution::ProviderControlHandle) -> Observer {
73 AttributionServer::register(&self.inner, control_handle)
74 }
75
76 pub fn new_publisher(&self) -> Publisher {
78 Publisher { inner: self.inner.clone() }
79 }
80}
81
82pub struct Observer {
86 inner: Arc<Mutex<AttributionServer>>,
87 subscription_id: key::Key,
88}
89
90impl Observer {
91 pub fn next(&self, responder: fattribution::ProviderGetResponder) {
99 self.inner.lock().next(responder)
100 }
101}
102
103impl Drop for Observer {
104 fn drop(&mut self) {
105 self.inner.lock().unregister(self.subscription_id);
106 }
107}
108
109pub struct Publisher {
111 inner: Arc<Mutex<AttributionServer>>,
112}
113
114impl Publisher {
115 pub fn on_update(&self, updates: Vec<fattribution::AttributionUpdate>) {
119 self.inner.lock().on_update(updates)
122 }
123}
124
125pub struct AttributionServer {
126 state: Box<GetAttributionFn>,
127 consumer: Option<AttributionConsumer>,
128 key_generator: key::KeyGenerator,
129}
130
131impl AttributionServer {
132 pub fn new(state: Box<GetAttributionFn>) -> AttributionServerHandle {
136 AttributionServerHandle {
137 inner: Arc::new(Mutex::new(AttributionServer {
138 state,
139 consumer: None,
140 key_generator: Default::default(),
141 })),
142 }
143 }
144
145 pub fn on_update(&mut self, updates: Vec<fattribution::AttributionUpdate>) {
146 if let Some(consumer) = &mut self.consumer {
147 return consumer.update_and_notify(updates);
148 }
149 }
150
151 pub fn next(&mut self, responder: fattribution::ProviderGetResponder) {
153 let entry = self.consumer.as_mut().unwrap();
154 entry.get_update(responder, self.state.as_ref());
155 }
156
157 pub fn register(
158 inner: &Arc<Mutex<Self>>,
159 control_handle: fattribution::ProviderControlHandle,
160 ) -> Observer {
161 let mut locked_inner = inner.lock();
162
163 if locked_inner.consumer.is_some() {
164 log::warn!("Multiple connection requests to AttributionProvider");
165 }
168
169 let key = locked_inner.key_generator.next();
170
171 locked_inner.consumer = Some(AttributionConsumer::new(control_handle, key.clone()));
172 Observer { inner: inner.clone(), subscription_id: key }
173 }
174
175 pub fn unregister(&mut self, key: key::Key) {
178 if let Some(consumer) = &self.consumer {
179 if consumer.subscription_id == key {
180 self.consumer = None;
181 }
182 }
183 }
184}
185
186#[derive(Default)]
188struct CoalescedUpdate {
189 add: Option<fattribution::AttributionUpdate>,
190 update: Option<fattribution::AttributionUpdate>,
191 remove: Option<fattribution::AttributionUpdate>,
192}
193
194#[derive(PartialEq)]
196enum ShouldKeepUpdate {
197 KEEP,
198 DISCARD,
199}
200
201impl CoalescedUpdate {
202 pub fn update(&mut self, u: fattribution::AttributionUpdate) -> ShouldKeepUpdate {
204 match u {
205 fattribution::AttributionUpdate::Add(u) => {
206 self.add = Some(fattribution::AttributionUpdate::Add(u));
207 self.update = None;
208 self.remove = None;
209 }
210 fattribution::AttributionUpdate::Update(u) => {
211 self.update = Some(fattribution::AttributionUpdate::Update(u));
212 }
213 fattribution::AttributionUpdate::Remove(u) => {
214 if self.add.is_some() {
215 return ShouldKeepUpdate::DISCARD;
217 }
218 self.remove = Some(fattribution::AttributionUpdate::Remove(u));
219 }
220 fattribution::AttributionUpdateUnknown!() => {
221 error!("Unknown attribution update type");
222 }
223 };
224 ShouldKeepUpdate::KEEP
225 }
226
227 pub fn get_updates(self) -> Vec<fattribution::AttributionUpdate> {
228 let mut result = Vec::new();
229 if let Some(u) = self.add {
230 result.push(u);
231 }
232 if let Some(u) = self.update {
233 result.push(u);
234 }
235 if let Some(u) = self.remove {
236 result.push(u);
237 }
238 result
239 }
240
241 pub fn size(&self) -> (usize, usize) {
242 let (mut bytes, mut handles) = (0, 0);
243 if let Some(u) = &self.add {
244 let m = u.measure();
245 bytes += m.num_bytes;
246 handles += m.num_handles;
247 }
248 if let Some(u) = &self.update {
249 let m = u.measure();
250 bytes += m.num_bytes;
251 handles += m.num_handles;
252 }
253 if let Some(u) = &self.remove {
254 let m = u.measure();
255 bytes += m.num_bytes;
256 handles += m.num_handles;
257 }
258 (bytes, handles)
259 }
260}
261
262struct AttributionConsumer {
264 first: bool,
266
267 pending: HashMap<PrincipalIdentifier, CoalescedUpdate>,
269
270 observer_control_handle: fattribution::ProviderControlHandle,
272
273 responder: Option<fattribution::ProviderGetResponder>,
275
276 subscription_id: key::Key,
278}
279
280impl Drop for AttributionConsumer {
281 fn drop(&mut self) {
282 self.observer_control_handle.shutdown_with_epitaph(zx::Status::CANCELED);
283 }
284}
285
286impl AttributionConsumer {
287 pub fn new(
290 observer_control_handle: fattribution::ProviderControlHandle,
291 key: key::Key,
292 ) -> Self {
293 AttributionConsumer {
294 first: true,
295 pending: HashMap::new(),
296 observer_control_handle: observer_control_handle,
297 responder: None,
298 subscription_id: key,
299 }
300 }
301
302 pub fn get_update(
307 &mut self,
308 responder: fattribution::ProviderGetResponder,
309 gen_state: &GetAttributionFn,
310 ) {
311 if self.responder.is_some() {
312 self.observer_control_handle.shutdown_with_epitaph(zx::Status::BAD_STATE);
313 return;
314 }
315 if self.first {
316 self.first = false;
317 self.pending.clear();
318 self.responder = Some(responder);
319 self.update_and_notify(gen_state());
320 return;
321 }
322 self.responder = Some(responder);
323 self.maybe_notify();
324 }
325
326 pub fn update_and_notify(&mut self, updated_state: Vec<fattribution::AttributionUpdate>) {
328 for update in updated_state {
329 let principal: PrincipalIdentifier = match &update {
330 fattribution::AttributionUpdate::Add(added_attribution) => {
331 PrincipalIdentifier(added_attribution.identifier.unwrap())
332 }
333 fattribution::AttributionUpdate::Update(update_attribution) => {
334 PrincipalIdentifier(update_attribution.identifier.unwrap())
335 }
336 fattribution::AttributionUpdate::Remove(remove_attribution) => {
337 PrincipalIdentifier(*remove_attribution)
338 }
339 &fattribution::AttributionUpdateUnknown!() => {
340 unimplemented!()
341 }
342 };
343 if self.pending.entry(principal.clone()).or_insert(Default::default()).update(update)
344 == ShouldKeepUpdate::DISCARD
345 {
346 self.pending.remove(&principal);
347 }
348 }
349 self.maybe_notify();
350 }
351
352 fn maybe_notify(&mut self) {
354 if self.pending.is_empty() {
355 return;
356 }
357
358 match self.responder.take() {
359 Some(observer) => {
360 let mut iterator = self.pending.drain().peekable();
361 let mut current_size: usize = 32;
362 let mut current_handles: usize = 0;
363 let mut update = Vec::new();
364 while let Some((_, next)) = iterator.peek() {
365 let (update_size, update_handles) = next.size();
366
367 if current_size + update_size > zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize {
368 break;
369 }
370 if current_handles + update_handles
371 > zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize
372 {
373 break;
374 }
375 current_size += update_size;
376 current_handles += update_handles;
377 update.extend(iterator.next().unwrap().1.get_updates().into_iter());
378 }
379
380 self.pending = iterator.collect();
381 Self::send_update(update, observer)
382 }
383 None => {}
384 }
385 }
386
387 fn send_update(
389 state: Vec<fattribution::AttributionUpdate>,
390 responder: fattribution::ProviderGetResponder,
391 ) {
392 match responder.send(Ok(fattribution::ProviderGetResponse {
393 attributions: Some(state),
394 ..Default::default()
395 })) {
396 Ok(()) => {} Err(e) => {
398 if let ClientChannelClosed { status: zx::Status::PEER_CLOSED, .. } = e {
400 return;
402 }
403 error!("Failed to send memory state to observer: {}", e);
404 }
405 }
406 }
407}
408
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;

    use super::*;
    use fidl::endpoints::RequestStream;
    use fuchsia_async as fasync;
    use futures::TryStreamExt;

    /// The first `Get` returns the state produced by the server's callback, and
    /// an update published afterwards is returned by the following `Get`.
    #[test]
    fn test_attribute_memory() {
        let mut exec = fasync::TestExecutor::new();
        let server = AttributionServer::new(Box::new(|| {
            let new_principal = fattribution::NewPrincipal {
                identifier: Some(0),
                description: Some(fattribution::Description::Part("part".to_owned())),
                principal_type: Some(fattribution::PrincipalType::Runnable),
                detailed_attribution: None,
                ..Default::default()
            };
            vec![fattribution::AttributionUpdate::Add(new_principal)]
        }));
        let (snapshot_provider, snapshot_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();

        let observer = server.new_observer(snapshot_request_stream.control_handle());
        fasync::Task::spawn(async move {
            serve(observer, snapshot_request_stream).await.unwrap();
        })
        .detach();

        let attributions_vec = exec
            .run_singlethreaded(snapshot_provider.get())
            .unwrap()
            .unwrap()
            .attributions
            .expect("first response should carry attributions");
        assert_eq!(attributions_vec.len(), 1);
        let new_attrib = attributions_vec.first().unwrap();
        let fattribution::AttributionUpdate::Add(added_principal) = new_attrib else {
            panic!("Not a new principal");
        };
        assert_eq!(added_principal.identifier, Some(0));
        assert_eq!(added_principal.principal_type, Some(fattribution::PrincipalType::Runnable));

        server.new_publisher().on_update(vec![fattribution::AttributionUpdate::Update(
            fattribution::UpdatedPrincipal { identifier: Some(0), ..Default::default() },
        )]);
        let attributions_vec = exec
            .run_singlethreaded(snapshot_provider.get())
            .unwrap()
            .unwrap()
            .attributions
            .expect("second response should carry attributions");
        assert_eq!(attributions_vec.len(), 1);
        let updated_attrib = attributions_vec.first().unwrap();
        let fattribution::AttributionUpdate::Update(updated_principal) = updated_attrib else {
            panic!("Not an updated principal");
        };
        assert_eq!(updated_principal.identifier, Some(0));
    }

    /// Minimal request loop: forwards every `Get` responder to `observer` and
    /// fails the test on any unknown method.
    pub async fn serve(
        observer: Observer,
        mut stream: fattribution::ProviderRequestStream,
    ) -> Result<(), fidl::Error> {
        while let Some(request) = stream.try_next().await? {
            match request {
                fattribution::ProviderRequest::Get { responder } => {
                    observer.next(responder);
                }
                fattribution::ProviderRequest::_UnknownMethod { .. } => {
                    panic!("unexpected unknown method on Provider");
                }
            }
        }
        Ok(())
    }

    /// Registering a second observer closes the first connection with a
    /// `CANCELED` epitaph, while the second connection keeps working even after
    /// the first observer is dropped.
    #[test]
    fn test_disconnect_on_new_connection() {
        let mut exec = fasync::TestExecutor::new();
        let server = AttributionServer::new(Box::new(|| {
            vec![fattribution::AttributionUpdate::Add(fattribution::NewPrincipal {
                identifier: Some(1),
                description: Some(fattribution::Description::Part("part1".to_owned())),
                principal_type: Some(fattribution::PrincipalType::Runnable),
                detailed_attribution: None,
                ..Default::default()
            })]
        }));
        let (snapshot_provider, snapshot_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();

        let observer = server.new_observer(snapshot_request_stream.control_handle());

        let (new_snapshot_provider, new_snapshot_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();

        let new_observer = server.new_observer(new_snapshot_request_stream.control_handle());
        fasync::Task::spawn(async move {
            serve(new_observer, new_snapshot_request_stream).await.unwrap();
        })
        .detach();

        // Dropping the stale observer must not unregister the newer consumer.
        drop(observer);
        let result = exec.run_singlethreaded(snapshot_provider.get());
        assert_matches!(result, Err(ClientChannelClosed { status: zx::Status::CANCELED, .. }));

        let result = exec.run_singlethreaded(new_snapshot_provider.get());
        assert!(result.is_ok());
        server.new_publisher().on_update(vec![fattribution::AttributionUpdate::Add(
            fattribution::NewPrincipal {
                identifier: Some(2),
                description: Some(fattribution::Description::Part("part2".to_owned())),
                principal_type: Some(fattribution::PrincipalType::Runnable),
                detailed_attribution: None,
                ..Default::default()
            },
        )]);
        let result = exec.run_singlethreaded(new_snapshot_provider.get());
        assert!(result.is_ok());
    }

    /// Issuing a `Get` while another one is still outstanding closes the
    /// connection with a `BAD_STATE` epitaph for both calls.
    #[test]
    fn test_disconnect_on_two_pending_gets() {
        let mut exec = fasync::TestExecutor::new();
        let server = AttributionServer::new(Box::new(|| {
            let new_principal = fattribution::NewPrincipal {
                identifier: Some(0),
                principal_type: Some(fattribution::PrincipalType::Runnable),
                ..Default::default()
            };
            vec![fattribution::AttributionUpdate::Add(new_principal)]
        }));
        let (snapshot_provider, snapshot_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();

        let observer = server.new_observer(snapshot_request_stream.control_handle());
        fasync::Task::spawn(async move {
            serve(observer, snapshot_request_stream).await.unwrap();
        })
        .detach();

        // The first call is answered right away with the initial state.
        exec.run_singlethreaded(snapshot_provider.get())
            .expect("Connection dropped")
            .expect("Get call failed");

        // The second call has nothing to return yet, so it stays pending.
        let mut future = snapshot_provider.get();

        let _ = exec.run_until_stalled(&mut future);

        // A third call while the second is pending is rejected.
        let result = exec.run_singlethreaded(snapshot_provider.get());

        let result2 = exec.run_singlethreaded(future);

        assert_matches!(result2, Err(ClientChannelClosed { status: zx::Status::BAD_STATE, .. }));
        assert_matches!(result, Err(ClientChannelClosed { status: zx::Status::BAD_STATE, .. }));
    }

    /// An update published before the first `Get` is not delivered: the first
    /// response only contains the state produced by the server's callback.
    #[test]
    fn test_no_update_on_first_call() {
        let mut exec = fasync::TestExecutor::new();
        let server = AttributionServer::new(Box::new(|| {
            let new_principal = fattribution::NewPrincipal {
                identifier: Some(0),
                principal_type: Some(fattribution::PrincipalType::Runnable),
                ..Default::default()
            };
            vec![fattribution::AttributionUpdate::Add(new_principal)]
        }));
        let (snapshot_provider, snapshot_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();

        let observer = server.new_observer(snapshot_request_stream.control_handle());
        fasync::Task::spawn(async move {
            serve(observer, snapshot_request_stream).await.unwrap();
        })
        .detach();

        server.new_publisher().on_update(vec![fattribution::AttributionUpdate::Update(
            fattribution::UpdatedPrincipal { identifier: Some(0), ..Default::default() },
        )]);

        let attributions_vec = exec
            .run_singlethreaded(snapshot_provider.get())
            .unwrap()
            .unwrap()
            .attributions
            .expect("first response should carry attributions");
        assert_eq!(attributions_vec.len(), 1);
        let new_attrib = attributions_vec.first().unwrap();
        let fattribution::AttributionUpdate::Add(added_principal) = new_attrib else {
            panic!("Not a new principal");
        };
        assert_eq!(added_principal.identifier, Some(0));
        assert_eq!(added_principal.principal_type, Some(fattribution::PrincipalType::Runnable));
    }
}