trace_task/
trace_task.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::triggers::{Trigger, TriggerAction, TriggersWatcher};
6use crate::{TracingError, trace_shutdown};
7use async_lock::Mutex;
8use fidl::AsyncSocket;
9use fidl_fuchsia_tracing_controller::{self as trace, StopResult, TraceConfig};
10use fuchsia_async::Task;
11use futures::io::AsyncWrite;
12use futures::prelude::*;
13use futures::task::{Context as FutContext, Poll};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, AtomicU64};
17use std::time::{Duration, Instant};
18
/// Counter that hands out task ids. It starts at 100; each call to `TraceTask::new`
/// takes the current value as the new task's id and bumps the counter by one.
static SERIAL: AtomicU64 = AtomicU64::new(100);
20
/// A started trace session bundled with the background task that stops it.
///
/// A `TraceTask` is itself a `Future`: awaiting it polls the inner `task` and yields
/// that task's output.
#[derive(Debug)]
pub struct TraceTask {
    /// Unique identifier for this task. The value of this id monotonically increases.
    task_id: u64,
    /// Prefix used to identify this task in the log. `new` stores the formatted
    /// `"Task {task_id} ({debug_tag})"` string here, not the bare caller-supplied tag.
    debug_tag: String,
    /// Trace configuration.
    config: trace::TraceConfig,
    /// Requested categories. These are unexpanded from the user.
    requested_categories: Vec<String>,
    /// Duration to capture trace. None indicates capture until canceled.
    duration: Option<Duration>,
    /// Triggers for terminating the trace.
    triggers: Vec<Trigger>,
    /// True when the task is cleaning up. Flipped from false to true by the shutdown
    /// future the first time it runs.
    terminating: Arc<AtomicBool>,
    /// Start time of the task, captured in `new` after tracing has been started.
    start_time: Instant,
    /// Channel used to shutdown this task. The receiving end is handed to the
    /// `TriggersWatcher` created in `new`.
    shutdown_sender: async_channel::Sender<()>,
    /// The background task. It waits for the timeout or the trigger watcher, runs the
    /// shutdown future, and yields the recorded stop result (`None` if none was recorded).
    task: Task<Option<trace::StopResult>>,
    /// The socket to read the trace data from when tracing is completed.
    read_socket: AsyncSocket,
}
46
47// This is just implemented for convenience so the wrapper is await-able.
48impl Future for TraceTask {
49    type Output = Option<trace::StopResult>;
50
51    fn poll(mut self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll<Self::Output> {
52        Pin::new(&mut self.task).poll(cx)
53    }
54}
55
56impl TraceTask {
57    pub async fn new(
58        debug_tag: String,
59        config: trace::TraceConfig,
60        duration: Option<Duration>,
61        triggers: Vec<Trigger>,
62        requested_categories: Option<Vec<String>>,
63        provisioner: trace::ProvisionerProxy,
64    ) -> Result<Self, TracingError> {
65        // Start the tracing session immediately. Maybe we should consider separating the creating
66        // of the session and the actual starting of it. This seems like a side-effect.
67        let task_id = SERIAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
68        let (client, server) = fidl::Socket::create_stream();
69        let client = fidl::AsyncSocket::from_socket(client);
70        let (client_end, server_end) = fidl::endpoints::create_proxy::<trace::SessionMarker>();
71        provisioner.initialize_tracing(server_end, &config, server)?;
72
73        client_end
74            .start_tracing(&trace::StartOptions::default())
75            .await?
76            .map_err(Into::<TracingError>::into)?;
77
78        let logging_prefix_og = format!("Task {task_id} ({debug_tag})");
79        let terminate_result = Arc::new(Mutex::new(None));
80        let (shutdown_sender, shutdown_receiver) = async_channel::bounded::<()>(1);
81
82        let controller = client_end.clone();
83        let shutdown_controller = client_end.clone();
84        let triggers_watcher =
85            TriggersWatcher::new(controller, triggers.clone(), shutdown_receiver);
86        let terminating = Arc::new(AtomicBool::new(false));
87        let terminating_clone = terminating.clone();
88        let terminate_result_clone = terminate_result.clone();
89        let shutdown_fut = {
90            let logging_prefix = logging_prefix_og.clone();
91            async move {
92                if terminating_clone
93                    .compare_exchange(
94                        false,
95                        true,
96                        std::sync::atomic::Ordering::SeqCst,
97                        std::sync::atomic::Ordering::Relaxed,
98                    )
99                    .is_ok()
100                {
101                    log::info!("{logging_prefix} Running shutdown future.");
102                    let result = trace_shutdown(&shutdown_controller).await;
103
104                    let mut done = terminate_result_clone.lock().await;
105                    if done.is_none() {
106                        match result {
107                            Ok(stop) => {
108                                log::info!("{logging_prefix} call to trace_shutdown successful.");
109                                *done = Some(stop)
110                            }
111                            Err(e) => {
112                                log::error!(
113                                    "{logging_prefix} call to trace_shutdown failed: {e:?}"
114                                );
115                            }
116                        }
117                    }
118                } else {
119                    log::debug!("Shutdown already triggered");
120                }
121                "shutdown future completed"
122            }
123        };
124
125        Ok(Self {
126            task_id,
127            debug_tag: logging_prefix_og,
128            config,
129            duration,
130            triggers: triggers.clone(),
131            terminating,
132            requested_categories: requested_categories.unwrap_or_default(),
133            start_time: Instant::now(),
134            shutdown_sender,
135            read_socket: client,
136            task: Self::make_task(
137                task_id,
138                debug_tag,
139                duration,
140                shutdown_fut,
141                triggers_watcher,
142                terminate_result,
143            ),
144        })
145    }
146
147    /// Shutdown the tracing task.
148    async fn shutdown(self) -> Result<trace::StopResult, TracingError> {
149        if !self.terminating.load(std::sync::atomic::Ordering::SeqCst) {
150            log::info!("{} Sending shutdown message.", self.debug_tag);
151            if self.shutdown_sender.send(()).await.is_err() {
152                log::warn!(
153                    "{} Shutdown channel was closed. Task may have already completed.",
154                    self.debug_tag
155                );
156            }
157        } else {
158            log::debug!("{} Shutdown already in progress.", self.debug_tag);
159        }
160
161        self.await
162            .map(|r| Ok(r))
163            .unwrap_or_else(|| Err(TracingError::RecordingStop("Error awaiting".into())))
164    }
165
166    fn make_task(
167        task_id: u64,
168        debug_tag: String,
169        duration: Option<Duration>,
170        shutdown_fut: impl Future<Output = &'static str> + 'static + std::marker::Send,
171        trigger_watcher: TriggersWatcher<'static>,
172        terminate_result: Arc<Mutex<Option<StopResult>>>,
173    ) -> Task<Option<trace::StopResult>> {
174        Task::local(async move {
175            let mut timeout_fut = Box::pin(async move {
176                if let Some(duration) = duration {
177                    fuchsia_async::Timer::new(duration).await;
178                } else {
179                    std::future::pending::<()>().await;
180                }
181            })
182            .fuse();
183            let mut trigger_fut = trigger_watcher.fuse();
184
185            futures::select! {
186                // Timeout, clean up and wait for copying to finish.
187                _ = timeout_fut => {
188                    log::info!("Trace {task_id} (debug_tag): timeout of {} successfully completed. Stopping and cleaning up.",
189                     duration.map(|d| format!("{} secs", d.as_secs())).unwrap_or_else(|| "infinite?".into()));
190
191                    shutdown_fut.await;
192                     log::debug!("done with timeout!");
193
194                }
195
196                // Trigger hit, shutdown and copy the trace.
197                action = trigger_fut => {
198                    if let Some(action) = action {
199                        match action {
200                            TriggerAction::Terminate => {
201                                log::info!("Task {task_id} ({debug_tag}): received terminate trigger");
202                            }
203                        }
204                    } else {
205                        // This usually means the proxy was closed.
206                        log::debug!("Task {task_id} ({debug_tag}): Trigger future completed without an action!");
207                    }
208                    shutdown_fut.await;
209                     log::debug!("done with trigger future!");
210                }
211            };
212            log::debug!("end of task waiting for terminate_result lock");
213            let res = terminate_result.lock().await.clone();
214            log::debug!("got res in task is some: {}", res.is_some());
215            res
216        })
217    }
218
    /// Returns a copy of the triggers this task was created with.
    pub fn triggers(&self) -> Vec<Trigger> {
        self.triggers.clone()
    }
    /// Returns a copy of the trace configuration this task was created with.
    pub fn config(&self) -> TraceConfig {
        self.config.clone()
    }
225
    /// Returns the instant recorded when this task was constructed.
    pub fn start_time(&self) -> Instant {
        self.start_time
    }
229
230    pub fn duration(&self) -> Option<Duration> {
231        self.duration.clone()
232    }
233
    /// Returns a copy of the requested categories; the list is empty when none were
    /// supplied at construction.
    pub fn requested_categories(&self) -> Vec<String> {
        self.requested_categories.clone()
    }
237
    /// Returns the unique id assigned to this task.
    pub fn task_id(&self) -> u64 {
        self.task_id
    }
241
242    /// Signals the trace session to stop, copies all trace data to the
243    /// provided writer, and awaits task completion.
244    pub async fn stop_and_receive_data<W>(
245        self,
246        mut writer: W,
247    ) -> Result<trace::StopResult, TracingError>
248    where
249        W: AsyncWrite + Unpin + Send + 'static,
250    {
251        if !self.terminating.load(std::sync::atomic::Ordering::SeqCst) {
252            log::info!("{} Sending shutdown message for task", self.debug_tag);
253            if self.shutdown_sender.send(()).await.is_err() {
254                log::warn!(
255                    "{} Shutdown channel was closed. Task may have already completed.",
256                    self.debug_tag
257                );
258            }
259        } else {
260            log::debug!("{} Shutdown already in progress.", self.debug_tag);
261        }
262
263        let res = futures::io::copy(&self.read_socket, &mut writer)
264            .await
265            .map_err(|e| TracingError::GeneralError(format!("{e:?}")));
266
267        if res.is_ok() { self.shutdown().await } else { Err(res.err().unwrap()) }
268    }
269
270    /// Waits for the tracing task to complete and copies the trace data to the writer.
271    /// If the tracing should be stopped vs. waiting, call |stop_and_receive_data|.
272    pub async fn await_completion_and_receive_data<W>(
273        self,
274        mut writer: W,
275    ) -> Result<StopResult, TracingError>
276    where
277        W: AsyncWrite + Unpin + Send + 'static,
278    {
279        match futures::io::copy(&self.read_socket, &mut writer)
280            .await
281            .map_err(|e| TracingError::RecordingStop(e.to_string()))
282        {
283            Ok(_) => match self.await {
284                Some(r) => Ok(r),
285                None => Err(TracingError::RecordingStop("could not await task".into())),
286            },
287            Err(e) => Err(e),
288        }
289    }
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use fidl_fuchsia_tracing_controller::StartError;
296
297    const FAKE_CONTROLLER_TRACE_OUTPUT: &'static str = "HOWDY HOWDY HOWDY";
298
299    fn setup_fake_provisioner_proxy(
300        start_error: Option<StartError>,
301        trigger_name: Option<&'static str>,
302    ) -> trace::ProvisionerProxy {
303        let (proxy, mut stream) =
304            fidl::endpoints::create_proxy_and_stream::<trace::ProvisionerMarker>();
305        fuchsia_async::Task::local(async move {
306            while let Ok(Some(req)) = stream.try_next().await {
307                match req {
308                    trace::ProvisionerRequest::InitializeTracing { controller, output, .. } => {
309                        let mut stream = controller.into_stream();
310                        while let Ok(Some(req)) = stream.try_next().await {
311                            match req {
312                                trace::SessionRequest::StartTracing { responder, .. } => {
313                                    let response = match start_error {
314                                        Some(e) => Err(e),
315                                        None => Ok(()),
316                                    };
317                                    responder.send(response).expect("Failed to start")
318                                }
319                                trace::SessionRequest::StopTracing { responder, payload } => {
320                                    if start_error.is_some() {
321                                        responder
322                                            .send(Err(trace::StopError::NotStarted))
323                                            .expect("Failed to stop")
324                                    } else {
325                                        assert_eq!(payload.write_results.unwrap(), true);
326                                        assert_eq!(
327                                            FAKE_CONTROLLER_TRACE_OUTPUT.len(),
328                                            output
329                                                .write(FAKE_CONTROLLER_TRACE_OUTPUT.as_bytes())
330                                                .unwrap()
331                                        );
332                                        let stop_result = trace::StopResult {
333                                            provider_stats: Some(vec![]),
334                                            ..Default::default()
335                                        };
336                                        responder.send(Ok(&stop_result)).expect("Failed to stop")
337                                    }
338                                    break;
339                                }
340                                trace::SessionRequest::WatchAlert { responder } => {
341                                    responder
342                                        .send(trigger_name.unwrap_or(""))
343                                        .expect("Unable to send alert");
344                                }
345                                r => panic!("unexpected request: {:#?}", r),
346                            }
347                        }
348                    }
349                    r => panic!("unexpected request: {:#?}", r),
350                }
351            }
352        })
353        .detach();
354        proxy
355    }
356
357    #[fuchsia::test]
358    async fn test_trace_task_start_stop_write_check_with_vec() {
359        let provisioner = setup_fake_provisioner_proxy(None, None);
360
361        let trace_task = TraceTask::new(
362            "test_trace_start_stop_write_check".into(),
363            trace::TraceConfig::default(),
364            None,
365            vec![],
366            None,
367            provisioner,
368        )
369        .await
370        .expect("tracing task started");
371
372        let shutdown_result = trace_task.shutdown().await.expect("tracing shutdown");
373        assert_eq!(
374            shutdown_result,
375            trace::StopResult { provider_stats: Some(vec![]), ..Default::default() }.into()
376        );
377    }
378
379    #[cfg(not(target_os = "fuchsia"))]
380    #[fuchsia::test]
381    async fn test_trace_task_start_stop_write_check_with_file() {
382        let temp_dir = tempfile::TempDir::new().unwrap();
383        let output = temp_dir.path().join("trace-test.fxt");
384
385        let provisioner = setup_fake_provisioner_proxy(None, None);
386        let writer = async_fs::File::create(&output).await.unwrap();
387
388        let trace_task = TraceTask::new(
389            "test_trace_start_stop_write_check".into(),
390            trace::TraceConfig::default(),
391            None,
392            vec![],
393            None,
394            provisioner,
395        )
396        .await
397        .expect("tracing task started");
398
399        let shutdown_result =
400            trace_task.stop_and_receive_data(writer).await.expect("tracing shutdown");
401
402        let res = async_fs::read_to_string(&output).await.unwrap();
403        assert_eq!(res, FAKE_CONTROLLER_TRACE_OUTPUT.to_string());
404        let expected = trace::StopResult { provider_stats: Some(vec![]), ..Default::default() };
405        assert_eq!(shutdown_result, expected);
406    }
407
408    #[fuchsia::test]
409    async fn test_trace_error_handling_already_started() {
410        let provisioner = setup_fake_provisioner_proxy(Some(StartError::AlreadyStarted), None);
411
412        let trace_task_result = TraceTask::new(
413            "test_trace_error_handling_already_started".into(),
414            trace::TraceConfig::default(),
415            None,
416            vec![],
417            None,
418            provisioner,
419        )
420        .await
421        .err();
422
423        assert_eq!(trace_task_result, Some(TracingError::RecordingAlreadyStarted));
424    }
425
426    #[cfg(not(target_os = "fuchsia"))]
427    #[fuchsia::test]
428    async fn test_trace_task_start_with_duration() {
429        let temp_dir = tempfile::TempDir::new().unwrap();
430        let output = temp_dir.path().join("trace-test.fxt");
431
432        let provisioner = setup_fake_provisioner_proxy(None, None);
433        let writer = async_fs::File::create(&output).await.unwrap();
434
435        let trace_task = TraceTask::new(
436            "test_trace_task_start_with_duration".into(),
437            trace::TraceConfig::default(),
438            Some(Duration::from_millis(100)),
439            vec![],
440            None,
441            provisioner,
442        )
443        .await
444        .expect("tracing task started");
445
446        let res = trace_task.await_completion_and_receive_data(writer).await;
447        if let Some(ref stop_result) = res.as_ref().ok() {
448            assert!(stop_result.provider_stats.is_some());
449        } else {
450            panic!("Expected stop result from trace_task.await: {res:?}");
451        }
452
453        let mut f = async_fs::File::open(std::path::PathBuf::from(output)).await.unwrap();
454        let mut res = String::new();
455        f.read_to_string(&mut res).await.unwrap();
456        assert_eq!(res, FAKE_CONTROLLER_TRACE_OUTPUT.to_string());
457    }
458
459    #[cfg(not(target_os = "fuchsia"))]
460    #[fuchsia::test]
461    async fn test_triggers_valid() {
462        let temp_dir = tempfile::TempDir::new().unwrap();
463        let output = temp_dir.path().join("trace-test.fxt");
464        let alert_name = "some_alert";
465        let provisioner = setup_fake_provisioner_proxy(None, Some(alert_name.into()));
466        let writer = async_fs::File::create(output.clone()).await.unwrap();
467
468        let trace_task = TraceTask::new(
469            "test_triggers_valid".into(),
470            trace::TraceConfig::default(),
471            None,
472            vec![Trigger {
473                alert: Some(alert_name.into()),
474                action: Some(TriggerAction::Terminate),
475            }],
476            None,
477            provisioner,
478        )
479        .await
480        .expect("tracing task started");
481
482        trace_task.await_completion_and_receive_data(writer).await.unwrap();
483        let res = async_fs::read_to_string(&output).await.unwrap();
484        assert_eq!(res, FAKE_CONTROLLER_TRACE_OUTPUT.to_string());
485    }
486}