inspect_nodes/lib.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use anyhow::{anyhow, Error, Result};
use attribution_processing::digest::{BucketDefinition, Digest};
use attribution_processing::AttributionDataProvider;
use fpressure::WatcherRequest;
use fuchsia_async::{MonotonicDuration, Task, WakeupTime};
use fuchsia_inspect::{ArrayProperty, Inspector, Node};
use fuchsia_inspect_contrib::nodes::BoundedListNode;
use futures::{select, try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use inspect_runtime::PublishedInspectController;
use log::debug;
use memory_monitor2_config::Config;
use stalls::StallProvider;
use std::sync::Arc;
use {
    fidl_fuchsia_kernel as fkernel, fidl_fuchsia_memorypressure as fpressure, fuchsia_inspect as _,
    inspect_runtime as _,
};

/// Holds the resources required to serve the Inspect tree.
/// The FIDL service stops when this object is dropped.
pub struct ServiceTask {
    _inspect_controller: PublishedInspectController,
    _periodic_digest: Task<Result<(), anyhow::Error>>,
}

/// Begins serving the Inspect tree and returns an object holding the server's resources.
/// Dropping the returned `ServiceTask` stops the service.
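///
/// A minimal usage sketch (hypothetical caller: the proxies, providers,
/// configuration, and bucket definitions are assumed to come from the
/// component's environment):
/// ```ignore
/// let service = start_service(
///     attribution_data_service,
///     kernel_stats_proxy,
///     stall_provider,
///     config,
///     memorypressure_proxy,
///     bucket_definitions,
/// )?;
/// // Keep `service` alive for as long as the Inspect tree should be served;
/// // dropping it stops the service.
/// ```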
pub fn start_service(
    attribution_data_service: Arc<impl AttributionDataProvider>,
    kernel_stats_proxy: fkernel::StatsProxy,
    stall_provider: Arc<impl StallProvider>,
    memory_monitor2_config: Config,
    memorypressure_proxy: fpressure::ProviderProxy,
    bucket_definitions: Vec<BucketDefinition>,
) -> Result<ServiceTask> {
    debug!("Start serving inspect tree.");

    // Create the root of the Inspect tree. The `Inspector` is a singleton that
    // can be accessed from any scope.
    let inspector = fuchsia_inspect::component::inspector();

    // Serve the Inspect tree, converting a publication failure into an error.
    let inspect_controller =
        inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default())
            .ok_or_else(|| anyhow!("Failed to serve server handling `fuchsia.inspect.Tree`"))?;

    build_inspect_tree(
        attribution_data_service.clone(),
        kernel_stats_proxy.clone(),
        stall_provider,
        inspector,
    );
    let digest_service = digest_service(
        memory_monitor2_config,
        attribution_data_service,
        kernel_stats_proxy,
        memorypressure_proxy,
        bucket_definitions,
        inspector.root().create_child("logger"),
    )?;
    Ok(ServiceTask { _inspect_controller: inspect_controller, _periodic_digest: digest_service })
}

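/// Records lazy Inspect nodes exposing kernel memory statistics
/// (`kmem_stats`, `kmem_stats_compression`), per-principal attribution
/// (`current`), and memory stall information (`stalls`). Lazy children are
/// re-evaluated each time the Inspect tree is read.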
fn build_inspect_tree(
    attribution_data_service: Arc<impl AttributionDataProvider>,
    kernel_stats_proxy: fkernel::StatsProxy,
    stall_provider: Arc<impl StallProvider>,
    inspector: &Inspector,
) {
    // Lazy evaluation is unregistered when the `LazyNode` is dropped.
    {
        let kernel_stats_proxy = kernel_stats_proxy.clone();
        inspector.root().record_lazy_child("kmem_stats", move || {
            let kernel_stats_proxy = kernel_stats_proxy.clone();
            async move {
                let inspector = Inspector::default();
                let root = inspector.root();
                let mem_stats = kernel_stats_proxy.get_memory_stats().await?;
                mem_stats.total_bytes.map(|v| root.record_uint("total_bytes", v));
                mem_stats.free_bytes.map(|v| root.record_uint("free_bytes", v));
                mem_stats.free_loaned_bytes.map(|v| root.record_uint("free_loaned_bytes", v));
                mem_stats.wired_bytes.map(|v| root.record_uint("wired_bytes", v));
                mem_stats.total_heap_bytes.map(|v| root.record_uint("total_heap_bytes", v));
                mem_stats.free_heap_bytes.map(|v| root.record_uint("free_heap_bytes", v));
                mem_stats.vmo_bytes.map(|v| root.record_uint("vmo_bytes", v));
                mem_stats.mmu_overhead_bytes.map(|v| root.record_uint("mmu_overhead_bytes", v));
                mem_stats.ipc_bytes.map(|v| root.record_uint("ipc_bytes", v));
                mem_stats.cache_bytes.map(|v| root.record_uint("cache_bytes", v));
                mem_stats.slab_bytes.map(|v| root.record_uint("slab_bytes", v));
                mem_stats.zram_bytes.map(|v| root.record_uint("zram_bytes", v));
                mem_stats.other_bytes.map(|v| root.record_uint("other_bytes", v));
                mem_stats
                    .vmo_reclaim_total_bytes
                    .map(|v| root.record_uint("vmo_reclaim_total_bytes", v));
                mem_stats
                    .vmo_reclaim_newest_bytes
                    .map(|v| root.record_uint("vmo_reclaim_newest_bytes", v));
                mem_stats
                    .vmo_reclaim_oldest_bytes
                    .map(|v| root.record_uint("vmo_reclaim_oldest_bytes", v));
                mem_stats
                    .vmo_reclaim_disabled_bytes
                    .map(|v| root.record_uint("vmo_reclaim_disabled_bytes", v));
                mem_stats
                    .vmo_discardable_locked_bytes
                    .map(|v| root.record_uint("vmo_discardable_locked_bytes", v));
                mem_stats
                    .vmo_discardable_unlocked_bytes
                    .map(|v| root.record_uint("vmo_discardable_unlocked_bytes", v));
                Ok(inspector)
            }
            .boxed()
        })
    };

    {
        inspector.root().record_lazy_child("kmem_stats_compression", move || {
            let kernel_stats_proxy = kernel_stats_proxy.clone();
            async move {
                let inspector = Inspector::default();
                let cmp_stats = kernel_stats_proxy.get_memory_stats_compression().await?;
                cmp_stats
                    .uncompressed_storage_bytes
                    .map(|v| inspector.root().record_uint("uncompressed_storage_bytes", v));
                cmp_stats
                    .compressed_storage_bytes
                    .map(|v| inspector.root().record_uint("compressed_storage_bytes", v));
                cmp_stats
                    .compressed_fragmentation_bytes
                    .map(|v| inspector.root().record_uint("compressed_fragmentation_bytes", v));
                cmp_stats
                    .compression_time
                    .map(|v| inspector.root().record_int("compression_time", v));
                cmp_stats
                    .decompression_time
                    .map(|v| inspector.root().record_int("decompression_time", v));
                cmp_stats
                    .total_page_compression_attempts
                    .map(|v| inspector.root().record_uint("total_page_compression_attempts", v));
                cmp_stats
                    .failed_page_compression_attempts
                    .map(|v| inspector.root().record_uint("failed_page_compression_attempts", v));
                cmp_stats
                    .total_page_decompressions
                    .map(|v| inspector.root().record_uint("total_page_decompressions", v));
                cmp_stats
                    .compressed_page_evictions
                    .map(|v| inspector.root().record_uint("compressed_page_evictions", v));
                cmp_stats
                    .eager_page_compressions
                    .map(|v| inspector.root().record_uint("eager_page_compressions", v));
                cmp_stats
                    .memory_pressure_page_compressions
                    .map(|v| inspector.root().record_uint("memory_pressure_page_compressions", v));
                cmp_stats
                    .critical_memory_page_compressions
                    .map(|v| inspector.root().record_uint("critical_memory_page_compressions", v));
                cmp_stats
                    .pages_decompressed_unit_ns
                    .map(|v| inspector.root().record_uint("pages_decompressed_unit_ns", v));
                cmp_stats.pages_decompressed_within_log_time.map(|v| {
                    let array =
                        inspector.root().create_uint_array("pages_decompressed_within_log_time", 8);
                    // Copy each of the eight histogram buckets into the inspect array.
                    for (i, value) in v.into_iter().enumerate() {
                        array.set(i, value);
                    }
                    inspector.root().record(array);
                });
                Ok(inspector)
            }
            .boxed()
        });
    }

    {
        inspector.root().record_lazy_child("current", move || {
            let attribution_data_service = attribution_data_service.clone();
            async move {
                let inspector = Inspector::default();
                let current_attribution_data =
                    attribution_data_service.get_attribution_data().await?;
                let summary =
                    attribution_processing::attribute_vmos(current_attribution_data).summary();

                summary.principals.into_iter().for_each(|p| {
                    let node = inspector.root().create_child(p.name);
                    node.record_uint("committed_private", p.committed_private);
                    node.record_double("committed_scaled", p.committed_scaled);
                    node.record_uint("committed_total", p.committed_total);
                    node.record_uint("populated_private", p.populated_private);
                    node.record_double("populated_scaled", p.populated_scaled);
                    node.record_uint("populated_total", p.populated_total);
                    inspector.root().record(node);
                });
                Ok(inspector)
            }
            .boxed()
        });
    }

    {
        inspector.root().record_lazy_child("stalls", move || {
            let stall_info = stall_provider.get_stall_info().unwrap();
            let stall_rate_opt = stall_provider.get_stall_rate();
            async move {
                let inspector = Inspector::default();
                inspector.root().record_int("current_some", stall_info.stall_time_some);
                inspector.root().record_int("current_full", stall_info.stall_time_full);
                if let Some(stall_rate) = stall_rate_opt {
                    inspector
                        .root()
                        .record_int("rate_interval_s", stall_rate.interval.into_seconds());
                    inspector.root().record_int("rate_some", stall_rate.rate_some);
                    inspector.root().record_int("rate_full", stall_rate.rate_full);
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
}

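/// Spawns a task that periodically captures a memory digest and records it
/// under `digest_node`. A capture is taken when the memory pressure level
/// changes (if `capture_on_pressure_change` is set) and whenever the capture
/// delay configured for the current pressure level elapses.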
fn digest_service(
    memory_monitor2_config: Config,
    attribution_data_service: Arc<impl AttributionDataProvider + 'static>,
    kernel_stats_proxy: fkernel::StatsProxy,
    memorypressure_proxy: fpressure::ProviderProxy,
    bucket_definitions: Vec<BucketDefinition>,
    digest_node: Node,
) -> Result<Task<Result<(), Error>>> {
    // Initialize pressure monitoring.
    let (watcher, watcher_stream) =
        fidl::endpoints::create_request_stream::<fpressure::WatcherMarker>();
    memorypressure_proxy.register_watcher(watcher)?;

    Ok(fuchsia_async::Task::spawn(async move {
        let mut buckets_list_node =
            // Keep up to 100 measurements.
            BoundedListNode::new(digest_node.create_child("measurements"), 100);
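        // The bucket names never change, so they are published only once, on
        // the first capture.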
        let buckets_names = std::cell::OnceCell::new();
        let pressure_stream = watcher_stream.map_err(anyhow::Error::from);

        // Get the initial, baseline pressure level.
        let (request, mut pressure_stream) = pressure_stream.into_future().await;
        let WatcherRequest::OnLevelChanged { level, responder } = request.ok_or_else(|| {
            anyhow::Error::msg(
                "Unexpectedly exhausted pressure stream before receiving baseline pressure level",
            )
        })??;
        responder.send()?;
        let mut current_level = level;
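        // Returns a fused timer whose delay is the capture delay configured
        // for the given pressure level.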
        let new_timer = |level| {
            MonotonicDuration::from_seconds(match level {
                fpressure::Level::Normal => memory_monitor2_config.normal_capture_delay_s,
                fpressure::Level::Warning => memory_monitor2_config.warning_capture_delay_s,
                fpressure::Level::Critical => memory_monitor2_config.critical_capture_delay_s,
            } as i64)
            .into_timer()
            .boxed()
            .fuse()
        };
        let mut timer = new_timer(current_level);
        loop {
            // Wait for either a pressure change or the timer corresponding to the current level. In
            // either case, reset the timer.
            let () = select! {
                // When we receive a pressure change, update the current level, and if necessary do
                // a capture.
                pressure = pressure_stream.next() =>
                    match pressure.ok_or_else(|| anyhow::Error::msg("Unexpectedly exhausted pressure stream"))?? {
                        WatcherRequest::OnLevelChanged { level, responder } => {
                            responder.send()?;
                            if level == current_level { continue; }
                            current_level = level;
                            timer = new_timer(level);
                            if !memory_monitor2_config.capture_on_pressure_change { continue; }
                        },
                    },
                // If instead we reached the deadline, do a capture anyway. The deadline depends on
                // the current pressure level and the configuration.
                _ = timer => { timer = new_timer(current_level); }
            };

            // Retrieve (concurrently) the data necessary to perform the aggregation.
            let (attribution_data, kmem_stats, kmem_stats_compression) = try_join!(
                attribution_data_service.get_attribution_data(),
                kernel_stats_proxy.get_memory_stats().map_err(anyhow::Error::from),
                kernel_stats_proxy.get_memory_stats_compression().map_err(anyhow::Error::from)
            )?;

            // Compute the aggregation.
            let Digest { buckets } = Digest::new(
                &attribution_data,
                kmem_stats,
                kmem_stats_compression,
                &bucket_definitions,
            );

            // Initialize the inspect property containing the bucket names, if necessary.
            let _ = buckets_names.get_or_init(|| {
                // Create the inspect string array holding the bucket names.
                let buckets_names = digest_node.create_string_array("buckets", buckets.len());
                for (i, attribution_processing::digest::Bucket { name, .. }) in
                    buckets.iter().enumerate()
                {
                    buckets_names.set(i, name);
                }
                buckets_names
            });

            // Add an entry for the current aggregation.
            buckets_list_node.add_entry(|n| {
                n.record_int("timestamp", zx::BootInstant::get().into_nanos());
                let ia = n.create_uint_array("bucket_sizes", buckets.len());
                for (i, b) in buckets.iter().enumerate() {
                    ia.set(i, b.size as u64);
                }
                n.record(ia);
            });
        }
    }))
}

#[cfg(test)]
mod tests {
    use attribution_processing::{
        Attribution, AttributionData, Principal, PrincipalDescription, PrincipalIdentifier,
        PrincipalType, Resource, ResourceReference,
    };
    use diagnostics_assertions::{assert_data_tree, NonZeroIntProperty};
    use fuchsia_async::TestExecutor;
    use futures::future::BoxFuture;
    use futures::task::Poll;
    use futures::TryStreamExt;
    use {
        fidl_fuchsia_memory_attribution_plugin as fplugin,
        fidl_fuchsia_memorypressure as fpressure, fuchsia_async as fasync,
    };

    use super::*;
    use std::time::Duration;

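    /// Fake data provider exposing a single principal owning one VMO with
    /// 1024 committed bytes and 2048 populated bytes.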
    struct FakeAttributionDataProvider {}

    impl AttributionDataProvider for FakeAttributionDataProvider {
        fn get_attribution_data(&self) -> BoxFuture<'_, Result<AttributionData, anyhow::Error>> {
            async {
                Ok(AttributionData {
                    principals_vec: vec![Principal {
                        identifier: PrincipalIdentifier(1),
                        description: PrincipalDescription::Component("principal".to_owned()),
                        principal_type: PrincipalType::Runnable,
                        parent: None,
                    }],
                    resources_vec: vec![Resource {
                        koid: 10,
                        name_index: 0,
                        resource_type: fplugin::ResourceType::Vmo(fplugin::Vmo {
                            parent: None,
                            private_committed_bytes: Some(1024),
                            private_populated_bytes: Some(2048),
                            scaled_committed_bytes: Some(1024),
                            scaled_populated_bytes: Some(2048),
                            total_committed_bytes: Some(1024),
                            total_populated_bytes: Some(2048),
                            ..Default::default()
                        }),
                    }],
                    resource_names: vec!["resource".to_owned()],
                    attributions: vec![Attribution {
                        source: PrincipalIdentifier(1),
                        subject: PrincipalIdentifier(1),
                        resources: vec![ResourceReference::KernelObject(10)],
                    }],
                })
            }
            .boxed()
        }
    }

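    /// Serves `fuchsia.kernel/Stats` with fixed, distinguishable values so
    /// that tests can assert on how each one is aggregated.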
    async fn serve_kernel_stats(
        mut request_stream: fkernel::StatsRequestStream,
    ) -> Result<(), fidl::Error> {
        while let Some(request) = request_stream.try_next().await? {
            match request {
                fkernel::StatsRequest::GetMemoryStats { responder } => {
                    responder
                        .send(&fkernel::MemoryStats {
                            total_bytes: Some(1),
                            free_bytes: Some(2),
                            wired_bytes: Some(3),
                            total_heap_bytes: Some(4),
                            free_heap_bytes: Some(5),
                            vmo_bytes: Some(6),
                            mmu_overhead_bytes: Some(7),
                            ipc_bytes: Some(8),
                            other_bytes: Some(9),
                            free_loaned_bytes: Some(10),
                            cache_bytes: Some(11),
                            slab_bytes: Some(12),
                            zram_bytes: Some(13),
                            vmo_reclaim_total_bytes: Some(14),
                            vmo_reclaim_newest_bytes: Some(15),
                            vmo_reclaim_oldest_bytes: Some(16),
                            vmo_reclaim_disabled_bytes: Some(17),
                            vmo_discardable_locked_bytes: Some(18),
                            vmo_discardable_unlocked_bytes: Some(19),
                            ..Default::default()
                        })
                        .unwrap();
                }
                fkernel::StatsRequest::GetMemoryStatsExtended { responder: _ } => {
                    unimplemented!("Deprecated call, should not be used")
                }
                fkernel::StatsRequest::GetMemoryStatsCompression { responder } => {
                    responder
                        .send(&fkernel::MemoryStatsCompression {
                            uncompressed_storage_bytes: Some(20),
                            compressed_storage_bytes: Some(21),
                            compressed_fragmentation_bytes: Some(22),
                            compression_time: Some(23),
                            decompression_time: Some(24),
                            total_page_compression_attempts: Some(25),
                            failed_page_compression_attempts: Some(26),
                            total_page_decompressions: Some(27),
                            compressed_page_evictions: Some(28),
                            eager_page_compressions: Some(29),
                            memory_pressure_page_compressions: Some(30),
                            critical_memory_page_compressions: Some(31),
                            pages_decompressed_unit_ns: Some(32),
                            pages_decompressed_within_log_time: Some([
                                40, 41, 42, 43, 44, 45, 46, 47,
                            ]),
                            ..Default::default()
                        })
                        .unwrap();
                }
                fkernel::StatsRequest::GetCpuStats { responder: _ } => unimplemented!(),
                fkernel::StatsRequest::GetCpuLoad { duration: _, responder: _ } => unimplemented!(),
            }
        }
        Ok(())
    }

    #[test]
    fn test_build_inspect_tree() {
        let mut exec = fasync::TestExecutor::new();

        let data_provider = Arc::new(FakeAttributionDataProvider {});

        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();

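        // Stall provider returning fixed stall times and rates for the test
        // to assert on.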
        struct FakeStallProvider {}
        impl StallProvider for FakeStallProvider {
            fn get_stall_info(&self) -> Result<zx::MemoryStall, anyhow::Error> {
                Ok(zx::MemoryStall { stall_time_some: 10, stall_time_full: 20 })
            }

            fn get_stall_rate(&self) -> Option<stalls::MemoryStallRate> {
                Some(stalls::MemoryStallRate {
                    interval: fasync::MonotonicDuration::from_seconds(60),
                    rate_some: 1,
                    rate_full: 2,
                })
            }
        }

        build_inspect_tree(
            data_provider,
            stats_provider,
            Arc::new(FakeStallProvider {}),
            &inspector,
        );

        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        assert_data_tree!(output, root: {
            current: {
                principal: {
                    committed_private: 1024u64,
                    committed_scaled: 1024.0,
                    committed_total: 1024u64,
                    populated_private: 2048u64,
                    populated_scaled: 2048.0,
                    populated_total: 2048u64
                }
            },
            kmem_stats: {
                total_bytes: 1u64,
                free_bytes: 2u64,
                wired_bytes: 3u64,
                total_heap_bytes: 4u64,
                free_heap_bytes: 5u64,
                vmo_bytes: 6u64,
                mmu_overhead_bytes: 7u64,
                ipc_bytes: 8u64,
                other_bytes: 9u64,
                free_loaned_bytes: 10u64,
                cache_bytes: 11u64,
                slab_bytes: 12u64,
                zram_bytes: 13u64,
                vmo_reclaim_total_bytes: 14u64,
                vmo_reclaim_newest_bytes: 15u64,
                vmo_reclaim_oldest_bytes: 16u64,
                vmo_reclaim_disabled_bytes: 17u64,
                vmo_discardable_locked_bytes: 18u64,
                vmo_discardable_unlocked_bytes: 19u64
            },
            kmem_stats_compression: {
                uncompressed_storage_bytes: 20u64,
                compressed_storage_bytes: 21u64,
                compressed_fragmentation_bytes: 22u64,
                compression_time: 23i64,
                decompression_time: 24i64,
                total_page_compression_attempts: 25u64,
                failed_page_compression_attempts: 26u64,
                total_page_decompressions: 27u64,
                compressed_page_evictions: 28u64,
                eager_page_compressions: 29u64,
                memory_pressure_page_compressions: 30u64,
                critical_memory_page_compressions: 31u64,
                pages_decompressed_unit_ns: 32u64,
                pages_decompressed_within_log_time: vec![
                    40u64, 41u64, 42u64, 43u64, 44u64, 45u64, 46u64, 47u64,
                ]
            },
            stalls: {
                current_some: 10i64,
                current_full: 20i64,
                rate_some: 1i64,
                rate_full: 2i64,
                rate_interval_s: 60i64
            }
        });
    }

    #[test]
    fn test_digest_service_capture_on_pressure_change_and_wait() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let data_provider = Arc::new(FakeAttributionDataProvider {});
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: true,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            data_provider,
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        // Expect digest_service to register a watcher, answer with an initial
        // pressure level, and return the watcher for further signaling. Panic
        // if this whole transaction is not immediately ready.
        let Poll::Ready(watcher) = exec
            .run_until_stalled(
                &mut pressure_request_stream
                    .then(|request| async {
                        let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                            request.expect("digest_service failed to register a watcher");
                        let watcher = watcher.into_proxy();
                        watcher.on_level_changed(fpressure::Level::Normal).await.expect(
                            "digest_service failed to acknowledge the initial pressure level",
                        );
                        watcher
                    })
                    .boxed()
                    .into_future(),
            )
            .map(|(watcher, _)| {
                watcher.ok_or_else(|| anyhow::Error::msg("failed to register watcher"))
            })?
        else {
            panic!("digest_service failed to register a watcher");
        };
        // Send a pressure signal to trigger a capture.
        assert!(exec
            .run_until_stalled(&mut watcher.on_level_changed(fpressure::Level::Warning))?
            .is_ready());
        // Ensure that digest_service has an opportunity to react to the pressure signal.
        let _ = exec.run_until_stalled(&mut digest_service);

        // Fake the passage of time, so that digest_service may do another capture.
        assert!(exec
            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
                exec.now() + Duration::from_secs(10).into()
            )))
            .is_ready());
        // Ensure that digest_service has an opportunity to react to the passage of time.
        let _ = exec.run_until_stalled(&mut digest_service)?;

        // This should resolve immediately because the inspect hierarchy has been populated by now.
        let Poll::Ready(output) = exec
            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
            .map(|r| r.expect("got hierarchy"))
        else {
            panic!("Couldn't retrieve inspect output");
        };

        assert_data_tree!(output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    // Corresponds to the capture on pressure change
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, // Undigested: matches the single unmatched VMO
                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
                            2u64,    // Free
                            14u64,   // [Addl]PagerTotal
                            15u64,   // [Addl]PagerNewest
                            16u64,   // [Addl]PagerOldest
                            18u64,   // [Addl]DiscardableLocked
                            19u64,   // [Addl]DiscardableUnlocked
                            21u64,   // [Addl]ZramCompressedBytes
                        ],
                    },
                    // Corresponds to the capture after the passage of time
                    "1": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, // Undigested: matches the single unmatched VMO
                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
                            2u64,    // Free
                            14u64,   // [Addl]PagerTotal
                            15u64,   // [Addl]PagerNewest
                            16u64,   // [Addl]PagerOldest
                            18u64,   // [Addl]DiscardableLocked
                            19u64,   // [Addl]DiscardableUnlocked
                            21u64,   // [Addl]ZramCompressedBytes
                        ],
                    },
                },
            },
        });
        Ok(())
    }

    #[test]
    fn test_digest_service_wait() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let data_provider = Arc::new(FakeAttributionDataProvider {});
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let inspector = fuchsia_inspect::Inspector::default();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: false,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            data_provider,
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        // digest_service registers a watcher; make sure we answer. Also, make sure not to drop
        // the proxy or the pressure stream; their early termination would be reported to
        // digest_service and interrupt it prematurely, before the timers have a chance to run.
        let Poll::Ready((_watcher, _pressure_stream)) = exec
            .run_until_stalled(
                &mut std::pin::pin!(pressure_request_stream.then(|request| async {
                    let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                        request.map_err(anyhow::Error::from)?;
                    let watcher_proxy = watcher.into_proxy();
                    let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                    Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
                }))
                .into_future(),
            )
            .map(|(watcher, pressure_stream)| {
                (
                    watcher.ok_or_else(|| {
                        anyhow::Error::msg("Pressure stream unexpectedly exhausted")
                    }),
                    pressure_stream,
                )
            })
        else {
            panic!("Failed to register the watcher");
        };

        // Give digest_service the opportunity to set up its timers.
        let _ = exec.run_until_stalled(&mut digest_service)?;
        // Fake the passage of time, so that digest_service does a capture.
        assert!(exec
            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
                exec.now() + Duration::from_secs(15).into()
            )))
            .is_ready());
        // Ensure that digest_service has an opportunity to react to the passage of time.
        assert!(exec.run_until_stalled(&mut digest_service).is_pending());
        // This should resolve immediately because the inspect hierarchy has been populated by now.
        let Poll::Ready(output) = exec
            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
            .map(|r| r.expect("got hierarchy"))
        else {
            panic!("Couldn't retrieve inspect output");
        };

        assert_data_tree!(output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    // Corresponds to the capture after the passage of time
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, // Undigested: matches the single unmatched VMO
                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
                            2u64,    // Free
                            14u64,   // [Addl]PagerTotal
                            15u64,   // [Addl]PagerNewest
                            16u64,   // [Addl]PagerOldest
                            18u64,   // [Addl]DiscardableLocked
                            19u64,   // [Addl]DiscardableUnlocked
                            21u64,   // [Addl]ZramCompressedBytes
                        ],
                    },
                },
            },
        });
        Ok(())
    }

    #[test]
    fn test_digest_service_no_capture_on_pressure_change() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new();
        let data_provider = Arc::new(FakeAttributionDataProvider {});
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let mut serve_pressure_stream = pressure_request_stream
            .then(|request| async {
                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                    request.map_err(anyhow::Error::from)?;
                let watcher_proxy = watcher.into_proxy();
                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
            })
            .boxed();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: false,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            data_provider,
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        let watcher =
            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
        let _ = exec.run_until_stalled(&mut digest_service);
        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        assert_data_tree!(output, root: {
            logger: {
                measurements: {},
            },
        });
        Ok(())
    }

    #[test]
    fn test_digest_service_capture_on_pressure_change() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new();
        let data_provider = Arc::new(FakeAttributionDataProvider {});
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let mut serve_pressure_stream = pressure_request_stream
            .then(|request| async {
                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                    request.map_err(anyhow::Error::from)?;
                let watcher_proxy = watcher.into_proxy();
                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
            })
            .boxed();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: true,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            data_provider,
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        let watcher =
            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
        let _ = exec.run_until_stalled(&mut digest_service);
        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        assert_data_tree!(output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, // Undigested: matches the single unmatched VMO
                            6u64,    // Orphaned: vmo_bytes reported by the kernel but not covered by any bucket
                            31u64,   // Kernel: 3 wired + 4 heap + 7 mmu + 8 IPC + 9 other = 31
                            2u64,    // Free
                            14u64,   // [Addl]PagerTotal
                            15u64,   // [Addl]PagerNewest
                            16u64,   // [Addl]PagerOldest
                            18u64,   // [Addl]DiscardableLocked
                            19u64,   // [Addl]DiscardableUnlocked
                            21u64,   // [Addl]ZramCompressedBytes
                        ],
                    },
                },
            },
        });
        Ok(())
    }
}