archivist_lib/logs/repository.rs

// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::events::router::EventConsumer;
use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
use crate::identity::ComponentIdentity;
use crate::logs::container::LogsArtifactsContainer;
use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
use crate::logs::shared_buffer::{FilterCursorStream, FxtMessage, SharedBuffer};
use crate::logs::stats::LogStreamStats;
use anyhow::format_err;
use diagnostics_data::{LogsData, Severity};
use fidl_fuchsia_diagnostics::{
    ComponentSelector, LogInterestSelector, StreamMode, StringSelector,
};
use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
use flyweights::FlyStr;
use fuchsia_inspect_derive::WithInspect;
use fuchsia_sync::Mutex;
use fuchsia_url::AbsoluteComponentUrl;
use fuchsia_url::boot_url::BootUrl;
use futures::prelude::*;
use log::{LevelFilter, debug, error};
use moniker::{ExtendedMoniker, Moniker};
use selectors::SelectorExt;
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use {fuchsia_async as fasync, fuchsia_inspect as inspect};

// LINT.IfChange
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentInitialInterest {
    /// The URL or moniker for the component which should receive the initial interest.
    component: UrlOrMoniker,
    /// The log severity the initial interest should specify.
    log_severity: Severity,
}
// LINT.ThenChange(/src/lib/assembly/config_schema/src/platform_config/diagnostics_config.rs)

impl FromStr for ComponentInitialInterest {
    type Err = anyhow::Error;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut split = s.rsplitn(2, ":");
        match (split.next(), split.next()) {
            (Some(severity), Some(url_or_moniker)) => {
                let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
                    return Err(format_err!("invalid url or moniker"));
                };
                let Ok(severity) = Severity::from_str(severity) else {
                    return Err(format_err!("invalid severity"));
                };
                Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
            }
            _ => Err(format_err!("invalid interest")),
        }
    }
}

#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum UrlOrMoniker {
    /// An absolute fuchsia url to a component.
    Url(FlyStr),
    /// The absolute moniker for a component.
    Moniker(ExtendedMoniker),
    /// A partial string to match against url or moniker.
    Partial(FlyStr),
}

impl FromStr for UrlOrMoniker {
    type Err = ();
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if AbsoluteComponentUrl::from_str(s).is_ok() || BootUrl::parse(s).is_ok() {
            Ok(UrlOrMoniker::Url(s.into()))
        } else if s.starts_with("/") {
            if let Ok(moniker) = Moniker::from_str(s) {
                Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
            } else {
                Err(())
            }
        } else {
            Ok(UrlOrMoniker::Partial(s.into()))
        }
    }
}

/// Static ID, used for persistent changes to interest settings.
pub const STATIC_CONNECTION_ID: usize = 0;
static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);

/// This is set when Archivist first starts. This might not be set for unit tests.
pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();

/// The IOBuffer writer tag used for Archivist logs. We rely on the first container we create having
/// this tag, so the Archivist container must be the first container created after creating the
/// shared buffer. This tag is used before we create the container because we want to set up logging
/// at the earliest possible moment after a binary starts.
pub const ARCHIVIST_TAG: u64 = 0;

/// LogsRepository holds all diagnostics data and is a singleton wrapped by multiple
/// [`pipeline::Pipeline`]s in a given Archivist instance.
pub struct LogsRepository {
    mutable_state: Mutex<LogsRepositoryState>,
    shared_buffer: Arc<SharedBuffer>,
    scope_handle: fasync::ScopeHandle,
}

impl LogsRepository {
    pub fn new(
        ring_buffer: ring_buffer::Reader,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        parent: &fuchsia_inspect::Node,
        scope: fasync::Scope,
    ) -> Arc<Self> {
        let scope_handle = scope.to_handle();
        Arc::new_cyclic(|me: &Weak<LogsRepository>| {
            let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
            let me_clone = Weak::clone(me);
            let shared_buffer = SharedBuffer::new(
                ring_buffer,
                Box::new(move |identity| {
                    if let Some(this) = me_clone.upgrade() {
                        this.on_container_inactive(&identity);
                    }
                }),
                Default::default(),
            );
            if let Some(m) = ARCHIVIST_MONIKER.get() {
                let archivist_container = mutable_state.create_log_container(
                    Arc::new(ComponentIdentity::new(
                        ExtendedMoniker::ComponentInstance(m.clone()),
                        "fuchsia-pkg://UNKNOWN",
                    )),
                    &shared_buffer,
                    Weak::clone(me),
                );
                // We rely on the first container we create ending up with the correct tag.
                assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
            }
            LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
        })
    }

    pub async fn flush(&self) {
        self.shared_buffer.flush().await;
    }

    /// Spawns a task that drains the kernel's debug log. Existing messages are ingested
    /// first, then new messages are streamed as they arrive.
    pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
    where
        K: DebugLog + Send + Sync + 'static,
    {
        let mut mutable_state = self.mutable_state.lock();

        // We can only have one klog reader; if this is already set, it means we are already
        // draining klog.
        if mutable_state.draining_klog {
            return;
        }
        mutable_state.draining_klog = true;

        let container =
            mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
        let Some(ref scope) = mutable_state.scope else {
            return;
        };
        scope.spawn(async move {
            debug!("Draining debuglog.");
            let mut kernel_logger = DebugLogBridge::create(klog_reader);
            let mut messages = match kernel_logger.existing_logs() {
                Ok(messages) => messages,
                Err(e) => {
                    error!(e:%; "failed to read from kernel log, important logs may be missing");
                    return;
                }
            };
            messages.sort_by_key(|m| m.timestamp());
            for message in messages {
                container.ingest_message(message);
            }

            let res = kernel_logger
                .listen()
                .try_for_each(|message| async {
                    container.ingest_message(message);
                    Ok(())
                })
                .await;
            if let Err(e) = res {
                error!(e:%; "failed to drain kernel log, important logs may be missing");
            }
        });
    }

    pub fn logs_cursor_raw(
        &self,
        mode: StreamMode,
        selectors: Vec<ComponentSelector>,
    ) -> FilterCursorStream<FxtMessage> {
        self.shared_buffer.cursor(mode, selectors).into()
    }

    /// Returns a log stream filtered to the specified selectors. If `selectors` is empty, all logs
    /// are returned.
    pub fn logs_cursor(
        &self,
        mode: StreamMode,
        selectors: Vec<ComponentSelector>,
    ) -> FilterCursorStream<LogsData> {
        self.shared_buffer.cursor(mode, selectors).into()
    }

    /// Returns a log container.
    ///
    /// NOTE: This function does nothing to stop the container from being removed, so this is
    /// currently only suitable for test code.
    #[cfg(test)]
    pub fn get_log_container(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
    ) -> Arc<LogsArtifactsContainer> {
        self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
    }

    /// Waits until `stop_accepting_new_log_sinks` is called and all log sink tasks have completed.
    /// After that, any pending Cursors will return Poll::Ready(None).
    pub async fn wait_for_termination(&self) {
        let Some(scope) = self.mutable_state.lock().scope.take() else {
            error!("Attempted to terminate twice");
            return;
        };
        scope.join().await;
        // Process messages from log sink.
        debug!("Log ingestion stopped.");
        // Terminate the shared buffer first so that pending messages are processed before we
        // terminate all the containers.
        self.shared_buffer.terminate().await;
        for container in self.mutable_state.lock().logs_data_store.values() {
            container.terminate();
        }
    }

    /// Closes the scope in which new logger draining tasks are spawned. No more logger tasks
    /// will be accepted once this is called, and we'll proceed to terminate logs.
    pub fn stop_accepting_new_log_sinks(&self) {
        self.scope_handle.close();
    }

    /// Returns an id to use for a new interest connection. Used by both LogSettings and Log so
    /// that connection ids remain unique across the two protocols.
    pub fn new_interest_connection(&self) -> usize {
        INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
    }

    /// Updates log selectors associated with an interest connection.
    pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.mutable_state.lock().update_logs_interest(connection_id, selectors);
    }

    /// Indicates that the connection associated with the given ID is now done.
    pub fn finish_interest_connection(&self, connection_id: usize) {
        self.mutable_state.lock().finish_interest_connection(connection_id);
    }

    fn on_container_inactive(&self, identity: &ComponentIdentity) {
        let mut repo = self.mutable_state.lock();
        if !repo.is_live(identity) {
            repo.remove(identity);
        }
    }
}

#[cfg(test)]
impl LogsRepository {
    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
        use crate::logs::shared_buffer::create_ring_buffer;

        LogsRepository::new(
            create_ring_buffer(crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize),
            std::iter::empty(),
            &Default::default(),
            scope,
        )
    }
}

impl EventConsumer for LogsRepository {
    fn handle(self: Arc<Self>, event: Event) {
        match event.payload {
            EventPayload::LogSinkRequested(LogSinkRequestedPayload {
                component,
                request_stream,
            }) => {
                debug!(identity:% = component; "LogSink requested.");
                // NOTE: It is important that we hold the lock whilst we call
                // `Container::handle_log_sink` because otherwise the container could be removed by
                // `on_container_inactive`.  After calling `handle_log_sink`, the container cannot
                // be removed until after the `LogSink` channel is closed.
                let mut mutable_state = self.mutable_state.lock();
                let container =
                    mutable_state.get_log_container(component, &self.shared_buffer, &self);
                container.handle_log_sink(request_stream, self.scope_handle.clone());
            }
            _ => unreachable!("Archivist state just subscribes to log sink requested"),
        }
    }
}

pub struct LogsRepositoryState {
    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
    inspect_node: inspect::Node,

    /// Interest registrations that we have received through fuchsia.logger.Log/ListWithSelectors
    /// or through fuchsia.logger.LogSettings/SetInterest.
    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,

    /// Whether or not we are draining the kernel log.
    draining_klog: bool,

    /// Scope where log ingestion tasks are running.
    scope: Option<fasync::Scope>,

    /// The initial log interests with which archivist was configured.
    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
}

impl LogsRepositoryState {
    fn new(
        parent: &fuchsia_inspect::Node,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        scope: fasync::Scope,
    ) -> Self {
        Self {
            inspect_node: parent.create_child("log_sources"),
            logs_data_store: HashMap::new(),
            interest_registrations: BTreeMap::new(),
            draining_klog: false,
            initial_interests: initial_interests
                .map(|ComponentInitialInterest { component, log_severity }| {
                    (component, log_severity)
                })
                .collect(),
            scope: Some(scope),
        }
    }

    /// Returns a container for logs artifacts, constructing one and adding it to the map if
    /// necessary.
    pub fn get_log_container(
        &mut self,
        identity: Arc<ComponentIdentity>,
        shared_buffer: &Arc<SharedBuffer>,
        repo: &Arc<LogsRepository>,
    ) -> Arc<LogsArtifactsContainer> {
        match self.logs_data_store.get(&identity) {
            None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
            Some(existing) => Arc::clone(existing),
        }
    }

    fn create_log_container(
        &mut self,
        identity: Arc<ComponentIdentity>,
        shared_buffer: &Arc<SharedBuffer>,
        repo: Weak<LogsRepository>,
    ) -> Arc<LogsArtifactsContainer> {
        let initial_interest = self.get_initial_interest(identity.as_ref());
        let stats = LogStreamStats::default()
            .with_inspect(&self.inspect_node, identity.moniker.as_ref())
            .expect("failed to attach component log stats");
        stats.set_url(&identity.url);
        let stats = Arc::new(stats);
        let container = Arc::new(LogsArtifactsContainer::new(
            Arc::clone(&identity),
            self.interest_registrations.values().flat_map(|s| s.iter()),
            initial_interest,
            Arc::clone(&stats),
            shared_buffer.new_container_buffer(Arc::clone(&identity), stats),
            Some(Box::new(move |c| {
                if let Some(repo) = repo.upgrade() {
                    repo.on_container_inactive(&c.identity)
                }
            })),
        ));
        self.logs_data_store.insert(identity, Arc::clone(&container));
        container
    }

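    /// Returns the initial minimum severity configured for `identity`, if any. Exact URL
    /// matches, exact moniker matches, and partial (substring) matches against either the URL
    /// or the moniker are all considered; when several apply, the lowest (most verbose)
    /// severity wins.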
    fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
        let exact_url_severity =
            self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
        let exact_moniker_severity =
            self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();

        let partial_severity = self
            .initial_interests
            .iter()
            .filter_map(|(uom, severity)| match uom {
                UrlOrMoniker::Partial(p) => {
                    if identity.url.contains(p.as_str())
                        || identity.moniker.to_string().contains(p.as_str())
                    {
                        Some(*severity)
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .min();

        [exact_url_severity, exact_moniker_severity, partial_severity]
            .into_iter()
            .flatten()
            .min()
            .map(FidlSeverity::from)
    }

    fn is_live(&self, identity: &ComponentIdentity) -> bool {
        match self.logs_data_store.get(identity) {
            Some(container) => container.is_active(),
            None => false,
        }
    }

    /// Updates our own log interest if we are the root Archivist and logging
    /// to klog.
    fn maybe_update_own_logs_interest(
        &mut self,
        selectors: &[LogInterestSelector],
        clear_interest: bool,
    ) {
        let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
        let lowest_selector = selectors
            .iter()
            .filter(|selector| {
                // If this is an embedded archivist, the wildcard selector "**" is used (in
                // tests at least) to change the interest level on all components. Since
                // archivist logs to itself, this creates a situation where archivist logs can
                // get included, which is undesirable in the vast majority of cases. To address
                // this, we prevent the global wildcard patterns "**" and "*" from matching
                // archivist. This is clearly a bit of a hack, but it is balanced by it being
                // the behavior that the vast majority of users will want. It is still possible
                // to change the interest level for archivist by using an exact match. Note that
                // this will apply to both the embedded and system archivist to keep things
                // consistent.
                if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
                    matches!(
                        &s[..],
                        [StringSelector::StringPattern(s)] if s == "**" || s == "*"
                    )
                }) {
                    return false;
                }

                moniker.matches_component_selector(&selector.selector).unwrap_or(false)
            })
            .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
        if let Some(selector) = lowest_selector {
            if clear_interest {
                log::set_max_level(LevelFilter::Info);
            } else {
                log::set_max_level(
                    match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
                        FidlSeverity::Trace => LevelFilter::Trace,
                        FidlSeverity::Debug => LevelFilter::Debug,
                        FidlSeverity::Info => LevelFilter::Info,
                        FidlSeverity::Warn => LevelFilter::Warn,
                        FidlSeverity::Error => LevelFilter::Error,
                        // Log has no "Fatal" level, so set it to Error
                        // instead.
                        FidlSeverity::Fatal => LevelFilter::Error,
                        FidlSeverity::__SourceBreaking { .. } => return,
                    },
                );
            }
        }
    }

    fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.maybe_update_own_logs_interest(&selectors, false);
        let previous_selectors =
            self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
        // unwrap safe, we just inserted.
        let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
        for logs_data in self.logs_data_store.values() {
            logs_data.update_interest(new_selectors.iter(), &previous_selectors);
        }
    }

    pub fn finish_interest_connection(&mut self, connection_id: usize) {
        let selectors = self.interest_registrations.remove(&connection_id);
        if let Some(selectors) = selectors {
            self.maybe_update_own_logs_interest(&selectors, true);
            for logs_data in self.logs_data_store.values() {
                logs_data.reset_interest(&selectors);
            }
        }
    }

    pub fn remove(&mut self, identity: &ComponentIdentity) {
        self.logs_data_store.remove(identity);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::shared_buffer::create_ring_buffer;
    use crate::logs::testing::make_message;
    use fidl_fuchsia_logger::LogSinkMarker;

    use moniker::ExtendedMoniker;
    use selectors::FastError;

    #[fuchsia::test]
    async fn data_repo_filters_logs_by_selectors() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bar").unwrap(),
            "fuchsia-pkg://bar",
        )));

        foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
        bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
        foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));

        let stream = repo.logs_cursor(StreamMode::Snapshot, Vec::new());

        let results =
            stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);

        let filtered_stream = repo.logs_cursor(
            StreamMode::Snapshot,
            vec![selectors::parse_component_selector::<FastError>("foo").unwrap()],
        );

        let results =
            filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
    }

    #[fuchsia::test]
    async fn data_repo_correctly_sets_initial_interests() {
        let repo = LogsRepository::new(
            create_ring_buffer(100000),
            [
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
                    log_severity: Severity::Info,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
                    log_severity: Severity::Warn,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()),
                    log_severity: Severity::Error,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()),
                    log_severity: Severity::Debug,
                },
            ]
            .into_iter(),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        // We have the moniker configured, use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        // We have the URL configured, use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/baz").unwrap(),
            "fuchsia-pkg://baz",
        )));
        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
            .await;

        // We have both a URL and a moniker in the config. Pick the minimum one, in this case Info
        // for the URL over Error for the moniker.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bar").unwrap(),
            "fuchsia-pkg://bar",
        )));
        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
            .await;

        // Neither the moniker nor the URL has an associated severity, so the minimum
        // severity isn't set.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/quux").unwrap(),
            "fuchsia-pkg://quux",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
    }

    #[fuchsia::test]
    async fn data_repo_correctly_handles_partial_matching() {
        let repo = LogsRepository::new(
            create_ring_buffer(100000),
            [
                "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO".parse(),
                "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN".parse(),
                "/core/bust:DEBUG".parse(),
                "core/bar:ERROR".parse(),
                "foo:DEBUG".parse(),
                "both:TRACE".parse(),
            ]
            .into_iter()
            .map(Result::unwrap),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        // We have a partial moniker configured, use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/foo").unwrap(),
            "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        // We have a partial url configured, use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/not-foo").unwrap(),
            "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        // We have the URL configured, use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/baz").unwrap(),
            "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
            .await;

        // We have both a URL and a moniker in the config. Pick the minimum one, in this case Info
        // for the URL over Error for the moniker.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bar").unwrap(),
            "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
            .await;

        // Neither the moniker nor the URL has an associated severity, so the minimum
        // severity isn't set.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/quux").unwrap(),
            "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;

        // We have a partial match for both moniker and url; this should still work.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/both").unwrap(),
            "fuchsia-pkg://fuchsia.com/both#meta/both.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Trace), container, repo.scope_handle.clone())
            .await;

        // Exact moniker match should not match sub-monikers.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bust/testing").unwrap(),
            "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
    }

    async fn expect_initial_interest(
        expected_severity: Option<FidlSeverity>,
        container: Arc<LogsArtifactsContainer>,
        scope: fasync::ScopeHandle,
    ) {
        let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(stream, scope);
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, expected_severity);
    }
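
    // A minimal illustrative sketch (not part of the original suite) of how the `FromStr`
    // implementations near the top of this file classify configuration strings: the rightmost
    // ":" separates the severity, absolute URLs become `UrlOrMoniker::Url`, strings starting
    // with "/" become `UrlOrMoniker::Moniker`, and anything else is treated as a partial match.
    #[fuchsia::test]
    fn url_or_moniker_parsing_sketch() {
        // A full component URL parses even though it contains ":" itself.
        let interest = "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm:INFO"
            .parse::<ComponentInitialInterest>()
            .unwrap();
        assert_eq!(interest.log_severity, Severity::Info);
        assert_eq!(
            interest.component,
            UrlOrMoniker::Url("fuchsia-pkg://fuchsia.com/foo#meta/foo.cm".into())
        );

        // A leading "/" parses as an absolute moniker; anything else is a partial match.
        assert_eq!(
            "/core/foo".parse::<UrlOrMoniker>().unwrap(),
            UrlOrMoniker::Moniker(ExtendedMoniker::parse_str("/core/foo").unwrap())
        );
        assert_eq!("foo".parse::<UrlOrMoniker>().unwrap(), UrlOrMoniker::Partial("foo".into()));

        // A string without a ":" separator is rejected.
        assert!("core/foo".parse::<ComponentInitialInterest>().is_err());
    }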
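
    // A minimal sketch (added for illustration) of the interest-connection lifecycle shared by
    // the fuchsia.logger.Log and fuchsia.logger.LogSettings servers: allocate an id, register
    // selectors for it, and release it once the connection is done.
    #[fuchsia::test]
    async fn interest_connection_lifecycle_sketch() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        // Ids are handed out after the reserved static id and increase monotonically.
        let first = repo.new_interest_connection();
        let second = repo.new_interest_connection();
        assert_ne!(first, STATIC_CONNECTION_ID);
        assert!(second > first);

        // An empty selector list is accepted; it simply registers no interest for the connection.
        repo.update_logs_interest(first, vec![]);
        repo.finish_interest_connection(first);
        repo.finish_interest_connection(second);
    }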
}