1use crate::diagnostics::BatchIteratorConnectionStats;
5use crate::error::AccessorError;
6use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
7use crate::logs::shared_buffer::FilterCursor;
8use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
9use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
10use fidl_fuchsia_diagnostics_types::Severity;
11
12use fidl_fuchsia_diagnostics::{
13 DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
14};
15
16use fuchsia_async as fasync;
17use futures::{Stream, StreamExt};
18use log::warn;
19use pin_project::pin_project;
20use serde::Serialize;
21use std::io::Cursor;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{Context, Poll, ready};
25use zerocopy::{FromBytes, IntoBytes};
26use zx;
27
/// Name assigned to the VMOs created by `SerializedVmo::serialize`.
static SERIALIZED_DATA_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-serialized-data");
/// Name assigned to the VMOs created by `PacketSerializer` while assembling packets.
static PACKET_BUFFER_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-packet-buffer");

/// Timeout handed to `time_limited_chunks` by `new_batcher` for `StreamMode::Snapshot`.
const SNAPSHOT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15);

/// Boxed, sendable stream of batches; every batch entry is either formatted content or an
/// accessor error.
pub type FormattedStream =
    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;
35
/// Stream adapter that turns chunks of `Result<T, E>` into batches of
/// `Result<FormattedContent, AccessorError>`.
#[pin_project]
pub struct FormattedContentBatcher<C> {
    /// Underlying stream of already-chunked items.
    #[pin]
    items: C,
    /// Connection stats; an error is recorded on it for every `Err` item seen.
    stats: Arc<BatchIteratorConnectionStats>,
}
42
43pub fn new_batcher<I, T, E>(
52 items: I,
53 stats: Arc<BatchIteratorConnectionStats>,
54 mode: StreamMode,
55) -> FormattedStream
56where
57 I: Stream<Item = Result<T, E>> + Send + 'static,
58 T: Into<FormattedContent> + Send,
59 E: Into<AccessorError> + Send,
60{
61 match mode {
62 StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
63 Box::pin(FormattedContentBatcher {
64 items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
65 stats,
66 })
67 }
68 StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
69 items: items.time_limited_chunks(MAXIMUM_ENTRIES_PER_BATCH as usize, SNAPSHOT_TIMEOUT),
70 stats,
71 }),
72 }
73}
74
75impl<I, T, E> Stream for FormattedContentBatcher<I>
76where
77 I: Stream<Item = Vec<Result<T, E>>>,
78 T: Into<FormattedContent>,
79 E: Into<AccessorError>,
80{
81 type Item = Vec<Result<FormattedContent, AccessorError>>;
82
83 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
84 let this = self.project();
85 match this.items.poll_next(cx) {
86 Poll::Ready(Some(chunk)) => {
87 let mut batch = vec![];
89 for item in chunk {
90 let result = match item {
91 Ok(i) => Ok(i.into()),
92 Err(e) => {
93 this.stats.add_result_error();
94 Err(e.into())
95 }
96 };
97 batch.push(result);
98 }
99 Poll::Ready(Some(batch))
100 }
101 Poll::Ready(None) => Poll::Ready(None),
102 Poll::Pending => Poll::Pending,
103 }
104 }
105}
106
/// Whether the chunk deadline timer of a `TimeLimitedChunks` is currently armed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerState {
    /// No deadline is armed; the timer is not polled in this state.
    Idle,
    /// The timer has been reset for the chunk being filled and is polled while the source is
    /// pending.
    Running,
}
112
/// Extension trait that adds `time_limited_chunks` to every [`Stream`].
pub trait TimeLimitedChunksExt: Stream {
    /// Wraps `self` in a [`TimeLimitedChunks`] constructed with the given `capacity` and
    /// `timeout`.
    fn time_limited_chunks(
        self,
        capacity: usize,
        timeout: std::time::Duration,
    ) -> TimeLimitedChunks<Self>
    where
        Self: Sized,
    {
        TimeLimitedChunks::new(self, capacity, timeout)
    }
}

