ring_buffer/lib.rs

// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use fuchsia_async::{self as fasync};
use futures::task::AtomicWaker;
use std::future::poll_fn;
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::task::Poll;
use thiserror::Error;

/// Size of the kernel header in the ring buffer. This is different from the FXT header.
pub const RING_BUFFER_MESSAGE_HEADER_SIZE: usize = 16;

/// Maximum message size. This includes the ring buffer header. This is also the minimum capacity
/// for the ring buffer.
pub const MAX_MESSAGE_SIZE: usize = 65536;

// The ring buffer consists of a head index and a tail index stored on the first page; the ring
// buffer proper starts from the second page. The head and tail indices never wrap; modulo
// arithmetic is used to get the actual offset in the buffer.
//
// Messages in the ring buffer consist of a 64 bit tag followed by a 64 bit length. The remainder
// of the message is a message in the diagnostics log format. The tag and length are written by the
// kernel, so they can be trusted, whilst the rest of the message cannot. Messages are always 64
// bit aligned, but the length doesn't have to be, so whenever the tail index is advanced it must
// be advanced by a multiple of 8 to maintain that alignment.

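// For illustration only (a sketch, not a normative layout), a 4 byte message with tag 56
// occupies one 24 byte record:
//
//   bytes  0..8   tag (56), written by the kernel
//   bytes  8..16  length (4), written by the kernel
//   bytes 16..20  message payload
//   bytes 20..24  padding up to the next 8 byte boundary
//
// so after reading it, the tail must be advanced by `ring_buffer_record_len(4)`, i.e. 24.
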
const HEAD_OFFSET: usize = 0; // Offset of the head index in the ring buffer.
const TAIL_OFFSET: usize = 8; // Offset of the tail index in the ring buffer.

/// RingBuffer wraps an IOBuffer shared region and mapping that uses the ring buffer discipline.
pub struct RingBuffer {
    shared_region: zx::vdso_next::IobSharedRegion,
    _vmar: zx::Vmar,
    base: usize,
    capacity: usize,
}

#[derive(Eq, Error, Debug, PartialEq)]
pub enum Error {
    #[error("attempt to read a message at an unaligned index")]
    BadAlignment,
    #[error("there is not enough room to read the header")]
    TooSmall,
    #[error("bad message length (e.g. too big or beyond bounds)")]
    BadLength,
}

impl RingBuffer {
    /// Creates a new `RingBuffer` and returns a `Reader` for it. `capacity` must be a multiple of
    /// the page size and must be at least `MAX_MESSAGE_SIZE`. The `capacity` does not include the
    /// additional page used to store the head and tail indices.
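    ///
    /// # Example
    ///
    /// A minimal sketch (the capacity is illustrative; it must satisfy the constraints above, and
    /// creating the buffer requires a Fuchsia environment with IOBuffer support):
    ///
    /// ```ignore
    /// let reader = RingBuffer::create(128 * 1024);
    /// assert_eq!(reader.capacity(), 128 * 1024);
    /// ```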
    pub fn create(capacity: usize) -> Reader {
        let page_size = zx::system_get_page_size() as usize;

        assert_eq!(capacity % page_size, 0);
        assert!(capacity >= MAX_MESSAGE_SIZE);

        let shared_region_size = capacity + page_size;
        let shared_region =
            zx::vdso_next::IobSharedRegion::create(Default::default(), shared_region_size as u64)
                .unwrap();

        // We only need one endpoint.
        let (iob, _) = zx::Iob::create(
            Default::default(),
            &[zx::IobRegion {
                region_type: zx::IobRegionType::Shared {
                    options: Default::default(),
                    region: &shared_region,
                },
                access: zx::IobAccess::EP0_CAN_MAP_READ | zx::IobAccess::EP0_CAN_MAP_WRITE,
                discipline: zx::IobDiscipline::MediatedWriteRingBuffer { tag: 0 },
            }],
        )
        .unwrap();

        let root_vmar = fuchsia_runtime::vmar_root_self();

        // Map the buffer, but repeat the mapping for the first 64 KiB of the buffer at the end,
        // which makes dealing with wrapping much easier.
        //
        // NOTE: dropping the vmar will drop the mappings.
        let (vmar, base) = root_vmar
            .allocate(
                0,
                shared_region_size + MAX_MESSAGE_SIZE,
                zx::VmarFlags::CAN_MAP_READ
                    | zx::VmarFlags::CAN_MAP_WRITE
                    | zx::VmarFlags::CAN_MAP_SPECIFIC,
            )
            .unwrap();
        vmar.map_iob(
            zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
            /* vmar_offset */ 0,
            &iob,
            /* region_index */ 0,
            /* region_offset */ 0,
            /* region_len */ shared_region_size,
        )
        .unwrap();
        vmar.map_iob(
            zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
            /* vmar_offset */ shared_region_size,
            &iob,
            /* region_index */ 0,
            /* region_offset */ zx::system_get_page_size() as u64,
            /* region_len */ MAX_MESSAGE_SIZE,
        )
        .unwrap();

        let this = Arc::new(Self { shared_region, _vmar: vmar, base, capacity });
        Reader {
            ring_buffer: this,
            registration: fasync::EHandle::local().register_receiver(Receiver::default()),
        }
    }

    /// Returns an IOBuffer that can be used to write to the ring buffer. A tuple is returned: the
    /// first IOBuffer in the tuple can be written to; the second IOBuffer is the peer, which
    /// cannot be written to or mapped but can be monitored for peer-closed.
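    ///
    /// # Example
    ///
    /// A sketch mirroring the tests in this file (the tag value is arbitrary):
    ///
    /// ```ignore
    /// let (iob, _peer) = reader.new_iob_writer(56).unwrap();
    /// iob.write(Default::default(), 0, b"test").unwrap();
    /// ```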
    pub fn new_iob_writer(&self, tag: u64) -> Result<(zx::Iob, zx::Iob), zx::Status> {
        zx::Iob::create(
            Default::default(),
            &[zx::IobRegion {
                region_type: zx::IobRegionType::Shared {
                    options: Default::default(),
                    region: &self.shared_region,
                },
                access: zx::IobAccess::EP0_CAN_MEDIATED_WRITE,
                discipline: zx::IobDiscipline::MediatedWriteRingBuffer { tag },
            }],
        )
    }

    /// Returns the capacity of the ring buffer.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the value of the head pointer, read with Acquire ordering (which will synchronise
    /// with an update to the head pointer in the kernel that uses Release ordering).
    pub fn head(&self) -> u64 {
        // SAFETY: This should be aligned and we mapped base, so the pointer should be
        // dereferenceable.
        unsafe { (*((self.base + HEAD_OFFSET) as *const AtomicU64)).load(Ordering::Acquire) }
    }

    /// Returns the value of the tail pointer, read with Acquire ordering (which will synchronise
    /// with an update to the tail via `increment_tail` below; the kernel never changes the tail
    /// pointer).
    pub fn tail(&self) -> u64 {
        // SAFETY: This should be aligned and we mapped base, so the pointer should be
        // dereferenceable.
        unsafe { (*((self.base + TAIL_OFFSET) as *const AtomicU64)).load(Ordering::Acquire) }
    }

    /// Increments the tail pointer using Release ordering (which will synchronise with the kernel,
    /// which reads the tail pointer using Acquire ordering). `amount` must always be a multiple of
    /// 8. See also `ring_buffer_record_len`.
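    ///
    /// # Example
    ///
    /// A sketch: after consuming a message of `len` bytes, advance the tail by the full record
    /// length (header plus payload padded to 8 bytes), as `read_message` does:
    ///
    /// ```ignore
    /// ring_buffer.increment_tail(ring_buffer_record_len(len));
    /// ```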
    pub fn increment_tail(&self, amount: usize) {
        assert_eq!(amount % 8, 0);
        // SAFETY: This should be aligned and we mapped base, so the pointer should be
        // dereferenceable.
        unsafe {
            (*((self.base + TAIL_OFFSET) as *const AtomicU64))
                .fetch_add(amount as u64, Ordering::Release);
        }
    }

    fn index_to_offset(&self, index: u64) -> usize {
        (index % self.capacity as u64) as usize
    }

    /// Reads T at `index` in the buffer.
    ///
    /// # SAFETY
    ///
    /// `index` must have the same alignment as `T`. The read is non-atomic which means it is
    /// undefined behaviour if there is concurrent write access to the same location (across all
    /// processes).
    pub unsafe fn read<T>(&self, index: u64) -> T {
        debug_assert_eq!(index % std::mem::align_of::<T>() as u64, 0);
        debug_assert!(std::mem::size_of::<T>() <= MAX_MESSAGE_SIZE);
        let offset = self.index_to_offset(index);
        unsafe { ((self.base + zx::system_get_page_size() as usize + offset) as *const T).read() }
    }

    /// Returns a slice for the first message in `range`.
    ///
    /// # SAFETY
    ///
    /// The reads are non-atomic so there can be no other concurrent write access to the same range
    /// (across all processes). The returned slice will only remain valid so long as there is no
    /// other concurrent write access to the range.
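    ///
    /// # Example
    ///
    /// A sketch of how `Reader::read_message` uses this (the caller must uphold the safety
    /// contract described above):
    ///
    /// ```ignore
    /// let (tag, bytes) = unsafe { ring_buffer.first_message_in(tail..head) }?;
    /// ```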
    pub unsafe fn first_message_in(&self, range: Range<u64>) -> Result<(u64, &[u8]), Error> {
        if !range.start.is_multiple_of(8) {
            return Err(Error::BadAlignment);
        }
        if range.end - range.start < RING_BUFFER_MESSAGE_HEADER_SIZE as u64 {
            return Err(Error::TooSmall);
        }
        let tag = unsafe { self.read(range.start) };
        let message_len: u64 = unsafe { self.read(range.start + 8) };
        if message_len
            > std::cmp::min(range.end - range.start, MAX_MESSAGE_SIZE as u64)
                - RING_BUFFER_MESSAGE_HEADER_SIZE as u64
        {
            return Err(Error::BadLength);
        }
        // The message payload starts after the 16 byte header (tag + length).
        let index = self.index_to_offset(range.start + 16);
        Ok((tag, unsafe {
            std::slice::from_raw_parts(
                (self.base + zx::system_get_page_size() as usize + index) as *const u8,
                message_len as usize,
            )
        }))
    }
}

/// Provides exclusive read access to the ring buffer.
pub struct Reader {
    ring_buffer: Arc<RingBuffer>,
    registration: fasync::ReceiverRegistration<Receiver>,
}

impl Deref for Reader {
    type Target = Arc<RingBuffer>;
    fn deref(&self) -> &Self::Target {
        &self.ring_buffer
    }
}

impl Reader {
    /// Waits for the head to exceed `index` and returns the head value.
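    ///
    /// A sketch: wait until at least one message has been written past the current tail:
    ///
    /// ```ignore
    /// let tail = reader.tail();
    /// let head = reader.wait(tail).await;
    /// ```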
    pub async fn wait(&mut self, index: u64) -> u64 {
        poll_fn(|cx| {
            // Check before registering the waker.
            let head = self.head();
            if head > index {
                return Poll::Ready(head);
            }
            self.registration.waker.register(cx.waker());
            if !self.registration.async_wait.swap(true, Ordering::Relaxed) {
                self.shared_region
                    .wait_async(
                        self.registration.port(),
                        self.registration.key(),
                        zx::Signals::IOB_SHARED_REGION_UPDATED,
                        zx::WaitAsyncOpts::empty(),
                    )
                    .unwrap();
            }
            // Check again in case there was a race.
            let head = self.head();
            if head > index { Poll::Ready(head) } else { Poll::Pending }
        })
        .await
    }

    /// Reads the message at `tail`, waiting if no message is ready yet, and then advances `tail`
    /// past the message that was read.
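    ///
    /// # Example
    ///
    /// A sketch based on the tests in this file (the tag value is arbitrary):
    ///
    /// ```ignore
    /// let (iob, _peer) = reader.new_iob_writer(56).unwrap();
    /// iob.write(Default::default(), 0, b"test").unwrap();
    /// let (tag, data) = reader.read_message().await.unwrap();
    /// assert_eq!(tag, 56);
    /// assert_eq!(data, b"test");
    /// ```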
    pub async fn read_message(&mut self) -> Result<(u64, Vec<u8>), Error> {
        let tail = self.tail();
        let head = self.wait(tail).await;
        // SAFETY: There should be no other concurrent write access to this memory because writing
        // is only allowed via `new_iob_writer` above, and that will always write beyond `head`.
        let message = unsafe {
            self.first_message_in(tail..head).map(|(tag, message)| (tag, message.to_vec()))?
        };
        self.increment_tail(ring_buffer_record_len(message.1.len()));
        Ok(message)
    }
}

#[derive(Default)]
struct Receiver {
    waker: AtomicWaker,
    async_wait: AtomicBool,
}

impl fasync::PacketReceiver for Receiver {
    fn receive_packet(&self, _packet: zx::Packet) {
        self.async_wait.store(false, Ordering::Relaxed);
        self.waker.wake();
    }
}

/// Returns the ring buffer record length given the message length. This accounts for the ring
/// buffer message header and any padding required to maintain alignment.
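///
/// # Example
///
/// A worked example: a 4 byte message pads to 8 bytes, which is added to the 16 byte header:
///
/// ```ignore
/// assert_eq!(ring_buffer_record_len(4), 16 + 8);
/// ```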
pub fn ring_buffer_record_len(message_len: usize) -> usize {
    RING_BUFFER_MESSAGE_HEADER_SIZE + message_len.next_multiple_of(8)
}

#[cfg(test)]
mod tests {
    use super::{Error, MAX_MESSAGE_SIZE, RING_BUFFER_MESSAGE_HEADER_SIZE, RingBuffer};
    use futures::stream::FuturesUnordered;
    use futures::{FutureExt, StreamExt};
    use std::sync::atomic::{AtomicU64, Ordering};

    #[fuchsia::test]
    async fn read_message() {
        const TAG: u64 = 56;
        let mut reader = RingBuffer::create(128 * 1024);
        let (iob, _) = reader.new_iob_writer(TAG).unwrap();
        const DATA: &[u8] = b"test";
        iob.write(Default::default(), 0, DATA).unwrap();
        let (tag, data) = reader.read_message().await.expect("read_message failed");
        assert_eq!(tag, TAG);
        assert_eq!(&data, DATA);
    }

    #[fuchsia::test]
    async fn writing_wakes_reader() {
        const TAG: u64 = 56;
        let mut reader = RingBuffer::create(128 * 1024);
        let (iob, _) = reader.new_iob_writer(TAG).unwrap();

        // Use FuturesUnordered so that it uses its own waker.
        let mut read_message = FuturesUnordered::from_iter([reader.read_message()]);

        // Poll the reader once to prime it.
        assert!(read_message.next().now_or_never().is_none());

        const DATA: &[u8] = b"test";
        iob.write(Default::default(), 0, DATA).unwrap();

        // Check the reader is woken.
        let (tag, data) = read_message.next().await.unwrap().expect("read_message failed");
        assert_eq!(tag, TAG);
        assert_eq!(&data, DATA);
    }

    #[fuchsia::test]
    async fn corrupt() {
        let mut reader = RingBuffer::create(128 * 1024);

        const HEAD_OFFSET: usize = 0;
        const TAIL_OFFSET: usize = 8;
        let message_len_offset: usize = zx::system_get_page_size() as usize + 8;

        let base = reader.base;
        let write_u64 = |offset, value| unsafe {
            (*((base + offset) as *const AtomicU64)).store(value, Ordering::Release);
        };

        // Unaligned tail.
        write_u64(TAIL_OFFSET, 1);
        write_u64(HEAD_OFFSET, 8);
        assert_eq!(reader.read_message().await, Err(Error::BadAlignment));

        // Too small.
        write_u64(TAIL_OFFSET, 0);
        assert_eq!(reader.read_message().await, Err(Error::TooSmall));

        // Exceeds the maximum message size.
        write_u64(HEAD_OFFSET, 32);
        write_u64(
            message_len_offset,
            (MAX_MESSAGE_SIZE + RING_BUFFER_MESSAGE_HEADER_SIZE + 1) as u64,
        );
        assert_eq!(reader.read_message().await, Err(Error::BadLength));

        // Message length exceeds the available range (head - tail).
        write_u64(message_len_offset, 17);
        assert_eq!(reader.read_message().await, Err(Error::BadLength));

        // And finally, a valid message, just to make sure there isn't another issue.
        write_u64(message_len_offset, 16);
        assert!(reader.read_message().await.is_ok());
    }
}