use crate::events::router::EventConsumer;
use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
use crate::identity::ComponentIdentity;
use crate::logs::container::{CursorItem, LogsArtifactsContainer};
use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
use crate::logs::multiplex::{Multiplexer, MultiplexerHandleAction};
use crate::logs::shared_buffer::SharedBuffer;
use crate::logs::stats::LogStreamStats;
use crate::severity_filter::KlogSeverityFilter;
use anyhow::format_err;
use diagnostics_data::{LogsData, Severity};
use fidl_fuchsia_diagnostics::{
LogInterestSelector, Selector, Severity as FidlSeverity, StreamMode,
};
use flyweights::FlyStr;
use fuchsia_inspect_derive::WithInspect;
use fuchsia_sync::Mutex;
use fuchsia_url::boot_url::BootUrl;
use fuchsia_url::AbsoluteComponentUrl;
use futures::channel::mpsc;
use futures::prelude::*;
use moniker::{ExtendedMoniker, Moniker};
use selectors::SelectorExt;
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Weak};
use tracing::{debug, error};
use {fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_trace as ftrace};
/// A minimum log severity to apply to a component before the component (or a
/// client) expresses its own interest. Parsed from the string form
/// `<url-or-moniker>:<severity>` (see the `FromStr` impl below).
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentInitialInterest {
    // The component the interest applies to, identified by URL or moniker.
    component: UrlOrMoniker,
    // The initial minimum severity for that component's logs.
    log_severity: Severity,
}
impl FromStr for ComponentInitialInterest {
    type Err = anyhow::Error;

    /// Parses `<url-or-moniker>:<severity>`, splitting on the *last* `:` so
    /// that URLs containing colons (e.g. `fuchsia-pkg://...`) parse correctly.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let Some((raw_component, raw_severity)) = s.rsplit_once(':') else {
            return Err(format_err!("invalid interest"));
        };
        let component = UrlOrMoniker::from_str(raw_component)
            .map_err(|_| format_err!("invalid url or moniker"))?;
        let log_severity = Severity::from_str(raw_severity)
            .map_err(|_| format_err!("invalid severity"))?;
        Ok(ComponentInitialInterest { component, log_severity })
    }
}
/// Identifies a component either by its component URL or by its moniker.
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum UrlOrMoniker {
    /// A component URL accepted by `AbsoluteComponentUrl` or `BootUrl` parsing.
    Url(FlyStr),
    /// The moniker of a component instance.
    Moniker(ExtendedMoniker),
}
impl FromStr for UrlOrMoniker {
    type Err = ();

    /// Classifies `s` as a URL if it parses as an absolute component URL or a
    /// boot URL; otherwise falls back to parsing it as a moniker.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let is_url = AbsoluteComponentUrl::from_str(s).is_ok() || BootUrl::parse(s).is_ok();
        if is_url {
            return Ok(UrlOrMoniker::Url(s.into()));
        }
        match Moniker::from_str(s) {
            Ok(moniker) => Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker))),
            Err(_) => Err(()),
        }
    }
}
// Monotonic source of unique ids handed out to interest connections
// (see `LogsRepository::new_interest_connection`).
static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(0);
// Archivist's own moniker, used to recognize interest selectors that target
// archivist itself so its own minimum log severity can be adjusted.
static ARCHIVIST_MONIKER: LazyLock<Moniker> =
    LazyLock::new(|| Moniker::parse_str("bootstrap/archivist").unwrap());
