1use diagnostics_log_encoding::encode::{
5 Encoder, EncoderOpts, EncodingError, LogEvent, MutableBuffer, TestRecord, WriteEventParams,
6};
7use diagnostics_log_encoding::{Header, Metatag};
8use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
9use fuchsia_runtime as rt;
10use fuchsia_sync::RwLock;
11use std::cell::UnsafeCell;
12use std::collections::HashSet;
13use std::io::Cursor;
14use std::mem::MaybeUninit;
15use std::sync::OnceLock;
16use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
17
18const QUEUE_SIZE: usize = 256 * 1024;
21
22#[derive(Default)]
23pub(crate) struct SinkConfig {
24 pub(crate) metatags: HashSet<Metatag>,
25 pub(crate) tags: Vec<String>,
26 pub(crate) always_log_file_line: bool,
27}
28
29thread_local! {
30 static PROCESS_ID: zx::Koid = rt::process_self()
31 .koid()
32 .unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX));
33 static THREAD_ID: zx::Koid = rt::with_thread_self(|thread| {
34 thread.koid().unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX))
35 });
36}
37
38pub trait Sink {
39 fn num_events_dropped(&self) -> &AtomicU32;
40 fn config(&self) -> &SinkConfig;
41 fn send(&self, packet: &[u8]) -> Result<(), zx::Status>;
42
43 fn event_for_testing(&self, record: TestRecord<'_>) {
44 self.encode_and_send(move |encoder, previously_dropped| {
45 encoder.write_event(WriteEventParams {
46 event: record,
47 tags: &self.config().tags,
48 metatags: std::iter::empty(),
49 pid: PROCESS_ID.with(|p| *p),
50 tid: THREAD_ID.with(|t| *t),
51 dropped: previously_dropped.into(),
52 })
53 });
54 }
55
56 fn record_log(&self, record: &log::Record<'_>) {
57 self.encode_and_send(|encoder, previously_dropped| {
58 encoder.write_event(WriteEventParams {
59 event: LogEvent::new(record),
60 tags: &self.config().tags,
61 metatags: self.config().metatags.iter(),
62 pid: PROCESS_ID.with(|p| *p),
63 tid: THREAD_ID.with(|t| *t),
64 dropped: previously_dropped.into(),
65 })
66 });
67 }
68
69 #[inline]
70 fn encode_and_send(
71 &self,
72 encode: impl FnOnce(&mut Encoder<Cursor<&mut [u8]>>, u32) -> Result<(), EncodingError>,
73 ) {
74 let ordering = Ordering::Relaxed;
75 let num_events_dropped = self.num_events_dropped();
76 let previously_dropped = num_events_dropped.swap(0, ordering);
77 let restore_and_increment_dropped_count = || {
78 num_events_dropped.fetch_add(previously_dropped + 1, ordering);
79 };
80
81 let mut buf = vec![0u8; MAX_DATAGRAM_LEN_BYTES as _];
87 let mut encoder = Encoder::new(
88 Cursor::new(&mut buf[..]),
89 EncoderOpts { always_log_file_line: self.config().always_log_file_line },
90 );
91 if encode(&mut encoder, previously_dropped).is_err() {
92 restore_and_increment_dropped_count();
93 return;
94 }
95
96 let end = encoder.inner().cursor();
97 let packet = &encoder.inner().get_ref()[..end];
98
99 if packet.is_empty() || self.send(packet).is_err() {
100 restore_and_increment_dropped_count();
101 }
102 }
103}
104
105pub struct IoBufferSink {
106 iob: zx::Iob,
107 num_events_dropped: AtomicU32,
108 config: SinkConfig,
109}
110
111impl Sink for IoBufferSink {
112 fn num_events_dropped(&self) -> &AtomicU32 {
113 &self.num_events_dropped
114 }
115
116 fn config(&self) -> &SinkConfig {
117 &self.config
118 }
119
120 fn send(&self, packet: &[u8]) -> Result<(), zx::Status> {
121 self.iob.write(Default::default(), 0, packet)
122 }
123}
124
125impl IoBufferSink {
126 pub fn new(iob: zx::Iob, config: SinkConfig) -> Self {
127 Self { iob, num_events_dropped: AtomicU32::new(0), config }
128 }
129}
130
131pub struct BufferedSink {
132 iob: OnceLock<zx::Iob>,
133 buffer: RwLock<Option<Buffer>>,
134 num_events_dropped: AtomicU32,
135 config: SinkConfig,
136}
137
138struct Buffer {
139 queue: Queue,
140}
141
142impl BufferedSink {
143 pub fn new(config: SinkConfig) -> Self {
144 Self {
145 iob: OnceLock::new(),
146 buffer: RwLock::new(Some(Buffer { queue: Queue::new() })),
147 num_events_dropped: AtomicU32::new(0),
148 config,
149 }
150 }
151
152 pub fn set_buffer(&self, io_buffer: zx::Iob) {
154 let queue = {
158 let mut buffer = self.buffer.write();
159 if let Some(Buffer { queue, .. }) = &mut *buffer {
160 Some(std::mem::replace(queue, Queue::new()))
161 } else {
162 None
163 }
164 };
165
166 let mut dropped = 0;
167 if let Some(mut queue) = queue {
168 self.forward_queue(&mut queue, &io_buffer, &mut dropped);
169 }
170
171 let mut buffer = self.buffer.write();
173 if let Some(Buffer { queue, .. }) = &mut *buffer {
174 self.forward_queue(queue, &io_buffer, &mut dropped);
175 }
176
177 self.num_events_dropped.fetch_add(dropped, Ordering::Relaxed);
178
179 self.iob.set(io_buffer).unwrap();
180 *buffer = None;
181 }
182
183 fn forward_queue(&self, queue: &mut Queue, iob: &zx::Iob, dropped: &mut u32) {
184 let mut slice = queue.as_slice();
185 while !slice.is_empty() {
186 let header = Header(u64::from_le_bytes(slice[..8].try_into().unwrap()));
187 let message_len = header.size_words() as usize * 8;
188 assert!(message_len > 0);
189 if *dropped > 0 || iob.write(Default::default(), 0, &slice[..message_len]).is_err() {
190 *dropped += 1;
191 }
192 slice = &slice[message_len..];
193 }
194 }
195}
196
197impl Sink for BufferedSink {
198 fn num_events_dropped(&self) -> &AtomicU32 {
199 &self.num_events_dropped
200 }
201
202 fn config(&self) -> &SinkConfig {
203 &self.config
204 }
205
206 fn send(&self, packet: &[u8]) -> Result<(), zx::Status> {
207 loop {
208 if let Some(iob) = self.iob.get() {
209 return iob.write(Default::default(), 0, packet);
210 }
211
212 let buffer = self.buffer.read();
213 let Some(Buffer { queue, .. }) = &*buffer else {
214 continue;
216 };
217
218 return if queue.push(packet) { Ok(()) } else { Err(zx::Status::NO_SPACE) };
219 }
220 }
221}
222
223struct Queue {
224 buf: UnsafeCell<Box<[MaybeUninit<u8>]>>,
225 len: AtomicUsize,
226}
227
228unsafe impl Send for BufferedSink {}
230unsafe impl Sync for BufferedSink {}
231
232impl Queue {
233 fn new() -> Self {
234 Self { buf: UnsafeCell::new(Box::new_uninit_slice(QUEUE_SIZE)), len: AtomicUsize::new(0) }
235 }
236
237 fn capacity(&self) -> usize {
238 unsafe { (&*self.buf.get()).len() }
240 }
241
242 fn push(&self, data: &[u8]) -> bool {
244 let mut len = self.len.load(Ordering::Relaxed);
245 loop {
246 if len + data.len() > self.capacity() {
247 return false;
248 }
249 match self.len.compare_exchange_weak(
250 len,
251 len + data.len(),
252 Ordering::Relaxed,
253 Ordering::Relaxed,
254 ) {
255 Ok(_) => {
256 unsafe {
260 (*self.buf.get())
261 .as_mut_ptr()
262 .cast::<u8>()
263 .add(len)
264 .copy_from_nonoverlapping(&data[0], data.len());
265 }
266 return true;
267 }
268 Err(old) => len = old,
269 }
270 }
271 }
272
273 fn as_slice(&mut self) -> &[u8] {
274 unsafe {
277 std::slice::from_raw_parts(self.buf.get_mut().as_ptr().cast(), *self.len.get_mut())
278 }
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use super::*;
285 use crate::{increment_clock, log_every_n_seconds};
286 use diagnostics_log_encoding::parse::parse_record;
287 use diagnostics_log_encoding::{Argument, Record};
288 use diagnostics_log_types::Severity;
289 use fuchsia_sync::Mutex;
290 use futures::FutureExt;
291 use log::{debug, error, info, trace, warn};
292 use ring_buffer::{self, RING_BUFFER_MESSAGE_HEADER_SIZE, RingBuffer};
293 use std::sync::Arc;
294 use std::time::Duration;
295 use test_util::assert_gt;
296
297 const TARGET: &str = "diagnostics_log_lib_test::fuchsia::sink::tests";
298
299 struct TestLogger {
300 sink: IoBufferSink,
301 }
302
303 impl TestLogger {
304 fn new(sink: IoBufferSink) -> Self {
305 Self { sink }
306 }
307 }
308
309 impl log::Log for TestLogger {
310 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
311 true
312 }
313
314 fn log(&self, record: &log::Record<'_>) {
315 if self.enabled(record.metadata()) {
316 self.sink.record_log(record);
317 }
318 }
319
320 fn flush(&self) {}
321 }
322
323 async fn init_sink(config: SinkConfig) -> ring_buffer::Reader {
324 let ring_buffer = RingBuffer::create(32 * zx::system_get_page_size() as usize);
325 let (iob, _) = ring_buffer.new_iob_writer(0).unwrap();
326
327 let sink = IoBufferSink::new(iob, config);
328 log::set_boxed_logger(Box::new(TestLogger::new(sink))).expect("set logger");
329 log::set_max_level(log::LevelFilter::Info);
330
331 ring_buffer
332 }
333
334 fn arg_prefix() -> Vec<Argument<'static>> {
335 vec![Argument::pid(PROCESS_ID.with(|p| *p)), Argument::tid(THREAD_ID.with(|t| *t))]
336 }
337
338 #[fuchsia::test(logging = false)]
339 async fn packets_are_sent() {
340 let mut ring_buffer = init_sink(SinkConfig {
341 metatags: HashSet::from([Metatag::Target]),
342 ..SinkConfig::default()
343 })
344 .await;
345 log::set_max_level(log::LevelFilter::Trace);
346
347 let mut next_message = async move || {
348 let (_tag, buf) = ring_buffer.read_message().await.unwrap();
349 let (record, _) = parse_record(&buf).unwrap();
350 assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
351 record.into_owned()
352 };
353
354 trace!(count = 123; "whoa this is noisy");
356 let observed_trace = next_message().await;
357 debug!(maybe = true; "don't try this at home");
358 let observed_debug = next_message().await;
359 info!("this is a message");
360 let observed_info = next_message().await;
361 warn!(reason = "just cuz"; "this is a warning");
362 let observed_warn = next_message().await;
363 error!(e = "something went pretty wrong"; "this is an error");
364 let error_line = line!() - 1;
365 let metatag = Argument::tag(TARGET);
366 let observed_error = next_message().await;
367
368 {
370 let mut expected_trace = Record {
371 timestamp: observed_trace.timestamp,
372 severity: Severity::Trace as u8,
373 arguments: arg_prefix(),
374 };
375 expected_trace.arguments.push(metatag.clone());
376 expected_trace.arguments.push(Argument::message("whoa this is noisy"));
377 expected_trace.arguments.push(Argument::new("count", 123));
378 assert_eq!(observed_trace, expected_trace);
379 }
380
381 {
383 let mut expected_debug = Record {
384 timestamp: observed_debug.timestamp,
385 severity: Severity::Debug as u8,
386 arguments: arg_prefix(),
387 };
388 expected_debug.arguments.push(metatag.clone());
389 expected_debug.arguments.push(Argument::message("don't try this at home"));
390 expected_debug.arguments.push(Argument::new("maybe", true));
391 assert_eq!(observed_debug, expected_debug);
392 }
393
394 {
396 let mut expected_info = Record {
397 timestamp: observed_info.timestamp,
398 severity: Severity::Info as u8,
399 arguments: arg_prefix(),
400 };
401 expected_info.arguments.push(metatag.clone());
402 expected_info.arguments.push(Argument::message("this is a message"));
403 assert_eq!(observed_info, expected_info);
404 }
405
406 {
408 let mut expected_warn = Record {
409 timestamp: observed_warn.timestamp,
410 severity: Severity::Warn as u8,
411 arguments: arg_prefix(),
412 };
413 expected_warn.arguments.push(metatag.clone());
414 expected_warn.arguments.push(Argument::message("this is a warning"));
415 expected_warn.arguments.push(Argument::new("reason", "just cuz"));
416 assert_eq!(observed_warn, expected_warn);
417 }
418
419 {
421 let mut expected_error = Record {
422 timestamp: observed_error.timestamp,
423 severity: Severity::Error as u8,
424 arguments: arg_prefix(),
425 };
426 expected_error
427 .arguments
428 .push(Argument::file("src/lib/diagnostics/log/rust/src/fuchsia/sink.rs"));
429 expected_error.arguments.push(Argument::line(error_line as u64));
430 expected_error.arguments.push(metatag);
431 expected_error.arguments.push(Argument::message("this is an error"));
432 expected_error.arguments.push(Argument::new("e", "something went pretty wrong"));
433 assert_eq!(observed_error, expected_error);
434 }
435 }
436
437 #[fuchsia::test(logging = false)]
438 async fn tags_are_sent() {
439 let mut ring_buffer = init_sink(SinkConfig {
440 tags: vec!["tags_are_sent".to_string()],
441 ..SinkConfig::default()
442 })
443 .await;
444
445 let mut next_message = async move || {
446 let (_tag, buf) = ring_buffer.read_message().await.unwrap();
447 let (record, _) = parse_record(&buf).unwrap();
448 assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
449 record.into_owned()
450 };
451
452 info!("this should have a tag");
453 let observed = next_message().await;
454
455 let mut expected = Record {
456 timestamp: observed.timestamp,
457 severity: Severity::Info as u8,
458 arguments: arg_prefix(),
459 };
460 expected.arguments.push(Argument::message("this should have a tag"));
461 expected.arguments.push(Argument::tag("tags_are_sent"));
462 assert_eq!(observed, expected);
463 }
464
465 #[fuchsia::test(logging = false)]
466 async fn log_every_n_seconds_test() {
467 let mut ring_buffer = init_sink(SinkConfig { ..SinkConfig::default() }).await;
468 let mut next_message = async move || {
469 let (_tag, buf) = ring_buffer.read_message().await.unwrap();
470 let (record, _) = parse_record(&buf).unwrap();
471 assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
472 record.into_owned()
473 };
474
475 let log_fn = || {
476 log_every_n_seconds!(5, INFO, "test message");
477 };
478
479 let mut expect_message = async move || {
480 let observed = next_message().await;
481
482 let mut expected = Record {
483 timestamp: observed.timestamp,
484 severity: Severity::Info as u8,
485 arguments: arg_prefix(),
486 };
487 expected.arguments.push(Argument::message("test message"));
488 assert_eq!(observed, expected);
489 };
490
491 log_fn();
492 expect_message().await;
494 log_fn();
495 assert!(expect_message().now_or_never().is_none());
498 increment_clock(Duration::from_secs(5));
499
500 log_fn();
502 expect_message().await;
503 }
504
505 #[fuchsia::test(logging = false)]
506 async fn drop_count_is_tracked() {
507 let mut ring_buffer = init_sink(SinkConfig::default()).await;
508 const MESSAGE_SIZE: usize = 104;
509 const MESSAGE_SIZE_WITH_DROPS: usize = 136;
510 const NUM_DROPPED: usize = 100;
511
512 let emit_message = || info!("it's-a-me, a message-o");
513
514 emit_message();
516 ring_buffer.read_message().await.unwrap();
517
518 let mut num_emitted = 0;
520 let buffer_space =
521 || ring_buffer.capacity() - (ring_buffer.head() - ring_buffer.tail()) as usize;
522 while buffer_space() >= RING_BUFFER_MESSAGE_HEADER_SIZE + MESSAGE_SIZE {
523 emit_message();
524 num_emitted += 1;
525 assert_eq!(
526 (ring_buffer.head() - ring_buffer.tail()) as usize,
527 num_emitted * (RING_BUFFER_MESSAGE_HEADER_SIZE + MESSAGE_SIZE),
528 "incorrect bytes stored after {} messages sent",
529 num_emitted
530 );
531 }
532
533 for _ in 0..NUM_DROPPED {
535 emit_message();
536 }
537
538 let mut drain_message = async |with_drops| {
539 let (_tag, buf) = ring_buffer.read_message().await.unwrap();
540
541 let expected_len = if with_drops { MESSAGE_SIZE_WITH_DROPS } else { MESSAGE_SIZE };
542 assert_eq!(
543 buf.len(),
544 expected_len,
545 "constant message size is used to calculate thresholds"
546 );
547
548 let (record, _) = parse_record(&buf).unwrap();
549 let mut expected_args = arg_prefix();
550
551 if with_drops {
552 expected_args.push(Argument::dropped(NUM_DROPPED as u64));
553 }
554
555 expected_args.push(Argument::message("it's-a-me, a message-o"));
556
557 assert_eq!(
558 record,
559 Record {
560 timestamp: record.timestamp,
561 severity: Severity::Info as u8,
562 arguments: expected_args
563 }
564 );
565 };
566
567 drain_message(false).await;
574 drain_message(false).await;
575 num_emitted -= 2;
577 emit_message();
579 for _ in 0..num_emitted {
581 drain_message(false).await;
582 }
583 drain_message(true).await;
585
586 emit_message();
588 drain_message(false).await;
589 assert_eq!(ring_buffer.head(), ring_buffer.tail(), "must drain all messages");
590 }
591
592 #[fuchsia::test(logging = false)]
593 async fn build_record_from_log_event() {
594 let before_timestamp = zx::BootInstant::get();
595 let last_record = Arc::new(Mutex::new(None));
596 let logger = TrackerLogger::new(last_record.clone());
597 log::set_boxed_logger(Box::new(logger)).expect("set logger");
598 log::set_max_level(log::LevelFilter::Info);
599 log::info!(
600 is_a_str = "hahaha",
601 is_debug:? = PrintMe(5),
602 is_signed = -500,
603 is_unsigned = 1000u64,
604 is_bool = false;
605 "blarg this is a message"
606 );
607
608 let guard = last_record.lock();
609 let encoder = guard.as_ref().unwrap();
610 let (record, _) = parse_record(encoder.inner().get_ref()).expect("wrote valid record");
611 assert_gt!(record.timestamp, before_timestamp);
612 assert_eq!(
613 record,
614 Record {
615 timestamp: record.timestamp,
616 severity: Severity::Info as u8,
617 arguments: vec![
618 Argument::pid(PROCESS_ID.with(|p| *p)),
619 Argument::tid(THREAD_ID.with(|p| *p)),
620 Argument::tag("diagnostics_log_lib_test::fuchsia::sink::tests"),
621 Argument::message("blarg this is a message"),
622 Argument::other("is_a_str", "hahaha"),
623 Argument::other("is_debug", "PrintMe(5)"),
624 Argument::other("is_signed", -500),
625 Argument::other("is_unsigned", 1000u64),
626 Argument::other("is_bool", false),
627 Argument::tag("a-tag"),
628 ]
629 }
630 );
631 }
632
633 #[derive(Debug)]
635 struct PrintMe(#[allow(unused)] u32);
636
637 type ByteEncoder = Encoder<Cursor<[u8; 1024]>>;
638
639 struct TrackerLogger {
640 last_record: Arc<Mutex<Option<ByteEncoder>>>,
641 }
642
643 impl TrackerLogger {
644 fn new(last_record: Arc<Mutex<Option<ByteEncoder>>>) -> Self {
645 Self { last_record }
646 }
647 }
648
649 impl log::Log for TrackerLogger {
650 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
651 true
652 }
653
654 fn log(&self, record: &log::Record<'_>) {
655 let mut encoder = Encoder::new(Cursor::new([0u8; 1024]), EncoderOpts::default());
656 encoder
657 .write_event(WriteEventParams {
658 event: LogEvent::new(record),
659 tags: &["a-tag"],
660 metatags: [Metatag::Target].iter(),
661 pid: PROCESS_ID.with(|p| *p),
662 tid: THREAD_ID.with(|t| *t),
663 dropped: 0,
664 })
665 .expect("wrote event");
666 let mut last_record = self.last_record.lock();
667 last_record.replace(encoder);
668 }
669
670 fn flush(&self) {}
671 }
672
673 #[fuchsia::test]
674 async fn buffered_sink() {
675 const TAG: &str = "foo";
676 let sink = Arc::new(BufferedSink::new(SinkConfig {
677 tags: vec![TAG.to_string()],
678 ..Default::default()
679 }));
680 const MSG: &str = "The quick brown fox jumped over the lazy dog.";
681 const COUNT: usize = 1000;
682 {
684 let sink = Arc::clone(&sink);
685 std::thread::spawn(move || {
686 for i in 0..COUNT {
687 sink.record_log(
688 &log::Record::builder()
689 .level(log::Level::Warn)
690 .args(format_args!("{i}: {MSG}"))
691 .build(),
692 );
693 }
694 });
695 }
696 const MAX_RECORD_SIZE: usize = 176;
697 let mut ring_buffer = RingBuffer::create(
699 (MAX_RECORD_SIZE * COUNT * 5 / 4).next_multiple_of(zx::system_get_page_size() as usize),
700 );
701 sink.set_buffer(ring_buffer.new_iob_writer(0).unwrap().0);
702 for i in 0..COUNT {
704 let (_tag, msg) = ring_buffer.read_message().await.unwrap();
705 let (record, _) = parse_record(&msg).unwrap();
706 assert_eq!(record.severity, Severity::Warn as u8);
707 let mut found = 0;
708 for arg in record.arguments {
709 match arg {
710 Argument::Message(msg) => {
711 assert_eq!(msg, format!("{i}: {MSG}"));
712 assert_eq!(found & 1, 0);
713 found |= 1;
714 }
715 Argument::Tag(tag) => {
716 assert_eq!(tag, TAG);
717 assert_eq!(found & 2, 0);
718 found |= 2;
719 }
720 _ => {}
721 }
722 }
723 assert_eq!(found, 3);
724 }
725 }
726}