starnix_core/task/waiter.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::signals::RunState;
6use crate::task::CurrentTask;
7use crate::vfs::{EpollEventHandler, FdNumber};
8use bitflags::bitflags;
9use slab::Slab;
10use starnix_lifecycle::{AtomicU64Counter, AtomicUsizeCounter};
11use starnix_sync::{
12    EventWaitGuard, FileOpsCore, InterruptibleEvent, LockEqualOrBefore, Locked, Mutex, NotifyKind,
13    PortEvent, PortWaitResult,
14};
15use starnix_types::ownership::debug_assert_no_local_temp_ref;
16use starnix_uapi::error;
17use starnix_uapi::errors::{EINTR, Errno};
18use starnix_uapi::signals::{SIGKILL, SigSet, Signal};
19use starnix_uapi::vfs::FdEvents;
20use std::collections::{HashMap, VecDeque};
21use std::sync::{Arc, Weak};
22use syncio::zxio::zxio_signals_t;
23use syncio::{ZxioSignals, ZxioWeak};
24
/// Identifies the origin of an entry on a ready list, so the consumer can tell
/// which file descriptor (or caller-chosen index) became ready.
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum ReadyItemKey {
    /// The ready item corresponds to a file descriptor.
    FdNumber(FdNumber),
    /// The ready item is keyed by an arbitrary index chosen by the registrant.
    Usize(usize),
}
30
31impl From<FdNumber> for ReadyItemKey {
32    fn from(v: FdNumber) -> Self {
33        Self::FdNumber(v)
34    }
35}
36
37impl From<usize> for ReadyItemKey {
38    fn from(v: usize) -> Self {
39        Self::Usize(v)
40    }
41}
42
/// A single entry on a ready list: which source became ready and with what events.
#[derive(Debug, Copy, Clone)]
pub struct ReadyItem {
    /// Identifies the source that became ready.
    pub key: ReadyItemKey,
    /// The events observed for that source.
    pub events: FdEvents,
}
48
/// The action taken when a waited-on event fires.
#[derive(Clone)]
pub enum EventHandler {
    /// Does nothing.
    ///
    /// It is up to the waiter to synchronize itself with the notifier if
    /// synchronization is needed.
    None,

    /// Enqueues an event to a ready list.
    ///
    /// This event handler naturally synchronizes the notifier and notifee
    /// because of the lock acquired/released when enqueuing the event.
    Enqueue { key: ReadyItemKey, queue: Arc<Mutex<VecDeque<ReadyItem>>>, sought_events: FdEvents },

    /// Wraps another EventHandler and only triggers it once. Further .handle() calls are ignored.
    ///
    /// This is intended for cases like BinderFileObject which need to register
    /// the same EventHandler on multiple wait queues.
    HandleOnce(Arc<Mutex<Option<EventHandler>>>),

    /// This handler is an epoll.
    Epoll(EpollEventHandler),
}
72
73impl EventHandler {
74    pub fn handle(self, events: FdEvents) {
75        match self {
76            Self::None => {}
77            Self::Enqueue { key, queue, sought_events } => {
78                let events = events & sought_events;
79                queue.lock().push_back(ReadyItem { key, events });
80            }
81            Self::HandleOnce(inner) => {
82                if let Some(inner) = inner.lock().take() {
83                    inner.handle(events);
84                }
85            }
86            Self::Epoll(e) => e.handle(events),
87        }
88    }
89}
90
/// Translates Zircon signals observed on a zxio-backed object into `FdEvents`.
pub struct ZxioSignalHandler {
    /// Weak handle to the zxio object the signals were observed on.
    pub zxio: ZxioWeak,
    /// Conversion from zxio signal bits to the `FdEvents` to report.
    pub get_events_from_zxio_signals: fn(zxio_signals_t) -> FdEvents,
}

// The counter is incremented as each handle is signaled; when the counter reaches the handle
// count, the event handler is called with the given events.
pub struct ManyZxHandleSignalHandler {
    /// Total number of handles that must be signaled before firing.
    pub count: usize,
    /// Shared count of handles signaled so far.
    pub counter: Arc<AtomicUsizeCounter>,
    /// The signals each handle must carry for its arrival to be counted.
    pub expected_signals: zx::Signals,
    /// The events to deliver once all handles have been signaled.
    pub events: FdEvents,
}

/// The strategy used to turn observed Zircon signals into `FdEvents`.
pub enum SignalHandlerInner {
    /// No translation; the event handler is never invoked.
    None,
    /// Translate via the zxio object's signal mapping.
    Zxio(ZxioSignalHandler),
    /// Translate directly from raw Zircon signals.
    ZxHandle(fn(zx::Signals) -> FdEvents),
    /// Fire only after all of a group of handles have been signaled.
    ManyZxHandle(ManyZxHandleSignalHandler),
}

/// A complete signal-handling registration: translation strategy, the event
/// handler to invoke, and an optional error to surface to the waiter.
pub struct SignalHandler {
    pub inner: SignalHandlerInner,
    pub event_handler: EventHandler,
    /// If set, returned to the waiter after handling (see `SignalHandler::handle`).
    pub err_code: Option<Errno>,
}
117
impl SignalHandler {
    /// Translates the observed Zircon `signals` into `FdEvents` (per `inner`)
    /// and, if any events result, feeds them to `event_handler`.
    ///
    /// Returns `err_code` unconditionally so the waiter can surface a
    /// per-wait error regardless of whether events were delivered.
    fn handle(self, signals: zx::Signals) -> Option<Errno> {
        let SignalHandler { inner, event_handler, err_code } = self;
        let events = match inner {
            SignalHandlerInner::None => None,
            SignalHandlerInner::Zxio(ZxioSignalHandler { zxio, get_events_from_zxio_signals }) => {
                // If the zxio object is already gone there is nothing to translate.
                if let Some(zxio) = zxio.upgrade() {
                    Some(get_events_from_zxio_signals(zxio.wait_end(signals)))
                } else {
                    None
                }
            }
            SignalHandlerInner::ZxHandle(get_events_from_zx_signals) => {
                Some(get_events_from_zx_signals(signals))
            }
            SignalHandlerInner::ManyZxHandle(signal_handler) => {
                if signals.contains(signal_handler.expected_signals) {
                    // NOTE(review): assumes `next()` returns the pre-increment value,
                    // so +1 yields the updated number of signaled handles — confirm
                    // against AtomicUsizeCounter's contract.
                    let new_count = signal_handler.counter.next() + 1;
                    assert!(new_count <= signal_handler.count);
                    // Only fire once the final handle in the group has signaled.
                    if new_count == signal_handler.count {
                        Some(signal_handler.events)
                    } else {
                        None
                    }
                } else {
                    None
                }
            }
        };
        if let Some(events) = events {
            event_handler.handle(events)
        }
        err_code
    }
}
153
/// A callback registered with a waiter, dispatched when its wait fires.
pub enum WaitCallback {
    /// Dispatched when a Zircon signal packet arrives for this wait.
    SignalHandler(SignalHandler),
    /// Dispatched when events are queued to the waiter directly.
    EventHandler(EventHandler),
}

