use super::buffer::{MapBuffer, VmoOrName};
use super::lock::RwMapLock;
use super::vmar::AllocatedVmar;
use super::{MapError, MapImpl, MapKey, MapValueRef};
use ebpf::MapSchema;
use linux_uapi::{
    BPF_RB_FORCE_WAKEUP, BPF_RB_NO_WAKEUP, BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT,
    BPF_RINGBUF_HDR_SZ,
};
use static_assertions::const_assert;
use std::fmt::Debug;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use zx::AsHandleRef;

/// Signal set on the ring buffer VMO to notify readers that new data may be available.
pub const RINGBUF_SIGNAL: zx::Signals = zx::Signals::USER_0;

/// Signal used by the `RwMapLock` that guards the ring buffer state.
const RINGBUF_LOCK_SIGNAL: zx::Signals = zx::Signals::USER_1;

/// View over the ring buffer control pages and data area, accessed while holding the ring
/// buffer lock (see `RingBuffer::state()`).
#[derive(Debug)]
struct RingBufferState {
    /// Address where the shared ring buffer VMO is mapped (the first of the control pages).
    base_addr: usize,

    /// Buffer size minus one; the size is a power of two, so this masks positions into the
    /// data area.
    mask: u32,
}

impl RingBufferState {
    /// Address of the start of the data area, just past the three control pages.
    fn data_addr(&self) -> usize {
        self.base_addr + 3 * *MapBuffer::PAGE_SIZE
    }

    /// Consumer position counter, stored in the second control page.
    fn consumer_position(&self) -> &AtomicU32 {
        // SAFETY: The control pages are mapped for the lifetime of the ring buffer and the
        // position counters are only accessed through atomics.
        unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE) as *const AtomicU32) }
    }

    /// Producer position counter, stored in the third control page.
    fn producer_position(&self) -> &AtomicU32 {
        // SAFETY: The control pages are mapped for the lifetime of the ring buffer and the
        // position counters are only accessed through atomics.
        unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE * 2) as *const AtomicU32) }
    }

    /// Maps a logical position to the address of the corresponding record in the data area.
    fn data_position(&self, position: u32) -> usize {
        self.data_addr() + ((position & self.mask) as usize)
    }

    /// Returns true if `addr` is the address of the record at the current consumer position.
    fn is_consumer_position(&self, addr: usize) -> bool {
        let Some(position) = addr.checked_sub(self.data_addr()) else {
            return false;
        };
        let position = position as u32;
        let consumer_position = self.consumer_position().load(Ordering::Acquire) & self.mask;
        position == consumer_position
    }

    /// Returns the record header located at `position`.
    fn header_mut(&mut self, position: u32) -> &mut RingBufferRecordHeader {
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        unsafe {
            &mut *(self.data_position(position) as *mut RingBufferRecordHeader)
        }
    }
}

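/// A BPF ring buffer map.
///
/// `new()` sets up a single kernel VMAR with the following layout:
///
///   * page 0: a "technical" page backed by its own VMO, holding a pointer back to the
///     `RingBuffer` so that `submit()` / `discard()` can find it from a record address;
///   * page 1: the first page of the shared ring buffer VMO, holding the lock word used by
///     `state()`;
///   * pages 2 and 3: the consumer and producer position counters;
///   * pages 4..: the data area (`size` bytes), followed by a second mapping of the same
///     data so that records wrapping around the end of the buffer stay contiguous.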
#[derive(Debug)]
pub(crate) struct RingBuffer {
    /// VMO holding the control pages followed by the data area; exposed via `MapImpl::vmo()`.
    vmo: Arc<zx::Vmo>,

    /// Kernel mapping of the technical page, the control pages and the doubly-mapped data
    /// area. Kept for the lifetime of the ring buffer.
    vmar: AllocatedVmar,

    /// Buffer size minus one, used to mask positions into the data area.
    mask: u32,
}

impl RingBuffer {
    /// Creates a new ring buffer. `schema.max_entries` is the size of the data area in bytes;
    /// it must be a non-zero, page-aligned power of two. The buffer is backed either by the
    /// provided VMO (which must have the expected size) or by a newly created VMO named after
    /// the map.
    pub fn new(schema: &MapSchema, vmo: impl Into<VmoOrName>) -> Result<Pin<Box<Self>>, MapError> {
        if schema.key_size != 0 || schema.value_size != 0 {
            return Err(MapError::InvalidParam);
        }

        let page_size = *MapBuffer::PAGE_SIZE;
        let size = schema.max_entries as usize;
        // The data area must be a page-aligned power of two so positions can be masked with
        // `size - 1`.
        if size == 0 || size % page_size != 0 || size & (size - 1) != 0 {
            return Err(MapError::InvalidParam);
        }
        let mask: u32 = (size - 1).try_into().map_err(|_| MapError::InvalidParam)?;

        // A single "technical" page placed in front of the shared VMO. It stores a pointer
        // back to the `RingBuffer` so that `submit()` and `discard()` can find it from a
        // record address alone.
        let technical_vmo_size = page_size;

        // Lock word, consumer position and producer position, one page each.
        let control_pages_size = 3 * page_size;
        let vmo_size = control_pages_size + size;

        let kernel_root_vmar = fuchsia_runtime::vmar_root_self();
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        let vmar = unsafe {
            AllocatedVmar::allocate(
                &kernel_root_vmar,
                0,
                // Room for the technical page, the control pages and two back-to-back
                // mappings of the data area.
                technical_vmo_size + control_pages_size + 2 * size,
                zx::VmarFlags::CAN_MAP_SPECIFIC
                    | zx::VmarFlags::CAN_MAP_READ
                    | zx::VmarFlags::CAN_MAP_WRITE,
            )
            .map_err(|_| MapError::Internal)?
        };
        let technical_vmo =
            zx::Vmo::create(technical_vmo_size as u64).map_err(|_| MapError::Internal)?;
        technical_vmo.set_name(&zx::Name::new_lossy("ebpf:ring_buffer_technical_vmo")).unwrap();
        vmar.map(
            0,
            &technical_vmo,
            0,
            page_size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        let vmo = match vmo.into() {
            VmoOrName::Vmo(vmo) => {
                let actual_vmo_size = vmo.get_size().map_err(|_| MapError::InvalidVmo)? as usize;
                if vmo_size != actual_vmo_size {
                    return Err(MapError::InvalidVmo);
                }
                vmo
            }
            VmoOrName::Name(name) => {
                let vmo = zx::Vmo::create(vmo_size as u64).map_err(|e| match e {
                    zx::Status::NO_MEMORY | zx::Status::OUT_OF_RANGE => MapError::NoMemory,
                    _ => MapError::Internal,
                })?;
                let name = format!("ebpf:ring_buffer:{name}");
                vmo.set_name(&zx::Name::new_lossy(&name)).unwrap();
                vmo
            }
        };

        // Map the control pages and the data area.
        vmar.map(
            technical_vmo_size,
            &vmo,
            0,
            vmo_size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        // Map the data area a second time, right after the first mapping, so that records
        // wrapping around the end of the buffer stay contiguous in the kernel address space.
        vmar.map(
            technical_vmo_size + vmo_size,
            &vmo,
            control_pages_size as u64,
            size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        let storage_position = unsafe { &mut *(vmar.base() as *mut *const Self) };
        let storage = Box::pin(Self { vmo: Arc::new(vmo), vmar, mask });
        // Store a pointer to the pinned `RingBuffer` in the technical page so it can be
        // recovered from record addresses in `submit()` and `discard()`.
        *storage_position = storage.deref();
        Ok(storage)
    }

    /// Returns the `RingBufferState`, guarded by the `RwMapLock` stored in the first control
    /// page.
    fn state<'a>(&'a self) -> RwMapLock<'a, RingBufferState> {
        let page_size = *MapBuffer::PAGE_SIZE;

        // SAFETY: The control pages are mapped for the lifetime of `self`; the first word of
        // the first control page holds the lock state and is only accessed atomically.
        unsafe {
            let lock_cell = &*((self.vmar.base() + page_size) as *const AtomicU32);
            RwMapLock::new(
                lock_cell,
                self.vmo.as_handle_ref(),
                RINGBUF_LOCK_SIGNAL,
                RingBufferState { base_addr: self.vmar.base() + page_size, mask: self.mask },
            )
        }
    }

    /// Commits a previously reserved record by clearing its busy bit (and setting the discard
    /// bit if `discard` is true), then wakes up readers according to `flags`.
    fn commit(
        &self,
        header: &RingBufferRecordHeader,
        flags: RingBufferWakeupPolicy,
        discard: bool,
    ) {
        let mut new_length = header.length.load(Ordering::Acquire) & !BPF_RINGBUF_BUSY_BIT;
        if discard {
            new_length |= BPF_RINGBUF_DISCARD_BIT;
        }
        header.length.store(new_length, Ordering::Release);

        // Wake up readers if requested: always for `ForceWakeup`, never for `NoWakeup`, and
        // for the default policy only when the committed record is the one at the current
        // consumer position.
        let state = self.state().read();
        if flags == RingBufferWakeupPolicy::ForceWakeup
            || (flags == RingBufferWakeupPolicy::DefaultWakeup
                && state.is_consumer_position(header as *const RingBufferRecordHeader as usize))
        {
            self.vmo
                .as_handle_ref()
                .signal(zx::Signals::empty(), RINGBUF_SIGNAL)
                .expect("Failed to set signal on a ring buffer VMO");
        }
    }

    /// Commits the record whose data starts at `addr`, waking up readers according to `flags`.
    ///
    /// # Safety
    ///
    /// `addr` must be an address previously returned by `ringbuf_reserve()` on a ring buffer
    /// that is still alive, and the record must not have been submitted or discarded yet.
    pub unsafe fn submit(addr: u64, flags: RingBufferWakeupPolicy) {
        let addr = addr as usize;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let (ringbuf_storage, header) = unsafe { Self::get_ringbuf_and_header_by_addr(addr) };
        ringbuf_storage.commit(header, flags, false);
    }

    /// Discards the record whose data starts at `addr`, waking up readers according to `flags`.
    ///
    /// # Safety
    ///
    /// Same requirements as [`RingBuffer::submit`].
    pub unsafe fn discard(addr: u64, flags: RingBufferWakeupPolicy) {
        let addr = addr as usize;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let (ringbuf_storage, header) = unsafe { Self::get_ringbuf_and_header_by_addr(addr) };
        ringbuf_storage.commit(header, flags, true);
    }

    /// Recovers the `RingBuffer` and the record header from a record data address, using the
    /// header's `page_count` to walk back to the technical page that stores the `RingBuffer`
    /// pointer.
    ///
    /// # Safety
    ///
    /// `addr` must be the data address of a currently reserved record in a live ring buffer.
    unsafe fn get_ringbuf_and_header_by_addr(
        addr: usize,
    ) -> (&'static RingBuffer, &'static RingBufferRecordHeader) {
        let page_size = *MapBuffer::PAGE_SIZE;
        // The header is stored immediately before the record data.
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let header = unsafe {
            &*((addr - std::mem::size_of::<RingBufferRecordHeader>())
                as *const RingBufferRecordHeader)
        };
        // `page_count` is the number of pages between the first control page and the page
        // containing the record data, so the technical page is `page_count + 1` pages before
        // the page containing `addr`.
        let addr_page = addr / page_size;
        let mapping_start_page = addr_page - header.page_count as usize - 1;
        let mapping_start_address = mapping_start_page * page_size;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let ringbuf_impl = unsafe { &*(mapping_start_address as *const &RingBuffer) };
        (ringbuf_impl, header)
    }
}

impl MapImpl for RingBuffer {
    fn lookup<'a>(&'a self, _key: &[u8]) -> Option<MapValueRef<'a>> {
        None
    }

    fn update(&self, _key: MapKey, _value: &[u8], _flags: u64) -> Result<(), MapError> {
        Err(MapError::InvalidParam)
    }

    fn delete(&self, _key: &[u8]) -> Result<(), MapError> {
        Err(MapError::InvalidParam)
    }

    fn get_next_key(&self, _key: Option<&[u8]>) -> Result<MapKey, MapError> {
        Err(MapError::InvalidParam)
    }

    fn vmo(&self) -> &Arc<zx::Vmo> {
        &self.vmo
    }

    fn can_read(&self) -> Option<bool> {
        let mut state = self.state().write();
        let consumer_position = state.consumer_position().load(Ordering::Acquire);
        let producer_position = state.producer_position().load(Ordering::Acquire);

        // Data can be read once the consumer lags behind the producer and the record at the
        // consumer position has been committed (its busy bit has been cleared).
        let can_read = consumer_position < producer_position
            && ((*state.header_mut(consumer_position).length.get_mut()) & BPF_RINGBUF_BUSY_BIT
                == 0);
        Some(can_read)
    }

    fn ringbuf_reserve(&self, size: u32, flags: u64) -> Result<usize, MapError> {
        if flags != 0 {
            return Err(MapError::InvalidParam);
        }

        // The top bits of the length field are reserved for the busy and discard flags.
        if size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT) > 0 {
            return Err(MapError::InvalidParam);
        }

        let mut state = self.state().write();
        let consumer_position = state.consumer_position().load(Ordering::Acquire);
        let producer_position = state.producer_position().load(Ordering::Acquire);
        let max_size = self.mask + 1;

        // Positions only ever grow, so the space currently in use is the distance between the
        // two positions.
        let consumed_size =
            producer_position.checked_sub(consumer_position).ok_or(MapError::InvalidParam)?;
        let available_size = max_size.checked_sub(consumed_size).ok_or(MapError::InvalidParam)?;

        const HEADER_ALIGNMENT: u32 = std::mem::size_of::<u64>() as u32;

        // Total size of the record: the requested payload plus its header, rounded up so that
        // the next record header stays 8-byte aligned.
        let total_size: u32 = (size + BPF_RINGBUF_HDR_SZ + HEADER_ALIGNMENT - 1) / HEADER_ALIGNMENT
            * HEADER_ALIGNMENT;

        if total_size > available_size {
            return Err(MapError::SizeLimit);
        }
        let data_position = state.data_position(producer_position) + BPF_RINGBUF_HDR_SZ as usize;
        let data_length = size | BPF_RINGBUF_BUSY_BIT;
        // Number of pages from the first control page to the page containing the record data;
        // `submit()` and `discard()` use it to locate the ring buffer.
        let page_count = ((data_position - state.data_addr()) / *MapBuffer::PAGE_SIZE + 3)
            .try_into()
            .map_err(|_| MapError::SizeLimit)?;
        let header = state.header_mut(producer_position);
        *header.length.get_mut() = data_length;
        header.page_count = page_count;
        state.producer_position().store(producer_position + total_size, Ordering::Release);
        Ok(data_position)
    }
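
    // Illustrative sketch (not code from this module) of how a record is expected to flow
    // through this API, assuming a `ring_buffer: &RingBuffer` and a payload length `len`:
    //
    //     let addr = ring_buffer.ringbuf_reserve(len, 0)?;
    //     // ... write `len` bytes of payload at `addr` ...
    //     unsafe { RingBuffer::submit(addr as u64, RingBufferWakeupPolicy::DefaultWakeup) };
    //
    // `RingBuffer::discard()` takes the same address when the record should be dropped instead.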
}

/// Wakeup policy passed to `submit()` and `discard()`, mirroring the `BPF_RB_*` flags.
#[repr(u32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RingBufferWakeupPolicy {
    DefaultWakeup = 0,
    NoWakeup = BPF_RB_NO_WAKEUP,
    ForceWakeup = BPF_RB_FORCE_WAKEUP,
}

impl From<u32> for RingBufferWakeupPolicy {
    fn from(v: u32) -> Self {
        match v {
            BPF_RB_NO_WAKEUP => Self::NoWakeup,
            BPF_RB_FORCE_WAKEUP => Self::ForceWakeup,
            // Any other flag value falls back to the default policy.
            _ => Self::DefaultWakeup,
        }
    }
}

/// Header stored in front of every record in the data area. `length` holds the payload size
/// together with the `BPF_RINGBUF_BUSY_BIT` / `BPF_RINGBUF_DISCARD_BIT` flags; `page_count` is
/// used by `submit()` and `discard()` to locate the owning `RingBuffer`.
#[repr(C)]
#[repr(align(8))]
#[derive(Debug)]
struct RingBufferRecordHeader {
    length: AtomicU32,
    page_count: u32,
}

const_assert!(std::mem::size_of::<RingBufferRecordHeader>() == BPF_RINGBUF_HDR_SZ as usize);
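// Record sizes are rounded up to 8 bytes in `ringbuf_reserve()` (`HEADER_ALIGNMENT`); the
// header's alignment is expected to match so that every record header stays 8-byte aligned.
const_assert!(std::mem::align_of::<RingBufferRecordHeader>() == 8);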

#[cfg(test)]
mod test {
    use super::*;

    #[fuchsia::test]
    fn test_ring_buffer_wakeup_policy() {
        assert_eq!(RingBufferWakeupPolicy::from(0), RingBufferWakeupPolicy::DefaultWakeup);
        assert_eq!(
            RingBufferWakeupPolicy::from(BPF_RB_NO_WAKEUP),
            RingBufferWakeupPolicy::NoWakeup
        );
        assert_eq!(
            RingBufferWakeupPolicy::from(BPF_RB_FORCE_WAKEUP),
            RingBufferWakeupPolicy::ForceWakeup
        );
        assert_eq!(RingBufferWakeupPolicy::from(42), RingBufferWakeupPolicy::DefaultWakeup);
    }
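
    // Illustrative check of the record-size rounding used by `ringbuf_reserve()`. The rounding
    // rule is restated locally (payload plus the 8-byte header, rounded up to a multiple of 8)
    // rather than exercised through a real ring buffer.
    #[fuchsia::test]
    fn test_reserve_size_rounding() {
        const HEADER_ALIGNMENT: u32 = std::mem::size_of::<u64>() as u32;
        let round = |size: u32| {
            (size + BPF_RINGBUF_HDR_SZ + HEADER_ALIGNMENT - 1) / HEADER_ALIGNMENT
                * HEADER_ALIGNMENT
        };

        // A 1-byte payload still consumes a full 16-byte slot (8-byte header plus padding).
        assert_eq!(round(1), 16);
        // An 8-byte payload fits exactly in 16 bytes.
        assert_eq!(round(8), 16);
        // A 9-byte payload spills into the next 8-byte slot.
        assert_eq!(round(9), 24);
    }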
}