Skip to main content

starnix_core/mm/
futex_table.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::mm::memory::MemoryObject;
6use crate::mm::{CompareExchangeResult, ProtectionFlags};
7use crate::task::{CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, Task, Waiter};
8use futures::channel::oneshot;
9use starnix_sync::{InterruptibleEvent, LockBefore, Locked, OrderedMutex, TerminalLock, Unlocked};
10use starnix_types::futex_address::FutexAddress;
11use starnix_uapi::errors::Errno;
12use starnix_uapi::user_address::UserAddress;
13use starnix_uapi::{FUTEX_BITSET_MATCH_ANY, FUTEX_TID_MASK, FUTEX_WAITERS, errno, error};
14use std::collections::hash_map::Entry;
15use std::collections::{HashMap, VecDeque};
16use std::hash::Hash;
17use std::sync::{Arc, Weak};
18
/// A table of futexes.
///
/// Each 32-bit aligned address in an address space can potentially have an associated futex that
/// userspace can wait upon. This table is a sparse representation: a waiter queue exists only
/// for the keys that currently have at least one registered waiter.
pub struct FutexTable<Key: FutexKey> {
    /// The waiter queues (regular and PI futexes) indexed by `Key`.
    ///
    /// The maps inside are populated on-demand when a waiter is registered, and an entry is
    /// removed again once its queue is found empty.
    state: OrderedMutex<FutexTableState<Key>, TerminalLock>,
}
30
31impl<Key: FutexKey> Default for FutexTable<Key> {
32    fn default() -> Self {
33        Self { state: OrderedMutex::new(FutexTableState::default()) }
34    }
35}
36
37impl<Key: FutexKey> FutexTable<Key> {
38    /// Wait on the futex at the given address given a boot deadline.
39    ///
40    /// See FUTEX_WAIT when passed a deadline in CLOCK_REALTIME.
41    pub fn wait_boot(
42        &self,
43        locked: &mut Locked<Unlocked>,
44        current_task: &CurrentTask,
45        addr: UserAddress,
46        value: u32,
47        mask: u32,
48        deadline: zx::BootInstant,
49        timer_slack: zx::BootDuration,
50    ) -> Result<(), Errno> {
51        let addr = FutexAddress::try_from(addr)?;
52        let mut state = self.state.lock(locked);
53        // As the state is locked, no wake can happen before the waiter is registered.
54        // If the addr is remapped, we will read stale data, but we will not miss a futex wake.
55        // Acquire ordering to synchronize with userspace modifications to the value on other
56        // threads.
57        let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
58        if value != loaded_value {
59            return error!(EAGAIN);
60        }
61
62        let key = Key::get(current_task, addr)?;
63        let waiter = Arc::new(Waiter::new());
64        let timer = zx::BootTimer::create();
65        let signal_handler = SignalHandler {
66            inner: SignalHandlerInner::None,
67            event_handler: EventHandler::None,
68            err_code: Some(errno!(ETIMEDOUT)),
69        };
70        waiter
71            .wake_on_zircon_signals(&timer, zx::Signals::TIMER_SIGNALED, signal_handler)
72            .expect("wait can only fail in OOM conditions");
73        timer
74            .set(deadline, timer_slack)
75            .expect("timer set cannot fail with valid handles and slack");
76        state.get_waiters_or_default(key.clone()).add(FutexWaiter {
77            mask,
78            notifiable: FutexNotifiable::new_internal_boot(Arc::downgrade(&waiter)),
79        });
80        std::mem::drop(state);
81        waiter.wait(locked, current_task).inspect_err(|_| {
82            // If wait returned an error (e.g., ETIMEDOUT, EINTR), we must explicitly
83            // remove our waiter from the queue to prevent a memory leak.
84            // If it succeeded, the waker has already removed us from the queue.
85            self.state.lock(locked).remove_boot_waiter_from_queue(key, &waiter);
86        })
87    }
88
89    /// Wait on the futex at the given address.
90    ///
91    /// See FUTEX_WAIT.
92    pub fn wait<L>(
93        &self,
94        locked: &mut Locked<L>,
95        current_task: &CurrentTask,
96        addr: UserAddress,
97        value: u32,
98        mask: u32,
99        deadline: zx::MonotonicInstant,
100    ) -> Result<(), Errno>
101    where
102        L: LockBefore<TerminalLock>,
103    {
104        let addr = FutexAddress::try_from(addr)?;
105        let mut state = self.state.lock(locked);
106        // As the state is locked, no wake can happen before the waiter is registered.
107        // If the addr is remapped, we will read stale data, but we will not miss a futex wake.
108        // Acquire ordering to synchronize with userspace modifications to the value on other
109        // threads.
110        let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
111        if value != loaded_value {
112            return error!(EAGAIN);
113        }
114
115        let key = Key::get(current_task, addr)?;
116        let event = InterruptibleEvent::new();
117        let guard = event.begin_wait();
118        state.get_waiters_or_default(key.clone()).add(FutexWaiter {
119            mask,
120            notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
121        });
122        std::mem::drop(state);
123
124        current_task.block_until(guard, deadline).inspect_err(|_| {
125            // If block_until returned an error (e.g., ETIMEDOUT, EINTR), we must explicitly
126            // remove our waiter from the queue to prevent a memory leak.
127            // If it succeeded, the waker has already removed us from the queue.
128            self.state.lock(locked).remove_waiter_from_queue(key, &event);
129        })
130    }
131
132    /// Wake the given number of waiters on futex at the given address. Returns the number of
133    /// waiters actually woken.
134    ///
135    /// See FUTEX_WAKE.
136    pub fn wake<L>(
137        &self,
138        locked: &mut Locked<L>,
139        task: &Task,
140        addr: UserAddress,
141        count: usize,
142        mask: u32,
143    ) -> Result<usize, Errno>
144    where
145        L: LockBefore<TerminalLock>,
146    {
147        let addr = FutexAddress::try_from(addr)?;
148        let key = Key::get(task, addr)?;
149        Ok(self.state.lock(locked).wake(key, count, mask))
150    }
151
152    /// Requeue the waiters to another address.
153    ///
154    /// See FUTEX_CMP_REQUEUE
155    pub fn requeue<L>(
156        &self,
157        locked: &mut Locked<L>,
158        current_task: &CurrentTask,
159        addr: UserAddress,
160        wake_count: usize,
161        requeue_count: usize,
162        new_addr: UserAddress,
163        expected_value: Option<u32>,
164    ) -> Result<usize, Errno>
165    where
166        L: LockBefore<TerminalLock>,
167    {
168        let addr = FutexAddress::try_from(addr)?;
169        let new_addr = FutexAddress::try_from(new_addr)?;
170        let key = Key::get(current_task, addr)?;
171        let new_key = Key::get(current_task, new_addr)?;
172        let mut state = self.state.lock(locked);
173        if let Some(expected) = expected_value {
174            // Use acquire ordering here to synchronize with mutex impls that store w/ release
175            // ordering.
176            let value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
177            if value != expected {
178                return error!(EAGAIN);
179            }
180        }
181
182        let woken;
183        let to_requeue;
184        match state.waiters.entry(key) {
185            Entry::Vacant(_) => return Ok(0),
186            Entry::Occupied(mut entry) => {
187                // Wake up at most `wake_count` waiters.
188                woken = entry.get_mut().notify(FUTEX_BITSET_MATCH_ANY, wake_count);
189
190                // Dequeue up to `requeue_count` waiters to requeue below.
191                to_requeue = entry.get_mut().split_for_requeue(requeue_count);
192
193                if entry.get().is_empty() {
194                    entry.remove();
195                }
196            }
197        }
198
199        let requeued = to_requeue.0.len();
200        if !to_requeue.is_empty() {
201            state.get_waiters_or_default(new_key).transfer(to_requeue);
202        }
203
204        Ok(woken + requeued)
205    }
206
207    /// Lock the futex at the given address.
208    ///
209    /// See FUTEX_LOCK_PI.
210    pub fn lock_pi<L>(
211        &self,
212        locked: &mut Locked<L>,
213        current_task: &CurrentTask,
214        addr: UserAddress,
215        deadline: zx::MonotonicInstant,
216    ) -> Result<(), Errno>
217    where
218        L: LockBefore<TerminalLock>,
219    {
220        let addr = FutexAddress::try_from(addr)?;
221        let mut state = self.state.lock(locked);
222        // As the state is locked, no unlock can happen before the waiter is registered.
223        // If the addr is remapped, we will read stale data, but we will not miss a futex unlock.
224        let key = Key::get(current_task, addr)?;
225
226        let tid = current_task.get_tid() as u32;
227        let mm = current_task.mm()?;
228
229        // Use a relaxed ordering because the compare/exchange below creates a synchronization
230        // point with userspace threads in the success case. No synchronization is required in
231        // failure cases.
232        let mut current_value = mm.atomic_load_u32_relaxed(addr)?;
233        let new_owner_tid = loop {
234            let new_owner_tid = current_value & FUTEX_TID_MASK;
235            if new_owner_tid == tid {
236                // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
237                //
238                //   EDEADLK
239                //          (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI,
240                //          FUTEX_CMP_REQUEUE_PI) The futex word at uaddr is already
241                //          locked by the caller.
242                return error!(EDEADLOCK);
243            }
244
245            if current_value == 0 {
246                // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops
247                // and with the release ordering on userspace unlock ops.
248                match mm.atomic_compare_exchange_weak_u32_acq_rel(addr, current_value, tid) {
249                    CompareExchangeResult::Success => return Ok(()),
250                    CompareExchangeResult::Stale { observed } => {
251                        current_value = observed;
252                        continue;
253                    }
254                    CompareExchangeResult::Error(e) => return Err(e),
255                }
256            }
257
258            // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops and
259            // with the release ordering on userspace unlock ops.
260            let target_value = current_value | FUTEX_WAITERS;
261            match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
262                CompareExchangeResult::Success => (),
263                CompareExchangeResult::Stale { observed } => {
264                    current_value = observed;
265                    continue;
266                }
267                CompareExchangeResult::Error(e) => return Err(e),
268            }
269            break new_owner_tid;
270        };
271
272        let event = InterruptibleEvent::new();
273        let guard = event.begin_wait();
274        let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event));
275        state
276            .get_rt_mutex_waiters_or_default(key.clone())
277            .push_back(RtMutexWaiter { tid, notifiable });
278        std::mem::drop(state);
279
280        // ESRCH  (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI,
281        //        FUTEX_CMP_REQUEUE_PI) The thread ID in the futex word at
282        //        uaddr does not exist.
283        current_task
284            .get_task(new_owner_tid as i32)
285            .upgrade()
286            .and_then(|o| o.thread.read().as_ref().map(Arc::clone))
287            .map_or_else(
288                || error!(ESRCH),
289                |owner| current_task.block_with_owner_until(guard, &owner, deadline),
290            )
291            .inspect_err(|_| {
292                // If block_with_owner_until returned an error (e.g., ETIMEDOUT), or if we
293                // failed to find the new owner (ESRCH), we must explicitly remove our waiter
294                // from the PI-mutex queue to prevent a memory leak.
295                self.state.lock(locked).remove_rt_mutex_waiter_from_queue(key, &event);
296            })
297    }
298
299    /// Unlock the futex at the given address.
300    ///
301    /// See FUTEX_UNLOCK_PI.
302    pub fn unlock_pi<L>(
303        &self,
304        locked: &mut Locked<L>,
305        current_task: &CurrentTask,
306        addr: UserAddress,
307    ) -> Result<(), Errno>
308    where
309        L: LockBefore<TerminalLock>,
310    {
311        let addr = FutexAddress::try_from(addr)?;
312        let mut state = self.state.lock(locked);
313        let tid = current_task.get_tid() as u32;
314        let mm = current_task.mm()?;
315
316        let key = Key::get(current_task, addr)?;
317
318        // Use a relaxed ordering because the compare/exchange below creates a synchronization
319        // point with userspace threads in the success case. No synchronization is required in
320        // failure cases.
321        let current_value = mm.atomic_load_u32_relaxed(addr)?;
322        if current_value & FUTEX_TID_MASK != tid {
323            // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
324            //
325            //   EPERM  (FUTEX_UNLOCK_PI) The caller does not own the lock
326            //          represented by the futex word.
327            return error!(EPERM);
328        }
329
330        loop {
331            let maybe_waiter = state.pop_rt_mutex_waiter(key.clone());
332            let target_value = if let Some(waiter) = &maybe_waiter { waiter.tid } else { 0 };
333
334            // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops and
335            // with the release ordering on userspace unlock ops.
336            match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
337                CompareExchangeResult::Success => (),
338                // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
339                //
340                //   EINVAL (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI,
341                //       FUTEX_UNLOCK_PI) The kernel detected an inconsistency
342                //       between the user-space state at uaddr and the kernel
343                //       state.  This indicates either state corruption or that the
344                //       kernel found a waiter on uaddr which is waiting via
345                //       FUTEX_WAIT or FUTEX_WAIT_BITSET.
346                CompareExchangeResult::Stale { .. } => return error!(EINVAL),
347                // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
348                //
349                //   EACCES No read access to the memory of a futex word.
350                CompareExchangeResult::Error(_) => return error!(EACCES),
351            }
352
353            let Some(mut waiter) = maybe_waiter else {
354                // We can stop trying to notify a thread if there are no more waiters.
355                break;
356            };
357
358            if waiter.notifiable.notify() {
359                break;
360            }
361
362            // If we couldn't notify the waiter, then we need to pull the next thread off the
363            // waiter list.
364        }
365
366        Ok(())
367    }
368}
369
370impl FutexTable<SharedFutexKey> {
371    /// Wait on the futex at the given offset in the memory.
372    ///
373    /// Returns a receiver that will be signaled when the futex is woken, and an
374    /// `Arc<()>` token that must be kept alive by the caller for the duration of the
375    /// wait. If the caller drops the token (e.g., if the external client
376    /// disconnects), the waiter is marked as stale and will be garbage-collected by the
377    /// next futex operation on this table.
378    ///
379    /// See FUTEX_WAIT.
380    pub fn external_wait<L>(
381        &self,
382        locked: &mut Locked<L>,
383        memory: MemoryObject,
384        offset: u64,
385        value: u32,
386        mask: u32,
387    ) -> Result<(Arc<()>, oneshot::Receiver<()>), Errno>
388    where
389        L: LockBefore<TerminalLock>,
390    {
391        let key = SharedFutexKey::new(&memory, offset);
392        let mut state = self.state.lock(locked);
393        // As the state is locked, no wake can happen before the waiter is registered.
394        Self::external_check_futex_value(&memory, offset, value)?;
395
396        let token = Arc::new(());
397        let (sender, receiver) = oneshot::channel::<()>();
398        state.get_waiters_or_default(key).add(FutexWaiter {
399            mask,
400            notifiable: FutexNotifiable::new_external(Arc::downgrade(&token), sender),
401        });
402        Ok((token, receiver))
403    }
404
405    /// Wake the given number of waiters on futex at the given offset in the memory. Returns the
406    /// number of waiters actually woken.
407    ///
408    /// See FUTEX_WAKE.
409    pub fn external_wake<L>(
410        &self,
411        locked: &mut Locked<L>,
412        memory: MemoryObject,
413        offset: u64,
414        count: usize,
415        mask: u32,
416    ) -> Result<usize, Errno>
417    where
418        L: LockBefore<TerminalLock>,
419    {
420        Ok(self.state.lock(locked).wake(SharedFutexKey::new(&memory, offset), count, mask))
421    }
422
423    fn external_check_futex_value(
424        memory: &MemoryObject,
425        offset: u64,
426        value: u32,
427    ) -> Result<(), Errno> {
428        let loaded_value = {
429            // TODO: This read should be atomic.
430            let mut buf = [0u8; 4];
431            memory.read(&mut buf, offset).map_err(|_| errno!(EINVAL))?;
432            u32::from_ne_bytes(buf)
433        };
434        if loaded_value != value {
435            return error!(EAGAIN);
436        }
437        Ok(())
438    }
439}
440
/// A key identifying one futex inside a `FutexTable`.
///
/// Implementations decide how a futex address is turned into a table key and which table
/// instance serves a given task.
pub trait FutexKey: Sized + Ord + Hash + Clone {
    /// Computes the key of the futex located at `addr` as seen by `task`.
    fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno>;
    /// Returns the futex table that stores keys of this type for `task`.
    fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno>;
}
445
/// Key for a futex that is identified by its address alone.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct PrivateFutexKey {
    /// The address of the futex word.
    addr: FutexAddress,
}
450
451impl FutexKey for PrivateFutexKey {
452    fn get(_task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
453        Ok(PrivateFutexKey { addr })
454    }
455
456    fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
457        Ok(task.mm()?.futex.clone())
458    }
459}
460
/// Key for a futex that is identified by the memory object backing it and the offset of the
/// futex word inside that object.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct SharedFutexKey {
    // No chance of collisions since koids are never reused:
    // https://fuchsia.dev/fuchsia-src/concepts/kernel/concepts#kernel_object_ids
    /// The koid of the memory object that contains the futex word.
    koid: zx::Koid,
    /// The offset of the futex word inside the memory object.
    offset: u64,
}
468
469impl FutexKey for SharedFutexKey {
470    fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
471        let (memory, offset) = task.mm()?.get_mapping_memory(addr.into(), ProtectionFlags::READ)?;
472        Ok(SharedFutexKey::new(&memory, offset))
473    }
474
475    fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
476        Ok(task.kernel().shared_futexes.clone())
477    }
478}
479
480impl SharedFutexKey {
481    fn new(memory: &MemoryObject, offset: u64) -> Self {
482        Self { koid: memory.get_koid(), offset }
483    }
484}
485
/// The mutable content of a `FutexTable`.
struct FutexTableState<Key: FutexKey> {
    /// Waiters registered through the regular wait operations, per futex.
    waiters: HashMap<Key, FutexWaiters>,
    /// Waiters registered through `lock_pi`, per futex, in arrival order.
    rt_mutex_waiters: HashMap<Key, VecDeque<RtMutexWaiter>>,
}
490
491impl<Key: FutexKey> Default for FutexTableState<Key> {
492    fn default() -> Self {
493        Self { waiters: Default::default(), rt_mutex_waiters: Default::default() }
494    }
495}
496
497impl<Key: FutexKey> FutexTableState<Key> {
498    /// Returns the FutexWaiters for a given address, creating an empty one if none is registered.
499    fn get_waiters_or_default(&mut self, key: Key) -> &mut FutexWaiters {
500        self.waiters.entry(key).or_default()
501    }
502
503    fn wake(&mut self, key: Key, count: usize, mask: u32) -> usize {
504        let entry = self.waiters.entry(key);
505        match entry {
506            Entry::Vacant(_) => 0,
507            Entry::Occupied(mut entry) => {
508                let count = entry.get_mut().notify(mask, count);
509                if entry.get().is_empty() {
510                    entry.remove();
511                }
512                count
513            }
514        }
515    }
516
517    /// Returns the RT-Mutex waiters queue for a given address, creating an empty queue if none is
518    /// registered.
519    fn get_rt_mutex_waiters_or_default(&mut self, key: Key) -> &mut VecDeque<RtMutexWaiter> {
520        self.rt_mutex_waiters.entry(key).or_default()
521    }
522
523    /// Pop the next RT-Mutex for the given address.
524    fn pop_rt_mutex_waiter(&mut self, key: Key) -> Option<RtMutexWaiter> {
525        let entry = self.rt_mutex_waiters.entry(key);
526        match entry {
527            Entry::Vacant(_) => None,
528            Entry::Occupied(mut entry) => {
529                let mut waiter = entry.get_mut().pop_front();
530                // Clean up the hash map entry if the queue is empty. We do this
531                // regardless of whether `pop_front` returned a waiter or `None`,
532                // effectively garbage collecting erroneously empty map entries.
533                if entry.get().is_empty() {
534                    entry.remove();
535                } else if let Some(waiter) = &mut waiter {
536                    waiter.tid |= FUTEX_WAITERS;
537                }
538                waiter
539            }
540        }
541    }
542
543    /// Removes a standard `FUTEX_WAIT` waiter from the queue.
544    ///
545    /// This uses a two-step approach:
546    /// 1. O(1) Fast path: Check the `key` where the waiter originally went to sleep.
547    /// 2. O(N) Fallback: If not found (e.g. moved via `FUTEX_REQUEUE`), scan all futexes.
548    fn remove_waiter_from_queue(&mut self, key: Key, event: &Arc<InterruptibleEvent>) {
549        if let Entry::Occupied(mut entry) = self.waiters.entry(key) {
550            if entry.get_mut().remove_waiter(event) {
551                if entry.get().is_empty() {
552                    entry.remove();
553                }
554                return;
555            }
556        }
557
558        let mut key_to_remove = None;
559        for (key, waiters) in self.waiters.iter_mut() {
560            if waiters.remove_waiter(event) {
561                if waiters.is_empty() {
562                    key_to_remove = Some(key.clone());
563                }
564                break;
565            }
566        }
567        if let Some(key) = key_to_remove {
568            self.waiters.remove(&key);
569        }
570    }
571
572    /// Removes a `FUTEX_WAIT_BITSET` waiter (with `FUTEX_CLOCK_REALTIME`).
573    ///
574    /// Like `remove_waiter_from_queue`, it tries the fast O(1) lookup on the original `key` first,
575    /// and falls back to an O(N) scan across all queues in case of a requeue.
576    fn remove_boot_waiter_from_queue(&mut self, key: Key, waiter: &Arc<Waiter>) {
577        if let Entry::Occupied(mut entry) = self.waiters.entry(key) {
578            if entry.get_mut().remove_boot_waiter(waiter) {
579                if entry.get().is_empty() {
580                    entry.remove();
581                }
582                return;
583            }
584        }
585
586        let mut key_to_remove = None;
587        for (key, waiters) in self.waiters.iter_mut() {
588            if waiters.remove_boot_waiter(waiter) {
589                if waiters.is_empty() {
590                    key_to_remove = Some(key.clone());
591                }
592                break;
593            }
594        }
595        if let Some(key) = key_to_remove {
596            self.waiters.remove(&key);
597        }
598    }
599
600    /// Removes a PI-mutex (`FUTEX_LOCK_PI`) waiter.
601    ///
602    /// Operates on the separate `rt_mutex_waiters` map using the same two-step
603    /// O(1)/O(N) algorithm as the other removal methods to handle edge cases where
604    /// PI-mutexes might be requeued (e.g. if `FUTEX_CMP_REQUEUE_PI` is used).
605    fn remove_rt_mutex_waiter_from_queue(&mut self, key: Key, event: &Arc<InterruptibleEvent>) {
606        let predicate =
607            |w: &RtMutexWaiter| !w.notifiable.matches_event(event) && !w.notifiable.is_stale();
608
609        if let Entry::Occupied(mut entry) = self.rt_mutex_waiters.entry(key) {
610            let len_before = entry.get().len();
611            entry.get_mut().retain(predicate);
612            if entry.get().len() < len_before {
613                if entry.get().is_empty() {
614                    entry.remove();
615                }
616                return;
617            }
618        }
619
620        let mut key_to_remove = None;
621        for (key, waiters) in self.rt_mutex_waiters.iter_mut() {
622            let len_before = waiters.len();
623            waiters.retain(predicate);
624            if waiters.len() < len_before {
625                if waiters.is_empty() {
626                    key_to_remove = Some(key.clone());
627                }
628                break;
629            }
630        }
631        if let Some(key) = key_to_remove {
632            self.rt_mutex_waiters.remove(&key);
633        }
634    }
635}
636
/// Abstraction over a process waiting on a Futex that can be notified.
///
/// Every variant only holds a weak reference to the waiting side; once the last strong reference
/// is gone the entry is considered stale.
enum FutexNotifiable {
    /// An internal process waiting on a Futex.
    Internal(Weak<InterruptibleEvent>),
    /// An internal process waiting on a Futex with a boot deadline.
    InternalBoot(Weak<Waiter>),
    /// An external process waiting on a Futex.
    ///
    /// The `Weak<()>` observes the token handed to the external caller; the sender delivers the
    /// wake-up.
    // The sender needs to be an option so that one can send the notification while only holding a
    // mut reference on the ExternalWaiter.
    External(Weak<()>, Option<oneshot::Sender<()>>),
}
648
649impl FutexNotifiable {
650    fn new_internal(event: Weak<InterruptibleEvent>) -> Self {
651        Self::Internal(event)
652    }
653
654    fn new_internal_boot(waiter: Weak<Waiter>) -> Self {
655        Self::InternalBoot(waiter)
656    }
657
658    fn new_external(token: Weak<()>, sender: oneshot::Sender<()>) -> Self {
659        Self::External(token, Some(sender))
660    }
661
662    /// Tries to notify the process. Returns `true` is the process have been notified. Returns
663    /// `false` otherwise. This means the process is stale and will never be available again.
664    fn notify(&mut self) -> bool {
665        match self {
666            Self::Internal(event) => {
667                if let Some(event) = event.upgrade() {
668                    event.notify();
669                    true
670                } else {
671                    false
672                }
673            }
674            Self::InternalBoot(waiter) => {
675                if let Some(waiter) = waiter.upgrade() {
676                    waiter.notify();
677                    true
678                } else {
679                    false
680                }
681            }
682            Self::External(_, sender) => {
683                if let Some(sender) = sender.take() {
684                    sender.send(()).is_ok()
685                } else {
686                    false
687                }
688            }
689        }
690    }
691
692    fn matches_event(&self, event: &Arc<InterruptibleEvent>) -> bool {
693        match self {
694            Self::Internal(weak) => {
695                if let Some(strong) = weak.upgrade() {
696                    Arc::ptr_eq(&strong, event)
697                } else {
698                    false
699                }
700            }
701            _ => false,
702        }
703    }
704
705    fn matches_waiter(&self, waiter: &Arc<Waiter>) -> bool {
706        match self {
707            Self::InternalBoot(weak) => {
708                if let Some(strong) = weak.upgrade() {
709                    Arc::ptr_eq(&strong, waiter)
710                } else {
711                    false
712                }
713            }
714            _ => false,
715        }
716    }
717
718    fn is_stale(&self) -> bool {
719        match self {
720            Self::Internal(weak) => weak.strong_count() == 0,
721            Self::External(weak, _) => weak.strong_count() == 0,
722            Self::InternalBoot(weak) => weak.strong_count() == 0,
723        }
724    }
725}
726
/// One entry of a futex wait queue.
struct FutexWaiter {
    /// The bitset of the waiter; a wake selects it only if the wake mask shares a bit with it.
    mask: u32,
    /// The handle used to wake the waiter up.
    notifiable: FutexNotifiable,
}
731
/// The queue of waiters of a single futex, oldest first.
#[derive(Default)]
struct FutexWaiters(VecDeque<FutexWaiter>);
734
735impl FutexWaiters {
736    fn add(&mut self, waiter: FutexWaiter) {
737        self.0.push_back(waiter);
738    }
739
740    fn notify(&mut self, mask: u32, count: usize) -> usize {
741        let mut woken = 0;
742        self.0.retain_mut(|waiter| {
743            if woken == count || waiter.mask & mask == 0 {
744                return true;
745            }
746            // The send will fail if the receiver is gone, which means nothing was actualling
747            // waiting on the futex.
748            if waiter.notifiable.notify() {
749                woken += 1;
750            }
751            false
752        });
753        woken
754    }
755
756    fn transfer(&mut self, mut other: Self) {
757        self.0.append(&mut other.0);
758    }
759
760    fn is_empty(&self) -> bool {
761        self.0.is_empty()
762    }
763
764    fn remove_waiter(&mut self, event: &Arc<InterruptibleEvent>) -> bool {
765        let initial_len = self.0.len();
766        self.0.retain(|w| !w.notifiable.matches_event(event) && !w.notifiable.is_stale());
767        self.0.len() < initial_len
768    }
769
770    fn remove_boot_waiter(&mut self, waiter: &Arc<Waiter>) -> bool {
771        let initial_len = self.0.len();
772        self.0.retain(|w| !w.notifiable.matches_waiter(waiter) && !w.notifiable.is_stale());
773        self.0.len() < initial_len
774    }
775
776    fn split_for_requeue(&mut self, count: usize) -> Self {
777        let count = std::cmp::min(count, self.0.len());
778        let tail = self.0.split_off(count);
779        let head = std::mem::replace(&mut self.0, tail);
780        FutexWaiters(head)
781    }
782}
783
/// One entry of a PI-futex wait queue.
struct RtMutexWaiter {
    /// The tid, possibly with the FUTEX_WAITERS bit set.
    tid: u32,

