starnix_kernel_runner/
serve_protocols.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::Container;
6use anyhow::{Context as _, Error};
7use fidl::endpoints::{ControlHandle, RequestStream, ServerEnd};
8use fuchsia_async::{
9    DurationExt, {self as fasync},
10};
11use futures::channel::oneshot;
12use futures::{
13    AsyncReadExt, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt, pin_mut,
14    select,
15};
16use starnix_core::execution::{create_init_child_process, execute_task_with_prerun_result};
17use starnix_core::fs::devpts::create_main_and_replica;
18use starnix_core::fs::fuchsia::create_fuchsia_pipe;
19use starnix_core::task::dynamic_thread_spawner::SpawnRequestBuilder;
20use starnix_core::task::{
21    CurrentTask, ExitStatus, FullCredentials, Kernel, LockedAndTask, ProcessEntryRef,
22};
23use starnix_core::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
24use starnix_core::vfs::file_server::serve_file_at;
25use starnix_core::vfs::socket::VsockSocket;
26use starnix_core::vfs::{FdFlags, FileHandle};
27use starnix_logging::{log_error, log_warn};
28use starnix_modules_framebuffer::Framebuffer;
29use starnix_sync::{Locked, Unlocked};
30use starnix_task_command::TaskCommand;
31use starnix_uapi::errors::Errno;
32use starnix_uapi::open_flags::OpenFlags;
33use starnix_uapi::signals::UncheckedSignal;
34use starnix_uapi::{errno, error, uapi};
35use std::ffi::CString;
36use std::ops::DerefMut;
37use {
38    fidl_fuchsia_component_runner as frunner, fidl_fuchsia_element as felement,
39    fidl_fuchsia_io as fio, fidl_fuchsia_memory_attribution as fattribution,
40    fidl_fuchsia_posix as fposix, fidl_fuchsia_starnix_binder as fbinder,
41    fidl_fuchsia_starnix_container as fstarcontainer,
42};
43
44use super::start_component;
45
46pub fn expose_root(
47    locked: &mut Locked<Unlocked>,
48    system_task: &CurrentTask,
49    server_end: ServerEnd<fio::DirectoryMarker>,
50) -> Result<(), Error> {
51    let root_file = system_task.open_file(locked, "/".into(), OpenFlags::RDONLY)?;
52    serve_file_at(
53        server_end.into_channel().into(),
54        system_task,
55        &root_file,
56        FullCredentials::for_kernel(),
57    )?;
58    Ok(())
59}
60
61pub async fn serve_component_runner(
62    request_stream: frunner::ComponentRunnerRequestStream,
63    system_task: &CurrentTask,
64) -> Result<(), Error> {
65    request_stream
66        .try_for_each_concurrent(None, |event| async {
67            match event {
68                frunner::ComponentRunnerRequest::Start { start_info, controller, .. } => {
69                    if let Err(e) = start_component(start_info, controller, system_task).await {
70                        log_error!("failed to start component: {:?}", e);
71                    }
72                }
73                frunner::ComponentRunnerRequest::_UnknownMethod { ordinal, .. } => {
74                    log_warn!("Unknown ComponentRunner request: {ordinal}");
75                }
76            }
77            Ok(())
78        })
79        .await
80        .map_err(Error::from)
81}
82
83fn to_winsize(window_size: Option<fstarcontainer::ConsoleWindowSize>) -> uapi::winsize {
84    window_size
85        .map(|window_size| uapi::winsize {
86            ws_row: window_size.rows,
87            ws_col: window_size.cols,
88            ws_xpixel: window_size.x_pixels,
89            ws_ypixel: window_size.y_pixels,
90        })
91        .unwrap_or(uapi::winsize::default())
92}
93
94async fn spawn_console(
95    kernel: &Kernel,
96    payload: fstarcontainer::ControllerSpawnConsoleRequest,
97) -> Result<Result<u8, fstarcontainer::SpawnConsoleError>, Error> {
98    if let (Some(console_in), Some(console_out), Some(binary_path)) =
99        (payload.console_in, payload.console_out, payload.binary_path)
100    {
101        let binary_path = CString::new(binary_path)?;
102        let argv = payload
103            .argv
104            .unwrap_or(vec![])
105            .into_iter()
106            .map(CString::new)
107            .collect::<Result<Vec<_>, _>>()?;
108        let environ = payload
109            .environ
110            .unwrap_or(vec![])
111            .into_iter()
112            .map(CString::new)
113            .collect::<Result<Vec<_>, _>>()?;
114        let window_size = to_winsize(payload.window_size);
115        let current_task = create_init_child_process(
116            kernel.kthreads.unlocked_for_async().deref_mut(),
117            &kernel.weak_self.upgrade().expect("Kernel must still be alive"),
118            TaskCommand::new(binary_path.as_bytes()),
119            None,
120        )?;
121        let (sender, receiver) = oneshot::channel();
122        let pty = execute_task_with_prerun_result(
123            kernel.kthreads.unlocked_for_async().deref_mut(),
124            current_task,
125            move |locked, current_task| {
126                let executable = current_task.open_file(
127                    locked,
128                    binary_path.as_bytes().into(),
129                    OpenFlags::RDONLY,
130                )?;
131                current_task.exec(locked, executable, binary_path, argv, environ)?;
132                let (pty, pts) = create_main_and_replica(locked, &current_task, window_size)?;
133                let fd_flags = FdFlags::empty();
134                assert_eq!(0, current_task.add_file(locked, pts.clone(), fd_flags)?.raw());
135                assert_eq!(1, current_task.add_file(locked, pts.clone(), fd_flags)?.raw());
136                assert_eq!(2, current_task.add_file(locked, pts, fd_flags)?.raw());
137                Ok(pty)
138            },
139            move |result| {
140                let _ = match result {
141                    Ok(ExitStatus::Exit(exit_code)) => sender.send(Ok(exit_code)),
142                    _ => sender.send(Err(fstarcontainer::SpawnConsoleError::Canceled)),
143                };
144            },
145            None,
146        )?;
147        let _ = forward_to_pty(kernel, console_in, console_out, pty).map_err(|e| {
148            log_error!("failed to forward to terminal {:?}", e);
149        });
150
151        Ok(receiver.await?)
152    } else {
153        Ok(Err(fstarcontainer::SpawnConsoleError::InvalidArgs))
154    }
155}
156
157pub async fn serve_container_controller(
158    request_stream: fstarcontainer::ControllerRequestStream,
159    system_task: &CurrentTask,
160) -> Result<(), Error> {
161    request_stream
162        .map_err(Error::from)
163        .try_for_each_concurrent(None, |event| async {
164            match event {
165                fstarcontainer::ControllerRequest::VsockConnect {
166                    payload:
167                        fstarcontainer::ControllerVsockConnectRequest { port, bridge_socket, .. },
168                    ..
169                } => {
170                    let Some(port) = port else {
171                        log_warn!("vsock connection missing port");
172                        return Ok(());
173                    };
174                    let Some(bridge_socket) = bridge_socket else {
175                        log_warn!("vsock connection missing bridge_socket");
176                        return Ok(());
177                    };
178                    connect_to_vsock(port, bridge_socket, system_task).await.unwrap_or_else(|e| {
179                        log_error!("failed to connect to vsock {:?}", e);
180                    });
181                }
182                fstarcontainer::ControllerRequest::SpawnConsole { payload, responder } => {
183                    responder.send(spawn_console(system_task.kernel(), payload).await?)?;
184                }
185                fstarcontainer::ControllerRequest::GetVmoReferences { payload, responder } => {
186                    if let Some(koid) = payload.koid {
187                        let thread_groups = system_task
188                            .kernel()
189                            .pids
190                            .read()
191                            .get_thread_groups()
192                            .collect::<Vec<_>>();
193                        let mut results = vec![];
194                        for thread_group in thread_groups {
195                            if let Some(leader) =
196                                system_task.get_task(thread_group.leader).upgrade()
197                            {
198                                let fds = leader.files.get_all_fds();
199                                for fd in fds {
200                                    if let Ok(file) = leader.files.get(fd) {
201                                        if let Ok(memory) = file.get_memory(
202                                            system_task
203                                                .kernel()
204                                                .kthreads
205                                                .unlocked_for_async()
206                                                .deref_mut(),
207                                            system_task,
208                                            None,
209                                            starnix_core::mm::ProtectionFlags::READ,
210                                        ) {
211                                            let memory_koid = memory
212                                                .info()
213                                                .expect("Failed to get memory info")
214                                                .koid;
215                                            if memory_koid.raw_koid() == koid {
216                                                let process_name = thread_group
217                                                    .process
218                                                    .get_name()
219                                                    .unwrap_or_default();
220                                                results.push(fstarcontainer::VmoReference {
221                                                    process_name: Some(process_name.to_string()),
222                                                    pid: Some(leader.get_pid() as u64),
223                                                    fd: Some(fd.raw()),
224                                                    koid: Some(koid),
225                                                    ..Default::default()
226                                                });
227                                            }
228                                        }
229                                    }
230                                }
231                            }
232                        }
233                        let _ =
234                            responder.send(&fstarcontainer::ControllerGetVmoReferencesResponse {
235                                references: Some(results),
236                                ..Default::default()
237                            });
238                    }
239                }
240                fstarcontainer::ControllerRequest::GetJobHandle { responder } => {
241                    let _result = responder.send(fstarcontainer::ControllerGetJobHandleResponse {
242                        job: Some(
243                            fuchsia_runtime::job_default()
244                                .duplicate(zx::Rights::SAME_RIGHTS)
245                                .expect("Failed to dup handle"),
246                        ),
247                        ..Default::default()
248                    });
249                }
250                fstarcontainer::ControllerRequest::SendSignal {
251                    payload:
252                        fstarcontainer::ControllerSendSignalRequest {
253                            pid: Some(pid),
254                            signal: Some(signal),
255                            ..
256                        },
257                    responder,
258                } => {
259                    let pids = system_task.kernel().pids.read();
260                    if let Some(ProcessEntryRef::Process(target_thread_group)) =
261                        pids.get_process(pid)
262                    {
263                        #[allow(
264                            clippy::undocumented_unsafe_blocks,
265                            reason = "Force documented unsafe blocks in Starnix"
266                        )]
267                        match unsafe {
268                            target_thread_group.send_signal_unchecked_debug(
269                                system_task,
270                                UncheckedSignal::new(signal),
271                            )
272                        } {
273                            Ok(_) => {
274                                let _result = responder.send(Ok(()));
275                            }
276                            Err(_) => {
277                                let _result =
278                                    responder.send(Err(fstarcontainer::SignalError::InvalidSignal));
279                            }
280                        };
281                    } else {
282                        let _result =
283                            responder.send(Err(fstarcontainer::SignalError::InvalidTarget));
284                    };
285                }
286                // The request did not contain both a signal and a target pid.
287                fstarcontainer::ControllerRequest::SendSignal { responder, .. } => {
288                    log_error!("malformed SendSignal request");
289                    let _result = responder.send(Err(fstarcontainer::SignalError::InvalidTarget));
290                }
291                fstarcontainer::ControllerRequest::SetSyscallLogFilter { payload, responder } => {
292                    if let Some(process_name) = payload.process_name {
293                        system_task.kernel().add_syscall_log_filter(&process_name);
294                        let _ = responder.send(Ok(()));
295                    } else {
296                        let _ = responder.send(Err(
297                            fstarcontainer::SetSyscallLogFilterError::MissingProcessName,
298                        ));
299                    }
300                }
301                fstarcontainer::ControllerRequest::ClearSyscallLogFilters { responder } => {
302                    system_task.kernel().clear_syscall_log_filters();
303                    let _ = responder.send();
304                }
305                fstarcontainer::ControllerRequest::_UnknownMethod { .. } => (),
306            }
307            Ok(())
308        })
309        .await
310}
311
312async fn connect_to_vsock(
313    port: u32,
314    bridge_socket: fidl::Socket,
315    system_task: &CurrentTask,
316) -> Result<(), Error> {
317    let socket = loop {
318        if let Ok(socket) = system_task.kernel().default_abstract_vsock_namespace.lookup(&port) {
319            break socket;
320        };
321        fasync::Timer::new(fasync::MonotonicDuration::from_millis(100).after_now()).await;
322    };
323
324    let pipe = create_fuchsia_pipe(
325        system_task.kernel().kthreads.unlocked_for_async().deref_mut(),
326        system_task,
327        bridge_socket,
328        OpenFlags::RDWR | OpenFlags::NONBLOCK,
329    )?;
330    socket.downcast_socket::<VsockSocket>().unwrap().remote_connection(
331        system_task.kernel().kthreads.unlocked_for_async().deref_mut(),
332        &socket,
333        system_task,
334        pipe,
335    )?;
336
337    Ok(())
338}
339
340fn forward_to_pty(
341    kernel: &Kernel,
342    console_in: fidl::Socket,
343    console_out: fidl::Socket,
344    pty: FileHandle,
345) -> Result<(), Error> {
346    // Matches fuchsia.io.Transfer capacity, somewhat arbitrarily.
347    const BUFFER_CAPACITY: usize = 8192;
348
349    let mut rx = fuchsia_async::Socket::from_socket(console_in);
350    let mut tx = fuchsia_async::Socket::from_socket(console_out);
351    let pty_sink = pty.clone();
352    let closure = async move |locked_and_task: LockedAndTask<'_>| {
353        let _result: Result<(), Error> = (async || {
354            let mut buffer = vec![0u8; BUFFER_CAPACITY];
355            loop {
356                let bytes = rx.read(&mut buffer[..]).await?;
357                if bytes == 0 {
358                    return Ok(());
359                }
360                pty_sink.write(
361                    &mut locked_and_task.unlocked(),
362                    locked_and_task.current_task(),
363                    &mut VecInputBuffer::new(&buffer[..bytes]),
364                )?;
365            }
366        })()
367        .await;
368    };
369    let req = SpawnRequestBuilder::new()
370        .with_debug_name("forward-to-pty-in")
371        .with_async_closure(closure)
372        .build();
373    kernel.kthreads.spawner().spawn_from_request(req);
374
375    let pty_source = pty;
376    let closure = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
377        let _result: Result<(), Error> =
378            fasync::LocalExecutor::default().run_singlethreaded(async {
379                let mut buffer = VecOutputBuffer::new(BUFFER_CAPACITY);
380                loop {
381                    buffer.reset();
382                    let bytes = pty_source.read(locked, current_task, &mut buffer)?;
383                    if bytes == 0 {
384                        return Ok(());
385                    }
386                    tx.write_all(buffer.data()).await?;
387                }
388            });
389    };
390    let req = SpawnRequestBuilder::new()
391        .with_debug_name("forward-to-pty-out")
392        .with_sync_closure(closure)
393        .build();
394    kernel.kthreads.spawner().spawn_from_request(req);
395
396    Ok(())
397}
398
399pub async fn serve_graphical_presenter(
400    mut request_stream: felement::GraphicalPresenterRequestStream,
401    kernel: &Kernel,
402) -> Result<(), Error> {
403    while let Some(request) = request_stream.next().await {
404        match request.context("reading graphical presenter request")? {
405            felement::GraphicalPresenterRequest::PresentView {
406                view_spec,
407                annotation_controller: _,
408                view_controller_request: _,
409                responder,
410            } => match view_spec.viewport_creation_token {
411                Some(token) => {
412                    let fb = Framebuffer::get(kernel).context("getting framebuffer from kernel")?;
413                    fb.present_view(token);
414                    let _ = responder.send(Ok(()));
415                }
416                None => {
417                    let _ = responder.send(Err(felement::PresentViewError::InvalidArgs));
418                }
419            },
420        }
421    }
422    Ok(())
423}
424
425/// Serves the memory attribution provider for the Kernel ELF component.
426pub fn serve_memory_attribution_provider_elfkernel(
427    mut request_stream: fattribution::ProviderRequestStream,
428    container: &Container,
429) -> impl Future<Output = Result<(), Error>> {
430    let observer = container.new_memory_attribution_observer(request_stream.control_handle());
431    async move {
432        while let Some(event) = request_stream.try_next().await? {
433            match event {
434                fattribution::ProviderRequest::Get { responder } => {
435                    observer.next(responder);
436                }
437                fattribution::ProviderRequest::_UnknownMethod {
438                    ordinal, control_handle, ..
439                } => {
440                    log_error!("Invalid request to AttributionProvider: {ordinal}");
441                    control_handle.shutdown_with_epitaph(zx::Status::INVALID_ARGS);
442                }
443            }
444        }
445        Ok(())
446    }
447}
448
449/// Serves the memory attribution provider for the Container component.
450pub fn serve_memory_attribution_provider_container(
451    mut request_stream: fattribution::ProviderRequestStream,
452    kernel: &Kernel,
453) -> impl Future<Output = ()> + use<> {
454    let observer = kernel.new_memory_attribution_observer(request_stream.control_handle());
455    async move {
456        while let Some(event) = request_stream
457            .try_next()
458            .await
459            .inspect_err(|err| {
460                log_warn!("Error while serving container memory attribution: {:?}", err)
461            })
462            .ok()
463            .flatten()
464        {
465            match event {
466                fattribution::ProviderRequest::Get { responder } => {
467                    observer.next(responder);
468                }
469                fattribution::ProviderRequest::_UnknownMethod {
470                    ordinal, control_handle, ..
471                } => {
472                    log_error!("Invalid request to AttributionProvider: {ordinal}");
473                    control_handle.shutdown_with_epitaph(zx::Status::INVALID_ARGS);
474                }
475            }
476        }
477    }
478}
479
480async fn select_first<O>(f1: impl Future<Output = O>, f2: impl Future<Output = O>) -> O {
481    let f1 = f1.fuse();
482    let f2 = f2.fuse();
483    pin_mut!(f1, f2);
484    select! {
485        f1 = f1 => f1,
486        f2 = f2 => f2,
487    }
488}
489
490/// Serve the LutexController protocol.
491pub async fn serve_lutex_controller(
492    request_stream: fbinder::LutexControllerRequestStream,
493    current_task: &CurrentTask,
494) -> Result<(), Error> {
495    let kernel = current_task.kernel();
496    request_stream
497        .map_err(Error::from)
498        .try_for_each_concurrent(None, |event| async move {
499            match event {
500                fbinder::LutexControllerRequest::WaitBitset { payload, responder } => {
501                    let deadline_and_receiver = (|| {
502                        let mut unlocked = kernel.kthreads.unlocked_for_async();
503                        let vmo = payload.vmo.ok_or_else(|| errno!(EINVAL))?;
504                        let offset = payload.offset.ok_or_else(|| errno!(EINVAL))?;
505                        let value = payload.value.ok_or_else(|| errno!(EINVAL))?;
506                        let mask = payload.mask.unwrap_or(u32::MAX);
507                        let deadline = payload.deadline.map(zx::MonotonicInstant::from_nanos);
508                        kernel
509                            .shared_futexes
510                            .external_wait(&mut unlocked, vmo.into(), offset, value, mask)
511                            .map(|receiver| (deadline, receiver))
512                    })();
513                    let result = match deadline_and_receiver {
514                        Ok((deadline, receiver)) => {
515                            let receiver = receiver.map_err(|_| errno!(EINTR));
516                            if let Some(deadline) = deadline {
517                                let timer = fasync::Timer::new(deadline).map(|_| error!(ETIMEDOUT));
518                                select_first(timer, receiver).await
519                            } else {
520                                receiver.await
521                            }
522                        }
523                        Err(e) => Err(e),
524                    };
525                    let result = result.map_err(|e: Errno| {
526                        fposix::Errno::from_primitive(e.code.error_code() as i32)
527                            .unwrap_or(fposix::Errno::Einval)
528                    });
529                    responder
530                        .send(result)
531                        .context("Unable to send LutexControllerRequest::WaitBitset response")?;
532                }
533                fbinder::LutexControllerRequest::WakeBitset { payload, responder } => {
534                    let result = (|| {
535                        let mut unlocked = kernel.kthreads.unlocked_for_async();
536                        let vmo = payload.vmo.ok_or_else(|| errno!(EINVAL))?;
537                        let offset = payload.offset.ok_or_else(|| errno!(EINVAL))?;
538                        let count = payload.count.ok_or_else(|| errno!(EINVAL))?;
539                        let mask = payload.mask.unwrap_or(u32::MAX);
540                        kernel.shared_futexes.external_wake(
541                            &mut unlocked,
542                            vmo.into(),
543                            offset,
544                            count as usize,
545                            mask,
546                        )
547                    })();
548                    let result = result
549                        .map(|count| fbinder::WakeResponse {
550                            count: Some(count as u64),
551                            ..fbinder::WakeResponse::default()
552                        })
553                        .map_err(|e: Errno| {
554                            fposix::Errno::from_primitive(e.code.error_code() as i32)
555                                .unwrap_or(fposix::Errno::Einval)
556                        });
557                    responder
558                        .send(result)
559                        .context("Unable to send LutexControllerRequest::WakeBitset response")?;
560                }
561                fbinder::LutexControllerRequest::_UnknownMethod { ordinal, .. } => {
562                    log_warn!("Unknown LutexController ordinal: {}", ordinal);
563                }
564            }
565            Ok(())
566        })
567        .await
568        .context("failed fbinder::LutexController request")
569}
570
571#[cfg(test)]
572mod tests {
573    use super::*;
574    use assert_matches::assert_matches;
575    use fidl::HandleBased;
576    use starnix_core::testing::*;
577    use {fidl_fuchsia_posix as fposix, zx};
578
579    #[fuchsia::test]
580    async fn lutex_controller_test() {
581        spawn_kernel_and_run(async |mut _locked, current_task| {
582            let (sender, receiver) = oneshot::channel::<()>();
583            current_task.kernel.kthreads.spawn_future(
584                {
585                    let kernel = current_task.kernel.clone();
586                    move || async move {
587                        let (lutex_controller, stream) = fidl::endpoints::create_proxy_and_stream::<
588                            fbinder::LutexControllerMarker,
589                        >();
590
591                        // Spawn the server
592                        let server_fut =
593                            serve_lutex_controller(stream, kernel.kthreads.system_task());
594
595                        let client_fut = async move {
596                            const VMO_SIZE: usize = 4 * 1024;
597                            let vmo = zx::Vmo::create(VMO_SIZE as u64).expect("Vmo::create");
598                            // Wait on an incorrect value.
599                            let wait = lutex_controller
600                                .wait_bitset(fbinder::WaitBitsetRequest {
601                                    vmo: Some(
602                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
603                                            .expect("duplicate vmo"),
604                                    ),
605                                    offset: Some(0),
606                                    value: Some(1),
607                                    ..Default::default()
608                                })
609                                .await
610                                .expect("got_answer");
611                            assert_matches!(wait, Err(fposix::Errno::Eagain));
612
613                            // Wait with a timeout
614                            let wait = lutex_controller
615                                .wait_bitset(fbinder::WaitBitsetRequest {
616                                    vmo: Some(
617                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
618                                            .expect("duplicate vmo"),
619                                    ),
620                                    offset: Some(0),
621                                    value: Some(0),
622                                    deadline: Some(0),
623                                    ..Default::default()
624                                })
625                                .await
626                                .expect("got_answer");
627                            assert_matches!(wait, Err(fposix::Errno::Etimedout));
628
629                            let mut wait = Box::pin(
630                                lutex_controller.wait_bitset(fbinder::WaitBitsetRequest {
631                                    vmo: Some(
632                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
633                                            .expect("duplicate vmo"),
634                                    ),
635                                    offset: Some(0),
636                                    value: Some(0),
637                                    deadline: None,
638                                    ..Default::default()
639                                }),
640                            );
641                            // The wait is correct, the future should stay pending until a wake.
642                            assert!(futures::poll!(&mut wait).is_pending());
643
644                            let waken_up: fbinder::WakeResponse = lutex_controller
645                                .wake_bitset(fbinder::WakeBitsetRequest {
646                                    vmo: Some(
647                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
648                                            .expect("duplicate vmo"),
649                                    ),
650                                    offset: Some(0),
651                                    count: Some(1),
652                                    mask: None,
653                                    ..Default::default()
654                                })
655                                .await
656                                .expect("wake_answer")
657                                .expect("wake_response");
658                            assert_eq!(waken_up.count, Some(1));
659
660                            // The wait should now return.
661                            assert!(wait.await.expect("await_answer").is_ok());
662                        };
663
664                        let (server_res, _) = futures::join!(server_fut, client_fut);
665                        server_res.expect("server failed");
666                        let _ = sender.send(());
667                    }
668                },
669                "lutex_controller_test",
670            );
671            receiver.await.expect("test failed");
672        })
673        .await;
674    }
675}