// Blanket implementation: the adapter is available on all streams.
impl<T: Stream> TimeLimitedChunksExt for T {}
127
128#[pin_project]
133pub struct TimeLimitedChunks<S: Stream> {
134 #[pin]
135 stream: S,
136 capacity: usize,
137 timeout: std::time::Duration,
138 buffer: Vec<S::Item>,
139 #[pin]
140 timer: fasync::Timer,
141 state: TimerState,
142}
143
144impl<S: Stream> TimeLimitedChunks<S> {
145 pub fn new(stream: S, capacity: usize, timeout: std::time::Duration) -> Self {
146 Self {
147 stream,
148 capacity,
149 timeout,
150 buffer: Vec::with_capacity(capacity),
151 timer: fasync::Timer::new(std::time::Instant::now() + timeout),
152 state: TimerState::Idle,
153 }
154 }
155}
156
157impl<S: Stream> Stream for TimeLimitedChunks<S> {
158 type Item = Vec<S::Item>;
159
160 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161 let mut this = self.project();
162
163 loop {
164 match this.stream.as_mut().poll_next(cx) {
165 Poll::Ready(Some(item)) => {
166 if this.buffer.is_empty() {
167 let deadline = fasync::MonotonicInstant::after(
168 zx::MonotonicDuration::from_nanos(this.timeout.as_nanos() as i64),
169 );
170 this.timer.as_mut().reset(deadline);
171 *this.state = TimerState::Running;
172 }
173 this.buffer.push(item);
174 if this.buffer.len() >= *this.capacity {
175 *this.state = TimerState::Idle;
176 return Poll::Ready(Some(std::mem::take(this.buffer)));
177 }
178 }
179 Poll::Ready(None) => {
180 if !this.buffer.is_empty() {
181 *this.state = TimerState::Idle;
182 return Poll::Ready(Some(std::mem::take(this.buffer)));
183 }
184 return Poll::Ready(None);
185 }
186 Poll::Pending => {
187 if !this.buffer.is_empty() && *this.state == TimerState::Running {
188 use std::future::Future;
189 if this.timer.as_mut().poll(cx).is_ready() {
190 *this.state = TimerState::Idle;
191 return Poll::Ready(Some(std::mem::take(this.buffer)));
192 }
193 }
194 return Poll::Pending;
195 }
196 }
197 }
198 }
199}
200
/// A VMO holding serialized data together with the number of bytes written to it and the
/// format the data was serialized in.
pub struct SerializedVmo {
    /// VMO containing the serialized bytes.
    pub vmo: zx::Vmo,
    /// Number of serialized bytes stored in `vmo`.
    pub size: u64,
    /// Format of the stored bytes; selects the `FormattedContent` variant on conversion.
    format: Format,
}
207
208impl SerializedVmo {
209 pub fn serialize(
210 source: &impl Serialize,
211 data_type: DataType,
212 format: Format,
213 ) -> Result<Self, AccessorError> {
214 let initial_buffer_capacity = match data_type {
215 DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
216 DataType::Logs => 4096, };
220 let mut buffer = Vec::with_capacity(initial_buffer_capacity);
221 match format {
222 Format::Json => {
223 serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
224 }
225 Format::Cbor => ciborium::into_writer(source, &mut buffer)
226 .map_err(|err| AccessorError::CborSerialization(err.into()))?,
227 Format::Text => unreachable!("We'll never get Text"),
228 Format::Fxt | Format::LegacyFxt => unreachable!("We'll never get FXT"),
229 }
230 let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
231 let _ = vmo.set_name(&SERIALIZED_DATA_VMO_NAME);
232 vmo.write(&buffer, 0).unwrap();
233 Ok(Self { vmo, size: buffer.len() as u64, format })
234 }
235}
236
237pub fn encode_rolled_out(count: u64) -> Vec<u8> {
238 let mut encoder =
239 Encoder::new(Cursor::new(ResizableBuffer::from(Vec::new())), EncoderOpts::default());
240 let record = Record {
241 timestamp: zx::BootInstant::from_nanos(0),
242 severity: Severity::Info.into_primitive(),
243 arguments: vec![Argument::new(
244 "rolled_out",
245 diagnostics_log_encoding::Value::UnsignedInt(count),
246 )],
247 };
248 encoder.write_record(record).unwrap();
250
251 let mut buffer = encoder.take().into_inner().into_inner();
252 if buffer.len() >= 8 {
253 let mut header = Header::read_from_bytes(&buffer[0..8]).unwrap();
254 header.set_tag(LOG_CONTROL_BIT);
255 buffer[0..8].copy_from_slice(header.as_bytes());
256 }
257 buffer
258}
259
260impl From<SerializedVmo> for FormattedContent {
261 fn from(content: SerializedVmo) -> FormattedContent {
262 match content.format {
263 Format::Json => {
264 content
266 .vmo
267 .set_content_size(&content.size)
268 .expect("set_content_size always returns Ok");
269 FormattedContent::Json(fidl_fuchsia_mem::Buffer {
270 vmo: content.vmo,
271 size: content.size,
272 })
273 }
274 Format::Cbor => {
275 content
276 .vmo
277 .set_content_size(&content.size)
278 .expect("set_content_size always returns Ok");
279 FormattedContent::Cbor(content.vmo)
280 }
281 Format::Fxt | Format::LegacyFxt => {
282 content
283 .vmo
284 .set_content_size(&content.size)
285 .expect("set_content_size always returns Ok");
286 FormattedContent::Fxt(content.vmo)
287 }
288 Format::Text => unreachable!("We'll never get Text"),
289 }
290 }
291}
292
/// Describes how a `PacketSerializer` frames packets and obtains items for one output format.
trait PacketFormat {
    /// Format recorded on every `SerializedVmo` produced with this packet format.
    const FORMAT: Format;
    /// Bytes written at the start of every packet; empty unless overridden.
    const HEADER: &[u8] = &[];
    /// Bytes written at the end of every packet; empty unless overridden.
    const FOOTER: &[u8] = &[];