/// Repository of per-component log containers, with support for merged
/// cursors, interest (minimum severity) propagation, and kernel debuglog
/// draining.
pub struct LogsRepository {
    // All mutable state, guarded by a single mutex.
    mutable_state: Mutex<LogsRepositoryState>,
    // Log buffer shared across all containers; notifies the repository (via
    // the callback installed in `new`) when a container becomes inactive.
    shared_buffer: Arc<SharedBuffer>,
    // Handle to the scope on which LogSink serving runs; closing it stops
    // accepting new log sink connections.
    scope_handle: fasync::ScopeHandle,
}
impl LogsRepository {
    /// Creates a new repository.
    ///
    /// `logs_max_cached_original_bytes` bounds the shared log buffer.
    /// `initial_interests` seeds per-component minimum severities applied
    /// before components register their own interest. Per-component stats are
    /// published under `parent`, and log ingestion tasks run on `scope`.
    pub fn new(
        logs_max_cached_original_bytes: u64,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        parent: &fuchsia_inspect::Node,
        scope: fasync::Scope,
    ) -> Arc<Self> {
        let scope_handle = scope.to_handle();
        // new_cyclic lets the shared buffer's inactivity callback hold a weak
        // reference back to the repository that is being constructed.
        Arc::new_cyclic(|me: &Weak<LogsRepository>| {
            let me = Weak::clone(me);
            LogsRepository {
                scope_handle,
                mutable_state: Mutex::new(LogsRepositoryState::new(
                    parent,
                    initial_interests,
                    scope,
                )),
                shared_buffer: SharedBuffer::new(
                    logs_max_cached_original_bytes as usize,
                    // Called by the buffer when a container's data is gone;
                    // triggers repository-side cleanup of dead containers.
                    Box::new(move |identity| {
                        if let Some(this) = me.upgrade() {
                            this.on_container_inactive(&identity);
                        }
                    }),
                ),
            }
        })
    }
    /// Starts draining the kernel debuglog through `klog_reader` into the
    /// kernel log container: first all existing messages (sorted by
    /// timestamp), then an ongoing listen for new ones. Idempotent — only one
    /// drain task is ever spawned; does nothing once termination has begun.
    pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
    where
        K: DebugLog + Send + Sync + 'static,
    {
        let mut mutable_state = self.mutable_state.lock();
        if mutable_state.draining_klog {
            return;
        }
        mutable_state.draining_klog = true;
        let container =
            mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
        // The scope is taken by wait_for_termination; if it's gone, we are
        // shutting down and must not spawn new work.
        let Some(ref scope) = mutable_state.scope else {
            return;
        };
        scope.spawn(async move {
            debug!("Draining debuglog.");
            let mut kernel_logger = DebugLogBridge::create(klog_reader);
            let mut messages = match kernel_logger.existing_logs() {
                Ok(messages) => messages,
                Err(e) => {
                    error!(%e, "failed to read from kernel log, important logs may be missing");
                    return;
                }
            };
            // Debuglog entries may arrive out of order; sort the backlog so
            // the container ingests them in timestamp order.
            messages.sort_by_key(|m| m.timestamp());
            for message in messages {
                container.ingest_message(message);
            }
            let res = kernel_logger
                .listen()
                .try_for_each(|message| async {
                    container.ingest_message(message);
                    Ok(())
                })
                .await;
            if let Err(e) = res {
                error!(%e, "failed to drain kernel log, important logs may be missing");
            }
        });
    }
    /// Returns a merged stream of raw `CursorItem`s drawn from every known
    /// container, behaving according to `mode` (snapshot and/or subscribe).
    pub fn logs_cursor_raw(
        &self,
        mode: StreamMode,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = CursorItem> + Send {
        let mut repo = self.mutable_state.lock();
        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
            let cursor = c.cursor_raw(mode);
            (Arc::clone(identity), cursor)
        });
        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, None, substreams);
        // Register the multiplexer so containers created later are offered to
        // this cursor too, and wire up cleanup for when the stream is dropped.
        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
        merged
    }
    /// Returns a merged stream of formatted `LogsData` from every known
    /// container, optionally filtered by `selectors`, behaving according to
    /// `mode`.
    pub fn logs_cursor(
        &self,
        mode: StreamMode,
        selectors: Option<Vec<Selector>>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = Arc<LogsData>> + Send + 'static {
        let mut repo = self.mutable_state.lock();
        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
            let cursor = c.cursor(mode, parent_trace_id);
            (Arc::clone(identity), cursor)
        });
        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
        // Same registration/cleanup dance as logs_cursor_raw.
        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
        merged
    }
    /// Returns the log container for `identity`, creating and registering one
    /// if it does not already exist.
    pub fn get_log_container(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
    ) -> Arc<LogsArtifactsContainer> {
        self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
    }
    /// Stops log ingestion and waits for everything to wind down: joins the
    /// ingestion scope, terminates the shared buffer, then terminates each
    /// container and all live multiplexers. Logs an error and returns early
    /// if called more than once.
    pub async fn wait_for_termination(&self) {
        // Take the scope out of the state so the lock guard (a temporary) is
        // released before we await join() below.
        let Some(scope) = self.mutable_state.lock().scope.take() else {
            error!("Attempted to terminate twice");
            return;
        };
        scope.join().await;
        debug!("Log ingestion stopped.");
        self.shared_buffer.terminate().await;
        let mut repo = self.mutable_state.lock();
        for container in repo.logs_data_store.values() {
            container.terminate();
        }
        repo.logs_multiplexers.terminate();
    }
    /// Closes the scope on which LogSink connections are served, so no new
    /// log sinks are accepted.
    pub fn stop_accepting_new_log_sinks(&self) {
        self.scope_handle.close();
    }
    /// Allocates a unique id for a new interest connection.
    pub fn new_interest_connection(&self) -> usize {
        INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
    }
    /// Replaces the interest selectors registered under `connection_id` and
    /// propagates the change to all containers (and archivist itself, if
    /// targeted).
    pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.mutable_state.lock().update_logs_interest(connection_id, selectors);
    }
    /// Removes the interest registration for `connection_id` and resets the
    /// affected containers' interest.
    pub fn finish_interest_connection(&self, connection_id: usize) {
        self.mutable_state.lock().finish_interest_connection(connection_id);
    }
    // Called when the shared buffer or a container reports inactivity; drops
    // the container if it is no longer live.
    fn on_container_inactive(&self, identity: &ComponentIdentity) {
        let mut repo = self.mutable_state.lock();
        if !repo.is_live(identity) {
            repo.remove(identity);
        }
    }
}
#[cfg(test)]
impl LogsRepository {
    /// Test-only constructor: legacy default cache size, no initial
    /// interests, and a throwaway inspect node.
    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
        LogsRepository::new(
            crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES,
            std::iter::empty(),
            &Default::default(),
            scope,
        )
    }
}
impl EventConsumer for LogsRepository {
    /// Handles `LogSinkRequested` events by routing the FIDL request stream
    /// to the requesting component's log container. Panics on any other
    /// payload, since this consumer only subscribes to log sink requests.
    fn handle(self: Arc<Self>, event: Event) {
        match event.payload {
            EventPayload::LogSinkRequested(LogSinkRequestedPayload {
                component,
                request_stream,
            }) => {
                debug!(identity = %component, "LogSink requested.");
                let container = self.get_log_container(component);
                container.handle_log_sink(request_stream, self.scope_handle.clone());
            }
            _ => unreachable!("Archivist state just subscribes to log sink requested"),
        }
    }
}
/// Mutable state of a `LogsRepository`, always accessed under its mutex.
pub struct LogsRepositoryState {
    // Per-component log containers, keyed by component identity.
    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
    // Inspect node ("log_sources") under which per-component stats live.
    inspect_node: inspect::Node,
    // Tracks live multiplexed cursors so new containers can be offered to them.
    logs_multiplexers: MultiplexerBroker,
    // Active interest selectors, keyed by interest-connection id.
    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
    // Whether draining of the kernel debuglog has already started.
    draining_klog: bool,
    // Scope for ingestion tasks; taken (set to None) when terminating.
    scope: Option<fasync::Scope>,
    // Initial minimum severities, keyed by component URL or moniker.
    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
}
impl LogsRepositoryState {
    /// Builds the initial state: empty container map, a fresh "log_sources"
    /// inspect child under `parent`, and the given initial interests indexed
    /// by component.
    fn new(
        parent: &fuchsia_inspect::Node,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        scope: fasync::Scope,
    ) -> Self {
        Self {
            inspect_node: parent.create_child("log_sources"),
            logs_data_store: HashMap::new(),
            logs_multiplexers: MultiplexerBroker::new(),
            interest_registrations: BTreeMap::new(),
            draining_klog: false,
            initial_interests: initial_interests
                .map(|ComponentInitialInterest { component, log_severity }| {
                    (component, log_severity)
                })
                .collect(),
            scope: Some(scope),
        }
    }
    /// Returns the container for `identity`, creating one on first use.
    ///
    /// A new container is seeded with the currently registered interest
    /// selectors and any matching initial interest, gets its own stats node
    /// under the inspect tree, and is announced to all live multiplexers.
    pub fn get_log_container(
        &mut self,
        identity: Arc<ComponentIdentity>,
        shared_buffer: &Arc<SharedBuffer>,
        repo: &Arc<LogsRepository>,
    ) -> Arc<LogsArtifactsContainer> {
        match self.logs_data_store.get(&identity) {
            None => {
                let initial_interest = self.get_initial_interest(identity.as_ref());
                let weak_repo = Arc::downgrade(repo);
                let stats = LogStreamStats::default()
                    .with_inspect(&self.inspect_node, identity.moniker.to_string())
                    .expect("failed to attach component log stats");
                stats.set_url(&identity.url);
                let stats = Arc::new(stats);
                let container = Arc::new(LogsArtifactsContainer::new(
                    Arc::clone(&identity),
                    self.interest_registrations.values().flat_map(|s| s.iter()),
                    initial_interest,
                    Arc::clone(&stats),
                    shared_buffer.new_container_buffer(Arc::clone(&identity), stats),
                    // Weak reference avoids a repo -> container -> repo cycle.
                    Some(Box::new(move |c| {
                        if let Some(repo) = weak_repo.upgrade() {
                            repo.on_container_inactive(&c.identity)
                        }
                    })),
                ));
                self.logs_data_store.insert(identity, Arc::clone(&container));
                // Offer the new container to every live multiplexed cursor.
                self.logs_multiplexers.send(&container);
                container
            }
            Some(existing) => Arc::clone(existing),
        }
    }
    // Looks up an initial interest for the component by URL and by moniker.
    // If both match, the more verbose (lower) severity wins.
    fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
        match (
            self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())),
            self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())),
        ) {
            (None, None) => None,
            (Some(severity), None) | (None, Some(severity)) => Some(FidlSeverity::from(*severity)),
            (Some(s1), Some(s2)) => Some(FidlSeverity::from(std::cmp::min(*s1, *s2))),
        }
    }
    // Whether the component's container exists and is still active.
    fn is_live(&self, identity: &ComponentIdentity) -> bool {
        match self.logs_data_store.get(identity) {
            Some(container) => container.is_active(),
            None => false,
        }
    }
    /// If any of `selectors` targets archivist's own moniker, updates
    /// archivist's own minimum log severity via the installed tracing
    /// dispatcher. With `clear_interest`, resets it to Info instead.
    ///
    /// NOTE(review): clearing resets to the default (Info) rather than
    /// recomputing the minimum across remaining registrations — confirm this
    /// is intended when multiple connections express interest in archivist.
    fn maybe_update_own_logs_interest(
        &mut self,
        selectors: &[LogInterestSelector],
        clear_interest: bool,
    ) {
        tracing::dispatcher::get_default(|dispatcher| {
            // Only applies when the active dispatcher is archivist's own
            // klog severity filter.
            let Some(publisher) = dispatcher.downcast_ref::<KlogSeverityFilter>() else {
                return;
            };
            let lowest_selector = selectors
                .iter()
                .filter(|selector| {
                    ARCHIVIST_MONIKER
                        .matches_component_selector(&selector.selector)
                        .unwrap_or(false)
                })
                .min_by_key(|selector| {
                    selector.interest.min_severity.unwrap_or(FidlSeverity::Info)
                });
            if let Some(selector) = lowest_selector {
                if clear_interest {
                    publisher.set_severity(FidlSeverity::Info);
                } else {
                    publisher
                        .set_severity(selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
                }
            }
        });
    }
    /// Replaces the selectors registered under `connection_id` and tells
    /// every container about both the new and the previous set so it can
    /// recompute its effective interest.
    fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.maybe_update_own_logs_interest(&selectors, false);
        let previous_selectors =
            self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
        // Borrow the stored copy back so containers see the canonical set.
        let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
        for logs_data in self.logs_data_store.values() {
            logs_data.update_interest(new_selectors.iter(), &previous_selectors);
        }
    }
    /// Drops the registration for `connection_id` (if any) and resets the
    /// removed selectors' effect on archivist and on every container.
    pub fn finish_interest_connection(&mut self, connection_id: usize) {
        let selectors = self.interest_registrations.remove(&connection_id);
        if let Some(selectors) = selectors {
            self.maybe_update_own_logs_interest(&selectors, true);
            for logs_data in self.logs_data_store.values() {
                logs_data.reset_interest(&selectors);
            }
        }
    }
    /// Forgets the container for `identity`, if present.
    pub fn remove(&mut self, identity: &ComponentIdentity) {
        self.logs_data_store.remove(identity);
    }
}
// Maps a multiplexer's id to its stream mode and its control handle.
type LiveIteratorsMap = HashMap<usize, (StreamMode, Box<dyn MultiplexerHandleAction + Send>)>;
/// Tracks live multiplexed log cursors so that newly created containers can
/// be offered to them, and removes them when their streams are dropped.
pub struct MultiplexerBroker {
    // Shared with the cleanup task below.
    live_iterators: Arc<Mutex<LiveIteratorsMap>>,
    // Cloned into each multiplexer; it sends its id here when dropped.
    cleanup_sender: mpsc::UnboundedSender<usize>,
    // Background task that removes dropped multiplexers from the map.
    _live_iterators_cleanup_task: fasync::Task<()>,
}
impl MultiplexerBroker {
fn new() -> Self {
let (cleanup_sender, mut receiver) = mpsc::unbounded();
let live_iterators = Arc::new(Mutex::new(HashMap::new()));
let live_iterators_clone = Arc::clone(&live_iterators);
Self {
live_iterators,
cleanup_sender,
_live_iterators_cleanup_task: fasync::Task::spawn(async move {
while let Some(id) = receiver.next().await {
live_iterators_clone.lock().remove(&id);
}
}),
}
}
fn cleanup_sender(&self) -> mpsc::UnboundedSender<usize> {
self.cleanup_sender.clone()
}
fn add(&mut self, mode: StreamMode, recipient: Box<dyn MultiplexerHandleAction + Send>) {
match mode {
StreamMode::Snapshot => recipient.close(),
StreamMode::SnapshotThenSubscribe | StreamMode::Subscribe => {
self.live_iterators.lock().insert(recipient.multiplexer_id(), (mode, recipient));
}
}
}
pub fn send(&mut self, container: &Arc<LogsArtifactsContainer>) {
self.live_iterators
.lock()
.retain(|_, (mode, recipient)| recipient.send_cursor_from(*mode, container));
}
fn terminate(&mut self) {
for (_, (_, recipient)) in self.live_iterators.lock().drain() {
recipient.close();
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::testing::make_message;
    use fidl_fuchsia_logger::LogSinkMarker;
    use moniker::ExtendedMoniker;
    use selectors::FastError;
    use std::time::Duration;
    // Verifies that a snapshot cursor interleaves messages from all
    // components by timestamp, and that a selector restricts the cursor to
    // the matching component's messages.
    #[fuchsia::test]
    async fn data_repo_filters_logs_by_selectors() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bar").unwrap(),
            "fuchsia-pkg://bar",
        )));
        foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
        bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
        foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));
        let stream = repo.logs_cursor(StreamMode::Snapshot, None, ftrace::Id::random());
        let results =
            stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
        let filtered_stream = repo.logs_cursor(
            StreamMode::Snapshot,
            Some(vec![selectors::parse_selector::<FastError>("foo:root").unwrap()]),
            ftrace::Id::random(),
        );
        let results =
            filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
    }
    // Verifies that dropping a subscribing cursor eventually removes its
    // multiplexer from the broker's live-iterator map (via the cleanup task).
    #[fuchsia::test]
    async fn multiplexer_broker_cleanup() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let stream =
            repo.logs_cursor(StreamMode::SnapshotThenSubscribe, None, ftrace::Id::random());
        assert_eq!(repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().len(), 1);
        drop(stream);
        // Cleanup is asynchronous; poll until the map empties.
        loop {
            fasync::Timer::new(Duration::from_millis(100)).await;
            if repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().len() == 0 {
                break;
            }
        }
    }
    // Verifies initial interest resolution: moniker match, URL match, the
    // minimum of both when URL and moniker entries disagree, and no interest
    // when nothing matches.
    #[fuchsia::test]
    async fn data_repo_correctly_sets_initial_interests() {
        let repo = LogsRepository::new(
            100000,
            [
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
                    log_severity: Severity::Info,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
                    log_severity: Severity::Warn,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("core/bar".try_into().unwrap()),
                    log_severity: Severity::Error,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("core/foo".try_into().unwrap()),
                    log_severity: Severity::Debug,
                },
            ]
            .into_iter(),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );
        // Moniker-only match.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;
        // URL-only match.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/baz").unwrap(),
            "fuchsia-pkg://baz",
        )));
        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
            .await;
        // Both match: the lower (more verbose) severity wins (Info < Error).
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bar").unwrap(),
            "fuchsia-pkg://bar",
        )));
        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
            .await;
        // No match: no initial interest.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/quux").unwrap(),
            "fuchsia-pkg://quux",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
    }
    // Connects a LogSink to the container and asserts the first interest
    // change it reports carries the expected minimum severity.
    async fn expect_initial_interest(
        expected_severity: Option<FidlSeverity>,
        container: Arc<LogsArtifactsContainer>,
        scope: fasync::ScopeHandle,
    ) {
        let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(stream, scope);
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, expected_severity);
    }
}