Skip to main content

starnix_core/task/
thread_lockup_detector.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This module implements a lockup detector for Starnix kernel threads.
6//! It tracks the start time of operations and reports threads that run for too long
7//! without pausing or stopping the operation.
8//!
9//! It uses a global registry to track active operations across all threads.
10
11use pin_project::pin_project;
12use starnix_sync::RwLock;
13use std::borrow::Borrow;
14use std::cell::RefCell;
15use std::collections::HashSet;
16use std::sync::LazyLock;
17use std::sync::atomic::{AtomicU64, Ordering};
18
/// Entry point for the Starnix thread lockup detector.
///
/// This is a stateless unit type. All tracking state lives in a per-thread slot
/// (`THREAD_STATE`) and a process-wide registry (`REGISTRY`). The functionality is exposed
/// through associated functions such as [`ThreadLockupDetector::track`] and
/// [`ThreadLockupDetector::get_long_running_threads`].
#[derive(Default)]
pub struct ThreadLockupDetector;
21
22/// Thread-local state that registers the thread in the global registry on creation
23/// and removes it on drop.
24struct ThreadState {
25    /// Pointer to the atomic u64 used to store the start time of the current operation.
26    /// This is boxed to ensure its address remains stable while registered.
27    atomic: Box<AtomicU64>,
28    /// The KOID of the thread, used as the key for removal in `Drop`.
29    koid: zx::Koid,
30}
31
32impl ThreadState {
33    /// Creates a new `ThreadState`, registering the current thread in the global `REGISTRY`.
34    fn new() -> Self {
35        let handle = fuchsia_runtime::with_thread_self(|thread| thread.raw_handle());
36        let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();
37        let atomic = Box::new(AtomicU64::new(0));
38        let ptr = &*atomic as *const AtomicU64;
39        let registered = RegisteredThread {
40            // SAFETY: The handle is valid as long as the thread is registered.
41            thread: unsafe { zx::Unowned::from_raw_handle(handle) },
42            koid,
43            atomic: ptr,
44        };
45        REGISTRY.write().insert(registered);
46        Self { atomic, koid }
47    }
48}
49
50impl Drop for ThreadState {
51    /// Removes the thread from the global `REGISTRY` when the thread exits.
52    fn drop(&mut self) {
53        REGISTRY.write().remove(&self.koid);
54    }
55}
56
// Lazily created per-thread state. It stays `None` until the thread first starts an
// operation (`ThreadLockupDetector::start_operation` fills it in). When the thread's locals
// are destroyed, the contained `ThreadState` is dropped, which unregisters the thread.
thread_local! {
    static THREAD_STATE: RefCell<Option<ThreadState>> = const { RefCell::new(None) };
}
60
61/// The information stored in the global registry for each tracked thread.
62#[derive(Clone)]
63struct RegisteredThread {
64    /// An unowned handle to the thread, used for inspection.
65    thread: zx::Unowned<'static, zx::Thread>,
66    /// The KOID of the thread.
67    koid: zx::Koid,
68    /// Pointer to the atomic u64 in the thread's `ThreadState`.
69    atomic: *const AtomicU64,
70}
71
72// We only hash and compare by `koid` to allow lookup and removal by `koid`
73// in the `HashSet`.
74impl std::hash::Hash for RegisteredThread {
75    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
76        self.koid.hash(state);
77    }
78}
79
80impl PartialEq for RegisteredThread {
81    fn eq(&self, other: &Self) -> bool {
82        self.koid == other.koid
83    }
84}
85
86impl Eq for RegisteredThread {}
87
88impl Borrow<zx::Koid> for RegisteredThread {
89    fn borrow(&self) -> &zx::Koid {
90        &self.koid
91    }
92}
93
// SAFETY: The field needing justification is the raw `atomic: *const AtomicU64` pointer.
// The pointee is an `AtomicU64`, which may be accessed from any thread. Its lifetime is
// guarded by the `REGISTRY` RwLock. The owning thread removes its entry under the write lock
// (in `ThreadState::drop`) before the boxed atomic is freed. The reader in this module
// (`get_long_running_threads`) only dereferences the pointer while holding the read lock.
unsafe impl Send for RegisteredThread {}
// SAFETY: Same as above.
unsafe impl Sync for RegisteredThread {}
99
/// A thread reported by `ThreadLockupDetector::get_long_running_threads`.
///
/// NOTE(review): `thread` is an unowned handle copied out of the registry. The registry only
/// vouches for it while the thread is registered, but this struct can outlive the registry
/// lock and the thread itself. Consumers should treat it as potentially stale. Consider
/// duplicating the handle instead; TODO confirm lifetime expectations with callers.
pub struct ThreadLockupInfo {
    /// Unowned handle to the reported thread.
    pub thread: zx::Unowned<'static, zx::Thread>,
    /// KOID of the reported thread.
    pub koid: zx::Koid,
}
104
105/// Global registry of all tracked threads.
106static REGISTRY: LazyLock<RwLock<HashSet<RegisteredThread>>> =
107    LazyLock::new(|| RwLock::new(HashSet::new()));
108
109impl ThreadLockupDetector {
110    /// Starts an operation by storing the current time in the thread-local atomic.
111    fn start_operation() {
112        THREAD_STATE.with(|state| {
113            let mut state = state.borrow_mut();
114            let state = state.get_or_insert_with(|| ThreadState::new());
115            state.atomic.store(zx::MonotonicInstant::get().into_nanos() as u64, Ordering::Relaxed);
116        });
117    }
118
119    /// Stops an operation by storing 0 in the thread-local atomic.
120    fn stop_operation() {
121        THREAD_STATE.with(|state| {
122            if let Some(state) = state.borrow().as_ref() {
123                state.atomic.store(0, Ordering::Relaxed);
124            }
125        });
126    }
127
128    /// Iterates over the registry, finds threads that have been running longer than the threshold,
129    /// and returns their `ThreadLockupInfo`.
130    pub fn get_long_running_threads(threshold: zx::MonotonicDuration) -> Vec<ThreadLockupInfo> {
131        let now = zx::MonotonicInstant::get();
132        let registry = REGISTRY.read();
133        registry
134            .iter()
135            .filter_map(|registered| {
136                // SAFETY: We hold the read lock on REGISTRY. Any thread exiting must
137                // acquire the write lock to remove its pointer before freeing the memory.
138                // So the pointer is valid as long as we hold the read lock.
139                let atomic = unsafe { &*registered.atomic };
140                let start_nanos = atomic.load(Ordering::Relaxed);
141                if start_nanos == 0 {
142                    return None;
143                }
144                let start_time = zx::MonotonicInstant::from_nanos(start_nanos as i64);
145                if now - start_time > threshold {
146                    Some(ThreadLockupInfo {
147                        thread: registered.thread.clone(),
148                        koid: registered.koid,
149                    })
150                } else {
151                    None
152                }
153            })
154            .collect()
155    }
156
157    /// Starts tracking the current operation on the current thread.
158    /// Returns a guard that stops tracking when dropped.
159    pub fn track() -> LockupDetectorGuard {
160        LockupDetectorGuard::new()
161    }
162
163    /// Pauses tracking for the current operation on the current thread.
164    /// Returns a guard that resumes tracking when dropped.
165    pub fn pause_tracking() -> LockupDetectorWaitingGuard {
166        LockupDetectorWaitingGuard::new()
167    }
168
169    /// Wraps a future to track its execution when polled.
170    pub fn track_future<F>(inner: F) -> LockupDetectorFuture<F> {
171        LockupDetectorFuture::new(inner)
172    }
173
174    /// Creates a channel where the receiver pauses tracking while waiting for messages.
175    pub fn tracked_channel<T>() -> (std::sync::mpsc::Sender<T>, LockupDetectorReceiver<T>) {
176        let (sender, receiver) = std::sync::mpsc::channel();
177        (sender, LockupDetectorReceiver::new(receiver))
178    }
179}
180
181pub struct LockupDetectorGuard;
182
183impl LockupDetectorGuard {
184    fn new() -> Self {
185        ThreadLockupDetector::start_operation();
186        Self
187    }
188}
189
190impl Drop for LockupDetectorGuard {
191    fn drop(&mut self) {
192        ThreadLockupDetector::stop_operation();
193    }
194}
195
196pub struct LockupDetectorWaitingGuard;
197
198impl LockupDetectorWaitingGuard {
199    fn new() -> Self {
200        ThreadLockupDetector::stop_operation();
201        Self
202    }
203}
204
205impl Drop for LockupDetectorWaitingGuard {
206    fn drop(&mut self) {
207        ThreadLockupDetector::start_operation();
208    }
209}
210
211#[pin_project]
212pub struct LockupDetectorFuture<F> {
213    #[pin]
214    inner: F,
215}
216
217impl<F> LockupDetectorFuture<F> {
218    fn new(inner: F) -> Self {
219        Self { inner }
220    }
221}
222
223impl<F: std::future::Future> std::future::Future for LockupDetectorFuture<F> {
224    type Output = F::Output;
225
226    fn poll(
227        self: std::pin::Pin<&mut Self>,
228        cx: &mut std::task::Context<'_>,
229    ) -> std::task::Poll<Self::Output> {
230        let _guard = LockupDetectorGuard::new();
231        let this = self.project();
232        this.inner.poll(cx)
233    }
234}
235
236pub struct LockupDetectorReceiver<T> {
237    inner: std::sync::mpsc::Receiver<T>,
238}
239
240impl<T> LockupDetectorReceiver<T> {
241    fn new(inner: std::sync::mpsc::Receiver<T>) -> Self {
242        Self { inner }
243    }
244
245    pub fn recv(&self) -> Result<T, std::sync::mpsc::RecvError> {
246        let _guard = LockupDetectorWaitingGuard::new();
247        self.inner.recv()
248    }
249
250    pub fn try_iter(&self) -> std::sync::mpsc::TryIter<'_, T> {
251        self.inner.try_iter()
252    }
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    // `REGISTRY` is process-global, and the test harness runs tests concurrently on separate
    // threads. Another test's thread may therefore be inside a `track()` scope at any moment.
    // Assertions only check for the presence or absence of the current test's own thread,
    // never that the whole result is empty.

    /// Returns the KOIDs of all registered threads whose current operation has been running
    /// for more than zero nanoseconds, i.e. (essentially) every thread that is tracking.
    fn get_long_running_koids() -> Vec<zx::Koid> {
        ThreadLockupDetector::get_long_running_threads(zx::MonotonicDuration::from_nanos(0))
            .iter()
            .map(|r| r.koid)
            .collect()
    }

    #[test]
    fn test_lockup_detector() {
        let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();

        {
            let _guard = ThreadLockupDetector::track();

            // Exceed threshold immediately with zero duration.
            assert!(get_long_running_koids().contains(&koid));

            // After triggering, it still contains it (we don't reset).
            assert!(get_long_running_koids().contains(&koid));
        }

        // Guard dropped: this thread is no longer reported.
        assert!(!get_long_running_koids().contains(&koid));
    }

    #[test]
    fn test_guard() {
        let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();

        {
            let _guard = ThreadLockupDetector::track();
            assert!(get_long_running_koids().contains(&koid));
        }

        // Guard dropped: this thread is no longer reported.
        assert!(!get_long_running_koids().contains(&koid));
    }

    #[test]
    fn test_waiting_guard() {
        let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();

        let _guard = ThreadLockupDetector::track();

        {
            let _waiting_guard = ThreadLockupDetector::pause_tracking();
            // Operation stopped during wait.
            assert!(!get_long_running_koids().contains(&koid));
        }

        // Waiting guard dropped, operation restarted.
        assert!(get_long_running_koids().contains(&koid));
    }

    #[test]
    fn test_track_future() {
        let (koid_tx, koid_rx) = std::sync::mpsc::channel();
        let (signal_tx, signal_rx) = futures::channel::oneshot::channel::<()>();

        let t = std::thread::spawn(move || {
            let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();
            koid_tx.send(koid).unwrap();

            let fut = ThreadLockupDetector::track_future(async move {
                signal_rx.await.unwrap();
            });

            fuchsia_async::LocalExecutor::default().run_singlethreaded(fut);

            koid
        });

        let spawned_koid = koid_rx.recv().unwrap();

        // Wait a bit to ensure it entered the future and is waiting.
        std::thread::sleep(std::time::Duration::from_millis(100));

        // Check that spawned_koid is NOT in long running koids.
        assert!(!get_long_running_koids().contains(&spawned_koid));

        // Now signal to unblock it.
        signal_tx.send(()).unwrap();

        t.join().unwrap();
    }

    #[test]
    fn test_track_future_polling() {
        let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();

        // Before polling, should not be found.
        assert!(!get_long_running_koids().contains(&koid));

        let fut = ThreadLockupDetector::track_future(async {
            assert!(get_long_running_koids().contains(&koid));
        });

        fuchsia_async::LocalExecutor::default().run_singlethreaded(fut);

        // After polling, should not be found.
        assert!(!get_long_running_koids().contains(&koid));
    }

    #[test]
    fn test_track_channel() {
        let (koid_tx, koid_rx) = std::sync::mpsc::channel();
        let (tx, rx) = ThreadLockupDetector::tracked_channel();

        let t = std::thread::spawn(move || {
            let koid = fuchsia_runtime::with_thread_self(|thread| thread.koid()).unwrap();
            koid_tx.send(koid).unwrap();

            let _guard = ThreadLockupDetector::track();

            // This will block.
            rx.recv().unwrap();

            koid
        });

        let spawned_koid = koid_rx.recv().unwrap();

        // Wait a bit to ensure it entered rx.recv()
        std::thread::sleep(std::time::Duration::from_millis(100));

        // Check that spawned_koid is NOT in long running koids.
        assert!(!get_long_running_koids().contains(&spawned_koid));

        // Now send data to unblock it.
        tx.send(()).unwrap();

        t.join().unwrap();
    }
}