1use crate::execution::create_kernel_thread;
11use crate::task::{
12 CurrentTask, DelayedReleaser, LockedAndTask, Task, WrappedFuture, with_new_current_task,
13};
14use fuchsia_sync::Mutex;
15use futures::TryFutureExt;
16use futures::channel::oneshot;
17use starnix_logging::{CATEGORY_STARNIX, log_debug, log_error, trace_duration};
18use starnix_sync::{Locked, Unlocked};
19use starnix_task_command::TaskCommand;
20use starnix_types::ownership::{WeakRef, release_after};
21use starnix_uapi::errno;
22use starnix_uapi::errors::Errno;
23use std::future::Future;
24use std::sync::Arc;
25use std::sync::mpsc::{SendError, SyncSender, TrySendError, sync_channel};
26use std::thread::JoinHandle;
27
28type BoxedClosure = Box<dyn FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> () + Send + 'static>;
29
/// Scheduler role a worker thread is switched back to once a closure that requested a custom
/// role has finished (see `maybe_apply_role`).
const DEFAULT_THREAD_ROLE: &str = "fuchsia.starnix.fair.16";
31
32pub struct SpawnRequestBuilder<C: ClosureKind> {
60 debug_name: &'static str,
61 role: Option<&'static str>,
62 closure_kind: C,
63}
64
65impl SpawnRequestBuilder<ClosureNone> {
67 pub fn new() -> Self {
69 Self { role: None, closure_kind: ClosureNone {}, debug_name: "kthreadd" }
70 }
71}
72
73impl<C: ClosureKind> SpawnRequestBuilder<C> {
75 pub fn with_role(self, role: &'static str) -> Self {
77 Self { role: Some(role), ..self }
78 }
79
80 pub fn with_debug_name(self, debug_name: &'static str) -> Self {
82 Self { debug_name, ..self }
83 }
84}
85
86impl SpawnRequestBuilder<ClosureNone> {
88 pub fn with_sync_closure<F, T>(
90 self,
91 f: F,
92 ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
93 where
94 T: Send + 'static,
95 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
96 {
97 let SpawnRequestBuilder { role, closure_kind: _, debug_name } = self;
98 SpawnRequestBuilder { role, closure_kind: f, debug_name }
99 }
100
101 pub fn with_async_closure<F, T>(
103 self,
104 f: F,
105 ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
106 where
107 T: Send + 'static,
108 F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
109 {
110 let sync_fn = async_to_sync(f, self.debug_name);
111 self.with_sync_closure(sync_fn)
112 }
113}
114
115pub struct SpawnRequest {
117 closure: BoxedClosure,
119 debug_name: &'static str,
121}
122
123impl<T, F> SpawnRequestBuilder<F>
124where
125 T: Send + 'static,
126 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
127{
128 pub fn build(self) -> SpawnRequest {
130 let Self { role, closure_kind, debug_name } = self;
131 let closure = closure_kind;
132 let closure = maybe_apply_role(role, closure);
133 let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
134 trace_duration!(CATEGORY_STARNIX, debug_name);
135 let _ = closure(locked, current_task);
136 });
137 SpawnRequest { closure, debug_name }
138 }
139
140 pub fn build_with_sync_result(self) -> (impl FnOnce() -> Result<T, Errno>, SpawnRequest) {
151 let Self { role, closure_kind, debug_name } = self;
152 let closure = closure_kind;
153 let (sender, receiver) = sync_channel::<T>(0);
154 let result_fn = move || {
155 receiver.recv().map_err(|err| errno!(EINTR, format!("while receiving: {err:?}")))
156 };
157 let closure = maybe_apply_role(role, closure);
158 let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
159 trace_duration!(CATEGORY_STARNIX, debug_name);
160 let _ = sender.send(closure(locked, current_task));
161 });
162 (result_fn, SpawnRequest { closure, debug_name })
163 }
164
165 pub fn build_with_async_result(self) -> (impl Future<Output = Result<T, Errno>>, SpawnRequest) {
176 let Self { role, closure_kind, debug_name } = self;
177 let closure = closure_kind;
178 let (sender_async, result_fut) = oneshot::channel::<T>();
179 let maybe_with_role = maybe_apply_role(role, closure);
180 let repackaged =
181 Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
182 trace_duration!(CATEGORY_STARNIX, debug_name);
183 let result = maybe_with_role(locked, current_task);
184 let _ = sender_async.send(result);
185 });
186 let result_fut =
187 result_fut.map_err(|err| errno!(EINTR, format!("while receiving async: {err:?}")));
188 (result_fut, SpawnRequest { closure: repackaged, debug_name })
189 }
190}
191
192#[derive(Debug)]
195pub struct DynamicThreadSpawner {
196 state: Arc<Mutex<DynamicThreadSpawnerState>>,
197 system_task: WeakRef<Task>,
199 persistent_thread: RunningThread,
202}
203
204fn maybe_apply_role<R, F>(
206 role: Option<&'static str>,
207 f: F,
208) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static
209where
210 F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static,
211{
212 move |locked, current_task| {
213 if let Some(role) = role {
214 if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(role) {
215 log_debug!(e:%; "failed to set kthread role");
216 }
217 let result = f(locked, current_task);
218 if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(DEFAULT_THREAD_ROLE) {
219 log_debug!(e:%; "failed to reset kthread role to default priority");
220 }
221 result
222 } else {
223 f(locked, current_task)
224 }
225 }
226}
227
228fn async_to_sync<T, F>(
230 f: F,
231 name: &'static str,
232) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static
233where
234 T: Send + 'static,
235 F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
236{
237 move |locked, current_task| {
238 let mut exec = fuchsia_async::LocalExecutor::default();
239 let locked_and_task = LockedAndTask::new(locked, current_task);
240
241 let locked_and_task_clone = locked_and_task.clone();
242 let wrapped_future =
243 WrappedSpawnedFuture::new(locked_and_task, f(locked_and_task_clone), name);
244 exec.run_singlethreaded(wrapped_future)
245 }
246}
247
/// Marker trait for the closure slot of a [`SpawnRequestBuilder`].
///
/// It is implemented by [`ClosureNone`] (no closure yet) and by every closure type accepted by
/// the builder.
pub trait ClosureKind {}

