1use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::LogsArtifactsContainer;
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::shared_buffer::{FilterCursor, FilterCursorStream, SharedBuffer};
11use crate::logs::stats::{GlobalAnalytics, LogStreamStats};
12use anyhow::format_err;
13use diagnostics_data::{LogsData, Severity};
14use diagnostics_log_encoding::ARCHIVIST_URL;
15use fidl_fuchsia_diagnostics::{
16 ComponentSelector, LogInterestSelector, StreamMode, StringSelector,
17};
18use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
19use flyweights::FlyStr;
20use fuchsia_async as fasync;
21use fuchsia_inspect as inspect;
22use fuchsia_sync::Mutex;
23use futures::prelude::*;
24use log::{LevelFilter, debug, error};
25use moniker::{ExtendedMoniker, Moniker};
26use selectors::SelectorExt;
27use std::collections::{BTreeMap, HashMap};
28use std::str::FromStr;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::{Arc, OnceLock, Weak};
31
32#[derive(Ord, PartialOrd, Eq, PartialEq)]
34pub struct ComponentInitialInterest {
35 component: UrlOrMoniker,
37 log_severity: Severity,
39}
40impl FromStr for ComponentInitialInterest {
43 type Err = anyhow::Error;
44 fn from_str(s: &str) -> Result<Self, Self::Err> {
45 let mut split = s.rsplitn(2, ":");
46 match (split.next(), split.next()) {
47 (Some(severity), Some(url_or_moniker)) => {
48 let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
49 return Err(format_err!("invalid url or moniker"));
50 };
51 let Ok(severity) = Severity::from_str(severity) else {
52 return Err(format_err!("invalid severity"));
53 };
54 Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
55 }
56 _ => Err(format_err!("invalid interest")),
57 }
58 }
59}
60
/// Identifies the component(s) an initial interest applies to.
///
/// The derived `Ord` (used when this type is a `BTreeMap` key) depends on the order of the
/// variants below.
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum UrlOrMoniker {
    /// A string that parsed as an absolute component URL; compared for equality against a
    /// component's URL.
    Url(FlyStr),
    /// A moniker parsed from a string starting with `/`; compared for equality against a
    /// component's moniker.
    Moniker(ExtendedMoniker),
    /// Any other string; matches when it is a substring of a component's URL or of its
    /// moniker rendered as a string.
    Partial(FlyStr),
}
70
71impl FromStr for UrlOrMoniker {
72 type Err = ();
73 fn from_str(s: &str) -> Result<Self, Self::Err> {
74 if fuchsia_url::fuchsia_pkg::AbsoluteComponentUrl::from_str(s).is_ok()
75 || fuchsia_url::boot::AbsoluteComponentUrl::parse(s).is_ok()
76 {
77 Ok(UrlOrMoniker::Url(s.into()))
78 } else if s.starts_with("/") {
79 if let Ok(moniker) = Moniker::from_str(s) {
80 Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
81 } else {
82 Err(())
83 }
84 } else {
85 Ok(UrlOrMoniker::Partial(s.into()))
86 }
87 }
88}
89
/// Connection id `0`. Ids handed out by `LogsRepository::new_interest_connection` start
/// right above it, so they never collide with this value.
/// NOTE(review): presumably reserved for statically configured interest — confirm.
pub const STATIC_CONNECTION_ID: usize = 0;
/// Counter backing `LogsRepository::new_interest_connection`; each call returns the current
/// value and increments it.
static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);

/// The Archivist's own moniker. When set, `LogsRepository::new` creates a container for
/// the Archivist and interest selectors matching this moniker adjust the `log` max level.
pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();

