1use crate::LogError;
6use async_stream::stream;
7use diagnostics_data::LogsData;
8use futures_util::{AsyncReadExt, Stream, StreamExt};
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use std::pin::Pin;
12use thiserror::Error;
13
14const READ_BUFFER_SIZE: usize = 1000 * 1000 * 2;
18
19const READ_BUFFER_INCREMENT: usize = 1000 * 256;
22
23fn stream_raw_json<T, const BUFFER_SIZE: usize, const INC: usize>(
24 mut socket: flex_client::AsyncSocket,
25) -> impl Stream<Item = Result<OneOrMany<T>, JsonDeserializeError>>
26where
27 T: DeserializeOwned,
28{
29 stream! {
30 let mut buffer = vec![0; BUFFER_SIZE];
31 let mut write_offset = 0;
32 let mut read_offset = 0;
33 let mut available = 0;
34 loop {
35 debug_assert!(write_offset <= buffer.len());
37 if write_offset == buffer.len() {
38 buffer.resize(buffer.len() + INC, 0);
39 }
40 let read_res = socket.read(&mut buffer[write_offset..]).await;
41 let Ok(socket_bytes_read) = read_res else {
42 log::warn!("socket.read({write_offset}..) failed: {read_res:?}");
43 break;
44 };
45 if socket_bytes_read == 0 {
46 break;
47 }
48 write_offset += socket_bytes_read;
49 available += socket_bytes_read;
50 let mut des = serde_json::Deserializer::from_slice(&buffer[read_offset..available])
51 .into_iter();
52 let mut read_nothing = true;
53 loop {
54 match des.next() {
55 Some(Ok(item)) => {
56 read_nothing = false;
57 yield Ok(item);
58 }
59 Some(Err(e)) => {
60 if e.is_eof() {
61 break;
62 }
63 read_nothing = false;
64 yield Err(JsonDeserializeError::Other { error: e.into() });
65 break;
66 }
67 None => break,
68 }
69 }
70
71 if read_nothing {
74 continue;
75 }
76 let byte_offset = des.byte_offset();
77 if byte_offset+read_offset == available {
78 available = 0;
79 write_offset = 0;
80 read_offset = 0;
81 buffer.resize(READ_BUFFER_SIZE, 0);
82 } else {
83 read_offset += byte_offset;
84 }
85 }
86 }
87}
88
89fn stream_json<T>(
91 socket: flex_client::AsyncSocket,
92) -> impl Stream<Item = Result<T, JsonDeserializeError>>
93where
94 T: DeserializeOwned,
95{
96 stream_raw_json::<T, READ_BUFFER_SIZE, READ_BUFFER_INCREMENT>(socket)
97 .map(|item| {
98 let items = match item {
99 Ok(OneOrMany::One(item)) => vec![Ok(item)],
100 Ok(OneOrMany::Many(items)) => items.into_iter().map(Ok).collect(),
101 Err(e) => vec![Err(e)],
102 };
103 futures_util::stream::iter(items)
104 })
105 .flatten()
106}
107
108pub struct LogsDataStream {
110 inner: Pin<Box<dyn Stream<Item = Result<LogsData, JsonDeserializeError>> + Send>>,
111}
112
113impl LogsDataStream {
114 pub fn new(socket: flex_client::AsyncSocket) -> Self {
116 Self { inner: Box::pin(stream_json(socket)) }
117 }
118}
119
120impl Stream for LogsDataStream {
121 type Item = Result<LogsData, JsonDeserializeError>;
122
123 fn poll_next(
124 mut self: std::pin::Pin<&mut Self>,
125 cx: &mut std::task::Context<'_>,
126 ) -> std::task::Poll<Option<Self::Item>> {
127 self.inner.poll_next_unpin(cx)
128 }
129}
130
131#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
133#[serde(untagged)]
134pub enum OneOrMany<T> {
135 One(T),
136 Many(Vec<T>),
137}
138
139pub enum OneOrManyIterator<T> {
140 One(Option<T>),
141 Many(std::vec::IntoIter<T>),
142}
143
144impl<T> Iterator for OneOrManyIterator<T> {
145 type Item = T;
146
147 fn next(&mut self) -> Option<Self::Item> {
148 match self {
149 OneOrManyIterator::One(v) => v.take(),
150 OneOrManyIterator::Many(v) => v.next(),
151 }
152 }
153}
154
155impl<T> IntoIterator for OneOrMany<T> {
156 type Item = T;
157 type IntoIter = OneOrManyIterator<T>;
158
159 fn into_iter(self) -> Self::IntoIter {
160 match self {
161 OneOrMany::One(v) => OneOrManyIterator::One(Some(v)),
162 OneOrMany::Many(v) => OneOrManyIterator::Many(v.into_iter()),
163 }
164 }
165}
166
167impl<'a, T> IntoIterator for &'a OneOrMany<T> {
168 type Item = &'a T;
169 type IntoIter = std::slice::Iter<'a, T>;
170
171 fn into_iter(self) -> Self::IntoIter {
172 match self {
173 OneOrMany::One(v) => std::slice::from_ref(v).iter(),
174 OneOrMany::Many(v) => v.iter(),
175 }
176 }
177}
178
179#[derive(Error, Debug)]
181pub enum JsonDeserializeError {
182 #[error(transparent)]
184 Other {
185 #[from]
186 error: anyhow::Error,
187 },
188 #[error("IO error {}", error)]
190 IO {
191 #[from]
192 error: std::io::Error,
193 },
194 #[error(transparent)]
196 LogError(#[from] LogError),
197 #[error("No more data")]
199 NoMoreData,
200}
201
202impl JsonDeserializeError {
203 pub fn is_broken_pipe(&self) -> bool {
204 match self {
205 JsonDeserializeError::IO { error } => error.kind() == std::io::ErrorKind::BrokenPipe,
206 JsonDeserializeError::LogError(log_error) => log_error.is_broken_pipe(),
207
208 JsonDeserializeError::Other { .. } | JsonDeserializeError::NoMoreData => false,
209 }
210 }
211}
212
213#[cfg(test)]
214mod test {
215 use super::*;
216 use assert_matches::assert_matches;
217 use diagnostics_data::{BuilderArgs, LogsDataBuilder, Severity, Timestamp};
218 use futures_util::AsyncWriteExt;
219
220 #[fuchsia::test]
221 fn test_one_or_many() {
222 let one: OneOrMany<u32> = serde_json::from_str("1").unwrap();
223 assert_eq!(one, OneOrMany::One(1));
224 let many: OneOrMany<u32> = serde_json::from_str("[1,2,3]").unwrap();
225 assert_eq!(many, OneOrMany::Many(vec![1, 2, 3]));
226 }
227
228 const BOOT_TS: i64 = 98765432000000000;
229
230 #[fuchsia::test]
231 async fn test_json_decoder() {
232 let (local, remote) = zx::Socket::create_datagram();
236 let socket = flex_client::socket_to_async(remote);
237 let mut decoder = LogsDataStream::new(socket);
238 let test_log = LogsDataBuilder::new(BuilderArgs {
239 component_url: None,
240 moniker: "ffx".try_into().unwrap(),
241 severity: Severity::Info,
242 timestamp: Timestamp::from_nanos(BOOT_TS),
243 })
244 .set_message("Hello world!")
245 .add_tag("Some tag")
246 .build();
247 let serialized_log = serde_json::to_string(&test_log).unwrap();
248 let serialized_bytes = serialized_log.as_bytes();
249 let part_a = &serialized_bytes[..15];
250 let part_b = &serialized_bytes[15..20];
251 let part_c = &serialized_bytes[20..];
252 local.write(part_a).unwrap();
253 local.write(part_b).unwrap();
254 local.write(part_c).unwrap();
255 assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log);
256 }
257
258 #[fuchsia::test]
259 async fn test_json_decoder_regular_message() {
260 let (local, remote) = zx::Socket::create_datagram();
263 let socket = flex_client::socket_to_async(remote);
264 let mut decoder = LogsDataStream::new(socket);
265 let test_log = LogsDataBuilder::new(BuilderArgs {
266 component_url: None,
267 moniker: "ffx".try_into().unwrap(),
268 severity: Severity::Info,
269 timestamp: Timestamp::from_nanos(BOOT_TS),
270 })
271 .set_message("Hello world!")
272 .add_tag("Some tag")
273 .build();
274 let serialized_log = serde_json::to_string(&test_log).unwrap();
275 let serialized_bytes = serialized_log.as_bytes();
276 local.write(serialized_bytes).unwrap();
277 assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log);
278 }
279
280 #[fuchsia::test]
281 async fn test_json_decoder_large_message() {
282 const MSG_COUNT: usize = 100;
283 let (local, remote) = zx::Socket::create_stream();
284 let socket = flex_client::socket_to_async(remote);
285 let mut decoder = Box::pin(stream_json::<LogsData>(socket));
286 let test_logs = (0..MSG_COUNT)
287 .map(|value| {
288 LogsDataBuilder::new(BuilderArgs {
289 component_url: None,
290 moniker: "ffx".try_into().unwrap(),
291 severity: Severity::Info,
292 timestamp: Timestamp::from_nanos(BOOT_TS),
293 })
294 .set_message(format!("Hello world! {value}"))
295 .add_tag("Some tag")
296 .build()
297 })
298 .collect::<Vec<_>>();
299 let mut local = flex_client::socket_to_async(local);
300 let test_logs_clone = test_logs.clone();
301 let _write_task = fuchsia_async::Task::local(async move {
302 for log in test_logs {
303 let serialized_log = serde_json::to_string(&log).unwrap();
304 let serialized_bytes = serialized_log.as_bytes();
305 local.write_all(serialized_bytes).await.unwrap();
306 }
307 });
308 for item in test_logs_clone.iter().take(MSG_COUNT) {
309 assert_eq!(&decoder.next().await.unwrap().unwrap(), item);
310 }
311 }
312
313 #[fuchsia::test]
314 async fn test_json_decoder_large_single_message() {
315 const CHAR_COUNT: usize = 1000 * 1000;
317 let (local, remote) = zx::Socket::create_stream();
318 let socket = flex_client::socket_to_async(remote);
319 let mut decoder = Box::pin(stream_json::<LogsData>(socket));
320 let test_log = LogsDataBuilder::new(BuilderArgs {
321 component_url: None,
322 moniker: "ffx".try_into().unwrap(),
323 severity: Severity::Info,
324 timestamp: Timestamp::from_nanos(BOOT_TS),
325 })
326 .set_message(format!("Hello world! {}", "h".repeat(CHAR_COUNT)))
327 .add_tag("Some tag")
328 .build();
329 let mut local = flex_client::socket_to_async(local);
330 let test_log_clone = test_log.clone();
331 let _write_task = fuchsia_async::Task::local(async move {
332 let serialized_log = serde_json::to_string(&test_log).unwrap();
333 let serialized_bytes = serialized_log.as_bytes();
334 local.write_all(serialized_bytes).await.unwrap();
335 });
336 assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log_clone);
337 }
338
339 #[fuchsia::test]
340 async fn test_json_decoder_truncated_message() {
341 let (local, remote) = zx::Socket::create_datagram();
345 let socket = flex_client::socket_to_async(remote);
346 let mut decoder = LogsDataStream::new(socket);
347 let test_log = LogsDataBuilder::new(BuilderArgs {
348 component_url: None,
349 moniker: "ffx".try_into().unwrap(),
350 severity: Severity::Info,
351 timestamp: Timestamp::from_nanos(BOOT_TS),
352 })
353 .set_message("Hello world!")
354 .add_tag("Some tag")
355 .build();
356 let serialized_log = serde_json::to_string(&test_log).unwrap();
357 let serialized_bytes = serialized_log.as_bytes();
358 let part_a = &serialized_bytes[..15];
359 let part_b = &serialized_bytes[15..20];
360 local.write(part_a).unwrap();
361 local.write(part_b).unwrap();
362 drop(local);
363 assert_matches!(decoder.next().await, None);
364 }
365 #[fuchsia::test]
366 async fn test_json_decoder_invalid_json() {
367 let (local, remote) = zx::Socket::create_stream();
368 let socket = flex_client::socket_to_async(remote);
369 let mut decoder = LogsDataStream::new(socket);
370
371 let mut local = flex_client::socket_to_async(local);
372
373 local.write_all(b"invalid json").await.unwrap();
375
376 let test_log = LogsDataBuilder::new(BuilderArgs {
378 component_url: None,
379 moniker: "ffx".try_into().unwrap(),
380 severity: Severity::Info,
381 timestamp: Timestamp::from_nanos(BOOT_TS),
382 })
383 .set_message("Recovery log")
384 .build();
385 let serialized_log = serde_json::to_string(&test_log).unwrap();
386 local.write_all(serialized_log.as_bytes()).await.unwrap();
387
388 drop(local);
391
392 let result = decoder.next().await;
396 assert!(result.unwrap().is_err());
398
399 let result2 = decoder.next().await;
408 match result2 {
416 Some(Ok(log)) => assert_eq!(log, test_log),
417 Some(Err(_)) => {
418 let result3 = decoder.next().await;
421 if let Some(Ok(log)) = result3 {
422 assert_eq!(log, test_log);
423 }
424 }
425 None => {} }
427 }
428
429 #[test]
430 fn test_json_deserialize_error_is_broken_pipe() {
431 assert!(
432 JsonDeserializeError::IO {
433 error: std::io::Error::new(std::io::ErrorKind::BrokenPipe, "broken pipe")
434 }
435 .is_broken_pipe()
436 );
437 assert!(
438 !JsonDeserializeError::IO { error: std::io::Error::other("other") }.is_broken_pipe()
439 );
440 assert!(!JsonDeserializeError::NoMoreData.is_broken_pipe());
441 }
442}