use fuchsia_async::{self as fasync};
use futures::task::AtomicWaker;
use std::future::poll_fn;
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::task::Poll;
use thiserror::Error;

/// The size of the per-message header: an 8-byte tag followed by an 8-byte
/// payload length.
pub const RING_BUFFER_MESSAGE_HEADER_SIZE: usize = 16;

/// The maximum total size of a message, including its header.
pub const MAX_MESSAGE_SIZE: usize = 65536;

// Offsets of the head and tail counters within the control page.
const HEAD_OFFSET: usize = 0;
const TAIL_OFFSET: usize = 8;

/// A ring buffer backed by an IOB shared region. Writers append messages via
/// mediated writes on IOB endpoints; a single `Reader` consumes them.
pub struct RingBuffer {
    shared_region: zx::vdso_next::IobSharedRegion,
    // Keeps the mappings alive for the lifetime of the ring buffer.
    _vmar: zx::Vmar,
    // Base address of the mapping; the first page holds the head and tail counters.
    base: usize,
    // Capacity of the data region in bytes.
    capacity: usize,
}

#[derive(Eq, Error, Debug, PartialEq)]
pub enum Error {
    #[error("attempt to read a message at an unaligned index")]
    BadAlignment,
    #[error("there is not enough room to read the header")]
    TooSmall,
    #[error("bad message length (e.g. too big or beyond bounds)")]
    BadLength,
}

impl RingBuffer {
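    /// Creates a ring buffer with room for `capacity` bytes of message data
    /// and returns the `Reader` for it.
    ///
    /// `capacity` must be a multiple of the page size and at least
    /// `MAX_MESSAGE_SIZE`.
    ///
    /// A minimal usage sketch (mirroring the tests below; assumes a Fuchsia
    /// async executor is running):
    ///
    /// ```ignore
    /// let mut reader = RingBuffer::create(128 * 1024);
    /// let (iob, _) = reader.new_iob_writer(1).unwrap();
    /// iob.write(Default::default(), 0, b"hello").unwrap();
    /// let (tag, payload) = reader.read_message().await.unwrap();
    /// assert_eq!(tag, 1);
    /// assert_eq!(payload, b"hello");
    /// ```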
    pub fn create(capacity: usize) -> Reader {
        let page_size = zx::system_get_page_size() as usize;

        assert_eq!(capacity % page_size, 0);
        assert!(capacity >= MAX_MESSAGE_SIZE);

        // One extra page at the start of the region holds the head and tail
        // counters; the remaining `capacity` bytes hold message data.
        let shared_region_size = capacity + page_size;
        let shared_region =
            zx::vdso_next::IobSharedRegion::create(Default::default(), shared_region_size as u64)
                .unwrap();

        // This IOB endpoint is used only to map the shared region into our
        // address space; writer endpoints are created by `new_iob_writer`.
        let (iob, _) = zx::Iob::create(
            Default::default(),
            &[zx::IobRegion {
                region_type: zx::IobRegionType::Shared {
                    options: Default::default(),
                    region: &shared_region,
                },
                access: zx::IobAccess::EP0_CAN_MAP_READ | zx::IobAccess::EP0_CAN_MAP_WRITE,
                discipline: zx::IobDiscipline::MediatedWriteRingBuffer { tag: 0 },
            }],
        )
        .unwrap();

        let root_vmar = fuchsia_runtime::vmar_root_self();

        // Reserve address space for the shared region plus MAX_MESSAGE_SIZE
        // extra bytes: the start of the data region is mapped a second time
        // past the end of the buffer, so a message that wraps around the ring
        // still appears contiguous in memory.
        let (vmar, base) = root_vmar
            .allocate(
                0,
                shared_region_size + MAX_MESSAGE_SIZE,
                zx::VmarFlags::CAN_MAP_READ
                    | zx::VmarFlags::CAN_MAP_WRITE
                    | zx::VmarFlags::CAN_MAP_SPECIFIC,
            )
            .unwrap();
        vmar.map_iob(
            zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
            0,
            &iob,
            0,
            0,
            shared_region_size,
        )
        .unwrap();
        // The second mapping: the data region (one page in, after the
        // counters) mapped again at the end of the first mapping.
        vmar.map_iob(
            zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
            shared_region_size,
            &iob,
            0,
            zx::system_get_page_size() as u64,
            MAX_MESSAGE_SIZE,
        )
        .unwrap();

        let this = Arc::new(Self { shared_region, _vmar: vmar, base, capacity });
        Reader {
            ring_buffer: this,
            registration: fasync::EHandle::local().register_receiver(Receiver::default()),
        }
    }

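    /// Returns a pair of IOB endpoints whose first endpoint can perform
    /// mediated writes into this ring buffer. Messages written through it are
    /// tagged with `tag`, which the reader gets back from `read_message`.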
    pub fn new_iob_writer(&self, tag: u64) -> Result<(zx::Iob, zx::Iob), zx::Status> {
        zx::Iob::create(
            Default::default(),
            &[zx::IobRegion {
                region_type: zx::IobRegionType::Shared {
                    options: Default::default(),
                    region: &self.shared_region,
                },
                access: zx::IobAccess::EP0_CAN_MEDIATED_WRITE,
                discipline: zx::IobDiscipline::MediatedWriteRingBuffer { tag },
            }],
        )
    }

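    /// Returns the capacity of the ring buffer's data region in bytes.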
    pub fn capacity(&self) -> usize {
        self.capacity
    }

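    /// Returns the head index: everything before it has been fully written
    /// and is safe to read.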
    pub fn head(&self) -> u64 {
        // SAFETY: the head counter lives in the control page at the start of
        // the mapping, which stays mapped for as long as `self` is alive.
        unsafe { (*((self.base + HEAD_OFFSET) as *const AtomicU64)).load(Ordering::Acquire) }
    }

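    /// Returns the tail index: the index of the first unconsumed byte.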
    pub fn tail(&self) -> u64 {
        // SAFETY: as in `head`, the tail counter lives in the control page at
        // the start of the mapping.
        unsafe { (*((self.base + TAIL_OFFSET) as *const AtomicU64)).load(Ordering::Acquire) }
    }

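    /// Advances the tail by `amount` bytes, releasing them for reuse by
    /// writers. `amount` must be a multiple of 8.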
    pub fn increment_tail(&self, amount: usize) {
        assert_eq!(amount % 8, 0);
        // SAFETY: as in `head` and `tail`, the counter lives in the control
        // page of the mapping.
        unsafe {
            (*((self.base + TAIL_OFFSET) as *const AtomicU64))
                .fetch_add(amount as u64, Ordering::Release);
        }
    }

    // Maps a monotonically increasing ring index to a byte offset within the
    // data region.
    fn index_to_offset(&self, index: u64) -> usize {
        (index % self.capacity as u64) as usize
    }

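    /// Reads a value of type `T` at `index`, which may wrap around the ring.
    ///
    /// # Safety
    ///
    /// The caller must ensure that a properly initialised `T` is stored at
    /// `index` and that it is not concurrently overwritten.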
    pub unsafe fn read<T>(&self, index: u64) -> T {
        debug_assert_eq!(index % std::mem::align_of::<T>() as u64, 0);
        debug_assert!(std::mem::size_of::<T>() <= MAX_MESSAGE_SIZE);
        let offset = self.index_to_offset(index);
        // The data region starts one page into the mapping. Reads that run
        // off the end of the ring land in the second mapping, so no explicit
        // wrapping is required.
        unsafe { ((self.base + zx::system_get_page_size() as usize + offset) as *const T).read() }
    }

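    /// Returns the tag and payload of the first message within `range`, which
    /// must lie between the tail and the head.
    ///
    /// # Safety
    ///
    /// The returned slice borrows directly from the ring buffer; the caller
    /// must not advance the tail past the message while the slice is alive.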
    pub unsafe fn first_message_in(&self, range: Range<u64>) -> Result<(u64, &[u8]), Error> {
        if !range.start.is_multiple_of(8) {
            return Err(Error::BadAlignment);
        }
        if range.end - range.start < RING_BUFFER_MESSAGE_HEADER_SIZE as u64 {
            return Err(Error::TooSmall);
        }
        // The header is an 8-byte tag followed by an 8-byte payload length.
        let tag = unsafe { self.read(range.start) };
        let message_len: u64 = unsafe { self.read(range.start + 8) };
        // The payload must fit within both the supplied range and the maximum
        // message size, less the header in each case.
        if message_len
            > std::cmp::min(range.end - range.start, MAX_MESSAGE_SIZE as u64)
                - RING_BUFFER_MESSAGE_HEADER_SIZE as u64
        {
            return Err(Error::BadLength);
        }
        let index = self.index_to_offset(range.start + RING_BUFFER_MESSAGE_HEADER_SIZE as u64);
        Ok((tag, unsafe {
            std::slice::from_raw_parts(
                (self.base + zx::system_get_page_size() as usize + index) as *const u8,
                message_len as usize,
            )
        }))
    }
}

/// The reading half of a `RingBuffer`, registered on the local executor so
/// it can be woken when writers append messages.
pub struct Reader {
    ring_buffer: Arc<RingBuffer>,
    registration: fasync::ReceiverRegistration<Receiver>,
}

impl Deref for Reader {
    type Target = Arc<RingBuffer>;
    fn deref(&self) -> &Self::Target {
        &self.ring_buffer
    }
}

impl Reader {
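    /// Waits until the head index exceeds `index`, returning the new head.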
    pub async fn wait(&mut self, index: u64) -> u64 {
        poll_fn(|cx| {
            let head = self.head();
            if head > index {
                return Poll::Ready(head);
            }
            self.registration.waker.register(cx.waker());
            // Arm an async wait on the shared region if one isn't already
            // outstanding; `Receiver::receive_packet` clears the flag.
            if !self.registration.async_wait.swap(true, Ordering::Relaxed) {
                self.shared_region
                    .wait_async(
                        self.registration.port(),
                        self.registration.key(),
                        zx::Signals::IOB_SHARED_REGION_UPDATED,
                        zx::WaitAsyncOpts::empty(),
                    )
                    .unwrap();
            }
            // Check again: the head may have advanced between the first check
            // and registering the waker.
            let head = self.head();
            if head > index { Poll::Ready(head) } else { Poll::Pending }
        })
        .await
    }

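    /// Reads the next message, waiting for one to arrive if necessary. On
    /// success, returns the writer's tag and the payload, and advances the
    /// tail past the consumed record.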
    pub async fn read_message(&mut self) -> Result<(u64, Vec<u8>), Error> {
        let tail = self.tail();
        let head = self.wait(tail).await;
        // SAFETY: this is the only reader, so nothing else can consume
        // `tail..head` while the borrowed slice is alive; the payload is
        // copied out before the tail is advanced.
        let message = unsafe {
            self.first_message_in(tail..head).map(|(tag, message)| (tag, message.to_vec()))?
        };
        self.increment_tail(ring_buffer_record_len(message.1.len()));
        Ok(message)
    }
}

#[derive(Default)]
struct Receiver {
    // Wakes the task blocked in `Reader::wait`.
    waker: AtomicWaker,
    // True whilst a `wait_async` on the shared region is outstanding.
    async_wait: AtomicBool,
}

impl fasync::PacketReceiver for Receiver {
    fn receive_packet(&self, _packet: zx::Packet) {
        // Clear the flag before waking so that the woken task re-arms the wait.
        self.async_wait.store(false, Ordering::Relaxed);
        self.waker.wake();
    }
}

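/// Returns the total number of bytes a message of `message_len` bytes
/// occupies in the ring buffer: the header plus the payload padded to an
/// 8-byte boundary.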
pub fn ring_buffer_record_len(message_len: usize) -> usize {
    RING_BUFFER_MESSAGE_HEADER_SIZE + message_len.next_multiple_of(8)
}

#[cfg(test)]
mod tests {
    use super::{Error, MAX_MESSAGE_SIZE, RING_BUFFER_MESSAGE_HEADER_SIZE, RingBuffer};
    use futures::stream::FuturesUnordered;
    use futures::{FutureExt, StreamExt};
    use std::sync::atomic::{AtomicU64, Ordering};

    #[fuchsia::test]
    async fn read_message() {
        const TAG: u64 = 56;
        let mut reader = RingBuffer::create(128 * 1024);
        let (iob, _) = reader.new_iob_writer(TAG).unwrap();
        const DATA: &[u8] = b"test";
        iob.write(Default::default(), 0, DATA).unwrap();
        let (tag, data) = reader.read_message().await.expect("read_message failed");
        assert_eq!(tag, TAG);
        assert_eq!(&data, DATA);
    }

    #[fuchsia::test]
    async fn writing_wakes_reader() {
        const TAG: u64 = 56;
        let mut reader = RingBuffer::create(128 * 1024);
        let (iob, _) = reader.new_iob_writer(TAG).unwrap();

        // Wrap the future so it can be polled once before any message arrives.
        let mut read_message = FuturesUnordered::from_iter([reader.read_message()]);

        // Nothing has been written yet, so the read should be pending.
        assert!(read_message.next().now_or_never().is_none());

        const DATA: &[u8] = b"test";
        iob.write(Default::default(), 0, DATA).unwrap();

        // The mediated write signals the shared region and wakes the reader.
        let (tag, data) = read_message.next().await.unwrap().expect("read_message failed");
        assert_eq!(tag, TAG);
        assert_eq!(&data, DATA);
    }

    #[fuchsia::test]
    async fn corrupt() {
        let mut reader = RingBuffer::create(128 * 1024);

        const HEAD_OFFSET: usize = 0;
        const TAIL_OFFSET: usize = 8;
        let message_len_offset: usize = zx::system_get_page_size() as usize + 8;

        let base = reader.base;
        let write_u64 = |offset, value| unsafe {
            (*((base + offset) as *const AtomicU64)).store(value, Ordering::Release);
        };

        // An unaligned tail index should be rejected.
        write_u64(TAIL_OFFSET, 1);
        write_u64(HEAD_OFFSET, 8);
        assert_eq!(reader.read_message().await, Err(Error::BadAlignment));

        // A readable range smaller than the message header should be rejected.
        write_u64(TAIL_OFFSET, 0);
        assert_eq!(reader.read_message().await, Err(Error::TooSmall));

        // A message length far larger than MAX_MESSAGE_SIZE should be rejected.
        write_u64(HEAD_OFFSET, 32);
        write_u64(
            message_len_offset,
            (MAX_MESSAGE_SIZE + RING_BUFFER_MESSAGE_HEADER_SIZE + 1) as u64,
        );
        assert_eq!(reader.read_message().await, Err(Error::BadLength));

        // A message that extends beyond the readable range should be rejected.
        write_u64(message_len_offset, 17);
        assert_eq!(reader.read_message().await, Err(Error::BadLength));

        // A message that exactly fills the readable range is accepted.
        write_u64(message_len_offset, 16);
        assert!(reader.read_message().await.is_ok());
    }
}