1use crate::task_metrics::component_stats::ComponentStats;
6use crate::task_metrics::constants::*;
7use crate::task_metrics::measurement::{Measurement, MeasurementsQueue};
8use crate::task_metrics::runtime_stats_source::{
9 ComponentStartedInfo, RuntimeStatsContainer, RuntimeStatsSource,
10};
11use crate::task_metrics::task_info::{TaskInfo, create_cpu_histogram};
12use async_trait::async_trait;
13use errors::ModelError;
14use fidl_fuchsia_component_runner::Task as DiagnosticsTask;
15use fuchsia_async as fasync;
16use fuchsia_inspect::{self as inspect, ArrayProperty, HistogramProperty};
17use fuchsia_sync::Mutex;
18use futures::channel::{mpsc, oneshot};
19use futures::{FutureExt, StreamExt};
20use hooks::{Event, EventPayload, EventType, HasEventType, Hook, HooksRegistration};
21use injectable_time::{BootInstant, TimeSource};
22use log::warn;
23use moniker::{ExtendedMoniker, Moniker};
24use std::collections::{BTreeMap, VecDeque};
25use std::fmt::Debug;
26use std::sync::{Arc, Weak};
27use zx::{self as zx, HandleBased, sys as zx_sys};
28
29macro_rules! maybe_return {
30 ($e:expr) => {
31 match $e {
32 None => return,
33 Some(v) => v,
34 }
35 };
36}
37
38const MAX_INSPECT_SIZE : usize = 2 * 1024 * 1024 ;
39
40const AGGREGATE_SAMPLES: &'static str = "@aggregated";
41
42pub struct ComponentTreeStats<T: RuntimeStatsSource + Debug> {
44 tree: Mutex<BTreeMap<ExtendedMoniker, Arc<Mutex<ComponentStats<T>>>>>,
46
47 tasks: Mutex<BTreeMap<zx_sys::zx_koid_t, Weak<TaskInfo<T>>>>,
50
51 node: inspect::Node,
53
54 histograms_node: inspect::Node,
56
57 processing_times: inspect::IntExponentialHistogramProperty,
59
60 sampler_task: Mutex<Option<fasync::Task<()>>>,
62
63 totals: Mutex<AggregatedStats>,
65
66 _wait_diagnostics_drain: fasync::Task<()>,
67
68 diagnostics_waiter_task_sender: mpsc::UnboundedSender<fasync::Task<()>>,
69
70 time_source: Arc<dyn TimeSource + Send + Sync>,
71
72 aggregated_dead_task_data: Mutex<MeasurementsQueue>,
76
77 exited_measurements: Mutex<Measurement>,
79}
80
81impl<T: 'static + RuntimeStatsSource + Debug + Send + Sync> ComponentTreeStats<T> {
82 pub fn new(node: inspect::Node) -> Arc<Self> {
83 Self::new_with_timesource(node, Arc::new(BootInstant::new()))
84 }
85
86 fn new_with_timesource(
87 node: inspect::Node,
88 time_source: Arc<dyn TimeSource + Send + Sync>,
89 ) -> Arc<Self> {
90 let processing_times = node.create_int_exponential_histogram(
91 "processing_times_ns",
92 inspect::ExponentialHistogramParams {
93 floor: 1000,
94 initial_step: 1000,
95 step_multiplier: 2,
96 buckets: 16,
97 },
98 );
99
100 let histograms_node = node.create_child("histograms");
101 let totals = Mutex::new(AggregatedStats::new());
102 let (snd, rcv) = mpsc::unbounded();
103 let this = Arc::new(Self {
104 tree: Mutex::new(BTreeMap::new()),
105 tasks: Mutex::new(BTreeMap::new()),
106 node,
107 histograms_node,
108 processing_times,
109 sampler_task: Mutex::new(None),
110 totals,
111 diagnostics_waiter_task_sender: snd,
112 _wait_diagnostics_drain: fasync::Task::spawn(async move {
113 rcv.for_each_concurrent(None, |rx| async move { rx.await }).await;
114 }),
115 time_source: time_source.clone(),
116 aggregated_dead_task_data: Mutex::new(MeasurementsQueue::new(
117 COMPONENT_CPU_MAX_SAMPLES,
118 time_source,
119 )),
120 exited_measurements: Mutex::new(Measurement::default()),
121 });
122
123 let weak_self = Arc::downgrade(&this);
124
125 let weak_self_for_fut = weak_self.clone();
126 this.node.record_lazy_child("measurements", move || {
127 let weak_self_clone = weak_self_for_fut.clone();
128 async move {
129 if let Some(this) = weak_self_clone.upgrade() {
130 Ok(this.write_measurements_to_inspect())
131 } else {
132 Ok(inspect::Inspector::default())
133 }
134 }
135 .boxed()
136 });
137
138 let weak_self_clone_for_fut = weak_self.clone();
139 this.node.record_lazy_child("recent_usage", move || {
140 let weak_self_clone = weak_self_clone_for_fut.clone();
141 async move {
142 if let Some(this) = weak_self_clone.upgrade() {
143 Ok(this.write_recent_usage_to_inspect())
144 } else {
145 Ok(inspect::Inspector::default())
146 }
147 }
148 .boxed()
149 });
150 let weak_self_for_fut = weak_self;
151 this.node.record_lazy_child("@total", move || {
152 let weak_self_clone = weak_self_for_fut.clone();
153 async move {
154 if let Some(this) = weak_self_clone.upgrade() {
155 Ok(this.write_totals_to_inspect())
156 } else {
157 Ok(inspect::Inspector::default())
158 }
159 }
160 .boxed()
161 });
162
163 this
164 }
165
166 pub fn start_measuring(self: &Arc<Self>) {
169 let weak_self = Arc::downgrade(self);
170 self.measure();
171 *(self.sampler_task.lock()) = Some(fasync::Task::spawn(async move {
172 loop {
173 fasync::Timer::new(CPU_SAMPLE_PERIOD).await;
174 match weak_self.upgrade() {
175 None => break,
176 Some(this) => {
177 this.measure();
178 }
179 }
180 }
181 }));
182 }
183
184 fn track_ready(&self, moniker: ExtendedMoniker, task: T) {
186 let histogram = create_cpu_histogram(&self.histograms_node, &moniker);
187
188 if let Ok(task_info) = TaskInfo::try_from(task, Some(histogram), self.time_source.clone()) {
189 let koid = task_info.koid();
190 let arc_task_info = Arc::new(task_info);
191
192 let mut tree_guard = self.tree.lock();
193 let mut tasks_guard = self.tasks.lock();
194
195 let stats = tree_guard
196 .entry(moniker)
197 .or_insert_with(|| Arc::new(Mutex::new(ComponentStats::new())));
198
199 stats.lock().add_task(arc_task_info.clone());
200
201 tasks_guard.insert(koid, Arc::downgrade(&arc_task_info));
202 }
203 }
204
205 fn write_measurements_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
206 let inspector =
207 inspect::Inspector::new(inspect::InspectorConfig::default().size(MAX_INSPECT_SIZE));
208 let components = inspector.root().create_child("components");
209 let (component_count, task_count) = self.write_measurements(&components);
210 self.write_aggregate_measurements(&components);
211 inspector.root().record_uint("component_count", component_count);
212 inspector.root().record_uint("task_count", task_count);
213 inspector.root().record(components);
214
215 let stats_node = inspect::stats::StatsNode::new(&inspector);
216 stats_node.record_data_to(inspector.root());
217
218 inspector
219 }
220
221 fn write_recent_usage_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
222 let inspector = inspect::Inspector::default();
223 self.totals.lock().write_recents_to(inspector.root());
224 inspector
225 }
226
227 fn write_totals_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
228 let inspector = inspect::Inspector::default();
229 self.totals.lock().write_totals_to(inspector.root());
230 inspector
231 }
232
233 fn write_aggregate_measurements(&self, components_node: &inspect::Node) {
234 let locked_aggregate = self.aggregated_dead_task_data.lock();
235 if locked_aggregate.no_true_measurements() {
236 return;
237 }
238
239 let aggregate = components_node.create_child(&*AGGREGATE_SAMPLES);
240 locked_aggregate.record_to_node(&aggregate);
241 components_node.record(aggregate);
242 }
243
244 fn write_measurements(&self, node: &inspect::Node) -> (u64, u64) {
245 let mut task_count = 0;
246 let tree = self.tree.lock();
247 for (moniker, stats) in tree.iter() {
248 let stats_guard = stats.lock();
249 let key = match moniker {
250 ExtendedMoniker::ComponentManager => moniker.to_string(),
251 ExtendedMoniker::ComponentInstance(m) => {
252 if *m == Moniker::root() {
253 "<root>".to_string()
254 } else {
255 m.to_string()
256 }
257 }
258 };
259 let child = node.create_child(key);
260 task_count += stats_guard.record_to_node(&child);
261 node.record(child);
262 }
263 (tree.len() as u64, task_count)
264 }
265
266 pub fn measure(self: &Arc<Self>) {
270 let start = zx::BootInstant::get();
271
272 let stats = self
274 .tree
275 .lock()
276 .iter()
277 .map(|(k, v)| (k.clone(), Arc::downgrade(&v)))
278 .collect::<Vec<_>>();
279
280 let mut aggregated = Measurement::clone_with_time(&*self.exited_measurements.lock(), start);
281 let mut stats_to_remove = vec![];
282 let mut koids_to_remove = vec![];
283
284 for (moniker, weak_stats) in stats.into_iter() {
285 if let Some(stats) = weak_stats.upgrade() {
286 let mut stat_guard = stats.lock();
287 aggregated += &stat_guard.measure();
289 aggregated += &stat_guard.measure_tracked_dead_tasks();
290 let (mut stale_koids, exited_cpu_of_deleted) = stat_guard.clean_stale();
291 aggregated += &exited_cpu_of_deleted;
292
293 *self.exited_measurements.lock() += &exited_cpu_of_deleted;
294
295 koids_to_remove.append(&mut stale_koids);
296 if !stat_guard.is_alive() {
297 stats_to_remove.push(moniker);
298 }
299 }
300 }
301
302 let mut stats = self.tree.lock();
304 for moniker in stats_to_remove {
305 if let Some(stat) = stats.get(&moniker) {
308 if !stat.lock().is_alive() {
309 stats.remove(&moniker);
310 }
311 }
312 }
313
314 let mut tasks = self.tasks.lock();
315 for koid in koids_to_remove {
316 tasks.remove(&koid);
317 }
318
319 self.totals.lock().insert(aggregated);
320 self.processing_times.insert((zx::BootInstant::get() - start).into_nanos());
321 }
322
323 fn prune_dead_tasks(self: &Arc<Self>, max_dead_tasks: usize) {
324 let mut all_dead_tasks = BTreeMap::new();
325 for (_moniker, component) in self.tree.lock().iter() {
326 let dead_tasks = component.lock().gather_dead_tasks();
327 for (timestamp, task) in dead_tasks {
328 all_dead_tasks.insert(timestamp, task);
329 }
330 }
331
332 if all_dead_tasks.len() <= max_dead_tasks {
333 return;
334 }
335
336 let remove_count = all_dead_tasks.len() - (max_dead_tasks / 2);
337 let to_remove = all_dead_tasks.iter().take(remove_count);
338
339 let mut koids_to_remove = Vec::with_capacity(remove_count);
340
341 for (_, task) in to_remove {
342 if let Ok(measurements) = task.take_measurements_queue() {
343 koids_to_remove.push(task.koid());
344 *self.aggregated_dead_task_data.lock() += measurements;
345 }
346 }
347
348 self.tree.lock().retain(|_, stats| {
349 let mut stat_guard = stats.lock();
350 stat_guard.remove_by_koids(&koids_to_remove);
351 stat_guard.is_alive()
352 });
353
354 let mut tasks = self.tasks.lock();
355 for koid in &koids_to_remove {
356 tasks.remove(koid);
357 }
358 }
359
360 fn on_component_started<P, C>(self: &Arc<Self>, moniker: &Moniker, runtime: &P)
361 where
362 P: ComponentStartedInfo<C, T>,
363 C: RuntimeStatsContainer<T> + Send + Sync + 'static,
364 {
365 if let Some(receiver) = runtime.get_receiver() {
366 let task = fasync::Task::spawn(Self::diagnostics_waiter_task(
367 Arc::downgrade(&self),
368 moniker.clone().into(),
369 receiver,
370 runtime.start_time(),
371 ));
372 let _ = self.diagnostics_waiter_task_sender.unbounded_send(task);
373 }
374 }
375
376 async fn diagnostics_waiter_task<C>(
377 weak_self: Weak<Self>,
378 moniker: ExtendedMoniker,
379 receiver: oneshot::Receiver<C>,
380 start_time: zx::BootInstant,
381 ) where
382 C: RuntimeStatsContainer<T> + Send + Sync + 'static,
383 {
384 let mut source = maybe_return!(receiver.await.ok());
385 let this = maybe_return!(weak_self.upgrade());
386
387 let stats = {
388 let mut tree_lock = this.tree.lock();
389 if let Some(stats) = tree_lock.get(&moniker) {
390 stats.clone()
391 } else {
392 let stats = Arc::new(Mutex::new(ComponentStats::new()));
393 tree_lock.insert(moniker.clone(), stats.clone());
394 stats
395 }
396 };
397
398 let task = maybe_return!(source.take_component_task());
399
400 let histogram = create_cpu_histogram(&this.histograms_node, &moniker);
401 let task_info =
402 maybe_return!(TaskInfo::try_from(task, Some(histogram), this.time_source.clone()).ok());
403
404 let parent_koid = source
405 .take_parent_task()
406 .and_then(|task| TaskInfo::try_from(task, None, this.time_source.clone()).ok())
407 .map(|task| task.koid());
408
409 let koid = task_info.koid();
410
411 task_info.record_measurement_with_start_time(start_time);
417 task_info.measure_if_no_parent();
418
419 let task_info = {
420 let mut task_guard = this.tasks.lock();
421 let task_info = match parent_koid {
422 None => {
423 Arc::new(task_info)
426 }
427 Some(parent_koid) => {
428 task_info.stats.lock().has_parent_task = true;
429 let task_info = Arc::new(task_info);
430 if let Some(parent) = task_guard.get(&parent_koid).and_then(|p| p.upgrade()) {
431 parent.add_child(Arc::downgrade(&task_info));
432 }
433 task_info
434 }
435 };
436 task_guard.insert(koid, Arc::downgrade(&task_info));
437 task_info
438 };
439
440 stats.lock().add_task(task_info);
441
442 this.prune_dead_tasks(MAX_DEAD_TASKS);
443 }
444}
445
446impl ComponentTreeStats<DiagnosticsTask> {
447 pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
448 vec![HooksRegistration::new(
449 "ComponentTreeStats",
450 vec![EventType::Started],
451 Arc::downgrade(self) as Weak<dyn Hook>,
452 )]
453 }
454
455 pub fn track_component_manager_stats(&self) {
457 match fuchsia_runtime::job_default().duplicate_handle(zx::Rights::SAME_RIGHTS) {
458 Ok(job) => {
459 self.track_ready(ExtendedMoniker::ComponentManager, DiagnosticsTask::Job(job));
460 }
461 Err(err) => warn!(
462 "Failed to duplicate component manager job. Not tracking its own stats: {:?}",
463 err
464 ),
465 }
466 }
467}
468
469#[async_trait]
470impl Hook for ComponentTreeStats<DiagnosticsTask> {
471 async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
472 let target_moniker = event
473 .target_moniker
474 .unwrap_instance_moniker_or(ModelError::UnexpectedComponentManagerMoniker)?;
475 match event.event_type() {
476 EventType::Started => {
477 if let EventPayload::Started { runtime, .. } = &event.payload {
478 self.on_component_started(target_moniker, runtime);
479 }
480 }
481 _ => {}
482 }
483 Ok(())
484 }
485}
486
487struct AggregatedStats {
488 measurements: VecDeque<Measurement>,
490}
491
492impl AggregatedStats {
493 fn new() -> Self {
494 Self { measurements: VecDeque::with_capacity(COMPONENT_CPU_MAX_SAMPLES) }
495 }
496
497 fn insert(&mut self, measurement: Measurement) {
498 while self.measurements.len() >= COMPONENT_CPU_MAX_SAMPLES {
499 self.measurements.pop_front();
500 }
501 self.measurements.push_back(measurement);
502 }
503
504 fn write_totals_to(&self, node: &inspect::Node) {
505 let count = self.measurements.len();
506 let timestamps = node.create_int_array(TIMESTAMPS, count);
507 let cpu_times = node.create_int_array(CPU_TIMES, count);
508 let queue_times = node.create_int_array(QUEUE_TIMES, count);
509 for (i, measurement) in self.measurements.iter().enumerate() {
510 timestamps.set(i, measurement.timestamp().into_nanos());
511 cpu_times.set(i, measurement.cpu_time().into_nanos());
512 queue_times.set(i, measurement.queue_time().into_nanos());
513 }
514 node.record(timestamps);
515 node.record(cpu_times);
516 node.record(queue_times);
517 }
518
519 fn write_recents_to(&self, node: &inspect::Node) {
520 if self.measurements.is_empty() {
521 return;
522 }
523 if self.measurements.len() >= 2 {
524 let measurement = self.measurements.get(self.measurements.len() - 2).unwrap();
525 node.record_int("previous_cpu_time", measurement.cpu_time().into_nanos());
526 node.record_int("previous_queue_time", measurement.queue_time().into_nanos());
527 node.record_int("previous_timestamp", measurement.timestamp().into_nanos());
528 }
529 let measurement = self.measurements.get(self.measurements.len() - 1).unwrap();
530 node.record_int("recent_cpu_time", measurement.cpu_time().into_nanos());
531 node.record_int("recent_queue_time", measurement.queue_time().into_nanos());
532 node.record_int("recent_timestamp", measurement.timestamp().into_nanos());
533 }
534}
535
536#[cfg(test)]
537mod tests {
538 use super::*;
539 use crate::task_metrics::testing::{FakeDiagnosticsContainer, FakeRuntime, FakeTask};
540 use diagnostics_assertions::{AnyProperty, assert_data_tree};
541 use diagnostics_hierarchy::DiagnosticsHierarchy;
542 use fuchsia_inspect::DiagnosticsHierarchyGetter;
543
544 use injectable_time::{FakeTime, IncrementingFakeTime};
545
546 #[fuchsia::test]
547 async fn total_tracks_cpu_after_termination() {
548 let inspector = inspect::Inspector::default();
549 let clock = Arc::new(FakeTime::new());
550 let stats = ComponentTreeStats::new_with_timesource(
551 inspector.root().create_child("stats"),
552 clock.clone(),
553 );
554
555 let mut previous_task_count = 0;
556 for i in 0..10 {
557 clock.add_ticks(1);
558 let component_task = FakeTask::new(
559 i as u64,
560 create_measurements_vec_for_fake_task(COMPONENT_CPU_MAX_SAMPLES as i64 * 3, 2, 4),
561 );
562
563 let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
564 let fake_runtime =
565 FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, None));
566 stats.on_component_started(&moniker, &fake_runtime);
567
568 loop {
569 let current = stats.tree.lock().len();
570 if current != previous_task_count {
571 previous_task_count = current;
572 break;
573 }
574 fasync::Timer::new(fasync::MonotonicInstant::after(
575 zx::MonotonicDuration::from_millis(100i64),
576 ))
577 .await;
578 }
579 }
580
581 assert_eq!(stats.tasks.lock().len(), 10);
582 assert_eq!(stats.tree.lock().len(), 10);
583
584 for _ in 0..=COMPONENT_CPU_MAX_SAMPLES - 2 {
585 stats.measure();
586 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
587 }
588
589 {
592 let totals = stats.totals.lock();
593 let recent_measurement = totals
594 .measurements
595 .get(totals.measurements.len() - 1)
596 .expect("there's at least one measurement");
597 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1180);
598 assert_eq!(recent_measurement.queue_time().into_nanos(), 2360);
599
600 let previous_measurement = totals
601 .measurements
602 .get(totals.measurements.len() - 2)
603 .expect("there's a previous measurement");
604 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1160);
605 assert_eq!(previous_measurement.queue_time().into_nanos(), 2320,);
606 }
607
608 for i in 0..10 {
610 let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
611
612 let tasks_to_terminate: Vec<_> = {
613 let tree_guard = stats.tree.lock();
614 let mut node_guard = tree_guard.get(&moniker.into()).unwrap().lock();
615
616 node_guard.tasks_mut().iter().map(|t| t.clone()).collect()
617 };
618
619 for task in tasks_to_terminate {
620 task.force_terminate().await;
621 clock.add_ticks(1);
624 }
625 }
626
627 {
629 let totals = stats.totals.lock();
630 let recent_measurement = totals
631 .measurements
632 .get(totals.measurements.len() - 1)
633 .expect("there's at least one measurement");
634 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1180);
635 assert_eq!(recent_measurement.queue_time().into_nanos(), 2360);
636
637 let previous_measurement = totals
638 .measurements
639 .get(totals.measurements.len() - 2)
640 .expect("there's a previous measurement");
641 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1160);
642 assert_eq!(previous_measurement.queue_time().into_nanos(), 2320);
643 }
644
645 stats.measure();
647 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
648
649 {
650 let totals = stats.totals.lock();
651 let recent_measurement = totals
652 .measurements
653 .get(totals.measurements.len() - 1)
654 .expect("there's at least one measurement");
655 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
656 assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);
657
658 let previous_measurement = totals
659 .measurements
660 .get(totals.measurements.len() - 2)
661 .expect("there's a previous measurement");
662 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1180);
663 assert_eq!(previous_measurement.queue_time().into_nanos(), 2360);
664 }
665
666 stats.measure();
667 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
668
669 {
670 let totals = stats.totals.lock();
671 let recent_measurement = totals
672 .measurements
673 .get(totals.measurements.len() - 1)
674 .expect("there's at least one measurement");
675 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
676 assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);
677
678 let previous_measurement = totals
679 .measurements
680 .get(totals.measurements.len() - 2)
681 .expect("there's a previous measurement");
682 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1200);
683 assert_eq!(previous_measurement.queue_time().into_nanos(), 2400);
684 }
685
686 for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
688 stats.measure();
689 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
690 }
691
692 assert_eq!(stats.tasks.lock().len(), 0);
694 assert_eq!(stats.tree.lock().len(), 0);
695
696 {
698 let totals = stats.totals.lock();
699 let recent_measurement = totals
700 .measurements
701 .get(totals.measurements.len() - 1)
702 .expect("there's at least one measurement");
703 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
704 assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);
705
706 let previous_measurement = totals
707 .measurements
708 .get(totals.measurements.len() - 2)
709 .expect("there's a previous measurement");
710 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1200);
711 assert_eq!(previous_measurement.queue_time().into_nanos(), 2400);
712 }
713 }
714
715 #[fuchsia::test]
716 async fn components_are_deleted_when_all_tasks_are_gone() {
717 let inspector = inspect::Inspector::default();
718 let clock = Arc::new(FakeTime::new());
719 let stats = ComponentTreeStats::new_with_timesource(
720 inspector.root().create_child("stats"),
721 clock.clone(),
722 );
723 let moniker: Moniker = ["a"].try_into().unwrap();
724 let moniker: ExtendedMoniker = moniker.into();
725 stats.track_ready(moniker.clone(), FakeTask::default());
726 for _ in 0..=COMPONENT_CPU_MAX_SAMPLES {
727 stats.measure();
728 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
729 }
730 assert_eq!(stats.tree.lock().len(), 1);
731 assert_eq!(stats.tasks.lock().len(), 1);
732 assert_eq!(
733 stats.tree.lock().get(&moniker).unwrap().lock().total_measurements(),
734 COMPONENT_CPU_MAX_SAMPLES
735 );
736
737 let tasks_to_terminate: Vec<_> = {
739 let tree_guard = stats.tree.lock();
740 let mut node_guard = tree_guard.get(&moniker).unwrap().lock();
741 node_guard.tasks_mut().iter().cloned().collect()
742 };
743
744 for task in tasks_to_terminate {
745 task.force_terminate().await;
746 clock.add_ticks(1);
747 }
748
749 for i in 0..COMPONENT_CPU_MAX_SAMPLES {
751 stats.measure();
752 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
753 assert_eq!(
754 stats.tree.lock().get(&moniker).unwrap().lock().total_measurements(),
755 COMPONENT_CPU_MAX_SAMPLES - i,
756 );
757 }
758 stats.measure();
759 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
760 assert!(stats.tree.lock().get(&moniker).is_none());
761 assert_eq!(stats.tree.lock().len(), 0);
762 assert_eq!(stats.tasks.lock().len(), 0);
763 }
764
765 fn create_measurements_vec_for_fake_task(
766 num_measurements: i64,
767 init_cpu: i64,
768 init_queue: i64,
769 ) -> Vec<zx::TaskRuntimeInfo> {
770 let mut v = vec![];
771 for i in 0..num_measurements {
772 v.push(zx::TaskRuntimeInfo {
773 cpu_time: i * init_cpu,
774 queue_time: i * init_queue,
775 ..zx::TaskRuntimeInfo::default()
776 });
777 }
778
779 v
780 }
781
782 #[fuchsia::test]
783 async fn dead_tasks_are_pruned() {
784 let clock = Arc::new(FakeTime::new());
785 let inspector = inspect::Inspector::default();
786 let stats = Arc::new(ComponentTreeStats::new_with_timesource(
787 inspector.root().create_child("stats"),
788 clock.clone(),
789 ));
790
791 let mut previous_task_count = 0;
792 for i in 0..(MAX_DEAD_TASKS * 2) {
793 clock.add_ticks(1);
794 let component_task =
795 FakeTask::new(i as u64, create_measurements_vec_for_fake_task(300, 2, 4));
796
797 let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
798 let fake_runtime =
799 FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, None));
800 stats.on_component_started(&moniker, &fake_runtime);
801
802 loop {
803 let current = stats.tree.lock().len();
804 if current != previous_task_count {
805 previous_task_count = current;
806 break;
807 }
808 fasync::Timer::new(fasync::MonotonicInstant::after(
809 zx::MonotonicDuration::from_millis(100i64),
810 ))
811 .await;
812 }
813
814 for task in
815 stats.tree.lock().get(&moniker.into()).unwrap().lock().tasks_mut().iter_mut()
816 {
817 task.force_terminate().await;
818 clock.add_ticks(1);
819 }
820 }
821
822 let task_count = stats.tasks.lock().len();
823 let moniker_count = stats.tree.lock().len();
824 assert_eq!(task_count, 88);
825 assert_eq!(moniker_count, 88);
826 }
827
828 #[fuchsia::test]
829 async fn aggregated_data_available_inspect() {
830 let max_dead_tasks = 4;
831 let clock = Arc::new(FakeTime::new());
832 let inspector = inspect::Inspector::default();
833 let stats = Arc::new(ComponentTreeStats::new_with_timesource(
834 inspector.root().create_child("stats"),
835 clock.clone(),
836 ));
837
838 let mut moniker_list = vec![];
839 for i in 0..(max_dead_tasks * 2) {
840 clock.add_ticks(1);
841 let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
842 moniker_list.push(moniker.clone());
843 let component_task =
844 FakeTask::new(i as u64, create_measurements_vec_for_fake_task(5, 1, 1));
845 stats.track_ready(moniker.into(), component_task);
846 }
847
848 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
849 stats.measure();
850 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
851 stats.measure();
852 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
853 stats.measure();
854
855 assert_data_tree!(inspector, root: {
856 stats: contains {
857 measurements: contains {
858 components: {
859 "moniker-0": contains {},
860 "moniker-1": contains {},
861 "moniker-2": contains {},
862 "moniker-3": contains {},
863 "moniker-4": contains {},
864 "moniker-5": contains {},
865 "moniker-6": contains {},
866 "moniker-7": contains {},
867 }
868 }
869 }
870 });
871
872 for moniker in moniker_list {
873 for task in stats
874 .tree
875 .lock()
876 .get(&moniker.clone().into())
877 .unwrap()
878 .lock()
879 .tasks_mut()
880 .iter_mut()
881 {
882 task.force_terminate().await;
883 clock.add_ticks(1);
886 }
887 }
888
889 stats.prune_dead_tasks(max_dead_tasks);
890
891 let hierarchy = inspector.get_diagnostics_hierarchy().await;
892 assert_data_tree!(inspector, root: {
893 stats: contains {
894 measurements: contains {
895 components: {
896 "@aggregated": {
897 "timestamps": AnyProperty,
898 "cpu_times": vec![0i64, 6i64, 12i64],
899 "queue_times": vec![0i64, 6i64, 12i64],
900 },
901 "moniker-6": contains {},
902 "moniker-7": contains {},
903 }
904 }
905 }
906 });
907 let (timestamps, _, _) = get_data(&hierarchy, "@aggregated", None);
908 assert_eq!(timestamps.len(), 3);
909 assert!(timestamps[1] > timestamps[0]);
910 assert!(timestamps[2] > timestamps[1]);
911 }
912
913 #[fuchsia::test]
914 async fn total_holds_sum_of_stats() {
915 let inspector = inspect::Inspector::default();
916 let stats = ComponentTreeStats::new(inspector.root().create_child("stats"));
917 stats.measure();
918 stats.track_ready(
919 ExtendedMoniker::ComponentInstance(["a"].try_into().unwrap()),
920 FakeTask::new(
921 1,
922 vec![
923 zx::TaskRuntimeInfo {
924 cpu_time: 2,
925 queue_time: 4,
926 ..zx::TaskRuntimeInfo::default()
927 },
928 zx::TaskRuntimeInfo {
929 cpu_time: 6,
930 queue_time: 8,
931 ..zx::TaskRuntimeInfo::default()
932 },
933 ],
934 ),
935 );
936 stats.track_ready(
937 ExtendedMoniker::ComponentInstance(["b"].try_into().unwrap()),
938 FakeTask::new(
939 2,
940 vec![
941 zx::TaskRuntimeInfo {
942 cpu_time: 1,
943 queue_time: 3,
944 ..zx::TaskRuntimeInfo::default()
945 },
946 zx::TaskRuntimeInfo {
947 cpu_time: 5,
948 queue_time: 7,
949 ..zx::TaskRuntimeInfo::default()
950 },
951 ],
952 ),
953 );
954
955 stats.measure();
956 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
957 let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
958 assert_eq!(timestamps.len(), 2);
959 assert_eq!(cpu_times, vec![0, 2 + 1]);
960 assert_eq!(queue_times, vec![0, 4 + 3]);
961
962 stats.measure();
963 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
964 let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
965 assert_eq!(timestamps.len(), 3);
966 assert_eq!(cpu_times, vec![0, 2 + 1, 6 + 5]);
967 assert_eq!(queue_times, vec![0, 4 + 3, 8 + 7]);
968 }
969
970 #[fuchsia::test]
971 async fn recent_usage() {
972 let inspector = inspect::Inspector::default();
974 let stats = ComponentTreeStats::new(inspector.root().create_child("stats"));
975 stats.measure();
976
977 stats.track_ready(
978 ExtendedMoniker::ComponentInstance(["a"].try_into().unwrap()),
979 FakeTask::new(
980 1,
981 vec![
982 zx::TaskRuntimeInfo {
983 cpu_time: 2,
984 queue_time: 4,
985 ..zx::TaskRuntimeInfo::default()
986 },
987 zx::TaskRuntimeInfo {
988 cpu_time: 6,
989 queue_time: 8,
990 ..zx::TaskRuntimeInfo::default()
991 },
992 ],
993 ),
994 );
995 stats.track_ready(
996 ExtendedMoniker::ComponentInstance(["b"].try_into().unwrap()),
997 FakeTask::new(
998 2,
999 vec![
1000 zx::TaskRuntimeInfo {
1001 cpu_time: 1,
1002 queue_time: 3,
1003 ..zx::TaskRuntimeInfo::default()
1004 },
1005 zx::TaskRuntimeInfo {
1006 cpu_time: 5,
1007 queue_time: 7,
1008 ..zx::TaskRuntimeInfo::default()
1009 },
1010 ],
1011 ),
1012 );
1013
1014 stats.measure();
1015 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
1016
1017 assert_data_tree!(&hierarchy, root: contains {
1020 stats: contains {
1021 recent_usage: {
1022 previous_cpu_time: 0i64,
1023 previous_queue_time: 0i64,
1024 previous_timestamp: AnyProperty,
1025 recent_cpu_time: 2 + 1i64,
1026 recent_queue_time: 4 + 3i64,
1027 recent_timestamp: AnyProperty,
1028 }
1029 }
1030 });
1031
1032 let initial_timestamp = get_recent_property(&hierarchy, "recent_timestamp");
1034 let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
1035 assert_eq!(timestamps.len(), 2);
1036 assert_eq!(timestamps[1], initial_timestamp);
1037 assert_eq!(cpu_times, vec![0, 2 + 1]);
1038 assert_eq!(queue_times, vec![0, 4 + 3]);
1039
1040 stats.measure();
1042 let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
1043
1044 assert_data_tree!(&hierarchy, root: contains {
1046 stats: contains {
1047 recent_usage: {
1048 previous_cpu_time: 2 + 1i64,
1049 previous_queue_time: 4 + 3i64,
1050 previous_timestamp: initial_timestamp,
1051 recent_cpu_time: 6 + 5i64,
1052 recent_queue_time: 8 + 7i64,
1053 recent_timestamp: AnyProperty,
1054 }
1055 }
1056 });
1057
1058 let recent_timestamp = get_recent_property(&hierarchy, "recent_timestamp");
1060 assert!(recent_timestamp > initial_timestamp);
1061 }
1062
1063 #[fuchsia::test]
1064 async fn component_stats_are_available_in_inspect() {
1065 let inspector = inspect::Inspector::default();
1066 let stats = ComponentTreeStats::new(inspector.root().create_child("stats"));
1067 stats.track_ready(
1068 ExtendedMoniker::ComponentInstance(["a"].try_into().unwrap()),
1069 FakeTask::new(
1070 1,
1071 vec![
1072 zx::TaskRuntimeInfo {
1073 cpu_time: 2,
1074 queue_time: 4,
1075 ..zx::TaskRuntimeInfo::default()
1076 },
1077 zx::TaskRuntimeInfo {
1078 cpu_time: 6,
1079 queue_time: 8,
1080 ..zx::TaskRuntimeInfo::default()
1081 },
1082 ],
1083 ),
1084 );
1085
1086 stats.measure();
1087
1088 let hierarchy = inspector.get_diagnostics_hierarchy().await;
1089 assert_data_tree!(hierarchy, root: {
1090 stats: contains {
1091 measurements: contains {
1092 components: {
1093 "a": {
1094 "1": {
1095 timestamps: AnyProperty,
1096 cpu_times: vec![2i64],
1097 queue_times: vec![4i64],
1098 }
1099 }
1100 }
1101 }
1102 }
1103 });
1104 let (timestamps, _, _) = get_data(&hierarchy, "a", Some("1"));
1105 assert_eq!(timestamps.len(), 1);
1106
1107 stats.measure();
1109
1110 let hierarchy = inspector.get_diagnostics_hierarchy().await;
1111 assert_data_tree!(hierarchy, root: {
1112 stats: contains {
1113 measurements: contains {
1114 components: {
1115 "a": {
1116 "1": {
1117 timestamps: AnyProperty,
1118 cpu_times: vec![2i64, 6],
1119 queue_times: vec![4i64, 8],
1120 }
1121 }
1122 }
1123 }
1124 }
1125 });
1126 let (timestamps, _, _) = get_data(&hierarchy, "a", Some("1"));
1127 assert_eq!(timestamps.len(), 2);
1128 assert!(timestamps[1] > timestamps[0]);
1129 }
1130
1131 #[fuchsia::test]
1132 async fn on_started_handles_parent_task() {
1133 let inspector = inspect::Inspector::default();
1134 let clock = Arc::new(FakeTime::new());
1135 clock.add_ticks(20);
1138 let stats = Arc::new(ComponentTreeStats::new_with_timesource(
1139 inspector.root().create_child("stats"),
1140 clock.clone(),
1141 ));
1142 let parent_task = FakeTask::new(
1143 1,
1144 vec![
1145 zx::TaskRuntimeInfo {
1146 cpu_time: 20,
1147 queue_time: 40,
1148 ..zx::TaskRuntimeInfo::default()
1149 },
1150 zx::TaskRuntimeInfo {
1151 cpu_time: 60,
1152 queue_time: 80,
1153 ..zx::TaskRuntimeInfo::default()
1154 },
1155 ],
1156 );
1157 let component_task = FakeTask::new(
1158 2,
1159 vec![
1160 zx::TaskRuntimeInfo {
1161 cpu_time: 2,
1162 queue_time: 4,
1163 ..zx::TaskRuntimeInfo::default()
1164 },
1165 zx::TaskRuntimeInfo {
1166 cpu_time: 6,
1167 queue_time: 8,
1168 ..zx::TaskRuntimeInfo::default()
1169 },
1170 ],
1171 );
1172
1173 let fake_runtime = FakeRuntime::new_with_start_times(
1174 FakeDiagnosticsContainer::new(parent_task.clone(), None),
1175 IncrementingFakeTime::new(3, std::time::Duration::from_nanos(5)),
1176 );
1177 stats.on_component_started(&Moniker::try_from(["parent"]).unwrap(), &fake_runtime);
1178
1179 let fake_runtime = FakeRuntime::new_with_start_times(
1180 FakeDiagnosticsContainer::new(component_task, Some(parent_task)),
1181 IncrementingFakeTime::new(8, std::time::Duration::from_nanos(5)),
1182 );
1183 stats.on_component_started(&Moniker::try_from(["child"]).unwrap(), &fake_runtime);
1184
1185 loop {
1188 if stats.tree.lock().len() == 2 {
1189 break;
1190 }
1191 fasync::Timer::new(fasync::MonotonicInstant::after(
1192 zx::MonotonicDuration::from_millis(100i64),
1193 ))
1194 .await;
1195 }
1196
1197 assert_data_tree!(inspector, root: {
1198 stats: contains {
1199 measurements: contains {
1200 components: {
1201 "parent": {
1202 "1": {
1203 "timestamps": AnyProperty,
1204 "cpu_times": vec![0i64, 20],
1205 "queue_times": vec![0i64, 40],
1206 },
1207 },
1208 "child": {
1209 "2": {
1210 "timestamps": AnyProperty,
1211 "cpu_times": vec![0i64, 2],
1212 "queue_times": vec![0i64, 4],
1213 }
1214 }
1215 }
1216 }
1217 }
1218 });
1219 }
1220
1221 #[fuchsia::test]
1222 async fn child_tasks_garbage_collection() {
1223 let inspector = inspect::Inspector::default();
1224 let clock = Arc::new(FakeTime::new());
1225 let stats = Arc::new(ComponentTreeStats::new_with_timesource(
1226 inspector.root().create_child("stats"),
1227 clock.clone(),
1228 ));
1229 let parent_task = FakeTask::new(
1230 1,
1231 vec![
1232 zx::TaskRuntimeInfo {
1233 cpu_time: 20,
1234 queue_time: 40,
1235 ..zx::TaskRuntimeInfo::default()
1236 },
1237 zx::TaskRuntimeInfo {
1238 cpu_time: 60,
1239 queue_time: 80,
1240 ..zx::TaskRuntimeInfo::default()
1241 },
1242 ],
1243 );
1244 let component_task = FakeTask::new(
1245 2,
1246 vec![zx::TaskRuntimeInfo {
1247 cpu_time: 2,
1248 queue_time: 4,
1249 ..zx::TaskRuntimeInfo::default()
1250 }],
1251 );
1252 let fake_parent_runtime =
1253 FakeRuntime::new(FakeDiagnosticsContainer::new(parent_task.clone(), None));
1254 stats.on_component_started(&Moniker::try_from(["parent"]).unwrap(), &fake_parent_runtime);
1255
1256 let child_moniker = Moniker::try_from(["child"]).unwrap();
1257 let fake_runtime =
1258 FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, Some(parent_task)));
1259 stats.on_component_started(&child_moniker, &fake_runtime);
1260
1261 loop {
1264 if stats.tree.lock().len() == 2 {
1265 break;
1266 }
1267 fasync::Timer::new(fasync::MonotonicInstant::after(
1268 zx::MonotonicDuration::from_millis(100i64),
1269 ))
1270 .await;
1271 }
1272
1273 assert_eq!(stats.tree.lock().len(), 2);
1274 assert_eq!(stats.tasks.lock().len(), 2);
1275
1276 let extended_moniker = child_moniker.into();
1277 let tasks_to_terminate: Vec<_> = {
1279 let tree_guard = stats.tree.lock();
1280 let mut node_guard = tree_guard.get(&extended_moniker).unwrap().lock();
1281 node_guard.tasks_mut().iter().cloned().collect()
1282 };
1283
1284 for task in tasks_to_terminate {
1285 task.force_terminate().await;
1286 clock.add_ticks(1);
1287 }
1288
1289 stats.measure();
1291 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
1292
1293 for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
1295 stats.measure();
1296 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
1297 }
1298
1299 stats.measure();
1301
1302 assert!(stats.tree.lock().get(&extended_moniker).is_none());
1304 assert_eq!(stats.tree.lock().len(), 1);
1305 assert_eq!(stats.tasks.lock().len(), 1);
1306 }
1307
1308 fn get_recent_property(hierarchy: &DiagnosticsHierarchy, name: &str) -> i64 {
1309 hierarchy.get_property_by_path(&vec!["stats", "recent_usage", name]).unwrap().int().unwrap()
1310 }
1311
1312 fn get_data(
1313 hierarchy: &DiagnosticsHierarchy,
1314 moniker: &str,
1315 task: Option<&str>,
1316 ) -> (Vec<i64>, Vec<i64>, Vec<i64>) {
1317 let mut path = vec!["stats", "measurements", "components", moniker];
1318 if let Some(task) = task {
1319 path.push(task);
1320 }
1321 get_data_at(&hierarchy, &path)
1322 }
1323
1324 fn get_data_at(
1325 hierarchy: &DiagnosticsHierarchy,
1326 path: &[&str],
1327 ) -> (Vec<i64>, Vec<i64>, Vec<i64>) {
1328 let node = hierarchy.get_child_by_path(&path).expect("found stats node");
1329 let cpu_times = node
1330 .get_property("cpu_times")
1331 .expect("found cpu")
1332 .int_array()
1333 .expect("cpu are ints")
1334 .raw_values();
1335 let queue_times = node
1336 .get_property("queue_times")
1337 .expect("found queue")
1338 .int_array()
1339 .expect("queue are ints")
1340 .raw_values();
1341 let timestamps = node
1342 .get_property("timestamps")
1343 .expect("found timestamps")
1344 .int_array()
1345 .expect("timestamps are ints")
1346 .raw_values();
1347 (timestamps.into_owned(), cpu_times.into_owned(), queue_times.into_owned())
1348 }
1349}