Skip to main content

starnix_modules_perfetto_consumer/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![recursion_limit = "256"]
6
7use anyhow::bail;
8use fuchsia_trace::{
9    BufferingMode, ProlongedContext, TraceState, category_enabled, trace_state, trace_string_ref_t,
10};
11use fuchsia_trace_observer::TraceObserver;
12use fxt::blob::{BlobHeader, BlobType};
13use perfetto_protos::perfetto::protos::trace_config::buffer_config::FillPolicy;
14use perfetto_protos::perfetto::protos::trace_config::{BufferConfig, DataSource};
15use perfetto_protos::perfetto::protos::{
16    DataSourceConfig, DisableTracingRequest, EnableTracingRequest, FreeBuffersRequest,
17    FtraceConfig, ReadBuffersRequest, TraceConfig, ipc_frame,
18};
19use perfetto_trace_protos::perfetto::protos::frame_timeline_event::{
20    ActualDisplayFrameStart, ActualSurfaceFrameStart, Event, ExpectedDisplayFrameStart,
21    ExpectedSurfaceFrameStart,
22};
23use perfetto_trace_protos::perfetto::protos::ftrace_event::Event::Print;
24use perfetto_trace_protos::perfetto::protos::trace_packet;
25use starnix_core::security;
26use starnix_core::task::dynamic_thread_spawner::SpawnRequestBuilder;
27use starnix_core::task::tracing::TracePerformanceEventManager;
28use starnix_core::task::{CurrentTask, Kernel, LockedAndTask};
29use starnix_core::vfs::FsString;
30use starnix_logging::{
31    CATEGORY_ATRACE, NAME_PERFETTO_BLOB, log_debug, log_error, log_info, log_warn,
32};
33use starnix_perfetto_trace_decoder::{decode_read_buffers_response, decode_trace, encode_trace};
34use starnix_sync::{Locked, Unlocked};
35use starnix_uapi::errors::Errno;
36
37mod atrace;
38
/// Size requested for the Perfetto trace buffer, passed as `BufferConfig::size_kb` when tracing
/// is enabled. 63488 = 62 * 1024.
const PERFETTO_BUFFER_SIZE_KB: u32 = 63488;
40
/// State needed to act upon trace state changes.
///
/// A single instance is created by `start_perfetto_consumer_thread` and mutated from
/// `on_state_change` each time a new Fuchsia trace state is observed.
struct CallbackState {
    /// The previously observed trace state.
    prev_state: TraceState,
    /// Path to the Perfetto consumer socket.
    socket_path: FsString,
    /// Connection to the consumer socket, if it has been initialized. This gets initialized the
    /// first time it is needed.
    connection: Option<perfetto::Consumer>,
    /// Prolonged trace context to prevent the Fuchsia trace session from terminating while reading
    /// data from Perfetto.
    prolonged_context: Option<ProlongedContext>,
    /// Partial trace packet returned from Perfetto but not yet written to Fuchsia.
    packet_data: Vec<u8>,

