1use crate::signals::RunState;
6use crate::task::CurrentTask;
7use crate::vfs::{EpollEventHandler, FdNumber};
8use bitflags::bitflags;
9use futures::stream::AbortHandle;
10use slab::Slab;
11use smallvec::SmallVec;
12use starnix_lifecycle::AtomicCounter;
13use starnix_sync::{
14 EventWaitGuard, FileOpsCore, InterruptibleEvent, LockEqualOrBefore, Locked, Mutex, NotifyKind,
15 PortEvent, PortWaitResult,
16};
17use starnix_types::ownership::debug_assert_no_local_temp_ref;
18use starnix_uapi::error;
19use starnix_uapi::errors::{EINTR, Errno};
20use starnix_uapi::signals::{SIGKILL, SigSet, Signal};
21use starnix_uapi::vfs::FdEvents;
22use std::collections::{HashMap, VecDeque};
23use std::sync::{Arc, Weak};
24use syncio::zxio::zxio_signals_t;
25use syncio::{ZxioSignals, ZxioWeak};
26
/// Identifies the party interested in a readiness notification: either a file
/// descriptor number or an arbitrary caller-chosen index.
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum ReadyItemKey {
    FdNumber(FdNumber),
    Usize(usize),
}

impl From<FdNumber> for ReadyItemKey {
    fn from(v: FdNumber) -> Self {
        Self::FdNumber(v)
    }
}

impl From<usize> for ReadyItemKey {
    fn from(v: usize) -> Self {
        Self::Usize(v)
    }
}

/// A single readiness notification: which key became ready, and with which
/// events.
#[derive(Debug, Copy, Clone)]
pub struct ReadyItem {
    pub key: ReadyItemKey,
    pub events: FdEvents,
}
50
/// What to do when a waited-on object becomes ready.
#[derive(Clone)]
pub enum EventHandler {
    /// Discard the notification.
    None,

    /// Push a [`ReadyItem`] (events masked by `sought_events`) onto `queue`.
    Enqueue { key: ReadyItemKey, queue: Arc<Mutex<VecDeque<ReadyItem>>>, sought_events: FdEvents },

    /// Run the wrapped handler at most once; later notifications find the
    /// slot empty and are dropped.
    HandleOnce(Arc<Mutex<Option<EventHandler>>>),

    /// Forward the notification to an epoll implementation.
    Epoll(EpollEventHandler),
}
74
75impl EventHandler {
76 pub fn handle(self, events: FdEvents) {
77 match self {
78 Self::None => {}
79 Self::Enqueue { key, queue, sought_events } => {
80 let events = events & sought_events;
81 queue.lock().push_back(ReadyItem { key, events });
82 }
83 Self::HandleOnce(inner) => {
84 if let Some(inner) = inner.lock().take() {
85 inner.handle(events);
86 }
87 }
88 Self::Epoll(e) => e.handle(events),
89 }
90 }
91}
92
/// Translates zircon signals observed on a zxio-backed object into
/// [`FdEvents`].
pub struct ZxioSignalHandler {
    pub zxio: ZxioWeak,
    pub get_events_from_zxio_signals: fn(zxio_signals_t) -> FdEvents,
}

/// Fires `events` only after all `count` participating handles have observed
/// `expected_signals`; `counter` records how many have been seen so far.
pub struct ManyZxHandleSignalHandler {
    pub count: usize,
    pub counter: Arc<AtomicCounter<usize>>,
    pub expected_signals: zx::Signals,
    pub events: FdEvents,
}

/// The strategy used to turn observed zircon signals into `FdEvents`.
pub enum SignalHandlerInner {
    None,
    Zxio(ZxioSignalHandler),
    ZxHandle(fn(zx::Signals) -> FdEvents),
    ManyZxHandle(ManyZxHandleSignalHandler),
}

/// Couples a signal-translation strategy with the event handler to invoke and
/// an optional error to surface to the waiter after handling.
pub struct SignalHandler {
    pub inner: SignalHandlerInner,
    pub event_handler: EventHandler,
    pub err_code: Option<Errno>,
}
119
impl SignalHandler {
    /// Translates `signals` into `FdEvents` per `inner`, invokes the event
    /// handler if any events resulted, and returns the error (if any) that
    /// should abort the wait.
    fn handle(self, signals: zx::Signals) -> Option<Errno> {
        let SignalHandler { inner, event_handler, err_code } = self;
        let events = match inner {
            SignalHandlerInner::None => None,
            SignalHandlerInner::Zxio(ZxioSignalHandler { zxio, get_events_from_zxio_signals }) => {
                // The zxio object may already be gone; nothing to translate then.
                if let Some(zxio) = zxio.upgrade() {
                    Some(get_events_from_zxio_signals(zxio.wait_end(signals)))
                } else {
                    None
                }
            }
            SignalHandlerInner::ZxHandle(get_events_from_zx_signals) => {
                Some(get_events_from_zx_signals(signals))
            }
            SignalHandlerInner::ManyZxHandle(signal_handler) => {
                if signals.contains(signal_handler.expected_signals) {
                    // Assumes `AtomicCounter::next()` returns the value before
                    // incrementing — hence the `+ 1` (TODO confirm).
                    let new_count = signal_handler.counter.next() + 1;
                    assert!(new_count <= signal_handler.count);
                    // Only fire once the last expected handle has signaled.
                    if new_count == signal_handler.count {
                        Some(signal_handler.events)
                    } else {
                        None
                    }
                } else {
                    None
                }
            }
        };
        if let Some(events) = events {
            event_handler.handle(events)
        }
        err_code
    }
}
155
/// A callback registered with a `PortWaiter`: either a kernel-signal
/// translation step or a plain event handler.
pub enum WaitCallback {
    SignalHandler(SignalHandler),
    EventHandler(EventHandler),
}