/// State needed to remove a pending entry from a `WaitQueue` on cancellation.
struct WaitCancelerQueue {
    wait_queue: Weak<Mutex<WaitQueueImpl>>,
    waiter: WaiterRef,
    wait_key: WaitKey,
    waiter_id: WaitEntryId,
}

/// State needed to cancel a wait registered through a zxio-backed object.
struct WaitCancelerZxio {
    zxio: ZxioWeak,
    inner: PortWaitCanceler,
}

/// State needed to cancel a wait registered directly on a port.
struct WaitCancelerPort {
    inner: PortWaitCanceler,
}

/// The kinds of pending waits a [`WaitCanceler`] knows how to cancel.
enum WaitCancelerInner {
    Zxio(WaitCancelerZxio),
    Queue(WaitCancelerQueue),
    Port(WaitCancelerPort),
}

/// Number of cancellers a [`WaitCanceler`] can hold without a heap allocation
/// (the inline capacity of its `SmallVec`).
const WAIT_CANCELER_COMMON_SIZE: usize = 2;

/// Return values for wait_async methods.
///
/// Calling `cancel` will cancel any running wait.
///
/// Does not implement `Clone` or `Copy` so that only a single canceler exists
/// per wait.
pub struct WaitCanceler {
    cancellers: smallvec::SmallVec<[WaitCancelerInner; WAIT_CANCELER_COMMON_SIZE]>,
}
192
193impl WaitCanceler {
194    fn new_inner(inner: WaitCancelerInner) -> Self {
195        Self { cancellers: smallvec::smallvec![inner] }
196    }
197
198    pub fn new_noop() -> Self {
199        Self { cancellers: Default::default() }
200    }
201
202    pub fn new_zxio(zxio: ZxioWeak, inner: PortWaitCanceler) -> Self {
203        Self::new_inner(WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }))
204    }
205
206    pub fn new_port(inner: PortWaitCanceler) -> Self {
207        Self::new_inner(WaitCancelerInner::Port(WaitCancelerPort { inner }))
208    }
209
210    /// Equivalent to `merge_unbounded`, except that it enforces that the resulting vector of
211    /// cancellers is small enough to avoid being separately allocated on the heap.
212    ///
213    /// If possible, use this function instead of `merge_unbounded`, because it gives us better
214    /// tools to keep this code path optimized.
215    pub fn merge(self, other: Self) -> Self {
216        // Increase `WAIT_CANCELER_COMMON_SIZE` if needed, or remove this assert and allow the
217        // smallvec to allocate.
218        assert!(
219            self.cancellers.len() + other.cancellers.len() <= WAIT_CANCELER_COMMON_SIZE,
220            "WaitCanceler::merge disallows more than {} cancellers, found {} + {}",
221            WAIT_CANCELER_COMMON_SIZE,
222            self.cancellers.len(),
223            other.cancellers.len()
224        );
225        WaitCanceler::merge_unbounded(self, other)
226    }
227
228    /// Creates a new `WaitCanceler` that is equivalent to canceling both its arguments.
229    pub fn merge_unbounded(
230        Self { mut cancellers }: Self,
231        Self { cancellers: mut other }: Self,
232    ) -> Self {
233        cancellers.append(&mut other);
234        WaitCanceler { cancellers }
235    }
236
237    /// Cancel the pending wait.
238    ///
239    /// Takes `self` by value since a wait can only be canceled once.
240    pub fn cancel(self) {
241        let Self { cancellers } = self;
242        for canceller in cancellers.into_iter().rev() {
243            match canceller {
244                WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }) => {
245                    let Some(zxio) = zxio.upgrade() else { return };
246                    let (_, signals) = zxio.wait_begin(ZxioSignals::NONE.bits());
247                    inner.cancel();
248                    zxio.wait_end(signals);
249                }
250                WaitCancelerInner::Queue(WaitCancelerQueue {
251                    wait_queue,
252                    waiter,
253                    wait_key,
254                    waiter_id: WaitEntryId { key, id },
255                }) => {
256                    let Some(wait_queue) = wait_queue.upgrade() else { return };
257                    waiter.remove_callback(&wait_key);
258                    waiter.will_remove_from_wait_queue(&wait_key);
259                    let mut wait_queue = wait_queue.lock();
260                    let waiters = &mut wait_queue.waiters;
261                    if let Some(entry) = waiters.get_mut(key) {
262                        // The map of waiters in a wait queue uses a `Slab` which
263                        // recycles keys. To make sure we are removing the right
264                        // entry, make sure the ID value matches what we expect
265                        // to remove.
266                        if entry.id == id {
267                            waiters.remove(key);
268                        }
269                    }
270                }
271                WaitCancelerInner::Port(WaitCancelerPort { inner }) => {
272                    inner.cancel();
273                }
274            }
275        }
276    }
277}
278
/// Return values for wait_async methods that monitor the state of a handle.
///
/// Calling `cancel` will cancel any running wait.
///
/// Does not implement `Clone` or `Copy` so that only a single canceler exists
/// per wait.
pub struct PortWaitCanceler {
    /// The waiter the wait was registered on; weak so cancellation after the
    /// waiter is dropped is a no-op.
    waiter: Weak<PortWaiter>,
    /// The key under which the wait's callback was registered.
    key: WaitKey,
}
289
290impl PortWaitCanceler {
291    /// Cancel the pending wait.
292    ///
293    /// Takes `self` by value since a wait can only be canceled once.
294    pub fn cancel(self) {
295        let Self { waiter, key } = self;
296        if let Some(waiter) = waiter.upgrade() {
297            let _ = waiter.port.cancel(key.raw);
298            waiter.remove_callback(&key);
299        }
300    }
301}
302
/// Key identifying a single registered wait/callback on a `PortWaiter`.
///
/// The raw value 0 is reserved to mean "no handler" (see `PortWaiter::callbacks`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
struct WaitKey {
    raw: u64,
}

