cobalt/
buckets.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::error_from_metrics_error;
6use anyhow::Result;
7use attribution_processing::ResourceEnumerator;
8use attribution_processing::digest::{BucketDefinition, Digest};
9use cobalt_client::traits::{AsEventCode, AsEventCodes};
10use cobalt_registry::MemoryLeakMigratedMetricDimensionTimeSinceBoot as TimeSinceBoot;
11use fuchsia_trace::duration;
12use futures::stream::StreamExt;
13use futures::{TryFutureExt, try_join};
14use memory_metrics_registry::cobalt_registry;
15use std::collections::HashMap;
16use std::sync::Arc;
17use traces::CATEGORY_MEMORY_CAPTURE;
18use {fidl_fuchsia_kernel as fkernel, fidl_fuchsia_metrics as fmetrics};
19
20/// Sorted list mapping durations to the largest event that is lower.
21const UPTIME_LEVEL_INDEX: &[(zx::BootDuration, TimeSinceBoot)] = &[
22    (zx::BootDuration::from_minutes(1), TimeSinceBoot::Up),
23    (zx::BootDuration::from_minutes(30), TimeSinceBoot::UpOneMinute),
24    (zx::BootDuration::from_hours(1), TimeSinceBoot::UpThirtyMinutes),
25    (zx::BootDuration::from_hours(6), TimeSinceBoot::UpOneHour),
26    (zx::BootDuration::from_hours(12), TimeSinceBoot::UpSixHours),
27    (zx::BootDuration::from_hours(24), TimeSinceBoot::UpTwelveHours),
28    (zx::BootDuration::from_hours(48), TimeSinceBoot::UpOneDay),
29    (zx::BootDuration::from_hours(72), TimeSinceBoot::UpTwoDays),
30    (zx::BootDuration::from_hours(144), TimeSinceBoot::UpThreeDays),
31];
32
33/// Convert an instant to the code corresponding to the largest uptime that is smaller.
34fn get_uptime_event_code(capture_time: zx::BootInstant) -> TimeSinceBoot {
35    let uptime = zx::Duration::from_nanos(capture_time.into_nanos());
36    UPTIME_LEVEL_INDEX
37        .into_iter()
38        .find(|&&(time, _)| uptime < time)
39        .map(|(_, code)| *code)
40        .unwrap_or(TimeSinceBoot::UpSixDays)
41}
42
43fn kmem_events(kmem_stats: &fkernel::MemoryStats) -> impl Iterator<Item = fmetrics::MetricEvent> {
44    use cobalt_registry::MemoryGeneralBreakdownMigratedMetricDimensionGeneralBreakdown as Breakdown;
45    let make_event = |code: Breakdown, value| {
46        Some(fmetrics::MetricEvent {
47            metric_id: cobalt_registry::MEMORY_GENERAL_BREAKDOWN_MIGRATED_METRIC_ID,
48            event_codes: vec![code.as_event_code()],
49            payload: fmetrics::MetricEventPayload::IntegerValue(value? as i64),
50        })
51    };
52    vec![
53        make_event(Breakdown::TotalBytes, kmem_stats.total_bytes),
54        make_event(
55            Breakdown::UsedBytes,
56            (|| Some((kmem_stats.total_bytes? as i64 - kmem_stats.free_bytes? as i64) as u64))(),
57        ),
58        make_event(Breakdown::FreeBytes, kmem_stats.free_bytes),
59        make_event(Breakdown::VmoBytes, kmem_stats.vmo_bytes),
60        make_event(Breakdown::KernelFreeHeapBytes, kmem_stats.free_heap_bytes),
61        make_event(Breakdown::MmuBytes, kmem_stats.mmu_overhead_bytes),
62        make_event(Breakdown::IpcBytes, kmem_stats.ipc_bytes),
63        make_event(Breakdown::KernelTotalHeapBytes, kmem_stats.total_heap_bytes),
64        make_event(Breakdown::WiredBytes, kmem_stats.wired_bytes),
65        make_event(Breakdown::OtherBytes, kmem_stats.other_bytes),
66    ]
67    .into_iter()
68    .flatten()
69}
70
71fn kmem_events_with_uptime(
72    kmem_stats: &fkernel::MemoryStats,
73    capture_time: zx::BootInstant,
74) -> impl Iterator<Item = fmetrics::MetricEvent> {
75    use cobalt_registry::MemoryLeakMigratedMetricDimensionGeneralBreakdown as Breakdown;
76    let make_event = |code: Breakdown, value| {
77        Some(fmetrics::MetricEvent {
78            metric_id: cobalt_registry::MEMORY_LEAK_MIGRATED_METRIC_ID,
79            event_codes: cobalt_registry::MemoryLeakMigratedEventCodes {
80                general_breakdown: code,
81                time_since_boot: get_uptime_event_code(capture_time),
82            }
83            .as_event_codes(),
84            payload: fmetrics::MetricEventPayload::IntegerValue(value? as i64),
85        })
86    };
87    vec![
88        make_event(Breakdown::TotalBytes, kmem_stats.total_bytes),
89        make_event(
90            Breakdown::UsedBytes,
91            (|| Some((kmem_stats.total_bytes? as i64 - kmem_stats.free_bytes? as i64) as u64))(),
92        ),
93        make_event(Breakdown::FreeBytes, kmem_stats.free_bytes),
94        make_event(Breakdown::VmoBytes, kmem_stats.vmo_bytes),
95        make_event(Breakdown::KernelFreeHeapBytes, kmem_stats.free_heap_bytes),
96        make_event(Breakdown::MmuBytes, kmem_stats.mmu_overhead_bytes),
97        make_event(Breakdown::IpcBytes, kmem_stats.ipc_bytes),
98        make_event(Breakdown::KernelTotalHeapBytes, kmem_stats.total_heap_bytes),
99        make_event(Breakdown::WiredBytes, kmem_stats.wired_bytes),
100        make_event(Breakdown::OtherBytes, kmem_stats.other_bytes),
101    ]
102    .into_iter()
103    .flatten()
104}
105
106fn digest_events<'a>(
107    digest: &'a Digest,
108    bucket_name_to_code: &'a HashMap<String, u32>,
109) -> impl 'a + Iterator<Item = fmetrics::MetricEvent> {
110    digest.buckets.iter().filter_map(|bucket| {
111        Some(fmetrics::MetricEvent {
112            metric_id: cobalt_registry::MEMORY_MIGRATED_METRIC_ID,
113            event_codes: vec![*bucket_name_to_code.get(&bucket.name)?],
114            payload: fmetrics::MetricEventPayload::IntegerValue(bucket.size as i64),
115        })
116    })
117}
118
119pub fn prepare_bucket_codes(bucket_definitions: &[BucketDefinition]) -> HashMap<String, u32> {
120    let mut bucket_name_to_code = HashMap::from([
121        (
122            "TotalBytes".to_string(),
123            cobalt_registry::MemoryMigratedMetricDimensionBucket::TotalBytes.as_event_code(),
124        ),
125        (
126            "Free".to_string(),
127            cobalt_registry::MemoryMigratedMetricDimensionBucket::Free.as_event_code(),
128        ),
129        (
130            "Kernel".to_string(),
131            cobalt_registry::MemoryMigratedMetricDimensionBucket::Kernel.as_event_code(),
132        ),
133        (
134            "Orphaned".to_string(),
135            cobalt_registry::MemoryMigratedMetricDimensionBucket::Orphaned.as_event_code(),
136        ),
137        (
138            "Undigested".to_string(),
139            cobalt_registry::MemoryMigratedMetricDimensionBucket::Undigested.as_event_code(),
140        ),
141        (
142            "[Addl]PagerTotal".to_string(),
143            cobalt_registry::MemoryMigratedMetricDimensionBucket::__Addl_PagerTotal.as_event_code(),
144        ),
145        (
146            "[Addl]PagerNewest".to_string(),
147            cobalt_registry::MemoryMigratedMetricDimensionBucket::__Addl_PagerNewest
148                .as_event_code(),
149        ),
150        (
151            "[Addl]PagerOldest".to_string(),
152            cobalt_registry::MemoryMigratedMetricDimensionBucket::__Addl_PagerOldest
153                .as_event_code(),
154        ),
155        (
156            "[Addl]DiscardableLocked".to_string(),
157            cobalt_registry::MemoryMigratedMetricDimensionBucket::__Addl_DiscardableLocked
158                .as_event_code(),
159        ),
160        (
161            "[Addl]DiscardableUnlocked".to_string(),
162            cobalt_registry::MemoryMigratedMetricDimensionBucket::__Addl_DiscardableUnlocked
163                .as_event_code(),
164        ),
165        (
166            "[Addl]ZramCompressedBytes".to_string(),
167            cobalt_registry::MemoryMigratedMetricDimensionBucket::__Addl_ZramCompressedBytes
168                .as_event_code(),
169        ),
170    ]);
171    bucket_definitions.iter().for_each(|bucket_definition| {
172        bucket_name_to_code
173            .entry(bucket_definition.name.clone())
174            .or_insert(bucket_definition.event_code as u32);
175    });
176    bucket_name_to_code
177}
178
179/// Periodically upload metrics related to memory usage.
180pub async fn collect_metrics_forever(
181    resource_enumerator: Arc<impl ResourceEnumerator + 'static>,
182    kernel_stats_proxy: fkernel::StatsProxy,
183    metric_event_logger: fmetrics::MetricEventLoggerProxy,
184    bucket_definitions: Arc<[BucketDefinition]>,
185) {
186    let bucket_name_to_code = prepare_bucket_codes(&bucket_definitions);
187    let mut timer = fuchsia_async::Interval::new(zx::Duration::from_minutes(5));
188    loop {
189        timer.next().await;
190
191        let result = collect_metrics_once(
192            &*resource_enumerator,
193            &kernel_stats_proxy,
194            &metric_event_logger,
195            &bucket_definitions,
196            &bucket_name_to_code,
197        )
198        .await;
199
200        if let Err(e) = result {
201            log::error!("Metrics collection failed: {e}");
202        }
203    }
204}
205
206async fn collect_metrics_once(
207    resource_enumerator: &impl ResourceEnumerator,
208    kernel_stats_proxy: &fkernel::StatsProxy,
209    metric_event_logger: &fmetrics::MetricEventLoggerProxy,
210    bucket_definitions: &[BucketDefinition],
211    bucket_name_to_code: &HashMap<String, u32>,
212) -> Result<()> {
213    duration!(CATEGORY_MEMORY_CAPTURE, "cobalt");
214    let timestamp = zx::BootInstant::get();
215    let (kmem_stats, kmem_stats_compression) = try_join!(
216        kernel_stats_proxy.get_memory_stats().map_err(anyhow::Error::from),
217        kernel_stats_proxy.get_memory_stats_compression().map_err(anyhow::Error::from)
218    )?;
219    let digest = Digest::compute(
220        &*resource_enumerator,
221        &kmem_stats,
222        &kmem_stats_compression,
223        bucket_definitions,
224    )?;
225    upload_metrics(timestamp, &kmem_stats, metric_event_logger, &digest, bucket_name_to_code)
226        .await?;
227    Ok(())
228}
229
230/// Upload cobalt data based on collected memory data.
231pub async fn upload_metrics(
232    timestamp: zx::BootInstant,
233    kmem_stats: &fkernel::MemoryStats,
234    metric_event_logger: &fmetrics::MetricEventLoggerProxy,
235    digest: &Digest,
236    bucket_codes: &HashMap<String, u32>,
237) -> Result<()> {
238    let events = kmem_events(kmem_stats)
239        .chain(kmem_events_with_uptime(kmem_stats, timestamp))
240        .chain(digest_events(digest, &bucket_codes));
241    metric_event_logger
242        .log_metric_events(&events.collect::<Vec<fmetrics::MetricEvent>>())
243        .await?
244        .map_err(error_from_metrics_error)?;
245    Ok(())
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use attribution_processing::{
        Attribution, AttributionData, GlobalPrincipalIdentifier, Principal, PrincipalDescription,
        PrincipalType, Resource, ResourceEnumerator, ResourceReference, ZXName,
    };
    use futures::TryStreamExt;
    use futures::task::Poll;
    use regex::bytes::Regex;
    use std::time::Duration;
    use {fidl_fuchsia_memory_attribution_plugin as fplugin, fuchsia_async as fasync};

    // Short names for the registry dimensions used to spell out the expected events.
    type GeneralBreakdown =
        cobalt_registry::MemoryGeneralBreakdownMigratedMetricDimensionGeneralBreakdown;
    type LeakBreakdown = cobalt_registry::MemoryLeakMigratedMetricDimensionGeneralBreakdown;
    type MigratedBucket = cobalt_registry::MemoryMigratedMetricDimensionBucket;

    /// Fake attribution data: a single principal, two VMOs reporting identical sizes, and a
    /// process owning the second VMO.
    fn get_resource_enumerator() -> Arc<impl ResourceEnumerator + 'static> {
        // Both VMOs report the same sizes; only their koid and name differ.
        let vmo = |koid, name_index| Resource {
            koid,
            name_index,
            resource_type: fplugin::ResourceType::Vmo(fplugin::Vmo {
                parent: None,
                private_committed_bytes: Some(1024),
                private_populated_bytes: Some(2048),
                scaled_committed_bytes: Some(1024),
                scaled_populated_bytes: Some(2048),
                total_committed_bytes: Some(1024),
                total_populated_bytes: Some(2048),
                ..Default::default()
            }),
        };
        let principals_vec = vec![Principal {
            identifier: GlobalPrincipalIdentifier::new_for_test(1),
            description: Some(PrincipalDescription::Component("principal".to_owned())),
            principal_type: PrincipalType::Runnable,
            parent: None,
        }];
        let resources_vec = vec![
            // Orphaned VMO.
            vmo(10, 0),
            // VMO belonging to bucket1.
            vmo(20, 1),
            // Process owning bucket1's VMO.
            Resource {
                koid: 30,
                name_index: 1,
                resource_type: fplugin::ResourceType::Process(fplugin::Process {
                    vmos: Some(vec![20]),
                    ..Default::default()
                }),
            },
        ];
        let resource_names = vec![
            ZXName::from_string_lossy("resource"),
            ZXName::from_string_lossy("bucket1_resource"),
        ];
        let attributions = vec![Attribution {
            source: GlobalPrincipalIdentifier::new_for_test(1),
            subject: GlobalPrincipalIdentifier::new_for_test(1),
            resources: vec![ResourceReference::KernelObject(10)],
        }];
        Arc::new(AttributionData { principals_vec, resources_vec, resource_names, attributions })
    }

    /// Fake kernel statistics server: answers the memory statistics requests with fixed,
    /// pairwise distinct values, and panics on any other request.
    async fn serve_kernel_stats(
        mut request_stream: fkernel::StatsRequestStream,
    ) -> Result<(), fidl::Error> {
        // Every field gets its own value so that a mix-up shows in the uploaded events.
        let memory_stats = fkernel::MemoryStats {
            total_bytes: Some(1),
            free_bytes: Some(2),
            wired_bytes: Some(3),
            total_heap_bytes: Some(4),
            free_heap_bytes: Some(5),
            vmo_bytes: Some(6),
            mmu_overhead_bytes: Some(7),
            ipc_bytes: Some(8),
            other_bytes: Some(9),
            free_loaned_bytes: Some(10),
            cache_bytes: Some(11),
            slab_bytes: Some(12),
            zram_bytes: Some(13),
            vmo_reclaim_total_bytes: Some(14),
            vmo_reclaim_newest_bytes: Some(15),
            vmo_reclaim_oldest_bytes: Some(16),
            vmo_reclaim_disabled_bytes: Some(17),
            vmo_discardable_locked_bytes: Some(18),
            vmo_discardable_unlocked_bytes: Some(19),
            ..Default::default()
        };
        let compression_stats = fkernel::MemoryStatsCompression {
            uncompressed_storage_bytes: Some(20),
            compressed_storage_bytes: Some(21),
            compressed_fragmentation_bytes: Some(22),
            compression_time: Some(23),
            decompression_time: Some(24),
            total_page_compression_attempts: Some(25),
            failed_page_compression_attempts: Some(26),
            total_page_decompressions: Some(27),
            compressed_page_evictions: Some(28),
            eager_page_compressions: Some(29),
            memory_pressure_page_compressions: Some(30),
            critical_memory_page_compressions: Some(31),
            pages_decompressed_unit_ns: Some(32),
            pages_decompressed_within_log_time: Some([40, 41, 42, 43, 44, 45, 46, 47]),
            ..Default::default()
        };

        while let Some(request) = request_stream.try_next().await? {
            match request {
                fkernel::StatsRequest::GetMemoryStats { responder } => {
                    responder.send(&memory_stats).unwrap();
                }
                fkernel::StatsRequest::GetMemoryStatsExtended { .. } => {
                    unimplemented!("Deprecated call, should not be used")
                }
                fkernel::StatsRequest::GetMemoryStatsCompression { responder } => {
                    responder.send(&compression_stats).unwrap();
                }
                fkernel::StatsRequest::GetCpuStats { .. }
                | fkernel::StatsRequest::GetCpuLoad { .. } => unimplemented!(),
            }
        }
        Ok(())
    }

    /// Checks that nothing is uploaded before the first period elapses, and that exactly one
    /// batch, with the expected events, is uploaded once it has.
    #[test]
    fn test_periodic_metrics_collection() -> anyhow::Result<()> {
        // Setup executor.
        let mut exec = fasync::TestExecutor::new_with_fake_time();

        // Setup mock data providers.
        let resource_enumerator = get_resource_enumerator();
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();
        // Define a single bucket which matches a specific VMO.
        let bucket_definitions = Arc::new([BucketDefinition {
            name: "bucket1".to_string(),
            vmo: Some(Regex::new("bucket1.*")?),
            event_code: 1,
            process: None,
        }]);

        // Setup test proxy to observe emitted events from the service.
        let (metric_event_logger, metric_event_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fmetrics::MetricEventLoggerMarker>();

        // Service under test.
        let mut metrics_collector = fasync::Task::spawn(collect_metrics_forever(
            resource_enumerator,
            stats_provider,
            metric_event_logger,
            bucket_definitions,
        ));

        // Give the service the opportunity to run.
        assert!(
            exec.run_until_stalled(&mut metrics_collector).is_pending(),
            "Metrics collection service returned unexpectedly early"
        );

        // Ensure no metrics has been uploaded yet.
        let mut metric_event_request_future = metric_event_request_stream.into_future();
        assert!(
            exec.run_until_stalled(&mut metric_event_request_future).is_pending(),
            "Metrics collection service returned unexpectedly early"
        );

        // Fake the passage of time, so that collect_metrics may do a capture.
        assert!(
            exec.run_until_stalled(&mut std::pin::pin!(fasync::TestExecutor::advance_to(
                exec.now() + Duration::from_secs(5 * 60).into()
            )))
            .is_ready(),
            "Failed to advance time"
        );
        let uptime = get_uptime_event_code(zx::BootInstant::get());

        // Ensure we have one and only one event ready for consumption.
        let Poll::Ready((event, metric_event_request_stream)) =
            exec.run_until_stalled(&mut metric_event_request_future)
        else {
            panic!("Failed to receive metrics")
        };
        let event = event.ok_or_else(|| anyhow!("Metrics stream unexpectedly closed"))??;
        let fmetrics::MetricEventLoggerRequest::LogMetricEvents { events, .. } = event else {
            panic!("Unexpected metric event")
        };

        // Kernel metrics
        let expected_general: Vec<fmetrics::MetricEvent> = [
            (GeneralBreakdown::TotalBytes, 1),
            (GeneralBreakdown::UsedBytes, -1),
            (GeneralBreakdown::FreeBytes, 2),
            (GeneralBreakdown::VmoBytes, 6),
            (GeneralBreakdown::KernelFreeHeapBytes, 5),
            (GeneralBreakdown::MmuBytes, 7),
            (GeneralBreakdown::IpcBytes, 8),
            (GeneralBreakdown::KernelTotalHeapBytes, 4),
            (GeneralBreakdown::WiredBytes, 3),
            (GeneralBreakdown::OtherBytes, 9),
        ]
        .into_iter()
        .map(|(code, value)| fmetrics::MetricEvent {
            metric_id: cobalt_registry::MEMORY_GENERAL_BREAKDOWN_MIGRATED_METRIC_ID,
            event_codes: vec![code.as_event_code()],
            payload: fmetrics::MetricEventPayload::IntegerValue(value),
        })
        .collect();
        assert_eq!(&events[0..10], expected_general);

        // Kernel metrics with uptime
        let expected_leak: Vec<fmetrics::MetricEvent> = [
            (LeakBreakdown::TotalBytes, 1),
            (LeakBreakdown::UsedBytes, -1),
            (LeakBreakdown::FreeBytes, 2),
            (LeakBreakdown::VmoBytes, 6),
            (LeakBreakdown::KernelFreeHeapBytes, 5),
            (LeakBreakdown::MmuBytes, 7),
            (LeakBreakdown::IpcBytes, 8),
            (LeakBreakdown::KernelTotalHeapBytes, 4),
            (LeakBreakdown::WiredBytes, 3),
            (LeakBreakdown::OtherBytes, 9),
        ]
        .into_iter()
        .map(|(code, value)| fmetrics::MetricEvent {
            metric_id: cobalt_registry::MEMORY_LEAK_MIGRATED_METRIC_ID,
            event_codes: cobalt_registry::MemoryLeakMigratedEventCodes {
                general_breakdown: code,
                time_since_boot: uptime,
            }
            .as_event_codes(),
            payload: fmetrics::MetricEventPayload::IntegerValue(value),
        })
        .collect();
        assert_eq!(&events[10..20], expected_leak);

        // Digest metrics
        let expected_digest: Vec<fmetrics::MetricEvent> = [
            // Buckets with custom definitions: code 1 corresponds to the "bucket1" bucket.
            (1, 1024),
            // Default buckets
            (MigratedBucket::Undigested.as_event_code(), 1024),
            (MigratedBucket::Orphaned.as_event_code(), 0),
            (MigratedBucket::Kernel.as_event_code(), 31),
            (MigratedBucket::Free.as_event_code(), 2),
            (MigratedBucket::__Addl_PagerTotal.as_event_code(), 14),
            (MigratedBucket::__Addl_PagerNewest.as_event_code(), 15),
            (MigratedBucket::__Addl_PagerOldest.as_event_code(), 16),
            (MigratedBucket::__Addl_DiscardableLocked.as_event_code(), 18),
            (MigratedBucket::__Addl_DiscardableUnlocked.as_event_code(), 19),
            (MigratedBucket::__Addl_ZramCompressedBytes.as_event_code(), 21),
        ]
        .into_iter()
        .map(|(code, value)| fmetrics::MetricEvent {
            metric_id: cobalt_registry::MEMORY_MIGRATED_METRIC_ID,
            event_codes: vec![code],
            payload: fmetrics::MetricEventPayload::IntegerValue(value),
        })
        .collect();
        assert_eq!(&events[20..], expected_digest);

        // No second batch may be pending.
        assert!(
            exec.run_until_stalled(&mut metric_event_request_stream.into_future()).is_pending()
        );
        Ok(())
    }
}