async_ringbuf/
rb.rs

1use crate::wrap::{AsyncCons, AsyncProd};
2#[cfg(feature = "alloc")]
3use alloc::sync::Arc;
4use core::{mem::MaybeUninit, num::NonZeroUsize};
5use futures::task::AtomicWaker;
6#[cfg(feature = "alloc")]
7use ringbuf::traits::Split;
8use ringbuf::{
9    rb::RbRef,
10    storage::Storage,
11    traits::{Consumer, Observer, Producer, RingBuffer, SplitRef},
12    SharedRb,
13};
14
15pub trait AsyncRbRef: RbRef<Rb = AsyncRb<Self::Storage>> {
16    type Storage: Storage;
17}
18impl<S: Storage, R: RbRef<Rb = AsyncRb<S>>> AsyncRbRef for R {
19    type Storage = S;
20}
21
22pub struct AsyncRb<S: Storage> {
23    base: SharedRb<S>,
24    pub(crate) read: AtomicWaker,
25    pub(crate) write: AtomicWaker,
26}
27
28impl<S: Storage> AsyncRb<S> {
29    pub fn from(base: SharedRb<S>) -> Self {
30        Self {
31            base,
32            read: AtomicWaker::default(),
33            write: AtomicWaker::default(),
34        }
35    }
36}
37
38impl<S: Storage> Unpin for AsyncRb<S> {}
39
40impl<S: Storage> Observer for AsyncRb<S> {
41    type Item = S::Item;
42
43    #[inline]
44    fn capacity(&self) -> NonZeroUsize {
45        self.base.capacity()
46    }
47
48    #[inline]
49    fn read_index(&self) -> usize {
50        self.base.read_index()
51    }
52    #[inline]
53    fn write_index(&self) -> usize {
54        self.base.write_index()
55    }
56
57    unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&[MaybeUninit<S::Item>], &[MaybeUninit<S::Item>]) {
58        self.base.unsafe_slices(start, end)
59    }
60    unsafe fn unsafe_slices_mut(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
61        self.base.unsafe_slices_mut(start, end)
62    }
63
64    #[inline]
65    fn read_is_held(&self) -> bool {
66        self.base.read_is_held()
67    }
68    #[inline]
69    fn write_is_held(&self) -> bool {
70        self.base.write_is_held()
71    }
72}
73
74impl<S: Storage> Producer for AsyncRb<S> {
75    unsafe fn set_write_index(&self, value: usize) {
76        self.base.set_write_index(value);
77        self.write.wake();
78    }
79}
80impl<S: Storage> Consumer for AsyncRb<S> {
81    unsafe fn set_read_index(&self, value: usize) {
82        self.base.set_read_index(value);
83        self.read.wake();
84    }
85}
86impl<S: Storage> RingBuffer for AsyncRb<S> {
87    #[inline]
88    unsafe fn hold_read(&self, flag: bool) -> bool {
89        let old = self.base.hold_read(flag);
90        self.read.wake();
91        old
92    }
93    #[inline]
94    unsafe fn hold_write(&self, flag: bool) -> bool {
95        let old = self.base.hold_write(flag);
96        self.write.wake();
97        old
98    }
99}
100
101impl<S: Storage> SplitRef for AsyncRb<S> {
102    type RefProd<'a> = AsyncProd<&'a Self> where Self:  'a;
103    type RefCons<'a> = AsyncCons<&'a Self> where Self:  'a;
104
105    fn split_ref(&mut self) -> (Self::RefProd<'_>, Self::RefCons<'_>) {
106        unsafe { (AsyncProd::new(self), AsyncCons::new(self)) }
107    }
108}
#[cfg(feature = "alloc")]
impl<S: Storage> Split for AsyncRb<S> {
    type Prod = AsyncProd<Arc<Self>>;
    type Cons = AsyncCons<Arc<Self>>;

    /// Moves the buffer into an [`Arc`] and returns a producer and a consumer sharing it.
    fn split(self) -> (Self::Prod, Self::Cons) {
        let consumer_rb = Arc::new(self);
        let producer_rb = Arc::clone(&consumer_rb);
        // SAFETY (presumed; confirm against `AsyncProd::new` / `AsyncCons::new`):
        // `self` was taken by value and the `Arc` was created right here, so these
        // are the only two handles to the buffer.
        unsafe { (AsyncProd::new(producer_rb), AsyncCons::new(consumer_rb)) }
    }
}
119
120impl<S: Storage> AsRef<Self> for AsyncRb<S> {
121    fn as_ref(&self) -> &Self {
122        self
123    }
124}
125impl<S: Storage> AsMut<Self> for AsyncRb<S> {
126    fn as_mut(&mut self) -> &mut Self {
127        self
128    }
129}