1use crate::diagnostics::BatchIteratorConnectionStats;
5use crate::error::AccessorError;
6use crate::logs::servers::{ExtendRecordOpts, extend_fxt_record};
7use crate::logs::shared_buffer::FxtMessage;
8use fidl_fuchsia_diagnostics::{
9 DataType, Format, FormattedContent, MAXIMUM_ENTRIES_PER_BATCH, StreamMode,
10};
11
12use futures::{Stream, StreamExt};
13use log::warn;
14use pin_project::pin_project;
15use serde::Serialize;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll, ready};
19use zx;
20
// Names stamped onto the VMOs this module creates (via `set_name`) so the
// buffers are identifiable in kernel memory diagnostics.
static SERIALIZED_DATA_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-serialized-data");
static PACKET_BUFFER_VMO_NAME: zx::Name = zx::Name::new_lossy("archivist-packet-buffer");
23
/// A pinned, boxed stream of result batches, where each batch holds up to
/// `MAXIMUM_ENTRIES_PER_BATCH` formatted results or errors.
pub type FormattedStream =
    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;

/// Adapter that turns an inner stream of chunked `Result`s into batches of
/// `FormattedContent`, counting conversion errors in the connection stats.
#[pin_project]
pub struct FormattedContentBatcher<C> {
    // Inner chunked stream (see `new_batcher` for how it is constructed).
    #[pin]
    items: C,
    // Per-connection stats; errors encountered per batch are recorded here.
    stats: Arc<BatchIteratorConnectionStats>,
}
33
34pub fn new_batcher<I, T, E>(
42 items: I,
43 stats: Arc<BatchIteratorConnectionStats>,
44 mode: StreamMode,
45) -> FormattedStream
46where
47 I: Stream<Item = Result<T, E>> + Send + 'static,
48 T: Into<FormattedContent> + Send,
49 E: Into<AccessorError> + Send,
50{
51 match mode {
52 StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
53 Box::pin(FormattedContentBatcher {
54 items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
55 stats,
56 })
57 }
58 StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
59 items: items.chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
60 stats,
61 }),
62 }
63}
64
65impl<I, T, E> Stream for FormattedContentBatcher<I>
66where
67 I: Stream<Item = Vec<Result<T, E>>>,
68 T: Into<FormattedContent>,
69 E: Into<AccessorError>,
70{
71 type Item = Vec<Result<FormattedContent, AccessorError>>;
72
73 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74 let this = self.project();
75 match this.items.poll_next(cx) {
76 Poll::Ready(Some(chunk)) => {
77 let mut batch = vec![];
79 for item in chunk {
80 let result = match item {
81 Ok(i) => Ok(i.into()),
82 Err(e) => {
83 this.stats.add_result_error();
84 Err(e.into())
85 }
86 };
87 batch.push(result);
88 }
89 Poll::Ready(Some(batch))
90 }
91 Poll::Ready(None) => Poll::Ready(None),
92 Poll::Pending => Poll::Pending,
93 }
94 }
95}
96
/// A VMO containing serialized diagnostics data, along with the number of
/// meaningful bytes and the wire format that produced them.
pub struct SerializedVmo {
    pub vmo: zx::Vmo,
    // Length in bytes of the serialized payload inside `vmo`.
    pub size: u64,
    // Wire format used; selects the `FormattedContent` variant on conversion.
    format: Format,
}
103
104impl SerializedVmo {
105 pub fn serialize(
106 source: &impl Serialize,
107 data_type: DataType,
108 format: Format,
109 ) -> Result<Self, AccessorError> {
110 let initial_buffer_capacity = match data_type {
111 DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES,
112 DataType::Logs => 4096, };
116 let mut buffer = Vec::with_capacity(initial_buffer_capacity);
117 match format {
118 Format::Json => {
119 serde_json::to_writer(&mut buffer, source).map_err(AccessorError::Serialization)?
120 }
121 Format::Cbor => ciborium::into_writer(source, &mut buffer)
122 .map_err(|err| AccessorError::CborSerialization(err.into()))?,
123 Format::Text => unreachable!("We'll never get Text"),
124 Format::Fxt => unreachable!("We'll never get FXT"),
125 }
126 let vmo = zx::Vmo::create(buffer.len() as u64).unwrap();
127 let _ = vmo.set_name(&SERIALIZED_DATA_VMO_NAME);
128 vmo.write(&buffer, 0).unwrap();
129 Ok(Self { vmo, size: buffer.len() as u64, format })
130 }
131}
132
133impl From<SerializedVmo> for FormattedContent {
134 fn from(content: SerializedVmo) -> FormattedContent {
135 match content.format {
136 Format::Json => {
137 content
139 .vmo
140 .set_content_size(&content.size)
141 .expect("set_content_size always returns Ok");
142 FormattedContent::Json(fidl_fuchsia_mem::Buffer {
143 vmo: content.vmo,
144 size: content.size,
145 })
146 }
147 Format::Cbor => {
148 content
149 .vmo
150 .set_content_size(&content.size)
151 .expect("set_content_size always returns Ok");
152 FormattedContent::Cbor(content.vmo)
153 }
154 Format::Fxt => {
155 content
156 .vmo
157 .set_content_size(&content.size)
158 .expect("set_content_size always returns Ok");
159 FormattedContent::Fxt(content.vmo)
160 }
161 Format::Text => unreachable!("We'll never get Text"),
162 }
163 }
164}
165
/// Format-specific behavior plugged into `PacketSerializer`: the wire format
/// tag, optional per-packet framing bytes, and how to append one item.
trait PacketFormat {
    /// Wire format reported in each resulting `SerializedVmo`.
    const FORMAT: Format;
    /// Bytes written at the start of every packet (default: none).
    const HEADER: &[u8] = &[];
    /// Bytes written at the end of every packet (default: none).
    const FOOTER: &[u8] = &[];

