1use crate::filter::LogFilterCriteria;
6use crate::log_socket_stream::{JsonDeserializeError, LogsDataStream};
7use crate::{
8 DetailedDateTime, InstanceGetter, LogCommand, LogError, LogProcessingResult, LogSubCommand,
9 TimeFormat,
10};
11use anyhow::Result;
12use async_trait::async_trait;
13use diagnostics_data::{
14 Data, LogTextColor, LogTextDisplayOptions, LogTextPresenter, LogTimeDisplayFormat, Logs,
15 LogsData, LogsDataBuilder, LogsField, LogsProperty, Severity, Timezone,
16};
17use futures_util::future::Either;
18use futures_util::stream::FuturesUnordered;
19use futures_util::{StreamExt, select};
20use serde::{Deserialize, Serialize};
21use std::fmt::Display;
22use std::io::Write;
23use std::time::Duration;
24use thiserror::Error;
25use writer::ToolIO;
26
27pub use diagnostics_data::Timestamp;
28
29pub const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S.%3f";
30
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
33pub enum LogData {
34 TargetLog(LogsData),
36}
37
38impl LogData {
39 pub fn as_target_log(&self) -> Option<&LogsData> {
41 match self {
42 LogData::TargetLog(log) => Some(log),
43 }
44 }
45
46 pub fn as_target_log_mut(&mut self) -> Option<&mut LogsData> {
47 match self {
48 LogData::TargetLog(log) => Some(log),
49 }
50 }
51}
52
53impl From<LogsData> for LogData {
54 fn from(data: LogsData) -> Self {
55 Self::TargetLog(data)
56 }
57}
58
59#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
62pub struct LogEntry {
63 pub data: LogData,
65}
66
67impl LogEntry {
68 fn utc_timestamp(&self, boot_ts: Option<Timestamp>) -> Timestamp {
69 Timestamp::from_nanos(match &self.data {
70 LogData::TargetLog(data) => {
71 data.metadata.timestamp.into_nanos()
72 + boot_ts.map(|value| value.into_nanos()).unwrap_or(0)
73 }
74 })
75 }
76}
77
78impl Display for LogEntry {
82 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 unreachable!("UNSUPPORTED -- This type cannot be formatted with std format.");
84 }
85}
86
87#[async_trait(?Send)]
89pub trait Symbolize {
90 async fn symbolize(&self, entry: LogEntry) -> Option<LogEntry>;
94}
95
96async fn handle_value<S>(one: Data<Logs>, symbolizer: &S) -> Option<LogEntry>
97where
98 S: Symbolize + ?Sized,
99{
100 let entry = LogEntry { data: one.into() };
101 symbolizer.symbolize(entry).await
102}
103
104fn generate_timestamp_message(boot_timestamp: Timestamp) -> LogEntry {
105 LogEntry {
106 data: LogData::TargetLog(
107 LogsDataBuilder::new(diagnostics_data::BuilderArgs {
108 moniker: "ffx".try_into().unwrap(),
109 timestamp: Timestamp::from_nanos(0),
110 component_url: Some("ffx".into()),
111 severity: Severity::Info,
112 })
113 .set_message("Logging started")
114 .add_key(LogsProperty::String(
115 LogsField::Other("utc_time_now".into()),
116 chrono::Utc::now().to_rfc3339(),
117 ))
118 .add_key(LogsProperty::Int(
119 LogsField::Other("current_boot_timestamp".to_string()),
120 boot_timestamp.into_nanos(),
121 ))
122 .build(),
123 ),
124 }
125}
126
127pub async fn dump_logs_from_socket<F, S>(
129 socket: flex_client::AsyncSocket,
130 formatter: &mut F,
131 symbolizer: &S,
132 include_timestamp: bool,
133) -> Result<LogProcessingResult, JsonDeserializeError>
134where
135 F: LogFormatter + BootTimeAccessor,
136 S: Symbolize + ?Sized,
137{
138 let mut decoder = Box::pin(LogsDataStream::new(socket).fuse());
139 let mut symbolize_pending = FuturesUnordered::new();
140 if include_timestamp && !formatter.is_utc_time_format() {
141 formatter.push_log(generate_timestamp_message(formatter.get_boot_timestamp())).await?;
142 }
143 while let Some(value) = select! {
144 res = decoder.next() => Some(Either::Left(res)),
145 res = symbolize_pending.next() => Some(Either::Right(res)),
146 complete => None,
147 } {
148 match value {
149 Either::Left(Some(result)) => match result {
150 Ok(log) => symbolize_pending.push(handle_value(log, symbolizer)),
151 Err(e) => return Err(e),
152 },
153 Either::Right(Some(Some(symbolized))) => match formatter.push_log(symbolized).await? {
154 LogProcessingResult::Exit => {
155 formatter.flush().await?;
156 return Ok(LogProcessingResult::Exit);
157 }
158 LogProcessingResult::Continue => {}
159 },
160 _ => {}
161 }
162 }
163 formatter.flush().await?;
164 Ok(LogProcessingResult::Continue)
165}
166
167#[cfg(not(feature = "fdomain"))]
168pub async fn dump_fxt_logs_from_socket<F, S>(
170 socket: flex_client::AsyncSocket,
171 formatter: &mut F,
172 symbolizer: &S,
173 include_timestamp: bool,
174) -> Result<LogProcessingResult, LogError>
175where
176 F: LogFormatter + BootTimeAccessor,
177 S: Symbolize + ?Sized,
178{
179 let mut streamer = crate::fxt_streamer::FxtStreamer::new(socket);
180 let mut decoder = std::pin::pin!(streamer.stream());
181 let mut symbolize_pending = FuturesUnordered::new();
182 if include_timestamp && !formatter.is_utc_time_format() {
183 formatter.push_log(generate_timestamp_message(formatter.get_boot_timestamp())).await?;
184 }
185 while let Some(value) = select! {
186 res = decoder.next() => Some(Either::Left(res)),
187 res = symbolize_pending.next() => Some(Either::Right(res)),
188 complete => None,
189 } {
190 match value {
191 Either::Left(Some(result)) => match result {
192 Ok(log) => symbolize_pending.push(handle_value(log, symbolizer)),
193 Err(e) => return Err(e),
194 },
195 Either::Right(Some(Some(symbolized))) => match formatter.push_log(symbolized).await? {
196 LogProcessingResult::Exit => {
197 formatter.flush().await?;
198 return Ok(LogProcessingResult::Exit);
199 }
200 LogProcessingResult::Continue => {}
201 },
202 _ => {}
203 }
204 }
205 formatter.flush().await?;
206 Ok(LogProcessingResult::Continue)
207}
208
209pub trait BootTimeAccessor {
210 fn set_boot_timestamp(&mut self, _boot_ts_nanos: Timestamp);
212
213 fn get_boot_timestamp(&self) -> Timestamp;
215}
216
217#[derive(Clone, Debug)]
219pub struct DeviceOrLocalTimestamp {
220 pub timestamp: Timestamp,
222 pub is_boot: bool,
225}
226
227impl DeviceOrLocalTimestamp {
228 pub fn new(
232 rtc: Option<&DetailedDateTime>,
233 boot: Option<&Duration>,
234 ) -> Option<DeviceOrLocalTimestamp> {
235 rtc.as_ref()
236 .filter(|value| !value.is_now)
237 .map(|value| DeviceOrLocalTimestamp {
238 timestamp: Timestamp::from_nanos(
239 value.naive_utc().and_utc().timestamp_nanos_opt().unwrap(),
240 ),
241 is_boot: false,
242 })
243 .or_else(|| {
244 boot.map(|value| DeviceOrLocalTimestamp {
245 timestamp: Timestamp::from_nanos(value.as_nanos() as i64),
246 is_boot: true,
247 })
248 })
249 }
250}
251
252#[derive(Clone, Debug)]
254pub struct LogFormatterOptions {
255 pub display: Option<LogTextDisplayOptions>,
257 pub since: Option<DeviceOrLocalTimestamp>,
259 pub until: Option<DeviceOrLocalTimestamp>,
261 pub tail: Option<usize>,
263}
264
265impl Default for LogFormatterOptions {
266 fn default() -> Self {
267 LogFormatterOptions {
268 display: Some(Default::default()),
269 since: None,
270 until: None,
271 tail: None,
272 }
273 }
274}
275
276#[derive(Error, Debug)]
278pub enum FormatterError {
279 #[error(transparent)]
281 Other(#[from] anyhow::Error),
282 #[error(transparent)]
284 IO(#[from] std::io::Error),
285}
286
287impl FormatterError {
288 pub fn is_broken_pipe(&self) -> bool {
289 match self {
290 FormatterError::IO(error) => error.kind() == std::io::ErrorKind::BrokenPipe,
291 FormatterError::Other(_) => false,
292 }
293 }
294}
295
296pub struct DefaultLogFormatter<W>
298where
299 W: Write + ToolIO<OutputItem = LogEntry>,
300{
301 writer: W,
302 filters: LogFilterCriteria,
303 options: LogFormatterOptions,
304 boot_ts_nanos: Option<Timestamp>,
305 tail_queue: std::collections::VecDeque<LogEntry>,
306 tail_limit: Option<usize>,
307}
308
309fn utc_to_boot(boot_ts: Timestamp, utc: Timestamp) -> Timestamp {
311 Timestamp::from_nanos(utc.into_nanos() - boot_ts.into_nanos())
312}
313
314#[async_trait(?Send)]
315impl<W> LogFormatter for DefaultLogFormatter<W>
316where
317 W: Write + ToolIO<OutputItem = LogEntry>,
318{
319 async fn push_log(&mut self, log_entry: LogEntry) -> Result<LogProcessingResult, LogError> {
320 self.push_log_internal(log_entry, true).await.or_else(|err| {
321 if err.is_broken_pipe() { Ok(LogProcessingResult::Exit) } else { Err(err) }
322 })
323 }
324
325 fn is_utc_time_format(&self) -> bool {
326 self.options.display.iter().any(|options| match options.time_format {
327 LogTimeDisplayFormat::Original => false,
328 LogTimeDisplayFormat::WallTime { tz, offset: _ } => tz == Timezone::Utc,
329 })
330 }
331
332 async fn flush(&mut self) -> Result<(), LogError> {
333 self.flush_tail().await
334 }
335}
336
337impl<W> BootTimeAccessor for DefaultLogFormatter<W>
338where
339 W: Write + ToolIO<OutputItem = LogEntry>,
340{
341 fn set_boot_timestamp(&mut self, boot_ts_nanos: Timestamp) {
342 if let Some(LogTextDisplayOptions {
343 time_format: LogTimeDisplayFormat::WallTime { offset, .. },
344 ..
345 }) = &mut self.options.display
346 {
347 *offset = boot_ts_nanos.into_nanos();
348 }
349 self.boot_ts_nanos = Some(boot_ts_nanos);
350 }
351 fn get_boot_timestamp(&self) -> Timestamp {
352 debug_assert!(self.boot_ts_nanos.is_some());
353 self.boot_ts_nanos.unwrap_or_else(|| Timestamp::from_nanos(0))
354 }
355}
356
357pub trait WriterContainer<W>
359where
360 W: Write + ToolIO<OutputItem = LogEntry>,
361{
362 fn writer(&mut self) -> &mut W;
363}
364
365impl<W> WriterContainer<W> for DefaultLogFormatter<W>
366where
367 W: Write + ToolIO<OutputItem = LogEntry>,
368{
369 fn writer(&mut self) -> &mut W {
370 &mut self.writer
371 }
372}
373
374impl<W> DefaultLogFormatter<W>
375where
376 W: Write + ToolIO<OutputItem = LogEntry>,
377{
378 pub fn new(filters: LogFilterCriteria, writer: W, options: LogFormatterOptions) -> Self {
380 let tail_limit = options.tail;
381 Self {
382 filters,
383 writer,
384 options,
385 boot_ts_nanos: None,
386 tail_queue: std::collections::VecDeque::new(),
387 tail_limit,
388 }
389 }
390
391 pub async fn expand_monikers(&mut self, getter: &impl InstanceGetter) -> Result<(), LogError> {
392 self.filters.expand_monikers(getter).await
393 }
394
395 pub async fn push_unfiltered_log(
396 &mut self,
397 log_entry: LogEntry,
398 ) -> Result<LogProcessingResult, LogError> {
399 self.push_log_internal(log_entry, false).await
400 }
401
402 async fn flush_tail(&mut self) -> Result<(), LogError> {
403 while let Some(log_entry) = self.tail_queue.pop_front() {
404 self.write_log_entry(log_entry, true)?;
405 }
406 Ok(())
407 }
408
409 fn write_log_entry(
410 &mut self,
411 log_entry: LogEntry,
412 enable_filters: bool,
413 ) -> Result<(), LogError> {
414 match self.options.display {
415 Some(text_options) => {
416 let mut options_for_this_line_only = self.options.clone();
417 options_for_this_line_only.display = Some(text_options);
418 if !enable_filters
421 && let LogTimeDisplayFormat::WallTime { ref mut offset, .. } =
422 options_for_this_line_only.display.as_mut().unwrap().time_format
423 {
424 *offset = 1;
429 }
430 self.format_text_log(options_for_this_line_only, log_entry)
431 .map_err(LogError::FormatterError)?;
432 }
433 None => {
434 self.writer.item(&log_entry).map_err(|err| LogError::UnknownError(err.into()))?;
435 }
436 };
437 Ok(())
438 }
439
440 async fn push_log_internal(
441 &mut self,
442 log_entry: LogEntry,
443 enable_filters: bool,
444 ) -> Result<LogProcessingResult, LogError> {
445 if enable_filters {
446 if self.filter_by_timestamp(&log_entry, self.options.since.as_ref(), |a, b| a <= b) {
447 return Ok(LogProcessingResult::Continue);
448 }
449
450 if self.filter_by_timestamp(&log_entry, self.options.until.as_ref(), |a, b| a >= b) {
451 return Ok(LogProcessingResult::Exit);
452 }
453
454 if !self.filters.matches(&log_entry) {
455 return Ok(LogProcessingResult::Continue);
456 }
457 }
458
459 if let Some(limit) = self.tail_limit {
460 self.tail_queue.push_back(log_entry);
461 if self.tail_queue.len() > limit {
462 self.tail_queue.pop_front();
463 }
464 } else {
465 self.write_log_entry(log_entry, enable_filters)?;
466 }
467 Ok(LogProcessingResult::Continue)
468 }
469
470 pub fn new_from_args(cmd: &LogCommand, writer: W) -> Result<Self, LogError> {
472 let is_json = writer.is_machine();
473
474 Ok(DefaultLogFormatter::new(
475 LogFilterCriteria::try_from(cmd.clone())?,
476 writer,
477 LogFormatterOptions {
478 display: if is_json {
479 None
480 } else {
481 Some(LogTextDisplayOptions {
482 show_tags: !cmd.hide_tags,
483 color: if cmd.no_color {
484 LogTextColor::None
485 } else {
486 LogTextColor::BySeverity
487 },
488 show_metadata: cmd.show_metadata,
489 time_format: match cmd.clock {
490 TimeFormat::Boot => LogTimeDisplayFormat::Original,
491 TimeFormat::Local => LogTimeDisplayFormat::WallTime {
492 tz: Timezone::Local,
493 offset: 0,
496 },
497 TimeFormat::Utc => LogTimeDisplayFormat::WallTime {
498 tz: Timezone::Utc,
499 offset: 0,
502 },
503 },
504 show_file: !cmd.hide_file,
505 show_moniker: !cmd.hide_moniker,
506 show_full_moniker: cmd.show_full_moniker,
507 prefer_url_component_name: cmd.prefer_url_component_name,
508 })
509 },
510 since: DeviceOrLocalTimestamp::new(cmd.since.as_ref(), cmd.since_boot.as_ref()),
511 until: DeviceOrLocalTimestamp::new(cmd.until.as_ref(), cmd.until_boot.as_ref()),
512 tail: match &cmd.sub_command {
513 Some(LogSubCommand::Dump(dump)) => dump.tail,
514 _ => None,
515 },
516 },
517 ))
518 }
519
520 fn filter_by_timestamp(
521 &self,
522 log_entry: &LogEntry,
523 timestamp: Option<&DeviceOrLocalTimestamp>,
524 callback: impl Fn(&Timestamp, &Timestamp) -> bool,
525 ) -> bool {
526 let Some(timestamp) = timestamp else {
527 return false;
528 };
529 if timestamp.is_boot {
530 callback(
531 &utc_to_boot(
532 self.get_boot_timestamp(),
533 log_entry.utc_timestamp(self.boot_ts_nanos),
534 ),
535 ×tamp.timestamp,
536 )
537 } else {
538 callback(&log_entry.utc_timestamp(self.boot_ts_nanos), ×tamp.timestamp)
539 }
540 }
541
542 fn format_text_log(
545 &mut self,
546 options: LogFormatterOptions,
547 log_entry: LogEntry,
548 ) -> Result<(), FormatterError> {
549 let text_options = match options.display {
550 Some(o) => o,
551 None => {
552 unreachable!("If we are here, we can only be formatting text");
553 }
554 };
555 match log_entry {
556 LogEntry { data: LogData::TargetLog(data), .. } => {
557 writeln!(self.writer, "{}", LogTextPresenter::new(&data, text_options))?;
560 }
561 }
562 Ok(())
563 }
564}
565
566#[allow(dead_code)] pub struct NoOpSymbolizer;
569
570#[async_trait(?Send)]
571impl Symbolize for NoOpSymbolizer {
572 async fn symbolize(&self, entry: LogEntry) -> Option<LogEntry> {
573 Some(entry)
574 }
575}
576
577#[async_trait(?Send)]
579pub trait LogFormatter {
580 async fn push_log(&mut self, log_entry: LogEntry) -> Result<LogProcessingResult, LogError>;
582
583 fn is_utc_time_format(&self) -> bool;
585
586 async fn flush(&mut self) -> Result<(), LogError> {
588 Ok(())
589 }
590}
591
592#[cfg(test)]
593mod test {
594 use crate::parse_time;
595 use assert_matches::assert_matches;
596 use diagnostics_data::{LogsDataBuilder, Severity};
597 use std::cell::Cell;
598 use writer::{Format, JsonWriter, TestBuffers};
599
600 use super::*;
601
602 const DEFAULT_TS_NANOS: u64 = 1615535969000000000;
603
604 struct FakeFormatter {
605 logs: Vec<LogEntry>,
606 boot_timestamp: Timestamp,
607 is_utc_time_format: bool,
608 }
609
610 impl FakeFormatter {
611 fn new() -> Self {
612 Self {
613 logs: Vec::new(),
614 boot_timestamp: Timestamp::from_nanos(0),
615 is_utc_time_format: false,
616 }
617 }
618 }
619
620 impl BootTimeAccessor for FakeFormatter {
621 fn set_boot_timestamp(&mut self, boot_ts_nanos: Timestamp) {
622 self.boot_timestamp = boot_ts_nanos;
623 }
624
625 fn get_boot_timestamp(&self) -> Timestamp {
626 self.boot_timestamp
627 }
628 }
629
630 #[async_trait(?Send)]
631 impl LogFormatter for FakeFormatter {
632 async fn push_log(&mut self, log_entry: LogEntry) -> Result<LogProcessingResult, LogError> {
633 self.logs.push(log_entry);
634 Ok(LogProcessingResult::Continue)
635 }
636
637 fn is_utc_time_format(&self) -> bool {
638 self.is_utc_time_format
639 }
640 }
641
642 pub struct FakeFuchsiaSymbolizer;
644
645 fn set_log_msg(entry: &mut LogEntry, msg: impl Into<String>) {
646 *entry.data.as_target_log_mut().unwrap().msg_mut().unwrap() = msg.into();
647 }
648
649 #[async_trait(?Send)]
650 impl Symbolize for FakeFuchsiaSymbolizer {
651 async fn symbolize(&self, mut entry: LogEntry) -> Option<LogEntry> {
652 set_log_msg(&mut entry, "Fuchsia");
653 Some(entry)
654 }
655 }
656
657 struct FakeSymbolizerCallback {
658 should_discard: Cell<bool>,
659 }
660
661 impl FakeSymbolizerCallback {
662 fn new() -> Self {
663 Self { should_discard: Cell::new(true) }
664 }
665 }
666
667 async fn dump_logs_from_socket<F, S>(
668 socket: fuchsia_async::Socket,
669 formatter: &mut F,
670 symbolizer: &S,
671 ) -> Result<LogProcessingResult, JsonDeserializeError>
672 where
673 F: LogFormatter + BootTimeAccessor,
674 S: Symbolize + ?Sized,
675 {
676 super::dump_logs_from_socket(socket, formatter, symbolizer, false).await
677 }
678
679 #[async_trait(?Send)]
680 impl Symbolize for FakeSymbolizerCallback {
681 async fn symbolize(&self, mut input: LogEntry) -> Option<LogEntry> {
682 self.should_discard.set(!self.should_discard.get());
683 if self.should_discard.get() {
684 None
685 } else {
686 set_log_msg(&mut input, "symbolized log");
687 Some(input)
688 }
689 }
690 }
691
692 #[fuchsia::test]
693 async fn test_boot_timestamp_setter() {
694 let buffers = TestBuffers::default();
695 let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
696 let options = LogFormatterOptions {
697 display: Some(LogTextDisplayOptions {
698 time_format: LogTimeDisplayFormat::WallTime { tz: Timezone::Utc, offset: 0 },
699 ..Default::default()
700 }),
701 ..Default::default()
702 };
703 let mut formatter =
704 DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
705 formatter.set_boot_timestamp(Timestamp::from_nanos(1234));
706 assert_eq!(formatter.get_boot_timestamp(), Timestamp::from_nanos(1234));
707
708 let buffers = TestBuffers::default();
710 let output = JsonWriter::<LogEntry>::new_test(None, &buffers);
711 let options = LogFormatterOptions { display: None, ..Default::default() };
712 let mut formatter = DefaultLogFormatter::new(LogFilterCriteria::default(), output, options);
713 formatter.set_boot_timestamp(Timestamp::from_nanos(1234));
714 assert_eq!(formatter.get_boot_timestamp(), Timestamp::from_nanos(1234));
715 }
716
717 #[fuchsia::test]
718 async fn test_format_single_message() {
719 let symbolizer = NoOpSymbolizer {};
720 let mut formatter = FakeFormatter::new();
721 let target_log = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
722 moniker: "ffx".try_into().unwrap(),
723 timestamp: Timestamp::from_nanos(0),
724 component_url: Some("ffx".into()),
725 severity: Severity::Info,
726 })
727 .set_message("Hello world!")
728 .build();
729 let (sender, receiver) = zx::Socket::create_stream();
730 sender
731 .write(serde_json::to_string(&target_log).unwrap().as_bytes())
732 .expect("failed to write target log");
733 drop(sender);
734 dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
735 .await
736 .unwrap();
737 assert_eq!(formatter.logs, vec![LogEntry { data: LogData::TargetLog(target_log) }]);
738 }
739
740 #[fuchsia::test]
741 async fn test_format_utc_timestamp() {
742 let symbolizer = NoOpSymbolizer {};
743 let mut formatter = FakeFormatter::new();
744 formatter.set_boot_timestamp(Timestamp::from_nanos(DEFAULT_TS_NANOS as i64));
745 let (_, receiver) = zx::Socket::create_stream();
746 super::dump_logs_from_socket(
747 flex_client::socket_to_async(receiver),
748 &mut formatter,
749 &symbolizer,
750 true,
751 )
752 .await
753 .unwrap();
754 let target_log = formatter.logs[0].data.as_target_log().unwrap();
755 let properties = target_log.payload_keys().unwrap();
756 assert_eq!(target_log.msg().unwrap(), "Logging started");
757
758 chrono::DateTime::parse_from_rfc3339(
760 properties.get_property("utc_time_now").unwrap().string().unwrap(),
761 )
762 .unwrap();
763 assert_eq!(
764 properties.get_property("current_boot_timestamp").unwrap().int().unwrap(),
765 DEFAULT_TS_NANOS as i64
766 );
767 }
768
769 #[fuchsia::test]
770 async fn test_format_utc_timestamp_does_not_print_if_utc_time() {
771 let symbolizer = NoOpSymbolizer {};
772 let mut formatter = FakeFormatter::new();
773 formatter.is_utc_time_format = true;
774 formatter.set_boot_timestamp(Timestamp::from_nanos(DEFAULT_TS_NANOS as i64));
775 let (_, receiver) = zx::Socket::create_stream();
776 super::dump_logs_from_socket(
777 flex_client::socket_to_async(receiver),
778 &mut formatter,
779 &symbolizer,
780 true,
781 )
782 .await
783 .unwrap();
784 assert_eq!(formatter.logs.len(), 0);
785 }
786
787 #[fuchsia::test]
788 async fn test_format_multiple_messages() {
789 let symbolizer = NoOpSymbolizer {};
790 let mut formatter = FakeFormatter::new();
791 let (sender, receiver) = zx::Socket::create_stream();
792 let target_log_0 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
793 moniker: "ffx".try_into().unwrap(),
794 timestamp: Timestamp::from_nanos(0),
795 component_url: Some("ffx".into()),
796 severity: Severity::Info,
797 })
798 .set_message("Hello world!")
799 .set_pid(1)
800 .set_tid(2)
801 .build();
802 let target_log_1 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
803 moniker: "ffx".try_into().unwrap(),
804 timestamp: Timestamp::from_nanos(1),
805 component_url: Some("ffx".into()),
806 severity: Severity::Info,
807 })
808 .set_message("Hello world 2!")
809 .build();
810 sender
811 .write(serde_json::to_string(&vec![&target_log_0, &target_log_1]).unwrap().as_bytes())
812 .expect("failed to write target log");
813 drop(sender);
814 dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
815 .await
816 .unwrap();
817 assert_eq!(
818 formatter.logs,
819 vec![
820 LogEntry { data: LogData::TargetLog(target_log_0) },
821 LogEntry { data: LogData::TargetLog(target_log_1) }
822 ]
823 );
824 }
825
826 #[fuchsia::test]
827 async fn test_format_timestamp_filter() {
828 let symbolizer = NoOpSymbolizer {};
830 let buffers = TestBuffers::default();
831 let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
832 let mut formatter = DefaultLogFormatter::new(
833 LogFilterCriteria::default(),
834 stdout,
835 LogFormatterOptions {
836 since: Some(DeviceOrLocalTimestamp {
837 timestamp: Timestamp::from_nanos(1),
838 is_boot: true,
839 }),
840 until: Some(DeviceOrLocalTimestamp {
841 timestamp: Timestamp::from_nanos(3),
842 is_boot: true,
843 }),
844 ..Default::default()
845 },
846 );
847 formatter.set_boot_timestamp(Timestamp::from_nanos(0));
848
849 let (sender, receiver) = zx::Socket::create_stream();
850 let target_log_0 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
851 moniker: "ffx".try_into().unwrap(),
852 timestamp: Timestamp::from_nanos(0),
853 component_url: Some("ffx".into()),
854 severity: Severity::Info,
855 })
856 .set_message("Hello world!")
857 .build();
858 let target_log_1 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
859 moniker: "ffx".try_into().unwrap(),
860 timestamp: Timestamp::from_nanos(1),
861 component_url: Some("ffx".into()),
862 severity: Severity::Info,
863 })
864 .set_message("Hello world 2!")
865 .build();
866 let target_log_2 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
867 moniker: "ffx".try_into().unwrap(),
868 timestamp: Timestamp::from_nanos(2),
869 component_url: Some("ffx".into()),
870 severity: Severity::Info,
871 })
872 .set_pid(1)
873 .set_tid(2)
874 .set_message("Hello world 3!")
875 .build();
876 let target_log_3 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
877 moniker: "ffx".try_into().unwrap(),
878 timestamp: Timestamp::from_nanos(3),
879 component_url: Some("ffx".into()),
880 severity: Severity::Info,
881 })
882 .set_message("Hello world 4!")
883 .set_pid(1)
884 .set_tid(2)
885 .build();
886 sender
887 .write(
888 serde_json::to_string(&vec![
889 &target_log_0,
890 &target_log_1,
891 &target_log_2,
892 &target_log_3,
893 ])
894 .unwrap()
895 .as_bytes(),
896 )
897 .expect("failed to write target log");
898 drop(sender);
899 assert_matches!(
900 dump_logs_from_socket(
901 flex_client::socket_to_async(receiver),
902 &mut formatter,
903 &symbolizer,
904 )
905 .await,
906 Ok(LogProcessingResult::Exit)
907 );
908 assert_eq!(
909 buffers.stdout.into_string(),
910 "[00000.000000][1][2][ffx] INFO: Hello world 3!\n"
911 );
912 }
913
914 fn make_log_with_timestamp(timestamp: i64) -> LogsData {
915 LogsDataBuilder::new(diagnostics_data::BuilderArgs {
916 moniker: "ffx".try_into().unwrap(),
917 timestamp: Timestamp::from_nanos(timestamp),
918 component_url: Some("ffx".into()),
919 severity: Severity::Info,
920 })
921 .set_message(format!("Hello world {timestamp}!"))
922 .set_pid(1)
923 .set_tid(2)
924 .build()
925 }
926
927 #[fuchsia::test]
928 async fn test_format_timestamp_filter_utc() {
929 let symbolizer = NoOpSymbolizer {};
931 let buffers = TestBuffers::default();
932 let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
933 let mut formatter = DefaultLogFormatter::new(
934 LogFilterCriteria::default(),
935 stdout,
936 LogFormatterOptions {
937 since: Some(DeviceOrLocalTimestamp {
938 timestamp: Timestamp::from_nanos(1),
939 is_boot: false,
940 }),
941 until: Some(DeviceOrLocalTimestamp {
942 timestamp: Timestamp::from_nanos(3),
943 is_boot: false,
944 }),
945 display: Some(LogTextDisplayOptions {
946 time_format: LogTimeDisplayFormat::WallTime { tz: Timezone::Utc, offset: 1 },
947 ..Default::default()
948 }),
949 ..Default::default()
950 },
951 );
952 formatter.set_boot_timestamp(Timestamp::from_nanos(1));
953
954 let (sender, receiver) = zx::Socket::create_stream();
955 let logs = (0..4).map(make_log_with_timestamp).collect::<Vec<_>>();
956 sender
957 .write(serde_json::to_string(&logs).unwrap().as_bytes())
958 .expect("failed to write target log");
959 drop(sender);
960 assert_matches!(
961 dump_logs_from_socket(
962 flex_client::socket_to_async(receiver),
963 &mut formatter,
964 &symbolizer,
965 )
966 .await,
967 Ok(LogProcessingResult::Exit)
968 );
969 assert_eq!(
970 buffers.stdout.into_string(),
971 "[1970-01-01 00:00:00.000][1][2][ffx] INFO: Hello world 1!\n"
972 );
973 }
974
975 fn logs_data_builder() -> LogsDataBuilder {
976 diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
977 timestamp: Timestamp::from_nanos(default_ts().as_nanos() as i64),
978 component_url: Some("component_url".into()),
979 moniker: "some/moniker".try_into().unwrap(),
980 severity: diagnostics_data::Severity::Warn,
981 })
982 .set_pid(1)
983 .set_tid(2)
984 }
985
986 fn default_ts() -> Duration {
987 Duration::from_nanos(DEFAULT_TS_NANOS)
988 }
989
990 fn log_entry() -> LogEntry {
991 LogEntry {
992 data: LogData::TargetLog(
993 logs_data_builder().add_tag("tag1").add_tag("tag2").set_message("message").build(),
994 ),
995 }
996 }
997
998 #[fuchsia::test]
999 async fn test_default_formatter() {
1000 let buffers = TestBuffers::default();
1001 let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
1002 let options = LogFormatterOptions::default();
1003 let mut formatter =
1004 DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
1005 formatter.push_log(log_entry()).await.unwrap();
1006 drop(formatter);
1007 assert_eq!(
1008 buffers.into_stdout_str(),
1009 "[1615535969.000000][1][2][some/moniker][tag1,tag2] WARN: message\n"
1010 );
1011 }
1012
1013 #[fuchsia::test]
1014 async fn test_default_formatter_with_hidden_metadata() {
1015 let buffers = TestBuffers::default();
1016 let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
1017 let options = LogFormatterOptions {
1018 display: Some(LogTextDisplayOptions { show_metadata: false, ..Default::default() }),
1019 ..LogFormatterOptions::default()
1020 };
1021 let mut formatter =
1022 DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
1023 formatter.push_log(log_entry()).await.unwrap();
1024 drop(formatter);
1025 assert_eq!(
1026 buffers.into_stdout_str(),
1027 "[1615535969.000000][some/moniker][tag1,tag2] WARN: message\n"
1028 );
1029 }
1030
1031 #[fuchsia::test]
1032 async fn test_default_formatter_with_json() {
1033 let buffers = TestBuffers::default();
1034 let stdout = JsonWriter::<LogEntry>::new_test(Some(Format::Json), &buffers);
1035 let options = LogFormatterOptions { display: None, ..Default::default() };
1036 {
1037 let mut formatter =
1038 DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
1039 formatter.push_log(log_entry()).await.unwrap();
1040 }
1041 assert_eq!(
1042 serde_json::from_str::<LogEntry>(&buffers.into_stdout_str()).unwrap(),
1043 log_entry()
1044 );
1045 }
1046
1047 fn emit_log(sender: &mut zx::Socket, msg: &str, timestamp: i64) -> Data<Logs> {
1048 let target_log = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
1049 moniker: "ffx".try_into().unwrap(),
1050 timestamp: Timestamp::from_nanos(timestamp),
1051 component_url: Some("ffx".into()),
1052 severity: Severity::Info,
1053 })
1054 .set_message(msg)
1055 .build();
1056
1057 sender
1058 .write(serde_json::to_string(&target_log).unwrap().as_bytes())
1059 .expect("failed to write target log");
1060 target_log
1061 }
1062
1063 #[fuchsia::test]
1064 async fn test_default_formatter_discards_when_told_by_symbolizer() {
1065 let mut formatter = FakeFormatter::new();
1066 let (mut sender, receiver) = zx::Socket::create_stream();
1067 let mut target_log_0 = emit_log(&mut sender, "Hello world!", 0);
1068 emit_log(&mut sender, "Dropped world!", 1);
1069 let mut target_log_2 = emit_log(&mut sender, "Hello world!", 2);
1070 emit_log(&mut sender, "Dropped world!", 3);
1071 let mut target_log_4 = emit_log(&mut sender, "Hello world!", 4);
1072 drop(sender);
1073 let symbolizer = FakeSymbolizerCallback::new();
1075 *target_log_0.msg_mut().unwrap() = "symbolized log".into();
1076 *target_log_2.msg_mut().unwrap() = "symbolized log".into();
1077 *target_log_4.msg_mut().unwrap() = "symbolized log".into();
1078 dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
1079 .await
1080 .unwrap();
1081 assert_eq!(
1082 formatter.logs,
1083 vec![
1084 LogEntry { data: LogData::TargetLog(target_log_0) },
1085 LogEntry { data: LogData::TargetLog(target_log_2) },
1086 LogEntry { data: LogData::TargetLog(target_log_4) }
1087 ],
1088 );
1089 }
1090
1091 #[fuchsia::test]
1092 async fn test_symbolized_output() {
1093 let symbolizer = FakeFuchsiaSymbolizer;
1094 let buffers = TestBuffers::default();
1095 let output = JsonWriter::<LogEntry>::new_test(None, &buffers);
1096 let mut formatter = DefaultLogFormatter::new(
1097 LogFilterCriteria::default(),
1098 output,
1099 LogFormatterOptions { ..Default::default() },
1100 );
1101 formatter.set_boot_timestamp(Timestamp::from_nanos(0));
1102 let target_log = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
1103 moniker: "ffx".try_into().unwrap(),
1104 timestamp: Timestamp::from_nanos(0),
1105 component_url: Some("ffx".into()),
1106 severity: Severity::Info,
1107 })
1108 .set_pid(1)
1109 .set_tid(2)
1110 .set_message("Hello world!")
1111 .build();
1112 let (sender, receiver) = zx::Socket::create_stream();
1113 sender
1114 .write(serde_json::to_string(&target_log).unwrap().as_bytes())
1115 .expect("failed to write target log");
1116 drop(sender);
1117 dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
1118 .await
1119 .unwrap();
1120 assert_eq!(buffers.stdout.into_string(), "[00000.000000][1][2][ffx] INFO: Fuchsia\n");
1121 }
1122
1123 #[test]
1124 fn test_device_or_local_timestamp_returns_none_if_now_is_passed() {
1125 assert_matches!(DeviceOrLocalTimestamp::new(Some(&parse_time("now").unwrap()), None), None);
1126 }
1127
1128 struct BrokenPipeWriter;
1129 impl std::io::Write for BrokenPipeWriter {
1130 fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
1131 Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe"))
1132 }
1133
1134 fn flush(&mut self) -> std::io::Result<()> {
1135 Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe"))
1136 }
1137 }
1138
1139 impl ToolIO for BrokenPipeWriter {
1140 type OutputItem = LogEntry;
1141 fn is_machine(&self) -> bool {
1142 false
1143 }
1144
1145 fn stderr(&mut self) -> &mut dyn std::io::Write {
1146 self
1147 }
1148
1149 fn item(&mut self, _value: &Self::OutputItem) -> writer::Result<()> {
1150 Err(writer::Error::Io(std::io::Error::new(
1151 std::io::ErrorKind::BrokenPipe,
1152 "broken pipe",
1153 )))
1154 }
1155 }
1156
1157 #[fuchsia::test]
1158 async fn test_default_formatter_exits_on_broken_pipe() {
1159 let stdout = BrokenPipeWriter;
1160 let options = LogFormatterOptions::default();
1161 let mut formatter =
1162 DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
1163 let result = formatter.push_log(log_entry()).await;
1164 assert_matches!(result, Ok(LogProcessingResult::Exit));
1165 }
1166
1167 #[test]
1168 fn test_formatter_error_is_broken_pipe() {
1169 assert!(
1170 FormatterError::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe"))
1171 .is_broken_pipe()
1172 );
1173 assert!(!FormatterError::IO(std::io::Error::other("other")).is_broken_pipe());
1174 assert!(!FormatterError::Other(anyhow::anyhow!("other")).is_broken_pipe());
1175 }
1176
1177 #[cfg(not(feature = "fdomain"))]
1178 #[fuchsia::test]
1179 async fn test_json_and_fxt_output_identical() {
1180 use diagnostics_data::ExtendedMoniker;
1181 use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
1182 use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, MONIKER, Record, URL};
1183 use diagnostics_message::MonikerWithUrl;
1184 use flyweights::FlyStr;
1185 use zerocopy::{FromBytes, IntoBytes};
1186
1187 let symbolizer = NoOpSymbolizer {};
1188 let options = LogFormatterOptions::default();
1189
1190 let fn_encode = |record: Record<'_>, tag: u32| -> Vec<u8> {
1191 let mut encoder = Encoder::new(
1192 std::io::Cursor::new(ResizableBuffer::from(Vec::new())),
1193 EncoderOpts::default(),
1194 );
1195 encoder.write_record(record).unwrap();
1196 let mut bytes = encoder.take().into_inner().into_inner();
1197 let mut header = Header::read_from_bytes(&bytes[0..8]).unwrap();
1198 header.set_tag(tag);
1199 bytes[0..8].copy_from_slice(header.as_bytes());
1200 bytes
1201 };
1202
1203 let manifest_bytes = fn_encode(
1204 Record {
1205 timestamp: zx::BootInstant::from_nanos(0),
1206 severity: 0x30, arguments: vec![
1208 Argument::other(MONIKER, "core/foo"),
1209 Argument::other(URL, "fuchsia-pkg://foo"),
1210 ],
1211 },
1212 1 | LOG_CONTROL_BIT,
1213 );
1214
1215 let log_bytes1 = fn_encode(
1216 Record {
1217 timestamp: zx::BootInstant::from_nanos(123456),
1218 severity: 0x30, arguments: vec![
1220 Argument::pid(zx::Koid::from_raw(1000)),
1221 Argument::tid(zx::Koid::from_raw(2000)),
1222 Argument::tag("my_tag"),
1223 Argument::message("Hello identical world!"),
1224 ],
1225 },
1226 1,
1227 );
1228
1229 let log_bytes2 = fn_encode(
1230 Record {
1231 timestamp: zx::BootInstant::from_nanos(123457),
1232 severity: 0x40, arguments: vec![
1234 Argument::file("src/main.rs"),
1235 Argument::line(42),
1236 Argument::dropped(5),
1237 Argument::message("Warning with source location and dropped count"),
1238 ],
1239 },
1240 1,
1241 );
1242
1243 let log_bytes3_signed = fn_encode(
1244 Record {
1245 timestamp: zx::BootInstant::from_nanos(123458),
1246 severity: 0x50, arguments: vec![
1248 Argument::message("Error with custom signed int"),
1249 Argument::new("signed_val", -12345i64),
1250 ],
1251 },
1252 1,
1253 );
1254
1255 let log_bytes3_unsigned = fn_encode(
1256 Record {
1257 timestamp: zx::BootInstant::from_nanos(123458),
1258 severity: 0x50, arguments: vec![
1260 Argument::message("Error with custom unsigned int"),
1261 Argument::new("unsigned_val", 67890u64),
1262 ],
1263 },
1264 1,
1265 );
1266
1267 let log_bytes3_bool = fn_encode(
1268 Record {
1269 timestamp: zx::BootInstant::from_nanos(123458),
1270 severity: 0x50, arguments: vec![
1272 Argument::message("Error with custom boolean"),
1273 Argument::new("bool_val", true),
1274 ],
1275 },
1276 1,
1277 );
1278
1279 let log_bytes3_str = fn_encode(
1280 Record {
1281 timestamp: zx::BootInstant::from_nanos(123458),
1282 severity: 0x50, arguments: vec![
1284 Argument::message("Error with custom string"),
1285 Argument::new("string_val", "custom string"),
1286 ],
1287 },
1288 1,
1289 );
1290
1291 let log_bytes4_pi = fn_encode(
1292 Record {
1293 timestamp: zx::BootInstant::from_nanos(123459),
1294 severity: 0x60, arguments: vec![
1296 Argument::message("Fatal with float pi"),
1297 Argument::new("float_pi", std::f64::consts::PI),
1298 ],
1299 },
1300 1,
1301 );
1302
1303 let log_bytes4_zero = fn_encode(
1304 Record {
1305 timestamp: zx::BootInstant::from_nanos(123459),
1306 severity: 0x60, arguments: vec![
1308 Argument::message("Fatal with float zero"),
1309 Argument::new("float_zero", 0.0f64),
1310 ],
1311 },
1312 1,
1313 );
1314
1315 let log_bytes4_large = fn_encode(
1316 Record {
1317 timestamp: zx::BootInstant::from_nanos(123459),
1318 severity: 0x60, arguments: vec![
1320 Argument::message("Fatal with float large"),
1321 Argument::new("float_large", 123456.789f64),
1322 ],
1323 },
1324 1,
1325 );
1326
1327 let all_records = [
1328 &log_bytes1,
1329 &log_bytes2,
1330 &log_bytes3_signed,
1331 &log_bytes3_unsigned,
1332 &log_bytes3_bool,
1333 &log_bytes3_str,
1334 &log_bytes4_pi,
1335 &log_bytes4_zero,
1336 &log_bytes4_large,
1337 ];
1338
1339 let buffers_json = TestBuffers::default();
1341 let stdout_json = JsonWriter::<LogEntry>::new_test(None, &buffers_json);
1342 let mut formatter_json =
1343 DefaultLogFormatter::new(LogFilterCriteria::default(), stdout_json, options.clone());
1344 formatter_json.set_boot_timestamp(Timestamp::from_nanos(0));
1345
1346 let source = MonikerWithUrl {
1347 moniker: ExtendedMoniker::parse_str("core/foo").unwrap(),
1348 url: FlyStr::new("fuchsia-pkg://foo"),
1349 };
1350
1351 let (sender_json, receiver_json) = zx::Socket::create_stream();
1352 let mut json_stream_bytes = Vec::new();
1353 for record_bytes in &all_records {
1354 let target_log =
1355 diagnostics_message::from_structured(source.clone(), record_bytes).unwrap();
1356 serde_json::to_writer(&mut json_stream_bytes, &target_log).unwrap();
1357 json_stream_bytes.push(b'\n');
1358 }
1359 sender_json.write(&json_stream_bytes).expect("failed to write target logs");
1360 drop(sender_json);
1361
1362 super::dump_logs_from_socket(
1363 flex_client::socket_to_async(receiver_json),
1364 &mut formatter_json,
1365 &symbolizer,
1366 false,
1367 )
1368 .await
1369 .unwrap();
1370
1371 let buffers_fxt = TestBuffers::default();
1373 let stdout_fxt = JsonWriter::<LogEntry>::new_test(None, &buffers_fxt);
1374 let mut formatter_fxt =
1375 DefaultLogFormatter::new(LogFilterCriteria::default(), stdout_fxt, options.clone());
1376 formatter_fxt.set_boot_timestamp(Timestamp::from_nanos(0));
1377
1378 let (sender_fxt, receiver_fxt) = zx::Socket::create_stream();
1379 sender_fxt.write(&manifest_bytes).unwrap();
1380 for record_bytes in &all_records {
1381 sender_fxt.write(record_bytes).unwrap();
1382 }
1383 drop(sender_fxt);
1384
1385 super::dump_fxt_logs_from_socket(
1386 flex_client::socket_to_async(receiver_fxt),
1387 &mut formatter_fxt,
1388 &symbolizer,
1389 false,
1390 )
1391 .await
1392 .unwrap();
1393
1394 assert_eq!(buffers_json.stdout.into_string(), buffers_fxt.stdout.into_string());
1396 }
1397
1398 #[fuchsia::test]
1399 async fn test_default_formatter_tail_limit() {
1400 let buffers = TestBuffers::default();
1401 let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
1402 let options = LogFormatterOptions { tail: Some(2), ..Default::default() };
1403 let mut formatter = DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options);
1404
1405 let entry1 = LogEntry {
1406 data: LogData::TargetLog(
1407 logs_data_builder().add_tag("tag1").set_message("msg1").build(),
1408 ),
1409 };
1410 let entry2 = LogEntry {
1411 data: LogData::TargetLog(
1412 logs_data_builder().add_tag("tag1").set_message("msg2").build(),
1413 ),
1414 };
1415 let entry3 = LogEntry {
1416 data: LogData::TargetLog(
1417 logs_data_builder().add_tag("tag1").set_message("msg3").build(),
1418 ),
1419 };
1420
1421 formatter.push_log(entry1).await.unwrap();
1422 formatter.push_log(entry2).await.unwrap();
1423 formatter.push_log(entry3).await.unwrap();
1424
1425 let stdout_snapshot = buffers.stdout.clone().into_string();
1427 assert_eq!(stdout_snapshot, "");
1428
1429 formatter.flush().await.unwrap();
1430
1431 let output = buffers.into_stdout_str();
1433 assert!(!output.contains("msg1"));
1434 assert!(output.contains("msg2"));
1435 assert!(output.contains("msg3"));
1436 }
1437}