Skip to main content

archivist_lib/logs/
stats.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::identity::ComponentIdentity;
6use diagnostics_data::Severity;
7use flyweights::FlyStr;
8use fuchsia_inspect::{
9    ArrayProperty, Inspector, IntArrayProperty, LazyNode, Node, UintArrayProperty,
10};
11use fuchsia_sync::Mutex;
12use futures::FutureExt;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
15
/// A pair of running totals: how many entries were recorded and their combined size in bytes.
///
/// Both totals are atomics, so they can be bumped through a shared reference.
#[derive(Debug, Default)]
struct LogCounter {
    /// Number of entries recorded so far.
    number: AtomicU64,
    /// Sum of the byte sizes supplied with those entries.
    bytes: AtomicU64,
}

impl LogCounter {
    /// Records one entry that is `bytes` bytes long.
    fn count(&self, bytes: usize) {
        let size = bytes as u64;
        self.number.fetch_add(1, Ordering::Relaxed);
        self.bytes.fetch_add(size, Ordering::Relaxed);
    }

    /// Records one entry that is `bytes` bytes long.
    ///
    /// Despite its name this also bumps the entry count: it does exactly what
    /// [`LogCounter::count`] does.
    fn increment_bytes(&self, bytes: usize) {
        self.count(bytes);
    }
}
33
/// Per-component log statistics shared between a `LogStreamStats` handle and the lazy
/// Inspect callback that reports them.
///
/// Every numeric field is an atomic (directly or inside a `LogCounter`), so values can be
/// updated through the shared `Arc` without a lock.
#[derive(Debug, Default)]
struct InnerStats {
    /// Incremented by `LogStreamStats::open_socket`.
    sockets_opened: AtomicU64,
    /// Incremented by `LogStreamStats::close_socket`.
    sockets_closed: AtomicU64,
    /// Reported as `last_timestamp` by the lazy node.
    /// NOTE(review): nothing visible in this file stores to this field, so it would keep
    /// its default of 0 — confirm whether an update is missing or lives elsewhere.
    last_timestamp: AtomicI64,
    /// Every message passed to `LogStreamStats::ingest_message`, regardless of severity.
    total: LogCounter,
    /// Updated by `LogStreamStats::increment_rolled_out`.
    rolled_out: LogCounter,
    /// Messages ingested with `Severity::Fatal`.
    fatal: LogCounter,
    /// Messages ingested with `Severity::Error`.
    error: LogCounter,
    /// Messages ingested with `Severity::Warn`.
    warn: LogCounter,
    /// Messages ingested with `Severity::Info`.
    info: LogCounter,
    /// Messages ingested with `Severity::Debug`.
    debug: LogCounter,
    /// Messages ingested with `Severity::Trace`.
    trace: LogCounter,
    /// Component URL, copied from the `ComponentIdentity` in `LogStreamStats::new`.
    url: FlyStr,
    /// Updated by `LogStreamStats::increment_invalid`.
    invalid: LogCounter,
}
50
/// Log ingestion statistics for a single component, published through a lazy Inspect node.
#[derive(Debug)]
pub struct LogStreamStats {
    /// Counters shared with the callback of the lazy node.
    inner: Arc<InnerStats>,
    /// Never read after construction; presumably held only so the lazy node stays in the
    /// Inspect tree for as long as this value lives — confirm against `fuchsia_inspect`
    /// drop semantics.
    _lazy_node: LazyNode,
}
56
57impl LogStreamStats {
58    pub fn new(parent: &Node, identity: &ComponentIdentity) -> Self {
59        let inner = Arc::new(InnerStats { url: identity.url.clone(), ..Default::default() });
60        let inner_clone = Arc::clone(&inner);
61        let lazy_node = parent.create_lazy_child(identity.moniker.to_string(), move || {
62            let inner = Arc::clone(&inner_clone);
63            async move {
64                let inspector = Inspector::default();
65                let root = inspector.root();
66                root.record_uint("sockets_opened", inner.sockets_opened.load(Ordering::Relaxed));
67                root.record_uint("sockets_closed", inner.sockets_closed.load(Ordering::Relaxed));
68                root.record_int("last_timestamp", inner.last_timestamp.load(Ordering::Relaxed));
69                root.record_string("url", inner.url.as_str());
70
71                let record_counter = |name: &str, counter: &LogCounter| {
72                    let child = root.create_child(name);
73                    child.record_uint("number", counter.number.load(Ordering::Relaxed));
74                    child.record_uint("bytes", counter.bytes.load(Ordering::Relaxed));
75                    root.record(child);
76                };
77
78                record_counter("total", &inner.total);
79                record_counter("rolled_out", &inner.rolled_out);
80                record_counter("fatal", &inner.fatal);
81                record_counter("error", &inner.error);
82                record_counter("warn", &inner.warn);
83                record_counter("info", &inner.info);
84                record_counter("debug", &inner.debug);
85                record_counter("trace", &inner.trace);
86                record_counter("invalid", &inner.invalid);
87
88                Ok(inspector)
89            }
90            .boxed()
91        });
92        Self { inner, _lazy_node: lazy_node }
93    }
94
95    pub fn open_socket(&self) {
96        self.inner.sockets_opened.fetch_add(1, Ordering::Relaxed);
97    }
98
99    pub fn close_socket(&self) {
100        self.inner.sockets_closed.fetch_add(1, Ordering::Relaxed);
101    }
102
103    pub fn increment_rolled_out(&self, msg_len: usize) {
104        self.inner.rolled_out.increment_bytes(msg_len);
105    }
106
107    pub fn increment_invalid(&self, bytes: usize) {
108        self.inner.invalid.increment_bytes(bytes);
109    }
110
111    pub fn ingest_message(&self, bytes: usize, severity: Severity) {
112        self.inner.total.count(bytes);
113        match severity {
114            Severity::Trace => self.inner.trace.count(bytes),
115            Severity::Debug => self.inner.debug.count(bytes),
116            Severity::Info => self.inner.info.count(bytes),
117            Severity::Warn => self.inner.warn.count(bytes),
118            Severity::Error => self.inner.error.count(bytes),
119            Severity::Fatal => self.inner.fatal.count(bytes),
120        }
121    }
122}
123
/// Owns the `global_analytics` Inspect node and its `logs` child.
pub struct GlobalAnalytics {
    /// The `global_analytics` node itself. Never read after construction; presumably held
    /// only so the node is not removed from the tree — confirm against `fuchsia_inspect`
    /// drop semantics.
    _node: Node,
    /// The `logs` child of `global_analytics`, exposed through `logs_node()`.
    logs_node: Node,
}
128
129impl GlobalAnalytics {
130    pub fn new(parent: &Node) -> Self {
131        let node = parent.create_child("global_analytics");
132        let logs_node = node.create_child("logs");
133        Self { _node: node, logs_node }
134    }
135
136    pub fn logs_node(&self) -> &Node {
137        &self.logs_node
138    }
139}
140
/// A fixed-size ring of `(boot_time, message_count)` samples stored in two parallel Inspect
/// arrays under a `saturation_curve` node.
pub struct SaturationCurve {
    /// Inspect int array `boot_times`; slot `i` pairs with slot `i` of `message_counts`.
    boot_times: IntArrayProperty,
    /// Inspect uint array `message_counts`.
    message_counts: UintArrayProperty,
    /// Index of the slot the next `record` call writes; guarded so both arrays are
    /// written at the same index.
    cursor: Mutex<usize>,
    /// Number of slots requested for each array; the cursor wraps modulo this value.
    size: usize,
    /// The `saturation_curve` node. Never read after construction; presumably held only
    /// so the node and its arrays stay in the tree — confirm against `fuchsia_inspect`
    /// drop semantics.
    _node: Node,
}
148
149impl SaturationCurve {
150    pub fn new(parent: &Node, size: usize) -> Self {
151        let node = parent.create_child("saturation_curve");
152        let boot_times = node.create_int_array("boot_times", size);
153        let message_counts = node.create_uint_array("message_counts", size);
154        Self { boot_times, message_counts, cursor: Mutex::new(0), size, _node: node }
155    }
156
157    pub fn record(&self, boot_time: i64, message_count: u64) {
158        let mut cursor = self.cursor.lock();
159        self.boot_times.set(*cursor, boot_time);
160        self.message_counts.set(*cursor, message_count);
161        *cursor = (*cursor + 1) % self.size;
162    }
163}
164
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_assertions::assert_data_tree;
    use fuchsia_inspect::Inspector;

    /// Fills every slot once, then checks that the next sample overwrites slot 0.
    #[fuchsia::test]
    async fn saturation_curve_circular() {
        let inspector = Inspector::default();
        let curve = SaturationCurve::new(inspector.root(), 3);

        // Exactly as many samples as there are slots: each lands in its own slot.
        for (boot_time, message_count) in [(1, 10), (2, 20), (3, 30)] {
            curve.record(boot_time, message_count);
        }

        assert_data_tree!(inspector,
        root: {
            saturation_curve: {
                boot_times: vec![1i64, 2, 3],
                message_counts: vec![10u64, 20, 30],
            }
        });

        // A fourth sample wraps around and replaces the oldest one.
        curve.record(4, 40);

        assert_data_tree!(inspector,
        root: {
            saturation_curve: {
                boot_times: vec![4i64, 2, 3],
                message_counts: vec![40u64, 20, 30],
            }
        });
    }
}