    /// Appends the next item — plus any separator when `first` is false — to
    /// `buffer`, returning the number of separator bytes written.
    /// `Ready(None)` means the underlying stream is exhausted; `Pending`
    /// means no item is currently available.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>>;
}
181
/// Stream adapter that packs items produced by a [`PacketFormat`] into
/// size-limited VMO packets.
#[pin_project]
pub struct PacketSerializer<T> {
    // Optional per-connection stats; `None` for untracked serializers.
    stats: Option<Arc<BatchIteratorConnectionStats>>,
    // Upper bound, in bytes, on each emitted packet (clamped in poll_next).
    max_packet_size: u64,
    #[pin]
    format: T,
    // Bytes of an item that did not fit in the previous packet; emitted
    // first in the next one.
    overflow: Vec<u8>,
    // Set once the inner stream terminates; the next poll returns `None`.
    finished: bool,
}
191
192impl<T> PacketSerializer<T> {
193 fn with_format(
194 stats: Option<Arc<BatchIteratorConnectionStats>>,
195 max_packet_size: u64,
196 format: T,
197 ) -> Self {
198 Self { stats, max_packet_size, format, overflow: Vec::new(), finished: false }
199 }
200}
201
impl<T: PacketFormat> Stream for PacketSerializer<T> {
    type Item = Result<SerializedVmo, AccessorError>;

