use crate::signals::RunState;
use crate::task::CurrentTask;
use crate::vfs::{EpollEventHandler, FdNumber};
use bitflags::bitflags;
use slab::Slab;
use starnix_lifecycle::{AtomicU64Counter, AtomicUsizeCounter};
use starnix_sync::{
    EventWaitGuard, FileOpsCore, InterruptibleEvent, LockEqualOrBefore, Locked, Mutex, NotifyKind,
    PortEvent, PortWaitResult,
};
use starnix_types::ownership::debug_assert_no_local_temp_ref;
use starnix_uapi::error;
use starnix_uapi::errors::{EINTR, Errno};
use starnix_uapi::signals::{SIGKILL, SigSet, Signal};
use starnix_uapi::vfs::FdEvents;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Weak};
use syncio::zxio::zxio_signals_t;
use syncio::{ZxioSignals, ZxioWeak};

#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum ReadyItemKey {
    FdNumber(FdNumber),
    Usize(usize),
}

impl From<FdNumber> for ReadyItemKey {
    fn from(v: FdNumber) -> Self {
        Self::FdNumber(v)
    }
}

impl From<usize> for ReadyItemKey {
    fn from(v: usize) -> Self {
        Self::Usize(v)
    }
}

#[derive(Debug, Copy, Clone)]
pub struct ReadyItem {
    pub key: ReadyItemKey,
    pub events: FdEvents,
}

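/// The action taken on behalf of a waiter when the events it registered for are observed.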
#[derive(Clone)]
pub enum EventHandler {
    /// Ignore the observed events.
    None,

    /// Enqueue a `ReadyItem` for `key`, with the observed events masked by `sought_events`.
    Enqueue { key: ReadyItemKey, queue: Arc<Mutex<VecDeque<ReadyItem>>>, sought_events: FdEvents },

    /// Delegate to the wrapped handler the first time events arrive, then do nothing.
    HandleOnce(Arc<Mutex<Option<EventHandler>>>),

    /// Forward the observed events to an epoll file's handler.
    Epoll(EpollEventHandler),
}

impl EventHandler {
    pub fn handle(self, events: FdEvents) {
        match self {
            Self::None => {}
            Self::Enqueue { key, queue, sought_events } => {
                let events = events & sought_events;
                queue.lock().push_back(ReadyItem { key, events });
            }
            Self::HandleOnce(inner) => {
                if let Some(inner) = inner.lock().take() {
                    inner.handle(events);
                }
            }
            Self::Epoll(e) => e.handle(events),
        }
    }
}

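/// Maps Zircon signals observed on a weakly held zxio object into `FdEvents` using the provided
/// conversion function.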
pub struct ZxioSignalHandler {
    pub zxio: ZxioWeak,
    pub get_events_from_zxio_signals: fn(zxio_signals_t) -> FdEvents,
}

pub struct ManyZxHandleSignalHandler {
    pub count: usize,
    pub counter: Arc<AtomicUsizeCounter>,
    pub expected_signals: zx::Signals,
    pub events: FdEvents,
}

pub enum SignalHandlerInner {
    None,
    Zxio(ZxioSignalHandler),
    ZxHandle(fn(zx::Signals) -> FdEvents),
    ManyZxHandle(ManyZxHandleSignalHandler),
}

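/// Translates observed Zircon signals into `FdEvents`, forwards them to an `EventHandler`, and
/// optionally surfaces an error code to the waiter.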
pub struct SignalHandler {
    pub inner: SignalHandlerInner,
    pub event_handler: EventHandler,
    pub err_code: Option<Errno>,
}

impl SignalHandler {
    fn handle(self, signals: zx::Signals) -> Option<Errno> {
        let SignalHandler { inner, event_handler, err_code } = self;
        let events = match inner {
            SignalHandlerInner::None => None,
            SignalHandlerInner::Zxio(ZxioSignalHandler { zxio, get_events_from_zxio_signals }) => {
                if let Some(zxio) = zxio.upgrade() {
                    Some(get_events_from_zxio_signals(zxio.wait_end(signals)))
                } else {
                    None
                }
            }
            SignalHandlerInner::ZxHandle(get_events_from_zx_signals) => {
                Some(get_events_from_zx_signals(signals))
            }
            SignalHandlerInner::ManyZxHandle(signal_handler) => {
                if signals.contains(signal_handler.expected_signals) {
                    let new_count = signal_handler.counter.next() + 1;
                    assert!(new_count <= signal_handler.count);
                    if new_count == signal_handler.count {
                        Some(signal_handler.events)
                    } else {
                        None
                    }
                } else {
                    None
                }
            }
        };
        if let Some(events) = events {
            event_handler.handle(events)
        }
        err_code
    }
}

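/// The callback registered with a `PortWaiter` under a given wait key: either a Zircon signal
/// handler or a plain event handler.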
pub enum WaitCallback {
    SignalHandler(SignalHandler),
    EventHandler(EventHandler),
}

struct WaitCancelerQueue {
    wait_queue: Weak<Mutex<WaitQueueImpl>>,
    waiter: WaiterRef,
    wait_key: WaitKey,
    waiter_id: WaitEntryId,
}

struct WaitCancelerZxio {
    zxio: ZxioWeak,
    inner: PortWaitCanceler,
}

struct WaitCancelerPort {
    inner: PortWaitCanceler,
}

enum WaitCancelerInner {
    Zxio(WaitCancelerZxio),
    Queue(WaitCancelerQueue),
    Port(WaitCancelerPort),
}

const WAIT_CANCELER_COMMON_SIZE: usize = 2;

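/// A handle returned when registering a wait, used to cancel the registration before it fires.
/// Stores up to `WAIT_CANCELER_COMMON_SIZE` cancellers inline and spills to the heap beyond that
/// (see `merge_unbounded`).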
pub struct WaitCanceler {
    cancellers: smallvec::SmallVec<[WaitCancelerInner; WAIT_CANCELER_COMMON_SIZE]>,
}

impl WaitCanceler {
    fn new_inner(inner: WaitCancelerInner) -> Self {
        Self { cancellers: smallvec::smallvec![inner] }
    }

    pub fn new_noop() -> Self {
        Self { cancellers: Default::default() }
    }

    pub fn new_zxio(zxio: ZxioWeak, inner: PortWaitCanceler) -> Self {
        Self::new_inner(WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }))
    }

    pub fn new_port(inner: PortWaitCanceler) -> Self {
        Self::new_inner(WaitCancelerInner::Port(WaitCancelerPort { inner }))
    }

    pub fn merge(self, other: Self) -> Self {
        assert!(
            self.cancellers.len() + other.cancellers.len() <= WAIT_CANCELER_COMMON_SIZE,
            "WaitCanceler::merge disallows more than {} cancellers, found {} + {}",
            WAIT_CANCELER_COMMON_SIZE,
            self.cancellers.len(),
            other.cancellers.len()
        );
        WaitCanceler::merge_unbounded(self, other)
    }

    pub fn merge_unbounded(
        Self { mut cancellers }: Self,
        Self { cancellers: mut other }: Self,
    ) -> Self {
        cancellers.append(&mut other);
        WaitCanceler { cancellers }
    }

    pub fn cancel(self) {
        let Self { cancellers } = self;
        for canceller in cancellers.into_iter().rev() {
            match canceller {
                WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }) => {
                    let Some(zxio) = zxio.upgrade() else { return };
                    let (_, signals) = zxio.wait_begin(ZxioSignals::NONE.bits());
                    inner.cancel();
                    zxio.wait_end(signals);
                }
                WaitCancelerInner::Queue(WaitCancelerQueue {
                    wait_queue,
                    waiter,
                    wait_key,
                    waiter_id: WaitEntryId { key, id },
                }) => {
                    let Some(wait_queue) = wait_queue.upgrade() else { return };
                    waiter.remove_callback(&wait_key);
                    waiter.will_remove_from_wait_queue(&wait_key);
                    let mut wait_queue = wait_queue.lock();
                    let waiters = &mut wait_queue.waiters;
                    if let Some(entry) = waiters.get_mut(key) {
                        // Only remove the entry if it belongs to this wait: the slab slot may
                        // have been reused for a different entry after this one was notified.
                        if entry.id == id {
                            waiters.remove(key);
                        }
                    }
                }
                WaitCancelerInner::Port(WaitCancelerPort { inner }) => {
                    inner.cancel();
                }
            }
        }
    }
}

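/// Cancels a pending `object_wait_async` registration on a `PortWaiter`'s port and drops the
/// associated callback.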
pub struct PortWaitCanceler {
    waiter: Weak<PortWaiter>,
    key: WaitKey,
}

impl PortWaitCanceler {
    pub fn cancel(self) {
        let Self { waiter, key } = self;
        if let Some(waiter) = waiter.upgrade() {
            let _ = waiter.port.cancel(key.raw);
            waiter.remove_callback(&key);
        }
    }
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
struct WaitKey {
    raw: u64,
}

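/// The set of events a wait entry is interested in, or the payload carried by a notification and
/// matched against waiting entries.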
#[derive(Clone, Copy, Debug)]
enum WaitEvents {
    /// Matches any notification.
    All,
    /// Matches notifications that share at least one `FdEvents` bit.
    Fd(FdEvents),
    /// Matches notifications carrying exactly the same value.
    Value(u64),
    /// Matches notifications whose signal mask intersects this one.
    SignalMask(SigSet),
}

impl WaitEvents {
    /// Returns true if a notification carrying `other` should wake an entry filtered on `self`.
    fn intercept(self: &WaitEvents, other: &WaitEvents) -> bool {
        match (self, other) {
            (Self::All, _) | (_, Self::All) => true,
            (Self::Fd(m1), Self::Fd(m2)) => m1.bits() & m2.bits() != 0,
            (Self::Value(v1), Self::Value(v2)) => v1 == v2,
            (Self::SignalMask(m1), Self::SignalMask(m2)) => m1.intersects(m2),
            _ => false,
        }
    }
}

impl WaitCallback {
    pub fn none() -> EventHandler {
        EventHandler::None
    }
}

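/// Options controlling how a `Waiter` behaves while blocked.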
bitflags! {
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub struct WaiterOptions: u8 {
        /// Do not wake the waiter when it is interrupted by a signal; `interrupt()` becomes a
        /// no-op.
        const IGNORE_SIGNALS = 1;

        /// The waiter may be used from a callstack where running delayed releasers is not safe,
        /// so `wait_until` skips `trigger_delayed_releaser`.
        const UNSAFE_CALLSTACK = 2;
    }
}

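/// The port-backed implementation behind `Waiter`: callbacks registered against wait keys are
/// dispatched when the underlying `PortEvent` reports signals or queued notifications.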
struct PortWaiter {
    port: PortEvent,
    callbacks: Mutex<HashMap<WaitKey, WaitCallback>>,
    next_key: AtomicU64Counter,
    options: WaiterOptions,

    /// The wait queues this waiter is registered with, keyed by wait key, so the entries can be
    /// removed when the `Waiter` is dropped.
    wait_queues: Mutex<HashMap<WaitKey, Weak<Mutex<WaitQueueImpl>>>>,
}

impl PortWaiter {
    fn new(options: WaiterOptions) -> Arc<Self> {
        Arc::new(PortWaiter {
            port: PortEvent::new(),
            callbacks: Default::default(),
            next_key: AtomicU64Counter::new(1),
            options,
            wait_queues: Default::default(),
        })
    }

    /// Waits on the port until the deadline, dispatching at most one signal callback.
    fn wait_internal(&self, deadline: zx::MonotonicInstant) -> Result<(), Errno> {
        // About to block: this thread must not be holding any local temporary references.
        debug_assert_no_local_temp_ref();

        match self.port.wait(deadline) {
            PortWaitResult::Notification { kind: NotifyKind::Regular } => Ok(()),
            PortWaitResult::Notification { kind: NotifyKind::Interrupt } => error!(EINTR),
            PortWaitResult::Signal { key, observed } => {
                if let Some(callback) = self.remove_callback(&WaitKey { raw: key }) {
                    match callback {
                        WaitCallback::SignalHandler(handler) => {
                            if let Some(errno) = handler.handle(observed) {
                                return Err(errno);
                            }
                        }
                        WaitCallback::EventHandler(_) => {
                            panic!("wrong type of handler called")
                        }
                    }
                }

                Ok(())
            }
            PortWaitResult::TimedOut => error!(ETIMEDOUT),
        }
    }

    fn wait_until<L>(
        self: &Arc<Self>,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        run_state: RunState,
        deadline: zx::MonotonicInstant,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let is_waiting = deadline.into_nanos() > 0;

        let callback = || {
            // Loop because a non-blocking wait (deadline at or before zero) should not report
            // EINTR: interrupts are swallowed and the port is polled again.
            loop {
                let wait_result = self.wait_internal(deadline);
                if let Err(errno) = &wait_result {
                    if errno.code == EINTR && !is_waiting {
                        continue;
                    }
                }
                return wait_result;
            }
        };

        // Run pending delayed releasers before potentially blocking, unless this waiter may be
        // used from a callstack where doing so is not safe.
        if !self.options.contains(WaiterOptions::UNSAFE_CALLSTACK) {
            current_task.trigger_delayed_releaser(locked);
        }

        if is_waiting { current_task.run_in_state(run_state, callback) } else { callback() }
    }

    fn next_key(&self) -> WaitKey {
        let key = self.next_key.next();
        // Only a u64 wraparound could produce a key of 0, which is not expected in practice.
        assert!(key != 0, "bad key from u64 wraparound");
        WaitKey { raw: key }
    }

    fn register_callback(&self, callback: WaitCallback) -> WaitKey {
        let key = self.next_key();
        assert!(
            self.callbacks.lock().insert(key, callback).is_none(),
            "unexpected callback already present for key {key:?}"
        );
        key
    }

    fn remove_callback(&self, key: &WaitKey) -> Option<WaitCallback> {
        self.callbacks.lock().remove(key)
    }

    fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
        let callback = WaitCallback::EventHandler(handler);
        let key = self.register_callback(callback);
        self.queue_events(&key, WaitEvents::Fd(events));
    }

    fn wake_on_zircon_signals(
        self: &Arc<Self>,
        handle: &dyn zx::AsHandleRef,
        zx_signals: zx::Signals,
        handler: SignalHandler,
    ) -> Result<PortWaitCanceler, zx::Status> {
        let callback = WaitCallback::SignalHandler(handler);
        let key = self.register_callback(callback);
        self.port.object_wait_async(
            handle,
            key.raw,
            zx_signals,
            zx::WaitAsyncOpts::EDGE_TRIGGERED,
        )?;
        Ok(PortWaitCanceler { waiter: Arc::downgrade(self), key })
    }

    fn queue_events(&self, key: &WaitKey, events: WaitEvents) {
        // Always notify the port, even if no callback runs, so that a blocked waiter wakes up.
        scopeguard::defer! {
            self.port.notify(NotifyKind::Regular)
        }

        // The callback may already have been taken by a concurrent notification or cancellation.
        let Some(callback) = self.remove_callback(key) else {
            return;
        };

        match callback {
            WaitCallback::EventHandler(handler) => {
                let events = match events {
                    // A wildcard notification reports every FD event to the handler.
                    WaitEvents::All => FdEvents::all(),
                    WaitEvents::Fd(events) => events,
                    WaitEvents::SignalMask(_) => FdEvents::POLLIN,
                    _ => panic!("wrong type of handler called: {events:?}"),
                };
                handler.handle(events)
            }
            WaitCallback::SignalHandler(_) => {
                panic!("wrong type of handler called")
            }
        }
    }

    fn notify(&self) {
        self.port.notify(NotifyKind::Regular);
    }

    fn interrupt(&self) {
        if self.options.contains(WaiterOptions::IGNORE_SIGNALS) {
            return;
        }
        self.port.notify(NotifyKind::Interrupt);
    }
}

impl std::fmt::Debug for PortWaiter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PortWaiter").field("port", &self.port).finish_non_exhaustive()
    }
}

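/// A blocking waiter for a task. It wakes up when one of the events it registered for occurs,
/// when it is explicitly notified, or when it is interrupted by a signal.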
#[derive(Debug, Clone)]
pub struct Waiter {
    inner: Arc<PortWaiter>,
}

impl Waiter {
    pub fn new() -> Self {
        Self { inner: PortWaiter::new(WaiterOptions::empty()) }
    }

    pub fn with_options(options: WaiterOptions) -> Self {
        Self { inner: PortWaiter::new(options) }
    }

    fn weak(&self) -> WaiterRef {
        WaiterRef::from_port(&self.inner)
    }

    /// Blocks the task in the `Frozen` run state until it is notified. Interruptions are ignored
    /// unless a SIGKILL is pending for the task.
    pub fn freeze<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask)
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        while self
            .inner
            .wait_until(
                locked,
                current_task,
                RunState::Frozen(self.clone()),
                zx::MonotonicInstant::INFINITE,
            )
            .is_err()
        {
            // A frozen task can still be killed.
            if current_task.read().has_signal_pending(SIGKILL) {
                break;
            }
        }
    }

    pub fn wait<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        self.inner.wait_until(
            locked,
            current_task,
            RunState::Waiter(WaiterRef::from_port(&self.inner)),
            zx::MonotonicInstant::INFINITE,
        )
    }

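    /// Waits until the deadline, a notification, or an interruption, whichever comes first.
    /// Passing `zx::MonotonicInstant::ZERO` makes this a non-blocking poll.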
    pub fn wait_until<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        deadline: zx::MonotonicInstant,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        self.inner.wait_until(
            locked,
            current_task,
            RunState::Waiter(WaiterRef::from_port(&self.inner)),
            deadline,
        )
    }

    fn create_wait_entry(&self, filter: WaitEvents) -> WaitEntry {
        WaitEntry { waiter: self.weak(), filter, key: self.inner.next_key() }
    }

    fn create_wait_entry_with_handler(
        &self,
        filter: WaitEvents,
        handler: EventHandler,
    ) -> WaitEntry {
        let key = self.inner.register_callback(WaitCallback::EventHandler(handler));
        WaitEntry { waiter: self.weak(), filter, key }
    }

    pub fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
        self.inner.wake_immediately(events, handler);
    }

    pub fn wake_on_zircon_signals(
        &self,
        handle: &dyn zx::AsHandleRef,
        zx_signals: zx::Signals,
        handler: SignalHandler,
    ) -> Result<PortWaitCanceler, zx::Status> {
        self.inner.wake_on_zircon_signals(handle, zx_signals, handler)
    }

    /// Returns a no-op canceler for callers that must hand back a `WaitCanceler` without
    /// registering a real wait.
    pub fn fake_wait(&self) -> WaitCanceler {
        WaitCanceler::new_noop()
    }

    pub fn notify(&self) {
        self.inner.notify();
    }

    /// Interrupts the waiter, waking it with `EINTR`, unless it was created with
    /// `WaiterOptions::IGNORE_SIGNALS`.
    pub fn interrupt(&self) {
        self.inner.interrupt();
    }
}

impl Drop for Waiter {
    fn drop(&mut self) {
        // Take the map first so the `wait_queues` lock is not held while each wait queue is
        // locked below.
        let wait_queues = std::mem::take(&mut *self.inner.wait_queues.lock()).into_values();
        for wait_queue in wait_queues {
            if let Some(wait_queue) = wait_queue.upgrade() {
                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != *self)
            }
        }
    }
}

impl Default for Waiter {
    fn default() -> Self {
        Self::new()
    }
}

impl PartialEq for Waiter {
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}

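/// A lighter-weight waiter backed by an `InterruptibleEvent` rather than a port; it can only be
/// woken or interrupted, not delivered per-key callbacks.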
pub struct SimpleWaiter {
    event: Arc<InterruptibleEvent>,
    wait_queues: Vec<Weak<Mutex<WaitQueueImpl>>>,
}

impl SimpleWaiter {
    pub fn new(event: &Arc<InterruptibleEvent>) -> (SimpleWaiter, EventWaitGuard<'_>) {
        (SimpleWaiter { event: event.clone(), wait_queues: Default::default() }, event.begin_wait())
    }
}

impl Drop for SimpleWaiter {
    fn drop(&mut self) {
        for wait_queue in &self.wait_queues {
            if let Some(wait_queue) = wait_queue.upgrade() {
                wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != self.event)
            }
        }
    }
}

#[derive(Debug, Clone)]
enum WaiterKind {
    Port(Weak<PortWaiter>),
    Event(Weak<InterruptibleEvent>),
    AbortHandle(Weak<futures::stream::AbortHandle>),
}

impl Default for WaiterKind {
    fn default() -> Self {
        WaiterKind::Port(Default::default())
    }
}

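/// A weak reference to a waiter of any kind (port-backed, event-backed, or an abort handle).
/// Holding a `WaiterRef` does not keep the waiter alive; operations on a dropped waiter are
/// no-ops.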
#[derive(Debug, Default, Clone)]
pub struct WaiterRef(WaiterKind);

impl WaiterRef {
    fn from_port(waiter: &Arc<PortWaiter>) -> WaiterRef {
        WaiterRef(WaiterKind::Port(Arc::downgrade(waiter)))
    }

    fn from_event(event: &Arc<InterruptibleEvent>) -> WaiterRef {
        WaiterRef(WaiterKind::Event(Arc::downgrade(event)))
    }

    pub fn from_abort_handle(handle: &Arc<futures::stream::AbortHandle>) -> WaiterRef {
        WaiterRef(WaiterKind::AbortHandle(Arc::downgrade(handle)))
    }

    pub fn is_valid(&self) -> bool {
        match &self.0 {
            WaiterKind::Port(waiter) => waiter.strong_count() != 0,
            WaiterKind::Event(event) => event.strong_count() != 0,
            WaiterKind::AbortHandle(handle) => handle.strong_count() != 0,
        }
    }

    pub fn interrupt(&self) {
        match &self.0 {
            WaiterKind::Port(waiter) => {
                if let Some(waiter) = waiter.upgrade() {
                    waiter.interrupt();
                }
            }
            WaiterKind::Event(event) => {
                if let Some(event) = event.upgrade() {
                    event.interrupt();
                }
            }
            WaiterKind::AbortHandle(handle) => {
                if let Some(handle) = handle.upgrade() {
                    handle.abort();
                }
            }
        }
    }

    fn remove_callback(&self, key: &WaitKey) {
        match &self.0 {
            WaiterKind::Port(waiter) => {
                if let Some(waiter) = waiter.upgrade() {
                    waiter.remove_callback(key);
                }
            }
            _ => (),
        }
    }

    /// Called by a wait queue just before this waiter's entry is removed, so a port-backed
    /// waiter can forget its back-reference to that queue.
    fn will_remove_from_wait_queue(&self, key: &WaitKey) {
        match &self.0 {
            WaiterKind::Port(waiter) => {
                if let Some(waiter) = waiter.upgrade() {
                    waiter.wait_queues.lock().remove(key);
                }
            }
            _ => (),
        }
    }

    /// Notifies the waiter of `events`. Returns `true` if a live waiter was woken and `false` if
    /// the underlying waiter has already been dropped.
    fn notify(&self, key: &WaitKey, events: WaitEvents) -> bool {
        match &self.0 {
            WaiterKind::Port(waiter) => {
                if let Some(waiter) = waiter.upgrade() {
                    waiter.queue_events(key, events);
                    return true;
                }
            }
            WaiterKind::Event(event) => {
                if let Some(event) = event.upgrade() {
                    event.notify();
                    return true;
                }
            }
            WaiterKind::AbortHandle(handle) => {
                if let Some(handle) = handle.upgrade() {
                    handle.abort();
                    return true;
                }
            }
        }
        false
    }
}

impl PartialEq<Waiter> for WaiterRef {
    fn eq(&self, other: &Waiter) -> bool {
        match &self.0 {
            WaiterKind::Port(waiter) => waiter.as_ptr() == Arc::as_ptr(&other.inner),
            _ => false,
        }
    }
}

impl PartialEq<Arc<InterruptibleEvent>> for WaiterRef {
    fn eq(&self, other: &Arc<InterruptibleEvent>) -> bool {
        match &self.0 {
            WaiterKind::Event(event) => event.as_ptr() == Arc::as_ptr(other),
            _ => false,
        }
    }
}

impl PartialEq for WaiterRef {
    fn eq(&self, other: &WaiterRef) -> bool {
        match (&self.0, &other.0) {
            (WaiterKind::Port(lhs), WaiterKind::Port(rhs)) => Weak::ptr_eq(lhs, rhs),
            (WaiterKind::Event(lhs), WaiterKind::Event(rhs)) => Weak::ptr_eq(lhs, rhs),
            (WaiterKind::AbortHandle(lhs), WaiterKind::AbortHandle(rhs)) => Weak::ptr_eq(lhs, rhs),
            _ => false,
        }
    }
}

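/// A list of waiters waiting for something to happen on an object. Notifying the queue wakes the
/// waiters whose filters match the notified events and removes their entries from the queue.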
#[derive(Default, Debug, Clone)]
pub struct WaitQueue(Arc<Mutex<WaitQueueImpl>>);

#[derive(Debug)]
struct WaitEntryWithId {
    entry: WaitEntry,
    /// A unique identifier for the entry, used to disambiguate entries whose slab slot has been
    /// reused (see `WaitCanceler::cancel`).
    id: u64,
}

struct WaitEntryId {
    key: usize,
    id: u64,
}

#[derive(Default, Debug)]
struct WaitQueueImpl {
    /// The identifier assigned to the most recently added wait entry.
    next_wait_entry_id: u64,
    /// The entries currently registered with this queue, keyed by slab index.
    waiters: Slab<WaitEntryWithId>,
}

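/// A single registration in a `WaitQueue`: the waiter to wake, the events it is interested in,
/// and the key identifying its callback on the waiter side.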
#[derive(Debug)]
struct WaitEntry {
    waiter: WaiterRef,
    filter: WaitEvents,
    key: WaitKey,
}

impl WaitQueue {
    fn add_waiter(&self, entry: WaitEntry) -> WaitEntryId {
        let mut wait_queue = self.0.lock();
        let id = wait_queue
            .next_wait_entry_id
            .checked_add(1)
            .expect("all possible wait entry ID values exhausted");
        wait_queue.next_wait_entry_id = id;
        WaitEntryId { key: wait_queue.waiters.insert(WaitEntryWithId { entry, id }), id }
    }

    /// Registers `entry` with this queue and records the queue on the waiter so the entry can be
    /// cleaned up when the waiter is dropped. Returns a canceler for the registration.
    fn wait_async_entry(&self, waiter: &Waiter, entry: WaitEntry) -> WaitCanceler {
        let wait_key = entry.key;
        let waiter_id = self.add_waiter(entry);
        let wait_queue = Arc::downgrade(&self.0);
        waiter.inner.wait_queues.lock().insert(wait_key, wait_queue.clone());
        WaitCanceler::new_inner(WaitCancelerInner::Queue(WaitCancelerQueue {
            wait_queue,
            waiter: waiter.weak(),
            wait_key,
            waiter_id,
        }))
    }

    /// Waits for `notify_value` to be called with the same value.
    pub fn wait_async_value(&self, waiter: &Waiter, value: u64) -> WaitCanceler {
        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::Value(value)))
    }

    /// Waits for any of `events` to be notified, dispatching them to `handler`.
    pub fn wait_async_fd_events(
        &self,
        waiter: &Waiter,
        events: FdEvents,
        handler: EventHandler,
    ) -> WaitCanceler {
        let entry = waiter.create_wait_entry_with_handler(WaitEvents::Fd(events), handler);
        self.wait_async_entry(waiter, entry)
    }

    /// Waits for a signal in `mask` to be notified, dispatching to `handler`.
    pub fn wait_async_signal_mask(
        &self,
        waiter: &Waiter,
        mask: SigSet,
        handler: EventHandler,
    ) -> WaitCanceler {
        let entry = waiter.create_wait_entry_with_handler(WaitEvents::SignalMask(mask), handler);
        self.wait_async_entry(waiter, entry)
    }

    /// Waits for any notification on this queue.
    pub fn wait_async(&self, waiter: &Waiter) -> WaitCanceler {
        self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::All))
    }

    pub fn wait_async_simple(&self, waiter: &mut SimpleWaiter) {
        let entry = WaitEntry {
            waiter: WaiterRef::from_event(&waiter.event),
            filter: WaitEvents::All,
            key: Default::default(),
        };
        waiter.wait_queues.push(Arc::downgrade(&self.0));
        self.add_waiter(entry);
    }

    fn notify_events_count(&self, mut events: WaitEvents, mut limit: usize) -> usize {
        if let WaitEvents::Fd(ref mut fd_events) = events {
            *fd_events = fd_events.add_equivalent_fd_events();
        }
        let mut woken = 0;
        self.0.lock().waiters.retain(|_, WaitEntryWithId { entry, id: _ }| {
            if limit > 0 && entry.filter.intercept(&events) {
                if entry.waiter.notify(&entry.key, events) {
                    limit -= 1;
                    woken += 1;
                }

                entry.waiter.will_remove_from_wait_queue(&entry.key);
                false
            } else {
                true
            }
        });
        woken
    }

    pub fn notify_fd_events(&self, events: FdEvents) {
        self.notify_events_count(WaitEvents::Fd(events), usize::MAX);
    }

    pub fn notify_signal(&self, signal: &Signal) {
        let event = WaitEvents::SignalMask(SigSet::from(*signal));
        self.notify_events_count(event, usize::MAX);
    }

    pub fn notify_value(&self, value: u64) {
        self.notify_events_count(WaitEvents::Value(value), usize::MAX);
    }

    pub fn notify_unordered_count(&self, limit: usize) {
        self.notify_events_count(WaitEvents::All, limit);
    }

    pub fn notify_all(&self) {
        self.notify_unordered_count(usize::MAX);
    }

    pub fn is_empty(&self) -> bool {
        self.0.lock().waiters.is_empty()
    }
}

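/// A `WaitQueue` whose notification values are drawn from a type convertible to `u64`, giving
/// callers a typed interface over `wait_async_value` and `notify_value`.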
pub struct TypedWaitQueue<T: Into<u64>> {
    wait_queue: WaitQueue,
    value_type: std::marker::PhantomData<T>,
}

impl<T: Into<u64>> Default for TypedWaitQueue<T> {
    fn default() -> Self {
        Self { wait_queue: Default::default(), value_type: Default::default() }
    }
}

impl<T: Into<u64>> TypedWaitQueue<T> {
    pub fn wait_async_value(&self, waiter: &Waiter, value: T) -> WaitCanceler {
        self.wait_queue.wait_async_value(waiter, value.into())
    }

    pub fn notify_value(&self, value: T) {
        self.wait_queue.notify_value(value.into())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs::fuchsia::create_fuchsia_pipe;
    use crate::signals::SignalInfo;
    use crate::task::TaskFlags;
    use crate::testing::{spawn_kernel_and_run, spawn_kernel_and_run_sync};
    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
    use crate::vfs::eventfd::{EventFdType, new_eventfd};
    use assert_matches::assert_matches;
    use starnix_sync::Unlocked;
    use starnix_uapi::open_flags::OpenFlags;
    use starnix_uapi::signals::SIGUSR1;

    const KEY: ReadyItemKey = ReadyItemKey::Usize(1234);

    #[::fuchsia::test]
    async fn test_async_wait_exec() {
        spawn_kernel_and_run(async |locked, current_task| {
            let (local_socket, remote_socket) = zx::Socket::create_stream();
            let pipe =
                create_fuchsia_pipe(locked, &current_task, remote_socket, OpenFlags::RDWR).unwrap();

            const MEM_SIZE: usize = 1024;
            let mut output_buffer = VecOutputBuffer::new(MEM_SIZE);

            let test_string = "hello starnix".to_string();
            let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
            let handler = EventHandler::Enqueue {
                key: KEY,
                queue: queue.clone(),
                sought_events: FdEvents::all(),
            };
            let waiter = Waiter::new();
            pipe.wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                .expect("wait_async");
            let test_string_clone = test_string.clone();

            let write_count = AtomicUsizeCounter::default();
            std::thread::scope(|s| {
                let thread = s.spawn(|| {
                    let test_data = test_string_clone.as_bytes();
                    let no_written = local_socket.write(test_data).unwrap();
                    assert_eq!(0, write_count.add(no_written));
                    assert_eq!(no_written, test_data.len());
                });

                // The handler runs on this thread when the waiter processes the port packet, so
                // nothing has been enqueued yet.
                assert!(queue.lock().is_empty());
                waiter.wait(locked, &current_task).unwrap();
                thread.join().expect("join thread")
            });
            queue.lock().iter().for_each(|item| assert!(item.events.contains(FdEvents::POLLIN)));

            let read_size = pipe.read(locked, &current_task, &mut output_buffer).unwrap();

            let no_written = write_count.get();
            assert_eq!(no_written, read_size);

            assert_eq!(output_buffer.data(), test_string.as_bytes());
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_async_wait_cancel() {
        for do_cancel in [true, false] {
            spawn_kernel_and_run(async move |locked, current_task| {
                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
                let waiter = Waiter::new();
                let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
                let handler = EventHandler::Enqueue {
                    key: KEY,
                    queue: queue.clone(),
                    sought_events: FdEvents::all(),
                };
                let wait_canceler = event
                    .wait_async(locked, &current_task, &waiter, FdEvents::POLLIN, handler)
                    .expect("wait_async");
                if do_cancel {
                    wait_canceler.cancel();
                }
                let add_val = 1u64;
                assert_eq!(
                    event
                        .write(
                            locked,
                            &current_task,
                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
                        )
                        .unwrap(),
                    std::mem::size_of::<u64>()
                );

                let wait_result =
                    waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO);
                let final_count = queue.lock().len();
                if do_cancel {
                    assert_eq!(wait_result, error!(ETIMEDOUT));
                    assert_eq!(0, final_count);
                } else {
                    assert_eq!(wait_result, Ok(()));
                    assert_eq!(1, final_count);
                }
            })
            .await;
        }
    }

    #[::fuchsia::test]
    async fn single_waiter_multiple_waits_cancel_one_waiter_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter);
            let _wk2 = wait_queue.wait_async(&waiter);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }

    #[::fuchsia::test]
    async fn multiple_waiters_cancel_one_other_still_notified() {
        spawn_kernel_and_run(async |locked, current_task| {
            let wait_queue = WaitQueue::default();
            let waiter1 = Waiter::new();
            let waiter2 = Waiter::new();
            let wk1 = wait_queue.wait_async(&waiter1);
            let _wk2 = wait_queue.wait_async(&waiter2);
            wk1.cancel();
            wait_queue.notify_all();
            assert!(waiter1.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_err());
            assert!(waiter2.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok());
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_wait_queue() {
        spawn_kernel_and_run(async |locked, current_task| {
            let queue = WaitQueue::default();

            let waiters = <[Waiter; 3]>::default();
            waiters.iter().for_each(|w| {
                queue.wait_async(w);
            });

            let woken = |locked: &mut Locked<Unlocked>| {
                waiters
                    .iter()
                    .filter(|w| {
                        w.wait_until(locked, &current_task, zx::MonotonicInstant::ZERO).is_ok()
                    })
                    .count()
            };

            const INITIAL_NOTIFY_COUNT: usize = 2;
            let total_waiters = waiters.len();
            queue.notify_unordered_count(INITIAL_NOTIFY_COUNT);
            assert_eq!(INITIAL_NOTIFY_COUNT, woken(locked));

            // Notifying everyone wakes only the waiters that were not already woken above.
            queue.notify_all();
            assert_eq!(total_waiters - INITIAL_NOTIFY_COUNT, woken(locked));
        })
        .await;
    }

    #[::fuchsia::test]
    async fn waiter_kind_abort_handle() {
        spawn_kernel_and_run_sync(|_locked, current_task| {
            let mut executor = fuchsia_async::TestExecutor::new();
            let (abort_handle, abort_registration) = futures::stream::AbortHandle::new_pair();
            let abort_handle = Arc::new(abort_handle);
            let waiter_ref = WaiterRef::from_abort_handle(&abort_handle);

            let mut fut = futures::stream::Abortable::new(
                futures::future::pending::<()>(),
                abort_registration,
            );

            assert_matches!(executor.run_until_stalled(&mut fut), std::task::Poll::Pending);

            waiter_ref.interrupt();
            let output = current_task.run_in_state(RunState::Waiter(waiter_ref), move || {
                match executor.run_singlethreaded(&mut fut) {
                    Ok(()) => unreachable!("future never terminates normally"),
                    Err(futures::stream::Aborted) => Ok(()),
                }
            });

            assert_eq!(output, Ok(()));
        })
        .await;
    }

    #[::fuchsia::test]
    async fn freeze_with_pending_sigusr1() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::default(SIGUSR1);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            let output: Result<(), Errno> = current_task
                .run_in_state(RunState::Event(InterruptibleEvent::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));

            let output = current_task.run_in_state(RunState::Frozen(Waiter::new()), move || Ok(()));
            assert_eq!(output, Ok(()));
        })
        .await;
    }

    #[::fuchsia::test]
    async fn freeze_with_pending_sigkill() {
        spawn_kernel_and_run(async |_locked, current_task| {
            {
                let mut task_state = current_task.task.write();
                let siginfo = SignalInfo::default(SIGKILL);
                task_state.enqueue_signal(siginfo);
                task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
            }

            let output: Result<(), _> = current_task
                .run_in_state(RunState::Frozen(Waiter::new()), move || {
                    unreachable!("callback should not be called")
                });
            assert_eq!(output, error!(EINTR));
        })
        .await;
    }
}