1use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::LogsArtifactsContainer;
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::shared_buffer::{FilterCursorStream, FxtMessage, SharedBuffer};
11use crate::logs::stats::LogStreamStats;
12use anyhow::format_err;
13use diagnostics_data::{LogsData, Severity};
14use fidl_fuchsia_diagnostics::{
15 ComponentSelector, LogInterestSelector, StreamMode, StringSelector,
16};
17use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
18use flyweights::FlyStr;
19use fuchsia_async as fasync;
20use fuchsia_inspect as inspect;
21use fuchsia_inspect_derive::WithInspect;
22use fuchsia_sync::Mutex;
23use futures::prelude::*;
24use log::{LevelFilter, debug, error};
25use moniker::{ExtendedMoniker, Moniker};
26use selectors::SelectorExt;
27use std::collections::{BTreeMap, HashMap};
28use std::str::FromStr;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::{Arc, OnceLock, Weak};
31
32#[derive(Ord, PartialOrd, Eq, PartialEq)]
34pub struct ComponentInitialInterest {
35 component: UrlOrMoniker,
37 log_severity: Severity,
39}
40impl FromStr for ComponentInitialInterest {
43 type Err = anyhow::Error;
44 fn from_str(s: &str) -> Result<Self, Self::Err> {
45 let mut split = s.rsplitn(2, ":");
46 match (split.next(), split.next()) {
47 (Some(severity), Some(url_or_moniker)) => {
48 let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
49 return Err(format_err!("invalid url or moniker"));
50 };
51 let Ok(severity) = Severity::from_str(severity) else {
52 return Err(format_err!("invalid severity"));
53 };
54 Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
55 }
56 _ => Err(format_err!("invalid interest")),
57 }
58 }
59}
60
61#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
62pub enum UrlOrMoniker {
63 Url(FlyStr),
65 Moniker(ExtendedMoniker),
67 Partial(FlyStr),
69}
70
71impl FromStr for UrlOrMoniker {
72 type Err = ();
73 fn from_str(s: &str) -> Result<Self, Self::Err> {
74 if fuchsia_url::fuchsia_pkg::AbsoluteComponentUrl::from_str(s).is_ok()
75 || fuchsia_url::boot::AbsoluteComponentUrl::parse(s).is_ok()
76 {
77 Ok(UrlOrMoniker::Url(s.into()))
78 } else if s.starts_with("/") {
79 if let Ok(moniker) = Moniker::from_str(s) {
80 Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
81 } else {
82 Err(())
83 }
84 } else {
85 Ok(UrlOrMoniker::Partial(s.into()))
86 }
87 }
88}
89
90pub const STATIC_CONNECTION_ID: usize = 0;
92static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);
93
94pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();
96
97pub const ARCHIVIST_TAG: u64 = 0;
102
103pub struct LogsRepository {
106 mutable_state: Mutex<LogsRepositoryState>,
107 shared_buffer: Arc<SharedBuffer>,
108 scope_handle: fasync::ScopeHandle,
109}
110
111impl LogsRepository {
112 pub fn new(
113 ring_buffer: ring_buffer::Reader,
114 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
115 parent: &fuchsia_inspect::Node,
116 scope: fasync::Scope,
117 ) -> Arc<Self> {
118 let scope_handle = scope.to_handle();
119 Arc::new_cyclic(|me: &Weak<LogsRepository>| {
120 let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
121 let me_clone = Weak::clone(me);
122 let shared_buffer = SharedBuffer::new(
123 ring_buffer,
124 Box::new(move |identity| {
125 if let Some(this) = me_clone.upgrade() {
126 this.on_container_inactive(&identity);
127 }
128 }),
129 Default::default(),
130 );
131 if let Some(m) = ARCHIVIST_MONIKER.get() {
132 let archivist_container = mutable_state.create_log_container(
133 Arc::new(ComponentIdentity::new(
134 ExtendedMoniker::ComponentInstance(m.clone()),
135 "fuchsia-pkg://UNKNOWN",
136 )),
137 &shared_buffer,
138 Weak::clone(me),
139 );
140 assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
142 }
143 LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
144 })
145 }
146
147 pub async fn flush(&self) {
148 self.shared_buffer.flush().await;
149 }
150
151 pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
154 where
155 K: DebugLog + Send + Sync + 'static,
156 {
157 let mut mutable_state = self.mutable_state.lock();
158
159 if mutable_state.draining_klog {
162 return;
163 }
164 mutable_state.draining_klog = true;
165
166 let container =
167 mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
168 let Some(ref scope) = mutable_state.scope else {
169 return;
170 };
171 scope.spawn(async move {
172 debug!("Draining debuglog.");
173 let mut kernel_logger = DebugLogBridge::create(klog_reader);
174 let mut messages = match kernel_logger.existing_logs() {
175 Ok(messages) => messages,
176 Err(e) => {
177 error!(e:%; "failed to read from kernel log, important logs may be missing");
178 return;
179 }
180 };
181 messages.sort_by_key(|m| m.timestamp());
182 for message in messages {
183 container.ingest_message(message);
184 }
185
186 let res = kernel_logger
187 .listen()
188 .try_for_each(|message| async {
189 container.ingest_message(message);
190 Ok(())
191 })
192 .await;
193 if let Err(e) = res {
194 error!(e:%; "failed to drain kernel log, important logs may be missing");
195 }
196 });
197 }
198
199 pub fn logs_cursor_raw(
200 &self,
201 mode: StreamMode,
202 selectors: Vec<ComponentSelector>,
203 ) -> FilterCursorStream<FxtMessage> {
204 self.shared_buffer.cursor(mode, selectors).into()
205 }
206
207 pub fn logs_cursor(
210 &self,
211 mode: StreamMode,
212 selectors: Vec<ComponentSelector>,
213 ) -> FilterCursorStream<LogsData> {
214 self.shared_buffer.cursor(mode, selectors).into()
215 }
216
217 #[cfg(test)]
222 pub fn get_log_container(
223 self: &Arc<Self>,
224 identity: Arc<ComponentIdentity>,
225 ) -> Arc<LogsArtifactsContainer> {
226 self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
227 }
228
229 pub async fn wait_for_termination(&self) {
232 let Some(scope) = self.mutable_state.lock().scope.take() else {
233 error!("Attempted to terminate twice");
234 return;
235 };
236 scope.join().await;
237 debug!("Log ingestion stopped.");
239 self.shared_buffer.terminate().await;
242 for container in self.mutable_state.lock().logs_data_store.values() {
243 container.terminate();
244 }
245 }
246
247 pub fn stop_accepting_new_log_sinks(&self) {
250 self.scope_handle.close();
251 }
252
253 pub fn new_interest_connection(&self) -> usize {
256 INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
257 }
258
259 pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
261 self.mutable_state.lock().update_logs_interest(connection_id, selectors);
262 }
263
264 pub fn finish_interest_connection(&self, connection_id: usize) {
266 self.mutable_state.lock().finish_interest_connection(connection_id);
267 }
268
269 fn on_container_inactive(&self, identity: &ComponentIdentity) {
270 let mut repo = self.mutable_state.lock();
271 if !repo.is_live(identity) {
272 repo.remove(identity);
273 }
274 }
275}
276
277#[cfg(test)]
278impl LogsRepository {
279 pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
280 use crate::logs::shared_buffer::create_ring_buffer;
281
282 LogsRepository::new(
283 create_ring_buffer(crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize),
284 std::iter::empty(),
285 &Default::default(),
286 scope,
287 )
288 }
289}
290
291impl EventConsumer for LogsRepository {
292 fn handle(self: Arc<Self>, event: Event) {
293 match event.payload {
294 EventPayload::LogSinkRequested(LogSinkRequestedPayload {
295 component,
296 request_stream,
297 }) => {
298 debug!(identity:% = component; "LogSink requested.");
299 let mut mutable_state = self.mutable_state.lock();
304 let container =
305 mutable_state.get_log_container(component, &self.shared_buffer, &self);
306 container.handle_log_sink(request_stream, self.scope_handle.clone());
307 }
308 _ => unreachable!("Archivist state just subscribes to log sink requested"),
309 }
310 }
311}
312
313pub struct LogsRepositoryState {
314 logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
315 inspect_node: inspect::Node,
316
317 interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
320
321 draining_klog: bool,
323
324 scope: Option<fasync::Scope>,
326
327 initial_interests: BTreeMap<UrlOrMoniker, Severity>,
329}
330
331impl LogsRepositoryState {
332 fn new(
333 parent: &fuchsia_inspect::Node,
334 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
335 scope: fasync::Scope,
336 ) -> Self {
337 Self {
338 inspect_node: parent.create_child("log_sources"),
339 logs_data_store: HashMap::new(),
340 interest_registrations: BTreeMap::new(),
341 draining_klog: false,
342 initial_interests: initial_interests
343 .map(|ComponentInitialInterest { component, log_severity }| {
344 (component, log_severity)
345 })
346 .collect(),
347 scope: Some(scope),
348 }
349 }
350
351 pub fn get_log_container(
354 &mut self,
355 identity: Arc<ComponentIdentity>,
356 shared_buffer: &Arc<SharedBuffer>,
357 repo: &Arc<LogsRepository>,
358 ) -> Arc<LogsArtifactsContainer> {
359 match self.logs_data_store.get(&identity) {
360 None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
361 Some(existing) => Arc::clone(existing),
362 }
363 }
364
365 fn create_log_container(
366 &mut self,
367 identity: Arc<ComponentIdentity>,
368 shared_buffer: &Arc<SharedBuffer>,
369 repo: Weak<LogsRepository>,
370 ) -> Arc<LogsArtifactsContainer> {
371 let initial_interest = self.get_initial_interest(identity.as_ref());
372 let stats = LogStreamStats::default()
373 .with_inspect(&self.inspect_node, identity.moniker.as_ref())
374 .expect("failed to attach component log stats");
375 stats.set_url(&identity.url);
376 let stats = Arc::new(stats);
377 let buffer = shared_buffer.new_container_buffer(Arc::clone(&identity), Arc::clone(&stats));
378 let container = Arc::new(LogsArtifactsContainer::new(
379 Arc::clone(&identity),
380 self.interest_registrations.values().flat_map(|s| s.iter()),
381 initial_interest,
382 stats,
383 buffer,
384 Some(Box::new(move |c| {
385 if let Some(repo) = repo.upgrade() {
386 repo.on_container_inactive(&c.identity)
387 }
388 })),
389 ));
390 self.logs_data_store.insert(Arc::clone(&identity), Arc::clone(&container));
391 container
392 }
393
394 fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
395 let exact_url_severity =
396 self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
397 let exact_moniker_severity =
398 self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();
399
400 let partial_severity = self
401 .initial_interests
402 .iter()
403 .filter_map(|(uom, severity)| match uom {
404 UrlOrMoniker::Partial(p) => {
405 if identity.url.contains(p.as_str())
406 || identity.moniker.to_string().contains(p.as_str())
407 {
408 Some(*severity)
409 } else {
410 None
411 }
412 }
413 _ => None,
414 })
415 .min();
416
417 [exact_url_severity, exact_moniker_severity, partial_severity]
418 .into_iter()
419 .flatten()
420 .min()
421 .map(FidlSeverity::from)
422 }
423
424 fn is_live(&self, identity: &ComponentIdentity) -> bool {
425 match self.logs_data_store.get(identity) {
426 Some(container) => container.is_active(),
427 None => false,
428 }
429 }
430
431 fn maybe_update_own_logs_interest(
434 &mut self,
435 selectors: &[LogInterestSelector],
436 clear_interest: bool,
437 ) {
438 let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
439 let lowest_selector = selectors
440 .iter()
441 .filter(|selector| {
442 if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
453 matches!(
454 &s[..],
455 [StringSelector::StringPattern(s)] if s == "**" || s == "*"
456 )
457 }) {
458 return false;
459 }
460
461 moniker.matches_component_selector(&selector.selector).unwrap_or(false)
462 })
463 .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
464 if let Some(selector) = lowest_selector {
465 if clear_interest {
466 log::set_max_level(LevelFilter::Info);
467 } else {
468 log::set_max_level(
469 match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
470 FidlSeverity::Trace => LevelFilter::Trace,
471 FidlSeverity::Debug => LevelFilter::Debug,
472 FidlSeverity::Info => LevelFilter::Info,
473 FidlSeverity::Warn => LevelFilter::Warn,
474 FidlSeverity::Error => LevelFilter::Error,
475 FidlSeverity::Fatal => LevelFilter::Error,
478 FidlSeverity::__SourceBreaking { .. } => return,
479 },
480 );
481 }
482 }
483 }
484
485 fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
486 self.maybe_update_own_logs_interest(&selectors, false);
487 let previous_selectors =
488 self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
489 let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
491 for logs_data in self.logs_data_store.values() {
492 logs_data.update_interest(new_selectors.iter(), &previous_selectors);
493 }
494 }
495
496 pub fn finish_interest_connection(&mut self, connection_id: usize) {
497 let selectors = self.interest_registrations.remove(&connection_id);
498 if let Some(selectors) = selectors {
499 self.maybe_update_own_logs_interest(&selectors, true);
500 for logs_data in self.logs_data_store.values() {
501 logs_data.reset_interest(&selectors);
502 }
503 }
504 }
505
506 pub fn remove(&mut self, identity: &ComponentIdentity) {
507 self.logs_data_store.remove(identity);
508 }
509}
510
511#[cfg(test)]
512mod tests {
513 use super::*;
514 use crate::logs::shared_buffer::create_ring_buffer;
515 use crate::logs::testing::make_message;
516 use fidl_fuchsia_logger::LogSinkMarker;
517
518 use moniker::ExtendedMoniker;
519 use selectors::FastError;
520
521 #[fuchsia::test]
522 async fn data_repo_filters_logs_by_selectors() {
523 let repo = LogsRepository::for_test(fasync::Scope::new());
524 let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
525 ExtendedMoniker::parse_str("./foo").unwrap(),
526 "fuchsia-pkg://foo",
527 )));
528 let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
529 ExtendedMoniker::parse_str("./bar").unwrap(),
530 "fuchsia-pkg://bar",
531 )));
532
533 foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
534 bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
535 foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));
536
537 let stream = repo.logs_cursor(StreamMode::Snapshot, Vec::new());
538
539 let results =
540 stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
541 assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
542
543 let filtered_stream = repo.logs_cursor(
544 StreamMode::Snapshot,
545 vec![selectors::parse_component_selector::<FastError>("foo").unwrap()],
546 );
547
548 let results =
549 filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
550 assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
551 }
552
553 #[fuchsia::test]
554 async fn data_repo_correctly_sets_initial_interests() {
555 let repo = LogsRepository::new(
556 create_ring_buffer(100000),
557 [
558 ComponentInitialInterest {
559 component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
560 log_severity: Severity::Info,
561 },
562 ComponentInitialInterest {
563 component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
564 log_severity: Severity::Warn,
565 },
566 ComponentInitialInterest {
567 component: UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()),
568 log_severity: Severity::Error,
569 },
570 ComponentInitialInterest {
571 component: UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()),
572 log_severity: Severity::Debug,
573 },
574 ]
575 .into_iter(),
576 &fuchsia_inspect::Node::default(),
577 fasync::Scope::new(),
578 );
579
580 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
582 ExtendedMoniker::parse_str("core/foo").unwrap(),
583 "fuchsia-pkg://foo",
584 )));
585 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
586 .await;
587
588 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
590 ExtendedMoniker::parse_str("core/baz").unwrap(),
591 "fuchsia-pkg://baz",
592 )));
593 expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
594 .await;
595
596 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
599 ExtendedMoniker::parse_str("core/bar").unwrap(),
600 "fuchsia-pkg://bar",
601 )));
602 expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
603 .await;
604
605 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
608 ExtendedMoniker::parse_str("core/quux").unwrap(),
609 "fuchsia-pkg://quux",
610 )));
611 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
612 }
613
614 #[fuchsia::test]
615 async fn data_repo_correctly_handles_partial_matching() {
616 let repo = LogsRepository::new(
617 create_ring_buffer(100000),
618 [
619 "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO".parse(),
620 "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN".parse(),
621 "/core/bust:DEBUG".parse(),
622 "core/bar:ERROR".parse(),
623 "foo:DEBUG".parse(),
624 "both:TRACE".parse(),
625 ]
626 .into_iter()
627 .map(Result::unwrap),
628 &fuchsia_inspect::Node::default(),
629 fasync::Scope::new(),
630 );
631
632 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
634 ExtendedMoniker::parse_str("core/foo").unwrap(),
635 "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
636 )));
637 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
638 .await;
639
640 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
642 ExtendedMoniker::parse_str("core/not-foo").unwrap(),
643 "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm",
644 )));
645 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
646 .await;
647
648 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
650 ExtendedMoniker::parse_str("core/baz").unwrap(),
651 "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm",
652 )));
653 expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
654 .await;
655
656 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
659 ExtendedMoniker::parse_str("core/bar").unwrap(),
660 "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm",
661 )));
662 expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
663 .await;
664
665 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
668 ExtendedMoniker::parse_str("core/quux").unwrap(),
669 "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm",
670 )));
671 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
672
673 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
675 ExtendedMoniker::parse_str("core/both").unwrap(),
676 "fuchsia-pkg://fuchsia.com/both#meta/both.cm",
677 )));
678 expect_initial_interest(Some(FidlSeverity::Trace), container, repo.scope_handle.clone())
679 .await;
680
681 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
683 ExtendedMoniker::parse_str("core/bust/testing").unwrap(),
684 "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm",
685 )));
686 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
687 }
688
689 async fn expect_initial_interest(
690 expected_severity: Option<FidlSeverity>,
691 container: Arc<LogsArtifactsContainer>,
692 scope: fasync::ScopeHandle,
693 ) {
694 let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
695 container.handle_log_sink(stream, scope);
696 let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
697 assert_eq!(initial_interest.min_severity, expected_severity);
698 }
699}