Skip to main content

ebpf_api/maps/
ring_buffer.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::buffer::{MapBuffer, VmoOrName};
6use super::lock::RwMapLock;
7use super::vmar::AllocatedVmar;
8use super::{MapError, MapImpl, MapKey, MapValueRef};
9use ebpf::{BpfValue, EbpfBufferPtr, MapSchema};
10use linux_uapi::{
11    BPF_RB_FORCE_WAKEUP, BPF_RB_NO_WAKEUP, BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT,
12    BPF_RINGBUF_HDR_SZ,
13};
14use static_assertions::const_assert;
15use std::fmt::Debug;
16use std::ops::Deref;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
20
/// Signal used on ring buffer VMOs to indicate that the buffer has
/// incoming data.
pub const RINGBUF_SIGNAL: zx::Signals = zx::Signals::USER_0;

/// Signal passed to `RwMapLock::new()` together with the ring buffer VMO handle (see
/// `RingBuffer::state()`).
// NOTE(review): presumably raised on the VMO to wake threads blocked on the lock — confirm
// against the `RwMapLock` implementation.
const RINGBUF_LOCK_SIGNAL: zx::Signals = zx::Signals::USER_1;
26
/// Accessor for the control pages and data section of a mapped ring buffer VMO.
///
/// Instances are created by `RingBuffer::state()`, which wraps them in a `RwMapLock`.
#[derive(Debug)]
struct RingBufferState {
    /// Address of the mapped ring buffer VMO.
    base_addr: usize,

