starnix_core/task/
kernel_threads.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use crate::execution::create_kernel_thread;
6use crate::task::dynamic_thread_spawner::DynamicThreadSpawner;
7use crate::task::{CurrentTask, DelayedReleaser, Kernel, Task, ThreadGroup};
8use fragile::Fragile;
9use fuchsia_async as fasync;
10use pin_project::pin_project;
11use scopeguard::ScopeGuard;
12use starnix_sync::{Locked, Unlocked};
13use starnix_task_command::TaskCommand;
14use starnix_types::ownership::WeakRef;
15use starnix_uapi::errors::Errno;
16use starnix_uapi::{errno, error};
17use std::cell::{RefCell, RefMut};
18use std::future::Future;
19use std::ops::DerefMut;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::sync::{Arc, OnceLock, Weak};
23use std::task::{Context, Poll};
24
25/// The threads that the kernel runs internally.
26///
27/// These threads run in the main starnix process and outlive any specific userspace process.
28pub struct KernelThreads {
29    /// The main starnix process. This process is used to create new processes when using the
30    /// restricted executor.
31    pub starnix_process: zx::Process,
32
33    /// A handle to the async executor running in `starnix_process`.
34    ///
35    /// You can spawn tasks on this executor using `spawn_future`. However, those task must not
36    /// block. If you need to block, you can spawn a worker thread using `spawner`.
37    ehandle: fasync::EHandle,
38
39    /// The thread pool to spawn blocking calls to.
40    spawner: OnceLock<DynamicThreadSpawner>,
41
42    /// Information about the main system task that is bound to the kernel main thread.
43    system_task: OnceLock<SystemTask>,
44
45    /// A `RefCell` containing an `Unlocked` state for the lock ordering purposes.
46    unlocked_for_async: UnlockedForAsync,
47
48    /// A weak reference to the kernel owning this struct.
49    kernel: Weak<Kernel>,
50}
51
52impl KernelThreads {
53    /// Create a KernelThreads object for the given Kernel.
54    ///
55    /// Must be called in the initial Starnix process on a thread with an async executor. This
56    /// function captures the async executor for this thread for use with spawned futures.
57    ///
58    /// Used during kernel boot.
59    pub fn new(kernel: Weak<Kernel>) -> Self {
60        KernelThreads {
61            starnix_process: fuchsia_runtime::process_self()
62                .duplicate(zx::Rights::SAME_RIGHTS)
63                .expect("Failed to duplicate process self"),
64            ehandle: fasync::EHandle::local(),
65            spawner: Default::default(),
66            system_task: Default::default(),
67            unlocked_for_async: UnlockedForAsync::new(),
68            kernel,
69        }
70    }
71
72    /// Initialize this object with the system task that will be used for spawned threads.
73    ///
74    /// This function must be called before this object is used to spawn threads.
75    pub fn init(&self, system_task: CurrentTask) -> Result<(), Errno> {
76        self.system_task.set(SystemTask::new(system_task)).map_err(|_| errno!(EEXIST))?;
77        self.spawner
78            .set(DynamicThreadSpawner::new(2, self.system_task().weak_task(), "kthreadd/init"))
79            .map_err(|_| errno!(EEXIST))?;
80        Ok(())
81    }
82
83    /// Spawn an async task in the main async executor to await the given future.
84    ///
85    /// Use this function to run async tasks in the background. These tasks cannot block or else
86    /// they will starve the main async executor.
87    ///
88    /// Prefer this function to `spawn` for non-blocking work.
89    pub fn spawn_future(&self, future: impl AsyncFnOnce() -> () + Send + 'static) {
90        self.ehandle.spawn_detached(WrappedMainFuture::new(self.kernel.clone(), async move {
91            fasync::Task::local(future()).await
92        }));
93    }
94
95    /// The dynamic thread spawner used to spawn threads.
96    ///
97    /// To spawn a thread in this thread pool, use `spawn()`.
98    pub fn spawner(&self) -> &DynamicThreadSpawner {
99        self.spawner.get().as_ref().unwrap()
100    }
101
102    /// Access the `CurrentTask` for the kernel main thread.
103    ///
104    /// This function can only be called from the kernel main thread itself.
105    pub fn system_task(&self) -> &CurrentTask {
106        self.system_task.get().expect("KernelThreads::init must be called").system_task.get()
107    }
108
109    /// Access the `Unlocked` state.
110    ///
111    /// This function is intended for limited use in async contexts and can only be called from the
112    /// kernel main thread.
113    pub fn unlocked_for_async(&self) -> RefMut<'_, Locked<Unlocked>> {
114        self.unlocked_for_async.unlocked.get().borrow_mut()
115    }
116
117    /// Access the `ThreadGroup` for the system tasks.
118    ///
119    /// This function can be safely called from anywhere as soon as `KernelThreads::init` has been
120    /// called.
121    pub fn system_thread_group(&self) -> Arc<ThreadGroup> {
122        self.system_task
123            .get()
124            .expect("KernelThreads::init must be called")
125            .system_thread_group
126            .upgrade()
127            .expect("System task must be still alive")
128    }
129}
130
131impl Drop for KernelThreads {
132    fn drop(&mut self) {
133        // TODO: Replace with .release. Creating a new lock context here is not
134        // actually safe, since locks may be held elsewhere on this thread.
135        #[allow(
136            clippy::undocumented_unsafe_blocks,
137            reason = "Force documented unsafe blocks in Starnix"
138        )]
139        let locked = unsafe { Unlocked::new() };
140        if let Some(system_task) = self.system_task.take() {
141            system_task.system_task.into_inner().release(locked);
142        }
143    }
144}
145
146/// Create a new system task, register it on the thread and run the given closure with it.
147
148pub fn with_new_current_task<F, R>(
149    locked: &mut Locked<Unlocked>,
150    system_task: &WeakRef<Task>,
151    task_name: String,
152    f: F,
153) -> Result<R, Errno>
154where
155    F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R,
156{
157    let current_task = {
158        let Some(system_task) = system_task.upgrade() else {
159            return error!(ESRCH);
160        };
161        create_kernel_thread(locked, &system_task, TaskCommand::new(task_name.as_bytes())).unwrap()
162    };
163    let result = f(locked, &current_task);
164    current_task.release(locked);
165
166    // Ensure that no releasables are registered after this point as we unwind the stack.
167    DelayedReleaser::finalize();
168
169    Ok(result)
170}
171
172#[derive(Clone, Debug)]
173pub struct LockedAndTask<'a>(
174    Rc<Fragile<(RefCell<&'a mut Locked<Unlocked>>, RefCell<&'a CurrentTask>)>>,
175);
176
177impl<'a> LockedAndTask<'a> {
178    pub(crate) fn new(locked: &'a mut Locked<Unlocked>, current_task: &'a CurrentTask) -> Self {
179        Self(Rc::new(Fragile::new((RefCell::new(locked), RefCell::new(current_task)))))
180    }
181
182    pub fn unlocked(&self) -> impl DerefMut<Target = &'a mut Locked<Unlocked>> + '_ {
183        self.0.get().0.borrow_mut()
184    }
185
186    pub fn current_task(&self) -> &'a CurrentTask {
187        *self.0.get().1.borrow()
188    }
189}
190
191struct SystemTask {
192    /// The system task is bound to the kernel main thread. `Fragile` ensures a runtime crash if it
193    /// is accessed from any other thread.
194    system_task: Fragile<CurrentTask>,
195
196    /// The system `ThreadGroup` is accessible from everywhere.
197    system_thread_group: Weak<ThreadGroup>,
198}
199
200struct UnlockedForAsync {
201    unlocked: Fragile<RefCell<Locked<Unlocked>>>,
202}
203
204impl UnlockedForAsync {
205    fn new() -> Self {
206        #[allow(
207            clippy::undocumented_unsafe_blocks,
208            reason = "Force documented unsafe blocks in Starnix"
209        )]
210        Self { unlocked: Fragile::new(RefCell::new(unsafe { Unlocked::new_instance() })) }
211    }
212}
213
214impl SystemTask {
215    fn new(system_task: CurrentTask) -> Self {
216        let system_thread_group = Arc::downgrade(&system_task.thread_group());
217        Self { system_task: system_task.into(), system_thread_group }
218    }
219}
220
221// The order is important here. Rust will drop fields in declaration order and we want
222// the future to be dropped before the ScopeGuard runs.
223#[pin_project]
224pub(crate) struct WrappedFuture<F, C: Clone>(#[pin] F, fn(C), ScopeGuard<C, fn(C)>);
225
226impl<F, C: Clone> WrappedFuture<F, C> {
227    pub(crate) fn new_with_cleaner(context: C, cleaner: fn(C), fut: F) -> Self {
228        // We need the ScopeGuard in case the future queues releasers when dropped.
229        Self(fut, cleaner, ScopeGuard::with_strategy(context, cleaner))
230    }
231}
232
233impl<F: Future, C: Clone> Future for WrappedFuture<F, C> {
234    type Output = F::Output;
235
236    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
237        let this = self.project();
238        let result = this.0.poll(cx);
239
240        this.1(this.2.clone());
241        result
242    }
243}
244
245type WrappedMainFuture<F> = WrappedFuture<F, Weak<Kernel>>;
246
247impl<F> WrappedMainFuture<F> {
248    fn new(kernel: Weak<Kernel>, fut: F) -> Self {
249        Self::new_with_cleaner(kernel, trigger_delayed_releaser, fut)
250    }
251}
252
253fn trigger_delayed_releaser(kernel: Weak<Kernel>) {
254    if let Some(kernel) = kernel.upgrade() {
255        kernel
256            .kthreads
257            .system_task()
258            .trigger_delayed_releaser(kernel.kthreads.unlocked_for_async().deref_mut());
259    }
260}