1use diagnostics_log_encoding::encode::{
5 Encoder, EncoderOpts, EncodingError, LogEvent, MutableBuffer, TestRecord, WriteEventParams,
6};
7use diagnostics_log_encoding::{Header, Metatag};
8use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
9use fuchsia_runtime as rt;
10use fuchsia_sync::RwLock;
11use std::cell::UnsafeCell;
12use std::collections::HashSet;
13use std::io::Cursor;
14use std::mem::MaybeUninit;
15use std::sync::OnceLock;
16use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
17use zx::{self as zx, AsHandleRef};
18
19const QUEUE_SIZE: usize = 256 * 1024;
22
23#[derive(Default)]
24pub(crate) struct SinkConfig {
25 pub(crate) metatags: HashSet<Metatag>,
26 pub(crate) tags: Vec<String>,
27 pub(crate) always_log_file_line: bool,
28}
29
30thread_local! {
31 static PROCESS_ID: zx::Koid = rt::process_self()
32 .get_koid()
33 .unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX));
34 static THREAD_ID: zx::Koid = rt::with_thread_self(|thread| {
35 thread.get_koid().unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX))
36 });
37}
38
39pub trait Sink {
40 fn num_events_dropped(&self) -> &AtomicU32;
41 fn config(&self) -> &SinkConfig;
42 fn send(&self, packet: &[u8]) -> Result<(), zx::Status>;
43
44 fn event_for_testing(&self, record: TestRecord<'_>) {
45 self.encode_and_send(move |encoder, previously_dropped| {
46 encoder.write_event(WriteEventParams {
47 event: record,
48 tags: &self.config().tags,
49 metatags: std::iter::empty(),
50 pid: PROCESS_ID.with(|p| *p),
51 tid: THREAD_ID.with(|t| *t),
52 dropped: previously_dropped.into(),
53 })
54 });
55 }
56
57 fn record_log(&self, record: &log::Record<'_>) {
58 self.encode_and_send(|encoder, previously_dropped| {
59 encoder.write_event(WriteEventParams {
60 event: LogEvent::new(record),
61 tags: &self.config().tags,
62 metatags: self.config().metatags.iter(),
63 pid: PROCESS_ID.with(|p| *p),
64 tid: THREAD_ID.with(|t| *t),
65 dropped: previously_dropped.into(),
66 })
67 });
68 }
69
70 #[inline]
71 fn encode_and_send(
72 &self,
73 encode: impl FnOnce(&mut Encoder<Cursor<&mut [u8]>>, u32) -> Result<(), EncodingError>,
74 ) {
75 let ordering = Ordering::Relaxed;
76 let num_events_dropped = self.num_events_dropped();
77 let previously_dropped = num_events_dropped.swap(0, ordering);
78 let restore_and_increment_dropped_count = || {
79 num_events_dropped.fetch_add(previously_dropped + 1, ordering);
80 };
81
82 let mut buf = vec![0u8; MAX_DATAGRAM_LEN_BYTES as _];
88 let mut encoder = Encoder::new(
89 Cursor::new(&mut buf[..]),
90 EncoderOpts { always_log_file_line: self.config().always_log_file_line },
91 );
92 if encode(&mut encoder, previously_dropped).is_err() {
93 restore_and_increment_dropped_count();
94 return;
95 }
96
97 let end = encoder.inner().cursor();
98 let packet = &encoder.inner().get_ref()[..end];
99
100 if packet.is_empty() || self.send(packet).is_err() {
101 restore_and_increment_dropped_count();
102 }
103 }
104}
105
106pub struct IoBufferSink {
107 iob: zx::Iob,
108 num_events_dropped: AtomicU32,
109 config: SinkConfig,
110}
111
112impl Sink for IoBufferSink {
113 fn num_events_dropped(&self) -> &AtomicU32 {
114 &self.num_events_dropped
115 }
116
117 fn config(&self) -> &SinkConfig {
118 &self.config
119 }
120
121 fn send(&self, packet: &[u8]) -> Result<(), zx::Status> {
122 self.iob.write(Default::default(), 0, packet)
123 }
124}
125
126impl IoBufferSink {
127 pub fn new(iob: zx::Iob, config: SinkConfig) -> Self {
128 Self { iob, num_events_dropped: AtomicU32::new(0), config }
129 }
130}
131
132pub struct BufferedSink {
133 iob: OnceLock<zx::Iob>,
134 buffer: RwLock<Option<Buffer>>,
135 num_events_dropped: AtomicU32,
136 config: SinkConfig,
137}
138
139struct Buffer {
140 queue: Queue,
141}
142
143impl BufferedSink {
144 pub fn new(config: SinkConfig) -> Self {
145 Self {
146 iob: OnceLock::new(),
147 buffer: RwLock::new(Some(Buffer { queue: Queue::new() })),
148 num_events_dropped: AtomicU32::new(0),
149 config,
150 }
151 }
152
153 pub fn set_buffer(&self, io_buffer: zx::Iob) {
155 let queue = {
159 let mut buffer = self.buffer.write();
160 if let Some(Buffer { queue, .. }) = &mut *buffer {
161 Some(std::mem::replace(queue, Queue::new()))
162 } else {
163 None
164 }
165 };
166
167 let mut dropped = 0;
168 if let Some(mut queue) = queue {
169 self.forward_queue(&mut queue, &io_buffer, &mut dropped);
170 }
171
172 let mut buffer = self.buffer.write();
174 if let Some(Buffer { queue, .. }) = &mut *buffer {
175 self.forward_queue(queue, &io_buffer, &mut dropped);
176 }
177
178 self.num_events_dropped.fetch_add(dropped, Ordering::Relaxed);
179
180 self.iob.set(io_buffer).unwrap();
181 *buffer = None;
182 }
183
184 fn forward_queue(&self, queue: &mut Queue, iob: &zx::Iob, dropped: &mut u32) {
185 let mut slice = queue.as_slice();
186 while !slice.is_empty() {
187 let header = Header(u64::from_le_bytes(slice[..8].try_into().unwrap()));
188 let message_len = header.size_words() as usize * 8;
189 assert!(message_len > 0);
190 if *dropped > 0 || iob.write(Default::default(), 0, &slice[..message_len]).is_err() {
191 *dropped += 1;
192 }
193 slice = &slice[message_len..];
194 }
195 }
196}
197
198impl Sink for BufferedSink {
199 fn num_events_dropped(&self) -> &AtomicU32 {
200 &self.num_events_dropped
201 }
202
203 fn config(&self) -> &SinkConfig {
204 &self.config
205 }
206
207 fn send(&self, packet: &[u8]) -> Result<(), zx::Status> {
208 loop {
209 if let Some(iob) = self.iob.get() {
210 return iob.write(Default::default(), 0, packet);
211 }
212
213 let buffer = self.buffer.read();
214 let Some(Buffer { queue, .. }) = &*buffer else {
215 continue;
217 };
218
219 return if queue.push(packet) { Ok(()) } else { Err(zx::Status::NO_SPACE) };
220 }
221 }
222}
223
224struct Queue {
225 buf: UnsafeCell<Box<[MaybeUninit<u8>]>>,
226 len: AtomicUsize,
227}
228
229unsafe impl Send for BufferedSink {}
231unsafe impl Sync for BufferedSink {}
232
233impl Queue {
234 fn new() -> Self {
235 Self { buf: UnsafeCell::new(Box::new_uninit_slice(QUEUE_SIZE)), len: AtomicUsize::new(0) }
236 }
237
238 fn capacity(&self) -> usize {
239 unsafe { (&*self.buf.get()).len() }
241 }
242
243 fn push(&self, data: &[u8]) -> bool {
245 let mut len = self.len.load(Ordering::Relaxed);
246 loop {
247 if len + data.len() > self.capacity() {
248 return false;
249 }
250 match self.len.compare_exchange_weak(
251 len,
252 len + data.len(),
253 Ordering::Relaxed,
254 Ordering::Relaxed,
255 ) {
256 Ok(_) => {
257 unsafe {
261 (*self.buf.get())
262 .as_mut_ptr()
263 .cast::<u8>()
264 .add(len)
265 .copy_from_nonoverlapping(&data[0], data.len());
266 }
267 return true;
268 }
269 Err(old) => len = old,
270 }
271 }
272 }
273
274 fn as_slice(&mut self) -> &[u8] {
275 unsafe {
278 std::slice::from_raw_parts(self.buf.get_mut().as_ptr().cast(), *self.len.get_mut())
279 }
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use crate::{increment_clock, log_every_n_seconds};
287 use diagnostics_log_encoding::parse::parse_record;
288 use diagnostics_log_encoding::{Argument, Record};
289 use diagnostics_log_types::Severity;
290 use fuchsia_sync::Mutex;
291 use futures::FutureExt;
292 use log::{debug, error, info, trace, warn};
293 use ring_buffer::{self, RING_BUFFER_MESSAGE_HEADER_SIZE, RingBuffer};
294 use std::sync::Arc;
295 use std::time::Duration;
296 use test_util::assert_gt;
297
298 const TARGET: &str = "diagnostics_log_lib_test::fuchsia::sink::tests";
299
300 struct TestLogger {
301 sink: IoBufferSink,
302 }
303
304 impl TestLogger {
305 fn new(sink: IoBufferSink) -> Self {
306 Self { sink }
307 }
308 }
309
310 impl log::Log for TestLogger {
311 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
312 true
313 }
314
315 fn log(&self, record: &log::Record<'_>) {
316 if self.enabled(record.metadata()) {
317 self.sink.record_log(record);
318 }
319 }
320
321 fn flush(&self) {}
322 }
323
324 async fn init_sink(config: SinkConfig) -> ring_buffer::Reader {
325 let ring_buffer = RingBuffer::create(32 * zx::system_get_page_size() as usize);
326 let (iob, _) = ring_buffer.new_iob_writer(0).unwrap();
327
328 let sink = IoBufferSink::new(iob, config);
329 log::set_boxed_logger(Box::new(TestLogger::new(sink))).expect("set logger");
330 log::set_max_level(log::LevelFilter::Info);
331
332 ring_buffer
333 }
334
335 fn arg_prefix() -> Vec<Argument<'static>> {
336 vec![Argument::pid(PROCESS_ID.with(|p| *p)), Argument::tid(THREAD_ID.with(|t| *t))]
337 }
338
339 #[fuchsia::test(logging = false)]
340 async fn packets_are_sent() {
341 let mut ring_buffer = init_sink(SinkConfig {
342 metatags: HashSet::from([Metatag::Target]),
343 ..SinkConfig::default()
344 })
345 .await;
346 log::set_max_level(log::LevelFilter::Trace);
347
348 let mut next_message = async move || {
349 let (_tag, buf) = ring_buffer.read_message().await.unwrap();
350 let (record, _) = parse_record(&buf).unwrap();
351 assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
352 record.into_owned()
353 };
354
355 trace!(count = 123; "whoa this is noisy");
357 let observed_trace = next_message().await;
358 debug!(maybe = true; "don't try this at home");
359 let observed_debug = next_message().await;
360 info!("this is a message");
361 let observed_info = next_message().await;
362 warn!(reason = "just cuz"; "this is a warning");
363 let observed_warn = next_message().await;
364 error!(e = "something went pretty wrong"; "this is an error");
365 let error_line = line!() - 1;
366 let metatag = Argument::tag(TARGET);
367 let observed_error = next_message().await;
368
369 {
371 let mut expected_trace = Record {
372 timestamp: observed_trace.timestamp,
373 severity: Severity::Trace as u8,
374 arguments: arg_prefix(),
375 };
376 expected_trace.arguments.push(metatag.clone());
377 expected_trace.arguments.push(Argument::message("whoa this is noisy"));
378 expected_trace.arguments.push(Argument::new("count", 123));
379 assert_eq!(observed_trace, expected_trace);
380 }
381
382 {
384 let mut expected_debug = Record {
385 timestamp: observed_debug.timestamp,
386 severity: Severity::Debug as u8,
387 arguments: arg_prefix(),
388 };
389 expected_debug.arguments.push(metatag.clone());
390 expected_debug.arguments.push(Argument::message("don't try this at home"));
391 expected_debug.arguments.push(Argument::new("maybe", true));
392 assert_eq!(observed_debug, expected_debug);
393 }
394
395 {
397 let mut expected_info = Record {
398 timestamp: observed_info.timestamp,
399 severity: Severity::Info as u8,
400 arguments: arg_prefix(),
401 };
402 expected_info.arguments.push(metatag.clone());
403 expected_info.arguments.push(Argument::message("this is a message"));
404 assert_eq!(observed_info, expected_info);
405 }
406
407 {
409 let mut expected_warn = Record {
410 timestamp: observed_warn.timestamp,
411 severity: Severity::Warn as u8,
412 arguments: arg_prefix(),
413 };
414 expected_warn.arguments.push(metatag.clone());
415 expected_warn.arguments.push(Argument::message("this is a warning"));
416 expected_warn.arguments.push(Argument::new("reason", "just cuz"));
417 assert_eq!(observed_warn, expected_warn);
418 }
419
420 {
422 let mut expected_error = Record {
423 timestamp: observed_error.timestamp,
424 severity: Severity::Error as u8,
425 arguments: arg_prefix(),
426 };
427 expected_error
428 .arguments
429 .push(Argument::file("src/lib/diagnostics/log/rust/src/fuchsia/sink.rs"));
430 expected_error.arguments.push(Argument::line(error_line as u64));
431 expected_error.arguments.push(metatag);
432 expected_error.arguments.push(Argument::message("this is an error"));
433 expected_error.arguments.push(Argument::new("e", "something went pretty wrong"));
434 assert_eq!(observed_error, expected_error);
435 }
436 }
437
438 #[fuchsia::test(logging = false)]
439 async fn tags_are_sent() {
440 let mut ring_buffer = init_sink(SinkConfig {
441 tags: vec!["tags_are_sent".to_string()],
442 ..SinkConfig::default()
443 })
444 .await;
445
446 let mut next_message = async move || {
447 let (_tag, buf) = ring_buffer.read_message().await.unwrap();
448 let (record, _) = parse_record(&buf).unwrap();
449 assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
450 record.into_owned()
451 };
452
453 info!("this should have a tag");
454 let observed = next_message().await;
455
456 let mut expected = Record {
457 timestamp: observed.timestamp,
458 severity: Severity::Info as u8,
459 arguments: arg_prefix(),
460 };
461 expected.arguments.push(Argument::message("this should have a tag"));
462 expected.arguments.push(Argument::tag("tags_are_sent"));
463 assert_eq!(observed, expected);
464 }
465
466 #[fuchsia::test(logging = false)]
467 async fn log_every_n_seconds_test() {
468 let mut ring_buffer = init_sink(SinkConfig { ..SinkConfig::default() }).await;
469 let mut next_message = async move || {
470 let (_tag, buf) = ring_buffer.read_message().await.unwrap();
471 let (record, _) = parse_record(&buf).unwrap();
472 assert_eq!(ring_buffer.head(), ring_buffer.tail(), "buffer must be empty");
473 record.into_owned()
474 };
475
476 let log_fn = || {
477 log_every_n_seconds!(5, INFO, "test message");
478 };
479
480 let mut expect_message = async move || {
481 let observed = next_message().await;
482
483 let mut expected = Record {
484 timestamp: observed.timestamp,
485 severity: Severity::Info as u8,
486 arguments: arg_prefix(),
487 };
488 expected.arguments.push(Argument::message("test message"));
489 assert_eq!(observed, expected);
490 };
491
492 log_fn();
493 expect_message().await;
495 log_fn();
496 assert!(expect_message().now_or_never().is_none());
499 increment_clock(Duration::from_secs(5));
500
501 log_fn();
503 expect_message().await;
504 }
505
506 #[fuchsia::test(logging = false)]
507 async fn drop_count_is_tracked() {
508 let mut ring_buffer = init_sink(SinkConfig::default()).await;
509 const MESSAGE_SIZE: usize = 104;
510 const MESSAGE_SIZE_WITH_DROPS: usize = 136;
511 const NUM_DROPPED: usize = 100;
512
513 let emit_message = || info!("it's-a-me, a message-o");
514
515 emit_message();
517 ring_buffer.read_message().await.unwrap();
518
519 let mut num_emitted = 0;
521 let buffer_space =
522 || ring_buffer.capacity() - (ring_buffer.head() - ring_buffer.tail()) as usize;
523 while buffer_space() >= RING_BUFFER_MESSAGE_HEADER_SIZE + MESSAGE_SIZE {
524 emit_message();
525 num_emitted += 1;
526 assert_eq!(
527 (ring_buffer.head() - ring_buffer.tail()) as usize,
528 num_emitted * (RING_BUFFER_MESSAGE_HEADER_SIZE + MESSAGE_SIZE),
529 "incorrect bytes stored after {} messages sent",
530 num_emitted
531 );
532 }
533
534 for _ in 0..NUM_DROPPED {
536 emit_message();
537 }
538
539 let mut drain_message = async |with_drops| {
540 let (_tag, buf) = ring_buffer.read_message().await.unwrap();
541
542 let expected_len = if with_drops { MESSAGE_SIZE_WITH_DROPS } else { MESSAGE_SIZE };
543 assert_eq!(
544 buf.len(),
545 expected_len,
546 "constant message size is used to calculate thresholds"
547 );
548
549 let (record, _) = parse_record(&buf).unwrap();
550 let mut expected_args = arg_prefix();
551
552 if with_drops {
553 expected_args.push(Argument::dropped(NUM_DROPPED as u64));
554 }
555
556 expected_args.push(Argument::message("it's-a-me, a message-o"));
557
558 assert_eq!(
559 record,
560 Record {
561 timestamp: record.timestamp,
562 severity: Severity::Info as u8,
563 arguments: expected_args
564 }
565 );
566 };
567
568 drain_message(false).await;
575 drain_message(false).await;
576 num_emitted -= 2;
578 emit_message();
580 for _ in 0..num_emitted {
582 drain_message(false).await;
583 }
584 drain_message(true).await;
586
587 emit_message();
589 drain_message(false).await;
590 assert_eq!(ring_buffer.head(), ring_buffer.tail(), "must drain all messages");
591 }
592
593 #[fuchsia::test(logging = false)]
594 async fn build_record_from_log_event() {
595 let before_timestamp = zx::BootInstant::get();
596 let last_record = Arc::new(Mutex::new(None));
597 let logger = TrackerLogger::new(last_record.clone());
598 log::set_boxed_logger(Box::new(logger)).expect("set logger");
599 log::set_max_level(log::LevelFilter::Info);
600 log::info!(
601 is_a_str = "hahaha",
602 is_debug:? = PrintMe(5),
603 is_signed = -500,
604 is_unsigned = 1000u64,
605 is_bool = false;
606 "blarg this is a message"
607 );
608
609 let guard = last_record.lock();
610 let encoder = guard.as_ref().unwrap();
611 let (record, _) = parse_record(encoder.inner().get_ref()).expect("wrote valid record");
612 assert_gt!(record.timestamp, before_timestamp);
613 assert_eq!(
614 record,
615 Record {
616 timestamp: record.timestamp,
617 severity: Severity::Info as u8,
618 arguments: vec![
619 Argument::pid(PROCESS_ID.with(|p| *p)),
620 Argument::tid(THREAD_ID.with(|p| *p)),
621 Argument::tag("diagnostics_log_lib_test::fuchsia::sink::tests"),
622 Argument::message("blarg this is a message"),
623 Argument::other("is_a_str", "hahaha"),
624 Argument::other("is_debug", "PrintMe(5)"),
625 Argument::other("is_signed", -500),
626 Argument::other("is_unsigned", 1000u64),
627 Argument::other("is_bool", false),
628 Argument::tag("a-tag"),
629 ]
630 }
631 );
632 }
633
634 #[derive(Debug)]
636 struct PrintMe(#[allow(unused)] u32);
637
638 type ByteEncoder = Encoder<Cursor<[u8; 1024]>>;
639
640 struct TrackerLogger {
641 last_record: Arc<Mutex<Option<ByteEncoder>>>,
642 }
643
644 impl TrackerLogger {
645 fn new(last_record: Arc<Mutex<Option<ByteEncoder>>>) -> Self {
646 Self { last_record }
647 }
648 }
649
650 impl log::Log for TrackerLogger {
651 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
652 true
653 }
654
655 fn log(&self, record: &log::Record<'_>) {
656 let mut encoder = Encoder::new(Cursor::new([0u8; 1024]), EncoderOpts::default());
657 encoder
658 .write_event(WriteEventParams {
659 event: LogEvent::new(record),
660 tags: &["a-tag"],
661 metatags: [Metatag::Target].iter(),
662 pid: PROCESS_ID.with(|p| *p),
663 tid: THREAD_ID.with(|t| *t),
664 dropped: 0,
665 })
666 .expect("wrote event");
667 let mut last_record = self.last_record.lock();
668 last_record.replace(encoder);
669 }
670
671 fn flush(&self) {}
672 }
673
674 #[fuchsia::test]
675 async fn buffered_sink() {
676 const TAG: &str = "foo";
677 let sink = Arc::new(BufferedSink::new(SinkConfig {
678 tags: vec![TAG.to_string()],
679 ..Default::default()
680 }));
681 const MSG: &str = "The quick brown fox jumped over the lazy dog.";
682 const COUNT: usize = 1000;
683 {
685 let sink = Arc::clone(&sink);
686 std::thread::spawn(move || {
687 for i in 0..COUNT {
688 sink.record_log(
689 &log::Record::builder()
690 .level(log::Level::Warn)
691 .args(format_args!("{i}: {MSG}"))
692 .build(),
693 );
694 }
695 });
696 }
697 const MAX_RECORD_SIZE: usize = 176;
698 let mut ring_buffer = RingBuffer::create(
700 (MAX_RECORD_SIZE * COUNT * 5 / 4).next_multiple_of(zx::system_get_page_size() as usize),
701 );
702 sink.set_buffer(ring_buffer.new_iob_writer(0).unwrap().0);
703 for i in 0..COUNT {
705 let (_tag, msg) = ring_buffer.read_message().await.unwrap();
706 let (record, _) = parse_record(&msg).unwrap();
707 assert_eq!(record.severity, Severity::Warn as u8);
708 let mut found = 0;
709 for arg in record.arguments {
710 match arg {
711 Argument::Message(msg) => {
712 assert_eq!(msg, format!("{i}: {MSG}"));
713 assert_eq!(found & 1, 0);
714 found |= 1;
715 }
716 Argument::Tag(tag) => {
717 assert_eq!(tag, TAG);
718 assert_eq!(found & 2, 0);
719 found |= 2;
720 }
721 _ => {}
722 }
723 }
724 assert_eq!(found, 3);
725 }
726 }
727}