1use crate::PublishOptions;
5use diagnostics_log_types::Severity;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::{
8 LogSinkEvent, LogSinkMarker, LogSinkOnInitRequest, LogSinkProxy, LogSinkSynchronousProxy,
9};
10use fuchsia_async as fasync;
11use fuchsia_sync::Mutex;
12use futures::stream::StreamExt;
13use std::borrow::Borrow;
14use std::collections::HashSet;
15use std::fmt::Debug;
16use std::sync::Arc;
17use thiserror::Error;
18
19#[cfg(fuchsia_api_level_less_than = "27")]
20use fidl_fuchsia_diagnostics::Interest;
21#[cfg(fuchsia_api_level_at_least = "27")]
22use fidl_fuchsia_diagnostics_types::Interest;
23
24use fuchsia_component_client::connect::connect_to_protocol;
25#[cfg(not(feature = "no_startup_handle"))]
26use fuchsia_runtime::{HandleInfo, HandleType};
27
28mod filter;
29mod sink;
30
31use filter::InterestFilter;
32use sink::{BufferedSink, IoBufferSink, Sink, SinkConfig};
33
34pub use diagnostics_log_encoding::Metatag;
35pub use diagnostics_log_encoding::encode::{LogEvent, TestRecord};
36pub use paste::paste;
37
38#[cfg(test)]
39use std::{
40 sync::atomic::{AtomicI64, Ordering},
41 time::Duration,
42};
43
/// Callback trait for observers that want to be notified whenever the
/// component's registered interest (minimum severity) changes.
pub trait OnInterestChanged {
    /// Invoked with the new minimum severity each time the interest changes.
    fn on_changed(&self, severity: Severity);
}
49
/// Configuration for constructing a [`Publisher`] or [`BufferedPublisher`].
#[derive(Default)]
pub struct PublisherOptions<'t> {
    // Initial interest (minimum severity) used before any update arrives.
    pub(crate) interest: Interest,
    // When true, a thread/task is spawned to listen for interest updates
    // coming from the LogSink.
    listen_for_interest_updates: bool,
    // Pre-connected LogSink client end; when `None`, one is obtained from the
    // startup handle or the component's namespace at initialization time.
    log_sink_client: Option<ClientEnd<LogSinkMarker>>,
    // Metatags applied to every emitted record.
    pub(crate) metatags: HashSet<Metatag>,
    // Tags applied to every emitted record.
    pub(crate) tags: &'t [&'t str],
    // When true, file/line metadata is attached to every record.
    pub(crate) always_log_file_line: bool,
    // When true, the publisher registers itself as the `log` crate's global
    // logger during initialization.
    register_global_logger: bool,
}
61
/// Defaults for the top-level publish options: listen for interest updates,
/// register the global logger, and install the panic hook with no prefix.
impl Default for PublishOptions<'static> {
    fn default() -> Self {
        Self {
            publisher: PublisherOptions {
                listen_for_interest_updates: true,
                register_global_logger: true,
                // Remaining publisher fields keep their derived defaults.
                ..PublisherOptions::default()
            },
            install_panic_hook: true,
            panic_prefix: None,
        }
    }
}
78
// Generates identical builder-style setters on several option types. Each
// tuple is (type name, self identifier, path of field accessors leading to
// the `PublisherOptions` that actually stores the values) — so
// `PublisherOptions` mutates itself directly while `PublishOptions` forwards
// through its `publisher` field.
macro_rules! publisher_options {
    ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
        $(
            impl<'t> $name<'t> {
                /// Enables or disables attaching file/line info to every record.
                pub fn log_file_line_info(mut $self, enable: bool) -> Self {
                let this = &mut $self$(.$self_arg)*;
                this.always_log_file_line = enable;
                $self
            }

                /// Enables or disables listening for interest updates from the LogSink.
                pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
                let this = &mut $self$(.$self_arg)*;
                this.listen_for_interest_updates = enable;
                $self
            }

                /// Supplies an already-connected LogSink client end to use.
                pub fn use_log_sink(mut $self, client: ClientEnd<LogSinkMarker>) -> Self {
                let this = &mut $self$(.$self_arg)*;
                this.log_sink_client = Some(client);
                $self
            }

                /// Controls whether the publisher registers itself as the global logger.
                pub fn register_global_logger(mut $self, value: bool) -> Self {
                let this = &mut $self$(.$self_arg)*;
                this.register_global_logger = value;
                $self
            }
            }
        )*
    };
}
125
// Instantiate the shared setters on both option types; `PublishOptions`
// forwards through its `publisher` field.
publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
127
128pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
136 let result = Publisher::new_sync_with_async_listener(opts.publisher);
137 if matches!(result, Err(PublishError::MissingOnInit)) {
138 return Ok(());
142 }
143 result?;
144 if opts.install_panic_hook {
145 crate::install_panic_hook(opts.panic_prefix);
146 }
147 Ok(())
148}
149
150pub fn set_minimum_severity(severity: impl Into<Severity>) {
153 let severity: Severity = severity.into();
154 log::set_max_level(severity.into());
155}
156
157pub fn initialize_sync(opts: PublishOptions<'_>) {
167 match Publisher::new_sync(opts.publisher) {
168 Ok(_) => {}
169 Err(PublishError::MissingOnInit) => {
170 return;
174 }
175 Err(e) => panic!("Unable to initialize logging: {e:?}"),
176 }
177 if opts.install_panic_hook {
178 crate::install_panic_hook(opts.panic_prefix);
179 }
180}
181
/// A `log::Log` implementation that writes records into the IOBuffer handed
/// over by the LogSink's `OnInit` event, subject to the current interest.
#[derive(Clone)]
pub struct Publisher {
    // Shared so clones are cheap and the same state can be leaked to
    // `log::set_logger` (see `register_logger`).
    inner: Arc<InnerPublisher>,
}
188
/// Shared state behind [`Publisher`]; this is the value actually installed
/// as the global logger.
struct InnerPublisher {
    // Encodes and writes records into the LogSink-provided IOBuffer.
    sink: IoBufferSink,
    // Tracks the current interest and notifies registered listeners.
    filter: InterestFilter,
}
193
impl Publisher {
    /// Builds a publisher around an IOBuffer that has already been received
    /// from the LogSink.
    fn new(opts: PublisherOptions<'_>, iob: zx::Iob) -> Self {
        Self {
            inner: Arc::new(InnerPublisher {
                sink: IoBufferSink::new(
                    iob,
                    SinkConfig {
                        tags: opts.tags.iter().map(|s| s.to_string()).collect(),
                        metatags: opts.metatags,
                        always_log_file_line: opts.always_log_file_line,
                    },
                ),
                filter: InterestFilter::new(opts.interest),
            }),
        }
    }

