// async_ringbuf/rb.rs

use crate::wrap::{AsyncCons, AsyncProd};
#[cfg(feature = "alloc")]
use alloc::sync::Arc;
use core::{mem::MaybeUninit, num::NonZeroUsize};
use futures::task::AtomicWaker;
#[cfg(feature = "alloc")]
use ringbuf::traits::Split;
use ringbuf::{
    rb::RbRef,
    storage::Storage,
    traits::{Consumer, Observer, Producer, RingBuffer, SplitRef},
    SharedRb,
};

/// An [`RbRef`] whose referenced ring buffer is an [`AsyncRb`].
///
/// The associated `Storage` type names the storage parameter of that
/// `AsyncRb`, so generic code can refer to it via `R::Storage`.
pub trait AsyncRbRef: RbRef<Rb = AsyncRb<Self::Storage>> {
    /// Storage type of the referenced `AsyncRb`.
    type Storage: Storage;
}
/// Blanket implementation: any `RbRef` that points at an `AsyncRb<S>` is an
/// `AsyncRbRef` with `Storage = S`.
impl<S, R> AsyncRbRef for R
where
    S: Storage,
    R: RbRef<Rb = AsyncRb<S>>,
{
    type Storage = S;
}

/// Ring buffer that pairs a [`SharedRb`] with two [`AtomicWaker`]s.
///
/// Index updates and hold-flag changes are forwarded to `base`; after each of
/// them the corresponding waker is woken.
pub struct AsyncRb<S: Storage> {
    /// Underlying ring buffer; the `Observer`/`Producer`/`Consumer`/`RingBuffer`
    /// impls forward to it.
    base: SharedRb<S>,
    /// Woken after `set_read_index` and `hold_read`.
    /// NOTE(review): presumably registered by the side waiting for a read to
    /// happen (the producer) — confirm against the `wrap` module.
    pub(crate) read: AtomicWaker,
    /// Woken after `set_write_index` and `hold_write`.
    /// NOTE(review): presumably registered by the side waiting for a write to
    /// happen (the consumer) — confirm against the `wrap` module.
    pub(crate) write: AtomicWaker,
}

impl<S: Storage> AsyncRb<S> {
    /// Wraps an existing `SharedRb`, attaching a freshly created waker for
    /// each of the `read` and `write` slots.
    pub fn from(base: SharedRb<S>) -> Self {
        let read = AtomicWaker::new();
        let write = AtomicWaker::new();
        Self { base, read, write }
    }
}

// Explicit `Unpin` marker: the impl is bounded only by `S: Storage`, so it
// applies for every storage type rather than depending on auto-trait inference.
impl<S: Storage> Unpin for AsyncRb<S> {}

/// Read-only view of the buffer state. Every method simply forwards to the
/// wrapped `base` buffer; no waker is touched here.
impl<S: Storage> Observer for AsyncRb<S> {
    type Item = S::Item;

    /// Capacity as reported by the underlying buffer.
    #[inline]
    fn capacity(&self) -> NonZeroUsize {
        self.base.capacity()
    }

    /// Current read index of the underlying buffer.
    #[inline]
    fn read_index(&self) -> usize {
        self.base.read_index()
    }

    /// Current write index of the underlying buffer.
    #[inline]
    fn write_index(&self) -> usize {
        self.base.write_index()
    }

    /// Shared slices for the `start..end` range, taken from the underlying buffer.
    ///
    /// # Safety
    /// The arguments are passed through unchanged, so the caller must satisfy
    /// the requirements of the underlying buffer's `unsafe_slices`.
    unsafe fn unsafe_slices(
        &self,
        start: usize,
        end: usize,
    ) -> (&[MaybeUninit<Self::Item>], &[MaybeUninit<Self::Item>]) {
        self.base.unsafe_slices(start, end)
    }

    /// Mutable slices for the `start..end` range, taken from the underlying buffer.
    ///
    /// # Safety
    /// The arguments are passed through unchanged, so the caller must satisfy
    /// the requirements of the underlying buffer's `unsafe_slices_mut`.
    unsafe fn unsafe_slices_mut(
        &self,
        start: usize,
        end: usize,
    ) -> (&mut [MaybeUninit<Self::Item>], &mut [MaybeUninit<Self::Item>]) {
        self.base.unsafe_slices_mut(start, end)
    }

    /// Whether the underlying buffer reports its read end as held.
    #[inline]
    fn read_is_held(&self) -> bool {
        self.base.read_is_held()
    }

    /// Whether the underlying buffer reports its write end as held.
    #[inline]
    fn write_is_held(&self) -> bool {
        self.base.write_is_held()
    }
}