/// The tag (`iob_tag()`) that the buffer of the Archivist's own container is asserted to
/// have in `LogsRepository::new`.
pub const ARCHIVIST_TAG: u64 = 0;
102
/// Owns the per-component log containers and the buffer shared between them.
pub struct LogsRepository {
    /// Containers, interest registrations and the ingestion scope, guarded by a mutex.
    mutable_state: Mutex<LogsRepositoryState>,
    /// Buffer from which container buffers are created and cursors are read.
    shared_buffer: Arc<SharedBuffer>,
    /// Handle to the scope given to `new`. Clones are passed to containers when they serve
    /// a log sink, and `stop_accepting_new_log_sinks` closes it.
    scope_handle: fasync::ScopeHandle,
}
110
111impl LogsRepository {
112 pub fn new(
113 ring_buffer: ring_buffer::Reader,
114 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
115 parent: &fuchsia_inspect::Node,
116 scope: fasync::Scope,
117 ) -> Arc<Self> {
118 let scope_handle = scope.to_handle();
119 Arc::new_cyclic(|me: &Weak<LogsRepository>| {
120 let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
121 let me_clone = Weak::clone(me);
122 let shared_buffer = SharedBuffer::new(
123 ring_buffer,
124 Box::new(move |identity| {
125 if let Some(this) = me_clone.upgrade() {
126 this.on_container_inactive(&identity);
127 }
128 }),
129 Default::default(),
130 mutable_state.global_analytics.logs_node(),
131 );
132 if let Some(m) = ARCHIVIST_MONIKER.get() {
133 let archivist_container = mutable_state.create_log_container(
134 Arc::new(ComponentIdentity::new(
135 ExtendedMoniker::ComponentInstance(m.clone()),
136 ARCHIVIST_URL,
137 )),
138 &shared_buffer,
139 Weak::clone(me),
140 );
141 assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
143 }
144 LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
145 })
146 }
147
148 pub async fn flush(&self) {
149 self.shared_buffer.flush().await;
150 }
151
152 pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
155 where
156 K: DebugLog + Send + Sync + 'static,
157 {
158 let mut mutable_state = self.mutable_state.lock();
159
160 if mutable_state.draining_klog {
163 return;
164 }
165 mutable_state.draining_klog = true;
166
167 let container =
168 mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
169 let Some(ref scope) = mutable_state.scope else {
170 return;
171 };
172 scope.spawn(async move {
173 debug!("Draining debuglog.");
174 let mut kernel_logger = DebugLogBridge::create(klog_reader);
175 let mut messages = match kernel_logger.existing_logs() {
176 Ok(messages) => messages,
177 Err(e) => {
178 error!(e:%; "failed to read from kernel log, important logs may be missing");
179 return;
180 }
181 };
182 messages.sort_by_key(|m| m.timestamp());
183 for message in messages {
184 container.ingest_message(message);
185 }
186
187 let res = kernel_logger
188 .listen()
189 .try_for_each(|message| async {
190 container.ingest_message(message);
191 Ok(())
192 })
193 .await;
194 if let Err(e) = res {
195 error!(e:%; "failed to drain kernel log, important logs may be missing");
196 }
197 });
198 }
199
200 pub fn logs_cursor_raw(
201 &self,
202 mode: StreamMode,
203 selectors: Vec<ComponentSelector>,
204 ) -> FilterCursor {
205 self.shared_buffer.cursor(mode, selectors)
206 }
207
208 pub fn logs_cursor(
211 &self,
212 mode: StreamMode,
213 selectors: Vec<ComponentSelector>,
214 ) -> FilterCursorStream<LogsData> {
215 self.shared_buffer.cursor(mode, selectors).into()
216 }
217
218 #[cfg(test)]
223 pub fn get_log_container(
224 self: &Arc<Self>,
225 identity: Arc<ComponentIdentity>,
226 ) -> Arc<LogsArtifactsContainer> {
227 self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
228 }
229
230 pub async fn wait_for_termination(&self) {
233 let Some(scope) = self.mutable_state.lock().scope.take() else {
234 error!("Attempted to terminate twice");
235 return;
236 };
237 scope.join().await;
238 debug!("Log ingestion stopped.");
240 self.shared_buffer.terminate().await;
243 for container in self.mutable_state.lock().logs_data_store.values() {
244 container.terminate();
245 }
246 }
247
248 pub fn stop_accepting_new_log_sinks(&self) {
251 self.scope_handle.close();
252 }
253
254 pub fn new_interest_connection(&self) -> usize {
257 INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
258 }
259
260 pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
262 self.mutable_state.lock().update_logs_interest(connection_id, selectors);
263 }
264
265 pub fn finish_interest_connection(&self, connection_id: usize) {
267 self.mutable_state.lock().finish_interest_connection(connection_id);
268 }
269
270 fn on_container_inactive(&self, identity: &ComponentIdentity) {
271 let mut repo = self.mutable_state.lock();
272 if !repo.is_live(identity) {
273 repo.remove(identity);
274 }
275 }
276}
277
#[cfg(test)]
impl LogsRepository {
    /// Builds a repository for tests: a ring buffer sized by
    /// `LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES`, no initial interests and a default
    /// inspect node.
    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
        use crate::logs::shared_buffer::create_ring_buffer;

