1use crate::task::{CurrentTask, RunState};
6use crate::vfs::{EpollEventHandler, FdNumber};
7use bitflags::bitflags;
8use futures::stream::AbortHandle;
9use slab::Slab;
10use smallvec::SmallVec;
11use starnix_lifecycle::AtomicCounter;
12use starnix_sync::{
13 EventHandlerReadyQueueLock, EventWaitGuard, FileOpsCore, InterruptibleEvent, LockDepMutex,
14 LockEqualOrBefore, Locked, Mutex, NotifyKind, PortEvent, PortWaitResult,
15 WaiterEventHandlerLock,
16};
17use starnix_types::ownership::debug_assert_no_local_temp_ref;
18use starnix_uapi::error;
19use starnix_uapi::errors::{EINTR, Errno};
20use starnix_uapi::signals::{SIGKILL, SigSet, Signal};
21use starnix_uapi::vfs::FdEvents;
22use std::collections::{HashMap, VecDeque};
23use std::sync::{Arc, Weak};
24use syncio::zxio::zxio_signals_t;
25use syncio::{ZxioSignals, ZxioWeak};
26
/// Key attached to a [`ReadyItem`]; it is either a file descriptor number or a plain `usize`.
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum ReadyItemKey {
    /// Key built from an `FdNumber`.
    FdNumber(FdNumber),
    /// Key built from a `usize`.
    Usize(usize),
}
32
33impl From<FdNumber> for ReadyItemKey {
34 fn from(v: FdNumber) -> Self {
35 Self::FdNumber(v)
36 }
37}
38
39impl From<usize> for ReadyItemKey {
40 fn from(v: usize) -> Self {
41 Self::Usize(v)
42 }
43}
44
/// Entry pushed onto a ready queue by `EventHandler::Enqueue`.
#[derive(Debug, Copy, Clone)]
pub struct ReadyItem {
    /// Key copied from the handler that produced this item.
    pub key: ReadyItemKey,
    /// Events that were delivered, already masked by the handler's `sought_events`.
    pub events: FdEvents,
}
50
/// Action performed when events are delivered to a wait; see `EventHandler::handle`.
#[derive(Clone)]
pub enum EventHandler {
    /// Ignore the events.
    None,

    /// Push a `ReadyItem` onto `queue`.
    Enqueue {
        /// Key stored in the pushed `ReadyItem`.
        key: ReadyItemKey,
        /// Queue that receives the `ReadyItem`.
        queue: Arc<LockDepMutex<VecDeque<ReadyItem>, EventHandlerReadyQueueLock>>,
        /// Mask applied to the delivered events before they are stored.
        sought_events: FdEvents,
    },

    /// Shared slot holding an optional handler. The first `handle` call takes the handler out
    /// of the slot and runs it; later calls through any clone find the slot empty and do
    /// nothing.
    HandleOnce(Arc<LockDepMutex<Option<EventHandler>, WaiterEventHandlerLock>>),

