archivist_lib/logs/
serial.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::logs::repository::LogsRepository;
6use diagnostics_data::{Data, Logs};
7use fidl_fuchsia_diagnostics::{ComponentSelector, StreamMode};
8use fuchsia_async::OnSignals;
9use futures::channel::mpsc::UnboundedReceiver;
10use futures::channel::{mpsc, oneshot};
11use futures::executor::block_on;
12use futures::{FutureExt, StreamExt, select};
13use log::warn;
14use selectors::FastError;
15use std::collections::HashSet;
16use std::io::{self, Write};
17use std::pin::pin;
18use std::sync::Arc;
19use std::{mem, thread};
20use zx::Signals;
21
/// Maximum number of bytes handed to the serial sink in a single write.
///
/// `SerialWriter` sizes the chunks it produces so that each one, including its trailing
/// newline, fits within this limit, and `SerialSink::write` treats a larger buffer as an
/// internal error.
pub const MAX_SERIAL_WRITE_SIZE: usize = 256;
23
24/// Function that forwards logs from Archivist to the serial port. Logs will be filtered by
25/// `allow_serial_log_tags` to include logs in the serial output, and `deny_serial_log_tags` to
26/// exclude specific tags.
27pub async fn launch_serial(
28    allow_serial_log_tags: impl IntoIterator<Item = impl AsRef<str>>,
29    deny_serial_log_tags: impl IntoIterator<Item = impl ToString>,
30    logs_repo: Arc<LogsRepository>,
31    sink: impl Write + Send + 'static,
32    mut freeze_receiver: mpsc::UnboundedReceiver<oneshot::Sender<zx::EventPair>>,
33    mut flush_receiver: UnboundedReceiver<oneshot::Sender<()>>,
34) {
35    let mut writer =
36        SerialWriter::new(sink, deny_serial_log_tags.into_iter().map(|s| s.to_string()).collect());
37
38    let mut barrier = writer.get_barrier();
39    let mut log_stream = pin!(logs_repo.logs_cursor(
40        StreamMode::SnapshotThenSubscribe,
41        selectors_from_tags(allow_serial_log_tags),
42    ));
43    loop {
44        select! {
45            log = log_stream.next() => {
46                if let Some(log) = log {
47                    // We've received a log
48                    writer.log(&log).await;
49                } else {
50                    // We've hit the end of the log stream and Archivist is shutting down.
51                    break;
52                }
53            }
54            freeze_request = freeze_receiver.next() => {
55                if let Some(request) = freeze_request {
56                    // We must use the barrier before we send back the event.
57                    barrier.wait().await;
58                    let (client, server) = zx::EventPair::create();
59                    let _ = request.send(client);
60                    let _ = OnSignals::new(&server, Signals::EVENTPAIR_PEER_CLOSED).await;
61                }
62            }
63            flush_request = flush_receiver.next() => {
64                if let Some(flush_request) = flush_request {
65                    // We have a background thread that polls sockets.  Ensure the background thread
66                    // is polled first before we write the remaining logs to serial to ensure we
67                    // capture all logs.
68                    logs_repo.flush().await;
69
70                    // Write all pending logs to serial
71                    while let Some(Some(log)) = log_stream.next().now_or_never() {
72                        writer.log(&log).await;
73                    }
74
75                    // Flush the serial thread (ensuring everything goes to serial)
76                    writer.get_barrier().wait().await;
77
78                    // Reply to the flush request and continue normal logging operations.  Ignore
79                    // the error, because the channel may have been closed which is OK.
80                    let _ = flush_request.send(());
81                }
82            }
83        }
84    }
85    // Ensure logs are flushed before we finish.
86    writer.get_barrier().wait().await;
87}
88
89fn selectors_from_tags(tags: impl IntoIterator<Item = impl AsRef<str>>) -> Vec<ComponentSelector> {
90    tags.into_iter()
91        .filter_map(|selector| {
92            let selector = selector.as_ref();
93            match selectors::parse_component_selector::<FastError>(selector) {
94                Ok(s) => Some(s),
95                Err(err) => {
96                    warn!(selector:%, err:?; "Failed to parse component selector");
97                    None
98                }
99            }
100        })
101        .collect()
102}
103
104/// A sink to write to serial. This Write implementation must be used together with SerialWriter.
105#[derive(Default)]
106pub struct SerialSink;
107
108impl Write for SerialSink {
109    fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
110        if cfg!(debug_assertions) {
111            debug_assert!(buffer.len() <= MAX_SERIAL_WRITE_SIZE);
112        } else {
113            use std::sync::atomic::{AtomicBool, Ordering};
114            static ALREADY_LOGGED: AtomicBool = AtomicBool::new(false);
115            if buffer.len() > MAX_SERIAL_WRITE_SIZE && !ALREADY_LOGGED.swap(true, Ordering::Relaxed)
116            {
117                let size = buffer.len();
118                log::error!(
119                    size;
120                    "Skipping write to serial due to internal error. Exceeded max buffer size."
121                );
122                return Ok(buffer.len());
123            }
124        }
125        // SAFETY: calling a syscall. We pass a pointer to the buffer and its exact size.
126        unsafe {
127            zx::sys::zx_debug_write(buffer.as_ptr(), buffer.len());
128        }
129        Ok(buffer.len())
130    }
131
132    fn flush(&mut self) -> io::Result<()> {
133        Ok(())
134    }
135}
136
/// An enum to represent commands sent to the worker thread.
///
/// The worker handles commands one at a time, in the order they were sent.
enum WorkerCommand {
    /// Write the given bytes to the sink. The worker hands the buffer back for reuse afterwards.
    Write(Vec<u8>),
    /// Notify the sender once this command is reached, i.e. after every command queued before
    /// it has been handled.
    Flush(oneshot::Sender<()>),
}
144
/// A writer that provides an async interface to a synchronous, blocking write operation.
///
/// It spawns a dedicated thread to handle the synchronous writes, allowing the async
/// context to remain non-blocking.
struct SerialWriter {
    /// Logs carrying one of these tags, or whose component name matches one, are not written.
    denied_tags: HashSet<String>,
    /// Channel on which commands are sent to the worker thread.
    sender: mpsc::UnboundedSender<WorkerCommand>,
    /// Buffers the worker thread has finished with and that can be reused.
    buffers: mpsc::UnboundedReceiver<Vec<u8>>,
    /// The chunk currently being assembled. After `flush` has handed it to the worker this is
    /// an empty `Vec` with zero capacity until a replacement is taken from `buffers`.
    current_buffer: Vec<u8>,
}
155
156impl SerialWriter {
157    /// Creates a new Writer and spawns a worker thread.
158    fn new<S: Write + Send + 'static>(mut sink: S, denied_tags: HashSet<String>) -> Self {
159        let (sender, mut receiver) = mpsc::unbounded();
160        let (buffer_sender, buffers) = mpsc::unbounded();
161
162        // Limit to 32 buffers (one comes from `current_buffer`).
163        for _ in 0..31 {
164            buffer_sender.unbounded_send(Vec::with_capacity(MAX_SERIAL_WRITE_SIZE)).unwrap();
165        }
166
167        thread::spawn(move || {
168            block_on(async {
169                while let Some(command) = receiver.next().await {
170                    match command {
171                        WorkerCommand::Write(bytes) => {
172                            // Ignore errors.
173                            let _ = sink.write(&bytes);
174                            // Send the buffer back.
175                            let _ = buffer_sender.unbounded_send(bytes);
176                        }
177                        WorkerCommand::Flush(notifier) => {
178                            let _ = notifier.send(());
179                        }
180                    }
181                }
182            });
183        });
184
185        Self {
186            denied_tags,
187            sender,
188            buffers,
189            current_buffer: Vec::with_capacity(MAX_SERIAL_WRITE_SIZE),
190        }
191    }
192
193    /// Asynchronously writes bytes.
194    async fn write(&mut self, mut bytes: &[u8]) {
195        while !bytes.is_empty() {
196            if self.current_buffer.capacity() == 0 {
197                self.current_buffer = self.buffers.next().await.unwrap();
198                self.current_buffer.clear();
199            }
200            let (part, rem) = bytes.split_at(std::cmp::min(self.space(), bytes.len()));
201            self.current_buffer.extend(part);
202            if !rem.is_empty() {
203                self.flush();
204            }
205            bytes = rem;
206        }
207    }
208
209    /// Return a synchronous writer with the required capacity.
210    ///
211    /// NOTE: Using `io_writer` will mean that line breaks don't occur in the middle: they will
212    /// always be at the beginning of whatever is being output, which is different from what happens
213    /// with `write`.
214    async fn io_writer(&mut self, required: usize) -> IoWriter<'_> {
215        assert!(required < MAX_SERIAL_WRITE_SIZE);
216        if self.current_buffer.capacity() == 0 || self.space() < required {
217            self.flush();
218            self.current_buffer = self.buffers.next().await.unwrap();
219            self.current_buffer.clear();
220        }
221        IoWriter(self)
222    }
223
224    /// Flush the buffer.
225    fn flush(&mut self) {
226        if !self.current_buffer.is_empty() {
227            self.current_buffer.push(b'\n');
228            self.sender
229                .unbounded_send(WorkerCommand::Write(mem::take(&mut self.current_buffer)))
230                .unwrap();
231        }
232    }
233
234    /// Returns a barrier.
235    fn get_barrier(&self) -> Barrier {
236        Barrier(self.sender.clone())
237    }
238
239    /// Returns the amount of space in the current buffer.
240    fn space(&self) -> usize {
241        // Always leave room for the \n.
242        MAX_SERIAL_WRITE_SIZE - 1 - self.current_buffer.len()
243    }
244
245    /// Writes a log record.
246    async fn log(&mut self, log: &Data<Logs>) {
247        // Most of the time, this is empty.
248        if !self.denied_tags.is_empty() {
249            if let Some(tags) = log.tags()
250                && tags.iter().any(|tag| self.denied_tags.contains(tag))
251            {
252                return;
253            }
254
255            // Consider `component_name` as a tag. A log viewer will be presented
256            // the component name just as any tag.
257            let component_name = log.component_name();
258
259            if self.denied_tags.contains(component_name.as_ref()) {
260                return;
261            }
262
263            // This may be a dynamic collection, split into parts, and try and match each part
264            // to the tags.
265            for name in component_name.split(":") {
266                if self.denied_tags.contains(name) {
267                    return;
268                }
269            }
270        }
271
272        write!(
273            self.io_writer(64).await, // 64 is ample
274            "[{:05}.{:03}] {:05}:{:05}> [",
275            zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_seconds(),
276            zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_millis()
277                % 1000,
278            log.pid().unwrap_or(0),
279            log.tid().unwrap_or(0)
280        )
281        .unwrap();
282
283        if let Some(tags) = log.tags().filter(|tags| !tags.is_empty()) {
284            for (i, tag) in tags.iter().enumerate() {
285                self.write(tag.as_bytes()).await;
286                if i < tags.len() - 1 {
287                    self.write(b", ").await;
288                }
289            }
290        } else {
291            self.write(log.component_name().as_bytes()).await;
292        }
293
294        // Write this separately from the next so that the line-break, if necessary (unlikely
295        // because the tags shouldn't take up much space), comes after this, but before the
296        // severity.
297        self.write(b"]").await;
298
299        write!(self.io_writer(16).await, " {}: ", log.severity()).unwrap();
300        if let Some(m) = log.msg() {
301            self.write(m.as_bytes()).await;
302        }
303
304        for key_str in log.payload_keys_strings() {
305            self.write(b" ").await;
306            self.write(key_str.as_bytes()).await;
307        }
308
309        // NOTE: Whilst it might be tempting (for performance reasons) to try and buffer up more
310        // messages before flushing, there are downstream consumers (in tests) that get confused if
311        // part lines are written to the serial log, so we must make sure we write whole lines, and
312        // it's easiest if we just send one line at a time.
313        self.flush();
314    }
315}
316
317struct IoWriter<'a>(&'a mut SerialWriter);
318
319impl Write for IoWriter<'_> {
320    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
321        self.0.current_buffer.extend(buf);
322        Ok(buf.len())
323    }
324
325    fn flush(&mut self) -> io::Result<()> {
326        Ok(())
327    }
328}
329
330struct Barrier(mpsc::UnboundedSender<WorkerCommand>);
331
332impl Barrier {
333    /// Asynchronously waits for all pending writes to complete.
334    async fn wait(&mut self) {
335        let (tx, rx) = oneshot::channel();
336
337        self.0.unbounded_send(WorkerCommand::Flush(tx)).unwrap();
338
339        // Wait for the worker thread to signal that the flush is complete.
340        let _ = rx.await;
341    }
342}
343
344#[cfg(test)]
345mod tests {
346    use fuchsia_async::{self as fasync};
347    use futures::SinkExt;
348    use futures::channel::mpsc::{self, unbounded};
349
350    use super::*;
351    use crate::identity::ComponentIdentity;
352    use crate::logs::testing::make_message;
353    use diagnostics_data::{BuilderArgs, LogsDataBuilder, LogsField, LogsProperty, Severity};
354    use fidl::endpoints::create_proxy;
355    use fidl_fuchsia_logger::LogSinkMarker;
356    use fuchsia_async::TimeoutExt;
357    use fuchsia_sync::Mutex;
358    use futures::FutureExt;
359    use moniker::ExtendedMoniker;
360    use std::time::Duration;
361    use zx::BootInstant;
362
363    /// TestSink will send log lines received (delimited by \n) over a channel.
364    struct TestSink {
365        buffer: Vec<u8>,
366        snd: mpsc::UnboundedSender<String>,
367    }
368
369    impl TestSink {
370        fn new() -> (Self, mpsc::UnboundedReceiver<String>) {
371            let (snd, rcv) = mpsc::unbounded();
372            (Self { buffer: Vec::new(), snd }, rcv)
373        }
374    }
375
376    impl Write for TestSink {
377        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
378            for chunk in buf.split_inclusive(|c| *c == b'\n') {
379                if !self.buffer.is_empty() {
380                    self.buffer.extend(chunk);
381                    if *self.buffer.last().unwrap() == b'\n' {
382                        self.snd
383                            .unbounded_send(
384                                String::from_utf8(std::mem::take(&mut self.buffer))
385                                    .expect("wrote valid utf8"),
386                            )
387                            .expect("sent item");
388                    }
389                } else if *chunk.last().unwrap() == b'\n' {
390                    self.snd
391                        .unbounded_send(str::from_utf8(chunk).expect("wrote valid utf8").into())
392                        .unwrap();
393                } else {
394                    self.buffer.extend(chunk);
395                }
396            }
397            Ok(buf.len())
398        }
399
400        fn flush(&mut self) -> io::Result<()> {
401            Ok(())
402        }
403    }
404
405    /// FakeSink collects logs into a buffer.
406    #[derive(Clone, Default)]
407    struct FakeSink(Arc<Mutex<Vec<u8>>>);
408
409    impl FakeSink {
410        fn with_buffer<R>(&self, f: impl FnOnce(&Vec<u8>) -> R) -> R {
411            f(&self.0.lock())
412        }
413    }
414
415    impl Write for FakeSink {
416        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
417            self.0.lock().write(buf)
418        }
419
420        fn flush(&mut self) -> io::Result<()> {
421            unreachable!();
422        }
423    }
424
    /// A log carrying a denied tag must produce no serial output at all.
    #[fuchsia::test]
    async fn write_to_serial_handles_denied_tags() {
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(1),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .add_tag("denied-tag")
        .build();
        let sink = FakeSink::default();
        let mut writer =
            SerialWriter::new(sink.clone(), HashSet::from_iter(["denied-tag".to_string()]));
        writer.log(&log).await;
        // Wait for the worker thread so that anything written would be visible in the sink.
        writer.get_barrier().wait().await;
        assert!(sink.with_buffer(|b| b.is_empty()));
    }
