archivist_lib/logs/
container.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::identity::ComponentIdentity;
6use crate::logs::shared_buffer::ContainerBuffer;
7use crate::logs::stats::LogStreamStats;
8use crate::logs::stored_message::StoredMessage;
9use derivative::Derivative;
10use fidl::endpoints::RequestStream;
11use fidl_fuchsia_diagnostics::LogInterestSelector;
12use fidl_fuchsia_diagnostics_types::{Interest as FidlInterest, Severity as FidlSeverity};
13use fidl_fuchsia_logger::{LogSinkOnInitRequest, LogSinkRequest, LogSinkRequestStream};
14use fuchsia_async as fasync;
15use fuchsia_async::condition::Condition;
16use futures::future::{Fuse, FusedFuture};
17use futures::prelude::*;
18use futures::select;
19use futures::stream::StreamExt;
20use log::{debug, error};
21use selectors::SelectorExt;
22use std::cmp::Ordering;
23use std::collections::BTreeMap;
24use std::pin::pin;
25use std::sync::Arc;
26use std::task::Poll;
27
/// Callback stored on a `LogsArtifactsContainer` and invoked with that container when
/// `check_inactive` finds that `is_active()` is false.
28pub type OnInactive = Box<dyn Fn(&LogsArtifactsContainer) + Send + Sync>;
29
/// Per-component log state: the component's identity, its stats, the buffer its messages are
/// pushed to, and the channel/interest bookkeeping kept in `ContainerState`.
///
/// Only `identity` and `stats` appear in the `Debug` output; the other fields are ignored.
30#[derive(Derivative)]
31#[derivative(Debug)]
32pub struct LogsArtifactsContainer {
33    /// The source of logs in this container.
34    pub identity: Arc<ComponentIdentity>,
35
36    /// Inspect instrumentation.
37    pub stats: Arc<LogStreamStats>,
38
39    /// Buffer for all log messages.
40    #[derivative(Debug = "ignore")]
41    buffer: ContainerBuffer,
42
43    /// Mutable state for the container. It sits behind a `Condition` so that the task serving a
    /// `LogSink` channel can wait until the minimum interest changes.
44    #[derivative(Debug = "ignore")]
45    state: Arc<Condition<ContainerState>>,
46
47    /// A callback which is called by `check_inactive` when the container is inactive, i.e. when
48    /// `is_active()` returns false.
49    #[derivative(Debug = "ignore")]
50    on_inactive: Option<OnInactive>,
51}
52
/// Bookkeeping that lives inside the container's `Condition`.
53#[derive(Debug)]
54struct ContainerState {
55    /// Number of LogSink channels currently being listened to for this component.
56    num_active_channels: u64,
57
58    /// Interests requested for this component, mapped to how many times each was requested.
    /// The smallest key is the interest reported to the component (see `min_interest`).
59    interests: BTreeMap<Interest, usize>,
60
    /// Set to `true` by `new` and cleared once `handle_log_sink` accepts a channel (or by
    /// `mark_stopped` in tests). While set, `is_active` reports the container as active.
61    is_initializing: bool,
62}
63
64impl LogsArtifactsContainer {
65    pub fn new<'a>(
66        identity: Arc<ComponentIdentity>,
67        interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
68        initial_interest: Option<FidlSeverity>,
69        stats: Arc<LogStreamStats>,
70        buffer: ContainerBuffer,
71        on_inactive: Option<OnInactive>,
72    ) -> Self {
73        let mut interests = BTreeMap::new();
74        if let Some(severity) = initial_interest {
75            interests.insert(Interest::from(severity), 1);
76        }
77        let new = Self {
78            identity,
79            buffer,
80            state: Arc::new(Condition::new(ContainerState {
81                num_active_channels: 0,
82                interests,
83                is_initializing: true,
84            })),
85            stats,
86            on_inactive,
87        };
88
89        // there are no control handles so this won't notify anyone
90        new.update_interest(interest_selectors, &[]);
91
92        new
93    }
94
95    /// Handle `LogSink` protocol on `stream`. Each socket received from the `LogSink` client is
96    /// drained by a `Task` which is sent on `sender`. The `Task`s do not complete until their
97    /// sockets have been closed.
98    pub fn handle_log_sink(
99        self: &Arc<Self>,
100        stream: LogSinkRequestStream,
101        scope: fasync::ScopeHandle,
102    ) {
103        if stream
104            .control_handle()
105            .send_on_init(LogSinkOnInitRequest {
106                buffer: Some(self.buffer.iob()),
107                interest: Some(self.state.lock().min_interest()),
108                ..Default::default()
109            })
110            .is_err()
111        {
112            return;
113        }
114
115        {
116            let mut guard = self.state.lock();
117            guard.num_active_channels += 1;
118            guard.is_initializing = false;
119        }
120        scope.spawn(Arc::clone(self).actually_handle_log_sink(stream));
121    }
122
123    /// This function does not return until the channel is closed.
124    async fn actually_handle_log_sink(self: Arc<Self>, mut stream: LogSinkRequestStream) {
125        let mut previous_interest_sent = None;
126        debug!(identity:% = self.identity; "Draining LogSink channel.");
127
128        let mut hanging_gets = Vec::new();
129        let mut interest_changed = pin!(Fuse::terminated());
130
131        loop {
132            select! {
133                next = stream.next() => {
134                    let Some(next) = next else { break };
135                    match next {
136                        Ok(LogSinkRequest::ConnectStructured { socket, .. }) => {
137                            self.buffer.add_socket(socket);
138                        }
139                        Ok(LogSinkRequest::WaitForInterestChange { responder }) => {
140                            // If the interest has changed since we last reported it, we'll report
141                            // it below.
142                            hanging_gets.push(responder);
143                        }
144                        Err(e) => error!(identity:% = self.identity, e:%; "error handling log sink"),
145                        Ok(LogSinkRequest::_UnknownMethod { .. }) => {}
146                    }
147                }
148                _ = interest_changed => {}
149            }
150
151            if !hanging_gets.is_empty() {
152                let min_interest = self.state.lock().min_interest();
153                if Some(&min_interest) != previous_interest_sent.as_ref() {
154                    // Send updates to all outstanding hanging gets.
155                    for responder in hanging_gets.drain(..) {
156                        let _ = responder.send(Ok(&min_interest));
157                    }
158                    interest_changed.set(Fuse::terminated());
159                    previous_interest_sent = Some(min_interest);
160                } else if interest_changed.is_terminated() {
161                    // Set ourselves up to be woken when the interest changes.
162                    let previous_interest_sent = previous_interest_sent.clone();
163                    interest_changed.set(
164                        self.state
165                            .when(move |state| {
166                                if previous_interest_sent != Some(state.min_interest()) {
167                                    Poll::Ready(())
168                                } else {
169                                    Poll::Pending
170                                }
171                            })
172                            .fuse(),
173                    );
174                }
175            }
176        }
177
178        debug!(identity:% = self.identity; "LogSink channel closed.");
179        self.state.lock().num_active_channels -= 1;
180        self.check_inactive();
181    }
182
183    /// Updates log stats in inspect and push the message onto the container's buffer.
184    pub fn ingest_message(&self, message: StoredMessage) {
185        self.buffer.push_back(message.bytes());
186    }
187
188    /// Set the `Interest` for this component, notifying all active `LogSink/WaitForInterestChange`
189    /// hanging gets with the new interset if it is a change from the previous interest.
190    /// For any match that is also contained in `previous_selectors`, the previous values will be
191    /// removed from the set of interests.
192    pub fn update_interest<'a>(
193        &self,
194        interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
195        previous_selectors: &[LogInterestSelector],
196    ) {
197        let mut new_interest = FidlInterest::default();
198        let mut remove_interest = FidlInterest::default();
199        for selector in interest_selectors {
200            if self
201                .identity
202                .moniker
203                .matches_component_selector(&selector.selector)
204                .unwrap_or_default()
205            {
206                new_interest = selector.interest.clone();
207                // If there are more matches, ignore them, we'll pick the first match.
208                break;
209            }
210        }
211
212        if let Some(previous_selector) = previous_selectors.iter().find(|s| {
213            self.identity.moniker.matches_component_selector(&s.selector).unwrap_or_default()
214        }) {
215            remove_interest = previous_selector.interest.clone();
216        }
217
218        let mut state = self.state.lock();
219        // Unfortunately we cannot use a match statement since `FidlInterest` doesn't derive Eq.
220        // It does derive PartialEq though. All these branches will send an interest update if the
221        // minimum interest changes after performing the required actions.
222        if new_interest == FidlInterest::default() && remove_interest != FidlInterest::default() {
223            state.erase(&remove_interest);
224        } else if new_interest != FidlInterest::default()
225            && remove_interest == FidlInterest::default()
226        {
227            state.push_interest(new_interest);
228        } else if new_interest != FidlInterest::default()
229            && remove_interest != FidlInterest::default()
230        {
231            state.erase(&remove_interest);
232            state.push_interest(new_interest);
233        } else {
234            return;
235        }
236
237        for waker in state.drain_wakers() {
238            waker.wake();
239        }
240    }
241
242    /// Resets the `Interest` for this component, notifying all active
243    /// `LogSink/WaitForInterestChange` hanging gets with the lowest interest found in the set of
244    /// requested interests for all control handles.
245    pub fn reset_interest(&self, interest_selectors: &[LogInterestSelector]) {
246        for selector in interest_selectors {
247            if self
248                .identity
249                .moniker
250                .matches_component_selector(&selector.selector)
251                .unwrap_or_default()
252            {
253                let mut state = self.state.lock();
254                state.erase(&selector.interest);
255                for waker in state.drain_wakers() {
256                    waker.wake();
257                }
258                return;
259            }
260        }
261    }
262
263    /// Returns `true` if this container corresponds to a running component, or still has pending
264    /// objects to drain.
265    pub fn is_active(&self) -> bool {
266        let state = self.state.lock();
267        state.is_initializing || state.num_active_channels > 0 || self.buffer.is_active()
268    }
269
270    /// Called whenever there's a transition that means the component might no longer be active.
271    fn check_inactive(&self) {
272        if !self.is_active()
273            && let Some(on_inactive) = &self.on_inactive
274        {
275            on_inactive(self);
276        }
277    }
278
279    /// Stop accepting new messages, ensuring that pending Cursors return Poll::Ready(None) after
280    /// consuming any messages received before this call.
281    pub fn terminate(&self) {
282        self.buffer.terminate();
283    }
284
285    #[cfg(test)]
286    pub fn mark_stopped(&self) {
287        self.state.lock().is_initializing = false;
288        self.check_inactive();
289    }
290
291    pub fn buffer(&self) -> &ContainerBuffer {
292        &self.buffer
293    }
294}
295
296impl ContainerState {
297    /// Pushes the given `interest` to the set.
298    fn push_interest(&mut self, interest: FidlInterest) {
299        if interest != FidlInterest::default() {
300            let count = self.interests.entry(interest.into()).or_insert(0);
301            *count += 1;
302        }
303    }
304
305    /// Removes the given `interest` from the set
306    fn erase(&mut self, interest: &FidlInterest) {
307        let interest = interest.clone().into();
308        if let Some(count) = self.interests.get_mut(&interest) {
309            if *count <= 1 {
310                self.interests.remove(&interest);
311            } else {
312                *count -= 1;
313            }
314        }
315    }
316
317    /// Returns a copy of the lowest interest in the set. If the set is empty, an EMPTY interest is
318    /// returned.
319    fn min_interest(&self) -> FidlInterest {
320        // btreemap: keys are sorted and ascending.
321        self.interests.keys().next().map(|i| i.0.clone()).unwrap_or_default()
322    }
323}
324
325#[derive(Debug, PartialEq)]
326struct Interest(FidlInterest);
327
328impl From<FidlInterest> for Interest {
329    fn from(interest: FidlInterest) -> Interest {
330        Interest(interest)
331    }
332}
333
334impl From<FidlSeverity> for Interest {
335    fn from(severity: FidlSeverity) -> Interest {
336        Interest(FidlInterest { min_severity: Some(severity), ..Default::default() })
337    }
338}
339
340impl std::ops::Deref for Interest {
341    type Target = FidlInterest;
342    fn deref(&self) -> &Self::Target {
343        &self.0
344    }
345}
346
// `Eq` is a supertrait of `Ord`, which `Interest` needs in order to be a `BTreeMap` key.
347impl Eq for Interest {}
348
349impl Ord for Interest {
350    fn cmp(&self, other: &Self) -> Ordering {
351        self.min_severity.cmp(&other.min_severity)
352    }
353}
354
355impl PartialOrd for Interest {
356    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
357        Some(self.cmp(other))
358    }
359}
360
361#[cfg(test)]
362mod tests {
363    use super::*;
364    use crate::logs::shared_buffer::{SharedBuffer, create_ring_buffer};
365    use fidl_fuchsia_diagnostics::{ComponentSelector, StringSelector};
366    use fidl_fuchsia_diagnostics_types::Severity;
367    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
368    use fuchsia_async::{Task, TestExecutor};
369    use fuchsia_inspect as inspect;
370    use fuchsia_inspect_derive::WithInspect;
371    use moniker::ExtendedMoniker;
372
373    fn initialize_container(
374        severity: Option<Severity>,
375        scope: fasync::ScopeHandle,
376    ) -> (Arc<LogsArtifactsContainer>, LogSinkProxy) {
377        let identity = Arc::new(ComponentIdentity::new(
378            ExtendedMoniker::parse_str("/foo/bar").unwrap(),
379            "fuchsia-pkg://test",
380        ));
381        let stats = Arc::new(
382            LogStreamStats::default()
383                .with_inspect(inspect::component::inspector().root(), identity.moniker.as_ref())
384                .expect("failed to attach component log stats"),
385        );
386        let buffer = SharedBuffer::new(
387            create_ring_buffer(1024 * 1024),
388            Box::new(|_| {}),
389            Default::default(),
390        );
391        let container = Arc::new(LogsArtifactsContainer::new(
392            identity,
393            std::iter::empty(),
394            severity,
395            Arc::clone(&stats),
396            buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats),
397            None,
398        ));
399        // Connect out LogSink under test and take its events channel.
400        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
401        container.handle_log_sink(stream, scope);
402        (container, proxy)
403    }
404
405    #[fuchsia::test(allow_stalls = false)]
406    async fn update_interest() {
407        // Sync path test (initial interest)
408        let scope = fasync::Scope::new();
409        let (container, log_sink) = initialize_container(None, scope.to_handle());
410
411        // Get initial interest
412        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
413
414        // Async (blocking) path test.
415        assert_eq!(initial_interest.min_severity, None);
416        let log_sink_clone = log_sink.clone();
417        let mut interest_future =
418            Task::spawn(async move { log_sink_clone.wait_for_interest_change().await });
419
420        // The call should be blocked.
421        assert!(TestExecutor::poll_until_stalled(&mut interest_future).await.is_pending());
422
423        // We should see this interest update. This should unblock the hanging get.
424        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
425
426        // Verify we see the last interest we set.
427        assert_eq!(interest_future.await.unwrap().unwrap().min_severity, Some(Severity::Info));
428    }
429
430    #[fuchsia::test]
431    async fn initial_interest() {
432        let scope = fasync::Scope::new();
433        let (_container, log_sink) = initialize_container(Some(Severity::Info), scope.to_handle());
434        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
435        assert_eq!(initial_interest.min_severity, Some(Severity::Info));
436    }
437
438    #[fuchsia::test]
439    async fn interest_serverity_semantics() {
440        let scope = fasync::Scope::new();
441        let (container, log_sink) = initialize_container(None, scope.to_handle());
442        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
443        assert_eq!(initial_interest.min_severity, None);
444        // Set some interest.
445        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
446        assert_severity(&log_sink, Severity::Info).await;
447        assert_interests(&container, [(Severity::Info, 1)]);
448
449        // Sending a higher interest (WARN > INFO) has no visible effect, even if the new interest
450        // (WARN) will be tracked internally until reset.
451        container.update_interest([interest(&["foo", "bar"], Some(Severity::Warn))].iter(), &[]);
452        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);
453
454        // Sending a lower interest (DEBUG < INFO) updates the previous one.
455        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
456        assert_severity(&log_sink, Severity::Debug).await;
457        assert_interests(
458            &container,
459            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
460        );
461
462        // Sending the same interest leads to tracking it twice, but no updates are sent since it's
463        // the same minimum interest.
464        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
465        assert_interests(
466            &container,
467            [(Severity::Debug, 2), (Severity::Info, 1), (Severity::Warn, 1)],
468        );
469
470        // The first reset does nothing, since the new minimum interest remains the same (we had
471        // inserted twice, therefore we need to reset twice).
472        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
473        assert_interests(
474            &container,
475            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
476        );
477
478        // The second reset causes a change in minimum interest -> now INFO.
479        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
480        assert_severity(&log_sink, Severity::Info).await;
481        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);
482
483        // If we pass a previous severity (INFO), then we undo it and set the new one (ERROR).
484        // However, we get WARN since that's the minimum severity in the set.
485        container.update_interest(
486            [interest(&["foo", "bar"], Some(Severity::Error))].iter(),
487            &[interest(&["foo", "bar"], Some(Severity::Info))],
488        );
489        assert_severity(&log_sink, Severity::Warn).await;
490        assert_interests(&container, [(Severity::Error, 1), (Severity::Warn, 1)]);
491
492        // When we reset warn, now we get ERROR since that's the minimum severity in the set.
493        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Warn))]);
494        assert_severity(&log_sink, Severity::Error).await;
495        assert_interests(&container, [(Severity::Error, 1)]);
496
497        // When we reset ERROR , we get back to EMPTY since we have removed all interests from the
498        // set.
499        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Error))]);
500        assert_eq!(
501            log_sink.wait_for_interest_change().await.unwrap().unwrap(),
502            FidlInterest::default()
503        );
504
505        assert_interests(&container, []);
506    }
507
508    fn interest(moniker: &[&str], min_severity: Option<Severity>) -> LogInterestSelector {
509        LogInterestSelector {
510            selector: ComponentSelector {
511                moniker_segments: Some(
512                    moniker.iter().map(|s| StringSelector::ExactMatch(s.to_string())).collect(),
513                ),
514                ..Default::default()
515            },
516            interest: FidlInterest { min_severity, ..Default::default() },
517        }
518    }
519
520    async fn assert_severity(proxy: &LogSinkProxy, severity: Severity) {
521        assert_eq!(
522            proxy.wait_for_interest_change().await.unwrap().unwrap().min_severity.unwrap(),
523            severity
524        );
525    }
526
527    fn assert_interests<const N: usize>(
528        container: &LogsArtifactsContainer,
529        severities: [(Severity, usize); N],
530    ) {
531        let mut expected_map = BTreeMap::new();
532        expected_map.extend(IntoIterator::into_iter(severities).map(|(s, c)| {
533            let interest = FidlInterest { min_severity: Some(s), ..Default::default() };
534            (interest.into(), c)
535        }));
536        assert_eq!(expected_map, container.state.lock().interests);
537    }
538}