Skip to main content

archivist_lib/
formatter.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::diagnostics::BatchIteratorConnectionStats;
5use crate::error::AccessorError;
6use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
7use crate::logs::shared_buffer::FilterCursor;
8use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
9use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
10use fidl_fuchsia_diagnostics_types::Severity;
11
12use fidl_fuchsia_diagnostics::{
13    DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
14};
15
16use fuchsia_async as fasync;
17use futures::{Stream, StreamExt};
18use log::warn;
19use pin_project::pin_project;
20use serde::Serialize;
21use std::io::Cursor;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{Context, Poll, ready};
25use zerocopy::{FromBytes, IntoBytes};
26use zx;
27
/// VMO name applied to buffers produced by `SerializedVmo::serialize`.
static SERIALIZED_DATA_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-serialized-data");
/// VMO name applied to packet buffers produced by `PacketSerializer`.
static PACKET_BUFFER_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-packet-buffer");

/// How long a snapshot-mode batch may wait after its first item before it is flushed.
const SNAPSHOT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15);

/// Boxed, sendable stream of batches of formatted results, as returned by `new_batcher`.
pub type FormattedStream =
    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;
35
/// Stream adapter that turns chunks of `Result<T, E>` items into batches of
/// `Result<FormattedContent, AccessorError>`, counting every error in `stats`.
#[pin_project]
pub struct FormattedContentBatcher<C> {
    /// Underlying stream; each of its items is an already-assembled chunk.
    #[pin]
    items: C,
    /// Connection statistics; `add_result_error` is invoked once per `Err` item.
    stats: Arc<BatchIteratorConnectionStats>,
}
42
43/// Make a new `FormattedContentBatcher` with a chunking strategy depending on stream mode.
44///
45/// In snapshot mode, batched items will not be flushed to the client until the batch is complete,
46/// the underlying stream has terminated, or `SNAPSHOT_TIMEOUT` has passed since the first
47/// item was discovered (in order to prevent client starvation).
48///
49/// In subscribe or snapshot-then-subscribe mode, batched items will be flushed whenever the
50/// underlying stream is pending, ensuring clients always receive latest results.
51pub fn new_batcher<I, T, E>(
52    items: I,
53    stats: Arc<BatchIteratorConnectionStats>,
54    mode: StreamMode,
55) -> FormattedStream
56where
57    I: Stream<Item = Result<T, E>> + Send + 'static,
58    T: Into<FormattedContent> + Send,
59    E: Into<AccessorError> + Send,
60{
61    match mode {
62        StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
63            Box::pin(FormattedContentBatcher {
64                items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
65                stats,
66            })
67        }
68        StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
69            items: items.time_limited_chunks(MAXIMUM_ENTRIES_PER_BATCH as usize, SNAPSHOT_TIMEOUT),
70            stats,
71        }),
72    }
73}
74
75impl<I, T, E> Stream for FormattedContentBatcher<I>
76where
77    I: Stream<Item = Vec<Result<T, E>>>,
78    T: Into<FormattedContent>,
79    E: Into<AccessorError>,
80{
81    type Item = Vec<Result<FormattedContent, AccessorError>>;
82
83    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
84        let this = self.project();
85        match this.items.poll_next(cx) {
86            Poll::Ready(Some(chunk)) => {
87                // loop over chunk instead of into_iter/map because we can't move `this`
88                let mut batch = vec![];
89                for item in chunk {
90                    let result = match item {
91                        Ok(i) => Ok(i.into()),
92                        Err(e) => {
93                            this.stats.add_result_error();
94                            Err(e.into())
95                        }
96                    };
97                    batch.push(result);
98                }
99                Poll::Ready(Some(batch))
100            }
101            Poll::Ready(None) => Poll::Ready(None),
102            Poll::Pending => Poll::Pending,
103        }
104    }
105}
106
/// Tracks whether the flush timer of a `TimeLimitedChunks` is armed for the current chunk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerState {
    /// No chunk deadline is pending; the timer is not polled in this state.
    Idle,
    /// A deadline was set when the first item of the current chunk was buffered.
    Running,
}
112
/// Extension trait that adds `time_limited_chunks` to every `Stream`.
pub trait TimeLimitedChunksExt: Stream {
    /// Wraps `self` in a `TimeLimitedChunks` that yields chunks of at most `capacity` items,
    /// built via `TimeLimitedChunks::new(self, capacity, timeout)`.
    fn time_limited_chunks(
        self,
        capacity: usize,
        timeout: std::time::Duration,
    ) -> TimeLimitedChunks<Self>
    where
        Self: Sized,
    {
        TimeLimitedChunks::new(self, capacity, timeout)
    }
}

