// starnix_core/task/waiter.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::signals::RunState;
6use crate::task::CurrentTask;
7use crate::vfs::{EpollEventHandler, FdNumber};
8use bitflags::bitflags;
9use futures::stream::AbortHandle;
10use slab::Slab;
11use smallvec::SmallVec;
12use starnix_lifecycle::{AtomicU64Counter, AtomicUsizeCounter};
13use starnix_sync::{
14    EventWaitGuard, FileOpsCore, InterruptibleEvent, LockEqualOrBefore, Locked, Mutex, NotifyKind,
15    PortEvent, PortWaitResult,
16};
17use starnix_types::ownership::debug_assert_no_local_temp_ref;
18use starnix_uapi::error;
19use starnix_uapi::errors::{EINTR, Errno};
20use starnix_uapi::signals::{SIGKILL, SigSet, Signal};
21use starnix_uapi::vfs::FdEvents;
22use std::collections::{HashMap, VecDeque};
23use std::sync::{Arc, Weak};
24use syncio::zxio::zxio_signals_t;
25use syncio::{ZxioSignals, ZxioWeak};
26
/// Identifies which waited-on object became ready in a ready list.
///
/// The key is either a file descriptor number or a plain index, depending on
/// what the registering code chose.
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum ReadyItemKey {
    FdNumber(FdNumber),
    Usize(usize),
}
32
33impl From<FdNumber> for ReadyItemKey {
34    fn from(v: FdNumber) -> Self {
35        Self::FdNumber(v)
36    }
37}
38
39impl From<usize> for ReadyItemKey {
40    fn from(v: usize) -> Self {
41        Self::Usize(v)
42    }
43}
44
/// A single entry in a ready list: the key identifying the waited-on object
/// together with the events that were observed for it.
#[derive(Debug, Copy, Clone)]
pub struct ReadyItem {
    pub key: ReadyItemKey,
    pub events: FdEvents,
}
50
/// Strategy for delivering a set of observed `FdEvents` to a waiter.
#[derive(Clone)]
pub enum EventHandler {
    /// Does nothing.
    ///
    /// It is up to the waiter to synchronize itself with the notifier if
    /// synchronization is needed.
    None,

    /// Enqueues an event to a ready list.
    ///
    /// This event handler naturally synchronizes the notifier and notifee
    /// because of the lock acquired/released when enqueuing the event.
    Enqueue { key: ReadyItemKey, queue: Arc<Mutex<VecDeque<ReadyItem>>>, sought_events: FdEvents },

    /// Wraps another EventHandler and only triggers it once. Further .handle() calls are ignored.
    ///
    /// This is intended for cases like BinderFileObject which need to register
    /// the same EventHandler on multiple wait queues.
    HandleOnce(Arc<Mutex<Option<EventHandler>>>),

    /// This handler is an epoll.
    Epoll(EpollEventHandler),
}
74
75impl EventHandler {
76    pub fn handle(self, events: FdEvents) {
77        match self {
78            Self::None => {}
79            Self::Enqueue { key, queue, sought_events } => {
80                let events = events & sought_events;
81                queue.lock().push_back(ReadyItem { key, events });
82            }
83            Self::HandleOnce(inner) => {
84                if let Some(inner) = inner.lock().take() {
85                    inner.handle(events);
86                }
87            }
88            Self::Epoll(e) => e.handle(events),
89        }
90    }
91}
92
/// Translates Zircon signals observed on a zxio-backed object into `FdEvents`.
pub struct ZxioSignalHandler {
    // Weak reference to the zxio object; handling is skipped if it was dropped.
    pub zxio: ZxioWeak,
    // Conversion from the zxio signal bits returned by `wait_end` to `FdEvents`.
    pub get_events_from_zxio_signals: fn(zxio_signals_t) -> FdEvents,
}
97
// The counter is incremented as each handle is signaled; when the counter reaches the handle
// count, the event handler is called with the given events.
pub struct ManyZxHandleSignalHandler {
    // Total number of handles that must all be signaled.
    pub count: usize,
    // Shared count of handles signaled so far.
    pub counter: Arc<AtomicUsizeCounter>,
    // The signals each handle is expected to report.
    pub expected_signals: zx::Signals,
    // Events delivered once all `count` handles have been signaled.
    pub events: FdEvents,
}
106
/// How observed Zircon signals are converted into `FdEvents` for delivery.
pub enum SignalHandlerInner {
    /// Produce no events; only the handler's error code (if any) is used.
    None,
    /// Re-read the zxio object's signal state and convert it to events.
    Zxio(ZxioSignalHandler),
    /// Convert the observed Zircon signals directly to events.
    ZxHandle(fn(zx::Signals) -> FdEvents),
    /// Fire events only after all of a group of handles have been signaled.
    ManyZxHandle(ManyZxHandleSignalHandler),
}
113
/// Callback invoked when a Zircon signal wait completes.
pub struct SignalHandler {
    // Converts the observed signals into `FdEvents` (or suppresses them).
    pub inner: SignalHandlerInner,
    // Receives the converted events, if any were produced.
    pub event_handler: EventHandler,
    // Optional error to surface to the blocked waiter after handling.
    pub err_code: Option<Errno>,
}
119
120impl SignalHandler {
121    fn handle(self, signals: zx::Signals) -> Option<Errno> {
122        let SignalHandler { inner, event_handler, err_code } = self;
123        let events = match inner {
124            SignalHandlerInner::None => None,
125            SignalHandlerInner::Zxio(ZxioSignalHandler { zxio, get_events_from_zxio_signals }) => {
126                if let Some(zxio) = zxio.upgrade() {
127                    Some(get_events_from_zxio_signals(zxio.wait_end(signals)))
128                } else {
129                    None
130                }
131            }
132            SignalHandlerInner::ZxHandle(get_events_from_zx_signals) => {
133                Some(get_events_from_zx_signals(signals))
134            }
135            SignalHandlerInner::ManyZxHandle(signal_handler) => {
136                if signals.contains(signal_handler.expected_signals) {
137                    let new_count = signal_handler.counter.next() + 1;
138                    assert!(new_count <= signal_handler.count);
139                    if new_count == signal_handler.count {
140                        Some(signal_handler.events)
141                    } else {
142                        None
143                    }
144                } else {
145                    None
146                }
147            }
148        };
149        if let Some(events) = events {
150            event_handler.handle(events)
151        }
152        err_code
153    }
154}
155
/// A callback registered with a `PortWaiter`, keyed by `WaitKey`.
///
/// Signal handlers run for Zircon handle waits; event handlers run for
/// starnix-level event notifications. Mixing them up is a programming error.
pub enum WaitCallback {
    SignalHandler(SignalHandler),
    EventHandler(EventHandler),
}
160
/// Everything needed to withdraw one `WaitEntry` from one `WaitQueue`.
struct WaitCancelerQueue {
    // The queue the entry lives in; the cancel is a no-op if it was dropped.
    wait_queue: Weak<Mutex<WaitQueueImpl>>,
    // The waiter that owns the entry.
    waiter: WaiterRef,
    // The callback key registered on the waiter for this wait.
    wait_key: WaitKey,
    // Slab key + unique id of the entry inside the queue (ids guard against
    // slab key reuse; see `WaitCanceler::cancel`).
    waiter_id: WaitEntryId,
}
167
/// Cancels a port wait on a zxio-backed object, rearming/disarming the object
/// around the cancellation (see `WaitCanceler::cancel`).
struct WaitCancelerZxio {
    zxio: ZxioWeak,
    inner: PortWaitCanceler,
}
172
/// Cancels a plain port wait with no extra bookkeeping.
struct WaitCancelerPort {
    inner: PortWaitCanceler,
}
176
/// One individually cancelable wait registration held by a `WaitCanceler`.
enum WaitCancelerInner {
    Zxio(WaitCancelerZxio),
    Queue(WaitCancelerQueue),
    Port(WaitCancelerPort),
}
182
/// A strong reference to a waiter that can be notified, obtained by upgrading
/// a `WaiterRef` (see `WaiterRef::upgrade_notifiable`).
enum NotifiableRef {
    Port(Arc<PortWaiter>),
    Event(Arc<InterruptibleEvent>),
    AbortHandle(Arc<AbortHandle>),
}
188
// Inline capacity of `WaitCanceler::cancellers`; `WaitCanceler::merge` asserts
// the merged set stays within this bound so the SmallVec never heap-allocates.
const WAIT_CANCELER_COMMON_SIZE: usize = 2;
190
/// Return values for wait_async methods.
///
/// Calling `cancel` will cancel any running wait.
///
/// Does not implement `Clone` or `Copy` so that only a single canceler exists
/// per wait.
pub struct WaitCanceler {
    // One entry per underlying wait registration; usually at most
    // `WAIT_CANCELER_COMMON_SIZE`, keeping the storage inline.
    cancellers: smallvec::SmallVec<[WaitCancelerInner; WAIT_CANCELER_COMMON_SIZE]>,
}
200
impl WaitCanceler {
    /// Wraps a single underlying canceler.
    fn new_inner(inner: WaitCancelerInner) -> Self {
        Self { cancellers: smallvec::smallvec![inner] }
    }

