1use crate::execution::create_kernel_thread;
11use crate::task::{
12 CurrentTask, DelayedReleaser, LockedAndTask, Task, WrappedFuture, with_new_current_task,
13};
14use fuchsia_sync::Mutex;
15use futures::TryFutureExt;
16use futures::channel::oneshot;
17use starnix_logging::{log_debug, log_error};
18use starnix_sync::{Locked, Unlocked};
19use starnix_task_command::TaskCommand;
20use starnix_types::ownership::{WeakRef, release_after};
21use starnix_uapi::errno;
22use starnix_uapi::errors::Errno;
23use std::future::Future;
24use std::sync::Arc;
25use std::sync::mpsc::{SendError, SyncSender, TrySendError, sync_channel};
26use std::thread::JoinHandle;
27
28type BoxedClosure = Box<dyn FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> () + Send + 'static>;
29
/// Scheduler role that `maybe_apply_role` switches a worker thread back to once a closure that
/// requested a custom role has finished running.
const DEFAULT_THREAD_ROLE: &str = "fuchsia.starnix.fair.16";
31
/// Builder for a `SpawnRequest`.
///
/// The type parameter records whether a closure has been attached yet: `SpawnRequestBuilder::new`
/// yields a `SpawnRequestBuilder<ClosureNone>`, and the `build*` methods only exist once `C` is a
/// closure type.
pub struct SpawnRequestBuilder<C: ClosureKind> {
    /// Name passed on when a new worker thread has to be created for the request.
    debug_name: String,
    /// Optional scheduler role applied to the worker thread while the closure runs.
    role: Option<&'static str>,
    /// Either the `ClosureNone` placeholder or the closure to run.
    closure_kind: C,
}
64
65impl SpawnRequestBuilder<ClosureNone> {
67 pub fn new() -> Self {
69 Self { role: None, closure_kind: ClosureNone {}, debug_name: "kthreadd".into() }
70 }
71}
72
73impl<C: ClosureKind> SpawnRequestBuilder<C> {
75 pub fn with_role(self, role: &'static str) -> Self {
77 Self { role: Some(role), ..self }
78 }
79
80 pub fn with_debug_name(self, debug_name: &'static str) -> Self {
82 Self { debug_name: debug_name.into(), ..self }
83 }
84}
85
86impl SpawnRequestBuilder<ClosureNone> {
88 pub fn with_sync_closure<F, T>(
90 self,
91 f: F,
92 ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
93 where
94 T: Send + 'static,
95 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
96 {
97 let SpawnRequestBuilder { role, closure_kind: _, debug_name } = self;
98 SpawnRequestBuilder { role, closure_kind: f, debug_name }
99 }
100
101 pub fn with_async_closure<F, T>(
103 self,
104 f: F,
105 ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
106 where
107 T: Send + 'static,
108 F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
109 {
110 let sync_fn = async_to_sync(f);
111 self.with_sync_closure(sync_fn)
112 }
113}
114
/// A ready-to-dispatch unit of work, produced by the `build*` methods of `SpawnRequestBuilder`
/// and consumed by `DynamicThreadSpawner::spawn_from_request`.
pub struct SpawnRequest {
    /// The work itself, already wrapped with role handling and any result forwarding.
    closure: BoxedClosure,
    /// Name handed to `RunningThread::new`; only used when a new worker thread is created.
    debug_name: String,
}
122
123impl<T, F> SpawnRequestBuilder<F>
124where
125 T: Send + 'static,
126 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
127{
128 pub fn build(self) -> SpawnRequest {
130 let Self { role, closure_kind, debug_name } = self;
131 let closure = closure_kind;
132 let closure = maybe_apply_role(role, closure);
133 let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
134 let _ = closure(locked, current_task);
135 });
136 SpawnRequest { closure, debug_name }
137 }
138
139 pub fn build_with_sync_result(self) -> (impl FnOnce() -> Result<T, Errno>, SpawnRequest) {
150 let Self { role, closure_kind, debug_name } = self;
151 let closure = closure_kind;
152 let (sender, receiver) = sync_channel::<T>(0);
153 let result_fn = move || {
154 receiver.recv().map_err(|err| errno!(EINTR, format!("while receiving: {err:?}")))
155 };
156 let closure = maybe_apply_role(role, closure);
157 let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
158 let _ = sender.send(closure(locked, current_task));
159 });
160 (result_fn, SpawnRequest { closure, debug_name })
161 }
162
163 pub fn build_with_async_result(self) -> (impl Future<Output = Result<T, Errno>>, SpawnRequest) {
174 let Self { role, closure_kind, debug_name } = self;
175 let closure = closure_kind;
176 let (sender_async, result_fut) = oneshot::channel::<T>();
177 let maybe_with_role = maybe_apply_role(role, closure);
178 let repackaged =
179 Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
180 let result = maybe_with_role(locked, current_task);
181 let _ = sender_async.send(result);
182 });
183 let result_fut =
184 result_fut.map_err(|err| errno!(EINTR, format!("while receiving async: {err:?}")));
185 (result_fut, SpawnRequest { closure: repackaged, debug_name })
186 }
187}
188
/// Runs `SpawnRequest`s on worker threads that are created on demand.
///
/// `spawn_from_request` first offers a request to the existing worker threads and, when none of
/// them takes it, has the persistent thread create a new worker.
#[derive(Debug)]
pub struct DynamicThreadSpawner {
    /// Worker bookkeeping, shared with every worker thread.
    state: Arc<Mutex<DynamicThreadSpawnerState>>,
    /// Task reference handed to every thread this spawner creates.
    system_task: WeakRef<Task>,
    /// Long-lived thread on which new worker threads are created.
    persistent_thread: RunningThread,
}
200
201fn maybe_apply_role<R, F>(
203 role: Option<&'static str>,
204 f: F,
205) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static
206where
207 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static,
208{
209 move |locked, current_task| {
210 if let Some(role) = role {
211 if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(role) {
212 log_debug!(e:%; "failed to set kthread role");
213 }
214 let result = f(locked, current_task);
215 if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(DEFAULT_THREAD_ROLE) {
216 log_debug!(e:%; "failed to reset kthread role to default priority");
217 }
218 result
219 } else {
220 f(locked, current_task)
221 }
222 }
223}
224
225fn async_to_sync<T, F>(
227 f: F,
228) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static
229where
230 T: Send + 'static,
231 F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
232{
233 move |locked, current_task| {
234 let mut exec = fuchsia_async::LocalExecutor::default();
235 let locked_and_task = LockedAndTask::new(locked, current_task);
236
237 let locked_and_task_clone = locked_and_task.clone();
238 let wrapped_future = WrappedSpawnedFuture::new(locked_and_task, f(locked_and_task_clone));
239 exec.run_singlethreaded(wrapped_future)
240 }
241}
242
/// Marker trait for whatever occupies the closure slot of a `SpawnRequestBuilder`.
pub trait ClosureKind {}

