starnix_core/task/
waiter.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::signals::RunState;
6use crate::task::CurrentTask;
7use crate::vfs::{EpollEventHandler, FdNumber};
8use slab::Slab;
9use starnix_lifecycle::{AtomicU64Counter, AtomicUsizeCounter};
10use starnix_sync::{
11    EventWaitGuard, FileOpsCore, InterruptibleEvent, LockEqualOrBefore, Locked, Mutex, NotifyKind,
12    PortEvent, PortWaitResult,
13};
14use starnix_types::ownership::debug_assert_no_local_temp_ref;
15use starnix_uapi::error;
16use starnix_uapi::errors::{EINTR, Errno};
17use starnix_uapi::signals::{SIGKILL, SigSet, Signal};
18use starnix_uapi::vfs::FdEvents;
19use std::collections::{HashMap, VecDeque};
20use std::sync::{Arc, Weak};
21use syncio::zxio::zxio_signals_t;
22use syncio::{ZxioSignals, ZxioWeak};
23
/// Identifies the source of a [`ReadyItem`] placed on a ready list.
///
/// A key is either a file descriptor number or a plain `usize`; both convert into a key via
/// `From`.
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum ReadyItemKey {
    /// Key built from a file descriptor number.
    FdNumber(FdNumber),
    /// Key built from a plain `usize`.
    Usize(usize),
}
29
30impl From<FdNumber> for ReadyItemKey {
31    fn from(v: FdNumber) -> Self {
32        Self::FdNumber(v)
33    }
34}
35
36impl From<usize> for ReadyItemKey {
37    fn from(v: usize) -> Self {
38        Self::Usize(v)
39    }
40}
41
/// An entry on a ready list, as pushed by [`EventHandler::Enqueue`].
#[derive(Debug, Copy, Clone)]
pub struct ReadyItem {
    /// Key supplied by whoever registered the handler.
    pub key: ReadyItemKey,
    /// The delivered events, already masked by the handler's `sought_events`.
    pub events: FdEvents,
}
47
/// Action to run when events are delivered to a waiter.
///
/// A handler is consumed by [`EventHandler::handle`].
#[derive(Clone)]
pub enum EventHandler {
    /// Does nothing.
    ///
    /// It is up to the waiter to synchronize itself with the notifier if
    /// synchronization is needed.
    None,

    /// Enqueues an event to a ready list.
    ///
    /// This event handler naturally synchronizes the notifier and notifee
    /// because of the lock acquired/released when enqueuing the event.
    ///
    /// The enqueued item carries the delivered events masked by `sought_events`.
    Enqueue { key: ReadyItemKey, queue: Arc<Mutex<VecDeque<ReadyItem>>>, sought_events: FdEvents },

    /// Wraps another EventHandler and only triggers it once. Further .handle() calls are ignored.
    ///
    /// This is intended for cases like BinderFileObject which need to register
    /// the same EventHandler on multiple wait queues.
    ///
    /// Clones share the same slot, so whichever clone is handled first consumes the wrapped
    /// handler for all of them.
    HandleOnce(Arc<Mutex<Option<EventHandler>>>),

