log_command/
log_socket_stream.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::LogError;
6use async_stream::stream;
7use diagnostics_data::LogsData;
8use futures_util::{AsyncReadExt, Stream, StreamExt};
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use std::pin::Pin;
12use thiserror::Error;
13
/// Initial size, in bytes, of the buffer that socket data is read into.
/// It is deliberately large so that many messages fit in it at once,
/// which keeps the number of socket read calls low.
const READ_BUFFER_SIZE: usize = 2 * 1000 * 1000;

/// Number of bytes the read buffer is enlarged by when it is
/// completely full at the time of the next read.
const READ_BUFFER_INCREMENT: usize = 256 * 1000;
22
23fn stream_raw_json<T, const BUFFER_SIZE: usize, const INC: usize>(
24    mut socket: flex_client::AsyncSocket,
25) -> impl Stream<Item = Result<OneOrMany<T>, JsonDeserializeError>>
26where
27    T: DeserializeOwned,
28{
29    stream! {
30        let mut buffer = vec![0; BUFFER_SIZE];
31        let mut write_offset = 0;
32        let mut read_offset = 0;
33        let mut available = 0;
34        loop {
35            // Read data from socket
36            debug_assert!(write_offset <= buffer.len());
37            if write_offset == buffer.len() {
38                buffer.resize(buffer.len() + INC, 0);
39            }
40            let read_res =  socket.read(&mut buffer[write_offset..]).await;
41            let Ok(socket_bytes_read) = read_res else {
42                log::warn!("socket.read({write_offset}..) failed: {read_res:?}");
43                break;
44            };
45            if socket_bytes_read == 0 {
46                break;
47            }
48            write_offset += socket_bytes_read;
49            available += socket_bytes_read;
50            let mut des = serde_json::Deserializer::from_slice(&buffer[read_offset..available])
51                .into_iter();
52            let mut read_nothing = true;
53            loop {
54                match des.next() {
55                    Some(Ok(item)) => {
56                        read_nothing = false;
57                        yield Ok(item);
58                    }
59                    Some(Err(e)) => {
60                        if e.is_eof() {
61                            break;
62                        }
63                        read_nothing = false;
64                        yield Err(JsonDeserializeError::Other { error: e.into() });
65                        break;
66                    }
67                    None => break,
68                }
69            }
70
71            // Don't update the read offset if we haven't successfully
72            // read anything.
73            if read_nothing {
74                continue;
75            }
76            let byte_offset = des.byte_offset();
77            if byte_offset+read_offset == available {
78                available = 0;
79                write_offset = 0;
80                read_offset = 0;
81                buffer.resize(READ_BUFFER_SIZE, 0);
82            } else {
83                read_offset += byte_offset;
84            }
85        }
86    }
87}
88
89/// Streams JSON logs from a socket
90fn stream_json<T>(
91    socket: flex_client::AsyncSocket,
92) -> impl Stream<Item = Result<T, JsonDeserializeError>>
93where
94    T: DeserializeOwned,
95{
96    stream_raw_json::<T, READ_BUFFER_SIZE, READ_BUFFER_INCREMENT>(socket)
97        .map(|item| {
98            let items = match item {
99                Ok(OneOrMany::One(item)) => vec![Ok(item)],
100                Ok(OneOrMany::Many(items)) => items.into_iter().map(Ok).collect(),
101                Err(e) => vec![Err(e)],
102            };
103            futures_util::stream::iter(items)
104        })
105        .flatten()
106}
107
108/// Stream of JSON logs from the target device.
109pub struct LogsDataStream {
110    inner: Pin<Box<dyn Stream<Item = Result<LogsData, JsonDeserializeError>> + Send>>,
111}
112
113impl LogsDataStream {
114    /// Creates a new LogsDataStream from a socket of log messages in JSON format.
115    pub fn new(socket: flex_client::AsyncSocket) -> Self {
116        Self { inner: Box::pin(stream_json(socket)) }
117    }
118}
119
120impl Stream for LogsDataStream {
121    type Item = Result<LogsData, JsonDeserializeError>;
122
123    fn poll_next(
124        mut self: std::pin::Pin<&mut Self>,
125        cx: &mut std::task::Context<'_>,
126    ) -> std::task::Poll<Option<Self::Item>> {
127        self.inner.poll_next_unpin(cx)
128    }
129}
130
/// Something that can contain either a single value or a Vec of values.
///
/// The enum is `#[serde(untagged)]`, so no variant tag appears in the
/// serialized form: with `T = u32`, the JSON `1` deserializes to `One(1)`
/// and `[1,2,3]` deserializes to `Many(vec![1, 2, 3])`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(untagged)]
pub enum OneOrMany<T> {
    /// Exactly one value.
    One(T),
    /// A list of values.
    Many(Vec<T>),
}
138
/// Owning iterator over the value(s) held by a `OneOrMany`.
///
/// The remaining length is always known exactly, which lets consumers such
/// as `collect()` allocate the right capacity up front.
pub enum OneOrManyIterator<T> {
    /// Iterating a single value; becomes `None` once that value was yielded.
    One(Option<T>),
    /// Iterating the elements of a list.
    Many(std::vec::IntoIter<T>),
}

