persistence/fetcher.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::file_handler::Timestamps;
use diagnostics_data::{Data, Inspect};
use hashbrown::HashMap;
use persistence_config::{ServiceName, Tag};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::VecDeque;
use std::io;
use std::ops::{Deref, DerefMut};

/// Maximum number of errors to keep for each tag.
pub(crate) const MAX_TAG_ERRORS: usize = 5;
/// Maximum string length of an error stored on disk.
const MAX_ERROR_LEN: usize = 50;

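/// All persisted Inspect data, keyed by the service that requested persistence.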
#[derive(Default, Debug, Serialize, Deserialize)]
pub(crate) struct PersistenceData(pub HashMap<ServiceName, ServiceData>);

impl Deref for PersistenceData {
    type Target = HashMap<ServiceName, ServiceData>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for PersistenceData {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

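/// Data persisted for a single service, keyed by tag.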
#[derive(Default, Debug, Serialize, Deserialize)]
pub(crate) struct ServiceData(pub HashMap<Tag, TagData>);

impl Deref for ServiceData {
    type Target = HashMap<Tag, TagData>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for ServiceData {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

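/// Data persisted under a single tag: Inspect data keyed by moniker, recent
/// errors, timestamps of the latest sample, byte-quota bookkeeping, and the
/// selectors used to fetch the data.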
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TagData {
    pub data: HashMap<ExtendedMoniker, Data<Inspect>>,
    pub errors: VecDeque<String>,
    pub timestamps: Timestamps,
    pub total_bytes: usize,
    pub max_bytes: usize,
    #[serde(with = "selectors_ext::inspect")]
    pub selectors: Vec<fidl_fuchsia_diagnostics::Selector>,
}

impl TagData {
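    /// Merges `data` into the entry for its moniker (inserting it if absent),
    /// updates the timestamps, and recomputes the total size. Does nothing if
    /// this tag has already exceeded its byte quota.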
    pub fn merge(&mut self, timestamps: Timestamps, data: Data<Inspect>) {
        if self.total_bytes > self.max_bytes {
            // Merging is an additive operation and this tag already went over
            // its size quota. Do nothing.
            return;
        }

        self.timestamps.merge(timestamps);

        let moniker = ExtendedMoniker(data.moniker.clone());
        if let Some(existing) = self.data.get_mut(&moniker) {
            existing.merge(data);
        } else {
            self.data.insert(moniker, data);
        }

        self.calculate_total_bytes();
    }

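    /// Recomputes `total_bytes` as the serialized JSON size of the stored
    /// payloads. If serialization fails or the total exceeds `max_bytes`,
    /// the stored data is cleared and an error is recorded.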
    fn calculate_total_bytes(&mut self) {
        self.total_bytes = 0;

        for data in self.data.values() {
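            // If the hierarchy is wrapped in a "root" node, measure only its
            // first child; otherwise measure the node itself. A payload with
            // nothing to measure counts as zero bytes.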
            let data_len = data
                .payload
                .as_ref()
                .and_then(|d| if d.name == "root" { d.children.first() } else { Some(d) })
                .map(|d| {
                    let mut counter = ByteCounter::default();
                    serde_json::to_writer(&mut counter, d).map(|()| counter.count)
                })
                .unwrap_or(Ok(0));

            self.total_bytes += match data_len {
                Ok(len) => len,
                Err(e) => {
                    self.data.clear();
                    self.add_error(format!("Unexpected serialize error: {e}"));
                    return;
                }
            };
        }

        if self.total_bytes > self.max_bytes {
            self.data.clear();
            self.add_error(format!(
                "Data too big: {} > max length {}",
                self.total_bytes, self.max_bytes
            ));
        }
    }

    /// Add an error to the queue with safeguards to prevent error spam from
    /// filling up the disk.
    pub fn add_error(&mut self, mut e: String) {
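        // Note: `String::truncate` panics if the cut lands inside a multibyte
        // character; this assumes error messages are ASCII at that boundary.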
        e.truncate(MAX_ERROR_LEN);
        self.errors.push_front(e);
        self.errors.truncate(MAX_TAG_ERRORS);
    }
}

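/// Wrapper around `diagnostics_data::ExtendedMoniker` that adds the `Hash`,
/// `Serialize`, and `Deserialize` impls needed to use monikers as map keys
/// persisted in their string form.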
#[derive(Eq, Ord, PartialOrd, PartialEq, Debug, Clone, Hash)]
pub(crate) struct ExtendedMoniker(diagnostics_data::ExtendedMoniker);

impl From<diagnostics_data::ExtendedMoniker> for ExtendedMoniker {
    fn from(value: diagnostics_data::ExtendedMoniker) -> Self {
        Self(value)
    }
}

impl Deref for ExtendedMoniker {
    type Target = diagnostics_data::ExtendedMoniker;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Serialize for ExtendedMoniker {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.collect_str(&self.0)
    }
}

impl<'de> Deserialize<'de> for ExtendedMoniker {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let moniker_str = String::deserialize(deserializer)?;
        diagnostics_data::ExtendedMoniker::parse_str(&moniker_str)
            .map(Self)
            .map_err(serde::de::Error::custom)
    }
}

/// ByteCounter is a no-op writer that counts the number of bytes written to it.
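/// Serializing into it (e.g. with `serde_json::to_writer`) leaves the encoded
/// length in `count` without buffering any output.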
#[derive(Default)]
struct ByteCounter {
    count: usize,
}

impl io::Write for ByteCounter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
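        // Use checked addition so an (unlikely) overflow surfaces as an I/O
        // error instead of silently wrapping the count.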
        self.count = self
            .count
            .checked_add(buf.len())
            .ok_or_else::<io::Error, _>(|| io::Error::from(io::ErrorKind::FileTooLarge))?;
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use diagnostics_data::{InspectDataBuilder, Timestamp, hierarchy};
    use hashbrown::HashMap;
    use std::collections::VecDeque;

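    // Builds an empty TagData with the given byte quota.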
    fn make_tag_data(max_bytes: usize) -> TagData {
        TagData {
            data: HashMap::new(),
            errors: VecDeque::new(),
            timestamps: make_timestamps(0),
            total_bytes: 0,
            max_bytes,
            selectors: vec![],
        }
    }

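    // Builds Timestamps with both the boot and UTC instants set to `nanos`.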
    fn make_timestamps(nanos: i64) -> Timestamps {
        Timestamps {
            last_sample_boot: zx::BootInstant::from_nanos(nanos),
            last_sample_utc: fuchsia_runtime::UtcInstant::from_nanos(nanos),
        }
    }

    #[fuchsia::test]
    fn test_tag_data_merge_ignores_when_full() {
        let mut tag_data = make_tag_data(10); // Small limit to trigger overflow easily

        let moniker = diagnostics_data::ExtendedMoniker::parse_str("moniker").unwrap();
        tag_data.merge(
            make_timestamps(100),
            InspectDataBuilder::new(moniker.clone(), "url", Timestamp::from_nanos(100))
                .with_hierarchy(hierarchy! { root: { child: { x: 1 } } }) // Size will be > 10
                .build(),
        );

        assert!(tag_data.total_bytes > tag_data.max_bytes);
        assert_eq!(tag_data.data, HashMap::new());
        assert!(!tag_data.errors.is_empty());

        let initial_errors_count = tag_data.errors.len();

        // Second merge should be ignored
        let data_ignored =
            InspectDataBuilder::new(moniker.clone(), "url", Timestamp::from_nanos(200))
                .with_hierarchy(hierarchy! { root: { child: { x: 2 } } })
                .build();

        // Even with a newer timestamp, it should be ignored
        tag_data.merge(make_timestamps(200), data_ignored);

        // State should be unchanged; check timestamp wasn't updated
        assert_eq!(tag_data.timestamps.last_sample_boot.into_nanos(), 100);
        // Error count shouldn't change since no new errors were added
        assert_eq!(tag_data.errors.len(), initial_errors_count);
    }

    #[fuchsia::test]
    fn test_tag_data_merge_distinct_monikers() {
        let mut tag_data = make_tag_data(1000);

        let moniker1 = diagnostics_data::ExtendedMoniker::parse_str("moniker1").unwrap();
        let data1 = InspectDataBuilder::new(moniker1.clone(), "url", Timestamp::from_nanos(100))
            .with_hierarchy(hierarchy! { root: { child: { x: 1 } } })
            .build();

        let moniker2 = diagnostics_data::ExtendedMoniker::parse_str("moniker2").unwrap();
        let data2 = InspectDataBuilder::new(moniker2.clone(), "url", Timestamp::from_nanos(100))
            .with_hierarchy(hierarchy! { root: { child: { x: 2 } } })
            .build();

        tag_data.merge(make_timestamps(100), data1.clone());
        let size_1 = tag_data.total_bytes;
        assert!(size_1 > 0);
        assert_eq!(
            tag_data.data,
            HashMap::from([(ExtendedMoniker(moniker1.clone()), data1.clone())])
        );

        tag_data.merge(make_timestamps(100), data2.clone());

        assert_eq!(
            tag_data.data,
            HashMap::from([
                (ExtendedMoniker(moniker1.clone()), data1),
                (ExtendedMoniker(moniker2), data2)
            ])
        );
        assert!(tag_data.total_bytes > size_1);
    }

    #[fuchsia::test]
    fn test_tag_data_merges_data_for_same_moniker() {
        let mut tag_data = make_tag_data(1000);

        let moniker = diagnostics_data::ExtendedMoniker::parse_str("moniker").unwrap();
        let data1 = InspectDataBuilder::new(moniker.clone(), "url", Timestamp::from_nanos(100))
            .with_hierarchy(hierarchy! { root: { child: { x: 1 } } })
            .build();

        tag_data.merge(make_timestamps(100), data1);

        let data2 = InspectDataBuilder::new(moniker.clone(), "url", Timestamp::from_nanos(200))
            .with_hierarchy(hierarchy! { root: { child: { x: 2, y: 3 } } })
            .build();

        tag_data.merge(make_timestamps(200), data2);

        // Verify that data was merged (x updated to 2, y added)
        let expected_data =
            InspectDataBuilder::new(moniker.clone(), "url", Timestamp::from_nanos(200))
                .with_hierarchy(hierarchy! { root: { child: { x: 2, y: 3 } } })
                .build();

        assert_eq!(tag_data.data, HashMap::from([(ExtendedMoniker(moniker), expected_data)]));

        assert_eq!(tag_data.timestamps.last_sample_boot.into_nanos(), 200);
        assert!(tag_data.total_bytes > 0);
    }

    #[fuchsia::test]
    fn test_tag_data_calculate_total_bytes() {
        let mut tag_data = make_tag_data(1000);

        let moniker = diagnostics_data::ExtendedMoniker::parse_str("moniker").unwrap();
        let data = InspectDataBuilder::new(moniker.clone(), "url", Timestamp::from_nanos(100))
            .with_hierarchy(hierarchy! { root: { child: { x: 1 } } })
            .build();

        tag_data.data.insert(ExtendedMoniker(moniker), data);
        tag_data.calculate_total_bytes();

        let initial_bytes = tag_data.total_bytes;
        assert!(initial_bytes > 0);
        assert!(tag_data.errors.is_empty());

        // Now reduce max_bytes to force overflow
        tag_data.max_bytes = initial_bytes - 1;
        tag_data.calculate_total_bytes();

        assert_eq!(tag_data.total_bytes, initial_bytes);
        assert_eq!(tag_data.data, HashMap::new());
        let expected_errors = VecDeque::from([format!(
            "Data too big: {} > max length {}",
            initial_bytes, tag_data.max_bytes
        )]);
        assert_eq!(tag_data.errors, expected_errors);
    }

    #[fuchsia::test]
    fn test_tag_data_max_errors() {
        let mut tag_data = make_tag_data(1000);

        for i in 0..10 {
            tag_data.add_error(format!("Error {i}"));
        }

        let expected_errors = VecDeque::from([
            "Error 9".to_string(),
            "Error 8".to_string(),
            "Error 7".to_string(),
            "Error 6".to_string(),
            "Error 5".to_string(),
        ]);
        assert_eq!(tag_data.errors, expected_errors);
    }

    #[fuchsia::test]
    fn test_tag_data_error_truncation() {
        let mut tag_data = make_tag_data(1000);

        // Test truncation of a long error
        let long_error = "a".repeat(MAX_ERROR_LEN + 10);
        tag_data.add_error(long_error);
        assert_eq!(tag_data.errors[0].len(), MAX_ERROR_LEN);
    }
}