1use crate::mm::memory::MemoryObject;
6use crate::mm::{CompareExchangeResult, ProtectionFlags};
7use crate::task::{CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, Task, Waiter};
8use futures::channel::oneshot;
9use starnix_sync::{
10 FutexTableStateLock, InterruptibleEvent, LockBefore, Locked, OrderedMutex, Unlocked,
11};
12use starnix_types::futex_address::FutexAddress;
13use starnix_uapi::errors::Errno;
14use starnix_uapi::user_address::UserAddress;
15use starnix_uapi::{FUTEX_BITSET_MATCH_ANY, FUTEX_TID_MASK, FUTEX_WAITERS, errno, error};
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, VecDeque};
18use std::hash::Hash;
19use std::sync::{Arc, Weak};
20
21pub struct FutexTable<Key: FutexKey> {
27 state: OrderedMutex<FutexTableState<Key>, FutexTableStateLock>,
31}
32
33impl<Key: FutexKey> Default for FutexTable<Key> {
34 fn default() -> Self {
35 Self { state: OrderedMutex::new(FutexTableState::default()) }
36 }
37}
38
39impl<Key: FutexKey> FutexTable<Key> {
40 pub fn wait_boot(
44 &self,
45 locked: &mut Locked<Unlocked>,
46 current_task: &CurrentTask,
47 addr: UserAddress,
48 value: u32,
49 mask: u32,
50 deadline: zx::BootInstant,
51 timer_slack: zx::BootDuration,
52 ) -> Result<(), Errno> {
53 let addr = FutexAddress::try_from(addr)?;
54 let mut state = self.state.lock(locked);
55 let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
60 if value != loaded_value {
61 return error!(EAGAIN);
62 }
63
64 let key = Key::get(current_task, addr)?;
65 let waiter = Arc::new(Waiter::new());
66 let timer = zx::BootTimer::create();
67 let signal_handler = SignalHandler {
68 inner: SignalHandlerInner::None,
69 event_handler: EventHandler::None,
70 err_code: Some(errno!(ETIMEDOUT)),
71 };
72 waiter
73 .wake_on_zircon_signals(&timer, zx::Signals::TIMER_SIGNALED, signal_handler)
74 .expect("wait can only fail in OOM conditions");
75 timer
76 .set(deadline, timer_slack)
77 .expect("timer set cannot fail with valid handles and slack");
78 state.get_waiters_or_default(key.clone()).add(FutexWaiter {
79 mask,
80 notifiable: FutexNotifiable::new_internal_boot(Arc::downgrade(&waiter)),
81 });
82 std::mem::drop(state);
83 waiter.wait(locked, current_task).inspect_err(|_| {
84 self.state.lock(locked).remove_boot_waiter_from_queue(key, &waiter);
88 })
89 }
90
91 pub fn wait<L>(
95 &self,
96 locked: &mut Locked<L>,
97 current_task: &CurrentTask,
98 addr: UserAddress,
99 value: u32,
100 mask: u32,
101 deadline: zx::MonotonicInstant,
102 ) -> Result<(), Errno>
103 where
104 L: LockBefore<FutexTableStateLock>,
105 {
106 let addr = FutexAddress::try_from(addr)?;
107 let mut state = self.state.lock(locked);
108 let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
113 if value != loaded_value {
114 return error!(EAGAIN);
115 }
116
117 let key = Key::get(current_task, addr)?;
118 let event = InterruptibleEvent::new();
119 let guard = event.begin_wait();
120 state.get_waiters_or_default(key.clone()).add(FutexWaiter {
121 mask,
122 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
123 });
124 std::mem::drop(state);
125
126 current_task.block_until(guard, deadline).inspect_err(|_| {
127 self.state.lock(locked).remove_waiter_from_queue(key, &event);
131 })
132 }
133
134 pub fn wake<L>(
139 &self,
140 locked: &mut Locked<L>,
141 task: &Task,
142 addr: UserAddress,
143 count: usize,
144 mask: u32,
145 ) -> Result<usize, Errno>
146 where
147 L: LockBefore<FutexTableStateLock>,
148 {
149 let addr = FutexAddress::try_from(addr)?;
150 let key = Key::get(task, addr)?;
151 Ok(self.state.lock(locked).wake(key, count, mask))
152 }
153
154 pub fn requeue<L>(
158 &self,
159 locked: &mut Locked<L>,
160 current_task: &CurrentTask,
161 addr: UserAddress,
162 wake_count: usize,
163 requeue_count: usize,
164 new_addr: UserAddress,
165 expected_value: Option<u32>,
166 ) -> Result<usize, Errno>
167 where
168 L: LockBefore<FutexTableStateLock>,
169 {
170 let addr = FutexAddress::try_from(addr)?;
171 let new_addr = FutexAddress::try_from(new_addr)?;
172 let key = Key::get(current_task, addr)?;
173 let new_key = Key::get(current_task, new_addr)?;
174 let mut state = self.state.lock(locked);
175 if let Some(expected) = expected_value {
176 let value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
179 if value != expected {
180 return error!(EAGAIN);
181 }
182 }
183
184 Ok(state.requeue(key, new_key, wake_count, requeue_count))
185 }
186
187 pub fn lock_pi<L>(
191 &self,
192 locked: &mut Locked<L>,
193 current_task: &CurrentTask,
194 addr: UserAddress,
195 deadline: zx::MonotonicInstant,
196 ) -> Result<(), Errno>
197 where
198 L: LockBefore<FutexTableStateLock>,
199 {
200 let addr = FutexAddress::try_from(addr)?;
201 let mut state = self.state.lock(locked);
202 let key = Key::get(current_task, addr)?;
205
206 let tid = current_task.get_tid() as u32;
207 let mm = current_task.mm()?;
208
209 let mut current_value = mm.atomic_load_u32_relaxed(addr)?;
213 let new_owner_tid = loop {
214 let new_owner_tid = current_value & FUTEX_TID_MASK;
215 if new_owner_tid == tid {
216 return error!(EDEADLOCK);
223 }
224
225 if current_value == 0 {
226 match mm.atomic_compare_exchange_weak_u32_acq_rel(addr, current_value, tid) {
229 CompareExchangeResult::Success => return Ok(()),
230 CompareExchangeResult::Stale { observed } => {
231 current_value = observed;
232 continue;
233 }
234 CompareExchangeResult::Error(e) => return Err(e),
235 }
236 }
237
238 let target_value = current_value | FUTEX_WAITERS;
241 match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
242 CompareExchangeResult::Success => (),
243 CompareExchangeResult::Stale { observed } => {
244 current_value = observed;
245 continue;
246 }
247 CompareExchangeResult::Error(e) => return Err(e),
248 }
249 break new_owner_tid;
250 };
251
252 let event = InterruptibleEvent::new();
253 let guard = event.begin_wait();
254 let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event));
255 state
256 .get_rt_mutex_waiters_or_default(key.clone())
257 .push_back(RtMutexWaiter { tid, notifiable });
258 std::mem::drop(state);
259
260 current_task
264 .get_task(new_owner_tid as i32)
265 .ok()
266 .and_then(|o| o.running_state().unwrap().thread.get().map(|t| Arc::clone(&t.thread)))
267 .map_or_else(
268 || error!(ESRCH),
269 |owner| current_task.block_with_owner_until(guard, &owner, deadline),
270 )
271 .inspect_err(|_| {
272 self.state.lock(locked).remove_rt_mutex_waiter_from_queue(key, &event);
276 })
277 }
278
279 pub fn unlock_pi<L>(
283 &self,
284 locked: &mut Locked<L>,
285 current_task: &CurrentTask,
286 addr: UserAddress,
287 ) -> Result<(), Errno>
288 where
289 L: LockBefore<FutexTableStateLock>,
290 {
291 let addr = FutexAddress::try_from(addr)?;
292 let mut state = self.state.lock(locked);
293 let tid = current_task.get_tid() as u32;
294 let mm = current_task.mm()?;
295
296 let key = Key::get(current_task, addr)?;
297
298 let current_value = mm.atomic_load_u32_relaxed(addr)?;
302 if current_value & FUTEX_TID_MASK != tid {
303 return error!(EPERM);
308 }
309
310 loop {
311 let maybe_waiter = state.pop_rt_mutex_waiter(key.clone());
312 let target_value = if let Some(waiter) = &maybe_waiter { waiter.tid } else { 0 };
313
314 match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
317 CompareExchangeResult::Success => (),
318 CompareExchangeResult::Stale { .. } => return error!(EINVAL),
327 CompareExchangeResult::Error(_) => return error!(EACCES),
331 }
332
333 let Some(mut waiter) = maybe_waiter else {
334 break;
336 };
337
338 if waiter.notifiable.notify() {
339 break;
340 }
341
342 }
345
346 Ok(())
347 }
348}
349
350impl FutexTable<SharedFutexKey> {
351 pub fn external_wait<L>(
361 &self,
362 locked: &mut Locked<L>,
363 memory: MemoryObject,
364 offset: u64,
365 value: u32,
366 mask: u32,
367 ) -> Result<(Arc<()>, oneshot::Receiver<()>), Errno>
368 where
369 L: LockBefore<FutexTableStateLock>,
370 {
371 let key = SharedFutexKey::new(&memory, offset);
372 let mut state = self.state.lock(locked);
373 Self::external_check_futex_value(&memory, offset, value)?;
375
376 let token = Arc::new(());
377 let (sender, receiver) = oneshot::channel::<()>();
378 state.get_waiters_or_default(key).add(FutexWaiter {
379 mask,
380 notifiable: FutexNotifiable::new_external(Arc::downgrade(&token), sender),
381 });
382 Ok((token, receiver))
383 }
384
385 pub fn external_wake<L>(
390 &self,
391 locked: &mut Locked<L>,
392 memory: MemoryObject,
393 offset: u64,
394 count: usize,
395 mask: u32,
396 ) -> Result<usize, Errno>
397 where
398 L: LockBefore<FutexTableStateLock>,
399 {
400 Ok(self.state.lock(locked).wake(SharedFutexKey::new(&memory, offset), count, mask))
401 }
402
403 pub fn external_requeue<L>(
404 &self,
405 locked: &mut Locked<L>,
406 first_memory: MemoryObject,
407 first_offset: u64,
408 second_memory: Option<MemoryObject>,
409 second_offset: u64,
410 wake_count: usize,
411 requeue_count: usize,
412 expected_value: Option<u32>,
413 ) -> Result<usize, Errno>
414 where
415 L: LockBefore<FutexTableStateLock>,
416 {
417 let first_key = SharedFutexKey::new(&first_memory, first_offset);
418 let second_key = match second_memory.as_ref() {
419 Some(second_memory) => SharedFutexKey::new(second_memory, second_offset),
420 None => SharedFutexKey::new(&first_memory, second_offset),
421 };
422 let mut state = self.state.lock(locked);
429 if let Some(expected) = expected_value {
430 Self::external_check_futex_value(&first_memory, first_offset, expected)?;
432 }
433 Ok(state.requeue(first_key, second_key, wake_count, requeue_count))
434 }
435
436 fn external_check_futex_value(
437 memory: &MemoryObject,
438 offset: u64,
439 value: u32,
440 ) -> Result<(), Errno> {
441 let loaded_value = {
442 let mut buf = [0u8; 4];
444 memory.read(&mut buf, offset).map_err(|_| errno!(EINVAL))?;
445 u32::from_ne_bytes(buf)
446 };
447 if loaded_value != value {
448 return error!(EAGAIN);
449 }
450 Ok(())
451 }
452}
453
454pub trait FutexKey: Sized + Ord + Hash + Clone {
455 fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno>;
456 fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno>;
457}
458
459#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
460pub struct PrivateFutexKey {
461 addr: FutexAddress,
462}
463
464impl FutexKey for PrivateFutexKey {
465 fn get(_task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
466 Ok(PrivateFutexKey { addr })
467 }
468
469 fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
470 Ok(task.mm()?.futex.clone())
471 }
472}
473
474#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
475pub struct SharedFutexKey {
476 koid: zx::Koid,
479 offset: u64,
480}
481
482impl FutexKey for SharedFutexKey {
483 fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
484 let (memory, offset) = task.mm()?.get_mapping_memory(addr.into(), ProtectionFlags::READ)?;
485 Ok(SharedFutexKey::new(&memory, offset))
486 }
487
488 fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
489 Ok(task.kernel().shared_futexes.clone())
490 }
491}
492
493impl SharedFutexKey {
494 fn new(memory: &MemoryObject, offset: u64) -> Self {
495 Self { koid: memory.get_koid(), offset }
496 }
497}
498
499struct FutexTableState<Key: FutexKey> {
500 waiters: HashMap<Key, FutexWaiters>,
501 rt_mutex_waiters: HashMap<Key, VecDeque<RtMutexWaiter>>,
502}
503
504impl<Key: FutexKey> Default for FutexTableState<Key> {
505 fn default() -> Self {
506 Self { waiters: Default::default(), rt_mutex_waiters: Default::default() }
507 }
508}
509
510impl<Key: FutexKey> FutexTableState<Key> {
511 fn get_waiters_or_default(&mut self, key: Key) -> &mut FutexWaiters {
513 self.waiters.entry(key).or_default()
514 }
515
516 fn wake(&mut self, key: Key, count: usize, mask: u32) -> usize {
517 let entry = self.waiters.entry(key);
518 match entry {
519 Entry::Vacant(_) => 0,
520 Entry::Occupied(mut entry) => {
521 let count = entry.get_mut().notify(mask, count);
522 if entry.get().is_empty() {
523 entry.remove();
524 }
525 count
526 }
527 }
528 }
529
530 fn requeue(
531 &mut self,
532 key: Key,
533 new_key: Key,
534 wake_count: usize,
535 requeue_count: usize,
536 ) -> usize {
537 let woken;
538 let to_requeue;
539 match self.waiters.entry(key) {
540 Entry::Vacant(_) => return 0,
541 Entry::Occupied(mut entry) => {
542 woken = entry.get_mut().notify(FUTEX_BITSET_MATCH_ANY, wake_count);
544
545 to_requeue = entry.get_mut().split_for_requeue(requeue_count);
547
548 if entry.get().is_empty() {
549 entry.remove();
550 }
551 }
552 }
553
554 let requeued = to_requeue.0.len();
555 if !to_requeue.is_empty() {
556 self.get_waiters_or_default(new_key).transfer(to_requeue);
557 }
558
559 woken + requeued
560 }
561
562 fn get_rt_mutex_waiters_or_default(&mut self, key: Key) -> &mut VecDeque<RtMutexWaiter> {
565 self.rt_mutex_waiters.entry(key).or_default()
566 }
567
568 fn pop_rt_mutex_waiter(&mut self, key: Key) -> Option<RtMutexWaiter> {
570 let entry = self.rt_mutex_waiters.entry(key);
571 match entry {
572 Entry::Vacant(_) => None,
573 Entry::Occupied(mut entry) => {
574 let mut waiter = entry.get_mut().pop_front();
575 if entry.get().is_empty() {
579 entry.remove();
580 } else if let Some(waiter) = &mut waiter {
581 waiter.tid |= FUTEX_WAITERS;
582 }
583 waiter
584 }
585 }
586 }
587
588 fn remove_waiter_from_queue(&mut self, key: Key, event: &Arc<InterruptibleEvent>) {
594 if let Entry::Occupied(mut entry) = self.waiters.entry(key) {
595 if entry.get_mut().remove_waiter(event) {
596 if entry.get().is_empty() {
597 entry.remove();
598 }
599 return;
600 }
601 }
602
603 let mut key_to_remove = None;
604 for (key, waiters) in self.waiters.iter_mut() {
605 if waiters.remove_waiter(event) {
606 if waiters.is_empty() {
607 key_to_remove = Some(key.clone());
608 }
609 break;
610 }
611 }
612 if let Some(key) = key_to_remove {
613 self.waiters.remove(&key);
614 }
615 }
616
617 fn remove_boot_waiter_from_queue(&mut self, key: Key, waiter: &Arc<Waiter>) {
622 if let Entry::Occupied(mut entry) = self.waiters.entry(key) {
623 if entry.get_mut().remove_boot_waiter(waiter) {
624 if entry.get().is_empty() {
625 entry.remove();
626 }
627 return;
628 }
629 }
630
631 let mut key_to_remove = None;
632 for (key, waiters) in self.waiters.iter_mut() {
633 if waiters.remove_boot_waiter(waiter) {
634 if waiters.is_empty() {
635 key_to_remove = Some(key.clone());
636 }
637 break;
638 }
639 }
640 if let Some(key) = key_to_remove {
641 self.waiters.remove(&key);
642 }
643 }
644
645 fn remove_rt_mutex_waiter_from_queue(&mut self, key: Key, event: &Arc<InterruptibleEvent>) {
651 let predicate =
652 |w: &RtMutexWaiter| !w.notifiable.matches_event(event) && !w.notifiable.is_stale();
653
654 if let Entry::Occupied(mut entry) = self.rt_mutex_waiters.entry(key) {
655 let len_before = entry.get().len();
656 entry.get_mut().retain(predicate);
657 if entry.get().len() < len_before {
658 if entry.get().is_empty() {
659 entry.remove();
660 }
661 return;
662 }
663 }
664
665 let mut key_to_remove = None;
666 for (key, waiters) in self.rt_mutex_waiters.iter_mut() {
667 let len_before = waiters.len();
668 waiters.retain(predicate);
669 if waiters.len() < len_before {
670 if waiters.is_empty() {
671 key_to_remove = Some(key.clone());
672 }
673 break;
674 }
675 }
676 if let Some(key) = key_to_remove {
677 self.rt_mutex_waiters.remove(&key);
678 }
679 }
680}
681
682enum FutexNotifiable {
684 Internal(Weak<InterruptibleEvent>),
686 InternalBoot(Weak<Waiter>),
688 External(Weak<()>, Option<oneshot::Sender<()>>),
692}
693
694impl FutexNotifiable {
695 fn new_internal(event: Weak<InterruptibleEvent>) -> Self {
696 Self::Internal(event)
697 }
698
699 fn new_internal_boot(waiter: Weak<Waiter>) -> Self {
700 Self::InternalBoot(waiter)
701 }
702
703 fn new_external(token: Weak<()>, sender: oneshot::Sender<()>) -> Self {
704 Self::External(token, Some(sender))
705 }
706
707 fn notify(&mut self) -> bool {
710 match self {
711 Self::Internal(event) => {
712 if let Some(event) = event.upgrade() {
713 event.notify();
714 true
715 } else {
716 false
717 }
718 }
719 Self::InternalBoot(waiter) => {
720 if let Some(waiter) = waiter.upgrade() {
721 waiter.notify();
722 true
723 } else {
724 false
725 }
726 }
727 Self::External(_, sender) => {
728 if let Some(sender) = sender.take() {
729 sender.send(()).is_ok()
730 } else {
731 false
732 }
733 }
734 }
735 }
736
737 fn matches_event(&self, event: &Arc<InterruptibleEvent>) -> bool {
738 match self {
739 Self::Internal(weak) => {
740 if let Some(strong) = weak.upgrade() {
741 Arc::ptr_eq(&strong, event)
742 } else {
743 false
744 }
745 }
746 _ => false,
747 }
748 }
749
750 fn matches_waiter(&self, waiter: &Arc<Waiter>) -> bool {
751 match self {
752 Self::InternalBoot(weak) => {
753 if let Some(strong) = weak.upgrade() {
754 Arc::ptr_eq(&strong, waiter)
755 } else {
756 false
757 }
758 }
759 _ => false,
760 }
761 }
762
763 fn is_stale(&self) -> bool {
764 match self {
765 Self::Internal(weak) => weak.strong_count() == 0,
766 Self::External(weak, _) => weak.strong_count() == 0,
767 Self::InternalBoot(weak) => weak.strong_count() == 0,
768 }
769 }
770}
771
772struct FutexWaiter {
773 mask: u32,
774 notifiable: FutexNotifiable,
775}
776
777#[derive(Default)]
778struct FutexWaiters(VecDeque<FutexWaiter>);
779
780impl FutexWaiters {
781 fn add(&mut self, waiter: FutexWaiter) {
782 self.0.push_back(waiter);
783 }
784
785 fn notify(&mut self, mask: u32, count: usize) -> usize {
786 let mut woken = 0;
787 self.0.retain_mut(|waiter| {
788 if woken == count || waiter.mask & mask == 0 {
789 return true;
790 }
791 if waiter.notifiable.notify() {
794 woken += 1;
795 }
796 false
797 });
798 woken
799 }
800
801 fn transfer(&mut self, mut other: Self) {
802 self.0.append(&mut other.0);
803 }
804
805 fn is_empty(&self) -> bool {
806 self.0.is_empty()
807 }
808
809 fn remove_waiter(&mut self, event: &Arc<InterruptibleEvent>) -> bool {
810 let initial_len = self.0.len();
811 self.0.retain(|w| !w.notifiable.matches_event(event) && !w.notifiable.is_stale());
812 self.0.len() < initial_len
813 }
814
815 fn remove_boot_waiter(&mut self, waiter: &Arc<Waiter>) -> bool {
816 let initial_len = self.0.len();
817 self.0.retain(|w| !w.notifiable.matches_waiter(waiter) && !w.notifiable.is_stale());
818 self.0.len() < initial_len
819 }
820
821 fn split_for_requeue(&mut self, count: usize) -> Self {
822 let count = std::cmp::min(count, self.0.len());
823 let tail = self.0.split_off(count);
824 let head = std::mem::replace(&mut self.0, tail);
825 FutexWaiters(head)
826 }
827}
828
829struct RtMutexWaiter {
830 tid: u32,
832
833 notifiable: FutexNotifiable,
834}
835
836#[cfg(test)]
837mod tests {
838 use super::*;
839 use starnix_sync::InterruptibleEvent;
840 use starnix_uapi::restricted_aspace::RESTRICTED_ASPACE_BASE;
841 use starnix_uapi::user_address::UserAddress;
842
843 #[fuchsia::test]
844 fn test_remove_waiter_simple() {
845 let mut state = FutexTableState::<PrivateFutexKey>::default();
846 let key = PrivateFutexKey {
847 addr: FutexAddress::try_from(UserAddress::from(
848 (RESTRICTED_ASPACE_BASE + 0x1000) as u64,
849 ))
850 .unwrap(),
851 };
852 let event = Arc::new(InterruptibleEvent::new());
853
854 state.get_waiters_or_default(key.clone()).add(FutexWaiter {
855 mask: u32::MAX,
856 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
857 });
858
859 assert_eq!(state.waiters.len(), 1);
860 state.remove_waiter_from_queue(key, &event);
861 assert_eq!(state.waiters.len(), 0);
862 }
863
864 #[fuchsia::test]
865 fn test_remove_waiter_requeued() {
866 let mut state = FutexTableState::<PrivateFutexKey>::default();
867 let key1 = PrivateFutexKey {
868 addr: FutexAddress::try_from(UserAddress::from(
869 (RESTRICTED_ASPACE_BASE + 0x1000) as u64,
870 ))
871 .unwrap(),
872 };
873 let key2 = PrivateFutexKey {
874 addr: FutexAddress::try_from(UserAddress::from(
875 (RESTRICTED_ASPACE_BASE + 0x2000) as u64,
876 ))
877 .unwrap(),
878 };
879 let event = Arc::new(InterruptibleEvent::new());
880
881 state.get_waiters_or_default(key2.clone()).add(FutexWaiter {
882 mask: u32::MAX,
883 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
884 });
885
886 assert_eq!(state.waiters.len(), 1);
887 state.remove_waiter_from_queue(key1, &event);
888 assert_eq!(state.waiters.len(), 0);
889 }
890
891 #[fuchsia::test]
892 fn test_remove_rt_mutex_waiter() {
893 let mut state = FutexTableState::<PrivateFutexKey>::default();
894 let key = PrivateFutexKey {
895 addr: FutexAddress::try_from(UserAddress::from(
896 (RESTRICTED_ASPACE_BASE + 0x1000) as u64,
897 ))
898 .unwrap(),
899 };
900 let event = Arc::new(InterruptibleEvent::new());
901
902 state.get_rt_mutex_waiters_or_default(key.clone()).push_back(RtMutexWaiter {
903 tid: 1,
904 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
905 });
906
907 assert_eq!(state.rt_mutex_waiters.len(), 1);
908 state.remove_rt_mutex_waiter_from_queue(key, &event);
909 assert_eq!(state.rt_mutex_waiters.len(), 0);
910 }
911
912 #[fuchsia::test]
913 fn test_split_for_requeue_fairness() {
914 let mut waiters = FutexWaiters::default();
915 let e1 = Arc::new(InterruptibleEvent::new());
916 let e2 = Arc::new(InterruptibleEvent::new());
917 let e3 = Arc::new(InterruptibleEvent::new());
918
919 waiters.add(FutexWaiter {
920 mask: 1,
921 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&e1)),
922 });
923 waiters.add(FutexWaiter {
924 mask: 2,
925 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&e2)),
926 });
927 waiters.add(FutexWaiter {
928 mask: 3,
929 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&e3)),
930 });
931
932 let split = waiters.split_for_requeue(2);
933
934 assert_eq!(split.0.len(), 2);
935 assert_eq!(split.0[0].mask, 1);
936 assert_eq!(split.0[1].mask, 2);
937
938 assert_eq!(waiters.0.len(), 1);
939 assert_eq!(waiters.0[0].mask, 3);
940 }
941
942 #[fuchsia::test]
943 fn test_stale_external_waiter_cleanup() {
944 let mut state = FutexTableState::<PrivateFutexKey>::default();
945 let key = PrivateFutexKey {
946 addr: FutexAddress::try_from(UserAddress::from(
947 (RESTRICTED_ASPACE_BASE + 0x1000) as u64,
948 ))
949 .unwrap(),
950 };
951
952 {
953 let token = Arc::new(());
954 let (sender, _receiver) = oneshot::channel::<()>();
955 state.get_waiters_or_default(key.clone()).add(FutexWaiter {
956 mask: u32::MAX,
957 notifiable: FutexNotifiable::new_external(Arc::downgrade(&token), sender),
958 });
959 } assert_eq!(state.waiters.len(), 1);
962
963 let dummy_event = Arc::new(InterruptibleEvent::new());
965 state.remove_waiter_from_queue(key, &dummy_event);
966
967 assert_eq!(state.waiters.len(), 0, "Stale external waiter should be removed");
968 }
969}