    /// The mask corresponding to the size of the ring buffer. This is used to map the positions
    /// in the ring buffer (which are always growing) back to their actual position in the memory
    /// object.
    mask: u32,
}
37
38impl RingBufferState {
39    /// Pointer to the start of the data of the ring buffer.
40    fn data_addr(&self) -> usize {
41        self.base_addr + 3 * *MapBuffer::PAGE_SIZE
42    }
43
44    /// The never decreasing position of the read head of the ring buffer. This is updated
45    /// exclusively from userspace.
46    fn consumer_position(&self) -> &AtomicU64 {
47        // SAFETY: `RingBuffer::state()` wraps `self` in a lock, which
48        // guarantees that the lock is acquired here.
49        unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE) as *const AtomicU64) }
50    }
51
52    /// The never decreasing position of the writing head of the ring buffer. This is updated
53    /// exclusively from the kernel.
54    fn producer_position(&self) -> &AtomicU64 {
55        // SAFETY: `RingBuffer::state()` wraps `self` in a lock, which
56        // guarantees that the lock is acquired here.
57        unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE * 2) as *const AtomicU64) }
58    }
59
60    /// Address of the specified `position` within the buffer.
61    fn data_position(&self, position: u64) -> usize {
62        self.data_addr() + ((position & (self.mask as u64)) as usize)
63    }
64
65    fn is_consumer_position(&self, addr: usize) -> bool {
66        let Some(position) = addr.checked_sub(self.data_addr()) else {
67            return false;
68        };
69        let position = position as u32;
70        let consumer_position =
71            self.consumer_position().load(Ordering::Acquire) & (self.mask as u64);
72        u64::from(position) == consumer_position
73    }
74
75    /// Access the memory at `position` as a `RingBufferRecordHeader`.
76    fn header_mut(&mut self, position: u64) -> &mut RingBufferRecordHeader {
77        // SAFETY
78        //
79        // Reading / writing to the header is safe because the access is exclusive thanks to the
80        // mutable reference to `self` and userspace has only a read only access to this memory.
81        #[allow(
82            clippy::undocumented_unsafe_blocks,
83            reason = "Force documented unsafe blocks in Starnix"
84        )]
85        unsafe {
86            &mut *(self.data_position(position) as *mut RingBufferRecordHeader)
87        }
88    }
89}
90
91#[derive(Debug)]
92pub(crate) struct RingBuffer {
93    /// VMO used to store the map content. Reference-counted to make it possible to share the
94    /// handle with Starnix kernel, particularly for the case when a process needs to wait for
95    /// signals from the VMO (see RINGBUF_SIGNAL).
96    vmo: Arc<zx::Vmo>,
97
98    /// The specific memory address space used to map the ring buffer. This is the last field in
99    /// the struct so that all the data that conceptually points to it is destroyed before the
100    /// memory is unmapped.
101    vmar: AllocatedVmar,
102
103    /// The mask corresponding to the size of the ring buffer. It's used to map the positions
104    /// in the ringbuffer (that are always growing) to their actual position in the memory object.
105    mask: u32,
106}
107
108impl RingBuffer {
109    /// Build a new storage of a ring buffer. `size` must be a non zero multiple of the page size
110    /// and a power of 2.
111    ///
112    /// This will create a mapping in the kernel user space with the following layout:
113    ///
114    /// | T | L | C | P | D | D |
115    ///
116    /// where:
117    /// - T is 1 page containing at its 0 index a pointer to the `RingBuffer` itself.
118    /// - L is 1 page that stores a 32-bit lock state at offset 0. Hidden from user-space.
119    /// - C is 1 page containing at its 0 index a atomic u32 for the consumer position.
120    ///   Accessible in user-space for write.
121    /// - P is 1 page containing at its 0 index a atomic u32 for the producer position.
122    ///   Accessible in user-space for read-only access.
123    /// - D is size bytes and is the content of the ring buffer.
124    ///   Accessible in user-space for read-only access.
125    ///
126    /// All sections described above are stored in the shared VMO except for T.
127    ///
128    /// The returns value is a `Pin<Box>`, because the structure is self referencing and is
129    /// required never to move in memory.
130    pub fn new(schema: &MapSchema, vmo: impl Into<VmoOrName>) -> Result<Pin<Box<Self>>, MapError> {
131        if schema.key_size != 0 || schema.value_size != 0 {
132            return Err(MapError::InvalidParam);
133        }
134
135        let page_size = *MapBuffer::PAGE_SIZE;
136        // Size must be a power of 2 and a multiple of page_size.
137        let size = schema.max_entries as usize;
138        if size == 0 || size % page_size != 0 || size & (size - 1) != 0 {
139            return Err(MapError::InvalidParam);
140        }
141        let mask: u32 = (size - 1).try_into().map_err(|_| MapError::InvalidParam)?;
142
143        // Technical VMO is mapped at the head of the VMAR used by the
144        // ring-buffer. It's used to store a pointer to the `RingBuffer`. This
145        // VMO is specific to this process.
146        let technical_vmo_size = page_size;
147
148        // Add 3 control pages at the head of the ring-buffer VMO.
149        let control_pages_size = 3 * page_size;
150        let vmo_size = control_pages_size + size;
151
152        let kernel_root_vmar = fuchsia_runtime::vmar_root_self();
153        // SAFETY
154        //
155        // The returned value and all pointer to the allocated memory will be part of `Self` and
156        // all pointers will be dropped before the vmar. This ensures the deallocated memory will
157        // not be used after it has been freed.
158        #[allow(
159            clippy::undocumented_unsafe_blocks,
160            reason = "Force documented unsafe blocks in Starnix"
161        )]
162        let vmar = unsafe {
163            AllocatedVmar::allocate(
164                &kernel_root_vmar,
165                0,
166                // Allocate for one technical page, the control pages and twice the size.
167                technical_vmo_size + control_pages_size + 2 * size,
168                zx::VmarFlags::CAN_MAP_SPECIFIC
169                    | zx::VmarFlags::CAN_MAP_READ
170                    | zx::VmarFlags::CAN_MAP_WRITE,
171            )
172            .map_err(|_| MapError::Internal)?
173        };
174        let technical_vmo =
175            zx::Vmo::create(technical_vmo_size as u64).map_err(|_| MapError::Internal)?;
176        technical_vmo.set_name(&zx::Name::new_lossy("ebpf:ring_buffer_technical_vmo")).unwrap();
177        vmar.map(
178            0,
179            &technical_vmo,
180            0,
181            page_size,
182            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
183        )
184        .map_err(|_| MapError::Internal)?;
185
186        let vmo = match vmo.into() {
187            VmoOrName::Vmo(vmo) => {
188                let actual_vmo_size = vmo.get_size().map_err(|_| MapError::InvalidVmo)? as usize;
189                if vmo_size != actual_vmo_size {
190                    return Err(MapError::InvalidVmo);
191                }
192                vmo
193            }
194            VmoOrName::Name(name) => {
195                let vmo = zx::Vmo::create(vmo_size as u64).map_err(|e| match e {
196                    zx::Status::NO_MEMORY | zx::Status::OUT_OF_RANGE => MapError::NoMemory,
197                    _ => MapError::Internal,
198                })?;
199                let name = format!("ebpf:ring_buffer:{name}");
200                vmo.set_name(&zx::Name::new_lossy(&name)).unwrap();
201                vmo
202            }
203        };
204
205        vmar.map(
206            technical_vmo_size,
207            &vmo,
208            0,
209            vmo_size,
210            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
211        )
212        .map_err(|_| MapError::Internal)?;
213
214        // Map the data again at the end.
215        vmar.map(
216            technical_vmo_size + vmo_size,
217            &vmo,
218            control_pages_size as u64,
219            size,
220            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
221        )
222        .map_err(|_| MapError::Internal)?;
223
224        // SAFETY
225        //
226        // This is safe as long as the vmar mapping stays alive. This will be ensured by the
227        // `RingBuffer` itself.
228        #[allow(
229            clippy::undocumented_unsafe_blocks,
230            reason = "Force documented unsafe blocks in Starnix"
231        )]
232        let storage_position = unsafe { &mut *(vmar.base() as *mut *const Self) };
233        let storage = Box::pin(Self { vmo: Arc::new(vmo), vmar, mask });
234        // Store the pointer to the storage to the start of the technical vmo. This is required to
235        // access the storage from the bpf methods that only get a pointer to the reserved memory.
236        // This is safe as the returned referenced is Pinned.
237        *storage_position = storage.deref();
238        Ok(storage)
239    }
240
241    fn state<'a>(&'a self) -> RwMapLock<'a, RingBufferState> {
242        let page_size = *MapBuffer::PAGE_SIZE;
243
244        // Creates a `RwMapLock` that wraps `RingBufferState`.
245        //
246        // SAFETY: Lifetime of the lock is tied to the lifetime of `self`,
247        // which guarantees that the mapping is not destroyed for the lifetime
248        // of the result. The lock guarantees that the access to the
249        // `RingBufferState` is synchronized with other threads sharing the ring
250        // buffer.
251        unsafe {
252            let lock_cell = &*((self.vmar.base() + page_size) as *const AtomicU32);
253            RwMapLock::new(
254                lock_cell,
255                self.vmo.as_handle_ref(),
256                RINGBUF_LOCK_SIGNAL,
257                RingBufferState { base_addr: self.vmar.base() + page_size, mask: self.mask },
258            )
259        }
260    }
261
262    /// Commits the section of the ringbuffer represented by the `header`. This only consist in
263    /// updating the header length with the correct state bits and signaling the map fd.
264    fn commit(
265        &self,
266        header: &RingBufferRecordHeader,
267        flags: RingBufferWakeupPolicy,
268        discard: bool,
269    ) {
270        let mut new_length = header.length.load(Ordering::Acquire) & !BPF_RINGBUF_BUSY_BIT;
271        if discard {
272            new_length |= BPF_RINGBUF_DISCARD_BIT;
273        }
274        header.length.store(new_length, Ordering::Release);
275
276        // Send a signal either if it is forced, or it is the default and the committed entry is
277        // the next one the client will consume.
278        let state = self.state().read();
279        if flags == RingBufferWakeupPolicy::ForceWakeup
280            || (flags == RingBufferWakeupPolicy::DefaultWakeup
281                && state.is_consumer_position(header as *const RingBufferRecordHeader as usize))
282        {
283            self.vmo
284                .as_handle_ref()
285                .signal(zx::Signals::empty(), RINGBUF_SIGNAL)
286                .expect("Failed to set signal or a ring buffer VMO");
287        }
288    }
289
290    /// Submit the data.
291    ///
292    /// # Safety
293    ///
294    /// `addr` must be the value returned by a previous call to `ringbuf_reserve`
295    /// on a map that has not been dropped, otherwise the behaviour is UB.
296    pub unsafe fn submit(addr: u64, flags: RingBufferWakeupPolicy) {
297        let addr = addr as usize;
298        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
299        let (ringbuf_storage, header) = unsafe { Self::get_ringbug_and_header_by_addr(addr) };
300        ringbuf_storage.commit(header, flags, false);
301    }
302
303    /// Discard the data.
304    ///
305    /// # Safety
306    ///
307    /// `addr` must be the value returned by a previous call to `ringbuf_reserve`
308    /// on a map that has not been dropped, otherwise the behaviour is UB.
309    pub unsafe fn discard(addr: u64, flags: RingBufferWakeupPolicy) {
310        let addr = addr as usize;
311        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
312        let (ringbuf_storage, header) = unsafe { Self::get_ringbug_and_header_by_addr(addr) };
313        ringbuf_storage.commit(header, flags, true);
314    }
315
316    /// Get the `RingBufferImpl` and the `RingBufferRecordHeader` associated with `addr`.
317    ///
318    /// # Safety
319    ///
320    /// `addr` must be the value returned from a previous call to `ringbuf_reserve` on a `Map` that
321    /// has not been dropped and is kept alive as long as the returned value are used.
322    unsafe fn get_ringbug_and_header_by_addr(
323        addr: usize,
324    ) -> (&'static RingBuffer, &'static RingBufferRecordHeader) {
325        let page_size = *MapBuffer::PAGE_SIZE;
326        // addr is the data section. First access the header.
327        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
328        let header = unsafe {
329            &*((addr - std::mem::size_of::<RingBufferRecordHeader>())
330                as *const RingBufferRecordHeader)
331        };
332        let addr_page = addr / page_size;
333        let mapping_start_page = addr_page - header.page_count as usize - 1;
334        let mapping_start_address = mapping_start_page * page_size;
335        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
336        let ringbuf_impl = unsafe { &*(mapping_start_address as *const &RingBuffer) };
337        (ringbuf_impl, header)
338    }
339}
340
341impl MapImpl for RingBuffer {
342    fn lookup<'a>(&'a self, _key: &[u8]) -> Option<MapValueRef<'a>> {
343        None
344    }
345
346    fn update(&self, _key: &[u8], _value: EbpfBufferPtr<'_>, _flags: u64) -> Result<(), MapError> {
347        Err(MapError::InvalidParam)
348    }
349
350    fn delete(&self, _key: &[u8]) -> Result<(), MapError> {
351        Err(MapError::InvalidParam)
352    }
353
354    fn get_next_key(&self, _key: Option<&[u8]>) -> Result<MapKey, MapError> {
355        Err(MapError::InvalidParam)
356    }
357
358    fn vmo(&self) -> &Arc<zx::Vmo> {
359        &self.vmo
360    }
361
362    fn can_read(&self) -> Option<bool> {
363        let mut state = self.state().write();
364        let consumer_position = state.consumer_position().load(Ordering::Acquire);
365        let producer_position = state.producer_position().load(Ordering::Acquire);
366
367        if consumer_position % 8 != 0 {
368            return Some(false);
369        }
370
371        // Read the header at the consumer position, and check that the entry is not busy.
372        let can_read = consumer_position < producer_position
373            && ((*state.header_mut(consumer_position).length.get_mut()) & BPF_RINGBUF_BUSY_BIT
374                == 0);
375        Some(can_read)
376    }
377
378    fn ringbuf_reserve(&self, size: u32, flags: u64) -> Result<usize, MapError> {
379        if flags != 0 {
380            return Err(MapError::InvalidParam);
381        }
382
383        //  The top two bits are used as special flags.
384        if size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT) > 0 {
385            return Err(MapError::InvalidParam);
386        }
387
388        let mut state = self.state().write();
389        let consumer_position = state.consumer_position().load(Ordering::Acquire);
390        let producer_position = state.producer_position().load(Ordering::Acquire);
391        let max_size = (self.mask + 1) as u64;
392
393        // Available size on the ringbuffer.
394        let consumed_size = producer_position.wrapping_sub(consumer_position);
395        let available_size = max_size.checked_sub(consumed_size).ok_or(MapError::InvalidParam)?;
396
397        const HEADER_ALIGNMENT: u32 = std::mem::size_of::<u64>() as u32;
398
399        // Total size of the message to write. This is the requested size + the header, rounded up
400        // to align the next header.
401        let total_size: u32 = size
402            .checked_add(BPF_RINGBUF_HDR_SZ + HEADER_ALIGNMENT - 1)
403            .ok_or(MapError::InvalidParam)?
404            / HEADER_ALIGNMENT
405            * HEADER_ALIGNMENT;
406
407        if u64::from(total_size) > available_size {
408            return Err(MapError::SizeLimit);
409        }
410        let data_position = state.data_position(producer_position) + BPF_RINGBUF_HDR_SZ as usize;
411        let data_length = size | BPF_RINGBUF_BUSY_BIT;
412        let page_count = ((data_position - state.data_addr()) / *MapBuffer::PAGE_SIZE + 3)
413            .try_into()
414            .map_err(|_| MapError::SizeLimit)?;
415        let header = state.header_mut(producer_position);
416        *header.length.get_mut() = data_length;
417        header.page_count = page_count;
418        state
419            .producer_position()
420            .store(producer_position + u64::from(total_size), Ordering::Release);
421        Ok(data_position)
422    }
423}
424
/// Policy deciding whether committing a record (submit or discard) raises `RINGBUF_SIGNAL` on
/// the ring buffer VMO.
#[repr(u32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RingBufferWakeupPolicy {
    /// Signal only when the committed record is the one at the current consumer position.
    DefaultWakeup = 0,
    /// Never signal.
    NoWakeup = BPF_RB_NO_WAKEUP,
    /// Always signal.
    ForceWakeup = BPF_RB_FORCE_WAKEUP,
}
432
433impl From<BpfValue> for RingBufferWakeupPolicy {
434    fn from(v: BpfValue) -> Self {
435        let v = u32::try_from(v).unwrap_or(0);
436        match v {
437            BPF_RB_NO_WAKEUP => Self::NoWakeup,
438            BPF_RB_FORCE_WAKEUP => Self::ForceWakeup,
439            // If flags is invalid, use the default value. This is necessary to prevent userspace
440            // leaking ringbuf value by calling into the kernel with an incorrect flag value.
441            _ => Self::DefaultWakeup,
442        }
443    }
444}
445
/// Header written immediately before the data of every record in the ring buffer.
#[repr(C)]
#[repr(align(8))]
#[derive(Debug)]
struct RingBufferRecordHeader {
    /// Size of the record data as requested in `ringbuf_reserve`, combined with the
    /// `BPF_RINGBUF_BUSY_BIT` and `BPF_RINGBUF_DISCARD_BIT` state bits.
    length: AtomicU32,
    /// Page distance used to find the start of the mapping from the record's data address: the
    /// page containing the data, minus `page_count + 1` pages, is the first page of the mapping
    /// (see `ringbuf_reserve` and `submit`/`discard`).
    page_count: u32,
}

