diagnostics_log/fuchsia/
sink.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use diagnostics_log_encoding::encode::{
5    Encoder, EncoderOpts, EncodingError, LogEvent, MutableBuffer, TestRecord, WriteEventParams,
6};
7use diagnostics_log_encoding::{Header, Metatag};
8use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
9use fuchsia_runtime as rt;
10use fuchsia_sync::RwLock;
11use std::cell::UnsafeCell;
12use std::collections::HashSet;
13use std::io::Cursor;
14use std::mem::MaybeUninit;
15use std::sync::OnceLock;
16use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
17
// Capacity, in bytes, of the in-memory `Queue` that `BufferedSink` uses to hold encoded messages
// until an IOBuffer has been provided. Once the queue is full, further messages are dropped.
const QUEUE_SIZE: usize = 256 * 1024;
21
/// Settings shared by the `Sink` implementations in this file.
#[derive(Default)]
pub(crate) struct SinkConfig {
    /// Metatags handed to the encoder for every record written through `Sink::record_log`.
    pub(crate) metatags: HashSet<Metatag>,
    /// Tags handed to the encoder for every event written by the sink.
    pub(crate) tags: Vec<String>,
    /// Forwarded to `EncoderOpts::always_log_file_line` when the encoder is created.
    pub(crate) always_log_file_line: bool,
}
28
29thread_local! {
30    static PROCESS_ID: zx::Koid = rt::process_self()
31        .koid()
32        .unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX));
33    static THREAD_ID: zx::Koid = rt::with_thread_self(|thread| {
34        thread.koid().unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX))
35    });
36}
37
38pub trait Sink {
39    fn num_events_dropped(&self) -> &AtomicU32;
40    fn config(&self) -> &SinkConfig;
41    fn send(&self, packet: &[u8]) -> Result<(), zx::Status>;
42
43    fn event_for_testing(&self, record: TestRecord<'_>) {
44        self.encode_and_send(move |encoder, previously_dropped| {
45            encoder.write_event(WriteEventParams {
46                event: record,
47                tags: &self.config().tags,
48                metatags: std::iter::empty(),
49                pid: PROCESS_ID.with(|p| *p),
50                tid: THREAD_ID.with(|t| *t),
51                dropped: previously_dropped.into(),
52            })
53        });
54    }
55
56    fn record_log(&self, record: &log::Record<'_>) {
57        self.encode_and_send(|encoder, previously_dropped| {
58            encoder.write_event(WriteEventParams {
59                event: LogEvent::new(record),
60                tags: &self.config().tags,
61                metatags: self.config().metatags.iter(),
62                pid: PROCESS_ID.with(|p| *p),
63                tid: THREAD_ID.with(|t| *t),
64                dropped: previously_dropped.into(),
65            })
66        });
67    }
68
69    #[inline]
70    fn encode_and_send(
71        &self,
72        encode: impl FnOnce(&mut Encoder<Cursor<&mut [u8]>>, u32) -> Result<(), EncodingError>,
73    ) {
74        let ordering = Ordering::Relaxed;
75        let num_events_dropped = self.num_events_dropped();
76        let previously_dropped = num_events_dropped.swap(0, ordering);
77        let restore_and_increment_dropped_count = || {
78            num_events_dropped.fetch_add(previously_dropped + 1, ordering);
79        };
80
81        // TODO(https://fxbug.dev/466294903): Explore optimizations:
82        // 1) Use uninitialized buffer to avoid zeroing.
83        // 2) Potentially use stack allocation for smaller messages by first measuring
84        //    the encoded size.
85
86        let mut buf = vec![0u8; MAX_DATAGRAM_LEN_BYTES as _];
87        let mut encoder = Encoder::new(
88            Cursor::new(&mut buf[..]),
89            EncoderOpts { always_log_file_line: self.config().always_log_file_line },
90        );
91        if encode(&mut encoder, previously_dropped).is_err() {
92            restore_and_increment_dropped_count();
93            return;
94        }
95
96        let end = encoder.inner().cursor();
97        let packet = &encoder.inner().get_ref()[..end];
98
99        if packet.is_empty() || self.send(packet).is_err() {
100            restore_and_increment_dropped_count();
101        }
102    }
103}
104
105pub struct IoBufferSink {
106    iob: zx::Iob,
107    num_events_dropped: AtomicU32,
108    config: SinkConfig,
109}
110
111impl Sink for IoBufferSink {
112    fn num_events_dropped(&self) -> &AtomicU32 {
113        &self.num_events_dropped
114    }
115
116    fn config(&self) -> &SinkConfig {
117        &self.config
118    }
119
120    fn send(&self, packet: &[u8]) -> Result<(), zx::Status> {
121        self.iob.write(Default::default(), 0, packet)
122    }
123}
124
125impl IoBufferSink {
126    pub fn new(iob: zx::Iob, config: SinkConfig) -> Self {
127        Self { iob, num_events_dropped: AtomicU32::new(0), config }
128    }
129}
130
131pub struct BufferedSink {
132    iob: OnceLock<zx::Iob>,
133    buffer: RwLock<Option<Buffer>>,
134    num_events_dropped: AtomicU32,
135    config: SinkConfig,
136}
137
138struct Buffer {
139    queue: Queue,
140}
141
142impl BufferedSink {
143    pub fn new(config: SinkConfig) -> Self {
144        Self {
145            iob: OnceLock::new(),
146            buffer: RwLock::new(Some(Buffer { queue: Queue::new() })),
147            num_events_dropped: AtomicU32::new(0),
148            config,
149        }
150    }
151
152    /// Spawns a thread that is responsible for setting the buffer.
153    pub fn set_buffer(&self, io_buffer: zx::Iob) {
154        // Forward the outstanding messages in two phases. In the first phase, we forward the queue
155        // without blocking other loggers.  In the second phase, we forward the queue whilst
156        // blocking other loggers, which should hopefully be for a small amount of time.
157        let queue = {
158            let mut buffer = self.buffer.write();
159            if let Some(Buffer { queue, .. }) = &mut *buffer {
160                Some(std::mem::replace(queue, Queue::new()))
161            } else {
162                None
163            }
164        };
165
166        let mut dropped = 0;
167        if let Some(mut queue) = queue {
168            self.forward_queue(&mut queue, &io_buffer, &mut dropped);
169        }
170
171        // This time, hold the lock until we've finished.
172        let mut buffer = self.buffer.write();
173        if let Some(Buffer { queue, .. }) = &mut *buffer {
174            self.forward_queue(queue, &io_buffer, &mut dropped);
175        }
176
177        self.num_events_dropped.fetch_add(dropped, Ordering::Relaxed);
178
179        self.iob.set(io_buffer).unwrap();
180        *buffer = None;
181    }
182
183    fn forward_queue(&self, queue: &mut Queue, iob: &zx::Iob, dropped: &mut u32) {
184        let mut slice = queue.as_slice();
185        while !slice.is_empty() {
186            let header = Header(u64::from_le_bytes(slice[..8].try_into().unwrap()));
187            let message_len = header.size_words() as usize * 8;
188            assert!(message_len > 0);
189            if *dropped > 0 || iob.write(Default::default(), 0, &slice[..message_len]).is_err() {
190                *dropped += 1;
191            }
192            slice = &slice[message_len..];
193        }
194    }
195}
196
197impl Sink for BufferedSink {
198    fn num_events_dropped(&self) -> &AtomicU32 {
199        &self.num_events_dropped
200    }
201
202    fn config(&self) -> &SinkConfig {
203        &self.config
204    }
205
206    fn send(&self, packet: &[u8]) -> Result<(), zx::Status> {
207        loop {
208            if let Some(iob) = self.iob.get() {
209                return iob.write(Default::default(), 0, packet);
210            }
211
212            let buffer = self.buffer.read();
213            let Some(Buffer { queue, .. }) = &*buffer else {
214                // We lost a race, loop and write with the IOBuffer.
215                continue;
216            };
217
218            return if queue.push(packet) { Ok(()) } else { Err(zx::Status::NO_SPACE) };
219        }
220    }
221}
222
/// Fixed-capacity, append-only byte queue that accepts pushes through a shared reference.
struct Queue {
    // Backing storage. Only the first `len` bytes have been reserved by `push`.
    buf: UnsafeCell<Box<[MaybeUninit<u8>]>>,
    // Number of bytes reserved so far; advanced atomically by `push`.
    len: AtomicUsize,
}
227
// SAFETY: `BufferedSink` is not automatically `Sync` because `Queue` stores its buffer in an
// `UnsafeCell`. Sharing is sound because `Queue::push` reserves a distinct byte range with an
// atomic update of `len` before writing to it, and `Queue::as_slice` requires `&mut self`, so
// the stored bytes are only read with exclusive access.
unsafe impl Send for BufferedSink {}
unsafe impl Sync for BufferedSink {}
231
232impl Queue {
233    fn new() -> Self {
234        Self { buf: UnsafeCell::new(Box::new_uninit_slice(QUEUE_SIZE)), len: AtomicUsize::new(0) }
235    }
236
237    fn capacity(&self) -> usize {
238        // SAFETY: The length is immutable.
239        unsafe { (&*self.buf.get()).len() }
240    }
241
242    /// Returns false if there is no capacity.
243    fn push(&self, data: &[u8]) -> bool {
244        let mut len = self.len.load(Ordering::Relaxed);
245        loop {
246            if len + data.len() > self.capacity() {
247                return false;
248            }
249            match self.len.compare_exchange_weak(
250                len,
251                len + data.len(),
252                Ordering::Relaxed,
253                Ordering::Relaxed,
254            ) {
255                Ok(_) => {
256                    // SAFETY: We check bounds above, and thanks to the atomic update of len
257                    // we can be sure there are no other concurrent writes to the same part
258                    // of the buffer.
259                    unsafe {
260                        (*self.buf.get())
261                            .as_mut_ptr()
262                            .cast::<u8>()
263                            .add(len)
264                            .copy_from_nonoverlapping(&data[0], data.len());
265                    }
266                    return true;
267                }
268                Err(old) => len = old,
269            }
270        }
271    }
272
273    fn as_slice(&mut self) -> &[u8] {
274        // SAFETY: This is safe because `len` is only updated above, but we have exclusive access
275        // here, so we can be certain `len` bytes have been written to above.
276        unsafe {
277            std::slice::from_raw_parts(self.buf.get_mut().as_ptr().cast(), *self.len.get_mut())
278        }
279    }
280}
281
282#[cfg(test)]
283mod tests {
284    use super::*;
285    use crate::{increment_clock, log_every_n_seconds};
286    use diagnostics_log_encoding::parse::parse_record;
287    use diagnostics_log_encoding::{Argument, Record};
288    use diagnostics_log_types::Severity;
289    use fuchsia_sync::Mutex;
290    use futures::FutureExt;
291    use log::{debug, error, info, trace, warn};
292    use ring_buffer::{self, RING_BUFFER_MESSAGE_HEADER_SIZE, RingBuffer};
293    use std::sync::Arc;
294    use std::time::Duration;
295    use test_util::assert_gt;
296
    // Module path of these tests. `packets_are_sent` expects it as the tag that is emitted
    // when `Metatag::Target` is configured.
    const TARGET: &str = "diagnostics_log_lib_test::fuchsia::sink::tests";
298
299    struct TestLogger {
300        sink: IoBufferSink,
301    }
302
303    impl TestLogger {
304        fn new(sink: IoBufferSink) -> Self {
305            Self { sink }
306        }
307    }
308
309    impl log::Log for TestLogger {
310        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
311            true
312        }
313
314        fn log(&self, record: &log::Record<'_>) {
315            if self.enabled(record.metadata()) {
316                self.sink.record_log(record);
317            }
318        }
319
320        fn flush(&self) {}
321    }
322
323    async fn init_sink(config: SinkConfig) -> ring_buffer::Reader {
324        let ring_buffer = RingBuffer::create(32 * zx::system_get_page_size() as usize);
325        let (iob, _) = ring_buffer.new_iob_writer(0).unwrap();
326
327        let sink = IoBufferSink::new(iob, config);
328        log::set_boxed_logger(Box::new(TestLogger::new(sink))).expect("set logger");
329        log::set_max_level(log::LevelFilter::Info);
330
331        ring_buffer
332    }
333
334    fn arg_prefix() -> Vec<Argument<'static>> {
335        vec![Argument::pid(PROCESS_ID.with(|p| *p)), Argument::tid(THREAD_ID.with(|t| *t))]
336    }
337
    // Logs one message at each severity through the global logger and checks the record that
    // arrives in the ring buffer for each of them.
    #[fuchsia::test(logging = false)]
    async fn packets_are_sent() {
        let mut ring_buffer = init_sink(SinkConfig {
            metatags: HashSet::from([Metatag::Target]),
            ..SinkConfig::default()
        })
        .await;
        // `init_sink` sets the level to `Info`; lower it so trace and debug are emitted too.
        log::set_max_level(log::LevelFilter::Trace);

        // Reads and parses a single record, checking that nothing else is left in the buffer.
        let mut next_message = async move || {
            let (_tag, buf) = ring_buffer.read_message().await.unwrap();
            let (record, _) = parse_record(&buf).unwrap();
            assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
            record.into_owned()
        };

        // emit some expected messages and then we'll retrieve them for parsing
        trace!(count = 123; "whoa this is noisy");
        let observed_trace = next_message().await;
        debug!(maybe = true; "don't try this at home");
        let observed_debug = next_message().await;
        info!("this is a message");
        let observed_info = next_message().await;
        warn!(reason = "just cuz"; "this is a warning");
        let observed_warn = next_message().await;
        // NOTE: `error_line` is computed relative to the `error!` call, so the two statements
        // must stay on adjacent lines.
        error!(e = "something went pretty wrong"; "this is an error");
        let error_line = line!() - 1;
        let metatag = Argument::tag(TARGET);
        let observed_error = next_message().await;

        // TRACE
        {
            let mut expected_trace = Record {
                timestamp: observed_trace.timestamp,
                severity: Severity::Trace as u8,
                arguments: arg_prefix(),
            };
            expected_trace.arguments.push(metatag.clone());
            expected_trace.arguments.push(Argument::message("whoa this is noisy"));
            expected_trace.arguments.push(Argument::new("count", 123));
            assert_eq!(observed_trace, expected_trace);
        }

        // DEBUG
        {
            let mut expected_debug = Record {
                timestamp: observed_debug.timestamp,
                severity: Severity::Debug as u8,
                arguments: arg_prefix(),
            };
            expected_debug.arguments.push(metatag.clone());
            expected_debug.arguments.push(Argument::message("don't try this at home"));
            expected_debug.arguments.push(Argument::new("maybe", true));
            assert_eq!(observed_debug, expected_debug);
        }

        // INFO
        {
            let mut expected_info = Record {
                timestamp: observed_info.timestamp,
                severity: Severity::Info as u8,
                arguments: arg_prefix(),
            };
            expected_info.arguments.push(metatag.clone());
            expected_info.arguments.push(Argument::message("this is a message"));
            assert_eq!(observed_info, expected_info);
        }

        // WARN
        {
            let mut expected_warn = Record {
                timestamp: observed_warn.timestamp,
                severity: Severity::Warn as u8,
                arguments: arg_prefix(),
            };
            expected_warn.arguments.push(metatag.clone());
            expected_warn.arguments.push(Argument::message("this is a warning"));
            expected_warn.arguments.push(Argument::new("reason", "just cuz"));
            assert_eq!(observed_warn, expected_warn);
        }

        // ERROR
        // Unlike the other severities, the error record is expected to carry the file and line.
        {
            let mut expected_error = Record {
                timestamp: observed_error.timestamp,
                severity: Severity::Error as u8,
                arguments: arg_prefix(),
            };
            expected_error
                .arguments
                .push(Argument::file("src/lib/diagnostics/log/rust/src/fuchsia/sink.rs"));
            expected_error.arguments.push(Argument::line(error_line as u64));
            expected_error.arguments.push(metatag);
            expected_error.arguments.push(Argument::message("this is an error"));
            expected_error.arguments.push(Argument::new("e", "something went pretty wrong"));
            assert_eq!(observed_error, expected_error);
        }
    }
