Skip to main content

starnix_modules_perfetto_consumer/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![recursion_limit = "256"]
6
7use anyhow::bail;
8use fuchsia_trace::{
9    BufferingMode, ProlongedContext, TraceState, category_enabled, trace_state, trace_string_ref_t,
10};
11use fuchsia_trace_observer::TraceObserver;
12use futures::{SinkExt, StreamExt};
13use fxt::blob::{BlobHeader, BlobType};
14use perfetto_protos::perfetto::protos::trace_config::buffer_config::FillPolicy;
15use perfetto_protos::perfetto::protos::trace_config::{BufferConfig, DataSource};
16use perfetto_protos::perfetto::protos::{
17    DataSourceConfig, DisableTracingRequest, EnableTracingRequest, FreeBuffersRequest,
18    FtraceConfig, ReadBuffersRequest, TraceConfig, ipc_frame,
19};
20use perfetto_trace_protos::perfetto::protos::frame_timeline_event::{
21    ActualDisplayFrameStart, ActualSurfaceFrameStart, Event, ExpectedDisplayFrameStart,
22    ExpectedSurfaceFrameStart,
23};
24use perfetto_trace_protos::perfetto::protos::ftrace_event::Event::Print;
25use perfetto_trace_protos::perfetto::protos::trace_packet;
26use starnix_core::security;
27use starnix_core::task::dynamic_thread_spawner::SpawnRequestBuilder;
28use starnix_core::task::tracing::TracePerformanceEventManager;
29use starnix_core::task::{CurrentTask, Kernel, LockedAndTask};
30use starnix_core::vfs::FsString;
31use starnix_logging::{
32    CATEGORY_ATRACE, NAME_PERFETTO_BLOB, log_debug, log_error, log_info, log_warn,
33};
34use starnix_perfetto_trace_decoder::{decode_read_buffers_response, decode_trace, encode_trace};
35use starnix_sync::{Locked, Unlocked};
36use starnix_uapi::errors::Errno;
37
38mod atrace;
39
/// Size, in KiB, requested for the single Perfetto buffer configured when tracing is enabled
/// (63488 KiB = 62 MiB).
const PERFETTO_BUFFER_SIZE_KB: u32 = 63488;
41
/// State needed to act upon trace state changes.
struct CallbackState {
    /// The previously observed trace state.
    prev_state: TraceState,
    /// Path to the Perfetto consumer socket.
    socket_path: FsString,
    /// Connection to the consumer socket, if it has been initialized. This gets initialized the
    /// first time it is needed.
    connection: Option<perfetto::Consumer>,
    /// Prolonged trace context to prevent the Fuchsia trace session from terminating while reading
    /// data from Perfetto.
    prolonged_context: Option<ProlongedContext>,
    /// Partial trace packet returned from Perfetto but not yet written to Fuchsia.
    packet_data: Vec<u8>,

