// starnix_core/task/kernel_threads.rs
use crate::execution::create_kernel_thread;
6use crate::task::dynamic_thread_spawner::DynamicThreadSpawner;
7use crate::task::{CurrentTask, DelayedReleaser, Kernel, Task, ThreadGroup};
8use fragile::Fragile;
9use fuchsia_async as fasync;
10use pin_project::pin_project;
11use scopeguard::ScopeGuard;
12use starnix_sync::{Locked, Unlocked};
13use starnix_task_command::TaskCommand;
14use starnix_types::ownership::WeakRef;
15use starnix_uapi::errors::Errno;
16use starnix_uapi::{errno, error};
17use std::cell::{RefCell, RefMut};
18use std::future::Future;
19use std::ops::DerefMut;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::sync::{Arc, OnceLock, Weak};
23use std::task::{Context, Poll};
24
25pub struct KernelThreads {
29 pub starnix_process: zx::Process,
32
33 ehandle: fasync::EHandle,
38
39 spawner: OnceLock<DynamicThreadSpawner>,
41
42 system_task: OnceLock<SystemTask>,
44
45 unlocked_for_async: UnlockedForAsync,
47
48 kernel: Weak<Kernel>,
50}
51
52impl KernelThreads {
53 pub fn new(kernel: Weak<Kernel>) -> Self {
60 KernelThreads {
61 starnix_process: fuchsia_runtime::process_self()
62 .duplicate(zx::Rights::SAME_RIGHTS)
63 .expect("Failed to duplicate process self"),
64 ehandle: fasync::EHandle::local(),
65 spawner: Default::default(),
66 system_task: Default::default(),
67 unlocked_for_async: UnlockedForAsync::new(),
68 kernel,
69 }
70 }
71
72 pub fn init(&self, system_task: CurrentTask) -> Result<(), Errno> {
76 self.system_task.set(SystemTask::new(system_task)).map_err(|_| errno!(EEXIST))?;
77 self.spawner
78 .set(DynamicThreadSpawner::new(2, self.system_task().weak_task(), "kthreadd/init"))
79 .map_err(|_| errno!(EEXIST))?;
80 Ok(())
81 }
82
83 pub fn spawn_future(
90 &self,
91 future: impl AsyncFnOnce() -> () + Send + 'static,
92 name: &'static str,
93 ) {
94 self.ehandle.spawn_detached(WrappedMainFuture::new(
95 self.kernel.clone(),
96 async move { fasync::Task::local(future()).await },
97 name,
98 ));
99 }
100
101 pub fn spawner(&self) -> &DynamicThreadSpawner {
105 self.spawner.get().as_ref().unwrap()
106 }
107
108 pub fn system_task(&self) -> &CurrentTask {
112 self.system_task.get().expect("KernelThreads::init must be called").system_task.get()
113 }
114
115 pub fn unlocked_for_async(&self) -> RefMut<'_, Locked<Unlocked>> {
120 self.unlocked_for_async.unlocked.get().borrow_mut()
121 }
122
123 pub fn system_thread_group(&self) -> Arc<ThreadGroup> {
128 self.system_task
129 .get()
130 .expect("KernelThreads::init must be called")
131 .system_thread_group
132 .upgrade()
133 .expect("System task must be still alive")
134 }
135}
136
137impl Drop for KernelThreads {
138 fn drop(&mut self) {
139 #[allow(
142 clippy::undocumented_unsafe_blocks,
143 reason = "Force documented unsafe blocks in Starnix"
144 )]
145 let locked = unsafe { Unlocked::new() };
146 if let Some(system_task) = self.system_task.take() {
147 system_task.system_task.into_inner().release(locked);
148 }
149 }
150}
151
152pub fn with_new_current_task<F, R>(
155 locked: &mut Locked<Unlocked>,
156 system_task: &WeakRef<Task>,
157 task_name: String,
158 f: F,
159) -> Result<R, Errno>
160where
161 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R,
162{
163 let current_task = {
164 let Some(system_task) = system_task.upgrade() else {
165 return error!(ESRCH);
166 };
167 create_kernel_thread(locked, &system_task, TaskCommand::new(task_name.as_bytes())).unwrap()
168 };
169 let result = f(locked, ¤t_task);
170 current_task.release(locked);
171
172 DelayedReleaser::finalize();
174
175 Ok(result)
176}
177
178#[derive(Clone, Debug)]
179pub struct LockedAndTask<'a>(
180 Rc<Fragile<(RefCell<&'a mut Locked<Unlocked>>, RefCell<&'a CurrentTask>)>>,
181);
182
183impl<'a> LockedAndTask<'a> {
184 pub(crate) fn new(locked: &'a mut Locked<Unlocked>, current_task: &'a CurrentTask) -> Self {
185 Self(Rc::new(Fragile::new((RefCell::new(locked), RefCell::new(current_task)))))
186 }
187
188 pub fn unlocked(&self) -> impl DerefMut<Target = &'a mut Locked<Unlocked>> + '_ {
189 self.0.get().0.borrow_mut()
190 }
191
192 pub fn current_task(&self) -> &'a CurrentTask {
193 *self.0.get().1.borrow()
194 }
195}
196
197struct SystemTask {
198 system_task: Fragile<CurrentTask>,
201
202 system_thread_group: Weak<ThreadGroup>,
204}
205
206struct UnlockedForAsync {
207 unlocked: Fragile<RefCell<Locked<Unlocked>>>,
208}
209
210impl UnlockedForAsync {
211 fn new() -> Self {
212 #[allow(
213 clippy::undocumented_unsafe_blocks,
214 reason = "Force documented unsafe blocks in Starnix"
215 )]
216 Self { unlocked: Fragile::new(RefCell::new(unsafe { Unlocked::new_instance() })) }
217 }
218}
219
220impl SystemTask {
221 fn new(system_task: CurrentTask) -> Self {
222 let system_thread_group = Arc::downgrade(&system_task.thread_group());
223 Self { system_task: system_task.into(), system_thread_group }
224 }
225}
226
227#[pin_project]
230pub(crate) struct WrappedFuture<F, C: Clone> {
231 #[pin]
232 fut: F,
233 cleaner: fn(C),
234 context: ScopeGuard<C, fn(C)>,
235 name: &'static str,
236}
237
238impl<F, C: Clone> WrappedFuture<F, C> {
239 pub(crate) fn new_with_cleaner(context: C, cleaner: fn(C), fut: F, name: &'static str) -> Self {
240 Self { fut, cleaner, context: ScopeGuard::with_strategy(context, cleaner), name }
242 }
243}
244
245impl<F: Future, C: Clone> Future for WrappedFuture<F, C> {
246 type Output = F::Output;
247
248 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
249 let this = self.project();
250 starnix_logging::trace_duration!(starnix_logging::CATEGORY_STARNIX, &*this.name);
251 let result = this.fut.poll(cx);
252
253 (this.cleaner)(this.context.clone());
254 result
255 }
256}
257
258type WrappedMainFuture<F> = WrappedFuture<F, Weak<Kernel>>;
259
260impl<F> WrappedMainFuture<F> {
261 fn new(kernel: Weak<Kernel>, fut: F, name: &'static str) -> Self {
262 Self::new_with_cleaner(kernel, trigger_delayed_releaser, fut, name)
263 }
264}
265
266fn trigger_delayed_releaser(kernel: Weak<Kernel>) {
267 if let Some(kernel) = kernel.upgrade() {
268 if let Some(system_task) = kernel.kthreads.system_task.get() {
269 system_task
270 .system_task
271 .get()
272 .trigger_delayed_releaser(kernel.kthreads.unlocked_for_async().deref_mut());
273 }
274 }
275}