436
437    #[fuchsia::test(logging = false)]
438    async fn tags_are_sent() {
439        let mut ring_buffer = init_sink(SinkConfig {
440            tags: vec!["tags_are_sent".to_string()],
441            ..SinkConfig::default()
442        })
443        .await;
444
445        let mut next_message = async move || {
446            let (_tag, buf) = ring_buffer.read_message().await.unwrap();
447            let (record, _) = parse_record(&buf).unwrap();
448            assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
449            record.into_owned()
450        };
451
452        info!("this should have a tag");
453        let observed = next_message().await;
454
455        let mut expected = Record {
456            timestamp: observed.timestamp,
457            severity: Severity::Info as u8,
458            arguments: arg_prefix(),
459        };
460        expected.arguments.push(Argument::message("this should have a tag"));
461        expected.arguments.push(Argument::tag("tags_are_sent"));
462        assert_eq!(observed, expected);
463    }
464
    // Checks that `log_every_n_seconds!` emits the first message, suppresses a second one
    // logged before the interval has passed, and emits again once the clock has advanced.
    #[fuchsia::test(logging = false)]
    async fn log_every_n_seconds_test() {
        let mut ring_buffer = init_sink(SinkConfig { ..SinkConfig::default() }).await;
        // Reads and parses a single record, checking that nothing else is left in the buffer.
        let mut next_message = async move || {
            let (_tag, buf) = ring_buffer.read_message().await.unwrap();
            let (record, _) = parse_record(&buf).unwrap();
            assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
            record.into_owned()
        };

        // Every call goes through this closure, so the macro is always invoked from the same
        // place in the source.
        let log_fn = || {
            log_every_n_seconds!(5, INFO, "test message");
        };

        // Waits for the next record and checks that it is the throttled test message.
        let mut expect_message = async move || {
            let observed = next_message().await;

            let mut expected = Record {
                timestamp: observed.timestamp,
                severity: Severity::Info as u8,
                arguments: arg_prefix(),
            };
            expected.arguments.push(Argument::message("test message"));
            assert_eq!(observed, expected);
        };

        log_fn();
        // First log call should result in a message.
        expect_message().await;
        log_fn();
        // Subsequent log call in less than 5 seconds should NOT
        // result in a message.
        assert!(expect_message().now_or_never().is_none());
        increment_clock(Duration::from_secs(5));

        // Calling log_fn after 5 seconds should result in a message.
        log_fn();
        expect_message().await;
    }
