1use crate::LogError;
6use async_stream::stream;
7use diagnostics_data::LogsData;
8use futures_util::{AsyncReadExt, Stream, StreamExt};
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use std::pin::Pin;
12use thiserror::Error;
13
/// Initial size, in bytes, of the buffer that `stream_json` uses to read JSON
/// from the socket (2 * 1000 * 1000 bytes).
const READ_BUFFER_SIZE: usize = 1000 * 1000 * 2;
18
/// Number of bytes (256 * 1000) that `stream_json` passes as the growth step
/// for the read buffer when the buffer is full.
const READ_BUFFER_INCREMENT: usize = 1000 * 256;
22
23fn stream_raw_json<T, const BUFFER_SIZE: usize, const INC: usize>(
24 mut socket: flex_client::AsyncSocket,
25) -> impl Stream<Item = Result<OneOrMany<T>, JsonDeserializeError>>
26where
27 T: DeserializeOwned,
28{
29 stream! {
30 let mut buffer = vec![0; BUFFER_SIZE];
31 let mut write_offset = 0;
32 let mut read_offset = 0;
33 let mut available = 0;
34 loop {
35 debug_assert!(write_offset <= buffer.len());
37 if write_offset == buffer.len() {
38 buffer.resize(buffer.len() + INC, 0);
39 }
40 let read_res = socket.read(&mut buffer[write_offset..]).await;
41 let Ok(socket_bytes_read) = read_res else {
42 log::warn!("socket.read({write_offset}..) failed: {read_res:?}");
43 break;
44 };
45 if socket_bytes_read == 0 {
46 break;
47 }
48 write_offset += socket_bytes_read;
49 available += socket_bytes_read;
50 let mut des = serde_json::Deserializer::from_slice(&buffer[read_offset..available])
51 .into_iter();
52 let mut read_nothing = true;
53 loop {
54 match des.next() {
55 Some(Ok(item)) => {
56 read_nothing = false;
57 yield Ok(item);
58 }
59 Some(Err(e)) => {
60 if e.is_eof() {
61 break;
62 }
63 read_nothing = false;
64 yield Err(JsonDeserializeError::Other { error: e.into() });
65 break;
66 }
67 None => break,
68 }
69 }
70
71 if read_nothing {
74 continue;
75 }
76 let byte_offset = des.byte_offset();
77 if byte_offset+read_offset == available {
78 available = 0;
79 write_offset = 0;
80 read_offset = 0;
81 buffer.resize(READ_BUFFER_SIZE, 0);
82 } else {
83 read_offset += byte_offset;
84 }
85 }
86 }
87}
88
89fn stream_json<T>(
91 socket: flex_client::AsyncSocket,
92) -> impl Stream<Item = Result<T, JsonDeserializeError>>
93where
94 T: DeserializeOwned,
95{
96 stream_raw_json::<T, READ_BUFFER_SIZE, READ_BUFFER_INCREMENT>(socket)
97 .map(|item| {
98 let items = match item {
99 Ok(OneOrMany::One(item)) => vec![Ok(item)],
100 Ok(OneOrMany::Many(items)) => items.into_iter().map(Ok).collect(),
101 Err(e) => vec![Err(e)],
102 };
103 futures_util::stream::iter(items)
104 })
105 .flatten()
106}
107
/// Stream of [`LogsData`] items decoded from JSON read off a socket.
pub struct LogsDataStream {
    /// Boxed, pinned decoding stream that produces the items.
    inner: Pin<Box<dyn Stream<Item = Result<LogsData, JsonDeserializeError>> + Send>>,
}
112
113impl LogsDataStream {
114 pub fn new(socket: flex_client::AsyncSocket) -> Self {
116 Self { inner: Box::pin(stream_json(socket)) }
117 }
118}
119
120impl Stream for LogsDataStream {
121 type Item = Result<LogsData, JsonDeserializeError>;
122
123 fn poll_next(
124 mut self: std::pin::Pin<&mut Self>,
125 cx: &mut std::task::Context<'_>,
126 ) -> std::task::Poll<Option<Self::Item>> {
127 self.inner.poll_next_unpin(cx)
128 }
129}
130
/// Either a single value or a list of values.
///
/// The enum is `#[serde(untagged)]`, so no variant name appears in the
/// serialized form: for `OneOrMany<u32>`, the JSON `1` deserializes to `One(1)`
/// and `[1,2,3]` deserializes to `Many(vec![1, 2, 3])`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(untagged)]
pub enum OneOrMany<T> {
    /// A single value.
    One(T),
    /// A list of values.
    Many(Vec<T>),
}
138
/// Owning iterator over the contents of a one-or-many value.
#[derive(Debug, Clone)]
pub enum OneOrManyIterator<T> {
    /// Iterates over at most one value; `None` once the value has been yielded.
    One(Option<T>),
    /// Iterates over the elements of a vector.
    Many(std::vec::IntoIter<T>),
}

impl<T> Iterator for OneOrManyIterator<T> {
    type Item = T;

