1use crate::config::SamplerConfig;
6use crate::project::Project;
7use anyhow::Error as AnyhowError;
8use argh::FromArgs;
9use diagnostics_reader::drain_batch_iterator;
10use fidl::endpoints::{ControlHandle, RequestStream, create_endpoints};
11use fidl_fuchsia_diagnostics as fdiagnostics;
12use fidl_fuchsia_hardware_power_statecontrol::{
13 ShutdownWatcherMarker, ShutdownWatcherRegisterMarker, ShutdownWatcherRequest,
14};
15use fidl_fuchsia_metrics::MetricEventLoggerFactoryMarker;
16use fuchsia_component_client::connect_to_protocol;
17use fuchsia_inspect::component;
18use fuchsia_inspect::health::Reporter;
19use futures::future::{Either, select};
20use futures::stream::{self, StreamExt};
21use inspect_runtime::publish;
22use itertools::Itertools;
23use log::{info, warn};
24use sampler_component_config::Config;
25use std::sync::Arc;
26
27mod config;
28mod error;
29mod project;
30
31#[derive(Debug, Default, FromArgs, PartialEq)]
33#[argh(subcommand, name = "sampler")]
34pub struct Args {}
35
36pub const PROGRAM_NAME: &str = "sampler";
37
38pub async fn main() -> Result<(), AnyhowError> {
39 info!("Sampler starting up");
40 component::health().set_starting_up();
41
42 let _inspect = publish(component::inspector(), Default::default());
43
44 let execution_stats = component::inspector().root().create_child("sampler_executor_stats");
45 let config = SamplerConfig::new(Config::take_from_startup_handle(), &execution_stats)?;
46
47 let sampler = connect_to_protocol::<fdiagnostics::SampleMarker>()?;
48
49 for chunk in &config
50 .sample_data()
51 .into_iter()
52 .chunks(fdiagnostics::MAX_SAMPLE_PARAMETERS_PER_SET as usize)
53 {
54 sampler.set(&fdiagnostics::SampleParameters {
55 data: Some(chunk.collect()),
56 ..Default::default()
57 })?;
58 }
59
60 let (sample_sink_client, sample_sink_server) =
61 create_endpoints::<fdiagnostics::SampleSinkMarker>();
62
63 if let Err(e) = sampler.commit(sample_sink_client).await? {
64 match e {
65 fdiagnostics::ConfigurationError::SamplePeriodTooSmall => {
66 return Err(anyhow::anyhow!(
67 "Configured sample period was too small, indicating a config bug. Exiting."
68 ));
69 }
70 err => warn!(err:?; "Sampler encountered non-fatal error. Review Archivist's logs."),
71 }
72 }
73
74 let metric_logger_factory = connect_to_protocol::<MetricEventLoggerFactoryMarker>()?;
75
76 let mut projects = futures::stream::iter(config.project_configs)
77 .filter_map(|project_config| async {
78 let project_id = *project_config.project_id;
79 let stats = config.stats.projects.get(&project_config.project_id);
80 match Project::new(&metric_logger_factory, project_config, stats).await {
81 Ok(project) => Some(project),
82 Err(e) => {
83 warn!(
84 e:?,
85 project_id;
86 "Sampler failed to configure a project",
87 );
88 None
89 }
90 }
91 })
92 .collect::<Vec<_>>()
93 .await;
94
95 let (shutdown_watcher_client, shutdown_watcher_request_stream) =
96 fidl::endpoints::create_request_stream::<ShutdownWatcherMarker>();
97 let shutdown_watcher_register = connect_to_protocol::<ShutdownWatcherRegisterMarker>()?;
98 shutdown_watcher_register.register_watcher(shutdown_watcher_client).await?;
99
100 let sink_stream = sample_sink_server.into_stream();
101 let sample_sink_control = sink_stream.control_handle();
102 let mut sink_stream = sink_stream.fuse();
103 let mut shutdown_stream = Either::Left(shutdown_watcher_request_stream);
104 let mut shutdown = false;
105
106 component::health().set_ok();
107
108 loop {
109 match select(shutdown_stream.next(), sink_stream.next()).await {
110 Either::Left((shutdown_request, _)) => match shutdown_request {
111 Some(Ok(ShutdownWatcherRequest::OnShutdown { responder, .. })) => {
112 shutdown = true;
113 sample_sink_control.send_on_now_or_never()?;
114 responder.send()?;
115 }
116 Some(Ok(ShutdownWatcherRequest::_UnknownMethod { .. })) => {
117 warn!("Sampler encountered unknown method on ShutdownWatcher");
118 }
119 Some(Err(err)) => {
120 warn!(err:?; "Sampler encountered error on ShutdownWatcher, data may be missing");
121 }
122 None => {
123 shutdown_stream = Either::Right(stream::pending());
124 continue;
125 }
126 },
127 Either::Right((event, _)) => {
128 let Some(Ok(event)) = event else {
129 break;
130 };
131
132 handle_sample_sink_request(event, shutdown, &mut projects).await;
133
134 if shutdown {
135 break;
136 }
137 }
138 }
139 }
140
141 Ok(())
142}
143
144async fn handle_sample_sink_request(
145 event: fdiagnostics::SampleSinkRequest,
146 shutdown: bool,
147 projects: &mut [Project<'_>],
148) {
149 match event {
150 fdiagnostics::SampleSinkRequest::OnSampleReadied {
151 event:
152 fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
153 batch_iter: Some(batch_iter),
154 seconds_since_start: Some(seconds_since_start),
155 ..
156 }),
157 control_handle: _control_handle,
158 } => {
159 let data = drain_batch_iterator::<diagnostics_data::InspectData>(Arc::new(
160 batch_iter.into_proxy(),
161 ))
162 .filter_map(|v| async {
163 match v {
164 Ok(v) => Some(v),
165 Err(e) => {
166 warn!(e:?; "Failed to read some Inspect data; skipping");
167 None
168 }
169 }
170 })
171 .collect::<Vec<_>>()
172 .await;
173
174 let seconds_since_start = if shutdown {
175 None
176 } else {
177 Some(zx::MonotonicDuration::from_seconds(seconds_since_start))
178 };
179
180 for project in projects {
181 if let Err(e) = project.log(&data, seconds_since_start).await {
182 warn!(e:?; "Project failed to log");
183 }
184
185 }
189 }
190 fdiagnostics::SampleSinkRequest::OnSampleReadied {
191 event:
192 fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
193 batch_iter,
194 seconds_since_start,
195 ..
196 }),
197 control_handle,
198 } => {
199 control_handle.shutdown();
200 warn!(
201 batch_iter:?, seconds_since_start:?;
202 "Sample server sent Ready but crucial fields were None"
203 );
204 }
205 fdiagnostics::SampleSinkRequest::OnSampleReadied {
206 event: fdiagnostics::SampleSinkResult::Error(e),
207 ..
208 } => {
209 warn!(e:?; "Sample server sent an error, data may be missing");
210 }
211 fdiagnostics::SampleSinkRequest::OnSampleReadied {
212 event: fdiagnostics::SampleSinkResult::__SourceBreaking { .. },
213 control_handle,
214 }
215 | fdiagnostics::SampleSinkRequest::_UnknownMethod { control_handle, .. } => {
216 control_handle.shutdown();
217 warn!("Sample server sent a source-breaking or unknown event")
218 }
219 }
220}