cobalt/
stalls.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Result;
6use cobalt_client::traits::AsEventCode;
7use futures::StreamExt;
8use memory_metrics_registry::cobalt_registry;
9use stalls::{MemoryStallMetrics, StallProvider};
10use zx::MonotonicInstant;
11use {anyhow, fidl_fuchsia_metrics as fmetrics};
12
13use crate::error_from_metrics_error;
14
15/// Collect and publish to Cobalt memory stall increase rate, every hour.
16pub async fn collect_stalls_forever(
17    stalls_provider: impl StallProvider,
18    metric_event_logger: fmetrics::MetricEventLoggerProxy,
19) -> Result<()> {
20    let mut last_stall = MemoryStallMetrics::default();
21
22    // Wait for one hour after device start to get the first stall value. We don't use the one-hour
23    // timer as we may have been started later than at boot exactly.
24    fuchsia_async::Timer::new(MonotonicInstant::ZERO + zx::Duration::from_hours(1)).await;
25
26    let mut timer = fuchsia_async::Interval::new(zx::Duration::from_hours(1));
27    loop {
28        let new_stall = stalls_provider.get_stall_info()?;
29
30        // The Cobalt metrics for stalls expect milliseconds, as defined in the Cobalt registry.
31        let stall_some_event = fmetrics::MetricEvent {
32            metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
33            payload: fmetrics::MetricEventPayload::IntegerValue(i64::try_from(
34                (new_stall.some - last_stall.some).as_millis(),
35            )?),
36            event_codes: vec![
37                cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code(),
38            ],
39        };
40        let stall_full_event = fmetrics::MetricEvent {
41            metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
42            payload: fmetrics::MetricEventPayload::IntegerValue(i64::try_from(
43                (new_stall.full - last_stall.full).as_millis(),
44            )?),
45            event_codes: vec![
46                cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code(),
47            ],
48        };
49
50        last_stall = new_stall;
51
52        let events = vec![stall_some_event, stall_full_event];
53        metric_event_logger.log_metric_events(&events).await?.map_err(error_from_metrics_error)?;
54        timer.next().await;
55    }
56}
57
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use fuchsia_async as fasync;
    use futures::task::Poll;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Returns a fake [`StallProvider`] whose reported stall values grow on every call.
    ///
    /// The n-th call (counting from 1) reports `n * 10` ms of "some" stall and `n * 20` ms of
    /// "full" stall, so two consecutive samples always differ by 10 ms and 20 ms respectively.
    fn get_stall_provider() -> impl StallProvider {
        #[derive(Clone)]
        struct FakeStallProvider {
            /// Number of the next call to `get_stall_info`, starting at 1.
            count: Arc<AtomicU32>,
        }

        impl Default for FakeStallProvider {
            fn default() -> Self {
                Self { count: Arc::new(AtomicU32::new(1)) }
            }
        }

        impl StallProvider for FakeStallProvider {
            fn get_stall_info(&self) -> Result<MemoryStallMetrics, anyhow::Error> {
                let count = self.count.fetch_add(1, Ordering::Relaxed);
                let memory_stall = MemoryStallMetrics {
                    some: Duration::from_millis((count * 10).into()),
                    full: Duration::from_millis((count * 20).into()),
                };
                Ok(memory_stall)
            }
        }

        FakeStallProvider::default()
    }

    /// Moves the executor's fake time forward to `deadline`, panicking if that does not complete.
    fn advance_fake_time_to(exec: &mut fasync::TestExecutor, deadline: zx::MonotonicInstant) {
        assert!(
            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
                deadline.into()
            )))
            .is_ready(),
            "Failed to advance time"
        );
    }

    /// Checks that `request` is a `LogMetricEvents` call carrying exactly the "some" and "full"
    /// stall events, in that order, with the given values in milliseconds, then acknowledges it.
    fn assert_stall_report(
        request: fmetrics::MetricEventLoggerRequest,
        expected_some_ms: i64,
        expected_full_ms: i64,
    ) -> anyhow::Result<()> {
        let fmetrics::MetricEventLoggerRequest::LogMetricEvents { events, responder, .. } = request
        else {
            panic!("Unexpected metric event")
        };

        assert_eq!(events.len(), 2);
        // Increase of the "some" stall value.
        assert_eq!(
            events[0],
            fmetrics::MetricEvent {
                metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
                event_codes: vec![
                    cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code()
                ],
                payload: fmetrics::MetricEventPayload::IntegerValue(expected_some_ms)
            }
        );
        // Increase of the "full" stall value.
        assert_eq!(
            events[1],
            fmetrics::MetricEvent {
                metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
                event_codes: vec![
                    cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code()
                ],
                payload: fmetrics::MetricEventPayload::IntegerValue(expected_full_ms)
            }
        );

        // Acknowledge the request so that the service under test can resume its loop.
        responder.send(Ok(()))?;
        Ok(())
    }

    #[test]
    fn test_periodic_stalls_collection() -> anyhow::Result<()> {
        // Setup executor.
        let mut exec = fasync::TestExecutor::new_with_fake_time();

        // Setup mock data providers.
        let data_provider = get_stall_provider();

        // Setup test proxy to observe emitted events from the service.
        let (metric_event_logger, metric_event_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fmetrics::MetricEventLoggerMarker>();

        // Set the time to shortly after boot.
        let start_time = zx::MonotonicInstant::ZERO + zx::Duration::from_seconds(3 * 60);
        exec.set_fake_time(start_time.into());

        // Service under test.
        let mut stalls_collector = fasync::Task::spawn(async move {
            collect_stalls_forever(data_provider, metric_event_logger).await
        });

        // Give the service the opportunity to run.
        assert!(
            exec.run_until_stalled(&mut stalls_collector).is_pending(),
            "Stalls collection service returned unexpectedly early"
        );

        // Ensure no metrics have been uploaded yet.
        let mut metric_event_request_future = metric_event_request_stream.into_future();
        assert!(
            exec.run_until_stalled(&mut metric_event_request_future).is_pending(),
            "Metrics were logged before the first collection was due"
        );

        // Fake the passage of time, so that the service may do its first capture.
        advance_fake_time_to(&mut exec, start_time + zx::Duration::from_seconds(60 * 60 + 10));

        // Ensure we have one and only one request ready for consumption. The first sample of
        // the fake provider is (10 ms, 20 ms), reported relative to the default baseline.
        let Poll::Ready((event, metric_event_request_stream)) =
            exec.run_until_stalled(&mut metric_event_request_future)
        else {
            panic!("Failed to receive metrics")
        };
        let event = event.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        assert_stall_report(event, 10, 20)?;

        let mut metric_event_request_future = metric_event_request_stream.into_future();
        assert!(
            exec.run_until_stalled(&mut metric_event_request_future).is_pending(),
            "Metrics were logged again before the next collection was due"
        );

        // Advance to the next hour.
        advance_fake_time_to(
            &mut exec,
            zx::MonotonicInstant::ZERO + zx::Duration::from_seconds(60 * 60 * 2 + 10),
        );

        // Ensure we have one and only one request ready for consumption. The second sample of
        // the fake provider is (20 ms, 40 ms), so the reported increases are again 10 and 20.
        let Poll::Ready((event, metric_event_request_stream)) =
            exec.run_until_stalled(&mut metric_event_request_future)
        else {
            panic!("Failed to receive metrics")
        };
        let event = event.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        assert_stall_report(event, 10, 20)?;

        assert!(
            exec.run_until_stalled(&mut metric_event_request_stream.into_future()).is_pending(),
            "Unexpected extra metrics request"
        );

        Ok(())
    }
}
228}