detect_stall/
stream.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Support for running FIDL request streams until stalled.
6
7use fidl::endpoints::RequestStream;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::channel::oneshot::{self, Receiver};
11use futures::{ready, Stream, StreamExt};
12use pin_project_lite::pin_project;
13use std::future::Future as _;
14use std::pin::Pin;
15use std::sync::{Arc, Weak};
16use std::task::{Context, Poll};
17use zx::MonotonicDuration;
18
19/// [`until_stalled`] wraps a FIDL request stream of type [`RS`] into another
20/// stream yielding the same requests, but could complete prematurely if it
21/// has stalled, meaning:
22///
23/// - The underlying `request_stream` has no new messages.
24/// - There are no pending FIDL replies.
25/// - This condition has lasted for at least `debounce_interval`.
26///
27/// When that happens, the request stream will complete, and the returned future
28/// will complete with the server endpoint which has been unbound from the
29/// stream. The returned future will also complete if the request stream ended
30/// on its own without stalling.
31pub fn until_stalled<RS: RequestStream>(
32    request_stream: RS,
33    debounce_interval: MonotonicDuration,
34) -> (impl StreamAndControlHandle<RS, <RS as Stream>::Item>, Receiver<Option<zx::Channel>>) {
35    let (sender, receiver) = oneshot::channel();
36    let stream = StallableRequestStream::new(request_stream, debounce_interval, move |channel| {
37        let _ = sender.send(channel);
38    });
39    (stream, receiver)
40}
41
42/// Types that implement [`StreamAndControlHandle`] can stream out FIDL request
43/// messages and vend out weak control handles which lets you manage the connection
44/// and send events.
45pub trait StreamAndControlHandle<RS, Item>: Stream<Item = Item> {
46    /// Obtain a weak control handle. Different from [`RequestStream::control_handle`],
47    /// the weak control handle will not prevent unbinding. You may hold on to the
48    /// weak control handle and the request stream can still complete when stalled.
49    fn control_handle(&self) -> WeakControlHandle<RS>;
50}
51
52pin_project! {
53    /// The stream returned from [`until_stalled`].
54    pub struct StallableRequestStream<RS, F> {
55        stream: Arc<Mutex<Option<RS>>>,
56        debounce_interval: MonotonicDuration,
57        unbind_callback: Option<F>,
58        #[pin]
59        timer: Option<fasync::Timer>,
60    }
61}
62
63impl<RS, F> StallableRequestStream<RS, F> {
64    /// Creates a new stallable request stream that will send the channel via `unbind_callback` when
65    /// stream is stalled.
66    pub fn new(stream: RS, debounce_interval: MonotonicDuration, unbind_callback: F) -> Self {
67        Self {
68            stream: Arc::new(Mutex::new(Some(stream))),
69            debounce_interval,
70            unbind_callback: Some(unbind_callback),
71            timer: None,
72        }
73    }
74}
75
76impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin>
77    StreamAndControlHandle<RS, RS::Item> for StallableRequestStream<RS, F>
78{
79    fn control_handle(&self) -> WeakControlHandle<RS> {
80        WeakControlHandle { stream: Arc::downgrade(&self.stream) }
81    }
82}
83
84pub struct WeakControlHandle<RS> {
85    stream: Weak<Mutex<Option<RS>>>,
86}
87
88impl<RS> WeakControlHandle<RS>
89where
90    RS: RequestStream,
91{
92    /// If the server endpoint is not unbound, calls `user` function with the
93    /// control handle and propagates the return value. Otherwise, returns `None`.
94    ///
95    /// Typically you can use it to send an event within the closure:
96    ///
97    /// ```
98    /// let control_handle = stream.control_handle();
99    /// let result = control_handle.use_control_handle(
100    ///     |control_handle| control_handle.send_my_event());
101    /// ```
102    ///
103    pub fn use_control_handle<User, R>(&self, user: User) -> Option<R>
104    where
105        User: FnOnce(RS::ControlHandle) -> R,
106    {
107        self.stream
108            .upgrade()
109            .as_ref()
110            .map(|stream| stream.lock().as_ref().map(|stream| user(stream.control_handle())))
111            .flatten()
112    }
113}
114
115impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin> Stream
116    for StallableRequestStream<RS, F>
117{
118    type Item = <RS as Stream>::Item;
119
120    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121        let poll_result = self
122            .stream
123            .as_ref()
124            .lock()
125            .as_mut()
126            .expect("Stream already resolved")
127            .poll_next_unpin(cx);
128        let mut this = self.project();
129        match poll_result {
130            Poll::Ready(message) => {
131                this.timer.set(None);
132                if message.is_none() {
133                    this.unbind_callback.take().unwrap()(None);
134                }
135                Poll::Ready(message)
136            }
137            Poll::Pending => {
138                let debounce_interval = *this.debounce_interval;
139                loop {
140                    if this.timer.is_none() {
141                        this.timer.set(Some(fasync::Timer::new(debounce_interval)));
142                    }
143                    ready!(this.timer.as_mut().as_pin_mut().unwrap().poll(cx));
144                    this.timer.set(None);
145
146                    // Try and unbind, which will fail if there are outstanding responders or
147                    // control handles.
148                    let (inner, is_terminated) = this.stream.lock().take().unwrap().into_inner();
149                    match Arc::try_unwrap(inner) {
150                        Ok(inner) => {
151                            this.unbind_callback.take().unwrap()(Some(
152                                inner.into_channel().into_zx_channel(),
153                            ));
154                            return Poll::Ready(None);
155                        }
156                        Err(inner) => {
157                            // We can't unbind because there are outstanding responders or control
158                            // handles, so we'll try again after another debounce interval.
159                            *this.stream.lock() = Some(RS::from_inner(inner, is_terminated));
160                        }
161                    }
162                }
163            }
164        }
165    }
166}
167
168#[cfg(test)]
169mod tests {
170    use super::*;
171    use assert_matches::assert_matches;
172    use fasync::TestExecutor;
173    use fidl::endpoints::Proxy;
174    use fidl::AsHandleRef;
175    use futures::{FutureExt, TryStreamExt};
176    use std::pin::pin;
177    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
178
179    #[fuchsia::test(allow_stalls = false)]
180    async fn no_message() {
181        let initial = fasync::MonotonicInstant::from_nanos(0);
182        TestExecutor::advance_to(initial).await;
183        const DURATION_NANOS: i64 = 1_000_000;
184        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
185
186        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
187        let (stream, stalled) = until_stalled(stream, idle_duration);
188        let mut stream = pin!(stream);
189
190        assert_matches!(
191            futures::join!(
192                stream.next(),
193                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
194            ),
195            (None, Ok(Some(_)))
196        );
197    }
198
199    #[fuchsia::test(allow_stalls = false)]
200    async fn strong_control_handle_blocks_stalling() {
201        let initial = fasync::MonotonicInstant::from_nanos(0);
202        TestExecutor::advance_to(initial).await;
203        const DURATION_NANOS: i64 = 1_000_000;
204        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
205
206        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
207        let (stream, mut stalled) = until_stalled(stream, idle_duration);
208
209        let strong_control_handle: fio::DirectoryControlHandle =
210            stream.control_handle().use_control_handle(|x| x).unwrap();
211
212        // The connection does not stall, because there is `strong_control_handle`.
213        TestExecutor::advance_to(initial + idle_duration * 2).await;
214        let mut stream = pin!(stream.fuse());
215        futures::select! {
216            _ = stream.next() => unreachable!(),
217            _ = stalled => unreachable!(),
218            default => {},
219        }
220
221        // Once we drop it then the connection can stall.
222        drop(strong_control_handle);
223        assert_matches!(
224            futures::join!(
225                stream.next(),
226                TestExecutor::advance_to(initial + idle_duration * 4).then(|()| stalled)
227            ),
228            (None, Ok(Some(_)))
229        );
230    }
231
232    #[fuchsia::test(allow_stalls = false)]
233    async fn weak_control_handle() {
234        let initial = fasync::MonotonicInstant::from_nanos(0);
235        TestExecutor::advance_to(initial).await;
236        const DURATION_NANOS: i64 = 1_000_000;
237        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
238
239        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
240        let (stream, stalled) = until_stalled(stream, idle_duration);
241
242        // Just getting a weak control handle should not block the connection from stalling.
243        let weak_control_handle = stream.control_handle();
244
245        let mut stream = pin!(stream);
246        assert_matches!(
247            futures::join!(
248                stream.next(),
249                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
250            ),
251            (None, Ok(Some(_)))
252        );
253
254        weak_control_handle.use_control_handle(|_| unreachable!());
255    }
256
257    #[fuchsia::test(allow_stalls = false)]
258    async fn one_message() {
259        let initial = fasync::MonotonicInstant::from_nanos(0);
260        TestExecutor::advance_to(initial).await;
261        const DURATION_NANOS: i64 = 1_000_000;
262        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
263
264        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
265        let (stream, stalled) = until_stalled(stream, idle_duration);
266
267        let mut stalled = pin!(stalled);
268        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
269
270        let _ = proxy.get_flags();
271
272        let mut stream = pin!(stream);
273        let mut message = pin!(stream.next());
274        // Reply to the request so that the stream doesn't have any pending replies.
275        let message = TestExecutor::poll_until_stalled(&mut message).await;
276        let Poll::Ready(Some(Ok(fio::DirectoryRequest::GetFlags { responder }))) = message else {
277            panic!("Unexpected {message:?}");
278        };
279        responder.send(Ok(fio::Flags::empty())).unwrap();
280
281        // The stream hasn't stalled yet.
282        TestExecutor::advance_to(initial + idle_duration * 2).await;
283        assert!(TestExecutor::poll_until_stalled(&mut stalled).await.is_pending());
284
285        // Poll the stream such that it is stalled.
286        let mut message = pin!(stream.next());
287        assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
288        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
289
290        TestExecutor::advance_to(initial + idle_duration * 3).await;
291
292        // Now the the stream should be finished, because the channel has been unbound.
293        assert_matches!(message.await, None);
294        assert_matches!(stalled.await, Ok(Some(_)));
295    }
296
297    #[fuchsia::test(allow_stalls = false)]
298    async fn pending_reply_blocks_stalling() {
299        let initial = fasync::MonotonicInstant::from_nanos(0);
300        TestExecutor::advance_to(initial).await;
301        const DURATION_NANOS: i64 = 1_000_000;
302        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
303
304        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
305        let (stream, mut stalled) = until_stalled(stream, idle_duration);
306        let mut stream = pin!(stream.fuse());
307
308        let _ = proxy.get_flags();
309
310        // Do not reply to the request, but hold on to the responder, so that there is a
311        // pending reply in the connection.
312        let message_with_pending_reply = stream.next().await.unwrap();
313        let Ok(fio::DirectoryRequest::GetFlags { responder, .. }) = message_with_pending_reply
314        else {
315            panic!("Unexpected {message_with_pending_reply:?}");
316        };
317
318        // The connection does not stall, because there is a pending reply.
319        TestExecutor::advance_to(initial + idle_duration * 2).await;
320        futures::select! {
321            _ = stream.next() => unreachable!(),
322            _ = stalled => unreachable!(),
323            default => {},
324        }
325
326        // Now we resolve the pending reply.
327        responder.send(Ok(fio::Flags::empty())).unwrap();
328
329        // The connection should stall.
330        assert_matches!(
331            futures::join!(
332                stream.next(),
333                TestExecutor::advance_to(initial + idle_duration * 3).then(|()| stalled)
334            ),
335            (None, Ok(Some(_)))
336        );
337    }
338
339    #[fuchsia::test(allow_stalls = false)]
340    async fn completed_stream() {
341        let initial = fasync::MonotonicInstant::from_nanos(0);
342        TestExecutor::advance_to(initial).await;
343        const DURATION_NANOS: i64 = 1_000_000;
344        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
345
346        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
347        let (mut stream, stalled) = until_stalled(stream, idle_duration);
348
349        let mut stalled = pin!(stalled);
350        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
351
352        // Close the proxy such that the stream completes.
353        drop(proxy);
354
355        let mut stream = pin!(stream);
356
357        {
358            // Read the `None` from the stream.
359            assert_matches!(stream.next().await, None);
360
361            // In practice the async tasks reading from the stream will exit, thus
362            // dropping the stream. We'll emulate that here.
363            drop(stream);
364        }
365
366        // Now the future should finish with `None` because the connection has
367        // terminated without stalling.
368        assert_matches!(stalled.await, Ok(None));
369    }
370
371    /// Simulate what would happen when a component serves a FIDL stream that's been
372    /// wrapped in `until_stalled`, and thus will complete and give the unbound channel
373    /// back to the user, who can then pass it back to `component_manager` in practice.
374    #[fuchsia::test(allow_stalls = false)]
375    async fn end_to_end() {
376        let initial = fasync::MonotonicInstant::from_nanos(0);
377        TestExecutor::advance_to(initial).await;
378        use fidl_fuchsia_component_client_test::{ProtocolAMarker, ProtocolARequest};
379
380        const DURATION_NANOS: i64 = 40_000_000;
381        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
382        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<ProtocolAMarker>();
383        let (stream, stalled) = until_stalled(stream, idle_duration);
384
385        // Launch a task that serves the stream.
386        let task = fasync::Task::spawn(async move {
387            let mut stream = pin!(stream);
388            while let Some(request) = stream.try_next().await.unwrap() {
389                match request {
390                    ProtocolARequest::Foo { responder } => responder.send().unwrap(),
391                }
392            }
393        });
394
395        // Launch another task to await the `stalled` future and also let us
396        // check for it synchronously.
397        let stalled = fasync::Task::spawn(stalled).map(Arc::new).shared();
398
399        // Make some requests at intervals half the idle duration. Stall should not happen.
400        let request_duration = MonotonicDuration::from_nanos(DURATION_NANOS / 2);
401        const NUM_REQUESTS: usize = 5;
402        let mut deadline = initial;
403        for _ in 0..NUM_REQUESTS {
404            proxy.foo().await.unwrap();
405            deadline += request_duration;
406            TestExecutor::advance_to(deadline).await;
407            assert!(stalled.clone().now_or_never().is_none());
408        }
409
410        // Wait for stalling.
411        deadline += idle_duration;
412        TestExecutor::advance_to(deadline).await;
413        let server_end = stalled.await;
414
415        // Ensure the server task can stop (by observing the completed stream).
416        task.await;
417
418        // Check that this channel was the original server endpoint.
419        let client = proxy.into_channel().unwrap().into_zx_channel();
420        assert_eq!(
421            client.basic_info().unwrap().koid,
422            (*server_end).as_ref().unwrap().as_ref().unwrap().basic_info().unwrap().related_koid
423        );
424    }
425}