    /// Fills one packet (VMO) with as many formatted items as fit.
    ///
    /// Items accumulate in an in-memory `buffer`; when the buffer runs low on
    /// spare capacity its contents are flushed into a lazily created VMO. An
    /// item that would push this packet past `max_packet_size` is stashed in
    /// `overflow` and becomes the first item of the next packet. An item that
    /// could never fit in any packet is dropped with a warning.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            // Inner stream already ended on a previous poll.
            return Poll::Ready(None);
        }

        // Clamp the requested packet size to a 1 MiB upper bound.
        const MAX_PACKET_SIZE_LIMIT: u64 = 1 << 20;
        let max_packet_size = std::cmp::min(self.max_packet_size, MAX_PACKET_SIZE_LIMIT);
        let mut this = self.project();

        let mut buffer = Vec::with_capacity(256 * 1024);
        buffer.extend_from_slice(T::HEADER);

        // True until this packet contains at least one item; formats use it
        // to decide whether a separator is needed.
        let mut first = true;

        if !this.overflow.is_empty() {
            // The item carried over from the previous packet starts this one
            // (no separator: it is the first item of the packet).
            buffer.append(this.overflow);
            first = false;
            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        // `vmo_len` counts bytes already flushed into the VMO; the VMO itself
        // is only created if the in-memory buffer needs to spill.
        let mut vmo = None;
        let mut vmo_len = 0;

        loop {
            if buffer.capacity() - buffer.len() < 512 {
                // Buffer nearly full: flush it to the (lazily created) VMO so
                // there is always headroom for the next item.
                vmo.get_or_insert_with(|| {
                    let v = zx::Vmo::create(max_packet_size).unwrap();
                    let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
                    v
                })
                .write(&buffer, vmo_len as u64)
                .unwrap();
                vmo_len += buffer.len();
                buffer.clear();
            }

            let last_len = buffer.len();

            // Ask the format to append the next item (and separator, when not
            // first); it reports how many separator bytes it wrote.
            let separator_len = match this.format.as_mut().write_item(cx, first, &mut buffer) {
                Poll::Ready(Some(separator_len)) => separator_len,
                Poll::Ready(None) => {
                    // Stream exhausted: emit what we have, or end if empty.
                    *this.finished = true;
                    if first {
                        return Poll::Ready(None);
                    } else {
                        break;
                    }
                }
                Poll::Pending => {
                    // No item ready: flush a partial packet if we have one,
                    // otherwise stay pending (write_item registered the waker).
                    if first {
                        return Poll::Pending;
                    } else {
                        break;
                    }
                }
            };

            let item_len = buffer.len() - last_len - separator_len;

            if (item_len + T::HEADER.len() + T::FOOTER.len()) as u64 >= max_packet_size {
                // Even alone with just the framing this item exceeds the
                // packet limit; it can never be delivered, so drop it.
                warn!("dropping oversize item (limit={max_packet_size} len={item_len})");
                buffer.truncate(last_len);
            } else {
                if (vmo_len + buffer.len() + T::FOOTER.len()) as u64 > max_packet_size {
                    // Fits in *a* packet, just not this one: save the item
                    // bytes (without the separator) for the next poll. If the
                    // packet were empty the oversize branch above would have
                    // fired instead, so `first` must be false here.
                    assert!(!first);
                    this.overflow.extend_from_slice(&buffer[last_len + separator_len..]);
                    buffer.truncate(last_len);
                    break;
                }

                first = false;

                if let Some(stats) = &this.stats {
                    stats.add_result();
                }
            }
        }

        buffer.extend_from_slice(T::FOOTER);

        let vmo = match vmo {
            Some(vmo) => {
                // Packet spilled into a max-size VMO: record the real length.
                vmo.set_stream_size((vmo_len + buffer.len()) as u64).unwrap();
                vmo
            }
            None => {
                // Everything fit in memory: create an exactly-sized VMO.
                let v = zx::Vmo::create(buffer.len() as u64).unwrap();
                let _ = v.set_name(&PACKET_BUFFER_VMO_NAME);
                v
            }
        };
        vmo.write(&buffer, vmo_len as u64).unwrap();
        vmo_len += buffer.len();
        Poll::Ready(Some(Ok(SerializedVmo { vmo, size: vmo_len as u64, format: T::FORMAT })))
    }
}
308
/// [`PacketFormat`] implementation for FXT-encoded log records.
#[pin_project]
pub struct FxtPacketFormat<I> {
    /// Source stream of log messages.
    #[pin]
    pub stream: I,
    /// When true, component metadata (moniker/url) is emitted as separate
    /// manifest records instead of being appended to every log record.
    pub subscribe_to_manifest: bool,
    /// Tag -> identity of manifests already sent; used to avoid resending
    /// and to detect a tag being reused by a different component.
    pub sent_tags: std::collections::HashMap<u64, Arc<crate::identity::ComponentIdentity>>,
}
316
impl<I: Stream<Item = FxtMessage>> PacketFormat for FxtPacketFormat<I> {
    const FORMAT: Format = Format::Fxt;

