// archivist_lib/formatter.rs
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::diagnostics::BatchIteratorConnectionStats;
use crate::error::AccessorError;
use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
use crate::logs::shared_buffer::FxtMessage;
use fidl_fuchsia_diagnostics::{
    DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
};

use fuchsia_async as fasync;
use futures::{Stream, StreamExt};
use log::warn;
use pin_project::pin_project;
use serde::Serialize;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
use zx;
// Names attached to the VMOs created in this file so they can be identified
// in memory diagnostics.
static SERIALIZED_DATA_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-serialized-data");
static PACKET_BUFFER_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-packet-buffer");

// Longest a snapshot-mode batch may be held back before being flushed to the client.
const SNAPSHOT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15);
26
/// A pinned, boxed stream of batches, where each batch holds per-item results
/// of converting into `FormattedContent`.
pub type FormattedStream =
    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;
29
/// Adapts a chunking stream into a `FormattedStream`, converting each chunk's
/// items into `FormattedContent` and recording stats for failed conversions.
#[pin_project]
pub struct FormattedContentBatcher<C> {
    #[pin]
    items: C,
    stats: Arc<BatchIteratorConnectionStats>,
}
36
37/// Make a new `FormattedContentBatcher` with a chunking strategy depending on stream mode.
38///
39/// In snapshot mode, batched items will not be flushed to the client until the batch is complete,
40/// the underlying stream has terminated, or `SNAPSHOT_TIMEOUT` has passed since the first
41/// item was discovered (in order to prevent client starvation).
42///
43/// In subscribe or snapshot-then-subscribe mode, batched items will be flushed whenever the
44/// underlying stream is pending, ensuring clients always receive latest results.
45pub fn new_batcher<I, T, E>(
46    items: I,
47    stats: Arc<BatchIteratorConnectionStats>,
48    mode: StreamMode,
49) -> FormattedStream
50where
51    I: Stream<Item = Result<T, E>> + Send + 'static,
52    T: Into<FormattedContent> + Send,
53    E: Into<AccessorError> + Send,
54{
55    match mode {
56        StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
57            Box::pin(FormattedContentBatcher {
58                items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
59                stats,
60            })
61        }
62        StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
63            items: items.time_limited_chunks(MAXIMUM_ENTRIES_PER_BATCH as usize, SNAPSHOT_TIMEOUT),
64            stats,
65        }),
66    }
67}
68
69impl<I, T, E> Stream for FormattedContentBatcher<I>
70where
71    I: Stream<Item = Vec<Result<T, E>>>,
72    T: Into<FormattedContent>,
73    E: Into<AccessorError>,
74{
75    type Item = Vec<Result<FormattedContent, AccessorError>>;
76
77    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78        let this = self.project();
79        match this.items.poll_next(cx) {
80            Poll::Ready(Some(chunk)) => {
81                // loop over chunk instead of into_iter/map because we can't move `this`
82                let mut batch = vec![];
83                for item in chunk {
84                    let result = match item {
85                        Ok(i) => Ok(i.into()),
86                        Err(e) => {
87                            this.stats.add_result_error();
88                            Err(e.into())
89                        }
90                    };
91                    batch.push(result);
92                }
93                Poll::Ready(Some(batch))
94            }
95            Poll::Ready(None) => Poll::Ready(None),
96            Poll::Pending => Poll::Pending,
97        }
98    }
99}
100
/// Tracks whether the flush timer for the chunk currently being built is armed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerState {
    // No partial chunk is waiting on the timer; any pending timer fire is ignored.
    Idle,
    // The timer was reset when the current chunk received its first item.
    Running,
}
106
/// Extension trait providing `time_limited_chunks` on any `Stream`.
pub trait TimeLimitedChunksExt: Stream {
    /// Wraps `self` in a `TimeLimitedChunks` adapter that emits chunks of at
    /// most `capacity` items, flushing a partial chunk after `timeout`.
    fn time_limited_chunks(
        self,
        capacity: usize,
        timeout: std::time::Duration,
    ) -> TimeLimitedChunks<Self>
    where
        Self: Sized,
    {
        TimeLimitedChunks::new(self, capacity, timeout)
    }
}