/// Everything needed to remove a single entry from a `WaitQueue`.
struct WaitCancelerQueue {
    wait_queue: Weak<Mutex<WaitQueueImpl>>,
    waiter: WaiterRef,
    wait_key: WaitKey,
    waiter_id: WaitEntryId,
}

/// Cancels a port-based wait on a zxio-backed object.
struct WaitCancelerZxio {
    zxio: ZxioWeak,
    inner: PortWaitCanceler,
}

/// Cancels a plain port-based wait.
struct WaitCancelerPort {
    inner: PortWaitCanceler,
}

/// The individual cancellation strategies a [`WaitCanceler`] can hold.
enum WaitCancelerInner {
    Zxio(WaitCancelerZxio),
    Queue(WaitCancelerQueue),
    Port(WaitCancelerPort),
}

/// A strong reference to something notifiable, obtained by upgrading a
/// [`WaiterRef`].
enum NotifiableRef {
    Port(Arc<PortWaiter>),
    Event(Arc<InterruptibleEvent>),
    AbortHandle(Arc<AbortHandle>),
}
188
// Inline capacity of the canceller list; `merge` asserts it is never exceeded
// so the common path stays off the heap.
const WAIT_CANCELER_COMMON_SIZE: usize = 2;

/// Handle returned by the `wait_async*` family of functions; lets the caller
/// cancel the pending wait(s).
pub struct WaitCanceler {
    cancellers: smallvec::SmallVec<[WaitCancelerInner; WAIT_CANCELER_COMMON_SIZE]>,
}
200
201impl WaitCanceler {
202 fn new_inner(inner: WaitCancelerInner) -> Self {
203 Self { cancellers: smallvec::smallvec![inner] }
204 }
205
206 pub fn new_noop() -> Self {
207 Self { cancellers: Default::default() }
208 }
209
210 pub fn new_zxio(zxio: ZxioWeak, inner: PortWaitCanceler) -> Self {
211 Self::new_inner(WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }))
212 }
213
214 pub fn new_port(inner: PortWaitCanceler) -> Self {
215 Self::new_inner(WaitCancelerInner::Port(WaitCancelerPort { inner }))
216 }
217
218 pub fn merge(self, other: Self) -> Self {
224 assert!(
227 self.cancellers.len() + other.cancellers.len() <= WAIT_CANCELER_COMMON_SIZE,
228 "WaitCanceler::merge disallows more than {} cancellers, found {} + {}",
229 WAIT_CANCELER_COMMON_SIZE,
230 self.cancellers.len(),
231 other.cancellers.len()
232 );
233 WaitCanceler::merge_unbounded(self, other)
234 }
235
236 pub fn merge_unbounded(
238 Self { mut cancellers }: Self,
239 Self { cancellers: mut other }: Self,
240 ) -> Self {
241 cancellers.append(&mut other);
242 WaitCanceler { cancellers }
243 }
244
245 pub fn cancel(self) {
249 let Self { cancellers } = self;
250 for canceller in cancellers.into_iter().rev() {
251 match canceller {
252 WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }) => {
253 let Some(zxio) = zxio.upgrade() else { return };
254 let (_, signals) = zxio.wait_begin(ZxioSignals::NONE.bits());
255 inner.cancel();
256 zxio.wait_end(signals);
257 }
258 WaitCancelerInner::Queue(WaitCancelerQueue {
259 wait_queue,
260 waiter,
261 wait_key,
262 waiter_id: WaitEntryId { key, id },
263 }) => {
264 let Some(wait_queue) = wait_queue.upgrade() else { return };
265 waiter.remove_callback(&wait_key);
266 waiter.will_remove_from_wait_queue(&wait_key);
267 let mut wait_queue = wait_queue.lock();
268 let waiters = &mut wait_queue.waiters;
269 if let Some(entry) = waiters.get_mut(key) {
270 if entry.id == id {
275 waiters.remove(key);
276 }
277 }
278 }
279 WaitCancelerInner::Port(WaitCancelerPort { inner }) => {
280 inner.cancel();
281 }
282 }
283 }
284 }
285}
286
/// Cancels a wait registered directly on a `PortWaiter`'s port.
pub struct PortWaitCanceler {
    waiter: Weak<PortWaiter>,
    key: WaitKey,
}

impl PortWaitCanceler {
    /// Cancels the pending async wait and drops the associated callback.
    /// Does nothing if the waiter has already been dropped.
    pub fn cancel(self) {
        let Self { waiter, key } = self;
        if let Some(waiter) = waiter.upgrade() {
            // Cancellation can race with packet delivery; ignore the status.
            let _ = waiter.port.cancel(key.raw);
            waiter.remove_callback(&key);
        }
    }
}
310
/// Key identifying a registered callback / port packet on a `PortWaiter`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
struct WaitKey {
    raw: u64,
}

