ebpf_api/maps/ring_buffer.rs

// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::buffer::{MapBuffer, VmoOrName};
use super::lock::RwMapLock;
use super::vmar::AllocatedVmar;
use super::{MapError, MapImpl, MapKey, MapValueRef};
use ebpf::MapSchema;
use linux_uapi::{
    BPF_RB_FORCE_WAKEUP, BPF_RB_NO_WAKEUP, BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT,
    BPF_RINGBUF_HDR_SZ,
};
use static_assertions::const_assert;
use std::fmt::Debug;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use zx::AsHandleRef;

// Signal used on ring buffer VMOs to indicate that the buffer has
// incoming data.
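// The signal is raised by `RingBuffer::commit()` either when a wakeup is forced or when the
// default policy applies and the committed entry is the next one the consumer will read.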
pub const RINGBUF_SIGNAL: zx::Signals = zx::Signals::USER_0;

const RINGBUF_LOCK_SIGNAL: zx::Signals = zx::Signals::USER_1;

#[derive(Debug)]
struct RingBufferState {
    /// Address of the mapped ring buffer VMO.
    base_addr: usize,

    /// The mask corresponding to the size of the ring buffer. It is used to map positions in
    /// the ring buffer (which only ever grow) back to their actual offsets in the memory
    /// object.
    mask: u32,
}

impl RingBufferState {
    /// Pointer to the start of the data of the ring buffer.
    fn data_addr(&self) -> usize {
        self.base_addr + 3 * *MapBuffer::PAGE_SIZE
    }

    /// The never-decreasing position of the read head of the ring buffer. This is updated
    /// exclusively from userspace.
    fn consumer_position(&self) -> &AtomicU32 {
        // SAFETY: `RingBuffer::state()` wraps `self` in a lock, which
        // guarantees that the lock is acquired here.
        unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE) as *const AtomicU32) }
    }

    /// The never-decreasing position of the write head of the ring buffer. This is updated
    /// exclusively from the kernel.
    fn producer_position(&self) -> &AtomicU32 {
        // SAFETY: `RingBuffer::state()` wraps `self` in a lock, which
        // guarantees that the lock is acquired here.
        unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE * 2) as *const AtomicU32) }
    }

    /// Address of the specified `position` within the buffer.
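    ///
    /// Positions only ever grow, so the low bits selected by `mask` give the offset into the
    /// mapped data. Illustrative example (hypothetical sizes): with a 4096-byte ring buffer,
    /// `mask == 0xFFF`, so position 5000 maps to offset `5000 & 0xFFF == 904`.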
    fn data_position(&self, position: u32) -> usize {
        self.data_addr() + ((position & self.mask) as usize)
    }

    fn is_consumer_position(&self, addr: usize) -> bool {
        let Some(position) = addr.checked_sub(self.data_addr()) else {
            return false;
        };
        let position = position as u32;
        let consumer_position = self.consumer_position().load(Ordering::Acquire) & self.mask;
        position == consumer_position
    }

    /// Access the memory at `position` as a `RingBufferRecordHeader`.
    fn header_mut(&mut self, position: u32) -> &mut RingBufferRecordHeader {
        // SAFETY
        //
        // Reading / writing to the header is safe because the access is exclusive thanks to the
        // mutable reference to `self`, and userspace only has read-only access to this memory.
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        unsafe {
            &mut *(self.data_position(position) as *mut RingBufferRecordHeader)
        }
    }
}

#[derive(Debug)]
pub(crate) struct RingBuffer {
    /// VMO used to store the map content. Reference-counted to make it possible to share the
    /// handle with the Starnix kernel, particularly for the case when a process needs to wait
    /// for signals from the VMO (see RINGBUF_SIGNAL).
    vmo: Arc<zx::Vmo>,

    /// The specific memory address space used to map the ring buffer. This is the last field in
    /// the struct so that all the data that conceptually points to it is destroyed before the
    /// memory is unmapped.
    vmar: AllocatedVmar,

    /// The mask corresponding to the size of the ring buffer. It is used to map positions in
    /// the ring buffer (which only ever grow) to their actual offsets in the memory object.
    mask: u32,
}

impl RingBuffer {
    /// Build a new storage of a ring buffer. `size` must be a non-zero multiple of the page size
    /// and a power of 2.
    ///
    /// This will create a mapping in the kernel's address space with the following layout:
    ///
    /// | T | L | C | P | D | D |
    ///
    /// where:
    /// - T is 1 page containing at its 0 index a pointer to the `RingBuffer` itself.
    /// - L is 1 page that stores a 32-bit lock state at offset 0. Hidden from user-space.
    /// - C is 1 page containing at its 0 index an atomic u32 for the consumer position.
    ///   Accessible to user-space for writing.
    /// - P is 1 page containing at its 0 index an atomic u32 for the producer position.
    ///   Accessible to user-space as read-only.
    /// - D is `size` bytes and holds the content of the ring buffer.
    ///   Accessible to user-space as read-only.
    ///
    /// All sections described above are stored in the shared VMO except for T.
    ///
    /// The return value is a `Pin<Box<Self>>`, because the structure is self-referencing and is
    /// required never to move in memory.
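    ///
    /// Note that D appears twice: the data pages are mapped a second time immediately after the
    /// first mapping, so a record that wraps past the end of the buffer can still be accessed as
    /// one contiguous slice.
    ///
    /// Illustrative example (hypothetical values, assuming 4 KiB pages and `size == 4096`): the
    /// VMAR covers 6 pages, with T at offset 0, L at 0x1000, C at 0x2000, P at 0x3000, and the
    /// same data page mapped at both 0x4000 and 0x5000.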
    pub fn new(schema: &MapSchema, vmo: impl Into<VmoOrName>) -> Result<Pin<Box<Self>>, MapError> {
        if schema.key_size != 0 || schema.value_size != 0 {
            return Err(MapError::InvalidParam);
        }

        let page_size = *MapBuffer::PAGE_SIZE;
        // Size must be a power of 2 and a multiple of page_size.
        let size = schema.max_entries as usize;
        if size == 0 || size % page_size != 0 || size & (size - 1) != 0 {
            return Err(MapError::InvalidParam);
        }
        let mask: u32 = (size - 1).try_into().map_err(|_| MapError::InvalidParam)?;

        // The technical VMO is mapped at the head of the VMAR used by the ring buffer. It is
        // used to store a pointer to the `RingBuffer` itself. This VMO is specific to this
        // process.
        let technical_vmo_size = page_size;

        // Add 3 control pages at the head of the ring-buffer VMO.
        let control_pages_size = 3 * page_size;
        let vmo_size = control_pages_size + size;

        let kernel_root_vmar = fuchsia_runtime::vmar_root_self();
        // SAFETY
        //
        // The returned value and all pointers to the allocated memory will be part of `Self`, and
        // all pointers will be dropped before the VMAR. This ensures the deallocated memory will
        // not be used after it has been freed.
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        let vmar = unsafe {
            AllocatedVmar::allocate(
                &kernel_root_vmar,
                0,
                // Allocate for one technical page, the control pages and twice the size.
                technical_vmo_size + control_pages_size + 2 * size,
                zx::VmarFlags::CAN_MAP_SPECIFIC
                    | zx::VmarFlags::CAN_MAP_READ
                    | zx::VmarFlags::CAN_MAP_WRITE,
            )
            .map_err(|_| MapError::Internal)?
        };
        let technical_vmo =
            zx::Vmo::create(technical_vmo_size as u64).map_err(|_| MapError::Internal)?;
        technical_vmo.set_name(&zx::Name::new_lossy("ebpf:ring_buffer_technical_vmo")).unwrap();
        vmar.map(
            0,
            &technical_vmo,
            0,
            page_size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        let vmo = match vmo.into() {
            VmoOrName::Vmo(vmo) => {
                let actual_vmo_size = vmo.get_size().map_err(|_| MapError::InvalidVmo)? as usize;
                if vmo_size != actual_vmo_size {
                    return Err(MapError::InvalidVmo);
                }
                vmo
            }
            VmoOrName::Name(name) => {
                let vmo = zx::Vmo::create(vmo_size as u64).map_err(|e| match e {
                    zx::Status::NO_MEMORY | zx::Status::OUT_OF_RANGE => MapError::NoMemory,
                    _ => MapError::Internal,
                })?;
                let name = format!("ebpf:ring_buffer:{name}");
                vmo.set_name(&zx::Name::new_lossy(&name)).unwrap();
                vmo
            }
        };

        vmar.map(
            technical_vmo_size,
            &vmo,
            0,
            vmo_size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        // Map the data again right after the first data mapping, so that a record that wraps
        // past the end of the buffer stays contiguous in memory.
        vmar.map(
            technical_vmo_size + vmo_size,
            &vmo,
            control_pages_size as u64,
            size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        // SAFETY
        //
        // This is safe as long as the VMAR mapping stays alive. This is ensured by the
        // `RingBuffer` itself.
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        let storage_position = unsafe { &mut *(vmar.base() as *mut *const Self) };
        let storage = Box::pin(Self { vmo: Arc::new(vmo), vmar, mask });
        // Store the pointer to the storage at the start of the technical VMO. This is required to
        // access the storage from the BPF methods that only get a pointer to the reserved memory.
        // This is safe because the returned reference is pinned.
        *storage_position = storage.deref();
        Ok(storage)
    }

    fn state<'a>(&'a self) -> RwMapLock<'a, RingBufferState> {
        let page_size = *MapBuffer::PAGE_SIZE;

        // Creates a `RwMapLock` that wraps `RingBufferState`.
        //
        // SAFETY: Lifetime of the lock is tied to the lifetime of `self`,
        // which guarantees that the mapping is not destroyed for the lifetime
        // of the result. The lock guarantees that the access to the
        // `RingBufferState` is synchronized with other threads sharing the ring
        // buffer.
        unsafe {
            let lock_cell = &*((self.vmar.base() + page_size) as *const AtomicU32);
            RwMapLock::new(
                lock_cell,
                self.vmo.as_handle_ref(),
                RINGBUF_LOCK_SIGNAL,
                RingBufferState { base_addr: self.vmar.base() + page_size, mask: self.mask },
            )
        }
    }

    /// Commits the section of the ring buffer represented by `header`. This only consists of
    /// updating the header length with the correct state bits and signaling the map fd.
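    ///
    /// While a record is reserved, its `length` field holds the payload size with
    /// `BPF_RINGBUF_BUSY_BIT` set. Committing clears the busy bit, and additionally sets
    /// `BPF_RINGBUF_DISCARD_BIT` when the record is discarded, so the consumer can tell whether
    /// the record is ready and whether it should be skipped.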
    fn commit(
        &self,
        header: &RingBufferRecordHeader,
        flags: RingBufferWakeupPolicy,
        discard: bool,
    ) {
        let mut new_length = header.length.load(Ordering::Acquire) & !BPF_RINGBUF_BUSY_BIT;
        if discard {
            new_length |= BPF_RINGBUF_DISCARD_BIT;
        }
        header.length.store(new_length, Ordering::Release);

        // Send a signal either if the wakeup is forced, or if the default policy applies and the
        // committed entry is the next one the client will consume.
        let state = self.state().read();
        if flags == RingBufferWakeupPolicy::ForceWakeup
            || (flags == RingBufferWakeupPolicy::DefaultWakeup
                && state.is_consumer_position(header as *const RingBufferRecordHeader as usize))
        {
            self.vmo
                .as_handle_ref()
                .signal(zx::Signals::empty(), RINGBUF_SIGNAL)
                .expect("Failed to set signal on a ring buffer VMO");
        }
    }

    /// Submit the data.
    ///
    /// # Safety
    ///
    /// `addr` must be the value returned by a previous call to `ringbuf_reserve`
    /// on a map that has not been dropped, otherwise the behaviour is UB.
    pub unsafe fn submit(addr: u64, flags: RingBufferWakeupPolicy) {
        let addr = addr as usize;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let (ringbuf_storage, header) = unsafe { Self::get_ringbuf_and_header_by_addr(addr) };
        ringbuf_storage.commit(header, flags, false);
    }

    /// Discard the data.
    ///
    /// # Safety
    ///
    /// `addr` must be the value returned by a previous call to `ringbuf_reserve`
    /// on a map that has not been dropped, otherwise the behaviour is UB.
    pub unsafe fn discard(addr: u64, flags: RingBufferWakeupPolicy) {
        let addr = addr as usize;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let (ringbuf_storage, header) = unsafe { Self::get_ringbuf_and_header_by_addr(addr) };
        ringbuf_storage.commit(header, flags, true);
    }

    /// Get the `RingBuffer` and the `RingBufferRecordHeader` associated with `addr`.
    ///
    /// # Safety
    ///
    /// `addr` must be the value returned from a previous call to `ringbuf_reserve` on a `Map` that
    /// has not been dropped and is kept alive as long as the returned values are used.
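    ///
    /// The header stored just before `addr` records, in `page_count`, how many whole pages lie
    /// between the start of the data mapping and the record, plus the 3 control pages. Walking
    /// back over `page_count` pages and one more page for the technical VMO therefore lands at
    /// the start of the VMAR, where `new()` stored the pointer to the `RingBuffer`.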
    unsafe fn get_ringbuf_and_header_by_addr(
        addr: usize,
    ) -> (&'static RingBuffer, &'static RingBufferRecordHeader) {
        let page_size = *MapBuffer::PAGE_SIZE;
        // `addr` points into the data section. First access the header, which is stored just
        // before the data.
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let header = unsafe {
            &*((addr - std::mem::size_of::<RingBufferRecordHeader>())
                as *const RingBufferRecordHeader)
        };
        let addr_page = addr / page_size;
        let mapping_start_page = addr_page - header.page_count as usize - 1;
        let mapping_start_address = mapping_start_page * page_size;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let ringbuf_impl = unsafe { &*(mapping_start_address as *const &RingBuffer) };
        (ringbuf_impl, header)
    }
}

impl MapImpl for RingBuffer {
    fn lookup<'a>(&'a self, _key: &[u8]) -> Option<MapValueRef<'a>> {
        None
    }

    fn update(&self, _key: MapKey, _value: &[u8], _flags: u64) -> Result<(), MapError> {
        Err(MapError::InvalidParam)
    }

    fn delete(&self, _key: &[u8]) -> Result<(), MapError> {
        Err(MapError::InvalidParam)
    }

    fn get_next_key(&self, _key: Option<&[u8]>) -> Result<MapKey, MapError> {
        Err(MapError::InvalidParam)
    }

    fn vmo(&self) -> &Arc<zx::Vmo> {
        &self.vmo
    }

    fn can_read(&self) -> Option<bool> {
        let mut state = self.state().write();
        let consumer_position = state.consumer_position().load(Ordering::Acquire);
        let producer_position = state.producer_position().load(Ordering::Acquire);

        // Read the header at the consumer position, and check that the entry is not busy.
        let can_read = consumer_position < producer_position
            && ((*state.header_mut(consumer_position).length.get_mut()) & BPF_RINGBUF_BUSY_BIT
                == 0);
        Some(can_read)
    }

    fn ringbuf_reserve(&self, size: u32, flags: u64) -> Result<usize, MapError> {
        if flags != 0 {
            return Err(MapError::InvalidParam);
        }

        // The top two bits are used as special flags.
        if size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT) > 0 {
            return Err(MapError::InvalidParam);
        }

        let mut state = self.state().write();
        let consumer_position = state.consumer_position().load(Ordering::Acquire);
        let producer_position = state.producer_position().load(Ordering::Acquire);
        let max_size = self.mask + 1;

        // Available size in the ring buffer.
        let consumed_size =
            producer_position.checked_sub(consumer_position).ok_or(MapError::InvalidParam)?;
        let available_size = max_size.checked_sub(consumed_size).ok_or(MapError::InvalidParam)?;

        const HEADER_ALIGNMENT: u32 = std::mem::size_of::<u64>() as u32;

        // Total size of the message to write. This is the requested size + the header, rounded up
        // to align the next header.
        let total_size: u32 = (size + BPF_RINGBUF_HDR_SZ + HEADER_ALIGNMENT - 1) / HEADER_ALIGNMENT
            * HEADER_ALIGNMENT;
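        // Illustrative example (hypothetical values): for a requested `size` of 12 bytes, the
        // 8-byte header is added and the sum is rounded up to the next multiple of 8, so the
        // record occupies (12 + 8 + 7) / 8 * 8 == 24 bytes of the ring buffer.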

        if total_size > available_size {
            return Err(MapError::SizeLimit);
        }
        let data_position = state.data_position(producer_position) + BPF_RINGBUF_HDR_SZ as usize;
        let data_length = size | BPF_RINGBUF_BUSY_BIT;
        // Number of whole pages between the start of the data mapping and this record, plus the
        // 3 control pages (see `get_ringbuf_and_header_by_addr`).
        let page_count = ((data_position - state.data_addr()) / *MapBuffer::PAGE_SIZE + 3)
            .try_into()
            .map_err(|_| MapError::SizeLimit)?;
        let header = state.header_mut(producer_position);
        *header.length.get_mut() = data_length;
        header.page_count = page_count;
        state.producer_position().store(producer_position + total_size, Ordering::Release);
        Ok(data_position)
    }
}

#[repr(u32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RingBufferWakeupPolicy {
    DefaultWakeup = 0,
    NoWakeup = BPF_RB_NO_WAKEUP,
    ForceWakeup = BPF_RB_FORCE_WAKEUP,
}

impl From<u32> for RingBufferWakeupPolicy {
    fn from(v: u32) -> Self {
        match v {
            BPF_RB_NO_WAKEUP => Self::NoWakeup,
            BPF_RB_FORCE_WAKEUP => Self::ForceWakeup,
            // If the flags value is invalid, fall back to the default. This is necessary to
            // prevent userspace from leaking a reserved ring buffer entry by calling into the
            // kernel with an incorrect flag value.
            _ => Self::DefaultWakeup,
        }
    }
}

#[repr(C)]
#[repr(align(8))]
#[derive(Debug)]
struct RingBufferRecordHeader {
    length: AtomicU32,
    page_count: u32,
}

const_assert!(std::mem::size_of::<RingBufferRecordHeader>() == BPF_RINGBUF_HDR_SZ as usize);

#[cfg(test)]
mod test {
    use super::*;

    #[fuchsia::test]
    fn test_ring_buffer_wakeup_policy() {
        assert_eq!(RingBufferWakeupPolicy::from(0), RingBufferWakeupPolicy::DefaultWakeup);
        assert_eq!(
            RingBufferWakeupPolicy::from(BPF_RB_NO_WAKEUP),
            RingBufferWakeupPolicy::NoWakeup
        );
        assert_eq!(
            RingBufferWakeupPolicy::from(BPF_RB_FORCE_WAKEUP),
            RingBufferWakeupPolicy::ForceWakeup
        );
        assert_eq!(RingBufferWakeupPolicy::from(42), RingBufferWakeupPolicy::DefaultWakeup);
    }
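
    // Additional sanity check (not exercising the full ring buffer, which needs a VMAR): the
    // record header must keep the size and 8-byte alignment that `ringbuf_reserve` assumes
    // when rounding record sizes.
    #[fuchsia::test]
    fn test_record_header_layout() {
        assert_eq!(std::mem::size_of::<RingBufferRecordHeader>(), BPF_RINGBUF_HDR_SZ as usize);
        assert_eq!(std::mem::align_of::<RingBufferRecordHeader>(), 8);
    }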
}