// archivist_lib/formatter.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::diagnostics::BatchIteratorConnectionStats;
5use crate::error::AccessorError;
6use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
7use crate::logs::shared_buffer::FxtMessage;
8use fidl_fuchsia_diagnostics::{
9    DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
10};
11
12use futures::{Stream, StreamExt};
13use log::warn;
14use pin_project::pin_project;
15use serde::Serialize;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll, ready};
19
/// A pinned, boxed stream that yields batches of formatted results (or per-item errors)
/// for a batch iterator connection.
pub type FormattedStream =
    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;
22
/// A stream adapter that groups items from an underlying chunked stream into batches,
/// converting each item into a `FormattedContent`/`AccessorError` result.
#[pin_project]
pub struct FormattedContentBatcher<C> {
    // The underlying chunked stream; the chunking strategy is chosen in `new_batcher`.
    #[pin]
    items: C,
    // Per-connection stats; used to count item-level conversion errors.
    stats: Arc<BatchIteratorConnectionStats>,
}
29
30/// Make a new `FormattedContentBatcher` with a chunking strategy depending on stream mode.
31///
32/// In snapshot mode, batched items will not be flushed to the client until the batch is complete
33/// or the underlying stream has terminated.
34///
35/// In subscribe or snapshot-then-subscribe mode, batched items will be flushed whenever the
36/// underlying stream is pending, ensuring clients always receive latest results.
37pub fn new_batcher<I, T, E>(
38    items: I,
39    stats: Arc<BatchIteratorConnectionStats>,
40    mode: StreamMode,
41) -> FormattedStream
42where
43    I: Stream<Item = Result<T, E>> + Send + 'static,
44    T: Into<FormattedContent> + Send,
45    E: Into<AccessorError> + Send,
46{
47    match mode {
48        StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
49            Box::pin(FormattedContentBatcher {
50                items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
51                stats,
52            })
53        }
54        StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
55            items: items.chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
56            stats,
57        }),
58    }
59}
60
61impl<I, T, E> Stream for FormattedContentBatcher<I>
62where
63    I: Stream<Item = Vec<Result<T, E>>>,
64    T: Into<FormattedContent>,
65    E: Into<AccessorError>,
66{
67    type Item = Vec<Result<FormattedContent, AccessorError>>;
68
69    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70        let this = self.project();
71        match this.items.poll_next(cx) {
72            Poll::Ready(Some(chunk)) => {
73                // loop over chunk instead of into_iter/map because we can't move `this`
74                let mut batch = vec![];
75                for item in chunk {
76                    let result = match item {
77                        Ok(i) => Ok(i.into()),
78                        Err(e) => {
79                            this.stats.add_result_error();
80                            Err(e.into())
81                        }
82                    };
83                    batch.push(result);
84                }
85                Poll::Ready(Some(batch))
86            }
87            Poll::Ready(None) => Poll::Ready(None),
88            Poll::Pending => Poll::Pending,
89        }
90    }
91}
92
/// Holds a VMO containing valid serialized data as well as the size of that data.
pub struct SerializedVmo {
    /// The VMO holding the serialized bytes.
    pub vmo: zx::Vmo,
    /// The number of valid bytes in `vmo`.
    pub size: u64,
    // The encoding of the data; selects the `FormattedContent` variant in the `From` impl.
    format: Format,
}
99
100impl SerializedVmo {
101    pub fn serialize(
102        source: &impl Serialize,
103        data_type: DataType,
104        format: Format,
105    ) -> Result<Self, AccessorError> {
106        let initial_buffer_capacity = match data_type {
107            DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
108            // Logs won't go through this codepath anyway, but in case we ever want to serialize a
109            // single log instance it makes sense to start at the page size.
110            DataType::Logs => 4096, // page size
111        };
112        let mut buffer = Vec::with_capacity(initial_buffer_capacity);
113        match format {
114            Format::Json => {
115                serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
116            }
117            Format::Cbor => ciborium::into_writer(source, &mut buffer)
118                .map_err(|err| AccessorError::CborSerialization(err.into()))?,
119            Format::Text => unreachable!("We'll never get Text"),
120            Format::Fxt => unreachable!("We'll never get FXT"),
121        }
122        let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
123        vmo.write(&buffer, 0).unwrap();
124        Ok(Self { vmo, size: buffer.len() as u64, format })
125    }
126}
127
128impl From<SerializedVmo> for FormattedContent {
129    fn from(content: SerializedVmo) -> FormattedContent {
130        match content.format {
131            Format::Json => {
132                // set_content_size() is redundant, but consumers may expect the size there.
133                content
134                    .vmo
135                    .set_content_size(&content.size)
136                    .expect("set_content_size always returns Ok");
137                FormattedContent::Json(fidl_fuchsia_mem::Buffer {
138                    vmo: content.vmo,
139                    size: content.size,
140                })
141            }
142            Format::Cbor => {
143                content
144                    .vmo
145                    .set_content_size(&content.size)
146                    .expect("set_content_size always returns Ok");
147                FormattedContent::Cbor(content.vmo)
148            }
149            Format::Fxt => {
150                content
151                    .vmo
152                    .set_content_size(&content.size)
153                    .expect("set_content_size always returns Ok");
154                FormattedContent::Fxt(content.vmo)
155            }
156            Format::Text => unreachable!("We'll never get Text"),
157        }
158    }
159}
160
/// Strategy trait for `PacketSerializer`: defines a packet's wire framing and how to
/// write individual items into it.
trait PacketFormat {
    /// The `Format` tag stamped on each emitted `SerializedVmo`.
    const FORMAT: Format;
    /// Bytes emitted at the start of every packet (empty by default).
    const HEADER: &[u8] = &[];
    /// Bytes emitted at the end of every packet (empty by default).
    const FOOTER: &[u8] = &[];