/// Filter describing which notifications a wait entry is interested in.
#[derive(Clone, Copy, Debug)]
enum WaitEvents {
    /// Matches any notification.
    All,
    /// Matches notifications whose fd events intersect this mask.
    Fd(FdEvents),
    /// Matches notifications carrying exactly this value.
    Value(u64),
    /// Matches signals contained in this mask.
    SignalMask(SigSet),
}
329
330impl WaitEvents {
331 fn intercept(self: &WaitEvents, other: &WaitEvents) -> bool {
333 match (self, other) {
334 (Self::All, _) | (_, Self::All) => true,
335 (Self::Fd(m1), Self::Fd(m2)) => m1.bits() & m2.bits() != 0,
336 (Self::Value(v1), Self::Value(v2)) => v1 == v2,
337 (Self::SignalMask(m1), Self::SignalMask(m2)) => m1.intersects(m2),
339 _ => false,
340 }
341 }
342}
343
impl WaitCallback {
    /// Convenience constructor for "no handler".
    pub fn none() -> EventHandler {
        EventHandler::None
    }
}

bitflags! {
    /// Options controlling how a `Waiter` behaves.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub struct WaiterOptions: u8 {
        /// Signals never interrupt this waiter (see `PortWaiter::interrupt`).
        const IGNORE_SIGNALS = 1 << 0;

        /// NOTE(review): from its single use in `PortWaiter::wait_until`, this
        /// appears to mark callstacks where running the delayed releaser is
        /// unsafe — confirm intent.
        const UNSAFE_CALLSTACK = 1 << 1;
    }
}
363
/// The shared implementation behind [`Waiter`]: a zircon port plus the
/// bookkeeping needed to dispatch packets back to registered callbacks.
struct PortWaiter {
    port: PortEvent,
    // Callbacks keyed by the packet key registered with the port.
    callbacks: Mutex<HashMap<WaitKey, WaitCallback>>,
    // Source of unique, non-zero callback keys.
    next_key: AtomicCounter<u64>,
    options: WaiterOptions,

    // The wait queues this waiter is currently enqueued on, so its entries
    // can be removed when the waiter is dropped.
    wait_queues: Mutex<HashMap<WaitKey, Weak<Mutex<WaitQueueImpl>>>>,
}
379
380impl PortWaiter {
381 fn new(options: WaiterOptions) -> Arc<Self> {
383 Arc::new(PortWaiter {
384 port: PortEvent::new(),
385 callbacks: Default::default(),
386 next_key: AtomicCounter::<u64>::new(1),
387 options,
388 wait_queues: Default::default(),
389 })
390 }
391
392 fn wait_internal(&self, deadline: zx::MonotonicInstant) -> Result<(), Errno> {
394 debug_assert_no_local_temp_ref();
398
399 match self.port.wait(deadline) {
400 PortWaitResult::Notification { kind: NotifyKind::Regular } => Ok(()),
401 PortWaitResult::Notification { kind: NotifyKind::Interrupt } => error!(EINTR),
402 PortWaitResult::Signal { key, observed } => {
403 if let Some(callback) = self.remove_callback(&WaitKey { raw: key }) {
404 match callback {
405 WaitCallback::SignalHandler(handler) => {
406 if let Some(errno) = handler.handle(observed) {
407 return Err(errno);
408 }
409 }
410 WaitCallback::EventHandler(_) => {
411 panic!("wrong type of handler called")
412 }
413 }
414 }
415
416 Ok(())
417 }
418 PortWaitResult::TimedOut => error!(ETIMEDOUT),
419 }
420 }
421
422 fn wait_until<L>(
423 self: &Arc<Self>,
424 locked: &mut Locked<L>,
425 current_task: &CurrentTask,
426 run_state: RunState,
427 deadline: zx::MonotonicInstant,
428 ) -> Result<(), Errno>
429 where
430 L: LockEqualOrBefore<FileOpsCore>,
431 {
432 let is_waiting = deadline.into_nanos() > 0;
433
434 let callback = || {
435 loop {
446 let wait_result = self.wait_internal(deadline);
447 if let Err(errno) = &wait_result {
448 if errno.code == EINTR && !is_waiting {
449 continue; }
451 }
452 return wait_result;
453 }
454 };
455
456 if !self.options.contains(WaiterOptions::UNSAFE_CALLSTACK) {
460 current_task.trigger_delayed_releaser(locked);
461 }
462
463 if is_waiting { current_task.run_in_state(run_state, callback) } else { callback() }
464 }
465
466 fn next_key(&self) -> WaitKey {
467 let key = self.next_key.next();
468 assert!(key != 0, "bad key from u64 wraparound");
470 WaitKey { raw: key }
471 }
472
473 fn register_callback(&self, callback: WaitCallback) -> WaitKey {
474 let key = self.next_key();
475 assert!(
476 self.callbacks.lock().insert(key, callback).is_none(),
477 "unexpected callback already present for key {key:?}"
478 );
479 key
480 }
481
482 fn remove_callback(&self, key: &WaitKey) -> Option<WaitCallback> {
483 self.callbacks.lock().remove(&key)
484 }
485
486 fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
487 let callback = WaitCallback::EventHandler(handler);
488 let key = self.register_callback(callback);
489 self.queue_events(&key, WaitEvents::Fd(events));
490 }
491
492 fn wake_on_zircon_signals(
498 self: &Arc<Self>,
499 handle: &dyn zx::AsHandleRef,
500 zx_signals: zx::Signals,
501 handler: SignalHandler,
502 ) -> Result<PortWaitCanceler, zx::Status> {
503 let callback = WaitCallback::SignalHandler(handler);
504 let key = self.register_callback(callback);
505 self.port.object_wait_async(
506 handle,
507 key.raw,
508 zx_signals,
509 zx::WaitAsyncOpts::EDGE_TRIGGERED,
510 )?;
511 Ok(PortWaitCanceler { waiter: Arc::downgrade(self), key })
512 }
513
514 fn queue_events(&self, key: &WaitKey, events: WaitEvents) {
515 scopeguard::defer! {
516 self.port.notify(NotifyKind::Regular)
517 }
518
519 let Some(callback) = self.remove_callback(key) else {
531 return;
532 };
533
534 match callback {
535 WaitCallback::EventHandler(handler) => {
536 let events = match events {
537 WaitEvents::All => FdEvents::all(),
540 WaitEvents::Fd(events) => events,
541 WaitEvents::SignalMask(_) => FdEvents::POLLIN,
542 WaitEvents::Value(_) => FdEvents::POLLIN,
543 };
544 handler.handle(events)
545 }
546 WaitCallback::SignalHandler(_) => {
547 panic!("wrong type of handler called")
548 }
549 }
550 }
551
552 fn notify(&self) {
553 self.port.notify(NotifyKind::Regular);
554 }
555
556 fn interrupt(&self) {
557 if self.options.contains(WaiterOptions::IGNORE_SIGNALS) {
558 return;
559 }
560 self.port.notify(NotifyKind::Interrupt);
561 }
562}
563
impl std::fmt::Debug for PortWaiter {
    // Manual impl: the callback/queue maps are noisy and not all printable,
    // so only the port is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PortWaiter").field("port", &self.port).finish_non_exhaustive()
    }
}
569
/// A thread-safe handle used to block a task until it is notified.
///
/// Cloning yields another handle to the same underlying waiter.
#[derive(Debug, Clone)]
pub struct Waiter {
    inner: Arc<PortWaiter>,
}
577
impl Waiter {
    /// Creates a waiter with default options.
    pub fn new() -> Self {
        Self { inner: PortWaiter::new(WaiterOptions::empty()) }
    }