// The data address is computed with `BPF_RINGBUF_HDR_SZ` while the header is found again with
// `size_of::<RingBufferRecordHeader>()`, so the two must be equal.
const_assert!(std::mem::size_of::<RingBufferRecordHeader>() == BPF_RINGBUF_HDR_SZ as usize);
455
#[cfg(test)]
mod test {
    use super::*;
    use ebpf::MapFlags;

    /// Schema shared by every test: a ring buffer map with a 4096-byte data section.
    fn ringbuf_schema() -> MapSchema {
        MapSchema {
            map_type: linux_uapi::bpf_map_type_BPF_MAP_TYPE_RINGBUF,
            key_size: 0,
            value_size: 0,
            max_entries: 4096,
            flags: MapFlags::empty(),
        }
    }

    /// Known flag values map to their policy; anything else falls back to the default.
    #[fuchsia::test]
    fn test_ring_buffer_wakeup_policy() {
        let cases = [
            (BpfValue::from(0), RingBufferWakeupPolicy::DefaultWakeup),
            (BpfValue::from(BPF_RB_NO_WAKEUP), RingBufferWakeupPolicy::NoWakeup),
            (BpfValue::from(BPF_RB_FORCE_WAKEUP), RingBufferWakeupPolicy::ForceWakeup),
            (BpfValue::from(42), RingBufferWakeupPolicy::DefaultWakeup),
        ];
        for (raw, expected) in cases {
            assert_eq!(RingBufferWakeupPolicy::from(raw), expected);
        }
    }