    /// Appends the next FXT log record to `buffer`.
    ///
    /// With `subscribe_to_manifest` set, component metadata is sent
    /// out-of-band: the first time a tag is seen — or when the identity
    /// bound to a tag changes — a synthetic "manifest" record carrying the
    /// moniker and url is written ahead of the log record, with
    /// `LOG_CONTROL_BIT` set in its header tag. Per-record moniker/url/
    /// rolled-out fields are then suppressed via `ExtendRecordOpts`.
    fn write_item(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        _first: bool,
        buffer: &mut Vec<u8>,
    ) -> Poll<Option<usize>> {
        let this = self.project();
        if let Some(item) = ready!(this.stream.poll_next(cx)) {
            if *this.subscribe_to_manifest {
                let tag = item.tag();
                let identity = item.component_identity();
                // Send a manifest if the tag is new, or if the tag now maps
                // to a different component identity (tag reuse).
                let send_manifest = match this.sent_tags.entry(tag) {
                    std::collections::hash_map::Entry::Vacant(e) => {
                        e.insert(Arc::clone(identity));
                        true
                    }
                    std::collections::hash_map::Entry::Occupied(mut e) => {
                        // Cheap pointer equality first; fall back to value
                        // comparison for identities allocated separately.
                        if !Arc::ptr_eq(e.get(), identity) && **e.get() != **identity {
                            e.insert(Arc::clone(identity));
                            true
                        } else {
                            false
                        }
                    }
                };

                if send_manifest {
                    use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
                    use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, Record};
                    use fidl_fuchsia_diagnostics_types::Severity;
                    use std::io::Cursor;
                    use zerocopy::{FromBytes, IntoBytes};

                    // Encode a synthetic record carrying the component's
                    // moniker and url. Timestamp 0 / Info severity appear to
                    // be placeholders — the control bit marks the record.
                    let mut encoder = Encoder::new(
                        Cursor::new(ResizableBuffer::from(Vec::new())),
                        EncoderOpts::default(),
                    );
                    let record = Record {
                        timestamp: zx::BootInstant::from_nanos(0),
                        severity: Severity::Info.into_primitive(),
                        arguments: vec![
                            Argument::other("moniker", identity.moniker.to_string()),
                            Argument::other("url", identity.url.as_str()),
                        ],
                    };
                    encoder.write_record(record).unwrap();
                    let mut manifest_buffer = encoder.take().into_inner().into_inner();
                    if manifest_buffer.len() >= 8 {
                        // Rewrite the 8-byte record header: same tag, plus
                        // LOG_CONTROL_BIT to flag it as a control record.
                        let mut header = Header::read_from_bytes(&manifest_buffer[0..8]).unwrap();
                        header.set_tag((tag as u32) | LOG_CONTROL_BIT);
                        manifest_buffer[0..8].copy_from_slice(header.as_bytes());
                    }
                    buffer.extend_from_slice(&manifest_buffer);
                }
            }

            // The raw log record bytes themselves.
            buffer.extend_from_slice(item.data());

            // Append trailing metadata; when manifests are in use, the
            // per-record moniker/url/rolled-out fields are omitted.
            extend_fxt_record(
                item.component_identity(),
                item.dropped(),
                &ExtendRecordOpts {
                    component_url: !*this.subscribe_to_manifest,
                    moniker: !*this.subscribe_to_manifest,
                    rolled_out: !*this.subscribe_to_manifest,
                    subscribe_to_manifest: false,
                },
                buffer,
            );
            // FXT records are self-delimiting: no separator bytes written.
            Poll::Ready(Some(0))
        } else {
            Poll::Ready(None)
        }
    }
}
396
/// [`PacketSerializer`] specialization that emits FXT log packets.
pub type FxtPacketSerializer<I> = PacketSerializer<FxtPacketFormat<I>>;

