Skip to main content

starnix_core/task/
waiter.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::task::{CurrentTask, RunState};
6use crate::vfs::{EpollEventHandler, FdNumber};
7use bitflags::bitflags;
8use futures::stream::AbortHandle;
9use slab::Slab;
10use smallvec::SmallVec;
11use starnix_lifecycle::AtomicCounter;
12use starnix_sync::{
13    EventHandlerReadyQueueLock, EventWaitGuard, FileOpsCore, InterruptibleEvent, LockDepMutex,
14    LockEqualOrBefore, Locked, Mutex, NotifyKind, PortEvent, PortWaitResult,
15    WaiterEventHandlerLock,
16};
17use starnix_types::ownership::debug_assert_no_local_temp_ref;
18use starnix_uapi::error;
19use starnix_uapi::errors::{EINTR, Errno};
20use starnix_uapi::signals::{SIGKILL, SigSet, Signal};
21use starnix_uapi::vfs::FdEvents;
22use std::collections::{HashMap, VecDeque};
23use std::sync::{Arc, Weak};
24use syncio::zxio::zxio_signals_t;
25use syncio::{ZxioSignals, ZxioWeak};
26
27#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
28pub enum ReadyItemKey {
29    FdNumber(FdNumber),
30    Usize(usize),
31}
32
33impl From<FdNumber> for ReadyItemKey {
34    fn from(v: FdNumber) -> Self {
35        Self::FdNumber(v)
36    }
37}
38
39impl From<usize> for ReadyItemKey {
40    fn from(v: usize) -> Self {
41        Self::Usize(v)
42    }
43}
44
45#[derive(Debug, Copy, Clone)]
46pub struct ReadyItem {
47    pub key: ReadyItemKey,
48    pub events: FdEvents,
49}
50
51#[derive(Clone)]
52pub enum EventHandler {
53    /// Does nothing.
54    ///
55    /// It is up to the waiter to synchronize itself with the notifier if
56    /// synchronization is needed.
57    None,
58
59    /// Enqueues an event to a ready list.
60    ///
61    /// This event handler naturally synchronizes the notifier and notifee
62    /// because of the lock acquired/released when enqueuing the event.
63    Enqueue {
64        key: ReadyItemKey,
65        queue: Arc<LockDepMutex<VecDeque<ReadyItem>, EventHandlerReadyQueueLock>>,
66        sought_events: FdEvents,
67    },
68
69    /// Wraps another EventHandler and only triggers it once. Further .handle() calls are ignored.
70    ///
71    /// This is intended for cases like BinderFileObject which need to register
72    /// the same EventHandler on multiple wait queues.
73    HandleOnce(Arc<LockDepMutex<Option<EventHandler>, WaiterEventHandlerLock>>),
74
75    /// This handler is an epoll.
76    Epoll(EpollEventHandler),
77}
78
79impl EventHandler {
80    pub fn handle(self, events: FdEvents) {
81        match self {
82            Self::None => {}
83            Self::Enqueue { key, queue, sought_events } => {
84                let events = events & sought_events;
85                queue.lock().push_back(ReadyItem { key, events });
86            }
87            Self::HandleOnce(inner) => {
88                if let Some(inner) = inner.lock().take() {
89                    inner.handle(events);
90                }
91            }
92            Self::Epoll(e) => e.handle(events),
93        }
94    }
95}
96
97pub struct ZxioSignalHandler {
98    pub zxio: ZxioWeak,
99    pub get_events_from_zxio_signals: fn(zxio_signals_t) -> FdEvents,
100}
101
102// The counter is incremented as each handle is signaled; when the counter reaches the handle
103// count, the event handler is called with the given events.
104pub struct ManyZxHandleSignalHandler {
105    pub count: usize,
106    pub counter: Arc<AtomicCounter<usize>>,
107    pub expected_signals: zx::Signals,
108    pub events: FdEvents,
109}
110
111pub enum SignalHandlerInner {
112    None,
113    Zxio(ZxioSignalHandler),
114    ZxHandle(fn(zx::Signals) -> FdEvents),
115    ManyZxHandle(ManyZxHandleSignalHandler),
116}
117
118pub struct SignalHandler {
119    pub inner: SignalHandlerInner,
120    pub event_handler: EventHandler,
121    pub err_code: Option<Errno>,
122}
123
124impl SignalHandler {
125    fn handle(self, signals: zx::Signals) -> Option<Errno> {
126        let SignalHandler { inner, event_handler, err_code } = self;
127        let events = match inner {
128            SignalHandlerInner::None => None,
129            SignalHandlerInner::Zxio(ZxioSignalHandler { zxio, get_events_from_zxio_signals }) => {
130                if let Some(zxio) = zxio.upgrade() {
131                    Some(get_events_from_zxio_signals(zxio.wait_end(signals)))
132                } else {
133                    None
134                }
135            }
136            SignalHandlerInner::ZxHandle(get_events_from_zx_signals) => {
137                Some(get_events_from_zx_signals(signals))
138            }
139            SignalHandlerInner::ManyZxHandle(signal_handler) => {
140                if signals.contains(signal_handler.expected_signals) {
141                    let new_count = signal_handler.counter.next() + 1;
142                    assert!(new_count <= signal_handler.count);
143                    if new_count == signal_handler.count {
144                        Some(signal_handler.events)
145                    } else {
146                        None
147                    }
148                } else {
149                    None
150                }
151            }
152        };
153        if let Some(events) = events {
154            event_handler.handle(events)
155        }
156        err_code
157    }
158}
159
160pub enum WaitCallback {
161    SignalHandler(SignalHandler),
162    EventHandler(EventHandler),
163}
164
165struct WaitCancelerQueue {
166    wait_queue: Weak<Mutex<WaitQueueImpl>>,
167    waiter: WaiterRef,
168    wait_key: WaitKey,
169    waiter_id: WaitEntryId,
170}
171
172struct WaitCancelerZxio {
173    zxio: ZxioWeak,
174    inner: PortWaitCanceler,
175}
176
177struct WaitCancelerPort {
178    inner: PortWaitCanceler,
179}
180
181enum WaitCancelerInner {
182    Zxio(WaitCancelerZxio),
183    Queue(WaitCancelerQueue),
184    Port(WaitCancelerPort),
185}
186
187enum NotifiableRef {
188    Port(Arc<PortWaiter>),
189    Event(Arc<InterruptibleEvent>),
190    AbortHandle(Arc<AbortHandle>),
191}
192
193const WAIT_CANCELER_COMMON_SIZE: usize = 2;
194
195/// Return values for wait_async methods.
196///
197/// Calling `cancel` will cancel any running wait.
198///
199/// Does not implement `Clone` or `Copy` so that only a single canceler exists
200/// per wait.
201pub struct WaitCanceler {
202    cancellers: smallvec::SmallVec<[WaitCancelerInner; WAIT_CANCELER_COMMON_SIZE]>,
203}
204
205impl WaitCanceler {
206    fn new_inner(inner: WaitCancelerInner) -> Self {
207        Self { cancellers: smallvec::smallvec![inner] }
208    }
209
210    pub fn new_noop() -> Self {
211        Self { cancellers: Default::default() }
212    }
213
214    pub fn new_zxio(zxio: ZxioWeak, inner: PortWaitCanceler) -> Self {
215        Self::new_inner(WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }))
216    }
217
218    pub fn new_port(inner: PortWaitCanceler) -> Self {
219        Self::new_inner(WaitCancelerInner::Port(WaitCancelerPort { inner }))
220    }
221
222    /// Equivalent to `merge_unbounded`, except that it enforces that the resulting vector of
223    /// cancellers is small enough to avoid being separately allocated on the heap.
224    ///
225    /// If possible, use this function instead of `merge_unbounded`, because it gives us better
226    /// tools to keep this code path optimized.
227    pub fn merge(self, other: Self) -> Self {
228        // Increase `WAIT_CANCELER_COMMON_SIZE` if needed, or remove this assert and allow the
229        // smallvec to allocate.
230        assert!(
231            self.cancellers.len() + other.cancellers.len() <= WAIT_CANCELER_COMMON_SIZE,
232            "WaitCanceler::merge disallows more than {} cancellers, found {} + {}",
233            WAIT_CANCELER_COMMON_SIZE,
234            self.cancellers.len(),
235            other.cancellers.len()
236        );
237        WaitCanceler::merge_unbounded(self, other)
238    }
239
240    /// Creates a new `WaitCanceler` that is equivalent to canceling both its arguments.
241    pub fn merge_unbounded(
242        Self { mut cancellers }: Self,
243        Self { cancellers: mut other }: Self,
244    ) -> Self {
245        cancellers.append(&mut other);
246        WaitCanceler { cancellers }
247    }
248
249    /// Cancel the pending wait.
250    ///
251    /// Takes `self` by value since a wait can only be canceled once.
252    pub fn cancel(self) {
253        let Self { cancellers } = self;
254        for canceller in cancellers.into_iter().rev() {
255            match canceller {
256                WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }) => {
257                    let Some(zxio) = zxio.upgrade() else { return };
258                    let (_, signals) = zxio.wait_begin(ZxioSignals::NONE.bits());
259                    inner.cancel();
260                    zxio.wait_end(signals);
261                }
262                WaitCancelerInner::Queue(WaitCancelerQueue {
263                    wait_queue,
264                    waiter,
265                    wait_key,
266                    waiter_id: WaitEntryId { key, id },
267                }) => {
268                    let Some(wait_queue) = wait_queue.upgrade() else { return };
269                    waiter.remove_callback(&wait_key);
270                    waiter.will_remove_from_wait_queue(&wait_key);
271                    let mut wait_queue = wait_queue.lock();
272                    let waiters = &mut wait_queue.waiters;
273                    if let Some(entry) = waiters.get_mut(key) {
274                        // The map of waiters in a wait queue uses a `Slab` which
275                        // recycles keys. To make sure we are removing the right
276                        // entry, make sure the ID value matches what we expect
277                        // to remove.
278                        if entry.id == id {
279                            waiters.remove(key);
280                        }
281                    }
282                }
283                WaitCancelerInner::Port(WaitCancelerPort { inner }) => {
284                    inner.cancel();
285                }
286            }
287        }
288    }
289}
290
291/// Return values for wait_async methods that monitor the state of a handle.
292///
293/// Calling `cancel` will cancel any running wait.
294///
295/// Does not implement `Clone` or `Copy` so that only a single canceler exists
296/// per wait.
297pub struct PortWaitCanceler {
298    waiter: Weak<PortWaiter>,
299    key: WaitKey,
300}
301
302impl PortWaitCanceler {
303    /// Cancel the pending wait.
304    ///
305    /// Takes `self` by value since a wait can only be canceled once.
306    pub fn cancel(self) {
307        let Self { waiter, key } = self;
308        if let Some(waiter) = waiter.upgrade() {
309            let _ = waiter.port.cancel(key.raw);
310            waiter.remove_callback(&key);
311        }
312    }
313}
314
315#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
316struct WaitKey {
317    raw: u64,
318}
319
320/// The different type of event that can be waited on / triggered.
321#[derive(Clone, Copy, Debug)]
322enum WaitEvents {
323    /// All event: a wait on `All` will be woken up by all event, and a trigger on `All` will wake
324    /// every waiter.
325    All,
326    /// Wait on the set of FdEvents.
327    Fd(FdEvents),
328    /// Wait for the specified value.
329    Value(u64),
330    /// Wait for a signal in a specific mask to be received by the task.
331    SignalMask(SigSet),
332}
333
334impl WaitEvents {
335    /// Returns whether a wait on `self` should be woken up by `other`.
336    fn intercept(self: &WaitEvents, other: &WaitEvents) -> bool {
337        match (self, other) {
338            (Self::All, _) | (_, Self::All) => true,
339            (Self::Fd(m1), Self::Fd(m2)) => m1.bits() & m2.bits() != 0,
340            (Self::Value(v1), Self::Value(v2)) => v1 == v2,
341            // A SignalMask event can only be intercepted by another SignalMask event.
342            (Self::SignalMask(m1), Self::SignalMask(m2)) => m1.intersects(m2),
343            _ => false,
344        }
345    }
346}
347
348impl WaitCallback {
349    pub fn none() -> EventHandler {
350        EventHandler::None
351    }
352}
353
354bitflags! {
355    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
356    pub struct WaiterOptions: u8 {
357        /// The wait cannot be interrupted by signals.
358        const IGNORE_SIGNALS   = 1 << 0;
359
360        /// The wait is not taking place at a safe point.
361        ///
362        /// For example, the caller might be holding a lock, which could cause a deadlock if the
363        /// waiter triggers delayed releasers.
364        const UNSAFE_CALLSTACK = 1 << 1;
365    }
366}
367
368/// Implementation of Waiter. We put the Waiter data in an Arc so that WaitQueue can tell when the
369/// Waiter has been destroyed by keeping a Weak reference. But this is an implementation detail and
370/// a Waiter should have a single owner. So the Arc is hidden inside Waiter.
371struct PortWaiter {
372    port: PortEvent,
373    callbacks: Mutex<HashMap<WaitKey, WaitCallback>>, // the key 0 is reserved for 'no handler'
374    next_key: AtomicCounter<u64>,
375    options: WaiterOptions,
376
377    /// Collection of wait queues this Waiter is waiting on, so that when the Waiter is Dropped it
378    /// can remove itself from the queues.
379    ///
380    /// This lock is nested inside the WaitQueue.waiters lock.
381    wait_queues: Mutex<HashMap<WaitKey, Weak<Mutex<WaitQueueImpl>>>>,
382}
383
384impl PortWaiter {
385    /// Internal constructor.
386    fn new(options: WaiterOptions) -> Arc<Self> {
387        Arc::new(PortWaiter {
388            port: PortEvent::new(),
389            callbacks: Default::default(),
390            next_key: AtomicCounter::<u64>::new(1),
391            options,
392            wait_queues: Default::default(),
393        })
394    }
395
396    /// Waits until the given deadline has passed or the waiter is woken up. See wait_until().
397    fn wait_internal(&self, deadline: zx::MonotonicInstant) -> Result<(), Errno> {
398        // This method can block arbitrarily long, possibly waiting for another process. The
399        // current thread should not own any local ref that might delay the release of a resource
400        // while doing so.
401        debug_assert_no_local_temp_ref();
402
403        match self.port.wait(deadline) {
404            PortWaitResult::Notification { kind: NotifyKind::Regular } => Ok(()),
405            PortWaitResult::Notification { kind: NotifyKind::Interrupt } => error!(EINTR),
406            PortWaitResult::Signal { key, observed } => {
407                if let Some(callback) = self.remove_callback(&WaitKey { raw: key }) {
408                    match callback {
409                        WaitCallback::SignalHandler(handler) => {
410                            if let Some(errno) = handler.handle(observed) {
411                                return Err(errno);
412                            }
413                        }
414                        WaitCallback::EventHandler(_) => {
415                            panic!("wrong type of handler called")
416                        }
417                    }
418                }
419
420                Ok(())
421            }
422            PortWaitResult::TimedOut => error!(ETIMEDOUT),
423        }
424    }
425
426    fn wait_until<L>(
427        self: &Arc<Self>,
428        locked: &mut Locked<L>,
429        current_task: &CurrentTask,
430        run_state: RunState,
431        deadline: zx::MonotonicInstant,
432    ) -> Result<(), Errno>
433    where
434        L: LockEqualOrBefore<FileOpsCore>,
435    {
436        let is_waiting = deadline.into_nanos() > 0;
437
438        let callback = || {
439            // We are susceptible to spurious wakeups because interrupt() posts a message to the port
440            // queue. In addition to more subtle races, there could already be valid messages in the
441            // port queue that will immediately wake us up, leaving the interrupt message in the queue
442            // for subsequent waits (which by then may not have any signals pending) to read.
443            //
444            // It's impossible to non-racily guarantee that a signal is pending so there might always
445            // be an EINTR result here with no signal. But any signal we get when !is_waiting we know is
446            // leftover from before: the top of this function only sets ourself as the
447            // current_task.signals.run_state when there's a nonzero timeout, and that waiter reference
448            // is what is used to signal the interrupt().
449            loop {
450                let wait_result = self.wait_internal(deadline);
451                if let Err(errno) = &wait_result {
452                    if errno.code == EINTR && !is_waiting {
453                        continue; // Spurious wakeup.
454                    }
455                }
456                return wait_result;
457            }
458        };
459
460        // Trigger delayed releaser before blocking if we're at a safe point.
461        //
462        // For example, we cannot trigger delayed releaser if we are holding any locks.
463        if !self.options.contains(WaiterOptions::UNSAFE_CALLSTACK) {
464            current_task.trigger_delayed_releaser(locked);
465        }
466
467        if is_waiting { current_task.run_in_state(run_state, callback) } else { callback() }
468    }
469
470    fn next_key(&self) -> WaitKey {
471        let key = self.next_key.next();
472        // TODO - find a better reaction to wraparound
473        assert!(key != 0, "bad key from u64 wraparound");
474        WaitKey { raw: key }
475    }
476
477    fn register_callback(&self, callback: WaitCallback) -> WaitKey {
478        let key = self.next_key();
479        assert!(
480            self.callbacks.lock().insert(key, callback).is_none(),
481            "unexpected callback already present for key {key:?}"
482        );
483        key
484    }
485
486    fn remove_callback(&self, key: &WaitKey) -> Option<WaitCallback> {
487        self.callbacks.lock().remove(&key)
488    }
489
490    fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
491        let callback = WaitCallback::EventHandler(handler);
492        let key = self.register_callback(callback);
493        self.queue_events(&key, WaitEvents::Fd(events));
494    }
495
496    /// Establish an asynchronous wait for the signals on the given Zircon handle (not to be
497    /// confused with POSIX signals), optionally running a FnOnce. Wait operations will return
498    /// the error code present in the provided SignalHandler.
499    ///
500    /// Returns a `PortWaitCanceler` that can be used to cancel the wait.
501    fn wake_on_zircon_signals(
502        self: &Arc<Self>,
503        handle: &dyn zx::AsHandleRef,
504        zx_signals: zx::Signals,
505        handler: SignalHandler,
506    ) -> Result<PortWaitCanceler, zx::Status> {
507        let callback = WaitCallback::SignalHandler(handler);
508        let key = self.register_callback(callback);
509        self.port.object_wait_async(
510            handle,
511            key.raw,
512            zx_signals,
513            zx::WaitAsyncOpts::EDGE_TRIGGERED,
514        )?;
515        Ok(PortWaitCanceler { waiter: Arc::downgrade(self), key })
516    }
517
518    fn queue_events(&self, key: &WaitKey, events: WaitEvents) {
519        scopeguard::defer! {
520            self.port.notify(NotifyKind::Regular)
521        }
522
523        // Handling user events immediately when they are triggered breaks any
524        // ordering expectations on Linux by batching all starnix events with
525        // the first starnix event even if other events occur on the Fuchsia
526        // platform (and are enqueued to the `zx::Port`) between them. This
527        // ordering does not seem to be load-bearing for applications running on
528        // starnix so we take the divergence in ordering in favour of improved
529        // performance (by minimizing syscalls) when operating on FDs backed by
530        // starnix.
531        //
532        // TODO(https://fxbug.dev/42084319): If we can read a batch of packets
533        // from the `zx::Port`, maybe we can keep the ordering?
534        let Some(callback) = self.remove_callback(key) else {
535            return;
536        };
537
538        match callback {
539            WaitCallback::EventHandler(handler) => {
540                let events = match events {
541                    // If the event is All, signal on all possible fd
542                    // events.
543                    WaitEvents::All => FdEvents::all(),
544                    WaitEvents::Fd(events) => events,
545                    WaitEvents::SignalMask(_) => FdEvents::POLLIN,
546                    WaitEvents::Value(_) => FdEvents::POLLIN,
547                };
548                handler.handle(events)
549            }
550            WaitCallback::SignalHandler(_) => {
551                panic!("wrong type of handler called")
552            }
553        }
554    }
555
556    fn notify(&self) {
557        self.port.notify(NotifyKind::Regular);
558    }
559
560    fn interrupt(&self) {
561        if self.options.contains(WaiterOptions::IGNORE_SIGNALS) {
562            return;
563        }
564        self.port.notify(NotifyKind::Interrupt);
565    }
566}
567
568impl std::fmt::Debug for PortWaiter {
569    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
570        f.debug_struct("PortWaiter").field("port", &self.port).finish_non_exhaustive()
571    }
572}
573
574/// A type that can put a thread to sleep waiting for a condition.
575#[derive(Debug, Clone)]
576pub struct Waiter {
577    // TODO(https://g-issues.fuchsia.dev/issues/303068424): Avoid `PortWaiter`
578    // when operating purely over FDs backed by starnix.
579    inner: Arc<PortWaiter>,
580}
581
582impl Waiter {
583    /// Create a new waiter.
584    pub fn new() -> Self {
585        Self { inner: PortWaiter::new(WaiterOptions::empty()) }
586    }
587
588    /// Create a new waiter with the given options.
589    pub fn with_options(options: WaiterOptions) -> Self {
590        Self { inner: PortWaiter::new(options) }
591    }
592
593    /// Create a weak reference to this waiter.
594    fn weak(&self) -> WaiterRef {
595        WaiterRef::from_port(&self.inner)
596    }
597
598    /// Freeze the task until the waiter is woken up.
599    ///
600    /// No signal, e.g. EINTR (interrupt), should be received.
601    pub fn freeze<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask)
602    where
603        L: LockEqualOrBefore<FileOpsCore>,
604    {
605        while self
606            .inner
607            .wait_until(
608                locked,
609                current_task,
610                RunState::Frozen(self.clone()),
611                zx::MonotonicInstant::INFINITE,
612            )
613            .is_err()
614        {
615            // Avoid attempting to freeze the task if there is a pending SIGKILL.
616            if current_task.read().has_signal_pending(SIGKILL) {
617                break;
618            }
619            // Ignore spurious wakeups from the [`PortEvent.futex`]
620        }
621    }
622
623    /// Wait until the waiter is woken up.
624    ///
625    /// If the wait is interrupted (see [`Waiter::interrupt`]), this function returns EINTR.
626    pub fn wait<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask) -> Result<(), Errno>
627    where
628        L: LockEqualOrBefore<FileOpsCore>,
629    {
630        self.inner.wait_until(
631            locked,
632            current_task,
633            RunState::Waiter(WaiterRef::from_port(&self.inner)),
634            zx::MonotonicInstant::INFINITE,
635        )
636    }
637
638    /// Wait until the given deadline has passed or the waiter is woken up.
639    ///
640    /// If the wait deadline is nonzero and is interrupted (see [`Waiter::interrupt`]), this
641    /// function returns EINTR. Callers must take special care not to lose any accumulated data or
642    /// local state when EINTR is received as this is a normal and recoverable situation.
643    ///
644    /// Using a 0 deadline (no waiting, useful for draining pending events) will not wait and is
645    /// guaranteed not to issue EINTR.
646    ///
647    /// It the timeout elapses with no events, this function returns ETIMEDOUT.
648    ///
649    /// Processes at most one event. If the caller is interested in draining the events, it should
650    /// repeatedly call this function with a 0 deadline until it reports ETIMEDOUT. (This case is
651    /// why a 0 deadline must not return EINTR, as previous calls to wait_until() may have
652    /// accumulated state that would be lost when returning EINTR to userspace.)
653    ///
654    /// It is up to the caller (the "waiter") to make sure that it synchronizes with any object
655    /// that triggers an event (the "notifier"). This `Waiter` does not provide any synchronization
656    /// itself. Note that synchronization between the "waiter" the "notifier" may be provided by
657    /// the [`EventHandler`] used to handle an event iff the waiter observes the side-effects of
658    /// the handler (e.g. reading the ready list modified by [`EventHandler::Enqueue`] or
659    /// [`EventHandler::EnqueueOnce`]).
660    pub fn wait_until<L>(
661        &self,
662        locked: &mut Locked<L>,
663        current_task: &CurrentTask,
664        deadline: zx::MonotonicInstant,
665    ) -> Result<(), Errno>
666    where
667        L: LockEqualOrBefore<FileOpsCore>,
668    {
669        self.inner.wait_until(
670            locked,
671            current_task,
672            RunState::Waiter(WaiterRef::from_port(&self.inner)),
673            deadline,
674        )
675    }
676
677    fn create_wait_entry(&self, filter: WaitEvents) -> WaitEntry {
678        WaitEntry { waiter: self.weak(), filter, key: self.inner.next_key() }
679    }
680
681    fn create_wait_entry_with_handler(
682        &self,
683        filter: WaitEvents,
684        handler: EventHandler,
685    ) -> WaitEntry {
686        let key = self.inner.register_callback(WaitCallback::EventHandler(handler));
687        WaitEntry { waiter: self.weak(), filter, key }
688    }
689
690    pub fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
691        self.inner.wake_immediately(events, handler);
692    }
693
694    /// Establish an asynchronous wait for the signals on the given Zircon handle (not to be
695    /// confused with POSIX signals), optionally running a FnOnce.
696    ///
697    /// Returns a `PortWaitCanceler` that can be used to cancel the wait.
698    pub fn wake_on_zircon_signals(
699        &self,
700        handle: &dyn zx::AsHandleRef,
701        zx_signals: zx::Signals,
702        handler: SignalHandler,
703    ) -> Result<PortWaitCanceler, zx::Status> {
704        self.inner.wake_on_zircon_signals(handle, zx_signals, handler)
705    }
706
707    /// Return a WaitCanceler representing a wait that will never complete. Useful for stub
708    /// implementations that should block forever even though a real implementation would wake up
709    /// eventually.
710    pub fn fake_wait(&self) -> WaitCanceler {
711        WaitCanceler::new_noop()
712    }
713
714    // Notify the waiter to wake it up without signalling any events.
715    pub fn notify(&self) {
716        self.inner.notify();
717    }
718
719    /// Interrupt the waiter to deliver a signal. The wait operation will return EINTR, and a
720    /// typical caller should then unwind to the syscall dispatch loop to let the signal be
721    /// processed. See wait_until() for more details.
722    ///
723    /// Ignored if the waiter was created with new_ignoring_signals().
724    pub fn interrupt(&self) {
725        self.inner.interrupt();
726    }
727}
728
729impl Drop for Waiter {
730    fn drop(&mut self) {
731        // Delete ourselves from each wait queue we know we're on to prevent Weak references to
732        // ourself from sticking around forever.
733        let wait_queues = std::mem::take(&mut *self.inner.wait_queues.lock()).into_values();
734        for wait_queue in wait_queues {
735            if let Some(wait_queue) = wait_queue.upgrade() {
736                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != *self)
737            }
738        }
739    }
740}
741
742impl Default for Waiter {
743    fn default() -> Self {
744        Self::new()
745    }
746}
747
748impl PartialEq for Waiter {
749    fn eq(&self, other: &Self) -> bool {
750        Arc::ptr_eq(&self.inner, &other.inner)
751    }
752}
753
754pub struct SimpleWaiter {
755    event: Arc<InterruptibleEvent>,
756    wait_queues: Vec<Weak<Mutex<WaitQueueImpl>>>,
757}
758
759impl SimpleWaiter {
760    pub fn new(event: &Arc<InterruptibleEvent>) -> (SimpleWaiter, EventWaitGuard<'_>) {
761        (SimpleWaiter { event: event.clone(), wait_queues: Default::default() }, event.begin_wait())
762    }
763}
764
765impl Drop for SimpleWaiter {
766    fn drop(&mut self) {
767        for wait_queue in &self.wait_queues {
768            if let Some(wait_queue) = wait_queue.upgrade() {
769                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != self.event)
770            }
771        }
772    }
773}
774
775#[derive(Debug, Clone)]
776enum WaiterKind {
777    Port(Weak<PortWaiter>),
778    Event(Weak<InterruptibleEvent>),
779    AbortHandle(Weak<futures::stream::AbortHandle>),
780}
781
782impl Default for WaiterKind {
783    fn default() -> Self {
784        WaiterKind::Port(Default::default())
785    }
786}
787
788/// A weak reference to a Waiter. Intended for holding in wait queues or stashing elsewhere for
789/// calling queue_events later.
790#[derive(Debug, Default, Clone)]
791pub struct WaiterRef(WaiterKind);
792
793impl WaiterRef {
794    fn from_port(waiter: &Arc<PortWaiter>) -> WaiterRef {
795        WaiterRef(WaiterKind::Port(Arc::downgrade(waiter)))
796    }
797
798    fn from_event(event: &Arc<InterruptibleEvent>) -> WaiterRef {
799        WaiterRef(WaiterKind::Event(Arc::downgrade(event)))
800    }
801
802    pub fn from_abort_handle(handle: &Arc<futures::stream::AbortHandle>) -> WaiterRef {
803        WaiterRef(WaiterKind::AbortHandle(Arc::downgrade(handle)))
804    }
805
806    pub fn is_valid(&self) -> bool {
807        match &self.0 {
808            WaiterKind::Port(waiter) => waiter.strong_count() != 0,
809            WaiterKind::Event(event) => event.strong_count() != 0,
810            WaiterKind::AbortHandle(handle) => handle.strong_count() != 0,
811        }
812    }
813
814    pub fn interrupt(&self) {
815        match &self.0 {
816            WaiterKind::Port(waiter) => {
817                if let Some(waiter) = waiter.upgrade() {
818                    waiter.interrupt();
819                }
820            }
821            WaiterKind::Event(event) => {
822                if let Some(event) = event.upgrade() {
823                    event.interrupt();
824                }
825            }
826            WaiterKind::AbortHandle(handle) => {
827                if let Some(handle) = handle.upgrade() {
828                    handle.abort();
829                }
830            }
831        }
832    }
833
834    fn remove_callback(&self, key: &WaitKey) {
835        match &self.0 {
836            WaiterKind::Port(waiter) => {
837                if let Some(waiter) = waiter.upgrade() {
838                    waiter.remove_callback(key);
839                }
840            }
841            _ => (),
842        }
843    }
844
845    /// Attempts to upgrade a waiter ref to a notifiable ref. If the waiter ref is no
846    /// longer valid, returns None.
847    fn upgrade_notifiable(&self) -> Option<NotifiableRef> {
848        match &self.0 {
849            WaiterKind::Port(waiter) => {
850                if let Some(waiter) = waiter.upgrade() {
851                    return Some(NotifiableRef::Port(waiter));
852                }
853            }
854            WaiterKind::Event(event) => {
855                if let Some(event) = event.upgrade() {
856                    return Some(NotifiableRef::Event(event));
857                }
858            }
859            WaiterKind::AbortHandle(handle) => {
860                if let Some(handle) = handle.upgrade() {
861                    return Some(NotifiableRef::AbortHandle(handle));
862                }
863            }
864        }
865        None
866    }
867
868    /// Called by the WaitQueue when this waiter is about to be removed from the queue.
869    ///
870    /// TODO(abarth): This function does not appear to be called when the WaitQueue is dropped,
871    /// which appears to be a leak.
872    fn will_remove_from_wait_queue(&self, key: &WaitKey) {
873        match &self.0 {
874            WaiterKind::Port(waiter) => {
875                if let Some(waiter) = waiter.upgrade() {
876                    waiter.wait_queues.lock().remove(key);
877                }
878            }
879            _ => (),
880        }
881    }
882}
883
884impl PartialEq<Waiter> for WaiterRef {
885    fn eq(&self, other: &Waiter) -> bool {
886        match &self.0 {
887            WaiterKind::Port(waiter) => waiter.as_ptr() == Arc::as_ptr(&other.inner),
888            _ => false,
889        }
890    }
891}
892
893impl PartialEq<Arc<InterruptibleEvent>> for WaiterRef {
894    fn eq(&self, other: &Arc<InterruptibleEvent>) -> bool {
895        match &self.0 {
896            WaiterKind::Event(event) => event.as_ptr() == Arc::as_ptr(other),
897            _ => false,
898        }
899    }
900}
901
902impl PartialEq for WaiterRef {
903    fn eq(&self, other: &WaiterRef) -> bool {
904        match (&self.0, &other.0) {
905            (WaiterKind::Port(lhs), WaiterKind::Port(rhs)) => Weak::ptr_eq(lhs, rhs),
906            (WaiterKind::Event(lhs), WaiterKind::Event(rhs)) => Weak::ptr_eq(lhs, rhs),
907            (WaiterKind::AbortHandle(lhs), WaiterKind::AbortHandle(rhs)) => Weak::ptr_eq(lhs, rhs),
908            _ => false,
909        }
910    }
911}
912
913impl NotifiableRef {
914    fn notify(&self, key: &WaitKey, events: WaitEvents) {
915        match self {
916            NotifiableRef::Port(port_waiter) => port_waiter.queue_events(key, events),
917            NotifiableRef::Event(interruptible_event) => interruptible_event.notify(),
918            NotifiableRef::AbortHandle(handle) => handle.abort(),
919        }
920    }
921}
922
923/// A list of waiters waiting for some event.
924///
925/// For events that are generated inside Starnix, we walk the wait queue
926/// on the thread that triggered the event to notify the waiters that the event
927/// has occurred. The waiters will then wake up on their own thread to handle
928/// the event.
929#[derive(Default, Debug, Clone)]
930pub struct WaitQueue(Arc<Mutex<WaitQueueImpl>>);
931
932#[derive(Debug)]
933struct WaitEntryWithId {
934    entry: WaitEntry,
935    /// The ID use to uniquely identify this wait entry even if it shares the
936    /// key used in the wait queue's [`Slab`] with another wait entry since a
937    /// slab's keys are recycled.
938    id: u64,
939}
940
941struct WaitEntryId {
942    key: usize,
943    id: u64,
944}
945
946#[derive(Default, Debug)]
947struct WaitQueueImpl {
948    /// Holds the next ID value to use when adding a new `WaitEntry` to the
949    /// waiters (dense) map.
950    ///
951    /// A [`Slab`]s keys are recycled so we use the ID to uniquely identify a
952    /// wait entry.
953    next_wait_entry_id: u64,
954    /// The list of waiters.
955    ///
956    /// The waiter's wait_queues lock is nested inside this lock.
957    waiters: Slab<WaitEntryWithId>,
958}
959
960/// An entry in a WaitQueue.
961#[derive(Debug)]
962struct WaitEntry {
963    /// The waiter that is waking for the FdEvent.
964    waiter: WaiterRef,
965
966    /// The events that the waiter is waiting for.
967    filter: WaitEvents,
968
969    /// key for cancelling and queueing events
970    key: WaitKey,
971}
972
973impl WaitQueue {
974    fn add_waiter(&self, entry: WaitEntry) -> WaitEntryId {
975        let mut wait_queue = self.0.lock();
976        let id = wait_queue
977            .next_wait_entry_id
978            .checked_add(1)
979            .expect("all possible wait entry ID values exhausted");
980        wait_queue.next_wait_entry_id = id;
981        WaitEntryId { key: wait_queue.waiters.insert(WaitEntryWithId { entry, id }), id }
982    }
983
984    /// Establish a wait for the given entry.
985    ///
986    /// The waiter will be notified when an event matching the entry occurs.
987    ///
988    /// This function does not actually block the waiter. To block the waiter,
989    /// call the [`Waiter::wait`] function on the waiter.
990    ///
991    /// Returns a `WaitCanceler` that can be used to cancel the wait.
992    fn wait_async_entry(&self, waiter: &Waiter, entry: WaitEntry) -> WaitCanceler {
993        let wait_key = entry.key;
994        let waiter_id = self.add_waiter(entry);
995        let wait_queue = Arc::downgrade(&self.0);
996        waiter.inner.wait_queues.lock().insert(wait_key, wait_queue.clone());
997        WaitCanceler::new_inner(WaitCancelerInner::Queue(WaitCancelerQueue {
998            wait_queue,
999            waiter: waiter.weak(),
1000            wait_key,
1001            waiter_id,
1002        }))
1003    }
1004
1005    /// Establish a wait for the given value event.
1006    ///
1007    /// The waiter will be notified when an event with the same value occurs.
1008    ///
1009    /// This function does not actually block the waiter. To block the waiter,
1010    /// call the [`Waiter::wait`] function on the waiter.
1011    ///
1012    /// Returns a `WaitCanceler` that can be used to cancel the wait.
1013    pub fn wait_async_value(&self, waiter: &Waiter, value: u64) -> WaitCanceler {
1014        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::Value(value)))
1015    }
1016
1017    /// Establish a wait for the given value event with an associated handler.
1018    ///
1019    /// The waiter will be notified when an event with the same value occurs, triggering the handler.
1020    ///
1021    /// This function does not actually block the waiter. To block the waiter,
1022    /// call the [`Waiter::wait`] function on the waiter.
1023    ///
1024    /// Returns a `WaitCanceler` that can be used to cancel the wait.
1025    pub fn wait_async_value_with_handler(
1026        &self,
1027        waiter: &Waiter,
1028        value: u64,
1029        handler: EventHandler,
1030    ) -> WaitCanceler {
1031        let entry = waiter.create_wait_entry_with_handler(WaitEvents::Value(value), handler);
1032        self.wait_async_entry(waiter, entry)
1033    }
1034
1035    /// Establish a wait for the given FdEvents.
1036    ///
1037    /// The waiter will be notified when an event matching the `events` occurs.
1038    ///
1039    /// This function does not actually block the waiter. To block the waiter,
1040    /// call the [`Waiter::wait`] function on the waiter.
1041    ///
1042    /// Returns a `WaitCanceler` that can be used to cancel the wait.
1043    pub fn wait_async_fd_events(
1044        &self,
1045        waiter: &Waiter,
1046        events: FdEvents,
1047        handler: EventHandler,
1048    ) -> WaitCanceler {
1049        let entry = waiter.create_wait_entry_with_handler(WaitEvents::Fd(events), handler);
1050        self.wait_async_entry(waiter, entry)
1051    }
1052
1053    /// Establish a wait for a particular signal mask.
1054    ///
1055    /// The waiter will be notified when a signal in the mask is received.
1056    ///
1057    /// This function does not actually block the waiter. To block the waiter,
1058    /// call the [`Waiter::wait`] function on the waiter.
1059    ///
1060    /// Returns a `WaitCanceler` that can be used to cancel the wait.
1061    pub fn wait_async_signal_mask(
1062        &self,
1063        waiter: &Waiter,
1064        mask: SigSet,
1065        handler: EventHandler,
1066    ) -> WaitCanceler {
1067        let entry = waiter.create_wait_entry_with_handler(WaitEvents::SignalMask(mask), handler);
1068        self.wait_async_entry(waiter, entry)
1069    }
1070
1071    /// Establish a wait for any event.
1072    ///
1073    /// The waiter will be notified when any event occurs.
1074    ///
1075    /// This function does not actually block the waiter. To block the waiter,
1076    /// call the [`Waiter::wait`] function on the waiter.
1077    ///
1078    /// Returns a `WaitCanceler` that can be used to cancel the wait.
1079    pub fn wait_async(&self, waiter: &Waiter) -> WaitCanceler {
1080        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::All))
1081    }
1082
1083    pub fn wait_async_simple(&self, waiter: &mut SimpleWaiter) {
1084        let entry = WaitEntry {
1085            waiter: WaiterRef::from_event(&waiter.event),
1086            filter: WaitEvents::All,
1087            key: Default::default(),
1088        };
1089        waiter.wait_queues.push(Arc::downgrade(&self.0));
1090        self.add_waiter(entry);
1091    }
1092
1093    fn notify_events_count(&self, mut events: WaitEvents, mut limit: usize) -> usize {
1094        if let WaitEvents::Fd(ref mut fd_events) = events {
1095            *fd_events = fd_events.add_equivalent_fd_events();
1096        }
1097        // Store references to waiters ready to be notified locally so that we can drop our waiters
1098        // lock before notifying the waiters. The waiters will need to acquire the waiters lock once
1099        // they wake up in order to remove themselves from the queue and so they might contend
1100        // with this thread for that lock.
1101        // Usually we expect to notify at most a single waiter.
1102        let mut notifiable_refs = SmallVec::<[(NotifiableRef, WaitKey); 1]>::new();
1103        let mut woken = 0;
1104        {
1105            let mut guard = self.0.lock();
1106            guard.waiters.retain(|_, WaitEntryWithId { entry, id: _ }| {
1107                if limit > 0 && entry.filter.intercept(&events) {
1108                    if let Some(notifiable_ref) = entry.waiter.upgrade_notifiable() {
1109                        limit -= 1;
1110                        woken += 1;
1111                        notifiable_refs.push((notifiable_ref, entry.key));
1112                    }
1113
1114                    entry.waiter.will_remove_from_wait_queue(&entry.key);
1115                    false
1116                } else {
1117                    true
1118                }
1119            });
1120        }
1121        for (notifiable_ref, key) in notifiable_refs {
1122            notifiable_ref.notify(&key, events);
1123        }
1124        woken
1125    }
1126
1127    pub fn notify_fd_events(&self, events: FdEvents) {
1128        self.notify_events_count(WaitEvents::Fd(events), usize::MAX);
1129    }
1130
1131    pub fn notify_fd_events_count(&self, events: FdEvents, limit: usize) {
1132        self.notify_events_count(WaitEvents::Fd(events), limit);
1133    }
1134
1135    pub fn notify_signal(&self, signal: &Signal) {
1136        let event = WaitEvents::SignalMask(SigSet::from(*signal));
1137        self.notify_events_count(event, usize::MAX);
1138    }
1139
1140    pub fn notify_value(&self, value: u64) {
1141        self.notify_events_count(WaitEvents::Value(value), usize::MAX);
1142    }
1143
1144    pub fn notify_unordered_count(&self, limit: usize) {
1145        self.notify_events_count(WaitEvents::All, limit);
1146    }
1147
1148    pub fn notify_all(&self) {
1149        self.notify_unordered_count(usize::MAX);
1150    }
1151
1152    /// Returns whether there is no active waiters waiting on this `WaitQueue`.
1153    pub fn is_empty(&self) -> bool {
1154        self.0.lock().waiters.is_empty()
1155    }
1156}
1157
1158/// A wait queue that dispatches events based on the value of an enum.
1159pub struct TypedWaitQueue<T: Into<u64>> {
1160    wait_queue: WaitQueue,
1161    value_type: std::marker::PhantomData<T>,
1162}
1163
1164// We can't #[derive(Default)] on [TypedWaitQueue<T>] as T may not implement the Default trait.
1165impl<T: Into<u64>> Default for TypedWaitQueue<T> {
1166    fn default() -> Self {
1167        Self { wait_queue: Default::default(), value_type: Default::default() }
1168    }
1169}
1170
1171impl<T: Into<u64>> TypedWaitQueue<T> {
1172    pub fn wait_async_value(&self, waiter: &Waiter, value: T) -> WaitCanceler {
1173        self.wait_queue.wait_async_value(waiter, value.into())
1174    }
1175
1176    pub fn wait_async_value_with_handler(
1177        &self,
1178        waiter: &Waiter,
1179        value: T,
1180        handler: EventHandler,
1181    ) -> WaitCanceler {
1182        self.wait_queue.wait_async_value_with_handler(waiter, value.into(), handler)
1183    }
1184
1185    pub fn notify_value(&self, value: T) {
1186        self.wait_queue.notify_value(value.into())
1187    }
1188}
1189
1190#[cfg(test)]
1191mod tests {
1192    use super::*;
1193    use crate::fs::fuchsia::create_fuchsia_pipe;
1194    use crate::signals::SignalInfo;
1195    use crate::task::TaskFlags;
1196    use crate::testing::{spawn_kernel_and_run, spawn_kernel_and_run_sync};
1197    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
1198    use crate::vfs::eventfd::{EventFdType, new_eventfd};
1199    use assert_matches::assert_matches;
1200    use starnix_sync::Unlocked;
1201    use starnix_uapi::open_flags::OpenFlags;
1202    use starnix_uapi::signals::SIGUSR1;
1203
1204    const KEY: ReadyItemKey = ReadyItemKey::Usize(1234);
1205
1206    #[::fuchsia::test]
1207    async fn test_async_wait_exec() {
1208        spawn_kernel_and_run(async |locked, current_task| {
1209            let (local_socket, remote_socket) = zx::Socket::create_stream();
1210            let pipe =
1211                create_fuchsia_pipe(locked, &current_task, remote_socket, OpenFlags::RDWR).unwrap();
1212
1213            const MEM_SIZE: usize = 1024;
1214            let mut output_buffer = VecOutputBuffer::new(MEM_SIZE);
1215
1216            let test_string = "hello startnix".to_string();
1217            let queue: Arc<LockDepMutex<VecDeque<ReadyItem>, EventHandlerReadyQueueLock>> =
1218                Default::default();
1219            let handler = EventHandler::Enqueue {
1220                key: KEY,
1221                queue: queue.clone(),
1222                sought_events: FdEvents::all(),
1223            };
1224            let waiter = Waiter::new();
1225            pipe.wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
1226                .expect("wait_async");
1227            let test_string_clone = test_string.clone();
1228
1229            let write_count = AtomicCounter::<usize>::default();
1230            std::thread::scope(|s| {
1231                let thread = s.spawn(|| {
1232                    let test_data = test_string_clone.as_bytes();
1233                    let no_written = local_socket.write(test_data).unwrap();
1234                    assert_eq!(0, write_count.add(no_written));
1235                    assert_eq!(no_written, test_data.len());
1236                });
1237
1238                // this code would block on failure
1239
1240                assert!(queue.lock().is_empty());
1241                waiter.wait(locked, &current_task).unwrap();
1242                thread.join().expect("join thread")
1243            });
1244            queue.lock().iter().for_each(|item| assert!(item.events.contains(FdEvents::POLLIN)));
1245
1246            let read_size = pipe.read(locked, &current_task, &mut output_buffer).unwrap();
1247
1248            let no_written = write_count.get();
1249            assert_eq!(no_written, read_size);
1250
1251            assert_eq!(output_buffer.data(), test_string.as_bytes());
1252        })
1253        .await;
1254    }
1255
1256    #[::fuchsia::test]
1257    async fn test_async_wait_cancel() {
1258        for do_cancel in [true, false] {
1259            spawn_kernel_and_run(async move |locked, current_task| {
1260                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
1261                let waiter = Waiter::new();
1262                let queue: Arc<LockDepMutex<VecDeque<ReadyItem>, EventHandlerReadyQueueLock>> =
1263                    Default::default();
1264                let handler = EventHandler::Enqueue {
1265                    key: KEY,
1266                    queue: queue.clone(),
1267                    sought_events: FdEvents::all(),
1268                };
1269                let wait_canceler = event
1270                    .wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
1271                    .expect("wait_async");
1272                if do_cancel {
1273                    wait_canceler.cancel();
1274                }
1275                let add_val = 1u64;
1276                assert_eq!(
1277                    event
1278                        .write(
1279                            locked,
1280                            &current_task,
1281                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
1282                        )
1283                        .unwrap(),
1284                    std::mem::size_of::<u64>()
1285                );
1286
1287                let wait_result =
1288                    waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO);
1289                let final_count = queue.lock().len();
1290                if do_cancel {
1291                    assert_eq!(wait_result, error!(ETIMEDOUT));
1292                    assert_eq!(0, final_count);
1293                } else {
1294                    assert_eq!(wait_result, Ok(()));
1295                    assert_eq!(1, final_count);
1296                }
1297            })
1298            .await;
1299        }
1300    }
1301
1302    #[::fuchsia::test]
1303    async fn single_waiter_multiple_waits_cancel_one_waiter_still_notified() {
1304        spawn_kernel_and_run(async |locked, current_task| {
1305            let wait_queue = WaitQueue::default();
1306            let waiter = Waiter::new();
1307            let wk1 = wait_queue.wait_async(&waiter);
1308            let _wk2 = wait_queue.wait_async(&waiter);
1309            wk1.cancel();
1310            wait_queue.notify_all();
1311            assert!(waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
1312        })
1313        .await;
1314    }
1315
1316    #[::fuchsia::test]
1317    async fn multiple_waiters_cancel_one_other_still_notified() {
1318        spawn_kernel_and_run(async |locked, current_task| {
1319            let wait_queue = WaitQueue::default();
1320            let waiter1 = Waiter::new();
1321            let waiter2 = Waiter::new();
1322            let wk1 = wait_queue.wait_async(&waiter1);
1323            let _wk2 = wait_queue.wait_async(&waiter2);
1324            wk1.cancel();
1325            wait_queue.notify_all();
1326            assert!(waiter1.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_err());
1327            assert!(waiter2.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
1328        })
1329        .await;
1330    }
1331
1332    #[::fuchsia::test]
1333    async fn test_wait_queue() {
1334        spawn_kernel_and_run(async |locked, current_task| {
1335            let queue = WaitQueue::default();
1336
1337            let waiters = <[Waiter; 3]>::default();
1338            waiters.iter().for_each(|w| {
1339                queue.wait_async(w);
1340            });
1341
1342            let woken = |locked: &mut Locked<Unlocked>| {
1343                waiters
1344                    .iter()
1345                    .filter(|w| {
1346                        w.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok()
1347                    })
1348                    .count()
1349            };
1350
1351            const INITIAL_NOTIFY_COUNT: usize = 2;
1352            let total_waiters = waiters.len();
1353            queue.notify_unordered_count(INITIAL_NOTIFY_COUNT);
1354            assert_eq!(INITIAL_NOTIFY_COUNT, woken(locked));
1355
1356            // Only the remaining (unnotified) waiters should be notified.
1357            queue.notify_all();
1358            assert_eq!(total_waiters - INITIAL_NOTIFY_COUNT, woken(locked));
1359        })
1360        .await;
1361    }
1362
1363    #[::fuchsia::test]
1364    async fn waiter_kind_abort_handle() {
1365        spawn_kernel_and_run_sync(|_locked, current_task| {
1366            let mut executor = fuchsia_async::TestExecutor::new();
1367            let (abort_handle, abort_registration) = futures::stream::AbortHandle::new_pair();
1368            let abort_handle = Arc::new(abort_handle);
1369            let waiter_ref = WaiterRef::from_abort_handle(&abort_handle);
1370
1371            let mut fut = futures::stream::Abortable::new(
1372                futures::future::pending::<()>(),
1373                abort_registration,
1374            );
1375
1376            assert_matches!(executor.run_until_stalled(&mut fut), std::task::Poll::Pending);
1377
1378            waiter_ref.interrupt();
1379            let output = current_task.run_in_state(RunState::Waiter(waiter_ref), move || {
1380                match executor.run_singlethreaded(&mut fut) {
1381                    Ok(()) => unreachable!("future never terminates normally"),
1382                    Err(futures::stream::Aborted) => Ok(()),
1383                }
1384            });
1385
1386            assert_eq!(output, Ok(()));
1387        })
1388        .await;
1389    }
1390
1391    #[::fuchsia::test]
1392    async fn freeze_with_pending_sigusr1() {
1393        spawn_kernel_and_run(async |_locked, current_task| {
1394            {
1395                let mut task_state = current_task.task.write();
1396                let siginfo = SignalInfo::kernel(SIGUSR1);
1397                task_state.enqueue_signal(siginfo);
1398                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
1399            }
1400
1401            let output: Result<(), Errno> = current_task
1402                .run_in_state(RunState::Event(InterruptibleEvent::new()), move || {
1403                    unreachable!("callback should not be called")
1404                });
1405            assert_eq!(output, error!(EINTR));
1406
1407            let output = current_task.run_in_state(RunState::Frozen(Waiter::new()), move || Ok(()));
1408            assert_eq!(output, Ok(()));
1409        })
1410        .await;
1411    }
1412
1413    #[::fuchsia::test]
1414    async fn freeze_with_pending_sigkill() {
1415        spawn_kernel_and_run(async |_locked, current_task| {
1416            {
1417                let mut task_state = current_task.task.write();
1418                let siginfo = SignalInfo::kernel(SIGKILL);
1419                task_state.enqueue_signal(siginfo);
1420                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
1421            }
1422
1423            let output: Result<(), _> = current_task
1424                .run_in_state(RunState::Frozen(Waiter::new()), move || {
1425                    unreachable!("callback should not be called")
1426                });
1427            assert_eq!(output, error!(EINTR));
1428        })
1429        .await;
1430    }
1431
1432    #[::fuchsia::test]
1433    async fn test_async_typed_wait_value_with_handler() {
1434        spawn_kernel_and_run(async |locked, current_task| {
1435            let queue: Arc<LockDepMutex<VecDeque<ReadyItem>, EventHandlerReadyQueueLock>> =
1436                Default::default();
1437            let handler = EventHandler::Enqueue {
1438                key: KEY,
1439                queue: queue.clone(),
1440                sought_events: FdEvents::all(),
1441            };
1442            let waiter = Waiter::new();
1443            let wait_queue = TypedWaitQueue::<u64>::default();
1444
1445            let test_value = 100u64;
1446            let _wait_canceler =
1447                wait_queue.wait_async_value_with_handler(&waiter, test_value, handler);
1448
1449            assert!(queue.lock().is_empty());
1450
1451            // Notify wrong value
1452            wait_queue.notify_value(test_value + 1);
1453            assert!(queue.lock().is_empty());
1454
1455            // Notify right value
1456            wait_queue.notify_value(test_value);
1457
1458            waiter.wait(locked, &current_task).expect("wait failed");
1459
1460            // Result delivered via POLLIN logic in central dispatcher
1461            let ready_items = queue.lock();
1462            assert_eq!(ready_items.len(), 1);
1463            assert!(ready_items[0].events.contains(FdEvents::POLLIN));
1464        })
1465        .await;
1466    }
1467}