detect_stall/
stream.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Support for running FIDL request streams until stalled.
6
7use fidl::endpoints::RequestStream;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::channel::oneshot::{self, Receiver};
11use futures::{Stream, StreamExt, ready};
12use pin_project_lite::pin_project;
13use std::future::Future as _;
14use std::pin::Pin;
15use std::sync::{Arc, Weak};
16use std::task::{Context, Poll};
17use zx::MonotonicDuration;
18
19/// [`until_stalled`] wraps a FIDL request stream of type [`RS`] into another
20/// stream yielding the same requests, but could complete prematurely if it
21/// has stalled, meaning:
22///
23/// - The underlying `request_stream` has no new messages.
24/// - There are no pending FIDL replies.
25/// - This condition has lasted for at least `debounce_interval`.
26///
27/// When that happens, the request stream will complete, and the returned future
28/// will complete with the server endpoint which has been unbound from the
29/// stream. The returned future will also complete if the request stream ended
30/// on its own without stalling.
31pub fn until_stalled<RS: RequestStream>(
32    request_stream: RS,
33    debounce_interval: MonotonicDuration,
34) -> (impl StreamAndControlHandle<RS, <RS as Stream>::Item>, Receiver<Option<zx::Channel>>) {
35    let (sender, receiver) = oneshot::channel();
36    let stream = StallableRequestStream::new(request_stream, debounce_interval, move |channel| {
37        let _ = sender.send(channel);
38    });
39    (stream, receiver)
40}
41
/// Types that implement [`StreamAndControlHandle`] can stream out FIDL request
/// messages and vend out weak control handles, which let you manage the
/// connection and send events.
///
/// `RS` is the request stream type the weak control handle refers to, and
/// `Item` is the item type yielded by the stream.
pub trait StreamAndControlHandle<RS, Item>: Stream<Item = Item> {
    /// Obtain a weak control handle. Different from [`RequestStream::control_handle`],
    /// the weak control handle will not prevent unbinding. You may hold on to the
    /// weak control handle and the request stream can still complete when stalled.
    fn control_handle(&self) -> WeakControlHandle<RS>;
}
51
pin_project! {
    /// The stream returned from [`until_stalled`].
    pub struct StallableRequestStream<RS, F> {
        // The wrapped request stream. It is shared so that weak control handles
        // can reach it, and becomes `None` once the channel has been unbound.
        stream: Arc<Mutex<Option<RS>>>,
        // How long the stream must stay idle before an unbind is attempted.
        debounce_interval: MonotonicDuration,
        // Invoked at most once, when the stream ends; taken out of the `Option`
        // when called.
        unbind_callback: Option<F>,
        // Idle timer; armed while the wrapped stream is pending and cleared
        // whenever a message arrives or the timer fires.
        #[pin]
        timer: Option<fasync::Timer>,
    }
}
62
63impl<RS, F> StallableRequestStream<RS, F> {
64    /// Creates a new stallable request stream that will send the channel via `unbind_callback` when
65    /// stream is stalled.
66    pub fn new(stream: RS, debounce_interval: MonotonicDuration, unbind_callback: F) -> Self {
67        Self {
68            stream: Arc::new(Mutex::new(Some(stream))),
69            debounce_interval,
70            unbind_callback: Some(unbind_callback),
71            timer: None,
72        }
73    }
74}
75
76impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin>
77    StreamAndControlHandle<RS, RS::Item> for StallableRequestStream<RS, F>
78{
79    fn control_handle(&self) -> WeakControlHandle<RS> {
80        WeakControlHandle { stream: Arc::downgrade(&self.stream) }
81    }
82}
83
/// A control handle that holds only a weak reference to the request stream,
/// as vended by [`StreamAndControlHandle::control_handle`].
pub struct WeakControlHandle<RS> {
    // Weak reference to the slot holding the request stream; the slot contains
    // `None` once the stream has been unbound.
    stream: Weak<Mutex<Option<RS>>>,
}
87
88impl<RS> WeakControlHandle<RS>
89where
90    RS: RequestStream,
91{
92    /// If the server endpoint is not unbound, calls `user` function with the
93    /// control handle and propagates the return value. Otherwise, returns `None`.
94    ///
95    /// Typically you can use it to send an event within the closure:
96    ///
97    /// ```
98    /// let control_handle = stream.control_handle();
99    /// let result = control_handle.use_control_handle(
100    ///     |control_handle| control_handle.send_my_event());
101    /// ```
102    ///
103    pub fn use_control_handle<User, R>(&self, user: User) -> Option<R>
104    where
105        User: FnOnce(RS::ControlHandle) -> R,
106    {
107        self.stream
108            .upgrade()
109            .as_ref()
110            .map(|stream| stream.lock().as_ref().map(|stream| user(stream.control_handle())))
111            .flatten()
112    }
113}
114
115impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin> Stream
116    for StallableRequestStream<RS, F>
117{
118    type Item = <RS as Stream>::Item;
119
120    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121        let poll_result = self
122            .stream
123            .as_ref()
124            .lock()
125            .as_mut()
126            .expect("Stream already resolved")
127            .poll_next_unpin(cx);
128        let mut this = self.project();
129        match poll_result {
130            Poll::Ready(message) => {
131                this.timer.set(None);
132                if message.is_none() {
133                    this.unbind_callback.take().unwrap()(None);
134                }
135                Poll::Ready(message)
136            }
137            Poll::Pending => {
138                let debounce_interval = *this.debounce_interval;
139                loop {
140                    if this.timer.is_none() {
141                        this.timer.set(Some(fasync::Timer::new(debounce_interval)));
142                    }
143                    ready!(this.timer.as_mut().as_pin_mut().unwrap().poll(cx));
144                    this.timer.set(None);
145
146                    // Try and unbind, which will fail if there are outstanding responders or
147                    // control handles.
148                    let (inner, is_terminated) = this.stream.lock().take().unwrap().into_inner();
149                    match Arc::try_unwrap(inner) {
150                        Ok(inner) => {
151                            this.unbind_callback.take().unwrap()(Some(
152                                inner.into_channel().into_zx_channel(),
153                            ));
154                            return Poll::Ready(None);
155                        }
156                        Err(inner) => {
157                            // We can't unbind because there are outstanding responders or control
158                            // handles, so we'll try again after another debounce interval.
159                            *this.stream.lock() = Some(RS::from_inner(inner, is_terminated));
160                        }
161                    }
162                }
163            }
164        }
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fasync::TestExecutor;
    use fidl::endpoints::Proxy;
    use futures::{FutureExt, TryStreamExt};
    use std::pin::pin;
    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};

    /// With no requests sent at all, the stream ends after one idle interval
    /// and the unbound channel is delivered through `stalled`.
    #[fuchsia::test(allow_stalls = false)]
    async fn no_message() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);
        let mut stream = pin!(stream);

        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// A strong control handle extracted through the weak handle keeps the
    /// connection from stalling until it is dropped.
    #[fuchsia::test(allow_stalls = false)]
    async fn strong_control_handle_blocks_stalling() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, mut stalled) = until_stalled(stream, idle_duration);

        // Returning the control handle from the closure lets it outlive the call.
        let strong_control_handle: fio::DirectoryControlHandle =
            stream.control_handle().use_control_handle(|x| x).unwrap();

        // The connection does not stall, because there is `strong_control_handle`.
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        let mut stream = pin!(stream.fuse());
        futures::select! {
            _ = stream.next() => unreachable!(),
            _ = stalled => unreachable!(),
            default => {},
        }

        // Once we drop it then the connection can stall.
        drop(strong_control_handle);
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration * 4).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// Holding only a weak control handle does not prevent stalling, and the
    /// weak handle never runs its closure after the channel is unbound.
    #[fuchsia::test(allow_stalls = false)]
    async fn weak_control_handle() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        // Just getting a weak control handle should not block the connection from stalling.
        let weak_control_handle = stream.control_handle();

        let mut stream = pin!(stream);
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );

        // The channel has been unbound, so the closure must not be invoked.
        weak_control_handle.use_control_handle(|_| unreachable!());
    }

    /// After a single request has been answered, the stream stalls one idle
    /// interval after it is next polled.
    #[fuchsia::test(allow_stalls = false)]
    async fn one_message() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        let mut stalled = pin!(stalled);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        let _ = proxy.get_flags();

        let mut stream = pin!(stream);
        let mut message = pin!(stream.next());
        // Reply to the request so that the stream doesn't have any pending replies.
        let message = TestExecutor::poll_until_stalled(&mut message).await;
        let Poll::Ready(Some(Ok(fio::DirectoryRequest::GetFlags { responder }))) = message else {
            panic!("Unexpected {message:?}");
        };
        responder.send(Ok(fio::Flags::empty())).unwrap();

        // The stream hasn't stalled yet.
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        assert!(TestExecutor::poll_until_stalled(&mut stalled).await.is_pending());

        // Poll the stream such that it is stalled.
        let mut message = pin!(stream.next());
        assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        TestExecutor::advance_to(initial + idle_duration * 3).await;

        // Now the stream should be finished, because the channel has been unbound.
        assert_matches!(message.await, None);
        assert_matches!(stalled.await, Ok(Some(_)));
    }

    /// An unanswered request (a held responder) keeps the connection from
    /// stalling until the reply is sent.
    #[fuchsia::test(allow_stalls = false)]
    async fn pending_reply_blocks_stalling() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, mut stalled) = until_stalled(stream, idle_duration);
        let mut stream = pin!(stream.fuse());

        let _ = proxy.get_flags();

        // Do not reply to the request, but hold on to the responder, so that there is a
        // pending reply in the connection.
        let message_with_pending_reply = stream.next().await.unwrap();
        let Ok(fio::DirectoryRequest::GetFlags { responder, .. }) = message_with_pending_reply
        else {
            panic!("Unexpected {message_with_pending_reply:?}");
        };

        // The connection does not stall, because there is a pending reply.
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        futures::select! {
            _ = stream.next() => unreachable!(),
            _ = stalled => unreachable!(),
            default => {},
        }

        // Now we resolve the pending reply.
        responder.send(Ok(fio::Flags::empty())).unwrap();

        // The connection should stall.
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration * 3).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// When the client closes the connection, the stream ends on its own and
    /// `stalled` resolves with `None` rather than a channel.
    #[fuchsia::test(allow_stalls = false)]
    async fn completed_stream() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        let mut stalled = pin!(stalled);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        // Close the proxy such that the stream completes.
        drop(proxy);

        let mut stream = pin!(stream);

        {
            // Read the `None` from the stream.
            assert_matches!(stream.next().await, None);

            // In practice the async tasks reading from the stream will exit, thus
            // dropping the stream. We'll emulate that here.
            drop(stream);
        }

        // Now the future should finish with `None` because the connection has
        // terminated without stalling.
        assert_matches!(stalled.await, Ok(None));
    }

    /// Simulate what would happen when a component serves a FIDL stream that's been
    /// wrapped in `until_stalled`, and thus will complete and give the unbound channel
    /// back to the user, who can then pass it back to `component_manager` in practice.
    #[fuchsia::test(allow_stalls = false)]
    async fn end_to_end() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        use fidl_fuchsia_component_client_test::{ProtocolAMarker, ProtocolARequest};

        const DURATION_NANOS: i64 = 40_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<ProtocolAMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        // Launch a task that serves the stream.
        let task = fasync::Task::spawn(async move {
            let mut stream = pin!(stream);
            while let Some(request) = stream.try_next().await.unwrap() {
                match request {
                    ProtocolARequest::Foo { responder } => responder.send().unwrap(),
                }
            }
        });

        // Launch another task to await the `stalled` future and also let us
        // check for it synchronously.
        let stalled = fasync::Task::spawn(stalled).map(Arc::new).shared();

        // Make some requests at intervals half the idle duration. Stall should not happen.
        let request_duration = MonotonicDuration::from_nanos(DURATION_NANOS / 2);
        const NUM_REQUESTS: usize = 5;
        let mut deadline = initial;
        for _ in 0..NUM_REQUESTS {
            proxy.foo().await.unwrap();
            deadline += request_duration;
            TestExecutor::advance_to(deadline).await;
            assert!(stalled.clone().now_or_never().is_none());
        }

        // Wait for stalling.
        deadline += idle_duration;
        TestExecutor::advance_to(deadline).await;
        let server_end = stalled.await;

        // Ensure the server task can stop (by observing the completed stream).
        task.await;

        // Check that this channel was the original server endpoint.
        let client = proxy.into_channel().unwrap().into_zx_channel();
        assert_eq!(
            client.basic_info().unwrap().koid,
            (*server_end).as_ref().unwrap().as_ref().unwrap().basic_info().unwrap().related_koid
        );
    }
}