1use crate::mm::memory::MemoryObject;
7use crate::vfs::OutputBuffer;
8use fuchsia_runtime::vmar_root_self;
9use fuchsia_trace;
10use shared_buffer::SharedBuffer;
11use starnix_logging::{log_error, log_info, log_warn};
12use starnix_sync::Mutex;
13use starnix_types::PAGE_SIZE;
14use starnix_uapi::errors::Errno;
15use starnix_uapi::{errno, error, from_status_like_fdio};
16use std::sync::atomic::{AtomicUsize, Ordering};
17
/// Per-page bookkeeping for one page of the ring buffer's mapping.
struct Node {
    /// This page's slice of the ring mapping. Bytes 0..8 hold the page timestamp and bytes
    /// 8..16 the little-endian data size written by `finalize`.
    page_data: SharedBuffer,
    /// Link to the following page: a page index combined with one of the `FLAG_*` bits
    /// (see `Node::make_val`). `FLAG_HEADER` marks that the following page is the head.
    next: AtomicUsize,
    /// Link to the preceding page, using the same encoding as `next`.
    prev: AtomicUsize,
    /// Offset of the first unreserved byte in this page; starts at `PAGE_HEADER_SIZE`.
    write_offset: AtomicUsize,
    /// `PAGE_ACTIVE_BIT` / `PAGE_FINALIZED_BIT` flags plus, in `ACTIVE_WRITERS_MASK`, the
    /// number of writers currently holding a reservation on this page.
    active_writers: AtomicUsize,
}
30
/// Flag bits stored in the top two bits of a `Node::next` / `Node::prev` link value; the
/// remaining bits hold the linked page's index (see `Node::get_index` / `Node::get_flags`).
const FLAG_MASK: usize = 3 << 62;
/// Ordinary link.
const FLAG_NORMAL: usize = 0;
/// The linked-to page is the current head of the writer ring.
const FLAG_HEADER: usize = 1 << 62;
/// The link is locked while the head page is being moved or swapped out.
const FLAG_UPDATE: usize = 2 << 62;

/// `Node::active_writers` flag: new writers may still join the page.
const PAGE_ACTIVE_BIT: usize = 1 << 63;
/// `Node::active_writers` flag: `finalize` has run (or is running) for the page.
const PAGE_FINALIZED_BIT: usize = 1 << 62;
/// Both `Node::active_writers` flag bits.
const FLAGS_MASK: usize = PAGE_ACTIVE_BIT | PAGE_FINALIZED_BIT;
/// The writer-count bits of `Node::active_writers`.
const ACTIVE_WRITERS_MASK: usize = !FLAGS_MASK;

/// Back-off sleep used once a reservation has backed off more than 100 times.
const SPIN_SLEEP_DURATION: std::time::Duration = std::time::Duration::from_micros(50);

/// Back-off sleep used once a reservation has backed off more than 1000 times.
const PROGRESSIVE_SLEEP_DURATION: std::time::Duration = std::time::Duration::from_millis(10);
57
impl Node {
    /// Extracts the page index from a link value (`next`/`prev`/`tail_page`).
    fn get_index(val: usize) -> usize {
        val & !FLAG_MASK
    }
    /// Extracts the `FLAG_*` bits from a link value.
    fn get_flags(val: usize) -> usize {
        val & FLAG_MASK
    }
    /// Packs a page index and `FLAG_*` bits into a link value.
    fn make_val(index: usize, flags: usize) -> usize {
        index | flags
    }

    /// Records the page's data size in its header and stops new writers from joining.
    ///
    /// Only the first caller does the work: the `PAGE_FINALIZED_BIT` set below makes later
    /// calls return immediately. `PAGE_ACTIVE_BIT` is cleared last, with `Release`, so a
    /// reader that observes it clear (with `Acquire`) also observes the size written here.
    fn finalize(&self) {
        let old_val = self.active_writers.fetch_or(PAGE_FINALIZED_BIT, Ordering::AcqRel);
        if old_val & PAGE_FINALIZED_BIT != 0 {
            return;
        }

        // NOTE(review): `PAGE_ACTIVE_BIT` is still set until the `fetch_and` below, so a writer
        // in `reserve` can still join and claim space after `write_offset` is read here; that
        // record would not be covered by the size written to the header — confirm.
        let write_offset = self.write_offset.load(Ordering::Acquire);
        // `write_offset` starts at `PAGE_HEADER_SIZE`, so the subtraction cannot underflow.
        let data_size = std::cmp::min(write_offset, (*PAGE_SIZE) as usize)
            - LocklessRingBuffer::PAGE_HEADER_SIZE;
        self.page_data.write_at(8, &(data_size as u64).to_le_bytes());

        self.active_writers.fetch_and(!PAGE_ACTIVE_BIT, Ordering::Release);
    }

    /// Drops one writer from this page.
    ///
    /// If this was the last writer on a still-active page that is completely full, finalizes
    /// the page, since no further reservation can land on it.
    fn release_writer(&self) {
        let prev_writers = self.active_writers.fetch_sub(1, Ordering::Release);

        // Exactly "active, not finalized, one writer (us)" before the decrement.
        if prev_writers == PAGE_ACTIVE_BIT | 1 {
            let write_offset = self.write_offset.load(Ordering::Acquire);
            if write_offset >= (*PAGE_SIZE) as usize {
                self.finalize();
            }
        }
    }
}
115
// The link and flag encodings above pack flags into bits 62 and 63, so require a 64-bit usize.
const _: () = assert!(std::mem::size_of::<usize>() == 8);

/// Bit of `LocklessRingBuffer::ref_count` that marks the ring as enabled; the remaining bits
/// count in-flight reservations and reads.
const RING_ENABLED_BIT: usize = 1 << 63;
122
/// Multi-writer ring buffer of fixed-size pages backed by one resizable VMO mapped into the
/// root VMAR.
///
/// Pages `0..nodes.len() - 1` start out linked into a circular writer ring. One extra page
/// (initially the last) belongs to the reader and is swapped in for the head page by
/// `swap_reader_page`.
pub struct LocklessRingBuffer {
    // Backing memory; shrunk to zero by `disable` and resized back by `enable`.
    vmo: MemoryObject,
    // View of the whole mapping of `vmo`; unmapped in `Drop`.
    mapping: SharedBuffer,
    // One entry per page, in VMO order.
    nodes: Vec<Node>,
    // Index of the oldest page in the writer ring (the next page handed to the reader).
    head_page: AtomicUsize,
    // Page writers currently reserve on, read through `Node::get_index`.
    tail_page: AtomicUsize,

    // Index of the page currently owned by the reader (outside the writer ring).
    reader_page: AtomicUsize,

