Skip to main content

archivist_lib/logs/
repository.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::LogsArtifactsContainer;
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::shared_buffer::{FilterCursor, FilterCursorStream, SharedBuffer};
11use crate::logs::stats::{GlobalAnalytics, LogStreamStats};
12use anyhow::format_err;
13use diagnostics_data::{LogsData, Severity};
14use fidl_fuchsia_diagnostics::{
15    ComponentSelector, LogInterestSelector, StreamMode, StringSelector,
16};
17use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
18use flyweights::FlyStr;
19use fuchsia_async as fasync;
20use fuchsia_inspect as inspect;
21use fuchsia_sync::Mutex;
22use futures::prelude::*;
23use log::{LevelFilter, debug, error};
24use moniker::{ExtendedMoniker, Moniker};
25use selectors::SelectorExt;
26use std::collections::{BTreeMap, HashMap};
27use std::str::FromStr;
28use std::sync::atomic::{AtomicUsize, Ordering};
29use std::sync::{Arc, OnceLock, Weak};
30
31// LINT.IfChange
32#[derive(Ord, PartialOrd, Eq, PartialEq)]
33pub struct ComponentInitialInterest {
34    /// The URL or moniker for the component which should receive the initial interest.
35    component: UrlOrMoniker,
36    /// The log severity the initial interest should specify.
37    log_severity: Severity,
38}
39// LINT.ThenChange(/src/lib/assembly/config_schema/src/platform_config/diagnostics_config.rs)
40
41impl FromStr for ComponentInitialInterest {
42    type Err = anyhow::Error;
43    fn from_str(s: &str) -> Result<Self, Self::Err> {
44        let mut split = s.rsplitn(2, ":");
45        match (split.next(), split.next()) {
46            (Some(severity), Some(url_or_moniker)) => {
47                let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
48                    return Err(format_err!("invalid url or moniker"));
49                };
50                let Ok(severity) = Severity::from_str(severity) else {
51                    return Err(format_err!("invalid severity"));
52                };
53                Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
54            }
55            _ => Err(format_err!("invalid interest")),
56        }
57    }
58}
59
60#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
61pub enum UrlOrMoniker {
62    /// An absolute fuchsia url to a component.
63    Url(FlyStr),
64    /// The absolute moniker for a component.
65    Moniker(ExtendedMoniker),
66    /// A partial string to match against url or moniker.
67    Partial(FlyStr),
68}
69
70impl FromStr for UrlOrMoniker {
71    type Err = ();
72    fn from_str(s: &str) -> Result<Self, Self::Err> {
73        if fuchsia_url::fuchsia_pkg::AbsoluteComponentUrl::from_str(s).is_ok()
74            || fuchsia_url::boot::AbsoluteComponentUrl::parse(s).is_ok()
75        {
76            Ok(UrlOrMoniker::Url(s.into()))
77        } else if s.starts_with("/") {
78            if let Ok(moniker) = Moniker::from_str(s) {
79                Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
80            } else {
81                Err(())
82            }
83        } else {
84            Ok(UrlOrMoniker::Partial(s.into()))
85        }
86    }
87}
88
89/// Static ID, used for persistent changes to interest settings.
90pub const STATIC_CONNECTION_ID: usize = 0;
91static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);
92
93/// This is set when Archivist first starts. This might not be set for unit tests.
94pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();
95
96/// The IOBuffer writer tag used for Archivist logs. We rely on the first container we create having
97/// this tag, so the Archivist container must be the first container created after creating the
98/// shared buffer. This tag is used before we create the container because we want to set up logging
99/// at the earliest possible moment after a binary starts.
100pub const ARCHIVIST_TAG: u64 = 0;
101
102/// LogsRepository holds all diagnostics data and is a singleton wrapped by multiple
103/// [`pipeline::Pipeline`]s in a given Archivist instance.
104pub struct LogsRepository {
105    mutable_state: Mutex<LogsRepositoryState>,
106    shared_buffer: Arc<SharedBuffer>,
107    scope_handle: fasync::ScopeHandle,
108}
109
110impl LogsRepository {
111    pub fn new(
112        ring_buffer: ring_buffer::Reader,
113        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
114        parent: &fuchsia_inspect::Node,
115        scope: fasync::Scope,
116    ) -> Arc<Self> {
117        let scope_handle = scope.to_handle();
118        Arc::new_cyclic(|me: &Weak<LogsRepository>| {
119            let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
120            let me_clone = Weak::clone(me);
121            let shared_buffer = SharedBuffer::new(
122                ring_buffer,
123                Box::new(move |identity| {
124                    if let Some(this) = me_clone.upgrade() {
125                        this.on_container_inactive(&identity);
126                    }
127                }),
128                Default::default(),
129                mutable_state.global_analytics.logs_node(),
130            );
131            if let Some(m) = ARCHIVIST_MONIKER.get() {
132                let archivist_container = mutable_state.create_log_container(
133                    Arc::new(ComponentIdentity::new(
134                        ExtendedMoniker::ComponentInstance(m.clone()),
135                        "fuchsia-boot:///archivist#meta/archivist.cm",
136                    )),
137                    &shared_buffer,
138                    Weak::clone(me),
139                );
140                // We rely on the first container we create ending up with the correct tag.
141                assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
142            }
143            LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
144        })
145    }
146
147    pub async fn flush(&self) {
148        self.shared_buffer.flush().await;
149    }
150
151    /// Drain the kernel's debug log. The returned future completes once
152    /// existing messages have been ingested.
153    pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
154    where
155        K: DebugLog + Send + Sync + 'static,
156    {
157        let mut mutable_state = self.mutable_state.lock();
158
159        // We can only have one klog reader, if this is already set, it means we are already
160        // draining klog.
161        if mutable_state.draining_klog {
162            return;
163        }
164        mutable_state.draining_klog = true;
165
166        let container =
167            mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
168        let Some(ref scope) = mutable_state.scope else {
169            return;
170        };
171        scope.spawn(async move {
172            debug!("Draining debuglog.");
173            let mut kernel_logger = DebugLogBridge::create(klog_reader);
174            let mut messages = match kernel_logger.existing_logs() {
175                Ok(messages) => messages,
176                Err(e) => {
177                    error!(e:%; "failed to read from kernel log, important logs may be missing");
178                    return;
179                }
180            };
181            messages.sort_by_key(|m| m.timestamp());
182            for message in messages {
183                container.ingest_message(message);
184            }
185
186            let res = kernel_logger
187                .listen()
188                .try_for_each(|message| async {
189                    container.ingest_message(message);
190                    Ok(())
191                })
192                .await;
193            if let Err(e) = res {
194                error!(e:%; "failed to drain kernel log, important logs may be missing");
195            }
196        });
197    }
198
199    pub fn logs_cursor_raw(
200        &self,
201        mode: StreamMode,
202        selectors: Vec<ComponentSelector>,
203    ) -> FilterCursor {
204        self.shared_buffer.cursor(mode, selectors)
205    }
206
207    /// Returns a log stream filtered to the specified selectors. If `selectors` is empty, all logs
208    /// are returned.
209    pub fn logs_cursor(
210        &self,
211        mode: StreamMode,
212        selectors: Vec<ComponentSelector>,
213    ) -> FilterCursorStream<LogsData> {
214        self.shared_buffer.cursor(mode, selectors).into()
215    }
216
217    /// Returns a log container.
218    ///
219    /// NOTE: This function does nothing to stop the container from being removed, so this is
220    /// currently only suitable for test code.
221    #[cfg(test)]
222    pub fn get_log_container(
223        self: &Arc<Self>,
224        identity: Arc<ComponentIdentity>,
225    ) -> Arc<LogsArtifactsContainer> {
226        self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
227    }
228
229    /// Waits until `stop_accepting_new_log_sinks` is called and all log sink tasks have completed.
230    /// After that, any pending Cursors will return Poll::Ready(None).
231    pub async fn wait_for_termination(&self) {
232        let Some(scope) = self.mutable_state.lock().scope.take() else {
233            error!("Attempted to terminate twice");
234            return;
235        };
236        scope.join().await;
237        // Process messages from log sink.
238        debug!("Log ingestion stopped.");
239        // Terminate the shared buffer first so that pending messages are processed before we
240        // terminate all the containers.
241        self.shared_buffer.terminate().await;
242        for container in self.mutable_state.lock().logs_data_store.values() {
243            container.terminate();
244        }
245    }
246
247    /// Closes the connection in which new logger draining tasks are sent. No more logger tasks
248    /// will be accepted when this is called and we'll proceed to terminate logs.
249    pub fn stop_accepting_new_log_sinks(&self) {
250        self.scope_handle.close();
251    }
252
253    /// Returns an id to use for a new interest connection. Used by both LogSettings and Log, to
254    /// ensure shared uniqueness of their connections.
255    pub fn new_interest_connection(&self) -> usize {
256        INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
257    }
258
259    /// Updates log selectors associated with an interest connection.
260    pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
261        self.mutable_state.lock().update_logs_interest(connection_id, selectors);
262    }
263
264    /// Indicates that the connection associated with the given ID is now done.
265    pub fn finish_interest_connection(&self, connection_id: usize) {
266        self.mutable_state.lock().finish_interest_connection(connection_id);
267    }
268
269    fn on_container_inactive(&self, identity: &ComponentIdentity) {
270        let mut repo = self.mutable_state.lock();
271        if !repo.is_live(identity) {
272            repo.remove(identity);
273        }
274    }
275}
276
277#[cfg(test)]
278impl LogsRepository {
279    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
280        use crate::logs::shared_buffer::create_ring_buffer;
281
282        LogsRepository::new(
283            create_ring_buffer(crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize),
284            std::iter::empty(),
285            &Default::default(),
286            scope,
287        )
288    }
289}
290
291impl EventConsumer for LogsRepository {
292    fn handle(self: Arc<Self>, event: Event) {
293        match event.payload {
294            EventPayload::LogSinkRequested(LogSinkRequestedPayload {
295                component,
296                request_stream,
297            }) => {
298                debug!(identity:% = component; "LogSink requested.");
299                // NOTE: It is important that we hold the lock whilst we call
300                // `Container::handle_log_sink` because otherwise the container could be removed by
301                // `on_container_inactive`.  After calling `handle_log_sink`, the container cannot
302                // be removed until after the `LogSink` channel is closed.
303                let mut mutable_state = self.mutable_state.lock();
304                let container =
305                    mutable_state.get_log_container(component, &self.shared_buffer, &self);
306                container.handle_log_sink(request_stream, self.scope_handle.clone());
307            }
308            _ => unreachable!("Archivist state just subscribes to log sink requested"),
309        }
310    }
311}
312
313pub struct LogsRepositoryState {
314    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
315    inspect_node: inspect::Node,
316    global_analytics: GlobalAnalytics,
317
318    /// Interest registrations that we have received through fuchsia.logger.Log/ListWithSelectors
319    /// or through fuchsia.logger.LogSettings/SetInterest.
320    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
321
322    /// Whether or not we are draining the kernel log.
323    draining_klog: bool,
324
325    /// Scope where log ingestion tasks are running.
326    scope: Option<fasync::Scope>,
327
328    /// The initial log interests with which archivist was configured.
329    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
330}
331
332impl LogsRepositoryState {
333    fn new(
334        parent: &fuchsia_inspect::Node,
335        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
336        scope: fasync::Scope,
337    ) -> Self {
338        Self {
339            inspect_node: parent.create_child("log_sources"),
340            logs_data_store: HashMap::new(),
341            interest_registrations: BTreeMap::new(),
342            draining_klog: false,
343            initial_interests: initial_interests
344                .map(|ComponentInitialInterest { component, log_severity }| {
345                    (component, log_severity)
346                })
347                .collect(),
348            scope: Some(scope),
349            global_analytics: GlobalAnalytics::new(parent),
350        }
351    }
352
353    /// Returns a container for logs artifacts, constructing one and adding it to the trie if
354    /// necessary.
355    pub fn get_log_container(
356        &mut self,
357        identity: Arc<ComponentIdentity>,
358        shared_buffer: &Arc<SharedBuffer>,
359        repo: &Arc<LogsRepository>,
360    ) -> Arc<LogsArtifactsContainer> {
361        match self.logs_data_store.get(&identity) {
362            None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
363            Some(existing) => Arc::clone(existing),
364        }
365    }
366
367    fn create_log_container(
368        &mut self,
369        identity: Arc<ComponentIdentity>,
370        shared_buffer: &Arc<SharedBuffer>,
371        repo: Weak<LogsRepository>,
372    ) -> Arc<LogsArtifactsContainer> {
373        let initial_interest = self.get_initial_interest(identity.as_ref());
374        let stats = Arc::new(LogStreamStats::new(&self.inspect_node, &identity));
375        let buffer = shared_buffer.new_container_buffer(Arc::clone(&identity), Arc::clone(&stats));
376        let container = Arc::new(LogsArtifactsContainer::new(
377            Arc::clone(&identity),
378            self.interest_registrations.values().flat_map(|s| s.iter()),
379            initial_interest,
380            stats,
381            buffer,
382            Some(Box::new(move |c| {
383                if let Some(repo) = repo.upgrade() {
384                    repo.on_container_inactive(&c.identity)
385                }
386            })),
387        ));
388        self.logs_data_store.insert(Arc::clone(&identity), Arc::clone(&container));
389        container
390    }
391
392    fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
393        let exact_url_severity =
394            self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
395        let exact_moniker_severity =
396            self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();
397
398        let partial_severity = self
399            .initial_interests
400            .iter()
401            .filter_map(|(uom, severity)| match uom {
402                UrlOrMoniker::Partial(p) => {
403                    if identity.url.contains(p.as_str())
404                        || identity.moniker.to_string().contains(p.as_str())
405                    {
406                        Some(*severity)
407                    } else {
408                        None
409                    }
410                }
411                _ => None,
412            })
413            .min();
414
415        [exact_url_severity, exact_moniker_severity, partial_severity]
416            .into_iter()
417            .flatten()
418            .min()
419            .map(FidlSeverity::from)
420    }
421
422    fn is_live(&self, identity: &ComponentIdentity) -> bool {
423        match self.logs_data_store.get(identity) {
424            Some(container) => container.is_active(),
425            None => false,
426        }
427    }
428
429    /// Updates our own log interest if we are the root Archivist and logging
430    /// to klog.
431    fn maybe_update_own_logs_interest(
432        &mut self,
433        selectors: &[LogInterestSelector],
434        clear_interest: bool,
435    ) {
436        let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
437        let lowest_selector = selectors
438            .iter()
439            .filter(|selector| {
440                // If this is an embedded archivist, the wildcard selector "**" is used (in
441                // tests at least) to change the interest level on all components. Since
442                // archivist logs to itself, this creates a situation where archivist logs can
443                // get included which is undesirable in the vast majority of cases. To address
444                // this, we prevent the global wildcard pattern "**" and "*" from matching
445                // archivist. This is clearly a bit of a hack, but it is balanced by it being
446                // the behavior that the vast majority of users will want. It is still possible
447                // to change the interest level for archivist by using an exact match. Note that
448                // this will apply to both the embedded and system archivist to keep things
449                // consistent.
450                if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
451                    matches!(
452                        &s[..],
453                        [StringSelector::StringPattern(s)] if s == "**" || s == "*"
454                    )
455                }) {
456                    return false;
457                }
458
459                moniker.matches_component_selector(&selector.selector).unwrap_or(false)
460            })
461            .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
462        if let Some(selector) = lowest_selector {
463            if clear_interest {
464                log::set_max_level(LevelFilter::Info);
465            } else {
466                log::set_max_level(
467                    match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
468                        FidlSeverity::Trace => LevelFilter::Trace,
469                        FidlSeverity::Debug => LevelFilter::Debug,
470                        FidlSeverity::Info => LevelFilter::Info,
471                        FidlSeverity::Warn => LevelFilter::Warn,
472                        FidlSeverity::Error => LevelFilter::Error,
473                        // Log has no "Fatal" level, so set it to Error
474                        // instead.
475                        FidlSeverity::Fatal => LevelFilter::Error,
476                        FidlSeverity::__SourceBreaking { .. } => return,
477                    },
478                );
479            }
480        }
481    }
482
483    fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
484        self.maybe_update_own_logs_interest(&selectors, false);
485        let previous_selectors =
486            self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
487        // unwrap safe, we just inserted.
488        let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
489        for logs_data in self.logs_data_store.values() {
490            logs_data.update_interest(new_selectors.iter(), &previous_selectors);
491        }
492    }
493
494    pub fn finish_interest_connection(&mut self, connection_id: usize) {
495        let selectors = self.interest_registrations.remove(&connection_id);
496        if let Some(selectors) = selectors {
497            self.maybe_update_own_logs_interest(&selectors, true);
498            for logs_data in self.logs_data_store.values() {
499                logs_data.reset_interest(&selectors);
500            }
501        }
502    }
503
504    pub fn remove(&mut self, identity: &ComponentIdentity) {
505        self.logs_data_store.remove(identity);
506    }
507}
508
509#[cfg(test)]
510mod tests {
511    use super::*;
512    use crate::logs::shared_buffer::create_ring_buffer;
513    use crate::logs::testing::make_message;
514    use fidl_fuchsia_diagnostics::StreamMode;
515    use fidl_fuchsia_logger::LogSinkMarker;
516    use fuchsia_inspect::Inspector;
517    use moniker::ExtendedMoniker;
518    use ring_buffer::MAX_MESSAGE_SIZE;
519    use selectors::{FastError, SelectorExt};
520    use std::time::Duration;
521
522    #[fuchsia::test]
523    async fn data_repo_filters_logs_by_selectors() {
524        let repo = LogsRepository::for_test(fasync::Scope::new());
525        let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
526            ExtendedMoniker::parse_str("./foo").unwrap(),
527            "fuchsia-pkg://foo",
528        )));
529        let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
530            ExtendedMoniker::parse_str("./bar").unwrap(),
531            "fuchsia-pkg://bar",
532        )));
533
534        foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
535        bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
536        foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));
537
538        let stream = repo.logs_cursor(StreamMode::Snapshot, Vec::new());
539
540        let results =
541            stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
542        assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
543
544        let filtered_stream = repo.logs_cursor(
545            StreamMode::Snapshot,
546            vec![selectors::parse_component_selector::<FastError>("foo").unwrap()],
547        );
548
549        let results =
550            filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
551        assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
552    }
553
554    #[fuchsia::test]
555    async fn data_repo_correctly_sets_initial_interests() {
556        let repo = LogsRepository::new(
557            create_ring_buffer(100000),
558            [
559                ComponentInitialInterest {
560                    component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
561                    log_severity: Severity::Info,
562                },
563                ComponentInitialInterest {
564                    component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
565                    log_severity: Severity::Warn,
566                },
567                ComponentInitialInterest {
568                    component: UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()),
569                    log_severity: Severity::Error,
570                },
571                ComponentInitialInterest {
572                    component: UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()),
573                    log_severity: Severity::Debug,
574                },
575            ]
576            .into_iter(),
577            &fuchsia_inspect::Node::default(),
578            fasync::Scope::new(),
579        );
580
581        // We have the moniker configured, use the associated severity.
582        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
583            ExtendedMoniker::parse_str("core/foo").unwrap(),
584            "fuchsia-pkg://foo",
585        )));
586        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
587            .await;
588
589        // We have the URL configure, use the associated severity.
590        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
591            ExtendedMoniker::parse_str("core/baz").unwrap(),
592            "fuchsia-pkg://baz",
593        )));
594        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
595            .await;
596
597        // We have both a URL and a moniker in the config. Pick the minimium one, in this case Info
598        // for the URL over Error for the moniker.
599        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
600            ExtendedMoniker::parse_str("core/bar").unwrap(),
601            "fuchsia-pkg://bar",
602        )));
603        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
604            .await;
605
606        // Neither the moniker nor the URL have an associated severity, therefore, the minimum
607        // severity isn't set.
608        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
609            ExtendedMoniker::parse_str("core/quux").unwrap(),
610            "fuchsia-pkg://quux",
611        )));
612        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
613    }
614
615    #[fuchsia::test]
616    async fn data_repo_correctly_handles_partial_matching() {
617        let repo = LogsRepository::new(
618            create_ring_buffer(100000),
619            [
620                "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO".parse(),
621                "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN".parse(),
622                "/core/bust:DEBUG".parse(),
623                "core/bar:ERROR".parse(),
624                "foo:DEBUG".parse(),
625                "both:TRACE".parse(),
626            ]
627            .into_iter()
628            .map(Result::unwrap),
629            &fuchsia_inspect::Node::default(),
630            fasync::Scope::new(),
631        );
632
633        // We have a partial moniker configured, use the associated severity.
634        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
635            ExtendedMoniker::parse_str("core/foo").unwrap(),
636            "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
637        )));
638        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
639            .await;
640
641        // We have a partial url configured, use the associated severity.
642        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
643            ExtendedMoniker::parse_str("core/not-foo").unwrap(),
644            "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm",
645        )));
646        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
647            .await;
648
649        // We have the URL configure, use the associated severity.
650        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
651            ExtendedMoniker::parse_str("core/baz").unwrap(),
652            "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm",
653        )));
654        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
655            .await;
656
657        // We have both a URL and a moniker in the config. Pick the minimum one, in this case Info
658        // for the URL over Error for the moniker.
659        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
660            ExtendedMoniker::parse_str("core/bar").unwrap(),
661            "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm",
662        )));
663        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
664            .await;
665
666        // Neither the moniker nor the URL have an associated severity, therefore, the minimum
667        // severity isn't set.
668        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
669            ExtendedMoniker::parse_str("core/quux").unwrap(),
670            "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm",
671        )));
672        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
673
674        // We have a partial match for both moniker and url, should still work.
675        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
676            ExtendedMoniker::parse_str("core/both").unwrap(),
677            "fuchsia-pkg://fuchsia.com/both#meta/both.cm",
678        )));
679        expect_initial_interest(Some(FidlSeverity::Trace), container, repo.scope_handle.clone())
680            .await;
681
682        // Exact moniker match should not match sub-monikers.
683        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
684            ExtendedMoniker::parse_str("core/bust/testing").unwrap(),
685            "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm",
686        )));
687        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
688    }
689
690    async fn expect_initial_interest(
691        expected_severity: Option<FidlSeverity>,
692        container: Arc<LogsArtifactsContainer>,
693        scope: fasync::ScopeHandle,
694    ) {
695        let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
696        container.handle_log_sink(stream, scope);
697        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
698        assert_eq!(initial_interest.min_severity, expected_severity);
699    }
700
701    #[fuchsia::test]
702    async fn inspect_node_cleaned_up_on_roll_out() {
703        let inspector = Inspector::default();
704        let repo = LogsRepository::new(
705            create_ring_buffer(MAX_MESSAGE_SIZE),
706            std::iter::empty(),
707            inspector.root(),
708            fasync::Scope::new(),
709        );
710
711        let identity_foo = Arc::new(ComponentIdentity::new(
712            ExtendedMoniker::parse_str("./foo").unwrap(),
713            "fuchsia-pkg://foo",
714        ));
715
716        // Create container A
717        let container_foo = repo.get_log_container(Arc::clone(&identity_foo));
718        container_foo.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
719
720        // Force SharedBuffer to scan messages and update msg_ids.end.
721        // Without this, ContainerInfo::is_active() evaluates to false immediately after
722        // mark_stopped(), causing repo.is_live() to return false before rollout happens.
723        let _cursor = repo.logs_cursor(
724            StreamMode::Subscribe,
725            vec![identity_foo.moniker.clone().into_component_selector()],
726        );
727
728        container_foo.mark_stopped();
729        drop(container_foo);
730
731        // Verify stats exist
732        let hierarchy = fuchsia_inspect::reader::read(&inspector).await.unwrap();
733        assert!(
734            hierarchy.get_child_by_path(&["log_sources", "foo"]).is_some(),
735            "foo stats must exist initially"
736        );
737
738        // Ingest messages for another component until foo is rolled out
739        let container_bar = repo.get_log_container(Arc::new(ComponentIdentity::new(
740            ExtendedMoniker::parse_str("./bar").unwrap(),
741            "fuchsia-pkg://bar",
742        )));
743
744        let large_str = "b".repeat(1000);
745        for i in 2..1000 {
746            container_bar.ingest_message(make_message(
747                &large_str,
748                None,
749                zx::BootInstant::from_nanos(i),
750            ));
751            fasync::Timer::new(Duration::from_millis(10)).await;
752            if !repo.mutable_state.lock().is_live(&identity_foo) {
753                break;
754            }
755        }
756
757        assert!(
758            !repo.mutable_state.lock().is_live(&identity_foo),
759            "foo container must be inactive after rollout"
760        );
761
762        // Verify stats are cleaned up
763        let hierarchy = fuchsia_inspect::reader::read(&inspector).await.unwrap();
764        assert!(
765            hierarchy.get_child_by_path(&["log_sources", "foo"]).is_none(),
766            "foo stats must be cleaned up after rollout"
767        );
768    }
769}