Skip to main content

log_command/
log_socket_stream.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::LogError;
6use async_stream::stream;
7use diagnostics_data::LogsData;
8use futures_util::{AsyncReadExt, Stream, StreamExt};
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use std::pin::Pin;
12use thiserror::Error;
13
/// Initial length of the socket read buffer, in bytes (2,000,000).
///
/// Chosen large enough to hold many log messages at once, which keeps the
/// number of socket read calls low.
const READ_BUFFER_SIZE: usize = 2 * 1_000_000;

/// Number of bytes the read buffer is grown by each time it becomes full
/// (256,000).
const READ_BUFFER_INCREMENT: usize = 256 * 1_000;
22
23fn stream_raw_json<T, const BUFFER_SIZE: usize, const INC: usize>(
24    mut socket: flex_client::AsyncSocket,
25) -> impl Stream<Item = Result<OneOrMany<T>, JsonDeserializeError>>
26where
27    T: DeserializeOwned,
28{
29    stream! {
30        let mut buffer = vec![0; BUFFER_SIZE];
31        let mut write_offset = 0;
32        let mut read_offset = 0;
33        let mut available = 0;
34        loop {
35            // Read data from socket
36            debug_assert!(write_offset <= buffer.len());
37            if write_offset == buffer.len() {
38                buffer.resize(buffer.len() + INC, 0);
39            }
40            let read_res =  socket.read(&mut buffer[write_offset..]).await;
41            let Ok(socket_bytes_read) = read_res else {
42                log::warn!("socket.read({write_offset}..) failed: {read_res:?}");
43                break;
44            };
45            if socket_bytes_read == 0 {
46                break;
47            }
48            write_offset += socket_bytes_read;
49            available += socket_bytes_read;
50            let mut des = serde_json::Deserializer::from_slice(&buffer[read_offset..available])
51                .into_iter();
52            let mut read_nothing = true;
53            loop {
54                match des.next() {
55                    Some(Ok(item)) => {
56                        read_nothing = false;
57                        yield Ok(item);
58                    }
59                    Some(Err(e)) => {
60                        if e.is_eof() {
61                            break;
62                        }
63                        read_nothing = false;
64                        yield Err(JsonDeserializeError::Other { error: e.into() });
65                        break;
66                    }
67                    None => break,
68                }
69            }
70
71            // Don't update the read offset if we haven't successfully
72            // read anything.
73            if read_nothing {
74                continue;
75            }
76            let byte_offset = des.byte_offset();
77            if byte_offset+read_offset == available {
78                available = 0;
79                write_offset = 0;
80                read_offset = 0;
81                buffer.resize(READ_BUFFER_SIZE, 0);
82            } else {
83                read_offset += byte_offset;
84            }
85        }
86    }
87}
88
89/// Streams JSON logs from a socket
90fn stream_json<T>(
91    socket: flex_client::AsyncSocket,
92) -> impl Stream<Item = Result<T, JsonDeserializeError>>
93where
94    T: DeserializeOwned,
95{
96    stream_raw_json::<T, READ_BUFFER_SIZE, READ_BUFFER_INCREMENT>(socket)
97        .map(|item| {
98            let items = match item {
99                Ok(OneOrMany::One(item)) => vec![Ok(item)],
100                Ok(OneOrMany::Many(items)) => items.into_iter().map(Ok).collect(),
101                Err(e) => vec![Err(e)],
102            };
103            futures_util::stream::iter(items)
104        })
105        .flatten()
106}
107
108/// Stream of JSON logs from the target device.
109pub struct LogsDataStream {
110    inner: Pin<Box<dyn Stream<Item = Result<LogsData, JsonDeserializeError>> + Send>>,
111}
112
113impl LogsDataStream {
114    /// Creates a new LogsDataStream from a socket of log messages in JSON format.
115    pub fn new(socket: flex_client::AsyncSocket) -> Self {
116        Self { inner: Box::pin(stream_json(socket)) }
117    }
118}
119
120impl Stream for LogsDataStream {
121    type Item = Result<LogsData, JsonDeserializeError>;
122
123    fn poll_next(
124        mut self: std::pin::Pin<&mut Self>,
125        cx: &mut std::task::Context<'_>,
126    ) -> std::task::Poll<Option<Self::Item>> {
127        self.inner.poll_next_unpin(cx)
128    }
129}
130
/// Something that can contain either a single value or a Vec of values.
///
/// Because of `#[serde(untagged)]`, serde tries the variants in declaration order when
/// deserializing: input is first decoded as a single `T`, and only if that fails as a
/// `Vec<T>` (e.g. `1` -> `One(1)`, `[1,2,3]` -> `Many(vec![1, 2, 3])`). Keep `One` first.
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(untagged)]
pub enum OneOrMany<T> {
    /// A single value.
    One(T),
    /// A list of values (a JSON array on the wire).
    Many(Vec<T>),
}
138
139pub enum OneOrManyIterator<T> {
140    One(Option<T>),
141    Many(std::vec::IntoIter<T>),
142}
143
144impl<T> Iterator for OneOrManyIterator<T> {
145    type Item = T;
146
147    fn next(&mut self) -> Option<Self::Item> {
148        match self {
149            OneOrManyIterator::One(v) => v.take(),
150            OneOrManyIterator::Many(v) => v.next(),
151        }
152    }
153}
154
155impl<T> IntoIterator for OneOrMany<T> {
156    type Item = T;
157    type IntoIter = OneOrManyIterator<T>;
158
159    fn into_iter(self) -> Self::IntoIter {
160        match self {
161            OneOrMany::One(v) => OneOrManyIterator::One(Some(v)),
162            OneOrMany::Many(v) => OneOrManyIterator::Many(v.into_iter()),
163        }
164    }
165}
166
167impl<'a, T> IntoIterator for &'a OneOrMany<T> {
168    type Item = &'a T;
169    type IntoIter = std::slice::Iter<'a, T>;
170
171    fn into_iter(self) -> Self::IntoIter {
172        match self {
173            OneOrMany::One(v) => std::slice::from_ref(v).iter(),
174            OneOrMany::Many(v) => v.iter(),
175        }
176    }
177}
178
/// Error type for log streamer.
///
/// The JSON socket decoder in this module reports deserialization failures as
/// [`JsonDeserializeError::Other`]. `#[from]` derives a `From` conversion for each wrapped
/// error type, so `?` can be used to produce this error.
#[derive(Error, Debug)]
pub enum JsonDeserializeError {
    /// Unknown error deserializing JSON.
    ///
    /// Displayed exactly like the wrapped error (`#[error(transparent)]`).
    #[error(transparent)]
    Other {
        #[from]
        error: anyhow::Error,
    },
    /// I/O error, displayed as `IO error <error>`.
    #[error("IO error {}", error)]
    IO {
        #[from]
        error: std::io::Error,
    },
    /// Log error, displayed exactly like the wrapped [`LogError`].
    #[error(transparent)]
    LogError(#[from] LogError),
    /// End of stream has been reached.
    ///
    /// NOTE(review): the decoder in this file never constructs this variant; presumably it is
    /// produced by callers — confirm before relying on it.
    #[error("No more data")]
    NoMoreData,
}
201
202impl JsonDeserializeError {
203    pub fn is_broken_pipe(&self) -> bool {
204        match self {
205            JsonDeserializeError::IO { error } => error.kind() == std::io::ErrorKind::BrokenPipe,
206            JsonDeserializeError::LogError(log_error) => log_error.is_broken_pipe(),
207
208            JsonDeserializeError::Other { .. } | JsonDeserializeError::NoMoreData => false,
209        }
210    }
211}
212
#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_data::{BuilderArgs, LogsDataBuilder, Severity, Timestamp};
    use futures_util::AsyncWriteExt;