    /// Forward the events to an `EpollEventHandler`.
    Epoll(EpollEventHandler),
}
78
79impl EventHandler {
80 pub fn handle(self, events: FdEvents) {
81 match self {
82 Self::None => {}
83 Self::Enqueue { key, queue, sought_events } => {
84 let events = events & sought_events;
85 queue.lock().push_back(ReadyItem { key, events });
86 }
87 Self::HandleOnce(inner) => {
88 if let Some(inner) = inner.lock().take() {
89 inner.handle(events);
90 }
91 }
92 Self::Epoll(e) => e.handle(events),
93 }
94 }
95}
96
/// Signal translation state for a wait on a zxio object.
pub struct ZxioSignalHandler {
    /// Weak reference to the zxio object; `SignalHandler::handle` upgrades it to call `wait_end`.
    pub zxio: ZxioWeak,
    /// Converts the zxio signals returned by `wait_end` into `FdEvents`.
    pub get_events_from_zxio_signals: fn(zxio_signals_t) -> FdEvents,
}
101
/// Signal translation state shared by several waits: `events` is delivered only by the
/// observation that brings the shared counter up to `count`.
pub struct ManyZxHandleSignalHandler {
    /// Value the counter must reach for `events` to be delivered.
    pub count: usize,
    /// Shared counter, advanced once per observation that contains `expected_signals`.
    /// NOTE(review): `SignalHandler::handle` uses `counter.next() + 1`, which presumably means
    /// `next()` returns the value before the increment — confirm against `AtomicCounter`.
    pub counter: Arc<AtomicCounter<usize>>,
    /// Signals that must all be present for an observation to be counted.
    pub expected_signals: zx::Signals,
    /// Events delivered to the event handler once `count` is reached.
    pub events: FdEvents,
}
110
/// How `SignalHandler::handle` turns observed Zircon signals into `FdEvents`.
pub enum SignalHandlerInner {
    /// Produce no events.
    None,
    /// Translate through a zxio object.
    Zxio(ZxioSignalHandler),
    /// Translate the raw signals with the given function.
    ZxHandle(fn(zx::Signals) -> FdEvents),
    /// Count matching observations and deliver fixed events once the target count is reached.
    ManyZxHandle(ManyZxHandleSignalHandler),
}
117
/// Callback registered for a wait on Zircon signals.
pub struct SignalHandler {
    /// Strategy used to translate the observed signals into `FdEvents`.
    pub inner: SignalHandlerInner,
    /// Handler that receives the translated events, if any were produced.
    pub event_handler: EventHandler,
    /// Value returned by `handle`; when it is `Some`, `PortWaiter::wait_internal` returns it as
    /// the wait's error.
    pub err_code: Option<Errno>,
}
123
124impl SignalHandler {
125 fn handle(self, signals: zx::Signals) -> Option<Errno> {
126 let SignalHandler { inner, event_handler, err_code } = self;
127 let events = match inner {
128 SignalHandlerInner::None => None,
129 SignalHandlerInner::Zxio(ZxioSignalHandler { zxio, get_events_from_zxio_signals }) => {
130 if let Some(zxio) = zxio.upgrade() {
131 Some(get_events_from_zxio_signals(zxio.wait_end(signals)))
132 } else {
133 None
134 }
135 }
136 SignalHandlerInner::ZxHandle(get_events_from_zx_signals) => {
137 Some(get_events_from_zx_signals(signals))
138 }
139 SignalHandlerInner::ManyZxHandle(signal_handler) => {
140 if signals.contains(signal_handler.expected_signals) {
141 let new_count = signal_handler.counter.next() + 1;
142 assert!(new_count <= signal_handler.count);
143 if new_count == signal_handler.count {
144 Some(signal_handler.events)
145 } else {
146 None
147 }
148 } else {
149 None
150 }
151 }
152 };
153 if let Some(events) = events {
154 event_handler.handle(events)
155 }
156 err_code
157 }
158}
159
/// Callback stored in a `PortWaiter` under a `WaitKey`.
pub enum WaitCallback {
    /// Run when the port reports a signal packet for the key (see `PortWaiter::wait_internal`).
    SignalHandler(SignalHandler),
    /// Run when events are queued for the key (see `PortWaiter::queue_events`).
    EventHandler(EventHandler),
}
164
/// State needed to undo a registration on a wait queue.
struct WaitCancelerQueue {
    /// Queue the entry was added to.
    wait_queue: Weak<Mutex<WaitQueueImpl>>,
    /// Waiter that owns the callback and the wait-queue bookkeeping entry.
    waiter: WaiterRef,
    /// Key of the waiter's callback and of its `wait_queues` entry.
    wait_key: WaitKey,
    /// Slab key and entry id of the registration inside the queue.
    waiter_id: WaitEntryId,
}
171
/// State needed to cancel a port wait that was set up through a zxio object.
struct WaitCancelerZxio {
    /// zxio object whose `wait_begin`/`wait_end` bracket the cancellation.
    zxio: ZxioWeak,
    /// Canceler for the underlying port wait.
    inner: PortWaitCanceler,
}
176
/// State needed to cancel a plain port wait.
struct WaitCancelerPort {
    /// Canceler for the underlying port wait.
    inner: PortWaitCanceler,
}
180
/// One cancellable registration held by a `WaitCanceler`.
enum WaitCancelerInner {
    /// Port wait established through a zxio object.
    Zxio(WaitCancelerZxio),
    /// Entry on a wait queue.
    Queue(WaitCancelerQueue),
    /// Plain port wait.
    Port(WaitCancelerPort),
}
186
/// Strong reference to a waiter, produced by `WaiterRef::upgrade_notifiable`, that can be
/// notified.
enum NotifiableRef {
    /// Notified by queueing events on the port waiter.
    Port(Arc<PortWaiter>),
    /// Notified through `InterruptibleEvent::notify`.
    Event(Arc<InterruptibleEvent>),
    /// Notified by aborting the handle.
    AbortHandle(Arc<AbortHandle>),
}
192
/// Inline capacity of `WaitCanceler::cancellers` and the largest total that
/// `WaitCanceler::merge` accepts.
const WAIT_CANCELER_COMMON_SIZE: usize = 2;
194
/// Handle that undoes one or more wait registrations when `cancel` is called.
pub struct WaitCanceler {
    /// Registrations to undo; `cancel` walks them in reverse order.
    cancellers: smallvec::SmallVec<[WaitCancelerInner; WAIT_CANCELER_COMMON_SIZE]>,
}
204
205impl WaitCanceler {
206 fn new_inner(inner: WaitCancelerInner) -> Self {
207 Self { cancellers: smallvec::smallvec![inner] }
208 }
209
210 pub fn new_noop() -> Self {
211 Self { cancellers: Default::default() }
212 }
213
214 pub fn new_zxio(zxio: ZxioWeak, inner: PortWaitCanceler) -> Self {
215 Self::new_inner(WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }))
216 }
217
218 pub fn new_port(inner: PortWaitCanceler) -> Self {
219 Self::new_inner(WaitCancelerInner::Port(WaitCancelerPort { inner }))
220 }
221
222 pub fn merge(self, other: Self) -> Self {
228 assert!(
231 self.cancellers.len() + other.cancellers.len() <= WAIT_CANCELER_COMMON_SIZE,
232 "WaitCanceler::merge disallows more than {} cancellers, found {} + {}",
233 WAIT_CANCELER_COMMON_SIZE,
234 self.cancellers.len(),
235 other.cancellers.len()
236 );
237 WaitCanceler::merge_unbounded(self, other)
238 }
239
240 pub fn merge_unbounded(
242 Self { mut cancellers }: Self,
243 Self { cancellers: mut other }: Self,
244 ) -> Self {
245 cancellers.append(&mut other);
246 WaitCanceler { cancellers }
247 }
248
249 pub fn cancel(self) {
253 let Self { cancellers } = self;
254 for canceller in cancellers.into_iter().rev() {
255 match canceller {
256 WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }) => {
257 let Some(zxio) = zxio.upgrade() else { return };
258 let (_, signals) = zxio.wait_begin(ZxioSignals::NONE.bits());
259 inner.cancel();
260 zxio.wait_end(signals);
261 }
262 WaitCancelerInner::Queue(WaitCancelerQueue {
263 wait_queue,
264 waiter,
265 wait_key,
266 waiter_id: WaitEntryId { key, id },
267 }) => {
268 let Some(wait_queue) = wait_queue.upgrade() else { return };
269 waiter.remove_callback(&wait_key);
270 waiter.will_remove_from_wait_queue(&wait_key);
271 let mut wait_queue = wait_queue.lock();
272 let waiters = &mut wait_queue.waiters;
273 if let Some(entry) = waiters.get_mut(key) {
274 if entry.id == id {
279 waiters.remove(key);
280 }
281 }
282 }
283 WaitCancelerInner::Port(WaitCancelerPort { inner }) => {
284 inner.cancel();
285 }
286 }
287 }
288 }
289}
290
/// Handle returned by `wake_on_zircon_signals` that cancels the corresponding port wait.
pub struct PortWaitCanceler {
    /// Waiter whose port holds the wait.
    waiter: Weak<PortWaiter>,
    /// Key of the port wait and of the registered callback.
    key: WaitKey,
}
301
302impl PortWaitCanceler {
303 pub fn cancel(self) {
307 let Self { waiter, key } = self;
308 if let Some(waiter) = waiter.upgrade() {
309 let _ = waiter.port.cancel(key.raw);
310 waiter.remove_callback(&key);
311 }
312 }
313}
314
/// Key identifying one wait on a `PortWaiter`.
///
/// Keys produced by `PortWaiter::next_key` are never zero; the `Default` key (zero) is used
/// for entries created by `WaitQueue::wait_async_simple`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
struct WaitKey {
    /// Raw value, also passed to the port as the key of asynchronous object waits.
    raw: u64,
}
319
/// Event description used both as a wait entry's filter and as the payload of a notification.
#[derive(Clone, Copy, Debug)]
enum WaitEvents {
    /// Matches every other `WaitEvents` value.
    All,
    /// A set of `FdEvents`; matches another `Fd` value when the two sets share a bit.
    Fd(FdEvents),
    /// A single value; matches another `Value` only when the two are equal.
    Value(u64),
    /// A signal set; matches another `SignalMask` when the two sets intersect.
    SignalMask(SigSet),
}
333
334impl WaitEvents {
335 fn intercept(self: &WaitEvents, other: &WaitEvents) -> bool {
337 match (self, other) {
338 (Self::All, _) | (_, Self::All) => true,
339 (Self::Fd(m1), Self::Fd(m2)) => m1.bits() & m2.bits() != 0,
340 (Self::Value(v1), Self::Value(v2)) => v1 == v2,
341 (Self::SignalMask(m1), Self::SignalMask(m2)) => m1.intersects(m2),
343 _ => false,
344 }
345 }
346}
347
impl WaitCallback {
    /// Returns the handler that ignores all events (`EventHandler::None`).
    pub fn none() -> EventHandler {
        EventHandler::None
    }
}
353
bitflags! {
    /// Options that alter the behavior of a `Waiter`.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub struct WaiterOptions: u8 {
        /// When set, `interrupt` on the waiter is a no-op.
        const IGNORE_SIGNALS = 1 << 0;

        /// When set, waiting does not call `trigger_delayed_releaser` on the current task
        /// before blocking.
        const UNSAFE_CALLSTACK = 1 << 1;
    }
}
367
/// Shared state behind a `Waiter`: a port to block on plus the callbacks registered on it.
struct PortWaiter {
    /// Port the waiter blocks on and is notified through.
    port: PortEvent,
    /// Callbacks that have been registered but not yet run or cancelled, by key.
    callbacks: Mutex<HashMap<WaitKey, WaitCallback>>,
    /// Source of fresh wait keys; starts at 1.
    next_key: AtomicCounter<u64>,
    /// Options the waiter was created with.
    options: WaiterOptions,

