Skip to main content

fuchsia_component_server/
until_stalled.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Support for running the ServiceFs until stalled.
6
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use detect_stall::StallableRequestStream;
12use fidl::endpoints::ServerEnd;
13use fidl_fuchsia_io as fio;
14use fuchsia_async as fasync;
15use futures::channel::oneshot::{self, Canceled};
16use futures::{FutureExt, Stream};
17use pin_project::pin_project;
18use vfs::ToObjectRequest;
19use vfs::directory::immutable::Simple;
20use vfs::directory::immutable::connection::ImmutableConnection;
21use vfs::execution_scope::{ActiveGuard, ExecutionScope};
22use zx::MonotonicDuration;
23
24use super::{ServiceFs, ServiceObjTrait};
25
26/// A wrapper around the base [`ServiceFs`] that streams out capability connection requests.
27/// Additionally, it will yield [`Item::Stalled`] if there is no work happening in the fs
28/// and the main outgoing directory connection has not received messages for some time.
29///
30/// Use [`ServiceFs::until_stalled`] to produce an instance. Refer to details there.
31#[pin_project]
32pub struct StallableServiceFs<ServiceObjTy: ServiceObjTrait> {
33    #[pin]
34    fs: ServiceFs<ServiceObjTy>,
35    connector: OutgoingConnector,
36    #[pin]
37    state: State,
38    debounce_interval: zx::MonotonicDuration,
39    is_terminated: bool,
40}
41
42/// The item yielded by a [`StallableServiceFs`] stream.
43pub enum Item<Output> {
44    /// A new connection request to a capability. `ServiceObjTy::Output` contains more
45    /// information identifying the capability requested. The [`ActiveGuard`] should be
46    /// held alive as long as you are processing the connection, or doing any other work
47    /// where you would like to prevent the [`ServiceFs`] from shutting down.
48    Request(Output, ActiveGuard),
49
50    /// The [`ServiceFs`] has stalled. The unbound outgoing directory server endpoint will
51    /// be returned here. The stream will complete right after this. You should typically
52    /// escrow the server endpoint back to component manager, and then exit the component.
53    Stalled(zx::Channel),
54}
55
56// Implementation detail below
57
58/// We use a state machine to detect stalling. The general structure is:
59/// - When the service fs is running, wait for the outgoing directory connection to stall.
60/// - If the outgoing directory stalled, unbind it and wait for readable.
61/// - If it is readable, we'll add back the connection to the service fs and back to wait for stall.
62/// - If the service fs finished while the outgoing directory is unbound, we'll
63///   complete the stream and return the endpoint to the user. Note that the service fs might take
64///   a while to finish even after the outgoing directory has been unbound, due to
65///   [`ActiveGuard`]s held by the user or due to other long-running connections.
66#[pin_project(project = StateProj)]
67enum State {
68    Running {
69        stalled: StalledFut,
70    },
71    // If the `channel` is `None`, the outgoing directory stream completed without stalling.
72    // We just need to wait for the `ServiceFs` to finish.
73    Stalled {
74        #[pin]
75        channel: Option<fasync::OnSignals<'static, zx::Channel>>,
76    },
77}
78
79impl<ServiceObjTy: ServiceObjTrait> Stream for StallableServiceFs<ServiceObjTy> {
80    type Item = Item<ServiceObjTy::Output>;
81
82    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83        let mut this = self.project();
84        if *this.is_terminated {
85            return Poll::Ready(None);
86        }
87
88        // Poll the underlying service fs to handle requests.
89        //
90        // NOTE: Normally, it isn't safe to poll a stream after it returns None, but ServiceFs
91        // supports this.
92        let poll_fs = this.fs.poll_next(cx);
93        if let Poll::Ready(Some(request)) = poll_fs {
94            // If there is some connection request, always return that to the user first.
95            return match this.connector.scope.try_active_guard() {
96                Some(guard) => Poll::Ready(Some(Item::Request(request, guard))),
97                None => Poll::Ready(None),
98            };
99        }
100
101        // If we get here, the underlying service fs is either finished, or pending.
102        // Poll in a loop until the state no longer changes.
103        loop {
104            match this.state.as_mut().project() {
105                StateProj::Running { stalled } => {
106                    let channel = std::task::ready!(stalled.poll_unpin(cx));
107                    let channel = channel
108                        .map(|c| fasync::OnSignals::new(c.into(), zx::Signals::CHANNEL_READABLE));
109                    // The state will be polled on the next loop iteration.
110                    this.state.set(State::Stalled { channel });
111                }
112                StateProj::Stalled { channel } => {
113                    if let Poll::Ready(None) = poll_fs {
114                        // The service fs finished. Return the channel if we have it.
115                        *this.is_terminated = true;
116
117                        // On a multithreaded executor, it is possible that some other task took an
118                        // active guard between us polling `fs` above and here, which would mean the
119                        // scope doesn't immediately shutdown. This isn't something we need to
120                        // support so we won't worry about this.
121                        this.connector.scope.shutdown();
122
123                        return Poll::Ready(
124                            channel
125                                .as_pin_mut()
126                                .take()
127                                .map(|wait| Item::Stalled(wait.take_handle().into())),
128                        );
129                    }
130                    if channel.is_none() {
131                        // The outgoing directory FIDL stream completed (client closed or
132                        // errored) without stalling, but the service fs is processing
133                        // other requests. Simply wait for that to finish.
134                        return Poll::Pending;
135                    }
136                    // Otherwise, arrange to be polled again if the channel is readable.
137                    let mut on_signals = channel.as_pin_mut().unwrap();
138                    let _ = std::task::ready!(on_signals.as_mut().poll(cx));
139                    // Server endpoint is readable again. Restore the connection.
140                    let handle = on_signals.take_handle().into();
141                    let stalled = this.connector.serve(handle, *this.debounce_interval);
142                    // The state will be polled on the next loop iteration.
143                    this.state.set(State::Running { stalled });
144                }
145            }
146        }
147    }
148}
149
150struct OutgoingConnector {
151    flags: fio::Flags,
152    scope: ExecutionScope,
153    dir: Arc<Simple>,
154}
155
156impl OutgoingConnector {
157    /// Adds a stallable outgoing directory connection.
158    ///
159    /// If the request stream completed, the returned future will resolve with `None`.
160    /// If the request stream did not encounter new requests for `debounce_interval`, it will be
161    /// unbound, and the returned future will resolve with `Some(channel)`.
162    fn serve(
163        &mut self,
164        server_end: ServerEnd<fio::DirectoryMarker>,
165        debounce_interval: MonotonicDuration,
166    ) -> StalledFut {
167        let (unbound_sender, unbound_receiver) = oneshot::channel();
168        let object_request = self.flags.to_object_request(server_end);
169        let scope = self.scope.clone();
170        let dir = self.dir.clone();
171        let flags = self.flags;
172        scope.clone().spawn(object_request.handle_async(async move |object_request| {
173            ImmutableConnection::create_transform_stream(
174                scope,
175                dir,
176                flags,
177                object_request,
178                move |stream| {
179                    StallableRequestStream::new(
180                        stream,
181                        debounce_interval,
182                        // This function will be called with the server endpoint when
183                        // the directory request stream is stalled for `debounce_interval`
184                        move |maybe_channel: Option<zx::Channel>| {
185                            _ = unbound_sender.send(maybe_channel);
186                        },
187                    )
188                },
189            )
190            .await
191        }));
192        StalledFut(unbound_receiver)
193    }
194}
195
196/// Wrapper around oneshot channel that maps the sender end being dropped into a `None`.
197struct StalledFut(oneshot::Receiver<Option<zx::Channel>>);
198
199impl Future for StalledFut {
200    type Output = Option<zx::Channel>;
201
202    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
203        match self.0.poll_unpin(cx) {
204            Poll::Ready(Ok(maybe_channel)) => Poll::Ready(maybe_channel),
205            Poll::Ready(Err(Canceled)) => Poll::Ready(None),
206            Poll::Pending => Poll::Pending,
207        }
208    }
209}
210
211impl<ServiceObjTy: ServiceObjTrait> StallableServiceFs<ServiceObjTy> {
212    pub(crate) fn new(
213        mut fs: ServiceFs<ServiceObjTy>,
214        debounce_interval: zx::MonotonicDuration,
215    ) -> Self {
216        let channel_queue =
217            fs.channel_queue.as_mut().expect("Must not poll the original ServiceFs");
218        assert!(
219            channel_queue.len() == 1,
220            "Must have exactly one connection to serve, \
221            e.g. did you call ServiceFs::take_and_serve_directory_handle?"
222        );
223        let server_end = std::mem::replace(channel_queue, vec![]).into_iter().next().unwrap();
224        let flags = ServiceFs::<ServiceObjTy>::base_connection_flags();
225        let scope = fs.scope.clone();
226        let dir = fs.dir.clone();
227        let mut connector = OutgoingConnector { flags, scope, dir };
228        let stalled = connector.serve(server_end, debounce_interval);
229        Self {
230            fs,
231            connector,
232            state: State::Running { stalled },
233            debounce_interval,
234            is_terminated: false,
235        }
236    }
237
238    /// Returns an [`ActiveGuard`] that will prevent the [`ServiceFs`] from shutting down until the
239    /// [`ActiveGuard`] is dropped.  This will return None if an active guard cannot be obtained
240    /// (e.g.  if the StallableServiceFs stream has terminated, or is just about to terminate).
241    pub fn try_active_guard(&self) -> Option<ActiveGuard> {
242        self.connector.scope.try_active_guard()
243    }
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use assert_matches::assert_matches;
250    use fasync::TestExecutor;
251    use fidl::endpoints::ClientEnd;
252    use fidl_fuchsia_component_client_test::{
253        ProtocolAMarker, ProtocolARequest, ProtocolARequestStream,
254    };
255    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
256    use fuchsia_component_directory::open_directory_async;
257    use fuchsia_sync::Mutex;
258    use futures::future::{BoxFuture, FusedFuture};
259    use futures::{StreamExt, TryStreamExt, pin_mut, select};
260    use std::sync::atomic::{AtomicBool, Ordering};
261    use test_util::Counter;
262    use zx::AsHandleRef;
263
264    enum Requests {
265        ServiceA(ProtocolARequestStream),
266    }
267
268    #[derive(Clone)]
269    struct MockServer {
270        call_count: Arc<Counter>,
271        stalled: Arc<AtomicBool>,
272        server_end: Arc<Mutex<Option<zx::Channel>>>,
273    }
274
275    impl MockServer {
276        fn new() -> Self {
277            let call_count = Arc::new(Counter::new(0));
278            let stalled = Arc::new(AtomicBool::new(false));
279            let server_end = Arc::new(Mutex::new(None));
280            Self { call_count, stalled, server_end }
281        }
282
283        fn handle(&self, item: Item<Requests>) -> BoxFuture<'static, ()> {
284            let stalled = self.stalled.clone();
285            let call_count = self.call_count.clone();
286            let server_end = self.server_end.clone();
287            async move {
288                match item {
289                    Item::Request(requests, active_guard) => {
290                        let _active_guard = active_guard;
291                        let Requests::ServiceA(mut request_stream) = requests;
292                        while let Ok(Some(request)) = request_stream.try_next().await {
293                            match request {
294                                ProtocolARequest::Foo { responder } => {
295                                    call_count.inc();
296                                    let _ = responder.send();
297                                }
298                            }
299                        }
300                    }
301                    Item::Stalled(channel) => {
302                        *server_end.lock() = Some(channel);
303                        stalled.store(true, Ordering::SeqCst);
304                    }
305                }
306            }
307            .boxed()
308        }
309
310        #[track_caller]
311        fn assert_fs_gave_back_server_end(self, client_end: ClientEnd<fio::DirectoryMarker>) {
312            let reclaimed_server_end: zx::Channel = self.server_end.lock().take().unwrap();
313            assert_eq!(
314                client_end.as_handle_ref().koid().unwrap(),
315                reclaimed_server_end.basic_info().unwrap().related_koid
316            )
317        }
318    }
319
320    /// Initializes fake time; creates VFS with a single mock server, and returns them.
321    async fn setup_test(
322        server_end: ServerEnd<fio::DirectoryMarker>,
323    ) -> (fasync::MonotonicInstant, MockServer, impl FusedFuture<Output = ()>) {
324        let initial = fasync::MonotonicInstant::from_nanos(0);
325        TestExecutor::advance_to(initial).await;
326        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
327
328        let mut fs = ServiceFs::new();
329        fs.serve_connection(server_end).unwrap().dir("svc").add_fidl_service(Requests::ServiceA);
330
331        let mock_server = MockServer::new();
332        let mock_server_clone = mock_server.clone();
333        let fs = fs
334            .until_stalled(IDLE_DURATION)
335            .for_each_concurrent(None, move |item| mock_server_clone.handle(item));
336
337        (initial, mock_server, fs)
338    }
339
340    #[fuchsia::test(allow_stalls = false)]
341    async fn drain_request() {
342        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
343        const NUM_FOO_REQUESTS: usize = 10;
344        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
345        let (initial, mock_server, fs) = setup_test(server_end).await;
346        pin_mut!(fs);
347
348        let mut proxies = Vec::new();
349        for _ in 0..NUM_FOO_REQUESTS {
350            proxies.push(connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap());
351        }
352
353        // Accept the connections.
354        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
355
356        // Active FIDL connections block idle, no matter the wait.
357        TestExecutor::advance_to(initial + (IDLE_DURATION * 2)).await;
358        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
359
360        // Make some requests.
361        for proxy in proxies.iter() {
362            select! {
363                result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
364                _ = fs => unreachable!(),
365            };
366        }
367
368        // Dropping FIDL connections free the ServiceFs to complete.
369        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
370        drop(proxies);
371        fs.await;
372
373        // Requests were handled.
374        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
375        assert!(mock_server.stalled.load(Ordering::SeqCst));
376        mock_server.assert_fs_gave_back_server_end(client_end);
377    }
378
379    #[fuchsia::test(allow_stalls = false)]
380    async fn no_request() {
381        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
382        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
383        let (initial, mock_server, fs) = setup_test(server_end).await;
384        pin_mut!(fs);
385
386        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
387        TestExecutor::advance_to(initial + IDLE_DURATION).await;
388        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
389
390        assert_eq!(mock_server.call_count.get(), 0);
391        assert!(mock_server.stalled.load(Ordering::SeqCst));
392        mock_server.assert_fs_gave_back_server_end(client_end);
393    }
394
395    #[fuchsia::test(allow_stalls = false)]
396    async fn outgoing_dir_client_closed() {
397        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
398        let (_initial, mock_server, fs) = setup_test(server_end).await;
399        pin_mut!(fs);
400
401        drop(client_end);
402        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
403
404        assert_eq!(mock_server.call_count.get(), 0);
405        assert!(!mock_server.stalled.load(Ordering::SeqCst));
406        assert!(mock_server.server_end.lock().is_none());
407    }
408
409    #[fuchsia::test(allow_stalls = false)]
410    async fn request_then_stalled() {
411        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
412
413        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
414        let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
415
416        let foo = proxy.foo().fuse();
417        pin_mut!(foo);
418        assert!(TestExecutor::poll_until_stalled(&mut foo).await.is_pending());
419
420        let (initial, mock_server, fs) = setup_test(server_end).await;
421        pin_mut!(fs);
422
423        // Poll the fs to process the FIDL.
424        assert_eq!(mock_server.call_count.get(), 0);
425        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
426        assert_eq!(mock_server.call_count.get(), 1);
427        assert_matches!(foo.await, Ok(_));
428
429        drop(proxy);
430        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
431        TestExecutor::advance_to(initial + IDLE_DURATION).await;
432        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
433
434        assert_eq!(mock_server.call_count.get(), 1);
435        assert!(mock_server.stalled.load(Ordering::SeqCst));
436        mock_server.assert_fs_gave_back_server_end(client_end);
437    }
438
439    #[fuchsia::test(allow_stalls = false)]
440    async fn stalled_then_request() {
441        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
442        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
443        let (initial, mock_server, fs) = setup_test(server_end).await;
444        pin_mut!(fs);
445
446        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
447        TestExecutor::advance_to(initial + (IDLE_DURATION / 2)).await;
448        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
449
450        let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
451        select! {
452            result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
453            _ = fs => unreachable!(),
454        };
455        assert_eq!(mock_server.call_count.get(), 1);
456
457        drop(proxy);
458        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
459        TestExecutor::advance_to(initial + (IDLE_DURATION / 2) + IDLE_DURATION).await;
460        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
461
462        assert!(mock_server.stalled.load(Ordering::SeqCst));
463        mock_server.assert_fs_gave_back_server_end(client_end);
464    }
465
466    /// If periodic FIDL connections are made at an interval below the idle
467    /// duration, the service fs should not stall.
468    ///
469    /// If periodic FIDL connections are made at an interval above the idle
470    /// duration, the service fs should stall.
471    #[fuchsia::test(allow_stalls = false)]
472    async fn periodic_requests() {
473        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
474        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
475        let (mut current_time, mock_server, fs) = setup_test(server_end).await;
476        let fs = fasync::Task::local(fs);
477
478        // Interval below the idle duration.
479        const NUM_FOO_REQUESTS: usize = 10;
480        for _ in 0..NUM_FOO_REQUESTS {
481            let request_interval = IDLE_DURATION / 2;
482            current_time += request_interval;
483            TestExecutor::advance_to(current_time).await;
484            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
485            assert_matches!(proxy.foo().await, Ok(_));
486        }
487        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
488
489        // Interval above the idle duration.
490        for _ in 0..NUM_FOO_REQUESTS {
491            let request_interval = IDLE_DURATION * 2;
492            current_time += request_interval;
493            TestExecutor::advance_to(current_time).await;
494            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
495            let foo = proxy.foo();
496            pin_mut!(foo);
497            assert_matches!(TestExecutor::poll_until_stalled(&mut foo).await, Poll::Pending);
498        }
499        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
500
501        fs.await;
502        mock_server.assert_fs_gave_back_server_end(client_end);
503    }
504
505    /// If there are other connections to the outgoing directory, then the fs will not return unless
506    /// those connections are closed by the client. That's because we currently don't have a way to
507    /// escrow those connections, so we don't want to disrupt them.
508    #[fuchsia::test(allow_stalls = false)]
509    async fn some_other_outgoing_dir_connection_blocks_stalling() {
510        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
511        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
512        let (initial, mock_server, fs) = setup_test(server_end).await;
513        pin_mut!(fs);
514
515        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
516
517        {
518            // We can open another connection that's not the main outgoing directory connection,
519            let svc = open_directory_async(&client_end, "svc", fio::R_STAR_DIR).unwrap();
520
521            TestExecutor::advance_to(initial + IDLE_DURATION).await;
522            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
523
524            assert_matches!(
525                fuchsia_fs::directory::readdir(&svc).await,
526                Ok(ref entries)
527                if entries.len() == 1 && entries[0].name == "fuchsia.component.client.test.ProtocolA"
528            );
529            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
530
531            // ... and the service fs won't stall even if we wait past the timeout.
532            TestExecutor::advance_to(initial + (IDLE_DURATION * 3)).await;
533            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
534        }
535
536        // Closing that connection frees the fs to stall.
537        fs.await;
538        assert!(mock_server.stalled.load(Ordering::SeqCst));
539        mock_server.assert_fs_gave_back_server_end(client_end);
540    }
541
542    /// Emulates a component that receives a bunch of requests, processes them, and then stalls.
543    /// After that, if the outgoing directory is readable, serve it again. No request should be
544    /// dropped, and the fs should stall a bunch of times.
545    #[fuchsia::test(allow_stalls = false)]
546    async fn end_to_end() {
547        let initial = fasync::MonotonicInstant::from_nanos(0);
548        TestExecutor::advance_to(initial).await;
549
550        let mock_server = MockServer::new();
551        let mock_server_clone = mock_server.clone();
552
553        const MIN_REQUEST_INTERVAL: i64 = 10_000_000;
554        let idle_duration = MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * 5);
555        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
556
557        let component_task = async move {
558            let mut server_end = Some(server_end);
559            let mut loop_count = 0;
560            loop {
561                let mut fs = ServiceFs::new();
562                fs.serve_connection(server_end.unwrap())
563                    .unwrap()
564                    .dir("svc")
565                    .add_fidl_service(Requests::ServiceA);
566
567                let mock_server_clone = mock_server_clone.clone();
568                fs.until_stalled(idle_duration)
569                    .for_each_concurrent(None, move |item| mock_server_clone.handle(item))
570                    .await;
571
572                let stalled_server_end = mock_server.server_end.lock().take();
573                let Some(stalled_server_end) = stalled_server_end else {
574                    // Client closed.
575                    return loop_count;
576                };
577
578                fasync::OnSignals::new(
579                    &stalled_server_end,
580                    zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
581                )
582                .await
583                .unwrap();
584                server_end = Some(stalled_server_end.into());
585                loop_count += 1;
586            }
587        };
588        let component_task = fasync::Task::local(component_task);
589
590        // Make connection requests at increasing intervals, starting from below the idle duration,
591        // to above the idle duration.
592        let mut deadline = initial;
593        const NUM_REQUESTS: usize = 30;
594        for delay_factor in 0..NUM_REQUESTS {
595            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
596            proxy.foo().await.unwrap();
597            drop(proxy);
598            deadline += MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * (delay_factor as i64));
599            TestExecutor::advance_to(deadline).await;
600        }
601
602        drop(client_end);
603        let loop_count = component_task.await;
604        // Why 25: there are 30 requests. The first 5 intervals are below the idle duration.
605        assert_eq!(loop_count, 25);
606        assert_eq!(mock_server.call_count.get(), NUM_REQUESTS);
607        assert!(mock_server.stalled.load(Ordering::SeqCst));
608    }
609
610    #[fuchsia::test(allow_stalls = false)]
611    async fn canceled_stalled_fut() {
612        // Verify that `StallableServiceFs` correctly handles cancellation of the underlying oneshot
613        // channel. The oneshot channel sender is owned by the VFS directory connection task. When
614        // we shut down the execution scope, the task is aborted and the sender is dropped,
615        // resulting in an `Err(Canceled)` on the receiver end (`StalledFut`).
616        //
617        // Since the directory connection was aborted without stalling, the connection handle is
618        // lost. `StallableServiceFs` should successfully observe this cancellation and yield `None`
619        // to terminate the stream, rather than hanging indefinitely.
620        let (_client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
621        let mut fs: ServiceFs<crate::ServiceObj<'_, ()>> = ServiceFs::new();
622        fs.serve_connection(server_end).unwrap();
623        let fs = fs.until_stalled(MonotonicDuration::from_nanos(1_000_000));
624
625        fs.connector.scope.shutdown();
626        let mut fs = std::pin::pin!(fs);
627        assert!(fs.next().await.is_none());
628    }
629}