1use crate::mm::memory::MemoryObject;
6use crate::mm::{CompareExchangeResult, ProtectionFlags};
7use crate::task::{CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, Task, Waiter};
8use futures::channel::oneshot;
9use starnix_sync::{InterruptibleEvent, LockBefore, Locked, OrderedMutex, TerminalLock, Unlocked};
10use starnix_types::futex_address::FutexAddress;
11use starnix_uapi::errors::Errno;
12use starnix_uapi::user_address::UserAddress;
13use starnix_uapi::{FUTEX_BITSET_MATCH_ANY, FUTEX_TID_MASK, FUTEX_WAITERS, errno, error};
14use std::collections::hash_map::Entry;
15use std::collections::{HashMap, VecDeque};
16use std::hash::Hash;
17use std::sync::{Arc, Weak};
18
19pub struct FutexTable<Key: FutexKey> {
25 state: OrderedMutex<FutexTableState<Key>, TerminalLock>,
29}
30
31impl<Key: FutexKey> Default for FutexTable<Key> {
32 fn default() -> Self {
33 Self { state: OrderedMutex::new(FutexTableState::default()) }
34 }
35}
36
37impl<Key: FutexKey> FutexTable<Key> {
38 pub fn wait_boot(
42 &self,
43 locked: &mut Locked<Unlocked>,
44 current_task: &CurrentTask,
45 addr: UserAddress,
46 value: u32,
47 mask: u32,
48 deadline: zx::BootInstant,
49 timer_slack: zx::BootDuration,
50 ) -> Result<(), Errno> {
51 let addr = FutexAddress::try_from(addr)?;
52 let mut state = self.state.lock(locked);
53 let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
58 if value != loaded_value {
59 return error!(EAGAIN);
60 }
61
62 let key = Key::get(current_task, addr)?;
63 let waiter = Arc::new(Waiter::new());
64 let timer = zx::BootTimer::create();
65 let signal_handler = SignalHandler {
66 inner: SignalHandlerInner::None,
67 event_handler: EventHandler::None,
68 err_code: Some(errno!(ETIMEDOUT)),
69 };
70 waiter
71 .wake_on_zircon_signals(&timer, zx::Signals::TIMER_SIGNALED, signal_handler)
72 .expect("wait can only fail in OOM conditions");
73 timer
74 .set(deadline, timer_slack)
75 .expect("timer set cannot fail with valid handles and slack");
76 state.get_waiters_or_default(key.clone()).add(FutexWaiter {
77 mask,
78 notifiable: FutexNotifiable::new_internal_boot(Arc::downgrade(&waiter)),
79 });
80 std::mem::drop(state);
81 waiter.wait(locked, current_task).inspect_err(|_| {
82 self.state.lock(locked).remove_boot_waiter_from_queue(key, &waiter);
86 })
87 }
88
89 pub fn wait<L>(
93 &self,
94 locked: &mut Locked<L>,
95 current_task: &CurrentTask,
96 addr: UserAddress,
97 value: u32,
98 mask: u32,
99 deadline: zx::MonotonicInstant,
100 ) -> Result<(), Errno>
101 where
102 L: LockBefore<TerminalLock>,
103 {
104 let addr = FutexAddress::try_from(addr)?;
105 let mut state = self.state.lock(locked);
106 let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
111 if value != loaded_value {
112 return error!(EAGAIN);
113 }
114
115 let key = Key::get(current_task, addr)?;
116 let event = InterruptibleEvent::new();
117 let guard = event.begin_wait();
118 state.get_waiters_or_default(key.clone()).add(FutexWaiter {
119 mask,
120 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
121 });
122 std::mem::drop(state);
123
124 current_task.block_until(guard, deadline).inspect_err(|_| {
125 self.state.lock(locked).remove_waiter_from_queue(key, &event);
129 })
130 }
131
132 pub fn wake<L>(
137 &self,
138 locked: &mut Locked<L>,
139 task: &Task,
140 addr: UserAddress,
141 count: usize,
142 mask: u32,
143 ) -> Result<usize, Errno>
144 where
145 L: LockBefore<TerminalLock>,
146 {
147 let addr = FutexAddress::try_from(addr)?;
148 let key = Key::get(task, addr)?;
149 Ok(self.state.lock(locked).wake(key, count, mask))
150 }
151
152 pub fn requeue<L>(
156 &self,
157 locked: &mut Locked<L>,
158 current_task: &CurrentTask,
159 addr: UserAddress,
160 wake_count: usize,
161 requeue_count: usize,
162 new_addr: UserAddress,
163 expected_value: Option<u32>,
164 ) -> Result<usize, Errno>
165 where
166 L: LockBefore<TerminalLock>,
167 {
168 let addr = FutexAddress::try_from(addr)?;
169 let new_addr = FutexAddress::try_from(new_addr)?;
170 let key = Key::get(current_task, addr)?;
171 let new_key = Key::get(current_task, new_addr)?;
172 let mut state = self.state.lock(locked);
173 if let Some(expected) = expected_value {
174 let value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
177 if value != expected {
178 return error!(EAGAIN);
179 }
180 }
181
182 let woken;
183 let to_requeue;
184 match state.waiters.entry(key) {
185 Entry::Vacant(_) => return Ok(0),
186 Entry::Occupied(mut entry) => {
187 woken = entry.get_mut().notify(FUTEX_BITSET_MATCH_ANY, wake_count);
189
190 to_requeue = entry.get_mut().split_for_requeue(requeue_count);
192
193 if entry.get().is_empty() {
194 entry.remove();
195 }
196 }
197 }
198
199 let requeued = to_requeue.0.len();
200 if !to_requeue.is_empty() {
201 state.get_waiters_or_default(new_key).transfer(to_requeue);
202 }
203
204 Ok(woken + requeued)
205 }
206
207 pub fn lock_pi<L>(
211 &self,
212 locked: &mut Locked<L>,
213 current_task: &CurrentTask,
214 addr: UserAddress,
215 deadline: zx::MonotonicInstant,
216 ) -> Result<(), Errno>
217 where
218 L: LockBefore<TerminalLock>,
219 {
220 let addr = FutexAddress::try_from(addr)?;
221 let mut state = self.state.lock(locked);
222 let key = Key::get(current_task, addr)?;
225
226 let tid = current_task.get_tid() as u32;
227 let mm = current_task.mm()?;
228
229 let mut current_value = mm.atomic_load_u32_relaxed(addr)?;
233 let new_owner_tid = loop {
234 let new_owner_tid = current_value & FUTEX_TID_MASK;
235 if new_owner_tid == tid {
236 return error!(EDEADLOCK);
243 }
244
245 if current_value == 0 {
246 match mm.atomic_compare_exchange_weak_u32_acq_rel(addr, current_value, tid) {
249 CompareExchangeResult::Success => return Ok(()),
250 CompareExchangeResult::Stale { observed } => {
251 current_value = observed;
252 continue;
253 }
254 CompareExchangeResult::Error(e) => return Err(e),
255 }
256 }
257
258 let target_value = current_value | FUTEX_WAITERS;
261 match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
262 CompareExchangeResult::Success => (),
263 CompareExchangeResult::Stale { observed } => {
264 current_value = observed;
265 continue;
266 }
267 CompareExchangeResult::Error(e) => return Err(e),
268 }
269 break new_owner_tid;
270 };
271
272 let event = InterruptibleEvent::new();
273 let guard = event.begin_wait();
274 let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event));
275 state
276 .get_rt_mutex_waiters_or_default(key.clone())
277 .push_back(RtMutexWaiter { tid, notifiable });
278 std::mem::drop(state);
279
280 current_task
284 .get_task(new_owner_tid as i32)
285 .upgrade()
286 .and_then(|o| o.thread.read().as_ref().map(Arc::clone))
287 .map_or_else(
288 || error!(ESRCH),
289 |owner| current_task.block_with_owner_until(guard, &owner, deadline),
290 )
291 .inspect_err(|_| {
292 self.state.lock(locked).remove_rt_mutex_waiter_from_queue(key, &event);
296 })
297 }
298
299 pub fn unlock_pi<L>(
303 &self,
304 locked: &mut Locked<L>,
305 current_task: &CurrentTask,
306 addr: UserAddress,
307 ) -> Result<(), Errno>
308 where
309 L: LockBefore<TerminalLock>,
310 {
311 let addr = FutexAddress::try_from(addr)?;
312 let mut state = self.state.lock(locked);
313 let tid = current_task.get_tid() as u32;
314 let mm = current_task.mm()?;
315
316 let key = Key::get(current_task, addr)?;
317
318 let current_value = mm.atomic_load_u32_relaxed(addr)?;
322 if current_value & FUTEX_TID_MASK != tid {
323 return error!(EPERM);
328 }
329
330 loop {
331 let maybe_waiter = state.pop_rt_mutex_waiter(key.clone());
332 let target_value = if let Some(waiter) = &maybe_waiter { waiter.tid } else { 0 };
333
334 match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
337 CompareExchangeResult::Success => (),
338 CompareExchangeResult::Stale { .. } => return error!(EINVAL),
347 CompareExchangeResult::Error(_) => return error!(EACCES),
351 }
352
353 let Some(mut waiter) = maybe_waiter else {
354 break;
356 };
357
358 if waiter.notifiable.notify() {
359 break;
360 }
361
362 }
365
366 Ok(())
367 }
368}
369
370impl FutexTable<SharedFutexKey> {
371 pub fn external_wait<L>(
381 &self,
382 locked: &mut Locked<L>,
383 memory: MemoryObject,
384 offset: u64,
385 value: u32,
386 mask: u32,
387 ) -> Result<(Arc<()>, oneshot::Receiver<()>), Errno>
388 where
389 L: LockBefore<TerminalLock>,
390 {
391 let key = SharedFutexKey::new(&memory, offset);
392 let mut state = self.state.lock(locked);
393 Self::external_check_futex_value(&memory, offset, value)?;
395
396 let token = Arc::new(());
397 let (sender, receiver) = oneshot::channel::<()>();
398 state.get_waiters_or_default(key).add(FutexWaiter {
399 mask,
400 notifiable: FutexNotifiable::new_external(Arc::downgrade(&token), sender),
401 });
402 Ok((token, receiver))
403 }
404
405 pub fn external_wake<L>(
410 &self,
411 locked: &mut Locked<L>,
412 memory: MemoryObject,
413 offset: u64,
414 count: usize,
415 mask: u32,
416 ) -> Result<usize, Errno>
417 where
418 L: LockBefore<TerminalLock>,
419 {
420 Ok(self.state.lock(locked).wake(SharedFutexKey::new(&memory, offset), count, mask))
421 }
422
423 fn external_check_futex_value(
424 memory: &MemoryObject,
425 offset: u64,
426 value: u32,
427 ) -> Result<(), Errno> {
428 let loaded_value = {
429 let mut buf = [0u8; 4];
431 memory.read(&mut buf, offset).map_err(|_| errno!(EINVAL))?;
432 u32::from_ne_bytes(buf)
433 };
434 if loaded_value != value {
435 return error!(EAGAIN);
436 }
437 Ok(())
438 }
439}
440
441pub trait FutexKey: Sized + Ord + Hash + Clone {
442 fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno>;
443 fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno>;
444}
445
446#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
447pub struct PrivateFutexKey {
448 addr: FutexAddress,
449}
450
451impl FutexKey for PrivateFutexKey {
452 fn get(_task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
453 Ok(PrivateFutexKey { addr })
454 }
455
456 fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
457 Ok(task.mm()?.futex.clone())
458 }
459}
460
461#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
462pub struct SharedFutexKey {
463 koid: zx::Koid,
466 offset: u64,
467}
468
469impl FutexKey for SharedFutexKey {
470 fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
471 let (memory, offset) = task.mm()?.get_mapping_memory(addr.into(), ProtectionFlags::READ)?;
472 Ok(SharedFutexKey::new(&memory, offset))
473 }
474
475 fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
476 Ok(task.kernel().shared_futexes.clone())
477 }
478}
479
480impl SharedFutexKey {
481 fn new(memory: &MemoryObject, offset: u64) -> Self {
482 Self { koid: memory.get_koid(), offset }
483 }
484}
485
486struct FutexTableState<Key: FutexKey> {
487 waiters: HashMap<Key, FutexWaiters>,
488 rt_mutex_waiters: HashMap<Key, VecDeque<RtMutexWaiter>>,
489}
490
491impl<Key: FutexKey> Default for FutexTableState<Key> {
492 fn default() -> Self {
493 Self { waiters: Default::default(), rt_mutex_waiters: Default::default() }
494 }
495}
496
497impl<Key: FutexKey> FutexTableState<Key> {
498 fn get_waiters_or_default(&mut self, key: Key) -> &mut FutexWaiters {
500 self.waiters.entry(key).or_default()
501 }
502
503 fn wake(&mut self, key: Key, count: usize, mask: u32) -> usize {
504 let entry = self.waiters.entry(key);
505 match entry {
506 Entry::Vacant(_) => 0,
507 Entry::Occupied(mut entry) => {
508 let count = entry.get_mut().notify(mask, count);
509 if entry.get().is_empty() {
510 entry.remove();
511 }
512 count
513 }
514 }
515 }
516
517 fn get_rt_mutex_waiters_or_default(&mut self, key: Key) -> &mut VecDeque<RtMutexWaiter> {
520 self.rt_mutex_waiters.entry(key).or_default()
521 }
522
523 fn pop_rt_mutex_waiter(&mut self, key: Key) -> Option<RtMutexWaiter> {
525 let entry = self.rt_mutex_waiters.entry(key);
526 match entry {
527 Entry::Vacant(_) => None,
528 Entry::Occupied(mut entry) => {
529 let mut waiter = entry.get_mut().pop_front();
530 if entry.get().is_empty() {
534 entry.remove();
535 } else if let Some(waiter) = &mut waiter {
536 waiter.tid |= FUTEX_WAITERS;
537 }
538 waiter
539 }
540 }
541 }
542
543 fn remove_waiter_from_queue(&mut self, key: Key, event: &Arc<InterruptibleEvent>) {
549 if let Entry::Occupied(mut entry) = self.waiters.entry(key) {
550 if entry.get_mut().remove_waiter(event) {
551 if entry.get().is_empty() {
552 entry.remove();
553 }
554 return;
555 }
556 }
557
558 let mut key_to_remove = None;
559 for (key, waiters) in self.waiters.iter_mut() {
560 if waiters.remove_waiter(event) {
561 if waiters.is_empty() {
562 key_to_remove = Some(key.clone());
563 }
564 break;
565 }
566 }
567 if let Some(key) = key_to_remove {
568 self.waiters.remove(&key);
569 }
570 }
571
572 fn remove_boot_waiter_from_queue(&mut self, key: Key, waiter: &Arc<Waiter>) {
577 if let Entry::Occupied(mut entry) = self.waiters.entry(key) {
578 if entry.get_mut().remove_boot_waiter(waiter) {
579 if entry.get().is_empty() {
580 entry.remove();
581 }
582 return;
583 }
584 }
585
586 let mut key_to_remove = None;
587 for (key, waiters) in self.waiters.iter_mut() {
588 if waiters.remove_boot_waiter(waiter) {
589 if waiters.is_empty() {
590 key_to_remove = Some(key.clone());
591 }
592 break;
593 }
594 }
595 if let Some(key) = key_to_remove {
596 self.waiters.remove(&key);
597 }
598 }
599
600 fn remove_rt_mutex_waiter_from_queue(&mut self, key: Key, event: &Arc<InterruptibleEvent>) {
606 let predicate =
607 |w: &RtMutexWaiter| !w.notifiable.matches_event(event) && !w.notifiable.is_stale();
608
609 if let Entry::Occupied(mut entry) = self.rt_mutex_waiters.entry(key) {
610 let len_before = entry.get().len();
611 entry.get_mut().retain(predicate);
612 if entry.get().len() < len_before {
613 if entry.get().is_empty() {
614 entry.remove();
615 }
616 return;
617 }
618 }
619
620 let mut key_to_remove = None;
621 for (key, waiters) in self.rt_mutex_waiters.iter_mut() {
622 let len_before = waiters.len();
623 waiters.retain(predicate);
624 if waiters.len() < len_before {
625 if waiters.is_empty() {
626 key_to_remove = Some(key.clone());
627 }
628 break;
629 }
630 }
631 if let Some(key) = key_to_remove {
632 self.rt_mutex_waiters.remove(&key);
633 }
634 }
635}
636
637enum FutexNotifiable {
639 Internal(Weak<InterruptibleEvent>),
641 InternalBoot(Weak<Waiter>),
643 External(Weak<()>, Option<oneshot::Sender<()>>),
647}
648
649impl FutexNotifiable {
650 fn new_internal(event: Weak<InterruptibleEvent>) -> Self {
651 Self::Internal(event)
652 }
653
654 fn new_internal_boot(waiter: Weak<Waiter>) -> Self {
655 Self::InternalBoot(waiter)
656 }
657
658 fn new_external(token: Weak<()>, sender: oneshot::Sender<()>) -> Self {
659 Self::External(token, Some(sender))
660 }
661
662 fn notify(&mut self) -> bool {
665 match self {
666 Self::Internal(event) => {
667 if let Some(event) = event.upgrade() {
668 event.notify();
669 true
670 } else {
671 false
672 }
673 }
674 Self::InternalBoot(waiter) => {
675 if let Some(waiter) = waiter.upgrade() {
676 waiter.notify();
677 true
678 } else {
679 false
680 }
681 }
682 Self::External(_, sender) => {
683 if let Some(sender) = sender.take() {
684 sender.send(()).is_ok()
685 } else {
686 false
687 }
688 }
689 }
690 }
691
692 fn matches_event(&self, event: &Arc<InterruptibleEvent>) -> bool {
693 match self {
694 Self::Internal(weak) => {
695 if let Some(strong) = weak.upgrade() {
696 Arc::ptr_eq(&strong, event)
697 } else {
698 false
699 }
700 }
701 _ => false,
702 }
703 }
704
705 fn matches_waiter(&self, waiter: &Arc<Waiter>) -> bool {
706 match self {
707 Self::InternalBoot(weak) => {
708 if let Some(strong) = weak.upgrade() {
709 Arc::ptr_eq(&strong, waiter)
710 } else {
711 false
712 }
713 }
714 _ => false,
715 }
716 }
717
718 fn is_stale(&self) -> bool {
719 match self {
720 Self::Internal(weak) => weak.strong_count() == 0,
721 Self::External(weak, _) => weak.strong_count() == 0,
722 Self::InternalBoot(weak) => weak.strong_count() == 0,
723 }
724 }
725}
726
727struct FutexWaiter {
728 mask: u32,
729 notifiable: FutexNotifiable,
730}
731
732#[derive(Default)]
733struct FutexWaiters(VecDeque<FutexWaiter>);
734
735impl FutexWaiters {
736 fn add(&mut self, waiter: FutexWaiter) {
737 self.0.push_back(waiter);
738 }
739
740 fn notify(&mut self, mask: u32, count: usize) -> usize {
741 let mut woken = 0;
742 self.0.retain_mut(|waiter| {
743 if woken == count || waiter.mask & mask == 0 {
744 return true;
745 }
746 if waiter.notifiable.notify() {
749 woken += 1;
750 }
751 false
752 });
753 woken
754 }
755
756 fn transfer(&mut self, mut other: Self) {
757 self.0.append(&mut other.0);
758 }
759
760 fn is_empty(&self) -> bool {
761 self.0.is_empty()
762 }
763
764 fn remove_waiter(&mut self, event: &Arc<InterruptibleEvent>) -> bool {
765 let initial_len = self.0.len();
766 self.0.retain(|w| !w.notifiable.matches_event(event) && !w.notifiable.is_stale());
767 self.0.len() < initial_len
768 }
769
770 fn remove_boot_waiter(&mut self, waiter: &Arc<Waiter>) -> bool {
771 let initial_len = self.0.len();
772 self.0.retain(|w| !w.notifiable.matches_waiter(waiter) && !w.notifiable.is_stale());
773 self.0.len() < initial_len
774 }
775
776 fn split_for_requeue(&mut self, count: usize) -> Self {
777 let count = std::cmp::min(count, self.0.len());
778 let tail = self.0.split_off(count);
779 let head = std::mem::replace(&mut self.0, tail);
780 FutexWaiters(head)
781 }
782}
783
784struct RtMutexWaiter {
785 tid: u32,
787
788 notifiable: FutexNotifiable,
789}
790
791#[cfg(test)]
792mod tests {
793 use super::*;
794 use starnix_sync::InterruptibleEvent;
795 use starnix_uapi::restricted_aspace::RESTRICTED_ASPACE_BASE;
796 use starnix_uapi::user_address::UserAddress;
797
798 #[fuchsia::test]
799 fn test_remove_waiter_simple() {
800 let mut state = FutexTableState::<PrivateFutexKey>::default();
801 let key = PrivateFutexKey {
802 addr: FutexAddress::try_from(UserAddress::from(
803 (RESTRICTED_ASPACE_BASE + 0x1000) as u64,
804 ))
805 .unwrap(),
806 };
807 let event = Arc::new(InterruptibleEvent::new());
808
809 state.get_waiters_or_default(key.clone()).add(FutexWaiter {
810 mask: u32::MAX,
811 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
812 });
813
814 assert_eq!(state.waiters.len(), 1);
815 state.remove_waiter_from_queue(key, &event);
816 assert_eq!(state.waiters.len(), 0);
817 }
818
819 #[fuchsia::test]
820 fn test_remove_waiter_requeued() {
821 let mut state = FutexTableState::<PrivateFutexKey>::default();
822 let key1 = PrivateFutexKey {
823 addr: FutexAddress::try_from(UserAddress::from(
824 (RESTRICTED_ASPACE_BASE + 0x1000) as u64,
825 ))
826 .unwrap(),
827 };
828 let key2 = PrivateFutexKey {
829 addr: FutexAddress::try_from(UserAddress::from(
830 (RESTRICTED_ASPACE_BASE + 0x2000) as u64,
831 ))
832 .unwrap(),
833 };
834 let event = Arc::new(InterruptibleEvent::new());
835
836 state.get_waiters_or_default(key2.clone()).add(FutexWaiter {
837 mask: u32::MAX,
838 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
839 });
840
841 assert_eq!(state.waiters.len(), 1);
842 state.remove_waiter_from_queue(key1, &event);
843 assert_eq!(state.waiters.len(), 0);
844 }
845
846 #[fuchsia::test]
847 fn test_remove_rt_mutex_waiter() {
848 let mut state = FutexTableState::<PrivateFutexKey>::default();
849 let key = PrivateFutexKey {
850 addr: FutexAddress::try_from(UserAddress::from(
851 (RESTRICTED_ASPACE_BASE + 0x1000) as u64,
852 ))
853 .unwrap(),
854 };
855 let event = Arc::new(InterruptibleEvent::new());
856
857 state.get_rt_mutex_waiters_or_default(key.clone()).push_back(RtMutexWaiter {
858 tid: 1,
859 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
860 });
861
862 assert_eq!(state.rt_mutex_waiters.len(), 1);
863 state.remove_rt_mutex_waiter_from_queue(key, &event);
864 assert_eq!(state.rt_mutex_waiters.len(), 0);
865 }
866
867 #[fuchsia::test]
868 fn test_split_for_requeue_fairness() {
869 let mut waiters = FutexWaiters::default();
870 let e1 = Arc::new(InterruptibleEvent::new());
871 let e2 = Arc::new(InterruptibleEvent::new());
872 let e3 = Arc::new(InterruptibleEvent::new());
873
874 waiters.add(FutexWaiter {
875 mask: 1,
876 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&e1)),
877 });
878 waiters.add(FutexWaiter {
879 mask: 2,
880 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&e2)),
881 });
882 waiters.add(FutexWaiter {
883 mask: 3,
884 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&e3)),
885 });
886
887 let split = waiters.split_for_requeue(2);
888
889 assert_eq!(split.0.len(), 2);
890 assert_eq!(split.0[0].mask, 1);
891 assert_eq!(split.0[1].mask, 2);
892
893 assert_eq!(waiters.0.len(), 1);
894 assert_eq!(waiters.0[0].mask, 3);
895 }
896
897 #[fuchsia::test]
898 fn test_stale_external_waiter_cleanup() {
899 let mut state = FutexTableState::<PrivateFutexKey>::default();
900 let key = PrivateFutexKey {
901 addr: FutexAddress::try_from(UserAddress::from(
902 (RESTRICTED_ASPACE_BASE + 0x1000) as u64,
903 ))
904 .unwrap(),
905 };
906
907 {
908 let token = Arc::new(());
909 let (sender, _receiver) = oneshot::channel::<()>();
910 state.get_waiters_or_default(key.clone()).add(FutexWaiter {
911 mask: u32::MAX,
912 notifiable: FutexNotifiable::new_external(Arc::downgrade(&token), sender),
913 });
914 } assert_eq!(state.waiters.len(), 1);
917
918 let dummy_event = Arc::new(InterruptibleEvent::new());
920 state.remove_waiter_from_queue(key, &dummy_event);
921
922 assert_eq!(state.waiters.len(), 0, "Stale external waiter should be removed");
923 }
924}