1use super::buffer::{MapBuffer, VmoOrName};
6use super::lock::RwMapLock;
7use super::vmar::AllocatedVmar;
8use super::{MapError, MapImpl, MapKey, MapValueRef};
9use ebpf::{BpfValue, EbpfBufferPtr, MapSchema};
10use linux_uapi::{
11 BPF_RB_FORCE_WAKEUP, BPF_RB_NO_WAKEUP, BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT,
12 BPF_RINGBUF_HDR_SZ,
13};
14use static_assertions::const_assert;
15use std::fmt::Debug;
16use std::ops::Deref;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
20
/// Signal raised on the ring buffer VMO when a committed record should wake a consumer.
///
/// `RingBuffer::commit` asserts it when the wakeup policy is `ForceWakeup`, or when it is
/// `DefaultWakeup` and the committed record sits at the current consumer position.
pub const RINGBUF_SIGNAL: zx::Signals = zx::Signals::USER_0;

/// Signal handed to `RwMapLock` for the lock word stored at offset 0 of the ring buffer VMO.
// NOTE(review): presumably used by `RwMapLock` to park/wake contended lockers — confirm in lock.rs.
const RINGBUF_LOCK_SIGNAL: zx::Signals = zx::Signals::USER_1;
26
/// View over the shared control pages and data area of a ring buffer.
///
/// Instances are handed out wrapped in the ring buffer's `RwMapLock`. Relative to
/// `base_addr`, the consumer position lives one page in, the producer position two pages in,
/// and the data area starts three pages in.
#[derive(Debug)]
struct RingBufferState {
    /// Address at which offset 0 of the ring buffer VMO is mapped in this process.
    base_addr: usize,

    /// Size of the data area minus one. The size is a power of two, so this reduces a
    /// free-running position to an offset inside the data area.
    mask: u32,
}
37
38impl RingBufferState {
39 fn data_addr(&self) -> usize {
41 self.base_addr + 3 * *MapBuffer::PAGE_SIZE
42 }
43
44 fn consumer_position(&self) -> &AtomicU64 {
47 unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE) as *const AtomicU64) }
50 }
51
52 fn producer_position(&self) -> &AtomicU64 {
55 unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE * 2) as *const AtomicU64) }
58 }
59
60 fn data_position(&self, position: u64) -> usize {
62 self.data_addr() + ((position & (self.mask as u64)) as usize)
63 }
64
65 fn is_consumer_position(&self, addr: usize) -> bool {
66 let Some(position) = addr.checked_sub(self.data_addr()) else {
67 return false;
68 };
69 let position = position as u32;
70 let consumer_position =
71 self.consumer_position().load(Ordering::Acquire) & (self.mask as u64);
72 u64::from(position) == consumer_position
73 }
74
75 fn header_mut(&mut self, position: u64) -> &mut RingBufferRecordHeader {
77 #[allow(
82 clippy::undocumented_unsafe_blocks,
83 reason = "Force documented unsafe blocks in Starnix"
84 )]
85 unsafe {
86 &mut *(self.data_position(position) as *mut RingBufferRecordHeader)
87 }
88 }
89}
90
/// eBPF ring buffer map backed by a VMO.
///
/// The VMO holds three control pages (lock word, consumer position, producer position)
/// followed by the data area. The kernel maps it into `vmar` after one extra "technical"
/// page. It then maps the data area a second time right after the first copy, so a record
/// that runs past the end of the data area is still contiguous in memory.
#[derive(Debug)]
pub(crate) struct RingBuffer {
    /// VMO containing the control pages and the data area; exposed through `MapImpl::vmo`.
    vmo: Arc<zx::Vmo>,

    /// Kernel-side reservation. Its first (technical) page stores a pointer back to this
    /// `RingBuffer`, which `submit`/`discard` use to find the ring buffer from a record address.
    vmar: AllocatedVmar,