    /// Creates a waiter with the given [`WaiterOptions`].
    pub fn with_options(options: WaiterOptions) -> Self {
        Self { inner: PortWaiter::new(options) }
    }

    /// Returns a weak reference suitable for storage in wait queues.
    fn weak(&self) -> WaiterRef {
        WaiterRef::from_port(&self.inner)
    }

    /// Blocks the task in the `Frozen` run state until woken; only a pending
    /// SIGKILL breaks out of the retry loop early.
    pub fn freeze<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask)
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        while self
            .inner
            .wait_until(
                locked,
                current_task,
                RunState::Frozen(self.clone()),
                zx::MonotonicInstant::INFINITE,
            )
            .is_err()
        {
            // Frozen tasks ignore interruptions other than SIGKILL.
            if current_task.read().has_signal_pending(SIGKILL) {
                break;
            }
        }
    }

    /// Blocks until notified, with no deadline.
    pub fn wait<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        self.inner.wait_until(
            locked,
            current_task,
            RunState::Waiter(WaiterRef::from_port(&self.inner)),
            zx::MonotonicInstant::INFINITE,
        )
    }

    /// Blocks until notified or `deadline` passes; a zero deadline polls
    /// without blocking (and without observing EINTR).
    pub fn wait_until<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        deadline: zx::MonotonicInstant,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        self.inner.wait_until(
            locked,
            current_task,
            RunState::Waiter(WaiterRef::from_port(&self.inner)),
            deadline,
        )
    }

    /// Builds a handler-less wait entry for this waiter.
    fn create_wait_entry(&self, filter: WaitEvents) -> WaitEntry {
        WaitEntry { waiter: self.weak(), filter, key: self.inner.next_key() }
    }

    /// Builds a wait entry whose notification runs `handler`.
    fn create_wait_entry_with_handler(
        &self,
        filter: WaitEvents,
        handler: EventHandler,
    ) -> WaitEntry {
        let key = self.inner.register_callback(WaitCallback::EventHandler(handler));
        WaitEntry { waiter: self.weak(), filter, key }
    }

    /// Queues `events` for immediate delivery to `handler`.
    pub fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
        self.inner.wake_immediately(events, handler);
    }

    /// Wakes this waiter (running `handler`) when `handle` asserts any of
    /// `zx_signals`.
    pub fn wake_on_zircon_signals(
        &self,
        handle: &dyn zx::AsHandleRef,
        zx_signals: zx::Signals,
        handler: SignalHandler,
    ) -> Result<PortWaitCanceler, zx::Status> {
        self.inner.wake_on_zircon_signals(handle, zx_signals, handler)
    }

    /// Returns a canceler for a wait that was never actually registered.
    pub fn fake_wait(&self) -> WaitCanceler {
        WaitCanceler::new_noop()
    }

    /// Wakes the waiter without an associated event.
    pub fn notify(&self) {
        self.inner.notify();
    }

    /// Interrupts a blocked wait, unless this waiter ignores signals.
    pub fn interrupt(&self) {
        self.inner.interrupt();
    }
}
724
impl Drop for Waiter {
    fn drop(&mut self) {
        // Remove this waiter's entries from every queue it registered on so
        // those queues do not accumulate dead entries.
        let wait_queues = std::mem::take(&mut *self.inner.wait_queues.lock()).into_values();
        for wait_queue in wait_queues {
            if let Some(wait_queue) = wait_queue.upgrade() {
                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != *self)
            }
        }
    }
}
737
738impl Default for Waiter {
739 fn default() -> Self {
740 Self::new()
741 }
742}
743
impl PartialEq for Waiter {
    // Two `Waiter`s are equal when they share the same underlying `PortWaiter`.
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}
749
/// A lighter-weight waiter backed by an `InterruptibleEvent` instead of a
/// port; it supports only "wake on anything" semantics.
pub struct SimpleWaiter {
    event: Arc<InterruptibleEvent>,
    // Queues this waiter registered on, for cleanup on drop.
    wait_queues: Vec<Weak<Mutex<WaitQueueImpl>>>,
}