/// Placeholder occupying the closure slot before a closure has been attached.
pub struct ClosureNone {}
impl ClosureKind for ClosureNone {}

/// Any sendable, `'static` closure with the worker signature can occupy the closure slot.
impl<T: Send + 'static, FN: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
    ClosureKind for FN
{
}
261
/// Bookkeeping shared between a `DynamicThreadSpawner` and its worker threads.
#[derive(Debug)]
struct DynamicThreadSpawnerState {
    /// Worker threads created by `spawn_from_request`; entries are removed once their channel is
    /// found to be disconnected.
    threads: Vec<RunningThread>,
    /// Incremented by a worker each time it finishes a closure; decremented by
    /// `spawn_from_request` when a worker accepts a request or is removed.
    idle_threads: u8,
    /// A worker exits when finishing a closure pushes `idle_threads` above this value.
    max_idle_threads: u8,
}
268
269impl DynamicThreadSpawner {
270 pub fn new(
271 max_idle_threads: u8,
272 system_task: WeakRef<Task>,
273 debug_name: impl Into<String>,
274 ) -> Self {
275 let persistent_thread =
276 RunningThread::new_persistent(system_task.clone(), debug_name.into());
277 Self {
278 state: Arc::new(Mutex::new(DynamicThreadSpawnerState {
279 max_idle_threads,
280 idle_threads: 0,
281 threads: vec![],
282 })),
283 system_task,
284 persistent_thread,
285 }
286 }
287
288 pub fn spawn_from_request(&self, spawn_request: SpawnRequest) {
296 let mut function: BoxedClosure = spawn_request.closure;
298 let mut state = self.state.lock();
299 if state.idle_threads > 0 {
300 let mut i = 0;
301 while i < state.threads.len() {
302 let thread_index = i;
305 i += 1;
306 match state.threads[thread_index].try_dispatch(function) {
307 Ok(_) => {
308 state.idle_threads -= 1;
310 return;
311 }
312 Err(TrySendError::Full(f)) => {
313 function = f;
315 }
316 Err(TrySendError::Disconnected(f)) => {
317 state.idle_threads -= 1;
319 state.threads.remove(thread_index);
320 i -= 1;
321 function = f;
322 }
323 }
324 }
325 }
326
327 let (sender, receiver) = sync_channel::<RunningThread>(0);
329 let dispatch_function: BoxedClosure = Box::new({
330 let state = self.state.clone();
331 let system_task = self.system_task.clone();
332 move |_, _| {
333 sender
334 .send(RunningThread::new(
335 state,
336 system_task,
337 spawn_request.debug_name,
338 function,
339 ))
340 .expect("receiver must not be dropped");
341 }
342 });
343 self.persistent_thread
344 .dispatch(dispatch_function)
345 .expect("persistent thread should not have ended.");
346 state.threads.push(receiver.recv().expect("persistent thread should not have ended."));
347 }
348}
349
/// `WrappedFuture` instantiated with a `LockedAndTask` as its second type parameter; this is the
/// future type that `async_to_sync` drives to completion.
type WrappedSpawnedFuture<'a, F> = WrappedFuture<F, LockedAndTask<'a>>;
351
impl<'a, F: 'a> WrappedSpawnedFuture<'a, F> {
    /// Wraps `fut` via `WrappedFuture::new_with_cleaner`, passing `trigger_delayed_releaser` as
    /// the cleaner.
    // NOTE(review): when the cleaner runs is decided by `WrappedFuture`, which is not visible
    // here — confirm against its definition.
    fn new(locked_and_task: LockedAndTask<'a>, fut: F) -> Self {
        Self::new_with_cleaner(locked_and_task, trigger_delayed_releaser, fut)
    }
}
357
358fn trigger_delayed_releaser(locked_and_task: LockedAndTask<'_>) {
359 locked_and_task.current_task().trigger_delayed_releaser(&mut locked_and_task.unlocked());
360}
361
/// Handle to a thread that executes `BoxedClosure`s received over a channel.
///
/// Dropping the handle closes the channel and joins the thread (see the `Drop` impl).
#[derive(Debug)]
struct RunningThread {
    /// Join handle of the thread; `Some` until it is taken in `drop`.
    thread: Option<JoinHandle<()>>,
    /// Sending side of the thread's work channel; `Some` until it is cleared in `drop`.
    sender: Option<SyncSender<BoxedClosure>>,
}
367
368impl RunningThread {
369 fn new(
370 state: Arc<Mutex<DynamicThreadSpawnerState>>,
371 system_task: WeakRef<Task>,
372 debug_task_name: String,
373 f: BoxedClosure,
374 ) -> Self {
375 let (sender, receiver) = sync_channel::<BoxedClosure>(0);
376 let thread = Some(
377 std::thread::Builder::new()
378 .name("kthread-dynamic-worker".to_string())
379 .spawn(move || {
380 #[allow(
382 clippy::undocumented_unsafe_blocks,
383 reason = "Force documented unsafe blocks in Starnix"
384 )]
385 let locked = unsafe { Unlocked::new() };
386 let result = with_new_current_task(
387 locked,
388 &system_task,
389 debug_task_name,
390 |locked, current_task| {
391 while let Ok(f) = receiver.recv() {
392 f(locked, ¤t_task);
393 current_task.trigger_delayed_releaser(locked);
395 let mut state = state.lock();
396 state.idle_threads += 1;
397 if state.idle_threads > state.max_idle_threads {
398 return;
403 }
404 }
405 },
406 );
407 if let Err(e) = result {
408 log_error!("Unable to create a kernel thread: {e:?}");
409 }
410 })
411 .expect("able to create threads"),
412 );
413 let result = Self { thread, sender: Some(sender) };
414 result
417 .sender
418 .as_ref()
419 .expect("sender should never be None")
420 .send(f)
421 .expect("Dispatch cannot fail");
422 result
423 }
424
425 fn new_persistent(system_task: WeakRef<Task>, task_name: String) -> Self {
426 let (sender, receiver) = sync_channel::<BoxedClosure>(20);
428 let thread = Some(
429 std::thread::Builder::new()
430 .name("kthread-persistent-worker".to_string())
431 .spawn(move || {
432 #[allow(
434 clippy::undocumented_unsafe_blocks,
435 reason = "Force documented unsafe blocks in Starnix"
436 )]
437 let locked = unsafe { Unlocked::new() };
438 let current_task = {
439 let Some(system_task) = system_task.upgrade() else {
440 return;
441 };
442 match create_kernel_thread(
443 locked,
444 &system_task,
445 TaskCommand::new(task_name.as_bytes()),
446 ) {
447 Ok(task) => task,
448 Err(e) => {
449 log_error!("Unable to create a kernel thread: {e:?}");
450 return;
451 }
452 }
453 };
454 release_after!(current_task, locked, {
455 while let Ok(f) = receiver.recv() {
456 f(locked, ¤t_task);
457
458 current_task.trigger_delayed_releaser(locked);
460 }
461 });
462
463 DelayedReleaser::finalize();
465 })
466 .expect("able to create threads"),
467 );
468 Self { thread, sender: Some(sender) }
469 }
470
471 fn try_dispatch(&self, f: BoxedClosure) -> Result<(), TrySendError<BoxedClosure>> {
472 self.sender.as_ref().expect("sender should never be None").try_send(f)
473 }
474
475 fn dispatch(&self, f: BoxedClosure) -> Result<(), SendError<BoxedClosure>> {
476 self.sender.as_ref().expect("sender should never be None").send(f)
477 }
478}
479
480impl Drop for RunningThread {
481 fn drop(&mut self) {
482 self.sender = None;
483 match self.thread.take() {
484 Some(thread) => thread.join().expect("Thread should join."),
485 _ => panic!("Thread should never be None"),
486 };
487 }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::spawn_kernel_and_run;

    /// A single fire-and-forget request can be submitted.
    #[fuchsia::test]
    async fn run_simple_task() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let noop = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
            let request = SpawnRequestBuilder::new().with_sync_closure(noop).build();
            spawner.spawn_from_request(request);
        })
        .await;
    }

    /// Ten fire-and-forget requests can be submitted back to back.
    #[fuchsia::test]
    async fn run_10_tasks() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            for _ in 0..10 {
                let noop = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
                let request = SpawnRequestBuilder::new().with_sync_closure(noop).build();
                spawner.spawn_from_request(request);
            }
        })
        .await;
    }

    /// Ten requests block on a condition variable; an eleventh request opens the gate. The test
    /// only completes if the eleventh request gets to run while the others are still blocked.
    #[fuchsia::test]
    async fn blocking_task_do_not_prevent_further_processing() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(1, current_task.weak_task(), "kthreadd");

            let gate = Arc::new((fuchsia_sync::Mutex::new(false), fuchsia_sync::Condvar::new()));
            for _ in 0..10 {
                let gate = Arc::clone(&gate);
                let waiter = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
                    let (flag, signal) = &*gate;
                    let mut is_open = flag.lock();
                    while !*is_open {
                        signal.wait(&mut is_open);
                    }
                };
                let request = SpawnRequestBuilder::new().with_sync_closure(waiter).build();
                spawner.spawn_from_request(request);
            }

            let opener = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
                let (flag, signal) = &*gate;
                let mut is_open = flag.lock();
                *is_open = true;
                signal.notify_all();
            };

            let (wait_for_opener, request) =
                SpawnRequestBuilder::new().with_sync_closure(opener).build_with_sync_result();
            spawner.spawn_from_request(request);

            assert_eq!(wait_for_opener(), Ok(()));
        })
        .await;
    }

    /// The blocking result function returns the closure's value.
    #[fuchsia::test]
    async fn run_spawn_and_get_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");

            let (wait_for_value, request) =
                SpawnRequestBuilder::new().with_sync_closure(|_, _| 3).build_with_sync_result();
            spawner.spawn_from_request(request);
            assert_eq!(wait_for_value(), Ok(3));
        })
        .await;
    }

    /// A synchronous closure can drive a wrapped future on a local executor by hand.
    #[fuchsia::test]
    async fn test_spawn_async() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");

            let run_future = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
                let mut executor = fuchsia_async::LocalExecutor::default();
                let context = LockedAndTask::new(locked, current_task);
                let empty = async {};
                let wrapped = WrappedSpawnedFuture::new(context, empty);
                executor.run_singlethreaded(wrapped);
            };
            let request = SpawnRequestBuilder::new().with_sync_closure(run_future).build();
            spawner.spawn_from_request(request);
        })
        .await;
    }

    /// An async closure's output is delivered through the blocking result function.
    #[fuchsia::test]
    async fn test_spawn_async_closure() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let answer = async |_: LockedAndTask<'_>| 42;
            let (wait_for_answer, request) =
                SpawnRequestBuilder::new().with_async_closure(answer).build_with_sync_result();
            spawner.spawn_from_request(request);
            assert_eq!(wait_for_answer(), Ok(42));
        })
        .await;
    }

    /// A second request blocks on the first request's result function; the second is submitted
    /// first, so the first must be able to run while the second is waiting.
    #[fuchsia::test]
    async fn test_spawn_sync_to_async_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let producer = async |_: LockedAndTask<'_>| 42;
            let (wait_for_producer, producer_request) =
                SpawnRequestBuilder::new().with_async_closure(producer).build_with_sync_result();

            let consumer = async move |_: LockedAndTask<'_>| wait_for_producer().unwrap();
            let (wait_for_consumer, consumer_request) =
                SpawnRequestBuilder::new().with_async_closure(consumer).build_with_sync_result();
            spawner.spawn_from_request(consumer_request);
            spawner.spawn_from_request(producer_request);
            assert_eq!(wait_for_consumer(), Ok(42));
        })
        .await;
    }

    /// A second request awaits the first request's result future; the second is submitted first,
    /// so the first must be able to run while the second is waiting.
    #[fuchsia::test]
    async fn test_spawn_async_to_async_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let producer = async |_: LockedAndTask<'_>| 42;
            let (producer_result, producer_request) =
                SpawnRequestBuilder::new().with_async_closure(producer).build_with_async_result();

            let consumer = async move |_: LockedAndTask<'_>| producer_result.await.unwrap();
            let (wait_for_consumer, consumer_request) =
                SpawnRequestBuilder::new().with_async_closure(consumer).build_with_sync_result();
            spawner.spawn_from_request(consumer_request);
            spawner.spawn_from_request(producer_request);
            assert_eq!(wait_for_consumer(), Ok(42));
        })
        .await;
    }
}