    /// Size of the data area minus one (the size is a power of two).
    mask: u32,
}
107
108impl RingBuffer {
109 pub fn new(schema: &MapSchema, vmo: impl Into<VmoOrName>) -> Result<Pin<Box<Self>>, MapError> {
131 if schema.key_size != 0 || schema.value_size != 0 {
132 return Err(MapError::InvalidParam);
133 }
134
135 let page_size = *MapBuffer::PAGE_SIZE;
136 let size = schema.max_entries as usize;
138 if size == 0 || size % page_size != 0 || size & (size - 1) != 0 {
139 return Err(MapError::InvalidParam);
140 }
141 let mask: u32 = (size - 1).try_into().map_err(|_| MapError::InvalidParam)?;
142
143 let technical_vmo_size = page_size;
147
148 let control_pages_size = 3 * page_size;
150 let vmo_size = control_pages_size + size;
151
152 let kernel_root_vmar = fuchsia_runtime::vmar_root_self();
153 #[allow(
159 clippy::undocumented_unsafe_blocks,
160 reason = "Force documented unsafe blocks in Starnix"
161 )]
162 let vmar = unsafe {
163 AllocatedVmar::allocate(
164 &kernel_root_vmar,
165 0,
166 technical_vmo_size + control_pages_size + 2 * size,
168 zx::VmarFlags::CAN_MAP_SPECIFIC
169 | zx::VmarFlags::CAN_MAP_READ
170 | zx::VmarFlags::CAN_MAP_WRITE,
171 )
172 .map_err(|_| MapError::Internal)?
173 };
174 let technical_vmo =
175 zx::Vmo::create(technical_vmo_size as u64).map_err(|_| MapError::Internal)?;
176 technical_vmo.set_name(&zx::Name::new_lossy("ebpf:ring_buffer_technical_vmo")).unwrap();
177 vmar.map(
178 0,
179 &technical_vmo,
180 0,
181 page_size,
182 zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
183 )
184 .map_err(|_| MapError::Internal)?;
185
186 let vmo = match vmo.into() {
187 VmoOrName::Vmo(vmo) => {
188 let actual_vmo_size = vmo.get_size().map_err(|_| MapError::InvalidVmo)? as usize;
189 if vmo_size != actual_vmo_size {
190 return Err(MapError::InvalidVmo);
191 }
192 vmo
193 }
194 VmoOrName::Name(name) => {
195 let vmo = zx::Vmo::create(vmo_size as u64).map_err(|e| match e {
196 zx::Status::NO_MEMORY | zx::Status::OUT_OF_RANGE => MapError::NoMemory,
197 _ => MapError::Internal,
198 })?;
199 let name = format!("ebpf:ring_buffer:{name}");
200 vmo.set_name(&zx::Name::new_lossy(&name)).unwrap();
201 vmo
202 }
203 };
204
205 vmar.map(
206 technical_vmo_size,
207 &vmo,
208 0,
209 vmo_size,
210 zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
211 )
212 .map_err(|_| MapError::Internal)?;
213
214 vmar.map(
216 technical_vmo_size + vmo_size,
217 &vmo,
218 control_pages_size as u64,
219 size,
220 zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
221 )
222 .map_err(|_| MapError::Internal)?;
223
224 #[allow(
229 clippy::undocumented_unsafe_blocks,
230 reason = "Force documented unsafe blocks in Starnix"
231 )]
232 let storage_position = unsafe { &mut *(vmar.base() as *mut *const Self) };
233 let storage = Box::pin(Self { vmo: Arc::new(vmo), vmar, mask });
234 *storage_position = storage.deref();
238 Ok(storage)
239 }
240
241 fn state<'a>(&'a self) -> RwMapLock<'a, RingBufferState> {
242 let page_size = *MapBuffer::PAGE_SIZE;
243
244 unsafe {
252 let lock_cell = &*((self.vmar.base() + page_size) as *const AtomicU32);
253 RwMapLock::new(
254 lock_cell,
255 self.vmo.as_handle_ref(),
256 RINGBUF_LOCK_SIGNAL,
257 RingBufferState { base_addr: self.vmar.base() + page_size, mask: self.mask },
258 )
259 }
260 }
261
262 fn commit(
265 &self,
266 header: &RingBufferRecordHeader,
267 flags: RingBufferWakeupPolicy,
268 discard: bool,
269 ) {
270 let mut new_length = header.length.load(Ordering::Acquire) & !BPF_RINGBUF_BUSY_BIT;
271 if discard {
272 new_length |= BPF_RINGBUF_DISCARD_BIT;
273 }
274 header.length.store(new_length, Ordering::Release);
275
276 let state = self.state().read();
279 if flags == RingBufferWakeupPolicy::ForceWakeup
280 || (flags == RingBufferWakeupPolicy::DefaultWakeup
281 && state.is_consumer_position(header as *const RingBufferRecordHeader as usize))
282 {
283 self.vmo
284 .as_handle_ref()
285 .signal(zx::Signals::empty(), RINGBUF_SIGNAL)
286 .expect("Failed to set signal or a ring buffer VMO");
287 }
288 }
289
290 pub unsafe fn submit(addr: u64, flags: RingBufferWakeupPolicy) {
297 let addr = addr as usize;
298 #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
299 let (ringbuf_storage, header) = unsafe { Self::get_ringbug_and_header_by_addr(addr) };
300 ringbuf_storage.commit(header, flags, false);
301 }
302
303 pub unsafe fn discard(addr: u64, flags: RingBufferWakeupPolicy) {
310 let addr = addr as usize;
311 #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
312 let (ringbuf_storage, header) = unsafe { Self::get_ringbug_and_header_by_addr(addr) };
313 ringbuf_storage.commit(header, flags, true);
314 }
315
316 unsafe fn get_ringbug_and_header_by_addr(
323 addr: usize,
324 ) -> (&'static RingBuffer, &'static RingBufferRecordHeader) {
325 let page_size = *MapBuffer::PAGE_SIZE;
326 #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
328 let header = unsafe {
329 &*((addr - std::mem::size_of::<RingBufferRecordHeader>())
330 as *const RingBufferRecordHeader)
331 };
332 let addr_page = addr / page_size;
333 let mapping_start_page = addr_page - header.page_count as usize - 1;
334 let mapping_start_address = mapping_start_page * page_size;
335 #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
336 let ringbuf_impl = unsafe { &*(mapping_start_address as *const &RingBuffer) };
337 (ringbuf_impl, header)
338 }
339}
340
341impl MapImpl for RingBuffer {
342 fn lookup<'a>(&'a self, _key: &[u8]) -> Option<MapValueRef<'a>> {
343 None
344 }
345
346 fn update(&self, _key: &[u8], _value: EbpfBufferPtr<'_>, _flags: u64) -> Result<(), MapError> {
347 Err(MapError::InvalidParam)
348 }
349
350 fn delete(&self, _key: &[u8]) -> Result<(), MapError> {
351 Err(MapError::InvalidParam)
352 }
353
354 fn get_next_key(&self, _key: Option<&[u8]>) -> Result<MapKey, MapError> {
355 Err(MapError::InvalidParam)
356 }
357
358 fn vmo(&self) -> &Arc<zx::Vmo> {
359 &self.vmo
360 }
361
362 fn can_read(&self) -> Option<bool> {
363 let mut state = self.state().write();
364 let consumer_position = state.consumer_position().load(Ordering::Acquire);
365 let producer_position = state.producer_position().load(Ordering::Acquire);
366
367 if consumer_position % 8 != 0 {
368 return Some(false);
369 }
370
371 let can_read = consumer_position < producer_position
373 && ((*state.header_mut(consumer_position).length.get_mut()) & BPF_RINGBUF_BUSY_BIT
374 == 0);
375 Some(can_read)
376 }
377
378 fn ringbuf_reserve(&self, size: u32, flags: u64) -> Result<usize, MapError> {
379 if flags != 0 {
380 return Err(MapError::InvalidParam);
381 }
382
383 if size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT) > 0 {
385 return Err(MapError::InvalidParam);
386 }
387
388 let mut state = self.state().write();
389 let consumer_position = state.consumer_position().load(Ordering::Acquire);
390 let producer_position = state.producer_position().load(Ordering::Acquire);
391 let max_size = (self.mask + 1) as u64;
392
393 let consumed_size = producer_position.wrapping_sub(consumer_position);
395 let available_size = max_size.checked_sub(consumed_size).ok_or(MapError::InvalidParam)?;
396
397 const HEADER_ALIGNMENT: u32 = std::mem::size_of::<u64>() as u32;
398
399 let total_size: u32 = size
402 .checked_add(BPF_RINGBUF_HDR_SZ + HEADER_ALIGNMENT - 1)
403 .ok_or(MapError::InvalidParam)?
404 / HEADER_ALIGNMENT
405 * HEADER_ALIGNMENT;
406
407 if u64::from(total_size) > available_size {
408 return Err(MapError::SizeLimit);
409 }
410 let data_position = state.data_position(producer_position) + BPF_RINGBUF_HDR_SZ as usize;
411 let data_length = size | BPF_RINGBUF_BUSY_BIT;
412 let page_count = ((data_position - state.data_addr()) / *MapBuffer::PAGE_SIZE + 3)
413 .try_into()
414 .map_err(|_| MapError::SizeLimit)?;
415 let header = state.header_mut(producer_position);
416 *header.length.get_mut() = data_length;
417 header.page_count = page_count;
418 state
419 .producer_position()
420 .store(producer_position + u64::from(total_size), Ordering::Release);
421 Ok(data_position)
422 }
423}
424
/// Wakeup behaviour requested when a ring buffer record is submitted or discarded.
#[repr(u32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RingBufferWakeupPolicy {
    /// Signal the consumer only if the committed record is at the consumer position.
    DefaultWakeup = 0,
    /// Never signal the consumer.
    NoWakeup = BPF_RB_NO_WAKEUP,
    /// Always signal the consumer.
    ForceWakeup = BPF_RB_FORCE_WAKEUP,
}
432
433impl From<BpfValue> for RingBufferWakeupPolicy {
434 fn from(v: BpfValue) -> Self {
435 let v = u32::try_from(v).unwrap_or(0);
436 match v {
437 BPF_RB_NO_WAKEUP => Self::NoWakeup,
438 BPF_RB_FORCE_WAKEUP => Self::ForceWakeup,
439 _ => Self::DefaultWakeup,
442 }
443 }
444}
445
/// Header placed in front of every record in the data area.
///
/// Its size must equal `BPF_RINGBUF_HDR_SZ`, which the `const_assert!` below enforces.
#[repr(C)]
#[repr(align(8))]
#[derive(Debug)]
struct RingBufferRecordHeader {
    /// Payload length, combined with `BPF_RINGBUF_BUSY_BIT` while the record is reserved
    /// and with `BPF_RINGBUF_DISCARD_BIT` once it has been discarded.
    length: AtomicU32,
    /// One less than the number of pages between the start of the ring buffer VMAR and the
    /// page containing the record's payload. It is used to find the back-pointer to the
    /// `RingBuffer` stored in the VMAR's first page.
    page_count: u32,
}