impl<T: Stream> TimeLimitedChunksExt for T {}
121
/// A stream adapter that yields chunks of items from the underlying stream.
///
/// Chunks are yielded when they reach the specified capacity or when the specified timeout
/// expires since the first item in the chunk was received.
#[pin_project]
pub struct TimeLimitedChunks<S: Stream> {
    #[pin]
    stream: S,
    // Maximum number of items per chunk.
    capacity: usize,
    // How long a partial chunk may wait before being flushed.
    timeout: std::time::Duration,
    // Items accumulated for the chunk currently being built.
    buffer: Vec<S::Item>,
    // Re-armed whenever a new chunk receives its first item; only consulted
    // while `state` is `Running`.
    #[pin]
    timer: fasync::Timer,
    state: TimerState,
}
137
138impl<S: Stream> TimeLimitedChunks<S> {
139    pub fn new(stream: S, capacity: usize, timeout: std::time::Duration) -> Self {
140        Self {
141            stream,
142            capacity,
143            timeout,
144            buffer: Vec::with_capacity(capacity),
145            timer: fasync::Timer::new(std::time::Instant::now() + timeout),
146            state: TimerState::Idle,
147        }
148    }
149}
150
impl<S: Stream> Stream for TimeLimitedChunks<S> {
    type Item = Vec<S::Item>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // Drain the source until it's pending, flushing whenever a chunk
        // fills, the source ends, or the flush timer fires.
        loop {
            match this.stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(item)) => {
                    // First item of a new chunk: arm the flush timer.
                    if this.buffer.is_empty() {
                        let deadline = fasync::MonotonicInstant::after(
                            zx::MonotonicDuration::from_nanos(this.timeout.as_nanos() as i64),
                        );
                        this.timer.as_mut().reset(deadline);
                        *this.state = TimerState::Running;
                    }
                    this.buffer.push(item);
                    // Flush as soon as the chunk reaches capacity.
                    if this.buffer.len() >= *this.capacity {
                        *this.state = TimerState::Idle;
                        return Poll::Ready(Some(std::mem::take(this.buffer)));
                    }
                }
                Poll::Ready(None) => {
                    // Source ended: flush any partial chunk before terminating.
                    if !this.buffer.is_empty() {
                        *this.state = TimerState::Idle;
                        return Poll::Ready(Some(std::mem::take(this.buffer)));
                    }
                    return Poll::Ready(None);
                }
                Poll::Pending => {
                    // No new items right now. If a partial chunk is waiting, poll
                    // the timer so the timeout can flush it; polling also registers
                    // this task's waker for the deadline.
                    if !this.buffer.is_empty() && *this.state == TimerState::Running {
                        use std::future::Future;
                        if this.timer.as_mut().poll(cx).is_ready() {
                            *this.state = TimerState::Idle;
                            return Poll::Ready(Some(std::mem::take(this.buffer)));
                        }
                    }
                    return Poll::Pending;
                }
            }
        }
    }
}
194
/// Holds a VMO containing valid serialized data as well as the size of that data.
pub struct SerializedVmo {
    pub vmo: zx::Vmo,
    pub size: u64,
    // Wire format of the bytes in `vmo`; selects the `FormattedContent` variant.
    format: Format,
}
201
202impl SerializedVmo {
203    pub fn serialize(
204        source: &impl Serialize,
205        data_type: DataType,
206        format: Format,
207    ) -> Result<Self, AccessorError> {
208        let initial_buffer_capacity = match data_type {
209            DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
210            // Logs won't go through this codepath anyway, but in case we ever want to serialize a
211            // single log instance it makes sense to start at the page size.
212            DataType::Logs => 4096, // page size
213        };
214        let mut buffer = Vec::with_capacity(initial_buffer_capacity);
215        match format {
216            Format::Json => {
217                serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
218            }
219            Format::Cbor => ciborium::into_writer(source, &mut buffer)
220                .map_err(|err| AccessorError::CborSerialization(err.into()))?,
221            Format::Text => unreachable!("We'll never get Text"),
222            Format::Fxt => unreachable!("We'll never get FXT"),
223        }
224        let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
225        let _ = vmo.set_name(&SERIALIZED_DATA_VMO_NAME);
226        vmo.write(&buffer, 0).unwrap();
227        Ok(Self { vmo, size: buffer.len() as u64, format })
228    }
229}
230
231impl From<SerializedVmo> for FormattedContent {
232    fn from(content: SerializedVmo) -> FormattedContent {
233        match content.format {
234            Format::Json => {
235                // set_content_size() is redundant, but consumers may expect the size there.
236                content
237                    .vmo
238                    .set_content_size(&content.size)
239                    .expect("set_content_size always returns Ok");
240                FormattedContent::Json(fidl_fuchsia_mem::Buffer {
241                    vmo: content.vmo,
242                    size: content.size,
243                })
244            }
245            Format::Cbor => {
246                content
247                    .vmo
248                    .set_content_size(&content.size)
249                    .expect("set_content_size always returns Ok");
250                FormattedContent::Cbor(content.vmo)
251            }
252            Format::Fxt => {
253                content
254                    .vmo
255                    .set_content_size(&content.size)
256                    .expect("set_content_size always returns Ok");
257                FormattedContent::Fxt(content.vmo)
258            }
259            Format::Text => unreachable!("We'll never get Text"),
260        }
261    }
262}
263
/// Describes how `PacketSerializer` encodes a packet: the wire format label,
/// optional packet header/footer bytes, and the per-item serialization.
trait PacketFormat {
    /// Format reported on the resulting `SerializedVmo`.
    const FORMAT: Format;
    /// Bytes emitted at the start of every packet (empty by default).
    const HEADER: &[u8] = &[];
    /// Bytes emitted at the end of every packet (empty by default).
    const FOOTER: &[u8] = &[];

