1use anyhow::{anyhow, Error, Result};
5use attribution_processing::digest::{BucketDefinition, Digest};
6use attribution_processing::AttributionDataProvider;
7use fpressure::WatcherRequest;
8use fuchsia_async::{MonotonicDuration, Task, WakeupTime};
9use fuchsia_inspect::{ArrayProperty, Inspector, Node};
10use fuchsia_inspect_contrib::nodes::BoundedListNode;
11use futures::{select, try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
12use inspect_runtime::PublishedInspectController;
13use log::debug;
14use memory_monitor2_config::Config;
15use stalls::StallProvider;
16use std::sync::Arc;
17use {
18 fidl_fuchsia_kernel as fkernel, fidl_fuchsia_memorypressure as fpressure, fuchsia_inspect as _,
19 inspect_runtime as _,
20};
21
22pub struct ServiceTask {
25 _inspect_controller: PublishedInspectController,
26 _periodic_digest: Task<Result<(), anyhow::Error>>,
27}
28
29pub fn start_service(
32 attribution_data_service: Arc<impl AttributionDataProvider>,
33 kernel_stats_proxy: fkernel::StatsProxy,
34 stall_provider: Arc<impl StallProvider>,
35 memory_monitor2_config: Config,
36 memorypressure_proxy: fpressure::ProviderProxy,
37 bucket_definitions: Vec<BucketDefinition>,
38) -> Result<ServiceTask> {
39 debug!("Start serving inspect tree.");
40
41 let inspector = fuchsia_inspect::component::inspector();
44
45 let inspect_controller =
47 inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default())
48 .ok_or_else(|| anyhow!("Failed to serve server handling `fuchsia.inspect.Tree`"))?;
49
50 build_inspect_tree(
51 attribution_data_service.clone(),
52 kernel_stats_proxy.clone(),
53 stall_provider,
54 inspector,
55 );
56 let digest_service = digest_service(
57 memory_monitor2_config,
58 attribution_data_service,
59 kernel_stats_proxy,
60 memorypressure_proxy,
61 bucket_definitions,
62 inspector.root().create_child("logger"),
63 )?;
64 Ok(ServiceTask { _inspect_controller: inspect_controller, _periodic_digest: digest_service })
65}
66
67fn build_inspect_tree(
68 attribution_data_service: Arc<impl AttributionDataProvider>,
69 kernel_stats_proxy: fkernel::StatsProxy,
70 stall_provider: Arc<impl StallProvider>,
71 inspector: &Inspector,
72) {
73 {
75 let kernel_stats_proxy = kernel_stats_proxy.clone();
76 inspector.root().record_lazy_child("kmem_stats", move || {
77 let kernel_stats_proxy = kernel_stats_proxy.clone();
78 async move {
79 let inspector = Inspector::default();
80 let root = inspector.root();
81 let mem_stats = kernel_stats_proxy.get_memory_stats().await?;
82 mem_stats.total_bytes.map(|v| root.record_uint("total_bytes", v));
83 mem_stats.free_bytes.map(|v| root.record_uint("free_bytes", v));
84 mem_stats.free_loaned_bytes.map(|v| root.record_uint("free_loaned_bytes", v));
85 mem_stats.wired_bytes.map(|v| root.record_uint("wired_bytes", v));
86 mem_stats.total_heap_bytes.map(|v| root.record_uint("total_heap_bytes", v));
87 mem_stats.free_heap_bytes.map(|v| root.record_uint("free_heap_bytes", v));
88 mem_stats.vmo_bytes.map(|v| root.record_uint("vmo_bytes", v));
89 mem_stats.mmu_overhead_bytes.map(|v| root.record_uint("mmu_overhead_bytes", v));
90 mem_stats.ipc_bytes.map(|v| root.record_uint("ipc_bytes", v));
91 mem_stats.cache_bytes.map(|v| root.record_uint("cache_bytes", v));
92 mem_stats.slab_bytes.map(|v| root.record_uint("slab_bytes", v));
93 mem_stats.zram_bytes.map(|v| root.record_uint("zram_bytes", v));
94 mem_stats.other_bytes.map(|v| root.record_uint("other_bytes", v));
95 mem_stats
96 .vmo_reclaim_total_bytes
97 .map(|v| root.record_uint("vmo_reclaim_total_bytes", v));
98 mem_stats
99 .vmo_reclaim_newest_bytes
100 .map(|v| root.record_uint("vmo_reclaim_newest_bytes", v));
101 mem_stats
102 .vmo_reclaim_oldest_bytes
103 .map(|v| root.record_uint("vmo_reclaim_oldest_bytes", v));
104 mem_stats
105 .vmo_reclaim_disabled_bytes
106 .map(|v| root.record_uint("vmo_reclaim_disabled_bytes", v));
107 mem_stats
108 .vmo_discardable_locked_bytes
109 .map(|v| root.record_uint("vmo_discardable_locked_bytes", v));
110 mem_stats
111 .vmo_discardable_unlocked_bytes
112 .map(|v| root.record_uint("vmo_discardable_unlocked_bytes", v));
113 Ok(inspector)
114 }
115 .boxed()
116 })
117 };
118
119 {
120 inspector.root().record_lazy_child("kmem_stats_compression", move || {
121 let kernel_stats_proxy = kernel_stats_proxy.clone();
122 async move {
123 let inspector = Inspector::default();
124 let cmp_stats = kernel_stats_proxy.get_memory_stats_compression().await?;
125 cmp_stats
126 .uncompressed_storage_bytes
127 .map(|v| inspector.root().record_uint("uncompressed_storage_bytes", v));
128 cmp_stats
129 .compressed_storage_bytes
130 .map(|v| inspector.root().record_uint("compressed_storage_bytes", v));
131 cmp_stats
132 .compressed_fragmentation_bytes
133 .map(|v| inspector.root().record_uint("compressed_fragmentation_bytes", v));
134 cmp_stats
135 .compression_time
136 .map(|v| inspector.root().record_int("compression_time", v));
137 cmp_stats
138 .decompression_time
139 .map(|v| inspector.root().record_int("decompression_time", v));
140 cmp_stats
141 .total_page_compression_attempts
142 .map(|v| inspector.root().record_uint("total_page_compression_attempts", v));
143 cmp_stats
144 .failed_page_compression_attempts
145 .map(|v| inspector.root().record_uint("failed_page_compression_attempts", v));
146 cmp_stats
147 .total_page_decompressions
148 .map(|v| inspector.root().record_uint("total_page_decompressions", v));
149 cmp_stats
150 .compressed_page_evictions
151 .map(|v| inspector.root().record_uint("compressed_page_evictions", v));
152 cmp_stats
153 .eager_page_compressions
154 .map(|v| inspector.root().record_uint("eager_page_compressions", v));
155 cmp_stats
156 .memory_pressure_page_compressions
157 .map(|v| inspector.root().record_uint("memory_pressure_page_compressions", v));
158 cmp_stats
159 .critical_memory_page_compressions
160 .map(|v| inspector.root().record_uint("critical_memory_page_compressions", v));
161 cmp_stats
162 .pages_decompressed_unit_ns
163 .map(|v| inspector.root().record_uint("pages_decompressed_unit_ns", v));
164 cmp_stats.pages_decompressed_within_log_time.map(|v| {
165 let array =
166 inspector.root().create_uint_array("pages_decompressed_within_log_time", 8);
167 array.set(0, v[0]);
169 array.set(1, v[1]);
170 array.set(2, v[2]);
171 array.set(3, v[3]);
172 array.set(4, v[4]);
173 array.set(5, v[5]);
174 array.set(6, v[6]);
175 array.set(7, v[7]);
176 inspector.root().record(array);
177 });
178 Ok(inspector)
179 }
180 .boxed()
181 });
182 }
183
184 {
185 inspector.root().record_lazy_child("current", move || {
186 let attribution_data_service = attribution_data_service.clone();
187 async move {
188 let inspector = Inspector::default();
189 let current_attribution_data =
190 attribution_data_service.get_attribution_data().await?;
191 let summary =
192 attribution_processing::attribute_vmos(current_attribution_data).summary();
193
194 summary.principals.into_iter().for_each(|p| {
195 let node = inspector.root().create_child(p.name);
196 node.record_uint("committed_private", p.committed_private);
197 node.record_double("committed_scaled", p.committed_scaled);
198 node.record_uint("committed_total", p.committed_total);
199 node.record_uint("populated_private", p.populated_private);
200 node.record_double("populated_scaled", p.populated_scaled);
201 node.record_uint("populated_total", p.populated_total);
202 inspector.root().record(node);
203 });
204 Ok(inspector)
205 }
206 .boxed()
207 });
208 }
209
210 {
211 inspector.root().record_lazy_child("stalls", move || {
212 let stall_info = stall_provider.get_stall_info().unwrap();
213 let stall_rate_opt = stall_provider.get_stall_rate();
214 async move {
215 let inspector = Inspector::default();
216 inspector.root().record_int("current_some", stall_info.stall_time_some);
217 inspector.root().record_int("current_full", stall_info.stall_time_full);
218 if let Some(stall_rate) = stall_rate_opt {
219 inspector
220 .root()
221 .record_int("rate_interval_s", stall_rate.interval.into_seconds());
222 inspector.root().record_int("rate_some", stall_rate.rate_some);
223 inspector.root().record_int("rate_full", stall_rate.rate_full);
224 }
225 Ok(inspector)
226 }
227 .boxed()
228 });
229 }
230}
231
/// Registers a memory pressure watcher and spawns the task that periodically
/// records memory digests under `digest_node`.
///
/// The spawned task:
/// 1. waits for the first `OnLevelChanged` request and uses its level as the baseline;
/// 2. arms a timer whose delay is picked from `memory_monitor2_config` based on
///    the current pressure level;
/// 3. on every timer expiry (and on every level change when
///    `capture_on_pressure_change` is set) fetches attribution data and kernel
///    memory statistics, builds a `Digest`, and appends the bucket sizes to the
///    `measurements` list.
///
/// # Errors
///
/// Returns an error immediately if registering the watcher fails. The returned
/// task itself resolves to an error when the pressure stream ends or yields an
/// error, when acknowledging a request fails, or when any of the data fetches
/// fails; the loop has no successful exit path.
fn digest_service(
    memory_monitor2_config: Config,
    attribution_data_service: Arc<impl AttributionDataProvider + 'static>,
    kernel_stats_proxy: fkernel::StatsProxy,
    memorypressure_proxy: fpressure::ProviderProxy,
    bucket_definitions: Vec<BucketDefinition>,
    digest_node: Node,
) -> Result<Task<Result<(), Error>>> {
    // Create the watcher endpoints and hand the client end to the pressure provider.
    let (watcher, watcher_stream) =
        fidl::endpoints::create_request_stream::<fpressure::WatcherMarker>();
    memorypressure_proxy.register_watcher(watcher)?;

    Ok(fuchsia_async::Task::spawn(async move {
        // Measurements are stored in a bounded list created with a capacity of 100.
        let mut buckets_list_node =
            BoundedListNode::new(digest_node.create_child("measurements"), 100);
        // Holds the `buckets` string array once the first digest has been computed.
        let buckets_names = std::cell::OnceCell::new();
        let attribution_data_service = attribution_data_service;
        let pressure_stream = watcher_stream.map_err(anyhow::Error::from);

        // The first request provides the baseline pressure level.
        let (request, mut pressure_stream) = pressure_stream.into_future().await;
        let WatcherRequest::OnLevelChanged { level, responder } = request.ok_or_else(|| {
            anyhow::Error::msg(
                "Unexpectedly exhausted pressure stream before receiving baseline pressure level",
            )
        })??;
        // Acknowledge the request.
        responder.send()?;
        let mut current_level = level;
        // Builds a fused timer whose delay depends on the given pressure level.
        let new_timer = |level| {
            MonotonicDuration::from_seconds(match level {
                fpressure::Level::Normal => memory_monitor2_config.normal_capture_delay_s,
                fpressure::Level::Warning => memory_monitor2_config.warning_capture_delay_s,
                fpressure::Level::Critical => memory_monitor2_config.critical_capture_delay_s,
            } as i64)
            .into_timer()
            .boxed()
            .fuse()
        };
        let mut timer = new_timer(current_level);
        loop {
            // Wait for either a pressure level change or the capture timer.
            // Falling through the `select!` means "capture now"; `continue`
            // means "keep waiting".
            let () = select! {
                pressure = pressure_stream.next() =>
                    match pressure.ok_or_else(|| anyhow::Error::msg("Unexpectedly exhausted pressure stream"))?? {
                        WatcherRequest::OnLevelChanged{level, responder} => {
                            responder.send()?;
                            // Same level as before: nothing changes, the timer keeps running.
                            if level == current_level { continue; }
                            current_level = level;
                            // The delay depends on the level, so re-arm the timer.
                            timer = new_timer(level);
                            if !memory_monitor2_config.capture_on_pressure_change { continue; }
                        },
                    },
                // Timer expired: re-arm it for the current level and capture.
                _ = timer => {timer = new_timer(current_level);}
            };

            // Fetch all inputs concurrently; the first failure ends the task.
            let (attribution_data, kmem_stats, kmem_stats_compression) = try_join!(
                attribution_data_service.get_attribution_data(),
                kernel_stats_proxy.get_memory_stats().map_err(anyhow::Error::from),
                kernel_stats_proxy.get_memory_stats_compression().map_err(anyhow::Error::from)
            )?;

            // Compute the digest for this capture.
            let Digest { buckets } = Digest::new(
                &attribution_data,
                kmem_stats,
                kmem_stats_compression,
                &bucket_definitions,
            );

            // The bucket names are written only once, on the first capture; the
            // resulting property is stored in the `OnceCell`.
            let _ = buckets_names.get_or_init(|| {
                let buckets_names = digest_node.create_string_array("buckets", buckets.len());
                for (i, attribution_processing::digest::Bucket { name, .. }) in
                    buckets.iter().enumerate()
                {
                    buckets_names.set(i, name);
                }
                buckets_names
            });

            // Append one entry holding the boot timestamp and the bucket sizes,
            // in the same order as the `buckets` names array.
            buckets_list_node.add_entry(|n| {
                n.record_int("timestamp", zx::BootInstant::get().into_nanos());
                let ia = n.create_uint_array("bucket_sizes", buckets.len());
                for (i, b) in buckets.iter().enumerate() {
                    ia.set(i, b.size as u64);
                }
                n.record(ia);
            });
        }
    }))
}
333
334#[cfg(test)]
335mod tests {
336 use attribution_processing::{
337 Attribution, AttributionData, Principal, PrincipalDescription, PrincipalIdentifier,
338 PrincipalType, Resource, ResourceReference,
339 };
340 use diagnostics_assertions::{assert_data_tree, NonZeroIntProperty};
341 use fuchsia_async::TestExecutor;
342 use futures::future::BoxFuture;
343 use futures::task::Poll;
344 use futures::TryStreamExt;
345 use {
346 fidl_fuchsia_memory_attribution_plugin as fplugin,
347 fidl_fuchsia_memorypressure as fpressure, fuchsia_async as fasync,
348 };
349
350 use super::*;
351 use std::time::Duration;
352
353 struct FakeAttributionDataProvider {}
354
355 impl AttributionDataProvider for FakeAttributionDataProvider {
356 fn get_attribution_data(&self) -> BoxFuture<'_, Result<AttributionData, anyhow::Error>> {
357 async {
358 Ok(AttributionData {
359 principals_vec: vec![Principal {
360 identifier: PrincipalIdentifier(1),
361 description: PrincipalDescription::Component("principal".to_owned()),
362 principal_type: PrincipalType::Runnable,
363 parent: None,
364 }],
365 resources_vec: vec![Resource {
366 koid: 10,
367 name_index: 0,
368 resource_type: fplugin::ResourceType::Vmo(fplugin::Vmo {
369 parent: None,
370 private_committed_bytes: Some(1024),
371 private_populated_bytes: Some(2048),
372 scaled_committed_bytes: Some(1024),
373 scaled_populated_bytes: Some(2048),
374 total_committed_bytes: Some(1024),
375 total_populated_bytes: Some(2048),
376 ..Default::default()
377 }),
378 }],
379 resource_names: vec!["resource".to_owned()],
380 attributions: vec![Attribution {
381 source: PrincipalIdentifier(1),
382 subject: PrincipalIdentifier(1),
383 resources: vec![ResourceReference::KernelObject(10)],
384 }],
385 })
386 }
387 .boxed()
388 }
389 }
390
391 async fn serve_kernel_stats(
392 mut request_stream: fkernel::StatsRequestStream,
393 ) -> Result<(), fidl::Error> {
394 while let Some(request) = request_stream.try_next().await? {
395 match request {
396 fkernel::StatsRequest::GetMemoryStats { responder } => {
397 responder
398 .send(&fkernel::MemoryStats {
399 total_bytes: Some(1),
400 free_bytes: Some(2),
401 wired_bytes: Some(3),
402 total_heap_bytes: Some(4),
403 free_heap_bytes: Some(5),
404 vmo_bytes: Some(6),
405 mmu_overhead_bytes: Some(7),
406 ipc_bytes: Some(8),
407 other_bytes: Some(9),
408 free_loaned_bytes: Some(10),
409 cache_bytes: Some(11),
410 slab_bytes: Some(12),
411 zram_bytes: Some(13),
412 vmo_reclaim_total_bytes: Some(14),
413 vmo_reclaim_newest_bytes: Some(15),
414 vmo_reclaim_oldest_bytes: Some(16),
415 vmo_reclaim_disabled_bytes: Some(17),
416 vmo_discardable_locked_bytes: Some(18),
417 vmo_discardable_unlocked_bytes: Some(19),
418 ..Default::default()
419 })
420 .unwrap();
421 }
422 fkernel::StatsRequest::GetMemoryStatsExtended { responder: _ } => {
423 unimplemented!("Deprecated call, should not be used")
424 }
425 fkernel::StatsRequest::GetMemoryStatsCompression { responder } => {
426 responder
427 .send(&fkernel::MemoryStatsCompression {
428 uncompressed_storage_bytes: Some(20),
429 compressed_storage_bytes: Some(21),
430 compressed_fragmentation_bytes: Some(22),
431 compression_time: Some(23),
432 decompression_time: Some(24),
433 total_page_compression_attempts: Some(25),
434 failed_page_compression_attempts: Some(26),
435 total_page_decompressions: Some(27),
436 compressed_page_evictions: Some(28),
437 eager_page_compressions: Some(29),
438 memory_pressure_page_compressions: Some(30),
439 critical_memory_page_compressions: Some(31),
440 pages_decompressed_unit_ns: Some(32),
441 pages_decompressed_within_log_time: Some([
442 40, 41, 42, 43, 44, 45, 46, 47,
443 ]),
444 ..Default::default()
445 })
446 .unwrap();
447 }
448 fkernel::StatsRequest::GetCpuStats { responder: _ } => unimplemented!(),
449 fkernel::StatsRequest::GetCpuLoad { duration: _, responder: _ } => unimplemented!(),
450 }
451 }
452 Ok(())
453 }
454
455 #[test]
456 fn test_build_inspect_tree() {
457 let mut exec = fasync::TestExecutor::new();
458
459 let data_provider = Arc::new(FakeAttributionDataProvider {});
460
461 let (stats_provider, stats_request_stream) =
462 fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();
463
464 fasync::Task::spawn(async move {
465 serve_kernel_stats(stats_request_stream).await.unwrap();
466 })
467 .detach();
468
469 let inspector = fuchsia_inspect::Inspector::default();
470
471 struct FakeStallProvider {}
472 impl StallProvider for FakeStallProvider {
473 fn get_stall_info(&self) -> Result<zx::MemoryStall, anyhow::Error> {
474 Ok(zx::MemoryStall { stall_time_some: 10, stall_time_full: 20 })
475 }
476
477 fn get_stall_rate(&self) -> Option<stalls::MemoryStallRate> {
478 Some(stalls::MemoryStallRate {
479 interval: fasync::MonotonicDuration::from_seconds(60),
480 rate_some: 1,
481 rate_full: 2,
482 })
483 }
484 }
485
486 build_inspect_tree(
487 data_provider,
488 stats_provider,
489 Arc::new(FakeStallProvider {}),
490 &inspector,
491 );
492
493 let output = exec
494 .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
495 .expect("got hierarchy");
496
497 assert_data_tree!(output, root: {
498 current: {
499 principal: {
500 committed_private: 1024u64,
501 committed_scaled: 1024.0,
502 committed_total: 1024u64,
503 populated_private: 2048u64,
504 populated_scaled: 2048.0,
505 populated_total: 2048u64
506 }
507 },
508 kmem_stats: {
509 total_bytes: 1u64,
510 free_bytes: 2u64,
511 wired_bytes: 3u64,
512 total_heap_bytes: 4u64,
513 free_heap_bytes: 5u64,
514 vmo_bytes: 6u64,
515 mmu_overhead_bytes: 7u64,
516 ipc_bytes: 8u64,
517 other_bytes: 9u64,
518 free_loaned_bytes: 10u64,
519 cache_bytes: 11u64,
520 slab_bytes: 12u64,
521 zram_bytes: 13u64,
522 vmo_reclaim_total_bytes: 14u64,
523 vmo_reclaim_newest_bytes: 15u64,
524 vmo_reclaim_oldest_bytes: 16u64,
525 vmo_reclaim_disabled_bytes: 17u64,
526 vmo_discardable_locked_bytes: 18u64,
527 vmo_discardable_unlocked_bytes: 19u64
528 },
529 kmem_stats_compression: {
530 uncompressed_storage_bytes: 20u64,
531 compressed_storage_bytes: 21u64,
532 compressed_fragmentation_bytes: 22u64,
533 compression_time: 23i64,
534 decompression_time: 24i64,
535 total_page_compression_attempts: 25u64,
536 failed_page_compression_attempts: 26u64,
537 total_page_decompressions: 27u64,
538 compressed_page_evictions: 28u64,
539 eager_page_compressions: 29u64,
540 memory_pressure_page_compressions: 30u64,
541 critical_memory_page_compressions: 31u64,
542 pages_decompressed_unit_ns: 32u64,
543 pages_decompressed_within_log_time: vec![
544 40u64, 41u64, 42u64, 43u64, 44u64, 45u64, 46u64, 47u64,
545 ]
546 },
547 stalls: {
548 current_some: 10i64,
549 current_full: 20i64,
550 rate_some: 1i64,
551 rate_full: 2i64,
552 rate_interval_s: 60i64
553 }
554 });
555 }
556
    /// With `capture_on_pressure_change` enabled, expects one measurement from a
    /// pressure level change and a second one after the capture delay elapses.
    #[test]
    fn test_digest_service_capture_on_pressure_change_and_wait() -> anyhow::Result<()> {
        // Fake time: the capture timer only fires when the test advances the clock.
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let data_provider = Arc::new(FakeAttributionDataProvider {});
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        // Serve the fake kernel stats in the background.
        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: true,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            data_provider,
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        // Expect digest_service to register a watcher, and send it the initial
        // (baseline) pressure level, which it must acknowledge.
        let Poll::Ready(watcher) = exec
            .run_until_stalled(
                &mut pressure_request_stream
                    .then(|request| async {
                        let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                            request.expect("digest_service failed to register a watcher");
                        let watcher = watcher.into_proxy();
                        watcher.on_level_changed(fpressure::Level::Normal).await.expect(
                            "digest_service failed to acknowledge the initial pressure level",
                        );
                        watcher
                    })
                    .boxed()
                    .into_future(),
            )
            .map(|(watcher, _)| {
                watcher.ok_or_else(|| anyhow::Error::msg("failed to register watcher"))
            })?
        else {
            panic!("digest_service failed to register a watcher");
        };
        // Send a different pressure level and wait for the acknowledgement.
        assert!(exec
            .run_until_stalled(&mut watcher.on_level_changed(fpressure::Level::Warning))?
            .is_ready());
        // Give digest_service a chance to process the level change (first capture).
        let _ = exec.run_until_stalled(&mut digest_service);

        // Advance the fake clock by the configured capture delay (10 s).
        assert!(exec
            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
                exec.now() + Duration::from_secs(10).into()
            )))
            .is_ready());
        // Give digest_service a chance to react to the expired timer (second
        // capture); `?` surfaces an error if the task already failed.
        let _ = exec.run_until_stalled(&mut digest_service)?;

        // Read back what was written to inspect.
        let Poll::Ready(output) = exec
            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
            .map(|r| r.expect("got hierarchy"))
        else {
            panic!("Couldn't retrieve inspect output");
        };

        // Two measurements are expected. `bucket_sizes` is written in the same
        // order as `buckets`; the sizes presumably derive from the fake
        // attribution data and fake kernel stats (e.g. Free = free_bytes = 2).
        assert_data_tree!(output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64, ],
                    },
                    "1": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64, ],
                    },
                },
            },
        });
        Ok(())
    }