    /// Appends the next item to `buffer`.
    ///
    /// `first` is true when the packet being assembled does not hold any item yet.
    ///
    /// Returns `Poll::Ready(Some(n))` after writing an item, where `n` is the number of
    /// separator bytes written ahead of it (the caller excludes them from the item length),
    /// `Poll::Ready(None)` when there are no more items, and `Poll::Pending` when no item is
    /// available yet.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>>;
}
308
/// Stream that packs the items produced by a `PacketFormat` into VMO-backed packets.
#[pin_project]
pub struct PacketSerializer<T> {
    /// Optional stats; when present, the number of items in each packet is reported to it.
    stats: Option<Arc<BatchIteratorConnectionStats>>,
    /// Requested packet size limit in bytes; `poll_next` additionally caps it.
    max_packet_size: u64,
    /// Source of items and packet framing.
    #[pin]
    format: T,
    /// Bytes of an item that did not fit in the previous packet; it opens the next one.
    overflow: Vec<u8>,
    /// Set once `format` has reported that there are no more items.
    finished: bool,
}

impl<T> PacketSerializer<T> {
    /// Creates a serializer with an empty overflow buffer that is not yet finished.
    fn with_format(
        stats: Option<Arc<BatchIteratorConnectionStats>>,
        max_packet_size: u64,
        format: T,
    ) -> Self {
        Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
    }
}
328
329impl<T: PacketFormat> Stream for PacketSerializer<T> {
330 type Item = Result<SerializedVmo, AccessorError>;
331
332 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
333 if self.finished {
334 return Poll::Ready(None);
335 }
336
337 const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20; let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
340 let mut this = self.project();
341
342 let mut buffer = Vec::with_capacity(256 * 1024);
343 buffer.extend_from_slice(T::HEADER);
344
345 let mut results = 0;
346
347 if !this.overflow.is_empty() {
348 buffer.append(this.overflow);
349 results += 1;
350 }
351
352 let mut vmo = None;
353 let mut vmo_len = 0;
354
355 loop {
356 if buffer.capacity() - buffer.len() < 512 {
358 vmo.get_or_insert_with(|| {
359 let v = zx::Vmo::create(max_packet_size).unwrap();
360 let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
361 v
362 })
363 .write(&buffer, vmo_len as u64)
364 .unwrap();
365 vmo_len += buffer.len();
366 buffer.clear();
367 }
368
369 let last_len = buffer.len();
370
371 let separator_len = match this.format.as_mut().write_item(cx, results == 0, &mut buffer)
372 {
373 Poll::Ready(Some(separator_len)) => separator_len,
374 Poll::Ready(None) => {
375 *this.finished = true;
376 if results == 0 {
377 return Poll::Ready(None);
378 } else {
379 break;
380 }
381 }
382 Poll::Pending => {
383 if results == 0 {
384 return Poll::Pending;
385 } else {
386 break;
387 }
388 }
389 };
390
391 let item_len = buffer.len() - last_len - separator_len;
392
393 if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
394 warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
395 buffer.truncate(last_len);
396 } else {
397 if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
398 assert!(results > 0);
402 this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
403 buffer.truncate(last_len);
404 break;
405 }
406
407 results += 1;
408 }
409 }
410
411 if let Some(stats) = &this.stats {
412 stats.add_result(results);
413 }
414
415 buffer.extend_from_slice(T::FOOTER);
416
417 let vmo = match vmo {
418 Some(vmo) => {
419 vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
420 vmo
421 }
422 None => {
423 let v = zx::Vmo::create(buffer.len() as u64).unwrap();
424 let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
425 v
426 }
427 };
428 vmo.write(&buffer, vmo_len as u64).unwrap();
429 vmo_len += buffer.len();
430 Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
431 }
432}
433
/// `PacketFormat` that writes FXT log records read from a `FilterCursor`.
#[pin_project]
pub struct FxtPacketFormat {
    /// Source of log messages.
    #[pin]
    pub cursor: FilterCursor,
    /// When true, component identity and rolled-out counts are emitted as separate control
    /// records instead of being appended to every record.
    pub subscribe_to_manifest: bool,
    /// Identity last announced for each record tag; used to decide when a new manifest record
    /// has to be emitted.
    pub sent_tags: std::collections::HashMap<u64, Arc<crate::identity::ComponentIdentity>>,
}
441
442impl PacketFormat for FxtPacketFormat {
443 const FORMAT: Format = Format::LegacyFxt;
444
445 fn write_item(
446 self: Pin<&mut Self>,
447 cx: &mut Context<'_>,
448 _first: bool,
449 buffer: &mut Vec<u8>,
450 ) -> Poll<Option<usize>> {
451 let mut this = self.project();
452 loop {
453 let Some(message) = ready!(this.cursor.as_mut().poll_next(cx)) else {
454 return Poll::Ready(None);
455 };
456
457 let (identity, header, data) = match message.parse() {
458 Ok(res) => res,
459 Err(_) => {
460 continue;
463 }
464 };
465 let tag = header.tag() as u64;
466
467 if *this.subscribe_to_manifest {
468 let send_manifest = match this.sent_tags.entry(tag) {
469 std::collections::hash_map::Entry::Vacant(e) => {
470 e.insert(Arc::clone(identity));
471 true
472 }
473 std::collections::hash_map::Entry::Occupied(mut e) => {
474 if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
475 e.insert(Arc::clone(identity));
476 true
477 } else {
478 false
479 }
480 }
481 };
482
483 if message.dropped > 0 {
484 let rolled_out_buffer = encode_rolled_out(message.dropped);
485 buffer.extend_from_slice(&rolled_out_buffer);
486 }
487
488 if send_manifest {
489 use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
490 use diagnostics_log_encoding::{Argument, LOG_CONTROL_BIT, Record};
491 use fidl_fuchsia_diagnostics_types::Severity;
492 use std::io::Cursor;
493
494 let mut encoder = Encoder::new(
495 Cursor::new(ResizableBuffer::from(Vec::new())),
496 EncoderOpts::default(),
497 );
498 let record = Record {
499 timestamp: zx::BootInstant::from_nanos(0),
500 severity: Severity::Info.into_primitive(),
501 arguments: vec![
502 Argument::other("moniker", identity.moniker.to_string()),
503 Argument::other("url", identity.url.as_str()),
504 ],
505 };
506 encoder.write_record(record).unwrap();
507 let mut manifest_buffer = encoder.take().into_inner().into_inner();
508 if manifest_buffer.len() >= 8 {
509 let mut header_manifest =
510 Header::read_from_bytes(&manifest_buffer[0..8]).unwrap();
511 header_manifest.set_tag((tag as u32) | LOG_CONTROL_BIT);
512 manifest_buffer[0..8].copy_from_slice(header_manifest.as_bytes());
513 }
514 buffer.extend_from_slice(&manifest_buffer);
515 }
516 }
517
518 buffer.extend_from_slice(header.as_bytes());
519 buffer.extend_from_slice(&data[8..]);
520
521 extend_fxt_record(
522 identity,
523 message.dropped,
524 &ExtendRecordOpts {
525 component_url: !*this.subscribe_to_manifest,
526 moniker: !*this.subscribe_to_manifest,
527 rolled_out: !*this.subscribe_to_manifest,
528 subscribe_to_manifest: false,
529 },
530 buffer,
531 );
532 return Poll::Ready(Some(0));
533 }
534 }
535}
536
537pub type FxtPacketSerializer = PacketSerializer<FxtPacketFormat>;
538
539impl FxtPacketSerializer {
540 pub fn new(
541 stats: Arc<BatchIteratorConnectionStats>,
542 max_packet_size: u64,
543 cursor: FilterCursor,
544 subscribe_to_manifest: bool,
545 ) -> Self {
546 Self::with_format(
547 Some(stats),
548 max_packet_size,
549 FxtPacketFormat {
550 cursor,
551 subscribe_to_manifest,
552 sent_tags: std::collections::HashMap::new(),
553 },
554 )
555 }
556}
557
558#[pin_project]
559pub struct JsonPacketFormat<I>(#[pin] I);
560
561impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
562 const FORMAT: Format = Format::Json;
563 const HEADER: &[u8] = b"[";
564 const FOOTER: &[u8] = b"]";
565
566 fn write_item(
567 self: Pin<&mut Self>,
568 cx: &mut Context<'_>,
569 first: bool,
570 buffer: &mut Vec<u8>,
571 ) -> Poll<Option<usize>> {
572 const SEPARATOR: &[u8] = b",\n";
573
574 if let Some(item) = ready!(self.project().0.poll_next(cx)) {
575 let separator_len = if !first {
576 buffer.extend_from_slice(SEPARATOR);
577 SEPARATOR.len()
578 } else {
579 0
580 };
581 serde_json::to_writer(buffer, &item).expect("failed to serialize item");
584 Poll::Ready(Some(separator_len))
585 } else {
586 Poll::Ready(None)
587 }
588 }
589}
590
/// `PacketSerializer` that emits JSON arrays built from the items of the stream `I`.
pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;