    /// Synchronously connects to the LogSink and waits for `OnInit`. When
    /// interest updates are enabled, spawns a dedicated thread running a
    /// local executor to listen for them, so callers do not need their own
    /// async executor.
    pub fn new_sync(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
        let listen_for_interest_updates = opts.listen_for_interest_updates;
        let (publisher, client) = Self::new_sync_no_listener(opts)?;
        if listen_for_interest_updates {
            let publisher = publisher.clone();
            std::thread::spawn(move || {
                fuchsia_async::LocalExecutor::default()
                    .run_singlethreaded(publisher.listen_for_interest_updates(client.into_proxy()));
            });
        }
        Ok(publisher)
    }

    /// Like [`Publisher::new_sync`], but listens for interest updates on a
    /// task spawned onto the caller's existing async executor instead of a
    /// separate thread.
    pub fn new_sync_with_async_listener(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
        let listen_for_interest_updates = opts.listen_for_interest_updates;
        let (publisher, client) = Self::new_sync_no_listener(opts)?;
        if listen_for_interest_updates {
            fasync::Task::spawn(publisher.clone().listen_for_interest_updates(client.into_proxy()))
                .detach();
        }
        Ok(publisher)
    }

    /// Shared synchronous setup: obtains a LogSink client, blocks until the
    /// `OnInit` event delivers the IOBuffer, builds the publisher, and
    /// registers it as the global logger when requested. Returns the client
    /// end so callers can keep listening for interest updates on it.
    fn new_sync_no_listener(
        mut opts: PublisherOptions<'_>,
    ) -> Result<(Self, ClientEnd<LogSinkMarker>), PublishError> {
        let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;

        // Interest updates are only delivered through the global logger, so
        // listening without registering cannot work.
        if listen_for_interest_updates && !register_global_logger {
            return Err(PublishError::UnsupportedOption);
        }

        let mut get_client = || match opts.log_sink_client.take() {
            Some(log_sink) => Ok(log_sink),
            None => {
                // Prefer the LogSink handle passed to the process at startup,
                // when that feature is enabled and the handle exists.
                #[cfg(not(feature = "no_startup_handle"))]
                {
                    let log_sink = fuchsia_runtime::take_startup_handle(HandleInfo::new(
                        HandleType::LogSink,
                        0,
                    ));
                    if let Some(log_sink) = log_sink {
                        let log_sink = fidl::Channel::from(log_sink);
                        return Ok(ClientEnd::<LogSinkMarker>::from(log_sink));
                    }
                }
                // Otherwise connect through the component's namespace.
                connect_to_protocol()
                    .map_err(|e| e.to_string())
                    .map_err(PublishError::LogSinkConnect)
            }
        };
        let client = (get_client)()?;

        // Blocks indefinitely until OnInit arrives or the channel closes.
        let proxy = zx::Unowned::<LogSinkSynchronousProxy>::new(client.channel());
        let Ok(LogSinkEvent::OnInit {
            payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
        }) = proxy.wait_for_event(zx::MonotonicInstant::INFINITE)
        else {
            return Err(PublishError::MissingOnInit);
        };

        let publisher = Self::new(opts, iob);

        if register_global_logger {
            // The server-provided initial interest is only honored when we
            // will keep listening for updates; otherwise the default is used.
            publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
        }

        Ok((publisher, client))
    }

