1use crate::mm::memory::MemoryObject;
6use crate::mm::{CompareExchangeResult, ProtectionFlags};
7use crate::task::{CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, Task, Waiter};
8use futures::channel::oneshot;
9use starnix_sync::{InterruptibleEvent, LockBefore, Locked, OrderedMutex, TerminalLock, Unlocked};
10use starnix_types::futex_address::FutexAddress;
11use starnix_uapi::errors::Errno;
12use starnix_uapi::user_address::UserAddress;
13use starnix_uapi::{FUTEX_BITSET_MATCH_ANY, FUTEX_TID_MASK, FUTEX_WAITERS, errno, error};
14use std::collections::hash_map::Entry;
15use std::collections::{HashMap, VecDeque};
16use std::hash::Hash;
17use std::sync::{Arc, Weak};
18
19pub struct FutexTable<Key: FutexKey> {
25 state: OrderedMutex<FutexTableState<Key>, TerminalLock>,
29}
30
31impl<Key: FutexKey> Default for FutexTable<Key> {
32 fn default() -> Self {
33 Self { state: OrderedMutex::new(FutexTableState::default()) }
34 }
35}
36
37impl<Key: FutexKey> FutexTable<Key> {
38 pub fn wait_boot(
42 &self,
43 locked: &mut Locked<Unlocked>,
44 current_task: &CurrentTask,
45 addr: UserAddress,
46 value: u32,
47 mask: u32,
48 deadline: zx::BootInstant,
49 timer_slack: zx::BootDuration,
50 ) -> Result<(), Errno> {
51 let addr = FutexAddress::try_from(addr)?;
52 let mut state = self.state.lock(locked);
53 let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
58 if value != loaded_value {
59 return error!(EAGAIN);
60 }
61
62 let key = Key::get(current_task, addr)?;
63 let waiter = Arc::new(Waiter::new());
64 let timer = zx::BootTimer::create();
65 let signal_handler = SignalHandler {
66 inner: SignalHandlerInner::None,
67 event_handler: EventHandler::None,
68 err_code: Some(errno!(ETIMEDOUT)),
69 };
70 waiter
71 .wake_on_zircon_signals(&timer, zx::Signals::TIMER_SIGNALED, signal_handler)
72 .expect("wait can only fail in OOM conditions");
73 timer
74 .set(deadline, timer_slack)
75 .expect("timer set cannot fail with valid handles and slack");
76 state.get_waiters_or_default(key).add(FutexWaiter {
77 mask,
78 notifiable: FutexNotifiable::new_internal_boot(Arc::downgrade(&waiter)),
79 });
80 std::mem::drop(state);
81 waiter.wait(locked, current_task)
82 }
83
84 pub fn wait<L>(
88 &self,
89 locked: &mut Locked<L>,
90 current_task: &CurrentTask,
91 addr: UserAddress,
92 value: u32,
93 mask: u32,
94 deadline: zx::MonotonicInstant,
95 ) -> Result<(), Errno>
96 where
97 L: LockBefore<TerminalLock>,
98 {
99 let addr = FutexAddress::try_from(addr)?;
100 let mut state = self.state.lock(locked);
101 let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
106 if value != loaded_value {
107 return error!(EAGAIN);
108 }
109
110 let key = Key::get(current_task, addr)?;
111 let event = InterruptibleEvent::new();
112 let guard = event.begin_wait();
113 state.get_waiters_or_default(key).add(FutexWaiter {
114 mask,
115 notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
116 });
117 std::mem::drop(state);
118
119 current_task.block_until(guard, deadline)
120 }
121
122 pub fn wake<L>(
127 &self,
128 locked: &mut Locked<L>,
129 task: &Task,
130 addr: UserAddress,
131 count: usize,
132 mask: u32,
133 ) -> Result<usize, Errno>
134 where
135 L: LockBefore<TerminalLock>,
136 {
137 let addr = FutexAddress::try_from(addr)?;
138 let key = Key::get(task, addr)?;
139 Ok(self.state.lock(locked).wake(key, count, mask))
140 }
141
142 pub fn requeue<L>(
146 &self,
147 locked: &mut Locked<L>,
148 current_task: &CurrentTask,
149 addr: UserAddress,
150 wake_count: usize,
151 requeue_count: usize,
152 new_addr: UserAddress,
153 expected_value: Option<u32>,
154 ) -> Result<usize, Errno>
155 where
156 L: LockBefore<TerminalLock>,
157 {
158 let addr = FutexAddress::try_from(addr)?;
159 let new_addr = FutexAddress::try_from(new_addr)?;
160 let key = Key::get(current_task, addr)?;
161 let new_key = Key::get(current_task, new_addr)?;
162 let mut state = self.state.lock(locked);
163 if let Some(expected) = expected_value {
164 let value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
167 if value != expected {
168 return error!(EAGAIN);
169 }
170 }
171
172 let woken;
173 let to_requeue;
174 match state.waiters.entry(key) {
175 Entry::Vacant(_) => return Ok(0),
176 Entry::Occupied(mut entry) => {
177 woken = entry.get_mut().notify(FUTEX_BITSET_MATCH_ANY, wake_count);
179
180 to_requeue = entry.get_mut().split_for_requeue(requeue_count);
182
183 if entry.get().is_empty() {
184 entry.remove();
185 }
186 }
187 }
188
189 let requeued = to_requeue.0.len();
190 if !to_requeue.is_empty() {
191 state.get_waiters_or_default(new_key).transfer(to_requeue);
192 }
193
194 Ok(woken + requeued)
195 }
196
197 pub fn lock_pi<L>(
201 &self,
202 locked: &mut Locked<L>,
203 current_task: &CurrentTask,
204 addr: UserAddress,
205 deadline: zx::MonotonicInstant,
206 ) -> Result<(), Errno>
207 where
208 L: LockBefore<TerminalLock>,
209 {
210 let addr = FutexAddress::try_from(addr)?;
211 let mut state = self.state.lock(locked);
212 let key = Key::get(current_task, addr)?;
215
216 let tid = current_task.get_tid() as u32;
217 let mm = current_task.mm()?;
218
219 let mut current_value = mm.atomic_load_u32_relaxed(addr)?;
223 let new_owner_tid = loop {
224 let new_owner_tid = current_value & FUTEX_TID_MASK;
225 if new_owner_tid == tid {
226 return error!(EDEADLOCK);
233 }
234
235 if current_value == 0 {
236 match mm.atomic_compare_exchange_weak_u32_acq_rel(addr, current_value, tid) {
239 CompareExchangeResult::Success => return Ok(()),
240 CompareExchangeResult::Stale { observed } => {
241 current_value = observed;
242 continue;
243 }
244 CompareExchangeResult::Error(e) => return Err(e),
245 }
246 }
247
248 let target_value = current_value | FUTEX_WAITERS;
251 match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
252 CompareExchangeResult::Success => (),
253 CompareExchangeResult::Stale { observed } => {
254 current_value = observed;
255 continue;
256 }
257 CompareExchangeResult::Error(e) => return Err(e),
258 }
259 break new_owner_tid;
260 };
261
262 let event = InterruptibleEvent::new();
263 let guard = event.begin_wait();
264 let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event));
265 state.get_rt_mutex_waiters_or_default(key).push_back(RtMutexWaiter { tid, notifiable });
266 std::mem::drop(state);
267
268 let new_owner = current_task
272 .get_task(new_owner_tid as i32)
273 .upgrade()
274 .map(|o| o.thread.read().as_ref().map(Arc::clone))
275 .flatten()
276 .ok_or_else(|| errno!(ESRCH))?;
277 current_task.block_with_owner_until(guard, &new_owner, deadline)
278 }
279
280 pub fn unlock_pi<L>(
284 &self,
285 locked: &mut Locked<L>,
286 current_task: &CurrentTask,
287 addr: UserAddress,
288 ) -> Result<(), Errno>
289 where
290 L: LockBefore<TerminalLock>,
291 {
292 let addr = FutexAddress::try_from(addr)?;
293 let mut state = self.state.lock(locked);
294 let tid = current_task.get_tid() as u32;
295 let mm = current_task.mm()?;
296
297 let key = Key::get(current_task, addr)?;
298
299 let current_value = mm.atomic_load_u32_relaxed(addr)?;
303 if current_value & FUTEX_TID_MASK != tid {
304 return error!(EPERM);
309 }
310
311 loop {
312 let maybe_waiter = state.pop_rt_mutex_waiter(key.clone());
313 let target_value = if let Some(waiter) = &maybe_waiter { waiter.tid } else { 0 };
314
315 match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
318 CompareExchangeResult::Success => (),
319 CompareExchangeResult::Stale { .. } => return error!(EINVAL),
328 CompareExchangeResult::Error(_) => return error!(EACCES),
332 }
333
334 let Some(mut waiter) = maybe_waiter else {
335 break;
337 };
338
339 if waiter.notifiable.notify() {
340 break;
341 }
342
343 }
346
347 Ok(())
348 }
349}
350
351impl FutexTable<SharedFutexKey> {
352 pub fn external_wait<L>(
356 &self,
357 locked: &mut Locked<L>,
358 memory: MemoryObject,
359 offset: u64,
360 value: u32,
361 mask: u32,
362 ) -> Result<oneshot::Receiver<()>, Errno>
363 where
364 L: LockBefore<TerminalLock>,
365 {
366 let key = SharedFutexKey::new(&memory, offset);
367 let mut state = self.state.lock(locked);
368 Self::external_check_futex_value(&memory, offset, value)?;
370
371 let (sender, receiver) = oneshot::channel::<()>();
372 state
373 .get_waiters_or_default(key)
374 .add(FutexWaiter { mask, notifiable: FutexNotifiable::new_external(sender) });
375 Ok(receiver)
376 }
377
378 pub fn external_wake<L>(
383 &self,
384 locked: &mut Locked<L>,
385 memory: MemoryObject,
386 offset: u64,
387 count: usize,
388 mask: u32,
389 ) -> Result<usize, Errno>
390 where
391 L: LockBefore<TerminalLock>,
392 {
393 Ok(self.state.lock(locked).wake(SharedFutexKey::new(&memory, offset), count, mask))
394 }
395
396 fn external_check_futex_value(
397 memory: &MemoryObject,
398 offset: u64,
399 value: u32,
400 ) -> Result<(), Errno> {
401 let loaded_value = {
402 let mut buf = [0u8; 4];
404 memory.read(&mut buf, offset).map_err(|_| errno!(EINVAL))?;
405 u32::from_ne_bytes(buf)
406 };
407 if loaded_value != value {
408 return error!(EAGAIN);
409 }
410 Ok(())
411 }
412}
413
414pub trait FutexKey: Sized + Ord + Hash + Clone {
415 fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno>;
416 fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno>;
417}
418
419#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
420pub struct PrivateFutexKey {
421 addr: FutexAddress,
422}
423
424impl FutexKey for PrivateFutexKey {
425 fn get(_task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
426 Ok(PrivateFutexKey { addr })
427 }
428
429 fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
430 Ok(task.mm()?.futex.clone())
431 }
432}
433
434#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
435pub struct SharedFutexKey {
436 koid: zx::Koid,
439 offset: u64,
440}
441
442impl FutexKey for SharedFutexKey {
443 fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
444 let (memory, offset) = task.mm()?.get_mapping_memory(addr.into(), ProtectionFlags::READ)?;
445 Ok(SharedFutexKey::new(&memory, offset))
446 }
447
448 fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
449 Ok(task.kernel().shared_futexes.clone())
450 }
451}
452
453impl SharedFutexKey {
454 fn new(memory: &MemoryObject, offset: u64) -> Self {
455 Self { koid: memory.get_koid(), offset }
456 }
457}
458
459struct FutexTableState<Key: FutexKey> {
460 waiters: HashMap<Key, FutexWaiters>,
461 rt_mutex_waiters: HashMap<Key, VecDeque<RtMutexWaiter>>,
462}
463
464impl<Key: FutexKey> Default for FutexTableState<Key> {
465 fn default() -> Self {
466 Self { waiters: Default::default(), rt_mutex_waiters: Default::default() }
467 }
468}
469
470impl<Key: FutexKey> FutexTableState<Key> {
471 fn get_waiters_or_default(&mut self, key: Key) -> &mut FutexWaiters {
473 self.waiters.entry(key).or_default()
474 }
475
476 fn wake(&mut self, key: Key, count: usize, mask: u32) -> usize {
477 let entry = self.waiters.entry(key);
478 match entry {
479 Entry::Vacant(_) => 0,
480 Entry::Occupied(mut entry) => {
481 let count = entry.get_mut().notify(mask, count);
482 if entry.get().is_empty() {
483 entry.remove();
484 }
485 count
486 }
487 }
488 }
489
490 fn get_rt_mutex_waiters_or_default(&mut self, key: Key) -> &mut VecDeque<RtMutexWaiter> {
493 self.rt_mutex_waiters.entry(key).or_default()
494 }
495
496 fn pop_rt_mutex_waiter(&mut self, key: Key) -> Option<RtMutexWaiter> {
498 let entry = self.rt_mutex_waiters.entry(key);
499 match entry {
500 Entry::Vacant(_) => None,
501 Entry::Occupied(mut entry) => {
502 if let Some(mut waiter) = entry.get_mut().pop_front() {
503 if entry.get().is_empty() {
504 entry.remove();
505 } else {
506 waiter.tid |= FUTEX_WAITERS;
507 }
508 Some(waiter)
509 } else {
510 None
511 }
512 }
513 }
514 }
515}
516
517enum FutexNotifiable {
519 Internal(Weak<InterruptibleEvent>),
521 InternalBoot(Weak<Waiter>),
523 External(Option<oneshot::Sender<()>>),
527}
528
529impl FutexNotifiable {
530 fn new_internal(event: Weak<InterruptibleEvent>) -> Self {
531 Self::Internal(event)
532 }
533
534 fn new_internal_boot(waiter: Weak<Waiter>) -> Self {
535 Self::InternalBoot(waiter)
536 }
537
538 fn new_external(sender: oneshot::Sender<()>) -> Self {
539 Self::External(Some(sender))
540 }
541
542 fn notify(&mut self) -> bool {
545 match self {
546 Self::Internal(event) => {
547 if let Some(event) = event.upgrade() {
548 event.notify();
549 true
550 } else {
551 false
552 }
553 }
554 Self::InternalBoot(waiter) => {
555 if let Some(waiter) = waiter.upgrade() {
556 waiter.notify();
557 true
558 } else {
559 false
560 }
561 }
562 Self::External(sender) => {
563 if let Some(sender) = sender.take() {
564 sender.send(()).is_ok()
565 } else {
566 false
567 }
568 }
569 }
570 }
571}
572
573struct FutexWaiter {
574 mask: u32,
575 notifiable: FutexNotifiable,
576}
577
578#[derive(Default)]
579struct FutexWaiters(VecDeque<FutexWaiter>);
580
581impl FutexWaiters {
582 fn add(&mut self, waiter: FutexWaiter) {
583 self.0.push_back(waiter);
584 }
585
586 fn notify(&mut self, mask: u32, count: usize) -> usize {
587 let mut woken = 0;
588 self.0.retain_mut(|waiter| {
589 if woken == count || waiter.mask & mask == 0 {
590 return true;
591 }
592 if waiter.notifiable.notify() {
595 woken += 1;
596 }
597 false
598 });
599 woken
600 }
601
602 fn transfer(&mut self, mut other: Self) {
603 self.0.append(&mut other.0);
604 }
605
606 fn is_empty(&self) -> bool {
607 self.0.is_empty()
608 }
609
610 fn split_for_requeue(&mut self, max_count: usize) -> Self {
611 let pos = if max_count >= self.0.len() { 0 } else { self.0.len() - max_count };
612 FutexWaiters(self.0.split_off(pos))
613 }
614}
615
616struct RtMutexWaiter {
617 tid: u32,
619
620 notifiable: FutexNotifiable,
621}