Skip to main content

spsc_buffer/
lib.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![no_std]
6
7use core::sync::atomic::{AtomicU64, Ordering};
8use core::{cmp, ptr, slice};
9pub use kalloc::NoOpAllocator;
10use kalloc::{Allocator, Box, DefaultAllocator};
11use zx_status::Status;
12
13/// A simple convenience type used to hold the read and write pointers as separate values.
14#[derive(Copy, Clone, Debug, PartialEq, Eq)]
15struct RingPointers {
16    read: u32,
17    write: u32,
18}
19
20impl RingPointers {
21    /// Constructs a new `RingPointers` with the given read and write offsets.
22    const fn new(read: u32, write: u32) -> Self {
23        Self { read, write }
24    }
25
26    /// Splits combined 64-bit pointers into individual read and write pointers.
27    const fn from_combined(combined: u64) -> Self {
28        Self::new((combined >> 32) as u32, combined as u32)
29    }
30
31    /// Combines read and write pointers into a single 64-bit value.
32    const fn as_combined(&self) -> u64 {
33        ((self.read as u64) << 32) | (self.write as u64)
34    }
35
36    /// Returns the amount of data available to read in the buffer.
37    const fn available_data(&self) -> u32 {
38        self.write.wrapping_sub(self.read)
39    }
40}
41
42/// A wrapper around a slot of memory in the ring buffer.
43///
44/// A Reservation has a predetermined size that is determined by the size passed into `reserve`. Any
45/// attempt to write more than this amount of data into the slot is a programming error and will
46/// cause an assertion failure. This class provides a formal way for writers to serialize data in
47/// place in the ring buffer, thus eliminating the need for a temporary serialization buffer.
48#[derive(Debug)]
49pub struct Reservation<'a> {
50    combined_pointers: &'a AtomicU64,
51    storage_len: u32,
52    initial_ring_pointers: RingPointers,
53    region1: &'a mut [u8],
54    region2: &'a mut [u8],
55    write_offset: u32,
56    committed: bool,
57}
58
59impl<'a> Drop for Reservation<'a> {
60    fn drop(&mut self) {
61        debug_assert!(self.committed, "Reservation dropped without being committed");
62    }
63}
64
65impl<'a> Reservation<'a> {
66    /// Writes the given data into this reservation.
67    pub fn write(&mut self, data: &[u8]) -> Result<(), Status> {
68        if self.committed {
69            return Err(Status::BAD_STATE);
70        }
71
72        let mut bytes_to_copy = data.len();
73        let mut region1_copy_amount = 0;
74        let mut write_offset = self.write_offset as usize;
75
76        let region1_len = self.region1.len();
77        if write_offset < region1_len {
78            let space_left_in_region1 = region1_len - write_offset;
79            region1_copy_amount = cmp::min(bytes_to_copy, space_left_in_region1);
80
81            self.region1[write_offset..write_offset + region1_copy_amount]
82                .copy_from_slice(&data[..region1_copy_amount]);
83
84            write_offset += region1_copy_amount;
85            bytes_to_copy -= region1_copy_amount;
86        }
87
88        if bytes_to_copy > 0 {
89            if write_offset < region1_len {
90                return Err(Status::BAD_STATE);
91            }
92            let region2_len = self.region2.len();
93            let region2_offset = write_offset - region1_len;
94            if region2_len < region2_offset {
95                return Err(Status::BAD_STATE);
96            }
97            if region2_len - region2_offset < bytes_to_copy {
98                return Err(Status::BUFFER_TOO_SMALL);
99            }
100
101            self.region2[region2_offset..region2_offset + bytes_to_copy]
102                .copy_from_slice(&data[region1_copy_amount..region1_copy_amount + bytes_to_copy]);
103
104            write_offset += bytes_to_copy;
105        }
106
107        self.write_offset = write_offset as u32;
108        Ok(())
109    }
110
111    /// Advances the write pointer of the associated spsc buffer.
112    ///
113    /// This makes the written data visible to the reader, and thus can only be called once all
114    /// writes have been completed and the reservation is fully written.
115    pub fn commit(mut self) -> Result<(), Status> {
116        if self.committed {
117            return Err(Status::BAD_STATE);
118        }
119        self.committed = true;
120
121        let total_len =
122            self.region1.len().checked_add(self.region2.len()).ok_or(Status::BAD_STATE)? as u32;
123        if self.write_offset != total_len {
124            return Err(Status::BAD_STATE);
125        }
126
127        advance_write_pointer(
128            self.combined_pointers,
129            self.storage_len,
130            self.initial_ring_pointers,
131            total_len,
132        )
133    }
134}
135
136/// A transactional, single-producer, single-consumer ring buffer.
137///
138/// The caller is responsible for ensuring that there is only one reader and one writer; no internal
139/// synchronization is provided to enforce this constraint.
140///
141/// Backing storage is allocated dynamically during the `init` method. The requested size must be a
142/// power of two for correct functionality.
143// TODO(https://fxbug.dev/517301686): Use bindgen or another systematic way to avoid duplicating
144// this structure and causing drift.
145#[repr(C, align(8))]
146pub struct Buffer<A: Allocator + Default = DefaultAllocator> {
147    // The read and write pointers are stored as the upper and lower halves, respectively, of a
148    // single 64-bit atomic.
149    combined_pointers: AtomicU64,
150    // The types used for `storage` and `size` must match those of ktl::span.
151    storage: *mut u8,
152    // The size of the backing storage in bytes. This is enforced to be at most
153    // MAX_STORAGE_SIZE (2 GiB), meaning this value will never exceed u32::MAX.
154    size: usize,
155    _phantom: core::marker::PhantomData<A>,
156}
157
158impl<A: Allocator + Default> Drop for Buffer<A> {
159    fn drop(&mut self) {
160        if !self.storage.is_null() {
161            let slice_ptr = ptr::slice_from_raw_parts_mut(self.storage, self.size);
162            unsafe {
163                let _ = Box::from_raw_in(slice_ptr, A::default());
164            }
165        }
166    }
167}
168
169impl<A: Allocator + Default> Buffer<A> {
170    /// Maximum size of backing storage buffer (2 GiB).
171    const MAX_STORAGE_SIZE: u32 = 1 << 31;
172
173    /// Constructs a new `Buffer` with a dynamically allocated backing storage of the given size,
174    /// using the given allocator.
175    pub fn try_new_in(size: u32, allocator: A) -> Result<Self, Status> {
176        if size > Self::MAX_STORAGE_SIZE {
177            return Err(Status::INVALID_ARGS);
178        }
179        if !size.is_power_of_two() {
180            return Err(Status::INVALID_ARGS);
181        }
182
183        let storage_box = Box::<[u8], A>::try_new_zeroed_slice_in(size as usize, allocator)
184            .map_err(|_| Status::NO_MEMORY)?;
185        let (storage_ptr, _) = Box::into_raw_with_allocator(storage_box);
186
187        Ok(Self {
188            combined_pointers: AtomicU64::new(0),
189            storage: storage_ptr as *mut u8,
190            size: size as usize,
191            _phantom: core::marker::PhantomData,
192        })
193    }
194
195    /// Returns the size of the backing storage.
196    pub fn size(&self) -> u32 {
197        self.size as u32
198    }
199
200    /// Reserves a block of the given size in the buffer.
201    ///
202    /// Any data written into this block will not be visible to readers until `commit` is called on
203    /// the returned `Reservation`.
204    pub fn reserve(&mut self, size: u32) -> Result<Reservation<'_>, Status> {
205        if size == 0 || size > Self::MAX_STORAGE_SIZE {
206            return Err(Status::INVALID_ARGS);
207        }
208
209        let storage_len = self.size as u32;
210        if size > storage_len {
211            return Err(Status::NO_SPACE);
212        }
213
214        let initial_state = self.load_pointers();
215        let available_space = self.available_space(initial_state);
216        if available_space < size {
217            return Err(Status::NO_SPACE);
218        }
219
220        let write_offset = self.pointer_to_offset(initial_state.write);
221        let ring_break_distance = storage_len - write_offset;
222        let bytes_before_break = cmp::min(size, ring_break_distance);
223
224        // SAFETY: The creator of Buffer must ensure that `storage` points to a valid memory region
225        // of `size` bytes.
226        let storage_slice = unsafe { slice::from_raw_parts_mut(self.storage, self.size) };
227        let (left, right) = storage_slice.split_at_mut(write_offset as usize);
228        let region1 = &mut right[..bytes_before_break as usize];
229
230        let region2 = if bytes_before_break < size {
231            let region2_len = size - bytes_before_break;
232            &mut left[..region2_len as usize]
233        } else {
234            &mut []
235        };
236
237        Ok(Reservation {
238            combined_pointers: &self.combined_pointers,
239            storage_len,
240            initial_ring_pointers: initial_state,
241            region1,
242            region2,
243            write_offset: 0,
244            committed: false,
245        })
246    }
247
248    /// Copies `len` bytes out of the buffer using the provided `copy_fn`.
249    ///
250    /// The copy function has the signature `copy_fn(offset: u32, src: &[u8]) -> Result<(), Status>`
251    /// and may be invoked multiple times.
252    ///
253    /// Returns the number of bytes read on success. If `copy_fn` returns an error, that error is
254    /// propagated.
255    ///
256    /// Importantly, even if an error is returned, `copy_fn` might have already processed a partial
257    /// amount of data (between 0 and `len` bytes). However, these partially processed bytes are
258    /// considered *not read*. Consequently, the internal read pointer of the ring buffer will *not*
259    /// be advanced for these unread bytes, meaning that these same bytes will remain available for
260    /// reading in subsequent calls to `read`.
261    pub fn read<F>(&self, mut copy_fn: F, len: u32) -> Result<u32, Status>
262    where
263        F: FnMut(u32, &[u8]) -> Result<(), Status>,
264    {
265        let initial_state = self.load_pointers();
266        let available_data = initial_state.available_data();
267        if available_data == 0 {
268            return Ok(0);
269        }
270
271        let amount_to_copy = cmp::min(available_data, len) as usize;
272        let read_offset = self.pointer_to_offset(initial_state.read) as usize;
273        let ring_break_distance = self.size - read_offset;
274        let bytes_before_break = cmp::min(amount_to_copy, ring_break_distance);
275
276        // SAFETY: The creator of Buffer must ensure that `storage` points to a valid memory region
277        // of `size` bytes.
278        let storage_slice = unsafe { slice::from_raw_parts(self.storage, self.size) };
279        let slice1 = &storage_slice[read_offset..read_offset + bytes_before_break];
280        copy_fn(0, slice1)?;
281
282        if bytes_before_break < amount_to_copy {
283            let bytes_after_break = amount_to_copy - bytes_before_break;
284            let slice2 = &storage_slice[..bytes_after_break];
285            copy_fn(bytes_before_break as u32, slice2)?;
286        }
287
288        self.advance_read_pointer(initial_state, amount_to_copy as u32)?;
289        Ok(amount_to_copy as u32)
290    }
291
292    /// Empties the contents of the buffer.
293    ///
294    /// This is logically a read operation, so a `read` and `drain` cannot be called concurrently.
295    /// Additionally, the behavior of this method is non-deterministic if a write is in-progress.
296    pub fn drain(&self) -> Result<(), Status> {
297        let initial_state = self.load_pointers();
298        let available_data = initial_state.available_data();
299        if available_data == 0 {
300            return Ok(());
301        }
302        self.advance_read_pointer(initial_state, available_data)
303    }
304
305    // Helper function that converts read and write pointers into ring buffer offsets.
306    //
307    // This is logically performing pointer % storage.len(), but because storage.len() is guaranteed
308    // to be a power of two it is equivalent to this logical AND.
309    fn pointer_to_offset(&self, pointer: u32) -> u32 {
310        let storage_len = self.size as u32;
311        pointer & (storage_len - 1)
312    }
313
314    /// Returns the remaining available space in the buffer.
315    fn available_space(&self, pointers: RingPointers) -> u32 {
316        let storage_len = self.size as u32;
317        storage_len.wrapping_sub(pointers.available_data())
318    }
319
320    /// Loads the current values of the read and write pointers.
321    fn load_pointers(&self) -> RingPointers {
322        let combined = self.combined_pointers.load(Ordering::Acquire);
323        RingPointers::from_combined(combined)
324    }
325
326    // Adds the given delta to the read half of the combined_pointers.
327    //
328    // Because we store the pointers in a single combined atomic variable, we must update the entire
329    // combined pointer. We perform this update using a compare and exchange to ensure that
330    // concurrent operations to the write half of the combined pointers are preserved. We also check
331    // to ensure reads do not encounter concurrent reads.
332    //
333    // This is a store-release operation that synchronizes with the load-acquire in load_pointers.
334    // By using release semantics, we ensure that if the updated value is seen in load_pointers, all
335    // memory operations that occurred prior to this update are observable.
336    fn advance_read_pointer(&self, initial: RingPointers, delta: u32) -> Result<(), Status> {
337        if delta > initial.available_data() {
338            return Err(Status::INVALID_ARGS);
339        }
340
341        let target_read = initial.read.wrapping_add(delta);
342        let mut starting_pointers = initial.as_combined();
343        let mut target_pointers = RingPointers::new(target_read, initial.write).as_combined();
344
345        loop {
346            match self.combined_pointers.compare_exchange_weak(
347                starting_pointers,
348                target_pointers,
349                Ordering::Release,
350                Ordering::Relaxed,
351            ) {
352                Ok(_) => break,
353                Err(observed_combined) => {
354                    starting_pointers = observed_combined;
355                    let observed = RingPointers::from_combined(observed_combined);
356                    debug_assert_eq!(
357                        observed.read, initial.read,
358                        "potential concurrent read detected; expected read pointer {}, got {}",
359                        initial.read, observed.read
360                    );
361                    target_pointers = RingPointers::new(target_read, observed.write).as_combined();
362                }
363            }
364        }
365        Ok(())
366    }
367}
368
369// Adds the given delta to the write half of the combined_pointers.
370//
371// Because we store the pointers in a single combined atomic variable, we must update the entire
372// combined pointer. We perform this update using a compare and exchange to ensure that concurrent
373// operations to the read half of the combined pointers are preserved. We also check to ensure
374// writes do not encounter concurrent writes.
375//
376// This is a store-release operation that synchronizes with the load-acquire in load_pointers. By
377// using release semantics, we ensure that if the updated value is seen in load_pointers, all memory
378// operations that occurred prior to this update are observable.
379fn advance_write_pointer(
380    combined_pointers: &AtomicU64,
381    storage_len: u32,
382    initial: RingPointers,
383    delta: u32,
384) -> Result<(), Status> {
385    let available_data = initial.available_data();
386    if delta > storage_len.checked_sub(available_data).ok_or(Status::INVALID_ARGS)? {
387        return Err(Status::INVALID_ARGS);
388    }
389
390    let target_write = initial.write.wrapping_add(delta);
391    let mut starting_pointers = initial.as_combined();
392    let mut target_pointers = RingPointers::new(initial.read, target_write).as_combined();
393
394    loop {
395        match combined_pointers.compare_exchange_weak(
396            starting_pointers,
397            target_pointers,
398            Ordering::Release,
399            Ordering::Relaxed,
400        ) {
401            Ok(_) => break,
402            Err(observed_combined) => {
403                starting_pointers = observed_combined;
404                let observed = RingPointers::from_combined(observed_combined);
405                debug_assert_eq!(
406                    observed.write, initial.write,
407                    "potential concurrent write detected; expected write pointer {}, got {}",
408                    initial.write, observed.write
409                );
410                target_pointers = RingPointers::new(observed.read, target_write).as_combined();
411            }
412        }
413    }
414    Ok(())
415}
416
417impl Buffer<DefaultAllocator> {
418    /// Constructs a new `Buffer` with a dynamically allocated backing storage of the given size,
419    /// using the default allocator.
420    pub fn try_new(size: u32) -> Result<Self, Status> {
421        Self::try_new_in(size, DefaultAllocator)
422    }
423}
424
425impl Buffer<NoOpAllocator> {
426    /// Constructs a `Buffer` from raw pointers using a no-op allocator.
427    ///
428    /// The returned buffer does not own the memory and will not deallocate it when dropped.
429    ///
430    /// # Safety
431    ///
432    /// - `storage` must point to a valid, initialized slice of bytes whose length is a power of two
433    ///   and does not exceed `MAX_STORAGE_SIZE`.
434    pub unsafe fn from_raw_parts(storage: *mut u8, size: usize) -> Self {
435        Self {
436            combined_pointers: AtomicU64::new(0),
437            storage,
438            size,
439            _phantom: core::marker::PhantomData,
440        }
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use super::*;
447
448    #[test]
449    fn test_ring_pointers_from_combined() {
450        struct TestCase {
451            combined_pointers: u64,
452            expected: RingPointers,
453        }
454        let test_cases = [
455            TestCase {
456                combined_pointers: 0x01320087_01005382,
457                expected: RingPointers::new(0x1320087, 0x1005382),
458            },
459            TestCase { combined_pointers: 0, expected: RingPointers::new(0, 0) },
460            TestCase {
461                combined_pointers: 0xFFFFFFFF_00004587,
462                expected: RingPointers::new(0xFFFFFFFF, 0x4587),
463            },
464            TestCase {
465                combined_pointers: 0x00004587_FFFFFFFF,
466                expected: RingPointers::new(0x4587, 0xFFFFFFFF),
467            },
468        ];
469
470        for tc in &test_cases {
471            let actual = RingPointers::from_combined(tc.combined_pointers);
472            assert_eq!(tc.expected, actual);
473        }
474    }
475
476    #[test]
477    fn test_ring_pointers_as_combined() {
478        struct TestCase {
479            pointers: RingPointers,
480            combined: u64,
481        }
482        let test_cases = [
483            TestCase {
484                pointers: RingPointers::new(0x1320087, 0x1005382),
485                combined: 0x01320087_01005382,
486            },
487            TestCase { pointers: RingPointers::new(0, 0), combined: 0 },
488            TestCase {
489                pointers: RingPointers::new(0xFFFFFFFF, 0x123),
490                combined: 0xFFFFFFFF_00000123,
491            },
492            TestCase {
493                pointers: RingPointers::new(0x123, 0xFFFFFFFF),
494                combined: 0x00000123_FFFFFFFF,
495            },
496        ];
497
498        for tc in &test_cases {
499            let actual = tc.pointers.as_combined();
500            assert_eq!(tc.combined, actual);
501        }
502    }
503
504    #[test]
505    fn test_available_space() {
506        struct TestCase {
507            pointers: RingPointers,
508            buffer_size: u32,
509            expected: u32,
510        }
511        let test_cases = [
512            TestCase { pointers: RingPointers::new(0, 0), buffer_size: 16, expected: 16 },
513            TestCase { pointers: RingPointers::new(3, 3), buffer_size: 16, expected: 16 },
514            TestCase { pointers: RingPointers::new(3, 7), buffer_size: 16, expected: 12 },
515            TestCase { pointers: RingPointers::new(0xFFFFFFFC, 3), buffer_size: 16, expected: 9 },
516            TestCase { pointers: RingPointers::new(0, 16), buffer_size: 16, expected: 0 },
517        ];
518
519        for tc in &test_cases {
520            let buffer = Buffer::try_new(tc.buffer_size).unwrap();
521            let actual = buffer.available_space(tc.pointers);
522            assert_eq!(tc.expected, actual);
523        }
524    }
525
526    #[test]
527    fn test_ring_pointers_available_data() {
528        struct TestCase {
529            pointers: RingPointers,
530            expected: u32,
531        }
532        let test_cases = [
533            TestCase { pointers: RingPointers::new(0, 0), expected: 0 },
534            TestCase { pointers: RingPointers::new(3, 3), expected: 0 },
535            TestCase { pointers: RingPointers::new(3, 7), expected: 4 },
536            TestCase { pointers: RingPointers::new(0, 16), expected: 16 },
537            TestCase { pointers: RingPointers::new(0xFFFFFFFC, 3), expected: 7 },
538        ];
539
540        for tc in &test_cases {
541            let actual = tc.pointers.available_data();
542            assert_eq!(tc.expected, actual);
543        }
544    }
545
546    #[test]
547    fn test_advance_read_pointer() {
548        struct TestCase {
549            initial_pointers: RingPointers,
550            buffer_size: u32,
551            delta: u32,
552            expected: u64,
553        }
554        let test_cases = [
555            TestCase {
556                initial_pointers: RingPointers::new(1, 6),
557                buffer_size: 16,
558                delta: 4,
559                expected: 0x5_00000006,
560            },
561            TestCase {
562                initial_pointers: RingPointers::new(0, 16),
563                buffer_size: 16,
564                delta: 16,
565                expected: 0x10_00000010,
566            },
567            TestCase {
568                initial_pointers: RingPointers::new(0xFFFFFFFC, 12),
569                buffer_size: 16,
570                delta: 5,
571                expected: 0x1_0000000C,
572            },
573        ];
574
575        for tc in &test_cases {
576            let buffer = Buffer::try_new(tc.buffer_size).unwrap();
577
578            let initial = tc.initial_pointers.as_combined();
579            buffer.combined_pointers.store(initial, Ordering::Release);
580
581            buffer.advance_read_pointer(tc.initial_pointers, tc.delta).unwrap();
582            let actual = buffer.combined_pointers.load(Ordering::Acquire);
583            assert_eq!(tc.expected, actual);
584        }
585    }
586
587    #[test]
588    fn test_advance_write_pointer() {
589        struct TestCase {
590            initial_pointers: RingPointers,
591            buffer_size: u32,
592            delta: u32,
593            expected: u64,
594        }
595        let test_cases = [
596            TestCase {
597                initial_pointers: RingPointers::new(7, 9),
598                buffer_size: 16,
599                delta: 4,
600                expected: 0x7_0000000D,
601            },
602            TestCase {
603                initial_pointers: RingPointers::new(0, 0),
604                buffer_size: 16,
605                delta: 16,
606                expected: 0x10,
607            },
608            TestCase {
609                initial_pointers: RingPointers::new(0xFFFFFFF1, 0xFFFFFFFC),
610                buffer_size: 16,
611                delta: 5,
612                expected: 0xFFFFFFF100000001,
613            },
614        ];
615
616        for tc in &test_cases {
617            let buffer = Buffer::try_new(tc.buffer_size).unwrap();
618
619            let initial = tc.initial_pointers.as_combined();
620            buffer.combined_pointers.store(initial, Ordering::Release);
621
622            advance_write_pointer(
623                &buffer.combined_pointers,
624                buffer.size as u32,
625                tc.initial_pointers,
626                tc.delta,
627            )
628            .unwrap();
629            let actual = buffer.combined_pointers.load(Ordering::Acquire);
630            assert_eq!(tc.expected, actual);
631        }
632    }
633
634    #[test]
635    fn test_try_new() {
636        // Happy case
637        {
638            assert!(Buffer::try_new(256).is_ok());
639        }
640
641        // Calling try_new with too big of a size should fail
642        {
643            assert_eq!(Buffer::try_new(u32::MAX).err().unwrap(), Status::INVALID_ARGS);
644        }
645
646        // Calling try_new with a size that is not a power of two should fail
647        {
648            assert_eq!(Buffer::try_new(100).err().unwrap(), Status::INVALID_ARGS);
649        }
650
651        // try_new should propagate allocation failures
652        {
653            assert_eq!(Buffer::try_new_in(256, NoOpAllocator).err().unwrap(), Status::NO_MEMORY);
654        }
655    }
656
657    #[test]
658    fn test_read_write_single_threaded() {
659        const STORAGE_SIZE: usize = 256;
660        let mut src = [0u8; STORAGE_SIZE];
661        for i in 0..STORAGE_SIZE {
662            src[i] = (i * 17 + 5) as u8;
663        }
664
665        struct TestCase {
666            write_size: u32,
667            read_size: u32,
668            expected_read_size: u32,
669            expected_reserve_status: Result<(), Status>,
670            expected_read_status: Result<(), Status>,
671            initial_pointers: RingPointers,
672            use_copy_out_err_fn: bool,
673        }
674
675        let test_cases = [
676            TestCase {
677                write_size: (STORAGE_SIZE / 2) as u32,
678                read_size: (STORAGE_SIZE / 2) as u32,
679                expected_read_size: (STORAGE_SIZE / 2) as u32,
680                expected_reserve_status: Ok(()),
681                expected_read_status: Ok(()),
682                initial_pointers: RingPointers::new(0, 0),
683                use_copy_out_err_fn: false,
684            },
685            TestCase {
686                write_size: (STORAGE_SIZE / 2) as u32,
687                read_size: (STORAGE_SIZE / 4) as u32,
688                expected_read_size: (STORAGE_SIZE / 4) as u32,
689                expected_reserve_status: Ok(()),
690                expected_read_status: Ok(()),
691                initial_pointers: RingPointers::new(0, 0),
692                use_copy_out_err_fn: false,
693            },
694            TestCase {
695                write_size: STORAGE_SIZE as u32,
696                read_size: STORAGE_SIZE as u32,
697                expected_read_size: STORAGE_SIZE as u32,
698                expected_reserve_status: Ok(()),
699                expected_read_status: Ok(()),
700                initial_pointers: RingPointers::new(0, 0),
701                use_copy_out_err_fn: false,
702            },
703            TestCase {
704                write_size: (STORAGE_SIZE / 4) as u32,
705                read_size: (STORAGE_SIZE / 2) as u32,
706                expected_read_size: (STORAGE_SIZE / 4) as u32,
707                expected_reserve_status: Ok(()),
708                expected_read_status: Ok(()),
709                initial_pointers: RingPointers::new(0, 0),
710                use_copy_out_err_fn: false,
711            },
712            TestCase {
713                write_size: STORAGE_SIZE as u32,
714                read_size: STORAGE_SIZE as u32,
715                expected_read_size: STORAGE_SIZE as u32,
716                expected_reserve_status: Ok(()),
717                expected_read_status: Ok(()),
718                initial_pointers: RingPointers::new(
719                    (STORAGE_SIZE / 2) as u32,
720                    (STORAGE_SIZE / 2) as u32,
721                ),
722                use_copy_out_err_fn: false,
723            },
724            TestCase {
725                write_size: STORAGE_SIZE as u32,
726                read_size: STORAGE_SIZE as u32,
727                expected_read_size: STORAGE_SIZE as u32,
728                expected_reserve_status: Ok(()),
729                expected_read_status: Ok(()),
730                initial_pointers: RingPointers::new(0xFFFFFFFA, 0xFFFFFFFA),
731                use_copy_out_err_fn: false,
732            },
733            TestCase {
734                write_size: 64,
735                read_size: 0,
736                expected_read_size: 0,
737                expected_reserve_status: Err(Status::NO_SPACE),
738                expected_read_status: Ok(()),
739                initial_pointers: RingPointers::new(0, (STORAGE_SIZE - 48) as u32),
740                use_copy_out_err_fn: false,
741            },
742            TestCase {
743                write_size: STORAGE_SIZE as u32,
744                read_size: (STORAGE_SIZE / 2) as u32,
745                expected_read_size: 0,
746                expected_reserve_status: Ok(()),
747                expected_read_status: Err(Status::BAD_STATE),
748                initial_pointers: RingPointers::new(0, 0),
749                use_copy_out_err_fn: true,
750            },
751        ];
752
753        for tc in &test_cases {
754            let mut dst = [0u8; STORAGE_SIZE];
755
756            let mut spsc = Buffer::try_new(STORAGE_SIZE as u32).unwrap();
757
758            let starting_pointers = tc.initial_pointers.as_combined();
759            spsc.combined_pointers.store(starting_pointers, Ordering::Release);
760
761            let reservation = spsc.reserve(tc.write_size);
762            if let Err(e) = &tc.expected_reserve_status {
763                match reservation {
764                    Err(actual_err) => assert_eq!(actual_err, *e),
765                    Ok(_) => panic!("expected reserve to fail with {:?}, but it succeeded", e),
766                }
767                continue;
768            }
769            let mut reservation = reservation.unwrap();
770
771            reservation.write(&src[..tc.write_size as usize]).unwrap();
772            reservation.commit().unwrap();
773
774            let copy_out_fn = |offset: u32, src_slice: &[u8]| -> Result<(), Status> {
775                let offset = offset as usize;
776                assert!(offset + src_slice.len() <= dst.len());
777                dst[offset..offset + src_slice.len()].copy_from_slice(src_slice);
778                Ok(())
779            };
780
781            let copy_out_err_fn =
782                |_offset: u32, _src_slice: &[u8]| -> Result<(), Status> { Err(Status::BAD_STATE) };
783
784            let read_result = if tc.use_copy_out_err_fn {
785                spsc.read(copy_out_err_fn, tc.read_size)
786            } else {
787                spsc.read(copy_out_fn, tc.read_size)
788            };
789
790            if let Err(e) = &tc.expected_read_status {
791                assert_eq!(read_result.unwrap_err(), *e);
792                assert_eq!(spsc.load_pointers().available_data(), tc.write_size);
793                continue;
794            }
795
796            let read_bytes = read_result.unwrap();
797            assert_eq!(read_bytes, tc.expected_read_size);
798            assert_eq!(
799                &dst[..tc.expected_read_size as usize],
800                &src[..tc.expected_read_size as usize]
801            );
802        }
803    }
804
805    #[test]
806    fn test_drain() {
807        const STORAGE_SIZE: u32 = 256;
808        let mut spsc = Buffer::try_new(STORAGE_SIZE).unwrap();
809
810        let mut reservation = spsc.reserve(STORAGE_SIZE / 2).unwrap();
811        let write_data = [b'f'; (STORAGE_SIZE / 2) as usize];
812        reservation.write(&write_data).unwrap();
813        reservation.commit().unwrap();
814
815        assert_eq!(spsc.load_pointers().available_data(), STORAGE_SIZE / 2);
816
817        spsc.drain().unwrap();
818        assert_eq!(spsc.load_pointers().available_data(), 0);
819    }
820
821    #[test]
822    fn test_commit_error() {
823        const STORAGE_SIZE: u32 = 256;
824        let mut spsc = Buffer::try_new(STORAGE_SIZE).unwrap();
825
826        let mut reservation = spsc.reserve(STORAGE_SIZE / 2).unwrap();
827        // Write fewer bytes than reserved.
828        let write_data = [b'f'; (STORAGE_SIZE / 2) as usize - 1];
829        reservation.write(&write_data).unwrap();
830        assert_eq!(reservation.commit(), Err(Status::BAD_STATE));
831    }
832
833    #[test]
834    fn test_write_error() {
835        const STORAGE_SIZE: u32 = 256;
836        let mut spsc = Buffer::try_new(STORAGE_SIZE).unwrap();
837
838        let mut reservation = spsc.reserve(STORAGE_SIZE / 2).unwrap();
839        // Write more bytes than reserved.
840        let write_data = [b'f'; (STORAGE_SIZE / 2) as usize + 1];
841        assert_eq!(reservation.write(&write_data), Err(Status::BUFFER_TOO_SMALL));
842        reservation.committed = true;
843    }
844
845    #[test]
846    fn test_from_raw_parts() {
847        let mut mock_storage = [0u8; 256];
848
849        // Safety: Pointers are valid.
850        let mut spsc =
851            unsafe { Buffer::from_raw_parts(mock_storage.as_mut_ptr(), mock_storage.len()) };
852
853        // Verify reserve, write, commit works
854        let mut reservation = spsc.reserve(100).unwrap();
855        let write_data = [b'x'; 100];
856        reservation.write(&write_data).unwrap();
857        reservation.commit().unwrap();
858
859        assert_eq!(spsc.load_pointers().available_data(), 100);
860        assert_eq!(spsc.combined_pointers.load(Ordering::Relaxed) & 0xffffffff, 100); // write pointer is 100
861        assert_eq!(&mock_storage[..100], &write_data[..]);
862    }
863
864    #[test]
865    fn test_reserve_zero() {
866        const STORAGE_SIZE: u32 = 256;
867        let mut spsc = Buffer::try_new(STORAGE_SIZE).unwrap();
868        match spsc.reserve(0) {
869            Err(e) => assert_eq!(e, Status::INVALID_ARGS),
870            Ok(_) => panic!("reserve(0) should fail with INVALID_ARGS"),
871        }
872    }
873
874    #[test]
875    fn test_reserve_too_large() {
876        const STORAGE_SIZE: u32 = 256;
877        let mut spsc = Buffer::try_new(STORAGE_SIZE).unwrap();
878        match spsc.reserve(u32::MAX) {
879            Err(e) => assert_eq!(e, Status::INVALID_ARGS),
880            Ok(_) => panic!("reserve(u32::MAX) should fail with INVALID_ARGS"),
881        }
882    }
883
884    #[test]
885    fn test_reserve_at_break() {
886        const STORAGE_SIZE: u32 = 16;
887        // read raw = 3, write raw = 15
888        // write_offset = 15 & 15 = 15 (distance to end is exactly 1 byte)
889        // available data = 15 - 3 = 12 bytes
890        // available space = 16 - 12 = 4 bytes
891        let mut mock_storage = [0u8; STORAGE_SIZE as usize];
892
893        // Safety: Pointers are valid.
894        let mut spsc =
895            unsafe { Buffer::from_raw_parts(mock_storage.as_mut_ptr(), mock_storage.len()) };
896        spsc.combined_pointers.store((3u64 << 32) | 15u64, Ordering::Release);
897
898        let mut reservation = spsc.reserve(4).unwrap();
899        assert_eq!(reservation.region1.len(), 1);
900        assert_eq!(reservation.region2.len(), 3);
901
902        let data = [1, 2, 3, 4];
903        reservation.write(&data).unwrap();
904        reservation.commit().unwrap();
905
906        assert_eq!(mock_storage[15], 1);
907        assert_eq!(mock_storage[0], 2);
908        assert_eq!(mock_storage[1], 3);
909        assert_eq!(mock_storage[2], 4);
910    }
911
912    #[test]
913    fn test_cpp_rust_integration() {
914        #[link(name = "c++")]
915        unsafe extern "C" {
916            fn cpp_spsc_allocate(size: u32) -> *mut Buffer<NoOpAllocator>;
917            fn cpp_spsc_free(spsc: *mut Buffer<NoOpAllocator>);
918            fn cpp_spsc_write(spsc: *mut Buffer<NoOpAllocator>, data: *const u8, len: u32) -> i32;
919            fn cpp_spsc_read(spsc: *mut Buffer<NoOpAllocator>, dst: *mut u8, len: u32) -> i32;
920        }
921
922        // 1. Allocate on the C++ side.
923        let spsc_ptr = unsafe { cpp_spsc_allocate(256) };
924        assert!(!spsc_ptr.is_null());
925
926        // Convert the raw pointer to a Rust reference to interact with it in-place.
927        let spsc = unsafe { &mut *spsc_ptr };
928
929        // 2. C++ writes, Rust reads.
930        let write_data = b"Hello from C++!";
931        let cpp_write_status =
932            unsafe { cpp_spsc_write(spsc_ptr, write_data.as_ptr(), write_data.len() as u32) };
933        assert_eq!(cpp_write_status, 0); // ZX_OK
934
935        // Rust reads and verifies.
936        let mut read_buf = [0u8; 100];
937        let bytes_read = spsc
938            .read(
939                |_, src| {
940                    read_buf[..src.len()].copy_from_slice(src);
941                    Ok(())
942                },
943                write_data.len() as u32,
944            )
945            .unwrap();
946        assert_eq!(bytes_read, write_data.len() as u32);
947        assert_eq!(&read_buf[..bytes_read as usize], write_data);
948
949        // 3. Rust writes, C++ reads.
950        let rust_write_data = b"Hello from Rust!";
951        let mut reservation = spsc.reserve(rust_write_data.len() as u32).unwrap();
952        reservation.write(rust_write_data).unwrap();
953        reservation.commit().unwrap();
954
955        // C++ reads and verifies.
956        let mut cpp_read_buf = [0u8; 100];
957        let cpp_read_bytes = unsafe {
958            cpp_spsc_read(spsc_ptr, cpp_read_buf.as_mut_ptr(), rust_write_data.len() as u32)
959        };
960        assert_eq!(cpp_read_bytes, rust_write_data.len() as i32);
961        assert_eq!(&cpp_read_buf[..cpp_read_bytes as usize], rust_write_data);
962
963        // 4. Free on the C++ side.
964        unsafe {
965            cpp_spsc_free(spsc_ptr);
966        }
967    }
968}