    /// Writes an item in the required format.  Returns `Poll::Ready(Some(<separator length>))` if
    /// an item was written, and `Poll::Ready(None)` if there are no more items.  If `first` is true,
    /// this is the first item in this batch.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>>;
}
176
/// A stream of `SerializedVmo` packets, filled by repeatedly invoking a `PacketFormat`
/// until each packet approaches `max_packet_size`.
#[pin_project]
pub struct PacketSerializer<T> {
    // Per-connection stats; `None` disables stat recording.
    stats: Option<Arc<BatchIteratorConnectionStats>>,
    // Upper bound on the size of a single emitted packet, in bytes (further capped at
    // 1 MiB inside `poll_next`).
    max_packet_size: u64,
    #[pin]
    format: T,
    // An item that did not fit in the previous packet, carried over to start the next one.
    overflow: Vec<u8>,
    // Set once the format reports no more items; the stream then yields `None`.
    finished: bool,
}
186
impl<T> PacketSerializer<T> {
    /// Creates a serializer producing packets of at most `max_packet_size` bytes in the
    /// given `format`; per-item stats are recorded when `stats` is `Some`.
    fn with_format(
        stats: Option<Arc<BatchIteratorConnectionStats>>,
        max_packet_size: u64,
        format: T,
    ) -> Self {
        Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
    }
}
196
impl<T: PacketFormat> Stream for PacketSerializer<T> {
    type Item = Result<SerializedVmo, AccessorError>;

    /// Assembles one packet: header, then any overflow from the previous packet, then as
    /// many items as fit, then footer — flushing the scratch buffer into a VMO as it fills.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }

        // Limit packet size to prevent unbounded memory use.
        const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20; // 1 MiB
        let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
        let mut this = self.project();

        let mut buffer = Vec::with_capacity(256 * 1024);
        buffer.extend_from_slice(T::HEADER);

        let mut first = true;

