1use crate::diagnostics::BatchIteratorConnectionStats;
5use crate::error::AccessorError;
6use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
7use crate::logs::shared_buffer::FxtMessage;
8use fidl_fuchsia_diagnostics::{
9 DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
10};
11
12use fuchsia_async as fasync;
13use futures::{Stream, StreamExt};
14use log::warn;
15use pin_project::pin_project;
16use serde::Serialize;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll, ready};
20use zx;
21
22static SERIALIZED_DATA_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-serialized-data");
23static PACKET_BUFFER_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-packet-buffer");
24
25const SNAPSHOT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15);
26
27pub type FormattedStream =
28 Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;
29
30#[pin_project]
31pub struct FormattedContentBatcher<C> {
32 #[pin]
33 items: C,
34 stats: Arc<BatchIteratorConnectionStats>,
35}
36
37pub fn new_batcher<I, T, E>(
46 items: I,
47 stats: Arc<BatchIteratorConnectionStats>,
48 mode: StreamMode,
49) -> FormattedStream
50where
51 I: Stream<Item = Result<T, E>> + Send + 'static,
52 T: Into<FormattedContent> + Send,
53 E: Into<AccessorError> + Send,
54{
55 match mode {
56 StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
57 Box::pin(FormattedContentBatcher {
58 items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
59 stats,
60 })
61 }
62 StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
63 items: items.time_limited_chunks(MAXIMUM_ENTRIES_PER_BATCH as usize, SNAPSHOT_TIMEOUT),
64 stats,
65 }),
66 }
67}
68
69impl<I, T, E> Stream for FormattedContentBatcher<I>
70where
71 I: Stream<Item = Vec<Result<T, E>>>,
72 T: Into<FormattedContent>,
73 E: Into<AccessorError>,
74{
75 type Item = Vec<Result<FormattedContent, AccessorError>>;
76
77 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78 let this = self.project();
79 match this.items.poll_next(cx) {
80 Poll::Ready(Some(chunk)) => {
81 let mut batch = vec![];
83 for item in chunk {
84 let result = match item {
85 Ok(i) => Ok(i.into()),
86 Err(e) => {
87 this.stats.add_result_error();
88 Err(e.into())
89 }
90 };
91 batch.push(result);
92 }
93 Poll::Ready(Some(batch))
94 }
95 Poll::Ready(None) => Poll::Ready(None),
96 Poll::Pending => Poll::Pending,
97 }
98 }
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
102enum TimerState {
103 Idle,
104 Running,
105}
106
107pub trait TimeLimitedChunksExt: Stream {
108 fn time_limited_chunks(
109 self,
110 capacity: usize,
111 timeout: std::time::Duration,
112 ) -> TimeLimitedChunks<Self>
113 where
114 Self: Sized,
115 {
116 TimeLimitedChunks::new(self, capacity, timeout)
117 }
118}
119
120impl<T: Stream> TimeLimitedChunksExt for T {}
121
122#[pin_project]
127pub struct TimeLimitedChunks<S: Stream> {
128 #[pin]
129 stream: S,
130 capacity: usize,
131 timeout: std::time::Duration,
132 buffer: Vec<S::Item>,
133 #[pin]
134 timer: fasync::Timer,
135 state: TimerState,
136}
137
138impl<S: Stream> TimeLimitedChunks<S> {
139 pub fn new(stream: S, capacity: usize, timeout: std::time::Duration) -> Self {
140 Self {
141 stream,
142 capacity,
143 timeout,
144 buffer: Vec::with_capacity(capacity),
145 timer: fasync::Timer::new(std::time::Instant::now() + timeout),
146 state: TimerState::Idle,
147 }
148 }
149}
150
151impl<S: Stream> Stream for TimeLimitedChunks<S> {
152 type Item = Vec<S::Item>;
153
154 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
155 let mut this = self.project();
156
157 loop {
158 match this.stream.as_mut().poll_next(cx) {
159 Poll::Ready(Some(item)) => {
160 if this.buffer.is_empty() {
161 let deadline = fasync::MonotonicInstant::after(
162 zx::MonotonicDuration::from_nanos(this.timeout.as_nanos() as i64),
163 );
164 this.timer.as_mut().reset(deadline);
165 *this.state = TimerState::Running;
166 }
167 this.buffer.push(item);
168 if this.buffer.len() >= *this.capacity {
169 *this.state = TimerState::Idle;
170 return Poll::Ready(Some(std::mem::take(this.buffer)));
171 }
172 }
173 Poll::Ready(None) => {
174 if !this.buffer.is_empty() {
175 *this.state = TimerState::Idle;
176 return Poll::Ready(Some(std::mem::take(this.buffer)));
177 }
178 return Poll::Ready(None);
179 }
180 Poll::Pending => {
181 if !this.buffer.is_empty() && *this.state == TimerState::Running {
182 use std::future::Future;
183 if this.timer.as_mut().poll(cx).is_ready() {
184 *this.state = TimerState::Idle;
185 return Poll::Ready(Some(std::mem::take(this.buffer)));
186 }
187 }
188 return Poll::Pending;
189 }
190 }
191 }
192 }
193}
194
195pub struct SerializedVmo {
197 pub vmo: zx::Vmo,
198 pub size: u64,
199 format: Format,
200}
201
202impl SerializedVmo {
203 pub fn serialize(
204 source: &impl Serialize,
205 data_type: DataType,
206 format: Format,
207 ) -> Result<Self, AccessorError> {
208 let initial_buffer_capacity = match data_type {
209 DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
210 DataType::Logs => 4096, };
214 let mut buffer = Vec::with_capacity(initial_buffer_capacity);
215 match format {
216 Format::Json => {
217 serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
218 }
219 Format::Cbor => ciborium::into_writer(source, &mut buffer)
220 .map_err(|err| AccessorError::CborSerialization(err.into()))?,
221 Format::Text => unreachable!("We'll never get Text"),
222 Format::Fxt => unreachable!("We'll never get FXT"),
223 }
224 let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
225 let _ = vmo.set_name(&SERIALIZED_DATA_VMO_NAME);
226 vmo.write(&buffer, 0).unwrap();
227 Ok(Self { vmo, size: buffer.len() as u64, format })
228 }
229}
230
231impl From<SerializedVmo> for FormattedContent {
232 fn from(content: SerializedVmo) -> FormattedContent {
233 match content.format {
234 Format::Json => {
235 content
237 .vmo
238 .set_content_size(&content.size)
239 .expect("set_content_size always returns Ok");
240 FormattedContent::Json(fidl_fuchsia_mem::Buffer {
241 vmo: content.vmo,
242 size: content.size,
243 })
244 }
245 Format::Cbor => {
246 content
247 .vmo
248 .set_content_size(&content.size)
249 .expect("set_content_size always returns Ok");
250 FormattedContent::Cbor(content.vmo)
251 }
252 Format::Fxt => {
253 content
254 .vmo
255 .set_content_size(&content.size)
256 .expect("set_content_size always returns Ok");
257 FormattedContent::Fxt(content.vmo)
258 }
259 Format::Text => unreachable!("We'll never get Text"),
260 }
261 }
262}
263
264trait PacketFormat {
265 const FORMAT: Format;
266 const HEADER: &[u8] = &[];
267 const FOOTER: &[u8] = &[];
268
269 fn write_item(
273 self: Pin<&mut Self>,
274 cx: &mut Context<'_>,
275 first: bool,
276 buffer: &mut Vec<u8>,
277 ) -> Poll<Option<usize>>;
278}
279
280#[pin_project]
281pub struct PacketSerializer<T> {
282 stats: Option<Arc<BatchIteratorConnectionStats>>,
283 max_packet_size: u64,
284 #[pin]
285 format: T,
286 overflow: Vec<u8>,
287 finished: bool,
288}
289
290impl<T> PacketSerializer<T> {
291 fn with_format(
292 stats: Option<Arc<BatchIteratorConnectionStats>>,
293 max_packet_size: u64,
294 format: T,
295 ) -> Self {
296 Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
297 }
298}
299
300impl<T: PacketFormat> Stream for PacketSerializer<T> {
301 type Item = Result<SerializedVmo, AccessorError>;
302
303 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
304 if self.finished {
305 return Poll::Ready(None);
306 }
307
308 const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20; let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
311 let mut this = self.project();
312
313 let mut buffer = Vec::with_capacity(256 * 1024);
314 buffer.extend_from_slice(T::HEADER);
315
316 let mut first = true;
317
318 if !this.overflow.is_empty() {
319 buffer.append(this.overflow);
320 first = false;
321 if let Some(stats) = &this.stats {
322 stats.add_result();
323 }
324 }
325
326 let mut vmo = None;
327 let mut vmo_len = 0;
328
329 loop {
330 if buffer.capacity() - buffer.len() < 512 {
332 vmo.get_or_insert_with(|| {
333 let v = zx::Vmo::create(max_packet_size).unwrap();
334 let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
335 v
336 })
337 .write(&buffer, vmo_len as u64)
338 .unwrap();
339 vmo_len += buffer.len();
340 buffer.clear();
341 }
342
343 let last_len = buffer.len();
344
345 let separator_len = match this.format.as_mut().write_item(cx, first, &mut buffer) {
346 Poll::Ready(Some(separator_len)) => separator_len,
347 Poll::Ready(None) => {
348 *this.finished = true;
349 if first {
350 return Poll::Ready(None);
351 } else {
352 break;
353 }
354 }
355 Poll::Pending => {
356 if first {
357 return Poll::Pending;
358 } else {
359 break;
360 }
361 }
362 };
363
364 let item_len = buffer.len() - last_len - separator_len;
365
366 if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
367 warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
368 buffer.truncate(last_len);
369 } else {
370 if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
371 assert!(!first);
375 this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
376 buffer.truncate(last_len);
377 break;
378 }
379
380 first = false;
381
382 if let Some(stats) = &this.stats {
383 stats.add_result();
384 }
385 }
386 }
387
388 buffer.extend_from_slice(T::FOOTER);
389
390 let vmo = match vmo {
391 Some(vmo) => {
392 vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
393 vmo
394 }
395 None => {
396 let v = zx::Vmo::create(buffer.len() as u64).unwrap();
397 let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
398 v
399 }
400 };
401 vmo.write(&buffer, vmo_len as u64).unwrap();
402 vmo_len += buffer.len();
403 Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
404 }
405}
406
407#[pin_project]
408pub struct FxtPacketFormat<I> {
409 #[pin]
410 pub stream: I,
411 pub subscribe_to_manifest: bool,
412 pub sent_tags: std::collections::HashMap<u64, Arc<crate::identity::ComponentIdentity>>,
413}
414
415impl<I: Stream<Item = FxtMessage>> PacketFormat for FxtPacketFormat<I> {
416 const FORMAT: Format = Format::Fxt;
417
418 fn write_item(
419 self: Pin<&mut Self>,
420 cx: &mut Context<'_>,
421 _first: bool,
422 buffer: &mut Vec<u8>,
423 ) -> Poll<Option<usize>> {
424 let this = self.project();
425 if let Some(item) = ready!(this.stream.poll_next(cx)) {
426 if *this.subscribe_to_manifest {
427 let tag = item.tag();
428 let identity = item.component_identity();
429 let send_manifest = match this.sent_tags.entry(tag) {
430 std::collections::hash_map::Entry::Vacant(e) => {
431 e.insert(Arc::clone(identity));
432 true
433 }
434 std::collections::hash_map::Entry::Occupied(mut e) => {
435 if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
436 e.insert(Arc::clone(identity));
437 true
438 } else {
439 false
440 }
441 }
442 };
443
444 if send_manifest {
445 use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
446 use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
447 use fidl_fuchsia_diagnostics_types::Severity;
448 use std::io::Cursor;
449 use zerocopy::{FromBytes, IntoBytes};
450
451 let mut encoder = Encoder::new(
452 Cursor::new(ResizableBuffer::from(Vec::new())),
453 EncoderOpts::default(),
454 );
455 let record = Record {
456 timestamp: zx::BootInstant::from_nanos(0),
457 severity: Severity::Info.into_primitive(),
458 arguments: vec![
459 Argument::other("moniker", identity.moniker.to_string()),
460 Argument::other("url", identity.url.as_str()),
461 ],
462 };
463 encoder.write_record(record).unwrap();
464 let mut manifest_buffer = encoder.take().into_inner().into_inner();
465 if manifest_buffer.len() >= 8 {
466 let mut header = Header::read_from_bytes(&manifest_buffer[0..8]).unwrap();
467 header.set_tag((tag as u32) | LOG_CONTROL_BIT);
468 manifest_buffer[0..8].copy_from_slice(header.as_bytes());
469 }
470 buffer.extend_from_slice(&manifest_buffer);
471 }
472 }
473
474 buffer.extend_from_slice(item.data());
475
476 extend_fxt_record(
478 item.component_identity(),
479 item.dropped(),
480 &ExtendRecordOpts {
481 component_url: !*this.subscribe_to_manifest,
482 moniker: !*this.subscribe_to_manifest,
483 rolled_out: !*this.subscribe_to_manifest,
484 subscribe_to_manifest: false,
485 },
486 buffer,
487 );
488 Poll::Ready(Some(0))
489 } else {
490 Poll::Ready(None)
491 }
492 }
493}
494
495pub type FxtPacketSerializer<I> = PacketSerializer<FxtPacketFormat<I>>;
496
497impl<I> FxtPacketSerializer<I> {
498 pub fn new(
499 stats: Arc<BatchIteratorConnectionStats>,
500 max_packet_size: u64,
501 items: I,
502 subscribe_to_manifest: bool,
503 ) -> Self {
504 Self::with_format(
505 Some(stats),
506 max_packet_size,
507 FxtPacketFormat {
508 stream: items,
509 subscribe_to_manifest,
510 sent_tags: std::collections::HashMap::new(),
511 },
512 )
513 }
514}
515
516#[pin_project]
517pub struct JsonPacketFormat<I>(#[pin] I);
518
519impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
520 const FORMAT: Format = Format::Json;
521 const HEADER: &[u8] = b"[";
522 const FOOTER: &[u8] = b"]";
523
524 fn write_item(
525 self: Pin<&mut Self>,
526 cx: &mut Context<'_>,
527 first: bool,
528 buffer: &mut Vec<u8>,
529 ) -> Poll<Option<usize>> {
530 const SEPARATOR: &[u8] = b",\n";
531
532 if let Some(item) = ready!(self.project().0.poll_next(cx)) {
533 let separator_len = if !first {
534 buffer.extend_from_slice(SEPARATOR);
535 SEPARATOR.len()
536 } else {
537 0
538 };
539 serde_json::to_writer(buffer, &item).expect("failed to serialize item");
542 Poll::Ready(Some(separator_len))
543 } else {
544 Poll::Ready(None)
545 }
546 }
547}
548
549pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;
550
551impl<I> JsonPacketSerializer<I> {
552 pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
553 Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
554 }
555
556 pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
557 Self::with_format(None, max_packet_size, JsonPacketFormat(items))
558 }
559}
560
561#[cfg(test)]
562mod tests {
563 use super::*;
564 use crate::diagnostics::AccessorStats;
565 use futures::stream::iter;
566
567 #[fuchsia::test]
568 async fn time_limited_chunks_yields_on_capacity() {
569 let stream = futures::stream::iter(vec![1, 2, 3, 4]);
570 let mut chunks =
571 Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));
572
573 assert_eq!(chunks.next().await, Some(vec![1, 2]));
574 assert_eq!(chunks.next().await, Some(vec![3, 4]));
575 assert_eq!(chunks.next().await, None);
576 }
577
578 #[fuchsia::test]
579 async fn time_limited_chunks_yields_on_stream_end() {
580 let stream = futures::stream::iter(vec![1, 2, 3]);
581 let mut chunks =
582 Box::pin(TimeLimitedChunks::new(stream, 2, std::time::Duration::from_secs(30)));
583
584 assert_eq!(chunks.next().await, Some(vec![1, 2]));
585 assert_eq!(chunks.next().await, Some(vec![3]));
586 assert_eq!(chunks.next().await, None);
587 }
588
589 #[fuchsia::test]
590 async fn time_limited_chunks_yields_on_timeout() {
591 let (tx, rx) = futures::channel::mpsc::unbounded();
592 let mut chunks =
593 Box::pin(TimeLimitedChunks::new(rx, 2, std::time::Duration::from_millis(10)));
594
595 tx.unbounded_send(1).unwrap();
596
597 let start = std::time::Instant::now();
598 assert_eq!(chunks.next().await, Some(vec![1]));
599 assert!(start.elapsed() >= std::time::Duration::from_millis(10));
600
601 tx.unbounded_send(2).unwrap();
602 tx.unbounded_send(3).unwrap();
603 assert_eq!(chunks.next().await, Some(vec![2, 3]));
604
605 drop(tx);
606 assert_eq!(chunks.next().await, None);
607 }
608
609 #[fuchsia::test]
610 async fn two_items_joined_and_split() {
611 let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
612 let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
613 let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
614 let smallest_possible_joined_len = joined[0].len() as u64;
615
616 let make_packets = |max| async move {
617 let node = fuchsia_inspect::Node::default();
618 let accessor_stats = Arc::new(AccessorStats::new(node));
619 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
620 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
621 .collect::<Vec<_>>()
622 .await
623 .into_iter()
624 .map(|r| {
625 let result = r.unwrap();
626 let mut buf = vec![0; result.size as usize];
627 result.vmo.read(&mut buf, 0).expect("reading vmo");
628 std::str::from_utf8(&buf).unwrap().to_string()
629 })
630 .collect::<Vec<_>>()
631 };
632
633 let actual_joined = make_packets(smallest_possible_joined_len).await;
634 assert_eq!(&actual_joined[..], joined);
635
636 let actual_split = make_packets(smallest_possible_joined_len - 1).await;
637 assert_eq!(&actual_split[..], split);
638 }
639
640 #[fuchsia::test]
641 async fn overflow_separator_added() {
642 let inputs = &[&"A", &"B", &"C"];
643 let make_packets = |max| async move {
652 let node = fuchsia_inspect::Node::default();
653 let accessor_stats = Arc::new(AccessorStats::new(node));
654 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
655 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
656 .collect::<Vec<_>>()
657 .await
658 .into_iter()
659 .map(|r| {
660 let result = r.unwrap();
661 let mut buf = vec![0; result.size as usize];
662 result.vmo.read(&mut buf, 0).expect("reading vmo");
663 std::str::from_utf8(&buf).unwrap().to_string()
664 })
665 .collect::<Vec<_>>()
666 };
667
668 let packets = make_packets(8).await;
669 assert_eq!(packets.len(), 3);
670 assert_eq!(packets[0], r#"["A"]"#);
671 assert_eq!(packets[1], r#"["B"]"#);
672 assert_eq!(packets[2], r#"["C"]"#);
673
674 let packets = make_packets(10).await;
675 assert_eq!(packets.len(), 2);
676 assert_eq!(packets[0], "[\"A\",\n\"B\"]");
677 assert_eq!(packets[1], r#"["C"]"#);
678 }
679
680 #[fuchsia::test]
681 async fn oversize_item_not_dropped_incorrectly() {
682 let inputs = &[&"A", &"BCDEF"];
683 let make_packets = |max| async move {
694 let node = fuchsia_inspect::Node::default();
695 let accessor_stats = Arc::new(AccessorStats::new(node));
696 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
697 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
698 .collect::<Vec<_>>()
699 .await
700 .into_iter()
701 .map(|r| {
702 let result = r.unwrap();
703 let mut buf = vec![0; result.size as usize];
704 result.vmo.read(&mut buf, 0).expect("reading vmo");
705 std::str::from_utf8(&buf).unwrap().to_string()
706 })
707 .collect::<Vec<_>>()
708 };
709
710 let packets = make_packets(11).await;
711 assert_eq!(packets.len(), 2);
712 assert_eq!(packets[0], r#"["A"]"#);
713 assert_eq!(packets[1], r#"["BCDEF"]"#);
714 }
715
716 #[fuchsia::test]
717 async fn item_too_big_for_packet_is_dropped() {
718 let inputs = &[&"ABCDE"]; let make_packets = |max| async move {
723 let node = fuchsia_inspect::Node::default();
724 let accessor_stats = Arc::new(AccessorStats::new(node));
725 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
726 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
727 .collect::<Vec<_>>()
728 .await
729 .into_iter()
730 .map(|r| {
731 let result = r.unwrap();
732 let mut buf = vec![0; result.size as usize];
733 result.vmo.read(&mut buf, 0).expect("reading vmo");
734 std::str::from_utf8(&buf).unwrap().to_string()
735 })
736 .collect::<Vec<_>>()
737 };
738
739 let packets = make_packets(8).await;
740 assert_eq!(packets.len(), 0);
742 }
743
744 #[fuchsia::test]
745 async fn fxt_packet_serializer_subscribe_to_manifest() {
746 use crate::identity::ComponentIdentity;
747 use diagnostics_log_encoding::encode::{Encoder, EncoderOpts};
748 use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
749 use fidl_fuchsia_diagnostics_types::Severity;
750 use std::io::Cursor;
751 use zerocopy::{FromBytes, IntoBytes};
752
753 let identity1 = Arc::new(ComponentIdentity::unknown());
754 let mut identity2_inner = ComponentIdentity::unknown();
755 identity2_inner.url =
756 flyweights::FlyStr::new("fuchsia-pkg://fuchsia.com/test#meta/test.cm");
757 let identity2 = Arc::new(identity2_inner);
758
759 let mut buffer = Cursor::new(vec![0u8; 128]);
761 let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
762 encoder
763 .write_record(Record {
764 timestamp: zx::BootInstant::from_nanos(100),
765 severity: Severity::Info.into_primitive(),
766 arguments: vec![Argument::other("msg", "hello")],
767 })
768 .unwrap();
769 let end = encoder.inner().position() as usize;
770 let mut msg1_bytes = encoder.inner().get_ref()[..end].to_vec();
771 let mut header = Header::read_from_bytes(&msg1_bytes[0..8]).unwrap();
772 header.set_tag(1);
773 msg1_bytes[0..8].copy_from_slice(header.as_bytes());
774 let msg1 =
775 FxtMessage::new_test(msg1_bytes.into_boxed_slice(), 0, Arc::clone(&identity1), 1);
776
777 let mut buffer = Cursor::new(vec![0u8; 128]);
779 let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
780 encoder
781 .write_record(Record {
782 timestamp: zx::BootInstant::from_nanos(200),
783 severity: Severity::Info.into_primitive(),
784 arguments: vec![Argument::other("msg", "world")],
785 })
786 .unwrap();
787 let end = encoder.inner().position() as usize;
788 let mut msg2_bytes = encoder.inner().get_ref()[..end].to_vec();
789 let mut header = Header::read_from_bytes(&msg2_bytes[0..8]).unwrap();
790 header.set_tag(2);
791 msg2_bytes[0..8].copy_from_slice(header.as_bytes());
792 let msg2 =
793 FxtMessage::new_test(msg2_bytes.into_boxed_slice(), 0, Arc::clone(&identity2), 2);
794
795 let mut buffer = Cursor::new(vec![0u8; 128]);
797 let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
798 encoder
799 .write_record(Record {
800 timestamp: zx::BootInstant::from_nanos(300),
801 severity: Severity::Info.into_primitive(),
802 arguments: vec![Argument::other("msg", "again")],
803 })
804 .unwrap();
805 let end = encoder.inner().position() as usize;
806 let mut msg3_bytes = encoder.inner().get_ref()[..end].to_vec();
807 let mut header = Header::read_from_bytes(&msg3_bytes[0..8]).unwrap();
808 header.set_tag(2);
809 msg3_bytes[0..8].copy_from_slice(header.as_bytes());
810 let msg3 =
811 FxtMessage::new_test(msg3_bytes.into_boxed_slice(), 0, Arc::clone(&identity2), 2);
812
813 let inputs = vec![msg1, msg2, msg3];
814
815 let node = fuchsia_inspect::Node::default();
816 let accessor_stats = Arc::new(AccessorStats::new(node));
817 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
818
819 let packets: Vec<_> =
821 FxtPacketSerializer::new(Arc::clone(&test_stats), 1024 * 1024, iter(inputs), true)
822 .collect::<Vec<_>>()
823 .await
824 .into_iter()
825 .map(|r| {
826 let result = r.unwrap();
827 let mut buf = vec![0; result.size as usize];
828 result.vmo.read(&mut buf, 0).expect("reading vmo");
829 buf
830 })
831 .collect();
832
833 assert_eq!(packets.len(), 1);
834 let output = &packets[0];
835
836 let mut current_slice = output.as_slice();
837 let mut records = vec![];
838
839 while !current_slice.is_empty() {
840 let (record, remaining) =
841 diagnostics_log_encoding::parse::parse_record(current_slice).unwrap();
842 let header =
843 diagnostics_log_encoding::Header::read_from_bytes(¤t_slice[..8]).unwrap();
844 records.push((header, record));
845 current_slice = remaining;
846 }
847
848 assert_eq!(records.len(), 5);
849
850 assert_ne!(records[0].0.tag() & LOG_CONTROL_BIT, 0);
852 assert_eq!(records[0].0.tag() & !LOG_CONTROL_BIT, 1);
853
854 assert_eq!(records[1].0.tag(), 1);
856 assert_eq!(records[1].1.arguments.len(), 1); assert_ne!(records[2].0.tag() & LOG_CONTROL_BIT, 0);
860 assert_eq!(records[2].0.tag() & !LOG_CONTROL_BIT, 2);
861
862 assert_eq!(records[3].0.tag(), 2);
864 assert_eq!(records[3].1.arguments.len(), 1); assert_eq!(records[4].0.tag(), 2);
868 assert_eq!(records[4].1.arguments.len(), 1); }
870}