    /// This handler is an epoll.
    Epoll(EpollEventHandler),
}
71
72impl EventHandler {
73    pub fn handle(self, events: FdEvents) {
74        match self {
75            Self::None => {}
76            Self::Enqueue { key, queue, sought_events } => {
77                let events = events & sought_events;
78                queue.lock().push_back(ReadyItem { key, events });
79            }
80            Self::HandleOnce(inner) => {
81                if let Some(inner) = inner.lock().take() {
82                    inner.handle(events);
83                }
84            }
85            Self::Epoll(e) => e.handle(events),
86        }
87    }
88}
89
/// Signal handler for waits established on a zxio object.
pub struct ZxioSignalHandler {
    /// Weak reference to the zxio object; if it can no longer be upgraded when the signal
    /// arrives, no events are reported.
    pub zxio: ZxioWeak,
    /// Converts the zxio signals returned by `wait_end` into `FdEvents`.
    pub get_events_from_zxio_signals: fn(zxio_signals_t) -> FdEvents,
}
94
/// Signal handler shared by waits on several Zircon handles.
///
/// The counter is incremented as each handle is signaled; when the counter reaches the handle
/// count, the event handler is called with the given events.
pub struct ManyZxHandleSignalHandler {
    /// Number of handles that must be signaled before `events` are reported.
    pub count: usize,
    /// Shared counter of handles signaled so far.
    pub counter: Arc<AtomicUsizeCounter>,
    /// Signals that must all be present in an observed packet for it to count.
    pub expected_signals: zx::Signals,
    /// Events passed to the event handler once `count` is reached.
    pub events: FdEvents,
}
103
/// Strategy used by [`SignalHandler`] to turn observed Zircon signals into `FdEvents`.
pub enum SignalHandlerInner {
    /// Never reports any events.
    None,
    /// Ends the zxio wait and converts the resulting zxio signals.
    Zxio(ZxioSignalHandler),
    /// Converts the observed Zircon signals directly.
    ZxHandle(fn(zx::Signals) -> FdEvents),
    /// Reports events only once enough handles have been signaled.
    ManyZxHandle(ManyZxHandleSignalHandler),
}
110
/// Callback run when a Zircon signal packet is received for a registered wait.
pub struct SignalHandler {
    /// How observed signals are converted into `FdEvents`.
    pub inner: SignalHandlerInner,
    /// Handler invoked with the converted events, if any were produced.
    pub event_handler: EventHandler,
    /// Error code returned from the wait after the handler has run, if set.
    pub err_code: Option<Errno>,
}
116
117impl SignalHandler {
118    fn handle(self, signals: zx::Signals) -> Option<Errno> {
119        let SignalHandler { inner, event_handler, err_code } = self;
120        let events = match inner {
121            SignalHandlerInner::None => None,
122            SignalHandlerInner::Zxio(ZxioSignalHandler { zxio, get_events_from_zxio_signals }) => {
123                if let Some(zxio) = zxio.upgrade() {
124                    Some(get_events_from_zxio_signals(zxio.wait_end(signals)))
125                } else {
126                    None
127                }
128            }
129            SignalHandlerInner::ZxHandle(get_events_from_zx_signals) => {
130                Some(get_events_from_zx_signals(signals))
131            }
132            SignalHandlerInner::ManyZxHandle(signal_handler) => {
133                if signals.contains(signal_handler.expected_signals) {
134                    let new_count = signal_handler.counter.next() + 1;
135                    assert!(new_count <= signal_handler.count);
136                    if new_count == signal_handler.count {
137                        Some(signal_handler.events)
138                    } else {
139                        None
140                    }
141                } else {
142                    None
143                }
144            }
145        };
146        if let Some(events) = events {
147            event_handler.handle(events)
148        }
149        err_code
150    }
151}
152
/// Callback stored by a waiter for a registered wait key.
///
/// The two kinds are not interchangeable: the waiter panics if a Zircon signal packet finds an
/// `EventHandler`, or if queued events find a `SignalHandler`.
pub enum WaitCallback {
    /// Runs when a Zircon signal packet arrives on the waiter's port.
    SignalHandler(SignalHandler),
    /// Runs when events are queued to the waiter.
    EventHandler(EventHandler),
}
157
/// Data needed to cancel a wait registered on a wait queue.
struct WaitCancelerQueue {
    /// The queue the wait entry lives in.
    wait_queue: Weak<Mutex<WaitQueueImpl>>,
    /// The waiter whose callback is removed on cancel.
    waiter: WaiterRef,
    /// Key of the callback to remove from the waiter.
    wait_key: WaitKey,
    /// Identifies the entry to remove from the queue's `waiters`.
    waiter_id: WaitEntryId,
}
164
/// Data needed to cancel a port wait established on a zxio object.
struct WaitCancelerZxio {
    /// The zxio object; cancellation is skipped if it can no longer be upgraded.
    zxio: ZxioWeak,
    /// Canceler for the underlying port wait.
    inner: PortWaitCanceler,
}
169
/// Data needed to cancel a plain port wait.
struct WaitCancelerPort {
    /// Canceler for the underlying port wait.
    inner: PortWaitCanceler,
}
173
/// One cancellable wait held by a [`WaitCanceler`].
enum WaitCancelerInner {
    /// Port wait on a zxio object.
    Zxio(WaitCancelerZxio),
    /// Wait registered on a wait queue.
    Queue(WaitCancelerQueue),
    /// Port wait on a Zircon handle.
    Port(WaitCancelerPort),
}
179
/// Inline capacity of the canceller list in [`WaitCanceler`], and the maximum combined number of
/// cancellers that `WaitCanceler::merge` accepts.
const WAIT_CANCELER_COMMON_SIZE: usize = 2;
181
/// Return values for wait_async methods.
///
/// Calling `cancel` will cancel any running wait.
///
/// Does not implement `Clone` or `Copy` so that only a single canceler exists
/// per wait.
pub struct WaitCanceler {
    /// The individual waits to cancel; empty for a no-op canceler.
    cancellers: smallvec::SmallVec<[WaitCancelerInner; WAIT_CANCELER_COMMON_SIZE]>,
}
191
192impl WaitCanceler {
193    fn new_inner(inner: WaitCancelerInner) -> Self {
194        Self { cancellers: smallvec::smallvec![inner] }
195    }
196
197    pub fn new_noop() -> Self {
198        Self { cancellers: Default::default() }
199    }
200
201    pub fn new_zxio(zxio: ZxioWeak, inner: PortWaitCanceler) -> Self {
202        Self::new_inner(WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }))
203    }
204
205    pub fn new_port(inner: PortWaitCanceler) -> Self {
206        Self::new_inner(WaitCancelerInner::Port(WaitCancelerPort { inner }))
207    }
208
209    /// Equivalent to `merge_unbounded`, except that it enforces that the resulting vector of
210    /// cancellers is small enough to avoid being separately allocated on the heap.
211    ///
212    /// If possible, use this function instead of `merge_unbounded`, because it gives us better
213    /// tools to keep this code path optimized.
214    pub fn merge(self, other: Self) -> Self {
215        // Increase `WAIT_CANCELER_COMMON_SIZE` if needed, or remove this assert and allow the
216        // smallvec to allocate.
217        assert!(
218            self.cancellers.len() + other.cancellers.len() <= WAIT_CANCELER_COMMON_SIZE,
219            "WaitCanceler::merge disallows more than {} cancellers, found {} + {}",
220            WAIT_CANCELER_COMMON_SIZE,
221            self.cancellers.len(),
222            other.cancellers.len()
223        );
224        WaitCanceler::merge_unbounded(self, other)
225    }
226
227    /// Creates a new `WaitCanceler` that is equivalent to canceling both its arguments.
228    pub fn merge_unbounded(
229        Self { mut cancellers }: Self,
230        Self { cancellers: mut other }: Self,
231    ) -> Self {
232        cancellers.append(&mut other);
233        WaitCanceler { cancellers }
234    }
235
236    /// Cancel the pending wait.
237    ///
238    /// Takes `self` by value since a wait can only be canceled once.
239    pub fn cancel(self) {
240        let Self { cancellers } = self;
241        for canceller in cancellers.into_iter().rev() {
242            match canceller {
243                WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }) => {
244                    let Some(zxio) = zxio.upgrade() else { return };
245                    let (_, signals) = zxio.wait_begin(ZxioSignals::NONE.bits());
246                    inner.cancel();
247                    zxio.wait_end(signals);
248                }
249                WaitCancelerInner::Queue(WaitCancelerQueue {
250                    wait_queue,
251                    waiter,
252                    wait_key,
253                    waiter_id: WaitEntryId { key, id },
254                }) => {
255                    let Some(wait_queue) = wait_queue.upgrade() else { return };
256                    waiter.remove_callback(&wait_key);
257                    let mut wait_queue = wait_queue.lock();
258                    let waiters = &mut wait_queue.waiters;
259                    if let Some(entry) = waiters.get_mut(key) {
260                        // The map of waiters in a wait queue uses a `Slab` which
261                        // recycles keys. To make sure we are removing the right
262                        // entry, make sure the ID value matches what we expect
263                        // to remove.
264                        if entry.id == id {
265                            waiters.remove(key);
266                        }
267                    }
268                }
269                WaitCancelerInner::Port(WaitCancelerPort { inner }) => {
270                    inner.cancel();
271                }
272            }
273        }
274    }
275}
276
/// Return values for wait_async methods that monitor the state of a handle.
///
/// Calling `cancel` will cancel any running wait.
///
/// Does not implement `Clone` or `Copy` so that only a single canceler exists
/// per wait.
pub struct PortWaitCanceler {
    /// The waiter the wait was registered on; cancel is a no-op once it is gone.
    waiter: Weak<PortWaiter>,
    /// Key of the port wait and of the callback registered with the waiter.
    key: WaitKey,
}
287
288impl PortWaitCanceler {
289    /// Cancel the pending wait.
290    ///
291    /// Takes `self` by value since a wait can only be canceled once.
292    pub fn cancel(self) {
293        let Self { waiter, key } = self;
294        if let Some(waiter) = waiter.upgrade() {
295            let _ = waiter.port.cancel(key.raw);
296            waiter.remove_callback(&key);
297        }
298    }
299}
300
/// Key identifying a wait registered with a waiter.
///
/// The raw value is also used as the key for waits on the waiter's port.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
struct WaitKey {
    /// The underlying key value.
    raw: u64,
}
305
/// The different types of events that can be waited on / triggered.
#[derive(Clone, Copy, Debug)]
enum WaitEvents {
    /// All events: a wait on `All` will be woken up by any event, and a trigger on `All` will
    /// wake every waiter.
    All,
    /// Wait on the set of FdEvents.
    Fd(FdEvents),
    /// Wait for the specified value.
    Value(u64),
    /// Wait for a signal in a specific mask to be received by the task.
    SignalMask(SigSet),
}
319
320impl WaitEvents {
321    /// Returns whether a wait on `self` should be woken up by `other`.
322    fn intercept(self: &WaitEvents, other: &WaitEvents) -> bool {
323        match (self, other) {
324            (Self::All, _) | (_, Self::All) => true,
325            (Self::Fd(m1), Self::Fd(m2)) => m1.bits() & m2.bits() != 0,
326            (Self::Value(v1), Self::Value(v2)) => v1 == v2,
327            // A SignalMask event can only be intercepted by another SignalMask event.
328            (Self::SignalMask(m1), Self::SignalMask(m2)) => m1.intersects(m2),
329            _ => false,
330        }
331    }
332}
333
impl WaitCallback {
    /// Returns the no-op event handler, [`EventHandler::None`].
    ///
    /// Note that this returns an `EventHandler`, not a `WaitCallback`.
    pub fn none() -> EventHandler {
        EventHandler::None
    }
}
339
/// Implementation of Waiter. We put the Waiter data in an Arc so that WaitQueue can tell when the
/// Waiter has been destroyed by keeping a Weak reference. But this is an implementation detail and
/// a Waiter should have a single owner. So the Arc is hidden inside Waiter.
struct PortWaiter {
    /// Port used to block the thread and to receive notifications and Zircon signal packets.
    port: PortEvent,
    /// Callbacks for the currently registered waits, indexed by wait key.
    callbacks: Mutex<HashMap<WaitKey, WaitCallback>>, // the key 0 is reserved for 'no handler'
    /// Source of fresh wait keys; starts at 1 because key 0 is reserved.
    next_key: AtomicU64Counter,
    /// When true, `interrupt` does nothing.
    ignore_signals: bool,