    /// Translates Linux pids/tids found in Perfetto packets into koids. It is started once
    /// tracing has been enabled and stopped and cleared by `handle_stopped`.
    event_manager: TracePerformanceEventManager,
}
58
59impl CallbackState {
60    fn connection(
61        &mut self,
62        locked: &mut Locked<Unlocked>,
63        current_task: &CurrentTask,
64    ) -> Result<&mut perfetto::Consumer, anyhow::Error> {
65        match self.connection {
66            None => {
67                self.connection =
68                    Some(perfetto::Consumer::new(locked, current_task, self.socket_path.as_ref())?);
69                Ok(self.connection.as_mut().unwrap())
70            }
71            Some(ref mut conn) => Ok(conn),
72        }
73    }
74
75    fn handle_stopped(&mut self) {
76        self.prolonged_context = None;
77        self.packet_data.clear();
78        self.event_manager.stop();
79        self.event_manager.clear();
80    }
81
82    fn on_state_change(
83        &mut self,
84        locked: &mut Locked<Unlocked>,
85        new_state: TraceState,
86        current_task: &CurrentTask,
87    ) -> Result<(), anyhow::Error> {
88        let prev_state = self.prev_state;
89        self.prev_state = new_state;
90        log_debug!(
91            "Perfetto consumer state change. new_state: {new_state:?}, prev_state: {prev_state:?}"
92        );
93        match new_state {
94            TraceState::Started => {
95                if prev_state != TraceState::Stopped {
96                    // This means something unexpected has caused the trace_engine to change
97                    // states faster than we're processing the trace observer events.
98                    log_error!(
99                        "Started received in {prev_state:?} state! Cleaning up then starting."
100                    );
101                    self.handle_stopped();
102                }
103                self.prolonged_context = ProlongedContext::acquire();
104                let connection = self.connection(locked, current_task)?;
105                // A fixed set of data sources that may be of interest. As demand for other sources
106                // is found, add them here, and it may become worthwhile to allow this set to be
107                // configurable per trace session.
108                let mut data_sources = vec![
109                    DataSource {
110                        config: Some(DataSourceConfig {
111                            name: Some("track_event".to_string()),
112                            ..Default::default()
113                        }),
114                        ..Default::default()
115                    },
116                    DataSource {
117                        config: Some(DataSourceConfig {
118                            name: Some("android.surfaceflinger.frame".to_string()),
119                            target_buffer: Some(0),
120                            ..Default::default()
121                        }),
122                        ..Default::default()
123                    },
124                    DataSource {
125                        config: Some(DataSourceConfig {
126                            name: Some("android.surfaceflinger.frametimeline".to_string()),
127                            target_buffer: Some(0),
128                            ..Default::default()
129                        }),
130                        ..Default::default()
131                    },
132                ];
133                if category_enabled(CATEGORY_ATRACE) {
134                    data_sources.push(DataSource {
135                        config: Some(DataSourceConfig {
136                            name: Some("linux.ftrace".to_string()),
137                            ftrace_config: Some(FtraceConfig {
138                                ftrace_events: vec!["ftrace/print".to_string()],
139                                // Enable all supported atrace categories. This could be improved
140                                // in the future to be a subset that is configurable by each trace
141                                // session.
142                                atrace_categories: vec![
143                                    "am".to_string(),
144                                    "adb".to_string(),
145                                    "aidl".to_string(),
146                                    "dalvik".to_string(),
147                                    "audio".to_string(),
148                                    "binder_lock".to_string(),
149                                    "binder_driver".to_string(),
150                                    "bionic".to_string(),
151                                    "camera".to_string(),
152                                    "database".to_string(),
153                                    "gfx".to_string(),
154                                    "hal".to_string(),
155                                    "input".to_string(),
156                                    "network".to_string(),
157                                    "nnapi".to_string(),
158                                    "pm".to_string(),
159                                    "power".to_string(),
160                                    "rs".to_string(),
161                                    "res".to_string(),
162                                    "rro".to_string(),
163                                    "sched".to_string(),
164                                    "sm".to_string(),
165                                    "ss".to_string(),
166                                    "vibrator".to_string(),
167                                    "video".to_string(),
168                                    "view".to_string(),
169                                    "webview".to_string(),
170                                    "wm".to_string(),
171                                ],
172                                atrace_apps: vec!["*".to_string()],
173                                ..Default::default()
174                            }),
175                            ..Default::default()
176                        }),
177                        ..Default::default()
178                    });
179                }
180                connection.enable_tracing(
181                    locked,
182                    current_task,
183                    EnableTracingRequest {
184                        trace_config: Some(TraceConfig {
185                            buffers: vec![BufferConfig {
186                                size_kb: Some(PERFETTO_BUFFER_SIZE_KB),
187                                fill_policy: Some(FillPolicy::Discard.into()),
188                                ..Default::default()
189                            }],
190                            data_sources,
191                            ..Default::default()
192                        }),
193                        attach_notification_only: None,
194                    },
195                )?;
196                // Once tracing has started, notify the event manager so it can start tracking processes.
197                self.event_manager.start(current_task.kernel());
198            }
199            TraceState::Stopping | TraceState::Stopped => {
200                if prev_state == TraceState::Started {
201                    // We want to hold the prolonged context to ensure the trace session doesn't
202                    // exit out from under us, but we also want to ensure we drop the prolonged
203                    // context if we bail for whatever reason below.
204                    let _local_prolonged_context =
205                        std::mem::replace(&mut self.prolonged_context, None);
206                    let start_time = std::time::Instant::now();
207
208                    let connection = self.connection(locked, current_task)?;
209                    let disable_request = connection.disable_tracing(
210                        locked,
211                        current_task,
212                        DisableTracingRequest {},
213                    )?;
214                    loop {
215                        let frame = connection.next_frame_blocking(locked, current_task)?;
216                        if frame.request_id == Some(disable_request) {
217                            break;
218                        } else {
219                            log_error!(
220                                "Ignoring frame while looking for DisableTracingRequest: {frame:?}"
221                            );
222                        }
223                    }
224
225                    let read_buffers_request =
226                        connection.read_buffers(locked, current_task, ReadBuffersRequest {})?;
227
228                    let blob_name_ref = {
229                        let Some(context) = fuchsia_trace::Context::acquire() else {
230                            bail!("Tracing stopped despite holding prolonged context");
231                        };
232                        context.register_string_literal(NAME_PERFETTO_BLOB)
233                    };
234
235                    // IPC responses may be spread across multiple frames, so loop until we get a
236                    // message that indicates it is the last one. Additionally, if there are
237                    // unrelated messages on the socket (e.g. leftover from a previous trace
238                    // session), the loop will read past and ignore them.
239                    loop {
240                        let frame = self
241                            .connection(locked, current_task)?
242                            .next_frame_blocking(locked, current_task)?;
243                        if frame.request_id != Some(read_buffers_request) {
244                            continue;
245                        } else {
246                            log_debug!(
247                                "perfetto_consumer ignoring frame while looking for ReadBuffersRequest {read_buffers_request}: {frame:?}"
248                            );
249                        }
250                        if let Some(ipc_frame::Msg::MsgInvokeMethodReply(reply)) = &frame.msg {
251                            if let Ok(response) = decode_read_buffers_response(
252                                reply.reply_proto.as_deref().unwrap_or(&[]),
253                            ) {
254                                for slice in &response.slices {
255                                    if let Some(data) = &slice.data {
256                                        self.packet_data.extend(data);
257                                    }
258                                    if slice.last_slice_for_packet.unwrap_or(false) {
259                                        let mut blob_data = Vec::new();
260                                        // Packet field number = 1, length delimited type = 2.
261                                        blob_data.push(1 << 3 | 2);
262                                        // Push a varint encoded length.
263                                        // See https://protobuf.dev/programming-guides/encoding/
264                                        const HIGH_BIT: u8 = 0x80;
265                                        const LOW_SEVEN_BITS: usize = 0x7F;
266                                        let mut value = self.packet_data.len();
267                                        while value >= HIGH_BIT as usize {
268                                            blob_data
269                                                .push((value & LOW_SEVEN_BITS) as u8 | HIGH_BIT);
270                                            value >>= 7;
271                                        }
272                                        blob_data.push(value as u8);
273                                        // `append` moves all data out of the passed Vec, so
274                                        // s.packet_data will be empty after this call.
275                                        blob_data.append(&mut self.packet_data);
276
277                                        // At this point blob_data is a full Perfetto Trace protobuf.
278                                        // Parse the data and replace the linux pids with their
279                                        // corresponding koid.
280                                        let rewritten =
281                                            self.rewrite_pids(&blob_data).unwrap_or(blob_data);
282
283                                        // Ignore a failure to write the packet here. We don't
284                                        // return immediately because we want to allow the
285                                        // remaining records to be recorded as dropped.
286                                        //
287                                        // Once we fill a buffer in oneshot mode, we expect to drop
288                                        // the remaining packets here.
289                                        //
290                                        // Rather than logging here, allow the trace system to
291                                        // aggregate the number of records dropped and we can query
292                                        // the trace system later to determine if we dropped
293                                        // records when it's more efficient to do so.
294                                        let _ = self.forward_packet(blob_name_ref, rewritten);
295                                    }
296                                }
297                            } else {
298                                log_error!(
299                                    "perfetto_consumer cannot decode protobuf from {reply:?}"
300                                );
301                            }
302                            if reply.has_more != Some(true) {
303                                break;
304                            }
305                        } else {
306                            log_error!(
307                                "perfetto_consumer ignoring non-MsgInvokeMethodReply message: {frame:?}"
308                            );
309                        }
310                    }
311                    // The response to a free buffers request does not have anything meaningful,
312                    // so we don't need to worry about tracking the request id to match to the
313                    // response.
314                    let _free_buffers_request_id =
315                        self.connection(locked, current_task)?.free_buffers(
316                            locked,
317                            current_task,
318                            FreeBuffersRequest { buffer_ids: vec![0] },
319                        )?;
320                    let elapsed = start_time.elapsed().as_millis();
321                    log_info!(
322                        "Perfetto frames copied, dropping prolonged trace context. Processing took {elapsed} ms"
323                    );
324                } else {
325                    // If we receive a stop request and we don't think we're actually tracing, our
326                    // local state likely desynced from the global trace state. Clean up our state
327                    // and ensure we're stopped so we re-synchronize.
328                    self.handle_stopped();
329                }
330            }
331        }
332        Ok(())
333    }
334
335    // Forward `data` to the trace buffer by wrapping it in fxt blob records with the name
336    // `blob_name_ref`..
337    fn forward_packet(&self, blob_name_ref: trace_string_ref_t, data: Vec<u8>) -> Option<usize> {
338        // The blob data may be larger than what we can fit in a single record. If so, split it up
339        // over multiple chunks.
340        let mut bytes_written = 0;
341        let mut data_to_write = &data[..];
342
343        // We want to break the data into chunks:
344        // - Bigger chunks means less per-write overheader
345        // - Bigger chunks means less overhead due to blob meta
346        //
347        // However, too big and the blobs won't fit nicely into the trace buffer.
348        // The trace buffer is minimum 1MiB in size, so writing 4k at a time seems like a
349        // reasonable place to start that is both reasonably large and not going to leave a ton of
350        // space at the end of the trace buffer.
351        let max_chunk_size = 4096;
352        while !data_to_write.is_empty() {
353            let chunk_size = data_to_write.len().min(max_chunk_size);
354            let chunk = &data_to_write[..chunk_size];
355            self.forward_blob(blob_name_ref, &chunk)?;
356            data_to_write = &data_to_write[chunk_size..];
357            bytes_written += chunk_size;
358        }
359        Some(bytes_written)
360    }
361
362    // Given a blob name, wrap the data in an fxt perfetto blob and write it to the trace buffer.
363    fn forward_blob(&self, blob_name_ref: trace_string_ref_t, blob_data: &[u8]) -> Option<usize> {
364        let mut header = BlobHeader::empty();
365        header.set_name_ref(blob_name_ref.encoded_value);
366        header.set_payload_len(blob_data.len() as u16);
367        header.set_blob_format_type(BlobType::Perfetto.into());
368
369        let record_bytes = fxt::fxt_builder::FxtBuilder::new(header).atom(blob_data).build();
370        assert!(record_bytes.len() % std::mem::size_of::<u64>() == 0);
371        let num_words = record_bytes.len() / std::mem::size_of::<u64>();
372        let record_data = record_bytes.as_ptr();
373        #[allow(
374            clippy::undocumented_unsafe_blocks,
375            reason = "Force documented unsafe blocks in Starnix"
376        )]
377        let record_words =
378            unsafe { std::slice::from_raw_parts(record_data.cast::<u64>(), num_words) };
379
380        while let Some(context) = fuchsia_trace::Context::acquire() {
381            if let Some(bytes) = context.copy_record(record_words) {
382                return Some(bytes);
383            }
384            if context.buffering_mode() != BufferingMode::Streaming {
385                // If we're not in streaming mode, there will never be room for this record. Drop
386                // it.
387                return None;
388            }
389            // We're writing records pretty quick here, we're just forwarding data from
390            // perfetto with no breaks. trace_manager might not be able to keep up if it's also
391            // servicing other trace-providers. We want to back off we if find that we run out
392            // of space.
393            //
394            // We drop the context to decrement the refcount on the trace session. This allows
395            // trace-engine to switch the buffers if needed and drain out the buffers so that
396            // when we wake, there will hopefully be room.
397            //
398            // TODO(b/304532640)
399            drop(context);
400            std::thread::sleep(std::time::Duration::from_millis(100));
401        }
402        None
403    }
404
405    fn rewrite_pids(&mut self, protobuf_blob: &Vec<u8>) -> anyhow::Result<Vec<u8>> {
406        let mut proto = decode_trace(protobuf_blob.as_slice())?;
407        for p in &mut proto.packet {
408            if let Some(ref mut data) = p.data {
409                match data {
410                    trace_packet::Data::FrameTimelineEvent(frame_timeline_event) => {
411                        if let Some(evt) = &mut frame_timeline_event.event {
412                            // Update the linux pid to the Fuchsia pid. Each event has its own
413                            // match arm since the variant data is of a different type for each event.
414                            match evt {
415                                Event::ExpectedDisplayFrameStart(ExpectedDisplayFrameStart {
416                                    pid,
417                                    ..
418                                })
419                                | Event::ActualDisplayFrameStart(ActualDisplayFrameStart {
420                                    pid,
421                                    ..
422                                })
423                                | Event::ExpectedSurfaceFrameStart(ExpectedSurfaceFrameStart {
424                                    pid,
425                                    ..
426                                })
427                                | Event::ActualSurfaceFrameStart(ActualSurfaceFrameStart {
428                                    pid,
429                                    ..
430                                }) => {
431                                    pid.as_mut().map(|pid| {
432                                        *pid = self.map_to_koid_val(*pid);
433                                    });
434                                }
435                                Event::FrameEnd(_frame_end) => {}
436                            }
437                        }
438                    }
439                    trace_packet::Data::FtraceEvents(ftrace_bundle) => {
440                        for evt in &mut ftrace_bundle.event {
441                            if let Some(ref mut pid) = evt.pid {
442                                *pid = self.map_thread_to_koid_val(*pid as i32) as u32;
443                            }
444                            if let Some(ref mut event_data) = evt.event {
445                                match event_data {
446                                    Print(print) => {
447                                        if let Some(ref mut data) = print.buf {
448                                            *data = self.map_print_event(data)
449                                        }
450                                    }
451                                    _ => (),
452                                }
453                            }
454                        }
455                    }
456                    // No need to process other data; we only fixup data that references the pid.
457                    _ => (),
458                }
459            }
460        }
461        Ok(encode_trace(&proto))
462    }
463
464    fn map_print_event(&mut self, data: &String) -> String {
465        if let Some(mut event) = atrace::ATraceEvent::parse(&data) {
466            match event {
467                atrace::ATraceEvent::Begin { ref mut pid, .. }
468                | atrace::ATraceEvent::End { ref mut pid }
469                | atrace::ATraceEvent::Instant { ref mut pid, .. }
470                | atrace::ATraceEvent::AsyncBegin { ref mut pid, .. }
471                | atrace::ATraceEvent::AsyncEnd { ref mut pid, .. }
472                | atrace::ATraceEvent::Counter { ref mut pid, .. }
473                | atrace::ATraceEvent::AsyncTrackBegin { ref mut pid, .. }
474                | atrace::ATraceEvent::AsyncTrackEnd { ref mut pid, .. }
475                | atrace::ATraceEvent::Track { ref mut pid, .. } => {
476                    *pid = self.map_to_koid_val(*pid as i32) as u64
477                }
478            }
479            event.data()
480        } else {
481            data.to_string()
482        }
483    }
484
485    fn map_thread_to_koid_val(&mut self, pid: i32) -> i32 {
486        if pid == 0 {
487            return 0;
488        }
489        // Truncate the koid down to 32 bits in order to match the perfetto data schema. This is
490        // usually not an issue except for artificial koids which have the 2^63 bit set, such as
491        // virtual threads. This is consistent with the perfetto data importer code:
492        // https://github.com/google/perfetto/blob/c343c8a77c6e665c679e5c1ec845ac6dde0fc685/src/trace_processor/importers/fuchsia/fuchsia_trace_tokenizer.cc#L490
493        self.event_manager.map_tid_to_koid(pid).raw_koid() as i32
494    }
495
496    fn map_to_koid_val(&mut self, pid: i32) -> i32 {
497        // Truncate the koid down to 32 bits in order to match the perfetto data schema. This is
498        // usually not an issue except for artificial koids which have the 2^63 bit set, such as
499        // virtual threads. This is consistent with the perfetto data importer code:
500        // https://github.com/google/perfetto/blob/c343c8a77c6e665c679e5c1ec845ac6dde0fc685/src/trace_processor/importers/fuchsia/fuchsia_trace_tokenizer.cc#L490
501        if pid == 0 {
502            return 0;
503        }
504        self.event_manager.map_pid_to_koid(pid).raw_koid() as i32
505    }
506}
507
/// Spawns the `perfetto-consumer` kernel thread, which mirrors Fuchsia trace state changes onto
/// the Perfetto consumer socket at `socket_path`.
///
/// The thread first handles the case where tracing was already started before it began
/// observing (e.g. tracing started on boot), retrying every 5 seconds while the Perfetto socket
/// is not yet available. It then processes each trace state change reported by the
/// `TraceObserver` until the observer returns an error.
///
/// This function only builds and submits the spawn request and always returns `Ok(())`.
pub fn start_perfetto_consumer_thread(kernel: &Kernel, socket_path: FsString) -> Result<(), Errno> {
    // We unfortunately need to spawn a dedicated thread to run our async task.
    //
    // While the TraceObserver waits asynchronously, the interactions we do with Perfetto over the
    // vfs::socket are blocking.
    //
    // It blocks in two scenarios:
    // 1) When we forward a control plane request over the socket and block for a response. This is
    //    for a few ms. See `perfetto::Consumer::enable_tracing`.
    // 2) When a trace ends, we repeatedly do blocking reads on the socket until we read and
    //    forward all the trace data. This servicing of trace data would hold the executor for
    //    several seconds. See `perfetto::Consumer::next_frame_blocking`.
    let closure = async move |locked_and_task: LockedAndTask<'_>| {
        let observer = TraceObserver::new();
        let mut callback_state = CallbackState {
            prev_state: TraceState::Stopped,
            socket_path,
            connection: None,
            prolonged_context: None,
            packet_data: Vec::new(),
            event_manager: TracePerformanceEventManager::new(),
        };

        // Runs `on_state_change` for `state` with the credentials of an internal operation.
        fn handle_state_change(
            callback_state: &mut CallbackState,
            locked_and_task: &LockedAndTask<'_>,
            state: TraceState,
        ) -> Result<(), anyhow::Error> {
            let current_task = locked_and_task.current_task();
            let locked = &mut locked_and_task.unlocked();
            // TODO: https://fxbug.dev/457381697 - Revise how this kernel-internal work is security-
            // checked.
            let creds = security::creds_start_internal_operation(current_task);
            current_task.override_creds(creds, || {
                callback_state.on_state_change(locked, state, current_task)
            })
        }

        // Check for tracing already started before we began observing.
        // This happens when tracing is started on boot.
        //
        // The state is sampled once and that same sample is used for both the check and the
        // first call below, so the two cannot disagree if the trace state changes in between.
        let mut state = trace_state();
        if state == TraceState::Started {
            const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(5);
            // When we do boot tracing, it is possible (even likely), that starnix has started but
            // perfetto may not be ready to be connected to yet.
            // In that case poll until it has started.
            loop {
                let Err(e) = handle_state_change(&mut callback_state, &locked_and_task, state)
                else {
                    // Success, exit loop.
                    break;
                };
                let socket_not_ready = e.downcast_ref::<Errno>().map_or(false, |errno| {
                    errno == &starnix_uapi::errors::ENOENT
                        || errno == &starnix_uapi::errors::ECONNREFUSED
                });
                if !socket_not_ready {
                    // For any other error, log and exit loop.
                    log_error!("perfetto_consumer initial start tracing failed with error: {e:?}");
                    break;
                }
                log_warn!(
                    "perfetto_consumer initial start tracing failed because perfetto socket connection not established: {e:?} retrying in 5 seconds..."
                );
                std::thread::sleep(RETRY_DELAY);
                // Undo the partial start so the retry begins from a clean `Stopped` state, and
                // re-sample the trace state in case it changed while we were sleeping.
                callback_state.prev_state = TraceState::Stopped;
                callback_state.connection = None;
                callback_state.prolonged_context = None;
                state = trace_state();
            }
        }

        while let Ok(state) = observer.on_state_changed().await {
            if let Err(e) = handle_state_change(&mut callback_state, &locked_and_task, state) {
                log_error!("perfetto_consumer state change callback error: {:?}", e);
            }
        }
    };
    let req = SpawnRequestBuilder::new()
        .with_debug_name("perfetto-consumer")
        .with_async_closure(closure)
        .build();
    kernel.kthreads.spawner().spawn_from_request(req);

    Ok(())
}