async_ringbuf/wrap/
prod.rs
1use crate::{producer::AsyncProducer, rb::AsyncRbRef, wrap::AsyncProd};
2use core::{
3 pin::Pin,
4 task::{Context, Poll},
5};
6#[cfg(feature = "std")]
7use futures::io::AsyncWrite;
8use futures::{ready, Sink};
9#[cfg(feature = "std")]
10use ringbuf::traits::RingBuffer;
11use ringbuf::{
12 traits::{
13 producer::{DelegateProducer, Producer},
14 Observer,
15 },
16 wrap::Wrap,
17};
18#[cfg(feature = "std")]
19use std::io;
20
21impl<R: AsyncRbRef> DelegateProducer for AsyncProd<R> {}
22
23impl<R: AsyncRbRef> AsyncProducer for AsyncProd<R> {
24 fn register_waker(&self, waker: &core::task::Waker) {
25 self.rb().read.register(waker)
26 }
27
28 #[inline]
29 fn close(&mut self) {
30 drop(self.base.take());
31 }
32}
33
34impl<R: AsyncRbRef> Sink<<R::Rb as Observer>::Item> for AsyncProd<R> {
35 type Error = ();
36
37 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38 Poll::Ready(if ready!(<Self as AsyncProducer>::poll_ready(self, cx)) {
39 Ok(())
40 } else {
41 Err(())
42 })
43 }
44 fn start_send(mut self: Pin<&mut Self>, item: <R::Rb as Observer>::Item) -> Result<(), Self::Error> {
45 assert!(self.try_push(item).is_ok());
46 Ok(())
47 }
48 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49 Poll::Ready(Ok(()))
51 }
52 fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53 self.close();
54 Poll::Ready(Ok(()))
55 }
56}
57
58#[cfg(feature = "std")]
59impl<R: AsyncRbRef> AsyncWrite for AsyncProd<R>
60where
61 R::Rb: RingBuffer<Item = u8>,
62{
63 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
64 <Self as AsyncProducer>::poll_write(self, cx, buf)
65 }
66 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
67 Poll::Ready(Ok(()))
69 }
70 fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
71 self.close();
72 Poll::Ready(Ok(()))
73 }
74}