1use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::LogsArtifactsContainer;
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::shared_buffer::{FilterCursorStream, FxtMessage, SharedBuffer};
11use crate::logs::stats::{GlobalAnalytics, LogStreamStats};
12use anyhow::format_err;
13use diagnostics_data::{LogsData, Severity};
14use fidl_fuchsia_diagnostics::{
15 ComponentSelector, LogInterestSelector, StreamMode, StringSelector,
16};
17use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
18use flyweights::FlyStr;
19use fuchsia_async as fasync;
20use fuchsia_inspect as inspect;
21use fuchsia_inspect_derive::WithInspect;
22use fuchsia_sync::Mutex;
23use futures::prelude::*;
24use log::{LevelFilter, debug, error};
25use moniker::{ExtendedMoniker, Moniker};
26use selectors::SelectorExt;
27use std::collections::{BTreeMap, HashMap};
28use std::str::FromStr;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::{Arc, OnceLock, Weak};
31
32#[derive(Ord, PartialOrd, Eq, PartialEq)]
34pub struct ComponentInitialInterest {
35 component: UrlOrMoniker,
37 log_severity: Severity,
39}
40impl FromStr for ComponentInitialInterest {
43 type Err = anyhow::Error;
44 fn from_str(s: &str) -> Result<Self, Self::Err> {
45 let mut split = s.rsplitn(2, ":");
46 match (split.next(), split.next()) {
47 (Some(severity), Some(url_or_moniker)) => {
48 let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
49 return Err(format_err!("invalid url or moniker"));
50 };
51 let Ok(severity) = Severity::from_str(severity) else {
52 return Err(format_err!("invalid severity"));
53 };
54 Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
55 }
56 _ => Err(format_err!("invalid interest")),
57 }
58 }
59}
60
/// Identifies the component(s) an initial interest applies to.
///
/// Variant order is significant: the derived `Ord` is what orders these values
/// when they are used as `BTreeMap` keys.
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum UrlOrMoniker {
    /// A string that parsed as an absolute `fuchsia-pkg` or boot component URL.
    Url(FlyStr),
    /// An exact component moniker (parsed from a string starting with `/`).
    Moniker(ExtendedMoniker),
    /// Any other string; matched as a substring of a component's URL or
    /// moniker when initial interests are looked up.
    Partial(FlyStr),
}
70
71impl FromStr for UrlOrMoniker {
72 type Err = ();
73 fn from_str(s: &str) -> Result<Self, Self::Err> {
74 if fuchsia_url::fuchsia_pkg::AbsoluteComponentUrl::from_str(s).is_ok()
75 || fuchsia_url::boot::AbsoluteComponentUrl::parse(s).is_ok()
76 {
77 Ok(UrlOrMoniker::Url(s.into()))
78 } else if s.starts_with("/") {
79 if let Ok(moniker) = Moniker::from_str(s) {
80 Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
81 } else {
82 Err(())
83 }
84 } else {
85 Ok(UrlOrMoniker::Partial(s.into()))
86 }
87 }
88}
89
/// Reserved interest connection id. The counter behind
/// `LogsRepository::new_interest_connection` starts one above this value.
pub const STATIC_CONNECTION_ID: usize = 0;
/// Next id handed out by `LogsRepository::new_interest_connection`.
static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);

/// Moniker of the Archivist itself. When set, `LogsRepository::new` creates a
/// container for it up front, and interest selectors matching it adjust the
/// Archivist's own `log` max level.
pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();