/// The different type of event that can be waited on / triggered.
#[derive(Clone, Copy, Debug)]
enum WaitEvents {
    /// All event: a wait on `All` will be woken up by all event, and a trigger on `All` will wake
    /// every waiter.
    All,
    /// Wait on the set of FdEvents.
    Fd(FdEvents),
    /// Wait for the specified value.
    Value(u64),
    /// Wait for a signal in a specific mask to be received by the task.
    SignalMask(SigSet),
}
321
322impl WaitEvents {
323    /// Returns whether a wait on `self` should be woken up by `other`.
324    fn intercept(self: &WaitEvents, other: &WaitEvents) -> bool {
325        match (self, other) {
326            (Self::All, _) | (_, Self::All) => true,
327            (Self::Fd(m1), Self::Fd(m2)) => m1.bits() & m2.bits() != 0,
328            (Self::Value(v1), Self::Value(v2)) => v1 == v2,
329            // A SignalMask event can only be intercepted by another SignalMask event.
330            (Self::SignalMask(m1), Self::SignalMask(m2)) => m1.intersects(m2),
331            _ => false,
332        }
333    }
334}
335
impl WaitCallback {
    /// Returns the no-op event handler, for callers that need no notification action.
    pub fn none() -> EventHandler {
        EventHandler::None
    }
}
341
bitflags! {
    /// Options controlling how a waiter behaves while blocked.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub struct WaiterOptions: u8 {
        /// The wait cannot be interrupted by signals.
        const IGNORE_SIGNALS = 1;

        /// The wait is not taking place at a safe point.
        ///
        /// For example, the caller might be holding a lock, which could cause a deadlock if the
        /// waiter triggers delayed releasers.
        const UNSAFE_CALLSTACK = 2;
    }
}
355
/// Implementation of Waiter. We put the Waiter data in an Arc so that WaitQueue can tell when the
/// Waiter has been destroyed by keeping a Weak reference. But this is an implementation detail and
/// a Waiter should have a single owner. So the Arc is hidden inside Waiter.
struct PortWaiter {
    /// The underlying event used to block and wake the owning thread.
    port: PortEvent,
    /// Callbacks registered for pending waits, keyed by `WaitKey`.
    callbacks: Mutex<HashMap<WaitKey, WaitCallback>>, // the key 0 is reserved for 'no handler'
    /// Source of unique wait keys; starts at 1 since 0 is reserved.
    next_key: AtomicU64Counter,
    /// Behavior flags fixed at construction time.
    options: WaiterOptions,

    /// Collection of wait queues this Waiter is waiting on, so that when the Waiter is Dropped it
    /// can remove itself from the queues.
    ///
    /// This lock is nested inside the WaitQueue.waiters lock.
    wait_queues: Mutex<HashMap<WaitKey, Weak<Mutex<WaitQueueImpl>>>>,
}
371
impl PortWaiter {
    /// Internal constructor.
    fn new(options: WaiterOptions) -> Arc<Self> {
        Arc::new(PortWaiter {
            port: PortEvent::new(),
            callbacks: Default::default(),
            // Key 0 is reserved for 'no handler', so valid keys start at 1.
            next_key: AtomicU64Counter::new(1),
            options,
            wait_queues: Default::default(),
        })
    }

    /// Waits until the given deadline has passed or the waiter is woken up. See wait_until().
    ///
    /// Returns `Ok(())` on a regular notification or after dispatching a signal
    /// handler, `EINTR` on interrupt, `ETIMEDOUT` when the deadline elapses, or
    /// the handler's own error code if one is set.
    fn wait_internal(&self, deadline: zx::MonotonicInstant) -> Result<(), Errno> {
        // This method can block arbitrarily long, possibly waiting for another process. The
        // current thread should not own any local ref that might delay the release of a resource
        // while doing so.
        debug_assert_no_local_temp_ref();

        match self.port.wait(deadline) {
            PortWaitResult::Notification { kind: NotifyKind::Regular } => Ok(()),
            PortWaitResult::Notification { kind: NotifyKind::Interrupt } => error!(EINTR),
            PortWaitResult::Signal { key, observed } => {
                // A Zircon signal packet carries the key registered in
                // wake_on_zircon_signals(); consume the matching callback.
                if let Some(callback) = self.remove_callback(&WaitKey { raw: key }) {
                    match callback {
                        WaitCallback::SignalHandler(handler) => {
                            if let Some(errno) = handler.handle(observed) {
                                return Err(errno);
                            }
                        }
                        WaitCallback::EventHandler(_) => {
                            // Event handlers are only dispatched from queue_events(),
                            // never from a port signal packet.
                            panic!("wrong type of handler called")
                        }
                    }
                }

                Ok(())
            }
            PortWaitResult::TimedOut => error!(ETIMEDOUT),
        }
    }

    /// Blocks the current task in `run_state` until `deadline` or wakeup.
    ///
    /// A zero deadline drains at most one pending event without blocking and
    /// retries spurious interrupts so it never reports EINTR.
    fn wait_until<L>(
        self: &Arc<Self>,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        run_state: RunState,
        deadline: zx::MonotonicInstant,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let is_waiting = deadline.into_nanos() > 0;

        let callback = || {
            // We are susceptible to spurious wakeups because interrupt() posts a message to the port
            // queue. In addition to more subtle races, there could already be valid messages in the
            // port queue that will immediately wake us up, leaving the interrupt message in the queue
            // for subsequent waits (which by then may not have any signals pending) to read.
            //
            // It's impossible to non-racily guarantee that a signal is pending so there might always
            // be an EINTR result here with no signal. But any signal we get when !is_waiting we know is
            // leftover from before: the top of this function only sets ourself as the
            // current_task.signals.run_state when there's a nonzero timeout, and that waiter reference
            // is what is used to signal the interrupt().
            loop {
                let wait_result = self.wait_internal(deadline);
                if let Err(errno) = &wait_result {
                    if errno.code == EINTR && !is_waiting {
                        continue; // Spurious wakeup.
                    }
                }
                return wait_result;
            }
        };

        // Trigger delayed releaser before blocking if we're at a safe point.
        //
        // For example, we cannot trigger delayed releaser if we are holding any locks.
        if !self.options.contains(WaiterOptions::UNSAFE_CALLSTACK) {
            current_task.trigger_delayed_releaser(locked);
        }

        // Only enter the run state for a real (nonzero-deadline) wait; a zero
        // deadline just polls via the callback.
        if is_waiting { current_task.run_in_state(run_state, callback) } else { callback() }
    }

    /// Allocates the next unique wait key.
    fn next_key(&self) -> WaitKey {
        let key = self.next_key.next();
        // TODO - find a better reaction to wraparound
        assert!(key != 0, "bad key from u64 wraparound");
        WaitKey { raw: key }
    }

    /// Registers `callback` under a freshly allocated key and returns the key.
    fn register_callback(&self, callback: WaitCallback) -> WaitKey {
        let key = self.next_key();
        assert!(
            self.callbacks.lock().insert(key, callback).is_none(),
            "unexpected callback already present for key {key:?}"
        );
        key
    }

    /// Removes and returns the callback registered under `key`, if any.
    fn remove_callback(&self, key: &WaitKey) -> Option<WaitCallback> {
        self.callbacks.lock().remove(&key)
    }

