fidl_fuchsia_net_filter_ext/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Extensions for the fuchsia.net.filter FIDL library.
6//!
7//! Note that this library as written is not meant for inclusion in the SDK. It
8//! is only meant to be used in conjunction with a netstack that is compiled
9//! against the same API level of the `fuchsia.net.filter` FIDL library. This
10//! library opts in to compile-time and runtime breakage when the FIDL library
11//! is evolved in order to enforce that it is updated along with the FIDL
12//! library itself.
13
14#[cfg(target_os = "fuchsia")]
15pub mod sync;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::num::{NonZeroU16, NonZeroU64};
20use std::ops::RangeInclusive;
21
22use async_utils::fold::FoldWhile;
23use fidl::marker::SourceBreaking;
24use fidl_fuchsia_net_ext::IntoExt;
25use futures::{Stream, StreamExt as _, TryStreamExt as _};
26use thiserror::Error;
27use {
28    fidl_fuchsia_net as fnet, fidl_fuchsia_net_filter as fnet_filter,
29    fidl_fuchsia_net_interfaces as fnet_interfaces,
30    fidl_fuchsia_net_interfaces_ext as fnet_interfaces_ext, fidl_fuchsia_net_root as fnet_root,
31};
32
33/// Conversion errors from `fnet_filter` FIDL types to the
34/// equivalents defined in this module.
35#[derive(Debug, Error, PartialEq)]
36pub enum FidlConversionError {
37    #[error("union is of an unknown variant: {0}")]
38    UnknownUnionVariant(&'static str),
39    #[error("namespace ID not provided")]
40    MissingNamespaceId,
41    #[error("namespace domain not provided")]
42    MissingNamespaceDomain,
43    #[error("routine ID not provided")]
44    MissingRoutineId,
45    #[error("routine type not provided")]
46    MissingRoutineType,
47    #[error("IP installation hook not provided")]
48    MissingIpInstallationHook,
49    #[error("NAT installation hook not provided")]
50    MissingNatInstallationHook,
51    #[error("interface matcher specified an invalid ID of 0")]
52    ZeroInterfaceId,
53    #[error("invalid address range (start must be <= end)")]
54    InvalidAddressRange,
55    #[error("address range start and end addresses are not the same IP family")]
56    AddressRangeFamilyMismatch,
57    #[error("prefix length of subnet is longer than number of bits in IP address")]
58    SubnetPrefixTooLong,
59    #[error("host bits are set in subnet network")]
60    SubnetHostBitsSet,
61    #[error("invalid port matcher range (start must be <= end)")]
62    InvalidPortMatcherRange,
63    #[error("transparent proxy action specified an invalid local port of 0")]
64    UnspecifiedTransparentProxyPort,
65    #[error("NAT action specified an invalid rewrite port of 0")]
66    UnspecifiedNatPort,
67    #[error("invalid port range (start must be <= end)")]
68    InvalidPortRange,
69    #[error("non-error result variant could not be converted to an error")]
70    NotAnError,
71}
72
73// TODO(https://fxbug.dev/317058051): remove this when the Rust FIDL bindings
74// expose constants for these.
75mod type_names {
76    pub(super) const RESOURCE_ID: &str = "fuchsia.net.filter/ResourceId";
77    pub(super) const DOMAIN: &str = "fuchsia.net.filter/Domain";
78    pub(super) const IP_INSTALLATION_HOOK: &str = "fuchsia.net.filter/IpInstallationHook";
79    pub(super) const NAT_INSTALLATION_HOOK: &str = "fuchsia.net.filter/NatInstallationHook";
80    pub(super) const ROUTINE_TYPE: &str = "fuchsia.net.filter/RoutineType";
81    pub(super) const INTERFACE_MATCHER: &str = "fuchsia.net.filter/InterfaceMatcher";
82    pub(super) const ADDRESS_MATCHER_TYPE: &str = "fuchsia.net.filter/AddressMatcherType";
83    pub(super) const TRANSPORT_PROTOCOL: &str = "fuchsia.net.filter/TransportProtocol";
84    pub(super) const ACTION: &str = "fuchsia.net.filter/Action";
85    pub(super) const TRANSPARENT_PROXY: &str = "fuchsia.net.filter/TransparentProxy";
86    pub(super) const RESOURCE: &str = "fuchsia.net.filter/Resource";
87    pub(super) const EVENT: &str = "fuchsia.net.filter/Event";
88    pub(super) const CHANGE: &str = "fuchsia.net.filter/Change";
89    pub(super) const CHANGE_VALIDATION_ERROR: &str = "fuchsia.net.filter/ChangeValidationError";
90    pub(super) const CHANGE_VALIDATION_RESULT: &str = "fuchsia.net.filter/ChangeValidationResult";
91    pub(super) const COMMIT_ERROR: &str = "fuchsia.net.filter/CommitError";
92    pub(super) const COMMIT_RESULT: &str = "fuchsia.net.filter/CommitResult";
93    pub(super) const NET_INTERFACES_PORT_CLASS: &str = "fuchsia.net.interfaces/PortClass";
94    pub(super) const HARDWARE_NETWORK_PORT_CLASS: &str = "fuchsia.hardware.network/PortClass";
95}
96
97/// Extension type for [`fnet_filter::NamespaceId`].
98#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)]
99pub struct NamespaceId(pub String);
100
101/// Extension type for [`fnet_filter::RoutineId`].
102#[derive(Debug, Clone, PartialEq, Eq, Hash)]
103pub struct RoutineId {
104    pub namespace: NamespaceId,
105    pub name: String,
106}
107
108impl From<fnet_filter::RoutineId> for RoutineId {
109    fn from(id: fnet_filter::RoutineId) -> Self {
110        let fnet_filter::RoutineId { namespace, name } = id;
111        Self { namespace: NamespaceId(namespace), name }
112    }
113}
114
115impl From<RoutineId> for fnet_filter::RoutineId {
116    fn from(id: RoutineId) -> Self {
117        let RoutineId { namespace, name } = id;
118        let NamespaceId(namespace) = namespace;
119        Self { namespace, name }
120    }
121}
122
123/// Extension type for [`fnet_filter::RuleId`].
124#[derive(Debug, Clone, PartialEq, Eq, Hash)]
125pub struct RuleId {
126    pub routine: RoutineId,
127    pub index: u32,
128}
129
130impl From<fnet_filter::RuleId> for RuleId {
131    fn from(id: fnet_filter::RuleId) -> Self {
132        let fnet_filter::RuleId { routine, index } = id;
133        Self { routine: routine.into(), index }
134    }
135}
136
137impl From<RuleId> for fnet_filter::RuleId {
138    fn from(id: RuleId) -> Self {
139        let RuleId { routine, index } = id;
140        Self { routine: routine.into(), index }
141    }
142}
143
144/// Extension type for [`fnet_filter::ResourceId`].
145#[derive(Debug, Clone, PartialEq, Eq, Hash)]
146pub enum ResourceId {
147    Namespace(NamespaceId),
148    Routine(RoutineId),
149    Rule(RuleId),
150}
151
152impl TryFrom<fnet_filter::ResourceId> for ResourceId {
153    type Error = FidlConversionError;
154
155    fn try_from(id: fnet_filter::ResourceId) -> Result<Self, Self::Error> {
156        match id {
157            fnet_filter::ResourceId::Namespace(id) => Ok(Self::Namespace(NamespaceId(id))),
158            fnet_filter::ResourceId::Routine(id) => Ok(Self::Routine(id.into())),
159            fnet_filter::ResourceId::Rule(id) => Ok(Self::Rule(id.into())),
160            fnet_filter::ResourceId::__SourceBreaking { .. } => {
161                Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE_ID))
162            }
163        }
164    }
165}
166
167impl From<ResourceId> for fnet_filter::ResourceId {
168    fn from(id: ResourceId) -> Self {
169        match id {
170            ResourceId::Namespace(NamespaceId(id)) => fnet_filter::ResourceId::Namespace(id),
171            ResourceId::Routine(id) => fnet_filter::ResourceId::Routine(id.into()),
172            ResourceId::Rule(id) => fnet_filter::ResourceId::Rule(id.into()),
173        }
174    }
175}
176
177/// Extension type for [`fnet_filter::Domain`].
178#[derive(Debug, Clone, PartialEq)]
179pub enum Domain {
180    Ipv4,
181    Ipv6,
182    AllIp,
183}
184
185impl From<Domain> for fnet_filter::Domain {
186    fn from(domain: Domain) -> Self {
187        match domain {
188            Domain::Ipv4 => fnet_filter::Domain::Ipv4,
189            Domain::Ipv6 => fnet_filter::Domain::Ipv6,
190            Domain::AllIp => fnet_filter::Domain::AllIp,
191        }
192    }
193}
194
195impl TryFrom<fnet_filter::Domain> for Domain {
196    type Error = FidlConversionError;
197
198    fn try_from(domain: fnet_filter::Domain) -> Result<Self, Self::Error> {
199        match domain {
200            fnet_filter::Domain::Ipv4 => Ok(Self::Ipv4),
201            fnet_filter::Domain::Ipv6 => Ok(Self::Ipv6),
202            fnet_filter::Domain::AllIp => Ok(Self::AllIp),
203            fnet_filter::Domain::__SourceBreaking { .. } => {
204                Err(FidlConversionError::UnknownUnionVariant(type_names::DOMAIN))
205            }
206        }
207    }
208}
209
210/// Extension type for [`fnet_filter::Namespace`].
211#[derive(Debug, Clone, PartialEq)]
212pub struct Namespace {
213    pub id: NamespaceId,
214    pub domain: Domain,
215}
216
217impl From<Namespace> for fnet_filter::Namespace {
218    fn from(namespace: Namespace) -> Self {
219        let Namespace { id, domain } = namespace;
220        let NamespaceId(id) = id;
221        Self { id: Some(id), domain: Some(domain.into()), __source_breaking: SourceBreaking }
222    }
223}
224
225impl TryFrom<fnet_filter::Namespace> for Namespace {
226    type Error = FidlConversionError;
227
228    fn try_from(namespace: fnet_filter::Namespace) -> Result<Self, Self::Error> {
229        let fnet_filter::Namespace { id, domain, __source_breaking } = namespace;
230        let id = NamespaceId(id.ok_or(FidlConversionError::MissingNamespaceId)?);
231        let domain = domain.ok_or(FidlConversionError::MissingNamespaceDomain)?.try_into()?;
232        Ok(Self { id, domain })
233    }
234}
235
236/// Extension type for [`fnet_filter::IpInstallationHook`].
237#[derive(Debug, Clone, Copy, PartialEq)]
238pub enum IpHook {
239    Ingress,
240    LocalIngress,
241    Forwarding,
242    LocalEgress,
243    Egress,
244}
245
246impl From<IpHook> for fnet_filter::IpInstallationHook {
247    fn from(hook: IpHook) -> Self {
248        match hook {
249            IpHook::Ingress => Self::Ingress,
250            IpHook::LocalIngress => Self::LocalIngress,
251            IpHook::Forwarding => Self::Forwarding,
252            IpHook::LocalEgress => Self::LocalEgress,
253            IpHook::Egress => Self::Egress,
254        }
255    }
256}
257
258impl TryFrom<fnet_filter::IpInstallationHook> for IpHook {
259    type Error = FidlConversionError;
260
261    fn try_from(hook: fnet_filter::IpInstallationHook) -> Result<Self, Self::Error> {
262        match hook {
263            fnet_filter::IpInstallationHook::Ingress => Ok(Self::Ingress),
264            fnet_filter::IpInstallationHook::LocalIngress => Ok(Self::LocalIngress),
265            fnet_filter::IpInstallationHook::Forwarding => Ok(Self::Forwarding),
266            fnet_filter::IpInstallationHook::LocalEgress => Ok(Self::LocalEgress),
267            fnet_filter::IpInstallationHook::Egress => Ok(Self::Egress),
268            fnet_filter::IpInstallationHook::__SourceBreaking { .. } => {
269                Err(FidlConversionError::UnknownUnionVariant(type_names::IP_INSTALLATION_HOOK))
270            }
271        }
272    }
273}
274
275/// Extension type for [`fnet_filter::NatInstallationHook`].
276#[derive(Debug, Clone, Copy, PartialEq)]
277pub enum NatHook {
278    Ingress,
279    LocalIngress,
280    LocalEgress,
281    Egress,
282}
283
284impl From<NatHook> for fnet_filter::NatInstallationHook {
285    fn from(hook: NatHook) -> Self {
286        match hook {
287            NatHook::Ingress => Self::Ingress,
288            NatHook::LocalIngress => Self::LocalIngress,
289            NatHook::LocalEgress => Self::LocalEgress,
290            NatHook::Egress => Self::Egress,
291        }
292    }
293}
294
295impl TryFrom<fnet_filter::NatInstallationHook> for NatHook {
296    type Error = FidlConversionError;
297
298    fn try_from(hook: fnet_filter::NatInstallationHook) -> Result<Self, Self::Error> {
299        match hook {
300            fnet_filter::NatInstallationHook::Ingress => Ok(Self::Ingress),
301            fnet_filter::NatInstallationHook::LocalIngress => Ok(Self::LocalIngress),
302            fnet_filter::NatInstallationHook::LocalEgress => Ok(Self::LocalEgress),
303            fnet_filter::NatInstallationHook::Egress => Ok(Self::Egress),
304            fnet_filter::NatInstallationHook::__SourceBreaking { .. } => {
305                Err(FidlConversionError::UnknownUnionVariant(type_names::NAT_INSTALLATION_HOOK))
306            }
307        }
308    }
309}
310
311/// Extension type for [`fnet_filter::InstalledIpRoutine`].
312#[derive(Debug, Clone, PartialEq)]
313pub struct InstalledIpRoutine {
314    pub hook: IpHook,
315    pub priority: i32,
316}
317
318impl From<InstalledIpRoutine> for fnet_filter::InstalledIpRoutine {
319    fn from(routine: InstalledIpRoutine) -> Self {
320        let InstalledIpRoutine { hook, priority } = routine;
321        Self {
322            hook: Some(hook.into()),
323            priority: Some(priority),
324            __source_breaking: SourceBreaking,
325        }
326    }
327}
328
329impl TryFrom<fnet_filter::InstalledIpRoutine> for InstalledIpRoutine {
330    type Error = FidlConversionError;
331
332    fn try_from(routine: fnet_filter::InstalledIpRoutine) -> Result<Self, Self::Error> {
333        let fnet_filter::InstalledIpRoutine { hook, priority, __source_breaking } = routine;
334        let hook = hook.ok_or(FidlConversionError::MissingIpInstallationHook)?;
335        let priority = priority.unwrap_or(fnet_filter::DEFAULT_ROUTINE_PRIORITY);
336        Ok(Self { hook: hook.try_into()?, priority })
337    }
338}
339
340/// Extension type for [`fnet_filter::InstalledNatRoutine`].
341#[derive(Debug, Clone, PartialEq)]
342pub struct InstalledNatRoutine {
343    pub hook: NatHook,
344    pub priority: i32,
345}
346
347impl From<InstalledNatRoutine> for fnet_filter::InstalledNatRoutine {
348    fn from(routine: InstalledNatRoutine) -> Self {
349        let InstalledNatRoutine { hook, priority } = routine;
350        Self {
351            hook: Some(hook.into()),
352            priority: Some(priority),
353            __source_breaking: SourceBreaking,
354        }
355    }
356}
357
358impl TryFrom<fnet_filter::InstalledNatRoutine> for InstalledNatRoutine {
359    type Error = FidlConversionError;
360
361    fn try_from(routine: fnet_filter::InstalledNatRoutine) -> Result<Self, Self::Error> {
362        let fnet_filter::InstalledNatRoutine { hook, priority, __source_breaking } = routine;
363        let hook = hook.ok_or(FidlConversionError::MissingNatInstallationHook)?;
364        let priority = priority.unwrap_or(fnet_filter::DEFAULT_ROUTINE_PRIORITY);
365        Ok(Self { hook: hook.try_into()?, priority })
366    }
367}
368
369/// Extension type for [`fnet_filter::RoutineType`].
370#[derive(Debug, Clone, PartialEq)]
371pub enum RoutineType {
372    Ip(Option<InstalledIpRoutine>),
373    Nat(Option<InstalledNatRoutine>),
374}
375
376impl RoutineType {
377    pub fn is_installed(&self) -> bool {
378        // The `InstalledIpRoutine` or `InstalledNatRoutine` configuration is
379        // optional, and when omitted, signifies an uninstalled routine.
380        match self {
381            Self::Ip(Some(_)) | Self::Nat(Some(_)) => true,
382            Self::Ip(None) | Self::Nat(None) => false,
383        }
384    }
385}
386
387impl From<RoutineType> for fnet_filter::RoutineType {
388    fn from(routine: RoutineType) -> Self {
389        match routine {
390            RoutineType::Ip(installation) => Self::Ip(fnet_filter::IpRoutine {
391                installation: installation.map(Into::into),
392                __source_breaking: SourceBreaking,
393            }),
394            RoutineType::Nat(installation) => Self::Nat(fnet_filter::NatRoutine {
395                installation: installation.map(Into::into),
396                __source_breaking: SourceBreaking,
397            }),
398        }
399    }
400}
401
402impl TryFrom<fnet_filter::RoutineType> for RoutineType {
403    type Error = FidlConversionError;
404
405    fn try_from(type_: fnet_filter::RoutineType) -> Result<Self, Self::Error> {
406        match type_ {
407            fnet_filter::RoutineType::Ip(fnet_filter::IpRoutine {
408                installation,
409                __source_breaking,
410            }) => Ok(RoutineType::Ip(installation.map(TryInto::try_into).transpose()?)),
411            fnet_filter::RoutineType::Nat(fnet_filter::NatRoutine {
412                installation,
413                __source_breaking,
414            }) => Ok(RoutineType::Nat(installation.map(TryInto::try_into).transpose()?)),
415            fnet_filter::RoutineType::__SourceBreaking { .. } => {
416                Err(FidlConversionError::UnknownUnionVariant(type_names::ROUTINE_TYPE))
417            }
418        }
419    }
420}
421
422/// Extension type for [`fnet_filter::Routine`].
423#[derive(Debug, Clone, PartialEq)]
424pub struct Routine {
425    pub id: RoutineId,
426    pub routine_type: RoutineType,
427}
428
429impl From<Routine> for fnet_filter::Routine {
430    fn from(routine: Routine) -> Self {
431        let Routine { id, routine_type: type_ } = routine;
432        Self { id: Some(id.into()), type_: Some(type_.into()), __source_breaking: SourceBreaking }
433    }
434}
435
436impl TryFrom<fnet_filter::Routine> for Routine {
437    type Error = FidlConversionError;
438
439    fn try_from(routine: fnet_filter::Routine) -> Result<Self, Self::Error> {
440        let fnet_filter::Routine { id, type_, __source_breaking } = routine;
441        let id = id.ok_or(FidlConversionError::MissingRoutineId)?;
442        let type_ = type_.ok_or(FidlConversionError::MissingRoutineType)?;
443        Ok(Self { id: id.into(), routine_type: type_.try_into()? })
444    }
445}
446
447/// Extension type for [`fnet_filter::InterfaceMatcher`].
448#[derive(Debug, Clone, PartialEq)]
449pub enum InterfaceMatcher {
450    Id(NonZeroU64),
451    Name(fnet_interfaces::Name),
452    PortClass(fnet_interfaces_ext::PortClass),
453}
454
455impl From<InterfaceMatcher> for fnet_filter::InterfaceMatcher {
456    fn from(matcher: InterfaceMatcher) -> Self {
457        match matcher {
458            InterfaceMatcher::Id(id) => Self::Id(id.get()),
459            InterfaceMatcher::Name(name) => Self::Name(name),
460            InterfaceMatcher::PortClass(port_class) => Self::PortClass(port_class.into()),
461        }
462    }
463}
464
465impl TryFrom<fnet_filter::InterfaceMatcher> for InterfaceMatcher {
466    type Error = FidlConversionError;
467
468    fn try_from(matcher: fnet_filter::InterfaceMatcher) -> Result<Self, Self::Error> {
469        match matcher {
470            fnet_filter::InterfaceMatcher::Id(id) => {
471                let id = NonZeroU64::new(id).ok_or(FidlConversionError::ZeroInterfaceId)?;
472                Ok(Self::Id(id))
473            }
474            fnet_filter::InterfaceMatcher::Name(name) => Ok(Self::Name(name)),
475            fnet_filter::InterfaceMatcher::PortClass(port_class) => {
476                port_class.try_into().map(Self::PortClass).map_err(|e| match e {
477                    fnet_interfaces_ext::UnknownPortClassError::NetInterfaces(_) => {
478                        FidlConversionError::UnknownUnionVariant(
479                            type_names::NET_INTERFACES_PORT_CLASS,
480                        )
481                    }
482                    fnet_interfaces_ext::UnknownPortClassError::HardwareNetwork(_) => {
483                        FidlConversionError::UnknownUnionVariant(
484                            type_names::HARDWARE_NETWORK_PORT_CLASS,
485                        )
486                    }
487                })
488            }
489            fnet_filter::InterfaceMatcher::__SourceBreaking { .. } => {
490                Err(FidlConversionError::UnknownUnionVariant(type_names::INTERFACE_MATCHER))
491            }
492        }
493    }
494}
495
496/// Extension type for the `Subnet` variant of [`fnet_filter::AddressMatcherType`].
497///
498/// This type witnesses to the invariant that the prefix length of the subnet is
499/// no greater than the number of bits in the IP address, and that no host bits
500/// in the network address are set.
501#[derive(Clone, Copy, Eq, Hash, PartialEq)]
502pub struct Subnet(fnet::Subnet);
503
504impl Subnet {
505    pub fn get(&self) -> fnet::Subnet {
506        let Subnet(subnet) = &self;
507        *subnet
508    }
509}
510
511impl From<Subnet> for fnet::Subnet {
512    fn from(subnet: Subnet) -> Self {
513        let Subnet(subnet) = subnet;
514        subnet
515    }
516}
517
518impl TryFrom<fnet::Subnet> for Subnet {
519    type Error = FidlConversionError;
520
521    fn try_from(subnet: fnet::Subnet) -> Result<Self, Self::Error> {
522        let fnet::Subnet { addr, prefix_len } = subnet;
523
524        // We convert to `net_types::ip::Subnet` to validate the subnet's
525        // properties, but we don't store the subnet as that type because we
526        // want to avoid forcing all `Resource` types in this library to be
527        // parameterized on IP version.
528        let result = match addr {
529            fnet::IpAddress::Ipv4(v4) => {
530                net_types::ip::Subnet::<net_types::ip::Ipv4Addr>::new(v4.into_ext(), prefix_len)
531                    .map(|_| Subnet(subnet))
532            }
533            fnet::IpAddress::Ipv6(v6) => {
534                net_types::ip::Subnet::<net_types::ip::Ipv6Addr>::new(v6.into_ext(), prefix_len)
535                    .map(|_| Subnet(subnet))
536            }
537        };
538        result.map_err(|e| match e {
539            net_types::ip::SubnetError::PrefixTooLong => FidlConversionError::SubnetPrefixTooLong,
540            net_types::ip::SubnetError::HostBitsSet => FidlConversionError::SubnetHostBitsSet,
541        })
542    }
543}
544
545impl Debug for Subnet {
546    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
547        let fnet::Subnet { addr, prefix_len } = self.0;
548
549        match addr {
550            fnet::IpAddress::Ipv4(v4) => {
551                let subnet = net_types::ip::Subnet::<net_types::ip::Ipv4Addr>::new(
552                    v4.into_ext(),
553                    prefix_len,
554                );
555
556                match subnet {
557                    Ok(inner) => inner.fmt(f),
558                    Err(err) => err.fmt(f),
559                }
560            }
561            fnet::IpAddress::Ipv6(v6) => {
562                let subnet = net_types::ip::Subnet::<net_types::ip::Ipv6Addr>::new(
563                    v6.into_ext(),
564                    prefix_len,
565                );
566
567                match subnet {
568                    Ok(inner) => inner.fmt(f),
569                    Err(err) => err.fmt(f),
570                }
571            }
572        }
573    }
574}
575
576/// Extension type for [`fnet_filter::AddressRange`].
577///
578/// This type witnesses to the invariant that `start` is in the same IP family
579/// as `end`, and that `start <= end`. (Comparisons are performed on the
580/// numerical big-endian representation of the IP address.)
581#[derive(Debug, Clone, PartialEq)]
582pub struct AddressRange {
583    range: RangeInclusive<fnet::IpAddress>,
584}
585
586impl AddressRange {
587    pub fn start(&self) -> fnet::IpAddress {
588        *self.range.start()
589    }
590
591    pub fn end(&self) -> fnet::IpAddress {
592        *self.range.end()
593    }
594}
595
596impl From<AddressRange> for fnet_filter::AddressRange {
597    fn from(range: AddressRange) -> Self {
598        Self { start: range.start(), end: range.end() }
599    }
600}
601
602impl TryFrom<fnet_filter::AddressRange> for AddressRange {
603    type Error = FidlConversionError;
604
605    fn try_from(range: fnet_filter::AddressRange) -> Result<Self, Self::Error> {
606        let fnet_filter::AddressRange { start, end } = range;
607        match (start, end) {
608            (
609                fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: start_bytes }),
610                fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: end_bytes }),
611            ) => {
612                if u32::from_be_bytes(start_bytes) > u32::from_be_bytes(end_bytes) {
613                    Err(FidlConversionError::InvalidAddressRange)
614                } else {
615                    Ok(Self { range: start..=end })
616                }
617            }
618            (
619                fnet::IpAddress::Ipv6(fnet::Ipv6Address { addr: start_bytes }),
620                fnet::IpAddress::Ipv6(fnet::Ipv6Address { addr: end_bytes }),
621            ) => {
622                if u128::from_be_bytes(start_bytes) > u128::from_be_bytes(end_bytes) {
623                    Err(FidlConversionError::InvalidAddressRange)
624                } else {
625                    Ok(Self { range: start..=end })
626                }
627            }
628            _ => Err(FidlConversionError::AddressRangeFamilyMismatch),
629        }
630    }
631}
632
633/// Extension type for [`fnet_filter::AddressMatcherType`].
634#[derive(Clone, PartialEq)]
635pub enum AddressMatcherType {
636    Subnet(Subnet),
637    Range(AddressRange),
638}
639
640impl From<AddressMatcherType> for fnet_filter::AddressMatcherType {
641    fn from(matcher: AddressMatcherType) -> Self {
642        match matcher {
643            AddressMatcherType::Subnet(subnet) => Self::Subnet(subnet.into()),
644            AddressMatcherType::Range(range) => Self::Range(range.into()),
645        }
646    }
647}
648
649impl TryFrom<fnet_filter::AddressMatcherType> for AddressMatcherType {
650    type Error = FidlConversionError;
651
652    fn try_from(matcher: fnet_filter::AddressMatcherType) -> Result<Self, Self::Error> {
653        match matcher {
654            fnet_filter::AddressMatcherType::Subnet(subnet) => Ok(Self::Subnet(subnet.try_into()?)),
655            fnet_filter::AddressMatcherType::Range(range) => Ok(Self::Range(range.try_into()?)),
656            fnet_filter::AddressMatcherType::__SourceBreaking { .. } => {
657                Err(FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE))
658            }
659        }
660    }
661}
662
663impl Debug for AddressMatcherType {
664    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
665        match self {
666            AddressMatcherType::Subnet(subnet) => subnet.fmt(f),
667            AddressMatcherType::Range(address_range) => address_range.fmt(f),
668        }
669    }
670}
671
672/// Extension type for [`fnet_filter::AddressMatcher`].
673#[derive(Debug, Clone, PartialEq)]
674pub struct AddressMatcher {
675    pub matcher: AddressMatcherType,
676    pub invert: bool,
677}
678
679impl From<AddressMatcher> for fnet_filter::AddressMatcher {
680    fn from(matcher: AddressMatcher) -> Self {
681        let AddressMatcher { matcher, invert } = matcher;
682        Self { matcher: matcher.into(), invert }
683    }
684}
685
686impl TryFrom<fnet_filter::AddressMatcher> for AddressMatcher {
687    type Error = FidlConversionError;
688
689    fn try_from(matcher: fnet_filter::AddressMatcher) -> Result<Self, Self::Error> {
690        let fnet_filter::AddressMatcher { matcher, invert } = matcher;
691        Ok(Self { matcher: matcher.try_into()?, invert })
692    }
693}
694
695/// Extension type for [`fnet_filter::PortMatcher`].
696///
697/// This type witnesses to the invariant that `start <= end`.
698#[derive(Debug, Clone, PartialEq)]
699pub struct PortMatcher {
700    range: RangeInclusive<u16>,
701    pub invert: bool,
702}
703
704/// Errors when creating a `PortMatcher`.
705#[derive(Debug, Error, PartialEq)]
706pub enum PortMatcherError {
707    #[error("invalid port range (start must be <= end)")]
708    InvalidPortRange,
709}
710
711impl PortMatcher {
712    pub fn new(start: u16, end: u16, invert: bool) -> Result<Self, PortMatcherError> {
713        if start > end {
714            return Err(PortMatcherError::InvalidPortRange);
715        }
716        Ok(Self { range: start..=end, invert })
717    }
718
719    pub fn range(&self) -> &RangeInclusive<u16> {
720        &self.range
721    }
722
723    pub fn start(&self) -> u16 {
724        *self.range.start()
725    }
726
727    pub fn end(&self) -> u16 {
728        *self.range.end()
729    }
730}
731
732impl From<PortMatcher> for fnet_filter::PortMatcher {
733    fn from(matcher: PortMatcher) -> Self {
734        let PortMatcher { range, invert } = matcher;
735        Self { start: *range.start(), end: *range.end(), invert }
736    }
737}
738
739impl TryFrom<fnet_filter::PortMatcher> for PortMatcher {
740    type Error = FidlConversionError;
741
742    fn try_from(matcher: fnet_filter::PortMatcher) -> Result<Self, Self::Error> {
743        let fnet_filter::PortMatcher { start, end, invert } = matcher;
744        if start > end {
745            return Err(FidlConversionError::InvalidPortMatcherRange);
746        }
747        Ok(Self { range: start..=end, invert })
748    }
749}
750
751/// Extension type for [`fnet_filter::TransportProtocol`].
752#[derive(Clone, PartialEq)]
753pub enum TransportProtocolMatcher {
754    Tcp { src_port: Option<PortMatcher>, dst_port: Option<PortMatcher> },
755    Udp { src_port: Option<PortMatcher>, dst_port: Option<PortMatcher> },
756    Icmp,
757    Icmpv6,
758}
759
760impl From<TransportProtocolMatcher> for fnet_filter::TransportProtocol {
761    fn from(matcher: TransportProtocolMatcher) -> Self {
762        match matcher {
763            TransportProtocolMatcher::Tcp { src_port, dst_port } => {
764                Self::Tcp(fnet_filter::TcpMatcher {
765                    src_port: src_port.map(Into::into),
766                    dst_port: dst_port.map(Into::into),
767                    __source_breaking: SourceBreaking,
768                })
769            }
770            TransportProtocolMatcher::Udp { src_port, dst_port } => {
771                Self::Udp(fnet_filter::UdpMatcher {
772                    src_port: src_port.map(Into::into),
773                    dst_port: dst_port.map(Into::into),
774                    __source_breaking: SourceBreaking,
775                })
776            }
777            TransportProtocolMatcher::Icmp => Self::Icmp(fnet_filter::IcmpMatcher::default()),
778            TransportProtocolMatcher::Icmpv6 => Self::Icmpv6(fnet_filter::Icmpv6Matcher::default()),
779        }
780    }
781}
782
783impl TryFrom<fnet_filter::TransportProtocol> for TransportProtocolMatcher {
784    type Error = FidlConversionError;
785
786    fn try_from(matcher: fnet_filter::TransportProtocol) -> Result<Self, Self::Error> {
787        match matcher {
788            fnet_filter::TransportProtocol::Tcp(fnet_filter::TcpMatcher {
789                src_port,
790                dst_port,
791                __source_breaking,
792            }) => Ok(Self::Tcp {
793                src_port: src_port.map(TryInto::try_into).transpose()?,
794                dst_port: dst_port.map(TryInto::try_into).transpose()?,
795            }),
796            fnet_filter::TransportProtocol::Udp(fnet_filter::UdpMatcher {
797                src_port,
798                dst_port,
799                __source_breaking,
800            }) => Ok(Self::Udp {
801                src_port: src_port.map(TryInto::try_into).transpose()?,
802                dst_port: dst_port.map(TryInto::try_into).transpose()?,
803            }),
804            fnet_filter::TransportProtocol::Icmp(fnet_filter::IcmpMatcher {
805                __source_breaking,
806            }) => Ok(Self::Icmp),
807            fnet_filter::TransportProtocol::Icmpv6(fnet_filter::Icmpv6Matcher {
808                __source_breaking,
809            }) => Ok(Self::Icmpv6),
810            fnet_filter::TransportProtocol::__SourceBreaking { .. } => {
811                Err(FidlConversionError::UnknownUnionVariant(type_names::TRANSPORT_PROTOCOL))
812            }
813        }
814    }
815}
816
817impl Debug for TransportProtocolMatcher {
818    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
819        // Omit empty fields.
820        match self {
821            TransportProtocolMatcher::Tcp { src_port, dst_port } => {
822                let mut debug_struct = f.debug_struct("Tcp");
823
824                // Omit empty fields.
825                if let Some(port) = &src_port {
826                    let _ = debug_struct.field("src_port", port);
827                }
828
829                if let Some(port) = &dst_port {
830                    let _ = debug_struct.field("dst_port", port);
831                }
832
833                debug_struct.finish()
834            }
835            TransportProtocolMatcher::Udp { src_port, dst_port } => {
836                let mut debug_struct = f.debug_struct("Udp");
837
838                // Omit empty fields.
839                if let Some(port) = &src_port {
840                    let _ = debug_struct.field("src_port", port);
841                }
842
843                if let Some(port) = &dst_port {
844                    let _ = debug_struct.field("dst_port", port);
845                }
846
847                debug_struct.finish()
848            }
849            TransportProtocolMatcher::Icmp => f.write_str("Icmp"),
850            TransportProtocolMatcher::Icmpv6 => f.write_str("Icmpv6"),
851        }
852    }
853}
854
855/// Extension type for [`fnet_filter::Matchers`].
856#[derive(Default, Clone, PartialEq)]
857pub struct Matchers {
858    pub in_interface: Option<InterfaceMatcher>,
859    pub out_interface: Option<InterfaceMatcher>,
860    pub src_addr: Option<AddressMatcher>,
861    pub dst_addr: Option<AddressMatcher>,
862    pub transport_protocol: Option<TransportProtocolMatcher>,
863}
864
865impl From<Matchers> for fnet_filter::Matchers {
866    fn from(matchers: Matchers) -> Self {
867        let Matchers { in_interface, out_interface, src_addr, dst_addr, transport_protocol } =
868            matchers;
869        Self {
870            in_interface: in_interface.map(Into::into),
871            out_interface: out_interface.map(Into::into),
872            src_addr: src_addr.map(Into::into),
873            dst_addr: dst_addr.map(Into::into),
874            transport_protocol: transport_protocol.map(Into::into),
875            __source_breaking: SourceBreaking,
876        }
877    }
878}
879
880impl TryFrom<fnet_filter::Matchers> for Matchers {
881    type Error = FidlConversionError;
882
883    fn try_from(matchers: fnet_filter::Matchers) -> Result<Self, Self::Error> {
884        let fnet_filter::Matchers {
885            in_interface,
886            out_interface,
887            src_addr,
888            dst_addr,
889            transport_protocol,
890            __source_breaking,
891        } = matchers;
892        Ok(Self {
893            in_interface: in_interface.map(TryInto::try_into).transpose()?,
894            out_interface: out_interface.map(TryInto::try_into).transpose()?,
895            src_addr: src_addr.map(TryInto::try_into).transpose()?,
896            dst_addr: dst_addr.map(TryInto::try_into).transpose()?,
897            transport_protocol: transport_protocol.map(TryInto::try_into).transpose()?,
898        })
899    }
900}
901
902impl Debug for Matchers {
903    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
904        let mut debug_struct = f.debug_struct("Matchers");
905
906        let Matchers { in_interface, out_interface, src_addr, dst_addr, transport_protocol } =
907            &self;
908
909        // Omit empty fields.
910        if let Some(matcher) = in_interface {
911            let _ = debug_struct.field("in_interface", matcher);
912        }
913
914        if let Some(matcher) = out_interface {
915            let _ = debug_struct.field("out_interface", matcher);
916        }
917
918        if let Some(matcher) = src_addr {
919            let _ = debug_struct.field("src_addr", matcher);
920        }
921
922        if let Some(matcher) = dst_addr {
923            let _ = debug_struct.field("dst_addr", matcher);
924        }
925
926        if let Some(matcher) = transport_protocol {
927            let _ = debug_struct.field("transport_protocol", matcher);
928        }
929
930        debug_struct.finish()
931    }
932}
933
934/// Extension type for [`fnet_filter::Action`].
935#[derive(Debug, Clone, PartialEq)]
936pub enum Action {
937    Accept,
938    Drop,
939    Jump(String),
940    Return,
941    TransparentProxy(TransparentProxy),
942    Redirect { dst_port: Option<PortRange> },
943    Masquerade { src_port: Option<PortRange> },
944}
945
946/// Extension type for [`fnet_filter::TransparentProxy_`].
947#[derive(Debug, Clone, PartialEq)]
948pub enum TransparentProxy {
949    LocalAddr(fnet::IpAddress),
950    LocalPort(NonZeroU16),
951    LocalAddrAndPort(fnet::IpAddress, NonZeroU16),
952}
953
954#[derive(Debug, Clone, PartialEq)]
955pub struct PortRange(pub RangeInclusive<NonZeroU16>);
956
957impl From<PortRange> for fnet_filter::PortRange {
958    fn from(range: PortRange) -> Self {
959        let PortRange(range) = range;
960        Self { start: range.start().get(), end: range.end().get() }
961    }
962}
963
964impl TryFrom<fnet_filter::PortRange> for PortRange {
965    type Error = FidlConversionError;
966
967    fn try_from(range: fnet_filter::PortRange) -> Result<Self, Self::Error> {
968        let fnet_filter::PortRange { start, end } = range;
969        if start > end {
970            Err(FidlConversionError::InvalidPortRange)
971        } else {
972            let start = NonZeroU16::new(start).ok_or(FidlConversionError::UnspecifiedNatPort)?;
973            let end = NonZeroU16::new(end).ok_or(FidlConversionError::UnspecifiedNatPort)?;
974            Ok(Self(start..=end))
975        }
976    }
977}
978
979impl From<Action> for fnet_filter::Action {
980    fn from(action: Action) -> Self {
981        match action {
982            Action::Accept => Self::Accept(fnet_filter::Empty {}),
983            Action::Drop => Self::Drop(fnet_filter::Empty {}),
984            Action::Jump(target) => Self::Jump(target),
985            Action::Return => Self::Return_(fnet_filter::Empty {}),
986            Action::TransparentProxy(proxy) => Self::TransparentProxy(match proxy {
987                TransparentProxy::LocalAddr(addr) => {
988                    fnet_filter::TransparentProxy_::LocalAddr(addr)
989                }
990                TransparentProxy::LocalPort(port) => {
991                    fnet_filter::TransparentProxy_::LocalPort(port.get())
992                }
993                TransparentProxy::LocalAddrAndPort(addr, port) => {
994                    fnet_filter::TransparentProxy_::LocalAddrAndPort(fnet_filter::SocketAddr {
995                        addr,
996                        port: port.get(),
997                    })
998                }
999            }),
1000            Action::Redirect { dst_port } => Self::Redirect(fnet_filter::Redirect {
1001                dst_port: dst_port.map(Into::into),
1002                __source_breaking: SourceBreaking,
1003            }),
1004            Action::Masquerade { src_port } => Self::Masquerade(fnet_filter::Masquerade {
1005                src_port: src_port.map(Into::into),
1006                __source_breaking: SourceBreaking,
1007            }),
1008        }
1009    }
1010}
1011
1012impl TryFrom<fnet_filter::Action> for Action {
1013    type Error = FidlConversionError;
1014
1015    fn try_from(action: fnet_filter::Action) -> Result<Self, Self::Error> {
1016        match action {
1017            fnet_filter::Action::Accept(fnet_filter::Empty {}) => Ok(Self::Accept),
1018            fnet_filter::Action::Drop(fnet_filter::Empty {}) => Ok(Self::Drop),
1019            fnet_filter::Action::Jump(target) => Ok(Self::Jump(target)),
1020            fnet_filter::Action::Return_(fnet_filter::Empty {}) => Ok(Self::Return),
1021            fnet_filter::Action::TransparentProxy(proxy) => {
1022                Ok(Self::TransparentProxy(match proxy {
1023                    fnet_filter::TransparentProxy_::LocalAddr(addr) => {
1024                        TransparentProxy::LocalAddr(addr)
1025                    }
1026                    fnet_filter::TransparentProxy_::LocalPort(port) => {
1027                        let port = NonZeroU16::new(port)
1028                            .ok_or(FidlConversionError::UnspecifiedTransparentProxyPort)?;
1029                        TransparentProxy::LocalPort(port)
1030                    }
1031                    fnet_filter::TransparentProxy_::LocalAddrAndPort(fnet_filter::SocketAddr {
1032                        addr,
1033                        port,
1034                    }) => {
1035                        let port = NonZeroU16::new(port)
1036                            .ok_or(FidlConversionError::UnspecifiedTransparentProxyPort)?;
1037                        TransparentProxy::LocalAddrAndPort(addr, port)
1038                    }
1039                    fnet_filter::TransparentProxy_::__SourceBreaking { .. } => {
1040                        return Err(FidlConversionError::UnknownUnionVariant(
1041                            type_names::TRANSPARENT_PROXY,
1042                        ))
1043                    }
1044                }))
1045            }
1046            fnet_filter::Action::Redirect(fnet_filter::Redirect {
1047                dst_port,
1048                __source_breaking,
1049            }) => Ok(Self::Redirect { dst_port: dst_port.map(TryInto::try_into).transpose()? }),
1050            fnet_filter::Action::Masquerade(fnet_filter::Masquerade {
1051                src_port,
1052                __source_breaking,
1053            }) => Ok(Self::Masquerade { src_port: src_port.map(TryInto::try_into).transpose()? }),
1054            fnet_filter::Action::__SourceBreaking { .. } => {
1055                Err(FidlConversionError::UnknownUnionVariant(type_names::ACTION))
1056            }
1057        }
1058    }
1059}
1060
1061/// Extension type for [`fnet_filter::Rule`].
1062#[derive(Debug, Clone, PartialEq)]
1063pub struct Rule {
1064    pub id: RuleId,
1065    pub matchers: Matchers,
1066    pub action: Action,
1067}
1068
1069impl From<Rule> for fnet_filter::Rule {
1070    fn from(rule: Rule) -> Self {
1071        let Rule { id, matchers, action } = rule;
1072        Self { id: id.into(), matchers: matchers.into(), action: action.into() }
1073    }
1074}
1075
1076impl TryFrom<fnet_filter::Rule> for Rule {
1077    type Error = FidlConversionError;
1078
1079    fn try_from(rule: fnet_filter::Rule) -> Result<Self, Self::Error> {
1080        let fnet_filter::Rule { id, matchers, action } = rule;
1081        Ok(Self { id: id.into(), matchers: matchers.try_into()?, action: action.try_into()? })
1082    }
1083}
1084
1085/// Extension type for [`fnet_filter::Resource`].
1086#[derive(Debug, Clone, PartialEq)]
1087pub enum Resource {
1088    Namespace(Namespace),
1089    Routine(Routine),
1090    Rule(Rule),
1091}
1092
1093impl Resource {
1094    pub fn id(&self) -> ResourceId {
1095        match self {
1096            Self::Namespace(Namespace { id, domain: _ }) => ResourceId::Namespace(id.clone()),
1097            Self::Routine(Routine { id, routine_type: _ }) => ResourceId::Routine(id.clone()),
1098            Self::Rule(Rule { id, matchers: _, action: _ }) => ResourceId::Rule(id.clone()),
1099        }
1100    }
1101}
1102
1103impl From<Resource> for fnet_filter::Resource {
1104    fn from(resource: Resource) -> Self {
1105        match resource {
1106            Resource::Namespace(namespace) => Self::Namespace(namespace.into()),
1107            Resource::Routine(routine) => Self::Routine(routine.into()),
1108            Resource::Rule(rule) => Self::Rule(rule.into()),
1109        }
1110    }
1111}
1112
1113impl TryFrom<fnet_filter::Resource> for Resource {
1114    type Error = FidlConversionError;
1115
1116    fn try_from(resource: fnet_filter::Resource) -> Result<Self, Self::Error> {
1117        match resource {
1118            fnet_filter::Resource::Namespace(namespace) => {
1119                Ok(Self::Namespace(namespace.try_into()?))
1120            }
1121            fnet_filter::Resource::Routine(routine) => Ok(Self::Routine(routine.try_into()?)),
1122            fnet_filter::Resource::Rule(rule) => Ok(Self::Rule(rule.try_into()?)),
1123            fnet_filter::Resource::__SourceBreaking { .. } => {
1124                Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE))
1125            }
1126        }
1127    }
1128}
1129
1130/// Extension type for [`fnet_filter::ControllerId`].
1131#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
1132pub struct ControllerId(pub String);
1133
1134/// Extension type for [`fnet_filter::Event`].
1135#[derive(Debug, Clone, PartialEq)]
1136pub enum Event {
1137    Existing(ControllerId, Resource),
1138    Idle,
1139    Added(ControllerId, Resource),
1140    Removed(ControllerId, ResourceId),
1141    EndOfUpdate,
1142}
1143
1144impl From<Event> for fnet_filter::Event {
1145    fn from(event: Event) -> Self {
1146        match event {
1147            Event::Existing(controller, resource) => {
1148                let ControllerId(id) = controller;
1149                Self::Existing(fnet_filter::ExistingResource {
1150                    controller: id,
1151                    resource: resource.into(),
1152                })
1153            }
1154            Event::Idle => Self::Idle(fnet_filter::Empty {}),
1155            Event::Added(controller, resource) => {
1156                let ControllerId(id) = controller;
1157                Self::Added(fnet_filter::AddedResource {
1158                    controller: id,
1159                    resource: resource.into(),
1160                })
1161            }
1162            Event::Removed(controller, resource) => {
1163                let ControllerId(id) = controller;
1164                Self::Removed(fnet_filter::RemovedResource {
1165                    controller: id,
1166                    resource: resource.into(),
1167                })
1168            }
1169            Event::EndOfUpdate => Self::EndOfUpdate(fnet_filter::Empty {}),
1170        }
1171    }
1172}
1173
1174impl TryFrom<fnet_filter::Event> for Event {
1175    type Error = FidlConversionError;
1176
1177    fn try_from(event: fnet_filter::Event) -> Result<Self, Self::Error> {
1178        match event {
1179            fnet_filter::Event::Existing(fnet_filter::ExistingResource {
1180                controller,
1181                resource,
1182            }) => Ok(Self::Existing(ControllerId(controller), resource.try_into()?)),
1183            fnet_filter::Event::Idle(fnet_filter::Empty {}) => Ok(Self::Idle),
1184            fnet_filter::Event::Added(fnet_filter::AddedResource { controller, resource }) => {
1185                Ok(Self::Added(ControllerId(controller), resource.try_into()?))
1186            }
1187            fnet_filter::Event::Removed(fnet_filter::RemovedResource { controller, resource }) => {
1188                Ok(Self::Removed(ControllerId(controller), resource.try_into()?))
1189            }
1190            fnet_filter::Event::EndOfUpdate(fnet_filter::Empty {}) => Ok(Self::EndOfUpdate),
1191            fnet_filter::Event::__SourceBreaking { .. } => {
1192                Err(FidlConversionError::UnknownUnionVariant(type_names::EVENT))
1193            }
1194        }
1195    }
1196}
1197
1198/// Filter watcher creation errors.
1199#[derive(Debug, Error)]
1200pub enum WatcherCreationError {
1201    #[error("failed to create filter watcher proxy: {0}")]
1202    CreateProxy(fidl::Error),
1203    #[error("failed to get filter watcher: {0}")]
1204    GetWatcher(fidl::Error),
1205}
1206
1207/// Filter watcher `Watch` errors.
1208#[derive(Debug, Error)]
1209pub enum WatchError {
1210    /// The call to `Watch` returned a FIDL error.
1211    #[error("the call to `Watch()` failed: {0}")]
1212    Fidl(fidl::Error),
1213    /// The event returned by `Watch` encountered a conversion error.
1214    #[error("failed to convert event returned by `Watch()`: {0}")]
1215    Conversion(FidlConversionError),
1216    /// The server returned an empty batch of events.
1217    #[error("the call to `Watch()` returned an empty batch of events")]
1218    EmptyEventBatch,
1219}
1220
1221/// Connects to the watcher protocol and converts the Hanging-Get style API into
1222/// an Event stream.
1223///
1224/// Each call to `Watch` returns a batch of events, which are flattened into a
1225/// single stream. If an error is encountered while calling `Watch` or while
1226/// converting the event, the stream is immediately terminated.
1227pub fn event_stream_from_state(
1228    state: fnet_filter::StateProxy,
1229) -> Result<impl Stream<Item = Result<Event, WatchError>>, WatcherCreationError> {
1230    let (watcher, server_end) = fidl::endpoints::create_proxy::<fnet_filter::WatcherMarker>();
1231    state
1232        .get_watcher(&fnet_filter::WatcherOptions::default(), server_end)
1233        .map_err(WatcherCreationError::GetWatcher)?;
1234
1235    let stream = futures::stream::try_unfold(watcher, |watcher| async {
1236        let events = watcher.watch().await.map_err(WatchError::Fidl)?;
1237        if events.is_empty() {
1238            return Err(WatchError::EmptyEventBatch);
1239        }
1240
1241        let event_stream = futures::stream::iter(events).map(Ok).and_then(|event| {
1242            futures::future::ready(event.try_into().map_err(WatchError::Conversion))
1243        });
1244        Ok(Some((event_stream, watcher)))
1245    })
1246    .try_flatten();
1247
1248    Ok(stream)
1249}
1250
1251/// Errors returned by [`get_existing_resources`].
1252#[derive(Debug, Error)]
1253pub enum GetExistingResourcesError {
1254    /// There was an error in the event stream.
1255    #[error("there was an error in the event stream: {0}")]
1256    ErrorInStream(WatchError),
1257    /// There was an unexpected event in the event stream. Only `existing` or
1258    /// `idle` events are expected.
1259    #[error("there was an unexpected event in the event stream: {0:?}")]
1260    UnexpectedEvent(Event),
1261    /// A duplicate existing resource was reported in the event stream.
1262    #[error("a duplicate existing resource was reported")]
1263    DuplicateResource(Resource),
1264    /// The event stream unexpectedly ended.
1265    #[error("the event stream unexpectedly ended")]
1266    StreamEnded,
1267}
1268
1269/// A trait for types holding filtering state that can be updated by change
1270/// events.
1271pub trait Update {
1272    /// Add the resource to the specified controller's state.
1273    ///
1274    /// Optionally returns a resource that has already been added to the
1275    /// controller with the same [`ResourceId`].
1276    fn add(&mut self, controller: ControllerId, resource: Resource) -> Option<Resource>;
1277
1278    /// Remove the resource from the specified controller's state.
1279    ///
1280    /// Returns the removed resource, if present.
1281    fn remove(&mut self, controller: ControllerId, resource: &ResourceId) -> Option<Resource>;
1282}
1283
1284impl Update for HashMap<ControllerId, HashMap<ResourceId, Resource>> {
1285    fn add(&mut self, controller: ControllerId, resource: Resource) -> Option<Resource> {
1286        self.entry(controller).or_default().insert(resource.id(), resource)
1287    }
1288
1289    fn remove(&mut self, controller: ControllerId, resource: &ResourceId) -> Option<Resource> {
1290        self.get_mut(&controller)?.remove(resource)
1291    }
1292}
1293
1294/// Collects all `existing` events from the stream, stopping once the `idle`
1295/// event is observed.
1296pub async fn get_existing_resources<C: Update + Default>(
1297    stream: impl Stream<Item = Result<Event, WatchError>>,
1298) -> Result<C, GetExistingResourcesError> {
1299    async_utils::fold::fold_while(
1300        stream,
1301        Ok(C::default()),
1302        |resources: Result<C, GetExistingResourcesError>, event| {
1303            let mut resources =
1304                resources.expect("`resources` must be `Ok`, because we stop folding on err");
1305            futures::future::ready(match event {
1306                Err(e) => FoldWhile::Done(Err(GetExistingResourcesError::ErrorInStream(e))),
1307                Ok(e) => match e {
1308                    Event::Existing(controller, resource) => {
1309                        if let Some(resource) = resources.add(controller, resource) {
1310                            FoldWhile::Done(Err(GetExistingResourcesError::DuplicateResource(
1311                                resource,
1312                            )))
1313                        } else {
1314                            FoldWhile::Continue(Ok(resources))
1315                        }
1316                    }
1317                    Event::Idle => FoldWhile::Done(Ok(resources)),
1318                    e @ (Event::Added(_, _) | Event::Removed(_, _) | Event::EndOfUpdate) => {
1319                        FoldWhile::Done(Err(GetExistingResourcesError::UnexpectedEvent(e)))
1320                    }
1321                },
1322            })
1323        },
1324    )
1325    .await
1326    .short_circuited()
1327    .map_err(|_resources| GetExistingResourcesError::StreamEnded)?
1328}
1329
1330/// Errors returned by [`wait_for_condition`].
1331#[derive(Debug, Error)]
1332pub enum WaitForConditionError {
1333    /// There was an error in the event stream.
1334    #[error("there was an error in the event stream: {0}")]
1335    ErrorInStream(WatchError),
1336    /// There was an `Added` event for an already existing resource.
1337    #[error("observed an added event for an already existing resource: {0:?}")]
1338    AddedAlreadyExisting(Resource),
1339    /// There was a `Removed` event for a non-existent resource.
1340    #[error("observed a removed event for a non-existent resource: {0:?}")]
1341    RemovedNonExistent(ResourceId),
1342    /// The event stream unexpectedly ended.
1343    #[error("the event stream unexpectedly ended")]
1344    StreamEnded,
1345}
1346
1347/// Wait for a condition on filtering state to be satisfied.
1348///
1349/// With the given `initial_state`, take events from `event_stream` and update
1350/// the state, calling `predicate` whenever the state changes. When predicates
1351/// returns `True` yield `Ok(())`.
1352pub async fn wait_for_condition<
1353    C: Update,
1354    S: Stream<Item = Result<Event, WatchError>>,
1355    F: Fn(&C) -> bool,
1356>(
1357    event_stream: S,
1358    initial_state: &mut C,
1359    predicate: F,
1360) -> Result<(), WaitForConditionError> {
1361    async_utils::fold::try_fold_while(
1362        event_stream.map_err(WaitForConditionError::ErrorInStream),
1363        initial_state,
1364        |resources: &mut C, event| {
1365            futures::future::ready(match event {
1366                Event::Existing(controller, resource) | Event::Added(controller, resource) => {
1367                    if let Some(resource) = resources.add(controller, resource) {
1368                        Err(WaitForConditionError::AddedAlreadyExisting(resource))
1369                    } else {
1370                        Ok(FoldWhile::Continue(resources))
1371                    }
1372                }
1373                Event::Removed(controller, resource) => resources
1374                    .remove(controller, &resource)
1375                    .map(|_| FoldWhile::Continue(resources))
1376                    .ok_or(WaitForConditionError::RemovedNonExistent(resource)),
1377                // Wait until a transactional update has been completed to call
1378                // the predicate so it's not run against partially-updated
1379                // state.
1380                Event::Idle | Event::EndOfUpdate => {
1381                    if predicate(&resources) {
1382                        Ok(FoldWhile::Done(()))
1383                    } else {
1384                        Ok(FoldWhile::Continue(resources))
1385                    }
1386                }
1387            })
1388        },
1389    )
1390    .await?
1391    .short_circuited()
1392    .map_err(|_resources: &mut C| WaitForConditionError::StreamEnded)
1393}
1394
1395/// Namespace controller creation errors.
1396#[derive(Debug, Error)]
1397pub enum ControllerCreationError {
1398    #[error("failed to create namespace controller proxy: {0}")]
1399    CreateProxy(fidl::Error),
1400    #[error("failed to open namespace controller: {0}")]
1401    OpenController(fidl::Error),
1402    #[error("server did not emit OnIdAssigned event")]
1403    NoIdAssigned,
1404    #[error("failed to observe ID assignment event: {0}")]
1405    IdAssignment(fidl::Error),
1406}
1407
1408/// Errors for individual changes pushed.
1409///
1410/// Extension type for the error variants of [`fnet_filter::ChangeValidationError`].
1411#[derive(Debug, Error, PartialEq)]
1412pub enum ChangeValidationError {
1413    #[error("change contains a resource that is missing a required field")]
1414    MissingRequiredField,
1415    #[error("rule specifies an invalid interface matcher")]
1416    InvalidInterfaceMatcher,
1417    #[error("rule specifies an invalid address matcher")]
1418    InvalidAddressMatcher,
1419    #[error("rule specifies an invalid port matcher")]
1420    InvalidPortMatcher,
1421    #[error("rule specifies an invalid transparent proxy action")]
1422    InvalidTransparentProxyAction,
1423    #[error("rule specifies an invalid NAT action")]
1424    InvalidNatAction,
1425    #[error("rule specifies an invalid port range")]
1426    InvalidPortRange,
1427}
1428
1429impl TryFrom<fnet_filter::ChangeValidationError> for ChangeValidationError {
1430    type Error = FidlConversionError;
1431
1432    fn try_from(error: fnet_filter::ChangeValidationError) -> Result<Self, Self::Error> {
1433        match error {
1434            fnet_filter::ChangeValidationError::MissingRequiredField => {
1435                Ok(Self::MissingRequiredField)
1436            }
1437            fnet_filter::ChangeValidationError::InvalidInterfaceMatcher => {
1438                Ok(Self::InvalidInterfaceMatcher)
1439            }
1440            fnet_filter::ChangeValidationError::InvalidAddressMatcher => {
1441                Ok(Self::InvalidAddressMatcher)
1442            }
1443            fnet_filter::ChangeValidationError::InvalidPortMatcher => Ok(Self::InvalidPortMatcher),
1444            fnet_filter::ChangeValidationError::InvalidTransparentProxyAction => {
1445                Ok(Self::InvalidTransparentProxyAction)
1446            }
1447            fnet_filter::ChangeValidationError::InvalidNatAction => Ok(Self::InvalidNatAction),
1448            fnet_filter::ChangeValidationError::InvalidPortRange => Ok(Self::InvalidPortRange),
1449            fnet_filter::ChangeValidationError::Ok
1450            | fnet_filter::ChangeValidationError::NotReached => {
1451                Err(FidlConversionError::NotAnError)
1452            }
1453            fnet_filter::ChangeValidationError::__SourceBreaking { unknown_ordinal: _ } => {
1454                Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE_VALIDATION_ERROR))
1455            }
1456        }
1457    }
1458}
1459
1460/// Errors for the NamespaceController.PushChanges method.
1461#[derive(Debug, Error)]
1462pub enum PushChangesError {
1463    #[error("failed to call FIDL method: {0}")]
1464    CallMethod(fidl::Error),
1465    #[error("too many changes were pushed to the server")]
1466    TooManyChanges,
1467    #[error("invalid change(s) pushed: {0:?}")]
1468    ErrorOnChange(Vec<(Change, ChangeValidationError)>),
1469    #[error("unknown FIDL type: {0}")]
1470    FidlConversion(#[from] FidlConversionError),
1471}
1472
1473/// Errors for individual changes committed.
1474///
1475/// Extension type for the error variants of [`fnet_filter::CommitError`].
1476#[derive(Debug, Error, PartialEq)]
1477pub enum ChangeCommitError {
1478    #[error("the change referred to an unknown namespace")]
1479    NamespaceNotFound,
1480    #[error("the change referred to an unknown routine")]
1481    RoutineNotFound,
1482    #[error("the change referred to an unknown rule")]
1483    RuleNotFound,
1484    #[error("the specified resource already exists")]
1485    AlreadyExists,
1486    #[error("the change includes a rule that jumps to an installed routine")]
1487    TargetRoutineIsInstalled,
1488}
1489
1490impl TryFrom<fnet_filter::CommitError> for ChangeCommitError {
1491    type Error = FidlConversionError;
1492
1493    fn try_from(error: fnet_filter::CommitError) -> Result<Self, Self::Error> {
1494        match error {
1495            fnet_filter::CommitError::NamespaceNotFound => Ok(Self::NamespaceNotFound),
1496            fnet_filter::CommitError::RoutineNotFound => Ok(Self::RoutineNotFound),
1497            fnet_filter::CommitError::RuleNotFound => Ok(Self::RuleNotFound),
1498            fnet_filter::CommitError::AlreadyExists => Ok(Self::AlreadyExists),
1499            fnet_filter::CommitError::TargetRoutineIsInstalled => {
1500                Ok(Self::TargetRoutineIsInstalled)
1501            }
1502            fnet_filter::CommitError::Ok | fnet_filter::CommitError::NotReached => {
1503                Err(FidlConversionError::NotAnError)
1504            }
1505            fnet_filter::CommitError::__SourceBreaking { unknown_ordinal: _ } => {
1506                Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_ERROR))
1507            }
1508        }
1509    }
1510}
1511
1512/// Errors for the NamespaceController.Commit method.
1513#[derive(Debug, Error)]
1514pub enum CommitError {
1515    #[error("failed to call FIDL method: {0}")]
1516    CallMethod(fidl::Error),
1517    #[error("rule has a matcher that is unavailable in its context: {0:?}")]
1518    RuleWithInvalidMatcher(RuleId),
1519    #[error("rule has an action that is invalid for its routine: {0:?}")]
1520    RuleWithInvalidAction(RuleId),
1521    #[error(
1522        "rule has a TransparentProxy action but not a valid transport protocol matcher: {0:?}"
1523    )]
1524    TransparentProxyWithInvalidMatcher(RuleId),
1525    #[error(
1526        "rule has a Redirect action that specifies a destination port but not a valid transport \
1527        protocol matcher: {0:?}"
1528    )]
1529    RedirectWithInvalidMatcher(RuleId),
1530    #[error(
1531        "rule has a Masquerade action that specifies a source port but not a valid transport \
1532        protocol matcher: {0:?}"
1533    )]
1534    MasqueradeWithInvalidMatcher(RuleId),
1535    #[error("routine forms a cycle {0:?}")]
1536    CyclicalRoutineGraph(RoutineId),
1537    #[error("invalid change was pushed: {0:?}")]
1538    ErrorOnChange(Vec<(Change, ChangeCommitError)>),
1539    #[error("unknown FIDL type: {0}")]
1540    FidlConversion(#[from] FidlConversionError),
1541}
1542
1543/// Extension type for [`fnet_filter::Change`].
1544#[derive(Debug, Clone, PartialEq)]
1545pub enum Change {
1546    Create(Resource),
1547    Remove(ResourceId),
1548}
1549
1550impl From<Change> for fnet_filter::Change {
1551    fn from(change: Change) -> Self {
1552        match change {
1553            Change::Create(resource) => Self::Create(resource.into()),
1554            Change::Remove(resource) => Self::Remove(resource.into()),
1555        }
1556    }
1557}
1558
1559impl TryFrom<fnet_filter::Change> for Change {
1560    type Error = FidlConversionError;
1561
1562    fn try_from(change: fnet_filter::Change) -> Result<Self, Self::Error> {
1563        match change {
1564            fnet_filter::Change::Create(resource) => Ok(Self::Create(resource.try_into()?)),
1565            fnet_filter::Change::Remove(resource) => Ok(Self::Remove(resource.try_into()?)),
1566            fnet_filter::Change::__SourceBreaking { .. } => {
1567                Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE))
1568            }
1569        }
1570    }
1571}
1572
1573/// A controller for filtering state.
1574pub struct Controller {
1575    controller: fnet_filter::NamespaceControllerProxy,
1576    // The client provides an ID when creating a new controller, but the server
1577    // may need to assign a different ID to avoid conflicts; either way, the
1578    // server informs the client of the final `ControllerId` on creation.
1579    id: ControllerId,
1580    // Changes that have been pushed to the server but not yet committed. This
1581    // allows the `Controller` to report more informative errors by correlating
1582    // error codes with particular changes.
1583    pending_changes: Vec<Change>,
1584}
1585
1586impl Controller {
1587    pub async fn new_root(
1588        root: &fnet_root::FilterProxy,
1589        ControllerId(id): &ControllerId,
1590    ) -> Result<Self, ControllerCreationError> {
1591        let (controller, server_end) = fidl::endpoints::create_proxy();
1592        root.open_controller(id, server_end).map_err(ControllerCreationError::OpenController)?;
1593
1594        let fnet_filter::NamespaceControllerEvent::OnIdAssigned { id } = controller
1595            .take_event_stream()
1596            .next()
1597            .await
1598            .ok_or(ControllerCreationError::NoIdAssigned)?
1599            .map_err(ControllerCreationError::IdAssignment)?;
1600        Ok(Self { controller, id: ControllerId(id), pending_changes: Vec::new() })
1601    }
1602
1603    /// Creates a new `Controller`.
1604    ///
1605    /// Note that the provided `ControllerId` may need to be modified server-
1606    /// side to avoid collisions; to obtain the final ID assigned to the
1607    /// `Controller`, use the `id` method.
1608    pub async fn new(
1609        control: &fnet_filter::ControlProxy,
1610        ControllerId(id): &ControllerId,
1611    ) -> Result<Self, ControllerCreationError> {
1612        let (controller, server_end) = fidl::endpoints::create_proxy();
1613        control.open_controller(id, server_end).map_err(ControllerCreationError::OpenController)?;
1614
1615        let fnet_filter::NamespaceControllerEvent::OnIdAssigned { id } = controller
1616            .take_event_stream()
1617            .next()
1618            .await
1619            .ok_or(ControllerCreationError::NoIdAssigned)?
1620            .map_err(ControllerCreationError::IdAssignment)?;
1621        Ok(Self { controller, id: ControllerId(id), pending_changes: Vec::new() })
1622    }
1623
1624    pub fn id(&self) -> &ControllerId {
1625        &self.id
1626    }
1627
1628    pub async fn push_changes(&mut self, changes: Vec<Change>) -> Result<(), PushChangesError> {
1629        let fidl_changes = changes.iter().cloned().map(Into::into).collect::<Vec<_>>();
1630        let result = self
1631            .controller
1632            .push_changes(&fidl_changes)
1633            .await
1634            .map_err(PushChangesError::CallMethod)?;
1635        handle_change_validation_result(result, &changes)?;
1636        // Maintain a client-side copy of the pending changes we've pushed to
1637        // the server in order to provide better error messages if a commit
1638        // fails.
1639        self.pending_changes.extend(changes);
1640        Ok(())
1641    }
1642
1643    async fn commit_with_options(
1644        &mut self,
1645        options: fnet_filter::CommitOptions,
1646    ) -> Result<(), CommitError> {
1647        let committed_changes = std::mem::take(&mut self.pending_changes);
1648        let result = self.controller.commit(options).await.map_err(CommitError::CallMethod)?;
1649        handle_commit_result(result, committed_changes)
1650    }
1651
1652    pub async fn commit(&mut self) -> Result<(), CommitError> {
1653        self.commit_with_options(fnet_filter::CommitOptions::default()).await
1654    }
1655
1656    pub async fn commit_idempotent(&mut self) -> Result<(), CommitError> {
1657        self.commit_with_options(fnet_filter::CommitOptions {
1658            idempotent: Some(true),
1659            __source_breaking: SourceBreaking,
1660        })
1661        .await
1662    }
1663}
1664
1665pub(crate) fn handle_change_validation_result(
1666    change_validation_result: fnet_filter::ChangeValidationResult,
1667    changes: &Vec<Change>,
1668) -> Result<(), PushChangesError> {
1669    match change_validation_result {
1670        fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}) => Ok(()),
1671        fnet_filter::ChangeValidationResult::TooManyChanges(fnet_filter::Empty {}) => {
1672            Err(PushChangesError::TooManyChanges)
1673        }
1674        fnet_filter::ChangeValidationResult::ErrorOnChange(results) => {
1675            let errors: Result<_, PushChangesError> =
1676                changes.iter().zip(results).try_fold(Vec::new(), |mut errors, (change, result)| {
1677                    match result {
1678                        fnet_filter::ChangeValidationError::Ok
1679                        | fnet_filter::ChangeValidationError::NotReached => Ok(errors),
1680                        error @ (fnet_filter::ChangeValidationError::MissingRequiredField
1681                        | fnet_filter::ChangeValidationError::InvalidInterfaceMatcher
1682                        | fnet_filter::ChangeValidationError::InvalidAddressMatcher
1683                        | fnet_filter::ChangeValidationError::InvalidPortMatcher
1684                        | fnet_filter::ChangeValidationError::InvalidTransparentProxyAction
1685                        | fnet_filter::ChangeValidationError::InvalidNatAction
1686                        | fnet_filter::ChangeValidationError::InvalidPortRange) => {
1687                            let error = error
1688                                .try_into()
1689                                .expect("`Ok` and `NotReached` are handled in another arm");
1690                            errors.push((change.clone(), error));
1691                            Ok(errors)
1692                        }
1693                        fnet_filter::ChangeValidationError::__SourceBreaking { .. } => {
1694                            Err(FidlConversionError::UnknownUnionVariant(
1695                                type_names::CHANGE_VALIDATION_ERROR,
1696                            )
1697                            .into())
1698                        }
1699                    }
1700                });
1701            Err(PushChangesError::ErrorOnChange(errors?))
1702        }
1703        fnet_filter::ChangeValidationResult::__SourceBreaking { .. } => {
1704            Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE_VALIDATION_RESULT)
1705                .into())
1706        }
1707    }
1708}
1709
1710pub(crate) fn handle_commit_result(
1711    commit_result: fnet_filter::CommitResult,
1712    committed_changes: Vec<Change>,
1713) -> Result<(), CommitError> {
1714    match commit_result {
1715        fnet_filter::CommitResult::Ok(fnet_filter::Empty {}) => Ok(()),
1716        fnet_filter::CommitResult::RuleWithInvalidMatcher(rule_id) => {
1717            Err(CommitError::RuleWithInvalidMatcher(rule_id.into()))
1718        }
1719        fnet_filter::CommitResult::RuleWithInvalidAction(rule_id) => {
1720            Err(CommitError::RuleWithInvalidAction(rule_id.into()))
1721        }
1722        fnet_filter::CommitResult::TransparentProxyWithInvalidMatcher(rule_id) => {
1723            Err(CommitError::TransparentProxyWithInvalidMatcher(rule_id.into()))
1724        }
1725        fnet_filter::CommitResult::RedirectWithInvalidMatcher(rule_id) => {
1726            Err(CommitError::RedirectWithInvalidMatcher(rule_id.into()))
1727        }
1728        fnet_filter::CommitResult::MasqueradeWithInvalidMatcher(rule_id) => {
1729            Err(CommitError::MasqueradeWithInvalidMatcher(rule_id.into()))
1730        }
1731        fnet_filter::CommitResult::CyclicalRoutineGraph(routine_id) => {
1732            Err(CommitError::CyclicalRoutineGraph(routine_id.into()))
1733        }
1734        fnet_filter::CommitResult::ErrorOnChange(results) => {
1735            let errors: Result<_, CommitError> = committed_changes
1736                .into_iter()
1737                .zip(results)
1738                .try_fold(Vec::new(), |mut errors, (change, result)| match result {
1739                    fnet_filter::CommitError::Ok | fnet_filter::CommitError::NotReached => {
1740                        Ok(errors)
1741                    }
1742                    error @ (fnet_filter::CommitError::NamespaceNotFound
1743                    | fnet_filter::CommitError::RoutineNotFound
1744                    | fnet_filter::CommitError::RuleNotFound
1745                    | fnet_filter::CommitError::AlreadyExists
1746                    | fnet_filter::CommitError::TargetRoutineIsInstalled) => {
1747                        let error = error
1748                            .try_into()
1749                            .expect("`Ok` and `NotReached` are handled in another arm");
1750                        errors.push((change, error));
1751                        Ok(errors)
1752                    }
1753                    fnet_filter::CommitError::__SourceBreaking { .. } => {
1754                        Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_ERROR)
1755                            .into())
1756                    }
1757                });
1758            Err(CommitError::ErrorOnChange(errors?))
1759        }
1760        fnet_filter::CommitResult::__SourceBreaking { .. } => {
1761            Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_RESULT).into())
1762        }
1763    }
1764}
1765
1766#[cfg(test)]
1767mod tests {
1768
1769    use assert_matches::assert_matches;
1770    use futures::channel::mpsc;
1771    use futures::task::Poll;
1772    use futures::{FutureExt as _, SinkExt as _};
1773    use net_declare::{fidl_ip, fidl_subnet};
1774    use test_case::test_case;
1775
1776    use super::*;
1777
1778    #[test_case(
1779        fnet_filter::ResourceId::Namespace(String::from("namespace")),
1780        ResourceId::Namespace(NamespaceId(String::from("namespace")));
1781        "NamespaceId"
1782    )]
1783    #[test_case(fnet_filter::Domain::Ipv4, Domain::Ipv4; "Domain")]
1784    #[test_case(
1785        fnet_filter::Namespace {
1786            id: Some(String::from("namespace")),
1787            domain: Some(fnet_filter::Domain::Ipv4),
1788            ..Default::default()
1789        },
1790        Namespace { id: NamespaceId(String::from("namespace")), domain: Domain::Ipv4 };
1791        "Namespace"
1792    )]
1793    #[test_case(fnet_filter::IpInstallationHook::Egress, IpHook::Egress; "IpHook")]
1794    #[test_case(fnet_filter::NatInstallationHook::Egress, NatHook::Egress; "NatHook")]
1795    #[test_case(
1796        fnet_filter::InstalledIpRoutine {
1797            hook: Some(fnet_filter::IpInstallationHook::Egress),
1798            priority: Some(1),
1799            ..Default::default()
1800        },
1801        InstalledIpRoutine {
1802            hook: IpHook::Egress,
1803            priority: 1,
1804        };
1805        "InstalledIpRoutine"
1806    )]
1807    #[test_case(
1808        fnet_filter::RoutineType::Ip(fnet_filter::IpRoutine {
1809            installation: Some(fnet_filter::InstalledIpRoutine {
1810                hook: Some(fnet_filter::IpInstallationHook::LocalEgress),
1811                priority: Some(1),
1812                ..Default::default()
1813            }),
1814            ..Default::default()
1815        }),
1816        RoutineType::Ip(Some(InstalledIpRoutine { hook: IpHook::LocalEgress, priority: 1 }));
1817        "RoutineType"
1818    )]
1819    #[test_case(
1820        fnet_filter::Routine {
1821            id: Some(fnet_filter::RoutineId {
1822                namespace: String::from("namespace"),
1823                name: String::from("routine"),
1824            }),
1825            type_: Some(fnet_filter::RoutineType::Nat(fnet_filter::NatRoutine::default())),
1826            ..Default::default()
1827        },
1828        Routine {
1829            id: RoutineId {
1830                namespace: NamespaceId(String::from("namespace")),
1831                name: String::from("routine"),
1832            },
1833            routine_type: RoutineType::Nat(None),
1834        };
1835        "Routine"
1836    )]
1837    #[test_case(
1838        fnet_filter::InterfaceMatcher::Id(1),
1839        InterfaceMatcher::Id(NonZeroU64::new(1).unwrap());
1840        "InterfaceMatcher"
1841    )]
1842    #[test_case(
1843        fnet_filter::AddressMatcherType::Subnet(fidl_subnet!("192.0.2.0/24")),
1844        AddressMatcherType::Subnet(Subnet(fidl_subnet!("192.0.2.0/24")));
1845        "AddressMatcherType"
1846    )]
1847    #[test_case(
1848        fnet_filter::AddressMatcher {
1849            matcher: fnet_filter::AddressMatcherType::Subnet(fidl_subnet!("192.0.2.0/24")),
1850            invert: true,
1851        },
1852        AddressMatcher {
1853            matcher: AddressMatcherType::Subnet(Subnet(fidl_subnet!("192.0.2.0/24"))),
1854            invert: true,
1855        };
1856        "AddressMatcher"
1857    )]
1858    #[test_case(
1859        fnet_filter::AddressRange {
1860            start: fidl_ip!("192.0.2.0"),
1861            end: fidl_ip!("192.0.2.1"),
1862        },
1863        AddressRange {
1864            range: fidl_ip!("192.0.2.0")..=fidl_ip!("192.0.2.1"),
1865        };
1866        "AddressRange"
1867    )]
1868    #[test_case(
1869        fnet_filter::TransportProtocol::Udp(fnet_filter::UdpMatcher {
1870            src_port: Some(fnet_filter::PortMatcher { start: 1024, end: u16::MAX, invert: false }),
1871            dst_port: None,
1872            ..Default::default()
1873        }),
1874        TransportProtocolMatcher::Udp {
1875            src_port: Some(PortMatcher { range: 1024..=u16::MAX, invert: false }),
1876            dst_port: None,
1877        };
1878        "TransportProtocol"
1879    )]
1880    #[test_case(
1881        fnet_filter::Matchers {
1882            in_interface: Some(fnet_filter::InterfaceMatcher::Name(String::from("wlan"))),
1883            transport_protocol: Some(fnet_filter::TransportProtocol::Tcp(fnet_filter::TcpMatcher {
1884                src_port: None,
1885                dst_port: Some(fnet_filter::PortMatcher { start: 22, end: 22, invert: false }),
1886                ..Default::default()
1887            })),
1888            ..Default::default()
1889        },
1890        Matchers {
1891            in_interface: Some(InterfaceMatcher::Name(String::from("wlan"))),
1892            transport_protocol: Some(TransportProtocolMatcher::Tcp {
1893                src_port: None,
1894                dst_port: Some(PortMatcher { range: 22..=22, invert: false }),
1895            }),
1896            ..Default::default()
1897        };
1898        "Matchers"
1899    )]
1900    #[test_case(
1901        fnet_filter::Action::Accept(fnet_filter::Empty {}),
1902        Action::Accept;
1903        "Action"
1904    )]
1905    #[test_case(
1906        fnet_filter::Rule {
1907            id: fnet_filter::RuleId {
1908                routine: fnet_filter::RoutineId {
1909                    namespace: String::from("namespace"),
1910                    name: String::from("routine"),
1911                },
1912                index: 1,
1913            },
1914            matchers: fnet_filter::Matchers {
1915                transport_protocol: Some(fnet_filter::TransportProtocol::Icmp(
1916                    fnet_filter::IcmpMatcher::default()
1917                )),
1918                ..Default::default()
1919            },
1920            action: fnet_filter::Action::Drop(fnet_filter::Empty {}),
1921        },
1922        Rule {
1923            id: RuleId {
1924                routine: RoutineId {
1925                    namespace: NamespaceId(String::from("namespace")),
1926                    name: String::from("routine"),
1927                },
1928                index: 1,
1929            },
1930            matchers: Matchers {
1931                transport_protocol: Some(TransportProtocolMatcher::Icmp),
1932                ..Default::default()
1933            },
1934            action: Action::Drop,
1935        };
1936        "Rule"
1937    )]
1938    #[test_case(
1939        fnet_filter::Resource::Namespace(fnet_filter::Namespace {
1940            id: Some(String::from("namespace")),
1941            domain: Some(fnet_filter::Domain::Ipv4),
1942            ..Default::default()
1943        }),
1944        Resource::Namespace(Namespace {
1945            id: NamespaceId(String::from("namespace")),
1946            domain: Domain::Ipv4
1947        });
1948        "Resource"
1949    )]
1950    #[test_case(
1951        fnet_filter::Event::EndOfUpdate(fnet_filter::Empty {}),
1952        Event::EndOfUpdate;
1953        "Event"
1954    )]
1955    #[test_case(
1956        fnet_filter::Change::Remove(fnet_filter::ResourceId::Namespace(String::from("namespace"))),
1957        Change::Remove(ResourceId::Namespace(NamespaceId(String::from("namespace"))));
1958        "Change"
1959    )]
1960    fn convert_from_fidl_and_back<F, E>(fidl_type: F, local_type: E)
1961    where
1962        E: TryFrom<F> + Clone + Debug + PartialEq,
1963        <E as TryFrom<F>>::Error: Debug + PartialEq,
1964        F: From<E> + Clone + Debug + PartialEq,
1965    {
1966        assert_eq!(fidl_type.clone().try_into(), Ok(local_type.clone()));
1967        assert_eq!(<_ as Into<F>>::into(local_type), fidl_type.clone());
1968    }
1969
1970    #[test]
1971    fn resource_id_try_from_unknown_variant() {
1972        assert_eq!(
1973            ResourceId::try_from(fnet_filter::ResourceId::__SourceBreaking { unknown_ordinal: 0 }),
1974            Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE_ID))
1975        );
1976    }
1977
1978    #[test]
1979    fn domain_try_from_unknown_variant() {
1980        assert_eq!(
1981            Domain::try_from(fnet_filter::Domain::__SourceBreaking { unknown_ordinal: 0 }),
1982            Err(FidlConversionError::UnknownUnionVariant(type_names::DOMAIN))
1983        );
1984    }
1985
1986    #[test]
1987    fn namespace_try_from_missing_properties() {
1988        assert_eq!(
1989            Namespace::try_from(fnet_filter::Namespace {
1990                id: None,
1991                domain: Some(fnet_filter::Domain::Ipv4),
1992                ..Default::default()
1993            }),
1994            Err(FidlConversionError::MissingNamespaceId)
1995        );
1996        assert_eq!(
1997            Namespace::try_from(fnet_filter::Namespace {
1998                id: Some(String::from("namespace")),
1999                domain: None,
2000                ..Default::default()
2001            }),
2002            Err(FidlConversionError::MissingNamespaceDomain)
2003        );
2004    }
2005
2006    #[test]
2007    fn ip_installation_hook_try_from_unknown_variant() {
2008        assert_eq!(
2009            IpHook::try_from(fnet_filter::IpInstallationHook::__SourceBreaking {
2010                unknown_ordinal: 0
2011            }),
2012            Err(FidlConversionError::UnknownUnionVariant(type_names::IP_INSTALLATION_HOOK))
2013        );
2014    }
2015
2016    #[test]
2017    fn nat_installation_hook_try_from_unknown_variant() {
2018        assert_eq!(
2019            NatHook::try_from(fnet_filter::NatInstallationHook::__SourceBreaking {
2020                unknown_ordinal: 0
2021            }),
2022            Err(FidlConversionError::UnknownUnionVariant(type_names::NAT_INSTALLATION_HOOK))
2023        );
2024    }
2025
2026    #[test]
2027    fn installed_ip_routine_try_from_missing_hook() {
2028        assert_eq!(
2029            InstalledIpRoutine::try_from(fnet_filter::InstalledIpRoutine {
2030                hook: None,
2031                ..Default::default()
2032            }),
2033            Err(FidlConversionError::MissingIpInstallationHook)
2034        );
2035    }
2036
2037    #[test]
2038    fn installed_nat_routine_try_from_missing_hook() {
2039        assert_eq!(
2040            InstalledNatRoutine::try_from(fnet_filter::InstalledNatRoutine {
2041                hook: None,
2042                ..Default::default()
2043            }),
2044            Err(FidlConversionError::MissingNatInstallationHook)
2045        );
2046    }
2047
2048    #[test]
2049    fn routine_type_try_from_unknown_variant() {
2050        assert_eq!(
2051            RoutineType::try_from(fnet_filter::RoutineType::__SourceBreaking {
2052                unknown_ordinal: 0
2053            }),
2054            Err(FidlConversionError::UnknownUnionVariant(type_names::ROUTINE_TYPE))
2055        );
2056    }
2057
2058    #[test]
2059    fn routine_try_from_missing_properties() {
2060        assert_eq!(
2061            Routine::try_from(fnet_filter::Routine { id: None, ..Default::default() }),
2062            Err(FidlConversionError::MissingRoutineId)
2063        );
2064        assert_eq!(
2065            Routine::try_from(fnet_filter::Routine {
2066                id: Some(fnet_filter::RoutineId {
2067                    namespace: String::from("namespace"),
2068                    name: String::from("routine"),
2069                }),
2070                type_: None,
2071                ..Default::default()
2072            }),
2073            Err(FidlConversionError::MissingRoutineType)
2074        );
2075    }
2076
2077    #[test]
2078    fn interface_matcher_try_from_unknown_variant() {
2079        assert_eq!(
2080            InterfaceMatcher::try_from(fnet_filter::InterfaceMatcher::__SourceBreaking {
2081                unknown_ordinal: 0
2082            }),
2083            Err(FidlConversionError::UnknownUnionVariant(type_names::INTERFACE_MATCHER))
2084        );
2085    }
2086
2087    #[test]
2088    fn interface_matcher_try_from_invalid() {
2089        assert_eq!(
2090            InterfaceMatcher::try_from(fnet_filter::InterfaceMatcher::Id(0)),
2091            Err(FidlConversionError::ZeroInterfaceId)
2092        );
2093    }
2094
2095    #[test]
2096    fn address_matcher_type_try_from_unknown_variant() {
2097        assert_eq!(
2098            AddressMatcherType::try_from(fnet_filter::AddressMatcherType::__SourceBreaking {
2099                unknown_ordinal: 0
2100            }),
2101            Err(FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE))
2102        );
2103    }
2104
2105    #[test]
2106    fn subnet_try_from_invalid() {
2107        assert_eq!(
2108            Subnet::try_from(fnet::Subnet { addr: fidl_ip!("192.0.2.1"), prefix_len: 33 }),
2109            Err(FidlConversionError::SubnetPrefixTooLong)
2110        );
2111        assert_eq!(
2112            Subnet::try_from(fidl_subnet!("192.0.2.1/24")),
2113            Err(FidlConversionError::SubnetHostBitsSet)
2114        );
2115    }
2116
2117    #[test]
2118    fn address_range_try_from_invalid() {
2119        assert_eq!(
2120            AddressRange::try_from(fnet_filter::AddressRange {
2121                start: fidl_ip!("192.0.2.1"),
2122                end: fidl_ip!("192.0.2.0"),
2123            }),
2124            Err(FidlConversionError::InvalidAddressRange)
2125        );
2126        assert_eq!(
2127            AddressRange::try_from(fnet_filter::AddressRange {
2128                start: fidl_ip!("2001:db8::1"),
2129                end: fidl_ip!("2001:db8::"),
2130            }),
2131            Err(FidlConversionError::InvalidAddressRange)
2132        );
2133    }
2134
2135    #[test]
2136    fn address_range_try_from_family_mismatch() {
2137        assert_eq!(
2138            AddressRange::try_from(fnet_filter::AddressRange {
2139                start: fidl_ip!("192.0.2.0"),
2140                end: fidl_ip!("2001:db8::"),
2141            }),
2142            Err(FidlConversionError::AddressRangeFamilyMismatch)
2143        );
2144    }
2145
2146    #[test]
2147    fn port_matcher_try_from_invalid() {
2148        assert_eq!(
2149            PortMatcher::try_from(fnet_filter::PortMatcher { start: 1, end: 0, invert: false }),
2150            Err(FidlConversionError::InvalidPortMatcherRange)
2151        );
2152    }
2153
2154    #[test]
2155    fn transport_protocol_try_from_unknown_variant() {
2156        assert_eq!(
2157            TransportProtocolMatcher::try_from(fnet_filter::TransportProtocol::__SourceBreaking {
2158                unknown_ordinal: 0
2159            }),
2160            Err(FidlConversionError::UnknownUnionVariant(type_names::TRANSPORT_PROTOCOL))
2161        );
2162    }
2163
2164    #[test]
2165    fn action_try_from_unknown_variant() {
2166        assert_eq!(
2167            Action::try_from(fnet_filter::Action::__SourceBreaking { unknown_ordinal: 0 }),
2168            Err(FidlConversionError::UnknownUnionVariant(type_names::ACTION))
2169        );
2170    }
2171
2172    #[test]
2173    fn resource_try_from_unknown_variant() {
2174        assert_eq!(
2175            Resource::try_from(fnet_filter::Resource::__SourceBreaking { unknown_ordinal: 0 }),
2176            Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE))
2177        );
2178    }
2179
2180    #[test]
2181    fn event_try_from_unknown_variant() {
2182        assert_eq!(
2183            Event::try_from(fnet_filter::Event::__SourceBreaking { unknown_ordinal: 0 }),
2184            Err(FidlConversionError::UnknownUnionVariant(type_names::EVENT))
2185        );
2186    }
2187
2188    #[test]
2189    fn change_try_from_unknown_variant() {
2190        assert_eq!(
2191            Change::try_from(fnet_filter::Change::__SourceBreaking { unknown_ordinal: 0 }),
2192            Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE))
2193        );
2194    }
2195
2196    fn test_controller_a() -> ControllerId {
2197        ControllerId(String::from("test-controller-a"))
2198    }
2199
2200    fn test_controller_b() -> ControllerId {
2201        ControllerId(String::from("test-controller-b"))
2202    }
2203
2204    pub(crate) fn test_resource_id() -> ResourceId {
2205        ResourceId::Namespace(NamespaceId(String::from("test-namespace")))
2206    }
2207
2208    pub(crate) fn test_resource() -> Resource {
2209        Resource::Namespace(Namespace {
2210            id: NamespaceId(String::from("test-namespace")),
2211            domain: Domain::AllIp,
2212        })
2213    }
2214
2215    pub(crate) fn invalid_resource() -> Resource {
2216        Resource::Rule(Rule {
2217            id: RuleId {
2218                routine: RoutineId {
2219                    namespace: NamespaceId(String::from("namespace")),
2220                    name: String::from("routine"),
2221                },
2222                index: 0,
2223            },
2224            matchers: Matchers {
2225                transport_protocol: Some(TransportProtocolMatcher::Tcp {
2226                    #[allow(clippy::reversed_empty_ranges)]
2227                    src_port: Some(PortMatcher { range: u16::MAX..=0, invert: false }),
2228                    dst_port: None,
2229                }),
2230                ..Default::default()
2231            },
2232            action: Action::Drop,
2233        })
2234    }
2235
2236    pub(crate) fn unknown_resource_id() -> ResourceId {
2237        ResourceId::Namespace(NamespaceId(String::from("does-not-exist")))
2238    }
2239
2240    #[fuchsia_async::run_singlethreaded(test)]
2241    async fn event_stream_from_state_conversion_error() {
2242        let (proxy, mut request_stream) =
2243            fidl::endpoints::create_proxy_and_stream::<fnet_filter::StateMarker>();
2244        let stream = event_stream_from_state(proxy).expect("get event stream");
2245        futures::pin_mut!(stream);
2246
2247        let send_invalid_event = async {
2248            let fnet_filter::StateRequest::GetWatcher { options: _, request, control_handle: _ } =
2249                request_stream
2250                    .next()
2251                    .await
2252                    .expect("client should call state")
2253                    .expect("request should not error");
2254            let fnet_filter::WatcherRequest::Watch { responder } = request
2255                .into_stream()
2256                .next()
2257                .await
2258                .expect("client should call watch")
2259                .expect("request should not error");
2260            responder
2261                .send(&[fnet_filter::Event::Added(fnet_filter::AddedResource {
2262                    controller: String::from("controller"),
2263                    resource: fnet_filter::Resource::Namespace(fnet_filter::Namespace {
2264                        id: None,
2265                        domain: None,
2266                        ..Default::default()
2267                    }),
2268                })])
2269                .expect("send batch with invalid event");
2270        };
2271        let ((), result) = futures::future::join(send_invalid_event, stream.next()).await;
2272        assert_matches!(
2273            result,
2274            Some(Err(WatchError::Conversion(FidlConversionError::MissingNamespaceId)))
2275        );
2276    }
2277
2278    #[fuchsia_async::run_singlethreaded(test)]
2279    async fn event_stream_from_state_empty_event_batch() {
2280        let (proxy, mut request_stream) =
2281            fidl::endpoints::create_proxy_and_stream::<fnet_filter::StateMarker>();
2282        let stream = event_stream_from_state(proxy).expect("get event stream");
2283        futures::pin_mut!(stream);
2284
2285        let send_empty_batch = async {
2286            let fnet_filter::StateRequest::GetWatcher { options: _, request, control_handle: _ } =
2287                request_stream
2288                    .next()
2289                    .await
2290                    .expect("client should call state")
2291                    .expect("request should not error");
2292            let fnet_filter::WatcherRequest::Watch { responder } = request
2293                .into_stream()
2294                .next()
2295                .await
2296                .expect("client should call watch")
2297                .expect("request should not error");
2298            responder.send(&[]).expect("send empty batch");
2299        };
2300        let ((), result) = futures::future::join(send_empty_batch, stream.next()).await;
2301        assert_matches!(result, Some(Err(WatchError::EmptyEventBatch)));
2302    }
2303
2304    #[fuchsia_async::run_singlethreaded(test)]
2305    async fn get_existing_resources_success() {
2306        let event_stream = futures::stream::iter([
2307            Ok(Event::Existing(test_controller_a(), test_resource())),
2308            Ok(Event::Existing(test_controller_b(), test_resource())),
2309            Ok(Event::Idle),
2310            Ok(Event::Removed(test_controller_a(), test_resource_id())),
2311        ]);
2312        futures::pin_mut!(event_stream);
2313
2314        let existing = get_existing_resources::<HashMap<_, _>>(event_stream.by_ref())
2315            .await
2316            .expect("get existing resources");
2317        assert_eq!(
2318            existing,
2319            HashMap::from([
2320                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2321                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2322            ])
2323        );
2324
2325        let trailing_events = event_stream.collect::<Vec<_>>().await;
2326        assert_matches!(
2327            &trailing_events[..],
2328            [Ok(Event::Removed(controller, resource))] if controller == &test_controller_a() &&
2329                                                           resource == &test_resource_id()
2330        );
2331    }
2332
2333    #[fuchsia_async::run_singlethreaded(test)]
2334    async fn get_existing_resources_error_in_stream() {
2335        let event_stream =
2336            futures::stream::once(futures::future::ready(Err(WatchError::EmptyEventBatch)));
2337        futures::pin_mut!(event_stream);
2338        assert_matches!(
2339            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2340            Err(GetExistingResourcesError::ErrorInStream(WatchError::EmptyEventBatch))
2341        )
2342    }
2343
2344    #[fuchsia_async::run_singlethreaded(test)]
2345    async fn get_existing_resources_unexpected_event() {
2346        let event_stream = futures::stream::once(futures::future::ready(Ok(Event::EndOfUpdate)));
2347        futures::pin_mut!(event_stream);
2348        assert_matches!(
2349            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2350            Err(GetExistingResourcesError::UnexpectedEvent(Event::EndOfUpdate))
2351        )
2352    }
2353
2354    #[fuchsia_async::run_singlethreaded(test)]
2355    async fn get_existing_resources_duplicate_resource() {
2356        let event_stream = futures::stream::iter([
2357            Ok(Event::Existing(test_controller_a(), test_resource())),
2358            Ok(Event::Existing(test_controller_a(), test_resource())),
2359        ]);
2360        futures::pin_mut!(event_stream);
2361        assert_matches!(
2362            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2363            Err(GetExistingResourcesError::DuplicateResource(resource))
2364                if resource == test_resource()
2365        )
2366    }
2367
2368    #[fuchsia_async::run_singlethreaded(test)]
2369    async fn get_existing_resources_stream_ended() {
2370        let event_stream = futures::stream::once(futures::future::ready(Ok(Event::Existing(
2371            test_controller_a(),
2372            test_resource(),
2373        ))));
2374        futures::pin_mut!(event_stream);
2375        assert_matches!(
2376            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2377            Err(GetExistingResourcesError::StreamEnded)
2378        )
2379    }
2380
2381    #[fuchsia_async::run_singlethreaded(test)]
2382    async fn wait_for_condition_add_remove() {
2383        let mut state = HashMap::new();
2384
2385        // Verify that checking for the presence of a resource blocks until the
2386        // resource is added.
2387        let has_resource = |resources: &HashMap<_, HashMap<_, _>>| {
2388            resources.get(&test_controller_a()).map_or(false, |controller| {
2389                controller
2390                    .get(&test_resource_id())
2391                    .map_or(false, |resource| resource == &test_resource())
2392            })
2393        };
2394        assert_matches!(
2395            wait_for_condition(futures::stream::pending(), &mut state, has_resource).now_or_never(),
2396            None
2397        );
2398        assert!(state.is_empty());
2399        assert_matches!(
2400            wait_for_condition(
2401                futures::stream::iter([
2402                    Ok(Event::Added(test_controller_b(), test_resource())),
2403                    Ok(Event::EndOfUpdate),
2404                    Ok(Event::Added(test_controller_a(), test_resource())),
2405                    Ok(Event::EndOfUpdate),
2406                ]),
2407                &mut state,
2408                has_resource
2409            )
2410            .now_or_never(),
2411            Some(Ok(()))
2412        );
2413        assert_eq!(
2414            state,
2415            HashMap::from([
2416                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2417                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2418            ])
2419        );
2420
2421        // Re-add the resource and observe an error.
2422        assert_matches!(
2423            wait_for_condition(
2424                futures::stream::iter([
2425                    Ok(Event::Added(test_controller_a(), test_resource())),
2426                    Ok(Event::EndOfUpdate),
2427                ]),
2428                &mut state,
2429                has_resource
2430            )
2431            .now_or_never(),
2432            Some(Err(WaitForConditionError::AddedAlreadyExisting(r))) if r == test_resource()
2433        );
2434        assert_eq!(
2435            state,
2436            HashMap::from([
2437                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2438                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2439            ])
2440        );
2441
2442        // Verify that checking for the absence of a resource blocks until the
2443        // resource is removed.
2444        let does_not_have_resource = |resources: &HashMap<_, HashMap<_, _>>| {
2445            resources.get(&test_controller_a()).map_or(false, |controller| controller.is_empty())
2446        };
2447        assert_matches!(
2448            wait_for_condition(futures::stream::pending(), &mut state, does_not_have_resource)
2449                .now_or_never(),
2450            None
2451        );
2452        assert_eq!(
2453            state,
2454            HashMap::from([
2455                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2456                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2457            ])
2458        );
2459        assert_matches!(
2460            wait_for_condition(
2461                futures::stream::iter([
2462                    Ok(Event::Removed(test_controller_b(), test_resource_id())),
2463                    Ok(Event::EndOfUpdate),
2464                    Ok(Event::Removed(test_controller_a(), test_resource_id())),
2465                    Ok(Event::EndOfUpdate),
2466                ]),
2467                &mut state,
2468                does_not_have_resource
2469            )
2470            .now_or_never(),
2471            Some(Ok(()))
2472        );
2473        assert_eq!(
2474            state,
2475            HashMap::from([
2476                (test_controller_a(), HashMap::new()),
2477                (test_controller_b(), HashMap::new()),
2478            ])
2479        );
2480
2481        // Remove a non-existent resource and observe an error.
2482        assert_matches!(
2483            wait_for_condition(
2484                futures::stream::iter([
2485                    Ok(Event::Removed(test_controller_a(), test_resource_id())),
2486                    Ok(Event::EndOfUpdate),
2487                ]),
2488                &mut state,
2489                does_not_have_resource
2490            ).now_or_never(),
2491            Some(Err(WaitForConditionError::RemovedNonExistent(r))) if r == test_resource_id()
2492        );
2493        assert_eq!(
2494            state,
2495            HashMap::from([
2496                (test_controller_a(), HashMap::new()),
2497                (test_controller_b(), HashMap::new()),
2498            ])
2499        );
2500    }
2501
2502    #[test]
2503    fn predicate_not_tested_until_update_complete() {
2504        let mut state = HashMap::new();
2505        let (mut tx, rx) = mpsc::unbounded();
2506
2507        let wait = wait_for_condition(rx, &mut state, |state| !state.is_empty()).fuse();
2508        futures::pin_mut!(wait);
2509
2510        // Sending an `Added` event should *not* allow the wait operation to
2511        // complete, because the predicate should only be tested once the full
2512        // update has been observed.
2513        let mut exec = fuchsia_async::TestExecutor::new();
2514        exec.run_singlethreaded(async {
2515            tx.send(Ok(Event::Added(test_controller_a(), test_resource())))
2516                .await
2517                .expect("receiver should not be closed");
2518        });
2519        assert_matches!(exec.run_until_stalled(&mut wait), Poll::Pending);
2520
2521        exec.run_singlethreaded(async {
2522            tx.send(Ok(Event::EndOfUpdate)).await.expect("receiver should not be closed");
2523            wait.await.expect("condition should be satisfied once update is complete");
2524        });
2525    }
2526
2527    #[fuchsia_async::run_singlethreaded(test)]
2528    async fn wait_for_condition_error_in_stream() {
2529        let mut state = HashMap::new();
2530        let event_stream =
2531            futures::stream::once(futures::future::ready(Err(WatchError::EmptyEventBatch)));
2532        assert_matches!(
2533            wait_for_condition(event_stream, &mut state, |_| true).await,
2534            Err(WaitForConditionError::ErrorInStream(WatchError::EmptyEventBatch))
2535        );
2536        assert!(state.is_empty());
2537    }
2538
2539    #[fuchsia_async::run_singlethreaded(test)]
2540    async fn wait_for_condition_stream_ended() {
2541        let mut state = HashMap::new();
2542        let event_stream = futures::stream::empty();
2543        assert_matches!(
2544            wait_for_condition(event_stream, &mut state, |_| true).await,
2545            Err(WaitForConditionError::StreamEnded)
2546        );
2547        assert!(state.is_empty());
2548    }
2549
2550    pub(crate) async fn handle_open_controller(
2551        mut request_stream: fnet_filter::ControlRequestStream,
2552    ) -> fnet_filter::NamespaceControllerRequestStream {
2553        let (id, request, _control_handle) = request_stream
2554            .next()
2555            .await
2556            .expect("client should open controller")
2557            .expect("request should not error")
2558            .into_open_controller()
2559            .expect("client should open controller");
2560        let (stream, control_handle) = request.into_stream_and_control_handle();
2561        control_handle.send_on_id_assigned(&id).expect("send assigned ID");
2562
2563        stream
2564    }
2565
2566    pub(crate) async fn handle_push_changes(
2567        stream: &mut fnet_filter::NamespaceControllerRequestStream,
2568        push_changes_result: fnet_filter::ChangeValidationResult,
2569    ) {
2570        let (_changes, responder) = stream
2571            .next()
2572            .await
2573            .expect("client should push changes")
2574            .expect("request should not error")
2575            .into_push_changes()
2576            .expect("client should push changes");
2577        responder.send(push_changes_result).expect("send empty batch");
2578    }
2579
2580    pub(crate) async fn handle_commit(
2581        stream: &mut fnet_filter::NamespaceControllerRequestStream,
2582        commit_result: fnet_filter::CommitResult,
2583    ) {
2584        let (_options, responder) = stream
2585            .next()
2586            .await
2587            .expect("client should commit")
2588            .expect("request should not error")
2589            .into_commit()
2590            .expect("client should commit");
2591        responder.send(commit_result).expect("send commit result");
2592    }
2593
2594    #[fuchsia_async::run_singlethreaded(test)]
2595    async fn controller_push_changes_reports_invalid_change() {
2596        let (control, request_stream) =
2597            fidl::endpoints::create_proxy_and_stream::<fnet_filter::ControlMarker>();
2598        let push_invalid_change = async {
2599            let mut controller = Controller::new(&control, &ControllerId(String::from("test")))
2600                .await
2601                .expect("create controller");
2602            let result = controller
2603                .push_changes(vec![
2604                    Change::Create(test_resource()),
2605                    Change::Create(invalid_resource()),
2606                    Change::Remove(test_resource_id()),
2607                ])
2608                .await;
2609            assert_matches!(
2610                result,
2611                Err(PushChangesError::ErrorOnChange(errors)) if errors == vec![(
2612                    Change::Create(invalid_resource()),
2613                    ChangeValidationError::InvalidPortMatcher
2614                )]
2615            );
2616        };
2617
2618        let handle_controller = async {
2619            let mut stream = handle_open_controller(request_stream).await;
2620            handle_push_changes(
2621                &mut stream,
2622                fnet_filter::ChangeValidationResult::ErrorOnChange(vec![
2623                    fnet_filter::ChangeValidationError::Ok,
2624                    fnet_filter::ChangeValidationError::InvalidPortMatcher,
2625                    fnet_filter::ChangeValidationError::NotReached,
2626                ]),
2627            )
2628            .await;
2629        };
2630
2631        let ((), ()) = futures::future::join(push_invalid_change, handle_controller).await;
2632    }
2633
2634    #[fuchsia_async::run_singlethreaded(test)]
2635    async fn controller_commit_reports_invalid_change() {
2636        let (control, request_stream) =
2637            fidl::endpoints::create_proxy_and_stream::<fnet_filter::ControlMarker>();
2638        let commit_invalid_change = async {
2639            let mut controller = Controller::new(&control, &ControllerId(String::from("test")))
2640                .await
2641                .expect("create controller");
2642            controller
2643                .push_changes(vec![
2644                    Change::Create(test_resource()),
2645                    Change::Remove(unknown_resource_id()),
2646                    Change::Remove(test_resource_id()),
2647                ])
2648                .await
2649                .expect("push changes");
2650            let result = controller.commit().await;
2651            assert_matches!(
2652                result,
2653                Err(CommitError::ErrorOnChange(errors)) if errors == vec![(
2654                    Change::Remove(unknown_resource_id()),
2655                    ChangeCommitError::NamespaceNotFound,
2656                )]
2657            );
2658        };
2659        let handle_controller = async {
2660            let mut stream = handle_open_controller(request_stream).await;
2661            handle_push_changes(
2662                &mut stream,
2663                fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}),
2664            )
2665            .await;
2666            handle_commit(
2667                &mut stream,
2668                fnet_filter::CommitResult::ErrorOnChange(vec![
2669                    fnet_filter::CommitError::Ok,
2670                    fnet_filter::CommitError::NamespaceNotFound,
2671                    fnet_filter::CommitError::Ok,
2672                ]),
2673            )
2674            .await;
2675        };
2676        let ((), ()) = futures::future::join(commit_invalid_change, handle_controller).await;
2677    }
2678}