1use futures::future::BoxFuture;
6use futures::{FutureExt, Stream};
7use pin_project::pin_project;
8use std::future::{ready, Future};
9use std::ops::ControlFlow;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
/// Processes the requests that a [`RequestListener`] pulls from its stream.
///
/// Both methods take `self` pinned and return a future that may borrow the
/// handler; the listener drives that future to completion before it touches
/// the handler again.
pub trait RequestHandler: Sized {
    /// The type of item this handler accepts.
    type Request;

    /// Handles one `request`.
    ///
    /// The listener polls the returned future until it resolves. Resolving to
    /// `ControlFlow::Continue(())` sends the listener back to polling the
    /// stream; resolving to `ControlFlow::Break(())` finishes the listener
    /// without [`Self::stream_closed`] being called.
    fn handle_request(
        self: Pin<&mut Self>,
        request: Self::Request,
    ) -> impl Future<Output = ControlFlow<()>> + Send;

    /// Called when the request stream yields `None`.
    ///
    /// The listener finishes once the returned future resolves. The default
    /// implementation resolves immediately and does nothing.
    fn stream_closed(self: Pin<&mut Self>) -> impl Future<Output = ()> + Send {
        ready(())
    }
}
31
/// The stage a [`RequestListener`] has reached.
///
/// The boxed futures are typed `'static` only because their real lifetime
/// cannot be named: they borrow the listener's own handler field.
enum RequestListenerState {
    /// No handler future is in flight; the next step is to poll the stream.
    PollStream,

    /// A future returned by `RequestHandler::handle_request` is being driven.
    RequestFuture(BoxFuture<'static, ControlFlow<()>>),

    /// The stream ended and the future returned by
    /// `RequestHandler::stream_closed` is being driven.
    CloseFuture(BoxFuture<'static, ()>),

    /// The listener has finished; polling it again returns `Poll::Ready(())`.
    Done,
}
50
/// A future that takes requests from a stream and passes each one to a
/// [`RequestHandler`], one at a time.
///
/// It resolves to `()` either when the handler answers a request with
/// `ControlFlow::Break(())`, or when the stream ends and the handler's
/// `stream_closed` future has resolved.
// `!Unpin` opts the type out of `Unpin`: `state` can hold a future that
// borrows `handler`, so a pinned listener must not be moved.
#[pin_project(!Unpin)]
pub struct RequestListener<RS, RH> {
    /// Source of requests.
    #[pin]
    stream: RS,

    /// Current stage; may hold a boxed future that borrows `handler`.
    ///
    /// Keep this field declared before `handler`: struct fields are dropped
    /// in declaration order, so any in-flight future is dropped before the
    /// handler it borrows.
    state: RequestListenerState,

    /// The handler that requests are passed to.
    #[pin]
    handler: RH,
}
65
66impl<RS, RH> RequestListener<RS, RH> {
67 pub fn new(stream: RS, handler: RH) -> Self {
68 Self { stream, state: RequestListenerState::PollStream, handler }
69 }
70}
71
72impl<RS, RH, R> Future for RequestListener<RS, RH>
73where
74 RS: Stream<Item = R>,
75 RH: RequestHandler<Request = R>,
76{
77 type Output = ();
78
79 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
80 loop {
81 let this = self.as_mut().project();
82 *this.state = match this.state {
83 RequestListenerState::PollStream => match this.stream.poll_next(cx) {
84 Poll::Pending => return Poll::Pending,
85 Poll::Ready(None) => {
86 let close_future = this.handler.stream_closed().boxed();
87 let close_future = unsafe {
92 std::mem::transmute::<BoxFuture<'_, ()>, BoxFuture<'static, ()>>(
93 close_future,
94 )
95 };
96 RequestListenerState::CloseFuture(close_future)
97 }
98 Poll::Ready(Some(request)) => {
99 let request_future = this.handler.handle_request(request).boxed();
100 let request_future = unsafe {
105 std::mem::transmute::<
106 BoxFuture<'_, ControlFlow<()>>,
107 BoxFuture<'static, ControlFlow<()>>,
108 >(request_future)
109 };
110 RequestListenerState::RequestFuture(request_future)
111 }
112 },
113 RequestListenerState::RequestFuture(fut) => match fut.as_mut().poll(cx) {
114 Poll::Pending => return Poll::Pending,
115 Poll::Ready(ControlFlow::Break(())) => RequestListenerState::Done,
116 Poll::Ready(ControlFlow::Continue(())) => RequestListenerState::PollStream,
117 },
118 RequestListenerState::CloseFuture(fut) => match fut.as_mut().poll(cx) {
119 Poll::Pending => return Poll::Pending,
120 Poll::Ready(()) => RequestListenerState::Done,
121 },
122 RequestListenerState::Done => return Poll::Ready(()),
123 };
124 }
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use futures::poll;
    use std::future::{pending, poll_fn};
    use std::pin::pin;

    /// What a scripted `handle_request` future does on a single poll.
    enum Action {
        /// Resolve with `ControlFlow::Continue(())`.
        Continue,
        /// Resolve with `ControlFlow::Break(())`.
        Stop,
        /// Return `Poll::Pending`.
        Pending,
    }

    /// Handler whose futures replay a fixed script, consuming one entry per poll.
    struct Handler<'a> {
        /// Outcomes still to be produced by polls of `handle_request` futures.
        request_responses: &'a [Action],
        /// Outcomes still to be produced by polls of `stream_closed` futures.
        close_responses: &'a [Poll<()>],
    }

    impl<'a> Handler<'a> {
        fn new(request_responses: &'a [Action], close_responses: &'a [Poll<()>]) -> Self {
            Self { request_responses, close_responses }
        }
    }

    impl RequestHandler for Handler<'_> {
        type Request = u8;

        fn handle_request(
            mut self: Pin<&mut Self>,
            _request: Self::Request,
        ) -> impl Future<Output = ControlFlow<()>> + Send {
            poll_fn(move |_| {
                let handler = self.as_mut().get_mut();
                let (next, rest) = handler
                    .request_responses
                    .split_first()
                    .expect("polled more times than expected");
                handler.request_responses = rest;
                match next {
                    Action::Pending => Poll::Pending,
                    Action::Stop => Poll::Ready(ControlFlow::Break(())),
                    Action::Continue => Poll::Ready(ControlFlow::Continue(())),
                }
            })
        }

        fn stream_closed(mut self: Pin<&mut Self>) -> impl Future<Output = ()> + Send {
            poll_fn(move |_| {
                let handler = self.as_mut().get_mut();
                let (next, rest) = handler
                    .close_responses
                    .split_first()
                    .expect("polled more times than expected");
                handler.close_responses = rest;
                *next
            })
        }
    }