    /// Registers `handler` and immediately queues `events` to it, without waiting.
    fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
        let callback = WaitCallback::EventHandler(handler);
        let key = self.register_callback(callback);
        self.queue_events(&key, WaitEvents::Fd(events));
    }

    /// Establish an asynchronous wait for the signals on the given Zircon handle (not to be
    /// confused with POSIX signals), optionally running a FnOnce. Wait operations will return
    /// the error code present in the provided SignalHandler.
    ///
    /// Returns a `PortWaitCanceler` that can be used to cancel the wait.
    fn wake_on_zircon_signals(
        self: &Arc<Self>,
        handle: &dyn zx::AsHandleRef,
        zx_signals: zx::Signals,
        handler: SignalHandler,
    ) -> Result<PortWaitCanceler, zx::Status> {
        let callback = WaitCallback::SignalHandler(handler);
        let key = self.register_callback(callback);
        self.port.object_wait_async(
            handle,
            key.raw,
            zx_signals,
            zx::WaitAsyncOpts::EDGE_TRIGGERED,
        )?;
        Ok(PortWaitCanceler { waiter: Arc::downgrade(self), key })
    }

    /// Dispatches `events` to the event handler registered under `key` (if any)
    /// and always posts a regular notification to wake the waiter.
    fn queue_events(&self, key: &WaitKey, events: WaitEvents) {
        // The notification is posted even when no callback is registered, so
        // the waiter still wakes up.
        scopeguard::defer! {
            self.port.notify(NotifyKind::Regular)
        }

        // Handling user events immediately when they are triggered breaks any
        // ordering expectations on Linux by batching all starnix events with
        // the first starnix event even if other events occur on the Fuchsia
        // platform (and are enqueued to the `zx::Port`) between them. This
        // ordering does not seem to be load-bearing for applications running on
        // starnix so we take the divergence in ordering in favour of improved
        // performance (by minimizing syscalls) when operating on FDs backed by
        // starnix.
        //
        // TODO(https://fxbug.dev/42084319): If we can read a batch of packets
        // from the `zx::Port`, maybe we can keep the ordering?
        let Some(callback) = self.remove_callback(key) else {
            return;
        };

        match callback {
            WaitCallback::EventHandler(handler) => {
                let events = match events {
                    // If the event is All, signal on all possible fd
                    // events.
                    WaitEvents::All => FdEvents::all(),
                    WaitEvents::Fd(events) => events,
                    WaitEvents::SignalMask(_) => FdEvents::POLLIN,
                    _ => panic!("wrong type of handler called: {events:?}"),
                };
                handler.handle(events)
            }
            WaitCallback::SignalHandler(_) => {
                panic!("wrong type of handler called")
            }
        }
    }

    /// Wakes the waiter without reporting any events.
    fn notify(&self) {
        self.port.notify(NotifyKind::Regular);
    }

    /// Posts an interrupt notification, unless this waiter ignores signals.
    fn interrupt(&self) {
        if self.options.contains(WaiterOptions::IGNORE_SIGNALS) {
            return;
        }
        self.port.notify(NotifyKind::Interrupt);
    }
}
555
556impl std::fmt::Debug for PortWaiter {
557    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
558        f.debug_struct("PortWaiter").field("port", &self.port).finish_non_exhaustive()
559    }
560}
561
/// A type that can put a thread to sleep waiting for a condition.
#[derive(Debug, Clone)]
pub struct Waiter {
    // TODO(https://g-issues.fuchsia.dev/issues/303068424): Avoid `PortWaiter`
    // when operating purely over FDs backed by starnix.
    /// The shared implementation; the Arc is an implementation detail (see `PortWaiter`).
    inner: Arc<PortWaiter>,
}
569
impl Waiter {
    /// Create a new waiter.
    pub fn new() -> Self {
        Self { inner: PortWaiter::new(WaiterOptions::empty()) }
    }

    /// Create a new waiter with the given options.
    pub fn with_options(options: WaiterOptions) -> Self {
        Self { inner: PortWaiter::new(options) }
    }

    /// Create a weak reference to this waiter.
    fn weak(&self) -> WaiterRef {
        WaiterRef::from_port(&self.inner)
    }