// Blanket implementation: every `Stream` gets `time_limited_chunks`.
impl<T: Stream> TimeLimitedChunksExt for T {}
127
128/// A stream adapter that yields chunks of items from the underlying stream.
129///
130/// Chunks are yielded when they reach the specified capacity or when the specified timeout
131/// expires since the first item in the chunk was received.
132#[pin_project]
133pub struct TimeLimitedChunks<S: Stream> {
134    #[pin]
135    stream: S,
136    capacity: usize,
137    timeout: std::time::Duration,
138    buffer: Vec<S::Item>,
139    #[pin]
140    timer: fasync::Timer,
141    state: TimerState,
142}
143
144impl<S: Stream> TimeLimitedChunks<S> {
145    pub fn new(stream: S, capacity: usize, timeout: std::time::Duration) -> Self {
146        Self {
147            stream,
148            capacity,
149            timeout,
150            buffer: Vec::with_capacity(capacity),
151            timer: fasync::Timer::new(std::time::Instant::now() + timeout),
152            state: TimerState::Idle,
153        }
154    }
155}
156
157impl<S: Stream> Stream for TimeLimitedChunks<S> {
158    type Item = Vec<S::Item>;
159
160    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161        let mut this = self.project();
162
163        loop {
164            match this.stream.as_mut().poll_next(cx) {
165                Poll::Ready(Some(item)) => {
166                    if this.buffer.is_empty() {
167                        let deadline = fasync::MonotonicInstant::after(
168                            zx::MonotonicDuration::from_nanos(this.timeout.as_nanos() as i64),
169                        );
170                        this.timer.as_mut().reset(deadline);
171                        *this.state = TimerState::Running;
172                    }
173                    this.buffer.push(item);
174                    if this.buffer.len() >= *this.capacity {
175                        *this.state = TimerState::Idle;
176                        return Poll::Ready(Some(std::mem::take(this.buffer)));
177                    }
178                }
179                Poll::Ready(None) => {
180                    if !this.buffer.is_empty() {
181                        *this.state = TimerState::Idle;
182                        return Poll::Ready(Some(std::mem::take(this.buffer)));
183                    }
184                    return Poll::Ready(None);
185                }
186                Poll::Pending => {
187                    if !this.buffer.is_empty() && *this.state == TimerState::Running {
188                        use std::future::Future;
189                        if this.timer.as_mut().poll(cx).is_ready() {
190                            *this.state = TimerState::Idle;
191                            return Poll::Ready(Some(std::mem::take(this.buffer)));
192                        }
193                    }
194                    return Poll::Pending;
195                }
196            }
197        }
198    }
199}
200
/// Holds a VMO containing valid serialized data as well as the size of that data.
pub struct SerializedVmo {
    /// The VMO holding the serialized bytes.
    pub vmo: zx::Vmo,
    /// Number of valid serialized bytes in `vmo`.
    pub size: u64,
    /// Format of the serialized data; selects the `FormattedContent` variant on conversion.
    format: Format,
}
207
208impl SerializedVmo {
209    pub fn serialize(
210        source: &impl Serialize,
211        data_type: DataType,
212        format: Format,
213    ) -> Result<Self, AccessorError> {
214        let initial_buffer_capacity = match data_type {
215            DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
216            // Logs won't go through this codepath anyway, but in case we ever want to serialize a
217            // single log instance it makes sense to start at the page size.
218            DataType::Logs => 4096, // page size
219        };
220        let mut buffer = Vec::with_capacity(initial_buffer_capacity);
221        match format {
222            Format::Json => {
223                serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
224            }
225            Format::Cbor => ciborium::into_writer(source, &mut buffer)
226                .map_err(|err| AccessorError::CborSerialization(err.into()))?,
227            Format::Text => unreachable!("We'll never get Text"),
228            Format::Fxt | Format::LegacyFxt => unreachable!("We'll never get FXT"),
229        }
230        let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
231        let _ = vmo.set_name(&SERIALIZED_DATA_VMO_NAME);
232        vmo.write(&buffer, 0).unwrap();
233        Ok(Self { vmo, size: buffer.len() as u64, format })
234    }
235}
236
237pub fn encode_rolled_out(count: u64) -> Vec<u8> {
238    let mut encoder =
239        Encoder::new(Cursor::new(ResizableBuffer::from(Vec::new())), EncoderOpts::default());
240    let record = Record {
241        timestamp: zx::BootInstant::from_nanos(0),
242        severity: Severity::Info.into_primitive(),
243        arguments: vec![Argument::new(
244            "rolled_out",
245            diagnostics_log_encoding::Value::UnsignedInt(count),
246        )],
247    };
248    // This write should not fail as ResizableBuffer will grow as needed.
249    encoder.write_record(record).unwrap();
250
251    let mut buffer = encoder.take().into_inner().into_inner();
252    if buffer.len() >= 8 {
253        let mut header = Header::read_from_bytes(&buffer[0..8]).unwrap();
254        header.set_tag(LOG_CONTROL_BIT);
255        buffer[0..8].copy_from_slice(header.as_bytes());
256    }
257    buffer
258}
259
260impl From<SerializedVmo> for FormattedContent {
261    fn from(content: SerializedVmo) -> FormattedContent {
262        match content.format {
263            Format::Json => {
264                // set_content_size() is redundant, but consumers may expect the size there.
265                content
266                    .vmo
267                    .set_content_size(&content.size)
268                    .expect("set_content_size always returns Ok");
269                FormattedContent::Json(fidl_fuchsia_mem::Buffer {
270                    vmo: content.vmo,
271                    size: content.size,
272                })
273            }
274            Format::Cbor => {
275                content
276                    .vmo
277                    .set_content_size(&content.size)
278                    .expect("set_content_size always returns Ok");
279                FormattedContent::Cbor(content.vmo)
280            }
281            Format::Fxt | Format::LegacyFxt => {
282                content
283                    .vmo
284                    .set_content_size(&content.size)
285                    .expect("set_content_size always returns Ok");
286                FormattedContent::Fxt(content.vmo)
287            }
288            Format::Text => unreachable!("We'll never get Text"),
289        }
290    }
291}
292
/// Describes how individual items are written into the packets built by `PacketSerializer`.
trait PacketFormat {
    /// Format recorded on every `SerializedVmo` produced with this packet format.
    const FORMAT: Format;
    /// Bytes written at the start of every packet (empty by default).
    const HEADER: &[u8] = &[];
    /// Bytes written at the end of every packet (empty by default).
    const FOOTER: &[u8] = &[];