/// IOB tag that the Archivist's own container buffer is asserted to have when
/// it is created in `LogsRepository::new`.
pub const ARCHIVIST_TAG: u64 = 0;
102
/// Repository of per-component log containers that share one buffer.
pub struct LogsRepository {
    /// Lock-protected bookkeeping: containers, interest registrations, the
    /// ingestion scope and related state.
    mutable_state: Mutex<LogsRepositoryState>,
    /// Buffer that container buffers and log cursors are created from.
    shared_buffer: Arc<SharedBuffer>,
    /// Handle to the scope passed to `new`; cloned into containers when they
    /// serve a log sink and closed by `stop_accepting_new_log_sinks`.
    scope_handle: fasync::ScopeHandle,
}
110
111impl LogsRepository {
112 pub fn new(
113 ring_buffer: ring_buffer::Reader,
114 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
115 parent: &fuchsia_inspect::Node,
116 scope: fasync::Scope,
117 ) -> Arc<Self> {
118 let scope_handle = scope.to_handle();
119 Arc::new_cyclic(|me: &Weak<LogsRepository>| {
120 let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
121 let me_clone = Weak::clone(me);
122 let shared_buffer = SharedBuffer::new(
123 ring_buffer,
124 Box::new(move |identity| {
125 if let Some(this) = me_clone.upgrade() {
126 this.on_container_inactive(&identity);
127 }
128 }),
129 Default::default(),
130 mutable_state.global_analytics.logs_node(),
131 );
132 if let Some(m) = ARCHIVIST_MONIKER.get() {
133 let archivist_container = mutable_state.create_log_container(
134 Arc::new(ComponentIdentity::new(
135 ExtendedMoniker::ComponentInstance(m.clone()),
136 "fuchsia-pkg://UNKNOWN",
137 )),
138 &shared_buffer,
139 Weak::clone(me),
140 );
141 assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
143 }
144 LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
145 })
146 }
147
148 pub async fn flush(&self) {
149 self.shared_buffer.flush().await;
150 }
151
152 pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
155 where
156 K: DebugLog + Send + Sync + 'static,
157 {
158 let mut mutable_state = self.mutable_state.lock();
159
160 if mutable_state.draining_klog {
163 return;
164 }
165 mutable_state.draining_klog = true;
166
167 let container =
168 mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
169 let Some(ref scope) = mutable_state.scope else {
170 return;
171 };
172 scope.spawn(async move {
173 debug!("Draining debuglog.");
174 let mut kernel_logger = DebugLogBridge::create(klog_reader);
175 let mut messages = match kernel_logger.existing_logs() {
176 Ok(messages) => messages,
177 Err(e) => {
178 error!(e:%; "failed to read from kernel log, important logs may be missing");
179 return;
180 }
181 };
182 messages.sort_by_key(|m| m.timestamp());
183 for message in messages {
184 container.ingest_message(message);
185 }
186
187 let res = kernel_logger
188 .listen()
189 .try_for_each(|message| async {
190 container.ingest_message(message);
191 Ok(())
192 })
193 .await;
194 if let Err(e) = res {
195 error!(e:%; "failed to drain kernel log, important logs may be missing");
196 }
197 });
198 }
199
200 pub fn logs_cursor_raw(
201 &self,
202 mode: StreamMode,
203 selectors: Vec<ComponentSelector>,
204 ) -> FilterCursorStream<FxtMessage> {
205 self.shared_buffer.cursor(mode, selectors).into()
206 }
207
208 pub fn logs_cursor(
211 &self,
212 mode: StreamMode,
213 selectors: Vec<ComponentSelector>,
214 ) -> FilterCursorStream<LogsData> {
215 self.shared_buffer.cursor(mode, selectors).into()
216 }
217
218 #[cfg(test)]
223 pub fn get_log_container(
224 self: &Arc<Self>,
225 identity: Arc<ComponentIdentity>,
226 ) -> Arc<LogsArtifactsContainer> {
227 self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
228 }
229
230 pub async fn wait_for_termination(&self) {
233 let Some(scope) = self.mutable_state.lock().scope.take() else {
234 error!("Attempted to terminate twice");
235 return;
236 };
237 scope.join().await;
238 debug!("Log ingestion stopped.");
240 self.shared_buffer.terminate().await;
243 for container in self.mutable_state.lock().logs_data_store.values() {
244 container.terminate();
245 }
246 }
247
248 pub fn stop_accepting_new_log_sinks(&self) {
251 self.scope_handle.close();
252 }
253
254 pub fn new_interest_connection(&self) -> usize {
257 INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
258 }
259
260 pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
262 self.mutable_state.lock().update_logs_interest(connection_id, selectors);
263 }
264
265 pub fn finish_interest_connection(&self, connection_id: usize) {
267 self.mutable_state.lock().finish_interest_connection(connection_id);
268 }
269
270 fn on_container_inactive(&self, identity: &ComponentIdentity) {
271 let mut repo = self.mutable_state.lock();
272 if !repo.is_live(identity) {
273 repo.remove(identity);
274 }
275 }
276}
277
#[cfg(test)]
impl LogsRepository {
    /// Test-only constructor: a repository with the legacy default cache size,
    /// no initial interests and a default Inspect node.
    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
        use crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES;
        use crate::logs::shared_buffer::create_ring_buffer;