    /// Started when a trace session starts and stopped/cleared when it stops. Used to translate
    /// the Linux pids and tids found in Perfetto data into koids.
    event_manager: TracePerformanceEventManager,
}
59
60impl CallbackState {
61    fn connection(
62        &mut self,
63        locked: &mut Locked<Unlocked>,
64        current_task: &CurrentTask,
65    ) -> Result<&mut perfetto::Consumer, anyhow::Error> {
66        match self.connection {
67            None => {
68                self.connection =
69                    Some(perfetto::Consumer::new(locked, current_task, self.socket_path.as_ref())?);
70                Ok(self.connection.as_mut().unwrap())
71            }
72            Some(ref mut conn) => Ok(conn),
73        }
74    }
75
76    fn handle_stopped(&mut self) {
77        self.prolonged_context = None;
78        self.packet_data.clear();
79        self.event_manager.stop();
80        self.event_manager.clear();
81    }
82
83    fn on_state_change(
84        &mut self,
85        locked: &mut Locked<Unlocked>,
86        new_state: TraceState,
87        current_task: &CurrentTask,
88    ) -> Result<(), anyhow::Error> {
89        let prev_state = self.prev_state;
90        self.prev_state = new_state;
91        log_debug!(
92            "Perfetto consumer state change. new_state: {new_state:?}, prev_state: {prev_state:?}"
93        );
94        match new_state {
95            TraceState::Started => {
96                if prev_state != TraceState::Stopped {
97                    // This means something unexpected has caused the trace_engine to change
98                    // states faster than we're processing the trace observer events.
99                    log_error!(
100                        "Started received in {prev_state:?} state! Cleaning up then starting."
101                    );
102                    self.handle_stopped();
103                }
104                self.prolonged_context = ProlongedContext::acquire();
105                let connection = self.connection(locked, current_task)?;
106                // A fixed set of data sources that may be of interest. As demand for other sources
107                // is found, add them here, and it may become worthwhile to allow this set to be
108                // configurable per trace session.
109                let mut data_sources = vec![
110                    DataSource {
111                        config: Some(DataSourceConfig {
112                            name: Some("track_event".to_string()),
113                            ..Default::default()
114                        }),
115                        ..Default::default()
116                    },
117                    DataSource {
118                        config: Some(DataSourceConfig {
119                            name: Some("android.surfaceflinger.frame".to_string()),
120                            target_buffer: Some(0),
121                            ..Default::default()
122                        }),
123                        ..Default::default()
124                    },
125                    DataSource {
126                        config: Some(DataSourceConfig {
127                            name: Some("android.surfaceflinger.frametimeline".to_string()),
128                            target_buffer: Some(0),
129                            ..Default::default()
130                        }),
131                        ..Default::default()
132                    },
133                ];
134                if category_enabled(CATEGORY_ATRACE) {
135                    data_sources.push(DataSource {
136                        config: Some(DataSourceConfig {
137                            name: Some("linux.ftrace".to_string()),
138                            ftrace_config: Some(FtraceConfig {
139                                ftrace_events: vec!["ftrace/print".to_string()],
140                                // Enable all supported atrace categories. This could be improved
141                                // in the future to be a subset that is configurable by each trace
142                                // session.
143                                atrace_categories: vec![
144                                    "am".to_string(),
145                                    "adb".to_string(),
146                                    "aidl".to_string(),
147                                    "dalvik".to_string(),
148                                    "audio".to_string(),
149                                    "binder_lock".to_string(),
150                                    "binder_driver".to_string(),
151                                    "bionic".to_string(),
152                                    "camera".to_string(),
153                                    "database".to_string(),
154                                    "gfx".to_string(),
155                                    "hal".to_string(),
156                                    "input".to_string(),
157                                    "network".to_string(),
158                                    "nnapi".to_string(),
159                                    "pm".to_string(),
160                                    "power".to_string(),
161                                    "rs".to_string(),
162                                    "res".to_string(),
163                                    "rro".to_string(),
164                                    "sched".to_string(),
165                                    "sm".to_string(),
166                                    "ss".to_string(),
167                                    "vibrator".to_string(),
168                                    "video".to_string(),
169                                    "view".to_string(),
170                                    "webview".to_string(),
171                                    "wm".to_string(),
172                                ],
173                                atrace_apps: vec!["*".to_string()],
174                                ..Default::default()
175                            }),
176                            ..Default::default()
177                        }),
178                        ..Default::default()
179                    });
180                }
181                connection.enable_tracing(
182                    locked,
183                    current_task,
184                    EnableTracingRequest {
185                        trace_config: Some(TraceConfig {
186                            buffers: vec![BufferConfig {
187                                size_kb: Some(PERFETTO_BUFFER_SIZE_KB),
188                                fill_policy: Some(FillPolicy::Discard.into()),
189                                ..Default::default()
190                            }],
191                            data_sources,
192                            ..Default::default()
193                        }),
194                        attach_notification_only: None,
195                    },
196                )?;
197                // Once tracing has started, notify the event manager so it can start tracking processes.
198                self.event_manager.start(current_task.kernel());
199            }
200            TraceState::Stopping => {
201                if prev_state != TraceState::Started {
202                    // If we receive a stop request and we don't think we're actually tracing, our
203                    // local state likely desynced from the global trace state. Clean up our state
204                    // and ensure we're stopped so we re-synchronize.
205                    log_error!("Stopping received in {prev_state:?} state! Cleaning up.");
206                    self.handle_stopped();
207                    return Ok(());
208                }
209
210                // We want to hold the prolonged context to ensure the trace session doesn't
211                // exit out from under us, but we also want to ensure we drop the prolonged
212                // context if we bail for whatever reason below.
213                let _local_prolonged_context = std::mem::replace(&mut self.prolonged_context, None);
214                let start_time = std::time::Instant::now();
215
216                let connection = self.connection(locked, current_task)?;
217                let disable_request =
218                    connection.disable_tracing(locked, current_task, DisableTracingRequest {})?;
219                loop {
220                    let frame = connection.next_frame_blocking(locked, current_task)?;
221                    if frame.request_id == Some(disable_request) {
222                        break;
223                    } else {
224                        log_error!(
225                            "Ignoring frame while looking for DisableTracingRequest: {frame:?}"
226                        );
227                    }
228                }
229
230                let read_buffers_request =
231                    connection.read_buffers(locked, current_task, ReadBuffersRequest {})?;
232
233                let blob_name_ref = {
234                    let Some(context) = fuchsia_trace::Context::acquire() else {
235                        bail!("Tracing stopped despite holding prolonged context");
236                    };
237                    context.register_string_literal(NAME_PERFETTO_BLOB)
238                };
239
240                // IPC responses may be spread across multiple frames, so loop until we get a
241                // message that indicates it is the last one. Additionally, if there are
242                // unrelated messages on the socket (e.g. leftover from a previous trace
243                // session), the loop will read past and ignore them.
244                loop {
245                    let frame = self
246                        .connection(locked, current_task)?
247                        .next_frame_blocking(locked, current_task)?;
248                    if frame.request_id != Some(read_buffers_request) {
249                        continue;
250                    } else {
251                        log_debug!(
252                            "perfetto_consumer ignoring frame while looking for ReadBuffersRequest {read_buffers_request}: {frame:?}"
253                        );
254                    }
255                    if let Some(ipc_frame::Msg::MsgInvokeMethodReply(reply)) = &frame.msg {
256                        if let Ok(response) = decode_read_buffers_response(
257                            reply.reply_proto.as_deref().unwrap_or(&[]),
258                        ) {
259                            for slice in &response.slices {
260                                if let Some(data) = &slice.data {
261                                    self.packet_data.extend(data);
262                                }
263                                if slice.last_slice_for_packet.unwrap_or(false) {
264                                    let mut blob_data = Vec::new();
265                                    // Packet field number = 1, length delimited type = 2.
266                                    blob_data.push(1 << 3 | 2);
267                                    // Push a varint encoded length.
268                                    // See https://protobuf.dev/programming-guides/encoding/
269                                    const HIGH_BIT: u8 = 0x80;
270                                    const LOW_SEVEN_BITS: usize = 0x7F;
271                                    let mut value = self.packet_data.len();
272                                    while value >= HIGH_BIT as usize {
273                                        blob_data.push((value & LOW_SEVEN_BITS) as u8 | HIGH_BIT);
274                                        value >>= 7;
275                                    }
276                                    blob_data.push(value as u8);
277                                    // `append` moves all data out of the passed Vec, so
278                                    // s.packet_data will be empty after this call.
279                                    blob_data.append(&mut self.packet_data);
280
281                                    // At this point blob_data is a full Perfetto Trace protobuf.
282                                    // Parse the data and replace the linux pids with their
283                                    // corresponding koid.
284                                    let rewritten =
285                                        self.rewrite_pids(&blob_data).unwrap_or(blob_data);
286
287                                    // Ignore a failure to write the packet here. We don't
288                                    // return immediately because we want to allow the
289                                    // remaining records to be recorded as dropped.
290                                    //
291                                    // Once we fill a buffer in oneshot mode, we expect to drop
292                                    // the remaining packets here.
293                                    //
294                                    // Rather than logging here, allow the trace system to
295                                    // aggregate the number of records dropped and we can query
296                                    // the trace system later to determine if we dropped
297                                    // records when it's more efficient to do so.
298                                    let _ = self.forward_packet(blob_name_ref, rewritten);
299                                }
300                            }
301                        } else {
302                            log_error!("perfetto_consumer cannot decode protobuf from {reply:?}");
303                        }
304                        if reply.has_more != Some(true) {
305                            break;
306                        }
307                    } else {
308                        log_error!(
309                            "perfetto_consumer ignoring non-MsgInvokeMethodReply message: {frame:?}"
310                        );
311                    }
312                }
313                // The response to a free buffers request does not have anything meaningful,
314                // so we don't need to worry about tracking the request id to match to the
315                // response.
316                let _free_buffers_request_id =
317                    self.connection(locked, current_task)?.free_buffers(
318                        locked,
319                        current_task,
320                        FreeBuffersRequest { buffer_ids: vec![0] },
321                    )?;
322                let elapsed = start_time.elapsed().as_millis();
323                log_info!(
324                    "Perfetto frames copied, dropping prolonged trace context. Processing took {elapsed} ms"
325                );
326            }
327            TraceState::Stopped => {
328                self.handle_stopped();
329            }
330        }
331        Ok(())
332    }
333
334    // Forward `data` to the trace buffer by wrapping it in fxt blob records with the name
335    // `blob_name_ref`..
336    fn forward_packet(&self, blob_name_ref: trace_string_ref_t, data: Vec<u8>) -> Option<usize> {
337        // The blob data may be larger than what we can fit in a single record. If so, split it up
338        // over multiple chunks.
339        let mut bytes_written = 0;
340        let mut data_to_write = &data[..];
341
342        // We want to break the data into chunks:
343        // - Bigger chunks means less per-write overheader
344        // - Bigger chunks means less overhead due to blob meta
345        //
346        // However, too big and the blobs won't fit nicely into the trace buffer.
347        // The trace buffer is minimum 1MiB in size, so writing 4k at a time seems like a
348        // reasonable place to start that is both reasonably large and not going to leave a ton of
349        // space at the end of the trace buffer.
350        let max_chunk_size = 4096;
351        while !data_to_write.is_empty() {
352            let chunk_size = data_to_write.len().min(max_chunk_size);
353            let chunk = &data_to_write[..chunk_size];
354            self.forward_blob(blob_name_ref, &chunk)?;
355            data_to_write = &data_to_write[chunk_size..];
356            bytes_written += chunk_size;
357        }
358        Some(bytes_written)
359    }
360
361    // Given a blob name, wrap the data in an fxt perfetto blob and write it to the trace buffer.
362    fn forward_blob(&self, blob_name_ref: trace_string_ref_t, blob_data: &[u8]) -> Option<usize> {
363        let mut header = BlobHeader::empty();
364        header.set_name_ref(blob_name_ref.encoded_value);
365        header.set_payload_len(blob_data.len() as u16);
366        header.set_blob_format_type(BlobType::Perfetto.into());
367
368        let record_bytes = fxt::fxt_builder::FxtBuilder::new(header).atom(blob_data).build();
369        assert!(record_bytes.len() % std::mem::size_of::<u64>() == 0);
370        let num_words = record_bytes.len() / std::mem::size_of::<u64>();
371        let record_data = record_bytes.as_ptr();
372        #[allow(
373            clippy::undocumented_unsafe_blocks,
374            reason = "Force documented unsafe blocks in Starnix"
375        )]
376        let record_words =
377            unsafe { std::slice::from_raw_parts(record_data.cast::<u64>(), num_words) };
378
379        while let Some(context) = fuchsia_trace::Context::acquire() {
380            if let Some(bytes) = context.copy_record(record_words) {
381                return Some(bytes);
382            }
383            if context.buffering_mode() != BufferingMode::Streaming {
384                // If we're not in streaming mode, there will never be room for this record. Drop
385                // it.
386                return None;
387            }
388            // We're writing records pretty quick here, we're just forwarding data from
389            // perfetto with no breaks. trace_manager might not be able to keep up if it's also
390            // servicing other trace-providers. We want to back off we if find that we run out
391            // of space.
392            //
393            // We drop the context to decrement the refcount on the trace session. This allows
394            // trace-engine to switch the buffers if needed and drain out the buffers so that
395            // when we wake, there will hopefully be room.
396            //
397            // TODO(b/304532640)
398            drop(context);
399            std::thread::sleep(std::time::Duration::from_millis(100));
400        }
401        None
402    }
403
404    fn rewrite_pids(&mut self, protobuf_blob: &Vec<u8>) -> anyhow::Result<Vec<u8>> {
405        let mut proto = decode_trace(protobuf_blob.as_slice())?;
406        for p in &mut proto.packet {
407            if let Some(ref mut data) = p.data {
408                match data {
409                    trace_packet::Data::FrameTimelineEvent(frame_timeline_event) => {
410                        if let Some(evt) = &mut frame_timeline_event.event {
411                            // Update the linux pid to the Fuchsia pid. Each event has its own
412                            // match arm since the variant data is of a different type for each event.
413                            match evt {
414                                Event::ExpectedDisplayFrameStart(ExpectedDisplayFrameStart {
415                                    pid,
416                                    ..
417                                })
418                                | Event::ActualDisplayFrameStart(ActualDisplayFrameStart {
419                                    pid,
420                                    ..
421                                })
422                                | Event::ExpectedSurfaceFrameStart(ExpectedSurfaceFrameStart {
423                                    pid,
424                                    ..
425                                })
426                                | Event::ActualSurfaceFrameStart(ActualSurfaceFrameStart {
427                                    pid,
428                                    ..
429                                }) => {
430                                    pid.as_mut().map(|pid| {
431                                        *pid = self.map_to_koid_val(*pid);
432                                    });
433                                }
434                                Event::FrameEnd(_frame_end) => {}
435                            }
436                        }
437                    }
438                    trace_packet::Data::FtraceEvents(ftrace_bundle) => {
439                        for evt in &mut ftrace_bundle.event {
440                            if let Some(ref mut pid) = evt.pid {
441                                *pid = self.map_thread_to_koid_val(*pid as i32) as u32;
442                            }
443                            if let Some(ref mut event_data) = evt.event {
444                                match event_data {
445                                    Print(print) => {
446                                        if let Some(ref mut data) = print.buf {
447                                            *data = self.map_print_event(data)
448                                        }
449                                    }
450                                    _ => (),
451                                }
452                            }
453                        }
454                    }
455                    // No need to process other data; we only fixup data that references the pid.
456                    _ => (),
457                }
458            }
459        }
460        Ok(encode_trace(&proto))
461    }
462
463    fn map_print_event(&mut self, data: &String) -> String {
464        if let Some(mut event) = atrace::ATraceEvent::parse(&data) {
465            match event {
466                atrace::ATraceEvent::Begin { ref mut pid, .. }
467                | atrace::ATraceEvent::End { ref mut pid }
468                | atrace::ATraceEvent::Instant { ref mut pid, .. }
469                | atrace::ATraceEvent::AsyncBegin { ref mut pid, .. }
470                | atrace::ATraceEvent::AsyncEnd { ref mut pid, .. }
471                | atrace::ATraceEvent::Counter { ref mut pid, .. }
472                | atrace::ATraceEvent::AsyncTrackBegin { ref mut pid, .. }
473                | atrace::ATraceEvent::AsyncTrackEnd { ref mut pid, .. }
474                | atrace::ATraceEvent::Track { ref mut pid, .. } => {
475                    *pid = self.map_to_koid_val(*pid as i32) as u64
476                }
477            }
478            event.data()
479        } else {
480            data.to_string()
481        }
482    }
483
484    fn map_thread_to_koid_val(&mut self, pid: i32) -> i32 {
485        if pid == 0 {
486            return 0;
487        }
488        // Truncate the koid down to 32 bits in order to match the perfetto data schema. This is
489        // usually not an issue except for artificial koids which have the 2^63 bit set, such as
490        // virtual threads. This is consistent with the perfetto data importer code:
491        // https://github.com/google/perfetto/blob/c343c8a77c6e665c679e5c1ec845ac6dde0fc685/src/trace_processor/importers/fuchsia/fuchsia_trace_tokenizer.cc#L490
492        self.event_manager.map_tid_to_koid(pid).raw_koid() as i32
493    }
494
495    fn map_to_koid_val(&mut self, pid: i32) -> i32 {
496        // Truncate the koid down to 32 bits in order to match the perfetto data schema. This is
497        // usually not an issue except for artificial koids which have the 2^63 bit set, such as
498        // virtual threads. This is consistent with the perfetto data importer code:
499        // https://github.com/google/perfetto/blob/c343c8a77c6e665c679e5c1ec845ac6dde0fc685/src/trace_processor/importers/fuchsia/fuchsia_trace_tokenizer.cc#L490
500        if pid == 0 {
501            return 0;
502        }
503        self.event_manager.map_pid_to_koid(pid).raw_koid() as i32
504    }
505}
506
/// Starts the Perfetto consumer machinery for `kernel`.
///
/// Two pieces of work are spawned:
/// * an observer future that forwards every Fuchsia trace state change into a channel, and
/// * a worker, spawned with the debug name "perfetto-consumer", that owns the `CallbackState`
///   for the consumer socket at `socket_path` and handles each state change in order.
///
/// If tracing is already started when the worker begins (e.g. tracing started on boot), the
/// worker first tries to start Perfetto tracing, retrying every 5 seconds while the Perfetto
/// socket is not available yet.
///
/// Always returns `Ok(())`; failures inside the spawned work are logged.
pub fn start_perfetto_consumer_thread(kernel: &Kernel, socket_path: FsString) -> Result<(), Errno> {
    let (mut tx, mut rx) = futures::channel::mpsc::channel::<TraceState>(32);

    // Listens for trace state changes and sends them to Perfetto consumer thread.
    // Unlike the perfetto thread, it won't block, so we can spawn it on the main async executor.
    kernel.kthreads.spawn_future(
        move || async move {
            let observer = TraceObserver::new();
            while let Ok(state) = observer.on_state_changed().await {
                if let Err(e) = tx.send(state).await {
                    log_error!("perfetto-trace-observer failed to send trace state change: {:?}. Receiver dropped.", e);
                    return;
                }
            }
        },
        "perfetto-trace-observer",
    );

    // Perfetto consumer task: reads state changes from the channel and handles them.
    // This task can block, so we spawn it on a dedicated thread to not block the observer or the
    // main async executor.
    let worker_closure = async move |locked_and_task: LockedAndTask<'_>| {
        let mut callback_state = CallbackState {
            prev_state: TraceState::Stopped,
            socket_path,
            connection: None,
            prolonged_context: None,
            packet_data: Vec::new(),
            event_manager: TracePerformanceEventManager::new(),
        };

        // Runs `on_state_change` for `state` with the credentials of an internal operation.
        fn handle_state_change(
            callback_state: &mut CallbackState,
            locked_and_task: &LockedAndTask<'_>,
            state: TraceState,
        ) -> Result<(), anyhow::Error> {
            let current_task = locked_and_task.current_task();
            let locked = &mut locked_and_task.unlocked();
            // TODO: https://fxbug.dev/457381697 - Revise how this kernel-internal work is security-
            // checked.
            let creds = security::creds_start_internal_operation(current_task);
            current_task.override_creds(creds, || {
                callback_state.on_state_change(locked, state, current_task)
            })
        }

        // Check for tracing already started before we began observing.
        // This happens when tracing is started on boot.
        //
        // The state is sampled once and that same snapshot is both tested and handled, so the
        // decision to start and the state passed to the handler cannot disagree.
        let mut state = trace_state();
        if state == TraceState::Started {
            const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(5);
            // When we do boot tracing, it is possible (even likely), that starnix has started but
            // perfetto may not be ready to be connected to yet.
            // In that case poll until it has started.
            loop {
                let Err(e) = handle_state_change(&mut callback_state, &locked_and_task, state)
                else {
                    // Success, exit loop.
                    break;
                };
                let socket_not_ready = e.downcast_ref::<Errno>().is_some_and(|errno| {
                    errno == &starnix_uapi::errors::ENOENT
                        || errno == &starnix_uapi::errors::ECONNREFUSED
                });
                if !socket_not_ready {
                    // For any other error, log and exit loop.
                    log_error!("perfetto_consumer initial start tracing failed with error: {e:?}");
                    break;
                }
                log_warn!(
                    "perfetto_consumer initial start tracing failed because perfetto socket connection not established: {e:?} retrying in 5 seconds..."
                );
                std::thread::sleep(RETRY_DELAY);
                // Undo the partial start so the next attempt begins from a clean slate, then
                // re-sample the trace state in case it changed while we slept.
                callback_state.prev_state = TraceState::Stopped;
                callback_state.connection = None;
                callback_state.prolonged_context = None;
                state = trace_state();
            }
        }

        while let Some(state) = rx.next().await {
            if let Err(e) = handle_state_change(&mut callback_state, &locked_and_task, state) {
                log_error!("perfetto_consumer state change callback error: {:?}", e);
            }
        }
    };
    let worker_req = SpawnRequestBuilder::new()
        .with_debug_name("perfetto-consumer")
        .with_async_closure(worker_closure)
        .build();
    kernel.kthreads.spawner().spawn_from_request(worker_req);

    Ok(())
}