use super::buffer::{MapBuffer, VmoOrName};
use super::lock::RwMapLock;
use super::vmar::AllocatedVmar;
use super::{MapError, MapImpl, MapKey, MapValueRef};
use ebpf::MapSchema;
use linux_uapi::{
    BPF_RB_FORCE_WAKEUP, BPF_RB_NO_WAKEUP, BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT,
    BPF_RINGBUF_HDR_SZ,
};
use static_assertions::const_assert;
use std::fmt::Debug;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use zx::AsHandleRef;

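/// Signal set on the ring buffer VMO to wake up the consumer when new data is available to read.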
pub const RINGBUF_SIGNAL: zx::Signals = zx::Signals::USER_0;

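/// Signal used internally by the `RwMapLock` that protects the ring buffer state.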
const RINGBUF_LOCK_SIGNAL: zx::Signals = zx::Signals::USER_1;

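/// Accessors for the mapped ring buffer: the consumer and producer position pages and the data
/// area. Wrapped in the `RwMapLock` returned by `RingBuffer::state()`.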
#[derive(Debug)]
struct RingBufferState {
    /// Address of the first control page of the mapped ring buffer VMO.
    base_addr: usize,

    /// Size of the data area minus one; used to wrap positions into the data area.
    mask: u32,
}

impl RingBufferState {
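    /// Address where the data area starts, immediately after the three control pages.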
    fn data_addr(&self) -> usize {
        self.base_addr + 3 * *MapBuffer::PAGE_SIZE
    }

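    /// Consumer position counter, stored in the second control page.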
    fn consumer_position(&self) -> &AtomicU32 {
        // SAFETY: the control pages stay mapped for as long as this state can be reached, and
        // the counter is only ever accessed as an atomic.
        unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE) as *const AtomicU32) }
    }

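    /// Producer position counter, stored in the third control page.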
    fn producer_position(&self) -> &AtomicU32 {
        // SAFETY: the control pages stay mapped for as long as this state can be reached, and
        // the counter is only ever accessed as an atomic.
        unsafe { &*((self.base_addr + *MapBuffer::PAGE_SIZE * 2) as *const AtomicU32) }
    }

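    /// Address in the data area that corresponds to the given position.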
    fn data_position(&self, position: u32) -> usize {
        self.data_addr() + ((position & self.mask) as usize)
    }

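    /// Returns true if `addr` is the address of the record the consumer will read next.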
    fn is_consumer_position(&self, addr: usize) -> bool {
        let Some(position) = addr.checked_sub(self.data_addr()) else {
            return false;
        };
        let position = position as u32;
        let consumer_position = self.consumer_position().load(Ordering::Acquire) & self.mask;
        position == consumer_position
    }

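    /// Record header stored at `position` in the data area.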
    fn header_mut(&mut self, position: u32) -> &mut RingBufferRecordHeader {
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        unsafe {
            &mut *(self.data_position(position) as *mut RingBufferRecordHeader)
        }
    }
}

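/// Implementation of an eBPF ring buffer map. The control pages and the data area live in `vmo`,
/// which is also exposed to the consumer through `MapImpl::vmo()`.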
#[derive(Debug)]
pub(crate) struct RingBuffer {
    /// VMO that holds the three control pages followed by the data area.
    vmo: Arc<zx::Vmo>,

    /// Sub-VMAR where the technical page, the control pages and the data (mapped twice) live.
    vmar: AllocatedVmar,

    /// Size of the data area minus one.
    mask: u32,
}

impl RingBuffer {
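    /// Creates a new ring buffer map.
    ///
    /// The buffer is laid out in a dedicated sub-VMAR as follows:
    ///
    ///   1. A "technical" page that stores a pointer back to the `RingBuffer`, so that
    ///      `submit()` and `discard()` can find it again from a reserved data address.
    ///   2. Three control pages from `vmo`: the state lock, the consumer position and the
    ///      producer position.
    ///   3. The data area from `vmo`, mapped twice back to back so that records wrapping around
    ///      the end of the buffer remain contiguous in memory.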
    pub fn new(schema: &MapSchema, vmo: impl Into<VmoOrName>) -> Result<Pin<Box<Self>>, MapError> {
        if schema.key_size != 0 || schema.value_size != 0 {
            return Err(MapError::InvalidParam);
        }

        let page_size = *MapBuffer::PAGE_SIZE;
        let size = schema.max_entries as usize;
        // The data size must be a non-zero, page-aligned power of two.
        if size == 0 || size % page_size != 0 || size & (size - 1) != 0 {
            return Err(MapError::InvalidParam);
        }
        let mask: u32 = (size - 1).try_into().map_err(|_| MapError::InvalidParam)?;

        // Single page used to store the pointer back to this `RingBuffer`.
        let technical_vmo_size = page_size;

        // Lock, consumer position and producer position pages.
        let control_pages_size = 3 * page_size;
        let vmo_size = control_pages_size + size;

        let kernel_root_vmar = fuchsia_runtime::vmar_root_self();
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        let vmar = unsafe {
            AllocatedVmar::allocate(
                &kernel_root_vmar,
                0,
                // The data is mapped twice, back to back, so reserve room for two copies.
                technical_vmo_size + control_pages_size + 2 * size,
                zx::VmarFlags::CAN_MAP_SPECIFIC
                    | zx::VmarFlags::CAN_MAP_READ
                    | zx::VmarFlags::CAN_MAP_WRITE,
            )
            .map_err(|_| MapError::Internal)?
        };
        let technical_vmo =
            zx::Vmo::create(technical_vmo_size as u64).map_err(|_| MapError::Internal)?;
        technical_vmo.set_name(&zx::Name::new_lossy("ebpf:ring_buffer_technical_vmo")).unwrap();
        vmar.map(
            0,
            &technical_vmo,
            0,
            page_size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        let vmo = match vmo.into() {
            VmoOrName::Vmo(vmo) => {
                let actual_vmo_size = vmo.get_size().map_err(|_| MapError::InvalidVmo)? as usize;
                if vmo_size != actual_vmo_size {
                    return Err(MapError::InvalidVmo);
                }
                vmo
            }
            VmoOrName::Name(name) => {
                let vmo = zx::Vmo::create(vmo_size as u64).map_err(|e| match e {
                    zx::Status::NO_MEMORY | zx::Status::OUT_OF_RANGE => MapError::NoMemory,
                    _ => MapError::Internal,
                })?;
                let name = format!("ebpf:ring_buffer:{name}");
                vmo.set_name(&zx::Name::new_lossy(&name)).unwrap();
                vmo
            }
        };

        // Map the control pages and the first copy of the data right after the technical page.
        vmar.map(
            technical_vmo_size,
            &vmo,
            0,
            vmo_size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        // Map the data a second time right after the first copy, so that a record wrapping
        // around the end of the buffer is still contiguous in the address space.
        vmar.map(
            technical_vmo_size + vmo_size,
            &vmo,
            control_pages_size as u64,
            size,
            zx::VmarFlags::SPECIFIC | zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
        )
        .map_err(|_| MapError::Internal)?;

        // Store a pointer to the pinned `RingBuffer` in the technical page so that
        // `get_ringbuf_and_header_by_addr()` can recover it from a reserved data address.
        #[allow(
            clippy::undocumented_unsafe_blocks,
            reason = "Force documented unsafe blocks in Starnix"
        )]
        let storage_position = unsafe { &mut *(vmar.base() as *mut *const Self) };
        let storage = Box::pin(Self { vmo: Arc::new(vmo), vmar, mask });
        *storage_position = storage.deref();

        Ok(storage)
    }

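    /// Returns the ring buffer state, guarded by the `RwMapLock` stored in the first control page.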
    fn state<'a>(&'a self) -> RwMapLock<'a, RingBufferState> {
        let page_size = *MapBuffer::PAGE_SIZE;

        // SAFETY: the lock cell lives in the first control page, which stays mapped for the
        // lifetime of `self`; the returned `RwMapLock` serializes access to the
        // `RingBufferState` built around the same mapping.
        unsafe {
            let lock_cell = &*((self.vmar.base() + page_size) as *const AtomicU32);
            RwMapLock::new(
                lock_cell,
                self.vmo.as_handle_ref(),
                RINGBUF_LOCK_SIGNAL,
                RingBufferState { base_addr: self.vmar.base() + page_size, mask: self.mask },
            )
        }
    }

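    /// Completes the record owned by `header`: clears the busy bit (setting the discard bit if
    /// requested) and signals `RINGBUF_SIGNAL` on the VMO when the consumer should be woken up.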
    fn commit(
        &self,
        header: &RingBufferRecordHeader,
        flags: RingBufferWakeupPolicy,
        discard: bool,
    ) {
        let mut new_length = header.length.load(Ordering::Acquire) & !BPF_RINGBUF_BUSY_BIT;
        if discard {
            new_length |= BPF_RINGBUF_DISCARD_BIT;
        }
        header.length.store(new_length, Ordering::Release);

        let state = self.state().read();
        // Wake up the consumer if forced, or, with the default policy, when the committed record
        // is the one the consumer will read next.
        if flags == RingBufferWakeupPolicy::ForceWakeup
            || (flags == RingBufferWakeupPolicy::DefaultWakeup
                && state.is_consumer_position(header as *const RingBufferRecordHeader as usize))
        {
            self.vmo
                .as_handle_ref()
                .signal(zx::Signals::empty(), RINGBUF_SIGNAL)
                .expect("Failed to set signal on a ring buffer VMO");
        }
    }

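    /// Submits the record that was reserved at `addr`, making it visible to the consumer.
    ///
    /// # Safety
    ///
    /// `addr` must be an address previously returned by `ringbuf_reserve()` on a ring buffer map
    /// and must not have been submitted or discarded yet.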
    pub unsafe fn submit(addr: u64, flags: RingBufferWakeupPolicy) {
        let addr = addr as usize;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let (ringbuf_storage, header) = unsafe { Self::get_ringbuf_and_header_by_addr(addr) };
        ringbuf_storage.commit(header, flags, false);
    }

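    /// Discards the record that was reserved at `addr`; the consumer will skip it.
    ///
    /// # Safety
    ///
    /// `addr` must be an address previously returned by `ringbuf_reserve()` on a ring buffer map
    /// and must not have been submitted or discarded yet.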
    pub unsafe fn discard(addr: u64, flags: RingBufferWakeupPolicy) {
        let addr = addr as usize;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let (ringbuf_storage, header) = unsafe { Self::get_ringbuf_and_header_by_addr(addr) };
        ringbuf_storage.commit(header, flags, true);
    }

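    /// Recovers the `RingBuffer` and the record header for a data address handed out by
    /// `ringbuf_reserve()`.
    ///
    /// # Safety
    ///
    /// `addr` must point at the data of a currently reserved record in a live ring buffer.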
    unsafe fn get_ringbuf_and_header_by_addr(
        addr: usize,
    ) -> (&'static RingBuffer, &'static RingBufferRecordHeader) {
        let page_size = *MapBuffer::PAGE_SIZE;
        // The record header is stored immediately before the data.
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let header = unsafe {
            &*((addr - std::mem::size_of::<RingBufferRecordHeader>())
                as *const RingBufferRecordHeader)
        };
        // `page_count` is the number of pages between the start of the mapped ring buffer VMO
        // and the record, so stepping back one more page (the technical page) lands on the base
        // of the VMAR, where `new()` stored a pointer to the `RingBuffer`.
        let addr_page = addr / page_size;
        let mapping_start_page = addr_page - header.page_count as usize - 1;
        let mapping_start_address = mapping_start_page * page_size;
        #[allow(clippy::undocumented_unsafe_blocks, reason = "2024 edition migration")]
        let ringbuf_impl = unsafe { &*(mapping_start_address as *const &RingBuffer) };
        (ringbuf_impl, header)
    }
}

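// Ring buffers are not key/value maps: `lookup` always returns `None` and the mutating key/value
// operations are rejected. Data is produced through `ringbuf_reserve()` and consumed through the
// shared VMO.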
impl MapImpl for RingBuffer {
    fn lookup<'a>(&'a self, _key: &[u8]) -> Option<MapValueRef<'a>> {
        None
    }

    fn update(&self, _key: MapKey, _value: &[u8], _flags: u64) -> Result<(), MapError> {
        Err(MapError::InvalidParam)
    }

    fn delete(&self, _key: &[u8]) -> Result<(), MapError> {
        Err(MapError::InvalidParam)
    }

    fn get_next_key(&self, _key: Option<&[u8]>) -> Result<MapKey, MapError> {
        Err(MapError::InvalidParam)
    }

    fn vmo(&self) -> &Arc<zx::Vmo> {
        &self.vmo
    }

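    // The buffer is readable when the consumer position is behind the producer position and the
    // record at the consumer position has been committed (its busy bit is cleared).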
    fn can_read(&self) -> Option<bool> {
        let mut state = self.state().write();
        let consumer_position = state.consumer_position().load(Ordering::Acquire);
        let producer_position = state.producer_position().load(Ordering::Acquire);

        let can_read = consumer_position < producer_position
            && ((*state.header_mut(consumer_position).length.get_mut()) & BPF_RINGBUF_BUSY_BIT
                == 0);
        Some(can_read)
    }

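    // Reserves `size` bytes in the buffer and returns the address where the caller should write
    // the data. The record header just before that address keeps the busy bit set until the
    // record is submitted or discarded.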
    fn ringbuf_reserve(&self, size: u32, flags: u64) -> Result<usize, MapError> {
        if flags != 0 {
            return Err(MapError::InvalidParam);
        }

        // The top bits of the length field are reserved for the busy and discard flags.
        if size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT) > 0 {
            return Err(MapError::InvalidParam);
        }

        let mut state = self.state().write();
        let consumer_position = state.consumer_position().load(Ordering::Acquire);
        let producer_position = state.producer_position().load(Ordering::Acquire);
        let max_size = self.mask + 1;

        // In a well-formed buffer the producer position never lags behind the consumer position;
        // a malformed state is rejected.
        let consumed_size =
            producer_position.checked_sub(consumer_position).ok_or(MapError::InvalidParam)?;
        let available_size = max_size.checked_sub(consumed_size).ok_or(MapError::InvalidParam)?;

        const HEADER_ALIGNMENT: u32 = std::mem::size_of::<u64>() as u32;

        // Total size of the record, including its header, rounded up to an 8-byte boundary.
        let total_size: u32 = (size + BPF_RINGBUF_HDR_SZ + HEADER_ALIGNMENT - 1) / HEADER_ALIGNMENT
            * HEADER_ALIGNMENT;

        if total_size > available_size {
            return Err(MapError::SizeLimit);
        }
        let data_position = state.data_position(producer_position) + BPF_RINGBUF_HDR_SZ as usize;
        let data_length = size | BPF_RINGBUF_BUSY_BIT;
        let page_count = ((data_position - state.data_addr()) / *MapBuffer::PAGE_SIZE + 3)
            .try_into()
            .map_err(|_| MapError::SizeLimit)?;
        let header = state.header_mut(producer_position);
        *header.length.get_mut() = data_length;
        header.page_count = page_count;
        state.producer_position().store(producer_position + total_size, Ordering::Release);
        Ok(data_position)
    }
}

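/// Wakeup policy derived from the `BPF_RB_*` flag bits passed to `submit()` and `discard()`.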
#[repr(u32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RingBufferWakeupPolicy {
    DefaultWakeup = 0,
    NoWakeup = BPF_RB_NO_WAKEUP,
    ForceWakeup = BPF_RB_FORCE_WAKEUP,
}

impl From<u32> for RingBufferWakeupPolicy {
    fn from(v: u32) -> Self {
        match v {
            BPF_RB_NO_WAKEUP => Self::NoWakeup,
            BPF_RB_FORCE_WAKEUP => Self::ForceWakeup,
            // Any other value, including combinations of the flags, falls back to the default
            // policy.
            _ => Self::DefaultWakeup,
        }
    }
}

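/// Header stored immediately before each record's data. `length` holds the record size together
/// with the `BPF_RINGBUF_BUSY_BIT`/`BPF_RINGBUF_DISCARD_BIT` flags; `page_count` records how many
/// pages separate the record from the start of the ring buffer mapping so that the owning
/// `RingBuffer` can be found again in `get_ringbuf_and_header_by_addr()`.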
#[repr(C)]
#[repr(align(8))]
#[derive(Debug)]
struct RingBufferRecordHeader {
    length: AtomicU32,
    page_count: u32,
}

const_assert!(std::mem::size_of::<RingBufferRecordHeader>() == BPF_RINGBUF_HDR_SZ as usize);

#[cfg(test)]
mod test {
    use super::*;

    #[fuchsia::test]
    fn test_ring_buffer_wakeup_policy() {
        assert_eq!(RingBufferWakeupPolicy::from(0), RingBufferWakeupPolicy::DefaultWakeup);
        assert_eq!(
            RingBufferWakeupPolicy::from(BPF_RB_NO_WAKEUP),
            RingBufferWakeupPolicy::NoWakeup
        );
        assert_eq!(
            RingBufferWakeupPolicy::from(BPF_RB_FORCE_WAKEUP),
            RingBufferWakeupPolicy::ForceWakeup
        );
        assert_eq!(RingBufferWakeupPolicy::from(42), RingBufferWakeupPolicy::DefaultWakeup);
    }
}