1use crate::PublishOptions;
5use diagnostics_log_types::Severity;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::{
8 LogSinkEvent, LogSinkMarker, LogSinkOnInitRequest, LogSinkProxy, LogSinkSynchronousProxy,
9};
10use fuchsia_async as fasync;
11use fuchsia_component_client::connect::connect_to_protocol;
12use futures::stream::StreamExt;
13use std::borrow::Borrow;
14use std::collections::HashSet;
15use std::fmt::Debug;
16use std::sync::{Arc, Mutex};
17use thiserror::Error;
18
19#[cfg(fuchsia_api_level_less_than = "27")]
20use fidl_fuchsia_diagnostics::Interest;
21#[cfg(fuchsia_api_level_at_least = "27")]
22use fidl_fuchsia_diagnostics_types::Interest;
23
24mod filter;
25mod sink;
26
27use filter::InterestFilter;
28use sink::{BufferedSink, IoBufferSink, Sink, SinkConfig};
29
30pub use diagnostics_log_encoding::Metatag;
31pub use diagnostics_log_encoding::encode::{LogEvent, TestRecord};
32pub use paste::paste;
33
34#[cfg(test)]
35use std::{
36 sync::atomic::{AtomicI64, Ordering},
37 time::Duration,
38};
39
/// Callback interface for observers that want to be told about severity changes.
///
/// Implementations are handed to [`Publisher::set_interest_listener`], which requires them to
/// be `Send + Sync + 'static`.
pub trait OnInterestChanged {
    /// Invoked with the new severity.
    // NOTE(review): the exact moment of invocation is decided by `InterestFilter`, which is
    // defined in the `filter` submodule and not visible here — confirm before relying on it.
    fn on_changed(&self, severity: Severity);
}
45
/// Options used to configure a [`Publisher`] or [`BufferedPublisher`].
///
/// The derived `Default` leaves every boolean `false`, the tag slice and metatag set empty and
/// no pre-connected `LogSink` client.
#[derive(Default)]
pub struct PublisherOptions<'t> {
    // Set through the `blocking()` builder method. It is not read anywhere in this module.
    // NOTE(review): presumably consumed elsewhere or vestigial — confirm.
    blocking: bool,
    // Initial interest handed to `InterestFilter::new`.
    pub(crate) interest: Interest,
    // Whether the publisher should keep listening on the `LogSink` connection for interest
    // changes. The constructors reject this unless `register_global_logger` is also set.
    listen_for_interest_updates: bool,
    // Optional pre-established `LogSink` connection. When `None` the constructors call
    // `connect_to_protocol()` themselves.
    log_sink_client: Option<ClientEnd<LogSinkMarker>>,
    // Metatags forwarded verbatim to the sink configuration.
    pub(crate) metatags: HashSet<Metatag>,
    // Tags forwarded (as owned strings) to the sink configuration.
    pub(crate) tags: &'t [&'t str],
    // Forwarded to the sink configuration as `always_log_file_line`.
    pub(crate) always_log_file_line: bool,
    // Whether the publisher should be installed through `log::set_logger`.
    register_global_logger: bool,
}
58
59impl Default for PublishOptions<'static> {
60 fn default() -> Self {
61 Self {
62 publisher: PublisherOptions {
63 listen_for_interest_updates: true,
67 register_global_logger: true,
68 ..PublisherOptions::default()
69 },
70 install_panic_hook: true,
71 panic_prefix: None,
72 }
73 }
74}
75
// Generates the same set of builder-style setters for several option types.
//
// Each input tuple is `(TypeName, self, field, field, ...)`. The trailing identifiers form the
// field path from `self` to the embedded `PublisherOptions`, so `$self$(.$self_arg)*` expands to
// `self` for `PublisherOptions` itself and to `self.publisher` for `PublishOptions`.
// `self` has to be passed in as a token because of macro hygiene.
macro_rules! publisher_options {
    ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
        $(
            impl<'t> $name<'t> {
                /// Sets the `always_log_file_line` option, which is forwarded to the sink
                /// configuration.
                pub fn log_file_line_info(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.always_log_file_line = enable;
                    $self
                }

                /// Sets whether the publisher keeps listening for interest updates on its
                /// `LogSink` connection.
                ///
                /// The constructors in this module return `PublishError::UnsupportedOption`
                /// when this is enabled without also registering the global logger.
                pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.listen_for_interest_updates = enable;
                    $self
                }

                /// Supplies an already-connected `LogSink` client end to use instead of
                /// connecting to the protocol when the publisher is created.
                pub fn use_log_sink(mut $self, client: ClientEnd<LogSinkMarker>) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.log_sink_client = Some(client);
                    $self
                }

                /// Sets the `blocking` option.
                // NOTE(review): nothing in this module reads the flag — confirm what it
                // controls.
                pub fn blocking(mut $self, is_blocking: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.blocking = is_blocking;
                    $self
                }

                /// Sets whether the created publisher is installed via `log::set_logger`.
                pub fn register_global_logger(mut $self, value: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.register_global_logger = value;
                    $self
                }
            }
        )*
    };
}

