ringbuf/rb/
shared.rs

1use super::{macros::rb_impl_init, utils::ranges};
2#[cfg(feature = "alloc")]
3use crate::traits::Split;
4use crate::{
5    storage::Storage,
6    traits::{
7        consumer::{impl_consumer_traits, Consumer},
8        producer::{impl_producer_traits, Producer},
9        Observer, RingBuffer, SplitRef,
10    },
11    wrap::{CachingCons, CachingProd},
12};
13use core::{
14    mem::{ManuallyDrop, MaybeUninit},
15    num::NonZeroUsize,
16    ptr,
17};
18use crossbeam_utils::CachePadded;
19
20#[cfg(not(feature = "portable-atomic"))]
21use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22#[cfg(feature = "portable-atomic")]
23use portable_atomic::{AtomicBool, AtomicUsize, Ordering};
24#[cfg(feature = "alloc")]
25use {crate::alias::Arc, alloc::boxed::Box};
26
27/// Ring buffer that can be shared between threads.
28///
29/// Note that there is no explicit requirement of `T: Send`. Instead ring buffer will work just fine even with `T: !Send`
30/// until you try to send its producer or consumer to another thread.
31#[cfg_attr(
32    feature = "std",
33    doc = r##"
34```
35use std::thread;
36use ringbuf::{SharedRb, storage::Heap, traits::*};
37
38let rb = SharedRb::<Heap<i32>>::new(256);
39let (mut prod, mut cons) = rb.split();
40thread::spawn(move || {
41    prod.try_push(123).unwrap();
42})
43.join();
44thread::spawn(move || {
45    assert_eq!(cons.try_pop().unwrap(), 123);
46})
47.join();
48```
49"##
50)]
51pub struct SharedRb<S: Storage + ?Sized> {
52    read_index: CachePadded<AtomicUsize>,
53    write_index: CachePadded<AtomicUsize>,
54    read_held: AtomicBool,
55    write_held: AtomicBool,
56    storage: S,
57}
58
59impl<S: Storage> SharedRb<S> {
60    /// Constructs ring buffer from storage and indices.
61    ///
62    /// # Safety
63    ///
64    /// The items in storage inside `read..write` range must be initialized, items outside this range must be uninitialized.
65    /// `read` and `write` positions must be valid (see implementation details).
66    pub unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self {
67        assert!(!storage.is_empty());
68        Self {
69            storage,
70            read_index: CachePadded::new(AtomicUsize::new(read)),
71            write_index: CachePadded::new(AtomicUsize::new(write)),
72            read_held: AtomicBool::new(false),
73            write_held: AtomicBool::new(false),
74        }
75    }
76    /// Destructures ring buffer into underlying storage and `read` and `write` indices.
77    ///
78    /// # Safety
79    ///
80    /// Initialized contents of the storage must be properly dropped.
81    pub unsafe fn into_raw_parts(self) -> (S, usize, usize) {
82        let this = ManuallyDrop::new(self);
83        (ptr::read(&this.storage), this.read_index(), this.write_index())
84    }
85}
86
87impl<S: Storage + ?Sized> Observer for SharedRb<S> {
88    type Item = S::Item;
89
90    #[inline]
91    fn capacity(&self) -> NonZeroUsize {
92        unsafe { NonZeroUsize::new_unchecked(self.storage.len()) }
93    }
94
95    #[inline]
96    fn read_index(&self) -> usize {
97        self.read_index.load(Ordering::Acquire)
98    }
99    #[inline]
100    fn write_index(&self) -> usize {
101        self.write_index.load(Ordering::Acquire)
102    }
103
104    unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&[MaybeUninit<S::Item>], &[MaybeUninit<S::Item>]) {
105        let (first, second) = ranges(self.capacity(), start, end);
106        (self.storage.slice(first), self.storage.slice(second))
107    }
108    unsafe fn unsafe_slices_mut(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
109        let (first, second) = ranges(self.capacity(), start, end);
110        (self.storage.slice_mut(first), self.storage.slice_mut(second))
111    }
112
113    #[inline]
114    fn read_is_held(&self) -> bool {
115        self.read_held.load(Ordering::Acquire)
116    }
117    #[inline]
118    fn write_is_held(&self) -> bool {
119        self.write_held.load(Ordering::Acquire)
120    }
121}
122
impl<S: Storage + ?Sized> Producer for SharedRb<S> {
    /// Stores `value` as the new write position.
    ///
    /// Uses a `Release` store, which pairs with the `Acquire` load in `write_index`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is defined by the `Producer` trait, which is not visible here — confirm there.
    #[inline]
    unsafe fn set_write_index(&self, value: usize) {
        self.write_index.store(value, Ordering::Release);
    }
}
129
impl<S: Storage + ?Sized> Consumer for SharedRb<S> {
    /// Stores `value` as the new read position.
    ///
    /// Uses a `Release` store, which pairs with the `Acquire` load in `read_index`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is defined by the `Consumer` trait, which is not visible here — confirm there.
    #[inline]
    unsafe fn set_read_index(&self, value: usize) {
        self.read_index.store(value, Ordering::Release);
    }
}
136
impl<S: Storage + ?Sized> RingBuffer for SharedRb<S> {
    /// Atomically replaces the read-hold flag with `flag` and returns the previous value (`AcqRel` swap).
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is defined by the `RingBuffer` trait, which is not visible here — confirm there.
    #[inline]
    unsafe fn hold_read(&self, flag: bool) -> bool {
        self.read_held.swap(flag, Ordering::AcqRel)
    }
    /// Atomically replaces the write-hold flag with `flag` and returns the previous value (`AcqRel` swap).
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is defined by the `RingBuffer` trait, which is not visible here — confirm there.
    #[inline]
    unsafe fn hold_write(&self, flag: bool) -> bool {
        self.write_held.swap(flag, Ordering::AcqRel)
    }
}
147
impl<S: Storage + ?Sized> Drop for SharedRb<S> {
    /// Calls `clear` before the storage itself is dropped.
    // NOTE(review): `clear` is defined elsewhere; presumably it drops the items still in the buffer — confirm.
    fn drop(&mut self) {
        self.clear();
    }
}
153
/// Splitting an owned ring buffer moves it into an `Arc` and hands out the halves of that `Arc`.
#[cfg(feature = "alloc")]
impl<S: Storage> Split for SharedRb<S> {
    type Prod = CachingProd<Arc<Self>>;
    type Cons = CachingCons<Arc<Self>>;

