starnix_core/task/
thread_lockup_detector.rs1use pin_project::pin_project;
12use starnix_sync::RwLock;
13use std::borrow::Borrow;
14use std::cell::RefCell;
15use std::collections::HashSet;
16use std::sync::LazyLock;
17use std::sync::atomic::{AtomicU64, Ordering};
18
/// Entry point of the thread lockup detector.
///
/// The type carries no data; everything is exposed through associated functions that operate on
/// the calling thread's thread-local state and on the process-wide `REGISTRY`.
#[derive(Default)]
pub struct ThreadLockupDetector;
21
/// Per-thread tracking state, stored in the `THREAD_STATE` thread-local.
struct ThreadState {
    /// Monotonic timestamp (in nanoseconds) at which the operation currently running on this
    /// thread started, or 0 when no operation is being tracked.
    ///
    /// Boxed so that the value has a stable address: the registry entry for this thread holds a
    /// raw pointer to it.
    atomic: Box<AtomicU64>,
    /// Koid of the thread this state belongs to; used as the key to remove the registry entry
    /// when the state is dropped.
    koid: zx::Koid,
}
31
32impl ThreadState {
33 fn new() -> Self {
35 let handle = fuchsia_runtime::with_thread_self(|thread| thread.raw_handle());
36 let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();
37 let atomic = Box::new(AtomicU64::new(0));
38 let ptr = &*atomic as *const AtomicU64;
39 let registered = RegisteredThread {
40 thread: unsafe { zx::Unowned::from_raw_handle(handle) },
42 koid,
43 atomic: ptr,
44 };
45 REGISTRY.write().insert(registered);
46 Self { atomic, koid }
47 }
48}
49
50impl Drop for ThreadState {
51 fn drop(&mut self) {
53 REGISTRY.write().remove(&self.koid);
54 }
55}
56
// Lazily-initialised tracking state of the current thread. It stays `None` until the first call
// to `ThreadLockupDetector::start_operation` on this thread; dropping the contained
// `ThreadState` unregisters the thread from `REGISTRY`.
thread_local! {
    static THREAD_STATE: RefCell<Option<ThreadState>> = const { RefCell::new(None) };
}
60
/// Registry entry for one thread that has started at least one tracked operation.
///
/// Hashing, equality and `Borrow<zx::Koid>` are all based solely on `koid`, so the registry can
/// be queried by koid.
#[derive(Clone)]
struct RegisteredThread {
    /// Unowned handle to the registered thread.
    thread: zx::Unowned<'static, zx::Thread>,
    /// Koid of the registered thread; the identity of this entry.
    koid: zx::Koid,
    /// Pointer to the start timestamp owned by the thread's `ThreadState`. It is only
    /// dereferenced while the registry lock is held.
    atomic: *const AtomicU64,
}
71
72impl std::hash::Hash for RegisteredThread {
75 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
76 self.koid.hash(state);
77 }
78}
79
80impl PartialEq for RegisteredThread {
81 fn eq(&self, other: &Self) -> bool {
82 self.koid == other.koid
83 }
84}
85
86impl Eq for RegisteredThread {}
87
88impl Borrow<zx::Koid> for RegisteredThread {
89 fn borrow(&self) -> &zx::Koid {
90 &self.koid
91 }
92}
93
// SAFETY: the raw `atomic` pointer is the field that would otherwise prevent `Send`. It points
// at an `AtomicU64` that is boxed inside the owning thread's `ThreadState`, and that state
// removes this entry from `REGISTRY` in its `Drop` impl before the box is freed.
// NOTE(review): assumes entries never escape the registry lock while the pointer is in use —
// confirm if new users of `RegisteredThread` are added.
unsafe impl Send for RegisteredThread {}
// SAFETY: shared access only ever performs atomic loads through the pointer; see the `Send`
// rationale above for why the pointee is still alive.
unsafe impl Sync for RegisteredThread {}
99
/// Describes one thread reported by `ThreadLockupDetector::get_long_running_threads`.
pub struct ThreadLockupInfo {
    /// Unowned handle to the reported thread, copied from its registry entry.
    /// NOTE(review): nothing visible here keeps the thread alive after the registry lock is
    /// released — confirm how long callers may use this handle.
    pub thread: zx::Unowned<'static, zx::Thread>,
    /// Koid of the reported thread.
    pub koid: zx::Koid,
}
104
/// Process-wide set of every thread that currently owns a `ThreadState`.
///
/// Entries are inserted by `ThreadState::new` and removed by `ThreadState`'s `Drop` impl, both
/// under the write lock; `get_long_running_threads` scans the set under the read lock.
static REGISTRY: LazyLock<RwLock<HashSet<RegisteredThread>>> =
    LazyLock::new(|| RwLock::new(HashSet::new()));