    /// Returns a canceler whose `cancel` does nothing.
    pub fn new_noop() -> Self {
        Self { cancellers: Default::default() }
    }

    /// Wraps a port-wait canceler for a zxio-backed object.
    pub fn new_zxio(zxio: ZxioWeak, inner: PortWaitCanceler) -> Self {
        Self::new_inner(WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }))
    }

    /// Wraps a plain port-wait canceler.
    pub fn new_port(inner: PortWaitCanceler) -> Self {
        Self::new_inner(WaitCancelerInner::Port(WaitCancelerPort { inner }))
    }

    /// Equivalent to `merge_unbounded`, except that it enforces that the resulting vector of
    /// cancellers is small enough to avoid being separately allocated on the heap.
    ///
    /// If possible, use this function instead of `merge_unbounded`, because it gives us better
    /// tools to keep this code path optimized.
    pub fn merge(self, other: Self) -> Self {
        // Increase `WAIT_CANCELER_COMMON_SIZE` if needed, or remove this assert and allow the
        // smallvec to allocate.
        assert!(
            self.cancellers.len() + other.cancellers.len() <= WAIT_CANCELER_COMMON_SIZE,
            "WaitCanceler::merge disallows more than {} cancellers, found {} + {}",
            WAIT_CANCELER_COMMON_SIZE,
            self.cancellers.len(),
            other.cancellers.len()
        );
        WaitCanceler::merge_unbounded(self, other)
    }

    /// Creates a new `WaitCanceler` that is equivalent to canceling both its arguments.
    pub fn merge_unbounded(
        Self { mut cancellers }: Self,
        Self { cancellers: mut other }: Self,
    ) -> Self {
        cancellers.append(&mut other);
        WaitCanceler { cancellers }
    }

    /// Cancel the pending wait.
    ///
    /// Takes `self` by value since a wait can only be canceled once.
    pub fn cancel(self) {
        let Self { cancellers } = self;
        // Cancel in reverse of registration order.
        for canceller in cancellers.into_iter().rev() {
            match canceller {
                WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }) => {
                    // NOTE(review): this `return` (and the one below) abandons any
                    // remaining cancellers when a weak ref fails to upgrade —
                    // `continue` looks like the intent for merged cancelers; confirm.
                    let Some(zxio) = zxio.upgrade() else { return };
                    // Re-arm/disarm the zxio object around the port cancellation.
                    let (_, signals) = zxio.wait_begin(ZxioSignals::NONE.bits());
                    inner.cancel();
                    zxio.wait_end(signals);
                }
                WaitCancelerInner::Queue(WaitCancelerQueue {
                    wait_queue,
                    waiter,
                    wait_key,
                    waiter_id: WaitEntryId { key, id },
                }) => {
                    let Some(wait_queue) = wait_queue.upgrade() else { return };
                    waiter.remove_callback(&wait_key);
                    waiter.will_remove_from_wait_queue(&wait_key);
                    let mut wait_queue = wait_queue.lock();
                    let waiters = &mut wait_queue.waiters;
                    if let Some(entry) = waiters.get_mut(key) {
                        // The map of waiters in a wait queue uses a `Slab` which
                        // recycles keys. To make sure we are removing the right
                        // entry, make sure the ID value matches what we expect
                        // to remove.
                        if entry.id == id {
                            waiters.remove(key);
                        }
                    }
                }
                WaitCancelerInner::Port(WaitCancelerPort { inner }) => {
                    inner.cancel();
                }
            }
        }
    }
}
286
/// Return values for wait_async methods that monitor the state of a handle.
///
/// Calling `cancel` will cancel any running wait.
///
/// Does not implement `Clone` or `Copy` so that only a single canceler exists
/// per wait.
pub struct PortWaitCanceler {
    // The waiter the wait was registered on; a no-op if it has been dropped.
    waiter: Weak<PortWaiter>,
    // Key of the registered callback / port packet for this wait.
    key: WaitKey,
}
297
298impl PortWaitCanceler {
299    /// Cancel the pending wait.
300    ///
301    /// Takes `self` by value since a wait can only be canceled once.
302    pub fn cancel(self) {
303        let Self { waiter, key } = self;
304        if let Some(waiter) = waiter.upgrade() {
305            let _ = waiter.port.cancel(key.raw);
306            waiter.remove_callback(&key);
307        }
308    }
309}
310
/// Opaque, per-`PortWaiter` identifier for a registered callback or wait
/// entry. Also used as the packet key on the underlying port. The value 0 is
/// reserved (see `PortWaiter::next_key`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
struct WaitKey {
    raw: u64,
}
315
/// The different type of event that can be waited on / triggered.
#[derive(Clone, Copy, Debug)]
enum WaitEvents {
    /// All event: a wait on `All` will be woken up by all event, and a trigger on `All` will wake
    /// every waiter.
    All,
    /// Wait on the set of FdEvents.
    Fd(FdEvents),
    /// Wait for the specified value.
    Value(u64),
    /// Wait for a signal in a specific mask to be received by the task.
    SignalMask(SigSet),
}
329
330impl WaitEvents {
331    /// Returns whether a wait on `self` should be woken up by `other`.
332    fn intercept(self: &WaitEvents, other: &WaitEvents) -> bool {
333        match (self, other) {
334            (Self::All, _) | (_, Self::All) => true,
335            (Self::Fd(m1), Self::Fd(m2)) => m1.bits() & m2.bits() != 0,
336            (Self::Value(v1), Self::Value(v2)) => v1 == v2,
337            // A SignalMask event can only be intercepted by another SignalMask event.
338            (Self::SignalMask(m1), Self::SignalMask(m2)) => m1.intersects(m2),
339            _ => false,
340        }
341    }
342}
343
impl WaitCallback {
    /// Returns the no-op event handler, for waits that need no delivery
    /// side effects.
    pub fn none() -> EventHandler {
        EventHandler::None
    }
}
349
bitflags! {
    /// Behavior flags supplied when constructing a `Waiter`.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub struct WaiterOptions: u8 {
        /// The wait cannot be interrupted by signals.
        const IGNORE_SIGNALS = 1;

        /// The wait is not taking place at a safe point.
        ///
        /// For example, the caller might be holding a lock, which could cause a deadlock if the
        /// waiter triggers delayed releasers.
        const UNSAFE_CALLSTACK = 2;
    }
}
363
/// Implementation of Waiter. We put the Waiter data in an Arc so that WaitQueue can tell when the
/// Waiter has been destroyed by keeping a Weak reference. But this is an implementation detail and
/// a Waiter should have a single owner. So the Arc is hidden inside Waiter.
struct PortWaiter {
    // The underlying notification primitive waited on by `wait_internal`.
    port: PortEvent,
    callbacks: Mutex<HashMap<WaitKey, WaitCallback>>, // the key 0 is reserved for 'no handler'
    // Source of fresh `WaitKey`s; starts at 1 because 0 is reserved.
    next_key: AtomicU64Counter,
    // Behavior flags chosen at construction (see `WaiterOptions`).
    options: WaiterOptions,

