Skip to main content

sampler/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::config::SamplerConfig;
6use crate::project::Project;
7use anyhow::Error as AnyhowError;
8use argh::FromArgs;
9use diagnostics_reader::drain_batch_iterator;
10use fidl::endpoints::{ControlHandle, RequestStream, create_endpoints};
11use fidl_fuchsia_diagnostics as fdiagnostics;
12use fidl_fuchsia_hardware_power_statecontrol::{
13    ShutdownWatcherMarker, ShutdownWatcherRegisterMarker, ShutdownWatcherRequest,
14};
15use fidl_fuchsia_metrics::MetricEventLoggerFactoryMarker;
16use fuchsia_component_client::connect_to_protocol;
17use fuchsia_inspect::component;
18use fuchsia_inspect::health::Reporter;
19use futures::future::{Either, select};
20use futures::stream::{self, StreamExt};
21use inspect_runtime::publish;
22use itertools::Itertools;
23use log::{info, warn};
24use sampler_component_config::Config;
25use std::sync::Arc;
26
27mod config;
28mod error;
29mod project;
30
31/// Arguments used to configure sampler.
32#[derive(Debug, Default, FromArgs, PartialEq)]
33#[argh(subcommand, name = "sampler")]
34pub struct Args {}
35
36pub const PROGRAM_NAME: &str = "sampler";
37
38pub async fn main() -> Result<(), AnyhowError> {
39    info!("Sampler starting up");
40    component::health().set_starting_up();
41
42    let _inspect = publish(component::inspector(), Default::default());
43
44    let execution_stats = component::inspector().root().create_child("sampler_executor_stats");
45    let config = SamplerConfig::new(Config::take_from_startup_handle(), &execution_stats)?;
46
47    let sampler = connect_to_protocol::<fdiagnostics::SampleMarker>()?;
48
49    for chunk in &config
50        .sample_data()
51        .into_iter()
52        .chunks(fdiagnostics::MAX_SAMPLE_PARAMETERS_PER_SET as usize)
53    {
54        sampler.set(&fdiagnostics::SampleParameters {
55            data: Some(chunk.collect()),
56            ..Default::default()
57        })?;
58    }
59
60    let (sample_sink_client, sample_sink_server) =
61        create_endpoints::<fdiagnostics::SampleSinkMarker>();
62
63    if let Err(e) = sampler.commit(sample_sink_client).await? {
64        match e {
65            fdiagnostics::ConfigurationError::SamplePeriodTooSmall => {
66                return Err(anyhow::anyhow!(
67                    "Configured sample period was too small, indicating a config bug. Exiting."
68                ));
69            }
70            err => warn!(err:?; "Sampler encountered non-fatal error. Review Archivist's logs."),
71        }
72    }
73
74    let metric_logger_factory = connect_to_protocol::<MetricEventLoggerFactoryMarker>()?;
75
76    let mut projects = futures::stream::iter(config.project_configs)
77        .filter_map(|project_config| async {
78            let project_id = *project_config.project_id;
79            let stats = config.stats.projects.get(&project_config.project_id);
80            match Project::new(&metric_logger_factory, project_config, stats).await {
81                Ok(project) => Some(project),
82                Err(e) => {
83                    warn!(
84                        e:?,
85                        project_id;
86                        "Sampler failed to configure a project",
87                    );
88                    None
89                }
90            }
91        })
92        .collect::<Vec<_>>()
93        .await;
94
95    let (shutdown_watcher_client, shutdown_watcher_request_stream) =
96        fidl::endpoints::create_request_stream::<ShutdownWatcherMarker>();
97    let shutdown_watcher_register = connect_to_protocol::<ShutdownWatcherRegisterMarker>()?;
98    shutdown_watcher_register.register_watcher(shutdown_watcher_client).await?;
99
100    let sink_stream = sample_sink_server.into_stream();
101    let sample_sink_control = sink_stream.control_handle();
102    let mut sink_stream = sink_stream.fuse();
103    let mut shutdown_stream = Either::Left(shutdown_watcher_request_stream);
104    let mut shutdown = false;
105
106    component::health().set_ok();
107
108    loop {
109        match select(shutdown_stream.next(), sink_stream.next()).await {
110            Either::Left((shutdown_request, _)) => match shutdown_request {
111                Some(Ok(ShutdownWatcherRequest::OnShutdown { responder, .. })) => {
112                    shutdown = true;
113                    sample_sink_control.send_on_now_or_never()?;
114                    responder.send()?;
115                }
116                Some(Ok(ShutdownWatcherRequest::_UnknownMethod { .. })) => {
117                    warn!("Sampler encountered unknown method on ShutdownWatcher");
118                }
119                Some(Err(err)) => {
120                    warn!(err:?; "Sampler encountered error on ShutdownWatcher, data may be missing");
121                }
122                None => {
123                    shutdown_stream = Either::Right(stream::pending());
124                    continue;
125                }
126            },
127            Either::Right((event, _)) => {
128                let Some(Ok(event)) = event else {
129                    break;
130                };
131
132                handle_sample_sink_request(event, shutdown, &mut projects).await;
133
134                if shutdown {
135                    break;
136                }
137            }
138        }
139    }
140
141    Ok(())
142}
143
144async fn handle_sample_sink_request(
145    event: fdiagnostics::SampleSinkRequest,
146    shutdown: bool,
147    projects: &mut [Project<'_>],
148) {
149    match event {
150        fdiagnostics::SampleSinkRequest::OnSampleReadied {
151            event:
152                fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
153                    batch_iter: Some(batch_iter),
154                    seconds_since_start: Some(seconds_since_start),
155                    ..
156                }),
157            control_handle: _control_handle,
158        } => {
159            let data = drain_batch_iterator::<diagnostics_data::InspectData>(Arc::new(
160                batch_iter.into_proxy(),
161            ))
162            .filter_map(|v| async {
163                match v {
164                    Ok(v) => Some(v),
165                    Err(e) => {
166                        warn!(e:?; "Failed to read some Inspect data; skipping");
167                        None
168                    }
169                }
170            })
171            .collect::<Vec<_>>()
172            .await;
173
174            let seconds_since_start = if shutdown {
175                None
176            } else {
177                Some(zx::MonotonicDuration::from_seconds(seconds_since_start))
178            };
179
180            for project in projects {
181                if let Err(e) = project.log(&data, seconds_since_start).await {
182                    warn!(e:?; "Project failed to log");
183                }
184
185                // TODO: b/440153294 - Update SampleSink to allow removing selectors during
186                // runtime. These selectors are returned by Project::log. That will reduce
187                // the load on Archivist, but is not required for correctness of Sampler.
188            }
189        }
190        fdiagnostics::SampleSinkRequest::OnSampleReadied {
191            event:
192                fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
193                    batch_iter,
194                    seconds_since_start,
195                    ..
196                }),
197            control_handle,
198        } => {
199            control_handle.shutdown();
200            warn!(
201                batch_iter:?, seconds_since_start:?;
202                "Sample server sent Ready but crucial fields were None"
203            );
204        }
205        fdiagnostics::SampleSinkRequest::OnSampleReadied {
206            event: fdiagnostics::SampleSinkResult::Error(e),
207            ..
208        } => {
209            warn!(e:?; "Sample server sent an error, data may be missing");
210        }
211        fdiagnostics::SampleSinkRequest::OnSampleReadied {
212            event: fdiagnostics::SampleSinkResult::__SourceBreaking { .. },
213            control_handle,
214        }
215        | fdiagnostics::SampleSinkRequest::_UnknownMethod { control_handle, .. } => {
216            control_handle.shutdown();
217            warn!("Sample server sent a source-breaking or unknown event")
218        }
219    }
220}