        // An item held over from the previous packet starts this one. `append` drains
        // `overflow`, leaving it empty for future carry-overs.
        if !this.overflow.is_empty() {
            buffer.append(this.overflow);
            first = false;
            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        let mut vmo = None;
        // Number of bytes already flushed from `buffer` into `vmo`.
        let mut vmo_len = 0;

        loop {
            // Copy to the VMO if the room in the buffer drops below a threshold.
            if buffer.capacity() - buffer.len() < 512 {
                vmo.get_or_insert_with(|| zx::Vmo::create(max_packet_size).unwrap())
                    .write(&buffer, vmo_len as u64)
                    .unwrap();
                vmo_len += buffer.len();
                buffer.clear();
            }

            let last_len = buffer.len();

            let separator_len = match this.format.as_mut().write_item(cx, first, &mut buffer) {
                Poll::Ready(Some(separator_len)) => separator_len,
                Poll::Ready(None) => {
                    // Source exhausted: emit what we have, or end the stream if the
                    // packet is still empty.
                    *this.finished = true;
                    if first {
                        return Poll::Ready(None);
                    } else {
                        break;
                    }
                }
                Poll::Pending => {
                    // No item ready right now: flush a partial packet rather than stall,
                    // unless the packet is still empty.
                    if first {
                        return Poll::Pending;
                    } else {
                        break;
                    }
                }
            };

            let item_len = buffer.len() - last_len - separator_len;

            // An item that (with header and footer) would reach or exceed the cap can
            // never be emitted, so drop it outright. NOTE(review): the `>=` here (vs the
            // `>` below) appears deliberate — it guarantees the overflow branch can only
            // be reached after at least one item was accepted, keeping its assert valid.
            if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
                warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
                buffer.truncate(last_len);
            } else {
                if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
                    // Last item put us over the maximum packet size, keep it for the next batch.
                    // We should have at least one item because otherwise we should have gone
                    // through the branch above.
                    assert!(!first);
                    // Carry the item (without its separator) over to the next packet.
                    this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
                    buffer.truncate(last_len);
                    break;
                }

                first = false;

                if let Some(stats) = &this.stats {
                    stats.add_result();
                }
            }
        }

        buffer.extend_from_slice(T::FOOTER);

        let vmo = match vmo {
            Some(vmo) => {
                // The VMO was created at `max_packet_size`; record the actual data size.
                vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
                vmo
            }
            // Everything fit in the scratch buffer; create a VMO of exactly that size.
            None => zx::Vmo::create(buffer.len() as u64).unwrap(),
        };
        vmo.write(&buffer, vmo_len as u64).unwrap();
        vmo_len += buffer.len();
        Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
    }
}
295
/// `PacketFormat` implementation that frames already-encoded FXT log records.
#[pin_project]
pub struct FxtPacketFormat<I> {
    /// Source of FXT-encoded log messages.
    #[pin]
    pub stream: I,
    /// When true, component metadata is announced once per tag via a separate control
    /// record instead of being injected into every log record.
    pub subscribe_to_manifest: bool,
    /// The identity most recently announced for each tag; used to re-announce when a tag
    /// is reused by a different component.
    pub sent_tags: std::collections::HashMap<u64, Arc<crate::identity::ComponentIdentity>>,
}
303
impl<I: Stream<Item = FxtMessage>> PacketFormat for FxtPacketFormat<I> {
    const FORMAT: Format = Format::Fxt;