504
    // Fills the ring buffer, logs `NUM_DROPPED` more messages that cannot be written, and then
    // checks that the next message that fits reports exactly that many dropped events.
    #[fuchsia::test(logging = false)]
    async fn drop_count_is_tracked() {
        let mut ring_buffer = init_sink(SinkConfig::default()).await;
        // Encoded size in bytes of the test message without and with a dropped-count argument.
        const MESSAGE_SIZE: usize = 104;
        const MESSAGE_SIZE_WITH_DROPS: usize = 136;
        const NUM_DROPPED: usize = 100;

        let emit_message = || info!("it's-a-me, a message-o");

        // Post one message and wait for it to appear.
        emit_message();
        ring_buffer.read_message().await.unwrap();

        // From now on, messages should get posted immediately.  Fill up the buffer.
        let mut num_emitted = 0;
        let buffer_space =
            || ring_buffer.capacity() - (ring_buffer.head() - ring_buffer.tail()) as usize;
        while buffer_space() >= RING_BUFFER_MESSAGE_HEADER_SIZE + MESSAGE_SIZE {
            emit_message();
            num_emitted += 1;
            assert_eq!(
                (ring_buffer.head() - ring_buffer.tail()) as usize,
                num_emitted * (RING_BUFFER_MESSAGE_HEADER_SIZE + MESSAGE_SIZE),
                "incorrect bytes stored after {} messages sent",
                num_emitted
            );
        }

        // drop messages
        for _ in 0..NUM_DROPPED {
            emit_message();
        }

        // Reads one message and checks its size and contents. `with_drops` selects whether the
        // message is expected to carry the dropped-count argument.
        let mut drain_message = async |with_drops| {
            let (_tag, buf) = ring_buffer.read_message().await.unwrap();

            let expected_len = if with_drops { MESSAGE_SIZE_WITH_DROPS } else { MESSAGE_SIZE };
            assert_eq!(
                buf.len(),
                expected_len,
                "constant message size is used to calculate thresholds"
            );

            let (record, _) = parse_record(&buf).unwrap();
            let mut expected_args = arg_prefix();

            if with_drops {
                expected_args.push(Argument::dropped(NUM_DROPPED as u64));
            }

            expected_args.push(Argument::message("it's-a-me, a message-o"));

            assert_eq!(
                record,
                Record {
                    timestamp: record.timestamp,
                    severity: Severity::Info as u8,
                    arguments: expected_args
                }
            );
        };

        // Make space for a message that conveys the drop count. We drain two messages here
        // because reporting the drop count makes the record larger
        // (MESSAGE_SIZE_WITH_DROPS > MESSAGE_SIZE). If we only drained one, the test would rely
        // on the slack left over in the full ring buffer being large enough to absorb the extra
        // bytes, which is not guaranteed.
        drain_message(false).await;
        drain_message(false).await;
        // we use this count below to drain the rest of the messages
        num_emitted -= 2;
        // Convey the drop count; this message is queued behind the remaining "normal" messages.
        emit_message();
        // drain remaining "normal" messages ahead of the drop count
        for _ in 0..num_emitted {
            drain_message(false).await;
        }
        // verify that messages were dropped
        drain_message(true).await;

        // check that we return to normal after reporting the drops
        emit_message();
        drain_message(false).await;
        assert_eq!(ring_buffer.head(), ring_buffer.tail(), "must drain all messages");
    }