// `PublisherOptions` holds the fields directly (empty path); `PublishOptions` reaches them
// through its `publisher` field.
publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
134
135pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
143 let result = Publisher::new_sync_with_async_listener(opts.publisher);
144 if matches!(result, Err(PublishError::MissingOnInit)) {
145 return Ok(());
149 }
150 result?;
151 if opts.install_panic_hook {
152 crate::install_panic_hook(opts.panic_prefix);
153 }
154 Ok(())
155}
156
157pub fn set_minimum_severity(severity: impl Into<Severity>) {
160 let severity: Severity = severity.into();
161 log::set_max_level(severity.into());
162}
163
164pub fn initialize_sync(opts: PublishOptions<'_>) {
174 match Publisher::new_sync(opts.publisher) {
175 Ok(_) => {}
176 Err(PublishError::MissingOnInit) => {
177 return;
181 }
182 Err(e) => panic!("Unable to initialize logging: {e:?}"),
183 }
184 if opts.install_panic_hook {
185 crate::install_panic_hook(opts.panic_prefix);
186 }
187}
188
/// A log publisher backed by an IOBuffer sink.
///
/// Cloning is cheap: clones share the same inner state through an `Arc`.
#[derive(Clone)]
pub struct Publisher {
    // Shared state; a raw `Arc` to it is also what gets handed to `log::set_logger`.
    inner: Arc<InnerPublisher>,
}
195
// State shared by every clone of a `Publisher`.
struct InnerPublisher {
    // Destination for log records.
    sink: IoBufferSink,
    // Tracks the current interest and any registered interest listener.
    filter: InterestFilter,
}
200
201impl Publisher {
202 fn new(opts: PublisherOptions<'_>, iob: zx::Iob) -> Self {
203 Self {
204 inner: Arc::new(InnerPublisher {
205 sink: IoBufferSink::new(
206 iob,
207 SinkConfig {
208 tags: opts.tags.iter().map(|s| s.to_string()).collect(),
209 metatags: opts.metatags,
210 always_log_file_line: opts.always_log_file_line,
211 },
212 ),
213 filter: InterestFilter::new(opts.interest),
214 }),
215 }
216 }
217
218 pub fn new_sync(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
221 let listen_for_interest_updates = opts.listen_for_interest_updates;
222 let (publisher, client) = Self::new_sync_no_listener(opts)?;
223 if listen_for_interest_updates {
224 let publisher = publisher.clone();
225 std::thread::spawn(move || {
226 fuchsia_async::LocalExecutor::new()
227 .run_singlethreaded(publisher.listen_for_interest_updates(client.into_proxy()));
228 });
229 }
230 Ok(publisher)
231 }
232
233 pub fn new_sync_with_async_listener(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
236 let listen_for_interest_updates = opts.listen_for_interest_updates;
237 let (publisher, client) = Self::new_sync_no_listener(opts)?;
238 if listen_for_interest_updates {
239 fasync::Task::spawn(publisher.clone().listen_for_interest_updates(client.into_proxy()))
240 .detach();
241 }
242 Ok(publisher)
243 }
244
245 fn new_sync_no_listener(
248 mut opts: PublisherOptions<'_>,
249 ) -> Result<(Self, ClientEnd<LogSinkMarker>), PublishError> {
250 let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
251
252 if listen_for_interest_updates && !register_global_logger {
253 return Err(PublishError::UnsupportedOption);
256 }
257
258 let client = match opts.log_sink_client.take() {
259 Some(log_sink) => log_sink,
260 None => connect_to_protocol()
261 .map_err(|e| e.to_string())
262 .map_err(PublishError::LogSinkConnect)?,
263 };
264
265 let proxy = zx::Unowned::<LogSinkSynchronousProxy>::new(client.channel());
266 let Ok(LogSinkEvent::OnInit {
267 payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
268 }) = proxy.wait_for_event(zx::MonotonicInstant::INFINITE)
269 else {
270 return Err(PublishError::MissingOnInit);
271 };
272 drop(proxy);
273
274 let publisher = Self::new(opts, iob);
275
276 if register_global_logger {
277 publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
278 }
279
280 Ok((publisher, client))
281 }
282
283 pub async fn new_async(mut opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
286 let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
287
288 if listen_for_interest_updates && !register_global_logger {
289 return Err(PublishError::UnsupportedOption);
292 }
293
294 let proxy = match opts.log_sink_client.take() {
295 Some(log_sink) => log_sink.into_proxy(),
296 None => connect_to_protocol()
297 .map_err(|e| e.to_string())
298 .map_err(PublishError::LogSinkConnect)?,
299 };
300
301 let Some(Ok(LogSinkEvent::OnInit {
302 payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
303 })) = proxy.take_event_stream().next().await
304 else {
305 return Err(PublishError::MissingOnInit);
306 };
307
308 let publisher = Self::new(opts, iob);
309
310 if register_global_logger {
311 publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
312 fasync::Task::spawn(publisher.clone().listen_for_interest_updates(proxy)).detach();
313 }
314
315 Ok(publisher)
316 }
317
318 pub fn event_for_testing(&self, record: TestRecord<'_>) {
320 if self.inner.filter.enabled_for_testing(&record) {
321 self.inner.sink.event_for_testing(record);
322 }
323 }
324
325 pub fn set_interest_listener<T>(&self, listener: T)
327 where
328 T: OnInterestChanged + Send + Sync + 'static,
329 {
330 self.inner.filter.set_interest_listener(listener);
331 }
332
333 pub fn register_logger(&self, interest: Option<Interest>) -> Result<(), PublishError> {
336 self.inner.filter.update_interest(interest.unwrap_or_default());
337 unsafe {
340 let ptr = Arc::into_raw(self.inner.clone());
341 log::set_logger(&*ptr).inspect_err(|_| {
342 let _ = Arc::from_raw(ptr);
343 })?;
344 }
345 Ok(())
346 }
347
348 async fn listen_for_interest_updates(self, proxy: LogSinkProxy) {
351 self.inner.filter.listen_for_interest_updates(proxy).await;
352 }
353}
354
impl log::Log for InnerPublisher {
    /// Always returns `true`; no per-record filtering is done at this point.
    // NOTE(review): severity filtering presumably relies on the `log` crate's global max
    // level (see `set_minimum_severity`) — confirm against `InterestFilter`.
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        true
    }

    /// Hands the record to the sink.
    fn log(&self, record: &log::Record<'_>) {
        self.sink.record_log(record);
    }

    /// No-op.
    fn flush(&self) {}
}
368
// `Publisher` implements `log::Log` by delegating every method to the shared `InnerPublisher`.
impl log::Log for Publisher {
    /// Delegates to the inner publisher.
    #[inline]
    fn enabled(&self, metadata: &log::Metadata<'_>) -> bool {
        self.inner.enabled(metadata)
    }