    #[fuchsia::test]
    fn test_one_or_many() {
        let one: OneOrMany<u32> = serde_json::from_str("1").unwrap();
        assert_eq!(one, OneOrMany::One(1));
        let many: OneOrMany<u32> = serde_json::from_str("[1,2,3]").unwrap();
        assert_eq!(many, OneOrMany::Many(vec![1, 2, 3]));
    }

    const BOOT_TS: i64 = 98765432000000000;

    #[fuchsia::test]
    async fn test_json_decoder() {
        // This is intentionally a datagram socket so we can
        // guarantee torn writes and test all the code paths
        // in the decoder.
        let (local, remote) = zx::Socket::create_datagram();
        let socket = flex_client::socket_to_async(remote);
        let mut decoder = LogsDataStream::new(socket);
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message("Hello world!")
        .add_tag("Some tag")
        .build();
        let serialized_log = serde_json::to_string(&test_log).unwrap();
        let serialized_bytes = serialized_log.as_bytes();
        let part_a = &serialized_bytes[..15];
        let part_b = &serialized_bytes[15..20];
        let part_c = &serialized_bytes[20..];
        local.write(part_a).unwrap();
        local.write(part_b).unwrap();
        local.write(part_c).unwrap();
        assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log);
    }

    #[fuchsia::test]
    async fn test_json_decoder_regular_message() {
        // This is intentionally a datagram socket so we can
        // send the entire message as one "packet".
        let (local, remote) = zx::Socket::create_datagram();
        let socket = flex_client::socket_to_async(remote);
        let mut decoder = LogsDataStream::new(socket);
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message("Hello world!")
        .add_tag("Some tag")
        .build();
        let serialized_log = serde_json::to_string(&test_log).unwrap();
        let serialized_bytes = serialized_log.as_bytes();
        local.write(serialized_bytes).unwrap();
        assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log);
    }

    #[fuchsia::test]
    async fn test_json_decoder_large_message() {
        const MSG_COUNT: usize = 100;
        let (local, remote) = zx::Socket::create_stream();
        let socket = flex_client::socket_to_async(remote);
        let mut decoder = Box::pin(stream_json::<LogsData>(socket));
        let test_logs = (0..MSG_COUNT)
            .map(|value| {
                LogsDataBuilder::new(BuilderArgs {
                    component_url: None,
                    moniker: "ffx".try_into().unwrap(),
                    severity: Severity::Info,
                    timestamp: Timestamp::from_nanos(BOOT_TS),
                })
                .set_message(format!("Hello world! {value}"))
                .add_tag("Some tag")
                .build()
            })
            .collect::<Vec<_>>();
        let mut local = flex_client::socket_to_async(local);
        let test_logs_clone = test_logs.clone();
        let _write_task = fuchsia_async::Task::local(async move {
            for log in test_logs {
                let serialized_log = serde_json::to_string(&log).unwrap();
                let serialized_bytes = serialized_log.as_bytes();
                local.write_all(serialized_bytes).await.unwrap();
            }
        });
        for item in test_logs_clone.iter().take(MSG_COUNT) {
            assert_eq!(&decoder.next().await.unwrap().unwrap(), item);
        }
    }

    #[fuchsia::test]
    async fn test_json_decoder_large_single_message() {
        // A single message carrying about 1MB of characters, large enough that it is
        // likely delivered over several socket reads.
        const CHAR_COUNT: usize = 1000 * 1000;
        let (local, remote) = zx::Socket::create_stream();
        let socket = flex_client::socket_to_async(remote);
        let mut decoder = Box::pin(stream_json::<LogsData>(socket));
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message(format!("Hello world! {}", "h".repeat(CHAR_COUNT)))
        .add_tag("Some tag")
        .build();
        let mut local = flex_client::socket_to_async(local);
        let test_log_clone = test_log.clone();
        let _write_task = fuchsia_async::Task::local(async move {
            let serialized_log = serde_json::to_string(&test_log).unwrap();
            let serialized_bytes = serialized_log.as_bytes();
            local.write_all(serialized_bytes).await.unwrap();
        });
        assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log_clone);
    }

    #[fuchsia::test]
    async fn test_json_decoder_small_buffer() {
        // A deliberately tiny read buffer means every message spans several reads,
        // exercising buffer growth and the handling of bytes left over between reads.
        const MSG_COUNT: usize = 20;
        let (local, remote) = zx::Socket::create_stream();
        let socket = flex_client::socket_to_async(remote);
        let mut decoder = Box::pin(stream_raw_json::<LogsData, 64, 64>(socket));
        let test_logs = (0..MSG_COUNT)
            .map(|value| {
                LogsDataBuilder::new(BuilderArgs {
                    component_url: None,
                    moniker: "ffx".try_into().unwrap(),
                    severity: Severity::Info,
                    timestamp: Timestamp::from_nanos(BOOT_TS),
                })
                .set_message(format!("Hello world! {value}"))
                .add_tag("Some tag")
                .build()
            })
            .collect::<Vec<_>>();
        let mut local = flex_client::socket_to_async(local);
        let test_logs_clone = test_logs.clone();
        let _write_task = fuchsia_async::Task::local(async move {
            for log in test_logs {
                let serialized_log = serde_json::to_string(&log).unwrap();
                local.write_all(serialized_log.as_bytes()).await.unwrap();
            }
        });
        for item in &test_logs_clone {
            assert_eq!(decoder.next().await.unwrap().unwrap(), OneOrMany::One(item.clone()));
        }
    }

    #[fuchsia::test]
    async fn test_json_decoder_truncated_message() {
        // This is intentionally a datagram socket so we can
        // guarantee torn writes and test all the code paths
        // in the decoder.
        let (local, remote) = zx::Socket::create_datagram();
        let socket = flex_client::socket_to_async(remote);
        let mut decoder = LogsDataStream::new(socket);
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message("Hello world!")
        .add_tag("Some tag")
        .build();
        let serialized_log = serde_json::to_string(&test_log).unwrap();
        let serialized_bytes = serialized_log.as_bytes();
        let part_a = &serialized_bytes[..15];
        let part_b = &serialized_bytes[15..20];
        local.write(part_a).unwrap();
        local.write(part_b).unwrap();
        drop(local);
        assert_matches!(decoder.next().await, None);
    }

    #[fuchsia::test]
    async fn test_json_decoder_invalid_json() {
        let (local, remote) = zx::Socket::create_stream();
        let socket = flex_client::socket_to_async(remote);
        let mut decoder = LogsDataStream::new(socket);

        let mut local = flex_client::socket_to_async(local);

        // Write invalid JSON
        local.write_all(b"invalid json").await.unwrap();

        // Write valid JSON
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message("Recovery log")
        .build();
        let serialized_log = serde_json::to_string(&test_log).unwrap();
        local.write_all(serialized_log.as_bytes()).await.unwrap();

        // Drop the writer so the socket reports end-of-stream and the decoder's read
        // loop is guaranteed to finish.
        drop(local);

        // The leading garbage must surface as an error instead of hanging the decoder.
        assert_matches!(decoder.next().await, Some(Err(_)));

        // Whatever follows (further errors while skipping garbage, the valid log, or
        // nothing at all), the stream must terminate, and any log it does yield must be
        // the one that was written.
        while let Some(item) = decoder.next().await {
            if let Ok(log) = item {
                assert_eq!(log, test_log);
            }
        }
    }

    #[test]
    fn test_json_deserialize_error_is_broken_pipe() {
        assert!(
            JsonDeserializeError::IO {
                error: std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe")
            }
            .is_broken_pipe()
        );
        assert!(
            !JsonDeserializeError::IO { error: std::io::Error::other("other") }.is_broken_pipe()
        );
        assert!(!JsonDeserializeError::NoMoreData.is_broken_pipe());
    }
}