futures_util/stream/stream/
split.rs
1use core::fmt;
2use core::pin::Pin;
3use futures_core::ready;
4use futures_core::stream::Stream;
5use futures_core::task::{Context, Poll};
6use futures_sink::Sink;
7
8use crate::lock::BiLock;
9
10#[derive(Debug)]
12#[must_use = "streams do nothing unless polled"]
13#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
14pub struct SplitStream<S>(BiLock<S>);
15
16impl<S> Unpin for SplitStream<S> {}
17
18impl<S> SplitStream<S> {
19 pub fn is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool {
21 other.is_pair_of(&self)
22 }
23}
24
25impl<S: Unpin> SplitStream<S> {
26 pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>>
30 where
31 S: Sink<Item>,
32 {
33 other.reunite(self)
34 }
35}
36
37impl<S: Stream> Stream for SplitStream<S> {
38 type Item = S::Item;
39
40 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
41 ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx)
42 }
43}
44
45#[allow(non_snake_case)]
46fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
47 SplitSink { lock, slot: None }
48}
49
50#[derive(Debug)]
52#[must_use = "sinks do nothing unless polled"]
53#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
54pub struct SplitSink<S, Item> {
55 lock: BiLock<S>,
56 slot: Option<Item>,
57}
58
59impl<S, Item> Unpin for SplitSink<S, Item> {}
60
61impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
62 pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> {
66 self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1)))
67 }
68}
69
70impl<S, Item> SplitSink<S, Item> {
71 pub fn is_pair_of(&self, other: &SplitStream<S>) -> bool {
73 self.lock.is_pair_of(&other.0)
74 }
75}
76
77impl<S: Sink<Item>, Item> SplitSink<S, Item> {
78 fn poll_flush_slot(
79 mut inner: Pin<&mut S>,
80 slot: &mut Option<Item>,
81 cx: &mut Context<'_>,
82 ) -> Poll<Result<(), S::Error>> {
83 if slot.is_some() {
84 ready!(inner.as_mut().poll_ready(cx))?;
85 Poll::Ready(inner.start_send(slot.take().unwrap()))
86 } else {
87 Poll::Ready(Ok(()))
88 }
89 }
90
91 fn poll_lock_and_flush_slot(
92 mut self: Pin<&mut Self>,
93 cx: &mut Context<'_>,
94 ) -> Poll<Result<(), S::Error>> {
95 let this = &mut *self;
96 let mut inner = ready!(this.lock.poll_lock(cx));
97 Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx)
98 }
99}
100
101impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
102 type Error = S::Error;
103
104 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
105 loop {
106 if self.slot.is_none() {
107 return Poll::Ready(Ok(()));
108 }
109 ready!(self.as_mut().poll_lock_and_flush_slot(cx))?;
110 }
111 }
112
113 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error> {
114 self.slot = Some(item);
115 Ok(())
116 }
117
118 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
119 let this = &mut *self;
120 let mut inner = ready!(this.lock.poll_lock(cx));
121 ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
122 inner.as_pin_mut().poll_flush(cx)
123 }
124
125 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
126 let this = &mut *self;
127 let mut inner = ready!(this.lock.poll_lock(cx));
128 ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
129 inner.as_pin_mut().poll_close(cx)
130 }
131}
132
133pub(super) fn split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>) {
134 let (a, b) = BiLock::new(s);
135 let read = SplitStream(a);
136 let write = SplitSink(b);
137 (write, read)
138}
139
140#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
143pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>);
144
145impl<T, Item> fmt::Debug for ReuniteError<T, Item> {
146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 f.debug_tuple("ReuniteError").field(&"...").finish()
148 }
149}
150
151impl<T, Item> fmt::Display for ReuniteError<T, Item> {
152 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153 write!(f, "tried to reunite a SplitStream and SplitSink that don't form a pair")
154 }
155}
156
// Only available with the `std` feature. No methods are overridden, so the
// defaults of `std::error::Error` apply.
#[cfg(feature = "std")]
impl<T: core::any::Any, Item> std::error::Error for ReuniteError<T, Item> {}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{sink::Sink, stream::StreamExt};
    use core::marker::PhantomData;

    /// Stream + sink stand-in for the pairing test.
    ///
    /// Every poll method is a `todo!()`, so it must only be split and compared,
    /// never polled.
    struct NopStream<Item> {
        phantom: PhantomData<Item>,
    }

    impl<Item> Stream for NopStream<Item> {
        type Item = Item;

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            todo!()
        }
    }

    impl<Item> Sink<Item> for NopStream<Item> {
        type Error = ();

        fn poll_ready(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }

        fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> {
            todo!()
        }

        fn poll_flush(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }

        fn poll_close(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }
    }

    /// Builds a fresh, independent `NopStream` carrying unit items.
    fn nop() -> NopStream<()> {
        NopStream { phantom: PhantomData }
    }

    #[test]
    fn test_pairing() {
        // Halves from the same split recognise each other, in both directions.
        let (first_sink, first_stream) = nop().split();
        assert!(first_sink.is_pair_of(&first_stream));
        assert!(first_stream.is_pair_of(&first_sink));

        let (second_sink, second_stream) = nop().split();
        assert!(second_sink.is_pair_of(&second_stream));
        assert!(second_stream.is_pair_of(&second_sink));

        // Halves from different splits never match, whichever side asks.
        assert!(!first_sink.is_pair_of(&second_stream));
        assert!(!first_stream.is_pair_of(&second_sink));
        assert!(!second_sink.is_pair_of(&first_stream));
        assert!(!second_stream.is_pair_of(&first_sink));
    }
}