1use fidl::endpoints::RequestStream;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::channel::oneshot::{self, Receiver};
11use futures::{Stream, StreamExt, ready};
12use pin_project_lite::pin_project;
13use std::future::Future as _;
14use std::pin::Pin;
15use std::sync::{Arc, Weak};
16use std::task::{Context, Poll};
17use zx::MonotonicDuration;
18
19pub fn until_stalled<RS: RequestStream>(
32 request_stream: RS,
33 debounce_interval: MonotonicDuration,
34) -> (impl StreamAndControlHandle<RS, <RS as Stream>::Item>, Receiver<Option<zx::Channel>>) {
35 let (sender, receiver) = oneshot::channel();
36 let stream = StallableRequestStream::new(request_stream, debounce_interval, move |channel| {
37 let _ = sender.send(channel);
38 });
39 (stream, receiver)
40}
41
/// A [`Stream`] of `Item`s that can also hand out [`WeakControlHandle`]s for the underlying
/// request stream of type `RS`.
pub trait StreamAndControlHandle<RS, Item>: Stream<Item = Item> {
    /// Returns a [`WeakControlHandle`] referring to the underlying request stream.
    fn control_handle(&self) -> WeakControlHandle<RS>;
}
51
pin_project! {
    /// A wrapper around a request stream of type `RS` that completes early once the wrapped
    /// stream has been idle for `debounce_interval`. This is the concrete stream constructed
    /// by [`until_stalled`].
    pub struct StallableRequestStream<RS, F> {
        // The wrapped request stream. Shared (via `Arc`) so that `WeakControlHandle`s can refer
        // to it; set to `None` once the stream has been taken apart after a stall.
        stream: Arc<Mutex<Option<RS>>>,
        // How long the debounce timer runs before an unbind is attempted.
        debounce_interval: MonotonicDuration,
        // Invoked at most once: with `Some(channel)` after a successful unbind, or with `None`
        // when the wrapped stream ends by itself. `None` after it has been invoked.
        unbind_callback: Option<F>,
        // The currently running debounce timer, if any.
        #[pin]
        timer: Option<fasync::Timer>,
    }
}
62
63impl<RS, F> StallableRequestStream<RS, F> {
64 pub fn new(stream: RS, debounce_interval: MonotonicDuration, unbind_callback: F) -> Self {
67 Self {
68 stream: Arc::new(Mutex::new(Some(stream))),
69 debounce_interval,
70 unbind_callback: Some(unbind_callback),
71 timer: None,
72 }
73 }
74}
75
76impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin>
77 StreamAndControlHandle<RS, RS::Item> for StallableRequestStream<RS, F>
78{
79 fn control_handle(&self) -> WeakControlHandle<RS> {
80 WeakControlHandle { stream: Arc::downgrade(&self.stream) }
81 }
82}
83
/// A handle that weakly refers to the request stream wrapped by a [`StallableRequestStream`].
///
/// It gives temporary access to the stream's control handle through
/// [`WeakControlHandle::use_control_handle`] for as long as the stream still exists.
pub struct WeakControlHandle<RS> {
    // Weak reference to the slot holding the wrapped stream; the slot contains `None` once the
    // stream has been taken apart.
    stream: Weak<Mutex<Option<RS>>>,
}
87
88impl<RS> WeakControlHandle<RS>
89where
90 RS: RequestStream,
91{
92 pub fn use_control_handle<User, R>(&self, user: User) -> Option<R>
104 where
105 User: FnOnce(RS::ControlHandle) -> R,
106 {
107 self.stream
108 .upgrade()
109 .as_ref()
110 .map(|stream| stream.lock().as_ref().map(|stream| user(stream.control_handle())))
111 .flatten()
112 }
113}
114
impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin> Stream
    for StallableRequestStream<RS, F>
{
    type Item = <RS as Stream>::Item;

    /// Polls the wrapped request stream, falling back to the debounce timer while it is idle.
    ///
    /// - If the wrapped stream yields an item, the item is returned and the debounce timer is
    ///   restarted from that moment.
    /// - If the wrapped stream ends, `unbind_callback` is invoked with `None` and `None` is
    ///   returned.
    /// - If the wrapped stream is pending, the debounce timer is polled (and started first if
    ///   none is running). Once it fires, the stream is taken apart; if nothing else shares
    ///   its inner state, `unbind_callback` is invoked with the channel and `None` is returned.
    ///   Otherwise the stream is reassembled and another debounce timer is started.
    ///
    /// # Panics
    ///
    /// Panics with "Stream already resolved" when polled again after the stream was unbound
    /// because of a stall, and panics if the wrapped stream reports its end again after
    /// `unbind_callback` has already been consumed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Always give the wrapped stream the first chance to make progress. The lock guard is a
        // temporary and is released at the end of this statement.
        let poll_result = self
            .stream
            .as_ref()
            .lock()
            .as_mut()
            .expect("Stream already resolved")
            .poll_next_unpin(cx);
        let mut this = self.project();
        match poll_result {
            Poll::Ready(message) => {
                if message.is_none() {
                    // The wrapped stream ended by itself: report that there is no channel to
                    // hand back.
                    this.unbind_callback.take().unwrap()(None);
                } else {
                    // Activity was observed: restart the debounce interval from now.
                    this.timer.set(Some(fasync::Timer::new(*this.debounce_interval)));
                }
                Poll::Ready(message)
            }
            Poll::Pending => {
                loop {
                    // Make sure a debounce timer is running.
                    if this.timer.is_none() {
                        this.timer.set(Some(fasync::Timer::new(*this.debounce_interval)));
                    }
                    // Returns `Poll::Pending` from `poll_next` until the timer has fired.
                    ready!(this.timer.as_mut().as_pin_mut().unwrap().poll(cx));
                    this.timer.set(None);

                    // The stream has been idle for a whole debounce interval: try to unbind by
                    // taking the stream apart. `Arc::try_unwrap` only succeeds when no other
                    // strong reference to the inner state exists; the tests in this file show
                    // that a live control handle or a pending responder makes it fail.
                    let (inner, is_terminated) = this.stream.lock().take().unwrap().into_inner();
                    match Arc::try_unwrap(inner) {
                        Ok(inner) => {
                            // Unbound: hand the channel to the callback and end the stream.
                            // The slot stays `None` from here on.
                            this.unbind_callback.take().unwrap()(Some(
                                inner.into_channel().into_zx_channel(),
                            ));
                            return Poll::Ready(None);
                        }
                        Err(inner) => {
                            // Cannot unbind yet: reassemble the stream and loop around to start
                            // (and poll) a fresh debounce timer.
                            *this.stream.lock() = Some(RS::from_inner(inner, is_terminated));
                        }
                    }
                }
            }
        }
    }
}
167
// Tests for `until_stalled`. They run on a test executor whose time is advanced explicitly
// with `TestExecutor::advance_to`.
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fasync::TestExecutor;
    use fidl::endpoints::Proxy;
    use fidl_fuchsia_io as fio;
    use fuchsia_async as fasync;
    use futures::{FutureExt, TryStreamExt};
    use std::pin::pin;

    // With no request ever sent, the stream ends after one idle duration and the server
    // endpoint is handed back through `stalled`.
    #[fuchsia::test(allow_stalls = false)]
    async fn no_message() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);
        let mut stream = pin!(stream);

        // Poll the stream while time advances by exactly one idle duration.
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    // A control handle extracted from the weak handle and kept alive prevents the stream from
    // stalling until it is dropped.
    #[fuchsia::test(allow_stalls = false)]
    async fn strong_control_handle_blocks_stalling() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, mut stalled) = until_stalled(stream, idle_duration);

        // Smuggle the control handle out of the callback so it outlives the call.
        let strong_control_handle: fio::DirectoryControlHandle =
            stream.control_handle().use_control_handle(|x| x).unwrap();

        TestExecutor::advance_to(initial + idle_duration * 2).await;
        // Even after twice the idle duration neither future may be ready.
        let mut stream = pin!(stream.fuse());
        futures::select! {
            _ = stream.next() => unreachable!(),
            _ = stalled => unreachable!(),
            default => {},
        }

        drop(strong_control_handle);
        // With the control handle gone the stream is allowed to stall.
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration * 4).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    // Holding only a weak control handle does not prevent stalling, and after the stall the
    // weak handle no longer invokes its callback.
    #[fuchsia::test(allow_stalls = false)]
    async fn weak_control_handle() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        // Keep the weak handle alive across the stall.
        let weak_control_handle = stream.control_handle();

        let mut stream = pin!(stream);
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );

        // The stream has been unbound, so the callback must never run.
        weak_control_handle.use_control_handle(|_| unreachable!());
    }

    // A single request restarts the debounce interval: the stall happens one full interval
    // after the request was received rather than one interval after the first poll.
    #[fuchsia::test(allow_stalls = false)]
    async fn one_message() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let debounce_interval = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, debounce_interval);
        let mut stream = pin!(stream);
        let mut stalled = pin!(stalled);

        let mut message = pin!(stream.next());
        // The first poll starts the debounce timer; nothing is ready yet.
        assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        TestExecutor::advance_to(initial + debounce_interval / 2).await;
        // Send a request halfway through the interval and answer it right away.
        let _ = proxy.get_flags();
        assert_matches!(
            TestExecutor::poll_until_stalled(&mut message).await,
            Poll::Ready(Some(Ok(fio::DirectoryRequest::GetFlags { responder }))) => {
                responder.send(Ok(fio::Flags::empty())).unwrap();
            }
        );
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        TestExecutor::advance_to(initial + debounce_interval).await;
        // The original deadline has passed, but the request restarted the interval, so the
        // stream must still be pending.
        let mut message = pin!(stream.next());
        assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        TestExecutor::advance_to(initial + debounce_interval / 2 + debounce_interval).await;
        // One full interval after the request: the stream stalls and returns the channel.
        assert_matches!(message.await, None);
        assert_matches!(stalled.await, Ok(Some(_)));
    }

    // A request whose responder has not replied yet prevents the stream from stalling.
    #[fuchsia::test(allow_stalls = false)]
    async fn pending_reply_blocks_stalling() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, mut stalled) = until_stalled(stream, idle_duration);
        let mut stream = pin!(stream.fuse());

        let _ = proxy.get_flags();

        let message_with_pending_reply = stream.next().await.unwrap();
        // Hold on to the responder without replying.
        let Ok(fio::DirectoryRequest::GetFlags { responder, .. }) = message_with_pending_reply
        else {
            panic!("Unexpected {message_with_pending_reply:?}");
        };

        TestExecutor::advance_to(initial + idle_duration * 2).await;
        // Neither future may be ready while the reply is outstanding.
        futures::select! {
            _ = stream.next() => unreachable!(),
            _ = stalled => unreachable!(),
            default => {},
        }

        responder.send(Ok(fio::Flags::empty())).unwrap();

        // Once the reply has been sent the stream can stall.
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration * 3).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    // When the client closes the connection the stream ends by itself and `stalled` resolves
    // with `None` instead of a channel.
    #[fuchsia::test(allow_stalls = false)]
    async fn completed_stream() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        let mut stalled = pin!(stalled);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        // Close the client end.
        drop(proxy);

        let mut stream = pin!(stream);

        {
            // Reading the end of the stream is what triggers the unbind callback.
            assert_matches!(stream.next().await, None);

            drop(stream);
        }

        // No channel is handed back because the stream was not unbound by a stall.
        assert_matches!(stalled.await, Ok(None));
    }

    #[fuchsia::test(allow_stalls = false)]
    // Serves a protocol from a spawned task through `until_stalled`, keeps the connection busy
    // with requests spaced closer together than the idle duration, then lets it go idle and
    // checks that the channel handed back is the peer of the client's channel.
    async fn end_to_end() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        use fidl_fuchsia_component_client_test::{ProtocolAMarker, ProtocolARequest};

        const DURATION_NANOS: i64 = 40_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<ProtocolAMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        let task = fasync::Task::spawn(async move {
            // Serve requests until the stream ends.
            let mut stream = pin!(stream);
            while let Some(request) = stream.try_next().await.unwrap() {
                match request {
                    ProtocolARequest::Foo { responder } => responder.send().unwrap(),
                }
            }
        });

        // Wrap the output in `Arc` and share the future so it can be checked repeatedly with
        // `now_or_never` below and still be awaited at the end.
        let stalled = fasync::Task::spawn(stalled).map(Arc::new).shared();

        let request_duration = MonotonicDuration::from_nanos(DURATION_NANOS / 2);
        // Requests arrive every half idle duration, so the stream must never stall in between.
        const NUM_REQUESTS: usize = 5;
        let mut deadline = initial;
        for _ in 0..NUM_REQUESTS {
            proxy.foo().await.unwrap();
            deadline += request_duration;
            TestExecutor::advance_to(deadline).await;
            assert!(stalled.clone().now_or_never().is_none());
        }

        deadline += idle_duration;
        // Now stay idle for a full idle duration; the stream should stall.
        TestExecutor::advance_to(deadline).await;
        let server_end = stalled.await;

        // The serving task finishes because its stream ended.
        task.await;

        let client = proxy.into_channel().unwrap().into_zx_channel();
        // The channel handed back must be the other end of the client's channel.
        assert_eq!(
            client.basic_info().unwrap().koid,
            (*server_end).as_ref().unwrap().as_ref().unwrap().basic_info().unwrap().related_koid
        );
    }
}