Skip to main content

detect_stall/
stream.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Support for running FIDL request streams until stalled.
6
7use fidl::endpoints::RequestStream;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::channel::oneshot::{self, Receiver};
11use futures::{Stream, StreamExt, ready};
12use pin_project_lite::pin_project;
13use std::future::Future as _;
14use std::pin::Pin;
15use std::sync::{Arc, Weak};
16use std::task::{Context, Poll};
17use zx::MonotonicDuration;
18
19/// [`until_stalled`] wraps a FIDL request stream of type [`RS`] into another
20/// stream yielding the same requests, but could complete prematurely if it
21/// has stalled, meaning:
22///
23/// - The underlying `request_stream` has no new messages.
24/// - There are no pending FIDL replies.
25/// - This condition has lasted for at least `debounce_interval`.
26///
27/// When that happens, the request stream will complete, and the returned future
28/// will complete with the server endpoint which has been unbound from the
29/// stream. The returned future will also complete if the request stream ended
30/// on its own without stalling.
31pub fn until_stalled<RS: RequestStream>(
32    request_stream: RS,
33    debounce_interval: MonotonicDuration,
34) -> (impl StreamAndControlHandle<RS, <RS as Stream>::Item>, Receiver<Option<zx::Channel>>) {
35    let (sender, receiver) = oneshot::channel();
36    let stream = StallableRequestStream::new(request_stream, debounce_interval, move |channel| {
37        let _ = sender.send(channel);
38    });
39    (stream, receiver)
40}
41
42/// Types that implement [`StreamAndControlHandle`] can stream out FIDL request
43/// messages and vend out weak control handles which lets you manage the connection
44/// and send events.
45pub trait StreamAndControlHandle<RS, Item>: Stream<Item = Item> {
46    /// Obtain a weak control handle. Different from [`RequestStream::control_handle`],
47    /// the weak control handle will not prevent unbinding. You may hold on to the
48    /// weak control handle and the request stream can still complete when stalled.
49    fn control_handle(&self) -> WeakControlHandle<RS>;
50}
51
52pin_project! {
53    /// The stream returned from [`until_stalled`].
54    pub struct StallableRequestStream<RS, F> {
55        stream: Arc<Mutex<Option<RS>>>,
56        debounce_interval: MonotonicDuration,
57        unbind_callback: Option<F>,
58        #[pin]
59        timer: Option<fasync::Timer>,
60    }
61}
62
63impl<RS, F> StallableRequestStream<RS, F> {
64    /// Creates a new stallable request stream that will send the channel via `unbind_callback` when
65    /// stream is stalled.
66    pub fn new(stream: RS, mut debounce_interval: MonotonicDuration, unbind_callback: F) -> Self {
67        if debounce_interval < MonotonicDuration::from_millis(1) {
68            debounce_interval = MonotonicDuration::from_millis(1);
69        }
70        Self {
71            stream: Arc::new(Mutex::new(Some(stream))),
72            debounce_interval,
73            unbind_callback: Some(unbind_callback),
74            timer: None,
75        }
76    }
77}
78
79impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin>
80    StreamAndControlHandle<RS, RS::Item> for StallableRequestStream<RS, F>
81{
82    fn control_handle(&self) -> WeakControlHandle<RS> {
83        WeakControlHandle { stream: Arc::downgrade(&self.stream) }
84    }
85}
86
87pub struct WeakControlHandle<RS> {
88    stream: Weak<Mutex<Option<RS>>>,
89}
90
91impl<RS> WeakControlHandle<RS>
92where
93    RS: RequestStream,
94{
95    /// If the server endpoint is not unbound, calls `user` function with the
96    /// control handle and propagates the return value. Otherwise, returns `None`.
97    ///
98    /// Typically you can use it to send an event within the closure:
99    ///
100    /// ```
101    /// let control_handle = stream.control_handle();
102    /// let result = control_handle.use_control_handle(
103    ///     |control_handle| control_handle.send_my_event());
104    /// ```
105    ///
106    pub fn use_control_handle<User, R>(&self, user: User) -> Option<R>
107    where
108        User: FnOnce(RS::ControlHandle) -> R,
109    {
110        self.stream
111            .upgrade()
112            .as_ref()
113            .map(|stream| stream.lock().as_ref().map(|stream| user(stream.control_handle())))
114            .flatten()
115    }
116}
117
118impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin> Stream
119    for StallableRequestStream<RS, F>
120{
121    type Item = <RS as Stream>::Item;
122
123    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124        let poll_result = self
125            .stream
126            .as_ref()
127            .lock()
128            .as_mut()
129            .expect("Stream already resolved")
130            .poll_next_unpin(cx);
131        let mut this = self.project();
132        match poll_result {
133            Poll::Ready(message) => {
134                if message.is_none() {
135                    this.unbind_callback.take().unwrap()(None);
136                } else {
137                    this.timer.set(Some(fasync::Timer::new(*this.debounce_interval)));
138                }
139                Poll::Ready(message)
140            }
141            Poll::Pending => {
142                if this.timer.is_none() {
143                    this.timer.set(Some(fasync::Timer::new(*this.debounce_interval)));
144                }
145                ready!(this.timer.as_mut().as_pin_mut().unwrap().poll(cx));
146                this.timer.set(None);
147
148                // Try and unbind, which will fail if there are outstanding responders or
149                // control handles.
150                let (inner, is_terminated) = this.stream.lock().take().unwrap().into_inner();
151                match Arc::try_unwrap(inner) {
152                    Ok(inner) => {
153                        this.unbind_callback.take().unwrap()(Some(
154                            inner.into_channel().into_zx_channel(),
155                        ));
156                        Poll::Ready(None)
157                    }
158                    Err(inner) => {
159                        // We can't unbind because there are outstanding responders or control
160                        // handles, so we'll try again after another debounce interval.
161                        *this.stream.lock() = Some(RS::from_inner(inner, is_terminated));
162                        this.timer.set(Some(fasync::Timer::new(*this.debounce_interval)));
163                        if this.timer.as_mut().as_pin_mut().unwrap().poll(cx).is_ready()
164                        {
165                            cx.waker().wake_by_ref();
166                        }
167                        Poll::Pending
168                    }
169                }
170            }
171        }
172    }
173}
174
175#[cfg(test)]
176mod tests {
177    use super::*;
178    use assert_matches::assert_matches;
179    use fasync::TestExecutor;
180    use fidl::endpoints::Proxy;
181    use fidl_fuchsia_io as fio;
182    use fuchsia_async as fasync;
183    use futures::{FutureExt, TryStreamExt};
184    use std::pin::pin;
185
186    #[fuchsia::test(allow_stalls = false)]
187    async fn no_message() {
188        let initial = fasync::MonotonicInstant::from_nanos(0);
189        TestExecutor::advance_to(initial).await;
190        const DURATION_NANOS: i64 = 1_000_000;
191        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
192
193        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
194        let (stream, stalled) = until_stalled(stream, idle_duration);
195        let mut stream = pin!(stream);
196
197        assert_matches!(
198            futures::join!(
199                stream.next(),
200                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
201            ),
202            (None, Ok(Some(_)))
203        );
204    }
205
206    #[fuchsia::test(allow_stalls = false)]
207    async fn strong_control_handle_blocks_stalling() {
208        let initial = fasync::MonotonicInstant::from_nanos(0);
209        TestExecutor::advance_to(initial).await;
210        const DURATION_NANOS: i64 = 1_000_000;
211        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
212
213        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
214        let (stream, mut stalled) = until_stalled(stream, idle_duration);
215
216        let strong_control_handle: fio::DirectoryControlHandle =
217            stream.control_handle().use_control_handle(|x| x).unwrap();
218
219        // The connection does not stall, because there is `strong_control_handle`.
220        TestExecutor::advance_to(initial + idle_duration * 2).await;
221        let mut stream = pin!(stream.fuse());
222        futures::select! {
223            _ = stream.next() => unreachable!(),
224            _ = stalled => unreachable!(),
225            default => {},
226        }
227
228        // Once we drop it then the connection can stall.
229        drop(strong_control_handle);
230        assert_matches!(
231            futures::join!(
232                stream.next(),
233                TestExecutor::advance_to(initial + idle_duration * 4).then(|()| stalled)
234            ),
235            (None, Ok(Some(_)))
236        );
237    }
238
239    #[fuchsia::test(allow_stalls = false)]
240    async fn weak_control_handle() {
241        let initial = fasync::MonotonicInstant::from_nanos(0);
242        TestExecutor::advance_to(initial).await;
243        const DURATION_NANOS: i64 = 1_000_000;
244        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
245
246        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
247        let (stream, stalled) = until_stalled(stream, idle_duration);
248
249        // Just getting a weak control handle should not block the connection from stalling.
250        let weak_control_handle = stream.control_handle();
251
252        let mut stream = pin!(stream);
253        assert_matches!(
254            futures::join!(
255                stream.next(),
256                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
257            ),
258            (None, Ok(Some(_)))
259        );
260
261        weak_control_handle.use_control_handle(|_| unreachable!());
262    }
263
264    #[fuchsia::test(allow_stalls = false)]
265    async fn one_message() {
266        let initial = fasync::MonotonicInstant::from_nanos(0);
267        TestExecutor::advance_to(initial).await;
268        const DURATION_NANOS: i64 = 1_000_000;
269        let debounce_interval = MonotonicDuration::from_nanos(DURATION_NANOS);
270
271        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
272        let (stream, stalled) = until_stalled(stream, debounce_interval);
273        let mut stream = pin!(stream);
274        let mut stalled = pin!(stalled);
275
276        // The stream should have no messages.
277        let mut message = pin!(stream.next());
278        assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
279        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
280
281        // Send a message to the stream and process it. The debounce timer resets.
282        TestExecutor::advance_to(initial + debounce_interval / 2).await;
283        let _ = proxy.get_flags();
284        assert_matches!(
285            TestExecutor::poll_until_stalled(&mut message).await,
286            Poll::Ready(Some(Ok(fio::DirectoryRequest::GetFlags { responder }))) => {
287                // Reply to the request so that the stream doesn't have any pending replies.
288                responder.send(Ok(fio::Flags::empty())).unwrap();
289            }
290        );
291        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
292
293        // The stream should remain open after the old debounce interval but before the new debounce
294        // interval.
295        TestExecutor::advance_to(initial + debounce_interval).await;
296        let mut message = pin!(stream.next());
297        assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
298        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
299
300        // The stream should close after the new debounce interval.
301        TestExecutor::advance_to(initial + debounce_interval / 2 + debounce_interval).await;
302        assert_matches!(message.await, None);
303        assert_matches!(stalled.await, Ok(Some(_)));
304    }
305
306    #[fuchsia::test(allow_stalls = false)]
307    async fn pending_reply_blocks_stalling() {
308        let initial = fasync::MonotonicInstant::from_nanos(0);
309        TestExecutor::advance_to(initial).await;
310        const DURATION_NANOS: i64 = 1_000_000;
311        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
312
313        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
314        let (stream, mut stalled) = until_stalled(stream, idle_duration);
315        let mut stream = pin!(stream.fuse());
316
317        let _ = proxy.get_flags();
318
319        // Do not reply to the request, but hold on to the responder, so that there is a
320        // pending reply in the connection.
321        let message_with_pending_reply = stream.next().await.unwrap();
322        let Ok(fio::DirectoryRequest::GetFlags { responder, .. }) = message_with_pending_reply
323        else {
324            panic!("Unexpected {message_with_pending_reply:?}");
325        };
326
327        // The connection does not stall, because there is a pending reply.
328        TestExecutor::advance_to(initial + idle_duration * 2).await;
329        futures::select! {
330            _ = stream.next() => unreachable!(),
331            _ = stalled => unreachable!(),
332            default => {},
333        }
334
335        // Now we resolve the pending reply.
336        responder.send(Ok(fio::Flags::empty())).unwrap();
337
338        // The connection should stall.
339        assert_matches!(
340            futures::join!(
341                stream.next(),
342                TestExecutor::advance_to(initial + idle_duration * 3).then(|()| stalled)
343            ),
344            (None, Ok(Some(_)))
345        );
346    }
347
348    #[fuchsia::test(allow_stalls = false)]
349    async fn completed_stream() {
350        let initial = fasync::MonotonicInstant::from_nanos(0);
351        TestExecutor::advance_to(initial).await;
352        const DURATION_NANOS: i64 = 1_000_000;
353        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
354
355        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
356        let (stream, stalled) = until_stalled(stream, idle_duration);
357
358        let mut stalled = pin!(stalled);
359        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
360
361        // Close the proxy such that the stream completes.
362        drop(proxy);
363
364        let mut stream = pin!(stream);
365
366        {
367            // Read the `None` from the stream.
368            assert_matches!(stream.next().await, None);
369
370            // In practice the async tasks reading from the stream will exit, thus
371            // dropping the stream. We'll emulate that here.
372            drop(stream);
373        }
374
375        // Now the future should finish with `None` because the connection has
376        // terminated without stalling.
377        assert_matches!(stalled.await, Ok(None));
378    }
379
380    /// Simulate what would happen when a component serves a FIDL stream that's been
381    /// wrapped in `until_stalled`, and thus will complete and give the unbound channel
382    /// back to the user, who can then pass it back to `component_manager` in practice.
383    #[fuchsia::test(allow_stalls = false)]
384    async fn end_to_end() {
385        let initial = fasync::MonotonicInstant::from_nanos(0);
386        TestExecutor::advance_to(initial).await;
387        use fidl_fuchsia_component_client_test::{ProtocolAMarker, ProtocolARequest};
388
389        const DURATION_NANOS: i64 = 40_000_000;
390        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
391        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<ProtocolAMarker>();
392        let (stream, stalled) = until_stalled(stream, idle_duration);
393
394        // Launch a task that serves the stream.
395        let task = fasync::Task::spawn(async move {
396            let mut stream = pin!(stream);
397            while let Some(request) = stream.try_next().await.unwrap() {
398                match request {
399                    ProtocolARequest::Foo { responder } => responder.send().unwrap(),
400                }
401            }
402        });
403
404        // Launch another task to await the `stalled` future and also let us
405        // check for it synchronously.
406        let stalled = fasync::Task::spawn(stalled).map(Arc::new).shared();
407
408        // Make some requests at intervals half the idle duration. Stall should not happen.
409        let request_duration = MonotonicDuration::from_nanos(DURATION_NANOS / 2);
410        const NUM_REQUESTS: usize = 5;
411        let mut deadline = initial;
412        for _ in 0..NUM_REQUESTS {
413            proxy.foo().await.unwrap();
414            deadline += request_duration;
415            TestExecutor::advance_to(deadline).await;
416            assert!(stalled.clone().now_or_never().is_none());
417        }
418
419        // Wait for stalling.
420        deadline += idle_duration;
421        TestExecutor::advance_to(deadline).await;
422        let server_end = stalled.await;
423
424        // Ensure the server task can stop (by observing the completed stream).
425        task.await;
426
427        // Check that this channel was the original server endpoint.
428        let client = proxy.into_channel().unwrap().into_zx_channel();
429        assert_eq!(
430            client.basic_info().unwrap().koid,
431            (*server_end).as_ref().unwrap().as_ref().unwrap().basic_info().unwrap().related_koid
432        );
433    }
434}