    /// `can_read` follows the life cycle of a record: empty, reserved, submitted, consumed.
    #[fuchsia::test]
    fn test_ring_buffer_can_read() {
        let ringbuf = RingBuffer::new(&ringbuf_schema(), "test_ringbuf").unwrap();

        // Empty buffer: consumer == producer, nothing to read.
        assert_eq!(ringbuf.can_read(), Some(false));

        // Reserved but not yet submitted: the record is still marked busy.
        let data_pos = ringbuf.ringbuf_reserve(16, 0).unwrap();
        assert_eq!(ringbuf.can_read(), Some(false));

        // Submitted: the busy bit is cleared and the record becomes readable.
        // SAFETY: `data_pos` was successfully returned by `ringbuf_reserve` on this
        // same buffer.
        unsafe {
            RingBuffer::submit(data_pos as u64, RingBufferWakeupPolicy::DefaultWakeup);
        }
        assert_eq!(ringbuf.can_read(), Some(true));

        // Consumed: once the consumer position catches up with the producer position there is
        // nothing left to read.
        let producer_pos = ringbuf.state().read().producer_position().load(Ordering::Acquire);
        ringbuf.state().write().consumer_position().store(producer_pos, Ordering::Release);
        assert_eq!(ringbuf.can_read(), Some(false));
    }