/// Producer side: moving the write index also signals the `write` waker.
impl<S: Storage> Producer for AsyncRb<S> {
    /// Stores `value` as the new write index in the underlying buffer, then
    /// wakes the `write` waker.
    ///
    /// # Safety
    /// `value` is handed to the underlying buffer's `set_write_index`
    /// unchanged, so that method's requirements apply to the caller.
    unsafe fn set_write_index(&self, value: usize) {
        let Self { base, write, .. } = self;
        // The index is updated before the wake-up is issued.
        base.set_write_index(value);
        write.wake();
    }
}
/// Consumer side: moving the read index also signals the `read` waker.
impl<S: Storage> Consumer for AsyncRb<S> {
    /// Stores `value` as the new read index in the underlying buffer, then
    /// wakes the `read` waker.
    ///
    /// # Safety
    /// `value` is handed to the underlying buffer's `set_read_index`
    /// unchanged, so that method's requirements apply to the caller.
    unsafe fn set_read_index(&self, value: usize) {
        let Self { base, read, .. } = self;
        // The index is updated before the wake-up is issued.
        base.set_read_index(value);
        read.wake();
    }
}
/// Hold-flag changes are forwarded to the underlying buffer and always followed
/// by a wake-up of the matching waker.
impl<S: Storage> RingBuffer for AsyncRb<S> {
    /// Forwards `flag` to the underlying buffer's `hold_read`, wakes the
    /// `read` waker, and returns what the underlying call returned.
    ///
    /// # Safety
    /// Same requirements as the underlying buffer's `hold_read`.
    #[inline]
    unsafe fn hold_read(&self, flag: bool) -> bool {
        let Self { base, read, .. } = self;
        let previous = base.hold_read(flag);
        read.wake();
        previous
    }

    /// Forwards `flag` to the underlying buffer's `hold_write`, wakes the
    /// `write` waker, and returns what the underlying call returned.
    ///
    /// # Safety
    /// Same requirements as the underlying buffer's `hold_write`.
    #[inline]
    unsafe fn hold_write(&self, flag: bool) -> bool {
        let Self { base, write, .. } = self;
        let previous = base.hold_write(flag);
        write.wake();
        previous
    }
}

/// Splitting by reference: both halves borrow the same `AsyncRb`.
impl<S: Storage> SplitRef for AsyncRb<S> {
    type RefProd<'a> = AsyncProd<&'a Self> where Self: 'a;
    type RefCons<'a> = AsyncCons<&'a Self> where Self: 'a;

    /// Builds a producer and a consumer that both hold a shared reference to
    /// `self` for the duration of the mutable borrow.
    fn split_ref(&mut self) -> (Self::RefProd<'_>, Self::RefCons<'_>) {
        // Downgrade the exclusive borrow once; both halves share this reference.
        let rb: &Self = self;
        // SAFETY (assumed contract, not visible in this file — TODO confirm):
        // `&mut self` means no other producer/consumer for this buffer can be
        // alive, and exactly one of each is created here.
        let prod = unsafe { AsyncProd::new(rb) };
        let cons = unsafe { AsyncCons::new(rb) };
        (prod, cons)
    }
}
/// Owning split: the buffer is moved into an `Arc` shared by both halves.
#[cfg(feature = "alloc")]
impl<S: Storage> Split for AsyncRb<S> {
    type Prod = AsyncProd<Arc<Self>>;
    type Cons = AsyncCons<Arc<Self>>;

    /// Consumes the buffer and returns a producer/consumer pair, each holding
    /// one handle to the same `Arc`-allocated buffer.
    fn split(self) -> (Self::Prod, Self::Cons) {
        let shared = Arc::new(self);
        // SAFETY (assumed contract, not visible in this file — TODO confirm):
        // the buffer was just moved into a fresh `Arc`, so these are the only
        // producer and consumer that can exist for it.
        let prod = unsafe { AsyncProd::new(Arc::clone(&shared)) };
        let cons = unsafe { AsyncCons::new(shared) };
        (prod, cons)
    }
}

/// Identity conversion: an `AsyncRb` can be borrowed as itself.
impl<S: Storage> AsRef<Self> for AsyncRb<S> {
    /// Returns `self` unchanged.
    fn as_ref(&self) -> &Self {
        self
    }
}
/// Identity conversion: an `AsyncRb` can be mutably borrowed as itself.
impl<S: Storage> AsMut<Self> for AsyncRb<S> {
    /// Returns `self` unchanged.
    fn as_mut(&mut self) -> &mut Self {
        self
    }
}