    /// Collection of wait queues this Waiter is waiting on, so that when the Waiter is Dropped it
    /// can remove itself from the queues.
    ///
    /// This lock is nested inside the WaitQueue.waiters lock.
    wait_queues: Mutex<HashMap<WaitKey, Weak<Mutex<WaitQueueImpl>>>>,
}
379
380impl PortWaiter {
381    /// Internal constructor.
382    fn new(options: WaiterOptions) -> Arc<Self> {
383        Arc::new(PortWaiter {
384            port: PortEvent::new(),
385            callbacks: Default::default(),
386            next_key: AtomicU64Counter::new(1),
387            options,
388            wait_queues: Default::default(),
389        })
390    }
391
392    /// Waits until the given deadline has passed or the waiter is woken up. See wait_until().
393    fn wait_internal(&self, deadline: zx::MonotonicInstant) -> Result<(), Errno> {
394        // This method can block arbitrarily long, possibly waiting for another process. The
395        // current thread should not own any local ref that might delay the release of a resource
396        // while doing so.
397        debug_assert_no_local_temp_ref();
398
399        match self.port.wait(deadline) {
400            PortWaitResult::Notification { kind: NotifyKind::Regular } => Ok(()),
401            PortWaitResult::Notification { kind: NotifyKind::Interrupt } => error!(EINTR),
402            PortWaitResult::Signal { key, observed } => {
403                if let Some(callback) = self.remove_callback(&WaitKey { raw: key }) {
404                    match callback {
405                        WaitCallback::SignalHandler(handler) => {
406                            if let Some(errno) = handler.handle(observed) {
407                                return Err(errno);
408                            }
409                        }
410                        WaitCallback::EventHandler(_) => {
411                            panic!("wrong type of handler called")
412                        }
413                    }
414                }
415
416                Ok(())
417            }
418            PortWaitResult::TimedOut => error!(ETIMEDOUT),
419        }
420    }
421
422    fn wait_until<L>(
423        self: &Arc<Self>,
424        locked: &mut Locked<L>,
425        current_task: &CurrentTask,
426        run_state: RunState,
427        deadline: zx::MonotonicInstant,
428    ) -> Result<(), Errno>
429    where
430        L: LockEqualOrBefore<FileOpsCore>,
431    {
432        let is_waiting = deadline.into_nanos() > 0;
433
434        let callback = || {
435            // We are susceptible to spurious wakeups because interrupt() posts a message to the port
436            // queue. In addition to more subtle races, there could already be valid messages in the
437            // port queue that will immediately wake us up, leaving the interrupt message in the queue
438            // for subsequent waits (which by then may not have any signals pending) to read.
439            //
440            // It's impossible to non-racily guarantee that a signal is pending so there might always
441            // be an EINTR result here with no signal. But any signal we get when !is_waiting we know is
442            // leftover from before: the top of this function only sets ourself as the
443            // current_task.signals.run_state when there's a nonzero timeout, and that waiter reference
444            // is what is used to signal the interrupt().
445            loop {
446                let wait_result = self.wait_internal(deadline);
447                if let Err(errno) = &wait_result {
448                    if errno.code == EINTR && !is_waiting {
449                        continue; // Spurious wakeup.
450                    }
451                }
452                return wait_result;
453            }
454        };
455
456        // Trigger delayed releaser before blocking if we're at a safe point.
457        //
458        // For example, we cannot trigger delayed releaser if we are holding any locks.
459        if !self.options.contains(WaiterOptions::UNSAFE_CALLSTACK) {
460            current_task.trigger_delayed_releaser(locked);
461        }
462
463        if is_waiting { current_task.run_in_state(run_state, callback) } else { callback() }
464    }
465
466    fn next_key(&self) -> WaitKey {
467        let key = self.next_key.next();
468        // TODO - find a better reaction to wraparound
469        assert!(key != 0, "bad key from u64 wraparound");
470        WaitKey { raw: key }
471    }
472
473    fn register_callback(&self, callback: WaitCallback) -> WaitKey {
474        let key = self.next_key();
475        assert!(
476            self.callbacks.lock().insert(key, callback).is_none(),
477            "unexpected callback already present for key {key:?}"
478        );
479        key
480    }
481
482    fn remove_callback(&self, key: &WaitKey) -> Option<WaitCallback> {
483        self.callbacks.lock().remove(&key)
484    }
485
486    fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
487        let callback = WaitCallback::EventHandler(handler);
488        let key = self.register_callback(callback);
489        self.queue_events(&key, WaitEvents::Fd(events));
490    }
491
492    /// Establish an asynchronous wait for the signals on the given Zircon handle (not to be
493    /// confused with POSIX signals), optionally running a FnOnce. Wait operations will return
494    /// the error code present in the provided SignalHandler.
495    ///
496    /// Returns a `PortWaitCanceler` that can be used to cancel the wait.
497    fn wake_on_zircon_signals(
498        self: &Arc<Self>,
499        handle: &dyn zx::AsHandleRef,
500        zx_signals: zx::Signals,
501        handler: SignalHandler,
502    ) -> Result<PortWaitCanceler, zx::Status> {
503        let callback = WaitCallback::SignalHandler(handler);
504        let key = self.register_callback(callback);
505        self.port.object_wait_async(
506            handle,
507            key.raw,
508            zx_signals,
509            zx::WaitAsyncOpts::EDGE_TRIGGERED,
510        )?;
511        Ok(PortWaitCanceler { waiter: Arc::downgrade(self), key })
512    }
513
514    fn queue_events(&self, key: &WaitKey, events: WaitEvents) {
515        scopeguard::defer! {
516            self.port.notify(NotifyKind::Regular)
517        }
518
519        // Handling user events immediately when they are triggered breaks any
520        // ordering expectations on Linux by batching all starnix events with
521        // the first starnix event even if other events occur on the Fuchsia
522        // platform (and are enqueued to the `zx::Port`) between them. This
523        // ordering does not seem to be load-bearing for applications running on
524        // starnix so we take the divergence in ordering in favour of improved
525        // performance (by minimizing syscalls) when operating on FDs backed by
526        // starnix.
527        //
528        // TODO(https://fxbug.dev/42084319): If we can read a batch of packets
529        // from the `zx::Port`, maybe we can keep the ordering?
530        let Some(callback) = self.remove_callback(key) else {
531            return;
532        };
533
534        match callback {
535            WaitCallback::EventHandler(handler) => {
536                let events = match events {
537                    // If the event is All, signal on all possible fd
538                    // events.
539                    WaitEvents::All => FdEvents::all(),
540                    WaitEvents::Fd(events) => events,
541                    WaitEvents::SignalMask(_) => FdEvents::POLLIN,
542                    _ => panic!("wrong type of handler called: {events:?}"),
543                };
544                handler.handle(events)
545            }
546            WaitCallback::SignalHandler(_) => {
547                panic!("wrong type of handler called")
548            }
549        }
550    }
551
552    fn notify(&self) {
553        self.port.notify(NotifyKind::Regular);
554    }
555
556    fn interrupt(&self) {
557        if self.options.contains(WaiterOptions::IGNORE_SIGNALS) {
558            return;
559        }
560        self.port.notify(NotifyKind::Interrupt);
561    }
562}
563
impl std::fmt::Debug for PortWaiter {
    // Only the port is printed; callbacks and wait queues are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PortWaiter").field("port", &self.port).finish_non_exhaustive()
    }
}
569
/// A type that can put a thread to sleep waiting for a condition.
#[derive(Debug, Clone)]
pub struct Waiter {
    // TODO(https://g-issues.fuchsia.dev/issues/303068424): Avoid `PortWaiter`
    // when operating purely over FDs backed by starnix.
    // Shared with weak refs held by wait queues; see the `PortWaiter` docs.
    inner: Arc<PortWaiter>,
}
577
impl Waiter {
    /// Create a new waiter.
    pub fn new() -> Self {
        Self { inner: PortWaiter::new(WaiterOptions::empty()) }
    }

