Skip to main content

starnix_core/task/
dynamic_thread_spawner.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The API for spawning dynamic kernel threads.
6//!
7//! If you want to run a closure on a kernel thread, check out [SpawnRequestBuilder] on
8//! how to start and configure tasks that run closures.
9
10use crate::execution::create_kernel_thread;
11use crate::task::{
12    CurrentTask, DelayedReleaser, LockedAndTask, Task, ThreadLockupDetector, WrappedFuture,
13    with_new_current_task,
14};
15use fuchsia_sync::Mutex;
16use futures::TryFutureExt;
17use futures::channel::oneshot;
18use starnix_logging::{CATEGORY_STARNIX, log_debug, log_error};
19use starnix_sync::{Locked, Unlocked};
20use starnix_task_command::TaskCommand;
21use starnix_types::ownership::release_after;
22use starnix_uapi::errno;
23use starnix_uapi::errors::Errno;
24use std::future::Future;
25use std::sync::mpsc::{SendError, SyncSender, TrySendError, sync_channel};
26use std::sync::{Arc, Weak};
27use std::thread::JoinHandle;
28
29type BoxedClosure = Box<dyn FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> () + Send + 'static>;
30
/// The scheduler role that a worker thread is switched back to once a closure that requested a
/// custom role has finished running.
const DEFAULT_THREAD_ROLE: &str = "fuchsia.starnix.fair.16";
32
/// A builder for configuring new tasks to spawn.
///
/// The builder allows us to set up several different flavors of possible
/// tasks to spawn, with a menu of options as follows:
///
/// - A task may or may not have a name.
/// - A task may or may not have an assigned scheduler role.
/// - A task may be a sync, or an async closure.
/// - A task may return a result, or may not return a result.
/// - The task's result can be collected synchronously, asynchronously, or not collected at all.
///
/// Note that these parameters are not perfectly orthogonal. For example, a task spawned from an
/// async closure can not return a value asynchronously. (This is not a limitation of the approach,
/// rather it's the API that we explicitly use today.) Also, some parameter combinations do not
/// make sense, for example a spawn request can not have both a sync and an async closure to run.
///
/// The builder API is designed in a way that only allows chaining the configuration options which
/// are valid at that point in the configuration process. Invalid option combinations are
/// compile-time errors. It will only allow creating a [SpawnRequest] if enough parameters have
/// been passed such that there is enough information to create a request. It will not allow
/// passing conflicting parameters: for example, if you already passed one synchronous closure, it
/// is impossible to pass another closure and have the code compile without errors.
///
/// ## Usage
///
/// Call [SpawnRequestBuilder::new()] to start building. Refer to the unit tests in this module for
/// usage examples.
pub struct SpawnRequestBuilder<C: ClosureKind> {
    /// Name attached to the resulting request; it is also used as the trace duration name while
    /// the closure runs. Defaults to "kthreadd".
    debug_name: &'static str,
    /// Scheduler role to apply to the thread while the closure runs, if any.
    role: Option<&'static str>,
    /// Type-state slot: [ClosureNone] until a closure is provided, then the closure itself.
    closure_kind: C,
}
65
66/// You can only create an empty request builder.
67impl SpawnRequestBuilder<ClosureNone> {
68    /// Creates a new spawn request builder.
69    pub fn new() -> Self {
70        Self { role: None, closure_kind: ClosureNone {}, debug_name: "kthreadd" }
71    }
72}
73
74/// You can call these at any point in the builder's lifecycle.
75impl<C: ClosureKind> SpawnRequestBuilder<C> {
76    /// Set a role to apply to the thread that will run your closure.
77    pub fn with_role(self, role: &'static str) -> Self {
78        Self { role: Some(role), ..self }
79    }
80
81    /// Set a task name to apply to the thread that will run your closure.
82    pub fn with_debug_name(self, debug_name: &'static str) -> Self {
83        Self { debug_name, ..self }
84    }
85}
86
87/// You can call these only if you have not provided a closure yet.
88impl SpawnRequestBuilder<ClosureNone> {
89    /// Provides the closure that the spawner will run.
90    pub fn with_sync_closure<F, T>(
91        self,
92        f: F,
93    ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
94    where
95        T: Send + 'static,
96        F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
97    {
98        let SpawnRequestBuilder { role, closure_kind: _, debug_name } = self;
99        SpawnRequestBuilder { role, closure_kind: f, debug_name }
100    }
101
102    /// Provides the closure that the spawner will run.
103    pub fn with_async_closure<F, T>(
104        self,
105        f: F,
106    ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
107    where
108        T: Send + 'static,
109        F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
110    {
111        let sync_fn = async_to_sync(f, self.debug_name);
112        self.with_sync_closure(sync_fn)
113    }
114}
115
/// A fully configured spawn request.
///
/// Obtained from one of the `build*` methods of [SpawnRequestBuilder] and consumed by
/// [DynamicThreadSpawner::spawn_from_request].
pub struct SpawnRequest {
    /// The closure to run. Role handling, tracing and result delivery are already wrapped in.
    closure: BoxedClosure,
    /// A name to give to the task.
    debug_name: &'static str,
}
123
124impl<T, F> SpawnRequestBuilder<F>
125where
126    T: Send + 'static,
127    F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
128{
129    /// Build a spawn request.
130    pub fn build(self) -> SpawnRequest {
131        let Self { role, closure_kind, debug_name } = self;
132        let closure = closure_kind;
133        let closure = maybe_apply_role(role, closure);
134        let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
135            fuchsia_trace::duration!(CATEGORY_STARNIX, debug_name);
136            let _ = closure(locked, current_task);
137        });
138        SpawnRequest { closure, debug_name }
139    }
140
141    /// Like [build], but allows receiving a result synchronously.
142    /// Do not forget to submit the spawn request to a spawner.
143    ///
144    /// Example:
145    ///
146    /// ```
147    /// let (result_fn, request) = /*...*/ .build_with_sync_result();
148    /// // spawn `request`
149    /// let result = result_fn();
150    /// ```
151    pub fn build_with_sync_result(self) -> (impl FnOnce() -> Result<T, Errno>, SpawnRequest) {
152        let Self { role, closure_kind, debug_name } = self;
153        let closure = closure_kind;
154        let (sender, receiver) = sync_channel::<T>(0);
155        let result_fn = move || {
156            receiver.recv().map_err(|err| errno!(EINTR, format!("while receiving: {err:?}")))
157        };
158        let closure = maybe_apply_role(role, closure);
159        let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
160            fuchsia_trace::duration!(CATEGORY_STARNIX, debug_name);
161            let _ = sender.send(closure(locked, current_task));
162        });
163        (result_fn, SpawnRequest { closure, debug_name })
164    }
165
166    /// Like [build], but allows receiving a result as a future.
167    /// Do not forget to submit the spawn request to a spawner.
168    ///
169    /// Example:
170    ///
171    /// ```
172    /// let (result_fut, request) = /*...*/ .build_with_async_result();
173    /// // spawn `request`
174    /// let result = result_fut.await;
175    /// ```
176    pub fn build_with_async_result(self) -> (impl Future<Output = Result<T, Errno>>, SpawnRequest) {
177        let Self { role, closure_kind, debug_name } = self;
178        let closure = closure_kind;
179        let (sender_async, result_fut) = oneshot::channel::<T>();
180        let maybe_with_role = maybe_apply_role(role, closure);
181        let repackaged =
182            Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
183                fuchsia_trace::duration!(CATEGORY_STARNIX, debug_name);
184                let result = maybe_with_role(locked, current_task);
185                let _ = sender_async.send(result);
186            });
187        let result_fut =
188            result_fut.map_err(|err| errno!(EINTR, format!("while receiving async: {err:?}")));
189        (result_fut, SpawnRequest { closure: repackaged, debug_name })
190    }
191}
192
/// A thread pool that immediately executes any new work sent to it and keeps at most a
/// configured number of idle threads.
#[derive(Debug)]
pub struct DynamicThreadSpawner {
    /// Pool bookkeeping, shared with every worker thread created by this spawner.
    state: Arc<Mutex<DynamicThreadSpawnerState>>,
    /// The weak system task to create the kernel thread associated with each thread.
    system_task: Weak<Task>,
    /// A persistent thread that is used to create new threads. This ensures that threads are
    /// created from the initial starnix process and are not tied to a specific task.
    persistent_thread: RunningThread,
}
204
205/// Wrap a closure with a thread role assignment, if one is available.
206fn maybe_apply_role<R, F>(
207    role: Option<&'static str>,
208    f: F,
209) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static
210where
211    F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static,
212{
213    move |locked, current_task| {
214        if let Some(role) = role {
215            if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(role) {
216                log_debug!(e:%; "failed to set kthread role");
217            }
218            let result = f(locked, current_task);
219            if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(DEFAULT_THREAD_ROLE) {
220                log_debug!(e:%; "failed to reset kthread role to default priority");
221            }
222            result
223        } else {
224            f(locked, current_task)
225        }
226    }
227}
228
229/// Convert async closure to sync closure that can be submitted to the spawner.
230fn async_to_sync<T, F>(
231    f: F,
232    name: &'static str,
233) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static
234where
235    T: Send + 'static,
236    F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
237{
238    move |locked, current_task| {
239        let mut exec = fuchsia_async::LocalExecutor::default();
240        let locked_and_task = LockedAndTask::new(locked, current_task);
241
242        let locked_and_task_clone = locked_and_task.clone();
243        let wrapped_future = WrappedSpawnedFuture::new(
244            locked_and_task,
245            ThreadLockupDetector::track_future(f(locked_and_task_clone)),
246            name,
247        );
248        let _waiting_guard = ThreadLockupDetector::pause_tracking();
249        exec.run_singlethreaded(wrapped_future)
250    }
251}
252
/// Denotes whether a closure has been provided. A request can not be
/// built at all without a closure.
///
/// This is a marker trait with no methods: it is implemented by [ClosureNone] and by every
/// closure type that the spawner can run.
///
/// See [SpawnRequestBuilder] for usage details.
pub trait ClosureKind {}
258
/// A builder type state where no closure has been provided yet.
///
/// This is the state of a builder freshly returned by [SpawnRequestBuilder::new()].
///
/// See [SpawnRequestBuilder] for usage details.
pub struct ClosureNone {}
impl ClosureKind for ClosureNone {}
264
265/// A builder type state where a closure has been provided.
266/// See [SpawnRequestBuilder] for usage details.
267impl<T: Send + 'static, FN: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
268    ClosureKind for FN
269{
270}
271
/// Bookkeeping shared between a [DynamicThreadSpawner] and its worker threads.
#[derive(Debug)]
struct DynamicThreadSpawnerState {
    /// The worker threads created by the spawner. Threads that have terminated are only removed
    /// when a later dispatch attempt finds their channel disconnected.
    threads: Vec<RunningThread>,
    /// Incremented by a worker each time it finishes a closure; decremented by the spawner when
    /// it hands work to an existing thread or removes a terminated one.
    idle_threads: u8,
    /// A worker that pushes `idle_threads` above this value after finishing a closure terminates.
    max_idle_threads: u8,
}
278
279impl DynamicThreadSpawner {
280    pub fn new(
281        max_idle_threads: u8,
282        system_task: Weak<Task>,
283        debug_name: impl Into<String>,
284    ) -> Self {
285        let persistent_thread =
286            RunningThread::new_persistent(system_task.clone(), debug_name.into());
287        Self {
288            state: Arc::new(Mutex::new(DynamicThreadSpawnerState {
289                max_idle_threads,
290                idle_threads: 0,
291                threads: vec![],
292            })),
293            system_task,
294            persistent_thread,
295        }
296    }
297
298    /// Run a given closure on a thread based on the provided [SpawnRequest].
299    ///
300    /// Use [SpawnRequestBuilder::new()] to start configuring a [SpawnRequest].
301    ///
302    /// This method will use an idle thread in the pool if one is available, otherwise it will
303    /// start a new thread. When this method returns, it is guaranteed that a thread is
304    /// responsible to start running the closure.
305    pub fn spawn_from_request(&self, spawn_request: SpawnRequest) {
306        // Check whether a thread already exists to handle the request.
307        let mut function: BoxedClosure = spawn_request.closure;
308        let mut state = self.state.lock();
309        if state.idle_threads > 0 {
310            let mut i = 0;
311            while i < state.threads.len() {
312                // Increases `i` immediately, so that it can be decreased it the thread must be
313                // dropped.
314                let thread_index = i;
315                i += 1;
316                match state.threads[thread_index].try_dispatch(function) {
317                    Ok(_) => {
318                        // The dispatch succeeded.
319                        state.idle_threads -= 1;
320                        return;
321                    }
322                    Err(TrySendError::Full(f)) => {
323                        // The thread is busy.
324                        function = f;
325                    }
326                    Err(TrySendError::Disconnected(f)) => {
327                        // The receiver is disconnected, it means the thread has terminated, drop it.
328                        state.idle_threads -= 1;
329                        state.threads.remove(thread_index);
330                        i -= 1;
331                        function = f;
332                    }
333                }
334            }
335        }
336
337        // A new thread must be created. It needs to be done from the persistent thread.
338        let (sender, receiver) = sync_channel::<RunningThread>(0);
339        let dispatch_function: BoxedClosure = Box::new({
340            let state = self.state.clone();
341            let system_task = self.system_task.clone();
342            move |_, _| {
343                sender
344                    .send(RunningThread::new(
345                        state,
346                        system_task,
347                        spawn_request.debug_name.to_string(),
348                        function,
349                    ))
350                    .expect("receiver must not be dropped");
351            }
352        });
353        self.persistent_thread
354            .dispatch(dispatch_function)
355            .expect("persistent thread should not have ended.");
356        state.threads.push(receiver.recv().expect("persistent thread should not have ended."));
357    }
358}
359
/// A [WrappedFuture] whose context is a [LockedAndTask].
type WrappedSpawnedFuture<'a, F> = WrappedFuture<F, LockedAndTask<'a>>;

