1use crate::diagnostics::BatchIteratorConnectionStats;
5use crate::error::AccessorError;
6use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
7use crate::logs::shared_buffer::FxtMessage;
8use fidl_fuchsia_diagnostics::{
9 DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
10};
11
12use futures::{Stream, StreamExt};
13use log::warn;
14use pin_project::pin_project;
15use serde::Serialize;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll, ready};
19
20pub type FormattedStream =
21 Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;
22
23#[pin_project]
24pub struct FormattedContentBatcher<C> {
25 #[pin]
26 items: C,
27 stats: Arc<BatchIteratorConnectionStats>,
28}
29
30pub fn new_batcher<I, T, E>(
38 items: I,
39 stats: Arc<BatchIteratorConnectionStats>,
40 mode: StreamMode,
41) -> FormattedStream
42where
43 I: Stream<Item = Result<T, E>> + Send + 'static,
44 T: Into<FormattedContent> + Send,
45 E: Into<AccessorError> + Send,
46{
47 match mode {
48 StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
49 Box::pin(FormattedContentBatcher {
50 items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
51 stats,
52 })
53 }
54 StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
55 items: items.chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
56 stats,
57 }),
58 }
59}
60
61impl<I, T, E> Stream for FormattedContentBatcher<I>
62where
63 I: Stream<Item = Vec<Result<T, E>>>,
64 T: Into<FormattedContent>,
65 E: Into<AccessorError>,
66{
67 type Item = Vec<Result<FormattedContent, AccessorError>>;
68
69 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70 let this = self.project();
71 match this.items.poll_next(cx) {
72 Poll::Ready(Some(chunk)) => {
73 let mut batch = vec![];
75 for item in chunk {
76 let result = match item {
77 Ok(i) => Ok(i.into()),
78 Err(e) => {
79 this.stats.add_result_error();
80 Err(e.into())
81 }
82 };
83 batch.push(result);
84 }
85 Poll::Ready(Some(batch))
86 }
87 Poll::Ready(None) => Poll::Ready(None),
88 Poll::Pending => Poll::Pending,
89 }
90 }
91}
92
93pub struct SerializedVmo {
95 pub vmo: zx::Vmo,
96 pub size: u64,
97 format: Format,
98}
99
100impl SerializedVmo {
101 pub fn serialize(
102 source: &impl Serialize,
103 data_type: DataType,
104 format: Format,
105 ) -> Result<Self, AccessorError> {
106 let initial_buffer_capacity = match data_type {
107 DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
108 DataType::Logs => 4096, };
112 let mut buffer = Vec::with_capacity(initial_buffer_capacity);
113 match format {
114 Format::Json => {
115 serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
116 }
117 Format::Cbor => ciborium::into_writer(source, &mut buffer)
118 .map_err(|err| AccessorError::CborSerialization(err.into()))?,
119 Format::Text => unreachable!("We'll never get Text"),
120 Format::Fxt => unreachable!("We'll never get FXT"),
121 }
122 let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
123 vmo.write(&buffer, 0).unwrap();
124 Ok(Self { vmo, size: buffer.len() as u64, format })
125 }
126}
127
128impl From<SerializedVmo> for FormattedContent {
129 fn from(content: SerializedVmo) -> FormattedContent {
130 match content.format {
131 Format::Json => {
132 content
134 .vmo
135 .set_content_size(&content.size)
136 .expect("set_content_size always returns Ok");
137 FormattedContent::Json(fidl_fuchsia_mem::Buffer {
138 vmo: content.vmo,
139 size: content.size,
140 })
141 }
142 Format::Cbor => {
143 content
144 .vmo
145 .set_content_size(&content.size)
146 .expect("set_content_size always returns Ok");
147 FormattedContent::Cbor(content.vmo)
148 }
149 Format::Fxt => {
150 content
151 .vmo
152 .set_content_size(&content.size)
153 .expect("set_content_size always returns Ok");
154 FormattedContent::Fxt(content.vmo)
155 }
156 Format::Text => unreachable!("We'll never get Text"),
157 }
158 }
159}
160
161trait PacketFormat {
162 const FORMAT: Format;
163 const HEADER: &[u8] = &[];
164 const FOOTER: &[u8] = &[];
165
166 fn write_item(
170 self: Pin<&mut Self>,
171 cx: &mut Context<'_>,
172 first: bool,
173 buffer: &mut Vec<u8>,
174 ) -> Poll<Option<usize>>;
175}
176
177#[pin_project]
178pub struct PacketSerializer<T> {
179 stats: Option<Arc<BatchIteratorConnectionStats>>,
180 max_packet_size: u64,
181 #[pin]
182 format: T,
183 overflow: Vec<u8>,
184 finished: bool,
185}
186
187impl<T> PacketSerializer<T> {
188 fn with_format(
189 stats: Option<Arc<BatchIteratorConnectionStats>>,
190 max_packet_size: u64,
191 format: T,
192 ) -> Self {
193 Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
194 }
195}
196
197impl<T: PacketFormat> Stream for PacketSerializer<T> {
198 type Item = Result<SerializedVmo, AccessorError>;
199
200 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
201 if self.finished {
202 return Poll::Ready(None);
203 }
204
205 const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20; let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
208 let mut this = self.project();
209
210 let mut buffer = Vec::with_capacity(256 * 1024);
211 buffer.extend_from_slice(T::HEADER);
212
213 let mut first = true;
214
215 if !this.overflow.is_empty() {
216 buffer.append(this.overflow);
217 first = false;
218 if let Some(stats) = &this.stats {
219 stats.add_result();
220 }
221 }
222
223 let mut vmo = None;
224 let mut vmo_len = 0;
225
226 loop {
227 if buffer.capacity() - buffer.len() < 512 {
229 vmo.get_or_insert_with(|| zx::Vmo::create(max_packet_size).unwrap())
230 .write(&buffer, vmo_len as u64)
231 .unwrap();
232 vmo_len += buffer.len();
233 buffer.clear();
234 }
235
236 let last_len = buffer.len();
237
238 let separator_len = match this.format.as_mut().write_item(cx, first, &mut buffer) {
239 Poll::Ready(Some(separator_len)) => separator_len,
240 Poll::Ready(None) => {
241 *this.finished = true;
242 if first {
243 return Poll::Ready(None);
244 } else {
245 break;
246 }
247 }
248 Poll::Pending => {
249 if first {
250 return Poll::Pending;
251 } else {
252 break;
253 }
254 }
255 };
256
257 let item_len = buffer.len() - last_len - separator_len;
258
259 if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
260 warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
261 buffer.truncate(last_len);
262 } else {
263 if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
264 assert!(!first);
268 this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
269 buffer.truncate(last_len);
270 break;
271 }
272
273 first = false;
274
275 if let Some(stats) = &this.stats {
276 stats.add_result();
277 }
278 }
279 }
280
281 buffer.extend_from_slice(T::FOOTER);
282
283 let vmo = match vmo {
284 Some(vmo) => {
285 vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
286 vmo
287 }
288 None => zx::Vmo::create(buffer.len() as u64).unwrap(),
289 };
290 vmo.write(&buffer, vmo_len as u64).unwrap();
291 vmo_len += buffer.len();
292 Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
293 }
294}
295
296#[pin_project]
297pub struct FxtPacketFormat<I> {
298 #[pin]
299 pub stream: I,
300 pub subscribe_to_manifest: bool,
301 pub sent_tags: std::collections::HashMap<u64, Arc<crate::identity::ComponentIdentity>>,
302}
303
304impl<I: Stream<Item = FxtMessage>> PacketFormat for FxtPacketFormat<I> {
305 const FORMAT: Format = Format::Fxt;
306
307 fn write_item(
308 self: Pin<&mut Self>,
309 cx: &mut Context<'_>,
310 _first: bool,
311 buffer: &mut Vec<u8>,
312 ) -> Poll<Option<usize>> {
313 let this = self.project();
314 if let Some(item) = ready!(this.stream.poll_next(cx)) {
315 if *this.subscribe_to_manifest {
316 let tag = item.tag();
317 let identity = item.component_identity();
318 let send_manifest = match this.sent_tags.entry(tag) {
319 std::collections::hash_map::Entry::Vacant(e) => {
320 e.insert(Arc::clone(identity));
321 true
322 }
323 std::collections::hash_map::Entry::Occupied(mut e) => {
324 if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
325 e.insert(Arc::clone(identity));
326 true
327 } else {
328 false
329 }
330 }
331 };
332
333 if send_manifest {
334 use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
335 use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
336 use fidl_fuchsia_diagnostics_types::Severity;
337 use std::io::Cursor;
338 use zerocopy::{FromBytes, IntoBytes};
339
340 let mut encoder = Encoder::new(
341 Cursor::new(ResizableBuffer::from(Vec::new())),
342 EncoderOpts::default(),
343 );
344 let record = Record {
345 timestamp: zx::BootInstant::from_nanos(0),
346 severity: Severity::Info.into_primitive(),
347 arguments: vec![
348 Argument::other("moniker", identity.moniker.to_string()),
349 Argument::other("url", identity.url.as_str()),
350 ],
351 };
352 encoder.write_record(record).unwrap();
353 let mut manifest_buffer = encoder.take().into_inner().into_inner();
354 if manifest_buffer.len() >= 8 {
355 let mut header = Header::read_from_bytes(&manifest_buffer[0..8]).unwrap();
356 header.set_tag((tag as u32) | LOG_CONTROL_BIT);
357 manifest_buffer[0..8].copy_from_slice(header.as_bytes());
358 }
359 buffer.extend_from_slice(&manifest_buffer);
360 }
361 }
362
363 buffer.extend_from_slice(item.data());
364
365 extend_fxt_record(
367 item.component_identity(),
368 item.dropped(),
369 &ExtendRecordOpts {
370 component_url: !*this.subscribe_to_manifest,
371 moniker: !*this.subscribe_to_manifest,
372 rolled_out: !*this.subscribe_to_manifest,
373 subscribe_to_manifest: false,
374 },
375 buffer,
376 );
377 Poll::Ready(Some(0))
378 } else {
379 Poll::Ready(None)
380 }
381 }
382}
383
384pub type FxtPacketSerializer<I> = PacketSerializer<FxtPacketFormat<I>>;
385
386impl<I> FxtPacketSerializer<I> {
387 pub fn new(
388 stats: Arc<BatchIteratorConnectionStats>,
389 max_packet_size: u64,
390 items: I,
391 subscribe_to_manifest: bool,
392 ) -> Self {
393 Self::with_format(
394 Some(stats),
395 max_packet_size,
396 FxtPacketFormat {
397 stream: items,
398 subscribe_to_manifest,
399 sent_tags: std::collections::HashMap::new(),
400 },
401 )
402 }
403}
404
405#[pin_project]
406pub struct JsonPacketFormat<I>(#[pin] I);
407
408impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
409 const FORMAT: Format = Format::Json;
410 const HEADER: &[u8] = b"[";
411 const FOOTER: &[u8] = b"]";
412
413 fn write_item(
414 self: Pin<&mut Self>,
415 cx: &mut Context<'_>,
416 first: bool,
417 buffer: &mut Vec<u8>,
418 ) -> Poll<Option<usize>> {
419 const SEPARATOR: &[u8] = b",\n";
420
421 if let Some(item) = ready!(self.project().0.poll_next(cx)) {
422 let separator_len = if !first {
423 buffer.extend_from_slice(SEPARATOR);
424 SEPARATOR.len()
425 } else {
426 0
427 };
428 serde_json::to_writer(buffer, &item).expect("failed to serialize item");
431 Poll::Ready(Some(separator_len))
432 } else {
433 Poll::Ready(None)
434 }
435 }
436}
437
438pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;
439
440impl<I> JsonPacketSerializer<I> {
441 pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
442 Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
443 }
444
445 pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
446 Self::with_format(None, max_packet_size, JsonPacketFormat(items))
447 }
448}
449
450#[cfg(test)]
451mod tests {
452 use super::*;
453 use crate::diagnostics::AccessorStats;
454 use futures::stream::iter;
455
456 #[fuchsia::test]
457 async fn two_items_joined_and_split() {
458 let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
459 let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
460 let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
461 let smallest_possible_joined_len = joined[0].len() as u64;
462
463 let make_packets = |max| async move {
464 let node = fuchsia_inspect::Node::default();
465 let accessor_stats = Arc::new(AccessorStats::new(node));
466 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
467 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
468 .collect::<Vec<_>>()
469 .await
470 .into_iter()
471 .map(|r| {
472 let result = r.unwrap();
473 let mut buf = vec![0; result.size as usize];
474 result.vmo.read(&mut buf, 0).expect("reading vmo");
475 std::str::from_utf8(&buf).unwrap().to_string()
476 })
477 .collect::<Vec<_>>()
478 };
479
480 let actual_joined = make_packets(smallest_possible_joined_len).await;
481 assert_eq!(&actual_joined[..], joined);
482
483 let actual_split = make_packets(smallest_possible_joined_len - 1).await;
484 assert_eq!(&actual_split[..], split);
485 }
486
487 #[fuchsia::test]
488 async fn overflow_separator_added() {
489 let inputs = &[&"A", &"B", &"C"];
490 let make_packets = |max| async move {
499 let node = fuchsia_inspect::Node::default();
500 let accessor_stats = Arc::new(AccessorStats::new(node));
501 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
502 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
503 .collect::<Vec<_>>()
504 .await
505 .into_iter()
506 .map(|r| {
507 let result = r.unwrap();
508 let mut buf = vec![0; result.size as usize];
509 result.vmo.read(&mut buf, 0).expect("reading vmo");
510 std::str::from_utf8(&buf).unwrap().to_string()
511 })
512 .collect::<Vec<_>>()
513 };
514
515 let packets = make_packets(8).await;
516 assert_eq!(packets.len(), 3);
517 assert_eq!(packets[0], r#"["A"]"#);
518 assert_eq!(packets[1], r#"["B"]"#);
519 assert_eq!(packets[2], r#"["C"]"#);
520
521 let packets = make_packets(10).await;
522 assert_eq!(packets.len(), 2);
523 assert_eq!(packets[0], "[\"A\",\n\"B\"]");
524 assert_eq!(packets[1], r#"["C"]"#);
525 }
526
527 #[fuchsia::test]
528 async fn oversize_item_not_dropped_incorrectly() {
529 let inputs = &[&"A", &"BCDEF"];
530 let make_packets = |max| async move {
541 let node = fuchsia_inspect::Node::default();
542 let accessor_stats = Arc::new(AccessorStats::new(node));
543 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
544 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
545 .collect::<Vec<_>>()
546 .await
547 .into_iter()
548 .map(|r| {
549 let result = r.unwrap();
550 let mut buf = vec![0; result.size as usize];
551 result.vmo.read(&mut buf, 0).expect("reading vmo");
552 std::str::from_utf8(&buf).unwrap().to_string()
553 })
554 .collect::<Vec<_>>()
555 };
556
557 let packets = make_packets(11).await;
558 assert_eq!(packets.len(), 2);
559 assert_eq!(packets[0], r#"["A"]"#);
560 assert_eq!(packets[1], r#"["BCDEF"]"#);
561 }
562
563 #[fuchsia::test]
564 async fn item_too_big_for_packet_is_dropped() {
565 let inputs = &[&"ABCDE"]; let make_packets = |max| async move {
570 let node = fuchsia_inspect::Node::default();
571 let accessor_stats = Arc::new(AccessorStats::new(node));
572 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
573 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
574 .collect::<Vec<_>>()
575 .await
576 .into_iter()
577 .map(|r| {
578 let result = r.unwrap();
579 let mut buf = vec![0; result.size as usize];
580 result.vmo.read(&mut buf, 0).expect("reading vmo");
581 std::str::from_utf8(&buf).unwrap().to_string()
582 })
583 .collect::<Vec<_>>()
584 };
585
586 let packets = make_packets(8).await;
587 assert_eq!(packets.len(), 0);
589 }
590
591 #[fuchsia::test]
592 async fn fxt_packet_serializer_subscribe_to_manifest() {
593 use crate::identity::ComponentIdentity;
594 use diagnostics_log_encoding::encode::{Encoder, EncoderOpts};
595 use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
596 use fidl_fuchsia_diagnostics_types::Severity;
597 use std::io::Cursor;
598 use zerocopy::{FromBytes, IntoBytes};
599
600 let identity1 = Arc::new(ComponentIdentity::unknown());
601 let mut identity2_inner = ComponentIdentity::unknown();
602 identity2_inner.url =
603 flyweights::FlyStr::new("fuchsia-pkg://fuchsia.com/test#meta/test.cm");
604 let identity2 = Arc::new(identity2_inner);
605
606 let mut buffer = Cursor::new(vec![0u8; 128]);
608 let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
609 encoder
610 .write_record(Record {
611 timestamp: zx::BootInstant::from_nanos(100),
612 severity: Severity::Info.into_primitive(),
613 arguments: vec![Argument::other("msg", "hello")],
614 })
615 .unwrap();
616 let end = encoder.inner().position() as usize;
617 let mut msg1_bytes = encoder.inner().get_ref()[..end].to_vec();
618 let mut header = Header::read_from_bytes(&msg1_bytes[0..8]).unwrap();
619 header.set_tag(1);
620 msg1_bytes[0..8].copy_from_slice(header.as_bytes());
621 let msg1 =
622 FxtMessage::new_test(msg1_bytes.into_boxed_slice(), 0, Arc::clone(&identity1), 1);
623
624 let mut buffer = Cursor::new(vec![0u8; 128]);
626 let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
627 encoder
628 .write_record(Record {
629 timestamp: zx::BootInstant::from_nanos(200),
630 severity: Severity::Info.into_primitive(),
631 arguments: vec![Argument::other("msg", "world")],
632 })
633 .unwrap();
634 let end = encoder.inner().position() as usize;
635 let mut msg2_bytes = encoder.inner().get_ref()[..end].to_vec();
636 let mut header = Header::read_from_bytes(&msg2_bytes[0..8]).unwrap();
637 header.set_tag(2);
638 msg2_bytes[0..8].copy_from_slice(header.as_bytes());
639 let msg2 =
640 FxtMessage::new_test(msg2_bytes.into_boxed_slice(), 0, Arc::clone(&identity2), 2);
641
642 let mut buffer = Cursor::new(vec![0u8; 128]);
644 let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
645 encoder
646 .write_record(Record {
647 timestamp: zx::BootInstant::from_nanos(300),
648 severity: Severity::Info.into_primitive(),
649 arguments: vec![Argument::other("msg", "again")],
650 })
651 .unwrap();
652 let end = encoder.inner().position() as usize;
653 let mut msg3_bytes = encoder.inner().get_ref()[..end].to_vec();
654 let mut header = Header::read_from_bytes(&msg3_bytes[0..8]).unwrap();
655 header.set_tag(2);
656 msg3_bytes[0..8].copy_from_slice(header.as_bytes());
657 let msg3 =
658 FxtMessage::new_test(msg3_bytes.into_boxed_slice(), 0, Arc::clone(&identity2), 2);
659
660 let inputs = vec![msg1, msg2, msg3];
661
662 let node = fuchsia_inspect::Node::default();
663 let accessor_stats = Arc::new(AccessorStats::new(node));
664 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
665
666 let packets: Vec<_> =
668 FxtPacketSerializer::new(Arc::clone(&test_stats), 1024 * 1024, iter(inputs), true)
669 .collect::<Vec<_>>()
670 .await
671 .into_iter()
672 .map(|r| {
673 let result = r.unwrap();
674 let mut buf = vec![0; result.size as usize];
675 result.vmo.read(&mut buf, 0).expect("reading vmo");
676 buf
677 })
678 .collect();
679
680 assert_eq!(packets.len(), 1);
681 let output = &packets[0];
682
683 let mut current_slice = output.as_slice();
684 let mut records = vec![];
685
686 while !current_slice.is_empty() {
687 let (record, remaining) =
688 diagnostics_log_encoding::parse::parse_record(current_slice).unwrap();
689 let header =
690 diagnostics_log_encoding::Header::read_from_bytes(¤t_slice[..8]).unwrap();
691 records.push((header, record));
692 current_slice = remaining;
693 }
694
695 assert_eq!(records.len(), 5);
696
697 assert_ne!(records[0].0.tag() & LOG_CONTROL_BIT, 0);
699 assert_eq!(records[0].0.tag() & !LOG_CONTROL_BIT, 1);
700
701 assert_eq!(records[1].0.tag(), 1);
703 assert_eq!(records[1].1.arguments.len(), 1); assert_ne!(records[2].0.tag() & LOG_CONTROL_BIT, 0);
707 assert_eq!(records[2].0.tag() & !LOG_CONTROL_BIT, 2);
708
709 assert_eq!(records[3].0.tag(), 2);
711 assert_eq!(records[3].1.arguments.len(), 1); assert_eq!(records[4].0.tag(), 2);
715 assert_eq!(records[4].1.arguments.len(), 1); }
717}