    /// Writes an item in the required format.  Returns `Poll::Ready(Some(<separator length>))` if
    /// an item was written, and `Poll::Ready(None)` if there are no more items.  If `first` is true,
    /// this is the first item in this batch.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>>;
}
308
/// Stream that packs items written by a `PacketFormat` into size-limited VMOs.
#[pin_project]
pub struct PacketSerializer<T> {
    /// Optional connection statistics; when present, the number of items in each packet is
    /// reported through `add_result`.
    stats: Option<Arc<BatchIteratorConnectionStats>>,
    /// Requested upper bound for the size of one packet, in bytes.
    max_packet_size: u64,
    /// Source of items and format-specific encoding.
    #[pin]
    format: T,
    /// Encoded item that did not fit into the previous packet and starts the next one.
    overflow: Vec<u8>,
    /// Set once `format` has reported that there are no more items.
    finished: bool,
}
318
319impl<T> PacketSerializer<T> {
320    fn with_format(
321        stats: Option<Arc<BatchIteratorConnectionStats>>,
322        max_packet_size: u64,
323        format: T,
324    ) -> Self {
325        Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
326    }
327}
328
impl<T: PacketFormat> Stream for PacketSerializer<T> {
    type Item = Result<SerializedVmo, AccessorError>;

    /// Builds the next packet: `T::HEADER`, as many items as fit within the packet size
    /// limit, then `T::FOOTER`, and returns it as a `SerializedVmo`.
    ///
    /// Returns `Poll::Pending` only when no item has been collected yet; otherwise the
    /// partial packet is emitted. Every item yielded by this implementation is `Ok`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }

        // Limit packet size to prevent unbounded memory use.
        const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20; // 1 MiB
        let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
        let mut this = self.project();

        let mut buffer = Vec::with_capacity(256 * 1024);
        buffer.extend_from_slice(T::HEADER);

        // Number of items accepted into this packet so far.
        let mut results = 0;

        // An item carried over from the previous packet becomes the first item of this one.
        if !this.overflow.is_empty() {
            buffer.append(this.overflow);
            results += 1;
        }

        // `vmo` is created lazily, the first time `buffer` has to be spilled; `vmo_len` is the
        // number of bytes already written to it.
        let mut vmo = None;
        let mut vmo_len = 0;

        loop {
            // Copy to the VMO if the room in the buffer drops below a threshold.
            if buffer.capacity() - buffer.len() < 512 {
                vmo.get_or_insert_with(|| {
                    let v = zx::Vmo::create(max_packet_size).unwrap();
                    let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
                    v
                })
                .write(&buffer, vmo_len as u64)
                .unwrap();
                vmo_len += buffer.len();
                buffer.clear();
            }

            // Remember where this item (including its separator) starts so it can be removed
            // again if it has to be dropped or deferred.
            let last_len = buffer.len();

            let separator_len = match this.format.as_mut().write_item(cx, results == 0, &mut buffer)
            {
                Poll::Ready(Some(separator_len)) => separator_len,
                Poll::Ready(None) => {
                    // No more items: emit what has been collected, if anything.
                    *this.finished = true;
                    if results == 0 {
                        return Poll::Ready(None);
                    } else {
                        break;
                    }
                }
                Poll::Pending => {
                    // Only wait when there is nothing to send yet.
                    if results == 0 {
                        return Poll::Pending;
                    } else {
                        break;
                    }
                }
            };

            // Size of the item alone, excluding the separator written in front of it.
            let item_len = buffer.len() - last_len - separator_len;

            if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
                // The item cannot be sent even in a packet of its own; discard it together
                // with its separator.
                warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
                buffer.truncate(last_len);
            } else {
                if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
                    // Last item put us over the maximum packet size, keep it for the next batch.
                    // We should have at least one item because otherwise we should have gone
                    // through the branch above.
                    assert!(results > 0);
                    // Keep only the item itself; the separator is not carried over.
                    this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
                    buffer.truncate(last_len);
                    break;
                }

                results += 1;
            }
        }

        if let Some(stats) = &this.stats {
            stats.add_result(results);
        }

        buffer.extend_from_slice(T::FOOTER);

        let vmo = match vmo {
            Some(vmo) => {
                vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
                vmo
            }
            None => {
                // Nothing was spilled, so a VMO sized for the buffer is created here.
                let v = zx::Vmo::create(buffer.len() as u64).unwrap();
                let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
                v
            }
        };
        // Write the bytes still held in `buffer` after whatever was spilled earlier.
        vmo.write(&buffer, vmo_len as u64).unwrap();
        vmo_len += buffer.len();
        Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
    }
}
433
/// `PacketFormat` that writes log messages read from a `FilterCursor` as FXT records.
#[pin_project]
pub struct FxtPacketFormat {
    /// Source of log messages.
    #[pin]
    pub cursor: FilterCursor,
    /// When true, component identity and rolled-out counts are emitted as separate control
    /// records ahead of a message instead of being requested from `extend_fxt_record`.
    pub subscribe_to_manifest: bool,
    /// Identity most recently announced for each record tag; used to decide when a manifest
    /// record has to be (re)sent.
    pub sent_tags: std::collections::HashMap<u64, Arc<crate::identity::ComponentIdentity>>,
}
441
impl PacketFormat for FxtPacketFormat {
    const FORMAT: Format = Format::LegacyFxt;