    /// Reserving must work when the producer position has wrapped around `u64::MAX` while the
    /// consumer position has not.
    #[fuchsia::test]
    fn test_ring_buffer_wrapping() {
        let ringbuf = RingBuffer::new(&ringbuf_schema(), "test_ringbuf_wrapping").unwrap();

        // Place both positions just below the wrap-around point. Multiples of 8 keep the
        // positions aligned on record headers. At this point 8 bytes are in use.
        {
            let state = ringbuf.state().read();
            state.producer_position().store(0xFFFFFFFFFFFFFFF8, Ordering::Release);
            state.consumer_position().store(0xFFFFFFFFFFFFFFF0, Ordering::Release);
        }

        // Simulate the producer wrapping around: producer = 8, consumer = 0xFFFFFFFFFFFFFFF0.
        // The space in use is 8 - 0xFFFFFFFFFFFFFFF0 = 24 (modulo 2^64).
        {
            let state = ringbuf.state().read();
            state.producer_position().store(8, Ordering::Release);
        }

        // Plenty of space remains, so the reservation must succeed even though the producer
        // position is numerically smaller than the consumer position.
        let result = ringbuf.ringbuf_reserve(16, 0);
        assert!(
            result.is_ok(),
            "Expected ringbuf_reserve to succeed despite wrapping, but got {:?}",
            result
        );
    }