impl<'a, F: 'a> WrappedSpawnedFuture<'a, F> {
    /// Wraps `fut` under the given `name`, registering [trigger_delayed_releaser] as the cleaner.
    ///
    /// NOTE(review): when exactly the cleaner is invoked is decided by
    /// `WrappedFuture::new_with_cleaner`, which is not visible here -- confirm there.
    fn new(locked_and_task: LockedAndTask<'a>, fut: F, name: &'static str) -> Self {
        Self::new_with_cleaner(locked_and_task, trigger_delayed_releaser, fut, name)
    }
}
367
/// Cleaner passed to [WrappedSpawnedFuture]: triggers the delayed releaser of the current task,
/// using the lock context carried by `locked_and_task`.
fn trigger_delayed_releaser(locked_and_task: LockedAndTask<'_>) {
    locked_and_task.current_task().trigger_delayed_releaser(&mut locked_and_task.unlocked());
}
371
/// A worker thread together with the channel used to send it closures to run.
///
/// Dropping a `RunningThread` closes the channel and joins the thread.
#[derive(Debug)]
struct RunningThread {
    /// Handle of the worker thread. Only `None` once `drop` has taken it for joining.
    thread: Option<JoinHandle<()>>,
    /// Sending half of the work channel. Only `None` once `drop` has closed the channel.
    sender: Option<SyncSender<BoxedClosure>>,
}
377
378impl RunningThread {
379    fn new(
380        state: Arc<Mutex<DynamicThreadSpawnerState>>,
381        system_task: Weak<Task>,
382        debug_task_name: String,
383        f: BoxedClosure,
384    ) -> Self {
385        let (sender, receiver) = sync_channel::<BoxedClosure>(0);
386        let thread = Some(
387            std::thread::Builder::new()
388                .name("kthread-dynamic-worker".to_string())
389                .spawn(move || {
390                    // It's ok to create a new lock context here, since we are on a new thread.
391                    #[allow(
392                        clippy::undocumented_unsafe_blocks,
393                        reason = "Force documented unsafe blocks in Starnix"
394                    )]
395                    let locked = unsafe { Unlocked::new() };
396                    let result = with_new_current_task(
397                        locked,
398                        &system_task,
399                        debug_task_name,
400                        |locked, current_task| {
401                            while let Ok(f) = receiver.recv() {
402                                let _guard = ThreadLockupDetector::track();
403                                f(locked, &current_task);
404                                // Apply any delayed releasers.
405                                current_task.trigger_delayed_releaser(locked);
406                                let mut state = state.lock();
407                                state.idle_threads += 1;
408                                if state.idle_threads > state.max_idle_threads {
409                                    // If the number of idle thread is greater than the max, the
410                                    // thread terminates.  This disconnects the receiver, which will
411                                    // ensure that the thread will be joined and remove from the list
412                                    // of available threads the next time the pool tries to use it.
413                                    return;
414                                }
415                            }
416                        },
417                    );
418                    if let Err(e) = result {
419                        log_error!("Unable to create a kernel thread: {e:?}");
420                    }
421                })
422                .expect("able to create threads"),
423        );
424        let result = Self { thread, sender: Some(sender) };
425        // The dispatch cannot fail because the thread can only finish after having executed at
426        // least one task, and this is the first task ever dispatched to it.
427        result
428            .sender
429            .as_ref()
430            .expect("sender should never be None")
431            .send(f)
432            .expect("Dispatch cannot fail");
433        result
434    }
435
436    fn new_persistent(system_task: Weak<Task>, task_name: String) -> Self {
437        // The persistent thread doesn't need to do any rendez-vous when received task.
438        let (sender, receiver) = sync_channel::<BoxedClosure>(20);
439        let thread = Some(
440            std::thread::Builder::new()
441                .name("kthread-persistent-worker".to_string())
442                .spawn(move || {
443                    // It's ok to create a new lock context here, since we are on a new thread.
444                    #[allow(
445                        clippy::undocumented_unsafe_blocks,
446                        reason = "Force documented unsafe blocks in Starnix"
447                    )]
448                    let locked = unsafe { Unlocked::new() };
449                    let current_task = {
450                        let Some(system_task) = system_task.upgrade() else {
451                            return;
452                        };
453                        match create_kernel_thread(
454                            locked,
455                            &system_task,
456                            TaskCommand::new(task_name.as_bytes()),
457                        ) {
458                            Ok(task) => task,
459                            Err(e) => {
460                                log_error!("Unable to create a kernel thread: {e:?}");
461                                return;
462                            }
463                        }
464                    };
465                    release_after!(current_task, locked, {
466                        while let Ok(f) = receiver.recv() {
467                            let _guard = ThreadLockupDetector::track();
468                            f(locked, &current_task);
469
470                            // Apply any delayed releasers.
471                            current_task.trigger_delayed_releaser(locked);
472                        }
473                    });
474
475                    // Ensure that no releasables are registered after this point as we unwind the stack.
476                    DelayedReleaser::finalize();
477                })
478                .expect("able to create threads"),
479        );
480        Self { thread, sender: Some(sender) }
481    }
482
483    fn try_dispatch(&self, f: BoxedClosure) -> Result<(), TrySendError<BoxedClosure>> {
484        self.sender.as_ref().expect("sender should never be None").try_send(f)
485    }
486
487    fn dispatch(&self, f: BoxedClosure) -> Result<(), SendError<BoxedClosure>> {
488        self.sender.as_ref().expect("sender should never be None").send(f)
489    }
490}
491
492impl Drop for RunningThread {
493    fn drop(&mut self) {
494        self.sender = None;
495        match self.thread.take() {
496            Some(thread) => thread.join().expect("Thread should join."),
497            _ => panic!("Thread should never be None"),
498        };
499    }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::spawn_kernel_and_run;

    /// A single no-op closure can be spawned.
    #[fuchsia::test]
    async fn run_simple_task() {
        spawn_kernel_and_run(async |_, current_task| {
            let pool = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            // Spelling out the parameter types is sometimes required to avoid the
            // "closure type is not general enough" error.
            let noop = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
            let request = SpawnRequestBuilder::new().with_sync_closure(noop).build();
            pool.spawn_from_request(request);
        })
        .await;
    }

    /// More closures than the idle-thread limit can be spawned back to back.
    #[fuchsia::test]
    async fn run_10_tasks() {
        spawn_kernel_and_run(async |_, current_task| {
            let pool = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            for _ in 0..10 {
                let noop = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
                let request = SpawnRequestBuilder::new().with_sync_closure(noop).build();
                pool.spawn_from_request(request);
            }
        })
        .await;
    }

    /// Closures that block do not keep a later closure from running.
    #[fuchsia::test]
    async fn blocking_task_do_not_prevent_further_processing() {
        spawn_kernel_and_run(async |_, current_task| {
            let pool = DynamicThreadSpawner::new(1, current_task.weak_task(), "kthreadd");

            // A flag guarded by a mutex, plus a condvar to announce that it was raised.
            let gate = Arc::new((fuchsia_sync::Mutex::new(false), fuchsia_sync::Condvar::new()));
            for _ in 0..10 {
                let waiter_gate = Arc::clone(&gate);
                let wait_for_gate = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
                    let (flag, signal) = &*waiter_gate;
                    let mut open = flag.lock();
                    while !*open {
                        signal.wait(&mut open);
                    }
                };
                let request = SpawnRequestBuilder::new().with_sync_closure(wait_for_gate).build();
                pool.spawn_from_request(request);
            }

            // This one must get to run even though all the closures above are blocked.
            let open_gate = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
                let (flag, signal) = &*gate;
                let mut open = flag.lock();
                *open = true;
                signal.notify_all();
            };

            let (wait_for_result, request) =
                SpawnRequestBuilder::new().with_sync_closure(open_gate).build_with_sync_result();
            pool.spawn_from_request(request);

            assert_eq!(wait_for_result(), Ok(()));
        })
        .await;
    }

    /// The value returned by a sync closure can be collected synchronously.
    #[fuchsia::test]
    async fn run_spawn_and_get_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let pool = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");

            let (wait_for_result, request) =
                SpawnRequestBuilder::new().with_sync_closure(|_, _| 3).build_with_sync_result();
            pool.spawn_from_request(request);
            assert_eq!(wait_for_result(), Ok(3));
        })
        .await;
    }

    /// A sync closure can drive a wrapped future on a local executor by hand.
    #[fuchsia::test]
    async fn test_spawn_async() {
        spawn_kernel_and_run(async |_, current_task| {
            let pool = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");

            // The closure parameters must carry explicit types, otherwise the compiler is
            // unable to infer the correct lifetimes. Adding your own lifetimes here does
            // *not* help.
            let run_future = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
                let mut executor = fuchsia_async::LocalExecutor::default();
                let context = LockedAndTask::new(locked, current_task);
                let noop_future = async {};
                let wrapped = WrappedSpawnedFuture::new(context, noop_future, "test-async");
                executor.run_singlethreaded(wrapped);
            };
            let request = SpawnRequestBuilder::new().with_sync_closure(run_future).build();
            pool.spawn_from_request(request);
        })
        .await;
    }

    /// The value produced by an async closure can be collected synchronously.
    #[fuchsia::test]
    async fn test_spawn_async_closure() {
        spawn_kernel_and_run(async |_, current_task| {
            let pool = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let answer = async |_: LockedAndTask<'_>| 42;
            let (wait_for_result, request) =
                SpawnRequestBuilder::new().with_async_closure(answer).build_with_sync_result();
            pool.spawn_from_request(request);
            assert_eq!(wait_for_result(), Ok(42));
        })
        .await;
    }

    /// An async closure can synchronously wait for the result of a request spawned after it.
    #[fuchsia::test]
    async fn test_spawn_sync_to_async_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let pool = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let producer = async |_: LockedAndTask<'_>| 42;
            let (produced, producer_request) =
                SpawnRequestBuilder::new().with_async_closure(producer).build_with_sync_result();

            let consumer = async move |_: LockedAndTask<'_>| produced().unwrap();
            let (consumed, consumer_request) =
                SpawnRequestBuilder::new().with_async_closure(consumer).build_with_sync_result();
            // The consumer is deliberately spawned before the producer it waits on.
            pool.spawn_from_request(consumer_request);
            pool.spawn_from_request(producer_request);
            assert_eq!(consumed(), Ok(42));
        })
        .await;
    }

    /// An async closure can await the result of a request spawned after it.
    #[fuchsia::test]
    async fn test_spawn_async_to_async_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let pool = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let producer = async |_: LockedAndTask<'_>| 42;
            let (produced, producer_request) =
                SpawnRequestBuilder::new().with_async_closure(producer).build_with_async_result();

            let consumer = async move |_: LockedAndTask<'_>| produced.await.unwrap();
            let (consumed, consumer_request) =
                SpawnRequestBuilder::new().with_async_closure(consumer).build_with_sync_result();
            // The consumer is deliberately spawned before the producer it waits on.
            pool.spawn_from_request(consumer_request);
            pool.spawn_from_request(producer_request);
            assert_eq!(consumed(), Ok(42));
        })
        .await;
    }
}