    /// Async counterpart of the sync constructors: awaits `OnInit`, builds
    /// the publisher, and (when registering the global logger) spawns the
    /// interest-update listener on the current executor.
    pub async fn new_async(mut opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
        let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;

        // Same invariant as the sync path: updates require registration.
        if listen_for_interest_updates && !register_global_logger {
            return Err(PublishError::UnsupportedOption);
        }

        let mut get_client = || match opts.log_sink_client.take() {
            Some(log_sink) => Ok(log_sink.into_proxy()),
            None => {
                // Prefer the startup handle, falling back to the namespace.
                #[cfg(not(feature = "no_startup_handle"))]
                {
                    let log_sink = fuchsia_runtime::take_startup_handle(HandleInfo::new(
                        HandleType::LogSink,
                        0,
                    ));
                    if let Some(log_sink) = log_sink {
                        let log_sink = fidl::Channel::from(log_sink);
                        let log_sink = ClientEnd::<LogSinkMarker>::from(log_sink);
                        return Ok(log_sink.into_proxy());
                    }
                }
                connect_to_protocol()
                    .map_err(|e| e.to_string())
                    .map_err(PublishError::LogSinkConnect)
            }
        };
        let proxy = (get_client)()?;

        // Await the first event; anything other than OnInit-with-buffer is an
        // initialization failure.
        let Some(Ok(LogSinkEvent::OnInit {
            payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
        })) = proxy.take_event_stream().next().await
        else {
            return Err(PublishError::MissingOnInit);
        };

        let publisher = Self::new(opts, iob);

        if register_global_logger {
            publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
            fasync::Task::spawn(publisher.clone().listen_for_interest_updates(proxy)).detach();
        }

        Ok(publisher)
    }

    /// Publishes a pre-built test record, subject to the interest filter.
    pub fn event_for_testing(&self, record: TestRecord<'_>) {
        if self.inner.filter.enabled_for_testing(&record) {
            self.inner.sink.event_for_testing(record);
        }
    }

    /// Registers a listener to be notified whenever the interest changes.
    pub fn set_interest_listener<T>(&self, listener: T)
    where
        T: OnInterestChanged + Send + Sync + 'static,
    {
        self.inner.filter.set_interest_listener(listener);
    }