        let ring_buffer = create_ring_buffer(LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize);
        let inspect_parent = fuchsia_inspect::Node::default();
        LogsRepository::new(ring_buffer, std::iter::empty(), &inspect_parent, scope)
    }
}
291
292impl EventConsumer for LogsRepository {
293 fn handle(self: Arc<Self>, event: Event) {
294 match event.payload {
295 EventPayload::LogSinkRequested(LogSinkRequestedPayload {
296 component,
297 request_stream,
298 }) => {
299 debug!(identity:% = component; "LogSink requested.");
300 let mut mutable_state = self.mutable_state.lock();
305 let container =
306 mutable_state.get_log_container(component, &self.shared_buffer, &self);
307 container.handle_log_sink(request_stream, self.scope_handle.clone());
308 }
309 _ => unreachable!("Archivist state just subscribes to log sink requested"),
310 }
311 }
312}
313
/// Mutable bookkeeping of a `LogsRepository`, accessed under its mutex.
pub struct LogsRepositoryState {
    /// Log containers, keyed by the identity of the component they belong to.
    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
    /// The `log_sources` Inspect node that per-component stats are attached to.
    inspect_node: inspect::Node,
    /// Repository-wide analytics; also provides the node given to the shared buffer.
    global_analytics: GlobalAnalytics,

    /// Interest selectors currently registered, keyed by connection id.
    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,

    /// Set once `drain_debuglog` has been called, so later calls do nothing.
    draining_klog: bool,

    /// Scope used to spawn the debuglog drain task; taken (set to `None`) by
    /// `wait_for_termination`.
    scope: Option<fasync::Scope>,

