log_command/log_socket_stream.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::LogError;
use async_stream::stream;
use diagnostics_data::LogsData;
use futures_util::{AsyncReadExt, Stream, StreamExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use thiserror::Error;

/// Read buffer size. Large enough to hold many messages at once, reducing the
/// number of socket read calls needed while streaming.
const READ_BUFFER_SIZE: usize = 1000 * 1000 * 2;

/// Amount by which the read buffer grows whenever it fills up.
const READ_BUFFER_INCREMENT: usize = 1000 * 256;

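/// Reads a stream of JSON values from `socket`, yielding each top-level value
/// (either a single `T` or an array of `T`) once it has been fully received.
/// Incoming bytes are accumulated in a buffer that starts at `BUFFER_SIZE`
/// bytes and grows by `INC` bytes whenever it fills up, so a value split
/// across multiple socket reads is parsed once the rest of it arrives.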
fn stream_raw_json<T, const BUFFER_SIZE: usize, const INC: usize>(
    mut socket: fuchsia_async::Socket,
) -> impl Stream<Item = OneOrMany<T>>
where
    T: DeserializeOwned,
{
    stream! {
        let mut buffer = vec![0; BUFFER_SIZE];
        let mut write_offset = 0;
        let mut read_offset = 0;
        let mut available = 0;
        loop {
            // Read data from socket
            debug_assert!(write_offset <= buffer.len());
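            // Grow the buffer when it is full so that a value larger than the
            // current capacity can still be received in one piece.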
            if write_offset == buffer.len() {
                buffer.resize(buffer.len() + INC, 0);
            }
            let socket_bytes_read = socket.read(&mut buffer[write_offset..]).await.unwrap();
            if socket_bytes_read == 0 {
                break;
            }
            write_offset += socket_bytes_read;
            available += socket_bytes_read;
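            // Parse and yield as many complete JSON values as the buffered
            // bytes allow; a trailing partial value stays in the buffer for
            // the next pass.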
            let mut des = serde_json::Deserializer::from_slice(&buffer[read_offset..available])
                .into_iter();
            let mut read_nothing = true;
            while let Some(Ok(item)) = des.next() {
                read_nothing = false;
                yield item;
            }
            // Don't update the read offset if we haven't successfully
            // read anything.
            if read_nothing {
                continue;
            }
            let byte_offset = des.byte_offset();
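            // If every buffered byte has been consumed, reset the buffer and
            // offsets so the next read starts fresh; otherwise remember how
            // far the parser got so the next pass resumes from there.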
            if byte_offset + read_offset == available {
                available = 0;
                write_offset = 0;
                read_offset = 0;
                buffer.resize(BUFFER_SIZE, 0);
            } else {
                read_offset += byte_offset;
            }
        }
    }
}

/// Streams JSON logs from a socket
fn stream_json<T>(socket: fuchsia_async::Socket) -> impl Stream<Item = T>
where
    T: DeserializeOwned,
{
    stream_raw_json::<T, READ_BUFFER_SIZE, READ_BUFFER_INCREMENT>(socket)
        .map(futures_util::stream::iter)
        .flatten()
}

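// Illustrative usage sketch (not part of the original file). It assumes
// `socket` is a fuchsia_async::Socket whose peer writes LogsData values as
// JSON, and `handle_log` is a hypothetical consumer:
//
//     let mut logs = LogsDataStream::new(socket);
//     while let Some(log) = logs.next().await {
//         handle_log(log);
//     }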
/// Stream of JSON logs from the target device.
pub struct LogsDataStream {
    inner: Pin<Box<dyn Stream<Item = LogsData> + Send>>,
}

impl LogsDataStream {
    /// Creates a new LogsDataStream from a socket of log messages in JSON format.
    pub fn new(socket: fuchsia_async::Socket) -> Self {
        Self { inner: Box::pin(stream_json(socket)) }
    }
}

impl Stream for LogsDataStream {
    type Item = LogsData;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.inner.poll_next_unpin(cx)
    }
}

/// Something that can contain either a single value or a Vec of values
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(untagged)]
pub enum OneOrMany<T> {
    One(T),
    Many(Vec<T>),
}

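/// Iterator over the values of a [`OneOrMany`], returned by its
/// `IntoIterator` implementation.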
pub enum OneOrManyIterator<T> {
    One(Option<T>),
    Many(std::vec::IntoIter<T>),
}

impl<T> Iterator for OneOrManyIterator<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            OneOrManyIterator::One(v) => v.take(),
            OneOrManyIterator::Many(v) => v.next(),
        }
    }
}

impl<T> IntoIterator for OneOrMany<T> {
    type Item = T;
    type IntoIter = OneOrManyIterator<T>;

    fn into_iter(self) -> Self::IntoIter {
        match self {
            OneOrMany::One(v) => OneOrManyIterator::One(Some(v)),
            OneOrMany::Many(v) => OneOrManyIterator::Many(v.into_iter()),
        }
    }
}

impl<'a, T> IntoIterator for &'a OneOrMany<T> {
    type Item = &'a T;
    type IntoIter = std::slice::Iter<'a, T>;

    fn into_iter(self) -> Self::IntoIter {
        match self {
            OneOrMany::One(v) => std::slice::from_ref(v).iter(),
            OneOrMany::Many(v) => v.iter(),
        }
    }
}

/// Error type for log streamer
#[derive(Error, Debug)]
pub enum JsonDeserializeError {
    /// Unknown error deserializing JSON
    #[error(transparent)]
    Other {
        #[from]
        error: anyhow::Error,
    },
    /// I/O error
    #[error("IO error {}", error)]
    IO {
        #[from]
        error: std::io::Error,
    },
    /// Log error
    #[error(transparent)]
    LogError(#[from] LogError),
    /// End of stream has been reached
    #[error("No more data")]
    NoMoreData,
}

#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_data::{BuilderArgs, LogsDataBuilder, Severity, Timestamp};
    use futures_util::AsyncWriteExt;

    #[fuchsia::test]
    fn test_one_or_many() {
        let one: OneOrMany<u32> = serde_json::from_str("1").unwrap();
        assert_eq!(one, OneOrMany::One(1));
        let many: OneOrMany<u32> = serde_json::from_str("[1,2,3]").unwrap();
        assert_eq!(many, OneOrMany::Many(vec![1, 2, 3]));
    }
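
    // Additional test sketch (not in the original file) exercising both
    // IntoIterator impls of OneOrMany; it assumes nothing beyond the types
    // defined above.
    #[fuchsia::test]
    fn test_one_or_many_into_iter() {
        let one: OneOrMany<u32> = OneOrMany::One(1);
        assert_eq!((&one).into_iter().copied().collect::<Vec<_>>(), vec![1]);
        assert_eq!(one.into_iter().collect::<Vec<_>>(), vec![1]);

        let many: OneOrMany<u32> = OneOrMany::Many(vec![1, 2, 3]);
        assert_eq!((&many).into_iter().copied().collect::<Vec<_>>(), vec![1, 2, 3]);
        assert_eq!(many.into_iter().collect::<Vec<_>>(), vec![1, 2, 3]);
    }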

    const BOOT_TS: i64 = 98765432000000000;

    #[fuchsia::test]
    async fn test_json_decoder() {
        // This is intentionally a datagram socket so that each write arrives
        // as its own packet, guaranteeing the message is delivered to the
        // decoder in pieces and exercising its partial-read paths.
        let (local, remote) = zx::Socket::create_datagram();
        let socket = fuchsia_async::Socket::from_socket(remote);
        let mut decoder = LogsDataStream::new(socket);
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message("Hello world!")
        .add_tag("Some tag")
        .build();
        let serialized_log = serde_json::to_string(&test_log).unwrap();
        let serialized_bytes = serialized_log.as_bytes();
        let part_a = &serialized_bytes[..15];
        let part_b = &serialized_bytes[15..20];
        let part_c = &serialized_bytes[20..];
        local.write(part_a).unwrap();
        local.write(part_b).unwrap();
        local.write(part_c).unwrap();
        assert_eq!(&decoder.next().await.unwrap(), &test_log);
    }

    #[fuchsia::test]
    async fn test_json_decoder_regular_message() {
        // This is intentionally a datagram socket so we can
        // send the entire message as one "packet".
        let (local, remote) = zx::Socket::create_datagram();
        let socket = fuchsia_async::Socket::from_socket(remote);
        let mut decoder = LogsDataStream::new(socket);
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message("Hello world!")
        .add_tag("Some tag")
        .build();
        let serialized_log = serde_json::to_string(&test_log).unwrap();
        let serialized_bytes = serialized_log.as_bytes();
        local.write(serialized_bytes).unwrap();
        assert_eq!(&decoder.next().await.unwrap(), &test_log);
    }

    #[fuchsia::test]
    async fn test_json_decoder_large_message() {
        const MSG_COUNT: usize = 100;
        let (local, remote) = zx::Socket::create_stream();
        let socket = fuchsia_async::Socket::from_socket(remote);
        let mut decoder = Box::pin(
            stream_raw_json::<LogsData, 100, 10>(socket).map(futures_util::stream::iter).flatten(),
        );
        let test_logs = (0..MSG_COUNT)
            .map(|value| {
                LogsDataBuilder::new(BuilderArgs {
                    component_url: None,
                    moniker: "ffx".try_into().unwrap(),
                    severity: Severity::Info,
                    timestamp: Timestamp::from_nanos(BOOT_TS),
                })
                .set_message(format!("Hello world! {}", value))
                .add_tag("Some tag")
                .build()
            })
            .collect::<Vec<_>>();
        let mut local = fuchsia_async::Socket::from_socket(local);
        let test_logs_clone = test_logs.clone();
        let _write_task = fuchsia_async::Task::local(async move {
            for log in test_logs {
                let serialized_log = serde_json::to_string(&log).unwrap();
                let serialized_bytes = serialized_log.as_bytes();
                local.write_all(serialized_bytes).await.unwrap();
            }
        });
        for item in test_logs_clone.iter().take(MSG_COUNT) {
            assert_eq!(&decoder.next().await.unwrap(), item);
        }
    }

    #[fuchsia::test]
    async fn test_json_decoder_large_single_message() {
        // A single message large enough (1,000,000 characters) to exceed
        // the initial read buffer used below.
        const CHAR_COUNT: usize = 1000 * 1000;
        let (local, remote) = zx::Socket::create_stream();
        let socket = fuchsia_async::Socket::from_socket(remote);
        let mut decoder = Box::pin(
            stream_raw_json::<LogsData, 256000, 20000>(socket)
                .map(futures_util::stream::iter)
                .flatten(),
        );
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message(format!("Hello world! {}", "h".repeat(CHAR_COUNT)))
        .add_tag("Some tag")
        .build();
        let mut local = fuchsia_async::Socket::from_socket(local);
        let test_log_clone = test_log.clone();
        let _write_task = fuchsia_async::Task::local(async move {
            let serialized_log = serde_json::to_string(&test_log).unwrap();
            let serialized_bytes = serialized_log.as_bytes();
            local.write_all(serialized_bytes).await.unwrap();
        });
        assert_eq!(&decoder.next().await.unwrap(), &test_log_clone);
    }

    #[fuchsia::test]
    async fn test_json_decoder_truncated_message() {
        // This is intentionally a datagram socket so that each write arrives
        // as its own packet, letting us deliver only part of the message and
        // then close the socket.
        let (local, remote) = zx::Socket::create_datagram();
        let socket = fuchsia_async::Socket::from_socket(remote);
        let mut decoder = LogsDataStream::new(socket);
        let test_log = LogsDataBuilder::new(BuilderArgs {
            component_url: None,
            moniker: "ffx".try_into().unwrap(),
            severity: Severity::Info,
            timestamp: Timestamp::from_nanos(BOOT_TS),
        })
        .set_message("Hello world!")
        .add_tag("Some tag")
        .build();
        let serialized_log = serde_json::to_string(&test_log).unwrap();
        let serialized_bytes = serialized_log.as_bytes();
        let part_a = &serialized_bytes[..15];
        let part_b = &serialized_bytes[15..20];
        local.write(part_a).unwrap();
        local.write(part_b).unwrap();
        drop(local);
        assert_matches!(decoder.next().await, None);
    }
}