starnix_core/mm/
futex_table.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::mm::memory::MemoryObject;
6use crate::mm::{CompareExchangeResult, ProtectionFlags};
7use crate::task::{CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, Task, Waiter};
8use futures::channel::oneshot;
9use starnix_sync::{InterruptibleEvent, LockBefore, Locked, OrderedMutex, TerminalLock, Unlocked};
10use starnix_types::futex_address::FutexAddress;
11use starnix_uapi::errors::Errno;
12use starnix_uapi::user_address::UserAddress;
13use starnix_uapi::{FUTEX_BITSET_MATCH_ANY, FUTEX_TID_MASK, FUTEX_WAITERS, errno, error};
14use std::collections::hash_map::Entry;
15use std::collections::{HashMap, VecDeque};
16use std::hash::Hash;
17use std::sync::{Arc, Weak};
18
/// A table of futexes.
///
/// Each 32-bit aligned address in an address space can potentially have an associated futex that
/// userspace can wait upon. This table is a sparse representation: a waiter queue is created on
/// demand the first time a waiter is registered for a key, and it is removed again when a wake or
/// a requeue leaves it empty.
///
/// `Key` selects how a futex is identified; see [`PrivateFutexKey`] and [`SharedFutexKey`].
pub struct FutexTable<Key: FutexKey> {
    /// The waiter queues associated with each futex key, for both regular futexes and
    /// priority-inheritance futexes.
    ///
    /// The maps inside are populated on-demand when futexes are used. Every operation on the
    /// table takes this lock, which is ordered at the `TerminalLock` level.
    state: OrderedMutex<FutexTableState<Key>, TerminalLock>,
}
30
31impl<Key: FutexKey> Default for FutexTable<Key> {
32    fn default() -> Self {
33        Self { state: OrderedMutex::new(FutexTableState::default()) }
34    }
35}
36
37impl<Key: FutexKey> FutexTable<Key> {
38    /// Wait on the futex at the given address given a boot deadline.
39    ///
40    /// See FUTEX_WAIT when passed a deadline in CLOCK_REALTIME.
41    pub fn wait_boot(
42        &self,
43        locked: &mut Locked<Unlocked>,
44        current_task: &CurrentTask,
45        addr: UserAddress,
46        value: u32,
47        mask: u32,
48        deadline: zx::BootInstant,
49        timer_slack: zx::BootDuration,
50    ) -> Result<(), Errno> {
51        let addr = FutexAddress::try_from(addr)?;
52        let mut state = self.state.lock(locked);
53        // As the state is locked, no wake can happen before the waiter is registered.
54        // If the addr is remapped, we will read stale data, but we will not miss a futex wake.
55        // Acquire ordering to synchronize with userspace modifications to the value on other
56        // threads.
57        let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
58        if value != loaded_value {
59            return error!(EAGAIN);
60        }
61
62        let key = Key::get(current_task, addr)?;
63        let waiter = Arc::new(Waiter::new());
64        let timer = zx::BootTimer::create();
65        let signal_handler = SignalHandler {
66            inner: SignalHandlerInner::None,
67            event_handler: EventHandler::None,
68            err_code: Some(errno!(ETIMEDOUT)),
69        };
70        waiter
71            .wake_on_zircon_signals(&timer, zx::Signals::TIMER_SIGNALED, signal_handler)
72            .expect("wait can only fail in OOM conditions");
73        timer
74            .set(deadline, timer_slack)
75            .expect("timer set cannot fail with valid handles and slack");
76        state.get_waiters_or_default(key).add(FutexWaiter {
77            mask,
78            notifiable: FutexNotifiable::new_internal_boot(Arc::downgrade(&waiter)),
79        });
80        std::mem::drop(state);
81        waiter.wait(locked, current_task)
82    }
83
84    /// Wait on the futex at the given address.
85    ///
86    /// See FUTEX_WAIT.
87    pub fn wait<L>(
88        &self,
89        locked: &mut Locked<L>,
90        current_task: &CurrentTask,
91        addr: UserAddress,
92        value: u32,
93        mask: u32,
94        deadline: zx::MonotonicInstant,
95    ) -> Result<(), Errno>
96    where
97        L: LockBefore<TerminalLock>,
98    {
99        let addr = FutexAddress::try_from(addr)?;
100        let mut state = self.state.lock(locked);
101        // As the state is locked, no wake can happen before the waiter is registered.
102        // If the addr is remapped, we will read stale data, but we will not miss a futex wake.
103        // Acquire ordering to synchronize with userspace modifications to the value on other
104        // threads.
105        let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
106        if value != loaded_value {
107            return error!(EAGAIN);
108        }
109
110        let key = Key::get(current_task, addr)?;
111        let event = InterruptibleEvent::new();
112        let guard = event.begin_wait();
113        state.get_waiters_or_default(key).add(FutexWaiter {
114            mask,
115            notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)),
116        });
117        std::mem::drop(state);
118
119        current_task.block_until(guard, deadline)
120    }
121
122    /// Wake the given number of waiters on futex at the given address. Returns the number of
123    /// waiters actually woken.
124    ///
125    /// See FUTEX_WAKE.
126    pub fn wake<L>(
127        &self,
128        locked: &mut Locked<L>,
129        task: &Task,
130        addr: UserAddress,
131        count: usize,
132        mask: u32,
133    ) -> Result<usize, Errno>
134    where
135        L: LockBefore<TerminalLock>,
136    {
137        let addr = FutexAddress::try_from(addr)?;
138        let key = Key::get(task, addr)?;
139        Ok(self.state.lock(locked).wake(key, count, mask))
140    }
141
142    /// Requeue the waiters to another address.
143    ///
144    /// See FUTEX_CMP_REQUEUE
145    pub fn requeue<L>(
146        &self,
147        locked: &mut Locked<L>,
148        current_task: &CurrentTask,
149        addr: UserAddress,
150        wake_count: usize,
151        requeue_count: usize,
152        new_addr: UserAddress,
153        expected_value: Option<u32>,
154    ) -> Result<usize, Errno>
155    where
156        L: LockBefore<TerminalLock>,
157    {
158        let addr = FutexAddress::try_from(addr)?;
159        let new_addr = FutexAddress::try_from(new_addr)?;
160        let key = Key::get(current_task, addr)?;
161        let new_key = Key::get(current_task, new_addr)?;
162        let mut state = self.state.lock(locked);
163        if let Some(expected) = expected_value {
164            // Use acquire ordering here to synchronize with mutex impls that store w/ release
165            // ordering.
166            let value = current_task.mm()?.atomic_load_u32_acquire(addr)?;
167            if value != expected {
168                return error!(EAGAIN);
169            }
170        }
171
172        let woken;
173        let to_requeue;
174        match state.waiters.entry(key) {
175            Entry::Vacant(_) => return Ok(0),
176            Entry::Occupied(mut entry) => {
177                // Wake up at most `wake_count` waiters.
178                woken = entry.get_mut().notify(FUTEX_BITSET_MATCH_ANY, wake_count);
179
180                // Dequeue up to `requeue_count` waiters to requeue below.
181                to_requeue = entry.get_mut().split_for_requeue(requeue_count);
182
183                if entry.get().is_empty() {
184                    entry.remove();
185                }
186            }
187        }
188
189        let requeued = to_requeue.0.len();
190        if !to_requeue.is_empty() {
191            state.get_waiters_or_default(new_key).transfer(to_requeue);
192        }
193
194        Ok(woken + requeued)
195    }
196
197    /// Lock the futex at the given address.
198    ///
199    /// See FUTEX_LOCK_PI.
200    pub fn lock_pi<L>(
201        &self,
202        locked: &mut Locked<L>,
203        current_task: &CurrentTask,
204        addr: UserAddress,
205        deadline: zx::MonotonicInstant,
206    ) -> Result<(), Errno>
207    where
208        L: LockBefore<TerminalLock>,
209    {
210        let addr = FutexAddress::try_from(addr)?;
211        let mut state = self.state.lock(locked);
212        // As the state is locked, no unlock can happen before the waiter is registered.
213        // If the addr is remapped, we will read stale data, but we will not miss a futex unlock.
214        let key = Key::get(current_task, addr)?;
215
216        let tid = current_task.get_tid() as u32;
217        let mm = current_task.mm()?;
218
219        // Use a relaxed ordering because the compare/exchange below creates a synchronization
220        // point with userspace threads in the success case. No synchronization is required in
221        // failure cases.
222        let mut current_value = mm.atomic_load_u32_relaxed(addr)?;
223        let new_owner_tid = loop {
224            let new_owner_tid = current_value & FUTEX_TID_MASK;
225            if new_owner_tid == tid {
226                // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
227                //
228                //   EDEADLK
229                //          (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI,
230                //          FUTEX_CMP_REQUEUE_PI) The futex word at uaddr is already
231                //          locked by the caller.
232                return error!(EDEADLOCK);
233            }
234
235            if current_value == 0 {
236                // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops
237                // and with the release ordering on userspace unlock ops.
238                match mm.atomic_compare_exchange_weak_u32_acq_rel(addr, current_value, tid) {
239                    CompareExchangeResult::Success => return Ok(()),
240                    CompareExchangeResult::Stale { observed } => {
241                        current_value = observed;
242                        continue;
243                    }
244                    CompareExchangeResult::Error(e) => return Err(e),
245                }
246            }
247
248            // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops and
249            // with the release ordering on userspace unlock ops.
250            let target_value = current_value | FUTEX_WAITERS;
251            match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
252                CompareExchangeResult::Success => (),
253                CompareExchangeResult::Stale { observed } => {
254                    current_value = observed;
255                    continue;
256                }
257                CompareExchangeResult::Error(e) => return Err(e),
258            }
259            break new_owner_tid;
260        };
261
262        let event = InterruptibleEvent::new();
263        let guard = event.begin_wait();
264        let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event));
265        state.get_rt_mutex_waiters_or_default(key).push_back(RtMutexWaiter { tid, notifiable });
266        std::mem::drop(state);
267
268        // ESRCH  (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI,
269        //        FUTEX_CMP_REQUEUE_PI) The thread ID in the futex word at
270        //        uaddr does not exist.
271        let new_owner = current_task
272            .get_task(new_owner_tid as i32)
273            .upgrade()
274            .map(|o| o.thread.read().as_ref().map(Arc::clone))
275            .flatten()
276            .ok_or_else(|| errno!(ESRCH))?;
277        current_task.block_with_owner_until(guard, &new_owner, deadline)
278    }
279
280    /// Unlock the futex at the given address.
281    ///
282    /// See FUTEX_UNLOCK_PI.
283    pub fn unlock_pi<L>(
284        &self,
285        locked: &mut Locked<L>,
286        current_task: &CurrentTask,
287        addr: UserAddress,
288    ) -> Result<(), Errno>
289    where
290        L: LockBefore<TerminalLock>,
291    {
292        let addr = FutexAddress::try_from(addr)?;
293        let mut state = self.state.lock(locked);
294        let tid = current_task.get_tid() as u32;
295        let mm = current_task.mm()?;
296
297        let key = Key::get(current_task, addr)?;
298
299        // Use a relaxed ordering because the compare/exchange below creates a synchronization
300        // point with userspace threads in the success case. No synchronization is required in
301        // failure cases.
302        let current_value = mm.atomic_load_u32_relaxed(addr)?;
303        if current_value & FUTEX_TID_MASK != tid {
304            // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
305            //
306            //   EPERM  (FUTEX_UNLOCK_PI) The caller does not own the lock
307            //          represented by the futex word.
308            return error!(EPERM);
309        }
310
311        loop {
312            let maybe_waiter = state.pop_rt_mutex_waiter(key.clone());
313            let target_value = if let Some(waiter) = &maybe_waiter { waiter.tid } else { 0 };
314
315            // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops and
316            // with the release ordering on userspace unlock ops.
317            match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) {
318                CompareExchangeResult::Success => (),
319                // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
320                //
321                //   EINVAL (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI,
322                //       FUTEX_UNLOCK_PI) The kernel detected an inconsistency
323                //       between the user-space state at uaddr and the kernel
324                //       state.  This indicates either state corruption or that the
325                //       kernel found a waiter on uaddr which is waiting via
326                //       FUTEX_WAIT or FUTEX_WAIT_BITSET.
327                CompareExchangeResult::Stale { .. } => return error!(EINVAL),
328                // From <https://man7.org/linux/man-pages/man2/futex.2.html>:
329                //
330                //   EACCES No read access to the memory of a futex word.
331                CompareExchangeResult::Error(_) => return error!(EACCES),
332            }
333
334            let Some(mut waiter) = maybe_waiter else {
335                // We can stop trying to notify a thread if there are no more waiters.
336                break;
337            };
338
339            if waiter.notifiable.notify() {
340                break;
341            }
342
343            // If we couldn't notify the waiter, then we need to pull the next thread off the
344            // waiter list.
345        }
346
347        Ok(())
348    }
349}
350
351impl FutexTable<SharedFutexKey> {
352    /// Wait on the futex at the given offset in the memory.
353    ///
354    /// See FUTEX_WAIT.
355    pub fn external_wait<L>(
356        &self,
357        locked: &mut Locked<L>,
358        memory: MemoryObject,
359        offset: u64,
360        value: u32,
361        mask: u32,
362    ) -> Result<oneshot::Receiver<()>, Errno>
363    where
364        L: LockBefore<TerminalLock>,
365    {
366        let key = SharedFutexKey::new(&memory, offset);
367        let mut state = self.state.lock(locked);
368        // As the state is locked, no wake can happen before the waiter is registered.
369        Self::external_check_futex_value(&memory, offset, value)?;
370
371        let (sender, receiver) = oneshot::channel::<()>();
372        state
373            .get_waiters_or_default(key)
374            .add(FutexWaiter { mask, notifiable: FutexNotifiable::new_external(sender) });
375        Ok(receiver)
376    }
377
378    /// Wake the given number of waiters on futex at the given offset in the memory. Returns the
379    /// number of waiters actually woken.
380    ///
381    /// See FUTEX_WAKE.
382    pub fn external_wake<L>(
383        &self,
384        locked: &mut Locked<L>,
385        memory: MemoryObject,
386        offset: u64,
387        count: usize,
388        mask: u32,
389    ) -> Result<usize, Errno>
390    where
391        L: LockBefore<TerminalLock>,
392    {
393        Ok(self.state.lock(locked).wake(SharedFutexKey::new(&memory, offset), count, mask))
394    }
395
396    fn external_check_futex_value(
397        memory: &MemoryObject,
398        offset: u64,
399        value: u32,
400    ) -> Result<(), Errno> {
401        let loaded_value = {
402            // TODO: This read should be atomic.
403            let mut buf = [0u8; 4];
404            memory.read(&mut buf, offset).map_err(|_| errno!(EINVAL))?;
405            u32::from_ne_bytes(buf)
406        };
407        if loaded_value != value {
408            return error!(EAGAIN);
409        }
410        Ok(())
411    }
412}
413
/// Identifies a futex within a [`FutexTable`].
///
/// An implementation decides how a futex address is turned into a table key and which table
/// instance a task uses for that kind of key.
pub trait FutexKey: Sized + Ord + Hash + Clone {
    /// Computes the key of the futex located at `addr` for `task`.
    fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno>;
    /// Returns the table that holds the futexes of this key type for `task`.
    fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno>;
}
418
/// Key of a futex that is identified solely by its address.
///
/// The table for these keys is the `futex` field of the object returned by `task.mm()`.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct PrivateFutexKey {
    /// The address of the futex word.
    addr: FutexAddress,
}
423
424impl FutexKey for PrivateFutexKey {
425    fn get(_task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
426        Ok(PrivateFutexKey { addr })
427    }
428
429    fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
430        Ok(task.mm()?.futex.clone())
431    }
432}
433
/// Key of a futex that is identified by the memory object containing it and by its offset within
/// that object.
///
/// The table for these keys is the `shared_futexes` field of the task's kernel.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct SharedFutexKey {
    // No chance of collisions since koids are never reused:
    // https://fuchsia.dev/fuchsia-src/concepts/kernel/concepts#kernel_object_ids
    /// The koid of the memory object that contains the futex word.
    koid: zx::Koid,
    /// The offset of the futex word within the memory object.
    offset: u64,
}
441
442impl FutexKey for SharedFutexKey {
443    fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno> {
444        let (memory, offset) = task.mm()?.get_mapping_memory(addr.into(), ProtectionFlags::READ)?;
445        Ok(SharedFutexKey::new(&memory, offset))
446    }
447
448    fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> {
449        Ok(task.kernel().shared_futexes.clone())
450    }
451}
452
453impl SharedFutexKey {
454    fn new(memory: &MemoryObject, offset: u64) -> Self {
455        Self { koid: memory.get_koid(), offset }
456    }
457}
458
/// The mutable state of a [`FutexTable`], kept behind the table's lock.
struct FutexTableState<Key: FutexKey> {
    /// The queues of regular futex waiters, per futex key.
    waiters: HashMap<Key, FutexWaiters>,
    /// The queues of waiters blocked on a priority-inheritance futex, per futex key.
    rt_mutex_waiters: HashMap<Key, VecDeque<RtMutexWaiter>>,
}
463
464impl<Key: FutexKey> Default for FutexTableState<Key> {
465    fn default() -> Self {
466        Self { waiters: Default::default(), rt_mutex_waiters: Default::default() }
467    }
468}
469
470impl<Key: FutexKey> FutexTableState<Key> {
471    /// Returns the FutexWaiters for a given address, creating an empty one if none is registered.
472    fn get_waiters_or_default(&mut self, key: Key) -> &mut FutexWaiters {
473        self.waiters.entry(key).or_default()
474    }
475
476    fn wake(&mut self, key: Key, count: usize, mask: u32) -> usize {
477        let entry = self.waiters.entry(key);
478        match entry {
479            Entry::Vacant(_) => 0,
480            Entry::Occupied(mut entry) => {
481                let count = entry.get_mut().notify(mask, count);
482                if entry.get().is_empty() {
483                    entry.remove();
484                }
485                count
486            }
487        }
488    }
489
490    /// Returns the RT-Mutex waiters queue for a given address, creating an empty queue if none is
491    /// registered.
492    fn get_rt_mutex_waiters_or_default(&mut self, key: Key) -> &mut VecDeque<RtMutexWaiter> {
493        self.rt_mutex_waiters.entry(key).or_default()
494    }
495
496    /// Pop the next RT-Mutex for the given address.
497    fn pop_rt_mutex_waiter(&mut self, key: Key) -> Option<RtMutexWaiter> {
498        let entry = self.rt_mutex_waiters.entry(key);
499        match entry {
500            Entry::Vacant(_) => None,
501            Entry::Occupied(mut entry) => {
502                if let Some(mut waiter) = entry.get_mut().pop_front() {
503                    if entry.get().is_empty() {
504                        entry.remove();
505                    } else {
506                        waiter.tid |= FUTEX_WAITERS;
507                    }
508                    Some(waiter)
509                } else {
510                    None
511                }
512            }
513        }
514    }
515}
516
/// Abstraction over a process waiting on a Futex that can be notified.
enum FutexNotifiable {
    /// An internal process waiting on a Futex.
    ///
    /// The event is held weakly: once it has been dropped, the waiter can no longer be notified.
    Internal(Weak<InterruptibleEvent>),
    /// An internal process waiting on a Futex with a boot deadline.
    ///
    /// The waiter is held weakly: once it has been dropped, it can no longer be notified.
    InternalBoot(Weak<Waiter>),
    /// An external process waiting on a Futex.
    // Sending consumes the sender, so it is kept in an `Option` that `notify` can `take()` while
    // only holding a mutable reference to this value. It is `None` after the first attempt.
    External(Option<oneshot::Sender<()>>),
}
528
529impl FutexNotifiable {
530    fn new_internal(event: Weak<InterruptibleEvent>) -> Self {
531        Self::Internal(event)
532    }
533
534    fn new_internal_boot(waiter: Weak<Waiter>) -> Self {
535        Self::InternalBoot(waiter)
536    }
537
538    fn new_external(sender: oneshot::Sender<()>) -> Self {
539        Self::External(Some(sender))
540    }
541
542    /// Tries to notify the process. Returns `true` is the process have been notified. Returns
543    /// `false` otherwise. This means the process is stale and will never be available again.
544    fn notify(&mut self) -> bool {
545        match self {
546            Self::Internal(event) => {
547                if let Some(event) = event.upgrade() {
548                    event.notify();
549                    true
550                } else {
551                    false
552                }
553            }
554            Self::InternalBoot(waiter) => {
555                if let Some(waiter) = waiter.upgrade() {
556                    waiter.notify();
557                    true
558                } else {
559                    false
560                }
561            }
562            Self::External(sender) => {
563                if let Some(sender) = sender.take() {
564                    sender.send(()).is_ok()
565                } else {
566                    false
567                }
568            }
569        }
570    }
571}
572
/// A single waiter queued on a regular futex.
struct FutexWaiter {
    /// The bitset of this waiter. A wake only applies to it when the wake's mask shares at least
    /// one bit with this value.
    mask: u32,
    /// The handle used to wake the waiter up.
    notifiable: FutexNotifiable,
}
577
/// The queue of waiters of one futex. New waiters are appended at the back and wakes scan from
/// the front.
#[derive(Default)]
struct FutexWaiters(VecDeque<FutexWaiter>);
580
581impl FutexWaiters {
582    fn add(&mut self, waiter: FutexWaiter) {
583        self.0.push_back(waiter);
584    }
585
586    fn notify(&mut self, mask: u32, count: usize) -> usize {
587        let mut woken = 0;
588        self.0.retain_mut(|waiter| {
589            if woken == count || waiter.mask & mask == 0 {
590                return true;
591            }
592            // The send will fail if the receiver is gone, which means nothing was actualling
593            // waiting on the futex.
594            if waiter.notifiable.notify() {
595                woken += 1;
596            }
597            false
598        });
599        woken
600    }
601
602    fn transfer(&mut self, mut other: Self) {
603        self.0.append(&mut other.0);
604    }
605
606    fn is_empty(&self) -> bool {
607        self.0.is_empty()
608    }
609
610    fn split_for_requeue(&mut self, max_count: usize) -> Self {
611        let pos = if max_count >= self.0.len() { 0 } else { self.0.len() - max_count };
612        FutexWaiters(self.0.split_off(pos))
613    }
614}
615
/// A waiter queued on a priority-inheritance futex by `lock_pi`.
struct RtMutexWaiter {
    /// The tid, possibly with the FUTEX_WAITERS bit set.
    ///
    /// This is the value `unlock_pi` stores in the futex word when it hands the lock over to
    /// this waiter.
    tid: u32,

    /// The handle used to wake the waiter up.
    notifiable: FutexNotifiable,
}