attribution_server/
attribution_server.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::endpoints::ControlHandle;
6use fidl::Error::ClientChannelClosed;
7use fidl_fuchsia_memory_attribution as fattribution;
8use fuchsia_sync::Mutex;
9use log::error;
10use measure_tape_for_attribution::Measurable;
11use std::collections::HashMap;
12use std::sync::Arc;
13use thiserror::Error;
14
mod key {
    /// Opaque identifier used to tell registrations apart.
    #[derive(PartialEq, Eq, Clone, Copy)]
    pub struct Key(u64);

    /// Hands out [Key] values; every call to [KeyGenerator::next] returns a
    /// value different from the ones returned before it.
    pub struct KeyGenerator {
        next: Key,
    }

    impl Default for KeyGenerator {
        /// Builds a generator whose first key wraps the value zero.
        fn default() -> Self {
            KeyGenerator { next: Key(0) }
        }
    }

    impl KeyGenerator {
        /// Returns the current [Key] and advances the generator by one.
        ///
        /// # Panics
        ///
        /// Panics if advancing the internal `u64` counter would overflow.
        pub fn next(&mut self) -> Key {
            let Key(current) = self.next;
            let following = current.checked_add(1).expect("Key generator overflow");
            // Store the successor and hand back the value that was current.
            std::mem::replace(&mut self.next, Key(following))
        }
    }
}
40
41/// Function of this type returns a vector of attribution updates, and is used
42/// as the type of the callback in [AttributionServer::new].
43type GetAttributionFn = dyn Fn() -> Vec<fattribution::AttributionUpdate> + Send;
44
45/// Error types that may be used by the async hanging-get server.
46#[derive(Error, Debug)]
47pub enum AttributionServerObservationError {
48    #[error("multiple pending observations for the same Observer")]
49    GetUpdateAlreadyPending,
50}
51
/// Numeric identifier of a principal; used as the key under which pending
/// updates are grouped.
#[derive(PartialEq, Eq, Hash, Clone)]
struct PrincipalIdentifier(u64);
54
55/// Main structure for the memory attribution hanging get server.
56///
57/// Components that wish to expose attribution information should create a
58/// single [AttributionServer] object.
59/// Each inbound fuchsia.attribution.Provider connection should get its own
60/// [Observer] object using [AttributionServer::new_observer].
61/// [Publisher]s, created using [AttributionServer::new_publisher], should be
62/// used to push attribution changes.
63#[derive(Clone)]
64pub struct AttributionServerHandle {
65    inner: Arc<Mutex<AttributionServer>>,
66}
67
68impl AttributionServerHandle {
69    /// Create a new [Observer] that represents a single client.
70    ///
71    /// Each FIDL client connection should get its own [Observer] object.
72    pub fn new_observer(&self, control_handle: fattribution::ProviderControlHandle) -> Observer {
73        AttributionServer::register(&self.inner, control_handle)
74    }
75
76    /// Create a new [Publisher] that can push updates to observers.
77    pub fn new_publisher(&self) -> Publisher {
78        Publisher { inner: self.inner.clone() }
79    }
80}
81
82/// An `Observer` can be used to register observation requests, corresponding to individual hanging
83/// get calls. These will be notified when the state changes or immediately the first time
84/// an `Observer` registers an observation.
85pub struct Observer {
86    inner: Arc<Mutex<AttributionServer>>,
87    subscription_id: key::Key,
88}
89
90impl Observer {
91    /// Register a new observation request.
92    ///
93    /// A newly-created observer will first receive the current state. After
94    /// the first call, the observer will be notified only if the state changes.
95    ///
96    /// Errors occur when an Observer attempts to wait for an update when there
97    /// is a update request already pending.
98    pub fn next(&self, responder: fattribution::ProviderGetResponder) {
99        self.inner.lock().next(responder)
100    }
101}
102
103impl Drop for Observer {
104    fn drop(&mut self) {
105        self.inner.lock().unregister(self.subscription_id);
106    }
107}
108
109/// A [Publisher] should be used to send updates to [Observer]s.
110pub struct Publisher {
111    inner: Arc<Mutex<AttributionServer>>,
112}
113
114impl Publisher {
115    /// Registers an update to the state observed.
116    ///
117    /// `partial_state` is a function that returns the update.
118    pub fn on_update(&self, updates: Vec<fattribution::AttributionUpdate>) {
119        // [update_generator] is a `Fn` and not an `FnOnce` in order to be called multiple times,
120        // once for each [Observer].
121        self.inner.lock().on_update(updates)
122    }
123}
124
125pub struct AttributionServer {
126    state: Box<GetAttributionFn>,
127    consumer: Option<AttributionConsumer>,
128    key_generator: key::KeyGenerator,
129}
130
131impl AttributionServer {
132    /// Create a new memory attribution server.
133    ///
134    /// `state` is a function returning the complete attribution state (not partial updates).
135    pub fn new(state: Box<GetAttributionFn>) -> AttributionServerHandle {
136        AttributionServerHandle {
137            inner: Arc::new(Mutex::new(AttributionServer {
138                state,
139                consumer: None,
140                key_generator: Default::default(),
141            })),
142        }
143    }
144
145    pub fn on_update(&mut self, updates: Vec<fattribution::AttributionUpdate>) {
146        if let Some(consumer) = &mut self.consumer {
147            return consumer.update_and_notify(updates);
148        }
149    }
150
151    /// Get the next attribution state.
152    pub fn next(&mut self, responder: fattribution::ProviderGetResponder) {
153        let entry = self.consumer.as_mut().unwrap();
154        entry.get_update(responder, self.state.as_ref());
155    }
156
157    pub fn register(
158        inner: &Arc<Mutex<Self>>,
159        control_handle: fattribution::ProviderControlHandle,
160    ) -> Observer {
161        let mut locked_inner = inner.lock();
162
163        if locked_inner.consumer.is_some() {
164            log::warn!("Multiple connection requests to AttributionProvider");
165            // The shutdown of the observer will be done when the old [AttributionConsumer] is
166            // dropped.
167        }
168
169        let key = locked_inner.key_generator.next();
170
171        locked_inner.consumer = Some(AttributionConsumer::new(control_handle, key.clone()));
172        Observer { inner: inner.clone(), subscription_id: key }
173    }
174
175    /// Deregister the current observer. No observer can be registered as long
176    /// as another observer is already registered.
177    pub fn unregister(&mut self, key: key::Key) {
178        if let Some(consumer) = &self.consumer {
179            if consumer.subscription_id == key {
180                self.consumer = None;
181            }
182        }
183    }
184}
185
186/// CoalescedUpdate contains all the pending updates for a given principal.
187#[derive(Default)]
188struct CoalescedUpdate {
189    add: Option<fattribution::AttributionUpdate>,
190    update: Option<fattribution::AttributionUpdate>,
191    remove: Option<fattribution::AttributionUpdate>,
192}
193
/// Tells the caller whether a coalesced update is still worth keeping.
#[derive(PartialEq)]
enum ShouldKeepUpdate {
    /// The coalesced update must be kept.
    KEEP,
    /// The coalesced update can be discarded.
    DISCARD,
}
200
201impl CoalescedUpdate {
202    /// Merges updates of a given Principal, discarding the ones that become irrelevant.
203    pub fn update(&mut self, u: fattribution::AttributionUpdate) -> ShouldKeepUpdate {
204        match u {
205            fattribution::AttributionUpdate::Add(u) => {
206                self.add = Some(fattribution::AttributionUpdate::Add(u));
207                self.update = None;
208                self.remove = None;
209            }
210            fattribution::AttributionUpdate::Update(u) => {
211                self.update = Some(fattribution::AttributionUpdate::Update(u));
212            }
213            fattribution::AttributionUpdate::Remove(u) => {
214                if self.add.is_some() {
215                    // We both added and removed the principal, so it is a no-op.
216                    return ShouldKeepUpdate::DISCARD;
217                }
218                self.remove = Some(fattribution::AttributionUpdate::Remove(u));
219            }
220            fattribution::AttributionUpdateUnknown!() => {
221                error!("Unknown attribution update type");
222            }
223        };
224        ShouldKeepUpdate::KEEP
225    }
226
227    pub fn get_updates(self) -> Vec<fattribution::AttributionUpdate> {
228        let mut result = Vec::new();
229        if let Some(u) = self.add {
230            result.push(u);
231        }
232        if let Some(u) = self.update {
233            result.push(u);
234        }
235        if let Some(u) = self.remove {
236            result.push(u);
237        }
238        result
239    }
240
241    pub fn size(&self) -> (usize, usize) {
242        let (mut bytes, mut handles) = (0, 0);
243        if let Some(u) = &self.add {
244            let m = u.measure();
245            bytes += m.num_bytes;
246            handles += m.num_handles;
247        }
248        if let Some(u) = &self.update {
249            let m = u.measure();
250            bytes += m.num_bytes;
251            handles += m.num_handles;
252        }
253        if let Some(u) = &self.remove {
254            let m = u.measure();
255            bytes += m.num_bytes;
256            handles += m.num_handles;
257        }
258        (bytes, handles)
259    }
260}
261
262/// AttributionConsumer tracks pending updates and observation requests for a given id.
263struct AttributionConsumer {
264    /// Whether we sent the first full state, or not.
265    first: bool,
266
267    /// Pending updates waiting to be sent.
268    pending: HashMap<PrincipalIdentifier, CoalescedUpdate>,
269
270    /// Control handle for the FIDL connection.
271    observer_control_handle: fattribution::ProviderControlHandle,
272
273    /// FIDL responder for a pending hanging get call.
274    responder: Option<fattribution::ProviderGetResponder>,
275
276    /// Matches an AttributionConsumer with an Observer.
277    subscription_id: key::Key,
278}
279
280impl Drop for AttributionConsumer {
281    fn drop(&mut self) {
282        self.observer_control_handle.shutdown_with_epitaph(zx::Status::CANCELED);
283    }
284}
285
286impl AttributionConsumer {
287    /// Create a new [AttributionConsumer] without an `observer` and an initial `dirty`
288    /// value of `true`.
289    pub fn new(
290        observer_control_handle: fattribution::ProviderControlHandle,
291        key: key::Key,
292    ) -> Self {
293        AttributionConsumer {
294            first: true,
295            pending: HashMap::new(),
296            observer_control_handle: observer_control_handle,
297            responder: None,
298            subscription_id: key,
299        }
300    }
301
302    /// Register a new observation request. The observer will be notified immediately if
303    /// the [AttributionConsumer] has pending updates, or hasn't sent anything yet. The
304    /// request will be stored for future notification if the [AttributionConsumer] does
305    /// not have anything to send yet.
306    pub fn get_update(
307        &mut self,
308        responder: fattribution::ProviderGetResponder,
309        gen_state: &GetAttributionFn,
310    ) {
311        if self.responder.is_some() {
312            self.observer_control_handle.shutdown_with_epitaph(zx::Status::BAD_STATE);
313            return;
314        }
315        if self.first {
316            self.first = false;
317            self.pending.clear();
318            self.responder = Some(responder);
319            self.update_and_notify(gen_state());
320            return;
321        }
322        self.responder = Some(responder);
323        self.maybe_notify();
324    }
325
326    /// Take in new memory attribution updates.
327    pub fn update_and_notify(&mut self, updated_state: Vec<fattribution::AttributionUpdate>) {
328        for update in updated_state {
329            let principal: PrincipalIdentifier = match &update {
330                fattribution::AttributionUpdate::Add(added_attribution) => {
331                    PrincipalIdentifier(added_attribution.identifier.unwrap())
332                }
333                fattribution::AttributionUpdate::Update(update_attribution) => {
334                    PrincipalIdentifier(update_attribution.identifier.unwrap())
335                }
336                fattribution::AttributionUpdate::Remove(remove_attribution) => {
337                    PrincipalIdentifier(*remove_attribution)
338                }
339                &fattribution::AttributionUpdateUnknown!() => {
340                    unimplemented!()
341                }
342            };
343            if self.pending.entry(principal.clone()).or_insert(Default::default()).update(update)
344                == ShouldKeepUpdate::DISCARD
345            {
346                self.pending.remove(&principal);
347            }
348        }
349        self.maybe_notify();
350    }
351
352    /// Notify of the pending updates if a responder is available.
353    fn maybe_notify(&mut self) {
354        if self.pending.is_empty() {
355            return;
356        }
357
358        match self.responder.take() {
359            Some(observer) => {
360                let mut iterator = self.pending.drain().peekable();
361                let mut current_size: usize = 32;
362                let mut current_handles: usize = 0;
363                let mut update = Vec::new();
364                while let Some((_, next)) = iterator.peek() {
365                    let (update_size, update_handles) = next.size();
366
367                    if current_size + update_size > zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize {
368                        break;
369                    }
370                    if current_handles + update_handles
371                        > zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize
372                    {
373                        break;
374                    }
375                    current_size += update_size;
376                    current_handles += update_handles;
377                    update.extend(iterator.next().unwrap().1.get_updates().into_iter());
378                }
379
380                self.pending = iterator.collect();
381                Self::send_update(update, observer)
382            }
383            None => {}
384        }
385    }
386
387    /// Sends the attribution update to the provided responder.
388    fn send_update(
389        state: Vec<fattribution::AttributionUpdate>,
390        responder: fattribution::ProviderGetResponder,
391    ) {
392        match responder.send(Ok(fattribution::ProviderGetResponse {
393            attributions: Some(state),
394            ..Default::default()
395        })) {
396            Ok(()) => {} // indicates that the observer was successfully updated
397            Err(e) => {
398                // `send()` ensures that the channel is shut down in case of error.
399                if let ClientChannelClosed { status: zx::Status::PEER_CLOSED, .. } = e {
400                    // Skip if this is simply our client closing the channel.
401                    return;
402                }
403                error!("Failed to send memory state to observer: {}", e);
404            }
405        }
406    }
407}
408
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;

    use super::*;
    use fidl::endpoints::RequestStream;
    use fuchsia_async as fasync;
    use futures::TryStreamExt;

    /// Tests that the first get call returns the full state, and that a later
    /// call returns the update published in between.
    #[test]
    fn test_attribute_memory() {
        let mut exec = fasync::TestExecutor::new();
        let server = AttributionServer::new(Box::new(|| {
            let new_principal = fattribution::NewPrincipal {
                identifier: Some(0),
                description: Some(fattribution::Description::Part("part".to_owned())),
                principal_type: Some(fattribution::PrincipalType::Runnable),
                detailed_attribution: None,
                ..Default::default()
            };
            vec![fattribution::AttributionUpdate::Add(new_principal)]
        }));
        let (snapshot_provider, snapshot_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();

        let observer = server.new_observer(snapshot_request_stream.control_handle());
        fasync::Task::spawn(async move {
            serve(observer, snapshot_request_stream).await.unwrap();
        })
        .detach();

        let attributions_vec = exec
            .run_singlethreaded(snapshot_provider.get())
            .unwrap()
            .unwrap()
            .attributions
            .expect("attributions should be set");
        // It should contain one principal, the one returned by the state callback.
        assert_eq!(attributions_vec.len(), 1);
        let new_attrib = attributions_vec.first().unwrap();
        let fattribution::AttributionUpdate::Add(added_principal) = new_attrib else {
            panic!("Not a new principal");
        };
        assert_eq!(added_principal.identifier, Some(0));
        assert_eq!(added_principal.principal_type, Some(fattribution::PrincipalType::Runnable));

        server.new_publisher().on_update(vec![fattribution::AttributionUpdate::Update(
            fattribution::UpdatedPrincipal { identifier: Some(0), ..Default::default() },
        )]);
        let attributions_vec = exec
            .run_singlethreaded(snapshot_provider.get())
            .unwrap()
            .unwrap()
            .attributions
            .expect("attributions should be set");
        // It should contain one entry, the update that was just published.
        assert_eq!(attributions_vec.len(), 1);
        let updated_attrib = attributions_vec.first().unwrap();
        let fattribution::AttributionUpdate::Update(updated_principal) = updated_attrib else {
            panic!("Not an updated principal");
        };
        assert_eq!(updated_principal.identifier, Some(0));
    }

    /// Serves `stream` by forwarding every `Get` request to `observer`, until
    /// the stream ends or yields an error.
    pub async fn serve(
        observer: Observer,
        mut stream: fattribution::ProviderRequestStream,
    ) -> Result<(), fidl::Error> {
        while let Some(request) = stream.try_next().await? {
            match request {
                fattribution::ProviderRequest::Get { responder } => {
                    observer.next(responder);
                }
                fattribution::ProviderRequest::_UnknownMethod { .. } => {
                    panic!("Unexpected unknown Provider method");
                }
            }
        }
        Ok(())
    }

    /// Tests that a new Provider connection cancels a previous one.
    #[test]
    fn test_disconnect_on_new_connection() {
        let mut exec = fasync::TestExecutor::new();
        let server = AttributionServer::new(Box::new(|| {
            vec![fattribution::AttributionUpdate::Add(fattribution::NewPrincipal {
                identifier: Some(1),
                description: Some(fattribution::Description::Part("part1".to_owned())),
                principal_type: Some(fattribution::PrincipalType::Runnable),
                detailed_attribution: None,
                ..Default::default()
            })]
        }));
        let (snapshot_provider, snapshot_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();

        let observer = server.new_observer(snapshot_request_stream.control_handle());

        let (new_snapshot_provider, new_snapshot_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();

        let new_observer = server.new_observer(new_snapshot_request_stream.control_handle());
        fasync::Task::spawn(async move {
            serve(new_observer, new_snapshot_request_stream).await.unwrap();
        })
        .detach();

        // Dropping the replaced observer must not unregister the new one.
        drop(observer);
        let result = exec.run_singlethreaded(snapshot_provider.get());
        assert_matches!(result, Err(ClientChannelClosed { status: zx::Status::CANCELED, .. }));

        let result = exec.run_singlethreaded(new_snapshot_provider.get());
        assert!(result.is_ok());
        server.new_publisher().on_update(vec![fattribution::AttributionUpdate::Add(
            fattribution::NewPrincipal {
                identifier: Some(2),
                description: Some(fattribution::Description::Part("part2".to_owned())),
                principal_type: Some(fattribution::PrincipalType::Runnable),
                detailed_attribution: None,
                ..Default::default()
            },
        )]);
        let result = exec.run_singlethreaded(new_snapshot_provider.get());
        assert!(result.is_ok());
    }

    /// Tests that a new [Provider::get] call while another call is still pending
    /// generates an error.
    #[test]
    fn test_disconnect_on_two_pending_gets() {
        let mut exec = fasync::TestExecutor::new();
        let server = AttributionServer::new(Box::new(|| {
            let new_principal = fattribution::NewPrincipal {
                identifier: Some(0),
                principal_type: Some(fattribution::PrincipalType::Runnable),
                ..Default::default()
            };
            vec![fattribution::AttributionUpdate::Add(new_principal)]
        }));
        let (snapshot_provider, snapshot_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();

        let observer = server.new_observer(snapshot_request_stream.control_handle());
        fasync::Task::spawn(async move {
            serve(observer, snapshot_request_stream).await.unwrap();
        })
        .detach();

        // The first call should succeed right away.
        exec.run_singlethreaded(snapshot_provider.get())
            .expect("Connection dropped")
            .expect("Get call failed");

        // The next call should block until an update is pushed on the provider side.
        let mut future = snapshot_provider.get();

        let _ = exec.run_until_stalled(&mut future);

        // A second get() call made while the previous one is pending should fail.
        let result = exec.run_singlethreaded(snapshot_provider.get());

        // The call that was pending fails as well, as the connection is shut down.
        let result2 = exec.run_singlethreaded(future);

        assert_matches!(result2, Err(ClientChannelClosed { status: zx::Status::BAD_STATE, .. }));
        assert_matches!(result, Err(ClientChannelClosed { status: zx::Status::BAD_STATE, .. }));
    }

    /// Tests that the first get call returns the full state, not updates.
    #[test]
    fn test_no_update_on_first_call() {
        let mut exec = fasync::TestExecutor::new();
        let server = AttributionServer::new(Box::new(|| {
            let new_principal = fattribution::NewPrincipal {
                identifier: Some(0),
                principal_type: Some(fattribution::PrincipalType::Runnable),
                ..Default::default()
            };
            vec![fattribution::AttributionUpdate::Add(new_principal)]
        }));
        let (snapshot_provider, snapshot_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fattribution::ProviderMarker>();

        let observer = server.new_observer(snapshot_request_stream.control_handle());
        fasync::Task::spawn(async move {
            serve(observer, snapshot_request_stream).await.unwrap();
        })
        .detach();

        server.new_publisher().on_update(vec![fattribution::AttributionUpdate::Update(
            fattribution::UpdatedPrincipal { identifier: Some(0), ..Default::default() },
        )]);

        // As this is the first call, we should get the full state, not the update.
        let attributions_vec = exec
            .run_singlethreaded(snapshot_provider.get())
            .unwrap()
            .unwrap()
            .attributions
            .expect("attributions should be set");
        // It should contain one principal, the one returned by the state callback.
        assert_eq!(attributions_vec.len(), 1);
        let new_attrib = attributions_vec.first().unwrap();
        let fattribution::AttributionUpdate::Add(added_principal) = new_attrib else {
            panic!("Not a new principal");
        };
        assert_eq!(added_principal.identifier, Some(0));
        assert_eq!(added_principal.principal_type, Some(fattribution::PrincipalType::Runnable));
    }
}