1use crate::PublishOptions;
5use diagnostics_log_types::Severity;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::{
8 LogSinkEvent, LogSinkMarker, LogSinkOnInitRequest, LogSinkProxy, LogSinkSynchronousProxy,
9};
10use fuchsia_async as fasync;
11use fuchsia_component_client::connect::connect_to_protocol;
12use fuchsia_sync::Mutex;
13use futures::stream::StreamExt;
14use std::borrow::Borrow;
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::Arc;
18use thiserror::Error;
19
20#[cfg(fuchsia_api_level_less_than = "27")]
21use fidl_fuchsia_diagnostics::Interest;
22#[cfg(fuchsia_api_level_at_least = "27")]
23use fidl_fuchsia_diagnostics_types::Interest;
24
25mod filter;
26mod sink;
27
28use filter::InterestFilter;
29use sink::{BufferedSink, IoBufferSink, Sink, SinkConfig};
30
31pub use diagnostics_log_encoding::Metatag;
32pub use diagnostics_log_encoding::encode::{LogEvent, TestRecord};
33pub use paste::paste;
34
35#[cfg(test)]
36use std::{
37 sync::atomic::{AtomicI64, Ordering},
38 time::Duration,
39};
40
/// Callback interface for observing interest changes.
///
/// Implementations are installed with [`Publisher::set_interest_listener`], which requires
/// them to be `Send + Sync + 'static`.
pub trait OnInterestChanged {
    /// Invoked with the severity associated with the change.
    // NOTE(review): presumably `severity` is the new minimum severity — confirm in `filter`.
    fn on_changed(&self, severity: Severity);
}
46
/// Options used to configure a [`Publisher`] or a [`BufferedPublisher`].
///
/// The derived `Default` leaves every flag `false` and every collection empty.
#[derive(Default)]
pub struct PublisherOptions<'t> {
    // Set through the `blocking()` builder; not read anywhere else in this file.
    blocking: bool,
    // Initial interest handed to `InterestFilter::new`.
    pub(crate) interest: Interest,
    // Whether a listener for interest updates is started after construction.
    listen_for_interest_updates: bool,
    // Pre-established `LogSink` connection; when `None` the constructors call
    // `connect_to_protocol()` instead.
    log_sink_client: Option<ClientEnd<LogSinkMarker>>,
    // Forwarded unchanged into the sink's `SinkConfig`.
    pub(crate) metatags: HashSet<Metatag>,
    // Converted to owned strings and forwarded into the sink's `SinkConfig`.
    pub(crate) tags: &'t [&'t str],
    // Forwarded unchanged into the sink's `SinkConfig`.
    pub(crate) always_log_file_line: bool,
    // Whether the publisher is installed as the global `log` logger.
    register_global_logger: bool,
}
59
60impl Default for PublishOptions<'static> {
61 fn default() -> Self {
62 Self {
63 publisher: PublisherOptions {
64 listen_for_interest_updates: true,
68 register_global_logger: true,
69 ..PublisherOptions::default()
70 },
71 install_panic_hook: true,
72 panic_prefix: None,
73 }
74 }
75}
76
// Generates the shared builder methods for every listed options type.
//
// Each entry has the form `(TypeName, self, path...)`, where `path...` is the (possibly empty)
// chain of field names leading from `self` to the `PublisherOptions` value that should be
// mutated. This lets `PublishOptions` expose the same setters as the `PublisherOptions` it
// wraps.
macro_rules! publisher_options {
    ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
        $(
            impl<'t> $name<'t> {
                /// Sets the `always_log_file_line` option, which is forwarded to the sink
                /// configuration.
                pub fn log_file_line_info(mut $self, enable: bool) -> Self {
                    // Resolve `self` or `self.<path>` to the underlying `PublisherOptions`.
                    let this = &mut $self$(.$self_arg)*;
                    this.always_log_file_line = enable;
                    $self
                }

                /// Sets whether a listener for interest updates is started once the
                /// publisher has been created.
                pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.listen_for_interest_updates = enable;
                    $self
                }

                /// Uses `client` as the `LogSink` connection instead of connecting to the
                /// protocol when the publisher is created.
                pub fn use_log_sink(mut $self, client: ClientEnd<LogSinkMarker>) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.log_sink_client = Some(client);
                    $self
                }

                /// Sets the `blocking` option.
                pub fn blocking(mut $self, is_blocking: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.blocking = is_blocking;
                    $self
                }

                /// Sets whether the publisher is installed as the global `log` logger.
                pub fn register_global_logger(mut $self, value: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.register_global_logger = value;
                    $self
                }
            }
        )*
    };
}

