Skip to main content

fidl_fuchsia_net_interfaces_ext/
admin.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Extensions for fuchsia.net.interfaces.admin.
6
7use flex_fuchsia_net_interfaces as fnet_interfaces;
8use flex_fuchsia_net_interfaces_admin as fnet_interfaces_admin;
9use flex_fuchsia_net_resources as fnet_resources;
10use futures::{Future, FutureExt as _, Stream, StreamExt as _, TryStreamExt as _};
11use thiserror::Error;
12use zx_status as zx;
13
14/// Error type when using a [`fnet_interfaces_admin::AddressStateProviderProxy`].
15#[derive(Error, Debug)]
16pub enum AddressStateProviderError {
17    /// Address removed error.
18    #[error("address removed: {0:?}")]
19    AddressRemoved(fnet_interfaces_admin::AddressRemovalReason),
20    /// FIDL error.
21    #[error("fidl error")]
22    Fidl(#[from] fidl::Error),
23    /// Channel closed.
24    #[error("AddressStateProvider channel closed")]
25    ChannelClosed,
26}
27
28impl From<TerminalError<fnet_interfaces_admin::AddressRemovalReason>>
29    for AddressStateProviderError
30{
31    fn from(e: TerminalError<fnet_interfaces_admin::AddressRemovalReason>) -> Self {
32        match e {
33            TerminalError::Fidl(e) => AddressStateProviderError::Fidl(e),
34            TerminalError::Terminal(r) => AddressStateProviderError::AddressRemoved(r),
35        }
36    }
37}
38
39/// Waits for the `OnAddressAdded` event to be received on the event stream.
40///
41/// Returns an error if an address removed event is received instead.
42pub async fn wait_for_address_added_event(
43    event_stream: &mut fnet_interfaces_admin::AddressStateProviderEventStream,
44) -> Result<(), AddressStateProviderError> {
45    let event = event_stream
46        .next()
47        .await
48        .ok_or(AddressStateProviderError::ChannelClosed)?
49        .map_err(AddressStateProviderError::Fidl)?;
50    match event {
51        fnet_interfaces_admin::AddressStateProviderEvent::OnAddressAdded {} => Ok(()),
52        fnet_interfaces_admin::AddressStateProviderEvent::OnAddressRemoved { error } => {
53            Err(AddressStateProviderError::AddressRemoved(error))
54        }
55    }
56}
57
58// TODO(https://fxbug.dev/42162477): Introduce type with better concurrency safety
59// for hanging gets.
60/// Returns a stream of assignment states obtained by watching on `address_state_provider`.
61///
62/// Note that this function calls the hanging get FIDL method
63/// [`AddressStateProviderProxy::watch_address_assignment_state`] internally,
64/// which means that this stream should not be polled concurrently with any
65/// logic which calls the same hanging get. This also means that callers should
66/// be careful not to drop the returned stream when it has been polled but yet
67/// to yield an item, e.g. due to a timeout or if using select with another
68/// stream, as doing so causes a pending hanging get to get lost, and may cause
69/// future hanging get calls to fail or the channel to be closed.
70pub fn assignment_state_stream(
71    address_state_provider: fnet_interfaces_admin::AddressStateProviderProxy,
72) -> impl Stream<Item = Result<fnet_interfaces::AddressAssignmentState, AddressStateProviderError>>
73{
74    let event_fut = address_state_provider
75        .take_event_stream()
76        .filter_map(|event| {
77            futures::future::ready(match event {
78                Ok(event) => match event {
79                    fnet_interfaces_admin::AddressStateProviderEvent::OnAddressAdded {} => None,
80                    fnet_interfaces_admin::AddressStateProviderEvent::OnAddressRemoved {
81                        error,
82                    } => Some(AddressStateProviderError::AddressRemoved(error)),
83                },
84                Err(e) => Some(AddressStateProviderError::Fidl(e)),
85            })
86        })
87        .into_future()
88        .map(|(event, _stream)| event.unwrap_or(AddressStateProviderError::ChannelClosed));
89    futures::stream::try_unfold(
90        (address_state_provider, event_fut),
91        |(address_state_provider, event_fut)| {
92            // NB: Rely on the fact that select always polls the left future
93            // first to guarantee that if a terminal event was yielded by the
94            // right future, then we don't have an assignment state to emit to
95            // clients.
96            futures::future::select(
97                address_state_provider.watch_address_assignment_state(),
98                event_fut,
99            )
100            .then(|s| match s {
101                futures::future::Either::Left((state_result, event_fut)) => match state_result {
102                    Ok(state) => {
103                        futures::future::ok(Some((state, (address_state_provider, event_fut))))
104                            .left_future()
105                    }
106                    Err(e) if e.is_closed() => event_fut.map(Result::Err).right_future(),
107                    Err(e) => {
108                        futures::future::err(AddressStateProviderError::Fidl(e)).left_future()
109                    }
110                },
111                futures::future::Either::Right((error, _state_fut)) => {
112                    futures::future::err(error).left_future()
113                }
114            })
115        },
116    )
117}
118
119// TODO(https://fxbug.dev/42162477): Introduce type with better concurrency safety
120// for hanging gets.
121/// Wait until the Assigned state is observed on `stream`.
122///
123/// After this async function resolves successfully, the underlying
124/// `AddressStateProvider` may be used as usual. If an error is returned, a
125/// terminal error has occurred on the underlying channel.
126pub async fn wait_assignment_state<S>(
127    stream: S,
128    want: fnet_interfaces::AddressAssignmentState,
129) -> Result<(), AddressStateProviderError>
130where
131    S: Stream<Item = Result<fnet_interfaces::AddressAssignmentState, AddressStateProviderError>>
132        + Unpin,
133{
134    stream
135        .try_filter_map(|state| futures::future::ok((state == want).then_some(())))
136        .try_next()
137        .await
138        .and_then(|opt| opt.ok_or_else(|| AddressStateProviderError::ChannelClosed))
139}
140
141type ControlEventStreamFutureToReason =
142    fn(
143        (
144            Option<Result<fnet_interfaces_admin::ControlEvent, fidl::Error>>,
145            fnet_interfaces_admin::ControlEventStream,
146        ),
147    ) -> Result<Option<fnet_interfaces_admin::InterfaceRemovedReason>, fidl::Error>;
148
149/// Convert [`fnet_resources::GrantForInterfaceAuthorization`] to
150/// [`fnet_resources::ProofOfInterfaceAuthorization`] with fewer
151/// permissions.
152///
153/// # Panics
154///
155/// Panics when the Event handle does not have the DUPLICATE right. Callers
156/// need not worry about this if providing a grant received from
157/// [`GetAuthorizationForInterface`].
158#[cfg(not(feature = "fdomain"))]
159pub fn proof_from_grant(
160    grant: &fnet_resources::GrantForInterfaceAuthorization,
161) -> fnet_resources::ProofOfInterfaceAuthorization {
162    let fnet_resources::GrantForInterfaceAuthorization { interface_id, token } = grant;
163
164    // The handle duplication is expected to succeed since the input
165    // `GrantFromInterfaceAuthorization` is retrieved directly from FIDL and has
166    // `zx::Rights::DUPLICATE`. Failure may occur if memory is limited, but this
167    // problem cannot be easily resolved via userspace.
168    fnet_resources::ProofOfInterfaceAuthorization {
169        interface_id: *interface_id,
170        token: token.duplicate_handle(fidl::Rights::TRANSFER).unwrap(),
171    }
172}
173
174/// A wrapper for fuchsia.net.interfaces.admin/Control that observes terminal
175/// events.
176#[derive(Clone)]
177pub struct Control {
178    proxy: fnet_interfaces_admin::ControlProxy,
179    // Keeps a shared future that will resolve when the first event is seen on a
180    // ControlEventStream. The shared future makes the observed terminal event
181    // "sticky" for as long as we clone the future before polling it. Note that
182    // we don't drive the event stream to completion, the future is resolved
183    // when the first event is seen. That means this relies on the terminal
184    // event contract but does *not* enforce that the channel is closed
185    // immediately after or that no other events are issued.
186    terminal_event_fut: futures::future::Shared<
187        futures::future::Map<
188            futures::stream::StreamFuture<fnet_interfaces_admin::ControlEventStream>,
189            ControlEventStreamFutureToReason,
190        >,
191    >,
192}
193
194/// Waits for response on query result and terminal event. If the query has a
195/// result, returns that. Otherwise, returns the terminal event.
196async fn or_terminal_event<QR, QF, TR>(
197    query_fut: QF,
198    terminal_event_fut: TR,
199) -> Result<QR, TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>>
200where
201    QR: Unpin,
202    QF: Unpin + Future<Output = Result<QR, fidl::Error>>,
203    TR: Unpin
204        + Future<Output = Result<Option<fnet_interfaces_admin::InterfaceRemovedReason>, fidl::Error>>,
205{
206    match futures::future::select(query_fut, terminal_event_fut).await {
207        futures::future::Either::Left((query_result, terminal_event_fut)) => match query_result {
208            Ok(ok) => Ok(ok),
209            Err(e) if e.is_closed() => match terminal_event_fut.await {
210                Ok(Some(reason)) => Err(TerminalError::Terminal(reason)),
211                Ok(None) | Err(_) => Err(TerminalError::Fidl(e)),
212            },
213            Err(e) => Err(TerminalError::Fidl(e)),
214        },
215        futures::future::Either::Right((event, query_fut)) => {
216            // We need to poll the query response future one more time,
217            // because of the following scenario:
218            //
219            // 1. select() polls the query response future, which returns
220            //    pending.
221            // 2. The server sends the query response and terminal event in
222            //    that order.
223            // 3. The FIDL client library dequeues both of these and wakes
224            //    the respective futures.
225            // 4. select() polls the terminal event future, which is now
226            //    ready.
227            //
228            // In that case, both futures will be ready, so we can use
229            // now_or_never() to check whether the query result future has a
230            // result, since we always want to process that result first.
231            if let Some(query_result) = query_fut.now_or_never() {
232                match query_result {
233                    Ok(ok) => Ok(ok),
234                    Err(e) if e.is_closed() => match event {
235                        Ok(Some(reason)) => Err(TerminalError::Terminal(reason)),
236                        Ok(None) | Err(_) => Err(TerminalError::Fidl(e)),
237                    },
238                    Err(e) => Err(TerminalError::Fidl(e)),
239                }
240            } else {
241                match event.map_err(|e| TerminalError::Fidl(e))? {
242                    Some(removal_reason) => Err(TerminalError::Terminal(removal_reason)),
243                    None => Err(TerminalError::Fidl(fidl::Error::ClientChannelClosed {
244                        status: zx::Status::PEER_CLOSED,
245                        protocol_name: <fnet_interfaces_admin::ControlMarker as flex_client::fidl::ProtocolMarker>::DEBUG_NAME,
246                        #[cfg(not(target_os = "fuchsia"))]
247                        reason: None,
248                        epitaph: None,
249                    })),
250                }
251            }
252        }
253    }
254}
255
256impl Control {
257    /// Calls `AddAddress` on the proxy.
258    pub fn add_address(
259        &self,
260        address: &flex_fuchsia_net::Subnet,
261        parameters: &fnet_interfaces_admin::AddressParameters,
262        address_state_provider: flex_client::fidl::ServerEnd<
263            fnet_interfaces_admin::AddressStateProviderMarker,
264        >,
265    ) -> Result<(), TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>> {
266        self.or_terminal_event_no_return(self.proxy.add_address(
267            address,
268            parameters,
269            address_state_provider,
270        ))
271    }
272
273    /// Calls `GetId` on the proxy.
274    pub async fn get_id(
275        &self,
276    ) -> Result<u64, TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>> {
277        self.or_terminal_event(self.proxy.get_id()).await
278    }
279
280    /// Calls `RemoveAddress` on the proxy.
281    pub async fn remove_address(
282        &self,
283        address: &flex_fuchsia_net::Subnet,
284    ) -> Result<
285        fnet_interfaces_admin::ControlRemoveAddressResult,
286        TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>,
287    > {
288        self.or_terminal_event(self.proxy.remove_address(address)).await
289    }
290
291    /// Calls `SetConfiguration` on the proxy.
292    pub async fn set_configuration(
293        &self,
294        config: &fnet_interfaces_admin::Configuration,
295    ) -> Result<
296        fnet_interfaces_admin::ControlSetConfigurationResult,
297        TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>,
298    > {
299        self.or_terminal_event(self.proxy.set_configuration(config)).await
300    }
301
302    /// Calls `GetConfiguration` on the proxy.
303    pub async fn get_configuration(
304        &self,
305    ) -> Result<
306        fnet_interfaces_admin::ControlGetConfigurationResult,
307        TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>,
308    > {
309        self.or_terminal_event(self.proxy.get_configuration()).await
310    }
311
312    /// Calls `GetAuthorizationForInterface` on the proxy.
313    pub async fn get_authorization_for_interface(
314        &self,
315    ) -> Result<
316        fnet_resources::GrantForInterfaceAuthorization,
317        TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>,
318    > {
319        self.or_terminal_event(self.proxy.get_authorization_for_interface()).await
320    }
321
322    /// Calls `Enable` on the proxy.
323    pub async fn enable(
324        &self,
325    ) -> Result<
326        fnet_interfaces_admin::ControlEnableResult,
327        TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>,
328    > {
329        self.or_terminal_event(self.proxy.enable()).await
330    }
331
332    /// Calls `Remove` on the proxy.
333    pub async fn remove(
334        &self,
335    ) -> Result<
336        fnet_interfaces_admin::ControlRemoveResult,
337        TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>,
338    > {
339        self.or_terminal_event(self.proxy.remove()).await
340    }
341
342    /// Calls `Disable` on the proxy.
343    pub async fn disable(
344        &self,
345    ) -> Result<
346        fnet_interfaces_admin::ControlDisableResult,
347        TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>,
348    > {
349        self.or_terminal_event(self.proxy.disable()).await
350    }
351
352    /// Calls `Detach` on the proxy.
353    pub fn detach(
354        &self,
355    ) -> Result<(), TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>> {
356        self.or_terminal_event_no_return(self.proxy.detach())
357    }
358
359    /// Creates a new `Control` wrapper from `proxy`.
360    pub fn new(proxy: fnet_interfaces_admin::ControlProxy) -> Self {
361        let terminal_event_fut = proxy
362            .take_event_stream()
363            .into_future()
364            .map::<_, ControlEventStreamFutureToReason>(|(event, _stream)| {
365                event
366                    .map(|r| {
367                        r.map(
368                            |fnet_interfaces_admin::ControlEvent::OnInterfaceRemoved { reason }| {
369                                reason
370                            },
371                        )
372                    })
373                    .transpose()
374            })
375            .shared();
376        Self { proxy, terminal_event_fut }
377    }
378
379    /// Waits for interface removal.
380    pub async fn wait_termination(
381        self,
382    ) -> TerminalError<fnet_interfaces_admin::InterfaceRemovedReason> {
383        let Self { proxy: _, terminal_event_fut } = self;
384        match terminal_event_fut.await {
385            Ok(Some(event)) => TerminalError::Terminal(event),
386            Ok(None) => TerminalError::Fidl(fidl::Error::ClientChannelClosed {
387                status: zx::Status::PEER_CLOSED,
388                protocol_name: <fnet_interfaces_admin::ControlMarker as flex_client::fidl::ProtocolMarker>::DEBUG_NAME,
389                #[cfg(not(target_os = "fuchsia"))]
390                reason: None,
391                epitaph: None,
392            }),
393            Err(e) => TerminalError::Fidl(e),
394        }
395    }
396
397    /// Creates a new `Control` and its `ServerEnd`.
398    #[cfg(not(feature = "fdomain"))]
399    pub fn create_endpoints()
400    -> Result<(Self, fidl::endpoints::ServerEnd<fnet_interfaces_admin::ControlMarker>), fidl::Error>
401    {
402        let (proxy, server_end) = fidl::endpoints::create_proxy();
403        Ok((Self::new(proxy), server_end))
404    }
405
406    async fn or_terminal_event<R: Unpin, F: Unpin + Future<Output = Result<R, fidl::Error>>>(
407        &self,
408        fut: F,
409    ) -> Result<R, TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>> {
410        or_terminal_event(fut, self.terminal_event_fut.clone()).await
411    }
412
413    fn or_terminal_event_no_return(
414        &self,
415        r: Result<(), fidl::Error>,
416    ) -> Result<(), TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>> {
417        r.map_err(|err| {
418            if !err.is_closed() {
419                return TerminalError::Fidl(err);
420            }
421            // TODO(https://fxbug.dev/42178907): The terminal event may have been
422            // sent by the server but the future may not resolve immediately,
423            // resulting in the terminal event being missed and a FIDL error
424            // being returned to the user.
425            //
426            // Poll event stream to see if we have a terminal event to return
427            // instead of a FIDL closed error.
428            match self.terminal_event_fut.clone().now_or_never() {
429                Some(Ok(Some(terminal_event))) => TerminalError::Terminal(terminal_event),
430                Some(Err(e)) => {
431                    // Prefer the error observed by the proxy.
432                    let _: fidl::Error = e;
433                    TerminalError::Fidl(err)
434                }
435                None | Some(Ok(None)) => TerminalError::Fidl(err),
436            }
437        })
438    }
439}
440
441impl std::fmt::Debug for Control {
442    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
443        let Self { proxy, terminal_event_fut: _ } = self;
444        fmt.debug_struct("Control").field("proxy", proxy).finish()
445    }
446}
447
448/// Errors observed from wrapped terminal events.
449#[derive(Debug)]
450pub enum TerminalError<E> {
451    /// Terminal event was observed.
452    Terminal(E),
453    /// A FIDL error occurred.
454    Fidl(fidl::Error),
455}
456
457impl<E> std::fmt::Display for TerminalError<E>
458where
459    E: std::fmt::Debug,
460{
461    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462        match self {
463            TerminalError::Terminal(e) => write!(f, "terminal event: {:?}", e),
464            TerminalError::Fidl(e) => write!(f, "fidl error: {}", e),
465        }
466    }
467}
468
469impl<E: std::fmt::Debug> std::error::Error for TerminalError<E> {}
470
471/// This can be provided on interface creation to appoint a route table into
472/// which netstack managed routes are installed.
473#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
474pub enum NetstackManagedRoutesDesignation {
475    /// The netstack managed routes are installed in the main table.
476    #[default]
477    Main,
478    /// The netstack managed routes are installed in an interface-local table.
479    ///
480    /// The interface creates local tables (one for each IP version). When the
481    /// interface is removed and all the outstanding `RouteTableV{4,6}` protocol
482    /// channels are closed, the local table is removed.
483    InterfaceLocal,
484}
485
486/// Unknown FIDL value for NetstackManagedRoutesDesignation.
487#[derive(Error, Debug)]
488#[error("unknown designation for netsack managed routes: {0}")]
489pub struct UnknownNetstackManagedRoutesDesignation(pub u64);
490
491impl TryFrom<fnet_interfaces_admin::NetstackManagedRoutesDesignation>
492    for NetstackManagedRoutesDesignation
493{
494    type Error = UnknownNetstackManagedRoutesDesignation;
495
496    fn try_from(
497        value: fnet_interfaces_admin::NetstackManagedRoutesDesignation,
498    ) -> Result<Self, Self::Error> {
499        match value {
500            fnet_interfaces_admin::NetstackManagedRoutesDesignation::Main(
501                fnet_interfaces_admin::Empty,
502            ) => Ok(Self::Main),
503            fnet_interfaces_admin::NetstackManagedRoutesDesignation::InterfaceLocal(
504                fnet_interfaces_admin::Empty,
505            ) => Ok(Self::InterfaceLocal),
506            fnet_interfaces_admin::NetstackManagedRoutesDesignation::__SourceBreaking {
507                unknown_ordinal,
508            } => Err(UnknownNetstackManagedRoutesDesignation(unknown_ordinal)),
509        }
510    }
511}
512
513impl From<NetstackManagedRoutesDesignation>
514    for fnet_interfaces_admin::NetstackManagedRoutesDesignation
515{
516    fn from(value: NetstackManagedRoutesDesignation) -> Self {
517        match value {
518            NetstackManagedRoutesDesignation::Main => Self::Main(fnet_interfaces_admin::Empty),
519            NetstackManagedRoutesDesignation::InterfaceLocal => {
520                Self::InterfaceLocal(fnet_interfaces_admin::Empty)
521            }
522        }
523    }
524}
525
526#[cfg(test)]
527mod test {
528    use std::task::Poll;
529
530    use flex_client::fidl::{ProtocolMarker, RequestStream};
531    use flex_fuchsia_net_interfaces as fnet_interfaces;
532    use flex_fuchsia_net_interfaces_admin as fnet_interfaces_admin;
533    use fnet_interfaces_admin::InterfaceRemovedReason;
534    use test_case::test_case;
535    use zx_status as zx;
536
537    use super::*;
538
539    // Test that the terminal event is observed when the server closes its end.
540    #[fuchsia_async::run_singlethreaded(test)]
541    async fn test_assignment_state_stream() {
542        let client = flex_local::local_client_empty();
543        let (address_state_provider, server_end) =
544            client.create_proxy::<fnet_interfaces_admin::AddressStateProviderMarker>();
545        let state_stream = assignment_state_stream(address_state_provider);
546        futures::pin_mut!(state_stream);
547
548        const REMOVAL_REASON_INVALID: fnet_interfaces_admin::AddressRemovalReason =
549            fnet_interfaces_admin::AddressRemovalReason::Invalid;
550        {
551            let (mut request_stream, control_handle) = server_end.into_stream_and_control_handle();
552
553            const ASSIGNMENT_STATE_ASSIGNED: fnet_interfaces::AddressAssignmentState =
554                fnet_interfaces::AddressAssignmentState::Assigned;
555            let state_fut = state_stream.try_next().map(|r| {
556                assert_eq!(
557                    r.expect("state stream error").expect("state stream ended"),
558                    ASSIGNMENT_STATE_ASSIGNED
559                )
560            });
561            let handle_fut = request_stream.try_next().map(|r| match r.expect("request stream error").expect("request stream ended") {
562                fnet_interfaces_admin::AddressStateProviderRequest::WatchAddressAssignmentState { responder } => {
563                    responder.send(ASSIGNMENT_STATE_ASSIGNED).expect("failed to send stubbed assignment state");
564                }
565                req => panic!("unexpected method called: {:?}", req),
566            });
567            let ((), ()) = futures::join!(state_fut, handle_fut);
568
569            control_handle
570                .send_on_address_removed(REMOVAL_REASON_INVALID)
571                .expect("failed to send fake INVALID address removal reason event");
572        }
573
574        assert_matches::assert_matches!(
575            state_stream.try_collect::<Vec<_>>().await,
576            Err(AddressStateProviderError::AddressRemoved(got)) if got == REMOVAL_REASON_INVALID
577        );
578    }
579
580    // Test that only one error is returned on the assignment state stream when
581    // an error observable on both the client proxy and the event stream occurs.
582    #[fuchsia_async::run_singlethreaded(test)]
583    async fn test_assignment_state_stream_single_error() {
584        let client = flex_local::local_client_empty();
585        let (address_state_provider, server_end) =
586            client.create_proxy::<fnet_interfaces_admin::AddressStateProviderMarker>();
587        let state_stream = assignment_state_stream(address_state_provider);
588
589        server_end
590            .close_with_epitaph(fidl::Status::INTERNAL)
591            .expect("failed to send INTERNAL epitaph");
592
593        let states_fut = state_stream.collect::<Vec<_>>();
594
595        #[cfg(not(feature = "fdomain"))]
596        let states = states_fut.now_or_never().expect("state stream not immediately ready");
597
598        // FDomain needs to pump requests through its bowels a bit so the stream
599        // won't be immediately ready.
600        #[cfg(feature = "fdomain")]
601        let states = states_fut.await;
602
603        // Use collect rather than try_collect to ensure that we don't observe
604        // multiple errors on this stream.
605        assert_matches::assert_matches!(
606            states.as_slice(),
607            [Err(AddressStateProviderError::Fidl(fidl::Error::ClientChannelClosed {
608                status: fidl::Status::INTERNAL,
609                #[cfg(not(target_os = "fuchsia"))]
610                reason: None,
611                ..
612            }))]
613        );
614    }
615
616    // Test that if an assignment state and a terminal event is available at
617    // the same time, the state is yielded first.
618    #[fuchsia_async::run_singlethreaded(test)]
619    async fn assignment_state_stream_state_before_event() {
620        let client = flex_local::local_client_empty();
621        let (address_state_provider, mut request_stream) =
622            client.create_proxy_and_stream::<fnet_interfaces_admin::AddressStateProviderMarker>();
623
624        const ASSIGNMENT_STATE_ASSIGNED: fnet_interfaces::AddressAssignmentState =
625            fnet_interfaces::AddressAssignmentState::Assigned;
626        const REMOVAL_REASON_INVALID: fnet_interfaces_admin::AddressRemovalReason =
627            fnet_interfaces_admin::AddressRemovalReason::Invalid;
628
629        let ((), ()) = futures::future::join(
630            async move {
631                request_stream
632                    .try_next()
633                    .await
634                    .expect("request stream error")
635                    .expect("request stream ended")
636                    .into_watch_address_assignment_state()
637                    .expect("unexpected request")
638                    .send(ASSIGNMENT_STATE_ASSIGNED)
639                    .expect("failed to send stubbed assignment state");
640                request_stream
641                    .control_handle()
642                    .send_on_address_removed(REMOVAL_REASON_INVALID)
643                    .expect("failed to send fake INVALID address removal reason event");
644            },
645            async move {
646                let got = assignment_state_stream(address_state_provider).collect::<Vec<_>>().await;
647                assert_matches::assert_matches!(
648                    got.as_slice(),
649                    &[
650                        Ok(got_state),
651                        Err(AddressStateProviderError::AddressRemoved(got_reason)),
652                    ] => {
653                        assert_eq!(got_state, ASSIGNMENT_STATE_ASSIGNED);
654                        assert_eq!(got_reason, REMOVAL_REASON_INVALID);
655                    }
656                );
657            },
658        )
659        .await;
660    }
661
662    // Tests that terminal event is observed when using ControlWrapper.
663    #[fuchsia_async::run_singlethreaded(test)]
664    async fn control_terminal_event() {
665        let client = flex_local::local_client_empty();
666        let (control, mut request_stream) =
667            client.create_proxy_and_stream::<fnet_interfaces_admin::ControlMarker>();
668        let control = super::Control::new(control);
669        const EXPECTED_EVENT: fnet_interfaces_admin::InterfaceRemovedReason =
670            fnet_interfaces_admin::InterfaceRemovedReason::BadPort;
671        const ID: u64 = 15;
672        let ((), ()) = futures::future::join(
673            async move {
674                assert_matches::assert_matches!(control.get_id().await, Ok(ID));
675                assert_matches::assert_matches!(
676                    control.get_id().await,
677                    Err(super::TerminalError::Terminal(got)) if got == EXPECTED_EVENT
678                );
679            },
680            async move {
681                let responder = request_stream
682                    .try_next()
683                    .await
684                    .expect("operating request stream")
685                    .expect("stream ended unexpectedly")
686                    .into_get_id()
687                    .expect("unexpected request");
688                responder.send(ID).expect("failed to send response");
689                request_stream
690                    .control_handle()
691                    .send_on_interface_removed(EXPECTED_EVENT)
692                    .expect("sending terminal event");
693            },
694        )
695        .await;
696    }
697
698    // Tests that terminal error is observed when using ControlWrapper if no
699    // event is issued.
700    #[fuchsia_async::run_singlethreaded(test)]
701    async fn control_missing_terminal_event() {
702        let client = flex_local::local_client_empty();
703        let (control, mut request_stream) =
704            client.create_proxy_and_stream::<fnet_interfaces_admin::ControlMarker>();
705        let control = super::Control::new(control);
706        let ((), ()) = futures::future::join(
707            async move {
708                assert_matches::assert_matches!(
709                    control.get_id().await,
710                    Err(super::TerminalError::Fidl(fidl::Error::ClientChannelClosed {
711                        status: zx::Status::PEER_CLOSED,
712                        protocol_name: flex_fuchsia_net_interfaces_admin::ControlMarker::DEBUG_NAME,
713                        #[cfg(not(target_os = "fuchsia"))]
714                        reason: None,
715                        ..
716                    }))
717                );
718            },
719            async move {
720                match request_stream
721                    .try_next()
722                    .await
723                    .expect("operating request stream")
724                    .expect("stream ended unexpectedly")
725                {
726                    fnet_interfaces_admin::ControlRequest::GetId { responder } => {
727                        // Just close the channel without issuing a response.
728                        std::mem::drop(responder);
729                    }
730                    request => panic!("unexpected request {:?}", request),
731                }
732            },
733        )
734        .await;
735    }
736
737    #[fuchsia_async::run_singlethreaded(test)]
738    async fn control_pipelined_error() {
739        let client = flex_local::local_client_empty();
740        let (control, request_stream) =
741            client.create_proxy_and_stream::<fnet_interfaces_admin::ControlMarker>();
742        let control = super::Control::new(control);
743        const CLOSE_REASON: fnet_interfaces_admin::InterfaceRemovedReason =
744            fnet_interfaces_admin::InterfaceRemovedReason::BadPort;
745        request_stream
746            .control_handle()
747            .send_on_interface_removed(CLOSE_REASON)
748            .expect("send terminal event");
749        std::mem::drop(request_stream);
750        #[cfg(feature = "fdomain")]
751        {
752            let control_clone = control.clone();
753            let _ = control_clone.wait_termination().await;
754        }
755        assert_matches::assert_matches!(control.or_terminal_event_no_return(Ok(())), Ok(()));
756        assert_matches::assert_matches!(
757            control.or_terminal_event_no_return(Err(fidl::Error::ClientWrite(
758                zx::Status::INTERNAL.into()
759            ))),
760            Err(super::TerminalError::Fidl(fidl::Error::ClientWrite(
761                fidl::TransportError::Status(zx::Status::INTERNAL)
762            )))
763        );
764        #[cfg(target_os = "fuchsia")]
765        assert_matches::assert_matches!(
766            control.or_terminal_event_no_return(Err(fidl::Error::ClientChannelClosed {
767                status: zx::Status::PEER_CLOSED,
768                protocol_name: fnet_interfaces_admin::ControlMarker::DEBUG_NAME,
769                epitaph: None,
770            })),
771            Err(super::TerminalError::Terminal(CLOSE_REASON))
772        );
773    }
774
775    #[fuchsia_async::run_singlethreaded(test)]
776    async fn control_wait_termination() {
777        let client = flex_local::local_client_empty();
778        let (control, request_stream) =
779            client.create_proxy_and_stream::<fnet_interfaces_admin::ControlMarker>();
780        let control = super::Control::new(control);
781        const CLOSE_REASON: fnet_interfaces_admin::InterfaceRemovedReason =
782            fnet_interfaces_admin::InterfaceRemovedReason::BadPort;
783        request_stream
784            .control_handle()
785            .send_on_interface_removed(CLOSE_REASON)
786            .expect("send terminal event");
787        std::mem::drop(request_stream);
788        assert_matches::assert_matches!(
789            control.wait_termination().await,
790            super::TerminalError::Terminal(CLOSE_REASON)
791        );
792    }
793
794    #[fuchsia_async::run_singlethreaded(test)]
795    async fn control_respond_and_drop() {
796        const ID: u64 = 15;
797        let client = flex_local::local_client_empty();
798        let (control, mut request_stream) =
799            client.create_proxy_and_stream::<fnet_interfaces_admin::ControlMarker>();
800        let control = super::Control::new(control);
801        let ((), ()) = futures::future::join(
802            async move {
803                assert_matches::assert_matches!(control.get_id().await, Ok(ID));
804            },
805            async move {
806                let responder = request_stream
807                    .try_next()
808                    .await
809                    .expect("operating request stream")
810                    .expect("stream ended unexpectedly")
811                    .into_get_id()
812                    .expect("unexpected request");
813                responder.send(ID).expect("failed to send response");
814            },
815        )
816        .await;
817    }
818
819    // This test is for the case found in https://fxbug.dev/328297563.  The
820    // query result and terminal event futures both become ready after the query
821    // result is polled and returns pending. This test does not handle the case
822    // for when there is no query result.
823    #[test_case(Ok(()), Ok(Some(InterfaceRemovedReason::User)), Ok(()); "success")]
824    #[test_case(
825        Err(fidl::Error::InvalidHeader),
826        Ok(Some(InterfaceRemovedReason::User)),
827        Err(TerminalError::Fidl(fidl::Error::InvalidHeader));
828        "returns query error when not closed"
829    )]
830    #[test_case(
831        Err(fidl::Error::ClientChannelClosed {
832            status: zx::Status::PEER_CLOSED,
833            protocol_name: fnet_interfaces_admin::ControlMarker::DEBUG_NAME,
834            #[cfg(not(target_os = "fuchsia"))]
835            reason: None,
836            epitaph: None,
837        }),
838        Ok(Some(InterfaceRemovedReason::User)),
839        Err(TerminalError::Terminal(InterfaceRemovedReason::User));
840        "returns terminal error when channel closed"
841    )]
842    #[test_case(
843        Err(fidl::Error::ClientChannelClosed {
844            status: zx::Status::PEER_CLOSED,
845            protocol_name: fnet_interfaces_admin::ControlMarker::DEBUG_NAME,
846            #[cfg(not(target_os = "fuchsia"))]
847            reason: None,
848            epitaph: None,
849        }),
850        Ok(None),
851        Err(TerminalError::Fidl(
852            fidl::Error::ClientChannelClosed {
853                status: zx::Status::PEER_CLOSED,
854                protocol_name: fnet_interfaces_admin::ControlMarker::DEBUG_NAME,
855                #[cfg(not(target_os = "fuchsia"))]
856                reason: None,
857                epitaph: None,
858            }
859        ));
860        "returns query error when no terminal error"
861    )]
862    #[test_case(
863        Err(fidl::Error::ClientChannelClosed {
864            status: zx::Status::PEER_CLOSED,
865            protocol_name: fnet_interfaces_admin::ControlMarker::DEBUG_NAME,
866            #[cfg(not(target_os = "fuchsia"))]
867            reason: None,
868            epitaph: None,
869        }),
870        Err(fidl::Error::InvalidHeader),
871        Err(TerminalError::Fidl(
872            fidl::Error::ClientChannelClosed {
873                status: zx::Status::PEER_CLOSED,
874                protocol_name: fnet_interfaces_admin::ControlMarker::DEBUG_NAME,
875                #[cfg(not(target_os = "fuchsia"))]
876                reason: None,
877                epitaph: None,
878            }
879        ));
880        "returns query error when terminal event returns a fidl error"
881    )]
882    #[fuchsia_async::run_singlethreaded(test)]
883    async fn control_polling_race(
884        left_future_result: Result<(), fidl::Error>,
885        right_future_result: Result<
886            Option<fnet_interfaces_admin::InterfaceRemovedReason>,
887            fidl::Error,
888        >,
889        expected: Result<(), TerminalError<fnet_interfaces_admin::InterfaceRemovedReason>>,
890    ) {
891        let mut polled = false;
892        let first_future = std::future::poll_fn(|_cx| {
893            if polled {
894                Poll::Ready(left_future_result.clone())
895            } else {
896                polled = true;
897                Poll::Pending
898            }
899        })
900        .fuse();
901
902        let second_future =
903            std::future::poll_fn(|_cx| Poll::Ready(right_future_result.clone())).fuse();
904
905        let res = or_terminal_event(first_future, second_future).await;
906        match (res, expected) {
907            (Ok(()), Ok(())) => (),
908            (Err(TerminalError::Terminal(res)), Err(TerminalError::Terminal(expected)))
909                if res == expected => {}
910            // fidl::Error doesn't implement Eq, but this lack of an actual
911            // equality check does not matter for this test.
912            (Err(TerminalError::Fidl(_)), Err(TerminalError::Fidl(_))) => (),
913            (res, expected) => panic!("expected {:?} got {:?}", expected, res),
914        }
915    }
916
917    #[cfg(not(feature = "fdomain"))]
918    #[test]
919    fn convert_proof_to_grant() {
920        use assert_matches::assert_matches;
921        // I don't know why we need this unused. The trait is definitely used
922        // and this function is only in the non-FDomain variant so we shouldn't
923        // be building an FDomain version.
924        #[allow(unused)]
925        use fidl::{AsHandleRef, Rights};
926        // The default Event has more Rights than the token within the Grant returned from
927        // [`GetAuthorizationForInterface`], but can still be converted to be used in the
928        // [`ProofOfInterfaceAuthorization`], since only `zx::Rights::DUPLICATE` and
929        // `zx::Rights::TRANSFER` is required.
930        let event = fidl::Event::create();
931        let grant = fnet_resources::GrantForInterfaceAuthorization {
932            interface_id: Default::default(),
933            token: event,
934        };
935
936        let fnet_resources::ProofOfInterfaceAuthorization { interface_id, token } =
937            proof_from_grant(&grant);
938        assert_eq!(interface_id, Default::default());
939        assert_matches!(token.basic_info(), Ok(info) if info.rights == Rights::TRANSFER);
940    }
941}