Skip to main content

archivist_lib/
formatter.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::diagnostics::BatchIteratorConnectionStats;
5use crate::error::AccessorError;
6use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
7use crate::logs::shared_buffer::FxtMessage;
8use fidl_fuchsia_diagnostics::{
9    DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
10};
11
12use futures::{Stream, StreamExt};
13use log::warn;
14use pin_project::pin_project;
15use serde::Serialize;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll, ready};
19use zx;
20
/// Name given to the VMOs created by `SerializedVmo::serialize`.
static SERIALIZED_DATA_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-serialized-data");
/// Name given to the VMOs created by `PacketSerializer` while it assembles a packet.
static PACKET_BUFFER_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-packet-buffer");
23
/// A boxed, pinned, `Send` stream whose items are batches of formatted results. This is the
/// type returned by `new_batcher`.
pub type FormattedStream =
    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;
26
/// Stream adapter that turns every chunk yielded by `items` into a batch of
/// `Result<FormattedContent, AccessorError>`, recording each error it encounters in `stats`.
#[pin_project]
pub struct FormattedContentBatcher<C> {
    /// The underlying stream; its `Stream` impl expects each item to be a chunk of results.
    #[pin]
    items: C,
    /// Connection stats on which `add_result_error` is called for every `Err` in a chunk.
    stats: Arc<BatchIteratorConnectionStats>,
}
33
34/// Make a new `FormattedContentBatcher` with a chunking strategy depending on stream mode.
35///
36/// In snapshot mode, batched items will not be flushed to the client until the batch is complete
37/// or the underlying stream has terminated.
38///
39/// In subscribe or snapshot-then-subscribe mode, batched items will be flushed whenever the
40/// underlying stream is pending, ensuring clients always receive latest results.
41pub fn new_batcher<I, T, E>(
42    items: I,
43    stats: Arc<BatchIteratorConnectionStats>,
44    mode: StreamMode,
45) -> FormattedStream
46where
47    I: Stream<Item = Result<T, E>> + Send + 'static,
48    T: Into<FormattedContent> + Send,
49    E: Into<AccessorError> + Send,
50{
51    match mode {
52        StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
53            Box::pin(FormattedContentBatcher {
54                items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
55                stats,
56            })
57        }
58        StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
59            items: items.chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
60            stats,
61        }),
62    }
63}
64
65impl<I, T, E> Stream for FormattedContentBatcher<I>
66where
67    I: Stream<Item = Vec<Result<T, E>>>,
68    T: Into<FormattedContent>,
69    E: Into<AccessorError>,
70{
71    type Item = Vec<Result<FormattedContent, AccessorError>>;
72
73    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74        let this = self.project();
75        match this.items.poll_next(cx) {
76            Poll::Ready(Some(chunk)) => {
77                // loop over chunk instead of into_iter/map because we can't move `this`
78                let mut batch = vec![];
79                for item in chunk {
80                    let result = match item {
81                        Ok(i) => Ok(i.into()),
82                        Err(e) => {
83                            this.stats.add_result_error();
84                            Err(e.into())
85                        }
86                    };
87                    batch.push(result);
88                }
89                Poll::Ready(Some(batch))
90            }
91            Poll::Ready(None) => Poll::Ready(None),
92            Poll::Pending => Poll::Pending,
93        }
94    }
95}
96
97/// Holds a VMO containing valid serialized data as well as the size of that data.
98pub struct SerializedVmo {
99    pub vmo: zx::Vmo,
100    pub size: u64,
101    format: Format,
102}
103
104impl SerializedVmo {
105    pub fn serialize(
106        source: &impl Serialize,
107        data_type: DataType,
108        format: Format,
109    ) -> Result<Self, AccessorError> {
110        let initial_buffer_capacity = match data_type {
111            DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
112            // Logs won't go through this codepath anyway, but in case we ever want to serialize a
113            // single log instance it makes sense to start at the page size.
114            DataType::Logs => 4096, // page size
115        };
116        let mut buffer = Vec::with_capacity(initial_buffer_capacity);
117        match format {
118            Format::Json => {
119                serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
120            }
121            Format::Cbor => ciborium::into_writer(source, &mut buffer)
122                .map_err(|err| AccessorError::CborSerialization(err.into()))?,
123            Format::Text => unreachable!("We'll never get Text"),
124            Format::Fxt => unreachable!("We'll never get FXT"),
125        }
126        let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
127        let _ = vmo.set_name(&SERIALIZED_DATA_VMO_NAME);
128        vmo.write(&buffer, 0).unwrap();
129        Ok(Self { vmo, size: buffer.len() as u64, format })
130    }
131}
132
133impl From<SerializedVmo> for FormattedContent {
134    fn from(content: SerializedVmo) -> FormattedContent {
135        match content.format {
136            Format::Json => {
137                // set_content_size() is redundant, but consumers may expect the size there.
138                content
139                    .vmo
140                    .set_content_size(&content.size)
141                    .expect("set_content_size always returns Ok");
142                FormattedContent::Json(fidl_fuchsia_mem::Buffer {
143                    vmo: content.vmo,
144                    size: content.size,
145                })
146            }
147            Format::Cbor => {
148                content
149                    .vmo
150                    .set_content_size(&content.size)
151                    .expect("set_content_size always returns Ok");
152                FormattedContent::Cbor(content.vmo)
153            }
154            Format::Fxt => {
155                content
156                    .vmo
157                    .set_content_size(&content.size)
158                    .expect("set_content_size always returns Ok");
159                FormattedContent::Fxt(content.vmo)
160            }
161            Format::Text => unreachable!("We'll never get Text"),
162        }
163    }
164}
165
/// Describes how a stream of items is written into the packets built by `PacketSerializer`.
trait PacketFormat {
    /// Format recorded in the `SerializedVmo` produced for each packet.
    const FORMAT: Format;
    /// Bytes written at the start of every packet; empty unless an implementation overrides it.
    const HEADER: &[u8] = &[];
    /// Bytes written at the end of every packet; empty unless an implementation overrides it.
    const FOOTER: &[u8] = &[];