    /// Writes the next parseable message from the cursor into `buffer`.
    ///
    /// Messages that fail to parse are skipped. Always reports a separator length of 0.
    /// Returns `Poll::Ready(None)` once the cursor is exhausted.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        _first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>> {
        let mut this = self.project();
        loop {
            let Some(message) = ready!(this.cursor.as_mut().poll_next(cx)) else {
                return Poll::Ready(None);
            };

            let (identity, header, data) = match message.parse() {
                Ok(res) => res,
                Err(_) => {
                    // If we fail to parse the message, just ignore it and move on to the next
                    // message.
                    continue;
                }
            };
            let tag = header.tag() as u64;

            if *this.subscribe_to_manifest {
                // A manifest record is needed the first time a tag is seen, or when the
                // identity stored for that tag differs from this message's identity.
                let send_manifest = match this.sent_tags.entry(tag) {
                    std::collections::hash_map::Entry::Vacant(e) => {
                        e.insert(Arc::clone(identity));
                        true
                    }
                    std::collections::hash_map::Entry::Occupied(mut e) => {
                        // Pointer equality is checked first so the value comparison only runs
                        // for distinct allocations.
                        if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
                            e.insert(Arc::clone(identity));
                            true
                        } else {
                            false
                        }
                    }
                };

                // Report dropped messages as a control record ahead of the message.
                if message.dropped > 0 {
                    let rolled_out_buffer = encode_rolled_out(message.dropped);
                    buffer.extend_from_slice(&rolled_out_buffer);
                }

                if send_manifest {
                    use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
                    use diagnostics_log_encoding::{Argument, LOG_CONTROL_BIT, Record};
                    use fidl_fuchsia_diagnostics_types::Severity;
                    use std::io::Cursor;

                    // Encode a record carrying the component's moniker and URL.
                    let mut encoder = Encoder::new(
                        Cursor::new(ResizableBuffer::from(Vec::new())),
                        EncoderOpts::default(),
                    );
                    let record = Record {
                        timestamp: zx::BootInstant::from_nanos(0),
                        severity: Severity::Info.into_primitive(),
                        arguments: vec![
                            Argument::other("moniker", identity.moniker.to_string()),
                            Argument::other("url", identity.url.as_str()),
                        ],
                    };
                    encoder.write_record(record).unwrap();
                    let mut manifest_buffer = encoder.take().into_inner().into_inner();
                    // Rewrite the 8-byte header so its tag is the message tag combined with
                    // the control bit.
                    if manifest_buffer.len() >= 8 {
                        let mut header_manifest =
                            Header::read_from_bytes(&manifest_buffer[0..8]).unwrap();
                        header_manifest.set_tag((tag as u32) | LOG_CONTROL_BIT);
                        manifest_buffer[0..8].copy_from_slice(header_manifest.as_bytes());
                    }
                    buffer.extend_from_slice(&manifest_buffer);
                }
            }

            // Emit the parsed header followed by the message bytes after the first 8
            // (presumably the original header occupies those 8 bytes; confirm against
            // `parse`).
            buffer.extend_from_slice(header.as_bytes());
            buffer.extend_from_slice(&data[8..]);