    /// Writes an item in the required format.  Returns `Poll::Ready(Some(<separator length>))` if
    /// an item was written, and `Poll::Ready(None)` if there are no more items.  If `first` is true,
    /// this is the first item in this batch.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>>;
}
279
/// Stream that groups items produced by a `PacketFormat` into VMO-backed
/// packets no larger than `max_packet_size` bytes.
#[pin_project]
pub struct PacketSerializer<T> {
    // When present, counts each item emitted into a packet.
    stats: Option<Arc<BatchIteratorConnectionStats>>,
    // Caller-requested packet size cap (further limited in poll_next).
    max_packet_size: u64,
    #[pin]
    format: T,
    // Serialized item that overflowed the previous packet; emitted first in the next one.
    overflow: Vec<u8>,
    // Set once the format reports there are no more items.
    finished: bool,
}
289
290impl<T> PacketSerializer<T> {
291    fn with_format(
292        stats: Option<Arc<BatchIteratorConnectionStats>>,
293        max_packet_size: u64,
294        format: T,
295    ) -> Self {
296        Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
297    }
298}
299
impl<T: PacketFormat> Stream for PacketSerializer<T> {
    type Item = Result<SerializedVmo, AccessorError>;

    /// Builds one packet per call: repeatedly asks the format to write items into
    /// an in-memory buffer (spilling to a VMO when the buffer fills) until the
    /// format runs out of items, reports pending, or the packet size cap is hit.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }

        // Limit packet size to prevent unbounded memory use.
        const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20; // 1 MiB
        let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
        let mut this = self.project();

        let mut buffer = Vec::with_capacity(256 * 1024);
        buffer.extend_from_slice(T::HEADER);

        let mut first = true;

