1pub mod processors;
6
7use crate::telemetry::processors::network_properties::NetworkPropertiesProcessor;
8use anyhow::Error;
9use fidl_fuchsia_net_policy_socketproxy as fnp_socketproxy;
10use fuchsia_inspect::Inspector;
11use fuchsia_sync::Mutex;
12use futures::channel::mpsc;
13use futures::{Future, StreamExt};
14use log::{info, warn};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17
18#[derive(Clone, Debug)]
19pub struct NetworkEventMetadata {
20 pub id: u64,
21 pub name: Option<String>,
22 pub transport: fnp_socketproxy::NetworkType,
23 pub is_fuchsia_provisioned: bool,
24 pub connectivity_state: Option<fnp_socketproxy::ConnectivityState>,
25}
26
27#[derive(Debug)]
28pub enum TelemetryEvent {
29 DefaultNetworkChanged(NetworkEventMetadata),
30 DefaultNetworkLost,
31 NetworkChanged(NetworkEventMetadata),
32}
33
34#[derive(Clone, Debug)]
35pub struct TelemetrySender {
36 sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
37 sender_is_blocked: Arc<AtomicBool>,
38}
39
40impl TelemetrySender {
41 pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
42 Self {
43 sender: Arc::new(Mutex::new(sender)),
44 sender_is_blocked: Arc::new(AtomicBool::new(false)),
45 }
46 }
47
48 pub fn send(&self, event: TelemetryEvent) {
49 match self.sender.lock().try_send(event) {
50 Ok(_) => {
51 if self
52 .sender_is_blocked
53 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
54 .is_ok()
55 {
56 info!("TelemetrySender recovered and resumed sending");
57 }
58 }
59 Err(_) => {
60 if self
61 .sender_is_blocked
62 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
63 .is_ok()
64 {
65 warn!(
66 "TelemetrySender dropped a msg: either buffer is full or no receiver is waiting"
67 );
68 }
69 }
70 }
71 }
72}
73
/// Buffer size handed to `mpsc::channel` when `serve_telemetry` creates the
/// telemetry event channel.
const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;
75
76pub fn serve_telemetry(
77 inspector: &Inspector,
78) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
79 let inspect_node = inspector.root();
80 let telemetry_node = inspect_node.create_child("telemetry");
81 let time_series_node = telemetry_node.create_child("time_series");
82 let client =
83 windowed_stats::experimental::inspect::TimeMatrixClient::new(time_series_node.clone_weak());
84
85 let processor = NetworkPropertiesProcessor::new(&telemetry_node, "root/telemetry", &client);
86 inspect_node.record(time_series_node);
87 inspect_node.record(telemetry_node);
88
89 let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
90 let sender = TelemetrySender::new(sender);
91
92 let fut = async move {
93 let mut processor = processor;
94 while let Some(event) = receiver.next().await {
95 match event {
96 TelemetryEvent::DefaultNetworkChanged(metadata) => {
97 processor.log_default_network_changed(metadata);
98 }
99 TelemetryEvent::DefaultNetworkLost => {
100 processor.log_default_network_lost();
101 }
102 TelemetryEvent::NetworkChanged(metadata) => {
103 processor.log_network_changed(metadata, &client);
104 }
105 }
106 }
107 Ok(())
108 };
109 (sender, fut)
110}