persistence/
scheduler.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::BUILD_CONFIG;
6use crate::fetcher::{PersistenceData, ServiceData, TagData};
7use crate::file_handler::{self, Timestamps};
8use anyhow::{Context, anyhow, bail};
9use fidl::endpoints::ControlHandle;
10use futures::{StreamExt, TryStreamExt};
11use hashbrown::HashMap;
12use itertools::Itertools;
13use log::{debug, error, warn};
14
15use persistence_config::{Config, ServiceName, Tag};
16use std::collections::VecDeque;
17use std::pin::pin;
18use std::sync::Arc;
19use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync};
20
21// This contains the logic to configure the Archivist to sample diagnostics data based on the
22// persistence configuration. It handles the `fuchsia.diagnostics.SampleSink` protocol to receive
23// signals when data is ready, reads the data, and persists it.
24
25/// Tracks when each tag was persisted last, as necessary for implementing
26/// debounce on [`TagConfig`]'s `min_seconds_between_fetch`.
27#[derive(Clone)]
28pub(crate) struct Scheduler {
29    /// Flat lookup table for corresponding OnSampleReady responses with
30    /// Persistence configured services/tags.
31    tag_info: Arc<Vec<QuickTagInfo>>,
32}
33
34/// Scheduler error.
35#[derive(thiserror::Error, Debug)]
36pub(crate) enum Error {
37    #[error("invalid selector")]
38    InvalidSelector(#[from] selectors::Error),
39    #[error("fidl error: {0:?}")]
40    Fidl(#[from] fidl::Error),
41    #[error("unable to configure Archivist sampling: {0:?}")]
42    UnableToSample(#[from] anyhow::Error),
43}
44
45impl Scheduler {
46    pub(crate) fn new(config: &Config) -> Self {
47        let tag_info = config
48            .clone()
49            .into_iter()
50            .flat_map(|(service, tags)| {
51                tags.into_iter().map(move |(tag, config)| QuickTagInfo {
52                    service: service.clone(),
53                    tag,
54                    max_bytes: config.max_bytes,
55                    selectors: config.selectors,
56                })
57            })
58            .collect::<Vec<_>>();
59
60        Self { tag_info: Arc::new(tag_info) }
61    }
62
63    pub(crate) async fn subscribe(
64        &self,
65        scope: fasync::ScopeHandle,
66        config: &Config,
67    ) -> Result<(), Error> {
68        let sample_datums = config
69            .values()
70            .flat_map(|tags| {
71                tags.values().flat_map(|tag| {
72                    tag.selectors
73                        .clone()
74                        .into_iter()
75                        .map(|selector| (selector, tag.min_seconds_between_fetch))
76                })
77            })
78            // Convert to SampleDatums.
79            .map(|(selector, min_seconds_between_fetch)| fdiagnostics::SampleDatum {
80                selector: Some(fdiagnostics::SelectorArgument::StructuredSelector(selector)),
81                strategy: Some(fdiagnostics::SampleStrategy::OnDiff),
82                interval_secs: Some(min_seconds_between_fetch),
83                ..Default::default()
84            })
85            .collect::<Vec<_>>();
86
87        if sample_datums.is_empty() {
88            warn!("No tags configured; skipping subscription to fuchsia.diagnostics.Sample");
89            return Ok(());
90        }
91
92        let sample =
93            fuchsia_component::client::connect_to_protocol::<fdiagnostics::SampleMarker>()?;
94
95        for chunk in sample_datums.chunks(fdiagnostics::MAX_SAMPLE_PARAMETERS_PER_SET as usize) {
96            sample.set(&fdiagnostics::SampleParameters {
97                data: Some(chunk.to_vec()),
98                ..Default::default()
99            })?;
100        }
101
102        let (client_end, sample_sink_stream) =
103            fidl::endpoints::create_request_stream::<fdiagnostics::SampleSinkMarker>();
104
105        // Start the SampleSink server before committing Sample configuration.
106        // Dropping the JoinHandle will detach it.
107        let scheduler = self.clone();
108        scope.spawn(async move {
109            if let Err(e) = scheduler.handle_sample_sink(sample_sink_stream).await {
110                error!("Error serving SampleSink: {e:?}");
111            }
112        });
113
114        sample
115            .commit(client_end)
116            .await?
117            .map_err(|e| anyhow!("failed to commit fuchsia.diagnostics.Sample config: {e:?}"))?;
118
119        Ok(())
120    }
121
122    pub(crate) async fn handle_sample_sink(
123        &self,
124        stream: fdiagnostics::SampleSinkRequestStream,
125    ) -> Result<(), anyhow::Error> {
126        let (stream, stalled) = detect_stall::until_stalled(stream, BUILD_CONFIG.stall_interval);
127        let mut stream = pin!(stream);
128        loop {
129            match stream.try_next().await {
130                Ok(Some(fdiagnostics::SampleSinkRequest::OnSampleReadied {
131                    event,
132                    control_handle,
133                })) => match event {
134                    fdiagnostics::SampleSinkResult::Ready(fdiagnostics::SampleReady {
135                        batch_iter,
136                        seconds_since_start: _,
137                        __source_breaking: _,
138                    }) => {
139                        if let Some(iter) = batch_iter {
140                            if let Err(e) = self.handle_sample_ready(iter).await {
141                                warn!("Failed to process Sample: {e:?}");
142                            }
143                        } else {
144                            bail!("expected BatchIterator, got None");
145                        }
146                    }
147                    fdiagnostics::SampleSinkResult::Error(err) => {
148                        control_handle.shutdown();
149                        bail!("Failed receiving samples: {err:?}");
150                    }
151                    fdiagnostics::SampleSinkResult::__SourceBreaking { unknown_ordinal } => {
152                        control_handle.shutdown();
153                        bail!("unknown ordinal {unknown_ordinal}");
154                    }
155                },
156                Ok(Some(req)) => {
157                    warn!("Unknown SampleSinkRequest {req:?}");
158                }
159                Ok(None) => break,
160                Err(e) => bail!("Unexpected error handling SampleSinkRequest: {e}"),
161            }
162        }
163        if let Some(server_end) = stalled.await.context("FIDL error")? {
164            // Send the server endpoint back to the framework.
165            debug!("Escrowing fuchsia.diagnostics.SampleSink");
166            fuchsia_component::client::connect_channel_to_protocol_at_path(
167                server_end,
168                "/escrow/fuchsia.diagnostics.SampleSink",
169            )
170            .context("Failed to connect to fuchsia.diagnostics.SampleSink")?;
171        }
172        Ok(())
173    }
174
175    async fn handle_sample_ready(
176        &self,
177        iter: fidl::endpoints::ClientEnd<fdiagnostics::BatchIteratorMarker>,
178    ) -> Result<(), anyhow::Error> {
179        let timestamps = file_handler::Timestamps {
180            last_sample_boot: zx::BootInstant::get(),
181            last_sample_utc: fuchsia_runtime::utc_time(),
182        };
183
184        let proxy = Arc::new(iter.into_proxy());
185        let (snapshot, errs): (Vec<_>, Vec<_>) =
186            diagnostics_reader::drain_batch_iterator::<diagnostics_data::InspectData>(proxy)
187                .collect::<Vec<_>>()
188                .await
189                .into_iter()
190                .partition_result();
191        if !errs.is_empty() {
192            if snapshot.is_empty() {
193                bail!("failed reading all Inspect data: {errs:?}")
194            } else {
195                warn!("failed reading some Inspect data: {errs:?}")
196            }
197        }
198
199        let mut current_data = file_handler::current_data().await?.unwrap_or_default();
200
201        for tag_info in self.tag_info.iter() {
202            for data in snapshot.clone() {
203                match data.filter(&tag_info.selectors) {
204                    Ok(Some(data)) => {
205                        modify_tag_data(&mut current_data, tag_info, &timestamps, |tag_data| {
206                            tag_data.merge(timestamps.clone(), data)
207                        })
208                    }
209                    Ok(None) => {}
210                    Err(e) => {
211                        modify_tag_data(&mut current_data, tag_info, &timestamps, |tag_data| {
212                            tag_data.add_error(e.to_string())
213                        })
214                    }
215                }
216            }
217        }
218
219        file_handler::write_current_data(&current_data)
220            .context("Failed to write current data to disk")
221    }
222}
223
224fn modify_tag_data<'a>(
225    data: &'a mut PersistenceData,
226    lookup: &'a QuickTagInfo,
227    timestamps: &Timestamps,
228    modify_fn: impl FnOnce(&mut TagData),
229) {
230    let QuickTagInfo { service, tag, max_bytes, selectors } = lookup;
231
232    let service_data = data.entry_ref(service).or_insert_with(ServiceData::default);
233    let tag_data = service_data.entry_ref(tag).or_insert_with(|| TagData {
234        max_bytes: *max_bytes,
235        total_bytes: 0,
236        timestamps: timestamps.clone(),
237        selectors: selectors.clone(),
238        data: HashMap::new(),
239        errors: VecDeque::new(),
240    });
241
242    modify_fn(tag_data)
243}
244
245/// Minimal set of information to perform quick lookups of tags.
246struct QuickTagInfo {
247    service: ServiceName,
248    tag: Tag,
249    max_bytes: usize,
250    selectors: Vec<fidl_fuchsia_diagnostics::Selector>,
251}