fuchsia_component_server/
until_stalled.rs1use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use detect_stall::StallableRequestStream;
12use fidl::endpoints::ServerEnd;
13use fidl_fuchsia_io as fio;
14use fuchsia_async as fasync;
15use futures::channel::oneshot::{self, Canceled};
16use futures::{FutureExt, Stream};
17use pin_project::pin_project;
18use vfs::ToObjectRequest;
19use vfs::directory::immutable::Simple;
20use vfs::directory::immutable::connection::ImmutableConnection;
21use vfs::execution_scope::{ActiveGuard, ExecutionScope};
22use zx::MonotonicDuration;
23
24use super::{ServiceFs, ServiceObjTrait};
25
26#[pin_project]
32pub struct StallableServiceFs<ServiceObjTy: ServiceObjTrait> {
33 #[pin]
34 fs: ServiceFs<ServiceObjTy>,
35 connector: OutgoingConnector,
36 #[pin]
37 state: State,
38 debounce_interval: zx::MonotonicDuration,
39 is_terminated: bool,
40}
41
42pub enum Item<Output> {
44 Request(Output, ActiveGuard),
49
50 Stalled(zx::Channel),
54}
55
56#[pin_project(project = StateProj)]
67enum State {
68 Running {
69 stalled: StalledFut,
70 },
71 Stalled {
74 #[pin]
75 channel: Option<fasync::OnSignals<'static, zx::Channel>>,
76 },
77}
78
79impl<ServiceObjTy: ServiceObjTrait> Stream for StallableServiceFs<ServiceObjTy> {
80 type Item = Item<ServiceObjTy::Output>;
81
82 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83 let mut this = self.project();
84 if *this.is_terminated {
85 return Poll::Ready(None);
86 }
87
88 let poll_fs = this.fs.poll_next(cx);
93 if let Poll::Ready(Some(request)) = poll_fs {
94 return match this.connector.scope.try_active_guard() {
96 Some(guard) => Poll::Ready(Some(Item::Request(request, guard))),
97 None => Poll::Ready(None),
98 };
99 }
100
101 loop {
104 match this.state.as_mut().project() {
105 StateProj::Running { stalled } => {
106 let channel = std::task::ready!(stalled.poll_unpin(cx));
107 let channel = channel
108 .map(|c| fasync::OnSignals::new(c.into(), zx::Signals::CHANNEL_READABLE));
109 this.state.set(State::Stalled { channel });
111 }
112 StateProj::Stalled { channel } => {
113 if let Poll::Ready(None) = poll_fs {
114 *this.is_terminated = true;
116
117 this.connector.scope.shutdown();
122
123 return Poll::Ready(
124 channel
125 .as_pin_mut()
126 .take()
127 .map(|wait| Item::Stalled(wait.take_handle().into())),
128 );
129 }
130 if channel.is_none() {
131 return Poll::Pending;
135 }
136 let mut on_signals = channel.as_pin_mut().unwrap();
138 let _ = std::task::ready!(on_signals.as_mut().poll(cx));
139 let handle = on_signals.take_handle().into();
141 let stalled = this.connector.serve(handle, *this.debounce_interval);
142 this.state.set(State::Running { stalled });
144 }
145 }
146 }
147 }
148}
149
150struct OutgoingConnector {
151 flags: fio::Flags,
152 scope: ExecutionScope,
153 dir: Arc<Simple>,
154}
155
156impl OutgoingConnector {
157 fn serve(
163 &mut self,
164 server_end: ServerEnd<fio::DirectoryMarker>,
165 debounce_interval: MonotonicDuration,
166 ) -> StalledFut {
167 let (unbound_sender, unbound_receiver) = oneshot::channel();
168 let object_request = self.flags.to_object_request(server_end);
169 let scope = self.scope.clone();
170 let dir = self.dir.clone();
171 let flags = self.flags;
172 scope.clone().spawn(object_request.handle_async(async move |object_request| {
173 ImmutableConnection::create_transform_stream(
174 scope,
175 dir,
176 flags,
177 object_request,
178 move |stream| {
179 StallableRequestStream::new(
180 stream,
181 debounce_interval,
182 move |maybe_channel: Option<zx::Channel>| {
185 _ = unbound_sender.send(maybe_channel);
186 },
187 )
188 },
189 )
190 .await
191 }));
192 StalledFut(unbound_receiver)
193 }
194}
195
196struct StalledFut(oneshot::Receiver<Option<zx::Channel>>);
198
199impl Future for StalledFut {
200 type Output = Option<zx::Channel>;
201
202 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
203 match self.0.poll_unpin(cx) {
204 Poll::Ready(Ok(maybe_channel)) => Poll::Ready(maybe_channel),
205 Poll::Ready(Err(Canceled)) => Poll::Ready(None),
206 Poll::Pending => Poll::Pending,
207 }
208 }
209}
210
211impl<ServiceObjTy: ServiceObjTrait> StallableServiceFs<ServiceObjTy> {
212 pub(crate) fn new(
213 mut fs: ServiceFs<ServiceObjTy>,
214 debounce_interval: zx::MonotonicDuration,
215 ) -> Self {
216 let channel_queue =
217 fs.channel_queue.as_mut().expect("Must not poll the original ServiceFs");
218 assert!(
219 channel_queue.len() == 1,
220 "Must have exactly one connection to serve, \
221 e.g. did you call ServiceFs::take_and_serve_directory_handle?"
222 );
223 let server_end = std::mem::replace(channel_queue, vec![]).into_iter().next().unwrap();
224 let flags = ServiceFs::<ServiceObjTy>::base_connection_flags();
225 let scope = fs.scope.clone();
226 let dir = fs.dir.clone();
227 let mut connector = OutgoingConnector { flags, scope, dir };
228 let stalled = connector.serve(server_end, debounce_interval);
229 Self {
230 fs,
231 connector,
232 state: State::Running { stalled },
233 debounce_interval,
234 is_terminated: false,
235 }
236 }
237
238 pub fn try_active_guard(&self) -> Option<ActiveGuard> {
242 self.connector.scope.try_active_guard()
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use assert_matches::assert_matches;
250 use fasync::TestExecutor;
251 use fidl::endpoints::ClientEnd;
252 use fidl_fuchsia_component_client_test::{
253 ProtocolAMarker, ProtocolARequest, ProtocolARequestStream,
254 };
255 use fuchsia_component_client::connect_to_protocol_at_dir_svc;
256 use fuchsia_component_directory::open_directory_async;
257 use fuchsia_sync::Mutex;
258 use futures::future::{BoxFuture, FusedFuture};
259 use futures::{StreamExt, TryStreamExt, pin_mut, select};
260 use std::sync::atomic::{AtomicBool, Ordering};
261 use test_util::Counter;
262 use zx::AsHandleRef;
263
264 enum Requests {
265 ServiceA(ProtocolARequestStream),
266 }
267
268 #[derive(Clone)]
269 struct MockServer {
270 call_count: Arc<Counter>,
271 stalled: Arc<AtomicBool>,
272 server_end: Arc<Mutex<Option<zx::Channel>>>,
273 }
274
275 impl MockServer {
276 fn new() -> Self {
277 let call_count = Arc::new(Counter::new(0));
278 let stalled = Arc::new(AtomicBool::new(false));
279 let server_end = Arc::new(Mutex::new(None));
280 Self { call_count, stalled, server_end }
281 }
282
283 fn handle(&self, item: Item<Requests>) -> BoxFuture<'static, ()> {
284 let stalled = self.stalled.clone();
285 let call_count = self.call_count.clone();
286 let server_end = self.server_end.clone();
287 async move {
288 match item {
289 Item::Request(requests, active_guard) => {
290 let _active_guard = active_guard;
291 let Requests::ServiceA(mut request_stream) = requests;
292 while let Ok(Some(request)) = request_stream.try_next().await {
293 match request {
294 ProtocolARequest::Foo { responder } => {
295 call_count.inc();
296 let _ = responder.send();
297 }
298 }
299 }
300 }
301 Item::Stalled(channel) => {
302 *server_end.lock() = Some(channel);
303 stalled.store(true, Ordering::SeqCst);
304 }
305 }
306 }
307 .boxed()
308 }
309
310 #[track_caller]
311 fn assert_fs_gave_back_server_end(self, client_end: ClientEnd<fio::DirectoryMarker>) {
312 let reclaimed_server_end: zx::Channel = self.server_end.lock().take().unwrap();
313 assert_eq!(
314 client_end.as_handle_ref().koid().unwrap(),
315 reclaimed_server_end.basic_info().unwrap().related_koid
316 )
317 }
318 }
319
320 async fn setup_test(
322 server_end: ServerEnd<fio::DirectoryMarker>,
323 ) -> (fasync::MonotonicInstant, MockServer, impl FusedFuture<Output = ()>) {
324 let initial = fasync::MonotonicInstant::from_nanos(0);
325 TestExecutor::advance_to(initial).await;
326 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
327
328 let mut fs = ServiceFs::new();
329 fs.serve_connection(server_end).unwrap().dir("svc").add_fidl_service(Requests::ServiceA);
330
331 let mock_server = MockServer::new();
332 let mock_server_clone = mock_server.clone();
333 let fs = fs
334 .until_stalled(IDLE_DURATION)
335 .for_each_concurrent(None, move |item| mock_server_clone.handle(item));
336
337 (initial, mock_server, fs)
338 }
339
340 #[fuchsia::test(allow_stalls = false)]
341 async fn drain_request() {
342 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
343 const NUM_FOO_REQUESTS: usize = 10;
344 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
345 let (initial, mock_server, fs) = setup_test(server_end).await;
346 pin_mut!(fs);
347
348 let mut proxies = Vec::new();
349 for _ in 0..NUM_FOO_REQUESTS {
350 proxies.push(connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap());
351 }
352
353 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
355
356 TestExecutor::advance_to(initial + (IDLE_DURATION * 2)).await;
358 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
359
360 for proxy in proxies.iter() {
362 select! {
363 result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
364 _ = fs => unreachable!(),
365 };
366 }
367
368 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
370 drop(proxies);
371 fs.await;
372
373 assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
375 assert!(mock_server.stalled.load(Ordering::SeqCst));
376 mock_server.assert_fs_gave_back_server_end(client_end);
377 }
378
379 #[fuchsia::test(allow_stalls = false)]
380 async fn no_request() {
381 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
382 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
383 let (initial, mock_server, fs) = setup_test(server_end).await;
384 pin_mut!(fs);
385
386 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
387 TestExecutor::advance_to(initial + IDLE_DURATION).await;
388 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
389
390 assert_eq!(mock_server.call_count.get(), 0);
391 assert!(mock_server.stalled.load(Ordering::SeqCst));
392 mock_server.assert_fs_gave_back_server_end(client_end);
393 }
394
395 #[fuchsia::test(allow_stalls = false)]
396 async fn outgoing_dir_client_closed() {
397 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
398 let (_initial, mock_server, fs) = setup_test(server_end).await;
399 pin_mut!(fs);
400
401 drop(client_end);
402 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
403
404 assert_eq!(mock_server.call_count.get(), 0);
405 assert!(!mock_server.stalled.load(Ordering::SeqCst));
406 assert!(mock_server.server_end.lock().is_none());
407 }
408
409 #[fuchsia::test(allow_stalls = false)]
410 async fn request_then_stalled() {
411 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
412
413 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
414 let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
415
416 let foo = proxy.foo().fuse();
417 pin_mut!(foo);
418 assert!(TestExecutor::poll_until_stalled(&mut foo).await.is_pending());
419
420 let (initial, mock_server, fs) = setup_test(server_end).await;
421 pin_mut!(fs);
422
423 assert_eq!(mock_server.call_count.get(), 0);
425 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
426 assert_eq!(mock_server.call_count.get(), 1);
427 assert_matches!(foo.await, Ok(_));
428
429 drop(proxy);
430 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
431 TestExecutor::advance_to(initial + IDLE_DURATION).await;
432 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
433
434 assert_eq!(mock_server.call_count.get(), 1);
435 assert!(mock_server.stalled.load(Ordering::SeqCst));
436 mock_server.assert_fs_gave_back_server_end(client_end);
437 }
438
439 #[fuchsia::test(allow_stalls = false)]
440 async fn stalled_then_request() {
441 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
442 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
443 let (initial, mock_server, fs) = setup_test(server_end).await;
444 pin_mut!(fs);
445
446 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
447 TestExecutor::advance_to(initial + (IDLE_DURATION / 2)).await;
448 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
449
450 let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
451 select! {
452 result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
453 _ = fs => unreachable!(),
454 };
455 assert_eq!(mock_server.call_count.get(), 1);
456
457 drop(proxy);
458 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
459 TestExecutor::advance_to(initial + (IDLE_DURATION / 2) + IDLE_DURATION).await;
460 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
461
462 assert!(mock_server.stalled.load(Ordering::SeqCst));
463 mock_server.assert_fs_gave_back_server_end(client_end);
464 }
465
466 #[fuchsia::test(allow_stalls = false)]
472 async fn periodic_requests() {
473 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
474 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
475 let (mut current_time, mock_server, fs) = setup_test(server_end).await;
476 let fs = fasync::Task::local(fs);
477
478 const NUM_FOO_REQUESTS: usize = 10;
480 for _ in 0..NUM_FOO_REQUESTS {
481 let request_interval = IDLE_DURATION / 2;
482 current_time += request_interval;
483 TestExecutor::advance_to(current_time).await;
484 let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
485 assert_matches!(proxy.foo().await, Ok(_));
486 }
487 assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
488
489 for _ in 0..NUM_FOO_REQUESTS {
491 let request_interval = IDLE_DURATION * 2;
492 current_time += request_interval;
493 TestExecutor::advance_to(current_time).await;
494 let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
495 let foo = proxy.foo();
496 pin_mut!(foo);
497 assert_matches!(TestExecutor::poll_until_stalled(&mut foo).await, Poll::Pending);
498 }
499 assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
500
501 fs.await;
502 mock_server.assert_fs_gave_back_server_end(client_end);
503 }
504
505 #[fuchsia::test(allow_stalls = false)]
509 async fn some_other_outgoing_dir_connection_blocks_stalling() {
510 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
511 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
512 let (initial, mock_server, fs) = setup_test(server_end).await;
513 pin_mut!(fs);
514
515 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
516
517 {
518 let svc = open_directory_async(&client_end, "svc", fio::R_STAR_DIR).unwrap();
520
521 TestExecutor::advance_to(initial + IDLE_DURATION).await;
522 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
523
524 assert_matches!(
525 fuchsia_fs::directory::readdir(&svc).await,
526 Ok(ref entries)
527 if entries.len() == 1 && entries[0].name == "fuchsia.component.client.test.ProtocolA"
528 );
529 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
530
531 TestExecutor::advance_to(initial + (IDLE_DURATION * 3)).await;
533 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
534 }
535
536 fs.await;
538 assert!(mock_server.stalled.load(Ordering::SeqCst));
539 mock_server.assert_fs_gave_back_server_end(client_end);
540 }
541
542 #[fuchsia::test(allow_stalls = false)]
546 async fn end_to_end() {
547 let initial = fasync::MonotonicInstant::from_nanos(0);
548 TestExecutor::advance_to(initial).await;
549
550 let mock_server = MockServer::new();
551 let mock_server_clone = mock_server.clone();
552
553 const MIN_REQUEST_INTERVAL: i64 = 10_000_000;
554 let idle_duration = MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * 5);
555 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
556
557 let component_task = async move {
558 let mut server_end = Some(server_end);
559 let mut loop_count = 0;
560 loop {
561 let mut fs = ServiceFs::new();
562 fs.serve_connection(server_end.unwrap())
563 .unwrap()
564 .dir("svc")
565 .add_fidl_service(Requests::ServiceA);
566
567 let mock_server_clone = mock_server_clone.clone();
568 fs.until_stalled(idle_duration)
569 .for_each_concurrent(None, move |item| mock_server_clone.handle(item))
570 .await;
571
572 let stalled_server_end = mock_server.server_end.lock().take();
573 let Some(stalled_server_end) = stalled_server_end else {
574 return loop_count;
576 };
577
578 fasync::OnSignals::new(
579 &stalled_server_end,
580 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
581 )
582 .await
583 .unwrap();
584 server_end = Some(stalled_server_end.into());
585 loop_count += 1;
586 }
587 };
588 let component_task = fasync::Task::local(component_task);
589
590 let mut deadline = initial;
593 const NUM_REQUESTS: usize = 30;
594 for delay_factor in 0..NUM_REQUESTS {
595 let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
596 proxy.foo().await.unwrap();
597 drop(proxy);
598 deadline += MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * (delay_factor as i64));
599 TestExecutor::advance_to(deadline).await;
600 }
601
602 drop(client_end);
603 let loop_count = component_task.await;
604 assert_eq!(loop_count, 25);
606 assert_eq!(mock_server.call_count.get(), NUM_REQUESTS);
607 assert!(mock_server.stalled.load(Ordering::SeqCst));
608 }
609
610 #[fuchsia::test(allow_stalls = false)]
611 async fn canceled_stalled_fut() {
612 let (_client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
621 let mut fs: ServiceFs<crate::ServiceObj<'_, ()>> = ServiceFs::new();
622 fs.serve_connection(server_end).unwrap();
623 let fs = fs.until_stalled(MonotonicDuration::from_nanos(1_000_000));
624
625 fs.connector.scope.shutdown();
626 let mut fs = std::pin::pin!(fs);
627 assert!(fs.next().await.is_none());
628 }
629}