591
592    #[fuchsia::test(logging = false)]
593    async fn build_record_from_log_event() {
594        let before_timestamp = zx::BootInstant::get();
595        let last_record = Arc::new(Mutex::new(None));
596        let logger = TrackerLogger::new(last_record.clone());
597        log::set_boxed_logger(Box::new(logger)).expect("set logger");
598        log::set_max_level(log::LevelFilter::Info);
599        log::info!(
600            is_a_str = "hahaha",
601            is_debug:? = PrintMe(5),
602            is_signed = -500,
603            is_unsigned = 1000u64,
604            is_bool = false;
605            "blarg this is a message"
606        );
607
608        let guard = last_record.lock();
609        let encoder = guard.as_ref().unwrap();
610        let (record, _) = parse_record(encoder.inner().get_ref()).expect("wrote valid record");
611        assert_gt!(record.timestamp, before_timestamp);
612        assert_eq!(
613            record,
614            Record {
615                timestamp: record.timestamp,
616                severity: Severity::Info as u8,
617                arguments: vec![
618                    Argument::pid(PROCESS_ID.with(|p| *p)),
619                    Argument::tid(THREAD_ID.with(|p| *p)),
620                    Argument::tag("diagnostics_log_lib_test::fuchsia::sink::tests"),
621                    Argument::message("blarg this is a message"),
622                    Argument::other("is_a_str", "hahaha"),
623                    Argument::other("is_debug", "PrintMe(5)"),
624                    Argument::other("is_signed", -500),
625                    Argument::other("is_unsigned", 1000u64),
626                    Argument::other("is_bool", false),
627                    Argument::tag("a-tag"),
628                ]
629            }
630        );
631    }
632
    // Value logged with `:?` in `build_record_from_log_event`. The inner u32 is only read by
    // the derived `Debug` implementation, hence the `allow(unused)`.
    #[derive(Debug)]
    struct PrintMe(#[allow(unused)] u32);
636
637    type ByteEncoder = Encoder<Cursor<[u8; 1024]>>;
638
639    struct TrackerLogger {
640        last_record: Arc<Mutex<Option<ByteEncoder>>>,
641    }
642
643    impl TrackerLogger {
644        fn new(last_record: Arc<Mutex<Option<ByteEncoder>>>) -> Self {
645            Self { last_record }
646        }
647    }
648
649    impl log::Log for TrackerLogger {
650        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
651            true
652        }
653
654        fn log(&self, record: &log::Record<'_>) {
655            let mut encoder = Encoder::new(Cursor::new([0u8; 1024]), EncoderOpts::default());
656            encoder
657                .write_event(WriteEventParams {
658                    event: LogEvent::new(record),
659                    tags: &["a-tag"],
660                    metatags: [Metatag::Target].iter(),
661                    pid: PROCESS_ID.with(|p| *p),
662                    tid: THREAD_ID.with(|t| *t),
663                    dropped: 0,
664                })
665                .expect("wrote event");
666            let mut last_record = self.last_record.lock();
667            last_record.replace(encoder);
668        }
669
670        fn flush(&self) {}
671    }
672
673    #[fuchsia::test]
674    async fn buffered_sink() {
675        const TAG: &str = "foo";
676        let sink = Arc::new(BufferedSink::new(SinkConfig {
677            tags: vec![TAG.to_string()],
678            ..Default::default()
679        }));
680        const MSG: &str = "The quick brown fox jumped over the lazy dog.";
681        const COUNT: usize = 1000;
682        // Log from a different thread to test races.
683        {
684            let sink = Arc::clone(&sink);
685            std::thread::spawn(move || {
686                for i in 0..COUNT {
687                    sink.record_log(
688                        &log::Record::builder()
689                            .level(log::Level::Warn)
690                            .args(format_args!("{i}: {MSG}"))
691                            .build(),
692                    );
693                }
694            });
695        }
696        const MAX_RECORD_SIZE: usize = 176;
697        // Include a 25% buffer.
698        let mut ring_buffer = RingBuffer::create(
699            (MAX_RECORD_SIZE * COUNT * 5 / 4).next_multiple_of(zx::system_get_page_size() as usize),
700        );
701        sink.set_buffer(ring_buffer.new_iob_writer(0).unwrap().0);
702        // Now check that all the messages got written.
703        for i in 0..COUNT {
704            let (_tag, msg) = ring_buffer.read_message().await.unwrap();
705            let (record, _) = parse_record(&msg).unwrap();
706            assert_eq!(record.severity, Severity::Warn as u8);
707            let mut found = 0;
708            for arg in record.arguments {
709                match arg {
710                    Argument::Message(msg) => {
711                        assert_eq!(msg, format!("{i}: {MSG}"));
712                        assert_eq!(found & 1, 0);
713                        found |= 1;
714                    }
715                    Argument::Tag(tag) => {
716                        assert_eq!(tag, TAG);
717                        assert_eq!(found & 2, 0);
718                        found |= 2;
719                    }
720                    _ => {}
721                }
722            }
723            assert_eq!(found, 3);
724        }
725    }
726}