async_utils/hanging_get/
client.rs1use fidl::client::QueryResponseFut;
6use fidl::encoding::ResourceDialect;
7use futures::future::{FusedFuture as _, FutureExt as _, MaybeDone};
8use futures::stream::{FusedStream, Stream};
9use futures::task::{Context, Poll};
10use pin_project::pin_project;
11use std::pin::Pin;
12
13#[must_use = "streams do nothing unless polled"]
16#[pin_project]
17pub struct HangingGetStream<
18 P,
19 O,
20 D: ResourceDialect = fidl::encoding::DefaultFuchsiaResourceDialect,
21 Q = fn(&P) -> QueryResponseFut<O, D>,
22> {
23 proxy: P,
24 query: Q,
25 response: QueryResponseFut<O, D>,
26 eager: bool,
27 _dialect: std::marker::PhantomData<D>,
28}
29
30impl<P, O, D, Q> Stream for HangingGetStream<P, O, D, Q>
31where
32 O: Unpin,
33 Q: FnMut(&P) -> QueryResponseFut<O, D>,
34 D: ResourceDialect,
35{
36 type Item = fidl::Result<O>;
37
38 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39 let mut this = self.project();
40 if *this.eager {
41 match this.response.poll_unpin(cx) {
42 Poll::Ready(o) => {
43 let QueryResponseFut(prev_response) =
44 std::mem::replace(this.response, (&mut this.query)(&this.proxy));
45
46 if cfg!(debug_assertions) {
47 match prev_response {
48 MaybeDone::Gone => {}
49 MaybeDone::Future(_) => {
50 panic!("previous uncompleted future still exists")
51 }
52 MaybeDone::Done(_) => {
53 panic!("previous completed future's result not taken")
54 }
55 }
56 } else {
57 let _: MaybeDone<_> = prev_response;
58 }
59
60 Poll::Ready(Some(o))
61 }
62 Poll::Pending => Poll::Pending,
63 }
64 } else {
65 if this.response.is_terminated() {
66 *this.response = (&mut this.query)(&this.proxy);
67 }
68 this.response.poll_unpin(cx).map(Some)
69 }
70 }
71}
72
73impl<P, O, D, Q> FusedStream for HangingGetStream<P, O, D, Q>
74where
75 O: Unpin,
76 Q: FnMut(&P) -> QueryResponseFut<O, D>,
77 D: ResourceDialect,
78{
79 fn is_terminated(&self) -> bool {
80 false
81 }
82}
83
84impl<P, O, D, Q> HangingGetStream<P, O, D, Q>
85where
86 Q: FnMut(&P) -> QueryResponseFut<O, D>,
87 O: Unpin,
88 D: ResourceDialect,
89{
90 fn new_inner(proxy: P, mut query: Q, eager: bool) -> Self {
91 let response = if eager { query(&proxy) } else { QueryResponseFut(MaybeDone::Gone) };
92
93 Self { proxy, query, response, eager, _dialect: std::marker::PhantomData }
94 }
95
96 pub fn new(proxy: P, query: Q) -> Self {
98 Self::new_inner(proxy, query, false )
99 }
100
101 pub fn new_eager(proxy: P, query: Q) -> Self {
103 Self::new_inner(proxy, query, true )
104 }
105}
106
107impl<P, O, D> HangingGetStream<P, O, D, fn(&P) -> QueryResponseFut<O, D>>
108where
109 O: Unpin,
110 D: ResourceDialect,
111{
112 pub fn new_with_fn_ptr(proxy: P, query: fn(&P) -> QueryResponseFut<O, D>) -> Self {
114 Self::new(proxy, query)
115 }
116
117 pub fn new_eager_with_fn_ptr(proxy: P, query: fn(&P) -> QueryResponseFut<O, D>) -> Self {
119 Self::new_eager(proxy, query)
120 }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::TryStreamExt as _;
    use std::cell::Cell;

    /// Minimal stand-in for a proxy: every `watch` call resolves immediately
    /// with the number of `watch` calls made before it.
    struct TestProxy {
        state: Cell<usize>,
    }

    impl TestProxy {
        /// Returns an already-completed response carrying the current counter
        /// value, then advances the counter by one.
        fn watch(&self) -> QueryResponseFut<usize> {
            let observed = self.state.replace(self.state.get() + 1);
            QueryResponseFut(MaybeDone::Done(Ok(observed)))
        }
    }

    /// A lazy stream must hold no request between items.
    #[fasync::run_singlethreaded(test)]
    async fn generates_items_lazily() {
        const ITERS: usize = 3;

        let mut stream = HangingGetStream::new(TestProxy { state: Cell::new(0) }, TestProxy::watch);

        for expected in 0..ITERS {
            assert!(stream.response.is_terminated());
            let item = stream.try_next().await.expect("failed to get next item");
            assert_eq!(item, Some(expected));
        }
    }

    /// An eager stream must hold a request before every item is pulled.
    #[fasync::run_singlethreaded(test)]
    async fn generates_items_eagerly() {
        const ITERS: usize = 3;

        let mut stream = HangingGetStream::new_eager_with_fn_ptr(
            TestProxy { state: Cell::new(0) },
            TestProxy::watch,
        );

        for expected in 0..ITERS {
            assert!(
                !stream.response.is_terminated(),
                "should keep the server hydrated with an in-flight request",
            );
            let item = stream.try_next().await.expect("failed to get next item");
            assert_eq!(item, Some(expected));
        }
    }
}