// diagnostics_log/fuchsia/mod.rs
1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use crate::PublishOptions;
5use diagnostics_log_types::Severity;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::{
8    LogSinkEvent, LogSinkMarker, LogSinkOnInitRequest, LogSinkProxy, LogSinkSynchronousProxy,
9};
10use fuchsia_async as fasync;
11use fuchsia_sync::Mutex;
12use futures::stream::StreamExt;
13use std::borrow::Borrow;
14use std::collections::HashSet;
15use std::fmt::Debug;
16use std::sync::Arc;
17use thiserror::Error;
18
19#[cfg(fuchsia_api_level_less_than = "27")]
20use fidl_fuchsia_diagnostics::Interest;
21#[cfg(fuchsia_api_level_at_least = "27")]
22use fidl_fuchsia_diagnostics_types::Interest;
23
24use fuchsia_component_client::connect::connect_to_protocol;
25#[cfg(not(feature = "no_startup_handle"))]
26use fuchsia_runtime::{HandleInfo, HandleType};
27
28mod filter;
29mod sink;
30
31use filter::InterestFilter;
32use sink::{BufferedSink, IoBufferSink, Sink, SinkConfig};
33
34pub use diagnostics_log_encoding::Metatag;
35pub use diagnostics_log_encoding::encode::{LogEvent, TestRecord};
36pub use paste::paste;
37
38#[cfg(test)]
39use std::{
40    sync::atomic::{AtomicI64, Ordering},
41    time::Duration,
42};
43
44/// Callback for interest listeners
45pub trait OnInterestChanged {
46    /// Callback for when the interest changes
47    fn on_changed(&self, severity: Severity);
48}
49
50/// Options to configure a `Publisher`.
51#[derive(Default)]
52pub struct PublisherOptions<'t> {
53    pub(crate) interest: Interest,
54    listen_for_interest_updates: bool,
55    log_sink_client: Option<ClientEnd<LogSinkMarker>>,
56    pub(crate) metatags: HashSet<Metatag>,
57    pub(crate) tags: &'t [&'t str],
58    pub(crate) always_log_file_line: bool,
59    register_global_logger: bool,
60}
61
62impl Default for PublishOptions<'static> {
63    fn default() -> Self {
64        Self {
65            publisher: PublisherOptions {
66                // Default to registering the global logger and listening for interest updates for
67                // `PublishOptions` because it's used by the `initialize...` functions which are
68                // typically called at program start time.
69                listen_for_interest_updates: true,
70                register_global_logger: true,
71                ..PublisherOptions::default()
72            },
73            install_panic_hook: true,
74            panic_prefix: None,
75        }
76    }
77}
78
// Generates the shared builder-style setters for the option types. Each invocation tuple is
// `(TypeName, self, field...)`, where `field...` is the chain of field accesses leading from
// the type to its inner `PublisherOptions` (empty for `PublisherOptions` itself, `publisher`
// for `PublishOptions`).
macro_rules! publisher_options {
    ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
        $(
            impl<'t> $name<'t> {
                /// Whether or not to log file/line information regardless of severity.
                ///
                /// Default: false.
                pub fn log_file_line_info(mut $self, enable: bool) -> Self {
                    // `$self$(.$self_arg)*` resolves to the inner `PublisherOptions`.
                    let this = &mut $self$(.$self_arg)*;
                    this.always_log_file_line = enable;
                    $self
                }

                /// When set, a `fuchsia_async::Task` will be spawned and held that will be
                /// listening for interest changes. This option can only be set if
                /// `register_global_logger` is set.
                ///
                /// Default: true for `PublishOptions`, false for `PublisherOptions`.
                pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.listen_for_interest_updates = enable;
                    $self
                }

                /// Sets the `LogSink` that will be used.
                ///
                /// Default: the `fuchsia.logger.LogSink` available in the incoming namespace.
                pub fn use_log_sink(mut $self, client: ClientEnd<LogSinkMarker>) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.log_sink_client = Some(client);
                    $self
                }

                /// When set to true, the publisher will be registered as the global logger. This
                /// can only be done once.
                ///
                /// Default: true for `PublishOptions`, false for `PublisherOptions`.
                pub fn register_global_logger(mut $self, value: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.register_global_logger = value;
                    $self
                }
            }
        )*
    };
}
125
// Emit the builder methods for both option types; `PublishOptions` forwards through its
// `publisher` field to reach the inner `PublisherOptions`.
publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
127
128/// Initializes logging with the given options.
129///
130/// IMPORTANT: this should be called at most once in a program, and must be
131/// called only after an async executor has been set for the current thread,
132/// otherwise it'll return errors or panic. Therefore it's recommended to never
133/// call this from libraries and only do it from binaries.
134// Ideally this would be an async function, but fixing that is a bit of a Yak shave.
135pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
136    let result = Publisher::new_sync_with_async_listener(opts.publisher);
137    if matches!(result, Err(PublishError::MissingOnInit)) {
138        // NOTE: We ignore missing OnInit errors as these can happen on products where the log sink
139        // connection isn't routed. If this is a mistake, then there will be warning messages from
140        // Component Manager regarding failed routing.
141        return Ok(());
142    }
143    result?;
144    if opts.install_panic_hook {
145        crate::install_panic_hook(opts.panic_prefix);
146    }
147    Ok(())
148}
149
150/// Sets the global minimum log severity.
151/// IMPORTANT: this function can panic if `initialize` wasn't called before.
152pub fn set_minimum_severity(severity: impl Into<Severity>) {
153    let severity: Severity = severity.into();
154    log::set_max_level(severity.into());
155}
156
157/// Initializes logging with the given options.
158///
159/// This must be used when working in an environment where a [`fuchsia_async::Executor`] can't be
160/// used.
161///
162/// IMPORTANT: this should be called at most once in a program, and must be
163/// called only after an async executor has been set for the current thread,
164/// otherwise it'll return errors or panic. Therefore it's recommended to never
165/// call this from libraries and only do it from binaries.
166pub fn initialize_sync(opts: PublishOptions<'_>) {
167    match Publisher::new_sync(opts.publisher) {
168        Ok(_) => {}
169        Err(PublishError::MissingOnInit) => {
170            // NOTE: We ignore missing OnInit errors as these can happen on products where the log
171            // sink connection isn't routed. If this is a mistake, then there will be warning
172            // messages from Component Manager regarding failed routing.
173            return;
174        }
175        Err(e) => panic!("Unable to initialize logging: {e:?}"),
176    }
177    if opts.install_panic_hook {
178        crate::install_panic_hook(opts.panic_prefix);
179    }
180}
181
/// A `Publisher` acts as broker, implementing [`log::Log`] to receive log
/// events from a component, and then forwarding that data on to a diagnostics service.
#[derive(Clone)]
pub struct Publisher {
    // Shared so clones and the leaked global-logger registration all reference the same
    // sink and interest filter.
    inner: Arc<InnerPublisher>,
}
188
// Shared state behind a `Publisher`: the IOBuffer-backed record sink and the interest filter.
struct InnerPublisher {
    sink: IoBufferSink,
    filter: InterestFilter,
}
193
194impl Publisher {
195    fn new(opts: PublisherOptions<'_>, iob: zx::Iob) -> Self {
196        Self {
197            inner: Arc::new(InnerPublisher {
198                sink: IoBufferSink::new(
199                    iob,
200                    SinkConfig {
201                        tags: opts.tags.iter().map(|s| s.to_string()).collect(),
202                        metatags: opts.metatags,
203                        always_log_file_line: opts.always_log_file_line,
204                    },
205                ),
206                filter: InterestFilter::new(opts.interest),
207            }),
208        }
209    }
210
211    /// Returns a new `Publisher`. This will connect synchronously and, if configured, run a
212    /// listener in a separate thread.
213    pub fn new_sync(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
214        let listen_for_interest_updates = opts.listen_for_interest_updates;
215        let (publisher, client) = Self::new_sync_no_listener(opts)?;
216        if listen_for_interest_updates {
217            let publisher = publisher.clone();
218            std::thread::spawn(move || {
219                fuchsia_async::LocalExecutor::default()
220                    .run_singlethreaded(publisher.listen_for_interest_updates(client.into_proxy()));
221            });
222        }
223        Ok(publisher)
224    }
225
226    /// Returns a new `Publisher`. This will connect synchronously and, if configured, run a
227    /// listener in an async task. Prefer to use `new_async`.
228    pub fn new_sync_with_async_listener(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
229        let listen_for_interest_updates = opts.listen_for_interest_updates;
230        let (publisher, client) = Self::new_sync_no_listener(opts)?;
231        if listen_for_interest_updates {
232            fasync::Task::spawn(publisher.clone().listen_for_interest_updates(client.into_proxy()))
233                .detach();
234        }
235        Ok(publisher)
236    }
237
238    /// Returns a new `Publisher`, but doesn't listen for interest updates. This will connect
239    /// synchronously.
240    fn new_sync_no_listener(
241        mut opts: PublisherOptions<'_>,
242    ) -> Result<(Self, ClientEnd<LogSinkMarker>), PublishError> {
243        let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
244
245        if listen_for_interest_updates && !register_global_logger {
246            // We can only support listening for interest updates if we are registering a global
247            // logger. This is because if we don't register, the initial interest is dropped.
248            return Err(PublishError::UnsupportedOption);
249        }
250
251        let mut get_client = || match opts.log_sink_client.take() {
252            Some(log_sink) => Ok(log_sink),
253            None => {
254                #[cfg(not(feature = "no_startup_handle"))]
255                {
256                    let log_sink = fuchsia_runtime::take_startup_handle(HandleInfo::new(
257                        HandleType::LogSink,
258                        0,
259                    ));
260                    if let Some(log_sink) = log_sink {
261                        let log_sink = fidl::Channel::from(log_sink);
262                        return Ok(ClientEnd::<LogSinkMarker>::from(log_sink));
263                    }
264                }
265                // If startup handle is unavailable fallback to /svc.
266                connect_to_protocol()
267                    .map_err(|e| e.to_string())
268                    .map_err(PublishError::LogSinkConnect)
269            }
270        };
271        let client = (get_client)()?;
272
273        let proxy = zx::Unowned::<LogSinkSynchronousProxy>::new(client.channel());
274        let Ok(LogSinkEvent::OnInit {
275            payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
276        }) = proxy.wait_for_event(zx::MonotonicInstant::INFINITE)
277        else {
278            return Err(PublishError::MissingOnInit);
279        };
280
281        let publisher = Self::new(opts, iob);
282
283        if register_global_logger {
284            publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
285        }
286
287        Ok((publisher, client))
288    }
289
290    /// Returns a new `Publisher`. This will connect asynchronously and, if configured, run a
291    /// listener in an async task.
292    pub async fn new_async(mut opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
293        let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
294
295        if listen_for_interest_updates && !register_global_logger {
296            // We can only support listening for interest updates if we are registering a global
297            // logger. This is because if we don't register, the initial interest is dropped.
298            return Err(PublishError::UnsupportedOption);
299        }
300
301        let mut get_client = || match opts.log_sink_client.take() {
302            Some(log_sink) => Ok(log_sink.into_proxy()),
303            None => {
304                #[cfg(not(feature = "no_startup_handle"))]
305                {
306                    let log_sink = fuchsia_runtime::take_startup_handle(HandleInfo::new(
307                        HandleType::LogSink,
308                        0,
309                    ));
310                    if let Some(log_sink) = log_sink {
311                        let log_sink = fidl::Channel::from(log_sink);
312                        let log_sink = ClientEnd::<LogSinkMarker>::from(log_sink);
313                        return Ok(log_sink.into_proxy());
314                    }
315                }
316                // If startup handle is unavailable fallback to /svc.
317                connect_to_protocol()
318                    .map_err(|e| e.to_string())
319                    .map_err(PublishError::LogSinkConnect)
320            }
321        };
322        let proxy = (get_client)()?;
323
324        let Some(Ok(LogSinkEvent::OnInit {
325            payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
326        })) = proxy.take_event_stream().next().await
327        else {
328            return Err(PublishError::MissingOnInit);
329        };
330
331        let publisher = Self::new(opts, iob);
332
333        if register_global_logger {
334            publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
335            fasync::Task::spawn(publisher.clone().listen_for_interest_updates(proxy)).detach();
336        }
337
338        Ok(publisher)
339    }
340
341    /// Publish the provided event for testing.
342    pub fn event_for_testing(&self, record: TestRecord<'_>) {
343        if self.inner.filter.enabled_for_testing(&record) {
344            self.inner.sink.event_for_testing(record);
345        }
346    }
347
348    /// Registers an interest listener
349    pub fn set_interest_listener<T>(&self, listener: T)
350    where
351        T: OnInterestChanged + Send + Sync + 'static,
352    {
353        self.inner.filter.set_interest_listener(listener);
354    }
355
356    /// Sets the global logger to this publisher. This function may only be called once in the
357    /// lifetime of a program.
358    pub fn register_logger(&self, interest: Option<Interest>) -> Result<(), PublishError> {
359        self.inner.filter.update_interest(interest.unwrap_or_default());
360        // SAFETY: This leaks which guarantees the publisher remains alive for the lifetime of the
361        // program.
362        unsafe {
363            let ptr = Arc::into_raw(self.inner.clone());
364            log::set_logger(&*ptr).inspect_err(|_| {
365                let _ = Arc::from_raw(ptr);
366            })?;
367        }
368        Ok(())
369    }
370
371    /// Listens for interest updates. Callers must maintain a clone to keep the publisher alive;
372    /// this function will downgrade to a weak reference.
373    async fn listen_for_interest_updates(self, proxy: LogSinkProxy) {
374        self.inner.filter.listen_for_interest_updates(proxy).await;
375    }
376}
377
impl log::Log for InnerPublisher {
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        // NOTE: we handle minimum severity directly through the log max_level. So we call,
        // log::set_max_level, log::max_level where appropriate.
        true
    }

    fn log(&self, record: &log::Record<'_>) {
        // Forward the record straight to the sink; severity gating happened via max_level.
        self.sink.record_log(record);
    }

    // Records are written through the IOBuffer sink as they arrive; nothing to flush here.
    fn flush(&self) {}
}
391
// Delegating implementation: `Publisher` is a cheap `Arc` handle around `InnerPublisher`,
// which holds the real `log::Log` logic.
impl log::Log for Publisher {
    #[inline]
    fn enabled(&self, metadata: &log::Metadata<'_>) -> bool {
        self.inner.enabled(metadata)
    }

