// archivist_lib/logs/repository.rs
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

5use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::LogsArtifactsContainer;
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::shared_buffer::{FilterCursorStream, FxtMessage, SharedBuffer};
11use crate::logs::stats::{GlobalAnalytics, LogStreamStats};
12use anyhow::format_err;
13use diagnostics_data::{LogsData, Severity};
14use fidl_fuchsia_diagnostics::{
15    ComponentSelector, LogInterestSelector, StreamMode, StringSelector,
16};
17use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
18use flyweights::FlyStr;
19use fuchsia_async as fasync;
20use fuchsia_inspect as inspect;
21use fuchsia_inspect_derive::WithInspect;
22use fuchsia_sync::Mutex;
23use futures::prelude::*;
24use log::{LevelFilter, debug, error};
25use moniker::{ExtendedMoniker, Moniker};
26use selectors::SelectorExt;
27use std::collections::{BTreeMap, HashMap};
28use std::str::FromStr;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::{Arc, OnceLock, Weak};
31
32// LINT.IfChange
33#[derive(Ord, PartialOrd, Eq, PartialEq)]
34pub struct ComponentInitialInterest {
35    /// The URL or moniker for the component which should receive the initial interest.
36    component: UrlOrMoniker,
37    /// The log severity the initial interest should specify.
38    log_severity: Severity,
39}
40// LINT.ThenChange(/src/lib/assembly/config_schema/src/platform_config/diagnostics_config.rs)
41
42impl FromStr for ComponentInitialInterest {
43    type Err = anyhow::Error;
44    fn from_str(s: &str) -> Result<Self, Self::Err> {
45        let mut split = s.rsplitn(2, ":");
46        match (split.next(), split.next()) {
47            (Some(severity), Some(url_or_moniker)) => {
48                let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
49                    return Err(format_err!("invalid url or moniker"));
50                };
51                let Ok(severity) = Severity::from_str(severity) else {
52                    return Err(format_err!("invalid severity"));
53                };
54                Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
55            }
56            _ => Err(format_err!("invalid interest")),
57        }
58    }
59}
60
61#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
62pub enum UrlOrMoniker {
63    /// An absolute fuchsia url to a component.
64    Url(FlyStr),
65    /// The absolute moniker for a component.
66    Moniker(ExtendedMoniker),
67    /// A partial string to match against url or moniker.
68    Partial(FlyStr),
69}
70
71impl FromStr for UrlOrMoniker {
72    type Err = ();
73    fn from_str(s: &str) -> Result<Self, Self::Err> {
74        if fuchsia_url::fuchsia_pkg::AbsoluteComponentUrl::from_str(s).is_ok()
75            || fuchsia_url::boot::AbsoluteComponentUrl::parse(s).is_ok()
76        {
77            Ok(UrlOrMoniker::Url(s.into()))
78        } else if s.starts_with("/") {
79            if let Ok(moniker) = Moniker::from_str(s) {
80                Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
81            } else {
82                Err(())
83            }
84        } else {
85            Ok(UrlOrMoniker::Partial(s.into()))
86        }
87    }
88}
89
90/// Static ID, used for persistent changes to interest settings.
91pub const STATIC_CONNECTION_ID: usize = 0;
92static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);
93
94/// This is set when Archivist first starts. This might not be set for unit tests.
95pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();
96
97/// The IOBuffer writer tag used for Archivist logs. We rely on the first container we create having
98/// this tag, so the Archivist container must be the first container created after creating the
99/// shared buffer. This tag is used before we create the container because we want to set up logging
100/// at the earliest possible moment after a binary starts.
101pub const ARCHIVIST_TAG: u64 = 0;
102
103/// LogsRepository holds all diagnostics data and is a singleton wrapped by multiple
104/// [`pipeline::Pipeline`]s in a given Archivist instance.
105pub struct LogsRepository {
106    mutable_state: Mutex<LogsRepositoryState>,
107    shared_buffer: Arc<SharedBuffer>,
108    scope_handle: fasync::ScopeHandle,
109}
110
111impl LogsRepository {
112    pub fn new(
113        ring_buffer: ring_buffer::Reader,
114        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
115        parent: &fuchsia_inspect::Node,
116        scope: fasync::Scope,
117    ) -> Arc<Self> {
118        let scope_handle = scope.to_handle();
119        Arc::new_cyclic(|me: &Weak<LogsRepository>| {
120            let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
121            let me_clone = Weak::clone(me);
122            let shared_buffer = SharedBuffer::new(
123                ring_buffer,
124                Box::new(move |identity| {
125                    if let Some(this) = me_clone.upgrade() {
126                        this.on_container_inactive(&identity);
127                    }
128                }),
129                Default::default(),
130                mutable_state.global_analytics.logs_node(),
131            );
132            if let Some(m) = ARCHIVIST_MONIKER.get() {
133                let archivist_container = mutable_state.create_log_container(
134                    Arc::new(ComponentIdentity::new(
135                        ExtendedMoniker::ComponentInstance(m.clone()),
136                        "fuchsia-pkg://UNKNOWN",
137                    )),
138                    &shared_buffer,
139                    Weak::clone(me),
140                );
141                // We rely on the first container we create ending up with the correct tag.
142                assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
143            }
144            LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
145        })
146    }
147
148    pub async fn flush(&self) {
149        self.shared_buffer.flush().await;
150    }
151
152    /// Drain the kernel's debug log. The returned future completes once
153    /// existing messages have been ingested.
154    pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
155    where
156        K: DebugLog + Send + Sync + 'static,
157    {
158        let mut mutable_state = self.mutable_state.lock();
159
160        // We can only have one klog reader, if this is already set, it means we are already
161        // draining klog.
162        if mutable_state.draining_klog {
163            return;
164        }
165        mutable_state.draining_klog = true;
166
167        let container =
168            mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
169        let Some(ref scope) = mutable_state.scope else {
170            return;
171        };
172        scope.spawn(async move {
173            debug!("Draining debuglog.");
174            let mut kernel_logger = DebugLogBridge::create(klog_reader);
175            let mut messages = match kernel_logger.existing_logs() {
176                Ok(messages) => messages,
177                Err(e) => {
178                    error!(e:%; "failed to read from kernel log, important logs may be missing");
179                    return;
180                }
181            };
182            messages.sort_by_key(|m| m.timestamp());
183            for message in messages {
184                container.ingest_message(message);
185            }
186
187            let res = kernel_logger
188                .listen()
189                .try_for_each(|message| async {
190                    container.ingest_message(message);
191                    Ok(())
192                })
193                .await;
194            if let Err(e) = res {
195                error!(e:%; "failed to drain kernel log, important logs may be missing");
196            }
197        });
198    }
199
200    pub fn logs_cursor_raw(
201        &self,
202        mode: StreamMode,
203        selectors: Vec<ComponentSelector>,
204    ) -> FilterCursorStream<FxtMessage> {
205        self.shared_buffer.cursor(mode, selectors).into()
206    }
207
208    /// Returns a log stream filtered to the specified selectors. If `selectors` is empty, all logs
209    /// are returned.
210    pub fn logs_cursor(
211        &self,
212        mode: StreamMode,
213        selectors: Vec<ComponentSelector>,
214    ) -> FilterCursorStream<LogsData> {
215        self.shared_buffer.cursor(mode, selectors).into()
216    }
217
218    /// Returns a log container.
219    ///
220    /// NOTE: This function does nothing to stop the container from being removed, so this is
221    /// currently only suitable for test code.
222    #[cfg(test)]
223    pub fn get_log_container(
224        self: &Arc<Self>,
225        identity: Arc<ComponentIdentity>,
226    ) -> Arc<LogsArtifactsContainer> {
227        self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
228    }
229
230    /// Waits until `stop_accepting_new_log_sinks` is called and all log sink tasks have completed.
231    /// After that, any pending Cursors will return Poll::Ready(None).
232    pub async fn wait_for_termination(&self) {
233        let Some(scope) = self.mutable_state.lock().scope.take() else {
234            error!("Attempted to terminate twice");
235            return;
236        };
237        scope.join().await;
238        // Process messages from log sink.
239        debug!("Log ingestion stopped.");
240        // Terminate the shared buffer first so that pending messages are processed before we
241        // terminate all the containers.
242        self.shared_buffer.terminate().await;
243        for container in self.mutable_state.lock().logs_data_store.values() {
244            container.terminate();
245        }
246    }
247
248    /// Closes the connection in which new logger draining tasks are sent. No more logger tasks
249    /// will be accepted when this is called and we'll proceed to terminate logs.
250    pub fn stop_accepting_new_log_sinks(&self) {
251        self.scope_handle.close();
252    }
253
254    /// Returns an id to use for a new interest connection. Used by both LogSettings and Log, to
255    /// ensure shared uniqueness of their connections.
256    pub fn new_interest_connection(&self) -> usize {
257        INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
258    }
259
260    /// Updates log selectors associated with an interest connection.
261    pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
262        self.mutable_state.lock().update_logs_interest(connection_id, selectors);
263    }
264
265    /// Indicates that the connection associated with the given ID is now done.
266    pub fn finish_interest_connection(&self, connection_id: usize) {
267        self.mutable_state.lock().finish_interest_connection(connection_id);
268    }
269
270    fn on_container_inactive(&self, identity: &ComponentIdentity) {
271        let mut repo = self.mutable_state.lock();
272        if !repo.is_live(identity) {
273            repo.remove(identity);
274        }
275    }
276}
277
278#[cfg(test)]
279impl LogsRepository {
280    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
281        use crate::logs::shared_buffer::create_ring_buffer;
282
283        LogsRepository::new(
284            create_ring_buffer(crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize),
285            std::iter::empty(),
286            &Default::default(),
287            scope,
288        )
289    }
290}
291
292impl EventConsumer for LogsRepository {
293    fn handle(self: Arc<Self>, event: Event) {
294        match event.payload {
295            EventPayload::LogSinkRequested(LogSinkRequestedPayload {
296                component,
297                request_stream,
298            }) => {
299                debug!(identity:% = component; "LogSink requested.");
300                // NOTE: It is important that we hold the lock whilst we call
301                // `Container::handle_log_sink` because otherwise the container could be removed by
302                // `on_container_inactive`.  After calling `handle_log_sink`, the container cannot
303                // be removed until after the `LogSink` channel is closed.
304                let mut mutable_state = self.mutable_state.lock();
305                let container =
306                    mutable_state.get_log_container(component, &self.shared_buffer, &self);
307                container.handle_log_sink(request_stream, self.scope_handle.clone());
308            }
309            _ => unreachable!("Archivist state just subscribes to log sink requested"),
310        }
311    }
312}
313
314pub struct LogsRepositoryState {
315    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
316    inspect_node: inspect::Node,
317    global_analytics: GlobalAnalytics,
318
319    /// Interest registrations that we have received through fuchsia.logger.Log/ListWithSelectors
320    /// or through fuchsia.logger.LogSettings/SetInterest.
321    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
322
323    /// Whether or not we are draining the kernel log.
324    draining_klog: bool,
325
326    /// Scope where log ingestion tasks are running.
327    scope: Option<fasync::Scope>,
328
329    /// The initial log interests with which archivist was configured.
330    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
331}
332
333impl LogsRepositoryState {
334    fn new(
335        parent: &fuchsia_inspect::Node,
336        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
337        scope: fasync::Scope,
338    ) -> Self {
339        Self {
340            inspect_node: parent.create_child("log_sources"),
341            logs_data_store: HashMap::new(),
342            interest_registrations: BTreeMap::new(),
343            draining_klog: false,
344            initial_interests: initial_interests
345                .map(|ComponentInitialInterest { component, log_severity }| {
346                    (component, log_severity)
347                })
348                .collect(),
349            scope: Some(scope),
350            global_analytics: GlobalAnalytics::new(parent),
351        }
352    }
353
354    /// Returns a container for logs artifacts, constructing one and adding it to the trie if
355    /// necessary.
356    pub fn get_log_container(
357        &mut self,
358        identity: Arc<ComponentIdentity>,
359        shared_buffer: &Arc<SharedBuffer>,
360        repo: &Arc<LogsRepository>,
361    ) -> Arc<LogsArtifactsContainer> {
362        match self.logs_data_store.get(&identity) {
363            None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
364            Some(existing) => Arc::clone(existing),
365        }
366    }
367
368    fn create_log_container(
369        &mut self,
370        identity: Arc<ComponentIdentity>,
371        shared_buffer: &Arc<SharedBuffer>,
372        repo: Weak<LogsRepository>,
373    ) -> Arc<LogsArtifactsContainer> {
374        let initial_interest = self.get_initial_interest(identity.as_ref());
375        let stats = LogStreamStats::default()
376            .with_inspect(&self.inspect_node, identity.moniker.as_ref())
377            .expect("failed to attach component log stats");
378        stats.set_url(&identity.url);
379        let stats = Arc::new(stats);
380        let buffer = shared_buffer.new_container_buffer(Arc::clone(&identity), Arc::clone(&stats));
381        let container = Arc::new(LogsArtifactsContainer::new(
382            Arc::clone(&identity),
383            self.interest_registrations.values().flat_map(|s| s.iter()),
384            initial_interest,
385            stats,
386            buffer,
387            Some(Box::new(move |c| {
388                if let Some(repo) = repo.upgrade() {
389                    repo.on_container_inactive(&c.identity)
390                }
391            })),
392        ));
393        self.logs_data_store.insert(Arc::clone(&identity), Arc::clone(&container));
394        container
395    }
396
397    fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
398        let exact_url_severity =
399            self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
400        let exact_moniker_severity =
401            self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();
402
403        let partial_severity = self
404            .initial_interests
405            .iter()
406            .filter_map(|(uom, severity)| match uom {
407                UrlOrMoniker::Partial(p) => {
408                    if identity.url.contains(p.as_str())
409                        || identity.moniker.to_string().contains(p.as_str())
410                    {
411                        Some(*severity)
412                    } else {
413                        None
414                    }
415                }
416                _ => None,
417            })
418            .min();
419
420        [exact_url_severity, exact_moniker_severity, partial_severity]
421            .into_iter()
422            .flatten()
423            .min()
424            .map(FidlSeverity::from)
425    }
426
427    fn is_live(&self, identity: &ComponentIdentity) -> bool {
428        match self.logs_data_store.get(identity) {
429            Some(container) => container.is_active(),
430            None => false,
431        }
432    }
433
434    /// Updates our own log interest if we are the root Archivist and logging
435    /// to klog.
436    fn maybe_update_own_logs_interest(
437        &mut self,
438        selectors: &[LogInterestSelector],
439        clear_interest: bool,
440    ) {
441        let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
442        let lowest_selector = selectors
443            .iter()
444            .filter(|selector| {
445                // If this is an embedded archivist, the wildcard selector "**" is used (in
446                // tests at least) to change the interest level on all components. Since
447                // archivist logs to itself, this creates a situation where archivist logs can
448                // get included which is undesirable in the vast majority of cases. To address
449                // this, we prevent the global wildcard pattern "**" and "*" from matching
450                // archivist. This is clearly a bit of a hack, but it is balanced by it being
451                // the behavior that the vast majority of users will want. It is still possible
452                // to change the interest level for archivist by using an exact match. Note that
453                // this will apply to both the embedded and system archivist to keep things
454                // consistent.
455                if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
456                    matches!(
457                        &s[..],
458                        [StringSelector::StringPattern(s)] if s == "**" || s == "*"
459                    )
460                }) {
461                    return false;
462                }
463
464                moniker.matches_component_selector(&selector.selector).unwrap_or(false)
465            })
466            .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
467        if let Some(selector) = lowest_selector {
468            if clear_interest {
469                log::set_max_level(LevelFilter::Info);
470            } else {
471                log::set_max_level(
472                    match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
473                        FidlSeverity::Trace => LevelFilter::Trace,
474                        FidlSeverity::Debug => LevelFilter::Debug,
475                        FidlSeverity::Info => LevelFilter::Info,
476                        FidlSeverity::Warn => LevelFilter::Warn,
477                        FidlSeverity::Error => LevelFilter::Error,
478                        // Log has no "Fatal" level, so set it to Error
479                        // instead.
480                        FidlSeverity::Fatal => LevelFilter::Error,
481                        FidlSeverity::__SourceBreaking { .. } => return,
482                    },
483                );
484            }
485        }
486    }
487
488    fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
489        self.maybe_update_own_logs_interest(&selectors, false);
490        let previous_selectors =
491            self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
492        // unwrap safe, we just inserted.
493        let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
494        for logs_data in self.logs_data_store.values() {
495            logs_data.update_interest(new_selectors.iter(), &previous_selectors);
496        }
497    }
498
499    pub fn finish_interest_connection(&mut self, connection_id: usize) {
500        let selectors = self.interest_registrations.remove(&connection_id);
501        if let Some(selectors) = selectors {
502            self.maybe_update_own_logs_interest(&selectors, true);
503            for logs_data in self.logs_data_store.values() {
504                logs_data.reset_interest(&selectors);
505            }
506        }
507    }
508
509    pub fn remove(&mut self, identity: &ComponentIdentity) {
510        self.logs_data_store.remove(identity);
511    }
512}
513
514#[cfg(test)]
515mod tests {
516    use super::*;
517    use crate::logs::shared_buffer::create_ring_buffer;
518    use crate::logs::testing::make_message;
519    use fidl_fuchsia_logger::LogSinkMarker;
520
521    use moniker::ExtendedMoniker;
522    use selectors::FastError;
523
524    #[fuchsia::test]
525    async fn data_repo_filters_logs_by_selectors() {
526        let repo = LogsRepository::for_test(fasync::Scope::new());
527        let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
528            ExtendedMoniker::parse_str("./foo").unwrap(),
529            "fuchsia-pkg://foo",
530        )));
531        let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
532            ExtendedMoniker::parse_str("./bar").unwrap(),
533            "fuchsia-pkg://bar",
534        )));
535
536        foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
537        bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
538        foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));
539
540        let stream = repo.logs_cursor(StreamMode::Snapshot, Vec::new());
541
542        let results =
543            stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
544        assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
545
546        let filtered_stream = repo.logs_cursor(
547            StreamMode::Snapshot,
548            vec![selectors::parse_component_selector::<FastError>("foo").unwrap()],
549        );
550
551        let results =
552            filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
553        assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
554    }
555
556    #[fuchsia::test]
557    async fn data_repo_correctly_sets_initial_interests() {
558        let repo = LogsRepository::new(
559            create_ring_buffer(100000),
560            [
561                ComponentInitialInterest {
562                    component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
563                    log_severity: Severity::Info,
564                },
565                ComponentInitialInterest {
566                    component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
567                    log_severity: Severity::Warn,
568                },
569                ComponentInitialInterest {
570                    component: UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()),
571                    log_severity: Severity::Error,
572                },
573                ComponentInitialInterest {
574                    component: UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()),
575                    log_severity: Severity::Debug,
576                },
577            ]
578            .into_iter(),
579            &fuchsia_inspect::Node::default(),
580            fasync::Scope::new(),
581        );
582
583        // We have the moniker configured, use the associated severity.
584        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
585            ExtendedMoniker::parse_str("core/foo").unwrap(),
586            "fuchsia-pkg://foo",
587        )));
588        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
589            .await;
590
591        // We have the URL configure, use the associated severity.
592        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
593            ExtendedMoniker::parse_str("core/baz").unwrap(),
594            "fuchsia-pkg://baz",
595        )));
596        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
597            .await;
598
599        // We have both a URL and a moniker in the config. Pick the minimium one, in this case Info
600        // for the URL over Error for the moniker.
601        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
602            ExtendedMoniker::parse_str("core/bar").unwrap(),
603            "fuchsia-pkg://bar",
604        )));
605        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
606            .await;
607
608        // Neither the moniker nor the URL have an associated severity, therefore, the minimum
609        // severity isn't set.
610        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
611            ExtendedMoniker::parse_str("core/quux").unwrap(),
612            "fuchsia-pkg://quux",
613        )));
614        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
615    }
616
617    #[fuchsia::test]
618    async fn data_repo_correctly_handles_partial_matching() {
619        let repo = LogsRepository::new(
620            create_ring_buffer(100000),
621            [
622                "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO".parse(),
623                "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN".parse(),
624                "/core/bust:DEBUG".parse(),
625                "core/bar:ERROR".parse(),
626                "foo:DEBUG".parse(),
627                "both:TRACE".parse(),
628            ]
629            .into_iter()
630            .map(Result::unwrap),
631            &fuchsia_inspect::Node::default(),
632            fasync::Scope::new(),
633        );
634
635        // We have a partial moniker configured, use the associated severity.
636        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
637            ExtendedMoniker::parse_str("core/foo").unwrap(),
638            "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
639        )));
640        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
641            .await;
642
643        // We have a partial url configured, use the associated severity.
644        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
645            ExtendedMoniker::parse_str("core/not-foo").unwrap(),
646            "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm",
647        )));
648        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
649            .await;
650
651        // We have the URL configure, use the associated severity.
652        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
653            ExtendedMoniker::parse_str("core/baz").unwrap(),
654            "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm",
655        )));
656        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
657            .await;
658
659        // We have both a URL and a moniker in the config. Pick the minimum one, in this case Info
660        // for the URL over Error for the moniker.
661        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
662            ExtendedMoniker::parse_str("core/bar").unwrap(),
663            "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm",
664        )));
665        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
666            .await;
667
668        // Neither the moniker nor the URL have an associated severity, therefore, the minimum
669        // severity isn't set.
670        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
671            ExtendedMoniker::parse_str("core/quux").unwrap(),
672            "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm",
673        )));
674        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
675
676        // We have a partial match for both moniker and url, should still work.
677        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
678            ExtendedMoniker::parse_str("core/both").unwrap(),
679            "fuchsia-pkg://fuchsia.com/both#meta/both.cm",
680        )));
681        expect_initial_interest(Some(FidlSeverity::Trace), container, repo.scope_handle.clone())
682            .await;
683
684        // Exact moniker match should not match sub-monikers.
685        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
686            ExtendedMoniker::parse_str("core/bust/testing").unwrap(),
687            "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm",
688        )));
689        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
690    }
691
692    async fn expect_initial_interest(
693        expected_severity: Option<FidlSeverity>,
694        container: Arc<LogsArtifactsContainer>,
695        scope: fasync::ScopeHandle,
696    ) {
697        let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
698        container.handle_log_sink(stream, scope);
699        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
700        assert_eq!(initial_interest.min_severity, expected_severity);
701    }
702}