1use crate::PublishOptions;
5use diagnostics_log_types::Severity;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::{
8 LogSinkEvent, LogSinkMarker, LogSinkOnInitRequest, LogSinkProxy, LogSinkSynchronousProxy,
9};
10use fuchsia_async as fasync;
11use fuchsia_component_client::connect::connect_to_protocol;
12use fuchsia_sync::Mutex;
13use futures::stream::StreamExt;
14use std::borrow::Borrow;
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::Arc;
18use thiserror::Error;
19
20#[cfg(fuchsia_api_level_less_than = "27")]
21use fidl_fuchsia_diagnostics::Interest;
22#[cfg(fuchsia_api_level_at_least = "27")]
23use fidl_fuchsia_diagnostics_types::Interest;
24
25mod filter;
26mod sink;
27
28use filter::InterestFilter;
29use sink::{BufferedSink, IoBufferSink, Sink, SinkConfig};
30
31pub use diagnostics_log_encoding::Metatag;
32pub use diagnostics_log_encoding::encode::{LogEvent, TestRecord};
33pub use paste::paste;
34
35#[cfg(test)]
36use std::{
37 sync::atomic::{AtomicI64, Ordering},
38 time::Duration,
39};
40
/// Callback interface for code that wants to observe severity changes.
///
/// An implementation is handed to `Publisher::set_interest_listener`, which forwards it to the
/// publisher's `InterestFilter`.
pub trait OnInterestChanged {
    /// Called with the new `severity`.
    ///
    /// NOTE(review): the exact trigger lives in the `filter` module, which is not visible here;
    /// presumably this fires when the minimum severity (interest) changes — confirm.
    fn on_changed(&self, severity: Severity);
}
46
/// Options that configure how a `Publisher` / `BufferedPublisher` is built.
///
/// The derived `Default` leaves every flag `false`, every collection empty and the `LogSink`
/// client unset.
#[derive(Default)]
pub struct PublisherOptions<'t> {
    /// Interest used to seed the publisher's `InterestFilter`.
    pub(crate) interest: Interest,
    /// Whether the constructors should start a task/thread that listens for interest updates.
    /// Rejected with `PublishError::UnsupportedOption` unless `register_global_logger` is also set.
    listen_for_interest_updates: bool,
    /// Explicit `LogSink` client end. When `None`, the constructors connect to the protocol
    /// themselves via `connect_to_protocol`.
    log_sink_client: Option<ClientEnd<LogSinkMarker>>,
    /// Metatags forwarded to the sink configuration.
    pub(crate) metatags: HashSet<Metatag>,
    /// Tags forwarded (as owned strings) to the sink configuration.
    pub(crate) tags: &'t [&'t str],
    /// Forwarded to the sink configuration's `always_log_file_line`.
    pub(crate) always_log_file_line: bool,
    /// Whether the publisher should be installed as the global logger of the `log` crate.
    register_global_logger: bool,
}
58
59impl Default for PublishOptions<'static> {
60 fn default() -> Self {
61 Self {
62 publisher: PublisherOptions {
63 listen_for_interest_updates: true,
67 register_global_logger: true,
68 ..PublisherOptions::default()
69 },
70 install_panic_hook: true,
71 panic_prefix: None,
72 }
73 }
74}
75
76macro_rules! publisher_options {
77 ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
78 $(
79 impl<'t> $name<'t> {
80 pub fn log_file_line_info(mut $self, enable: bool) -> Self {
84 let this = &mut $self$(.$self_arg)*;
85 this.always_log_file_line = enable;
86 $self
87 }
88
89 pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
95 let this = &mut $self$(.$self_arg)*;
96 this.listen_for_interest_updates = enable;
97 $self
98 }
99
100 pub fn use_log_sink(mut $self, client: ClientEnd<LogSinkMarker>) -> Self {
104 let this = &mut $self$(.$self_arg)*;
105 this.log_sink_client = Some(client);
106 $self
107 }
108
109 pub fn register_global_logger(mut $self, value: bool) -> Self {
114 let this = &mut $self$(.$self_arg)*;
115 this.register_global_logger = value;
116 $self
117 }
118 }
119 )*
120 };
121}
122
123publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
124
125pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
133 let result = Publisher::new_sync_with_async_listener(opts.publisher);
134 if matches!(result, Err(PublishError::MissingOnInit)) {
135 return Ok(());
139 }
140 result?;
141 if opts.install_panic_hook {
142 crate::install_panic_hook(opts.panic_prefix);
143 }
144 Ok(())
145}
146
147pub fn set_minimum_severity(severity: impl Into<Severity>) {
150 let severity: Severity = severity.into();
151 log::set_max_level(severity.into());
152}
153
154pub fn initialize_sync(opts: PublishOptions<'_>) {
164 match Publisher::new_sync(opts.publisher) {
165 Ok(_) => {}
166 Err(PublishError::MissingOnInit) => {
167 return;
171 }
172 Err(e) => panic!("Unable to initialize logging: {e:?}"),
173 }
174 if opts.install_panic_hook {
175 crate::install_panic_hook(opts.panic_prefix);
176 }
177}
178
/// Handle to a log publisher.
///
/// Cloning is cheap: all clones share the same reference-counted `InnerPublisher`.
#[derive(Clone)]
pub struct Publisher {
    /// Shared state (sink and interest filter).
    inner: Arc<InnerPublisher>,
}
185
/// State shared by every clone of a `Publisher`; this is also the type that gets registered as
/// the global `log` logger.
struct InnerPublisher {
    /// Sink that records log records into the IOBuffer received from `LogSink`.
    sink: IoBufferSink,
    /// Interest filter seeded from the publisher options and updated at runtime.
    filter: InterestFilter,
}
190
191impl Publisher {
192 fn new(opts: PublisherOptions<'_>, iob: zx::Iob) -> Self {
193 Self {
194 inner: Arc::new(InnerPublisher {
195 sink: IoBufferSink::new(
196 iob,
197 SinkConfig {
198 tags: opts.tags.iter().map(|s| s.to_string()).collect(),
199 metatags: opts.metatags,
200 always_log_file_line: opts.always_log_file_line,
201 },
202 ),
203 filter: InterestFilter::new(opts.interest),
204 }),
205 }
206 }
207
208 pub fn new_sync(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
211 let listen_for_interest_updates = opts.listen_for_interest_updates;
212 let (publisher, client) = Self::new_sync_no_listener(opts)?;
213 if listen_for_interest_updates {
214 let publisher = publisher.clone();
215 std::thread::spawn(move || {
216 fuchsia_async::LocalExecutor::default()
217 .run_singlethreaded(publisher.listen_for_interest_updates(client.into_proxy()));
218 });
219 }
220 Ok(publisher)
221 }
222
223 pub fn new_sync_with_async_listener(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
226 let listen_for_interest_updates = opts.listen_for_interest_updates;
227 let (publisher, client) = Self::new_sync_no_listener(opts)?;
228 if listen_for_interest_updates {
229 fasync::Task::spawn(publisher.clone().listen_for_interest_updates(client.into_proxy()))
230 .detach();
231 }
232 Ok(publisher)
233 }
234
235 fn new_sync_no_listener(
238 mut opts: PublisherOptions<'_>,
239 ) -> Result<(Self, ClientEnd<LogSinkMarker>), PublishError> {
240 let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
241
242 if listen_for_interest_updates && !register_global_logger {
243 return Err(PublishError::UnsupportedOption);
246 }
247
248 let client = match opts.log_sink_client.take() {
249 Some(log_sink) => log_sink,
250 None => connect_to_protocol()
251 .map_err(|e| e.to_string())
252 .map_err(PublishError::LogSinkConnect)?,
253 };
254
255 let proxy = zx::Unowned::<LogSinkSynchronousProxy>::new(client.channel());
256 let Ok(LogSinkEvent::OnInit {
257 payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
258 }) = proxy.wait_for_event(zx::MonotonicInstant::INFINITE)
259 else {
260 return Err(PublishError::MissingOnInit);
261 };
262
263 let publisher = Self::new(opts, iob);
264
265 if register_global_logger {
266 publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
267 }
268
269 Ok((publisher, client))
270 }
271
272 pub async fn new_async(mut opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
275 let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
276
277 if listen_for_interest_updates && !register_global_logger {
278 return Err(PublishError::UnsupportedOption);
281 }
282
283 let proxy = match opts.log_sink_client.take() {
284 Some(log_sink) => log_sink.into_proxy(),
285 None => connect_to_protocol()
286 .map_err(|e| e.to_string())
287 .map_err(PublishError::LogSinkConnect)?,
288 };
289
290 let Some(Ok(LogSinkEvent::OnInit {
291 payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
292 })) = proxy.take_event_stream().next().await
293 else {
294 return Err(PublishError::MissingOnInit);
295 };
296
297 let publisher = Self::new(opts, iob);
298
299 if register_global_logger {
300 publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
301 fasync::Task::spawn(publisher.clone().listen_for_interest_updates(proxy)).detach();
302 }
303
304 Ok(publisher)
305 }
306
307 pub fn event_for_testing(&self, record: TestRecord<'_>) {
309 if self.inner.filter.enabled_for_testing(&record) {
310 self.inner.sink.event_for_testing(record);
311 }
312 }
313
314 pub fn set_interest_listener<T>(&self, listener: T)
316 where
317 T: OnInterestChanged + Send + Sync + 'static,
318 {
319 self.inner.filter.set_interest_listener(listener);
320 }
321
322 pub fn register_logger(&self, interest: Option<Interest>) -> Result<(), PublishError> {
325 self.inner.filter.update_interest(interest.unwrap_or_default());
326 unsafe {
329 let ptr = Arc::into_raw(self.inner.clone());
330 log::set_logger(&*ptr).inspect_err(|_| {
331 let _ = Arc::from_raw(ptr);
332 })?;
333 }
334 Ok(())
335 }
336
337 async fn listen_for_interest_updates(self, proxy: LogSinkProxy) {
340 self.inner.filter.listen_for_interest_updates(proxy).await;
341 }
342}
343
344impl log::Log for InnerPublisher {
345 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
346 true
349 }
350
351 fn log(&self, record: &log::Record<'_>) {
352 self.sink.record_log(record);
353 }
354
355 fn flush(&self) {}
356}
357
358impl log::Log for Publisher {
359 #[inline]
360 fn enabled(&self, metadata: &log::Metadata<'_>) -> bool {
361 self.inner.enabled(metadata)
362 }
363
364 #[inline]
365 fn log(&self, record: &log::Record<'_>) {
366 self.inner.log(record)
367 }
368
369 #[inline]
370 fn flush(&self) {
371 self.inner.flush()
372 }
373}
374
375impl Borrow<InterestFilter> for InnerPublisher {
376 fn borrow(&self) -> &InterestFilter {
377 &self.filter
378 }
379}
380
381pub fn initialize_buffered(opts: PublishOptions<'_>) -> Result<(), PublishError> {
385 BufferedPublisher::new(opts.publisher)?;
386 if opts.install_panic_hook {
387 crate::install_panic_hook(opts.panic_prefix);
388 }
389 Ok(())
390}
391
/// Publisher that can be created before the `LogSink` `OnInit` event has arrived: it is
/// constructed with a `BufferedSink`, and the buffer received from `OnInit` is handed to that
/// sink later by a background task.
pub struct BufferedPublisher {
    /// Sink that records log records; receives its buffer via `set_buffer` once `OnInit` arrives.
    sink: BufferedSink,
    /// Interest filter seeded from the publisher options and updated by the background task.
    filter: InterestFilter,
    /// Handle of the background task that waits for `OnInit` and optionally listens for
    /// interest updates.
    interest_listening_task: Mutex<Option<fasync::Task<()>>>,
}
400
401impl BufferedPublisher {
402 pub fn new(opts: PublisherOptions<'_>) -> Result<Arc<Self>, PublishError> {
405 if opts.listen_for_interest_updates && !opts.register_global_logger {
406 return Err(PublishError::UnsupportedOption);
409 }
410
411 let client = match opts.log_sink_client {
412 Some(log_sink) => log_sink,
413 None => connect_to_protocol()
414 .map_err(|e| e.to_string())
415 .map_err(PublishError::LogSinkConnect)?,
416 };
417
418 let this = Arc::new(Self {
419 sink: BufferedSink::new(SinkConfig {
420 tags: opts.tags.iter().map(|s| s.to_string()).collect(),
421 metatags: opts.metatags,
422 always_log_file_line: opts.always_log_file_line,
423 }),
424 filter: InterestFilter::new(opts.interest),
425 interest_listening_task: Mutex::default(),
426 });
427
428 if opts.register_global_logger {
429 unsafe {
433 log::set_logger(&*Arc::into_raw(this.clone()))?;
434 }
435 }
436
437 let this_clone = this.clone();
441 *this_clone.interest_listening_task.lock() = Some(fasync::Task::spawn(async move {
442 let proxy = client.into_proxy();
443
444 let Some(Ok(LogSinkEvent::OnInit {
445 payload: LogSinkOnInitRequest { buffer: Some(buffer), interest, .. },
446 })) = proxy.take_event_stream().next().await
447 else {
448 return;
451 };
452
453 this.filter.update_interest(
457 (if opts.listen_for_interest_updates { interest } else { None })
458 .unwrap_or_default(),
459 );
460
461 this.sink.set_buffer(buffer);
462
463 if opts.listen_for_interest_updates {
464 this.filter.listen_for_interest_updates(proxy).await;
465 }
466 }));
467
468 Ok(this_clone)
469 }
470}
471
472impl log::Log for BufferedPublisher {
473 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
474 true
477 }
478
479 fn log(&self, record: &log::Record<'_>) {
480 self.sink.record_log(record);
481 }
482
483 fn flush(&self) {}
484}
485
486impl Borrow<InterestFilter> for BufferedPublisher {
487 fn borrow(&self) -> &InterestFilter {
488 &self.filter
489 }
490}
491
492#[derive(Debug, Error)]
494pub enum PublishError {
495 #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
497 LogSinkConnect(String),
498
499 #[error("failed to create a socket for logging")]
501 MakeSocket(#[source] zx::Status),
502
503 #[error("failed to send a socket to the LogSink")]
505 SendSocket(#[source] fidl::Error),
506
507 #[error("failed to install the loger")]
509 InitLogForward(#[from] log::SetLoggerError),
510
511 #[error("unsupported option")]
513 UnsupportedOption,
514
515 #[error("did not receive the OnInit event")]
517 MissingOnInit,
518}
519
/// Fake clock used in test builds, in nanoseconds. Starts at 10 seconds and is read by
/// `get_now` / advanced by `increment_clock`.
#[cfg(test)]
static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);
522
/// Advances the fake test clock by `duration`.
#[cfg(test)]
pub fn increment_clock(duration: Duration) {
    let delta_nanos = duration.as_nanos() as i64;
    CURRENT_TIME_NANOS.fetch_add(delta_nanos, Ordering::SeqCst);
}
528
529#[doc(hidden)]
531pub fn get_now() -> i64 {
532 #[cfg(not(test))]
533 return zx::MonotonicInstant::get().into_nanos();
534
535 #[cfg(test)]
536 CURRENT_TIME_NANOS.load(Ordering::Relaxed)
537}
538
/// Logs a message at most once every `$seconds` seconds per call site.
///
/// `$severity` is lower-cased and used as the name of the `log` macro to invoke (for example
/// `INFO` becomes `log::info!`); the remaining arguments are passed to that macro unchanged.
///
/// The timestamp of the last emitted message is kept in a per-call-site static that starts at
/// zero, so the first message is only emitted once `get_now()` is at least `$seconds` seconds.
///
/// The expansion is wrapped in its own block so that its `use` items and the static do not
/// leak into, or collide with, the caller's scope; this also allows several invocations in the
/// same block and use in expression position.
#[macro_export]
macro_rules! log_every_n_seconds {
    ($seconds:expr, $severity:expr, $($arg:tt)*) => {{
        use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
        use $crate::{paste, fuchsia::get_now};

        let now = get_now();

        static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
        if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
            paste! {
                log::[< $severity:lower >]!($($arg)*);
            }
            LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
        }
    }}
}
560
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_reader::ArchiveReader;
    use fidl_fuchsia_diagnostics_crasher::{CrasherMarker, CrasherProxy};
    use fuchsia_async::TimeoutExt;
    use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
    use futures::{FutureExt, StreamExt, future};
    use log::{debug, error, info};
    use moniker::ExtendedMoniker;

    /// Launches the `rust-crasher` component in a test realm, asks it to crash and checks that
    /// the first log attributed to that component carries the expected file, line and panic
    /// message.
    #[fuchsia::test]
    async fn panic_integration_test() {
        let builder = RealmBuilder::new().await.unwrap();
        let puppet = builder
            .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
            .await
            .unwrap();
        // Expose the crasher protocol from the child to the test.
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol::<CrasherMarker>())
                    .from(&puppet)
                    .to(Ref::parent()),
            )
            .await
            .unwrap();
        let realm = builder.build().await.unwrap();
        let child_name = realm.root.child_name();
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let proxy: CrasherProxy = realm.root.connect_to_protocol_at_exposed_dir().unwrap();
        let target_moniker =
            ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
                .unwrap();
        proxy.crash("This is a test panic.").await.unwrap();

        // Only look at logs coming from the crasher component.
        let result =
            logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
        assert_eq!(result.line_number(), Some(29).as_ref());
        assert_eq!(
            result.file_path(),
            Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
        );
        assert!(
            result
                .payload_keys()
                .unwrap()
                .get_property("info")
                .unwrap()
                .to_string()
                .contains("This is a test panic.")
        );
    }

    /// Checks that `set_minimum_severity` takes effect: a debug message logged before the call
    /// is absent from the collected logs, one logged after it is present.
    ///
    /// NOTE(review): `logging = false` presumably stops the test harness from installing its
    /// own logger so that the publisher created here can be registered — confirm.
    #[fuchsia::test(logging = false)]
    async fn verify_setting_minimum_log_severity() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let _publisher = Publisher::new_async(PublisherOptions {
            tags: &["verify_setting_minimum_log_severity"],
            register_global_logger: true,
            ..PublisherOptions::default()
        })
        .await
        .expect("initialized log");

        info!("I'm an info log");
        debug!("I'm a debug log and won't show up");

        set_minimum_severity(Severity::Debug);
        debug!("I'm a debug log and I show up");

        // Exactly two of the three messages above are expected to arrive with this test's tag.
        let results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
                )
            })
            .take(2)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(results[0].msg().unwrap(), "I'm an info log");
        assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
    }

    /// Logs one structured message from each of several threads and checks that every message
    /// is received exactly once, with the expected `thread` key and text.
    #[fuchsia::test]
    async fn log_macro_logs_are_recorded() {
        let reader = ArchiveReader::logs();
        let (logs, mut errors) = reader.snapshot_then_subscribe().unwrap().split_streams();

        let total_threads = 10;

        for i in 0..total_threads {
            std::thread::spawn(move || {
                log::info!(thread=i; "log from thread {}", i);
            });
        }

        // Count logs that do not carry this test's tag; reported in the final assertion message.
        let mut filtered = 0;
        let mut results = logs
            .filter(|data| {
                future::ready(
                    if data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded") {
                        true
                    } else {
                        filtered += 1;
                        false
                    },
                )
            })
            .take(total_threads);

        let mut seen = vec![];
        // Stop waiting (and end the loop) if no log arrives within the timeout.
        while let Some(log) = results
            .next()
            .on_timeout(Duration::from_secs(30), || {
                error!("Timed out!");
                None
            })
            .await
        {
            let hierarchy = log.payload_keys().unwrap();
            assert_eq!(hierarchy.properties.len(), 1);
            assert_eq!(hierarchy.properties[0].name(), "thread");
            let thread_id = hierarchy.properties[0].uint().unwrap();
            seen.push(thread_id as usize);
            assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
        }

        // No reader error should be pending.
        assert_matches!(errors.next().now_or_never(), None);

        // Threads may log in any order, so compare the sorted ids.
        seen.sort();
        assert_eq!(seen, (0..total_threads).collect::<Vec<_>>(), "filtered={filtered}");
    }
}