impl SimpleWaiter {
    /// Returns the waiter together with the guard that must be held while
    /// waiting on `event`.
    pub fn new(event: &Arc<InterruptibleEvent>) -> (SimpleWaiter, EventWaitGuard<'_>) {
        (SimpleWaiter { event: event.clone(), wait_queues: Default::default() }, event.begin_wait())
    }
}

impl Drop for SimpleWaiter {
    fn drop(&mut self) {
        // Remove our entries from every queue we registered on.
        for wait_queue in &self.wait_queues {
            if let Some(wait_queue) = wait_queue.upgrade() {
                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != self.event)
            }
        }
    }
}
770
/// The kind of waiter a [`WaiterRef`] points at.
#[derive(Debug, Clone)]
enum WaiterKind {
    Port(Weak<PortWaiter>),
    Event(Weak<InterruptibleEvent>),
    AbortHandle(Weak<futures::stream::AbortHandle>),
}

impl Default for WaiterKind {
    // A dangling port reference: never upgrades and compares unequal to any
    // live waiter.
    fn default() -> Self {
        WaiterKind::Port(Default::default())
    }
}

/// A weak reference to any supported waiter kind; stored in wait queues so
/// that queues never keep waiters alive.
#[derive(Debug, Default, Clone)]
pub struct WaiterRef(WaiterKind);
788
789impl WaiterRef {
790 fn from_port(waiter: &Arc<PortWaiter>) -> WaiterRef {
791 WaiterRef(WaiterKind::Port(Arc::downgrade(waiter)))
792 }
793
794 fn from_event(event: &Arc<InterruptibleEvent>) -> WaiterRef {
795 WaiterRef(WaiterKind::Event(Arc::downgrade(event)))
796 }
797
798 pub fn from_abort_handle(handle: &Arc<futures::stream::AbortHandle>) -> WaiterRef {
799 WaiterRef(WaiterKind::AbortHandle(Arc::downgrade(handle)))
800 }
801
802 pub fn is_valid(&self) -> bool {
803 match &self.0 {
804 WaiterKind::Port(waiter) => waiter.strong_count() != 0,
805 WaiterKind::Event(event) => event.strong_count() != 0,
806 WaiterKind::AbortHandle(handle) => handle.strong_count() != 0,
807 }
808 }
809
810 pub fn interrupt(&self) {
811 match &self.0 {
812 WaiterKind::Port(waiter) => {
813 if let Some(waiter) = waiter.upgrade() {
814 waiter.interrupt();
815 }
816 }
817 WaiterKind::Event(event) => {
818 if let Some(event) = event.upgrade() {
819 event.interrupt();
820 }
821 }
822 WaiterKind::AbortHandle(handle) => {
823 if let Some(handle) = handle.upgrade() {
824 handle.abort();
825 }
826 }
827 }
828 }
829
830 fn remove_callback(&self, key: &WaitKey) {
831 match &self.0 {
832 WaiterKind::Port(waiter) => {
833 if let Some(waiter) = waiter.upgrade() {
834 waiter.remove_callback(key);
835 }
836 }
837 _ => (),
838 }
839 }
840
841 fn upgrade_notifiable(&self) -> Option<NotifiableRef> {
844 match &self.0 {
845 WaiterKind::Port(waiter) => {
846 if let Some(waiter) = waiter.upgrade() {
847 return Some(NotifiableRef::Port(waiter));
848 }
849 }
850 WaiterKind::Event(event) => {
851 if let Some(event) = event.upgrade() {
852 return Some(NotifiableRef::Event(event));
853 }
854 }
855 WaiterKind::AbortHandle(handle) => {
856 if let Some(handle) = handle.upgrade() {
857 return Some(NotifiableRef::AbortHandle(handle));
858 }
859 }
860 }
861 None
862 }
863
864 fn will_remove_from_wait_queue(&self, key: &WaitKey) {
869 match &self.0 {
870 WaiterKind::Port(waiter) => {
871 if let Some(waiter) = waiter.upgrade() {
872 waiter.wait_queues.lock().remove(key);
873 }
874 }
875 _ => (),
876 }
877 }
878}
879
impl PartialEq<Waiter> for WaiterRef {
    // A ref equals a `Waiter` when it points at the same `PortWaiter`
    // allocation (pointer comparison works even if the ref is dead).
    fn eq(&self, other: &Waiter) -> bool {
        match &self.0 {
            WaiterKind::Port(waiter) => waiter.as_ptr() == Arc::as_ptr(&other.inner),
            _ => false,
        }
    }
}

impl PartialEq<Arc<InterruptibleEvent>> for WaiterRef {
    // A ref equals an event when it points at the same `InterruptibleEvent`.
    fn eq(&self, other: &Arc<InterruptibleEvent>) -> bool {
        match &self.0 {
            WaiterKind::Event(event) => event.as_ptr() == Arc::as_ptr(other),
            _ => false,
        }
    }
}

