vfs/
request_handler.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::future::BoxFuture;
6use futures::{FutureExt, Stream};
7use pin_project::pin_project;
8use std::future::{ready, Future};
9use std::ops::ControlFlow;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
/// Callbacks that a [`RequestListener`] invokes while it drains a stream of requests.
pub trait RequestHandler: Sized {
    /// The item type produced by the stream that this handler consumes.
    type Request;

    /// Processes a single request taken from the stream.
    ///
    /// Resolving to [`ControlFlow::Continue`] lets the listener go back to the stream for the
    /// next request. Resolving to [`ControlFlow::Break`] makes the listener finish without
    /// reading any further requests.
    fn handle_request(
        self: Pin<&mut Self>,
        request: Self::Request,
    ) -> impl Future<Output = ControlFlow<()>> + Send;

    /// Invoked once the stream has ended. It is skipped when
    /// [`RequestHandler::handle_request`] resolved to [`ControlFlow::Break`].
    ///
    /// The default implementation does nothing and completes on the first poll.
    fn stream_closed(self: Pin<&mut Self>) -> impl Future<Output = ()> + Send {
        ready(())
    }
}
31
32enum RequestListenerState {
33    /// Indicates that [`RequestListener::stream`] should be polled for more requests.
34    PollStream,
35
36    /// Holds the future returned from [`RequestHandler::handle_request`]. The 'static lifetime is a
37    /// lie. This future holds a reference back to [`RequestListener::handler`] making
38    /// [`RequestListener`] self-referential.
39    RequestFuture(BoxFuture<'static, ControlFlow<()>>),
40
41    /// Holds the future returned from [`RequestHandler::stream_closed`]. The 'static lifetime is a
42    /// lie. This future holds a reference back to [`RequestListener::handler`] making
43    /// [`RequestListener`] self-referential.
44    CloseFuture(BoxFuture<'static, ()>),
45
46    /// Indicates that there's nothing left to be done and [`RequestListener`] should return
47    /// [`Poll::Ready`] from its future.
48    Done,
49}
50
51/// A `Future` for handling requests in a `Stream`. Optimized to reduce memory usage while the
52/// stream is idle.
53#[pin_project(!Unpin)]
54pub struct RequestListener<RS, RH> {
55    #[pin]
56    stream: RS,
57
58    // `state` could hold a future that references `handler` so `state` must come before `handler`
59    // so it gets dropped before `handler`.
60    state: RequestListenerState,
61
62    #[pin]
63    handler: RH,
64}
65
66impl<RS, RH> RequestListener<RS, RH> {
67    pub fn new(stream: RS, handler: RH) -> Self {
68        Self { stream, state: RequestListenerState::PollStream, handler }
69    }
70}
71
72impl<RS, RH, R> Future for RequestListener<RS, RH>
73where
74    RS: Stream<Item = R>,
75    RH: RequestHandler<Request = R>,
76{
77    type Output = ();
78
79    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
80        loop {
81            let this = self.as_mut().project();
82            *this.state = match this.state {
83                RequestListenerState::PollStream => match this.stream.poll_next(cx) {
84                    Poll::Pending => return Poll::Pending,
85                    Poll::Ready(None) => {
86                        let close_future = this.handler.stream_closed().boxed();
87                        // SAFETY: The future holds a reference to the handler making this a
88                        // self-referential struct. This struct doesn't implement Unpin and the
89                        // future is always dropped before the handler so transmuting the future's
90                        // lifetime to static is safe.
91                        let close_future = unsafe {
92                            std::mem::transmute::<BoxFuture<'_, ()>, BoxFuture<'static, ()>>(
93                                close_future,
94                            )
95                        };
96                        RequestListenerState::CloseFuture(close_future)
97                    }
98                    Poll::Ready(Some(request)) => {
99                        let request_future = this.handler.handle_request(request).boxed();
100                        // SAFETY: The future holds a reference to the handler making this a
101                        // self-referential struct. This struct doesn't implement Unpin and the
102                        // future is always dropped before the handler so transmuting the future's
103                        // lifetime to static is safe.
104                        let request_future = unsafe {
105                            std::mem::transmute::<
106                                BoxFuture<'_, ControlFlow<()>>,
107                                BoxFuture<'static, ControlFlow<()>>,
108                            >(request_future)
109                        };
110                        RequestListenerState::RequestFuture(request_future)
111                    }
112                },
113                RequestListenerState::RequestFuture(fut) => match fut.as_mut().poll(cx) {
114                    Poll::Pending => return Poll::Pending,
115                    Poll::Ready(ControlFlow::Break(())) => RequestListenerState::Done,
116                    Poll::Ready(ControlFlow::Continue(())) => RequestListenerState::PollStream,
117                },
118                RequestListenerState::CloseFuture(fut) => match fut.as_mut().poll(cx) {
119                    Poll::Pending => return Poll::Pending,
120                    Poll::Ready(()) => RequestListenerState::Done,
121                },
122                RequestListenerState::Done => return Poll::Ready(()),
123            };
124        }
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131    use assert_matches::assert_matches;
132    use futures::poll;
133    use std::future::{pending, poll_fn};
134    use std::pin::pin;
135
136    enum Action {
137        Continue,
138        Stop,
139        Pending,
140    }
141
142    struct Handler<'a> {
143        request_responses: &'a [Action],
144        close_responses: &'a [Poll<()>],
145    }
146
147    impl<'a> Handler<'a> {
148        fn new(request_responses: &'a [Action], close_responses: &'a [Poll<()>]) -> Self {
149            Self { request_responses, close_responses }
150        }
151    }
152
153    impl RequestHandler for Handler<'_> {
154        type Request = u8;
155        fn handle_request(
156            mut self: Pin<&mut Self>,
157            _request: Self::Request,
158        ) -> impl Future<Output = ControlFlow<()>> + Send {
159            poll_fn(move |_| {
160                let this = self.as_mut().get_mut();
161                let (action, remaining) =
162                    this.request_responses.split_first().expect("polled more times than expected");
163                this.request_responses = remaining;
164                match action {
165                    Action::Continue => Poll::Ready(ControlFlow::Continue(())),
166                    Action::Stop => Poll::Ready(ControlFlow::Break(())),
167                    Action::Pending => Poll::Pending,
168                }
169            })
170        }
171
172        fn stream_closed(mut self: Pin<&mut Self>) -> impl Future<Output = ()> + Send {
173            poll_fn(move |_| {
174                let this = self.as_mut().get_mut();
175                let (response, remaining) =
176                    this.close_responses.split_first().expect("polled more times than expected");
177                this.close_responses = remaining;
178                *response
179            })
180        }
181    }
182
183    struct Requests<'a> {
184        requests: &'a [Poll<u8>],
185    }
186
187    impl<'a> Requests<'a> {
188        fn new(requests: &'a [Poll<u8>]) -> Self {
189            Self { requests }
190        }
191    }
192
193    impl Stream for Requests<'_> {
194        type Item = u8;
195
196        fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
197            let this = self.get_mut();
198            match this.requests.split_first() {
199                None => Poll::Ready(None),
200                Some((action, remaining)) => {
201                    this.requests = remaining;
202                    match action {
203                        Poll::Pending => Poll::Pending,
204                        Poll::Ready(request) => Poll::Ready(Some(*request)),
205                    }
206                }
207            }
208        }
209    }
210
211    #[fuchsia::test]
212    async fn test_empty_stream() {
213        let handler = Handler::new(&[], &[Poll::Ready(())]);
214        let stream = Requests::new(&[]);
215        let mut listener = pin!(RequestListener::new(stream, handler));
216
217        listener.as_mut().await;
218
219        assert!(listener.handler.close_responses.is_empty());
220    }
221
222    #[fuchsia::test]
223    async fn test_stream_pending() {
224        let handler = Handler::new(&[Action::Continue, Action::Continue], &[Poll::Ready(())]);
225        let stream = Requests::new(&[Poll::Ready(1), Poll::Pending, Poll::Ready(3)]);
226        let mut listener = pin!(RequestListener::new(stream, handler));
227
228        assert_matches!(poll!(&mut listener), Poll::Pending);
229        assert!(matches!(listener.state, RequestListenerState::PollStream));
230        assert_matches!(poll!(&mut listener), Poll::Ready(()));
231
232        assert!(listener.handler.request_responses.is_empty());
233        assert!(listener.handler.close_responses.is_empty());
234        assert!(listener.stream.requests.is_empty());
235    }
236
237    #[fuchsia::test]
238    async fn test_handle_request_pending() {
239        let handler = Handler::new(
240            &[Action::Continue, Action::Pending, Action::Continue],
241            &[Poll::Ready(())],
242        );
243        let stream = Requests::new(&[Poll::Ready(1), Poll::Ready(3)]);
244        let mut listener = pin!(RequestListener::new(stream, handler));
245
246        assert_matches!(poll!(&mut listener), Poll::Pending);
247        assert!(matches!(listener.state, RequestListenerState::RequestFuture(_)));
248        assert_matches!(poll!(&mut listener), Poll::Ready(()));
249
250        assert!(listener.handler.request_responses.is_empty());
251        assert!(listener.handler.close_responses.is_empty());
252        assert!(listener.stream.requests.is_empty());
253    }
254
255    #[fuchsia::test]
256    async fn test_stream_closed_pending() {
257        let handler =
258            Handler::new(&[Action::Continue, Action::Continue], &[Poll::Pending, Poll::Ready(())]);
259        let stream = Requests::new(&[Poll::Ready(1), Poll::Ready(3)]);
260        let mut listener = pin!(RequestListener::new(stream, handler));
261
262        assert_matches!(poll!(&mut listener), Poll::Pending);
263        assert!(matches!(listener.state, RequestListenerState::CloseFuture(_)));
264        assert_matches!(poll!(&mut listener), Poll::Ready(()));
265
266        assert!(listener.handler.request_responses.is_empty());
267        assert!(listener.handler.close_responses.is_empty());
268        assert!(listener.stream.requests.is_empty());
269    }
270
271    #[fuchsia::test]
272    async fn test_stream_closed_called_when_stream_ends() {
273        let handler = Handler::new(&[Action::Continue, Action::Continue], &[Poll::Ready(())]);
274        let stream = Requests::new(&[Poll::Ready(1), Poll::Ready(2)]);
275        let mut listener = pin!(RequestListener::new(stream, handler));
276
277        (&mut listener).await;
278
279        assert!(listener.handler.request_responses.is_empty());
280        assert!(listener.handler.close_responses.is_empty());
281        assert!(listener.stream.requests.is_empty());
282        assert!(matches!(listener.state, RequestListenerState::Done));
283
284        (&mut listener).await;
285    }
286
287    #[fuchsia::test]
288    async fn test_stream_closed_not_called_when_request_handler_stops() {
289        // No stream_closed calls are expected so the handler will panic if it's called.
290        let handler = Handler::new(&[Action::Continue, Action::Stop], &[]);
291        let stream = Requests::new(&[Poll::Ready(1), Poll::Ready(2), Poll::Ready(3)]);
292        let mut listener = pin!(RequestListener::new(stream, handler));
293
294        (&mut listener).await;
295
296        assert!(listener.handler.request_responses.is_empty());
297        assert_eq!(listener.stream.requests, &[Poll::Ready(3u8)]);
298        assert!(matches!(listener.state, RequestListenerState::Done));
299    }
300
301    #[fuchsia::test]
302    async fn test_poll_after_done() {
303        let handler = Handler::new(&[Action::Continue], &[Poll::Ready(())]);
304        let stream = Requests::new(&[Poll::Ready(1)]);
305        let mut listener = pin!(RequestListener::new(stream, handler));
306
307        (&mut listener).await;
308        assert!(matches!(listener.state, RequestListenerState::Done));
309        (&mut listener).await;
310    }
311
312    struct HandlerWithDropInFuture {
313        future_dropped: bool,
314    }
315    impl Drop for HandlerWithDropInFuture {
316        fn drop(&mut self) {
317            assert!(self.future_dropped, "Handler dropped before future");
318        }
319    }
320
321    struct UpdateHandlerOnDrop<'a> {
322        handler: &'a mut HandlerWithDropInFuture,
323    }
324
325    impl Drop for UpdateHandlerOnDrop<'_> {
326        fn drop(&mut self) {
327            assert!(!self.handler.future_dropped);
328            self.handler.future_dropped = true;
329        }
330    }
331
332    impl RequestHandler for HandlerWithDropInFuture {
333        type Request = u8;
334
335        async fn handle_request(self: Pin<&mut Self>, _request: Self::Request) -> ControlFlow<()> {
336            let _defer_drop = UpdateHandlerOnDrop { handler: self.get_mut() };
337            pending().await
338        }
339
340        async fn stream_closed(self: Pin<&mut Self>) -> () {
341            let _defer_drop = UpdateHandlerOnDrop { handler: self.get_mut() };
342            pending().await
343        }
344    }
345
346    #[fuchsia::test]
347    async fn test_request_future_dropped_before_handler() {
348        let handler = HandlerWithDropInFuture { future_dropped: false };
349        let stream = Requests::new(&[Poll::Ready(1)]);
350        let mut listener = pin!(RequestListener::new(stream, handler));
351
352        assert_matches!(poll!(&mut listener), Poll::Pending);
353        assert!(matches!(listener.state, RequestListenerState::RequestFuture(_)));
354        assert!(!listener.handler.future_dropped);
355
356        // The handler's drop impl will panic if it's dropped before the future.
357    }
358
359    #[fuchsia::test]
360    async fn test_close_future_dropped_before_handler() {
361        let handler = HandlerWithDropInFuture { future_dropped: false };
362        let stream = Requests::new(&[]);
363        let mut listener = pin!(RequestListener::new(stream, handler));
364
365        assert_matches!(poll!(&mut listener), Poll::Pending);
366        assert!(matches!(listener.state, RequestListenerState::CloseFuture(_)));
367        assert!(!listener.handler.future_dropped);
368
369        // The handler's drop impl will panic if it's dropped before the future.
370    }
371}