    fn split(self) -> (Self::Prod, Self::Cons) {
        // Delegate to the `Split` impl for `Arc<SharedRb<S>>`.
        let shared = Arc::new(self);
        <Arc<Self> as Split>::split(shared)
    }
}
/// Splitting an `Arc`-wrapped ring buffer gives each half its own handle to the same buffer.
#[cfg(feature = "alloc")]
impl<S: Storage + ?Sized> Split for Arc<SharedRb<S>> {
    type Prod = CachingProd<Self>;
    type Cons = CachingCons<Self>;

    fn split(self) -> (Self::Prod, Self::Cons) {
        // The producer gets a cloned handle; the consumer takes over the original one.
        let producer = CachingProd::new(Arc::clone(&self));
        let consumer = CachingCons::new(self);
        (producer, consumer)
    }
}
/// Splitting a boxed ring buffer first converts the `Box` into an `Arc`, then splits that.
#[cfg(feature = "alloc")]
impl<S: Storage + ?Sized> Split for Box<SharedRb<S>> {
    type Prod = CachingProd<Arc<SharedRb<S>>>;
    type Cons = CachingCons<Arc<SharedRb<S>>>;

    fn split(self) -> (Self::Prod, Self::Cons) {
        let shared: Arc<SharedRb<S>> = Arc::from(self);
        shared.split()
    }
}
181impl<S: Storage + ?Sized> SplitRef for SharedRb<S> {
182    type RefProd<'a>
183        = CachingProd<&'a Self>
184    where
185        Self: 'a;
186    type RefCons<'a>
187        = CachingCons<&'a Self>
188    where
189        Self: 'a;
190
191    fn split_ref(&mut self) -> (Self::RefProd<'_>, Self::RefCons<'_>) {
192        (CachingProd::new(self), CachingCons::new(self))
193    }
194}
195
// NOTE(review): `rb_impl_init!` comes from `super::macros`; presumably it generates the
// constructor/initialization impls for `SharedRb` — confirm against the macro definition.
rb_impl_init!(SharedRb);

// NOTE(review): these macros come from `traits::producer` / `traits::consumer`; presumably they
// add the auxiliary producer- and consumer-side trait impls for `SharedRb` — confirm there.
impl_producer_traits!(SharedRb<S: Storage>);
impl_consumer_traits!(SharedRb<S: Storage>);
200
/// Identity conversion: a `SharedRb` can be borrowed as itself.
impl<S: Storage + ?Sized> AsRef<Self> for SharedRb<S> {
    /// Returns `self` unchanged.
    fn as_ref(&self) -> &Self {
        self
    }
}
/// Identity conversion: a `SharedRb` can be mutably borrowed as itself.
impl<S: Storage + ?Sized> AsMut<Self> for SharedRb<S> {
    /// Returns `self` unchanged.
    fn as_mut(&mut self) -> &mut Self {
        self
    }
}