    /// Freeze the task until the waiter is woken up.
    ///
    /// No signal, e.g. EINTR (interrupt), should be received.
    pub fn freeze<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask)
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        // Retry on any error so the task stays frozen despite interrupts.
        while self
            .inner
            .wait_until(
                locked,
                current_task,
                RunState::Frozen(self.clone()),
                zx::MonotonicInstant::INFINITE,
            )
            .is_err()
        {
            // Avoid attempting to freeze the task if there is a pending SIGKILL.
            if current_task.read().has_signal_pending(SIGKILL) {
                break;
            }
            // Ignore spurious wakeups from the [`PortEvent.futex`]
        }
    }

    /// Wait until the waiter is woken up.
    ///
    /// If the wait is interrupted (see [`Waiter::interrupt`]), this function returns EINTR.
    pub fn wait<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        self.inner.wait_until(
            locked,
            current_task,
            RunState::Waiter(WaiterRef::from_port(&self.inner)),
            zx::MonotonicInstant::INFINITE,
        )
    }

    /// Wait until the given deadline has passed or the waiter is woken up.
    ///
    /// If the wait deadline is nonzero and is interrupted (see [`Waiter::interrupt`]), this
    /// function returns EINTR. Callers must take special care not to lose any accumulated data or
    /// local state when EINTR is received as this is a normal and recoverable situation.
    ///
    /// Using a 0 deadline (no waiting, useful for draining pending events) will not wait and is
    /// guaranteed not to issue EINTR.
    ///
    /// It the timeout elapses with no events, this function returns ETIMEDOUT.
    ///
    /// Processes at most one event. If the caller is interested in draining the events, it should
    /// repeatedly call this function with a 0 deadline until it reports ETIMEDOUT. (This case is
    /// why a 0 deadline must not return EINTR, as previous calls to wait_until() may have
    /// accumulated state that would be lost when returning EINTR to userspace.)
    ///
    /// It is up to the caller (the "waiter") to make sure that it synchronizes with any object
    /// that triggers an event (the "notifier"). This `Waiter` does not provide any synchronization
    /// itself. Note that synchronization between the "waiter" the "notifier" may be provided by
    /// the [`EventHandler`] used to handle an event iff the waiter observes the side-effects of
    /// the handler (e.g. reading the ready list modified by [`EventHandler::Enqueue`] or
    /// [`EventHandler::HandleOnce`]).
    pub fn wait_until<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        deadline: zx::MonotonicInstant,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        self.inner.wait_until(
            locked,
            current_task,
            RunState::Waiter(WaiterRef::from_port(&self.inner)),
            deadline,
        )
    }

    /// Builds a wait entry for `filter` with no event handler attached.
    fn create_wait_entry(&self, filter: WaitEvents) -> WaitEntry {
        WaitEntry { waiter: self.weak(), filter, key: self.inner.next_key() }
    }

    /// Builds a wait entry for `filter` whose key is bound to `handler`.
    fn create_wait_entry_with_handler(
        &self,
        filter: WaitEvents,
        handler: EventHandler,
    ) -> WaitEntry {
        let key = self.inner.register_callback(WaitCallback::EventHandler(handler));
        WaitEntry { waiter: self.weak(), filter, key }
    }

    /// Delivers `events` to `handler` immediately, without waiting.
    pub fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
        self.inner.wake_immediately(events, handler);
    }

    /// Establish an asynchronous wait for the signals on the given Zircon handle (not to be
    /// confused with POSIX signals), optionally running a FnOnce.
    ///
    /// Returns a `PortWaitCanceler` that can be used to cancel the wait.
    pub fn wake_on_zircon_signals(
        &self,
        handle: &dyn zx::AsHandleRef,
        zx_signals: zx::Signals,
        handler: SignalHandler,
    ) -> Result<PortWaitCanceler, zx::Status> {
        self.inner.wake_on_zircon_signals(handle, zx_signals, handler)
    }

    /// Return a WaitCanceler representing a wait that will never complete. Useful for stub
    /// implementations that should block forever even though a real implementation would wake up
    /// eventually.
    pub fn fake_wait(&self) -> WaitCanceler {
        WaitCanceler::new_noop()
    }

    /// Notify the waiter to wake it up without signalling any events.
    pub fn notify(&self) {
        self.inner.notify();
    }

    /// Interrupt the waiter to deliver a signal. The wait operation will return EINTR, and a
    /// typical caller should then unwind to the syscall dispatch loop to let the signal be
    /// processed. See wait_until() for more details.
    ///
    /// Ignored if the waiter was created with new_ignoring_signals().
    pub fn interrupt(&self) {
        self.inner.interrupt();
    }
}
716
impl Drop for Waiter {
    fn drop(&mut self) {
        // Delete ourselves from each wait queue we know we're on to prevent Weak references to
        // ourself from sticking around forever.
        let wait_queues = std::mem::take(&mut *self.inner.wait_queues.lock()).into_values();
        for wait_queue in wait_queues {
            if let Some(wait_queue) = wait_queue.upgrade() {
                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != *self)
            }
        }
    }
}
729
impl Default for Waiter {
    /// Defaults to a signal-respecting waiter (same as [`Waiter::new`]).
    fn default() -> Self {
        Self::new()
    }
}
735
impl PartialEq for Waiter {
    /// Two `Waiter`s are equal iff they share the same underlying `PortWaiter`.
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}
741
/// A waiter backed by an `InterruptibleEvent` rather than a `PortEvent`.
pub struct SimpleWaiter {
    /// The event used to block and wake the thread.
    event: Arc<InterruptibleEvent>,
    /// Queues this waiter registered on; used for cleanup in `Drop`.
    wait_queues: Vec<Weak<Mutex<WaitQueueImpl>>>,
}

impl SimpleWaiter {
    /// Creates a waiter for `event` plus the guard that must be held while waiting.
    pub fn new(event: &Arc<InterruptibleEvent>) -> (SimpleWaiter, EventWaitGuard<'_>) {
        (SimpleWaiter { event: event.clone(), wait_queues: Default::default() }, event.begin_wait())
    }
}
752
impl Drop for SimpleWaiter {
    fn drop(&mut self) {
        // Remove this waiter from every queue it registered with so the queues
        // don't accumulate entries for a dead waiter.
        for wait_queue in &self.wait_queues {
            if let Some(wait_queue) = wait_queue.upgrade() {
                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != self.event)
            }
        }
    }
}
762
/// The concrete kind of waiter a [`WaiterRef`] points at.
#[derive(Debug, Clone)]
enum WaiterKind {
    /// A port-backed [`PortWaiter`].
    Port(Weak<PortWaiter>),
    /// An [`InterruptibleEvent`], as used by [`SimpleWaiter`].
    Event(Weak<InterruptibleEvent>),
    /// An abort handle; notification/interrupt aborts the associated stream.
    AbortHandle(Weak<futures::stream::AbortHandle>),
}

impl Default for WaiterKind {
    /// Defaults to a dangling port reference that can never be upgraded.
    fn default() -> Self {
        WaiterKind::Port(Default::default())
    }
}