            // In manifest mode every option is false, since identity and rolled-out counts
            // were already written above as control records.
            extend_fxt_record(
                identity,
                message.dropped,
                &ExtendRecordOpts {
                    component_url: !*this.subscribe_to_manifest,
                    moniker: !*this.subscribe_to_manifest,
                    rolled_out: !*this.subscribe_to_manifest,
                    subscribe_to_manifest: false,
                },
                buffer,
            );
            // This format writes no separator between items.
            return Poll::Ready(Some(0));
        }
    }
}
536
537pub type FxtPacketSerializer = PacketSerializer<FxtPacketFormat>;
538
539impl FxtPacketSerializer {
540    pub fn new(
541        stats: Arc<BatchIteratorConnectionStats>,
542        max_packet_size: u64,
543        cursor: FilterCursor,
544        subscribe_to_manifest: bool,
545    ) -> Self {
546        Self::with_format(
547            Some(stats),
548            max_packet_size,
549            FxtPacketFormat {
550                cursor,
551                subscribe_to_manifest,
552                sent_tags: std::collections::HashMap::new(),
553            },
554        )
555    }
556}
557
/// `PacketFormat` that serializes the items of the wrapped stream as elements of a JSON array.
#[pin_project]
pub struct JsonPacketFormat<I>(#[pin] I);
560
561impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
562    const FORMAT: Format = Format::Json;
563    const HEADER: &[u8] = b"[";
564    const FOOTER: &[u8] = b"]";
565
566    fn write_item(
567        self: Pin<&mut Self>,
568        cx: &mut Context<'_>,
569        first: bool,
570        buffer: &mut Vec<u8>,
571    ) -> Poll<Option<usize>> {
572        const SEPARATOR: &[u8] = b",\n";
573
574        if let Some(item) = ready!(self.project().0.poll_next(cx)) {
575            let separator_len = if !first {
576                buffer.extend_from_slice(SEPARATOR);
577                SEPARATOR.len()
578            } else {
579                0
580            };
581            // We don't expect serialization to fail because we should always be able to write to
582            // `buffer` and `item` is a type we control which we know should always be serializable.
583            serde_json::to_writer(buffer, &item).expect("failed to serialize item");
584            Poll::Ready(Some(separator_len))
585        } else {
586            Poll::Ready(None)
587        }
588    }
589}
590
591pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;
592
593impl<I> JsonPacketSerializer<I> {
594    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
595        Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
596    }
597
598    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
599        Self::with_format(None, max_packet_size, JsonPacketFormat(items))
600    }
601}
602
603#[cfg(test)]
604mod tests {
605    use super::*;
606    use crate::diagnostics::AccessorStats;
607    use futures::stream::iter;
608
    /// A chunk is yielded as soon as it holds `capacity` items, without waiting for the timer.
    #[fuchsia::test]
    async fn time_limited_chunks_yields_on_capacity() {
        let stream = futures::stream::iter(vec![1, 2, 3, 4]);
        // The 30 second timeout is far longer than the test, so only capacity can trigger.
        let mut chunks =
            Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));

        assert_eq!(chunks.next().await, Some(vec![1, 2]));
        assert_eq!(chunks.next().await, Some(vec![3, 4]));
        assert_eq!(chunks.next().await, None);
    }
619
    /// A partially filled chunk is yielded when the underlying stream terminates.
    #[fuchsia::test]
    async fn time_limited_chunks_yields_on_stream_end() {
        let stream = futures::stream::iter(vec![1, 2, 3]);
        let mut chunks =
            Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));

        assert_eq!(chunks.next().await, Some(vec![1, 2]));
        // Only one item is left; it is flushed because the stream ended, not the timer.
        assert_eq!(chunks.next().await, Some(vec![3]));
        assert_eq!(chunks.next().await, None);
    }
630
    /// A partially filled chunk is yielded once the timeout has elapsed while the underlying
    /// stream is pending, and chunking continues normally afterwards.
    #[fuchsia::test]
    async fn time_limited_chunks_yields_on_timeout() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut chunks =
            Box::pin(TimeLimitedChunks::new(rx, 2, std::time::Duration::from_millis(10)));

        tx.unbounded_send(1).unwrap();

        // With a single item and capacity 2, only the timer can release the chunk.
        let start = std::time::Instant::now();
        assert_eq!(chunks.next().await, Some(vec![1]));
        assert!(start.elapsed() >= std::time::Duration::from_millis(10));

        tx.unbounded_send(2).unwrap();
        tx.unbounded_send(3).unwrap();
        assert_eq!(chunks.next().await, Some(vec![2, 3]));

        // Dropping the sender terminates the stream.
        drop(tx);
        assert_eq!(chunks.next().await, None);
    }
650
    /// Two items share a packet when the limit equals the joined length exactly, and are split
    /// into separate packets when the limit is one byte smaller.
    #[fuchsia::test]
    async fn two_items_joined_and_split() {
        let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len() as u64;

        // Serializes `inputs` with the given packet size limit and returns each packet as text.
        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let actual_joined = make_packets(smallest_possible_joined_len).await;
        assert_eq!(&actual_joined[..], joined);

        let actual_split = make_packets(smallest_possible_joined_len - 1).await;
        assert_eq!(&actual_split[..], split);
    }
