Skip to main content

starnix_core/perf/
lockless_ring_buffer.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This is a lockless ring buffer modeled after https://docs.kernel.org/trace/ring-buffer-design.html
6use crate::mm::memory::MemoryObject;
7use crate::vfs::OutputBuffer;
8use fuchsia_runtime::vmar_root_self;
9use fuchsia_trace;
10use shared_buffer::SharedBuffer;
11use starnix_logging::{log_error, log_info, log_warn};
12use starnix_sync::Mutex;
13use starnix_types::PAGE_SIZE;
14use starnix_uapi::errors::Errno;
15use starnix_uapi::{errno, error, from_status_like_fdio};
16use std::sync::atomic::{AtomicUsize, Ordering};
17
18struct Node {
19    // page_data is the memory view for this page.
20    page_data: SharedBuffer,
21    // next and prev are pointers to the next and previous nodes.
22    // creating a linked list of pages.
23    next: AtomicUsize,
24    prev: AtomicUsize,
25    // offset to write new data in this page.
26    write_offset: AtomicUsize,
27    // Number of active writers currently reserving or writing to this page.
28    active_writers: AtomicUsize,
29}
30
31const FLAG_MASK: usize = 0b11 << 62;
32// Normal link between pages in the circular list.
33const FLAG_NORMAL: usize = 0b00 << 62;
34// Marks the link pointing to the head page (the oldest page with data).
35const FLAG_HEADER: usize = 0b01 << 62;
36// Set on the link pointing to the head page when it is being swapped by the reader.
37const FLAG_UPDATE: usize = 0b10 << 62;
38
39// The highest bit (bit 63) of `active_writers` is used as a flag indicating if the page is active.
40// When the page is finalized, this bit is cleared to 0.
41const PAGE_ACTIVE_BIT: usize = 1 << 63;
42// The second highest bit (bit 62) of `active_writers` is used to coordinate/claim finalization.
43const PAGE_FINALIZED_BIT: usize = 1 << 62;
44const FLAGS_MASK: usize = PAGE_ACTIVE_BIT | PAGE_FINALIZED_BIT;
45const ACTIVE_WRITERS_MASK: usize = !FLAGS_MASK;
46
47// When yielding under high contention, if we busy-spin we risk priority inversion and CPU starvation
48// of preempted writers. Sleeping for 50 microseconds gives the OS scheduler sufficient window (accounting
49// for 2-5us context switch overhead and 15-30us thread execution time) to reschedule and execute the
50// preempted writer so it can release its reservation.
51const SPIN_SLEEP_DURATION: std::time::Duration = std::time::Duration::from_micros(50);
52
53// When progressive sleep is triggered (exceeding 1000 yields), we back off with a 10 milliseconds sleep.
54// This is a robust scheduling window to guarantee that Zircon schedules the preempted writer thread
55// to complete its commit/release logic.
56const PROGRESSIVE_SLEEP_DURATION: std::time::Duration = std::time::Duration::from_millis(10);
57
58impl Node {
59    fn get_index(val: usize) -> usize {
60        val & !FLAG_MASK
61    }
62    fn get_flags(val: usize) -> usize {
63        val & FLAG_MASK
64    }
65    fn make_val(index: usize, flags: usize) -> usize {
66        index | flags
67    }
68
69    /// Finalizes the page by writing the data size to the header if tracing is enabled.
70    fn finalize(&self) {
71        // Attempt to claim the finalization role for this page.
72        // If another thread already claimed it, return early to avoid a data race on `page_data`.
73        let old_val = self.active_writers.fetch_or(PAGE_FINALIZED_BIT, Ordering::AcqRel);
74        if old_val & PAGE_FINALIZED_BIT != 0 {
75            return;
76        }
77
78        // - Acquire: We load `write_offset` with Acquire ordering to synchronize with all concurrent
79        //   writers that updated it via CAS in `try_reserve_on_page`. This ensures that
80        //   all non-atomic writes to `page_data` performed by the writers before their reservation was completed
81        //   are fully visible to this finalizing thread.
82        let write_offset = self.write_offset.load(Ordering::Acquire);
83        let data_size = std::cmp::min(write_offset, (*PAGE_SIZE) as usize)
84            - LocklessRingBuffer::PAGE_HEADER_SIZE;
85        self.page_data.write_at(8, &(data_size as u64).to_le_bytes());
86
87        // Clear the `PAGE_ACTIVE_BIT` from `active_writers` with Release ordering to publish the completed page.
88        // Release semantics here guarantees that if a reader observes the PAGE_ACTIVE_BIT being cleared,
89        // it is safe to read the page as it will also observe all the completed writes from the writers for that page.
90        self.active_writers.fetch_and(!PAGE_ACTIVE_BIT, Ordering::Release);
91    }
92
93    /// Releases a writer reservation on this node, and finalizes the node if it was the last writer.
94    fn release_writer(&self) {
95        // Using Release ordering with fetch_sub make the loading part use Relaxed ordering, and the
96        // store part Release. This means the value will be updated before the Acquire loading.
97        let prev_writers = self.active_writers.fetch_sub(1, Ordering::Release);
98
99        // If this was the last writer finalizing an active page (`PAGE_ACTIVE_BIT | 1`), we ensure the page is finalized.
100        // Note that if the page was already finalized by the advancing thread, `PAGE_FINALIZED_BIT` will be set,
101        // meaning `prev_writers` will have it set and we won't enter this branch, which is correct since it's already finalized.
102        if prev_writers == PAGE_ACTIVE_BIT | 1 {
103            let write_offset = self.write_offset.load(Ordering::Acquire);
104            if write_offset >= (*PAGE_SIZE) as usize {
105                // Since this thread holds a valid `Reservation`, `ref_count` remains > 0, which blocks
106                // `disable()` from completing and shrinking the VMO. This guarantees that the ring VMO
107                // remains enabled and valid for memory access.
108                // Without this check and finalization on failure, there is a liveness bug where a full page
109                // could never be finalized because the successful writer already skipped finalization.
110                self.finalize();
111            }
112        }
113    }
114}
115
116// We are only 64 bit today, but make it easy to find this assumption if we go to a smaller arch.
117static_assertions::const_assert!(std::mem::size_of::<usize>() == 8);
118
119// Use the high bit to indicate the ring is enabled. This makes the default, 0,
120// which is disabled, an easy value to reason about. And the ref count is the lower 63 bits.
121const RING_ENABLED_BIT: usize = 1 << 63;
122
123pub struct LocklessRingBuffer {
124    vmo: MemoryObject,
125    mapping: SharedBuffer,
126    // linked list of pages that represent the ring, backed by the mapping (and the vmo).
127    nodes: Vec<Node>,
128    // The page where to read the data from the ring.
129    head_page: AtomicUsize,
130    // The page where writes add to the ring.
131    tail_page: AtomicUsize,
132
133    // A page used by the reader of the ring.
134    reader_page: AtomicUsize,
135
136    // Tracks the global number of active readers and writers of the ring in the lower 63 bits.
137    // The highest bit (RING_ENABLED_BIT) is set when the ring is enabled.
138    ref_count: AtomicUsize,
139    // True indicates drop old pages to write new data when the ring is full. If false, writes fail until
140    // the data is read, effectively dropping new data and keeping the old data.
141    overwrite: bool,
142    // Number of dropped pages when overwrite is true.
143    dropped_pages: std::sync::atomic::AtomicU64,
144    // Used to calculate the delta from the last event. Since this is written to by multiple threads,
145    // we keep an atomic value for this.
146    prev_timestamp: std::sync::atomic::AtomicU64,
147    // The async trace event ID for write events.
148    write_event_async_id: fuchsia_trace::Id,
149    // Tracks whether a reader is currently active, used to validate that concurrent reads
150    // on a single ring buffer are not supported and do not happen. This can be removed if we
151    // are convinced it is not necessary.
152    reader_active: std::sync::atomic::AtomicBool,
153    // Mutex to serialize enable() and disable() calls to prevent racing. This should
154    // never be accessed by other methods to avoid locking during reading and writing.
155    state_mutex: Mutex<()>,
156}
157impl LocklessRingBuffer {
158    // This is an Ftrace page header consisting of a u64 timestamp at offset 0 and a u64 data size at offset 8.
159    pub const PAGE_HEADER_SIZE: usize = 16;
160
161    /// Creates a new LocklessRingBuffer.
162    /// size_bytes: Size of the ring buffer. Must be at least PAGE_SIZE * 3 and will be rounded up to a multiple of PAGE_SIZE.
163    /// overwrite: If true, old pages will be dropped when the ring is full. If false, writes will fail until the data is read.
164    /// write_event_async_id: The async trace event ID for write events.
165    pub fn new(
166        size_bytes: usize,
167        overwrite: bool,
168        write_event_async_id: fuchsia_trace::Id,
169    ) -> Result<Self, Errno> {
170        let requested_pages = (size_bytes + (*PAGE_SIZE) as usize - 1) / (*PAGE_SIZE) as usize;
171        // 3 pages are needed, 1 for the read page, 1 for head, and one to swap.
172        let pages = std::cmp::max(3, requested_pages);
173        let total_nodes = pages;
174        let capacity = total_nodes * (*PAGE_SIZE) as usize;
175
176        // Create VMO
177        let vmo: MemoryObject =
178            zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, capacity as u64)
179                .map_err(|_| errno!(ENOMEM))?
180                .into();
181        let vmo = vmo.with_zx_name(b"starnix:tracefs");
182        // Map VMO
183        let addr = vmar_root_self()
184            .map(
185                0,
186                vmo.as_vmo().expect("vmo must exist"),
187                0,
188                capacity,
189                zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE,
190            )
191            .map_err(|e| from_status_like_fdio!(e))?;
192        // SAFETY: The address returned by `vmar.map` is valid for `capacity` bytes.
193        let mapping = unsafe { SharedBuffer::new(addr as *mut u8, capacity) };
194
195        // Create nodes
196        let mut nodes = Vec::with_capacity(total_nodes);
197        let base_ptr = addr as *mut u8;
198        for i in 0..total_nodes {
199            // SAFETY: `base_ptr` points to a valid mapping of size `capacity`.
200            // `i * PAGE_SIZE` is within the bounds of this mapping since `i < total_nodes`.
201            // The memory range `[page_ptr, page_ptr + PAGE_SIZE)` is valid and mapped.
202            let page_ptr = unsafe { base_ptr.add(i * (*PAGE_SIZE) as usize) };
203
204            nodes.push(Node {
205                // SAFETY: `page_ptr` is in bounds of the mapped region and valid for `PAGE_SIZE` bytes.
206                page_data: unsafe { SharedBuffer::new(page_ptr, (*PAGE_SIZE) as usize) },
207                next: AtomicUsize::new(0),
208                prev: AtomicUsize::new(0),
209                write_offset: AtomicUsize::new(LocklessRingBuffer::PAGE_HEADER_SIZE),
210                // Initialize active_writers to PAGE_ACTIVE_BIT (1) since the page is active.
211                active_writers: AtomicUsize::new(PAGE_ACTIVE_BIT),
212            });
213        }
214        // Link first `pages - 1` nodes in a circle. The last page is reserved for the reader initialization.
215        let circle_size = pages - 1;
216        for i in 0..circle_size {
217            let next_idx = (i + 1) % circle_size;
218            let prev_idx = (i + circle_size - 1) % circle_size;
219            nodes[i].next.store(Node::make_val(next_idx, FLAG_NORMAL), Ordering::Relaxed);
220            nodes[i].prev.store(Node::make_val(prev_idx, FLAG_NORMAL), Ordering::Relaxed);
221        }
222        // Initialize the reader page, mark head page (node 0).
223        nodes[circle_size - 1].next.store(Node::make_val(0, FLAG_HEADER), Ordering::Relaxed);
224        // Initialize reader page pointers to point to the head page and its predecessor
225        nodes[pages - 1].next.store(Node::make_val(0, FLAG_NORMAL), Ordering::Relaxed);
226        nodes[pages - 1]
227            .prev
228            .store(Node::make_val(circle_size - 1, FLAG_NORMAL), Ordering::Relaxed);
229        // Initialize reader page size to 0
230        nodes[pages - 1].page_data.write_at(8, &0u64.to_le_bytes());
231        let buffer = Self {
232            vmo,
233            mapping,
234            nodes,
235            head_page: AtomicUsize::new(0),
236            tail_page: AtomicUsize::new(0),
237            reader_page: AtomicUsize::new(pages - 1),
238
239            ref_count: AtomicUsize::new(RING_ENABLED_BIT),
240            overwrite,
241            dropped_pages: std::sync::atomic::AtomicU64::new(0),
242            prev_timestamp: std::sync::atomic::AtomicU64::new(
243                zx::BootInstant::get().into_nanos() as u64
244            ),
245            write_event_async_id,
246            reader_active: std::sync::atomic::AtomicBool::new(false),
247            state_mutex: Mutex::new(()),
248        };
249        Ok(buffer)
250    }
251    pub fn dropped_pages(&self) -> u64 {
252        self.dropped_pages.load(Ordering::Relaxed)
253    }
254}
255// Debugging support for detecting live locks. Consider removing once
256// the ring has baked for some time.
257#[derive(Default, Debug)]
258struct YieldTracker {
259    /// Number of times we yielded because the next page was being updated by the reader.
260    update_flag: u64,
261    /// Number of times we yielded because the page to overwrite still had active writers.
262    node_match: u64,
263    /// Number of times we yielded because we failed to lock the head page for overwrite.
264    head_lock: u64,
265}
266impl YieldTracker {
267    fn total(&self) -> u64 {
268        self.update_flag + self.node_match + self.head_lock
269    }
270
271    /// Yields or sleeps progressively based on the total retry count.
272    fn yield_or_sleep(&self) {
273        let total = self.total();
274        if total > 1000 {
275            std::thread::sleep(PROGRESSIVE_SLEEP_DURATION);
276        } else if total > 100 {
277            std::thread::sleep(SPIN_SLEEP_DURATION);
278        } else {
279            std::thread::yield_now();
280        }
281    }
282}
283
284/// Represents a reserved region of the ring buffer that is allocated, but not committed yet.
285pub struct Reservation<'a> {
286    pub offset: usize,
287    pub node_idx: usize,
288    pub size: usize,
289    buffer: &'a LocklessRingBuffer,
290    committed: bool,
291}
292
293impl<'a> Reservation<'a> {
294    pub fn write_at(&self, rel_offset: usize, data: &[u8]) {
295        assert!(rel_offset + data.len() <= self.size, "Write exceeds reservation size");
296        self.buffer.mapping.write_at(self.offset + rel_offset, data);
297    }
298
299    fn release(&mut self) {
300        if self.committed {
301            return;
302        }
303
304        let node_idx = self.node_idx;
305        let node = &self.buffer.nodes[node_idx];
306
307        node.release_writer();
308
309        self.buffer.ref_count.fetch_sub(1, Ordering::Release);
310        self.committed = true;
311    }
312}
313
314impl<'a> Drop for Reservation<'a> {
315    fn drop(&mut self) {
316        if !self.committed {
317            starnix_logging::log_warn!("LocklessRingBuffer: Reservation dropped without commit");
318        }
319        self.release();
320    }
321}
322
323impl<'a> std::fmt::Debug for Reservation<'a> {
324    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325        f.debug_struct("Reservation")
326            .field("offset", &self.offset)
327            .field("node_idx", &self.node_idx)
328            .field("size", &self.size)
329            .field("committed", &self.committed)
330            .finish()
331    }
332}
333#[derive(Debug, PartialEq, Eq)]
334enum AdvanceResult {
335    /// The tail page has been successfully advanced or transitioned.
336    Advanced,
337    /// Failed to advance due to transient contention. The caller should yield and retry.
338    Yielded,
339    /// A hard error occurred (e.g. buffer is full and overwrite is disabled).
340    Error(Errno),
341}
342
343impl LocklessRingBuffer {
344    /// Attempts to reserve space on the current tail page.
345    ///
346    /// Returns `Ok((offset, now, delta))` if successful, or `Err(())` if the page is full.
347    fn try_reserve_on_page(
348        &self,
349        tail_node: &Node,
350        size: usize,
351    ) -> Result<(usize, zx::BootInstant, zx::Duration<zx::BootTimeline>), ()> {
352        if !self.is_enabled() {
353            log_warn!(
354                "LocklessRingBuffer: canceling try_reserve_on_page because ring is disabled."
355            );
356            return Err(());
357        }
358
359        // Atomically reserve space on the page using a bounded CAS loop.
360        // By verifying that `current_offset + size <= PAGE_SIZE` before executing the CAS,
361        // we guarantee that failed reservations never advance `write_offset`, keeping it exactly
362        // clamped to valid data boundaries.
363        let mut current_offset = tail_node.write_offset.load(Ordering::Acquire);
364        loop {
365            if current_offset + size > (*PAGE_SIZE) as usize {
366                return Err(());
367            }
368            match tail_node.write_offset.compare_exchange_weak(
369                current_offset,
370                current_offset + size,
371                Ordering::AcqRel,
372                Ordering::Acquire,
373            ) {
374                Ok(_) => break,
375                Err(actual) => current_offset = actual,
376            }
377        }
378
379        let now_candidate = zx::BootInstant::get().into_nanos() as u64;
380        let actual_prev = self.prev_timestamp.fetch_max(now_candidate, Ordering::AcqRel);
381        let final_now_nanos = std::cmp::max(now_candidate, actual_prev);
382        let now = zx::BootInstant::from_nanos(final_now_nanos as i64);
383
384        // First write on this page. Set timestamp.
385        let delta_nanos = if current_offset == LocklessRingBuffer::PAGE_HEADER_SIZE {
386            tail_node.page_data.write_at(0, &final_now_nanos.to_le_bytes());
387            0
388        } else {
389            final_now_nanos.saturating_sub(actual_prev)
390        };
391
392        let delta = zx::Duration::from_nanos(delta_nanos as i64);
393
394        Ok((current_offset, now, delta))
395    }
396    /// Advances the tail page to the next page in the ring.
397    ///
398    /// Handles marking the current page as full, moving the head page if overwrite is enabled,
399    /// and updating the global `tail_page` pointer.
400    fn advance_to_next_page(
401        &self,
402        tail_node: &Node,
403        tail_val: usize,
404        yield_tracker: &mut YieldTracker,
405    ) -> AdvanceResult {
406        // Short-circuit if another thread has already advanced the tail page past tail_val.
407        if self.tail_page.load(Ordering::Acquire) != tail_val {
408            return AdvanceResult::Advanced;
409        }
410
411        // If there are no active writers, finalize the page.
412        // We need to check both here and in commit() (and Drop) to avoid race conditions.
413        // Specifically, a writer might finish and see the page is not full yet, while the
414        // thread that failed to fit its data and is moving to the next page hasn't marked it full yet.
415        // By checking in both places, we ensure that at least one thread sees both conditions.
416        if self.is_enabled() {
417            tail_node.finalize();
418        }
419
420        let next_val = tail_node.next.load(Ordering::Acquire);
421        let next_idx = Node::get_index(next_val);
422        let next_flags = Node::get_flags(next_val);
423        if next_flags == FLAG_UPDATE {
424            // Reader is swapping this page. Wait.
425            starnix_logging::log_warn!(
426                "Reservation yielding due to FLAG_UPDATE on node {}",
427                next_idx
428            );
429            yield_tracker.update_flag += 1;
430            yield_tracker.yield_or_sleep();
431            return AdvanceResult::Yielded;
432        } else if next_flags == FLAG_HEADER {
433            if self.overwrite {
434                // Check if any active writer is on the page we want to overwrite.
435                if (self.nodes[next_idx].active_writers.load(Ordering::Acquire)
436                    & ACTIVE_WRITERS_MASK)
437                    > 0
438                {
439                    yield_tracker.node_match += 1;
440                    yield_tracker.yield_or_sleep();
441                    return AdvanceResult::Yielded;
442                }
443
444                starnix_logging::log_warn!(
445                    "LocklessRingBuffer Overwriting page {} (overwriting={})",
446                    next_idx,
447                    self.overwrite
448                );
449                // Try to lock the head page
450                let expected_next = Node::make_val(next_idx, FLAG_HEADER);
451                let locked_next = Node::make_val(next_idx, FLAG_UPDATE);
452                // AcqRel on success:
453                // - Acquire: Ensures we see the up-to-date links of the head page we are about to move.
454                // - Release: Ensures our transition to FLAG_UPDATE (locking) is visible to others.
455                // Relaxed on failure: We fail to acquire the lock and just retry.
456                if tail_node
457                    .next
458                    .compare_exchange_weak(
459                        expected_next,
460                        locked_next,
461                        Ordering::AcqRel,
462                        Ordering::Relaxed,
463                    )
464                    .is_ok()
465                {
466                    // Move the head page.
467                    let head_node = &self.nodes[next_idx];
468                    let head_next_val = head_node.next.load(Ordering::Acquire);
469                    let head_next_idx = Node::get_index(head_next_val);
470
471                    // Set FLAG_HEADER on the new head pointer
472                    let new_head_next = Node::make_val(head_next_idx, FLAG_HEADER);
473                    head_node.next.store(new_head_next, Ordering::Release);
474
475                    // Update global head_page
476                    self.head_page.store(head_next_idx, Ordering::Release);
477
478                    // Unlock the old head pointer (tail_node.next) and make it FLAG_NORMAL
479                    let unlocked_next = Node::make_val(next_idx, FLAG_NORMAL);
480                    tail_node.next.store(unlocked_next, Ordering::Release);
481                    self.dropped_pages.fetch_add(1, Ordering::Relaxed);
482                    fuchsia_trace::async_instant!(
483                        self.write_event_async_id,
484                        "starnix:trace_meta",
485                        "page dropped"
486                    );
487                    return AdvanceResult::Advanced;
488                } else {
489                    // Failed to lock head_page. Retry.
490                    yield_tracker.head_lock += 1;
491                    yield_tracker.yield_or_sleep();
492                    return AdvanceResult::Yielded;
493                }
494            } else {
495                // Buffer is full and overwrite is false.
496                starnix_logging::log_error!("LocklessRingBuffer is full");
497                return AdvanceResult::Error(errno!(ENOSPC));
498            }
499        }
500
501        let next_node = &self.nodes[next_idx];
502        // Only the thread that successfully transitions write_offset from PAGE_SIZE (or full)
503        // to PAGE_HEADER_SIZE is allowed to reset the page and advance the tail pointer.
504        // This prevents the tail-advancing pointer race and slow-thread clobbering.
505        let (should_advance, won_reset) = {
506            let mut current = next_node.write_offset.load(Ordering::Acquire);
507            loop {
508                // Another thread advanced the tail page.
509                if self.tail_page.load(Ordering::Acquire) != tail_val {
510                    break (false, false);
511                }
512                // Edge Case: If `current` is already `PAGE_HEADER_SIZE`, another concurrent writer thread has
513                // already won the reset race on this target page. Since `self.tail_page` hasn't advanced yet,
514                // we must still return `should_advance = true` to cooperate in advancing the global `tail_page`
515                // pointer, while returning `won_reset = false` to avoid redundantly clobbering page metadata.
516                if current == LocklessRingBuffer::PAGE_HEADER_SIZE {
517                    break (true, false);
518                }
519                match next_node.write_offset.compare_exchange_weak(
520                    current,
521                    LocklessRingBuffer::PAGE_HEADER_SIZE,
522                    Ordering::AcqRel,
523                    Ordering::Relaxed,
524                ) {
525                    Ok(_) => break (true, true),
526                    Err(actual) => current = actual,
527                }
528            }
529        };
530
531        if should_advance {
532            if won_reset {
533                // Reset `active_writers` to PAGE_ACTIVE_BIT (1) since the page is active.
534                next_node.active_writers.store(PAGE_ACTIVE_BIT, Ordering::Release);
535                // Clear the size field in the page header to avoid reader seeing stale size.
536                next_node.page_data.write_at(8, &0u64.to_le_bytes());
537            }
538
539            // Try to move tail_page to next_val.
540            // If it fails, it means another thread has already successfully advanced tail_page.
541            if let Err(err) = self.tail_page.compare_exchange(
542                tail_val,
543                next_val,
544                Ordering::AcqRel,
545                Ordering::Relaxed,
546            ) {
547                starnix_logging::log_debug!("Tail page already advanced by another thread: {err}");
548            }
549        }
550        AdvanceResult::Advanced
551    }
552
553    pub fn reserve(
554        &self,
555        size: usize,
556    ) -> Result<(Reservation<'_>, zx::BootInstant, zx::Duration<zx::BootTimeline>), Errno> {
557        // Check that the reservation is non-zero and fits within a page.
558        if size == 0 || size > (*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE {
559            return error!(EINVAL);
560        }
561        // Increment ref_count if enabled.
562        let mut val = self.ref_count.load(Ordering::Acquire);
563        loop {
564            if val & RING_ENABLED_BIT == 0 {
565                return error!(ENOMEM);
566            }
567            // Acquire on success:
568            // - Acquire: Synchronizes with the Release store in `enable()`. This guarantees that if the writer
569            //   observes that the ring is enabled, it will also observe all prior state initialization and memory
570            //   setups made by `enable()` before starting any writes.
571            // - Release: Synchronizes with `disable()`'s Acquire load on `ref_count`, ensuring that if `disable()`
572            //   observes a zero active writer count, it is guaranteed that this writer has either not yet entered
573            //   or has fully completed its operations.
574            // Note: Only the decrement in `release()` needs to be Release to ensure that the data payload writes
575            // are fully visible. Since the writer has not yet written any payload data to the page, there are no
576            // memory writes that need to be published to other threads via Release ordering at this point.
577            match self.ref_count.compare_exchange_weak(
578                val,
579                val + 1,
580                Ordering::Acquire,
581                Ordering::Relaxed,
582            ) {
583                Ok(_) => break,
584                Err(actual) => val = actual,
585            }
586        }
587
588        // Create a scope guard to decrement ref_count on error, panic, or early exit.
589        let ref_count_guard = scopeguard::guard(&self.ref_count, |ref_count| {
590            ref_count.fetch_sub(1, Ordering::Release);
591        });
592
593        // Lock the reservation.
594        let mut yield_tracker = YieldTracker::default();
595        let result = loop {
596            if !self.is_enabled() {
597                log_info!("LocklessRingBuffer: canceling reserve because ring is disabled.");
598                break error!(ENOMEM);
599            }
600            let tail_val = self.tail_page.load(Ordering::Acquire);
601            let tail_idx = Node::get_index(tail_val);
602            let tail_node = &self.nodes[tail_idx];
603
604            // Increment the writer count before trying to reserve space, if it fails decrement it.
605            // We use compare_exchange_weak in a loop to ensure we only increment if the PAGE_ACTIVE_BIT is still set.
606            // Note: The success and failure orderings are Relaxed because the writer has not yet written any payload
607            // data to the page. Therefore, there are no memory writes that need to be published to other threads
608            // via Release ordering.
609            let mut active_incremented = false;
610            let mut old_writers = tail_node.active_writers.load(Ordering::Acquire);
611            loop {
612                if old_writers & PAGE_ACTIVE_BIT == 0 {
613                    break;
614                }
615                match tail_node.active_writers.compare_exchange_weak(
616                    old_writers,
617                    old_writers + 1,
618                    Ordering::Relaxed,
619                    Ordering::Relaxed,
620                ) {
621                    Ok(_) => {
622                        active_incremented = true;
623                        break;
624                    }
625                    Err(actual) => old_writers = actual,
626                }
627            }
628
629            let reservation_result = if active_incremented {
630                self.try_reserve_on_page(tail_node, size)
631            } else {
632                Err(())
633            };
634
635            // Try to claim space on the current page.
636            match reservation_result {
637                Ok((offset, now, delta)) => {
638                    break Ok((
639                        Reservation {
640                            offset: tail_idx * (*PAGE_SIZE) as usize + offset,
641                            node_idx: tail_idx,
642                            size,
643                            buffer: self,
644                            committed: false,
645                        },
646                        now,
647                        delta,
648                    ));
649                }
650                Err(()) => {
651                    // Handle the race where the last successful writer decrements active_writers but
652                    // sees prev_writers > 1 because another concurrent writer (this thread) has already
653                    // incremented it. This thread fails reservation because the page is full, decrements
654                    // active_writers to 0, and gets prev_writers == 1. We must check and finalize here to
655                    // guarantee that full pages are always finalized even if the finalizer thread failed its reserve.
656                    if active_incremented {
657                        self.nodes[tail_idx].release_writer();
658                    }
659                    // Page is full. Advance to the next page.
660                    match self.advance_to_next_page(tail_node, tail_val, &mut yield_tracker) {
661                        AdvanceResult::Advanced | AdvanceResult::Yielded => {}
662                        AdvanceResult::Error(e) => {
663                            break Err(e);
664                        }
665                    }
666                }
667            }
668            // Debug logging for high yield counts.
669            if yield_tracker.total() % 100_000_000 == 0 && yield_tracker.total() > 0 {
670                log_warn!(
671                    "LocklessRingBuffer: spinning in reservation loop.  details = {yield_tracker:?}",
672                );
673            }
674        };
675        // Cleanup and return
676        let total_yields = yield_tracker.total();
677        if total_yields > 500_000 {
678            starnix_logging::log_info!(
679                "Reservation completed with yields: total={}, details={:?}",
680                total_yields,
681                yield_tracker
682            );
683        }
684        match result {
685            Ok((res, now, delta)) => {
686                // Success: Disarm the scope guard so ref_count is NOT decremented.
687                scopeguard::ScopeGuard::into_inner(ref_count_guard);
688                Ok((res, now, delta))
689            }
690            Err(e) => {
691                // Error: The scope guard will automatically decrement ref_count when dropped.
692                Err(e)
693            }
694        }
695    }
696    pub fn commit(&self, mut reservation: Reservation<'_>) {
697        reservation.release();
698    }
699
700    pub fn swap_reader_page(&self) -> Option<usize> {
701        let mut retries = 0;
702        loop {
703            let head_val = self.head_page.load(Ordering::Acquire);
704            let head_idx = Node::get_index(head_val);
705            let tail_val = self.tail_page.load(Ordering::Acquire);
706            let tail_idx = Node::get_index(tail_val);
707            if head_idx == tail_idx {
708                // Ring is empty or we are writing to the only page.
709                return None;
710            }
711            let head_node = &self.nodes[head_idx];
712            // Perform a single Acquire load on `active_writers` to establish happens-before with the finalizing writer.
713            // We check concurrently that the page is finalized (PAGE_ACTIVE_BIT is cleared to 0) and that there are no active writers (count is 0).
714            let active_writers = head_node.active_writers.load(Ordering::Acquire);
715            // We can swap the page if it is no longer active and has no active writers.
716            // We ignore the PAGE_FINALIZED_BIT when determining if the page is ready.
717            if (active_writers & (PAGE_ACTIVE_BIT | ACTIVE_WRITERS_MASK)) != 0 {
718                return None;
719            }
720            let mut size_bytes = [0u8; 8];
721            head_node.page_data.read_at(8, &mut size_bytes);
722            let size = u64::from_le_bytes(size_bytes);
723            if size == 0 {
724                // This can happen even if `head_page != tail_page` (the ring is not empty).
725                // Specifically, if a writer advanced the tail to the next page, the global
726                // `head_page` pointer may have advanced, but the active writer on this new
727                // `head_page` has not yet called `commit()` or finalized the page.
728                // Thus, the data size field at offset 8 in the page header remains 0.
729                return None;
730            }
731            let next_val = head_node.next.load(Ordering::Acquire);
732            let next_idx = Node::get_index(next_val);
733            let prev_val = head_node.prev.load(Ordering::Acquire);
734            let prev_idx = Node::get_index(prev_val);
735            let prev_node = &self.nodes[prev_idx];
736            // Try to lock the head page by setting UPDATE flag on prev.next
737            let expected_next = Node::make_val(head_idx, FLAG_HEADER);
738            let locked_next = Node::make_val(head_idx, FLAG_UPDATE);
739            if prev_node
740                .next
741                .compare_exchange_weak(
742                    expected_next,
743                    locked_next,
744                    Ordering::AcqRel,
745                    Ordering::Relaxed,
746                )
747                .is_ok()
748            {
749                // Locked! Now perform the swap.
750                let reader_idx = self.reader_page.load(Ordering::Acquire);
751                let next_node = &self.nodes[next_idx];
752                // Update pointers to insert reader node.
753                self.nodes[reader_idx]
754                    .prev
755                    .store(Node::make_val(prev_idx, FLAG_NORMAL), Ordering::Relaxed);
756                self.nodes[reader_idx]
757                    .next
758                    .store(Node::make_val(next_idx, FLAG_HEADER), Ordering::Relaxed);
759                next_node.prev.store(Node::make_val(reader_idx, FLAG_NORMAL), Ordering::Relaxed);
760                // Reset write offset of the new head page (old reader page)
761                self.nodes[reader_idx]
762                    .write_offset
763                    .store(LocklessRingBuffer::PAGE_HEADER_SIZE, Ordering::Relaxed);
764                // Reset `active_writers` to PAGE_ACTIVE_BIT before recycling.
765                self.nodes[reader_idx].active_writers.store(PAGE_ACTIVE_BIT, Ordering::Relaxed);
766                // Clear the size field in the page header to avoid reader seeing stale size.
767                self.nodes[reader_idx].page_data.write_at(8, &0u64.to_le_bytes());
768                // Unlock prev_node and set its next to reader_idx as NORMAL
769                let unlocked_val = Node::make_val(reader_idx, FLAG_NORMAL);
770                prev_node.next.store(unlocked_val, Ordering::Release);
771                // Update global pointers
772                self.head_page.store(next_idx, Ordering::Release);
773                self.reader_page.store(head_idx, Ordering::Release);
774                // Return the old head page index (now reader page)
775                return Some(head_idx);
776            }
777            // Failed to lock, retry.
778            starnix_logging::log_warn!("swap_reader_page failed to lock, yielding");
779            retries += 1;
780            if retries >= 100_000 {
781                starnix_logging::log_error!("LocklessRingBuffer: HUNG in swap_reader_page loop");
782                return None;
783            }
784            // Failed to acquire the lock for swapping. Spin and retry.
785            // Note: The log message above says "yielding" but we actually spin here
786            // to avoid context switch overhead for this short wait.
787            std::hint::spin_loop();
788        }
789    }
790    pub fn read(&self, buf: &mut dyn OutputBuffer) -> Result<usize, Errno> {
791        // Validate that concurrent reads on a single ring buffer do not happen.
792        // By design in both ftrace/tracefs and Starnix VFS, only a single reader is active at a time.
793        if self.reader_active.swap(true, Ordering::AcqRel) {
794            starnix_logging::log_error!(
795                "LocklessRingBuffer: concurrent reads detected! Concurrent reads are not supported by design."
796            );
797            return error!(EBUSY);
798        }
799
800        let _lock_guard = scopeguard::guard(&self.reader_active, |reader_active| {
801            reader_active.store(false, Ordering::Release);
802        });
803
804        // Increment ref_count if enabled to prevent disable() from shrinking the VMO
805        // while we are reading from it. We use a compare_exchange loop instead of
806        // fetch_add to avoid modifying the counter if the ring is already disabled,
807        // which prevents live-locking disable().
808        let mut val = self.ref_count.load(Ordering::Acquire);
809        loop {
810            if val & RING_ENABLED_BIT == 0 {
811                return error!(EAGAIN);
812            }
813            match self.ref_count.compare_exchange_weak(
814                val,
815                val + 1,
816                Ordering::Acquire,
817                Ordering::Relaxed,
818            ) {
819                Ok(_) => break,
820                Err(actual) => val = actual,
821            }
822        }
823
824        let _guard = scopeguard::guard(&self.ref_count, |ref_count| {
825            ref_count.fetch_sub(1, Ordering::Release);
826        });
827
828        if buf.available() < (*PAGE_SIZE) as usize {
829            starnix_logging::log_error!(
830                "Buffer is too small: {} bytes (needs {} bytes)",
831                buf.available(),
832                (*PAGE_SIZE) as usize
833            );
834            return error!(EINVAL);
835        }
836        // TODO(https://fxbug.dev/505532201): Consider handling buffers larger than a page.
837        // this currently only works for a single page.
838        if let Some(idx) = self.swap_reader_page() {
839            let node = &self.nodes[idx];
840            let mut offset = 0;
841            let page_size = (*PAGE_SIZE) as usize;
842            let bytes_written = buf.write_each(&mut move |segment| {
843                let available = page_size - offset;
844                if available == 0 {
845                    return Ok(0);
846                }
847                let size = std::cmp::min(segment.len(), available);
848                // SAFETY: We are writing to this segment, so it's safe to treat it as initialized after we write.
849                // We cast it to `&mut [u8]` to pass to `read_at`.
850                let segment_mut_u8 = unsafe {
851                    std::slice::from_raw_parts_mut(segment.as_mut_ptr() as *mut u8, size)
852                };
853                node.page_data.read_at(offset, segment_mut_u8);
854                offset += size;
855                Ok(size)
856            })?;
857            return Ok(bytes_written);
858        }
859        error!(EAGAIN)
860    }
861
862    #[cfg(test)]
863    pub(crate) fn size_bytes(&self) -> usize {
864        self.nodes.len() * (*PAGE_SIZE) as usize
865    }
866
867    pub fn is_enabled(&self) -> bool {
868        self.ref_count.load(Ordering::Acquire) & RING_ENABLED_BIT != 0
869    }
870    pub fn disable(&self) -> Result<u64, Errno> {
871        let _lock = self.state_mutex.lock();
872        // Clear the enabled bit.
873        self.ref_count.fetch_and(!RING_ENABLED_BIT, Ordering::AcqRel);
874        let mut yield_count: u64 = 0;
875        loop {
876            let active_writers = self.ref_count.load(Ordering::Acquire);
877            if active_writers == 0 {
878                break;
879            }
880            yield_count += 1;
881            if yield_count % 1_000_000 == 0 {
882                log_error!(
883                    "LocklessRingBuffer: disable is waiting for {active_writers} active writers",
884                );
885            }
886            // We use yield_now() here as a compromise:
887            // - spin_wait would waste too much CPU if the writer is descheduled.
888            // - sleep adds too much latency for what should be a very short wait (copying data).
889            // Yielding gives other threads a chance to run while keeping latency low.
890            std::thread::yield_now();
891        }
892        if yield_count > 500_000 {
893            log_warn!("LocklessRingBuffer disable took {} yields", yield_count);
894        }
895        if let Err(e) = self.vmo.set_size(0) {
896            starnix_logging::log_error!(
897                "LocklessRingBuffer disable failed to set_size(0): {:?}",
898                e
899            );
900            return Err(from_status_like_fdio!(e));
901        }
902        let dropped = self.dropped_pages.load(Ordering::Relaxed);
903        Ok(dropped)
904    }
905    pub fn enable(&self) -> Result<zx::BootInstant, Errno> {
906        let _lock = self.state_mutex.lock();
907        let initial_pages = self.nodes.len() - 1;
908        let capacity = self.nodes.len() * (*PAGE_SIZE) as usize;
909        if let Err(e) = self.vmo.set_size(capacity as u64) {
910            starnix_logging::log_error!("LocklessRingBuffer enable failed to set_size: {:?}", e);
911            return Err(from_status_like_fdio!(e));
912        }
913        let now = zx::BootInstant::get();
914        // Reset state
915        self.head_page.store(0, Ordering::Release);
916        self.tail_page.store(0, Ordering::Release);
917
918        self.reader_page.store(initial_pages, Ordering::Release);
919
920        self.dropped_pages.store(0, Ordering::Release);
921        self.prev_timestamp.store(now.into_nanos() as u64, Ordering::Release);
922
923        for i in 0..self.nodes.len() {
924            self.nodes[i]
925                .write_offset
926                .store(LocklessRingBuffer::PAGE_HEADER_SIZE, Ordering::Release);
927            self.nodes[i].active_writers.store(PAGE_ACTIVE_BIT, Ordering::Release);
928        }
929        for i in 0..initial_pages {
930            let next_idx = (i + 1) % initial_pages;
931            let prev_idx = (i + initial_pages - 1) % initial_pages;
932            self.nodes[i].next.store(Node::make_val(next_idx, FLAG_NORMAL), Ordering::Relaxed);
933            self.nodes[i].prev.store(Node::make_val(prev_idx, FLAG_NORMAL), Ordering::Relaxed);
934        }
935        self.nodes[initial_pages - 1].next.store(Node::make_val(0, FLAG_HEADER), Ordering::Relaxed);
936
937        // Initialize reader page size to 0
938        self.nodes[initial_pages].page_data.write_at(8, &0u64.to_le_bytes());
939        // Initialize first page.
940        let _ = self.vmo.as_vmo().expect("vmo must exist").op_range(zx::VmoOp::ZERO, 0, *PAGE_SIZE);
941        let nanos = now.into_nanos() as u64;
942        self.nodes[0].page_data.write_at(0, &nanos.to_le_bytes());
943
944        // Set enabled bit LAST to ensure state is fully reset before writes start.
945        self.ref_count.store(RING_ENABLED_BIT, Ordering::Release);
946        Ok(now)
947    }
948}
949impl Drop for LocklessRingBuffer {
950    fn drop(&mut self) {
951        let (ptr, len) = self.mapping.as_ptr_len();
952        // SAFETY: The mapping was created in `new` and is valid for this lifetime.
953        // We are freeing it now because the object is being destroyed.
954        unsafe {
955            let _ = vmar_root_self().unmap(ptr as usize, len);
956        }
957    }
958}
959#[cfg(test)]
960mod tests {
961    use super::*;
962    use crate::vfs::buffers::VecOutputBuffer;
963    use crate::vfs::{Buffer, OutputBufferCallback, PeekBufferSegmentsCallback};
964    use std::sync::Arc;
965    #[repr(C)]
966    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
967    struct TestMessage {
968        thread_index: u32,
969        timestamp_nanos: u64,
970        delta: u64,
971        data: [u8; 12],
972    }
973    impl TestMessage {
974        pub const SIZE: usize = 32;
975        fn to_bytes(&self) -> [u8; TestMessage::SIZE] {
976            let mut bytes = [0u8; TestMessage::SIZE];
977            bytes[0..4].copy_from_slice(&self.thread_index.to_le_bytes());
978            bytes[4..12].copy_from_slice(&self.timestamp_nanos.to_le_bytes());
979            bytes[12..20].copy_from_slice(&self.delta.to_le_bytes());
980            bytes[20..32].copy_from_slice(&self.data);
981            bytes
982        }
983        fn from_bytes(bytes: &[u8]) -> Self {
984            let mut thread_index = [0u8; 4];
985            thread_index.copy_from_slice(&bytes[0..4]);
986            let mut timestamp_nanos = [0u8; 8];
987            timestamp_nanos.copy_from_slice(&bytes[4..12]);
988            let mut delta = [0u8; 8];
989            delta.copy_from_slice(&bytes[12..20]);
990            let mut data = [0u8; 12];
991            data.copy_from_slice(&bytes[20..32]);
992            Self {
993                thread_index: u32::from_le_bytes(thread_index),
994                timestamp_nanos: u64::from_le_bytes(timestamp_nanos),
995                delta: u64::from_le_bytes(delta),
996                data,
997            }
998        }
999    }
1000    #[test]
1001    fn test_init() {
1002        let buffer =
1003            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1004                .unwrap();
1005        assert_eq!(buffer.size_bytes(), 3 * (*PAGE_SIZE) as usize);
1006    }
1007    #[test]
1008    fn test_reserve() {
1009        let buffer =
1010            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1011                .unwrap();
1012        let res = buffer.reserve(100).unwrap();
1013        assert_eq!(res.0.size, 100);
1014        assert_eq!(res.0.offset, LocklessRingBuffer::PAGE_HEADER_SIZE);
1015        assert_eq!(res.0.node_idx, 0);
1016    }
1017    #[test]
1018    fn test_commit() {
1019        let buffer =
1020            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1021                .unwrap();
1022        let res = buffer.reserve(100).unwrap();
1023        buffer.commit(res.0);
1024        assert_eq!(buffer.nodes[0].active_writers.load(Ordering::Relaxed), PAGE_ACTIVE_BIT);
1025    }
1026    #[test]
1027    fn test_swap_reader_page_empty() {
1028        let buffer =
1029            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1030                .unwrap();
1031        assert_eq!(buffer.swap_reader_page(), None);
1032    }
1033    #[test]
1034    fn test_swap_reader_page_success() {
1035        let buffer =
1036            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1037                .unwrap();
1038        // Fill node 0
1039        let res1 =
1040            buffer.reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE).unwrap();
1041        buffer.commit(res1.0);
1042        // Next reserve should go to node 1
1043        let res2 = buffer.reserve(100).unwrap();
1044        assert_eq!(res2.0.node_idx, 1);
1045        buffer.commit(res2.0);
1046        // Page 0 has size > 0 and tail advanced to Page 1.
1047        // So swap_reader_page should succeed!
1048        let old_head = buffer.swap_reader_page();
1049        assert_eq!(old_head, Some(0));
1050        // And head_page should now have advanced to index 1.
1051        assert_eq!(buffer.head_page.load(Ordering::Relaxed), 1);
1052    }
1053    #[test]
1054    fn test_concurrent_reserve() {
1055        use std::sync::Arc;
1056        let buffer = Arc::new(
1057            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1058                .unwrap(),
1059        );
1060        let mut handles = vec![];
1061        for _ in 0..5 {
1062            let buffer_clone = Arc::clone(&buffer);
1063            let handle = std::thread::spawn(move || {
1064                for _ in 0..20 {
1065                    if let Ok(res) = buffer_clone.reserve(10) {
1066                        buffer_clone.commit(res.0);
1067                    }
1068                }
1069            });
1070            handles.push(handle);
1071        }
1072        for handle in handles {
1073            handle.join().unwrap();
1074        }
1075        for i in 0..buffer.nodes.len() {
1076            assert_eq!(buffer.nodes[i].active_writers.load(Ordering::Relaxed), PAGE_ACTIVE_BIT);
1077        }
1078    }
1079    #[test]
1080    fn test_reserve_moves_to_next_page() {
1081        let buffer =
1082            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1083                .unwrap();
1084        // Reserve most of the page, leaving only 50 bytes
1085        let res1 = buffer
1086            .reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE - 50)
1087            .unwrap();
1088        buffer.commit(res1.0);
1089        // Try to reserve 100 bytes. It shouldn't fit on node 0.
1090        let res2 = buffer.reserve(100).unwrap();
1091        // It should be on node 1.
1092        assert_eq!(res2.0.node_idx, 1);
1093        assert_eq!(res2.0.offset, (*PAGE_SIZE) as usize + LocklessRingBuffer::PAGE_HEADER_SIZE);
1094    }
1095    #[test]
1096    fn test_reserve_overwrite() {
1097        let buffer =
1098            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1099                .unwrap();
1100        // Fill node 0 and commit
1101        let res1 =
1102            buffer.reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE).unwrap();
1103        buffer.commit(res1.0);
1104        // Fill node 1 but do NOT commit
1105        let _res2 =
1106            buffer.reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE).unwrap();
1107        // Now tail is at node 1. Next reserve wants to move to node 0.
1108        // But commit is still at node 0.
1109        // So it should trigger overwrite.
1110        let res3 = buffer.reserve(100).unwrap();
1111        // It should succeed and be on node 0.
1112        assert_eq!(res3.0.node_idx, 0);
1113        assert_eq!(buffer.dropped_pages(), 1);
1114        // And head_page should have advanced to 1.
1115        assert_eq!(buffer.head_page.load(Ordering::Relaxed), 1);
1116    }
1117    #[test]
1118    fn test_read() {
1119        let buffer =
1120            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1121                .unwrap();
1122        let res1 =
1123            buffer.reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE).unwrap();
1124        let data = vec![1u8; (*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE];
1125        res1.0.write_at(0, &data);
1126        buffer.commit(res1.0);
1127        // Advance commit page to node 1
1128        let res2 = buffer.reserve(100).unwrap();
1129        buffer.commit(res2.0);
1130        let mut dest = VecOutputBuffer::new((*PAGE_SIZE) as usize);
1131        let result = buffer.read(&mut dest);
1132        assert!(result.is_ok());
1133        assert_eq!(result.unwrap(), (*PAGE_SIZE) as usize);
1134        assert_eq!(&dest.data()[LocklessRingBuffer::PAGE_HEADER_SIZE..], &data[..]);
1135    }
1136    #[test]
1137    fn test_enable_disable() {
1138        let buffer =
1139            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1140                .unwrap();
1141        assert!(buffer.disable().is_ok());
1142        assert_eq!(buffer.ref_count.load(Ordering::Relaxed) & RING_ENABLED_BIT, 0);
1143        let res = buffer.reserve(100);
1144        assert_eq!(res.unwrap_err(), starnix_uapi::errno!(ENOMEM));
1145        assert!(buffer.enable().is_ok());
1146        assert_eq!(buffer.size_bytes(), 3 * (*PAGE_SIZE) as usize);
1147        let res = buffer.reserve(100);
1148        assert!(res.is_ok());
1149    }
1150    // Spawns a reader thread that periodically drains the lockless ring buffer.
1151    //
1152    // `read_page_delay`: An optional delay `(mod_val, duration)` indicating that the reader should
1153    // sleep for `duration` after reading every `mod_val` pages. This is typically `None`, but can be
1154    // used to simulate a slow or batch-based reader to trigger page overwrites/drops in tests.
1155    fn start_reader_thread(
1156        buffer_reader: Arc<LocklessRingBuffer>,
1157        writers_done_reader: Arc<std::sync::atomic::AtomicBool>,
1158        read_page_delay: Option<(usize, std::time::Duration)>,
1159    ) -> std::thread::JoinHandle<Vec<TestMessage>> {
1160        std::thread::spawn(move || {
1161            let mut all_messages = Vec::new();
1162            let mut dest = VecOutputBuffer::new((*PAGE_SIZE) as usize);
1163            let mut consecutive_eagain = 0;
1164            let mut pages_read = 0;
1165            loop {
1166                dest.reset();
1167                match buffer_reader.read(&mut dest) {
1168                    Ok(bytes_read) => {
1169                        consecutive_eagain = 0;
1170                        assert_eq!(bytes_read, (*PAGE_SIZE) as usize);
1171                        let mut header_ts_bytes = [0u8; 8];
1172                        header_ts_bytes.copy_from_slice(&dest.data()[0..8]);
1173                        let header_timestamp = u64::from_le_bytes(header_ts_bytes);
1174                        let mut size_bytes = [0u8; 8];
1175                        size_bytes.copy_from_slice(&dest.data()[8..16]);
1176                        let data_size = u64::from_le_bytes(size_bytes) as usize;
1177                        let max_offset = LocklessRingBuffer::PAGE_HEADER_SIZE + data_size;
1178                        let mut offset = LocklessRingBuffer::PAGE_HEADER_SIZE;
1179                        let mut first_msg = true;
1180                        while offset + TestMessage::SIZE <= max_offset {
1181                            let msg_bytes = &dest.data()[offset..offset + TestMessage::SIZE];
1182                            let msg = TestMessage::from_bytes(msg_bytes);
1183                            if first_msg {
1184                                if header_timestamp != msg.timestamp_nanos {
1185                                    println!(
1186                                        "HEADER TIMESTAMP MISMATCH: header={}, msg={}",
1187                                        header_timestamp, msg.timestamp_nanos
1188                                    );
1189                                }
1190                                first_msg = false;
1191                            }
1192                            all_messages.push(msg);
1193                            offset += TestMessage::SIZE;
1194                        }
1195                        if let Some((mod_val, duration)) = read_page_delay {
1196                            pages_read += 1;
1197                            if pages_read % mod_val == 0 {
1198                                std::thread::sleep(duration);
1199                            }
1200                        }
1201                    }
1202                    Err(e) if e == starnix_uapi::errno!(EAGAIN) => {
1203                        if writers_done_reader.load(Ordering::Acquire) {
1204                            let head_val = buffer_reader.head_page.load(Ordering::Relaxed);
1205                            let tail_val = buffer_reader.tail_page.load(Ordering::Relaxed);
1206                            // If the writers are done and the reader has caught up to the tail page
1207                            // index, all readable pages have been drained, so we break the loop.
1208                            if Node::get_index(head_val) == Node::get_index(tail_val) {
1209                                break;
1210                            }
1211                            consecutive_eagain += 1;
1212                            assert!(
1213                                consecutive_eagain < 500,
1214                                "LocklessRingBuffer reader stuck: consecutive EAGAIN limit exceeded (5s) after writers finished. head_page={}, tail_page={}, reader_page={}",
1215                                Node::get_index(head_val),
1216                                Node::get_index(tail_val),
1217                                buffer_reader.reader_page.load(Ordering::Relaxed)
1218                            );
1219                        }
1220                        std::thread::sleep(std::time::Duration::from_millis(10));
1221                    }
1222                    Err(e) => panic!("Unexpected error from read: {:?}", e),
1223                }
1224            }
1225            all_messages
1226        })
1227    }
1228    fn check_all_message_data(all_messages: &[TestMessage], num_threads: u32) {
1229        let mut prev_timestamp = 0;
1230        let mut thread_counts = vec![0; num_threads as usize];
1231        let mut out_of_order = 0;
1232        let mut corrupted = 0;
1233        for msg in all_messages {
1234            if msg.timestamp_nanos < prev_timestamp {
1235                println!("OUT OF ORDER: prev_timestamp={}, current={:?}", prev_timestamp, msg);
1236                out_of_order += 1;
1237            }
1238            prev_timestamp = msg.timestamp_nanos;
1239            if &msg.data != b"Event data\0\0" || msg.thread_index >= num_threads {
1240                println!("CORRUPTED: msg={:?}", msg);
1241                corrupted += 1;
1242            } else {
1243                thread_counts[msg.thread_index as usize] += 1;
1244            }
1245        }
1246        println!(
1247            "TEST_RESULT: Read {} messages, {} out of order, {} corrupted. Thread counts: {:?}",
1248            all_messages.len(),
1249            out_of_order,
1250            corrupted,
1251            thread_counts
1252        );
1253        // We do not assert out_of_order is 0 because wait-free reservation can cause slight reordering of timestamps.
1254        assert_eq!(corrupted, 0, "Found corrupted messages");
1255    }
1256    #[test]
1257    fn test_concurrent_read_write_4() {
1258        let num_threads = 4;
1259        let msgs_per_thread = 64;
1260        // 4 threads * 64 msgs * 32 bytes = 8192 bytes.
1261        // Each page can hold (4096 - 16) / 32 = 127 msgs (with 16 bytes unused at the end).
1262        // 256 msgs will take 2 full pages (254 msgs) + 2 msgs on a 3rd page.
1263        let buffer = Arc::new(
1264            LocklessRingBuffer::new(4 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1265                .unwrap(),
1266        );
1267        let writers_done = Arc::new(std::sync::atomic::AtomicBool::new(false));
1268        let mut handles = vec![];
1269        // Spawn reader
1270        let buffer_reader = Arc::clone(&buffer);
1271        let writers_done_reader = Arc::clone(&writers_done);
1272        let reader_handle = start_reader_thread(buffer_reader, writers_done_reader, None);
1273        // Spawn writers
1274        for thread_index in 0..num_threads {
1275            let buffer_clone = Arc::clone(&buffer);
1276            let handle = std::thread::spawn(move || {
1277                std::thread::sleep(std::time::Duration::from_millis(10 + thread_index as u64));
1278                for _ in 0..msgs_per_thread {
1279                    // Reserve exactly TestMessage::SIZE bytes
1280                    let (res, now, delta) = buffer_clone.reserve(TestMessage::SIZE).unwrap();
1281                    let msg = TestMessage {
1282                        thread_index,
1283                        timestamp_nanos: now.into_nanos() as u64,
1284                        delta: delta.into_nanos() as u64,
1285                        data: *b"Event data\0\0",
1286                    };
1287                    res.write_at(0, &msg.to_bytes());
1288                    buffer_clone.commit(res);
1289                    std::thread::sleep(std::time::Duration::from_nanos(thread_index as u64));
1290                }
1291            });
1292            handles.push(handle);
1293        }
1294        for handle in handles {
1295            handle.join().unwrap();
1296        }
1297        writers_done.store(true, Ordering::Release);
1298        let all_messages = reader_handle.join().unwrap();
1299        // Check the messages
1300        check_all_message_data(&all_messages, num_threads);
1301        // Trailing messages might remain on the final unfinalized tail page. We expect between 250 and 256 messages to be successfully read.
1302        assert!(
1303            all_messages.len() >= 250 && all_messages.len() <= 256,
1304            "Expected between 250 and 256 messages, got {}",
1305            all_messages.len()
1306        );
1307    }
1308    #[test]
1309    fn test_concurrent_read_write_1_thread() {
1310        let num_threads = 1;
1311        let msgs_per_thread = 256;
1312        // 1 thread * 256 msgs = 256 msgs.
1313        // Each page holds 127 msgs. 256 msgs requires 3 data pages.
1314        // We use 5 pages total to provide enough capacity.
1315        let buffer = Arc::new(
1316            LocklessRingBuffer::new(5 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1317                .unwrap(),
1318        );
1319        let writers_done = Arc::new(std::sync::atomic::AtomicBool::new(false));
1320        let mut handles = vec![];
1321        // Spawn reader
1322        let buffer_reader = Arc::clone(&buffer);
1323        let writers_done_reader = Arc::clone(&writers_done);
1324        let reader_handle = start_reader_thread(buffer_reader, writers_done_reader, None);
1325        // Spawn writers
1326        for thread_index in 0..num_threads {
1327            let buffer_clone = Arc::clone(&buffer);
1328            let handle = std::thread::spawn(move || {
1329                std::thread::sleep(std::time::Duration::from_millis(10 + thread_index as u64));
1330                for _ in 0..msgs_per_thread {
1331                    let (res, now, delta) = buffer_clone.reserve(TestMessage::SIZE).unwrap();
1332                    let msg = TestMessage {
1333                        thread_index,
1334                        timestamp_nanos: now.into_nanos() as u64,
1335                        delta: delta.into_nanos() as u64,
1336                        data: *b"Event data\0\0",
1337                    };
1338                    res.write_at(0, &msg.to_bytes());
1339                    buffer_clone.commit(res);
1340                    std::thread::sleep(std::time::Duration::from_nanos(thread_index as u64));
1341                }
1342            });
1343            handles.push(handle);
1344        }
1345        for handle in handles {
1346            handle.join().unwrap();
1347        }
1348        writers_done.store(true, Ordering::Release);
1349        let all_messages = reader_handle.join().unwrap();
1350        check_all_message_data(&all_messages, num_threads);
1351        assert_eq!(
1352            all_messages.len(),
1353            254,
1354            "Expected exactly 254 messages, got {}",
1355            all_messages.len()
1356        );
1357    }
1358    #[test]
1359    fn test_concurrent_read_write_8_threads() {
1360        let num_threads = 8;
1361        let msgs_per_thread = 128;
1362        // 4 threads * 64 msgs * 32 bytes = 8192 bytes.
1363        // 8 threads * 128 msgs = 1024 msgs.
1364        // Each page holds 127 msgs. 1024 msgs requires 9 data pages.
1365        // We use 12 pages total to provide enough capacity so no pages are overwritten.
1366        let buffer = Arc::new(
1367            LocklessRingBuffer::new(12 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1368                .unwrap(),
1369        );
1370        let writers_done = Arc::new(std::sync::atomic::AtomicBool::new(false));
1371        let _writers_done_guard = scopeguard::guard(Arc::clone(&writers_done), |done| {
1372            done.store(true, Ordering::Release);
1373        });
1374        let mut handles = vec![];
1375        // Spawn reader
1376        let buffer_reader = Arc::clone(&buffer);
1377        let writers_done_reader = Arc::clone(&writers_done);
1378        let reader_handle = start_reader_thread(buffer_reader, writers_done_reader, None);
1379        // Spawn writers
1380        for thread_index in 0..num_threads {
1381            let buffer_clone = Arc::clone(&buffer);
1382            let handle = std::thread::spawn(move || {
1383                std::thread::sleep(std::time::Duration::from_millis(10 + thread_index as u64));
1384                for _ in 0..msgs_per_thread {
1385                    // Reserve exactly TestMessage::SIZE bytes
1386                    let mut retries = 0;
1387                    let (res, now, delta) = loop {
1388                        match buffer_clone.reserve(TestMessage::SIZE) {
1389                            Ok(r) => break r,
1390                            Err(e) if e == starnix_uapi::errno!(ENOSPC) => {
1391                                retries += 1;
1392                                assert!(
1393                                    retries < 100,
1394                                    "LocklessRingBuffer: ENOSPC transient limit exceeded (100ms). Reader may have hung."
1395                                );
1396                                // Under high contention, writers can transiently catch up to the head page.
1397                                // This is expected and will clear up once the reader thread gets scheduled
1398                                // and swaps a page.
1399                                std::thread::sleep(std::time::Duration::from_millis(1));
1400                            }
1401                            Err(e) => panic!("Unexpected error: {:?}", e),
1402                        }
1403                    };
1404                    let msg = TestMessage {
1405                        thread_index,
1406                        timestamp_nanos: now.into_nanos() as u64,
1407                        delta: delta.into_nanos() as u64,
1408                        data: *b"Event data\0\0",
1409                    };
1410                    res.write_at(0, &msg.to_bytes());
1411                    buffer_clone.commit(res);
1412                    std::thread::sleep(std::time::Duration::from_nanos(thread_index as u64));
1413                }
1414            });
1415            handles.push(handle);
1416        }
1417        for handle in handles {
1418            handle.join().unwrap();
1419        }
1420
1421        writers_done.store(true, Ordering::Release);
1422        let all_messages = reader_handle.join().unwrap();
1423        // Check the messages
1424        check_all_message_data(&all_messages, num_threads);
1425        // The last page in the circle (current tail page) cannot be swapped by the reader
1426        // (to prevent reading uncommitted/partial data). The number of messages left sitting on
1427        // the tail page when the writers finish depends entirely on concurrent thread scheduling
1428        // variance under high contention. We expect between 1008 and 1024 messages to be read
1429        // (leaving 0 to 16 messages unread on the final tail page).
1430        assert!(
1431            all_messages.len() >= 1008 && all_messages.len() <= 1024,
1432            "Expected between 1008 and 1024 messages, got {}",
1433            all_messages.len()
1434        );
1435    }
1436    #[test]
1437    fn test_disable_waits_for_ref_count() {
1438        use std::sync::Arc;
1439        use std::sync::atomic::{AtomicBool, Ordering};
1440        let buffer = Arc::new(
1441            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1442                .unwrap(),
1443        );
1444        // Thread 1: Reserves space, keeping ref_count = 1 (plus enabled bit).
1445        let res = buffer.reserve(100).unwrap();
1446        let disable_finished = Arc::new(AtomicBool::new(false));
1447        let disable_finished_clone = Arc::clone(&disable_finished);
1448        let buffer_clone = Arc::clone(&buffer);
1449        // Thread 2: Calls disable. This should block because ref_count has active writers.
1450        let handle = std::thread::spawn(move || {
1451            buffer_clone.disable().unwrap();
1452            disable_finished_clone.store(true, Ordering::Relaxed);
1453        });
1454        // Wait for Thread 2 to start and set size_bytes to 0
1455        while buffer.ref_count.load(Ordering::Acquire) & RING_ENABLED_BIT != 0 {
1456            std::hint::spin_loop();
1457        }
1458        // Verify that disable has NOT finished
1459        assert!(!disable_finished.load(Ordering::Relaxed));
1460        // Verify that new reserves fail with ENOMEM because disable() cleared RING_ENABLED_BIT
1461        let res2 = buffer.reserve(100);
1462        assert_eq!(res2.unwrap_err(), starnix_uapi::errno!(ENOMEM));
1463        // Now write data and commit the first reservation.
1464        // If the VMO had been shrunk, write_data would panic/fault here.
1465        let data = vec![42u8; 100];
1466        res.0.write_at(0, &data);
1467        buffer.commit(res.0);
1468        // Wait for Thread 2 to finish
1469        handle.join().unwrap();
1470        // Verify disable finished
1471        assert!(disable_finished.load(Ordering::Relaxed));
1472    }
1473    #[test]
1474    fn test_reservation_drop_cancels_writer_count() {
1475        let buffer =
1476            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1477                .unwrap();
1478        // 1. Reserve space. ref_count becomes 1 (plus enabled bit).
1479        let res = buffer.reserve(100).unwrap();
1480        assert_eq!(buffer.ref_count.load(Ordering::Relaxed), RING_ENABLED_BIT | 1);
1481        // 2. Drop the reservation without commit.
1482        std::mem::drop(res.0);
1483        // 3. Verify ref_count becomes 0 (plus enabled bit).
1484        assert_eq!(buffer.ref_count.load(Ordering::Relaxed), RING_ENABLED_BIT);
1485        // 4. Verify that a new reservation can be committed (proving the dropped one didn't block).
1486        let res2 = buffer.reserve(100).unwrap();
1487        buffer.commit(res2.0);
1488    }
1489    #[test]
1490    fn test_overwrite_and_read_4_pages() {
1491        let num_threads = 1;
1492        // Use 6 pages total (5 data pages) to allow exactly 4 readable pages
1493        // (1 page is the active commit page).
1494        let buffer = Arc::new(
1495            LocklessRingBuffer::new(6 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1496                .unwrap(),
1497        );
1498        let writers_done = Arc::new(std::sync::atomic::AtomicBool::new(false));
1499        // 5 data pages capacity + 2 overwritten pages = 7 pages to write.
1500        // Each page holds exactly 127 messages.
1501        let msgs_per_thread = 7 * 127;
1502        let buffer_clone = Arc::clone(&buffer);
1503        let thread_index = 0;
1504        let writer_handle = std::thread::spawn(move || {
1505            for _ in 0..msgs_per_thread {
1506                let (res, now, delta) = buffer_clone.reserve(TestMessage::SIZE).unwrap();
1507                let msg = TestMessage {
1508                    thread_index,
1509                    timestamp_nanos: now.into_nanos() as u64,
1510                    delta: delta.into_nanos() as u64,
1511                    data: *b"Event data\0\0",
1512                };
1513                res.write_at(0, &msg.to_bytes());
1514                buffer_clone.commit(res);
1515            }
1516        });
1517        writer_handle.join().unwrap();
1518        assert_eq!(buffer.dropped_pages(), 2, "Expected 2 pages to be dropped");
1519        writers_done.store(true, Ordering::Release);
1520        let buffer_reader = Arc::clone(&buffer);
1521        let writers_done_reader = Arc::clone(&writers_done);
1522        let reader_handle = start_reader_thread(buffer_reader, writers_done_reader, None);
1523        let all_messages = reader_handle.join().unwrap();
1524        check_all_message_data(&all_messages, num_threads);
1525        assert_eq!(all_messages.len(), 4 * 127, "Expected exactly 4 pages of messages");
1526    }
1527    #[test]
1528    fn test_reserve_full_producer_consumer() {
1529        let buffer =
1530            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1531                .unwrap();
1532        // Fill node 0
1533        let res1 =
1534            buffer.reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE).unwrap();
1535        buffer.commit(res1.0);
1536        // Fill node 1
1537        let res2 =
1538            buffer.reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE).unwrap();
1539        buffer.commit(res2.0);
1540        // Buffer is full (2 data pages used, head at 0, commit at 1 but next is head).
1541        // Next reserve should fail with ENOSPC because overwrite is false.
1542        let res3 = buffer.reserve(100);
1543        assert_eq!(res3.unwrap_err(), starnix_uapi::errno!(ENOSPC));
1544    }
1545
1546    #[test]
1547    fn test_reserve_zero_size() {
1548        let buffer =
1549            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1550                .unwrap();
1551        let res = buffer.reserve(0);
1552        assert_eq!(res.unwrap_err(), starnix_uapi::errno!(EINVAL));
1553    }
1554    #[test]
1555    fn test_concurrent_overwrite_stability() {
1556        let num_threads = 4;
1557        let msgs_per_thread = 256;
1558        // Small buffer to force overwrites
1559        let buffer = Arc::new(
1560            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1561                .unwrap(),
1562        );
1563        let writers_done = Arc::new(std::sync::atomic::AtomicBool::new(false));
1564        let mut handles = vec![];
1565
1566        let buffer_reader = Arc::clone(&buffer);
1567        let writers_done_reader = Arc::clone(&writers_done);
1568        // Set the delay to 20ms to robustly guarantee that writer threads outpace the reader
1569        // and deterministically trigger dropped/overwritten pages under any CI load variance.
1570        let delay = Some((buffer.nodes.len() - 1, std::time::Duration::from_millis(20)));
1571        let reader_handle = start_reader_thread(buffer_reader, writers_done_reader, delay);
1572
1573        // Spawn writers
1574        for thread_index in 0..num_threads {
1575            let buffer_clone = Arc::clone(&buffer);
1576            let handle = std::thread::spawn(move || {
1577                for _ in 0..msgs_per_thread {
1578                    if let Ok((res, now, delta)) = buffer_clone.reserve(TestMessage::SIZE) {
1579                        let msg = TestMessage {
1580                            thread_index,
1581                            timestamp_nanos: now.into_nanos() as u64,
1582                            delta: delta.into_nanos() as u64,
1583                            data: *b"Event data\0\0",
1584                        };
1585                        res.write_at(0, &msg.to_bytes());
1586                        buffer_clone.commit(res);
1587                    }
1588                    // Yield to allow other threads to run and cause contention
1589                    std::thread::yield_now();
1590                }
1591            });
1592            handles.push(handle);
1593        }
1594        for handle in handles {
1595            handle.join().unwrap();
1596        }
1597        writers_done.store(true, Ordering::Release);
1598        let all_messages = reader_handle.join().unwrap();
1599        // Check that data is not corrupted
1600        check_all_message_data(&all_messages, num_threads);
1601        // Verify that some pages were dropped (overwritten)
1602        assert!(buffer.dropped_pages() > 0, "Expected at least some dropped pages");
1603    }
1604
1605    #[test]
1606    fn test_out_of_order_completion_same_page() {
1607        let buffer =
1608            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1609                .unwrap();
1610
1611        let header_size = LocklessRingBuffer::PAGE_HEADER_SIZE;
1612        let page_size = (*PAGE_SIZE) as usize;
1613        let available_space = page_size - header_size;
1614
1615        let size1 = available_space / 2;
1616        let size2 = available_space - size1;
1617
1618        // Reserve first chunk
1619        let (res1, _, _) = buffer.reserve(size1).unwrap();
1620        // Reserve second chunk (fills the page)
1621        let (res2, _, _) = buffer.reserve(size2).unwrap();
1622
1623        // Commit second chunk first (out of order)
1624        buffer.commit(res2);
1625
1626        // Page should not be finalized yet because res1 is still active
1627        assert!(buffer.swap_reader_page().is_none());
1628
1629        // Commit first chunk
1630        buffer.commit(res1);
1631
1632        // Force advance tail by making a reservation that won't fit on Page 0.
1633        let _ = buffer.reserve(10).unwrap();
1634
1635        // Now page should be finalized
1636        let swapped = buffer.swap_reader_page();
1637        assert!(swapped.is_some());
1638        assert_eq!(swapped.unwrap(), 0); // First data page is node 0
1639    }
1640
1641    #[test]
1642    fn test_concurrent_readers_rejection() {
1643        #[derive(Debug)]
1644        struct SleepingOutputBuffer;
1645        impl Buffer for SleepingOutputBuffer {
1646            fn segments_count(&self) -> Result<usize, Errno> {
1647                Ok(1)
1648            }
1649            fn peek_each_segment(
1650                &mut self,
1651                _callback: &mut PeekBufferSegmentsCallback<'_>,
1652            ) -> Result<(), Errno> {
1653                Ok(())
1654            }
1655        }
1656        impl OutputBuffer for SleepingOutputBuffer {
1657            fn write_each(
1658                &mut self,
1659                _callback: &mut OutputBufferCallback<'_>,
1660            ) -> Result<usize, Errno> {
1661                Ok(0)
1662            }
1663            fn available(&self) -> usize {
1664                std::thread::sleep(std::time::Duration::from_millis(50));
1665                (*PAGE_SIZE) as usize
1666            }
1667            fn bytes_written(&self) -> usize {
1668                0
1669            }
1670            fn zero(&mut self) -> Result<usize, Errno> {
1671                Ok(0)
1672            }
1673            unsafe fn advance(&mut self, _length: usize) -> Result<(), Errno> {
1674                Ok(())
1675            }
1676        }
1677
1678        let buffer = Arc::new(
1679            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, false, fuchsia_trace::Id::new())
1680                .unwrap(),
1681        );
1682
1683        let buffer_clone1 = Arc::clone(&buffer);
1684        let handle1 = std::thread::spawn(move || {
1685            let mut dest = SleepingOutputBuffer;
1686            let _ = buffer_clone1.read(&mut dest);
1687        });
1688
1689        // Ensure thread 1 enters `read()` and calls `available()` where it sleeps.
1690        std::thread::sleep(std::time::Duration::from_millis(10));
1691
1692        let buffer_clone2 = Arc::clone(&buffer);
1693        let handle2 = std::thread::spawn(move || {
1694            let mut dest = VecOutputBuffer::new((*PAGE_SIZE) as usize);
1695            let res = buffer_clone2.read(&mut dest);
1696            assert_eq!(res.unwrap_err(), starnix_uapi::errno!(EBUSY));
1697        });
1698
1699        handle2.join().unwrap();
1700        handle1.join().unwrap();
1701    }
1702
1703    #[test]
1704    fn test_stale_offset_livelock() {
1705        let num_threads = 8;
1706        let msgs_per_thread = 200;
1707        // Small 3-page buffer to force wraparounds and high contention
1708        let buffer = Arc::new(
1709            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1710                .unwrap(),
1711        );
1712        let mut handles = vec![];
1713
1714        for thread_index in 0..num_threads {
1715            let buffer_clone = Arc::clone(&buffer);
1716            let handle = std::thread::spawn(move || {
1717                for _ in 0..msgs_per_thread {
1718                    if let Ok((res, now, delta)) = buffer_clone.reserve(TestMessage::SIZE) {
1719                        let msg = TestMessage {
1720                            thread_index,
1721                            timestamp_nanos: now.into_nanos() as u64,
1722                            delta: delta.into_nanos() as u64,
1723                            data: *b"Event data\0\0",
1724                        };
1725                        res.write_at(0, &msg.to_bytes());
1726                        buffer_clone.commit(res);
1727                    }
1728                    std::thread::yield_now();
1729                }
1730            });
1731            handles.push(handle);
1732        }
1733
1734        for handle in handles {
1735            handle.join().unwrap();
1736        }
1737
1738        let dropped = buffer.dropped_pages();
1739        println!("High contention completed. Total dropped pages = {}", dropped);
1740        // Dropped pages should be reasonable and not spike to infinity or cause deadlock
1741        assert!(dropped > 0);
1742    }
1743
1744    #[test]
1745    fn test_writer_preemption_and_overwrite_prevention() {
1746        let buffer = Arc::new(
1747            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1748                .unwrap(),
1749        );
1750
1751        // 1. Thread 1 reserves space on Node 0, but delays committing (preempted)
1752        let res1 = buffer.reserve(100).unwrap();
1753        assert_eq!(res1.0.node_idx, 0);
1754
1755        // 2. Thread 2 fills the rest of Node 0, Node 1, and wraps around to Node 0.
1756        // Thread 2 should block or spin since Node 0 has active writers (active_writers > 0)
1757        let buffer_clone = Arc::clone(&buffer);
1758        let writer_finished = Arc::new(std::sync::atomic::AtomicBool::new(false));
1759        let writer_finished_clone = Arc::clone(&writer_finished);
1760
1761        let handle = std::thread::spawn(move || {
1762            // Fill Node 0
1763            let res2 = buffer_clone
1764                .reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE - 100)
1765                .unwrap();
1766            buffer_clone.commit(res2.0);
1767
1768            // Fill Node 1
1769            let res3 = buffer_clone
1770                .reserve((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE)
1771                .unwrap();
1772            buffer_clone.commit(res3.0);
1773
1774            // Try to reserve again - wraps around to Node 0 (which still has res1 active)
1775            // This should block/spin until we commit res1.
1776            let res4 = buffer_clone.reserve(100).unwrap();
1777            buffer_clone.commit(res4.0);
1778
1779            writer_finished_clone.store(true, Ordering::Release);
1780        });
1781
1782        // Give the thread some time to reach the block point
1783        std::thread::sleep(std::time::Duration::from_millis(50));
1784        assert!(!writer_finished.load(Ordering::Acquire));
1785
1786        // Now commit res1, which unblocks the wraparound writer
1787        buffer.commit(res1.0);
1788
1789        handle.join().unwrap();
1790        assert!(writer_finished.load(Ordering::Acquire));
1791    }
1792
1793    #[test]
1794    fn test_extreme_disable_enable_stress() {
1795        let num_threads = 8;
1796        let msgs_per_thread = 100;
1797        let buffer = Arc::new(
1798            LocklessRingBuffer::new(5 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1799                .unwrap(),
1800        );
1801        let mut handles = vec![];
1802
1803        // 1. Spawn 8 writer threads writing continuously until they successfully write msgs_per_thread.
1804        for thread_index in 0..num_threads {
1805            let buffer_clone = Arc::clone(&buffer);
1806            let handle = std::thread::spawn(move || {
1807                let mut count = 0;
1808                while count < msgs_per_thread {
1809                    match buffer_clone.reserve(TestMessage::SIZE) {
1810                        Ok((res, now, delta)) => {
1811                            let msg = TestMessage {
1812                                thread_index,
1813                                timestamp_nanos: now.into_nanos() as u64,
1814                                delta: delta.into_nanos() as u64,
1815                                data: *b"Event data\0\0",
1816                            };
1817                            res.write_at(0, &msg.to_bytes());
1818                            buffer_clone.commit(res);
1819                            count += 1;
1820                        }
1821                        Err(e) if e == starnix_uapi::errno!(ENOMEM) => {
1822                            // Expected when disabled. Yield and retry.
1823                            std::thread::yield_now();
1824                        }
1825                        Err(e) => panic!("Unexpected error during reserve: {:?}", e),
1826                    }
1827                }
1828                count
1829            });
1830            handles.push(handle);
1831        }
1832
1833        // 2. Coordinator thread repeatedly disables and enables the ring buffer.
1834        let buffer_clone = Arc::clone(&buffer);
1835        let coordinator = std::thread::spawn(move || {
1836            for _ in 0..20 {
1837                std::thread::sleep(std::time::Duration::from_millis(5));
1838                // Disable the ring buffer. This must successfully wait for all active writers.
1839                let _dropped = buffer_clone.disable().unwrap();
1840
1841                // Ensure subsequent reservations fail while disabled.
1842                let res = buffer_clone.reserve(TestMessage::SIZE);
1843                assert_eq!(res.unwrap_err(), starnix_uapi::errno!(ENOMEM));
1844
1845                std::thread::sleep(std::time::Duration::from_millis(2));
1846                // Re-enable the ring buffer.
1847                let _now = buffer_clone.enable().unwrap();
1848            }
1849        });
1850
1851        coordinator.join().unwrap();
1852        let mut total_writes = 0;
1853        for handle in handles {
1854            total_writes += handle.join().unwrap();
1855        }
1856        println!("Disable/enable stress completed. Total writes = {}", total_writes);
1857        assert_eq!(total_writes, num_threads * msgs_per_thread);
1858    }
1859
1860    #[test]
1861    fn test_reader_loop_swapping_high_contention() {
1862        let num_threads = 8usize;
1863        let msgs_per_thread = 100;
1864        // 8 threads * 100 msgs * 32 bytes = 25600 bytes.
1865        // Each page holds 127 messages.
1866        // Let's make a 10-page buffer to have ample space.
1867        let buffer = Arc::new(
1868            LocklessRingBuffer::new(10 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1869                .unwrap(),
1870        );
1871        let writers_done = Arc::new(std::sync::atomic::AtomicBool::new(false));
1872        let mut handles = vec![];
1873
1874        // Spawn unified reader thread that loops read() draining all pages.
1875        let buffer_reader = Arc::clone(&buffer);
1876        let writers_done_reader = Arc::clone(&writers_done);
1877        let reader_handle = start_reader_thread(buffer_reader, writers_done_reader, None);
1878
1879        // Spawn 8 writer threads.
1880        for thread_index in 0..num_threads {
1881            let buffer_clone = Arc::clone(&buffer);
1882            let handle = std::thread::spawn(move || {
1883                for _ in 0..msgs_per_thread {
1884                    let (res, now, delta) = buffer_clone.reserve(TestMessage::SIZE).unwrap();
1885                    let msg = TestMessage {
1886                        thread_index: thread_index as u32,
1887                        timestamp_nanos: now.into_nanos() as u64,
1888                        delta: delta.into_nanos() as u64,
1889                        data: *b"Event data\0\0",
1890                    };
1891                    res.write_at(0, &msg.to_bytes());
1892                    buffer_clone.commit(res);
1893                    // Yield to cause maximum interleaving
1894                    std::thread::yield_now();
1895                }
1896            });
1897            handles.push(handle);
1898        }
1899
1900        for handle in handles {
1901            handle.join().unwrap();
1902        }
1903        writers_done.store(true, Ordering::Release);
1904        let all_messages = reader_handle.join().unwrap();
1905        let messages_read = all_messages.len();
1906        check_all_message_data(&all_messages, num_threads as u32);
1907        let dropped = buffer.dropped_pages();
1908        println!(
1909            "Reader high contention completed. Messages read = {}, dropped pages = {}",
1910            messages_read, dropped
1911        );
1912        // Verify that the total number of read messages + dropped pages' messages covers all writes.
1913        let messages_per_page =
1914            ((*PAGE_SIZE) as usize - LocklessRingBuffer::PAGE_HEADER_SIZE) / TestMessage::SIZE;
1915        let total_written = num_threads * msgs_per_thread;
1916        let total_accounted = messages_read + (dropped as usize * messages_per_page);
1917        assert!(total_accounted >= total_written - messages_per_page);
1918    }
1919
1920    #[test]
1921    fn test_failed_reservation_offset_boundary() {
1922        let buffer =
1923            LocklessRingBuffer::new(3 * (*PAGE_SIZE) as usize, true, fuchsia_trace::Id::new())
1924                .unwrap();
1925        let page_size = (*PAGE_SIZE) as usize;
1926        let max_payload = page_size - LocklessRingBuffer::PAGE_HEADER_SIZE;
1927
1928        // Reserve enough space to almost fill Node 0
1929        let msg_size = 100;
1930        let num_msgs = max_payload / msg_size;
1931        for _ in 0..num_msgs {
1932            let (res, _, _) = buffer.reserve(msg_size).unwrap();
1933            buffer.commit(res);
1934        }
1935
1936        // Check current write_offset of Node 0
1937        let expected_offset = LocklessRingBuffer::PAGE_HEADER_SIZE + num_msgs * msg_size;
1938        assert_eq!(buffer.nodes[0].write_offset.load(Ordering::Acquire), expected_offset);
1939
1940        // Now try a reservation that exceeds remaining space on Node 0.
1941        let (res, _, _) = buffer.reserve(msg_size).unwrap();
1942        assert_eq!(res.node_idx, 1);
1943        buffer.commit(res);
1944
1945        // Check write_offset of Node 0.
1946        // Before CAS fix, write_offset was 4116 (> page_size).
1947        // After CAS fix, write_offset is exactly expected_offset (4016).
1948        assert_eq!(buffer.nodes[0].write_offset.load(Ordering::Acquire), expected_offset);
1949    }
1950}