    /// Writes an item in the required format.  Returns `Poll::Ready(Some(<separator length>))` if
    /// an item was written, and `Poll::Ready(None)` if there are no more items.  If `first` is true,
    /// this is the first item in this batch.
    ///
    /// The separator length is the number of bytes at the start of what was appended to
    /// `buffer` that separate this item from the previous one; `PacketSerializer` excludes them
    /// when it measures the item or carries it over to the next packet.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>>;
}
181
182#[pin_project]
183pub struct PacketSerializer<T> {
184    stats: Option<Arc<BatchIteratorConnectionStats>>,
185    max_packet_size: u64,
186    #[pin]
187    format: T,
188    overflow: Vec<u8>,
189    finished: bool,
190}
191
192impl<T> PacketSerializer<T> {
193    fn with_format(
194        stats: Option<Arc<BatchIteratorConnectionStats>>,
195        max_packet_size: u64,
196        format: T,
197    ) -> Self {
198        Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
199    }
200}
201
/// Each item yielded by this stream is one packet: `T::HEADER`, followed by as many items as
/// fit within the packet size limit, followed by `T::FOOTER`, all stored in a VMO.
impl<T: PacketFormat> Stream for PacketSerializer<T> {
    type Item = Result<SerializedVmo, AccessorError>;

    /// Builds the next packet.
    ///
    /// Returns `Poll::Pending` only when no item has been added to the packet yet; if the
    /// format becomes pending after at least one item was added, the packet built so far is
    /// returned instead. Returns `Poll::Ready(None)` once the format has no more items and
    /// nothing is left to emit.
    ///
    /// # Panics
    ///
    /// Panics if creating, writing or resizing a VMO fails.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }

        // Limit packet size to prevent unbounded memory use.
        const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20; // 1 MiB
        let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
        let mut this = self.project();

        let mut buffer = Vec::with_capacity(256 * 1024);
        buffer.extend_from_slice(T::HEADER);

        // True until the first item has been accepted into this packet.
        let mut first = true;

        // An item held back from the previous packet starts this one. It is counted in the
        // stats here because the overflow path below breaks out before counting it.
        if !this.overflow.is_empty() {
            buffer.append(this.overflow);
            first = false;
            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        // The VMO is created lazily, the first time `buffer` has to be flushed; `vmo_len` is
        // the number of bytes already copied into it.
        let mut vmo = None;
        let mut vmo_len = 0;

        loop {
            // Copy to the VMO if the room in the buffer drops below a threshold.
            if buffer.capacity() - buffer.len() < 512 {
                vmo.get_or_insert_with(|| {
                    let v = zx::Vmo::create(max_packet_size).unwrap();
                    let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
                    v
                })
                .write(&buffer, vmo_len as u64)
                .unwrap();
                vmo_len += buffer.len();
                buffer.clear();
            }

            // Remember where this item starts so that it can be removed again if it is dropped
            // or deferred to the next packet.
            let last_len = buffer.len();

            let separator_len = match this.format.as_mut().write_item(cx, first, &mut buffer) {
                Poll::Ready(Some(separator_len)) => separator_len,
                Poll::Ready(None) => {
                    // No more items: end the stream now if this packet is empty, otherwise
                    // emit the packet and end the stream on the next poll.
                    *this.finished = true;
                    if first {
                        return Poll::Ready(None);
                    } else {
                        break;
                    }
                }
                Poll::Pending => {
                    // Only wait if there is nothing to send yet; otherwise send what we have.
                    if first {
                        return Poll::Pending;
                    } else {
                        break;
                    }
                }
            };

            // Size of the item itself, excluding the separator written in front of it.
            let item_len = buffer.len() - last_len - separator_len;

            // NOTE(review): `>=` also drops an item that, with header and footer, would fill a
            // packet exactly; confirm whether that is intended or whether `>` was meant.
            if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
                warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
                buffer.truncate(last_len);
            } else {
                if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
                    // Last item put us over the maximum packet size, keep it for the next batch.
                    // We should have at least one item because otherwise we should have gone
                    // through the branch above.
                    assert!(!first);
                    this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
                    buffer.truncate(last_len);
                    break;
                }

                first = false;

                if let Some(stats) = &this.stats {
                    stats.add_result();
                }
            }
        }

        buffer.extend_from_slice(T::FOOTER);

        let vmo = match vmo {
            Some(vmo) => {
                // The VMO was created with the maximum packet size; set its stream size to the
                // number of bytes the packet actually contains.
                vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
                vmo
            }
            None => {
                // Nothing was flushed, so the whole packet is in `buffer`; create a VMO of
                // exactly that size.
                let v = zx::Vmo::create(buffer.len() as u64).unwrap();
                let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
                v
            }
        };
        // Write whatever is still buffered after the bytes already flushed to the VMO.
        vmo.write(&buffer, vmo_len as u64).unwrap();
        vmo_len += buffer.len();
        Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
    }
}
308
309#[pin_project]
310pub struct FxtPacketFormat<I> {
311    #[pin]
312    pub stream: I,
313    pub subscribe_to_manifest: bool,
314    pub sent_tags: std::collections::HashMap<u64, Arc<crate::identity::ComponentIdentity>>,
315}
316
317impl<I: Stream<Item = FxtMessage>> PacketFormat for FxtPacketFormat<I> {
318    const FORMAT: Format = Format::Fxt;
319
320    fn write_item(
321        self: Pin<&mut Self>,
322        cx: &mut Context<'_>,
323        _first: bool,
324        buffer: &mut Vec<u8>,
325    ) -> Poll<Option<usize>> {
326        let this = self.project();
327        if let Some(item) = ready!(this.stream.poll_next(cx)) {
328            if *this.subscribe_to_manifest {
329                let tag = item.tag();
330                let identity = item.component_identity();
331                let send_manifest = match this.sent_tags.entry(tag) {
332                    std::collections::hash_map::Entry::Vacant(e) => {
333                        e.insert(Arc::clone(identity));
334                        true
335                    }
336                    std::collections::hash_map::Entry::Occupied(mut e) => {
337                        if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
338                            e.insert(Arc::clone(identity));
339                            true
340                        } else {
341                            false
342                        }
343                    }
344                };
345
346                if send_manifest {
347                    use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
348                    use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
349                    use fidl_fuchsia_diagnostics_types::Severity;
350                    use std::io::Cursor;
351                    use zerocopy::{FromBytes, IntoBytes};
352
353                    let mut encoder = Encoder::new(
354                        Cursor::new(ResizableBuffer::from(Vec::new())),
355                        EncoderOpts::default(),
356                    );
357                    let record = Record {
358                        timestamp: zx::BootInstant::from_nanos(0),
359                        severity: Severity::Info.into_primitive(),
360                        arguments: vec![
361                            Argument::other("moniker", identity.moniker.to_string()),
362                            Argument::other("url", identity.url.as_str()),
363                        ],
364                    };
365                    encoder.write_record(record).unwrap();
366                    let mut manifest_buffer = encoder.take().into_inner().into_inner();
367                    if manifest_buffer.len() >= 8 {
368                        let mut header = Header::read_from_bytes(&manifest_buffer[0..8]).unwrap();
369                        header.set_tag((tag as u32) | LOG_CONTROL_BIT);
370                        manifest_buffer[0..8].copy_from_slice(header.as_bytes());
371                    }
372                    buffer.extend_from_slice(&manifest_buffer);
373                }
374            }
375
376            buffer.extend_from_slice(item.data());
377
378            // if we are subscribing to the manifest, don't inject metadata into every record
379            extend_fxt_record(
380                item.component_identity(),
381                item.dropped(),
382                &ExtendRecordOpts {
383                    component_url: !*this.subscribe_to_manifest,
384                    moniker: !*this.subscribe_to_manifest,
385                    rolled_out: !*this.subscribe_to_manifest,
386                    subscribe_to_manifest: false,
387                },
388                buffer,
389            );
390            Poll::Ready(Some(0))
391        } else {
392            Poll::Ready(None)
393        }
394    }
395}
396
397pub type FxtPacketSerializer<I> = PacketSerializer<FxtPacketFormat<I>>;
398
399impl<I> FxtPacketSerializer<I> {
400    pub fn new(
401        stats: Arc<BatchIteratorConnectionStats>,
402        max_packet_size: u64,
403        items: I,
404        subscribe_to_manifest: bool,
405    ) -> Self {
406        Self::with_format(
407            Some(stats),
408            max_packet_size,
409            FxtPacketFormat {
410                stream: items,
411                subscribe_to_manifest,
412                sent_tags: std::collections::HashMap::new(),
413            },
414        )
415    }
416}
417
418#[pin_project]
419pub struct JsonPacketFormat<I>(#[pin] I);
420
421impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
422    const FORMAT: Format = Format::Json;
423    const HEADER: &[u8] = b"[";
424    const FOOTER: &[u8] = b"]";
425
426    fn write_item(
427        self: Pin<&mut Self>,
428        cx: &mut Context<'_>,
429        first: bool,
430        buffer: &mut Vec<u8>,
431    ) -> Poll<Option<usize>> {
432        const SEPARATOR: &[u8] = b",\n";
433
434        if let Some(item) = ready!(self.project().0.poll_next(cx)) {
435            let separator_len = if !first {
436                buffer.extend_from_slice(SEPARATOR);
437                SEPARATOR.len()
438            } else {
439                0
440            };
441            // We don't expect serialization to fail because we should always be able to write to
442            // `buffer` and `item` is a type we control which we know should always be serializable.
443            serde_json::to_writer(buffer, &item).expect("failed to serialize item");
444            Poll::Ready(Some(separator_len))
445        } else {
446            Poll::Ready(None)
447        }
448    }
449}
450
451pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;
452
453impl<I> JsonPacketSerializer<I> {
454    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
455        Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
456    }
457
458    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
459        Self::with_format(None, max_packet_size, JsonPacketFormat(items))
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use futures::stream::iter;

    /// Reads back the `size` bytes of serialized data stored in `packet`.
    fn packet_bytes(packet: SerializedVmo) -> Vec<u8> {
        let mut buf = vec![0; packet.size as usize];
        packet.vmo.read(&mut buf, 0).expect("reading vmo");
        buf
    }

    /// Runs `inputs` through a `JsonPacketSerializer` limited to `max` bytes per packet and
    /// returns the contents of every packet it produced, in order.
    async fn json_packets(inputs: &[&&str], max: u64) -> Vec<String> {
        let node = fuchsia_inspect::Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
        JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .map(|r| {
                let bytes = packet_bytes(r.unwrap());
                std::str::from_utf8(&bytes).unwrap().to_string()
            })
            .collect::<Vec<_>>()
    }

    #[fuchsia::test]
    async fn two_items_joined_and_split() {
        let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len() as u64;

        // With exactly enough room, both items share one packet.
        let actual_joined = json_packets(inputs, smallest_possible_joined_len).await;
        assert_eq!(&actual_joined[..], joined);

        // One byte less and the second item has to go into a packet of its own.
        let actual_split = json_packets(inputs, smallest_possible_joined_len - 1).await;
        assert_eq!(&actual_split[..], split);
    }

    #[fuchsia::test]
    async fn overflow_separator_added() {
        let inputs = &[&"A", &"B", &"C"];
        // Each serialized item ("A", "B", "C" with quotes) is 3 bytes.
        // "[" + "A" + "]" = 1 + 3 + 1 = 5 bytes.
        // "[" + "A" + ",\n" + "B" + "]" = 1 + 3 + 2 + 3 + 1 = 10 bytes.
        // If max is 8, "B" overflows and starts the second packet.
        // Likewise "C" overflows the second packet and starts the third.
        let packets = json_packets(inputs, 8).await;
        assert_eq!(packets.len(), 3);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["B"]"#);
        assert_eq!(packets[2], r#"["C"]"#);

        // If max is 10, "A" and "B" fit together exactly and "C" overflows.
        let packets = json_packets(inputs, 10).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], "[\"A\",\n\"B\"]");
        assert_eq!(packets[1], r#"["C"]"#);
    }

    #[fuchsia::test]
    async fn oversize_item_not_dropped_incorrectly() {
        let inputs = &[&"A", &"BCDEF"];
        // Packet 1: ["A"] (5 bytes).
        // Item 2: "BCDEF" (7 bytes including quotes).
        // "[" + "A" + ",\n" + "BCDEF" + "]" = 1 + 3 + 2 + 7 + 1 = 14 bytes.
        // If max is 11, "A" fits and "BCDEF" overflows into the next packet:
        // "[" + "BCDEF" + "]" = 9 bytes, which fits in 11, so it must not be dropped.
        let packets = json_packets(inputs, 11).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["BCDEF"]"#);
    }

    #[fuchsia::test]
    async fn item_too_big_for_packet_is_dropped() {
        let inputs = &[&"ABCDE"]; // 7 bytes including quotes
        // "[" + 7 + "]" = 9 bytes.
        // If max is 8, it should be dropped.
        let packets = json_packets(inputs, 8).await;
        // Item should be dropped, so we get no packets.
        assert_eq!(packets.len(), 0);
    }

    #[fuchsia::test]
    async fn fxt_packet_serializer_subscribe_to_manifest() {
        use crate::identity::ComponentIdentity;
        use diagnostics_log_encoding::encode::{Encoder, EncoderOpts};
        use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
        use fidl_fuchsia_diagnostics_types::Severity;
        use std::io::Cursor;
        use zerocopy::{FromBytes, IntoBytes};

        let identity1 = Arc::new(ComponentIdentity::unknown());
        let mut identity2_inner = ComponentIdentity::unknown();
        identity2_inner.url =
            flyweights::FlyStr::new("fuchsia-pkg://fuchsia.com/test#meta/test.cm");
        let identity2 = Arc::new(identity2_inner);

        // Build three messages: (timestamp, text, tag written to the record header, tag given
        // to the message, identity). The third message reuses the tag and identity of the
        // second one.
        let mut inputs = vec![];
        for (nanos, text, header_tag, message_tag, identity) in [
            (100, "hello", 1, 1, &identity1),
            (200, "world", 2, 2, &identity2),
            (300, "again", 2, 2, &identity2),
        ] {
            let mut buffer = Cursor::new(vec![0u8; 128]);
            let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
            encoder
                .write_record(Record {
                    timestamp: zx::BootInstant::from_nanos(nanos),
                    severity: Severity::Info.into_primitive(),
                    arguments: vec![Argument::other("msg", text)],
                })
                .unwrap();
            let end = encoder.inner().position() as usize;
            let mut bytes = encoder.inner().get_ref()[..end].to_vec();
            let mut header = Header::read_from_bytes(&bytes[0..8]).unwrap();
            header.set_tag(header_tag);
            bytes[0..8].copy_from_slice(header.as_bytes());
            inputs.push(FxtMessage::new_test(
                bytes.into_boxed_slice(),
                0,
                Arc::clone(identity),
                message_tag,
            ));
        }

        let node = fuchsia_inspect::Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());

        // Test with subscribe_to_manifest = true
        let packets: Vec<_> =
            FxtPacketSerializer::new(Arc::clone(&test_stats), 1024 * 1024, iter(inputs), true)
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| packet_bytes(r.unwrap()))
                .collect();

        assert_eq!(packets.len(), 1);
        let output = &packets[0];

        // Split the packet back into its records, keeping each record's header alongside it.
        let mut current_slice = output.as_slice();
        let mut records = vec![];

        while !current_slice.is_empty() {
            let (record, remaining) =
                diagnostics_log_encoding::parse::parse_record(current_slice).unwrap();
            let header =
                diagnostics_log_encoding::Header::read_from_bytes(&current_slice[..8]).unwrap();
            records.push((header, record));
            current_slice = remaining;
        }

        assert_eq!(records.len(), 5);

        // Record 0: Manifest for tag 1
        assert_ne!(records[0].0.tag() & LOG_CONTROL_BIT, 0);
        assert_eq!(records[0].0.tag() & !LOG_CONTROL_BIT, 1);

        // Record 1: Msg 1
        assert_eq!(records[1].0.tag(), 1);
        assert_eq!(records[1].1.arguments.len(), 1); // no moniker/url injected

        // Record 2: Manifest for tag 2
        assert_ne!(records[2].0.tag() & LOG_CONTROL_BIT, 0);
        assert_eq!(records[2].0.tag() & !LOG_CONTROL_BIT, 2);

        // Record 3: Msg 2
        assert_eq!(records[3].0.tag(), 2);
        assert_eq!(records[3].1.arguments.len(), 1); // no moniker/url injected

        // Record 4: Msg 3 (no second manifest, since tag 2 was already announced)
        assert_eq!(records[4].0.tag(), 2);
        assert_eq!(records[4].1.arguments.len(), 1); // no moniker/url injected
    }
}