diagnostics_log/fuchsia/sink.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.

use diagnostics_log_encoding::encode::{
    Encoder, EncoderOpts, EncodingError, LogEvent, MutableBuffer, TestRecord, WriteEventParams,
};
use diagnostics_log_encoding::{Header, Metatag};
use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
use fuchsia_runtime as rt;
use fuchsia_sync::RwLock;
use std::cell::UnsafeCell;
use std::collections::HashSet;
use std::io::Cursor;
use std::mem::MaybeUninit;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use zx::{self as zx, AsHandleRef};

// The amount of data that can be buffered by `BufferedSink` before messages are dropped.
const QUEUE_SIZE: usize = 256 * 1024;

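/// Configuration shared by the sinks in this module: extra tags appended to every record,
/// metatags expanded at encode time, and whether file/line information is always included.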
#[derive(Default)]
pub(crate) struct SinkConfig {
    pub(crate) metatags: HashSet<Metatag>,
    pub(crate) tags: Vec<String>,
    pub(crate) always_log_file_line: bool,
}

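// Koids of the current process and thread, cached per thread. If a koid cannot be read, the
// maximum koid value is used as a sentinel.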
thread_local! {
    static PROCESS_ID: zx::Koid = rt::process_self()
        .get_koid()
        .unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX));
    static THREAD_ID: zx::Koid = rt::with_thread_self(|thread| {
        thread.get_koid().unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX))
    });
}

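/// A destination for encoded log records.
///
/// Implementations supply the dropped-event counter, the [`SinkConfig`] and the transport
/// (`send`); the provided methods encode records into the structured-log wire format and fold
/// the count of previously dropped events into the next record that goes out.
///
/// A minimal sketch of an implementation (the type and its transport are illustrative only):
///
/// ```ignore
/// struct StderrSink {
///     dropped: AtomicU32,
///     config: SinkConfig,
/// }
///
/// impl Sink for StderrSink {
///     fn num_events_dropped(&self) -> &AtomicU32 {
///         &self.dropped
///     }
///
///     fn config(&self) -> &SinkConfig {
///         &self.config
///     }
///
///     fn send(&self, packet: &[u8]) -> Result<(), zx::Status> {
///         // Hand the encoded packet to whatever transport backs this sink.
///         std::io::Write::write_all(&mut std::io::stderr(), packet).map_err(|_| zx::Status::IO)
///     }
/// }
/// ```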
pub trait Sink {
    fn num_events_dropped(&self) -> &AtomicU32;
    fn config(&self) -> &SinkConfig;
    fn send(&self, packet: &[u8]) -> Result<(), zx::Status>;

    fn event_for_testing(&self, record: TestRecord<'_>) {
        self.encode_and_send(move |encoder, previously_dropped| {
            encoder.write_event(WriteEventParams {
                event: record,
                tags: &self.config().tags,
                metatags: std::iter::empty(),
                pid: PROCESS_ID.with(|p| *p),
                tid: THREAD_ID.with(|t| *t),
                dropped: previously_dropped.into(),
            })
        });
    }

    fn record_log(&self, record: &log::Record<'_>) {
        self.encode_and_send(|encoder, previously_dropped| {
            encoder.write_event(WriteEventParams {
                event: LogEvent::new(record),
                tags: &self.config().tags,
                metatags: self.config().metatags.iter(),
                pid: PROCESS_ID.with(|p| *p),
                tid: THREAD_ID.with(|t| *t),
                dropped: previously_dropped.into(),
            })
        });
    }

    #[inline]
    fn encode_and_send(
        &self,
        encode: impl FnOnce(&mut Encoder<Cursor<&mut [u8]>>, u32) -> Result<(), EncodingError>,
    ) {
        let ordering = Ordering::Relaxed;
        let num_events_dropped = self.num_events_dropped();
        let previously_dropped = num_events_dropped.swap(0, ordering);
        let restore_and_increment_dropped_count = || {
            num_events_dropped.fetch_add(previously_dropped + 1, ordering);
        };

        // TODO(https://fxbug.dev/466294903): Explore optimizations:
        // 1) Use uninitialized buffer to avoid zeroing.
        // 2) Potentially use stack allocation for smaller messages by first measuring
        //    the encoded size.

        let mut buf = vec![0u8; MAX_DATAGRAM_LEN_BYTES as _];
        let mut encoder = Encoder::new(
            Cursor::new(&mut buf[..]),
            EncoderOpts { always_log_file_line: self.config().always_log_file_line },
        );
        if encode(&mut encoder, previously_dropped).is_err() {
            restore_and_increment_dropped_count();
            return;
        }

        let end = encoder.inner().cursor();
        let packet = &encoder.inner().get_ref()[..end];

        if packet.is_empty() || self.send(packet).is_err() {
            restore_and_increment_dropped_count();
        }
    }
}

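/// A [`Sink`] that writes each encoded record directly to an IOBuffer.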
pub struct IoBufferSink {
    iob: zx::Iob,
    num_events_dropped: AtomicU32,
    config: SinkConfig,
}

impl Sink for IoBufferSink {
    fn num_events_dropped(&self) -> &AtomicU32 {
        &self.num_events_dropped
    }

    fn config(&self) -> &SinkConfig {
        &self.config
    }

    fn send(&self, packet: &[u8]) -> Result<(), zx::Status> {
        self.iob.write(Default::default(), 0, packet)
    }
}

impl IoBufferSink {
    pub fn new(iob: zx::Iob, config: SinkConfig) -> Self {
        Self { iob, num_events_dropped: AtomicU32::new(0), config }
    }
}

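/// A [`Sink`] that queues encoded records in memory until an IOBuffer is installed via
/// [`BufferedSink::set_buffer`]. Queued records are then forwarded to the IOBuffer and later
/// records are written to it directly.
///
/// Illustrative usage (a sketch only; `iob` stands for an IOBuffer obtained elsewhere):
///
/// ```ignore
/// let sink = BufferedSink::new(SinkConfig::default());
/// sink.record_log(&log::Record::builder().args(format_args!("early message")).build());
/// // Once the logging service hands us an IOBuffer, flush the queue and switch over.
/// sink.set_buffer(iob);
/// ```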
pub struct BufferedSink {
    iob: OnceLock<zx::Iob>,
    buffer: RwLock<Option<Buffer>>,
    num_events_dropped: AtomicU32,
    config: SinkConfig,
}

struct Buffer {
    queue: Queue,
}

impl BufferedSink {
    pub fn new(config: SinkConfig) -> Self {
        Self {
            iob: OnceLock::new(),
            buffer: RwLock::new(Some(Buffer { queue: Queue::new() })),
            num_events_dropped: AtomicU32::new(0),
            config,
        }
    }

    /// Installs `io_buffer` as the destination for this sink, forwarding any messages that were
    /// buffered before it became available.
    pub fn set_buffer(&self, io_buffer: zx::Iob) {
        // Forward the outstanding messages in two phases. In the first phase, we forward the
        // queue without blocking other loggers. In the second phase, we forward the queue whilst
        // blocking other loggers, which should hopefully only be for a short time.
        let queue = {
            let mut buffer = self.buffer.write();
            if let Some(Buffer { queue, .. }) = &mut *buffer {
                Some(std::mem::replace(queue, Queue::new()))
            } else {
                None
            }
        };

        let mut dropped = 0;
        if let Some(mut queue) = queue {
            self.forward_queue(&mut queue, &io_buffer, &mut dropped);
        }

        // This time, hold the lock until we've finished.
        let mut buffer = self.buffer.write();
        if let Some(Buffer { queue, .. }) = &mut *buffer {
            self.forward_queue(queue, &io_buffer, &mut dropped);
        }

        self.num_events_dropped.fetch_add(dropped, Ordering::Relaxed);

        self.iob.set(io_buffer).unwrap();
        *buffer = None;
    }

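    /// Writes every record buffered in `queue` to `iob` in order. Once a write fails, that record
    /// and every record after it are counted in `dropped` instead of being written.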
    fn forward_queue(&self, queue: &mut Queue, iob: &zx::Iob, dropped: &mut u32) {
        let mut slice = queue.as_slice();
        while !slice.is_empty() {
            let header = Header(u64::from_le_bytes(slice[..8].try_into().unwrap()));
            let message_len = header.size_words() as usize * 8;
            assert!(message_len > 0);
            if *dropped > 0 || iob.write(Default::default(), 0, &slice[..message_len]).is_err() {
                *dropped += 1;
            }
            slice = &slice[message_len..];
        }
    }
}

impl Sink for BufferedSink {
    fn num_events_dropped(&self) -> &AtomicU32 {
        &self.num_events_dropped
    }

    fn config(&self) -> &SinkConfig {
        &self.config
    }

    fn send(&self, packet: &[u8]) -> Result<(), zx::Status> {
        loop {
            if let Some(iob) = self.iob.get() {
                return iob.write(Default::default(), 0, packet);
            }

            let buffer = self.buffer.read();
            let Some(Buffer { queue, .. }) = &*buffer else {
                // We lost a race, loop and write with the IOBuffer.
                continue;
            };

            return if queue.push(packet) { Ok(()) } else { Err(zx::Status::NO_SPACE) };
        }
    }
}

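/// A fixed-capacity (`QUEUE_SIZE` bytes), append-only byte buffer. `push` may be called
/// concurrently from multiple threads: space is reserved by atomically advancing `len`, so
/// writers never overlap. `as_slice` requires exclusive access and returns the bytes written
/// so far.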
struct Queue {
    buf: UnsafeCell<Box<[MaybeUninit<u8>]>>,
    len: AtomicUsize,
}

// SAFETY: `Queue`'s `UnsafeCell` is what prevents these impls from being derived automatically.
// `Queue::push` synchronizes concurrent writers via an atomic length (see below) and
// `Queue::as_slice` requires exclusive access, so sharing `BufferedSink` across threads is safe.
unsafe impl Send for BufferedSink {}
unsafe impl Sync for BufferedSink {}

impl Queue {
    fn new() -> Self {
        Self { buf: UnsafeCell::new(Box::new_uninit_slice(QUEUE_SIZE)), len: AtomicUsize::new(0) }
    }

    fn capacity(&self) -> usize {
        // SAFETY: The length is immutable.
        unsafe { (&*self.buf.get()).len() }
    }

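    /// Appends `data` to the queue. A compare-exchange loop on `len` reserves space atomically,
    /// so concurrent pushes never write to overlapping regions of the buffer.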
    /// Returns false if there is no capacity.
    fn push(&self, data: &[u8]) -> bool {
        let mut len = self.len.load(Ordering::Relaxed);
        loop {
            if len + data.len() > self.capacity() {
                return false;
            }
            match self.len.compare_exchange_weak(
                len,
                len + data.len(),
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // SAFETY: We check bounds above, and thanks to the atomic update of len
                    // we can be sure there are no other concurrent writes to the same part
                    // of the buffer.
                    unsafe {
                        (*self.buf.get())
                            .as_mut_ptr()
                            .cast::<u8>()
                            .add(len)
                            .copy_from_nonoverlapping(&data[0], data.len());
                    }
                    return true;
                }
                Err(old) => len = old,
            }
        }
    }

    fn as_slice(&mut self) -> &[u8] {
        // SAFETY: This is safe because `len` is only updated above, but we have exclusive access
        // here, so we can be certain `len` bytes have been written to above.
        unsafe {
            std::slice::from_raw_parts(self.buf.get_mut().as_ptr().cast(), *self.len.get_mut())
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{increment_clock, log_every_n_seconds};
    use diagnostics_log_encoding::parse::parse_record;
    use diagnostics_log_encoding::{Argument, Record};
    use diagnostics_log_types::Severity;
    use fuchsia_sync::Mutex;
    use futures::FutureExt;
    use log::{debug, error, info, trace, warn};
    use ring_buffer::{self, RING_BUFFER_MESSAGE_HEADER_SIZE, RingBuffer};
    use std::sync::Arc;
    use std::time::Duration;
    use test_util::assert_gt;

    const TARGET: &str = "diagnostics_log_lib_test::fuchsia::sink::tests";

    struct TestLogger {
        sink: IoBufferSink,
    }

    impl TestLogger {
        fn new(sink: IoBufferSink) -> Self {
            Self { sink }
        }
    }

    impl log::Log for TestLogger {
        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
            true
        }

        fn log(&self, record: &log::Record<'_>) {
            if self.enabled(record.metadata()) {
                self.sink.record_log(record);
            }
        }

        fn flush(&self) {}
    }

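    // Creates a ring buffer, installs an `IoBufferSink` that writes to it as the global logger,
    // and returns the reader side of the ring buffer.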
    async fn init_sink(config: SinkConfig) -> ring_buffer::Reader {
        let ring_buffer = RingBuffer::create(32 * zx::system_get_page_size() as usize);
        let (iob, _) = ring_buffer.new_iob_writer(0).unwrap();

        let sink = IoBufferSink::new(iob, config);
        log::set_boxed_logger(Box::new(TestLogger::new(sink))).expect("set logger");
        log::set_max_level(log::LevelFilter::Info);

        ring_buffer
    }

    fn arg_prefix() -> Vec<Argument<'static>> {
        vec![Argument::pid(PROCESS_ID.with(|p| *p)), Argument::tid(THREAD_ID.with(|t| *t))]
    }

    #[fuchsia::test(logging = false)]
    async fn packets_are_sent() {
        let mut ring_buffer = init_sink(SinkConfig {
            metatags: HashSet::from([Metatag::Target]),
            ..SinkConfig::default()
        })
        .await;
        log::set_max_level(log::LevelFilter::Trace);

        let mut next_message = async move || {
            let (_tag, buf) = ring_buffer.read_message().await.unwrap();
            let (record, _) = parse_record(&buf).unwrap();
            assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
            record.into_owned()
        };

        // emit some expected messages and then we'll retrieve them for parsing
        trace!(count = 123; "whoa this is noisy");
        let observed_trace = next_message().await;
        debug!(maybe = true; "don't try this at home");
        let observed_debug = next_message().await;
        info!("this is a message");
        let observed_info = next_message().await;
        warn!(reason = "just cuz"; "this is a warning");
        let observed_warn = next_message().await;
        error!(e = "something went pretty wrong"; "this is an error");
        let error_line = line!() - 1;
        let metatag = Argument::tag(TARGET);
        let observed_error = next_message().await;

        // TRACE
        {
            let mut expected_trace = Record {
                timestamp: observed_trace.timestamp,
                severity: Severity::Trace as u8,
                arguments: arg_prefix(),
            };
            expected_trace.arguments.push(metatag.clone());
            expected_trace.arguments.push(Argument::message("whoa this is noisy"));
            expected_trace.arguments.push(Argument::new("count", 123));
            assert_eq!(observed_trace, expected_trace);
        }

        // DEBUG
        {
            let mut expected_debug = Record {
                timestamp: observed_debug.timestamp,
                severity: Severity::Debug as u8,
                arguments: arg_prefix(),
            };
            expected_debug.arguments.push(metatag.clone());
            expected_debug.arguments.push(Argument::message("don't try this at home"));
            expected_debug.arguments.push(Argument::new("maybe", true));
            assert_eq!(observed_debug, expected_debug);
        }

        // INFO
        {
            let mut expected_info = Record {
                timestamp: observed_info.timestamp,
                severity: Severity::Info as u8,
                arguments: arg_prefix(),
            };
            expected_info.arguments.push(metatag.clone());
            expected_info.arguments.push(Argument::message("this is a message"));
            assert_eq!(observed_info, expected_info);
        }

        // WARN
        {
            let mut expected_warn = Record {
                timestamp: observed_warn.timestamp,
                severity: Severity::Warn as u8,
                arguments: arg_prefix(),
            };
            expected_warn.arguments.push(metatag.clone());
            expected_warn.arguments.push(Argument::message("this is a warning"));
            expected_warn.arguments.push(Argument::new("reason", "just cuz"));
            assert_eq!(observed_warn, expected_warn);
        }

        // ERROR
        {
            let mut expected_error = Record {
                timestamp: observed_error.timestamp,
                severity: Severity::Error as u8,
                arguments: arg_prefix(),
            };
            expected_error
                .arguments
                .push(Argument::file("src/lib/diagnostics/log/rust/src/fuchsia/sink.rs"));
            expected_error.arguments.push(Argument::line(error_line as u64));
            expected_error.arguments.push(metatag);
            expected_error.arguments.push(Argument::message("this is an error"));
            expected_error.arguments.push(Argument::new("e", "something went pretty wrong"));
            assert_eq!(observed_error, expected_error);
        }
    }

    #[fuchsia::test(logging = false)]
    async fn tags_are_sent() {
        let mut ring_buffer = init_sink(SinkConfig {
            tags: vec!["tags_are_sent".to_string()],
            ..SinkConfig::default()
        })
        .await;

        let mut next_message = async move || {
            let (_tag, buf) = ring_buffer.read_message().await.unwrap();
            let (record, _) = parse_record(&buf).unwrap();
            assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
            record.into_owned()
        };

        info!("this should have a tag");
        let observed = next_message().await;

        let mut expected = Record {
            timestamp: observed.timestamp,
            severity: Severity::Info as u8,
            arguments: arg_prefix(),
        };
        expected.arguments.push(Argument::message("this should have a tag"));
        expected.arguments.push(Argument::tag("tags_are_sent"));
        assert_eq!(observed, expected);
    }

    #[fuchsia::test(logging = false)]
    async fn log_every_n_seconds_test() {
        let mut ring_buffer = init_sink(SinkConfig { ..SinkConfig::default() }).await;
        let mut next_message = async move || {
            let (_tag, buf) = ring_buffer.read_message().await.unwrap();
            let (record, _) = parse_record(&buf).unwrap();
            assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
            record.into_owned()
        };

        let log_fn = || {
            log_every_n_seconds!(5, INFO, "test message");
        };

        let mut expect_message = async move || {
            let observed = next_message().await;

            let mut expected = Record {
                timestamp: observed.timestamp,
                severity: Severity::Info as u8,
                arguments: arg_prefix(),
            };
            expected.arguments.push(Argument::message("test message"));
            assert_eq!(observed, expected);
        };

        log_fn();
        // First log call should result in a message.
        expect_message().await;
        log_fn();
        // Subsequent log call in less than 5 seconds should NOT
        // result in a message.
        assert!(expect_message().now_or_never().is_none());
        increment_clock(Duration::from_secs(5));

        // Calling log_fn after 5 seconds should result in a message.
        log_fn();
        expect_message().await;
    }

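    // Fills the ring buffer until writes start failing, then checks that the number of dropped
    // messages is reported on the next message that makes it through.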
    #[fuchsia::test(logging = false)]
    async fn drop_count_is_tracked() {
        let mut ring_buffer = init_sink(SinkConfig::default()).await;
        const MESSAGE_SIZE: usize = 104;
        const MESSAGE_SIZE_WITH_DROPS: usize = 136;
        const NUM_DROPPED: usize = 100;

        let emit_message = || info!("it's-a-me, a message-o");

        // Post one message and wait for it to appear.
        emit_message();
        ring_buffer.read_message().await.unwrap();

        // From now on, messages should get posted immediately. Fill up the buffer.
        let mut num_emitted = 0;
        let buffer_space =
            || ring_buffer.capacity() - (ring_buffer.head() - ring_buffer.tail()) as usize;
        while buffer_space() >= RING_BUFFER_MESSAGE_HEADER_SIZE + MESSAGE_SIZE {
            emit_message();
            num_emitted += 1;
            assert_eq!(
                (ring_buffer.head() - ring_buffer.tail()) as usize,
                num_emitted * (RING_BUFFER_MESSAGE_HEADER_SIZE + MESSAGE_SIZE),
                "incorrect bytes stored after {} messages sent",
                num_emitted
            );
        }

        // drop messages
        for _ in 0..NUM_DROPPED {
            emit_message();
        }

        let mut drain_message = async |with_drops| {
            let (_tag, buf) = ring_buffer.read_message().await.unwrap();

            let expected_len = if with_drops { MESSAGE_SIZE_WITH_DROPS } else { MESSAGE_SIZE };
            assert_eq!(
                buf.len(),
                expected_len,
                "constant message size is used to calculate thresholds"
            );

            let (record, _) = parse_record(&buf).unwrap();
            let mut expected_args = arg_prefix();

            if with_drops {
                expected_args.push(Argument::dropped(NUM_DROPPED as u64));
            }

            expected_args.push(Argument::message("it's-a-me, a message-o"));

            assert_eq!(
                record,
                Record {
                    timestamp: record.timestamp,
                    severity: Severity::Info as u8,
                    arguments: expected_args
                }
            );
        };

        // Make space for a message to convey the drop count. We drain two messages here because
        // emitting the drop count adds to the size of the packet; if we only drained one message
        // we would be relying on the ring buffer's capacity satisfying
        //   (capacity % MESSAGE_SIZE) > (MESSAGE_SIZE_WITH_DROPS - MESSAGE_SIZE)
        // which happens to be true at the time of writing but isn't guaranteed.
        drain_message(false).await;
        drain_message(false).await;
        // we use this count below to drain the rest of the messages
        num_emitted -= 2;
        // convey the drop count; it's now at the tail of the ring buffer
        emit_message();
        // drain remaining "normal" messages ahead of the drop count
        for _ in 0..num_emitted {
            drain_message(false).await;
        }
        // verify that messages were dropped
        drain_message(true).await;

        // check that we return to normal after reporting the drops
        emit_message();
        drain_message(false).await;
        assert_eq!(ring_buffer.head(), ring_buffer.tail(), "must drain all messages");
    }

    #[fuchsia::test(logging = false)]
    async fn build_record_from_log_event() {
        let before_timestamp = zx::BootInstant::get();
        let last_record = Arc::new(Mutex::new(None));
        let logger = TrackerLogger::new(last_record.clone());
        log::set_boxed_logger(Box::new(logger)).expect("set logger");
        log::set_max_level(log::LevelFilter::Info);
        log::info!(
            is_a_str = "hahaha",
            is_debug:? = PrintMe(5),
            is_signed = -500,
            is_unsigned = 1000u64,
            is_bool = false;
            "blarg this is a message"
        );

        let guard = last_record.lock();
        let encoder = guard.as_ref().unwrap();
        let (record, _) = parse_record(encoder.inner().get_ref()).expect("wrote valid record");
        assert_gt!(record.timestamp, before_timestamp);
        assert_eq!(
            record,
            Record {
                timestamp: record.timestamp,
                severity: Severity::Info as u8,
                arguments: vec![
                    Argument::pid(PROCESS_ID.with(|p| *p)),
                    Argument::tid(THREAD_ID.with(|p| *p)),
                    Argument::tag("diagnostics_log_lib_test::fuchsia::sink::tests"),
                    Argument::message("blarg this is a message"),
                    Argument::other("is_a_str", "hahaha"),
                    Argument::other("is_debug", "PrintMe(5)"),
                    Argument::other("is_signed", -500),
                    Argument::other("is_unsigned", 1000u64),
                    Argument::other("is_bool", false),
                    Argument::tag("a-tag"),
                ]
            }
        );
    }

    // Note the inner u32 is used in the debug implementation.
    #[derive(Debug)]
    struct PrintMe(#[allow(unused)] u32);

    type ByteEncoder = Encoder<Cursor<[u8; 1024]>>;

    struct TrackerLogger {
        last_record: Arc<Mutex<Option<ByteEncoder>>>,
    }

    impl TrackerLogger {
        fn new(last_record: Arc<Mutex<Option<ByteEncoder>>>) -> Self {
            Self { last_record }
        }
    }

    impl log::Log for TrackerLogger {
        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
            true
        }

        fn log(&self, record: &log::Record<'_>) {
            let mut encoder = Encoder::new(Cursor::new([0u8; 1024]), EncoderOpts::default());
            encoder
                .write_event(WriteEventParams {
                    event: LogEvent::new(record),
                    tags: &["a-tag"],
                    metatags: [Metatag::Target].iter(),
                    pid: PROCESS_ID.with(|p| *p),
                    tid: THREAD_ID.with(|t| *t),
                    dropped: 0,
                })
                .expect("wrote event");
            let mut last_record = self.last_record.lock();
            last_record.replace(encoder);
        }

        fn flush(&self) {}
    }

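    // Logs from another thread that races with `set_buffer`, then verifies that every record
    // queued by the `BufferedSink` ends up in the ring buffer intact.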
    #[fuchsia::test]
    async fn buffered_sink() {
        const TAG: &str = "foo";
        let sink = Arc::new(BufferedSink::new(SinkConfig {
            tags: vec![TAG.to_string()],
            ..Default::default()
        }));
        const MSG: &str = "The quick brown fox jumped over the lazy dog.";
        const COUNT: usize = 1000;
        // Log from a different thread to test races.
        {
            let sink = Arc::clone(&sink);
            std::thread::spawn(move || {
                for i in 0..COUNT {
                    sink.record_log(
                        &log::Record::builder()
                            .level(log::Level::Warn)
                            .args(format_args!("{i}: {MSG}"))
                            .build(),
                    );
                }
            });
        }
        const MAX_RECORD_SIZE: usize = 176;
        // Include a 25% buffer.
        let mut ring_buffer = RingBuffer::create(
            (MAX_RECORD_SIZE * COUNT * 5 / 4).next_multiple_of(zx::system_get_page_size() as usize),
        );
        sink.set_buffer(ring_buffer.new_iob_writer(0).unwrap().0);
        // Now check that all the messages got written.
        for i in 0..COUNT {
            let (_tag, msg) = ring_buffer.read_message().await.unwrap();
            let (record, _) = parse_record(&msg).unwrap();
            assert_eq!(record.severity, Severity::Warn as u8);
            let mut found = 0;
            for arg in record.arguments {
                match arg {
                    Argument::Message(msg) => {
                        assert_eq!(msg, format!("{i}: {MSG}"));
                        assert_eq!(found & 1, 0);
                        found |= 1;
                    }
                    Argument::Tag(tag) => {
                        assert_eq!(tag, TAG);
                        assert_eq!(found & 2, 0);
                        found |= 2;
                    }
                    _ => {}
                }
            }
            assert_eq!(found, 3);
        }
    }
}