    /// Collection of wait queues this Waiter is waiting on, so that when the Waiter is Dropped it
    /// can remove itself from the queues.
    ///
    /// This lock is nested inside the WaitQueue.waiters lock.
    wait_queues: Mutex<HashMap<WaitKey, Weak<Mutex<WaitQueueImpl>>>>,
}
355
356impl PortWaiter {
357    /// Internal constructor.
358    fn new(ignore_signals: bool) -> Arc<Self> {
359        Arc::new(PortWaiter {
360            port: PortEvent::new(),
361            callbacks: Default::default(),
362            next_key: AtomicU64Counter::new(1),
363            ignore_signals,
364            wait_queues: Default::default(),
365        })
366    }
367
368    /// Waits until the given deadline has passed or the waiter is woken up. See wait_until().
369    fn wait_internal(&self, deadline: zx::MonotonicInstant) -> Result<(), Errno> {
370        // This method can block arbitrarily long, possibly waiting for another process. The
371        // current thread should not own any local ref that might delay the release of a resource
372        // while doing so.
373        debug_assert_no_local_temp_ref();
374
375        match self.port.wait(deadline) {
376            PortWaitResult::Notification { kind: NotifyKind::Regular } => Ok(()),
377            PortWaitResult::Notification { kind: NotifyKind::Interrupt } => error!(EINTR),
378            PortWaitResult::Signal { key, observed } => {
379                if let Some(callback) = self.remove_callback(&WaitKey { raw: key }) {
380                    match callback {
381                        WaitCallback::SignalHandler(handler) => {
382                            if let Some(errno) = handler.handle(observed) {
383                                return Err(errno);
384                            }
385                        }
386                        WaitCallback::EventHandler(_) => {
387                            panic!("wrong type of handler called")
388                        }
389                    }
390                }
391
392                Ok(())
393            }
394            PortWaitResult::TimedOut => error!(ETIMEDOUT),
395        }
396    }
397
398    fn wait_until<L>(
399        self: &Arc<Self>,
400        locked: &mut Locked<L>,
401        current_task: &CurrentTask,
402        run_state: RunState,
403        deadline: zx::MonotonicInstant,
404    ) -> Result<(), Errno>
405    where
406        L: LockEqualOrBefore<FileOpsCore>,
407    {
408        let is_waiting = deadline.into_nanos() > 0;
409
410        let callback = || {
411            // We are susceptible to spurious wakeups because interrupt() posts a message to the port
412            // queue. In addition to more subtle races, there could already be valid messages in the
413            // port queue that will immediately wake us up, leaving the interrupt message in the queue
414            // for subsequent waits (which by then may not have any signals pending) to read.
415            //
416            // It's impossible to non-racily guarantee that a signal is pending so there might always
417            // be an EINTR result here with no signal. But any signal we get when !is_waiting we know is
418            // leftover from before: the top of this function only sets ourself as the
419            // current_task.signals.run_state when there's a nonzero timeout, and that waiter reference
420            // is what is used to signal the interrupt().
421            loop {
422                let wait_result = self.wait_internal(deadline);
423                if let Err(errno) = &wait_result {
424                    if errno.code == EINTR && !is_waiting {
425                        continue; // Spurious wakeup.
426                    }
427                }
428                return wait_result;
429            }
430        };
431
432        // Trigger delayed releaser before blocking.
433        current_task.trigger_delayed_releaser(locked);
434
435        if is_waiting { current_task.run_in_state(run_state, callback) } else { callback() }
436    }
437
438    fn next_key(&self) -> WaitKey {
439        let key = self.next_key.next();
440        // TODO - find a better reaction to wraparound
441        assert!(key != 0, "bad key from u64 wraparound");
442        WaitKey { raw: key }
443    }
444
445    fn register_callback(&self, callback: WaitCallback) -> WaitKey {
446        let key = self.next_key();
447        assert!(
448            self.callbacks.lock().insert(key, callback).is_none(),
449            "unexpected callback already present for key {key:?}"
450        );
451        key
452    }
453
454    fn remove_callback(&self, key: &WaitKey) -> Option<WaitCallback> {
455        self.callbacks.lock().remove(&key)
456    }
457
458    fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
459        let callback = WaitCallback::EventHandler(handler);
460        let key = self.register_callback(callback);
461        self.queue_events(&key, WaitEvents::Fd(events));
462    }
463
464    /// Establish an asynchronous wait for the signals on the given Zircon handle (not to be
465    /// confused with POSIX signals), optionally running a FnOnce. Wait operations will return
466    /// the error code present in the provided SignalHandler.
467    ///
468    /// Returns a `PortWaitCanceler` that can be used to cancel the wait.
469    fn wake_on_zircon_signals(
470        self: &Arc<Self>,
471        handle: &dyn zx::AsHandleRef,
472        zx_signals: zx::Signals,
473        handler: SignalHandler,
474    ) -> Result<PortWaitCanceler, zx::Status> {
475        let callback = WaitCallback::SignalHandler(handler);
476        let key = self.register_callback(callback);
477        self.port.object_wait_async(
478            handle,
479            key.raw,
480            zx_signals,
481            zx::WaitAsyncOpts::EDGE_TRIGGERED,
482        )?;
483        Ok(PortWaitCanceler { waiter: Arc::downgrade(self), key })
484    }
485
486    fn queue_events(&self, key: &WaitKey, events: WaitEvents) {
487        scopeguard::defer! {
488            self.port.notify(NotifyKind::Regular)
489        }
490
491        // Handling user events immediately when they are triggered breaks any
492        // ordering expectations on Linux by batching all starnix events with
493        // the first starnix event even if other events occur on the Fuchsia
494        // platform (and are enqueued to the `zx::Port`) between them. This
495        // ordering does not seem to be load-bearing for applications running on
496        // starnix so we take the divergence in ordering in favour of improved
497        // performance (by minimizing syscalls) when operating on FDs backed by
498        // starnix.
499        //
500        // TODO(https://fxbug.dev/42084319): If we can read a batch of packets
501        // from the `zx::Port`, maybe we can keep the ordering?
502        let Some(callback) = self.remove_callback(key) else {
503            return;
504        };
505
506        match callback {
507            WaitCallback::EventHandler(handler) => {
508                let events = match events {
509                    // If the event is All, signal on all possible fd
510                    // events.
511                    WaitEvents::All => FdEvents::all(),
512                    WaitEvents::Fd(events) => events,
513                    WaitEvents::SignalMask(_) => FdEvents::POLLIN,
514                    _ => panic!("wrong type of handler called: {events:?}"),
515                };
516                handler.handle(events)
517            }
518            WaitCallback::SignalHandler(_) => {
519                panic!("wrong type of handler called")
520            }
521        }
522    }
523
524    fn notify(&self) {
525        self.port.notify(NotifyKind::Regular);
526    }
527
528    fn interrupt(&self) {
529        if self.ignore_signals {
530            return;
531        }
532        self.port.notify(NotifyKind::Interrupt);
533    }
534}
535
536impl std::fmt::Debug for PortWaiter {
537    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538        f.debug_struct("PortWaiter").field("port", &self.port).finish_non_exhaustive()
539    }
540}
541
/// A type that can put a thread to sleep waiting for a condition.
///
/// Clones share the same underlying `PortWaiter`, and equality compares that shared identity.
///
/// NOTE(review): `Drop` runs for every clone and removes this waiter from the wait queues it
/// is registered on, even while other clones are still alive — confirm this is intended.
#[derive(Debug, Clone)]
pub struct Waiter {
    // TODO(https://g-issues.fuchsia.dev/issues/303068424): Avoid `PortWaiter`
    // when operating purely over FDs backed by starnix.
    inner: Arc<PortWaiter>,
}
549
550impl Waiter {
551    /// Create a new waiter.
552    pub fn new() -> Self {
553        Self { inner: PortWaiter::new(false) }
554    }
555
556    /// Create a new waiter that doesn't wake up when a signal is received.
557    pub fn new_ignoring_signals() -> Self {
558        Self { inner: PortWaiter::new(true) }
559    }
560
561    /// Create a weak reference to this waiter.
562    fn weak(&self) -> WaiterRef {
563        WaiterRef::from_port(&self.inner)
564    }
565
566    /// Freeze the task until the waiter is woken up.
567    ///
568    /// No signal, e.g. EINTR (interrupt), should be received.
569    pub fn freeze<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask)
570    where
571        L: LockEqualOrBefore<FileOpsCore>,
572    {
573        while self
574            .inner
575            .wait_until(
576                locked,
577                current_task,
578                RunState::Frozen(self.clone()),
579                zx::MonotonicInstant::INFINITE,
580            )
581            .is_err()
582        {
583            // Avoid attempting to freeze the task if there is a pending SIGKILL.
584            if current_task.read().has_signal_pending(SIGKILL) {
585                break;
586            }
587            // Ignore spurious wakeups from the [`PortEvent.futex`]
588        }
589    }
590
591    /// Wait until the waiter is woken up.
592    ///
593    /// If the wait is interrupted (see [`Waiter::interrupt`]), this function returns EINTR.
594    pub fn wait<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask) -> Result<(), Errno>
595    where
596        L: LockEqualOrBefore<FileOpsCore>,
597    {
598        self.inner.wait_until(
599            locked,
600            current_task,
601            RunState::Waiter(WaiterRef::from_port(&self.inner)),
602            zx::MonotonicInstant::INFINITE,
603        )
604    }
605
606    /// Wait until the given deadline has passed or the waiter is woken up.
607    ///
608    /// If the wait deadline is nonzero and is interrupted (see [`Waiter::interrupt`]), this
609    /// function returns EINTR. Callers must take special care not to lose any accumulated data or
610    /// local state when EINTR is received as this is a normal and recoverable situation.
611    ///
612    /// Using a 0 deadline (no waiting, useful for draining pending events) will not wait and is
613    /// guaranteed not to issue EINTR.
614    ///
615    /// It the timeout elapses with no events, this function returns ETIMEDOUT.
616    ///
617    /// Processes at most one event. If the caller is interested in draining the events, it should
618    /// repeatedly call this function with a 0 deadline until it reports ETIMEDOUT. (This case is
619    /// why a 0 deadline must not return EINTR, as previous calls to wait_until() may have
620    /// accumulated state that would be lost when returning EINTR to userspace.)
621    ///
622    /// It is up to the caller (the "waiter") to make sure that it synchronizes with any object
623    /// that triggers an event (the "notifier"). This `Waiter` does not provide any synchronization
624    /// itself. Note that synchronization between the "waiter" the "notifier" may be provided by
625    /// the [`EventHandler`] used to handle an event iff the waiter observes the side-effects of
626    /// the handler (e.g. reading the ready list modified by [`EventHandler::Enqueue`] or
627    /// [`EventHandler::EnqueueOnce`]).
628    pub fn wait_until<L>(
629        &self,
630        locked: &mut Locked<L>,
631        current_task: &CurrentTask,
632        deadline: zx::MonotonicInstant,
633    ) -> Result<(), Errno>
634    where
635        L: LockEqualOrBefore<FileOpsCore>,
636    {
637        self.inner.wait_until(
638            locked,
639            current_task,
640            RunState::Waiter(WaiterRef::from_port(&self.inner)),
641            deadline,
642        )
643    }
644
645    fn create_wait_entry(&self, filter: WaitEvents) -> WaitEntry {
646        WaitEntry { waiter: self.weak(), filter, key: self.inner.next_key() }
647    }
648
649    fn create_wait_entry_with_handler(
650        &self,
651        filter: WaitEvents,
652        handler: EventHandler,
653    ) -> WaitEntry {
654        let key = self.inner.register_callback(WaitCallback::EventHandler(handler));
655        WaitEntry { waiter: self.weak(), filter, key }
656    }
657
658    pub fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
659        self.inner.wake_immediately(events, handler);
660    }
661
662    /// Establish an asynchronous wait for the signals on the given Zircon handle (not to be
663    /// confused with POSIX signals), optionally running a FnOnce.
664    ///
665    /// Returns a `PortWaitCanceler` that can be used to cancel the wait.
666    pub fn wake_on_zircon_signals(
667        &self,
668        handle: &dyn zx::AsHandleRef,
669        zx_signals: zx::Signals,
670        handler: SignalHandler,
671    ) -> Result<PortWaitCanceler, zx::Status> {
672        self.inner.wake_on_zircon_signals(handle, zx_signals, handler)
673    }
674
675    /// Return a WaitCanceler representing a wait that will never complete. Useful for stub
676    /// implementations that should block forever even though a real implementation would wake up
677    /// eventually.
678    pub fn fake_wait(&self) -> WaitCanceler {
679        WaitCanceler::new_noop()
680    }
681
682    // Notify the waiter to wake it up without signalling any events.
683    pub fn notify(&self) {
684        self.inner.notify();
685    }
686
687    /// Interrupt the waiter to deliver a signal. The wait operation will return EINTR, and a
688    /// typical caller should then unwind to the syscall dispatch loop to let the signal be
689    /// processed. See wait_until() for more details.
690    ///
691    /// Ignored if the waiter was created with new_ignoring_signals().
692    pub fn interrupt(&self) {
693        self.inner.interrupt();
694    }
695}
696
697impl Drop for Waiter {
698    fn drop(&mut self) {
699        // Delete ourselves from each wait queue we know we're on to prevent Weak references to
700        // ourself from sticking around forever.
701        let wait_queues = std::mem::take(&mut *self.inner.wait_queues.lock()).into_values();
702        for wait_queue in wait_queues {
703            if let Some(wait_queue) = wait_queue.upgrade() {
704                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != *self)
705            }
706        }
707    }
708}
709
710impl Default for Waiter {
711    fn default() -> Self {
712        Self::new()
713    }
714}
715
716impl PartialEq for Waiter {
717    fn eq(&self, other: &Self) -> bool {
718        Arc::ptr_eq(&self.inner, &other.inner)
719    }
720}
721
/// A waiter backed by an `InterruptibleEvent`.
///
/// On drop it removes its entries from the wait queues listed in `wait_queues`.
pub struct SimpleWaiter {
    /// The event this waiter is associated with.
    event: Arc<InterruptibleEvent>,
    /// Wait queues to clean up when this waiter is dropped.
    wait_queues: Vec<Weak<Mutex<WaitQueueImpl>>>,
}
726
727impl SimpleWaiter {
728    pub fn new(event: &Arc<InterruptibleEvent>) -> (SimpleWaiter, EventWaitGuard<'_>) {
729        (SimpleWaiter { event: event.clone(), wait_queues: Default::default() }, event.begin_wait())
730    }
731}
732
733impl Drop for SimpleWaiter {
734    fn drop(&mut self) {
735        for wait_queue in &self.wait_queues {
736            if let Some(wait_queue) = wait_queue.upgrade() {
737                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != self.event)
738            }
739        }
740    }
741}
742
/// The kinds of waiter a [`WaiterRef`] can weakly refer to.
#[derive(Debug, Clone)]
enum WaiterKind {
    /// A port-based waiter (the implementation behind `Waiter`).
    Port(Weak<PortWaiter>),
    /// An interruptible event (as used by `SimpleWaiter`).
    Event(Weak<InterruptibleEvent>),
    /// An abort handle; notifying or interrupting it calls `abort()`.
    AbortHandle(Weak<futures::stream::AbortHandle>),
}
749
750impl Default for WaiterKind {
751    fn default() -> Self {
752        WaiterKind::Port(Default::default())
753    }
754}
755
/// A weak reference to a Waiter. Intended for holding in wait queues or stashing elsewhere for
/// calling queue_events later.
///
/// The default value refers to no waiter: it is a `Port` reference that can never be upgraded.
#[derive(Debug, Default, Clone)]
pub struct WaiterRef(WaiterKind);
760
761impl WaiterRef {
762    fn from_port(waiter: &Arc<PortWaiter>) -> WaiterRef {
763        WaiterRef(WaiterKind::Port(Arc::downgrade(waiter)))
764    }
765
766    fn from_event(event: &Arc<InterruptibleEvent>) -> WaiterRef {
767        WaiterRef(WaiterKind::Event(Arc::downgrade(event)))
768    }
769
770    pub fn from_abort_handle(handle: &Arc<futures::stream::AbortHandle>) -> WaiterRef {
771        WaiterRef(WaiterKind::AbortHandle(Arc::downgrade(handle)))
772    }
773
774    pub fn is_valid(&self) -> bool {
775        match &self.0 {
776            WaiterKind::Port(waiter) => waiter.strong_count() != 0,
777            WaiterKind::Event(event) => event.strong_count() != 0,
778            WaiterKind::AbortHandle(handle) => handle.strong_count() != 0,
779        }
780    }
781
782    pub fn interrupt(&self) {
783        match &self.0 {
784            WaiterKind::Port(waiter) => {
785                if let Some(waiter) = waiter.upgrade() {
786                    waiter.interrupt();
787                }
788            }
789            WaiterKind::Event(event) => {
790                if let Some(event) = event.upgrade() {
791                    event.interrupt();
792                }
793            }
794            WaiterKind::AbortHandle(handle) => {
795                if let Some(handle) = handle.upgrade() {
796                    handle.abort();
797                }
798            }
799        }
800    }
801
802    fn remove_callback(&self, key: &WaitKey) {
803        match &self.0 {
804            WaiterKind::Port(waiter) => {
805                if let Some(waiter) = waiter.upgrade() {
806                    waiter.remove_callback(key);
807                }
808            }
809            _ => (),
810        }
811    }
812
813    /// Called by the WaitQueue when this waiter is about to be removed from the queue.
814    ///
815    /// TODO(abarth): This function does not appear to be called when the WaitQueue is dropped,
816    /// which appears to be a leak.
817    fn will_remove_from_wait_queue(&self, key: &WaitKey) {
818        match &self.0 {
819            WaiterKind::Port(waiter) => {
820                if let Some(waiter) = waiter.upgrade() {
821                    waiter.wait_queues.lock().remove(key);
822                }
823            }
824            _ => (),
825        }
826    }
827
828    /// Notify the waiter that the `events` have occurred.
829    ///
830    /// If the client is using an `SimpleWaiter`, they will be notified but they will not learn
831    /// which events occurred.
832    ///
833    /// If the client is using an `AbortHandle`, `AbortHandle::abort()` will be called.
834    fn notify(&self, key: &WaitKey, events: WaitEvents) -> bool {
835        match &self.0 {
836            WaiterKind::Port(waiter) => {
837                if let Some(waiter) = waiter.upgrade() {
838                    waiter.queue_events(key, events);
839                    return true;
840                }
841            }
842            WaiterKind::Event(event) => {
843                if let Some(event) = event.upgrade() {
844                    event.notify();
845                    return true;
846                }
847            }
848            WaiterKind::AbortHandle(handle) => {
849                if let Some(handle) = handle.upgrade() {
850                    handle.abort();
851                    return true;
852                }
853            }
854        }
855        false
856    }
857}
858
859impl PartialEq<Waiter> for WaiterRef {
860    fn eq(&self, other: &Waiter) -> bool {
861        match &self.0 {
862            WaiterKind::Port(waiter) => waiter.as_ptr() == Arc::as_ptr(&other.inner),
863            _ => false,
864        }
865    }
866}
867
868impl PartialEq<Arc<InterruptibleEvent>> for WaiterRef {
869    fn eq(&self, other: &Arc<InterruptibleEvent>) -> bool {
870        match &self.0 {
871            WaiterKind::Event(event) => event.as_ptr() == Arc::as_ptr(other),
872            _ => false,
873        }
874    }
875}
876
877impl PartialEq for WaiterRef {
878    fn eq(&self, other: &WaiterRef) -> bool {
879        match (&self.0, &other.0) {
880            (WaiterKind::Port(lhs), WaiterKind::Port(rhs)) => Weak::ptr_eq(lhs, rhs),
881            (WaiterKind::Event(lhs), WaiterKind::Event(rhs)) => Weak::ptr_eq(lhs, rhs),
882            (WaiterKind::AbortHandle(lhs), WaiterKind::AbortHandle(rhs)) => Weak::ptr_eq(lhs, rhs),
883            _ => false,
884        }
885    }
886}
887
/// A list of waiters waiting for some event.
///
/// For events that are generated inside Starnix, we walk the wait queue
/// on the thread that triggered the event to notify the waiters that the event
/// has occurred. The waiters will then wake up on their own thread to handle
/// the event.
///
/// The queue state lives behind an `Arc<Mutex<_>>` so that weak references to
/// it can be handed to waiters and wait cancelers.
#[derive(Default, Debug)]
pub struct WaitQueue(Arc<Mutex<WaitQueueImpl>>);
896
/// A [`WaitEntry`] together with the unique ID it was assigned on insertion.
#[derive(Debug)]
struct WaitEntryWithId {
    /// The wait entry itself.
    entry: WaitEntry,
    /// The ID used to uniquely identify this wait entry even if it shares the
    /// key used in the wait queue's [`Slab`] with another wait entry, since
    /// a slab's keys are recycled.
    id: u64,
}
905
/// Identifies one entry previously added to a [`WaitQueue`].
struct WaitEntryId {
    /// The key returned by the slab when the entry was inserted.
    key: usize,
    /// The unique ID assigned to the entry when it was inserted.
    id: u64,
}
910
/// The lock-protected state of a [`WaitQueue`].
#[derive(Default, Debug)]
struct WaitQueueImpl {
    /// Counter used to assign IDs to new `WaitEntry`s added to the waiters
    /// slab. It holds the most recently assigned ID; a new entry receives this
    /// value plus one.
    ///
    /// A [`Slab`]'s keys are recycled so we use the ID to uniquely identify
    /// a wait entry.
    next_wait_entry_id: u64,
    /// The list of waiters.
    ///
    /// The waiter's wait_queues lock is nested inside this lock.
    waiters: Slab<WaitEntryWithId>,
}
924
/// An entry in a WaitQueue.
#[derive(Debug)]
struct WaitEntry {
    /// The waiter that is waiting for the events.
    waiter: WaiterRef,

