1use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::LogsArtifactsContainer;
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::shared_buffer::{FilterCursorStream, FxtMessage, SharedBuffer};
11use crate::logs::stats::LogStreamStats;
12use anyhow::format_err;
13use diagnostics_data::{LogsData, Severity};
14use fidl_fuchsia_diagnostics::{
15 ComponentSelector, LogInterestSelector, StreamMode, StringSelector,
16};
17use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
18use flyweights::FlyStr;
19use fuchsia_inspect_derive::WithInspect;
20use fuchsia_sync::Mutex;
21use fuchsia_url::AbsoluteComponentUrl;
22use fuchsia_url::boot_url::BootUrl;
23use futures::prelude::*;
24use log::{LevelFilter, debug, error};
25use moniker::{ExtendedMoniker, Moniker};
26use selectors::SelectorExt;
27use std::collections::{BTreeMap, HashMap};
28use std::str::FromStr;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::{Arc, OnceLock, Weak};
31use {fuchsia_async as fasync, fuchsia_inspect as inspect};
32
33#[derive(Ord, PartialOrd, Eq, PartialEq)]
35pub struct ComponentInitialInterest {
36 component: UrlOrMoniker,
38 log_severity: Severity,
40}
41impl FromStr for ComponentInitialInterest {
44 type Err = anyhow::Error;
45 fn from_str(s: &str) -> Result<Self, Self::Err> {
46 let mut split = s.rsplitn(2, ":");
47 match (split.next(), split.next()) {
48 (Some(severity), Some(url_or_moniker)) => {
49 let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
50 return Err(format_err!("invalid url or moniker"));
51 };
52 let Ok(severity) = Severity::from_str(severity) else {
53 return Err(format_err!("invalid severity"));
54 };
55 Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
56 }
57 _ => Err(format_err!("invalid interest")),
58 }
59 }
60}
61
/// How an initial interest identifies the component(s) it applies to.
///
/// The variant is chosen by the `FromStr` implementation. Variant order matters because `Ord`
/// is derived and values of this type are used as `BTreeMap` keys.
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum UrlOrMoniker {
    /// A string that parsed as an absolute component URL or as a boot URL.
    Url(FlyStr),
    /// A moniker; `FromStr` produces this for inputs starting with `/` that parse as a moniker.
    Moniker(ExtendedMoniker),
    /// Any other string. It is matched as a substring of a component's URL or moniker.
    Partial(FlyStr),
}
71
72impl FromStr for UrlOrMoniker {
73 type Err = ();
74 fn from_str(s: &str) -> Result<Self, Self::Err> {
75 if AbsoluteComponentUrl::from_str(s).is_ok() || BootUrl::parse(s).is_ok() {
76 Ok(UrlOrMoniker::Url(s.into()))
77 } else if s.starts_with("/") {
78 if let Ok(moniker) = Moniker::from_str(s) {
79 Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
80 } else {
81 Err(())
82 }
83 } else {
84 Ok(UrlOrMoniker::Partial(s.into()))
85 }
86 }
87}
88
/// Connection id reserved outside the range handed out by `new_interest_connection`.
pub const STATIC_CONNECTION_ID: usize = 0;
// Next id returned by `LogsRepository::new_interest_connection`. Starts one past
// `STATIC_CONNECTION_ID`, so dynamically allocated ids do not collide with it.
static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);

/// The Archivist's own moniker, if it has been set. It is only read in this file; when unset,
/// no Archivist container is pre-created and the Archivist's own log level is never adjusted.
pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();