    /// Applies the given interest (or the default) and installs this
    /// publisher as the `log` crate's global logger.
    ///
    /// # Errors
    ///
    /// Returns [`PublishError::InitLogForward`] if a global logger is already
    /// installed.
    pub fn register_logger(&self, interest: Option<Interest>) -> Result<(), PublishError> {
        self.inner.filter.update_interest(interest.unwrap_or_default());
        // SAFETY: `Arc::into_raw` produces a pointer that stays valid for the
        // life of the program because the Arc is intentionally leaked (the
        // global logger must be 'static). On failure the Arc is reclaimed via
        // `from_raw` so the reference count is not leaked.
        unsafe {
            let ptr = Arc::into_raw(self.inner.clone());
            log::set_logger(&*ptr).inspect_err(|_| {
                let _ = Arc::from_raw(ptr);
            })?;
        }
        Ok(())
    }

    /// Runs until the LogSink closes, forwarding interest updates to the
    /// filter (and through it to any registered listener).
    async fn listen_for_interest_updates(self, proxy: LogSinkProxy) {
        self.inner.filter.listen_for_interest_updates(proxy).await;
    }
}
377
impl log::Log for InnerPublisher {
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        // Always accept: severity gating happens before records reach us.
        // NOTE(review): presumably via `log::set_max_level` driven by the
        // InterestFilter — confirm in filter.rs.
        true
    }

    fn log(&self, record: &log::Record<'_>) {
        self.sink.record_log(record);
    }

    // Writes go straight to the IOBuffer; there is nothing to flush.
    fn flush(&self) {}
}
391
// Pure delegation to the shared inner logger, so a `Publisher` handle can be
// used anywhere a `log::Log` is expected.
impl log::Log for Publisher {
    #[inline]
    fn enabled(&self, metadata: &log::Metadata<'_>) -> bool {
        self.inner.enabled(metadata)
    }

    #[inline]
    fn log(&self, record: &log::Record<'_>) {
        self.inner.log(record)
    }