impl<I> JsonPacketSerializer<I> {
    /// Creates a serializer over `items` that limits packets to `max_packet_size` bytes and
    /// reports to `stats`.
    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
        Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
    }

    /// Same as `new`, but without any stats reporting.
    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
        Self::with_format(None, max_packet_size, JsonPacketFormat(items))
    }
}
602
603#[cfg(test)]
604mod tests {
605 use super::*;
606 use crate::diagnostics::AccessorStats;
607 use futures::stream::iter;
608
609 #[fuchsia::test]
610 async fn time_limited_chunks_yields_on_capacity() {
611 let stream = futures::stream::iter(vec![1, 2, 3, 4]);
612 let mut chunks =
613 Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));
614
615 assert_eq!(chunks.next().await, Some(vec![1, 2]));
616 assert_eq!(chunks.next().await, Some(vec![3, 4]));
617 assert_eq!(chunks.next().await, None);
618 }
619
620 #[fuchsia::test]
621 async fn time_limited_chunks_yields_on_stream_end() {
622 let stream = futures::stream::iter(vec![1, 2, 3]);
623 let mut chunks =
624 Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));
625
626 assert_eq!(chunks.next().await, Some(vec![1, 2]));
627 assert_eq!(chunks.next().await, Some(vec![3]));
628 assert_eq!(chunks.next().await, None);
629 }
630
631 #[fuchsia::test]
632 async fn time_limited_chunks_yields_on_timeout() {
633 let (tx, rx) = futures::channel::mpsc::unbounded();
634 let mut chunks =
635 Box::pin(TimeLimitedChunks::new(rx, 2, std::time::Duration::from_millis(10)));
636
637 tx.unbounded_send(1).unwrap();
638
639 let start = std::time::Instant::now();
640 assert_eq!(chunks.next().await, Some(vec![1]));
641 assert!(start.elapsed() >= std::time::Duration::from_millis(10));
642
643 tx.unbounded_send(2).unwrap();
644 tx.unbounded_send(3).unwrap();
645 assert_eq!(chunks.next().await, Some(vec![2, 3]));
646
647 drop(tx);
648 assert_eq!(chunks.next().await, None);
649 }
650
651 #[fuchsia::test]
652 async fn two_items_joined_and_split() {
653 let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
654 let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
655 let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
656 let smallest_possible_joined_len = joined[0].len() as u64;
657
658 let make_packets = |max| async move {
659 let node = fuchsia_inspect::Node::default();
660 let accessor_stats = Arc::new(AccessorStats::new(node));
661 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
662 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
663 .collect::<Vec<_>>()
664 .await
665 .into_iter()
666 .map(|r| {
667 let result = r.unwrap();
668 let mut buf = vec![0; result.size as usize];
669 result.vmo.read(&mut buf, 0).expect("reading vmo");
670 std::str::from_utf8(&buf).unwrap().to_string()
671 })
672 .collect::<Vec<_>>()
673 };
674
675 let actual_joined = make_packets(smallest_possible_joined_len).await;
676 assert_eq!(&actual_joined[..], joined);
677
678 let actual_split = make_packets(smallest_possible_joined_len - 1).await;
679 assert_eq!(&actual_split[..], split);
680 }
681
682 #[fuchsia::test]
683 async fn overflow_separator_added() {
684 let inputs = &[&"A", &"B", &"C"];
685 let make_packets = |max| async move {
694 let node = fuchsia_inspect::Node::default();
695 let accessor_stats = Arc::new(AccessorStats::new(node));
696 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
697 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
698 .collect::<Vec<_>>()
699 .await
700 .into_iter()
701 .map(|r| {
702 let result = r.unwrap();
703 let mut buf = vec![0; result.size as usize];
704 result.vmo.read(&mut buf, 0).expect("reading vmo");
705 std::str::from_utf8(&buf).unwrap().to_string()
706 })
707 .collect::<Vec<_>>()
708 };
709
710 let packets = make_packets(8).await;
711 assert_eq!(packets.len(), 3);
712 assert_eq!(packets[0], r#"["A"]"#);
713 assert_eq!(packets[1], r#"["B"]"#);
714 assert_eq!(packets[2], r#"["C"]"#);
715
716 let packets = make_packets(10).await;
717 assert_eq!(packets.len(), 2);
718 assert_eq!(packets[0], "[\"A\",\n\"B\"]");
719 assert_eq!(packets[1], r#"["C"]"#);
720 }
721
722 #[fuchsia::test]
723 async fn oversize_item_not_dropped_incorrectly() {
724 let inputs = &[&"A", &"BCDEF"];
725 let make_packets = |max| async move {
736 let node = fuchsia_inspect::Node::default();
737 let accessor_stats = Arc::new(AccessorStats::new(node));
738 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
739 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
740 .collect::<Vec<_>>()
741 .await
742 .into_iter()
743 .map(|r| {
744 let result = r.unwrap();
745 let mut buf = vec![0; result.size as usize];
746 result.vmo.read(&mut buf, 0).expect("reading vmo");
747 std::str::from_utf8(&buf).unwrap().to_string()
748 })
749 .collect::<Vec<_>>()
750 };
751
752 let packets = make_packets(11).await;
753 assert_eq!(packets.len(), 2);
754 assert_eq!(packets[0], r#"["A"]"#);
755 assert_eq!(packets[1], r#"["BCDEF"]"#);
756 }
757
758 #[fuchsia::test]
759 async fn item_too_big_for_packet_is_dropped() {
760 let inputs = &[&"ABCDE"]; let make_packets = |max| async move {
765 let node = fuchsia_inspect::Node::default();
766 let accessor_stats = Arc::new(AccessorStats::new(node));
767 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
768 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
769 .collect::<Vec<_>>()
770 .await
771 .into_iter()
772 .map(|r| {
773 let result = r.unwrap();
774 let mut buf = vec![0; result.size as usize];
775 result.vmo.read(&mut buf, 0).expect("reading vmo");
776 std::str::from_utf8(&buf).unwrap().to_string()
777 })
778 .collect::<Vec<_>>()
779 };
780
781 let packets = make_packets(8).await;
782 assert_eq!(packets.len(), 0);
784 }
785
786 #[fuchsia::test]
787 async fn fxt_packet_serializer_subscribe_to_manifest() {
788 use crate::identity::ComponentIdentity;
789 use crate::logs::shared_buffer::{SharedBuffer, create_ring_buffer};
790 use crate::logs::stats::LogStreamStats;
791 use diagnostics_log_encoding::encode::{Encoder, EncoderOpts};
792 use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
793 use fidl_fuchsia_diagnostics::StreamMode;
794 use fidl_fuchsia_diagnostics_types::Severity;
795 use fuchsia_inspect::Node;
796 use std::io::Cursor;
797 use zerocopy::{FromBytes, IntoBytes};
798
799 let identity1 = Arc::new(ComponentIdentity::unknown());
800 let mut identity2_inner = ComponentIdentity::unknown();
801 identity2_inner.url =
802 flyweights::FlyStr::new("fuchsia-pkg://fuchsia.com/test#meta/test.cm");
803 let identity2 = Arc::new(identity2_inner);
804
805 let buffer = Arc::new(SharedBuffer::new(
806 create_ring_buffer(65536),
807 Box::new(|_| {}),
808 Default::default(),
809 &Node::default(),
810 ));
811 let stats1 = Arc::new(LogStreamStats::new(&Node::default(), &identity1));
812 let container1 = buffer.new_container_buffer(Arc::clone(&identity1), stats1);
813 let stats2 = Arc::new(LogStreamStats::new(&Node::default(), &identity2));
814 let container2 = buffer.new_container_buffer(Arc::clone(&identity2), stats2);
815
816 let mut buffer_out = Cursor::new(vec![0u8; 128]);
818 let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
819 encoder
820 .write_record(Record {
821 timestamp: zx::BootInstant::from_nanos(100),
822 severity: Severity::Info.into_primitive(),
823 arguments: vec![Argument::other("msg", "hello")],
824 })
825 .unwrap();
826 let end = encoder.inner().position() as usize;
827 let mut msg1_bytes = encoder.inner().get_ref()[..end].to_vec();
828 let mut header = Header::read_from_bytes(&msg1_bytes[0..8]).unwrap();
829 header.set_tag(1);
830 msg1_bytes[0..8].copy_from_slice(header.as_bytes());
831 container1.push_back(&msg1_bytes);
832
833 let mut buffer_out = Cursor::new(vec![0u8; 128]);
835 let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
836 encoder
837 .write_record(Record {
838 timestamp: zx::BootInstant::from_nanos(200),
839 severity: Severity::Info.into_primitive(),
840 arguments: vec![Argument::other("msg", "world")],
841 })
842 .unwrap();
843 let end = encoder.inner().position() as usize;
844 let mut msg2_bytes = encoder.inner().get_ref()[..end].to_vec();
845 let mut header = Header::read_from_bytes(&msg2_bytes[0..8]).unwrap();
846 header.set_tag(2);
847 msg2_bytes[0..8].copy_from_slice(header.as_bytes());
848 container2.push_back(&msg2_bytes);
849
850 let mut buffer_out = Cursor::new(vec![0u8; 128]);
852 let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
853 encoder
854 .write_record(Record {
855 timestamp: zx::BootInstant::from_nanos(300),
856 severity: Severity::Info.into_primitive(),
857 arguments: vec![Argument::other("msg", "again")],
858 })
859 .unwrap();
860 let end = encoder.inner().position() as usize;
861 let mut msg3_bytes = encoder.inner().get_ref()[..end].to_vec();
862 let mut header = Header::read_from_bytes(&msg3_bytes[0..8]).unwrap();
863 header.set_tag(2);
864 msg3_bytes[0..8].copy_from_slice(header.as_bytes());
865 container2.push_back(&msg3_bytes);
866
867 let cursor = buffer.cursor(StreamMode::Snapshot, vec![]);
868
869 let node = Node::default();
870 let accessor_stats = Arc::new(AccessorStats::new(node));
871 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
872
873 let packets: Vec<_> =
875 FxtPacketSerializer::new(Arc::clone(&test_stats), 1024 * 1024, cursor, true)
876 .collect::<Vec<_>>()
877 .await
878 .into_iter()
879 .map(|r| {
880 let result = r.unwrap();
881 let mut buf = vec![0; result.size as usize];
882 result.vmo.read(&mut buf, 0).expect("reading vmo");
883 buf
884 })
885 .collect();
886
887 assert_eq!(packets.len(), 1);
888 let output = &packets[0];
889
890 let mut current_slice = output.as_slice();
891 let mut records = vec![];
892
893 while !current_slice.is_empty() {
894 let (record, remaining) =
895 diagnostics_log_encoding::parse::parse_record(current_slice).unwrap();
896 let header =
897 diagnostics_log_encoding::Header::read_from_bytes(¤t_slice[..8]).unwrap();
898 records.push((header, record));
899 current_slice = remaining;
900 }
901
902 assert_eq!(records.len(), 5);
903
904 assert_ne!(records[0].0.tag() & LOG_CONTROL_BIT, 0);
906 assert_eq!(records[0].0.tag() & !LOG_CONTROL_BIT, 0);
907
908 assert_eq!(records[1].0.tag(), 0);
910 assert_eq!(records[1].1.arguments.len(), 1); assert_ne!(records[2].0.tag() & LOG_CONTROL_BIT, 0);
914 assert_eq!(records[2].0.tag() & !LOG_CONTROL_BIT, 1);
915
916 assert_eq!(records[3].0.tag(), 1);
918 assert_eq!(records[3].1.arguments.len(), 1); assert_eq!(records[4].0.tag(), 1);
922 assert_eq!(records[4].1.arguments.len(), 1); }
924}