1use fidl::endpoints::RequestStream;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::channel::oneshot::{self, Receiver};
11use futures::{Stream, StreamExt, ready};
12use pin_project_lite::pin_project;
13use std::future::Future as _;
14use std::pin::Pin;
15use std::sync::{Arc, Weak};
16use std::task::{Context, Poll};
17use zx::MonotonicDuration;
18
19pub fn until_stalled<RS: RequestStream>(
32 request_stream: RS,
33 debounce_interval: MonotonicDuration,
34) -> (impl StreamAndControlHandle<RS, <RS as Stream>::Item>, Receiver<Option<zx::Channel>>) {
35 let (sender, receiver) = oneshot::channel();
36 let stream = StallableRequestStream::new(request_stream, debounce_interval, move |channel| {
37 let _ = sender.send(channel);
38 });
39 (stream, receiver)
40}
41
42pub trait StreamAndControlHandle<RS, Item>: Stream<Item = Item> {
46 fn control_handle(&self) -> WeakControlHandle<RS>;
50}
51
52pin_project! {
53 pub struct StallableRequestStream<RS, F> {
55 stream: Arc<Mutex<Option<RS>>>,
56 debounce_interval: MonotonicDuration,
57 unbind_callback: Option<F>,
58 #[pin]
59 timer: Option<fasync::Timer>,
60 }
61}
62
63impl<RS, F> StallableRequestStream<RS, F> {
64 pub fn new(stream: RS, debounce_interval: MonotonicDuration, unbind_callback: F) -> Self {
67 Self {
68 stream: Arc::new(Mutex::new(Some(stream))),
69 debounce_interval,
70 unbind_callback: Some(unbind_callback),
71 timer: None,
72 }
73 }
74}
75
76impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin>
77 StreamAndControlHandle<RS, RS::Item> for StallableRequestStream<RS, F>
78{
79 fn control_handle(&self) -> WeakControlHandle<RS> {
80 WeakControlHandle { stream: Arc::downgrade(&self.stream) }
81 }
82}
83
84pub struct WeakControlHandle<RS> {
85 stream: Weak<Mutex<Option<RS>>>,
86}
87
88impl<RS> WeakControlHandle<RS>
89where
90 RS: RequestStream,
91{
92 pub fn use_control_handle<User, R>(&self, user: User) -> Option<R>
104 where
105 User: FnOnce(RS::ControlHandle) -> R,
106 {
107 self.stream
108 .upgrade()
109 .as_ref()
110 .map(|stream| stream.lock().as_ref().map(|stream| user(stream.control_handle())))
111 .flatten()
112 }
113}
114
115impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin> Stream
116 for StallableRequestStream<RS, F>
117{
118 type Item = <RS as Stream>::Item;
119
120 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121 let poll_result = self
122 .stream
123 .as_ref()
124 .lock()
125 .as_mut()
126 .expect("Stream already resolved")
127 .poll_next_unpin(cx);
128 let mut this = self.project();
129 match poll_result {
130 Poll::Ready(message) => {
131 this.timer.set(None);
132 if message.is_none() {
133 this.unbind_callback.take().unwrap()(None);
134 }
135 Poll::Ready(message)
136 }
137 Poll::Pending => {
138 let debounce_interval = *this.debounce_interval;
139 loop {
140 if this.timer.is_none() {
141 this.timer.set(Some(fasync::Timer::new(debounce_interval)));
142 }
143 ready!(this.timer.as_mut().as_pin_mut().unwrap().poll(cx));
144 this.timer.set(None);
145
146 let (inner, is_terminated) = this.stream.lock().take().unwrap().into_inner();
149 match Arc::try_unwrap(inner) {
150 Ok(inner) => {
151 this.unbind_callback.take().unwrap()(Some(
152 inner.into_channel().into_zx_channel(),
153 ));
154 return Poll::Ready(None);
155 }
156 Err(inner) => {
157 *this.stream.lock() = Some(RS::from_inner(inner, is_terminated));
160 }
161 }
162 }
163 }
164 }
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171 use assert_matches::assert_matches;
172 use fasync::TestExecutor;
173 use fidl::endpoints::Proxy;
174 use futures::{FutureExt, TryStreamExt};
175 use std::pin::pin;
176 use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
177
178 #[fuchsia::test(allow_stalls = false)]
179 async fn no_message() {
180 let initial = fasync::MonotonicInstant::from_nanos(0);
181 TestExecutor::advance_to(initial).await;
182 const DURATION_NANOS: i64 = 1_000_000;
183 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
184
185 let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
186 let (stream, stalled) = until_stalled(stream, idle_duration);
187 let mut stream = pin!(stream);
188
189 assert_matches!(
190 futures::join!(
191 stream.next(),
192 TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
193 ),
194 (None, Ok(Some(_)))
195 );
196 }
197
198 #[fuchsia::test(allow_stalls = false)]
199 async fn strong_control_handle_blocks_stalling() {
200 let initial = fasync::MonotonicInstant::from_nanos(0);
201 TestExecutor::advance_to(initial).await;
202 const DURATION_NANOS: i64 = 1_000_000;
203 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
204
205 let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
206 let (stream, mut stalled) = until_stalled(stream, idle_duration);
207
208 let strong_control_handle: fio::DirectoryControlHandle =
209 stream.control_handle().use_control_handle(|x| x).unwrap();
210
211 TestExecutor::advance_to(initial + idle_duration * 2).await;
213 let mut stream = pin!(stream.fuse());
214 futures::select! {
215 _ = stream.next() => unreachable!(),
216 _ = stalled => unreachable!(),
217 default => {},
218 }
219
220 drop(strong_control_handle);
222 assert_matches!(
223 futures::join!(
224 stream.next(),
225 TestExecutor::advance_to(initial + idle_duration * 4).then(|()| stalled)
226 ),
227 (None, Ok(Some(_)))
228 );
229 }
230
231 #[fuchsia::test(allow_stalls = false)]
232 async fn weak_control_handle() {
233 let initial = fasync::MonotonicInstant::from_nanos(0);
234 TestExecutor::advance_to(initial).await;
235 const DURATION_NANOS: i64 = 1_000_000;
236 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
237
238 let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
239 let (stream, stalled) = until_stalled(stream, idle_duration);
240
241 let weak_control_handle = stream.control_handle();
243
244 let mut stream = pin!(stream);
245 assert_matches!(
246 futures::join!(
247 stream.next(),
248 TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
249 ),
250 (None, Ok(Some(_)))
251 );
252
253 weak_control_handle.use_control_handle(|_| unreachable!());
254 }
255
256 #[fuchsia::test(allow_stalls = false)]
257 async fn one_message() {
258 let initial = fasync::MonotonicInstant::from_nanos(0);
259 TestExecutor::advance_to(initial).await;
260 const DURATION_NANOS: i64 = 1_000_000;
261 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
262
263 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
264 let (stream, stalled) = until_stalled(stream, idle_duration);
265
266 let mut stalled = pin!(stalled);
267 assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
268
269 let _ = proxy.get_flags();
270
271 let mut stream = pin!(stream);
272 let mut message = pin!(stream.next());
273 let message = TestExecutor::poll_until_stalled(&mut message).await;
275 let Poll::Ready(Some(Ok(fio::DirectoryRequest::GetFlags { responder }))) = message else {
276 panic!("Unexpected {message:?}");
277 };
278 responder.send(Ok(fio::Flags::empty())).unwrap();
279
280 TestExecutor::advance_to(initial + idle_duration * 2).await;
282 assert!(TestExecutor::poll_until_stalled(&mut stalled).await.is_pending());
283
284 let mut message = pin!(stream.next());
286 assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
287 assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
288
289 TestExecutor::advance_to(initial + idle_duration * 3).await;
290
291 assert_matches!(message.await, None);
293 assert_matches!(stalled.await, Ok(Some(_)));
294 }
295
296 #[fuchsia::test(allow_stalls = false)]
297 async fn pending_reply_blocks_stalling() {
298 let initial = fasync::MonotonicInstant::from_nanos(0);
299 TestExecutor::advance_to(initial).await;
300 const DURATION_NANOS: i64 = 1_000_000;
301 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
302
303 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
304 let (stream, mut stalled) = until_stalled(stream, idle_duration);
305 let mut stream = pin!(stream.fuse());
306
307 let _ = proxy.get_flags();
308
309 let message_with_pending_reply = stream.next().await.unwrap();
312 let Ok(fio::DirectoryRequest::GetFlags { responder, .. }) = message_with_pending_reply
313 else {
314 panic!("Unexpected {message_with_pending_reply:?}");
315 };
316
317 TestExecutor::advance_to(initial + idle_duration * 2).await;
319 futures::select! {
320 _ = stream.next() => unreachable!(),
321 _ = stalled => unreachable!(),
322 default => {},
323 }
324
325 responder.send(Ok(fio::Flags::empty())).unwrap();
327
328 assert_matches!(
330 futures::join!(
331 stream.next(),
332 TestExecutor::advance_to(initial + idle_duration * 3).then(|()| stalled)
333 ),
334 (None, Ok(Some(_)))
335 );
336 }
337
338 #[fuchsia::test(allow_stalls = false)]
339 async fn completed_stream() {
340 let initial = fasync::MonotonicInstant::from_nanos(0);
341 TestExecutor::advance_to(initial).await;
342 const DURATION_NANOS: i64 = 1_000_000;
343 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
344
345 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
346 let (stream, stalled) = until_stalled(stream, idle_duration);
347
348 let mut stalled = pin!(stalled);
349 assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
350
351 drop(proxy);
353
354 let mut stream = pin!(stream);
355
356 {
357 assert_matches!(stream.next().await, None);
359
360 drop(stream);
363 }
364
365 assert_matches!(stalled.await, Ok(None));
368 }
369
370 #[fuchsia::test(allow_stalls = false)]
374 async fn end_to_end() {
375 let initial = fasync::MonotonicInstant::from_nanos(0);
376 TestExecutor::advance_to(initial).await;
377 use fidl_fuchsia_component_client_test::{ProtocolAMarker, ProtocolARequest};
378
379 const DURATION_NANOS: i64 = 40_000_000;
380 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
381 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<ProtocolAMarker>();
382 let (stream, stalled) = until_stalled(stream, idle_duration);
383
384 let task = fasync::Task::spawn(async move {
386 let mut stream = pin!(stream);
387 while let Some(request) = stream.try_next().await.unwrap() {
388 match request {
389 ProtocolARequest::Foo { responder } => responder.send().unwrap(),
390 }
391 }
392 });
393
394 let stalled = fasync::Task::spawn(stalled).map(Arc::new).shared();
397
398 let request_duration = MonotonicDuration::from_nanos(DURATION_NANOS / 2);
400 const NUM_REQUESTS: usize = 5;
401 let mut deadline = initial;
402 for _ in 0..NUM_REQUESTS {
403 proxy.foo().await.unwrap();
404 deadline += request_duration;
405 TestExecutor::advance_to(deadline).await;
406 assert!(stalled.clone().now_or_never().is_none());
407 }
408
409 deadline += idle_duration;
411 TestExecutor::advance_to(deadline).await;
412 let server_end = stalled.await;
413
414 task.await;
416
417 let client = proxy.into_channel().unwrap().into_zx_channel();
419 assert_eq!(
420 client.basic_info().unwrap().koid,
421 (*server_end).as_ref().unwrap().as_ref().unwrap().basic_info().unwrap().related_koid
422 );
423 }
424}