diagnostics_log/fuchsia/
mod.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use crate::PublishOptions;
5use diagnostics_log_types::Severity;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::{
8    LogSinkEvent, LogSinkMarker, LogSinkOnInitRequest, LogSinkProxy, LogSinkSynchronousProxy,
9};
10use fuchsia_async as fasync;
11use fuchsia_component_client::connect::connect_to_protocol;
12use fuchsia_sync::Mutex;
13use futures::stream::StreamExt;
14use std::borrow::Borrow;
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::Arc;
18use thiserror::Error;
19
20#[cfg(fuchsia_api_level_less_than = "27")]
21use fidl_fuchsia_diagnostics::Interest;
22#[cfg(fuchsia_api_level_at_least = "27")]
23use fidl_fuchsia_diagnostics_types::Interest;
24
25mod filter;
26mod sink;
27
28use filter::InterestFilter;
29use sink::{BufferedSink, IoBufferSink, Sink, SinkConfig};
30
31pub use diagnostics_log_encoding::Metatag;
32pub use diagnostics_log_encoding::encode::{LogEvent, TestRecord};
33pub use paste::paste;
34
35#[cfg(test)]
36use std::{
37    sync::atomic::{AtomicI64, Ordering},
38    time::Duration,
39};
40
41/// Callback for interest listeners
42pub trait OnInterestChanged {
43    /// Callback for when the interest changes
44    fn on_changed(&self, severity: Severity);
45}
46
47/// Options to configure a `Publisher`.
48#[derive(Default)]
49pub struct PublisherOptions<'t> {
50    blocking: bool,
51    pub(crate) interest: Interest,
52    listen_for_interest_updates: bool,
53    log_sink_client: Option<ClientEnd<LogSinkMarker>>,
54    pub(crate) metatags: HashSet<Metatag>,
55    pub(crate) tags: &'t [&'t str],
56    pub(crate) always_log_file_line: bool,
57    register_global_logger: bool,
58}
59
60impl Default for PublishOptions<'static> {
61    fn default() -> Self {
62        Self {
63            publisher: PublisherOptions {
64                // Default to registering the global logger and listening for interest updates for
65                // `PublishOptions` because it's used by the `initialize...` functions which are
66                // typically called at program start time.
67                listen_for_interest_updates: true,
68                register_global_logger: true,
69                ..PublisherOptions::default()
70            },
71            install_panic_hook: true,
72            panic_prefix: None,
73        }
74    }
75}
76
77macro_rules! publisher_options {
78    ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
79        $(
80            impl<'t> $name<'t> {
81                /// Whether or not to log file/line information regardless of severity.
82                ///
83                /// Default: false.
84                pub fn log_file_line_info(mut $self, enable: bool) -> Self {
85                    let this = &mut $self$(.$self_arg)*;
86                    this.always_log_file_line = enable;
87                    $self
88                }
89
90                /// When set, a `fuchsia_async::Task` will be spawned and held that will be
91                /// listening for interest changes. This option can only be set if
92                /// `register_global_logger` is set.
93                ///
94                /// Default: true for `PublishOptions`, false for `PublisherOptions`.
95                pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
96                    let this = &mut $self$(.$self_arg)*;
97                    this.listen_for_interest_updates = enable;
98                    $self
99                }
100
101                /// Sets the `LogSink` that will be used.
102                ///
103                /// Default: the `fuchsia.logger.LogSink` available in the incoming namespace.
104                pub fn use_log_sink(mut $self, client: ClientEnd<LogSinkMarker>) -> Self {
105                    let this = &mut $self$(.$self_arg)*;
106                    this.log_sink_client = Some(client);
107                    $self
108                }
109
110                /// When set to true, writes to the log socket will be blocking. This is, we'll
111                /// retry every time the socket buffer is full until we are able to write the log.
112                ///
113                /// Default: false
114                pub fn blocking(mut $self, is_blocking: bool) -> Self {
115                    let this = &mut $self$(.$self_arg)*;
116                    this.blocking = is_blocking;
117                    $self
118                }
119
120                /// When set to true, the publisher will be registered as the global logger. This
121                /// can only be done once.
122                ///
123                /// Default: true for `PublishOptions`, false for `PublisherOptions`.
124                pub fn register_global_logger(mut $self, value: bool) -> Self {
125                    let this = &mut $self$(.$self_arg)*;
126                    this.register_global_logger = value;
127                    $self
128                }
129            }
130        )*
131    };
132}
133
134publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
135
136/// Initializes logging with the given options.
137///
138/// IMPORTANT: this should be called at most once in a program, and must be
139/// called only after an async executor has been set for the current thread,
140/// otherwise it'll return errors or panic. Therefore it's recommended to never
141/// call this from libraries and only do it from binaries.
142// Ideally this would be an async function, but fixing that is a bit of a Yak shave.
143pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
144    let result = Publisher::new_sync_with_async_listener(opts.publisher);
145    if matches!(result, Err(PublishError::MissingOnInit)) {
146        // NOTE: We ignore missing OnInit errors as these can happen on products where the log sink
147        // connection isn't routed. If this is a mistake, then there will be warning messages from
148        // Component Manager regarding failed routing.
149        return Ok(());
150    }
151    result?;
152    if opts.install_panic_hook {
153        crate::install_panic_hook(opts.panic_prefix);
154    }
155    Ok(())
156}
157
158/// Sets the global minimum log severity.
159/// IMPORTANT: this function can panic if `initialize` wasn't called before.
160pub fn set_minimum_severity(severity: impl Into<Severity>) {
161    let severity: Severity = severity.into();
162    log::set_max_level(severity.into());
163}
164
165/// Initializes logging with the given options.
166///
167/// This must be used when working in an environment where a [`fuchsia_async::Executor`] can't be
168/// used.
169///
170/// IMPORTANT: this should be called at most once in a program, and must be
171/// called only after an async executor has been set for the current thread,
172/// otherwise it'll return errors or panic. Therefore it's recommended to never
173/// call this from libraries and only do it from binaries.
174pub fn initialize_sync(opts: PublishOptions<'_>) {
175    match Publisher::new_sync(opts.publisher) {
176        Ok(_) => {}
177        Err(PublishError::MissingOnInit) => {
178            // NOTE: We ignore missing OnInit errors as these can happen on products where the log
179            // sink connection isn't routed. If this is a mistake, then there will be warning
180            // messages from Component Manager regarding failed routing.
181            return;
182        }
183        Err(e) => panic!("Unable to initialize logging: {e:?}"),
184    }
185    if opts.install_panic_hook {
186        crate::install_panic_hook(opts.panic_prefix);
187    }
188}
189
190/// A `Publisher` acts as broker, implementing [`log::Log`] to receive log
191/// events from a component, and then forwarding that data on to a diagnostics service.
192#[derive(Clone)]
193pub struct Publisher {
194    inner: Arc<InnerPublisher>,
195}
196
197struct InnerPublisher {
198    sink: IoBufferSink,
199    filter: InterestFilter,
200}
201
202impl Publisher {
203    fn new(opts: PublisherOptions<'_>, iob: zx::Iob) -> Self {
204        Self {
205            inner: Arc::new(InnerPublisher {
206                sink: IoBufferSink::new(
207                    iob,
208                    SinkConfig {
209                        tags: opts.tags.iter().map(|s| s.to_string()).collect(),
210                        metatags: opts.metatags,
211                        always_log_file_line: opts.always_log_file_line,
212                    },
213                ),
214                filter: InterestFilter::new(opts.interest),
215            }),
216        }
217    }
218
219    /// Returns a new `Publisher`. This will connect synchronously and, if configured, run a
220    /// listener in a separate thread.
221    pub fn new_sync(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
222        let listen_for_interest_updates = opts.listen_for_interest_updates;
223        let (publisher, client) = Self::new_sync_no_listener(opts)?;
224        if listen_for_interest_updates {
225            let publisher = publisher.clone();
226            std::thread::spawn(move || {
227                fuchsia_async::LocalExecutor::default()
228                    .run_singlethreaded(publisher.listen_for_interest_updates(client.into_proxy()));
229            });
230        }
231        Ok(publisher)
232    }
233
234    /// Returns a new `Publisher`. This will connect synchronously and, if configured, run a
235    /// listener in an async task. Prefer to use `new_async`.
236    pub fn new_sync_with_async_listener(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
237        let listen_for_interest_updates = opts.listen_for_interest_updates;
238        let (publisher, client) = Self::new_sync_no_listener(opts)?;
239        if listen_for_interest_updates {
240            fasync::Task::spawn(publisher.clone().listen_for_interest_updates(client.into_proxy()))
241                .detach();
242        }
243        Ok(publisher)
244    }
245
246    /// Returns a new `Publisher`, but doesn't listen for interest updates. This will connect
247    /// synchronously.
248    fn new_sync_no_listener(
249        mut opts: PublisherOptions<'_>,
250    ) -> Result<(Self, ClientEnd<LogSinkMarker>), PublishError> {
251        let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
252
253        if listen_for_interest_updates && !register_global_logger {
254            // We can only support listening for interest updates if we are registering a global
255            // logger. This is because if we don't register, the initial interest is dropped.
256            return Err(PublishError::UnsupportedOption);
257        }
258
259        let client = match opts.log_sink_client.take() {
260            Some(log_sink) => log_sink,
261            None => connect_to_protocol()
262                .map_err(|e| e.to_string())
263                .map_err(PublishError::LogSinkConnect)?,
264        };
265
266        let proxy = zx::Unowned::<LogSinkSynchronousProxy>::new(client.channel());
267        let Ok(LogSinkEvent::OnInit {
268            payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
269        }) = proxy.wait_for_event(zx::MonotonicInstant::INFINITE)
270        else {
271            return Err(PublishError::MissingOnInit);
272        };
273        drop(proxy);
274
275        let publisher = Self::new(opts, iob);
276
277        if register_global_logger {
278            publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
279        }
280
281        Ok((publisher, client))
282    }
283
284    /// Returns a new `Publisher`. This will connect asynchronously and, if configured, run a
285    /// listener in an async task.
286    pub async fn new_async(mut opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
287        let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
288
289        if listen_for_interest_updates && !register_global_logger {
290            // We can only support listening for interest updates if we are registering a global
291            // logger. This is because if we don't register, the initial interest is dropped.
292            return Err(PublishError::UnsupportedOption);
293        }
294
295        let proxy = match opts.log_sink_client.take() {
296            Some(log_sink) => log_sink.into_proxy(),
297            None => connect_to_protocol()
298                .map_err(|e| e.to_string())
299                .map_err(PublishError::LogSinkConnect)?,
300        };
301
302        let Some(Ok(LogSinkEvent::OnInit {
303            payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
304        })) = proxy.take_event_stream().next().await
305        else {
306            return Err(PublishError::MissingOnInit);
307        };
308
309        let publisher = Self::new(opts, iob);
310
311        if register_global_logger {
312            publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
313            fasync::Task::spawn(publisher.clone().listen_for_interest_updates(proxy)).detach();
314        }
315
316        Ok(publisher)
317    }
318
319    /// Publish the provided event for testing.
320    pub fn event_for_testing(&self, record: TestRecord<'_>) {
321        if self.inner.filter.enabled_for_testing(&record) {
322            self.inner.sink.event_for_testing(record);
323        }
324    }
325
326    /// Registers an interest listener
327    pub fn set_interest_listener<T>(&self, listener: T)
328    where
329        T: OnInterestChanged + Send + Sync + 'static,
330    {
331        self.inner.filter.set_interest_listener(listener);
332    }
333
334    /// Sets the global logger to this publisher. This function may only be called once in the
335    /// lifetime of a program.
336    pub fn register_logger(&self, interest: Option<Interest>) -> Result<(), PublishError> {
337        self.inner.filter.update_interest(interest.unwrap_or_default());
338        // SAFETY: This leaks which guarantees the publisher remains alive for the lifetime of the
339        // program.
340        unsafe {
341            let ptr = Arc::into_raw(self.inner.clone());
342            log::set_logger(&*ptr).inspect_err(|_| {
343                let _ = Arc::from_raw(ptr);
344            })?;
345        }
346        Ok(())
347    }
348
349    /// Listens for interest updates. Callers must maintain a clone to keep the publisher alive;
350    /// this function will downgrade to a weak reference.
351    async fn listen_for_interest_updates(self, proxy: LogSinkProxy) {
352        self.inner.filter.listen_for_interest_updates(proxy).await;
353    }
354}
355
356impl log::Log for InnerPublisher {
357    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
358        // NOTE: we handle minimum severity directly through the log max_level. So we call,
359        // log::set_max_level, log::max_level where appropriate.
360        true
361    }
362
363    fn log(&self, record: &log::Record<'_>) {
364        self.sink.record_log(record);
365    }
366
367    fn flush(&self) {}
368}
369
370impl log::Log for Publisher {
371    #[inline]
372    fn enabled(&self, metadata: &log::Metadata<'_>) -> bool {
373        self.inner.enabled(metadata)
374    }
375
376    #[inline]
377    fn log(&self, record: &log::Record<'_>) {
378        self.inner.log(record)
379    }
380
381    #[inline]
382    fn flush(&self) {
383        self.inner.flush()
384    }
385}
386
387impl Borrow<InterestFilter> for InnerPublisher {
388    fn borrow(&self) -> &InterestFilter {
389        &self.filter
390    }
391}
392
393/// Initializes logging, but buffers logs until the connection is established. This is required for
394/// things like Component Manager, which would otherwise deadlock when starting. This carries some
395/// overhead, so should be avoided unless required.
396pub fn initialize_buffered(opts: PublishOptions<'_>) -> Result<(), PublishError> {
397    BufferedPublisher::new(opts.publisher)?;
398    if opts.install_panic_hook {
399        crate::install_panic_hook(opts.panic_prefix);
400    }
401    Ok(())
402}
403
404/// A buffered publisher will buffer log messages until the IOBuffer is received. If this is
405/// registered as the global logger, then messages will be logged at the default level until an
406/// updated level is received from Archivist.
407pub struct BufferedPublisher {
408    sink: BufferedSink,
409    filter: InterestFilter,
410    interest_listening_task: Mutex<Option<fasync::Task<()>>>,
411}
412
413impl BufferedPublisher {
414    /// Returns a publisher that will buffer messages until the IOBuffer is received. An async
415    /// executor must be established.
416    pub fn new(opts: PublisherOptions<'_>) -> Result<Arc<Self>, PublishError> {
417        if opts.listen_for_interest_updates && !opts.register_global_logger {
418            // We can only support listening for interest updates if we are registering a global
419            // logger. This is because if we don't register, the initial interest is dropped.
420            return Err(PublishError::UnsupportedOption);
421        }
422
423        let client = match opts.log_sink_client {
424            Some(log_sink) => log_sink,
425            None => connect_to_protocol()
426                .map_err(|e| e.to_string())
427                .map_err(PublishError::LogSinkConnect)?,
428        };
429
430        let this = Arc::new(Self {
431            sink: BufferedSink::new(SinkConfig {
432                tags: opts.tags.iter().map(|s| s.to_string()).collect(),
433                metatags: opts.metatags,
434                always_log_file_line: opts.always_log_file_line,
435            }),
436            filter: InterestFilter::new(opts.interest),
437            interest_listening_task: Mutex::default(),
438        });
439
440        if opts.register_global_logger {
441            // SAFETY: This leaks which guarantees the publisher remains alive for the lifetime
442            // of the program. This leaks even when there is an error (which shouldn't happen so
443            // we don't worry about it).
444            unsafe {
445                log::set_logger(&*Arc::into_raw(this.clone()))?;
446            }
447        }
448
449        // Whilst we are waiting for the OnInit event, we hold a strong reference to the publisher
450        // which will prevent the publisher from being dropped and ensure that buffered log messages
451        // are sent.
452        let this_clone = this.clone();
453        *this_clone.interest_listening_task.lock() = Some(fasync::Task::spawn(async move {
454            let proxy = client.into_proxy();
455
456            let Some(Ok(LogSinkEvent::OnInit {
457                payload: LogSinkOnInitRequest { buffer: Some(buffer), interest, .. },
458            })) = proxy.take_event_stream().next().await
459            else {
460                // There's not a lot we can do here: we haven't received the event we expected
461                // and there's no way we can log the issue.
462                return;
463            };
464
465            // Ignore the interest sent in the OnInit request if `listen_for_interest_updates`
466            // is false; it is assumed that the caller wants the interest specified in the
467            // options to stick.
468            this.filter.update_interest(
469                (if opts.listen_for_interest_updates { interest } else { None })
470                    .unwrap_or_default(),
471            );
472
473            this.sink.set_buffer(buffer);
474
475            if opts.listen_for_interest_updates {
476                this.filter.listen_for_interest_updates(proxy).await;
477            }
478        }));
479
480        Ok(this_clone)
481    }
482}
483
484impl log::Log for BufferedPublisher {
485    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
486        // NOTE: we handle minimum severity directly through the log max_level. So we call,
487        // log::set_max_level, log::max_level where appropriate.
488        true
489    }
490
491    fn log(&self, record: &log::Record<'_>) {
492        self.sink.record_log(record);
493    }
494
495    fn flush(&self) {}
496}
497
498impl Borrow<InterestFilter> for BufferedPublisher {
499    fn borrow(&self) -> &InterestFilter {
500        &self.filter
501    }
502}
503
504/// Errors arising while forwarding a diagnostics stream to the environment.
505#[derive(Debug, Error)]
506pub enum PublishError {
507    /// Connection to fuchsia.logger.LogSink failed.
508    #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
509    LogSinkConnect(String),
510
511    /// Couldn't create a new socket.
512    #[error("failed to create a socket for logging")]
513    MakeSocket(#[source] zx::Status),
514
515    /// An issue with the LogSink channel or socket prevented us from sending it to the `LogSink`.
516    #[error("failed to send a socket to the LogSink")]
517    SendSocket(#[source] fidl::Error),
518
519    /// Installing a Logger.
520    #[error("failed to install the loger")]
521    InitLogForward(#[from] log::SetLoggerError),
522
523    /// Unsupported publish option.
524    #[error("unsupported option")]
525    UnsupportedOption,
526
527    /// The channel was closed with no OnInit event.
528    #[error("did not receive the OnInit event")]
529    MissingOnInit,
530}
531
532#[cfg(test)]
533static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);
534
535/// Increments the test clock.
536#[cfg(test)]
537pub fn increment_clock(duration: Duration) {
538    CURRENT_TIME_NANOS.fetch_add(duration.as_nanos() as i64, Ordering::SeqCst);
539}
540
541/// Gets the current monotonic time in nanoseconds.
542#[doc(hidden)]
543pub fn get_now() -> i64 {
544    #[cfg(not(test))]
545    return zx::MonotonicInstant::get().into_nanos();
546
547    #[cfg(test)]
548    CURRENT_TIME_NANOS.load(Ordering::Relaxed)
549}
550
551/// Logs every N seconds using an Atomic variable
552/// to keep track of the time. This will have a higher
553/// performance impact on ARM compared to regular logging due to the use
554/// of an atomic.
555#[macro_export]
556macro_rules! log_every_n_seconds {
557    ($seconds:expr, $severity:expr, $($arg:tt)*) => {
558        use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
559        use $crate::{paste, fuchsia::get_now};
560
561        let now = get_now();
562
563        static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
564        if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
565            paste! {
566                log::[< $severity:lower >]!($($arg)*);
567            }
568            LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
569        }
570    }
571}
572
573#[cfg(test)]
574mod tests {
575    use super::*;
576    use diagnostics_reader::ArchiveReader;
577    use fidl_fuchsia_diagnostics_crasher::{CrasherMarker, CrasherProxy};
578    use fuchsia_async::TimeoutExt;
579    use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
580    use futures::{StreamExt, future};
581    use log::{debug, error, info};
582    use moniker::ExtendedMoniker;
583
584    #[fuchsia::test]
585    async fn panic_integration_test() {
586        let builder = RealmBuilder::new().await.unwrap();
587        let puppet = builder
588            .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
589            .await
590            .unwrap();
591        builder
592            .add_route(
593                Route::new()
594                    .capability(Capability::protocol::<CrasherMarker>())
595                    .from(&puppet)
596                    .to(Ref::parent()),
597            )
598            .await
599            .unwrap();
600        let realm = builder.build().await.unwrap();
601        let child_name = realm.root.child_name();
602        let reader = ArchiveReader::logs();
603        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
604        let proxy: CrasherProxy = realm.root.connect_to_protocol_at_exposed_dir().unwrap();
605        let target_moniker =
606            ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
607                .unwrap();
608        proxy.crash("This is a test panic.").await.unwrap();
609
610        let result =
611            logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
612        assert_eq!(result.line_number(), Some(29).as_ref());
613        assert_eq!(
614            result.file_path(),
615            Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
616        );
617        assert!(
618            result
619                .payload_keys()
620                .unwrap()
621                .get_property("info")
622                .unwrap()
623                .to_string()
624                .contains("This is a test panic.")
625        );
626    }
627
628    #[fuchsia::test(logging = false)]
629    async fn verify_setting_minimum_log_severity() {
630        let reader = ArchiveReader::logs();
631        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
632        let _publisher = Publisher::new_async(PublisherOptions {
633            tags: &["verify_setting_minimum_log_severity"],
634            register_global_logger: true,
635            ..PublisherOptions::default()
636        })
637        .await
638        .expect("initialized log");
639
640        info!("I'm an info log");
641        debug!("I'm a debug log and won't show up");
642
643        set_minimum_severity(Severity::Debug);
644        debug!("I'm a debug log and I show up");
645
646        let results = logs
647            .filter(|data| {
648                future::ready(
649                    data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
650                )
651            })
652            .take(2)
653            .collect::<Vec<_>>()
654            .await;
655        assert_eq!(results[0].msg().unwrap(), "I'm an info log");
656        assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
657    }
658
659    #[fuchsia::test]
660    async fn log_macro_logs_are_recorded() {
661        let reader = ArchiveReader::logs();
662        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
663
664        let total_threads = 10;
665
666        for i in 0..total_threads {
667            std::thread::spawn(move || {
668                log::info!(thread=i; "log from thread {}", i);
669            });
670        }
671
672        let mut results = logs
673            .filter(|data| {
674                future::ready(
675                    data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded"),
676                )
677            })
678            .take(total_threads);
679
680        let mut seen = vec![];
681        while let Some(log) = results
682            .next()
683            .on_timeout(Duration::from_secs(30), || {
684                error!("Timed out!");
685                None
686            })
687            .await
688        {
689            let hierarchy = log.payload_keys().unwrap();
690            assert_eq!(hierarchy.properties.len(), 1);
691            assert_eq!(hierarchy.properties[0].name(), "thread");
692            let thread_id = hierarchy.properties[0].uint().unwrap();
693            seen.push(thread_id as usize);
694            assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
695        }
696        seen.sort();
697        assert_eq!(seen, (0..total_threads).collect::<Vec<_>>());
698    }
699}