    /// The events that the waiter is waiting for.
    filter: WaitEvents,

    /// Key for cancelling and queueing events.
    key: WaitKey,
}
937
938impl WaitQueue {
939    fn add_waiter(&self, entry: WaitEntry) -> WaitEntryId {
940        let mut wait_queue = self.0.lock();
941        let id = wait_queue
942            .next_wait_entry_id
943            .checked_add(1)
944            .expect("all possible wait entry ID values exhausted");
945        wait_queue.next_wait_entry_id = id;
946        WaitEntryId { key: wait_queue.waiters.insert(WaitEntryWithId { entry, id }), id }
947    }
948
949    /// Establish a wait for the given entry.
950    ///
951    /// The waiter will be notified when an event matching the entry occurs.
952    ///
953    /// This function does not actually block the waiter. To block the waiter,
954    /// call the [`Waiter::wait`] function on the waiter.
955    ///
956    /// Returns a `WaitCanceler` that can be used to cancel the wait.
957    fn wait_async_entry(&self, waiter: &Waiter, entry: WaitEntry) -> WaitCanceler {
958        let wait_key = entry.key;
959        let waiter_id = self.add_waiter(entry);
960        let wait_queue = Arc::downgrade(&self.0);
961        waiter.inner.wait_queues.lock().insert(wait_key, wait_queue.clone());
962        WaitCanceler::new_inner(WaitCancelerInner::Queue(WaitCancelerQueue {
963            wait_queue,
964            waiter: waiter.weak(),
965            wait_key,
966            waiter_id,
967        }))
968    }
969
970    /// Establish a wait for the given value event.
971    ///
972    /// The waiter will be notified when an event with the same value occurs.
973    ///
974    /// This function does not actually block the waiter. To block the waiter,
975    /// call the [`Waiter::wait`] function on the waiter.
976    ///
977    /// Returns a `WaitCanceler` that can be used to cancel the wait.
978    pub fn wait_async_value(&self, waiter: &Waiter, value: u64) -> WaitCanceler {
979        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::Value(value)))
980    }
981
982    /// Establish a wait for the given FdEvents.
983    ///
984    /// The waiter will be notified when an event matching the `events` occurs.
985    ///
986    /// This function does not actually block the waiter. To block the waiter,
987    /// call the [`Waiter::wait`] function on the waiter.
988    ///
989    /// Returns a `WaitCanceler` that can be used to cancel the wait.
990    pub fn wait_async_fd_events(
991        &self,
992        waiter: &Waiter,
993        events: FdEvents,
994        handler: EventHandler,
995    ) -> WaitCanceler {
996        let entry = waiter.create_wait_entry_with_handler(WaitEvents::Fd(events), handler);
997        self.wait_async_entry(waiter, entry)
998    }
999
1000    /// Establish a wait for a particular signal mask.
1001    ///
1002    /// The waiter will be notified when a signal in the mask is received.
1003    ///
1004    /// This function does not actually block the waiter. To block the waiter,
1005    // call the [`Waiter::wait`] function on the waiter.
1006    ///
1007    /// Returns a `WaitCanceler` that can be used to cancel the wait.
1008    pub fn wait_async_signal_mask(
1009        &self,
1010        waiter: &Waiter,
1011        mask: SigSet,
1012        handler: EventHandler,
1013    ) -> WaitCanceler {
1014        let entry = waiter.create_wait_entry_with_handler(WaitEvents::SignalMask(mask), handler);
1015        self.wait_async_entry(waiter, entry)
1016    }
1017
1018    /// Establish a wait for any event.
1019    ///
1020    /// The waiter will be notified when any event occurs.
1021    ///
1022    /// This function does not actually block the waiter. To block the waiter,
1023    /// call the [`Waiter::wait`] function on the waiter.
1024    ///
1025    /// Returns a `WaitCanceler` that can be used to cancel the wait.
1026    pub fn wait_async(&self, waiter: &Waiter) -> WaitCanceler {
1027        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::All))
1028    }
1029
1030    pub fn wait_async_simple(&self, waiter: &mut SimpleWaiter) {
1031        let entry = WaitEntry {
1032            waiter: WaiterRef::from_event(&waiter.event),
1033            filter: WaitEvents::All,
1034            key: Default::default(),
1035        };
1036        waiter.wait_queues.push(Arc::downgrade(&self.0));
1037        self.add_waiter(entry);
1038    }
1039
1040    fn notify_events_count(&self, mut events: WaitEvents, mut limit: usize) -> usize {
1041        if let WaitEvents::Fd(ref mut fd_events) = events {
1042            *fd_events = fd_events.add_equivalent_fd_events();
1043        }
1044        let mut woken = 0;
1045        self.0.lock().waiters.retain(|_, WaitEntryWithId { entry, id: _ }| {
1046            if limit > 0 && entry.filter.intercept(&events) {
1047                if entry.waiter.notify(&entry.key, events) {
1048                    limit -= 1;
1049                    woken += 1;
1050                }
1051
1052                entry.waiter.will_remove_from_wait_queue(&entry.key);
1053                false
1054            } else {
1055                true
1056            }
1057        });
1058        woken
1059    }
1060
1061    pub fn notify_fd_events(&self, events: FdEvents) {
1062        self.notify_events_count(WaitEvents::Fd(events), usize::MAX);
1063    }
1064
1065    pub fn notify_signal(&self, signal: &Signal) {
1066        let event = WaitEvents::SignalMask(SigSet::from(*signal));
1067        self.notify_events_count(event, usize::MAX);
1068    }
1069
1070    pub fn notify_value(&self, value: u64) {
1071        self.notify_events_count(WaitEvents::Value(value), usize::MAX);
1072    }
1073
1074    pub fn notify_unordered_count(&self, limit: usize) {
1075        self.notify_events_count(WaitEvents::All, limit);
1076    }
1077
1078    pub fn notify_all(&self) {
1079        self.notify_unordered_count(usize::MAX);
1080    }
1081
1082    /// Returns whether there is no active waiters waiting on this `WaitQueue`.
1083    pub fn is_empty(&self) -> bool {
1084        self.0.lock().waiters.is_empty()
1085    }
1086}
1087
/// A wait queue that dispatches events based on the value of an enum.
///
/// Values of type `T` are converted to `u64` and forwarded to the value-based
/// API of the wrapped [`WaitQueue`].
pub struct TypedWaitQueue<T: Into<u64>> {
    /// The underlying untyped queue.
    wait_queue: WaitQueue,
    /// Marker tying the queue to `T`; no `T` is stored.
    value_type: std::marker::PhantomData<T>,
}
1093
1094// We can't #[derive(Default)] on [TypedWaitQueue<T>] as T may not implement the Default trait.
1095impl<T: Into<u64>> Default for TypedWaitQueue<T> {
1096    fn default() -> Self {
1097        Self { wait_queue: Default::default(), value_type: Default::default() }
1098    }
1099}
1100
1101impl<T: Into<u64>> TypedWaitQueue<T> {
1102    pub fn wait_async_value(&self, waiter: &Waiter, value: T) -> WaitCanceler {
1103        self.wait_queue.wait_async_value(waiter, value.into())
1104    }
1105
1106    pub fn notify_value(&self, value: T) {
1107        self.wait_queue.notify_value(value.into())
1108    }
1109}
1110
// Unit tests for waiters, wait queues and the interaction between run states
// and pending signals.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs::fuchsia::create_fuchsia_pipe;
    use crate::signals::SignalInfo;
    use crate::task::TaskFlags;
    use crate::testing::{spawn_kernel_and_run, spawn_kernel_and_run_sync};
    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
    use crate::vfs::eventfd::{EventFdType, new_eventfd};
    use assert_matches::assert_matches;
    use starnix_sync::Unlocked;
    use starnix_uapi::open_flags::OpenFlags;
    use starnix_uapi::signals::SIGUSR1;

    // Arbitrary key attached to the ready items enqueued by the tests below.
    const KEY: ReadyItemKey = ReadyItemKey::Usize(1234);

    // Waits for POLLIN on a pipe backed by a zx socket while another thread
    // writes to the socket's other end, then checks that the wait completes and
    // that the written bytes can be read back from the pipe.
    #[::fuchsia::test]
    async fn test_async_wait_exec() {
        spawn_kernel_and_run(async |locked, current_task| {
            let (local_socket, remote_socket) = zx::Socket::create_stream();
            let pipe =
                create_fuchsia_pipe(locked, &current_task, remote_socket, OpenFlags::RDWR).unwrap();

            const MEM_SIZE: usize = 1024;
            let mut output_buffer = VecOutputBuffer::new(MEM_SIZE);

            let test_string = "hello startnix".to_string();
            let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
            let handler = EventHandler::Enqueue {
                key: KEY,
                queue: queue.clone(),
                sought_events: FdEvents::all(),
            };
            let waiter = Waiter::new();
            pipe.wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                .expect("wait_async");
            let test_string_clone = test_string.clone();

            // Records how many bytes the writer thread wrote.
            let write_count = AtomicUsizeCounter::default();
            std::thread::scope(|s| {
                let thread = s.spawn(|| {
                    let test_data = test_string_clone.as_bytes();
                    let no_written = local_socket.write(test_data).unwrap();
                    assert_eq!(0, write_count.add(no_written));
                    assert_eq!(no_written, test_data.len());
                });

                // this code would block on failure

                assert!(queue.lock().is_empty());
                waiter.wait(locked, &current_task).unwrap();
                thread.join().expect("join thread")
            });
            // Every item enqueued by the handler must report POLLIN.
            queue.lock().iter().for_each(|item| assert!(item.events.contains(FdEvents::POLLIN)));

            let read_size = pipe.read(locked, &current_task, &mut output_buffer).unwrap();

            let no_written = write_count.get();
            assert_eq!(no_written, read_size);

            assert_eq!(output_buffer.data(), test_string.as_bytes());
        })
        .await;
    }

    // Runs the same scenario twice, with and without cancelling the wait before
    // the eventfd is written: a cancelled wait must time out with nothing
    // enqueued, an active wait must succeed with exactly one enqueued item.
    #[::fuchsia::test]
    async fn test_async_wait_cancel() {
        for do_cancel in [true, false] {
            spawn_kernel_and_run(async move |locked, current_task| {
                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
                let waiter = Waiter::new();
                let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
                let handler = EventHandler::Enqueue {
                    key: KEY,
                    queue: queue.clone(),
                    sought_events: FdEvents::all(),
                };
                let wait_canceler = event
                    .wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                    .expect("wait_async");
                if do_cancel {
                    wait_canceler.cancel();
                }
                // Write a u64 to the eventfd; the write must consume all 8 bytes.
                let add_val = 1u64;
                assert_eq!(
                    event
                        .write(
                            locked,
                            &current_task,
                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
                        )
                        .unwrap(),
                    std::mem::size_of::<u64>()
                );

                // A deadline of ZERO makes this a non-blocking check.
                let wait_result =
                    waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO);
                let final_count = queue.lock().len();
                if do_cancel {
                    assert_eq!(wait_result, error!(ETIMEDOUT));
                    assert_eq!(0, final_count);
                } else {
                    assert_eq!(wait_result, Ok(()));
                    assert_eq!(1, final_count);
                }
            })
            .await;
        }
    }

    // One waiter registered twice on the same queue: cancelling the first
    // registration must not prevent the waiter from being notified through the
    // second one.
    #[::fuchsia::test]
    async fn single_waiter_multiple_waits_cancel_one_waiter_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter);
            let _wk2 = wait_queue.wait_async(&waiter);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }

    // Two waiters on the same queue: cancelling the first waiter's wait must
    // leave it un-notified while the second waiter is still notified.
    #[::fuchsia::test]
    async fn multiple_waiters_cancel_one_other_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter1 = Waiter::new();
            let waiter2 = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter1);
            let _wk2 = wait_queue.wait_async(&waiter2);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter1.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_err());
            assert!(waiter2.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }

    // Checks that `notify_unordered_count` wakes exactly the requested number
    // of waiters and that a later `notify_all` wakes only the remaining ones.
    #[::fuchsia::test]
    async fn test_wait_queue() {
        spawn_kernel_and_run(async |locked, current_task| {
            let queue = WaitQueue::default();

            let waiters = <[Waiter; 3]>::default();
            waiters.iter().for_each(|w| {
                queue.wait_async(w);
            });

            // Counts the waiters for which a zero-deadline wait succeeds.
            let woken = |locked: &mut Locked<Unlocked>| {
                waiters
                    .iter()
                    .filter(|w| {
                        w.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok()
                    })
                    .count()
            };

            const INITIAL_NOTIFY_COUNT: usize = 2;
            let total_waiters = waiters.len();
            queue.notify_unordered_count(INITIAL_NOTIFY_COUNT);
            assert_eq!(INITIAL_NOTIFY_COUNT, woken(locked));

            // Only the remaining (unnotified) waiters should be notified.
            queue.notify_all();
            assert_eq!(total_waiters - INITIAL_NOTIFY_COUNT, woken(locked));
        })
        .await;
    }

    // Interrupting a `WaiterRef` built from an abort handle must abort the
    // associated `Abortable` future.
    #[::fuchsia::test]
    async fn waiter_kind_abort_handle() {
        spawn_kernel_and_run_sync(|_locked, current_task| {
            let mut executor = fuchsia_async::TestExecutor::new();
            let (abort_handle, abort_registration) = futures::stream::AbortHandle::new_pair();
            let abort_handle = Arc::new(abort_handle);
            let waiter_ref = WaiterRef::from_abort_handle(&abort_handle);

            let mut fut = futures::stream::Abortable::new(
                futures::future::pending::<()>(),
                abort_registration,
            );

            // Before the interrupt, the future cannot make progress.
            assert_matches!(executor.run_until_stalled(&mut fut), std::task::Poll::Pending);

            waiter_ref.interrupt();
            let output = current_task.run_in_state(RunState::Waiter(waiter_ref), move || {
                match executor.run_singlethreaded(&mut fut) {
                    Ok(()) => unreachable!("future never terminates normally"),
                    Err(futures::stream::Aborted) => Ok(()),
                }
            });

            assert_eq!(output, Ok(()));
        })
        .await;
    }

    // With SIGUSR1 pending, running in `RunState::Event` must fail with EINTR
    // without invoking the callback, while running in `RunState::Frozen` must
    // still invoke the callback and succeed.
    #[::fuchsia::test]
    async fn freeze_with_pending_sigusr1() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::default(SIGUSR1);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            let output: Result<(), Errno> = current_task
                .run_in_state(RunState::Event(InterruptibleEvent::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));

            let output = current_task.run_in_state(RunState::Frozen(Waiter::new()), move || Ok(()));
            assert_eq!(output, Ok(()));
        })
        .await;
    }

    // With SIGKILL pending, running in `RunState::Frozen` must fail with EINTR
    // without invoking the callback.
    #[::fuchsia::test]
    async fn freeze_with_pending_sigkill() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::default(SIGKILL);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            let output: Result<(), _> = current_task
                .run_in_state(RunState::Frozen(Waiter::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));
        })
        .await;
    }
}