681
    /// An item carried over from the previous packet counts as the first item of the next one,
    /// so a following item in the same packet is preceded by a separator.
    #[fuchsia::test]
    async fn overflow_separator_added() {
        let inputs = &[&"A", &"B", &"C"];
        // Each item serializes to 3 bytes (e.g. `"A"`), so `["A"]` is 1 + 3 + 1 = 5 bytes.
        // "[" + "A" + ",\n" + "B" + "]" = 1 + 3 + 2 + 3 + 1 = 10 bytes.
        // If max is 8, "B" will overflow.
        // Second packet starts with "B" from overflow.
        // "[" + "B" + ",\n" + "C" + "]" = 1 + 3 + 2 + 3 + 1 = 10 bytes.
        // If max is 8, "C" will overflow.
        // Third packet starts with "C" from overflow.

        // Serializes `inputs` with the given packet size limit and returns each packet as text.
        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let packets = make_packets(8).await;
        assert_eq!(packets.len(), 3);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["B"]"#);
        assert_eq!(packets[2], r#"["C"]"#);

        // With max 10 the first two items fit exactly (10 bytes); "C" overflows.
        let packets = make_packets(10).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], "[\"A\",\n\"B\"]");
        assert_eq!(packets[1], r#"["C"]"#);
    }
721
    /// An item that does not fit next to its predecessor, but fits in a packet of its own,
    /// is deferred to the next packet rather than dropped.
    #[fuchsia::test]
    async fn oversize_item_not_dropped_incorrectly() {
        let inputs = &[&"A", &"BCDEF"];
        // Packet 1: ["A"] (5 bytes)
        // Item 2: "BCDEF" (7 bytes)
        // "[" + "A" + ",\n" + "BCDEF" + "]" = 1 + 3 + 2 + 7 + 1 = 14 bytes.
        // If max is 11:
        // "A" fits (5 bytes).
        // "BCDEF" overflows.
        // NEXT packet:
        // "[" + "BCDEF" + "]" = 9 bytes.
        // 9 fits in 11.

        // Serializes `inputs` with the given packet size limit and returns each packet as text.
        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let packets = make_packets(11).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["BCDEF"]"#);
    }
757
    /// An item that cannot fit in a packet even on its own is dropped, and no packet is
    /// produced when it was the only item.
    #[fuchsia::test]
    async fn item_too_big_for_packet_is_dropped() {
        let inputs = &[&"ABCDE"]; // 7 bytes
        // "[" + 7 + "]" = 9 bytes.
        // If max is 8, it should be dropped.

        // Serializes `inputs` with the given packet size limit and returns each packet as text.
        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let packets = make_packets(8).await;
        // Item should be dropped, so we get no packets.
        assert_eq!(packets.len(), 0);
    }
