1use anyhow::Result;
6use cobalt_client::traits::AsEventCode;
7use futures::StreamExt;
8use memory_metrics_registry::cobalt_registry;
9use stalls::{MemoryStallMetrics, StallProvider};
10use zx::MonotonicInstant;
11use {anyhow, fidl_fuchsia_metrics as fmetrics};
12
13use crate::error_from_metrics_error;
14
15pub async fn collect_stalls_forever(
17 stalls_provider: impl StallProvider,
18 metric_event_logger: fmetrics::MetricEventLoggerProxy,
19) -> Result<()> {
20 let mut last_stall = MemoryStallMetrics::default();
21
22 fuchsia_async::Timer::new(MonotonicInstant::ZERO + zx::Duration::from_hours(1)).await;
25
26 let mut timer = fuchsia_async::Interval::new(zx::Duration::from_hours(1));
27 loop {
28 let new_stall = stalls_provider.get_stall_info()?;
29
30 let stall_some_event = fmetrics::MetricEvent {
32 metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
33 payload: fmetrics::MetricEventPayload::IntegerValue(i64::try_from(
34 (new_stall.some - last_stall.some).as_millis(),
35 )?),
36 event_codes: vec![
37 cobalt_registry::MemoryMetricDimensionStallType::Some.as_event_code(),
38 ],
39 };
40 let stall_full_event = fmetrics::MetricEvent {
41 metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
42 payload: fmetrics::MetricEventPayload::IntegerValue(i64::try_from(
43 (new_stall.full - last_stall.full).as_millis(),
44 )?),
45 event_codes: vec![
46 cobalt_registry::MemoryMetricDimensionStallType::Full.as_event_code(),
47 ],
48 };
49
50 last_stall = new_stall;
51
52 let events = vec![stall_some_event, stall_full_event];
53 metric_event_logger.log_metric_events(&events).await?.map_err(error_from_metrics_error)?;
54 timer.next().await;
55 }
56}
57
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use fuchsia_async as fasync;
    use futures::task::Poll;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Creates a fake [`StallProvider`] whose counters grow linearly with each call.
    ///
    /// Call number `n` (starting at 1) reports `10 * n` ms of "some" stall and `20 * n` ms of
    /// "full" stall, so consecutive readings always differ by 10 ms and 20 ms respectively.
    fn get_stall_provider() -> impl StallProvider {
        #[derive(Clone)]
        struct FakeStallProvider {
            count: Arc<AtomicU32>,
        }

        impl Default for FakeStallProvider {
            fn default() -> Self {
                // Starting at 1 makes the very first reading non-zero.
                let count = Arc::new(AtomicU32::new(1));
                Self { count }
            }
        }

        impl StallProvider for FakeStallProvider {
            fn get_stall_info(&self) -> Result<MemoryStallMetrics, anyhow::Error> {
                let call_number = self.count.fetch_add(1, Ordering::Relaxed);
                let some = Duration::from_millis(u64::from(call_number * 10));
                let full = Duration::from_millis(u64::from(call_number * 20));
                Ok(MemoryStallMetrics { some, full })
            }
        }

        FakeStallProvider::default()
    }

    /// Returns the hourly stall event expected for `stall_type` with a value of `millis`.
    fn expected_event(
        stall_type: cobalt_registry::MemoryMetricDimensionStallType,
        millis: i64,
    ) -> fmetrics::MetricEvent {
        fmetrics::MetricEvent {
            metric_id: cobalt_registry::MEMORY_STALLS_PER_HOUR_METRIC_ID,
            event_codes: vec![stall_type.as_event_code()],
            payload: fmetrics::MetricEventPayload::IntegerValue(millis),
        }
    }

    /// Checks that `request` is a `LogMetricEvents` call carrying a 10 ms "some" event followed
    /// by a 20 ms "full" event, then replies with success so the collector can proceed.
    fn verify_and_acknowledge(request: fmetrics::MetricEventLoggerRequest) -> anyhow::Result<()> {
        let fmetrics::MetricEventLoggerRequest::LogMetricEvents { events, responder, .. } = request
        else {
            panic!("Unexpected metric event")
        };

        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0],
            expected_event(cobalt_registry::MemoryMetricDimensionStallType::Some, 10)
        );
        assert_eq!(
            events[1],
            expected_event(cobalt_registry::MemoryMetricDimensionStallType::Full, 20)
        );

        responder.send(Ok(()))?;
        Ok(())
    }

    /// Drives the collector with fake time and checks that two consecutive reports each carry
    /// the per-call growth of the fake provider (10 ms "some", 20 ms "full").
    #[test]
    fn test_periodic_stalls_collection() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new_with_fake_time();

        let data_provider = get_stall_provider();

        let (metric_event_logger, metric_event_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fmetrics::MetricEventLoggerMarker>();

        // Start three minutes into the monotonic timeline, well before the first report.
        let start_time = zx::MonotonicInstant::ZERO + zx::Duration::from_seconds(3 * 60);
        exec.set_fake_time(start_time.into());
        let mut stalls_collector = fuchsia_async::Task::spawn(async move {
            collect_stalls_forever(data_provider, metric_event_logger).await
        });

        let collector_state = exec.run_until_stalled(&mut stalls_collector);
        assert!(
            collector_state.is_pending(),
            "Stalls collection service returned unexpectedly early"
        );

        // Nothing may be logged before the first deadline has been reached.
        let mut next_request = metric_event_request_stream.into_future();
        assert!(
            exec.run_until_stalled(&mut next_request).is_pending(),
            "Stalls collection service returned unexpectedly early"
        );

        // Move past the one-hour mark at which the collector sends its first report.
        let first_deadline = exec.now() + zx::Duration::from_seconds(60 * 60 + 10);
        assert!(
            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
                first_deadline
            )))
            .is_ready(),
            "Failed to advance time"
        );

        let (request, metric_event_request_stream) =
            match exec.run_until_stalled(&mut next_request) {
                Poll::Ready(received) => received,
                Poll::Pending => panic!("Failed to receive metrics"),
            };
        let request = request.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        verify_and_acknowledge(request)?;

        // Once acknowledged, the collector must stay quiet until time advances again.
        let mut next_request = metric_event_request_stream.into_future();

        assert!(exec.run_until_stalled(&mut next_request).is_pending());

        // Jump to two hours and ten seconds on the monotonic timeline; a second report is
        // expected by then.
        assert!(
            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
                (zx::MonotonicInstant::ZERO + zx::Duration::from_seconds(60 * 60 * 2 + 10)).into()
            )))
            .is_ready(),
            "Failed to advance time"
        );

        let (request, metric_event_request_stream) =
            match exec.run_until_stalled(&mut next_request) {
                Poll::Ready(received) => received,
                Poll::Pending => panic!("Failed to receive metrics"),
            };
        let request = request.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        // The fake provider grew by the same amounts again, so the deltas are unchanged.
        verify_and_acknowledge(request)?;

        // No further report may follow without advancing time once more.
        assert!(
            exec.run_until_stalled(&mut metric_event_request_stream.into_future()).is_pending()
        );

        Ok(())
    }
}