async_ringbuf/wrap/
prod.rs

1use crate::{producer::AsyncProducer, rb::AsyncRbRef, wrap::AsyncProd};
2use core::{
3    pin::Pin,
4    task::{Context, Poll},
5};
6#[cfg(feature = "std")]
7use futures::io::AsyncWrite;
8use futures::{ready, Sink};
9#[cfg(feature = "std")]
10use ringbuf::traits::RingBuffer;
11use ringbuf::{
12    traits::{
13        producer::{DelegateProducer, Producer},
14        Observer,
15    },
16    wrap::Wrap,
17};
18#[cfg(feature = "std")]
19use std::io;
20
21impl<R: AsyncRbRef> DelegateProducer for AsyncProd<R> {}
22
23impl<R: AsyncRbRef> AsyncProducer for AsyncProd<R> {
24    fn register_waker(&self, waker: &core::task::Waker) {
25        self.rb().read.register(waker)
26    }
27
28    #[inline]
29    fn close(&mut self) {
30        drop(self.base.take());
31    }
32}
33
34impl<R: AsyncRbRef> Sink<<R::Rb as Observer>::Item> for AsyncProd<R> {
35    type Error = ();
36
37    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38        Poll::Ready(if ready!(<Self as AsyncProducer>::poll_ready(self, cx)) {
39            Ok(())
40        } else {
41            Err(())
42        })
43    }
44    fn start_send(mut self: Pin<&mut Self>, item: <R::Rb as Observer>::Item) -> Result<(), Self::Error> {
45        assert!(self.try_push(item).is_ok());
46        Ok(())
47    }
48    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49        // Don't need to be flushed.
50        Poll::Ready(Ok(()))
51    }
52    fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53        self.close();
54        Poll::Ready(Ok(()))
55    }
56}
57
#[cfg(feature = "std")]
/// [`AsyncWrite`] implementation for producers whose ring buffer stores bytes.
///
/// Only compiled when the `std` feature is enabled.
impl<R> AsyncWrite for AsyncProd<R>
where
    R: AsyncRbRef,
    R::Rb: RingBuffer<Item = u8>,
{
    /// Forwards directly to [`AsyncProducer::poll_write`] with the same
    /// context and source slice, returning its result unchanged.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
        let progress = <Self as AsyncProducer>::poll_write(self, cx, buf);
        progress
    }

    /// Always ready: this implementation does no work on flush.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Closes the producer synchronously and reports completion right away.
    fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.close();
        Poll::Ready(Ok(()))
    }
}