1use crate::execution::create_kernel_thread;
11use crate::task::{
12 CurrentTask, DelayedReleaser, LockedAndTask, Task, ThreadLockupDetector, WrappedFuture,
13 with_new_current_task,
14};
15use fuchsia_sync::Mutex;
16use futures::TryFutureExt;
17use futures::channel::oneshot;
18use starnix_logging::{CATEGORY_STARNIX, log_debug, log_error};
19use starnix_sync::{Locked, Unlocked};
20use starnix_task_command::TaskCommand;
21use starnix_types::ownership::release_after;
22use starnix_uapi::errno;
23use starnix_uapi::errors::Errno;
24use std::future::Future;
25use std::sync::mpsc::{SendError, SyncSender, TrySendError, sync_channel};
26use std::sync::{Arc, Weak};
27use std::thread::JoinHandle;
28
29type BoxedClosure = Box<dyn FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> () + Send + 'static>;
30
31const DEFAULT_THREAD_ROLE: &str = "fuchsia.starnix.fair.16";
32
33pub struct SpawnRequestBuilder<C: ClosureKind> {
61 debug_name: &'static str,
62 role: Option<&'static str>,
63 closure_kind: C,
64}
65
66impl SpawnRequestBuilder<ClosureNone> {
68 pub fn new() -> Self {
70 Self { role: None, closure_kind: ClosureNone {}, debug_name: "kthreadd" }
71 }
72}
73
74impl<C: ClosureKind> SpawnRequestBuilder<C> {
76 pub fn with_role(self, role: &'static str) -> Self {
78 Self { role: Some(role), ..self }
79 }
80
81 pub fn with_debug_name(self, debug_name: &'static str) -> Self {
83 Self { debug_name, ..self }
84 }
85}
86
87impl SpawnRequestBuilder<ClosureNone> {
89 pub fn with_sync_closure<F, T>(
91 self,
92 f: F,
93 ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
94 where
95 T: Send + 'static,
96 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
97 {
98 let SpawnRequestBuilder { role, closure_kind: _, debug_name } = self;
99 SpawnRequestBuilder { role, closure_kind: f, debug_name }
100 }
101
102 pub fn with_async_closure<F, T>(
104 self,
105 f: F,
106 ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
107 where
108 T: Send + 'static,
109 F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
110 {
111 let sync_fn = async_to_sync(f, self.debug_name);
112 self.with_sync_closure(sync_fn)
113 }
114}
115
116pub struct SpawnRequest {
118 closure: BoxedClosure,
120 debug_name: &'static str,
122}
123
124impl<T, F> SpawnRequestBuilder<F>
125where
126 T: Send + 'static,
127 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
128{
129 pub fn build(self) -> SpawnRequest {
131 let Self { role, closure_kind, debug_name } = self;
132 let closure = closure_kind;
133 let closure = maybe_apply_role(role, closure);
134 let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
135 fuchsia_trace::duration!(CATEGORY_STARNIX, debug_name);
136 let _ = closure(locked, current_task);
137 });
138 SpawnRequest { closure, debug_name }
139 }
140
141 pub fn build_with_sync_result(self) -> (impl FnOnce() -> Result<T, Errno>, SpawnRequest) {
152 let Self { role, closure_kind, debug_name } = self;
153 let closure = closure_kind;
154 let (sender, receiver) = sync_channel::<T>(0);
155 let result_fn = move || {
156 receiver.recv().map_err(|err| errno!(EINTR, format!("while receiving: {err:?}")))
157 };
158 let closure = maybe_apply_role(role, closure);
159 let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
160 fuchsia_trace::duration!(CATEGORY_STARNIX, debug_name);
161 let _ = sender.send(closure(locked, current_task));
162 });
163 (result_fn, SpawnRequest { closure, debug_name })
164 }
165
166 pub fn build_with_async_result(self) -> (impl Future<Output = Result<T, Errno>>, SpawnRequest) {
177 let Self { role, closure_kind, debug_name } = self;
178 let closure = closure_kind;
179 let (sender_async, result_fut) = oneshot::channel::<T>();
180 let maybe_with_role = maybe_apply_role(role, closure);
181 let repackaged =
182 Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
183 fuchsia_trace::duration!(CATEGORY_STARNIX, debug_name);
184 let result = maybe_with_role(locked, current_task);
185 let _ = sender_async.send(result);
186 });
187 let result_fut =
188 result_fut.map_err(|err| errno!(EINTR, format!("while receiving async: {err:?}")));
189 (result_fut, SpawnRequest { closure: repackaged, debug_name })
190 }
191}
192
193#[derive(Debug)]
196pub struct DynamicThreadSpawner {
197 state: Arc<Mutex<DynamicThreadSpawnerState>>,
198 system_task: Weak<Task>,
200 persistent_thread: RunningThread,
203}
204
205fn maybe_apply_role<R, F>(
207 role: Option<&'static str>,
208 f: F,
209) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static
210where
211 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static,
212{
213 move |locked, current_task| {
214 if let Some(role) = role {
215 if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(role) {
216 log_debug!(e:%; "failed to set kthread role");
217 }
218 let result = f(locked, current_task);
219 if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(DEFAULT_THREAD_ROLE) {
220 log_debug!(e:%; "failed to reset kthread role to default priority");
221 }
222 result
223 } else {
224 f(locked, current_task)
225 }
226 }
227}
228
229fn async_to_sync<T, F>(
231 f: F,
232 name: &'static str,
233) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static
234where
235 T: Send + 'static,
236 F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
237{
238 move |locked, current_task| {
239 let mut exec = fuchsia_async::LocalExecutor::default();
240 let locked_and_task = LockedAndTask::new(locked, current_task);
241
242 let locked_and_task_clone = locked_and_task.clone();
243 let wrapped_future = WrappedSpawnedFuture::new(
244 locked_and_task,
245 ThreadLockupDetector::track_future(f(locked_and_task_clone)),
246 name,
247 );
248 let _waiting_guard = ThreadLockupDetector::pause_tracking();
249 exec.run_singlethreaded(wrapped_future)
250 }
251}
252
253pub trait ClosureKind {}
258
259pub struct ClosureNone {}
263impl ClosureKind for ClosureNone {}
264
265impl<T: Send + 'static, FN: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
268 ClosureKind for FN
269{
270}
271
272#[derive(Debug)]
273struct DynamicThreadSpawnerState {
274 threads: Vec<RunningThread>,
275 idle_threads: u8,
276 max_idle_threads: u8,
277}
278
279impl DynamicThreadSpawner {
280 pub fn new(
281 max_idle_threads: u8,
282 system_task: Weak<Task>,
283 debug_name: impl Into<String>,
284 ) -> Self {
285 let persistent_thread =
286 RunningThread::new_persistent(system_task.clone(), debug_name.into());
287 Self {
288 state: Arc::new(Mutex::new(DynamicThreadSpawnerState {
289 max_idle_threads,
290 idle_threads: 0,
291 threads: vec![],
292 })),
293 system_task,
294 persistent_thread,
295 }
296 }
297
298 pub fn spawn_from_request(&self, spawn_request: SpawnRequest) {
306 let mut function: BoxedClosure = spawn_request.closure;
308 let mut state = self.state.lock();
309 if state.idle_threads > 0 {
310 let mut i = 0;
311 while i < state.threads.len() {
312 let thread_index = i;
315 i += 1;
316 match state.threads[thread_index].try_dispatch(function) {
317 Ok(_) => {
318 state.idle_threads -= 1;
320 return;
321 }
322 Err(TrySendError::Full(f)) => {
323 function = f;
325 }
326 Err(TrySendError::Disconnected(f)) => {
327 state.idle_threads -= 1;
329 state.threads.remove(thread_index);
330 i -= 1;
331 function = f;
332 }
333 }
334 }
335 }
336
337 let (sender, receiver) = sync_channel::<RunningThread>(0);
339 let dispatch_function: BoxedClosure = Box::new({
340 let state = self.state.clone();
341 let system_task = self.system_task.clone();
342 move |_, _| {
343 sender
344 .send(RunningThread::new(
345 state,
346 system_task,
347 spawn_request.debug_name.to_string(),
348 function,
349 ))
350 .expect("receiver must not be dropped");
351 }
352 });
353 self.persistent_thread
354 .dispatch(dispatch_function)
355 .expect("persistent thread should not have ended.");
356 state.threads.push(receiver.recv().expect("persistent thread should not have ended."));
357 }
358}
359
360type WrappedSpawnedFuture<'a, F> = WrappedFuture<F, LockedAndTask<'a>>;
361
362impl<'a, F: 'a> WrappedSpawnedFuture<'a, F> {
363 fn new(locked_and_task: LockedAndTask<'a>, fut: F, name: &'static str) -> Self {
364 Self::new_with_cleaner(locked_and_task, trigger_delayed_releaser, fut, name)
365 }
366}
367
368fn trigger_delayed_releaser(locked_and_task: LockedAndTask<'_>) {
369 locked_and_task.current_task().trigger_delayed_releaser(&mut locked_and_task.unlocked());
370}
371
372#[derive(Debug)]
373struct RunningThread {
374 thread: Option<JoinHandle<()>>,
375 sender: Option<SyncSender<BoxedClosure>>,
376}
377
378impl RunningThread {
379 fn new(
380 state: Arc<Mutex<DynamicThreadSpawnerState>>,
381 system_task: Weak<Task>,
382 debug_task_name: String,
383 f: BoxedClosure,
384 ) -> Self {
385 let (sender, receiver) = sync_channel::<BoxedClosure>(0);
386 let thread = Some(
387 std::thread::Builder::new()
388 .name("kthread-dynamic-worker".to_string())
389 .spawn(move || {
390 #[allow(
392 clippy::undocumented_unsafe_blocks,
393 reason = "Force documented unsafe blocks in Starnix"
394 )]
395 let locked = unsafe { Unlocked::new() };
396 let result = with_new_current_task(
397 locked,
398 &system_task,
399 debug_task_name,
400 |locked, current_task| {
401 while let Ok(f) = receiver.recv() {
402 let _guard = ThreadLockupDetector::track();
403 f(locked, ¤t_task);
404 current_task.trigger_delayed_releaser(locked);
406 let mut state = state.lock();
407 state.idle_threads += 1;
408 if state.idle_threads > state.max_idle_threads {
409 return;
414 }
415 }
416 },
417 );
418 if let Err(e) = result {
419 log_error!("Unable to create a kernel thread: {e:?}");
420 }
421 })
422 .expect("able to create threads"),
423 );
424 let result = Self { thread, sender: Some(sender) };
425 result
428 .sender
429 .as_ref()
430 .expect("sender should never be None")
431 .send(f)
432 .expect("Dispatch cannot fail");
433 result
434 }
435
436 fn new_persistent(system_task: Weak<Task>, task_name: String) -> Self {
437 let (sender, receiver) = sync_channel::<BoxedClosure>(20);
439 let thread = Some(
440 std::thread::Builder::new()
441 .name("kthread-persistent-worker".to_string())
442 .spawn(move || {
443 #[allow(
445 clippy::undocumented_unsafe_blocks,
446 reason = "Force documented unsafe blocks in Starnix"
447 )]
448 let locked = unsafe { Unlocked::new() };
449 let current_task = {
450 let Some(system_task) = system_task.upgrade() else {
451 return;
452 };
453 match create_kernel_thread(
454 locked,
455 &system_task,
456 TaskCommand::new(task_name.as_bytes()),
457 ) {
458 Ok(task) => task,
459 Err(e) => {
460 log_error!("Unable to create a kernel thread: {e:?}");
461 return;
462 }
463 }
464 };
465 release_after!(current_task, locked, {
466 while let Ok(f) = receiver.recv() {
467 let _guard = ThreadLockupDetector::track();
468 f(locked, ¤t_task);
469
470 current_task.trigger_delayed_releaser(locked);
472 }
473 });
474
475 DelayedReleaser::finalize();
477 })
478 .expect("able to create threads"),
479 );
480 Self { thread, sender: Some(sender) }
481 }
482
483 fn try_dispatch(&self, f: BoxedClosure) -> Result<(), TrySendError<BoxedClosure>> {
484 self.sender.as_ref().expect("sender should never be None").try_send(f)
485 }
486
487 fn dispatch(&self, f: BoxedClosure) -> Result<(), SendError<BoxedClosure>> {
488 self.sender.as_ref().expect("sender should never be None").send(f)
489 }
490}
491
492impl Drop for RunningThread {
493 fn drop(&mut self) {
494 self.sender = None;
495 match self.thread.take() {
496 Some(thread) => thread.join().expect("Thread should join."),
497 _ => panic!("Thread should never be None"),
498 };
499 }
500}
501
502#[cfg(test)]
503mod tests {
504 use super::*;
505 use crate::testing::spawn_kernel_and_run;
506
507 #[fuchsia::test]
508 async fn run_simple_task() {
509 spawn_kernel_and_run(async |_, current_task| {
510 let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
511 let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
514 let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
515 spawner.spawn_from_request(req);
516 })
517 .await;
518 }
519
520 #[fuchsia::test]
521 async fn run_10_tasks() {
522 spawn_kernel_and_run(async |_, current_task| {
523 let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
524 for _ in 0..10 {
525 let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
526 let opts = SpawnRequestBuilder::new().with_sync_closure(closure).build();
527 spawner.spawn_from_request(opts);
528 }
529 })
530 .await;
531 }
532
533 #[fuchsia::test]
534 async fn blocking_task_do_not_prevent_further_processing() {
535 spawn_kernel_and_run(async |_, current_task| {
536 let spawner = DynamicThreadSpawner::new(1, current_task.weak_task(), "kthreadd");
537
538 let pair = Arc::new((fuchsia_sync::Mutex::new(false), fuchsia_sync::Condvar::new()));
539 for _ in 0..10 {
540 let pair2 = Arc::clone(&pair);
541 let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
542 let (lock, cvar) = &*pair2;
543 let mut cont = lock.lock();
544 while !*cont {
545 cvar.wait(&mut cont);
546 }
547 };
548 let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
549 spawner.spawn_from_request(req);
550 }
551
552 let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
553 let (lock, cvar) = &*pair;
554 let mut cont = lock.lock();
555 *cont = true;
556 cvar.notify_all();
557 };
558
559 let (result, req) =
560 SpawnRequestBuilder::new().with_sync_closure(closure).build_with_sync_result();
561 spawner.spawn_from_request(req);
562
563 assert_eq!(result(), Ok(()));
564 })
565 .await;
566 }
567
568 #[fuchsia::test]
569 async fn run_spawn_and_get_result() {
570 spawn_kernel_and_run(async |_, current_task| {
571 let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
572
573 let (result, req) =
574 SpawnRequestBuilder::new().with_sync_closure(|_, _| 3).build_with_sync_result();
575 spawner.spawn_from_request(req);
576 assert_eq!(result(), Ok(3));
577 })
578 .await;
579 }
580
581 #[fuchsia::test]
582 async fn test_spawn_async() {
583 spawn_kernel_and_run(async |_, current_task| {
584 let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
585
586 let closure = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
590 let mut exec = fuchsia_async::LocalExecutor::default();
591 let locked_and_task = LockedAndTask::new(locked, current_task);
592 let fut = async {};
593 let wrapped_future = WrappedSpawnedFuture::new(locked_and_task, fut, "test-async");
594 exec.run_singlethreaded(wrapped_future);
595 };
596 let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
597 spawner.spawn_from_request(req);
598 })
599 .await;
600 }
601
602 #[fuchsia::test]
603 async fn test_spawn_async_closure() {
604 spawn_kernel_and_run(async |_, current_task| {
605 let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
606 let fut = async |_: LockedAndTask<'_>| 42;
607 let (result, req) =
608 SpawnRequestBuilder::new().with_async_closure(fut).build_with_sync_result();
609 spawner.spawn_from_request(req);
610 assert_eq!(result(), Ok(42));
611 })
612 .await;
613 }
614
615 #[fuchsia::test]
616 async fn test_spawn_sync_to_async_result() {
617 spawn_kernel_and_run(async |_, current_task| {
618 let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
619 let fut = async |_: LockedAndTask<'_>| 42;
620 let (result, req) =
621 SpawnRequestBuilder::new().with_async_closure(fut).build_with_sync_result();
622
623 let fut2 = async move |_: LockedAndTask<'_>| result().unwrap();
624 let (result2, req2) =
625 SpawnRequestBuilder::new().with_async_closure(fut2).build_with_sync_result();
626 spawner.spawn_from_request(req2);
627 spawner.spawn_from_request(req);
628 assert_eq!(result2(), Ok(42));
629 })
630 .await;
631 }
632
633 #[fuchsia::test]
634 async fn test_spawn_async_to_async_result() {
635 spawn_kernel_and_run(async |_, current_task| {
636 let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
637 let fut = async |_: LockedAndTask<'_>| 42;
638 let (result_fut, req) =
639 SpawnRequestBuilder::new().with_async_closure(fut).build_with_async_result();
640
641 let fut2 = async move |_: LockedAndTask<'_>| result_fut.await.unwrap();
642 let (result2, req2) =
643 SpawnRequestBuilder::new().with_async_closure(fut2).build_with_sync_result();
644 spawner.spawn_from_request(req2);
645 spawner.spawn_from_request(req);
646 assert_eq!(result2(), Ok(42));
647 })
648 .await;
649 }
650}