    /// Stream that replays a fixed script and then ends.
    struct Requests<'a> {
        /// Poll results still to be produced, in order.
        requests: &'a [Poll<u8>],
    }

    impl<'a> Requests<'a> {
        fn new(requests: &'a [Poll<u8>]) -> Self {
            Self { requests }
        }
    }

    impl Stream for Requests<'_> {
        type Item = u8;

        fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.get_mut();
            // An exhausted script means the stream has ended.
            let Some((head, tail)) = this.requests.split_first() else {
                return Poll::Ready(None);
            };
            this.requests = tail;
            (*head).map(Some)
        }
    }

    #[fuchsia::test]
    async fn test_empty_stream() {
        let requests = Requests::new(&[]);
        let scripted = Handler::new(&[], &[Poll::Ready(())]);
        let mut listener = pin!(RequestListener::new(requests, scripted));

        (&mut listener).await;

        // The only close response was consumed, so `stream_closed` ran.
        assert!(listener.handler.close_responses.is_empty());
    }

    #[fuchsia::test]
    async fn test_stream_pending() {
        let requests = Requests::new(&[Poll::Ready(1), Poll::Pending, Poll::Ready(3)]);
        let scripted = Handler::new(&[Action::Continue, Action::Continue], &[Poll::Ready(())]);
        let mut listener = pin!(RequestListener::new(requests, scripted));

        // The first poll stops at the stream's `Pending` entry.
        assert_matches!(poll!(listener.as_mut()), Poll::Pending);
        assert!(matches!(listener.state, RequestListenerState::PollStream));
        assert_matches!(poll!(listener.as_mut()), Poll::Ready(()));

        assert!(listener.stream.requests.is_empty());
        assert!(listener.handler.request_responses.is_empty());
        assert!(listener.handler.close_responses.is_empty());
    }

    #[fuchsia::test]
    async fn test_handle_request_pending() {
        let requests = Requests::new(&[Poll::Ready(1), Poll::Ready(3)]);
        let scripted = Handler::new(
            &[Action::Continue, Action::Pending, Action::Continue],
            &[Poll::Ready(())],
        );
        let mut listener = pin!(RequestListener::new(requests, scripted));

        // The first poll stops inside the second request's future.
        assert_matches!(poll!(listener.as_mut()), Poll::Pending);
        assert!(matches!(listener.state, RequestListenerState::RequestFuture(_)));
        assert_matches!(poll!(listener.as_mut()), Poll::Ready(()));

        assert!(listener.stream.requests.is_empty());
        assert!(listener.handler.request_responses.is_empty());
        assert!(listener.handler.close_responses.is_empty());
    }

    #[fuchsia::test]
    async fn test_stream_closed_pending() {
        let requests = Requests::new(&[Poll::Ready(1), Poll::Ready(3)]);
        let scripted =
            Handler::new(&[Action::Continue, Action::Continue], &[Poll::Pending, Poll::Ready(())]);
        let mut listener = pin!(RequestListener::new(requests, scripted));

        // The first poll stops inside the `stream_closed` future.
        assert_matches!(poll!(listener.as_mut()), Poll::Pending);
        assert!(matches!(listener.state, RequestListenerState::CloseFuture(_)));
        assert_matches!(poll!(listener.as_mut()), Poll::Ready(()));

        assert!(listener.stream.requests.is_empty());
        assert!(listener.handler.request_responses.is_empty());
        assert!(listener.handler.close_responses.is_empty());
    }

    #[fuchsia::test]
    async fn test_stream_closed_called_when_stream_ends() {
        let requests = Requests::new(&[Poll::Ready(1), Poll::Ready(2)]);
        let scripted = Handler::new(&[Action::Continue, Action::Continue], &[Poll::Ready(())]);
        let mut listener = pin!(RequestListener::new(requests, scripted));

        listener.as_mut().await;

        assert!(matches!(listener.state, RequestListenerState::Done));
        assert!(listener.stream.requests.is_empty());
        assert!(listener.handler.request_responses.is_empty());
        assert!(listener.handler.close_responses.is_empty());

        // Awaiting a finished listener must complete again without touching
        // the (now empty) scripts.
        listener.as_mut().await;
    }

    #[fuchsia::test]
    async fn test_stream_closed_not_called_when_request_handler_stops() {
        let requests = Requests::new(&[Poll::Ready(1), Poll::Ready(2), Poll::Ready(3)]);
        // The close script is empty, so any poll of a `stream_closed` future
        // would panic.
        let scripted = Handler::new(&[Action::Continue, Action::Stop], &[]);
        let mut listener = pin!(RequestListener::new(requests, scripted));

        listener.as_mut().await;

        assert!(matches!(listener.state, RequestListenerState::Done));
        assert!(listener.handler.request_responses.is_empty());
        // The third request was never pulled from the stream.
        assert_eq!(listener.stream.requests, &[Poll::Ready(3u8)]);
    }

    #[fuchsia::test]
    async fn test_poll_after_done() {
        let requests = Requests::new(&[Poll::Ready(1)]);
        let scripted = Handler::new(&[Action::Continue], &[Poll::Ready(())]);
        let mut listener = pin!(RequestListener::new(requests, scripted));

        listener.as_mut().await;
        assert!(matches!(listener.state, RequestListenerState::Done));
        listener.as_mut().await;
    }

    /// Handler that panics on drop unless one of its futures was dropped first.
    struct HandlerWithDropInFuture {
        /// Set by `UpdateHandlerOnDrop` when a future created by this handler is dropped.
        future_dropped: bool,
    }

    impl Drop for HandlerWithDropInFuture {
        fn drop(&mut self) {
            assert!(self.future_dropped, "Handler dropped before future");
        }
    }

    /// Guard held inside the handler's futures; records its own drop on the handler.
    struct UpdateHandlerOnDrop<'a> {
        handler: &'a mut HandlerWithDropInFuture,
    }

    impl Drop for UpdateHandlerOnDrop<'_> {
        fn drop(&mut self) {
            assert!(!self.handler.future_dropped);
            self.handler.future_dropped = true;
        }
    }

    impl RequestHandler for HandlerWithDropInFuture {
        type Request = u8;

        async fn handle_request(self: Pin<&mut Self>, _request: Self::Request) -> ControlFlow<()> {
            // The guard lives across the never-resolving await below, so it is
            // only dropped when the future itself is dropped.
            let _guard = UpdateHandlerOnDrop { handler: self.get_mut() };
            pending().await
        }

        async fn stream_closed(self: Pin<&mut Self>) -> () {
            let _guard = UpdateHandlerOnDrop { handler: self.get_mut() };
            pending().await
        }
    }

    #[fuchsia::test]
    async fn test_request_future_dropped_before_handler() {
        let requests = Requests::new(&[Poll::Ready(1)]);
        let drop_checked = HandlerWithDropInFuture { future_dropped: false };
        let mut listener = pin!(RequestListener::new(requests, drop_checked));

        assert_matches!(poll!(listener.as_mut()), Poll::Pending);
        assert!(matches!(listener.state, RequestListenerState::RequestFuture(_)));
        assert!(!listener.handler.future_dropped);

        // The real check runs when the listener is dropped at the end of this
        // scope: the handler's `Drop` panics if the future was not dropped first.
    }

    #[fuchsia::test]
    async fn test_close_future_dropped_before_handler() {
        let requests = Requests::new(&[]);
        let drop_checked = HandlerWithDropInFuture { future_dropped: false };
        let mut listener = pin!(RequestListener::new(requests, drop_checked));

        assert_matches!(poll!(listener.as_mut()), Poll::Pending);
        assert!(matches!(listener.state, RequestListenerState::CloseFuture(_)));
        assert!(!listener.handler.future_dropped);

        // The real check runs when the listener is dropped at the end of this
        // scope: the handler's `Drop` panics if the future was not dropped first.
    }
}