442
    /// A record longer than one serial write is split across several lines.
    #[fuchsia::test]
    async fn write_to_serial_splits_lines() {
        let message = concat!(
            "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam accumsan eu neque ",
            "quis molestie. Nam rhoncus sapien non eleifend tristique. Duis quis turpis volutpat ",
            "neque bibendum molestie. Etiam ac sapien justo. Nullam aliquet ipsum nec tincidunt."
        );
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(123456789),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .add_tag("bar")
        .set_message(message)
        .add_key(LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()))
        .add_key(LogsProperty::Int(LogsField::Other("other_key".to_string()), 3))
        .set_pid(1234)
        .set_tid(5678)
        .build();
        let sink = FakeSink::default();
        let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
        writer.log(&log).await;
        writer.get_barrier().wait().await;
        sink.with_buffer(|b| {
            // The prefix up to and including "INFO: " is 37 bytes, which leaves room for 218
            // bytes of the message before the first line reaches 255 bytes plus its newline.
            assert_eq!(
                str::from_utf8(b).unwrap(),
                format!(
                    "[00000.123] 01234:05678> [bar] INFO: {}\n{} key=value other_key=3\n",
                    &message[..218],
                    &message[218..]
                )
            )
        });
    }
478
    /// Without tags, the bracketed field of the line shows the component name ("foo" here).
    #[fuchsia::test]
    async fn when_no_tags_are_present_the_component_name_is_used() {
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(123456789),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .set_message("my msg")
        .set_pid(1234)
        .set_tid(5678)
        .build();
        let sink = FakeSink::default();
        let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
        writer.log(&log).await;
        // `log` has already flushed the line, so this extra flush has nothing left to send.
        writer.flush();
        writer.get_barrier().wait().await;
        sink.with_buffer(|b| {
            assert_eq!(str::from_utf8(b).unwrap(), "[00000.123] 01234:05678> [foo] INFO: my msg\n");
        });
    }