    #[inline]
    fn flush(&self) {
        self.inner.flush()
    }
}
408
// Exposes the inner `InterestFilter` so filter-generic code can borrow it
// from the registered logger.
impl Borrow<InterestFilter> for InnerPublisher {
    fn borrow(&self) -> &InterestFilter {
        &self.filter
    }
}
414
415pub fn initialize_buffered(opts: PublishOptions<'_>) -> Result<(), PublishError> {
419 BufferedPublisher::new(opts.publisher)?;
420 if opts.install_panic_hook {
421 crate::install_panic_hook(opts.panic_prefix);
422 }
423 Ok(())
424}
425
/// A `log::Log` implementation that can be installed before the LogSink's
/// IOBuffer is available: records are buffered by the sink until the buffer
/// arrives asynchronously (see `BufferedSink::set_buffer`).
pub struct BufferedPublisher {
    // Buffers records until a buffer is set, then writes into it.
    sink: BufferedSink,
    // Tracks the current interest and notifies registered listeners.
    filter: InterestFilter,
    // Holds the task that awaits OnInit and (optionally) interest updates.
    interest_listening_task: Mutex<Option<fasync::Task<()>>>,
}
434
435impl BufferedPublisher {
436 pub fn new(opts: PublisherOptions<'_>) -> Result<Arc<Self>, PublishError> {
439 if opts.listen_for_interest_updates && !opts.register_global_logger {
440 return Err(PublishError::UnsupportedOption);
443 }
444
445 let get_client = || match opts.log_sink_client {
446 Some(log_sink) => Ok(log_sink),
447 None => {
448 #[cfg(not(feature = "no_startup_handle"))]
449 {
450 let log_sink = fuchsia_runtime::take_startup_handle(HandleInfo::new(
451 HandleType::LogSink,
452 0,
453 ));
454 if let Some(log_sink) = log_sink {
455 let log_sink = fidl::Channel::from(log_sink);
456 return Ok(ClientEnd::<LogSinkMarker>::from(log_sink));
457 }
458 }
459 connect_to_protocol()
461 .map_err(|e| e.to_string())
462 .map_err(PublishError::LogSinkConnect)
463 }
464 };
465 let client = (get_client)()?;
466
467 let this = Arc::new(Self {
468 sink: BufferedSink::new(SinkConfig {
469 tags: opts.tags.iter().map(|s| s.to_string()).collect(),
470 metatags: opts.metatags,
471 always_log_file_line: opts.always_log_file_line,
472 }),
473 filter: InterestFilter::new(opts.interest),
474 interest_listening_task: Mutex::default(),
475 });
476
477 if opts.register_global_logger {
478 unsafe {
482 log::set_logger(&*Arc::into_raw(this.clone()))?;
483 }
484 }
485
486 let this_clone = this.clone();
490 *this_clone.interest_listening_task.lock() = Some(fasync::Task::spawn(async move {
491 let proxy = client.into_proxy();
492
493 let Some(Ok(LogSinkEvent::OnInit {
494 payload: LogSinkOnInitRequest { buffer: Some(buffer), interest, .. },
495 })) = proxy.take_event_stream().next().await
496 else {
497 return;
500 };
501
502 this.filter.update_interest(
506 (if opts.listen_for_interest_updates { interest } else { None })
507 .unwrap_or_default(),
508 );
509
510 this.sink.set_buffer(buffer);
511
512 if opts.listen_for_interest_updates {
513 this.filter.listen_for_interest_updates(proxy).await;
514 }
515 }));
516
517 Ok(this_clone)
518 }
519}
520
impl log::Log for BufferedPublisher {
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        // Always accept: severity gating happens before records reach us.
        // NOTE(review): presumably via `log::set_max_level` driven by the
        // InterestFilter — confirm in filter.rs.
        true
    }

    fn log(&self, record: &log::Record<'_>) {
        self.sink.record_log(record);
    }

    // The sink buffers internally until a buffer is set; nothing to flush.
    fn flush(&self) {}
}
534
// Exposes the inner `InterestFilter` so filter-generic code can borrow it
// from the registered logger.
impl Borrow<InterestFilter> for BufferedPublisher {
    fn borrow(&self) -> &InterestFilter {
        &self.filter
    }
}
540
541#[derive(Debug, Error)]
543pub enum PublishError {
544 #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
546 LogSinkConnect(String),
547
548 #[error("failed to create a socket for logging")]
550 MakeSocket(#[source] zx::Status),
551
552 #[error("failed to send a socket to the LogSink")]
554 SendSocket(#[source] fidl::Error),
555
556 #[error("failed to install the loger")]
558 InitLogForward(#[from] log::SetLoggerError),
559
560 #[error("unsupported option")]
562 UnsupportedOption,
563
564 #[error("did not receive the OnInit event")]
566 MissingOnInit,
567}
568
// Test-only fake monotonic clock, in nanoseconds, starting at 10 seconds.
// Advanced explicitly via `increment_clock`.
#[cfg(test)]
static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);
571
/// Advances the test-only fake clock by `duration`.
#[cfg(test)]
pub fn increment_clock(duration: Duration) {
    CURRENT_TIME_NANOS.fetch_add(duration.as_nanos() as i64, Ordering::SeqCst);
}
577
/// Returns the current monotonic time in nanoseconds. Under `cfg(test)` this
/// reads the fake clock instead, so tests can control the passage of time.
#[doc(hidden)]
pub fn get_now() -> i64 {
    #[cfg(not(test))]
    return zx::MonotonicInstant::get().into_nanos();

    #[cfg(test)]
    CURRENT_TIME_NANOS.load(Ordering::Relaxed)
}
587
/// Logs at the given severity at most once every `$seconds` seconds per call
/// site; invocations arriving before the interval has elapsed are dropped.
/// Time comes from `get_now`, which tests can control via the fake clock.
// NOTE(review): the expansion emits `use` items directly into the caller's
// scope; invoking this macro twice in the same block may produce duplicate
// import errors — confirm intended usage (wrapping the expansion in an extra
// block would change none of the logic but improve hygiene).
#[macro_export]
macro_rules! log_every_n_seconds {
    ($seconds:expr, $severity:expr, $($arg:tt)*) => {
        use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
        use $crate::{paste, fuchsia::get_now};

        let now = get_now();

        // One timestamp per macro expansion site.
        static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
        if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
            paste! {
                log::[< $severity:lower >]!($($arg)*);
            }
            LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
        }
    }
}
609
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_reader::ArchiveReader;
    use fidl_fuchsia_diagnostics_crasher::{CrasherMarker, CrasherProxy};
    use fuchsia_async::TimeoutExt;
    use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
    use futures::{FutureExt, StreamExt, future};
    use log::{debug, error, info};
    use moniker::ExtendedMoniker;

    // Launches the rust-crasher child component, triggers a panic inside it,
    // and verifies the panic shows up in the captured logs with the expected
    // file/line metadata and message.
    #[fuchsia::test]
    async fn panic_integration_test() {
        let builder = RealmBuilder::new().await.unwrap();
        let puppet = builder
            .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
            .await
            .unwrap();
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol::<CrasherMarker>())
                    .from(&puppet)
                    .to(Ref::parent()),
            )
            .await
            .unwrap();
        let realm = builder.build().await.unwrap();
        let child_name = realm.root.child_name();
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let proxy: CrasherProxy = realm.root.connect_to_protocol_at_exposed_dir().unwrap();
        let target_moniker =
            ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
                .unwrap();
        proxy.crash("This is a test panic.").await.unwrap();

        // Only consider logs emitted by the crasher child.
        let result =
            logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
        // NOTE(review): line 29 is the panic site in the crasher's main.rs;
        // this assertion is brittle against edits to that file.
        assert_eq!(result.line_number(), Some(29).as_ref());
        assert_eq!(
            result.file_path(),
            Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
        );
        assert!(
            result
                .payload_keys()
                .unwrap()
                .get_property("info")
                .unwrap()
                .to_string()
                .contains("This is a test panic.")
        );
    }

    // Verifies that `set_minimum_severity` takes effect at runtime: a debug
    // log emitted before lowering the minimum severity is dropped, while one
    // emitted after shows up.
    #[fuchsia::test(logging = false)]
    async fn verify_setting_minimum_log_severity() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let _publisher = Publisher::new_async(PublisherOptions {
            tags: &["verify_setting_minimum_log_severity"],
            register_global_logger: true,
            ..PublisherOptions::default()
        })
        .await
        .expect("initialized log");

        info!("I'm an info log");
        debug!("I'm a debug log and won't show up");

        set_minimum_severity(Severity::Debug);
        debug!("I'm a debug log and I show up");

        // Filter by tag so other tests' logs don't interfere.
        let results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
                )
            })
            .take(2)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(results[0].msg().unwrap(), "I'm an info log");
        assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
    }

    // Emits structured logs from several threads concurrently and verifies
    // every one arrives intact, with its structured `thread` key preserved.
    #[fuchsia::test]
    async fn log_macro_logs_are_recorded() {
        let reader = ArchiveReader::logs();
        let (logs, mut errors) = reader.snapshot_then_subscribe().unwrap().split_streams();

        let total_threads = 10;

        for i in 0..total_threads {
            std::thread::spawn(move || {
                log::info!(thread=i; "log from thread {}", i);
            });
        }

        // Count skipped (foreign) logs for diagnostics in the final assert.
        let mut filtered = 0;
        let mut results = logs
            .filter(|data| {
                future::ready(
                    if data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded") {
                        true
                    } else {
                        filtered += 1;
                        false
                    },
                )
            })
            .take(total_threads);

        let mut seen = vec![];
        while let Some(log) = results
            .next()
            .on_timeout(Duration::from_secs(30), || {
                error!("Timed out!");
                None
            })
            .await
        {
            let hierarchy = log.payload_keys().unwrap();
            assert_eq!(hierarchy.properties.len(), 1);
            assert_eq!(hierarchy.properties[0].name(), "thread");
            let thread_id = hierarchy.properties[0].uint().unwrap();
            seen.push(thread_id as usize);
            assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
        }

        assert_matches!(errors.next().now_or_never(), None);

        // Every thread's log must have arrived exactly once.
        seen.sort();
        assert_eq!(seen, (0..total_threads).collect::<Vec<_>>(), "filtered={filtered}");
    }
}