687
    /// With `capture_on_pressure_change` disabled and no level change, expects
    /// exactly one measurement once the fake clock passes the capture delay.
    #[test]
    fn test_digest_service_wait() -> anyhow::Result<()> {
        // Fake time: the capture timer only fires when the test advances the clock.
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        let data_provider = Arc::new(FakeAttributionDataProvider {});
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        // Serve the fake kernel stats in the background.
        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        let inspector = fuchsia_inspect::Inspector::default();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: false,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            data_provider,
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        // Expect digest_service to register a watcher and send it the baseline
        // level. The results are bound to named placeholders, so they are kept
        // until the end of the test rather than dropped right away.
        let Poll::Ready((_watcher, _pressure_stream)) = exec
            .run_until_stalled(
                &mut std::pin::pin!(pressure_request_stream.then(|request| async {
                    let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                        request.map_err(anyhow::Error::from)?;
                    let watcher_proxy = watcher.into_proxy();
                    let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                    Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
                }))
                .into_future(),
            )
            .map(|(watcher, pressure_stream)| {
                (
                    watcher.ok_or_else(|| {
                        anyhow::Error::msg("Pressure stream unexpectedly exhausted")
                    }),
                    pressure_stream,
                )
            })
        else {
            panic!("Failed to register the watcher");
        };

        // Let digest_service run until it has nothing left to do.
        let _ = exec.run_until_stalled(&mut digest_service)?;
        // Advance the fake clock by 15 s, past the 10 s capture delay.
        assert!(exec
            .run_until_stalled(&mut std::pin::pin!(TestExecutor::advance_to(
                exec.now() + Duration::from_secs(15).into()
            )))
            .is_ready());
        // The task performs its capture and must still be running afterwards.
        assert!(exec.run_until_stalled(&mut digest_service).is_pending());
        // Read back what was written to inspect.
        let Poll::Ready(output) = exec
            .run_until_stalled(&mut fuchsia_inspect::reader::read(&inspector).boxed())
            .map(|r| r.expect("got hierarchy"))
        else {
            panic!("Couldn't retrieve inspect output");
        };

        // A single, timer-triggered measurement is expected.
        assert_data_tree!(output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64, ],
                    },
                },
            },
        });
        Ok(())
    }
