Skip to main content

fidl_fuchsia_net_filter_ext/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Extensions for the fuchsia.net.filter FIDL library.
6//!
7//! Note that this library as written is not meant for inclusion in the SDK. It
8//! is only meant to be used in conjunction with a netstack that is compiled
9//! against the same API level of the `fuchsia.net.filter` FIDL library. This
10//! library opts in to compile-time and runtime breakage when the FIDL library
11//! is evolved in order to enforce that it is updated along with the FIDL
12//! library itself.
13
14#[cfg(target_os = "fuchsia")]
15pub mod sync;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::num::NonZeroU16;
20use std::ops::RangeInclusive;
21
22use async_utils::fold::FoldWhile;
23use fidl::marker::SourceBreaking;
24#[cfg(not(feature = "fdomain"))]
25use fidl_fuchsia_net_interfaces_ext as fnet_interfaces_ext;
26#[cfg(feature = "fdomain")]
27use fidl_fuchsia_net_interfaces_ext_fdomain as fnet_interfaces_ext;
28#[cfg(not(feature = "fdomain"))]
29use fidl_fuchsia_net_matchers_ext as fnet_matchers_ext;
30#[cfg(feature = "fdomain")]
31use fidl_fuchsia_net_matchers_ext_fdomain as fnet_matchers_ext;
32use flex_client::ProxyHasDomain as _;
33use flex_fuchsia_ebpf as febpf;
34use flex_fuchsia_net as fnet;
35use flex_fuchsia_net_filter as fnet_filter;
36use flex_fuchsia_net_root as fnet_root;
37use futures::{Stream, StreamExt as _, TryStreamExt as _};
38use thiserror::Error;
39
40/// Conversion errors from `fnet_filter` FIDL types to the
41/// equivalents defined in this module.
42#[derive(Debug, Error, PartialEq)]
43pub enum FidlConversionError {
44    #[error("union is of an unknown variant: {0}")]
45    UnknownUnionVariant(&'static str),
46    #[error("namespace ID not provided")]
47    MissingNamespaceId,
48    #[error("namespace domain not provided")]
49    MissingNamespaceDomain,
50    #[error("routine ID not provided")]
51    MissingRoutineId,
52    #[error("routine type not provided")]
53    MissingRoutineType,
54    #[error("IP installation hook not provided")]
55    MissingIpInstallationHook,
56    #[error("NAT installation hook not provided")]
57    MissingNatInstallationHook,
58    #[error("interface matcher specified an invalid ID of 0")]
59    ZeroInterfaceId,
60    #[error("invalid address range (start must be <= end)")]
61    InvalidAddressRange,
62    #[error("address range start and end addresses are not the same IP family")]
63    AddressRangeFamilyMismatch,
64    #[error("prefix length of subnet is longer than number of bits in IP address")]
65    SubnetPrefixTooLong,
66    #[error("host bits are set in subnet network")]
67    SubnetHostBitsSet,
68    #[error("invalid port matcher range (start must be <= end)")]
69    InvalidPortMatcherRange,
70    #[error("transparent proxy action specified an invalid local port of 0")]
71    UnspecifiedTransparentProxyPort,
72    #[error("NAT action specified an invalid rewrite port of 0")]
73    UnspecifiedNatPort,
74    #[error("invalid port range (start must be <= end)")]
75    InvalidPortRange,
76    #[error("non-error result variant could not be converted to an error")]
77    NotAnError,
78}
79
80impl From<fnet_matchers_ext::PortError> for FidlConversionError {
81    fn from(value: fnet_matchers_ext::PortError) -> Self {
82        match value {
83            fnet_matchers_ext::PortError::InvalidPortRange => {
84                FidlConversionError::InvalidPortMatcherRange
85            }
86        }
87    }
88}
89
90impl From<fnet_matchers_ext::InterfaceError> for FidlConversionError {
91    fn from(value: fnet_matchers_ext::InterfaceError) -> Self {
92        match value {
93            fnet_matchers_ext::InterfaceError::ZeroId => FidlConversionError::ZeroInterfaceId,
94            fnet_matchers_ext::InterfaceError::UnknownUnionVariant => {
95                FidlConversionError::UnknownUnionVariant(type_names::INTERFACE_MATCHER)
96            }
97            fnet_matchers_ext::InterfaceError::UnknownPortClass(unknown_port_class_error) => {
98                match unknown_port_class_error {
99                    fnet_interfaces_ext::UnknownPortClassError::NetInterfaces(_) => {
100                        FidlConversionError::UnknownUnionVariant(
101                            type_names::NET_INTERFACES_PORT_CLASS,
102                        )
103                    }
104                    fnet_interfaces_ext::UnknownPortClassError::HardwareNetwork(_) => {
105                        FidlConversionError::UnknownUnionVariant(
106                            type_names::HARDWARE_NETWORK_PORT_CLASS,
107                        )
108                    }
109                }
110            }
111        }
112    }
113}
114
115impl From<fnet_matchers_ext::AddressError> for FidlConversionError {
116    fn from(value: fnet_matchers_ext::AddressError) -> Self {
117        match value {
118            fnet_matchers_ext::AddressError::AddressMatcherType(address_matcher_type_error) => {
119                address_matcher_type_error.into()
120            }
121        }
122    }
123}
124
125impl From<fnet_matchers_ext::AddressMatcherTypeError> for FidlConversionError {
126    fn from(value: fnet_matchers_ext::AddressMatcherTypeError) -> Self {
127        match value {
128            fnet_matchers_ext::AddressMatcherTypeError::Subnet(subnet_error) => subnet_error.into(),
129            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(address_range_error) => {
130                address_range_error.into()
131            }
132            fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant => {
133                FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
134            }
135        }
136    }
137}
138
139impl From<fnet_matchers_ext::AddressRangeError> for FidlConversionError {
140    fn from(value: fnet_matchers_ext::AddressRangeError) -> Self {
141        match value {
142            fnet_matchers_ext::AddressRangeError::Invalid => {
143                FidlConversionError::InvalidAddressRange
144            }
145            fnet_matchers_ext::AddressRangeError::FamilyMismatch => {
146                FidlConversionError::AddressRangeFamilyMismatch
147            }
148        }
149    }
150}
151
152impl From<fnet_matchers_ext::SubnetError> for FidlConversionError {
153    fn from(value: fnet_matchers_ext::SubnetError) -> Self {
154        match value {
155            fnet_matchers_ext::SubnetError::PrefixTooLong => {
156                FidlConversionError::SubnetPrefixTooLong
157            }
158            fnet_matchers_ext::SubnetError::HostBitsSet => FidlConversionError::SubnetHostBitsSet,
159        }
160    }
161}
162
163impl From<fnet_matchers_ext::TransportProtocolError> for FidlConversionError {
164    fn from(value: fnet_matchers_ext::TransportProtocolError) -> Self {
165        match value {
166            fnet_matchers_ext::TransportProtocolError::Port(port_matcher_error) => {
167                port_matcher_error.into()
168            }
169            fnet_matchers_ext::TransportProtocolError::UnknownUnionVariant => {
170                FidlConversionError::UnknownUnionVariant(type_names::TRANSPORT_PROTOCOL)
171            }
172        }
173    }
174}
175
// TODO(https://fxbug.dev/317058051): remove this when the Rust FIDL bindings
// expose constants for these.
//
// Fully-qualified FIDL type names, reported through
// `FidlConversionError::UnknownUnionVariant`. Each name is prefixed with the
// FIDL library that the corresponding conversion error originates from.
mod type_names {
    pub(super) const RESOURCE_ID: &str = "fuchsia.net.filter/ResourceId";
    pub(super) const DOMAIN: &str = "fuchsia.net.filter/Domain";
    pub(super) const IP_INSTALLATION_HOOK: &str = "fuchsia.net.filter/IpInstallationHook";
    pub(super) const NAT_INSTALLATION_HOOK: &str = "fuchsia.net.filter/NatInstallationHook";
    pub(super) const ROUTINE_TYPE: &str = "fuchsia.net.filter/RoutineType";
    pub(super) const INTERFACE_MATCHER: &str = "fuchsia.net.matchers/Interface";
    // The address matcher error comes from the matchers extension crate, like
    // the interface and transport protocol matchers, so it is reported under
    // the same library prefix as they are.
    pub(super) const ADDRESS_MATCHER_TYPE: &str = "fuchsia.net.matchers/AddressMatcherType";
    pub(super) const TRANSPORT_PROTOCOL: &str = "fuchsia.net.matchers/TransportProtocol";
    pub(super) const ACTION: &str = "fuchsia.net.filter/Action";
    pub(super) const MARK_ACTION: &str = "fuchsia.net.filter/MarkAction";
    pub(super) const TRANSPARENT_PROXY: &str = "fuchsia.net.filter/TransparentProxy";
    pub(super) const RESOURCE: &str = "fuchsia.net.filter/Resource";
    pub(super) const EVENT: &str = "fuchsia.net.filter/Event";
    pub(super) const CHANGE: &str = "fuchsia.net.filter/Change";
    pub(super) const CHANGE_VALIDATION_ERROR: &str = "fuchsia.net.filter/ChangeValidationError";
    pub(super) const CHANGE_VALIDATION_RESULT: &str = "fuchsia.net.filter/ChangeValidationResult";
    pub(super) const COMMIT_ERROR: &str = "fuchsia.net.filter/CommitError";
    pub(super) const COMMIT_RESULT: &str = "fuchsia.net.filter/CommitResult";
    pub(super) const NET_INTERFACES_PORT_CLASS: &str = "fuchsia.net.interfaces/PortClass";
    pub(super) const HARDWARE_NETWORK_PORT_CLASS: &str = "fuchsia.hardware.network/PortClass";
    pub(super) const REJECT_TYPE: &str = "fuchsia.net.filter/RejectType";
}
201
/// Extension type for [`fnet_filter::NamespaceId`].
///
/// A newtype around the namespace's string identifier.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NamespaceId(pub String);
205
206/// Extension type for [`fnet_filter::RoutineId`].
207#[derive(Debug, Clone, PartialEq, Eq, Hash)]
208pub struct RoutineId {
209    pub namespace: NamespaceId,
210    pub name: String,
211}
212
213impl From<fnet_filter::RoutineId> for RoutineId {
214    fn from(id: fnet_filter::RoutineId) -> Self {
215        let fnet_filter::RoutineId { namespace, name } = id;
216        Self { namespace: NamespaceId(namespace), name }
217    }
218}
219
220impl From<RoutineId> for fnet_filter::RoutineId {
221    fn from(id: RoutineId) -> Self {
222        let RoutineId { namespace, name } = id;
223        let NamespaceId(namespace) = namespace;
224        Self { namespace, name }
225    }
226}
227
228/// Extension type for [`fnet_filter::RuleId`].
229#[derive(Debug, Clone, PartialEq, Eq, Hash)]
230pub struct RuleId {
231    pub routine: RoutineId,
232    pub index: u32,
233}
234
235impl From<fnet_filter::RuleId> for RuleId {
236    fn from(id: fnet_filter::RuleId) -> Self {
237        let fnet_filter::RuleId { routine, index } = id;
238        Self { routine: routine.into(), index }
239    }
240}
241
242impl From<RuleId> for fnet_filter::RuleId {
243    fn from(id: RuleId) -> Self {
244        let RuleId { routine, index } = id;
245        Self { routine: routine.into(), index }
246    }
247}
248
249/// Extension type for [`fnet_filter::ResourceId`].
250#[derive(Debug, Clone, PartialEq, Eq, Hash)]
251pub enum ResourceId {
252    Namespace(NamespaceId),
253    Routine(RoutineId),
254    Rule(RuleId),
255}
256
257impl TryFrom<fnet_filter::ResourceId> for ResourceId {
258    type Error = FidlConversionError;
259
260    fn try_from(id: fnet_filter::ResourceId) -> Result<Self, Self::Error> {
261        match id {
262            fnet_filter::ResourceId::Namespace(id) => Ok(Self::Namespace(NamespaceId(id))),
263            fnet_filter::ResourceId::Routine(id) => Ok(Self::Routine(id.into())),
264            fnet_filter::ResourceId::Rule(id) => Ok(Self::Rule(id.into())),
265            fnet_filter::ResourceId::__SourceBreaking { .. } => {
266                Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE_ID))
267            }
268        }
269    }
270}
271
272impl From<ResourceId> for fnet_filter::ResourceId {
273    fn from(id: ResourceId) -> Self {
274        match id {
275            ResourceId::Namespace(NamespaceId(id)) => fnet_filter::ResourceId::Namespace(id),
276            ResourceId::Routine(id) => fnet_filter::ResourceId::Routine(id.into()),
277            ResourceId::Rule(id) => fnet_filter::ResourceId::Rule(id.into()),
278        }
279    }
280}
281
/// Extension type for [`fnet_filter::Domain`].
///
/// This is a fieldless enum, so it is `Copy` and `Eq` like the other
/// fieldless enums in this module (e.g. [`IpHook`] and [`NatHook`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Domain {
    Ipv4,
    Ipv6,
    AllIp,
}
289
290impl From<Domain> for fnet_filter::Domain {
291    fn from(domain: Domain) -> Self {
292        match domain {
293            Domain::Ipv4 => fnet_filter::Domain::Ipv4,
294            Domain::Ipv6 => fnet_filter::Domain::Ipv6,
295            Domain::AllIp => fnet_filter::Domain::AllIp,
296        }
297    }
298}
299
300impl TryFrom<fnet_filter::Domain> for Domain {
301    type Error = FidlConversionError;
302
303    fn try_from(domain: fnet_filter::Domain) -> Result<Self, Self::Error> {
304        match domain {
305            fnet_filter::Domain::Ipv4 => Ok(Self::Ipv4),
306            fnet_filter::Domain::Ipv6 => Ok(Self::Ipv6),
307            fnet_filter::Domain::AllIp => Ok(Self::AllIp),
308            fnet_filter::Domain::__SourceBreaking { .. } => {
309                Err(FidlConversionError::UnknownUnionVariant(type_names::DOMAIN))
310            }
311        }
312    }
313}
314
315/// Extension type for [`fnet_filter::Namespace`].
316#[derive(Debug, Clone, PartialEq)]
317pub struct Namespace {
318    pub id: NamespaceId,
319    pub domain: Domain,
320}
321
322impl From<Namespace> for fnet_filter::Namespace {
323    fn from(namespace: Namespace) -> Self {
324        let Namespace { id, domain } = namespace;
325        let NamespaceId(id) = id;
326        Self { id: Some(id), domain: Some(domain.into()), __source_breaking: SourceBreaking }
327    }
328}
329
330impl TryFrom<fnet_filter::Namespace> for Namespace {
331    type Error = FidlConversionError;
332
333    fn try_from(namespace: fnet_filter::Namespace) -> Result<Self, Self::Error> {
334        let fnet_filter::Namespace { id, domain, __source_breaking } = namespace;
335        let id = NamespaceId(id.ok_or(FidlConversionError::MissingNamespaceId)?);
336        let domain = domain.ok_or(FidlConversionError::MissingNamespaceDomain)?.try_into()?;
337        Ok(Self { id, domain })
338    }
339}
340
/// Extension type for [`fnet_filter::IpInstallationHook`].
///
/// Names the hook at which an IP routine is installed.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum IpHook {
    Ingress,
    LocalIngress,
    Forwarding,
    LocalEgress,
    Egress,
}
350
351impl From<IpHook> for fnet_filter::IpInstallationHook {
352    fn from(hook: IpHook) -> Self {
353        match hook {
354            IpHook::Ingress => Self::Ingress,
355            IpHook::LocalIngress => Self::LocalIngress,
356            IpHook::Forwarding => Self::Forwarding,
357            IpHook::LocalEgress => Self::LocalEgress,
358            IpHook::Egress => Self::Egress,
359        }
360    }
361}
362
363impl TryFrom<fnet_filter::IpInstallationHook> for IpHook {
364    type Error = FidlConversionError;
365
366    fn try_from(hook: fnet_filter::IpInstallationHook) -> Result<Self, Self::Error> {
367        match hook {
368            fnet_filter::IpInstallationHook::Ingress => Ok(Self::Ingress),
369            fnet_filter::IpInstallationHook::LocalIngress => Ok(Self::LocalIngress),
370            fnet_filter::IpInstallationHook::Forwarding => Ok(Self::Forwarding),
371            fnet_filter::IpInstallationHook::LocalEgress => Ok(Self::LocalEgress),
372            fnet_filter::IpInstallationHook::Egress => Ok(Self::Egress),
373            fnet_filter::IpInstallationHook::__SourceBreaking { .. } => {
374                Err(FidlConversionError::UnknownUnionVariant(type_names::IP_INSTALLATION_HOOK))
375            }
376        }
377    }
378}
379
/// Extension type for [`fnet_filter::NatInstallationHook`].
///
/// Names the hook at which a NAT routine is installed.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum NatHook {
    Ingress,
    LocalIngress,
    LocalEgress,
    Egress,
}
388
389impl From<NatHook> for fnet_filter::NatInstallationHook {
390    fn from(hook: NatHook) -> Self {
391        match hook {
392            NatHook::Ingress => Self::Ingress,
393            NatHook::LocalIngress => Self::LocalIngress,
394            NatHook::LocalEgress => Self::LocalEgress,
395            NatHook::Egress => Self::Egress,
396        }
397    }
398}
399
400impl TryFrom<fnet_filter::NatInstallationHook> for NatHook {
401    type Error = FidlConversionError;
402
403    fn try_from(hook: fnet_filter::NatInstallationHook) -> Result<Self, Self::Error> {
404        match hook {
405            fnet_filter::NatInstallationHook::Ingress => Ok(Self::Ingress),
406            fnet_filter::NatInstallationHook::LocalIngress => Ok(Self::LocalIngress),
407            fnet_filter::NatInstallationHook::LocalEgress => Ok(Self::LocalEgress),
408            fnet_filter::NatInstallationHook::Egress => Ok(Self::Egress),
409            fnet_filter::NatInstallationHook::__SourceBreaking { .. } => {
410                Err(FidlConversionError::UnknownUnionVariant(type_names::NAT_INSTALLATION_HOOK))
411            }
412        }
413    }
414}
415
416/// Extension type for [`fnet_filter::InstalledIpRoutine`].
417#[derive(Debug, Clone, PartialEq)]
418pub struct InstalledIpRoutine {
419    pub hook: IpHook,
420    pub priority: i32,
421}
422
423impl From<InstalledIpRoutine> for fnet_filter::InstalledIpRoutine {
424    fn from(routine: InstalledIpRoutine) -> Self {
425        let InstalledIpRoutine { hook, priority } = routine;
426        Self {
427            hook: Some(hook.into()),
428            priority: Some(priority),
429            __source_breaking: SourceBreaking,
430        }
431    }
432}
433
434impl TryFrom<fnet_filter::InstalledIpRoutine> for InstalledIpRoutine {
435    type Error = FidlConversionError;
436
437    fn try_from(routine: fnet_filter::InstalledIpRoutine) -> Result<Self, Self::Error> {
438        let fnet_filter::InstalledIpRoutine { hook, priority, __source_breaking } = routine;
439        let hook = hook.ok_or(FidlConversionError::MissingIpInstallationHook)?;
440        let priority = priority.unwrap_or(fnet_filter::DEFAULT_ROUTINE_PRIORITY);
441        Ok(Self { hook: hook.try_into()?, priority })
442    }
443}
444
445/// Extension type for [`fnet_filter::InstalledNatRoutine`].
446#[derive(Debug, Clone, PartialEq)]
447pub struct InstalledNatRoutine {
448    pub hook: NatHook,
449    pub priority: i32,
450}
451
452impl From<InstalledNatRoutine> for fnet_filter::InstalledNatRoutine {
453    fn from(routine: InstalledNatRoutine) -> Self {
454        let InstalledNatRoutine { hook, priority } = routine;
455        Self {
456            hook: Some(hook.into()),
457            priority: Some(priority),
458            __source_breaking: SourceBreaking,
459        }
460    }
461}
462
463impl TryFrom<fnet_filter::InstalledNatRoutine> for InstalledNatRoutine {
464    type Error = FidlConversionError;
465
466    fn try_from(routine: fnet_filter::InstalledNatRoutine) -> Result<Self, Self::Error> {
467        let fnet_filter::InstalledNatRoutine { hook, priority, __source_breaking } = routine;
468        let hook = hook.ok_or(FidlConversionError::MissingNatInstallationHook)?;
469        let priority = priority.unwrap_or(fnet_filter::DEFAULT_ROUTINE_PRIORITY);
470        Ok(Self { hook: hook.try_into()?, priority })
471    }
472}
473
474/// Extension type for [`fnet_filter::RoutineType`].
475#[derive(Debug, Clone, PartialEq)]
476pub enum RoutineType {
477    Ip(Option<InstalledIpRoutine>),
478    Nat(Option<InstalledNatRoutine>),
479}
480
481impl RoutineType {
482    pub fn is_installed(&self) -> bool {
483        // The `InstalledIpRoutine` or `InstalledNatRoutine` configuration is
484        // optional, and when omitted, signifies an uninstalled routine.
485        match self {
486            Self::Ip(Some(_)) | Self::Nat(Some(_)) => true,
487            Self::Ip(None) | Self::Nat(None) => false,
488        }
489    }
490}
491
492impl From<RoutineType> for fnet_filter::RoutineType {
493    fn from(routine: RoutineType) -> Self {
494        match routine {
495            RoutineType::Ip(installation) => Self::Ip(fnet_filter::IpRoutine {
496                installation: installation.map(Into::into),
497                __source_breaking: SourceBreaking,
498            }),
499            RoutineType::Nat(installation) => Self::Nat(fnet_filter::NatRoutine {
500                installation: installation.map(Into::into),
501                __source_breaking: SourceBreaking,
502            }),
503        }
504    }
505}
506
507impl TryFrom<fnet_filter::RoutineType> for RoutineType {
508    type Error = FidlConversionError;
509
510    fn try_from(type_: fnet_filter::RoutineType) -> Result<Self, Self::Error> {
511        match type_ {
512            fnet_filter::RoutineType::Ip(fnet_filter::IpRoutine {
513                installation,
514                __source_breaking,
515            }) => Ok(RoutineType::Ip(installation.map(TryInto::try_into).transpose()?)),
516            fnet_filter::RoutineType::Nat(fnet_filter::NatRoutine {
517                installation,
518                __source_breaking,
519            }) => Ok(RoutineType::Nat(installation.map(TryInto::try_into).transpose()?)),
520            fnet_filter::RoutineType::__SourceBreaking { .. } => {
521                Err(FidlConversionError::UnknownUnionVariant(type_names::ROUTINE_TYPE))
522            }
523        }
524    }
525}
526
527/// Extension type for [`fnet_filter::Routine`].
528#[derive(Debug, Clone, PartialEq)]
529pub struct Routine {
530    pub id: RoutineId,
531    pub routine_type: RoutineType,
532}
533
534impl From<Routine> for fnet_filter::Routine {
535    fn from(routine: Routine) -> Self {
536        let Routine { id, routine_type: type_ } = routine;
537        Self { id: Some(id.into()), type_: Some(type_.into()), __source_breaking: SourceBreaking }
538    }
539}
540
541impl TryFrom<fnet_filter::Routine> for Routine {
542    type Error = FidlConversionError;
543
544    fn try_from(routine: fnet_filter::Routine) -> Result<Self, Self::Error> {
545        let fnet_filter::Routine { id, type_, __source_breaking } = routine;
546        let id = id.ok_or(FidlConversionError::MissingRoutineId)?;
547        let type_ = type_.ok_or(FidlConversionError::MissingRoutineType)?;
548        Ok(Self { id: id.into(), routine_type: type_.try_into()? })
549    }
550}
551
552/// Extension type for [`fnet_filter::Matchers`].
553#[derive(Default, Clone, PartialEq)]
554pub struct Matchers {
555    pub in_interface: Option<fnet_matchers_ext::Interface>,
556    pub out_interface: Option<fnet_matchers_ext::Interface>,
557    pub src_addr: Option<fnet_matchers_ext::Address>,
558    pub dst_addr: Option<fnet_matchers_ext::Address>,
559    pub transport_protocol: Option<fnet_matchers_ext::TransportProtocol>,
560    pub ebpf_program: Option<febpf::ProgramId>,
561}
562
563impl From<Matchers> for fnet_filter::Matchers {
564    fn from(matchers: Matchers) -> Self {
565        let Matchers {
566            in_interface,
567            out_interface,
568            src_addr,
569            dst_addr,
570            transport_protocol,
571            ebpf_program,
572        } = matchers;
573        Self {
574            in_interface: in_interface.map(Into::into),
575            out_interface: out_interface.map(Into::into),
576            src_addr: src_addr.map(Into::into),
577            dst_addr: dst_addr.map(Into::into),
578            transport_protocol: transport_protocol.map(Into::into),
579            ebpf_program: ebpf_program.map(Into::into),
580            __source_breaking: SourceBreaking,
581        }
582    }
583}
584
585impl TryFrom<fnet_filter::Matchers> for Matchers {
586    type Error = FidlConversionError;
587
588    fn try_from(matchers: fnet_filter::Matchers) -> Result<Self, Self::Error> {
589        let fnet_filter::Matchers {
590            in_interface,
591            out_interface,
592            src_addr,
593            dst_addr,
594            transport_protocol,
595            ebpf_program,
596            __source_breaking,
597        } = matchers;
598        Ok(Self {
599            in_interface: in_interface.map(TryInto::try_into).transpose()?,
600            out_interface: out_interface.map(TryInto::try_into).transpose()?,
601            src_addr: src_addr.map(TryInto::try_into).transpose()?,
602            dst_addr: dst_addr.map(TryInto::try_into).transpose()?,
603            transport_protocol: transport_protocol.map(TryInto::try_into).transpose()?,
604            ebpf_program: ebpf_program.map(Into::into),
605        })
606    }
607}
608
609impl Debug for Matchers {
610    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
611        let mut debug_struct = f.debug_struct("Matchers");
612
613        let Matchers {
614            in_interface,
615            out_interface,
616            src_addr,
617            dst_addr,
618            transport_protocol,
619            ebpf_program,
620        } = &self;
621
622        // Omit empty fields.
623        if let Some(matcher) = in_interface {
624            let _ = debug_struct.field("in_interface", matcher);
625        }
626
627        if let Some(matcher) = out_interface {
628            let _ = debug_struct.field("out_interface", matcher);
629        }
630
631        if let Some(matcher) = src_addr {
632            let _ = debug_struct.field("src_addr", matcher);
633        }
634
635        if let Some(matcher) = dst_addr {
636            let _ = debug_struct.field("dst_addr", matcher);
637        }
638
639        if let Some(matcher) = transport_protocol {
640            let _ = debug_struct.field("transport_protocol", matcher);
641        }
642
643        if let Some(matcher) = ebpf_program {
644            let _ = debug_struct.field("ebpf_program", matcher);
645        }
646
647        debug_struct.finish()
648    }
649}
650
651/// Extension type for [`fnet_filter::Action`].
652#[derive(Debug, Clone, PartialEq)]
653pub enum Action {
654    Accept,
655    Drop,
656    Jump(String),
657    Return,
658    TransparentProxy(TransparentProxy),
659    Redirect { dst_port: Option<PortRange> },
660    Masquerade { src_port: Option<PortRange> },
661    Mark { domain: fnet::MarkDomain, action: MarkAction },
662    None,
663    Reject(RejectType),
664}
665
666#[derive(Debug, Clone, PartialEq)]
667pub enum MarkAction {
668    SetMark { clearing_mask: fnet::Mark, mark: fnet::Mark },
669}
670
671/// Extension type for [`fnet_filter::TransparentProxy_`].
672#[derive(Debug, Clone, PartialEq)]
673pub enum TransparentProxy {
674    LocalAddr(fnet::IpAddress),
675    LocalPort(NonZeroU16),
676    LocalAddrAndPort(fnet::IpAddress, NonZeroU16),
677}
678
/// Extension type for [`fnet_filter::PortRange`].
///
/// An inclusive range of nonzero ports.
#[derive(Clone, Debug, PartialEq)]
pub struct PortRange(pub RangeInclusive<NonZeroU16>);
681
682impl From<PortRange> for fnet_filter::PortRange {
683    fn from(range: PortRange) -> Self {
684        let PortRange(range) = range;
685        Self { start: range.start().get(), end: range.end().get() }
686    }
687}
688
689impl TryFrom<fnet_filter::PortRange> for PortRange {
690    type Error = FidlConversionError;
691
692    fn try_from(range: fnet_filter::PortRange) -> Result<Self, Self::Error> {
693        let fnet_filter::PortRange { start, end } = range;
694        if start > end {
695            Err(FidlConversionError::InvalidPortRange)
696        } else {
697            let start = NonZeroU16::new(start).ok_or(FidlConversionError::UnspecifiedNatPort)?;
698            let end = NonZeroU16::new(end).ok_or(FidlConversionError::UnspecifiedNatPort)?;
699            Ok(Self(start..=end))
700        }
701    }
702}
703
/// Extension type for [`fnet_filter::RejectType`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RejectType {
    TcpReset,
    NetUnreachable,
    HostUnreachable,
    ProtoUnreachable,
    PortUnreachable,
    RoutePolicyFail,
    RejectRoute,
    AdminProhibited,
}
715
716impl From<RejectType> for fnet_filter::RejectType {
717    fn from(value: RejectType) -> Self {
718        match value {
719            RejectType::TcpReset => fnet_filter::RejectType::TcpReset,
720            RejectType::NetUnreachable => fnet_filter::RejectType::NetUnreachable,
721            RejectType::HostUnreachable => fnet_filter::RejectType::HostUnreachable,
722            RejectType::ProtoUnreachable => fnet_filter::RejectType::ProtoUnreachable,
723            RejectType::PortUnreachable => fnet_filter::RejectType::PortUnreachable,
724            RejectType::RoutePolicyFail => fnet_filter::RejectType::RoutePolicyFail,
725            RejectType::RejectRoute => fnet_filter::RejectType::RejectRoute,
726            RejectType::AdminProhibited => fnet_filter::RejectType::AdminProhibited,
727        }
728    }
729}
730
731impl TryFrom<fnet_filter::RejectType> for RejectType {
732    type Error = FidlConversionError;
733
734    fn try_from(value: fnet_filter::RejectType) -> Result<Self, Self::Error> {
735        match value {
736            fnet_filter::RejectType::TcpReset => Ok(RejectType::TcpReset),
737            fnet_filter::RejectType::NetUnreachable => Ok(RejectType::NetUnreachable),
738            fnet_filter::RejectType::HostUnreachable => Ok(RejectType::HostUnreachable),
739            fnet_filter::RejectType::ProtoUnreachable => Ok(RejectType::ProtoUnreachable),
740            fnet_filter::RejectType::PortUnreachable => Ok(RejectType::PortUnreachable),
741            fnet_filter::RejectType::RoutePolicyFail => Ok(RejectType::RoutePolicyFail),
742            fnet_filter::RejectType::RejectRoute => Ok(RejectType::RejectRoute),
743            fnet_filter::RejectType::AdminProhibited => Ok(RejectType::AdminProhibited),
744            fnet_filter::RejectType::__SourceBreaking { .. } => {
745                Err(FidlConversionError::UnknownUnionVariant(type_names::REJECT_TYPE))
746            }
747        }
748    }
749}
750
751impl From<Action> for fnet_filter::Action {
752    fn from(action: Action) -> Self {
753        match action {
754            Action::Accept => Self::Accept(fnet_filter::Empty {}),
755            Action::Drop => Self::Drop(fnet_filter::Empty {}),
756            Action::Jump(target) => Self::Jump(target),
757            Action::Return => Self::Return_(fnet_filter::Empty {}),
758            Action::TransparentProxy(proxy) => Self::TransparentProxy(match proxy {
759                TransparentProxy::LocalAddr(addr) => {
760                    fnet_filter::TransparentProxy_::LocalAddr(addr)
761                }
762                TransparentProxy::LocalPort(port) => {
763                    fnet_filter::TransparentProxy_::LocalPort(port.get())
764                }
765                TransparentProxy::LocalAddrAndPort(addr, port) => {
766                    fnet_filter::TransparentProxy_::LocalAddrAndPort(fnet_filter::SocketAddr {
767                        addr,
768                        port: port.get(),
769                    })
770                }
771            }),
772            Action::Redirect { dst_port } => Self::Redirect(fnet_filter::Redirect {
773                dst_port: dst_port.map(Into::into),
774                __source_breaking: SourceBreaking,
775            }),
776            Action::Masquerade { src_port } => Self::Masquerade(fnet_filter::Masquerade {
777                src_port: src_port.map(Into::into),
778                __source_breaking: SourceBreaking,
779            }),
780            Action::Mark { domain, action } => {
781                Self::Mark(fnet_filter::Mark { domain, action: action.into() })
782            }
783            Action::None => Self::None(fnet_filter::Empty {}),
784            Action::Reject(reject_type) => {
785                Self::Reject(fnet_filter::Reject { reject_type: reject_type.into() })
786            }
787        }
788    }
789}
790
791impl TryFrom<fnet_filter::Action> for Action {
792    type Error = FidlConversionError;
793
794    fn try_from(action: fnet_filter::Action) -> Result<Self, Self::Error> {
795        match action {
796            fnet_filter::Action::Accept(fnet_filter::Empty {}) => Ok(Self::Accept),
797            fnet_filter::Action::Drop(fnet_filter::Empty {}) => Ok(Self::Drop),
798            fnet_filter::Action::Jump(target) => Ok(Self::Jump(target)),
799            fnet_filter::Action::Return_(fnet_filter::Empty {}) => Ok(Self::Return),
800            fnet_filter::Action::TransparentProxy(proxy) => {
801                Ok(Self::TransparentProxy(match proxy {
802                    fnet_filter::TransparentProxy_::LocalAddr(addr) => {
803                        TransparentProxy::LocalAddr(addr)
804                    }
805                    fnet_filter::TransparentProxy_::LocalPort(port) => {
806                        let port = NonZeroU16::new(port)
807                            .ok_or(FidlConversionError::UnspecifiedTransparentProxyPort)?;
808                        TransparentProxy::LocalPort(port)
809                    }
810                    fnet_filter::TransparentProxy_::LocalAddrAndPort(fnet_filter::SocketAddr {
811                        addr,
812                        port,
813                    }) => {
814                        let port = NonZeroU16::new(port)
815                            .ok_or(FidlConversionError::UnspecifiedTransparentProxyPort)?;
816                        TransparentProxy::LocalAddrAndPort(addr, port)
817                    }
818                    fnet_filter::TransparentProxy_::__SourceBreaking { .. } => {
819                        return Err(FidlConversionError::UnknownUnionVariant(
820                            type_names::TRANSPARENT_PROXY,
821                        ));
822                    }
823                }))
824            }
825            fnet_filter::Action::Redirect(fnet_filter::Redirect {
826                dst_port,
827                __source_breaking,
828            }) => Ok(Self::Redirect { dst_port: dst_port.map(TryInto::try_into).transpose()? }),
829            fnet_filter::Action::Masquerade(fnet_filter::Masquerade {
830                src_port,
831                __source_breaking,
832            }) => Ok(Self::Masquerade { src_port: src_port.map(TryInto::try_into).transpose()? }),
833            fnet_filter::Action::Mark(fnet_filter::Mark { domain, action }) => {
834                Ok(Self::Mark { domain, action: action.try_into()? })
835            }
836            fnet_filter::Action::__SourceBreaking { .. } => {
837                Err(FidlConversionError::UnknownUnionVariant(type_names::ACTION))
838            }
839            fnet_filter::Action::None(fnet_filter::Empty {}) => Ok(Self::None),
840            fnet_filter::Action::Reject(fnet_filter::Reject { reject_type }) => {
841                Ok(Self::Reject(reject_type.try_into()?))
842            }
843        }
844    }
845}
846
847impl From<MarkAction> for fnet_filter::MarkAction {
848    fn from(action: MarkAction) -> Self {
849        match action {
850            MarkAction::SetMark { clearing_mask, mark } => {
851                Self::SetMark(fnet_filter::SetMark { clearing_mask, mark })
852            }
853        }
854    }
855}
856
857impl TryFrom<fnet_filter::MarkAction> for MarkAction {
858    type Error = FidlConversionError;
859    fn try_from(action: fnet_filter::MarkAction) -> Result<Self, Self::Error> {
860        match action {
861            fnet_filter::MarkAction::SetMark(fnet_filter::SetMark { clearing_mask, mark }) => {
862                Ok(Self::SetMark { clearing_mask, mark })
863            }
864            fnet_filter::MarkAction::__SourceBreaking { .. } => {
865                Err(FidlConversionError::UnknownUnionVariant(type_names::MARK_ACTION))
866            }
867        }
868    }
869}
870
871/// Extension type for [`fnet_filter::Rule`].
872#[derive(Debug, Clone, PartialEq)]
873pub struct Rule {
874    pub id: RuleId,
875    pub matchers: Matchers,
876    pub action: Action,
877}
878
879impl From<Rule> for fnet_filter::Rule {
880    fn from(rule: Rule) -> Self {
881        let Rule { id, matchers, action } = rule;
882        Self { id: id.into(), matchers: matchers.into(), action: action.into() }
883    }
884}
885
886impl TryFrom<fnet_filter::Rule> for Rule {
887    type Error = FidlConversionError;
888
889    fn try_from(rule: fnet_filter::Rule) -> Result<Self, Self::Error> {
890        let fnet_filter::Rule { id, matchers, action } = rule;
891        Ok(Self { id: id.into(), matchers: matchers.try_into()?, action: action.try_into()? })
892    }
893}
894
895/// Extension type for [`fnet_filter::Resource`].
896#[derive(Debug, Clone, PartialEq)]
897pub enum Resource {
898    Namespace(Namespace),
899    Routine(Routine),
900    Rule(Rule),
901}
902
903impl Resource {
904    pub fn id(&self) -> ResourceId {
905        match self {
906            Self::Namespace(Namespace { id, domain: _ }) => ResourceId::Namespace(id.clone()),
907            Self::Routine(Routine { id, routine_type: _ }) => ResourceId::Routine(id.clone()),
908            Self::Rule(Rule { id, matchers: _, action: _ }) => ResourceId::Rule(id.clone()),
909        }
910    }
911}
912
913impl From<Resource> for fnet_filter::Resource {
914    fn from(resource: Resource) -> Self {
915        match resource {
916            Resource::Namespace(namespace) => Self::Namespace(namespace.into()),
917            Resource::Routine(routine) => Self::Routine(routine.into()),
918            Resource::Rule(rule) => Self::Rule(rule.into()),
919        }
920    }
921}
922
923impl TryFrom<fnet_filter::Resource> for Resource {
924    type Error = FidlConversionError;
925
926    fn try_from(resource: fnet_filter::Resource) -> Result<Self, Self::Error> {
927        match resource {
928            fnet_filter::Resource::Namespace(namespace) => {
929                Ok(Self::Namespace(namespace.try_into()?))
930            }
931            fnet_filter::Resource::Routine(routine) => Ok(Self::Routine(routine.try_into()?)),
932            fnet_filter::Resource::Rule(rule) => Ok(Self::Rule(rule.try_into()?)),
933            fnet_filter::Resource::__SourceBreaking { .. } => {
934                Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE))
935            }
936        }
937    }
938}
939
/// Extension type for [`fnet_filter::ControllerId`].
///
/// A newtype over the controller's string ID. It is hashable and ordered, so
/// it can serve as a key in map and set collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ControllerId(
    /// The raw string ID.
    pub String,
);
943
944/// Extension type for [`fnet_filter::Event`].
945#[derive(Debug, Clone, PartialEq)]
946pub enum Event {
947    Existing(ControllerId, Resource),
948    Idle,
949    Added(ControllerId, Resource),
950    Removed(ControllerId, ResourceId),
951    EndOfUpdate,
952}
953
954impl From<Event> for fnet_filter::Event {
955    fn from(event: Event) -> Self {
956        match event {
957            Event::Existing(controller, resource) => {
958                let ControllerId(id) = controller;
959                Self::Existing(fnet_filter::ExistingResource {
960                    controller: id,
961                    resource: resource.into(),
962                })
963            }
964            Event::Idle => Self::Idle(fnet_filter::Empty {}),
965            Event::Added(controller, resource) => {
966                let ControllerId(id) = controller;
967                Self::Added(fnet_filter::AddedResource {
968                    controller: id,
969                    resource: resource.into(),
970                })
971            }
972            Event::Removed(controller, resource) => {
973                let ControllerId(id) = controller;
974                Self::Removed(fnet_filter::RemovedResource {
975                    controller: id,
976                    resource: resource.into(),
977                })
978            }
979            Event::EndOfUpdate => Self::EndOfUpdate(fnet_filter::Empty {}),
980        }
981    }
982}
983
984impl TryFrom<fnet_filter::Event> for Event {
985    type Error = FidlConversionError;
986
987    fn try_from(event: fnet_filter::Event) -> Result<Self, Self::Error> {
988        match event {
989            fnet_filter::Event::Existing(fnet_filter::ExistingResource {
990                controller,
991                resource,
992            }) => Ok(Self::Existing(ControllerId(controller), resource.try_into()?)),
993            fnet_filter::Event::Idle(fnet_filter::Empty {}) => Ok(Self::Idle),
994            fnet_filter::Event::Added(fnet_filter::AddedResource { controller, resource }) => {
995                Ok(Self::Added(ControllerId(controller), resource.try_into()?))
996            }
997            fnet_filter::Event::Removed(fnet_filter::RemovedResource { controller, resource }) => {
998                Ok(Self::Removed(ControllerId(controller), resource.try_into()?))
999            }
1000            fnet_filter::Event::EndOfUpdate(fnet_filter::Empty {}) => Ok(Self::EndOfUpdate),
1001            fnet_filter::Event::__SourceBreaking { .. } => {
1002                Err(FidlConversionError::UnknownUnionVariant(type_names::EVENT))
1003            }
1004        }
1005    }
1006}
1007
1008/// Filter watcher creation errors.
1009#[derive(Debug, Error)]
1010pub enum WatcherCreationError {
1011    #[error("failed to create filter watcher proxy: {0}")]
1012    CreateProxy(fidl::Error),
1013    #[error("failed to get filter watcher: {0}")]
1014    GetWatcher(fidl::Error),
1015}
1016
1017/// Filter watcher `Watch` errors.
1018#[derive(Debug, Error)]
1019pub enum WatchError {
1020    /// The call to `Watch` returned a FIDL error.
1021    #[error("the call to `Watch()` failed: {0}")]
1022    Fidl(fidl::Error),
1023    /// The event returned by `Watch` encountered a conversion error.
1024    #[error("failed to convert event returned by `Watch()`: {0}")]
1025    Conversion(FidlConversionError),
1026    /// The server returned an empty batch of events.
1027    #[error("the call to `Watch()` returned an empty batch of events")]
1028    EmptyEventBatch,
1029}
1030
1031/// Connects to the watcher protocol and converts the Hanging-Get style API into
1032/// an Event stream.
1033///
1034/// Each call to `Watch` returns a batch of events, which are flattened into a
1035/// single stream. If an error is encountered while calling `Watch` or while
1036/// converting the event, the stream is immediately terminated.
1037pub fn event_stream_from_state(
1038    state: fnet_filter::StateProxy,
1039) -> Result<impl Stream<Item = Result<Event, WatchError>>, WatcherCreationError> {
1040    let (watcher, server_end) = state.domain().create_proxy::<fnet_filter::WatcherMarker>();
1041    state
1042        .get_watcher(&fnet_filter::WatcherOptions::default(), server_end)
1043        .map_err(WatcherCreationError::GetWatcher)?;
1044
1045    let stream = futures::stream::try_unfold(watcher, |watcher| async {
1046        let events = watcher.watch().await.map_err(WatchError::Fidl)?;
1047        if events.is_empty() {
1048            return Err(WatchError::EmptyEventBatch);
1049        }
1050
1051        let event_stream = futures::stream::iter(events).map(Ok).and_then(|event| {
1052            futures::future::ready(event.try_into().map_err(WatchError::Conversion))
1053        });
1054        Ok(Some((event_stream, watcher)))
1055    })
1056    .try_flatten();
1057
1058    Ok(stream)
1059}
1060
1061/// Errors returned by [`get_existing_resources`].
1062#[derive(Debug, Error)]
1063pub enum GetExistingResourcesError {
1064    /// There was an error in the event stream.
1065    #[error("there was an error in the event stream: {0}")]
1066    ErrorInStream(WatchError),
1067    /// There was an unexpected event in the event stream. Only `existing` or
1068    /// `idle` events are expected.
1069    #[error("there was an unexpected event in the event stream: {0:?}")]
1070    UnexpectedEvent(Event),
1071    /// A duplicate existing resource was reported in the event stream.
1072    #[error("a duplicate existing resource was reported")]
1073    DuplicateResource(Resource),
1074    /// The event stream unexpectedly ended.
1075    #[error("the event stream unexpectedly ended")]
1076    StreamEnded,
1077}
1078
1079/// A trait for types holding filtering state that can be updated by change
1080/// events.
1081pub trait Update {
1082    /// Add the resource to the specified controller's state.
1083    ///
1084    /// Optionally returns a resource that has already been added to the
1085    /// controller with the same [`ResourceId`].
1086    fn add(&mut self, controller: ControllerId, resource: Resource) -> Option<Resource>;
1087
1088    /// Remove the resource from the specified controller's state.
1089    ///
1090    /// Returns the removed resource, if present.
1091    fn remove(&mut self, controller: ControllerId, resource: &ResourceId) -> Option<Resource>;
1092}
1093
1094impl Update for HashMap<ControllerId, HashMap<ResourceId, Resource>> {
1095    fn add(&mut self, controller: ControllerId, resource: Resource) -> Option<Resource> {
1096        self.entry(controller).or_default().insert(resource.id(), resource)
1097    }
1098
1099    fn remove(&mut self, controller: ControllerId, resource: &ResourceId) -> Option<Resource> {
1100        self.get_mut(&controller)?.remove(resource)
1101    }
1102}
1103
1104/// Collects all `existing` events from the stream, stopping once the `idle`
1105/// event is observed.
1106pub async fn get_existing_resources<C: Update + Default>(
1107    stream: impl Stream<Item = Result<Event, WatchError>>,
1108) -> Result<C, GetExistingResourcesError> {
1109    async_utils::fold::fold_while(
1110        stream,
1111        Ok(C::default()),
1112        |resources: Result<C, GetExistingResourcesError>, event| {
1113            let mut resources =
1114                resources.expect("`resources` must be `Ok`, because we stop folding on err");
1115            futures::future::ready(match event {
1116                Err(e) => FoldWhile::Done(Err(GetExistingResourcesError::ErrorInStream(e))),
1117                Ok(e) => match e {
1118                    Event::Existing(controller, resource) => {
1119                        if let Some(resource) = resources.add(controller, resource) {
1120                            FoldWhile::Done(Err(GetExistingResourcesError::DuplicateResource(
1121                                resource,
1122                            )))
1123                        } else {
1124                            FoldWhile::Continue(Ok(resources))
1125                        }
1126                    }
1127                    Event::Idle => FoldWhile::Done(Ok(resources)),
1128                    e @ (Event::Added(_, _) | Event::Removed(_, _) | Event::EndOfUpdate) => {
1129                        FoldWhile::Done(Err(GetExistingResourcesError::UnexpectedEvent(e)))
1130                    }
1131                },
1132            })
1133        },
1134    )
1135    .await
1136    .short_circuited()
1137    .map_err(|_resources| GetExistingResourcesError::StreamEnded)?
1138}
1139
1140/// Errors returned by [`wait_for_condition`].
1141#[derive(Debug, Error)]
1142pub enum WaitForConditionError {
1143    /// There was an error in the event stream.
1144    #[error("there was an error in the event stream: {0}")]
1145    ErrorInStream(WatchError),
1146    /// There was an `Added` event for an already existing resource.
1147    #[error("observed an added event for an already existing resource: {0:?}")]
1148    AddedAlreadyExisting(Resource),
1149    /// There was a `Removed` event for a non-existent resource.
1150    #[error("observed a removed event for a non-existent resource: {0:?}")]
1151    RemovedNonExistent(ResourceId),
1152    /// The event stream unexpectedly ended.
1153    #[error("the event stream unexpectedly ended")]
1154    StreamEnded,
1155}
1156
1157/// Wait for a condition on filtering state to be satisfied.
1158///
1159/// With the given `initial_state`, take events from `event_stream` and update
1160/// the state, calling `predicate` whenever the state changes. When predicates
1161/// returns `True` yield `Ok(())`.
1162pub async fn wait_for_condition<
1163    C: Update,
1164    S: Stream<Item = Result<Event, WatchError>>,
1165    F: Fn(&C) -> bool,
1166>(
1167    event_stream: S,
1168    initial_state: &mut C,
1169    predicate: F,
1170) -> Result<(), WaitForConditionError> {
1171    async_utils::fold::try_fold_while(
1172        event_stream.map_err(WaitForConditionError::ErrorInStream),
1173        initial_state,
1174        |resources: &mut C, event| {
1175            futures::future::ready(match event {
1176                Event::Existing(controller, resource) | Event::Added(controller, resource) => {
1177                    if let Some(resource) = resources.add(controller, resource) {
1178                        Err(WaitForConditionError::AddedAlreadyExisting(resource))
1179                    } else {
1180                        Ok(FoldWhile::Continue(resources))
1181                    }
1182                }
1183                Event::Removed(controller, resource) => resources
1184                    .remove(controller, &resource)
1185                    .map(|_| FoldWhile::Continue(resources))
1186                    .ok_or(WaitForConditionError::RemovedNonExistent(resource)),
1187                // Wait until a transactional update has been completed to call
1188                // the predicate so it's not run against partially-updated
1189                // state.
1190                Event::Idle | Event::EndOfUpdate => {
1191                    if predicate(&resources) {
1192                        Ok(FoldWhile::Done(()))
1193                    } else {
1194                        Ok(FoldWhile::Continue(resources))
1195                    }
1196                }
1197            })
1198        },
1199    )
1200    .await?
1201    .short_circuited()
1202    .map_err(|_resources: &mut C| WaitForConditionError::StreamEnded)
1203}
1204
1205/// Namespace controller creation errors.
1206#[derive(Debug, Error)]
1207pub enum ControllerCreationError {
1208    #[error("failed to create namespace controller proxy: {0}")]
1209    CreateProxy(fidl::Error),
1210    #[error("failed to open namespace controller: {0}")]
1211    OpenController(fidl::Error),
1212    #[error("server did not emit OnIdAssigned event")]
1213    NoIdAssigned,
1214    #[error("failed to observe ID assignment event: {0}")]
1215    IdAssignment(fidl::Error),
1216}
1217
1218/// Errors for individual changes pushed.
1219///
1220/// Extension type for the error variants of [`fnet_filter::ChangeValidationError`].
1221#[derive(Debug, Error, PartialEq)]
1222pub enum ChangeValidationError {
1223    #[error("change contains a resource that is missing a required field")]
1224    MissingRequiredField,
1225    #[error("rule specifies an invalid interface matcher")]
1226    InvalidInterfaceMatcher,
1227    #[error("rule specifies an invalid address matcher")]
1228    InvalidAddressMatcher,
1229    #[error("rule specifies an invalid port matcher")]
1230    InvalidPortMatcher,
1231    #[error("rule specifies an invalid transparent proxy action")]
1232    InvalidTransparentProxyAction,
1233    #[error("rule specifies an invalid NAT action")]
1234    InvalidNatAction,
1235    #[error("rule specifies an invalid port range")]
1236    InvalidPortRange,
1237}
1238
1239impl TryFrom<fnet_filter::ChangeValidationError> for ChangeValidationError {
1240    type Error = FidlConversionError;
1241
1242    fn try_from(error: fnet_filter::ChangeValidationError) -> Result<Self, Self::Error> {
1243        match error {
1244            fnet_filter::ChangeValidationError::MissingRequiredField => {
1245                Ok(Self::MissingRequiredField)
1246            }
1247            fnet_filter::ChangeValidationError::InvalidInterfaceMatcher => {
1248                Ok(Self::InvalidInterfaceMatcher)
1249            }
1250            fnet_filter::ChangeValidationError::InvalidAddressMatcher => {
1251                Ok(Self::InvalidAddressMatcher)
1252            }
1253            fnet_filter::ChangeValidationError::InvalidPortMatcher => Ok(Self::InvalidPortMatcher),
1254            fnet_filter::ChangeValidationError::InvalidTransparentProxyAction => {
1255                Ok(Self::InvalidTransparentProxyAction)
1256            }
1257            fnet_filter::ChangeValidationError::InvalidNatAction => Ok(Self::InvalidNatAction),
1258            fnet_filter::ChangeValidationError::InvalidPortRange => Ok(Self::InvalidPortRange),
1259            fnet_filter::ChangeValidationError::Ok
1260            | fnet_filter::ChangeValidationError::NotReached => {
1261                Err(FidlConversionError::NotAnError)
1262            }
1263            fnet_filter::ChangeValidationError::__SourceBreaking { unknown_ordinal: _ } => {
1264                Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE_VALIDATION_ERROR))
1265            }
1266        }
1267    }
1268}
1269
1270#[derive(Debug, Error)]
1271pub enum RegisterEbpfProgramError {
1272    #[error("failed to call FIDL method: {0}")]
1273    CallMethod(fidl::Error),
1274
1275    #[error("failed to link the program")]
1276    LinkFailed,
1277
1278    #[error("failed to initialize a map")]
1279    MapFailed,
1280
1281    #[error("the program is already registered")]
1282    AlreadyRegistered,
1283
1284    #[error("the request is missing a required field")]
1285    MissingRequiredField,
1286}
1287
1288impl From<fnet_filter::RegisterEbpfProgramError> for RegisterEbpfProgramError {
1289    fn from(error: fnet_filter::RegisterEbpfProgramError) -> Self {
1290        match error {
1291            fnet_filter::RegisterEbpfProgramError::LinkFailed => Self::LinkFailed,
1292            fnet_filter::RegisterEbpfProgramError::MapFailed => Self::MapFailed,
1293            fnet_filter::RegisterEbpfProgramError::AlreadyRegistered => Self::AlreadyRegistered,
1294            fnet_filter::RegisterEbpfProgramError::MissingRequiredField => {
1295                Self::MissingRequiredField
1296            }
1297        }
1298    }
1299}
1300
1301/// Errors for the NamespaceController.PushChanges method.
1302#[derive(Debug, Error)]
1303pub enum PushChangesError {
1304    #[error("failed to call FIDL method: {0}")]
1305    CallMethod(fidl::Error),
1306    #[error("too many changes were pushed to the server")]
1307    TooManyChanges,
1308    #[error("invalid change(s) pushed: {0:?}")]
1309    ErrorOnChange(Vec<(Change, ChangeValidationError)>),
1310    #[error("unknown FIDL type: {0}")]
1311    FidlConversion(#[from] FidlConversionError),
1312}
1313
1314/// Errors for individual changes committed.
1315///
1316/// Extension type for the error variants of [`fnet_filter::CommitError`].
1317#[derive(Debug, Error, PartialEq)]
1318pub enum ChangeCommitError {
1319    #[error("the change referred to an unknown namespace")]
1320    NamespaceNotFound,
1321    #[error("the change referred to an unknown routine")]
1322    RoutineNotFound,
1323    #[error("the change referred to an unknown rule")]
1324    RuleNotFound,
1325    #[error("the specified resource already exists")]
1326    AlreadyExists,
1327    #[error("the change includes a rule that jumps to an installed routine")]
1328    TargetRoutineIsInstalled,
1329    #[error("the change includes an eBPF matcher with an invalid program ID")]
1330    InvalidEbpfProgramId,
1331}
1332
1333impl TryFrom<fnet_filter::CommitError> for ChangeCommitError {
1334    type Error = FidlConversionError;
1335
1336    fn try_from(error: fnet_filter::CommitError) -> Result<Self, Self::Error> {
1337        match error {
1338            fnet_filter::CommitError::NamespaceNotFound => Ok(Self::NamespaceNotFound),
1339            fnet_filter::CommitError::RoutineNotFound => Ok(Self::RoutineNotFound),
1340            fnet_filter::CommitError::RuleNotFound => Ok(Self::RuleNotFound),
1341            fnet_filter::CommitError::AlreadyExists => Ok(Self::AlreadyExists),
1342            fnet_filter::CommitError::TargetRoutineIsInstalled => {
1343                Ok(Self::TargetRoutineIsInstalled)
1344            }
1345            fnet_filter::CommitError::InvalidEbpfProgramId => Ok(Self::InvalidEbpfProgramId),
1346            fnet_filter::CommitError::Ok | fnet_filter::CommitError::NotReached => {
1347                Err(FidlConversionError::NotAnError)
1348            }
1349            fnet_filter::CommitError::__SourceBreaking { unknown_ordinal: _ } => {
1350                Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_ERROR))
1351            }
1352        }
1353    }
1354}
1355
1356/// Errors for the NamespaceController.Commit method.
1357#[derive(Debug, Error)]
1358pub enum CommitError {
1359    #[error("failed to call FIDL method: {0}")]
1360    CallMethod(fidl::Error),
1361    #[error("rule has a matcher that is unavailable in its context: {0:?}")]
1362    RuleWithInvalidMatcher(RuleId),
1363    #[error("rule has an action that is invalid for its routine: {0:?}")]
1364    RuleWithInvalidAction(RuleId),
1365    #[error("rule has a TransparentProxy action but not a valid transport protocol matcher: {0:?}")]
1366    TransparentProxyWithInvalidMatcher(RuleId),
1367    #[error(
1368        "rule has a Redirect action that specifies a destination port but not a valid transport \
1369        protocol matcher: {0:?}"
1370    )]
1371    RedirectWithInvalidMatcher(RuleId),
1372    #[error(
1373        "rule has a Masquerade action that specifies a source port but not a valid transport \
1374        protocol matcher: {0:?}"
1375    )]
1376    MasqueradeWithInvalidMatcher(RuleId),
1377    #[error("rule has a Reject action but not a valid transport protocol matcher: {0:?}")]
1378    RejectWithInvalidMatcher(RuleId),
1379    #[error("routine forms a cycle {0:?}")]
1380    CyclicalRoutineGraph(RoutineId),
1381    #[error("invalid change was pushed: {0:?}")]
1382    ErrorOnChange(Vec<(Change, ChangeCommitError)>),
1383    #[error("unknown FIDL type: {0}")]
1384    FidlConversion(#[from] FidlConversionError),
1385}
1386
1387/// Extension type for [`fnet_filter::Change`].
1388#[derive(Debug, Clone, PartialEq)]
1389pub enum Change {
1390    Create(Resource),
1391    Remove(ResourceId),
1392}
1393
1394impl From<Change> for fnet_filter::Change {
1395    fn from(change: Change) -> Self {
1396        match change {
1397            Change::Create(resource) => Self::Create(resource.into()),
1398            Change::Remove(resource) => Self::Remove(resource.into()),
1399        }
1400    }
1401}
1402
1403impl TryFrom<fnet_filter::Change> for Change {
1404    type Error = FidlConversionError;
1405
1406    fn try_from(change: fnet_filter::Change) -> Result<Self, Self::Error> {
1407        match change {
1408            fnet_filter::Change::Create(resource) => Ok(Self::Create(resource.try_into()?)),
1409            fnet_filter::Change::Remove(resource) => Ok(Self::Remove(resource.try_into()?)),
1410            fnet_filter::Change::__SourceBreaking { .. } => {
1411                Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE))
1412            }
1413        }
1414    }
1415}
1416
1417/// A controller for filtering state.
1418pub struct Controller {
1419    controller: fnet_filter::NamespaceControllerProxy,
1420    // The client provides an ID when creating a new controller, but the server
1421    // may need to assign a different ID to avoid conflicts; either way, the
1422    // server informs the client of the final `ControllerId` on creation.
1423    id: ControllerId,
1424    // Changes that have been pushed to the server but not yet committed. This
1425    // allows the `Controller` to report more informative errors by correlating
1426    // error codes with particular changes.
1427    pending_changes: Vec<Change>,
1428}
1429
1430impl Controller {
1431    pub async fn new_root(
1432        root: &fnet_root::FilterProxy,
1433        ControllerId(id): &ControllerId,
1434    ) -> Result<Self, ControllerCreationError> {
1435        let (controller, server_end) =
1436            root.domain().create_proxy::<fnet_filter::NamespaceControllerMarker>();
1437        root.open_controller(id, server_end).map_err(ControllerCreationError::OpenController)?;
1438
1439        let fnet_filter::NamespaceControllerEvent::OnIdAssigned { id } = controller
1440            .take_event_stream()
1441            .next()
1442            .await
1443            .ok_or(ControllerCreationError::NoIdAssigned)?
1444            .map_err(ControllerCreationError::IdAssignment)?;
1445        Ok(Self { controller, id: ControllerId(id), pending_changes: Vec::new() })
1446    }
1447
1448    /// Creates a new `Controller`.
1449    ///
1450    /// Note that the provided `ControllerId` may need to be modified server-
1451    /// side to avoid collisions; to obtain the final ID assigned to the
1452    /// `Controller`, use the `id` method.
1453    pub async fn new(
1454        control: &fnet_filter::ControlProxy,
1455        ControllerId(id): &ControllerId,
1456    ) -> Result<Self, ControllerCreationError> {
1457        let (controller, server_end) =
1458            control.domain().create_proxy::<fnet_filter::NamespaceControllerMarker>();
1459        control.open_controller(id, server_end).map_err(ControllerCreationError::OpenController)?;
1460
1461        let fnet_filter::NamespaceControllerEvent::OnIdAssigned { id } = controller
1462            .take_event_stream()
1463            .next()
1464            .await
1465            .ok_or(ControllerCreationError::NoIdAssigned)?
1466            .map_err(ControllerCreationError::IdAssignment)?;
1467        Ok(Self { controller, id: ControllerId(id), pending_changes: Vec::new() })
1468    }
1469
1470    pub fn id(&self) -> &ControllerId {
1471        &self.id
1472    }
1473
1474    pub async fn register_ebpf_program(
1475        &mut self,
1476        handle: febpf::ProgramHandle,
1477        program: febpf::VerifiedProgram,
1478    ) -> Result<(), RegisterEbpfProgramError> {
1479        self.controller
1480            .register_ebpf_program(handle, program)
1481            .await
1482            .map_err(RegisterEbpfProgramError::CallMethod)?
1483            .map_err(RegisterEbpfProgramError::from)
1484    }
1485
1486    pub async fn push_changes(&mut self, changes: Vec<Change>) -> Result<(), PushChangesError> {
1487        let fidl_changes = changes.iter().cloned().map(Into::into).collect::<Vec<_>>();
1488        let result = self
1489            .controller
1490            .push_changes(&fidl_changes)
1491            .await
1492            .map_err(PushChangesError::CallMethod)?;
1493        handle_change_validation_result(result, &changes)?;
1494        // Maintain a client-side copy of the pending changes we've pushed to
1495        // the server in order to provide better error messages if a commit
1496        // fails.
1497        self.pending_changes.extend(changes);
1498        Ok(())
1499    }
1500
1501    async fn commit_with_options(
1502        &mut self,
1503        options: fnet_filter::CommitOptions,
1504    ) -> Result<(), CommitError> {
1505        let committed_changes = std::mem::take(&mut self.pending_changes);
1506        let result = self.controller.commit(options).await.map_err(CommitError::CallMethod)?;
1507        handle_commit_result(result, committed_changes)
1508    }
1509
1510    pub async fn commit(&mut self) -> Result<(), CommitError> {
1511        self.commit_with_options(fnet_filter::CommitOptions::default()).await
1512    }
1513
1514    pub async fn commit_idempotent(&mut self) -> Result<(), CommitError> {
1515        self.commit_with_options(fnet_filter::CommitOptions {
1516            idempotent: Some(true),
1517            __source_breaking: SourceBreaking,
1518        })
1519        .await
1520    }
1521}
1522
1523pub(crate) fn handle_change_validation_result(
1524    change_validation_result: fnet_filter::ChangeValidationResult,
1525    changes: &Vec<Change>,
1526) -> Result<(), PushChangesError> {
1527    match change_validation_result {
1528        fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}) => Ok(()),
1529        fnet_filter::ChangeValidationResult::TooManyChanges(fnet_filter::Empty {}) => {
1530            Err(PushChangesError::TooManyChanges)
1531        }
1532        fnet_filter::ChangeValidationResult::ErrorOnChange(results) => {
1533            let errors: Result<_, PushChangesError> =
1534                changes.iter().zip(results).try_fold(Vec::new(), |mut errors, (change, result)| {
1535                    match result {
1536                        fnet_filter::ChangeValidationError::Ok
1537                        | fnet_filter::ChangeValidationError::NotReached => Ok(errors),
1538                        error @ (fnet_filter::ChangeValidationError::MissingRequiredField
1539                        | fnet_filter::ChangeValidationError::InvalidInterfaceMatcher
1540                        | fnet_filter::ChangeValidationError::InvalidAddressMatcher
1541                        | fnet_filter::ChangeValidationError::InvalidPortMatcher
1542                        | fnet_filter::ChangeValidationError::InvalidTransparentProxyAction
1543                        | fnet_filter::ChangeValidationError::InvalidNatAction
1544                        | fnet_filter::ChangeValidationError::InvalidPortRange) => {
1545                            let error = error
1546                                .try_into()
1547                                .expect("`Ok` and `NotReached` are handled in another arm");
1548                            errors.push((change.clone(), error));
1549                            Ok(errors)
1550                        }
1551                        fnet_filter::ChangeValidationError::__SourceBreaking { .. } => {
1552                            Err(FidlConversionError::UnknownUnionVariant(
1553                                type_names::CHANGE_VALIDATION_ERROR,
1554                            )
1555                            .into())
1556                        }
1557                    }
1558                });
1559            Err(PushChangesError::ErrorOnChange(errors?))
1560        }
1561        fnet_filter::ChangeValidationResult::__SourceBreaking { .. } => {
1562            Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE_VALIDATION_RESULT)
1563                .into())
1564        }
1565    }
1566}
1567
1568pub(crate) fn handle_commit_result(
1569    commit_result: fnet_filter::CommitResult,
1570    committed_changes: Vec<Change>,
1571) -> Result<(), CommitError> {
1572    match commit_result {
1573        fnet_filter::CommitResult::Ok(fnet_filter::Empty {}) => Ok(()),
1574        fnet_filter::CommitResult::RuleWithInvalidMatcher(rule_id) => {
1575            Err(CommitError::RuleWithInvalidMatcher(rule_id.into()))
1576        }
1577        fnet_filter::CommitResult::RuleWithInvalidAction(rule_id) => {
1578            Err(CommitError::RuleWithInvalidAction(rule_id.into()))
1579        }
1580        fnet_filter::CommitResult::TransparentProxyWithInvalidMatcher(rule_id) => {
1581            Err(CommitError::TransparentProxyWithInvalidMatcher(rule_id.into()))
1582        }
1583        fnet_filter::CommitResult::RedirectWithInvalidMatcher(rule_id) => {
1584            Err(CommitError::RedirectWithInvalidMatcher(rule_id.into()))
1585        }
1586        fnet_filter::CommitResult::MasqueradeWithInvalidMatcher(rule_id) => {
1587            Err(CommitError::MasqueradeWithInvalidMatcher(rule_id.into()))
1588        }
1589        fnet_filter::CommitResult::RejectWithInvalidMatcher(rule_id) => {
1590            Err(CommitError::RejectWithInvalidMatcher(rule_id.into()))
1591        }
1592        fnet_filter::CommitResult::CyclicalRoutineGraph(routine_id) => {
1593            Err(CommitError::CyclicalRoutineGraph(routine_id.into()))
1594        }
1595        fnet_filter::CommitResult::ErrorOnChange(results) => {
1596            let errors: Result<_, CommitError> = committed_changes
1597                .into_iter()
1598                .zip(results)
1599                .try_fold(Vec::new(), |mut errors, (change, result)| match result {
1600                    fnet_filter::CommitError::Ok | fnet_filter::CommitError::NotReached => {
1601                        Ok(errors)
1602                    }
1603                    error @ (fnet_filter::CommitError::NamespaceNotFound
1604                    | fnet_filter::CommitError::RoutineNotFound
1605                    | fnet_filter::CommitError::RuleNotFound
1606                    | fnet_filter::CommitError::AlreadyExists
1607                    | fnet_filter::CommitError::TargetRoutineIsInstalled
1608                    | fnet_filter::CommitError::InvalidEbpfProgramId) => {
1609                        let error = error
1610                            .try_into()
1611                            .expect("`Ok` and `NotReached` are handled in another arm");
1612                        errors.push((change, error));
1613                        Ok(errors)
1614                    }
1615                    fnet_filter::CommitError::__SourceBreaking { .. } => {
1616                        Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_ERROR)
1617                            .into())
1618                    }
1619                });
1620            Err(CommitError::ErrorOnChange(errors?))
1621        }
1622        fnet_filter::CommitResult::__SourceBreaking { .. } => {
1623            Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_RESULT).into())
1624        }
1625    }
1626}
1627
1628#[cfg(test)]
1629mod tests {
1630
1631    use assert_matches::assert_matches;
1632    use flex_fuchsia_net_matchers as fnet_matchers;
1633    use futures::channel::mpsc;
1634    use futures::{FutureExt as _, SinkExt as _};
1635    use test_case::test_case;
1636
1637    use flex_fuchsia_hardware_network as fhardware_network;
1638    use flex_fuchsia_net_interfaces as fnet_interfaces;
1639
1640    use super::*;
1641
1642    #[test_case(
1643        fnet_filter::ResourceId::Namespace(String::from("namespace")),
1644        ResourceId::Namespace(NamespaceId(String::from("namespace")));
1645        "NamespaceId"
1646    )]
1647    #[test_case(fnet_filter::Domain::Ipv4, Domain::Ipv4; "Domain")]
1648    #[test_case(
1649        fnet_filter::Namespace {
1650            id: Some(String::from("namespace")),
1651            domain: Some(fnet_filter::Domain::Ipv4),
1652            ..Default::default()
1653        },
1654        Namespace { id: NamespaceId(String::from("namespace")), domain: Domain::Ipv4 };
1655        "Namespace"
1656    )]
1657    #[test_case(fnet_filter::IpInstallationHook::Egress, IpHook::Egress; "IpHook")]
1658    #[test_case(fnet_filter::NatInstallationHook::Egress, NatHook::Egress; "NatHook")]
1659    #[test_case(
1660        fnet_filter::InstalledIpRoutine {
1661            hook: Some(fnet_filter::IpInstallationHook::Egress),
1662            priority: Some(1),
1663            ..Default::default()
1664        },
1665        InstalledIpRoutine {
1666            hook: IpHook::Egress,
1667            priority: 1,
1668        };
1669        "InstalledIpRoutine"
1670    )]
1671    #[test_case(
1672        fnet_filter::RoutineType::Ip(fnet_filter::IpRoutine {
1673            installation: Some(fnet_filter::InstalledIpRoutine {
1674                hook: Some(fnet_filter::IpInstallationHook::LocalEgress),
1675                priority: Some(1),
1676                ..Default::default()
1677            }),
1678            ..Default::default()
1679        }),
1680        RoutineType::Ip(Some(InstalledIpRoutine { hook: IpHook::LocalEgress, priority: 1 }));
1681        "RoutineType"
1682    )]
1683    #[test_case(
1684        fnet_filter::Routine {
1685            id: Some(fnet_filter::RoutineId {
1686                namespace: String::from("namespace"),
1687                name: String::from("routine"),
1688            }),
1689            type_: Some(fnet_filter::RoutineType::Nat(fnet_filter::NatRoutine::default())),
1690            ..Default::default()
1691        },
1692        Routine {
1693            id: RoutineId {
1694                namespace: NamespaceId(String::from("namespace")),
1695                name: String::from("routine"),
1696            },
1697            routine_type: RoutineType::Nat(None),
1698        };
1699        "Routine"
1700    )]
1701    #[test_case(
1702        fnet_filter::Matchers {
1703            in_interface: Some(fnet_matchers::Interface::Name(String::from("wlan"))),
1704            transport_protocol: Some(fnet_matchers::PacketTransportProtocol::Tcp(fnet_matchers::TcpPacket {
1705                src_port: None,
1706                dst_port: Some(fnet_matchers::Port { start: 22, end: 22, invert: false }),
1707                ..Default::default()
1708            })),
1709            ..Default::default()
1710        },
1711        Matchers {
1712            in_interface: Some(fnet_matchers_ext::Interface::Name(String::from("wlan"))),
1713            transport_protocol: Some(fnet_matchers_ext::TransportProtocol::Tcp {
1714                src_port: None,
1715                dst_port: Some(fnet_matchers_ext::Port::new(22, 22, false).unwrap()),
1716            }),
1717            ..Default::default()
1718        };
1719        "Matchers"
1720    )]
1721    #[test_case(
1722        fnet_filter::Action::Accept(fnet_filter::Empty {}),
1723        Action::Accept;
1724        "Action"
1725    )]
1726    #[test_case(
1727        fnet_filter::Rule {
1728            id: fnet_filter::RuleId {
1729                routine: fnet_filter::RoutineId {
1730                    namespace: String::from("namespace"),
1731                    name: String::from("routine"),
1732                },
1733                index: 1,
1734            },
1735            matchers: fnet_filter::Matchers {
1736                transport_protocol: Some(fnet_matchers::PacketTransportProtocol::Icmp(
1737                    fnet_matchers::IcmpPacket::default()
1738                )),
1739                ..Default::default()
1740            },
1741            action: fnet_filter::Action::Drop(fnet_filter::Empty {}),
1742        },
1743        Rule {
1744            id: RuleId {
1745                routine: RoutineId {
1746                    namespace: NamespaceId(String::from("namespace")),
1747                    name: String::from("routine"),
1748                },
1749                index: 1,
1750            },
1751            matchers: Matchers {
1752                transport_protocol: Some(fnet_matchers_ext::TransportProtocol::Icmp),
1753                ..Default::default()
1754            },
1755            action: Action::Drop,
1756        };
1757        "Rule"
1758    )]
1759    #[test_case(
1760        fnet_filter::Resource::Namespace(fnet_filter::Namespace {
1761            id: Some(String::from("namespace")),
1762            domain: Some(fnet_filter::Domain::Ipv4),
1763            ..Default::default()
1764        }),
1765        Resource::Namespace(Namespace {
1766            id: NamespaceId(String::from("namespace")),
1767            domain: Domain::Ipv4
1768        });
1769        "Resource"
1770    )]
1771    #[test_case(
1772        fnet_filter::Event::EndOfUpdate(fnet_filter::Empty {}),
1773        Event::EndOfUpdate;
1774        "Event"
1775    )]
1776    #[test_case(
1777        fnet_filter::Change::Remove(fnet_filter::ResourceId::Namespace(String::from("namespace"))),
1778        Change::Remove(ResourceId::Namespace(NamespaceId(String::from("namespace"))));
1779        "Change"
1780    )]
1781    fn convert_from_fidl_and_back<F, E>(fidl_type: F, local_type: E)
1782    where
1783        E: TryFrom<F> + Clone + Debug + PartialEq,
1784        <E as TryFrom<F>>::Error: Debug + PartialEq,
1785        F: From<E> + Clone + Debug + PartialEq,
1786    {
1787        assert_eq!(fidl_type.clone().try_into(), Ok(local_type.clone()));
1788        assert_eq!(<_ as Into<F>>::into(local_type), fidl_type.clone());
1789    }
1790
1791    #[test]
1792    fn resource_id_try_from_unknown_variant() {
1793        assert_eq!(
1794            ResourceId::try_from(fnet_filter::ResourceId::__SourceBreaking { unknown_ordinal: 0 }),
1795            Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE_ID))
1796        );
1797    }
1798
1799    #[test]
1800    fn domain_try_from_unknown_variant() {
1801        assert_eq!(
1802            Domain::try_from(fnet_filter::Domain::__SourceBreaking { unknown_ordinal: 0 }),
1803            Err(FidlConversionError::UnknownUnionVariant(type_names::DOMAIN))
1804        );
1805    }
1806
1807    #[test]
1808    fn namespace_try_from_missing_properties() {
1809        assert_eq!(
1810            Namespace::try_from(fnet_filter::Namespace {
1811                id: None,
1812                domain: Some(fnet_filter::Domain::Ipv4),
1813                ..Default::default()
1814            }),
1815            Err(FidlConversionError::MissingNamespaceId)
1816        );
1817        assert_eq!(
1818            Namespace::try_from(fnet_filter::Namespace {
1819                id: Some(String::from("namespace")),
1820                domain: None,
1821                ..Default::default()
1822            }),
1823            Err(FidlConversionError::MissingNamespaceDomain)
1824        );
1825    }
1826
1827    #[test]
1828    fn ip_installation_hook_try_from_unknown_variant() {
1829        assert_eq!(
1830            IpHook::try_from(fnet_filter::IpInstallationHook::__SourceBreaking {
1831                unknown_ordinal: 0
1832            }),
1833            Err(FidlConversionError::UnknownUnionVariant(type_names::IP_INSTALLATION_HOOK))
1834        );
1835    }
1836
1837    #[test]
1838    fn nat_installation_hook_try_from_unknown_variant() {
1839        assert_eq!(
1840            NatHook::try_from(fnet_filter::NatInstallationHook::__SourceBreaking {
1841                unknown_ordinal: 0
1842            }),
1843            Err(FidlConversionError::UnknownUnionVariant(type_names::NAT_INSTALLATION_HOOK))
1844        );
1845    }
1846
1847    #[test]
1848    fn installed_ip_routine_try_from_missing_hook() {
1849        assert_eq!(
1850            InstalledIpRoutine::try_from(fnet_filter::InstalledIpRoutine {
1851                hook: None,
1852                ..Default::default()
1853            }),
1854            Err(FidlConversionError::MissingIpInstallationHook)
1855        );
1856    }
1857
1858    #[test]
1859    fn installed_nat_routine_try_from_missing_hook() {
1860        assert_eq!(
1861            InstalledNatRoutine::try_from(fnet_filter::InstalledNatRoutine {
1862                hook: None,
1863                ..Default::default()
1864            }),
1865            Err(FidlConversionError::MissingNatInstallationHook)
1866        );
1867    }
1868
1869    #[test]
1870    fn routine_type_try_from_unknown_variant() {
1871        assert_eq!(
1872            RoutineType::try_from(fnet_filter::RoutineType::__SourceBreaking {
1873                unknown_ordinal: 0
1874            }),
1875            Err(FidlConversionError::UnknownUnionVariant(type_names::ROUTINE_TYPE))
1876        );
1877    }
1878
1879    #[test]
1880    fn routine_try_from_missing_properties() {
1881        assert_eq!(
1882            Routine::try_from(fnet_filter::Routine { id: None, ..Default::default() }),
1883            Err(FidlConversionError::MissingRoutineId)
1884        );
1885        assert_eq!(
1886            Routine::try_from(fnet_filter::Routine {
1887                id: Some(fnet_filter::RoutineId {
1888                    namespace: String::from("namespace"),
1889                    name: String::from("routine"),
1890                }),
1891                type_: None,
1892                ..Default::default()
1893            }),
1894            Err(FidlConversionError::MissingRoutineType)
1895        );
1896    }
1897
1898    #[test_case(
1899        fnet_matchers_ext::PortError::InvalidPortRange =>
1900        FidlConversionError::InvalidPortMatcherRange
1901    )]
1902    #[test_case(
1903        fnet_matchers_ext::InterfaceError::ZeroId =>
1904        FidlConversionError::ZeroInterfaceId
1905    )]
1906    #[test_case(
1907        fnet_matchers_ext::InterfaceError::UnknownUnionVariant =>
1908        FidlConversionError::UnknownUnionVariant(type_names::INTERFACE_MATCHER)
1909    )]
1910    #[test_case(
1911        {
1912            let invalid_port_class = fnet_interfaces::PortClass::__SourceBreaking {
1913                unknown_ordinal: 0
1914            };
1915            let error = fnet_interfaces_ext::PortClass::try_from(
1916                invalid_port_class
1917            ).unwrap_err();
1918            fnet_matchers_ext::InterfaceError::UnknownPortClass(error)
1919        } =>
1920        FidlConversionError::UnknownUnionVariant(type_names::NET_INTERFACES_PORT_CLASS);
1921        "UnknownPortClass=>UnknownUnionVariant"
1922    )]
1923    #[test_case(
1924        {
1925            let invalid_port_class = fhardware_network::PortClass::__SourceBreaking {
1926                unknown_ordinal: 0
1927            };
1928            let error = fnet_interfaces_ext::PortClass::try_from(
1929                invalid_port_class
1930            ).unwrap_err();
1931            fnet_matchers_ext::InterfaceError::UnknownPortClass(
1932                fnet_interfaces_ext::UnknownPortClassError::HardwareNetwork(error))
1933        } =>
1934        FidlConversionError::UnknownUnionVariant(type_names::HARDWARE_NETWORK_PORT_CLASS);
1935        "UnknownPortClass(HardwareNetwork)=>UnknownUnionVariant"
1936    )]
1937    #[test_case(
1938        fnet_matchers_ext::SubnetError::PrefixTooLong =>
1939        FidlConversionError::SubnetPrefixTooLong
1940    )]
1941    #[test_case(
1942        fnet_matchers_ext::SubnetError::HostBitsSet =>
1943        FidlConversionError::SubnetHostBitsSet
1944    )]
1945    #[test_case(
1946        fnet_matchers_ext::AddressRangeError::Invalid =>
1947        FidlConversionError::InvalidAddressRange
1948    )]
1949    #[test_case(
1950        fnet_matchers_ext::AddressRangeError::FamilyMismatch =>
1951        FidlConversionError::AddressRangeFamilyMismatch
1952    )]
1953    #[test_case(
1954        fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1955            fnet_matchers_ext::SubnetError::PrefixTooLong) =>
1956        FidlConversionError::SubnetPrefixTooLong
1957    )]
1958    #[test_case(
1959        fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1960            fnet_matchers_ext::SubnetError::HostBitsSet) =>
1961        FidlConversionError::SubnetHostBitsSet
1962    )]
1963    #[test_case(
1964        fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1965            fnet_matchers_ext::AddressRangeError::Invalid) =>
1966        FidlConversionError::InvalidAddressRange
1967    )]
1968    #[test_case(
1969        fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1970            fnet_matchers_ext::AddressRangeError::FamilyMismatch) =>
1971        FidlConversionError::AddressRangeFamilyMismatch
1972    )]
1973    #[test_case(
1974        fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant =>
1975        FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
1976    )]
1977    #[test_case(
1978        fnet_matchers_ext::AddressError::AddressMatcherType(
1979            fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1980                fnet_matchers_ext::SubnetError::PrefixTooLong)) =>
1981        FidlConversionError::SubnetPrefixTooLong
1982    )]
1983    #[test_case(
1984        fnet_matchers_ext::AddressError::AddressMatcherType(
1985            fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1986                fnet_matchers_ext::SubnetError::HostBitsSet)) =>
1987        FidlConversionError::SubnetHostBitsSet
1988    )]
1989    #[test_case(
1990        fnet_matchers_ext::AddressError::AddressMatcherType(
1991            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1992                fnet_matchers_ext::AddressRangeError::Invalid)) =>
1993        FidlConversionError::InvalidAddressRange
1994    )]
1995    #[test_case(
1996        fnet_matchers_ext::AddressError::AddressMatcherType(
1997            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1998                fnet_matchers_ext::AddressRangeError::FamilyMismatch)) =>
1999        FidlConversionError::AddressRangeFamilyMismatch
2000    )]
2001    #[test_case(
2002        fnet_matchers_ext::AddressError::AddressMatcherType(
2003            fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant) =>
2004            FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
2005    )]
2006    #[test_case(
2007        fnet_matchers_ext::TransportProtocolError::Port(
2008            fnet_matchers_ext::PortError::InvalidPortRange) =>
2009        FidlConversionError::InvalidPortMatcherRange
2010    )]
2011    #[test_case(
2012        fnet_matchers_ext::TransportProtocolError::UnknownUnionVariant =>
2013            FidlConversionError::UnknownUnionVariant(type_names::TRANSPORT_PROTOCOL)
2014    )]
2015    fn fidl_error_from_matcher_error<E: Into<FidlConversionError>>(
2016        error: E,
2017    ) -> FidlConversionError {
2018        error.into()
2019    }
2020
2021    #[test]
2022    fn action_try_from_unknown_variant() {
2023        assert_eq!(
2024            Action::try_from(fnet_filter::Action::__SourceBreaking { unknown_ordinal: 0 }),
2025            Err(FidlConversionError::UnknownUnionVariant(type_names::ACTION))
2026        );
2027    }
2028
2029    #[test]
2030    fn resource_try_from_unknown_variant() {
2031        assert_eq!(
2032            Resource::try_from(fnet_filter::Resource::__SourceBreaking { unknown_ordinal: 0 }),
2033            Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE))
2034        );
2035    }
2036
2037    #[test]
2038    fn event_try_from_unknown_variant() {
2039        assert_eq!(
2040            Event::try_from(fnet_filter::Event::__SourceBreaking { unknown_ordinal: 0 }),
2041            Err(FidlConversionError::UnknownUnionVariant(type_names::EVENT))
2042        );
2043    }
2044
2045    #[test]
2046    fn change_try_from_unknown_variant() {
2047        assert_eq!(
2048            Change::try_from(fnet_filter::Change::__SourceBreaking { unknown_ordinal: 0 }),
2049            Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE))
2050        );
2051    }
2052
2053    fn test_controller_a() -> ControllerId {
2054        ControllerId(String::from("test-controller-a"))
2055    }
2056
2057    fn test_controller_b() -> ControllerId {
2058        ControllerId(String::from("test-controller-b"))
2059    }
2060
2061    pub(crate) fn test_resource_id() -> ResourceId {
2062        ResourceId::Namespace(NamespaceId(String::from("test-namespace")))
2063    }
2064
2065    pub(crate) fn test_resource() -> Resource {
2066        Resource::Namespace(Namespace {
2067            id: NamespaceId(String::from("test-namespace")),
2068            domain: Domain::AllIp,
2069        })
2070    }
2071
2072    // We can't easily create an invalid resource, so we just pretend and fake
2073    // the server response in tests.
2074    pub(crate) fn pretend_invalid_resource() -> Resource {
2075        Resource::Namespace(Namespace {
2076            id: NamespaceId(String::from("pretend-invalid-namespace")),
2077            domain: Domain::AllIp,
2078        })
2079    }
2080
2081    pub(crate) fn unknown_resource_id() -> ResourceId {
2082        ResourceId::Namespace(NamespaceId(String::from("does-not-exist")))
2083    }
2084
2085    #[fuchsia_async::run_singlethreaded(test)]
2086    async fn event_stream_from_state_conversion_error() {
2087        let client = flex_local::local_client_empty();
2088        let (proxy, mut request_stream) =
2089            client.create_proxy_and_stream::<fnet_filter::StateMarker>();
2090        let stream = event_stream_from_state(proxy).expect("get event stream");
2091        futures::pin_mut!(stream);
2092
2093        let send_invalid_event = async {
2094            let fnet_filter::StateRequest::GetWatcher { options: _, request, control_handle: _ } =
2095                request_stream
2096                    .next()
2097                    .await
2098                    .expect("client should call state")
2099                    .expect("request should not error");
2100            let fnet_filter::WatcherRequest::Watch { responder } = request
2101                .into_stream()
2102                .next()
2103                .await
2104                .expect("client should call watch")
2105                .expect("request should not error");
2106            responder
2107                .send(&[fnet_filter::Event::Added(fnet_filter::AddedResource {
2108                    controller: String::from("controller"),
2109                    resource: fnet_filter::Resource::Namespace(fnet_filter::Namespace {
2110                        id: None,
2111                        domain: None,
2112                        ..Default::default()
2113                    }),
2114                })])
2115                .expect("send batch with invalid event");
2116        };
2117        let ((), result) = futures::future::join(send_invalid_event, stream.next()).await;
2118        assert_matches!(
2119            result,
2120            Some(Err(WatchError::Conversion(FidlConversionError::MissingNamespaceId)))
2121        );
2122    }
2123
2124    #[fuchsia_async::run_singlethreaded(test)]
2125    async fn event_stream_from_state_empty_event_batch() {
2126        let client = flex_local::local_client_empty();
2127        let (proxy, mut request_stream) =
2128            client.create_proxy_and_stream::<fnet_filter::StateMarker>();
2129        let stream = event_stream_from_state(proxy).expect("get event stream");
2130        futures::pin_mut!(stream);
2131
2132        let send_empty_batch = async {
2133            let fnet_filter::StateRequest::GetWatcher { options: _, request, control_handle: _ } =
2134                request_stream
2135                    .next()
2136                    .await
2137                    .expect("client should call state")
2138                    .expect("request should not error");
2139            let fnet_filter::WatcherRequest::Watch { responder } = request
2140                .into_stream()
2141                .next()
2142                .await
2143                .expect("client should call watch")
2144                .expect("request should not error");
2145            responder.send(&[]).expect("send empty batch");
2146        };
2147        let ((), result) = futures::future::join(send_empty_batch, stream.next()).await;
2148        assert_matches!(result, Some(Err(WatchError::EmptyEventBatch)));
2149    }
2150
2151    #[fuchsia_async::run_singlethreaded(test)]
2152    async fn get_existing_resources_success() {
2153        let event_stream = futures::stream::iter([
2154            Ok(Event::Existing(test_controller_a(), test_resource())),
2155            Ok(Event::Existing(test_controller_b(), test_resource())),
2156            Ok(Event::Idle),
2157            Ok(Event::Removed(test_controller_a(), test_resource_id())),
2158        ]);
2159        futures::pin_mut!(event_stream);
2160
2161        let existing = get_existing_resources::<HashMap<_, _>>(event_stream.by_ref())
2162            .await
2163            .expect("get existing resources");
2164        assert_eq!(
2165            existing,
2166            HashMap::from([
2167                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2168                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2169            ])
2170        );
2171
2172        let trailing_events = event_stream.collect::<Vec<_>>().await;
2173        assert_matches!(
2174            &trailing_events[..],
2175            [Ok(Event::Removed(controller, resource))] if controller == &test_controller_a() &&
2176                                                           resource == &test_resource_id()
2177        );
2178    }
2179
2180    #[fuchsia_async::run_singlethreaded(test)]
2181    async fn get_existing_resources_error_in_stream() {
2182        let event_stream =
2183            futures::stream::once(futures::future::ready(Err(WatchError::EmptyEventBatch)));
2184        futures::pin_mut!(event_stream);
2185        assert_matches!(
2186            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2187            Err(GetExistingResourcesError::ErrorInStream(WatchError::EmptyEventBatch))
2188        )
2189    }
2190
2191    #[fuchsia_async::run_singlethreaded(test)]
2192    async fn get_existing_resources_unexpected_event() {
2193        let event_stream = futures::stream::once(futures::future::ready(Ok(Event::EndOfUpdate)));
2194        futures::pin_mut!(event_stream);
2195        assert_matches!(
2196            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2197            Err(GetExistingResourcesError::UnexpectedEvent(Event::EndOfUpdate))
2198        )
2199    }
2200
2201    #[fuchsia_async::run_singlethreaded(test)]
2202    async fn get_existing_resources_duplicate_resource() {
2203        let event_stream = futures::stream::iter([
2204            Ok(Event::Existing(test_controller_a(), test_resource())),
2205            Ok(Event::Existing(test_controller_a(), test_resource())),
2206        ]);
2207        futures::pin_mut!(event_stream);
2208        assert_matches!(
2209            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2210            Err(GetExistingResourcesError::DuplicateResource(resource))
2211                if resource == test_resource()
2212        )
2213    }
2214
2215    #[fuchsia_async::run_singlethreaded(test)]
2216    async fn get_existing_resources_stream_ended() {
2217        let event_stream = futures::stream::once(futures::future::ready(Ok(Event::Existing(
2218            test_controller_a(),
2219            test_resource(),
2220        ))));
2221        futures::pin_mut!(event_stream);
2222        assert_matches!(
2223            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2224            Err(GetExistingResourcesError::StreamEnded)
2225        )
2226    }
2227
2228    #[fuchsia_async::run_singlethreaded(test)]
2229    async fn wait_for_condition_add_remove() {
2230        let mut state = HashMap::new();
2231
2232        // Verify that checking for the presence of a resource blocks until the
2233        // resource is added.
2234        let has_resource = |resources: &HashMap<_, HashMap<_, _>>| {
2235            resources.get(&test_controller_a()).map_or(false, |controller| {
2236                controller
2237                    .get(&test_resource_id())
2238                    .map_or(false, |resource| resource == &test_resource())
2239            })
2240        };
2241        assert_matches!(
2242            wait_for_condition(futures::stream::pending(), &mut state, has_resource).now_or_never(),
2243            None
2244        );
2245        assert!(state.is_empty());
2246        assert_matches!(
2247            wait_for_condition(
2248                futures::stream::iter([
2249                    Ok(Event::Added(test_controller_b(), test_resource())),
2250                    Ok(Event::EndOfUpdate),
2251                    Ok(Event::Added(test_controller_a(), test_resource())),
2252                    Ok(Event::EndOfUpdate),
2253                ]),
2254                &mut state,
2255                has_resource
2256            )
2257            .now_or_never(),
2258            Some(Ok(()))
2259        );
2260        assert_eq!(
2261            state,
2262            HashMap::from([
2263                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2264                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2265            ])
2266        );
2267
2268        // Re-add the resource and observe an error.
2269        assert_matches!(
2270            wait_for_condition(
2271                futures::stream::iter([
2272                    Ok(Event::Added(test_controller_a(), test_resource())),
2273                    Ok(Event::EndOfUpdate),
2274                ]),
2275                &mut state,
2276                has_resource
2277            )
2278            .now_or_never(),
2279            Some(Err(WaitForConditionError::AddedAlreadyExisting(r))) if r == test_resource()
2280        );
2281        assert_eq!(
2282            state,
2283            HashMap::from([
2284                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2285                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2286            ])
2287        );
2288
2289        // Verify that checking for the absence of a resource blocks until the
2290        // resource is removed.
2291        let does_not_have_resource = |resources: &HashMap<_, HashMap<_, _>>| {
2292            resources.get(&test_controller_a()).map_or(false, |controller| controller.is_empty())
2293        };
2294        assert_matches!(
2295            wait_for_condition(futures::stream::pending(), &mut state, does_not_have_resource)
2296                .now_or_never(),
2297            None
2298        );
2299        assert_eq!(
2300            state,
2301            HashMap::from([
2302                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2303                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2304            ])
2305        );
2306        assert_matches!(
2307            wait_for_condition(
2308                futures::stream::iter([
2309                    Ok(Event::Removed(test_controller_b(), test_resource_id())),
2310                    Ok(Event::EndOfUpdate),
2311                    Ok(Event::Removed(test_controller_a(), test_resource_id())),
2312                    Ok(Event::EndOfUpdate),
2313                ]),
2314                &mut state,
2315                does_not_have_resource
2316            )
2317            .now_or_never(),
2318            Some(Ok(()))
2319        );
2320        assert_eq!(
2321            state,
2322            HashMap::from([
2323                (test_controller_a(), HashMap::new()),
2324                (test_controller_b(), HashMap::new()),
2325            ])
2326        );
2327
2328        // Remove a non-existent resource and observe an error.
2329        assert_matches!(
2330            wait_for_condition(
2331                futures::stream::iter([
2332                    Ok(Event::Removed(test_controller_a(), test_resource_id())),
2333                    Ok(Event::EndOfUpdate),
2334                ]),
2335                &mut state,
2336                does_not_have_resource
2337            ).now_or_never(),
2338            Some(Err(WaitForConditionError::RemovedNonExistent(r))) if r == test_resource_id()
2339        );
2340        assert_eq!(
2341            state,
2342            HashMap::from([
2343                (test_controller_a(), HashMap::new()),
2344                (test_controller_b(), HashMap::new()),
2345            ])
2346        );
2347    }
2348
2349    #[test]
2350    fn predicate_not_tested_until_update_complete() {
2351        let mut state = HashMap::new();
2352        let (mut tx, rx) = mpsc::unbounded();
2353
2354        let wait = wait_for_condition(rx, &mut state, |state| !state.is_empty()).fuse();
2355        futures::pin_mut!(wait);
2356
2357        // Sending an `Added` event should *not* allow the wait operation to
2358        // complete, because the predicate should only be tested once the full
2359        // update has been observed.
2360        let mut exec = fuchsia_async::TestExecutor::new();
2361        exec.run_singlethreaded(async {
2362            tx.send(Ok(Event::Added(test_controller_a(), test_resource())))
2363                .await
2364                .expect("receiver should not be closed");
2365            assert_matches!((&mut wait).now_or_never(), None);
2366        });
2367
2368        exec.run_singlethreaded(async {
2369            tx.send(Ok(Event::EndOfUpdate)).await.expect("receiver should not be closed");
2370            wait.await.expect("condition should be satisfied once update is complete");
2371        });
2372    }
2373
2374    #[fuchsia_async::run_singlethreaded(test)]
2375    async fn wait_for_condition_error_in_stream() {
2376        let mut state = HashMap::new();
2377        let event_stream =
2378            futures::stream::once(futures::future::ready(Err(WatchError::EmptyEventBatch)));
2379        assert_matches!(
2380            wait_for_condition(event_stream, &mut state, |_| true).await,
2381            Err(WaitForConditionError::ErrorInStream(WatchError::EmptyEventBatch))
2382        );
2383        assert!(state.is_empty());
2384    }
2385
2386    #[fuchsia_async::run_singlethreaded(test)]
2387    async fn wait_for_condition_stream_ended() {
2388        let mut state = HashMap::new();
2389        let event_stream = futures::stream::empty();
2390        assert_matches!(
2391            wait_for_condition(event_stream, &mut state, |_| true).await,
2392            Err(WaitForConditionError::StreamEnded)
2393        );
2394        assert!(state.is_empty());
2395    }
2396
2397    pub(crate) async fn handle_open_controller(
2398        mut request_stream: fnet_filter::ControlRequestStream,
2399    ) -> fnet_filter::NamespaceControllerRequestStream {
2400        let (id, request, _control_handle) = request_stream
2401            .next()
2402            .await
2403            .expect("client should open controller")
2404            .expect("request should not error")
2405            .into_open_controller()
2406            .expect("client should open controller");
2407        let (stream, control_handle) = request.into_stream_and_control_handle();
2408        control_handle.send_on_id_assigned(&id).expect("send assigned ID");
2409
2410        stream
2411    }
2412
2413    pub(crate) async fn handle_push_changes(
2414        stream: &mut fnet_filter::NamespaceControllerRequestStream,
2415        push_changes_result: fnet_filter::ChangeValidationResult,
2416    ) {
2417        let (_changes, responder) = stream
2418            .next()
2419            .await
2420            .expect("client should push changes")
2421            .expect("request should not error")
2422            .into_push_changes()
2423            .expect("client should push changes");
2424        responder.send(push_changes_result).expect("send empty batch");
2425    }
2426
2427    pub(crate) async fn handle_commit(
2428        stream: &mut fnet_filter::NamespaceControllerRequestStream,
2429        commit_result: fnet_filter::CommitResult,
2430    ) {
2431        let (_options, responder) = stream
2432            .next()
2433            .await
2434            .expect("client should commit")
2435            .expect("request should not error")
2436            .into_commit()
2437            .expect("client should commit");
2438        responder.send(commit_result).expect("send commit result");
2439    }
2440
2441    #[fuchsia_async::run_singlethreaded(test)]
2442    async fn controller_push_changes_reports_invalid_change() {
2443        let client = flex_local::local_client_empty();
2444        let (control, request_stream) =
2445            client.create_proxy_and_stream::<fnet_filter::ControlMarker>();
2446        let push_invalid_change = async {
2447            let mut controller = Controller::new(&control, &ControllerId(String::from("test")))
2448                .await
2449                .expect("create controller");
2450            let result = controller
2451                .push_changes(vec![
2452                    Change::Create(test_resource()),
2453                    // We fake the server response to say this is invalid even
2454                    // though it really isn't.
2455                    Change::Create(pretend_invalid_resource()),
2456                    Change::Remove(test_resource_id()),
2457                ])
2458                .await;
2459            assert_matches!(
2460                result,
2461                Err(PushChangesError::ErrorOnChange(errors)) if errors == vec![(
2462                    Change::Create(pretend_invalid_resource()),
2463                    ChangeValidationError::InvalidPortMatcher
2464                )]
2465            );
2466        };
2467
2468        let handle_controller = async {
2469            let mut stream = handle_open_controller(request_stream).await;
2470            handle_push_changes(
2471                &mut stream,
2472                fnet_filter::ChangeValidationResult::ErrorOnChange(vec![
2473                    fnet_filter::ChangeValidationError::Ok,
2474                    fnet_filter::ChangeValidationError::InvalidPortMatcher,
2475                    fnet_filter::ChangeValidationError::NotReached,
2476                ]),
2477            )
2478            .await;
2479        };
2480
2481        let ((), ()) = futures::future::join(push_invalid_change, handle_controller).await;
2482    }
2483
2484    #[fuchsia_async::run_singlethreaded(test)]
2485    async fn controller_commit_reports_invalid_change() {
2486        let client = flex_local::local_client_empty();
2487        let (control, request_stream) =
2488            client.create_proxy_and_stream::<fnet_filter::ControlMarker>();
2489        let commit_invalid_change = async {
2490            let mut controller = Controller::new(&control, &ControllerId(String::from("test")))
2491                .await
2492                .expect("create controller");
2493            controller
2494                .push_changes(vec![
2495                    Change::Create(test_resource()),
2496                    Change::Remove(unknown_resource_id()),
2497                    Change::Remove(test_resource_id()),
2498                ])
2499                .await
2500                .expect("push changes");
2501            let result = controller.commit().await;
2502            assert_matches!(
2503                result,
2504                Err(CommitError::ErrorOnChange(errors)) if errors == vec![(
2505                    Change::Remove(unknown_resource_id()),
2506                    ChangeCommitError::NamespaceNotFound,
2507                )]
2508            );
2509        };
2510        let handle_controller = async {
2511            let mut stream = handle_open_controller(request_stream).await;
2512            handle_push_changes(
2513                &mut stream,
2514                fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}),
2515            )
2516            .await;
2517            handle_commit(
2518                &mut stream,
2519                fnet_filter::CommitResult::ErrorOnChange(vec![
2520                    fnet_filter::CommitError::Ok,
2521                    fnet_filter::CommitError::NamespaceNotFound,
2522                    fnet_filter::CommitError::Ok,
2523                ]),
2524            )
2525            .await;
2526        };
2527        let ((), ()) = futures::future::join(commit_invalid_change, handle_controller).await;
2528    }
2529}