async_utils/hanging_get/
client.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::client::QueryResponseFut;
6use futures::future::{FusedFuture as _, FutureExt as _, MaybeDone};
7use futures::stream::{FusedStream, Stream};
8use futures::task::{Context, Poll};
9use pin_project::pin_project;
10use std::pin::Pin;
11
12/// HangingGetStream is a [`Stream`] that is oriented towards being a client to the
13/// "Hanging Get" design pattern for flow control as in //docs/development/api/fidl.md#flow_control
14#[must_use = "streams do nothing unless polled"]
15#[pin_project]
16pub struct HangingGetStream<P, O, Q = fn(&P) -> QueryResponseFut<O>> {
17    proxy: P,
18    query: Q,
19    response: QueryResponseFut<O>,
20    eager: bool,
21}
22
23impl<P, O, Q> Stream for HangingGetStream<P, O, Q>
24where
25    O: Unpin,
26    Q: FnMut(&P) -> QueryResponseFut<O>,
27{
28    type Item = fidl::Result<O>;
29
30    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31        let mut this = self.project();
32        if *this.eager {
33            match this.response.poll_unpin(cx) {
34                Poll::Ready(o) => {
35                    let QueryResponseFut(prev_response) =
36                        std::mem::replace(this.response, (&mut this.query)(&this.proxy));
37
38                    if cfg!(debug_assertions) {
39                        match prev_response {
40                            MaybeDone::Gone => {}
41                            MaybeDone::Future(_) => {
42                                panic!("previous uncompleted future still exists")
43                            }
44                            MaybeDone::Done(_) => {
45                                panic!("previous completed future's result not taken")
46                            }
47                        }
48                    } else {
49                        let _: MaybeDone<_> = prev_response;
50                    }
51
52                    Poll::Ready(Some(o))
53                }
54                Poll::Pending => Poll::Pending,
55            }
56        } else {
57            if this.response.is_terminated() {
58                *this.response = (&mut this.query)(&this.proxy);
59            }
60            this.response.poll_unpin(cx).map(Some)
61        }
62    }
63}
64
65impl<P, O, Q> FusedStream for HangingGetStream<P, O, Q>
66where
67    O: Unpin,
68    Q: FnMut(&P) -> QueryResponseFut<O>,
69{
70    fn is_terminated(&self) -> bool {
71        false
72    }
73}
74
75impl<P, O, Q> HangingGetStream<P, O, Q>
76where
77    Q: FnMut(&P) -> QueryResponseFut<O>,
78    O: Unpin,
79{
80    fn new_inner(proxy: P, mut query: Q, eager: bool) -> Self {
81        let response = if eager { query(&proxy) } else { QueryResponseFut(MaybeDone::Gone) };
82
83        Self { proxy, query, response, eager }
84    }
85
86    /// Creates a new lazily-polled hanging-get stream.
87    pub fn new(proxy: P, query: Q) -> Self {
88        Self::new_inner(proxy, query, false /* eager */)
89    }
90
91    /// Creates a new eagerly-polled hanging-get stream.
92    pub fn new_eager(proxy: P, query: Q) -> Self {
93        Self::new_inner(proxy, query, true /* eager */)
94    }
95}
96
97impl<P, O> HangingGetStream<P, O, fn(&P) -> QueryResponseFut<O>>
98where
99    O: Unpin,
100{
101    /// Creates a new lazily-polled hanging-get stream with a function pointer.
102    pub fn new_with_fn_ptr(proxy: P, query: fn(&P) -> QueryResponseFut<O>) -> Self {
103        Self::new(proxy, query)
104    }
105
106    /// Creates a new eagerly-polled hanging-get stream with a function pointer.
107    pub fn new_eager_with_fn_ptr(proxy: P, query: fn(&P) -> QueryResponseFut<O>) -> Self {
108        Self::new_eager(proxy, query)
109    }
110}
111
#[cfg(test)]
mod tests {

    use super::*;
    use fuchsia_async as fasync;
    use futures::TryStreamExt as _;
    use std::cell::Cell;

    /// Number of items each test pulls from the stream.
    const ITERS: usize = 3;

    /// Stand-in proxy whose `watch` responses are ready immediately and count upwards.
    struct TestProxy {
        state: Cell<usize>,
    }

    impl TestProxy {
        /// Returns an already-completed response carrying the current counter value, then
        /// advances the counter by one.
        fn watch(&self) -> QueryResponseFut<usize> {
            let issued = self.state.replace(self.state.get() + 1);
            QueryResponseFut(MaybeDone::Done(Ok(issued)))
        }
    }

    /// In lazy mode no request is outstanding between items.
    #[fasync::run_singlethreaded(test)]
    async fn generates_items_lazily() {
        let mut watcher =
            HangingGetStream::new(TestProxy { state: Cell::new(0) }, TestProxy::watch);

        for expected in 0..ITERS {
            assert!(watcher.response.is_terminated());
            let item = watcher.try_next().await.expect("failed to get next item");
            assert_eq!(item, Some(expected));
        }
    }

    /// In eager mode a request is outstanding before every poll, including the first.
    #[fasync::run_singlethreaded(test)]
    async fn generates_items_eagerly() {
        let mut watcher = HangingGetStream::new_eager_with_fn_ptr(
            TestProxy { state: Cell::new(0) },
            TestProxy::watch,
        );

        for expected in 0..ITERS {
            assert!(
                !watcher.response.is_terminated(),
                "should keep the server hydrated with an in-flight request",
            );
            let item = watcher.try_next().await.expect("failed to get next item");
            assert_eq!(item, Some(expected));
        }
    }
}