1use crate::BUILD_CONFIG;
6use crate::fetcher::{PersistenceData, ServiceData, TagData};
7use crate::file_handler::{self, Timestamps};
8use anyhow::{Context, anyhow, bail};
9use fidl::endpoints::ControlHandle;
10use futures::{StreamExt, TryStreamExt};
11use hashbrown::HashMap;
12use itertools::Itertools;
13use log::{debug, error, warn};
14
15use persistence_config::{Config, ServiceName, Tag};
16use std::collections::VecDeque;
17use std::pin::pin;
18use std::sync::Arc;
19use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync};
20
21#[derive(Clone)]
28pub(crate) struct Scheduler {
29 tag_info: Arc<Vec<QuickTagInfo>>,
32}
33
34#[derive(thiserror::Error, Debug)]
36pub(crate) enum Error {
37 #[error("invalid selector")]
38 InvalidSelector(#[from] selectors::Error),
39 #[error("fidl error: {0:?}")]
40 Fidl(#[from] fidl::Error),
41 #[error("unable to configure Archivist sampling: {0:?}")]
42 UnableToSample(#[from] anyhow::Error),
43}
44
45impl Scheduler {
46 pub(crate) fn new(config: &Config) -> Self {
47 let tag_info = config
48 .clone()
49 .into_iter()
50 .flat_map(|(service, tags)| {
51 tags.into_iter().map(move |(tag, config)| QuickTagInfo {
52 service: service.clone(),
53 tag,
54 max_bytes: config.max_bytes,
55 selectors: config.selectors,
56 })
57 })
58 .collect::<Vec<_>>();
59
60 Self { tag_info: Arc::new(tag_info) }
61 }
62
63 pub(crate) async fn subscribe(
64 &self,
65 scope: fasync::ScopeHandle,
66 config: &Config,
67 ) -> Result<(), Error> {
68 let sample_datums = config
69 .values()
70 .flat_map(|tags| {
71 tags.values().flat_map(|tag| {
72 tag.selectors
73 .clone()
74 .into_iter()
75 .map(|selector| (selector, tag.min_seconds_between_fetch))
76 })
77 })
78 .map(|(selector, min_seconds_between_fetch)| fdiagnostics::SampleDatum {
80 selector: Some(fdiagnostics::SelectorArgument::StructuredSelector(selector)),
81 strategy: Some(fdiagnostics::SampleStrategy::OnDiff),
82 interval_secs: Some(min_seconds_between_fetch),
83 ..Default::default()
84 })
85 .collect::<Vec<_>>();
86
87 if sample_datums.is_empty() {
88 warn!("No tags configured; skipping subscription to fuchsia.diagnostics.Sample");
89 return Ok(());
90 }
91
92 let sample =
93 fuchsia_component::client::connect_to_protocol::<fdiagnostics::SampleMarker>()?;
94
95 for chunk in sample_datums.chunks(fdiagnostics::MAX_SAMPLE_PARAMETERS_PER_SET as usize) {
96 sample.set(&fdiagnostics::SampleParameters {
97 data: Some(chunk.to_vec()),
98 ..Default::default()
99 })?;
100 }
101
102 let (client_end, sample_sink_stream) =
103 fidl::endpoints::create_request_stream::<fdiagnostics::SampleSinkMarker>();
104
105 let scheduler = self.clone();
108 scope.spawn(async move {
109 if let Err(e) = scheduler.handle_sample_sink(sample_sink_stream).await {
110 error!("Error serving SampleSink: {e:?}");
111 }
112 });
113
114 sample
115 .commit(client_end)
116 .await?
117 .map_err(|e| anyhow!("failed to commit fuchsia.diagnostics.Sample config: {e:?}"))?;
118
119 Ok(())
120 }
121
122 pub(crate) async fn handle_sample_sink(
123 &self,
124 stream: fdiagnostics::SampleSinkRequestStream,
125 ) -> Result<(), anyhow::Error> {
126 let (stream, stalled) = detect_stall::until_stalled(stream, BUILD_CONFIG.stall_interval);
127 let mut stream = pin!(stream);
128 loop {
129 match stream.try_next().await {
130 Ok(Some(fdiagnostics::SampleSinkRequest::OnSampleReadied {
131 event,
132 control_handle,
133 })) => match event {
134 fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
135 batch_iter,
136 seconds_since_start: _,
137 __source_breaking: _,
138 }) => {
139 if let Some(iter) = batch_iter {
140 if let Err(e) = self.handle_sample_ready(iter).await {
141 warn!("Failed to process Sample: {e:?}");
142 }
143 } else {
144 bail!("expected BatchIterator, got None");
145 }
146 }
147 fdiagnostics::SampleSinkResult::Error(err) => {
148 control_handle.shutdown();
149 bail!("Failed receiving samples: {err:?}");
150 }
151 fdiagnostics::SampleSinkResult::__SourceBreaking { unknown_ordinal } => {
152 control_handle.shutdown();
153 bail!("unknown ordinal {unknown_ordinal}");
154 }
155 },
156 Ok(Some(req)) => {
157 warn!("Unknown SampleSinkRequest {req:?}");
158 }
159 Ok(None) => break,
160 Err(e) => bail!("Unexpected error handling SampleSinkRequest: {e}"),
161 }
162 }
163 if let Some(server_end) = stalled.await.context("FIDL error")? {
164 debug!("Escrowing fuchsia.diagnostics.SampleSink");
166 fuchsia_component::client::connect_channel_to_protocol_at_path(
167 server_end,
168 "/escrow/fuchsia.diagnostics.SampleSink",
169 )
170 .context("Failed to connect to fuchsia.diagnostics.SampleSink")?;
171 }
172 Ok(())
173 }
174
175 async fn handle_sample_ready(
176 &self,
177 iter: fidl::endpoints::ClientEnd<fdiagnostics::BatchIteratorMarker>,
178 ) -> Result<(), anyhow::Error> {
179 let timestamps = file_handler::Timestamps {
180 last_sample_boot: zx::BootInstant::get(),
181 last_sample_utc: fuchsia_runtime::utc_time(),
182 };
183
184 let proxy = Arc::new(iter.into_proxy());
185 let (snapshot, errs): (Vec<_>, Vec<_>) =
186 diagnostics_reader::drain_batch_iterator::<diagnostics_data::InspectData>(proxy)
187 .collect::<Vec<_>>()
188 .await
189 .into_iter()
190 .partition_result();
191 if !errs.is_empty() {
192 if snapshot.is_empty() {
193 bail!("failed reading all Inspect data: {errs:?}")
194 } else {
195 warn!("failed reading some Inspect data: {errs:?}")
196 }
197 }
198
199 let mut current_data = file_handler::current_data().await?.unwrap_or_default();
200
201 for tag_info in self.tag_info.iter() {
202 for data in snapshot.clone() {
203 match data.filter(&tag_info.selectors) {
204 Ok(Some(data)) => {
205 modify_tag_data(&mut current_data, tag_info, ×tamps, |tag_data| {
206 tag_data.merge(timestamps.clone(), data)
207 })
208 }
209 Ok(None) => {}
210 Err(e) => {
211 modify_tag_data(&mut current_data, tag_info, ×tamps, |tag_data| {
212 tag_data.add_error(e.to_string())
213 })
214 }
215 }
216 }
217 }
218
219 file_handler::write_current_data(¤t_data)
220 .context("Failed to write current data to disk")
221 }
222}
223
224fn modify_tag_data<'a>(
225 data: &'a mut PersistenceData,
226 lookup: &'a QuickTagInfo,
227 timestamps: &Timestamps,
228 modify_fn: impl FnOnce(&mut TagData),
229) {
230 let QuickTagInfo { service, tag, max_bytes, selectors } = lookup;
231
232 let service_data = data.entry_ref(service).or_insert_with(ServiceData::default);
233 let tag_data = service_data.entry_ref(tag).or_insert_with(|| TagData {
234 max_bytes: *max_bytes,
235 total_bytes: 0,
236 timestamps: timestamps.clone(),
237 selectors: selectors.clone(),
238 data: HashMap::new(),
239 errors: VecDeque::new(),
240 });
241
242 modify_fn(tag_data)
243}
244
245struct QuickTagInfo {
247 service: ServiceName,
248 tag: Tag,
249 max_bytes: usize,
250 selectors: Vec<fidl_fuchsia_diagnostics::Selector>,
251}