796
    /// With `capture_on_pressure_change` disabled, a pressure level change alone
    /// must not produce any measurement.
    #[test]
    fn test_digest_service_no_capture_on_pressure_change() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new();
        let data_provider = Arc::new(FakeAttributionDataProvider {});
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        // Serve the fake kernel stats in the background.
        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        // For each watcher registration: send the baseline level (Normal), wait
        // for the acknowledgement, and yield the watcher proxy.
        let mut serve_pressure_stream = pressure_request_stream
            .then(|request| async {
                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                    request.map_err(anyhow::Error::from)?;
                let watcher_proxy = watcher.into_proxy();
                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
            })
            .boxed();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: false,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            data_provider,
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        // Obtain the watcher registered by digest_service.
        let watcher =
            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
        // Change the pressure level and wait for the acknowledgement.
        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
        // Give digest_service a chance to write to inspect, if it were to do so.
        let _ = exec.run_until_stalled(&mut digest_service);
        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        // No capture happened: the measurement list is empty and the `buckets`
        // array was never created.
        assert_data_tree!(output, root: {
            logger: {
                measurements: {},
            },
        });
        Ok(())
    }
850
    /// With `capture_on_pressure_change` enabled, a pressure level change must
    /// produce exactly one measurement.
    #[test]
    fn test_digest_service_capture_on_pressure_change() -> anyhow::Result<()> {
        let mut exec = fasync::TestExecutor::new();
        let data_provider = Arc::new(FakeAttributionDataProvider {});
        let (stats_provider, stats_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fkernel::StatsMarker>();

        // Serve the fake kernel stats in the background.
        fasync::Task::spawn(async move {
            serve_kernel_stats(stats_request_stream).await.unwrap();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let (pressure_provider, pressure_request_stream) =
            fidl::endpoints::create_proxy_and_stream::<fpressure::ProviderMarker>();
        // For each watcher registration: send the baseline level (Normal), wait
        // for the acknowledgement, and yield the watcher proxy.
        let mut serve_pressure_stream = pressure_request_stream
            .then(|request| async {
                let fpressure::ProviderRequest::RegisterWatcher { watcher, .. } =
                    request.map_err(anyhow::Error::from)?;
                let watcher_proxy = watcher.into_proxy();
                let _ = watcher_proxy.on_level_changed(fpressure::Level::Normal).await?;
                Ok::<fpressure::WatcherProxy, anyhow::Error>(watcher_proxy)
            })
            .boxed();
        let mut digest_service = std::pin::pin!(digest_service(
            Config {
                capture_on_pressure_change: true,
                imminent_oom_capture_delay_s: 10,
                critical_capture_delay_s: 10,
                warning_capture_delay_s: 10,
                normal_capture_delay_s: 10,
            },
            data_provider,
            stats_provider,
            pressure_provider,
            vec![],
            inspector.root().create_child("logger"),
        )?);
        // Obtain the watcher registered by digest_service.
        let watcher =
            exec.run_singlethreaded(serve_pressure_stream.next()).transpose()?.expect("watcher");
        // Change the pressure level and wait for the acknowledgement.
        let _ = exec.run_singlethreaded(watcher.on_level_changed(fpressure::Level::Warning))?;
        // Give digest_service a chance to capture and write to inspect.
        let _ = exec.run_until_stalled(&mut digest_service);
        let output = exec
            .run_singlethreaded(fuchsia_inspect::reader::read(&inspector))
            .expect("got hierarchy");

        // One measurement, triggered by the level change, is expected.
        assert_data_tree!(output, root: {
            logger: {
                buckets: vec![
                    "Undigested",
                    "Orphaned",
                    "Kernel",
                    "Free",
                    "[Addl]PagerTotal",
                    "[Addl]PagerNewest",
                    "[Addl]PagerOldest",
                    "[Addl]DiscardableLocked",
                    "[Addl]DiscardableUnlocked",
                    "[Addl]ZramCompressedBytes",
                ],
                measurements: {
                    "0": {
                        timestamp: NonZeroIntProperty,
                        bucket_sizes: vec![
                            1024u64, 6u64, 31u64, 2u64, 14u64, 15u64, 16u64, 18u64, 19u64, 21u64, ],
                    },
                },
            },
        });
        Ok(())
    }
932}