Skip to main content

starnix_core/mm/
futex_table.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::mm::memory::MemoryObject;
6use crate::mm::{CompareExchangeResult, ProtectionFlags};
7use crate::task::{CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, Task, Waiter};
8use futures::channel::oneshot;
9use starnix_sync::{
10    FutexTableStateLock, InterruptibleEvent, LockBefore, Locked, OrderedMutex, Unlocked,
11};
12use starnix_types::futex_address::FutexAddress;
13use starnix_uapi::errors::Errno;
14use starnix_uapi::user_address::UserAddress;
15use starnix_uapi::{FUTEX_BITSET_MATCH_ANY, FUTEX_TID_MASK, FUTEX_WAITERS, errno, error};
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, VecDeque};
18use std::hash::Hash;
19use std::sync::{Arc, Weak};
20
21/// A table of futexes.
22///
23/// Each 32-bit aligned address in an address space can potentially have an associated futex that
24/// userspace can wait upon. This table is a sparse representation that has an actual WaitQueue
25/// only for those addresses that have ever actually had a futex operation performed on them.
26pub struct FutexTable<Key: FutexKey> {
27    /// The futexes associated with each address in each VMO.
28    ///
29    /// This HashMap is populated on-demand when futexes are used.
30    state: OrderedMutex<FutexTableState<Key>, FutexTableStateLock>,
31}
32
33impl<Key: FutexKey> Default for FutexTable<Key> {
34    fn default() -> Self {
35        Self { state: OrderedMutex::new(FutexTableState::default()) }
36    }
37}
38
39impl<Key: FutexKey> FutexTable<Key> {
40    /// Wait on the futex at the given address given a boot deadline.
41    ///
42    /// See FUTEX_WAIT when passed a deadline in CLOCK_REALTIME.
43    pub fn wait_boot(
44        &self,
45        locked: &mut Locked<Unlocked>,
46        current_task: &CurrentTask,
47        addr: UserAddress,
48        value: u32,
49        mask: u32,
50        deadline: zx::BootInstant,
51        timer_slack: zx::BootDuration,
52    ) -> Result<(), Errno> {
53        let addr = FutexAddress::try_from(addr)?;
54        let mut state = self.state.lock(locked);
55        // As the state is locked, no wake can happen before the waiter is registered.
56        // If the addr is remapped, we will read stale data, but we will not miss a futex wake.
57        // Acquire ordering to synchronize with userspace modifications to the value on other
58        // threads.
59        let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
60        if value != loaded_value {
61            return error!(EAGAIN);
62        }
63
64        let key = Key::get(current_task, addr)?;
65        let waiter = Arc::new(Waiter::new());
66        let timer = zx::BootTimer::create();
67        let signal_handler = SignalHandler {
68            inner: SignalHandlerInner::None,
69            event_handler: EventHandler::None,
70            err_code: Some(errno!(ETIMEDOUT)),
71        };
72        waiter
73            .wake_on_zircon_signals(&timer, zx::Signals::TIMER_SIGNALED, signal_handler)
74            .expect("wait can only fail in OOM conditions");
75        timer
76            .set(deadline, timer_slack)
77            .expect("timer set cannot fail with valid handles and slack");
78        state.get_waiters_or_default(key.clone()).add(FutexWaiter {
79            mask,
80            notifiable: FutexNotifiable::new_internal_boot(Arc::downgrade(&waiter)),
81        });
82        std::mem::drop(state);
83        waiter.wait(locked, current_task).inspect_err(|_| {
84            // If wait returned an error (e.g., ETIMEDOUT, EINTR), we must explicitly
85            // remove our waiter from the queue to prevent a memory leak.
86            // If it succeeded, the waker has already removed us from the queue.
87            self.state.lock(locked).remove_boot_waiter_from_queue(key, &waiter);
88        })
89    }
90
91    /// Wait on the futex at the given address.
92    ///
93    /// See FUTEX_WAIT.
94    pub fn wait<L>(
95        &self,
96        locked: &mut Locked<L>,
97        current_task: &CurrentTask,
98        addr: UserAddress,
99        value: u32,
100        mask: u32,
101        deadline: zx::MonotonicInstant,
102    ) -> Result<(), Errno>
103    where
104        L: LockBefore<FutexTableStateLock>,
105    {
106        let addr = FutexAddress::try_from(addr)?;
107        let mut state = self.state.lock(locked);
108        // As the state is locked, no wake can happen before the waiter is registered.
109        // If the addr is remapped, we will read stale data, but we will not miss a futex wake.
110        // Acquire ordering to synchronize with userspace modifications to the value on other
111        // threads.
112        let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
113        if value != loaded_value {
114            return error!(EAGAIN);
115        }
116
117        let key = Key::get(current_task, addr)?;
118        let event = InterruptibleEvent::new();
119        let guard = event.begin_wait();
120        state.get_waiters_or_default(key.clone()).add(FutexWaiter {
121            mask,
122            notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
123        });
124        std::mem::drop(state);
125
126        current_task.block_until(guard, deadline).inspect_err(|_| {
127            // If block_until returned an error (e.g., ETIMEDOUT, EINTR), we must explicitly
128            // remove our waiter from the queue to prevent a memory leak.
129            // If it succeeded, the waker has already removed us from the queue.
130            self.state.lock(locked).remove_waiter_from_queue(key, &event);
131        })
132    }
133
134    /// Wake the given number of waiters on futex at the given address. Returns the number of
135    /// waiters actually woken.
136    ///
137    /// See FUTEX_WAKE.
138    pub fn wake<L>(
139        &self,
140        locked: &mut Locked<L>,
141        task: &Task,
142        addr: UserAddress,
143        count: usize,
144        mask: u32,
145    ) -> Result<usize, Errno>
146    where
147        L: LockBefore<FutexTableStateLock>,
148    {
149        let addr = FutexAddress::try_from(addr)?;
150        let key = Key::get(task, addr)?;
151        Ok(self.state.lock(locked).wake(key, count, mask))
152    }
153
154    /// Requeue the waiters to another address.
155    ///
156    /// See FUTEX_CMP_REQUEUE
157    pub fn requeue<L>(
158        &self,
159        locked: &mut Locked<L>,
160        current_task: &CurrentTask,
161        addr: UserAddress,
162        wake_count: usize,
163        requeue_count: usize,
164        new_addr: UserAddress,
165        expected_value: Option<u32>,
166    ) -> Result<usize, Errno>
167    where
168        L: LockBefore<FutexTableStateLock>,
169    {
170        let addr = FutexAddress::try_from(addr)?;
171        let new_addr = FutexAddress::try_from(new_addr)?;
172        let key = Key::get(current_task, addr)?;
173        let new_key = Key::get(current_task, new_addr)?;
174        let mut state = self.state.lock(locked);
175        if let Some(expected) = expected_value {
176            // Use acquire ordering here to synchronize with mutex impls that store w/ release
177            // ordering.
178            let value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
179            if value != expected {
180                return error!(EAGAIN);
181            }
182        }
183
184        Ok(state.requeue(key, new_key, wake_count, requeue_count))
185    }
186
187    /// Lock the futex at the given address.
188    ///
189    /// See FUTEX_LOCK_PI.
190    pub fn lock_pi<L>(
191        &self,
192        locked: &mut Locked<L>,
193        current_task: &CurrentTask,
194        addr: UserAddress,
195        deadline: zx::MonotonicInstant,
196    ) -> Result<(), Errno>
197    where
198        L: LockBefore<FutexTableStateLock>,
199    {
200        let addr = FutexAddress::try_from(addr)?;
201        let mut state = self.state.lock(locked);
202        // As the state is locked, no unlock can happen before the waiter is registered.
203        // If the addr is remapped, we will read stale data, but we will not miss a futex unlock.
204        let key = Key::get(current_task, addr)?;
205
206        let tid = current_task.get_tid() as u32;
207        let mm = current_task.mm()?;
208
209        // Use a relaxed ordering because the compare/exchange below creates a synchronization
210        // point with userspace threads in the success case. No synchronization is required in
211        // failure cases.
212        let mut current_value = mm.atomic_load_u32_relaxed(addr)?;
213        let new_owner_tid = loop {
214            let new_owner_tid = current_value & FUTEX_TID_MASK;
215            if new_owner_tid == tid {
216                // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
217                //
218                //   EDEADLK
219                //          (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI,
220                //          FUTEX_CMP_REQUEUE_PI) The futex word at uaddr is already
221                //          locked by the caller.
222                return error!(EDEADLOCK);
223            }
224
225            if current_value == 0 {
226                // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops
227                // and with the release ordering on userspace unlock ops.
228                match mm.atomic_compare_exchange_weak_u32_acq_rel(addr, current_value, tid) {
229                    CompareExchangeResult::Success => return Ok(()),
230                    CompareExchangeResult::Stale { observed } => {
231                        current_value = observed;
232                        continue;
233                    }
234                    CompareExchangeResult::Error(e) => return Err(e),
235                }
236            }
237
238            // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops and
239            // with the release ordering on userspace unlock ops.
240            let target_value = current_value | FUTEX_WAITERS;
241            match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
242                CompareExchangeResult::Success => (),
243                CompareExchangeResult::Stale { observed } => {
244                    current_value = observed;
245                    continue;
246                }
247                CompareExchangeResult::Error(e) => return Err(e),
248            }
249            break new_owner_tid;
250        };
251
252        let event = InterruptibleEvent::new();
253        let guard = event.begin_wait();
254        let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event));
255        state
256            .get_rt_mutex_waiters_or_default(key.clone())
257            .push_back(RtMutexWaiter { tid, notifiable });
258        std::mem::drop(state);
259
260        // ESRCH  (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI,
261        //        FUTEX_CMP_REQUEUE_PI) The thread ID in the futex word at
262        //        uaddr does not exist.
263        current_task
264            .get_task(new_owner_tid as i32)
265            .ok()
266            .and_then(|o| o.running_state().unwrap().thread.get().map(|t| Arc::clone(&t.thread)))
267            .map_or_else(
268                || error!(ESRCH),
269                |owner| current_task.block_with_owner_until(guard, &owner, deadline),
270            )
271            .inspect_err(|_| {
272                // If block_with_owner_until returned an error (e.g., ETIMEDOUT), or if we
273                // failed to find the new owner (ESRCH), we must explicitly remove our waiter
274                // from the PI-mutex queue to prevent a memory leak.
275                self.state.lock(locked).remove_rt_mutex_waiter_from_queue(key, &event);
276            })
277    }
278
279    /// Unlock the futex at the given address.
280    ///
281    /// See FUTEX_UNLOCK_PI.
282    pub fn unlock_pi<L>(
283        &self,
284        locked: &mut Locked<L>,
285        current_task: &CurrentTask,
286        addr: UserAddress,
287    ) -> Result<(), Errno>
288    where
289        L: LockBefore<FutexTableStateLock>,
290    {
291        let addr = FutexAddress::try_from(addr)?;
292        let mut state = self.state.lock(locked);
293        let tid = current_task.get_tid() as u32;
294        let mm = current_task.mm()?;
295
296        let key = Key::get(current_task, addr)?;
297
298        // Use a relaxed ordering because the compare/exchange below creates a synchronization
299        // point with userspace threads in the success case. No synchronization is required in
300        // failure cases.
301        let current_value = mm.atomic_load_u32_relaxed(addr)?;
302        if current_value & FUTEX_TID_MASK != tid {
303            // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
304            //
305            //   EPERM  (FUTEX_UNLOCK_PI) The caller does not own the lock
306            //          represented by the futex word.
307            return error!(EPERM);
308        }
309
310        loop {
311            let maybe_waiter = state.pop_rt_mutex_waiter(key.clone());
312            let target_value = if let Some(waiter) = &maybe_waiter { waiter.tid } else { 0 };
313
314            // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops and
315            // with the release ordering on userspace unlock ops.
316            match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
317                CompareExchangeResult::Success => (),
318                // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
319                //
320                //   EINVAL (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI,
321                //       FUTEX_UNLOCK_PI) The kernel detected an inconsistency
322                //       between the user-space state at uaddr and the kernel
323                //       state.  This indicates either state corruption or that the
324                //       kernel found a waiter on uaddr which is waiting via
325                //       FUTEX_WAIT or FUTEX_WAIT_BITSET.
326                CompareExchangeResult::Stale { .. } => return error!(EINVAL),
327                // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
328                //
329                //   EACCES No read access to the memory of a futex word.
330                CompareExchangeResult::Error(_) => return error!(EACCES),
331            }
332
333            let Some(mut waiter) = maybe_waiter else {
334                // We can stop trying to notify a thread if there are no more waiters.
335                break;
336            };
337
338            if waiter.notifiable.notify() {
339                break;
340            }
341
342            // If we couldn't notify the waiter, then we need to pull the next thread off the
343            // waiter list.
344        }
345
346        Ok(())
347    }
348}
349
350impl FutexTable<SharedFutexKey> {
351    /// Wait on the futex at the given offset in the memory.
352    ///
353    /// Returns a receiver that will be signaled when the futex is woken, and an
354    /// `Arc<()>` token that must be kept alive by the caller for the duration of the
355    /// wait. If the caller drops the token (e.g., if the external client
356    /// disconnects), the waiter is marked as stale and will be garbage-collected by the
357    /// next futex operation on this table.
358    ///
359    /// See FUTEX_WAIT.
360    pub fn external_wait<L>(
361        &self,
362        locked: &mut Locked<L>,
363        memory: MemoryObject,
364        offset: u64,
365        value: u32,
366        mask: u32,
367    ) -> Result<(Arc<()>, oneshot::Receiver<()>), Errno>
368    where
369        L: LockBefore<FutexTableStateLock>,
370    {
371        let key = SharedFutexKey::new(&memory, offset);
372        let mut state = self.state.lock(locked);
373        // As the state is locked, no wake can happen before the waiter is registered.
374        Self::external_check_futex_value(&memory, offset, value)?;
375
376        let token = Arc::new(());
377        let (sender, receiver) = oneshot::channel::<()>();
378        state.get_waiters_or_default(key).add(FutexWaiter {
379            mask,
380            notifiable: FutexNotifiable::new_external(Arc::downgrade(&token), sender),
381        });
382        Ok((token, receiver))
383    }
384
385    /// Wake the given number of waiters on futex at the given offset in the memory. Returns the
386    /// number of waiters actually woken.
387    ///
388    /// See FUTEX_WAKE.
389    pub fn external_wake<L>(
390        &self,
391        locked: &mut Locked<L>,
392        memory: MemoryObject,
393        offset: u64,
394        count: usize,
395        mask: u32,
396    ) -> Result<usize, Errno>
397    where
398        L: LockBefore<FutexTableStateLock>,
399    {
400        Ok(self.state.lock(locked).wake(SharedFutexKey::new(&memory, offset), count, mask))
401    }
402
403    pub fn external_requeue<L>(
404        &self,
405        locked: &mut Locked<L>,
406        first_memory: MemoryObject,
407        first_offset: u64,
408        second_memory: Option<MemoryObject>,
409        second_offset: u64,
410        wake_count: usize,
411        requeue_count: usize,
412        expected_value: Option<u32>,
413    ) -> Result<usize, Errno>
414    where
415        L: LockBefore<FutexTableStateLock>,
416    {
417        let first_key = SharedFutexKey::new(&first_memory, first_offset);
418        let second_key = match second_memory.as_ref() {
419            Some(second_memory) => SharedFutexKey::new(second_memory, second_offset),
420            None => SharedFutexKey::new(&first_memory, second_offset),
421        };
422        // If/when we move from a single table mutex to a mutex per futex, we'll likely want to
423        // define a consistent SharedFutexKey sort order independent of which is "first" and which
424        // is "second" in this call. Then we can acquire each of the two mutexes corresponding to
425        // each of the two futexes per that sort order. This way, we can be holding both mutexes to
426        // make the requeue atomic despite each futex having its own mutex, while avoiding
427        // deadlocks. But for now we lock the whole FutexTable.
428        let mut state = self.state.lock(locked);
429        if let Some(expected) = expected_value {
430            // The state being locked is how this is included in the set of atomic changes.
431            Self::external_check_futex_value(&first_memory, first_offset, expected)?;
432        }
433        Ok(state.requeue(first_key, second_key, wake_count, requeue_count))
434    }
435
436    fn external_check_futex_value(
437        memory: &MemoryObject,
438        offset: u64,
439        value: u32,
440    ) -> Result<(), Errno> {
441        let loaded_value = {
442            // TODO: This read should be atomic.
443            let mut buf = [0u8; 4];
444            memory.read(&mut buf, offset).map_err(|_| errno!(EINVAL))?;
445            u32::from_ne_bytes(buf)
446        };
447        if loaded_value != value {
448            return error!(EAGAIN);
449        }
450        Ok(())
451    }
452}
453
454pub trait FutexKey: Sized + Ord + Hash + Clone {
455    fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno>;
456    fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno>;
457}
458
459#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
460pub struct PrivateFutexKey {
461    addr: FutexAddress,
462}
463
464impl FutexKey for PrivateFutexKey {
465    fn get(_task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
466        Ok(PrivateFutexKey { addr })
467    }
468
469    fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
470        Ok(task.mm()?.futex.clone())
471    }
472}
473
474#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
475pub struct SharedFutexKey {
476    // No chance of collisions since koids are never reused:
477    // https://fuchsia.dev/fuchsia-src/concepts/kernel/concepts#kernel_object_ids
478    koid: zx::Koid,
479    offset: u64,
480}
481
482impl FutexKey for SharedFutexKey {
483    fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
484        let (memory, offset) = task.mm()?.get_mapping_memory(addr.into(), ProtectionFlags::READ)?;
485        Ok(SharedFutexKey::new(&memory, offset))
486    }
487
488    fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
489        Ok(task.kernel().shared_futexes.clone())
490    }
491}
492
493impl SharedFutexKey {
494    fn new(memory: &MemoryObject, offset: u64) -> Self {
495        Self { koid: memory.get_koid(), offset }
496    }
497}
498
499struct FutexTableState<Key: FutexKey> {
500    waiters: HashMap<Key, FutexWaiters>,
501    rt_mutex_waiters: HashMap<Key, VecDeque<RtMutexWaiter>>,
502}
503
504impl<Key: FutexKey> Default for FutexTableState<Key> {
505    fn default() -> Self {
506        Self { waiters: Default::default(), rt_mutex_waiters: Default::default() }
507    }
508}
509
510impl<Key: FutexKey> FutexTableState<Key> {
511    /// Returns the FutexWaiters for a given address, creating an empty one if none is registered.
512    fn get_waiters_or_default(&mut self, key: Key) -> &mut FutexWaiters {
513        self.waiters.entry(key).or_default()
514    }
515
516    fn wake(&mut self, key: Key, count: usize, mask: u32) -> usize {
517        let entry = self.waiters.entry(key);
518        match entry {
519            Entry::Vacant(_) => 0,
520            Entry::Occupied(mut entry) => {
521                let count = entry.get_mut().notify(mask, count);
522                if entry.get().is_empty() {
523                    entry.remove();
524                }
525                count
526            }
527        }
528    }
529
530    fn requeue(
531        &mut self,
532        key: Key,
533        new_key: Key,
534        wake_count: usize,
535        requeue_count: usize,
536    ) -> usize {
537        let woken;
538        let to_requeue;
539        match self.waiters.entry(key) {
540            Entry::Vacant(_) => return 0,
541            Entry::Occupied(mut entry) => {
542                // Wake up at most `wake_count` waiters.
543                woken = entry.get_mut().notify(FUTEX_BITSET_MATCH_ANY, wake_count);
544
545                // Dequeue up to `requeue_count` waiters to requeue below.
546                to_requeue = entry.get_mut().split_for_requeue(requeue_count);
547
548                if entry.get().is_empty() {
549                    entry.remove();
550                }
551            }
552        }
553
554        let requeued = to_requeue.0.len();
555        if !to_requeue.is_empty() {
556            self.get_waiters_or_default(new_key).transfer(to_requeue);
557        }
558
559        woken + requeued
560    }
561
562    /// Returns the RT-Mutex waiters queue for a given address, creating an empty queue if none is
563    /// registered.
564    fn get_rt_mutex_waiters_or_default(&mut self, key: Key) -> &mut VecDeque<RtMutexWaiter> {
565        self.rt_mutex_waiters.entry(key).or_default()
566    }
567
568    /// Pop the next RT-Mutex for the given address.
569    fn pop_rt_mutex_waiter(&mut self, key: Key) -> Option<RtMutexWaiter> {
570        let entry = self.rt_mutex_waiters.entry(key);
571        match entry {
572            Entry::Vacant(_) => None,
573            Entry::Occupied(mut entry) => {
574                let mut waiter = entry.get_mut().pop_front();
575                // Clean up the hash map entry if the queue is empty. We do this
576                // regardless of whether `pop_front` returned a waiter or `None`,
577                // effectively garbage collecting erroneously empty map entries.
578                if entry.get().is_empty() {
579                    entry.remove();
580                } else if let Some(waiter) = &mut waiter {
581                    waiter.tid |= FUTEX_WAITERS;
582                }
583                waiter
584            }
585        }
586    }
587
588    /// Removes a standard `FUTEX_WAIT` waiter from the queue.
589    ///
590    /// This uses a two-step approach:
591    /// 1. O(1) Fast path: Check the `key` where the waiter originally went to sleep.
592    /// 2. O(N) Fallback: If not found (e.g. moved via `FUTEX_REQUEUE`), scan all futexes.
593    fn remove_waiter_from_queue(&mut self, key: Key, event: &Arc<InterruptibleEvent>) {
594        if let Entry::Occupied(mut entry) = self.waiters.entry(key) {
595            if entry.get_mut().remove_waiter(event) {
596                if entry.get().is_empty() {
597                    entry.remove();
598                }
599                return;
600            }
601        }
602
603        let mut key_to_remove = None;
604        for (key, waiters) in self.waiters.iter_mut() {
605            if waiters.remove_waiter(event) {
606                if waiters.is_empty() {
607                    key_to_remove = Some(key.clone());
608                }
609                break;
610            }
611        }
612        if let Some(key) = key_to_remove {
613            self.waiters.remove(&key);
614        }
615    }
616
617    /// Removes a `FUTEX_WAIT_BITSET` waiter (with `FUTEX_CLOCK_REALTIME`).
618    ///
619    /// Like `remove_waiter_from_queue`, it tries the fast O(1) lookup on the original `key` first,
620    /// and falls back to an O(N) scan across all queues in case of a requeue.
621    fn remove_boot_waiter_from_queue(&mut self, key: Key, waiter: &Arc<Waiter>) {
622        if let Entry::Occupied(mut entry) = self.waiters.entry(key) {
623            if entry.get_mut().remove_boot_waiter(waiter) {
624                if entry.get().is_empty() {
625                    entry.remove();
626                }
627                return;
628            }
629        }
630
631        let mut key_to_remove = None;
632        for (key, waiters) in self.waiters.iter_mut() {
633            if waiters.remove_boot_waiter(waiter) {
634                if waiters.is_empty() {
635                    key_to_remove = Some(key.clone());
636                }
637                break;
638            }
639        }
640        if let Some(key) = key_to_remove {
641            self.waiters.remove(&key);
642        }
643    }
644
645    /// Removes a PI-mutex (`FUTEX_LOCK_PI`) waiter.
646    ///
647    /// Operates on the separate `rt_mutex_waiters` map using the same two-step
648    /// O(1)/O(N) algorithm as the other removal methods to handle edge cases where
649    /// PI-mutexes might be requeued (e.g. if `FUTEX_CMP_REQUEUE_PI` is used).
650    fn remove_rt_mutex_waiter_from_queue(&mut self, key: Key, event: &Arc<InterruptibleEvent>) {
651        let predicate =
652            |w: &RtMutexWaiter| !w.notifiable.matches_event(event) && !w.notifiable.is_stale();
653
654        if let Entry::Occupied(mut entry) = self.rt_mutex_waiters.entry(key) {
655            let len_before = entry.get().len();
656            entry.get_mut().retain(predicate);
657            if entry.get().len() < len_before {
658                if entry.get().is_empty() {
659                    entry.remove();
660                }
661                return;
662            }
663        }
664
665        let mut key_to_remove = None;
666        for (key, waiters) in self.rt_mutex_waiters.iter_mut() {
667            let len_before = waiters.len();
668            waiters.retain(predicate);
669            if waiters.len() < len_before {
670                if waiters.is_empty() {
671                    key_to_remove = Some(key.clone());
672                }
673                break;
674            }
675        }
676        if let Some(key) = key_to_remove {
677            self.rt_mutex_waiters.remove(&key);
678        }
679    }
680}
681
682/// Abstraction over a process waiting on a Futex that can be notified.
683enum FutexNotifiable {
684    /// An internal process waiting on a Futex.
685    Internal(Weak<InterruptibleEvent>),
686    // An internal process waiting on a Futex with a boot deadline.
687    InternalBoot(Weak<Waiter>),
688    /// An external process waiting on a Futex.
689    // The sender needs to be an option so that one can send the notification while only holding a
690    // mut reference on the ExternalWaiter.
691    External(Weak<()>, Option<oneshot::Sender<()>>),
692}
693
694impl FutexNotifiable {
695    fn new_internal(event: Weak<InterruptibleEvent>) -> Self {
696        Self::Internal(event)
697    }
698
699    fn new_internal_boot(waiter: Weak<Waiter>) -> Self {
700        Self::InternalBoot(waiter)
701    }
702
703    fn new_external(token: Weak<()>, sender: oneshot::Sender<()>) -> Self {
704        Self::External(token, Some(sender))
705    }
706
707    /// Tries to notify the process. Returns `true` is the process have been notified. Returns
708    /// `false` otherwise. This means the process is stale and will never be available again.
709    fn notify(&mut self) -> bool {
710        match self {
711            Self::Internal(event) => {
712                if let Some(event) = event.upgrade() {
713                    event.notify();
714                    true
715                } else {
716                    false
717                }
718            }
719            Self::InternalBoot(waiter) => {
720                if let Some(waiter) = waiter.upgrade() {
721                    waiter.notify();
722                    true
723                } else {
724                    false
725                }
726            }
727            Self::External(_, sender) => {
728                if let Some(sender) = sender.take() {
729                    sender.send(()).is_ok()
730                } else {
731                    false
732                }
733            }
734        }
735    }
736
737    fn matches_event(&self, event: &Arc<InterruptibleEvent>) -> bool {
738        match self {
739            Self::Internal(weak) => {
740                if let Some(strong) = weak.upgrade() {
741                    Arc::ptr_eq(&strong, event)
742                } else {
743                    false
744                }
745            }
746            _ => false,
747        }
748    }
749
750    fn matches_waiter(&self, waiter: &Arc<Waiter>) -> bool {
751        match self {
752            Self::InternalBoot(weak) => {
753                if let Some(strong) = weak.upgrade() {
754                    Arc::ptr_eq(&strong, waiter)
755                } else {
756                    false
757                }
758            }
759            _ => false,
760        }
761    }
762
763    fn is_stale(&self) -> bool {
764        match self {
765            Self::Internal(weak) => weak.strong_count() == 0,
766            Self::External(weak, _) => weak.strong_count() == 0,
767            Self::InternalBoot(weak) => weak.strong_count() == 0,
768        }
769    }
770}
771
772struct FutexWaiter {
773    mask: u32,
774    notifiable: FutexNotifiable,
775}
776
777#[derive(Default)]
778struct FutexWaiters(VecDeque<FutexWaiter>);
779
780impl FutexWaiters {
781    fn add(&mut self, waiter: FutexWaiter) {
782        self.0.push_back(waiter);
783    }
784
785    fn notify(&mut self, mask: u32, count: usize) -> usize {
786        let mut woken = 0;
787        self.0.retain_mut(|waiter| {
788            if woken == count || waiter.mask & mask == 0 {
789                return true;
790            }
791            // The send will fail if the receiver is gone, which means nothing was actualling
792            // waiting on the futex.
793            if waiter.notifiable.notify() {
794                woken += 1;
795            }
796            false
797        });
798        woken
799    }
800
801    fn transfer(&mut self, mut other: Self) {
802        self.0.append(&mut other.0);
803    }
804
805    fn is_empty(&self) -> bool {
806        self.0.is_empty()
807    }
808
809    fn remove_waiter(&mut self, event: &Arc<InterruptibleEvent>) -> bool {
810        let initial_len = self.0.len();
811        self.0.retain(|w| !w.notifiable.matches_event(event) && !w.notifiable.is_stale());
812        self.0.len() < initial_len
813    }
814
815    fn remove_boot_waiter(&mut self, waiter: &Arc<Waiter>) -> bool {
816        let initial_len = self.0.len();
817        self.0.retain(|w| !w.notifiable.matches_waiter(waiter) && !w.notifiable.is_stale());
818        self.0.len() < initial_len
819    }
820
821    fn split_for_requeue(&mut self, count: usize) -> Self {
822        let count = std::cmp::min(count, self.0.len());
823        let tail = self.0.split_off(count);
824        let head = std::mem::replace(&mut self.0, tail);
825        FutexWaiters(head)
826    }
827}
828
829struct RtMutexWaiter {
830    /// The tid, possibly with the FUTEX_WAITERS bit set.
831    tid: u32,
832
833    notifiable: FutexNotifiable,
834}
835
#[cfg(test)]
mod tests {
    use super::*;
    use starnix_sync::InterruptibleEvent;
    use starnix_uapi::restricted_aspace::RESTRICTED_ASPACE_BASE;
    use starnix_uapi::user_address::UserAddress;

