futures_util/sink/
map_err.rs

1use core::pin::Pin;
2use futures_core::stream::{FusedStream, Stream};
3use futures_core::task::{Context, Poll};
4use futures_sink::Sink;
5use pin_project_lite::pin_project;
6
7pin_project! {
8    /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method.
9    #[derive(Debug, Clone)]
10    #[must_use = "sinks do nothing unless polled"]
11    pub struct SinkMapErr<Si, F> {
12        #[pin]
13        sink: Si,
14        f: Option<F>,
15    }
16}
17
18impl<Si, F> SinkMapErr<Si, F> {
19    pub(super) fn new(sink: Si, f: F) -> Self {
20        Self { sink, f: Some(f) }
21    }
22
23    delegate_access_inner!(sink, Si, ());
24
25    fn take_f(self: Pin<&mut Self>) -> F {
26        self.project().f.take().expect("polled MapErr after completion")
27    }
28}
29
30impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F>
31where
32    Si: Sink<Item>,
33    F: FnOnce(Si::Error) -> E,
34{
35    type Error = E;
36
37    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38        self.as_mut().project().sink.poll_ready(cx).map_err(|e| self.as_mut().take_f()(e))
39    }
40
41    fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
42        self.as_mut().project().sink.start_send(item).map_err(|e| self.as_mut().take_f()(e))
43    }
44
45    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
46        self.as_mut().project().sink.poll_flush(cx).map_err(|e| self.as_mut().take_f()(e))
47    }
48
49    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
50        self.as_mut().project().sink.poll_close(cx).map_err(|e| self.as_mut().take_f()(e))
51    }
52}
53
54// Forwarding impl of Stream from the underlying sink
55impl<S: Stream, F> Stream for SinkMapErr<S, F> {
56    type Item = S::Item;
57
58    delegate_stream!(sink);
59}
60
61impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {
62    fn is_terminated(&self) -> bool {
63        self.sink.is_terminated()
64    }
65}