starnix_core/task/
kernel_threads.rs1use crate::execution::create_kernel_thread;
6use crate::task::dynamic_thread_spawner::DynamicThreadSpawner;
7use crate::task::{CurrentTask, DelayedReleaser, Kernel, Task, ThreadGroup};
8use fragile::Fragile;
9use fuchsia_async as fasync;
10use pin_project::pin_project;
11use scopeguard::ScopeGuard;
12use starnix_sync::{Locked, Unlocked};
13use starnix_task_command::TaskCommand;
14use starnix_types::ownership::WeakRef;
15use starnix_uapi::errors::Errno;
16use starnix_uapi::{errno, error};
17use std::cell::{RefCell, RefMut};
18use std::future::Future;
19use std::ops::DerefMut;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::sync::{Arc, OnceLock, Weak};
23use std::task::{Context, Poll};
24
25pub struct KernelThreads {
29 pub starnix_process: zx::Process,
32
33 ehandle: fasync::EHandle,
38
39 spawner: OnceLock<DynamicThreadSpawner>,
41
42 system_task: OnceLock<SystemTask>,
44
45 unlocked_for_async: UnlockedForAsync,
47
48 kernel: Weak<Kernel>,
50}
51
52impl KernelThreads {
53 pub fn new(kernel: Weak<Kernel>) -> Self {
60 KernelThreads {
61 starnix_process: fuchsia_runtime::process_self()
62 .duplicate(zx::Rights::SAME_RIGHTS)
63 .expect("Failed to duplicate process self"),
64 ehandle: fasync::EHandle::local(),
65 spawner: Default::default(),
66 system_task: Default::default(),
67 unlocked_for_async: UnlockedForAsync::new(),
68 kernel,
69 }
70 }
71
72 pub fn init(&self, system_task: CurrentTask) -> Result<(), Errno> {
76 self.system_task.set(SystemTask::new(system_task)).map_err(|_| errno!(EEXIST))?;
77 self.spawner
78 .set(DynamicThreadSpawner::new(2, self.system_task().weak_task(), "kthreadd/init"))
79 .map_err(|_| errno!(EEXIST))?;
80 Ok(())
81 }
82
83 pub fn spawn_future(&self, future: impl AsyncFnOnce() -> () + Send + 'static) {
90 self.ehandle.spawn_detached(WrappedMainFuture::new(self.kernel.clone(), async move {
91 fasync::Task::local(future()).await
92 }));
93 }
94
95 pub fn spawner(&self) -> &DynamicThreadSpawner {
99 self.spawner.get().as_ref().unwrap()
100 }
101
102 pub fn system_task(&self) -> &CurrentTask {
106 self.system_task.get().expect("KernelThreads::init must be called").system_task.get()
107 }
108
109 pub fn unlocked_for_async(&self) -> RefMut<'_, Locked<Unlocked>> {
114 self.unlocked_for_async.unlocked.get().borrow_mut()
115 }
116
117 pub fn system_thread_group(&self) -> Arc<ThreadGroup> {
122 self.system_task
123 .get()
124 .expect("KernelThreads::init must be called")
125 .system_thread_group
126 .upgrade()
127 .expect("System task must be still alive")
128 }
129}
130
131impl Drop for KernelThreads {
132 fn drop(&mut self) {
133 #[allow(
136 clippy::undocumented_unsafe_blocks,
137 reason = "Force documented unsafe blocks in Starnix"
138 )]
139 let locked = unsafe { Unlocked::new() };
140 if let Some(system_task) = self.system_task.take() {
141 system_task.system_task.into_inner().release(locked);
142 }
143 }
144}
145
146pub fn with_new_current_task<F, R>(
149 locked: &mut Locked<Unlocked>,
150 system_task: &WeakRef<Task>,
151 task_name: String,
152 f: F,
153) -> Result<R, Errno>
154where
155 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R,
156{
157 let current_task = {
158 let Some(system_task) = system_task.upgrade() else {
159 return error!(ESRCH);
160 };
161 create_kernel_thread(locked, &system_task, TaskCommand::new(task_name.as_bytes())).unwrap()
162 };
163 let result = f(locked, ¤t_task);
164 current_task.release(locked);
165
166 DelayedReleaser::finalize();
168
169 Ok(result)
170}
171
172#[derive(Clone, Debug)]
173pub struct LockedAndTask<'a>(
174 Rc<Fragile<(RefCell<&'a mut Locked<Unlocked>>, RefCell<&'a CurrentTask>)>>,
175);
176
177impl<'a> LockedAndTask<'a> {
178 pub(crate) fn new(locked: &'a mut Locked<Unlocked>, current_task: &'a CurrentTask) -> Self {
179 Self(Rc::new(Fragile::new((RefCell::new(locked), RefCell::new(current_task)))))
180 }
181
182 pub fn unlocked(&self) -> impl DerefMut<Target = &'a mut Locked<Unlocked>> + '_ {
183 self.0.get().0.borrow_mut()
184 }
185
186 pub fn current_task(&self) -> &'a CurrentTask {
187 *self.0.get().1.borrow()
188 }
189}
190
191struct SystemTask {
192 system_task: Fragile<CurrentTask>,
195
196 system_thread_group: Weak<ThreadGroup>,
198}
199
200struct UnlockedForAsync {
201 unlocked: Fragile<RefCell<Locked<Unlocked>>>,
202}
203
204impl UnlockedForAsync {
205 fn new() -> Self {
206 #[allow(
207 clippy::undocumented_unsafe_blocks,
208 reason = "Force documented unsafe blocks in Starnix"
209 )]
210 Self { unlocked: Fragile::new(RefCell::new(unsafe { Unlocked::new_instance() })) }
211 }
212}
213
214impl SystemTask {
215 fn new(system_task: CurrentTask) -> Self {
216 let system_thread_group = Arc::downgrade(&system_task.thread_group());
217 Self { system_task: system_task.into(), system_thread_group }
218 }
219}
220
221#[pin_project]
224pub(crate) struct WrappedFuture<F, C: Clone>(#[pin] F, fn(C), ScopeGuard<C, fn(C)>);
225
226impl<F, C: Clone> WrappedFuture<F, C> {
227 pub(crate) fn new_with_cleaner(context: C, cleaner: fn(C), fut: F) -> Self {
228 Self(fut, cleaner, ScopeGuard::with_strategy(context, cleaner))
230 }
231}
232
233impl<F: Future, C: Clone> Future for WrappedFuture<F, C> {
234 type Output = F::Output;
235
236 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
237 let this = self.project();
238 let result = this.0.poll(cx);
239
240 this.1(this.2.clone());
241 result
242 }
243}
244
245type WrappedMainFuture<F> = WrappedFuture<F, Weak<Kernel>>;
246
247impl<F> WrappedMainFuture<F> {
248 fn new(kernel: Weak<Kernel>, fut: F) -> Self {
249 Self::new_with_cleaner(kernel, trigger_delayed_releaser, fut)
250 }
251}
252
253fn trigger_delayed_releaser(kernel: Weak<Kernel>) {
254 if let Some(kernel) = kernel.upgrade() {
255 kernel
256 .kthreads
257 .system_task()
258 .trigger_delayed_releaser(kernel.kthreads.unlocked_for_async().deref_mut());
259 }
260}