use crate::task_metrics::component_stats::ComponentStats;
use crate::task_metrics::constants::*;
use crate::task_metrics::measurement::{Measurement, MeasurementsQueue};
use crate::task_metrics::runtime_stats_source::{
    ComponentStartedInfo, RuntimeStatsContainer, RuntimeStatsSource,
};
use crate::task_metrics::task_info::{TaskInfo, create_cpu_histogram};
use async_trait::async_trait;
use errors::ModelError;
use fidl_fuchsia_component_runner::Task as DiagnosticsTask;
use fuchsia_async as fasync;
use fuchsia_inspect::{self as inspect, ArrayProperty, HistogramProperty};
use fuchsia_sync::Mutex;
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, StreamExt};
use hooks::{Event, EventPayload, EventType, HasEventType, Hook, HooksRegistration};
use injectable_time::{BootInstant, TimeSource};
use log::warn;
use moniker::{ExtendedMoniker, Moniker};
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::sync::{Arc, Weak};
use zx::{self as zx, HandleBased, sys as zx_sys};

macro_rules! maybe_return {
    ($e:expr) => {
        match $e {
            None => return,
            Some(v) => v,
        }
    };
}

const MAX_INSPECT_SIZE: usize = 2 * 1024 * 1024;

const AGGREGATE_SAMPLES: &str = "@aggregated";

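/// Tracks CPU usage stats for all components, keyed by moniker, and exposes
/// them through Inspect.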
pub struct ComponentTreeStats<T: RuntimeStatsSource + Debug> {
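    /// Map from the moniker of a component running in the system to its stats.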
    tree: Mutex<BTreeMap<ExtendedMoniker, Arc<Mutex<ComponentStats<T>>>>>,

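    /// Map from a task koid to its task info, used to link child tasks to
    /// already-tracked parent tasks and to drop stale entries.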
    tasks: Mutex<BTreeMap<zx_sys::zx_koid_t, Weak<Mutex<TaskInfo<T>>>>>,

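    /// The inspect node under which all stats are recorded.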
    node: inspect::Node,

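    /// The node under which per-component CPU histograms are created.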
    histograms_node: inspect::Node,

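    /// Histogram tracking how long it takes to process all CPU measurements.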
    processing_times: inspect::IntExponentialHistogramProperty,

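    /// Task that takes a CPU sample every `CPU_SAMPLE_PERIOD`.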
    sampler_task: Mutex<Option<fasync::Task<()>>>,

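    /// Aggregated CPU totals across all components.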
    totals: Mutex<AggregatedStats>,

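    /// Task that drains the queue of per-component diagnostics waiters.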
    _wait_diagnostics_drain: fasync::Task<()>,

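    /// Sends new waiter tasks to `_wait_diagnostics_drain` so they are polled
    /// to completion.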
    diagnostics_waiter_task_sender: mpsc::UnboundedSender<fasync::Task<()>>,

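    /// Source of timestamps for measurements; injectable for tests.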
    time_source: Arc<dyn TimeSource + Send + Sync>,

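    /// Measurements aggregated from dead tasks that were pruned to bound
    /// memory usage.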
    aggregated_dead_task_data: Mutex<MeasurementsQueue>,

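    /// Running total of CPU used by tasks that have already exited.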
    exited_measurements: Mutex<Measurement>,
}

impl<T: 'static + RuntimeStatsSource + Debug + Send + Sync> ComponentTreeStats<T> {
    pub fn new(node: inspect::Node) -> Arc<Self> {
        Self::new_with_timesource(node, Arc::new(BootInstant::new()))
    }

    fn new_with_timesource(
        node: inspect::Node,
        time_source: Arc<dyn TimeSource + Send + Sync>,
    ) -> Arc<Self> {
        let processing_times = node.create_int_exponential_histogram(
            "processing_times_ns",
            inspect::ExponentialHistogramParams {
                floor: 1000,
                initial_step: 1000,
                step_multiplier: 2,
                buckets: 16,
            },
        );

        let histograms_node = node.create_child("histograms");
        let totals = Mutex::new(AggregatedStats::new());
        let (snd, rcv) = mpsc::unbounded();
        let this = Arc::new(Self {
            tree: Mutex::new(BTreeMap::new()),
            tasks: Mutex::new(BTreeMap::new()),
            node,
            histograms_node,
            processing_times,
            sampler_task: Mutex::new(None),
            totals,
            diagnostics_waiter_task_sender: snd,
            _wait_diagnostics_drain: fasync::Task::spawn(async move {
                rcv.for_each_concurrent(None, |rx| async move { rx.await }).await;
            }),
            time_source: time_source.clone(),
            aggregated_dead_task_data: Mutex::new(MeasurementsQueue::new(
                COMPONENT_CPU_MAX_SAMPLES,
                time_source,
            )),
            exited_measurements: Mutex::new(Measurement::default()),
        });

        let weak_self = Arc::downgrade(&this);

        let weak_self_for_fut = weak_self.clone();
        this.node.record_lazy_child("measurements", move || {
            let weak_self_clone = weak_self_for_fut.clone();
            async move {
                if let Some(this) = weak_self_clone.upgrade() {
                    Ok(this.write_measurements_to_inspect())
                } else {
                    Ok(inspect::Inspector::default())
                }
            }
            .boxed()
        });

        let weak_self_clone_for_fut = weak_self.clone();
        this.node.record_lazy_child("recent_usage", move || {
            let weak_self_clone = weak_self_clone_for_fut.clone();
            async move {
                if let Some(this) = weak_self_clone.upgrade() {
                    Ok(this.write_recent_usage_to_inspect())
                } else {
                    Ok(inspect::Inspector::default())
                }
            }
            .boxed()
        });
        let weak_self_for_fut = weak_self;
        this.node.record_lazy_child("@total", move || {
            let weak_self_clone = weak_self_for_fut.clone();
            async move {
                if let Some(this) = weak_self_clone.upgrade() {
                    Ok(this.write_totals_to_inspect())
                } else {
                    Ok(inspect::Inspector::default())
                }
            }
            .boxed()
        });

        this
    }

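    /// Takes an initial measurement and spawns a task that samples CPU usage
    /// every `CPU_SAMPLE_PERIOD` until `self` is dropped.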
    pub fn start_measuring(self: &Arc<Self>) {
        let weak_self = Arc::downgrade(self);
        self.measure();
        *(self.sampler_task.lock()) = Some(fasync::Task::spawn(async move {
            loop {
                fasync::Timer::new(CPU_SAMPLE_PERIOD).await;
                match weak_self.upgrade() {
                    None => break,
                    Some(this) => {
                        this.measure();
                    }
                }
            }
        }));
    }

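    /// Starts tracking a task that is already running for the given moniker.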
    fn track_ready(&self, moniker: ExtendedMoniker, task: T) {
        let histogram = create_cpu_histogram(&self.histograms_node, &moniker);
        if let Ok(task_info) = TaskInfo::try_from(task, Some(histogram), self.time_source.clone()) {
            let koid = task_info.koid();
            let arc_task_info = Arc::new(Mutex::new(task_info));
            let mut stats = ComponentStats::new();
            stats.add_task(arc_task_info.clone());
            let stats = Arc::new(Mutex::new(stats));
            self.tree.lock().insert(moniker, stats);
            self.tasks.lock().insert(koid, Arc::downgrade(&arc_task_info));
        }
    }

    fn write_measurements_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
        let inspector =
            inspect::Inspector::new(inspect::InspectorConfig::default().size(MAX_INSPECT_SIZE));
        let components = inspector.root().create_child("components");
        let (component_count, task_count) = self.write_measurements(&components);
        self.write_aggregate_measurements(&components);
        inspector.root().record_uint("component_count", component_count);
        inspector.root().record_uint("task_count", task_count);
        inspector.root().record(components);

        let stats_node = inspect::stats::StatsNode::new(&inspector);
        stats_node.record_data_to(inspector.root());

        inspector
    }

    fn write_recent_usage_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
        let inspector = inspect::Inspector::default();
        self.totals.lock().write_recents_to(inspector.root());
        inspector
    }

    fn write_totals_to_inspect(self: &Arc<Self>) -> inspect::Inspector {
        let inspector = inspect::Inspector::default();
        self.totals.lock().write_totals_to(inspector.root());
        inspector
    }

    fn write_aggregate_measurements(&self, components_node: &inspect::Node) {
        let locked_aggregate = self.aggregated_dead_task_data.lock();
        if locked_aggregate.no_true_measurements() {
            return;
        }

        let aggregate = components_node.create_child(AGGREGATE_SAMPLES);
        locked_aggregate.record_to_node(&aggregate);
        components_node.record(aggregate);
    }

    fn write_measurements(&self, node: &inspect::Node) -> (u64, u64) {
        let mut task_count = 0;
        let tree = self.tree.lock();
        for (moniker, stats) in tree.iter() {
            let stats_guard = stats.lock();
            let key = match moniker {
                ExtendedMoniker::ComponentManager => moniker.to_string(),
                ExtendedMoniker::ComponentInstance(m) => {
                    if *m == Moniker::root() {
                        "<root>".to_string()
                    } else {
                        m.to_string()
                    }
                }
            };
            let child = node.create_child(key);
            task_count += stats_guard.record_to_node(&child);
            node.record(child);
        }
        (tree.len() as u64, task_count)
    }

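    /// Takes a single sample: measures every tracked component, folds the
    /// results (including CPU from exited tasks) into one aggregate
    /// measurement, and prunes components whose tasks are all gone.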
    pub fn measure(self: &Arc<Self>) {
        let start = zx::BootInstant::get();

        let stats = self
            .tree
            .lock()
            .iter()
            .map(|(k, v)| (k.clone(), Arc::downgrade(v)))
            .collect::<Vec<_>>();
        let mut locked_exited_measurements = self.exited_measurements.lock();
        let mut aggregated = Measurement::clone_with_time(&*locked_exited_measurements, start);
        let mut stats_to_remove = vec![];
        let mut koids_to_remove = vec![];
        for (moniker, weak_stats) in stats.into_iter() {
            if let Some(stats) = weak_stats.upgrade() {
                let mut stat_guard = stats.lock();
                aggregated += &stat_guard.measure();
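                // Fold in tasks that have died but whose measurements are
                // still retained.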
                aggregated += &stat_guard.measure_tracked_dead_tasks();
                let (mut stale_koids, exited_cpu_of_deleted) = stat_guard.clean_stale();
                aggregated += &exited_cpu_of_deleted;
                *locked_exited_measurements += &exited_cpu_of_deleted;
                koids_to_remove.append(&mut stale_koids);
                if !stat_guard.is_alive() {
                    stats_to_remove.push(moniker);
                }
            }
        }

        let mut stats = self.tree.lock();
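        // Remove components that no longer have any live tasks.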
        for moniker in stats_to_remove {
            if let Some(stat) = stats.get(&moniker) {
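                // Verify the stats are still not alive; a task could have
                // started after the locks were released above.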
                if !stat.lock().is_alive() {
                    stats.remove(&moniker);
                }
            }
        }

        let mut tasks = self.tasks.lock();
        for koid in koids_to_remove {
            tasks.remove(&koid);
        }

        self.totals.lock().insert(aggregated);
        self.processing_times.insert((zx::BootInstant::get() - start).into_nanos());
    }

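    /// Prunes the oldest dead tasks once more than `max_dead_tasks` are
    /// retained, folding their measurements into an aggregate so the totals
    /// remain accurate.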
    fn prune_dead_tasks(self: &Arc<Self>, max_dead_tasks: usize) {
        let mut all_dead_tasks = BTreeMap::new();
        for (moniker, component) in self.tree.lock().iter() {
            let dead_tasks = component.lock().gather_dead_tasks();
            for (timestamp, task) in dead_tasks {
                all_dead_tasks.insert(timestamp, (task, moniker.clone()));
            }
        }

        if all_dead_tasks.len() <= max_dead_tasks {
            return;
        }

        let total = all_dead_tasks.len();
        let to_remove = all_dead_tasks.iter().take(total - (max_dead_tasks / 2));

        let mut koids_to_remove = vec![];
        let mut aggregate_data = self.aggregated_dead_task_data.lock();
        for (_, (unlocked_task, _)) in to_remove {
            let mut task = unlocked_task.lock();
            if let Ok(measurements) = task.take_measurements_queue() {
                koids_to_remove.push(task.koid());
                *aggregate_data += measurements;
            }
        }

        let mut stats_to_remove = vec![];
        for (moniker, stats) in self.tree.lock().iter() {
            let mut stat_guard = stats.lock();
            stat_guard.remove_by_koids(&koids_to_remove);
            if !stat_guard.is_alive() {
                stats_to_remove.push(moniker.clone());
            }
        }

        let mut stats = self.tree.lock();
        for moniker in stats_to_remove {
            if let Some(stat) = stats.get(&moniker) {
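                // Re-check liveness under the lock before removing.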
                if !stat.lock().is_alive() {
                    stats.remove(&moniker);
                }
            }
        }

        let mut tasks = self.tasks.lock();
        for koid in koids_to_remove {
            tasks.remove(&koid);
        }
    }

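    /// Called when a component starts: spawns a task that waits for the
    /// runner to provide the component's diagnostics info.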
    fn on_component_started<P, C>(self: &Arc<Self>, moniker: &Moniker, runtime: &P)
    where
        P: ComponentStartedInfo<C, T>,
        C: RuntimeStatsContainer<T> + Send + Sync + 'static,
    {
        if let Some(receiver) = runtime.get_receiver() {
            let task = fasync::Task::spawn(Self::diagnostics_waiter_task(
                Arc::downgrade(self),
                moniker.clone().into(),
                receiver,
                runtime.start_time(),
            ));
            let _ = self.diagnostics_waiter_task_sender.unbounded_send(task);
        }
    }

    async fn diagnostics_waiter_task<C>(
        weak_self: Weak<Self>,
        moniker: ExtendedMoniker,
        receiver: oneshot::Receiver<C>,
        start_time: zx::BootInstant,
    ) where
        C: RuntimeStatsContainer<T> + Send + Sync + 'static,
    {
        let mut source = maybe_return!(receiver.await.ok());
        let this = maybe_return!(weak_self.upgrade());
        let mut tree_lock = this.tree.lock();
        let stats = tree_lock
            .entry(moniker.clone())
            .or_insert_with(|| Arc::new(Mutex::new(ComponentStats::new())));
        let histogram = create_cpu_histogram(&this.histograms_node, &moniker);
        let mut task_info = maybe_return!(source.take_component_task().and_then(|task| {
            TaskInfo::try_from(task, Some(histogram), this.time_source.clone()).ok()
        }));

        let parent_koid = source
            .take_parent_task()
            .and_then(|task| TaskInfo::try_from(task, None, this.time_source.clone()).ok())
            .map(|task| task.koid());
        let koid = task_info.koid();

        task_info.record_measurement_with_start_time(start_time);
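        // An initial measurement is taken below unless a parent task will
        // account for this task's runtime.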
        task_info.measure_if_no_parent();

        let mut task_guard = this.tasks.lock();

        let task_info = match parent_koid {
            None => {
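                // No parent task: this task is measured on its own.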
                Arc::new(Mutex::new(task_info))
            }
            Some(parent_koid) => {
                task_info.has_parent_task = true;
                let task_info = Arc::new(Mutex::new(task_info));
                if let Some(parent) = task_guard.get(&parent_koid).and_then(|p| p.upgrade()) {
                    let mut parent_guard = parent.lock();
                    parent_guard.add_child(Arc::downgrade(&task_info));
                }
                task_info
            }
        };
        task_guard.insert(koid, Arc::downgrade(&task_info));
        stats.lock().add_task(task_info);
        drop(task_guard);
        drop(tree_lock);
        this.prune_dead_tasks(MAX_DEAD_TASKS);
    }
}

impl ComponentTreeStats<DiagnosticsTask> {
    pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
        vec![HooksRegistration::new(
            "ComponentTreeStats",
            vec![EventType::Started],
            Arc::downgrade(self) as Weak<dyn Hook>,
        )]
    }

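    /// Starts tracking stats for component manager's own job.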
    pub fn track_component_manager_stats(&self) {
        match fuchsia_runtime::job_default().duplicate_handle(zx::Rights::SAME_RIGHTS) {
            Ok(job) => {
                self.track_ready(ExtendedMoniker::ComponentManager, DiagnosticsTask::Job(job));
            }
            Err(err) => warn!(
                "Failed to duplicate component manager job. Not tracking its own stats: {:?}",
                err
            ),
        }
    }
}

#[async_trait]
impl Hook for ComponentTreeStats<DiagnosticsTask> {
    async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
        let target_moniker = event
            .target_moniker
            .unwrap_instance_moniker_or(ModelError::UnexpectedComponentManagerMoniker)?;
        match event.event_type() {
            EventType::Started => {
                if let EventPayload::Started { runtime, .. } = &event.payload {
                    self.on_component_started(target_moniker, runtime);
                }
            }
            _ => {}
        }
        Ok(())
    }
}

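/// Fixed-capacity window of aggregate CPU measurements across all components.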
struct AggregatedStats {
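    /// Ring buffer of measurements, capped at `COMPONENT_CPU_MAX_SAMPLES`.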
    measurements: VecDeque<Measurement>,
}

impl AggregatedStats {
    fn new() -> Self {
        Self { measurements: VecDeque::with_capacity(COMPONENT_CPU_MAX_SAMPLES) }
    }

    fn insert(&mut self, measurement: Measurement) {
        while self.measurements.len() >= COMPONENT_CPU_MAX_SAMPLES {
            self.measurements.pop_front();
        }
        self.measurements.push_back(measurement);
    }

    fn write_totals_to(&self, node: &inspect::Node) {
        let count = self.measurements.len();
        let timestamps = node.create_int_array(TIMESTAMPS, count);
        let cpu_times = node.create_int_array(CPU_TIMES, count);
        let queue_times = node.create_int_array(QUEUE_TIMES, count);
        for (i, measurement) in self.measurements.iter().enumerate() {
            timestamps.set(i, measurement.timestamp().into_nanos());
            cpu_times.set(i, measurement.cpu_time().into_nanos());
            queue_times.set(i, measurement.queue_time().into_nanos());
        }
        node.record(timestamps);
        node.record(cpu_times);
        node.record(queue_times);
    }

    fn write_recents_to(&self, node: &inspect::Node) {
        if self.measurements.is_empty() {
            return;
        }
        if self.measurements.len() >= 2 {
            let measurement = self.measurements.get(self.measurements.len() - 2).unwrap();
            node.record_int("previous_cpu_time", measurement.cpu_time().into_nanos());
            node.record_int("previous_queue_time", measurement.queue_time().into_nanos());
            node.record_int("previous_timestamp", measurement.timestamp().into_nanos());
        }
        let measurement = self.measurements.get(self.measurements.len() - 1).unwrap();
        node.record_int("recent_cpu_time", measurement.cpu_time().into_nanos());
        node.record_int("recent_queue_time", measurement.queue_time().into_nanos());
        node.record_int("recent_timestamp", measurement.timestamp().into_nanos());
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::task_metrics::testing::{FakeDiagnosticsContainer, FakeRuntime, FakeTask};
    use diagnostics_assertions::{AnyProperty, assert_data_tree};
    use diagnostics_hierarchy::DiagnosticsHierarchy;
    use fuchsia_inspect::DiagnosticsHierarchyGetter;

    use injectable_time::{FakeTime, IncrementingFakeTime};

    #[fuchsia::test]
    async fn total_tracks_cpu_after_termination() {
        fuchsia_sync::suppress_lock_cycle_panics();
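        // The loops below hold the tree and component locks while locking
        // individual tasks, which the lock-cycle detector would flag.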
544
545 let inspector = inspect::Inspector::default();
546 let clock = Arc::new(FakeTime::new());
547 let stats = ComponentTreeStats::new_with_timesource(
548 inspector.root().create_child("stats"),
549 clock.clone(),
550 );
551
552 let mut previous_task_count = 0;
553 for i in 0..10 {
554 clock.add_ticks(1);
555 let component_task = FakeTask::new(
556 i as u64,
557 create_measurements_vec_for_fake_task(COMPONENT_CPU_MAX_SAMPLES as i64 * 3, 2, 4),
558 );
559
560 let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
561 let fake_runtime =
562 FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, None));
563 stats.on_component_started(&moniker, &fake_runtime);
564
565 loop {
566 let current = stats.tree.lock().len();
567 if current != previous_task_count {
568 previous_task_count = current;
569 break;
570 }
571 fasync::Timer::new(fasync::MonotonicInstant::after(
572 zx::MonotonicDuration::from_millis(100i64),
573 ))
574 .await;
575 }
576 }
577
578 assert_eq!(stats.tasks.lock().len(), 10);
579 assert_eq!(stats.tree.lock().len(), 10);
580
581 for _ in 0..=COMPONENT_CPU_MAX_SAMPLES - 2 {
582 stats.measure();
583 clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
584 }
585
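        // Verify the totals accumulated while all tasks are alive.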
586 {
589 let totals = stats.totals.lock();
590 let recent_measurement = totals
591 .measurements
592 .get(totals.measurements.len() - 1)
593 .expect("there's at least one measurement");
594 assert_eq!(recent_measurement.cpu_time().into_nanos(), 1180);
595 assert_eq!(recent_measurement.queue_time().into_nanos(), 2360);
596
597 let previous_measurement = totals
598 .measurements
599 .get(totals.measurements.len() - 2)
600 .expect("there's a previous measurement");
601 assert_eq!(previous_measurement.cpu_time().into_nanos(), 1160);
602 assert_eq!(previous_measurement.queue_time().into_nanos(), 2320,);
603 }
604
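        // Terminate all tasks.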
        for i in 0..10 {
            let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
            for task in
                stats.tree.lock().get(&moniker.into()).unwrap().lock().tasks_mut().iter_mut()
            {
                task.lock().force_terminate().await;
                clock.add_ticks(1);
            }
        }

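        // Without a new sample, the totals are unchanged after termination.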
        {
            let totals = stats.totals.lock();
            let recent_measurement = totals
                .measurements
                .get(totals.measurements.len() - 1)
                .expect("there's at least one measurement");
            assert_eq!(recent_measurement.cpu_time().into_nanos(), 1180);
            assert_eq!(recent_measurement.queue_time().into_nanos(), 2360);

            let previous_measurement = totals
                .measurements
                .get(totals.measurements.len() - 2)
                .expect("there's a previous measurement");
            assert_eq!(previous_measurement.cpu_time().into_nanos(), 1160);
            assert_eq!(previous_measurement.queue_time().into_nanos(), 2320);
        }

        stats.measure();
        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);

        {
            let totals = stats.totals.lock();
            let recent_measurement = totals
                .measurements
                .get(totals.measurements.len() - 1)
                .expect("there's at least one measurement");
            assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
            assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);

            let previous_measurement = totals
                .measurements
                .get(totals.measurements.len() - 2)
                .expect("there's a previous measurement");
            assert_eq!(previous_measurement.cpu_time().into_nanos(), 1180);
            assert_eq!(previous_measurement.queue_time().into_nanos(), 2360);
        }

        stats.measure();
        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);

        {
            let totals = stats.totals.lock();
            let recent_measurement = totals
                .measurements
                .get(totals.measurements.len() - 1)
                .expect("there's at least one measurement");
            assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
            assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);

            let previous_measurement = totals
                .measurements
                .get(totals.measurements.len() - 2)
                .expect("there's a previous measurement");
            assert_eq!(previous_measurement.cpu_time().into_nanos(), 1200);
            assert_eq!(previous_measurement.queue_time().into_nanos(), 2400);
        }

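        // Run the sampler long enough for the dead tasks' retained
        // measurements to expire and the components to be dropped.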
        for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
            stats.measure();
            clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        }

        assert_eq!(stats.tasks.lock().len(), 0);
        assert_eq!(stats.tree.lock().len(), 0);

        {
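            // The totals survive even though every component has been removed.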
            let totals = stats.totals.lock();
            let recent_measurement = totals
                .measurements
                .get(totals.measurements.len() - 1)
                .expect("there's at least one measurement");
            assert_eq!(recent_measurement.cpu_time().into_nanos(), 1200);
            assert_eq!(recent_measurement.queue_time().into_nanos(), 2400);

            let previous_measurement = totals
                .measurements
                .get(totals.measurements.len() - 2)
                .expect("there's a previous measurement");
            assert_eq!(previous_measurement.cpu_time().into_nanos(), 1200);
            assert_eq!(previous_measurement.queue_time().into_nanos(), 2400);
        }
    }

    #[fuchsia::test]
    async fn components_are_deleted_when_all_tasks_are_gone() {
        fuchsia_sync::suppress_lock_cycle_panics();

        let inspector = inspect::Inspector::default();
        let clock = Arc::new(FakeTime::new());
        let stats = ComponentTreeStats::new_with_timesource(
            inspector.root().create_child("stats"),
            clock.clone(),
        );
        let moniker: Moniker = ["a"].try_into().unwrap();
        let moniker: ExtendedMoniker = moniker.into();
        stats.track_ready(moniker.clone(), FakeTask::default());
        for _ in 0..=COMPONENT_CPU_MAX_SAMPLES {
            stats.measure();
            clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        }
        assert_eq!(stats.tree.lock().len(), 1);
        assert_eq!(stats.tasks.lock().len(), 1);
        assert_eq!(
            stats.tree.lock().get(&moniker).unwrap().lock().total_measurements(),
            COMPONENT_CPU_MAX_SAMPLES
        );

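        // Terminate the component's only task.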
        for task in stats.tree.lock().get(&moniker).unwrap().lock().tasks_mut().iter_mut() {
            task.lock().force_terminate().await;
            clock.add_ticks(1);
        }

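        // Each subsequent sample expires one retained measurement of the dead
        // task until none remain and the component is removed.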
        for i in 0..COMPONENT_CPU_MAX_SAMPLES {
            stats.measure();
            clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
            assert_eq!(
                stats.tree.lock().get(&moniker).unwrap().lock().total_measurements(),
                COMPONENT_CPU_MAX_SAMPLES - i,
            );
        }
        stats.measure();
        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        assert!(stats.tree.lock().get(&moniker).is_none());
        assert_eq!(stats.tree.lock().len(), 0);
        assert_eq!(stats.tasks.lock().len(), 0);
    }

    fn create_measurements_vec_for_fake_task(
        num_measurements: i64,
        init_cpu: i64,
        init_queue: i64,
    ) -> Vec<zx::TaskRuntimeInfo> {
        let mut v = vec![];
        for i in 0..num_measurements {
            v.push(zx::TaskRuntimeInfo {
                cpu_time: i * init_cpu,
                queue_time: i * init_queue,
                ..zx::TaskRuntimeInfo::default()
            });
        }

        v
    }

    #[fuchsia::test]
    async fn dead_tasks_are_pruned() {
        let clock = Arc::new(FakeTime::new());
        let inspector = inspect::Inspector::default();
        let stats = ComponentTreeStats::new_with_timesource(
            inspector.root().create_child("stats"),
            clock.clone(),
        );

        let mut previous_task_count = 0;
        for i in 0..(MAX_DEAD_TASKS * 2) {
            clock.add_ticks(1);
            let component_task =
                FakeTask::new(i as u64, create_measurements_vec_for_fake_task(300, 2, 4));

            let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
            let fake_runtime =
                FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, None));
            stats.on_component_started(&moniker, &fake_runtime);

            loop {
                let current = stats.tree.lock().len();
                if current != previous_task_count {
                    previous_task_count = current;
                    break;
                }
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    zx::MonotonicDuration::from_millis(100i64),
                ))
                .await;
            }

            for task in
                stats.tree.lock().get(&moniker.into()).unwrap().lock().tasks_mut().iter_mut()
            {
                task.lock().force_terminate().await;
                clock.add_ticks(1);
            }
        }

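        // After pruning, roughly half of MAX_DEAD_TASKS dead tasks remain.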
        let task_count = stats.tasks.lock().len();
        let moniker_count = stats.tree.lock().len();
        assert_eq!(task_count, 88);
        assert_eq!(moniker_count, 88);
    }

    #[fuchsia::test]
    async fn aggregated_data_available_inspect() {
        let max_dead_tasks = 4;
        let clock = Arc::new(FakeTime::new());
        let inspector = inspect::Inspector::default();
        let stats = ComponentTreeStats::new_with_timesource(
            inspector.root().create_child("stats"),
            clock.clone(),
        );

        let mut moniker_list = vec![];
        for i in 0..(max_dead_tasks * 2) {
            clock.add_ticks(1);
            let moniker = Moniker::try_from([format!("moniker-{}", i).as_ref()]).unwrap();
            moniker_list.push(moniker.clone());
            let component_task =
                FakeTask::new(i as u64, create_measurements_vec_for_fake_task(5, 1, 1));
            stats.track_ready(moniker.into(), component_task);
        }

        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        stats.measure();
        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        stats.measure();
        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        stats.measure();

        assert_data_tree!(inspector, root: {
            stats: contains {
                measurements: contains {
                    components: {
                        "moniker-0": contains {},
                        "moniker-1": contains {},
                        "moniker-2": contains {},
                        "moniker-3": contains {},
                        "moniker-4": contains {},
                        "moniker-5": contains {},
                        "moniker-6": contains {},
                        "moniker-7": contains {},
                    }
                }
            }
        });

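        // Kill all tasks so that pruning has something to aggregate.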
        for moniker in moniker_list {
            for task in stats
                .tree
                .lock()
                .get(&moniker.clone().into())
                .unwrap()
                .lock()
                .tasks_mut()
                .iter_mut()
            {
                task.lock().force_terminate().await;
                clock.add_ticks(1);
            }
        }

        stats.prune_dead_tasks(max_dead_tasks);

        let hierarchy = inspector.get_diagnostics_hierarchy().await;
        assert_data_tree!(inspector, root: {
            stats: contains {
                measurements: contains {
                    components: {
                        "@aggregated": {
                            "timestamps": AnyProperty,
                            "cpu_times": vec![0i64, 6i64, 12i64],
                            "queue_times": vec![0i64, 6i64, 12i64],
                        },
                        "moniker-6": contains {},
                        "moniker-7": contains {},
                    }
                }
            }
        });
        let (timestamps, _, _) = get_data(&hierarchy, "@aggregated", None);
        assert_eq!(timestamps.len(), 3);
        assert!(timestamps[1] > timestamps[0]);
        assert!(timestamps[2] > timestamps[1]);
    }

    #[fuchsia::test]
    async fn total_holds_sum_of_stats() {
        let inspector = inspect::Inspector::default();
        let stats = ComponentTreeStats::new(inspector.root().create_child("stats"));
        stats.measure();
        stats.track_ready(
            ExtendedMoniker::ComponentInstance(["a"].try_into().unwrap()),
            FakeTask::new(
                1,
                vec![
                    zx::TaskRuntimeInfo {
                        cpu_time: 2,
                        queue_time: 4,
                        ..zx::TaskRuntimeInfo::default()
                    },
                    zx::TaskRuntimeInfo {
                        cpu_time: 6,
                        queue_time: 8,
                        ..zx::TaskRuntimeInfo::default()
                    },
                ],
            ),
        );
        stats.track_ready(
            ExtendedMoniker::ComponentInstance(["b"].try_into().unwrap()),
            FakeTask::new(
                2,
                vec![
                    zx::TaskRuntimeInfo {
                        cpu_time: 1,
                        queue_time: 3,
                        ..zx::TaskRuntimeInfo::default()
                    },
                    zx::TaskRuntimeInfo {
                        cpu_time: 5,
                        queue_time: 7,
                        ..zx::TaskRuntimeInfo::default()
                    },
                ],
            ),
        );

        stats.measure();
        let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
        let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
        assert_eq!(timestamps.len(), 2);
        assert_eq!(cpu_times, vec![0, 2 + 1]);
        assert_eq!(queue_times, vec![0, 4 + 3]);

        stats.measure();
        let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");
        let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
        assert_eq!(timestamps.len(), 3);
        assert_eq!(cpu_times, vec![0, 2 + 1, 6 + 5]);
        assert_eq!(queue_times, vec![0, 4 + 3, 8 + 7]);
    }

    #[fuchsia::test]
    async fn recent_usage() {
        let inspector = inspect::Inspector::default();
        let stats = ComponentTreeStats::new(inspector.root().create_child("stats"));
        stats.measure();

        stats.track_ready(
            ExtendedMoniker::ComponentInstance(["a"].try_into().unwrap()),
            FakeTask::new(
                1,
                vec![
                    zx::TaskRuntimeInfo {
                        cpu_time: 2,
                        queue_time: 4,
                        ..zx::TaskRuntimeInfo::default()
                    },
                    zx::TaskRuntimeInfo {
                        cpu_time: 6,
                        queue_time: 8,
                        ..zx::TaskRuntimeInfo::default()
                    },
                ],
            ),
        );
        stats.track_ready(
            ExtendedMoniker::ComponentInstance(["b"].try_into().unwrap()),
            FakeTask::new(
                2,
                vec![
                    zx::TaskRuntimeInfo {
                        cpu_time: 1,
                        queue_time: 3,
                        ..zx::TaskRuntimeInfo::default()
                    },
                    zx::TaskRuntimeInfo {
                        cpu_time: 5,
                        queue_time: 7,
                        ..zx::TaskRuntimeInfo::default()
                    },
                ],
            ),
        );

        stats.measure();
        let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");

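        // `recent_usage` mirrors the two most recent samples from @total.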
        assert_data_tree!(&hierarchy, root: contains {
            stats: contains {
                recent_usage: {
                    previous_cpu_time: 0i64,
                    previous_queue_time: 0i64,
                    previous_timestamp: AnyProperty,
                    recent_cpu_time: 2 + 1i64,
                    recent_queue_time: 4 + 3i64,
                    recent_timestamp: AnyProperty,
                }
            }
        });

        let initial_timestamp = get_recent_property(&hierarchy, "recent_timestamp");
        let (timestamps, cpu_times, queue_times) = get_data_at(&hierarchy, &["stats", "@total"]);
        assert_eq!(timestamps.len(), 2);
        assert_eq!(timestamps[1], initial_timestamp);
        assert_eq!(cpu_times, vec![0, 2 + 1]);
        assert_eq!(queue_times, vec![0, 4 + 3]);

        stats.measure();
        let hierarchy = inspect::reader::read(&inspector).await.expect("read inspect hierarchy");

        assert_data_tree!(&hierarchy, root: contains {
            stats: contains {
                recent_usage: {
                    previous_cpu_time: 2 + 1i64,
                    previous_queue_time: 4 + 3i64,
                    previous_timestamp: initial_timestamp,
                    recent_cpu_time: 6 + 5i64,
                    recent_queue_time: 8 + 7i64,
                    recent_timestamp: AnyProperty,
                }
            }
        });

        let recent_timestamp = get_recent_property(&hierarchy, "recent_timestamp");
        assert!(recent_timestamp > initial_timestamp);
    }

    #[fuchsia::test]
    async fn component_stats_are_available_in_inspect() {
        let inspector = inspect::Inspector::default();
        let stats = ComponentTreeStats::new(inspector.root().create_child("stats"));
        stats.track_ready(
            ExtendedMoniker::ComponentInstance(["a"].try_into().unwrap()),
            FakeTask::new(
                1,
                vec![
                    zx::TaskRuntimeInfo {
                        cpu_time: 2,
                        queue_time: 4,
                        ..zx::TaskRuntimeInfo::default()
                    },
                    zx::TaskRuntimeInfo {
                        cpu_time: 6,
                        queue_time: 8,
                        ..zx::TaskRuntimeInfo::default()
                    },
                ],
            ),
        );

        stats.measure();

        let hierarchy = inspector.get_diagnostics_hierarchy().await;
        assert_data_tree!(hierarchy, root: {
            stats: contains {
                measurements: contains {
                    components: {
                        "a": {
                            "1": {
                                timestamps: AnyProperty,
                                cpu_times: vec![2i64],
                                queue_times: vec![4i64],
                            }
                        }
                    }
                }
            }
        });
        let (timestamps, _, _) = get_data(&hierarchy, "a", Some("1"));
        assert_eq!(timestamps.len(), 1);

        stats.measure();

        let hierarchy = inspector.get_diagnostics_hierarchy().await;
        assert_data_tree!(hierarchy, root: {
            stats: contains {
                measurements: contains {
                    components: {
                        "a": {
                            "1": {
                                timestamps: AnyProperty,
                                cpu_times: vec![2i64, 6],
                                queue_times: vec![4i64, 8],
                            }
                        }
                    }
                }
            }
        });
        let (timestamps, _, _) = get_data(&hierarchy, "a", Some("1"));
        assert_eq!(timestamps.len(), 2);
        assert!(timestamps[1] > timestamps[0]);
    }

    #[fuchsia::test]
    async fn on_started_handles_parent_task() {
        let inspector = inspect::Inspector::default();
        let clock = Arc::new(FakeTime::new());
        clock.add_ticks(20);
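        // Start the clock at a nonzero time, ahead of the fake runtimes'
        // start times used below.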
        let stats = ComponentTreeStats::new_with_timesource(
            inspector.root().create_child("stats"),
            clock.clone(),
        );
        let parent_task = FakeTask::new(
            1,
            vec![
                zx::TaskRuntimeInfo {
                    cpu_time: 20,
                    queue_time: 40,
                    ..zx::TaskRuntimeInfo::default()
                },
                zx::TaskRuntimeInfo {
                    cpu_time: 60,
                    queue_time: 80,
                    ..zx::TaskRuntimeInfo::default()
                },
            ],
        );
        let component_task = FakeTask::new(
            2,
            vec![
                zx::TaskRuntimeInfo {
                    cpu_time: 2,
                    queue_time: 4,
                    ..zx::TaskRuntimeInfo::default()
                },
                zx::TaskRuntimeInfo {
                    cpu_time: 6,
                    queue_time: 8,
                    ..zx::TaskRuntimeInfo::default()
                },
            ],
        );

        let fake_runtime = FakeRuntime::new_with_start_times(
            FakeDiagnosticsContainer::new(parent_task.clone(), None),
            IncrementingFakeTime::new(3, std::time::Duration::from_nanos(5)),
        );
        stats.on_component_started(&Moniker::try_from(["parent"]).unwrap(), &fake_runtime);

        let fake_runtime = FakeRuntime::new_with_start_times(
            FakeDiagnosticsContainer::new(component_task, Some(parent_task)),
            IncrementingFakeTime::new(8, std::time::Duration::from_nanos(5)),
        );
        stats.on_component_started(&Moniker::try_from(["child"]).unwrap(), &fake_runtime);

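        // Wait for both components to be registered by the async waiter tasks.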
        loop {
            if stats.tree.lock().len() == 2 {
                break;
            }
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(100i64),
            ))
            .await;
        }

        assert_data_tree!(inspector, root: {
            stats: contains {
                measurements: contains {
                    components: {
                        "parent": {
                            "1": {
                                "timestamps": AnyProperty,
                                "cpu_times": vec![0i64, 20],
                                "queue_times": vec![0i64, 40],
                            },
                        },
                        "child": {
                            "2": {
                                "timestamps": AnyProperty,
                                "cpu_times": vec![0i64, 2],
                                "queue_times": vec![0i64, 4],
                            }
                        }
                    }
                }
            }
        });
    }

    #[fuchsia::test]
    async fn child_tasks_garbage_collection() {
        fuchsia_sync::suppress_lock_cycle_panics();

        let inspector = inspect::Inspector::default();
        let clock = Arc::new(FakeTime::new());
        let stats = ComponentTreeStats::new_with_timesource(
            inspector.root().create_child("stats"),
            clock.clone(),
        );
        let parent_task = FakeTask::new(
            1,
            vec![
                zx::TaskRuntimeInfo {
                    cpu_time: 20,
                    queue_time: 40,
                    ..zx::TaskRuntimeInfo::default()
                },
                zx::TaskRuntimeInfo {
                    cpu_time: 60,
                    queue_time: 80,
                    ..zx::TaskRuntimeInfo::default()
                },
            ],
        );
        let component_task = FakeTask::new(
            2,
            vec![zx::TaskRuntimeInfo {
                cpu_time: 2,
                queue_time: 4,
                ..zx::TaskRuntimeInfo::default()
            }],
        );
        let fake_parent_runtime =
            FakeRuntime::new(FakeDiagnosticsContainer::new(parent_task.clone(), None));
        stats.on_component_started(&Moniker::try_from(["parent"]).unwrap(), &fake_parent_runtime);

        let child_moniker = Moniker::try_from(["child"]).unwrap();
        let fake_runtime =
            FakeRuntime::new(FakeDiagnosticsContainer::new(component_task, Some(parent_task)));
        stats.on_component_started(&child_moniker, &fake_runtime);

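        // Wait for both components to be registered by the async waiter tasks.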
        loop {
            if stats.tree.lock().len() == 2 {
                break;
            }
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(100i64),
            ))
            .await;
        }

        assert_eq!(stats.tree.lock().len(), 2);
        assert_eq!(stats.tasks.lock().len(), 2);

        let extended_moniker = child_moniker.into();
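        // Terminate the child's task and sample until its retained
        // measurements are gone; the parent must remain tracked.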
        for task in stats.tree.lock().get(&extended_moniker).unwrap().lock().tasks_mut() {
            task.lock().force_terminate().await;
            clock.add_ticks(1);
        }

        stats.measure();
        clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);

        for _ in 0..COMPONENT_CPU_MAX_SAMPLES {
            stats.measure();
            clock.add_ticks(CPU_SAMPLE_PERIOD.as_nanos() as i64);
        }

        stats.measure();

        assert!(stats.tree.lock().get(&extended_moniker).is_none());
        assert_eq!(stats.tree.lock().len(), 1);
        assert_eq!(stats.tasks.lock().len(), 1);
    }

    fn get_recent_property(hierarchy: &DiagnosticsHierarchy, name: &str) -> i64 {
        hierarchy.get_property_by_path(&["stats", "recent_usage", name]).unwrap().int().unwrap()
    }

    fn get_data(
        hierarchy: &DiagnosticsHierarchy,
        moniker: &str,
        task: Option<&str>,
    ) -> (Vec<i64>, Vec<i64>, Vec<i64>) {
        let mut path = vec!["stats", "measurements", "components", moniker];
        if let Some(task) = task {
            path.push(task);
        }
        get_data_at(hierarchy, &path)
    }

    fn get_data_at(
        hierarchy: &DiagnosticsHierarchy,
        path: &[&str],
    ) -> (Vec<i64>, Vec<i64>, Vec<i64>) {
        let node = hierarchy.get_child_by_path(path).expect("found stats node");
        let cpu_times = node
            .get_property("cpu_times")
            .expect("found cpu")
            .int_array()
            .expect("cpu are ints")
            .raw_values();
        let queue_times = node
            .get_property("queue_times")
            .expect("found queue")
            .int_array()
            .expect("queue are ints")
            .raw_values();
        let timestamps = node
            .get_property("timestamps")
            .expect("found timestamps")
            .int_array()
            .expect("timestamps are ints")
            .raw_values();
        (timestamps.into_owned(), cpu_times.into_owned(), queue_times.into_owned())
    }
}