impl<T> Iterator for OneOrManyIterator<T> {
    type Item = T;

    /// Returns the next value, or `None` when all values have been yielded.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            OneOrManyIterator::One(v) => v.take(),
            OneOrManyIterator::Many(v) => v.next(),
        }
    }

    /// Reports the exact number of values still to be yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            OneOrManyIterator::One(v) => {
                let remaining = usize::from(v.is_some());
                (remaining, Some(remaining))
            }
            OneOrManyIterator::Many(v) => v.size_hint(),
        }
    }
}

// `size_hint` is exact for both variants.
impl<T> ExactSizeIterator for OneOrManyIterator<T> {}

// Both variants keep returning `None` after they are exhausted.
impl<T> std::iter::FusedIterator for OneOrManyIterator<T> {}
154
155impl<T> IntoIterator for OneOrMany<T> {
156    type Item = T;
157    type IntoIter = OneOrManyIterator<T>;
158
159    fn into_iter(self) -> Self::IntoIter {
160        match self {
161            OneOrMany::One(v) => OneOrManyIterator::One(Some(v)),
162            OneOrMany::Many(v) => OneOrManyIterator::Many(v.into_iter()),
163        }
164    }
165}
166
167impl<'a, T> IntoIterator for &'a OneOrMany<T> {
168    type Item = &'a T;
169    type IntoIter = std::slice::Iter<'a, T>;
170
171    fn into_iter(self) -> Self::IntoIter {
172        match self {
173            OneOrMany::One(v) => std::slice::from_ref(v).iter(),
174            OneOrMany::Many(v) => v.iter(),
175        }
176    }
177}
178
/// Error type for log streamer.
///
/// `From` conversions are derived for `anyhow::Error`, `std::io::Error`
/// and `LogError`, so those can be propagated with `?`.
#[derive(Error, Debug)]
pub enum JsonDeserializeError {
    /// Unknown error deserializing JSON. `stream_raw_json` reports JSON
    /// errors that are not "unexpected end of input" through this variant.
    /// Displays as the wrapped error.
    #[error(transparent)]
    Other {
        #[from]
        error: anyhow::Error,
    },
    /// I/O error. Displays as `IO error` followed by the wrapped error.
    #[error("IO error {}", error)]
    IO {
        #[from]
        error: std::io::Error,
    },
    /// Log error from this crate. Displays as the wrapped error.
    #[error(transparent)]
    LogError(#[from] LogError),
    /// End of stream has been reached.
    #[error("No more data")]
    NoMoreData,
}
201
202#[cfg(test)]
203mod test {
204    use super::*;
205    use assert_matches::assert_matches;
206    use diagnostics_data::{BuilderArgs, LogsDataBuilder, Severity, Timestamp};
207    use futures_util::AsyncWriteExt;
208
209    #[fuchsia::test]
210    fn test_one_or_many() {
211        let one: OneOrMany<u32> = serde_json::from_str("1").unwrap();
212        assert_eq!(one, OneOrMany::One(1));
213        let many: OneOrMany<u32> = serde_json::from_str("[1,2,3]").unwrap();
214        assert_eq!(many, OneOrMany::Many(vec![1, 2, 3]));
215    }
216
217    const BOOT_TS: i64 = 98765432000000000;
218
219    #[fuchsia::test]
220    async fn test_json_decoder() {
221        // This is intentionally a datagram socket so we can
222        // guarantee torn writes and test all the code paths
223        // in the decoder.
224        let (local, remote) = zx::Socket::create_datagram();
225        let socket = flex_client::socket_to_async(remote);
226        let mut decoder = LogsDataStream::new(socket);
227        let test_log = LogsDataBuilder::new(BuilderArgs {
228            component_url: None,
229            moniker: "ffx".try_into().unwrap(),
230            severity: Severity::Info,
231            timestamp: Timestamp::from_nanos(BOOT_TS),
232        })
233        .set_message("Hello world!")
234        .add_tag("Some tag")
235        .build();
236        let serialized_log = serde_json::to_string(&test_log).unwrap();
237        let serialized_bytes = serialized_log.as_bytes();
238        let part_a = &serialized_bytes[..15];
239        let part_b = &serialized_bytes[15..20];
240        let part_c = &serialized_bytes[20..];
241        local.write(part_a).unwrap();
242        local.write(part_b).unwrap();
243        local.write(part_c).unwrap();
244        assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log);
245    }
246
247    #[fuchsia::test]
248    async fn test_json_decoder_regular_message() {
249        // This is intentionally a datagram socket so we can
250        // send the entire message as one "packet".
251        let (local, remote) = zx::Socket::create_datagram();
252        let socket = flex_client::socket_to_async(remote);
253        let mut decoder = LogsDataStream::new(socket);
254        let test_log = LogsDataBuilder::new(BuilderArgs {
255            component_url: None,
256            moniker: "ffx".try_into().unwrap(),
257            severity: Severity::Info,
258            timestamp: Timestamp::from_nanos(BOOT_TS),
259        })
260        .set_message("Hello world!")
261        .add_tag("Some tag")
262        .build();
263        let serialized_log = serde_json::to_string(&test_log).unwrap();
264        let serialized_bytes = serialized_log.as_bytes();
265        local.write(serialized_bytes).unwrap();
266        assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log);
267    }
268
269    #[fuchsia::test]
270    async fn test_json_decoder_large_message() {
271        const MSG_COUNT: usize = 100;
272        let (local, remote) = zx::Socket::create_stream();
273        let socket = flex_client::socket_to_async(remote);
274        let mut decoder = Box::pin(stream_json::<LogsData>(socket));
275        let test_logs = (0..MSG_COUNT)
276            .map(|value| {
277                LogsDataBuilder::new(BuilderArgs {
278                    component_url: None,
279                    moniker: "ffx".try_into().unwrap(),
280                    severity: Severity::Info,
281                    timestamp: Timestamp::from_nanos(BOOT_TS),
282                })
283                .set_message(format!("Hello world! {value}"))
284                .add_tag("Some tag")
285                .build()
286            })
287            .collect::<Vec<_>>();
288        let mut local = flex_client::socket_to_async(local);
289        let test_logs_clone = test_logs.clone();
290        let _write_task = fuchsia_async::Task::local(async move {
291            for log in test_logs {
292                let serialized_log = serde_json::to_string(&log).unwrap();
293                let serialized_bytes = serialized_log.as_bytes();
294                local.write_all(serialized_bytes).await.unwrap();
295            }
296        });
297        for item in test_logs_clone.iter().take(MSG_COUNT) {
298            assert_eq!(&decoder.next().await.unwrap().unwrap(), item);
299        }
300    }
301
    /// A single message with a very long body must be decoded intact.
    #[fuchsia::test]
    async fn test_json_decoder_large_single_message() {
        // One million characters (about 1 MB) in a single message.
        // NOTE(review): this is below READ_BUFFER_SIZE (2 MB), so the message
        // fits in the initial buffer and buffer growth is not exercised —
        // confirm whether a larger size (e.g. 10 MB) was intended.
        const CHAR_COUNT: usize = 1000 * 1000;
        let (local, remote) = zx::Socket::create_stream();
        let socket = flex_client::socket_to_async(remote);
        let mut decoder = Box::pin(stream_json::<LogsData>(socket));
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message(format!("Hello world! {}", "h".repeat(CHAR_COUNT)))
        .add_tag("Some tag")
        .build();
        let mut local = flex_client::socket_to_async(local);
        let test_log_clone = test_log.clone();
        // Write from a separate task while this one reads.
        let _write_task = fuchsia_async::Task::local(async move {
            let serialized_log = serde_json::to_string(&test_log).unwrap();
            let serialized_bytes = serialized_log.as_bytes();
            local.write_all(serialized_bytes).await.unwrap();
        });
        assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log_clone);
    }