        // An item carried over from the previous packet goes first. It was stored
        // without its separator, so none is needed here.
        if !this.overflow.is_empty() {
            buffer.append(this.overflow);
            first = false;
            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        let mut vmo = None;
        let mut vmo_len = 0;

        loop {
            // Copy to the VMO if the room in the buffer drops below a threshold.
            if buffer.capacity() - buffer.len() < 512 {
                vmo.get_or_insert_with(|| {
                    let v = zx::Vmo::create(max_packet_size).unwrap();
                    let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
                    v
                })
                .write(&buffer, vmo_len as u64)
                .unwrap();
                vmo_len += buffer.len();
                buffer.clear();
            }

            let last_len = buffer.len();

            let separator_len = match this.format.as_mut().write_item(cx, first, &mut buffer) {
                Poll::Ready(Some(separator_len)) => separator_len,
                Poll::Ready(None) => {
                    // Source is exhausted: emit what we have unless the packet is empty.
                    *this.finished = true;
                    if first {
                        return Poll::Ready(None);
                    } else {
                        break;
                    }
                }
                Poll::Pending => {
                    // Source stalled: flush a non-empty packet rather than wait.
                    if first {
                        return Poll::Pending;
                    } else {
                        break;
                    }
                }
            };

            let item_len = buffer.len() - last_len - separator_len;

            // An item too large to fit in a packet even by itself is dropped entirely.
            if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
                warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
                buffer.truncate(last_len);
            } else {
                if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
                    // Last item put us over the maximum packet size, keep it for the next batch.
                    // We should have at least one item because otherwise we should have gone
                    // through the branch above.
                    assert!(!first);
                    this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
                    buffer.truncate(last_len);
                    break;
                }

                first = false;

                if let Some(stats) = &this.stats {
                    stats.add_result();
                }
            }
        }

        buffer.extend_from_slice(T::FOOTER);

        // Flush the buffer's tail into the packet VMO, creating one sized exactly
        // to the data if nothing was spilled during the loop.
        let vmo = match vmo {
            Some(vmo) => {
                vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
                vmo
            }
            None => {
                let v = zx::Vmo::create(buffer.len() as u64).unwrap();
                let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
                v
            }
        };
        vmo.write(&buffer, vmo_len as u64).unwrap();
        vmo_len += buffer.len();
        Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
    }
}
406
/// `PacketFormat` for FXT-encoded log records.
#[pin_project]
pub struct FxtPacketFormat<I> {
    /// Underlying stream of FXT log messages.
    #[pin]
    pub stream: I,
    /// When true, emit a one-time "manifest" control record (moniker + url) per
    /// tag instead of injecting that metadata into every record.
    pub subscribe_to_manifest: bool,
    /// Identity last announced for each tag, so a manifest is re-sent if a tag
    /// is reused by a different component.
    pub sent_tags: std::collections::HashMap<u64, Arc<crate::identity::ComponentIdentity>>,
}
414
impl<I: Stream<Item = FxtMessage>> PacketFormat for FxtPacketFormat<I> {
    const FORMAT: Format = Format::Fxt;

    /// Writes the next FXT message into `buffer`, preceded (in manifest mode) by
    /// a control record announcing the moniker/url for the message's tag the
    /// first time that tag (or a new identity for it) is seen.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        _first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>> {
        let this = self.project();
        if let Some(item) = ready!(this.stream.poll_next(cx)) {
            if *this.subscribe_to_manifest {
                let tag = item.tag();
                let identity = item.component_identity();
                // Send a manifest when the tag is new, or when the tag now maps to
                // a different component identity than previously announced.
                let send_manifest = match this.sent_tags.entry(tag) {
                    std::collections::hash_map::Entry::Vacant(e) => {
                        e.insert(Arc::clone(identity));
                        true
                    }
                    std::collections::hash_map::Entry::Occupied(mut e) => {
                        if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
                            e.insert(Arc::clone(identity));
                            true
                        } else {
                            false
                        }
                    }
                };

                if send_manifest {
                    use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
                    use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
                    use fidl_fuchsia_diagnostics_types::Severity;
                    use std::io::Cursor;
                    use zerocopy::{FromBytes, IntoBytes};

                    // Encode a record carrying the moniker and url as arguments.
                    let mut encoder = Encoder::new(
                        Cursor::new(ResizableBuffer::from(Vec::new())),
                        EncoderOpts::default(),
                    );
                    let record = Record {
                        timestamp: zx::BootInstant::from_nanos(0),
                        severity: Severity::Info.into_primitive(),
                        arguments: vec![
                            Argument::other("moniker", identity.moniker.to_string()),
                            Argument::other("url", identity.url.as_str()),
                        ],
                    };
                    encoder.write_record(record).unwrap();
                    let mut manifest_buffer = encoder.take().into_inner().into_inner();
                    // Rewrite the record header: tag it with the message's tag plus
                    // the control bit that marks this as a manifest record.
                    if manifest_buffer.len() >= 8 {
                        let mut header = Header::read_from_bytes(&manifest_buffer[0..8]).unwrap();
                        header.set_tag((tag as u32) | LOG_CONTROL_BIT);
                        manifest_buffer[0..8].copy_from_slice(header.as_bytes());
                    }
                    buffer.extend_from_slice(&manifest_buffer);
                }
            }

