diagnostics_log/fuchsia/
mod.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use crate::PublishOptions;
5use diagnostics_log_types::Severity;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::{
8    LogSinkEvent, LogSinkMarker, LogSinkOnInitRequest, LogSinkProxy, LogSinkSynchronousProxy,
9};
10use fuchsia_async as fasync;
11use fuchsia_component_client::connect::connect_to_protocol;
12use fuchsia_sync::Mutex;
13use futures::stream::StreamExt;
14use std::borrow::Borrow;
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::Arc;
18use thiserror::Error;
19
20#[cfg(fuchsia_api_level_less_than = "27")]
21use fidl_fuchsia_diagnostics::Interest;
22#[cfg(fuchsia_api_level_at_least = "27")]
23use fidl_fuchsia_diagnostics_types::Interest;
24
25mod filter;
26mod sink;
27
28use filter::InterestFilter;
29use sink::{BufferedSink, IoBufferSink, Sink, SinkConfig};
30
31pub use diagnostics_log_encoding::Metatag;
32pub use diagnostics_log_encoding::encode::{LogEvent, TestRecord};
33pub use paste::paste;
34
35#[cfg(test)]
36use std::{
37    sync::atomic::{AtomicI64, Ordering},
38    time::Duration,
39};
40
/// Callback for interest listeners.
///
/// Implementations are registered through [`Publisher::set_interest_listener`], which requires
/// them to be `Send + Sync + 'static`.
pub trait OnInterestChanged {
    /// Callback for when the interest changes.
    ///
    /// NOTE(review): `severity` is presumably the new minimum severity derived from the updated
    /// interest; the call site lives in the `filter` module, so confirm there.
    fn on_changed(&self, severity: Severity);
}
46
/// Options to configure a `Publisher`.
///
/// The derived `Default` leaves every boolean flag disabled and no `LogSink` client set. The
/// builder-style setters are generated by the `publisher_options!` macro.
#[derive(Default)]
pub struct PublisherOptions<'t> {
    /// Initial interest handed to the `InterestFilter` when a publisher is built.
    pub(crate) interest: Interest,
    /// Whether the constructors should start listening for interest updates.
    listen_for_interest_updates: bool,
    /// `LogSink` connection to use. When `None`, the constructors call `connect_to_protocol()`.
    log_sink_client: Option<ClientEnd<LogSinkMarker>>,
    /// Metatags forwarded unchanged into the sink configuration.
    pub(crate) metatags: HashSet<Metatag>,
    /// Tags, copied into owned strings for the sink configuration.
    pub(crate) tags: &'t [&'t str],
    /// Forwarded into the sink configuration; set through `log_file_line_info`.
    pub(crate) always_log_file_line: bool,
    /// Whether the constructors should install the publisher as the global `log` logger.
    register_global_logger: bool,
}
58
59impl Default for PublishOptions<'static> {
60    fn default() -> Self {
61        Self {
62            publisher: PublisherOptions {
63                // Default to registering the global logger and listening for interest updates for
64                // `PublishOptions` because it's used by the `initialize...` functions which are
65                // typically called at program start time.
66                listen_for_interest_updates: true,
67                register_global_logger: true,
68                ..PublisherOptions::default()
69            },
70            install_panic_hook: true,
71            panic_prefix: None,
72        }
73    }
74}
75
// Generates the builder-style setters shared by `PublisherOptions` and `PublishOptions`.
//
// Each entry has the form `(Type, self, field, ...)`. `self` is passed in as an identifier so that
// the expansion can refer to the receiver, and the optional trailing identifiers name the chain of
// fields leading from the receiver to the `PublisherOptions` value being modified (empty when the
// type is `PublisherOptions` itself).
macro_rules! publisher_options {
    ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
        $(
            impl<'t> $name<'t> {
                /// Whether or not to log file/line information regardless of severity.
                ///
                /// Default: false.
                pub fn log_file_line_info(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.always_log_file_line = enable;
                    $self
                }

                /// When set, a `fuchsia_async::Task` will be spawned and held that will be
                /// listening for interest changes. This option can only be set if
                /// `register_global_logger` is set.
                ///
                /// Default: true for `PublishOptions`, false for `PublisherOptions`.
                pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.listen_for_interest_updates = enable;
                    $self
                }

                /// Sets the `LogSink` that will be used.
                ///
                /// Default: the `fuchsia.logger.LogSink` available in the incoming namespace.
                pub fn use_log_sink(mut $self, client: ClientEnd<LogSinkMarker>) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.log_sink_client = Some(client);
                    $self
                }

                /// When set to true, the publisher will be registered as the global logger. This
                /// can only be done once.
                ///
                /// Default: true for `PublishOptions`, false for `PublisherOptions`.
                pub fn register_global_logger(mut $self, value: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.register_global_logger = value;
                    $self
                }
            }
        )*
    };
}

