ringbuf/rb/shared.rs
use super::{macros::rb_impl_init, utils::ranges};
#[cfg(feature = "alloc")]
use crate::traits::Split;
use crate::{
    storage::Storage,
    traits::{
        consumer::{impl_consumer_traits, Consumer},
        producer::{impl_producer_traits, Producer},
        Observer, RingBuffer, SplitRef,
    },
    wrap::{CachingCons, CachingProd},
};
#[cfg(feature = "alloc")]
use alloc::{boxed::Box, sync::Arc};
use core::{
    mem::{ManuallyDrop, MaybeUninit},
    num::NonZeroUsize,
    ptr,
    sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};
use crossbeam_utils::CachePadded;

#[cfg_attr(
    feature = "std",
    doc = r##"
```
use std::thread;
use ringbuf::{SharedRb, storage::Heap, traits::*};

let rb = SharedRb::<Heap<i32>>::new(256);
let (mut prod, mut cons) = rb.split();
thread::spawn(move || {
    prod.try_push(123).unwrap();
})
.join();
thread::spawn(move || {
    assert_eq!(cons.try_pop().unwrap(), 123);
})
.join();
```
"##
)]
pub struct SharedRb<S: Storage + ?Sized> {
    /// Consumer position; cache-padded to avoid false sharing with `write_index`.
    read_index: CachePadded<AtomicUsize>,
    /// Producer position; cache-padded to avoid false sharing with `read_index`.
    write_index: CachePadded<AtomicUsize>,
    /// Whether the consumer (read) half is currently held by some wrapper.
    read_held: AtomicBool,
    /// Whether the producer (write) half is currently held by some wrapper.
    write_held: AtomicBool,
    storage: S,
}

impl<S: Storage> SharedRb<S> {
    pub unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self {
        assert!(!storage.is_empty());
        Self {
            storage,
            read_index: CachePadded::new(AtomicUsize::new(read)),
            write_index: CachePadded::new(AtomicUsize::new(write)),
            read_held: AtomicBool::new(false),
            write_held: AtomicBool::new(false),
        }
    }
    pub unsafe fn into_raw_parts(self) -> (S, usize, usize) {
        let this = ManuallyDrop::new(self);
        (ptr::read(&this.storage), this.read_index(), this.write_index())
    }
}
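
// Added sketch (not part of the upstream file): a round trip through
// `into_raw_parts`/`from_raw_parts`, assuming the `Heap` storage and the
// `try_push`/`try_pop` trait methods shown in the doc example above. The safety
// contract is that the returned indices describe exactly the initialized region,
// so feeding them back unchanged is sound.
#[cfg(all(test, feature = "alloc"))]
mod raw_parts_sketch {
    use super::SharedRb;
    use crate::{storage::Heap, traits::*};

    #[test]
    fn round_trip() {
        let mut rb = SharedRb::<Heap<i32>>::new(4);
        {
            let (mut prod, _cons) = rb.split_ref();
            prod.try_push(1).unwrap();
            prod.try_push(2).unwrap();
        } // Dropping the halves commits their indices back to `rb`.
        let (storage, read, write) = unsafe { rb.into_raw_parts() };
        let mut rb = unsafe { SharedRb::from_raw_parts(storage, read, write) };
        let (_prod, mut cons) = rb.split_ref();
        assert_eq!(cons.try_pop(), Some(1));
        assert_eq!(cons.try_pop(), Some(2));
    }
}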

impl<S: Storage + ?Sized> Observer for SharedRb<S> {
    type Item = S::Item;

    #[inline]
    fn capacity(&self) -> NonZeroUsize {
        unsafe { NonZeroUsize::new_unchecked(self.storage.len()) }
    }

    #[inline]
    fn read_index(&self) -> usize {
        self.read_index.load(Ordering::Acquire)
    }
    #[inline]
    fn write_index(&self) -> usize {
        self.write_index.load(Ordering::Acquire)
    }

    unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&[MaybeUninit<S::Item>], &[MaybeUninit<S::Item>]) {
        let (first, second) = ranges(self.capacity(), start, end);
        (self.storage.slice(first), self.storage.slice(second))
    }
    unsafe fn unsafe_slices_mut(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
        let (first, second) = ranges(self.capacity(), start, end);
        (self.storage.slice_mut(first), self.storage.slice_mut(second))
    }

    #[inline]
    fn read_is_held(&self) -> bool {
        self.read_held.load(Ordering::Acquire)
    }
    #[inline]
    fn write_is_held(&self) -> bool {
        self.write_held.load(Ordering::Acquire)
    }
}
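
// Added sketch (illustrative, not from the original source): the indices exposed by
// `Observer` are not wrapped at `capacity`; they run over twice the capacity so that
// a full buffer can be distinguished from an empty one, and the `ranges` helper maps
// such an index pair onto at most two contiguous storage slices.
#[cfg(all(test, feature = "alloc"))]
mod index_sketch {
    use super::SharedRb;
    use crate::{storage::Heap, traits::*};

    #[test]
    fn indices_advance_monotonically() {
        let mut rb = SharedRb::<Heap<u8>>::new(2);
        assert_eq!((rb.read_index(), rb.write_index()), (0, 0));
        {
            let (mut prod, mut cons) = rb.split_ref();
            prod.try_push(1).unwrap();
            prod.try_push(2).unwrap();
            assert_eq!(cons.try_pop(), Some(1));
        } // Dropping the halves commits the cached indices.
        // `write_index` reaches 2 (== capacity) rather than wrapping back to 0.
        assert_eq!((rb.read_index(), rb.write_index()), (1, 2));
    }
}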

impl<S: Storage + ?Sized> Producer for SharedRb<S> {
    #[inline]
    unsafe fn set_write_index(&self, value: usize) {
        self.write_index.store(value, Ordering::Release);
    }
}

impl<S: Storage + ?Sized> Consumer for SharedRb<S> {
    #[inline]
    unsafe fn set_read_index(&self, value: usize) {
        self.read_index.store(value, Ordering::Release);
    }
}

impl<S: Storage + ?Sized> RingBuffer for SharedRb<S> {
    #[inline]
    unsafe fn hold_read(&self, flag: bool) -> bool {
        self.read_held.swap(flag, Ordering::AcqRel)
    }
    #[inline]
    unsafe fn hold_write(&self, flag: bool) -> bool {
        self.write_held.swap(flag, Ordering::AcqRel)
    }
}

impl<S: Storage + ?Sized> Drop for SharedRb<S> {
    fn drop(&mut self) {
        self.clear();
    }
}
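
// Added sketch (not in the upstream file): the `Drop` impl above clears the buffer,
// so items that were pushed but never popped are still dropped exactly once. `Heap`
// storage and an `Rc` marker are used here purely for illustration.
#[cfg(all(test, feature = "alloc"))]
mod drop_sketch {
    use super::SharedRb;
    use crate::{storage::Heap, traits::*};
    use alloc::rc::Rc;

    #[test]
    fn leftover_items_are_dropped() {
        let marker = Rc::new(());
        let mut rb = SharedRb::<Heap<Rc<()>>>::new(2);
        {
            let (mut prod, _cons) = rb.split_ref();
            prod.try_push(Rc::clone(&marker)).unwrap();
        } // The producer is dropped here, committing its write index.
        assert_eq!(Rc::strong_count(&marker), 2);
        drop(rb); // `Drop for SharedRb` calls `clear`, releasing the stored clone.
        assert_eq!(Rc::strong_count(&marker), 1);
    }
}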

#[cfg(feature = "alloc")]
impl<S: Storage> Split for SharedRb<S> {
    type Prod = CachingProd<Arc<Self>>;
    type Cons = CachingCons<Arc<Self>>;

    fn split(self) -> (Self::Prod, Self::Cons) {
        Arc::new(self).split()
    }
}
#[cfg(feature = "alloc")]
impl<S: Storage + ?Sized> Split for Arc<SharedRb<S>> {
    type Prod = CachingProd<Self>;
    type Cons = CachingCons<Self>;

    fn split(self) -> (Self::Prod, Self::Cons) {
        (CachingProd::new(self.clone()), CachingCons::new(self))
    }
}
#[cfg(feature = "alloc")]
impl<S: Storage + ?Sized> Split for Box<SharedRb<S>> {
    type Prod = CachingProd<Arc<SharedRb<S>>>;
    type Cons = CachingCons<Arc<SharedRb<S>>>;

    fn split(self) -> (Self::Prod, Self::Cons) {
        Arc::<SharedRb<S>>::from(self).split()
    }
}
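
// Added sketch (not part of the original file): splitting directly from an `Arc`
// leaves other clones of that `Arc` usable as passive observers through the
// `Observer` trait. `occupied_len` is assumed to be a provided `Observer` method.
#[cfg(all(test, feature = "alloc"))]
mod arc_split_sketch {
    use super::SharedRb;
    use crate::{storage::Heap, traits::*};
    use alloc::sync::Arc;

    #[test]
    fn split_from_arc() {
        let rb = Arc::new(SharedRb::<Heap<i32>>::new(8));
        let obs = Arc::clone(&rb);
        let (mut prod, mut cons) = rb.split();
        prod.try_push(7).unwrap();
        drop(prod); // Dropping the producer guarantees its index has been committed.
        assert_eq!(obs.occupied_len(), 1);
        assert_eq!(cons.try_pop(), Some(7));
        drop(cons);
        assert!(obs.is_empty());
    }
}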
impl<S: Storage + ?Sized> SplitRef for SharedRb<S> {
    type RefProd<'a> = CachingProd<&'a Self> where Self: 'a;
    type RefCons<'a> = CachingCons<&'a Self> where Self: 'a;

    fn split_ref(&mut self) -> (Self::RefProd<'_>, Self::RefCons<'_>) {
        (CachingProd::new(self), CachingCons::new(self))
    }
}
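
// Added sketch (illustrative): unlike `split`, `split_ref` only borrows the ring
// buffer, so the same buffer can be split again after both borrowed halves have been
// dropped. `Heap` storage is used here only for convenient construction.
#[cfg(all(test, feature = "alloc"))]
mod split_ref_sketch {
    use super::SharedRb;
    use crate::{storage::Heap, traits::*};

    #[test]
    fn halves_can_be_reobtained() {
        let mut rb = SharedRb::<Heap<i32>>::new(4);
        {
            let (mut prod, _cons) = rb.split_ref();
            prod.try_push(1).unwrap();
        }
        // Both borrowed halves are gone; splitting again picks up where we left off.
        let (_prod, mut cons) = rb.split_ref();
        assert_eq!(cons.try_pop(), Some(1));
    }
}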

rb_impl_init!(SharedRb);

impl_producer_traits!(SharedRb<S: Storage>);
impl_consumer_traits!(SharedRb<S: Storage>);

impl<S: Storage + ?Sized> AsRef<Self> for SharedRb<S> {
    fn as_ref(&self) -> &Self {
        self
    }
}
impl<S: Storage + ?Sized> AsMut<Self> for SharedRb<S> {
    fn as_mut(&mut self) -> &mut Self {
        self
    }
}