const_assert!(std::mem::size_of::<RingBufferRecordHeader>() == BPF_RINGBUF_HDR_SZ as usize);
455
#[cfg(test)]
mod test {
    use super::*;
    use ebpf::MapFlags;

    /// Schema for a 4096-byte ring buffer, shared by every test below.
    fn ringbuf_schema() -> MapSchema {
        MapSchema {
            map_type: linux_uapi::bpf_map_type_BPF_MAP_TYPE_RINGBUF,
            key_size: 0,
            value_size: 0,
            max_entries: 4096,
            flags: MapFlags::empty(),
        }
    }

    #[fuchsia::test]
    fn test_ring_buffer_wakeup_policy() {
        let cases = [
            (0, RingBufferWakeupPolicy::DefaultWakeup),
            (BPF_RB_NO_WAKEUP, RingBufferWakeupPolicy::NoWakeup),
            (BPF_RB_FORCE_WAKEUP, RingBufferWakeupPolicy::ForceWakeup),
            (42, RingBufferWakeupPolicy::DefaultWakeup),
        ];
        for (raw, expected) in cases {
            assert_eq!(RingBufferWakeupPolicy::from(BpfValue::from(raw)), expected);
        }
    }

    #[fuchsia::test]
    fn test_ring_buffer_can_read() {
        let ringbuf = RingBuffer::new(&ringbuf_schema(), "test_ringbuf").unwrap();

        // Nothing has been produced yet.
        assert_eq!(ringbuf.can_read(), Some(false));

        // A reserved but not yet submitted record is still busy.
        let record = ringbuf.ringbuf_reserve(16, 0).unwrap();
        assert_eq!(ringbuf.can_read(), Some(false));

        // SAFETY: `record` was returned by `ringbuf_reserve` on `ringbuf`, which is still
        // alive, and it has not been submitted or discarded yet.
        unsafe { RingBuffer::submit(record as u64, RingBufferWakeupPolicy::DefaultWakeup) };
        assert_eq!(ringbuf.can_read(), Some(true));

        // Once the consumer catches up with the producer there is nothing left to read.
        let end = ringbuf.state().read().producer_position().load(Ordering::Acquire);
        ringbuf.state().write().consumer_position().store(end, Ordering::Release);
        assert_eq!(ringbuf.can_read(), Some(false));
    }