impl PartialEq for WaiterRef {
    // Two refs are equal only when they are the same kind and point at the
    // same allocation.
    fn eq(&self, other: &WaiterRef) -> bool {
        match (&self.0, &other.0) {
            (WaiterKind::Port(lhs), WaiterKind::Port(rhs)) => Weak::ptr_eq(lhs, rhs),
            (WaiterKind::Event(lhs), WaiterKind::Event(rhs)) => Weak::ptr_eq(lhs, rhs),
            (WaiterKind::AbortHandle(lhs), WaiterKind::AbortHandle(rhs)) => Weak::ptr_eq(lhs, rhs),
            _ => false,
        }
    }
}
908
909impl NotifiableRef {
910 fn notify(&self, key: &WaitKey, events: WaitEvents) {
911 match self {
912 NotifiableRef::Port(port_waiter) => port_waiter.queue_events(key, events),
913 NotifiableRef::Event(interruptible_event) => interruptible_event.notify(),
914 NotifiableRef::AbortHandle(handle) => handle.abort(),
915 }
916 }
917}
918
/// A queue of waiters to be notified when some condition occurs.
///
/// Cloning shares the same underlying queue.
#[derive(Default, Debug, Clone)]
pub struct WaitQueue(Arc<Mutex<WaitQueueImpl>>);

/// A wait entry plus the generation id it was inserted with. The id guards
/// against a canceller removing a slab slot that has since been reused.
#[derive(Debug)]
struct WaitEntryWithId {
    entry: WaitEntry,
    id: u64,
}

/// Identifies one inserted entry: the slab slot plus its generation id.
struct WaitEntryId {
    key: usize,
    id: u64,
}

#[derive(Default, Debug)]
struct WaitQueueImpl {
    // Monotonically increasing generation counter for inserted entries.
    next_wait_entry_id: u64,
    // The waiters currently enqueued.
    waiters: Slab<WaitEntryWithId>,
}

/// One registered waiter: who to wake, which events they want, and the key
/// handed back to the waiter when notifying.
#[derive(Debug)]
struct WaitEntry {
    waiter: WaiterRef,

    filter: WaitEvents,

    key: WaitKey,
}
968
impl WaitQueue {
    /// Inserts `entry` with a fresh generation id and returns its slot + id.
    fn add_waiter(&self, entry: WaitEntry) -> WaitEntryId {
        let mut wait_queue = self.0.lock();
        let id = wait_queue
            .next_wait_entry_id
            .checked_add(1)
            .expect("all possible wait entry ID values exhausted");
        wait_queue.next_wait_entry_id = id;
        WaitEntryId { key: wait_queue.waiters.insert(WaitEntryWithId { entry, id }), id }
    }

    /// Registers `entry` for `waiter` and returns a canceler that removes it.
    fn wait_async_entry(&self, waiter: &Waiter, entry: WaitEntry) -> WaitCanceler {
        let wait_key = entry.key;
        let waiter_id = self.add_waiter(entry);
        let wait_queue = Arc::downgrade(&self.0);
        // Record the queue on the waiter so its Drop can clean the entry up.
        waiter.inner.wait_queues.lock().insert(wait_key, wait_queue.clone());
        WaitCanceler::new_inner(WaitCancelerInner::Queue(WaitCancelerQueue {
            wait_queue,
            waiter: waiter.weak(),
            wait_key,
            waiter_id,
        }))
    }

    /// Wakes `waiter` (without a handler) when `value` is notified.
    pub fn wait_async_value(&self, waiter: &Waiter, value: u64) -> WaitCanceler {
        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::Value(value)))
    }

    /// Wakes `waiter` and runs `handler` when `value` is notified.
    pub fn wait_async_value_with_handler(
        &self,
        waiter: &Waiter,
        value: u64,
        handler: EventHandler,
    ) -> WaitCanceler {
        let entry = waiter.create_wait_entry_with_handler(WaitEvents::Value(value), handler);
        self.wait_async_entry(waiter, entry)
    }

    /// Wakes `waiter` and runs `handler` when any of `events` is notified.
    pub fn wait_async_fd_events(
        &self,
        waiter: &Waiter,
        events: FdEvents,
        handler: EventHandler,
    ) -> WaitCanceler {
        let entry = waiter.create_wait_entry_with_handler(WaitEvents::Fd(events), handler);
        self.wait_async_entry(waiter, entry)
    }

    /// Wakes `waiter` and runs `handler` when a signal in `mask` is notified.
    pub fn wait_async_signal_mask(
        &self,
        waiter: &Waiter,
        mask: SigSet,
        handler: EventHandler,
    ) -> WaitCanceler {
        let entry = waiter.create_wait_entry_with_handler(WaitEvents::SignalMask(mask), handler);
        self.wait_async_entry(waiter, entry)
    }

    /// Wakes `waiter` on any notification on this queue.
    pub fn wait_async(&self, waiter: &Waiter) -> WaitCanceler {
        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::All))
    }

    /// Registers an event-backed [`SimpleWaiter`] for any notification.
    pub fn wait_async_simple(&self, waiter: &mut SimpleWaiter) {
        let entry = WaitEntry {
            waiter: WaiterRef::from_event(&waiter.event),
            filter: WaitEvents::All,
            key: Default::default(),
        };
        waiter.wait_queues.push(Arc::downgrade(&self.0));
        self.add_waiter(entry);
    }

    /// Notifies up to `limit` matching entries, removing them from the queue;
    /// returns how many live waiters were actually woken.
    fn notify_events_count(&self, mut events: WaitEvents, mut limit: usize) -> usize {
        // Expand fd events to include their equivalent companion events.
        if let WaitEvents::Fd(ref mut fd_events) = events {
            *fd_events = fd_events.add_equivalent_fd_events();
        }
        // Strong refs are collected under the lock but notified after it is
        // released, so handlers can re-enter the queue without deadlocking.
        let mut notifiable_refs = SmallVec::<[(NotifiableRef, WaitKey); 1]>::new();
        let mut woken = 0;
        {
            let mut guard = self.0.lock();
            guard.waiters.retain(|_, WaitEntryWithId { entry, id: _ }| {
                if limit > 0 && entry.filter.intercept(&events) {
                    if let Some(notifiable_ref) = entry.waiter.upgrade_notifiable() {
                        limit -= 1;
                        woken += 1;
                        notifiable_refs.push((notifiable_ref, entry.key));
                    }

                    // Matching entries are removed even when the waiter is
                    // already dead (and not counted toward `limit`), which
                    // garbage-collects stale entries.
                    entry.waiter.will_remove_from_wait_queue(&entry.key);
                    false
                } else {
                    true
                }
            });
        }
        for (notifiable_ref, key) in notifiable_refs {
            notifiable_ref.notify(&key, events);
        }
        woken
    }

    /// Notifies every entry matching any of `events`.
    pub fn notify_fd_events(&self, events: FdEvents) {
        self.notify_events_count(WaitEvents::Fd(events), usize::MAX);
    }

    /// Notifies up to `limit` entries matching any of `events`.
    pub fn notify_fd_events_count(&self, events: FdEvents, limit: usize) {
        self.notify_events_count(WaitEvents::Fd(events), limit);
    }

    /// Notifies every entry whose signal mask contains `signal`.
    pub fn notify_signal(&self, signal: &Signal) {
        let event = WaitEvents::SignalMask(SigSet::from(*signal));
        self.notify_events_count(event, usize::MAX);
    }

    /// Notifies every entry waiting on exactly `value`.
    pub fn notify_value(&self, value: u64) {
        self.notify_events_count(WaitEvents::Value(value), usize::MAX);
    }

    /// Notifies up to `limit` entries, in no particular order.
    pub fn notify_unordered_count(&self, limit: usize) {
        self.notify_events_count(WaitEvents::All, limit);
    }

    /// Notifies every entry on the queue.
    pub fn notify_all(&self) {
        self.notify_unordered_count(usize::MAX);
    }

    /// Returns whether the queue currently has no registered waiters.
    pub fn is_empty(&self) -> bool {
        self.0.lock().waiters.is_empty()
    }
}
1153
/// A [`WaitQueue`] whose notification values are a strongly-typed `T`
/// convertible to `u64`.
pub struct TypedWaitQueue<T: Into<u64>> {
    wait_queue: WaitQueue,
    // Records the value type without storing a `T`.
    value_type: std::marker::PhantomData<T>,
}