/// The IOB tag that the Archivist's own container buffer is asserted to have in
/// `LogsRepository::new`.
pub const ARCHIVIST_TAG: u64 = 0;
101
/// Owns the per-component log containers and the buffer they share, and hands out cursors over
/// the stored logs.
pub struct LogsRepository {
    /// Everything that changes after construction, guarded by a single lock.
    mutable_state: Mutex<LogsRepositoryState>,
    /// Buffer shared by all containers; cursors returned by `logs_cursor*` read from it.
    shared_buffer: Arc<SharedBuffer>,
    /// Handle to the scope passed to `new`. It is cloned into every `handle_log_sink` call and
    /// closed by `stop_accepting_new_log_sinks`.
    scope_handle: fasync::ScopeHandle,
}
109
110impl LogsRepository {
111 pub fn new(
112 ring_buffer: ring_buffer::Reader,
113 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
114 parent: &fuchsia_inspect::Node,
115 scope: fasync::Scope,
116 ) -> Arc<Self> {
117 let scope_handle = scope.to_handle();
118 Arc::new_cyclic(|me: &Weak<LogsRepository>| {
119 let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
120 let me_clone = Weak::clone(me);
121 let shared_buffer = SharedBuffer::new(
122 ring_buffer,
123 Box::new(move |identity| {
124 if let Some(this) = me_clone.upgrade() {
125 this.on_container_inactive(&identity);
126 }
127 }),
128 Default::default(),
129 );
130 if let Some(m) = ARCHIVIST_MONIKER.get() {
131 let archivist_container = mutable_state.create_log_container(
132 Arc::new(ComponentIdentity::new(
133 ExtendedMoniker::ComponentInstance(m.clone()),
134 "fuchsia-pkg://UNKNOWN",
135 )),
136 &shared_buffer,
137 Weak::clone(me),
138 );
139 assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
141 }
142 LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
143 })
144 }
145
146 pub async fn flush(&self) {
147 self.shared_buffer.flush().await;
148 }
149
150 pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
153 where
154 K: DebugLog + Send + Sync + 'static,
155 {
156 let mut mutable_state = self.mutable_state.lock();
157
158 if mutable_state.draining_klog {
161 return;
162 }
163 mutable_state.draining_klog = true;
164
165 let container =
166 mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
167 let Some(ref scope) = mutable_state.scope else {
168 return;
169 };
170 scope.spawn(async move {
171 debug!("Draining debuglog.");
172 let mut kernel_logger = DebugLogBridge::create(klog_reader);
173 let mut messages = match kernel_logger.existing_logs() {
174 Ok(messages) => messages,
175 Err(e) => {
176 error!(e:%; "failed to read from kernel log, important logs may be missing");
177 return;
178 }
179 };
180 messages.sort_by_key(|m| m.timestamp());
181 for message in messages {
182 container.ingest_message(message);
183 }
184
185 let res = kernel_logger
186 .listen()
187 .try_for_each(|message| async {
188 container.ingest_message(message);
189 Ok(())
190 })
191 .await;
192 if let Err(e) = res {
193 error!(e:%; "failed to drain kernel log, important logs may be missing");
194 }
195 });
196 }
197
198 pub fn logs_cursor_raw(
199 &self,
200 mode: StreamMode,
201 selectors: Vec<ComponentSelector>,
202 ) -> FilterCursorStream<FxtMessage> {
203 self.shared_buffer.cursor(mode, selectors).into()
204 }
205
206 pub fn logs_cursor(
209 &self,
210 mode: StreamMode,
211 selectors: Vec<ComponentSelector>,
212 ) -> FilterCursorStream<LogsData> {
213 self.shared_buffer.cursor(mode, selectors).into()
214 }
215
216 #[cfg(test)]
221 pub fn get_log_container(
222 self: &Arc<Self>,
223 identity: Arc<ComponentIdentity>,
224 ) -> Arc<LogsArtifactsContainer> {
225 self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
226 }
227
228 pub async fn wait_for_termination(&self) {
231 let Some(scope) = self.mutable_state.lock().scope.take() else {
232 error!("Attempted to terminate twice");
233 return;
234 };
235 scope.join().await;
236 debug!("Log ingestion stopped.");
238 self.shared_buffer.terminate().await;
241 for container in self.mutable_state.lock().logs_data_store.values() {
242 container.terminate();
243 }
244 }
245
246 pub fn stop_accepting_new_log_sinks(&self) {
249 self.scope_handle.close();
250 }
251
252 pub fn new_interest_connection(&self) -> usize {
255 INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
256 }
257
258 pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
260 self.mutable_state.lock().update_logs_interest(connection_id, selectors);
261 }
262
263 pub fn finish_interest_connection(&self, connection_id: usize) {
265 self.mutable_state.lock().finish_interest_connection(connection_id);
266 }
267
268 fn on_container_inactive(&self, identity: &ComponentIdentity) {
269 let mut repo = self.mutable_state.lock();
270 if !repo.is_live(identity) {
271 repo.remove(identity);
272 }
273 }
274}
275
#[cfg(test)]
impl LogsRepository {
    /// Builds a repository for tests: a ring buffer of the legacy default size, no initial
    /// interests and a default inspect node.
    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
        use crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES;
        use crate::logs::shared_buffer::create_ring_buffer;