// `PublisherOptions` mutates itself; `PublishOptions` mutates its `publisher` field.
publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
135
136pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
144 let result = Publisher::new_sync_with_async_listener(opts.publisher);
145 if matches!(result, Err(PublishError::MissingOnInit)) {
146 return Ok(());
150 }
151 result?;
152 if opts.install_panic_hook {
153 crate::install_panic_hook(opts.panic_prefix);
154 }
155 Ok(())
156}
157
158pub fn set_minimum_severity(severity: impl Into<Severity>) {
161 let severity: Severity = severity.into();
162 log::set_max_level(severity.into());
163}
164
165pub fn initialize_sync(opts: PublishOptions<'_>) {
175 match Publisher::new_sync(opts.publisher) {
176 Ok(_) => {}
177 Err(PublishError::MissingOnInit) => {
178 return;
182 }
183 Err(e) => panic!("Unable to initialize logging: {e:?}"),
184 }
185 if opts.install_panic_hook {
186 crate::install_panic_hook(opts.panic_prefix);
187 }
188}
189
/// A cheaply cloneable handle to a log publisher.
///
/// Clones share the same underlying sink and interest filter through an `Arc`.
#[derive(Clone)]
pub struct Publisher {
    // Shared state; also the value handed to `log::set_logger` by `register_logger`.
    inner: Arc<InnerPublisher>,
}
196
// State shared by every clone of a `Publisher`.
struct InnerPublisher {
    // Destination for log records.
    sink: IoBufferSink,
    // Tracks the current interest and any registered interest listener.
    filter: InterestFilter,
}
201
202impl Publisher {
203 fn new(opts: PublisherOptions<'_>, iob: zx::Iob) -> Self {
204 Self {
205 inner: Arc::new(InnerPublisher {
206 sink: IoBufferSink::new(
207 iob,
208 SinkConfig {
209 tags: opts.tags.iter().map(|s| s.to_string()).collect(),
210 metatags: opts.metatags,
211 always_log_file_line: opts.always_log_file_line,
212 },
213 ),
214 filter: InterestFilter::new(opts.interest),
215 }),
216 }
217 }
218
219 pub fn new_sync(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
222 let listen_for_interest_updates = opts.listen_for_interest_updates;
223 let (publisher, client) = Self::new_sync_no_listener(opts)?;
224 if listen_for_interest_updates {
225 let publisher = publisher.clone();
226 std::thread::spawn(move || {
227 fuchsia_async::LocalExecutor::default()
228 .run_singlethreaded(publisher.listen_for_interest_updates(client.into_proxy()));
229 });
230 }
231 Ok(publisher)
232 }
233
234 pub fn new_sync_with_async_listener(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
237 let listen_for_interest_updates = opts.listen_for_interest_updates;
238 let (publisher, client) = Self::new_sync_no_listener(opts)?;
239 if listen_for_interest_updates {
240 fasync::Task::spawn(publisher.clone().listen_for_interest_updates(client.into_proxy()))
241 .detach();
242 }
243 Ok(publisher)
244 }
245
246 fn new_sync_no_listener(
249 mut opts: PublisherOptions<'_>,
250 ) -> Result<(Self, ClientEnd<LogSinkMarker>), PublishError> {
251 let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
252
253 if listen_for_interest_updates && !register_global_logger {
254 return Err(PublishError::UnsupportedOption);
257 }
258
259 let client = match opts.log_sink_client.take() {
260 Some(log_sink) => log_sink,
261 None => connect_to_protocol()
262 .map_err(|e| e.to_string())
263 .map_err(PublishError::LogSinkConnect)?,
264 };
265
266 let proxy = zx::Unowned::<LogSinkSynchronousProxy>::new(client.channel());
267 let Ok(LogSinkEvent::OnInit {
268 payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
269 }) = proxy.wait_for_event(zx::MonotonicInstant::INFINITE)
270 else {
271 return Err(PublishError::MissingOnInit);
272 };
273 drop(proxy);
274
275 let publisher = Self::new(opts, iob);
276
277 if register_global_logger {
278 publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
279 }
280
281 Ok((publisher, client))
282 }
283
284 pub async fn new_async(mut opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
287 let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
288
289 if listen_for_interest_updates && !register_global_logger {
290 return Err(PublishError::UnsupportedOption);
293 }
294
295 let proxy = match opts.log_sink_client.take() {
296 Some(log_sink) => log_sink.into_proxy(),
297 None => connect_to_protocol()
298 .map_err(|e| e.to_string())
299 .map_err(PublishError::LogSinkConnect)?,
300 };
301
302 let Some(Ok(LogSinkEvent::OnInit {
303 payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
304 })) = proxy.take_event_stream().next().await
305 else {
306 return Err(PublishError::MissingOnInit);
307 };
308
309 let publisher = Self::new(opts, iob);
310
311 if register_global_logger {
312 publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
313 fasync::Task::spawn(publisher.clone().listen_for_interest_updates(proxy)).detach();
314 }
315
316 Ok(publisher)
317 }
318
319 pub fn event_for_testing(&self, record: TestRecord<'_>) {
321 if self.inner.filter.enabled_for_testing(&record) {
322 self.inner.sink.event_for_testing(record);
323 }
324 }
325
326 pub fn set_interest_listener<T>(&self, listener: T)
328 where
329 T: OnInterestChanged + Send + Sync + 'static,
330 {
331 self.inner.filter.set_interest_listener(listener);
332 }
333
334 pub fn register_logger(&self, interest: Option<Interest>) -> Result<(), PublishError> {
337 self.inner.filter.update_interest(interest.unwrap_or_default());
338 unsafe {
341 let ptr = Arc::into_raw(self.inner.clone());
342 log::set_logger(&*ptr).inspect_err(|_| {
343 let _ = Arc::from_raw(ptr);
344 })?;
345 }
346 Ok(())
347 }
348
349 async fn listen_for_interest_updates(self, proxy: LogSinkProxy) {
352 self.inner.filter.listen_for_interest_updates(proxy).await;
353 }
354}
355
// `log` backend for the shared publisher state.
impl log::Log for InnerPublisher {
    // Every record is reported as enabled.
    // NOTE(review): severity filtering appears to rely on `log::set_max_level` (see
    // `set_minimum_severity`) rather than on this method — confirm.
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        true
    }

    // Forwards the record to the sink.
    fn log(&self, record: &log::Record<'_>) {
        self.sink.record_log(record);
    }

    // Nothing to flush here.
    fn flush(&self) {}
}
369
370impl log::Log for Publisher {
371 #[inline]
372 fn enabled(&self, metadata: &log::Metadata<'_>) -> bool {
373 self.inner.enabled(metadata)
374 }
375
376 #[inline]
377 fn log(&self, record: &log::Record<'_>) {
378 self.inner.log(record)
379 }
380
381 #[inline]
382 fn flush(&self) {
383 self.inner.flush()
384 }
385}
386
387impl Borrow<InterestFilter> for InnerPublisher {
388 fn borrow(&self) -> &InterestFilter {
389 &self.filter
390 }
391}
392
393pub fn initialize_buffered(opts: PublishOptions<'_>) -> Result<(), PublishError> {
397 BufferedPublisher::new(opts.publisher)?;
398 if opts.install_panic_hook {
399 crate::install_panic_hook(opts.panic_prefix);
400 }
401 Ok(())
402}
403
/// A publisher that is constructed without waiting for the `OnInit` event.
///
/// The event is awaited on a background task, which hands the received buffer to the sink.
// NOTE(review): `BufferedSink` presumably retains records logged before `set_buffer` is
// called — confirm in `sink`.
pub struct BufferedPublisher {
    // Destination for log records.
    sink: BufferedSink,
    // Tracks the current interest.
    filter: InterestFilter,
    // Background task that waits for `OnInit` and optionally listens for interest updates.
    // The task holds an `Arc` to this publisher, so the two keep each other alive.
    interest_listening_task: Mutex<Option<fasync::Task<()>>>,
}
412
413impl BufferedPublisher {
414 pub fn new(opts: PublisherOptions<'_>) -> Result<Arc<Self>, PublishError> {
417 if opts.listen_for_interest_updates && !opts.register_global_logger {
418 return Err(PublishError::UnsupportedOption);
421 }
422
423 let client = match opts.log_sink_client {
424 Some(log_sink) => log_sink,
425 None => connect_to_protocol()
426 .map_err(|e| e.to_string())
427 .map_err(PublishError::LogSinkConnect)?,
428 };
429
430 let this = Arc::new(Self {
431 sink: BufferedSink::new(SinkConfig {
432 tags: opts.tags.iter().map(|s| s.to_string()).collect(),
433 metatags: opts.metatags,
434 always_log_file_line: opts.always_log_file_line,
435 }),
436 filter: InterestFilter::new(opts.interest),
437 interest_listening_task: Mutex::default(),
438 });
439
440 if opts.register_global_logger {
441 unsafe {
445 log::set_logger(&*Arc::into_raw(this.clone()))?;
446 }
447 }
448
449 let this_clone = this.clone();
453 *this_clone.interest_listening_task.lock() = Some(fasync::Task::spawn(async move {
454 let proxy = client.into_proxy();
455
456 let Some(Ok(LogSinkEvent::OnInit {
457 payload: LogSinkOnInitRequest { buffer: Some(buffer), interest, .. },
458 })) = proxy.take_event_stream().next().await
459 else {
460 return;
463 };
464
465 this.filter.update_interest(
469 (if opts.listen_for_interest_updates { interest } else { None })
470 .unwrap_or_default(),
471 );
472
473 this.sink.set_buffer(buffer);
474
475 if opts.listen_for_interest_updates {
476 this.filter.listen_for_interest_updates(proxy).await;
477 }
478 }));
479
480 Ok(this_clone)
481 }
482}
483
// `log` backend for the buffered publisher.
impl log::Log for BufferedPublisher {
    // Every record is reported as enabled.
    // NOTE(review): severity filtering appears to rely on `log::set_max_level` (see
    // `set_minimum_severity`) rather than on this method — confirm.
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        true
    }

    // Forwards the record to the sink.
    fn log(&self, record: &log::Record<'_>) {
        self.sink.record_log(record);
    }

    // Nothing to flush here.
    fn flush(&self) {}
}
497
498impl Borrow<InterestFilter> for BufferedPublisher {
499 fn borrow(&self) -> &InterestFilter {
500 &self.filter
501 }
502}
503
504#[derive(Debug, Error)]
506pub enum PublishError {
507 #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
509 LogSinkConnect(String),
510
511 #[error("failed to create a socket for logging")]
513 MakeSocket(#[source] zx::Status),
514
515 #[error("failed to send a socket to the LogSink")]
517 SendSocket(#[source] fidl::Error),
518
519 #[error("failed to install the loger")]
521 InitLogForward(#[from] log::SetLoggerError),
522
523 #[error("unsupported option")]
525 UnsupportedOption,
526
527 #[error("did not receive the OnInit event")]
529 MissingOnInit,
530}
531
// Fake clock used by `get_now` in test builds, in nanoseconds; starts at ten seconds.
#[cfg(test)]
static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);
534
/// Advances the fake test clock by `duration`.
#[cfg(test)]
pub fn increment_clock(duration: Duration) {
    let delta_nanos = duration.as_nanos() as i64;
    CURRENT_TIME_NANOS.fetch_add(delta_nanos, Ordering::SeqCst);
}
540
541#[doc(hidden)]
543pub fn get_now() -> i64 {
544 #[cfg(not(test))]
545 return zx::MonotonicInstant::get().into_nanos();
546
547 #[cfg(test)]
548 CURRENT_TIME_NANOS.load(Ordering::Relaxed)
549}
550
/// Logs a message only if at least `$seconds` seconds have passed since this call site last
/// logged.
///
/// `$severity` is lower-cased and used as the name of the `log` macro to invoke (for
/// example `INFO` expands to `log::info!`); the remaining arguments are passed to that macro
/// unchanged.
///
/// The timestamp is kept in a `static` that is part of the expansion, and time is read
/// through `get_now`. The timestamp is loaded and stored in two separate steps, so concurrent
/// callers may both log within the same interval.
#[macro_export]
macro_rules! log_every_n_seconds {
    ($seconds:expr, $severity:expr, $($arg:tt)*) => {
        use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
        use $crate::{paste, fuchsia::get_now};

        let now = get_now();

        // Timestamp of the last emitted log; zero until the first one is emitted.
        static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
        if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
            paste! {
                log::[< $severity:lower >]!($($arg)*);
            }
            LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
        }
    }
}
572
// Integration tests that read logs back through `ArchiveReader`.
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_reader::ArchiveReader;
    use fidl_fuchsia_diagnostics_crasher::{CrasherMarker, CrasherProxy};
    use fuchsia_async::TimeoutExt;
    use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
    use futures::{StreamExt, future};
    use log::{debug, error, info};
    use moniker::ExtendedMoniker;

    // Launches the crasher component, asks it to panic and checks that the first log from
    // that component carries the panic's file, line and message.
    #[fuchsia::test]
    async fn panic_integration_test() {
        let builder = RealmBuilder::new().await.unwrap();
        let puppet = builder
            .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
            .await
            .unwrap();
        // Expose the crasher protocol to the test so it can trigger the panic.
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol::<CrasherMarker>())
                    .from(&puppet)
                    .to(Ref::parent()),
            )
            .await
            .unwrap();
        let realm = builder.build().await.unwrap();
        let child_name = realm.root.child_name();
        // Subscribe before triggering the crash.
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let proxy: CrasherProxy = realm.root.connect_to_protocol_at_exposed_dir().unwrap();
        let target_moniker =
            ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
                .unwrap();
        proxy.crash("This is a test panic.").await.unwrap();

        // Only the first log coming from the crasher's moniker is inspected.
        let result =
            logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
        assert_eq!(result.line_number(), Some(29).as_ref());
        assert_eq!(
            result.file_path(),
            Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
        );
        assert!(
            result
                .payload_keys()
                .unwrap()
                .get_property("info")
                .unwrap()
                .to_string()
                .contains("This is a test panic.")
        );
    }

    // Checks that `set_minimum_severity` makes debug logs visible. Automatic logging setup
    // is disabled so the test can install its own publisher.
    #[fuchsia::test(logging = false)]
    async fn verify_setting_minimum_log_severity() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let _publisher = Publisher::new_async(PublisherOptions {
            tags: &["verify_setting_minimum_log_severity"],
            register_global_logger: true,
            ..PublisherOptions::default()
        })
        .await
        .expect("initialized log");

        info!("I'm an info log");
        debug!("I'm a debug log and won't show up");

        set_minimum_severity(Severity::Debug);
        debug!("I'm a debug log and I show up");

        // Only two of the three logs above are expected to carry this test's tag.
        let results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
                )
            })
            .take(2)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(results[0].msg().unwrap(), "I'm an info log");
        assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
    }

    // Logs from several threads through the `log` macros and checks that every record
    // arrives with its `thread` key and message intact.
    // NOTE(review): the tag matched below is presumably added by `#[fuchsia::test]` — confirm.
    #[fuchsia::test]
    async fn log_macro_logs_are_recorded() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();

        let total_threads = 10;

        for i in 0..total_threads {
            std::thread::spawn(move || {
                log::info!(thread=i; "log from thread {}", i);
            });
        }

        let mut results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded"),
                )
            })
            .take(total_threads);

        let mut seen = vec![];
        // A timeout ends the loop early; the final assertion then fails because not every
        // thread has been seen.
        while let Some(log) = results
            .next()
            .on_timeout(Duration::from_secs(30), || {
                error!("Timed out!");
                None
            })
            .await
        {
            let hierarchy = log.payload_keys().unwrap();
            assert_eq!(hierarchy.properties.len(), 1);
            assert_eq!(hierarchy.properties[0].name(), "thread");
            let thread_id = hierarchy.properties[0].uint().unwrap();
            seen.push(thread_id as usize);
            assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
        }
        // Threads may log in any order.
        seen.sort();
        assert_eq!(seen, (0..total_threads).collect::<Vec<_>>());
    }
}