    /// Wait queues this waiter currently has entries on, by wait key. Used when the `Waiter`
    /// is dropped to remove its entries from those queues.
    wait_queues: Mutex<HashMap<WaitKey, Weak<Mutex<WaitQueueImpl>>>>,
}
383
384impl PortWaiter {
385 fn new(options: WaiterOptions) -> Arc<Self> {
387 Arc::new(PortWaiter {
388 port: PortEvent::new(),
389 callbacks: Default::default(),
390 next_key: AtomicCounter::<u64>::new(1),
391 options,
392 wait_queues: Default::default(),
393 })
394 }
395
396 fn wait_internal(&self, deadline: zx::MonotonicInstant) -> Result<(), Errno> {
398 debug_assert_no_local_temp_ref();
402
403 match self.port.wait(deadline) {
404 PortWaitResult::Notification { kind: NotifyKind::Regular } => Ok(()),
405 PortWaitResult::Notification { kind: NotifyKind::Interrupt } => error!(EINTR),
406 PortWaitResult::Signal { key, observed } => {
407 if let Some(callback) = self.remove_callback(&WaitKey { raw: key }) {
408 match callback {
409 WaitCallback::SignalHandler(handler) => {
410 if let Some(errno) = handler.handle(observed) {
411 return Err(errno);
412 }
413 }
414 WaitCallback::EventHandler(_) => {
415 panic!("wrong type of handler called")
416 }
417 }
418 }
419
420 Ok(())
421 }
422 PortWaitResult::TimedOut => error!(ETIMEDOUT),
423 }
424 }
425
426 fn wait_until<L>(
427 self: &Arc<Self>,
428 locked: &mut Locked<L>,
429 current_task: &CurrentTask,
430 run_state: RunState,
431 deadline: zx::MonotonicInstant,
432 ) -> Result<(), Errno>
433 where
434 L: LockEqualOrBefore<FileOpsCore>,
435 {
436 let is_waiting = deadline.into_nanos() > 0;
437
438 let callback = || {
439 loop {
450 let wait_result = self.wait_internal(deadline);
451 if let Err(errno) = &wait_result {
452 if errno.code == EINTR && !is_waiting {
453 continue; }
455 }
456 return wait_result;
457 }
458 };
459
460 if !self.options.contains(WaiterOptions::UNSAFE_CALLSTACK) {
464 current_task.trigger_delayed_releaser(locked);
465 }
466
467 if is_waiting { current_task.run_in_state(run_state, callback) } else { callback() }
468 }
469
470 fn next_key(&self) -> WaitKey {
471 let key = self.next_key.next();
472 assert!(key != 0, "bad key from u64 wraparound");
474 WaitKey { raw: key }
475 }
476
477 fn register_callback(&self, callback: WaitCallback) -> WaitKey {
478 let key = self.next_key();
479 assert!(
480 self.callbacks.lock().insert(key, callback).is_none(),
481 "unexpected callback already present for key {key:?}"
482 );
483 key
484 }
485
486 fn remove_callback(&self, key: &WaitKey) -> Option<WaitCallback> {
487 self.callbacks.lock().remove(&key)
488 }
489
490 fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
491 let callback = WaitCallback::EventHandler(handler);
492 let key = self.register_callback(callback);
493 self.queue_events(&key, WaitEvents::Fd(events));
494 }
495
496 fn wake_on_zircon_signals(
502 self: &Arc<Self>,
503 handle: &dyn zx::AsHandleRef,
504 zx_signals: zx::Signals,
505 handler: SignalHandler,
506 ) -> Result<PortWaitCanceler, zx::Status> {
507 let callback = WaitCallback::SignalHandler(handler);
508 let key = self.register_callback(callback);
509 self.port.object_wait_async(
510 handle,
511 key.raw,
512 zx_signals,
513 zx::WaitAsyncOpts::EDGE_TRIGGERED,
514 )?;
515 Ok(PortWaitCanceler { waiter: Arc::downgrade(self), key })
516 }
517
518 fn queue_events(&self, key: &WaitKey, events: WaitEvents) {
519 scopeguard::defer! {
520 self.port.notify(NotifyKind::Regular)
521 }
522
523 let Some(callback) = self.remove_callback(key) else {
535 return;
536 };
537
538 match callback {
539 WaitCallback::EventHandler(handler) => {
540 let events = match events {
541 WaitEvents::All => FdEvents::all(),
544 WaitEvents::Fd(events) => events,
545 WaitEvents::SignalMask(_) => FdEvents::POLLIN,
546 WaitEvents::Value(_) => FdEvents::POLLIN,
547 };
548 handler.handle(events)
549 }
550 WaitCallback::SignalHandler(_) => {
551 panic!("wrong type of handler called")
552 }
553 }
554 }
555
556 fn notify(&self) {
557 self.port.notify(NotifyKind::Regular);
558 }
559
560 fn interrupt(&self) {
561 if self.options.contains(WaiterOptions::IGNORE_SIGNALS) {
562 return;
563 }
564 self.port.notify(NotifyKind::Interrupt);
565 }
566}
567
568impl std::fmt::Debug for PortWaiter {
569 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
570 f.debug_struct("PortWaiter").field("port", &self.port).finish_non_exhaustive()
571 }
572}
573
/// Object a task blocks on until it is notified, interrupted, or a deadline passes.
///
/// Clones share the same underlying `PortWaiter` and compare equal to each other.
#[derive(Debug, Clone)]
pub struct Waiter {
    /// Shared waiter state.
    inner: Arc<PortWaiter>,
}
581
582impl Waiter {
583 pub fn new() -> Self {
585 Self { inner: PortWaiter::new(WaiterOptions::empty()) }
586 }
587
588 pub fn with_options(options: WaiterOptions) -> Self {
590 Self { inner: PortWaiter::new(options) }
591 }
592
593 fn weak(&self) -> WaiterRef {
595 WaiterRef::from_port(&self.inner)
596 }
597
598 pub fn freeze<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask)
602 where
603 L: LockEqualOrBefore<FileOpsCore>,
604 {
605 while self
606 .inner
607 .wait_until(
608 locked,
609 current_task,
610 RunState::Frozen(self.clone()),
611 zx::MonotonicInstant::INFINITE,
612 )
613 .is_err()
614 {
615 if current_task.read().has_signal_pending(SIGKILL) {
617 break;
618 }
619 }
621 }
622
623 pub fn wait<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask) -> Result<(), Errno>
627 where
628 L: LockEqualOrBefore<FileOpsCore>,
629 {
630 self.inner.wait_until(
631 locked,
632 current_task,
633 RunState::Waiter(WaiterRef::from_port(&self.inner)),
634 zx::MonotonicInstant::INFINITE,
635 )
636 }
637
638 pub fn wait_until<L>(
661 &self,
662 locked: &mut Locked<L>,
663 current_task: &CurrentTask,
664 deadline: zx::MonotonicInstant,
665 ) -> Result<(), Errno>
666 where
667 L: LockEqualOrBefore<FileOpsCore>,
668 {
669 self.inner.wait_until(
670 locked,
671 current_task,
672 RunState::Waiter(WaiterRef::from_port(&self.inner)),
673 deadline,
674 )
675 }
676
677 fn create_wait_entry(&self, filter: WaitEvents) -> WaitEntry {
678 WaitEntry { waiter: self.weak(), filter, key: self.inner.next_key() }
679 }
680
681 fn create_wait_entry_with_handler(
682 &self,
683 filter: WaitEvents,
684 handler: EventHandler,
685 ) -> WaitEntry {
686 let key = self.inner.register_callback(WaitCallback::EventHandler(handler));
687 WaitEntry { waiter: self.weak(), filter, key }
688 }
689
690 pub fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
691 self.inner.wake_immediately(events, handler);
692 }
693
694 pub fn wake_on_zircon_signals(
699 &self,
700 handle: &dyn zx::AsHandleRef,
701 zx_signals: zx::Signals,
702 handler: SignalHandler,
703 ) -> Result<PortWaitCanceler, zx::Status> {
704 self.inner.wake_on_zircon_signals(handle, zx_signals, handler)
705 }
706
707 pub fn fake_wait(&self) -> WaitCanceler {
711 WaitCanceler::new_noop()
712 }
713
714 pub fn notify(&self) {
716 self.inner.notify();
717 }
718
719 pub fn interrupt(&self) {
725 self.inner.interrupt();
726 }
727}
728
729impl Drop for Waiter {
730 fn drop(&mut self) {
731 let wait_queues = std::mem::take(&mut *self.inner.wait_queues.lock()).into_values();
734 for wait_queue in wait_queues {
735 if let Some(wait_queue) = wait_queue.upgrade() {
736 wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != *self)
737 }
738 }
739 }
740}
741
/// The default waiter is the same as `Waiter::new()`.
impl Default for Waiter {
    fn default() -> Self {
        Self::new()
    }
}
747
748impl PartialEq for Waiter {
749 fn eq(&self, other: &Self) -> bool {
750 Arc::ptr_eq(&self.inner, &other.inner)
751 }
752}
753
/// Waiter backed by an `InterruptibleEvent` instead of a port.
pub struct SimpleWaiter {
    /// Event that is notified when one of the wait queues wakes this waiter.
    event: Arc<InterruptibleEvent>,
    /// Wait queues this waiter was added to; cleaned up on drop.
    wait_queues: Vec<Weak<Mutex<WaitQueueImpl>>>,
}
758
759impl SimpleWaiter {
760 pub fn new(event: &Arc<InterruptibleEvent>) -> (SimpleWaiter, EventWaitGuard<'_>) {
761 (SimpleWaiter { event: event.clone(), wait_queues: Default::default() }, event.begin_wait())
762 }
763}
764
765impl Drop for SimpleWaiter {
766 fn drop(&mut self) {
767 for wait_queue in &self.wait_queues {
768 if let Some(wait_queue) = wait_queue.upgrade() {
769 wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != self.event)
770 }
771 }
772 }
773}
774
/// The kinds of object a `WaiterRef` can weakly refer to.
#[derive(Debug, Clone)]
enum WaiterKind {
    /// A port-backed `Waiter`.
    Port(Weak<PortWaiter>),
    /// An `InterruptibleEvent`, as used by `SimpleWaiter`.
    Event(Weak<InterruptibleEvent>),
    /// An abort handle; interrupting or notifying it aborts the associated future.
    AbortHandle(Weak<futures::stream::AbortHandle>),
}
781
782impl Default for WaiterKind {
783 fn default() -> Self {
784 WaiterKind::Port(Default::default())
785 }
786}
787
/// Weak reference to a waiter of any kind. The default value refers to no waiter.
#[derive(Debug, Default, Clone)]
pub struct WaiterRef(WaiterKind);
792
793impl WaiterRef {
794 fn from_port(waiter: &Arc<PortWaiter>) -> WaiterRef {
795 WaiterRef(WaiterKind::Port(Arc::downgrade(waiter)))
796 }
797
798 fn from_event(event: &Arc<InterruptibleEvent>) -> WaiterRef {
799 WaiterRef(WaiterKind::Event(Arc::downgrade(event)))
800 }
801
802 pub fn from_abort_handle(handle: &Arc<futures::stream::AbortHandle>) -> WaiterRef {
803 WaiterRef(WaiterKind::AbortHandle(Arc::downgrade(handle)))
804 }
805
806 pub fn is_valid(&self) -> bool {
807 match &self.0 {
808 WaiterKind::Port(waiter) => waiter.strong_count() != 0,
809 WaiterKind::Event(event) => event.strong_count() != 0,
810 WaiterKind::AbortHandle(handle) => handle.strong_count() != 0,
811 }
812 }
813
814 pub fn interrupt(&self) {
815 match &self.0 {
816 WaiterKind::Port(waiter) => {
817 if let Some(waiter) = waiter.upgrade() {
818 waiter.interrupt();
819 }
820 }
821 WaiterKind::Event(event) => {
822 if let Some(event) = event.upgrade() {
823 event.interrupt();
824 }
825 }
826 WaiterKind::AbortHandle(handle) => {
827 if let Some(handle) = handle.upgrade() {
828 handle.abort();
829 }
830 }
831 }
832 }
833
834 fn remove_callback(&self, key: &WaitKey) {
835 match &self.0 {
836 WaiterKind::Port(waiter) => {
837 if let Some(waiter) = waiter.upgrade() {
838 waiter.remove_callback(key);
839 }
840 }
841 _ => (),
842 }
843 }
844
845 fn upgrade_notifiable(&self) -> Option<NotifiableRef> {
848 match &self.0 {
849 WaiterKind::Port(waiter) => {
850 if let Some(waiter) = waiter.upgrade() {
851 return Some(NotifiableRef::Port(waiter));
852 }
853 }
854 WaiterKind::Event(event) => {
855 if let Some(event) = event.upgrade() {
856 return Some(NotifiableRef::Event(event));
857 }
858 }
859 WaiterKind::AbortHandle(handle) => {
860 if let Some(handle) = handle.upgrade() {
861 return Some(NotifiableRef::AbortHandle(handle));
862 }
863 }
864 }
865 None
866 }
867
868 fn will_remove_from_wait_queue(&self, key: &WaitKey) {
873 match &self.0 {
874 WaiterKind::Port(waiter) => {
875 if let Some(waiter) = waiter.upgrade() {
876 waiter.wait_queues.lock().remove(key);
877 }
878 }
879 _ => (),
880 }
881 }
882}
883
884impl PartialEq<Waiter> for WaiterRef {
885 fn eq(&self, other: &Waiter) -> bool {
886 match &self.0 {
887 WaiterKind::Port(waiter) => waiter.as_ptr() == Arc::as_ptr(&other.inner),
888 _ => false,
889 }
890 }
891}
892
893impl PartialEq<Arc<InterruptibleEvent>> for WaiterRef {
894 fn eq(&self, other: &Arc<InterruptibleEvent>) -> bool {
895 match &self.0 {
896 WaiterKind::Event(event) => event.as_ptr() == Arc::as_ptr(other),
897 _ => false,
898 }
899 }
900}
901
902impl PartialEq for WaiterRef {
903 fn eq(&self, other: &WaiterRef) -> bool {
904 match (&self.0, &other.0) {
905 (WaiterKind::Port(lhs), WaiterKind::Port(rhs)) => Weak::ptr_eq(lhs, rhs),
906 (WaiterKind::Event(lhs), WaiterKind::Event(rhs)) => Weak::ptr_eq(lhs, rhs),
907 (WaiterKind::AbortHandle(lhs), WaiterKind::AbortHandle(rhs)) => Weak::ptr_eq(lhs, rhs),
908 _ => false,
909 }
910 }
911}
912
913impl NotifiableRef {
914 fn notify(&self, key: &WaitKey, events: WaitEvents) {
915 match self {
916 NotifiableRef::Port(port_waiter) => port_waiter.queue_events(key, events),
917 NotifiableRef::Event(interruptible_event) => interruptible_event.notify(),
918 NotifiableRef::AbortHandle(handle) => handle.abort(),
919 }
920 }
921}
922
/// Collection of waiters that can be woken by notifications. Clones share the same
/// underlying queue.
#[derive(Default, Debug, Clone)]
pub struct WaitQueue(Arc<Mutex<WaitQueueImpl>>);
931
/// A wait entry together with the id it was given when added to the queue.
#[derive(Debug)]
struct WaitEntryWithId {
    /// The registered entry.
    entry: WaitEntry,
    /// Id assigned by `WaitQueue::add_waiter`. `WaitCanceler::cancel` compares it before
    /// removing a slab slot, so that it only removes the entry it originally registered.
    id: u64,
}
940
/// Location and identity of an entry in a wait queue, as returned by `WaitQueue::add_waiter`.
struct WaitEntryId {
    /// Slab key of the entry.
    key: usize,
    /// Id assigned to the entry when it was added.
    id: u64,
}
945
/// State of a wait queue, guarded by the mutex inside `WaitQueue`.
#[derive(Default, Debug)]
struct WaitQueueImpl {
    /// Id most recently assigned to an entry; `add_waiter` increments it for each new entry.
    next_wait_entry_id: u64,
    /// Entries currently waiting on the queue.
    waiters: Slab<WaitEntryWithId>,
}
959
/// One registration on a wait queue.
#[derive(Debug)]
struct WaitEntry {
    /// Waiter to wake.
    waiter: WaiterRef,

