1use crate::LogError;
6use async_stream::stream;
7use diagnostics_data::LogsData;
8use futures_util::{AsyncReadExt, Stream, StreamExt};
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use std::pin::Pin;
12use thiserror::Error;
13
/// Initial size, in bytes, of the buffer that `stream_json` reads socket data into
/// (2 MB, decimal).
const READ_BUFFER_SIZE: usize = 2 * 1000 * 1000;

/// Number of bytes by which `stream_json`'s read buffer grows when it is full
/// (256 kB, decimal).
const READ_BUFFER_INCREMENT: usize = 256 * 1000;
22
23fn stream_raw_json<T, const BUFFER_SIZE: usize, const INC: usize>(
24 mut socket: fuchsia_async::Socket,
25) -> impl Stream<Item = OneOrMany<T>>
26where
27 T: DeserializeOwned,
28{
29 stream! {
30 let mut buffer = vec![0; BUFFER_SIZE];
31 let mut write_offset = 0;
32 let mut read_offset = 0;
33 let mut available = 0;
34 loop {
35 debug_assert!(write_offset <= buffer.len());
37 if write_offset == buffer.len() {
38 buffer.resize(buffer.len() + INC, 0);
39 }
40 let socket_bytes_read = socket.read(&mut buffer[write_offset..]).await.unwrap();
41 if socket_bytes_read == 0 {
42 break;
43 }
44 write_offset += socket_bytes_read;
45 available += socket_bytes_read;
46 let mut des = serde_json::Deserializer::from_slice(&buffer[read_offset..available])
47 .into_iter();
48 let mut read_nothing = true;
49 while let Some(Ok(item)) = des.next() {
50 read_nothing = false;
51 yield item;
52 }
53 if read_nothing {
56 continue;
57 }
58 let byte_offset = des.byte_offset();
59 if byte_offset+read_offset == available {
60 available = 0;
61 write_offset = 0;
62 read_offset = 0;
63 buffer.resize(READ_BUFFER_SIZE, 0);
64 } else {
65 read_offset += byte_offset;
66 }
67 }
68 }
69}
70
71fn stream_json<T>(socket: fuchsia_async::Socket) -> impl Stream<Item = T>
73where
74 T: DeserializeOwned,
75{
76 stream_raw_json::<T, READ_BUFFER_SIZE, READ_BUFFER_INCREMENT>(socket)
77 .map(futures_util::stream::iter)
78 .flatten()
79}
80
81pub struct LogsDataStream {
83 inner: Pin<Box<dyn Stream<Item = LogsData> + Send>>,
84}
85
86impl LogsDataStream {
87 pub fn new(socket: fuchsia_async::Socket) -> Self {
89 Self { inner: Box::pin(stream_json(socket)) }
90 }
91}
92
93impl Stream for LogsDataStream {
94 type Item = LogsData;
95
96 fn poll_next(
97 mut self: std::pin::Pin<&mut Self>,
98 cx: &mut std::task::Context<'_>,
99 ) -> std::task::Poll<Option<Self::Item>> {
100 self.inner.poll_next_unpin(cx)
101 }
102}
103
104#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
106#[serde(untagged)]
107pub enum OneOrMany<T> {
108 One(T),
109 Many(Vec<T>),
110}
111
/// Owning iterator over the values of a `OneOrMany<T>`.
pub enum OneOrManyIterator<T> {
    /// The single value, or `None` once it has been yielded.
    One(Option<T>),
    /// The remaining values of a list.
    Many(std::vec::IntoIter<T>),
}

impl<T> Iterator for OneOrManyIterator<T> {
    type Item = T;

