1use crate::identity::ComponentIdentity;
6use diagnostics_data::Severity;
7use flyweights::FlyStr;
8use fuchsia_inspect::{
9 ArrayProperty, Inspector, IntArrayProperty, LazyNode, Node, UintArrayProperty,
10};
11use fuchsia_sync::Mutex;
12use futures::FutureExt;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
15
16#[derive(Debug, Default)]
17struct LogCounter {
18 number: AtomicU64,
19 bytes: AtomicU64,
20}
21
22impl LogCounter {
23 fn count(&self, bytes: usize) {
24 self.number.fetch_add(1, Ordering::Relaxed);
25 self.bytes.fetch_add(bytes as u64, Ordering::Relaxed);
26 }
27
28 fn increment_bytes(&self, bytes: usize) {
29 self.number.fetch_add(1, Ordering::Relaxed);
30 self.bytes.fetch_add(bytes as u64, Ordering::Relaxed);
31 }
32}
33
34#[derive(Debug, Default)]
35struct InnerStats {
36 sockets_opened: AtomicU64,
37 sockets_closed: AtomicU64,
38 last_timestamp: AtomicI64,
39 total: LogCounter,
40 rolled_out: LogCounter,
41 fatal: LogCounter,
42 error: LogCounter,
43 warn: LogCounter,
44 info: LogCounter,
45 debug: LogCounter,
46 trace: LogCounter,
47 url: FlyStr,
48 invalid: LogCounter,
49}
50
51#[derive(Debug)]
52pub struct LogStreamStats {
53 inner: Arc<InnerStats>,
54 _lazy_node: LazyNode,
55}
56
57impl LogStreamStats {
58 pub fn new(parent: &Node, identity: &ComponentIdentity) -> Self {
59 let inner = Arc::new(InnerStats { url: identity.url.clone(), ..Default::default() });
60 let inner_clone = Arc::clone(&inner);
61 let lazy_node = parent.create_lazy_child(identity.moniker.to_string(), move || {
62 let inner = Arc::clone(&inner_clone);
63 async move {
64 let inspector = Inspector::default();
65 let root = inspector.root();
66 root.record_uint("sockets_opened", inner.sockets_opened.load(Ordering::Relaxed));
67 root.record_uint("sockets_closed", inner.sockets_closed.load(Ordering::Relaxed));
68 root.record_int("last_timestamp", inner.last_timestamp.load(Ordering::Relaxed));
69 root.record_string("url", inner.url.as_str());
70
71 let record_counter = |name: &str, counter: &LogCounter| {
72 let child = root.create_child(name);
73 child.record_uint("number", counter.number.load(Ordering::Relaxed));
74 child.record_uint("bytes", counter.bytes.load(Ordering::Relaxed));
75 root.record(child);
76 };
77
78 record_counter("total", &inner.total);
79 record_counter("rolled_out", &inner.rolled_out);
80 record_counter("fatal", &inner.fatal);
81 record_counter("error", &inner.error);
82 record_counter("warn", &inner.warn);
83 record_counter("info", &inner.info);
84 record_counter("debug", &inner.debug);
85 record_counter("trace", &inner.trace);
86 record_counter("invalid", &inner.invalid);
87
88 Ok(inspector)
89 }
90 .boxed()
91 });
92 Self { inner, _lazy_node: lazy_node }
93 }
94
95 pub fn open_socket(&self) {
96 self.inner.sockets_opened.fetch_add(1, Ordering::Relaxed);
97 }
98
99 pub fn close_socket(&self) {
100 self.inner.sockets_closed.fetch_add(1, Ordering::Relaxed);
101 }
102
103 pub fn increment_rolled_out(&self, msg_len: usize) {
104 self.inner.rolled_out.increment_bytes(msg_len);
105 }
106
107 pub fn increment_invalid(&self, bytes: usize) {
108 self.inner.invalid.increment_bytes(bytes);
109 }
110
111 pub fn ingest_message(&self, bytes: usize, severity: Severity) {
112 self.inner.total.count(bytes);
113 match severity {
114 Severity::Trace => self.inner.trace.count(bytes),
115 Severity::Debug => self.inner.debug.count(bytes),
116 Severity::Info => self.inner.info.count(bytes),
117 Severity::Warn => self.inner.warn.count(bytes),
118 Severity::Error => self.inner.error.count(bytes),
119 Severity::Fatal => self.inner.fatal.count(bytes),
120 }
121 }
122}
123
124pub struct GlobalAnalytics {
125 _node: Node,
126 logs_node: Node,
127}
128
129impl GlobalAnalytics {
130 pub fn new(parent: &Node) -> Self {
131 let node = parent.create_child("global_analytics");
132 let logs_node = node.create_child("logs");
133 Self { _node: node, logs_node }
134 }
135
136 pub fn logs_node(&self) -> &Node {
137 &self.logs_node
138 }
139}
140
141pub struct SaturationCurve {
142 boot_times: IntArrayProperty,
143 message_counts: UintArrayProperty,
144 cursor: Mutex<usize>,
145 size: usize,
146 _node: Node,
147}
148
149impl SaturationCurve {
150 pub fn new(parent: &Node, size: usize) -> Self {
151 let node = parent.create_child("saturation_curve");
152 let boot_times = node.create_int_array("boot_times", size);
153 let message_counts = node.create_uint_array("message_counts", size);
154 Self { boot_times, message_counts, cursor: Mutex::new(0), size, _node: node }
155 }
156
157 pub fn record(&self, boot_time: i64, message_count: u64) {
158 let mut cursor = self.cursor.lock();
159 self.boot_times.set(*cursor, boot_time);
160 self.message_counts.set(*cursor, message_count);
161 *cursor = (*cursor + 1) % self.size;
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168 use diagnostics_assertions::assert_data_tree;
169 use fuchsia_inspect::Inspector;
170
171 #[fuchsia::test]
172 async fn saturation_curve_circular() {
173 let inspector = Inspector::default();
174 let curve = SaturationCurve::new(inspector.root(), 3);
175
176 curve.record(1, 10);
177 curve.record(2, 20);
178 curve.record(3, 30);
179
180 assert_data_tree!(inspector,
181 root: {
182 saturation_curve: {
183 boot_times: vec![1i64, 2, 3],
184 message_counts: vec![10u64, 20, 30],
185 }
186 });
187
188 curve.record(4, 40);
190
191 assert_data_tree!(inspector,
192 root: {
193 saturation_curve: {
194 boot_times: vec![4i64, 2, 3],
195 message_counts: vec![40u64, 20, 30],
196 }
197 });
198 }
199}