1use crate::error::ParseWarning;
6use crate::init::{InitRecord, Ticks};
7use crate::metadata::{
8 MetadataRecord, Provider, ProviderEventMetadataRecord, ProviderInfoMetadataRecord,
9 ProviderSectionMetadataRecord, TraceInfoMetadataRecord,
10};
11use crate::string::{StringRecord, StringRef};
12use crate::thread::{ProcessKoid, ProcessRef, ThreadKoid, ThreadRecord, ThreadRef};
13use crate::{ParseError, ParsedWithOriginalBytes, RawTraceRecord, TraceRecord};
14use flyweights::FlyStr;
15use futures::{AsyncRead, AsyncReadExt, SinkExt, Stream};
16use std::collections::BTreeMap;
17use std::num::{NonZeroU16, NonZeroU8};
18use std::sync::Mutex;
19
20pub fn parse_full_session<'a>(
21 buf: &'a [u8],
22) -> Result<(Vec<TraceRecord>, Vec<ParseWarning>), ParseError> {
23 let mut parser = SessionParser::new(std::io::Cursor::new(buf));
24 let mut records = vec![];
25 while let Some(record) = parser.next() {
26 records.push(record?);
27 }
28 Ok((records, parser.take_warnings()))
29}
30
31#[derive(Debug)]
32pub struct SessionParser<R> {
33 buffer: Vec<u8>,
34 reader: R,
35 resolver: ResolveCtx,
36 reader_is_eof: bool,
37 have_seen_magic_number: bool,
38 parsed_bytes: Vec<u8>,
39}
40
41impl<R: std::io::Read> SessionParser<R> {
42 pub fn new(reader: R) -> Self {
43 Self {
44 buffer: vec![],
45 reader,
46 resolver: ResolveCtx::new(),
47 reader_is_eof: false,
48 have_seen_magic_number: false,
49 parsed_bytes: vec![],
50 }
51 }
52}
53
54impl<R> SessionParser<R> {
55 pub fn take_warnings(&self) -> Vec<ParseWarning> {
56 self.resolver.take_warnings()
57 }
58
59 pub fn parsed_bytes(&self) -> &[u8] {
60 return &self.parsed_bytes;
61 }
62
63 fn parse_next(&mut self) -> ParseOutcome {
64 match RawTraceRecord::parse(&self.buffer) {
65 Ok((rem, ParsedWithOriginalBytes { parsed: raw_record, bytes })) => {
66 self.parsed_bytes.extend(bytes);
67 if raw_record.is_magic_number() {
69 self.have_seen_magic_number = true;
70 } else {
71 if !self.have_seen_magic_number {
72 return ParseOutcome::Error(ParseError::MissingMagicNumber);
73 }
74 }
75
76 let resolve_res = TraceRecord::resolve(&mut self.resolver, raw_record);
78
79 let unused_len = rem.len();
81 let parsed_len = self.buffer.len() - unused_len;
82 self.buffer.copy_within(parsed_len.., 0);
83 self.buffer.truncate(unused_len);
84
85 match resolve_res {
86 Ok(None) => ParseOutcome::Continue,
89 Ok(Some(resolved)) => ParseOutcome::GotRecord(resolved),
90 Err(e) => ParseOutcome::Error(e),
91 }
92 }
93 Err(nom::Err::Error(e) | nom::Err::Failure(e)) => {
94 self.buffer = vec![];
95 ParseOutcome::Error(e)
96 }
97 Err(nom::Err::Incomplete(needed)) => {
98 ParseOutcome::NeedMoreBytes(match needed {
99 nom::Needed::Unknown => 32768,
102 nom::Needed::Size(n) => n.into(),
103 })
104 }
105 }
106 }
107}
108
109enum ParseOutcome {
110 GotRecord(TraceRecord),
111 Continue,
112 Error(ParseError),
113 NeedMoreBytes(usize),
114}
115
116macro_rules! fill_buffer {
118 ($self:ident, $original_len:ident, $needed:ident, $bytes_read:expr) => {{
119 if $self.reader_is_eof {
120 return None;
123 } else {
124 let $original_len = $self.buffer.len();
125 $self.buffer.resize($original_len + $needed, 0);
126 let bytes_read = $bytes_read;
127 if bytes_read == 0 {
128 $self.reader_is_eof = true;
129 }
130 $self.buffer.truncate($original_len + bytes_read);
131 }
132 }};
133}
134
135impl<R: std::io::Read> Iterator for SessionParser<R> {
136 type Item = Result<TraceRecord, ParseError>;
137 fn next(&mut self) -> Option<Self::Item> {
138 self.parsed_bytes.clear();
140 loop {
141 match self.parse_next() {
142 ParseOutcome::GotRecord(r) => return Some(Ok(r)),
143 ParseOutcome::Error(e) => return Some(Err(e)),
144 ParseOutcome::Continue => continue,
145 ParseOutcome::NeedMoreBytes(needed) => {
146 fill_buffer!(
147 self,
148 original_len,
149 needed,
150 match self.reader.read(&mut self.buffer[original_len..]) {
151 Ok(b) => b,
152 Err(e) => return Some(Err(ParseError::Io(e))),
153 }
154 );
155 }
156 }
157 }
158 }
159}
160
161impl<R: AsyncRead + Send + Unpin + 'static> SessionParser<R> {
162 pub fn new_async(
163 reader: R,
164 ) -> (impl Stream<Item = Result<TraceRecord, ParseError>>, fuchsia_async::Task<Vec<ParseWarning>>)
165 {
166 let (mut send, recv) = futures::channel::mpsc::channel(1);
168 let pump_task = fuchsia_async::Task::spawn(async move {
169 let mut parser = Self {
170 buffer: vec![],
171 reader,
172 resolver: ResolveCtx::new(),
173 reader_is_eof: false,
174 have_seen_magic_number: false,
175 parsed_bytes: vec![],
176 };
177
178 while let Some(next) = parser.next_async().await {
179 if send.send(next).await.is_err() {
180 break;
182 }
183 }
184
185 parser.take_warnings()
186 });
187
188 (recv, pump_task)
189 }
190
191 pub async fn next_async(&mut self) -> Option<Result<TraceRecord, ParseError>> {
192 self.parsed_bytes.clear();
194 loop {
195 match self.parse_next() {
196 ParseOutcome::GotRecord(r) => return Some(Ok(r)),
197 ParseOutcome::Error(e) => return Some(Err(e)),
198 ParseOutcome::Continue => continue,
199 ParseOutcome::NeedMoreBytes(needed) => {
200 fill_buffer!(
201 self,
202 original_len,
203 needed,
204 match self.reader.read(&mut self.buffer[original_len..]).await {
205 Ok(b) => b,
206 Err(e) => return Some(Err(ParseError::Io(e))),
207 }
208 );
209 }
210 }
211 }
212 }
213}
214
215#[derive(Debug)]
216pub(crate) struct ResolveCtx {
217 ticks_per_second: u64,
218 current_provider: Option<Provider>,
219 providers: BTreeMap<u32, FlyStr>,
220 strings: BTreeMap<NonZeroU16, FlyStr>,
221 threads: BTreeMap<NonZeroU8, (ProcessKoid, ThreadKoid)>,
222 warnings: Mutex<Vec<ParseWarning>>,
223}
224
225impl ResolveCtx {
226 pub fn new() -> Self {
227 Self {
228 ticks_per_second: 1,
229 current_provider: None,
230 providers: Default::default(),
231 strings: Default::default(),
232 threads: Default::default(),
233 warnings: Default::default(),
234 }
235 }
236
237 pub fn add_warning(&self, warning: ParseWarning) {
238 self.warnings.lock().expect("adding warning").push(warning);
239 }
240
241 pub fn take_warnings(&self) -> Vec<ParseWarning> {
242 let mut guard = self.warnings.lock().expect("taking warnings");
243 std::mem::replace(&mut *guard, Vec::new())
244 }
245
246 pub fn current_provider(&self) -> Option<Provider> {
247 self.current_provider.clone()
248 }
249
250 pub fn get_provider(&mut self, id: u32) -> Result<Provider, ParseError> {
251 let name = if let Some(name) = self.providers.get(&id).cloned() {
252 name
253 } else {
254 self.add_warning(ParseWarning::UnknownProviderId(id));
255 "<unknown>".into()
256 };
257
258 Ok(Provider { id, name })
259 }
260
261 pub fn on_metadata_record(
262 &mut self,
263 m: MetadataRecord,
264 ) -> Result<Option<TraceRecord>, ParseError> {
265 Ok(match m {
266 MetadataRecord::TraceInfo(TraceInfoMetadataRecord::MagicNumber) => None,
268
269 MetadataRecord::ProviderInfo(ProviderInfoMetadataRecord { provider_id, name }) => {
270 self.providers.insert(provider_id, name.clone());
271 self.current_provider = Some(Provider { id: provider_id, name: name });
272 None
273 }
274 MetadataRecord::ProviderSection(ProviderSectionMetadataRecord { provider_id }) => {
275 let new_provider = self.get_provider(provider_id)?;
276 self.current_provider = Some(new_provider);
277 None
278 }
279 MetadataRecord::ProviderEvent(ProviderEventMetadataRecord { provider_id, event }) => {
280 Some(TraceRecord::ProviderEvent {
281 provider: self.get_provider(provider_id)?,
282 event,
283 })
284 }
285 MetadataRecord::Unknown { raw_type } => {
286 self.add_warning(ParseWarning::UnknownMetadataRecordType(raw_type));
287 None
288 }
289 })
290 }
291
292 pub fn on_init_record(&mut self, InitRecord { ticks_per_second }: InitRecord) {
293 self.ticks_per_second = ticks_per_second;
294 }
295
296 pub fn on_string_record(&mut self, s: StringRecord<'_>) {
297 if let Some(idx) = NonZeroU16::new(s.index) {
298 self.strings.insert(idx, s.value.into());
299 } else {
300 self.add_warning(ParseWarning::RecordForZeroStringId);
301 }
302 }
303
304 pub fn on_thread_record(&mut self, t: ThreadRecord) {
305 self.threads.insert(t.index, (t.process_koid, t.thread_koid));
306 }
307
308 pub fn resolve_str(&self, s: StringRef<'_>) -> FlyStr {
309 match s {
310 StringRef::Empty => FlyStr::default(),
311 StringRef::Inline(inline) => FlyStr::from(inline),
312 StringRef::Index(id) => {
313 if let Some(s) = self.strings.get(&id).cloned() {
314 s
315 } else {
316 self.add_warning(ParseWarning::UnknownStringId(id));
317 "<unknown>".into()
318 }
319 }
320 }
321 }
322
323 pub fn resolve_process(&self, p: ProcessRef) -> ProcessKoid {
324 match p {
325 ProcessRef::Index(id) => {
326 if let Some(process) = self.threads.get(&id).map(|(process, _thread)| *process) {
327 process
328 } else {
329 self.add_warning(ParseWarning::UnknownProcessRef(p));
330 ProcessKoid(u64::MAX)
331 }
332 }
333 ProcessRef::Inline(inline) => inline,
334 }
335 }
336
337 pub fn resolve_thread(&self, t: ThreadRef) -> ThreadKoid {
338 match t {
339 ThreadRef::Index(id) => {
340 if let Some(thread) = self.threads.get(&id).map(|(_process, thread)| *thread) {
341 thread
342 } else {
343 self.add_warning(ParseWarning::UnknownThreadRef(t));
344 ThreadKoid(u64::MAX)
345 }
346 }
347 ThreadRef::Inline(inline) => inline,
348 }
349 }
350
351 pub fn resolve_ticks(&self, t: Ticks) -> i64 {
352 t.scale(self.ticks_per_second)
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use super::*;
359 use crate::event::{EventPayload, EventRecord};
360 use crate::fxt_builder::FxtBuilder;
361 use crate::scheduling::{LegacyContextSwitchEvent, SchedulingRecord, ThreadState};
362 use futures::{StreamExt, TryStreamExt};
363
364 static SIMPLE_TRACE_FXT: &[u8] =
365 include_bytes!("../../../../trace2json/test_data/simple_trace.fxt");
366
367 #[test]
368 fn test_parse_full_session() {
369 let session = parse_full_session(SIMPLE_TRACE_FXT).unwrap();
370 assert_eq!(session, expected_simple_trace_records());
371 }
372
373 #[fuchsia::test]
374 async fn test_async_parse() {
375 let (mut send_chunks, recv_chunks) = futures::channel::mpsc::unbounded();
376
377 let parse_trace_session = fuchsia_async::Task::spawn(async move {
378 let (records, parse_task) = SessionParser::new_async(recv_chunks.into_async_read());
379 let records = records.map(|res| res.unwrap()).collect::<Vec<_>>().await;
380 (records, parse_task.await)
381 });
382
383 for chunk in SIMPLE_TRACE_FXT.chunks(1) {
385 send_chunks.send(Ok(chunk)).await.unwrap();
386 }
387 drop(send_chunks);
388
389 assert_eq!(parse_trace_session.await, expected_simple_trace_records());
390 }
391
392 #[fuchsia::test]
393 fn session_with_unknown_record_in_middle() {
394 let mut session = vec![];
395
396 session.extend(&SIMPLE_TRACE_FXT[..8]);
399
400 let mut header = crate::BaseTraceHeader::empty();
402 header.set_raw_type(14); session.extend(FxtBuilder::new(header).atom(&(0u8..27u8).collect::<Vec<u8>>()).build());
404
405 session.extend(&SIMPLE_TRACE_FXT[8..]);
407
408 let (observed_parsed, observed_warnings) = parse_full_session(&session).unwrap();
409 let (expected_parsed, expected_warnings) =
410 (expected_simple_trace_records().0, vec![ParseWarning::UnknownTraceRecordType(14)]);
411 assert_eq!(observed_parsed, expected_parsed);
412 assert_eq!(observed_warnings, expected_warnings);
413 }
414
415 #[fuchsia::test]
416 fn session_with_invalid_record_in_middle() {
417 let mut session = vec![];
418 session.extend(&SIMPLE_TRACE_FXT[..8]);
421 let invalid_record = vec![
423 103, 0, 2, 15, 128, 1, 0, 0, 229, 253, 9, 0, 0, 0, 0, 0, 98, 105, 110, 100, 101, 114,
424 58, 57, 51, 54, 95, 68, 255, 255, 255, 0, 40, 0, 166, 0, 0, 0, 0, 0, 125, 125, 4, 0, 0,
425 0, 0, 0,
426 ];
427 session.extend(invalid_record);
428 session.extend(&SIMPLE_TRACE_FXT[8..]);
429 let mut parser = SessionParser::new(std::io::Cursor::new(session));
430 let mut records = vec![];
431 let mut had_error_record = false;
432 while let Some(record) = parser.next() {
433 match record {
434 Ok(record) => records.push(record),
435 Err(_) => had_error_record = true,
436 }
437 }
438 assert_eq!(records, expected_simple_trace_records().0);
441 assert_eq!(had_error_record, true);
442 }
443
444 #[fuchsia::test]
445 fn sessioninvalid_recordwith_incomplete_trailing_record() {
446 use crate::string::STRING_REF_INLINE_BIT;
447
448 let mut session = SIMPLE_TRACE_FXT.to_vec();
449
450 let category = "test_category";
452 let name = "test_instant";
453 let mut header = crate::event::EventHeader::empty();
454 header.set_category_ref(category.len() as u16 | STRING_REF_INLINE_BIT);
455 header.set_name_ref(name.len() as u16 | STRING_REF_INLINE_BIT);
456 header.set_event_type(crate::event::INSTANT_EVENT_TYPE);
457
458 let mut final_record = FxtBuilder::new(header)
459 .atom(2048u64.to_le_bytes()) .atom(512u64.to_le_bytes()) .atom(513u64.to_le_bytes()) .atom(category)
463 .atom(name)
464 .build();
465 let byte_to_make_valid = final_record.pop().unwrap();
466
467 for byte in final_record {
468 session.push(byte);
469 assert_eq!(
470 parse_full_session(&session).expect("should parse without final incomplete record"),
471 expected_simple_trace_records(),
472 );
473 }
474
475 let (mut expected_with_final_record, expected_warnings) = expected_simple_trace_records();
476 expected_with_final_record.push(TraceRecord::Event(EventRecord {
477 provider: Some(Provider { id: 1, name: "test_provider".into() }),
478 timestamp: 85333,
479 process: ProcessKoid(512),
480 thread: ThreadKoid(513),
481 category: category.into(),
482 name: name.into(),
483 args: vec![],
484 payload: EventPayload::Instant,
485 }));
486
487 session.push(byte_to_make_valid);
488 assert_eq!(
489 parse_full_session(&session).unwrap(),
490 (expected_with_final_record, expected_warnings)
491 );
492 }
493
494 fn expected_simple_trace_records() -> (Vec<TraceRecord>, Vec<ParseWarning>) {
495 (
496 vec![
497 TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
498 LegacyContextSwitchEvent {
499 provider: Some(Provider { id: 1, name: "test_provider".into() }),
500 timestamp: 41,
501 cpu_id: 0,
502 outgoing_thread_state: ThreadState::Suspended,
503 outgoing_process: ProcessKoid(4660),
504 outgoing_thread: ThreadKoid(17185),
505 outgoing_thread_priority: 0,
506 incoming_process: ProcessKoid(1000),
507 incoming_thread: ThreadKoid(1001),
508 incoming_thread_priority: 20,
509 },
510 )),
511 TraceRecord::Event(EventRecord {
512 provider: Some(Provider { id: 1, name: "test_provider".into() }),
513 timestamp: 0,
514 process: ProcessKoid(1000),
515 thread: ThreadKoid(1001),
516 category: "test".into(),
517 name: "begin_end_ref".into(),
518 args: vec![],
519 payload: EventPayload::DurationBegin,
520 }),
521 TraceRecord::Event(EventRecord {
522 provider: Some(Provider { id: 1, name: "test_provider".into() }),
523 timestamp: 110000000,
524 process: ProcessKoid(1000),
525 thread: ThreadKoid(1001),
526 category: "test".into(),
527 name: "complete_inline".into(),
528 args: vec![],
529 payload: EventPayload::DurationComplete { end_timestamp: 150000000 },
530 }),
531 TraceRecord::Event(EventRecord {
532 provider: Some(Provider { id: 1, name: "test_provider".into() }),
533 timestamp: 200000000,
534 process: ProcessKoid(1000),
535 thread: ThreadKoid(1001),
536 category: "test".into(),
537 name: "begin_end_inline".into(),
538 args: vec![],
539 payload: EventPayload::DurationBegin,
540 }),
541 TraceRecord::Event(EventRecord {
542 provider: Some(Provider { id: 1, name: "test_provider".into() }),
543 timestamp: 450000000,
544 process: ProcessKoid(1000),
545 thread: ThreadKoid(1001),
546 category: "test".into(),
547 name: "begin_end_inline".into(),
548 args: vec![],
549 payload: EventPayload::DurationEnd,
550 }),
551 TraceRecord::Event(EventRecord {
552 provider: Some(Provider { id: 1, name: "test_provider".into() }),
553 timestamp: 100000000,
554 process: ProcessKoid(1000),
555 thread: ThreadKoid(1001),
556 category: "test".into(),
557 name: "complete_ref".into(),
558 args: vec![],
559 payload: EventPayload::DurationComplete { end_timestamp: 500000000 },
560 }),
561 TraceRecord::Event(EventRecord {
562 provider: Some(Provider { id: 1, name: "test_provider".into() }),
563 timestamp: 500000208,
564 process: ProcessKoid(1000),
565 thread: ThreadKoid(1001),
566 category: "test".into(),
567 name: "async".into(),
568 args: vec![],
569 payload: EventPayload::AsyncBegin { id: 1 },
570 }),
571 TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
572 LegacyContextSwitchEvent {
573 provider: Some(Provider { id: 1, name: "test_provider".into() }),
574 timestamp: 500000416,
575 cpu_id: 0,
576 outgoing_thread_state: ThreadState::Suspended,
577 outgoing_process: ProcessKoid(1000),
578 outgoing_thread: ThreadKoid(1001),
579 outgoing_thread_priority: 20,
580 incoming_process: ProcessKoid(1000),
581 incoming_thread: ThreadKoid(1002),
582 incoming_thread_priority: 20,
583 },
584 )),
585 TraceRecord::Event(EventRecord {
586 provider: Some(Provider { id: 1, name: "test_provider".into() }),
587 timestamp: 500000458,
588 process: ProcessKoid(1000),
589 thread: ThreadKoid(1002),
590 category: "test".into(),
591 name: "complete_ref".into(),
592 args: vec![],
593 payload: EventPayload::DurationComplete { end_timestamp: 600000000 },
594 }),
595 TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
596 LegacyContextSwitchEvent {
597 provider: Some(Provider { id: 1, name: "test_provider".into() }),
598 timestamp: 600010666,
599 cpu_id: 0,
600 outgoing_thread_state: ThreadState::Suspended,
601 outgoing_process: ProcessKoid(1000),
602 outgoing_thread: ThreadKoid(1002),
603 outgoing_thread_priority: 20,
604 incoming_process: ProcessKoid(1000),
605 incoming_thread: ThreadKoid(1001),
606 incoming_thread_priority: 20,
607 },
608 )),
609 TraceRecord::Event(EventRecord {
610 provider: Some(Provider { id: 1, name: "test_provider".into() }),
611 timestamp: 600016000,
612 process: ProcessKoid(1000),
613 thread: ThreadKoid(1001),
614 category: "test".into(),
615 name: "async".into(),
616 args: vec![],
617 payload: EventPayload::AsyncEnd { id: 1 },
618 }),
619 TraceRecord::Event(EventRecord {
620 provider: Some(Provider { id: 1, name: "test_provider".into() }),
621 timestamp: 630000000,
622 process: ProcessKoid(1000),
623 thread: ThreadKoid(1001),
624 category: "test".into(),
625 name: "begin_end_ref".into(),
626 args: vec![],
627 payload: EventPayload::DurationBegin,
628 }),
629 TraceRecord::Event(EventRecord {
630 provider: Some(Provider { id: 1, name: "test_provider".into() }),
631 timestamp: 950000000,
632 process: ProcessKoid(1000),
633 thread: ThreadKoid(1001),
634 category: "test".into(),
635 name: "begin_end_ref".into(),
636 args: vec![],
637 payload: EventPayload::DurationEnd,
638 }),
639 TraceRecord::Event(EventRecord {
640 provider: Some(Provider { id: 1, name: "test_provider".into() }),
641 timestamp: 1000000000,
642 process: ProcessKoid(1000),
643 thread: ThreadKoid(1001),
644 category: "test".into(),
645 name: "begin_end_ref".into(),
646 args: vec![],
647 payload: EventPayload::DurationEnd,
648 }),
649 TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
650 LegacyContextSwitchEvent {
651 provider: Some(Provider { id: 1, name: "test_provider".into() }),
652 timestamp: 1000000666,
653 cpu_id: 0,
654 outgoing_thread_state: ThreadState::Suspended,
655 outgoing_process: ProcessKoid(1000),
656 outgoing_thread: ThreadKoid(1001),
657 outgoing_thread_priority: 20,
658 incoming_process: ProcessKoid(4660),
659 incoming_thread: ThreadKoid(17185),
660 incoming_thread_priority: 0,
661 },
662 )),
663 ],
664 vec![],
666 )
667 }
668}