impl<I> FxtPacketSerializer<I> {
    /// Creates an FXT serializer over `items` with the given per-connection
    /// stats and packet-size limit.
    ///
    /// `subscribe_to_manifest` switches component metadata delivery to the
    /// out-of-band manifest-record scheme used by [`FxtPacketFormat`].
    pub fn new(
        stats: Arc<BatchIteratorConnectionStats>,
        max_packet_size: u64,
        items: I,
        subscribe_to_manifest: bool,
    ) -> Self {
        Self::with_format(
            Some(stats),
            max_packet_size,
            FxtPacketFormat {
                stream: items,
                subscribe_to_manifest,
                // No manifests sent yet.
                sent_tags: std::collections::HashMap::new(),
            },
        )
    }
}
417
/// [`PacketFormat`] implementation that serializes items as one JSON array
/// per packet (items separated by ",\n", framed by "[" and "]").
#[pin_project]
pub struct JsonPacketFormat<I>(#[pin] I);
420
421impl<I: Stream<Item = impl Serialize>> PacketFormat for JsonPacketFormat<I> {
422 const FORMAT: Format = Format::Json;
423 const HEADER: &[u8] = b"[";
424 const FOOTER: &[u8] = b"]";
425
426 fn write_item(
427 self: Pin<&mut Self>,
428 cx: &mut Context<'_>,
429 first: bool,
430 buffer: &mut Vec<u8>,
431 ) -> Poll<Option<usize>> {
432 const SEPARATOR: &[u8] = b",\n";
433
434 if let Some(item) = ready!(self.project().0.poll_next(cx)) {
435 let separator_len = if !first {
436 buffer.extend_from_slice(SEPARATOR);
437 SEPARATOR.len()
438 } else {
439 0
440 };
441 serde_json::to_writer(buffer, &item).expect("failed to serialize item");
444 Poll::Ready(Some(separator_len))
445 } else {
446 Poll::Ready(None)
447 }
448 }
449}
450
/// [`PacketSerializer`] specialization that emits JSON-array packets.
pub type JsonPacketSerializer<I> = PacketSerializer<JsonPacketFormat<I>>;

impl<I> JsonPacketSerializer<I> {
    /// Creates a JSON serializer that records per-connection stats.
    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
        Self::with_format(Some(stats), max_packet_size, JsonPacketFormat(items))
    }

    /// Creates a JSON serializer with no stats tracking.
    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
        Self::with_format(None, max_packet_size, JsonPacketFormat(items))
    }
}
462
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use futures::stream::iter;

    /// Runs a `JsonPacketSerializer` with the given `max` packet size over
    /// `inputs` and returns every emitted packet decoded as a UTF-8 string.
    ///
    /// Shared by the JSON tests below, which previously each carried their
    /// own copy of this boilerplate.
    async fn collect_json_packets(inputs: &[&&str], max: u64) -> Vec<String> {
        let node = fuchsia_inspect::Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
        JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .map(|r| {
                let result = r.unwrap();
                let mut buf = vec![0; result.size as usize];
                result.vmo.read(&mut buf, 0).expect("reading vmo");
                std::str::from_utf8(&buf).unwrap().to_string()
            })
            .collect::<Vec<_>>()
    }

    #[fuchsia::test]
    async fn two_items_joined_and_split() {
        let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len() as u64;

        // At exactly the joined length both items share one packet…
        let actual_joined = collect_json_packets(inputs, smallest_possible_joined_len).await;
        assert_eq!(&actual_joined[..], joined);

        // …one byte less forces them into separate packets.
        let actual_split = collect_json_packets(inputs, smallest_possible_joined_len - 1).await;
        assert_eq!(&actual_split[..], split);
    }

    #[fuchsia::test]
    async fn overflow_separator_added() {
        let inputs = &[&"A", &"B", &"C"];

        // Too small for two items plus separator: one packet each.
        let packets = collect_json_packets(inputs, 8).await;
        assert_eq!(packets.len(), 3);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["B"]"#);
        assert_eq!(packets[2], r#"["C"]"#);

        // Two items fit with a separator; the third overflows.
        let packets = collect_json_packets(inputs, 10).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], "[\"A\",\n\"B\"]");
        assert_eq!(packets[1], r#"["C"]"#);
    }

    #[fuchsia::test]
    async fn oversize_item_not_dropped_incorrectly() {
        let inputs = &[&"A", &"BCDEF"];

        // The second item doesn't fit next to the first but does fit alone,
        // so it must be carried over to a new packet, not dropped.
        let packets = collect_json_packets(inputs, 11).await;
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0], r#"["A"]"#);
        assert_eq!(packets[1], r#"["BCDEF"]"#);
    }

    #[fuchsia::test]
    async fn item_too_big_for_packet_is_dropped() {
        let inputs = &[&"ABCDE"];

        // `["ABCDE"]` is 9 bytes and can never fit in an 8-byte packet.
        let packets = collect_json_packets(inputs, 8).await;
        assert_eq!(packets.len(), 0);
    }

    /// Encodes one log record carrying a single `msg` argument and stamps
    /// `tag` into its 8-byte header.
    ///
    /// Shared by the FXT test below, which previously repeated this encoding
    /// sequence for every message.
    fn encode_tagged_record(timestamp_nanos: i64, msg: &str, tag: u32) -> Box<[u8]> {
        use diagnostics_log_encoding::encode::{Encoder, EncoderOpts};
        use diagnostics_log_encoding::{Argument, Header, Record};
        use fidl_fuchsia_diagnostics_types::Severity;
        use std::io::Cursor;
        use zerocopy::{FromBytes, IntoBytes};

        let mut buffer = Cursor::new(vec![0u8; 128]);
        let mut encoder = Encoder::new(&mut buffer, EncoderOpts::default());
        encoder
            .write_record(Record {
                timestamp: zx::BootInstant::from_nanos(timestamp_nanos),
                severity: Severity::Info.into_primitive(),
                arguments: vec![Argument::other("msg", msg)],
            })
            .unwrap();
        let end = encoder.inner().position() as usize;
        let mut bytes = encoder.inner().get_ref()[..end].to_vec();
        let mut header = Header::read_from_bytes(&bytes[0..8]).unwrap();
        header.set_tag(tag);
        bytes[0..8].copy_from_slice(header.as_bytes());
        bytes.into_boxed_slice()
    }

    #[fuchsia::test]
    async fn fxt_packet_serializer_subscribe_to_manifest() {
        use crate::identity::ComponentIdentity;
        use diagnostics_log_encoding::LOG_CONTROL_BIT;
        use zerocopy::FromBytes;

        let identity1 = Arc::new(ComponentIdentity::unknown());
        let mut identity2_inner = ComponentIdentity::unknown();
        identity2_inner.url =
            flyweights::FlyStr::new("fuchsia-pkg://fuchsia.com/test#meta/test.cm");
        let identity2 = Arc::new(identity2_inner);

        // Three records: tag 1 from identity1, then two tag-2 records from
        // identity2. Only the first record of each (tag, identity) pairing
        // should produce a manifest record.
        let msg1 =
            FxtMessage::new_test(encode_tagged_record(100, "hello", 1), 0, Arc::clone(&identity1), 1);
        let msg2 =
            FxtMessage::new_test(encode_tagged_record(200, "world", 2), 0, Arc::clone(&identity2), 2);
        let msg3 =
            FxtMessage::new_test(encode_tagged_record(300, "again", 2), 0, Arc::clone(&identity2), 2);

        let inputs = vec![msg1, msg2, msg3];

        let node = fuchsia_inspect::Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());

        let packets: Vec<_> =
            FxtPacketSerializer::new(Arc::clone(&test_stats), 1024 * 1024, iter(inputs), true)
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    buf
                })
                .collect();

        assert_eq!(packets.len(), 1);
        let output = &packets[0];

        // Walk the packet record by record, keeping each record's header so
        // tags (and the control bit) can be inspected.
        let mut current_slice = output.as_slice();
        let mut records = vec![];
        while !current_slice.is_empty() {
            let (record, remaining) =
                diagnostics_log_encoding::parse::parse_record(current_slice).unwrap();
            let header =
                diagnostics_log_encoding::Header::read_from_bytes(&current_slice[..8]).unwrap();
            records.push((header, record));
            current_slice = remaining;
        }

        // Expected layout: manifest(1), msg1, manifest(2), msg2, msg3.
        assert_eq!(records.len(), 5);

        assert_ne!(records[0].0.tag() & LOG_CONTROL_BIT, 0);
        assert_eq!(records[0].0.tag() & !LOG_CONTROL_BIT, 1);

        assert_eq!(records[1].0.tag(), 1);
        assert_eq!(records[1].1.arguments.len(), 1);

        assert_ne!(records[2].0.tag() & LOG_CONTROL_BIT, 0);
        assert_eq!(records[2].0.tag() & !LOG_CONTROL_BIT, 2);

        assert_eq!(records[3].0.tag(), 2);
        assert_eq!(records[3].1.arguments.len(), 1);

        assert_eq!(records[4].0.tag(), 2);
        assert_eq!(records[4].1.arguments.len(), 1);
    }
}