ringbuf/rb/
shared.rs

1use super::{macros::rb_impl_init, utils::ranges};
2#[cfg(feature = "alloc")]
3use crate::traits::Split;
4use crate::{
5    storage::Storage,
6    traits::{
7        consumer::{impl_consumer_traits, Consumer},
8        producer::{impl_producer_traits, Producer},
9        Observer, RingBuffer, SplitRef,
10    },
11    wrap::{CachingCons, CachingProd},
12};
13#[cfg(feature = "alloc")]
14use alloc::{boxed::Box, sync::Arc};
15use core::{
16    mem::{ManuallyDrop, MaybeUninit},
17    num::NonZeroUsize,
18    ptr,
19    sync::atomic::{AtomicBool, AtomicUsize, Ordering},
20};
21use crossbeam_utils::CachePadded;
22
23/// Ring buffer that can be shared between threads.
24///
25/// Note that there is no explicit requirement of `T: Send`. Instead ring buffer will work just fine even with `T: !Send`
26/// until you try to send its producer or consumer to another thread.
27#[cfg_attr(
28    feature = "std",
29    doc = r##"
30```
31use std::thread;
32use ringbuf::{SharedRb, storage::Heap, traits::*};
33
34let rb = SharedRb::<Heap<i32>>::new(256);
35let (mut prod, mut cons) = rb.split();
36thread::spawn(move || {
37    prod.try_push(123).unwrap();
38})
39.join();
40thread::spawn(move || {
41    assert_eq!(cons.try_pop().unwrap(), 123);
42})
43.join();
44```
45"##
46)]
47pub struct SharedRb<S: Storage + ?Sized> {
48    read_index: CachePadded<AtomicUsize>,
49    write_index: CachePadded<AtomicUsize>,
50    read_held: AtomicBool,
51    write_held: AtomicBool,
52    storage: S,
53}
54
55impl<S: Storage> SharedRb<S> {
56    /// Constructs ring buffer from storage and indices.
57    ///
58    /// # Safety
59    ///
60    /// The items in storage inside `read..write` range must be initialized, items outside this range must be uninitialized.
61    /// `read` and `write` positions must be valid (see implementation details).
62    pub unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self {
63        assert!(!storage.is_empty());
64        Self {
65            storage,
66            read_index: CachePadded::new(AtomicUsize::new(read)),
67            write_index: CachePadded::new(AtomicUsize::new(write)),
68            read_held: AtomicBool::new(false),
69            write_held: AtomicBool::new(false),
70        }
71    }
72    /// Destructures ring buffer into underlying storage and `read` and `write` indices.
73    ///
74    /// # Safety
75    ///
76    /// Initialized contents of the storage must be properly dropped.
77    pub unsafe fn into_raw_parts(self) -> (S, usize, usize) {
78        let this = ManuallyDrop::new(self);
79        (ptr::read(&this.storage), this.read_index(), this.write_index())
80    }
81}
82
83impl<S: Storage + ?Sized> Observer for SharedRb<S> {
84    type Item = S::Item;
85
86    #[inline]
87    fn capacity(&self) -> NonZeroUsize {
88        unsafe { NonZeroUsize::new_unchecked(self.storage.len()) }
89    }
90
91    #[inline]
92    fn read_index(&self) -> usize {
93        self.read_index.load(Ordering::Acquire)
94    }
95    #[inline]
96    fn write_index(&self) -> usize {
97        self.write_index.load(Ordering::Acquire)
98    }
99
100    unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&[MaybeUninit<S::Item>], &[MaybeUninit<S::Item>]) {
101        let (first, second) = ranges(self.capacity(), start, end);
102        (self.storage.slice(first), self.storage.slice(second))
103    }
104    unsafe fn unsafe_slices_mut(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
105        let (first, second) = ranges(self.capacity(), start, end);
106        (self.storage.slice_mut(first), self.storage.slice_mut(second))
107    }
108
109    #[inline]
110    fn read_is_held(&self) -> bool {
111        self.read_held.load(Ordering::Acquire)
112    }
113    #[inline]
114    fn write_is_held(&self) -> bool {
115        self.write_held.load(Ordering::Acquire)
116    }
117}
118
119impl<S: Storage + ?Sized> Producer for SharedRb<S> {
120    #[inline]
121    unsafe fn set_write_index(&self, value: usize) {
122        self.write_index.store(value, Ordering::Release);
123    }
124}
125
126impl<S: Storage + ?Sized> Consumer for SharedRb<S> {
127    #[inline]
128    unsafe fn set_read_index(&self, value: usize) {
129        self.read_index.store(value, Ordering::Release);
130    }
131}
132
133impl<S: Storage + ?Sized> RingBuffer for SharedRb<S> {
134    #[inline]
135    unsafe fn hold_read(&self, flag: bool) -> bool {
136        self.read_held.swap(flag, Ordering::AcqRel)
137    }
138    #[inline]
139    unsafe fn hold_write(&self, flag: bool) -> bool {
140        self.write_held.swap(flag, Ordering::AcqRel)
141    }
142}
143
144impl<S: Storage + ?Sized> Drop for SharedRb<S> {
145    fn drop(&mut self) {
146        self.clear();
147    }
148}
149
/// Splitting an owned buffer moves it into an `Arc` and hands one handle to each half.
#[cfg(feature = "alloc")]
impl<S: Storage> Split for SharedRb<S> {
    type Prod = CachingProd<Arc<Self>>;
    type Cons = CachingCons<Arc<Self>>;

