1use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::LogsArtifactsContainer;
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::shared_buffer::{FilterCursor, FilterCursorStream, SharedBuffer};
11use crate::logs::stats::{GlobalAnalytics, LogStreamStats};
12use anyhow::format_err;
13use diagnostics_data::{LogsData, Severity};
14use fidl_fuchsia_diagnostics::{
15 ComponentSelector, LogInterestSelector, StreamMode, StringSelector,
16};
17use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
18use flyweights::FlyStr;
19use fuchsia_async as fasync;
20use fuchsia_inspect as inspect;
21use fuchsia_sync::Mutex;
22use futures::prelude::*;
23use log::{LevelFilter, debug, error};
24use moniker::{ExtendedMoniker, Moniker};
25use selectors::SelectorExt;
26use std::collections::{BTreeMap, HashMap};
27use std::str::FromStr;
28use std::sync::atomic::{AtomicUsize, Ordering};
29use std::sync::{Arc, OnceLock, Weak};
30
31#[derive(Ord, PartialOrd, Eq, PartialEq)]
33pub struct ComponentInitialInterest {
34 component: UrlOrMoniker,
36 log_severity: Severity,
38}
39impl FromStr for ComponentInitialInterest {
42 type Err = anyhow::Error;
43 fn from_str(s: &str) -> Result<Self, Self::Err> {
44 let mut split = s.rsplitn(2, ":");
45 match (split.next(), split.next()) {
46 (Some(severity), Some(url_or_moniker)) => {
47 let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
48 return Err(format_err!("invalid url or moniker"));
49 };
50 let Ok(severity) = Severity::from_str(severity) else {
51 return Err(format_err!("invalid severity"));
52 };
53 Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
54 }
55 _ => Err(format_err!("invalid interest")),
56 }
57 }
58}
59
60#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
61pub enum UrlOrMoniker {
62 Url(FlyStr),
64 Moniker(ExtendedMoniker),
66 Partial(FlyStr),
68}
69
70impl FromStr for UrlOrMoniker {
71 type Err = ();
72 fn from_str(s: &str) -> Result<Self, Self::Err> {
73 if fuchsia_url::fuchsia_pkg::AbsoluteComponentUrl::from_str(s).is_ok()
74 || fuchsia_url::boot::AbsoluteComponentUrl::parse(s).is_ok()
75 {
76 Ok(UrlOrMoniker::Url(s.into()))
77 } else if s.starts_with("/") {
78 if let Ok(moniker) = Moniker::from_str(s) {
79 Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
80 } else {
81 Err(())
82 }
83 } else {
84 Ok(UrlOrMoniker::Partial(s.into()))
85 }
86 }
87}
88
89pub const STATIC_CONNECTION_ID: usize = 0;
91static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);
92
93pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();
95
96pub const ARCHIVIST_TAG: u64 = 0;
101
102pub struct LogsRepository {
105 mutable_state: Mutex<LogsRepositoryState>,
106 shared_buffer: Arc<SharedBuffer>,
107 scope_handle: fasync::ScopeHandle,
108}
109
110impl LogsRepository {
111 pub fn new(
112 ring_buffer: ring_buffer::Reader,
113 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
114 parent: &fuchsia_inspect::Node,
115 scope: fasync::Scope,
116 ) -> Arc<Self> {
117 let scope_handle = scope.to_handle();
118 Arc::new_cyclic(|me: &Weak<LogsRepository>| {
119 let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
120 let me_clone = Weak::clone(me);
121 let shared_buffer = SharedBuffer::new(
122 ring_buffer,
123 Box::new(move |identity| {
124 if let Some(this) = me_clone.upgrade() {
125 this.on_container_inactive(&identity);
126 }
127 }),
128 Default::default(),
129 mutable_state.global_analytics.logs_node(),
130 );
131 if let Some(m) = ARCHIVIST_MONIKER.get() {
132 let archivist_container = mutable_state.create_log_container(
133 Arc::new(ComponentIdentity::new(
134 ExtendedMoniker::ComponentInstance(m.clone()),
135 "fuchsia-boot:///archivist#meta/archivist.cm",
136 )),
137 &shared_buffer,
138 Weak::clone(me),
139 );
140 assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
142 }
143 LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
144 })
145 }
146
147 pub async fn flush(&self) {
148 self.shared_buffer.flush().await;
149 }
150
151 pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
154 where
155 K: DebugLog + Send + Sync + 'static,
156 {
157 let mut mutable_state = self.mutable_state.lock();
158
159 if mutable_state.draining_klog {
162 return;
163 }
164 mutable_state.draining_klog = true;
165
166 let container =
167 mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
168 let Some(ref scope) = mutable_state.scope else {
169 return;
170 };
171 scope.spawn(async move {
172 debug!("Draining debuglog.");
173 let mut kernel_logger = DebugLogBridge::create(klog_reader);
174 let mut messages = match kernel_logger.existing_logs() {
175 Ok(messages) => messages,
176 Err(e) => {
177 error!(e:%; "failed to read from kernel log, important logs may be missing");
178 return;
179 }
180 };
181 messages.sort_by_key(|m| m.timestamp());
182 for message in messages {
183 container.ingest_message(message);
184 }
185
186 let res = kernel_logger
187 .listen()
188 .try_for_each(|message| async {
189 container.ingest_message(message);
190 Ok(())
191 })
192 .await;
193 if let Err(e) = res {
194 error!(e:%; "failed to drain kernel log, important logs may be missing");
195 }
196 });
197 }
198
199 pub fn logs_cursor_raw(
200 &self,
201 mode: StreamMode,
202 selectors: Vec<ComponentSelector>,
203 ) -> FilterCursor {
204 self.shared_buffer.cursor(mode, selectors)
205 }
206
207 pub fn logs_cursor(
210 &self,
211 mode: StreamMode,
212 selectors: Vec<ComponentSelector>,
213 ) -> FilterCursorStream<LogsData> {
214 self.shared_buffer.cursor(mode, selectors).into()
215 }
216
217 #[cfg(test)]
222 pub fn get_log_container(
223 self: &Arc<Self>,
224 identity: Arc<ComponentIdentity>,
225 ) -> Arc<LogsArtifactsContainer> {
226 self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
227 }
228
229 pub async fn wait_for_termination(&self) {
232 let Some(scope) = self.mutable_state.lock().scope.take() else {
233 error!("Attempted to terminate twice");
234 return;
235 };
236 scope.join().await;
237 debug!("Log ingestion stopped.");
239 self.shared_buffer.terminate().await;
242 for container in self.mutable_state.lock().logs_data_store.values() {
243 container.terminate();
244 }
245 }
246
247 pub fn stop_accepting_new_log_sinks(&self) {
250 self.scope_handle.close();
251 }
252
253 pub fn new_interest_connection(&self) -> usize {
256 INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
257 }
258
259 pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
261 self.mutable_state.lock().update_logs_interest(connection_id, selectors);
262 }
263
264 pub fn finish_interest_connection(&self, connection_id: usize) {
266 self.mutable_state.lock().finish_interest_connection(connection_id);
267 }
268
269 fn on_container_inactive(&self, identity: &ComponentIdentity) {
270 let mut repo = self.mutable_state.lock();
271 if !repo.is_live(identity) {
272 repo.remove(identity);
273 }
274 }
275}
276
277#[cfg(test)]
278impl LogsRepository {
279 pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
280 use crate::logs::shared_buffer::create_ring_buffer;
281
282 LogsRepository::new(
283 create_ring_buffer(crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize),
284 std::iter::empty(),
285 &Default::default(),
286 scope,
287 )
288 }
289}
290
291impl EventConsumer for LogsRepository {
292 fn handle(self: Arc<Self>, event: Event) {
293 match event.payload {
294 EventPayload::LogSinkRequested(LogSinkRequestedPayload {
295 component,
296 request_stream,
297 }) => {
298 debug!(identity:% = component; "LogSink requested.");
299 let mut mutable_state = self.mutable_state.lock();
304 let container =
305 mutable_state.get_log_container(component, &self.shared_buffer, &self);
306 container.handle_log_sink(request_stream, self.scope_handle.clone());
307 }
308 _ => unreachable!("Archivist state just subscribes to log sink requested"),
309 }
310 }
311}
312
313pub struct LogsRepositoryState {
314 logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
315 inspect_node: inspect::Node,
316 global_analytics: GlobalAnalytics,
317
318 interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
321
322 draining_klog: bool,
324
325 scope: Option<fasync::Scope>,
327
328 initial_interests: BTreeMap<UrlOrMoniker, Severity>,
330}
331
332impl LogsRepositoryState {
333 fn new(
334 parent: &fuchsia_inspect::Node,
335 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
336 scope: fasync::Scope,
337 ) -> Self {
338 Self {
339 inspect_node: parent.create_child("log_sources"),
340 logs_data_store: HashMap::new(),
341 interest_registrations: BTreeMap::new(),
342 draining_klog: false,
343 initial_interests: initial_interests
344 .map(|ComponentInitialInterest { component, log_severity }| {
345 (component, log_severity)
346 })
347 .collect(),
348 scope: Some(scope),
349 global_analytics: GlobalAnalytics::new(parent),
350 }
351 }
352
353 pub fn get_log_container(
356 &mut self,
357 identity: Arc<ComponentIdentity>,
358 shared_buffer: &Arc<SharedBuffer>,
359 repo: &Arc<LogsRepository>,
360 ) -> Arc<LogsArtifactsContainer> {
361 match self.logs_data_store.get(&identity) {
362 None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
363 Some(existing) => Arc::clone(existing),
364 }
365 }
366
367 fn create_log_container(
368 &mut self,
369 identity: Arc<ComponentIdentity>,
370 shared_buffer: &Arc<SharedBuffer>,
371 repo: Weak<LogsRepository>,
372 ) -> Arc<LogsArtifactsContainer> {
373 let initial_interest = self.get_initial_interest(identity.as_ref());
374 let stats = Arc::new(LogStreamStats::new(&self.inspect_node, &identity));
375 let buffer = shared_buffer.new_container_buffer(Arc::clone(&identity), Arc::clone(&stats));
376 let container = Arc::new(LogsArtifactsContainer::new(
377 Arc::clone(&identity),
378 self.interest_registrations.values().flat_map(|s| s.iter()),
379 initial_interest,
380 stats,
381 buffer,
382 Some(Box::new(move |c| {
383 if let Some(repo) = repo.upgrade() {
384 repo.on_container_inactive(&c.identity)
385 }
386 })),
387 ));
388 self.logs_data_store.insert(Arc::clone(&identity), Arc::clone(&container));
389 container
390 }
391
392 fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
393 let exact_url_severity =
394 self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
395 let exact_moniker_severity =
396 self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();
397
398 let partial_severity = self
399 .initial_interests
400 .iter()
401 .filter_map(|(uom, severity)| match uom {
402 UrlOrMoniker::Partial(p) => {
403 if identity.url.contains(p.as_str())
404 || identity.moniker.to_string().contains(p.as_str())
405 {
406 Some(*severity)
407 } else {
408 None
409 }
410 }
411 _ => None,
412 })
413 .min();
414
415 [exact_url_severity, exact_moniker_severity, partial_severity]
416 .into_iter()
417 .flatten()
418 .min()
419 .map(FidlSeverity::from)
420 }
421
422 fn is_live(&self, identity: &ComponentIdentity) -> bool {
423 match self.logs_data_store.get(identity) {
424 Some(container) => container.is_active(),
425 None => false,
426 }
427 }
428
429 fn maybe_update_own_logs_interest(
432 &mut self,
433 selectors: &[LogInterestSelector],
434 clear_interest: bool,
435 ) {
436 let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
437 let lowest_selector = selectors
438 .iter()
439 .filter(|selector| {
440 if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
451 matches!(
452 &s[..],
453 [StringSelector::StringPattern(s)] if s == "**" || s == "*"
454 )
455 }) {
456 return false;
457 }
458
459 moniker.matches_component_selector(&selector.selector).unwrap_or(false)
460 })
461 .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
462 if let Some(selector) = lowest_selector {
463 if clear_interest {
464 log::set_max_level(LevelFilter::Info);
465 } else {
466 log::set_max_level(
467 match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
468 FidlSeverity::Trace => LevelFilter::Trace,
469 FidlSeverity::Debug => LevelFilter::Debug,
470 FidlSeverity::Info => LevelFilter::Info,
471 FidlSeverity::Warn => LevelFilter::Warn,
472 FidlSeverity::Error => LevelFilter::Error,
473 FidlSeverity::Fatal => LevelFilter::Error,
476 FidlSeverity::__SourceBreaking { .. } => return,
477 },
478 );
479 }
480 }
481 }
482
483 fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
484 self.maybe_update_own_logs_interest(&selectors, false);
485 let previous_selectors =
486 self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
487 let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
489 for logs_data in self.logs_data_store.values() {
490 logs_data.update_interest(new_selectors.iter(), &previous_selectors);
491 }
492 }
493
494 pub fn finish_interest_connection(&mut self, connection_id: usize) {
495 let selectors = self.interest_registrations.remove(&connection_id);
496 if let Some(selectors) = selectors {
497 self.maybe_update_own_logs_interest(&selectors, true);
498 for logs_data in self.logs_data_store.values() {
499 logs_data.reset_interest(&selectors);
500 }
501 }
502 }
503
504 pub fn remove(&mut self, identity: &ComponentIdentity) {
505 self.logs_data_store.remove(identity);
506 }
507}
508
509#[cfg(test)]
510mod tests {
511 use super::*;
512 use crate::logs::shared_buffer::create_ring_buffer;
513 use crate::logs::testing::make_message;
514 use fidl_fuchsia_diagnostics::StreamMode;
515 use fidl_fuchsia_logger::LogSinkMarker;
516 use fuchsia_inspect::Inspector;
517 use moniker::ExtendedMoniker;
518 use ring_buffer::MAX_MESSAGE_SIZE;
519 use selectors::{FastError, SelectorExt};
520 use std::time::Duration;
521
522 #[fuchsia::test]
523 async fn data_repo_filters_logs_by_selectors() {
524 let repo = LogsRepository::for_test(fasync::Scope::new());
525 let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
526 ExtendedMoniker::parse_str("./foo").unwrap(),
527 "fuchsia-pkg://foo",
528 )));
529 let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
530 ExtendedMoniker::parse_str("./bar").unwrap(),
531 "fuchsia-pkg://bar",
532 )));
533
534 foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
535 bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
536 foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));
537
538 let stream = repo.logs_cursor(StreamMode::Snapshot, Vec::new());
539
540 let results =
541 stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
542 assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
543
544 let filtered_stream = repo.logs_cursor(
545 StreamMode::Snapshot,
546 vec![selectors::parse_component_selector::<FastError>("foo").unwrap()],
547 );
548
549 let results =
550 filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
551 assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
552 }
553
554 #[fuchsia::test]
555 async fn data_repo_correctly_sets_initial_interests() {
556 let repo = LogsRepository::new(
557 create_ring_buffer(100000),
558 [
559 ComponentInitialInterest {
560 component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
561 log_severity: Severity::Info,
562 },
563 ComponentInitialInterest {
564 component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
565 log_severity: Severity::Warn,
566 },
567 ComponentInitialInterest {
568 component: UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()),
569 log_severity: Severity::Error,
570 },
571 ComponentInitialInterest {
572 component: UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()),
573 log_severity: Severity::Debug,
574 },
575 ]
576 .into_iter(),
577 &fuchsia_inspect::Node::default(),
578 fasync::Scope::new(),
579 );
580
581 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
583 ExtendedMoniker::parse_str("core/foo").unwrap(),
584 "fuchsia-pkg://foo",
585 )));
586 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
587 .await;
588
589 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
591 ExtendedMoniker::parse_str("core/baz").unwrap(),
592 "fuchsia-pkg://baz",
593 )));
594 expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
595 .await;
596
597 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
600 ExtendedMoniker::parse_str("core/bar").unwrap(),
601 "fuchsia-pkg://bar",
602 )));
603 expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
604 .await;
605
606 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
609 ExtendedMoniker::parse_str("core/quux").unwrap(),
610 "fuchsia-pkg://quux",
611 )));
612 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
613 }
614
615 #[fuchsia::test]
616 async fn data_repo_correctly_handles_partial_matching() {
617 let repo = LogsRepository::new(
618 create_ring_buffer(100000),
619 [
620 "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO".parse(),
621 "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN".parse(),
622 "/core/bust:DEBUG".parse(),
623 "core/bar:ERROR".parse(),
624 "foo:DEBUG".parse(),
625 "both:TRACE".parse(),
626 ]
627 .into_iter()
628 .map(Result::unwrap),
629 &fuchsia_inspect::Node::default(),
630 fasync::Scope::new(),
631 );
632
633 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
635 ExtendedMoniker::parse_str("core/foo").unwrap(),
636 "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
637 )));
638 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
639 .await;
640
641 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
643 ExtendedMoniker::parse_str("core/not-foo").unwrap(),
644 "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm",
645 )));
646 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
647 .await;
648
649 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
651 ExtendedMoniker::parse_str("core/baz").unwrap(),
652 "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm",
653 )));
654 expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
655 .await;
656
657 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
660 ExtendedMoniker::parse_str("core/bar").unwrap(),
661 "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm",
662 )));
663 expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
664 .await;
665
666 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
669 ExtendedMoniker::parse_str("core/quux").unwrap(),
670 "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm",
671 )));
672 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
673
674 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
676 ExtendedMoniker::parse_str("core/both").unwrap(),
677 "fuchsia-pkg://fuchsia.com/both#meta/both.cm",
678 )));
679 expect_initial_interest(Some(FidlSeverity::Trace), container, repo.scope_handle.clone())
680 .await;
681
682 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
684 ExtendedMoniker::parse_str("core/bust/testing").unwrap(),
685 "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm",
686 )));
687 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
688 }
689
690 async fn expect_initial_interest(
691 expected_severity: Option<FidlSeverity>,
692 container: Arc<LogsArtifactsContainer>,
693 scope: fasync::ScopeHandle,
694 ) {
695 let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
696 container.handle_log_sink(stream, scope);
697 let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
698 assert_eq!(initial_interest.min_severity, expected_severity);
699 }
700
701 #[fuchsia::test]
702 async fn inspect_node_cleaned_up_on_roll_out() {
703 let inspector = Inspector::default();
704 let repo = LogsRepository::new(
705 create_ring_buffer(MAX_MESSAGE_SIZE),
706 std::iter::empty(),
707 inspector.root(),
708 fasync::Scope::new(),
709 );
710
711 let identity_foo = Arc::new(ComponentIdentity::new(
712 ExtendedMoniker::parse_str("./foo").unwrap(),
713 "fuchsia-pkg://foo",
714 ));
715
716 let container_foo = repo.get_log_container(Arc::clone(&identity_foo));
718 container_foo.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
719
720 let _cursor = repo.logs_cursor(
724 StreamMode::Subscribe,
725 vec![identity_foo.moniker.clone().into_component_selector()],
726 );
727
728 container_foo.mark_stopped();
729 drop(container_foo);
730
731 let hierarchy = fuchsia_inspect::reader::read(&inspector).await.unwrap();
733 assert!(
734 hierarchy.get_child_by_path(&["log_sources", "foo"]).is_some(),
735 "foo stats must exist initially"
736 );
737
738 let container_bar = repo.get_log_container(Arc::new(ComponentIdentity::new(
740 ExtendedMoniker::parse_str("./bar").unwrap(),
741 "fuchsia-pkg://bar",
742 )));
743
744 let large_str = "b".repeat(1000);
745 for i in 2..1000 {
746 container_bar.ingest_message(make_message(
747 &large_str,
748 None,
749 zx::BootInstant::from_nanos(i),
750 ));
751 fasync::Timer::new(Duration::from_millis(10)).await;
752 if !repo.mutable_state.lock().is_live(&identity_foo) {
753 break;
754 }
755 }
756
757 assert!(
758 !repo.mutable_state.lock().is_live(&identity_foo),
759 "foo container must be inactive after rollout"
760 );
761
762 let hierarchy = fuchsia_inspect::reader::read(&inspector).await.unwrap();
764 assert!(
765 hierarchy.get_child_by_path(&["log_sources", "foo"]).is_none(),
766 "foo stats must be cleaned up after rollout"
767 );
768 }
769}