Skip to main content

detect_stall/
stream.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Support for running FIDL request streams until stalled.
6
7use fidl::endpoints::RequestStream;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::channel::oneshot::{self, Receiver};
11use futures::{Stream, StreamExt, ready};
12use pin_project_lite::pin_project;
13use std::future::Future as _;
14use std::pin::Pin;
15use std::sync::{Arc, Weak};
16use std::task::{Context, Poll};
17use zx::MonotonicDuration;
18
19/// [`until_stalled`] wraps a FIDL request stream of type [`RS`] into another
20/// stream yielding the same requests, but could complete prematurely if it
21/// has stalled, meaning:
22///
23/// - The underlying `request_stream` has no new messages.
24/// - There are no pending FIDL replies.
25/// - This condition has lasted for at least `debounce_interval`.
26///
27/// When that happens, the request stream will complete, and the returned future
28/// will complete with the server endpoint which has been unbound from the
29/// stream. The returned future will also complete if the request stream ended
30/// on its own without stalling.
31pub fn until_stalled<RS: RequestStream>(
32    request_stream: RS,
33    debounce_interval: MonotonicDuration,
34) -> (impl StreamAndControlHandle<RS, <RS as Stream>::Item>, Receiver<Option<zx::Channel>>) {
35    let (sender, receiver) = oneshot::channel();
36    let stream = StallableRequestStream::new(request_stream, debounce_interval, move |channel| {
37        let _ = sender.send(channel);
38    });
39    (stream, receiver)
40}
41
42/// Types that implement [`StreamAndControlHandle`] can stream out FIDL request
43/// messages and vend out weak control handles which lets you manage the connection
44/// and send events.
/// Types that implement [`StreamAndControlHandle`] can stream out FIDL request
/// messages and vend out weak control handles which lets you manage the connection
/// and send events.
pub trait StreamAndControlHandle<RS, Item>: Stream<Item = Item> {
    /// Obtain a weak control handle. Different from [`RequestStream::control_handle`],
    /// the weak control handle will not prevent unbinding. You may hold on to the
    /// weak control handle and the request stream can still complete when stalled.
    fn control_handle(&self) -> WeakControlHandle<RS>;
}
51
pin_project! {
    /// The stream returned from [`until_stalled`].
    pub struct StallableRequestStream<RS, F> {
        // Shared with every `WeakControlHandle`. Holds `Some` while the stream
        // is bound; the stream is taken out (leaving `None`) when unbinding.
        stream: Arc<Mutex<Option<RS>>>,
        // How long the connection must stay idle before we attempt to unbind.
        debounce_interval: MonotonicDuration,
        // `FnOnce` invoked exactly once, either with the unbound channel on
        // stall or with `None` when the stream ends naturally.
        unbind_callback: Option<F>,
        // Debounce timer; `None` when no idle period is currently being timed.
        #[pin]
        timer: Option<fasync::Timer>,
    }
}
62
63impl<RS, F> StallableRequestStream<RS, F> {
64    /// Creates a new stallable request stream that will send the channel via `unbind_callback` when
65    /// stream is stalled.
66    pub fn new(stream: RS, debounce_interval: MonotonicDuration, unbind_callback: F) -> Self {
67        Self {
68            stream: Arc::new(Mutex::new(Some(stream))),
69            debounce_interval,
70            unbind_callback: Some(unbind_callback),
71            timer: None,
72        }
73    }
74}
75
76impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin>
77    StreamAndControlHandle<RS, RS::Item> for StallableRequestStream<RS, F>
78{
79    fn control_handle(&self) -> WeakControlHandle<RS> {
80        WeakControlHandle { stream: Arc::downgrade(&self.stream) }
81    }
82}
83
/// A weak reference to the stream wrapped by [`StallableRequestStream`].
/// Holding one does not prevent the stream from stalling and unbinding;
/// see [`WeakControlHandle::use_control_handle`].
pub struct WeakControlHandle<RS> {
    // Weak so that an outstanding handle never keeps the stream alive.
    stream: Weak<Mutex<Option<RS>>>,
}
87
88impl<RS> WeakControlHandle<RS>
89where
90    RS: RequestStream,
91{
92    /// If the server endpoint is not unbound, calls `user` function with the
93    /// control handle and propagates the return value. Otherwise, returns `None`.
94    ///
95    /// Typically you can use it to send an event within the closure:
96    ///
97    /// ```
98    /// let control_handle = stream.control_handle();
99    /// let result = control_handle.use_control_handle(
100    ///     |control_handle| control_handle.send_my_event());
101    /// ```
102    ///
103    pub fn use_control_handle<User, R>(&self, user: User) -> Option<R>
104    where
105        User: FnOnce(RS::ControlHandle) -> R,
106    {
107        self.stream
108            .upgrade()
109            .as_ref()
110            .map(|stream| stream.lock().as_ref().map(|stream| user(stream.control_handle())))
111            .flatten()
112    }
113}
114
impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin> Stream
    for StallableRequestStream<RS, F>
{
    type Item = <RS as Stream>::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Poll the wrapped stream first, holding the lock only for the poll.
        // The `expect` fires if the stream is polled again after it already
        // unbound (the slot is left `None` on the stall path below).
        let poll_result = self
            .stream
            .as_ref()
            .lock()
            .as_mut()
            .expect("Stream already resolved")
            .poll_next_unpin(cx);
        let mut this = self.project();
        match poll_result {
            Poll::Ready(message) => {
                if message.is_none() {
                    // Stream ended on its own without stalling: report that no
                    // channel could be recovered.
                    this.unbind_callback.take().unwrap()(None);
                } else {
                    // A message arrived: restart the debounce interval.
                    this.timer.set(Some(fasync::Timer::new(*this.debounce_interval)));
                }
                Poll::Ready(message)
            }
            Poll::Pending => {
                loop {
                    // Arm the debounce timer if one isn't already running.
                    if this.timer.is_none() {
                        this.timer.set(Some(fasync::Timer::new(*this.debounce_interval)));
                    }
                    // Stay `Pending` until the idle interval has fully elapsed.
                    ready!(this.timer.as_mut().as_pin_mut().unwrap().poll(cx));
                    this.timer.set(None);

                    // Try and unbind, which will fail if there are outstanding responders or
                    // control handles.
                    let (inner, is_terminated) = this.stream.lock().take().unwrap().into_inner();
                    match Arc::try_unwrap(inner) {
                        Ok(inner) => {
                            // Sole owner: hand the raw server channel back via
                            // the callback and complete the stream.
                            this.unbind_callback.take().unwrap()(Some(
                                inner.into_channel().into_zx_channel(),
                            ));
                            return Poll::Ready(None);
                        }
                        Err(inner) => {
                            // We can't unbind because there are outstanding responders or control
                            // handles, so we'll try again after another debounce interval.
                            *this.stream.lock() = Some(RS::from_inner(inner, is_terminated));
                        }
                    }
                }
            }
        }
    }
}
167
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fasync::TestExecutor;
    use fidl::endpoints::Proxy;
    use fidl_fuchsia_io as fio;
    use fuchsia_async as fasync;
    use futures::{FutureExt, TryStreamExt};
    use std::pin::pin;

    /// A connection with no traffic at all should stall after one idle interval.
    #[fuchsia::test(allow_stalls = false)]
    async fn no_message() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);
        let mut stream = pin!(stream);

        // After the idle interval the stream completes and the receiver
        // resolves with the unbound server channel.
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// Holding a strong (non-weak) control handle keeps the `Arc` refcount above
    /// one, so `Arc::try_unwrap` fails and the connection cannot stall.
    #[fuchsia::test(allow_stalls = false)]
    async fn strong_control_handle_blocks_stalling() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, mut stalled) = until_stalled(stream, idle_duration);

        // Extract a strong control handle out through the weak wrapper.
        let strong_control_handle: fio::DirectoryControlHandle =
            stream.control_handle().use_control_handle(|x| x).unwrap();

        // The connection does not stall, because there is `strong_control_handle`.
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        let mut stream = pin!(stream.fuse());
        futures::select! {
            _ = stream.next() => unreachable!(),
            _ = stalled => unreachable!(),
            default => {},
        }

        // Once we drop it then the connection can stall.
        drop(strong_control_handle);
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration * 4).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// A weak control handle must not keep the connection alive, and must stop
    /// working once the stream has unbound.
    #[fuchsia::test(allow_stalls = false)]
    async fn weak_control_handle() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        // Just getting a weak control handle should not block the connection from stalling.
        let weak_control_handle = stream.control_handle();

        let mut stream = pin!(stream);
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );

        // After unbinding, the closure must never be invoked.
        weak_control_handle.use_control_handle(|_| unreachable!());
    }

    /// Receiving (and fully replying to) a message resets the debounce timer.
    #[fuchsia::test(allow_stalls = false)]
    async fn one_message() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let debounce_interval = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, debounce_interval);
        let mut stream = pin!(stream);
        let mut stalled = pin!(stalled);

        // The stream should have no messages.
        let mut message = pin!(stream.next());
        assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        // Send a message to the stream and process it. The debounce timer resets.
        TestExecutor::advance_to(initial + debounce_interval / 2).await;
        let _ = proxy.get_flags();
        assert_matches!(
            TestExecutor::poll_until_stalled(&mut message).await,
            Poll::Ready(Some(Ok(fio::DirectoryRequest::GetFlags { responder }))) => {
                // Reply to the request so that the stream doesn't have any pending replies.
                responder.send(Ok(fio::Flags::empty())).unwrap();
            }
        );
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        // The stream should remain open after the old debounce interval but before the new debounce
        // interval.
        TestExecutor::advance_to(initial + debounce_interval).await;
        let mut message = pin!(stream.next());
        assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        // The stream should close after the new debounce interval.
        TestExecutor::advance_to(initial + debounce_interval / 2 + debounce_interval).await;
        assert_matches!(message.await, None);
        assert_matches!(stalled.await, Ok(Some(_)));
    }

    /// An outstanding responder holds a strong reference to the connection, so
    /// the stream cannot unbind until the reply is sent.
    #[fuchsia::test(allow_stalls = false)]
    async fn pending_reply_blocks_stalling() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, mut stalled) = until_stalled(stream, idle_duration);
        let mut stream = pin!(stream.fuse());

        let _ = proxy.get_flags();

        // Do not reply to the request, but hold on to the responder, so that there is a
        // pending reply in the connection.
        let message_with_pending_reply = stream.next().await.unwrap();
        let Ok(fio::DirectoryRequest::GetFlags { responder, .. }) = message_with_pending_reply
        else {
            panic!("Unexpected {message_with_pending_reply:?}");
        };

        // The connection does not stall, because there is a pending reply.
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        futures::select! {
            _ = stream.next() => unreachable!(),
            _ = stalled => unreachable!(),
            default => {},
        }

        // Now we resolve the pending reply.
        responder.send(Ok(fio::Flags::empty())).unwrap();

        // The connection should stall.
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration * 3).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// When the client closes first, the receiver resolves with `None` (no
    /// channel to recover) rather than the unbound endpoint.
    #[fuchsia::test(allow_stalls = false)]
    async fn completed_stream() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);

        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        let mut stalled = pin!(stalled);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);

        // Close the proxy such that the stream completes.
        drop(proxy);

        let mut stream = pin!(stream);

        {
            // Read the `None` from the stream.
            assert_matches!(stream.next().await, None);

            // In practice the async tasks reading from the stream will exit, thus
            // dropping the stream. We'll emulate that here.
            drop(stream);
        }

        // Now the future should finish with `None` because the connection has
        // terminated without stalling.
        assert_matches!(stalled.await, Ok(None));
    }

    /// Simulate what would happen when a component serves a FIDL stream that's been
    /// wrapped in `until_stalled`, and thus will complete and give the unbound channel
    /// back to the user, who can then pass it back to `component_manager` in practice.
    #[fuchsia::test(allow_stalls = false)]
    async fn end_to_end() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        use fidl_fuchsia_component_client_test::{ProtocolAMarker, ProtocolARequest};

        const DURATION_NANOS: i64 = 40_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<ProtocolAMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);

        // Launch a task that serves the stream.
        let task = fasync::Task::spawn(async move {
            let mut stream = pin!(stream);
            while let Some(request) = stream.try_next().await.unwrap() {
                match request {
                    ProtocolARequest::Foo { responder } => responder.send().unwrap(),
                }
            }
        });

        // Launch another task to await the `stalled` future and also let us
        // check for it synchronously.
        let stalled = fasync::Task::spawn(stalled).map(Arc::new).shared();

        // Make some requests at intervals half the idle duration. Stall should not happen.
        let request_duration = MonotonicDuration::from_nanos(DURATION_NANOS / 2);
        const NUM_REQUESTS: usize = 5;
        let mut deadline = initial;
        for _ in 0..NUM_REQUESTS {
            proxy.foo().await.unwrap();
            deadline += request_duration;
            TestExecutor::advance_to(deadline).await;
            assert!(stalled.clone().now_or_never().is_none());
        }

        // Wait for stalling.
        deadline += idle_duration;
        TestExecutor::advance_to(deadline).await;
        let server_end = stalled.await;

        // Ensure the server task can stop (by observing the completed stream).
        task.await;

        // Check that this channel was the original server endpoint.
        let client = proxy.into_channel().unwrap().into_zx_channel();
        assert_eq!(
            client.basic_info().unwrap().koid,
            (*server_end).as_ref().unwrap().as_ref().unwrap().basic_info().unwrap().related_koid
        );
    }
}
427}