async_utils/hanging_get/
client.rs
1use fidl::client::QueryResponseFut;
6use futures::future::{FusedFuture as _, FutureExt as _, MaybeDone};
7use futures::stream::{FusedStream, Stream};
8use futures::task::{Context, Poll};
9use pin_project::pin_project;
10use std::pin::Pin;
11
12#[must_use = "streams do nothing unless polled"]
15#[pin_project]
16pub struct HangingGetStream<P, O, Q = fn(&P) -> QueryResponseFut<O>> {
17 proxy: P,
18 query: Q,
19 response: QueryResponseFut<O>,
20 eager: bool,
21}
22
23impl<P, O, Q> Stream for HangingGetStream<P, O, Q>
24where
25 O: Unpin,
26 Q: FnMut(&P) -> QueryResponseFut<O>,
27{
28 type Item = fidl::Result<O>;
29
30 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31 let mut this = self.project();
32 if *this.eager {
33 match this.response.poll_unpin(cx) {
34 Poll::Ready(o) => {
35 let QueryResponseFut(prev_response) =
36 std::mem::replace(this.response, (&mut this.query)(&this.proxy));
37
38 if cfg!(debug_assertions) {
39 match prev_response {
40 MaybeDone::Gone => {}
41 MaybeDone::Future(_) => {
42 panic!("previous uncompleted future still exists")
43 }
44 MaybeDone::Done(_) => {
45 panic!("previous completed future's result not taken")
46 }
47 }
48 } else {
49 let _: MaybeDone<_> = prev_response;
50 }
51
52 Poll::Ready(Some(o))
53 }
54 Poll::Pending => Poll::Pending,
55 }
56 } else {
57 if this.response.is_terminated() {
58 *this.response = (&mut this.query)(&this.proxy);
59 }
60 this.response.poll_unpin(cx).map(Some)
61 }
62 }
63}
64
65impl<P, O, Q> FusedStream for HangingGetStream<P, O, Q>
66where
67 O: Unpin,
68 Q: FnMut(&P) -> QueryResponseFut<O>,
69{
70 fn is_terminated(&self) -> bool {
71 false
72 }
73}
74
75impl<P, O, Q> HangingGetStream<P, O, Q>
76where
77 Q: FnMut(&P) -> QueryResponseFut<O>,
78 O: Unpin,
79{
80 fn new_inner(proxy: P, mut query: Q, eager: bool) -> Self {
81 let response = if eager { query(&proxy) } else { QueryResponseFut(MaybeDone::Gone) };
82
83 Self { proxy, query, response, eager }
84 }
85
86 pub fn new(proxy: P, query: Q) -> Self {
88 Self::new_inner(proxy, query, false )
89 }
90
91 pub fn new_eager(proxy: P, query: Q) -> Self {
93 Self::new_inner(proxy, query, true )
94 }
95}
96
97impl<P, O> HangingGetStream<P, O, fn(&P) -> QueryResponseFut<O>>
98where
99 O: Unpin,
100{
101 pub fn new_with_fn_ptr(proxy: P, query: fn(&P) -> QueryResponseFut<O>) -> Self {
103 Self::new(proxy, query)
104 }
105
106 pub fn new_eager_with_fn_ptr(proxy: P, query: fn(&P) -> QueryResponseFut<O>) -> Self {
108 Self::new_eager(proxy, query)
109 }
110}
111
112#[cfg(test)]
113mod tests {
114
115 use super::*;
116 use fuchsia_async as fasync;
117 use futures::TryStreamExt as _;
118 use std::cell::Cell;
119
120 struct TestProxy {
121 state: Cell<usize>,
122 }
123
124 impl TestProxy {
125 fn watch(&self) -> QueryResponseFut<usize> {
126 let cur = self.state.get();
127 self.state.set(cur + 1);
128 QueryResponseFut(MaybeDone::Done(Ok(cur)))
129 }
130 }
131
132 #[fasync::run_singlethreaded(test)]
133 async fn generates_items_lazily() {
134 let proxy = TestProxy { state: Cell::new(0) };
135 let mut watcher = HangingGetStream::new(proxy, TestProxy::watch);
136
137 const ITERS: usize = 3;
138 for i in 0..ITERS {
139 assert!(watcher.response.is_terminated());
140 assert_eq!(watcher.try_next().await.expect("failed to get next item"), Some(i));
141 }
142 }
143
144 #[fasync::run_singlethreaded(test)]
145 async fn generates_items_eagerly() {
146 let proxy = TestProxy { state: Cell::new(0) };
147 let mut watcher = HangingGetStream::new_eager_with_fn_ptr(proxy, TestProxy::watch);
148
149 const ITERS: usize = 3;
150 for i in 0..ITERS {
151 assert!(
152 !watcher.response.is_terminated(),
153 "should keep the server hydrated with an in-flight request",
154 );
155 assert_eq!(watcher.try_next().await.expect("failed to get next item"), Some(i));
156 }
157 }
158}