500
    /// A freeze request must stop serial output until the returned token is dropped.
    #[fuchsia::test]
    async fn pauses_logs_correctly() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));
        let bootstrap_denied_component_container =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/denied_component").unwrap(),
                "fuchsia-pkg://bootstrap-denied_component",
            )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));

        // The sender is kept alive (but unused) so the flush channel stays open.
        let (_flush_sender, flush_receiver) = unbounded();

        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        let (sink, mut rcv) = TestSink::new();
        let cloned_repo = Arc::clone(&repo);
        let (mut sender, receiver) = unbounded();
        let _serial_task = fasync::Task::spawn(async move {
            let allowed = &["bootstrap/**", "/core/foo"];
            let denied = &["denied_tag", "denied_component"];
            launch_serial(allowed, denied, cloned_repo, sink, receiver, flush_receiver).await;
        });

        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("denied_tag"),
            zx::BootInstant::from_nanos(3),
        ));
        bootstrap_denied_component_container.ingest_message(make_message(
            "d",
            None,
            zx::BootInstant::from_nanos(3),
        ));

        let received = rcv.next().await.unwrap();
        assert_eq!(received, "[00000.000] 00001:00002> [foo] DEBUG: a\n");

        // Ask for a freeze and wait for the token that confirms it is in effect.
        let (tx, rx) = oneshot::channel();
        sender.send(tx).await.unwrap();

        let freeze_token = rx.await.unwrap();

        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));

        // The pipeline is asynchronous, so all we can do is assert that no message is sent with a
        // timeout.
        assert!(rcv.next().map(|_| false).on_timeout(Duration::from_millis(500), || true).await);

        // Dropping the token ends the freeze; the log ingested meanwhile must now appear.
        drop(freeze_token);

        assert_eq!(rcv.next().await.unwrap(), "[00000.000] 00001:00002> [foo] DEBUG: c\n");
    }
