starnix_core/task/
dynamic_thread_spawner.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The API for spawning dynamic kernel threads.
6//!
//! To run a closure on a kernel thread, see [SpawnRequestBuilder], which documents how to
//! configure and start tasks that run closures.
9
10use crate::execution::create_kernel_thread;
11use crate::task::{
12    CurrentTask, DelayedReleaser, LockedAndTask, Task, WrappedFuture, with_new_current_task,
13};
14use fuchsia_sync::Mutex;
15use futures::TryFutureExt;
16use futures::channel::oneshot;
17use starnix_logging::{log_debug, log_error};
18use starnix_sync::{Locked, Unlocked};
19use starnix_task_command::TaskCommand;
20use starnix_types::ownership::{WeakRef, release_after};
21use starnix_uapi::errno;
22use starnix_uapi::errors::Errno;
23use std::future::Future;
24use std::sync::Arc;
25use std::sync::mpsc::{SendError, SyncSender, TrySendError, sync_channel};
26use std::thread::JoinHandle;
27
28type BoxedClosure = Box<dyn FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> () + Send + 'static>;
29
30const DEFAULT_THREAD_ROLE: &str = "fuchsia.starnix.fair.16";
31
32/// A builder for configuring new tasks to spawn.
33///
34/// The builder allows us to set up several different flavors of possible
35/// tasks to spawn, with a menu of options as follows:
36///
37/// - A task may or may not have a name.
38/// - A task may or may not have an assigned scheduler role.
39/// - A task may be a sync, or an async closure.
40/// - A task may return a result, or may not return a result.
41/// - The task's result can be collected synchronously, asynchronously, or not collected at all.
42///
43/// Note that these parameters are not perfectly orthogonal. For example, a task spawned from an
44/// async closure can not return a value asynchronously. (This is not a limitation of the approach,
45/// rather it's the API that we explicitly use today.) Also, some parameter combinations do not
46/// make sense, for example a spawn request can not have both a sync and an async closure to run.
47///
48/// The builder API is designed in a way that only allows chaining the configuration options which
49/// are valid at that point in the configuration process. Invalid option combinations are
50/// compile-time(!) errors. It will only allow creating a [SpawnRequest] if enough parameters have
51/// been passed such that there is enough information to create a request. It will not allow
52/// passing conflicting parameters: for example, if you already passed one synchronous closure, it
53/// is impossible to pass another closure and have the code compile without errors.
54///
55/// ## Usage
56///
57/// Call [SpawnRequestBuilder::new()] to start building. Refer to the unit tests in this module for
58/// usage examples.
59pub struct SpawnRequestBuilder<C: ClosureKind> {
60    debug_name: String,
61    role: Option<&'static str>,
62    closure_kind: C,
63}
64
65/// You can only create an empty request builder.
66impl SpawnRequestBuilder<ClosureNone> {
67    /// Creates a new spawn request builder.
68    pub fn new() -> Self {
69        Self { role: None, closure_kind: ClosureNone {}, debug_name: "kthreadd".into() }
70    }
71}
72
73/// You can call these at any point in the builder's lifecycle.
74impl<C: ClosureKind> SpawnRequestBuilder<C> {
75    /// Set a role to apply to the thread that will run your closure.
76    pub fn with_role(self, role: &'static str) -> Self {
77        Self { role: Some(role), ..self }
78    }
79
80    /// Set a task name to apply to the thread that will run your closure.
81    pub fn with_debug_name(self, debug_name: &'static str) -> Self {
82        Self { debug_name: debug_name.into(), ..self }
83    }
84}
85
86/// You can call these only if you have not provided a closure yet.
87impl SpawnRequestBuilder<ClosureNone> {
88    /// Provides the closure that the spawner will run.
89    pub fn with_sync_closure<F, T>(
90        self,
91        f: F,
92    ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
93    where
94        T: Send + 'static,
95        F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
96    {
97        let SpawnRequestBuilder { role, closure_kind: _, debug_name } = self;
98        SpawnRequestBuilder { role, closure_kind: f, debug_name }
99    }
100
101    /// Provides the closure that the spawner will run.
102    pub fn with_async_closure<F, T>(
103        self,
104        f: F,
105    ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
106    where
107        T: Send + 'static,
108        F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
109    {
110        let sync_fn = async_to_sync(f);
111        self.with_sync_closure(sync_fn)
112    }
113}
114
115/// A fully configured spawn request.
116pub struct SpawnRequest {
117    /// The closure to run.
118    closure: BoxedClosure,
119    /// A name to give to the task.
120    debug_name: String,
121}
122
123impl<T, F> SpawnRequestBuilder<F>
124where
125    T: Send + 'static,
126    F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
127{
128    /// Build a spawn request.
129    pub fn build(self) -> SpawnRequest {
130        let Self { role, closure_kind, debug_name } = self;
131        let closure = closure_kind;
132        let closure = maybe_apply_role(role, closure);
133        let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
134            let _ = closure(locked, current_task);
135        });
136        SpawnRequest { closure, debug_name }
137    }
138
139    /// Like [build], but allows receiving a result synchronously.
140    /// Do not forget to submit the spawn request to a spawner.
141    ///
142    /// Example:
143    ///
144    /// ```
145    /// let (result_fn, request) = /*...*/ .build_with_sync_result();
146    /// // spawn `request`
147    /// let result = result_fn();
148    /// ```
149    pub fn build_with_sync_result(self) -> (impl FnOnce() -> Result<T, Errno>, SpawnRequest) {
150        let Self { role, closure_kind, debug_name } = self;
151        let closure = closure_kind;
152        let (sender, receiver) = sync_channel::<T>(0);
153        let result_fn = move || {
154            receiver.recv().map_err(|err| errno!(EINTR, format!("while receiving: {err:?}")))
155        };
156        let closure = maybe_apply_role(role, closure);
157        let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
158            let _ = sender.send(closure(locked, current_task));
159        });
160        (result_fn, SpawnRequest { closure, debug_name })
161    }
162
163    /// Like [build], but allows receiving a result as a future.
164    /// Do not forget to submit the spawn request to a spawner.
165    ///
166    /// Example:
167    ///
168    /// ```
169    /// let (result_fut, request) = /*...*/ .build_with_async_result();
170    /// // spawn `request`
171    /// let result = result_fut.await;
172    /// ```
173    pub fn build_with_async_result(self) -> (impl Future<Output = Result<T, Errno>>, SpawnRequest) {
174        let Self { role, closure_kind, debug_name } = self;
175        let closure = closure_kind;
176        let (sender_async, result_fut) = oneshot::channel::<T>();
177        let maybe_with_role = maybe_apply_role(role, closure);
178        let repackaged =
179            Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
180                let result = maybe_with_role(locked, current_task);
181                let _ = sender_async.send(result);
182            });
183        let result_fut =
184            result_fut.map_err(|err| errno!(EINTR, format!("while receiving async: {err:?}")));
185        (result_fut, SpawnRequest { closure: repackaged, debug_name })
186    }
187}
188
189/// A thread pool that immediately execute any new work sent to it and keep a maximum number of
190/// idle threads.
191#[derive(Debug)]
192pub struct DynamicThreadSpawner {
193    state: Arc<Mutex<DynamicThreadSpawnerState>>,
194    /// The weak system task to create the kernel thread associated with each thread.
195    system_task: WeakRef<Task>,
196    /// A persistent thread that is used to create new thread. This ensures that threads are
197    /// created from the initial starnix process and are not tied to a specific task.
198    persistent_thread: RunningThread,
199}
200
201/// Wrap a closure with a thread role assignment, if one is available.
202fn maybe_apply_role<R, F>(
203    role: Option<&'static str>,
204    f: F,
205) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static
206where
207    F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static,
208{
209    move |locked, current_task| {
210        if let Some(role) = role {
211            if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(role) {
212                log_debug!(e:%; "failed to set kthread role");
213            }
214            let result = f(locked, current_task);
215            if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(DEFAULT_THREAD_ROLE) {
216                log_debug!(e:%; "failed to reset kthread role to default priority");
217            }
218            result
219        } else {
220            f(locked, current_task)
221        }
222    }
223}
224
225/// Convert async closure to sync closure that can be submitted to the spawner.
226fn async_to_sync<T, F>(
227    f: F,
228) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static
229where
230    T: Send + 'static,
231    F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
232{
233    move |locked, current_task| {
234        let mut exec = fuchsia_async::LocalExecutor::default();
235        let locked_and_task = LockedAndTask::new(locked, current_task);
236
237        let locked_and_task_clone = locked_and_task.clone();
238        let wrapped_future = WrappedSpawnedFuture::new(locked_and_task, f(locked_and_task_clone));
239        exec.run_singlethreaded(wrapped_future)
240    }
241}
242
243/// Denotes whether a closure has been provided. A request can not be
244/// built at all without a closure.
245///
246/// See [SpawnRequestBuilder] for usage details.
247pub trait ClosureKind {}
248
249/// A builder type state where no closure has been provided yet.
250///
251/// See [SpawnRequestBuilder] for usage details.
252pub struct ClosureNone {}
253impl ClosureKind for ClosureNone {}
254
255/// A builder type state where a closure has been provided.
256/// See [SpawnRequestBuilder] for usage details.
257impl<T: Send + 'static, FN: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
258    ClosureKind for FN
259{
260}
261
262#[derive(Debug)]
263struct DynamicThreadSpawnerState {
264    threads: Vec<RunningThread>,
265    idle_threads: u8,
266    max_idle_threads: u8,
267}
268
269impl DynamicThreadSpawner {
270    pub fn new(
271        max_idle_threads: u8,
272        system_task: WeakRef<Task>,
273        debug_name: impl Into<String>,
274    ) -> Self {
275        let persistent_thread =
276            RunningThread::new_persistent(system_task.clone(), debug_name.into());
277        Self {
278            state: Arc::new(Mutex::new(DynamicThreadSpawnerState {
279                max_idle_threads,
280                idle_threads: 0,
281                threads: vec![],
282            })),
283            system_task,
284            persistent_thread,
285        }
286    }
287
288    /// Run a given closure on a thread based on the provided [SpawnRequest].
289    ///
290    /// Use [SpawnRequestBuilder::new()] to start configuring a [SpawnRequest].
291    ///
292    /// This method will use an idle thread in the pool if one is available, otherwise it will
293    /// start a new thread. When this method returns, it is guaranteed that a thread is
294    /// responsible to start running the closure.
295    pub fn spawn_from_request(&self, spawn_request: SpawnRequest) {
296        // Check whether a thread already exists to handle the request.
297        let mut function: BoxedClosure = spawn_request.closure;
298        let mut state = self.state.lock();
299        if state.idle_threads > 0 {
300            let mut i = 0;
301            while i < state.threads.len() {
302                // Increases `i` immediately, so that it can be decreased it the thread must be
303                // dropped.
304                let thread_index = i;
305                i += 1;
306                match state.threads[thread_index].try_dispatch(function) {
307                    Ok(_) => {
308                        // The dispatch succeeded.
309                        state.idle_threads -= 1;
310                        return;
311                    }
312                    Err(TrySendError::Full(f)) => {
313                        // The thread is busy.
314                        function = f;
315                    }
316                    Err(TrySendError::Disconnected(f)) => {
317                        // The receiver is disconnected, it means the thread has terminated, drop it.
318                        state.idle_threads -= 1;
319                        state.threads.remove(thread_index);
320                        i -= 1;
321                        function = f;
322                    }
323                }
324            }
325        }
326
327        // A new thread must be created. It needs to be done from the persistent thread.
328        let (sender, receiver) = sync_channel::<RunningThread>(0);
329        let dispatch_function: BoxedClosure = Box::new({
330            let state = self.state.clone();
331            let system_task = self.system_task.clone();
332            move |_, _| {
333                sender
334                    .send(RunningThread::new(
335                        state,
336                        system_task,
337                        spawn_request.debug_name,
338                        function,
339                    ))
340                    .expect("receiver must not be dropped");
341            }
342        });
343        self.persistent_thread
344            .dispatch(dispatch_function)
345            .expect("persistent thread should not have ended.");
346        state.threads.push(receiver.recv().expect("persistent thread should not have ended."));
347    }
348}
349
350type WrappedSpawnedFuture<'a, F> = WrappedFuture<F, LockedAndTask<'a>>;
351
352impl<'a, F: 'a> WrappedSpawnedFuture<'a, F> {
353    fn new(locked_and_task: LockedAndTask<'a>, fut: F) -> Self {
354        Self::new_with_cleaner(locked_and_task, trigger_delayed_releaser, fut)
355    }
356}
357
358fn trigger_delayed_releaser(locked_and_task: LockedAndTask<'_>) {
359    locked_and_task.current_task().trigger_delayed_releaser(&mut locked_and_task.unlocked());
360}
361
362#[derive(Debug)]
363struct RunningThread {
364    thread: Option<JoinHandle<()>>,
365    sender: Option<SyncSender<BoxedClosure>>,
366}
367
impl RunningThread {
    /// Spawns a dynamic worker thread and hands it `f` as its first closure.
    ///
    /// The worker sets up its current task via `with_new_current_task` (from `system_task`,
    /// named `debug_task_name`) and then receives closures over a zero-capacity channel. After
    /// each closure, it runs the task's delayed releasers and counts itself as idle in `state`.
    /// If that pushes the idle count above `max_idle_threads`, the worker stops receiving and
    /// its thread finishes.
    ///
    /// Blocks until the worker has taken `f` from the channel.
    fn new(
        state: Arc<Mutex<DynamicThreadSpawnerState>>,
        system_task: WeakRef<Task>,
        debug_task_name: String,
        f: BoxedClosure,
    ) -> Self {
        // Zero capacity: `try_dispatch` only succeeds while the worker is blocked in `recv`.
        let (sender, receiver) = sync_channel::<BoxedClosure>(0);
        let thread = Some(
            std::thread::Builder::new()
                .name("kthread-dynamic-worker".to_string())
                .spawn(move || {
                    // It's ok to create a new lock context here, since we are on a new thread.
                    #[allow(
                        clippy::undocumented_unsafe_blocks,
                        reason = "Force documented unsafe blocks in Starnix"
                    )]
                    let locked = unsafe { Unlocked::new() };
                    let result = with_new_current_task(
                        locked,
                        &system_task,
                        debug_task_name,
                        |locked, current_task| {
                            while let Ok(f) = receiver.recv() {
                                f(locked, &current_task);
                                // Apply any delayed releasers.
                                current_task.trigger_delayed_releaser(locked);
                                let mut state = state.lock();
                                state.idle_threads += 1;
                                if state.idle_threads > state.max_idle_threads {
                                    // If the number of idle threads is greater than the max, the
                                    // thread terminates. This disconnects the receiver, which
                                    // ensures that the thread will be joined and removed from the
                                    // list of available threads the next time the pool tries to
                                    // use it. Note that it stays counted in `idle_threads` until
                                    // then.
                                    return;
                                }
                            }
                        },
                    );
                    if let Err(e) = result {
                        log_error!("Unable to create a kernel thread: {e:?}");
                    }
                })
                .expect("able to create threads"),
        );
        let result = Self { thread, sender: Some(sender) };
        // The dispatch cannot fail because the thread can only finish after having executed at
        // least one task, and this is the first task ever dispatched to it.
        // NOTE(review): this assumes `with_new_current_task` always invokes the closure. If it
        // returns `Err` instead (the "Unable to create a kernel thread" path above), the
        // receiver is dropped without ever receiving, and this `expect` panics. Confirm that
        // this is intended.
        result
            .sender
            .as_ref()
            .expect("sender should never be None")
            .send(f)
            .expect("Dispatch cannot fail");
        result
    }

    /// Spawns the persistent thread. It runs every closure sent to it, in order, on a single
    /// kernel task created from `system_task` with the command `task_name`.
    ///
    /// It never exits on its own: its receive loop ends only once the sending side is dropped.
    /// If the system task is already gone, or the kernel thread cannot be created, the thread
    /// returns immediately. Later dispatches then fail because the receiver is dropped.
    fn new_persistent(system_task: WeakRef<Task>, task_name: String) -> Self {
        // The persistent thread doesn't need a rendezvous when receiving a task, so up to 20
        // closures can be queued without the dispatcher waiting.
        let (sender, receiver) = sync_channel::<BoxedClosure>(20);
        let thread = Some(
            std::thread::Builder::new()
                .name("kthread-persistent-worker".to_string())
                .spawn(move || {
                    // It's ok to create a new lock context here, since we are on a new thread.
                    #[allow(
                        clippy::undocumented_unsafe_blocks,
                        reason = "Force documented unsafe blocks in Starnix"
                    )]
                    let locked = unsafe { Unlocked::new() };
                    let current_task = {
                        let Some(system_task) = system_task.upgrade() else {
                            return;
                        };
                        match create_kernel_thread(
                            locked,
                            &system_task,
                            TaskCommand::new(task_name.as_bytes()),
                        ) {
                            Ok(task) => task,
                            Err(e) => {
                                log_error!("Unable to create a kernel thread: {e:?}");
                                return;
                            }
                        }
                    };
                    // Presumably releases `current_task` (using `locked`) once the block ends;
                    // see the `release_after!` definition.
                    release_after!(current_task, locked, {
                        while let Ok(f) = receiver.recv() {
                            f(locked, &current_task);

                            // Apply any delayed releasers.
                            current_task.trigger_delayed_releaser(locked);
                        }
                    });

                    // Ensure that no releasables are registered after this point as we unwind the
                    // stack.
                    DelayedReleaser::finalize();
                })
                .expect("able to create threads"),
        );
        Self { thread, sender: Some(sender) }
    }

    /// Hands `f` to the thread without blocking.
    ///
    /// On failure, `f` is returned inside the error: `Full` if the channel cannot accept it
    /// right now, and `Disconnected` if the thread's receiver is gone.
    fn try_dispatch(&self, f: BoxedClosure) -> Result<(), TrySendError<BoxedClosure>> {
        self.sender.as_ref().expect("sender should never be None").try_send(f)
    }

    /// Sends `f` to the thread, blocking until the channel accepts it.
    ///
    /// Fails, returning `f`, only if the thread's receiver is gone.
    fn dispatch(&self, f: BoxedClosure) -> Result<(), SendError<BoxedClosure>> {
        self.sender.as_ref().expect("sender should never be None").send(f)
    }
}
479
480impl Drop for RunningThread {
481    fn drop(&mut self) {
482        self.sender = None;
483        match self.thread.take() {
484            Some(thread) => thread.join().expect("Thread should join."),
485            _ => panic!("Thread should never be None"),
486        };
487    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::spawn_kernel_and_run;

    // A single fire-and-forget sync closure can be spawned.
    #[fuchsia::test]
    async fn run_simple_task() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            // Type decorations are needed sometimes to avoid "closure type is
            // not general enough" error.
            let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
            let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
            spawner.spawn_from_request(req);
        })
        .await;
    }

    // Spawning more requests than `max_idle_threads` in a row works.
    #[fuchsia::test]
    async fn run_10_tasks() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            for _ in 0..10 {
                let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
                let opts = SpawnRequestBuilder::new().with_sync_closure(closure).build();
                spawner.spawn_from_request(opts);
            }
        })
        .await;
    }

    // Ten closures block on a condvar; the eleventh, which releases them, must still get a
    // thread. This shows that busy workers do not stall new requests.
    #[fuchsia::test]
    async fn blocking_task_do_not_prevent_further_processing() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(1, current_task.weak_task(), "kthreadd");

            let pair = Arc::new((fuchsia_sync::Mutex::new(false), fuchsia_sync::Condvar::new()));
            for _ in 0..10 {
                let pair2 = Arc::clone(&pair);
                let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
                    let (lock, cvar) = &*pair2;
                    let mut cont = lock.lock();
                    while !*cont {
                        cvar.wait(&mut cont);
                    }
                };
                let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
                spawner.spawn_from_request(req);
            }

            let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
                let (lock, cvar) = &*pair;
                let mut cont = lock.lock();
                *cont = true;
                cvar.notify_all();
            };

            let (result, req) =
                SpawnRequestBuilder::new().with_sync_closure(closure).build_with_sync_result();
            spawner.spawn_from_request(req);

            assert_eq!(result(), Ok(()));
        })
        .await;
    }

    // A sync closure's return value can be collected synchronously.
    #[fuchsia::test]
    async fn run_spawn_and_get_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");

            let (result, req) =
                SpawnRequestBuilder::new().with_sync_closure(|_, _| 3).build_with_sync_result();
            spawner.spawn_from_request(req);
            assert_eq!(result(), Ok(3));
        })
        .await;
    }

    // A sync closure can drive async work itself with a local executor and
    // `WrappedSpawnedFuture`.
    #[fuchsia::test]
    async fn test_spawn_async() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");

            // The closure free variables must be decorated with their respective types,
            // the rust compiler gets confused otherwise and is unable to infer the correct
            // lifetimes. Interestingly, adding your own lifetimes here does *not* help.
            let closure = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
                let mut exec = fuchsia_async::LocalExecutor::default();
                let locked_and_task = LockedAndTask::new(locked, current_task);
                let fut = async {};
                let wrapped_future = WrappedSpawnedFuture::new(locked_and_task, fut);
                exec.run_singlethreaded(wrapped_future);
            };
            let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
            spawner.spawn_from_request(req);
        })
        .await;
    }

    // An async closure's output can be collected synchronously.
    #[fuchsia::test]
    async fn test_spawn_async_closure() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let fut = async |_: LockedAndTask<'_>| 42;
            let (result, req) =
                SpawnRequestBuilder::new().with_async_closure(fut).build_with_sync_result();
            spawner.spawn_from_request(req);
            assert_eq!(result(), Ok(42));
        })
        .await;
    }

    // An async closure can wait on another task's sync result. The waiting task (`req2`) is
    // spawned before the task it waits on (`req`).
    #[fuchsia::test]
    async fn test_spawn_sync_to_async_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let fut = async |_: LockedAndTask<'_>| 42;
            let (result, req) =
                SpawnRequestBuilder::new().with_async_closure(fut).build_with_sync_result();

            let fut2 = async move |_: LockedAndTask<'_>| result().unwrap();
            let (result2, req2) =
                SpawnRequestBuilder::new().with_async_closure(fut2).build_with_sync_result();
            spawner.spawn_from_request(req2);
            spawner.spawn_from_request(req);
            assert_eq!(result2(), Ok(42));
        })
        .await;
    }

    // An async closure can await another task's async result, spawned after it.
    #[fuchsia::test]
    async fn test_spawn_async_to_async_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let fut = async |_: LockedAndTask<'_>| 42;
            let (result_fut, req) =
                SpawnRequestBuilder::new().with_async_closure(fut).build_with_async_result();

            let fut2 = async move |_: LockedAndTask<'_>| result_fut.await.unwrap();
            let (result2, req2) =
                SpawnRequestBuilder::new().with_async_closure(fut2).build_with_sync_result();
            spawner.spawn_from_request(req2);
            spawner.spawn_from_request(req);
            assert_eq!(result2(), Ok(42));
        })
        .await;
    }
}