fuchsia_async/runtime/
instrument.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Pluggable instrumentation for the async executor.
6
7use crate::ScopeHandle;
8pub use crate::runtime::fuchsia::executor::atomic_future::AtomicFutureHandle;
9use std::any::Any;
10
11/// A trait for instrumenting futures.
12///
13/// This trait provides a way to receive callbacks for various events that occur
14/// for a future, such as completion, and polling.
15pub trait Hooks {
16    /// Called when the task has completed.
17    fn task_completed(&mut self);
18
19    /// Called when the task is about to be polled.
20    fn task_poll_start(&mut self);
21
22    /// Called when the task has finished being polled.
23    fn task_poll_end(&mut self);
24}
25
26/// A trait for instrumenting the async executor.
27///
28/// This trait provides a way to receive callbacks for various events that occur
29/// within the executor, such as task creation, completion, and polling.
30pub trait TaskInstrument: Send + Sync + 'static {
31    /// Called when a new task is created.
32    /// Typically, implementers will want to call `task.add_hooks()` here
33    /// to add hooks to the task.
34    fn task_created<'a>(&self, parent_scope: &ScopeHandle, task: &mut AtomicFutureHandle<'a>);
35
36    /// Called when scope is created
37    ///
38    /// # Arguments
39    ///
40    /// * `scope_name`: An optional name for the scope.
41    /// * `parent_scope`: A reference to the parent scope, or None for the root.
42    ///
43    /// # Returns
44    ///
45    /// A boxed `Any` trait object representing the created scope
46    /// which contains data that can later be retrieved from the
47    /// scope using instrument_data() on the scope.
48    fn scope_created(
49        &self,
50        scope_name: &str,
51        parent_scope: Option<&ScopeHandle>,
52    ) -> Box<dyn Any + Send + Sync>;
53}
54
55#[cfg(test)]
56mod tests {
57    use super::*;
58    use crate::{Scope, ScopeHandle, SendExecutorBuilder, yield_now};
59    use fuchsia_sync::Mutex;
60    use std::any::Any;
61    use std::sync::Arc;
62    use std::sync::atomic::{AtomicUsize, Ordering};
63
64    // Instrumentation to track scope associations
65    struct TrackedTask {
66        poll_count: AtomicUsize,
67        poll_end_count: AtomicUsize,
68        completed: AtomicUsize,
69    }
70
71    struct TrackedScope {
72        name: String,
73        tasks: Mutex<Vec<Arc<TrackedTask>>>,
74        scopes: Mutex<Vec<Arc<TrackedScope>>>,
75    }
76
77    struct ScopeTrackingInstrument {
78        scopes: Mutex<Vec<Arc<TrackedScope>>>,
79    }
80
81    impl ScopeTrackingInstrument {
82        fn new() -> Self {
83            Self { scopes: Mutex::new(Vec::new()) }
84        }
85
86        fn get_scopes(&self) -> Vec<Arc<TrackedScope>> {
87            self.scopes.lock().clone()
88        }
89    }
90
91    struct TrackedHooks {
92        task: Arc<TrackedTask>,
93    }
94
95    impl Hooks for TrackedHooks {
96        fn task_completed(&mut self) {
97            // Relaxed ordering is fine because we hold a mut reference,
98            // so nothing else can mutate this (though because we're
99            // technically sharing this state with the test main,
100            // the borrow checker can't reason about this,
101            // making either unsafe or atomics necessary.)
102            self.task.completed.fetch_add(1, Ordering::Relaxed);
103        }
104
105        fn task_poll_end(&mut self) {
106            assert_eq!(
107                self.task.poll_count.load(Ordering::Relaxed) - 1,
108                self.task.poll_end_count.load(Ordering::Relaxed)
109            );
110            self.task.poll_end_count.fetch_add(1, Ordering::Relaxed);
111        }
112
113        fn task_poll_start(&mut self) {
114            self.task.poll_count.fetch_add(1, Ordering::Relaxed);
115        }
116    }
117
118    impl TaskInstrument for ScopeTrackingInstrument {
119        fn task_created<'a>(
120            &self,
121            parent_scope: &ScopeHandle,
122            handle: &mut AtomicFutureHandle<'a>,
123        ) {
124            // Extract scope name from the parent scope
125            let parent = parent_scope
126                .instrument_data()
127                .unwrap()
128                .downcast_ref::<Arc<TrackedScope>>()
129                .unwrap()
130                .clone();
131            let task = Arc::new(TrackedTask {
132                poll_count: AtomicUsize::new(0),
133                completed: AtomicUsize::new(0),
134                poll_end_count: AtomicUsize::new(0),
135            });
136
137            // Add task to scope
138            let mut tasks = parent.tasks.lock();
139            tasks.push(task.clone());
140
141            handle.add_hooks(TrackedHooks { task });
142        }
143
144        fn scope_created(
145            &self,
146            scope_name: &str,
147            parent_scope: Option<&ScopeHandle>,
148        ) -> Box<dyn Any + Send + Sync> {
149            let tracked_scope = Arc::new(TrackedScope {
150                name: scope_name.to_string(),
151                tasks: Default::default(),
152                scopes: Default::default(),
153            });
154            // Extract parent scope
155            if let Some(parent_handle) = parent_scope
156                && let Some(parent_scope) = parent_handle
157                    .instrument_data()
158                    .and_then(|data| data.downcast_ref::<Arc<TrackedScope>>())
159            {
160                parent_scope.scopes.lock().push(tracked_scope.clone());
161            }
162
163            self.scopes.lock().push(tracked_scope.clone());
164
165            Box::new(tracked_scope)
166        }
167    }
168
169    #[test]
170    fn test_global_spawn_with_scope() {
171        let instrumentation = Arc::new(ScopeTrackingInstrument::new());
172
173        let mut executor = SendExecutorBuilder::new()
174            .num_threads(4)
175            .instrument(Some(instrumentation.clone()))
176            .build();
177        executor.run(async move {
178            let root_scope = Scope::new_with_name("test_root");
179
180            // Create a hierarchy of scopes
181            let level2_scope = root_scope.new_child_with_name("level2");
182            let level3_scope = level2_scope.new_child_with_name("level3".to_string());
183
184            // Spawn tasks in different scopes
185            root_scope.spawn(async {});
186
187            level2_scope.spawn(async {
188                yield_now().await; // Multiple polls
189            });
190
191            level3_scope.spawn(async {});
192
193            level2_scope.spawn(async {});
194
195            level3_scope.spawn(async {});
196
197            // Wait for all tasks to complete
198            root_scope.await;
199        });
200
201        // Verify the hierarchy
202        let scopes = instrumentation.get_scopes();
203        assert_eq!(scopes.len(), 4);
204
205        // The Fuchsia executor creates its own scope called "root",
206        // which is the true root scope here. All other scopes are
207        // children under that one.
208        let root_scope = &scopes[0];
209        assert_eq!(root_scope.name, "root".to_string());
210        assert_eq!(root_scope.tasks.lock().len(), 1);
211        assert_eq!(root_scope.scopes.lock().len(), 1);
212
213        let test_root_scope = &root_scope.scopes.lock()[0];
214        assert_eq!(test_root_scope.name, "test_root".to_string());
215        assert_eq!(test_root_scope.tasks.lock().len(), 1);
216        assert_eq!(test_root_scope.scopes.lock().len(), 1);
217
218        let level2_scope = &test_root_scope.scopes.lock()[0];
219        assert_eq!(level2_scope.name, "level2".to_string());
220        assert_eq!(level2_scope.tasks.lock().len(), 2);
221        assert_eq!(level2_scope.scopes.lock().len(), 1);
222
223        let level3_scope = &level2_scope.scopes.lock()[0];
224        assert_eq!(level3_scope.name, "level3".to_string());
225        assert_eq!(level3_scope.tasks.lock().len(), 2);
226        assert_eq!(level3_scope.scopes.lock().len(), 0);
227
228        // Assert poll counts
229        let root_tasks = root_scope.tasks.lock();
230        // We can't assert the number of polls for the root task,
231        // as that is nondeterministic on a multithreaded executor.
232        assert_eq!(root_tasks[0].completed.load(Ordering::Relaxed), 1);
233
234        let test_root_tasks = test_root_scope.tasks.lock();
235        assert_eq!(test_root_tasks[0].poll_count.load(Ordering::Relaxed), 1);
236        assert_eq!(test_root_tasks[0].completed.load(Ordering::Relaxed), 1);
237
238        let level2_tasks = level2_scope.tasks.lock();
239        assert_eq!(level2_tasks[0].poll_count.load(Ordering::Relaxed), 2);
240        assert_eq!(level2_tasks[0].completed.load(Ordering::Relaxed), 1);
241        assert_eq!(level2_tasks[1].poll_count.load(Ordering::Relaxed), 1);
242        assert_eq!(level2_tasks[1].completed.load(Ordering::Relaxed), 1);
243
244        let level3_tasks = level3_scope.tasks.lock();
245        assert_eq!(level3_tasks[0].poll_count.load(Ordering::Relaxed), 1);
246        assert_eq!(level3_tasks[0].completed.load(Ordering::Relaxed), 1);
247        assert_eq!(level3_tasks[1].poll_count.load(Ordering::Relaxed), 1);
248        assert_eq!(level3_tasks[1].completed.load(Ordering::Relaxed), 1);
249    }
250}