impl<T: Into<u64>> Default for TypedWaitQueue<T> {
    fn default() -> Self {
        Self { wait_queue: Default::default(), value_type: Default::default() }
    }
}
1166
impl<T: Into<u64>> TypedWaitQueue<T> {
    /// Typed wrapper for [`WaitQueue::wait_async_value`].
    pub fn wait_async_value(&self, waiter: &Waiter, value: T) -> WaitCanceler {
        self.wait_queue.wait_async_value(waiter, value.into())
    }

    /// Typed wrapper for [`WaitQueue::wait_async_value_with_handler`].
    pub fn wait_async_value_with_handler(
        &self,
        waiter: &Waiter,
        value: T,
        handler: EventHandler,
    ) -> WaitCanceler {
        self.wait_queue.wait_async_value_with_handler(waiter, value.into(), handler)
    }

    /// Typed wrapper for [`WaitQueue::notify_value`].
    pub fn notify_value(&self, value: T) {
        self.wait_queue.notify_value(value.into())
    }
}
1185
#[cfg(test)]
mod tests {
    // Fix: `&current_task` had been mangled to `¤t_task` throughout this
    // module (HTML-entity corruption of `&curren`); restored so the tests
    // compile. No test logic changed.
    use super::*;
    use crate::fs::fuchsia::create_fuchsia_pipe;
    use crate::signals::SignalInfo;
    use crate::task::TaskFlags;
    use crate::testing::{spawn_kernel_and_run, spawn_kernel_and_run_sync};
    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
    use crate::vfs::eventfd::{EventFdType, new_eventfd};
    use assert_matches::assert_matches;
    use starnix_sync::Unlocked;
    use starnix_uapi::open_flags::OpenFlags;
    use starnix_uapi::signals::SIGUSR1;

    const KEY: ReadyItemKey = ReadyItemKey::Usize(1234);

