fxt/
session.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::error::ParseWarning;
6use crate::init::{InitRecord, Ticks};
7use crate::metadata::{
8    MetadataRecord, Provider, ProviderEventMetadataRecord, ProviderInfoMetadataRecord,
9    ProviderSectionMetadataRecord, TraceInfoMetadataRecord,
10};
11use crate::string::{StringRecord, StringRef};
12use crate::thread::{ProcessKoid, ProcessRef, ThreadKoid, ThreadRecord, ThreadRef};
13use crate::{ParseError, ParsedWithOriginalBytes, RawTraceRecord, TraceRecord};
14use flyweights::FlyStr;
15use futures::{AsyncRead, AsyncReadExt, SinkExt, Stream};
16use std::collections::BTreeMap;
17use std::num::{NonZeroU16, NonZeroU8};
18
19pub fn parse_full_session<'a>(
20    buf: &'a [u8],
21) -> Result<(Vec<TraceRecord>, Vec<ParseWarning>), ParseError> {
22    let mut parser = SessionParser::new(std::io::Cursor::new(buf));
23    let mut records = vec![];
24    while let Some(record) = parser.next() {
25        records.push(record?);
26    }
27    Ok((records, parser.warnings().to_owned()))
28}
29
30#[derive(Debug, PartialEq)]
31pub struct SessionParser<R> {
32    buffer: Vec<u8>,
33    reader: R,
34    resolver: ResolveCtx,
35    reader_is_eof: bool,
36    have_seen_magic_number: bool,
37    parsed_bytes: Vec<u8>,
38}
39
40impl<R: std::io::Read> SessionParser<R> {
41    pub fn new(reader: R) -> Self {
42        Self {
43            buffer: vec![],
44            reader,
45            resolver: ResolveCtx::new(),
46            reader_is_eof: false,
47            have_seen_magic_number: false,
48            parsed_bytes: vec![],
49        }
50    }
51}
52
53impl<R> SessionParser<R> {
54    pub fn warnings(&self) -> &[ParseWarning] {
55        self.resolver.warnings()
56    }
57
58    pub fn parsed_bytes(&self) -> &[u8] {
59        return &self.parsed_bytes;
60    }
61
62    fn parse_next(&mut self) -> ParseOutcome {
63        match RawTraceRecord::parse(&self.buffer) {
64            Ok((rem, ParsedWithOriginalBytes { parsed: raw_record, bytes })) => {
65                self.parsed_bytes.extend(bytes);
66                // Make sure the first record we encounter is the magic number record.
67                if raw_record.is_magic_number() {
68                    self.have_seen_magic_number = true;
69                } else {
70                    if !self.have_seen_magic_number {
71                        return ParseOutcome::Error(ParseError::MissingMagicNumber);
72                    }
73                }
74
75                // Resolve the record to end our borrow on the buffer before rotating it.
76                let resolve_res = TraceRecord::resolve(&mut self.resolver, raw_record);
77
78                // Update our buffer based on how much was unused to parse this record.
79                let unused_len = rem.len();
80                let parsed_len = self.buffer.len() - unused_len;
81                self.buffer.copy_within(parsed_len.., 0);
82                self.buffer.truncate(unused_len);
83
84                match resolve_res {
85                    // Updated resolution state but don't have any logical records to return,
86                    // try again.
87                    Ok(None) => ParseOutcome::Continue,
88                    Ok(Some(resolved)) => ParseOutcome::GotRecord(resolved),
89                    Err(e) => ParseOutcome::Error(e),
90                }
91            }
92            Err(nom::Err::Error(e) | nom::Err::Failure(e)) => ParseOutcome::Error(e),
93            Err(nom::Err::Incomplete(needed)) => {
94                ParseOutcome::NeedMoreBytes(match needed {
95                    // Fall back to asking for the max trace record size if we don't know
96                    // how much we want.
97                    nom::Needed::Unknown => 32768,
98                    nom::Needed::Size(n) => n.into(),
99                })
100            }
101        }
102    }
103}
104
105enum ParseOutcome {
106    GotRecord(TraceRecord),
107    Continue,
108    Error(ParseError),
109    NeedMoreBytes(usize),
110}
111
112// We use a macro here because it's difficult to abstract over sync vs. async for read callbacks.
113macro_rules! fill_buffer {
114    ($self:ident, $original_len:ident, $needed:ident, $bytes_read:expr) => {{
115        if $self.reader_is_eof {
116            // We already reached the end of the reader and still failed to parse, so
117            // this iterator is done.
118            return None;
119        } else {
120            let $original_len = $self.buffer.len();
121            $self.buffer.resize($original_len + $needed, 0);
122            let bytes_read = $bytes_read;
123            if bytes_read == 0 {
124                $self.reader_is_eof = true;
125            }
126            $self.buffer.truncate($original_len + bytes_read);
127        }
128    }};
129}
130
131impl<R: std::io::Read> Iterator for SessionParser<R> {
132    type Item = Result<TraceRecord, ParseError>;
133    fn next(&mut self) -> Option<Self::Item> {
134        // Clear out previously parsed bytes
135        self.parsed_bytes.clear();
136        loop {
137            match self.parse_next() {
138                ParseOutcome::GotRecord(r) => return Some(Ok(r)),
139                ParseOutcome::Error(e) => return Some(Err(e)),
140                ParseOutcome::Continue => continue,
141                ParseOutcome::NeedMoreBytes(needed) => {
142                    fill_buffer!(
143                        self,
144                        original_len,
145                        needed,
146                        match self.reader.read(&mut self.buffer[original_len..]) {
147                            Ok(b) => b,
148                            Err(e) => return Some(Err(ParseError::Io(e))),
149                        }
150                    );
151                }
152            }
153        }
154    }
155}
156
157impl<R: AsyncRead + Send + Unpin + 'static> SessionParser<R> {
158    pub fn new_async(
159        reader: R,
160    ) -> (impl Stream<Item = Result<TraceRecord, ParseError>>, fuchsia_async::Task<Vec<ParseWarning>>)
161    {
162        // Bounce the records through a channel to avoid a Stream impl and all the pinning.
163        let (mut send, recv) = futures::channel::mpsc::channel(1);
164        let pump_task = fuchsia_async::Task::spawn(async move {
165            let mut parser = Self {
166                buffer: vec![],
167                reader,
168                resolver: ResolveCtx::new(),
169                reader_is_eof: false,
170                have_seen_magic_number: false,
171                parsed_bytes: vec![],
172            };
173
174            while let Some(next) = parser.next_async().await {
175                if send.send(next).await.is_err() {
176                    // The listener has disconnected, don't need to keep parsing.
177                    break;
178                }
179            }
180
181            parser.warnings().to_owned()
182        });
183
184        (recv, pump_task)
185    }
186
187    pub async fn next_async(&mut self) -> Option<Result<TraceRecord, ParseError>> {
188        // Clear out previously parsed bytes
189        self.parsed_bytes.clear();
190        loop {
191            match self.parse_next() {
192                ParseOutcome::GotRecord(r) => return Some(Ok(r)),
193                ParseOutcome::Error(e) => return Some(Err(e)),
194                ParseOutcome::Continue => continue,
195                ParseOutcome::NeedMoreBytes(needed) => {
196                    fill_buffer!(
197                        self,
198                        original_len,
199                        needed,
200                        match self.reader.read(&mut self.buffer[original_len..]).await {
201                            Ok(b) => b,
202                            Err(e) => return Some(Err(ParseError::Io(e))),
203                        }
204                    );
205                }
206            }
207        }
208    }
209}
210
211#[derive(Debug, PartialEq)]
212pub(crate) struct ResolveCtx {
213    ticks_per_second: u64,
214    current_provider: Option<Provider>,
215    providers: BTreeMap<u32, FlyStr>,
216    strings: BTreeMap<NonZeroU16, FlyStr>,
217    threads: BTreeMap<NonZeroU8, (ProcessKoid, ThreadKoid)>,
218    warnings: Vec<ParseWarning>,
219}
220
221impl ResolveCtx {
222    pub fn new() -> Self {
223        Self {
224            ticks_per_second: 1,
225            current_provider: None,
226            providers: Default::default(),
227            strings: Default::default(),
228            threads: Default::default(),
229            warnings: Default::default(),
230        }
231    }
232
233    pub fn add_warning(&mut self, warning: ParseWarning) {
234        self.warnings.push(warning);
235    }
236
237    pub fn warnings(&self) -> &[ParseWarning] {
238        &self.warnings
239    }
240
241    pub fn current_provider(&self) -> Option<Provider> {
242        self.current_provider.clone()
243    }
244
245    pub fn get_provider(&mut self, id: u32) -> Result<Provider, ParseError> {
246        let name = if let Some(name) = self.providers.get(&id).cloned() {
247            name
248        } else {
249            self.add_warning(ParseWarning::UnknownProviderId(id));
250            "<unknown>".into()
251        };
252
253        Ok(Provider { id, name })
254    }
255
256    pub fn on_metadata_record(
257        &mut self,
258        m: MetadataRecord,
259    ) -> Result<Option<TraceRecord>, ParseError> {
260        Ok(match m {
261            // No action to take on the magic number.
262            MetadataRecord::TraceInfo(TraceInfoMetadataRecord::MagicNumber) => None,
263
264            MetadataRecord::ProviderInfo(ProviderInfoMetadataRecord { provider_id, name }) => {
265                self.providers.insert(provider_id, name.clone());
266                self.current_provider = Some(Provider { id: provider_id, name: name });
267                None
268            }
269            MetadataRecord::ProviderSection(ProviderSectionMetadataRecord { provider_id }) => {
270                let new_provider = self.get_provider(provider_id)?;
271                self.current_provider = Some(new_provider);
272                None
273            }
274            MetadataRecord::ProviderEvent(ProviderEventMetadataRecord { provider_id, event }) => {
275                Some(TraceRecord::ProviderEvent {
276                    provider: self.get_provider(provider_id)?,
277                    event,
278                })
279            }
280            MetadataRecord::Unknown { raw_type } => {
281                self.add_warning(ParseWarning::UnknownMetadataRecordType(raw_type));
282                None
283            }
284        })
285    }
286
287    pub fn on_init_record(&mut self, InitRecord { ticks_per_second }: InitRecord) {
288        self.ticks_per_second = ticks_per_second;
289    }
290
291    pub fn on_string_record(&mut self, s: StringRecord<'_>) {
292        if let Some(idx) = NonZeroU16::new(s.index) {
293            self.strings.insert(idx, s.value.into());
294        } else {
295            self.add_warning(ParseWarning::RecordForZeroStringId);
296        }
297    }
298
299    pub fn on_thread_record(&mut self, t: ThreadRecord) {
300        self.threads.insert(t.index, (t.process_koid, t.thread_koid));
301    }
302
303    pub fn resolve_str(&mut self, s: StringRef<'_>) -> FlyStr {
304        match s {
305            StringRef::Empty => FlyStr::default(),
306            StringRef::Inline(inline) => FlyStr::from(inline),
307            StringRef::Index(id) => {
308                if let Some(s) = self.strings.get(&id).cloned() {
309                    s
310                } else {
311                    self.add_warning(ParseWarning::UnknownStringId(id));
312                    "<unknown>".into()
313                }
314            }
315        }
316    }
317
318    pub fn resolve_process(&mut self, p: ProcessRef) -> ProcessKoid {
319        match p {
320            ProcessRef::Index(id) => {
321                if let Some(process) = self.threads.get(&id).map(|(process, _thread)| *process) {
322                    process
323                } else {
324                    self.add_warning(ParseWarning::UnknownProcessRef(p));
325                    ProcessKoid(u64::MAX)
326                }
327            }
328            ProcessRef::Inline(inline) => inline,
329        }
330    }
331
332    pub fn resolve_thread(&mut self, t: ThreadRef) -> ThreadKoid {
333        match t {
334            ThreadRef::Index(id) => {
335                if let Some(thread) = self.threads.get(&id).map(|(_process, thread)| *thread) {
336                    thread
337                } else {
338                    self.warnings.push(ParseWarning::UnknownThreadRef(t));
339                    ThreadKoid(u64::MAX)
340                }
341            }
342            ThreadRef::Inline(inline) => inline,
343        }
344    }
345
346    pub fn resolve_ticks(&self, t: Ticks) -> i64 {
347        t.scale(self.ticks_per_second)
348    }
349}
350
351#[cfg(test)]
352mod tests {
353    use super::*;
354    use crate::event::{EventPayload, EventRecord};
355    use crate::fxt_builder::FxtBuilder;
356    use crate::scheduling::{LegacyContextSwitchEvent, SchedulingRecord, ThreadState};
357    use futures::{StreamExt, TryStreamExt};
358
359    static SIMPLE_TRACE_FXT: &[u8] =
360        include_bytes!("../../../../trace2json/test_data/simple_trace.fxt");
361
362    #[test]
363    fn test_parse_full_session() {
364        let session = parse_full_session(SIMPLE_TRACE_FXT).unwrap();
365        assert_eq!(session, expected_simple_trace_records());
366    }
367
368    #[fuchsia::test]
369    async fn test_async_parse() {
370        let (mut send_chunks, recv_chunks) = futures::channel::mpsc::unbounded();
371
372        let parse_trace_session = fuchsia_async::Task::spawn(async move {
373            let (records, parse_task) = SessionParser::new_async(recv_chunks.into_async_read());
374            let records = records.map(|res| res.unwrap()).collect::<Vec<_>>().await;
375            (records, parse_task.await)
376        });
377
378        // Send tiny chunks to shake out any incorrect streaming parser code.
379        for chunk in SIMPLE_TRACE_FXT.chunks(1) {
380            send_chunks.send(Ok(chunk)).await.unwrap();
381        }
382        drop(send_chunks);
383
384        assert_eq!(parse_trace_session.await, expected_simple_trace_records());
385    }
386
387    #[fuchsia::test]
388    fn session_with_unknown_record_in_middle() {
389        let mut session = vec![];
390
391        // Add the magic record from the simple trace before we add our bogus record so we don't
392        // error on an invalid first record.
393        session.extend(&SIMPLE_TRACE_FXT[..8]);
394
395        // Add our bogus record with an unknown type, expecting to skip over it.
396        let mut header = crate::BaseTraceHeader::empty();
397        header.set_raw_type(14); // not currently a valid ordinal
398        session.extend(FxtBuilder::new(header).atom(&(0u8..27u8).collect::<Vec<u8>>()).build());
399
400        // Add the rest of the simple trace.
401        session.extend(&SIMPLE_TRACE_FXT[8..]);
402
403        let (observed_parsed, observed_warnings) = parse_full_session(&session).unwrap();
404        let (expected_parsed, expected_warnings) =
405            (expected_simple_trace_records().0, vec![ParseWarning::UnknownTraceRecordType(14)]);
406        assert_eq!(observed_parsed, expected_parsed);
407        assert_eq!(observed_warnings, expected_warnings);
408    }
409
410    #[fuchsia::test]
411    fn session_with_incomplete_trailing_record() {
412        use crate::string::STRING_REF_INLINE_BIT;
413
414        let mut session = SIMPLE_TRACE_FXT.to_vec();
415
416        // Make a 2 word header with some arbitrary values.
417        let category = "test_category";
418        let name = "test_instant";
419        let mut header = crate::event::EventHeader::empty();
420        header.set_category_ref(category.len() as u16 | STRING_REF_INLINE_BIT);
421        header.set_name_ref(name.len() as u16 | STRING_REF_INLINE_BIT);
422        header.set_event_type(crate::event::INSTANT_EVENT_TYPE);
423
424        let mut final_record = FxtBuilder::new(header)
425            .atom(2048u64.to_le_bytes()) // timestamp
426            .atom(512u64.to_le_bytes()) // process
427            .atom(513u64.to_le_bytes()) // thread
428            .atom(category)
429            .atom(name)
430            .build();
431        let byte_to_make_valid = final_record.pop().unwrap();
432
433        for byte in final_record {
434            session.push(byte);
435            assert_eq!(
436                parse_full_session(&session).expect("should parse without final incomplete record"),
437                expected_simple_trace_records(),
438            );
439        }
440
441        let (mut expected_with_final_record, expected_warnings) = expected_simple_trace_records();
442        expected_with_final_record.push(TraceRecord::Event(EventRecord {
443            provider: Some(Provider { id: 1, name: "test_provider".into() }),
444            timestamp: 85333,
445            process: ProcessKoid(512),
446            thread: ThreadKoid(513),
447            category: category.into(),
448            name: name.into(),
449            args: vec![],
450            payload: EventPayload::Instant,
451        }));
452
453        session.push(byte_to_make_valid);
454        assert_eq!(
455            parse_full_session(&session).unwrap(),
456            (expected_with_final_record, expected_warnings)
457        );
458    }
459
460    fn expected_simple_trace_records() -> (Vec<TraceRecord>, Vec<ParseWarning>) {
461        (
462            vec![
463                TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
464                    LegacyContextSwitchEvent {
465                        provider: Some(Provider { id: 1, name: "test_provider".into() }),
466                        timestamp: 41,
467                        cpu_id: 0,
468                        outgoing_thread_state: ThreadState::Suspended,
469                        outgoing_process: ProcessKoid(4660),
470                        outgoing_thread: ThreadKoid(17185),
471                        outgoing_thread_priority: 0,
472                        incoming_process: ProcessKoid(1000),
473                        incoming_thread: ThreadKoid(1001),
474                        incoming_thread_priority: 20,
475                    },
476                )),
477                TraceRecord::Event(EventRecord {
478                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
479                    timestamp: 0,
480                    process: ProcessKoid(1000),
481                    thread: ThreadKoid(1001),
482                    category: "test".into(),
483                    name: "begin_end_ref".into(),
484                    args: vec![],
485                    payload: EventPayload::DurationBegin,
486                }),
487                TraceRecord::Event(EventRecord {
488                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
489                    timestamp: 110000000,
490                    process: ProcessKoid(1000),
491                    thread: ThreadKoid(1001),
492                    category: "test".into(),
493                    name: "complete_inline".into(),
494                    args: vec![],
495                    payload: EventPayload::DurationComplete { end_timestamp: 150000000 },
496                }),
497                TraceRecord::Event(EventRecord {
498                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
499                    timestamp: 200000000,
500                    process: ProcessKoid(1000),
501                    thread: ThreadKoid(1001),
502                    category: "test".into(),
503                    name: "begin_end_inline".into(),
504                    args: vec![],
505                    payload: EventPayload::DurationBegin,
506                }),
507                TraceRecord::Event(EventRecord {
508                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
509                    timestamp: 450000000,
510                    process: ProcessKoid(1000),
511                    thread: ThreadKoid(1001),
512                    category: "test".into(),
513                    name: "begin_end_inline".into(),
514                    args: vec![],
515                    payload: EventPayload::DurationEnd,
516                }),
517                TraceRecord::Event(EventRecord {
518                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
519                    timestamp: 100000000,
520                    process: ProcessKoid(1000),
521                    thread: ThreadKoid(1001),
522                    category: "test".into(),
523                    name: "complete_ref".into(),
524                    args: vec![],
525                    payload: EventPayload::DurationComplete { end_timestamp: 500000000 },
526                }),
527                TraceRecord::Event(EventRecord {
528                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
529                    timestamp: 500000208,
530                    process: ProcessKoid(1000),
531                    thread: ThreadKoid(1001),
532                    category: "test".into(),
533                    name: "async".into(),
534                    args: vec![],
535                    payload: EventPayload::AsyncBegin { id: 1 },
536                }),
537                TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
538                    LegacyContextSwitchEvent {
539                        provider: Some(Provider { id: 1, name: "test_provider".into() }),
540                        timestamp: 500000416,
541                        cpu_id: 0,
542                        outgoing_thread_state: ThreadState::Suspended,
543                        outgoing_process: ProcessKoid(1000),
544                        outgoing_thread: ThreadKoid(1001),
545                        outgoing_thread_priority: 20,
546                        incoming_process: ProcessKoid(1000),
547                        incoming_thread: ThreadKoid(1002),
548                        incoming_thread_priority: 20,
549                    },
550                )),
551                TraceRecord::Event(EventRecord {
552                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
553                    timestamp: 500000458,
554                    process: ProcessKoid(1000),
555                    thread: ThreadKoid(1002),
556                    category: "test".into(),
557                    name: "complete_ref".into(),
558                    args: vec![],
559                    payload: EventPayload::DurationComplete { end_timestamp: 600000000 },
560                }),
561                TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
562                    LegacyContextSwitchEvent {
563                        provider: Some(Provider { id: 1, name: "test_provider".into() }),
564                        timestamp: 600010666,
565                        cpu_id: 0,
566                        outgoing_thread_state: ThreadState::Suspended,
567                        outgoing_process: ProcessKoid(1000),
568                        outgoing_thread: ThreadKoid(1002),
569                        outgoing_thread_priority: 20,
570                        incoming_process: ProcessKoid(1000),
571                        incoming_thread: ThreadKoid(1001),
572                        incoming_thread_priority: 20,
573                    },
574                )),
575                TraceRecord::Event(EventRecord {
576                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
577                    timestamp: 600016000,
578                    process: ProcessKoid(1000),
579                    thread: ThreadKoid(1001),
580                    category: "test".into(),
581                    name: "async".into(),
582                    args: vec![],
583                    payload: EventPayload::AsyncEnd { id: 1 },
584                }),
585                TraceRecord::Event(EventRecord {
586                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
587                    timestamp: 630000000,
588                    process: ProcessKoid(1000),
589                    thread: ThreadKoid(1001),
590                    category: "test".into(),
591                    name: "begin_end_ref".into(),
592                    args: vec![],
593                    payload: EventPayload::DurationBegin,
594                }),
595                TraceRecord::Event(EventRecord {
596                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
597                    timestamp: 950000000,
598                    process: ProcessKoid(1000),
599                    thread: ThreadKoid(1001),
600                    category: "test".into(),
601                    name: "begin_end_ref".into(),
602                    args: vec![],
603                    payload: EventPayload::DurationEnd,
604                }),
605                TraceRecord::Event(EventRecord {
606                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
607                    timestamp: 1000000000,
608                    process: ProcessKoid(1000),
609                    thread: ThreadKoid(1001),
610                    category: "test".into(),
611                    name: "begin_end_ref".into(),
612                    args: vec![],
613                    payload: EventPayload::DurationEnd,
614                }),
615                TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
616                    LegacyContextSwitchEvent {
617                        provider: Some(Provider { id: 1, name: "test_provider".into() }),
618                        timestamp: 1000000666,
619                        cpu_id: 0,
620                        outgoing_thread_state: ThreadState::Suspended,
621                        outgoing_process: ProcessKoid(1000),
622                        outgoing_thread: ThreadKoid(1001),
623                        outgoing_thread_priority: 20,
624                        incoming_process: ProcessKoid(4660),
625                        incoming_thread: ThreadKoid(17185),
626                        incoming_thread_priority: 0,
627                    },
628                )),
629            ],
630            // There should be no warnings produced from these tests.
631            vec![],
632        )
633    }
634}