Skip to main content

log_command/
log_formatter.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::filter::LogFilterCriteria;
6use crate::log_socket_stream::{JsonDeserializeError, LogsDataStream};
7use crate::{
8    DetailedDateTime, InstanceGetter, LogCommand, LogError, LogProcessingResult, LogSubCommand,
9    TimeFormat,
10};
11use anyhow::Result;
12use async_trait::async_trait;
13use diagnostics_data::{
14    Data, LogTextColor, LogTextDisplayOptions, LogTextPresenter, LogTimeDisplayFormat, Logs,
15    LogsData, LogsDataBuilder, LogsField, LogsProperty, Severity, Timezone,
16};
17use futures_util::future::Either;
18use futures_util::stream::FuturesUnordered;
19use futures_util::{StreamExt, select};
20use serde::{Deserialize, Serialize};
21use std::fmt::Display;
22use std::io::Write;
23use std::time::Duration;
24use thiserror::Error;
25use writer::ToolIO;
26
27pub use diagnostics_data::Timestamp;
28
29pub const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S.%3f";
30
31/// Type of data in a log entry
32#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
33pub enum LogData {
34    /// A log entry from the target
35    TargetLog(LogsData),
36}
37
38impl LogData {
39    /// Gets the LogData as a target log.
40    pub fn as_target_log(&self) -> Option<&LogsData> {
41        match self {
42            LogData::TargetLog(log) => Some(log),
43        }
44    }
45
46    pub fn as_target_log_mut(&mut self) -> Option<&mut LogsData> {
47        match self {
48            LogData::TargetLog(log) => Some(log),
49        }
50    }
51}
52
53impl From<LogsData> for LogData {
54    fn from(data: LogsData) -> Self {
55        Self::TargetLog(data)
56    }
57}
58
59/// A log entry from either the host, target, or
60/// a symbolized log.
61#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
62pub struct LogEntry {
63    /// The log
64    pub data: LogData,
65}
66
67impl LogEntry {
68    fn utc_timestamp(&self, boot_ts: Option<Timestamp>) -> Timestamp {
69        Timestamp::from_nanos(match &self.data {
70            LogData::TargetLog(data) => {
71                data.metadata.timestamp.into_nanos()
72                    + boot_ts.map(|value| value.into_nanos()).unwrap_or(0)
73            }
74        })
75    }
76}
77
78// Required if we want to use ffx's built-in I/O, but
79// this isn't really applicable to us because we have
80// custom formatting rules.
81impl Display for LogEntry {
82    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        unreachable!("UNSUPPORTED -- This type cannot be formatted with std format.");
84    }
85}
86
87/// A trait for symbolizing log entries
88#[async_trait(?Send)]
89pub trait Symbolize {
90    /// Symbolizes a LogEntry and optionally produces a result.
91    /// The symbolizer may choose to discard the result.
92    /// This method may be called multiple times concurrently.
93    async fn symbolize(&self, entry: LogEntry) -> Option<LogEntry>;
94}
95
96async fn handle_value<S>(one: Data<Logs>, symbolizer: &S) -> Option<LogEntry>
97where
98    S: Symbolize + ?Sized,
99{
100    let entry = LogEntry { data: one.into() };
101    symbolizer.symbolize(entry).await
102}
103
104fn generate_timestamp_message(boot_timestamp: Timestamp) -> LogEntry {
105    LogEntry {
106        data: LogData::TargetLog(
107            LogsDataBuilder::new(diagnostics_data::BuilderArgs {
108                moniker: "ffx".try_into().unwrap(),
109                timestamp: Timestamp::from_nanos(0),
110                component_url: Some("ffx".into()),
111                severity: Severity::Info,
112            })
113            .set_message("Logging started")
114            .add_key(LogsProperty::String(
115                LogsField::Other("utc_time_now".into()),
116                chrono::Utc::now().to_rfc3339(),
117            ))
118            .add_key(LogsProperty::Int(
119                LogsField::Other("current_boot_timestamp".to_string()),
120                boot_timestamp.into_nanos(),
121            ))
122            .build(),
123        ),
124    }
125}
126
127/// Reads logs from a socket and formats them using the given formatter and symbolizer.
128pub async fn dump_logs_from_socket<F, S>(
129    socket: flex_client::AsyncSocket,
130    formatter: &mut F,
131    symbolizer: &S,
132    include_timestamp: bool,
133) -> Result<LogProcessingResult, JsonDeserializeError>
134where
135    F: LogFormatter + BootTimeAccessor,
136    S: Symbolize + ?Sized,
137{
138    let mut decoder = Box::pin(LogsDataStream::new(socket).fuse());
139    let mut symbolize_pending = FuturesUnordered::new();
140    if include_timestamp && !formatter.is_utc_time_format() {
141        formatter.push_log(generate_timestamp_message(formatter.get_boot_timestamp())).await?;
142    }
143    while let Some(value) = select! {
144        res = decoder.next() => Some(Either::Left(res)),
145        res = symbolize_pending.next() => Some(Either::Right(res)),
146        complete => None,
147    } {
148        match value {
149            Either::Left(Some(result)) => match result {
150                Ok(log) => symbolize_pending.push(handle_value(log, symbolizer)),
151                Err(e) => return Err(e),
152            },
153            Either::Right(Some(Some(symbolized))) => match formatter.push_log(symbolized).await? {
154                LogProcessingResult::Exit => {
155                    formatter.flush().await?;
156                    return Ok(LogProcessingResult::Exit);
157                }
158                LogProcessingResult::Continue => {}
159            },
160            _ => {}
161        }
162    }
163    formatter.flush().await?;
164    Ok(LogProcessingResult::Continue)
165}
166
167#[cfg(not(feature = "fdomain"))]
168/// Reads FXT logs from a socket and formats them using the given formatter and symbolizer.
169pub async fn dump_fxt_logs_from_socket<F, S>(
170    socket: flex_client::AsyncSocket,
171    formatter: &mut F,
172    symbolizer: &S,
173    include_timestamp: bool,
174) -> Result<LogProcessingResult, LogError>
175where
176    F: LogFormatter + BootTimeAccessor,
177    S: Symbolize + ?Sized,
178{
179    let mut streamer = crate::fxt_streamer::FxtStreamer::new(socket);
180    let mut decoder = std::pin::pin!(streamer.stream());
181    let mut symbolize_pending = FuturesUnordered::new();
182    if include_timestamp && !formatter.is_utc_time_format() {
183        formatter.push_log(generate_timestamp_message(formatter.get_boot_timestamp())).await?;
184    }
185    while let Some(value) = select! {
186        res = decoder.next() => Some(Either::Left(res)),
187        res = symbolize_pending.next() => Some(Either::Right(res)),
188        complete => None,
189    } {
190        match value {
191            Either::Left(Some(result)) => match result {
192                Ok(log) => symbolize_pending.push(handle_value(log, symbolizer)),
193                Err(e) => return Err(e),
194            },
195            Either::Right(Some(Some(symbolized))) => match formatter.push_log(symbolized).await? {
196                LogProcessingResult::Exit => {
197                    formatter.flush().await?;
198                    return Ok(LogProcessingResult::Exit);
199                }
200                LogProcessingResult::Continue => {}
201            },
202            _ => {}
203        }
204    }
205    formatter.flush().await?;
206    Ok(LogProcessingResult::Continue)
207}
208
209pub trait BootTimeAccessor {
210    /// Sets the boot timestamp in nanoseconds since the Unix epoch.
211    fn set_boot_timestamp(&mut self, _boot_ts_nanos: Timestamp);
212
213    /// Returns the boot timestamp in nanoseconds since the Unix epoch.
214    fn get_boot_timestamp(&self) -> Timestamp;
215}
216
217/// Timestamp filter which is either either boot-based or UTC-based.
218#[derive(Clone, Debug)]
219pub struct DeviceOrLocalTimestamp {
220    /// Timestamp in boot time
221    pub timestamp: Timestamp,
222    /// True if this filter should be applied to boot time,
223    /// false if UTC time.
224    pub is_boot: bool,
225}
226
227impl DeviceOrLocalTimestamp {
228    /// Creates a DeviceOrLocalTimestamp from a real-time date/time or
229    /// a boot date/time. Returns None if both rtc and boot are None.
230    /// Returns None if the timestamp is "now".
231    pub fn new(
232        rtc: Option<&DetailedDateTime>,
233        boot: Option<&Duration>,
234    ) -> Option<DeviceOrLocalTimestamp> {
235        rtc.as_ref()
236            .filter(|value| !value.is_now)
237            .map(|value| DeviceOrLocalTimestamp {
238                timestamp: Timestamp::from_nanos(
239                    value.naive_utc().and_utc().timestamp_nanos_opt().unwrap(),
240                ),
241                is_boot: false,
242            })
243            .or_else(|| {
244                boot.map(|value| DeviceOrLocalTimestamp {
245                    timestamp: Timestamp::from_nanos(value.as_nanos() as i64),
246                    is_boot: true,
247                })
248            })
249    }
250}
251
252/// Log formatter options
253#[derive(Clone, Debug)]
254pub struct LogFormatterOptions {
255    /// Text display options
256    pub display: Option<LogTextDisplayOptions>,
257    /// Only display logs since the specified time.
258    pub since: Option<DeviceOrLocalTimestamp>,
259    /// Only display logs until the specified time.
260    pub until: Option<DeviceOrLocalTimestamp>,
261    /// Only display the last N log lines.
262    pub tail: Option<usize>,
263}
264
265impl Default for LogFormatterOptions {
266    fn default() -> Self {
267        LogFormatterOptions {
268            display: Some(Default::default()),
269            since: None,
270            until: None,
271            tail: None,
272        }
273    }
274}
275
276/// Log formatter error
277#[derive(Error, Debug)]
278pub enum FormatterError {
279    /// An unknown error occurred
280    #[error(transparent)]
281    Other(#[from] anyhow::Error),
282    /// An IO error occurred
283    #[error(transparent)]
284    IO(#[from] std::io::Error),
285}
286
287impl FormatterError {
288    pub fn is_broken_pipe(&self) -> bool {
289        match self {
290            FormatterError::IO(error) => error.kind() == std::io::ErrorKind::BrokenPipe,
291            FormatterError::Other(_) => false,
292        }
293    }
294}
295
296/// Default formatter implementation
297pub struct DefaultLogFormatter<W>
298where
299    W: Write + ToolIO<OutputItem = LogEntry>,
300{
301    writer: W,
302    filters: LogFilterCriteria,
303    options: LogFormatterOptions,
304    boot_ts_nanos: Option<Timestamp>,
305    tail_queue: std::collections::VecDeque<LogEntry>,
306    tail_limit: Option<usize>,
307}
308
309/// Converts from UTC time to boot time.
310fn utc_to_boot(boot_ts: Timestamp, utc: Timestamp) -> Timestamp {
311    Timestamp::from_nanos(utc.into_nanos() - boot_ts.into_nanos())
312}
313
314#[async_trait(?Send)]
315impl<W> LogFormatter for DefaultLogFormatter<W>
316where
317    W: Write + ToolIO<OutputItem = LogEntry>,
318{
319    async fn push_log(&mut self, log_entry: LogEntry) -> Result<LogProcessingResult, LogError> {
320        self.push_log_internal(log_entry, true).await.or_else(|err| {
321            if err.is_broken_pipe() { Ok(LogProcessingResult::Exit) } else { Err(err) }
322        })
323    }
324
325    fn is_utc_time_format(&self) -> bool {
326        self.options.display.iter().any(|options| match options.time_format {
327            LogTimeDisplayFormat::Original => false,
328            LogTimeDisplayFormat::WallTime { tz, offset: _ } => tz == Timezone::Utc,
329        })
330    }
331
332    async fn flush(&mut self) -> Result<(), LogError> {
333        self.flush_tail().await
334    }
335}
336
337impl<W> BootTimeAccessor for DefaultLogFormatter<W>
338where
339    W: Write + ToolIO<OutputItem = LogEntry>,
340{
341    fn set_boot_timestamp(&mut self, boot_ts_nanos: Timestamp) {
342        if let Some(LogTextDisplayOptions {
343            time_format: LogTimeDisplayFormat::WallTime { offset, .. },
344            ..
345        }) = &mut self.options.display
346        {
347            *offset = boot_ts_nanos.into_nanos();
348        }
349        self.boot_ts_nanos = Some(boot_ts_nanos);
350    }
351    fn get_boot_timestamp(&self) -> Timestamp {
352        debug_assert!(self.boot_ts_nanos.is_some());
353        self.boot_ts_nanos.unwrap_or_else(|| Timestamp::from_nanos(0))
354    }
355}
356
357/// Object which contains a Writer that can be borrowed
358pub trait WriterContainer<W>
359where
360    W: Write + ToolIO<OutputItem = LogEntry>,
361{
362    fn writer(&mut self) -> &mut W;
363}
364
365impl<W> WriterContainer<W> for DefaultLogFormatter<W>
366where
367    W: Write + ToolIO<OutputItem = LogEntry>,
368{
369    fn writer(&mut self) -> &mut W {
370        &mut self.writer
371    }
372}
373
374impl<W> DefaultLogFormatter<W>
375where
376    W: Write + ToolIO<OutputItem = LogEntry>,
377{
378    /// Creates a new DefaultLogFormatter with the given writer and options.
379    pub fn new(filters: LogFilterCriteria, writer: W, options: LogFormatterOptions) -> Self {
380        let tail_limit = options.tail;
381        Self {
382            filters,
383            writer,
384            options,
385            boot_ts_nanos: None,
386            tail_queue: std::collections::VecDeque::new(),
387            tail_limit,
388        }
389    }
390
391    pub async fn expand_monikers(&mut self, getter: &impl InstanceGetter) -> Result<(), LogError> {
392        self.filters.expand_monikers(getter).await
393    }
394
395    pub async fn push_unfiltered_log(
396        &mut self,
397        log_entry: LogEntry,
398    ) -> Result<LogProcessingResult, LogError> {
399        self.push_log_internal(log_entry, false).await
400    }
401
402    async fn flush_tail(&mut self) -> Result<(), LogError> {
403        while let Some(log_entry) = self.tail_queue.pop_front() {
404            self.write_log_entry(log_entry, true)?;
405        }
406        Ok(())
407    }
408
409    fn write_log_entry(
410        &mut self,
411        log_entry: LogEntry,
412        enable_filters: bool,
413    ) -> Result<(), LogError> {
414        match self.options.display {
415            Some(text_options) => {
416                let mut options_for_this_line_only = self.options.clone();
417                options_for_this_line_only.display = Some(text_options);
418                // For host logs, don't apply the boot time offset
419                // as this is with reference to the UTC timeline
420                if !enable_filters
421                    && let LogTimeDisplayFormat::WallTime { ref mut offset, .. } =
422                        options_for_this_line_only.display.as_mut().unwrap().time_format
423                {
424                    // 1 nanosecond so that LogTimeDisplayFormat in diagnostics_data
425                    // knows that we have a valid UTC offset. It normally falls back if
426                    // the UTC offset is 0. It prints at millisecond precision so being
427                    // off by +1 nanosecond isn't an issue.
428                    *offset = 1;
429                }
430                self.format_text_log(options_for_this_line_only, log_entry)
431                    .map_err(LogError::FormatterError)?;
432            }
433            None => {
434                self.writer.item(&log_entry).map_err(|err| LogError::UnknownError(err.into()))?;
435            }
436        };
437        Ok(())
438    }
439
440    async fn push_log_internal(
441        &mut self,
442        log_entry: LogEntry,
443        enable_filters: bool,
444    ) -> Result<LogProcessingResult, LogError> {
445        if enable_filters {
446            if self.filter_by_timestamp(&log_entry, self.options.since.as_ref(), |a, b| a <= b) {
447                return Ok(LogProcessingResult::Continue);
448            }
449
450            if self.filter_by_timestamp(&log_entry, self.options.until.as_ref(), |a, b| a >= b) {
451                return Ok(LogProcessingResult::Exit);
452            }
453
454            if !self.filters.matches(&log_entry) {
455                return Ok(LogProcessingResult::Continue);
456            }
457        }
458
459        if let Some(limit) = self.tail_limit {
460            self.tail_queue.push_back(log_entry);
461            if self.tail_queue.len() > limit {
462                self.tail_queue.pop_front();
463            }
464        } else {
465            self.write_log_entry(log_entry, enable_filters)?;
466        }
467        Ok(LogProcessingResult::Continue)
468    }
469
470    /// Creates a new DefaultLogFormatter from command-line arguments.
471    pub fn new_from_args(cmd: &LogCommand, writer: W) -> Result<Self, LogError> {
472        let is_json = writer.is_machine();
473
474        Ok(DefaultLogFormatter::new(
475            LogFilterCriteria::try_from(cmd.clone())?,
476            writer,
477            LogFormatterOptions {
478                display: if is_json {
479                    None
480                } else {
481                    Some(LogTextDisplayOptions {
482                        show_tags: !cmd.hide_tags,
483                        color: if cmd.no_color {
484                            LogTextColor::None
485                        } else {
486                            LogTextColor::BySeverity
487                        },
488                        show_metadata: cmd.show_metadata,
489                        time_format: match cmd.clock {
490                            TimeFormat::Boot => LogTimeDisplayFormat::Original,
491                            TimeFormat::Local => LogTimeDisplayFormat::WallTime {
492                                tz: Timezone::Local,
493                                // This will receive a correct value when logging actually starts,
494                                // see `set_boot_timestamp()` method on the log formatter.
495                                offset: 0,
496                            },
497                            TimeFormat::Utc => LogTimeDisplayFormat::WallTime {
498                                tz: Timezone::Utc,
499                                // This will receive a correct value when logging actually starts,
500                                // see `set_boot_timestamp()` method on the log formatter.
501                                offset: 0,
502                            },
503                        },
504                        show_file: !cmd.hide_file,
505                        show_moniker: !cmd.hide_moniker,
506                        show_full_moniker: cmd.show_full_moniker,
507                        prefer_url_component_name: cmd.prefer_url_component_name,
508                    })
509                },
510                since: DeviceOrLocalTimestamp::new(cmd.since.as_ref(), cmd.since_boot.as_ref()),
511                until: DeviceOrLocalTimestamp::new(cmd.until.as_ref(), cmd.until_boot.as_ref()),
512                tail: match &cmd.sub_command {
513                    Some(LogSubCommand::Dump(dump)) => dump.tail,
514                    _ => None,
515                },
516            },
517        ))
518    }
519
520    fn filter_by_timestamp(
521        &self,
522        log_entry: &LogEntry,
523        timestamp: Option<&DeviceOrLocalTimestamp>,
524        callback: impl Fn(&Timestamp, &Timestamp) -> bool,
525    ) -> bool {
526        let Some(timestamp) = timestamp else {
527            return false;
528        };
529        if timestamp.is_boot {
530            callback(
531                &utc_to_boot(
532                    self.get_boot_timestamp(),
533                    log_entry.utc_timestamp(self.boot_ts_nanos),
534                ),
535                &timestamp.timestamp,
536            )
537        } else {
538            callback(&log_entry.utc_timestamp(self.boot_ts_nanos), &timestamp.timestamp)
539        }
540    }
541
542    // This function's arguments are copied to make lifetimes in push_log easier since borrowing
543    // &self would complicate spam highlighting.
544    fn format_text_log(
545        &mut self,
546        options: LogFormatterOptions,
547        log_entry: LogEntry,
548    ) -> Result<(), FormatterError> {
549        let text_options = match options.display {
550            Some(o) => o,
551            None => {
552                unreachable!("If we are here, we can only be formatting text");
553            }
554        };
555        match log_entry {
556            LogEntry { data: LogData::TargetLog(data), .. } => {
557                // TODO(https://fxbug.dev/42072442): Add support for log spam redaction and other
558                // features listed in the design doc.
559                writeln!(self.writer, "{}", LogTextPresenter::new(&data, text_options))?;
560            }
561        }
562        Ok(())
563    }
564}
565
566#[allow(dead_code)] // TODO(https://fxbug.dev/421409178)
567/// Symbolizer that does nothing.
568pub struct NoOpSymbolizer;
569
570#[async_trait(?Send)]
571impl Symbolize for NoOpSymbolizer {
572    async fn symbolize(&self, entry: LogEntry) -> Option<LogEntry> {
573        Some(entry)
574    }
575}
576
577/// Trait for formatting logs one at a time.
578#[async_trait(?Send)]
579pub trait LogFormatter {
580    /// Formats a log entry and writes it to the output.
581    async fn push_log(&mut self, log_entry: LogEntry) -> Result<LogProcessingResult, LogError>;
582
583    /// Returns true if the formatter is configured to output in UTC time format.
584    fn is_utc_time_format(&self) -> bool;
585
586    /// Flushes any buffered logs.
587    async fn flush(&mut self) -> Result<(), LogError> {
588        Ok(())
589    }
590}
591
592#[cfg(test)]
593mod test {
594    use crate::parse_time;
595    use assert_matches::assert_matches;
596    use diagnostics_data::{LogsDataBuilder, Severity};
597    use std::cell::Cell;
598    use writer::{Format, JsonWriter, TestBuffers};
599
600    use super::*;
601
602    const DEFAULT_TS_NANOS: u64 = 1615535969000000000;
603
604    struct FakeFormatter {
605        logs: Vec<LogEntry>,
606        boot_timestamp: Timestamp,
607        is_utc_time_format: bool,
608    }
609
610    impl FakeFormatter {
611        fn new() -> Self {
612            Self {
613                logs: Vec::new(),
614                boot_timestamp: Timestamp::from_nanos(0),
615                is_utc_time_format: false,
616            }
617        }
618    }
619
620    impl BootTimeAccessor for FakeFormatter {
621        fn set_boot_timestamp(&mut self, boot_ts_nanos: Timestamp) {
622            self.boot_timestamp = boot_ts_nanos;
623        }
624
625        fn get_boot_timestamp(&self) -> Timestamp {
626            self.boot_timestamp
627        }
628    }
629
630    #[async_trait(?Send)]
631    impl LogFormatter for FakeFormatter {
632        async fn push_log(&mut self, log_entry: LogEntry) -> Result<LogProcessingResult, LogError> {
633            self.logs.push(log_entry);
634            Ok(LogProcessingResult::Continue)
635        }
636
637        fn is_utc_time_format(&self) -> bool {
638            self.is_utc_time_format
639        }
640    }
641
642    /// Symbolizer that prints "Fuchsia".
643    pub struct FakeFuchsiaSymbolizer;
644
645    fn set_log_msg(entry: &mut LogEntry, msg: impl Into<String>) {
646        *entry.data.as_target_log_mut().unwrap().msg_mut().unwrap() = msg.into();
647    }
648
649    #[async_trait(?Send)]
650    impl Symbolize for FakeFuchsiaSymbolizer {
651        async fn symbolize(&self, mut entry: LogEntry) -> Option<LogEntry> {
652            set_log_msg(&mut entry, "Fuchsia");
653            Some(entry)
654        }
655    }
656
657    struct FakeSymbolizerCallback {
658        should_discard: Cell<bool>,
659    }
660
661    impl FakeSymbolizerCallback {
662        fn new() -> Self {
663            Self { should_discard: Cell::new(true) }
664        }
665    }
666
667    async fn dump_logs_from_socket<F, S>(
668        socket: fuchsia_async::Socket,
669        formatter: &mut F,
670        symbolizer: &S,
671    ) -> Result<LogProcessingResult, JsonDeserializeError>
672    where
673        F: LogFormatter + BootTimeAccessor,
674        S: Symbolize + ?Sized,
675    {
676        super::dump_logs_from_socket(socket, formatter, symbolizer, false).await
677    }
678
679    #[async_trait(?Send)]
680    impl Symbolize for FakeSymbolizerCallback {
681        async fn symbolize(&self, mut input: LogEntry) -> Option<LogEntry> {
682            self.should_discard.set(!self.should_discard.get());
683            if self.should_discard.get() {
684                None
685            } else {
686                set_log_msg(&mut input, "symbolized log");
687                Some(input)
688            }
689        }
690    }
691
692    #[fuchsia::test]
693    async fn test_boot_timestamp_setter() {
694        let buffers = TestBuffers::default();
695        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
696        let options = LogFormatterOptions {
697            display: Some(LogTextDisplayOptions {
698                time_format: LogTimeDisplayFormat::WallTime { tz: Timezone::Utc, offset: 0 },
699                ..Default::default()
700            }),
701            ..Default::default()
702        };
703        let mut formatter =
704            DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
705        formatter.set_boot_timestamp(Timestamp::from_nanos(1234));
706        assert_eq!(formatter.get_boot_timestamp(), Timestamp::from_nanos(1234));
707
708        // Boot timestamp is supported when using JSON output (for filtering)
709        let buffers = TestBuffers::default();
710        let output = JsonWriter::<LogEntry>::new_test(None, &buffers);
711        let options = LogFormatterOptions { display: None, ..Default::default() };
712        let mut formatter = DefaultLogFormatter::new(LogFilterCriteria::default(), output, options);
713        formatter.set_boot_timestamp(Timestamp::from_nanos(1234));
714        assert_eq!(formatter.get_boot_timestamp(), Timestamp::from_nanos(1234));
715    }
716
717    #[fuchsia::test]
718    async fn test_format_single_message() {
719        let symbolizer = NoOpSymbolizer {};
720        let mut formatter = FakeFormatter::new();
721        let target_log = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
722            moniker: "ffx".try_into().unwrap(),
723            timestamp: Timestamp::from_nanos(0),
724            component_url: Some("ffx".into()),
725            severity: Severity::Info,
726        })
727        .set_message("Hello world!")
728        .build();
729        let (sender, receiver) = zx::Socket::create_stream();
730        sender
731            .write(serde_json::to_string(&target_log).unwrap().as_bytes())
732            .expect("failed to write target log");
733        drop(sender);
734        dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
735            .await
736            .unwrap();
737        assert_eq!(formatter.logs, vec![LogEntry { data: LogData::TargetLog(target_log) }]);
738    }
739
740    #[fuchsia::test]
741    async fn test_format_utc_timestamp() {
742        let symbolizer = NoOpSymbolizer {};
743        let mut formatter = FakeFormatter::new();
744        formatter.set_boot_timestamp(Timestamp::from_nanos(DEFAULT_TS_NANOS as i64));
745        let (_, receiver) = zx::Socket::create_stream();
746        super::dump_logs_from_socket(
747            flex_client::socket_to_async(receiver),
748            &mut formatter,
749            &symbolizer,
750            true,
751        )
752        .await
753        .unwrap();
754        let target_log = formatter.logs[0].data.as_target_log().unwrap();
755        let properties = target_log.payload_keys().unwrap();
756        assert_eq!(target_log.msg().unwrap(), "Logging started");
757
758        // Ensure the end has a valid timestamp
759        chrono::DateTime::parse_from_rfc3339(
760            properties.get_property("utc_time_now").unwrap().string().unwrap(),
761        )
762        .unwrap();
763        assert_eq!(
764            properties.get_property("current_boot_timestamp").unwrap().int().unwrap(),
765            DEFAULT_TS_NANOS as i64
766        );
767    }
768
769    #[fuchsia::test]
770    async fn test_format_utc_timestamp_does_not_print_if_utc_time() {
771        let symbolizer = NoOpSymbolizer {};
772        let mut formatter = FakeFormatter::new();
773        formatter.is_utc_time_format = true;
774        formatter.set_boot_timestamp(Timestamp::from_nanos(DEFAULT_TS_NANOS as i64));
775        let (_, receiver) = zx::Socket::create_stream();
776        super::dump_logs_from_socket(
777            flex_client::socket_to_async(receiver),
778            &mut formatter,
779            &symbolizer,
780            true,
781        )
782        .await
783        .unwrap();
784        assert_eq!(formatter.logs.len(), 0);
785    }
786
787    #[fuchsia::test]
788    async fn test_format_multiple_messages() {
789        let symbolizer = NoOpSymbolizer {};
790        let mut formatter = FakeFormatter::new();
791        let (sender, receiver) = zx::Socket::create_stream();
792        let target_log_0 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
793            moniker: "ffx".try_into().unwrap(),
794            timestamp: Timestamp::from_nanos(0),
795            component_url: Some("ffx".into()),
796            severity: Severity::Info,
797        })
798        .set_message("Hello world!")
799        .set_pid(1)
800        .set_tid(2)
801        .build();
802        let target_log_1 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
803            moniker: "ffx".try_into().unwrap(),
804            timestamp: Timestamp::from_nanos(1),
805            component_url: Some("ffx".into()),
806            severity: Severity::Info,
807        })
808        .set_message("Hello world 2!")
809        .build();
810        sender
811            .write(serde_json::to_string(&vec![&target_log_0, &target_log_1]).unwrap().as_bytes())
812            .expect("failed to write target log");
813        drop(sender);
814        dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
815            .await
816            .unwrap();
817        assert_eq!(
818            formatter.logs,
819            vec![
820                LogEntry { data: LogData::TargetLog(target_log_0) },
821                LogEntry { data: LogData::TargetLog(target_log_1) }
822            ]
823        );
824    }
825
826    #[fuchsia::test]
827    async fn test_format_timestamp_filter() {
828        // test since and until args for the LogFormatter
829        let symbolizer = NoOpSymbolizer {};
830        let buffers = TestBuffers::default();
831        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
832        let mut formatter = DefaultLogFormatter::new(
833            LogFilterCriteria::default(),
834            stdout,
835            LogFormatterOptions {
836                since: Some(DeviceOrLocalTimestamp {
837                    timestamp: Timestamp::from_nanos(1),
838                    is_boot: true,
839                }),
840                until: Some(DeviceOrLocalTimestamp {
841                    timestamp: Timestamp::from_nanos(3),
842                    is_boot: true,
843                }),
844                ..Default::default()
845            },
846        );
847        formatter.set_boot_timestamp(Timestamp::from_nanos(0));
848
849        let (sender, receiver) = zx::Socket::create_stream();
850        let target_log_0 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
851            moniker: "ffx".try_into().unwrap(),
852            timestamp: Timestamp::from_nanos(0),
853            component_url: Some("ffx".into()),
854            severity: Severity::Info,
855        })
856        .set_message("Hello world!")
857        .build();
858        let target_log_1 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
859            moniker: "ffx".try_into().unwrap(),
860            timestamp: Timestamp::from_nanos(1),
861            component_url: Some("ffx".into()),
862            severity: Severity::Info,
863        })
864        .set_message("Hello world 2!")
865        .build();
866        let target_log_2 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
867            moniker: "ffx".try_into().unwrap(),
868            timestamp: Timestamp::from_nanos(2),
869            component_url: Some("ffx".into()),
870            severity: Severity::Info,
871        })
872        .set_pid(1)
873        .set_tid(2)
874        .set_message("Hello world 3!")
875        .build();
876        let target_log_3 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
877            moniker: "ffx".try_into().unwrap(),
878            timestamp: Timestamp::from_nanos(3),
879            component_url: Some("ffx".into()),
880            severity: Severity::Info,
881        })
882        .set_message("Hello world 4!")
883        .set_pid(1)
884        .set_tid(2)
885        .build();
886        sender
887            .write(
888                serde_json::to_string(&vec![
889                    &target_log_0,
890                    &target_log_1,
891                    &target_log_2,
892                    &target_log_3,
893                ])
894                .unwrap()
895                .as_bytes(),
896            )
897            .expect("failed to write target log");
898        drop(sender);
899        assert_matches!(
900            dump_logs_from_socket(
901                flex_client::socket_to_async(receiver),
902                &mut formatter,
903                &symbolizer,
904            )
905            .await,
906            Ok(LogProcessingResult::Exit)
907        );
908        assert_eq!(
909            buffers.stdout.into_string(),
910            "[00000.000000][1][2][ffx] INFO: Hello world 3!\n"
911        );
912    }
913
914    fn make_log_with_timestamp(timestamp: i64) -> LogsData {
915        LogsDataBuilder::new(diagnostics_data::BuilderArgs {
916            moniker: "ffx".try_into().unwrap(),
917            timestamp: Timestamp::from_nanos(timestamp),
918            component_url: Some("ffx".into()),
919            severity: Severity::Info,
920        })
921        .set_message(format!("Hello world {timestamp}!"))
922        .set_pid(1)
923        .set_tid(2)
924        .build()
925    }
926
927    #[fuchsia::test]
928    async fn test_format_timestamp_filter_utc() {
929        // test since and until args for the LogFormatter
930        let symbolizer = NoOpSymbolizer {};
931        let buffers = TestBuffers::default();
932        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
933        let mut formatter = DefaultLogFormatter::new(
934            LogFilterCriteria::default(),
935            stdout,
936            LogFormatterOptions {
937                since: Some(DeviceOrLocalTimestamp {
938                    timestamp: Timestamp::from_nanos(1),
939                    is_boot: false,
940                }),
941                until: Some(DeviceOrLocalTimestamp {
942                    timestamp: Timestamp::from_nanos(3),
943                    is_boot: false,
944                }),
945                display: Some(LogTextDisplayOptions {
946                    time_format: LogTimeDisplayFormat::WallTime { tz: Timezone::Utc, offset: 1 },
947                    ..Default::default()
948                }),
949                ..Default::default()
950            },
951        );
952        formatter.set_boot_timestamp(Timestamp::from_nanos(1));
953
954        let (sender, receiver) = zx::Socket::create_stream();
955        let logs = (0..4).map(make_log_with_timestamp).collect::<Vec<_>>();
956        sender
957            .write(serde_json::to_string(&logs).unwrap().as_bytes())
958            .expect("failed to write target log");
959        drop(sender);
960        assert_matches!(
961            dump_logs_from_socket(
962                flex_client::socket_to_async(receiver),
963                &mut formatter,
964                &symbolizer,
965            )
966            .await,
967            Ok(LogProcessingResult::Exit)
968        );
969        assert_eq!(
970            buffers.stdout.into_string(),
971            "[1970-01-01 00:00:00.000][1][2][ffx] INFO: Hello world 1!\n"
972        );
973    }
974
975    fn logs_data_builder() -> LogsDataBuilder {
976        diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
977            timestamp: Timestamp::from_nanos(default_ts().as_nanos() as i64),
978            component_url: Some("component_url".into()),
979            moniker: "some/moniker".try_into().unwrap(),
980            severity: diagnostics_data::Severity::Warn,
981        })
982        .set_pid(1)
983        .set_tid(2)
984    }
985
986    fn default_ts() -> Duration {
987        Duration::from_nanos(DEFAULT_TS_NANOS)
988    }
989
990    fn log_entry() -> LogEntry {
991        LogEntry {
992            data: LogData::TargetLog(
993                logs_data_builder().add_tag("tag1").add_tag("tag2").set_message("message").build(),
994            ),
995        }
996    }
997
998    #[fuchsia::test]
999    async fn test_default_formatter() {
1000        let buffers = TestBuffers::default();
1001        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
1002        let options = LogFormatterOptions::default();
1003        let mut formatter =
1004            DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
1005        formatter.push_log(log_entry()).await.unwrap();
1006        drop(formatter);
1007        assert_eq!(
1008            buffers.into_stdout_str(),
1009            "[1615535969.000000][1][2][some/moniker][tag1,tag2] WARN: message\n"
1010        );
1011    }
1012
1013    #[fuchsia::test]
1014    async fn test_default_formatter_with_hidden_metadata() {
1015        let buffers = TestBuffers::default();
1016        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
1017        let options = LogFormatterOptions {
1018            display: Some(LogTextDisplayOptions { show_metadata: false, ..Default::default() }),
1019            ..LogFormatterOptions::default()
1020        };
1021        let mut formatter =
1022            DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
1023        formatter.push_log(log_entry()).await.unwrap();
1024        drop(formatter);
1025        assert_eq!(
1026            buffers.into_stdout_str(),
1027            "[1615535969.000000][some/moniker][tag1,tag2] WARN: message\n"
1028        );
1029    }
1030
1031    #[fuchsia::test]
1032    async fn test_default_formatter_with_json() {
1033        let buffers = TestBuffers::default();
1034        let stdout = JsonWriter::<LogEntry>::new_test(Some(Format::Json), &buffers);
1035        let options = LogFormatterOptions { display: None, ..Default::default() };
1036        {
1037            let mut formatter =
1038                DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
1039            formatter.push_log(log_entry()).await.unwrap();
1040        }
1041        assert_eq!(
1042            serde_json::from_str::<LogEntry>(&buffers.into_stdout_str()).unwrap(),
1043            log_entry()
1044        );
1045    }
1046
1047    fn emit_log(sender: &mut zx::Socket, msg: &str, timestamp: i64) -> Data<Logs> {
1048        let target_log = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
1049            moniker: "ffx".try_into().unwrap(),
1050            timestamp: Timestamp::from_nanos(timestamp),
1051            component_url: Some("ffx".into()),
1052            severity: Severity::Info,
1053        })
1054        .set_message(msg)
1055        .build();
1056
1057        sender
1058            .write(serde_json::to_string(&target_log).unwrap().as_bytes())
1059            .expect("failed to write target log");
1060        target_log
1061    }
1062
1063    #[fuchsia::test]
1064    async fn test_default_formatter_discards_when_told_by_symbolizer() {
1065        let mut formatter = FakeFormatter::new();
1066        let (mut sender, receiver) = zx::Socket::create_stream();
1067        let mut target_log_0 = emit_log(&mut sender, "Hello world!", 0);
1068        emit_log(&mut sender, "Dropped world!", 1);
1069        let mut target_log_2 = emit_log(&mut sender, "Hello world!", 2);
1070        emit_log(&mut sender, "Dropped world!", 3);
1071        let mut target_log_4 = emit_log(&mut sender, "Hello world!", 4);
1072        drop(sender);
1073        // Drop every other log.
1074        let symbolizer = FakeSymbolizerCallback::new();
1075        *target_log_0.msg_mut().unwrap() = "symbolized log".into();
1076        *target_log_2.msg_mut().unwrap() = "symbolized log".into();
1077        *target_log_4.msg_mut().unwrap() = "symbolized log".into();
1078        dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
1079            .await
1080            .unwrap();
1081        assert_eq!(
1082            formatter.logs,
1083            vec![
1084                LogEntry { data: LogData::TargetLog(target_log_0) },
1085                LogEntry { data: LogData::TargetLog(target_log_2) },
1086                LogEntry { data: LogData::TargetLog(target_log_4) }
1087            ],
1088        );
1089    }
1090
1091    #[fuchsia::test]
1092    async fn test_symbolized_output() {
1093        let symbolizer = FakeFuchsiaSymbolizer;
1094        let buffers = TestBuffers::default();
1095        let output = JsonWriter::<LogEntry>::new_test(None, &buffers);
1096        let mut formatter = DefaultLogFormatter::new(
1097            LogFilterCriteria::default(),
1098            output,
1099            LogFormatterOptions { ..Default::default() },
1100        );
1101        formatter.set_boot_timestamp(Timestamp::from_nanos(0));
1102        let target_log = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
1103            moniker: "ffx".try_into().unwrap(),
1104            timestamp: Timestamp::from_nanos(0),
1105            component_url: Some("ffx".into()),
1106            severity: Severity::Info,
1107        })
1108        .set_pid(1)
1109        .set_tid(2)
1110        .set_message("Hello world!")
1111        .build();
1112        let (sender, receiver) = zx::Socket::create_stream();
1113        sender
1114            .write(serde_json::to_string(&target_log).unwrap().as_bytes())
1115            .expect("failed to write target log");
1116        drop(sender);
1117        dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
1118            .await
1119            .unwrap();
1120        assert_eq!(buffers.stdout.into_string(), "[00000.000000][1][2][ffx] INFO: Fuchsia\n");
1121    }
1122
1123    #[test]
1124    fn test_device_or_local_timestamp_returns_none_if_now_is_passed() {
1125        assert_matches!(DeviceOrLocalTimestamp::new(Some(&parse_time("now").unwrap()), None), None);
1126    }
1127
1128    struct BrokenPipeWriter;
1129    impl std::io::Write for BrokenPipeWriter {
1130        fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
1131            Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe"))
1132        }
1133
1134        fn flush(&mut self) -> std::io::Result<()> {
1135            Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe"))
1136        }
1137    }
1138
1139    impl ToolIO for BrokenPipeWriter {
1140        type OutputItem = LogEntry;
1141        fn is_machine(&self) -> bool {
1142            false
1143        }
1144
1145        fn stderr(&mut self) -> &mut dyn std::io::Write {
1146            self
1147        }
1148
1149        fn item(&mut self, _value: &Self::OutputItem) -> writer::Result<()> {
1150            Err(writer::Error::Io(std::io::Error::new(
1151                std::io::ErrorKind::BrokenPipe,
1152                "broken pipe",
1153            )))
1154        }
1155    }
1156
1157    #[fuchsia::test]
1158    async fn test_default_formatter_exits_on_broken_pipe() {
1159        let stdout = BrokenPipeWriter;
1160        let options = LogFormatterOptions::default();
1161        let mut formatter =
1162            DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
1163        let result = formatter.push_log(log_entry()).await;
1164        assert_matches!(result, Ok(LogProcessingResult::Exit));
1165    }
1166
1167    #[test]
1168    fn test_formatter_error_is_broken_pipe() {
1169        assert!(
1170            FormatterError::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe"))
1171                .is_broken_pipe()
1172        );
1173        assert!(!FormatterError::IO(std::io::Error::other("other")).is_broken_pipe());
1174        assert!(!FormatterError::Other(anyhow::anyhow!("other")).is_broken_pipe());
1175    }
1176
1177    #[cfg(not(feature = "fdomain"))]
1178    #[fuchsia::test]
1179    async fn test_json_and_fxt_output_identical() {
1180        use diagnostics_data::ExtendedMoniker;
1181        use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
1182        use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, MONIKER, Record, URL};
1183        use diagnostics_message::MonikerWithUrl;
1184        use flyweights::FlyStr;
1185        use zerocopy::{FromBytes, IntoBytes};
1186
1187        let symbolizer = NoOpSymbolizer {};
1188        let options = LogFormatterOptions::default();
1189
1190        let fn_encode = |record: Record<'_>, tag: u32| -> Vec<u8> {
1191            let mut encoder = Encoder::new(
1192                std::io::Cursor::new(ResizableBuffer::from(Vec::new())),
1193                EncoderOpts::default(),
1194            );
1195            encoder.write_record(record).unwrap();
1196            let mut bytes = encoder.take().into_inner().into_inner();
1197            let mut header = Header::read_from_bytes(&bytes[0..8]).unwrap();
1198            header.set_tag(tag);
1199            bytes[0..8].copy_from_slice(header.as_bytes());
1200            bytes
1201        };
1202
1203        let manifest_bytes = fn_encode(
1204            Record {
1205                timestamp: zx::BootInstant::from_nanos(0),
1206                severity: 0x30, // INFO
1207                arguments: vec![
1208                    Argument::other(MONIKER, "core/foo"),
1209                    Argument::other(URL, "fuchsia-pkg://foo"),
1210                ],
1211            },
1212            1 | LOG_CONTROL_BIT,
1213        );
1214
1215        let log_bytes1 = fn_encode(
1216            Record {
1217                timestamp: zx::BootInstant::from_nanos(123456),
1218                severity: 0x30, // INFO
1219                arguments: vec![
1220                    Argument::pid(zx::Koid::from_raw(1000)),
1221                    Argument::tid(zx::Koid::from_raw(2000)),
1222                    Argument::tag("my_tag"),
1223                    Argument::message("Hello identical world!"),
1224                ],
1225            },
1226            1,
1227        );
1228
1229        let log_bytes2 = fn_encode(
1230            Record {
1231                timestamp: zx::BootInstant::from_nanos(123457),
1232                severity: 0x40, // WARN
1233                arguments: vec![
1234                    Argument::file("src/main.rs"),
1235                    Argument::line(42),
1236                    Argument::dropped(5),
1237                    Argument::message("Warning with source location and dropped count"),
1238                ],
1239            },
1240            1,
1241        );
1242
1243        let log_bytes3_signed = fn_encode(
1244            Record {
1245                timestamp: zx::BootInstant::from_nanos(123458),
1246                severity: 0x50, // ERROR
1247                arguments: vec![
1248                    Argument::message("Error with custom signed int"),
1249                    Argument::new("signed_val", -12345i64),
1250                ],
1251            },
1252            1,
1253        );
1254
1255        let log_bytes3_unsigned = fn_encode(
1256            Record {
1257                timestamp: zx::BootInstant::from_nanos(123458),
1258                severity: 0x50, // ERROR
1259                arguments: vec![
1260                    Argument::message("Error with custom unsigned int"),
1261                    Argument::new("unsigned_val", 67890u64),
1262                ],
1263            },
1264            1,
1265        );
1266
1267        let log_bytes3_bool = fn_encode(
1268            Record {
1269                timestamp: zx::BootInstant::from_nanos(123458),
1270                severity: 0x50, // ERROR
1271                arguments: vec![
1272                    Argument::message("Error with custom boolean"),
1273                    Argument::new("bool_val", true),
1274                ],
1275            },
1276            1,
1277        );
1278
1279        let log_bytes3_str = fn_encode(
1280            Record {
1281                timestamp: zx::BootInstant::from_nanos(123458),
1282                severity: 0x50, // ERROR
1283                arguments: vec![
1284                    Argument::message("Error with custom string"),
1285                    Argument::new("string_val", "custom string"),
1286                ],
1287            },
1288            1,
1289        );
1290
1291        let log_bytes4_pi = fn_encode(
1292            Record {
1293                timestamp: zx::BootInstant::from_nanos(123459),
1294                severity: 0x60, // FATAL
1295                arguments: vec![
1296                    Argument::message("Fatal with float pi"),
1297                    Argument::new("float_pi", std::f64::consts::PI),
1298                ],
1299            },
1300            1,
1301        );
1302
1303        let log_bytes4_zero = fn_encode(
1304            Record {
1305                timestamp: zx::BootInstant::from_nanos(123459),
1306                severity: 0x60, // FATAL
1307                arguments: vec![
1308                    Argument::message("Fatal with float zero"),
1309                    Argument::new("float_zero", 0.0f64),
1310                ],
1311            },
1312            1,
1313        );
1314
1315        let log_bytes4_large = fn_encode(
1316            Record {
1317                timestamp: zx::BootInstant::from_nanos(123459),
1318                severity: 0x60, // FATAL
1319                arguments: vec![
1320                    Argument::message("Fatal with float large"),
1321                    Argument::new("float_large", 123456.789f64),
1322                ],
1323            },
1324            1,
1325        );
1326
1327        let all_records = [
1328            &log_bytes1,
1329            &log_bytes2,
1330            &log_bytes3_signed,
1331            &log_bytes3_unsigned,
1332            &log_bytes3_bool,
1333            &log_bytes3_str,
1334            &log_bytes4_pi,
1335            &log_bytes4_zero,
1336            &log_bytes4_large,
1337        ];
1338
1339        // 1. JSON setup and execution using converted FXT messages
1340        let buffers_json = TestBuffers::default();
1341        let stdout_json = JsonWriter::<LogEntry>::new_test(None, &buffers_json);
1342        let mut formatter_json =
1343            DefaultLogFormatter::new(LogFilterCriteria::default(), stdout_json, options.clone());
1344        formatter_json.set_boot_timestamp(Timestamp::from_nanos(0));
1345
1346        let source = MonikerWithUrl {
1347            moniker: ExtendedMoniker::parse_str("core/foo").unwrap(),
1348            url: FlyStr::new("fuchsia-pkg://foo"),
1349        };
1350
1351        let (sender_json, receiver_json) = zx::Socket::create_stream();
1352        let mut json_stream_bytes = Vec::new();
1353        for record_bytes in &all_records {
1354            let target_log =
1355                diagnostics_message::from_structured(source.clone(), record_bytes).unwrap();
1356            serde_json::to_writer(&mut json_stream_bytes, &target_log).unwrap();
1357            json_stream_bytes.push(b'\n');
1358        }
1359        sender_json.write(&json_stream_bytes).expect("failed to write target logs");
1360        drop(sender_json);
1361
1362        super::dump_logs_from_socket(
1363            flex_client::socket_to_async(receiver_json),
1364            &mut formatter_json,
1365            &symbolizer,
1366            false,
1367        )
1368        .await
1369        .unwrap();
1370
1371        // 2. FXT setup and execution
1372        let buffers_fxt = TestBuffers::default();
1373        let stdout_fxt = JsonWriter::<LogEntry>::new_test(None, &buffers_fxt);
1374        let mut formatter_fxt =
1375            DefaultLogFormatter::new(LogFilterCriteria::default(), stdout_fxt, options.clone());
1376        formatter_fxt.set_boot_timestamp(Timestamp::from_nanos(0));
1377
1378        let (sender_fxt, receiver_fxt) = zx::Socket::create_stream();
1379        sender_fxt.write(&manifest_bytes).unwrap();
1380        for record_bytes in &all_records {
1381            sender_fxt.write(record_bytes).unwrap();
1382        }
1383        drop(sender_fxt);
1384
1385        super::dump_fxt_logs_from_socket(
1386            flex_client::socket_to_async(receiver_fxt),
1387            &mut formatter_fxt,
1388            &symbolizer,
1389            false,
1390        )
1391        .await
1392        .unwrap();
1393
1394        // 3. Verify identity
1395        assert_eq!(buffers_json.stdout.into_string(), buffers_fxt.stdout.into_string());
1396    }
1397
1398    #[fuchsia::test]
1399    async fn test_default_formatter_tail_limit() {
1400        let buffers = TestBuffers::default();
1401        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
1402        let options = LogFormatterOptions { tail: Some(2), ..Default::default() };
1403        let mut formatter = DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options);
1404
1405        let entry1 = LogEntry {
1406            data: LogData::TargetLog(
1407                logs_data_builder().add_tag("tag1").set_message("msg1").build(),
1408            ),
1409        };
1410        let entry2 = LogEntry {
1411            data: LogData::TargetLog(
1412                logs_data_builder().add_tag("tag1").set_message("msg2").build(),
1413            ),
1414        };
1415        let entry3 = LogEntry {
1416            data: LogData::TargetLog(
1417                logs_data_builder().add_tag("tag1").set_message("msg3").build(),
1418            ),
1419        };
1420
1421        formatter.push_log(entry1).await.unwrap();
1422        formatter.push_log(entry2).await.unwrap();
1423        formatter.push_log(entry3).await.unwrap();
1424
1425        // Before flushing, nothing should be outputted because tail buffering delays output
1426        let stdout_snapshot = buffers.stdout.clone().into_string();
1427        assert_eq!(stdout_snapshot, "");
1428
1429        formatter.flush().await.unwrap();
1430
1431        // After flushing, only the last 2 entries should be outputted
1432        let output = buffers.into_stdout_str();
1433        assert!(!output.contains("msg1"));
1434        assert!(output.contains("msg2"));
1435        assert!(output.contains("msg3"));
1436    }
1437}