1use crate::triggers::{Trigger, TriggerAction, TriggersWatcher};
6use crate::{TracingError, trace_shutdown};
7use async_lock::Mutex;
8use fidl::AsyncSocket;
9use fidl_fuchsia_tracing_controller::{self as trace, StopResult, TraceConfig};
10use fuchsia_async::Task;
11use futures::io::AsyncWrite;
12use futures::prelude::*;
13use futures::task::{Context as FutContext, Poll};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, AtomicU64};
17use std::time::{Duration, Instant};
18
/// Process-wide counter from which each new `TraceTask` draws its `task_id`.
/// The first id handed out is 100; every construction bumps it by one.
static SERIAL: AtomicU64 = AtomicU64::new(100);
20
/// A single tracing session together with the background task that ends it.
///
/// Polling a `TraceTask` as a future polls the background task, which yields the
/// stop result recorded during shutdown, or `None` if none was recorded.
#[derive(Debug)]
pub struct TraceTask {
    /// Id drawn from `SERIAL` when the task is constructed.
    task_id: u64,
    /// Logging prefix of the form `Task <id> (<tag>)`, built from the caller's tag.
    debug_tag: String,
    /// Configuration handed to the provisioner when the session was initialized.
    config: trace::TraceConfig,
    /// Categories supplied by the caller; empty when none were given.
    requested_categories: Vec<String>,
    /// Optional time limit; with `None` the background task never times out.
    duration: Option<Duration>,
    /// Triggers the `TriggersWatcher` was created with.
    triggers: Vec<Trigger>,
    /// Flipped to `true` by the shutdown future when it starts running.
    terminating: Arc<AtomicBool>,
    /// Captured during construction, after the start-tracing call has returned.
    start_time: Instant,
    /// Sender half of the channel whose receiver was given to the `TriggersWatcher`.
    shutdown_sender: async_channel::Sender<()>,
    /// Background task that waits for the timeout or the watcher, runs the
    /// shutdown future and yields the recorded stop result.
    task: Task<Option<trace::StopResult>>,
    /// Local end of the stream socket whose peer was passed to the provisioner.
    read_socket: AsyncSocket,
}
46
47impl Future for TraceTask {
49 type Output = Option<trace::StopResult>;
50
51 fn poll(mut self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll<Self::Output> {
52 Pin::new(&mut self.task).poll(cx)
53 }
54}
55
56impl TraceTask {
57 pub async fn new(
58 debug_tag: String,
59 config: trace::TraceConfig,
60 duration: Option<Duration>,
61 triggers: Vec<Trigger>,
62 requested_categories: Option<Vec<String>>,
63 provisioner: trace::ProvisionerProxy,
64 ) -> Result<Self, TracingError> {
65 let task_id = SERIAL.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
68 let (client, server) = fidl::Socket::create_stream();
69 let client = fidl::AsyncSocket::from_socket(client);
70 let (client_end, server_end) = fidl::endpoints::create_proxy::<trace::SessionMarker>();
71 provisioner.initialize_tracing(server_end, &config, server)?;
72
73 client_end
74 .start_tracing(&trace::StartOptions::default())
75 .await?
76 .map_err(Into::<TracingError>::into)?;
77
78 let logging_prefix_og = format!("Task {task_id} ({debug_tag})");
79 let terminate_result = Arc::new(Mutex::new(None));
80 let (shutdown_sender, shutdown_receiver) = async_channel::bounded::<()>(1);
81
82 let controller = client_end.clone();
83 let shutdown_controller = client_end.clone();
84 let triggers_watcher =
85 TriggersWatcher::new(controller, triggers.clone(), shutdown_receiver);
86 let terminating = Arc::new(AtomicBool::new(false));
87 let terminating_clone = terminating.clone();
88 let terminate_result_clone = terminate_result.clone();
89 let shutdown_fut = {
90 let logging_prefix = logging_prefix_og.clone();
91 async move {
92 if terminating_clone
93 .compare_exchange(
94 false,
95 true,
96 std::sync::atomic::Ordering::SeqCst,
97 std::sync::atomic::Ordering::Relaxed,
98 )
99 .is_ok()
100 {
101 log::info!("{logging_prefix} Running shutdown future.");
102 let result = trace_shutdown(&shutdown_controller).await;
103
104 let mut done = terminate_result_clone.lock().await;
105 if done.is_none() {
106 match result {
107 Ok(stop) => {
108 log::info!("{logging_prefix} call to trace_shutdown successful.");
109 *done = Some(stop)
110 }
111 Err(e) => {
112 log::error!(
113 "{logging_prefix} call to trace_shutdown failed: {e:?}"
114 );
115 }
116 }
117 }
118 } else {
119 log::debug!("Shutdown already triggered");
120 }
121 "shutdown future completed"
122 }
123 };
124
125 Ok(Self {
126 task_id,
127 debug_tag: logging_prefix_og,
128 config,
129 duration,
130 triggers: triggers.clone(),
131 terminating,
132 requested_categories: requested_categories.unwrap_or_default(),
133 start_time: Instant::now(),
134 shutdown_sender,
135 read_socket: client,
136 task: Self::make_task(
137 task_id,
138 debug_tag,
139 duration,
140 shutdown_fut,
141 triggers_watcher,
142 terminate_result,
143 ),
144 })
145 }
146
147 async fn shutdown(self) -> Result<trace::StopResult, TracingError> {
149 if !self.terminating.load(std::sync::atomic::Ordering::SeqCst) {
150 log::info!("{} Sending shutdown message.", self.debug_tag);
151 if self.shutdown_sender.send(()).await.is_err() {
152 log::warn!(
153 "{} Shutdown channel was closed. Task may have already completed.",
154 self.debug_tag
155 );
156 }
157 } else {
158 log::debug!("{} Shutdown already in progress.", self.debug_tag);
159 }
160
161 self.await
162 .map(|r| Ok(r))
163 .unwrap_or_else(|| Err(TracingError::RecordingStop("Error awaiting".into())))
164 }
165
166 fn make_task(
167 task_id: u64,
168 debug_tag: String,
169 duration: Option<Duration>,
170 shutdown_fut: impl Future<Output = &'static str> + 'static + std::marker::Send,
171 trigger_watcher: TriggersWatcher<'static>,
172 terminate_result: Arc<Mutex<Option<StopResult>>>,
173 ) -> Task<Option<trace::StopResult>> {
174 Task::local(async move {
175 let mut timeout_fut = Box::pin(async move {
176 if let Some(duration) = duration {
177 fuchsia_async::Timer::new(duration).await;
178 } else {
179 std::future::pending::<()>().await;
180 }
181 })
182 .fuse();
183 let mut trigger_fut = trigger_watcher.fuse();
184
185 futures::select! {
186 _ = timeout_fut => {
188 log::info!("Trace {task_id} (debug_tag): timeout of {} successfully completed. Stopping and cleaning up.",
189 duration.map(|d| format!("{} secs", d.as_secs())).unwrap_or_else(|| "infinite?".into()));
190
191 shutdown_fut.await;
192 log::debug!("done with timeout!");
193
194 }
195
196 action = trigger_fut => {
198 if let Some(action) = action {
199 match action {
200 TriggerAction::Terminate => {
201 log::info!("Task {task_id} ({debug_tag}): received terminate trigger");
202 }
203 }
204 } else {
205 log::debug!("Task {task_id} ({debug_tag}): Trigger future completed without an action!");
207 }
208 shutdown_fut.await;
209 log::debug!("done with trigger future!");
210 }
211 };
212 log::debug!("end of task waiting for terminate_result lock");
213 let res = terminate_result.lock().await.clone();
214 log::debug!("got res in task is some: {}", res.is_some());
215 res
216 })
217 }
218
219 pub fn triggers(&self) -> Vec<Trigger> {
220 self.triggers.clone()
221 }
222 pub fn config(&self) -> TraceConfig {
223 self.config.clone()
224 }
225
/// Returns the instant recorded when this task was constructed.
pub fn start_time(&self) -> Instant {
    self.start_time
}
229
230 pub fn duration(&self) -> Option<Duration> {
231 self.duration.clone()
232 }
233
234 pub fn requested_categories(&self) -> Vec<String> {
235 self.requested_categories.clone()
236 }
237
/// Returns the id assigned to this task at construction.
pub fn task_id(&self) -> u64 {
    self.task_id
}
241
242 pub async fn stop_and_receive_data<W>(
245 self,
246 mut writer: W,
247 ) -> Result<trace::StopResult, TracingError>
248 where
249 W: AsyncWrite + Unpin + Send + 'static,
250 {
251 if !self.terminating.load(std::sync::atomic::Ordering::SeqCst) {
252 log::info!("{} Sending shutdown message for task", self.debug_tag);
253 if self.shutdown_sender.send(()).await.is_err() {
254 log::warn!(
255 "{} Shutdown channel was closed. Task may have already completed.",
256 self.debug_tag
257 );
258 }
259 } else {
260 log::debug!("{} Shutdown already in progress.", self.debug_tag);
261 }
262
263 let res = futures::io::copy(&self.read_socket, &mut writer)
264 .await
265 .map_err(|e| TracingError::GeneralError(format!("{e:?}")));
266
267 if res.is_ok() { self.shutdown().await } else { Err(res.err().unwrap()) }
268 }
269
270 pub async fn await_completion_and_receive_data<W>(
273 self,
274 mut writer: W,
275 ) -> Result<StopResult, TracingError>
276 where
277 W: AsyncWrite + Unpin + Send + 'static,
278 {
279 match futures::io::copy(&self.read_socket, &mut writer)
280 .await
281 .map_err(|e| TracingError::RecordingStop(e.to_string()))
282 {
283 Ok(_) => match self.await {
284 Some(r) => Ok(r),
285 None => Err(TracingError::RecordingStop("could not await task".into())),
286 },
287 Err(e) => Err(e),
288 }
289 }
290}
291
292#[cfg(test)]
293mod tests {
294 use super::*;
295 use fidl_fuchsia_tracing_controller::StartError;
296
/// Payload the fake session writes to the output socket when tracing is stopped.
const FAKE_CONTROLLER_TRACE_OUTPUT: &str = "HOWDY HOWDY HOWDY";
298
299 fn setup_fake_provisioner_proxy(
300 start_error: Option<StartError>,
301 trigger_name: Option<&'static str>,
302 ) -> trace::ProvisionerProxy {
303 let (proxy, mut stream) =
304 fidl::endpoints::create_proxy_and_stream::<trace::ProvisionerMarker>();
305 fuchsia_async::Task::local(async move {
306 while let Ok(Some(req)) = stream.try_next().await {
307 match req {
308 trace::ProvisionerRequest::InitializeTracing { controller, output, .. } => {
309 let mut stream = controller.into_stream();
310 while let Ok(Some(req)) = stream.try_next().await {
311 match req {
312 trace::SessionRequest::StartTracing { responder, .. } => {
313 let response = match start_error {
314 Some(e) => Err(e),
315 None => Ok(()),
316 };
317 responder.send(response).expect("Failed to start")
318 }
319 trace::SessionRequest::StopTracing { responder, payload } => {
320 if start_error.is_some() {
321 responder
322 .send(Err(trace::StopError::NotStarted))
323 .expect("Failed to stop")
324 } else {
325 assert_eq!(payload.write_results.unwrap(), true);
326 assert_eq!(
327 FAKE_CONTROLLER_TRACE_OUTPUT.len(),
328 output
329 .write(FAKE_CONTROLLER_TRACE_OUTPUT.as_bytes())
330 .unwrap()
331 );
332 let stop_result = trace::StopResult {
333 provider_stats: Some(vec![]),
334 ..Default::default()
335 };
336 responder.send(Ok(&stop_result)).expect("Failed to stop")
337 }
338 break;
339 }
340 trace::SessionRequest::WatchAlert { responder } => {
341 responder
342 .send(trigger_name.unwrap_or(""))
343 .expect("Unable to send alert");
344 }
345 r => panic!("unexpected request: {:#?}", r),
346 }
347 }
348 }
349 r => panic!("unexpected request: {:#?}", r),
350 }
351 }
352 })
353 .detach();
354 proxy
355 }
356
357 #[fuchsia::test]
358 async fn test_trace_task_start_stop_write_check_with_vec() {
359 let provisioner = setup_fake_provisioner_proxy(None, None);
360
361 let trace_task = TraceTask::new(
362 "test_trace_start_stop_write_check".into(),
363 trace::TraceConfig::default(),
364 None,
365 vec![],
366 None,
367 provisioner,
368 )
369 .await
370 .expect("tracing task started");
371
372 let shutdown_result = trace_task.shutdown().await.expect("tracing shutdown");
373 assert_eq!(
374 shutdown_result,
375 trace::StopResult { provider_stats: Some(vec![]), ..Default::default() }.into()
376 );
377 }
378
379 #[cfg(not(target_os = "fuchsia"))]
380 #[fuchsia::test]
381 async fn test_trace_task_start_stop_write_check_with_file() {
382 let temp_dir = tempfile::TempDir::new().unwrap();
383 let output = temp_dir.path().join("trace-test.fxt");
384
385 let provisioner = setup_fake_provisioner_proxy(None, None);
386 let writer = async_fs::File::create(&output).await.unwrap();
387
388 let trace_task = TraceTask::new(
389 "test_trace_start_stop_write_check".into(),
390 trace::TraceConfig::default(),
391 None,
392 vec![],
393 None,
394 provisioner,
395 )
396 .await
397 .expect("tracing task started");
398
399 let shutdown_result =
400 trace_task.stop_and_receive_data(writer).await.expect("tracing shutdown");
401
402 let res = async_fs::read_to_string(&output).await.unwrap();
403 assert_eq!(res, FAKE_CONTROLLER_TRACE_OUTPUT.to_string());
404 let expected = trace::StopResult { provider_stats: Some(vec![]), ..Default::default() };
405 assert_eq!(shutdown_result, expected);
406 }
407
408 #[fuchsia::test]
409 async fn test_trace_error_handling_already_started() {
410 let provisioner = setup_fake_provisioner_proxy(Some(StartError::AlreadyStarted), None);
411
412 let trace_task_result = TraceTask::new(
413 "test_trace_error_handling_already_started".into(),
414 trace::TraceConfig::default(),
415 None,
416 vec![],
417 None,
418 provisioner,
419 )
420 .await
421 .err();
422
423 assert_eq!(trace_task_result, Some(TracingError::RecordingAlreadyStarted));
424 }
425
426 #[cfg(not(target_os = "fuchsia"))]
427 #[fuchsia::test]
428 async fn test_trace_task_start_with_duration() {
429 let temp_dir = tempfile::TempDir::new().unwrap();
430 let output = temp_dir.path().join("trace-test.fxt");
431
432 let provisioner = setup_fake_provisioner_proxy(None, None);
433 let writer = async_fs::File::create(&output).await.unwrap();
434
435 let trace_task = TraceTask::new(
436 "test_trace_task_start_with_duration".into(),
437 trace::TraceConfig::default(),
438 Some(Duration::from_millis(100)),
439 vec![],
440 None,
441 provisioner,
442 )
443 .await
444 .expect("tracing task started");
445
446 let res = trace_task.await_completion_and_receive_data(writer).await;
447 if let Some(ref stop_result) = res.as_ref().ok() {
448 assert!(stop_result.provider_stats.is_some());
449 } else {
450 panic!("Expected stop result from trace_task.await: {res:?}");
451 }
452
453 let mut f = async_fs::File::open(std::path::PathBuf::from(output)).await.unwrap();
454 let mut res = String::new();
455 f.read_to_string(&mut res).await.unwrap();
456 assert_eq!(res, FAKE_CONTROLLER_TRACE_OUTPUT.to_string());
457 }
458
459 #[cfg(not(target_os = "fuchsia"))]
460 #[fuchsia::test]
461 async fn test_triggers_valid() {
462 let temp_dir = tempfile::TempDir::new().unwrap();
463 let output = temp_dir.path().join("trace-test.fxt");
464 let alert_name = "some_alert";
465 let provisioner = setup_fake_provisioner_proxy(None, Some(alert_name.into()));
466 let writer = async_fs::File::create(output.clone()).await.unwrap();
467
468 let trace_task = TraceTask::new(
469 "test_triggers_valid".into(),
470 trace::TraceConfig::default(),
471 None,
472 vec![Trigger {
473 alert: Some(alert_name.into()),
474 action: Some(TriggerAction::Terminate),
475 }],
476 None,
477 provisioner,
478 )
479 .await
480 .expect("tracing task started");
481
482 trace_task.await_completion_and_receive_data(writer).await.unwrap();
483 let res = async_fs::read_to_string(&output).await.unwrap();
484 assert_eq!(res, FAKE_CONTROLLER_TRACE_OUTPUT.to_string());
485 }
486}