    #[inline]
    fn log(&self, record: &log::Record<'_>) {
        self.inner.log(record)
    }

    #[inline]
    fn flush(&self) {
        self.inner.flush()
    }
}
408
// Exposes the publisher's `InterestFilter` via `Borrow`, so code generic over
// `Borrow<InterestFilter>` can accept an `InnerPublisher`.
impl Borrow<InterestFilter> for InnerPublisher {
    fn borrow(&self) -> &InterestFilter {
        &self.filter
    }
}
414
415/// Initializes logging, but buffers logs until the connection is established. This is required for
416/// things like Component Manager, which would otherwise deadlock when starting. This carries some
417/// overhead, so should be avoided unless required.
418pub fn initialize_buffered(opts: PublishOptions<'_>) -> Result<(), PublishError> {
419    BufferedPublisher::new(opts.publisher)?;
420    if opts.install_panic_hook {
421        crate::install_panic_hook(opts.panic_prefix);
422    }
423    Ok(())
424}
425
/// A buffered publisher will buffer log messages until the IOBuffer is received. If this is
/// registered as the global logger, then messages will be logged at the default level until an
/// updated level is received from Archivist.
pub struct BufferedPublisher {
    // Sink that buffers records until `set_buffer` hands it the IOBuffer.
    sink: BufferedSink,
    filter: InterestFilter,
    // Task waiting for OnInit (and optionally listening for interest updates afterwards);
    // holding it here keeps it alive for the publisher's lifetime.
    interest_listening_task: Mutex<Option<fasync::Task<()>>>,
}
434
impl BufferedPublisher {
    /// Returns a publisher that will buffer messages until the IOBuffer is received. An async
    /// executor must be established.
    pub fn new(opts: PublisherOptions<'_>) -> Result<Arc<Self>, PublishError> {
        if opts.listen_for_interest_updates && !opts.register_global_logger {
            // We can only support listening for interest updates if we are registering a global
            // logger. This is because if we don't register, the initial interest is dropped.
            return Err(PublishError::UnsupportedOption);
        }

        // Resolve the LogSink connection: explicit client, then startup handle, then /svc.
        let get_client = || match opts.log_sink_client {
            Some(log_sink) => Ok(log_sink),
            None => {
                #[cfg(not(feature = "no_startup_handle"))]
                {
                    let log_sink = fuchsia_runtime::take_startup_handle(HandleInfo::new(
                        HandleType::LogSink,
                        0,
                    ));
                    if let Some(log_sink) = log_sink {
                        let log_sink = fidl::Channel::from(log_sink);
                        return Ok(ClientEnd::<LogSinkMarker>::from(log_sink));
                    }
                }
                // If startup handle is unavailable fallback to /svc.
                connect_to_protocol()
                    .map_err(|e| e.to_string())
                    .map_err(PublishError::LogSinkConnect)
            }
        };
        let client = (get_client)()?;

        let this = Arc::new(Self {
            sink: BufferedSink::new(SinkConfig {
                tags: opts.tags.iter().map(|s| s.to_string()).collect(),
                metatags: opts.metatags,
                always_log_file_line: opts.always_log_file_line,
            }),
            filter: InterestFilter::new(opts.interest),
            interest_listening_task: Mutex::default(),
        });

        if opts.register_global_logger {
            // SAFETY: This leaks which guarantees the publisher remains alive for the lifetime
            // of the program. This leaks even when there is an error (which shouldn't happen so
            // we don't worry about it).
            unsafe {
                log::set_logger(&*Arc::into_raw(this.clone()))?;
            }
        }

        // Whilst we are waiting for the OnInit event, we hold a strong reference to the publisher
        // which will prevent the publisher from being dropped and ensure that buffered log messages
        // are sent.
        let this_clone = this.clone();
        *this_clone.interest_listening_task.lock() = Some(fasync::Task::spawn(async move {
            let proxy = client.into_proxy();

            let Some(Ok(LogSinkEvent::OnInit {
                payload: LogSinkOnInitRequest { buffer: Some(buffer), interest, .. },
            })) = proxy.take_event_stream().next().await
            else {
                // There's not a lot we can do here: we haven't received the event we expected
                // and there's no way we can log the issue.
                return;
            };

            // Ignore the interest sent in the OnInit request if `listen_for_interest_updates`
            // is false; it is assumed that the caller wants the interest specified in the
            // options to stick.
            this.filter.update_interest(
                (if opts.listen_for_interest_updates { interest } else { None })
                    .unwrap_or_default(),
            );

            // Hand the received IOBuffer to the sink (presumably releasing buffered records
            // into it — see `BufferedSink::set_buffer`).
            this.sink.set_buffer(buffer);

            if opts.listen_for_interest_updates {
                this.filter.listen_for_interest_updates(proxy).await;
            }
        }));

        Ok(this_clone)
    }
}
520
impl log::Log for BufferedPublisher {
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        // NOTE: we handle minimum severity directly through the log max_level. So we call,
        // log::set_max_level, log::max_level where appropriate.
        true
    }

    fn log(&self, record: &log::Record<'_>) {
        // Records go into the buffered sink; they are held there until the IOBuffer arrives.
        self.sink.record_log(record);
    }

    // Nothing to flush locally; delivery is governed by the sink's buffer hand-off.
    fn flush(&self) {}
}
534
// Exposes the publisher's `InterestFilter` via `Borrow`, mirroring the impl on
// `InnerPublisher`.
impl Borrow<InterestFilter> for BufferedPublisher {
    fn borrow(&self) -> &InterestFilter {
        &self.filter
    }
}
540
541/// Errors arising while forwarding a diagnostics stream to the environment.
542#[derive(Debug, Error)]
543pub enum PublishError {
544    /// Connection to fuchsia.logger.LogSink failed.
545    #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
546    LogSinkConnect(String),
547
548    /// Couldn't create a new socket.
549    #[error("failed to create a socket for logging")]
550    MakeSocket(#[source] zx::Status),
551
552    /// An issue with the LogSink channel or socket prevented us from sending it to the `LogSink`.
553    #[error("failed to send a socket to the LogSink")]
554    SendSocket(#[source] fidl::Error),
555
556    /// Installing a Logger.
557    #[error("failed to install the loger")]
558    InitLogForward(#[from] log::SetLoggerError),
559
560    /// Unsupported publish option.
561    #[error("unsupported option")]
562    UnsupportedOption,
563
564    /// The channel was closed with no OnInit event.
565    #[error("did not receive the OnInit event")]
566    MissingOnInit,
567}
568
// Fake monotonic clock for tests, in nanoseconds. Starts at 10 seconds rather than 0
// (presumably so the first `log_every_n_seconds!` comparison against a zero-initialized
// timestamp passes for small intervals — confirm).
#[cfg(test)]
static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);
571
/// Increments the test clock.
#[cfg(test)]
pub fn increment_clock(duration: Duration) {
    // Advance the fake clock by the given duration, expressed in nanoseconds.
    let delta_nanos = duration.as_nanos() as i64;
    CURRENT_TIME_NANOS.fetch_add(delta_nanos, Ordering::SeqCst);
}
577
/// Gets the current monotonic time in nanoseconds.
///
/// In test builds this reads the fake clock driven by `increment_clock` instead of the
/// real monotonic clock, making time-dependent behavior deterministic.
#[doc(hidden)]
pub fn get_now() -> i64 {
    #[cfg(not(test))]
    return zx::MonotonicInstant::get().into_nanos();

    #[cfg(test)]
    CURRENT_TIME_NANOS.load(Ordering::Relaxed)
}
587
/// Logs every N seconds using an Atomic variable
/// to keep track of the time. This will have a higher
/// performance impact on ARM compared to regular logging due to the use
/// of an atomic.
#[macro_export]
macro_rules! log_every_n_seconds {
    // The expansion is wrapped in its own block (`{{ ... }}`) so that the `use` items and
    // the per-call-site `static` don't collide when the macro is invoked more than once in
    // the same scope.
    ($seconds:expr, $severity:expr, $($arg:tt)*) => {{
        use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
        use $crate::{paste, fuchsia::get_now};

        let now = get_now();

        // One timestamp per expansion site: the throttle is per call site, not global.
        static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
        if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
            paste! {
                log::[< $severity:lower >]!($($arg)*);
            }
            LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
        }
    }}
}
609
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_reader::ArchiveReader;
    use fidl_fuchsia_diagnostics_crasher::{CrasherMarker, CrasherProxy};
    use fuchsia_async::TimeoutExt;
    use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
    use futures::{FutureExt, StreamExt, future};
    use log::{debug, error, info};
    use moniker::ExtendedMoniker;

    // Integration test: launches the rust-crasher child component, triggers a panic, and
    // verifies the panic is captured as a log record with the expected file/line/payload.
    #[fuchsia::test]
    async fn panic_integration_test() {
        let builder = RealmBuilder::new().await.unwrap();
        let puppet = builder
            .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
            .await
            .unwrap();
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol::<CrasherMarker>())
                    .from(&puppet)
                    .to(Ref::parent()),
            )
            .await
            .unwrap();
        let realm = builder.build().await.unwrap();
        let child_name = realm.root.child_name();
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let proxy: CrasherProxy = realm.root.connect_to_protocol_at_exposed_dir().unwrap();
        let target_moniker =
            ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
                .unwrap();
        proxy.crash("This is a test panic.").await.unwrap();

        // Only consider logs emitted by the crasher child, not other components in the realm.
        let result =
            logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
        // NOTE(review): the expected line number is coupled to crasher's main.rs — it must be
        // updated if that file changes.
        assert_eq!(result.line_number(), Some(29).as_ref());
        assert_eq!(
            result.file_path(),
            Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
        );
        assert!(
            result
                .payload_keys()
                .unwrap()
                .get_property("info")
                .unwrap()
                .to_string()
                .contains("This is a test panic.")
        );
    }

    // Verifies that `set_minimum_severity` takes effect at runtime: a debug log emitted
    // before lowering the threshold is dropped, one emitted after shows up.
    #[fuchsia::test(logging = false)]
    async fn verify_setting_minimum_log_severity() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let _publisher = Publisher::new_async(PublisherOptions {
            tags: &["verify_setting_minimum_log_severity"],
            register_global_logger: true,
            ..PublisherOptions::default()
        })
        .await
        .expect("initialized log");

        info!("I'm an info log");
        debug!("I'm a debug log and won't show up");

        set_minimum_severity(Severity::Debug);
        debug!("I'm a debug log and I show up");

        let results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
                )
            })
            .take(2)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(results[0].msg().unwrap(), "I'm an info log");
        assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
    }

    // Verifies that structured log macros are recorded correctly when invoked concurrently
    // from multiple threads: each thread's record carries its own `thread` key.
    #[fuchsia::test]
    async fn log_macro_logs_are_recorded() {
        let reader = ArchiveReader::logs();
        let (logs, mut errors) = reader.snapshot_then_subscribe().unwrap().split_streams();

        let total_threads = 10;

        for i in 0..total_threads {
            std::thread::spawn(move || {
                log::info!(thread=i; "log from thread {}", i);
            });
        }

        let mut filtered = 0;
        let mut results = logs
            .filter(|data| {
                future::ready(
                    if data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded") {
                        true
                    } else {
                        filtered += 1;
                        false
                    },
                )
            })
            .take(total_threads);

        let mut seen = vec![];
        while let Some(log) = results
            .next()
            .on_timeout(Duration::from_secs(30), || {
                error!("Timed out!");
                None
            })
            .await
        {
            let hierarchy = log.payload_keys().unwrap();
            assert_eq!(hierarchy.properties.len(), 1);
            assert_eq!(hierarchy.properties[0].name(), "thread");
            let thread_id = hierarchy.properties[0].uint().unwrap();
            seen.push(thread_id as usize);
            assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
        }

        assert_matches!(errors.next().now_or_never(), None);

        // Every thread's log must have arrived exactly once, in any order.
        seen.sort();
        assert_eq!(seen, (0..total_threads).collect::<Vec<_>>(), "filtered={filtered}");
    }
}
746}