1use crate::diagnostics::BatchIteratorConnectionStats;
5use crate::error::AccessorError;
6use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
7use crate::logs::shared_buffer::FilterCursor;
8use diagnostics_log_encoding::Header;
9use fidl_fuchsia_diagnostics::{
10 DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
11};
12
13use fuchsia_async as fasync;
14use futures::{Stream, StreamExt};
15use log::warn;
16use pin_project::pin_project;
17use serde::Serialize;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll, ready};
21use zerocopy::{FromBytes, IntoBytes};
22use zx;
23
24static SERIALIZED_DATA_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-serialized-data");
25static PACKET_BUFFER_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-packet-buffer");
26
27const SNAPSHOT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15);
28
29pub type FormattedStream =
30 Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;
31
32#[pin_project]
33pub struct FormattedContentBatcher<C> {
34 #[pin]
35 items: C,
36 stats: Arc<BatchIteratorConnectionStats>,
37}
38
39pub fn new_batcher<I, T, E>(
48 items: I,
49 stats: Arc<BatchIteratorConnectionStats>,
50 mode: StreamMode,
51) -> FormattedStream
52where
53 I: Stream<Item = Result<T, E>> + Send + 'static,
54 T: Into<FormattedContent> + Send,
55 E: Into<AccessorError> + Send,
56{
57 match mode {
58 StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
59 Box::pin(FormattedContentBatcher {
60 items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
61 stats,
62 })
63 }
64 StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
65 items: items.time_limited_chunks(MAXIMUM_ENTRIES_PER_BATCH as usize, SNAPSHOT_TIMEOUT),
66 stats,
67 }),
68 }
69}
70
71impl<I, T, E> Stream for FormattedContentBatcher<I>
72where
73 I: Stream<Item = Vec<Result<T, E>>>,
74 T: Into<FormattedContent>,
75 E: Into<AccessorError>,
76{
77 type Item = Vec<Result<FormattedContent, AccessorError>>;
78
79 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80 let this = self.project();
81 match this.items.poll_next(cx) {
82 Poll::Ready(Some(chunk)) => {
83 let mut batch = vec![];
85 for item in chunk {
86 let result = match item {
87 Ok(i) => Ok(i.into()),
88 Err(e) => {
89 this.stats.add_result_error();
90 Err(e.into())
91 }
92 };
93 batch.push(result);
94 }
95 Poll::Ready(Some(batch))
96 }
97 Poll::Ready(None) => Poll::Ready(None),
98 Poll::Pending => Poll::Pending,
99 }
100 }
101}
102
/// Tracks whether the flush timer of a `TimeLimitedChunks` is currently armed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerState {
    /// No deadline is pending; the timer is not polled in this state.
    Idle,
    /// A deadline has been set for the items currently buffered.
    Running,
}
108
109pub trait TimeLimitedChunksExt: Stream {
110 fn time_limited_chunks(
111 self,
112 capacity: usize,
113 timeout: std::time::Duration,
114 ) -> TimeLimitedChunks<Self>
115 where
116 Self: Sized,
117 {
118 TimeLimitedChunks::new(self, capacity, timeout)
119 }
120}
121
122impl<T: Stream> TimeLimitedChunksExt for T {}
123
124#[pin_project]
129pub struct TimeLimitedChunks<S: Stream> {
130 #[pin]
131 stream: S,
132 capacity: usize,
133 timeout: std::time::Duration,
134 buffer: Vec<S::Item>,
135 #[pin]
136 timer: fasync::Timer,
137 state: TimerState,
138}
139
140impl<S: Stream> TimeLimitedChunks<S> {
141 pub fn new(stream: S, capacity: usize, timeout: std::time::Duration) -> Self {
142 Self {
143 stream,
144 capacity,
145 timeout,
146 buffer: Vec::with_capacity(capacity),
147 timer: fasync::Timer::new(std::time::Instant::now() + timeout),
148 state: TimerState::Idle,
149 }
150 }
151}
152
153impl<S: Stream> Stream for TimeLimitedChunks<S> {
154 type Item = Vec<S::Item>;
155
156 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157 let mut this = self.project();
158
159 loop {
160 match this.stream.as_mut().poll_next(cx) {
161 Poll::Ready(Some(item)) => {
162 if this.buffer.is_empty() {
163 let deadline = fasync::MonotonicInstant::after(
164 zx::MonotonicDuration::from_nanos(this.timeout.as_nanos() as i64),
165 );
166 this.timer.as_mut().reset(deadline);
167 *this.state = TimerState::Running;
168 }
169 this.buffer.push(item);
170 if this.buffer.len() >= *this.capacity {
171 *this.state = TimerState::Idle;
172 return Poll::Ready(Some(std::mem::take(this.buffer)));
173 }
174 }
175 Poll::Ready(None) => {
176 if !this.buffer.is_empty() {
177 *this.state = TimerState::Idle;
178 return Poll::Ready(Some(std::mem::take(this.buffer)));
179 }
180 return Poll::Ready(None);
181 }
182 Poll::Pending => {
183 if !this.buffer.is_empty() && *this.state == TimerState::Running {
184 use std::future::Future;
185 if this.timer.as_mut().poll(cx).is_ready() {
186 *this.state = TimerState::Idle;
187 return Poll::Ready(Some(std::mem::take(this.buffer)));
188 }
189 }
190 return Poll::Pending;
191 }
192 }
193 }
194 }
195}
196
197pub struct SerializedVmo {
199 pub vmo: zx::Vmo,
200 pub size: u64,
201 format: Format,
202}
203
204impl SerializedVmo {
205 pub fn serialize(
206 source: &impl Serialize,
207 data_type: DataType,
208 format: Format,
209 ) -> Result<Self, AccessorError> {
210 let initial_buffer_capacity = match data_type {
211 DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
212 DataType::Logs => 4096, };
216 let mut buffer = Vec::with_capacity(initial_buffer_capacity);
217 match format {
218 Format::Json => {
219 serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
220 }
221 Format::Cbor => ciborium::into_writer(source, &mut buffer)
222 .map_err(|err| AccessorError::CborSerialization(err.into()))?,
223 Format::Text => unreachable!("We'll never get Text"),
224 Format::Fxt => unreachable!("We'll never get FXT"),
225 }
226 let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
227 let _ = vmo.set_name(&SERIALIZED_DATA_VMO_NAME);
228 vmo.write(&buffer, 0).unwrap();
229 Ok(Self { vmo, size: buffer.len() as u64, format })
230 }
231}
232
233impl From<SerializedVmo> for FormattedContent {
234 fn from(content: SerializedVmo) -> FormattedContent {
235 match content.format {
236 Format::Json => {
237 content
239 .vmo
240 .set_content_size(&content.size)
241 .expect("set_content_size always returns Ok");
242 FormattedContent::Json(fidl_fuchsia_mem::Buffer {
243 vmo: content.vmo,
244 size: content.size,
245 })
246 }
247 Format::Cbor => {
248 content
249 .vmo
250 .set_content_size(&content.size)
251 .expect("set_content_size always returns Ok");
252 FormattedContent::Cbor(content.vmo)
253 }
254 Format::Fxt => {
255 content
256 .vmo
257 .set_content_size(&content.size)
258 .expect("set_content_size always returns Ok");
259 FormattedContent::Fxt(content.vmo)
260 }
261 Format::Text => unreachable!("We'll never get Text"),
262 }
263 }
264}
265
266trait PacketFormat {
267 const FORMAT: Format;
268 const HEADER: &[u8] = &[];
269 const FOOTER: &[u8] = &[];
270
271 fn write_item(
275 self: Pin<&mut Self>,
276 cx: &mut Context<'_>,
277 first: bool,
278 buffer: &mut Vec<u8>,
279 ) -> Poll<Option<usize>>;
280}
281
282#[pin_project]
283pub struct PacketSerializer<T> {
284 stats: Option<Arc<BatchIteratorConnectionStats>>,
285 max_packet_size: u64,
286 #[pin]
287 format: T,
288 overflow: Vec<u8>,
289 finished: bool,
290}
291
292impl<T> PacketSerializer<T> {
293 fn with_format(
294 stats: Option<Arc<BatchIteratorConnectionStats>>,
295 max_packet_size: u64,
296 format: T,
297 ) -> Self {
298 Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
299 }
300}
301
302impl<T: PacketFormat> Stream for PacketSerializer<T> {
303 type Item = Result<SerializedVmo, AccessorError>;
304
305 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
306 if self.finished {
307 return Poll::Ready(None);
308 }
309
310 const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20; let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
313 let mut this = self.project();
314
315 let mut buffer = Vec::with_capacity(256 * 1024);
316 buffer.extend_from_slice(T::HEADER);
317
318 let mut results = 0;
319
320 if !this.overflow.is_empty() {
321 buffer.append(this.overflow);
322 results += 1;
323 }
324
325 let mut vmo = None;
326 let mut vmo_len = 0;
327
328 loop {
329 if buffer.capacity() - buffer.len() < 512 {
331 vmo.get_or_insert_with(|| {
332 let v = zx::Vmo::create(max_packet_size).unwrap();
333 let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
334 v
335 })
336 .write(&buffer, vmo_len as u64)
337 .unwrap();
338 vmo_len += buffer.len();
339 buffer.clear();
340 }
341
342 let last_len = buffer.len();
343
344 let separator_len = match this.format.as_mut().write_item(cx, results == 0, &mut buffer)
345 {
346 Poll::Ready(Some(separator_len)) => separator_len,
347 Poll::Ready(None) => {
348 *this.finished = true;
349 if results == 0 {
350 return Poll::Ready(None);
351 } else {
352 break;
353 }
354 }
355 Poll::Pending => {
356 if results == 0 {
357 return Poll::Pending;
358 } else {
359 break;
360 }
361 }
362 };
363
364 let item_len = buffer.len() - last_len - separator_len;
365
366 if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
367 warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
368 buffer.truncate(last_len);
369 } else {
370 if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
371 assert!(results > 0);
375 this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
376 buffer.truncate(last_len);
377 break;
378 }
379
380 results += 1;
381 }
382 }
383
384 if let Some(stats) = &this.stats {
385 stats.add_result(results);
386 }
387
388 buffer.extend_from_slice(T::FOOTER);
389
390 let vmo = match vmo {
391 Some(vmo) => {
392 vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
393 vmo
394 }
395 None => {
396 let v = zx::Vmo::create(buffer.len() as u64).unwrap();
397 let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
398 v
399 }
400 };
401 vmo.write(&buffer, vmo_len as u64).unwrap();
402 vmo_len += buffer.len();
403 Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
404 }
405}
406
407#[pin_project]
408pub struct FxtPacketFormat {
409 #[pin]
410 pub cursor: FilterCursor,
411 pub subscribe_to_manifest: bool,
412 pub sent_tags: std::collections::HashMap<u64, Arc<crate::identity::ComponentIdentity>>,
413}
414
415impl PacketFormat for FxtPacketFormat {
416 const FORMAT: Format = Format::Fxt;
417
418 fn write_item(
419 self: Pin<&mut Self>,
420 cx: &mut Context<'_>,
421 _first: bool,
422 buffer: &mut Vec<u8>,
423 ) -> Poll<Option<usize>> {
424 let mut this = self.project();
425 loop {
426 let Some(message) = ready!(this.cursor.as_mut().poll_next(cx)) else {
427 return Poll::Ready(None);
428 };
429
430 let (identity, header, data) = match message.parse() {
431 Ok(res) => res,
432 Err(_) => {
433 continue;
436 }
437 };
438 let tag = header.tag() as u64;
439
440 if *this.subscribe_to_manifest {
441 let send_manifest = match this.sent_tags.entry(tag) {
442 std::collections::hash_map::Entry::Vacant(e) => {
443 e.insert(Arc::clone(identity));
444 true
445 }
446 std::collections::hash_map::Entry::Occupied(mut e) => {
447 if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
448 e.insert(Arc::clone(identity));
449 true
450 } else {
451 false
452 }
453 }
454 };
455
456 if send_manifest {
457 use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
458 use diagnostics_log_encoding::{Argument, LOG_CONTROL_BIT, Record};
459 use fidl_fuchsia_diagnostics_types::Severity;
460 use std::io::Cursor;
461
462 let mut encoder = Encoder::new(
463 Cursor::new(ResizableBuffer::from(Vec::new())),
464 EncoderOpts::default(),
465 );
466 let record = Record {
467 timestamp: zx::BootInstant::from_nanos(0),
468 severity: Severity::Info.into_primitive(),
469 arguments: vec![
470 Argument::other("moniker", identity.moniker.to_string()),
471 Argument::other("url", identity.url.as_str()),
472 ],
473 };
474 encoder.write_record(record).unwrap();
475 let mut manifest_buffer = encoder.take().into_inner().into_inner();
476 if manifest_buffer.len() >= 8 {
477 let mut header_manifest =
478 Header::read_from_bytes(&manifest_buffer[0..8]).unwrap();
479 header_manifest.set_tag((tag as u32) | LOG_CONTROL_BIT);
480 manifest_buffer[0..8].copy_from_slice(header_manifest.as_bytes());
481 }
482 buffer.extend_from_slice(&manifest_buffer);
483 }
484 }
485
486 buffer.extend_from_slice(header.as_bytes());
487 buffer.extend_from_slice(&data[8..]);
488
489 extend_fxt_record(
490 identity,
491 message.dropped,
492 &ExtendRecordOpts {
493 component_url: !*this.subscribe_to_manifest,
494 moniker: !*this.subscribe_to_manifest,
495 rolled_out: !*this.subscribe_to_manifest,
496 subscribe_to_manifest: false,
497 },
498 buffer,
499 );
500 return Poll::Ready(Some(0));
501 }
502 }
503}
504
505pub type FxtPacketSerializer = PacketSerializer<FxtPacketFormat>;
506
507impl FxtPacketSerializer {
508 pub fn new(
509 stats: Arc<BatchIteratorConnectionStats>,
510 max_packet_size: u64,
511 cursor: FilterCursor,
512 subscribe_to_manifest: bool,
513 ) -> Self {
514 Self::with_format(
515 Some(stats),
516 max_packet_size,
517 FxtPacketFormat {
518 cursor,
519 subscribe_to_manifest,
520 sent_tags: std::collections::HashMap::new(),
521 },
522 )
523 }
524}
525
526#[pin_project]
527pub struct JsonPacketFormat<I>(#[pin] I);
528
529impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
530 const FORMAT: Format = Format::Json;
531 const HEADER: &[u8] = b"[";
532 const FOOTER: &[u8] = b"]";
533
534 fn write_item(
535 self: Pin<&mut Self>,
536 cx: &mut Context<'_>,
537 first: bool,
538 buffer: &mut Vec<u8>,
539 ) -> Poll<Option<usize>> {
540 const SEPARATOR: &[u8] = b",\n";
541
542 if let Some(item) = ready!(self.project().0.poll_next(cx)) {
543 let separator_len = if !first {
544 buffer.extend_from_slice(SEPARATOR);
545 SEPARATOR.len()
546 } else {
547 0
548 };
549 serde_json::to_writer(buffer, &item).expect("failed to serialize item");
552 Poll::Ready(Some(separator_len))
553 } else {
554 Poll::Ready(None)
555 }
556 }
557}
558
559pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;
560
561impl<I> JsonPacketSerializer<I> {
562 pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
563 Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
564 }
565
566 pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
567 Self::with_format(None, max_packet_size, JsonPacketFormat(items))
568 }
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use futures::stream::iter;

    /// Runs `inputs` through a `JsonPacketSerializer` limited to
    /// `max_packet_size` bytes and returns the contents of every produced
    /// packet as a string.
    async fn collect_json_packets<T: Serialize>(
        inputs: &[T],
        max_packet_size: u64,
    ) -> Vec<String> {
        let node = fuchsia_inspect::Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
        JsonPacketSerializer::new(test_stats, max_packet_size, iter(inputs.iter()))
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .map(|r| {
                let result = r.unwrap();
                let mut buf = vec![0; result.size as usize];
                result.vmo.read(&mut buf, 0).expect("reading vmo");
                std::str::from_utf8(&buf).unwrap().to_string()
            })
            .collect::<Vec<_>>()
    }

    #[fuchsia::test]
    async fn time_limited_chunks_yields_on_capacity() {
        let stream = futures::stream::iter(vec![1, 2, 3, 4]);
        let mut chunks =
            Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));

        assert_eq!(chunks.next().await, Some(vec![1, 2]));
        assert_eq!(chunks.next().await, Some(vec![3, 4]));
        assert_eq!(chunks.next().await, None);
    }

    #[fuchsia::test]
    async fn time_limited_chunks_yields_on_stream_end() {
        let stream = futures::stream::iter(vec![1, 2, 3]);
        let mut chunks =
            Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));

        assert_eq!(chunks.next().await, Some(vec![1, 2]));
        assert_eq!(chunks.next().await, Some(vec![3]));
        assert_eq!(chunks.next().await, None);
    }

    #[fuchsia::test]
    async fn time_limited_chunks_yields_on_timeout() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut chunks =
            Box::pin(TimeLimitedChunks::new(rx, 2, std::time::Duration::from_millis(10)));

        tx.unbounded_send(1).unwrap();

        // A single item is below capacity, so it is only flushed by the timer.
        let start = std::time::Instant::now();
        assert_eq!(chunks.next().await, Some(vec![1]));
        assert!(start.elapsed() >= std::time::Duration::from_millis(10));

        tx.unbounded_send(2).unwrap();
        tx.unbounded_send(3).unwrap();
        assert_eq!(chunks.next().await, Some(vec![2, 3]));

        drop(tx);
        assert_eq!(chunks.next().await, None);
    }

    #[fuchsia::test]
    async fn two_items_joined_and_split() {
        let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len() as u64;

        let actual_joined = collect_json_packets(inputs, smallest_possible_joined_len).await;
        assert_eq!(&actual_joined[..], joined);

        // One byte less and the two items no longer fit in a single packet.
        let actual_split = collect_json_packets(inputs, smallest_possible_joined_len - 1).await;
        assert_eq!(&actual_split[..], split);
    }

    #[fuchsia::test]
    async fn overflow_separator_added() {
        let inputs = &[&"A", &"B", &"C"];

        let packets = collect_json_packets(inputs, 8).await;
        assert_eq!(packets.len(), 3);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["B"]"#);
        assert_eq!(packets[2], r#"["C"]"#);

        let packets = collect_json_packets(inputs, 10).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], "[\"A\",\n\"B\"]");
        assert_eq!(packets[1], r#"["C"]"#);
    }

    #[fuchsia::test]
    async fn oversize_item_not_dropped_incorrectly() {
        let inputs = &[&"A", &"BCDEF"];

        let packets = collect_json_packets(inputs, 11).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["BCDEF"]"#);
    }

    #[fuchsia::test]
    async fn item_too_big_for_packet_is_dropped() {
        let inputs = &[&"ABCDE"];

        let packets = collect_json_packets(inputs, 8).await;
        assert_eq!(packets.len(), 0);
    }

    #[fuchsia::test]
    async fn fxt_packet_serializer_subscribe_to_manifest() {
        use crate::identity::ComponentIdentity;
        use crate::logs::shared_buffer::{SharedBuffer, create_ring_buffer};
        use crate::logs::stats::LogStreamStats;
        use diagnostics_log_encoding::encode::{Encoder, EncoderOpts};
        use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
        use fidl_fuchsia_diagnostics::StreamMode;
        use fidl_fuchsia_diagnostics_types::Severity;
        use fuchsia_inspect::Node;
        use std::io::Cursor;
        use zerocopy::{FromBytes, IntoBytes};

        let identity1 = Arc::new(ComponentIdentity::unknown());
        let mut identity2_inner = ComponentIdentity::unknown();
        identity2_inner.url =
            flyweights::FlyStr::new("fuchsia-pkg://fuchsia.com/test#meta/test.cm");
        let identity2 = Arc::new(identity2_inner);

        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            Default::default(),
            &Node::default(),
        ));
        let stats1 = Arc::new(LogStreamStats::new(&Node::default(), &identity1));
        let container1 = buffer.new_container_buffer(Arc::clone(&identity1), stats1);
        let stats2 = Arc::new(LogStreamStats::new(&Node::default(), &identity2));
        let container2 = buffer.new_container_buffer(Arc::clone(&identity2), stats2);

        // First message, pushed through the first component's container.
        let mut buffer_out = Cursor::new(vec![0u8; 128]);
        let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
        encoder
            .write_record(Record {
                timestamp: zx::BootInstant::from_nanos(100),
                severity: Severity::Info.into_primitive(),
                arguments: vec![Argument::other("msg", "hello")],
            })
            .unwrap();
        let end = encoder.inner().position() as usize;
        let mut msg1_bytes = encoder.inner().get_ref()[..end].to_vec();
        let mut header = Header::read_from_bytes(&msg1_bytes[0..8]).unwrap();
        header.set_tag(1);
        msg1_bytes[0..8].copy_from_slice(header.as_bytes());
        container1.push_back(&msg1_bytes);

        // Second message, pushed through the second component's container.
        let mut buffer_out = Cursor::new(vec![0u8; 128]);
        let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
        encoder
            .write_record(Record {
                timestamp: zx::BootInstant::from_nanos(200),
                severity: Severity::Info.into_primitive(),
                arguments: vec![Argument::other("msg", "world")],
            })
            .unwrap();
        let end = encoder.inner().position() as usize;
        let mut msg2_bytes = encoder.inner().get_ref()[..end].to_vec();
        let mut header = Header::read_from_bytes(&msg2_bytes[0..8]).unwrap();
        header.set_tag(2);
        msg2_bytes[0..8].copy_from_slice(header.as_bytes());
        container2.push_back(&msg2_bytes);

        // Third message, again through the second component's container.
        let mut buffer_out = Cursor::new(vec![0u8; 128]);
        let mut encoder = Encoder::new(&mut buffer_out, EncoderOpts::default());
        encoder
            .write_record(Record {
                timestamp: zx::BootInstant::from_nanos(300),
                severity: Severity::Info.into_primitive(),
                arguments: vec![Argument::other("msg", "again")],
            })
            .unwrap();
        let end = encoder.inner().position() as usize;
        let mut msg3_bytes = encoder.inner().get_ref()[..end].to_vec();
        let mut header = Header::read_from_bytes(&msg3_bytes[0..8]).unwrap();
        header.set_tag(2);
        msg3_bytes[0..8].copy_from_slice(header.as_bytes());
        container2.push_back(&msg3_bytes);

        let cursor = buffer.cursor(StreamMode::Snapshot, vec![]);

        let node = Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());

        let packets: Vec<_> =
            FxtPacketSerializer::new(Arc::clone(&test_stats), 1024 * 1024, cursor, true)
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    buf
                })
                .collect();

        assert_eq!(packets.len(), 1);
        let output = &packets[0];

        // Split the packet back into individual records, keeping each raw header.
        let mut current_slice = output.as_slice();
        let mut records = vec![];

        while !current_slice.is_empty() {
            let (record, remaining) =
                diagnostics_log_encoding::parse::parse_record(current_slice).unwrap();
            let header =
                diagnostics_log_encoding::Header::read_from_bytes(&current_slice[..8]).unwrap();
            records.push((header, record));
            current_slice = remaining;
        }

        assert_eq!(records.len(), 5);

        // Record 0: control (manifest) record for tag 0.
        assert_ne!(records[0].0.tag() & LOG_CONTROL_BIT, 0);
        assert_eq!(records[0].0.tag() & !LOG_CONTROL_BIT, 0);

        // Record 1: log record with tag 0 and a single argument.
        assert_eq!(records[1].0.tag(), 0);
        assert_eq!(records[1].1.arguments.len(), 1);

        // Record 2: control (manifest) record for tag 1.
        assert_ne!(records[2].0.tag() & LOG_CONTROL_BIT, 0);
        assert_eq!(records[2].0.tag() & !LOG_CONTROL_BIT, 1);

        // Records 3 and 4: log records with tag 1; no further manifest is emitted.
        assert_eq!(records[3].0.tag(), 1);
        assert_eq!(records[3].1.arguments.len(), 1);
        assert_eq!(records[4].0.tag(), 1);
        assert_eq!(records[4].1.arguments.len(), 1);
    }
}