1#![no_std]
6
7use core::sync::atomic::{AtomicU64, Ordering};
8use core::{cmp, ptr, slice};
9pub use kalloc::NoOpAllocator;
10use kalloc::{Allocator, Box, DefaultAllocator};
11use zx_status::Status;
12
13#[derive(Copy, Clone, Debug, PartialEq, Eq)]
15struct RingPointers {
16 read: u32,
17 write: u32,
18}
19
20impl RingPointers {
21 const fn new(read: u32, write: u32) -> Self {
23 Self { read, write }
24 }
25
26 const fn from_combined(combined: u64) -> Self {
28 Self::new((combined >> 32) as u32, combined as u32)
29 }
30
31 const fn as_combined(&self) -> u64 {
33 ((self.read as u64) << 32) | (self.write as u64)
34 }
35
36 const fn available_data(&self) -> u32 {
38 self.write.wrapping_sub(self.read)
39 }
40}
41
42#[derive(Debug)]
49pub struct Reservation<'a> {
50 combined_pointers: &'a AtomicU64,
51 storage_len: u32,
52 initial_ring_pointers: RingPointers,
53 region1: &'a mut [u8],
54 region2: &'a mut [u8],
55 write_offset: u32,
56 committed: bool,
57}
58
59impl<'a> Drop for Reservation<'a> {
60 fn drop(&mut self) {
61 debug_assert!(self.committed, "Reservation dropped without being committed");
62 }
63}
64
65impl<'a> Reservation<'a> {
66 pub fn write(&mut self, data: &[u8]) -> Result<(), Status> {
68 if self.committed {
69 return Err(Status::BAD_STATE);
70 }
71
72 let mut bytes_to_copy = data.len();
73 let mut region1_copy_amount = 0;
74 let mut write_offset = self.write_offset as usize;
75
76 let region1_len = self.region1.len();
77 if write_offset < region1_len {
78 let space_left_in_region1 = region1_len - write_offset;
79 region1_copy_amount = cmp::min(bytes_to_copy, space_left_in_region1);
80
81 self.region1[write_offset..write_offset + region1_copy_amount]
82 .copy_from_slice(&data[..region1_copy_amount]);
83
84 write_offset += region1_copy_amount;
85 bytes_to_copy -= region1_copy_amount;
86 }
87
88 if bytes_to_copy > 0 {
89 if write_offset < region1_len {
90 return Err(Status::BAD_STATE);
91 }
92 let region2_len = self.region2.len();
93 let region2_offset = write_offset - region1_len;
94 if region2_len < region2_offset {
95 return Err(Status::BAD_STATE);
96 }
97 if region2_len - region2_offset < bytes_to_copy {
98 return Err(Status::BUFFER_TOO_SMALL);
99 }
100
101 self.region2[region2_offset..region2_offset + bytes_to_copy]
102 .copy_from_slice(&data[region1_copy_amount..region1_copy_amount + bytes_to_copy]);
103
104 write_offset += bytes_to_copy;
105 }
106
107 self.write_offset = write_offset as u32;
108 Ok(())
109 }
110
111 pub fn commit(mut self) -> Result<(), Status> {
116 if self.committed {
117 return Err(Status::BAD_STATE);
118 }
119 self.committed = true;
120
121 let total_len =
122 self.region1.len().checked_add(self.region2.len()).ok_or(Status::BAD_STATE)? as u32;
123 if self.write_offset != total_len {
124 return Err(Status::BAD_STATE);
125 }
126
127 advance_write_pointer(
128 self.combined_pointers,
129 self.storage_len,
130 self.initial_ring_pointers,
131 total_len,
132 )
133 }
134}
135
136#[repr(C, align(8))]
146pub struct Buffer<A: Allocator + Default = DefaultAllocator> {
147 combined_pointers: AtomicU64,
150 storage: *mut u8,
152 size: usize,
155 _phantom: core::marker::PhantomData<A>,
156}
157
158impl<A: Allocator + Default> Drop for Buffer<A> {
159 fn drop(&mut self) {
160 if !self.storage.is_null() {
161 let slice_ptr = ptr::slice_from_raw_parts_mut(self.storage, self.size);
162 unsafe {
163 let _ = Box::from_raw_in(slice_ptr, A::default());
164 }
165 }
166 }
167}
168
169impl<A: Allocator + Default> Buffer<A> {
170 const MAX_STORAGE_SIZE: u32 = 1 << 31;
172
173 pub fn try_new_in(size: u32, allocator: A) -> Result<Self, Status> {
176 if size > Self::MAX_STORAGE_SIZE {
177 return Err(Status::INVALID_ARGS);
178 }
179 if !size.is_power_of_two() {
180 return Err(Status::INVALID_ARGS);
181 }
182
183 let storage_box = Box::<[u8], A>::try_new_zeroed_slice_in(size as usize, allocator)
184 .map_err(|_| Status::NO_MEMORY)?;
185 let (storage_ptr, _) = Box::into_raw_with_allocator(storage_box);
186
187 Ok(Self {
188 combined_pointers: AtomicU64::new(0),
189 storage: storage_ptr as *mut u8,
190 size: size as usize,
191 _phantom: core::marker::PhantomData,
192 })
193 }
194
195 pub fn size(&self) -> u32 {
197 self.size as u32
198 }
199
200 pub fn reserve(&mut self, size: u32) -> Result<Reservation<'_>, Status> {
205 if size == 0 || size > Self::MAX_STORAGE_SIZE {
206 return Err(Status::INVALID_ARGS);
207 }
208
209 let storage_len = self.size as u32;
210 if size > storage_len {
211 return Err(Status::NO_SPACE);
212 }
213
214 let initial_state = self.load_pointers();
215 let available_space = self.available_space(initial_state);
216 if available_space < size {
217 return Err(Status::NO_SPACE);
218 }
219
220 let write_offset = self.pointer_to_offset(initial_state.write);
221 let ring_break_distance = storage_len - write_offset;
222 let bytes_before_break = cmp::min(size, ring_break_distance);
223
224 let storage_slice = unsafe { slice::from_raw_parts_mut(self.storage, self.size) };
227 let (left, right) = storage_slice.split_at_mut(write_offset as usize);
228 let region1 = &mut right[..bytes_before_break as usize];
229
230 let region2 = if bytes_before_break < size {
231 let region2_len = size - bytes_before_break;
232 &mut left[..region2_len as usize]
233 } else {
234 &mut []
235 };
236
237 Ok(Reservation {
238 combined_pointers: &self.combined_pointers,
239 storage_len,
240 initial_ring_pointers: initial_state,
241 region1,
242 region2,
243 write_offset: 0,
244 committed: false,
245 })
246 }
247
248 pub fn read<F>(&self, mut copy_fn: F, len: u32) -> Result<u32, Status>
262 where
263 F: FnMut(u32, &[u8]) -> Result<(), Status>,
264 {
265 let initial_state = self.load_pointers();
266 let available_data = initial_state.available_data();
267 if available_data == 0 {
268 return Ok(0);
269 }
270
271 let amount_to_copy = cmp::min(available_data, len) as usize;
272 let read_offset = self.pointer_to_offset(initial_state.read) as usize;
273 let ring_break_distance = self.size - read_offset;
274 let bytes_before_break = cmp::min(amount_to_copy, ring_break_distance);
275
276 let storage_slice = unsafe { slice::from_raw_parts(self.storage, self.size) };
279 let slice1 = &storage_slice[read_offset..read_offset + bytes_before_break];
280 copy_fn(0, slice1)?;
281
282 if bytes_before_break < amount_to_copy {
283 let bytes_after_break = amount_to_copy - bytes_before_break;
284 let slice2 = &storage_slice[..bytes_after_break];
285 copy_fn(bytes_before_break as u32, slice2)?;
286 }
287
288 self.advance_read_pointer(initial_state, amount_to_copy as u32)?;
289 Ok(amount_to_copy as u32)
290 }
291
292 pub fn drain(&self) -> Result<(), Status> {
297 let initial_state = self.load_pointers();
298 let available_data = initial_state.available_data();
299 if available_data == 0 {
300 return Ok(());
301 }
302 self.advance_read_pointer(initial_state, available_data)
303 }
304
305 fn pointer_to_offset(&self, pointer: u32) -> u32 {
310 let storage_len = self.size as u32;
311 pointer & (storage_len - 1)
312 }
313
314 fn available_space(&self, pointers: RingPointers) -> u32 {
316 let storage_len = self.size as u32;
317 storage_len.wrapping_sub(pointers.available_data())
318 }
319
320 fn load_pointers(&self) -> RingPointers {
322 let combined = self.combined_pointers.load(Ordering::Acquire);
323 RingPointers::from_combined(combined)
324 }
325
326 fn advance_read_pointer(&self, initial: RingPointers, delta: u32) -> Result<(), Status> {
337 if delta > initial.available_data() {
338 return Err(Status::INVALID_ARGS);
339 }
340
341 let target_read = initial.read.wrapping_add(delta);
342 let mut starting_pointers = initial.as_combined();
343 let mut target_pointers = RingPointers::new(target_read, initial.write).as_combined();
344
345 loop {
346 match self.combined_pointers.compare_exchange_weak(
347 starting_pointers,
348 target_pointers,
349 Ordering::Release,
350 Ordering::Relaxed,
351 ) {
352 Ok(_) => break,
353 Err(observed_combined) => {
354 starting_pointers = observed_combined;
355 let observed = RingPointers::from_combined(observed_combined);
356 debug_assert_eq!(
357 observed.read, initial.read,
358 "potential concurrent read detected; expected read pointer {}, got {}",
359 initial.read, observed.read
360 );
361 target_pointers = RingPointers::new(target_read, observed.write).as_combined();
362 }
363 }
364 }
365 Ok(())
366 }
367}
368
369fn advance_write_pointer(
380 combined_pointers: &AtomicU64,
381 storage_len: u32,
382 initial: RingPointers,
383 delta: u32,
384) -> Result<(), Status> {
385 let available_data = initial.available_data();
386 if delta > storage_len.checked_sub(available_data).ok_or(Status::INVALID_ARGS)? {
387 return Err(Status::INVALID_ARGS);
388 }
389
390 let target_write = initial.write.wrapping_add(delta);
391 let mut starting_pointers = initial.as_combined();
392 let mut target_pointers = RingPointers::new(initial.read, target_write).as_combined();
393
394 loop {
395 match combined_pointers.compare_exchange_weak(
396 starting_pointers,
397 target_pointers,
398 Ordering::Release,
399 Ordering::Relaxed,
400 ) {
401 Ok(_) => break,
402 Err(observed_combined) => {
403 starting_pointers = observed_combined;
404 let observed = RingPointers::from_combined(observed_combined);
405 debug_assert_eq!(
406 observed.write, initial.write,
407 "potential concurrent write detected; expected write pointer {}, got {}",
408 initial.write, observed.write
409 );
410 target_pointers = RingPointers::new(observed.read, target_write).as_combined();
411 }
412 }
413 }
414 Ok(())
415}
416
417impl Buffer<DefaultAllocator> {
418 pub fn try_new(size: u32) -> Result<Self, Status> {
421 Self::try_new_in(size, DefaultAllocator)
422 }
423}
424
425impl Buffer<NoOpAllocator> {
426 pub unsafe fn from_raw_parts(storage: *mut u8, size: usize) -> Self {
435 Self {
436 combined_pointers: AtomicU64::new(0),
437 storage,
438 size,
439 _phantom: core::marker::PhantomData,
440 }
441 }
442}
443
444#[cfg(test)]
445mod tests {
446 use super::*;
447
448 #[test]
449 fn test_ring_pointers_from_combined() {
450 struct TestCase {
451 combined_pointers: u64,
452 expected: RingPointers,
453 }
454 let test_cases = [
455 TestCase {
456 combined_pointers: 0x01320087_01005382,
457 expected: RingPointers::new(0x1320087, 0x1005382),
458 },
459 TestCase { combined_pointers: 0, expected: RingPointers::new(0, 0) },
460 TestCase {
461 combined_pointers: 0xFFFFFFFF_00004587,
462 expected: RingPointers::new(0xFFFFFFFF, 0x4587),
463 },
464 TestCase {
465 combined_pointers: 0x00004587_FFFFFFFF,
466 expected: RingPointers::new(0x4587, 0xFFFFFFFF),
467 },
468 ];
469
470 for tc in &test_cases {
471 let actual = RingPointers::from_combined(tc.combined_pointers);
472 assert_eq!(tc.expected, actual);
473 }
474 }
475
476 #[test]
477 fn test_ring_pointers_as_combined() {
478 struct TestCase {
479 pointers: RingPointers,
480 combined: u64,
481 }
482 let test_cases = [
483 TestCase {
484 pointers: RingPointers::new(0x1320087, 0x1005382),
485 combined: 0x01320087_01005382,
486 },
487 TestCase { pointers: RingPointers::new(0, 0), combined: 0 },
488 TestCase {
489 pointers: RingPointers::new(0xFFFFFFFF, 0x123),
490 combined: 0xFFFFFFFF_00000123,
491 },
492 TestCase {
493 pointers: RingPointers::new(0x123, 0xFFFFFFFF),
494 combined: 0x00000123_FFFFFFFF,
495 },
496 ];
497
498 for tc in &test_cases {
499 let actual = tc.pointers.as_combined();
500 assert_eq!(tc.combined, actual);
501 }
502 }
503
504 #[test]
505 fn test_available_space() {
506 struct TestCase {
507 pointers: RingPointers,
508 buffer_size: u32,
509 expected: u32,
510 }
511 let test_cases = [
512 TestCase { pointers: RingPointers::new(0, 0), buffer_size: 16, expected: 16 },
513 TestCase { pointers: RingPointers::new(3, 3), buffer_size: 16, expected: 16 },
514 TestCase { pointers: RingPointers::new(3, 7), buffer_size: 16, expected: 12 },
515 TestCase { pointers: RingPointers::new(0xFFFFFFFC, 3), buffer_size: 16, expected: 9 },
516 TestCase { pointers: RingPointers::new(0, 16), buffer_size: 16, expected: 0 },
517 ];
518
519 for tc in &test_cases {
520 let buffer = Buffer::try_new(tc.buffer_size).unwrap();
521 let actual = buffer.available_space(tc.pointers);
522 assert_eq!(tc.expected, actual);
523 }
524 }
525
526 #[test]
527 fn test_ring_pointers_available_data() {
528 struct TestCase {
529 pointers: RingPointers,
530 expected: u32,
531 }
532 let test_cases = [
533 TestCase { pointers: RingPointers::new(0, 0), expected: 0 },
534 TestCase { pointers: RingPointers::new(3, 3), expected: 0 },
535 TestCase { pointers: RingPointers::new(3, 7), expected: 4 },
536 TestCase { pointers: RingPointers::new(0, 16), expected: 16 },
537 TestCase { pointers: RingPointers::new(0xFFFFFFFC, 3), expected: 7 },
538 ];
539
540 for tc in &test_cases {
541 let actual = tc.pointers.available_data();
542 assert_eq!(tc.expected, actual);
543 }
544 }
545
546 #[test]
547 fn test_advance_read_pointer() {
548 struct TestCase {
549 initial_pointers: RingPointers,
550 buffer_size: u32,
551 delta: u32,
552 expected: u64,
553 }
554 let test_cases = [
555 TestCase {
556 initial_pointers: RingPointers::new(1, 6),
557 buffer_size: 16,
558 delta: 4,
559 expected: 0x5_00000006,
560 },
561 TestCase {
562 initial_pointers: RingPointers::new(0, 16),
563 buffer_size: 16,
564 delta: 16,
565 expected: 0x10_00000010,
566 },
567 TestCase {
568 initial_pointers: RingPointers::new(0xFFFFFFFC, 12),
569 buffer_size: 16,
570 delta: 5,
571 expected: 0x1_0000000C,
572 },
573 ];
574
575 for tc in &test_cases {
576 let buffer = Buffer::try_new(tc.buffer_size).unwrap();
577
578 let initial = tc.initial_pointers.as_combined();
579 buffer.combined_pointers.store(initial, Ordering::Release);
580
581 buffer.advance_read_pointer(tc.initial_pointers, tc.delta).unwrap();
582 let actual = buffer.combined_pointers.load(Ordering::Acquire);
583 assert_eq!(tc.expected, actual);
584 }
585 }
586
587 #[test]
588 fn test_advance_write_pointer() {
589 struct TestCase {
590 initial_pointers: RingPointers,
591 buffer_size: u32,
592 delta: u32,
593 expected: u64,
594 }
595 let test_cases = [
596 TestCase {
597 initial_pointers: RingPointers::new(7, 9),
598 buffer_size: 16,
599 delta: 4,
600 expected: 0x7_0000000D,
601 },
602 TestCase {
603 initial_pointers: RingPointers::new(0, 0),
604 buffer_size: 16,
605 delta: 16,
606 expected: 0x10,
607 },
608 TestCase {
609 initial_pointers: RingPointers::new(0xFFFFFFF1, 0xFFFFFFFC),
610 buffer_size: 16,
611 delta: 5,
612 expected: 0xFFFFFFF100000001,
613 },
614 ];
615
616 for tc in &test_cases {
617 let buffer = Buffer::try_new(tc.buffer_size).unwrap();
618
619 let initial = tc.initial_pointers.as_combined();
620 buffer.combined_pointers.store(initial, Ordering::Release);
621
622 advance_write_pointer(
623 &buffer.combined_pointers,
624 buffer.size as u32,
625 tc.initial_pointers,
626 tc.delta,
627 )
628 .unwrap();
629 let actual = buffer.combined_pointers.load(Ordering::Acquire);
630 assert_eq!(tc.expected, actual);
631 }
632 }
633
634 #[test]
635 fn test_try_new() {
636 {
638 assert!(Buffer::try_new(256).is_ok());
639 }
640
641 {
643 assert_eq!(Buffer::try_new(u32::MAX).err().unwrap(), Status::INVALID_ARGS);
644 }
645
646 {
648 assert_eq!(Buffer::try_new(100).err().unwrap(), Status::INVALID_ARGS);
649 }
650
651 {
653 assert_eq!(Buffer::try_new_in(256, NoOpAllocator).err().unwrap(), Status::NO_MEMORY);
654 }
655 }
656
657 #[test]
658 fn test_read_write_single_threaded() {
659 const STORAGE_SIZE: usize = 256;
660 let mut src = [0u8; STORAGE_SIZE];
661 for i in 0..STORAGE_SIZE {
662 src[i] = (i * 17 + 5) as u8;
663 }
664
665 struct TestCase {
666 write_size: u32,
667 read_size: u32,
668 expected_read_size: u32,
669 expected_reserve_status: Result<(), Status>,
670 expected_read_status: Result<(), Status>,
671 initial_pointers: RingPointers,
672 use_copy_out_err_fn: bool,
673 }
674
675 let test_cases = [
676 TestCase {
677 write_size: (STORAGE_SIZE / 2) as u32,
678 read_size: (STORAGE_SIZE / 2) as u32,
679 expected_read_size: (STORAGE_SIZE / 2) as u32,
680 expected_reserve_status: Ok(()),
681 expected_read_status: Ok(()),
682 initial_pointers: RingPointers::new(0, 0),
683 use_copy_out_err_fn: false,
684 },
685 TestCase {
686 write_size: (STORAGE_SIZE / 2) as u32,
687 read_size: (STORAGE_SIZE / 4) as u32,
688 expected_read_size: (STORAGE_SIZE / 4) as u32,
689 expected_reserve_status: Ok(()),
690 expected_read_status: Ok(()),
691 initial_pointers: RingPointers::new(0, 0),
692 use_copy_out_err_fn: false,
693 },
694 TestCase {
695 write_size: STORAGE_SIZE as u32,
696 read_size: STORAGE_SIZE as u32,
697 expected_read_size: STORAGE_SIZE as u32,
698 expected_reserve_status: Ok(()),
699 expected_read_status: Ok(()),
700 initial_pointers: RingPointers::new(0, 0),
701 use_copy_out_err_fn: false,
702 },
703 TestCase {
704 write_size: (STORAGE_SIZE / 4) as u32,
705 read_size: (STORAGE_SIZE / 2) as u32,
706 expected_read_size: (STORAGE_SIZE / 4) as u32,
707 expected_reserve_status: Ok(()),
708 expected_read_status: Ok(()),
709 initial_pointers: RingPointers::new(0, 0),
710 use_copy_out_err_fn: false,
711 },
712 TestCase {
713 write_size: STORAGE_SIZE as u32,
714 read_size: STORAGE_SIZE as u32,
715 expected_read_size: STORAGE_SIZE as u32,
716 expected_reserve_status: Ok(()),
717 expected_read_status: Ok(()),
718 initial_pointers: RingPointers::new(
719 (STORAGE_SIZE / 2) as u32,
720 (STORAGE_SIZE / 2) as u32,
721 ),
722 use_copy_out_err_fn: false,
723 },
724 TestCase {
725 write_size: STORAGE_SIZE as u32,
726 read_size: STORAGE_SIZE as u32,
727 expected_read_size: STORAGE_SIZE as u32,
728 expected_reserve_status: Ok(()),
729 expected_read_status: Ok(()),
730 initial_pointers: RingPointers::new(0xFFFFFFFA, 0xFFFFFFFA),
731 use_copy_out_err_fn: false,
732 },
733 TestCase {
734 write_size: 64,
735 read_size: 0,
736 expected_read_size: 0,
737 expected_reserve_status: Err(Status::NO_SPACE),
738 expected_read_status: Ok(()),
739 initial_pointers: RingPointers::new(0, (STORAGE_SIZE - 48) as u32),
740 use_copy_out_err_fn: false,
741 },
742 TestCase {
743 write_size: STORAGE_SIZE as u32,
744 read_size: (STORAGE_SIZE / 2) as u32,
745 expected_read_size: 0,
746 expected_reserve_status: Ok(()),
747 expected_read_status: Err(Status::BAD_STATE),
748 initial_pointers: RingPointers::new(0, 0),
749 use_copy_out_err_fn: true,
750 },
751 ];
752
753 for tc in &test_cases {
754 let mut dst = [0u8; STORAGE_SIZE];
755
756 let mut spsc = Buffer::try_new(STORAGE_SIZE as u32).unwrap();
757
758 let starting_pointers = tc.initial_pointers.as_combined();
759 spsc.combined_pointers.store(starting_pointers, Ordering::Release);
760
761 let reservation = spsc.reserve(tc.write_size);
762 if let Err(e) = &tc.expected_reserve_status {
763 match reservation {
764 Err(actual_err) => assert_eq!(actual_err, *e),
765 Ok(_) => panic!("expected reserve to fail with {:?}, but it succeeded", e),
766 }
767 continue;
768 }
769 let mut reservation = reservation.unwrap();
770
771 reservation.write(&src[..tc.write_size as usize]).unwrap();
772 reservation.commit().unwrap();
773
774 let copy_out_fn = |offset: u32, src_slice: &[u8]| -> Result<(), Status> {
775 let offset = offset as usize;
776 assert!(offset + src_slice.len() <= dst.len());
777 dst[offset..offset + src_slice.len()].copy_from_slice(src_slice);
778 Ok(())
779 };
780
781 let copy_out_err_fn =
782 |_offset: u32, _src_slice: &[u8]| -> Result<(), Status> { Err(Status::BAD_STATE) };
783
784 let read_result = if tc.use_copy_out_err_fn {
785 spsc.read(copy_out_err_fn, tc.read_size)
786 } else {
787 spsc.read(copy_out_fn, tc.read_size)
788 };
789
790 if let Err(e) = &tc.expected_read_status {
791 assert_eq!(read_result.unwrap_err(), *e);
792 assert_eq!(spsc.load_pointers().available_data(), tc.write_size);
793 continue;
794 }
795
796 let read_bytes = read_result.unwrap();
797 assert_eq!(read_bytes, tc.expected_read_size);
798 assert_eq!(
799 &dst[..tc.expected_read_size as usize],
800 &src[..tc.expected_read_size as usize]
801 );
802 }
803 }
804
805 #[test]
806 fn test_drain() {
807 const STORAGE_SIZE: u32 = 256;
808 let mut spsc = Buffer::try_new(STORAGE_SIZE).unwrap();
809
810 let mut reservation = spsc.reserve(STORAGE_SIZE / 2).unwrap();
811 let write_data = [b'f'; (STORAGE_SIZE / 2) as usize];
812 reservation.write(&write_data).unwrap();
813 reservation.commit().unwrap();
814
815 assert_eq!(spsc.load_pointers().available_data(), STORAGE_SIZE / 2);
816
817 spsc.drain().unwrap();
818 assert_eq!(spsc.load_pointers().available_data(), 0);
819 }
820
821 #[test]
822 fn test_commit_error() {
823 const STORAGE_SIZE: u32 = 256;
824 let mut spsc = Buffer::try_new(STORAGE_SIZE).unwrap();
825
826 let mut reservation = spsc.reserve(STORAGE_SIZE / 2).unwrap();
827 let write_data = [b'f'; (STORAGE_SIZE / 2) as usize - 1];
829 reservation.write(&write_data).unwrap();
830 assert_eq!(reservation.commit(), Err(Status::BAD_STATE));
831 }
832
833 #[test]
834 fn test_write_error() {
835 const STORAGE_SIZE: u32 = 256;
836 let mut spsc = Buffer::try_new(STORAGE_SIZE).unwrap();
837
838 let mut reservation = spsc.reserve(STORAGE_SIZE / 2).unwrap();
839 let write_data = [b'f'; (STORAGE_SIZE / 2) as usize + 1];
841 assert_eq!(reservation.write(&write_data), Err(Status::BUFFER_TOO_SMALL));
842 reservation.committed = true;
843 }
844
845 #[test]
846 fn test_from_raw_parts() {
847 let mut mock_storage = [0u8; 256];
848
849 let mut spsc =
851 unsafe { Buffer::from_raw_parts(mock_storage.as_mut_ptr(), mock_storage.len()) };
852
853 let mut reservation = spsc.reserve(100).unwrap();
855 let write_data = [b'x'; 100];
856 reservation.write(&write_data).unwrap();
857 reservation.commit().unwrap();
858
859 assert_eq!(spsc.load_pointers().available_data(), 100);
860 assert_eq!(spsc.combined_pointers.load(Ordering::Relaxed) & 0xffffffff, 100); assert_eq!(&mock_storage[..100], &write_data[..]);
862 }
863
864 #[test]
865 fn test_reserve_zero() {
866 const STORAGE_SIZE: u32 = 256;
867 let mut spsc = Buffer::try_new(STORAGE_SIZE).unwrap();
868 match spsc.reserve(0) {
869 Err(e) => assert_eq!(e, Status::INVALID_ARGS),
870 Ok(_) => panic!("reserve(0) should fail with INVALID_ARGS"),
871 }
872 }
873
874 #[test]
875 fn test_reserve_too_large() {
876 const STORAGE_SIZE: u32 = 256;
877 let mut spsc = Buffer::try_new(STORAGE_SIZE).unwrap();
878 match spsc.reserve(u32::MAX) {
879 Err(e) => assert_eq!(e, Status::INVALID_ARGS),
880 Ok(_) => panic!("reserve(u32::MAX) should fail with INVALID_ARGS"),
881 }
882 }
883
884 #[test]
885 fn test_reserve_at_break() {
886 const STORAGE_SIZE: u32 = 16;
887 let mut mock_storage = [0u8; STORAGE_SIZE as usize];
892
893 let mut spsc =
895 unsafe { Buffer::from_raw_parts(mock_storage.as_mut_ptr(), mock_storage.len()) };
896 spsc.combined_pointers.store((3u64 << 32) | 15u64, Ordering::Release);
897
898 let mut reservation = spsc.reserve(4).unwrap();
899 assert_eq!(reservation.region1.len(), 1);
900 assert_eq!(reservation.region2.len(), 3);
901
902 let data = [1, 2, 3, 4];
903 reservation.write(&data).unwrap();
904 reservation.commit().unwrap();
905
906 assert_eq!(mock_storage[15], 1);
907 assert_eq!(mock_storage[0], 2);
908 assert_eq!(mock_storage[1], 3);
909 assert_eq!(mock_storage[2], 4);
910 }
911
912 #[test]
913 fn test_cpp_rust_integration() {
914 #[link(name = "c++")]
915 unsafe extern "C" {
916 fn cpp_spsc_allocate(size: u32) -> *mut Buffer<NoOpAllocator>;
917 fn cpp_spsc_free(spsc: *mut Buffer<NoOpAllocator>);
918 fn cpp_spsc_write(spsc: *mut Buffer<NoOpAllocator>, data: *const u8, len: u32) -> i32;
919 fn cpp_spsc_read(spsc: *mut Buffer<NoOpAllocator>, dst: *mut u8, len: u32) -> i32;
920 }
921
922 let spsc_ptr = unsafe { cpp_spsc_allocate(256) };
924 assert!(!spsc_ptr.is_null());
925
926 let spsc = unsafe { &mut *spsc_ptr };
928
929 let write_data = b"Hello from C++!";
931 let cpp_write_status =
932 unsafe { cpp_spsc_write(spsc_ptr, write_data.as_ptr(), write_data.len() as u32) };
933 assert_eq!(cpp_write_status, 0); let mut read_buf = [0u8; 100];
937 let bytes_read = spsc
938 .read(
939 |_, src| {
940 read_buf[..src.len()].copy_from_slice(src);
941 Ok(())
942 },
943 write_data.len() as u32,
944 )
945 .unwrap();
946 assert_eq!(bytes_read, write_data.len() as u32);
947 assert_eq!(&read_buf[..bytes_read as usize], write_data);
948
949 let rust_write_data = b"Hello from Rust!";
951 let mut reservation = spsc.reserve(rust_write_data.len() as u32).unwrap();
952 reservation.write(rust_write_data).unwrap();
953 reservation.commit().unwrap();
954
955 let mut cpp_read_buf = [0u8; 100];
957 let cpp_read_bytes = unsafe {
958 cpp_spsc_read(spsc_ptr, cpp_read_buf.as_mut_ptr(), rust_write_data.len() as u32)
959 };
960 assert_eq!(cpp_read_bytes, rust_write_data.len() as i32);
961 assert_eq!(&cpp_read_buf[..cpp_read_bytes as usize], rust_write_data);
962
963 unsafe {
965 cpp_spsc_free(spsc_ptr);
966 }
967 }
968}