    /// Appends the next FXT message to `buffer`. When subscribing to the manifest, a
    /// control record carrying the component's moniker and URL is emitted ahead of the
    /// first message for each tag (and again if the tag later maps to a different
    /// identity). FXT records need no separator, so the returned length is always 0.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        _first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>> {
        let this = self.project();
        if let Some(item) = ready!(this.stream.poll_next(cx)) {
            if *this.subscribe_to_manifest {
                let tag = item.tag();
                let identity = item.component_identity();
                // Announce on first sight of a tag, or when the tag now maps to a
                // different identity than previously announced (pointer check first as a
                // fast path, then a value comparison).
                let send_manifest = match this.sent_tags.entry(tag) {
                    std::collections::hash_map::Entry::Vacant(e) => {
                        e.insert(Arc::clone(identity));
                        true
                    }
                    std::collections::hash_map::Entry::Occupied(mut e) => {
                        if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
                            e.insert(Arc::clone(identity));
                            true
                        } else {
                            false
                        }
                    }
                };

                if send_manifest {
                    use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
                    use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
                    use fidl_fuchsia_diagnostics_types::Severity;
                    use std::io::Cursor;
                    use zerocopy::{FromBytes, IntoBytes};

                    // Encode a synthetic record carrying the moniker and URL.
                    let mut encoder = Encoder::new(
                        Cursor::new(ResizableBuffer::from(Vec::new())),
                        EncoderOpts::default(),
                    );
                    let record = Record {
                        timestamp: zx::BootInstant::from_nanos(0),
                        severity: Severity::Info.into_primitive(),
                        arguments: vec![
                            Argument::other("moniker", identity.moniker.to_string()),
                            Argument::other("url", identity.url.as_str()),
                        ],
                    };
                    encoder.write_record(record).unwrap();
                    let mut manifest_buffer = encoder.take().into_inner().into_inner();
                    // Mark the record as a control record for this tag by setting
                    // LOG_CONTROL_BIT in the header's tag field.
                    if manifest_buffer.len() >= 8 {
                        let mut header = Header::read_from_bytes(&manifest_buffer[0..8]).unwrap();
                        header.set_tag((tag as u32) | LOG_CONTROL_BIT);
                        manifest_buffer[0..8].copy_from_slice(header.as_bytes());
                    }
                    buffer.extend_from_slice(&manifest_buffer);
                }
            }

            buffer.extend_from_slice(item.data());

            // if we are subscribing to the manifest, don't inject metadata into every record
            extend_fxt_record(
                item.component_identity(),
                item.dropped(),
                &ExtendRecordOpts {
                    component_url: !*this.subscribe_to_manifest,
                    moniker: !*this.subscribe_to_manifest,
                    rolled_out: !*this.subscribe_to_manifest,
                    subscribe_to_manifest: false,
                },
                buffer,
            );
            Poll::Ready(Some(0))
        } else {
            Poll::Ready(None)
        }
    }
}
383
/// A `PacketSerializer` that emits packets of FXT-encoded log records.
pub type FxtPacketSerializer<I> = PacketSerializer<FxtPacketFormat<I>>;

impl<I> FxtPacketSerializer<I> {
    /// Creates a serializer over `items` with the given maximum packet size.
    /// `subscribe_to_manifest` selects per-tag manifest control records instead of
    /// injecting metadata into every record (see `FxtPacketFormat`).
    pub fn new(
        stats: Arc<BatchIteratorConnectionStats>,
        max_packet_size: u64,
        items: I,
        subscribe_to_manifest: bool,
    ) -> Self {
        Self::with_format(
            Some(stats),
            max_packet_size,
            FxtPacketFormat {
                stream: items,
                subscribe_to_manifest,
                sent_tags: std::collections::HashMap::new(),
            },
        )
    }
}
404
/// `PacketFormat` implementation that serializes a stream of `Serialize` items as a JSON
/// array: `[` header, `,\n` separators, `]` footer.
#[pin_project]
pub struct JsonPacketFormat<I>(#[pin] I);
407
408impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
409    const FORMAT: Format = Format::Json;
410    const HEADER: &[u8] = b"[";
411    const FOOTER: &[u8] = b"]";
412
413    fn write_item(
414        self: Pin<&mut Self>,
415        cx: &mut Context<'_>,
416        first: bool,
417        buffer: &mut Vec<u8>,
418    ) -> Poll<Option<usize>> {
419        const SEPARATOR: &[u8] = b",\n";
420
421        if let Some(item) = ready!(self.project().0.poll_next(cx)) {
422            let separator_len = if !first {
423                buffer.extend_from_slice(SEPARATOR);
424                SEPARATOR.len()
425            } else {
426                0
427            };
428            // We don't expect serialization to fail because we should always be able to write to
429            // `buffer` and `item` is a type we control which we know should always be serializable.
430            serde_json::to_writer(buffer, &item).expect("failed to serialize item");
431            Poll::Ready(Some(separator_len))
432        } else {
433            Poll::Ready(None)
434        }
435    }
436}
437
/// A `PacketSerializer` that emits JSON-array packets.
pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;