/// A weak reference to a Waiter. Intended for holding in wait queues or stashing elsewhere for
/// calling queue_events later.
#[derive(Debug, Default, Clone)]
pub struct WaiterRef(WaiterKind);
780
781impl WaiterRef {
782    fn from_port(waiter: &Arc<PortWaiter>) -> WaiterRef {
783        WaiterRef(WaiterKind::Port(Arc::downgrade(waiter)))
784    }
785
786    fn from_event(event: &Arc<InterruptibleEvent>) -> WaiterRef {
787        WaiterRef(WaiterKind::Event(Arc::downgrade(event)))
788    }
789
790    pub fn from_abort_handle(handle: &Arc<futures::stream::AbortHandle>) -> WaiterRef {
791        WaiterRef(WaiterKind::AbortHandle(Arc::downgrade(handle)))
792    }
793
794    pub fn is_valid(&self) -> bool {
795        match &self.0 {
796            WaiterKind::Port(waiter) => waiter.strong_count() != 0,
797            WaiterKind::Event(event) => event.strong_count() != 0,
798            WaiterKind::AbortHandle(handle) => handle.strong_count() != 0,
799        }
800    }
801
802    pub fn interrupt(&self) {
803        match &self.0 {
804            WaiterKind::Port(waiter) => {
805                if let Some(waiter) = waiter.upgrade() {
806                    waiter.interrupt();
807                }
808            }
809            WaiterKind::Event(event) => {
810                if let Some(event) = event.upgrade() {
811                    event.interrupt();
812                }
813            }
814            WaiterKind::AbortHandle(handle) => {
815                if let Some(handle) = handle.upgrade() {
816                    handle.abort();
817                }
818            }
819        }
820    }
821
822    fn remove_callback(&self, key: &WaitKey) {
823        match &self.0 {
824            WaiterKind::Port(waiter) => {
825                if let Some(waiter) = waiter.upgrade() {
826                    waiter.remove_callback(key);
827                }
828            }
829            _ => (),
830        }
831    }
832
833    /// Called by the WaitQueue when this waiter is about to be removed from the queue.
834    ///
835    /// TODO(abarth): This function does not appear to be called when the WaitQueue is dropped,
836    /// which appears to be a leak.
837    fn will_remove_from_wait_queue(&self, key: &WaitKey) {
838        match &self.0 {
839            WaiterKind::Port(waiter) => {
840                if let Some(waiter) = waiter.upgrade() {
841                    waiter.wait_queues.lock().remove(key);
842                }
843            }
844            _ => (),
845        }
846    }
847
848    /// Notify the waiter that the `events` have occurred.
849    ///
850    /// If the client is using an `SimpleWaiter`, they will be notified but they will not learn
851    /// which events occurred.
852    ///
853    /// If the client is using an `AbortHandle`, `AbortHandle::abort()` will be called.
854    fn notify(&self, key: &WaitKey, events: WaitEvents) -> bool {
855        match &self.0 {
856            WaiterKind::Port(waiter) => {
857                if let Some(waiter) = waiter.upgrade() {
858                    waiter.queue_events(key, events);
859                    return true;
860                }
861            }
862            WaiterKind::Event(event) => {
863                if let Some(event) = event.upgrade() {
864                    event.notify();
865                    return true;
866                }
867            }
868            WaiterKind::AbortHandle(handle) => {
869                if let Some(handle) = handle.upgrade() {
870                    handle.abort();
871                    return true;
872                }
873            }
874        }
875        false
876    }
877}
878
879impl PartialEq<Waiter> for WaiterRef {
880    fn eq(&self, other: &Waiter) -> bool {
881        match &self.0 {
882            WaiterKind::Port(waiter) => waiter.as_ptr() == Arc::as_ptr(&other.inner),
883            _ => false,
884        }
885    }
886}
887
888impl PartialEq<Arc<InterruptibleEvent>> for WaiterRef {
889    fn eq(&self, other: &Arc<InterruptibleEvent>) -> bool {
890        match &self.0 {
891            WaiterKind::Event(event) => event.as_ptr() == Arc::as_ptr(other),
892            _ => false,
893        }
894    }
895}
896
897impl PartialEq for WaiterRef {
898    fn eq(&self, other: &WaiterRef) -> bool {
899        match (&self.0, &other.0) {
900            (WaiterKind::Port(lhs), WaiterKind::Port(rhs)) => Weak::ptr_eq(lhs, rhs),
901            (WaiterKind::Event(lhs), WaiterKind::Event(rhs)) => Weak::ptr_eq(lhs, rhs),
902            (WaiterKind::AbortHandle(lhs), WaiterKind::AbortHandle(rhs)) => Weak::ptr_eq(lhs, rhs),
903            _ => false,
904        }
905    }
906}
907
/// A list of waiters waiting for some event.
///
/// For events that are generated inside Starnix, we walk the wait queue
/// on the thread that triggered the event to notify the waiters that the event
/// has occurred. The waiters will then wake up on their own thread to handle
/// the event.
///
/// Cloning a `WaitQueue` is cheap: clones share the same underlying
/// `Arc`-wrapped list of waiters.
#[derive(Default, Debug, Clone)]
pub struct WaitQueue(Arc<Mutex<WaitQueueImpl>>);
916
/// A `WaitEntry` paired with the unique ID it was assigned on insertion.
#[derive(Debug)]
struct WaitEntryWithId {
    /// The wait entry itself.
    entry: WaitEntry,
    /// The ID used to uniquely identify this wait entry even if it shares the
    /// key used in the wait queue's [`Slab`] with another wait entry since a
    /// slab's keys are recycled.
    id: u64,
}
925
/// Identifies a single `WaitEntry` within a `WaitQueue`.
struct WaitEntryId {
    /// The slab key of the entry in `WaitQueueImpl::waiters`.
    key: usize,
    /// The unique ID of the entry, guarding against the slab key being
    /// recycled for a different entry.
    id: u64,
}
930
/// The lock-protected state of a [`WaitQueue`].
#[derive(Default, Debug)]
struct WaitQueueImpl {
    /// Holds the next ID value to use when adding a new `WaitEntry` to the
    /// waiters (dense) map.
    ///
    /// A [`Slab`]'s keys are recycled so we use the ID to uniquely identify a
    /// wait entry.
    next_wait_entry_id: u64,
    /// The list of waiters.
    ///
    /// The waiter's wait_queues lock is nested inside this lock.
    waiters: Slab<WaitEntryWithId>,
}
944
/// An entry in a WaitQueue.
#[derive(Debug)]
struct WaitEntry {
    /// The waiter that is waiting for the events.
    waiter: WaiterRef,

    /// The events that the waiter is waiting for.
    filter: WaitEvents,

    /// Key used for cancelling the wait and for queueing events to the waiter.
    key: WaitKey,
}
957
impl WaitQueue {
    /// Inserts `entry` into the queue, assigning it a fresh unique ID.
    ///
    /// Panics if all `u64` ID values have been exhausted.
    fn add_waiter(&self, entry: WaitEntry) -> WaitEntryId {
        let mut wait_queue = self.0.lock();
        let id = wait_queue
            .next_wait_entry_id
            .checked_add(1)
            .expect("all possible wait entry ID values exhausted");
        wait_queue.next_wait_entry_id = id;
        WaitEntryId { key: wait_queue.waiters.insert(WaitEntryWithId { entry, id }), id }
    }

    /// Establish a wait for the given entry.
    ///
    /// The waiter will be notified when an event matching the entry occurs.
    ///
    /// This function does not actually block the waiter. To block the waiter,
    /// call the [`Waiter::wait`] function on the waiter.
    ///
    /// Returns a `WaitCanceler` that can be used to cancel the wait.
    fn wait_async_entry(&self, waiter: &Waiter, entry: WaitEntry) -> WaitCanceler {
        let wait_key = entry.key;
        let waiter_id = self.add_waiter(entry);
        let wait_queue = Arc::downgrade(&self.0);
        // Record queue membership on the waiter so the registration can be
        // cleaned up from the waiter's side as well.
        waiter.inner.wait_queues.lock().insert(wait_key, wait_queue.clone());
        WaitCanceler::new_inner(WaitCancelerInner::Queue(WaitCancelerQueue {
            wait_queue,
            waiter: waiter.weak(),
            wait_key,
            waiter_id,
        }))
    }