    #[::fuchsia::test]
    async fn test_async_wait_exec() {
        spawn_kernel_and_run(async |locked, current_task| {
            let (local_socket, remote_socket) = zx::Socket::create_stream();
            let pipe =
                create_fuchsia_pipe(locked, &current_task, remote_socket, OpenFlags::RDWR).unwrap();

            const MEM_SIZE: usize = 1024;
            let mut output_buffer = VecOutputBuffer::new(MEM_SIZE);

            let test_string = "hello startnix".to_string();
            let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
            let handler = EventHandler::Enqueue {
                key: KEY,
                queue: queue.clone(),
                sought_events: FdEvents::all(),
            };
            let waiter = Waiter::new();
            pipe.wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                .expect("wait_async");
            let test_string_clone = test_string.clone();

            let write_count = AtomicCounter::<usize>::default();
            std::thread::scope(|s| {
                let thread = s.spawn(|| {
                    let test_data = test_string_clone.as_bytes();
                    let no_written = local_socket.write(test_data).unwrap();
                    assert_eq!(0, write_count.add(no_written));
                    assert_eq!(no_written, test_data.len());
                });

                // The wait below must observe the write above.
                assert!(queue.lock().is_empty());
                waiter.wait(locked, &current_task).unwrap();
                thread.join().expect("join thread")
            });
            queue.lock().iter().for_each(|item| assert!(item.events.contains(FdEvents::POLLIN)));

            let read_size = pipe.read(locked, &current_task, &mut output_buffer).unwrap();

            let no_written = write_count.get();
            assert_eq!(no_written, read_size);

            assert_eq!(output_buffer.data(), test_string.as_bytes());
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_async_wait_cancel() {
        for do_cancel in [true, false] {
            spawn_kernel_and_run(async move |locked, current_task| {
                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
                let waiter = Waiter::new();
                let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
                let handler = EventHandler::Enqueue {
                    key: KEY,
                    queue: queue.clone(),
                    sought_events: FdEvents::all(),
                };
                let wait_canceler = event
                    .wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                    .expect("wait_async");
                if do_cancel {
                    wait_canceler.cancel();
                }
                let add_val = 1u64;
                assert_eq!(
                    event
                        .write(
                            locked,
                            &current_task,
                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
                        )
                        .unwrap(),
                    std::mem::size_of::<u64>()
                );

                let wait_result =
                    waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO);
                let final_count = queue.lock().len();
                if do_cancel {
                    assert_eq!(wait_result, error!(ETIMEDOUT));
                    assert_eq!(0, final_count);
                } else {
                    assert_eq!(wait_result, Ok(()));
                    assert_eq!(1, final_count);
                }
            })
            .await;
        }
    }

    #[::fuchsia::test]
    async fn single_waiter_multiple_waits_cancel_one_waiter_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter);
            let _wk2 = wait_queue.wait_async(&waiter);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }

    #[::fuchsia::test]
    async fn multiple_waiters_cancel_one_other_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter1 = Waiter::new();
            let waiter2 = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter1);
            let _wk2 = wait_queue.wait_async(&waiter2);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter1.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_err());
            assert!(waiter2.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_wait_queue() {
        spawn_kernel_and_run(async |locked, current_task| {
            let queue = WaitQueue::default();

            let waiters = <[Waiter; 3]>::default();
            waiters.iter().for_each(|w| {
                queue.wait_async(w);
            });

            let woken = |locked: &mut Locked<Unlocked>| {
                waiters
                    .iter()
                    .filter(|w| {
                        w.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok()
                    })
                    .count()
            };

            const INITIAL_NOTIFY_COUNT: usize = 2;
            let total_waiters = waiters.len();
            queue.notify_unordered_count(INITIAL_NOTIFY_COUNT);
            assert_eq!(INITIAL_NOTIFY_COUNT, woken(locked));

            queue.notify_all();
            assert_eq!(total_waiters - INITIAL_NOTIFY_COUNT, woken(locked));
        })
        .await;
    }

    #[::fuchsia::test]
    async fn waiter_kind_abort_handle() {
        spawn_kernel_and_run_sync(|_locked, current_task| {
            let mut executor = fuchsia_async::TestExecutor::new();
            let (abort_handle, abort_registration) = futures::stream::AbortHandle::new_pair();
            let abort_handle = Arc::new(abort_handle);
            let waiter_ref = WaiterRef::from_abort_handle(&abort_handle);

            let mut fut = futures::stream::Abortable::new(
                futures::future::pending::<()>(),
                abort_registration,
            );

            assert_matches!(executor.run_until_stalled(&mut fut), std::task::Poll::Pending);

            waiter_ref.interrupt();
            let output = current_task.run_in_state(RunState::Waiter(waiter_ref), move || {
                match executor.run_singlethreaded(&mut fut) {
                    Ok(()) => unreachable!("future never terminates normally"),
                    Err(futures::stream::Aborted) => Ok(()),
                }
            });

            assert_eq!(output, Ok(()));
        })
        .await;
    }

    #[::fuchsia::test]
    async fn freeze_with_pending_sigusr1() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::kernel(SIGUSR1);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            let output: Result<(), Errno> = current_task
                .run_in_state(RunState::Event(InterruptibleEvent::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));

            // SIGUSR1 does not interrupt a frozen task.
            let output = current_task.run_in_state(RunState::Frozen(Waiter::new()), move || Ok(()));
            assert_eq!(output, Ok(()));
        })
        .await;
    }

    #[::fuchsia::test]
    async fn freeze_with_pending_sigkill() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::kernel(SIGKILL);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            // Unlike SIGUSR1, SIGKILL does interrupt a frozen task.
            let output: Result<(), _> = current_task
                .run_in_state(RunState::Frozen(Waiter::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_async_typed_wait_value_with_handler() {
        spawn_kernel_and_run(async |locked, current_task| {
            let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
            let handler = EventHandler::Enqueue {
                key: KEY,
                queue: queue.clone(),
                sought_events: FdEvents::all(),
            };
            let waiter = Waiter::new();
            let wait_queue = TypedWaitQueue::<u64>::default();

            let test_value = 100u64;
            let _wait_canceler =
                wait_queue.wait_async_value_with_handler(&waiter, test_value, handler);

            assert!(queue.lock().is_empty());

            // A different value must not wake the waiter.
            wait_queue.notify_value(test_value + 1);
            assert!(queue.lock().is_empty());

            wait_queue.notify_value(test_value);

            waiter.wait(locked, &current_task).expect("wait failed");

            let ready_items = queue.lock();
            assert_eq!(ready_items.len(), 1);
            assert!(ready_items[0].events.contains(FdEvents::POLLIN));
        })
        .await;
    }
}