Skip to main content

fidl_fuchsia_net_filter_ext/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Extensions for the fuchsia.net.filter FIDL library.
6//!
7//! Note that this library as written is not meant for inclusion in the SDK. It
8//! is only meant to be used in conjunction with a netstack that is compiled
9//! against the same API level of the `fuchsia.net.filter` FIDL library. This
10//! library opts in to compile-time and runtime breakage when the FIDL library
11//! is evolved in order to enforce that it is updated along with the FIDL
12//! library itself.
13
14#[cfg(target_os = "fuchsia")]
15pub mod sync;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::num::NonZeroU16;
20use std::ops::RangeInclusive;
21
22use async_utils::fold::FoldWhile;
23use fidl::marker::SourceBreaking;
24use fidl_fuchsia_ebpf as febpf;
25use fidl_fuchsia_net as fnet;
26use fidl_fuchsia_net_filter as fnet_filter;
27use fidl_fuchsia_net_interfaces_ext as fnet_interfaces_ext;
28use fidl_fuchsia_net_matchers_ext as fnet_matchers_ext;
29use fidl_fuchsia_net_root as fnet_root;
30use futures::{Stream, StreamExt as _, TryStreamExt as _};
31use thiserror::Error;
32
33/// Conversion errors from `fnet_filter` FIDL types to the
34/// equivalents defined in this module.
35#[derive(Debug, Error, PartialEq)]
36pub enum FidlConversionError {
37    #[error("union is of an unknown variant: {0}")]
38    UnknownUnionVariant(&'static str),
39    #[error("namespace ID not provided")]
40    MissingNamespaceId,
41    #[error("namespace domain not provided")]
42    MissingNamespaceDomain,
43    #[error("routine ID not provided")]
44    MissingRoutineId,
45    #[error("routine type not provided")]
46    MissingRoutineType,
47    #[error("IP installation hook not provided")]
48    MissingIpInstallationHook,
49    #[error("NAT installation hook not provided")]
50    MissingNatInstallationHook,
51    #[error("interface matcher specified an invalid ID of 0")]
52    ZeroInterfaceId,
53    #[error("invalid address range (start must be <= end)")]
54    InvalidAddressRange,
55    #[error("address range start and end addresses are not the same IP family")]
56    AddressRangeFamilyMismatch,
57    #[error("prefix length of subnet is longer than number of bits in IP address")]
58    SubnetPrefixTooLong,
59    #[error("host bits are set in subnet network")]
60    SubnetHostBitsSet,
61    #[error("invalid port matcher range (start must be <= end)")]
62    InvalidPortMatcherRange,
63    #[error("transparent proxy action specified an invalid local port of 0")]
64    UnspecifiedTransparentProxyPort,
65    #[error("NAT action specified an invalid rewrite port of 0")]
66    UnspecifiedNatPort,
67    #[error("invalid port range (start must be <= end)")]
68    InvalidPortRange,
69    #[error("non-error result variant could not be converted to an error")]
70    NotAnError,
71}
72
73impl From<fnet_matchers_ext::PortError> for FidlConversionError {
74    fn from(value: fnet_matchers_ext::PortError) -> Self {
75        match value {
76            fnet_matchers_ext::PortError::InvalidPortRange => {
77                FidlConversionError::InvalidPortMatcherRange
78            }
79        }
80    }
81}
82
83impl From<fnet_matchers_ext::InterfaceError> for FidlConversionError {
84    fn from(value: fnet_matchers_ext::InterfaceError) -> Self {
85        match value {
86            fnet_matchers_ext::InterfaceError::ZeroId => FidlConversionError::ZeroInterfaceId,
87            fnet_matchers_ext::InterfaceError::UnknownUnionVariant => {
88                FidlConversionError::UnknownUnionVariant(type_names::INTERFACE_MATCHER)
89            }
90            fidl_fuchsia_net_matchers_ext::InterfaceError::UnknownPortClass(
91                unknown_port_class_error,
92            ) => match unknown_port_class_error {
93                fnet_interfaces_ext::UnknownPortClassError::NetInterfaces(_) => {
94                    FidlConversionError::UnknownUnionVariant(type_names::NET_INTERFACES_PORT_CLASS)
95                }
96                fnet_interfaces_ext::UnknownPortClassError::HardwareNetwork(_) => {
97                    FidlConversionError::UnknownUnionVariant(
98                        type_names::HARDWARE_NETWORK_PORT_CLASS,
99                    )
100                }
101            },
102        }
103    }
104}
105
106impl From<fnet_matchers_ext::AddressError> for FidlConversionError {
107    fn from(value: fnet_matchers_ext::AddressError) -> Self {
108        match value {
109            fnet_matchers_ext::AddressError::AddressMatcherType(address_matcher_type_error) => {
110                address_matcher_type_error.into()
111            }
112        }
113    }
114}
115
116impl From<fnet_matchers_ext::AddressMatcherTypeError> for FidlConversionError {
117    fn from(value: fnet_matchers_ext::AddressMatcherTypeError) -> Self {
118        match value {
119            fnet_matchers_ext::AddressMatcherTypeError::Subnet(subnet_error) => subnet_error.into(),
120            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(address_range_error) => {
121                address_range_error.into()
122            }
123            fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant => {
124                FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
125            }
126        }
127    }
128}
129
130impl From<fnet_matchers_ext::AddressRangeError> for FidlConversionError {
131    fn from(value: fnet_matchers_ext::AddressRangeError) -> Self {
132        match value {
133            fnet_matchers_ext::AddressRangeError::Invalid => {
134                FidlConversionError::InvalidAddressRange
135            }
136            fnet_matchers_ext::AddressRangeError::FamilyMismatch => {
137                FidlConversionError::AddressRangeFamilyMismatch
138            }
139        }
140    }
141}
142
143impl From<fnet_matchers_ext::SubnetError> for FidlConversionError {
144    fn from(value: fnet_matchers_ext::SubnetError) -> Self {
145        match value {
146            fnet_matchers_ext::SubnetError::PrefixTooLong => {
147                FidlConversionError::SubnetPrefixTooLong
148            }
149            fnet_matchers_ext::SubnetError::HostBitsSet => FidlConversionError::SubnetHostBitsSet,
150        }
151    }
152}
153
154impl From<fnet_matchers_ext::TransportProtocolError> for FidlConversionError {
155    fn from(value: fnet_matchers_ext::TransportProtocolError) -> Self {
156        match value {
157            fnet_matchers_ext::TransportProtocolError::Port(port_matcher_error) => {
158                port_matcher_error.into()
159            }
160            fnet_matchers_ext::TransportProtocolError::UnknownUnionVariant => {
161                FidlConversionError::UnknownUnionVariant(type_names::TRANSPORT_PROTOCOL)
162            }
163        }
164    }
165}
166
// TODO(https://fxbug.dev/317058051): remove this when the Rust FIDL bindings
// expose constants for these.
//
// Fully qualified FIDL type names, used as the payload of
// `FidlConversionError::UnknownUnionVariant` so the error message identifies
// which type could not be converted.
mod type_names {
    // Types from `fuchsia.net.filter`.
    pub(super) const RESOURCE_ID: &str = "fuchsia.net.filter/ResourceId";
    pub(super) const DOMAIN: &str = "fuchsia.net.filter/Domain";
    pub(super) const IP_INSTALLATION_HOOK: &str = "fuchsia.net.filter/IpInstallationHook";
    pub(super) const NAT_INSTALLATION_HOOK: &str = "fuchsia.net.filter/NatInstallationHook";
    pub(super) const ROUTINE_TYPE: &str = "fuchsia.net.filter/RoutineType";
    pub(super) const ACTION: &str = "fuchsia.net.filter/Action";
    pub(super) const MARK_ACTION: &str = "fuchsia.net.filter/MarkAction";
    pub(super) const TRANSPARENT_PROXY: &str = "fuchsia.net.filter/TransparentProxy";
    pub(super) const RESOURCE: &str = "fuchsia.net.filter/Resource";
    pub(super) const EVENT: &str = "fuchsia.net.filter/Event";
    pub(super) const CHANGE: &str = "fuchsia.net.filter/Change";
    pub(super) const CHANGE_VALIDATION_ERROR: &str = "fuchsia.net.filter/ChangeValidationError";
    pub(super) const CHANGE_VALIDATION_RESULT: &str = "fuchsia.net.filter/ChangeValidationResult";
    pub(super) const COMMIT_ERROR: &str = "fuchsia.net.filter/CommitError";
    pub(super) const COMMIT_RESULT: &str = "fuchsia.net.filter/CommitResult";
    pub(super) const REJECT_TYPE: &str = "fuchsia.net.filter/RejectType";

    // Matcher types. Their conversion errors come from `fnet_matchers_ext`,
    // so they are reported under the `fuchsia.net.matchers` library name.
    pub(super) const INTERFACE_MATCHER: &str = "fuchsia.net.matchers/Interface";
    pub(super) const ADDRESS_MATCHER_TYPE: &str = "fuchsia.net.matchers/AddressMatcherType";
    pub(super) const TRANSPORT_PROTOCOL: &str = "fuchsia.net.matchers/TransportProtocol";

    // Port class types from other libraries.
    pub(super) const NET_INTERFACES_PORT_CLASS: &str = "fuchsia.net.interfaces/PortClass";
    pub(super) const HARDWARE_NETWORK_PORT_CLASS: &str = "fuchsia.hardware.network/PortClass";
}
192
/// Extension type for [`fnet_filter::NamespaceId`].
///
/// A newtype around the namespace's string identifier.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NamespaceId(pub String);
196
197/// Extension type for [`fnet_filter::RoutineId`].
198#[derive(Debug, Clone, PartialEq, Eq, Hash)]
199pub struct RoutineId {
200    pub namespace: NamespaceId,
201    pub name: String,
202}
203
204impl From<fnet_filter::RoutineId> for RoutineId {
205    fn from(id: fnet_filter::RoutineId) -> Self {
206        let fnet_filter::RoutineId { namespace, name } = id;
207        Self { namespace: NamespaceId(namespace), name }
208    }
209}
210
211impl From<RoutineId> for fnet_filter::RoutineId {
212    fn from(id: RoutineId) -> Self {
213        let RoutineId { namespace, name } = id;
214        let NamespaceId(namespace) = namespace;
215        Self { namespace, name }
216    }
217}
218
219/// Extension type for [`fnet_filter::RuleId`].
220#[derive(Debug, Clone, PartialEq, Eq, Hash)]
221pub struct RuleId {
222    pub routine: RoutineId,
223    pub index: u32,
224}
225
226impl From<fnet_filter::RuleId> for RuleId {
227    fn from(id: fnet_filter::RuleId) -> Self {
228        let fnet_filter::RuleId { routine, index } = id;
229        Self { routine: routine.into(), index }
230    }
231}
232
233impl From<RuleId> for fnet_filter::RuleId {
234    fn from(id: RuleId) -> Self {
235        let RuleId { routine, index } = id;
236        Self { routine: routine.into(), index }
237    }
238}
239
240/// Extension type for [`fnet_filter::ResourceId`].
241#[derive(Debug, Clone, PartialEq, Eq, Hash)]
242pub enum ResourceId {
243    Namespace(NamespaceId),
244    Routine(RoutineId),
245    Rule(RuleId),
246}
247
248impl TryFrom<fnet_filter::ResourceId> for ResourceId {
249    type Error = FidlConversionError;
250
251    fn try_from(id: fnet_filter::ResourceId) -> Result<Self, Self::Error> {
252        match id {
253            fnet_filter::ResourceId::Namespace(id) => Ok(Self::Namespace(NamespaceId(id))),
254            fnet_filter::ResourceId::Routine(id) => Ok(Self::Routine(id.into())),
255            fnet_filter::ResourceId::Rule(id) => Ok(Self::Rule(id.into())),
256            fnet_filter::ResourceId::__SourceBreaking { .. } => {
257                Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE_ID))
258            }
259        }
260    }
261}
262
263impl From<ResourceId> for fnet_filter::ResourceId {
264    fn from(id: ResourceId) -> Self {
265        match id {
266            ResourceId::Namespace(NamespaceId(id)) => fnet_filter::ResourceId::Namespace(id),
267            ResourceId::Routine(id) => fnet_filter::ResourceId::Routine(id.into()),
268            ResourceId::Rule(id) => fnet_filter::ResourceId::Rule(id.into()),
269        }
270    }
271}
272
/// Extension type for [`fnet_filter::Domain`].
///
/// Each variant corresponds to the FIDL variant of the same name.
#[derive(Clone, Debug, PartialEq)]
pub enum Domain {
    Ipv4,
    Ipv6,
    AllIp,
}
280
281impl From<Domain> for fnet_filter::Domain {
282    fn from(domain: Domain) -> Self {
283        match domain {
284            Domain::Ipv4 => fnet_filter::Domain::Ipv4,
285            Domain::Ipv6 => fnet_filter::Domain::Ipv6,
286            Domain::AllIp => fnet_filter::Domain::AllIp,
287        }
288    }
289}
290
291impl TryFrom<fnet_filter::Domain> for Domain {
292    type Error = FidlConversionError;
293
294    fn try_from(domain: fnet_filter::Domain) -> Result<Self, Self::Error> {
295        match domain {
296            fnet_filter::Domain::Ipv4 => Ok(Self::Ipv4),
297            fnet_filter::Domain::Ipv6 => Ok(Self::Ipv6),
298            fnet_filter::Domain::AllIp => Ok(Self::AllIp),
299            fnet_filter::Domain::__SourceBreaking { .. } => {
300                Err(FidlConversionError::UnknownUnionVariant(type_names::DOMAIN))
301            }
302        }
303    }
304}
305
306/// Extension type for [`fnet_filter::Namespace`].
307#[derive(Debug, Clone, PartialEq)]
308pub struct Namespace {
309    pub id: NamespaceId,
310    pub domain: Domain,
311}
312
313impl From<Namespace> for fnet_filter::Namespace {
314    fn from(namespace: Namespace) -> Self {
315        let Namespace { id, domain } = namespace;
316        let NamespaceId(id) = id;
317        Self { id: Some(id), domain: Some(domain.into()), __source_breaking: SourceBreaking }
318    }
319}
320
321impl TryFrom<fnet_filter::Namespace> for Namespace {
322    type Error = FidlConversionError;
323
324    fn try_from(namespace: fnet_filter::Namespace) -> Result<Self, Self::Error> {
325        let fnet_filter::Namespace { id, domain, __source_breaking } = namespace;
326        let id = NamespaceId(id.ok_or(FidlConversionError::MissingNamespaceId)?);
327        let domain = domain.ok_or(FidlConversionError::MissingNamespaceDomain)?.try_into()?;
328        Ok(Self { id, domain })
329    }
330}
331
/// Extension type for [`fnet_filter::IpInstallationHook`].
///
/// Each variant corresponds to the FIDL variant of the same name.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum IpHook {
    Ingress,
    LocalIngress,
    Forwarding,
    LocalEgress,
    Egress,
}
341
342impl From<IpHook> for fnet_filter::IpInstallationHook {
343    fn from(hook: IpHook) -> Self {
344        match hook {
345            IpHook::Ingress => Self::Ingress,
346            IpHook::LocalIngress => Self::LocalIngress,
347            IpHook::Forwarding => Self::Forwarding,
348            IpHook::LocalEgress => Self::LocalEgress,
349            IpHook::Egress => Self::Egress,
350        }
351    }
352}
353
354impl TryFrom<fnet_filter::IpInstallationHook> for IpHook {
355    type Error = FidlConversionError;
356
357    fn try_from(hook: fnet_filter::IpInstallationHook) -> Result<Self, Self::Error> {
358        match hook {
359            fnet_filter::IpInstallationHook::Ingress => Ok(Self::Ingress),
360            fnet_filter::IpInstallationHook::LocalIngress => Ok(Self::LocalIngress),
361            fnet_filter::IpInstallationHook::Forwarding => Ok(Self::Forwarding),
362            fnet_filter::IpInstallationHook::LocalEgress => Ok(Self::LocalEgress),
363            fnet_filter::IpInstallationHook::Egress => Ok(Self::Egress),
364            fnet_filter::IpInstallationHook::__SourceBreaking { .. } => {
365                Err(FidlConversionError::UnknownUnionVariant(type_names::IP_INSTALLATION_HOOK))
366            }
367        }
368    }
369}
370
/// Extension type for [`fnet_filter::NatInstallationHook`].
///
/// Each variant corresponds to the FIDL variant of the same name.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum NatHook {
    Ingress,
    LocalIngress,
    LocalEgress,
    Egress,
}
379
380impl From<NatHook> for fnet_filter::NatInstallationHook {
381    fn from(hook: NatHook) -> Self {
382        match hook {
383            NatHook::Ingress => Self::Ingress,
384            NatHook::LocalIngress => Self::LocalIngress,
385            NatHook::LocalEgress => Self::LocalEgress,
386            NatHook::Egress => Self::Egress,
387        }
388    }
389}
390
391impl TryFrom<fnet_filter::NatInstallationHook> for NatHook {
392    type Error = FidlConversionError;
393
394    fn try_from(hook: fnet_filter::NatInstallationHook) -> Result<Self, Self::Error> {
395        match hook {
396            fnet_filter::NatInstallationHook::Ingress => Ok(Self::Ingress),
397            fnet_filter::NatInstallationHook::LocalIngress => Ok(Self::LocalIngress),
398            fnet_filter::NatInstallationHook::LocalEgress => Ok(Self::LocalEgress),
399            fnet_filter::NatInstallationHook::Egress => Ok(Self::Egress),
400            fnet_filter::NatInstallationHook::__SourceBreaking { .. } => {
401                Err(FidlConversionError::UnknownUnionVariant(type_names::NAT_INSTALLATION_HOOK))
402            }
403        }
404    }
405}
406
407/// Extension type for [`fnet_filter::InstalledIpRoutine`].
408#[derive(Debug, Clone, PartialEq)]
409pub struct InstalledIpRoutine {
410    pub hook: IpHook,
411    pub priority: i32,
412}
413
414impl From<InstalledIpRoutine> for fnet_filter::InstalledIpRoutine {
415    fn from(routine: InstalledIpRoutine) -> Self {
416        let InstalledIpRoutine { hook, priority } = routine;
417        Self {
418            hook: Some(hook.into()),
419            priority: Some(priority),
420            __source_breaking: SourceBreaking,
421        }
422    }
423}
424
425impl TryFrom<fnet_filter::InstalledIpRoutine> for InstalledIpRoutine {
426    type Error = FidlConversionError;
427
428    fn try_from(routine: fnet_filter::InstalledIpRoutine) -> Result<Self, Self::Error> {
429        let fnet_filter::InstalledIpRoutine { hook, priority, __source_breaking } = routine;
430        let hook = hook.ok_or(FidlConversionError::MissingIpInstallationHook)?;
431        let priority = priority.unwrap_or(fnet_filter::DEFAULT_ROUTINE_PRIORITY);
432        Ok(Self { hook: hook.try_into()?, priority })
433    }
434}
435
436/// Extension type for [`fnet_filter::InstalledNatRoutine`].
437#[derive(Debug, Clone, PartialEq)]
438pub struct InstalledNatRoutine {
439    pub hook: NatHook,
440    pub priority: i32,
441}
442
443impl From<InstalledNatRoutine> for fnet_filter::InstalledNatRoutine {
444    fn from(routine: InstalledNatRoutine) -> Self {
445        let InstalledNatRoutine { hook, priority } = routine;
446        Self {
447            hook: Some(hook.into()),
448            priority: Some(priority),
449            __source_breaking: SourceBreaking,
450        }
451    }
452}
453
454impl TryFrom<fnet_filter::InstalledNatRoutine> for InstalledNatRoutine {
455    type Error = FidlConversionError;
456
457    fn try_from(routine: fnet_filter::InstalledNatRoutine) -> Result<Self, Self::Error> {
458        let fnet_filter::InstalledNatRoutine { hook, priority, __source_breaking } = routine;
459        let hook = hook.ok_or(FidlConversionError::MissingNatInstallationHook)?;
460        let priority = priority.unwrap_or(fnet_filter::DEFAULT_ROUTINE_PRIORITY);
461        Ok(Self { hook: hook.try_into()?, priority })
462    }
463}
464
465/// Extension type for [`fnet_filter::RoutineType`].
466#[derive(Debug, Clone, PartialEq)]
467pub enum RoutineType {
468    Ip(Option<InstalledIpRoutine>),
469    Nat(Option<InstalledNatRoutine>),
470}
471
472impl RoutineType {
473    pub fn is_installed(&self) -> bool {
474        // The `InstalledIpRoutine` or `InstalledNatRoutine` configuration is
475        // optional, and when omitted, signifies an uninstalled routine.
476        match self {
477            Self::Ip(Some(_)) | Self::Nat(Some(_)) => true,
478            Self::Ip(None) | Self::Nat(None) => false,
479        }
480    }
481}
482
483impl From<RoutineType> for fnet_filter::RoutineType {
484    fn from(routine: RoutineType) -> Self {
485        match routine {
486            RoutineType::Ip(installation) => Self::Ip(fnet_filter::IpRoutine {
487                installation: installation.map(Into::into),
488                __source_breaking: SourceBreaking,
489            }),
490            RoutineType::Nat(installation) => Self::Nat(fnet_filter::NatRoutine {
491                installation: installation.map(Into::into),
492                __source_breaking: SourceBreaking,
493            }),
494        }
495    }
496}
497
498impl TryFrom<fnet_filter::RoutineType> for RoutineType {
499    type Error = FidlConversionError;
500
501    fn try_from(type_: fnet_filter::RoutineType) -> Result<Self, Self::Error> {
502        match type_ {
503            fnet_filter::RoutineType::Ip(fnet_filter::IpRoutine {
504                installation,
505                __source_breaking,
506            }) => Ok(RoutineType::Ip(installation.map(TryInto::try_into).transpose()?)),
507            fnet_filter::RoutineType::Nat(fnet_filter::NatRoutine {
508                installation,
509                __source_breaking,
510            }) => Ok(RoutineType::Nat(installation.map(TryInto::try_into).transpose()?)),
511            fnet_filter::RoutineType::__SourceBreaking { .. } => {
512                Err(FidlConversionError::UnknownUnionVariant(type_names::ROUTINE_TYPE))
513            }
514        }
515    }
516}
517
518/// Extension type for [`fnet_filter::Routine`].
519#[derive(Debug, Clone, PartialEq)]
520pub struct Routine {
521    pub id: RoutineId,
522    pub routine_type: RoutineType,
523}
524
525impl From<Routine> for fnet_filter::Routine {
526    fn from(routine: Routine) -> Self {
527        let Routine { id, routine_type: type_ } = routine;
528        Self { id: Some(id.into()), type_: Some(type_.into()), __source_breaking: SourceBreaking }
529    }
530}
531
532impl TryFrom<fnet_filter::Routine> for Routine {
533    type Error = FidlConversionError;
534
535    fn try_from(routine: fnet_filter::Routine) -> Result<Self, Self::Error> {
536        let fnet_filter::Routine { id, type_, __source_breaking } = routine;
537        let id = id.ok_or(FidlConversionError::MissingRoutineId)?;
538        let type_ = type_.ok_or(FidlConversionError::MissingRoutineType)?;
539        Ok(Self { id: id.into(), routine_type: type_.try_into()? })
540    }
541}
542
543/// Extension type for [`fnet_filter::Matchers`].
544#[derive(Default, Clone, PartialEq)]
545pub struct Matchers {
546    pub in_interface: Option<fnet_matchers_ext::Interface>,
547    pub out_interface: Option<fnet_matchers_ext::Interface>,
548    pub src_addr: Option<fnet_matchers_ext::Address>,
549    pub dst_addr: Option<fnet_matchers_ext::Address>,
550    pub transport_protocol: Option<fnet_matchers_ext::TransportProtocol>,
551    pub ebpf_program: Option<febpf::ProgramId>,
552}
553
554impl From<Matchers> for fnet_filter::Matchers {
555    fn from(matchers: Matchers) -> Self {
556        let Matchers {
557            in_interface,
558            out_interface,
559            src_addr,
560            dst_addr,
561            transport_protocol,
562            ebpf_program,
563        } = matchers;
564        Self {
565            in_interface: in_interface.map(Into::into),
566            out_interface: out_interface.map(Into::into),
567            src_addr: src_addr.map(Into::into),
568            dst_addr: dst_addr.map(Into::into),
569            transport_protocol: transport_protocol.map(Into::into),
570            ebpf_program: ebpf_program.map(Into::into),
571            __source_breaking: SourceBreaking,
572        }
573    }
574}
575
576impl TryFrom<fnet_filter::Matchers> for Matchers {
577    type Error = FidlConversionError;
578
579    fn try_from(matchers: fnet_filter::Matchers) -> Result<Self, Self::Error> {
580        let fnet_filter::Matchers {
581            in_interface,
582            out_interface,
583            src_addr,
584            dst_addr,
585            transport_protocol,
586            ebpf_program,
587            __source_breaking,
588        } = matchers;
589        Ok(Self {
590            in_interface: in_interface.map(TryInto::try_into).transpose()?,
591            out_interface: out_interface.map(TryInto::try_into).transpose()?,
592            src_addr: src_addr.map(TryInto::try_into).transpose()?,
593            dst_addr: dst_addr.map(TryInto::try_into).transpose()?,
594            transport_protocol: transport_protocol.map(TryInto::try_into).transpose()?,
595            ebpf_program: ebpf_program.map(Into::into),
596        })
597    }
598}
599
600impl Debug for Matchers {
601    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
602        let mut debug_struct = f.debug_struct("Matchers");
603
604        let Matchers {
605            in_interface,
606            out_interface,
607            src_addr,
608            dst_addr,
609            transport_protocol,
610            ebpf_program,
611        } = &self;
612
613        // Omit empty fields.
614        if let Some(matcher) = in_interface {
615            let _ = debug_struct.field("in_interface", matcher);
616        }
617
618        if let Some(matcher) = out_interface {
619            let _ = debug_struct.field("out_interface", matcher);
620        }
621
622        if let Some(matcher) = src_addr {
623            let _ = debug_struct.field("src_addr", matcher);
624        }
625
626        if let Some(matcher) = dst_addr {
627            let _ = debug_struct.field("dst_addr", matcher);
628        }
629
630        if let Some(matcher) = transport_protocol {
631            let _ = debug_struct.field("transport_protocol", matcher);
632        }
633
634        if let Some(matcher) = ebpf_program {
635            let _ = debug_struct.field("ebpf_program", matcher);
636        }
637
638        debug_struct.finish()
639    }
640}
641
642/// Extension type for [`fnet_filter::Action`].
643#[derive(Debug, Clone, PartialEq)]
644pub enum Action {
645    Accept,
646    Drop,
647    Jump(String),
648    Return,
649    TransparentProxy(TransparentProxy),
650    Redirect { dst_port: Option<PortRange> },
651    Masquerade { src_port: Option<PortRange> },
652    Mark { domain: fnet::MarkDomain, action: MarkAction },
653    None,
654    Reject(RejectType),
655}
656
657#[derive(Debug, Clone, PartialEq)]
658pub enum MarkAction {
659    SetMark { clearing_mask: fnet::Mark, mark: fnet::Mark },
660}
661
662/// Extension type for [`fnet_filter::TransparentProxy_`].
663#[derive(Debug, Clone, PartialEq)]
664pub enum TransparentProxy {
665    LocalAddr(fnet::IpAddress),
666    LocalPort(NonZeroU16),
667    LocalAddrAndPort(fnet::IpAddress, NonZeroU16),
668}
669
/// Extension type for [`fnet_filter::PortRange`]: an inclusive range of
/// non-zero ports.
#[derive(Clone, Debug, PartialEq)]
pub struct PortRange(pub RangeInclusive<NonZeroU16>);
672
673impl From<PortRange> for fnet_filter::PortRange {
674    fn from(range: PortRange) -> Self {
675        let PortRange(range) = range;
676        Self { start: range.start().get(), end: range.end().get() }
677    }
678}
679
680impl TryFrom<fnet_filter::PortRange> for PortRange {
681    type Error = FidlConversionError;
682
683    fn try_from(range: fnet_filter::PortRange) -> Result<Self, Self::Error> {
684        let fnet_filter::PortRange { start, end } = range;
685        if start > end {
686            Err(FidlConversionError::InvalidPortRange)
687        } else {
688            let start = NonZeroU16::new(start).ok_or(FidlConversionError::UnspecifiedNatPort)?;
689            let end = NonZeroU16::new(end).ok_or(FidlConversionError::UnspecifiedNatPort)?;
690            Ok(Self(start..=end))
691        }
692    }
693}
694
/// Extension type for [`fnet_filter::RejectType`], the payload of
/// `Action::Reject`.
///
/// Each variant corresponds to the FIDL variant of the same name.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RejectType {
    TcpReset,
    NetUnreachable,
    HostUnreachable,
    ProtoUnreachable,
    PortUnreachable,
    RoutePolicyFail,
    RejectRoute,
    AdminProhibited,
}
706
707impl From<RejectType> for fnet_filter::RejectType {
708    fn from(value: RejectType) -> Self {
709        match value {
710            RejectType::TcpReset => fnet_filter::RejectType::TcpReset,
711            RejectType::NetUnreachable => fnet_filter::RejectType::NetUnreachable,
712            RejectType::HostUnreachable => fnet_filter::RejectType::HostUnreachable,
713            RejectType::ProtoUnreachable => fnet_filter::RejectType::ProtoUnreachable,
714            RejectType::PortUnreachable => fnet_filter::RejectType::PortUnreachable,
715            RejectType::RoutePolicyFail => fnet_filter::RejectType::RoutePolicyFail,
716            RejectType::RejectRoute => fnet_filter::RejectType::RejectRoute,
717            RejectType::AdminProhibited => fnet_filter::RejectType::AdminProhibited,
718        }
719    }
720}
721
722impl TryFrom<fnet_filter::RejectType> for RejectType {
723    type Error = FidlConversionError;
724
725    fn try_from(value: fnet_filter::RejectType) -> Result<Self, Self::Error> {
726        match value {
727            fnet_filter::RejectType::TcpReset => Ok(RejectType::TcpReset),
728            fnet_filter::RejectType::NetUnreachable => Ok(RejectType::NetUnreachable),
729            fnet_filter::RejectType::HostUnreachable => Ok(RejectType::HostUnreachable),
730            fnet_filter::RejectType::ProtoUnreachable => Ok(RejectType::ProtoUnreachable),
731            fnet_filter::RejectType::PortUnreachable => Ok(RejectType::PortUnreachable),
732            fnet_filter::RejectType::RoutePolicyFail => Ok(RejectType::RoutePolicyFail),
733            fnet_filter::RejectType::RejectRoute => Ok(RejectType::RejectRoute),
734            fnet_filter::RejectType::AdminProhibited => Ok(RejectType::AdminProhibited),
735            fnet_filter::RejectType::__SourceBreaking { .. } => {
736                Err(FidlConversionError::UnknownUnionVariant(type_names::REJECT_TYPE))
737            }
738        }
739    }
740}
741
742impl From<Action> for fnet_filter::Action {
743    fn from(action: Action) -> Self {
744        match action {
745            Action::Accept => Self::Accept(fnet_filter::Empty {}),
746            Action::Drop => Self::Drop(fnet_filter::Empty {}),
747            Action::Jump(target) => Self::Jump(target),
748            Action::Return => Self::Return_(fnet_filter::Empty {}),
749            Action::TransparentProxy(proxy) => Self::TransparentProxy(match proxy {
750                TransparentProxy::LocalAddr(addr) => {
751                    fnet_filter::TransparentProxy_::LocalAddr(addr)
752                }
753                TransparentProxy::LocalPort(port) => {
754                    fnet_filter::TransparentProxy_::LocalPort(port.get())
755                }
756                TransparentProxy::LocalAddrAndPort(addr, port) => {
757                    fnet_filter::TransparentProxy_::LocalAddrAndPort(fnet_filter::SocketAddr {
758                        addr,
759                        port: port.get(),
760                    })
761                }
762            }),
763            Action::Redirect { dst_port } => Self::Redirect(fnet_filter::Redirect {
764                dst_port: dst_port.map(Into::into),
765                __source_breaking: SourceBreaking,
766            }),
767            Action::Masquerade { src_port } => Self::Masquerade(fnet_filter::Masquerade {
768                src_port: src_port.map(Into::into),
769                __source_breaking: SourceBreaking,
770            }),
771            Action::Mark { domain, action } => {
772                Self::Mark(fnet_filter::Mark { domain, action: action.into() })
773            }
774            Action::None => Self::None(fnet_filter::Empty {}),
775            Action::Reject(reject_type) => {
776                Self::Reject(fnet_filter::Reject { reject_type: reject_type.into() })
777            }
778        }
779    }
780}
781
782impl TryFrom<fnet_filter::Action> for Action {
783    type Error = FidlConversionError;
784
785    fn try_from(action: fnet_filter::Action) -> Result<Self, Self::Error> {
786        match action {
787            fnet_filter::Action::Accept(fnet_filter::Empty {}) => Ok(Self::Accept),
788            fnet_filter::Action::Drop(fnet_filter::Empty {}) => Ok(Self::Drop),
789            fnet_filter::Action::Jump(target) => Ok(Self::Jump(target)),
790            fnet_filter::Action::Return_(fnet_filter::Empty {}) => Ok(Self::Return),
791            fnet_filter::Action::TransparentProxy(proxy) => {
792                Ok(Self::TransparentProxy(match proxy {
793                    fnet_filter::TransparentProxy_::LocalAddr(addr) => {
794                        TransparentProxy::LocalAddr(addr)
795                    }
796                    fnet_filter::TransparentProxy_::LocalPort(port) => {
797                        let port = NonZeroU16::new(port)
798                            .ok_or(FidlConversionError::UnspecifiedTransparentProxyPort)?;
799                        TransparentProxy::LocalPort(port)
800                    }
801                    fnet_filter::TransparentProxy_::LocalAddrAndPort(fnet_filter::SocketAddr {
802                        addr,
803                        port,
804                    }) => {
805                        let port = NonZeroU16::new(port)
806                            .ok_or(FidlConversionError::UnspecifiedTransparentProxyPort)?;
807                        TransparentProxy::LocalAddrAndPort(addr, port)
808                    }
809                    fnet_filter::TransparentProxy_::__SourceBreaking { .. } => {
810                        return Err(FidlConversionError::UnknownUnionVariant(
811                            type_names::TRANSPARENT_PROXY,
812                        ));
813                    }
814                }))
815            }
816            fnet_filter::Action::Redirect(fnet_filter::Redirect {
817                dst_port,
818                __source_breaking,
819            }) => Ok(Self::Redirect { dst_port: dst_port.map(TryInto::try_into).transpose()? }),
820            fnet_filter::Action::Masquerade(fnet_filter::Masquerade {
821                src_port,
822                __source_breaking,
823            }) => Ok(Self::Masquerade { src_port: src_port.map(TryInto::try_into).transpose()? }),
824            fnet_filter::Action::Mark(fnet_filter::Mark { domain, action }) => {
825                Ok(Self::Mark { domain, action: action.try_into()? })
826            }
827            fnet_filter::Action::__SourceBreaking { .. } => {
828                Err(FidlConversionError::UnknownUnionVariant(type_names::ACTION))
829            }
830            fnet_filter::Action::None(fnet_filter::Empty {}) => Ok(Self::None),
831            fnet_filter::Action::Reject(fnet_filter::Reject { reject_type }) => {
832                Ok(Self::Reject(reject_type.try_into()?))
833            }
834        }
835    }
836}
837
838impl From<MarkAction> for fnet_filter::MarkAction {
839    fn from(action: MarkAction) -> Self {
840        match action {
841            MarkAction::SetMark { clearing_mask, mark } => {
842                Self::SetMark(fnet_filter::SetMark { clearing_mask, mark })
843            }
844        }
845    }
846}
847
848impl TryFrom<fnet_filter::MarkAction> for MarkAction {
849    type Error = FidlConversionError;
850    fn try_from(action: fnet_filter::MarkAction) -> Result<Self, Self::Error> {
851        match action {
852            fnet_filter::MarkAction::SetMark(fnet_filter::SetMark { clearing_mask, mark }) => {
853                Ok(Self::SetMark { clearing_mask, mark })
854            }
855            fnet_filter::MarkAction::__SourceBreaking { .. } => {
856                Err(FidlConversionError::UnknownUnionVariant(type_names::MARK_ACTION))
857            }
858        }
859    }
860}
861
862/// Extension type for [`fnet_filter::Rule`].
863#[derive(Debug, Clone, PartialEq)]
864pub struct Rule {
865    pub id: RuleId,
866    pub matchers: Matchers,
867    pub action: Action,
868}
869
870impl From<Rule> for fnet_filter::Rule {
871    fn from(rule: Rule) -> Self {
872        let Rule { id, matchers, action } = rule;
873        Self { id: id.into(), matchers: matchers.into(), action: action.into() }
874    }
875}
876
877impl TryFrom<fnet_filter::Rule> for Rule {
878    type Error = FidlConversionError;
879
880    fn try_from(rule: fnet_filter::Rule) -> Result<Self, Self::Error> {
881        let fnet_filter::Rule { id, matchers, action } = rule;
882        Ok(Self { id: id.into(), matchers: matchers.try_into()?, action: action.try_into()? })
883    }
884}
885
886/// Extension type for [`fnet_filter::Resource`].
887#[derive(Debug, Clone, PartialEq)]
888pub enum Resource {
889    Namespace(Namespace),
890    Routine(Routine),
891    Rule(Rule),
892}
893
894impl Resource {
895    pub fn id(&self) -> ResourceId {
896        match self {
897            Self::Namespace(Namespace { id, domain: _ }) => ResourceId::Namespace(id.clone()),
898            Self::Routine(Routine { id, routine_type: _ }) => ResourceId::Routine(id.clone()),
899            Self::Rule(Rule { id, matchers: _, action: _ }) => ResourceId::Rule(id.clone()),
900        }
901    }
902}
903
904impl From<Resource> for fnet_filter::Resource {
905    fn from(resource: Resource) -> Self {
906        match resource {
907            Resource::Namespace(namespace) => Self::Namespace(namespace.into()),
908            Resource::Routine(routine) => Self::Routine(routine.into()),
909            Resource::Rule(rule) => Self::Rule(rule.into()),
910        }
911    }
912}
913
914impl TryFrom<fnet_filter::Resource> for Resource {
915    type Error = FidlConversionError;
916
917    fn try_from(resource: fnet_filter::Resource) -> Result<Self, Self::Error> {
918        match resource {
919            fnet_filter::Resource::Namespace(namespace) => {
920                Ok(Self::Namespace(namespace.try_into()?))
921            }
922            fnet_filter::Resource::Routine(routine) => Ok(Self::Routine(routine.try_into()?)),
923            fnet_filter::Resource::Rule(rule) => Ok(Self::Rule(rule.try_into()?)),
924            fnet_filter::Resource::__SourceBreaking { .. } => {
925                Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE))
926            }
927        }
928    }
929}
930
/// Extension type for [`fnet_filter::ControllerId`].
///
/// A newtype around the controller's identifier string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ControllerId(pub String);
934
935/// Extension type for [`fnet_filter::Event`].
936#[derive(Debug, Clone, PartialEq)]
937pub enum Event {
938    Existing(ControllerId, Resource),
939    Idle,
940    Added(ControllerId, Resource),
941    Removed(ControllerId, ResourceId),
942    EndOfUpdate,
943}
944
945impl From<Event> for fnet_filter::Event {
946    fn from(event: Event) -> Self {
947        match event {
948            Event::Existing(controller, resource) => {
949                let ControllerId(id) = controller;
950                Self::Existing(fnet_filter::ExistingResource {
951                    controller: id,
952                    resource: resource.into(),
953                })
954            }
955            Event::Idle => Self::Idle(fnet_filter::Empty {}),
956            Event::Added(controller, resource) => {
957                let ControllerId(id) = controller;
958                Self::Added(fnet_filter::AddedResource {
959                    controller: id,
960                    resource: resource.into(),
961                })
962            }
963            Event::Removed(controller, resource) => {
964                let ControllerId(id) = controller;
965                Self::Removed(fnet_filter::RemovedResource {
966                    controller: id,
967                    resource: resource.into(),
968                })
969            }
970            Event::EndOfUpdate => Self::EndOfUpdate(fnet_filter::Empty {}),
971        }
972    }
973}
974
975impl TryFrom<fnet_filter::Event> for Event {
976    type Error = FidlConversionError;
977
978    fn try_from(event: fnet_filter::Event) -> Result<Self, Self::Error> {
979        match event {
980            fnet_filter::Event::Existing(fnet_filter::ExistingResource {
981                controller,
982                resource,
983            }) => Ok(Self::Existing(ControllerId(controller), resource.try_into()?)),
984            fnet_filter::Event::Idle(fnet_filter::Empty {}) => Ok(Self::Idle),
985            fnet_filter::Event::Added(fnet_filter::AddedResource { controller, resource }) => {
986                Ok(Self::Added(ControllerId(controller), resource.try_into()?))
987            }
988            fnet_filter::Event::Removed(fnet_filter::RemovedResource { controller, resource }) => {
989                Ok(Self::Removed(ControllerId(controller), resource.try_into()?))
990            }
991            fnet_filter::Event::EndOfUpdate(fnet_filter::Empty {}) => Ok(Self::EndOfUpdate),
992            fnet_filter::Event::__SourceBreaking { .. } => {
993                Err(FidlConversionError::UnknownUnionVariant(type_names::EVENT))
994            }
995        }
996    }
997}
998
999/// Filter watcher creation errors.
1000#[derive(Debug, Error)]
1001pub enum WatcherCreationError {
1002    #[error("failed to create filter watcher proxy: {0}")]
1003    CreateProxy(fidl::Error),
1004    #[error("failed to get filter watcher: {0}")]
1005    GetWatcher(fidl::Error),
1006}
1007
1008/// Filter watcher `Watch` errors.
1009#[derive(Debug, Error)]
1010pub enum WatchError {
1011    /// The call to `Watch` returned a FIDL error.
1012    #[error("the call to `Watch()` failed: {0}")]
1013    Fidl(fidl::Error),
1014    /// The event returned by `Watch` encountered a conversion error.
1015    #[error("failed to convert event returned by `Watch()`: {0}")]
1016    Conversion(FidlConversionError),
1017    /// The server returned an empty batch of events.
1018    #[error("the call to `Watch()` returned an empty batch of events")]
1019    EmptyEventBatch,
1020}
1021
1022/// Connects to the watcher protocol and converts the Hanging-Get style API into
1023/// an Event stream.
1024///
1025/// Each call to `Watch` returns a batch of events, which are flattened into a
1026/// single stream. If an error is encountered while calling `Watch` or while
1027/// converting the event, the stream is immediately terminated.
1028pub fn event_stream_from_state(
1029    state: fnet_filter::StateProxy,
1030) -> Result<impl Stream<Item = Result<Event, WatchError>>, WatcherCreationError> {
1031    let (watcher, server_end) = fidl::endpoints::create_proxy::<fnet_filter::WatcherMarker>();
1032    state
1033        .get_watcher(&fnet_filter::WatcherOptions::default(), server_end)
1034        .map_err(WatcherCreationError::GetWatcher)?;
1035
1036    let stream = futures::stream::try_unfold(watcher, |watcher| async {
1037        let events = watcher.watch().await.map_err(WatchError::Fidl)?;
1038        if events.is_empty() {
1039            return Err(WatchError::EmptyEventBatch);
1040        }
1041
1042        let event_stream = futures::stream::iter(events).map(Ok).and_then(|event| {
1043            futures::future::ready(event.try_into().map_err(WatchError::Conversion))
1044        });
1045        Ok(Some((event_stream, watcher)))
1046    })
1047    .try_flatten();
1048
1049    Ok(stream)
1050}
1051
1052/// Errors returned by [`get_existing_resources`].
1053#[derive(Debug, Error)]
1054pub enum GetExistingResourcesError {
1055    /// There was an error in the event stream.
1056    #[error("there was an error in the event stream: {0}")]
1057    ErrorInStream(WatchError),
1058    /// There was an unexpected event in the event stream. Only `existing` or
1059    /// `idle` events are expected.
1060    #[error("there was an unexpected event in the event stream: {0:?}")]
1061    UnexpectedEvent(Event),
1062    /// A duplicate existing resource was reported in the event stream.
1063    #[error("a duplicate existing resource was reported")]
1064    DuplicateResource(Resource),
1065    /// The event stream unexpectedly ended.
1066    #[error("the event stream unexpectedly ended")]
1067    StreamEnded,
1068}
1069
1070/// A trait for types holding filtering state that can be updated by change
1071/// events.
1072pub trait Update {
1073    /// Add the resource to the specified controller's state.
1074    ///
1075    /// Optionally returns a resource that has already been added to the
1076    /// controller with the same [`ResourceId`].
1077    fn add(&mut self, controller: ControllerId, resource: Resource) -> Option<Resource>;
1078
1079    /// Remove the resource from the specified controller's state.
1080    ///
1081    /// Returns the removed resource, if present.
1082    fn remove(&mut self, controller: ControllerId, resource: &ResourceId) -> Option<Resource>;
1083}
1084
1085impl Update for HashMap<ControllerId, HashMap<ResourceId, Resource>> {
1086    fn add(&mut self, controller: ControllerId, resource: Resource) -> Option<Resource> {
1087        self.entry(controller).or_default().insert(resource.id(), resource)
1088    }
1089
1090    fn remove(&mut self, controller: ControllerId, resource: &ResourceId) -> Option<Resource> {
1091        self.get_mut(&controller)?.remove(resource)
1092    }
1093}
1094
1095/// Collects all `existing` events from the stream, stopping once the `idle`
1096/// event is observed.
1097pub async fn get_existing_resources<C: Update + Default>(
1098    stream: impl Stream<Item = Result<Event, WatchError>>,
1099) -> Result<C, GetExistingResourcesError> {
1100    async_utils::fold::fold_while(
1101        stream,
1102        Ok(C::default()),
1103        |resources: Result<C, GetExistingResourcesError>, event| {
1104            let mut resources =
1105                resources.expect("`resources` must be `Ok`, because we stop folding on err");
1106            futures::future::ready(match event {
1107                Err(e) => FoldWhile::Done(Err(GetExistingResourcesError::ErrorInStream(e))),
1108                Ok(e) => match e {
1109                    Event::Existing(controller, resource) => {
1110                        if let Some(resource) = resources.add(controller, resource) {
1111                            FoldWhile::Done(Err(GetExistingResourcesError::DuplicateResource(
1112                                resource,
1113                            )))
1114                        } else {
1115                            FoldWhile::Continue(Ok(resources))
1116                        }
1117                    }
1118                    Event::Idle => FoldWhile::Done(Ok(resources)),
1119                    e @ (Event::Added(_, _) | Event::Removed(_, _) | Event::EndOfUpdate) => {
1120                        FoldWhile::Done(Err(GetExistingResourcesError::UnexpectedEvent(e)))
1121                    }
1122                },
1123            })
1124        },
1125    )
1126    .await
1127    .short_circuited()
1128    .map_err(|_resources| GetExistingResourcesError::StreamEnded)?
1129}
1130
1131/// Errors returned by [`wait_for_condition`].
1132#[derive(Debug, Error)]
1133pub enum WaitForConditionError {
1134    /// There was an error in the event stream.
1135    #[error("there was an error in the event stream: {0}")]
1136    ErrorInStream(WatchError),
1137    /// There was an `Added` event for an already existing resource.
1138    #[error("observed an added event for an already existing resource: {0:?}")]
1139    AddedAlreadyExisting(Resource),
1140    /// There was a `Removed` event for a non-existent resource.
1141    #[error("observed a removed event for a non-existent resource: {0:?}")]
1142    RemovedNonExistent(ResourceId),
1143    /// The event stream unexpectedly ended.
1144    #[error("the event stream unexpectedly ended")]
1145    StreamEnded,
1146}
1147
1148/// Wait for a condition on filtering state to be satisfied.
1149///
1150/// With the given `initial_state`, take events from `event_stream` and update
1151/// the state, calling `predicate` whenever the state changes. When predicates
1152/// returns `True` yield `Ok(())`.
1153pub async fn wait_for_condition<
1154    C: Update,
1155    S: Stream<Item = Result<Event, WatchError>>,
1156    F: Fn(&C) -> bool,
1157>(
1158    event_stream: S,
1159    initial_state: &mut C,
1160    predicate: F,
1161) -> Result<(), WaitForConditionError> {
1162    async_utils::fold::try_fold_while(
1163        event_stream.map_err(WaitForConditionError::ErrorInStream),
1164        initial_state,
1165        |resources: &mut C, event| {
1166            futures::future::ready(match event {
1167                Event::Existing(controller, resource) | Event::Added(controller, resource) => {
1168                    if let Some(resource) = resources.add(controller, resource) {
1169                        Err(WaitForConditionError::AddedAlreadyExisting(resource))
1170                    } else {
1171                        Ok(FoldWhile::Continue(resources))
1172                    }
1173                }
1174                Event::Removed(controller, resource) => resources
1175                    .remove(controller, &resource)
1176                    .map(|_| FoldWhile::Continue(resources))
1177                    .ok_or(WaitForConditionError::RemovedNonExistent(resource)),
1178                // Wait until a transactional update has been completed to call
1179                // the predicate so it's not run against partially-updated
1180                // state.
1181                Event::Idle | Event::EndOfUpdate => {
1182                    if predicate(&resources) {
1183                        Ok(FoldWhile::Done(()))
1184                    } else {
1185                        Ok(FoldWhile::Continue(resources))
1186                    }
1187                }
1188            })
1189        },
1190    )
1191    .await?
1192    .short_circuited()
1193    .map_err(|_resources: &mut C| WaitForConditionError::StreamEnded)
1194}
1195
1196/// Namespace controller creation errors.
1197#[derive(Debug, Error)]
1198pub enum ControllerCreationError {
1199    #[error("failed to create namespace controller proxy: {0}")]
1200    CreateProxy(fidl::Error),
1201    #[error("failed to open namespace controller: {0}")]
1202    OpenController(fidl::Error),
1203    #[error("server did not emit OnIdAssigned event")]
1204    NoIdAssigned,
1205    #[error("failed to observe ID assignment event: {0}")]
1206    IdAssignment(fidl::Error),
1207}
1208
1209/// Errors for individual changes pushed.
1210///
1211/// Extension type for the error variants of [`fnet_filter::ChangeValidationError`].
1212#[derive(Debug, Error, PartialEq)]
1213pub enum ChangeValidationError {
1214    #[error("change contains a resource that is missing a required field")]
1215    MissingRequiredField,
1216    #[error("rule specifies an invalid interface matcher")]
1217    InvalidInterfaceMatcher,
1218    #[error("rule specifies an invalid address matcher")]
1219    InvalidAddressMatcher,
1220    #[error("rule specifies an invalid port matcher")]
1221    InvalidPortMatcher,
1222    #[error("rule specifies an invalid transparent proxy action")]
1223    InvalidTransparentProxyAction,
1224    #[error("rule specifies an invalid NAT action")]
1225    InvalidNatAction,
1226    #[error("rule specifies an invalid port range")]
1227    InvalidPortRange,
1228}
1229
1230impl TryFrom<fnet_filter::ChangeValidationError> for ChangeValidationError {
1231    type Error = FidlConversionError;
1232
1233    fn try_from(error: fnet_filter::ChangeValidationError) -> Result<Self, Self::Error> {
1234        match error {
1235            fnet_filter::ChangeValidationError::MissingRequiredField => {
1236                Ok(Self::MissingRequiredField)
1237            }
1238            fnet_filter::ChangeValidationError::InvalidInterfaceMatcher => {
1239                Ok(Self::InvalidInterfaceMatcher)
1240            }
1241            fnet_filter::ChangeValidationError::InvalidAddressMatcher => {
1242                Ok(Self::InvalidAddressMatcher)
1243            }
1244            fnet_filter::ChangeValidationError::InvalidPortMatcher => Ok(Self::InvalidPortMatcher),
1245            fnet_filter::ChangeValidationError::InvalidTransparentProxyAction => {
1246                Ok(Self::InvalidTransparentProxyAction)
1247            }
1248            fnet_filter::ChangeValidationError::InvalidNatAction => Ok(Self::InvalidNatAction),
1249            fnet_filter::ChangeValidationError::InvalidPortRange => Ok(Self::InvalidPortRange),
1250            fnet_filter::ChangeValidationError::Ok
1251            | fnet_filter::ChangeValidationError::NotReached => {
1252                Err(FidlConversionError::NotAnError)
1253            }
1254            fnet_filter::ChangeValidationError::__SourceBreaking { unknown_ordinal: _ } => {
1255                Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE_VALIDATION_ERROR))
1256            }
1257        }
1258    }
1259}
1260
1261#[derive(Debug, Error)]
1262pub enum RegisterEbpfProgramError {
1263    #[error("failed to call FIDL method: {0}")]
1264    CallMethod(fidl::Error),
1265
1266    #[error("failed to link the program")]
1267    LinkFailed,
1268
1269    #[error("failed to initialize a map")]
1270    MapFailed,
1271
1272    #[error("the program is already registered")]
1273    AlreadyRegistered,
1274
1275    #[error("the request is missing a required field")]
1276    MissingRequiredField,
1277}
1278
1279impl From<fnet_filter::RegisterEbpfProgramError> for RegisterEbpfProgramError {
1280    fn from(error: fnet_filter::RegisterEbpfProgramError) -> Self {
1281        match error {
1282            fnet_filter::RegisterEbpfProgramError::LinkFailed => Self::LinkFailed,
1283            fnet_filter::RegisterEbpfProgramError::MapFailed => Self::MapFailed,
1284            fnet_filter::RegisterEbpfProgramError::AlreadyRegistered => Self::AlreadyRegistered,
1285            fnet_filter::RegisterEbpfProgramError::MissingRequiredField => {
1286                Self::MissingRequiredField
1287            }
1288        }
1289    }
1290}
1291
1292/// Errors for the NamespaceController.PushChanges method.
1293#[derive(Debug, Error)]
1294pub enum PushChangesError {
1295    #[error("failed to call FIDL method: {0}")]
1296    CallMethod(fidl::Error),
1297    #[error("too many changes were pushed to the server")]
1298    TooManyChanges,
1299    #[error("invalid change(s) pushed: {0:?}")]
1300    ErrorOnChange(Vec<(Change, ChangeValidationError)>),
1301    #[error("unknown FIDL type: {0}")]
1302    FidlConversion(#[from] FidlConversionError),
1303}
1304
1305/// Errors for individual changes committed.
1306///
1307/// Extension type for the error variants of [`fnet_filter::CommitError`].
1308#[derive(Debug, Error, PartialEq)]
1309pub enum ChangeCommitError {
1310    #[error("the change referred to an unknown namespace")]
1311    NamespaceNotFound,
1312    #[error("the change referred to an unknown routine")]
1313    RoutineNotFound,
1314    #[error("the change referred to an unknown rule")]
1315    RuleNotFound,
1316    #[error("the specified resource already exists")]
1317    AlreadyExists,
1318    #[error("the change includes a rule that jumps to an installed routine")]
1319    TargetRoutineIsInstalled,
1320    #[error("the change includes an eBPF matcher with an invalid program ID")]
1321    InvalidEbpfProgramId,
1322}
1323
1324impl TryFrom<fnet_filter::CommitError> for ChangeCommitError {
1325    type Error = FidlConversionError;
1326
1327    fn try_from(error: fnet_filter::CommitError) -> Result<Self, Self::Error> {
1328        match error {
1329            fnet_filter::CommitError::NamespaceNotFound => Ok(Self::NamespaceNotFound),
1330            fnet_filter::CommitError::RoutineNotFound => Ok(Self::RoutineNotFound),
1331            fnet_filter::CommitError::RuleNotFound => Ok(Self::RuleNotFound),
1332            fnet_filter::CommitError::AlreadyExists => Ok(Self::AlreadyExists),
1333            fnet_filter::CommitError::TargetRoutineIsInstalled => {
1334                Ok(Self::TargetRoutineIsInstalled)
1335            }
1336            fnet_filter::CommitError::InvalidEbpfProgramId => Ok(Self::InvalidEbpfProgramId),
1337            fnet_filter::CommitError::Ok | fnet_filter::CommitError::NotReached => {
1338                Err(FidlConversionError::NotAnError)
1339            }
1340            fnet_filter::CommitError::__SourceBreaking { unknown_ordinal: _ } => {
1341                Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_ERROR))
1342            }
1343        }
1344    }
1345}
1346
1347/// Errors for the NamespaceController.Commit method.
1348#[derive(Debug, Error)]
1349pub enum CommitError {
1350    #[error("failed to call FIDL method: {0}")]
1351    CallMethod(fidl::Error),
1352    #[error("rule has a matcher that is unavailable in its context: {0:?}")]
1353    RuleWithInvalidMatcher(RuleId),
1354    #[error("rule has an action that is invalid for its routine: {0:?}")]
1355    RuleWithInvalidAction(RuleId),
1356    #[error("rule has a TransparentProxy action but not a valid transport protocol matcher: {0:?}")]
1357    TransparentProxyWithInvalidMatcher(RuleId),
1358    #[error(
1359        "rule has a Redirect action that specifies a destination port but not a valid transport \
1360        protocol matcher: {0:?}"
1361    )]
1362    RedirectWithInvalidMatcher(RuleId),
1363    #[error(
1364        "rule has a Masquerade action that specifies a source port but not a valid transport \
1365        protocol matcher: {0:?}"
1366    )]
1367    MasqueradeWithInvalidMatcher(RuleId),
1368    #[error("rule has a Reject action but not a valid transport protocol matcher: {0:?}")]
1369    RejectWithInvalidMatcher(RuleId),
1370    #[error("routine forms a cycle {0:?}")]
1371    CyclicalRoutineGraph(RoutineId),
1372    #[error("invalid change was pushed: {0:?}")]
1373    ErrorOnChange(Vec<(Change, ChangeCommitError)>),
1374    #[error("unknown FIDL type: {0}")]
1375    FidlConversion(#[from] FidlConversionError),
1376}
1377
1378/// Extension type for [`fnet_filter::Change`].
1379#[derive(Debug, Clone, PartialEq)]
1380pub enum Change {
1381    Create(Resource),
1382    Remove(ResourceId),
1383}
1384
1385impl From<Change> for fnet_filter::Change {
1386    fn from(change: Change) -> Self {
1387        match change {
1388            Change::Create(resource) => Self::Create(resource.into()),
1389            Change::Remove(resource) => Self::Remove(resource.into()),
1390        }
1391    }
1392}
1393
1394impl TryFrom<fnet_filter::Change> for Change {
1395    type Error = FidlConversionError;
1396
1397    fn try_from(change: fnet_filter::Change) -> Result<Self, Self::Error> {
1398        match change {
1399            fnet_filter::Change::Create(resource) => Ok(Self::Create(resource.try_into()?)),
1400            fnet_filter::Change::Remove(resource) => Ok(Self::Remove(resource.try_into()?)),
1401            fnet_filter::Change::__SourceBreaking { .. } => {
1402                Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE))
1403            }
1404        }
1405    }
1406}
1407
1408/// A controller for filtering state.
1409pub struct Controller {
1410    controller: fnet_filter::NamespaceControllerProxy,
1411    // The client provides an ID when creating a new controller, but the server
1412    // may need to assign a different ID to avoid conflicts; either way, the
1413    // server informs the client of the final `ControllerId` on creation.
1414    id: ControllerId,
1415    // Changes that have been pushed to the server but not yet committed. This
1416    // allows the `Controller` to report more informative errors by correlating
1417    // error codes with particular changes.
1418    pending_changes: Vec<Change>,
1419}
1420
1421impl Controller {
1422    pub async fn new_root(
1423        root: &fnet_root::FilterProxy,
1424        ControllerId(id): &ControllerId,
1425    ) -> Result<Self, ControllerCreationError> {
1426        let (controller, server_end) = fidl::endpoints::create_proxy();
1427        root.open_controller(id, server_end).map_err(ControllerCreationError::OpenController)?;
1428
1429        let fnet_filter::NamespaceControllerEvent::OnIdAssigned { id } = controller
1430            .take_event_stream()
1431            .next()
1432            .await
1433            .ok_or(ControllerCreationError::NoIdAssigned)?
1434            .map_err(ControllerCreationError::IdAssignment)?;
1435        Ok(Self { controller, id: ControllerId(id), pending_changes: Vec::new() })
1436    }
1437
1438    /// Creates a new `Controller`.
1439    ///
1440    /// Note that the provided `ControllerId` may need to be modified server-
1441    /// side to avoid collisions; to obtain the final ID assigned to the
1442    /// `Controller`, use the `id` method.
1443    pub async fn new(
1444        control: &fnet_filter::ControlProxy,
1445        ControllerId(id): &ControllerId,
1446    ) -> Result<Self, ControllerCreationError> {
1447        let (controller, server_end) = fidl::endpoints::create_proxy();
1448        control.open_controller(id, server_end).map_err(ControllerCreationError::OpenController)?;
1449
1450        let fnet_filter::NamespaceControllerEvent::OnIdAssigned { id } = controller
1451            .take_event_stream()
1452            .next()
1453            .await
1454            .ok_or(ControllerCreationError::NoIdAssigned)?
1455            .map_err(ControllerCreationError::IdAssignment)?;
1456        Ok(Self { controller, id: ControllerId(id), pending_changes: Vec::new() })
1457    }
1458
1459    pub fn id(&self) -> &ControllerId {
1460        &self.id
1461    }
1462
1463    pub async fn register_ebpf_program(
1464        &mut self,
1465        handle: febpf::ProgramHandle,
1466        program: febpf::VerifiedProgram,
1467    ) -> Result<(), RegisterEbpfProgramError> {
1468        self.controller
1469            .register_ebpf_program(handle, program)
1470            .await
1471            .map_err(RegisterEbpfProgramError::CallMethod)?
1472            .map_err(RegisterEbpfProgramError::from)
1473    }
1474
1475    pub async fn push_changes(&mut self, changes: Vec<Change>) -> Result<(), PushChangesError> {
1476        let fidl_changes = changes.iter().cloned().map(Into::into).collect::<Vec<_>>();
1477        let result = self
1478            .controller
1479            .push_changes(&fidl_changes)
1480            .await
1481            .map_err(PushChangesError::CallMethod)?;
1482        handle_change_validation_result(result, &changes)?;
1483        // Maintain a client-side copy of the pending changes we've pushed to
1484        // the server in order to provide better error messages if a commit
1485        // fails.
1486        self.pending_changes.extend(changes);
1487        Ok(())
1488    }
1489
1490    async fn commit_with_options(
1491        &mut self,
1492        options: fnet_filter::CommitOptions,
1493    ) -> Result<(), CommitError> {
1494        let committed_changes = std::mem::take(&mut self.pending_changes);
1495        let result = self.controller.commit(options).await.map_err(CommitError::CallMethod)?;
1496        handle_commit_result(result, committed_changes)
1497    }
1498
1499    pub async fn commit(&mut self) -> Result<(), CommitError> {
1500        self.commit_with_options(fnet_filter::CommitOptions::default()).await
1501    }
1502
1503    pub async fn commit_idempotent(&mut self) -> Result<(), CommitError> {
1504        self.commit_with_options(fnet_filter::CommitOptions {
1505            idempotent: Some(true),
1506            __source_breaking: SourceBreaking,
1507        })
1508        .await
1509    }
1510}
1511
1512pub(crate) fn handle_change_validation_result(
1513    change_validation_result: fnet_filter::ChangeValidationResult,
1514    changes: &Vec<Change>,
1515) -> Result<(), PushChangesError> {
1516    match change_validation_result {
1517        fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}) => Ok(()),
1518        fnet_filter::ChangeValidationResult::TooManyChanges(fnet_filter::Empty {}) => {
1519            Err(PushChangesError::TooManyChanges)
1520        }
1521        fnet_filter::ChangeValidationResult::ErrorOnChange(results) => {
1522            let errors: Result<_, PushChangesError> =
1523                changes.iter().zip(results).try_fold(Vec::new(), |mut errors, (change, result)| {
1524                    match result {
1525                        fnet_filter::ChangeValidationError::Ok
1526                        | fnet_filter::ChangeValidationError::NotReached => Ok(errors),
1527                        error @ (fnet_filter::ChangeValidationError::MissingRequiredField
1528                        | fnet_filter::ChangeValidationError::InvalidInterfaceMatcher
1529                        | fnet_filter::ChangeValidationError::InvalidAddressMatcher
1530                        | fnet_filter::ChangeValidationError::InvalidPortMatcher
1531                        | fnet_filter::ChangeValidationError::InvalidTransparentProxyAction
1532                        | fnet_filter::ChangeValidationError::InvalidNatAction
1533                        | fnet_filter::ChangeValidationError::InvalidPortRange) => {
1534                            let error = error
1535                                .try_into()
1536                                .expect("`Ok` and `NotReached` are handled in another arm");
1537                            errors.push((change.clone(), error));
1538                            Ok(errors)
1539                        }
1540                        fnet_filter::ChangeValidationError::__SourceBreaking { .. } => {
1541                            Err(FidlConversionError::UnknownUnionVariant(
1542                                type_names::CHANGE_VALIDATION_ERROR,
1543                            )
1544                            .into())
1545                        }
1546                    }
1547                });
1548            Err(PushChangesError::ErrorOnChange(errors?))
1549        }
1550        fnet_filter::ChangeValidationResult::__SourceBreaking { .. } => {
1551            Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE_VALIDATION_RESULT)
1552                .into())
1553        }
1554    }
1555}
1556
1557pub(crate) fn handle_commit_result(
1558    commit_result: fnet_filter::CommitResult,
1559    committed_changes: Vec<Change>,
1560) -> Result<(), CommitError> {
1561    match commit_result {
1562        fnet_filter::CommitResult::Ok(fnet_filter::Empty {}) => Ok(()),
1563        fnet_filter::CommitResult::RuleWithInvalidMatcher(rule_id) => {
1564            Err(CommitError::RuleWithInvalidMatcher(rule_id.into()))
1565        }
1566        fnet_filter::CommitResult::RuleWithInvalidAction(rule_id) => {
1567            Err(CommitError::RuleWithInvalidAction(rule_id.into()))
1568        }
1569        fnet_filter::CommitResult::TransparentProxyWithInvalidMatcher(rule_id) => {
1570            Err(CommitError::TransparentProxyWithInvalidMatcher(rule_id.into()))
1571        }
1572        fnet_filter::CommitResult::RedirectWithInvalidMatcher(rule_id) => {
1573            Err(CommitError::RedirectWithInvalidMatcher(rule_id.into()))
1574        }
1575        fnet_filter::CommitResult::MasqueradeWithInvalidMatcher(rule_id) => {
1576            Err(CommitError::MasqueradeWithInvalidMatcher(rule_id.into()))
1577        }
1578        fnet_filter::CommitResult::RejectWithInvalidMatcher(rule_id) => {
1579            Err(CommitError::RejectWithInvalidMatcher(rule_id.into()))
1580        }
1581        fnet_filter::CommitResult::CyclicalRoutineGraph(routine_id) => {
1582            Err(CommitError::CyclicalRoutineGraph(routine_id.into()))
1583        }
1584        fnet_filter::CommitResult::ErrorOnChange(results) => {
1585            let errors: Result<_, CommitError> = committed_changes
1586                .into_iter()
1587                .zip(results)
1588                .try_fold(Vec::new(), |mut errors, (change, result)| match result {
1589                    fnet_filter::CommitError::Ok | fnet_filter::CommitError::NotReached => {
1590                        Ok(errors)
1591                    }
1592                    error @ (fnet_filter::CommitError::NamespaceNotFound
1593                    | fnet_filter::CommitError::RoutineNotFound
1594                    | fnet_filter::CommitError::RuleNotFound
1595                    | fnet_filter::CommitError::AlreadyExists
1596                    | fnet_filter::CommitError::TargetRoutineIsInstalled
1597                    | fnet_filter::CommitError::InvalidEbpfProgramId) => {
1598                        let error = error
1599                            .try_into()
1600                            .expect("`Ok` and `NotReached` are handled in another arm");
1601                        errors.push((change, error));
1602                        Ok(errors)
1603                    }
1604                    fnet_filter::CommitError::__SourceBreaking { .. } => {
1605                        Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_ERROR)
1606                            .into())
1607                    }
1608                });
1609            Err(CommitError::ErrorOnChange(errors?))
1610        }
1611        fnet_filter::CommitResult::__SourceBreaking { .. } => {
1612            Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_RESULT).into())
1613        }
1614    }
1615}
1616
1617#[cfg(test)]
1618mod tests {
1619
1620    use assert_matches::assert_matches;
1621    use fidl_fuchsia_net_matchers as fnet_matchers;
1622    use futures::channel::mpsc;
1623    use futures::task::Poll;
1624    use futures::{FutureExt as _, SinkExt as _};
1625    use test_case::test_case;
1626
1627    use fidl_fuchsia_hardware_network as fhardware_network;
1628    use fidl_fuchsia_net_interfaces as fnet_interfaces;
1629
1630    use super::*;
1631
1632    #[test_case(
1633        fnet_filter::ResourceId::Namespace(String::from("namespace")),
1634        ResourceId::Namespace(NamespaceId(String::from("namespace")));
1635        "NamespaceId"
1636    )]
1637    #[test_case(fnet_filter::Domain::Ipv4, Domain::Ipv4; "Domain")]
1638    #[test_case(
1639        fnet_filter::Namespace {
1640            id: Some(String::from("namespace")),
1641            domain: Some(fnet_filter::Domain::Ipv4),
1642            ..Default::default()
1643        },
1644        Namespace { id: NamespaceId(String::from("namespace")), domain: Domain::Ipv4 };
1645        "Namespace"
1646    )]
1647    #[test_case(fnet_filter::IpInstallationHook::Egress, IpHook::Egress; "IpHook")]
1648    #[test_case(fnet_filter::NatInstallationHook::Egress, NatHook::Egress; "NatHook")]
1649    #[test_case(
1650        fnet_filter::InstalledIpRoutine {
1651            hook: Some(fnet_filter::IpInstallationHook::Egress),
1652            priority: Some(1),
1653            ..Default::default()
1654        },
1655        InstalledIpRoutine {
1656            hook: IpHook::Egress,
1657            priority: 1,
1658        };
1659        "InstalledIpRoutine"
1660    )]
1661    #[test_case(
1662        fnet_filter::RoutineType::Ip(fnet_filter::IpRoutine {
1663            installation: Some(fnet_filter::InstalledIpRoutine {
1664                hook: Some(fnet_filter::IpInstallationHook::LocalEgress),
1665                priority: Some(1),
1666                ..Default::default()
1667            }),
1668            ..Default::default()
1669        }),
1670        RoutineType::Ip(Some(InstalledIpRoutine { hook: IpHook::LocalEgress, priority: 1 }));
1671        "RoutineType"
1672    )]
1673    #[test_case(
1674        fnet_filter::Routine {
1675            id: Some(fnet_filter::RoutineId {
1676                namespace: String::from("namespace"),
1677                name: String::from("routine"),
1678            }),
1679            type_: Some(fnet_filter::RoutineType::Nat(fnet_filter::NatRoutine::default())),
1680            ..Default::default()
1681        },
1682        Routine {
1683            id: RoutineId {
1684                namespace: NamespaceId(String::from("namespace")),
1685                name: String::from("routine"),
1686            },
1687            routine_type: RoutineType::Nat(None),
1688        };
1689        "Routine"
1690    )]
1691    #[test_case(
1692        fnet_filter::Matchers {
1693            in_interface: Some(fnet_matchers::Interface::Name(String::from("wlan"))),
1694            transport_protocol: Some(fnet_matchers::PacketTransportProtocol::Tcp(fnet_matchers::TcpPacket {
1695                src_port: None,
1696                dst_port: Some(fnet_matchers::Port { start: 22, end: 22, invert: false }),
1697                ..Default::default()
1698            })),
1699            ..Default::default()
1700        },
1701        Matchers {
1702            in_interface: Some(fnet_matchers_ext::Interface::Name(String::from("wlan"))),
1703            transport_protocol: Some(fnet_matchers_ext::TransportProtocol::Tcp {
1704                src_port: None,
1705                dst_port: Some(fnet_matchers_ext::Port::new(22, 22, false).unwrap()),
1706            }),
1707            ..Default::default()
1708        };
1709        "Matchers"
1710    )]
1711    #[test_case(
1712        fnet_filter::Action::Accept(fnet_filter::Empty {}),
1713        Action::Accept;
1714        "Action"
1715    )]
1716    #[test_case(
1717        fnet_filter::Rule {
1718            id: fnet_filter::RuleId {
1719                routine: fnet_filter::RoutineId {
1720                    namespace: String::from("namespace"),
1721                    name: String::from("routine"),
1722                },
1723                index: 1,
1724            },
1725            matchers: fnet_filter::Matchers {
1726                transport_protocol: Some(fnet_matchers::PacketTransportProtocol::Icmp(
1727                    fnet_matchers::IcmpPacket::default()
1728                )),
1729                ..Default::default()
1730            },
1731            action: fnet_filter::Action::Drop(fnet_filter::Empty {}),
1732        },
1733        Rule {
1734            id: RuleId {
1735                routine: RoutineId {
1736                    namespace: NamespaceId(String::from("namespace")),
1737                    name: String::from("routine"),
1738                },
1739                index: 1,
1740            },
1741            matchers: Matchers {
1742                transport_protocol: Some(fnet_matchers_ext::TransportProtocol::Icmp),
1743                ..Default::default()
1744            },
1745            action: Action::Drop,
1746        };
1747        "Rule"
1748    )]
1749    #[test_case(
1750        fnet_filter::Resource::Namespace(fnet_filter::Namespace {
1751            id: Some(String::from("namespace")),
1752            domain: Some(fnet_filter::Domain::Ipv4),
1753            ..Default::default()
1754        }),
1755        Resource::Namespace(Namespace {
1756            id: NamespaceId(String::from("namespace")),
1757            domain: Domain::Ipv4
1758        });
1759        "Resource"
1760    )]
1761    #[test_case(
1762        fnet_filter::Event::EndOfUpdate(fnet_filter::Empty {}),
1763        Event::EndOfUpdate;
1764        "Event"
1765    )]
1766    #[test_case(
1767        fnet_filter::Change::Remove(fnet_filter::ResourceId::Namespace(String::from("namespace"))),
1768        Change::Remove(ResourceId::Namespace(NamespaceId(String::from("namespace"))));
1769        "Change"
1770    )]
1771    fn convert_from_fidl_and_back<F, E>(fidl_type: F, local_type: E)
1772    where
1773        E: TryFrom<F> + Clone + Debug + PartialEq,
1774        <E as TryFrom<F>>::Error: Debug + PartialEq,
1775        F: From<E> + Clone + Debug + PartialEq,
1776    {
1777        assert_eq!(fidl_type.clone().try_into(), Ok(local_type.clone()));
1778        assert_eq!(<_ as Into<F>>::into(local_type), fidl_type.clone());
1779    }
1780
1781    #[test]
1782    fn resource_id_try_from_unknown_variant() {
1783        assert_eq!(
1784            ResourceId::try_from(fnet_filter::ResourceId::__SourceBreaking { unknown_ordinal: 0 }),
1785            Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE_ID))
1786        );
1787    }
1788
1789    #[test]
1790    fn domain_try_from_unknown_variant() {
1791        assert_eq!(
1792            Domain::try_from(fnet_filter::Domain::__SourceBreaking { unknown_ordinal: 0 }),
1793            Err(FidlConversionError::UnknownUnionVariant(type_names::DOMAIN))
1794        );
1795    }
1796
1797    #[test]
1798    fn namespace_try_from_missing_properties() {
1799        assert_eq!(
1800            Namespace::try_from(fnet_filter::Namespace {
1801                id: None,
1802                domain: Some(fnet_filter::Domain::Ipv4),
1803                ..Default::default()
1804            }),
1805            Err(FidlConversionError::MissingNamespaceId)
1806        );
1807        assert_eq!(
1808            Namespace::try_from(fnet_filter::Namespace {
1809                id: Some(String::from("namespace")),
1810                domain: None,
1811                ..Default::default()
1812            }),
1813            Err(FidlConversionError::MissingNamespaceDomain)
1814        );
1815    }
1816
1817    #[test]
1818    fn ip_installation_hook_try_from_unknown_variant() {
1819        assert_eq!(
1820            IpHook::try_from(fnet_filter::IpInstallationHook::__SourceBreaking {
1821                unknown_ordinal: 0
1822            }),
1823            Err(FidlConversionError::UnknownUnionVariant(type_names::IP_INSTALLATION_HOOK))
1824        );
1825    }
1826
1827    #[test]
1828    fn nat_installation_hook_try_from_unknown_variant() {
1829        assert_eq!(
1830            NatHook::try_from(fnet_filter::NatInstallationHook::__SourceBreaking {
1831                unknown_ordinal: 0
1832            }),
1833            Err(FidlConversionError::UnknownUnionVariant(type_names::NAT_INSTALLATION_HOOK))
1834        );
1835    }
1836
1837    #[test]
1838    fn installed_ip_routine_try_from_missing_hook() {
1839        assert_eq!(
1840            InstalledIpRoutine::try_from(fnet_filter::InstalledIpRoutine {
1841                hook: None,
1842                ..Default::default()
1843            }),
1844            Err(FidlConversionError::MissingIpInstallationHook)
1845        );
1846    }
1847
1848    #[test]
1849    fn installed_nat_routine_try_from_missing_hook() {
1850        assert_eq!(
1851            InstalledNatRoutine::try_from(fnet_filter::InstalledNatRoutine {
1852                hook: None,
1853                ..Default::default()
1854            }),
1855            Err(FidlConversionError::MissingNatInstallationHook)
1856        );
1857    }
1858
1859    #[test]
1860    fn routine_type_try_from_unknown_variant() {
1861        assert_eq!(
1862            RoutineType::try_from(fnet_filter::RoutineType::__SourceBreaking {
1863                unknown_ordinal: 0
1864            }),
1865            Err(FidlConversionError::UnknownUnionVariant(type_names::ROUTINE_TYPE))
1866        );
1867    }
1868
1869    #[test]
1870    fn routine_try_from_missing_properties() {
1871        assert_eq!(
1872            Routine::try_from(fnet_filter::Routine { id: None, ..Default::default() }),
1873            Err(FidlConversionError::MissingRoutineId)
1874        );
1875        assert_eq!(
1876            Routine::try_from(fnet_filter::Routine {
1877                id: Some(fnet_filter::RoutineId {
1878                    namespace: String::from("namespace"),
1879                    name: String::from("routine"),
1880                }),
1881                type_: None,
1882                ..Default::default()
1883            }),
1884            Err(FidlConversionError::MissingRoutineType)
1885        );
1886    }
1887
1888    #[test_case(
1889        fnet_matchers_ext::PortError::InvalidPortRange =>
1890        FidlConversionError::InvalidPortMatcherRange
1891    )]
1892    #[test_case(
1893        fnet_matchers_ext::InterfaceError::ZeroId =>
1894        FidlConversionError::ZeroInterfaceId
1895    )]
1896    #[test_case(
1897        fnet_matchers_ext::InterfaceError::UnknownUnionVariant =>
1898        FidlConversionError::UnknownUnionVariant(type_names::INTERFACE_MATCHER)
1899    )]
1900    #[test_case(
1901        {
1902            let invalid_port_class = fnet_interfaces::PortClass::__SourceBreaking {
1903                unknown_ordinal: 0
1904            };
1905            let error = fnet_interfaces_ext::PortClass::try_from(
1906                invalid_port_class
1907            ).unwrap_err();
1908            fnet_matchers_ext::InterfaceError::UnknownPortClass(error)
1909        } =>
1910        FidlConversionError::UnknownUnionVariant(type_names::NET_INTERFACES_PORT_CLASS)
1911    )]
1912    #[test_case(
1913        {
1914            let invalid_port_class = fhardware_network::PortClass::__SourceBreaking {
1915                unknown_ordinal: 0
1916            };
1917            let error = fnet_interfaces_ext::PortClass::try_from(
1918                invalid_port_class
1919            ).unwrap_err();
1920            fnet_matchers_ext::InterfaceError::UnknownPortClass(
1921                fnet_interfaces_ext::UnknownPortClassError::HardwareNetwork(error))
1922        } =>
1923        FidlConversionError::UnknownUnionVariant(type_names::HARDWARE_NETWORK_PORT_CLASS)
1924    )]
1925    #[test_case(
1926        fnet_matchers_ext::SubnetError::PrefixTooLong =>
1927        FidlConversionError::SubnetPrefixTooLong
1928    )]
1929    #[test_case(
1930        fnet_matchers_ext::SubnetError::HostBitsSet =>
1931        FidlConversionError::SubnetHostBitsSet
1932    )]
1933    #[test_case(
1934        fnet_matchers_ext::AddressRangeError::Invalid =>
1935        FidlConversionError::InvalidAddressRange
1936    )]
1937    #[test_case(
1938        fnet_matchers_ext::AddressRangeError::FamilyMismatch =>
1939        FidlConversionError::AddressRangeFamilyMismatch
1940    )]
1941    #[test_case(
1942        fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1943            fnet_matchers_ext::SubnetError::PrefixTooLong) =>
1944        FidlConversionError::SubnetPrefixTooLong
1945    )]
1946    #[test_case(
1947        fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1948            fnet_matchers_ext::SubnetError::HostBitsSet) =>
1949        FidlConversionError::SubnetHostBitsSet
1950    )]
1951    #[test_case(
1952        fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1953            fnet_matchers_ext::AddressRangeError::Invalid) =>
1954        FidlConversionError::InvalidAddressRange
1955    )]
1956    #[test_case(
1957        fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1958            fnet_matchers_ext::AddressRangeError::FamilyMismatch) =>
1959        FidlConversionError::AddressRangeFamilyMismatch
1960    )]
1961    #[test_case(
1962        fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant =>
1963        FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
1964    )]
1965    #[test_case(
1966        fnet_matchers_ext::AddressError::AddressMatcherType(
1967            fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1968                fnet_matchers_ext::SubnetError::PrefixTooLong)) =>
1969        FidlConversionError::SubnetPrefixTooLong
1970    )]
1971    #[test_case(
1972        fnet_matchers_ext::AddressError::AddressMatcherType(
1973            fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1974                fnet_matchers_ext::SubnetError::HostBitsSet)) =>
1975        FidlConversionError::SubnetHostBitsSet
1976    )]
1977    #[test_case(
1978        fnet_matchers_ext::AddressError::AddressMatcherType(
1979            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1980                fnet_matchers_ext::AddressRangeError::Invalid)) =>
1981        FidlConversionError::InvalidAddressRange
1982    )]
1983    #[test_case(
1984        fnet_matchers_ext::AddressError::AddressMatcherType(
1985            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1986                fnet_matchers_ext::AddressRangeError::FamilyMismatch)) =>
1987        FidlConversionError::AddressRangeFamilyMismatch
1988    )]
1989    #[test_case(
1990        fnet_matchers_ext::AddressError::AddressMatcherType(
1991            fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant) =>
1992            FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
1993    )]
1994    #[test_case(
1995        fnet_matchers_ext::TransportProtocolError::Port(
1996            fnet_matchers_ext::PortError::InvalidPortRange) =>
1997        FidlConversionError::InvalidPortMatcherRange
1998    )]
1999    #[test_case(
2000        fnet_matchers_ext::TransportProtocolError::UnknownUnionVariant =>
2001            FidlConversionError::UnknownUnionVariant(type_names::TRANSPORT_PROTOCOL)
2002    )]
2003    fn fidl_error_from_matcher_error<E: Into<FidlConversionError>>(
2004        error: E,
2005    ) -> FidlConversionError {
2006        error.into()
2007    }
2008
2009    #[test]
2010    fn action_try_from_unknown_variant() {
2011        assert_eq!(
2012            Action::try_from(fnet_filter::Action::__SourceBreaking { unknown_ordinal: 0 }),
2013            Err(FidlConversionError::UnknownUnionVariant(type_names::ACTION))
2014        );
2015    }
2016
2017    #[test]
2018    fn resource_try_from_unknown_variant() {
2019        assert_eq!(
2020            Resource::try_from(fnet_filter::Resource::__SourceBreaking { unknown_ordinal: 0 }),
2021            Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE))
2022        );
2023    }
2024
2025    #[test]
2026    fn event_try_from_unknown_variant() {
2027        assert_eq!(
2028            Event::try_from(fnet_filter::Event::__SourceBreaking { unknown_ordinal: 0 }),
2029            Err(FidlConversionError::UnknownUnionVariant(type_names::EVENT))
2030        );
2031    }
2032
2033    #[test]
2034    fn change_try_from_unknown_variant() {
2035        assert_eq!(
2036            Change::try_from(fnet_filter::Change::__SourceBreaking { unknown_ordinal: 0 }),
2037            Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE))
2038        );
2039    }
2040
2041    fn test_controller_a() -> ControllerId {
2042        ControllerId(String::from("test-controller-a"))
2043    }
2044
2045    fn test_controller_b() -> ControllerId {
2046        ControllerId(String::from("test-controller-b"))
2047    }
2048
2049    pub(crate) fn test_resource_id() -> ResourceId {
2050        ResourceId::Namespace(NamespaceId(String::from("test-namespace")))
2051    }
2052
2053    pub(crate) fn test_resource() -> Resource {
2054        Resource::Namespace(Namespace {
2055            id: NamespaceId(String::from("test-namespace")),
2056            domain: Domain::AllIp,
2057        })
2058    }
2059
2060    // We can't easily create an invalid resource, so we just pretend and fake
2061    // the server response in tests.
2062    pub(crate) fn pretend_invalid_resource() -> Resource {
2063        Resource::Namespace(Namespace {
2064            id: NamespaceId(String::from("pretend-invalid-namespace")),
2065            domain: Domain::AllIp,
2066        })
2067    }
2068
2069    pub(crate) fn unknown_resource_id() -> ResourceId {
2070        ResourceId::Namespace(NamespaceId(String::from("does-not-exist")))
2071    }
2072
2073    #[fuchsia_async::run_singlethreaded(test)]
2074    async fn event_stream_from_state_conversion_error() {
2075        let (proxy, mut request_stream) =
2076            fidl::endpoints::create_proxy_and_stream::<fnet_filter::StateMarker>();
2077        let stream = event_stream_from_state(proxy).expect("get event stream");
2078        futures::pin_mut!(stream);
2079
2080        let send_invalid_event = async {
2081            let fnet_filter::StateRequest::GetWatcher { options: _, request, control_handle: _ } =
2082                request_stream
2083                    .next()
2084                    .await
2085                    .expect("client should call state")
2086                    .expect("request should not error");
2087            let fnet_filter::WatcherRequest::Watch { responder } = request
2088                .into_stream()
2089                .next()
2090                .await
2091                .expect("client should call watch")
2092                .expect("request should not error");
2093            responder
2094                .send(&[fnet_filter::Event::Added(fnet_filter::AddedResource {
2095                    controller: String::from("controller"),
2096                    resource: fnet_filter::Resource::Namespace(fnet_filter::Namespace {
2097                        id: None,
2098                        domain: None,
2099                        ..Default::default()
2100                    }),
2101                })])
2102                .expect("send batch with invalid event");
2103        };
2104        let ((), result) = futures::future::join(send_invalid_event, stream.next()).await;
2105        assert_matches!(
2106            result,
2107            Some(Err(WatchError::Conversion(FidlConversionError::MissingNamespaceId)))
2108        );
2109    }
2110
2111    #[fuchsia_async::run_singlethreaded(test)]
2112    async fn event_stream_from_state_empty_event_batch() {
2113        let (proxy, mut request_stream) =
2114            fidl::endpoints::create_proxy_and_stream::<fnet_filter::StateMarker>();
2115        let stream = event_stream_from_state(proxy).expect("get event stream");
2116        futures::pin_mut!(stream);
2117
2118        let send_empty_batch = async {
2119            let fnet_filter::StateRequest::GetWatcher { options: _, request, control_handle: _ } =
2120                request_stream
2121                    .next()
2122                    .await
2123                    .expect("client should call state")
2124                    .expect("request should not error");
2125            let fnet_filter::WatcherRequest::Watch { responder } = request
2126                .into_stream()
2127                .next()
2128                .await
2129                .expect("client should call watch")
2130                .expect("request should not error");
2131            responder.send(&[]).expect("send empty batch");
2132        };
2133        let ((), result) = futures::future::join(send_empty_batch, stream.next()).await;
2134        assert_matches!(result, Some(Err(WatchError::EmptyEventBatch)));
2135    }
2136
2137    #[fuchsia_async::run_singlethreaded(test)]
2138    async fn get_existing_resources_success() {
2139        let event_stream = futures::stream::iter([
2140            Ok(Event::Existing(test_controller_a(), test_resource())),
2141            Ok(Event::Existing(test_controller_b(), test_resource())),
2142            Ok(Event::Idle),
2143            Ok(Event::Removed(test_controller_a(), test_resource_id())),
2144        ]);
2145        futures::pin_mut!(event_stream);
2146
2147        let existing = get_existing_resources::<HashMap<_, _>>(event_stream.by_ref())
2148            .await
2149            .expect("get existing resources");
2150        assert_eq!(
2151            existing,
2152            HashMap::from([
2153                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2154                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2155            ])
2156        );
2157
2158        let trailing_events = event_stream.collect::<Vec<_>>().await;
2159        assert_matches!(
2160            &trailing_events[..],
2161            [Ok(Event::Removed(controller, resource))] if controller == &test_controller_a() &&
2162                                                           resource == &test_resource_id()
2163        );
2164    }
2165
2166    #[fuchsia_async::run_singlethreaded(test)]
2167    async fn get_existing_resources_error_in_stream() {
2168        let event_stream =
2169            futures::stream::once(futures::future::ready(Err(WatchError::EmptyEventBatch)));
2170        futures::pin_mut!(event_stream);
2171        assert_matches!(
2172            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2173            Err(GetExistingResourcesError::ErrorInStream(WatchError::EmptyEventBatch))
2174        )
2175    }
2176
2177    #[fuchsia_async::run_singlethreaded(test)]
2178    async fn get_existing_resources_unexpected_event() {
2179        let event_stream = futures::stream::once(futures::future::ready(Ok(Event::EndOfUpdate)));
2180        futures::pin_mut!(event_stream);
2181        assert_matches!(
2182            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2183            Err(GetExistingResourcesError::UnexpectedEvent(Event::EndOfUpdate))
2184        )
2185    }
2186
2187    #[fuchsia_async::run_singlethreaded(test)]
2188    async fn get_existing_resources_duplicate_resource() {
2189        let event_stream = futures::stream::iter([
2190            Ok(Event::Existing(test_controller_a(), test_resource())),
2191            Ok(Event::Existing(test_controller_a(), test_resource())),
2192        ]);
2193        futures::pin_mut!(event_stream);
2194        assert_matches!(
2195            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2196            Err(GetExistingResourcesError::DuplicateResource(resource))
2197                if resource == test_resource()
2198        )
2199    }
2200
2201    #[fuchsia_async::run_singlethreaded(test)]
2202    async fn get_existing_resources_stream_ended() {
2203        let event_stream = futures::stream::once(futures::future::ready(Ok(Event::Existing(
2204            test_controller_a(),
2205            test_resource(),
2206        ))));
2207        futures::pin_mut!(event_stream);
2208        assert_matches!(
2209            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2210            Err(GetExistingResourcesError::StreamEnded)
2211        )
2212    }
2213
2214    #[fuchsia_async::run_singlethreaded(test)]
2215    async fn wait_for_condition_add_remove() {
2216        let mut state = HashMap::new();
2217
2218        // Verify that checking for the presence of a resource blocks until the
2219        // resource is added.
2220        let has_resource = |resources: &HashMap<_, HashMap<_, _>>| {
2221            resources.get(&test_controller_a()).map_or(false, |controller| {
2222                controller
2223                    .get(&test_resource_id())
2224                    .map_or(false, |resource| resource == &test_resource())
2225            })
2226        };
2227        assert_matches!(
2228            wait_for_condition(futures::stream::pending(), &mut state, has_resource).now_or_never(),
2229            None
2230        );
2231        assert!(state.is_empty());
2232        assert_matches!(
2233            wait_for_condition(
2234                futures::stream::iter([
2235                    Ok(Event::Added(test_controller_b(), test_resource())),
2236                    Ok(Event::EndOfUpdate),
2237                    Ok(Event::Added(test_controller_a(), test_resource())),
2238                    Ok(Event::EndOfUpdate),
2239                ]),
2240                &mut state,
2241                has_resource
2242            )
2243            .now_or_never(),
2244            Some(Ok(()))
2245        );
2246        assert_eq!(
2247            state,
2248            HashMap::from([
2249                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2250                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2251            ])
2252        );
2253
2254        // Re-add the resource and observe an error.
2255        assert_matches!(
2256            wait_for_condition(
2257                futures::stream::iter([
2258                    Ok(Event::Added(test_controller_a(), test_resource())),
2259                    Ok(Event::EndOfUpdate),
2260                ]),
2261                &mut state,
2262                has_resource
2263            )
2264            .now_or_never(),
2265            Some(Err(WaitForConditionError::AddedAlreadyExisting(r))) if r == test_resource()
2266        );
2267        assert_eq!(
2268            state,
2269            HashMap::from([
2270                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2271                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2272            ])
2273        );
2274
2275        // Verify that checking for the absence of a resource blocks until the
2276        // resource is removed.
2277        let does_not_have_resource = |resources: &HashMap<_, HashMap<_, _>>| {
2278            resources.get(&test_controller_a()).map_or(false, |controller| controller.is_empty())
2279        };
2280        assert_matches!(
2281            wait_for_condition(futures::stream::pending(), &mut state, does_not_have_resource)
2282                .now_or_never(),
2283            None
2284        );
2285        assert_eq!(
2286            state,
2287            HashMap::from([
2288                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2289                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2290            ])
2291        );
2292        assert_matches!(
2293            wait_for_condition(
2294                futures::stream::iter([
2295                    Ok(Event::Removed(test_controller_b(), test_resource_id())),
2296                    Ok(Event::EndOfUpdate),
2297                    Ok(Event::Removed(test_controller_a(), test_resource_id())),
2298                    Ok(Event::EndOfUpdate),
2299                ]),
2300                &mut state,
2301                does_not_have_resource
2302            )
2303            .now_or_never(),
2304            Some(Ok(()))
2305        );
2306        assert_eq!(
2307            state,
2308            HashMap::from([
2309                (test_controller_a(), HashMap::new()),
2310                (test_controller_b(), HashMap::new()),
2311            ])
2312        );
2313
2314        // Remove a non-existent resource and observe an error.
2315        assert_matches!(
2316            wait_for_condition(
2317                futures::stream::iter([
2318                    Ok(Event::Removed(test_controller_a(), test_resource_id())),
2319                    Ok(Event::EndOfUpdate),
2320                ]),
2321                &mut state,
2322                does_not_have_resource
2323            ).now_or_never(),
2324            Some(Err(WaitForConditionError::RemovedNonExistent(r))) if r == test_resource_id()
2325        );
2326        assert_eq!(
2327            state,
2328            HashMap::from([
2329                (test_controller_a(), HashMap::new()),
2330                (test_controller_b(), HashMap::new()),
2331            ])
2332        );
2333    }
2334
2335    #[test]
2336    fn predicate_not_tested_until_update_complete() {
2337        let mut state = HashMap::new();
2338        let (mut tx, rx) = mpsc::unbounded();
2339
2340        let wait = wait_for_condition(rx, &mut state, |state| !state.is_empty()).fuse();
2341        futures::pin_mut!(wait);
2342
2343        // Sending an `Added` event should *not* allow the wait operation to
2344        // complete, because the predicate should only be tested once the full
2345        // update has been observed.
2346        let mut exec = fuchsia_async::TestExecutor::new();
2347        exec.run_singlethreaded(async {
2348            tx.send(Ok(Event::Added(test_controller_a(), test_resource())))
2349                .await
2350                .expect("receiver should not be closed");
2351        });
2352        assert_matches!(exec.run_until_stalled(&mut wait), Poll::Pending);
2353
2354        exec.run_singlethreaded(async {
2355            tx.send(Ok(Event::EndOfUpdate)).await.expect("receiver should not be closed");
2356            wait.await.expect("condition should be satisfied once update is complete");
2357        });
2358    }
2359
2360    #[fuchsia_async::run_singlethreaded(test)]
2361    async fn wait_for_condition_error_in_stream() {
2362        let mut state = HashMap::new();
2363        let event_stream =
2364            futures::stream::once(futures::future::ready(Err(WatchError::EmptyEventBatch)));
2365        assert_matches!(
2366            wait_for_condition(event_stream, &mut state, |_| true).await,
2367            Err(WaitForConditionError::ErrorInStream(WatchError::EmptyEventBatch))
2368        );
2369        assert!(state.is_empty());
2370    }
2371
2372    #[fuchsia_async::run_singlethreaded(test)]
2373    async fn wait_for_condition_stream_ended() {
2374        let mut state = HashMap::new();
2375        let event_stream = futures::stream::empty();
2376        assert_matches!(
2377            wait_for_condition(event_stream, &mut state, |_| true).await,
2378            Err(WaitForConditionError::StreamEnded)
2379        );
2380        assert!(state.is_empty());
2381    }
2382
2383    pub(crate) async fn handle_open_controller(
2384        mut request_stream: fnet_filter::ControlRequestStream,
2385    ) -> fnet_filter::NamespaceControllerRequestStream {
2386        let (id, request, _control_handle) = request_stream
2387            .next()
2388            .await
2389            .expect("client should open controller")
2390            .expect("request should not error")
2391            .into_open_controller()
2392            .expect("client should open controller");
2393        let (stream, control_handle) = request.into_stream_and_control_handle();
2394        control_handle.send_on_id_assigned(&id).expect("send assigned ID");
2395
2396        stream
2397    }
2398
2399    pub(crate) async fn handle_push_changes(
2400        stream: &mut fnet_filter::NamespaceControllerRequestStream,
2401        push_changes_result: fnet_filter::ChangeValidationResult,
2402    ) {
2403        let (_changes, responder) = stream
2404            .next()
2405            .await
2406            .expect("client should push changes")
2407            .expect("request should not error")
2408            .into_push_changes()
2409            .expect("client should push changes");
2410        responder.send(push_changes_result).expect("send empty batch");
2411    }
2412
2413    pub(crate) async fn handle_commit(
2414        stream: &mut fnet_filter::NamespaceControllerRequestStream,
2415        commit_result: fnet_filter::CommitResult,
2416    ) {
2417        let (_options, responder) = stream
2418            .next()
2419            .await
2420            .expect("client should commit")
2421            .expect("request should not error")
2422            .into_commit()
2423            .expect("client should commit");
2424        responder.send(commit_result).expect("send commit result");
2425    }
2426
2427    #[fuchsia_async::run_singlethreaded(test)]
2428    async fn controller_push_changes_reports_invalid_change() {
2429        let (control, request_stream) =
2430            fidl::endpoints::create_proxy_and_stream::<fnet_filter::ControlMarker>();
2431        let push_invalid_change = async {
2432            let mut controller = Controller::new(&control, &ControllerId(String::from("test")))
2433                .await
2434                .expect("create controller");
2435            let result = controller
2436                .push_changes(vec![
2437                    Change::Create(test_resource()),
2438                    // We fake the server response to say this is invalid even
2439                    // though it really isn't.
2440                    Change::Create(pretend_invalid_resource()),
2441                    Change::Remove(test_resource_id()),
2442                ])
2443                .await;
2444            assert_matches!(
2445                result,
2446                Err(PushChangesError::ErrorOnChange(errors)) if errors == vec![(
2447                    Change::Create(pretend_invalid_resource()),
2448                    ChangeValidationError::InvalidPortMatcher
2449                )]
2450            );
2451        };
2452
2453        let handle_controller = async {
2454            let mut stream = handle_open_controller(request_stream).await;
2455            handle_push_changes(
2456                &mut stream,
2457                fnet_filter::ChangeValidationResult::ErrorOnChange(vec![
2458                    fnet_filter::ChangeValidationError::Ok,
2459                    fnet_filter::ChangeValidationError::InvalidPortMatcher,
2460                    fnet_filter::ChangeValidationError::NotReached,
2461                ]),
2462            )
2463            .await;
2464        };
2465
2466        let ((), ()) = futures::future::join(push_invalid_change, handle_controller).await;
2467    }
2468
2469    #[fuchsia_async::run_singlethreaded(test)]
2470    async fn controller_commit_reports_invalid_change() {
2471        let (control, request_stream) =
2472            fidl::endpoints::create_proxy_and_stream::<fnet_filter::ControlMarker>();
2473        let commit_invalid_change = async {
2474            let mut controller = Controller::new(&control, &ControllerId(String::from("test")))
2475                .await
2476                .expect("create controller");
2477            controller
2478                .push_changes(vec![
2479                    Change::Create(test_resource()),
2480                    Change::Remove(unknown_resource_id()),
2481                    Change::Remove(test_resource_id()),
2482                ])
2483                .await
2484                .expect("push changes");
2485            let result = controller.commit().await;
2486            assert_matches!(
2487                result,
2488                Err(CommitError::ErrorOnChange(errors)) if errors == vec![(
2489                    Change::Remove(unknown_resource_id()),
2490                    ChangeCommitError::NamespaceNotFound,
2491                )]
2492            );
2493        };
2494        let handle_controller = async {
2495            let mut stream = handle_open_controller(request_stream).await;
2496            handle_push_changes(
2497                &mut stream,
2498                fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}),
2499            )
2500            .await;
2501            handle_commit(
2502                &mut stream,
2503                fnet_filter::CommitResult::ErrorOnChange(vec![
2504                    fnet_filter::CommitError::Ok,
2505                    fnet_filter::CommitError::NamespaceNotFound,
2506                    fnet_filter::CommitError::Ok,
2507                ]),
2508            )
2509            .await;
2510        };
2511        let ((), ()) = futures::future::join(commit_invalid_change, handle_controller).await;
2512    }
2513}