Skip to main content

starnix_kernel_runner/
serve_protocols.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::Container;
6use anyhow::{Context as _, Error};
7use fidl::endpoints::{ControlHandle, RequestStream, ServerEnd};
8use fuchsia_async::{
9    DurationExt, {self as fasync},
10};
11use futures::channel::oneshot;
12use futures::{
13    AsyncReadExt, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt, pin_mut,
14    select,
15};
16use starnix_core::execution::{create_init_child_process, execute_task_with_prerun_result};
17use starnix_core::fs::devpts::create_main_and_replica;
18use starnix_core::fs::fuchsia::create_fuchsia_pipe;
19use starnix_core::task::dynamic_thread_spawner::SpawnRequestBuilder;
20use starnix_core::task::{CurrentTask, ExitStatus, Kernel, LockedAndTask, ProcessEntryRef};
21use starnix_core::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
22use starnix_core::vfs::file_server::serve_file_at;
23use starnix_core::vfs::socket::VsockSocket;
24use starnix_core::vfs::{FdFlags, FileHandle};
25use starnix_logging::{log_error, log_warn};
26use starnix_modules_framebuffer::Framebuffer;
27use starnix_sync::{Locked, Unlocked};
28use starnix_task_command::TaskCommand;
29use starnix_uapi::auth::Credentials;
30use starnix_uapi::errors::Errno;
31use starnix_uapi::open_flags::OpenFlags;
32use starnix_uapi::signals::UncheckedSignal;
33use starnix_uapi::{errno, error, uapi};
34use std::ffi::CString;
35use std::ops::DerefMut;
36use {
37    fidl_fuchsia_component_runner as frunner, fidl_fuchsia_element as felement,
38    fidl_fuchsia_io as fio, fidl_fuchsia_memory_attribution as fattribution,
39    fidl_fuchsia_posix as fposix, fidl_fuchsia_starnix_binder as fbinder,
40    fidl_fuchsia_starnix_container as fstarcontainer,
41};
42
43use super::start_component;
44
45pub fn expose_root(
46    locked: &mut Locked<Unlocked>,
47    system_task: &CurrentTask,
48    server_end: ServerEnd<fio::DirectoryMarker>,
49) -> Result<(), Error> {
50    let root_file = system_task.open_file(locked, "/".into(), OpenFlags::RDONLY)?;
51    serve_file_at(server_end.into_channel().into(), system_task, &root_file, Credentials::root())?;
52    Ok(())
53}
54
55pub async fn serve_component_runner(
56    request_stream: frunner::ComponentRunnerRequestStream,
57    system_task: &CurrentTask,
58) -> Result<(), Error> {
59    request_stream
60        .try_for_each_concurrent(None, |event| async {
61            match event {
62                frunner::ComponentRunnerRequest::Start { start_info, controller, .. } => {
63                    if let Err(e) = start_component(start_info, controller, system_task).await {
64                        log_error!("failed to start component: {:?}", e);
65                    }
66                }
67                frunner::ComponentRunnerRequest::_UnknownMethod { ordinal, .. } => {
68                    log_warn!("Unknown ComponentRunner request: {ordinal}");
69                }
70            }
71            Ok(())
72        })
73        .await
74        .map_err(Error::from)
75}
76
77fn to_winsize(window_size: Option<fstarcontainer::ConsoleWindowSize>) -> uapi::winsize {
78    window_size
79        .map(|window_size| uapi::winsize {
80            ws_row: window_size.rows,
81            ws_col: window_size.cols,
82            ws_xpixel: window_size.x_pixels,
83            ws_ypixel: window_size.y_pixels,
84        })
85        .unwrap_or(uapi::winsize::default())
86}
87
88async fn spawn_console(
89    kernel: &Kernel,
90    payload: fstarcontainer::ControllerSpawnConsoleRequest,
91) -> Result<Result<u8, fstarcontainer::SpawnConsoleError>, Error> {
92    if let (Some(console_in), Some(console_out), Some(binary_path)) =
93        (payload.console_in, payload.console_out, payload.binary_path)
94    {
95        let binary_path = CString::new(binary_path)?;
96        let argv = payload
97            .argv
98            .unwrap_or(vec![])
99            .into_iter()
100            .map(CString::new)
101            .collect::<Result<Vec<_>, _>>()?;
102        let environ = payload
103            .environ
104            .unwrap_or(vec![])
105            .into_iter()
106            .map(CString::new)
107            .collect::<Result<Vec<_>, _>>()?;
108        let window_size = to_winsize(payload.window_size);
109        let current_task = create_init_child_process(
110            kernel.kthreads.unlocked_for_async().deref_mut(),
111            &kernel.weak_self.upgrade().expect("Kernel must still be alive"),
112            TaskCommand::new(binary_path.as_bytes()),
113            Credentials::with_ids(0, 0),
114            None,
115        )?;
116        let (sender, receiver) = oneshot::channel();
117        let pty = execute_task_with_prerun_result(
118            kernel.kthreads.unlocked_for_async().deref_mut(),
119            current_task,
120            move |locked, current_task| {
121                let executable = current_task.open_file(
122                    locked,
123                    binary_path.as_bytes().into(),
124                    OpenFlags::RDONLY,
125                )?;
126                current_task.exec(locked, executable, binary_path, argv, environ)?;
127                let (pty, pts) = create_main_and_replica(locked, &current_task, window_size)?;
128                let fd_flags = FdFlags::empty();
129                assert_eq!(0, current_task.add_file(locked, pts.clone(), fd_flags)?.raw());
130                assert_eq!(1, current_task.add_file(locked, pts.clone(), fd_flags)?.raw());
131                assert_eq!(2, current_task.add_file(locked, pts, fd_flags)?.raw());
132                Ok(pty)
133            },
134            move |result| {
135                let _ = match result {
136                    Ok(ExitStatus::Exit(exit_code)) => sender.send(Ok(exit_code)),
137                    _ => sender.send(Err(fstarcontainer::SpawnConsoleError::Canceled)),
138                };
139            },
140            None,
141        )?;
142        let _ = forward_to_pty(kernel, console_in, console_out, pty).map_err(|e| {
143            log_error!("failed to forward to terminal {:?}", e);
144        });
145
146        Ok(receiver.await?)
147    } else {
148        Ok(Err(fstarcontainer::SpawnConsoleError::InvalidArgs))
149    }
150}
151
152pub async fn serve_container_controller(
153    request_stream: fstarcontainer::ControllerRequestStream,
154    system_task: &CurrentTask,
155) -> Result<(), Error> {
156    request_stream
157        .map_err(Error::from)
158        .try_for_each_concurrent(None, |event| async {
159            match event {
160                fstarcontainer::ControllerRequest::VsockConnect {
161                    payload:
162                        fstarcontainer::ControllerVsockConnectRequest { port, bridge_socket, .. },
163                    ..
164                } => {
165                    let Some(port) = port else {
166                        log_warn!("vsock connection missing port");
167                        return Ok(());
168                    };
169                    let Some(bridge_socket) = bridge_socket else {
170                        log_warn!("vsock connection missing bridge_socket");
171                        return Ok(());
172                    };
173                    connect_to_vsock(port, bridge_socket, system_task).await.unwrap_or_else(|e| {
174                        log_error!("failed to connect to vsock {:?}", e);
175                    });
176                }
177                fstarcontainer::ControllerRequest::SpawnConsole { payload, responder } => {
178                    responder.send(spawn_console(system_task.kernel(), payload).await?)?;
179                }
180                fstarcontainer::ControllerRequest::GetVmoReferences { payload, responder } => {
181                    if let Some(koid) = payload.koid {
182                        let thread_groups = system_task
183                            .kernel()
184                            .pids
185                            .read()
186                            .get_thread_groups()
187                            .collect::<Vec<_>>();
188                        let mut results = vec![];
189                        for thread_group in thread_groups {
190                            if let Some(leader) =
191                                system_task.get_task(thread_group.leader).upgrade()
192                            {
193                                if let Ok(live) = leader.live() {
194                                    let files = &live.files;
195                                    let fds = files.get_all_fds();
196                                    for fd in fds {
197                                        if let Ok(file) = files.get(fd) {
198                                            if let Ok(memory) = file.get_memory(
199                                                system_task
200                                                    .kernel()
201                                                    .kthreads
202                                                    .unlocked_for_async()
203                                                    .deref_mut(),
204                                                system_task,
205                                                None,
206                                                starnix_core::mm::ProtectionFlags::READ,
207                                            ) {
208                                                let memory_koid = memory
209                                                    .info()
210                                                    .expect("Failed to get memory info")
211                                                    .koid;
212                                                if memory_koid.raw_koid() == koid {
213                                                    let process_name = thread_group
214                                                        .process
215                                                        .get_name()
216                                                        .unwrap_or_default();
217                                                    results.push(fstarcontainer::VmoReference {
218                                                        process_name: Some(
219                                                            process_name.to_string(),
220                                                        ),
221                                                        pid: Some(leader.get_pid() as u64),
222                                                        fd: Some(fd.raw()),
223                                                        koid: Some(koid),
224                                                        ..Default::default()
225                                                    });
226                                                }
227                                            }
228                                        }
229                                    }
230                                }
231                            }
232                        }
233                        let _ =
234                            responder.send(&fstarcontainer::ControllerGetVmoReferencesResponse {
235                                references: Some(results),
236                                ..Default::default()
237                            });
238                    }
239                }
240                fstarcontainer::ControllerRequest::GetJobHandle { responder } => {
241                    let _result = responder.send(fstarcontainer::ControllerGetJobHandleResponse {
242                        job: Some(
243                            fuchsia_runtime::job_default()
244                                .duplicate(zx::Rights::SAME_RIGHTS)
245                                .expect("Failed to dup handle"),
246                        ),
247                        ..Default::default()
248                    });
249                }
250                fstarcontainer::ControllerRequest::SendSignal {
251                    payload:
252                        fstarcontainer::ControllerSendSignalRequest {
253                            pid: Some(pid),
254                            signal: Some(signal),
255                            ..
256                        },
257                    responder,
258                } => {
259                    let pids = system_task.kernel().pids.read();
260                    if let Some(ProcessEntryRef::Process(target_thread_group)) =
261                        pids.get_process(pid)
262                    {
263                        #[allow(
264                            clippy::undocumented_unsafe_blocks,
265                            reason = "Force documented unsafe blocks in Starnix"
266                        )]
267                        match unsafe {
268                            target_thread_group.send_signal_unchecked_debug(
269                                system_task,
270                                UncheckedSignal::new(signal),
271                            )
272                        } {
273                            Ok(_) => {
274                                let _result = responder.send(Ok(()));
275                            }
276                            Err(_) => {
277                                let _result =
278                                    responder.send(Err(fstarcontainer::SignalError::InvalidSignal));
279                            }
280                        };
281                    } else {
282                        let _result =
283                            responder.send(Err(fstarcontainer::SignalError::InvalidTarget));
284                    };
285                }
286                // The request did not contain both a signal and a target pid.
287                fstarcontainer::ControllerRequest::SendSignal { responder, .. } => {
288                    log_error!("malformed SendSignal request");
289                    let _result = responder.send(Err(fstarcontainer::SignalError::InvalidTarget));
290                }
291                fstarcontainer::ControllerRequest::SetSyscallLogFilter { payload, responder } => {
292                    if let Some(process_name) = payload.process_name {
293                        system_task.kernel().add_syscall_log_filter(&process_name);
294                        let _ = responder.send(Ok(()));
295                    } else {
296                        let _ = responder.send(Err(
297                            fstarcontainer::SetSyscallLogFilterError::MissingProcessName,
298                        ));
299                    }
300                }
301                fstarcontainer::ControllerRequest::ClearSyscallLogFilters { responder } => {
302                    system_task.kernel().clear_syscall_log_filters();
303                    let _ = responder.send();
304                }
305                fstarcontainer::ControllerRequest::_UnknownMethod { .. } => (),
306            }
307            Ok(())
308        })
309        .await
310}
311
312async fn connect_to_vsock(
313    port: u32,
314    bridge_socket: fidl::Socket,
315    system_task: &CurrentTask,
316) -> Result<(), Error> {
317    let socket = loop {
318        if let Ok(socket) = system_task.kernel().default_abstract_vsock_namespace.lookup(&port) {
319            break socket;
320        };
321        fasync::Timer::new(fasync::MonotonicDuration::from_millis(100).after_now()).await;
322    };
323
324    let pipe = create_fuchsia_pipe(
325        system_task.kernel().kthreads.unlocked_for_async().deref_mut(),
326        system_task,
327        bridge_socket,
328        OpenFlags::RDWR | OpenFlags::NONBLOCK,
329    )?;
330    socket.downcast_socket::<VsockSocket>().unwrap().remote_connection(
331        system_task.kernel().kthreads.unlocked_for_async().deref_mut(),
332        &socket,
333        system_task,
334        pipe,
335    )?;
336
337    Ok(())
338}
339
340fn forward_to_pty(
341    kernel: &Kernel,
342    console_in: fidl::Socket,
343    console_out: fidl::Socket,
344    pty: FileHandle,
345) -> Result<(), Error> {
346    // Matches fuchsia.io.Transfer capacity, somewhat arbitrarily.
347    const BUFFER_CAPACITY: usize = 8192;
348
349    let mut rx = fuchsia_async::Socket::from_socket(console_in);
350    let mut tx = fuchsia_async::Socket::from_socket(console_out);
351    let pty_sink = pty.clone();
352    let closure = async move |locked_and_task: LockedAndTask<'_>| {
353        let _result: Result<(), Error> = (async || {
354            let mut buffer = vec![0u8; BUFFER_CAPACITY];
355            loop {
356                let bytes = rx.read(&mut buffer[..]).await?;
357                if bytes == 0 {
358                    return Ok(());
359                }
360                pty_sink.write(
361                    &mut locked_and_task.unlocked(),
362                    locked_and_task.current_task(),
363                    &mut VecInputBuffer::new(&buffer[..bytes]),
364                )?;
365            }
366        })()
367        .await;
368    };
369    let req = SpawnRequestBuilder::new()
370        .with_debug_name("forward-to-pty-in")
371        .with_async_closure(closure)
372        .build();
373    kernel.kthreads.spawner().spawn_from_request(req);
374
375    let pty_source = pty;
376    let closure = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
377        let _result: Result<(), Error> =
378            fasync::LocalExecutor::default().run_singlethreaded(async {
379                let mut buffer = VecOutputBuffer::new(BUFFER_CAPACITY);
380                loop {
381                    buffer.reset();
382                    let bytes = pty_source.read(locked, current_task, &mut buffer)?;
383                    if bytes == 0 {
384                        return Ok(());
385                    }
386                    tx.write_all(buffer.data()).await?;
387                }
388            });
389    };
390    let req = SpawnRequestBuilder::new()
391        .with_debug_name("forward-to-pty-out")
392        .with_sync_closure(closure)
393        .build();
394    kernel.kthreads.spawner().spawn_from_request(req);
395
396    Ok(())
397}
398
399pub async fn serve_graphical_presenter(
400    mut request_stream: felement::GraphicalPresenterRequestStream,
401    kernel: &Kernel,
402) -> Result<(), Error> {
403    while let Some(request) = request_stream.next().await {
404        match request.context("reading graphical presenter request")? {
405            felement::GraphicalPresenterRequest::PresentView {
406                view_spec,
407                annotation_controller: _,
408                view_controller_request: _,
409                responder,
410            } => match view_spec.viewport_creation_token {
411                Some(token) => {
412                    let fb = Framebuffer::get(kernel).context("getting framebuffer from kernel")?;
413                    fb.present_view(token);
414                    let _ = responder.send(Ok(()));
415                }
416                None => {
417                    let _ = responder.send(Err(felement::PresentViewError::InvalidArgs));
418                }
419            },
420        }
421    }
422    Ok(())
423}
424
425/// Serves the memory attribution provider for the Kernel ELF component.
426pub fn serve_memory_attribution_provider_elfkernel(
427    mut request_stream: fattribution::ProviderRequestStream,
428    container: &Container,
429) -> impl Future<Output = Result<(), Error>> {
430    let observer = container.new_memory_attribution_observer(request_stream.control_handle());
431    async move {
432        while let Some(event) = request_stream.try_next().await? {
433            match event {
434                fattribution::ProviderRequest::Get { responder } => {
435                    observer.next(responder);
436                }
437                fattribution::ProviderRequest::_UnknownMethod {
438                    ordinal, control_handle, ..
439                } => {
440                    log_error!("Invalid request to AttributionProvider: {ordinal}");
441                    control_handle.shutdown_with_epitaph(zx::Status::INVALID_ARGS);
442                }
443            }
444        }
445        Ok(())
446    }
447}
448
449/// Serves the memory attribution provider for the Container component.
450pub fn serve_memory_attribution_provider_container(
451    mut request_stream: fattribution::ProviderRequestStream,
452    kernel: &Kernel,
453) -> impl Future<Output = ()> + use<> {
454    let observer = kernel.new_memory_attribution_observer(request_stream.control_handle());
455    async move {
456        while let Some(event) = request_stream
457            .try_next()
458            .await
459            .inspect_err(|err| {
460                log_warn!("Error while serving container memory attribution: {:?}", err)
461            })
462            .ok()
463            .flatten()
464        {
465            match event {
466                fattribution::ProviderRequest::Get { responder } => {
467                    observer.next(responder);
468                }
469                fattribution::ProviderRequest::_UnknownMethod {
470                    ordinal, control_handle, ..
471                } => {
472                    log_error!("Invalid request to AttributionProvider: {ordinal}");
473                    control_handle.shutdown_with_epitaph(zx::Status::INVALID_ARGS);
474                }
475            }
476        }
477    }
478}
479
480async fn select_first<O>(f1: impl Future<Output = O>, f2: impl Future<Output = O>) -> O {
481    let f1 = f1.fuse();
482    let f2 = f2.fuse();
483    pin_mut!(f1, f2);
484    select! {
485        f1 = f1 => f1,
486        f2 = f2 => f2,
487    }
488}
489
490/// Serve the LutexController protocol.
491pub async fn serve_lutex_controller(
492    request_stream: fbinder::LutexControllerRequestStream,
493    current_task: &CurrentTask,
494) -> Result<(), Error> {
495    let kernel = current_task.kernel();
496    request_stream
497        .map_err(Error::from)
498        .try_for_each_concurrent(None, |event| async move {
499            match event {
500                fbinder::LutexControllerRequest::WaitBitset { payload, responder } => {
501                    let deadline_and_receiver = (|| {
502                        let mut unlocked = kernel.kthreads.unlocked_for_async();
503                        let vmo = payload.vmo.ok_or_else(|| errno!(EINVAL))?;
504                        let offset = payload.offset.ok_or_else(|| errno!(EINVAL))?;
505                        let value = payload.value.ok_or_else(|| errno!(EINVAL))?;
506                        let mask = payload.mask.unwrap_or(u32::MAX);
507                        let deadline = payload.deadline.map(zx::MonotonicInstant::from_nanos);
508                        kernel
509                            .shared_futexes
510                            .external_wait(&mut unlocked, vmo.into(), offset, value, mask)
511                            .map(|(event, receiver)| (deadline, event, receiver))
512                    })();
513                    let result = match deadline_and_receiver {
514                        Ok((deadline, event, receiver)) => {
515                            // We construct a specific `wait_fut` to explicitly bind the lifecycle
516                            // of the `event` to the duration of this wait operation. If the FIDL
517                            // client disconnects (or the future is otherwise dropped/cancelled),
518                            // the `event` is immediately dropped. This zeros the strong reference
519                            // count on the `InterruptibleEvent`, signaling to the `FutexTable`
520                            // that this external waiter is now stale and should be garbage
521                            // collected on its next cleanup pass.
522                            let wait_fut = async move {
523                                let _event = event;
524                                let receiver = receiver.map_err(|_| errno!(EINTR));
525                                if let Some(deadline) = deadline {
526                                    let timer =
527                                        fasync::Timer::new(deadline).map(|_| error!(ETIMEDOUT));
528                                    select_first(timer, receiver).await
529                                } else {
530                                    receiver.await
531                                }
532                            };
533                            wait_fut.await
534                        }
535                        Err(e) => Err(e),
536                    };
537                    let result = result.map_err(|e: Errno| {
538                        fposix::Errno::from_primitive(e.code.error_code() as i32)
539                            .unwrap_or(fposix::Errno::Einval)
540                    });
541                    responder
542                        .send(result)
543                        .context("Unable to send LutexControllerRequest::WaitBitset response")?;
544                }
545                fbinder::LutexControllerRequest::WakeBitset { payload, responder } => {
546                    let result = (|| {
547                        let mut unlocked = kernel.kthreads.unlocked_for_async();
548                        let vmo = payload.vmo.ok_or_else(|| errno!(EINVAL))?;
549                        let offset = payload.offset.ok_or_else(|| errno!(EINVAL))?;
550                        let count = payload.count.ok_or_else(|| errno!(EINVAL))?;
551                        let mask = payload.mask.unwrap_or(u32::MAX);
552                        kernel.shared_futexes.external_wake(
553                            &mut unlocked,
554                            vmo.into(),
555                            offset,
556                            count as usize,
557                            mask,
558                        )
559                    })();
560                    let result = result
561                        .map(|count| fbinder::WakeResponse {
562                            count: Some(count as u64),
563                            ..fbinder::WakeResponse::default()
564                        })
565                        .map_err(|e: Errno| {
566                            fposix::Errno::from_primitive(e.code.error_code() as i32)
567                                .unwrap_or(fposix::Errno::Einval)
568                        });
569                    responder
570                        .send(result)
571                        .context("Unable to send LutexControllerRequest::WakeBitset response")?;
572                }
573                fbinder::LutexControllerRequest::_UnknownMethod { ordinal, .. } => {
574                    log_warn!("Unknown LutexController ordinal: {}", ordinal);
575                }
576            }
577            Ok(())
578        })
579        .await
580        .context("failed fbinder::LutexController request")
581}
582
583#[cfg(test)]
584mod tests {
585    use super::*;
586    use assert_matches::assert_matches;
587    use fidl::HandleBased;
588    use starnix_core::testing::*;
589    use {fidl_fuchsia_posix as fposix, zx};
590
591    #[fuchsia::test]
592    async fn lutex_controller_test() {
593        spawn_kernel_and_run(async |mut _locked, current_task| {
594            let (sender, receiver) = oneshot::channel::<()>();
595            current_task.kernel.kthreads.spawn_future(
596                {
597                    let kernel = current_task.kernel.clone();
598                    move || async move {
599                        let (lutex_controller, stream) = fidl::endpoints::create_proxy_and_stream::<
600                            fbinder::LutexControllerMarker,
601                        >();
602
603                        // Spawn the server
604                        let server_fut =
605                            serve_lutex_controller(stream, kernel.kthreads.system_task());
606
607                        let client_fut = async move {
608                            const VMO_SIZE: usize = 4 * 1024;
609                            let vmo = zx::Vmo::create(VMO_SIZE as u64).expect("Vmo::create");
610                            // Wait on an incorrect value.
611                            let wait = lutex_controller
612                                .wait_bitset(fbinder::WaitBitsetRequest {
613                                    vmo: Some(
614                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
615                                            .expect("duplicate vmo"),
616                                    ),
617                                    offset: Some(0),
618                                    value: Some(1),
619                                    ..Default::default()
620                                })
621                                .await
622                                .expect("got_answer");
623                            assert_matches!(wait, Err(fposix::Errno::Eagain));
624
625                            // Wait with a timeout
626                            let wait = lutex_controller
627                                .wait_bitset(fbinder::WaitBitsetRequest {
628                                    vmo: Some(
629                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
630                                            .expect("duplicate vmo"),
631                                    ),
632                                    offset: Some(0),
633                                    value: Some(0),
634                                    deadline: Some(0),
635                                    ..Default::default()
636                                })
637                                .await
638                                .expect("got_answer");
639                            assert_matches!(wait, Err(fposix::Errno::Etimedout));
640
641                            let mut wait = Box::pin(
642                                lutex_controller.wait_bitset(fbinder::WaitBitsetRequest {
643                                    vmo: Some(
644                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
645                                            .expect("duplicate vmo"),
646                                    ),
647                                    offset: Some(0),
648                                    value: Some(0),
649                                    deadline: None,
650                                    ..Default::default()
651                                }),
652                            );
653                            // The wait is correct, the future should stay pending until a wake.
654                            assert!(futures::poll!(&mut wait).is_pending());
655
656                            let waken_up: fbinder::WakeResponse = lutex_controller
657                                .wake_bitset(fbinder::WakeBitsetRequest {
658                                    vmo: Some(
659                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
660                                            .expect("duplicate vmo"),
661                                    ),
662                                    offset: Some(0),
663                                    count: Some(1),
664                                    mask: None,
665                                    ..Default::default()
666                                })
667                                .await
668                                .expect("wake_answer")
669                                .expect("wake_response");
670                            assert_eq!(waken_up.count, Some(1));
671
672                            // The wait should now return.
673                            assert!(wait.await.expect("await_answer").is_ok());
674                        };
675
676                        let (server_res, _) = futures::join!(server_fut, client_fut);
677                        server_res.expect("server failed");
678                        let _ = sender.send(());
679                    }
680                },
681                "lutex_controller_test",
682            );
683            receiver.await.expect("test failed");
684        })
685        .await;
686    }
687}