futures_util/stream/stream/
split.rs

1use core::fmt;
2use core::pin::Pin;
3use futures_core::ready;
4use futures_core::stream::Stream;
5use futures_core::task::{Context, Poll};
6use futures_sink::Sink;
7
8use crate::lock::BiLock;
9
/// A `Stream` part of the split pair
///
/// Holds one of the two `BiLock<S>` handles produced when a `Stream + Sink`
/// value is split; the matching `SplitSink` holds the other. Every poll takes
/// the lock before touching the shared `S`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub struct SplitStream<S>(BiLock<S>);
15
// The inner `S` is only ever reached through the lock guard's `as_pin_mut`,
// never by projecting a pin through this handle, so the handle itself is
// `Unpin` regardless of `S`.
// NOTE(review): this presumes `BiLock` keeps `S` at a stable address — confirm
// against `crate::lock`.
impl<S> Unpin for SplitStream<S> {}
17
18impl<S> SplitStream<S> {
19    /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
20    pub fn is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool {
21        other.is_pair_of(&self)
22    }
23}
24
25impl<S: Unpin> SplitStream<S> {
26    /// Attempts to put the two "halves" of a split `Stream + Sink` back
27    /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
28    /// a matching pair originating from the same call to `StreamExt::split`.
29    pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>>
30    where
31        S: Sink<Item>,
32    {
33        other.reunite(self)
34    }
35}
36
37impl<S: Stream> Stream for SplitStream<S> {
38    type Item = S::Item;
39
40    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
41        ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx)
42    }
43}
44
45#[allow(non_snake_case)]
46fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
47    SplitSink { lock, slot: None }
48}
49
/// A `Sink` part of the split pair
///
/// Holds one `BiLock<S>` handle to the shared `Stream + Sink` value together
/// with a one-item buffer: `start_send` only stores the item, and it is
/// forwarded to the inner sink the next time the lock is taken by
/// `poll_ready`, `poll_flush` or `poll_close`.
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub struct SplitSink<S, Item> {
    // Handle to the shared `S`; it is locked before every access.
    lock: BiLock<S>,
    // Item accepted by `start_send` but not yet passed to the inner sink.
    slot: Option<Item>,
}
58
// Neither field is ever pin-projected: `slot` is accessed through plain
// `&mut` borrows and the inner `S` only through the lock guard's
// `as_pin_mut`, so this handle is `Unpin` for any `S` and `Item`.
// NOTE(review): this presumes `BiLock` keeps `S` at a stable address — confirm
// against `crate::lock`.
impl<S, Item> Unpin for SplitSink<S, Item> {}
60
61impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
62    /// Attempts to put the two "halves" of a split `Stream + Sink` back
63    /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
64    /// a matching pair originating from the same call to `StreamExt::split`.
65    pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> {
66        self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1)))
67    }
68}
69
70impl<S, Item> SplitSink<S, Item> {
71    /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
72    pub fn is_pair_of(&self, other: &SplitStream<S>) -> bool {
73        self.lock.is_pair_of(&other.0)
74    }
75}
76
77impl<S: Sink<Item>, Item> SplitSink<S, Item> {
78    fn poll_flush_slot(
79        mut inner: Pin<&mut S>,
80        slot: &mut Option<Item>,
81        cx: &mut Context<'_>,
82    ) -> Poll<Result<(), S::Error>> {
83        if slot.is_some() {
84            ready!(inner.as_mut().poll_ready(cx))?;
85            Poll::Ready(inner.start_send(slot.take().unwrap()))
86        } else {
87            Poll::Ready(Ok(()))
88        }
89    }
90
91    fn poll_lock_and_flush_slot(
92        mut self: Pin<&mut Self>,
93        cx: &mut Context<'_>,
94    ) -> Poll<Result<(), S::Error>> {
95        let this = &mut *self;
96        let mut inner = ready!(this.lock.poll_lock(cx));
97        Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx)
98    }
99}
100
101impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
102    type Error = S::Error;
103
104    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
105        loop {
106            if self.slot.is_none() {
107                return Poll::Ready(Ok(()));
108            }
109            ready!(self.as_mut().poll_lock_and_flush_slot(cx))?;
110        }
111    }
112
113    fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error> {
114        self.slot = Some(item);
115        Ok(())
116    }
117
118    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
119        let this = &mut *self;
120        let mut inner = ready!(this.lock.poll_lock(cx));
121        ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
122        inner.as_pin_mut().poll_flush(cx)
123    }
124
125    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
126        let this = &mut *self;
127        let mut inner = ready!(this.lock.poll_lock(cx));
128        ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
129        inner.as_pin_mut().poll_close(cx)
130    }
131}
132
133pub(super) fn split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>) {
134    let (a, b) = BiLock::new(s);
135    let read = SplitStream(a);
136    let write = SplitSink(b);
137    (write, read)
138}
139
/// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves
/// of the same `Stream + Sink`, and thus could not be `reunite`d.
///
/// Both halves are handed back through the public fields, sink first and
/// stream second.
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>);
144
145impl<T, Item> fmt::Debug for ReuniteError<T, Item> {
146    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147        f.debug_tuple("ReuniteError").field(&"...").finish()
148    }
149}
150
151impl<T, Item> fmt::Display for ReuniteError<T, Item> {
152    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153        write!(f, "tried to reunite a SplitStream and SplitSink that don't form a pair")
154    }
155}
156
// Marker impl, only available with the `std` feature: all trait methods use
// their defaults, relying on the `Debug` and `Display` impls in this file.
// NOTE(review): the reason for the `T: Any` bound is not evident from this
// file — confirm before relaxing it.
#[cfg(feature = "std")]
impl<T: core::any::Any, Item> std::error::Error for ReuniteError<T, Item> {}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{sink::Sink, stream::StreamExt};
    use core::marker::PhantomData;

    /// Minimal `Stream + Sink` stand-in. None of its trait methods are meant
    /// to be called: the pairing test only splits values and compares halves.
    struct NopStream<Item> {
        phantom: PhantomData<Item>,
    }

    impl<Item> NopStream<Item> {
        /// Builds a fresh, independent stub value.
        fn new() -> Self {
            Self { phantom: PhantomData }
        }
    }

    impl<Item> Stream for NopStream<Item> {
        type Item = Item;

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            todo!()
        }
    }

    impl<Item> Sink<Item> for NopStream<Item> {
        type Error = ();

        fn poll_ready(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }

        fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> {
            todo!()
        }

        fn poll_flush(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }

        fn poll_close(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }
    }

    /// Halves from the same `split` call recognise each other in both
    /// directions; halves from different calls never do.
    #[test]
    fn test_pairing() {
        let (sink_a, stream_a) = NopStream::<()>::new().split();
        let (sink_b, stream_b) = NopStream::<()>::new().split();

        // Matching halves.
        assert!(sink_a.is_pair_of(&stream_a));
        assert!(stream_a.is_pair_of(&sink_a));
        assert!(sink_b.is_pair_of(&stream_b));
        assert!(stream_b.is_pair_of(&sink_b));

        // Crossed halves.
        assert!(!sink_a.is_pair_of(&stream_b));
        assert!(!stream_a.is_pair_of(&sink_b));
        assert!(!sink_b.is_pair_of(&stream_a));
        assert!(!stream_b.is_pair_of(&sink_a));
    }
}