    #[fuchsia::test]
    fn test_ring_buffer_wrapping() {
        let ringbuf = RingBuffer::new(&ringbuf_schema(), "test_ringbuf_wrapping").unwrap();

        // Park both positions just below `u64::MAX`.
        {
            let state = ringbuf.state().read();
            state.producer_position().store(0xFFFFFFFFFFFFFFF8, Ordering::Release);
            state.consumer_position().store(0xFFFFFFFFFFFFFFF0, Ordering::Release);
        }

        // The producer wraps past zero while the consumer stays behind.
        ringbuf.state().read().producer_position().store(8, Ordering::Release);

        let result = ringbuf.ringbuf_reserve(16, 0);
        assert!(
            result.is_ok(),
            "Expected ringbuf_reserve to succeed despite wrapping, but got {:?}",
            result
        );
    }

    #[fuchsia::test]
    fn test_ring_buffer_consumer_advance_wrap() {
        let ringbuf = RingBuffer::new(&ringbuf_schema(), "test_ringbuf_advance").unwrap();

        // Start with the consumer just below `u64::MAX` and the producer already wrapped.
        {
            let state = ringbuf.state().read();
            state.producer_position().store(8, Ordering::Release);
            state.consumer_position().store(0xFFFFFFFFFFFFFFF0, Ordering::Release);
        }

        // The consumer wraps around as well.
        ringbuf.state().read().consumer_position().store(0, Ordering::Release);

        let result = ringbuf.ringbuf_reserve(16, 0);
        assert!(result.is_ok(), "Expected ringbuf_reserve to succeed, but got {:?}", result);

        let data_pos = result.unwrap();
        let expected_pos = ringbuf.state().read().data_position(8) + BPF_RINGBUF_HDR_SZ as usize;
        assert_eq!(data_pos, expected_pos);
    }

    #[fuchsia::test]
    fn test_ring_buffer_unaligned_consumer() {
        let ringbuf = RingBuffer::new(&ringbuf_schema(), "test_ringbuf_unaligned").unwrap();

        {
            let state = ringbuf.state().read();
            state.producer_position().store(8, Ordering::Release);
            state.consumer_position().store(5, Ordering::Release);
        }

        // A misaligned consumer position can never point at a record header.
        assert_eq!(ringbuf.can_read(), Some(false));
    }

    #[fuchsia::test]
    fn test_ring_buffer_overflow() {
        let ringbuf = RingBuffer::new(&ringbuf_schema(), "test_ringbuf").unwrap();

        // Sizes that collide with the busy/discard flag bits are rejected outright.
        for size in [0xffff_ffff_u32, 0xffff_fff1] {
            assert_eq!(ringbuf.ringbuf_reserve(size, 0), Err(MapError::InvalidParam));
        }

        // A size that fits the length field but not the buffer hits the size limit.
        assert_eq!(ringbuf.ringbuf_reserve(0x3fff_ffff, 0), Err(MapError::SizeLimit));
    }
}