        let capacity = crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize;
        let parent = fuchsia_inspect::Node::default();
        Self::new(create_ring_buffer(capacity), std::iter::empty(), &parent, scope)
    }
}
291
292impl EventConsumer for LogsRepository {
293 fn handle(self: Arc<Self>, event: Event) {
294 match event.payload {
295 EventPayload::LogSinkRequested(LogSinkRequestedPayload {
296 component,
297 request_stream,
298 }) => {
299 debug!(identity:% = component; "LogSink requested.");
300 let mut mutable_state = self.mutable_state.lock();
305 let container =
306 mutable_state.get_log_container(component, &self.shared_buffer, &self);
307 container.handle_log_sink(request_stream, self.scope_handle.clone());
308 }
309 _ => unreachable!("Archivist state just subscribes to log sink requested"),
310 }
311 }
312}
313
/// The mutex-guarded part of `LogsRepository`.
pub struct LogsRepositoryState {
    /// One container per known component identity.
    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
    /// The `log_sources` inspect node; per-container stats are created under it.
    inspect_node: inspect::Node,
    /// Analytics built from the parent inspect node; its `logs_node()` is given to the
    /// shared buffer.
    global_analytics: GlobalAnalytics,

    /// Interest selectors currently registered, keyed by connection id.
    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,

    /// Whether `drain_debuglog` has already been started.
    draining_klog: bool,

    /// The ingestion scope; `None` once `wait_for_termination` has taken it.
    scope: Option<fasync::Scope>,