    /// Delegates to the inner publisher.
    #[inline]
    fn log(&self, record: &log::Record<'_>) {
        self.inner.log(record)
    }

    /// Delegates to the inner publisher.
    #[inline]
    fn flush(&self) {
        self.inner.flush()
    }
}
385
// Lets code that is generic over `Borrow<InterestFilter>` reach the publisher's filter.
impl Borrow<InterestFilter> for InnerPublisher {
    /// Returns the publisher's interest filter.
    fn borrow(&self) -> &InterestFilter {
        &self.filter
    }
}
391
392pub fn initialize_buffered(opts: PublishOptions<'_>) -> Result<(), PublishError> {
396 BufferedPublisher::new(opts.publisher)?;
397 if opts.install_panic_hook {
398 crate::install_panic_hook(opts.panic_prefix);
399 }
400 Ok(())
401}
402
/// A publisher whose sink is created before the `LogSink` buffer is available; the buffer is
/// attached later by a background task once the `OnInit` event arrives.
pub struct BufferedPublisher {
    // Destination for log records; receives its buffer through `set_buffer`.
    sink: BufferedSink,
    // Tracks the current interest.
    filter: InterestFilter,
    // Holds the background task that waits for `OnInit` and, optionally, keeps listening for
    // interest updates. Storing it here keeps the task handle owned by the publisher.
    interest_listening_task: Mutex<Option<fasync::Task<()>>>,
}
411
412impl BufferedPublisher {
413 pub fn new(opts: PublisherOptions<'_>) -> Result<Arc<Self>, PublishError> {
416 if opts.listen_for_interest_updates && !opts.register_global_logger {
417 return Err(PublishError::UnsupportedOption);
420 }
421
422 let client = match opts.log_sink_client {
423 Some(log_sink) => log_sink,
424 None => connect_to_protocol()
425 .map_err(|e| e.to_string())
426 .map_err(PublishError::LogSinkConnect)?,
427 };
428
429 let this = Arc::new(Self {
430 sink: BufferedSink::new(SinkConfig {
431 tags: opts.tags.iter().map(|s| s.to_string()).collect(),
432 metatags: opts.metatags,
433 always_log_file_line: opts.always_log_file_line,
434 }),
435 filter: InterestFilter::new(opts.interest),
436 interest_listening_task: Mutex::default(),
437 });
438
439 if opts.register_global_logger {
440 unsafe {
444 log::set_logger(&*Arc::into_raw(this.clone()))?;
445 }
446 }
447
448 let this_clone = this.clone();
452 *this_clone.interest_listening_task.lock().unwrap() =
453 Some(fasync::Task::spawn(async move {
454 let proxy = client.into_proxy();
455
456 let Some(Ok(LogSinkEvent::OnInit {
457 payload: LogSinkOnInitRequest { buffer: Some(buffer), interest, .. },
458 })) = proxy.take_event_stream().next().await
459 else {
460 return;
463 };
464
465 this.filter.update_interest(
469 (if opts.listen_for_interest_updates { interest } else { None })
470 .unwrap_or_default(),
471 );
472
473 this.sink.set_buffer(buffer);
474
475 if opts.listen_for_interest_updates {
476 this.filter.listen_for_interest_updates(proxy).await;
477 }
478 }));
479
480 Ok(this_clone)
481 }
482}
483
impl log::Log for BufferedPublisher {
    /// Always returns `true`; no per-record filtering is done at this point.
    // NOTE(review): severity filtering presumably relies on the `log` crate's global max
    // level — confirm against `InterestFilter`.
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        true
    }

    /// Hands the record to the buffered sink.
    fn log(&self, record: &log::Record<'_>) {
        self.sink.record_log(record);
    }

    /// No-op.
    fn flush(&self) {}
}
497
// Lets code that is generic over `Borrow<InterestFilter>` reach the publisher's filter.
impl Borrow<InterestFilter> for BufferedPublisher {
    /// Returns the publisher's interest filter.
    fn borrow(&self) -> &InterestFilter {
        &self.filter
    }
}
503
504#[derive(Debug, Error)]
506pub enum PublishError {
507 #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
509 LogSinkConnect(String),
510
511 #[error("failed to create a socket for logging")]
513 MakeSocket(#[source] zx::Status),
514
515 #[error("failed to send a socket to the LogSink")]
517 SendSocket(#[source] fidl::Error),
518
519 #[error("failed to install the loger")]
521 InitLogForward(#[from] log::SetLoggerError),
522
523 #[error("unsupported option")]
525 UnsupportedOption,
526
527 #[error("did not receive the OnInit event")]
529 MissingOnInit,
530}
531
532#[cfg(test)]
533static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);
534
535#[cfg(test)]
537pub fn increment_clock(duration: Duration) {
538 CURRENT_TIME_NANOS.fetch_add(duration.as_nanos() as i64, Ordering::SeqCst);
539}
540
541#[doc(hidden)]
543pub fn get_now() -> i64 {
544 #[cfg(not(test))]
545 return zx::MonotonicInstant::get().into_nanos();
546
547 #[cfg(test)]
548 CURRENT_TIME_NANOS.load(Ordering::Relaxed)
549}
550
/// Emits a log at the given severity at most once every `$seconds` seconds per call site.
///
/// Each call site gets its own timestamp of the last emitted message. The message is logged
/// when at least `$seconds` seconds have passed since that timestamp, which starts at zero.
/// `$severity` is lower-cased and used as the name of the `log` macro to invoke, so the `log`
/// crate must be resolvable at the call site.
///
/// The expansion is a single block using fully qualified paths, so it adds no `use` items or
/// statics to the caller's scope and can be invoked several times within the same scope.
///
/// The load/store pair is not a single atomic operation: threads racing through the same call
/// site may each emit the message once.
#[macro_export]
macro_rules! log_every_n_seconds {
    ($seconds:expr, $severity:expr, $($arg:tt)*) => {{
        static LAST_LOG_TIMESTAMP: ::std::sync::atomic::AtomicI64 =
            ::std::sync::atomic::AtomicI64::new(0);

        let now = $crate::fuchsia::get_now();
        let interval_nanos = ::std::time::Duration::from_secs($seconds).as_nanos() as i64;

        if now - LAST_LOG_TIMESTAMP.load(::std::sync::atomic::Ordering::Acquire)
            >= interval_nanos
        {
            $crate::paste! {
                log::[< $severity:lower >]!($($arg)*);
            }
            LAST_LOG_TIMESTAMP.store(now, ::std::sync::atomic::Ordering::Release);
        }
    }};
}
572
// Integration-style tests that read back emitted logs through `ArchiveReader`.
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_reader::ArchiveReader;
    use fidl_fuchsia_diagnostics_crasher::{CrasherMarker, CrasherProxy};
    use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
    use futures::{StreamExt, future};
    use log::{debug, info};
    use moniker::ExtendedMoniker;

    // Launches the `rust-crasher` component in a test realm, asks it to crash and checks that
    // the log record attributed to it carries the expected file, line and panic message.
    #[fuchsia::test]
    async fn panic_integration_test() {
        let builder = RealmBuilder::new().await.unwrap();
        let puppet = builder
            .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
            .await
            .unwrap();
        // Expose the Crasher protocol from the child to the test.
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol::<CrasherMarker>())
                    .from(&puppet)
                    .to(Ref::parent()),
            )
            .await
            .unwrap();
        let realm = builder.build().await.unwrap();
        let child_name = realm.root.child_name();
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let proxy: CrasherProxy = realm.root.connect_to_protocol_at_exposed_dir().unwrap();
        let target_moniker =
            ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
                .unwrap();
        proxy.crash("This is a test panic.").await.unwrap();

        // Only the first record coming from the crasher component is inspected.
        let result =
            logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
        // These expectations are tied to the location of the panic in the crasher's source.
        assert_eq!(result.line_number(), Some(29).as_ref());
        assert_eq!(
            result.file_path(),
            Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
        );
        assert!(
            result
                .payload_keys()
                .unwrap()
                .get_property("info")
                .unwrap()
                .to_string()
                .contains("This is a test panic.")
        );
    }

    // Checks that a debug log only shows up after `set_minimum_severity(Severity::Debug)`.
    // NOTE(review): `logging = false` presumably stops the test macro from installing its own
    // logger so that the publisher created below can register globally — confirm.
    #[fuchsia::test(logging = false)]
    async fn verify_setting_minimum_log_severity() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        // `listen_for_interest_updates` keeps its derived default (`false`) here.
        let _publisher = Publisher::new_async(PublisherOptions {
            tags: &["verify_setting_minimum_log_severity"],
            register_global_logger: true,
            ..PublisherOptions::default()
        })
        .await
        .expect("initialized log");

        info!("I'm an info log");
        debug!("I'm a debug log and won't show up");

        set_minimum_severity(Severity::Debug);
        debug!("I'm a debug log and I show up");

        // Exactly two records with this test's tag are expected: the info log and the second
        // debug log.
        let results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
                )
            })
            .take(2)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(results[0].msg().unwrap(), "I'm an info log");
        assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
    }

    // Logs once from each of several threads and checks that every record arrives with its
    // structured `thread` key and the matching message.
    // NOTE(review): the tag filter assumes the test macro tags logs with the test name —
    // confirm.
    #[fuchsia::test]
    async fn log_macro_logs_are_recorded() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();

        let total_threads = 10;

        for i in 0..total_threads {
            std::thread::spawn(move || {
                log::info!(thread=i; "log from thread {}", i);
            });
        }

        let mut results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded"),
                )
            })
            .take(total_threads);

        // Records may arrive in any order, so collect the thread ids and compare after sorting.
        let mut seen = vec![];
        while let Some(log) = results.next().await {
            let hierarchy = log.payload_keys().unwrap();
            assert_eq!(hierarchy.properties.len(), 1);
            assert_eq!(hierarchy.properties[0].name(), "thread");
            let thread_id = hierarchy.properties[0].uint().unwrap();
            seen.push(thread_id as usize);
            assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
        }
        seen.sort();
        assert_eq!(seen, (0..total_threads).collect::<Vec<_>>());
    }
}