        let ring_buffer = create_ring_buffer(LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize);
        let inspect_parent = fuchsia_inspect::Node::default();
        LogsRepository::new(ring_buffer, std::iter::empty(), &inspect_parent, scope)
    }
}
289
290impl EventConsumer for LogsRepository {
291 fn handle(self: Arc<Self>, event: Event) {
292 match event.payload {
293 EventPayload::LogSinkRequested(LogSinkRequestedPayload {
294 component,
295 request_stream,
296 }) => {
297 debug!(identity:% = component; "LogSink requested.");
298 let mut mutable_state = self.mutable_state.lock();
303 let container =
304 mutable_state.get_log_container(component, &self.shared_buffer, &self);
305 container.handle_log_sink(request_stream, self.scope_handle.clone());
306 }
307 _ => unreachable!("Archivist state just subscribes to log sink requested"),
308 }
309 }
310}
311
/// The mutable part of `LogsRepository`, kept behind the repository's mutex.
pub struct LogsRepositoryState {
    /// Log container of every known component, keyed by component identity.
    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
    /// The `log_sources` inspect node; per-component log stats are attached under it.
    inspect_node: inspect::Node,

    /// Interest selectors currently registered, keyed by connection id.
    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,

    /// Set once `drain_debuglog` has started draining the kernel log.
    draining_klog: bool,

    /// The scope passed at construction. `wait_for_termination` takes it, leaving `None`.
    scope: Option<fasync::Scope>,