575
    /// Only logs from allowed components that match no denied tag or name reach serial,
    /// whether they were ingested before or after `launch_serial` started.
    #[fuchsia::test]
    async fn writes_ingested_logs() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));
        let denied_collection = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/denied-collection:foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-denied-collection:foo-bar",
        )));
        let bootstrap_denied_component_container =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/denied_component").unwrap(),
                "fuchsia-pkg://bootstrap-denied_component",
            )));
        let denied_collection_instance = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/collection:denied-foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-collection:denied-foo-bar",
        )));

        // NOTE(review): this uses the same moniker as `denied_collection_instance` above;
        // confirm whether a different moniker was intended here.
        let denied_collection_and_instance =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/collection:denied-foo-bar").unwrap(),
                "fuchsia-pkg://bootstrap-collection:denied-foo-bar",
            )));

        let collection_and_instance = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/collection:foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-collection:foo-bar",
        )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        // These logs are ingested before the serial task exists.
        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        denied_collection.ingest_message(make_message("d", None, zx::BootInstant::from_nanos(2)));
        denied_collection_instance.ingest_message(make_message(
            "e",
            None,
            zx::BootInstant::from_nanos(2),
        ));
        denied_collection_and_instance.ingest_message(make_message(
            "f",
            None,
            zx::BootInstant::from_nanos(2),
        ));
        collection_and_instance.ingest_message(make_message(
            "g",
            None,
            zx::BootInstant::from_nanos(2),
        ));

        let (sink, rcv) = TestSink::new();

        // The senders are kept alive (but unused) so both request channels stay open.
        let (_freeze_sender, freeze_receiver) = unbounded();
        let (_flush_sender, flush_receiver) = unbounded();
        let _serial_task = fasync::Task::spawn(launch_serial(
            &["bootstrap/**", "/core/foo"],
            &[
                "denied_tag",
                "denied_component",
                "denied-collection",
                "denied-foo-bar",
                "collection:denied-foo-bar",
            ],
            Arc::clone(&repo),
            sink,
            freeze_receiver,
            flush_receiver,
        ));

        // These logs are ingested after the serial task has been spawned.
        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("denied_tag"),
            zx::BootInstant::from_nanos(3),
        ));
        bootstrap_denied_component_container.ingest_message(make_message(
            "d",
            None,
            zx::BootInstant::from_nanos(3),
        ));
        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
        let received: Vec<_> = rcv.take(3).collect().await;

        // We must see the logs emitted before we installed the serial listener and after. We must
        // not see the log from /core/baz (not allowed), the log from bootstrap/bar with tag
        // "denied_tag", or any log from a component whose name matches a denied entry.
        assert_eq!(
            received,
            vec![
                "[00000.000] 00001:00002> [foo] DEBUG: a\n",
                "[00000.000] 00001:00002> [collection:foo-bar] DEBUG: g\n",
                "[00000.000] 00001:00002> [foo] DEBUG: c\n"
            ]
        );
    }
