async_ringbuf/
rb.rs
1use crate::wrap::{AsyncCons, AsyncProd};
2#[cfg(feature = "alloc")]
3use alloc::sync::Arc;
4use core::{mem::MaybeUninit, num::NonZeroUsize};
5use futures::task::AtomicWaker;
6#[cfg(feature = "alloc")]
7use ringbuf::traits::Split;
8use ringbuf::{
9 rb::RbRef,
10 storage::Storage,
11 traits::{Consumer, Observer, Producer, RingBuffer, SplitRef},
12 SharedRb,
13};
14
15pub trait AsyncRbRef: RbRef<Rb = AsyncRb<Self::Storage>> {
16 type Storage: Storage;
17}
18impl<S: Storage, R: RbRef<Rb = AsyncRb<S>>> AsyncRbRef for R {
19 type Storage = S;
20}
21
22pub struct AsyncRb<S: Storage> {
23 base: SharedRb<S>,
24 pub(crate) read: AtomicWaker,
25 pub(crate) write: AtomicWaker,
26}
27
28impl<S: Storage> AsyncRb<S> {
29 pub fn from(base: SharedRb<S>) -> Self {
30 Self {
31 base,
32 read: AtomicWaker::default(),
33 write: AtomicWaker::default(),
34 }
35 }
36}
37
38impl<S: Storage> Unpin for AsyncRb<S> {}
39
40impl<S: Storage> Observer for AsyncRb<S> {
41 type Item = S::Item;
42
43 #[inline]
44 fn capacity(&self) -> NonZeroUsize {
45 self.base.capacity()
46 }
47
48 #[inline]
49 fn read_index(&self) -> usize {
50 self.base.read_index()
51 }
52 #[inline]
53 fn write_index(&self) -> usize {
54 self.base.write_index()
55 }
56
57 unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&[MaybeUninit<S::Item>], &[MaybeUninit<S::Item>]) {
58 self.base.unsafe_slices(start, end)
59 }
60 unsafe fn unsafe_slices_mut(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
61 self.base.unsafe_slices_mut(start, end)
62 }
63
64 #[inline]
65 fn read_is_held(&self) -> bool {
66 self.base.read_is_held()
67 }
68 #[inline]
69 fn write_is_held(&self) -> bool {
70 self.base.write_is_held()
71 }
72}
73
74impl<S: Storage> Producer for AsyncRb<S> {
75 unsafe fn set_write_index(&self, value: usize) {
76 self.base.set_write_index(value);
77 self.write.wake();
78 }
79}
80impl<S: Storage> Consumer for AsyncRb<S> {
81 unsafe fn set_read_index(&self, value: usize) {
82 self.base.set_read_index(value);
83 self.read.wake();
84 }
85}
86impl<S: Storage> RingBuffer for AsyncRb<S> {
87 #[inline]
88 unsafe fn hold_read(&self, flag: bool) -> bool {
89 let old = self.base.hold_read(flag);
90 self.read.wake();
91 old
92 }
93 #[inline]
94 unsafe fn hold_write(&self, flag: bool) -> bool {
95 let old = self.base.hold_write(flag);
96 self.write.wake();
97 old
98 }
99}
100
101impl<S: Storage> SplitRef for AsyncRb<S> {
102 type RefProd<'a> = AsyncProd<&'a Self> where Self: 'a;
103 type RefCons<'a> = AsyncCons<&'a Self> where Self: 'a;
104
105 fn split_ref(&mut self) -> (Self::RefProd<'_>, Self::RefCons<'_>) {
106 unsafe { (AsyncProd::new(self), AsyncCons::new(self)) }
107 }
108}
#[cfg(feature = "alloc")]
impl<S> Split for AsyncRb<S>
where
    S: Storage,
{
    type Prod = AsyncProd<Arc<Self>>;
    type Cons = AsyncCons<Arc<Self>>;

    /// Consumes the buffer and splits it into an owning producer and consumer
    /// that share it through an [`Arc`].
    fn split(self) -> (Self::Prod, Self::Cons) {
        let shared = Arc::new(self);
        // SAFETY: `self` was moved into a fresh `Arc`, so the two handles built
        // here are the only ones, one producer and one consumer.
        // NOTE(review): confirm this matches the contract of `AsyncProd::new` /
        // `AsyncCons::new`, which is not visible in this file.
        let producer = unsafe { AsyncProd::new(Arc::clone(&shared)) };
        let consumer = unsafe { AsyncCons::new(shared) };
        (producer, consumer)
    }
}
119
120impl<S: Storage> AsRef<Self> for AsyncRb<S> {
121 fn as_ref(&self) -> &Self {
122 self
123 }
124}
125impl<S: Storage> AsMut<Self> for AsyncRb<S> {
126 fn as_mut(&mut self) -> &mut Self {
127 self
128 }
129}