futures_util/sink/
fanout.rs

1use core::fmt::{Debug, Formatter, Result as FmtResult};
2use core::pin::Pin;
3use futures_core::task::{Context, Poll};
4use futures_sink::Sink;
5use pin_project_lite::pin_project;
6
7pin_project! {
8    /// Sink that clones incoming items and forwards them to two sinks at the same time.
9    ///
10    /// Backpressure from any downstream sink propagates up, which means that this sink
11    /// can only process items as fast as its _slowest_ downstream sink.
12    #[must_use = "sinks do nothing unless polled"]
13    pub struct Fanout<Si1, Si2> {
14        #[pin]
15        sink1: Si1,
16        #[pin]
17        sink2: Si2
18    }
19}
20
21impl<Si1, Si2> Fanout<Si1, Si2> {
22    pub(super) fn new(sink1: Si1, sink2: Si2) -> Self {
23        Self { sink1, sink2 }
24    }
25
26    /// Get a shared reference to the inner sinks.
27    pub fn get_ref(&self) -> (&Si1, &Si2) {
28        (&self.sink1, &self.sink2)
29    }
30
31    /// Get a mutable reference to the inner sinks.
32    pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) {
33        (&mut self.sink1, &mut self.sink2)
34    }
35
36    /// Get a pinned mutable reference to the inner sinks.
37    pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
38        let this = self.project();
39        (this.sink1, this.sink2)
40    }
41
42    /// Consumes this combinator, returning the underlying sinks.
43    ///
44    /// Note that this may discard intermediate state of this combinator,
45    /// so care should be taken to avoid losing resources when this is called.
46    pub fn into_inner(self) -> (Si1, Si2) {
47        (self.sink1, self.sink2)
48    }
49}
50
51impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> {
52    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
53        f.debug_struct("Fanout").field("sink1", &self.sink1).field("sink2", &self.sink2).finish()
54    }
55}
56
57impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2>
58where
59    Si1: Sink<Item>,
60    Item: Clone,
61    Si2: Sink<Item, Error = Si1::Error>,
62{
63    type Error = Si1::Error;
64
65    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
66        let this = self.project();
67
68        let sink1_ready = this.sink1.poll_ready(cx)?.is_ready();
69        let sink2_ready = this.sink2.poll_ready(cx)?.is_ready();
70        let ready = sink1_ready && sink2_ready;
71        if ready {
72            Poll::Ready(Ok(()))
73        } else {
74            Poll::Pending
75        }
76    }
77
78    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
79        let this = self.project();
80
81        this.sink1.start_send(item.clone())?;
82        this.sink2.start_send(item)?;
83        Ok(())
84    }
85
86    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87        let this = self.project();
88
89        let sink1_ready = this.sink1.poll_flush(cx)?.is_ready();
90        let sink2_ready = this.sink2.poll_flush(cx)?.is_ready();
91        let ready = sink1_ready && sink2_ready;
92        if ready {
93            Poll::Ready(Ok(()))
94        } else {
95            Poll::Pending
96        }
97    }
98
99    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
100        let this = self.project();
101
102        let sink1_ready = this.sink1.poll_close(cx)?.is_ready();
103        let sink2_ready = this.sink2.poll_close(cx)?.is_ready();
104        let ready = sink1_ready && sink2_ready;
105        if ready {
106            Poll::Ready(Ok(()))
107        } else {
108            Poll::Pending
109        }
110    }
111}