    /// Severities to apply to matching components when their containers are created.
    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
}
329
330impl LogsRepositoryState {
331 fn new(
332 parent: &fuchsia_inspect::Node,
333 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
334 scope: fasync::Scope,
335 ) -> Self {
336 Self {
337 inspect_node: parent.create_child("log_sources"),
338 logs_data_store: HashMap::new(),
339 interest_registrations: BTreeMap::new(),
340 draining_klog: false,
341 initial_interests: initial_interests
342 .map(|ComponentInitialInterest { component, log_severity }| {
343 (component, log_severity)
344 })
345 .collect(),
346 scope: Some(scope),
347 }
348 }
349
350 pub fn get_log_container(
353 &mut self,
354 identity: Arc<ComponentIdentity>,
355 shared_buffer: &Arc<SharedBuffer>,
356 repo: &Arc<LogsRepository>,
357 ) -> Arc<LogsArtifactsContainer> {
358 match self.logs_data_store.get(&identity) {
359 None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
360 Some(existing) => Arc::clone(existing),
361 }
362 }
363
364 fn create_log_container(
365 &mut self,
366 identity: Arc<ComponentIdentity>,
367 shared_buffer: &Arc<SharedBuffer>,
368 repo: Weak<LogsRepository>,
369 ) -> Arc<LogsArtifactsContainer> {
370 let initial_interest = self.get_initial_interest(identity.as_ref());
371 let stats = LogStreamStats::default()
372 .with_inspect(&self.inspect_node, identity.moniker.as_ref())
373 .expect("failed to attach component log stats");
374 stats.set_url(&identity.url);
375 let stats = Arc::new(stats);
376 let container = Arc::new(LogsArtifactsContainer::new(
377 Arc::clone(&identity),
378 self.interest_registrations.values().flat_map(|s| s.iter()),
379 initial_interest,
380 Arc::clone(&stats),
381 shared_buffer.new_container_buffer(Arc::clone(&identity), stats),
382 Some(Box::new(move |c| {
383 if let Some(repo) = repo.upgrade() {
384 repo.on_container_inactive(&c.identity)
385 }
386 })),
387 ));
388 self.logs_data_store.insert(identity, Arc::clone(&container));
389 container
390 }
391
392 fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
393 let exact_url_severity =
394 self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
395 let exact_moniker_severity =
396 self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();
397
398 let partial_severity = self
399 .initial_interests
400 .iter()
401 .filter_map(|(uom, severity)| match uom {
402 UrlOrMoniker::Partial(p) => {
403 if identity.url.contains(p.as_str())
404 || identity.moniker.to_string().contains(p.as_str())
405 {
406 Some(*severity)
407 } else {
408 None
409 }
410 }
411 _ => None,
412 })
413 .min();
414
415 [exact_url_severity, exact_moniker_severity, partial_severity]
416 .into_iter()
417 .flatten()
418 .min()
419 .map(FidlSeverity::from)
420 }
421
422 fn is_live(&self, identity: &ComponentIdentity) -> bool {
423 match self.logs_data_store.get(identity) {
424 Some(container) => container.is_active(),
425 None => false,
426 }
427 }
428
429 fn maybe_update_own_logs_interest(
432 &mut self,
433 selectors: &[LogInterestSelector],
434 clear_interest: bool,
435 ) {
436 let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
437 let lowest_selector = selectors
438 .iter()
439 .filter(|selector| {
440 if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
451 matches!(
452 &s[..],
453 [StringSelector::StringPattern(s)] if s == "**" || s == "*"
454 )
455 }) {
456 return false;
457 }
458
459 moniker.matches_component_selector(&selector.selector).unwrap_or(false)
460 })
461 .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
462 if let Some(selector) = lowest_selector {
463 if clear_interest {
464 log::set_max_level(LevelFilter::Info);
465 } else {
466 log::set_max_level(
467 match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
468 FidlSeverity::Trace => LevelFilter::Trace,
469 FidlSeverity::Debug => LevelFilter::Debug,
470 FidlSeverity::Info => LevelFilter::Info,
471 FidlSeverity::Warn => LevelFilter::Warn,
472 FidlSeverity::Error => LevelFilter::Error,
473 FidlSeverity::Fatal => LevelFilter::Error,
476 FidlSeverity::__SourceBreaking { .. } => return,
477 },
478 );
479 }
480 }
481 }
482
483 fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
484 self.maybe_update_own_logs_interest(&selectors, false);
485 let previous_selectors =
486 self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
487 let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
489 for logs_data in self.logs_data_store.values() {
490 logs_data.update_interest(new_selectors.iter(), &previous_selectors);
491 }
492 }
493
494 pub fn finish_interest_connection(&mut self, connection_id: usize) {
495 let selectors = self.interest_registrations.remove(&connection_id);
496 if let Some(selectors) = selectors {
497 self.maybe_update_own_logs_interest(&selectors, true);
498 for logs_data in self.logs_data_store.values() {
499 logs_data.reset_interest(&selectors);
500 }
501 }
502 }
503
504 pub fn remove(&mut self, identity: &ComponentIdentity) {
505 self.logs_data_store.remove(identity);
506 }
507}
508
509#[cfg(test)]
510mod tests {
511 use super::*;
512 use crate::logs::shared_buffer::create_ring_buffer;
513 use crate::logs::testing::make_message;
514 use fidl_fuchsia_logger::LogSinkMarker;
515
516 use moniker::ExtendedMoniker;
517 use selectors::FastError;
518
519 #[fuchsia::test]
520 async fn data_repo_filters_logs_by_selectors() {
521 let repo = LogsRepository::for_test(fasync::Scope::new());
522 let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
523 ExtendedMoniker::parse_str("./foo").unwrap(),
524 "fuchsia-pkg://foo",
525 )));
526 let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
527 ExtendedMoniker::parse_str("./bar").unwrap(),
528 "fuchsia-pkg://bar",
529 )));
530
531 foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
532 bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
533 foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));
534
535 let stream = repo.logs_cursor(StreamMode::Snapshot, Vec::new());
536
537 let results =
538 stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
539 assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
540
541 let filtered_stream = repo.logs_cursor(
542 StreamMode::Snapshot,
543 vec![selectors::parse_component_selector::<FastError>("foo").unwrap()],
544 );
545
546 let results =
547 filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
548 assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
549 }
550
551 #[fuchsia::test]
552 async fn data_repo_correctly_sets_initial_interests() {
553 let repo = LogsRepository::new(
554 create_ring_buffer(100000),
555 [
556 ComponentInitialInterest {
557 component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
558 log_severity: Severity::Info,
559 },
560 ComponentInitialInterest {
561 component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
562 log_severity: Severity::Warn,
563 },
564 ComponentInitialInterest {
565 component: UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()),
566 log_severity: Severity::Error,
567 },
568 ComponentInitialInterest {
569 component: UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()),
570 log_severity: Severity::Debug,
571 },
572 ]
573 .into_iter(),
574 &fuchsia_inspect::Node::default(),
575 fasync::Scope::new(),
576 );
577
578 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
580 ExtendedMoniker::parse_str("core/foo").unwrap(),
581 "fuchsia-pkg://foo",
582 )));
583 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
584 .await;
585
586 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
588 ExtendedMoniker::parse_str("core/baz").unwrap(),
589 "fuchsia-pkg://baz",
590 )));
591 expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
592 .await;
593
594 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
597 ExtendedMoniker::parse_str("core/bar").unwrap(),
598 "fuchsia-pkg://bar",
599 )));
600 expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
601 .await;
602
603 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
606 ExtendedMoniker::parse_str("core/quux").unwrap(),
607 "fuchsia-pkg://quux",
608 )));
609 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
610 }
611
612 #[fuchsia::test]
613 async fn data_repo_correctly_handles_partial_matching() {
614 let repo = LogsRepository::new(
615 create_ring_buffer(100000),
616 [
617 "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO".parse(),
618 "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN".parse(),
619 "/core/bust:DEBUG".parse(),
620 "core/bar:ERROR".parse(),
621 "foo:DEBUG".parse(),
622 "both:TRACE".parse(),
623 ]
624 .into_iter()
625 .map(Result::unwrap),
626 &fuchsia_inspect::Node::default(),
627 fasync::Scope::new(),
628 );
629
630 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
632 ExtendedMoniker::parse_str("core/foo").unwrap(),
633 "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
634 )));
635 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
636 .await;
637
638 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
640 ExtendedMoniker::parse_str("core/not-foo").unwrap(),
641 "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm",
642 )));
643 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
644 .await;
645
646 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
648 ExtendedMoniker::parse_str("core/baz").unwrap(),
649 "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm",
650 )));
651 expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
652 .await;
653
654 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
657 ExtendedMoniker::parse_str("core/bar").unwrap(),
658 "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm",
659 )));
660 expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
661 .await;
662
663 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
666 ExtendedMoniker::parse_str("core/quux").unwrap(),
667 "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm",
668 )));
669 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
670
671 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
673 ExtendedMoniker::parse_str("core/both").unwrap(),
674 "fuchsia-pkg://fuchsia.com/both#meta/both.cm",
675 )));
676 expect_initial_interest(Some(FidlSeverity::Trace), container, repo.scope_handle.clone())
677 .await;
678
679 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
681 ExtendedMoniker::parse_str("core/bust/testing").unwrap(),
682 "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm",
683 )));
684 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
685 }
686
687 async fn expect_initial_interest(
688 expected_severity: Option<FidlSeverity>,
689 container: Arc<LogsArtifactsContainer>,
690 scope: fasync::ScopeHandle,
691 ) {
692 let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
693 container.handle_log_sink(stream, scope);
694 let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
695 assert_eq!(initial_interest.min_severity, expected_severity);
696 }
697}