327
328    #[fuchsia::test]
329    async fn test_json_decoder_truncated_message() {
330        // This is intentionally a datagram socket so we can
331        // guarantee torn writes and test all the code paths
332        // in the decoder.
333        let (local, remote) = zx::Socket::create_datagram();
334        let socket = flex_client::socket_to_async(remote);
335        let mut decoder = LogsDataStream::new(socket);
336        let test_log = LogsDataBuilder::new(BuilderArgs {
337            component_url: None,
338            moniker: "ffx".try_into().unwrap(),
339            severity: Severity::Info,
340            timestamp: Timestamp::from_nanos(BOOT_TS),
341        })
342        .set_message("Hello world!")
343        .add_tag("Some tag")
344        .build();
345        let serialized_log = serde_json::to_string(&test_log).unwrap();
346        let serialized_bytes = serialized_log.as_bytes();
347        let part_a = &serialized_bytes[..15];
348        let part_b = &serialized_bytes[15..20];
349        local.write(part_a).unwrap();
350        local.write(part_b).unwrap();
351        drop(local);
352        assert_matches!(decoder.next().await, None);
353    }
    /// Regression test: malformed input must produce an error item rather
    /// than making the decoder hang.
    #[fuchsia::test]
    async fn test_json_decoder_invalid_json() {
        let (local, remote) = zx::Socket::create_stream();
        let socket = flex_client::socket_to_async(remote);
        let mut decoder = LogsDataStream::new(socket);

        let mut local = flex_client::socket_to_async(local);

        // Write invalid JSON
        local.write_all(b"invalid json").await.unwrap();

        // Write valid JSON
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message("Recovery log")
        .build();
        let serialized_log = serde_json::to_string(&test_log).unwrap();
        local.write_all(serialized_log.as_bytes()).await.unwrap();

        // Drop our end of the socket to signal EOF, so the decoder's read
        // loop is guaranteed to terminate.
        drop(local);

        // The invalid JSON must surface as an error item.
        let result = decoder.next().await;
        assert!(result.unwrap().is_err());

        // What follows the first error is deliberately left open: the decoder
        // may recover and yield the valid log, yield another error, or end
        // the stream. The only hard requirement is that this call returns.
        // If a log is yielded, it must be the one that was written.
        let result2 = decoder.next().await;
        match result2 {
            Some(Ok(log)) => assert_eq!(log, test_log),
            Some(Err(_)) => {
                // A second error is tolerated; give the decoder one more
                // chance to yield the log and check it if it does.
                let result3 = decoder.next().await;
                if let Some(Ok(log)) = result3 {
                    assert_eq!(log, test_log);
                }
            }
            None => {} // Stream ended.
        }
    }
417}