689
    /// Once a flush request is acknowledged, a log written to the socket beforehand must
    /// already have been delivered to the sink.
    #[fuchsia::test]
    async fn flush_drains_all_logs() {
        // Each iteration builds a fresh repository and serial task (presumably repeated to
        // catch ordering races between the threads involved).
        for _ in 0..500 {
            let scope = fasync::Scope::new();
            let repo = LogsRepository::for_test(scope.new_child());
            let identity = Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
                "fuchsia-pkg://bootstrap-foo",
            ));

            let (log_sink, server_end) = create_proxy::<LogSinkMarker>();
            let log_container = repo.get_log_container(identity);
            log_container.handle_log_sink(server_end.into_stream(), scope.as_handle().clone());
            let (client_socket, server_socket) = zx::Socket::create_datagram();
            log_sink.connect_structured(server_socket).unwrap();

            let (sink, mut rcv) = TestSink::new();
            let cloned_repo = Arc::clone(&repo);
            let (_freeze_sender, freeze_receiver) = unbounded();
            let (flush_sender, flush_receiver) = unbounded();
            scope.spawn(async move {
                let allowed = &["bootstrap/**"];
                let denied: &[&str] = &[];
                launch_serial(allowed, denied, cloned_repo, sink, freeze_receiver, flush_receiver)
                    .await;
            });

            let mut buffer = [0u8; 1024];
            let cursor = std::io::Cursor::new(&mut buffer[..]);
            let mut encoder = diagnostics_log_encoding::encode::Encoder::new(
                cursor,
                diagnostics_log_encoding::encode::EncoderOpts::default(),
            );
            // Wait for initial interest to ensure that the socket is registered.
            log_sink.wait_for_interest_change().await.unwrap().unwrap();

            encoder
                .write_record(diagnostics_log_encoding::Record {
                    timestamp: zx::BootInstant::from_nanos(1),
                    severity: Severity::Info as u8,
                    arguments: vec![
                        diagnostics_log_encoding::Argument::new("tag", "foo"),
                        diagnostics_log_encoding::Argument::new("message", "a"),
                    ],
                })
                .unwrap();
            // Send only the bytes the encoder actually produced.
            client_socket
                .write(&encoder.inner().get_ref()[..encoder.inner().position() as usize])
                .unwrap();
            let (sender, receiver) = oneshot::channel();
            // Send the log flush command
            flush_sender.unbounded_send(sender).unwrap();
            // Wait for flush to complete. Flush involves a background thread,
            // so keep polling until the background thread handles the flush event.
            assert_eq!(receiver.await, Ok(()));
            // The line must already be available: `now_or_never` does not wait for it.
            let received = rcv.next().now_or_never().unwrap().unwrap();
            assert_eq!(received, "[00000.000] 00000:00000> [foo] INFO: a\n");
        }
    }
749}