1use fidl::endpoints::RequestStream;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::channel::oneshot::{self, Receiver};
11use futures::{Stream, StreamExt, ready};
12use pin_project_lite::pin_project;
13use std::future::Future as _;
14use std::pin::Pin;
15use std::sync::{Arc, Weak};
16use std::task::{Context, Poll};
17use zx::MonotonicDuration;
18
19pub fn until_stalled<RS: RequestStream>(
32 request_stream: RS,
33 debounce_interval: MonotonicDuration,
34) -> (impl StreamAndControlHandle<RS, <RS as Stream>::Item>, Receiver<Option<zx::Channel>>) {
35 let (sender, receiver) = oneshot::channel();
36 let stream = StallableRequestStream::new(request_stream, debounce_interval, move |channel| {
37 let _ = sender.send(channel);
38 });
39 (stream, receiver)
40}
41
42pub trait StreamAndControlHandle<RS, Item>: Stream<Item = Item> {
46 fn control_handle(&self) -> WeakControlHandle<RS>;
50}
51
52pin_project! {
53 pub struct StallableRequestStream<RS, F> {
55 stream: Arc<Mutex<Option<RS>>>,
56 debounce_interval: MonotonicDuration,
57 unbind_callback: Option<F>,
58 #[pin]
59 timer: Option<fasync::Timer>,
60 }
61}
62
63impl<RS, F> StallableRequestStream<RS, F> {
64 pub fn new(stream: RS, mut debounce_interval: MonotonicDuration, unbind_callback: F) -> Self {
67 if debounce_interval < MonotonicDuration::from_millis(1) {
68 debounce_interval = MonotonicDuration::from_millis(1);
69 }
70 Self {
71 stream: Arc::new(Mutex::new(Some(stream))),
72 debounce_interval,
73 unbind_callback: Some(unbind_callback),
74 timer: None,
75 }
76 }
77}
78
79impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin>
80 StreamAndControlHandle<RS, RS::Item> for StallableRequestStream<RS, F>
81{
82 fn control_handle(&self) -> WeakControlHandle<RS> {
83 WeakControlHandle { stream: Arc::downgrade(&self.stream) }
84 }
85}
86
87pub struct WeakControlHandle<RS> {
88 stream: Weak<Mutex<Option<RS>>>,
89}
90
91impl<RS> WeakControlHandle<RS>
92where
93 RS: RequestStream,
94{
95 pub fn use_control_handle<User, R>(&self, user: User) -> Option<R>
107 where
108 User: FnOnce(RS::ControlHandle) -> R,
109 {
110 self.stream
111 .upgrade()
112 .as_ref()
113 .map(|stream| stream.lock().as_ref().map(|stream| user(stream.control_handle())))
114 .flatten()
115 }
116}
117
118impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin> Stream
119 for StallableRequestStream<RS, F>
120{
121 type Item = <RS as Stream>::Item;
122
123 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124 let poll_result = self
125 .stream
126 .as_ref()
127 .lock()
128 .as_mut()
129 .expect("Stream already resolved")
130 .poll_next_unpin(cx);
131 let mut this = self.project();
132 match poll_result {
133 Poll::Ready(message) => {
134 if message.is_none() {
135 this.unbind_callback.take().unwrap()(None);
136 } else {
137 this.timer.set(Some(fasync::Timer::new(*this.debounce_interval)));
138 }
139 Poll::Ready(message)
140 }
141 Poll::Pending => {
142 if this.timer.is_none() {
143 this.timer.set(Some(fasync::Timer::new(*this.debounce_interval)));
144 }
145 ready!(this.timer.as_mut().as_pin_mut().unwrap().poll(cx));
146 this.timer.set(None);
147
148 let (inner, is_terminated) = this.stream.lock().take().unwrap().into_inner();
151 match Arc::try_unwrap(inner) {
152 Ok(inner) => {
153 this.unbind_callback.take().unwrap()(Some(
154 inner.into_channel().into_zx_channel(),
155 ));
156 Poll::Ready(None)
157 }
158 Err(inner) => {
159 *this.stream.lock() = Some(RS::from_inner(inner, is_terminated));
162 this.timer.set(Some(fasync::Timer::new(*this.debounce_interval)));
163 if this.timer.as_mut().as_pin_mut().unwrap().poll(cx).is_ready()
164 {
165 cx.waker().wake_by_ref();
166 }
167 Poll::Pending
168 }
169 }
170 }
171 }
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 use super::*;
178 use assert_matches::assert_matches;
179 use fasync::TestExecutor;
180 use fidl::endpoints::Proxy;
181 use fidl_fuchsia_io as fio;
182 use fuchsia_async as fasync;
183 use futures::{FutureExt, TryStreamExt};
184 use std::pin::pin;
185
186 #[fuchsia::test(allow_stalls = false)]
187 async fn no_message() {
188 let initial = fasync::MonotonicInstant::from_nanos(0);
189 TestExecutor::advance_to(initial).await;
190 const DURATION_NANOS: i64 = 1_000_000;
191 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
192
193 let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
194 let (stream, stalled) = until_stalled(stream, idle_duration);
195 let mut stream = pin!(stream);
196
197 assert_matches!(
198 futures::join!(
199 stream.next(),
200 TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
201 ),
202 (None, Ok(Some(_)))
203 );
204 }
205
206 #[fuchsia::test(allow_stalls = false)]
207 async fn strong_control_handle_blocks_stalling() {
208 let initial = fasync::MonotonicInstant::from_nanos(0);
209 TestExecutor::advance_to(initial).await;
210 const DURATION_NANOS: i64 = 1_000_000;
211 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
212
213 let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
214 let (stream, mut stalled) = until_stalled(stream, idle_duration);
215
216 let strong_control_handle: fio::DirectoryControlHandle =
217 stream.control_handle().use_control_handle(|x| x).unwrap();
218
219 TestExecutor::advance_to(initial + idle_duration * 2).await;
221 let mut stream = pin!(stream.fuse());
222 futures::select! {
223 _ = stream.next() => unreachable!(),
224 _ = stalled => unreachable!(),
225 default => {},
226 }
227
228 drop(strong_control_handle);
230 assert_matches!(
231 futures::join!(
232 stream.next(),
233 TestExecutor::advance_to(initial + idle_duration * 4).then(|()| stalled)
234 ),
235 (None, Ok(Some(_)))
236 );
237 }
238
239 #[fuchsia::test(allow_stalls = false)]
240 async fn weak_control_handle() {
241 let initial = fasync::MonotonicInstant::from_nanos(0);
242 TestExecutor::advance_to(initial).await;
243 const DURATION_NANOS: i64 = 1_000_000;
244 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
245
246 let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
247 let (stream, stalled) = until_stalled(stream, idle_duration);
248
249 let weak_control_handle = stream.control_handle();
251
252 let mut stream = pin!(stream);
253 assert_matches!(
254 futures::join!(
255 stream.next(),
256 TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
257 ),
258 (None, Ok(Some(_)))
259 );
260
261 weak_control_handle.use_control_handle(|_| unreachable!());
262 }
263
264 #[fuchsia::test(allow_stalls = false)]
265 async fn one_message() {
266 let initial = fasync::MonotonicInstant::from_nanos(0);
267 TestExecutor::advance_to(initial).await;
268 const DURATION_NANOS: i64 = 1_000_000;
269 let debounce_interval = MonotonicDuration::from_nanos(DURATION_NANOS);
270
271 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
272 let (stream, stalled) = until_stalled(stream, debounce_interval);
273 let mut stream = pin!(stream);
274 let mut stalled = pin!(stalled);
275
276 let mut message = pin!(stream.next());
278 assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
279 assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
280
281 TestExecutor::advance_to(initial + debounce_interval / 2).await;
283 let _ = proxy.get_flags();
284 assert_matches!(
285 TestExecutor::poll_until_stalled(&mut message).await,
286 Poll::Ready(Some(Ok(fio::DirectoryRequest::GetFlags { responder }))) => {
287 responder.send(Ok(fio::Flags::empty())).unwrap();
289 }
290 );
291 assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
292
293 TestExecutor::advance_to(initial + debounce_interval).await;
296 let mut message = pin!(stream.next());
297 assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
298 assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
299
300 TestExecutor::advance_to(initial + debounce_interval / 2 + debounce_interval).await;
302 assert_matches!(message.await, None);
303 assert_matches!(stalled.await, Ok(Some(_)));
304 }
305
306 #[fuchsia::test(allow_stalls = false)]
307 async fn pending_reply_blocks_stalling() {
308 let initial = fasync::MonotonicInstant::from_nanos(0);
309 TestExecutor::advance_to(initial).await;
310 const DURATION_NANOS: i64 = 1_000_000;
311 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
312
313 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
314 let (stream, mut stalled) = until_stalled(stream, idle_duration);
315 let mut stream = pin!(stream.fuse());
316
317 let _ = proxy.get_flags();
318
319 let message_with_pending_reply = stream.next().await.unwrap();
322 let Ok(fio::DirectoryRequest::GetFlags { responder, .. }) = message_with_pending_reply
323 else {
324 panic!("Unexpected {message_with_pending_reply:?}");
325 };
326
327 TestExecutor::advance_to(initial + idle_duration * 2).await;
329 futures::select! {
330 _ = stream.next() => unreachable!(),
331 _ = stalled => unreachable!(),
332 default => {},
333 }
334
335 responder.send(Ok(fio::Flags::empty())).unwrap();
337
338 assert_matches!(
340 futures::join!(
341 stream.next(),
342 TestExecutor::advance_to(initial + idle_duration * 3).then(|()| stalled)
343 ),
344 (None, Ok(Some(_)))
345 );
346 }
347
348 #[fuchsia::test(allow_stalls = false)]
349 async fn completed_stream() {
350 let initial = fasync::MonotonicInstant::from_nanos(0);
351 TestExecutor::advance_to(initial).await;
352 const DURATION_NANOS: i64 = 1_000_000;
353 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
354
355 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
356 let (stream, stalled) = until_stalled(stream, idle_duration);
357
358 let mut stalled = pin!(stalled);
359 assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
360
361 drop(proxy);
363
364 let mut stream = pin!(stream);
365
366 {
367 assert_matches!(stream.next().await, None);
369
370 drop(stream);
373 }
374
375 assert_matches!(stalled.await, Ok(None));
378 }
379
380 #[fuchsia::test(allow_stalls = false)]
384 async fn end_to_end() {
385 let initial = fasync::MonotonicInstant::from_nanos(0);
386 TestExecutor::advance_to(initial).await;
387 use fidl_fuchsia_component_client_test::{ProtocolAMarker, ProtocolARequest};
388
389 const DURATION_NANOS: i64 = 40_000_000;
390 let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
391 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<ProtocolAMarker>();
392 let (stream, stalled) = until_stalled(stream, idle_duration);
393
394 let task = fasync::Task::spawn(async move {
396 let mut stream = pin!(stream);
397 while let Some(request) = stream.try_next().await.unwrap() {
398 match request {
399 ProtocolARequest::Foo { responder } => responder.send().unwrap(),
400 }
401 }
402 });
403
404 let stalled = fasync::Task::spawn(stalled).map(Arc::new).shared();
407
408 let request_duration = MonotonicDuration::from_nanos(DURATION_NANOS / 2);
410 const NUM_REQUESTS: usize = 5;
411 let mut deadline = initial;
412 for _ in 0..NUM_REQUESTS {
413 proxy.foo().await.unwrap();
414 deadline += request_duration;
415 TestExecutor::advance_to(deadline).await;
416 assert!(stalled.clone().now_or_never().is_none());
417 }
418
419 deadline += idle_duration;
421 TestExecutor::advance_to(deadline).await;
422 let server_end = stalled.await;
423
424 task.await;
426
427 let client = proxy.into_channel().unwrap().into_zx_channel();
429 assert_eq!(
430 client.basic_info().unwrap().koid,
431 (*server_end).as_ref().unwrap().as_ref().unwrap().basic_info().unwrap().related_koid
432 );
433 }
434}