    /// Builds the private futex key for the raw user address `raw`.
    ///
    /// Panics if the address is not a valid futex address.
    fn private_key(raw: u64) -> PrivateFutexKey {
        let addr = FutexAddress::try_from(UserAddress::from(raw)).unwrap();
        PrivateFutexKey { addr }
    }

    /// Builds a queue entry with the given `mask` that weakly references `event`.
    fn internal_waiter(mask: u32, event: &Arc<InterruptibleEvent>) -> FutexWaiter {
        FutexWaiter { mask, notifiable: FutexNotifiable::new_internal(Arc::downgrade(event)) }
    }

    /// A waiter is removed from the queue it was originally added to, and the emptied queue is
    /// dropped from the table.
    #[fuchsia::test]
    fn test_remove_waiter_simple() {
        let event = Arc::new(InterruptibleEvent::new());
        let key = private_key((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let mut state = FutexTableState::<PrivateFutexKey>::default();

        state.get_waiters_or_default(key.clone()).add(internal_waiter(u32::MAX, &event));
        assert_eq!(state.waiters.len(), 1);

        state.remove_waiter_from_queue(key, &event);
        assert_eq!(state.waiters.len(), 0);
    }

    /// A waiter that sits in a different queue than the one named by the caller (as happens
    /// after a requeue) is still found and removed.
    #[fuchsia::test]
    fn test_remove_waiter_requeued() {
        let event = Arc::new(InterruptibleEvent::new());
        let key1 = private_key((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let key2 = private_key((RESTRICTED_ASPACE_BASE + 0x2000) as u64);
        let mut state = FutexTableState::<PrivateFutexKey>::default();

        // The waiter lives under key2 ...
        state.get_waiters_or_default(key2.clone()).add(internal_waiter(u32::MAX, &event));
        assert_eq!(state.waiters.len(), 1);

        // ... but removal is requested through key1.
        state.remove_waiter_from_queue(key1, &event);
        assert_eq!(state.waiters.len(), 0);
    }

    /// An rt-mutex waiter is removed from its queue, and the emptied queue is dropped.
    #[fuchsia::test]
    fn test_remove_rt_mutex_waiter() {
        let event = Arc::new(InterruptibleEvent::new());
        let key = private_key((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let mut state = FutexTableState::<PrivateFutexKey>::default();

        let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event));
        state
            .get_rt_mutex_waiters_or_default(key.clone())
            .push_back(RtMutexWaiter { tid: 1, notifiable });
        assert_eq!(state.rt_mutex_waiters.len(), 1);

        state.remove_rt_mutex_waiter_from_queue(key, &event);
        assert_eq!(state.rt_mutex_waiters.len(), 0);
    }

    /// `split_for_requeue` hands out the longest-waiting entries first and keeps the rest.
    #[fuchsia::test]
    fn test_split_for_requeue_fairness() {
        let e1 = Arc::new(InterruptibleEvent::new());
        let e2 = Arc::new(InterruptibleEvent::new());
        let e3 = Arc::new(InterruptibleEvent::new());

        // The mask doubles as a tag recording insertion order.
        let mut waiters = FutexWaiters::default();
        waiters.add(internal_waiter(1, &e1));
        waiters.add(internal_waiter(2, &e2));
        waiters.add(internal_waiter(3, &e3));

        let split = waiters.split_for_requeue(2);

        assert_eq!(split.0.len(), 2);
        assert_eq!(split.0[0].mask, 1);
        assert_eq!(split.0[1].mask, 2);

        assert_eq!(waiters.0.len(), 1);
        assert_eq!(waiters.0[0].mask, 3);
    }

    /// An external waiter whose token has been dropped is purged by an unrelated removal.
    #[fuchsia::test]
    fn test_stale_external_waiter_cleanup() {
        let key = private_key((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let mut state = FutexTableState::<PrivateFutexKey>::default();

        {
            let token = Arc::new(());
            let (sender, _receiver) = oneshot::channel::<()>();
            let notifiable = FutexNotifiable::new_external(Arc::downgrade(&token), sender);
            state
                .get_waiters_or_default(key.clone())
                .add(FutexWaiter { mask: u32::MAX, notifiable });
        } // token is dropped here, so the queued waiter becomes stale

        assert_eq!(state.waiters.len(), 1);

        // Trigger a cleanup with a placeholder event that was never queued.
        let dummy_event = Arc::new(InterruptibleEvent::new());
        state.remove_waiter_from_queue(key, &dummy_event);

        assert_eq!(state.waiters.len(), 0, "Stale external waiter should be removed");
    }
}