// `PublisherOptions` is modified directly; `PublishOptions` forwards to its `publisher` field.
publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
124
125/// Initializes logging with the given options.
126///
127/// IMPORTANT: this should be called at most once in a program, and must be
128/// called only after an async executor has been set for the current thread,
129/// otherwise it'll return errors or panic. Therefore it's recommended to never
130/// call this from libraries and only do it from binaries.
131// Ideally this would be an async function, but fixing that is a bit of a Yak shave.
132pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
133    let result = Publisher::new_sync_with_async_listener(opts.publisher);
134    if matches!(result, Err(PublishError::MissingOnInit)) {
135        // NOTE: We ignore missing OnInit errors as these can happen on products where the log sink
136        // connection isn't routed. If this is a mistake, then there will be warning messages from
137        // Component Manager regarding failed routing.
138        return Ok(());
139    }
140    result?;
141    if opts.install_panic_hook {
142        crate::install_panic_hook(opts.panic_prefix);
143    }
144    Ok(())
145}
146
147/// Sets the global minimum log severity.
148/// IMPORTANT: this function can panic if `initialize` wasn't called before.
149pub fn set_minimum_severity(severity: impl Into<Severity>) {
150    let severity: Severity = severity.into();
151    log::set_max_level(severity.into());
152}
153
/// Initializes logging with the given options.
///
/// This must be used when working in an environment where a [`fuchsia_async::Executor`] can't be
/// used. The publisher is created with [`Publisher::new_sync`], which connects synchronously and,
/// when interest updates are enabled, runs the listener on a dedicated thread with its own local
/// executor instead of spawning a task on the caller's executor.
///
/// IMPORTANT: this should be called at most once in a program. Therefore it's recommended to
/// never call this from libraries and only do it from binaries.
///
/// # Panics
///
/// Panics if the publisher cannot be created for any reason other than a missing `OnInit` event.
pub fn initialize_sync(opts: PublishOptions<'_>) {
    match Publisher::new_sync(opts.publisher) {
        Ok(_) => {}
        Err(PublishError::MissingOnInit) => {
            // NOTE: We ignore missing OnInit errors as these can happen on products where the log
            // sink connection isn't routed. If this is a mistake, then there will be warning
            // messages from Component Manager regarding failed routing.
            return;
        }
        Err(e) => panic!("Unable to initialize logging: {e:?}"),
    }
    if opts.install_panic_hook {
        crate::install_panic_hook(opts.panic_prefix);
    }
}
178
/// A `Publisher` acts as broker, implementing [`log::Log`] to receive log
/// events from a component, and then forwarding that data on to a diagnostics service.
///
/// Cloning is cheap: all clones share the same reference-counted inner state.
#[derive(Clone)]
pub struct Publisher {
    /// Shared sink and filter; this is also the value installed as the global logger.
    inner: Arc<InnerPublisher>,
}
185
/// State shared by every clone of a [`Publisher`].
struct InnerPublisher {
    /// Destination for log records, backed by the IOBuffer received in the `OnInit` event.
    sink: IoBufferSink,
    /// Tracks the current interest and handles interest updates.
    filter: InterestFilter,
}
190
191impl Publisher {
192    fn new(opts: PublisherOptions<'_>, iob: zx::Iob) -> Self {
193        Self {
194            inner: Arc::new(InnerPublisher {
195                sink: IoBufferSink::new(
196                    iob,
197                    SinkConfig {
198                        tags: opts.tags.iter().map(|s| s.to_string()).collect(),
199                        metatags: opts.metatags,
200                        always_log_file_line: opts.always_log_file_line,
201                    },
202                ),
203                filter: InterestFilter::new(opts.interest),
204            }),
205        }
206    }
207
208    /// Returns a new `Publisher`. This will connect synchronously and, if configured, run a
209    /// listener in a separate thread.
210    pub fn new_sync(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
211        let listen_for_interest_updates = opts.listen_for_interest_updates;
212        let (publisher, client) = Self::new_sync_no_listener(opts)?;
213        if listen_for_interest_updates {
214            let publisher = publisher.clone();
215            std::thread::spawn(move || {
216                fuchsia_async::LocalExecutor::default()
217                    .run_singlethreaded(publisher.listen_for_interest_updates(client.into_proxy()));
218            });
219        }
220        Ok(publisher)
221    }
222
223    /// Returns a new `Publisher`. This will connect synchronously and, if configured, run a
224    /// listener in an async task. Prefer to use `new_async`.
225    pub fn new_sync_with_async_listener(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
226        let listen_for_interest_updates = opts.listen_for_interest_updates;
227        let (publisher, client) = Self::new_sync_no_listener(opts)?;
228        if listen_for_interest_updates {
229            fasync::Task::spawn(publisher.clone().listen_for_interest_updates(client.into_proxy()))
230                .detach();
231        }
232        Ok(publisher)
233    }
234
235    /// Returns a new `Publisher`, but doesn't listen for interest updates. This will connect
236    /// synchronously.
237    fn new_sync_no_listener(
238        mut opts: PublisherOptions<'_>,
239    ) -> Result<(Self, ClientEnd<LogSinkMarker>), PublishError> {
240        let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
241
242        if listen_for_interest_updates && !register_global_logger {
243            // We can only support listening for interest updates if we are registering a global
244            // logger. This is because if we don't register, the initial interest is dropped.
245            return Err(PublishError::UnsupportedOption);
246        }
247
248        let client = match opts.log_sink_client.take() {
249            Some(log_sink) => log_sink,
250            None => connect_to_protocol()
251                .map_err(|e| e.to_string())
252                .map_err(PublishError::LogSinkConnect)?,
253        };
254
255        let proxy = zx::Unowned::<LogSinkSynchronousProxy>::new(client.channel());
256        let Ok(LogSinkEvent::OnInit {
257            payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
258        }) = proxy.wait_for_event(zx::MonotonicInstant::INFINITE)
259        else {
260            return Err(PublishError::MissingOnInit);
261        };
262
263        let publisher = Self::new(opts, iob);
264
265        if register_global_logger {
266            publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
267        }
268
269        Ok((publisher, client))
270    }
271
272    /// Returns a new `Publisher`. This will connect asynchronously and, if configured, run a
273    /// listener in an async task.
274    pub async fn new_async(mut opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
275        let PublisherOptions { listen_for_interest_updates, register_global_logger, .. } = opts;
276
277        if listen_for_interest_updates && !register_global_logger {
278            // We can only support listening for interest updates if we are registering a global
279            // logger. This is because if we don't register, the initial interest is dropped.
280            return Err(PublishError::UnsupportedOption);
281        }
282
283        let proxy = match opts.log_sink_client.take() {
284            Some(log_sink) => log_sink.into_proxy(),
285            None => connect_to_protocol()
286                .map_err(|e| e.to_string())
287                .map_err(PublishError::LogSinkConnect)?,
288        };
289
290        let Some(Ok(LogSinkEvent::OnInit {
291            payload: LogSinkOnInitRequest { buffer: Some(iob), interest, .. },
292        })) = proxy.take_event_stream().next().await
293        else {
294            return Err(PublishError::MissingOnInit);
295        };
296
297        let publisher = Self::new(opts, iob);
298
299        if register_global_logger {
300            publisher.register_logger(if listen_for_interest_updates { interest } else { None })?;
301            fasync::Task::spawn(publisher.clone().listen_for_interest_updates(proxy)).detach();
302        }
303
304        Ok(publisher)
305    }
306
307    /// Publish the provided event for testing.
308    pub fn event_for_testing(&self, record: TestRecord<'_>) {
309        if self.inner.filter.enabled_for_testing(&record) {
310            self.inner.sink.event_for_testing(record);
311        }
312    }
313
314    /// Registers an interest listener
315    pub fn set_interest_listener<T>(&self, listener: T)
316    where
317        T: OnInterestChanged + Send + Sync + 'static,
318    {
319        self.inner.filter.set_interest_listener(listener);
320    }
321
322    /// Sets the global logger to this publisher. This function may only be called once in the
323    /// lifetime of a program.
324    pub fn register_logger(&self, interest: Option<Interest>) -> Result<(), PublishError> {
325        self.inner.filter.update_interest(interest.unwrap_or_default());
326        // SAFETY: This leaks which guarantees the publisher remains alive for the lifetime of the
327        // program.
328        unsafe {
329            let ptr = Arc::into_raw(self.inner.clone());
330            log::set_logger(&*ptr).inspect_err(|_| {
331                let _ = Arc::from_raw(ptr);
332            })?;
333        }
334        Ok(())
335    }
336
337    /// Listens for interest updates. Callers must maintain a clone to keep the publisher alive;
338    /// this function will downgrade to a weak reference.
339    async fn listen_for_interest_updates(self, proxy: LogSinkProxy) {
340        self.inner.filter.listen_for_interest_updates(proxy).await;
341    }
342}
343
/// Logger implementation that is installed as the global `log` logger.
impl log::Log for InnerPublisher {
    /// Always returns `true`; severity filtering is left to the `log` crate's max level.
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        // NOTE: we handle minimum severity directly through the log max_level. So we call,
        // log::set_max_level, log::max_level where appropriate.
        true
    }

    /// Forwards the record to the sink.
    fn log(&self, record: &log::Record<'_>) {
        self.sink.record_log(record);
    }

    /// No-op: nothing is done on flush.
    fn flush(&self) {}
}
357
358impl log::Log for Publisher {
359    #[inline]
360    fn enabled(&self, metadata: &log::Metadata<'_>) -> bool {
361        self.inner.enabled(metadata)
362    }
363
364    #[inline]
365    fn log(&self, record: &log::Record<'_>) {
366        self.inner.log(record)
367    }
368
369    #[inline]
370    fn flush(&self) {
371        self.inner.flush()
372    }
373}
374
/// Exposes the publisher's interest filter.
///
/// NOTE(review): presumably required by generic code in the `filter` module; confirm there.
impl Borrow<InterestFilter> for InnerPublisher {
    fn borrow(&self) -> &InterestFilter {
        &self.filter
    }
}
380
381/// Initializes logging, but buffers logs until the connection is established. This is required for
382/// things like Component Manager, which would otherwise deadlock when starting. This carries some
383/// overhead, so should be avoided unless required.
384pub fn initialize_buffered(opts: PublishOptions<'_>) -> Result<(), PublishError> {
385    BufferedPublisher::new(opts.publisher)?;
386    if opts.install_panic_hook {
387        crate::install_panic_hook(opts.panic_prefix);
388    }
389    Ok(())
390}
391
/// A buffered publisher will buffer log messages until the IOBuffer is received. If this is
/// registered as the global logger, then messages will be logged at the default level until an
/// updated level is received from Archivist.
pub struct BufferedPublisher {
    /// Sink that holds log records until `set_buffer` provides the IOBuffer.
    sink: BufferedSink,
    /// Tracks the current interest and handles interest updates.
    filter: InterestFilter,
    /// Task spawned by `new` that waits for the `OnInit` event and, if configured, keeps
    /// listening for interest updates afterwards.
    interest_listening_task: Mutex<Option<fasync::Task<()>>>,
}
400
401impl BufferedPublisher {
402    /// Returns a publisher that will buffer messages until the IOBuffer is received. An async
403    /// executor must be established.
404    pub fn new(opts: PublisherOptions<'_>) -> Result<Arc<Self>, PublishError> {
405        if opts.listen_for_interest_updates && !opts.register_global_logger {
406            // We can only support listening for interest updates if we are registering a global
407            // logger. This is because if we don't register, the initial interest is dropped.
408            return Err(PublishError::UnsupportedOption);
409        }
410
411        let client = match opts.log_sink_client {
412            Some(log_sink) => log_sink,
413            None => connect_to_protocol()
414                .map_err(|e| e.to_string())
415                .map_err(PublishError::LogSinkConnect)?,
416        };
417
418        let this = Arc::new(Self {
419            sink: BufferedSink::new(SinkConfig {
420                tags: opts.tags.iter().map(|s| s.to_string()).collect(),
421                metatags: opts.metatags,
422                always_log_file_line: opts.always_log_file_line,
423            }),
424            filter: InterestFilter::new(opts.interest),
425            interest_listening_task: Mutex::default(),
426        });
427
428        if opts.register_global_logger {
429            // SAFETY: This leaks which guarantees the publisher remains alive for the lifetime
430            // of the program. This leaks even when there is an error (which shouldn't happen so
431            // we don't worry about it).
432            unsafe {
433                log::set_logger(&*Arc::into_raw(this.clone()))?;
434            }
435        }
436
437        // Whilst we are waiting for the OnInit event, we hold a strong reference to the publisher
438        // which will prevent the publisher from being dropped and ensure that buffered log messages
439        // are sent.
440        let this_clone = this.clone();
441        *this_clone.interest_listening_task.lock() = Some(fasync::Task::spawn(async move {
442            let proxy = client.into_proxy();
443
444            let Some(Ok(LogSinkEvent::OnInit {
445                payload: LogSinkOnInitRequest { buffer: Some(buffer), interest, .. },
446            })) = proxy.take_event_stream().next().await
447            else {
448                // There's not a lot we can do here: we haven't received the event we expected
449                // and there's no way we can log the issue.
450                return;
451            };
452
453            // Ignore the interest sent in the OnInit request if `listen_for_interest_updates`
454            // is false; it is assumed that the caller wants the interest specified in the
455            // options to stick.
456            this.filter.update_interest(
457                (if opts.listen_for_interest_updates { interest } else { None })
458                    .unwrap_or_default(),
459            );
460
461            this.sink.set_buffer(buffer);
462
463            if opts.listen_for_interest_updates {
464                this.filter.listen_for_interest_updates(proxy).await;
465            }
466        }));
467
468        Ok(this_clone)
469    }
470}
471
/// Logger implementation used when the buffered publisher is installed as the global logger.
impl log::Log for BufferedPublisher {
    /// Always returns `true`; severity filtering is left to the `log` crate's max level.
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        // NOTE: we handle minimum severity directly through the log max_level. So we call,
        // log::set_max_level, log::max_level where appropriate.
        true
    }

    /// Forwards the record to the buffered sink.
    fn log(&self, record: &log::Record<'_>) {
        self.sink.record_log(record);
    }

    /// No-op: nothing is done on flush.
    fn flush(&self) {}
}
485
/// Exposes the publisher's interest filter.
///
/// NOTE(review): presumably required by generic code in the `filter` module; confirm there.
impl Borrow<InterestFilter> for BufferedPublisher {
    fn borrow(&self) -> &InterestFilter {
        &self.filter
    }
}
491
492/// Errors arising while forwarding a diagnostics stream to the environment.
493#[derive(Debug, Error)]
494pub enum PublishError {
495    /// Connection to fuchsia.logger.LogSink failed.
496    #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
497    LogSinkConnect(String),
498
499    /// Couldn't create a new socket.
500    #[error("failed to create a socket for logging")]
501    MakeSocket(#[source] zx::Status),
502
503    /// An issue with the LogSink channel or socket prevented us from sending it to the `LogSink`.
504    #[error("failed to send a socket to the LogSink")]
505    SendSocket(#[source] fidl::Error),
506
507    /// Installing a Logger.
508    #[error("failed to install the loger")]
509    InitLogForward(#[from] log::SetLoggerError),
510
511    /// Unsupported publish option.
512    #[error("unsupported option")]
513    UnsupportedOption,
514
515    /// The channel was closed with no OnInit event.
516    #[error("did not receive the OnInit event")]
517    MissingOnInit,
518}
519
// Fake clock used by `get_now` in test builds. It starts at 10 seconds and is advanced with
// `increment_clock`.
#[cfg(test)]
static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);
522
/// Advances the fake test clock read by `get_now` by `duration`.
#[cfg(test)]
pub fn increment_clock(duration: Duration) {
    let delta_nanos = duration.as_nanos() as i64;
    CURRENT_TIME_NANOS.fetch_add(delta_nanos, Ordering::SeqCst);
}
528
529/// Gets the current monotonic time in nanoseconds.
530#[doc(hidden)]
531pub fn get_now() -> i64 {
532    #[cfg(not(test))]
533    return zx::MonotonicInstant::get().into_nanos();
534
535    #[cfg(test)]
536    CURRENT_TIME_NANOS.load(Ordering::Relaxed)
537}
538
/// Logs every N seconds using an Atomic variable
/// to keep track of the time. This will have a higher
/// performance impact on ARM compared to regular logging due to the use
/// of an atomic.
///
/// Arguments: the interval in seconds, the severity (its lowercased name selects the `log`
/// macro to call, e.g. `INFO` -> `log::info!`), followed by the usual logging arguments.
///
/// Each call site has its own timestamp. The load and the store are separate operations, so
/// threads racing through the same call site may each log once within a single interval.
#[macro_export]
macro_rules! log_every_n_seconds {
    // The expansion is wrapped in its own block so that the imports and the per-call-site static
    // stay private to it. This allows the macro to be invoked more than once in the same scope
    // and keeps it from leaking names into the caller.
    ($seconds:expr, $severity:expr, $($arg:tt)*) => {{
        use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
        use $crate::{paste, fuchsia::get_now};

        let now = get_now();

        static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
        if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
            paste! {
                log::[< $severity:lower >]!($($arg)*);
            }
            LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
        }
    }}
}
560
// Integration-style tests. They read logs back through `ArchiveReader`, so they need a running
// Archivist and (for the panic test) the `rust-crasher` component packaged alongside.
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_reader::ArchiveReader;
    use fidl_fuchsia_diagnostics_crasher::{CrasherMarker, CrasherProxy};
    use fuchsia_async::TimeoutExt;
    use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
    use futures::{FutureExt, StreamExt, future};
    use log::{debug, error, info};
    use moniker::ExtendedMoniker;

    // Launches the crasher component, asks it to panic and checks that the resulting log record
    // carries the expected file, line and panic message.
    #[fuchsia::test]
    async fn panic_integration_test() {
        let builder = RealmBuilder::new().await.unwrap();
        let puppet = builder
            .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
            .await
            .unwrap();
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol::<CrasherMarker>())
                    .from(&puppet)
                    .to(Ref::parent()),
            )
            .await
            .unwrap();
        let realm = builder.build().await.unwrap();
        let child_name = realm.root.child_name();
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let proxy: CrasherProxy = realm.root.connect_to_protocol_at_exposed_dir().unwrap();
        let target_moniker =
            ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
                .unwrap();
        proxy.crash("This is a test panic.").await.unwrap();

        // Take the first log record that comes from the crasher component.
        let result =
            logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
        assert_eq!(result.line_number(), Some(29).as_ref());
        assert_eq!(
            result.file_path(),
            Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
        );
        assert!(
            result
                .payload_keys()
                .unwrap()
                .get_property("info")
                .unwrap()
                .to_string()
                .contains("This is a test panic.")
        );
    }

    // Checks that debug logs only show up after `set_minimum_severity(Severity::Debug)`. The
    // test framework's own logging is disabled so that this test can register the global logger.
    #[fuchsia::test(logging = false)]
    async fn verify_setting_minimum_log_severity() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let _publisher = Publisher::new_async(PublisherOptions {
            tags: &["verify_setting_minimum_log_severity"],
            register_global_logger: true,
            ..PublisherOptions::default()
        })
        .await
        .expect("initialized log");

        info!("I'm an info log");
        debug!("I'm a debug log and won't show up");

        set_minimum_severity(Severity::Debug);
        debug!("I'm a debug log and I show up");

        // Only two records are expected: the first debug log must have been dropped.
        let results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
                )
            })
            .take(2)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(results[0].msg().unwrap(), "I'm an info log");
        assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
    }

    // Logs once from each of several threads and checks that every record arrives with its
    // structured `thread` key and matching message.
    #[fuchsia::test]
    async fn log_macro_logs_are_recorded() {
        let reader = ArchiveReader::logs();
        let (logs, mut errors) = reader.snapshot_then_subscribe().unwrap().split_streams();

        let total_threads = 10;

        for i in 0..total_threads {
            std::thread::spawn(move || {
                log::info!(thread=i; "log from thread {}", i);
            });
        }

        // Count the records that belong to other tests so the final assertion can report them.
        let mut filtered = 0;
        let mut results = logs
            .filter(|data| {
                future::ready(
                    if data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded") {
                        true
                    } else {
                        filtered += 1;
                        false
                    },
                )
            })
            .take(total_threads);

        let mut seen = vec![];
        // Stop early (yielding `None`) if no record arrives within the timeout.
        while let Some(log) = results
            .next()
            .on_timeout(Duration::from_secs(30), || {
                error!("Timed out!");
                None
            })
            .await
        {
            let hierarchy = log.payload_keys().unwrap();
            assert_eq!(hierarchy.properties.len(), 1);
            assert_eq!(hierarchy.properties[0].name(), "thread");
            let thread_id = hierarchy.properties[0].uint().unwrap();
            seen.push(thread_id as usize);
            assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
        }

        // No error should be pending on the error stream.
        assert_matches!(errors.next().now_or_never(), None);

        // Threads may log in any order, so compare the sorted ids.
        seen.sort();
        assert_eq!(seen, (0..total_threads).collect::<Vec<_>>(), "filtered={filtered}");
    }
}