    /// Create a new waiter with the given options.
    pub fn with_options(options: WaiterOptions) -> Self {
        Self { inner: PortWaiter::new(options) }
    }

    /// Create a weak reference to this waiter.
    fn weak(&self) -> WaiterRef {
        WaiterRef::from_port(&self.inner)
    }

    /// Freeze the task until the waiter is woken up.
    ///
    /// No signal, e.g. EINTR (interrupt), should be received.
    pub fn freeze<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask)
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        while self
            .inner
            .wait_until(
                locked,
                current_task,
                RunState::Frozen(self.clone()),
                zx::MonotonicInstant::INFINITE,
            )
            .is_err()
        {
            // Avoid attempting to freeze the task if there is a pending SIGKILL.
            if current_task.read().has_signal_pending(SIGKILL) {
                break;
            }
            // Ignore spurious wakeups from the [`PortEvent.futex`]
        }
    }

    /// Wait until the waiter is woken up.
    ///
    /// If the wait is interrupted (see [`Waiter::interrupt`]), this function returns EINTR.
    pub fn wait<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        self.inner.wait_until(
            locked,
            current_task,
            RunState::Waiter(WaiterRef::from_port(&self.inner)),
            zx::MonotonicInstant::INFINITE,
        )
    }

    /// Wait until the given deadline has passed or the waiter is woken up.
    ///
    /// If the wait deadline is nonzero and is interrupted (see [`Waiter::interrupt`]), this
    /// function returns EINTR. Callers must take special care not to lose any accumulated data or
    /// local state when EINTR is received as this is a normal and recoverable situation.
    ///
    /// Using a 0 deadline (no waiting, useful for draining pending events) will not wait and is
    /// guaranteed not to issue EINTR.
    ///
    /// If the timeout elapses with no events, this function returns ETIMEDOUT.
    ///
    /// Processes at most one event. If the caller is interested in draining the events, it should
    /// repeatedly call this function with a 0 deadline until it reports ETIMEDOUT. (This case is
    /// why a 0 deadline must not return EINTR, as previous calls to wait_until() may have
    /// accumulated state that would be lost when returning EINTR to userspace.)
    ///
    /// It is up to the caller (the "waiter") to make sure that it synchronizes with any object
    /// that triggers an event (the "notifier"). This `Waiter` does not provide any synchronization
    /// itself. Note that synchronization between the "waiter" the "notifier" may be provided by
    /// the [`EventHandler`] used to handle an event iff the waiter observes the side-effects of
    /// the handler (e.g. reading the ready list modified by [`EventHandler::Enqueue`] or
    /// [`EventHandler::HandleOnce`]).
    pub fn wait_until<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        deadline: zx::MonotonicInstant,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        self.inner.wait_until(
            locked,
            current_task,
            RunState::Waiter(WaiterRef::from_port(&self.inner)),
            deadline,
        )
    }

    /// Builds a `WaitEntry` for this waiter with a fresh key and no callback.
    fn create_wait_entry(&self, filter: WaitEvents) -> WaitEntry {
        WaitEntry { waiter: self.weak(), filter, key: self.inner.next_key() }
    }

    /// Builds a `WaitEntry` whose key has `handler` registered as its
    /// event callback.
    fn create_wait_entry_with_handler(
        &self,
        filter: WaitEvents,
        handler: EventHandler,
    ) -> WaitEntry {
        let key = self.inner.register_callback(WaitCallback::EventHandler(handler));
        WaitEntry { waiter: self.weak(), filter, key }
    }

    /// Delivers `events` through `handler` and wakes this waiter immediately.
    pub fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
        self.inner.wake_immediately(events, handler);
    }

    /// Establish an asynchronous wait for the signals on the given Zircon handle (not to be
    /// confused with POSIX signals), optionally running a FnOnce.
    ///
    /// Returns a `PortWaitCanceler` that can be used to cancel the wait.
    pub fn wake_on_zircon_signals(
        &self,
        handle: &dyn zx::AsHandleRef,
        zx_signals: zx::Signals,
        handler: SignalHandler,
    ) -> Result<PortWaitCanceler, zx::Status> {
        self.inner.wake_on_zircon_signals(handle, zx_signals, handler)
    }

    /// Return a WaitCanceler representing a wait that will never complete. Useful for stub
    /// implementations that should block forever even though a real implementation would wake up
    /// eventually.
    pub fn fake_wait(&self) -> WaitCanceler {
        WaitCanceler::new_noop()
    }

    /// Notify the waiter to wake it up without signalling any events.
    pub fn notify(&self) {
        self.inner.notify();
    }

    /// Interrupt the waiter to deliver a signal. The wait operation will return EINTR, and a
    /// typical caller should then unwind to the syscall dispatch loop to let the signal be
    /// processed. See wait_until() for more details.
    ///
    /// Ignored if the waiter was created with `WaiterOptions::IGNORE_SIGNALS`.
    pub fn interrupt(&self) {
        self.inner.interrupt();
    }
}
724
impl Drop for Waiter {
    fn drop(&mut self) {
        // Delete ourselves from each wait queue we know we're on to prevent Weak references to
        // ourself from sticking around forever.
        // Take the map first so the `wait_queues` lock (nested inside the
        // queue lock) is released before locking each queue below.
        let wait_queues = std::mem::take(&mut *self.inner.wait_queues.lock()).into_values();
        for wait_queue in wait_queues {
            if let Some(wait_queue) = wait_queue.upgrade() {
                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != *self)
            }
        }
    }
}
737
738impl Default for Waiter {
739    fn default() -> Self {
740        Self::new()
741    }
742}
743
744impl PartialEq for Waiter {
745    fn eq(&self, other: &Self) -> bool {
746        Arc::ptr_eq(&self.inner, &other.inner)
747    }
748}
749
/// A lighter-weight waiter backed by an `InterruptibleEvent` instead of a port.
pub struct SimpleWaiter {
    // The event this waiter blocks on.
    event: Arc<InterruptibleEvent>,
    // Queues this waiter registered with, cleaned up on drop.
    wait_queues: Vec<Weak<Mutex<WaitQueueImpl>>>,
}
754
755impl SimpleWaiter {
756    pub fn new(event: &Arc<InterruptibleEvent>) -> (SimpleWaiter, EventWaitGuard<'_>) {
757        (SimpleWaiter { event: event.clone(), wait_queues: Default::default() }, event.begin_wait())
758    }
759}
760
761impl Drop for SimpleWaiter {
762    fn drop(&mut self) {
763        for wait_queue in &self.wait_queues {
764            if let Some(wait_queue) = wait_queue.upgrade() {
765                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != self.event)
766            }
767        }
768    }
769}
770
/// The kind of waiter a `WaiterRef` points at, each held weakly so queues
/// never keep a waiter alive.
#[derive(Debug, Clone)]
enum WaiterKind {
    Port(Weak<PortWaiter>),
    Event(Weak<InterruptibleEvent>),
    AbortHandle(Weak<futures::stream::AbortHandle>),
}
777
778impl Default for WaiterKind {
779    fn default() -> Self {
780        WaiterKind::Port(Default::default())
781    }
782}
783
/// A weak reference to a Waiter. Intended for holding in wait queues or stashing elsewhere for
/// calling queue_events later.
#[derive(Debug, Default, Clone)]
pub struct WaiterRef(WaiterKind);
788
789impl WaiterRef {
790    fn from_port(waiter: &Arc<PortWaiter>) -> WaiterRef {
791        WaiterRef(WaiterKind::Port(Arc::downgrade(waiter)))
792    }
793
794    fn from_event(event: &Arc<InterruptibleEvent>) -> WaiterRef {
795        WaiterRef(WaiterKind::Event(Arc::downgrade(event)))
796    }
797
798    pub fn from_abort_handle(handle: &Arc<futures::stream::AbortHandle>) -> WaiterRef {
799        WaiterRef(WaiterKind::AbortHandle(Arc::downgrade(handle)))
800    }
801
802    pub fn is_valid(&self) -> bool {
803        match &self.0 {
804            WaiterKind::Port(waiter) => waiter.strong_count() != 0,
805            WaiterKind::Event(event) => event.strong_count() != 0,
806            WaiterKind::AbortHandle(handle) => handle.strong_count() != 0,
807        }
808    }
809
810    pub fn interrupt(&self) {
811        match &self.0 {
812            WaiterKind::Port(waiter) => {
813                if let Some(waiter) = waiter.upgrade() {
814                    waiter.interrupt();
815                }
816            }
817            WaiterKind::Event(event) => {
818                if let Some(event) = event.upgrade() {
819                    event.interrupt();
820                }
821            }
822            WaiterKind::AbortHandle(handle) => {
823                if let Some(handle) = handle.upgrade() {
824                    handle.abort();
825                }
826            }
827        }
828    }
829
830    fn remove_callback(&self, key: &WaitKey) {
831        match &self.0 {
832            WaiterKind::Port(waiter) => {
833                if let Some(waiter) = waiter.upgrade() {
834                    waiter.remove_callback(key);
835                }
836            }
837            _ => (),
838        }
839    }
840
841    /// Attempts to upgrade a waiter ref to a notifiable ref. If the waiter ref is no
842    /// longer valid, returns None.
843    fn upgrade_notifiable(&self) -> Option<NotifiableRef> {
844        match &self.0 {
845            WaiterKind::Port(waiter) => {
846                if let Some(waiter) = waiter.upgrade() {
847                    return Some(NotifiableRef::Port(waiter));
848                }
849            }
850            WaiterKind::Event(event) => {
851                if let Some(event) = event.upgrade() {
852                    return Some(NotifiableRef::Event(event));
853                }
854            }
855            WaiterKind::AbortHandle(handle) => {
856                if let Some(handle) = handle.upgrade() {
857                    return Some(NotifiableRef::AbortHandle(handle));
858                }
859            }
860        }
861        None
862    }
863
864    /// Called by the WaitQueue when this waiter is about to be removed from the queue.
865    ///
866    /// TODO(abarth): This function does not appear to be called when the WaitQueue is dropped,
867    /// which appears to be a leak.
868    fn will_remove_from_wait_queue(&self, key: &WaitKey) {
869        match &self.0 {
870            WaiterKind::Port(waiter) => {
871                if let Some(waiter) = waiter.upgrade() {
872                    waiter.wait_queues.lock().remove(key);
873                }
874            }
875            _ => (),
876        }
877    }
878}
879
880impl PartialEq<Waiter> for WaiterRef {
881    fn eq(&self, other: &Waiter) -> bool {
882        match &self.0 {
883            WaiterKind::Port(waiter) => waiter.as_ptr() == Arc::as_ptr(&other.inner),
884            _ => false,
885        }
886    }
887}
888
889impl PartialEq<Arc<InterruptibleEvent>> for WaiterRef {
890    fn eq(&self, other: &Arc<InterruptibleEvent>) -> bool {
891        match &self.0 {
892            WaiterKind::Event(event) => event.as_ptr() == Arc::as_ptr(other),
893            _ => false,
894        }
895    }
896}
897
898impl PartialEq for WaiterRef {
899    fn eq(&self, other: &WaiterRef) -> bool {
900        match (&self.0, &other.0) {
901            (WaiterKind::Port(lhs), WaiterKind::Port(rhs)) => Weak::ptr_eq(lhs, rhs),
902            (WaiterKind::Event(lhs), WaiterKind::Event(rhs)) => Weak::ptr_eq(lhs, rhs),
903            (WaiterKind::AbortHandle(lhs), WaiterKind::AbortHandle(rhs)) => Weak::ptr_eq(lhs, rhs),
904            _ => false,
905        }
906    }
907}
908
909impl NotifiableRef {
910    fn notify(&self, key: &WaitKey, events: WaitEvents) {
911        match self {
912            NotifiableRef::Port(port_waiter) => port_waiter.queue_events(key, events),
913            NotifiableRef::Event(interruptible_event) => interruptible_event.notify(),
914            NotifiableRef::AbortHandle(handle) => handle.abort(),
915        }
916    }
917}
918
/// A list of waiters waiting for some event.
///
/// For events that are generated inside Starnix, we walk the wait queue
/// on the thread that triggered the event to notify the waiters that the event
/// has occurred. The waiters will then wake up on their own thread to handle
/// the event.
///
/// Cloning a `WaitQueue` produces another handle to the same underlying queue
/// (the inner state is shared via `Arc`).
#[derive(Default, Debug, Clone)]
pub struct WaitQueue(Arc<Mutex<WaitQueueImpl>>);
927
/// A `WaitEntry` paired with the unique ID it was assigned when inserted into
/// the queue.
#[derive(Debug)]
struct WaitEntryWithId {
    entry: WaitEntry,
    /// The ID used to uniquely identify this wait entry even if it shares the
    /// key used in the wait queue's [`Slab`] with another wait entry, since a
    /// slab's keys are recycled.
    id: u64,
}
936
/// Identifies a single entry in a `WaitQueue`: the slab `key` plus the unique
/// `id` that distinguishes this entry from a later entry reusing the same
/// (recycled) slab key.
struct WaitEntryId {
    key: usize,
    id: u64,
}
941
/// The shared state behind a `WaitQueue`, guarded by the queue's mutex.
#[derive(Default, Debug)]
struct WaitQueueImpl {
    /// Holds the next ID value to use when adding a new `WaitEntry` to the
    /// waiters (dense) map.
    ///
    /// A [`Slab`]'s keys are recycled, so we use the ID to uniquely identify a
    /// wait entry.
    next_wait_entry_id: u64,
    /// The list of waiters.
    ///
    /// The waiter's wait_queues lock is nested inside this lock.
    waiters: Slab<WaitEntryWithId>,
}
955
/// An entry in a WaitQueue.
#[derive(Debug)]
struct WaitEntry {
    /// The waiter that is waiting for the event.
    waiter: WaiterRef,