            buffer.extend_from_slice(item.data());

            // if we are subscribing to the manifest, don't inject metadata into every record
            extend_fxt_record(
                item.component_identity(),
                item.dropped(),
                &ExtendRecordOpts {
                    component_url: !*this.subscribe_to_manifest,
                    moniker: !*this.subscribe_to_manifest,
                    rolled_out: !*this.subscribe_to_manifest,
                    subscribe_to_manifest: false,
                },
                buffer,
            );
            Poll::Ready(Some(0))
        } else {
            Poll::Ready(None)
        }
    }
}
494
495pub type FxtPacketSerializer<I> = PacketSerializer<FxtPacketFormat<I>>;
496
497impl<I> FxtPacketSerializer<I> {
498    pub fn new(
499        stats: Arc<BatchIteratorConnectionStats>,
500        max_packet_size: u64,
501        items: I,
502        subscribe_to_manifest: bool,
503    ) -> Self {
504        Self::with_format(
505            Some(stats),
506            max_packet_size,
507            FxtPacketFormat {
508                stream: items,
509                subscribe_to_manifest,
510                sent_tags: std::collections::HashMap::new(),
511            },
512        )
513    }
514}
515
/// `PacketFormat` that serializes each stream item as JSON inside a `[...]` array.
#[pin_project]
pub struct JsonPacketFormat<I>(#[pin] I);
518
519impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
520    const FORMAT: Format = Format::Json;
521    const HEADER: &[u8] = b"[";
522    const FOOTER: &[u8] = b"]";
523
524    fn write_item(
525        self: Pin<&mut Self>,
526        cx: &mut Context<'_>,
527        first: bool,
528        buffer: &mut Vec<u8>,
529    ) -> Poll<Option<usize>> {
530        const SEPARATOR: &[u8] = b",\n";
531
532        if let Some(item) = ready!(self.project().0.poll_next(cx)) {
533            let separator_len = if !first {
534                buffer.extend_from_slice(SEPARATOR);
535                SEPARATOR.len()
536            } else {
537                0
538            };
539            // We don't expect serialization to fail because we should always be able to write to
540            // `buffer` and `item` is a type we control which we know should always be serializable.
541            serde_json::to_writer(buffer, &item).expect("failed to serialize item");
542            Poll::Ready(Some(separator_len))
543        } else {
544            Poll::Ready(None)
545        }
546    }
547}
548
549pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;
550
551impl<I> JsonPacketSerializer<I> {
552    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
553        Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
554    }
555
556    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
557        Self::with_format(None, max_packet_size, JsonPacketFormat(items))
558    }
559}
560
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use futures::stream::iter;

    // A chunk is emitted as soon as `capacity` items have been buffered.
    #[fuchsia::test]
    async fn time_limited_chunks_yields_on_capacity() {
        let stream = futures::stream::iter(vec![1, 2, 3, 4]);
        let mut chunks =
            Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));

        assert_eq!(chunks.next().await, Some(vec![1, 2]));
        assert_eq!(chunks.next().await, Some(vec![3, 4]));
        assert_eq!(chunks.next().await, None);
    }

    // A partial chunk is flushed when the underlying stream terminates.
    #[fuchsia::test]
    async fn time_limited_chunks_yields_on_stream_end() {
        let stream = futures::stream::iter(vec![1, 2, 3]);
        let mut chunks =
            Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));

        assert_eq!(chunks.next().await, Some(vec![1, 2]));
        assert_eq!(chunks.next().await, Some(vec![3]));
        assert_eq!(chunks.next().await, None);
    }

    // A partial chunk is flushed once the timeout elapses even though the
    // channel stays open and the chunk is under capacity.
    #[fuchsia::test]
    async fn time_limited_chunks_yields_on_timeout() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut chunks =
            Box::pin(TimeLimitedChunks::new(rx, 2, std::time::Duration::from_millis(10)));

        tx.unbounded_send(1).unwrap();

        let start = std::time::Instant::now();
        assert_eq!(chunks.next().await, Some(vec![1]));
        assert!(start.elapsed() >= std::time::Duration::from_millis(10));

        tx.unbounded_send(2).unwrap();
        tx.unbounded_send(3).unwrap();
        assert_eq!(chunks.next().await, Some(vec![2, 3]));

        drop(tx);
        assert_eq!(chunks.next().await, None);
    }

    // Two items fit into one packet at the exact joined size, and split into two
    // packets when the limit is one byte smaller.
    #[fuchsia::test]
    async fn two_items_joined_and_split() {
        let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len() as u64;

        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let actual_joined = make_packets(smallest_possible_joined_len).await;
        assert_eq!(&actual_joined[..], joined);

        let actual_split = make_packets(smallest_possible_joined_len - 1).await;
        assert_eq!(&actual_split[..], split);
    }

    // An overflowed item starts the next packet without a leading separator.
    #[fuchsia::test]
    async fn overflow_separator_added() {
        let inputs = &[&"A", &"B", &"C"];
        // "[" + "A" + "]" = 4 bytes.
        // "[" + "A" + ",\n" + "B" + "]" = 4 + 2 + 3 = 9 bytes.
        // If max is 8, "B" will overflow.
        // Second packet starts with "B" from overflow.
        // "[" + "B" + ",\n" + "C" + "]" = 4 + 2 + 3 = 9 bytes.
        // If max is 8, "C" will overflow.
        // Third packet starts with "C" from overflow.

        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let packets = make_packets(8).await;
        assert_eq!(packets.len(), 3);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["B"]"#);
        assert_eq!(packets[2], r#"["C"]"#);

        let packets = make_packets(10).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], "[\"A\",\n\"B\"]");
        assert_eq!(packets[1], r#"["C"]"#);
    }

    // An item that overflows one packet but fits alone in the next must be
    // carried over, not dropped as oversize.
    #[fuchsia::test]
    async fn oversize_item_not_dropped_incorrectly() {
        let inputs = &[&"A", &"BCDEF"];
        // Packet 1: ["A"] (4 bytes)
        // Item 2: "BCDEF" (7 bytes)
        // "[" + "A" + ",\n" + "BCDEF" + "]" = 1 + 3 + 2 + 7 + 1 = 14 bytes.
        // If max is 11:
        // "A" fits (4 bytes).
        // "BCDEF" overflows.
        // NEXT packet:
        // "[" + "BCDEF" + "]" = 9 bytes.
        // 9 fits in 11.

        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let packets = make_packets(11).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["BCDEF"]"#);
    }

    // An item too large for any packet is dropped entirely and yields no packets.
    #[fuchsia::test]
    async fn item_too_big_for_packet_is_dropped() {
        let inputs = &[&"ABCDE"]; // 7 bytes
        // "[" + 7 + "]" = 9 bytes.
        // If max is 8, it should be dropped.

        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let packets = make_packets(8).await;
        // Item should be dropped, so we get no packets.
        assert_eq!(packets.len(), 0);
    }

    // In manifest mode a control record is emitted once per tag (and again only
    // if a tag maps to a new identity), and records carry no injected metadata.
    #[fuchsia::test]
    async fn fxt_packet_serializer_subscribe_to_manifest() {
        use crate::identity::ComponentIdentity;
        use diagnostics_log_encoding::encode::{Encoder, EncoderOpts};
        use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
        use fidl_fuchsia_diagnostics_types::Severity;
        use std::io::Cursor;
        use zerocopy::{FromBytes, IntoBytes};

        let identity1 = Arc::new(ComponentIdentity::unknown());
        let mut identity2_inner = ComponentIdentity::unknown();
        identity2_inner.url =
            flyweights::FlyStr::new("fuchsia-pkg://fuchsia.com/test#meta/test.cm");
        let identity2 = Arc::new(identity2_inner);

        // create message 1
        let mut buffer = Cursor::new(vec![0u8; 128]);
        let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
        encoder
            .write_record(Record {
                timestamp: zx::BootInstant::from_nanos(100),
                severity: Severity::Info.into_primitive(),
                arguments: vec![Argument::other("msg", "hello")],
            })
            .unwrap();
        let end = encoder.inner().position() as usize;
        let mut msg1_bytes = encoder.inner().get_ref()[..end].to_vec();
        let mut header = Header::read_from_bytes(&msg1_bytes[0..8]).unwrap();
        header.set_tag(1);
        msg1_bytes[0..8].copy_from_slice(header.as_bytes());
        let msg1 =
            FxtMessage::new_test(msg1_bytes.into_boxed_slice(), 0, Arc::clone(&identity1), 1);

        // create message 2
        let mut buffer = Cursor::new(vec![0u8; 128]);
        let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
        encoder
            .write_record(Record {
                timestamp: zx::BootInstant::from_nanos(200),
                severity: Severity::Info.into_primitive(),
                arguments: vec![Argument::other("msg", "world")],
            })
            .unwrap();
        let end = encoder.inner().position() as usize;
        let mut msg2_bytes = encoder.inner().get_ref()[..end].to_vec();
        let mut header = Header::read_from_bytes(&msg2_bytes[0..8]).unwrap();
        header.set_tag(2);
        msg2_bytes[0..8].copy_from_slice(header.as_bytes());
        let msg2 =
            FxtMessage::new_test(msg2_bytes.into_boxed_slice(), 0, Arc::clone(&identity2), 2);

        // create message 3 (same tag/identity as message 2)
        let mut buffer = Cursor::new(vec![0u8; 128]);
        let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
        encoder
            .write_record(Record {
                timestamp: zx::BootInstant::from_nanos(300),
                severity: Severity::Info.into_primitive(),
                arguments: vec![Argument::other("msg", "again")],
            })
            .unwrap();
        let end = encoder.inner().position() as usize;
        let mut msg3_bytes = encoder.inner().get_ref()[..end].to_vec();
        let mut header = Header::read_from_bytes(&msg3_bytes[0..8]).unwrap();
        header.set_tag(2);
        msg3_bytes[0..8].copy_from_slice(header.as_bytes());
        let msg3 =
            FxtMessage::new_test(msg3_bytes.into_boxed_slice(), 0, Arc::clone(&identity2), 2);

        let inputs = vec![msg1, msg2, msg3];

        let node = fuchsia_inspect::Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());

        // Test with subscribe_to_manifest = true
        let packets: Vec<_> =
            FxtPacketSerializer::new(Arc::clone(&test_stats), 1024 * 1024, iter(inputs), true)
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    buf
                })
                .collect();

        assert_eq!(packets.len(), 1);
        let output = &packets[0];

        // Parse every record out of the single packet along with its header.
        let mut current_slice = output.as_slice();
        let mut records = vec![];

        while !current_slice.is_empty() {
            let (record, remaining) =
                diagnostics_log_encoding::parse::parse_record(current_slice).unwrap();
            let header =
                diagnostics_log_encoding::Header::read_from_bytes(&current_slice[..8]).unwrap();
            records.push((header, record));
            current_slice = remaining;
        }

        assert_eq!(records.len(), 5);

        // Record 0: Manifest for tag 1
        assert_ne!(records[0].0.tag() & LOG_CONTROL_BIT, 0);
        assert_eq!(records[0].0.tag() & !LOG_CONTROL_BIT, 1);

        // Record 1: Msg 1
        assert_eq!(records[1].0.tag(), 1);
        assert_eq!(records[1].1.arguments.len(), 1); // no moniker/url injected

        // Record 2: Manifest for tag 2
        assert_ne!(records[2].0.tag() & LOG_CONTROL_BIT, 0);
        assert_eq!(records[2].0.tag() & !LOG_CONTROL_BIT, 2);

        // Record 3: Msg 2
        assert_eq!(records[3].0.tag(), 2);
        assert_eq!(records[3].1.arguments.len(), 1); // no moniker/url injected

        // Record 4: Msg 3
        assert_eq!(records[4].0.tag(), 2);
        assert_eq!(records[4].1.arguments.len(), 1); // no moniker/url injected
    }
}