785
786    #[fuchsia::test]
787    async fn fxt_packet_serializer_subscribe_to_manifest() {
788        use crate::identity::ComponentIdentity;
789        use crate::logs::shared_buffer::{SharedBuffer, create_ring_buffer};
790        use crate::logs::stats::LogStreamStats;
791        use diagnostics_log_encoding::encode::{Encoder, EncoderOpts};
792        use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
793        use fidl_fuchsia_diagnostics::StreamMode;
794        use fidl_fuchsia_diagnostics_types::Severity;
795        use fuchsia_inspect::Node;
796        use std::io::Cursor;
797        use zerocopy::{FromBytes, IntoBytes};
798
799        let identity1 = Arc::new(ComponentIdentity::unknown());
800        let mut identity2_inner = ComponentIdentity::unknown();
801        identity2_inner.url =
802            flyweights::FlyStr::new("fuchsia-pkg://fuchsia.com/test#meta/test.cm");
803        let identity2 = Arc::new(identity2_inner);
804
805        let buffer = Arc::new(SharedBuffer::new(
806            create_ring_buffer(65536),
807            Box::new(|_| {}),
808            Default::default(),
809            &Node::default(),
810        ));
811        let stats1 = Arc::new(LogStreamStats::new(&Node::default(), &identity1));
812        let container1 = buffer.new_container_buffer(Arc::clone(&identity1), stats1);
813        let stats2 = Arc::new(LogStreamStats::new(&Node::default(), &identity2));
814        let container2 = buffer.new_container_buffer(Arc::clone(&identity2), stats2);
815
816        // create message 1
817        let mut buffer_out = Cursor::new(vec![0u8; 128]);
818        let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
819        encoder
820            .write_record(Record {
821                timestamp: zx::BootInstant::from_nanos(100),
822                severity: Severity::Info.into_primitive(),
823                arguments: vec![Argument::other("msg", "hello")],
824            })
825            .unwrap();
826        let end = encoder.inner().position() as usize;
827        let mut msg1_bytes = encoder.inner().get_ref()[..end].to_vec();
828        let mut header = Header::read_from_bytes(&msg1_bytes[0..8]).unwrap();
829        header.set_tag(1);
830        msg1_bytes[0..8].copy_from_slice(header.as_bytes());
831        container1.push_back(&msg1_bytes);
832
833        // create message 2
834        let mut buffer_out = Cursor::new(vec![0u8; 128]);
835        let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
836        encoder
837            .write_record(Record {
838                timestamp: zx::BootInstant::from_nanos(200),
839                severity: Severity::Info.into_primitive(),
840                arguments: vec![Argument::other("msg", "world")],
841            })
842            .unwrap();
843        let end = encoder.inner().position() as usize;
844        let mut msg2_bytes = encoder.inner().get_ref()[..end].to_vec();
845        let mut header = Header::read_from_bytes(&msg2_bytes[0..8]).unwrap();
846        header.set_tag(2);
847        msg2_bytes[0..8].copy_from_slice(header.as_bytes());
848        container2.push_back(&msg2_bytes);
849
850        // create message 3 (same tag/identity as message 2)
851        let mut buffer_out = Cursor::new(vec![0u8; 128]);
852        let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
853        encoder
854            .write_record(Record {
855                timestamp: zx::BootInstant::from_nanos(300),
856                severity: Severity::Info.into_primitive(),
857                arguments: vec![Argument::other("msg", "again")],
858            })
859            .unwrap();
860        let end = encoder.inner().position() as usize;
861        let mut msg3_bytes = encoder.inner().get_ref()[..end].to_vec();
862        let mut header = Header::read_from_bytes(&msg3_bytes[0..8]).unwrap();
863        header.set_tag(2);
864        msg3_bytes[0..8].copy_from_slice(header.as_bytes());
865        container2.push_back(&msg3_bytes);
866
867        let cursor = buffer.cursor(StreamMode::Snapshot, vec![]);
868
869        let node = Node::default();
870        let accessor_stats = Arc::new(AccessorStats::new(node));
871        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
872
873        // Test with subscribe_to_manifest = true
874        let packets: Vec<_> =
875            FxtPacketSerializer::new(Arc::clone(&test_stats), 1024 * 1024, cursor, true)
876                .collect::<Vec<_>>()
877                .await
878                .into_iter()
879                .map(|r| {
880                    let result = r.unwrap();
881                    let mut buf = vec![0; result.size as usize];
882                    result.vmo.read(&mut buf, 0).expect("reading vmo");
883                    buf
884                })
885                .collect();
886
887        assert_eq!(packets.len(), 1);
888        let output = &packets[0];
889
890        let mut current_slice = output.as_slice();
891        let mut records = vec![];
892
893        while !current_slice.is_empty() {
894            let (record, remaining) =
895                diagnostics_log_encoding::parse::parse_record(current_slice).unwrap();
896            let header =
897                diagnostics_log_encoding::Header::read_from_bytes(&current_slice[..8]).unwrap();
898            records.push((header, record));
899            current_slice = remaining;
900        }
901
902        assert_eq!(records.len(), 5);
903
904        // Record 0: Manifest for tag 0
905        assert_ne!(records[0].0.tag() & LOG_CONTROL_BIT, 0);
906        assert_eq!(records[0].0.tag() & !LOG_CONTROL_BIT, 0);
907
908        // Record 1: Msg 1
909        assert_eq!(records[1].0.tag(), 0);
910        assert_eq!(records[1].1.arguments.len(), 1); // no moniker/url injected
911
912        // Record 2: Manifest for tag 1
913        assert_ne!(records[2].0.tag() & LOG_CONTROL_BIT, 0);
914        assert_eq!(records[2].0.tag() & !LOG_CONTROL_BIT, 1);
915
916        // Record 3: Msg 2
917        assert_eq!(records[3].0.tag(), 1);
918        assert_eq!(records[3].1.arguments.len(), 1); // no moniker/url injected
919
920        // Record 4: Msg 3
921        assert_eq!(records[4].0.tag(), 1);
922        assert_eq!(records[4].1.arguments.len(), 1); // no moniker/url injected
923    }
924}