1use crate::error::ParseWarning;
6use crate::init::{InitRecord, Ticks};
7use crate::metadata::{
8 MetadataRecord, Provider, ProviderEventMetadataRecord, ProviderInfoMetadataRecord,
9 ProviderSectionMetadataRecord, TraceInfoMetadataRecord,
10};
11use crate::string::{StringRecord, StringRef};
12use crate::thread::{ProcessKoid, ProcessRef, ThreadKoid, ThreadRecord, ThreadRef};
13use crate::{ParseError, ParsedWithOriginalBytes, RawTraceRecord, TraceRecord};
14use flyweights::FlyStr;
15use futures::{AsyncRead, AsyncReadExt, SinkExt, Stream};
16use std::collections::BTreeMap;
17use std::num::{NonZeroU16, NonZeroU8};
18
19pub fn parse_full_session<'a>(
20 buf: &'a [u8],
21) -> Result<(Vec<TraceRecord>, Vec<ParseWarning>), ParseError> {
22 let mut parser = SessionParser::new(std::io::Cursor::new(buf));
23 let mut records = vec![];
24 while let Some(record) = parser.next() {
25 records.push(record?);
26 }
27 Ok((records, parser.warnings().to_owned()))
28}
29
30#[derive(Debug, PartialEq)]
31pub struct SessionParser<R> {
32 buffer: Vec<u8>,
33 reader: R,
34 resolver: ResolveCtx,
35 reader_is_eof: bool,
36 have_seen_magic_number: bool,
37 parsed_bytes: Vec<u8>,
38}
39
40impl<R: std::io::Read> SessionParser<R> {
41 pub fn new(reader: R) -> Self {
42 Self {
43 buffer: vec![],
44 reader,
45 resolver: ResolveCtx::new(),
46 reader_is_eof: false,
47 have_seen_magic_number: false,
48 parsed_bytes: vec![],
49 }
50 }
51}
52
53impl<R> SessionParser<R> {
54 pub fn warnings(&self) -> &[ParseWarning] {
55 self.resolver.warnings()
56 }
57
58 pub fn parsed_bytes(&self) -> &[u8] {
59 return &self.parsed_bytes;
60 }
61
62 fn parse_next(&mut self) -> ParseOutcome {
63 match RawTraceRecord::parse(&self.buffer) {
64 Ok((rem, ParsedWithOriginalBytes { parsed: raw_record, bytes })) => {
65 self.parsed_bytes.extend(bytes);
66 if raw_record.is_magic_number() {
68 self.have_seen_magic_number = true;
69 } else {
70 if !self.have_seen_magic_number {
71 return ParseOutcome::Error(ParseError::MissingMagicNumber);
72 }
73 }
74
75 let resolve_res = TraceRecord::resolve(&mut self.resolver, raw_record);
77
78 let unused_len = rem.len();
80 let parsed_len = self.buffer.len() - unused_len;
81 self.buffer.copy_within(parsed_len.., 0);
82 self.buffer.truncate(unused_len);
83
84 match resolve_res {
85 Ok(None) => ParseOutcome::Continue,
88 Ok(Some(resolved)) => ParseOutcome::GotRecord(resolved),
89 Err(e) => ParseOutcome::Error(e),
90 }
91 }
92 Err(nom::Err::Error(e) | nom::Err::Failure(e)) => ParseOutcome::Error(e),
93 Err(nom::Err::Incomplete(needed)) => {
94 ParseOutcome::NeedMoreBytes(match needed {
95 nom::Needed::Unknown => 32768,
98 nom::Needed::Size(n) => n.into(),
99 })
100 }
101 }
102 }
103}
104
105enum ParseOutcome {
106 GotRecord(TraceRecord),
107 Continue,
108 Error(ParseError),
109 NeedMoreBytes(usize),
110}
111
112macro_rules! fill_buffer {
114 ($self:ident, $original_len:ident, $needed:ident, $bytes_read:expr) => {{
115 if $self.reader_is_eof {
116 return None;
119 } else {
120 let $original_len = $self.buffer.len();
121 $self.buffer.resize($original_len + $needed, 0);
122 let bytes_read = $bytes_read;
123 if bytes_read == 0 {
124 $self.reader_is_eof = true;
125 }
126 $self.buffer.truncate($original_len + bytes_read);
127 }
128 }};
129}
130
131impl<R: std::io::Read> Iterator for SessionParser<R> {
132 type Item = Result<TraceRecord, ParseError>;
133 fn next(&mut self) -> Option<Self::Item> {
134 self.parsed_bytes.clear();
136 loop {
137 match self.parse_next() {
138 ParseOutcome::GotRecord(r) => return Some(Ok(r)),
139 ParseOutcome::Error(e) => return Some(Err(e)),
140 ParseOutcome::Continue => continue,
141 ParseOutcome::NeedMoreBytes(needed) => {
142 fill_buffer!(
143 self,
144 original_len,
145 needed,
146 match self.reader.read(&mut self.buffer[original_len..]) {
147 Ok(b) => b,
148 Err(e) => return Some(Err(ParseError::Io(e))),
149 }
150 );
151 }
152 }
153 }
154 }
155}
156
157impl<R: AsyncRead + Send + Unpin + 'static> SessionParser<R> {
158 pub fn new_async(
159 reader: R,
160 ) -> (impl Stream<Item = Result<TraceRecord, ParseError>>, fuchsia_async::Task<Vec<ParseWarning>>)
161 {
162 let (mut send, recv) = futures::channel::mpsc::channel(1);
164 let pump_task = fuchsia_async::Task::spawn(async move {
165 let mut parser = Self {
166 buffer: vec![],
167 reader,
168 resolver: ResolveCtx::new(),
169 reader_is_eof: false,
170 have_seen_magic_number: false,
171 parsed_bytes: vec![],
172 };
173
174 while let Some(next) = parser.next_async().await {
175 if send.send(next).await.is_err() {
176 break;
178 }
179 }
180
181 parser.warnings().to_owned()
182 });
183
184 (recv, pump_task)
185 }
186
187 pub async fn next_async(&mut self) -> Option<Result<TraceRecord, ParseError>> {
188 self.parsed_bytes.clear();
190 loop {
191 match self.parse_next() {
192 ParseOutcome::GotRecord(r) => return Some(Ok(r)),
193 ParseOutcome::Error(e) => return Some(Err(e)),
194 ParseOutcome::Continue => continue,
195 ParseOutcome::NeedMoreBytes(needed) => {
196 fill_buffer!(
197 self,
198 original_len,
199 needed,
200 match self.reader.read(&mut self.buffer[original_len..]).await {
201 Ok(b) => b,
202 Err(e) => return Some(Err(ParseError::Io(e))),
203 }
204 );
205 }
206 }
207 }
208 }
209}
210
211#[derive(Debug, PartialEq)]
212pub(crate) struct ResolveCtx {
213 ticks_per_second: u64,
214 current_provider: Option<Provider>,
215 providers: BTreeMap<u32, FlyStr>,
216 strings: BTreeMap<NonZeroU16, FlyStr>,
217 threads: BTreeMap<NonZeroU8, (ProcessKoid, ThreadKoid)>,
218 warnings: Vec<ParseWarning>,
219}
220
221impl ResolveCtx {
222 pub fn new() -> Self {
223 Self {
224 ticks_per_second: 1,
225 current_provider: None,
226 providers: Default::default(),
227 strings: Default::default(),
228 threads: Default::default(),
229 warnings: Default::default(),
230 }
231 }
232
233 pub fn add_warning(&mut self, warning: ParseWarning) {
234 self.warnings.push(warning);
235 }
236
237 pub fn warnings(&self) -> &[ParseWarning] {
238 &self.warnings
239 }
240
241 pub fn current_provider(&self) -> Option<Provider> {
242 self.current_provider.clone()
243 }
244
245 pub fn get_provider(&mut self, id: u32) -> Result<Provider, ParseError> {
246 let name = if let Some(name) = self.providers.get(&id).cloned() {
247 name
248 } else {
249 self.add_warning(ParseWarning::UnknownProviderId(id));
250 "<unknown>".into()
251 };
252
253 Ok(Provider { id, name })
254 }
255
256 pub fn on_metadata_record(
257 &mut self,
258 m: MetadataRecord,
259 ) -> Result<Option<TraceRecord>, ParseError> {
260 Ok(match m {
261 MetadataRecord::TraceInfo(TraceInfoMetadataRecord::MagicNumber) => None,
263
264 MetadataRecord::ProviderInfo(ProviderInfoMetadataRecord { provider_id, name }) => {
265 self.providers.insert(provider_id, name.clone());
266 self.current_provider = Some(Provider { id: provider_id, name: name });
267 None
268 }
269 MetadataRecord::ProviderSection(ProviderSectionMetadataRecord { provider_id }) => {
270 let new_provider = self.get_provider(provider_id)?;
271 self.current_provider = Some(new_provider);
272 None
273 }
274 MetadataRecord::ProviderEvent(ProviderEventMetadataRecord { provider_id, event }) => {
275 Some(TraceRecord::ProviderEvent {
276 provider: self.get_provider(provider_id)?,
277 event,
278 })
279 }
280 MetadataRecord::Unknown { raw_type } => {
281 self.add_warning(ParseWarning::UnknownMetadataRecordType(raw_type));
282 None
283 }
284 })
285 }
286
287 pub fn on_init_record(&mut self, InitRecord { ticks_per_second }: InitRecord) {
288 self.ticks_per_second = ticks_per_second;
289 }
290
291 pub fn on_string_record(&mut self, s: StringRecord<'_>) {
292 if let Some(idx) = NonZeroU16::new(s.index) {
293 self.strings.insert(idx, s.value.into());
294 } else {
295 self.add_warning(ParseWarning::RecordForZeroStringId);
296 }
297 }
298
299 pub fn on_thread_record(&mut self, t: ThreadRecord) {
300 self.threads.insert(t.index, (t.process_koid, t.thread_koid));
301 }
302
303 pub fn resolve_str(&mut self, s: StringRef<'_>) -> FlyStr {
304 match s {
305 StringRef::Empty => FlyStr::default(),
306 StringRef::Inline(inline) => FlyStr::from(inline),
307 StringRef::Index(id) => {
308 if let Some(s) = self.strings.get(&id).cloned() {
309 s
310 } else {
311 self.add_warning(ParseWarning::UnknownStringId(id));
312 "<unknown>".into()
313 }
314 }
315 }
316 }
317
318 pub fn resolve_process(&mut self, p: ProcessRef) -> ProcessKoid {
319 match p {
320 ProcessRef::Index(id) => {
321 if let Some(process) = self.threads.get(&id).map(|(process, _thread)| *process) {
322 process
323 } else {
324 self.add_warning(ParseWarning::UnknownProcessRef(p));
325 ProcessKoid(u64::MAX)
326 }
327 }
328 ProcessRef::Inline(inline) => inline,
329 }
330 }
331
332 pub fn resolve_thread(&mut self, t: ThreadRef) -> ThreadKoid {
333 match t {
334 ThreadRef::Index(id) => {
335 if let Some(thread) = self.threads.get(&id).map(|(_process, thread)| *thread) {
336 thread
337 } else {
338 self.warnings.push(ParseWarning::UnknownThreadRef(t));
339 ThreadKoid(u64::MAX)
340 }
341 }
342 ThreadRef::Inline(inline) => inline,
343 }
344 }
345
346 pub fn resolve_ticks(&self, t: Ticks) -> i64 {
347 t.scale(self.ticks_per_second)
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use super::*;
354 use crate::event::{EventPayload, EventRecord};
355 use crate::fxt_builder::FxtBuilder;
356 use crate::scheduling::{LegacyContextSwitchEvent, SchedulingRecord, ThreadState};
357 use futures::{StreamExt, TryStreamExt};
358
359 static SIMPLE_TRACE_FXT: &[u8] =
360 include_bytes!("../../../../trace2json/test_data/simple_trace.fxt");
361
362 #[test]
363 fn test_parse_full_session() {
364 let session = parse_full_session(SIMPLE_TRACE_FXT).unwrap();
365 assert_eq!(session, expected_simple_trace_records());
366 }
367
368 #[fuchsia::test]
369 async fn test_async_parse() {
370 let (mut send_chunks, recv_chunks) = futures::channel::mpsc::unbounded();
371
372 let parse_trace_session = fuchsia_async::Task::spawn(async move {
373 let (records, parse_task) = SessionParser::new_async(recv_chunks.into_async_read());
374 let records = records.map(|res| res.unwrap()).collect::<Vec<_>>().await;
375 (records, parse_task.await)
376 });
377
378 for chunk in SIMPLE_TRACE_FXT.chunks(1) {
380 send_chunks.send(Ok(chunk)).await.unwrap();
381 }
382 drop(send_chunks);
383
384 assert_eq!(parse_trace_session.await, expected_simple_trace_records());
385 }
386
387 #[fuchsia::test]
388 fn session_with_unknown_record_in_middle() {
389 let mut session = vec![];
390
391 session.extend(&SIMPLE_TRACE_FXT[..8]);
394
395 let mut header = crate::BaseTraceHeader::empty();
397 header.set_raw_type(14); session.extend(FxtBuilder::new(header).atom(&(0u8..27u8).collect::<Vec<u8>>()).build());
399
400 session.extend(&SIMPLE_TRACE_FXT[8..]);
402
403 let (observed_parsed, observed_warnings) = parse_full_session(&session).unwrap();
404 let (expected_parsed, expected_warnings) =
405 (expected_simple_trace_records().0, vec![ParseWarning::UnknownTraceRecordType(14)]);
406 assert_eq!(observed_parsed, expected_parsed);
407 assert_eq!(observed_warnings, expected_warnings);
408 }
409
410 #[fuchsia::test]
411 fn session_with_incomplete_trailing_record() {
412 use crate::string::STRING_REF_INLINE_BIT;
413
414 let mut session = SIMPLE_TRACE_FXT.to_vec();
415
416 let category = "test_category";
418 let name = "test_instant";
419 let mut header = crate::event::EventHeader::empty();
420 header.set_category_ref(category.len() as u16 | STRING_REF_INLINE_BIT);
421 header.set_name_ref(name.len() as u16 | STRING_REF_INLINE_BIT);
422 header.set_event_type(crate::event::INSTANT_EVENT_TYPE);
423
424 let mut final_record = FxtBuilder::new(header)
425 .atom(2048u64.to_le_bytes()) .atom(512u64.to_le_bytes()) .atom(513u64.to_le_bytes()) .atom(category)
429 .atom(name)
430 .build();
431 let byte_to_make_valid = final_record.pop().unwrap();
432
433 for byte in final_record {
434 session.push(byte);
435 assert_eq!(
436 parse_full_session(&session).expect("should parse without final incomplete record"),
437 expected_simple_trace_records(),
438 );
439 }
440
441 let (mut expected_with_final_record, expected_warnings) = expected_simple_trace_records();
442 expected_with_final_record.push(TraceRecord::Event(EventRecord {
443 provider: Some(Provider { id: 1, name: "test_provider".into() }),
444 timestamp: 85333,
445 process: ProcessKoid(512),
446 thread: ThreadKoid(513),
447 category: category.into(),
448 name: name.into(),
449 args: vec![],
450 payload: EventPayload::Instant,
451 }));
452
453 session.push(byte_to_make_valid);
454 assert_eq!(
455 parse_full_session(&session).unwrap(),
456 (expected_with_final_record, expected_warnings)
457 );
458 }
459
460 fn expected_simple_trace_records() -> (Vec<TraceRecord>, Vec<ParseWarning>) {
461 (
462 vec![
463 TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
464 LegacyContextSwitchEvent {
465 provider: Some(Provider { id: 1, name: "test_provider".into() }),
466 timestamp: 41,
467 cpu_id: 0,
468 outgoing_thread_state: ThreadState::Suspended,
469 outgoing_process: ProcessKoid(4660),
470 outgoing_thread: ThreadKoid(17185),
471 outgoing_thread_priority: 0,
472 incoming_process: ProcessKoid(1000),
473 incoming_thread: ThreadKoid(1001),
474 incoming_thread_priority: 20,
475 },
476 )),
477 TraceRecord::Event(EventRecord {
478 provider: Some(Provider { id: 1, name: "test_provider".into() }),
479 timestamp: 0,
480 process: ProcessKoid(1000),
481 thread: ThreadKoid(1001),
482 category: "test".into(),
483 name: "begin_end_ref".into(),
484 args: vec![],
485 payload: EventPayload::DurationBegin,
486 }),
487 TraceRecord::Event(EventRecord {
488 provider: Some(Provider { id: 1, name: "test_provider".into() }),
489 timestamp: 110000000,
490 process: ProcessKoid(1000),
491 thread: ThreadKoid(1001),
492 category: "test".into(),
493 name: "complete_inline".into(),
494 args: vec![],
495 payload: EventPayload::DurationComplete { end_timestamp: 150000000 },
496 }),
497 TraceRecord::Event(EventRecord {
498 provider: Some(Provider { id: 1, name: "test_provider".into() }),
499 timestamp: 200000000,
500 process: ProcessKoid(1000),
501 thread: ThreadKoid(1001),
502 category: "test".into(),
503 name: "begin_end_inline".into(),
504 args: vec![],
505 payload: EventPayload::DurationBegin,
506 }),
507 TraceRecord::Event(EventRecord {
508 provider: Some(Provider { id: 1, name: "test_provider".into() }),
509 timestamp: 450000000,
510 process: ProcessKoid(1000),
511 thread: ThreadKoid(1001),
512 category: "test".into(),
513 name: "begin_end_inline".into(),
514 args: vec![],
515 payload: EventPayload::DurationEnd,
516 }),
517 TraceRecord::Event(EventRecord {
518 provider: Some(Provider { id: 1, name: "test_provider".into() }),
519 timestamp: 100000000,
520 process: ProcessKoid(1000),
521 thread: ThreadKoid(1001),
522 category: "test".into(),
523 name: "complete_ref".into(),
524 args: vec![],
525 payload: EventPayload::DurationComplete { end_timestamp: 500000000 },
526 }),
527 TraceRecord::Event(EventRecord {
528 provider: Some(Provider { id: 1, name: "test_provider".into() }),
529 timestamp: 500000208,
530 process: ProcessKoid(1000),
531 thread: ThreadKoid(1001),
532 category: "test".into(),
533 name: "async".into(),
534 args: vec![],
535 payload: EventPayload::AsyncBegin { id: 1 },
536 }),
537 TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
538 LegacyContextSwitchEvent {
539 provider: Some(Provider { id: 1, name: "test_provider".into() }),
540 timestamp: 500000416,
541 cpu_id: 0,
542 outgoing_thread_state: ThreadState::Suspended,
543 outgoing_process: ProcessKoid(1000),
544 outgoing_thread: ThreadKoid(1001),
545 outgoing_thread_priority: 20,
546 incoming_process: ProcessKoid(1000),
547 incoming_thread: ThreadKoid(1002),
548 incoming_thread_priority: 20,
549 },
550 )),
551 TraceRecord::Event(EventRecord {
552 provider: Some(Provider { id: 1, name: "test_provider".into() }),
553 timestamp: 500000458,
554 process: ProcessKoid(1000),
555 thread: ThreadKoid(1002),
556 category: "test".into(),
557 name: "complete_ref".into(),
558 args: vec![],
559 payload: EventPayload::DurationComplete { end_timestamp: 600000000 },
560 }),
561 TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
562 LegacyContextSwitchEvent {
563 provider: Some(Provider { id: 1, name: "test_provider".into() }),
564 timestamp: 600010666,
565 cpu_id: 0,
566 outgoing_thread_state: ThreadState::Suspended,
567 outgoing_process: ProcessKoid(1000),
568 outgoing_thread: ThreadKoid(1002),
569 outgoing_thread_priority: 20,
570 incoming_process: ProcessKoid(1000),
571 incoming_thread: ThreadKoid(1001),
572 incoming_thread_priority: 20,
573 },
574 )),
575 TraceRecord::Event(EventRecord {
576 provider: Some(Provider { id: 1, name: "test_provider".into() }),
577 timestamp: 600016000,
578 process: ProcessKoid(1000),
579 thread: ThreadKoid(1001),
580 category: "test".into(),
581 name: "async".into(),
582 args: vec![],
583 payload: EventPayload::AsyncEnd { id: 1 },
584 }),
585 TraceRecord::Event(EventRecord {
586 provider: Some(Provider { id: 1, name: "test_provider".into() }),
587 timestamp: 630000000,
588 process: ProcessKoid(1000),
589 thread: ThreadKoid(1001),
590 category: "test".into(),
591 name: "begin_end_ref".into(),
592 args: vec![],
593 payload: EventPayload::DurationBegin,
594 }),
595 TraceRecord::Event(EventRecord {
596 provider: Some(Provider { id: 1, name: "test_provider".into() }),
597 timestamp: 950000000,
598 process: ProcessKoid(1000),
599 thread: ThreadKoid(1001),
600 category: "test".into(),
601 name: "begin_end_ref".into(),
602 args: vec![],
603 payload: EventPayload::DurationEnd,
604 }),
605 TraceRecord::Event(EventRecord {
606 provider: Some(Provider { id: 1, name: "test_provider".into() }),
607 timestamp: 1000000000,
608 process: ProcessKoid(1000),
609 thread: ThreadKoid(1001),
610 category: "test".into(),
611 name: "begin_end_ref".into(),
612 args: vec![],
613 payload: EventPayload::DurationEnd,
614 }),
615 TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
616 LegacyContextSwitchEvent {
617 provider: Some(Provider { id: 1, name: "test_provider".into() }),
618 timestamp: 1000000666,
619 cpu_id: 0,
620 outgoing_thread_state: ThreadState::Suspended,
621 outgoing_process: ProcessKoid(1000),
622 outgoing_thread: ThreadKoid(1001),
623 outgoing_thread_priority: 20,
624 incoming_process: ProcessKoid(4660),
625 incoming_thread: ThreadKoid(17185),
626 incoming_thread_priority: 0,
627 },
628 )),
629 ],
630 vec![],
632 )
633 }
634}