    /// The handle used to wake the waiter up when the futex is handed over to it.
    notifiable: FutexNotifiable,
}
790
#[cfg(test)]
mod tests {
    use super::*;
    use starnix_sync::InterruptibleEvent;
    use starnix_uapi::restricted_aspace::RESTRICTED_ASPACE_BASE;
    use starnix_uapi::user_address::UserAddress;

    /// Builds a `PrivateFutexKey` for the raw user address `addr`.
    ///
    /// Panics if `addr` cannot be converted into a `FutexAddress`.
    fn private_key(addr: u64) -> PrivateFutexKey {
        let addr = FutexAddress::try_from(UserAddress::from(addr)).unwrap();
        PrivateFutexKey { addr }
    }

    /// Builds a `FutexWaiter` with the given `mask` whose notifiable holds a weak reference to
    /// `event`.
    fn internal_waiter(mask: u32, event: &Arc<InterruptibleEvent>) -> FutexWaiter {
        FutexWaiter { mask, notifiable: FutexNotifiable::new_internal(Arc::downgrade(event)) }
    }

    /// A waiter removed through the key it was queued under leaves no queue behind.
    #[fuchsia::test]
    fn test_remove_waiter_simple() {
        let key = private_key((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let event = Arc::new(InterruptibleEvent::new());
        let mut state = FutexTableState::<PrivateFutexKey>::default();

        state.get_waiters_or_default(key.clone()).add(internal_waiter(u32::MAX, &event));
        assert_eq!(state.waiters.len(), 1);

        state.remove_waiter_from_queue(key, &event);
        assert_eq!(state.waiters.len(), 0);
    }

    /// A waiter queued under one key is still removed when removal is requested through a
    /// different key.
    #[fuchsia::test]
    fn test_remove_waiter_requeued() {
        let key1 = private_key((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let key2 = private_key((RESTRICTED_ASPACE_BASE + 0x2000) as u64);
        let event = Arc::new(InterruptibleEvent::new());
        let mut state = FutexTableState::<PrivateFutexKey>::default();

        // The waiter lives under `key2` only.
        state.get_waiters_or_default(key2.clone()).add(internal_waiter(u32::MAX, &event));
        assert_eq!(state.waiters.len(), 1);

        // Removal is requested via `key1`, yet the table must end up empty.
        state.remove_waiter_from_queue(key1, &event);
        assert_eq!(state.waiters.len(), 0);
    }

    /// Removing the only rt-mutex waiter for a key leaves no rt-mutex queue behind.
    #[fuchsia::test]
    fn test_remove_rt_mutex_waiter() {
        let key = private_key((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let event = Arc::new(InterruptibleEvent::new());
        let mut state = FutexTableState::<PrivateFutexKey>::default();

        let rt_waiter = RtMutexWaiter {
            tid: 1,
            notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
        };
        state.get_rt_mutex_waiters_or_default(key.clone()).push_back(rt_waiter);
        assert_eq!(state.rt_mutex_waiters.len(), 1);

        state.remove_rt_mutex_waiter_from_queue(key, &event);
        assert_eq!(state.rt_mutex_waiters.len(), 0);
    }

    /// `split_for_requeue` hands out the earliest-added waiters first and keeps the rest.
    #[fuchsia::test]
    fn test_split_for_requeue_fairness() {
        let e1 = Arc::new(InterruptibleEvent::new());
        let e2 = Arc::new(InterruptibleEvent::new());
        let e3 = Arc::new(InterruptibleEvent::new());

        // The mask doubles as a label recording insertion order.
        let mut waiters = FutexWaiters::default();
        waiters.add(internal_waiter(1, &e1));
        waiters.add(internal_waiter(2, &e2));
        waiters.add(internal_waiter(3, &e3));

        let split = waiters.split_for_requeue(2);

        assert_eq!(split.0.len(), 2);
        assert_eq!(split.0[0].mask, 1);
        assert_eq!(split.0[1].mask, 2);

        assert_eq!(waiters.0.len(), 1);
        assert_eq!(waiters.0[0].mask, 3);
    }

    /// An external waiter whose token has been dropped is pruned by an unrelated removal.
    #[fuchsia::test]
    fn test_stale_external_waiter_cleanup() {
        let key = private_key((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let mut state = FutexTableState::<PrivateFutexKey>::default();

        {
            let token = Arc::new(());
            let (sender, _receiver) = oneshot::channel::<()>();
            let stale_waiter = FutexWaiter {
                mask: u32::MAX,
                notifiable: FutexNotifiable::new_external(Arc::downgrade(&token), sender),
            };
            state.get_waiters_or_default(key.clone()).add(stale_waiter);
        } // `token` is dropped here, so the waiter becomes stale.

        assert_eq!(state.waiters.len(), 1);

        // Trigger a cleanup pass using an event that is not queued anywhere.
        let dummy_event = Arc::new(InterruptibleEvent::new());
        state.remove_waiter_from_queue(key, &dummy_event);

        assert_eq!(state.waiters.len(), 0, "Stale external waiter should be removed");
    }
}