starnix_core/task/dynamic_thread_spawner.rs

// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! The API for spawning dynamic kernel threads.
//!
//! If you want to run a closure on a kernel thread, see [SpawnRequestBuilder] for how to
//! start and configure tasks that run closures.

use crate::execution::create_kernel_thread;
use crate::task::{
    CurrentTask, DelayedReleaser, LockedAndTask, Task, WrappedFuture, with_new_current_task,
};
use fuchsia_sync::Mutex;
use futures::TryFutureExt;
use futures::channel::oneshot;
use starnix_logging::{CATEGORY_STARNIX, log_debug, log_error, trace_duration};
use starnix_sync::{Locked, Unlocked};
use starnix_task_command::TaskCommand;
use starnix_types::ownership::{WeakRef, release_after};
use starnix_uapi::errno;
use starnix_uapi::errors::Errno;
use std::future::Future;
use std::sync::Arc;
use std::sync::mpsc::{SendError, SyncSender, TrySendError, sync_channel};
use std::thread::JoinHandle;

type BoxedClosure = Box<dyn FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> () + Send + 'static>;

const DEFAULT_THREAD_ROLE: &str = "fuchsia.starnix.fair.16";

/// A builder for configuring new tasks to spawn.
///
/// The builder allows us to set up several different flavors of possible
/// tasks to spawn, with a menu of options as follows:
///
/// - A task may or may not have a name.
/// - A task may or may not have an assigned scheduler role.
/// - A task may run either a sync or an async closure.
/// - A task may return a result, or may not return a result.
/// - The task's result can be collected synchronously, asynchronously, or not collected at all.
///
/// Note that these parameters are not perfectly orthogonal. For example, a task spawned from an
/// async closure cannot return a value asynchronously. (This is not a limitation of the approach;
/// rather, it's the API that we explicitly use today.) Also, some parameter combinations do not
/// make sense; for example, a spawn request cannot have both a sync and an async closure to run.
///
/// The builder API is designed so that only the configuration options which are valid at that
/// point in the configuration process can be chained. Invalid option combinations are
/// compile-time(!) errors. A [SpawnRequest] can only be created once enough parameters have been
/// passed to describe a complete request. Conflicting parameters are rejected: for example, if
/// you have already passed one synchronous closure, it is impossible to pass another closure and
/// have the code compile without errors.
///
/// ## Usage
///
/// Call [SpawnRequestBuilder::new()] to start building. Refer to the unit tests in this module for
/// usage examples.
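///
/// A minimal sketch of the typical flow, following the unit tests below (`spawner` is assumed to
/// be a [DynamicThreadSpawner] available in the surrounding code):
///
/// ```
/// let req = SpawnRequestBuilder::new()
///     .with_debug_name("example-worker")
///     .with_sync_closure(|_: &mut Locked<Unlocked>, _: &CurrentTask| { /* work */ })
///     .build();
/// spawner.spawn_from_request(req);
/// ```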
pub struct SpawnRequestBuilder<C: ClosureKind> {
    debug_name: &'static str,
    role: Option<&'static str>,
    closure_kind: C,
}

/// You can only create an empty request builder.
impl SpawnRequestBuilder<ClosureNone> {
    /// Creates a new spawn request builder.
    pub fn new() -> Self {
        Self { role: None, closure_kind: ClosureNone {}, debug_name: "kthreadd" }
    }
}

/// You can call these at any point in the builder's lifecycle.
impl<C: ClosureKind> SpawnRequestBuilder<C> {
    /// Set a role to apply to the thread that will run your closure.
    pub fn with_role(self, role: &'static str) -> Self {
        Self { role: Some(role), ..self }
    }

    /// Set a task name to apply to the thread that will run your closure.
    pub fn with_debug_name(self, debug_name: &'static str) -> Self {
        Self { debug_name, ..self }
    }
}

/// You can call these only if you have not provided a closure yet.
impl SpawnRequestBuilder<ClosureNone> {
    /// Provides the closure that the spawner will run.
    pub fn with_sync_closure<F, T>(
        self,
        f: F,
    ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
    where
        T: Send + 'static,
        F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
    {
        let SpawnRequestBuilder { role, closure_kind: _, debug_name } = self;
        SpawnRequestBuilder { role, closure_kind: f, debug_name }
    }

    /// Provides the closure that the spawner will run.
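    ///
    /// A minimal sketch of providing an async closure, mirroring the unit tests below (here the
    /// result is collected synchronously; the builder is otherwise configured as usual):
    ///
    /// ```
    /// let (result, req) = SpawnRequestBuilder::new()
    ///     .with_async_closure(async |_: LockedAndTask<'_>| 42)
    ///     .build_with_sync_result();
    /// ```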
    pub fn with_async_closure<F, T>(
        self,
        f: F,
    ) -> SpawnRequestBuilder<impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
    where
        T: Send + 'static,
        F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
    {
        let sync_fn = async_to_sync(f, self.debug_name);
        self.with_sync_closure(sync_fn)
    }
}

/// A fully configured spawn request.
pub struct SpawnRequest {
    /// The closure to run.
    closure: BoxedClosure,
    /// A name to give to the task.
    debug_name: &'static str,
}

impl<T, F> SpawnRequestBuilder<F>
where
    T: Send + 'static,
    F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static,
{
    /// Build a spawn request.
    pub fn build(self) -> SpawnRequest {
        let Self { role, closure_kind, debug_name } = self;
        let closure = closure_kind;
        let closure = maybe_apply_role(role, closure);
        let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
            trace_duration!(CATEGORY_STARNIX, debug_name);
            let _ = closure(locked, current_task);
        });
        SpawnRequest { closure, debug_name }
    }

    /// Like [build], but allows receiving a result synchronously.
    /// Do not forget to submit the spawn request to a spawner.
    ///
    /// Example:
    ///
    /// ```
    /// let (result_fn, request) = /*...*/ .build_with_sync_result();
    /// // spawn `request`
    /// let result = result_fn();
    /// ```
    pub fn build_with_sync_result(self) -> (impl FnOnce() -> Result<T, Errno>, SpawnRequest) {
        let Self { role, closure_kind, debug_name } = self;
        let closure = closure_kind;
        let (sender, receiver) = sync_channel::<T>(0);
        let result_fn = move || {
            receiver.recv().map_err(|err| errno!(EINTR, format!("while receiving: {err:?}")))
        };
        let closure = maybe_apply_role(role, closure);
        let closure = Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
            trace_duration!(CATEGORY_STARNIX, debug_name);
            let _ = sender.send(closure(locked, current_task));
        });
        (result_fn, SpawnRequest { closure, debug_name })
    }

    /// Like [build], but allows receiving a result as a future.
    /// Do not forget to submit the spawn request to a spawner.
    ///
    /// Example:
    ///
    /// ```
    /// let (result_fut, request) = /*...*/ .build_with_async_result();
    /// // spawn `request`
    /// let result = result_fut.await;
    /// ```
    pub fn build_with_async_result(self) -> (impl Future<Output = Result<T, Errno>>, SpawnRequest) {
        let Self { role, closure_kind, debug_name } = self;
        let closure = closure_kind;
        let (sender_async, result_fut) = oneshot::channel::<T>();
        let maybe_with_role = maybe_apply_role(role, closure);
        let repackaged =
            Box::new(move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
                trace_duration!(CATEGORY_STARNIX, debug_name);
                let result = maybe_with_role(locked, current_task);
                let _ = sender_async.send(result);
            });
        let result_fut =
            result_fut.map_err(|err| errno!(EINTR, format!("while receiving async: {err:?}")));
        (result_fut, SpawnRequest { closure: repackaged, debug_name })
    }
}

/// A thread pool that immediately executes any new work sent to it and keeps a maximum number of
/// idle threads.
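///
/// A minimal construction sketch, following the unit tests below (`current_task` is assumed to be
/// a [CurrentTask] that can provide a weak reference to the system task):
///
/// ```
/// let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
/// ```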
#[derive(Debug)]
pub struct DynamicThreadSpawner {
    state: Arc<Mutex<DynamicThreadSpawnerState>>,
    /// The weak system task to create the kernel thread associated with each thread.
    system_task: WeakRef<Task>,
    /// A persistent thread that is used to create new threads. This ensures that threads are
    /// created from the initial starnix process and are not tied to a specific task.
    persistent_thread: RunningThread,
}

/// Wrap a closure with a thread role assignment, if one is available.
fn maybe_apply_role<R, F>(
    role: Option<&'static str>,
    f: F,
) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static
where
    F: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> R + Send + 'static,
{
    move |locked, current_task| {
        if let Some(role) = role {
            if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(role) {
                log_debug!(e:%; "failed to set kthread role");
            }
            let result = f(locked, current_task);
            if let Err(e) = fuchsia_scheduler::set_role_for_this_thread(DEFAULT_THREAD_ROLE) {
                log_debug!(e:%; "failed to reset kthread role to default priority");
            }
            result
        } else {
            f(locked, current_task)
        }
    }
}

/// Convert an async closure to a sync closure that can be submitted to the spawner.
fn async_to_sync<T, F>(
    f: F,
    name: &'static str,
) -> impl FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static
where
    T: Send + 'static,
    F: AsyncFnOnce(LockedAndTask<'_>) -> T + Send + 'static,
{
    move |locked, current_task| {
        let mut exec = fuchsia_async::LocalExecutor::default();
        let locked_and_task = LockedAndTask::new(locked, current_task);

        let locked_and_task_clone = locked_and_task.clone();
        let wrapped_future =
            WrappedSpawnedFuture::new(locked_and_task, f(locked_and_task_clone), name);
        exec.run_singlethreaded(wrapped_future)
    }
}

/// Denotes whether a closure has been provided. A request cannot be
/// built at all without a closure.
///
/// See [SpawnRequestBuilder] for usage details.
pub trait ClosureKind {}

/// A builder type state where no closure has been provided yet.
///
/// See [SpawnRequestBuilder] for usage details.
pub struct ClosureNone {}
impl ClosureKind for ClosureNone {}

/// A builder type state where a closure has been provided.
/// See [SpawnRequestBuilder] for usage details.
impl<T: Send + 'static, FN: FnOnce(&mut Locked<Unlocked>, &CurrentTask) -> T + Send + 'static>
    ClosureKind for FN
{
}

#[derive(Debug)]
struct DynamicThreadSpawnerState {
    threads: Vec<RunningThread>,
    idle_threads: u8,
    max_idle_threads: u8,
}

impl DynamicThreadSpawner {
    pub fn new(
        max_idle_threads: u8,
        system_task: WeakRef<Task>,
        debug_name: impl Into<String>,
    ) -> Self {
        let persistent_thread =
            RunningThread::new_persistent(system_task.clone(), debug_name.into());
        Self {
            state: Arc::new(Mutex::new(DynamicThreadSpawnerState {
                max_idle_threads,
                idle_threads: 0,
                threads: vec![],
            })),
            system_task,
            persistent_thread,
        }
    }

    /// Run a given closure on a thread based on the provided [SpawnRequest].
    ///
    /// Use [SpawnRequestBuilder::new()] to start configuring a [SpawnRequest].
    ///
    /// This method will use an idle thread in the pool if one is available; otherwise it will
    /// start a new thread. When this method returns, it is guaranteed that some thread is
    /// responsible for running the closure.
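    ///
    /// A short sketch of spawning and collecting a result synchronously, mirroring the
    /// `run_spawn_and_get_result` test below (`spawner` is assumed to be a [DynamicThreadSpawner]):
    ///
    /// ```
    /// let (result, req) =
    ///     SpawnRequestBuilder::new().with_sync_closure(|_, _| 3).build_with_sync_result();
    /// spawner.spawn_from_request(req);
    /// assert_eq!(result(), Ok(3));
    /// ```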
    pub fn spawn_from_request(&self, spawn_request: SpawnRequest) {
        // Check whether a thread already exists to handle the request.
        let mut function: BoxedClosure = spawn_request.closure;
        let mut state = self.state.lock();
        if state.idle_threads > 0 {
            let mut i = 0;
            while i < state.threads.len() {
                // Increase `i` immediately, so that it can be decreased if the thread must be
                // dropped.
                let thread_index = i;
                i += 1;
                match state.threads[thread_index].try_dispatch(function) {
                    Ok(_) => {
                        // The dispatch succeeded.
                        state.idle_threads -= 1;
                        return;
                    }
                    Err(TrySendError::Full(f)) => {
                        // The thread is busy.
                        function = f;
                    }
                    Err(TrySendError::Disconnected(f)) => {
                        // The receiver is disconnected, which means the thread has terminated; drop it.
                        state.idle_threads -= 1;
                        state.threads.remove(thread_index);
                        i -= 1;
                        function = f;
                    }
                }
            }
        }

        // A new thread must be created. This needs to be done from the persistent thread.
        let (sender, receiver) = sync_channel::<RunningThread>(0);
        let dispatch_function: BoxedClosure = Box::new({
            let state = self.state.clone();
            let system_task = self.system_task.clone();
            move |_, _| {
                sender
                    .send(RunningThread::new(
                        state,
                        system_task,
                        spawn_request.debug_name.to_string(),
                        function,
                    ))
                    .expect("receiver must not be dropped");
            }
        });
        self.persistent_thread
            .dispatch(dispatch_function)
            .expect("persistent thread should not have ended.");
        state.threads.push(receiver.recv().expect("persistent thread should not have ended."));
    }
}

type WrappedSpawnedFuture<'a, F> = WrappedFuture<F, LockedAndTask<'a>>;

impl<'a, F: 'a> WrappedSpawnedFuture<'a, F> {
    fn new(locked_and_task: LockedAndTask<'a>, fut: F, name: &'static str) -> Self {
        Self::new_with_cleaner(locked_and_task, trigger_delayed_releaser, fut, name)
    }
}

fn trigger_delayed_releaser(locked_and_task: LockedAndTask<'_>) {
    locked_and_task.current_task().trigger_delayed_releaser(&mut locked_and_task.unlocked());
}

#[derive(Debug)]
struct RunningThread {
    thread: Option<JoinHandle<()>>,
    sender: Option<SyncSender<BoxedClosure>>,
}

impl RunningThread {
    fn new(
        state: Arc<Mutex<DynamicThreadSpawnerState>>,
        system_task: WeakRef<Task>,
        debug_task_name: String,
        f: BoxedClosure,
    ) -> Self {
        let (sender, receiver) = sync_channel::<BoxedClosure>(0);
        let thread = Some(
            std::thread::Builder::new()
                .name("kthread-dynamic-worker".to_string())
                .spawn(move || {
                    // It's ok to create a new lock context here, since we are on a new thread.
                    #[allow(
                        clippy::undocumented_unsafe_blocks,
                        reason = "Force documented unsafe blocks in Starnix"
                    )]
                    let locked = unsafe { Unlocked::new() };
                    let result = with_new_current_task(
                        locked,
                        &system_task,
                        debug_task_name,
                        |locked, current_task| {
                            while let Ok(f) = receiver.recv() {
                                f(locked, &current_task);
                                // Apply any delayed releasers.
                                current_task.trigger_delayed_releaser(locked);
                                let mut state = state.lock();
                                state.idle_threads += 1;
                                if state.idle_threads > state.max_idle_threads {
                                    // If the number of idle threads is greater than the max, the
                                    // thread terminates. This disconnects the receiver, which will
                                    // ensure that the thread is joined and removed from the list
                                    // of available threads the next time the pool tries to use it.
                                    return;
                                }
                            }
                        },
                    );
                    if let Err(e) = result {
                        log_error!("Unable to create a kernel thread: {e:?}");
                    }
                })
                .expect("able to create threads"),
        );
        let result = Self { thread, sender: Some(sender) };
        // The dispatch cannot fail because the thread can only finish after having executed at
        // least one task, and this is the first task ever dispatched to it.
        result
            .sender
            .as_ref()
            .expect("sender should never be None")
            .send(f)
            .expect("Dispatch cannot fail");
        result
    }

    fn new_persistent(system_task: WeakRef<Task>, task_name: String) -> Self {
        // The persistent thread doesn't need to do any rendezvous when receiving tasks.
        let (sender, receiver) = sync_channel::<BoxedClosure>(20);
        let thread = Some(
            std::thread::Builder::new()
                .name("kthread-persistent-worker".to_string())
                .spawn(move || {
                    // It's ok to create a new lock context here, since we are on a new thread.
                    #[allow(
                        clippy::undocumented_unsafe_blocks,
                        reason = "Force documented unsafe blocks in Starnix"
                    )]
                    let locked = unsafe { Unlocked::new() };
                    let current_task = {
                        let Some(system_task) = system_task.upgrade() else {
                            return;
                        };
                        match create_kernel_thread(
                            locked,
                            &system_task,
                            TaskCommand::new(task_name.as_bytes()),
                        ) {
                            Ok(task) => task,
                            Err(e) => {
                                log_error!("Unable to create a kernel thread: {e:?}");
                                return;
                            }
                        }
                    };
                    release_after!(current_task, locked, {
                        while let Ok(f) = receiver.recv() {
                            f(locked, &current_task);

                            // Apply any delayed releasers.
                            current_task.trigger_delayed_releaser(locked);
                        }
                    });

                    // Ensure that no releasables are registered after this point as we unwind the stack.
                    DelayedReleaser::finalize();
                })
                .expect("able to create threads"),
        );
        Self { thread, sender: Some(sender) }
    }

    fn try_dispatch(&self, f: BoxedClosure) -> Result<(), TrySendError<BoxedClosure>> {
        self.sender.as_ref().expect("sender should never be None").try_send(f)
    }

    fn dispatch(&self, f: BoxedClosure) -> Result<(), SendError<BoxedClosure>> {
        self.sender.as_ref().expect("sender should never be None").send(f)
    }
}

impl Drop for RunningThread {
    fn drop(&mut self) {
        self.sender = None;
        match self.thread.take() {
            Some(thread) => thread.join().expect("Thread should join."),
            _ => panic!("Thread should never be None"),
        };
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::spawn_kernel_and_run;

    #[fuchsia::test]
    async fn run_simple_task() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            // Type annotations are sometimes needed to avoid the "closure type is
            // not general enough" error.
            let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
            let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
            spawner.spawn_from_request(req);
        })
        .await;
    }

    #[fuchsia::test]
    async fn run_10_tasks() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            for _ in 0..10 {
                let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {};
                let opts = SpawnRequestBuilder::new().with_sync_closure(closure).build();
                spawner.spawn_from_request(opts);
            }
        })
        .await;
    }

    #[fuchsia::test]
    async fn blocking_task_do_not_prevent_further_processing() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(1, current_task.weak_task(), "kthreadd");

            let pair = Arc::new((fuchsia_sync::Mutex::new(false), fuchsia_sync::Condvar::new()));
            for _ in 0..10 {
                let pair2 = Arc::clone(&pair);
                let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
                    let (lock, cvar) = &*pair2;
                    let mut cont = lock.lock();
                    while !*cont {
                        cvar.wait(&mut cont);
                    }
                };
                let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
                spawner.spawn_from_request(req);
            }

            let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
                let (lock, cvar) = &*pair;
                let mut cont = lock.lock();
                *cont = true;
                cvar.notify_all();
            };

            let (result, req) =
                SpawnRequestBuilder::new().with_sync_closure(closure).build_with_sync_result();
            spawner.spawn_from_request(req);

            assert_eq!(result(), Ok(()));
        })
        .await;
    }

    #[fuchsia::test]
    async fn run_spawn_and_get_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");

            let (result, req) =
                SpawnRequestBuilder::new().with_sync_closure(|_, _| 3).build_with_sync_result();
            spawner.spawn_from_request(req);
            assert_eq!(result(), Ok(3));
        })
        .await;
    }

    #[fuchsia::test]
    async fn test_spawn_async() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");

            // The closure's free variables must be annotated with their respective types;
            // otherwise the Rust compiler gets confused and is unable to infer the correct
            // lifetimes. Interestingly, adding your own lifetimes here does *not* help.
            let closure = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
                let mut exec = fuchsia_async::LocalExecutor::default();
                let locked_and_task = LockedAndTask::new(locked, current_task);
                let fut = async {};
                let wrapped_future = WrappedSpawnedFuture::new(locked_and_task, fut, "test-async");
                exec.run_singlethreaded(wrapped_future);
            };
            let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
            spawner.spawn_from_request(req);
        })
        .await;
    }

    #[fuchsia::test]
    async fn test_spawn_async_closure() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let fut = async |_: LockedAndTask<'_>| 42;
            let (result, req) =
                SpawnRequestBuilder::new().with_async_closure(fut).build_with_sync_result();
            spawner.spawn_from_request(req);
            assert_eq!(result(), Ok(42));
        })
        .await;
    }

    #[fuchsia::test]
    async fn test_spawn_sync_to_async_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let fut = async |_: LockedAndTask<'_>| 42;
            let (result, req) =
                SpawnRequestBuilder::new().with_async_closure(fut).build_with_sync_result();

            let fut2 = async move |_: LockedAndTask<'_>| result().unwrap();
            let (result2, req2) =
                SpawnRequestBuilder::new().with_async_closure(fut2).build_with_sync_result();
            spawner.spawn_from_request(req2);
            spawner.spawn_from_request(req);
            assert_eq!(result2(), Ok(42));
        })
        .await;
    }

    #[fuchsia::test]
    async fn test_spawn_async_to_async_result() {
        spawn_kernel_and_run(async |_, current_task| {
            let spawner = DynamicThreadSpawner::new(2, current_task.weak_task(), "kthreadd");
            let fut = async |_: LockedAndTask<'_>| 42;
            let (result_fut, req) =
                SpawnRequestBuilder::new().with_async_closure(fut).build_with_async_result();

            let fut2 = async move |_: LockedAndTask<'_>| result_fut.await.unwrap();
            let (result2, req2) =
                SpawnRequestBuilder::new().with_async_closure(fut2).build_with_sync_result();
            spawner.spawn_from_request(req2);
            spawner.spawn_from_request(req);
            assert_eq!(result2(), Ok(42));
        })
        .await;
    }
}