    /// Yields the single value once, or the next value of the list.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            OneOrManyIterator::One(slot) => slot.take(),
            OneOrManyIterator::Many(items) => items.next(),
        }
    }

    /// Reports the exact number of values left, so consumers such as `collect` can
    /// size their allocation up front instead of falling back to the `(0, None)` default.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            OneOrManyIterator::One(slot) => {
                let remaining = usize::from(slot.is_some());
                (remaining, Some(remaining))
            }
            OneOrManyIterator::Many(items) => items.size_hint(),
        }
    }
}
127
128impl<T> IntoIterator for OneOrMany<T> {
129 type Item = T;
130 type IntoIter = OneOrManyIterator<T>;
131
132 fn into_iter(self) -> Self::IntoIter {
133 match self {
134 OneOrMany::One(v) => OneOrManyIterator::One(Some(v)),
135 OneOrMany::Many(v) => OneOrManyIterator::Many(v.into_iter()),
136 }
137 }
138}
139
140impl<'a, T> IntoIterator for &'a OneOrMany<T> {
141 type Item = &'a T;
142 type IntoIter = std::slice::Iter<'a, T>;
143
144 fn into_iter(self) -> Self::IntoIter {
145 match self {
146 OneOrMany::One(v) => std::slice::from_ref(v).iter(),
147 OneOrMany::Many(v) => v.iter(),
148 }
149 }
150}
151
152#[derive(Error, Debug)]
154pub enum JsonDeserializeError {
155 #[error(transparent)]
157 Other {
158 #[from]
159 error: anyhow::Error,
160 },
161 #[error("IO error {}", error)]
163 IO {
164 #[from]
165 error: std::io::Error,
166 },
167 #[error(transparent)]
169 LogError(#[from] LogError),
170 #[error("No more data")]
172 NoMoreData,
173}
174
175#[cfg(test)]
176mod test {
177 use super::*;
178 use assert_matches::assert_matches;
179 use diagnostics_data::{BuilderArgs, LogsDataBuilder, Severity, Timestamp};
180 use futures_util::AsyncWriteExt;
181
182 #[fuchsia::test]
183 fn test_one_or_many() {
184 let one: OneOrMany<u32> = serde_json::from_str("1").unwrap();
185 assert_eq!(one, OneOrMany::One(1));
186 let many: OneOrMany<u32> = serde_json::from_str("[1,2,3]").unwrap();
187 assert_eq!(many, OneOrMany::Many(vec![1, 2, 3]));
188 }
189
190 const BOOT_TS: i64 = 98765432000000000;
191
192 #[fuchsia::test]
193 async fn test_json_decoder() {
194 let (local, remote) = zx::Socket::create_datagram();
198 let socket = fuchsia_async::Socket::from_socket(remote);
199 let mut decoder = LogsDataStream::new(socket);
200 let test_log = LogsDataBuilder::new(BuilderArgs {
201 component_url: None,
202 moniker: "ffx".try_into().unwrap(),
203 severity: Severity::Info,
204 timestamp: Timestamp::from_nanos(BOOT_TS),
205 })
206 .set_message("Hello world!")
207 .add_tag("Some tag")
208 .build();
209 let serialized_log = serde_json::to_string(&test_log).unwrap();
210 let serialized_bytes = serialized_log.as_bytes();
211 let part_a = &serialized_bytes[..15];
212 let part_b = &serialized_bytes[15..20];
213 let part_c = &serialized_bytes[20..];
214 local.write(part_a).unwrap();
215 local.write(part_b).unwrap();
216 local.write(part_c).unwrap();
217 assert_eq!(&decoder.next().await.unwrap(), &test_log);
218 }
219
220 #[fuchsia::test]
221 async fn test_json_decoder_regular_message() {
222 let (local, remote) = zx::Socket::create_datagram();
225 let socket = fuchsia_async::Socket::from_socket(remote);
226 let mut decoder = LogsDataStream::new(socket);
227 let test_log = LogsDataBuilder::new(BuilderArgs {
228 component_url: None,
229 moniker: "ffx".try_into().unwrap(),
230 severity: Severity::Info,
231 timestamp: Timestamp::from_nanos(BOOT_TS),
232 })
233 .set_message("Hello world!")
234 .add_tag("Some tag")
235 .build();
236 let serialized_log = serde_json::to_string(&test_log).unwrap();
237 let serialized_bytes = serialized_log.as_bytes();
238 local.write(serialized_bytes).unwrap();
239 assert_eq!(&decoder.next().await.unwrap(), &test_log);
240 }
241
242 #[fuchsia::test]
243 async fn test_json_decoder_large_message() {
244 const MSG_COUNT: usize = 100;
245 let (local, remote) = zx::Socket::create_stream();
246 let socket = fuchsia_async::Socket::from_socket(remote);
247 let mut decoder = Box::pin(
248 stream_raw_json::<LogsData, 100, 10>(socket).map(futures_util::stream::iter).flatten(),
249 );
250 let test_logs = (0..MSG_COUNT)
251 .map(|value| {
252 LogsDataBuilder::new(BuilderArgs {
253 component_url: None,
254 moniker: "ffx".try_into().unwrap(),
255 severity: Severity::Info,
256 timestamp: Timestamp::from_nanos(BOOT_TS),
257 })
258 .set_message(format!("Hello world! {}", value))
259 .add_tag("Some tag")
260 .build()
261 })
262 .collect::<Vec<_>>();
263 let mut local = fuchsia_async::Socket::from_socket(local);
264 let test_logs_clone = test_logs.clone();
265 let _write_task = fuchsia_async::Task::local(async move {
266 for log in test_logs {
267 let serialized_log = serde_json::to_string(&log).unwrap();
268 let serialized_bytes = serialized_log.as_bytes();
269 local.write_all(serialized_bytes).await.unwrap();
270 }
271 });
272 for item in test_logs_clone.iter().take(MSG_COUNT) {
273 assert_eq!(&decoder.next().await.unwrap(), item);
274 }
275 }
276
277 #[fuchsia::test]
278 async fn test_json_decoder_large_single_message() {
279 const CHAR_COUNT: usize = 1000 * 1000;
281 let (local, remote) = zx::Socket::create_stream();
282 let socket = fuchsia_async::Socket::from_socket(remote);
283 let mut decoder = Box::pin(
284 stream_raw_json::<LogsData, 256000, 20000>(socket)
285 .map(futures_util::stream::iter)
286 .flatten(),
287 );
288 let test_log = LogsDataBuilder::new(BuilderArgs {
289 component_url: None,
290 moniker: "ffx".try_into().unwrap(),
291 severity: Severity::Info,
292 timestamp: Timestamp::from_nanos(BOOT_TS),
293 })
294 .set_message(format!("Hello world! {}", "h".repeat(CHAR_COUNT)))
295 .add_tag("Some tag")
296 .build();
297 let mut local = fuchsia_async::Socket::from_socket(local);
298 let test_log_clone = test_log.clone();
299 let _write_task = fuchsia_async::Task::local(async move {
300 let serialized_log = serde_json::to_string(&test_log).unwrap();
301 let serialized_bytes = serialized_log.as_bytes();
302 local.write_all(serialized_bytes).await.unwrap();
303 });
304 assert_eq!(&decoder.next().await.unwrap(), &test_log_clone);
305 }
306
307 #[fuchsia::test]
308 async fn test_json_decoder_truncated_message() {
309 let (local, remote) = zx::Socket::create_datagram();
313 let socket = fuchsia_async::Socket::from_socket(remote);
314 let mut decoder = LogsDataStream::new(socket);
315 let test_log = LogsDataBuilder::new(BuilderArgs {
316 component_url: None,
317 moniker: "ffx".try_into().unwrap(),
318 severity: Severity::Info,
319 timestamp: Timestamp::from_nanos(BOOT_TS),
320 })
321 .set_message("Hello world!")
322 .add_tag("Some tag")
323 .build();
324 let serialized_log = serde_json::to_string(&test_log).unwrap();
325 let serialized_bytes = serialized_log.as_bytes();
326 let part_a = &serialized_bytes[..15];
327 let part_b = &serialized_bytes[15..20];
328 local.write(part_a).unwrap();
329 local.write(part_b).unwrap();
330 drop(local);
331 assert_matches!(decoder.next().await, None);
332 }
333}