    /// Returns the next value, or `None` when the iterator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            OneOrManyIterator::One(v) => v.take(),
            OneOrManyIterator::Many(v) => v.next(),
        }
    }

    /// Returns the exact number of remaining items as both bounds, so that
    /// `collect`/`extend` can allocate the right capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = match self {
            OneOrManyIterator::One(v) => usize::from(v.is_some()),
            OneOrManyIterator::Many(v) => v.len(),
        };
        (remaining, Some(remaining))
    }
}

// `size_hint` above is exact, and both variants keep returning `None` once
// they are exhausted.
impl<T> ExactSizeIterator for OneOrManyIterator<T> {}
impl<T> std::iter::FusedIterator for OneOrManyIterator<T> {}
154
155impl<T> IntoIterator for OneOrMany<T> {
156 type Item = T;
157 type IntoIter = OneOrManyIterator<T>;
158
159 fn into_iter(self) -> Self::IntoIter {
160 match self {
161 OneOrMany::One(v) => OneOrManyIterator::One(Some(v)),
162 OneOrMany::Many(v) => OneOrManyIterator::Many(v.into_iter()),
163 }
164 }
165}
166
167impl<'a, T> IntoIterator for &'a OneOrMany<T> {
168 type Item = &'a T;
169 type IntoIter = std::slice::Iter<'a, T>;
170
171 fn into_iter(self) -> Self::IntoIter {
172 match self {
173 OneOrMany::One(v) => std::slice::from_ref(v).iter(),
174 OneOrMany::Many(v) => v.iter(),
175 }
176 }
177}
178
/// Errors produced while decoding JSON log data.
#[derive(Error, Debug)]
pub enum JsonDeserializeError {
    /// Any other error, wrapped in an [`anyhow::Error`] and displayed
    /// transparently. The socket decoder in this file reports JSON parse
    /// failures through this variant.
    #[error(transparent)]
    Other {
        #[from]
        error: anyhow::Error,
    },
    /// An I/O error, displayed as `IO error <inner error>`.
    #[error("IO error {}", error)]
    IO {
        #[from]
        error: std::io::Error,
    },
    /// A [`LogError`] from this crate, displayed transparently.
    #[error(transparent)]
    LogError(#[from] LogError),
    /// Displayed as `No more data`.
    /// NOTE(review): not constructed in the code visible here — confirm which
    /// callers produce it.
    #[error("No more data")]
    NoMoreData,
}
201
202#[cfg(test)]
203mod test {
204 use super::*;
205 use assert_matches::assert_matches;
206 use diagnostics_data::{BuilderArgs, LogsDataBuilder, Severity, Timestamp};
207 use futures_util::AsyncWriteExt;
208
209 #[fuchsia::test]
210 fn test_one_or_many() {
211 let one: OneOrMany<u32> = serde_json::from_str("1").unwrap();
212 assert_eq!(one, OneOrMany::One(1));
213 let many: OneOrMany<u32> = serde_json::from_str("[1,2,3]").unwrap();
214 assert_eq!(many, OneOrMany::Many(vec![1, 2, 3]));
215 }
216
    /// Timestamp, in nanoseconds, given to every log message built by the
    /// tests in this module.
    const BOOT_TS: i64 = 98765432000000000;
218
219 #[fuchsia::test]
220 async fn test_json_decoder() {
221 let (local, remote) = zx::Socket::create_datagram();
225 let socket = flex_client::socket_to_async(remote);
226 let mut decoder = LogsDataStream::new(socket);
227 let test_log = LogsDataBuilder::new(BuilderArgs {
228 component_url: None,
229 moniker: "ffx".try_into().unwrap(),
230 severity: Severity::Info,
231 timestamp: Timestamp::from_nanos(BOOT_TS),
232 })
233 .set_message("Hello world!")
234 .add_tag("Some tag")
235 .build();
236 let serialized_log = serde_json::to_string(&test_log).unwrap();
237 let serialized_bytes = serialized_log.as_bytes();
238 let part_a = &serialized_bytes[..15];
239 let part_b = &serialized_bytes[15..20];
240 let part_c = &serialized_bytes[20..];
241 local.write(part_a).unwrap();
242 local.write(part_b).unwrap();
243 local.write(part_c).unwrap();
244 assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log);
245 }
246
247 #[fuchsia::test]
248 async fn test_json_decoder_regular_message() {
249 let (local, remote) = zx::Socket::create_datagram();
252 let socket = flex_client::socket_to_async(remote);
253 let mut decoder = LogsDataStream::new(socket);
254 let test_log = LogsDataBuilder::new(BuilderArgs {
255 component_url: None,
256 moniker: "ffx".try_into().unwrap(),
257 severity: Severity::Info,
258 timestamp: Timestamp::from_nanos(BOOT_TS),
259 })
260 .set_message("Hello world!")
261 .add_tag("Some tag")
262 .build();
263 let serialized_log = serde_json::to_string(&test_log).unwrap();
264 let serialized_bytes = serialized_log.as_bytes();
265 local.write(serialized_bytes).unwrap();
266 assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log);
267 }
268
269 #[fuchsia::test]
270 async fn test_json_decoder_large_message() {
271 const MSG_COUNT: usize = 100;
272 let (local, remote) = zx::Socket::create_stream();
273 let socket = flex_client::socket_to_async(remote);
274 let mut decoder = Box::pin(stream_json::<LogsData>(socket));
275 let test_logs = (0..MSG_COUNT)
276 .map(|value| {
277 LogsDataBuilder::new(BuilderArgs {
278 component_url: None,
279 moniker: "ffx".try_into().unwrap(),
280 severity: Severity::Info,
281 timestamp: Timestamp::from_nanos(BOOT_TS),
282 })
283 .set_message(format!("Hello world! {value}"))
284 .add_tag("Some tag")
285 .build()
286 })
287 .collect::<Vec<_>>();
288 let mut local = flex_client::socket_to_async(local);
289 let test_logs_clone = test_logs.clone();
290 let _write_task = fuchsia_async::Task::local(async move {
291 for log in test_logs {
292 let serialized_log = serde_json::to_string(&log).unwrap();
293 let serialized_bytes = serialized_log.as_bytes();
294 local.write_all(serialized_bytes).await.unwrap();
295 }
296 });
297 for item in test_logs_clone.iter().take(MSG_COUNT) {
298 assert_eq!(&decoder.next().await.unwrap().unwrap(), item);
299 }
300 }
301
302 #[fuchsia::test]
303 async fn test_json_decoder_large_single_message() {
304 const CHAR_COUNT: usize = 1000 * 1000;
306 let (local, remote) = zx::Socket::create_stream();
307 let socket = flex_client::socket_to_async(remote);
308 let mut decoder = Box::pin(stream_json::<LogsData>(socket));
309 let test_log = LogsDataBuilder::new(BuilderArgs {
310 component_url: None,
311 moniker: "ffx".try_into().unwrap(),
312 severity: Severity::Info,
313 timestamp: Timestamp::from_nanos(BOOT_TS),
314 })
315 .set_message(format!("Hello world! {}", "h".repeat(CHAR_COUNT)))
316 .add_tag("Some tag")
317 .build();
318 let mut local = flex_client::socket_to_async(local);
319 let test_log_clone = test_log.clone();
320 let _write_task = fuchsia_async::Task::local(async move {
321 let serialized_log = serde_json::to_string(&test_log).unwrap();
322 let serialized_bytes = serialized_log.as_bytes();
323 local.write_all(serialized_bytes).await.unwrap();
324 });
325 assert_eq!(&decoder.next().await.unwrap().unwrap(), &test_log_clone);
326 }
327
328 #[fuchsia::test]
329 async fn test_json_decoder_truncated_message() {
330 let (local, remote) = zx::Socket::create_datagram();
334 let socket = flex_client::socket_to_async(remote);
335 let mut decoder = LogsDataStream::new(socket);
336 let test_log = LogsDataBuilder::new(BuilderArgs {
337 component_url: None,
338 moniker: "ffx".try_into().unwrap(),
339 severity: Severity::Info,
340 timestamp: Timestamp::from_nanos(BOOT_TS),
341 })
342 .set_message("Hello world!")
343 .add_tag("Some tag")
344 .build();
345 let serialized_log = serde_json::to_string(&test_log).unwrap();
346 let serialized_bytes = serialized_log.as_bytes();
347 let part_a = &serialized_bytes[..15];
348 let part_b = &serialized_bytes[15..20];
349 local.write(part_a).unwrap();
350 local.write(part_b).unwrap();
351 drop(local);
352 assert_matches!(decoder.next().await, None);
353 }
354 #[fuchsia::test]
355 async fn test_json_decoder_invalid_json() {
356 let (local, remote) = zx::Socket::create_stream();
357 let socket = flex_client::socket_to_async(remote);
358 let mut decoder = LogsDataStream::new(socket);
359
360 let mut local = flex_client::socket_to_async(local);
361
362 local.write_all(b"invalid json").await.unwrap();
364
365 let test_log = LogsDataBuilder::new(BuilderArgs {
367 component_url: None,
368 moniker: "ffx".try_into().unwrap(),
369 severity: Severity::Info,
370 timestamp: Timestamp::from_nanos(BOOT_TS),
371 })
372 .set_message("Recovery log")
373 .build();
374 let serialized_log = serde_json::to_string(&test_log).unwrap();
375 local.write_all(serialized_log.as_bytes()).await.unwrap();
376
377 drop(local);
380
381 let result = decoder.next().await;
385 assert!(result.unwrap().is_err());
387
388 let result2 = decoder.next().await;
397 match result2 {
405 Some(Ok(log)) => assert_eq!(log, test_log),
406 Some(Err(_)) => {
407 let result3 = decoder.next().await;
410 if let Some(Ok(log)) = result3 {
411 assert_eq!(log, test_log);
412 }
413 }
414 None => {} }
416 }
417}