Skip to main content

archivist_lib/logs/
repository.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::LogsArtifactsContainer;
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::shared_buffer::{FilterCursor, FilterCursorStream, SharedBuffer};
11use crate::logs::stats::{GlobalAnalytics, LogStreamStats};
12use anyhow::format_err;
13use diagnostics_data::{LogsData, Severity};
14use diagnostics_log_encoding::ARCHIVIST_URL;
15use fidl_fuchsia_diagnostics::{
16    ComponentSelector, LogInterestSelector, StreamMode, StringSelector,
17};
18use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
19use flyweights::FlyStr;
20use fuchsia_async as fasync;
21use fuchsia_inspect as inspect;
22use fuchsia_sync::Mutex;
23use futures::prelude::*;
24use log::{LevelFilter, debug, error};
25use moniker::{ExtendedMoniker, Moniker};
26use selectors::SelectorExt;
27use std::collections::{BTreeMap, HashMap};
28use std::str::FromStr;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::{Arc, OnceLock, Weak};
31
32// LINT.IfChange
33#[derive(Ord, PartialOrd, Eq, PartialEq)]
34pub struct ComponentInitialInterest {
35    /// The URL or moniker for the component which should receive the initial interest.
36    component: UrlOrMoniker,
37    /// The log severity the initial interest should specify.
38    log_severity: Severity,
39}
40// LINT.ThenChange(/src/lib/assembly/config_schema/src/platform_config/diagnostics_config.rs)
41
42impl FromStr for ComponentInitialInterest {
43    type Err = anyhow::Error;
44    fn from_str(s: &str) -> Result<Self, Self::Err> {
45        let mut split = s.rsplitn(2, ":");
46        match (split.next(), split.next()) {
47            (Some(severity), Some(url_or_moniker)) => {
48                let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
49                    return Err(format_err!("invalid url or moniker"));
50                };
51                let Ok(severity) = Severity::from_str(severity) else {
52                    return Err(format_err!("invalid severity"));
53                };
54                Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
55            }
56            _ => Err(format_err!("invalid interest")),
57        }
58    }
59}
60
61#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
62pub enum UrlOrMoniker {
63    /// An absolute fuchsia url to a component.
64    Url(FlyStr),
65    /// The absolute moniker for a component.
66    Moniker(ExtendedMoniker),
67    /// A partial string to match against url or moniker.
68    Partial(FlyStr),
69}
70
71impl FromStr for UrlOrMoniker {
72    type Err = ();
73    fn from_str(s: &str) -> Result<Self, Self::Err> {
74        if fuchsia_url::fuchsia_pkg::AbsoluteComponentUrl::from_str(s).is_ok()
75            || fuchsia_url::boot::AbsoluteComponentUrl::parse(s).is_ok()
76        {
77            Ok(UrlOrMoniker::Url(s.into()))
78        } else if s.starts_with("/") {
79            if let Ok(moniker) = Moniker::from_str(s) {
80                Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
81            } else {
82                Err(())
83            }
84        } else {
85            Ok(UrlOrMoniker::Partial(s.into()))
86        }
87    }
88}
89
90/// Static ID, used for persistent changes to interest settings.
91pub const STATIC_CONNECTION_ID: usize = 0;
92static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);
93
94/// This is set when Archivist first starts. This might not be set for unit tests.
95pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();
96
97/// The IOBuffer writer tag used for Archivist logs. We rely on the first container we create having
98/// this tag, so the Archivist container must be the first container created after creating the
99/// shared buffer. This tag is used before we create the container because we want to set up logging
100/// at the earliest possible moment after a binary starts.
101pub const ARCHIVIST_TAG: u64 = 0;
102
103/// LogsRepository holds all diagnostics data and is a singleton wrapped by multiple
104/// [`pipeline::Pipeline`]s in a given Archivist instance.
105pub struct LogsRepository {
106    mutable_state: Mutex<LogsRepositoryState>,
107    shared_buffer: Arc<SharedBuffer>,
108    scope_handle: fasync::ScopeHandle,
109}
110
111impl LogsRepository {
112    pub fn new(
113        ring_buffer: ring_buffer::Reader,
114        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
115        parent: &fuchsia_inspect::Node,
116        scope: fasync::Scope,
117    ) -> Arc<Self> {
118        let scope_handle = scope.to_handle();
119        Arc::new_cyclic(|me: &Weak<LogsRepository>| {
120            let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
121            let me_clone = Weak::clone(me);
122            let shared_buffer = SharedBuffer::new(
123                ring_buffer,
124                Box::new(move |identity| {
125                    if let Some(this) = me_clone.upgrade() {
126                        this.on_container_inactive(&identity);
127                    }
128                }),
129                Default::default(),
130                mutable_state.global_analytics.logs_node(),
131            );
132            if let Some(m) = ARCHIVIST_MONIKER.get() {
133                let archivist_container = mutable_state.create_log_container(
134                    Arc::new(ComponentIdentity::new(
135                        ExtendedMoniker::ComponentInstance(m.clone()),
136                        ARCHIVIST_URL,
137                    )),
138                    &shared_buffer,
139                    Weak::clone(me),
140                );
141                // We rely on the first container we create ending up with the correct tag.
142                assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
143            }
144            LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
145        })
146    }
147
148    pub async fn flush(&self) {
149        self.shared_buffer.flush().await;
150    }
151
152    /// Drain the kernel's debug log. The returned future completes once
153    /// existing messages have been ingested.
154    pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
155    where
156        K: DebugLog + Send + Sync + 'static,
157    {
158        let mut mutable_state = self.mutable_state.lock();
159
160        // We can only have one klog reader, if this is already set, it means we are already
161        // draining klog.
162        if mutable_state.draining_klog {
163            return;
164        }
165        mutable_state.draining_klog = true;
166
167        let container =
168            mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
169        let Some(ref scope) = mutable_state.scope else {
170            return;
171        };
172        scope.spawn(async move {
173            debug!("Draining debuglog.");
174            let mut kernel_logger = DebugLogBridge::create(klog_reader);
175            let mut messages = match kernel_logger.existing_logs() {
176                Ok(messages) => messages,
177                Err(e) => {
178                    error!(e:%; "failed to read from kernel log, important logs may be missing");
179                    return;
180                }
181            };
182            messages.sort_by_key(|m| m.timestamp());
183            for message in messages {
184                container.ingest_message(message);
185            }
186
187            let res = kernel_logger
188                .listen()
189                .try_for_each(|message| async {
190                    container.ingest_message(message);
191                    Ok(())
192                })
193                .await;
194            if let Err(e) = res {
195                error!(e:%; "failed to drain kernel log, important logs may be missing");
196            }
197        });
198    }
199
200    pub fn logs_cursor_raw(
201        &self,
202        mode: StreamMode,
203        selectors: Vec<ComponentSelector>,
204    ) -> FilterCursor {
205        self.shared_buffer.cursor(mode, selectors)
206    }
207
208    /// Returns a log stream filtered to the specified selectors. If `selectors` is empty, all logs
209    /// are returned.
210    pub fn logs_cursor(
211        &self,
212        mode: StreamMode,
213        selectors: Vec<ComponentSelector>,
214    ) -> FilterCursorStream<LogsData> {
215        self.shared_buffer.cursor(mode, selectors).into()
216    }
217
218    /// Returns a log container.
219    ///
220    /// NOTE: This function does nothing to stop the container from being removed, so this is
221    /// currently only suitable for test code.
222    #[cfg(test)]
223    pub fn get_log_container(
224        self: &Arc<Self>,
225        identity: Arc<ComponentIdentity>,
226    ) -> Arc<LogsArtifactsContainer> {
227        self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
228    }
229
230    /// Waits until `stop_accepting_new_log_sinks` is called and all log sink tasks have completed.
231    /// After that, any pending Cursors will return Poll::Ready(None).
232    pub async fn wait_for_termination(&self) {
233        let Some(scope) = self.mutable_state.lock().scope.take() else {
234            error!("Attempted to terminate twice");
235            return;
236        };
237        scope.join().await;
238        // Process messages from log sink.
239        debug!("Log ingestion stopped.");
240        // Terminate the shared buffer first so that pending messages are processed before we
241        // terminate all the containers.
242        self.shared_buffer.terminate().await;
243        for container in self.mutable_state.lock().logs_data_store.values() {
244            container.terminate();
245        }
246    }
247
248    /// Closes the connection in which new logger draining tasks are sent. No more logger tasks
249    /// will be accepted when this is called and we'll proceed to terminate logs.
250    pub fn stop_accepting_new_log_sinks(&self) {
251        self.scope_handle.close();
252    }
253
254    /// Returns an id to use for a new interest connection. Used by both LogSettings and Log, to
255    /// ensure shared uniqueness of their connections.
256    pub fn new_interest_connection(&self) -> usize {
257        INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
258    }
259
260    /// Updates log selectors associated with an interest connection.
261    pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
262        self.mutable_state.lock().update_logs_interest(connection_id, selectors);
263    }
264
265    /// Indicates that the connection associated with the given ID is now done.
266    pub fn finish_interest_connection(&self, connection_id: usize) {
267        self.mutable_state.lock().finish_interest_connection(connection_id);
268    }
269
270    fn on_container_inactive(&self, identity: &ComponentIdentity) {
271        let mut repo = self.mutable_state.lock();
272        if !repo.is_live(identity) {
273            repo.remove(identity);
274        }
275    }
276}
277
278#[cfg(test)]
279impl LogsRepository {
280    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
281        use crate::logs::shared_buffer::create_ring_buffer;
282
283        LogsRepository::new(
284            create_ring_buffer(crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize),
285            std::iter::empty(),
286            &Default::default(),
287            scope,
288        )
289    }
290}
291
292impl EventConsumer for LogsRepository {
293    fn handle(self: Arc<Self>, event: Event) {
294        match event.payload {
295            EventPayload::LogSinkRequested(LogSinkRequestedPayload {
296                component,
297                request_stream,
298            }) => {
299                debug!(identity:% = component; "LogSink requested.");
300                // NOTE: It is important that we hold the lock whilst we call
301                // `Container::handle_log_sink` because otherwise the container could be removed by
302                // `on_container_inactive`.  After calling `handle_log_sink`, the container cannot
303                // be removed until after the `LogSink` channel is closed.
304                let mut mutable_state = self.mutable_state.lock();
305                let container =
306                    mutable_state.get_log_container(component, &self.shared_buffer, &self);
307                container.handle_log_sink(request_stream, self.scope_handle.clone());
308            }
309            _ => unreachable!("Archivist state just subscribes to log sink requested"),
310        }
311    }
312}
313
314pub struct LogsRepositoryState {
315    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
316    inspect_node: inspect::Node,
317    global_analytics: GlobalAnalytics,
318
319    /// Interest registrations that we have received through fuchsia.logger.Log/ListWithSelectors
320    /// or through fuchsia.logger.LogSettings/SetInterest.
321    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
322
323    /// Whether or not we are draining the kernel log.
324    draining_klog: bool,
325
326    /// Scope where log ingestion tasks are running.
327    scope: Option<fasync::Scope>,
328
329    /// The initial log interests with which archivist was configured.
330    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
331}
332
333impl LogsRepositoryState {
334    fn new(
335        parent: &fuchsia_inspect::Node,
336        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
337        scope: fasync::Scope,
338    ) -> Self {
339        Self {
340            inspect_node: parent.create_child("log_sources"),
341            logs_data_store: HashMap::new(),
342            interest_registrations: BTreeMap::new(),
343            draining_klog: false,
344            initial_interests: initial_interests
345                .map(|ComponentInitialInterest { component, log_severity }| {
346                    (component, log_severity)
347                })
348                .collect(),
349            scope: Some(scope),
350            global_analytics: GlobalAnalytics::new(parent),
351        }
352    }
353
354    /// Returns a container for logs artifacts, constructing one and adding it to the trie if
355    /// necessary.
356    pub fn get_log_container(
357        &mut self,
358        identity: Arc<ComponentIdentity>,
359        shared_buffer: &Arc<SharedBuffer>,
360        repo: &Arc<LogsRepository>,
361    ) -> Arc<LogsArtifactsContainer> {
362        match self.logs_data_store.get(&identity) {
363            None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
364            Some(existing) => Arc::clone(existing),
365        }
366    }
367
368    fn create_log_container(
369        &mut self,
370        identity: Arc<ComponentIdentity>,
371        shared_buffer: &Arc<SharedBuffer>,
372        repo: Weak<LogsRepository>,
373    ) -> Arc<LogsArtifactsContainer> {
374        let initial_interest = self.get_initial_interest(identity.as_ref());
375        let stats = Arc::new(LogStreamStats::new(&self.inspect_node, &identity));
376        let buffer = shared_buffer.new_container_buffer(Arc::clone(&identity), Arc::clone(&stats));
377        let container = Arc::new(LogsArtifactsContainer::new(
378            Arc::clone(&identity),
379            self.interest_registrations.values().flat_map(|s| s.iter()),
380            initial_interest,
381            stats,
382            buffer,
383            Some(Box::new(move |c| {
384                if let Some(repo) = repo.upgrade() {
385                    repo.on_container_inactive(&c.identity)
386                }
387            })),
388        ));
389        self.logs_data_store.insert(Arc::clone(&identity), Arc::clone(&container));
390        container
391    }
392
393    fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
394        let exact_url_severity =
395            self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
396        let exact_moniker_severity =
397            self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();
398
399        let partial_severity = self
400            .initial_interests
401            .iter()
402            .filter_map(|(uom, severity)| match uom {
403                UrlOrMoniker::Partial(p) => {
404                    if identity.url.contains(p.as_str())
405                        || identity.moniker.to_string().contains(p.as_str())
406                    {
407                        Some(*severity)
408                    } else {
409                        None
410                    }
411                }
412                _ => None,
413            })
414            .min();
415
416        [exact_url_severity, exact_moniker_severity, partial_severity]
417            .into_iter()
418            .flatten()
419            .min()
420            .map(FidlSeverity::from)
421    }
422
423    fn is_live(&self, identity: &ComponentIdentity) -> bool {
424        match self.logs_data_store.get(identity) {
425            Some(container) => container.is_active(),
426            None => false,
427        }
428    }
429
430    /// Updates our own log interest if we are the root Archivist and logging
431    /// to klog.
432    fn maybe_update_own_logs_interest(
433        &mut self,
434        selectors: &[LogInterestSelector],
435        clear_interest: bool,
436    ) {
437        let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
438        let lowest_selector = selectors
439            .iter()
440            .filter(|selector| {
441                // If this is an embedded archivist, the wildcard selector "**" is used (in
442                // tests at least) to change the interest level on all components. Since
443                // archivist logs to itself, this creates a situation where archivist logs can
444                // get included which is undesirable in the vast majority of cases. To address
445                // this, we prevent the global wildcard pattern "**" and "*" from matching
446                // archivist. This is clearly a bit of a hack, but it is balanced by it being
447                // the behavior that the vast majority of users will want. It is still possible
448                // to change the interest level for archivist by using an exact match. Note that
449                // this will apply to both the embedded and system archivist to keep things
450                // consistent.
451                if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
452                    matches!(
453                        &s[..],
454                        [StringSelector::StringPattern(s)] if s == "**" || s == "*"
455                    )
456                }) {
457                    return false;
458                }
459
460                moniker.matches_component_selector(&selector.selector).unwrap_or(false)
461            })
462            .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
463        if let Some(selector) = lowest_selector {
464            if clear_interest {
465                log::set_max_level(LevelFilter::Info);
466            } else {
467                log::set_max_level(
468                    match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
469                        FidlSeverity::Trace => LevelFilter::Trace,
470                        FidlSeverity::Debug => LevelFilter::Debug,
471                        FidlSeverity::Info => LevelFilter::Info,
472                        FidlSeverity::Warn => LevelFilter::Warn,
473                        FidlSeverity::Error => LevelFilter::Error,
474                        // Log has no "Fatal" level, so set it to Error
475                        // instead.
476                        FidlSeverity::Fatal => LevelFilter::Error,
477                        FidlSeverity::__SourceBreaking { .. } => return,
478                    },
479                );
480            }
481        }
482    }
483
484    fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
485        self.maybe_update_own_logs_interest(&selectors, false);
486        let previous_selectors =
487            self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
488        // unwrap safe, we just inserted.
489        let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
490        for logs_data in self.logs_data_store.values() {
491            logs_data.update_interest(new_selectors.iter(), &previous_selectors);
492        }
493    }
494
495    pub fn finish_interest_connection(&mut self, connection_id: usize) {
496        let selectors = self.interest_registrations.remove(&connection_id);
497        if let Some(selectors) = selectors {
498            self.maybe_update_own_logs_interest(&selectors, true);
499            for logs_data in self.logs_data_store.values() {
500                logs_data.reset_interest(&selectors);
501            }
502        }
503    }
504
505    pub fn remove(&mut self, identity: &ComponentIdentity) {
506        self.logs_data_store.remove(identity);
507    }
508}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513    use crate::logs::shared_buffer::create_ring_buffer;
514    use crate::logs::testing::make_message;
515    use fidl_fuchsia_diagnostics::StreamMode;
516    use fidl_fuchsia_logger::LogSinkMarker;
517    use fuchsia_inspect::Inspector;
518    use moniker::ExtendedMoniker;
519    use ring_buffer::MAX_MESSAGE_SIZE;
520    use selectors::{FastError, SelectorExt};
521    use std::time::Duration;
522
523    #[fuchsia::test]
524    async fn data_repo_filters_logs_by_selectors() {
525        let repo = LogsRepository::for_test(fasync::Scope::new());
526        let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
527            ExtendedMoniker::parse_str("./foo").unwrap(),
528            "fuchsia-pkg://foo",
529        )));
530        let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
531            ExtendedMoniker::parse_str("./bar").unwrap(),
532            "fuchsia-pkg://bar",
533        )));
534
535        foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
536        bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
537        foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));
538
539        let stream = repo.logs_cursor(StreamMode::Snapshot, Vec::new());
540
541        let results =
542            stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
543        assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
544
545        let filtered_stream = repo.logs_cursor(
546            StreamMode::Snapshot,
547            vec![selectors::parse_component_selector::<FastError>("foo").unwrap()],
548        );
549
550        let results =
551            filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
552        assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
553    }
554
555    #[fuchsia::test]
556    async fn data_repo_correctly_sets_initial_interests() {
557        let repo = LogsRepository::new(
558            create_ring_buffer(100000),
559            [
560                ComponentInitialInterest {
561                    component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
562                    log_severity: Severity::Info,
563                },
564                ComponentInitialInterest {
565                    component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
566                    log_severity: Severity::Warn,
567                },
568                ComponentInitialInterest {
569                    component: UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()),
570                    log_severity: Severity::Error,
571                },
572                ComponentInitialInterest {
573                    component: UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()),
574                    log_severity: Severity::Debug,
575                },
576            ]
577            .into_iter(),
578            &fuchsia_inspect::Node::default(),
579            fasync::Scope::new(),
580        );
581
582        // We have the moniker configured, use the associated severity.
583        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
584            ExtendedMoniker::parse_str("core/foo").unwrap(),
585            "fuchsia-pkg://foo",
586        )));
587        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
588            .await;
589
590        // We have the URL configure, use the associated severity.
591        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
592            ExtendedMoniker::parse_str("core/baz").unwrap(),
593            "fuchsia-pkg://baz",
594        )));
595        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
596            .await;
597
598        // We have both a URL and a moniker in the config. Pick the minimium one, in this case Info
599        // for the URL over Error for the moniker.
600        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
601            ExtendedMoniker::parse_str("core/bar").unwrap(),
602            "fuchsia-pkg://bar",
603        )));
604        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
605            .await;
606
607        // Neither the moniker nor the URL have an associated severity, therefore, the minimum
608        // severity isn't set.
609        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
610            ExtendedMoniker::parse_str("core/quux").unwrap(),
611            "fuchsia-pkg://quux",
612        )));
613        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
614    }
615
616    #[fuchsia::test]
617    async fn data_repo_correctly_handles_partial_matching() {
618        let repo = LogsRepository::new(
619            create_ring_buffer(100000),
620            [
621                "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO".parse(),
622                "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN".parse(),
623                "/core/bust:DEBUG".parse(),
624                "core/bar:ERROR".parse(),
625                "foo:DEBUG".parse(),
626                "both:TRACE".parse(),
627            ]
628            .into_iter()
629            .map(Result::unwrap),
630            &fuchsia_inspect::Node::default(),
631            fasync::Scope::new(),
632        );
633
634        // We have a partial moniker configured, use the associated severity.
635        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
636            ExtendedMoniker::parse_str("core/foo").unwrap(),
637            "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
638        )));
639        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
640            .await;
641
642        // We have a partial url configured, use the associated severity.
643        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
644            ExtendedMoniker::parse_str("core/not-foo").unwrap(),
645            "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm",
646        )));
647        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
648            .await;
649
650        // We have the URL configure, use the associated severity.
651        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
652            ExtendedMoniker::parse_str("core/baz").unwrap(),
653            "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm",
654        )));
655        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
656            .await;
657
658        // We have both a URL and a moniker in the config. Pick the minimum one, in this case Info
659        // for the URL over Error for the moniker.
660        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
661            ExtendedMoniker::parse_str("core/bar").unwrap(),
662            "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm",
663        )));
664        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
665            .await;
666
667        // Neither the moniker nor the URL have an associated severity, therefore, the minimum
668        // severity isn't set.
669        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
670            ExtendedMoniker::parse_str("core/quux").unwrap(),
671            "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm",
672        )));
673        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
674
675        // We have a partial match for both moniker and url, should still work.
676        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
677            ExtendedMoniker::parse_str("core/both").unwrap(),
678            "fuchsia-pkg://fuchsia.com/both#meta/both.cm",
679        )));
680        expect_initial_interest(Some(FidlSeverity::Trace), container, repo.scope_handle.clone())
681            .await;
682
683        // Exact moniker match should not match sub-monikers.
684        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
685            ExtendedMoniker::parse_str("core/bust/testing").unwrap(),
686            "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm",
687        )));
688        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
689    }
690
691    async fn expect_initial_interest(
692        expected_severity: Option<FidlSeverity>,
693        container: Arc<LogsArtifactsContainer>,
694        scope: fasync::ScopeHandle,
695    ) {
696        let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
697        container.handle_log_sink(stream, scope);
698        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
699        assert_eq!(initial_interest.min_severity, expected_severity);
700    }
701
702    #[fuchsia::test]
703    async fn inspect_node_cleaned_up_on_roll_out() {
704        let inspector = Inspector::default();
705        let repo = LogsRepository::new(
706            create_ring_buffer(MAX_MESSAGE_SIZE),
707            std::iter::empty(),
708            inspector.root(),
709            fasync::Scope::new(),
710        );
711
712        let identity_foo = Arc::new(ComponentIdentity::new(
713            ExtendedMoniker::parse_str("./foo").unwrap(),
714            "fuchsia-pkg://foo",
715        ));
716
717        // Create container A
718        let container_foo = repo.get_log_container(Arc::clone(&identity_foo));
719        container_foo.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
720
721        // Force SharedBuffer to scan messages and update msg_ids.end.
722        // Without this, ContainerInfo::is_active() evaluates to false immediately after
723        // mark_stopped(), causing repo.is_live() to return false before rollout happens.
724        let _cursor = repo.logs_cursor(
725            StreamMode::Subscribe,
726            vec![identity_foo.moniker.clone().into_component_selector()],
727        );
728
729        container_foo.mark_stopped();
730        drop(container_foo);
731
732        // Verify stats exist
733        let hierarchy = fuchsia_inspect::reader::read(&inspector).await.unwrap();
734        assert!(
735            hierarchy.get_child_by_path(&["log_sources", "foo"]).is_some(),
736            "foo stats must exist initially"
737        );
738
739        // Ingest messages for another component until foo is rolled out
740        let container_bar = repo.get_log_container(Arc::new(ComponentIdentity::new(
741            ExtendedMoniker::parse_str("./bar").unwrap(),
742            "fuchsia-pkg://bar",
743        )));
744
745        let large_str = "b".repeat(1000);
746        for i in 2..1000 {
747            container_bar.ingest_message(make_message(
748                &large_str,
749                None,
750                zx::BootInstant::from_nanos(i),
751            ));
752            fasync::Timer::new(Duration::from_millis(10)).await;
753            if !repo.mutable_state.lock().is_live(&identity_foo) {
754                break;
755            }
756        }
757
758        assert!(
759            !repo.mutable_state.lock().is_live(&identity_foo),
760            "foo container must be inactive after rollout"
761        );
762
763        // Verify stats are cleaned up
764        let hierarchy = fuchsia_inspect::reader::read(&inspector).await.unwrap();
765        assert!(
766            hierarchy.get_child_by_path(&["log_sources", "foo"]).is_none(),
767            "foo stats must be cleaned up after rollout"
768        );
769    }
770}