108
109impl ThreadLockupDetector {
110 fn start_operation() {
112 THREAD_STATE.with(|state| {
113 let mut state = state.borrow_mut();
114 let state = state.get_or_insert_with(|| ThreadState::new());
115 state.atomic.store(zx::MonotonicInstant::get().into_nanos() as u64, Ordering::Relaxed);
116 });
117 }
118
119 fn stop_operation() {
121 THREAD_STATE.with(|state| {
122 if let Some(state) = state.borrow().as_ref() {
123 state.atomic.store(0, Ordering::Relaxed);
124 }
125 });
126 }
127
128 pub fn get_long_running_threads(threshold: zx::MonotonicDuration) -> Vec<ThreadLockupInfo> {
131 let now = zx::MonotonicInstant::get();
132 let registry = REGISTRY.read();
133 registry
134 .iter()
135 .filter_map(|registered| {
136 let atomic = unsafe { &*registered.atomic };
140 let start_nanos = atomic.load(Ordering::Relaxed);
141 if start_nanos == 0 {
142 return None;
143 }
144 let start_time = zx::MonotonicInstant::from_nanos(start_nanos as i64);
145 if now - start_time > threshold {
146 Some(ThreadLockupInfo {
147 thread: registered.thread.clone(),
148 koid: registered.koid,
149 })
150 } else {
151 None
152 }
153 })
154 .collect()
155 }
156
157 pub fn track() -> LockupDetectorGuard {
160 LockupDetectorGuard::new()
161 }
162
163 pub fn pause_tracking() -> LockupDetectorWaitingGuard {
166 LockupDetectorWaitingGuard::new()
167 }
168
169 pub fn track_future<F>(inner: F) -> LockupDetectorFuture<F> {
171 LockupDetectorFuture::new(inner)
172 }
173
174 pub fn tracked_channel<T>() -> (std::sync::mpsc::Sender<T>, LockupDetectorReceiver<T>) {
176 let (sender, receiver) = std::sync::mpsc::channel();
177 (sender, LockupDetectorReceiver::new(receiver))
178 }
179}
180
181pub struct LockupDetectorGuard;
182
183impl LockupDetectorGuard {
184 fn new() -> Self {
185 ThreadLockupDetector::start_operation();
186 Self
187 }
188}
189
190impl Drop for LockupDetectorGuard {
191 fn drop(&mut self) {
192 ThreadLockupDetector::stop_operation();
193 }
194}
195
196pub struct LockupDetectorWaitingGuard;
197
198impl LockupDetectorWaitingGuard {
199 fn new() -> Self {
200 ThreadLockupDetector::stop_operation();
201 Self
202 }
203}
204
205impl Drop for LockupDetectorWaitingGuard {
206 fn drop(&mut self) {
207 ThreadLockupDetector::start_operation();
208 }
209}
210
211#[pin_project]
212pub struct LockupDetectorFuture<F> {
213 #[pin]
214 inner: F,
215}
216
217impl<F> LockupDetectorFuture<F> {
218 fn new(inner: F) -> Self {
219 Self { inner }
220 }
221}
222
223impl<F: std::future::Future> std::future::Future for LockupDetectorFuture<F> {
224 type Output = F::Output;
225
226 fn poll(
227 self: std::pin::Pin<&mut Self>,
228 cx: &mut std::task::Context<'_>,
229 ) -> std::task::Poll<Self::Output> {
230 let _guard = LockupDetectorGuard::new();
231 let this = self.project();
232 this.inner.poll(cx)
233 }
234}
235
236pub struct LockupDetectorReceiver<T> {
237 inner: std::sync::mpsc::Receiver<T>,
238}
239
240impl<T> LockupDetectorReceiver<T> {
241 fn new(inner: std::sync::mpsc::Receiver<T>) -> Self {
242 Self { inner }
243 }
244
245 pub fn recv(&self) -> Result<T, std::sync::mpsc::RecvError> {
246 let _guard = LockupDetectorWaitingGuard::new();
247 self.inner.recv()
248 }
249
250 pub fn try_iter(&self) -> std::sync::mpsc::TryIter<'_, T> {
251 self.inner.try_iter()
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the koids of all threads reported with a zero threshold, i.e. every registered
    /// thread whose tracked operation started strictly before "now".
    ///
    /// The registry is process-global, so the result may include threads that belong to other
    /// tests running concurrently. Tests must therefore only make assertions about koids they
    /// own and never about the list as a whole.
    fn get_long_running_koids() -> Vec<zx::Koid> {
        ThreadLockupDetector::get_long_running_threads(zx::MonotonicDuration::from_nanos(0))
            .iter()
            .map(|r| r.koid)
            .collect()
    }

    /// A thread is reported for as long as a tracking guard is alive, and not afterwards.
    #[test]
    fn test_lockup_detector() {
        let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();

        {
            let _guard = ThreadLockupDetector::track();

            assert!(get_long_running_koids().contains(&koid));

            // Still reported on a second query while the guard is alive.
            assert!(get_long_running_koids().contains(&koid));
        }

        assert!(!get_long_running_koids().contains(&koid));
    }

    /// Dropping the guard returned by `track` stops tracking.
    #[test]
    fn test_guard() {
        let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();

        {
            let _guard = ThreadLockupDetector::track();
            assert!(get_long_running_koids().contains(&koid));
        }

        assert!(!get_long_running_koids().contains(&koid));
    }

    /// `pause_tracking` hides the thread while its guard is alive and resumes tracking after.
    #[test]
    fn test_waiting_guard() {
        let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();

        let _guard = ThreadLockupDetector::track();

        {
            let _waiting_guard = ThreadLockupDetector::pause_tracking();
            assert!(!get_long_running_koids().contains(&koid));
        }

        assert!(get_long_running_koids().contains(&koid));
    }

    /// A tracked future that is pending (not currently being polled) is not reported.
    #[test]
    fn test_track_future() {
        let (koid_tx, koid_rx) = std::sync::mpsc::channel();
        let (signal_tx, signal_rx) = futures::channel::oneshot::channel::<()>();

        let t = std::thread::spawn(move || {
            let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();
            koid_tx.send(koid).unwrap();

            let fut = ThreadLockupDetector::track_future(async move {
                signal_rx.await.unwrap();
            });

            fuchsia_async::LocalExecutor::default().run_singlethreaded(fut);

            koid
        });

        let spawned_koid = koid_rx.recv().unwrap();

        // Give the spawned thread time to poll the future once and park on the signal.
        std::thread::sleep(std::time::Duration::from_millis(100));

        assert!(!get_long_running_koids().contains(&spawned_koid));

        // Let the future complete so the thread can exit.
        signal_tx.send(()).unwrap();

        t.join().unwrap();
    }

    /// A tracked future is reported while it is being polled, and only then.
    #[test]
    fn test_track_future_polling() {
        let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();

        assert!(!get_long_running_koids().contains(&koid));

        let fut = ThreadLockupDetector::track_future(async {
            assert!(get_long_running_koids().contains(&koid));
        });

        fuchsia_async::LocalExecutor::default().run_singlethreaded(fut);

        assert!(!get_long_running_koids().contains(&koid));
    }

    /// A tracked thread blocked in `LockupDetectorReceiver::recv` is not reported.
    #[test]
    fn test_track_channel() {
        let (koid_tx, koid_rx) = std::sync::mpsc::channel();
        let (tx, rx) = ThreadLockupDetector::tracked_channel();

        let t = std::thread::spawn(move || {
            let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();
            koid_tx.send(koid).unwrap();

            let _guard = ThreadLockupDetector::track();

            // Blocks until the main thread sends; tracking is paused meanwhile.
            rx.recv().unwrap();

            koid
        });

        let spawned_koid = koid_rx.recv().unwrap();

        // Give the spawned thread time to reach the blocking `recv`.
        std::thread::sleep(std::time::Duration::from_millis(100));

        assert!(!get_long_running_koids().contains(&spawned_koid));

        // Unblock the spawned thread so it can exit.
        tx.send(()).unwrap();

        t.join().unwrap();
    }
}