fidl_fuchsia_net_routes_ext/
testutil.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Test Utilities for the fuchsia.net.routes FIDL library.
6//!
7//! This library defines a mix of internal and external test utilities,
8//! supporting tests of this `fidl_fuchsia_net_routes_ext` crate and tests
9//! of clients of the `fuchsia.net.routes` FIDL library, respectively.
10
11use crate::FidlRouteIpExt;
12
13use fidl_fuchsia_net_routes as fnet_routes;
14use futures::{Future, Stream, StreamExt as _};
15use net_types::ip::{GenericOverIp, Ip, Ipv4, Ipv6};
16
17/// Responds to the given `Watch` request with the given batch of events.
18pub fn handle_watch<I: FidlRouteIpExt>(
19    request: <<I::WatcherMarker as fidl::endpoints::ProtocolMarker>::RequestStream as Stream>::Item,
20    event_batch: Vec<I::WatchEvent>,
21) {
22    #[derive(GenericOverIp)]
23    #[generic_over_ip(I, Ip)]
24    struct HandleInputs<I: FidlRouteIpExt> {
25        request:
26            <<I::WatcherMarker as fidl::endpoints::ProtocolMarker>::RequestStream as Stream>::Item,
27        event_batch: Vec<I::WatchEvent>,
28    }
29    I::map_ip_in(
30        HandleInputs { request, event_batch },
31        |HandleInputs { request, event_batch }| match request
32            .expect("failed to receive `Watch` request")
33        {
34            fnet_routes::WatcherV4Request::Watch { responder } => {
35                responder.send(&event_batch).expect("failed to respond to `Watch`")
36            }
37        },
38        |HandleInputs { request, event_batch }| match request
39            .expect("failed to receive `Watch` request")
40        {
41            fnet_routes::WatcherV6Request::Watch { responder } => {
42                responder.send(&event_batch).expect("failed to respond to `Watch`")
43            }
44        },
45    );
46}
47
48/// A fake implementation of the `WatcherV4` and `WatcherV6` protocols.
49///
50/// Feeds events received in `events` as responses to `Watch()`.
51pub async fn fake_watcher_impl<I: FidlRouteIpExt>(
52    events: impl Stream<Item = Vec<I::WatchEvent>>,
53    server_end: fidl::endpoints::ServerEnd<I::WatcherMarker>,
54) {
55    let (request_stream, _control_handle) = server_end.into_stream_and_control_handle();
56    request_stream
57        .zip(events)
58        .for_each(|(request, event_batch)| {
59            handle_watch::<I>(request, event_batch);
60            futures::future::ready(())
61        })
62        .await
63}
64
65/// Serve a `GetWatcher` request to the `State` protocol by instantiating a
66/// watcher client backed by the given event stream. The returned future
67/// drives the watcher implementation.
68pub async fn serve_state_request<'a, I: FidlRouteIpExt>(
69    request: <<I::StateMarker as fidl::endpoints::ProtocolMarker>::RequestStream as Stream>::Item,
70    event_stream: impl Stream<Item = Vec<I::WatchEvent>> + 'a,
71) {
72    #[derive(GenericOverIp)]
73    #[generic_over_ip(I, Ip)]
74    struct GetWatcherInputs<'a, I: FidlRouteIpExt> {
75        request:
76            <<I::StateMarker as fidl::endpoints::ProtocolMarker>::RequestStream as Stream>::Item,
77        // Use `Box<dyn>` here because `event_stream` needs to have a know size.
78        event_stream: Box<dyn Stream<Item = Vec<I::WatchEvent>> + 'a>,
79    }
80    I::map_ip_in::<
81        _,
82        // Use `Box<dyn>` here because `event_stream` needs to have a know size.
83        // `Pin` ensures that `watcher_fut` implements `Future`.
84        std::pin::Pin<Box<dyn Future<Output = ()> + 'a>>,
85    >(
86        GetWatcherInputs { request, event_stream: Box::new(event_stream) },
87        |GetWatcherInputs { request, event_stream }| match request
88            .expect("failed to receive `GetWatcherV4` request")
89        {
90            fnet_routes::StateV4Request::GetWatcherV4 {
91                options: _,
92                watcher,
93                control_handle: _,
94            } => Box::pin(fake_watcher_impl::<Ipv4>(Box::into_pin(event_stream), watcher)),
95            fnet_routes::StateV4Request::GetRuleWatcherV4 {
96                options: _,
97                watcher: _,
98                control_handle: _,
99            } => todo!("TODO(https://fxbug.dev/336204757): Implement rules watcher"),
100        },
101        |GetWatcherInputs { request, event_stream }| match request
102            .expect("failed to receive `GetWatcherV6` request")
103        {
104            fnet_routes::StateV6Request::GetWatcherV6 {
105                options: _,
106                watcher,
107                control_handle: _,
108            } => Box::pin(fake_watcher_impl::<Ipv6>(Box::into_pin(event_stream), watcher)),
109            fnet_routes::StateV6Request::GetRuleWatcherV6 {
110                options: _,
111                watcher: _,
112                control_handle: _,
113            } => todo!("TODO(https://fxbug.dev/336204757): Implement rules watcher"),
114        },
115    )
116    .await
117}
118
119/// Provides a stream of watcher events such that the stack appears to contain
120/// no routes and never installs any.
121pub fn empty_watch_event_stream<'a, I: FidlRouteIpExt>(
122) -> impl Stream<Item = Vec<I::WatchEvent>> + 'a {
123    #[derive(GenericOverIp)]
124    #[generic_over_ip(I, Ip)]
125    struct Wrap<I: FidlRouteIpExt>(I::WatchEvent);
126
127    let Wrap(event) = I::map_ip(
128        (),
129        |()| Wrap(fnet_routes::EventV4::Idle(fnet_routes::Empty)),
130        |()| Wrap(fnet_routes::EventV6::Idle(fnet_routes::Empty)),
131    );
132    futures::stream::once(futures::future::ready(vec![event])).chain(futures::stream::pending())
133}
134
135/// Provides testutils for testing implementations of clients and servers of
136/// fuchsia.net.routes.admin.
137pub mod admin {
138    use fidl::endpoints::ProtocolMarker;
139    use fidl_fuchsia_net_routes_admin as fnet_routes_admin;
140    use futures::{Stream, StreamExt as _, TryStreamExt as _};
141    use net_types::ip::{GenericOverIp, Ip, Ipv4, Ipv6};
142
143    use crate::admin::{FidlRouteAdminIpExt, RouteSetRequest, RouteTableRequest};
144    use crate::Responder;
145
146    /// Provides a RouteTable implementation that provides one RouteSet and
147    /// then panics on subsequent invocations. Returns the request stream for
148    /// that RouteSet.
149    pub fn serve_one_route_set<I: FidlRouteAdminIpExt>(
150        server_end: fidl::endpoints::ServerEnd<I::RouteTableMarker>,
151    ) -> impl Stream<
152            Item = <
153                    <<I as FidlRouteAdminIpExt>::RouteSetMarker as ProtocolMarker>
154                        ::RequestStream as Stream
155                >::Item
156    >{
157        #[derive(GenericOverIp)]
158        #[generic_over_ip(I, Ip)]
159        struct In<I: FidlRouteAdminIpExt>(
160            <<<I as FidlRouteAdminIpExt>::RouteTableMarker as ProtocolMarker>
161                ::RequestStream as Stream
162            >::Item,
163        );
164        #[derive(GenericOverIp)]
165        #[generic_over_ip(I, Ip)]
166        struct Out<I: FidlRouteAdminIpExt>(fidl::endpoints::ServerEnd<I::RouteSetMarker>);
167
168        let stream = server_end.into_stream();
169        stream
170            .scan(false, |responded, item| {
171                let responded = std::mem::replace(responded, true);
172                if responded {
173                    panic!("received multiple RouteTable requests");
174                }
175
176                futures::future::ready(Some(item))
177            })
178            .map(|item| {
179                let Out(route_set_server_end) = I::map_ip(
180                    In(item),
181                    |In(item)| match item.expect("set provider FIDL error") {
182                        fnet_routes_admin::RouteTableV4Request::NewRouteSet {
183                            route_set,
184                            control_handle: _,
185                        } => Out(route_set),
186                        req => unreachable!("unexpected request: {:?}", req),
187                    },
188                    |In(item)| match item.expect("set provider FIDL error") {
189                        fnet_routes_admin::RouteTableV6Request::NewRouteSet {
190                            route_set,
191                            control_handle: _,
192                        } => Out(route_set),
193                        req => unreachable!("unexpected request: {:?}", req),
194                    },
195                );
196                route_set_server_end.into_stream()
197            })
198            .fuse()
199            .flatten_unordered(None)
200    }
201
202    /// TODO(https://fxbug.dev/337298251): Change this to return a RouteSet
203    /// index beside the RouteSet item to make it easier to determine which
204    /// RouteSet pertains to the transmitted requests.
205    ///
206    /// Provides a RouteTable implementation that consolidates all RouteSets into
207    /// a single RouteSet stream. Returns a request stream that vends items as
208    /// they arrive in any RouteSet.
209    pub fn serve_all_route_sets_with_table_id<I: FidlRouteAdminIpExt>(
210        server_end: fidl::endpoints::ServerEnd<I::RouteTableMarker>,
211        table_id: Option<crate::TableId>,
212    ) -> impl Stream<
213            Item = <
214                    <<I as FidlRouteAdminIpExt>::RouteSetMarker as ProtocolMarker>
215                        ::RequestStream as Stream
216                >::Item
217    >{
218        #[derive(GenericOverIp)]
219        #[generic_over_ip(I, Ip)]
220        struct In<I: FidlRouteAdminIpExt>(
221            <<<I as FidlRouteAdminIpExt>::RouteTableMarker as ProtocolMarker>
222                ::RequestStream as Stream
223            >::Item,
224        );
225        #[derive(GenericOverIp)]
226        #[generic_over_ip(I, Ip)]
227        struct Out<I: FidlRouteAdminIpExt>(Option<fidl::endpoints::ServerEnd<I::RouteSetMarker>>);
228
229        let stream = server_end.into_stream();
230        stream
231            .map(move |item| {
232                let Out(route_set_server_end) = I::map_ip(
233                    In(item),
234                    |In(item)| match item.expect("set provider FIDL error") {
235                        fnet_routes_admin::RouteTableV4Request::NewRouteSet {
236                            route_set,
237                            control_handle: _,
238                        } => Out(Some(route_set)),
239                        fnet_routes_admin::RouteTableV4Request::GetTableId { responder } => {
240                            responder
241                                .send(
242                                    table_id
243                                        .unwrap_or_else(|| panic!("GetTableId not supported"))
244                                        .get(),
245                                )
246                                .expect("error responding to GetTableId");
247                            Out(None)
248                        }
249                        req => unreachable!("unexpected request: {:?}", req),
250                    },
251                    |In(item)| match item.expect("set provider FIDL error") {
252                        fnet_routes_admin::RouteTableV6Request::NewRouteSet {
253                            route_set,
254                            control_handle: _,
255                        } => Out(Some(route_set)),
256                        fnet_routes_admin::RouteTableV6Request::GetTableId { responder } => {
257                            responder
258                                .send(
259                                    table_id
260                                        .unwrap_or_else(|| panic!("GetTableId not supported"))
261                                        .get(),
262                                )
263                                .expect("error responding to GetTableId");
264                            Out(None)
265                        }
266                        req => unreachable!("unexpected request: {:?}", req),
267                    },
268                );
269                match route_set_server_end {
270                    None => futures::stream::empty().left_stream(),
271                    Some(route_set_server_end) => route_set_server_end.into_stream().right_stream(),
272                }
273            })
274            .fuse()
275            .flatten_unordered(None)
276    }
277
278    /// Provides a RouteTable implementation that serves no-op RouteSets and identifies itself
279    /// with the given ID.
280    pub async fn serve_noop_route_sets_with_table_id<I: FidlRouteAdminIpExt>(
281        server_end: fidl::endpoints::ServerEnd<I::RouteTableMarker>,
282        table_id: crate::TableId,
283    ) {
284        let stream = server_end.into_stream();
285        stream
286            .try_for_each_concurrent(None, |item| async move {
287                let request = I::into_route_table_request(item);
288                match request {
289                    RouteTableRequest::NewRouteSet { route_set, control_handle: _ } => {
290                        serve_noop_route_set::<I>(route_set).await
291                    }
292                    RouteTableRequest::GetTableId { responder } => {
293                        responder.send(table_id.get()).expect("responding should succeed");
294                    }
295                    request => panic!("unexpected request: {request:?}"),
296                }
297                Ok(())
298            })
299            .await
300            .expect("serving no-op route sets should succeed");
301    }
302
303    /// Provides a RouteTable implementation that serves no-op RouteSets.
304    pub async fn serve_noop_route_sets<I: FidlRouteAdminIpExt>(
305        server_end: fidl::endpoints::ServerEnd<I::RouteTableMarker>,
306    ) {
307        serve_noop_route_sets_with_table_id::<I>(server_end, crate::TableId::new(0)).await
308    }
309
310    /// Serves a RouteSet that returns OK for everything and does nothing.
311    async fn serve_noop_route_set<I: FidlRouteAdminIpExt>(
312        server_end: fidl::endpoints::ServerEnd<I::RouteSetMarker>,
313    ) {
314        #[derive(GenericOverIp)]
315        #[generic_over_ip(I, Ip)]
316        struct Wrap<I: FidlRouteAdminIpExt>(
317            <<<I as FidlRouteAdminIpExt>::RouteSetMarker as ProtocolMarker>
318                ::RequestStream as Stream
319            >::Item,
320        );
321
322        let stream = server_end.into_stream();
323        stream
324            .for_each(|item| async move {
325                let request: RouteSetRequest<I> = I::map_ip(
326                    Wrap(item),
327                    |Wrap(item)| RouteSetRequest::<Ipv4>::from(item.expect("route set FIDL error")),
328                    |Wrap(item)| RouteSetRequest::<Ipv6>::from(item.expect("route set FIDL error")),
329                );
330                match request {
331                    RouteSetRequest::AddRoute { route, responder } => {
332                        let _: crate::Route<I> = route.expect("AddRoute called with invalid route");
333                        responder.send(Ok(true)).expect("respond to AddRoute");
334                    }
335                    RouteSetRequest::RemoveRoute { route, responder } => {
336                        let _: crate::Route<I> =
337                            route.expect("RemoveRoute called with invalid route");
338                        responder.send(Ok(true)).expect("respond to RemoveRoute");
339                    }
340                    RouteSetRequest::AuthenticateForInterface { credential: _, responder } => {
341                        responder.send(Ok(())).expect("respond to AuthenticateForInterface");
342                    }
343                }
344            })
345            .await;
346    }
347}
348
349/// Provides testutils for testing implementations of clients and servers for routes rules FIDL.
350pub mod rules {
351    use fidl_fuchsia_net_routes as fnet_routes;
352    use futures::{Stream, StreamExt as _, TryStreamExt as _};
353    use net_types::ip::{GenericOverIp, Ip};
354
355    use crate::rules::{FidlRuleAdminIpExt, FidlRuleIpExt, RuleSetRequest, RuleTableRequest};
356    use crate::Responder;
357
358    // Responds to the given `Watch` request with the given batch of events.
359    fn handle_watch<I: FidlRuleIpExt>(
360        request: <<I::RuleWatcherMarker as fidl::endpoints::ProtocolMarker>::RequestStream as Stream>::Item,
361        event_batch: Vec<I::RuleEvent>,
362    ) {
363        #[derive(GenericOverIp)]
364        #[generic_over_ip(I, Ip)]
365        struct HandleInputs<I: FidlRuleIpExt> {
366            request:
367                <<I::RuleWatcherMarker as fidl::endpoints::ProtocolMarker>::RequestStream as Stream>::Item,
368            event_batch: Vec<I::RuleEvent>,
369        }
370        I::map_ip_in(
371            HandleInputs { request, event_batch },
372            |HandleInputs { request, event_batch }| match request
373                .expect("failed to receive `Watch` request")
374            {
375                fnet_routes::RuleWatcherV4Request::Watch { responder } => {
376                    responder.send(&event_batch).expect("failed to respond to `Watch`")
377                }
378            },
379            |HandleInputs { request, event_batch }| match request
380                .expect("failed to receive `Watch` request")
381            {
382                fnet_routes::RuleWatcherV6Request::Watch { responder } => {
383                    responder.send(&event_batch).expect("failed to respond to `Watch`")
384                }
385            },
386        );
387    }
388
389    /// A fake implementation of the `WatcherV4` and `WatcherV6` protocols.
390    ///
391    /// Feeds events received in `events` as responses to `Watch()`.
392    pub async fn fake_rules_watcher_impl<I: FidlRuleIpExt>(
393        events: impl Stream<Item = Vec<I::RuleEvent>>,
394        server_end: fidl::endpoints::ServerEnd<I::RuleWatcherMarker>,
395    ) {
396        let (request_stream, _control_handle) = server_end.into_stream_and_control_handle();
397        request_stream
398            .zip(events)
399            .for_each(|(request, event_batch)| {
400                handle_watch::<I>(request, event_batch);
401                futures::future::ready(())
402            })
403            .await
404    }
405
406    /// Provides a stream of watcher events such that the stack appears to contain
407    /// no routes and never installs any.
408    pub fn empty_watch_event_stream<'a, I: FidlRuleIpExt>(
409    ) -> impl Stream<Item = Vec<I::RuleEvent>> + 'a {
410        #[derive(GenericOverIp)]
411        #[generic_over_ip(I, Ip)]
412        struct Wrap<I: FidlRuleIpExt>(I::RuleEvent);
413
414        let Wrap(event) = I::map_ip(
415            (),
416            |()| Wrap(fnet_routes::RuleEventV4::Idle(fnet_routes::Empty)),
417            |()| Wrap(fnet_routes::RuleEventV6::Idle(fnet_routes::Empty)),
418        );
419        futures::stream::once(futures::future::ready(vec![event])).chain(futures::stream::pending())
420    }
421
422    /// Provides a RuleTable implementation that serves no-op RuleSets.
423    pub async fn serve_noop_rule_sets<I: FidlRuleAdminIpExt>(
424        server_end: fidl::endpoints::ServerEnd<I::RuleTableMarker>,
425    ) {
426        let stream = server_end.into_stream();
427        stream
428            .and_then(|item| async move {
429                let RuleTableRequest::NewRuleSet { priority: _, rule_set, control_handle: _ } =
430                    I::into_rule_table_request(item);
431                serve_noop_rule_set::<I>(rule_set).await;
432                Ok(())
433            })
434            .for_each_concurrent(None, |item| {
435                item.expect("should not get error");
436                futures::future::ready(())
437            })
438            .await;
439    }
440
441    /// Serves a RuleSet that returns OK for everything and does nothing.
442    pub async fn serve_noop_rule_set<I: FidlRuleAdminIpExt>(
443        server_end: fidl::endpoints::ServerEnd<I::RuleSetMarker>,
444    ) {
445        let stream = server_end.into_stream();
446        stream
447            .try_for_each(|item| async move {
448                let request = I::into_rule_set_request(item);
449                match request {
450                    RuleSetRequest::AddRule { index: _, matcher, action: _, responder } => {
451                        let _: crate::rules::RuleMatcher<_> =
452                            matcher.expect("AddRule called with invalid matcher");
453                        responder.send(Ok(())).expect("respond to AddRule");
454                    }
455                    RuleSetRequest::RemoveRule { index: _, responder } => {
456                        responder.send(Ok(())).expect("respond to RemoveRule");
457                    }
458                    RuleSetRequest::AuthenticateForRouteTable { table: _, token: _, responder } => {
459                        responder.send(Ok(())).expect("respond to AuthenticateForRouteTable")
460                    }
461                    RuleSetRequest::Close { control_handle: _ } => {}
462                };
463                Ok(())
464            })
465            .await
466            .expect("should not get error");
467    }
468}
469
470#[cfg(test)]
471pub(crate) mod internal {
472    use super::*;
473    use net_declare::{fidl_ip_v4_with_prefix, fidl_ip_v6_with_prefix};
474
475    // Generates an arbitrary `I::WatchEvent` that is unique for the given `seed`.
476    pub(crate) fn generate_event<I: FidlRouteIpExt>(seed: u32) -> I::WatchEvent {
477        #[derive(GenericOverIp)]
478        #[generic_over_ip(I, Ip)]
479        struct BuildEventOutput<I: FidlRouteIpExt>(I::WatchEvent);
480        let BuildEventOutput(event) = I::map_ip_out(
481            seed,
482            |seed| {
483                BuildEventOutput(fnet_routes::EventV4::Added(fnet_routes::InstalledRouteV4 {
484                    route: Some(fnet_routes::RouteV4 {
485                        destination: fidl_ip_v4_with_prefix!("192.168.0.0/24"),
486                        action: fnet_routes::RouteActionV4::Forward(fnet_routes::RouteTargetV4 {
487                            outbound_interface: 1,
488                            next_hop: None,
489                        }),
490                        properties: fnet_routes::RoutePropertiesV4 {
491                            specified_properties: Some(fnet_routes::SpecifiedRouteProperties {
492                                metric: Some(fnet_routes::SpecifiedMetric::ExplicitMetric(seed)),
493                                ..Default::default()
494                            }),
495                            ..Default::default()
496                        },
497                    }),
498                    effective_properties: Some(fnet_routes::EffectiveRouteProperties {
499                        metric: Some(seed),
500                        ..Default::default()
501                    }),
502                    table_id: Some(0),
503                    ..Default::default()
504                }))
505            },
506            |seed| {
507                BuildEventOutput(fnet_routes::EventV6::Added(fnet_routes::InstalledRouteV6 {
508                    route: Some(fnet_routes::RouteV6 {
509                        destination: fidl_ip_v6_with_prefix!("fe80::0/64"),
510                        action: fnet_routes::RouteActionV6::Forward(fnet_routes::RouteTargetV6 {
511                            outbound_interface: 1,
512                            next_hop: None,
513                        }),
514                        properties: fnet_routes::RoutePropertiesV6 {
515                            specified_properties: Some(fnet_routes::SpecifiedRouteProperties {
516                                metric: Some(fnet_routes::SpecifiedMetric::ExplicitMetric(seed)),
517                                ..Default::default()
518                            }),
519                            ..Default::default()
520                        },
521                    }),
522                    effective_properties: Some(fnet_routes::EffectiveRouteProperties {
523                        metric: Some(seed),
524                        ..Default::default()
525                    }),
526                    table_id: Some(0),
527                    ..Default::default()
528                }))
529            },
530        );
531        event
532    }
533
534    // Same as `generate_event()` except that it operates over a range of `seeds`,
535    // producing `n` `I::WatchEvents` where `n` is the size of the range.
536    pub(crate) fn generate_events_in_range<I: FidlRouteIpExt>(
537        seeds: std::ops::Range<u32>,
538    ) -> Vec<I::WatchEvent> {
539        seeds.into_iter().map(|seed| generate_event::<I>(seed)).collect()
540    }
541}
542
543#[cfg(test)]
544mod tests {
545    use super::*;
546    use crate::admin::FidlRouteAdminIpExt;
547    use crate::testutil::internal as internal_testutil;
548    use crate::{get_watcher, watch};
549    use assert_matches::assert_matches;
550    use futures::FutureExt;
551    use ip_test_macro::ip_test;
552    use test_case::test_case;
553    use zx_status;
554
555    // Tests the `fake_watcher_impl` with various "shapes". The test parameter
556    // is a vec of ranges, where each range corresponds to the batch of events
557    // that will be sent in response to a single call to `Watch().
558    #[ip_test(I)]
559    #[test_case(Vec::new(); "no events")]
560    #[test_case(vec![0..1]; "single_batch_single_event")]
561    #[test_case(vec![0..10]; "single_batch_many_events")]
562    #[test_case(vec![0..10, 10..20, 20..30]; "many_batches_many_events")]
563    #[fuchsia_async::run_singlethreaded(test)]
564    async fn fake_watcher_impl_against_shape<I: FidlRouteIpExt>(
565        test_shape: Vec<std::ops::Range<u32>>,
566    ) {
567        // Build the event stream based on the `test_shape`. Use a channel
568        // so that the stream stays open until `close_channel` is called later.
569        let (event_stream_sender, event_stream_receiver) =
570            futures::channel::mpsc::unbounded::<Vec<I::WatchEvent>>();
571        for batch_shape in &test_shape {
572            event_stream_sender
573                .unbounded_send(internal_testutil::generate_events_in_range::<I>(
574                    batch_shape.clone(),
575                ))
576                .expect("failed to send event batch");
577        }
578
579        // Instantiate the fake Watcher implementation.
580        let (state, state_server_end) = fidl::endpoints::create_proxy::<I::StateMarker>();
581        let (mut state_request_stream, _control_handle) =
582            state_server_end.into_stream_and_control_handle();
583        let watcher_fut = state_request_stream
584            .next()
585            .then(|req| {
586                serve_state_request::<I>(
587                    req.expect("State request_stream unexpectedly ended"),
588                    event_stream_receiver,
589                )
590            })
591            .fuse();
592        futures::pin_mut!(watcher_fut);
593
594        // Drive the watcher, asserting it observes the expected data.
595        let watcher = get_watcher::<I>(&state, Default::default()).expect("failed to get watcher");
596        for batch_shape in test_shape {
597            futures::select!(
598                 () = watcher_fut => panic!("fake watcher implementation unexpectedly finished"),
599                events = watch::<I>(&watcher).fuse() => assert_eq!(
600                    events.expect("failed to watch for events"),
601                    internal_testutil::generate_events_in_range::<I>(batch_shape.clone())));
602        }
603
604        // Close the event_stream_sender and observe the watcher_impl finish.
605        event_stream_sender.close_channel();
606        watcher_fut.await;
607
608        // Trying to watch again after we've exhausted the data should
609        // result in `PEER_CLOSED`.
610        assert_matches!(
611            watch::<I>(&watcher).await,
612            Err(fidl::Error::ClientChannelClosed { status: zx_status::Status::PEER_CLOSED, .. })
613        );
614    }
615
616    // `serve_one_route_set` should panic if the caller makes a call
617    // to `new_route_set` more than once.
618    #[ip_test(I)]
619    #[fuchsia_async::run_singlethreaded]
620    #[should_panic(expected = "received multiple RouteTable requests")]
621    async fn test_serve_one_route_set_panic<I: FidlRouteAdminIpExt>() {
622        let (routes_set_provider_proxy, routes_set_provider_server_end) =
623            fidl::endpoints::create_proxy::<I::RouteTableMarker>();
624        let mut provider = admin::serve_one_route_set::<I>(routes_set_provider_server_end);
625        let _rs1 = crate::admin::new_route_set::<I>(&routes_set_provider_proxy)
626            .expect("created first RouteSet");
627        let _rs2 = crate::admin::new_route_set::<I>(&routes_set_provider_proxy)
628            .expect("created second RouteSet");
629
630        // This should panic, as the route set provider pushes `RouteSet` server
631        // ends through by handling the `new_route_set` requests, which will
632        // cause a panic on the second iteration.
633        let _ = provider.next().await;
634    }
635}