    /// Establish a wait for the given value event.
    ///
    /// The waiter will be notified when an event with the same value occurs.
    ///
    /// This function does not actually block the waiter. To block the waiter,
    /// call the [`Waiter::wait`] function on the waiter.
    ///
    /// Returns a `WaitCanceler` that can be used to cancel the wait.
    pub fn wait_async_value(&self, waiter: &Waiter, value: u64) -> WaitCanceler {
        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::Value(value)))
    }

    /// Establish a wait for the given FdEvents.
    ///
    /// The waiter will be notified when an event matching the `events` occurs.
    ///
    /// This function does not actually block the waiter. To block the waiter,
    /// call the [`Waiter::wait`] function on the waiter.
    ///
    /// Returns a `WaitCanceler` that can be used to cancel the wait.
    pub fn wait_async_fd_events(
        &self,
        waiter: &Waiter,
        events: FdEvents,
        handler: EventHandler,
    ) -> WaitCanceler {
        let entry = waiter.create_wait_entry_with_handler(WaitEvents::Fd(events), handler);
        self.wait_async_entry(waiter, entry)
    }

    /// Establish a wait for a particular signal mask.
    ///
    /// The waiter will be notified when a signal in the mask is received.
    ///
    /// This function does not actually block the waiter. To block the waiter,
    /// call the [`Waiter::wait`] function on the waiter.
    ///
    /// Returns a `WaitCanceler` that can be used to cancel the wait.
    pub fn wait_async_signal_mask(
        &self,
        waiter: &Waiter,
        mask: SigSet,
        handler: EventHandler,
    ) -> WaitCanceler {
        let entry = waiter.create_wait_entry_with_handler(WaitEvents::SignalMask(mask), handler);
        self.wait_async_entry(waiter, entry)
    }

    /// Establish a wait for any event.
    ///
    /// The waiter will be notified when any event occurs.
    ///
    /// This function does not actually block the waiter. To block the waiter,
    /// call the [`Waiter::wait`] function on the waiter.
    ///
    /// Returns a `WaitCanceler` that can be used to cancel the wait.
    pub fn wait_async(&self, waiter: &Waiter) -> WaitCanceler {
        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::All))
    }

    /// Establish a wait for any event on behalf of a `SimpleWaiter`.
    ///
    /// Unlike the other `wait_async*` methods, no `WaitCanceler` is returned,
    /// so this registration cannot be cancelled; the entry stays in the queue
    /// until a notification removes it.
    pub fn wait_async_simple(&self, waiter: &mut SimpleWaiter) {
        let entry = WaitEntry {
            waiter: WaiterRef::from_event(&waiter.event),
            filter: WaitEvents::All,
            key: Default::default(),
        };
        waiter.wait_queues.push(Arc::downgrade(&self.0));
        self.add_waiter(entry);
    }

    /// Notifies up to `limit` waiters whose filter matches `events`, removing
    /// each matching entry from the queue.
    ///
    /// Returns the number of waiters that were actually woken; matching
    /// entries whose waiter has already been dropped are removed but not
    /// counted.
    fn notify_events_count(&self, mut events: WaitEvents, mut limit: usize) -> usize {
        // Fold in fd events treated as equivalent so waiters registered for an
        // equivalent event are also woken.
        if let WaitEvents::Fd(ref mut fd_events) = events {
            *fd_events = fd_events.add_equivalent_fd_events();
        }
        let mut woken = 0;
        self.0.lock().waiters.retain(|_, WaitEntryWithId { entry, id: _ }| {
            if limit > 0 && entry.filter.intercept(&events) {
                if entry.waiter.notify(&entry.key, events) {
                    limit -= 1;
                    woken += 1;
                }

                // Matching entries are removed (retain returns false) whether
                // or not the waiter was still alive to be notified.
                entry.waiter.will_remove_from_wait_queue(&entry.key);
                false
            } else {
                true
            }
        });
        woken
    }

    /// Notifies all waiters whose filter matches any of `events`.
    pub fn notify_fd_events(&self, events: FdEvents) {
        self.notify_events_count(WaitEvents::Fd(events), usize::MAX);
    }

    /// Notifies all waiters whose signal mask includes `signal`.
    pub fn notify_signal(&self, signal: &Signal) {
        let event = WaitEvents::SignalMask(SigSet::from(*signal));
        self.notify_events_count(event, usize::MAX);
    }

    /// Notifies all waiters registered for the given `value`.
    pub fn notify_value(&self, value: u64) {
        self.notify_events_count(WaitEvents::Value(value), usize::MAX);
    }

    /// Notifies up to `limit` waiters, in no guaranteed order.
    pub fn notify_unordered_count(&self, limit: usize) {
        self.notify_events_count(WaitEvents::All, limit);
    }

    /// Notifies every waiter in the queue.
    pub fn notify_all(&self) {
        self.notify_unordered_count(usize::MAX);
    }

    /// Returns whether there are no active waiters waiting on this `WaitQueue`.
    pub fn is_empty(&self) -> bool {
        self.0.lock().waiters.is_empty()
    }
}
1107
/// A wait queue that dispatches events based on the value of an enum.
pub struct TypedWaitQueue<T: Into<u64>> {
    /// The untyped queue that actually stores the waiters.
    wait_queue: WaitQueue,
    /// Zero-sized marker tying this queue to `T` without storing a `T`.
    value_type: std::marker::PhantomData<T>,
}
1113
1114// We can't #[derive(Default)] on [TypedWaitQueue<T>] as T may not implement the Default trait.
1115impl<T: Into<u64>> Default for TypedWaitQueue<T> {
1116    fn default() -> Self {
1117        Self { wait_queue: Default::default(), value_type: Default::default() }
1118    }
1119}
1120
impl<T: Into<u64>> TypedWaitQueue<T> {
    /// Establish a wait for the given typed value.
    ///
    /// The waiter will be notified when `notify_value` is called with the same
    /// value. This does not block the waiter; returns a `WaitCanceler` that
    /// can be used to cancel the wait.
    pub fn wait_async_value(&self, waiter: &Waiter, value: T) -> WaitCanceler {
        self.wait_queue.wait_async_value(waiter, value.into())
    }

