Skip to main content

netcfg/telemetry/
mod.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod processors;
6
7use crate::telemetry::processors::network_properties::NetworkPropertiesProcessor;
8use anyhow::Error;
9use fidl_fuchsia_net_policy_socketproxy as fnp_socketproxy;
10use fuchsia_inspect::Inspector;
11use fuchsia_sync::Mutex;
12use futures::channel::mpsc;
13use futures::{Future, StreamExt};
14use log::{info, warn};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17
18#[derive(Clone, Debug)]
19pub struct NetworkEventMetadata {
20    pub id: u64,
21    pub name: Option<String>,
22    pub transport: fnp_socketproxy::NetworkType,
23    pub is_fuchsia_provisioned: bool,
24    pub connectivity_state: Option<fnp_socketproxy::ConnectivityState>,
25}
26
27#[derive(Debug)]
28pub enum TelemetryEvent {
29    DefaultNetworkChanged(NetworkEventMetadata),
30    DefaultNetworkLost,
31    NetworkChanged(NetworkEventMetadata),
32}
33
34#[derive(Clone, Debug)]
35pub struct TelemetrySender {
36    sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
37    sender_is_blocked: Arc<AtomicBool>,
38}
39
40impl TelemetrySender {
41    pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
42        Self {
43            sender: Arc::new(Mutex::new(sender)),
44            sender_is_blocked: Arc::new(AtomicBool::new(false)),
45        }
46    }
47
48    pub fn send(&self, event: TelemetryEvent) {
49        match self.sender.lock().try_send(event) {
50            Ok(_) => {
51                if self
52                    .sender_is_blocked
53                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
54                    .is_ok()
55                {
56                    info!("TelemetrySender recovered and resumed sending");
57                }
58            }
59            Err(_) => {
60                if self
61                    .sender_is_blocked
62                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
63                    .is_ok()
64                {
65                    warn!(
66                        "TelemetrySender dropped a msg: either buffer is full or no receiver is waiting"
67                    );
68                }
69            }
70        }
71    }
72}
73
74const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;
75
76pub fn serve_telemetry(
77    inspector: &Inspector,
78) -> (TelemetrySender, impl Future<Output = Result<(), Error>>) {
79    let inspect_node = inspector.root();
80    let telemetry_node = inspect_node.create_child("telemetry");
81    let time_series_node = telemetry_node.create_child("time_series");
82    let client =
83        windowed_stats::experimental::inspect::TimeMatrixClient::new(time_series_node.clone_weak());
84
85    let processor = NetworkPropertiesProcessor::new(&telemetry_node, "root/telemetry", &client);
86    inspect_node.record(time_series_node);
87    inspect_node.record(telemetry_node);
88
89    let (sender, mut receiver) = mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);
90    let sender = TelemetrySender::new(sender);
91
92    let fut = async move {
93        let mut processor = processor;
94        while let Some(event) = receiver.next().await {
95            match event {
96                TelemetryEvent::DefaultNetworkChanged(metadata) => {
97                    processor.log_default_network_changed(metadata);
98                }
99                TelemetryEvent::DefaultNetworkLost => {
100                    processor.log_default_network_lost();
101                }
102                TelemetryEvent::NetworkChanged(metadata) => {
103                    processor.log_network_changed(metadata, &client);
104                }
105            }
106        }
107        Ok(())
108    };
109    (sender, fut)
110}