1use crate::signals::RunState;
6use crate::task::CurrentTask;
7use crate::vfs::{EpollEventHandler, FdNumber};
8use slab::Slab;
9use starnix_lifecycle::{AtomicU64Counter, AtomicUsizeCounter};
10use starnix_sync::{
11 EventWaitGuard, FileOpsCore, InterruptibleEvent, LockEqualOrBefore, Locked, Mutex, NotifyKind,
12 PortEvent, PortWaitResult,
13};
14use starnix_types::ownership::debug_assert_no_local_temp_ref;
15use starnix_uapi::error;
16use starnix_uapi::errors::{EINTR, Errno};
17use starnix_uapi::signals::{SIGKILL, SigSet, Signal};
18use starnix_uapi::vfs::FdEvents;
19use std::collections::{HashMap, VecDeque};
20use std::sync::{Arc, Weak};
21use syncio::zxio::zxio_signals_t;
22use syncio::{ZxioSignals, ZxioWeak};
23
24#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
25pub enum ReadyItemKey {
26 FdNumber(FdNumber),
27 Usize(usize),
28}
29
30impl From<FdNumber> for ReadyItemKey {
31 fn from(v: FdNumber) -> Self {
32 Self::FdNumber(v)
33 }
34}
35
36impl From<usize> for ReadyItemKey {
37 fn from(v: usize) -> Self {
38 Self::Usize(v)
39 }
40}
41
42#[derive(Debug, Copy, Clone)]
43pub struct ReadyItem {
44 pub key: ReadyItemKey,
45 pub events: FdEvents,
46}
47
48#[derive(Clone)]
49pub enum EventHandler {
50 None,
55
56 Enqueue { key: ReadyItemKey, queue: Arc<Mutex<VecDeque<ReadyItem>>>, sought_events: FdEvents },
61
62 HandleOnce(Arc<Mutex<Option<EventHandler>>>),
67
68 Epoll(EpollEventHandler),
70}
71
72impl EventHandler {
73 pub fn handle(self, events: FdEvents) {
74 match self {
75 Self::None => {}
76 Self::Enqueue { key, queue, sought_events } => {
77 let events = events & sought_events;
78 queue.lock().push_back(ReadyItem { key, events });
79 }
80 Self::HandleOnce(inner) => {
81 if let Some(inner) = inner.lock().take() {
82 inner.handle(events);
83 }
84 }
85 Self::Epoll(e) => e.handle(events),
86 }
87 }
88}
89
90pub struct ZxioSignalHandler {
91 pub zxio: ZxioWeak,
92 pub get_events_from_zxio_signals: fn(zxio_signals_t) -> FdEvents,
93}
94
95pub struct ManyZxHandleSignalHandler {
98 pub count: usize,
99 pub counter: Arc<AtomicUsizeCounter>,
100 pub expected_signals: zx::Signals,
101 pub events: FdEvents,
102}
103
104pub enum SignalHandlerInner {
105 None,
106 Zxio(ZxioSignalHandler),
107 ZxHandle(fn(zx::Signals) -> FdEvents),
108 ManyZxHandle(ManyZxHandleSignalHandler),
109}
110
111pub struct SignalHandler {
112 pub inner: SignalHandlerInner,
113 pub event_handler: EventHandler,
114 pub err_code: Option<Errno>,
115}
116
117impl SignalHandler {
118 fn handle(self, signals: zx::Signals) -> Option<Errno> {
119 let SignalHandler { inner, event_handler, err_code } = self;
120 let events = match inner {
121 SignalHandlerInner::None => None,
122 SignalHandlerInner::Zxio(ZxioSignalHandler { zxio, get_events_from_zxio_signals }) => {
123 if let Some(zxio) = zxio.upgrade() {
124 Some(get_events_from_zxio_signals(zxio.wait_end(signals)))
125 } else {
126 None
127 }
128 }
129 SignalHandlerInner::ZxHandle(get_events_from_zx_signals) => {
130 Some(get_events_from_zx_signals(signals))
131 }
132 SignalHandlerInner::ManyZxHandle(signal_handler) => {
133 if signals.contains(signal_handler.expected_signals) {
134 let new_count = signal_handler.counter.next() + 1;
135 assert!(new_count <= signal_handler.count);
136 if new_count == signal_handler.count {
137 Some(signal_handler.events)
138 } else {
139 None
140 }
141 } else {
142 None
143 }
144 }
145 };
146 if let Some(events) = events {
147 event_handler.handle(events)
148 }
149 err_code
150 }
151}
152
153pub enum WaitCallback {
154 SignalHandler(SignalHandler),
155 EventHandler(EventHandler),
156}
157
158struct WaitCancelerQueue {
159 wait_queue: Weak<Mutex<WaitQueueImpl>>,
160 waiter: WaiterRef,
161 wait_key: WaitKey,
162 waiter_id: WaitEntryId,
163}
164
165struct WaitCancelerZxio {
166 zxio: ZxioWeak,
167 inner: PortWaitCanceler,
168}
169
170struct WaitCancelerPort {
171 inner: PortWaitCanceler,
172}
173
174enum WaitCancelerInner {
175 Zxio(WaitCancelerZxio),
176 Queue(WaitCancelerQueue),
177 Port(WaitCancelerPort),
178}
179
180const WAIT_CANCELER_COMMON_SIZE: usize = 2;
181
182pub struct WaitCanceler {
189 cancellers: smallvec::SmallVec<[WaitCancelerInner; WAIT_CANCELER_COMMON_SIZE]>,
190}
191
192impl WaitCanceler {
193 fn new_inner(inner: WaitCancelerInner) -> Self {
194 Self { cancellers: smallvec::smallvec![inner] }
195 }
196
197 pub fn new_noop() -> Self {
198 Self { cancellers: Default::default() }
199 }
200
201 pub fn new_zxio(zxio: ZxioWeak, inner: PortWaitCanceler) -> Self {
202 Self::new_inner(WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }))
203 }
204
205 pub fn new_port(inner: PortWaitCanceler) -> Self {
206 Self::new_inner(WaitCancelerInner::Port(WaitCancelerPort { inner }))
207 }
208
209 pub fn merge(self, other: Self) -> Self {
215 assert!(
218 self.cancellers.len() + other.cancellers.len() <= WAIT_CANCELER_COMMON_SIZE,
219 "WaitCanceler::merge disallows more than {} cancellers, found {} + {}",
220 WAIT_CANCELER_COMMON_SIZE,
221 self.cancellers.len(),
222 other.cancellers.len()
223 );
224 WaitCanceler::merge_unbounded(self, other)
225 }
226
227 pub fn merge_unbounded(
229 Self { mut cancellers }: Self,
230 Self { cancellers: mut other }: Self,
231 ) -> Self {
232 cancellers.append(&mut other);
233 WaitCanceler { cancellers }
234 }
235
236 pub fn cancel(self) {
240 let Self { cancellers } = self;
241 for canceller in cancellers.into_iter().rev() {
242 match canceller {
243 WaitCancelerInner::Zxio(WaitCancelerZxio { zxio, inner }) => {
244 let Some(zxio) = zxio.upgrade() else { return };
245 let (_, signals) = zxio.wait_begin(ZxioSignals::NONE.bits());
246 inner.cancel();
247 zxio.wait_end(signals);
248 }
249 WaitCancelerInner::Queue(WaitCancelerQueue {
250 wait_queue,
251 waiter,
252 wait_key,
253 waiter_id: WaitEntryId { key, id },
254 }) => {
255 let Some(wait_queue) = wait_queue.upgrade() else { return };
256 waiter.remove_callback(&wait_key);
257 let mut wait_queue = wait_queue.lock();
258 let waiters = &mut wait_queue.waiters;
259 if let Some(entry) = waiters.get_mut(key) {
260 if entry.id == id {
265 waiters.remove(key);
266 }
267 }
268 }
269 WaitCancelerInner::Port(WaitCancelerPort { inner }) => {
270 inner.cancel();
271 }
272 }
273 }
274 }
275}
276
277pub struct PortWaitCanceler {
284 waiter: Weak<PortWaiter>,
285 key: WaitKey,
286}
287
288impl PortWaitCanceler {
289 pub fn cancel(self) {
293 let Self { waiter, key } = self;
294 if let Some(waiter) = waiter.upgrade() {
295 let _ = waiter.port.cancel(key.raw);
296 waiter.remove_callback(&key);
297 }
298 }
299}
300
301#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
302struct WaitKey {
303 raw: u64,
304}
305
306#[derive(Clone, Copy, Debug)]
308enum WaitEvents {
309 All,
312 Fd(FdEvents),
314 Value(u64),
316 SignalMask(SigSet),
318}
319
320impl WaitEvents {
321 fn intercept(self: &WaitEvents, other: &WaitEvents) -> bool {
323 match (self, other) {
324 (Self::All, _) | (_, Self::All) => true,
325 (Self::Fd(m1), Self::Fd(m2)) => m1.bits() & m2.bits() != 0,
326 (Self::Value(v1), Self::Value(v2)) => v1 == v2,
327 (Self::SignalMask(m1), Self::SignalMask(m2)) => m1.intersects(m2),
329 _ => false,
330 }
331 }
332}
333
334impl WaitCallback {
335 pub fn none() -> EventHandler {
336 EventHandler::None
337 }
338}
339
340struct PortWaiter {
344 port: PortEvent,
345 callbacks: Mutex<HashMap<WaitKey, WaitCallback>>, next_key: AtomicU64Counter,
347 ignore_signals: bool,
348
349 wait_queues: Mutex<HashMap<WaitKey, Weak<Mutex<WaitQueueImpl>>>>,
354}
355
356impl PortWaiter {
357 fn new(ignore_signals: bool) -> Arc<Self> {
359 Arc::new(PortWaiter {
360 port: PortEvent::new(),
361 callbacks: Default::default(),
362 next_key: AtomicU64Counter::new(1),
363 ignore_signals,
364 wait_queues: Default::default(),
365 })
366 }
367
368 fn wait_internal(&self, deadline: zx::MonotonicInstant) -> Result<(), Errno> {
370 debug_assert_no_local_temp_ref();
374
375 match self.port.wait(deadline) {
376 PortWaitResult::Notification { kind: NotifyKind::Regular } => Ok(()),
377 PortWaitResult::Notification { kind: NotifyKind::Interrupt } => error!(EINTR),
378 PortWaitResult::Signal { key, observed } => {
379 if let Some(callback) = self.remove_callback(&WaitKey { raw: key }) {
380 match callback {
381 WaitCallback::SignalHandler(handler) => {
382 if let Some(errno) = handler.handle(observed) {
383 return Err(errno);
384 }
385 }
386 WaitCallback::EventHandler(_) => {
387 panic!("wrong type of handler called")
388 }
389 }
390 }
391
392 Ok(())
393 }
394 PortWaitResult::TimedOut => error!(ETIMEDOUT),
395 }
396 }
397
398 fn wait_until<L>(
399 self: &Arc<Self>,
400 locked: &mut Locked<L>,
401 current_task: &CurrentTask,
402 run_state: RunState,
403 deadline: zx::MonotonicInstant,
404 ) -> Result<(), Errno>
405 where
406 L: LockEqualOrBefore<FileOpsCore>,
407 {
408 let is_waiting = deadline.into_nanos() > 0;
409
410 let callback = || {
411 loop {
422 let wait_result = self.wait_internal(deadline);
423 if let Err(errno) = &wait_result {
424 if errno.code == EINTR && !is_waiting {
425 continue; }
427 }
428 return wait_result;
429 }
430 };
431
432 current_task.trigger_delayed_releaser(locked);
434
435 if is_waiting { current_task.run_in_state(run_state, callback) } else { callback() }
436 }
437
438 fn next_key(&self) -> WaitKey {
439 let key = self.next_key.next();
440 assert!(key != 0, "bad key from u64 wraparound");
442 WaitKey { raw: key }
443 }
444
445 fn register_callback(&self, callback: WaitCallback) -> WaitKey {
446 let key = self.next_key();
447 assert!(
448 self.callbacks.lock().insert(key, callback).is_none(),
449 "unexpected callback already present for key {key:?}"
450 );
451 key
452 }
453
454 fn remove_callback(&self, key: &WaitKey) -> Option<WaitCallback> {
455 self.callbacks.lock().remove(&key)
456 }
457
458 fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
459 let callback = WaitCallback::EventHandler(handler);
460 let key = self.register_callback(callback);
461 self.queue_events(&key, WaitEvents::Fd(events));
462 }
463
464 fn wake_on_zircon_signals(
470 self: &Arc<Self>,
471 handle: &dyn zx::AsHandleRef,
472 zx_signals: zx::Signals,
473 handler: SignalHandler,
474 ) -> Result<PortWaitCanceler, zx::Status> {
475 let callback = WaitCallback::SignalHandler(handler);
476 let key = self.register_callback(callback);
477 self.port.object_wait_async(
478 handle,
479 key.raw,
480 zx_signals,
481 zx::WaitAsyncOpts::EDGE_TRIGGERED,
482 )?;
483 Ok(PortWaitCanceler { waiter: Arc::downgrade(self), key })
484 }
485
486 fn queue_events(&self, key: &WaitKey, events: WaitEvents) {
487 scopeguard::defer! {
488 self.port.notify(NotifyKind::Regular)
489 }
490
491 let Some(callback) = self.remove_callback(key) else {
503 return;
504 };
505
506 match callback {
507 WaitCallback::EventHandler(handler) => {
508 let events = match events {
509 WaitEvents::All => FdEvents::all(),
512 WaitEvents::Fd(events) => events,
513 WaitEvents::SignalMask(_) => FdEvents::POLLIN,
514 _ => panic!("wrong type of handler called: {events:?}"),
515 };
516 handler.handle(events)
517 }
518 WaitCallback::SignalHandler(_) => {
519 panic!("wrong type of handler called")
520 }
521 }
522 }
523
524 fn notify(&self) {
525 self.port.notify(NotifyKind::Regular);
526 }
527
528 fn interrupt(&self) {
529 if self.ignore_signals {
530 return;
531 }
532 self.port.notify(NotifyKind::Interrupt);
533 }
534}
535
536impl std::fmt::Debug for PortWaiter {
537 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538 f.debug_struct("PortWaiter").field("port", &self.port).finish_non_exhaustive()
539 }
540}
541
542#[derive(Debug, Clone)]
544pub struct Waiter {
545 inner: Arc<PortWaiter>,
548}
549
550impl Waiter {
551 pub fn new() -> Self {
553 Self { inner: PortWaiter::new(false) }
554 }
555
556 pub fn new_ignoring_signals() -> Self {
558 Self { inner: PortWaiter::new(true) }
559 }
560
561 fn weak(&self) -> WaiterRef {
563 WaiterRef::from_port(&self.inner)
564 }
565
566 pub fn freeze<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask)
570 where
571 L: LockEqualOrBefore<FileOpsCore>,
572 {
573 while self
574 .inner
575 .wait_until(
576 locked,
577 current_task,
578 RunState::Frozen(self.clone()),
579 zx::MonotonicInstant::INFINITE,
580 )
581 .is_err()
582 {
583 if current_task.read().has_signal_pending(SIGKILL) {
585 break;
586 }
587 }
589 }
590
591 pub fn wait<L>(&self, locked: &mut Locked<L>, current_task: &CurrentTask) -> Result<(), Errno>
595 where
596 L: LockEqualOrBefore<FileOpsCore>,
597 {
598 self.inner.wait_until(
599 locked,
600 current_task,
601 RunState::Waiter(WaiterRef::from_port(&self.inner)),
602 zx::MonotonicInstant::INFINITE,
603 )
604 }
605
606 pub fn wait_until<L>(
629 &self,
630 locked: &mut Locked<L>,
631 current_task: &CurrentTask,
632 deadline: zx::MonotonicInstant,
633 ) -> Result<(), Errno>
634 where
635 L: LockEqualOrBefore<FileOpsCore>,
636 {
637 self.inner.wait_until(
638 locked,
639 current_task,
640 RunState::Waiter(WaiterRef::from_port(&self.inner)),
641 deadline,
642 )
643 }
644
645 fn create_wait_entry(&self, filter: WaitEvents) -> WaitEntry {
646 WaitEntry { waiter: self.weak(), filter, key: self.inner.next_key() }
647 }
648
649 fn create_wait_entry_with_handler(
650 &self,
651 filter: WaitEvents,
652 handler: EventHandler,
653 ) -> WaitEntry {
654 let key = self.inner.register_callback(WaitCallback::EventHandler(handler));
655 WaitEntry { waiter: self.weak(), filter, key }
656 }
657
658 pub fn wake_immediately(&self, events: FdEvents, handler: EventHandler) {
659 self.inner.wake_immediately(events, handler);
660 }
661
662 pub fn wake_on_zircon_signals(
667 &self,
668 handle: &dyn zx::AsHandleRef,
669 zx_signals: zx::Signals,
670 handler: SignalHandler,
671 ) -> Result<PortWaitCanceler, zx::Status> {
672 self.inner.wake_on_zircon_signals(handle, zx_signals, handler)
673 }
674
675 pub fn fake_wait(&self) -> WaitCanceler {
679 WaitCanceler::new_noop()
680 }
681
682 pub fn notify(&self) {
684 self.inner.notify();
685 }
686
687 pub fn interrupt(&self) {
693 self.inner.interrupt();
694 }
695}
696
697impl Drop for Waiter {
698 fn drop(&mut self) {
699 let wait_queues = std::mem::take(&mut *self.inner.wait_queues.lock()).into_values();
702 for wait_queue in wait_queues {
703 if let Some(wait_queue) = wait_queue.upgrade() {
704 wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != *self)
705 }
706 }
707 }
708}
709
710impl Default for Waiter {
711 fn default() -> Self {
712 Self::new()
713 }
714}
715
716impl PartialEq for Waiter {
717 fn eq(&self, other: &Self) -> bool {
718 Arc::ptr_eq(&self.inner, &other.inner)
719 }
720}
721
722pub struct SimpleWaiter {
723 event: Arc<InterruptibleEvent>,
724 wait_queues: Vec<Weak<Mutex<WaitQueueImpl>>>,
725}
726
727impl SimpleWaiter {
728 pub fn new(event: &Arc<InterruptibleEvent>) -> (SimpleWaiter, EventWaitGuard<'_>) {
729 (SimpleWaiter { event: event.clone(), wait_queues: Default::default() }, event.begin_wait())
730 }
731}
732
733impl Drop for SimpleWaiter {
734 fn drop(&mut self) {
735 for wait_queue in &self.wait_queues {
736 if let Some(wait_queue) = wait_queue.upgrade() {
737 wait_queue.lock().waiters.retain(|_, entry| entry.entry.waiter != self.event)
738 }
739 }
740 }
741}
742
743#[derive(Debug, Clone)]
744enum WaiterKind {
745 Port(Weak<PortWaiter>),
746 Event(Weak<InterruptibleEvent>),
747 AbortHandle(Weak<futures::stream::AbortHandle>),
748}
749
750impl Default for WaiterKind {
751 fn default() -> Self {
752 WaiterKind::Port(Default::default())
753 }
754}
755
756#[derive(Debug, Default, Clone)]
759pub struct WaiterRef(WaiterKind);
760
761impl WaiterRef {
762 fn from_port(waiter: &Arc<PortWaiter>) -> WaiterRef {
763 WaiterRef(WaiterKind::Port(Arc::downgrade(waiter)))
764 }
765
766 fn from_event(event: &Arc<InterruptibleEvent>) -> WaiterRef {
767 WaiterRef(WaiterKind::Event(Arc::downgrade(event)))
768 }
769
770 pub fn from_abort_handle(handle: &Arc<futures::stream::AbortHandle>) -> WaiterRef {
771 WaiterRef(WaiterKind::AbortHandle(Arc::downgrade(handle)))
772 }
773
774 pub fn is_valid(&self) -> bool {
775 match &self.0 {
776 WaiterKind::Port(waiter) => waiter.strong_count() != 0,
777 WaiterKind::Event(event) => event.strong_count() != 0,
778 WaiterKind::AbortHandle(handle) => handle.strong_count() != 0,
779 }
780 }
781
782 pub fn interrupt(&self) {
783 match &self.0 {
784 WaiterKind::Port(waiter) => {
785 if let Some(waiter) = waiter.upgrade() {
786 waiter.interrupt();
787 }
788 }
789 WaiterKind::Event(event) => {
790 if let Some(event) = event.upgrade() {
791 event.interrupt();
792 }
793 }
794 WaiterKind::AbortHandle(handle) => {
795 if let Some(handle) = handle.upgrade() {
796 handle.abort();
797 }
798 }
799 }
800 }
801
802 fn remove_callback(&self, key: &WaitKey) {
803 match &self.0 {
804 WaiterKind::Port(waiter) => {
805 if let Some(waiter) = waiter.upgrade() {
806 waiter.remove_callback(key);
807 }
808 }
809 _ => (),
810 }
811 }
812
813 fn will_remove_from_wait_queue(&self, key: &WaitKey) {
818 match &self.0 {
819 WaiterKind::Port(waiter) => {
820 if let Some(waiter) = waiter.upgrade() {
821 waiter.wait_queues.lock().remove(key);
822 }
823 }
824 _ => (),
825 }
826 }
827
828 fn notify(&self, key: &WaitKey, events: WaitEvents) -> bool {
835 match &self.0 {
836 WaiterKind::Port(waiter) => {
837 if let Some(waiter) = waiter.upgrade() {
838 waiter.queue_events(key, events);
839 return true;
840 }
841 }
842 WaiterKind::Event(event) => {
843 if let Some(event) = event.upgrade() {
844 event.notify();
845 return true;
846 }
847 }
848 WaiterKind::AbortHandle(handle) => {
849 if let Some(handle) = handle.upgrade() {
850 handle.abort();
851 return true;
852 }
853 }
854 }
855 false
856 }
857}
858
859impl PartialEq<Waiter> for WaiterRef {
860 fn eq(&self, other: &Waiter) -> bool {
861 match &self.0 {
862 WaiterKind::Port(waiter) => waiter.as_ptr() == Arc::as_ptr(&other.inner),
863 _ => false,
864 }
865 }
866}
867
868impl PartialEq<Arc<InterruptibleEvent>> for WaiterRef {
869 fn eq(&self, other: &Arc<InterruptibleEvent>) -> bool {
870 match &self.0 {
871 WaiterKind::Event(event) => event.as_ptr() == Arc::as_ptr(other),
872 _ => false,
873 }
874 }
875}
876
877impl PartialEq for WaiterRef {
878 fn eq(&self, other: &WaiterRef) -> bool {
879 match (&self.0, &other.0) {
880 (WaiterKind::Port(lhs), WaiterKind::Port(rhs)) => Weak::ptr_eq(lhs, rhs),
881 (WaiterKind::Event(lhs), WaiterKind::Event(rhs)) => Weak::ptr_eq(lhs, rhs),
882 (WaiterKind::AbortHandle(lhs), WaiterKind::AbortHandle(rhs)) => Weak::ptr_eq(lhs, rhs),
883 _ => false,
884 }
885 }
886}
887
888#[derive(Default, Debug)]
895pub struct WaitQueue(Arc<Mutex<WaitQueueImpl>>);
896
897#[derive(Debug)]
898struct WaitEntryWithId {
899 entry: WaitEntry,
900 id: u64,
904}
905
906struct WaitEntryId {
907 key: usize,
908 id: u64,
909}
910
911#[derive(Default, Debug)]
912struct WaitQueueImpl {
913 next_wait_entry_id: u64,
919 waiters: Slab<WaitEntryWithId>,
923}
924
925#[derive(Debug)]
927struct WaitEntry {
928 waiter: WaiterRef,
930
931 filter: WaitEvents,
933
934 key: WaitKey,
936}
937
938impl WaitQueue {
939 fn add_waiter(&self, entry: WaitEntry) -> WaitEntryId {
940 let mut wait_queue = self.0.lock();
941 let id = wait_queue
942 .next_wait_entry_id
943 .checked_add(1)
944 .expect("all possible wait entry ID values exhausted");
945 wait_queue.next_wait_entry_id = id;
946 WaitEntryId { key: wait_queue.waiters.insert(WaitEntryWithId { entry, id }), id }
947 }
948
949 fn wait_async_entry(&self, waiter: &Waiter, entry: WaitEntry) -> WaitCanceler {
958 let wait_key = entry.key;
959 let waiter_id = self.add_waiter(entry);
960 let wait_queue = Arc::downgrade(&self.0);
961 waiter.inner.wait_queues.lock().insert(wait_key, wait_queue.clone());
962 WaitCanceler::new_inner(WaitCancelerInner::Queue(WaitCancelerQueue {
963 wait_queue,
964 waiter: waiter.weak(),
965 wait_key,
966 waiter_id,
967 }))
968 }
969
970 pub fn wait_async_value(&self, waiter: &Waiter, value: u64) -> WaitCanceler {
979 self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::Value(value)))
980 }
981
982 pub fn wait_async_fd_events(
991 &self,
992 waiter: &Waiter,
993 events: FdEvents,
994 handler: EventHandler,
995 ) -> WaitCanceler {
996 let entry = waiter.create_wait_entry_with_handler(WaitEvents::Fd(events), handler);
997 self.wait_async_entry(waiter, entry)
998 }
999
1000 pub fn wait_async_signal_mask(
1009 &self,
1010 waiter: &Waiter,
1011 mask: SigSet,
1012 handler: EventHandler,
1013 ) -> WaitCanceler {
1014 let entry = waiter.create_wait_entry_with_handler(WaitEvents::SignalMask(mask), handler);
1015 self.wait_async_entry(waiter, entry)
1016 }
1017
1018 pub fn wait_async(&self, waiter: &Waiter) -> WaitCanceler {
1027 self.wait_async_entry(waiter, waiter.create_wait_entry(WaitEvents::All))
1028 }
1029
1030 pub fn wait_async_simple(&self, waiter: &mut SimpleWaiter) {
1031 let entry = WaitEntry {
1032 waiter: WaiterRef::from_event(&waiter.event),
1033 filter: WaitEvents::All,
1034 key: Default::default(),
1035 };
1036 waiter.wait_queues.push(Arc::downgrade(&self.0));
1037 self.add_waiter(entry);
1038 }
1039
1040 fn notify_events_count(&self, mut events: WaitEvents, mut limit: usize) -> usize {
1041 if let WaitEvents::Fd(ref mut fd_events) = events {
1042 *fd_events = fd_events.add_equivalent_fd_events();
1043 }
1044 let mut woken = 0;
1045 self.0.lock().waiters.retain(|_, WaitEntryWithId { entry, id: _ }| {
1046 if limit > 0 && entry.filter.intercept(&events) {
1047 if entry.waiter.notify(&entry.key, events) {
1048 limit -= 1;
1049 woken += 1;
1050 }
1051
1052 entry.waiter.will_remove_from_wait_queue(&entry.key);
1053 false
1054 } else {
1055 true
1056 }
1057 });
1058 woken
1059 }
1060
1061 pub fn notify_fd_events(&self, events: FdEvents) {
1062 self.notify_events_count(WaitEvents::Fd(events), usize::MAX);
1063 }
1064
1065 pub fn notify_signal(&self, signal: &Signal) {
1066 let event = WaitEvents::SignalMask(SigSet::from(*signal));
1067 self.notify_events_count(event, usize::MAX);
1068 }
1069
1070 pub fn notify_value(&self, value: u64) {
1071 self.notify_events_count(WaitEvents::Value(value), usize::MAX);
1072 }
1073
1074 pub fn notify_unordered_count(&self, limit: usize) {
1075 self.notify_events_count(WaitEvents::All, limit);
1076 }
1077
1078 pub fn notify_all(&self) {
1079 self.notify_unordered_count(usize::MAX);
1080 }
1081
1082 pub fn is_empty(&self) -> bool {
1084 self.0.lock().waiters.is_empty()
1085 }
1086}
1087
1088pub struct TypedWaitQueue<T: Into<u64>> {
1090 wait_queue: WaitQueue,
1091 value_type: std::marker::PhantomData<T>,
1092}
1093
1094impl<T: Into<u64>> Default for TypedWaitQueue<T> {
1096 fn default() -> Self {
1097 Self { wait_queue: Default::default(), value_type: Default::default() }
1098 }
1099}
1100
1101impl<T: Into<u64>> TypedWaitQueue<T> {
1102 pub fn wait_async_value(&self, waiter: &Waiter, value: T) -> WaitCanceler {
1103 self.wait_queue.wait_async_value(waiter, value.into())
1104 }
1105
1106 pub fn notify_value(&self, value: T) {
1107 self.wait_queue.notify_value(value.into())
1108 }
1109}
1110
1111#[cfg(test)]
1112mod tests {
1113 use super::*;
1114 use crate::fs::fuchsia::create_fuchsia_pipe;
1115 use crate::signals::SignalInfo;
1116 use crate::task::TaskFlags;
1117 use crate::testing::{spawn_kernel_and_run, spawn_kernel_and_run_sync};
1118 use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
1119 use crate::vfs::eventfd::{EventFdType, new_eventfd};
1120 use assert_matches::assert_matches;
1121 use starnix_sync::Unlocked;
1122 use starnix_uapi::open_flags::OpenFlags;
1123 use starnix_uapi::signals::SIGUSR1;
1124
1125 const KEY: ReadyItemKey = ReadyItemKey::Usize(1234);
1126
1127 #[::fuchsia::test]
1128 async fn test_async_wait_exec() {
1129 spawn_kernel_and_run(async |locked, current_task| {
1130 let (local_socket, remote_socket) = zx::Socket::create_stream();
1131 let pipe =
1132 create_fuchsia_pipe(locked, ¤t_task, remote_socket, OpenFlags::RDWR).unwrap();
1133
1134 const MEM_SIZE: usize = 1024;
1135 let mut output_buffer = VecOutputBuffer::new(MEM_SIZE);
1136
1137 let test_string = "hello startnix".to_string();
1138 let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
1139 let handler = EventHandler::Enqueue {
1140 key: KEY,
1141 queue: queue.clone(),
1142 sought_events: FdEvents::all(),
1143 };
1144 let waiter = Waiter::new();
1145 pipe.wait_async(locked, ¤t_task, &waiter, FdEvents::POLLIN, handler)
1146 .expect("wait_async");
1147 let test_string_clone = test_string.clone();
1148
1149 let write_count = AtomicUsizeCounter::default();
1150 std::thread::scope(|s| {
1151 let thread = s.spawn(|| {
1152 let test_data = test_string_clone.as_bytes();
1153 let no_written = local_socket.write(test_data).unwrap();
1154 assert_eq!(0, write_count.add(no_written));
1155 assert_eq!(no_written, test_data.len());
1156 });
1157
1158 assert!(queue.lock().is_empty());
1161 waiter.wait(locked, ¤t_task).unwrap();
1162 thread.join().expect("join thread")
1163 });
1164 queue.lock().iter().for_each(|item| assert!(item.events.contains(FdEvents::POLLIN)));
1165
1166 let read_size = pipe.read(locked, ¤t_task, &mut output_buffer).unwrap();
1167
1168 let no_written = write_count.get();
1169 assert_eq!(no_written, read_size);
1170
1171 assert_eq!(output_buffer.data(), test_string.as_bytes());
1172 })
1173 .await;
1174 }
1175
1176 #[::fuchsia::test]
1177 async fn test_async_wait_cancel() {
1178 for do_cancel in [true, false] {
1179 spawn_kernel_and_run(async move |locked, current_task| {
1180 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
1181 let waiter = Waiter::new();
1182 let queue: Arc<Mutex<VecDeque<ReadyItem>>> = Default::default();
1183 let handler = EventHandler::Enqueue {
1184 key: KEY,
1185 queue: queue.clone(),
1186 sought_events: FdEvents::all(),
1187 };
1188 let wait_canceler = event
1189 .wait_async(locked, ¤t_task, &waiter, FdEvents::POLLIN, handler)
1190 .expect("wait_async");
1191 if do_cancel {
1192 wait_canceler.cancel();
1193 }
1194 let add_val = 1u64;
1195 assert_eq!(
1196 event
1197 .write(
1198 locked,
1199 ¤t_task,
1200 &mut VecInputBuffer::new(&add_val.to_ne_bytes())
1201 )
1202 .unwrap(),
1203 std::mem::size_of::<u64>()
1204 );
1205
1206 let wait_result =
1207 waiter.wait_until(locked, ¤t_task, zx::MonotonicInstant::ZERO);
1208 let final_count = queue.lock().len();
1209 if do_cancel {
1210 assert_eq!(wait_result, error!(ETIMEDOUT));
1211 assert_eq!(0, final_count);
1212 } else {
1213 assert_eq!(wait_result, Ok(()));
1214 assert_eq!(1, final_count);
1215 }
1216 })
1217 .await;
1218 }
1219 }
1220
1221 #[::fuchsia::test]
1222 async fn single_waiter_multiple_waits_cancel_one_waiter_still_notified() {
1223 spawn_kernel_and_run(async |locked, current_task| {
1224 let wait_queue = WaitQueue::default();
1225 let waiter = Waiter::new();
1226 let wk1 = wait_queue.wait_async(&waiter);
1227 let _wk2 = wait_queue.wait_async(&waiter);
1228 wk1.cancel();
1229 wait_queue.notify_all();
1230 assert!(waiter.wait_until(locked, ¤t_task, zx::MonotonicInstant::ZERO).is_ok());
1231 })
1232 .await;
1233 }
1234
1235 #[::fuchsia::test]
1236 async fn multiple_waiters_cancel_one_other_still_notified() {
1237 spawn_kernel_and_run(async |locked, current_task| {
1238 let wait_queue = WaitQueue::default();
1239 let waiter1 = Waiter::new();
1240 let waiter2 = Waiter::new();
1241 let wk1 = wait_queue.wait_async(&waiter1);
1242 let _wk2 = wait_queue.wait_async(&waiter2);
1243 wk1.cancel();
1244 wait_queue.notify_all();
1245 assert!(waiter1.wait_until(locked, ¤t_task, zx::MonotonicInstant::ZERO).is_err());
1246 assert!(waiter2.wait_until(locked, ¤t_task, zx::MonotonicInstant::ZERO).is_ok());
1247 })
1248 .await;
1249 }
1250
1251 #[::fuchsia::test]
1252 async fn test_wait_queue() {
1253 spawn_kernel_and_run(async |locked, current_task| {
1254 let queue = WaitQueue::default();
1255
1256 let waiters = <[Waiter; 3]>::default();
1257 waiters.iter().for_each(|w| {
1258 queue.wait_async(w);
1259 });
1260
1261 let woken = |locked: &mut Locked<Unlocked>| {
1262 waiters
1263 .iter()
1264 .filter(|w| {
1265 w.wait_until(locked, ¤t_task, zx::MonotonicInstant::ZERO).is_ok()
1266 })
1267 .count()
1268 };
1269
1270 const INITIAL_NOTIFY_COUNT: usize = 2;
1271 let total_waiters = waiters.len();
1272 queue.notify_unordered_count(INITIAL_NOTIFY_COUNT);
1273 assert_eq!(INITIAL_NOTIFY_COUNT, woken(locked));
1274
1275 queue.notify_all();
1277 assert_eq!(total_waiters - INITIAL_NOTIFY_COUNT, woken(locked));
1278 })
1279 .await;
1280 }
1281
1282 #[::fuchsia::test]
1283 async fn waiter_kind_abort_handle() {
1284 spawn_kernel_and_run_sync(|_locked, current_task| {
1285 let mut executor = fuchsia_async::TestExecutor::new();
1286 let (abort_handle, abort_registration) = futures::stream::AbortHandle::new_pair();
1287 let abort_handle = Arc::new(abort_handle);
1288 let waiter_ref = WaiterRef::from_abort_handle(&abort_handle);
1289
1290 let mut fut = futures::stream::Abortable::new(
1291 futures::future::pending::<()>(),
1292 abort_registration,
1293 );
1294
1295 assert_matches!(executor.run_until_stalled(&mut fut), std::task::Poll::Pending);
1296
1297 waiter_ref.interrupt();
1298 let output = current_task.run_in_state(RunState::Waiter(waiter_ref), move || {
1299 match executor.run_singlethreaded(&mut fut) {
1300 Ok(()) => unreachable!("future never terminates normally"),
1301 Err(futures::stream::Aborted) => Ok(()),
1302 }
1303 });
1304
1305 assert_eq!(output, Ok(()));
1306 })
1307 .await;
1308 }
1309
1310 #[::fuchsia::test]
1311 async fn freeze_with_pending_sigusr1() {
1312 spawn_kernel_and_run(async |_locked, current_task| {
1313 {
1314 let mut task_state = current_task.task.write();
1315 let siginfo = SignalInfo::default(SIGUSR1);
1316 task_state.enqueue_signal(siginfo);
1317 task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
1318 }
1319
1320 let output: Result<(), Errno> = current_task
1321 .run_in_state(RunState::Event(InterruptibleEvent::new()), move || {
1322 unreachable!("callback should not be called")
1323 });
1324 assert_eq!(output, error!(EINTR));
1325
1326 let output = current_task.run_in_state(RunState::Frozen(Waiter::new()), move || Ok(()));
1327 assert_eq!(output, Ok(()));
1328 })
1329 .await;
1330 }
1331
1332 #[::fuchsia::test]
1333 async fn freeze_with_pending_sigkill() {
1334 spawn_kernel_and_run(async |_locked, current_task| {
1335 {
1336 let mut task_state = current_task.task.write();
1337 let siginfo = SignalInfo::default(SIGKILL);
1338 task_state.enqueue_signal(siginfo);
1339 task_state.set_flags(TaskFlags::SIGNALS_AVAILABLE, true);
1340 }
1341
1342 let output: Result<(), _> = current_task
1343 .run_in_state(RunState::Frozen(Waiter::new()), move || {
1344 unreachable!("callback should not be called")
1345 });
1346 assert_eq!(output, error!(EINTR));
1347 })
1348 .await;
1349 }
1350}