use crate::diagnostics::BatchIteratorConnectionStats;
use crate::error::AccessorError;
use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
use crate::logs::shared_buffer::FxtMessage;
use fidl_fuchsia_diagnostics::{
    DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
};

use futures::{Stream, StreamExt};
use log::warn;
use pin_project::pin_project;
use serde::Serialize;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};

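/// A pinned, boxed stream that yields batches of formatted content, where each element of a
/// batch is either a ready-to-send `FormattedContent` or the `AccessorError` that replaced it.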
pub type FormattedStream =
    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;

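/// Wraps a stream of chunks, converting each chunk into a batch of `FormattedContent` results
/// and recording any per-item errors on the `BatchIteratorConnectionStats` for the connection.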
#[pin_project]
pub struct FormattedContentBatcher<C> {
    #[pin]
    items: C,
    stats: Arc<BatchIteratorConnectionStats>,
}

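/// Builds a `FormattedStream` over `items`, batching up to `MAXIMUM_ENTRIES_PER_BATCH` results
/// per batch. `Subscribe` and `SnapshotThenSubscribe` modes use `ready_chunks`, so a batch is
/// emitted as soon as any items are available; `Snapshot` mode uses `chunks`, so every batch
/// except the last is filled completely before it is emitted.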
pub fn new_batcher<I, T, E>(
    items: I,
    stats: Arc<BatchIteratorConnectionStats>,
    mode: StreamMode,
) -> FormattedStream
where
    I: Stream<Item = Result<T, E>> + Send + 'static,
    T: Into<FormattedContent> + Send,
    E: Into<AccessorError> + Send,
{
    match mode {
        StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
            Box::pin(FormattedContentBatcher {
                items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
                stats,
            })
        }
        StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
            items: items.chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
            stats,
        }),
    }
}

impl<I, T, E> Stream for FormattedContentBatcher<I>
where
    I: Stream<Item = Vec<Result<T, E>>>,
    T: Into<FormattedContent>,
    E: Into<AccessorError>,
{
    type Item = Vec<Result<FormattedContent, AccessorError>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.items.poll_next(cx) {
            Poll::Ready(Some(chunk)) => {
                let mut batch = vec![];
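                // Convert each item in the chunk, recording any error on the connection stats.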
                for item in chunk {
                    let result = match item {
                        Ok(i) => Ok(i.into()),
                        Err(e) => {
                            this.stats.add_result_error();
                            Err(e.into())
                        }
                    };
                    batch.push(result);
                }
                Poll::Ready(Some(batch))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

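/// The result of serializing a diagnostics response: a VMO holding the encoded bytes, the
/// number of bytes written, and the `Format` they were encoded in.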
pub struct SerializedVmo {
    pub vmo: zx::Vmo,
    pub size: u64,
    format: Format,
}

impl SerializedVmo {
    pub fn serialize(
        source: &impl Serialize,
        data_type: DataType,
        format: Format,
    ) -> Result<Self, AccessorError> {
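        // Start with a capacity suited to the data type; the buffer grows if the payload
        // turns out to be larger.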
        let initial_buffer_capacity = match data_type {
            DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
            DataType::Logs => 4096,
        };
        let mut buffer = Vec::with_capacity(initial_buffer_capacity);
        match format {
            Format::Json => {
                serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
            }
            Format::Cbor => ciborium::into_writer(source, &mut buffer)
                .map_err(|err| AccessorError::CborSerialization(err.into()))?,
            Format::Text => unreachable!("We'll never get Text"),
            Format::Fxt => unreachable!("We'll never get FXT"),
        }
        let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
        vmo.write(&buffer, 0).unwrap();
        Ok(Self { vmo, size: buffer.len() as u64, format })
    }
}

impl From<SerializedVmo> for FormattedContent {
    fn from(content: SerializedVmo) -> FormattedContent {
        match content.format {
            Format::Json => {
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Json(fidl_fuchsia_mem::Buffer {
                    vmo: content.vmo,
                    size: content.size,
                })
            }
            Format::Cbor => {
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Cbor(content.vmo)
            }
            Format::Fxt => {
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Fxt(content.vmo)
            }
            Format::Text => unreachable!("We'll never get Text"),
        }
    }
}

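/// Describes how one wire format lays items out inside a packet: the FIDL `Format` it reports,
/// optional header/footer bytes wrapped around every packet, and how the next item is written.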
trait PacketFormat {
    const FORMAT: Format;
    const HEADER: &[u8] = &[];
    const FOOTER: &[u8] = &[];

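    /// Polls the underlying stream for the next item and, when one is ready, appends it to
    /// `buffer` (preceded by a separator when `first` is false). Returns `Ready(Some(n))` where
    /// `n` is the number of separator bytes written, `Ready(None)` once the stream is exhausted,
    /// and `Pending` when no item is available yet.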
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>>;
}

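/// Adapts a `PacketFormat` into a stream of `SerializedVmo` packets no larger than
/// `max_packet_size` bytes, carrying an item that did not fit over to the next packet via
/// `overflow`.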
#[pin_project]
pub struct PacketSerializer<T> {
    stats: Option<Arc<BatchIteratorConnectionStats>>,
    max_packet_size: u64,
    #[pin]
    format: T,
    overflow: Vec<u8>,
    finished: bool,
}

impl<T> PacketSerializer<T> {
    fn with_format(
        stats: Option<Arc<BatchIteratorConnectionStats>>,
        max_packet_size: u64,
        format: T,
    ) -> Self {
        Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
    }
}

impl<T: PacketFormat> Stream for PacketSerializer<T> {
    type Item = Result<SerializedVmo, AccessorError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }

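        // Cap the packet size at 1 MiB so the VMO allocated below stays bounded, no matter how
        // large `max_packet_size` was configured.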
        const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20;
        let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
        let mut this = self.project();

        let mut buffer = Vec::with_capacity(256 * 1024);
        buffer.extend_from_slice(T::HEADER);

        let mut first = true;

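        // If the previous packet could not hold its last item, that item was stashed in
        // `overflow`; it becomes the first entry of this packet and is counted now.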
        if !this.overflow.is_empty() {
            buffer.append(this.overflow);
            first = false;
            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        let mut vmo = None;
        let mut vmo_len = 0;

        loop {
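            // Keep the scratch buffer small: once less than 512 bytes of capacity remain, spill
            // what has accumulated into the packet VMO (created lazily at the capped size) and
            // start filling the buffer again.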
            if buffer.capacity() - buffer.len() < 512 {
                vmo.get_or_insert_with(|| zx::Vmo::create(max_packet_size).unwrap())
                    .write(&buffer, vmo_len as u64)
                    .unwrap();
                vmo_len += buffer.len();
                buffer.clear();
            }

            let last_len = buffer.len();

            let separator_len = match this.format.as_mut().write_item(cx, first, &mut buffer) {
                Poll::Ready(Some(separator_len)) => separator_len,
                Poll::Ready(None) => {
                    *this.finished = true;
                    if first {
                        return Poll::Ready(None);
                    } else {
                        break;
                    }
                }
                Poll::Pending => {
                    if first {
                        return Poll::Pending;
                    } else {
                        break;
                    }
                }
            };

            let item_len = buffer.len() - last_len - separator_len;

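            // An item that cannot fit in any packet, even alone with just the header and footer,
            // is logged and dropped so the stream can make progress.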
            if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
                warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
                buffer.truncate(last_len);
            } else {
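                // The item fits in a packet, just not in this one: move its bytes (without the
                // separator) into `overflow` for the next packet and close out this packet.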
                if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
                    assert!(!first);
                    this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
                    buffer.truncate(last_len);
                    break;
                }

                first = false;

                if let Some(stats) = &this.stats {
                    stats.add_result();
                }
            }
        }

        buffer.extend_from_slice(T::FOOTER);

        let vmo = match vmo {
            Some(vmo) => {
                vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
                vmo
            }
            None => zx::Vmo::create(buffer.len() as u64).unwrap(),
        };
        vmo.write(&buffer, vmo_len as u64).unwrap();
        vmo_len += buffer.len();
        Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
    }
}

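/// `PacketFormat` for FXT log records: each record's bytes are appended verbatim and then
/// extended with component metadata (moniker, component URL, rolled-out count) via
/// `extend_fxt_record`. FXT packets use no header, footer, or separators.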
#[pin_project]
pub struct FxtPacketFormat<I>(#[pin] I);

impl<I: Stream<Item = FxtMessage>> PacketFormat for FxtPacketFormat<I> {
    const FORMAT: Format = Format::Fxt;

    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        _first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>> {
        if let Some(item) = ready!(self.project().0.poll_next(cx)) {
            buffer.extend_from_slice(item.data());
            extend_fxt_record(
                item.component_identity(),
                item.dropped(),
                &ExtendRecordOpts { component_url: true, moniker: true, rolled_out: true },
                buffer,
            );
            Poll::Ready(Some(0))
        } else {
            Poll::Ready(None)
        }
    }
}

pub type FxtPacketSerializer<I> = PacketSerializer<FxtPacketFormat<I>>;

impl<I> FxtPacketSerializer<I> {
    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
        Self::with_format(Some(stats), max_packet_size, FxtPacketFormat(items))
    }
}

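/// `PacketFormat` that renders items as a JSON array: every packet opens with `[`, closes with
/// `]`, and consecutive items are separated by `,\n`.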
#[pin_project]
pub struct JsonPacketFormat<I>(#[pin] I);

impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
    const FORMAT: Format = Format::Json;
    const HEADER: &[u8] = b"[";
    const FOOTER: &[u8] = b"]";

    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>> {
        const SEPARATOR: &[u8] = b",\n";

        if let Some(item) = ready!(self.project().0.poll_next(cx)) {
            let separator_len = if !first {
                buffer.extend_from_slice(SEPARATOR);
                SEPARATOR.len()
            } else {
                0
            };
            serde_json::to_writer(buffer, &item).expect("failed to serialize item");
            Poll::Ready(Some(separator_len))
        } else {
            Poll::Ready(None)
        }
    }
}

pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;

impl<I> JsonPacketSerializer<I> {
    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
        Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
    }

    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
        Self::with_format(None, max_packet_size, JsonPacketFormat(items))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use futures::stream::iter;

    #[fuchsia::test]
    async fn two_items_joined_and_split() {
        let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len() as u64;

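        // Serializes `inputs` into packets no larger than `max` bytes and returns each packet's
        // contents decoded as a UTF-8 string.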
        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let actual_joined = make_packets(smallest_possible_joined_len).await;
        assert_eq!(&actual_joined[..], joined);

        let actual_split = make_packets(smallest_possible_joined_len - 1).await;
        assert_eq!(&actual_split[..], split);
    }

    #[fuchsia::test]
    async fn overflow_separator_added() {
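        // With max = 8, every item lands in its own packet: `["A"]` alone is 5 bytes, and adding
        // `,\n"B"` plus the footer would exceed the limit. With max = 10, "A" and "B" share a
        // packet joined by the `,\n` separator while "C" overflows into a second packet; the
        // separator written before an overflowed item must not leak into either packet.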
        let inputs = &[&"A", &"B", &"C"];
        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let packets = make_packets(8).await;
        assert_eq!(packets.len(), 3);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["B"]"#);
        assert_eq!(packets[2], r#"["C"]"#);

        let packets = make_packets(10).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], "[\"A\",\n\"B\"]");
        assert_eq!(packets[1], r#"["C"]"#);
    }

    #[fuchsia::test]
    async fn oversize_item_not_dropped_incorrectly() {
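        // "BCDEF" serializes to 7 bytes, so a packet holding only it is 9 bytes and fits within
        // the 11-byte limit. It should therefore overflow into its own packet rather than be
        // mistaken for an oversize item and dropped.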
        let inputs = &[&"A", &"BCDEF"];
        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let packets = make_packets(11).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["BCDEF"]"#);
    }

    #[fuchsia::test]
    async fn item_too_big_for_packet_is_dropped() {
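        // "ABCDE" serializes to 7 bytes; with the `[` header and `]` footer that is 9 bytes,
        // which can never fit in an 8-byte packet, so the item is dropped and no packet at all
        // is produced.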
        let inputs = &[&"ABCDE"];
        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let packets = make_packets(8).await;
        assert_eq!(packets.len(), 0);
    }
}