    /// The events that the waiter is waiting for.
    filter: WaitEvents,

    /// key for cancelling and queueing events
    key: WaitKey,
}
968
impl WaitQueue {
    /// Inserts `entry` into the queue under a freshly allocated unique ID.
    ///
    /// The returned `WaitEntryId` pairs the slab key with the ID so a later
    /// cancellation can verify that it is removing this exact entry rather
    /// than a newer entry that reused the same (recycled) slab key.
    fn add_waiter(&self, entry: WaitEntry) -> WaitEntryId {
        let mut wait_queue = self.0.lock();
        // IDs are never reused; panicking on u64 exhaustion is considered
        // acceptable since it would take 2^64 insertions.
        let id = wait_queue
            .next_wait_entry_id
            .checked_add(1)
            .expect("all possible wait entry ID values exhausted");
        wait_queue.next_wait_entry_id = id;
        WaitEntryId { key: wait_queue.waiters.insert(WaitEntryWithId { entry, id }), id }
    }

    /// Establish a wait for the given entry.
    ///
    /// The waiter will be notified when an event matching the entry occurs.
    ///
    /// This function does not actually block the waiter. To block the waiter,
    /// call the [`Waiter::wait`] function on the waiter.
    ///
    /// Returns a `WaitCanceler` that can be used to cancel the wait.
    fn wait_async_entry(&self, waiter: &Waiter, entry: WaitEntry) -> WaitCanceler {
        let wait_key = entry.key;
        let waiter_id = self.add_waiter(entry);
        // The waiter records a weak backlink to this queue so it can clean up
        // on its side; the canceler holds the same weak reference.
        let wait_queue = Arc::downgrade(&self.0);
        waiter.inner.wait_queues.lock().insert(wait_key, wait_queue.clone());
        WaitCanceler::new_inner(WaitCancelerInner::Queue(WaitCancelerQueue {
            wait_queue,
            waiter: waiter.weak(),
            wait_key,
            waiter_id,
        }))
    }