    // `RING_ENABLED_BIT` plus the number of in-flight reservations and reads.
    ref_count: AtomicUsize,
    // When the ring is full: drop the oldest page (true) or fail with ENOSPC (false).
    overwrite: bool,
    // Pages discarded by overwriting since creation or the last `enable`.
    dropped_pages: std::sync::atomic::AtomicU64,
    // Latest timestamp handed out by `try_reserve_on_page`; kept monotonic via `fetch_max`.
    prev_timestamp: std::sync::atomic::AtomicU64,
    // Trace id used for the "page dropped" async instant event.
    write_event_async_id: fuchsia_trace::Id,
    // Set while a `read` is running; a concurrent `read` fails with EBUSY.
    reader_active: std::sync::atomic::AtomicBool,
    // Serializes `enable` and `disable`.
    state_mutex: Mutex<()>,
}
157impl LocklessRingBuffer {
158 pub const PAGE_HEADER_SIZE: usize = 16;
160
161 pub fn new(
166 size_bytes: usize,
167 overwrite: bool,
168 write_event_async_id: fuchsia_trace::Id,
169 ) -> Result<Self, Errno> {
170 let requested_pages = (size_bytes + (*PAGE_SIZE) as usize - 1) / (*PAGE_SIZE) as usize;
171 let pages = std::cmp::max(3, requested_pages);
173 let total_nodes = pages;
174 let capacity = total_nodes * (*PAGE_SIZE) as usize;
175
176 let vmo: MemoryObject =
178 zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, capacity as u64)
179 .map_err(|_| errno!(ENOMEM))?
180 .into();
181 let vmo = vmo.with_zx_name(b"starnix:tracefs");
182 let addr = vmar_root_self()
184 .map(
185 0,
186 vmo.as_vmo().expect("vmo must exist"),
187 0,
188 capacity,
189 zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
190 )
191 .map_err(|e| from_status_like_fdio!(e))?;
192 let mapping = unsafe { SharedBuffer::new(addr as *mut u8, capacity) };
194
195 let mut nodes = Vec::with_capacity(total_nodes);
197 let base_ptr = addr as *mut u8;
198 for i in 0..total_nodes {
199 let page_ptr = unsafe { base_ptr.add(i * (*PAGE_SIZE) as usize) };
203
204 nodes.push(Node {
205 page_data: unsafe { SharedBuffer::new(page_ptr, (*PAGE_SIZE) as usize) },
207 next: AtomicUsize::new(0),
208 prev: AtomicUsize::new(0),
209 write_offset: AtomicUsize::new(LocklessRingBuffer::PAGE_HEADER_SIZE),
210 active_writers: AtomicUsize::new(PAGE_ACTIVE_BIT),
212 });
213 }
214 let circle_size = pages - 1;
216 for i in 0..circle_size {
217 let next_idx = (i + 1) % circle_size;
218 let prev_idx = (i + circle_size - 1) % circle_size;
219 nodes[i].next.store(Node::make_val(next_idx, FLAG_NORMAL), Ordering::Relaxed);
220 nodes[i].prev.store(Node::make_val(prev_idx, FLAG_NORMAL), Ordering::Relaxed);
221 }
222 nodes[circle_size - 1].next.store(Node::make_val(0, FLAG_HEADER), Ordering::Relaxed);
224 nodes[pages - 1].next.store(Node::make_val(0, FLAG_NORMAL), Ordering::Relaxed);
226 nodes[pages - 1]
227 .prev
228 .store(Node::make_val(circle_size - 1, FLAG_NORMAL), Ordering::Relaxed);
229 nodes[pages - 1].page_data.write_at(8, &0u64.to_le_bytes());
231 let buffer = Self {
232 vmo,
233 mapping,
234 nodes,
235 head_page: AtomicUsize::new(0),
236 tail_page: AtomicUsize::new(0),
237 reader_page: AtomicUsize::new(pages - 1),
238
239 ref_count: AtomicUsize::new(RING_ENABLED_BIT),
240 overwrite,
241 dropped_pages: std::sync::atomic::AtomicU64::new(0),
242 prev_timestamp: std::sync::atomic::AtomicU64::new(
243 zx::BootInstant::get().into_nanos() as u64
244 ),
245 write_event_async_id,
246 reader_active: std::sync::atomic::AtomicBool::new(false),
247 state_mutex: Mutex::new(()),
248 };
249 Ok(buffer)
250 }
251 pub fn dropped_pages(&self) -> u64 {
252 self.dropped_pages.load(Ordering::Relaxed)
253 }
254}
/// Counts, by reason, how often a single `reserve` call had to back off. The counts pick
/// the back-off strategy and are reported for slow reservations.
#[derive(Default, Debug)]
struct YieldTracker {
    // Back-offs because the tail's `next` link was locked (`FLAG_UPDATE`).
    update_flag: u64,
    // Back-offs because the head page about to be overwritten still had active writers.
    node_match: u64,
    // Back-offs because locking the tail's `next` link for an overwrite failed.
    head_lock: u64,
}
266impl YieldTracker {
267 fn total(&self) -> u64 {
268 self.update_flag + self.node_match + self.head_lock
269 }
270
271 fn yield_or_sleep(&self) {
273 let total = self.total();
274 if total > 1000 {
275 std::thread::sleep(PROGRESSIVE_SLEEP_DURATION);
276 } else if total > 100 {
277 std::thread::sleep(SPIN_SLEEP_DURATION);
278 } else {
279 std::thread::yield_now();
280 }
281 }
282}
283
/// A region of `size` bytes reserved on one page of a `LocklessRingBuffer`.
///
/// Fill it with `write_at`, then hand it to `LocklessRingBuffer::commit`. Dropping it
/// uncommitted also releases it, with a warning.
pub struct Reservation<'a> {
    /// Byte offset of the reservation within the whole ring mapping.
    pub offset: usize,
    /// Index of the page the reservation lives on.
    pub node_idx: usize,
    /// Number of reserved bytes.
    pub size: usize,
    buffer: &'a LocklessRingBuffer,
    // Set once the page writer count and ring reference have been released.
    committed: bool,
}
292
293impl<'a> Reservation<'a> {
294 pub fn write_at(&self, rel_offset: usize, data: &[u8]) {
295 assert!(rel_offset + data.len() <= self.size, "Write exceeds reservation size");
296 self.buffer.mapping.write_at(self.offset + rel_offset, data);
297 }
298
299 fn release(&mut self) {
300 if self.committed {
301 return;
302 }
303
304 let node_idx = self.node_idx;
305 let node = &self.buffer.nodes[node_idx];
306
307 node.release_writer();
308
309 self.buffer.ref_count.fetch_sub(1, Ordering::Release);
310 self.committed = true;
311 }
312}
313
314impl<'a> Drop for Reservation<'a> {
315 fn drop(&mut self) {
316 if !self.committed {
317 starnix_logging::log_warn!("LocklessRingBuffer: Reservation dropped without commit");
318 }
319 self.release();
320 }
321}
322
323impl<'a> std::fmt::Debug for Reservation<'a> {
324 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325 f.debug_struct("Reservation")
326 .field("offset", &self.offset)
327 .field("node_idx", &self.node_idx)
328 .field("size", &self.size)
329 .field("committed", &self.committed)
330 .finish()
331 }
332}
/// Outcome of `LocklessRingBuffer::advance_to_next_page`.
#[derive(Debug, PartialEq, Eq)]
enum AdvanceResult {
    /// Progress was made or observed: the tail moved (by this or another thread), or an
    /// overwrite pushed the head forward. `reserve` retries.
    Advanced,
    /// The caller backed off (yield or sleep) because a link was locked or the head page
    /// still had writers. `reserve` retries.
    Yielded,
    /// No progress is possible; `reserve` returns this error (ENOSPC when the ring is full
    /// and overwriting is disabled).
    Error(Errno),
}
342
343impl LocklessRingBuffer {
344 fn try_reserve_on_page(
348 &self,
349 tail_node: &Node,
350 size: usize,
351 ) -> Result<(usize, zx::BootInstant, zx::Duration<zx::BootTimeline>), ()> {
352 if !self.is_enabled() {
353 log_warn!(
354 "LocklessRingBuffer: canceling try_reserve_on_page because ring is disabled."
355 );
356 return Err(());
357 }
358
359 let mut current_offset = tail_node.write_offset.load(Ordering::Acquire);
364 loop {
365 if current_offset + size > (*PAGE_SIZE) as usize {
366 return Err(());
367 }
368 match tail_node.write_offset.compare_exchange_weak(
369 current_offset,
370 current_offset + size,
371 Ordering::AcqRel,
372 Ordering::Acquire,
373 ) {
374 Ok(_) => break,
375 Err(actual) => current_offset = actual,
376 }
377 }
378
379 let now_candidate = zx::BootInstant::get().into_nanos() as u64;
380 let actual_prev = self.prev_timestamp.fetch_max(now_candidate, Ordering::AcqRel);
381 let final_now_nanos = std::cmp::max(now_candidate, actual_prev);
382 let now = zx::BootInstant::from_nanos(final_now_nanos as i64);
383
384 let delta_nanos = if current_offset == LocklessRingBuffer::PAGE_HEADER_SIZE {
386 tail_node.page_data.write_at(0, &final_now_nanos.to_le_bytes());
387 0
388 } else {
389 final_now_nanos.saturating_sub(actual_prev)
390 };
391
392 let delta = zx::Duration::from_nanos(delta_nanos as i64);
393
394 Ok((current_offset, now, delta))
395 }
    /// Moves the writer tail off `tail_node`, the page at link value `tail_val`, after a
    /// reservation could not be placed on it.
    ///
    /// Finalizes the tail page (while the ring is enabled), then inspects its `next` link:
    /// * `FLAG_UPDATE` — the link is locked by another party; back off.
    /// * `FLAG_HEADER` — the ring is full.
    ///   - With `overwrite`, drop the head page by moving the head forward, then return
    ///     `Advanced` without moving the tail; the caller's next attempt moves it.
    ///   - Otherwise fail with ENOSPC.
    /// * Otherwise — reset the next page (if it still holds old data) and CAS `tail_page`
    ///   onto it.
    ///
    /// Returns `Advanced` early if another thread already moved the tail.
    fn advance_to_next_page(
        &self,
        tail_node: &Node,
        tail_val: usize,
        yield_tracker: &mut YieldTracker,
    ) -> AdvanceResult {
        if self.tail_page.load(Ordering::Acquire) != tail_val {
            return AdvanceResult::Advanced;
        }

        // No writer will be allowed onto this page again; record its data size. Skipped when
        // disabled because `disable` may shrink the VMO underneath the page.
        if self.is_enabled() {
            tail_node.finalize();
        }

        let next_val = tail_node.next.load(Ordering::Acquire);
        let next_idx = Node::get_index(next_val);
        let next_flags = Node::get_flags(next_val);
        if next_flags == FLAG_UPDATE {
            starnix_logging::log_warn!(
                "Reservation yielding due to FLAG_UPDATE on node {}",
                next_idx
            );
            yield_tracker.update_flag += 1;
            yield_tracker.yield_or_sleep();
            return AdvanceResult::Yielded;
        } else if next_flags == FLAG_HEADER {
            if self.overwrite {
                // Never drop a page that writers are still filling.
                if (self.nodes[next_idx].active_writers.load(Ordering::Acquire)
                    & ACTIVE_WRITERS_MASK)
                    > 0
                {
                    yield_tracker.node_match += 1;
                    yield_tracker.yield_or_sleep();
                    return AdvanceResult::Yielded;
                }

                starnix_logging::log_warn!(
                    "LocklessRingBuffer Overwriting page {} (overwriting={})",
                    next_idx,
                    self.overwrite
                );
                let expected_next = Node::make_val(next_idx, FLAG_HEADER);
                // Lock the link into the head. `swap_reader_page` takes the same lock (it
                // CASes the head's predecessor link), so the two cannot move the head at once.
                let locked_next = Node::make_val(next_idx, FLAG_UPDATE);
                if tail_node
                    .next
                    .compare_exchange_weak(
                        expected_next,
                        locked_next,
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    let head_node = &self.nodes[next_idx];
                    let head_next_val = head_node.next.load(Ordering::Acquire);
                    let head_next_idx = Node::get_index(head_next_val);

                    // The old head now precedes the new head, so its link carries the flag.
                    let new_head_next = Node::make_val(head_next_idx, FLAG_HEADER);
                    head_node.next.store(new_head_next, Ordering::Release);

                    self.head_page.store(head_next_idx, Ordering::Release);

                    // Unlock: the old head is now an ordinary successor of the tail.
                    let unlocked_next = Node::make_val(next_idx, FLAG_NORMAL);
                    tail_node.next.store(unlocked_next, Ordering::Release);
                    self.dropped_pages.fetch_add(1, Ordering::Relaxed);
                    fuchsia_trace::async_instant!(
                        self.write_event_async_id,
                        "starnix:trace_meta",
                        "page dropped"
                    );
                    return AdvanceResult::Advanced;
                } else {
                    yield_tracker.head_lock += 1;
                    yield_tracker.yield_or_sleep();
                    return AdvanceResult::Yielded;
                }
            } else {
                starnix_logging::log_error!("LocklessRingBuffer is full");
                return AdvanceResult::Error(errno!(ENOSPC));
            }
        }

        let next_node = &self.nodes[next_idx];
        // Elect one thread to reset the next page's `write_offset`. Yields
        // (should_advance, won_reset):
        // * (false, false) when another thread already moved the tail;
        // * (true, false) when the page is already at its header offset;
        // * (true, true) when this thread performed the reset.
        let (should_advance, won_reset) = {
            let mut current = next_node.write_offset.load(Ordering::Acquire);
            loop {
                if self.tail_page.load(Ordering::Acquire) != tail_val {
                    break (false, false);
                }
                if current == LocklessRingBuffer::PAGE_HEADER_SIZE {
                    break (true, false);
                }
                match next_node.write_offset.compare_exchange_weak(
                    current,
                    LocklessRingBuffer::PAGE_HEADER_SIZE,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => break (true, true),
                    Err(actual) => current = actual,
                }
            }
        };

        if should_advance {
            if won_reset {
                // NOTE(review): between the winning CAS above and this store, another thread
                // can see `write_offset == PAGE_HEADER_SIZE`, take the (true, false) branch and
                // move `tail_page` onto this page while `PAGE_ACTIVE_BIT` is still clear.
                // Writers would then skip past the page. Confirm whether that can leave the page
                // active but never finalized.
                next_node.active_writers.store(PAGE_ACTIVE_BIT, Ordering::Release);
                next_node.page_data.write_at(8, &0u64.to_le_bytes());
            }

            // Losing this CAS just means another thread advanced the tail first.
            if let Err(err) = self.tail_page.compare_exchange(
                tail_val,
                next_val,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                starnix_logging::log_debug!("Tail page already advanced by another thread: {err}");
            }
        }
        AdvanceResult::Advanced
    }
552
    /// Reserves `size` contiguous bytes on the current tail page, moving the tail forward as
    /// needed.
    ///
    /// On success returns the reservation, its timestamp, and the delta from the previously
    /// issued timestamp. The reservation holds one reference on `ref_count` and one writer
    /// slot on its page until it is committed or dropped.
    ///
    /// # Errors
    /// * `EINVAL` if `size` is zero or larger than a page minus its header.
    /// * `ENOMEM` if the ring is, or becomes, disabled.
    /// * `ENOSPC` if the ring is full and overwriting is disabled.
    pub fn reserve(
        &self,
        size: usize,
    ) -> Result<(Reservation<'_>, zx::BootInstant, zx::Duration<zx::BootTimeline>), Errno> {
        if size == 0 || size > (*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE {
            return error!(EINVAL);
        }
        // Take a reference on the ring unless it has been disabled; `disable` waits for all
        // references to drain before shrinking the VMO.
        let mut val = self.ref_count.load(Ordering::Acquire);
        loop {
            if val & RING_ENABLED_BIT == 0 {
                return error!(ENOMEM);
            }
            match self.ref_count.compare_exchange_weak(
                val,
                val + 1,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(actual) => val = actual,
            }
        }

        // Give the reference back on every failure path. On success the guard is disarmed
        // and `Reservation::release` drops the reference instead.
        let ref_count_guard = scopeguard::guard(&self.ref_count, |ref_count| {
            ref_count.fetch_sub(1, Ordering::Release);
        });

        let mut yield_tracker = YieldTracker::default();
        let result = loop {
            if !self.is_enabled() {
                log_info!("LocklessRingBuffer: canceling reserve because ring is disabled.");
                break error!(ENOMEM);
            }
            let tail_val = self.tail_page.load(Ordering::Acquire);
            let tail_idx = Node::get_index(tail_val);
            let tail_node = &self.nodes[tail_idx];

            // Join the tail page as a writer, but only while it is still active (not yet
            // finalized), so `swap_reader_page` can tell when the page is quiescent.
            let mut active_incremented = false;
            let mut old_writers = tail_node.active_writers.load(Ordering::Acquire);
            loop {
                if old_writers & PAGE_ACTIVE_BIT == 0 {
                    break;
                }
                match tail_node.active_writers.compare_exchange_weak(
                    old_writers,
                    old_writers + 1,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        active_incremented = true;
                        break;
                    }
                    Err(actual) => old_writers = actual,
                }
            }

            let reservation_result = if active_incremented {
                self.try_reserve_on_page(tail_node, size)
            } else {
                Err(())
            };

            match reservation_result {
                Ok((offset, now, delta)) => {
                    break Ok((
                        Reservation {
                            offset: tail_idx * (*PAGE_SIZE) as usize + offset,
                            node_idx: tail_idx,
                            size,
                            buffer: self,
                            committed: false,
                        },
                        now,
                        delta,
                    ));
                }
                Err(()) => {
                    // The page was inactive or full (or the ring was disabled): undo the
                    // writer registration, then try to move the tail on and retry.
                    if active_incremented {
                        self.nodes[tail_idx].release_writer();
                    }
                    match self.advance_to_next_page(tail_node, tail_val, &mut yield_tracker) {
                        AdvanceResult::Advanced | AdvanceResult::Yielded => {}
                        AdvanceResult::Error(e) => {
                            break Err(e);
                        }
                    }
                }
            }
            // NOTE(review): this check runs on every iteration, so once the total lands on a
            // multiple of 100_000_000 the warning repeats on each iteration that does not
            // yield, until the next back-off.
            if yield_tracker.total() % 100_000_000 == 0 && yield_tracker.total() > 0 {
                log_warn!(
                    "LocklessRingBuffer: spinning in reservation loop. details = {yield_tracker:?}",
                );
            }
        };
        let total_yields = yield_tracker.total();
        if total_yields > 500_000 {
            starnix_logging::log_info!(
                "Reservation completed with yields: total={}, details={:?}",
                total_yields,
                yield_tracker
            );
        }
        match result {
            Ok((res, now, delta)) => {
                // The reservation now owns the ring reference.
                scopeguard::ScopeGuard::into_inner(ref_count_guard);
                Ok((res, now, delta))
            }
            Err(e) => {
                Err(e)
            }
        }
    }
696 pub fn commit(&self, mut reservation: Reservation<'_>) {
697 reservation.release();
698 }
699
700 pub fn swap_reader_page(&self) -> Option<usize> {
701 let mut retries = 0;
702 loop {
703 let head_val = self.head_page.load(Ordering::Acquire);
704 let head_idx = Node::get_index(head_val);
705 let tail_val = self.tail_page.load(Ordering::Acquire);
706 let tail_idx = Node::get_index(tail_val);
707 if head_idx == tail_idx {
708 return None;
710 }
711 let head_node = &self.nodes[head_idx];
712 let active_writers = head_node.active_writers.load(Ordering::Acquire);
715 if (active_writers & (PAGE_ACTIVE_BIT | ACTIVE_WRITERS_MASK)) != 0 {
718 return None;
719 }
720 let mut size_bytes = [0u8; 8];
721 head_node.page_data.read_at(8, &mut size_bytes);
722 let size = u64::from_le_bytes(size_bytes);
723 if size == 0 {
724 return None;
730 }
731 let next_val = head_node.next.load(Ordering::Acquire);
732 let next_idx = Node::get_index(next_val);
733 let prev_val = head_node.prev.load(Ordering::Acquire);
734 let prev_idx = Node::get_index(prev_val);
735 let prev_node = &self.nodes[prev_idx];
736 let expected_next = Node::make_val(head_idx, FLAG_HEADER);
738 let locked_next = Node::make_val(head_idx, FLAG_UPDATE);
739 if prev_node
740 .next
741 .compare_exchange_weak(
742 expected_next,
743 locked_next,
744 Ordering::AcqRel,
745 Ordering::Relaxed,
746 )
747 .is_ok()
748 {
749 let reader_idx = self.reader_page.load(Ordering::Acquire);
751 let next_node = &self.nodes[next_idx];
752 self.nodes[reader_idx]
754 .prev
755 .store(Node::make_val(prev_idx, FLAG_NORMAL), Ordering::Relaxed);
756 self.nodes[reader_idx]
757 .next
758 .store(Node::make_val(next_idx, FLAG_HEADER), Ordering::Relaxed);
759 next_node.prev.store(Node::make_val(reader_idx, FLAG_NORMAL), Ordering::Relaxed);
760 self.nodes[reader_idx]
762 .write_offset
763 .store(LocklessRingBuffer::PAGE_HEADER_SIZE, Ordering::Relaxed);
764 self.nodes[reader_idx].active_writers.store(PAGE_ACTIVE_BIT, Ordering::Relaxed);
766 self.nodes[reader_idx].page_data.write_at(8, &0u64.to_le_bytes());
768 let unlocked_val = Node::make_val(reader_idx, FLAG_NORMAL);
770 prev_node.next.store(unlocked_val, Ordering::Release);
771 self.head_page.store(next_idx, Ordering::Release);
773 self.reader_page.store(head_idx, Ordering::Release);
774 return Some(head_idx);
776 }
777 starnix_logging::log_warn!("swap_reader_page failed to lock, yielding");
779 retries += 1;
780 if retries >= 100_000 {
781 starnix_logging::log_error!("LocklessRingBuffer: HUNG in swap_reader_page loop");
782 return None;
783 }
784 std::hint::spin_loop();
788 }
789 }
790 pub fn read(&self, buf: &mut dyn OutputBuffer) -> Result<usize, Errno> {
791 if self.reader_active.swap(true, Ordering::AcqRel) {
794 starnix_logging::log_error!(
795 "LocklessRingBuffer: concurrent reads detected! Concurrent reads are not supported by design."
796 );
797 return error!(EBUSY);
798 }
799
800 let _lock_guard = scopeguard::guard(&self.reader_active, |reader_active| {
801 reader_active.store(false, Ordering::Release);
802 });
803
804 let mut val = self.ref_count.load(Ordering::Acquire);
809 loop {
810 if val & RING_ENABLED_BIT == 0 {
811 return error!(EAGAIN);
812 }
813 match self.ref_count.compare_exchange_weak(
814 val,
815 val + 1,
816 Ordering::Acquire,
817 Ordering::Relaxed,
818 ) {
819 Ok(_) => break,
820 Err(actual) => val = actual,
821 }
822 }
823
824 let _guard = scopeguard::guard(&self.ref_count, |ref_count| {
825 ref_count.fetch_sub(1, Ordering::Release);
826 });
827
828 if buf.available() < (*PAGE_SIZE) as usize {
829 starnix_logging::log_error!(
830 "Buffer is too small: {} bytes (needs {} bytes)",
831 buf.available(),
832 (*PAGE_SIZE) as usize
833 );
834 return error!(EINVAL);
835 }
836 if let Some(idx) = self.swap_reader_page() {
839 let node = &self.nodes[idx];
840 let mut offset = 0;
841 let page_size = (*PAGE_SIZE) as usize;
842 let bytes_written = buf.write_each(&mut move |segment| {
843 let available = page_size - offset;
844 if available == 0 {
845 return Ok(0);
846 }
847 let size = std::cmp::min(segment.len(), available);
848 let segment_mut_u8 = unsafe {
851 std::slice::from_raw_parts_mut(segment.as_mut_ptr() as *mut u8, size)
852 };
853 node.page_data.read_at(offset, segment_mut_u8);
854 offset += size;
855 Ok(size)
856 })?;
857 return Ok(bytes_written);
858 }
859 error!(EAGAIN)
860 }
861
862 #[cfg(test)]
863 pub(crate) fn size_bytes(&self) -> usize {
864 self.nodes.len() * (*PAGE_SIZE) as usize
865 }
866
867 pub fn is_enabled(&self) -> bool {
868 self.ref_count.load(Ordering::Acquire) & RING_ENABLED_BIT != 0
869 }
870 pub fn disable(&self) -> Result<u64, Errno> {
871 let _lock = self.state_mutex.lock();
872 self.ref_count.fetch_and(!RING_ENABLED_BIT, Ordering::AcqRel);
874 let mut yield_count: u64 = 0;
875 loop {
876 let active_writers = self.ref_count.load(Ordering::Acquire);
877 if active_writers == 0 {
878 break;
879 }
880 yield_count += 1;
881 if yield_count % 1_000_000 == 0 {
882 log_error!(
883 "LocklessRingBuffer: disable is waiting for {active_writers} active writers",
884 );
885 }
886 std::thread::yield_now();
891 }
892 if yield_count > 500_000 {
893 log_warn!("LocklessRingBuffer disable took {} yields", yield_count);
894 }
895 if let Err(e) = self.vmo.set_size(0) {
896 starnix_logging::log_error!(
897 "LocklessRingBuffer disable failed to set_size(0): {:?}",
898 e
899 );
900 return Err(from_status_like_fdio!(e));
901 }
902 let dropped = self.dropped_pages.load(Ordering::Relaxed);
903 Ok(dropped)
904 }
905 pub fn enable(&self) -> Result<zx::BootInstant, Errno> {
906 let _lock = self.state_mutex.lock();
907 let initial_pages = self.nodes.len() - 1;
908 let capacity = self.nodes.len() * (*PAGE_SIZE) as usize;
909 if let Err(e) = self.vmo.set_size(capacity as u64) {
910 starnix_logging::log_error!("LocklessRingBuffer enable failed to set_size: {:?}", e);
911 return Err(from_status_like_fdio!(e));
912 }
913 let now = zx::BootInstant::get();
914 self.head_page.store(0, Ordering::Release);
916 self.tail_page.store(0, Ordering::Release);
917
918 self.reader_page.store(initial_pages, Ordering::Release);
919
920 self.dropped_pages.store(0, Ordering::Release);
921 self.prev_timestamp.store(now.into_nanos() as u64, Ordering::Release);
922
923 for i in 0..self.nodes.len() {
924 self.nodes[i]
925 .write_offset
926 .store(LocklessRingBuffer::PAGE_HEADER_SIZE, Ordering::Release);
927 self.nodes[i].active_writers.store(PAGE_ACTIVE_BIT, Ordering::Release);
928 }
929 for i in 0..initial_pages {
930 let next_idx = (i + 1) % initial_pages;
931 let prev_idx = (i + initial_pages - 1) % initial_pages;
932 self.nodes[i].next.store(Node::make_val(next_idx, FLAG_NORMAL), Ordering::Relaxed);
933 self.nodes[i].prev.store(Node::make_val(prev_idx, FLAG_NORMAL), Ordering::Relaxed);
934 }
935 self.nodes[initial_pages - 1].next.store(Node::make_val(0, FLAG_HEADER), Ordering::Relaxed);
936
937 self.nodes[initial_pages].page_data.write_at(8, &0u64.to_le_bytes());
939 let _ = self.vmo.as_vmo().expect("vmo must exist").op_range(zx::VmoOp::ZERO, 0, *PAGE_SIZE);
941 let nanos = now.into_nanos() as u64;
942 self.nodes[0].page_data.write_at(0, &nanos.to_le_bytes());
943
944 self.ref_count.store(RING_ENABLED_BIT, Ordering::Release);
946 Ok(now)
947 }
948}
949impl Drop for LocklessRingBuffer {
950 fn drop(&mut self) {
951 let (ptr, len) = self.mapping.as_ptr_len();
952 unsafe {
955 let _ = vmar_root_self().unmap(ptr as usize, len);
956 }
957 }
958}
959#[cfg(test)]
960mod tests {
961 use super::*;
962 use crate::vfs::buffers::VecOutputBuffer;
963 use crate::vfs::{Buffer, OutputBufferCallback, PeekBufferSegmentsCallback};
964 use std::sync::Arc;
965 #[repr(C)]
966 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
967 struct TestMessage {
968 thread_index: u32,
969 timestamp_nanos: u64,
970 delta: u64,
971 data: [u8; 12],
972 }
973 impl TestMessage {
974 pub const SIZE: usize = 32;
975 fn to_bytes(&self) -> [u8; TestMessage::SIZE] {
976 let mut bytes = [0u8; TestMessage::SIZE];
977 bytes[0..4].copy_from_slice(&self.thread_index.to_le_bytes());
978 bytes[4..12].copy_from_slice(&self.timestamp_nanos.to_le_bytes());
979 bytes[12..20].copy_from_slice(&self.delta.to_le_bytes());
980 bytes[20..32].copy_from_slice(&self.data);
981 bytes
982 }
983 fn from_bytes(bytes: &[u8]) -> Self {
984 let mut thread_index = [0u8; 4];
985 thread_index.copy_from_slice(&bytes[0..4]);
986 let mut timestamp_nanos = [0u8; 8];
987 timestamp_nanos.copy_from_slice(&bytes[4..12]);
988 let mut delta = [0u8; 8];
989 delta.copy_from_slice(&bytes[12..20]);
990 let mut data = [0u8; 12];
991 data.copy_from_slice(&bytes[20..32]);
992 Self {
993 thread_index: u32::from_le_bytes(thread_index),
994 timestamp_nanos: u64::from_le_bytes(timestamp_nanos),
995 delta: u64::from_le_bytes(delta),
996 data,
997 }
998 }
999 }
1000 #[test]
1001 fn test_init() {
1002 let buffer =
1003 LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1004 .unwrap();
1005 assert_eq!(buffer.size_bytes(), 3 * (*PAGE_SIZE) as usize);
1006 }
1007 #[test]
1008 fn test_reserve() {
1009 let buffer =
1010 LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1011 .unwrap();
1012 let res = buffer.reserve(100).unwrap();
1013 assert_eq!(res.0.size, 100);
1014 assert_eq!(res.0.offset, LocklessRingBuffer::PAGE_HEADER_SIZE);
1015 assert_eq!(res.0.node_idx, 0);
1016 }
1017 #[test]
1018 fn test_commit() {
1019 let buffer =
1020 LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1021 .unwrap();
1022 let res = buffer.reserve(100).unwrap();
1023 buffer.commit(res.0);
1024 assert_eq!(buffer.nodes[0].active_writers.load(Ordering::Relaxed), PAGE_ACTIVE_BIT);
1025 }
1026 #[test]
1027 fn test_swap_reader_page_empty() {
1028 let buffer =
1029 LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1030 .unwrap();
1031 assert_eq!(buffer.swap_reader_page(), None);
1032 }
1033 #[test]
1034 fn test_swap_reader_page_success() {
1035 let buffer =
1036 LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1037 .unwrap();
1038 let res1 =
1040 buffer.reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE).unwrap();
1041 buffer.commit(res1.0);
1042 let res2 = buffer.reserve(100).unwrap();
1044 assert_eq!(res2.0.node_idx, 1);
1045 buffer.commit(res2.0);
1046 let old_head = buffer.swap_reader_page();
1049 assert_eq!(old_head, Some(0));
1050 assert_eq!(buffer.head_page.load(Ordering::Relaxed), 1);
1052 }
1053 #[test]
1054 fn test_concurrent_reserve() {
1055 use std::sync::Arc;
1056 let buffer = Arc::new(
1057 LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1058 .unwrap(),
1059 );
1060 let mut handles = vec![];
1061 for _ in 0..5 {
1062 let buffer_clone = Arc::clone(&buffer);
1063 let handle = std::thread::spawn(move || {
1064 for _ in 0..20 {
1065 if let Ok(res) = buffer_clone.reserve(10) {
1066 buffer_clone.commit(res.0);
1067 }
1068 }
1069 });
1070 handles.push(handle);
1071 }
1072 for handle in handles {
1073 handle.join().unwrap();
1074 }
1075 for i in 0..buffer.nodes.len() {
1076 assert_eq!(buffer.nodes[i].active_writers.load(Ordering::Relaxed), PAGE_ACTIVE_BIT);
1077 }
1078 }
1079 #[test]
1080 fn test_reserve_moves_to_next_page() {
1081 let buffer =
1082 LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1083 .unwrap();
1084 let res1 = buffer
1086 .reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE - 50)
1087 .unwrap();
1088 buffer.commit(res1.0);
1089 let res2 = buffer.reserve(100).unwrap();
1091 assert_eq!(res2.0.node_idx, 1);
1093 assert_eq!(res2.0.offset, (*PAGE_SIZE) as usize + LocklessRingBuffer::PAGE_HEADER_SIZE);
1094 }
1095 #[test]
1096 fn test_reserve_overwrite() {
1097 let buffer =
1098 LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1099 .unwrap();
1100 let res1 =
1102 buffer.reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE).unwrap();
1103 buffer.commit(res1.0);
1104 let _res2 =
1106 buffer.reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE).unwrap();
1107 let res3 = buffer.reserve(100).unwrap();
1111 assert_eq!(res3.0.node_idx, 0);
1113 assert_eq!(buffer.dropped_pages(), 1);
1114 assert_eq!(buffer.head_page.load(Ordering::Relaxed), 1);
1116 }
1117 #[test]
1118 fn test_read() {
1119 let buffer =
1120 LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1121 .unwrap();
1122 let res1 =
1123 buffer.reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE).unwrap();
1124 let data = vec![1u8; (*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE];
1125 res1.0.write_at(0, &data);
1126 buffer.commit(res1.0);
1127 let res2 = buffer.reserve(100).unwrap();
1129 buffer.commit(res2.0);
1130 let mut dest = VecOutputBuffer::new((*PAGE_SIZE) as usize);
1131 let result = buffer.read(&mut dest);
1132 assert!(result.is_ok());
1133 assert_eq!(result.unwrap(), (*PAGE_SIZE) as usize);
1134 assert_eq!(&dest.data()[LocklessRingBuffer::PAGE_HEADER_SIZE..], &data[..]);
1135 }
1136 #[test]
1137 fn test_enable_disable() {
1138 let buffer =
1139 LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1140 .unwrap();
1141 assert!(buffer.disable().is_ok());
1142 assert_eq!(buffer.ref_count.load(Ordering::Relaxed) & RING_ENABLED_BIT, 0);
1143 let res = buffer.reserve(100);
1144 assert_eq!(res.unwrap_err(), starnix_uapi::errno!(ENOMEM));
1145 assert!(buffer.enable().is_ok());
1146 assert_eq!(buffer.size_bytes(), 3 * (*PAGE_SIZE) as usize);
1147 let res = buffer.reserve(100);
1148 assert!(res.is_ok());
1149 }
1150 fn start_reader_thread(
1156 buffer_reader: Arc<LocklessRingBuffer>,
1157 writers_done_reader: Arc<std::sync::atomic::AtomicBool>,
1158 read_page_delay: Option<(usize, std::time::Duration)>,
1159 ) -> std::thread::JoinHandle<Vec<TestMessage>> {
1160 std::thread::spawn(move || {
1161 let mut all_messages = Vec::new();
1162 let mut dest = VecOutputBuffer::new((*PAGE_SIZE) as usize);
1163 let mut consecutive_eagain = 0;
1164 let mut pages_read = 0;
1165 loop {
1166 dest.reset();
1167 match buffer_reader.read(&mut dest) {
1168 Ok(bytes_read) => {
1169 consecutive_eagain = 0;
1170 assert_eq!(bytes_read, (*PAGE_SIZE) as usize);
1171 let mut header_ts_bytes = [0u8; 8];
1172 header_ts_bytes.copy_from_slice(&dest.data()[0..8]);
1173 let header_timestamp = u64::from_le_bytes(header_ts_bytes);
1174 let mut size_bytes = [0u8; 8];
1175 size_bytes.copy_from_slice(&dest.data()[8..16]);
1176 let data_size = u64::from_le_bytes(size_bytes) as usize;
1177 let max_offset = LocklessRingBuffer::PAGE_HEADER_SIZE + data_size;
1178 let mut offset = LocklessRingBuffer::PAGE_HEADER_SIZE;
1179 let mut first_msg = true;
1180 while offset + TestMessage::SIZE <= max_offset {
1181 let msg_bytes = &dest.data()[offset..offset + TestMessage::SIZE];
1182 let msg = TestMessage::from_bytes(msg_bytes);
1183 if first_msg {
1184 if header_timestamp != msg.timestamp_nanos {
1185 println!(
1186 "HEADER TIMESTAMP MISMATCH: header={}, msg={}",
1187 header_timestamp, msg.timestamp_nanos
1188 );
1189 }
1190 first_msg = false;
1191 }
1192 all_messages.push(msg);
1193 offset += TestMessage::SIZE;
1194 }
1195 if let Some((mod_val, duration)) = read_page_delay {
1196 pages_read += 1;
1197 if pages_read % mod_val == 0 {
1198 std::thread::sleep(duration);
1199 }
1200 }
1201 }
1202 Err(e) if e == starnix_uapi::errno!(EAGAIN) => {
1203 if writers_done_reader.load(Ordering::Acquire) {
1204 let head_val = buffer_reader.head_page.load(Ordering::Relaxed);
1205 let tail_val = buffer_reader.tail_page.load(Ordering::Relaxed);
1206 if Node::get_index(head_val) == Node::get_index(tail_val) {
1209 break;
1210 }
1211 consecutive_eagain += 1;
1212 assert!(
1213 consecutive_eagain < 500,
1214 "LocklessRingBuffer reader stuck: consecutive EAGAIN limit exceeded (5s) after writers finished. head_page={}, tail_page={}, reader_page={}",
1215 Node::get_index(head_val),
1216 Node::get_index(tail_val),
1217 buffer_reader.reader_page.load(Ordering::Relaxed)
1218 );
1219 }
1220 std::thread::sleep(std::time::Duration::from_millis(10));
1221 }
1222 Err(e) => panic!("Unexpected error from read: {:?}", e),
1223 }
1224 }
1225 all_messages
1226 })
1227 }
1228 fn check_all_message_data(all_messages: &[TestMessage], num_threads: u32) {
1229 let mut prev_timestamp = 0;
1230 let mut thread_counts = vec![0; num_threads as usize];
1231 let mut out_of_order = 0;
1232 let mut corrupted = 0;
1233 for msg in all_messages {
1234 if msg.timestamp_nanos < prev_timestamp {
1235 println!("OUT OF ORDER: prev_timestamp={}, current={:?}", prev_timestamp, msg);
1236 out_of_order += 1;
1237 }
1238 prev_timestamp = msg.timestamp_nanos;
1239 if &msg.data != b"Event data\0\0" || msg.thread_index >= num_threads {
1240 println!("CORRUPTED: msg={:?}", msg);
1241 corrupted += 1;
1242 } else {
1243 thread_counts[msg.thread_index as usize] += 1;
1244 }
1245 }
1246 println!(
1247 "TEST_RESULT: Read {} messages, {} out of order, {} corrupted. Thread counts: {:?}",
1248 all_messages.len(),
1249 out_of_order,
1250 corrupted,
1251 thread_counts
1252 );
1253 assert_eq!(corrupted, 0, "Found corrupted messages");
1255 }
/// Four writers each commit 64 messages into a 4-page non-overwriting buffer
/// while a reader drains it concurrently. Every message read must be intact,
/// and between 250 and 256 of the 256 written must be observed.
#[test]
fn test_concurrent_read_write_4() {
    let num_threads = 4;
    let msgs_per_thread = 64;
    let ring = Arc::new(
        LocklessRingBuffer::new(4 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
            .unwrap(),
    );
    let done_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));

    // The reader is started before any writer so it can drain pages as they fill.
    let reader = start_reader_thread(Arc::clone(&ring), Arc::clone(&done_flag), None);

    let writers: Vec<_> = (0..num_threads)
        .map(|thread_index| {
            let ring = Arc::clone(&ring);
            std::thread::spawn(move || {
                // Stagger start-up so the writers do not all begin in lockstep.
                std::thread::sleep(std::time::Duration::from_millis(10 + thread_index as u64));
                for _ in 0..msgs_per_thread {
                    let (slot, now, delta) = ring.reserve(TestMessage::SIZE).unwrap();
                    let payload = TestMessage {
                        thread_index,
                        timestamp_nanos: now.into_nanos() as u64,
                        delta: delta.into_nanos() as u64,
                        data: *b"Event data\0\0",
                    };
                    slot.write_at(0, &payload.to_bytes());
                    ring.commit(slot);
                    std::thread::sleep(std::time::Duration::from_nanos(thread_index as u64));
                }
            })
        })
        .collect();
    writers.into_iter().for_each(|w| w.join().unwrap());

    done_flag.store(true, Ordering::Release);
    let all_messages = reader.join().unwrap();
    check_all_message_data(&all_messages, num_threads);

    let read = all_messages.len();
    assert!((250..=256).contains(&read), "Expected between 250 and 256 messages, got {}", read);
}
/// A single writer commits 256 messages into a 5-page overwriting buffer while
/// a reader drains it. Exactly 254 intact messages must be read back.
///
/// NOTE(review): the two missing messages are presumably the ones left on the
/// still-open tail page, which the reader stops before draining.
#[test]
fn test_concurrent_read_write_1_thread() {
    let num_threads = 1;
    let msgs_per_thread = 256;
    let ring = Arc::new(
        LocklessRingBuffer::new(5 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
            .unwrap(),
    );
    let done_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let reader = start_reader_thread(Arc::clone(&ring), Arc::clone(&done_flag), None);

    let mut writers = Vec::new();
    for thread_index in 0..num_threads {
        let ring = Arc::clone(&ring);
        let writer = std::thread::spawn(move || {
            let start_delay = std::time::Duration::from_millis(10 + thread_index as u64);
            let pause = std::time::Duration::from_nanos(thread_index as u64);
            std::thread::sleep(start_delay);
            for _ in 0..msgs_per_thread {
                let (slot, now, delta) = ring.reserve(TestMessage::SIZE).unwrap();
                let payload = TestMessage {
                    thread_index,
                    timestamp_nanos: now.into_nanos() as u64,
                    delta: delta.into_nanos() as u64,
                    data: *b"Event data\0\0",
                };
                slot.write_at(0, &payload.to_bytes());
                ring.commit(slot);
                std::thread::sleep(pause);
            }
        });
        writers.push(writer);
    }
    for writer in writers {
        writer.join().unwrap();
    }

    done_flag.store(true, Ordering::Release);
    let all_messages = reader.join().unwrap();
    check_all_message_data(&all_messages, num_threads);
    assert_eq!(
        all_messages.len(),
        254,
        "Expected exactly 254 messages, got {}",
        all_messages.len()
    );
}
/// Eight writers each commit 128 messages into a 12-page non-overwriting
/// buffer while a reader drains it. A writer retries a transient `ENOSPC` up
/// to 100 times (1ms apart) per message. Every message read must be intact,
/// and between 1008 and 1024 of the 1024 written must be observed.
#[test]
fn test_concurrent_read_write_8_threads() {
    let num_threads = 8;
    let msgs_per_thread = 128;
    let ring = Arc::new(
        LocklessRingBuffer::new(12 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
            .unwrap(),
    );
    let done_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
    // Raise the flag on any exit from this test (including a panic) so the
    // reader thread's loop can terminate.
    let _done_on_exit = scopeguard::guard(Arc::clone(&done_flag), |flag| {
        flag.store(true, Ordering::Release);
    });
    let reader = start_reader_thread(Arc::clone(&ring), Arc::clone(&done_flag), None);

    let writers: Vec<_> = (0..num_threads)
        .map(|thread_index| {
            let ring = Arc::clone(&ring);
            std::thread::spawn(move || {
                std::thread::sleep(std::time::Duration::from_millis(10 + thread_index as u64));

                // Reserves one message slot, backing off while the ring reports ENOSPC.
                let reserve_slot = || {
                    let mut retries = 0;
                    loop {
                        match ring.reserve(TestMessage::SIZE) {
                            Ok(granted) => return granted,
                            Err(e) if e == starnix_uapi::errno!(ENOSPC) => {
                                retries += 1;
                                assert!(
                                    retries < 100,
                                    "LocklessRingBuffer: ENOSPC transient limit exceeded (100ms). Reader may have hung."
                                );
                                std::thread::sleep(std::time::Duration::from_millis(1));
                            }
                            Err(e) => panic!("Unexpected error: {:?}", e),
                        }
                    }
                };

                for _ in 0..msgs_per_thread {
                    let (slot, now, delta) = reserve_slot();
                    let payload = TestMessage {
                        thread_index,
                        timestamp_nanos: now.into_nanos() as u64,
                        delta: delta.into_nanos() as u64,
                        data: *b"Event data\0\0",
                    };
                    slot.write_at(0, &payload.to_bytes());
                    ring.commit(slot);
                    std::thread::sleep(std::time::Duration::from_nanos(thread_index as u64));
                }
            })
        })
        .collect();
    for writer in writers {
        writer.join().unwrap();
    }

    done_flag.store(true, Ordering::Release);
    let all_messages = reader.join().unwrap();
    check_all_message_data(&all_messages, num_threads);

    let read = all_messages.len();
    assert!(
        (1008..=1024).contains(&read),
        "Expected between 1008 and 1024 messages, got {}",
        read
    );
}
/// Checks that `disable()` waits for an in-flight reservation.
///
/// A reservation is held open, then `disable()` is started on another thread.
/// Once `RING_ENABLED_BIT` is seen cleared, `disable()` must not yet have
/// returned, and new reservations must fail with `ENOMEM`. Committing the held
/// reservation lets `disable()` complete.
#[test]
fn test_disable_waits_for_ref_count() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    let ring = Arc::new(
        LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
            .unwrap(),
    );
    let held = ring.reserve(100).unwrap();

    let disabled = Arc::new(AtomicBool::new(false));
    let disabler = {
        let ring = Arc::clone(&ring);
        let disabled = Arc::clone(&disabled);
        std::thread::spawn(move || {
            ring.disable().unwrap();
            disabled.store(true, Ordering::Relaxed);
        })
    };

    // Spin until the disabling thread has cleared the enabled bit.
    loop {
        let still_enabled = (ring.ref_count.load(Ordering::Acquire) & RING_ENABLED_BIT) != 0;
        if !still_enabled {
            break;
        }
        std::hint::spin_loop();
    }
    assert!(!disabled.load(Ordering::Relaxed));

    let rejected = ring.reserve(100);
    assert_eq!(rejected.unwrap_err(), starnix_uapi::errno!(ENOMEM));

    let payload = vec![42u8; 100];
    held.0.write_at(0, &payload);
    ring.commit(held.0);

    disabler.join().unwrap();
    assert!(disabled.load(Ordering::Relaxed));
}
/// Dropping a reservation without committing it must release its writer
/// reference. Afterwards the ring must still accept and commit a new
/// reservation.
#[test]
fn test_reservation_drop_cancels_writer_count() {
    let ring =
        LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
            .unwrap();
    let writer_count = || ring.ref_count.load(Ordering::Relaxed);

    let abandoned = ring.reserve(100).unwrap();
    assert_eq!(writer_count(), RING_ENABLED_BIT | 1);

    std::mem::drop(abandoned.0);
    assert_eq!(writer_count(), RING_ENABLED_BIT);

    let again = ring.reserve(100).unwrap();
    ring.commit(again.0);
}
/// Writes 7 * 127 messages into a 6-page overwriting buffer with no reader
/// running, then reads the buffer back. The ring must report 2 dropped pages,
/// and exactly 4 * 127 intact messages must be recovered.
#[test]
fn test_overwrite_and_read_4_pages() {
    let num_threads = 1;
    let ring = Arc::new(
        LocklessRingBuffer::new(6 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
            .unwrap(),
    );
    let done_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
    // Seven pages' worth of messages, assuming 127 messages fit on a page.
    let msgs_per_thread = 7 * 127;

    let producer = {
        let ring = Arc::clone(&ring);
        let thread_index = 0;
        std::thread::spawn(move || {
            (0..msgs_per_thread).for_each(|_| {
                let (slot, now, delta) = ring.reserve(TestMessage::SIZE).unwrap();
                let payload = TestMessage {
                    thread_index,
                    timestamp_nanos: now.into_nanos() as u64,
                    delta: delta.into_nanos() as u64,
                    data: *b"Event data\0\0",
                };
                slot.write_at(0, &payload.to_bytes());
                ring.commit(slot);
            });
        })
    };
    producer.join().unwrap();
    assert_eq!(ring.dropped_pages(), 2, "Expected 2 pages to be dropped");

    // All writing is finished, so the reader drains what is left and exits.
    done_flag.store(true, Ordering::Release);
    let reader = start_reader_thread(Arc::clone(&ring), Arc::clone(&done_flag), None);
    let all_messages = reader.join().unwrap();
    check_all_message_data(&all_messages, num_threads);
    assert_eq!(all_messages.len(), 4 * 127, "Expected exactly 4 pages of messages");
}
/// In a 3-page non-overwriting buffer, two full-page reservations succeed.
/// With no reader, the next reservation must fail with `ENOSPC`.
#[test]
fn test_reserve_full_producer_consumer() {
    let ring =
        LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
            .unwrap();
    let full_page_payload = (*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE;

    for _ in 0..2 {
        let filled = ring.reserve(full_page_payload).unwrap();
        ring.commit(filled.0);
    }

    let overflow = ring.reserve(100);
    assert_eq!(overflow.unwrap_err(), starnix_uapi::errno!(ENOSPC));
}
1545
/// A zero-byte reservation is rejected with `EINVAL`.
#[test]
fn test_reserve_zero_size() {
    let ring =
        LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
            .unwrap();
    let outcome = ring.reserve(0);
    assert_eq!(outcome.unwrap_err(), starnix_uapi::errno!(EINVAL));
}
/// Four writers race to fill a 3-page overwriting buffer while a throttled
/// reader drains it. Failed reservations are skipped. Every message read must
/// be intact, and the ring must report at least one dropped page.
#[test]
fn test_concurrent_overwrite_stability() {
    let num_threads = 4;
    let msgs_per_thread = 256;
    let ring = Arc::new(
        LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
            .unwrap(),
    );
    let done_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let mut writers = vec![];

    // The reader sleeps 20ms after every `nodes.len() - 1` pages it reads,
    // giving the writers a chance to lap it.
    let reader_delay = Some((ring.nodes.len() - 1, std::time::Duration::from_millis(20)));
    let reader = start_reader_thread(Arc::clone(&ring), Arc::clone(&done_flag), reader_delay);

    for thread_index in 0..num_threads {
        let ring = Arc::clone(&ring);
        writers.push(std::thread::spawn(move || {
            for _ in 0..msgs_per_thread {
                if let Ok((slot, now, delta)) = ring.reserve(TestMessage::SIZE) {
                    let payload = TestMessage {
                        thread_index,
                        timestamp_nanos: now.into_nanos() as u64,
                        delta: delta.into_nanos() as u64,
                        data: *b"Event data\0\0",
                    };
                    slot.write_at(0, &payload.to_bytes());
                    ring.commit(slot);
                }
                std::thread::yield_now();
            }
        }));
    }
    for writer in writers {
        writer.join().unwrap();
    }

    done_flag.store(true, Ordering::Release);
    let all_messages = reader.join().unwrap();
    check_all_message_data(&all_messages, num_threads);
    assert!(ring.dropped_pages() > 0, "Expected at least some dropped pages");
}
1604
/// Two reservations fill page 0 exactly and are committed in reverse order.
/// While only the later one is committed, no page may be handed to the reader.
/// Once both are committed and another reservation is made, page 0 must be
/// swappable.
#[test]
fn test_out_of_order_completion_same_page() {
    let ring =
        LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
            .unwrap();

    let usable = (*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE;
    let first_half = usable / 2;
    let second_half = usable - first_half;

    let (early, _, _) = ring.reserve(first_half).unwrap();
    let (late, _, _) = ring.reserve(second_half).unwrap();

    // Committing the later reservation alone must not expose the page.
    ring.commit(late);
    assert!(ring.swap_reader_page().is_none());

    ring.commit(early);

    // Page 0 is now full, so this reservation has to land on the next page.
    let _ = ring.reserve(10).unwrap();

    let swapped = ring.swap_reader_page();
    assert!(swapped.is_some());
    assert_eq!(swapped.unwrap(), 0);
}
1640
/// A second reader that calls `read()` while another reader is active must be
/// rejected with `EBUSY`.
#[test]
fn test_concurrent_readers_rejection() {
    /// Output sink whose `available()` stalls for 50ms and which accepts no data.
    ///
    /// NOTE(review): the test assumes `read()` calls `available()` while the
    /// first reader still holds the buffer's reader slot; confirm against
    /// `LocklessRingBuffer::read`.
    #[derive(Debug)]
    struct SlowOutputBuffer;

    impl Buffer for SlowOutputBuffer {
        fn segments_count(&self) -> Result<usize, Errno> {
            Ok(1)
        }

        fn peek_each_segment(
            &mut self,
            _callback: &mut PeekBufferSegmentsCallback<'_>,
        ) -> Result<(), Errno> {
            Ok(())
        }
    }

    impl OutputBuffer for SlowOutputBuffer {
        fn write_each(
            &mut self,
            _callback: &mut OutputBufferCallback<'_>,
        ) -> Result<usize, Errno> {
            Ok(0)
        }

        fn available(&self) -> usize {
            let stall = std::time::Duration::from_millis(50);
            std::thread::sleep(stall);
            (*PAGE_SIZE) as usize
        }

        fn bytes_written(&self) -> usize {
            0
        }

        fn zero(&mut self) -> Result<usize, Errno> {
            Ok(0)
        }

        unsafe fn advance(&mut self, _length: usize) -> Result<(), Errno> {
            Ok(())
        }
    }

    let ring = Arc::new(
        LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
            .unwrap(),
    );

    // First reader: parks inside `read()` via the slow sink.
    let slow_reader = {
        let ring = Arc::clone(&ring);
        std::thread::spawn(move || {
            let mut sink = SlowOutputBuffer;
            let _ = ring.read(&mut sink);
        })
    };

    // Give the first reader a head start before the second one arrives.
    std::thread::sleep(std::time::Duration::from_millis(10));

    let rejected_reader = {
        let ring = Arc::clone(&ring);
        std::thread::spawn(move || {
            let mut sink = VecOutputBuffer::new((*PAGE_SIZE) as usize);
            let outcome = ring.read(&mut sink);
            assert_eq!(outcome.unwrap_err(), starnix_uapi::errno!(EBUSY));
        })
    };

    rejected_reader.join().unwrap();
    slow_reader.join().unwrap();
}
1702
/// Eight writers hammer a 3-page overwriting buffer with no reader attached.
/// Failed reservations are skipped. The test passes only if every writer
/// thread runs to completion and the ring reports dropped pages. Going by its
/// name, it guards against a livelock on stale write offsets.
#[test]
fn test_stale_offset_livelock() {
    let num_threads = 8;
    let msgs_per_thread = 200;
    let ring = Arc::new(
        LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
            .unwrap(),
    );

    let writers: Vec<_> = (0..num_threads)
        .map(|thread_index| {
            let ring = Arc::clone(&ring);
            std::thread::spawn(move || {
                for _ in 0..msgs_per_thread {
                    if let Ok((slot, now, delta)) = ring.reserve(TestMessage::SIZE) {
                        let payload = TestMessage {
                            thread_index,
                            timestamp_nanos: now.into_nanos() as u64,
                            delta: delta.into_nanos() as u64,
                            data: *b"Event data\0\0",
                        };
                        slot.write_at(0, &payload.to_bytes());
                        ring.commit(slot);
                    }
                    std::thread::yield_now();
                }
            })
        })
        .collect();

    for writer in writers {
        writer.join().unwrap();
    }

    let dropped = ring.dropped_pages();
    println!("High contention completed. Total dropped pages = {}", dropped);
    assert!(dropped > 0);
}
1743
/// An uncommitted reservation on page 0 must keep a second writer from
/// completing its wrap-around. The second writer must not finish within 50ms,
/// and it must finish once the reservation on page 0 is committed.
#[test]
fn test_writer_preemption_and_overwrite_prevention() {
    let ring = Arc::new(
        LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
            .unwrap(),
    );

    // Keep a 100-byte reservation open on page 0.
    let pinned = ring.reserve(100).unwrap();
    assert_eq!(pinned.0.node_idx, 0);

    let finished = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let writer = {
        let ring = Arc::clone(&ring);
        let finished = Arc::clone(&finished);
        std::thread::spawn(move || {
            let page_payload = (*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE;
            // Fill the remainder of page 0, then one whole page, then 100 more
            // bytes. The writer is expected to stall somewhere in this sequence
            // until the pinned reservation is committed.
            for size in [page_payload - 100, page_payload, 100] {
                let granted = ring.reserve(size).unwrap();
                ring.commit(granted.0);
            }
            finished.store(true, Ordering::Release);
        })
    };

    std::thread::sleep(std::time::Duration::from_millis(50));
    assert!(!finished.load(Ordering::Acquire));

    ring.commit(pinned.0);

    writer.join().unwrap();
    assert!(finished.load(Ordering::Acquire));
}
1792
/// Eight writers keep reserving while a controller thread disables and
/// re-enables the ring 20 times.
///
/// While the ring is disabled, reservations must fail with `ENOMEM`. Writers
/// retry that error until each has committed `msgs_per_thread` messages, so
/// the total must equal `num_threads * msgs_per_thread`.
#[test]
fn test_extreme_disable_enable_stress() {
    let num_threads: u32 = 8;
    let msgs_per_thread: u32 = 100;
    let ring = Arc::new(
        LocklessRingBuffer::new(5 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
            .unwrap(),
    );

    let writers: Vec<_> = (0..num_threads)
        .map(|thread_index| {
            let ring = Arc::clone(&ring);
            std::thread::spawn(move || {
                let mut written = 0;
                loop {
                    if written >= msgs_per_thread {
                        break written;
                    }
                    match ring.reserve(TestMessage::SIZE) {
                        Ok((slot, now, delta)) => {
                            let payload = TestMessage {
                                thread_index,
                                timestamp_nanos: now.into_nanos() as u64,
                                delta: delta.into_nanos() as u64,
                                data: *b"Event data\0\0",
                            };
                            slot.write_at(0, &payload.to_bytes());
                            ring.commit(slot);
                            written += 1;
                        }
                        // The ring is currently disabled; try again.
                        Err(e) if e == starnix_uapi::errno!(ENOMEM) => std::thread::yield_now(),
                        Err(e) => panic!("Unexpected error during reserve: {:?}", e),
                    }
                }
            })
        })
        .collect();

    let controller = {
        let ring = Arc::clone(&ring);
        std::thread::spawn(move || {
            for _cycle in 0..20 {
                std::thread::sleep(std::time::Duration::from_millis(5));
                let _pages_lost = ring.disable().unwrap();

                // While disabled, new reservations must be refused.
                let refused = ring.reserve(TestMessage::SIZE);
                assert_eq!(refused.unwrap_err(), starnix_uapi::errno!(ENOMEM));

                std::thread::sleep(std::time::Duration::from_millis(2));
                let _reenabled_at = ring.enable().unwrap();
            }
        })
    };

    controller.join().unwrap();
    let total_writes: u32 = writers.into_iter().map(|w| w.join().unwrap()).sum();
    println!("Disable/enable stress completed. Total writes = {}", total_writes);
    assert_eq!(total_writes, num_threads * msgs_per_thread);
}
1859
/// Eight writers fill a 10-page overwriting buffer while a reader drains it.
///
/// Every message read must be intact. Messages read plus dropped pages (each
/// counted as one full page of messages) must cover all writes, short by at
/// most one page of messages.
#[test]
fn test_reader_loop_swapping_high_contention() {
    let num_threads = 8usize;
    let msgs_per_thread = 100;
    let ring = Arc::new(
        LocklessRingBuffer::new(10 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
            .unwrap(),
    );
    let done_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let reader = start_reader_thread(Arc::clone(&ring), Arc::clone(&done_flag), None);

    let writers: Vec<_> = (0..num_threads)
        .map(|idx| {
            let ring = Arc::clone(&ring);
            std::thread::spawn(move || {
                for _ in 0..msgs_per_thread {
                    let (slot, now, delta) = ring.reserve(TestMessage::SIZE).unwrap();
                    let payload = TestMessage {
                        thread_index: idx as u32,
                        timestamp_nanos: now.into_nanos() as u64,
                        delta: delta.into_nanos() as u64,
                        data: *b"Event data\0\0",
                    };
                    slot.write_at(0, &payload.to_bytes());
                    ring.commit(slot);
                    std::thread::yield_now();
                }
            })
        })
        .collect();
    for writer in writers {
        writer.join().unwrap();
    }

    done_flag.store(true, Ordering::Release);
    let all_messages = reader.join().unwrap();
    let messages_read = all_messages.len();
    check_all_message_data(&all_messages, num_threads as u32);

    let dropped = ring.dropped_pages();
    println!(
        "Reader high contention completed. Messages read = {}, dropped pages = {}",
        messages_read, dropped
    );

    let messages_per_page =
        ((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE) / TestMessage::SIZE;
    let total_written = num_threads * msgs_per_thread;
    let total_accounted = messages_read + (dropped as usize * messages_per_page);
    // One page of slack. NOTE(review): presumably this covers the still-open
    // tail page, which the reader does not drain.
    assert!(total_accounted >= total_written - messages_per_page);
}
1919
/// Fills page 0 with as many 100-byte messages as fit. The next 100-byte
/// reservation, which does not fit, must land on node 1 and must leave page 0's
/// write offset unchanged.
#[test]
fn test_failed_reservation_offset_boundary() {
    let ring =
        LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
            .unwrap();
    let header = LocklessRingBuffer::PAGE_HEADER_SIZE;
    let max_payload = (*PAGE_SIZE) as usize - header;
    let msg_size = 100;

    // Number of whole messages that fit on one page.
    let fitting = max_payload / msg_size;
    for _ in 0..fitting {
        let (slot, _, _) = ring.reserve(msg_size).unwrap();
        ring.commit(slot);
    }

    let page0_offset = || ring.nodes[0].write_offset.load(Ordering::Acquire);
    let expected_offset = header + fitting * msg_size;
    assert_eq!(page0_offset(), expected_offset);

    // This message does not fit in the remainder of page 0.
    let (spill, _, _) = ring.reserve(msg_size).unwrap();
    assert_eq!(spill.node_idx, 1);
    ring.commit(spill);

    // The overflowing attempt must not have moved page 0's offset.
    assert_eq!(page0_offset(), expected_offset);
}
1950}