Skip to main content

starnix_kernel_runner/
serve_protocols.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::Container;
6use anyhow::{Context as _, Error};
7use fidl::endpoints::{ControlHandle, RequestStream, ServerEnd};
8use fidl_fuchsia_component_runner as frunner;
9use fidl_fuchsia_element as felement;
10use fidl_fuchsia_io as fio;
11use fidl_fuchsia_memory_attribution as fattribution;
12use fidl_fuchsia_posix as fposix;
13use fidl_fuchsia_starnix_binder as fbinder;
14use fidl_fuchsia_starnix_container as fstarcontainer;
15use fuchsia_async::{
16    DurationExt, {self as fasync},
17};
18use futures::channel::oneshot;
19use futures::{
20    AsyncReadExt, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt, pin_mut,
21    select,
22};
23use starnix_core::execution::{create_init_child_process, execute_task_with_prerun_result};
24use starnix_core::fs::devpts::create_main_and_replica;
25use starnix_core::fs::fuchsia::create_fuchsia_pipe;
26use starnix_core::task::dynamic_thread_spawner::SpawnRequestBuilder;
27use starnix_core::task::{CurrentTask, ExitStatus, Kernel, LockedAndTask, ProcessEntryRef};
28use starnix_core::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
29use starnix_core::vfs::file_server::serve_file_at;
30use starnix_core::vfs::socket::VsockSocket;
31use starnix_core::vfs::{FdFlags, FileHandle};
32use starnix_logging::{log_error, log_warn};
33use starnix_modules_framebuffer::Framebuffer;
34use starnix_sync::{Locked, Unlocked};
35use starnix_task_command::TaskCommand;
36use starnix_uapi::auth::Credentials;
37use starnix_uapi::errors::Errno;
38use starnix_uapi::open_flags::OpenFlags;
39use starnix_uapi::signals::UncheckedSignal;
40use starnix_uapi::{errno, error, uapi};
41use std::ffi::CString;
42use std::ops::DerefMut;
43
44use super::start_component;
45
46pub fn expose_root(
47    locked: &mut Locked<Unlocked>,
48    system_task: &CurrentTask,
49    server_end: ServerEnd<fio::DirectoryMarker>,
50) -> Result<(), Error> {
51    let root_file = system_task.open_file(locked, "/".into(), OpenFlags::RDONLY)?;
52    serve_file_at(server_end.into_channel().into(), system_task, &root_file, Credentials::root())?;
53    Ok(())
54}
55
56pub async fn serve_component_runner(
57    request_stream: frunner::ComponentRunnerRequestStream,
58    system_task: &CurrentTask,
59) -> Result<(), Error> {
60    request_stream
61        .try_for_each_concurrent(None, |event| async {
62            match event {
63                frunner::ComponentRunnerRequest::Start { start_info, controller, .. } => {
64                    if let Err(e) = start_component(start_info, controller, system_task).await {
65                        log_error!("failed to start component: {:?}", e);
66                    }
67                }
68                frunner::ComponentRunnerRequest::_UnknownMethod { ordinal, .. } => {
69                    log_warn!("Unknown ComponentRunner request: {ordinal}");
70                }
71            }
72            Ok(())
73        })
74        .await
75        .map_err(Error::from)
76}
77
78fn to_winsize(window_size: Option<fstarcontainer::ConsoleWindowSize>) -> uapi::winsize {
79    window_size
80        .map(|window_size| uapi::winsize {
81            ws_row: window_size.rows,
82            ws_col: window_size.cols,
83            ws_xpixel: window_size.x_pixels,
84            ws_ypixel: window_size.y_pixels,
85        })
86        .unwrap_or(uapi::winsize::default())
87}
88
89async fn spawn_console(
90    kernel: &Kernel,
91    payload: fstarcontainer::ControllerSpawnConsoleRequest,
92) -> Result<Result<u8, fstarcontainer::SpawnConsoleError>, Error> {
93    if let (Some(console_in), Some(console_out), Some(binary_path)) =
94        (payload.console_in, payload.console_out, payload.binary_path)
95    {
96        let binary_path = CString::new(binary_path)?;
97        let argv = payload
98            .argv
99            .unwrap_or(vec![])
100            .into_iter()
101            .map(CString::new)
102            .collect::<Result<Vec<_>, _>>()?;
103        let environ = payload
104            .environ
105            .unwrap_or(vec![])
106            .into_iter()
107            .map(CString::new)
108            .collect::<Result<Vec<_>, _>>()?;
109        let window_size = to_winsize(payload.window_size);
110        let current_task = create_init_child_process(
111            kernel.kthreads.unlocked_for_async().deref_mut(),
112            &kernel.weak_self.upgrade().expect("Kernel must still be alive"),
113            TaskCommand::new(binary_path.as_bytes()),
114            Credentials::with_ids(0, 0),
115            None,
116        )?;
117        let (sender, receiver) = oneshot::channel();
118        let pty = execute_task_with_prerun_result(
119            kernel.kthreads.unlocked_for_async().deref_mut(),
120            current_task,
121            move |locked, current_task| {
122                let executable = current_task.open_file(
123                    locked,
124                    binary_path.as_bytes().into(),
125                    OpenFlags::RDONLY,
126                )?;
127                current_task.exec(locked, executable, binary_path, argv, environ)?;
128                let (pty, pts) = create_main_and_replica(locked, &current_task, window_size)?;
129                let fd_flags = FdFlags::empty();
130                assert_eq!(0, current_task.add_file(locked, pts.clone(), fd_flags)?.raw());
131                assert_eq!(1, current_task.add_file(locked, pts.clone(), fd_flags)?.raw());
132                assert_eq!(2, current_task.add_file(locked, pts, fd_flags)?.raw());
133                Ok(pty)
134            },
135            move |result| {
136                let _ = match result {
137                    Ok(ExitStatus::Exit(exit_code)) => sender.send(Ok(exit_code)),
138                    _ => sender.send(Err(fstarcontainer::SpawnConsoleError::Canceled)),
139                };
140            },
141            None,
142        )?;
143        let _ = forward_to_pty(kernel, console_in, console_out, pty).map_err(|e| {
144            log_error!("failed to forward to terminal {:?}", e);
145        });
146
147        Ok(receiver.await?)
148    } else {
149        Ok(Err(fstarcontainer::SpawnConsoleError::InvalidArgs))
150    }
151}
152
153pub async fn serve_container_controller(
154    request_stream: fstarcontainer::ControllerRequestStream,
155    system_task: &CurrentTask,
156) -> Result<(), Error> {
157    request_stream
158        .map_err(Error::from)
159        .try_for_each_concurrent(None, |event| async {
160            match event {
161                fstarcontainer::ControllerRequest::VsockConnect {
162                    payload:
163                        fstarcontainer::ControllerVsockConnectRequest { port, bridge_socket, .. },
164                    ..
165                } => {
166                    let Some(port) = port else {
167                        log_warn!("vsock connection missing port");
168                        return Ok(());
169                    };
170                    let Some(bridge_socket) = bridge_socket else {
171                        log_warn!("vsock connection missing bridge_socket");
172                        return Ok(());
173                    };
174                    connect_to_vsock(port, bridge_socket, system_task).await.unwrap_or_else(|e| {
175                        log_error!("failed to connect to vsock {:?}", e);
176                    });
177                }
178                fstarcontainer::ControllerRequest::SpawnConsole { payload, responder } => {
179                    responder.send(spawn_console(system_task.kernel(), payload).await?)?;
180                }
181                fstarcontainer::ControllerRequest::GetVmoReferences { payload, responder } => {
182                    if let Some(koid) = payload.koid {
183                        let thread_groups = system_task.kernel().pids.read().get_thread_groups();
184                        let mut results = vec![];
185                        for thread_group in thread_groups {
186                            if let Ok(leader) = system_task.get_task(thread_group.leader) {
187                                if let Ok(running_state) = leader.running_state() {
188                                    let files = &running_state.files;
189                                    let fds = files.get_all_fds();
190                                    for fd in fds {
191                                        if let Ok(file) = files.get(fd) {
192                                            if let Ok(memory) = file.get_memory(
193                                                system_task
194                                                    .kernel()
195                                                    .kthreads
196                                                    .unlocked_for_async()
197                                                    .deref_mut(),
198                                                system_task,
199                                                None,
200                                                starnix_core::mm::ProtectionFlags::READ,
201                                            ) {
202                                                let memory_koid = memory
203                                                    .info()
204                                                    .expect("Failed to get memory info")
205                                                    .koid;
206                                                if memory_koid.raw_koid() == koid {
207                                                    let process_name = thread_group
208                                                        .process
209                                                        .get_name()
210                                                        .unwrap_or_default();
211                                                    results.push(fstarcontainer::VmoReference {
212                                                        process_name: Some(
213                                                            process_name.to_string(),
214                                                        ),
215                                                        pid: Some(leader.get_pid() as u64),
216                                                        fd: Some(fd.raw()),
217                                                        koid: Some(koid),
218                                                        ..Default::default()
219                                                    });
220                                                }
221                                            }
222                                        }
223                                    }
224                                }
225                            }
226                        }
227                        let _ =
228                            responder.send(&fstarcontainer::ControllerGetVmoReferencesResponse {
229                                references: Some(results),
230                                ..Default::default()
231                            });
232                    }
233                }
234                fstarcontainer::ControllerRequest::GetJobHandle { responder } => {
235                    let _result = responder.send(fstarcontainer::ControllerGetJobHandleResponse {
236                        job: Some(
237                            fuchsia_runtime::job_default()
238                                .duplicate_handle(zx::Rights::SAME_RIGHTS)
239                                .expect("Failed to dup handle"),
240                        ),
241                        ..Default::default()
242                    });
243                }
244                fstarcontainer::ControllerRequest::SendSignal {
245                    payload:
246                        fstarcontainer::ControllerSendSignalRequest {
247                            pid: Some(pid),
248                            signal: Some(signal),
249                            ..
250                        },
251                    responder,
252                } => {
253                    let pids = system_task.kernel().pids.read();
254                    if let Some(ProcessEntryRef::Process(target_thread_group)) =
255                        pids.get_process(pid)
256                    {
257                        #[allow(
258                            clippy::undocumented_unsafe_blocks,
259                            reason = "Force documented unsafe blocks in Starnix"
260                        )]
261                        match unsafe {
262                            target_thread_group.send_signal_unchecked_debug(
263                                system_task,
264                                UncheckedSignal::new(signal),
265                            )
266                        } {
267                            Ok(_) => {
268                                let _result = responder.send(Ok(()));
269                            }
270                            Err(_) => {
271                                let _result =
272                                    responder.send(Err(fstarcontainer::SignalError::InvalidSignal));
273                            }
274                        };
275                    } else {
276                        let _result =
277                            responder.send(Err(fstarcontainer::SignalError::InvalidTarget));
278                    };
279                }
280                // The request did not contain both a signal and a target pid.
281                fstarcontainer::ControllerRequest::SendSignal { responder, .. } => {
282                    log_error!("malformed SendSignal request");
283                    let _result = responder.send(Err(fstarcontainer::SignalError::InvalidTarget));
284                }
285                fstarcontainer::ControllerRequest::SetSyscallLogFilter { payload, responder } => {
286                    if let Some(process_name) = payload.process_name {
287                        system_task.kernel().add_syscall_log_filter(&process_name);
288                        let _ = responder.send(Ok(()));
289                    } else {
290                        let _ = responder.send(Err(
291                            fstarcontainer::SetSyscallLogFilterError::MissingProcessName,
292                        ));
293                    }
294                }
295                fstarcontainer::ControllerRequest::ClearSyscallLogFilters { responder } => {
296                    system_task.kernel().clear_syscall_log_filters();
297                    let _ = responder.send();
298                }
299                fstarcontainer::ControllerRequest::_UnknownMethod { .. } => (),
300            }
301            Ok(())
302        })
303        .await
304}
305
306async fn connect_to_vsock(
307    port: u32,
308    bridge_socket: fidl::Socket,
309    system_task: &CurrentTask,
310) -> Result<(), Error> {
311    let socket = loop {
312        if let Ok(socket) = system_task.kernel().default_abstract_vsock_namespace.lookup(&port) {
313            break socket;
314        };
315        fasync::Timer::new(fasync::MonotonicDuration::from_millis(100).after_now()).await;
316    };
317
318    let pipe = create_fuchsia_pipe(
319        system_task.kernel().kthreads.unlocked_for_async().deref_mut(),
320        system_task,
321        bridge_socket,
322        OpenFlags::RDWR | OpenFlags::NONBLOCK,
323    )?;
324    socket.downcast_socket::<VsockSocket>().unwrap().remote_connection(
325        system_task.kernel().kthreads.unlocked_for_async().deref_mut(),
326        &socket,
327        system_task,
328        pipe,
329    )?;
330
331    Ok(())
332}
333
334fn forward_to_pty(
335    kernel: &Kernel,
336    console_in: fidl::Socket,
337    console_out: fidl::Socket,
338    pty: FileHandle,
339) -> Result<(), Error> {
340    // Matches fuchsia.io.Transfer capacity, somewhat arbitrarily.
341    const BUFFER_CAPACITY: usize = 8192;
342
343    let mut rx = fuchsia_async::Socket::from_socket(console_in);
344    let mut tx = fuchsia_async::Socket::from_socket(console_out);
345    let pty_sink = pty.clone();
346    let closure = async move |locked_and_task: LockedAndTask<'_>| {
347        let _result: Result<(), Error> = (async || {
348            let mut buffer = vec![0u8; BUFFER_CAPACITY];
349            loop {
350                let bytes = rx.read(&mut buffer[..]).await?;
351                if bytes == 0 {
352                    return Ok(());
353                }
354                pty_sink.write(
355                    &mut locked_and_task.unlocked(),
356                    locked_and_task.current_task(),
357                    &mut VecInputBuffer::new(&buffer[..bytes]),
358                )?;
359            }
360        })()
361        .await;
362    };
363    let req = SpawnRequestBuilder::new()
364        .with_debug_name("forward-to-pty-in")
365        .with_async_closure(closure)
366        .build();
367    kernel.kthreads.spawner().spawn_from_request(req);
368
369    let pty_source = pty;
370    let closure = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
371        let _result: Result<(), Error> =
372            fasync::LocalExecutor::default().run_singlethreaded(async {
373                let mut buffer = VecOutputBuffer::new(BUFFER_CAPACITY);
374                loop {
375                    buffer.reset();
376                    let bytes = pty_source.read(locked, current_task, &mut buffer)?;
377                    if bytes == 0 {
378                        return Ok(());
379                    }
380                    tx.write_all(buffer.data()).await?;
381                }
382            });
383    };
384    let req = SpawnRequestBuilder::new()
385        .with_debug_name("forward-to-pty-out")
386        .with_sync_closure(closure)
387        .build();
388    kernel.kthreads.spawner().spawn_from_request(req);
389
390    Ok(())
391}
392
393pub async fn serve_graphical_presenter(
394    mut request_stream: felement::GraphicalPresenterRequestStream,
395    kernel: &Kernel,
396) -> Result<(), Error> {
397    while let Some(request) = request_stream.next().await {
398        match request.context("reading graphical presenter request")? {
399            felement::GraphicalPresenterRequest::PresentView {
400                view_spec,
401                annotation_controller: _,
402                view_controller_request: _,
403                responder,
404            } => match view_spec.viewport_creation_token {
405                Some(token) => {
406                    let fb = Framebuffer::get(kernel).context("getting framebuffer from kernel")?;
407                    fb.present_view(token);
408                    let _ = responder.send(Ok(()));
409                }
410                None => {
411                    let _ = responder.send(Err(felement::PresentViewError::InvalidArgs));
412                }
413            },
414        }
415    }
416    Ok(())
417}
418
419/// Serves the memory attribution provider for the Kernel ELF component.
420pub fn serve_memory_attribution_provider_elfkernel(
421    mut request_stream: fattribution::ProviderRequestStream,
422    container: &Container,
423) -> impl Future<Output = Result<(), Error>> {
424    let observer = container.new_memory_attribution_observer(request_stream.control_handle());
425    async move {
426        while let Some(event) = request_stream.try_next().await? {
427            match event {
428                fattribution::ProviderRequest::Get { responder } => {
429                    observer.next(responder);
430                }
431                fattribution::ProviderRequest::_UnknownMethod {
432                    ordinal, control_handle, ..
433                } => {
434                    log_error!("Invalid request to AttributionProvider: {ordinal}");
435                    control_handle.shutdown_with_epitaph(zx::Status::INVALID_ARGS);
436                }
437            }
438        }
439        Ok(())
440    }
441}
442
443/// Serves the memory attribution provider for the Container component.
444pub fn serve_memory_attribution_provider_container(
445    mut request_stream: fattribution::ProviderRequestStream,
446    kernel: &Kernel,
447) -> impl Future<Output = ()> + use<> {
448    let observer = kernel.new_memory_attribution_observer(request_stream.control_handle());
449    async move {
450        while let Some(event) = request_stream
451            .try_next()
452            .await
453            .inspect_err(|err| {
454                log_warn!("Error while serving container memory attribution: {:?}", err)
455            })
456            .ok()
457            .flatten()
458        {
459            match event {
460                fattribution::ProviderRequest::Get { responder } => {
461                    observer.next(responder);
462                }
463                fattribution::ProviderRequest::_UnknownMethod {
464                    ordinal, control_handle, ..
465                } => {
466                    log_error!("Invalid request to AttributionProvider: {ordinal}");
467                    control_handle.shutdown_with_epitaph(zx::Status::INVALID_ARGS);
468                }
469            }
470        }
471    }
472}
473
474async fn select_first<O>(f1: impl Future<Output = O>, f2: impl Future<Output = O>) -> O {
475    let f1 = f1.fuse();
476    let f2 = f2.fuse();
477    pin_mut!(f1, f2);
478    select! {
479        f1 = f1 => f1,
480        f2 = f2 => f2,
481    }
482}
483
484/// Serve the LutexController protocol.
485pub async fn serve_lutex_controller(
486    request_stream: fbinder::LutexControllerRequestStream,
487    current_task: &CurrentTask,
488) -> Result<(), Error> {
489    let kernel = current_task.kernel();
490    request_stream
491        .map_err(Error::from)
492        .try_for_each_concurrent(None, |event| async move {
493            match event {
494                fbinder::LutexControllerRequest::WaitBitset { payload, responder } => {
495                    let deadline_and_receiver = (|| {
496                        let mut unlocked = kernel.kthreads.unlocked_for_async();
497                        let vmo = payload.vmo.ok_or_else(|| errno!(EINVAL))?;
498                        let offset = payload.offset.ok_or_else(|| errno!(EINVAL))?;
499                        let value = payload.value.ok_or_else(|| errno!(EINVAL))?;
500                        let mask = payload.mask.unwrap_or(u32::MAX);
501                        let deadline = payload.deadline.map(zx::MonotonicInstant::from_nanos);
502                        kernel
503                            .shared_futexes
504                            .external_wait(&mut unlocked, vmo.into(), offset, value, mask)
505                            .map(|(event, receiver)| (deadline, event, receiver))
506                    })();
507                    let result = match deadline_and_receiver {
508                        Ok((deadline, event, receiver)) => {
509                            // We construct a specific `wait_fut` to explicitly bind the lifecycle
510                            // of the `event` to the duration of this wait operation. If the FIDL
511                            // client disconnects (or the future is otherwise dropped/cancelled),
512                            // the `event` is immediately dropped. This zeros the strong reference
513                            // count on the `InterruptibleEvent`, signaling to the `FutexTable`
514                            // that this external waiter is now stale and should be garbage
515                            // collected on its next cleanup pass.
516                            let wait_fut = async move {
517                                let _event = event;
518                                let receiver = receiver.map_err(|_| errno!(EINTR));
519                                if let Some(deadline) = deadline {
520                                    let timer =
521                                        fasync::Timer::new(deadline).map(|_| error!(ETIMEDOUT));
522                                    select_first(timer, receiver).await
523                                } else {
524                                    receiver.await
525                                }
526                            };
527                            wait_fut.await
528                        }
529                        Err(e) => Err(e),
530                    };
531                    let result = result.map_err(|e: Errno| {
532                        fposix::Errno::from_primitive(e.code.error_code() as i32)
533                            .unwrap_or(fposix::Errno::Einval)
534                    });
535                    responder
536                        .send(result)
537                        .context("Unable to send LutexControllerRequest::WaitBitset response")?;
538                }
539                fbinder::LutexControllerRequest::WakeBitset { payload, responder } => {
540                    let result = (|| {
541                        let mut unlocked = kernel.kthreads.unlocked_for_async();
542                        let vmo = payload.vmo.ok_or_else(|| errno!(EINVAL))?;
543                        let offset = payload.offset.ok_or_else(|| errno!(EINVAL))?;
544                        let count = payload.count.ok_or_else(|| errno!(EINVAL))?;
545                        let mask = payload.mask.unwrap_or(u32::MAX);
546                        kernel.shared_futexes.external_wake(
547                            &mut unlocked,
548                            vmo.into(),
549                            offset,
550                            count as usize,
551                            mask,
552                        )
553                    })();
554                    let result = result
555                        .map(|count| fbinder::WakeResponse {
556                            count: Some(count as u64),
557                            ..fbinder::WakeResponse::default()
558                        })
559                        .map_err(|e: Errno| {
560                            fposix::Errno::from_primitive(e.code.error_code() as i32)
561                                .unwrap_or(fposix::Errno::Einval)
562                        });
563                    responder
564                        .send(result)
565                        .context("Unable to send LutexControllerRequest::WakeBitset response")?;
566                }
567                fbinder::LutexControllerRequest::CmpRequeue { payload, responder } => {
568                    let result = (|| {
569                        let mut unlocked = kernel.kthreads.unlocked_for_async();
570                        let first_vmo = payload.first_vmo.ok_or_else(|| errno!(EINVAL))?;
571                        let first_offset = payload.first_offset.ok_or_else(|| errno!(EINVAL))?;
572                        let second_vmo = payload.second_vmo;
573                        let second_offset = payload.second_offset.ok_or_else(|| errno!(EINVAL))?;
574                        let wake_count = payload.wake_count.ok_or_else(|| errno!(EINVAL))?;
575                        let requeue_count = payload.requeue_count.ok_or_else(|| errno!(EINVAL))?;
576                        let cmp_val = payload.cmp_val;
577                        kernel.shared_futexes.external_requeue(
578                            &mut unlocked,
579                            first_vmo.into(),
580                            first_offset,
581                            second_vmo.map(Into::into),
582                            second_offset,
583                            wake_count as usize,
584                            requeue_count as usize,
585                            cmp_val,
586                        )
587                    })();
588                    let result = result
589                        .map(|count| fbinder::CmpRequeueResponse {
590                            count: Some(count as u64),
591                            ..fbinder::CmpRequeueResponse::default()
592                        })
593                        .map_err(|e: Errno| {
594                            fposix::Errno::from_primitive(e.code.error_code() as i32)
595                                .unwrap_or(fposix::Errno::Einval)
596                        });
597                    responder
598                        .send(result)
599                        .context("Unable to send LutexControllerRequest::Requeue response")?;
600                }
601                fbinder::LutexControllerRequest::_UnknownMethod { ordinal, .. } => {
602                    log_warn!("Unknown LutexController ordinal: {}", ordinal);
603                }
604            }
605            Ok(())
606        })
607        .await
608        .context("failed fbinder::LutexController request")
609}
610
611#[cfg(test)]
612mod tests {
613    use super::*;
614    use assert_matches::assert_matches;
615    use fidl_fuchsia_posix as fposix;
616    use starnix_core::testing::*;
617    use zx;
618
619    #[fuchsia::test]
620    async fn lutex_controller_test() {
621        spawn_kernel_and_run(async |mut _locked, current_task| {
622            let (sender, receiver) = oneshot::channel::<()>();
623            current_task.kernel.kthreads.spawn_future(
624                {
625                    let kernel = current_task.kernel.clone();
626                    move || async move {
627                        let (lutex_controller, stream) = fidl::endpoints::create_proxy_and_stream::<
628                            fbinder::LutexControllerMarker,
629                        >();
630
631                        // Spawn the server
632                        let server_fut =
633                            serve_lutex_controller(stream, kernel.kthreads.system_task());
634
635                        let client_fut = async move {
636                            const VMO_SIZE: usize = 4 * 1024;
637                            let vmo = zx::Vmo::create(VMO_SIZE as u64).expect("Vmo::create");
638                            // Wait on an incorrect value.
639                            let wait = lutex_controller
640                                .wait_bitset(fbinder::WaitBitsetRequest {
641                                    vmo: Some(
642                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
643                                            .expect("duplicate vmo"),
644                                    ),
645                                    offset: Some(0),
646                                    value: Some(1),
647                                    ..Default::default()
648                                })
649                                .await
650                                .expect("got_answer");
651                            assert_matches!(wait, Err(fposix::Errno::Eagain));
652
653                            // Wait with a timeout
654                            let wait = lutex_controller
655                                .wait_bitset(fbinder::WaitBitsetRequest {
656                                    vmo: Some(
657                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
658                                            .expect("duplicate vmo"),
659                                    ),
660                                    offset: Some(0),
661                                    value: Some(0),
662                                    deadline: Some(0),
663                                    ..Default::default()
664                                })
665                                .await
666                                .expect("got_answer");
667                            assert_matches!(wait, Err(fposix::Errno::Etimedout));
668
669                            let mut wait = Box::pin(
670                                lutex_controller.wait_bitset(fbinder::WaitBitsetRequest {
671                                    vmo: Some(
672                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
673                                            .expect("duplicate vmo"),
674                                    ),
675                                    offset: Some(0),
676                                    value: Some(0),
677                                    deadline: None,
678                                    ..Default::default()
679                                }),
680                            );
681                            // The wait is correct, the future should stay pending until a wake.
682                            assert!(futures::poll!(&mut wait).is_pending());
683
684                            let waken_up: fbinder::WakeResponse = lutex_controller
685                                .wake_bitset(fbinder::WakeBitsetRequest {
686                                    vmo: Some(
687                                        vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
688                                            .expect("duplicate vmo"),
689                                    ),
690                                    offset: Some(0),
691                                    count: Some(1),
692                                    mask: None,
693                                    ..Default::default()
694                                })
695                                .await
696                                .expect("wake_answer")
697                                .expect("wake_response");
698                            assert_eq!(waken_up.count, Some(1));
699
700                            // The wait should now return.
701                            assert!(wait.await.expect("await_answer").is_ok());
702
703                            // test cmp_requeue
704                            {
705                                let vmo2 = zx::Vmo::create(VMO_SIZE as u64).expect("Vmo::create");
706                                let mut wait1 = Box::pin(
707                                    lutex_controller.wait_bitset(fbinder::WaitBitsetRequest {
708                                        vmo: Some(
709                                            vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
710                                                .expect("duplicate vmo"),
711                                        ),
712                                        offset: Some(4),
713                                        value: Some(0),
714                                        deadline: None,
715                                        ..Default::default()
716                                    }),
717                                );
718                                let mut wait2 = Box::pin(
719                                    lutex_controller.wait_bitset(fbinder::WaitBitsetRequest {
720                                        vmo: Some(
721                                            vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
722                                                .expect("duplicate vmo"),
723                                        ),
724                                        offset: Some(4),
725                                        value: Some(0),
726                                        deadline: None,
727                                        ..Default::default()
728                                    }),
729                                );
730                                let cmp_requeue_response = lutex_controller
731                                    .cmp_requeue(fbinder::CmpRequeueRequest {
732                                        first_vmo: Some(
733                                            vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
734                                                .expect("duplicate vmo"),
735                                        ),
736                                        first_offset: Some(4),
737                                        second_vmo: Some(
738                                            vmo2.duplicate_handle(zx::Rights::SAME_RIGHTS)
739                                                .expect("duplicate vmo"),
740                                        ),
741                                        second_offset: Some(8),
742                                        wake_count: Some(1),
743                                        requeue_count: Some(1),
744                                        cmp_val: Some(0xBAD),
745                                        ..Default::default()
746                                    })
747                                    .await
748                                    .expect("got_answer")
749                                    .unwrap_err();
750                                // 0xBAD is not 0x0, so EAGAIN expected
751                                assert_matches!(cmp_requeue_response, fposix::Errno::Eagain);
752                                let cmp_requeue_response2: fbinder::CmpRequeueResponse =
753                                    lutex_controller
754                                        .cmp_requeue(fbinder::CmpRequeueRequest {
755                                            first_vmo: Some(
756                                                vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)
757                                                    .expect("duplicate vmo"),
758                                            ),
759                                            first_offset: Some(4),
760                                            second_vmo: Some(
761                                                vmo2.duplicate_handle(zx::Rights::SAME_RIGHTS)
762                                                    .expect("duplicate vmo"),
763                                            ),
764                                            second_offset: Some(8),
765                                            wake_count: Some(1),
766                                            requeue_count: Some(1),
767                                            cmp_val: Some(0x0),
768                                            ..Default::default()
769                                        })
770                                        .await
771                                        .expect("got_answer")
772                                        .expect("got_response");
773                                // 1 woken, 1 re-queued
774                                assert_matches!(cmp_requeue_response2.count, Some(2));
775                                // give wait1 and wait2 a chance to complete; we don't rely on this
776                                // being long enough however
777                                fasync::Timer::new(
778                                    fasync::MonotonicDuration::from_millis(100).after_now(),
779                                )
780                                .await;
781                                let mut still_pending_count: u32 = 0;
782                                // Up to one of wait1 and wait2 can be completed by this point, so at
783                                // least one must still be pending.
784                                let wait1_poll = futures::poll!(&mut wait1);
785                                if wait1_poll.is_pending() {
786                                    still_pending_count += 1;
787                                }
788                                let wait2_poll = futures::poll!(&mut wait2);
789                                if wait2_poll.is_pending() {
790                                    still_pending_count += 1;
791                                }
792                                assert!(still_pending_count >= 1);
793                                // at this point exactly one of wait1 or wait2 will remain pending until
794                                // we wake it via vmo2 - we don't know which of wait1 or wait2 we're
795                                // waking here (which is fine)
796                                let vmo2_wake_response: fbinder::WakeResponse = lutex_controller
797                                    .wake_bitset(fbinder::WakeBitsetRequest {
798                                        vmo: Some(
799                                            vmo2.duplicate_handle(zx::Rights::SAME_RIGHTS)
800                                                .expect("duplicate vmo"),
801                                        ),
802                                        offset: Some(8),
803                                        count: Some(1),
804                                        mask: None,
805                                        ..Default::default()
806                                    })
807                                    .await
808                                    .expect("wake_answer")
809                                    .expect("wake_response");
810                                assert_eq!(vmo2_wake_response.count, Some(1));
811                                // Now we know that both wait1 and wait2 are unblocked, so we can
812                                // wait on both of them.
813                                if wait1_poll.is_pending() {
814                                    assert!(wait1.await.expect("await_answer").is_ok());
815                                }
816                                if wait2_poll.is_pending() {
817                                    assert!(wait2.await.expect("await_answer").is_ok());
818                                }
819                            }
820                        };
821
822                        let (server_res, _) = futures::join!(server_fut, client_fut);
823                        server_res.expect("server failed");
824                        let _ = sender.send(());
825                    }
826                },
827                "lutex_controller_test",
828            );
829            receiver.await.expect("test failed");
830        })
831        .await;
832    }
833}