/// Placeholder used by a [`SpawnRequestBuilder`] that has no closure attached yet.
pub struct ClosureNone {}

impl ClosureKind for ClosureNone {}
259
260impl<T: Send + 'static, FN: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
263 ClosureKind for FN
264{
265}
266
267#[derive(Debug)]
268struct DynamicThreadSpawnerState {
269 threads: Vec<RunningThread>,
270 idle_threads: u8,
271 max_idle_threads: u8,
272}
273
274impl DynamicThreadSpawner {
275 pub fn new(
276 max_idle_threads: u8,
277 system_task: WeakRef<Task>,
278 debug_name: impl Into<String>,
279 ) -> Self {
280 let persistent_thread =
281 RunningThread::new_persistent(system_task.clone(), debug_name.into());
282 Self {
283 state: Arc::new(Mutex::new(DynamicThreadSpawnerState {
284 max_idle_threads,
285 idle_threads: 0,
286 threads: vec![],
287 })),
288 system_task,
289 persistent_thread,
290 }
291 }
292
293 pub fn spawn_from_request(&self, spawn_request: SpawnRequest) {
301 let mut function: BoxedClosure = spawn_request.closure;
303 let mut state = self.state.lock();
304 if state.idle_threads > 0 {
305 let mut i = 0;
306 while i < state.threads.len() {
307 let thread_index = i;
310 i += 1;
311 match state.threads[thread_index].try_dispatch(function) {
312 Ok(_) => {
313 state.idle_threads -= 1;
315 return;
316 }
317 Err(TrySendError::Full(f)) => {
318 function = f;
320 }
321 Err(TrySendError::Disconnected(f)) => {
322 state.idle_threads -= 1;
324 state.threads.remove(thread_index);
325 i -= 1;
326 function = f;
327 }
328 }
329 }
330 }
331
332 let (sender, receiver) = sync_channel::<RunningThread>(0);
334 let dispatch_function: BoxedClosure = Box::new({
335 let state = self.state.clone();
336 let system_task = self.system_task.clone();
337 move |_, _| {
338 sender
339 .send(RunningThread::new(
340 state,
341 system_task,
342 spawn_request.debug_name.to_string(),
343 function,
344 ))
345 .expect("receiver must not be dropped");
346 }
347 });
348 self.persistent_thread
349 .dispatch(dispatch_function)
350 .expect("persistent thread should not have ended.");
351 state.threads.push(receiver.recv().expect("persistent thread should not have ended."));
352 }
353}
354
355type WrappedSpawnedFuture<'a, F> = WrappedFuture<F, LockedAndTask<'a>>;
356
357impl<'a, F: 'a> WrappedSpawnedFuture<'a, F> {
358 fn new(locked_and_task: LockedAndTask<'a>, fut: F, name: &'static str) -> Self {
359 Self::new_with_cleaner(locked_and_task, trigger_delayed_releaser, fut, name)
360 }
361}
362
363fn trigger_delayed_releaser(locked_and_task: LockedAndTask<'_>) {
364 locked_and_task.current_task().trigger_delayed_releaser(&mut locked_and_task.unlocked());
365}
366
367#[derive(Debug)]
368struct RunningThread {
369 thread: Option<JoinHandle<()>>,
370 sender: Option<SyncSender<BoxedClosure>>,
371}
372
373impl RunningThread {
374 fn new(
375 state: Arc<Mutex<DynamicThreadSpawnerState>>,
376 system_task: WeakRef<Task>,
377 debug_task_name: String,
378 f: BoxedClosure,
379 ) -> Self {
380 let (sender, receiver) = sync_channel::<BoxedClosure>(0);
381 let thread = Some(
382 std::thread::Builder::new()
383 .name("kthread-dynamic-worker".to_string())
384 .spawn(move || {
385 #[allow(
387 clippy::undocumented_unsafe_blocks,
388 reason = "Force documented unsafe blocks in Starnix"
389 )]
390 let locked = unsafe { Unlocked::new() };
391 let result = with_new_current_task(
392 locked,
393 &system_task,
394 debug_task_name,
395 |locked, current_task| {
396 while let Ok(f) = receiver.recv() {
397 f(locked, ¤t_task);
398 current_task.trigger_delayed_releaser(locked);
400 let mut state = state.lock();
401 state.idle_threads += 1;
402 if state.idle_threads > state.max_idle_threads {
403 return;
408 }
409 }
410 },
411 );
412 if let Err(e) = result {
413 log_error!("Unable to create a kernel thread: {e:?}");
414 }
415 })
416 .expect("able to create threads"),
417 );
418 let result = Self { thread, sender: Some(sender) };
419 result
422 .sender
423 .as_ref()
424 .expect("sender should never be None")
425 .send(f)
426 .expect("Dispatch cannot fail");
427 result
428 }
429
430 fn new_persistent(system_task: WeakRef<Task>, task_name: String) -> Self {
431 let (sender, receiver) = sync_channel::<BoxedClosure>(20);
433 let thread = Some(
434 std::thread::Builder::new()
435 .name("kthread-persistent-worker".to_string())
436 .spawn(move || {
437 #[allow(
439 clippy::undocumented_unsafe_blocks,
440 reason = "Force documented unsafe blocks in Starnix"
441 )]
442 let locked = unsafe { Unlocked::new() };
443 let current_task = {
444 let Some(system_task) = system_task.upgrade() else {
445 return;
446 };
447 match create_kernel_thread(
448 locked,
449 &system_task,
450 TaskCommand::new(task_name.as_bytes()),
451 ) {
452 Ok(task) => task,
453 Err(e) => {
454 log_error!("Unable to create a kernel thread: {e:?}");
455 return;
456 }
457 }
458 };
459 release_after!(current_task, locked, {
460 while let Ok(f) = receiver.recv() {
461 f(locked, ¤t_task);
462
463 current_task.trigger_delayed_releaser(locked);
465 }
466 });
467
468 DelayedReleaser::finalize();
470 })
471 .expect("able to create threads"),
472 );
473 Self { thread, sender: Some(sender) }
474 }
475
476 fn try_dispatch(&self, f: BoxedClosure) -> Result<(), TrySendError<BoxedClosure>> {
477 self.sender.as_ref().expect("sender should never be None").try_send(f)
478 }
479
480 fn dispatch(&self, f: BoxedClosure) -> Result<(), SendError<BoxedClosure>> {
481 self.sender.as_ref().expect("sender should never be None").send(f)
482 }
483}
484
485impl Drop for RunningThread {
486 fn drop(&mut self) {
487 self.sender = None;
488 match self.thread.take() {
489 Some(thread) => thread.join().expect("Thread should join."),
490 _ => panic!("Thread should never be None"),
491 };
492 }
493}
494
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::spawn_kernel_and_run;

    /// A single no-op request can be dispatched.
    #[fuchsia::test]
    async fn run_simple_task() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let noop = |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
            let request = SpawnRequestBuilder::new().with_sync_closure(noop).build();
            spawner.spawn_from_request(request);
        })
        .await;
    }

    /// Ten consecutive no-op requests can be dispatched.
    #[fuchsia::test]
    async fn run_10_tasks() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            for _ in 0..10 {
                let noop = |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
                let request = SpawnRequestBuilder::new().with_sync_closure(noop).build();
                spawner.spawn_from_request(request);
            }
        })
        .await;
    }

    /// Ten closures blocked on a condition variable must not stop an eleventh closure, the one
    /// that releases them, from running.
    #[fuchsia::test]
    async fn blocking_task_do_not_prevent_further_processing() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(1, current_task.weak_task(), "kthreadd");

            let gate = Arc::new((fuchsia_sync::Mutex::new(false), fuchsia_sync::Condvar::new()));
            for _ in 0..10 {
                let waiter_gate = Arc::clone(&gate);
                let waiter = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
                    let (flag, condvar) = &*waiter_gate;
                    let mut released = flag.lock();
                    while !*released {
                        condvar.wait(&mut released);
                    }
                };
                let request = SpawnRequestBuilder::new().with_sync_closure(waiter).build();
                spawner.spawn_from_request(request);
            }

            let opener = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
                let (flag, condvar) = &*gate;
                let mut released = flag.lock();
                *released = true;
                condvar.notify_all();
            };

            let (wait_for_opener, request) =
                SpawnRequestBuilder::new().with_sync_closure(opener).build_with_sync_result();
            spawner.spawn_from_request(request);

            assert_eq!(wait_for_opener(), Ok(()));
        })
        .await;
    }

    /// The blocking result function returns the closure's value.
    #[fuchsia::test]
    async fn run_spawn_and_get_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");

            let (wait_for_value, request) =
                SpawnRequestBuilder::new().with_sync_closure(|_, _| 3).build_with_sync_result();
            spawner.spawn_from_request(request);
            assert_eq!(wait_for_value(), Ok(3));
        })
        .await;
    }

    /// A sync closure can drive a wrapped future on its own local executor.
    #[fuchsia::test]
    async fn test_spawn_async() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");

            let run_future = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
                let mut executor = fuchsia_async::LocalExecutor::default();
                let context = LockedAndTask::new(locked, current_task);
                let empty = async {};
                let wrapped = WrappedSpawnedFuture::new(context, empty, "test-async");
                executor.run_singlethreaded(wrapped);
            };
            let request = SpawnRequestBuilder::new().with_sync_closure(run_future).build();
            spawner.spawn_from_request(request);
        })
        .await;
    }

    /// An async closure's output is available through the blocking result function.
    #[fuchsia::test]
    async fn test_spawn_async_closure() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let answer = async |_: LockedAndTask<'_>| 42;
            let (wait_for_answer, request) =
                SpawnRequestBuilder::new().with_async_closure(answer).build_with_sync_result();
            spawner.spawn_from_request(request);
            assert_eq!(wait_for_answer(), Ok(42));
        })
        .await;
    }

    /// An async closure may block on the sync result of another request, even when the
    /// consumer is spawned before the producer.
    #[fuchsia::test]
    async fn test_spawn_sync_to_async_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let producer = async |_: LockedAndTask<'_>| 42;
            let (produced, producer_request) =
                SpawnRequestBuilder::new().with_async_closure(producer).build_with_sync_result();

            let consumer = async move |_: LockedAndTask<'_>| produced().unwrap();
            let (consumed, consumer_request) =
                SpawnRequestBuilder::new().with_async_closure(consumer).build_with_sync_result();
            spawner.spawn_from_request(consumer_request);
            spawner.spawn_from_request(producer_request);
            assert_eq!(consumed(), Ok(42));
        })
        .await;
    }

    /// An async closure may await the async result of another request, even when the consumer
    /// is spawned before the producer.
    #[fuchsia::test]
    async fn test_spawn_async_to_async_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let producer = async |_: LockedAndTask<'_>| 42;
            let (produced_fut, producer_request) =
                SpawnRequestBuilder::new().with_async_closure(producer).build_with_async_result();

            let consumer = async move |_: LockedAndTask<'_>| produced_fut.await.unwrap();
            let (consumed, consumer_request) =
                SpawnRequestBuilder::new().with_async_closure(consumer).build_with_sync_result();
            spawner.spawn_from_request(consumer_request);
            spawner.spawn_from_request(producer_request);
            assert_eq!(consumed(), Ok(42));
        })
        .await;
    }
}