    /// Notifies all waiters registered for `value`.
    pub fn notify_value(&self, value: T) {
        self.wait_queue.notify_value(value.into())
    }
}
1130
1131#[cfg(test)]
1132mod tests {
1133    use super::*;
1134    use crate::fs::fuchsia::create_fuchsia_pipe;
1135    use crate::signals::SignalInfo;
1136    use crate::task::TaskFlags;
1137    use crate::testing::{spawn_kernel_and_run, spawn_kernel_and_run_sync};
1138    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
1139    use crate::vfs::eventfd::{EventFdType, new_eventfd};
1140    use assert_matches::assert_matches;
1141    use starnix_sync::Unlocked;
1142    use starnix_uapi::open_flags::OpenFlags;
1143    use starnix_uapi::signals::SIGUSR1;
1144
    // Arbitrary key shared by the tests' `EventHandler::Enqueue` handlers.
    const KEY: ReadyItemKey = ReadyItemKey::Usize(1234);
1146
    // Verifies that a waiter registered for POLLIN on a pipe is woken when a
    // writer thread makes data available, and that the enqueued ready items
    // all carry POLLIN.
    #[::fuchsia::test]
    async fn test_async_wait_exec() {
        spawn_kernel_and_run(async |locked, current_task| {
            let (local_socket, remote_socket) = zx::Socket::create_stream();
            let pipe =
                create_fuchsia_pipe(locked, &current_task, remote_socket, OpenFlags::RDWR).unwrap();

            const MEM_SIZE: usize = 1024;
            let mut output_buffer = VecOutputBuffer::new(MEM_SIZE);

            let test_string = "hello startnix".to_string();
            let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
            let handler = EventHandler::Enqueue {
                key: KEY,
                queue: queue.clone(),
                sought_events: FdEvents::all(),
            };
            let waiter = Waiter::new();
            pipe.wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                .expect("wait_async");
            let test_string_clone = test_string.clone();

            let write_count = AtomicUsizeCounter::default();
            std::thread::scope(|s| {
                let thread = s.spawn(|| {
                    let test_data = test_string_clone.as_bytes();
                    let no_written = local_socket.write(test_data).unwrap();
                    assert_eq!(0, write_count.add(no_written));
                    assert_eq!(no_written, test_data.len());
                });

                // If the notification were lost, `waiter.wait` below would
                // block forever, so this test failing manifests as a hang.

                assert!(queue.lock().is_empty());
                waiter.wait(locked, &current_task).unwrap();
                thread.join().expect("join thread")
            });
            queue.lock().iter().for_each(|item| assert!(item.events.contains(FdEvents::POLLIN)));

            let read_size = pipe.read(locked, &current_task, &mut output_buffer).unwrap();

            let no_written = write_count.get();
            assert_eq!(no_written, read_size);

            assert_eq!(output_buffer.data(), test_string.as_bytes());
        })
        .await;
    }
1195
    // Verifies both halves of wait cancellation: a cancelled wait times out
    // with an empty ready queue, while an uncancelled wait is notified and
    // enqueues exactly one ready item.
    #[::fuchsia::test]
    async fn test_async_wait_cancel() {
        for do_cancel in [true, false] {
            spawn_kernel_and_run(async move |locked, current_task| {
                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
                let waiter = Waiter::new();
                let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
                let handler = EventHandler::Enqueue {
                    key: KEY,
                    queue: queue.clone(),
                    sought_events: FdEvents::all(),
                };
                let wait_canceler = event
                    .wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                    .expect("wait_async");
                if do_cancel {
                    wait_canceler.cancel();
                }
                // Writing to the eventfd makes it readable, which should
                // notify the waiter unless the wait was cancelled.
                let add_val = 1u64;
                assert_eq!(
                    event
                        .write(
                            locked,
                            &current_task,
                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
                        )
                        .unwrap(),
                    std::mem::size_of::<u64>()
                );

                // Zero deadline: only succeeds if a notification is already pending.
                let wait_result =
                    waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO);
                let final_count = queue.lock().len();
                if do_cancel {
                    assert_eq!(wait_result, error!(ETIMEDOUT));
                    assert_eq!(0, final_count);
                } else {
                    assert_eq!(wait_result, Ok(()));
                    assert_eq!(1, final_count);
                }
            })
            .await;
        }
    }
1240
    // A waiter registered twice on the same queue must still be notified
    // after one of its two registrations is cancelled.
    #[::fuchsia::test]
    async fn single_waiter_multiple_waits_cancel_one_waiter_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter);
            let _wk2 = wait_queue.wait_async(&waiter);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }
1254
    // Cancelling one waiter's registration must not affect a different waiter
    // on the same queue: the cancelled one times out, the other is notified.
    #[::fuchsia::test]
    async fn multiple_waiters_cancel_one_other_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter1 = Waiter::new();
            let waiter2 = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter1);
            let _wk2 = wait_queue.wait_async(&waiter2);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter1.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_err());
            assert!(waiter2.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }
1270
    // Verifies limited notification: `notify_unordered_count(n)` wakes exactly
    // `n` waiters and removes them from the queue, so a later `notify_all`
    // only wakes the remainder.
    #[::fuchsia::test]
    async fn test_wait_queue() {
        spawn_kernel_and_run(async |locked, current_task| {
            let queue = WaitQueue::default();

            let waiters = <[Waiter; 3]>::default();
            waiters.iter().for_each(|w| {
                queue.wait_async(w);
            });

            // Counts how many waiters have a notification already pending
            // (zero deadline means no blocking).
            let woken = |locked: &mut Locked<Unlocked>| {
                waiters
                    .iter()
                    .filter(|w| {
                        w.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok()
                    })
                    .count()
            };

            const INITIAL_NOTIFY_COUNT: usize = 2;
            let total_waiters = waiters.len();
            queue.notify_unordered_count(INITIAL_NOTIFY_COUNT);
            assert_eq!(INITIAL_NOTIFY_COUNT, woken(locked));

            // Only the remaining (unnotified) waiters should be notified.
            queue.notify_all();
            assert_eq!(total_waiters - INITIAL_NOTIFY_COUNT, woken(locked));
        })
        .await;
    }
1301
    // Verifies that interrupting an `AbortHandle`-backed `WaiterRef` aborts
    // the associated future, which otherwise would never complete.
    #[::fuchsia::test]
    async fn waiter_kind_abort_handle() {
        spawn_kernel_and_run_sync(|_locked, current_task| {
            let mut executor = fuchsia_async::TestExecutor::new();
            let (abort_handle, abort_registration) = futures::stream::AbortHandle::new_pair();
            let abort_handle = Arc::new(abort_handle);
            let waiter_ref = WaiterRef::from_abort_handle(&abort_handle);

            // A pending future that can only finish by being aborted.
            let mut fut = futures::stream::Abortable::new(
                futures::future::pending::<()>(),
                abort_registration,
            );

            assert_matches!(executor.run_until_stalled(&mut fut), std::task::Poll::Pending);

            waiter_ref.interrupt();
            let output = current_task.run_in_state(RunState::Waiter(waiter_ref), move || {
                match executor.run_singlethreaded(&mut fut) {
                    Ok(()) => unreachable!("future never terminates normally"),
                    Err(futures::stream::Aborted) => Ok(()),
                }
            });

            assert_eq!(output, Ok(()));
        })
        .await;
    }
1329
    // A pending non-fatal signal (SIGUSR1) interrupts an interruptible wait
    // with EINTR, but does not prevent the task from entering the frozen state.
    #[::fuchsia::test]
    async fn freeze_with_pending_sigusr1() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::default(SIGUSR1);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            // The interruptible wait is aborted before the callback runs.
            let output: Result<(), Errno> = current_task
                .run_in_state(RunState::Event(InterruptibleEvent::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));

            // Freezing still proceeds despite the pending SIGUSR1.
            let output = current_task.run_in_state(RunState::Frozen(Waiter::new()), move || Ok(()));
            assert_eq!(output, Ok(()));
        })
        .await;
    }
1351
    // Unlike SIGUSR1, a pending SIGKILL prevents the frozen state from being
    // entered: the wait is aborted with EINTR before the callback runs.
    #[::fuchsia::test]
    async fn freeze_with_pending_sigkill() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::default(SIGKILL);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            let output: Result<(), _> = current_task
                .run_in_state(RunState::Frozen(Waiter::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));
        })
        .await;
    }
1370}