impl<I> JsonPacketSerializer<I> {
    /// Creates a serializer that records per-item stats on `stats`.
    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
        Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
    }

    /// Creates a serializer that does not record stats.
    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
        Self::with_format(None, max_packet_size, JsonPacketFormat(items))
    }
}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use futures::stream::iter;

    /// Runs `inputs` through a `JsonPacketSerializer` with the given `max` packet size and
    /// returns the UTF-8 contents of each emitted packet's VMO.
    ///
    /// Factors out the packet-collection boilerplate previously duplicated in every test.
    async fn json_packets(max: u64, inputs: &'static [&'static str]) -> Vec<String> {
        let node = fuchsia_inspect::Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
        JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .map(|r| {
                let result = r.unwrap();
                let mut buf = vec![0; result.size as usize];
                result.vmo.read(&mut buf, 0).expect("reading vmo");
                std::str::from_utf8(&buf).unwrap().to_string()
            })
            .collect()
    }

    #[fuchsia::test]
    async fn two_items_joined_and_split() {
        let inputs: &'static [&'static str] = &["FFFFFFFFFF", "GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len() as u64;

        // When the fully joined form fits exactly, both items share one packet.
        let actual_joined = json_packets(smallest_possible_joined_len, inputs).await;
        assert_eq!(&actual_joined[..], joined);

        // One byte less forces the items into separate packets.
        let actual_split = json_packets(smallest_possible_joined_len - 1, inputs).await;
        assert_eq!(&actual_split[..], split);
    }

    #[fuchsia::test]
    async fn overflow_separator_added() {
        let inputs: &'static [&'static str] = &["A", "B", "C"];
        // `["A"]` is 5 bytes; `["A",\n"B"]` is 10 bytes.
        // With max = 8, each join would exceed the limit, so "B" then "C" are carried
        // over as overflow and every item lands in its own packet.
        let packets = json_packets(8, inputs).await;
        assert_eq!(packets.len(), 3);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["B"]"#);
        assert_eq!(packets[2], r#"["C"]"#);

        // With max = 10, "A" and "B" fit together exactly (separator included) and "C"
        // overflows into a second packet.
        let packets = json_packets(10, inputs).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], "[\"A\",\n\"B\"]");
        assert_eq!(packets[1], r#"["C"]"#);
    }

    #[fuchsia::test]
    async fn oversize_item_not_dropped_incorrectly() {
        let inputs: &'static [&'static str] = &["A", "BCDEF"];
        // `"BCDEF"` serializes to 7 bytes.
        // Joined: "[" + `"A"` + ",\n" + `"BCDEF"` + "]" = 1 + 3 + 2 + 7 + 1 = 14 bytes.
        // With max = 11: `["A"]` (5 bytes) fits; "BCDEF" overflows the first packet but
        // is NOT oversize on its own — `["BCDEF"]` is 9 bytes, which fits in the next
        // packet.
        let packets = json_packets(11, inputs).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["BCDEF"]"#);
    }

    #[fuchsia::test]
    async fn item_too_big_for_packet_is_dropped() {
        let inputs: &'static [&'static str] = &["ABCDE"]; // serializes to 7 bytes
        // "[" + 7 + "]" = 9 bytes >= max of 8, so the item can never be emitted and is
        // dropped; no packet is produced at all.
        let packets = json_packets(8, inputs).await;
        assert_eq!(packets.len(), 0);
    }

    /// Encodes a single FXT log record carrying one `msg` argument, then stamps `tag`
    /// into the record header, returning the raw record bytes.
    ///
    /// Factors out the record-encoding boilerplate previously repeated for each message.
    fn encode_tagged_record(timestamp_nanos: i64, msg: &str, tag: u32) -> Vec<u8> {
        use diagnostics_log_encoding::encode::{Encoder, EncoderOpts};
        use diagnostics_log_encoding::{Argument, Header, Record};
        use fidl_fuchsia_diagnostics_types::Severity;
        use std::io::Cursor;
        use zerocopy::{FromBytes, IntoBytes};

        let mut buffer = Cursor::new(vec![0u8; 128]);
        let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
        encoder
            .write_record(Record {
                timestamp: zx::BootInstant::from_nanos(timestamp_nanos),
                severity: Severity::Info.into_primitive(),
                arguments: vec![Argument::other("msg", msg)],
            })
            .unwrap();
        let end = encoder.inner().position() as usize;
        let mut bytes = encoder.inner().get_ref()[..end].to_vec();
        let mut header = Header::read_from_bytes(&bytes[0..8]).unwrap();
        header.set_tag(tag);
        bytes[0..8].copy_from_slice(header.as_bytes());
        bytes
    }

    #[fuchsia::test]
    async fn fxt_packet_serializer_subscribe_to_manifest() {
        use crate::identity::ComponentIdentity;
        use diagnostics_log_encoding::{Header, LOG_CONTROL_BIT};
        use zerocopy::FromBytes;

        let identity1 = Arc::new(ComponentIdentity::unknown());
        let mut identity2_inner = ComponentIdentity::unknown();
        identity2_inner.url =
            flyweights::FlyStr::new("fuchsia-pkg://fuchsia.com/test#meta/test.cm");
        let identity2 = Arc::new(identity2_inner);

        let msg1 = FxtMessage::new_test(
            encode_tagged_record(100, "hello", 1).into_boxed_slice(),
            0,
            Arc::clone(&identity1),
            1,
        );
        let msg2 = FxtMessage::new_test(
            encode_tagged_record(200, "world", 2).into_boxed_slice(),
            0,
            Arc::clone(&identity2),
            2,
        );
        // Same tag and identity as msg2, so no additional manifest record is expected.
        let msg3 = FxtMessage::new_test(
            encode_tagged_record(300, "again", 2).into_boxed_slice(),
            0,
            Arc::clone(&identity2),
            2,
        );

        let node = fuchsia_inspect::Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());

        // Test with subscribe_to_manifest = true.
        let packets: Vec<_> = FxtPacketSerializer::new(
            Arc::clone(&test_stats),
            1024 * 1024,
            iter(vec![msg1, msg2, msg3]),
            true,
        )
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .map(|r| {
            let result = r.unwrap();
            let mut buf = vec![0; result.size as usize];
            result.vmo.read(&mut buf, 0).expect("reading vmo");
            buf
        })
        .collect();

        assert_eq!(packets.len(), 1);
        let output = &packets[0];

        // Parse every record out of the packet along with its raw header.
        let mut current_slice = output.as_slice();
        let mut records = vec![];
        while !current_slice.is_empty() {
            let (record, remaining) =
                diagnostics_log_encoding::parse::parse_record(current_slice).unwrap();
            let header = Header::read_from_bytes(&current_slice[..8]).unwrap();
            records.push((header, record));
            current_slice = remaining;
        }

        assert_eq!(records.len(), 5);

        // Record 0: manifest control record announcing tag 1.
        assert_ne!(records[0].0.tag() & LOG_CONTROL_BIT, 0);
        assert_eq!(records[0].0.tag() & !LOG_CONTROL_BIT, 1);

        // Record 1: msg1 itself; no moniker/url injected when subscribing to the manifest.
        assert_eq!(records[1].0.tag(), 1);
        assert_eq!(records[1].1.arguments.len(), 1);

        // Record 2: manifest control record announcing tag 2.
        assert_ne!(records[2].0.tag() & LOG_CONTROL_BIT, 0);
        assert_eq!(records[2].0.tag() & !LOG_CONTROL_BIT, 2);

        // Records 3 and 4: msg2 and msg3; tag 2 is announced only once.
        assert_eq!(records[3].0.tag(), 2);
        assert_eq!(records[3].1.arguments.len(), 1);
        assert_eq!(records[4].0.tag(), 2);
        assert_eq!(records[4].1.arguments.len(), 1);
    }
}