    /// Notifications that wake this entry; see `WaitEvents::intercept`.
    filter: WaitEvents,

    /// Key passed to the waiter when it is notified.
    key: WaitKey,
}
972
973impl WaitQueue {
974 fn add_waiter(&self, entry: WaitEntry) -> WaitEntryId {
975 let mut wait_queue = self.0.lock();
976 let id = wait_queue
977 .next_wait_entry_id
978 .checked_add(1)
979 .expect("all possible wait entry ID values exhausted");
980 wait_queue.next_wait_entry_id = id;
981 WaitEntryId { key: wait_queue.waiters.insert(WaitEntryWithId { entry, id }), id }
982 }
983
984 fn wait_async_entry(&self, waiter: &Waiter, entry: WaitEntry) -> WaitCanceler {
993 let wait_key = entry.key;
994 let waiter_id = self.add_waiter(entry);
995 let wait_queue = Arc::downgrade(&self.0);
996 waiter.inner.wait_queues.lock().insert(wait_key, wait_queue.clone());
997 WaitCanceler::new_inner(WaitCancelerInner::Queue(WaitCancelerQueue {
998 wait_queue,
999 waiter: waiter.weak(),
1000 wait_key,
1001 waiter_id,
1002 }))
1003 }
1004
1005 pub fn wait_async_value(&self, waiter: &Waiter, value: u64) -> WaitCanceler {
1014 self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::Value(value)))
1015 }
1016
1017 pub fn wait_async_value_with_handler(
1026 &self,
1027 waiter: &Waiter,
1028 value: u64,
1029 handler: EventHandler,
1030 ) -> WaitCanceler {
1031 let entry = waiter.create_wait_entry_with_handler(WaitEvents::Value(value), handler);
1032 self.wait_async_entry(waiter, entry)
1033 }
1034
1035 pub fn wait_async_fd_events(
1044 &self,
1045 waiter: &Waiter,
1046 events: FdEvents,
1047 handler: EventHandler,
1048 ) -> WaitCanceler {
1049 let entry = waiter.create_wait_entry_with_handler(WaitEvents::Fd(events), handler);
1050 self.wait_async_entry(waiter, entry)
1051 }
1052
1053 pub fn wait_async_signal_mask(
1062 &self,
1063 waiter: &Waiter,
1064 mask: SigSet,
1065 handler: EventHandler,
1066 ) -> WaitCanceler {
1067 let entry = waiter.create_wait_entry_with_handler(WaitEvents::SignalMask(mask), handler);
1068 self.wait_async_entry(waiter, entry)
1069 }
1070
1071 pub fn wait_async(&self, waiter: &Waiter) -> WaitCanceler {
1080 self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::All))
1081 }
1082
1083 pub fn wait_async_simple(&self, waiter: &mut SimpleWaiter) {
1084 let entry = WaitEntry {
1085 waiter: WaiterRef::from_event(&waiter.event),
1086 filter: WaitEvents::All,
1087 key: Default::default(),
1088 };
1089 waiter.wait_queues.push(Arc::downgrade(&self.0));
1090 self.add_waiter(entry);
1091 }
1092
1093 fn notify_events_count(&self, mut events: WaitEvents, mut limit: usize) -> usize {
1094 if let WaitEvents::Fd(ref mut fd_events) = events {
1095 *fd_events = fd_events.add_equivalent_fd_events();
1096 }
1097 let mut notifiable_refs = SmallVec::<[(NotifiableRef, WaitKey); 1]>::new();
1103 let mut woken = 0;
1104 {
1105 let mut guard = self.0.lock();
1106 guard.waiters.retain(|_, WaitEntryWithId { entry, id: _ }| {
1107 if limit > 0 && entry.filter.intercept(&events) {
1108 if let Some(notifiable_ref) = entry.waiter.upgrade_notifiable() {
1109 limit -= 1;
1110 woken += 1;
1111 notifiable_refs.push((notifiable_ref, entry.key));
1112 }
1113
1114 entry.waiter.will_remove_from_wait_queue(&entry.key);
1115 false
1116 } else {
1117 true
1118 }
1119 });
1120 }
1121 for (notifiable_ref, key) in notifiable_refs {
1122 notifiable_ref.notify(&key, events);
1123 }
1124 woken
1125 }
1126
1127 pub fn notify_fd_events(&self, events: FdEvents) {
1128 self.notify_events_count(WaitEvents::Fd(events), usize::MAX);
1129 }
1130
1131 pub fn notify_fd_events_count(&self, events: FdEvents, limit: usize) {
1132 self.notify_events_count(WaitEvents::Fd(events), limit);
1133 }
1134
1135 pub fn notify_signal(&self, signal: &Signal) {
1136 let event = WaitEvents::SignalMask(SigSet::from(*signal));
1137 self.notify_events_count(event, usize::MAX);
1138 }
1139
1140 pub fn notify_value(&self, value: u64) {
1141 self.notify_events_count(WaitEvents::Value(value), usize::MAX);
1142 }
1143
1144 pub fn notify_unordered_count(&self, limit: usize) {
1145 self.notify_events_count(WaitEvents::All, limit);
1146 }
1147
1148 pub fn notify_all(&self) {
1149 self.notify_unordered_count(usize::MAX);
1150 }
1151
1152 pub fn is_empty(&self) -> bool {
1154 self.0.lock().waiters.is_empty()
1155 }
1156}
1157
/// Wait queue whose values are of type `T`, converted to `u64` before use.
pub struct TypedWaitQueue<T: Into<u64>> {
    /// Underlying untyped queue.
    wait_queue: WaitQueue,
    /// Marker tying the queue to its value type; holds no data.
    value_type: std::marker::PhantomData<T>,
}
1163
1164impl<T: Into<u64>> Default for TypedWaitQueue<T> {
1166 fn default() -> Self {
1167 Self { wait_queue: Default::default(), value_type: Default::default() }
1168 }
1169}
1170
1171impl<T: Into<u64>> TypedWaitQueue<T> {
1172 pub fn wait_async_value(&self, waiter: &Waiter, value: T) -> WaitCanceler {
1173 self.wait_queue.wait_async_value(waiter, value.into())
1174 }
1175
1176 pub fn wait_async_value_with_handler(
1177 &self,
1178 waiter: &Waiter,
1179 value: T,
1180 handler: EventHandler,
1181 ) -> WaitCanceler {
1182 self.wait_queue.wait_async_value_with_handler(waiter, value.into(), handler)
1183 }
1184
1185 pub fn notify_value(&self, value: T) {
1186 self.wait_queue.notify_value(value.into())
1187 }
1188}
1189
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs::fuchsia::create_fuchsia_pipe;
    use crate::signals::SignalInfo;
    use crate::task::TaskFlags;
    use crate::testing::{spawn_kernel_and_run, spawn_kernel_and_run_sync};
    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
    use crate::vfs::eventfd::{EventFdType, new_eventfd};
    use assert_matches::assert_matches;
    use starnix_sync::Unlocked;
    use starnix_uapi::open_flags::OpenFlags;
    use starnix_uapi::signals::SIGUSR1;

    /// Key used by every `EventHandler::Enqueue` handler in these tests.
    const KEY: ReadyItemKey = ReadyItemKey::Usize(1234);

    /// A waiter registered on a pipe wakes up when another thread writes to the peer socket,
    /// and the written bytes can then be read back.
    #[::fuchsia::test]
    async fn test_async_wait_exec() {
        spawn_kernel_and_run(async |locked, current_task| {
            let (local_socket, remote_socket) = zx::Socket::create_stream();
            let pipe =
                create_fuchsia_pipe(locked, &current_task, remote_socket, OpenFlags::RDWR).unwrap();

            const MEM_SIZE: usize = 1024;
            let mut output_buffer = VecOutputBuffer::new(MEM_SIZE);

            let test_string = "hello startnix".to_string();
            let queue: Arc<LockDepMutex<VecDeque<ReadyItem>, EventHandlerReadyQueueLock>> =
                Default::default();
            let handler = EventHandler::Enqueue {
                key: KEY,
                queue: queue.clone(),
                sought_events: FdEvents::all(),
            };
            let waiter = Waiter::new();
            pipe.wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                .expect("wait_async");
            let test_string_clone = test_string.clone();

            let write_count = AtomicCounter::<usize>::default();
            std::thread::scope(|s| {
                let thread = s.spawn(|| {
                    let test_data = test_string_clone.as_bytes();
                    let no_written = local_socket.write(test_data).unwrap();
                    assert_eq!(0, write_count.add(no_written));
                    assert_eq!(no_written, test_data.len());
                });

                assert!(queue.lock().is_empty());
                waiter.wait(locked, &current_task).unwrap();
                thread.join().expect("join thread")
            });
            queue.lock().iter().for_each(|item| assert!(item.events.contains(FdEvents::POLLIN)));

            let read_size = pipe.read(locked, &current_task, &mut output_buffer).unwrap();

            let no_written = write_count.get();
            assert_eq!(no_written, read_size);

            assert_eq!(output_buffer.data(), test_string.as_bytes());
        })
        .await;
    }

    /// A cancelled wait is not woken by a later write; an uncancelled one is.
    #[::fuchsia::test]
    async fn test_async_wait_cancel() {
        for do_cancel in [true, false] {
            spawn_kernel_and_run(async move |locked, current_task| {
                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
                let waiter = Waiter::new();
                let queue: Arc<LockDepMutex<VecDeque<ReadyItem>, EventHandlerReadyQueueLock>> =
                    Default::default();
                let handler = EventHandler::Enqueue {
                    key: KEY,
                    queue: queue.clone(),
                    sought_events: FdEvents::all(),
                };
                let wait_canceler = event
                    .wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                    .expect("wait_async");
                if do_cancel {
                    wait_canceler.cancel();
                }
                let add_val = 1u64;
                assert_eq!(
                    event
                        .write(
                            locked,
                            &current_task,
                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
                        )
                        .unwrap(),
                    std::mem::size_of::<u64>()
                );

                let wait_result =
                    waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO);
                let final_count = queue.lock().len();
                if do_cancel {
                    assert_eq!(wait_result, error!(ETIMEDOUT));
                    assert_eq!(0, final_count);
                } else {
                    assert_eq!(wait_result, Ok(()));
                    assert_eq!(1, final_count);
                }
            })
            .await;
        }
    }

    /// Cancelling one of two registrations of the same waiter leaves the other one active.
    #[::fuchsia::test]
    async fn single_waiter_multiple_waits_cancel_one_waiter_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter);
            let _wk2 = wait_queue.wait_async(&waiter);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }

    /// Cancelling one waiter's registration does not affect another waiter on the same queue.
    #[::fuchsia::test]
    async fn multiple_waiters_cancel_one_other_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter1 = Waiter::new();
            let waiter2 = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter1);
            let _wk2 = wait_queue.wait_async(&waiter2);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter1.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_err());
            assert!(waiter2.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }

    /// `notify_unordered_count` wakes exactly the requested number of waiters and
    /// `notify_all` wakes the rest.
    #[::fuchsia::test]
    async fn test_wait_queue() {
        spawn_kernel_and_run(async |locked, current_task| {
            let queue = WaitQueue::default();

            let waiters = <[Waiter; 3]>::default();
            waiters.iter().for_each(|w| {
                queue.wait_async(w);
            });

            let woken = |locked: &mut Locked<Unlocked>| {
                waiters
                    .iter()
                    .filter(|w| {
                        w.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok()
                    })
                    .count()
            };

            const INITIAL_NOTIFY_COUNT: usize = 2;
            let total_waiters = waiters.len();
            queue.notify_unordered_count(INITIAL_NOTIFY_COUNT);
            assert_eq!(INITIAL_NOTIFY_COUNT, woken(locked));

            queue.notify_all();
            assert_eq!(total_waiters - INITIAL_NOTIFY_COUNT, woken(locked));
        })
        .await;
    }

    /// Interrupting a `WaiterRef` built from an abort handle aborts the associated future.
    #[::fuchsia::test]
    async fn waiter_kind_abort_handle() {
        spawn_kernel_and_run_sync(|_locked, current_task| {
            let mut executor = fuchsia_async::TestExecutor::new();
            let (abort_handle, abort_registration) = futures::stream::AbortHandle::new_pair();
            let abort_handle = Arc::new(abort_handle);
            let waiter_ref = WaiterRef::from_abort_handle(&abort_handle);

            let mut fut = futures::stream::Abortable::new(
                futures::future::pending::<()>(),
                abort_registration,
            );

            assert_matches!(executor.run_until_stalled(&mut fut), std::task::Poll::Pending);

            waiter_ref.interrupt();
            let output = current_task.run_in_state(RunState::Waiter(waiter_ref), move || {
                match executor.run_singlethreaded(&mut fut) {
                    Ok(()) => unreachable!("future never terminates normally"),
                    Err(futures::stream::Aborted) => Ok(()),
                }
            });

            assert_eq!(output, Ok(()));
        })
        .await;
    }

    /// With `SIGUSR1` pending, entering the `Event` run state fails with `EINTR` while the
    /// `Frozen` run state still runs its callback.
    #[::fuchsia::test]
    async fn freeze_with_pending_sigusr1() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::kernel(SIGUSR1);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            let output: Result<(), Errno> = current_task
                .run_in_state(RunState::Event(InterruptibleEvent::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));

            let output = current_task.run_in_state(RunState::Frozen(Waiter::new()), move || Ok(()));
            assert_eq!(output, Ok(()));
        })
        .await;
    }

    /// With `SIGKILL` pending, entering the `Frozen` run state fails with `EINTR` without
    /// running the callback.
    #[::fuchsia::test]
    async fn freeze_with_pending_sigkill() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::kernel(SIGKILL);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            let output: Result<(), _> = current_task
                .run_in_state(RunState::Frozen(Waiter::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));
        })
        .await;
    }

    /// A typed wait queue only wakes a waiter for the exact value it registered, and the
    /// handler then receives `POLLIN`.
    #[::fuchsia::test]
    async fn test_async_typed_wait_value_with_handler() {
        spawn_kernel_and_run(async |locked, current_task| {
            let queue: Arc<LockDepMutex<VecDeque<ReadyItem>, EventHandlerReadyQueueLock>> =
                Default::default();
            let handler = EventHandler::Enqueue {
                key: KEY,
                queue: queue.clone(),
                sought_events: FdEvents::all(),
            };
            let waiter = Waiter::new();
            let wait_queue = TypedWaitQueue::<u64>::default();

            let test_value = 100u64;
            let _wait_canceler =
                wait_queue.wait_async_value_with_handler(&waiter, test_value, handler);

            assert!(queue.lock().is_empty());

            // A different value must not wake the waiter.
            wait_queue.notify_value(test_value + 1);
            assert!(queue.lock().is_empty());

            wait_queue.notify_value(test_value);

            waiter.wait(locked, &current_task).expect("wait failed");

            let ready_items = queue.lock();
            assert_eq!(ready_items.len(), 1);
            assert!(ready_items[0].events.contains(FdEvents::POLLIN));
        })
        .await;
    }
}