Skip to main content

async_utils/hanging_get/
client.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::client::QueryResponseFut;
6use fidl::encoding::ResourceDialect;
7use futures::future::{FusedFuture as _, FutureExt as _, MaybeDone};
8use futures::stream::{FusedStream, Stream};
9use futures::task::{Context, Poll};
10use pin_project::pin_project;
11use std::pin::Pin;
12
/// HangingGetStream is a [`Stream`] that is oriented towards being a client to the
/// "Hanging Get" design pattern for flow control as in //docs/development/api/fidl.md#flow_control
///
/// Every item is produced by calling `query` with a shared reference to `proxy`
/// and resolving the [`QueryResponseFut`] it returns. The [`Stream`]
/// implementation wraps each ready result in `Some`, so the stream never ends
/// on its own.
///
/// Type parameters:
/// - `P`: the proxy type handed (by reference) to the query.
/// - `O`: the value carried by a response; stream items are `fidl::Result<O>`.
/// - `D`: the resource dialect of the response future.
/// - `Q`: the query callable; defaults to a plain function pointer so the stream
///   type can be named without spelling out a closure type.
#[must_use = "streams do nothing unless polled"]
#[pin_project]
pub struct HangingGetStream<
    P,
    O,
    D: ResourceDialect = fidl::encoding::DefaultFuchsiaResourceDialect,
    Q = fn(&P) -> QueryResponseFut<O, D>,
> {
    /// Passed by shared reference to `query` each time a request is issued.
    proxy: P,
    /// Issues one hanging-get request and returns the future for its response.
    query: Q,
    /// The current request. Built as `MaybeDone::Gone` for lazy streams (no
    /// request issued yet) and as a real request for eager streams.
    response: QueryResponseFut<O, D>,
    /// `true`: the next request is issued as soon as the previous one resolves.
    /// `false`: the next request is issued only when the stream is polled again.
    eager: bool,
    /// Zero-sized marker for the dialect parameter `D`.
    _dialect: std::marker::PhantomData<D>,
}
29
30impl<P, O, D, Q> Stream for HangingGetStream<P, O, D, Q>
31where
32    O: Unpin,
33    Q: FnMut(&P) -> QueryResponseFut<O, D>,
34    D: ResourceDialect,
35{
36    type Item = fidl::Result<O>;
37
38    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39        let mut this = self.project();
40        if *this.eager {
41            match this.response.poll_unpin(cx) {
42                Poll::Ready(o) => {
43                    let QueryResponseFut(prev_response) =
44                        std::mem::replace(this.response, (&mut this.query)(&this.proxy));
45
46                    if cfg!(debug_assertions) {
47                        match prev_response {
48                            MaybeDone::Gone => {}
49                            MaybeDone::Future(_) => {
50                                panic!("previous uncompleted future still exists")
51                            }
52                            MaybeDone::Done(_) => {
53                                panic!("previous completed future's result not taken")
54                            }
55                        }
56                    } else {
57                        let _: MaybeDone<_> = prev_response;
58                    }
59
60                    Poll::Ready(Some(o))
61                }
62                Poll::Pending => Poll::Pending,
63            }
64        } else {
65            if this.response.is_terminated() {
66                *this.response = (&mut this.query)(&this.proxy);
67            }
68            this.response.poll_unpin(cx).map(Some)
69        }
70    }
71}
72
impl<P, O, D, Q> FusedStream for HangingGetStream<P, O, D, Q>
where
    O: Unpin,
    Q: FnMut(&P) -> QueryResponseFut<O, D>,
    D: ResourceDialect,
{
    /// Always `false`: `poll_next` wraps every ready result in `Some` and has
    /// no path that yields `None`, so this stream never reports termination.
    fn is_terminated(&self) -> bool {
        false
    }
}
83
84impl<P, O, D, Q> HangingGetStream<P, O, D, Q>
85where
86    Q: FnMut(&P) -> QueryResponseFut<O, D>,
87    O: Unpin,
88    D: ResourceDialect,
89{
90    fn new_inner(proxy: P, mut query: Q, eager: bool) -> Self {
91        let response = if eager { query(&proxy) } else { QueryResponseFut(MaybeDone::Gone) };
92
93        Self { proxy, query, response, eager, _dialect: std::marker::PhantomData }
94    }
95
96    /// Creates a new lazily-polled hanging-get stream.
97    pub fn new(proxy: P, query: Q) -> Self {
98        Self::new_inner(proxy, query, false /* eager */)
99    }
100
101    /// Creates a new eagerly-polled hanging-get stream.
102    pub fn new_eager(proxy: P, query: Q) -> Self {
103        Self::new_inner(proxy, query, true /* eager */)
104    }
105}
106
// Constructors that pin the query type `Q` to a plain function pointer, which
// is the default declared for `Q` on the struct.
impl<P, O, D> HangingGetStream<P, O, D, fn(&P) -> QueryResponseFut<O, D>>
where
    O: Unpin,
    D: ResourceDialect,
{
    /// Creates a new lazily-polled hanging-get stream with a function pointer.
    ///
    /// Forwards to [`HangingGetStream::new`]; the only difference is that the
    /// query parameter is typed as `fn(&P) -> QueryResponseFut<O, D>` rather
    /// than as a generic callable.
    pub fn new_with_fn_ptr(proxy: P, query: fn(&P) -> QueryResponseFut<O, D>) -> Self {
        Self::new(proxy, query)
    }

    /// Creates a new eagerly-polled hanging-get stream with a function pointer.
    ///
    /// Forwards to [`HangingGetStream::new_eager`]; the only difference is that
    /// the query parameter is typed as `fn(&P) -> QueryResponseFut<O, D>` rather
    /// than as a generic callable.
    pub fn new_eager_with_fn_ptr(proxy: P, query: fn(&P) -> QueryResponseFut<O, D>) -> Self {
        Self::new_eager(proxy, query)
    }
}
122
#[cfg(test)]
mod tests {

    use super::*;
    use fuchsia_async as fasync;
    use futures::TryStreamExt as _;
    use std::cell::Cell;

    /// Number of items each test pulls from the stream under test.
    const ITERS: usize = 3;

    /// Fake proxy whose `watch` call resolves immediately with a counter that
    /// goes up by one on every call.
    struct TestProxy {
        state: Cell<usize>,
    }

    impl TestProxy {
        /// Returns an already-completed response carrying the current counter
        /// value, then advances the counter.
        fn watch(&self) -> QueryResponseFut<usize> {
            let current = self.state.get();
            self.state.set(current + 1);
            QueryResponseFut(MaybeDone::Done(Ok(current)))
        }
    }

    /// A lazy stream must hold no request between polls.
    #[fasync::run_singlethreaded(test)]
    async fn generates_items_lazily() {
        let fake = TestProxy { state: Cell::new(0) };
        let mut stream = HangingGetStream::new(fake, TestProxy::watch);

        for expected in 0..ITERS {
            assert!(stream.response.is_terminated());
            let item = stream.try_next().await.expect("failed to get next item");
            assert_eq!(item, Some(expected));
        }
    }

    /// An eager stream must always hold a request, even before the first poll.
    #[fasync::run_singlethreaded(test)]
    async fn generates_items_eagerly() {
        let fake = TestProxy { state: Cell::new(0) };
        let mut stream = HangingGetStream::new_eager_with_fn_ptr(fake, TestProxy::watch);

        for expected in 0..ITERS {
            assert!(
                !stream.response.is_terminated(),
                "should keep the server hydrated with an in-flight request",
            );
            let item = stream.try_next().await.expect("failed to get next item");
            assert_eq!(item, Some(expected));
        }
    }
}