Skip to main content

archivist_lib/
formatter.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4use crate::diagnostics::BatchIteratorConnectionStats;
5use crate::error::AccessorError;
6use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
7use crate::logs::shared_buffer::FilterCursor;
8use diagnostics_log_encoding::Header;
9use fidl_fuchsia_diagnostics::{
10    DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
11};
12
13use fuchsia_async as fasync;
14use futures::{Stream, StreamExt};
15use log::warn;
16use pin_project::pin_project;
17use serde::Serialize;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll, ready};
21use zerocopy::{FromBytes, IntoBytes};
22use zx;
23
24static SERIALIZED_DATA_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-serialized-data");
25static PACKET_BUFFER_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-packet-buffer");
26
27const SNAPSHOT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15);
28
29pub type FormattedStream =
30    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;
31
32#[pin_project]
33pub struct FormattedContentBatcher<C> {
34    #[pin]
35    items: C,
36    stats: Arc<BatchIteratorConnectionStats>,
37}
38
39/// Make a new `FormattedContentBatcher` with a chunking strategy depending on stream mode.
40///
41/// In snapshot mode, batched items will not be flushed to the client until the batch is complete,
42/// the underlying stream has terminated, or `SNAPSHOT_TIMEOUT` has passed since the first
43/// item was discovered (in order to prevent client starvation).
44///
45/// In subscribe or snapshot-then-subscribe mode, batched items will be flushed whenever the
46/// underlying stream is pending, ensuring clients always receive latest results.
47pub fn new_batcher<I, T, E>(
48    items: I,
49    stats: Arc<BatchIteratorConnectionStats>,
50    mode: StreamMode,
51) -> FormattedStream
52where
53    I: Stream<Item = Result<T, E>> + Send + 'static,
54    T: Into<FormattedContent> + Send,
55    E: Into<AccessorError> + Send,
56{
57    match mode {
58        StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
59            Box::pin(FormattedContentBatcher {
60                items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
61                stats,
62            })
63        }
64        StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
65            items: items.time_limited_chunks(MAXIMUM_ENTRIES_PER_BATCH as usize, SNAPSHOT_TIMEOUT),
66            stats,
67        }),
68    }
69}
70
71impl<I, T, E> Stream for FormattedContentBatcher<I>
72where
73    I: Stream<Item = Vec<Result<T, E>>>,
74    T: Into<FormattedContent>,
75    E: Into<AccessorError>,
76{
77    type Item = Vec<Result<FormattedContent, AccessorError>>;
78
79    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80        let this = self.project();
81        match this.items.poll_next(cx) {
82            Poll::Ready(Some(chunk)) => {
83                // loop over chunk instead of into_iter/map because we can't move `this`
84                let mut batch = vec![];
85                for item in chunk {
86                    let result = match item {
87                        Ok(i) => Ok(i.into()),
88                        Err(e) => {
89                            this.stats.add_result_error();
90                            Err(e.into())
91                        }
92                    };
93                    batch.push(result);
94                }
95                Poll::Ready(Some(batch))
96            }
97            Poll::Ready(None) => Poll::Ready(None),
98            Poll::Pending => Poll::Pending,
99        }
100    }
101}
102
/// Tracks whether the flush timer of `TimeLimitedChunks` applies to the chunk being built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerState {
    /// No deadline is being tracked; the timer is not polled in this state.
    Idle,
    /// The timer was reset when the first item of the current chunk arrived and that chunk
    /// has not been yielded yet.
    Running,
}
108
109pub trait TimeLimitedChunksExt: Stream {
110    fn time_limited_chunks(
111        self,
112        capacity: usize,
113        timeout: std::time::Duration,
114    ) -> TimeLimitedChunks<Self>
115    where
116        Self: Sized,
117    {
118        TimeLimitedChunks::new(self, capacity, timeout)
119    }
120}
121
122impl<T: Stream> TimeLimitedChunksExt for T {}
123
124/// A stream adapter that yields chunks of items from the underlying stream.
125///
126/// Chunks are yielded when they reach the specified capacity or when the specified timeout
127/// expires since the first item in the chunk was received.
128#[pin_project]
129pub struct TimeLimitedChunks<S: Stream> {
130    #[pin]
131    stream: S,
132    capacity: usize,
133    timeout: std::time::Duration,
134    buffer: Vec<S::Item>,
135    #[pin]
136    timer: fasync::Timer,
137    state: TimerState,
138}
139
140impl<S: Stream> TimeLimitedChunks<S> {
141    pub fn new(stream: S, capacity: usize, timeout: std::time::Duration) -> Self {
142        Self {
143            stream,
144            capacity,
145            timeout,
146            buffer: Vec::with_capacity(capacity),
147            timer: fasync::Timer::new(std::time::Instant::now() + timeout),
148            state: TimerState::Idle,
149        }
150    }
151}
152
153impl<S: Stream> Stream for TimeLimitedChunks<S> {
154    type Item = Vec<S::Item>;
155
156    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157        let mut this = self.project();
158
159        loop {
160            match this.stream.as_mut().poll_next(cx) {
161                Poll::Ready(Some(item)) => {
162                    if this.buffer.is_empty() {
163                        let deadline = fasync::MonotonicInstant::after(
164                            zx::MonotonicDuration::from_nanos(this.timeout.as_nanos() as i64),
165                        );
166                        this.timer.as_mut().reset(deadline);
167                        *this.state = TimerState::Running;
168                    }
169                    this.buffer.push(item);
170                    if this.buffer.len() >= *this.capacity {
171                        *this.state = TimerState::Idle;
172                        return Poll::Ready(Some(std::mem::take(this.buffer)));
173                    }
174                }
175                Poll::Ready(None) => {
176                    if !this.buffer.is_empty() {
177                        *this.state = TimerState::Idle;
178                        return Poll::Ready(Some(std::mem::take(this.buffer)));
179                    }
180                    return Poll::Ready(None);
181                }
182                Poll::Pending => {
183                    if !this.buffer.is_empty() && *this.state == TimerState::Running {
184                        use std::future::Future;
185                        if this.timer.as_mut().poll(cx).is_ready() {
186                            *this.state = TimerState::Idle;
187                            return Poll::Ready(Some(std::mem::take(this.buffer)));
188                        }
189                    }
190                    return Poll::Pending;
191                }
192            }
193        }
194    }
195}
196
197/// Holds a VMO containing valid serialized data as well as the size of that data.
198pub struct SerializedVmo {
199    pub vmo: zx::Vmo,
200    pub size: u64,
201    format: Format,
202}
203
204impl SerializedVmo {
205    pub fn serialize(
206        source: &impl Serialize,
207        data_type: DataType,
208        format: Format,
209    ) -> Result<Self, AccessorError> {
210        let initial_buffer_capacity = match data_type {
211            DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
212            // Logs won't go through this codepath anyway, but in case we ever want to serialize a
213            // single log instance it makes sense to start at the page size.
214            DataType::Logs => 4096, // page size
215        };
216        let mut buffer = Vec::with_capacity(initial_buffer_capacity);
217        match format {
218            Format::Json => {
219                serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
220            }
221            Format::Cbor => ciborium::into_writer(source, &mut buffer)
222                .map_err(|err| AccessorError::CborSerialization(err.into()))?,
223            Format::Text => unreachable!("We'll never get Text"),
224            Format::Fxt => unreachable!("We'll never get FXT"),
225        }
226        let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
227        let _ = vmo.set_name(&SERIALIZED_DATA_VMO_NAME);
228        vmo.write(&buffer, 0).unwrap();
229        Ok(Self { vmo, size: buffer.len() as u64, format })
230    }
231}
232
233impl From<SerializedVmo> for FormattedContent {
234    fn from(content: SerializedVmo) -> FormattedContent {
235        match content.format {
236            Format::Json => {
237                // set_content_size() is redundant, but consumers may expect the size there.
238                content
239                    .vmo
240                    .set_content_size(&content.size)
241                    .expect("set_content_size always returns Ok");
242                FormattedContent::Json(fidl_fuchsia_mem::Buffer {
243                    vmo: content.vmo,
244                    size: content.size,
245                })
246            }
247            Format::Cbor => {
248                content
249                    .vmo
250                    .set_content_size(&content.size)
251                    .expect("set_content_size always returns Ok");
252                FormattedContent::Cbor(content.vmo)
253            }
254            Format::Fxt => {
255                content
256                    .vmo
257                    .set_content_size(&content.size)
258                    .expect("set_content_size always returns Ok");
259                FormattedContent::Fxt(content.vmo)
260            }
261            Format::Text => unreachable!("We'll never get Text"),
262        }
263    }
264}
265
266trait PacketFormat {
267    const FORMAT: Format;
268    const HEADER: &[u8] = &[];
269    const FOOTER: &[u8] = &[];
270
271    /// Writes an item in the required format.  Returns `Poll::Ready(Some(<separator length>))` if
272    /// an item was written, and `Poll::Ready(None)` if there are no more items.  If `first` is true,
273    /// this is the first item in this batch.
274    fn write_item(
275        self: Pin<&mut Self>,
276        cx: &mut Context<'_>,
277        first: bool,
278        buffer: &mut Vec<u8>,
279    ) -> Poll<Option<usize>>;
280}
281
282#[pin_project]
283pub struct PacketSerializer<T> {
284    stats: Option<Arc<BatchIteratorConnectionStats>>,
285    max_packet_size: u64,
286    #[pin]
287    format: T,
288    overflow: Vec<u8>,
289    finished: bool,
290}
291
292impl<T> PacketSerializer<T> {
293    fn with_format(
294        stats: Option<Arc<BatchIteratorConnectionStats>>,
295        max_packet_size: u64,
296        format: T,
297    ) -> Self {
298        Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
299    }
300}
301
302impl<T: PacketFormat> Stream for PacketSerializer<T> {
303    type Item = Result<SerializedVmo, AccessorError>;
304
305    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
306        if self.finished {
307            return Poll::Ready(None);
308        }
309
310        // Limit packet size to prevent unbounded memory use.
311        const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20; // 1 MiB
312        let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
313        let mut this = self.project();
314
315        let mut buffer = Vec::with_capacity(256 * 1024);
316        buffer.extend_from_slice(T::HEADER);
317
318        let mut results = 0;
319
320        if !this.overflow.is_empty() {
321            buffer.append(this.overflow);
322            results += 1;
323        }
324
325        let mut vmo = None;
326        let mut vmo_len = 0;
327
328        loop {
329            // Copy to the VMO if the room in the buffer drops below a threshold.
330            if buffer.capacity() - buffer.len() < 512 {
331                vmo.get_or_insert_with(|| {
332                    let v = zx::Vmo::create(max_packet_size).unwrap();
333                    let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
334                    v
335                })
336                .write(&buffer, vmo_len as u64)
337                .unwrap();
338                vmo_len += buffer.len();
339                buffer.clear();
340            }
341
342            let last_len = buffer.len();
343
344            let separator_len = match this.format.as_mut().write_item(cx, results == 0, &mut buffer)
345            {
346                Poll::Ready(Some(separator_len)) => separator_len,
347                Poll::Ready(None) => {
348                    *this.finished = true;
349                    if results == 0 {
350                        return Poll::Ready(None);
351                    } else {
352                        break;
353                    }
354                }
355                Poll::Pending => {
356                    if results == 0 {
357                        return Poll::Pending;
358                    } else {
359                        break;
360                    }
361                }
362            };
363
364            let item_len = buffer.len() - last_len - separator_len;
365
366            if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
367                warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
368                buffer.truncate(last_len);
369            } else {
370                if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
371                    // Last item put us over the maximum packet size, keep it for the next batch.
372                    // We should have at least one item because otherwise we should have gone
373                    // through the branch above.
374                    assert!(results > 0);
375                    this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
376                    buffer.truncate(last_len);
377                    break;
378                }
379
380                results += 1;
381            }
382        }
383
384        if let Some(stats) = &this.stats {
385            stats.add_result(results);
386        }
387
388        buffer.extend_from_slice(T::FOOTER);
389
390        let vmo = match vmo {
391            Some(vmo) => {
392                vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
393                vmo
394            }
395            None => {
396                let v = zx::Vmo::create(buffer.len() as u64).unwrap();
397                let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
398                v
399            }
400        };
401        vmo.write(&buffer, vmo_len as u64).unwrap();
402        vmo_len += buffer.len();
403        Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
404    }
405}
406
407#[pin_project]
408pub struct FxtPacketFormat {
409    #[pin]
410    pub cursor: FilterCursor,
411    pub subscribe_to_manifest: bool,
412    pub sent_tags: std::collections::HashMap<u64, Arc<crate::identity::ComponentIdentity>>,
413}
414
415impl PacketFormat for FxtPacketFormat {
416    const FORMAT: Format = Format::Fxt;
417
418    fn write_item(
419        self: Pin<&mut Self>,
420        cx: &mut Context<'_>,
421        _first: bool,
422        buffer: &mut Vec<u8>,
423    ) -> Poll<Option<usize>> {
424        let mut this = self.project();
425        loop {
426            let Some(message) = ready!(this.cursor.as_mut().poll_next(cx)) else {
427                return Poll::Ready(None);
428            };
429
430            let (identity, header, data) = match message.parse() {
431                Ok(res) => res,
432                Err(_) => {
433                    // If we fail to parse the message, just ignore it and move on to the next
434                    // message.
435                    continue;
436                }
437            };
438            let tag = header.tag() as u64;
439
440            if *this.subscribe_to_manifest {
441                let send_manifest = match this.sent_tags.entry(tag) {
442                    std::collections::hash_map::Entry::Vacant(e) => {
443                        e.insert(Arc::clone(identity));
444                        true
445                    }
446                    std::collections::hash_map::Entry::Occupied(mut e) => {
447                        if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
448                            e.insert(Arc::clone(identity));
449                            true
450                        } else {
451                            false
452                        }
453                    }
454                };
455
456                if send_manifest {
457                    use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
458                    use diagnostics_log_encoding::{Argument, LOG_CONTROL_BIT, Record};
459                    use fidl_fuchsia_diagnostics_types::Severity;
460                    use std::io::Cursor;
461
462                    let mut encoder = Encoder::new(
463                        Cursor::new(ResizableBuffer::from(Vec::new())),
464                        EncoderOpts::default(),
465                    );
466                    let record = Record {
467                        timestamp: zx::BootInstant::from_nanos(0),
468                        severity: Severity::Info.into_primitive(),
469                        arguments: vec![
470                            Argument::other("moniker", identity.moniker.to_string()),
471                            Argument::other("url", identity.url.as_str()),
472                        ],
473                    };
474                    encoder.write_record(record).unwrap();
475                    let mut manifest_buffer = encoder.take().into_inner().into_inner();
476                    if manifest_buffer.len() >= 8 {
477                        let mut header_manifest =
478                            Header::read_from_bytes(&manifest_buffer[0..8]).unwrap();
479                        header_manifest.set_tag((tag as u32) | LOG_CONTROL_BIT);
480                        manifest_buffer[0..8].copy_from_slice(header_manifest.as_bytes());
481                    }
482                    buffer.extend_from_slice(&manifest_buffer);
483                }
484            }
485
486            buffer.extend_from_slice(header.as_bytes());
487            buffer.extend_from_slice(&data[8..]);
488
489            extend_fxt_record(
490                identity,
491                message.dropped,
492                &ExtendRecordOpts {
493                    component_url: !*this.subscribe_to_manifest,
494                    moniker: !*this.subscribe_to_manifest,
495                    rolled_out: !*this.subscribe_to_manifest,
496                    subscribe_to_manifest: false,
497                },
498                buffer,
499            );
500            return Poll::Ready(Some(0));
501        }
502    }
503}
504
505pub type FxtPacketSerializer = PacketSerializer<FxtPacketFormat>;
506
507impl FxtPacketSerializer {
508    pub fn new(
509        stats: Arc<BatchIteratorConnectionStats>,
510        max_packet_size: u64,
511        cursor: FilterCursor,
512        subscribe_to_manifest: bool,
513    ) -> Self {
514        Self::with_format(
515            Some(stats),
516            max_packet_size,
517            FxtPacketFormat {
518                cursor,
519                subscribe_to_manifest,
520                sent_tags: std::collections::HashMap::new(),
521            },
522        )
523    }
524}
525
526#[pin_project]
527pub struct JsonPacketFormat<I>(#[pin] I);
528
529impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
530    const FORMAT: Format = Format::Json;
531    const HEADER: &[u8] = b"[";
532    const FOOTER: &[u8] = b"]";
533
534    fn write_item(
535        self: Pin<&mut Self>,
536        cx: &mut Context<'_>,
537        first: bool,
538        buffer: &mut Vec<u8>,
539    ) -> Poll<Option<usize>> {
540        const SEPARATOR: &[u8] = b",\n";
541
542        if let Some(item) = ready!(self.project().0.poll_next(cx)) {
543            let separator_len = if !first {
544                buffer.extend_from_slice(SEPARATOR);
545                SEPARATOR.len()
546            } else {
547                0
548            };
549            // We don't expect serialization to fail because we should always be able to write to
550            // `buffer` and `item` is a type we control which we know should always be serializable.
551            serde_json::to_writer(buffer, &item).expect("failed to serialize item");
552            Poll::Ready(Some(separator_len))
553        } else {
554            Poll::Ready(None)
555        }
556    }
557}
558
559pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;
560
561impl<I> JsonPacketSerializer<I> {
562    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
563        Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
564    }
565
566    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
567        Self::with_format(None, max_packet_size, JsonPacketFormat(items))
568    }
569}
570
571#[cfg(test)]
572mod tests {
573    use super::*;
574    use crate::diagnostics::AccessorStats;
575    use futures::stream::iter;
576
577    #[fuchsia::test]
578    async fn time_limited_chunks_yields_on_capacity() {
579        let stream = futures::stream::iter(vec![1, 2, 3, 4]);
580        let mut chunks =
581            Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));
582
583        assert_eq!(chunks.next().await, Some(vec![1, 2]));
584        assert_eq!(chunks.next().await, Some(vec![3, 4]));
585        assert_eq!(chunks.next().await, None);
586    }
587
588    #[fuchsia::test]
589    async fn time_limited_chunks_yields_on_stream_end() {
590        let stream = futures::stream::iter(vec![1, 2, 3]);
591        let mut chunks =
592            Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));
593
594        assert_eq!(chunks.next().await, Some(vec![1, 2]));
595        assert_eq!(chunks.next().await, Some(vec![3]));
596        assert_eq!(chunks.next().await, None);
597    }
598
599    #[fuchsia::test]
600    async fn time_limited_chunks_yields_on_timeout() {
601        let (tx, rx) = futures::channel::mpsc::unbounded();
602        let mut chunks =
603            Box::pin(TimeLimitedChunks::new(rx, 2, std::time::Duration::from_millis(10)));
604
605        tx.unbounded_send(1).unwrap();
606
607        let start = std::time::Instant::now();
608        assert_eq!(chunks.next().await, Some(vec![1]));
609        assert!(start.elapsed() >= std::time::Duration::from_millis(10));
610
611        tx.unbounded_send(2).unwrap();
612        tx.unbounded_send(3).unwrap();
613        assert_eq!(chunks.next().await, Some(vec![2, 3]));
614
615        drop(tx);
616        assert_eq!(chunks.next().await, None);
617    }
618
619    #[fuchsia::test]
620    async fn two_items_joined_and_split() {
621        let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
622        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
623        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
624        let smallest_possible_joined_len = joined[0].len() as u64;
625
626        let make_packets = |max| async move {
627            let node = fuchsia_inspect::Node::default();
628            let accessor_stats = Arc::new(AccessorStats::new(node));
629            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
630            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
631                .collect::<Vec<_>>()
632                .await
633                .into_iter()
634                .map(|r| {
635                    let result = r.unwrap();
636                    let mut buf = vec![0; result.size as usize];
637                    result.vmo.read(&mut buf, 0).expect("reading vmo");
638                    std::str::from_utf8(&buf).unwrap().to_string()
639                })
640                .collect::<Vec<_>>()
641        };
642
643        let actual_joined = make_packets(smallest_possible_joined_len).await;
644        assert_eq!(&actual_joined[..], joined);
645
646        let actual_split = make_packets(smallest_possible_joined_len - 1).await;
647        assert_eq!(&actual_split[..], split);
648    }
649
650    #[fuchsia::test]
651    async fn overflow_separator_added() {
652        let inputs = &[&"A", &"B", &"C"];
653        // "[" + "A" + "]" = 4 bytes.
654        // "[" + "A" + ",\n" + "B" + "]" = 4 + 2 + 3 = 9 bytes.
655        // If max is 8, "B" will overflow.
656        // Second packet starts with "B" from overflow.
657        // "[" + "B" + ",\n" + "C" + "]" = 4 + 2 + 3 = 9 bytes.
658        // If max is 8, "C" will overflow.
659        // Third packet starts with "C" from overflow.
660
661        let make_packets = |max| async move {
662            let node = fuchsia_inspect::Node::default();
663            let accessor_stats = Arc::new(AccessorStats::new(node));
664            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
665            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
666                .collect::<Vec<_>>()
667                .await
668                .into_iter()
669                .map(|r| {
670                    let result = r.unwrap();
671                    let mut buf = vec![0; result.size as usize];
672                    result.vmo.read(&mut buf, 0).expect("reading vmo");
673                    std::str::from_utf8(&buf).unwrap().to_string()
674                })
675                .collect::<Vec<_>>()
676        };
677
678        let packets = make_packets(8).await;
679        assert_eq!(packets.len(), 3);
680        assert_eq!(packets[0], r#"["A"]"#);
681        assert_eq!(packets[1], r#"["B"]"#);
682        assert_eq!(packets[2], r#"["C"]"#);
683
684        let packets = make_packets(10).await;
685        assert_eq!(packets.len(), 2);
686        assert_eq!(packets[0], "[\"A\",\n\"B\"]");
687        assert_eq!(packets[1], r#"["C"]"#);
688    }
689
690    #[fuchsia::test]
691    async fn oversize_item_not_dropped_incorrectly() {
692        let inputs = &[&"A", &"BCDEF"];
693        // Packet 1: ["A"] (4 bytes)
694        // Item 2: "BCDEF" (7 bytes)
695        // "[" + "A" + ",\n" + "BCDEF" + "]" = 1 + 3 + 2 + 7 + 1 = 14 bytes.
696        // If max is 11:
697        // "A" fits (4 bytes).
698        // "BCDEF" overflows.
699        // NEXT packet:
700        // "[" + "BCDEF" + "]" = 9 bytes.
701        // 9 fits in 11.
702
703        let make_packets = |max| async move {
704            let node = fuchsia_inspect::Node::default();
705            let accessor_stats = Arc::new(AccessorStats::new(node));
706            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
707            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
708                .collect::<Vec<_>>()
709                .await
710                .into_iter()
711                .map(|r| {
712                    let result = r.unwrap();
713                    let mut buf = vec![0; result.size as usize];
714                    result.vmo.read(&mut buf, 0).expect("reading vmo");
715                    std::str::from_utf8(&buf).unwrap().to_string()
716                })
717                .collect::<Vec<_>>()
718        };
719
720        let packets = make_packets(11).await;
721        assert_eq!(packets.len(), 2);
722        assert_eq!(packets[0], r#"["A"]"#);
723        assert_eq!(packets[1], r#"["BCDEF"]"#);
724    }
725
726    #[fuchsia::test]
727    async fn item_too_big_for_packet_is_dropped() {
728        let inputs = &[&"ABCDE"]; // 7 bytes
729        // "[" + 7 + "]" = 9 bytes.
730        // If max is 8, it should be dropped.
731
732        let make_packets = |max| async move {
733            let node = fuchsia_inspect::Node::default();
734            let accessor_stats = Arc::new(AccessorStats::new(node));
735            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
736            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
737                .collect::<Vec<_>>()
738                .await
739                .into_iter()
740                .map(|r| {
741                    let result = r.unwrap();
742                    let mut buf = vec![0; result.size as usize];
743                    result.vmo.read(&mut buf, 0).expect("reading vmo");
744                    std::str::from_utf8(&buf).unwrap().to_string()
745                })
746                .collect::<Vec<_>>()
747        };
748
749        let packets = make_packets(8).await;
750        // Item should be dropped, so we get no packets.
751        assert_eq!(packets.len(), 0);
752    }
753
754    #[fuchsia::test]
755    async fn fxt_packet_serializer_subscribe_to_manifest() {
756        use crate::identity::ComponentIdentity;
757        use crate::logs::shared_buffer::{SharedBuffer, create_ring_buffer};
758        use crate::logs::stats::LogStreamStats;
759        use diagnostics_log_encoding::encode::{Encoder, EncoderOpts};
760        use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
761        use fidl_fuchsia_diagnostics::StreamMode;
762        use fidl_fuchsia_diagnostics_types::Severity;
763        use fuchsia_inspect::Node;
764        use std::io::Cursor;
765        use zerocopy::{FromBytes, IntoBytes};
766
767        let identity1 = Arc::new(ComponentIdentity::unknown());
768        let mut identity2_inner = ComponentIdentity::unknown();
769        identity2_inner.url =
770            flyweights::FlyStr::new("fuchsia-pkg://fuchsia.com/test#meta/test.cm");
771        let identity2 = Arc::new(identity2_inner);
772
773        let buffer = Arc::new(SharedBuffer::new(
774            create_ring_buffer(65536),
775            Box::new(|_| {}),
776            Default::default(),
777            &Node::default(),
778        ));
779        let stats1 = Arc::new(LogStreamStats::new(&Node::default(), &identity1));
780        let container1 = buffer.new_container_buffer(Arc::clone(&identity1), stats1);
781        let stats2 = Arc::new(LogStreamStats::new(&Node::default(), &identity2));
782        let container2 = buffer.new_container_buffer(Arc::clone(&identity2), stats2);
783
784        // create message 1
785        let mut buffer_out = Cursor::new(vec![0u8; 128]);
786        let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
787        encoder
788            .write_record(Record {
789                timestamp: zx::BootInstant::from_nanos(100),
790                severity: Severity::Info.into_primitive(),
791                arguments: vec![Argument::other("msg", "hello")],
792            })
793            .unwrap();
794        let end = encoder.inner().position() as usize;
795        let mut msg1_bytes = encoder.inner().get_ref()[..end].to_vec();
796        let mut header = Header::read_from_bytes(&msg1_bytes[0..8]).unwrap();
797        header.set_tag(1);
798        msg1_bytes[0..8].copy_from_slice(header.as_bytes());
799        container1.push_back(&msg1_bytes);
800
801        // create message 2
802        let mut buffer_out = Cursor::new(vec![0u8; 128]);
803        let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
804        encoder
805            .write_record(Record {
806                timestamp: zx::BootInstant::from_nanos(200),
807                severity: Severity::Info.into_primitive(),
808                arguments: vec![Argument::other("msg", "world")],
809            })
810            .unwrap();
811        let end = encoder.inner().position() as usize;
812        let mut msg2_bytes = encoder.inner().get_ref()[..end].to_vec();
813        let mut header = Header::read_from_bytes(&msg2_bytes[0..8]).unwrap();
814        header.set_tag(2);
815        msg2_bytes[0..8].copy_from_slice(header.as_bytes());
816        container2.push_back(&msg2_bytes);
817
818        // create message 3 (same tag/identity as message 2)
819        let mut buffer_out = Cursor::new(vec![0u8; 128]);
820        let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
821        encoder
822            .write_record(Record {
823                timestamp: zx::BootInstant::from_nanos(300),
824                severity: Severity::Info.into_primitive(),
825                arguments: vec![Argument::other("msg", "again")],
826            })
827            .unwrap();
828        let end = encoder.inner().position() as usize;
829        let mut msg3_bytes = encoder.inner().get_ref()[..end].to_vec();
830        let mut header = Header::read_from_bytes(&msg3_bytes[0..8]).unwrap();
831        header.set_tag(2);
832        msg3_bytes[0..8].copy_from_slice(header.as_bytes());
833        container2.push_back(&msg3_bytes);
834
835        let cursor = buffer.cursor(StreamMode::Snapshot, vec![]);
836
837        let node = Node::default();
838        let accessor_stats = Arc::new(AccessorStats::new(node));
839        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
840
841        // Test with subscribe_to_manifest = true
842        let packets: Vec<_> =
843            FxtPacketSerializer::new(Arc::clone(&test_stats), 1024 * 1024, cursor, true)
844                .collect::<Vec<_>>()
845                .await
846                .into_iter()
847                .map(|r| {
848                    let result = r.unwrap();
849                    let mut buf = vec![0; result.size as usize];
850                    result.vmo.read(&mut buf, 0).expect("reading vmo");
851                    buf
852                })
853                .collect();
854
855        assert_eq!(packets.len(), 1);
856        let output = &packets[0];
857
858        let mut current_slice = output.as_slice();
859        let mut records = vec![];
860
861        while !current_slice.is_empty() {
862            let (record, remaining) =
863                diagnostics_log_encoding::parse::parse_record(current_slice).unwrap();
864            let header =
865                diagnostics_log_encoding::Header::read_from_bytes(&current_slice[..8]).unwrap();
866            records.push((header, record));
867            current_slice = remaining;
868        }
869
870        assert_eq!(records.len(), 5);
871
872        // Record 0: Manifest for tag 0
873        assert_ne!(records[0].0.tag() & LOG_CONTROL_BIT, 0);
874        assert_eq!(records[0].0.tag() & !LOG_CONTROL_BIT, 0);
875
876        // Record 1: Msg 1
877        assert_eq!(records[1].0.tag(), 0);
878        assert_eq!(records[1].1.arguments.len(), 1); // no moniker/url injected
879
880        // Record 2: Manifest for tag 1
881        assert_ne!(records[2].0.tag() & LOG_CONTROL_BIT, 0);
882        assert_eq!(records[2].0.tag() & !LOG_CONTROL_BIT, 1);
883
884        // Record 3: Msg 2
885        assert_eq!(records[3].0.tag(), 1);
886        assert_eq!(records[3].1.arguments.len(), 1); // no moniker/url injected
887
888        // Record 4: Msg 3
889        assert_eq!(records[4].0.tag(), 1);
890        assert_eq!(records[4].1.arguments.len(), 1); // no moniker/url injected
891    }
892}