    /// Reserving must work, and use the right slot, once the consumer position has wrapped too.
    #[fuchsia::test]
    fn test_ring_buffer_consumer_advance_wrap() {
        let ringbuf = RingBuffer::new(&ringbuf_schema(), "test_ringbuf_advance").unwrap();

        // Producer already wrapped (8), consumer not yet wrapped (0xFFFFFFFFFFFFFFF0).
        {
            let state = ringbuf.state().read();
            state.producer_position().store(8, Ordering::Release);
            state.consumer_position().store(0xFFFFFFFFFFFFFFF0, Ordering::Release);
        }

        // Simulate user space advancing the consumer past the wrap-around point, to 0.
        {
            let state = ringbuf.state().read();
            state.consumer_position().store(0, Ordering::Release);
        }

        // Now producer = 8 and consumer = 0: 8 bytes are in use and 4096 - 8 = 4088 bytes are
        // available, so the reservation must succeed.
        let result = ringbuf.ringbuf_reserve(16, 0);
        assert!(result.is_ok(), "Expected ringbuf_reserve to succeed, but got {:?}", result);

        // The record must have been placed at the producer position (8), after its header.
        let data_pos = result.unwrap();
        let state = ringbuf.state().read();
        let expected_pos = state.data_position(8) + BPF_RINGBUF_HDR_SZ as usize;
        assert_eq!(data_pos, expected_pos);
    }

    /// A consumer position that is not 8-byte aligned is never reported as readable.
    #[fuchsia::test]
    fn test_ring_buffer_unaligned_consumer() {
        let ringbuf = RingBuffer::new(&ringbuf_schema(), "test_ringbuf_unaligned").unwrap();

        // Producer at 8, consumer at 5, which is not a multiple of 8.
        {
            let state = ringbuf.state().read();
            state.producer_position().store(8, Ordering::Release);
            state.consumer_position().store(5, Ordering::Release);
        }

        // The consumer position is invalid, so there is nothing to read.
        let result = ringbuf.can_read();
        assert_eq!(result, Some(false));
    }

    /// Oversized reservations are rejected instead of overflowing the size computation.
    #[fuchsia::test]
    fn test_ring_buffer_overflow() {
        let ringbuf = RingBuffer::new(&ringbuf_schema(), "test_ringbuf").unwrap();

        // Sizes close to u32::MAX use the reserved top bits and are invalid parameters.
        assert_eq!(ringbuf.ringbuf_reserve(0xffff_ffff, 0), Err(MapError::InvalidParam));
        assert_eq!(ringbuf.ringbuf_reserve(0xffff_fff1, 0), Err(MapError::InvalidParam));
        // The largest size without the top bits set is valid but does not fit in the buffer.
        assert_eq!(ringbuf.ringbuf_reserve(0x3fff_ffff, 0), Err(MapError::SizeLimit));
    }

}
641}