    /// Establish a wait for the given value event.
    ///
    /// The waiter will be notified when an event with the same value occurs.
    ///
    /// This function does not actually block the waiter. To block the waiter,
    /// call the [`Waiter::wait`] function on the waiter.
    ///
    /// Returns a `WaitCanceler` that can be used to cancel the wait.
    pub fn wait_async_value(&self, waiter: &Waiter, value: u64) -> WaitCanceler {
        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::Value(value)))
    }

    /// Establish a wait for the given FdEvents.
    ///
    /// The waiter will be notified when an event matching the `events` occurs.
    ///
    /// This function does not actually block the waiter. To block the waiter,
    /// call the [`Waiter::wait`] function on the waiter.
    ///
    /// Returns a `WaitCanceler` that can be used to cancel the wait.
    pub fn wait_async_fd_events(
        &self,
        waiter: &Waiter,
        events: FdEvents,
        handler: EventHandler,
    ) -> WaitCanceler {
        let entry = waiter.create_wait_entry_with_handler(WaitEvents::Fd(events), handler);
        self.wait_async_entry(waiter, entry)
    }

    /// Establish a wait for a particular signal mask.
    ///
    /// The waiter will be notified when a signal in the mask is received.
    ///
    /// This function does not actually block the waiter. To block the waiter,
    /// call the [`Waiter::wait`] function on the waiter.
    ///
    /// Returns a `WaitCanceler` that can be used to cancel the wait.
    pub fn wait_async_signal_mask(
        &self,
        waiter: &Waiter,
        mask: SigSet,
        handler: EventHandler,
    ) -> WaitCanceler {
        let entry = waiter.create_wait_entry_with_handler(WaitEvents::SignalMask(mask), handler);
        self.wait_async_entry(waiter, entry)
    }

    /// Establish a wait for any event.
    ///
    /// The waiter will be notified when any event occurs.
    ///
    /// This function does not actually block the waiter. To block the waiter,
    /// call the [`Waiter::wait`] function on the waiter.
    ///
    /// Returns a `WaitCanceler` that can be used to cancel the wait.
    pub fn wait_async(&self, waiter: &Waiter) -> WaitCanceler {
        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::All))
    }

    /// Establish a wait for any event on behalf of a `SimpleWaiter`.
    ///
    /// Unlike the `wait_async*` methods above, no `WaitCanceler` is returned;
    /// the `SimpleWaiter` tracks its queues itself via the pushed weak link.
    pub fn wait_async_simple(&self, waiter: &mut SimpleWaiter) {
        let entry = WaitEntry {
            waiter: WaiterRef::from_event(&waiter.event),
            filter: WaitEvents::All,
            key: Default::default(),
        };
        waiter.wait_queues.push(Arc::downgrade(&self.0));
        self.add_waiter(entry);
    }

    /// Notifies up to `limit` waiters whose filter matches `events`, returning
    /// the number of waiters actually woken.
    ///
    /// Matching entries are removed from the queue even if their waiter is
    /// already dead (the dead entries are pruned without counting against
    /// `limit`).
    fn notify_events_count(&self, mut events: WaitEvents, mut limit: usize) -> usize {
        if let WaitEvents::Fd(ref mut fd_events) = events {
            *fd_events = fd_events.add_equivalent_fd_events();
        }
        // Store references to waiters ready to be notified locally so that we can drop our waiters
        // lock before notifying the waiters. The waiters will need to acquire the waiters lock once
        // they wake up in order to remove themselves from the queue and so they might contend
        // with this thread for that lock.
        // Usually we expect to notify at most a single waiter.
        let mut notifiable_refs = SmallVec::<[(NotifiableRef, WaitKey); 1]>::new();
        let mut woken = 0;
        {
            let mut guard = self.0.lock();
            guard.waiters.retain(|_, WaitEntryWithId { entry, id: _ }| {
                if limit > 0 && entry.filter.intercept(&events) {
                    if let Some(notifiable_ref) = entry.waiter.upgrade_notifiable() {
                        limit -= 1;
                        woken += 1;
                        notifiable_refs.push((notifiable_ref, entry.key));
                    }

                    entry.waiter.will_remove_from_wait_queue(&entry.key);
                    false
                } else {
                    true
                }
            });
        }
        // Notify outside the queue lock to avoid contending with waking waiters.
        for (notifiable_ref, key) in notifiable_refs {
            notifiable_ref.notify(&key, events);
        }
        woken
    }

    /// Notifies all waiters matching the given `FdEvents`.
    pub fn notify_fd_events(&self, events: FdEvents) {
        self.notify_events_count(WaitEvents::Fd(events), usize::MAX);
    }

    /// Notifies up to `limit` waiters matching the given `FdEvents`.
    pub fn notify_fd_events_count(&self, events: FdEvents, limit: usize) {
        self.notify_events_count(WaitEvents::Fd(events), limit);
    }

    /// Notifies all waiters whose signal mask includes `signal`.
    pub fn notify_signal(&self, signal: &Signal) {
        let event = WaitEvents::SignalMask(SigSet::from(*signal));
        self.notify_events_count(event, usize::MAX);
    }

    /// Notifies all waiters waiting for the given value.
    pub fn notify_value(&self, value: u64) {
        self.notify_events_count(WaitEvents::Value(value), usize::MAX);
    }

    /// Notifies up to `limit` waiters, in no particular order.
    pub fn notify_unordered_count(&self, limit: usize) {
        self.notify_events_count(WaitEvents::All, limit);
    }

    /// Notifies all waiters in the queue.
    pub fn notify_all(&self) {
        self.notify_unordered_count(usize::MAX);
    }

    /// Returns whether there are no active waiters waiting on this `WaitQueue`.
    pub fn is_empty(&self) -> bool {
        self.0.lock().waiters.is_empty()
    }
}
1135
1136/// A wait queue that dispatches events based on the value of an enum.
/// A wait queue that dispatches events based on the value of an enum.
pub struct TypedWaitQueue<T: Into<u64>> {
    // The untyped queue that actually stores the waiters; `T` is converted to
    // `u64` at the API boundary.
    wait_queue: WaitQueue,
    // Marker so the type parameter participates in the type without being stored.
    value_type: std::marker::PhantomData<T>,
}
1141
1142// We can't #[derive(Default)] on [TypedWaitQueue<T>] as T may not implement the Default trait.
1143impl<T: Into<u64>> Default for TypedWaitQueue<T> {
1144    fn default() -> Self {
1145        Self { wait_queue: Default::default(), value_type: Default::default() }
1146    }
1147}
1148
impl<T: Into<u64>> TypedWaitQueue<T> {
    /// Establishes a wait for the given typed `value`; delegates to
    /// [`WaitQueue::wait_async_value`] after converting the value to `u64`.
    pub fn wait_async_value(&self, waiter: &Waiter, value: T) -> WaitCanceler {
        self.wait_queue.wait_async_value(waiter, value.into())
    }

