Skip to main content

log_command/
log_formatter.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::filter::LogFilterCriteria;
6use crate::log_socket_stream::{JsonDeserializeError, LogsDataStream};
7use crate::{
8    DetailedDateTime, InstanceGetter, LogCommand, LogError, LogProcessingResult, TimeFormat,
9};
10use anyhow::Result;
11use async_trait::async_trait;
12use diagnostics_data::{
13    Data, LogTextColor, LogTextDisplayOptions, LogTextPresenter, LogTimeDisplayFormat, Logs,
14    LogsData, LogsDataBuilder, LogsField, LogsProperty, Severity, Timezone,
15};
16use futures_util::future::Either;
17use futures_util::stream::FuturesUnordered;
18use futures_util::{StreamExt, select};
19use serde::{Deserialize, Serialize};
20use std::fmt::Display;
21use std::io::Write;
22use std::time::Duration;
23use thiserror::Error;
24use writer::ToolIO;
25
26pub use diagnostics_data::Timestamp;
27
28pub const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S.%3f";
29
30/// Type of data in a log entry
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
32pub enum LogData {
33    /// A log entry from the target
34    TargetLog(LogsData),
35}
36
37impl LogData {
38    /// Gets the LogData as a target log.
39    pub fn as_target_log(&self) -> Option<&LogsData> {
40        match self {
41            LogData::TargetLog(log) => Some(log),
42        }
43    }
44
45    pub fn as_target_log_mut(&mut self) -> Option<&mut LogsData> {
46        match self {
47            LogData::TargetLog(log) => Some(log),
48        }
49    }
50}
51
52impl From<LogsData> for LogData {
53    fn from(data: LogsData) -> Self {
54        Self::TargetLog(data)
55    }
56}
57
58/// A log entry from either the host, target, or
59/// a symbolized log.
60#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
61pub struct LogEntry {
62    /// The log
63    pub data: LogData,
64}
65
66impl LogEntry {
67    fn utc_timestamp(&self, boot_ts: Option<Timestamp>) -> Timestamp {
68        Timestamp::from_nanos(match &self.data {
69            LogData::TargetLog(data) => {
70                data.metadata.timestamp.into_nanos()
71                    + boot_ts.map(|value| value.into_nanos()).unwrap_or(0)
72            }
73        })
74    }
75}
76
77// Required if we want to use ffx's built-in I/O, but
78// this isn't really applicable to us because we have
79// custom formatting rules.
80impl Display for LogEntry {
81    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        unreachable!("UNSUPPORTED -- This type cannot be formatted with std format.");
83    }
84}
85
86/// A trait for symbolizing log entries
87#[async_trait(?Send)]
88pub trait Symbolize {
89    /// Symbolizes a LogEntry and optionally produces a result.
90    /// The symbolizer may choose to discard the result.
91    /// This method may be called multiple times concurrently.
92    async fn symbolize(&self, entry: LogEntry) -> Option<LogEntry>;
93}
94
95async fn handle_value<S>(one: Data<Logs>, symbolizer: &S) -> Option<LogEntry>
96where
97    S: Symbolize + ?Sized,
98{
99    let entry = LogEntry { data: one.into() };
100    symbolizer.symbolize(entry).await
101}
102
103fn generate_timestamp_message(boot_timestamp: Timestamp) -> LogEntry {
104    LogEntry {
105        data: LogData::TargetLog(
106            LogsDataBuilder::new(diagnostics_data::BuilderArgs {
107                moniker: "ffx".try_into().unwrap(),
108                timestamp: Timestamp::from_nanos(0),
109                component_url: Some("ffx".into()),
110                severity: Severity::Info,
111            })
112            .set_message("Logging started")
113            .add_key(LogsProperty::String(
114                LogsField::Other("utc_time_now".into()),
115                chrono::Utc::now().to_rfc3339(),
116            ))
117            .add_key(LogsProperty::Int(
118                LogsField::Other("current_boot_timestamp".to_string()),
119                boot_timestamp.into_nanos(),
120            ))
121            .build(),
122        ),
123    }
124}
125
126/// Reads logs from a socket and formats them using the given formatter and symbolizer.
127pub async fn dump_logs_from_socket<F, S>(
128    socket: flex_client::AsyncSocket,
129    formatter: &mut F,
130    symbolizer: &S,
131    include_timestamp: bool,
132) -> Result<LogProcessingResult, JsonDeserializeError>
133where
134    F: LogFormatter + BootTimeAccessor,
135    S: Symbolize + ?Sized,
136{
137    let mut decoder = Box::pin(LogsDataStream::new(socket).fuse());
138    let mut symbolize_pending = FuturesUnordered::new();
139    if include_timestamp && !formatter.is_utc_time_format() {
140        formatter.push_log(generate_timestamp_message(formatter.get_boot_timestamp())).await?;
141    }
142    while let Some(value) = select! {
143        res = decoder.next() => Some(Either::Left(res)),
144        res = symbolize_pending.next() => Some(Either::Right(res)),
145        complete => None,
146    } {
147        match value {
148            Either::Left(Some(result)) => match result {
149                Ok(log) => symbolize_pending.push(handle_value(log, symbolizer)),
150                Err(e) => return Err(e),
151            },
152            Either::Right(Some(Some(symbolized))) => match formatter.push_log(symbolized).await? {
153                LogProcessingResult::Exit => {
154                    return Ok(LogProcessingResult::Exit);
155                }
156                LogProcessingResult::Continue => {}
157            },
158            _ => {}
159        }
160    }
161    Ok(LogProcessingResult::Continue)
162}
163
164pub trait BootTimeAccessor {
165    /// Sets the boot timestamp in nanoseconds since the Unix epoch.
166    fn set_boot_timestamp(&mut self, _boot_ts_nanos: Timestamp);
167
168    /// Returns the boot timestamp in nanoseconds since the Unix epoch.
169    fn get_boot_timestamp(&self) -> Timestamp;
170}
171
172/// Timestamp filter which is either either boot-based or UTC-based.
173#[derive(Clone, Debug)]
174pub struct DeviceOrLocalTimestamp {
175    /// Timestamp in boot time
176    pub timestamp: Timestamp,
177    /// True if this filter should be applied to boot time,
178    /// false if UTC time.
179    pub is_boot: bool,
180}
181
182impl DeviceOrLocalTimestamp {
183    /// Creates a DeviceOrLocalTimestamp from a real-time date/time or
184    /// a boot date/time. Returns None if both rtc and boot are None.
185    /// Returns None if the timestamp is "now".
186    pub fn new(
187        rtc: Option<&DetailedDateTime>,
188        boot: Option<&Duration>,
189    ) -> Option<DeviceOrLocalTimestamp> {
190        rtc.as_ref()
191            .filter(|value| !value.is_now)
192            .map(|value| DeviceOrLocalTimestamp {
193                timestamp: Timestamp::from_nanos(
194                    value.naive_utc().and_utc().timestamp_nanos_opt().unwrap(),
195                ),
196                is_boot: false,
197            })
198            .or_else(|| {
199                boot.map(|value| DeviceOrLocalTimestamp {
200                    timestamp: Timestamp::from_nanos(value.as_nanos() as i64),
201                    is_boot: true,
202                })
203            })
204    }
205}
206
207/// Log formatter options
208#[derive(Clone, Debug)]
209pub struct LogFormatterOptions {
210    /// Text display options
211    pub display: Option<LogTextDisplayOptions>,
212    /// Only display logs since the specified time.
213    pub since: Option<DeviceOrLocalTimestamp>,
214    /// Only display logs until the specified time.
215    pub until: Option<DeviceOrLocalTimestamp>,
216}
217
218impl Default for LogFormatterOptions {
219    fn default() -> Self {
220        LogFormatterOptions { display: Some(Default::default()), since: None, until: None }
221    }
222}
223
224/// Log formatter error
225#[derive(Error, Debug)]
226pub enum FormatterError {
227    /// An unknown error occurred
228    #[error(transparent)]
229    Other(#[from] anyhow::Error),
230    /// An IO error occurred
231    #[error(transparent)]
232    IO(#[from] std::io::Error),
233}
234
235impl FormatterError {
236    pub fn is_broken_pipe(&self) -> bool {
237        match self {
238            FormatterError::IO(error) => error.kind() == std::io::ErrorKind::BrokenPipe,
239            FormatterError::Other(_) => false,
240        }
241    }
242}
243
244/// Default formatter implementation
245pub struct DefaultLogFormatter<W>
246where
247    W: Write + ToolIO<OutputItem = LogEntry>,
248{
249    writer: W,
250    filters: LogFilterCriteria,
251    options: LogFormatterOptions,
252    boot_ts_nanos: Option<Timestamp>,
253}
254
255/// Converts from UTC time to boot time.
256fn utc_to_boot(boot_ts: Timestamp, utc: Timestamp) -> Timestamp {
257    Timestamp::from_nanos(utc.into_nanos() - boot_ts.into_nanos())
258}
259
260#[async_trait(?Send)]
261impl<W> LogFormatter for DefaultLogFormatter<W>
262where
263    W: Write + ToolIO<OutputItem = LogEntry>,
264{
265    async fn push_log(&mut self, log_entry: LogEntry) -> Result<LogProcessingResult, LogError> {
266        self.push_log_internal(log_entry, true).await.or_else(|err| {
267            if err.is_broken_pipe() { Ok(LogProcessingResult::Exit) } else { Err(err) }
268        })
269    }
270
271    fn is_utc_time_format(&self) -> bool {
272        self.options.display.iter().any(|options| match options.time_format {
273            LogTimeDisplayFormat::Original => false,
274            LogTimeDisplayFormat::WallTime { tz, offset: _ } => tz == Timezone::Utc,
275        })
276    }
277}
278
279impl<W> BootTimeAccessor for DefaultLogFormatter<W>
280where
281    W: Write + ToolIO<OutputItem = LogEntry>,
282{
283    fn set_boot_timestamp(&mut self, boot_ts_nanos: Timestamp) {
284        if let Some(LogTextDisplayOptions {
285            time_format: LogTimeDisplayFormat::WallTime { offset, .. },
286            ..
287        }) = &mut self.options.display
288        {
289            *offset = boot_ts_nanos.into_nanos();
290        }
291        self.boot_ts_nanos = Some(boot_ts_nanos);
292    }
293    fn get_boot_timestamp(&self) -> Timestamp {
294        debug_assert!(self.boot_ts_nanos.is_some());
295        self.boot_ts_nanos.unwrap_or_else(|| Timestamp::from_nanos(0))
296    }
297}
298
299/// Object which contains a Writer that can be borrowed
300pub trait WriterContainer<W>
301where
302    W: Write + ToolIO<OutputItem = LogEntry>,
303{
304    fn writer(&mut self) -> &mut W;
305}
306
307impl<W> WriterContainer<W> for DefaultLogFormatter<W>
308where
309    W: Write + ToolIO<OutputItem = LogEntry>,
310{
311    fn writer(&mut self) -> &mut W {
312        &mut self.writer
313    }
314}
315
316impl<W> DefaultLogFormatter<W>
317where
318    W: Write + ToolIO<OutputItem = LogEntry>,
319{
320    /// Creates a new DefaultLogFormatter with the given writer and options.
321    pub fn new(filters: LogFilterCriteria, writer: W, options: LogFormatterOptions) -> Self {
322        Self { filters, writer, options, boot_ts_nanos: None }
323    }
324
325    pub async fn expand_monikers(&mut self, getter: &impl InstanceGetter) -> Result<(), LogError> {
326        self.filters.expand_monikers(getter).await
327    }
328
329    pub async fn push_unfiltered_log(
330        &mut self,
331        log_entry: LogEntry,
332    ) -> Result<LogProcessingResult, LogError> {
333        self.push_log_internal(log_entry, false).await
334    }
335
336    async fn push_log_internal(
337        &mut self,
338        log_entry: LogEntry,
339        enable_filters: bool,
340    ) -> Result<LogProcessingResult, LogError> {
341        if enable_filters {
342            if self.filter_by_timestamp(&log_entry, self.options.since.as_ref(), |a, b| a <= b) {
343                return Ok(LogProcessingResult::Continue);
344            }
345
346            if self.filter_by_timestamp(&log_entry, self.options.until.as_ref(), |a, b| a >= b) {
347                return Ok(LogProcessingResult::Exit);
348            }
349
350            if !self.filters.matches(&log_entry) {
351                return Ok(LogProcessingResult::Continue);
352            }
353        }
354        match self.options.display {
355            Some(text_options) => {
356                let mut options_for_this_line_only = self.options.clone();
357                options_for_this_line_only.display = Some(text_options);
358                if !enable_filters {
359                    // For host logs, don't apply the boot time offset
360                    // as this is with reference to the UTC timeline
361                    if let LogTimeDisplayFormat::WallTime { ref mut offset, .. } =
362                        options_for_this_line_only.display.as_mut().unwrap().time_format
363                    {
364                        // 1 nanosecond so that LogTimeDisplayFormat in diagnostics_data
365                        // knows that we have a valid UTC offset. It normally falls back if
366                        // the UTC offset is 0. It prints at millisecond precision so being
367                        // off by +1 nanosecond isn't an issue.
368                        *offset = 1;
369                    };
370                }
371                self.format_text_log(options_for_this_line_only, log_entry)?;
372            }
373            None => {
374                self.writer.item(&log_entry).map_err(|err| LogError::UnknownError(err.into()))?;
375            }
376        };
377
378        Ok(LogProcessingResult::Continue)
379    }
380
381    /// Creates a new DefaultLogFormatter from command-line arguments.
382    pub fn new_from_args(cmd: &LogCommand, writer: W) -> Self {
383        let is_json = writer.is_machine();
384
385        DefaultLogFormatter::new(
386            LogFilterCriteria::from(cmd.clone()),
387            writer,
388            LogFormatterOptions {
389                display: if is_json {
390                    None
391                } else {
392                    Some(LogTextDisplayOptions {
393                        show_tags: !cmd.hide_tags,
394                        color: if cmd.no_color {
395                            LogTextColor::None
396                        } else {
397                            LogTextColor::BySeverity
398                        },
399                        show_metadata: cmd.show_metadata,
400                        time_format: match cmd.clock {
401                            TimeFormat::Boot => LogTimeDisplayFormat::Original,
402                            TimeFormat::Local => LogTimeDisplayFormat::WallTime {
403                                tz: Timezone::Local,
404                                // This will receive a correct value when logging actually starts,
405                                // see `set_boot_timestamp()` method on the log formatter.
406                                offset: 0,
407                            },
408                            TimeFormat::Utc => LogTimeDisplayFormat::WallTime {
409                                tz: Timezone::Utc,
410                                // This will receive a correct value when logging actually starts,
411                                // see `set_boot_timestamp()` method on the log formatter.
412                                offset: 0,
413                            },
414                        },
415                        show_file: !cmd.hide_file,
416                        show_moniker: !cmd.hide_moniker,
417                        show_full_moniker: cmd.show_full_moniker,
418                        prefer_url_component_name: cmd.prefer_url_component_name,
419                    })
420                },
421                since: DeviceOrLocalTimestamp::new(cmd.since.as_ref(), cmd.since_boot.as_ref()),
422                until: DeviceOrLocalTimestamp::new(cmd.until.as_ref(), cmd.until_boot.as_ref()),
423            },
424        )
425    }
426
427    fn filter_by_timestamp(
428        &self,
429        log_entry: &LogEntry,
430        timestamp: Option<&DeviceOrLocalTimestamp>,
431        callback: impl Fn(&Timestamp, &Timestamp) -> bool,
432    ) -> bool {
433        let Some(timestamp) = timestamp else {
434            return false;
435        };
436        if timestamp.is_boot {
437            callback(
438                &utc_to_boot(
439                    self.get_boot_timestamp(),
440                    log_entry.utc_timestamp(self.boot_ts_nanos),
441                ),
442                &timestamp.timestamp,
443            )
444        } else {
445            callback(&log_entry.utc_timestamp(self.boot_ts_nanos), &timestamp.timestamp)
446        }
447    }
448
449    // This function's arguments are copied to make lifetimes in push_log easier since borrowing
450    // &self would complicate spam highlighting.
451    fn format_text_log(
452        &mut self,
453        options: LogFormatterOptions,
454        log_entry: LogEntry,
455    ) -> Result<(), FormatterError> {
456        let text_options = match options.display {
457            Some(o) => o,
458            None => {
459                unreachable!("If we are here, we can only be formatting text");
460            }
461        };
462        match log_entry {
463            LogEntry { data: LogData::TargetLog(data), .. } => {
464                // TODO(https://fxbug.dev/42072442): Add support for log spam redaction and other
465                // features listed in the design doc.
466                writeln!(self.writer, "{}", LogTextPresenter::new(&data, text_options))?;
467            }
468        }
469        Ok(())
470    }
471}
472
473#[allow(dead_code)] // TODO(https://fxbug.dev/421409178)
474/// Symbolizer that does nothing.
475pub struct NoOpSymbolizer;
476
477#[async_trait(?Send)]
478impl Symbolize for NoOpSymbolizer {
479    async fn symbolize(&self, entry: LogEntry) -> Option<LogEntry> {
480        Some(entry)
481    }
482}
483
484/// Trait for formatting logs one at a time.
485#[async_trait(?Send)]
486pub trait LogFormatter {
487    /// Formats a log entry and writes it to the output.
488    async fn push_log(&mut self, log_entry: LogEntry) -> Result<LogProcessingResult, LogError>;
489
490    /// Returns true if the formatter is configured to output in UTC time format.
491    fn is_utc_time_format(&self) -> bool;
492}
493
494#[cfg(test)]
495mod test {
496    use crate::parse_time;
497    use assert_matches::assert_matches;
498    use diagnostics_data::{LogsDataBuilder, Severity};
499    use std::cell::Cell;
500    use writer::{Format, JsonWriter, TestBuffers};
501
502    use super::*;
503
504    const DEFAULT_TS_NANOS: u64 = 1615535969000000000;
505
506    struct FakeFormatter {
507        logs: Vec<LogEntry>,
508        boot_timestamp: Timestamp,
509        is_utc_time_format: bool,
510    }
511
512    impl FakeFormatter {
513        fn new() -> Self {
514            Self {
515                logs: Vec::new(),
516                boot_timestamp: Timestamp::from_nanos(0),
517                is_utc_time_format: false,
518            }
519        }
520    }
521
522    impl BootTimeAccessor for FakeFormatter {
523        fn set_boot_timestamp(&mut self, boot_ts_nanos: Timestamp) {
524            self.boot_timestamp = boot_ts_nanos;
525        }
526
527        fn get_boot_timestamp(&self) -> Timestamp {
528            self.boot_timestamp
529        }
530    }
531
532    #[async_trait(?Send)]
533    impl LogFormatter for FakeFormatter {
534        async fn push_log(&mut self, log_entry: LogEntry) -> Result<LogProcessingResult, LogError> {
535            self.logs.push(log_entry);
536            Ok(LogProcessingResult::Continue)
537        }
538
539        fn is_utc_time_format(&self) -> bool {
540            self.is_utc_time_format
541        }
542    }
543
544    /// Symbolizer that prints "Fuchsia".
545    pub struct FakeFuchsiaSymbolizer;
546
547    fn set_log_msg(entry: &mut LogEntry, msg: impl Into<String>) {
548        *entry.data.as_target_log_mut().unwrap().msg_mut().unwrap() = msg.into();
549    }
550
551    #[async_trait(?Send)]
552    impl Symbolize for FakeFuchsiaSymbolizer {
553        async fn symbolize(&self, mut entry: LogEntry) -> Option<LogEntry> {
554            set_log_msg(&mut entry, "Fuchsia");
555            Some(entry)
556        }
557    }
558
559    struct FakeSymbolizerCallback {
560        should_discard: Cell<bool>,
561    }
562
563    impl FakeSymbolizerCallback {
564        fn new() -> Self {
565            Self { should_discard: Cell::new(true) }
566        }
567    }
568
569    async fn dump_logs_from_socket<F, S>(
570        socket: fuchsia_async::Socket,
571        formatter: &mut F,
572        symbolizer: &S,
573    ) -> Result<LogProcessingResult, JsonDeserializeError>
574    where
575        F: LogFormatter + BootTimeAccessor,
576        S: Symbolize + ?Sized,
577    {
578        super::dump_logs_from_socket(socket, formatter, symbolizer, false).await
579    }
580
581    #[async_trait(?Send)]
582    impl Symbolize for FakeSymbolizerCallback {
583        async fn symbolize(&self, mut input: LogEntry) -> Option<LogEntry> {
584            self.should_discard.set(!self.should_discard.get());
585            if self.should_discard.get() {
586                None
587            } else {
588                set_log_msg(&mut input, "symbolized log");
589                Some(input)
590            }
591        }
592    }
593
594    #[fuchsia::test]
595    async fn test_boot_timestamp_setter() {
596        let buffers = TestBuffers::default();
597        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
598        let options = LogFormatterOptions {
599            display: Some(LogTextDisplayOptions {
600                time_format: LogTimeDisplayFormat::WallTime { tz: Timezone::Utc, offset: 0 },
601                ..Default::default()
602            }),
603            ..Default::default()
604        };
605        let mut formatter =
606            DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
607        formatter.set_boot_timestamp(Timestamp::from_nanos(1234));
608        assert_eq!(formatter.get_boot_timestamp(), Timestamp::from_nanos(1234));
609
610        // Boot timestamp is supported when using JSON output (for filtering)
611        let buffers = TestBuffers::default();
612        let output = JsonWriter::<LogEntry>::new_test(None, &buffers);
613        let options = LogFormatterOptions { display: None, ..Default::default() };
614        let mut formatter = DefaultLogFormatter::new(LogFilterCriteria::default(), output, options);
615        formatter.set_boot_timestamp(Timestamp::from_nanos(1234));
616        assert_eq!(formatter.get_boot_timestamp(), Timestamp::from_nanos(1234));
617    }
618
619    #[fuchsia::test]
620    async fn test_format_single_message() {
621        let symbolizer = NoOpSymbolizer {};
622        let mut formatter = FakeFormatter::new();
623        let target_log = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
624            moniker: "ffx".try_into().unwrap(),
625            timestamp: Timestamp::from_nanos(0),
626            component_url: Some("ffx".into()),
627            severity: Severity::Info,
628        })
629        .set_message("Hello world!")
630        .build();
631        let (sender, receiver) = zx::Socket::create_stream();
632        sender
633            .write(serde_json::to_string(&target_log).unwrap().as_bytes())
634            .expect("failed to write target log");
635        drop(sender);
636        dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
637            .await
638            .unwrap();
639        assert_eq!(formatter.logs, vec![LogEntry { data: LogData::TargetLog(target_log) }]);
640    }
641
642    #[fuchsia::test]
643    async fn test_format_utc_timestamp() {
644        let symbolizer = NoOpSymbolizer {};
645        let mut formatter = FakeFormatter::new();
646        formatter.set_boot_timestamp(Timestamp::from_nanos(DEFAULT_TS_NANOS as i64));
647        let (_, receiver) = zx::Socket::create_stream();
648        super::dump_logs_from_socket(
649            flex_client::socket_to_async(receiver),
650            &mut formatter,
651            &symbolizer,
652            true,
653        )
654        .await
655        .unwrap();
656        let target_log = formatter.logs[0].data.as_target_log().unwrap();
657        let properties = target_log.payload_keys().unwrap();
658        assert_eq!(target_log.msg().unwrap(), "Logging started");
659
660        // Ensure the end has a valid timestamp
661        chrono::DateTime::parse_from_rfc3339(
662            properties.get_property("utc_time_now").unwrap().string().unwrap(),
663        )
664        .unwrap();
665        assert_eq!(
666            properties.get_property("current_boot_timestamp").unwrap().int().unwrap(),
667            DEFAULT_TS_NANOS as i64
668        );
669    }
670
671    #[fuchsia::test]
672    async fn test_format_utc_timestamp_does_not_print_if_utc_time() {
673        let symbolizer = NoOpSymbolizer {};
674        let mut formatter = FakeFormatter::new();
675        formatter.is_utc_time_format = true;
676        formatter.set_boot_timestamp(Timestamp::from_nanos(DEFAULT_TS_NANOS as i64));
677        let (_, receiver) = zx::Socket::create_stream();
678        super::dump_logs_from_socket(
679            flex_client::socket_to_async(receiver),
680            &mut formatter,
681            &symbolizer,
682            true,
683        )
684        .await
685        .unwrap();
686        assert_eq!(formatter.logs.len(), 0);
687    }
688
689    #[fuchsia::test]
690    async fn test_format_multiple_messages() {
691        let symbolizer = NoOpSymbolizer {};
692        let mut formatter = FakeFormatter::new();
693        let (sender, receiver) = zx::Socket::create_stream();
694        let target_log_0 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
695            moniker: "ffx".try_into().unwrap(),
696            timestamp: Timestamp::from_nanos(0),
697            component_url: Some("ffx".into()),
698            severity: Severity::Info,
699        })
700        .set_message("Hello world!")
701        .set_pid(1)
702        .set_tid(2)
703        .build();
704        let target_log_1 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
705            moniker: "ffx".try_into().unwrap(),
706            timestamp: Timestamp::from_nanos(1),
707            component_url: Some("ffx".into()),
708            severity: Severity::Info,
709        })
710        .set_message("Hello world 2!")
711        .build();
712        sender
713            .write(serde_json::to_string(&vec![&target_log_0, &target_log_1]).unwrap().as_bytes())
714            .expect("failed to write target log");
715        drop(sender);
716        dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
717            .await
718            .unwrap();
719        assert_eq!(
720            formatter.logs,
721            vec![
722                LogEntry { data: LogData::TargetLog(target_log_0) },
723                LogEntry { data: LogData::TargetLog(target_log_1) }
724            ]
725        );
726    }
727
728    #[fuchsia::test]
729    async fn test_format_timestamp_filter() {
730        // test since and until args for the LogFormatter
731        let symbolizer = NoOpSymbolizer {};
732        let buffers = TestBuffers::default();
733        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
734        let mut formatter = DefaultLogFormatter::new(
735            LogFilterCriteria::default(),
736            stdout,
737            LogFormatterOptions {
738                since: Some(DeviceOrLocalTimestamp {
739                    timestamp: Timestamp::from_nanos(1),
740                    is_boot: true,
741                }),
742                until: Some(DeviceOrLocalTimestamp {
743                    timestamp: Timestamp::from_nanos(3),
744                    is_boot: true,
745                }),
746                ..Default::default()
747            },
748        );
749        formatter.set_boot_timestamp(Timestamp::from_nanos(0));
750
751        let (sender, receiver) = zx::Socket::create_stream();
752        let target_log_0 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
753            moniker: "ffx".try_into().unwrap(),
754            timestamp: Timestamp::from_nanos(0),
755            component_url: Some("ffx".into()),
756            severity: Severity::Info,
757        })
758        .set_message("Hello world!")
759        .build();
760        let target_log_1 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
761            moniker: "ffx".try_into().unwrap(),
762            timestamp: Timestamp::from_nanos(1),
763            component_url: Some("ffx".into()),
764            severity: Severity::Info,
765        })
766        .set_message("Hello world 2!")
767        .build();
768        let target_log_2 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
769            moniker: "ffx".try_into().unwrap(),
770            timestamp: Timestamp::from_nanos(2),
771            component_url: Some("ffx".into()),
772            severity: Severity::Info,
773        })
774        .set_pid(1)
775        .set_tid(2)
776        .set_message("Hello world 3!")
777        .build();
778        let target_log_3 = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
779            moniker: "ffx".try_into().unwrap(),
780            timestamp: Timestamp::from_nanos(3),
781            component_url: Some("ffx".into()),
782            severity: Severity::Info,
783        })
784        .set_message("Hello world 4!")
785        .set_pid(1)
786        .set_tid(2)
787        .build();
788        sender
789            .write(
790                serde_json::to_string(&vec![
791                    &target_log_0,
792                    &target_log_1,
793                    &target_log_2,
794                    &target_log_3,
795                ])
796                .unwrap()
797                .as_bytes(),
798            )
799            .expect("failed to write target log");
800        drop(sender);
801        assert_matches!(
802            dump_logs_from_socket(
803                flex_client::socket_to_async(receiver),
804                &mut formatter,
805                &symbolizer,
806            )
807            .await,
808            Ok(LogProcessingResult::Exit)
809        );
810        assert_eq!(
811            buffers.stdout.into_string(),
812            "[00000.000000][1][2][ffx] INFO: Hello world 3!\n"
813        );
814    }
815
816    fn make_log_with_timestamp(timestamp: i64) -> LogsData {
817        LogsDataBuilder::new(diagnostics_data::BuilderArgs {
818            moniker: "ffx".try_into().unwrap(),
819            timestamp: Timestamp::from_nanos(timestamp),
820            component_url: Some("ffx".into()),
821            severity: Severity::Info,
822        })
823        .set_message(format!("Hello world {timestamp}!"))
824        .set_pid(1)
825        .set_tid(2)
826        .build()
827    }
828
829    #[fuchsia::test]
830    async fn test_format_timestamp_filter_utc() {
831        // test since and until args for the LogFormatter
832        let symbolizer = NoOpSymbolizer {};
833        let buffers = TestBuffers::default();
834        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
835        let mut formatter = DefaultLogFormatter::new(
836            LogFilterCriteria::default(),
837            stdout,
838            LogFormatterOptions {
839                since: Some(DeviceOrLocalTimestamp {
840                    timestamp: Timestamp::from_nanos(1),
841                    is_boot: false,
842                }),
843                until: Some(DeviceOrLocalTimestamp {
844                    timestamp: Timestamp::from_nanos(3),
845                    is_boot: false,
846                }),
847                display: Some(LogTextDisplayOptions {
848                    time_format: LogTimeDisplayFormat::WallTime { tz: Timezone::Utc, offset: 1 },
849                    ..Default::default()
850                }),
851            },
852        );
853        formatter.set_boot_timestamp(Timestamp::from_nanos(1));
854
855        let (sender, receiver) = zx::Socket::create_stream();
856        let logs = (0..4).map(make_log_with_timestamp).collect::<Vec<_>>();
857        sender
858            .write(serde_json::to_string(&logs).unwrap().as_bytes())
859            .expect("failed to write target log");
860        drop(sender);
861        assert_matches!(
862            dump_logs_from_socket(
863                flex_client::socket_to_async(receiver),
864                &mut formatter,
865                &symbolizer,
866            )
867            .await,
868            Ok(LogProcessingResult::Exit)
869        );
870        assert_eq!(
871            buffers.stdout.into_string(),
872            "[1970-01-01 00:00:00.000][1][2][ffx] INFO: Hello world 1!\n"
873        );
874    }
875
876    fn logs_data_builder() -> LogsDataBuilder {
877        diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
878            timestamp: Timestamp::from_nanos(default_ts().as_nanos() as i64),
879            component_url: Some("component_url".into()),
880            moniker: "some/moniker".try_into().unwrap(),
881            severity: diagnostics_data::Severity::Warn,
882        })
883        .set_pid(1)
884        .set_tid(2)
885    }
886
887    fn default_ts() -> Duration {
888        Duration::from_nanos(DEFAULT_TS_NANOS)
889    }
890
891    fn log_entry() -> LogEntry {
892        LogEntry {
893            data: LogData::TargetLog(
894                logs_data_builder().add_tag("tag1").add_tag("tag2").set_message("message").build(),
895            ),
896        }
897    }
898
899    #[fuchsia::test]
900    async fn test_default_formatter() {
901        let buffers = TestBuffers::default();
902        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
903        let options = LogFormatterOptions::default();
904        let mut formatter =
905            DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
906        formatter.push_log(log_entry()).await.unwrap();
907        drop(formatter);
908        assert_eq!(
909            buffers.into_stdout_str(),
910            "[1615535969.000000][1][2][some/moniker][tag1,tag2] WARN: message\n"
911        );
912    }
913
914    #[fuchsia::test]
915    async fn test_default_formatter_with_hidden_metadata() {
916        let buffers = TestBuffers::default();
917        let stdout = JsonWriter::<LogEntry>::new_test(None, &buffers);
918        let options = LogFormatterOptions {
919            display: Some(LogTextDisplayOptions { show_metadata: false, ..Default::default() }),
920            ..LogFormatterOptions::default()
921        };
922        let mut formatter =
923            DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
924        formatter.push_log(log_entry()).await.unwrap();
925        drop(formatter);
926        assert_eq!(
927            buffers.into_stdout_str(),
928            "[1615535969.000000][some/moniker][tag1,tag2] WARN: message\n"
929        );
930    }
931
932    #[fuchsia::test]
933    async fn test_default_formatter_with_json() {
934        let buffers = TestBuffers::default();
935        let stdout = JsonWriter::<LogEntry>::new_test(Some(Format::Json), &buffers);
936        let options = LogFormatterOptions { display: None, ..Default::default() };
937        {
938            let mut formatter =
939                DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
940            formatter.push_log(log_entry()).await.unwrap();
941        }
942        assert_eq!(
943            serde_json::from_str::<LogEntry>(&buffers.into_stdout_str()).unwrap(),
944            log_entry()
945        );
946    }
947
948    fn emit_log(sender: &mut zx::Socket, msg: &str, timestamp: i64) -> Data<Logs> {
949        let target_log = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
950            moniker: "ffx".try_into().unwrap(),
951            timestamp: Timestamp::from_nanos(timestamp),
952            component_url: Some("ffx".into()),
953            severity: Severity::Info,
954        })
955        .set_message(msg)
956        .build();
957
958        sender
959            .write(serde_json::to_string(&target_log).unwrap().as_bytes())
960            .expect("failed to write target log");
961        target_log
962    }
963
964    #[fuchsia::test]
965    async fn test_default_formatter_discards_when_told_by_symbolizer() {
966        let mut formatter = FakeFormatter::new();
967        let (mut sender, receiver) = zx::Socket::create_stream();
968        let mut target_log_0 = emit_log(&mut sender, "Hello world!", 0);
969        emit_log(&mut sender, "Dropped world!", 1);
970        let mut target_log_2 = emit_log(&mut sender, "Hello world!", 2);
971        emit_log(&mut sender, "Dropped world!", 3);
972        let mut target_log_4 = emit_log(&mut sender, "Hello world!", 4);
973        drop(sender);
974        // Drop every other log.
975        let symbolizer = FakeSymbolizerCallback::new();
976        *target_log_0.msg_mut().unwrap() = "symbolized log".into();
977        *target_log_2.msg_mut().unwrap() = "symbolized log".into();
978        *target_log_4.msg_mut().unwrap() = "symbolized log".into();
979        dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
980            .await
981            .unwrap();
982        assert_eq!(
983            formatter.logs,
984            vec![
985                LogEntry { data: LogData::TargetLog(target_log_0) },
986                LogEntry { data: LogData::TargetLog(target_log_2) },
987                LogEntry { data: LogData::TargetLog(target_log_4) }
988            ],
989        );
990    }
991
992    #[fuchsia::test]
993    async fn test_symbolized_output() {
994        let symbolizer = FakeFuchsiaSymbolizer;
995        let buffers = TestBuffers::default();
996        let output = JsonWriter::<LogEntry>::new_test(None, &buffers);
997        let mut formatter = DefaultLogFormatter::new(
998            LogFilterCriteria::default(),
999            output,
1000            LogFormatterOptions { ..Default::default() },
1001        );
1002        formatter.set_boot_timestamp(Timestamp::from_nanos(0));
1003        let target_log = LogsDataBuilder::new(diagnostics_data::BuilderArgs {
1004            moniker: "ffx".try_into().unwrap(),
1005            timestamp: Timestamp::from_nanos(0),
1006            component_url: Some("ffx".into()),
1007            severity: Severity::Info,
1008        })
1009        .set_pid(1)
1010        .set_tid(2)
1011        .set_message("Hello world!")
1012        .build();
1013        let (sender, receiver) = zx::Socket::create_stream();
1014        sender
1015            .write(serde_json::to_string(&target_log).unwrap().as_bytes())
1016            .expect("failed to write target log");
1017        drop(sender);
1018        dump_logs_from_socket(flex_client::socket_to_async(receiver), &mut formatter, &symbolizer)
1019            .await
1020            .unwrap();
1021        assert_eq!(buffers.stdout.into_string(), "[00000.000000][1][2][ffx] INFO: Fuchsia\n");
1022    }
1023
1024    #[test]
1025    fn test_device_or_local_timestamp_returns_none_if_now_is_passed() {
1026        assert_matches!(DeviceOrLocalTimestamp::new(Some(&parse_time("now").unwrap()), None), None);
1027    }
1028
1029    struct BrokenPipeWriter;
1030    impl std::io::Write for BrokenPipeWriter {
1031        fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
1032            Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe"))
1033        }
1034
1035        fn flush(&mut self) -> std::io::Result<()> {
1036            Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe"))
1037        }
1038    }
1039
1040    impl ToolIO for BrokenPipeWriter {
1041        type OutputItem = LogEntry;
1042        fn is_machine(&self) -> bool {
1043            false
1044        }
1045
1046        fn stderr(&mut self) -> &mut dyn std::io::Write {
1047            self
1048        }
1049
1050        fn item(&mut self, _value: &Self::OutputItem) -> writer::Result<()> {
1051            Err(writer::Error::Io(std::io::Error::new(
1052                std::io::ErrorKind::BrokenPipe,
1053                "broken pipe",
1054            )))
1055        }
1056    }
1057
1058    #[fuchsia::test]
1059    async fn test_default_formatter_exits_on_broken_pipe() {
1060        let stdout = BrokenPipeWriter;
1061        let options = LogFormatterOptions::default();
1062        let mut formatter =
1063            DefaultLogFormatter::new(LogFilterCriteria::default(), stdout, options.clone());
1064        let result = formatter.push_log(log_entry()).await;
1065        assert_matches!(result, Ok(LogProcessingResult::Exit));
1066    }
1067
1068    #[test]
1069    fn test_formatter_error_is_broken_pipe() {
1070        assert!(
1071            FormatterError::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe"))
1072                .is_broken_pipe()
1073        );
1074        assert!(!FormatterError::IO(std::io::Error::other("other")).is_broken_pipe());
1075        assert!(!FormatterError::Other(anyhow::anyhow!("other")).is_broken_pipe());
1076    }
1077}