    /// Severities to apply to containers at creation time, keyed by URL, moniker or
    /// partial string.
    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
}
332
333impl LogsRepositoryState {
334 fn new(
335 parent: &fuchsia_inspect::Node,
336 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
337 scope: fasync::Scope,
338 ) -> Self {
339 Self {
340 inspect_node: parent.create_child("log_sources"),
341 logs_data_store: HashMap::new(),
342 interest_registrations: BTreeMap::new(),
343 draining_klog: false,
344 initial_interests: initial_interests
345 .map(|ComponentInitialInterest { component, log_severity }| {
346 (component, log_severity)
347 })
348 .collect(),
349 scope: Some(scope),
350 global_analytics: GlobalAnalytics::new(parent),
351 }
352 }
353
354 pub fn get_log_container(
357 &mut self,
358 identity: Arc<ComponentIdentity>,
359 shared_buffer: &Arc<SharedBuffer>,
360 repo: &Arc<LogsRepository>,
361 ) -> Arc<LogsArtifactsContainer> {
362 match self.logs_data_store.get(&identity) {
363 None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
364 Some(existing) => Arc::clone(existing),
365 }
366 }
367
368 fn create_log_container(
369 &mut self,
370 identity: Arc<ComponentIdentity>,
371 shared_buffer: &Arc<SharedBuffer>,
372 repo: Weak<LogsRepository>,
373 ) -> Arc<LogsArtifactsContainer> {
374 let initial_interest = self.get_initial_interest(identity.as_ref());
375 let stats = Arc::new(LogStreamStats::new(&self.inspect_node, &identity));
376 let buffer = shared_buffer.new_container_buffer(Arc::clone(&identity), Arc::clone(&stats));
377 let container = Arc::new(LogsArtifactsContainer::new(
378 Arc::clone(&identity),
379 self.interest_registrations.values().flat_map(|s| s.iter()),
380 initial_interest,
381 stats,
382 buffer,
383 Some(Box::new(move |c| {
384 if let Some(repo) = repo.upgrade() {
385 repo.on_container_inactive(&c.identity)
386 }
387 })),
388 ));
389 self.logs_data_store.insert(Arc::clone(&identity), Arc::clone(&container));
390 container
391 }
392
393 fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
394 let exact_url_severity =
395 self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
396 let exact_moniker_severity =
397 self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();
398
399 let partial_severity = self
400 .initial_interests
401 .iter()
402 .filter_map(|(uom, severity)| match uom {
403 UrlOrMoniker::Partial(p) => {
404 if identity.url.contains(p.as_str())
405 || identity.moniker.to_string().contains(p.as_str())
406 {
407 Some(*severity)
408 } else {
409 None
410 }
411 }
412 _ => None,
413 })
414 .min();
415
416 [exact_url_severity, exact_moniker_severity, partial_severity]
417 .into_iter()
418 .flatten()
419 .min()
420 .map(FidlSeverity::from)
421 }
422
423 fn is_live(&self, identity: &ComponentIdentity) -> bool {
424 match self.logs_data_store.get(identity) {
425 Some(container) => container.is_active(),
426 None => false,
427 }
428 }
429
430 fn maybe_update_own_logs_interest(
433 &mut self,
434 selectors: &[LogInterestSelector],
435 clear_interest: bool,
436 ) {
437 let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
438 let lowest_selector = selectors
439 .iter()
440 .filter(|selector| {
441 if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
452 matches!(
453 &s[..],
454 [StringSelector::StringPattern(s)] if s == "**" || s == "*"
455 )
456 }) {
457 return false;
458 }
459
460 moniker.matches_component_selector(&selector.selector).unwrap_or(false)
461 })
462 .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
463 if let Some(selector) = lowest_selector {
464 if clear_interest {
465 log::set_max_level(LevelFilter::Info);
466 } else {
467 log::set_max_level(
468 match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
469 FidlSeverity::Trace => LevelFilter::Trace,
470 FidlSeverity::Debug => LevelFilter::Debug,
471 FidlSeverity::Info => LevelFilter::Info,
472 FidlSeverity::Warn => LevelFilter::Warn,
473 FidlSeverity::Error => LevelFilter::Error,
474 FidlSeverity::Fatal => LevelFilter::Error,
477 FidlSeverity::__SourceBreaking { .. } => return,
478 },
479 );
480 }
481 }
482 }
483
484 fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
485 self.maybe_update_own_logs_interest(&selectors, false);
486 let previous_selectors =
487 self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
488 let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
490 for logs_data in self.logs_data_store.values() {
491 logs_data.update_interest(new_selectors.iter(), &previous_selectors);
492 }
493 }
494
495 pub fn finish_interest_connection(&mut self, connection_id: usize) {
496 let selectors = self.interest_registrations.remove(&connection_id);
497 if let Some(selectors) = selectors {
498 self.maybe_update_own_logs_interest(&selectors, true);
499 for logs_data in self.logs_data_store.values() {
500 logs_data.reset_interest(&selectors);
501 }
502 }
503 }
504
505 pub fn remove(&mut self, identity: &ComponentIdentity) {
506 self.logs_data_store.remove(identity);
507 }
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::shared_buffer::create_ring_buffer;
    use crate::logs::testing::make_message;
    use fidl_fuchsia_diagnostics::StreamMode;
    use fidl_fuchsia_logger::LogSinkMarker;
    use fuchsia_inspect::Inspector;
    use moniker::ExtendedMoniker;
    use ring_buffer::MAX_MESSAGE_SIZE;
    use selectors::{FastError, SelectorExt};
    use std::time::Duration;

    /// Builds a shared component identity from a moniker string and a URL.
    fn component_identity(moniker: &str, url: &'static str) -> Arc<ComponentIdentity> {
        Arc::new(ComponentIdentity::new(ExtendedMoniker::parse_str(moniker).unwrap(), url))
    }

    /// A snapshot cursor without selectors returns every message; with a selector it only
    /// returns the messages of the matching component.
    #[fuchsia::test]
    async fn data_repo_filters_logs_by_selectors() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let foo = repo.get_log_container(component_identity("./foo", "fuchsia-pkg://foo"));
        let bar = repo.get_log_container(component_identity("./bar", "fuchsia-pkg://bar"));

        foo.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
        bar.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
        foo.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));

        let everything = repo
            .logs_cursor(StreamMode::Snapshot, Vec::new())
            .map(|data| data.msg().unwrap().to_string())
            .collect::<Vec<_>>()
            .await;
        assert_eq!(everything, ["a", "b", "c"]);

        let foo_selector = selectors::parse_component_selector::<FastError>("foo").unwrap();
        let only_foo = repo
            .logs_cursor(StreamMode::Snapshot, vec![foo_selector])
            .map(|data| data.msg().unwrap().to_string())
            .collect::<Vec<_>>()
            .await;
        assert_eq!(only_foo, ["a", "c"]);
    }

    /// Exact URL and exact moniker interests are applied to new containers.
    #[fuchsia::test]
    async fn data_repo_correctly_sets_initial_interests() {
        let interests = [
            (UrlOrMoniker::Url("fuchsia-pkg://bar".into()), Severity::Info),
            (UrlOrMoniker::Url("fuchsia-pkg://baz".into()), Severity::Warn),
            (UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()), Severity::Error),
            (UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()), Severity::Debug),
        ]
        .into_iter()
        .map(|(component, log_severity)| ComponentInitialInterest { component, log_severity });
        let repo = LogsRepository::new(
            create_ring_buffer(100000),
            interests,
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        let cases = [
            // Only the moniker matches.
            ("core/foo", "fuchsia-pkg://foo", Some(FidlSeverity::Debug)),
            // Only the URL matches.
            ("core/baz", "fuchsia-pkg://baz", Some(FidlSeverity::Warn)),
            // URL (Info) and moniker (Error) both match; the minimum is used.
            ("core/bar", "fuchsia-pkg://bar", Some(FidlSeverity::Info)),
            // Nothing matches.
            ("core/quux", "fuchsia-pkg://quux", None),
        ];
        for (moniker, url, expected) in cases {
            let container = repo.get_log_container(component_identity(moniker, url));
            expect_initial_interest(expected, container, repo.scope_handle.clone()).await;
        }
    }

    /// Interests parsed from strings, including partial ones, are applied to new containers.
    #[fuchsia::test]
    async fn data_repo_correctly_handles_partial_matching() {
        let interests = [
            "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO",
            "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN",
            "/core/bust:DEBUG",
            "core/bar:ERROR",
            "foo:DEBUG",
            "both:TRACE",
        ]
        .into_iter()
        .map(|interest| interest.parse::<ComponentInitialInterest>().unwrap());
        let repo = LogsRepository::new(
            create_ring_buffer(100000),
            interests,
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        let cases = [
            // "foo" is contained in the moniker.
            (
                "core/foo",
                "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
                Some(FidlSeverity::Debug),
            ),
            // "foo" is contained in the URL.
            ("core/not-foo", "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm", Some(FidlSeverity::Debug)),
            // Exact URL match.
            ("core/baz", "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm", Some(FidlSeverity::Warn)),
            // The exact URL (Info) wins over the "core/bar" entry (Error).
            ("core/bar", "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm", Some(FidlSeverity::Info)),
            // Nothing matches.
            ("core/quux", "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm", None),
            // "both" is contained in the moniker and in the URL.
            ("core/both", "fuchsia-pkg://fuchsia.com/both#meta/both.cm", Some(FidlSeverity::Trace)),
            // "/core/bust" was parsed as an exact moniker, so a child of it does not match.
            ("core/bust/testing", "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm", None),
        ];
        for (moniker, url, expected) in cases {
            let container = repo.get_log_container(component_identity(moniker, url));
            expect_initial_interest(expected, container, repo.scope_handle.clone()).await;
        }
    }

    /// Connects a log sink to `container` and asserts that the first interest it reports
    /// has `expected_severity` as its minimum severity.
    async fn expect_initial_interest(
        expected_severity: Option<FidlSeverity>,
        container: Arc<LogsArtifactsContainer>,
        scope: fasync::ScopeHandle,
    ) {
        let (proxy, request_stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(request_stream, scope);
        let interest = proxy.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(interest.min_severity, expected_severity);
    }

    /// Once a stopped component's messages have been pushed out of the buffer by another
    /// component, its container and its inspect stats go away.
    #[fuchsia::test]
    async fn inspect_node_cleaned_up_on_roll_out() {
        let inspector = Inspector::default();
        let repo = LogsRepository::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            std::iter::empty(),
            inspector.root(),
            fasync::Scope::new(),
        );

        let identity_foo = component_identity("./foo", "fuchsia-pkg://foo");

        let container_foo = repo.get_log_container(Arc::clone(&identity_foo));
        container_foo.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));

        // A subscribed cursor on foo is kept open for the rest of the test.
        let foo_selector = identity_foo.moniker.clone().into_component_selector();
        let _cursor = repo.logs_cursor(StreamMode::Subscribe, vec![foo_selector]);

        container_foo.mark_stopped();
        drop(container_foo);

        let before = fuchsia_inspect::reader::read(&inspector).await.unwrap();
        assert!(
            before.get_child_by_path(&["log_sources", "foo"]).is_some(),
            "foo stats must exist initially"
        );

        let container_bar =
            repo.get_log_container(component_identity("./bar", "fuchsia-pkg://bar"));

        // Keep writing large messages for bar until foo is no longer live.
        let large_str = "b".repeat(1000);
        for nanos in 2..1000 {
            let message = make_message(&large_str, None, zx::BootInstant::from_nanos(nanos));
            container_bar.ingest_message(message);
            fasync::Timer::new(Duration::from_millis(10)).await;
            let foo_is_live = repo.mutable_state.lock().is_live(&identity_foo);
            if !foo_is_live {
                break;
            }
        }

        assert!(
            !repo.mutable_state.lock().is_live(&identity_foo),
            "foo container must be inactive after rollout"
        );

        let after = fuchsia_inspect::reader::read(&inspector).await.unwrap();
        assert!(
            after.get_child_by_path(&["log_sources", "foo"]).is_none(),
            "foo stats must be cleaned up after rollout"
        );
    }
}