ebpf_api/maps/ring_buffer.rs

// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::buffer::{MapBuffer, VmoOrName};
use super::lock::RwMapLock;
use super::vmar::AllocatedVmar;
use super::{MapError, MapImpl, MapKey, MapValueRef};
use ebpf::MapSchema;
use linux_uapi::{
    BPF_RB_FORCE_WAKEUP, BPF_RB_NO_WAKEUP, BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT,
    BPF_RINGBUF_HDR_SZ,
};
use static_assertions::const_assert;
use std::fmt::Debug;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use zx::AsHandleRef;

// Signal used on ring buffer VMOs to indicate that the buffer has
// incoming data.
pub const RINGBUF_SIGNAL: zx::Signals = zx::Signals::USER_0;

const RINGBUF_LOCK_SIGNAL: zx::Signals = zx::Signals::USER_1;

#[derive(Debug)]
struct RingBufferState {
    /// Address of the mapped ring buffer VMO.
    base_addr: usize,

    /// The mask corresponding to the size of the ring buffer. This is used to map positions in
    /// the ring buffer (which grow monotonically) back to their actual positions in the memory
    /// object.
    mask: u32,
}

impl RingBufferState {
    /// Pointer to the start of the data of the ring buffer.
    fn data_addr(&self) -> usize {
        self.base_addr + 3 * *MapBuffer::PAGE_SIZE
    }

    /// The never-decreasing position of the read head of the ring buffer. This is updated
    /// exclusively from userspace.
    fn consumer_position(&self) -> &AtomicU32 {
        // SAFETY: `RingBuffer::state()` wraps `self` in a lock, which
        // guarantees that the lock is acquired here.
        unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE) as *const AtomicU32) }
    }

    /// The never-decreasing position of the write head of the ring buffer. This is updated
    /// exclusively from the kernel.
    fn producer_position(&self) -> &AtomicU32 {
        // SAFETY: `RingBuffer::state()` wraps `self` in a lock, which
        // guarantees that the lock is acquired here.
        unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE * 2) as *const AtomicU32) }
    }

    /// Address of the specified `position` within the buffer.
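    ///
    /// For example (illustrative): with a 4096-byte data area, `mask` is `0x0FFF`, so the
    /// ever-growing positions 0, 4096 and 8192 all map to `data_addr()`, while position 4100
    /// maps to `data_addr() + 4`.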
    fn data_position(&self, position: u32) -> usize {
        self.data_addr() + ((position & self.mask) as usize)
    }

    fn is_consumer_position(&self, addr: usize) -> bool {
        let Some(position) = addr.checked_sub(self.data_addr()) else {
            return false;
        };
        let position = position as u32;
        let consumer_position = self.consumer_position().load(Ordering::Acquire) & self.mask;
        position == consumer_position
    }

    /// Access the memory at `position` as a `RingBufferRecordHeader`.
    fn header_mut(&mut self, position: u32) -> &mut RingBufferRecordHeader {
        // SAFETY
        //
        // Reading / writing to the header is safe because the access is exclusive thanks to the
        // mutable reference to `self`, and userspace only has read-only access to this memory.
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        unsafe {
            &mut *(self.data_position(position) as *mut RingBufferRecordHeader)
        }
    }
}

#[derive(Debug)]
pub(crate) struct RingBuffer {
    /// VMO used to store the map content. Reference-counted to make it possible to share the
    /// handle with the Starnix kernel, particularly for the case when a process needs to wait
    /// for signals from the VMO (see `RINGBUF_SIGNAL`).
    vmo: Arc<zx::Vmo>,

    /// The specific memory address space used to map the ring buffer. This is the last field in
    /// the struct so that all the data that conceptually points to it is destroyed before the
    /// memory is unmapped.
    vmar: AllocatedVmar,

    /// The mask corresponding to the size of the ring buffer. It's used to map the positions in
    /// the ring buffer (which grow monotonically) to their actual positions in the memory
    /// object.
    mask: u32,
}

impl RingBuffer {
    /// Builds the storage for a new ring buffer. `size` must be a non-zero multiple of the page
    /// size and a power of 2.
    ///
    /// This will create a mapping in the kernel user space with the following layout:
    ///
    /// | T | L | C | P | D | D |
    ///
    /// where:
    /// - T is 1 page containing at its 0 index a pointer to the `RingBuffer` itself.
    /// - L is 1 page that stores a 32-bit lock state at offset 0. Hidden from user-space.
    /// - C is 1 page containing at its 0 index an atomic u32 for the consumer position.
    ///   Writable from user-space.
    /// - P is 1 page containing at its 0 index an atomic u32 for the producer position.
    ///   Read-only from user-space.
    /// - D is `size` bytes and is the content of the ring buffer, mapped twice.
    ///   Read-only from user-space.
    ///
    /// All sections described above are stored in the shared VMO except for T.
    ///
    /// The returned value is a `Pin<Box<Self>>` because the structure is self-referencing and
    /// must never move in memory.
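    ///
    /// A minimal sketch of the resulting addresses, assuming a 4 KiB page size (illustrative
    /// only; the offsets follow from the mappings set up below):
    ///
    /// ```ignore
    /// let base = vmar.base();       // T: pointer back to the `RingBuffer`
    /// let lock = base + 0x1000;     // L: 32-bit lock word
    /// let consumer = base + 0x2000; // C: consumer position (AtomicU32)
    /// let producer = base + 0x3000; // P: producer position (AtomicU32)
    /// let data = base + 0x4000;     // D: `size` bytes of data, mapped twice
    /// ```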
    pub fn new(schema: &MapSchema, vmo: impl Into<VmoOrName>) -> Result<Pin<Box<Self>>, MapError> {
        if schema.key_size != 0 || schema.value_size != 0 {
            return Err(MapError::InvalidParam);
        }

        let page_size = *MapBuffer::PAGE_SIZE;
        // Size must be a power of 2 and a multiple of page_size.
        let size = schema.max_entries as usize;
        if size == 0 || size % page_size != 0 || size & (size - 1) != 0 {
            return Err(MapError::InvalidParam);
        }
        let mask: u32 = (size - 1).try_into().map_err(|_| MapError::InvalidParam)?;

        // The technical VMO is mapped at the head of the VMAR used by the
        // ring-buffer. It's used to store a pointer to the `RingBuffer`. This
        // VMO is specific to this process.
        let technical_vmo_size = page_size;

        // Add 3 control pages at the head of the ring-buffer VMO.
        let control_pages_size = 3 * page_size;
        let vmo_size = control_pages_size + size;

        let kernel_root_vmar = fuchsia_runtime::vmar_root_self();
        // SAFETY
        //
        // The returned value and all pointers to the allocated memory will be part of `Self`, and
        // all pointers will be dropped before the vmar. This ensures the deallocated memory will
        // not be used after it has been freed.
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        let vmar = unsafe {
            AllocatedVmar::allocate(
                &kernel_root_vmar,
                0,
                // Allocate space for one technical page, the control pages, and twice the data size.
                technical_vmo_size + control_pages_size + 2 * size,
                zx::VmarFlags::CAN_MAP_SPECIFIC
                    | zx::VmarFlags::CAN_MAP_READ
                    | zx::VmarFlags::CAN_MAP_WRITE,
            )
            .map_err(|_| MapError::Internal)?
        };
        let technical_vmo =
            zx::Vmo::create(technical_vmo_size as u64).map_err(|_| MapError::Internal)?;
        technical_vmo.set_name(&zx::Name::new_lossy("ebpf:ring_buffer_technical_vmo")).unwrap();
        vmar.map(
            0,
            &technical_vmo,
            0,
            page_size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        let vmo = match vmo.into() {
            VmoOrName::Vmo(vmo) => {
                let actual_vmo_size = vmo.get_size().map_err(|_| MapError::InvalidVmo)? as usize;
                if vmo_size != actual_vmo_size {
                    return Err(MapError::InvalidVmo);
                }
                vmo
            }
            VmoOrName::Name(name) => {
                let vmo = zx::Vmo::create(vmo_size as u64).map_err(|e| match e {
                    zx::Status::NO_MEMORY | zx::Status::OUT_OF_RANGE => MapError::NoMemory,
                    _ => MapError::Internal,
                })?;
                let name = format!("ebpf:ring_buffer:{name}");
                vmo.set_name(&zx::Name::new_lossy(&name)).unwrap();
                vmo
            }
        };

        vmar.map(
            technical_vmo_size,
            &vmo,
            0,
            vmo_size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        // Map the data again at the end, so that a record that wraps past the end of the buffer
        // can still be accessed as a single contiguous range.
        vmar.map(
            technical_vmo_size + vmo_size,
            &vmo,
            control_pages_size as u64,
            size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        // SAFETY
        //
        // This is safe as long as the vmar mapping stays alive. This will be ensured by the
        // `RingBuffer` itself.
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        let storage_position = unsafe { &mut *(vmar.base() as *mut *const Self) };
        let storage = Box::pin(Self { vmo: Arc::new(vmo), vmar, mask });
        // Store the pointer to the storage at the start of the technical VMO. This is required to
        // access the storage from the bpf methods that only get a pointer to the reserved memory.
        // This is safe as the returned reference is pinned.
        *storage_position = storage.deref();
        Ok(storage)
    }

    fn state<'a>(&'a self) -> RwMapLock<'a, RingBufferState> {
        let page_size = *MapBuffer::PAGE_SIZE;

        // Creates a `RwMapLock` that wraps `RingBufferState`.
        //
        // SAFETY: Lifetime of the lock is tied to the lifetime of `self`,
        // which guarantees that the mapping is not destroyed for the lifetime
        // of the result. The lock guarantees that the access to the
        // `RingBufferState` is synchronized with other threads sharing the ring
        // buffer.
        unsafe {
            let lock_cell = &*((self.vmar.base() + page_size) as *const AtomicU32);
            RwMapLock::new(
                lock_cell,
                self.vmo.as_handle_ref(),
                RINGBUF_LOCK_SIGNAL,
                RingBufferState { base_addr: self.vmar.base() + page_size, mask: self.mask },
            )
        }
    }

    /// Commits the section of the ring buffer represented by the `header`. This consists only of
    /// updating the header length with the correct state bits and signaling the map fd.
    fn commit(
        &self,
        header: &RingBufferRecordHeader,
        flags: RingBufferWakeupPolicy,
        discard: bool,
    ) {
        let mut new_length = header.length.load(Ordering::Acquire) & !BPF_RINGBUF_BUSY_BIT;
        if discard {
            new_length |= BPF_RINGBUF_DISCARD_BIT;
        }
        header.length.store(new_length, Ordering::Release);

        // Send a signal either if the wakeup is forced, or if the default policy is used and the
        // committed entry is the next one the client will consume.
        let state = self.state().read();
        if flags == RingBufferWakeupPolicy::ForceWakeup
            || (flags == RingBufferWakeupPolicy::DefaultWakeup
                && state.is_consumer_position(header as *const RingBufferRecordHeader as usize))
        {
            self.vmo
                .as_handle_ref()
                .signal(zx::Signals::empty(), RINGBUF_SIGNAL)
                .expect("Failed to set signal on a ring buffer VMO");
        }
    }

    /// Submit the data.
    ///
    /// # Safety
    ///
    /// `addr` must be the value returned by a previous call to `ringbuf_reserve`
    /// on a map that has not been dropped, otherwise the behaviour is UB.
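    ///
    /// A minimal sketch of the reserve / write / submit flow, assuming `map` is a live
    /// `RingBuffer` and the payload fits in the buffer (illustrative only):
    ///
    /// ```ignore
    /// let addr = map.ringbuf_reserve(8, 0)?;
    /// // ... write 8 bytes of payload at `addr` ...
    /// // SAFETY: `addr` was returned by `ringbuf_reserve` on a map that is still alive.
    /// unsafe { RingBuffer::submit(addr as u64, RingBufferWakeupPolicy::DefaultWakeup) };
    /// ```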
    pub unsafe fn submit(addr: u64, flags: RingBufferWakeupPolicy) {
        let addr = addr as usize;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let (ringbuf_storage, header) = unsafe { Self::get_ringbuf_and_header_by_addr(addr) };
        ringbuf_storage.commit(header, flags, false);
    }

    /// Discard the data.
    ///
    /// # Safety
    ///
    /// `addr` must be the value returned by a previous call to `ringbuf_reserve`
    /// on a map that has not been dropped, otherwise the behaviour is UB.
    pub unsafe fn discard(addr: u64, flags: RingBufferWakeupPolicy) {
        let addr = addr as usize;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let (ringbuf_storage, header) = unsafe { Self::get_ringbuf_and_header_by_addr(addr) };
        ringbuf_storage.commit(header, flags, true);
    }

    /// Get the `RingBuffer` and the `RingBufferRecordHeader` associated with `addr`.
    ///
    /// # Safety
    ///
    /// `addr` must be the value returned from a previous call to `ringbuf_reserve` on a `Map` that
    /// has not been dropped and is kept alive as long as the returned values are used.
    unsafe fn get_ringbuf_and_header_by_addr(
        addr: usize,
    ) -> (&'static RingBuffer, &'static RingBufferRecordHeader) {
        let page_size = *MapBuffer::PAGE_SIZE;
        // `addr` points into the data section. First access the header.
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let header = unsafe {
            &*((addr - std::mem::size_of::<RingBufferRecordHeader>())
                as *const RingBufferRecordHeader)
        };
        let addr_page = addr / page_size;
        let mapping_start_page = addr_page - header.page_count as usize - 1;
        let mapping_start_address = mapping_start_page * page_size;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let ringbuf_impl = unsafe { &*(mapping_start_address as *const &RingBuffer) };
        (ringbuf_impl, header)
    }
}

impl MapImpl for RingBuffer {
    fn lookup<'a>(&'a self, _key: &[u8]) -> Option<MapValueRef<'a>> {
        None
    }

    fn update(&self, _key: MapKey, _value: &[u8], _flags: u64) -> Result<(), MapError> {
        Err(MapError::InvalidParam)
    }

    fn delete(&self, _key: &[u8]) -> Result<(), MapError> {
        Err(MapError::InvalidParam)
    }

    fn get_next_key(&self, _key: Option<&[u8]>) -> Result<MapKey, MapError> {
        Err(MapError::InvalidParam)
    }

    fn vmo(&self) -> &Arc<zx::Vmo> {
        &self.vmo
    }

    fn can_read(&self) -> Option<bool> {
        let mut state = self.state().write();
        let consumer_position = state.consumer_position().load(Ordering::Acquire);
        let producer_position = state.producer_position().load(Ordering::Acquire);

        // Read the header at the consumer position, and check that the entry is not busy.
        let can_read = consumer_position < producer_position
            && ((*state.header_mut(consumer_position).length.get_mut()) & BPF_RINGBUF_BUSY_BIT
                == 0);
        Some(can_read)
    }

    fn ringbuf_reserve(&self, size: u32, flags: u64) -> Result<usize, MapError> {
        if flags != 0 {
            return Err(MapError::InvalidParam);
        }

        // The top two bits are used as special flags.
        if size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT) > 0 {
            return Err(MapError::InvalidParam);
        }

        let mut state = self.state().write();
        let consumer_position = state.consumer_position().load(Ordering::Acquire);
        let producer_position = state.producer_position().load(Ordering::Acquire);
        let max_size = self.mask + 1;

        // Available size in the ring buffer.
        let consumed_size =
            producer_position.checked_sub(consumer_position).ok_or(MapError::InvalidParam)?;
        let available_size = max_size.checked_sub(consumed_size).ok_or(MapError::InvalidParam)?;

        const HEADER_ALIGNMENT: u32 = std::mem::size_of::<u64>() as u32;

        // Total size of the message to write. This is the requested size + the header, rounded up
        // to align the next header.
        let total_size: u32 = (size + BPF_RINGBUF_HDR_SZ + HEADER_ALIGNMENT - 1) / HEADER_ALIGNMENT
            * HEADER_ALIGNMENT;
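        // For example (illustrative): a request for `size == 5` yields
        // `total_size == (5 + 8 + 7) / 8 * 8 == 16`, i.e. an 8-byte header followed by 5 bytes
        // of data and 3 bytes of padding so that the next header stays 8-byte aligned.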

        if total_size > available_size {
            return Err(MapError::SizeLimit);
        }
        let data_position = state.data_position(producer_position) + BPF_RINGBUF_HDR_SZ as usize;
        let data_length = size | BPF_RINGBUF_BUSY_BIT;
        let page_count = ((data_position - state.data_addr()) / *MapBuffer::PAGE_SIZE + 3)
            .try_into()
            .map_err(|_| MapError::SizeLimit)?;
        let header = state.header_mut(producer_position);
        *header.length.get_mut() = data_length;
        header.page_count = page_count;
        state.producer_position().store(producer_position + total_size, Ordering::Release);
        Ok(data_position)
    }
}

#[repr(u32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RingBufferWakeupPolicy {
    DefaultWakeup = 0,
    NoWakeup = BPF_RB_NO_WAKEUP,
    ForceWakeup = BPF_RB_FORCE_WAKEUP,
}

impl From<u32> for RingBufferWakeupPolicy {
    fn from(v: u32) -> Self {
        match v {
            BPF_RB_NO_WAKEUP => Self::NoWakeup,
            BPF_RB_FORCE_WAKEUP => Self::ForceWakeup,
            // If the flags value is invalid, use the default. This is necessary to prevent
            // userspace from leaking ringbuf entries by calling into the kernel with an incorrect
            // flag value.
            _ => Self::DefaultWakeup,
        }
    }
}

#[repr(C)]
#[repr(align(8))]
#[derive(Debug)]
struct RingBufferRecordHeader {
    length: AtomicU32,
    page_count: u32,
}

const_assert!(std::mem::size_of::<RingBufferRecordHeader>() == BPF_RINGBUF_HDR_SZ as usize);

#[cfg(test)]
mod test {
    use super::*;

    #[fuchsia::test]
    fn test_ring_buffer_wakeup_policy() {
        assert_eq!(RingBufferWakeupPolicy::from(0), RingBufferWakeupPolicy::DefaultWakeup);
        assert_eq!(
            RingBufferWakeupPolicy::from(BPF_RB_NO_WAKEUP),
            RingBufferWakeupPolicy::NoWakeup
        );
        assert_eq!(
            RingBufferWakeupPolicy::from(BPF_RB_FORCE_WAKEUP),
            RingBufferWakeupPolicy::ForceWakeup
        );
        assert_eq!(RingBufferWakeupPolicy::from(42), RingBufferWakeupPolicy::DefaultWakeup);
    }
}