    /// Notifies waiters waiting for the given typed `value`; delegates to
    /// [`WaitQueue::notify_value`] after converting the value to `u64`.
    pub fn notify_value(&self, value: T) {
        self.wait_queue.notify_value(value.into())
    }
}
1158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs::fuchsia::create_fuchsia_pipe;
    use crate::signals::SignalInfo;
    use crate::task::TaskFlags;
    use crate::testing::{spawn_kernel_and_run, spawn_kernel_and_run_sync};
    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
    use crate::vfs::eventfd::{EventFdType, new_eventfd};
    use assert_matches::assert_matches;
    use starnix_sync::Unlocked;
    use starnix_uapi::open_flags::OpenFlags;
    use starnix_uapi::signals::SIGUSR1;

    // Arbitrary key used to identify the ready-list entries enqueued by the
    // EventHandler in these tests.
    const KEY: ReadyItemKey = ReadyItemKey::Usize(1234);

    // A POLLIN wait on a pipe is satisfied by a write performed from another
    // thread, and the waiter observes the enqueued ready item.
    #[::fuchsia::test]
    async fn test_async_wait_exec() {
        spawn_kernel_and_run(async |locked, current_task| {
            let (local_socket, remote_socket) = zx::Socket::create_stream();
            let pipe =
                create_fuchsia_pipe(locked, &current_task, remote_socket, OpenFlags::RDWR).unwrap();

            const MEM_SIZE: usize = 1024;
            let mut output_buffer = VecOutputBuffer::new(MEM_SIZE);

            let test_string = "hello startnix".to_string();
            let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
            let handler = EventHandler::Enqueue {
                key: KEY,
                queue: queue.clone(),
                sought_events: FdEvents::all(),
            };
            let waiter = Waiter::new();
            pipe.wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                .expect("wait_async");
            let test_string_clone = test_string.clone();

            let write_count = AtomicUsizeCounter::default();
            std::thread::scope(|s| {
                // The writer runs on a separate thread so the wait below is
                // genuinely satisfied by a concurrent notification.
                let thread = s.spawn(|| {
                    let test_data = test_string_clone.as_bytes();
                    let no_written = local_socket.write(test_data).unwrap();
                    assert_eq!(0, write_count.add(no_written));
                    assert_eq!(no_written, test_data.len());
                });

                // this code would block on failure

                assert!(queue.lock().is_empty());
                waiter.wait(locked, &current_task).unwrap();
                thread.join().expect("join thread")
            });
            // Every enqueued ready item must report the POLLIN event we waited for.
            queue.lock().iter().for_each(|item| assert!(item.events.contains(FdEvents::POLLIN)));

            let read_size = pipe.read(locked, &current_task, &mut output_buffer).unwrap();

            let no_written = write_count.get();
            assert_eq!(no_written, read_size);

            assert_eq!(output_buffer.data(), test_string.as_bytes());
        })
        .await;
    }

    // Cancelling a wait suppresses the notification (wait times out and the
    // ready list stays empty); leaving it in place delivers exactly one item.
    #[::fuchsia::test]
    async fn test_async_wait_cancel() {
        for do_cancel in [true, false] {
            spawn_kernel_and_run(async move |locked, current_task| {
                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
                let waiter = Waiter::new();
                let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
                let handler = EventHandler::Enqueue {
                    key: KEY,
                    queue: queue.clone(),
                    sought_events: FdEvents::all(),
                };
                let wait_canceler = event
                    .wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                    .expect("wait_async");
                if do_cancel {
                    wait_canceler.cancel();
                }
                // Writing to the eventfd makes it readable, which would notify
                // the (uncancelled) waiter.
                let add_val = 1u64;
                assert_eq!(
                    event
                        .write(
                            locked,
                            &current_task,
                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
                        )
                        .unwrap(),
                    std::mem::size_of::<u64>()
                );

                let wait_result =
                    waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO);
                let final_count = queue.lock().len();
                if do_cancel {
                    assert_eq!(wait_result, error!(ETIMEDOUT));
                    assert_eq!(0, final_count);
                } else {
                    assert_eq!(wait_result, Ok(()));
                    assert_eq!(1, final_count);
                }
            })
            .await;
        }
    }

    // Cancelling one of a waiter's two waits on the same queue must not
    // suppress notification through the remaining wait.
    #[::fuchsia::test]
    async fn single_waiter_multiple_waits_cancel_one_waiter_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter);
            let _wk2 = wait_queue.wait_async(&waiter);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }

    // Cancelling one waiter's wait must not affect a different waiter on the
    // same queue.
    #[::fuchsia::test]
    async fn multiple_waiters_cancel_one_other_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter1 = Waiter::new();
            let waiter2 = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter1);
            let _wk2 = wait_queue.wait_async(&waiter2);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter1.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_err());
            assert!(waiter2.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }

    // notify_unordered_count(n) wakes exactly n waiters, and a subsequent
    // notify_all wakes only the waiters that were not already notified.
    #[::fuchsia::test]
    async fn test_wait_queue() {
        spawn_kernel_and_run(async |locked, current_task| {
            let queue = WaitQueue::default();

            let waiters = <[Waiter; 3]>::default();
            waiters.iter().for_each(|w| {
                queue.wait_async(w);
            });

            // Counts how many waiters wake without blocking (deadline ZERO).
            let woken = |locked: &mut Locked<Unlocked>| {
                waiters
                    .iter()
                    .filter(|w| {
                        w.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok()
                    })
                    .count()
            };

            const INITIAL_NOTIFY_COUNT: usize = 2;
            let total_waiters = waiters.len();
            queue.notify_unordered_count(INITIAL_NOTIFY_COUNT);
            assert_eq!(INITIAL_NOTIFY_COUNT, woken(locked));

            // Only the remaining (unnotified) waiters should be notified.
            queue.notify_all();
            assert_eq!(total_waiters - INITIAL_NOTIFY_COUNT, woken(locked));
        })
        .await;
    }

    // Interrupting an abort-handle-backed WaiterRef aborts the associated
    // future, which surfaces as Aborted from the executor.
    #[::fuchsia::test]
    async fn waiter_kind_abort_handle() {
        spawn_kernel_and_run_sync(|_locked, current_task| {
            let mut executor = fuchsia_async::TestExecutor::new();
            let (abort_handle, abort_registration) = futures::stream::AbortHandle::new_pair();
            let abort_handle = Arc::new(abort_handle);
            let waiter_ref = WaiterRef::from_abort_handle(&abort_handle);

            let mut fut = futures::stream::Abortable::new(
                futures::future::pending::<()>(),
                abort_registration,
            );

            assert_matches!(executor.run_until_stalled(&mut fut), std::task::Poll::Pending);

            waiter_ref.interrupt();
            let output = current_task.run_in_state(RunState::Waiter(waiter_ref), move || {
                match executor.run_singlethreaded(&mut fut) {
                    Ok(()) => unreachable!("future never terminates normally"),
                    Err(futures::stream::Aborted) => Ok(()),
                }
            });

            assert_eq!(output, Ok(()));
        })
        .await;
    }

    // A pending SIGUSR1 interrupts an interruptible event wait (EINTR) but
    // does not prevent the task from being frozen.
    #[::fuchsia::test]
    async fn freeze_with_pending_sigusr1() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::kernel(SIGUSR1);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            let output: Result<(), Errno> = current_task
                .run_in_state(RunState::Event(InterruptibleEvent::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));

            let output = current_task.run_in_state(RunState::Frozen(Waiter::new()), move || Ok(()));
            assert_eq!(output, Ok(()));
        })
        .await;
    }

    // Unlike SIGUSR1, a pending SIGKILL must interrupt even the frozen state.
    #[::fuchsia::test]
    async fn freeze_with_pending_sigkill() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::kernel(SIGKILL);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            let output: Result<(), _> = current_task
                .run_in_state(RunState::Frozen(Waiter::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));
        })
        .await;
    }
}