starnix_modules_perfetto_consumer/lib.rs
1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![recursion_limit = "256"]
6
7use anyhow::bail;
8use fuchsia_trace::{
9 BufferingMode, ProlongedContext, TraceState, category_enabled, trace_state, trace_string_ref_t,
10};
11use fuchsia_trace_observer::TraceObserver;
12use fxt::blob::{BlobHeader, BlobType};
13use perfetto_protos::perfetto::protos::trace_config::buffer_config::FillPolicy;
14use perfetto_protos::perfetto::protos::trace_config::{BufferConfig, DataSource};
15use perfetto_protos::perfetto::protos::{
16 DataSourceConfig, DisableTracingRequest, EnableTracingRequest, FreeBuffersRequest,
17 FtraceConfig, ReadBuffersRequest, TraceConfig, ipc_frame,
18};
19use perfetto_trace_protos::perfetto::protos::frame_timeline_event::{
20 ActualDisplayFrameStart, ActualSurfaceFrameStart, Event, ExpectedDisplayFrameStart,
21 ExpectedSurfaceFrameStart,
22};
23use perfetto_trace_protos::perfetto::protos::ftrace_event::Event::Print;
24use perfetto_trace_protos::perfetto::protos::trace_packet;
25use starnix_core::security;
26use starnix_core::task::dynamic_thread_spawner::SpawnRequestBuilder;
27use starnix_core::task::tracing::TracePerformanceEventManager;
28use starnix_core::task::{CurrentTask, Kernel, LockedAndTask};
29use starnix_core::vfs::FsString;
30use starnix_logging::{
31 CATEGORY_ATRACE, NAME_PERFETTO_BLOB, log_debug, log_error, log_warn,
32};
33use starnix_perfetto_trace_decoder::{decode_read_buffers_response, decode_trace, encode_trace};
34use starnix_sync::{Locked, Unlocked};
35use starnix_uapi::errors::Errno;
36
37mod atrace;
38
/// Size, in KiB, of the single Perfetto trace buffer requested for every session (62 MiB).
const PERFETTO_BUFFER_SIZE_KB: u32 = 62 * 1024;
40
41/// State needed to act upon trace state changes.
42struct CallbackState {
43 /// The previously observed trace state.
44 prev_state: TraceState,
45 /// Path to the Perfetto consumer socket.
46 socket_path: FsString,
47 /// Connection to the consumer socket, if it has been initialized. This gets initialized the
48 /// first time it is needed.
49 connection: Option<perfetto::Consumer>,
50 /// Prolonged trace context to prevent the Fuchsia trace session from terminating while reading
51 /// data from Perfetto.
52 prolonged_context: Option<ProlongedContext>,
53 /// Partial trace packet returned from Perfetto but not yet written to Fuchsia.
54 packet_data: Vec<u8>,
55
56 event_manager: TracePerformanceEventManager,
57}
58
59impl CallbackState {
60 fn connection(
61 &mut self,
62 locked: &mut Locked<Unlocked>,
63 current_task: &CurrentTask,
64 ) -> Result<&mut perfetto::Consumer, anyhow::Error> {
65 match self.connection {
66 None => {
67 self.connection =
68 Some(perfetto::Consumer::new(locked, current_task, self.socket_path.as_ref())?);
69 Ok(self.connection.as_mut().unwrap())
70 }
71 Some(ref mut conn) => Ok(conn),
72 }
73 }
74
75 fn handle_stopped(&mut self) {
76 self.prolonged_context = None;
77 self.packet_data.clear();
78 self.event_manager.stop();
79 self.event_manager.clear();
80 }
81
82 fn on_state_change(
83 &mut self,
84 locked: &mut Locked<Unlocked>,
85 new_state: TraceState,
86 current_task: &CurrentTask,
87 ) -> Result<(), anyhow::Error> {
88 let prev_state = self.prev_state;
89 self.prev_state = new_state;
90 log_debug!(
91 "Perfetto consumer state change. new_state: {new_state:?}, prev_state: {prev_state:?}"
92 );
93 match new_state {
94 TraceState::Started => {
95 if prev_state != TraceState::Stopped {
96 // This means something unexpected has caused the trace_engine to change
97 // states faster than we're processing the trace observer events.
98 log_error!(
99 "Started received in {prev_state:?} state! Cleaning up then starting."
100 );
101 self.handle_stopped();
102 }
103 self.prolonged_context = ProlongedContext::acquire();
104 let connection = self.connection(locked, current_task)?;
105 // A fixed set of data sources that may be of interest. As demand for other sources
106 // is found, add them here, and it may become worthwhile to allow this set to be
107 // configurable per trace session.
108 let mut data_sources = vec![
109 DataSource {
110 config: Some(DataSourceConfig {
111 name: Some("track_event".to_string()),
112 ..Default::default()
113 }),
114 ..Default::default()
115 },
116 DataSource {
117 config: Some(DataSourceConfig {
118 name: Some("android.surfaceflinger.frame".to_string()),
119 target_buffer: Some(0),
120 ..Default::default()
121 }),
122 ..Default::default()
123 },
124 DataSource {
125 config: Some(DataSourceConfig {
126 name: Some("android.surfaceflinger.frametimeline".to_string()),
127 target_buffer: Some(0),
128 ..Default::default()
129 }),
130 ..Default::default()
131 },
132 ];
133 if category_enabled(CATEGORY_ATRACE) {
134 data_sources.push(DataSource {
135 config: Some(DataSourceConfig {
136 name: Some("linux.ftrace".to_string()),
137 ftrace_config: Some(FtraceConfig {
138 ftrace_events: vec!["ftrace/print".to_string()],
139 // Enable all supported atrace categories. This could be improved
140 // in the future to be a subset that is configurable by each trace
141 // session.
142 atrace_categories: vec![
143 "am".to_string(),
144 "adb".to_string(),
145 "aidl".to_string(),
146 "dalvik".to_string(),
147 "audio".to_string(),
148 "binder_lock".to_string(),
149 "binder_driver".to_string(),
150 "bionic".to_string(),
151 "camera".to_string(),
152 "database".to_string(),
153 "gfx".to_string(),
154 "hal".to_string(),
155 "input".to_string(),
156 "network".to_string(),
157 "nnapi".to_string(),
158 "pm".to_string(),
159 "power".to_string(),
160 "rs".to_string(),
161 "res".to_string(),
162 "rro".to_string(),
163 "sched".to_string(),
164 "sm".to_string(),
165 "ss".to_string(),
166 "vibrator".to_string(),
167 "video".to_string(),
168 "view".to_string(),
169 "webview".to_string(),
170 "wm".to_string(),
171 ],
172 atrace_apps: vec!["*".to_string()],
173 ..Default::default()
174 }),
175 ..Default::default()
176 }),
177 ..Default::default()
178 });
179 }
180 connection.enable_tracing(
181 locked,
182 current_task,
183 EnableTracingRequest {
184 trace_config: Some(TraceConfig {
185 buffers: vec![BufferConfig {
186 size_kb: Some(PERFETTO_BUFFER_SIZE_KB),
187 fill_policy: Some(FillPolicy::Discard.into()),
188 ..Default::default()
189 }],
190 data_sources,
191 ..Default::default()
192 }),
193 attach_notification_only: None,
194 },
195 )?;
196 // Once tracing has started, notify the event manager so it can start tracking processes.
197 self.event_manager.start(current_task.kernel());
198 }
199 TraceState::Stopping | TraceState::Stopped => {
200 if prev_state == TraceState::Started {
201 // We want to hold the prolonged context to ensure the trace session doesn't
202 // exit out from under us, but we also want to ensure we drop the prolonged
203 // context if we bail for whatever reason below.
204 let _local_prolonged_context =
205 std::mem::replace(&mut self.prolonged_context, None);
206
207 let connection = self.connection(locked, current_task)?;
208 let disable_request = connection.disable_tracing(
209 locked,
210 current_task,
211 DisableTracingRequest {},
212 )?;
213 loop {
214 let frame = connection.next_frame_blocking(locked, current_task)?;
215 if frame.request_id == Some(disable_request) {
216 break;
217 } else {
218 log_error!(
219 "Ignoring frame while looking for DisableTracingRequest: {frame:?}"
220 );
221 }
222 }
223
224 let read_buffers_request =
225 connection.read_buffers(locked, current_task, ReadBuffersRequest {})?;
226
227 let blob_name_ref = {
228 let Some(context) = fuchsia_trace::Context::acquire() else {
229 bail!("Tracing stopped despite holding prolonged context");
230 };
231 context.register_string_literal(NAME_PERFETTO_BLOB)
232 };
233
234 // IPC responses may be spread across multiple frames, so loop until we get a
235 // message that indicates it is the last one. Additionally, if there are
236 // unrelated messages on the socket (e.g. leftover from a previous trace
237 // session), the loop will read past and ignore them.
238 loop {
239 let frame = self
240 .connection(locked, current_task)?
241 .next_frame_blocking(locked, current_task)?;
242 if frame.request_id != Some(read_buffers_request) {
243 continue;
244 } else {
245 log_debug!(
246 "perfetto_consumer ignoring frame while looking for ReadBuffersRequest {read_buffers_request}: {frame:?}"
247 );
248 }
249 if let Some(ipc_frame::Msg::MsgInvokeMethodReply(reply)) = &frame.msg {
250 if let Ok(response) = decode_read_buffers_response(
251 reply.reply_proto.as_deref().unwrap_or(&[]),
252 ) {
253 for slice in &response.slices {
254 if let Some(data) = &slice.data {
255 self.packet_data.extend(data);
256 }
257 if slice.last_slice_for_packet.unwrap_or(false) {
258 let mut blob_data = Vec::new();
259 // Packet field number = 1, length delimited type = 2.
260 blob_data.push(1 << 3 | 2);
261 // Push a varint encoded length.
262 // See https://protobuf.dev/programming-guides/encoding/
263 const HIGH_BIT: u8 = 0x80;
264 const LOW_SEVEN_BITS: usize = 0x7F;
265 let mut value = self.packet_data.len();
266 while value >= HIGH_BIT as usize {
267 blob_data
268 .push((value & LOW_SEVEN_BITS) as u8 | HIGH_BIT);
269 value >>= 7;
270 }
271 blob_data.push(value as u8);
272 // `append` moves all data out of the passed Vec, so
273 // s.packet_data will be empty after this call.
274 blob_data.append(&mut self.packet_data);
275
276 // At this point blob_data is a full Perfetto Trace protobuf.
277 // Parse the data and replace the linux pids with their
278 // corresponding koid.
279 let rewritten =
280 self.rewrite_pids(&blob_data).unwrap_or(blob_data);
281
282 // Ignore a failure to write the packet here. We don't
283 // return immediately because we want to allow the
284 // remaining records to be recorded as dropped.
285 //
286 // Once we fill a buffer in oneshot mode, we expect to drop
287 // the remaining packets here.
288 //
289 // Rather than logging here, allow the trace system to
290 // aggregate the number of records dropped and we can query
291 // the trace system later to determine if we dropped
292 // records when it's more efficient to do so.
293 let _ = self.forward_packet(blob_name_ref, rewritten);
294 }
295 }
296 } else {
297 log_error!(
298 "perfetto_consumer cannot decode protobuf from {reply:?}"
299 );
300 }
301 if reply.has_more != Some(true) {
302 break;
303 }
304 } else {
305 log_error!(
306 "perfetto_consumer ignoring non-MsgInvokeMethodReply message: {frame:?}"
307 );
308 }
309 }
310 // The response to a free buffers request does not have anything meaningful,
311 // so we don't need to worry about tracking the request id to match to the
312 // response.
313 let _free_buffers_request_id =
314 self.connection(locked, current_task)?.free_buffers(
315 locked,
316 current_task,
317 FreeBuffersRequest { buffer_ids: vec![0] },
318 )?;
319 } else {
320 // If we receive a stop request and we don't think we're actually tracing, our
321 // local state likely desynced from the global trace state. Clean up our state
322 // and ensure we're stopped so we re-synchronize.
323 self.handle_stopped();
324 }
325 }
326 }
327 Ok(())
328 }
329
330 // Forward `data` to the trace buffer by wrapping it in fxt blob records with the name
331 // `blob_name_ref`..
332 fn forward_packet(&self, blob_name_ref: trace_string_ref_t, data: Vec<u8>) -> Option<usize> {
333 // The blob data may be larger than what we can fit in a single record. If so, split it up
334 // over multiple chunks.
335 let mut bytes_written = 0;
336 let mut data_to_write = &data[..];
337
338 // We want to break the data into chunks:
339 // - Bigger chunks means less per-write overheader
340 // - Bigger chunks means less overhead due to blob meta
341 //
342 // However, too big and the blobs won't fit nicely into the trace buffer.
343 // The trace buffer is minimum 1MiB in size, so writing 4k at a time seems like a
344 // reasonable place to start that is both reasonably large and not going to leave a ton of
345 // space at the end of the trace buffer.
346 let max_chunk_size = 4096;
347 while !data_to_write.is_empty() {
348 let chunk_size = data_to_write.len().min(max_chunk_size);
349 let chunk = &data_to_write[..chunk_size];
350 self.forward_blob(blob_name_ref, &chunk)?;
351 data_to_write = &data_to_write[chunk_size..];
352 bytes_written += chunk_size;
353 }
354 Some(bytes_written)
355 }
356
357 // Given a blob name, wrap the data in an fxt perfetto blob and write it to the trace buffer.
358 fn forward_blob(&self, blob_name_ref: trace_string_ref_t, blob_data: &[u8]) -> Option<usize> {
359 let mut header = BlobHeader::empty();
360 header.set_name_ref(blob_name_ref.encoded_value);
361 header.set_payload_len(blob_data.len() as u16);
362 header.set_blob_format_type(BlobType::Perfetto.into());
363
364 let record_bytes = fxt::fxt_builder::FxtBuilder::new(header).atom(blob_data).build();
365 assert!(record_bytes.len() % std::mem::size_of::<u64>() == 0);
366 let num_words = record_bytes.len() / std::mem::size_of::<u64>();
367 let record_data = record_bytes.as_ptr();
368 #[allow(
369 clippy::undocumented_unsafe_blocks,
370 reason = "Force documented unsafe blocks in Starnix"
371 )]
372 let record_words =
373 unsafe { std::slice::from_raw_parts(record_data.cast::<u64>(), num_words) };
374
375 while let Some(context) = fuchsia_trace::Context::acquire() {
376 if let Some(bytes) = context.copy_record(record_words) {
377 return Some(bytes);
378 }
379 if context.buffering_mode() != BufferingMode::Streaming {
380 // If we're not in streaming mode, there will never be room for this record. Drop
381 // it.
382 return None;
383 }
384 // We're writing records pretty quick here, we're just forwarding data from
385 // perfetto with no breaks. trace_manager might not be able to keep up if it's also
386 // servicing other trace-providers. We want to back off we if find that we run out
387 // of space.
388 //
389 // We drop the context to decrement the refcount on the trace session. This allows
390 // trace-engine to switch the buffers if needed and drain out the buffers so that
391 // when we wake, there will hopefully be room.
392 //
393 // TODO(b/304532640)
394 drop(context);
395 std::thread::sleep(std::time::Duration::from_millis(100));
396 }
397 None
398 }
399
400 fn rewrite_pids(&mut self, protobuf_blob: &Vec<u8>) -> anyhow::Result<Vec<u8>> {
401 let mut proto = decode_trace(protobuf_blob.as_slice())?;
402 for p in &mut proto.packet {
403 if let Some(ref mut data) = p.data {
404 match data {
405 trace_packet::Data::FrameTimelineEvent(frame_timeline_event) => {
406 if let Some(evt) = &mut frame_timeline_event.event {
407 // Update the linux pid to the Fuchsia pid. Each event has its own
408 // match arm since the variant data is of a different type for each event.
409 match evt {
410 Event::ExpectedDisplayFrameStart(ExpectedDisplayFrameStart {
411 pid,
412 ..
413 })
414 | Event::ActualDisplayFrameStart(ActualDisplayFrameStart {
415 pid,
416 ..
417 })
418 | Event::ExpectedSurfaceFrameStart(ExpectedSurfaceFrameStart {
419 pid,
420 ..
421 })
422 | Event::ActualSurfaceFrameStart(ActualSurfaceFrameStart {
423 pid,
424 ..
425 }) => {
426 pid.as_mut().map(|pid| {
427 *pid = self.map_to_koid_val(*pid);
428 });
429 }
430 Event::FrameEnd(_frame_end) => {}
431 }
432 }
433 }
434 trace_packet::Data::FtraceEvents(ftrace_bundle) => {
435 for evt in &mut ftrace_bundle.event {
436 if let Some(ref mut pid) = evt.pid {
437 *pid = self.map_thread_to_koid_val(*pid as i32) as u32;
438 }
439 if let Some(ref mut event_data) = evt.event {
440 match event_data {
441 Print(print) => {
442 if let Some(ref mut data) = print.buf {
443 *data = self.map_print_event(data)
444 }
445 }
446 _ => (),
447 }
448 }
449 }
450 }
451 // No need to process other data; we only fixup data that references the pid.
452 _ => (),
453 }
454 }
455 }
456 Ok(encode_trace(&proto))
457 }
458
459 fn map_print_event(&mut self, data: &String) -> String {
460 if let Some(mut event) = atrace::ATraceEvent::parse(&data) {
461 match event {
462 atrace::ATraceEvent::Begin { ref mut pid, .. }
463 | atrace::ATraceEvent::End { ref mut pid }
464 | atrace::ATraceEvent::Instant { ref mut pid, .. }
465 | atrace::ATraceEvent::AsyncBegin { ref mut pid, .. }
466 | atrace::ATraceEvent::AsyncEnd { ref mut pid, .. }
467 | atrace::ATraceEvent::Counter { ref mut pid, .. }
468 | atrace::ATraceEvent::AsyncTrackBegin { ref mut pid, .. }
469 | atrace::ATraceEvent::AsyncTrackEnd { ref mut pid, .. }
470 | atrace::ATraceEvent::Track { ref mut pid, .. } => {
471 *pid = self.map_to_koid_val(*pid as i32) as u64
472 }
473 }
474 event.data()
475 } else {
476 data.to_string()
477 }
478 }
479
480 fn map_thread_to_koid_val(&mut self, pid: i32) -> i32 {
481 if pid == 0 {
482 return 0;
483 }
484 // Truncate the koid down to 32 bits in order to match the perfetto data schema. This is
485 // usually not an issue except for artificial koids which have the 2^63 bit set, such as
486 // virtual threads. This is consistent with the perfetto data importer code:
487 // https://github.com/google/perfetto/blob/c343c8a77c6e665c679e5c1ec845ac6dde0fc685/src/trace_processor/importers/fuchsia/fuchsia_trace_tokenizer.cc#L490
488 self.event_manager.map_tid_to_koid(pid).raw_koid() as i32
489 }
490
491 fn map_to_koid_val(&mut self, pid: i32) -> i32 {
492 // Truncate the koid down to 32 bits in order to match the perfetto data schema. This is
493 // usually not an issue except for artificial koids which have the 2^63 bit set, such as
494 // virtual threads. This is consistent with the perfetto data importer code:
495 // https://github.com/google/perfetto/blob/c343c8a77c6e665c679e5c1ec845ac6dde0fc685/src/trace_processor/importers/fuchsia/fuchsia_trace_tokenizer.cc#L490
496 if pid == 0 {
497 return 0;
498 }
499 self.event_manager.map_pid_to_koid(pid).raw_koid() as i32
500 }
501}
502
/// Spawns the `perfetto-consumer` kernel thread, which mirrors Fuchsia trace sessions into
/// Perfetto through the consumer socket at `socket_path`.
///
/// The thread observes Fuchsia trace state changes. On start it enables Perfetto tracing; on
/// stop it reads Perfetto's buffers back and forwards them into the Fuchsia trace. If tracing
/// is already running when the thread starts (boot tracing), it keeps retrying the initial
/// start until the Perfetto socket becomes reachable.
///
/// # Errors
///
/// Currently always returns `Ok(())`; failures inside the thread are logged.
pub fn start_perfetto_consumer_thread(kernel: &Kernel, socket_path: FsString) -> Result<(), Errno> {
    // We unfortunately need to spawn a dedicated thread to run our async task.
    //
    // While the TraceObserver waits asynchronously, the interactions we do with Perfetto over the
    // vfs::socket are blocking.
    //
    // It blocks in two scenarios:
    // 1) When we forward a control plane request over the socket and block for a response. This is
    //    for a few ms. See `perfetto::Consumer::enable_tracing`.
    // 2) When a trace ends, we repeatedly do blocking reads on the socket until we read and
    //    forward all the trace data. This servicing of trace data would hold the executor for
    //    several seconds. See `perfetto::Consumer::next_frame_blocking`.
    let closure = async move |locked_and_task: LockedAndTask<'_>| {
        // Create the observer before sampling the current state so no transition is missed.
        let observer = TraceObserver::new();
        let mut callback_state = CallbackState {
            prev_state: TraceState::Stopped,
            socket_path,
            connection: None,
            prolonged_context: None,
            packet_data: Vec::new(),
            event_manager: TracePerformanceEventManager::new(),
        };

        fn handle_state_change(
            callback_state: &mut CallbackState,
            locked_and_task: &LockedAndTask<'_>,
            state: TraceState,
        ) -> Result<(), anyhow::Error> {
            let current_task = locked_and_task.current_task();
            let locked = &mut locked_and_task.unlocked();
            // TODO: https://fxbug.dev/457381697 - Revise how this kernel-internal work is security-
            // checked.
            let creds = security::creds_start_internal_operation(current_task);
            current_task.override_creds(creds, || {
                callback_state.on_state_change(locked, state, current_task)
            })
        }

        // Check for tracing already started before we began observing. This happens when
        // tracing is started on boot. Sample the state once so the check and the state we
        // handle agree.
        let mut state = trace_state();
        if state == TraceState::Started {
            const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(5);
            // When we do boot tracing, it is possible (even likely) that starnix has started but
            // perfetto is not ready to be connected to yet. In that case poll until it is.
            loop {
                let Err(e) = handle_state_change(&mut callback_state, &locked_and_task, state)
                else {
                    break; // Success, exit loop.
                };
                let perfetto_not_ready = e.downcast_ref::<Errno>().is_some_and(|errno| {
                    errno == &starnix_uapi::errors::ENOENT
                        || errno == &starnix_uapi::errors::ECONNREFUSED
                });
                if !perfetto_not_ready {
                    // For any other error, log and give up on the initial start.
                    log_error!(
                        "perfetto_consumer initial start tracing failed with error: {e:?}"
                    );
                    break;
                }
                log_warn!(
                    "perfetto_consumer initial start tracing failed because perfetto socket connection not established: {e:?} retrying in 5 seconds..."
                );
                std::thread::sleep(RETRY_DELAY);
                // Undo the partial start so the retry is treated as a fresh Stopped -> Started.
                callback_state.prev_state = TraceState::Stopped;
                callback_state.connection = None;
                callback_state.prolonged_context = None;
                state = trace_state();
            }
        }

        while let Ok(state) = observer.on_state_changed().await {
            if let Err(e) = handle_state_change(&mut callback_state, &locked_and_task, state) {
                log_error!("perfetto_consumer state change callback error: {:?}", e);
            }
        }
    };
    let req = SpawnRequestBuilder::new()
        .with_debug_name("perfetto-consumer")
        .with_async_closure(closure)
        .build();
    kernel.kthreads.spawner().spawn_from_request(req);

    Ok(())
}