
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::diagnostics::BatchIteratorConnectionStats;
use crate::error::AccessorError;
use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
use crate::logs::shared_buffer::FxtMessage;
use fidl_fuchsia_diagnostics::{
    DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
};

use futures::{Stream, StreamExt};
use log::warn;
use pin_project::pin_project;
use serde::Serialize;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};

pub type FormattedStream =
    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;

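/// Wraps a chunked stream of results, converting each chunk's items into `FormattedContent`
/// and recording any per-item errors in `stats`.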
#[pin_project]
pub struct FormattedContentBatcher<C> {
    #[pin]
    items: C,
    stats: Arc<BatchIteratorConnectionStats>,
}

/// Makes a new `FormattedContentBatcher` with a chunking strategy depending on stream mode.
///
/// In snapshot mode, batched items are not flushed to the client until the batch is complete
/// or the underlying stream has terminated.
///
/// In subscribe or snapshot-then-subscribe mode, batched items are flushed whenever the
/// underlying stream is pending, ensuring clients always receive the latest results.
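///
/// A minimal usage sketch (illustrative only; `serialized_items` is an assumed stream of
/// `Result<SerializedVmo, AccessorError>`):
///
/// ```ignore
/// let formatted: FormattedStream =
///     new_batcher(serialized_items, stats.clone(), StreamMode::Snapshot);
/// ```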
pub fn new_batcher<I, T, E>(
    items: I,
    stats: Arc<BatchIteratorConnectionStats>,
    mode: StreamMode,
) -> FormattedStream
where
    I: Stream<Item = Result<T, E>> + Send + 'static,
    T: Into<FormattedContent> + Send,
    E: Into<AccessorError> + Send,
{
    match mode {
        StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
            Box::pin(FormattedContentBatcher {
                items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
                stats,
            })
        }
        StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
            items: items.chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
            stats,
        }),
    }
}

impl<I, T, E> Stream for FormattedContentBatcher<I>
where
    I: Stream<Item = Vec<Result<T, E>>>,
    T: Into<FormattedContent>,
    E: Into<AccessorError>,
{
    type Item = Vec<Result<FormattedContent, AccessorError>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.items.poll_next(cx) {
            Poll::Ready(Some(chunk)) => {
                // Loop over the chunk instead of using into_iter/map because `this` can't be
                // moved into a closure.
                let mut batch = vec![];
                for item in chunk {
                    let result = match item {
                        Ok(i) => Ok(i.into()),
                        Err(e) => {
                            this.stats.add_result_error();
                            Err(e.into())
                        }
                    };
                    batch.push(result);
                }
                Poll::Ready(Some(batch))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Holds a VMO containing valid serialized data as well as the size of that data.
pub struct SerializedVmo {
    pub vmo: zx::Vmo,
    pub size: u64,
    format: Format,
}

impl SerializedVmo {
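    /// Serializes `source` into a fresh VMO in the requested `format`.
    ///
    /// A minimal usage sketch (illustrative only; the payload is an assumption):
    ///
    /// ```ignore
    /// let serialized =
    ///     SerializedVmo::serialize(&vec!["hello", "world"], DataType::Inspect, Format::Json)?;
    /// assert_eq!(serialized.size, r#"["hello","world"]"#.len() as u64);
    /// ```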
    pub fn serialize(
        source: &impl Serialize,
        data_type: DataType,
        format: Format,
    ) -> Result<Self, AccessorError> {
        let initial_buffer_capacity = match data_type {
            DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
            // Logs don't go through this codepath today, but if we ever serialize a single log
            // instance it makes sense to start at the page size.
            DataType::Logs => 4096, // page size
        };
        let mut buffer = Vec::with_capacity(initial_buffer_capacity);
        match format {
            Format::Json => {
                serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
            }
            Format::Cbor => ciborium::into_writer(source, &mut buffer)
                .map_err(|err| AccessorError::CborSerialization(err.into()))?,
            Format::Text => unreachable!("We'll never get Text"),
            Format::Fxt => unreachable!("We'll never get FXT"),
        }
        let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
        vmo.write(&buffer, 0).unwrap();
        Ok(Self { vmo, size: buffer.len() as u64, format })
    }
}

impl From<SerializedVmo> for FormattedContent {
    fn from(content: SerializedVmo) -> FormattedContent {
        match content.format {
            Format::Json => {
                // The size is already carried in the returned Buffer, but set the VMO content
                // size too because consumers may expect to find it there.
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Json(fidl_fuchsia_mem::Buffer {
                    vmo: content.vmo,
                    size: content.size,
                })
            }
            Format::Cbor => {
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Cbor(content.vmo)
            }
            Format::Fxt => {
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Fxt(content.vmo)
            }
            Format::Text => unreachable!("We'll never get Text"),
        }
    }
}

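/// Framing strategy used by `PacketSerializer`: a constant `HEADER` and `FOOTER` wrap each
/// packet, and `write_item` appends one encoded item (plus any separator) to the working buffer.
///
/// A minimal sketch of an implementor (hypothetical, for illustration only):
///
/// ```ignore
/// #[pin_project]
/// struct LineFormat<I>(#[pin] I);
///
/// impl<I: Stream<Item = String>> PacketFormat for LineFormat<I> {
///     const FORMAT: Format = Format::Text;
///
///     fn write_item(
///         self: Pin<&mut Self>,
///         cx: &mut Context<'_>,
///         _first: bool,
///         buffer: &mut Vec<u8>,
///     ) -> Poll<Option<usize>> {
///         match ready!(self.project().0.poll_next(cx)) {
///             Some(line) => {
///                 // Newline-terminated items need no separator, so report a length of 0.
///                 buffer.extend_from_slice(line.as_bytes());
///                 buffer.push(b'\n');
///                 Poll::Ready(Some(0))
///             }
///             None => Poll::Ready(None),
///         }
///     }
/// }
/// ```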
trait PacketFormat {
    const FORMAT: Format;
    const HEADER: &[u8] = &[];
    const FOOTER: &[u8] = &[];

    /// Writes an item in the required format. Returns `Poll::Ready(Some(<separator length>))`
    /// if an item was written, and `Poll::Ready(None)` if there are no more items. If `first`
    /// is true, this is the first item in this batch.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>>;
}

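/// Adapts a stream of items into a stream of `SerializedVmo` packets at most `max_packet_size`
/// bytes long. Items are framed by the `PacketFormat` `T`; an item that would push a packet past
/// the size limit is held in `overflow` and leads the next packet, while an item too large to
/// fit in any packet is dropped.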
#[pin_project]
pub struct PacketSerializer<T> {
    stats: Option<Arc<BatchIteratorConnectionStats>>,
    max_packet_size: u64,
    #[pin]
    format: T,
    overflow: Vec<u8>,
    finished: bool,
}

impl<T> PacketSerializer<T> {
    fn with_format(
        stats: Option<Arc<BatchIteratorConnectionStats>>,
        max_packet_size: u64,
        format: T,
    ) -> Self {
        Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
    }
}

impl<T: PacketFormat> Stream for PacketSerializer<T> {
    type Item = Result<SerializedVmo, AccessorError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }

        // Limit the packet size to prevent unbounded memory use.
        const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20; // 1 MiB
        let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
        let mut this = self.project();

        let mut buffer = Vec::with_capacity(256 * 1024);
        buffer.extend_from_slice(T::HEADER);

        let mut first = true;

        // An item carried over from the previous packet leads this one.
        if !this.overflow.is_empty() {
            buffer.append(this.overflow);
            first = false;
            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        let mut vmo = None;
        let mut vmo_len = 0;

        loop {
            // Flush the buffer to the VMO when the room left in it drops below a threshold.
            if buffer.capacity() - buffer.len() < 512 {
                vmo.get_or_insert_with(|| zx::Vmo::create(max_packet_size).unwrap())
                    .write(&buffer, vmo_len as u64)
                    .unwrap();
                vmo_len += buffer.len();
                buffer.clear();
            }

            let last_len = buffer.len();

            let separator_len = match this.format.as_mut().write_item(cx, first, &mut buffer) {
                Poll::Ready(Some(separator_len)) => separator_len,
                Poll::Ready(None) => {
                    *this.finished = true;
                    if first {
                        return Poll::Ready(None);
                    } else {
                        break;
                    }
                }
                Poll::Pending => {
                    if first {
                        return Poll::Pending;
                    } else {
                        break;
                    }
                }
            };

            let item_len = buffer.len() - last_len - separator_len;

            if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
                // The item can never fit in a packet, even on its own.
                warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
                buffer.truncate(last_len);
            } else {
                if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
                    // The last item put us over the maximum packet size; keep it for the next
                    // batch. There must be at least one prior item, because otherwise we would
                    // have taken the oversize branch above.
                    assert!(!first);
                    this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
                    buffer.truncate(last_len);
                    break;
                }

                first = false;

                if let Some(stats) = &this.stats {
                    stats.add_result();
                }
            }
        }

        buffer.extend_from_slice(T::FOOTER);

        let vmo = match vmo {
            Some(vmo) => {
                vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
                vmo
            }
            None => zx::Vmo::create(buffer.len() as u64).unwrap(),
        };
        vmo.write(&buffer, vmo_len as u64).unwrap();
        vmo_len += buffer.len();
        Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
    }
}

#[pin_project]
pub struct FxtPacketFormat<I>(#[pin] I);

impl<I: Stream<Item = FxtMessage>> PacketFormat for FxtPacketFormat<I> {
    const FORMAT: Format = Format::Fxt;

    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        _first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>> {
        if let Some(item) = ready!(self.project().0.poll_next(cx)) {
            buffer.extend_from_slice(item.data());
            extend_fxt_record(
                item.component_identity(),
                item.dropped(),
                &ExtendRecordOpts { component_url: true, moniker: true, rolled_out: true },
                buffer,
            );
            // FXT records are self-delimiting, so no separator is needed.
            Poll::Ready(Some(0))
        } else {
            Poll::Ready(None)
        }
    }
}

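/// A `PacketSerializer` that frames log records in the Fuchsia trace format (FXT).
///
/// A minimal usage sketch (illustrative only; `fxt_messages` is an assumed
/// `Stream<Item = FxtMessage>` and `max_packet_size` an assumed byte limit):
///
/// ```ignore
/// let serializer = FxtPacketSerializer::new(stats, max_packet_size, fxt_messages);
/// ```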
pub type FxtPacketSerializer<I> = PacketSerializer<FxtPacketFormat<I>>;

impl<I> FxtPacketSerializer<I> {
    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
        Self::with_format(Some(stats), max_packet_size, FxtPacketFormat(items))
    }
}

#[pin_project]
pub struct JsonPacketFormat<I>(#[pin] I);

impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
    const FORMAT: Format = Format::Json;
    const HEADER: &[u8] = b"[";
    const FOOTER: &[u8] = b"]";

    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>> {
        const SEPARATOR: &[u8] = b",\n";

        if let Some(item) = ready!(self.project().0.poll_next(cx)) {
            let separator_len = if !first {
                buffer.extend_from_slice(SEPARATOR);
                SEPARATOR.len()
            } else {
                0
            };
            // Serialization into an in-memory buffer isn't expected to fail: writes to `buffer`
            // can't error, and `item` is a type we control that is always serializable.
            serde_json::to_writer(buffer, &item).expect("failed to serialize item");
            Poll::Ready(Some(separator_len))
        } else {
            Poll::Ready(None)
        }
    }
}

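/// A `PacketSerializer` that frames items as JSON arrays.
///
/// A minimal usage sketch (illustrative only; the inputs and size limit are assumptions):
///
/// ```ignore
/// let serializer =
///     JsonPacketSerializer::new_without_stats(1024, futures::stream::iter(["a", "b"]));
/// // Emits packets whose VMOs contain JSON such as `["a",\n"b"]`.
/// ```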
pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;

impl<I> JsonPacketSerializer<I> {
    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
        Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
    }

    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
        Self::with_format(None, max_packet_size, JsonPacketFormat(items))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use futures::stream::iter;

    /// Runs `inputs` through a `JsonPacketSerializer` capped at `max` bytes per packet and
    /// returns each emitted packet as a UTF-8 string.
    async fn make_packets(inputs: &[&str], max: u64) -> Vec<String> {
        let node = fuchsia_inspect::Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
        JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .map(|r| {
                let result = r.unwrap();
                let mut buf = vec![0; result.size as usize];
                result.vmo.read(&mut buf, 0).expect("reading vmo");
                std::str::from_utf8(&buf).unwrap().to_string()
            })
            .collect()
    }

    #[fuchsia::test]
    async fn two_items_joined_and_split() {
        let inputs = &["FFFFFFFFFF", "GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len() as u64;

        let actual_joined = make_packets(inputs, smallest_possible_joined_len).await;
        assert_eq!(&actual_joined[..], joined);

        let actual_split = make_packets(inputs, smallest_possible_joined_len - 1).await;
        assert_eq!(&actual_split[..], split);
    }

    #[fuchsia::test]
    async fn overflow_separator_added() {
        let inputs = &["A", "B", "C"];
        // `["A"]` is 5 bytes and `["A",\n"B"]` is 10 bytes, so with max = 8 the first packet
        // holds only "A" and "B" overflows into the next packet. Likewise `["B",\n"C"]` is
        // 10 bytes, so "C" overflows into a third packet.
        let packets = make_packets(inputs, 8).await;
        assert_eq!(packets.len(), 3);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["B"]"#);
        assert_eq!(packets[2], r#"["C"]"#);

        // With max = 10, "A" and "B" fit together exactly and only "C" overflows.
        let packets = make_packets(inputs, 10).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], "[\"A\",\n\"B\"]");
        assert_eq!(packets[1], r#"["C"]"#);
    }

    #[fuchsia::test]
    async fn oversize_item_not_dropped_incorrectly() {
        let inputs = &["A", "BCDEF"];
        // `"BCDEF"` serializes to 7 bytes, so `["A",\n"BCDEF"]` is 1 + 3 + 2 + 7 + 1 = 14 bytes.
        // With max = 11, `["A"]` (5 bytes) is emitted first and "BCDEF" overflows; the next
        // packet, `["BCDEF"]` (9 bytes), fits within the limit instead of being dropped.
        let packets = make_packets(inputs, 11).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["BCDEF"]"#);
    }

    #[fuchsia::test]
    async fn item_too_big_for_packet_is_dropped() {
        let inputs = &["ABCDE"];
        // `"ABCDE"` serializes to 7 bytes, so the smallest packet holding it, `["ABCDE"]`, is
        // 9 bytes. With max = 8 the item can never fit and is dropped, yielding no packets.
        let packets = make_packets(inputs, 8).await;
        assert_eq!(packets.len(), 0);
    }
}