    /// Initial severities configured per URL, moniker or partial string.
    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
}
332
333impl LogsRepositoryState {
334 fn new(
335 parent: &fuchsia_inspect::Node,
336 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
337 scope: fasync::Scope,
338 ) -> Self {
339 Self {
340 inspect_node: parent.create_child("log_sources"),
341 logs_data_store: HashMap::new(),
342 interest_registrations: BTreeMap::new(),
343 draining_klog: false,
344 initial_interests: initial_interests
345 .map(|ComponentInitialInterest { component, log_severity }| {
346 (component, log_severity)
347 })
348 .collect(),
349 scope: Some(scope),
350 global_analytics: GlobalAnalytics::new(parent),
351 }
352 }
353
354 pub fn get_log_container(
357 &mut self,
358 identity: Arc<ComponentIdentity>,
359 shared_buffer: &Arc<SharedBuffer>,
360 repo: &Arc<LogsRepository>,
361 ) -> Arc<LogsArtifactsContainer> {
362 match self.logs_data_store.get(&identity) {
363 None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
364 Some(existing) => Arc::clone(existing),
365 }
366 }
367
368 fn create_log_container(
369 &mut self,
370 identity: Arc<ComponentIdentity>,
371 shared_buffer: &Arc<SharedBuffer>,
372 repo: Weak<LogsRepository>,
373 ) -> Arc<LogsArtifactsContainer> {
374 let initial_interest = self.get_initial_interest(identity.as_ref());
375 let stats = LogStreamStats::default()
376 .with_inspect(&self.inspect_node, identity.moniker.as_ref())
377 .expect("failed to attach component log stats");
378 stats.set_url(&identity.url);
379 let stats = Arc::new(stats);
380 let buffer = shared_buffer.new_container_buffer(Arc::clone(&identity), Arc::clone(&stats));
381 let container = Arc::new(LogsArtifactsContainer::new(
382 Arc::clone(&identity),
383 self.interest_registrations.values().flat_map(|s| s.iter()),
384 initial_interest,
385 stats,
386 buffer,
387 Some(Box::new(move |c| {
388 if let Some(repo) = repo.upgrade() {
389 repo.on_container_inactive(&c.identity)
390 }
391 })),
392 ));
393 self.logs_data_store.insert(Arc::clone(&identity), Arc::clone(&container));
394 container
395 }
396
397 fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
398 let exact_url_severity =
399 self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
400 let exact_moniker_severity =
401 self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();
402
403 let partial_severity = self
404 .initial_interests
405 .iter()
406 .filter_map(|(uom, severity)| match uom {
407 UrlOrMoniker::Partial(p) => {
408 if identity.url.contains(p.as_str())
409 || identity.moniker.to_string().contains(p.as_str())
410 {
411 Some(*severity)
412 } else {
413 None
414 }
415 }
416 _ => None,
417 })
418 .min();
419
420 [exact_url_severity, exact_moniker_severity, partial_severity]
421 .into_iter()
422 .flatten()
423 .min()
424 .map(FidlSeverity::from)
425 }
426
427 fn is_live(&self, identity: &ComponentIdentity) -> bool {
428 match self.logs_data_store.get(identity) {
429 Some(container) => container.is_active(),
430 None => false,
431 }
432 }
433
434 fn maybe_update_own_logs_interest(
437 &mut self,
438 selectors: &[LogInterestSelector],
439 clear_interest: bool,
440 ) {
441 let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
442 let lowest_selector = selectors
443 .iter()
444 .filter(|selector| {
445 if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
456 matches!(
457 &s[..],
458 [StringSelector::StringPattern(s)] if s == "**" || s == "*"
459 )
460 }) {
461 return false;
462 }
463
464 moniker.matches_component_selector(&selector.selector).unwrap_or(false)
465 })
466 .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
467 if let Some(selector) = lowest_selector {
468 if clear_interest {
469 log::set_max_level(LevelFilter::Info);
470 } else {
471 log::set_max_level(
472 match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
473 FidlSeverity::Trace => LevelFilter::Trace,
474 FidlSeverity::Debug => LevelFilter::Debug,
475 FidlSeverity::Info => LevelFilter::Info,
476 FidlSeverity::Warn => LevelFilter::Warn,
477 FidlSeverity::Error => LevelFilter::Error,
478 FidlSeverity::Fatal => LevelFilter::Error,
481 FidlSeverity::__SourceBreaking { .. } => return,
482 },
483 );
484 }
485 }
486 }
487
488 fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
489 self.maybe_update_own_logs_interest(&selectors, false);
490 let previous_selectors =
491 self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
492 let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
494 for logs_data in self.logs_data_store.values() {
495 logs_data.update_interest(new_selectors.iter(), &previous_selectors);
496 }
497 }
498
499 pub fn finish_interest_connection(&mut self, connection_id: usize) {
500 let selectors = self.interest_registrations.remove(&connection_id);
501 if let Some(selectors) = selectors {
502 self.maybe_update_own_logs_interest(&selectors, true);
503 for logs_data in self.logs_data_store.values() {
504 logs_data.reset_interest(&selectors);
505 }
506 }
507 }
508
509 pub fn remove(&mut self, identity: &ComponentIdentity) {
510 self.logs_data_store.remove(identity);
511 }
512}
513
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::shared_buffer::create_ring_buffer;
    use crate::logs::testing::make_message;
    use fidl_fuchsia_logger::LogSinkMarker;

    use moniker::ExtendedMoniker;
    use selectors::FastError;

    /// Returns the repository's container for the component `moniker`/`url`.
    fn container_for(
        repo: &Arc<LogsRepository>,
        moniker: &str,
        url: &'static str,
    ) -> Arc<LogsArtifactsContainer> {
        let identity = ComponentIdentity::new(ExtendedMoniker::parse_str(moniker).unwrap(), url);
        repo.get_log_container(Arc::new(identity))
    }

    /// Collects the message text of every log in a snapshot filtered by `selectors`.
    async fn snapshot_messages(
        repo: &LogsRepository,
        selectors: Vec<ComponentSelector>,
    ) -> Vec<String> {
        repo.logs_cursor(StreamMode::Snapshot, selectors)
            .map(|value| value.msg().unwrap().to_string())
            .collect::<Vec<_>>()
            .await
    }

    #[fuchsia::test]
    async fn data_repo_filters_logs_by_selectors() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let foo_container = container_for(&repo, "./foo", "fuchsia-pkg://foo");
        let bar_container = container_for(&repo, "./bar", "fuchsia-pkg://bar");

        foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
        bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
        foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));

        // Without selectors every message is returned.
        let results = snapshot_messages(&repo, Vec::new()).await;
        assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);

        // Selecting `foo` leaves out the message ingested through `bar`.
        let foo_only = vec![selectors::parse_component_selector::<FastError>("foo").unwrap()];
        let results = snapshot_messages(&repo, foo_only).await;
        assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
    }

    #[fuchsia::test]
    async fn data_repo_correctly_sets_initial_interests() {
        let interest = |component: UrlOrMoniker, log_severity: Severity| {
            ComponentInitialInterest { component, log_severity }
        };
        let repo = LogsRepository::new(
            create_ring_buffer(100000),
            [
                interest(UrlOrMoniker::Url("fuchsia-pkg://bar".into()), Severity::Info),
                interest(UrlOrMoniker::Url("fuchsia-pkg://baz".into()), Severity::Warn),
                interest(UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()), Severity::Error),
                interest(UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()), Severity::Debug),
            ]
            .into_iter(),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        let cases = [
            // Only the moniker entry matches.
            ("core/foo", "fuchsia-pkg://foo", Some(FidlSeverity::Debug)),
            // Only the URL entry matches.
            ("core/baz", "fuchsia-pkg://baz", Some(FidlSeverity::Warn)),
            // URL (INFO) and moniker (ERROR) entries both match; the lower one wins.
            ("core/bar", "fuchsia-pkg://bar", Some(FidlSeverity::Info)),
            // Nothing matches.
            ("core/quux", "fuchsia-pkg://quux", None),
        ];
        for (moniker, url, expected) in cases {
            let container = container_for(&repo, moniker, url);
            expect_initial_interest(expected, container, repo.scope_handle.clone()).await;
        }
    }

    #[fuchsia::test]
    async fn data_repo_correctly_handles_partial_matching() {
        let repo = LogsRepository::new(
            create_ring_buffer(100000),
            [
                "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO",
                "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN",
                "/core/bust:DEBUG",
                "core/bar:ERROR",
                "foo:DEBUG",
                "both:TRACE",
            ]
            .into_iter()
            .map(|spec| spec.parse::<ComponentInitialInterest>().unwrap()),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        let cases = [
            // Partial `foo` matches the moniker.
            (
                "core/foo",
                "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
                Some(FidlSeverity::Debug),
            ),
            // Partial `foo` matches the URL.
            ("core/not-foo", "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm", Some(FidlSeverity::Debug)),
            // Exact URL entry.
            ("core/baz", "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm", Some(FidlSeverity::Warn)),
            // Exact URL (INFO) and partial `core/bar` (ERROR) both match; the lower one wins.
            ("core/bar", "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm", Some(FidlSeverity::Info)),
            // Nothing matches.
            ("core/quux", "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm", None),
            // Partial `both` matches URL and moniker alike.
            ("core/both", "fuchsia-pkg://fuchsia.com/both#meta/both.cm", Some(FidlSeverity::Trace)),
            // `/core/bust` is an exact moniker entry and does not apply to children.
            ("core/bust/testing", "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm", None),
        ];
        for (moniker, url, expected) in cases {
            let container = container_for(&repo, moniker, url);
            expect_initial_interest(expected, container, repo.scope_handle.clone()).await;
        }
    }

    /// Connects a fresh `LogSink` to `container` and asserts that the first
    /// interest it reports has `expected_severity` as its minimum severity.
    async fn expect_initial_interest(
        expected_severity: Option<FidlSeverity>,
        container: Arc<LogsArtifactsContainer>,
        scope: fasync::ScopeHandle,
    ) {
        let (proxy, request_stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(request_stream, scope);
        let reported = proxy.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(reported.min_severity, expected_severity);
    }
}