1use fidl::endpoints::RequestStream;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::channel::oneshot::{self, Receiver};
11use futures::{ready, Stream, StreamExt};
12use pin_project_lite::pin_project;
13use std::future::Future as _;
14use std::pin::Pin;
15use std::sync::{Arc, Weak};
16use std::task::{Context, Poll};
17use zx::MonotonicDuration;
18
19pub fn until_stalled<RS: RequestStream>(
32 request_stream: RS,
33 debounce_interval: MonotonicDuration,
34) -> (impl StreamAndControlHandle<RS, <RS as Stream>::Item>, Receiver<Option<zx::Channel>>) {
35 let (sender, receiver) = oneshot::channel();
36 let stream = StallableRequestStream::new(request_stream, debounce_interval, move |channel| {
37 let _ = sender.send(channel);
38 });
39 (stream, receiver)
40}
41
42pub trait StreamAndControlHandle<RS, Item>: Stream<Item = Item> {
46 fn control_handle(&self) -> WeakControlHandle<RS>;
50}
51
pin_project! {
    /// Stream adapter around a request stream `RS` that finishes early once the
    /// wrapped stream has stayed idle for `debounce_interval`, reporting the
    /// outcome through the one-shot callback `F`.
    pub struct StallableRequestStream<RS, F> {
        // The wrapped request stream. It is shared behind `Arc<Mutex<..>>` so that
        // `WeakControlHandle` can reach it, and it becomes `None` once the stream
        // has been unbound after stalling.
        stream: Arc<Mutex<Option<RS>>>,
        // How long the wrapped stream must stay pending before an unbind is attempted.
        debounce_interval: MonotonicDuration,
        // Invoked at most once: with `Some(channel)` when unbinding succeeds, or with
        // `None` when the wrapped stream ends by itself. `None` after it has fired.
        unbind_callback: Option<F>,
        // Idle countdown; `Some` only while the wrapped stream is pending.
        #[pin]
        timer: Option<fasync::Timer>,
    }
}
62
63impl<RS, F> StallableRequestStream<RS, F> {
64 pub fn new(stream: RS, debounce_interval: MonotonicDuration, unbind_callback: F) -> Self {
67 Self {
68 stream: Arc::new(Mutex::new(Some(stream))),
69 debounce_interval,
70 unbind_callback: Some(unbind_callback),
71 timer: None,
72 }
73 }
74}
75
76impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin>
77 StreamAndControlHandle<RS, RS::Item> for StallableRequestStream<RS, F>
78{
79 fn control_handle(&self) -> WeakControlHandle<RS> {
80 WeakControlHandle { stream: Arc::downgrade(&self.stream) }
81 }
82}
83
/// A weak reference to the request stream held by a [`StallableRequestStream`].
///
/// It is obtained from [`StreamAndControlHandle::control_handle`] and lets a
/// caller use the stream's control handle via `use_control_handle`.
pub struct WeakControlHandle<RS> {
    // Weak pointer to the slot shared with the owning `StallableRequestStream`.
    // The slot holds `None` once the stream has been unbound.
    stream: Weak<Mutex<Option<RS>>>,
}
87
88impl<RS> WeakControlHandle<RS>
89where
90 RS: RequestStream,
91{
92 pub fn use_control_handle<User, R>(&self, user: User) -> Option<R>
104 where
105 User: FnOnce(RS::ControlHandle) -> R,
106 {
107 self.stream
108 .upgrade()
109 .as_ref()
110 .map(|stream| stream.lock().as_ref().map(|stream| user(stream.control_handle())))
111 .flatten()
112 }
113}
114
115impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin> Stream
116 for StallableRequestStream<RS, F>
117{
118 type Item = <RS as Stream>::Item;
119
120 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121 let poll_result = self
122 .stream
123 .as_ref()
124 .lock()
125 .as_mut()
126 .expect("Stream already resolved")
127 .poll_next_unpin(cx);
128 let mut this = self.project();
129 match poll_result {
130 Poll::Ready(message) => {
131 this.timer.set(None);
132 if message.is_none() {
133 this.unbind_callback.take().unwrap()(None);
134 }
135 Poll::Ready(message)
136 }
137 Poll::Pending => {
138 let debounce_interval = *this.debounce_interval;
139 loop {
140 if this.timer.is_none() {
141 this.timer.set(Some(fasync::Timer::new(debounce_interval)));
142 }
143 ready!(this.timer.as_mut().as_pin_mut().unwrap().poll(cx));
144 this.timer.set(None);
145
146 let (inner, is_terminated) = this.stream.lock().take().unwrap().into_inner();
149 match Arc::try_unwrap(inner) {
150 Ok(inner) => {
151 this.unbind_callback.take().unwrap()(Some(
152 inner.into_channel().into_zx_channel(),
153 ));
154 return Poll::Ready(None);
155 }
156 Err(inner) => {
157 *this.stream.lock() = Some(RS::from_inner(inner, is_terminated));
160 }
161 }
162 }
163 }
164 }
165 }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fasync::TestExecutor;
    use fidl::endpoints::Proxy;
    use fidl::AsHandleRef;
    use futures::{FutureExt, TryStreamExt};
    use std::pin::pin;
    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};

    /// With no traffic at all, the stream completes after one idle interval and
    /// the server endpoint is handed back.
    #[fuchsia::test(allow_stalls = false)]
    async fn no_message() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);
        let mut stream = pin!(stream);

        // Poll the stream while fake time advances past the idle interval.
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// A strong control handle obtained through the weak handle prevents
    /// unbinding until it is dropped.
    #[fuchsia::test(allow_stalls = false)]
    async fn strong_control_handle_blocks_stalling() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, mut stalled) = until_stalled(stream, idle_duration);

        // Smuggle the strong control handle out of the closure and keep it alive.
        let strong_control_handle: fio::DirectoryControlHandle =
            stream.control_handle().use_control_handle(|x| x).unwrap();

        // Even after well over one idle interval, neither future may resolve.
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        let mut stream = pin!(stream.fuse());
        futures::select! {
            _ = stream.next() => unreachable!(),
            _ = stalled => unreachable!(),
            default => {},
        }

        // Once the strong handle is gone, stalling can complete.
        drop(strong_control_handle);
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration * 4).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// Merely holding a weak control handle does not block stalling, and the
    /// handle yields nothing once the stream has been unbound.
    #[fuchsia::test(allow_stalls = false)]
    async fn weak_control_handle() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        let weak_control_handle = stream.control_handle();

        let mut stream = pin!(stream);
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );

        // The user function must not run after unbinding.
        weak_control_handle.use_control_handle(|_| unreachable!());
    }

    /// A handled request resets the idle countdown; the stream stalls one full
    /// interval after it next becomes pending.
    #[fuchsia::test(allow_stalls = false)]
    async fn one_message() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        // Nothing has polled the stream yet, so it cannot have stalled.
        let mut stalled = pin!(stalled);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        // Send a request; the reply future itself is not needed.
        let _ = proxy.get_flags();

        let mut stream = pin!(stream);
        let mut message = pin!(stream.next());
        let message = TestExecutor::poll_until_stalled(&mut message).await;
        let Poll::Ready(Some(Ok(fio::DirectoryRequest::GetFlags { responder }))) = message else {
            panic!("Unexpected {message:?}");
        };
        responder.send(Ok(fio::Flags::empty())).unwrap();

        // Time passing while the stream is not being polled does not count as idle time.
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        assert!(TestExecutor::poll_until_stalled(&mut stalled).await.is_pending());

        // This poll finds the stream pending and arms the idle timer.
        let mut message = pin!(stream.next());
        assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        TestExecutor::advance_to(initial + idle_duration * 3).await;

        // The stream completes and the server endpoint is returned.
        assert_matches!(message.await, None);
        assert_matches!(stalled.await, Ok(Some(_)));
    }

    /// An unanswered responder prevents unbinding until the reply is sent.
    #[fuchsia::test(allow_stalls = false)]
    async fn pending_reply_blocks_stalling() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, mut stalled) = until_stalled(stream, idle_duration);
        let mut stream = pin!(stream.fuse());

        let _ = proxy.get_flags();

        // Receive the request but hold on to its responder.
        let message_with_pending_reply = stream.next().await.unwrap();
        let Ok(fio::DirectoryRequest::GetFlags { responder, .. }) = message_with_pending_reply
        else {
            panic!("Unexpected {message_with_pending_reply:?}");
        };

        // With the reply outstanding, neither future may resolve.
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        futures::select! {
            _ = stream.next() => unreachable!(),
            _ = stalled => unreachable!(),
            default => {},
        }

        responder.send(Ok(fio::Flags::empty())).unwrap();

        // After replying, the stream is free to stall.
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration * 3).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// When the peer closes, the stream ends by itself and `stalled` resolves
    /// with `None` instead of a channel.
    #[fuchsia::test(allow_stalls = false)]
    async fn completed_stream() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        let mut stalled = pin!(stalled);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        // Close the client end so that the request stream completes.
        drop(proxy);

        {
            // Pin inside this scope so that the stream itself, and not just a pinned
            // reference to it, is dropped at the closing brace. This emulates a
            // serving task that exits after reading the final `None`.
            let mut stream = pin!(stream);
            assert_matches!(stream.next().await, None);
        }

        // The connection terminated without stalling, so no channel is returned.
        assert_matches!(stalled.await, Ok(None));
    }

    /// Serves several requests spaced closer together than the idle interval,
    /// then goes quiet and checks that the returned channel is the peer of the
    /// client end.
    #[fuchsia::test(allow_stalls = false)]
    async fn end_to_end() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        use fidl_fuchsia_component_client_test::{ProtocolAMarker, ProtocolARequest};

        const DURATION_NANOS: i64 = 40_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<ProtocolAMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        // Serve the protocol in the background until the stream completes.
        let task = fasync::Task::spawn(async move {
            let mut stream = pin!(stream);
            while let Some(request) = stream.try_next().await.unwrap() {
                match request {
                    ProtocolARequest::Foo { responder } => responder.send().unwrap(),
                }
            }
        });

        // Wrap the output in `Arc` so the shared future's output can be cloned.
        let stalled = fasync::Task::spawn(stalled).map(Arc::new).shared();

        // Requests arrive every half idle interval, so the stream must never stall.
        let request_duration = MonotonicDuration::from_nanos(DURATION_NANOS / 2);
        const NUM_REQUESTS: usize = 5;
        let mut deadline = initial;
        for _ in 0..NUM_REQUESTS {
            proxy.foo().await.unwrap();
            deadline += request_duration;
            TestExecutor::advance_to(deadline).await;
            assert!(stalled.clone().now_or_never().is_none());
        }

        // Go quiet for a full idle interval; the stream should now stall.
        deadline += idle_duration;
        TestExecutor::advance_to(deadline).await;
        let server_end = stalled.await;

        // The serving task exits because the stream completed.
        task.await;

        // The returned server endpoint must be the peer of the client channel.
        let client = proxy.into_channel().unwrap().into_zx_channel();
        assert_eq!(
            client.basic_info().unwrap().koid,
            (*server_end).as_ref().unwrap().as_ref().unwrap().basic_info().unwrap().related_koid
        );
    }
}