    fn split(self) -> (Self::Prod, Self::Cons) {
        let shared = Arc::new(self);
        // Delegate explicitly to the `Arc<SharedRb<S>>` implementation.
        <Arc<Self> as Split>::split(shared)
    }
}
/// Splitting an already shared buffer: the producer gets a cloned handle, the consumer takes the original one.
#[cfg(feature = "alloc")]
impl<S> Split for Arc<SharedRb<S>>
where
    S: Storage + ?Sized,
{
    type Prod = CachingProd<Self>;
    type Cons = CachingCons<Self>;

    fn split(self) -> (Self::Prod, Self::Cons) {
        // Only the reference count is bumped here; both halves point at the same buffer.
        let prod = CachingProd::new(Arc::clone(&self));
        let cons = CachingCons::new(self);
        (prod, cons)
    }
}
/// Splitting a boxed buffer converts the `Box` into an `Arc` first, then splits that.
#[cfg(feature = "alloc")]
impl<S> Split for Box<SharedRb<S>>
where
    S: Storage + ?Sized,
{
    type Prod = CachingProd<Arc<SharedRb<S>>>;
    type Cons = CachingCons<Arc<SharedRb<S>>>;

    fn split(self) -> (Self::Prod, Self::Cons) {
        let shared: Arc<SharedRb<S>> = Arc::from(self);
        shared.split()
    }
}
177impl<S: Storage + ?Sized> SplitRef for SharedRb<S> {
178    type RefProd<'a> = CachingProd<&'a Self> where Self: 'a;
179    type RefCons<'a> = CachingCons<&'a Self> where Self: 'a;
180
181    fn split_ref(&mut self) -> (Self::RefProd<'_>, Self::RefCons<'_>) {
182        (CachingProd::new(self), CachingCons::new(self))
183    }
184}
185
// Expands `rb_impl_init!` (imported from `super::macros`) for `SharedRb`.
// NOTE(review): the macro body is not visible here; presumably it generates the constructors
// (such as the `new` used in the type-level example) — confirm against the macro definition.
rb_impl_init!(SharedRb);

// Expand the helper macros imported from `traits::producer` and `traits::consumer` for `SharedRb<S>`.
// NOTE(review): the generated impls are not visible here — confirm which traits they provide.
impl_producer_traits!(SharedRb<S: Storage>);
impl_consumer_traits!(SharedRb<S: Storage>);
190
191impl<S: Storage + ?Sized> AsRef<Self> for SharedRb<S> {
192    fn as_ref(&self) -> &Self {
193        self
194    }
195}
196impl<S: Storage + ?Sized> AsMut<Self> for SharedRb<S> {
197    fn as_mut(&mut self) -> &mut Self {
198        self
199    }
200}