Skip to main content

fidl_fuchsia_net_filter_ext/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Extensions for the fuchsia.net.filter FIDL library.
6//!
7//! Note that this library as written is not meant for inclusion in the SDK. It
8//! is only meant to be used in conjunction with a netstack that is compiled
9//! against the same API level of the `fuchsia.net.filter` FIDL library. This
10//! library opts in to compile-time and runtime breakage when the FIDL library
11//! is evolved in order to enforce that it is updated along with the FIDL
12//! library itself.
13
14#[cfg(target_os = "fuchsia")]
15pub mod sync;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::num::NonZeroU16;
20use std::ops::RangeInclusive;
21
22use async_utils::fold::FoldWhile;
23use fidl::marker::SourceBreaking;
24use futures::{Stream, StreamExt as _, TryStreamExt as _};
25use thiserror::Error;
26use {
27    fidl_fuchsia_ebpf as febpf, fidl_fuchsia_net as fnet, fidl_fuchsia_net_filter as fnet_filter,
28    fidl_fuchsia_net_interfaces_ext as fnet_interfaces_ext,
29    fidl_fuchsia_net_matchers_ext as fnet_matchers_ext, fidl_fuchsia_net_root as fnet_root,
30};
31
32/// Conversion errors from `fnet_filter` FIDL types to the
33/// equivalents defined in this module.
34#[derive(Debug, Error, PartialEq)]
35pub enum FidlConversionError {
36    #[error("union is of an unknown variant: {0}")]
37    UnknownUnionVariant(&'static str),
38    #[error("namespace ID not provided")]
39    MissingNamespaceId,
40    #[error("namespace domain not provided")]
41    MissingNamespaceDomain,
42    #[error("routine ID not provided")]
43    MissingRoutineId,
44    #[error("routine type not provided")]
45    MissingRoutineType,
46    #[error("IP installation hook not provided")]
47    MissingIpInstallationHook,
48    #[error("NAT installation hook not provided")]
49    MissingNatInstallationHook,
50    #[error("interface matcher specified an invalid ID of 0")]
51    ZeroInterfaceId,
52    #[error("invalid address range (start must be <= end)")]
53    InvalidAddressRange,
54    #[error("address range start and end addresses are not the same IP family")]
55    AddressRangeFamilyMismatch,
56    #[error("prefix length of subnet is longer than number of bits in IP address")]
57    SubnetPrefixTooLong,
58    #[error("host bits are set in subnet network")]
59    SubnetHostBitsSet,
60    #[error("invalid port matcher range (start must be <= end)")]
61    InvalidPortMatcherRange,
62    #[error("transparent proxy action specified an invalid local port of 0")]
63    UnspecifiedTransparentProxyPort,
64    #[error("NAT action specified an invalid rewrite port of 0")]
65    UnspecifiedNatPort,
66    #[error("invalid port range (start must be <= end)")]
67    InvalidPortRange,
68    #[error("non-error result variant could not be converted to an error")]
69    NotAnError,
70}
71
72impl From<fnet_matchers_ext::PortError> for FidlConversionError {
73    fn from(value: fnet_matchers_ext::PortError) -> Self {
74        match value {
75            fnet_matchers_ext::PortError::InvalidPortRange => {
76                FidlConversionError::InvalidPortMatcherRange
77            }
78        }
79    }
80}
81
82impl From<fnet_matchers_ext::InterfaceError> for FidlConversionError {
83    fn from(value: fnet_matchers_ext::InterfaceError) -> Self {
84        match value {
85            fnet_matchers_ext::InterfaceError::ZeroId => FidlConversionError::ZeroInterfaceId,
86            fnet_matchers_ext::InterfaceError::UnknownUnionVariant => {
87                FidlConversionError::UnknownUnionVariant(type_names::INTERFACE_MATCHER)
88            }
89            fidl_fuchsia_net_matchers_ext::InterfaceError::UnknownPortClass(
90                unknown_port_class_error,
91            ) => match unknown_port_class_error {
92                fnet_interfaces_ext::UnknownPortClassError::NetInterfaces(_) => {
93                    FidlConversionError::UnknownUnionVariant(type_names::NET_INTERFACES_PORT_CLASS)
94                }
95                fnet_interfaces_ext::UnknownPortClassError::HardwareNetwork(_) => {
96                    FidlConversionError::UnknownUnionVariant(
97                        type_names::HARDWARE_NETWORK_PORT_CLASS,
98                    )
99                }
100            },
101        }
102    }
103}
104
105impl From<fnet_matchers_ext::AddressError> for FidlConversionError {
106    fn from(value: fnet_matchers_ext::AddressError) -> Self {
107        match value {
108            fnet_matchers_ext::AddressError::AddressMatcherType(address_matcher_type_error) => {
109                address_matcher_type_error.into()
110            }
111        }
112    }
113}
114
115impl From<fnet_matchers_ext::AddressMatcherTypeError> for FidlConversionError {
116    fn from(value: fnet_matchers_ext::AddressMatcherTypeError) -> Self {
117        match value {
118            fnet_matchers_ext::AddressMatcherTypeError::Subnet(subnet_error) => subnet_error.into(),
119            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(address_range_error) => {
120                address_range_error.into()
121            }
122            fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant => {
123                FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
124            }
125        }
126    }
127}
128
129impl From<fnet_matchers_ext::AddressRangeError> for FidlConversionError {
130    fn from(value: fnet_matchers_ext::AddressRangeError) -> Self {
131        match value {
132            fnet_matchers_ext::AddressRangeError::Invalid => {
133                FidlConversionError::InvalidAddressRange
134            }
135            fnet_matchers_ext::AddressRangeError::FamilyMismatch => {
136                FidlConversionError::AddressRangeFamilyMismatch
137            }
138        }
139    }
140}
141
142impl From<fnet_matchers_ext::SubnetError> for FidlConversionError {
143    fn from(value: fnet_matchers_ext::SubnetError) -> Self {
144        match value {
145            fnet_matchers_ext::SubnetError::PrefixTooLong => {
146                FidlConversionError::SubnetPrefixTooLong
147            }
148            fnet_matchers_ext::SubnetError::HostBitsSet => FidlConversionError::SubnetHostBitsSet,
149        }
150    }
151}
152
153impl From<fnet_matchers_ext::TransportProtocolError> for FidlConversionError {
154    fn from(value: fnet_matchers_ext::TransportProtocolError) -> Self {
155        match value {
156            fnet_matchers_ext::TransportProtocolError::Port(port_matcher_error) => {
157                port_matcher_error.into()
158            }
159            fnet_matchers_ext::TransportProtocolError::UnknownUnionVariant => {
160                FidlConversionError::UnknownUnionVariant(type_names::TRANSPORT_PROTOCOL)
161            }
162        }
163    }
164}
165
166// TODO(https://fxbug.dev/317058051): remove this when the Rust FIDL bindings
167// expose constants for these.
168mod type_names {
169    pub(super) const RESOURCE_ID: &str = "fuchsia.net.filter/ResourceId";
170    pub(super) const DOMAIN: &str = "fuchsia.net.filter/Domain";
171    pub(super) const IP_INSTALLATION_HOOK: &str = "fuchsia.net.filter/IpInstallationHook";
172    pub(super) const NAT_INSTALLATION_HOOK: &str = "fuchsia.net.filter/NatInstallationHook";
173    pub(super) const ROUTINE_TYPE: &str = "fuchsia.net.filter/RoutineType";
174    pub(super) const INTERFACE_MATCHER: &str = "fuchsia.net.matchers/Interface";
175    pub(super) const ADDRESS_MATCHER_TYPE: &str = "fuchsia.net.filter/AddressMatcherType";
176    pub(super) const TRANSPORT_PROTOCOL: &str = "fuchsia.net.matchers/TransportProtocol";
177    pub(super) const ACTION: &str = "fuchsia.net.filter/Action";
178    pub(super) const MARK_ACTION: &str = "fuchsia.net.filter/MarkAction";
179    pub(super) const TRANSPARENT_PROXY: &str = "fuchsia.net.filter/TransparentProxy";
180    pub(super) const RESOURCE: &str = "fuchsia.net.filter/Resource";
181    pub(super) const EVENT: &str = "fuchsia.net.filter/Event";
182    pub(super) const CHANGE: &str = "fuchsia.net.filter/Change";
183    pub(super) const CHANGE_VALIDATION_ERROR: &str = "fuchsia.net.filter/ChangeValidationError";
184    pub(super) const CHANGE_VALIDATION_RESULT: &str = "fuchsia.net.filter/ChangeValidationResult";
185    pub(super) const COMMIT_ERROR: &str = "fuchsia.net.filter/CommitError";
186    pub(super) const COMMIT_RESULT: &str = "fuchsia.net.filter/CommitResult";
187    pub(super) const NET_INTERFACES_PORT_CLASS: &str = "fuchsia.net.interfaces/PortClass";
188    pub(super) const HARDWARE_NETWORK_PORT_CLASS: &str = "fuchsia.hardware.network/PortClass";
189    pub(super) const REJECT_TYPE: &str = "fuchsia.net.filter/RejectType";
190}
191
192/// Extension type for [`fnet_filter::NamespaceId`].
193#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)]
194pub struct NamespaceId(pub String);
195
196/// Extension type for [`fnet_filter::RoutineId`].
197#[derive(Debug, Clone, PartialEq, Eq, Hash)]
198pub struct RoutineId {
199    pub namespace: NamespaceId,
200    pub name: String,
201}
202
203impl From<fnet_filter::RoutineId> for RoutineId {
204    fn from(id: fnet_filter::RoutineId) -> Self {
205        let fnet_filter::RoutineId { namespace, name } = id;
206        Self { namespace: NamespaceId(namespace), name }
207    }
208}
209
210impl From<RoutineId> for fnet_filter::RoutineId {
211    fn from(id: RoutineId) -> Self {
212        let RoutineId { namespace, name } = id;
213        let NamespaceId(namespace) = namespace;
214        Self { namespace, name }
215    }
216}
217
218/// Extension type for [`fnet_filter::RuleId`].
219#[derive(Debug, Clone, PartialEq, Eq, Hash)]
220pub struct RuleId {
221    pub routine: RoutineId,
222    pub index: u32,
223}
224
225impl From<fnet_filter::RuleId> for RuleId {
226    fn from(id: fnet_filter::RuleId) -> Self {
227        let fnet_filter::RuleId { routine, index } = id;
228        Self { routine: routine.into(), index }
229    }
230}
231
232impl From<RuleId> for fnet_filter::RuleId {
233    fn from(id: RuleId) -> Self {
234        let RuleId { routine, index } = id;
235        Self { routine: routine.into(), index }
236    }
237}
238
239/// Extension type for [`fnet_filter::ResourceId`].
240#[derive(Debug, Clone, PartialEq, Eq, Hash)]
241pub enum ResourceId {
242    Namespace(NamespaceId),
243    Routine(RoutineId),
244    Rule(RuleId),
245}
246
247impl TryFrom<fnet_filter::ResourceId> for ResourceId {
248    type Error = FidlConversionError;
249
250    fn try_from(id: fnet_filter::ResourceId) -> Result<Self, Self::Error> {
251        match id {
252            fnet_filter::ResourceId::Namespace(id) => Ok(Self::Namespace(NamespaceId(id))),
253            fnet_filter::ResourceId::Routine(id) => Ok(Self::Routine(id.into())),
254            fnet_filter::ResourceId::Rule(id) => Ok(Self::Rule(id.into())),
255            fnet_filter::ResourceId::__SourceBreaking { .. } => {
256                Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE_ID))
257            }
258        }
259    }
260}
261
262impl From<ResourceId> for fnet_filter::ResourceId {
263    fn from(id: ResourceId) -> Self {
264        match id {
265            ResourceId::Namespace(NamespaceId(id)) => fnet_filter::ResourceId::Namespace(id),
266            ResourceId::Routine(id) => fnet_filter::ResourceId::Routine(id.into()),
267            ResourceId::Rule(id) => fnet_filter::ResourceId::Rule(id.into()),
268        }
269    }
270}
271
272/// Extension type for [`fnet_filter::Domain`].
273#[derive(Debug, Clone, PartialEq)]
274pub enum Domain {
275    Ipv4,
276    Ipv6,
277    AllIp,
278}
279
280impl From<Domain> for fnet_filter::Domain {
281    fn from(domain: Domain) -> Self {
282        match domain {
283            Domain::Ipv4 => fnet_filter::Domain::Ipv4,
284            Domain::Ipv6 => fnet_filter::Domain::Ipv6,
285            Domain::AllIp => fnet_filter::Domain::AllIp,
286        }
287    }
288}
289
290impl TryFrom<fnet_filter::Domain> for Domain {
291    type Error = FidlConversionError;
292
293    fn try_from(domain: fnet_filter::Domain) -> Result<Self, Self::Error> {
294        match domain {
295            fnet_filter::Domain::Ipv4 => Ok(Self::Ipv4),
296            fnet_filter::Domain::Ipv6 => Ok(Self::Ipv6),
297            fnet_filter::Domain::AllIp => Ok(Self::AllIp),
298            fnet_filter::Domain::__SourceBreaking { .. } => {
299                Err(FidlConversionError::UnknownUnionVariant(type_names::DOMAIN))
300            }
301        }
302    }
303}
304
305/// Extension type for [`fnet_filter::Namespace`].
306#[derive(Debug, Clone, PartialEq)]
307pub struct Namespace {
308    pub id: NamespaceId,
309    pub domain: Domain,
310}
311
312impl From<Namespace> for fnet_filter::Namespace {
313    fn from(namespace: Namespace) -> Self {
314        let Namespace { id, domain } = namespace;
315        let NamespaceId(id) = id;
316        Self { id: Some(id), domain: Some(domain.into()), __source_breaking: SourceBreaking }
317    }
318}
319
320impl TryFrom<fnet_filter::Namespace> for Namespace {
321    type Error = FidlConversionError;
322
323    fn try_from(namespace: fnet_filter::Namespace) -> Result<Self, Self::Error> {
324        let fnet_filter::Namespace { id, domain, __source_breaking } = namespace;
325        let id = NamespaceId(id.ok_or(FidlConversionError::MissingNamespaceId)?);
326        let domain = domain.ok_or(FidlConversionError::MissingNamespaceDomain)?.try_into()?;
327        Ok(Self { id, domain })
328    }
329}
330
331/// Extension type for [`fnet_filter::IpInstallationHook`].
332#[derive(Debug, Clone, Copy, PartialEq)]
333pub enum IpHook {
334    Ingress,
335    LocalIngress,
336    Forwarding,
337    LocalEgress,
338    Egress,
339}
340
341impl From<IpHook> for fnet_filter::IpInstallationHook {
342    fn from(hook: IpHook) -> Self {
343        match hook {
344            IpHook::Ingress => Self::Ingress,
345            IpHook::LocalIngress => Self::LocalIngress,
346            IpHook::Forwarding => Self::Forwarding,
347            IpHook::LocalEgress => Self::LocalEgress,
348            IpHook::Egress => Self::Egress,
349        }
350    }
351}
352
353impl TryFrom<fnet_filter::IpInstallationHook> for IpHook {
354    type Error = FidlConversionError;
355
356    fn try_from(hook: fnet_filter::IpInstallationHook) -> Result<Self, Self::Error> {
357        match hook {
358            fnet_filter::IpInstallationHook::Ingress => Ok(Self::Ingress),
359            fnet_filter::IpInstallationHook::LocalIngress => Ok(Self::LocalIngress),
360            fnet_filter::IpInstallationHook::Forwarding => Ok(Self::Forwarding),
361            fnet_filter::IpInstallationHook::LocalEgress => Ok(Self::LocalEgress),
362            fnet_filter::IpInstallationHook::Egress => Ok(Self::Egress),
363            fnet_filter::IpInstallationHook::__SourceBreaking { .. } => {
364                Err(FidlConversionError::UnknownUnionVariant(type_names::IP_INSTALLATION_HOOK))
365            }
366        }
367    }
368}
369
370/// Extension type for [`fnet_filter::NatInstallationHook`].
371#[derive(Debug, Clone, Copy, PartialEq)]
372pub enum NatHook {
373    Ingress,
374    LocalIngress,
375    LocalEgress,
376    Egress,
377}
378
379impl From<NatHook> for fnet_filter::NatInstallationHook {
380    fn from(hook: NatHook) -> Self {
381        match hook {
382            NatHook::Ingress => Self::Ingress,
383            NatHook::LocalIngress => Self::LocalIngress,
384            NatHook::LocalEgress => Self::LocalEgress,
385            NatHook::Egress => Self::Egress,
386        }
387    }
388}
389
390impl TryFrom<fnet_filter::NatInstallationHook> for NatHook {
391    type Error = FidlConversionError;
392
393    fn try_from(hook: fnet_filter::NatInstallationHook) -> Result<Self, Self::Error> {
394        match hook {
395            fnet_filter::NatInstallationHook::Ingress => Ok(Self::Ingress),
396            fnet_filter::NatInstallationHook::LocalIngress => Ok(Self::LocalIngress),
397            fnet_filter::NatInstallationHook::LocalEgress => Ok(Self::LocalEgress),
398            fnet_filter::NatInstallationHook::Egress => Ok(Self::Egress),
399            fnet_filter::NatInstallationHook::__SourceBreaking { .. } => {
400                Err(FidlConversionError::UnknownUnionVariant(type_names::NAT_INSTALLATION_HOOK))
401            }
402        }
403    }
404}
405
406/// Extension type for [`fnet_filter::InstalledIpRoutine`].
407#[derive(Debug, Clone, PartialEq)]
408pub struct InstalledIpRoutine {
409    pub hook: IpHook,
410    pub priority: i32,
411}
412
413impl From<InstalledIpRoutine> for fnet_filter::InstalledIpRoutine {
414    fn from(routine: InstalledIpRoutine) -> Self {
415        let InstalledIpRoutine { hook, priority } = routine;
416        Self {
417            hook: Some(hook.into()),
418            priority: Some(priority),
419            __source_breaking: SourceBreaking,
420        }
421    }
422}
423
424impl TryFrom<fnet_filter::InstalledIpRoutine> for InstalledIpRoutine {
425    type Error = FidlConversionError;
426
427    fn try_from(routine: fnet_filter::InstalledIpRoutine) -> Result<Self, Self::Error> {
428        let fnet_filter::InstalledIpRoutine { hook, priority, __source_breaking } = routine;
429        let hook = hook.ok_or(FidlConversionError::MissingIpInstallationHook)?;
430        let priority = priority.unwrap_or(fnet_filter::DEFAULT_ROUTINE_PRIORITY);
431        Ok(Self { hook: hook.try_into()?, priority })
432    }
433}
434
435/// Extension type for [`fnet_filter::InstalledNatRoutine`].
436#[derive(Debug, Clone, PartialEq)]
437pub struct InstalledNatRoutine {
438    pub hook: NatHook,
439    pub priority: i32,
440}
441
442impl From<InstalledNatRoutine> for fnet_filter::InstalledNatRoutine {
443    fn from(routine: InstalledNatRoutine) -> Self {
444        let InstalledNatRoutine { hook, priority } = routine;
445        Self {
446            hook: Some(hook.into()),
447            priority: Some(priority),
448            __source_breaking: SourceBreaking,
449        }
450    }
451}
452
453impl TryFrom<fnet_filter::InstalledNatRoutine> for InstalledNatRoutine {
454    type Error = FidlConversionError;
455
456    fn try_from(routine: fnet_filter::InstalledNatRoutine) -> Result<Self, Self::Error> {
457        let fnet_filter::InstalledNatRoutine { hook, priority, __source_breaking } = routine;
458        let hook = hook.ok_or(FidlConversionError::MissingNatInstallationHook)?;
459        let priority = priority.unwrap_or(fnet_filter::DEFAULT_ROUTINE_PRIORITY);
460        Ok(Self { hook: hook.try_into()?, priority })
461    }
462}
463
464/// Extension type for [`fnet_filter::RoutineType`].
465#[derive(Debug, Clone, PartialEq)]
466pub enum RoutineType {
467    Ip(Option<InstalledIpRoutine>),
468    Nat(Option<InstalledNatRoutine>),
469}
470
471impl RoutineType {
472    pub fn is_installed(&self) -> bool {
473        // The `InstalledIpRoutine` or `InstalledNatRoutine` configuration is
474        // optional, and when omitted, signifies an uninstalled routine.
475        match self {
476            Self::Ip(Some(_)) | Self::Nat(Some(_)) => true,
477            Self::Ip(None) | Self::Nat(None) => false,
478        }
479    }
480}
481
482impl From<RoutineType> for fnet_filter::RoutineType {
483    fn from(routine: RoutineType) -> Self {
484        match routine {
485            RoutineType::Ip(installation) => Self::Ip(fnet_filter::IpRoutine {
486                installation: installation.map(Into::into),
487                __source_breaking: SourceBreaking,
488            }),
489            RoutineType::Nat(installation) => Self::Nat(fnet_filter::NatRoutine {
490                installation: installation.map(Into::into),
491                __source_breaking: SourceBreaking,
492            }),
493        }
494    }
495}
496
497impl TryFrom<fnet_filter::RoutineType> for RoutineType {
498    type Error = FidlConversionError;
499
500    fn try_from(type_: fnet_filter::RoutineType) -> Result<Self, Self::Error> {
501        match type_ {
502            fnet_filter::RoutineType::Ip(fnet_filter::IpRoutine {
503                installation,
504                __source_breaking,
505            }) => Ok(RoutineType::Ip(installation.map(TryInto::try_into).transpose()?)),
506            fnet_filter::RoutineType::Nat(fnet_filter::NatRoutine {
507                installation,
508                __source_breaking,
509            }) => Ok(RoutineType::Nat(installation.map(TryInto::try_into).transpose()?)),
510            fnet_filter::RoutineType::__SourceBreaking { .. } => {
511                Err(FidlConversionError::UnknownUnionVariant(type_names::ROUTINE_TYPE))
512            }
513        }
514    }
515}
516
517/// Extension type for [`fnet_filter::Routine`].
518#[derive(Debug, Clone, PartialEq)]
519pub struct Routine {
520    pub id: RoutineId,
521    pub routine_type: RoutineType,
522}
523
524impl From<Routine> for fnet_filter::Routine {
525    fn from(routine: Routine) -> Self {
526        let Routine { id, routine_type: type_ } = routine;
527        Self { id: Some(id.into()), type_: Some(type_.into()), __source_breaking: SourceBreaking }
528    }
529}
530
531impl TryFrom<fnet_filter::Routine> for Routine {
532    type Error = FidlConversionError;
533
534    fn try_from(routine: fnet_filter::Routine) -> Result<Self, Self::Error> {
535        let fnet_filter::Routine { id, type_, __source_breaking } = routine;
536        let id = id.ok_or(FidlConversionError::MissingRoutineId)?;
537        let type_ = type_.ok_or(FidlConversionError::MissingRoutineType)?;
538        Ok(Self { id: id.into(), routine_type: type_.try_into()? })
539    }
540}
541
542/// Extension type for [`fnet_filter::Matchers`].
543#[derive(Default, Clone, PartialEq)]
544pub struct Matchers {
545    pub in_interface: Option<fnet_matchers_ext::Interface>,
546    pub out_interface: Option<fnet_matchers_ext::Interface>,
547    pub src_addr: Option<fnet_matchers_ext::Address>,
548    pub dst_addr: Option<fnet_matchers_ext::Address>,
549    pub transport_protocol: Option<fnet_matchers_ext::TransportProtocol>,
550    pub ebpf_program: Option<febpf::ProgramId>,
551}
552
553impl From<Matchers> for fnet_filter::Matchers {
554    fn from(matchers: Matchers) -> Self {
555        let Matchers {
556            in_interface,
557            out_interface,
558            src_addr,
559            dst_addr,
560            transport_protocol,
561            ebpf_program,
562        } = matchers;
563        Self {
564            in_interface: in_interface.map(Into::into),
565            out_interface: out_interface.map(Into::into),
566            src_addr: src_addr.map(Into::into),
567            dst_addr: dst_addr.map(Into::into),
568            transport_protocol: transport_protocol.map(Into::into),
569            ebpf_program: ebpf_program.map(Into::into),
570            __source_breaking: SourceBreaking,
571        }
572    }
573}
574
575impl TryFrom<fnet_filter::Matchers> for Matchers {
576    type Error = FidlConversionError;
577
578    fn try_from(matchers: fnet_filter::Matchers) -> Result<Self, Self::Error> {
579        let fnet_filter::Matchers {
580            in_interface,
581            out_interface,
582            src_addr,
583            dst_addr,
584            transport_protocol,
585            ebpf_program,
586            __source_breaking,
587        } = matchers;
588        Ok(Self {
589            in_interface: in_interface.map(TryInto::try_into).transpose()?,
590            out_interface: out_interface.map(TryInto::try_into).transpose()?,
591            src_addr: src_addr.map(TryInto::try_into).transpose()?,
592            dst_addr: dst_addr.map(TryInto::try_into).transpose()?,
593            transport_protocol: transport_protocol.map(TryInto::try_into).transpose()?,
594            ebpf_program: ebpf_program.map(Into::into),
595        })
596    }
597}
598
599impl Debug for Matchers {
600    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601        let mut debug_struct = f.debug_struct("Matchers");
602
603        let Matchers {
604            in_interface,
605            out_interface,
606            src_addr,
607            dst_addr,
608            transport_protocol,
609            ebpf_program,
610        } = &self;
611
612        // Omit empty fields.
613        if let Some(matcher) = in_interface {
614            let _ = debug_struct.field("in_interface", matcher);
615        }
616
617        if let Some(matcher) = out_interface {
618            let _ = debug_struct.field("out_interface", matcher);
619        }
620
621        if let Some(matcher) = src_addr {
622            let _ = debug_struct.field("src_addr", matcher);
623        }
624
625        if let Some(matcher) = dst_addr {
626            let _ = debug_struct.field("dst_addr", matcher);
627        }
628
629        if let Some(matcher) = transport_protocol {
630            let _ = debug_struct.field("transport_protocol", matcher);
631        }
632
633        if let Some(matcher) = ebpf_program {
634            let _ = debug_struct.field("ebpf_program", matcher);
635        }
636
637        debug_struct.finish()
638    }
639}
640
641/// Extension type for [`fnet_filter::Action`].
642#[derive(Debug, Clone, PartialEq)]
643pub enum Action {
644    Accept,
645    Drop,
646    Jump(String),
647    Return,
648    TransparentProxy(TransparentProxy),
649    Redirect { dst_port: Option<PortRange> },
650    Masquerade { src_port: Option<PortRange> },
651    Mark { domain: fnet::MarkDomain, action: MarkAction },
652    None,
653    Reject(RejectType),
654}
655
656#[derive(Debug, Clone, PartialEq)]
657pub enum MarkAction {
658    SetMark { clearing_mask: fnet::Mark, mark: fnet::Mark },
659}
660
661/// Extension type for [`fnet_filter::TransparentProxy_`].
662#[derive(Debug, Clone, PartialEq)]
663pub enum TransparentProxy {
664    LocalAddr(fnet::IpAddress),
665    LocalPort(NonZeroU16),
666    LocalAddrAndPort(fnet::IpAddress, NonZeroU16),
667}
668
669#[derive(Debug, Clone, PartialEq)]
670pub struct PortRange(pub RangeInclusive<NonZeroU16>);
671
672impl From<PortRange> for fnet_filter::PortRange {
673    fn from(range: PortRange) -> Self {
674        let PortRange(range) = range;
675        Self { start: range.start().get(), end: range.end().get() }
676    }
677}
678
679impl TryFrom<fnet_filter::PortRange> for PortRange {
680    type Error = FidlConversionError;
681
682    fn try_from(range: fnet_filter::PortRange) -> Result<Self, Self::Error> {
683        let fnet_filter::PortRange { start, end } = range;
684        if start > end {
685            Err(FidlConversionError::InvalidPortRange)
686        } else {
687            let start = NonZeroU16::new(start).ok_or(FidlConversionError::UnspecifiedNatPort)?;
688            let end = NonZeroU16::new(end).ok_or(FidlConversionError::UnspecifiedNatPort)?;
689            Ok(Self(start..=end))
690        }
691    }
692}
693
694#[derive(Debug, Clone, Copy, PartialEq)]
695pub enum RejectType {
696    TcpReset,
697    NetUnreachable,
698    HostUnreachable,
699    ProtoUnreachable,
700    PortUnreachable,
701    RoutePolicyFail,
702    RejectRoute,
703    AdminProhibited,
704}
705
706impl From<RejectType> for fnet_filter::RejectType {
707    fn from(value: RejectType) -> Self {
708        match value {
709            RejectType::TcpReset => fnet_filter::RejectType::TcpReset,
710            RejectType::NetUnreachable => fnet_filter::RejectType::NetUnreachable,
711            RejectType::HostUnreachable => fnet_filter::RejectType::HostUnreachable,
712            RejectType::ProtoUnreachable => fnet_filter::RejectType::ProtoUnreachable,
713            RejectType::PortUnreachable => fnet_filter::RejectType::PortUnreachable,
714            RejectType::RoutePolicyFail => fnet_filter::RejectType::RoutePolicyFail,
715            RejectType::RejectRoute => fnet_filter::RejectType::RejectRoute,
716            RejectType::AdminProhibited => fnet_filter::RejectType::AdminProhibited,
717        }
718    }
719}
720
721impl TryFrom<fnet_filter::RejectType> for RejectType {
722    type Error = FidlConversionError;
723
724    fn try_from(value: fnet_filter::RejectType) -> Result<Self, Self::Error> {
725        match value {
726            fnet_filter::RejectType::TcpReset => Ok(RejectType::TcpReset),
727            fnet_filter::RejectType::NetUnreachable => Ok(RejectType::NetUnreachable),
728            fnet_filter::RejectType::HostUnreachable => Ok(RejectType::HostUnreachable),
729            fnet_filter::RejectType::ProtoUnreachable => Ok(RejectType::ProtoUnreachable),
730            fnet_filter::RejectType::PortUnreachable => Ok(RejectType::PortUnreachable),
731            fnet_filter::RejectType::RoutePolicyFail => Ok(RejectType::RoutePolicyFail),
732            fnet_filter::RejectType::RejectRoute => Ok(RejectType::RejectRoute),
733            fnet_filter::RejectType::AdminProhibited => Ok(RejectType::AdminProhibited),
734            fnet_filter::RejectType::__SourceBreaking { .. } => {
735                Err(FidlConversionError::UnknownUnionVariant(type_names::REJECT_TYPE))
736            }
737        }
738    }
739}
740
741impl From<Action> for fnet_filter::Action {
742    fn from(action: Action) -> Self {
743        match action {
744            Action::Accept => Self::Accept(fnet_filter::Empty {}),
745            Action::Drop => Self::Drop(fnet_filter::Empty {}),
746            Action::Jump(target) => Self::Jump(target),
747            Action::Return => Self::Return_(fnet_filter::Empty {}),
748            Action::TransparentProxy(proxy) => Self::TransparentProxy(match proxy {
749                TransparentProxy::LocalAddr(addr) => {
750                    fnet_filter::TransparentProxy_::LocalAddr(addr)
751                }
752                TransparentProxy::LocalPort(port) => {
753                    fnet_filter::TransparentProxy_::LocalPort(port.get())
754                }
755                TransparentProxy::LocalAddrAndPort(addr, port) => {
756                    fnet_filter::TransparentProxy_::LocalAddrAndPort(fnet_filter::SocketAddr {
757                        addr,
758                        port: port.get(),
759                    })
760                }
761            }),
762            Action::Redirect { dst_port } => Self::Redirect(fnet_filter::Redirect {
763                dst_port: dst_port.map(Into::into),
764                __source_breaking: SourceBreaking,
765            }),
766            Action::Masquerade { src_port } => Self::Masquerade(fnet_filter::Masquerade {
767                src_port: src_port.map(Into::into),
768                __source_breaking: SourceBreaking,
769            }),
770            Action::Mark { domain, action } => {
771                Self::Mark(fnet_filter::Mark { domain, action: action.into() })
772            }
773            Action::None => Self::None(fnet_filter::Empty {}),
774            Action::Reject(reject_type) => {
775                Self::Reject(fnet_filter::Reject { reject_type: reject_type.into() })
776            }
777        }
778    }
779}
780
781impl TryFrom<fnet_filter::Action> for Action {
782    type Error = FidlConversionError;
783
784    fn try_from(action: fnet_filter::Action) -> Result<Self, Self::Error> {
785        match action {
786            fnet_filter::Action::Accept(fnet_filter::Empty {}) => Ok(Self::Accept),
787            fnet_filter::Action::Drop(fnet_filter::Empty {}) => Ok(Self::Drop),
788            fnet_filter::Action::Jump(target) => Ok(Self::Jump(target)),
789            fnet_filter::Action::Return_(fnet_filter::Empty {}) => Ok(Self::Return),
790            fnet_filter::Action::TransparentProxy(proxy) => {
791                Ok(Self::TransparentProxy(match proxy {
792                    fnet_filter::TransparentProxy_::LocalAddr(addr) => {
793                        TransparentProxy::LocalAddr(addr)
794                    }
795                    fnet_filter::TransparentProxy_::LocalPort(port) => {
796                        let port = NonZeroU16::new(port)
797                            .ok_or(FidlConversionError::UnspecifiedTransparentProxyPort)?;
798                        TransparentProxy::LocalPort(port)
799                    }
800                    fnet_filter::TransparentProxy_::LocalAddrAndPort(fnet_filter::SocketAddr {
801                        addr,
802                        port,
803                    }) => {
804                        let port = NonZeroU16::new(port)
805                            .ok_or(FidlConversionError::UnspecifiedTransparentProxyPort)?;
806                        TransparentProxy::LocalAddrAndPort(addr, port)
807                    }
808                    fnet_filter::TransparentProxy_::__SourceBreaking { .. } => {
809                        return Err(FidlConversionError::UnknownUnionVariant(
810                            type_names::TRANSPARENT_PROXY,
811                        ));
812                    }
813                }))
814            }
815            fnet_filter::Action::Redirect(fnet_filter::Redirect {
816                dst_port,
817                __source_breaking,
818            }) => Ok(Self::Redirect { dst_port: dst_port.map(TryInto::try_into).transpose()? }),
819            fnet_filter::Action::Masquerade(fnet_filter::Masquerade {
820                src_port,
821                __source_breaking,
822            }) => Ok(Self::Masquerade { src_port: src_port.map(TryInto::try_into).transpose()? }),
823            fnet_filter::Action::Mark(fnet_filter::Mark { domain, action }) => {
824                Ok(Self::Mark { domain, action: action.try_into()? })
825            }
826            fnet_filter::Action::__SourceBreaking { .. } => {
827                Err(FidlConversionError::UnknownUnionVariant(type_names::ACTION))
828            }
829            fnet_filter::Action::None(fnet_filter::Empty {}) => Ok(Self::None),
830            fnet_filter::Action::Reject(fnet_filter::Reject { reject_type }) => {
831                Ok(Self::Reject(reject_type.try_into()?))
832            }
833        }
834    }
835}
836
837impl From<MarkAction> for fnet_filter::MarkAction {
838    fn from(action: MarkAction) -> Self {
839        match action {
840            MarkAction::SetMark { clearing_mask, mark } => {
841                Self::SetMark(fnet_filter::SetMark { clearing_mask, mark })
842            }
843        }
844    }
845}
846
847impl TryFrom<fnet_filter::MarkAction> for MarkAction {
848    type Error = FidlConversionError;
849    fn try_from(action: fnet_filter::MarkAction) -> Result<Self, Self::Error> {
850        match action {
851            fnet_filter::MarkAction::SetMark(fnet_filter::SetMark { clearing_mask, mark }) => {
852                Ok(Self::SetMark { clearing_mask, mark })
853            }
854            fnet_filter::MarkAction::__SourceBreaking { .. } => {
855                Err(FidlConversionError::UnknownUnionVariant(type_names::MARK_ACTION))
856            }
857        }
858    }
859}
860
861/// Extension type for [`fnet_filter::Rule`].
862#[derive(Debug, Clone, PartialEq)]
863pub struct Rule {
864    pub id: RuleId,
865    pub matchers: Matchers,
866    pub action: Action,
867}
868
869impl From<Rule> for fnet_filter::Rule {
870    fn from(rule: Rule) -> Self {
871        let Rule { id, matchers, action } = rule;
872        Self { id: id.into(), matchers: matchers.into(), action: action.into() }
873    }
874}
875
876impl TryFrom<fnet_filter::Rule> for Rule {
877    type Error = FidlConversionError;
878
879    fn try_from(rule: fnet_filter::Rule) -> Result<Self, Self::Error> {
880        let fnet_filter::Rule { id, matchers, action } = rule;
881        Ok(Self { id: id.into(), matchers: matchers.try_into()?, action: action.try_into()? })
882    }
883}
884
885/// Extension type for [`fnet_filter::Resource`].
886#[derive(Debug, Clone, PartialEq)]
887pub enum Resource {
888    Namespace(Namespace),
889    Routine(Routine),
890    Rule(Rule),
891}
892
893impl Resource {
894    pub fn id(&self) -> ResourceId {
895        match self {
896            Self::Namespace(Namespace { id, domain: _ }) => ResourceId::Namespace(id.clone()),
897            Self::Routine(Routine { id, routine_type: _ }) => ResourceId::Routine(id.clone()),
898            Self::Rule(Rule { id, matchers: _, action: _ }) => ResourceId::Rule(id.clone()),
899        }
900    }
901}
902
903impl From<Resource> for fnet_filter::Resource {
904    fn from(resource: Resource) -> Self {
905        match resource {
906            Resource::Namespace(namespace) => Self::Namespace(namespace.into()),
907            Resource::Routine(routine) => Self::Routine(routine.into()),
908            Resource::Rule(rule) => Self::Rule(rule.into()),
909        }
910    }
911}
912
913impl TryFrom<fnet_filter::Resource> for Resource {
914    type Error = FidlConversionError;
915
916    fn try_from(resource: fnet_filter::Resource) -> Result<Self, Self::Error> {
917        match resource {
918            fnet_filter::Resource::Namespace(namespace) => {
919                Ok(Self::Namespace(namespace.try_into()?))
920            }
921            fnet_filter::Resource::Routine(routine) => Ok(Self::Routine(routine.try_into()?)),
922            fnet_filter::Resource::Rule(rule) => Ok(Self::Rule(rule.try_into()?)),
923            fnet_filter::Resource::__SourceBreaking { .. } => {
924                Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE))
925            }
926        }
927    }
928}
929
930/// Extension type for [`fnet_filter::ControllerId`].
931#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
932pub struct ControllerId(pub String);
933
934/// Extension type for [`fnet_filter::Event`].
935#[derive(Debug, Clone, PartialEq)]
936pub enum Event {
937    Existing(ControllerId, Resource),
938    Idle,
939    Added(ControllerId, Resource),
940    Removed(ControllerId, ResourceId),
941    EndOfUpdate,
942}
943
944impl From<Event> for fnet_filter::Event {
945    fn from(event: Event) -> Self {
946        match event {
947            Event::Existing(controller, resource) => {
948                let ControllerId(id) = controller;
949                Self::Existing(fnet_filter::ExistingResource {
950                    controller: id,
951                    resource: resource.into(),
952                })
953            }
954            Event::Idle => Self::Idle(fnet_filter::Empty {}),
955            Event::Added(controller, resource) => {
956                let ControllerId(id) = controller;
957                Self::Added(fnet_filter::AddedResource {
958                    controller: id,
959                    resource: resource.into(),
960                })
961            }
962            Event::Removed(controller, resource) => {
963                let ControllerId(id) = controller;
964                Self::Removed(fnet_filter::RemovedResource {
965                    controller: id,
966                    resource: resource.into(),
967                })
968            }
969            Event::EndOfUpdate => Self::EndOfUpdate(fnet_filter::Empty {}),
970        }
971    }
972}
973
974impl TryFrom<fnet_filter::Event> for Event {
975    type Error = FidlConversionError;
976
977    fn try_from(event: fnet_filter::Event) -> Result<Self, Self::Error> {
978        match event {
979            fnet_filter::Event::Existing(fnet_filter::ExistingResource {
980                controller,
981                resource,
982            }) => Ok(Self::Existing(ControllerId(controller), resource.try_into()?)),
983            fnet_filter::Event::Idle(fnet_filter::Empty {}) => Ok(Self::Idle),
984            fnet_filter::Event::Added(fnet_filter::AddedResource { controller, resource }) => {
985                Ok(Self::Added(ControllerId(controller), resource.try_into()?))
986            }
987            fnet_filter::Event::Removed(fnet_filter::RemovedResource { controller, resource }) => {
988                Ok(Self::Removed(ControllerId(controller), resource.try_into()?))
989            }
990            fnet_filter::Event::EndOfUpdate(fnet_filter::Empty {}) => Ok(Self::EndOfUpdate),
991            fnet_filter::Event::__SourceBreaking { .. } => {
992                Err(FidlConversionError::UnknownUnionVariant(type_names::EVENT))
993            }
994        }
995    }
996}
997
998/// Filter watcher creation errors.
999#[derive(Debug, Error)]
1000pub enum WatcherCreationError {
1001    #[error("failed to create filter watcher proxy: {0}")]
1002    CreateProxy(fidl::Error),
1003    #[error("failed to get filter watcher: {0}")]
1004    GetWatcher(fidl::Error),
1005}
1006
1007/// Filter watcher `Watch` errors.
1008#[derive(Debug, Error)]
1009pub enum WatchError {
1010    /// The call to `Watch` returned a FIDL error.
1011    #[error("the call to `Watch()` failed: {0}")]
1012    Fidl(fidl::Error),
1013    /// The event returned by `Watch` encountered a conversion error.
1014    #[error("failed to convert event returned by `Watch()`: {0}")]
1015    Conversion(FidlConversionError),
1016    /// The server returned an empty batch of events.
1017    #[error("the call to `Watch()` returned an empty batch of events")]
1018    EmptyEventBatch,
1019}
1020
1021/// Connects to the watcher protocol and converts the Hanging-Get style API into
1022/// an Event stream.
1023///
1024/// Each call to `Watch` returns a batch of events, which are flattened into a
1025/// single stream. If an error is encountered while calling `Watch` or while
1026/// converting the event, the stream is immediately terminated.
1027pub fn event_stream_from_state(
1028    state: fnet_filter::StateProxy,
1029) -> Result<impl Stream<Item = Result<Event, WatchError>>, WatcherCreationError> {
1030    let (watcher, server_end) = fidl::endpoints::create_proxy::<fnet_filter::WatcherMarker>();
1031    state
1032        .get_watcher(&fnet_filter::WatcherOptions::default(), server_end)
1033        .map_err(WatcherCreationError::GetWatcher)?;
1034
1035    let stream = futures::stream::try_unfold(watcher, |watcher| async {
1036        let events = watcher.watch().await.map_err(WatchError::Fidl)?;
1037        if events.is_empty() {
1038            return Err(WatchError::EmptyEventBatch);
1039        }
1040
1041        let event_stream = futures::stream::iter(events).map(Ok).and_then(|event| {
1042            futures::future::ready(event.try_into().map_err(WatchError::Conversion))
1043        });
1044        Ok(Some((event_stream, watcher)))
1045    })
1046    .try_flatten();
1047
1048    Ok(stream)
1049}
1050
1051/// Errors returned by [`get_existing_resources`].
1052#[derive(Debug, Error)]
1053pub enum GetExistingResourcesError {
1054    /// There was an error in the event stream.
1055    #[error("there was an error in the event stream: {0}")]
1056    ErrorInStream(WatchError),
1057    /// There was an unexpected event in the event stream. Only `existing` or
1058    /// `idle` events are expected.
1059    #[error("there was an unexpected event in the event stream: {0:?}")]
1060    UnexpectedEvent(Event),
1061    /// A duplicate existing resource was reported in the event stream.
1062    #[error("a duplicate existing resource was reported")]
1063    DuplicateResource(Resource),
1064    /// The event stream unexpectedly ended.
1065    #[error("the event stream unexpectedly ended")]
1066    StreamEnded,
1067}
1068
1069/// A trait for types holding filtering state that can be updated by change
1070/// events.
1071pub trait Update {
1072    /// Add the resource to the specified controller's state.
1073    ///
1074    /// Optionally returns a resource that has already been added to the
1075    /// controller with the same [`ResourceId`].
1076    fn add(&mut self, controller: ControllerId, resource: Resource) -> Option<Resource>;
1077
1078    /// Remove the resource from the specified controller's state.
1079    ///
1080    /// Returns the removed resource, if present.
1081    fn remove(&mut self, controller: ControllerId, resource: &ResourceId) -> Option<Resource>;
1082}
1083
1084impl Update for HashMap<ControllerId, HashMap<ResourceId, Resource>> {
1085    fn add(&mut self, controller: ControllerId, resource: Resource) -> Option<Resource> {
1086        self.entry(controller).or_default().insert(resource.id(), resource)
1087    }
1088
1089    fn remove(&mut self, controller: ControllerId, resource: &ResourceId) -> Option<Resource> {
1090        self.get_mut(&controller)?.remove(resource)
1091    }
1092}
1093
1094/// Collects all `existing` events from the stream, stopping once the `idle`
1095/// event is observed.
1096pub async fn get_existing_resources<C: Update + Default>(
1097    stream: impl Stream<Item = Result<Event, WatchError>>,
1098) -> Result<C, GetExistingResourcesError> {
1099    async_utils::fold::fold_while(
1100        stream,
1101        Ok(C::default()),
1102        |resources: Result<C, GetExistingResourcesError>, event| {
1103            let mut resources =
1104                resources.expect("`resources` must be `Ok`, because we stop folding on err");
1105            futures::future::ready(match event {
1106                Err(e) => FoldWhile::Done(Err(GetExistingResourcesError::ErrorInStream(e))),
1107                Ok(e) => match e {
1108                    Event::Existing(controller, resource) => {
1109                        if let Some(resource) = resources.add(controller, resource) {
1110                            FoldWhile::Done(Err(GetExistingResourcesError::DuplicateResource(
1111                                resource,
1112                            )))
1113                        } else {
1114                            FoldWhile::Continue(Ok(resources))
1115                        }
1116                    }
1117                    Event::Idle => FoldWhile::Done(Ok(resources)),
1118                    e @ (Event::Added(_, _) | Event::Removed(_, _) | Event::EndOfUpdate) => {
1119                        FoldWhile::Done(Err(GetExistingResourcesError::UnexpectedEvent(e)))
1120                    }
1121                },
1122            })
1123        },
1124    )
1125    .await
1126    .short_circuited()
1127    .map_err(|_resources| GetExistingResourcesError::StreamEnded)?
1128}
1129
1130/// Errors returned by [`wait_for_condition`].
1131#[derive(Debug, Error)]
1132pub enum WaitForConditionError {
1133    /// There was an error in the event stream.
1134    #[error("there was an error in the event stream: {0}")]
1135    ErrorInStream(WatchError),
1136    /// There was an `Added` event for an already existing resource.
1137    #[error("observed an added event for an already existing resource: {0:?}")]
1138    AddedAlreadyExisting(Resource),
1139    /// There was a `Removed` event for a non-existent resource.
1140    #[error("observed a removed event for a non-existent resource: {0:?}")]
1141    RemovedNonExistent(ResourceId),
1142    /// The event stream unexpectedly ended.
1143    #[error("the event stream unexpectedly ended")]
1144    StreamEnded,
1145}
1146
1147/// Wait for a condition on filtering state to be satisfied.
1148///
1149/// With the given `initial_state`, take events from `event_stream` and update
1150/// the state, calling `predicate` whenever the state changes. When predicates
1151/// returns `True` yield `Ok(())`.
1152pub async fn wait_for_condition<
1153    C: Update,
1154    S: Stream<Item = Result<Event, WatchError>>,
1155    F: Fn(&C) -> bool,
1156>(
1157    event_stream: S,
1158    initial_state: &mut C,
1159    predicate: F,
1160) -> Result<(), WaitForConditionError> {
1161    async_utils::fold::try_fold_while(
1162        event_stream.map_err(WaitForConditionError::ErrorInStream),
1163        initial_state,
1164        |resources: &mut C, event| {
1165            futures::future::ready(match event {
1166                Event::Existing(controller, resource) | Event::Added(controller, resource) => {
1167                    if let Some(resource) = resources.add(controller, resource) {
1168                        Err(WaitForConditionError::AddedAlreadyExisting(resource))
1169                    } else {
1170                        Ok(FoldWhile::Continue(resources))
1171                    }
1172                }
1173                Event::Removed(controller, resource) => resources
1174                    .remove(controller, &resource)
1175                    .map(|_| FoldWhile::Continue(resources))
1176                    .ok_or(WaitForConditionError::RemovedNonExistent(resource)),
1177                // Wait until a transactional update has been completed to call
1178                // the predicate so it's not run against partially-updated
1179                // state.
1180                Event::Idle | Event::EndOfUpdate => {
1181                    if predicate(&resources) {
1182                        Ok(FoldWhile::Done(()))
1183                    } else {
1184                        Ok(FoldWhile::Continue(resources))
1185                    }
1186                }
1187            })
1188        },
1189    )
1190    .await?
1191    .short_circuited()
1192    .map_err(|_resources: &mut C| WaitForConditionError::StreamEnded)
1193}
1194
1195/// Namespace controller creation errors.
1196#[derive(Debug, Error)]
1197pub enum ControllerCreationError {
1198    #[error("failed to create namespace controller proxy: {0}")]
1199    CreateProxy(fidl::Error),
1200    #[error("failed to open namespace controller: {0}")]
1201    OpenController(fidl::Error),
1202    #[error("server did not emit OnIdAssigned event")]
1203    NoIdAssigned,
1204    #[error("failed to observe ID assignment event: {0}")]
1205    IdAssignment(fidl::Error),
1206}
1207
1208/// Errors for individual changes pushed.
1209///
1210/// Extension type for the error variants of [`fnet_filter::ChangeValidationError`].
1211#[derive(Debug, Error, PartialEq)]
1212pub enum ChangeValidationError {
1213    #[error("change contains a resource that is missing a required field")]
1214    MissingRequiredField,
1215    #[error("rule specifies an invalid interface matcher")]
1216    InvalidInterfaceMatcher,
1217    #[error("rule specifies an invalid address matcher")]
1218    InvalidAddressMatcher,
1219    #[error("rule specifies an invalid port matcher")]
1220    InvalidPortMatcher,
1221    #[error("rule specifies an invalid transparent proxy action")]
1222    InvalidTransparentProxyAction,
1223    #[error("rule specifies an invalid NAT action")]
1224    InvalidNatAction,
1225    #[error("rule specifies an invalid port range")]
1226    InvalidPortRange,
1227}
1228
1229impl TryFrom<fnet_filter::ChangeValidationError> for ChangeValidationError {
1230    type Error = FidlConversionError;
1231
1232    fn try_from(error: fnet_filter::ChangeValidationError) -> Result<Self, Self::Error> {
1233        match error {
1234            fnet_filter::ChangeValidationError::MissingRequiredField => {
1235                Ok(Self::MissingRequiredField)
1236            }
1237            fnet_filter::ChangeValidationError::InvalidInterfaceMatcher => {
1238                Ok(Self::InvalidInterfaceMatcher)
1239            }
1240            fnet_filter::ChangeValidationError::InvalidAddressMatcher => {
1241                Ok(Self::InvalidAddressMatcher)
1242            }
1243            fnet_filter::ChangeValidationError::InvalidPortMatcher => Ok(Self::InvalidPortMatcher),
1244            fnet_filter::ChangeValidationError::InvalidTransparentProxyAction => {
1245                Ok(Self::InvalidTransparentProxyAction)
1246            }
1247            fnet_filter::ChangeValidationError::InvalidNatAction => Ok(Self::InvalidNatAction),
1248            fnet_filter::ChangeValidationError::InvalidPortRange => Ok(Self::InvalidPortRange),
1249            fnet_filter::ChangeValidationError::Ok
1250            | fnet_filter::ChangeValidationError::NotReached => {
1251                Err(FidlConversionError::NotAnError)
1252            }
1253            fnet_filter::ChangeValidationError::__SourceBreaking { unknown_ordinal: _ } => {
1254                Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE_VALIDATION_ERROR))
1255            }
1256        }
1257    }
1258}
1259
1260#[derive(Debug, Error)]
1261pub enum RegisterEbpfProgramError {
1262    #[error("failed to call FIDL method: {0}")]
1263    CallMethod(fidl::Error),
1264
1265    #[error("failed to link the program")]
1266    LinkFailed,
1267
1268    #[error("failed to initialize a map")]
1269    MapFailed,
1270
1271    #[error("the program is already registered")]
1272    AlreadyRegistered,
1273
1274    #[error("the request is missing a required field")]
1275    MissingRequiredField,
1276}
1277
1278impl From<fnet_filter::RegisterEbpfProgramError> for RegisterEbpfProgramError {
1279    fn from(error: fnet_filter::RegisterEbpfProgramError) -> Self {
1280        match error {
1281            fnet_filter::RegisterEbpfProgramError::LinkFailed => Self::LinkFailed,
1282            fnet_filter::RegisterEbpfProgramError::MapFailed => Self::MapFailed,
1283            fnet_filter::RegisterEbpfProgramError::AlreadyRegistered => Self::AlreadyRegistered,
1284            fnet_filter::RegisterEbpfProgramError::MissingRequiredField => {
1285                Self::MissingRequiredField
1286            }
1287        }
1288    }
1289}
1290
1291/// Errors for the NamespaceController.PushChanges method.
1292#[derive(Debug, Error)]
1293pub enum PushChangesError {
1294    #[error("failed to call FIDL method: {0}")]
1295    CallMethod(fidl::Error),
1296    #[error("too many changes were pushed to the server")]
1297    TooManyChanges,
1298    #[error("invalid change(s) pushed: {0:?}")]
1299    ErrorOnChange(Vec<(Change, ChangeValidationError)>),
1300    #[error("unknown FIDL type: {0}")]
1301    FidlConversion(#[from] FidlConversionError),
1302}
1303
1304/// Errors for individual changes committed.
1305///
1306/// Extension type for the error variants of [`fnet_filter::CommitError`].
1307#[derive(Debug, Error, PartialEq)]
1308pub enum ChangeCommitError {
1309    #[error("the change referred to an unknown namespace")]
1310    NamespaceNotFound,
1311    #[error("the change referred to an unknown routine")]
1312    RoutineNotFound,
1313    #[error("the change referred to an unknown rule")]
1314    RuleNotFound,
1315    #[error("the specified resource already exists")]
1316    AlreadyExists,
1317    #[error("the change includes a rule that jumps to an installed routine")]
1318    TargetRoutineIsInstalled,
1319    #[error("the change includes an eBPF matcher with an invalid program ID")]
1320    InvalidEbpfProgramId,
1321}
1322
1323impl TryFrom<fnet_filter::CommitError> for ChangeCommitError {
1324    type Error = FidlConversionError;
1325
1326    fn try_from(error: fnet_filter::CommitError) -> Result<Self, Self::Error> {
1327        match error {
1328            fnet_filter::CommitError::NamespaceNotFound => Ok(Self::NamespaceNotFound),
1329            fnet_filter::CommitError::RoutineNotFound => Ok(Self::RoutineNotFound),
1330            fnet_filter::CommitError::RuleNotFound => Ok(Self::RuleNotFound),
1331            fnet_filter::CommitError::AlreadyExists => Ok(Self::AlreadyExists),
1332            fnet_filter::CommitError::TargetRoutineIsInstalled => {
1333                Ok(Self::TargetRoutineIsInstalled)
1334            }
1335            fnet_filter::CommitError::InvalidEbpfProgramId => Ok(Self::InvalidEbpfProgramId),
1336            fnet_filter::CommitError::Ok | fnet_filter::CommitError::NotReached => {
1337                Err(FidlConversionError::NotAnError)
1338            }
1339            fnet_filter::CommitError::__SourceBreaking { unknown_ordinal: _ } => {
1340                Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_ERROR))
1341            }
1342        }
1343    }
1344}
1345
1346/// Errors for the NamespaceController.Commit method.
1347#[derive(Debug, Error)]
1348pub enum CommitError {
1349    #[error("failed to call FIDL method: {0}")]
1350    CallMethod(fidl::Error),
1351    #[error("rule has a matcher that is unavailable in its context: {0:?}")]
1352    RuleWithInvalidMatcher(RuleId),
1353    #[error("rule has an action that is invalid for its routine: {0:?}")]
1354    RuleWithInvalidAction(RuleId),
1355    #[error("rule has a TransparentProxy action but not a valid transport protocol matcher: {0:?}")]
1356    TransparentProxyWithInvalidMatcher(RuleId),
1357    #[error(
1358        "rule has a Redirect action that specifies a destination port but not a valid transport \
1359        protocol matcher: {0:?}"
1360    )]
1361    RedirectWithInvalidMatcher(RuleId),
1362    #[error(
1363        "rule has a Masquerade action that specifies a source port but not a valid transport \
1364        protocol matcher: {0:?}"
1365    )]
1366    MasqueradeWithInvalidMatcher(RuleId),
1367    #[error("rule has a Reject action but not a valid transport protocol matcher: {0:?}")]
1368    RejectWithInvalidMatcher(RuleId),
1369    #[error("routine forms a cycle {0:?}")]
1370    CyclicalRoutineGraph(RoutineId),
1371    #[error("invalid change was pushed: {0:?}")]
1372    ErrorOnChange(Vec<(Change, ChangeCommitError)>),
1373    #[error("unknown FIDL type: {0}")]
1374    FidlConversion(#[from] FidlConversionError),
1375}
1376
1377/// Extension type for [`fnet_filter::Change`].
1378#[derive(Debug, Clone, PartialEq)]
1379pub enum Change {
1380    Create(Resource),
1381    Remove(ResourceId),
1382}
1383
1384impl From<Change> for fnet_filter::Change {
1385    fn from(change: Change) -> Self {
1386        match change {
1387            Change::Create(resource) => Self::Create(resource.into()),
1388            Change::Remove(resource) => Self::Remove(resource.into()),
1389        }
1390    }
1391}
1392
1393impl TryFrom<fnet_filter::Change> for Change {
1394    type Error = FidlConversionError;
1395
1396    fn try_from(change: fnet_filter::Change) -> Result<Self, Self::Error> {
1397        match change {
1398            fnet_filter::Change::Create(resource) => Ok(Self::Create(resource.try_into()?)),
1399            fnet_filter::Change::Remove(resource) => Ok(Self::Remove(resource.try_into()?)),
1400            fnet_filter::Change::__SourceBreaking { .. } => {
1401                Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE))
1402            }
1403        }
1404    }
1405}
1406
1407/// A controller for filtering state.
1408pub struct Controller {
1409    controller: fnet_filter::NamespaceControllerProxy,
1410    // The client provides an ID when creating a new controller, but the server
1411    // may need to assign a different ID to avoid conflicts; either way, the
1412    // server informs the client of the final `ControllerId` on creation.
1413    id: ControllerId,
1414    // Changes that have been pushed to the server but not yet committed. This
1415    // allows the `Controller` to report more informative errors by correlating
1416    // error codes with particular changes.
1417    pending_changes: Vec<Change>,
1418}
1419
1420impl Controller {
1421    pub async fn new_root(
1422        root: &fnet_root::FilterProxy,
1423        ControllerId(id): &ControllerId,
1424    ) -> Result<Self, ControllerCreationError> {
1425        let (controller, server_end) = fidl::endpoints::create_proxy();
1426        root.open_controller(id, server_end).map_err(ControllerCreationError::OpenController)?;
1427
1428        let fnet_filter::NamespaceControllerEvent::OnIdAssigned { id } = controller
1429            .take_event_stream()
1430            .next()
1431            .await
1432            .ok_or(ControllerCreationError::NoIdAssigned)?
1433            .map_err(ControllerCreationError::IdAssignment)?;
1434        Ok(Self { controller, id: ControllerId(id), pending_changes: Vec::new() })
1435    }
1436
1437    /// Creates a new `Controller`.
1438    ///
1439    /// Note that the provided `ControllerId` may need to be modified server-
1440    /// side to avoid collisions; to obtain the final ID assigned to the
1441    /// `Controller`, use the `id` method.
1442    pub async fn new(
1443        control: &fnet_filter::ControlProxy,
1444        ControllerId(id): &ControllerId,
1445    ) -> Result<Self, ControllerCreationError> {
1446        let (controller, server_end) = fidl::endpoints::create_proxy();
1447        control.open_controller(id, server_end).map_err(ControllerCreationError::OpenController)?;
1448
1449        let fnet_filter::NamespaceControllerEvent::OnIdAssigned { id } = controller
1450            .take_event_stream()
1451            .next()
1452            .await
1453            .ok_or(ControllerCreationError::NoIdAssigned)?
1454            .map_err(ControllerCreationError::IdAssignment)?;
1455        Ok(Self { controller, id: ControllerId(id), pending_changes: Vec::new() })
1456    }
1457
1458    pub fn id(&self) -> &ControllerId {
1459        &self.id
1460    }
1461
1462    pub async fn register_ebpf_program(
1463        &mut self,
1464        handle: febpf::ProgramHandle,
1465        program: febpf::VerifiedProgram,
1466    ) -> Result<(), RegisterEbpfProgramError> {
1467        self.controller
1468            .register_ebpf_program(handle, program)
1469            .await
1470            .map_err(RegisterEbpfProgramError::CallMethod)?
1471            .map_err(RegisterEbpfProgramError::from)
1472    }
1473
1474    pub async fn push_changes(&mut self, changes: Vec<Change>) -> Result<(), PushChangesError> {
1475        let fidl_changes = changes.iter().cloned().map(Into::into).collect::<Vec<_>>();
1476        let result = self
1477            .controller
1478            .push_changes(&fidl_changes)
1479            .await
1480            .map_err(PushChangesError::CallMethod)?;
1481        handle_change_validation_result(result, &changes)?;
1482        // Maintain a client-side copy of the pending changes we've pushed to
1483        // the server in order to provide better error messages if a commit
1484        // fails.
1485        self.pending_changes.extend(changes);
1486        Ok(())
1487    }
1488
1489    async fn commit_with_options(
1490        &mut self,
1491        options: fnet_filter::CommitOptions,
1492    ) -> Result<(), CommitError> {
1493        let committed_changes = std::mem::take(&mut self.pending_changes);
1494        let result = self.controller.commit(options).await.map_err(CommitError::CallMethod)?;
1495        handle_commit_result(result, committed_changes)
1496    }
1497
1498    pub async fn commit(&mut self) -> Result<(), CommitError> {
1499        self.commit_with_options(fnet_filter::CommitOptions::default()).await
1500    }
1501
1502    pub async fn commit_idempotent(&mut self) -> Result<(), CommitError> {
1503        self.commit_with_options(fnet_filter::CommitOptions {
1504            idempotent: Some(true),
1505            __source_breaking: SourceBreaking,
1506        })
1507        .await
1508    }
1509}
1510
1511pub(crate) fn handle_change_validation_result(
1512    change_validation_result: fnet_filter::ChangeValidationResult,
1513    changes: &Vec<Change>,
1514) -> Result<(), PushChangesError> {
1515    match change_validation_result {
1516        fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}) => Ok(()),
1517        fnet_filter::ChangeValidationResult::TooManyChanges(fnet_filter::Empty {}) => {
1518            Err(PushChangesError::TooManyChanges)
1519        }
1520        fnet_filter::ChangeValidationResult::ErrorOnChange(results) => {
1521            let errors: Result<_, PushChangesError> =
1522                changes.iter().zip(results).try_fold(Vec::new(), |mut errors, (change, result)| {
1523                    match result {
1524                        fnet_filter::ChangeValidationError::Ok
1525                        | fnet_filter::ChangeValidationError::NotReached => Ok(errors),
1526                        error @ (fnet_filter::ChangeValidationError::MissingRequiredField
1527                        | fnet_filter::ChangeValidationError::InvalidInterfaceMatcher
1528                        | fnet_filter::ChangeValidationError::InvalidAddressMatcher
1529                        | fnet_filter::ChangeValidationError::InvalidPortMatcher
1530                        | fnet_filter::ChangeValidationError::InvalidTransparentProxyAction
1531                        | fnet_filter::ChangeValidationError::InvalidNatAction
1532                        | fnet_filter::ChangeValidationError::InvalidPortRange) => {
1533                            let error = error
1534                                .try_into()
1535                                .expect("`Ok` and `NotReached` are handled in another arm");
1536                            errors.push((change.clone(), error));
1537                            Ok(errors)
1538                        }
1539                        fnet_filter::ChangeValidationError::__SourceBreaking { .. } => {
1540                            Err(FidlConversionError::UnknownUnionVariant(
1541                                type_names::CHANGE_VALIDATION_ERROR,
1542                            )
1543                            .into())
1544                        }
1545                    }
1546                });
1547            Err(PushChangesError::ErrorOnChange(errors?))
1548        }
1549        fnet_filter::ChangeValidationResult::__SourceBreaking { .. } => {
1550            Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE_VALIDATION_RESULT)
1551                .into())
1552        }
1553    }
1554}
1555
1556pub(crate) fn handle_commit_result(
1557    commit_result: fnet_filter::CommitResult,
1558    committed_changes: Vec<Change>,
1559) -> Result<(), CommitError> {
1560    match commit_result {
1561        fnet_filter::CommitResult::Ok(fnet_filter::Empty {}) => Ok(()),
1562        fnet_filter::CommitResult::RuleWithInvalidMatcher(rule_id) => {
1563            Err(CommitError::RuleWithInvalidMatcher(rule_id.into()))
1564        }
1565        fnet_filter::CommitResult::RuleWithInvalidAction(rule_id) => {
1566            Err(CommitError::RuleWithInvalidAction(rule_id.into()))
1567        }
1568        fnet_filter::CommitResult::TransparentProxyWithInvalidMatcher(rule_id) => {
1569            Err(CommitError::TransparentProxyWithInvalidMatcher(rule_id.into()))
1570        }
1571        fnet_filter::CommitResult::RedirectWithInvalidMatcher(rule_id) => {
1572            Err(CommitError::RedirectWithInvalidMatcher(rule_id.into()))
1573        }
1574        fnet_filter::CommitResult::MasqueradeWithInvalidMatcher(rule_id) => {
1575            Err(CommitError::MasqueradeWithInvalidMatcher(rule_id.into()))
1576        }
1577        fnet_filter::CommitResult::RejectWithInvalidMatcher(rule_id) => {
1578            Err(CommitError::RejectWithInvalidMatcher(rule_id.into()))
1579        }
1580        fnet_filter::CommitResult::CyclicalRoutineGraph(routine_id) => {
1581            Err(CommitError::CyclicalRoutineGraph(routine_id.into()))
1582        }
1583        fnet_filter::CommitResult::ErrorOnChange(results) => {
1584            let errors: Result<_, CommitError> = committed_changes
1585                .into_iter()
1586                .zip(results)
1587                .try_fold(Vec::new(), |mut errors, (change, result)| match result {
1588                    fnet_filter::CommitError::Ok | fnet_filter::CommitError::NotReached => {
1589                        Ok(errors)
1590                    }
1591                    error @ (fnet_filter::CommitError::NamespaceNotFound
1592                    | fnet_filter::CommitError::RoutineNotFound
1593                    | fnet_filter::CommitError::RuleNotFound
1594                    | fnet_filter::CommitError::AlreadyExists
1595                    | fnet_filter::CommitError::TargetRoutineIsInstalled
1596                    | fnet_filter::CommitError::InvalidEbpfProgramId) => {
1597                        let error = error
1598                            .try_into()
1599                            .expect("`Ok` and `NotReached` are handled in another arm");
1600                        errors.push((change, error));
1601                        Ok(errors)
1602                    }
1603                    fnet_filter::CommitError::__SourceBreaking { .. } => {
1604                        Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_ERROR)
1605                            .into())
1606                    }
1607                });
1608            Err(CommitError::ErrorOnChange(errors?))
1609        }
1610        fnet_filter::CommitResult::__SourceBreaking { .. } => {
1611            Err(FidlConversionError::UnknownUnionVariant(type_names::COMMIT_RESULT).into())
1612        }
1613    }
1614}
1615
1616#[cfg(test)]
1617mod tests {
1618
1619    use assert_matches::assert_matches;
1620    use fidl_fuchsia_net_matchers as fnet_matchers;
1621    use futures::channel::mpsc;
1622    use futures::task::Poll;
1623    use futures::{FutureExt as _, SinkExt as _};
1624    use test_case::test_case;
1625
1626    use {
1627        fidl_fuchsia_hardware_network as fhardware_network,
1628        fidl_fuchsia_net_interfaces as fnet_interfaces,
1629    };
1630
1631    use super::*;
1632
1633    #[test_case(
1634        fnet_filter::ResourceId::Namespace(String::from("namespace")),
1635        ResourceId::Namespace(NamespaceId(String::from("namespace")));
1636        "NamespaceId"
1637    )]
1638    #[test_case(fnet_filter::Domain::Ipv4, Domain::Ipv4; "Domain")]
1639    #[test_case(
1640        fnet_filter::Namespace {
1641            id: Some(String::from("namespace")),
1642            domain: Some(fnet_filter::Domain::Ipv4),
1643            ..Default::default()
1644        },
1645        Namespace { id: NamespaceId(String::from("namespace")), domain: Domain::Ipv4 };
1646        "Namespace"
1647    )]
1648    #[test_case(fnet_filter::IpInstallationHook::Egress, IpHook::Egress; "IpHook")]
1649    #[test_case(fnet_filter::NatInstallationHook::Egress, NatHook::Egress; "NatHook")]
1650    #[test_case(
1651        fnet_filter::InstalledIpRoutine {
1652            hook: Some(fnet_filter::IpInstallationHook::Egress),
1653            priority: Some(1),
1654            ..Default::default()
1655        },
1656        InstalledIpRoutine {
1657            hook: IpHook::Egress,
1658            priority: 1,
1659        };
1660        "InstalledIpRoutine"
1661    )]
1662    #[test_case(
1663        fnet_filter::RoutineType::Ip(fnet_filter::IpRoutine {
1664            installation: Some(fnet_filter::InstalledIpRoutine {
1665                hook: Some(fnet_filter::IpInstallationHook::LocalEgress),
1666                priority: Some(1),
1667                ..Default::default()
1668            }),
1669            ..Default::default()
1670        }),
1671        RoutineType::Ip(Some(InstalledIpRoutine { hook: IpHook::LocalEgress, priority: 1 }));
1672        "RoutineType"
1673    )]
1674    #[test_case(
1675        fnet_filter::Routine {
1676            id: Some(fnet_filter::RoutineId {
1677                namespace: String::from("namespace"),
1678                name: String::from("routine"),
1679            }),
1680            type_: Some(fnet_filter::RoutineType::Nat(fnet_filter::NatRoutine::default())),
1681            ..Default::default()
1682        },
1683        Routine {
1684            id: RoutineId {
1685                namespace: NamespaceId(String::from("namespace")),
1686                name: String::from("routine"),
1687            },
1688            routine_type: RoutineType::Nat(None),
1689        };
1690        "Routine"
1691    )]
1692    #[test_case(
1693        fnet_filter::Matchers {
1694            in_interface: Some(fnet_matchers::Interface::Name(String::from("wlan"))),
1695            transport_protocol: Some(fnet_matchers::PacketTransportProtocol::Tcp(fnet_matchers::TcpPacket {
1696                src_port: None,
1697                dst_port: Some(fnet_matchers::Port { start: 22, end: 22, invert: false }),
1698                ..Default::default()
1699            })),
1700            ..Default::default()
1701        },
1702        Matchers {
1703            in_interface: Some(fnet_matchers_ext::Interface::Name(String::from("wlan"))),
1704            transport_protocol: Some(fnet_matchers_ext::TransportProtocol::Tcp {
1705                src_port: None,
1706                dst_port: Some(fnet_matchers_ext::Port::new(22, 22, false).unwrap()),
1707            }),
1708            ..Default::default()
1709        };
1710        "Matchers"
1711    )]
1712    #[test_case(
1713        fnet_filter::Action::Accept(fnet_filter::Empty {}),
1714        Action::Accept;
1715        "Action"
1716    )]
1717    #[test_case(
1718        fnet_filter::Rule {
1719            id: fnet_filter::RuleId {
1720                routine: fnet_filter::RoutineId {
1721                    namespace: String::from("namespace"),
1722                    name: String::from("routine"),
1723                },
1724                index: 1,
1725            },
1726            matchers: fnet_filter::Matchers {
1727                transport_protocol: Some(fnet_matchers::PacketTransportProtocol::Icmp(
1728                    fnet_matchers::IcmpPacket::default()
1729                )),
1730                ..Default::default()
1731            },
1732            action: fnet_filter::Action::Drop(fnet_filter::Empty {}),
1733        },
1734        Rule {
1735            id: RuleId {
1736                routine: RoutineId {
1737                    namespace: NamespaceId(String::from("namespace")),
1738                    name: String::from("routine"),
1739                },
1740                index: 1,
1741            },
1742            matchers: Matchers {
1743                transport_protocol: Some(fnet_matchers_ext::TransportProtocol::Icmp),
1744                ..Default::default()
1745            },
1746            action: Action::Drop,
1747        };
1748        "Rule"
1749    )]
1750    #[test_case(
1751        fnet_filter::Resource::Namespace(fnet_filter::Namespace {
1752            id: Some(String::from("namespace")),
1753            domain: Some(fnet_filter::Domain::Ipv4),
1754            ..Default::default()
1755        }),
1756        Resource::Namespace(Namespace {
1757            id: NamespaceId(String::from("namespace")),
1758            domain: Domain::Ipv4
1759        });
1760        "Resource"
1761    )]
1762    #[test_case(
1763        fnet_filter::Event::EndOfUpdate(fnet_filter::Empty {}),
1764        Event::EndOfUpdate;
1765        "Event"
1766    )]
1767    #[test_case(
1768        fnet_filter::Change::Remove(fnet_filter::ResourceId::Namespace(String::from("namespace"))),
1769        Change::Remove(ResourceId::Namespace(NamespaceId(String::from("namespace"))));
1770        "Change"
1771    )]
1772    fn convert_from_fidl_and_back<F, E>(fidl_type: F, local_type: E)
1773    where
1774        E: TryFrom<F> + Clone + Debug + PartialEq,
1775        <E as TryFrom<F>>::Error: Debug + PartialEq,
1776        F: From<E> + Clone + Debug + PartialEq,
1777    {
1778        assert_eq!(fidl_type.clone().try_into(), Ok(local_type.clone()));
1779        assert_eq!(<_ as Into<F>>::into(local_type), fidl_type.clone());
1780    }
1781
1782    #[test]
1783    fn resource_id_try_from_unknown_variant() {
1784        assert_eq!(
1785            ResourceId::try_from(fnet_filter::ResourceId::__SourceBreaking { unknown_ordinal: 0 }),
1786            Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE_ID))
1787        );
1788    }
1789
1790    #[test]
1791    fn domain_try_from_unknown_variant() {
1792        assert_eq!(
1793            Domain::try_from(fnet_filter::Domain::__SourceBreaking { unknown_ordinal: 0 }),
1794            Err(FidlConversionError::UnknownUnionVariant(type_names::DOMAIN))
1795        );
1796    }
1797
1798    #[test]
1799    fn namespace_try_from_missing_properties() {
1800        assert_eq!(
1801            Namespace::try_from(fnet_filter::Namespace {
1802                id: None,
1803                domain: Some(fnet_filter::Domain::Ipv4),
1804                ..Default::default()
1805            }),
1806            Err(FidlConversionError::MissingNamespaceId)
1807        );
1808        assert_eq!(
1809            Namespace::try_from(fnet_filter::Namespace {
1810                id: Some(String::from("namespace")),
1811                domain: None,
1812                ..Default::default()
1813            }),
1814            Err(FidlConversionError::MissingNamespaceDomain)
1815        );
1816    }
1817
1818    #[test]
1819    fn ip_installation_hook_try_from_unknown_variant() {
1820        assert_eq!(
1821            IpHook::try_from(fnet_filter::IpInstallationHook::__SourceBreaking {
1822                unknown_ordinal: 0
1823            }),
1824            Err(FidlConversionError::UnknownUnionVariant(type_names::IP_INSTALLATION_HOOK))
1825        );
1826    }
1827
1828    #[test]
1829    fn nat_installation_hook_try_from_unknown_variant() {
1830        assert_eq!(
1831            NatHook::try_from(fnet_filter::NatInstallationHook::__SourceBreaking {
1832                unknown_ordinal: 0
1833            }),
1834            Err(FidlConversionError::UnknownUnionVariant(type_names::NAT_INSTALLATION_HOOK))
1835        );
1836    }
1837
1838    #[test]
1839    fn installed_ip_routine_try_from_missing_hook() {
1840        assert_eq!(
1841            InstalledIpRoutine::try_from(fnet_filter::InstalledIpRoutine {
1842                hook: None,
1843                ..Default::default()
1844            }),
1845            Err(FidlConversionError::MissingIpInstallationHook)
1846        );
1847    }
1848
1849    #[test]
1850    fn installed_nat_routine_try_from_missing_hook() {
1851        assert_eq!(
1852            InstalledNatRoutine::try_from(fnet_filter::InstalledNatRoutine {
1853                hook: None,
1854                ..Default::default()
1855            }),
1856            Err(FidlConversionError::MissingNatInstallationHook)
1857        );
1858    }
1859
1860    #[test]
1861    fn routine_type_try_from_unknown_variant() {
1862        assert_eq!(
1863            RoutineType::try_from(fnet_filter::RoutineType::__SourceBreaking {
1864                unknown_ordinal: 0
1865            }),
1866            Err(FidlConversionError::UnknownUnionVariant(type_names::ROUTINE_TYPE))
1867        );
1868    }
1869
1870    #[test]
1871    fn routine_try_from_missing_properties() {
1872        assert_eq!(
1873            Routine::try_from(fnet_filter::Routine { id: None, ..Default::default() }),
1874            Err(FidlConversionError::MissingRoutineId)
1875        );
1876        assert_eq!(
1877            Routine::try_from(fnet_filter::Routine {
1878                id: Some(fnet_filter::RoutineId {
1879                    namespace: String::from("namespace"),
1880                    name: String::from("routine"),
1881                }),
1882                type_: None,
1883                ..Default::default()
1884            }),
1885            Err(FidlConversionError::MissingRoutineType)
1886        );
1887    }
1888
1889    #[test_case(
1890        fnet_matchers_ext::PortError::InvalidPortRange =>
1891        FidlConversionError::InvalidPortMatcherRange
1892    )]
1893    #[test_case(
1894        fnet_matchers_ext::InterfaceError::ZeroId =>
1895        FidlConversionError::ZeroInterfaceId
1896    )]
1897    #[test_case(
1898        fnet_matchers_ext::InterfaceError::UnknownUnionVariant =>
1899        FidlConversionError::UnknownUnionVariant(type_names::INTERFACE_MATCHER)
1900    )]
1901    #[test_case(
1902        {
1903            let invalid_port_class = fnet_interfaces::PortClass::__SourceBreaking {
1904                unknown_ordinal: 0
1905            };
1906            let error = fnet_interfaces_ext::PortClass::try_from(
1907                invalid_port_class
1908            ).unwrap_err();
1909            fnet_matchers_ext::InterfaceError::UnknownPortClass(error)
1910        } =>
1911        FidlConversionError::UnknownUnionVariant(type_names::NET_INTERFACES_PORT_CLASS)
1912    )]
1913    #[test_case(
1914        {
1915            let invalid_port_class = fhardware_network::PortClass::__SourceBreaking {
1916                unknown_ordinal: 0
1917            };
1918            let error = fnet_interfaces_ext::PortClass::try_from(
1919                invalid_port_class
1920            ).unwrap_err();
1921            fnet_matchers_ext::InterfaceError::UnknownPortClass(
1922                fnet_interfaces_ext::UnknownPortClassError::HardwareNetwork(error))
1923        } =>
1924        FidlConversionError::UnknownUnionVariant(type_names::HARDWARE_NETWORK_PORT_CLASS)
1925    )]
1926    #[test_case(
1927        fnet_matchers_ext::SubnetError::PrefixTooLong =>
1928        FidlConversionError::SubnetPrefixTooLong
1929    )]
1930    #[test_case(
1931        fnet_matchers_ext::SubnetError::HostBitsSet =>
1932        FidlConversionError::SubnetHostBitsSet
1933    )]
1934    #[test_case(
1935        fnet_matchers_ext::AddressRangeError::Invalid =>
1936        FidlConversionError::InvalidAddressRange
1937    )]
1938    #[test_case(
1939        fnet_matchers_ext::AddressRangeError::FamilyMismatch =>
1940        FidlConversionError::AddressRangeFamilyMismatch
1941    )]
1942    #[test_case(
1943        fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1944            fnet_matchers_ext::SubnetError::PrefixTooLong) =>
1945        FidlConversionError::SubnetPrefixTooLong
1946    )]
1947    #[test_case(
1948        fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1949            fnet_matchers_ext::SubnetError::HostBitsSet) =>
1950        FidlConversionError::SubnetHostBitsSet
1951    )]
1952    #[test_case(
1953        fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1954            fnet_matchers_ext::AddressRangeError::Invalid) =>
1955        FidlConversionError::InvalidAddressRange
1956    )]
1957    #[test_case(
1958        fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1959            fnet_matchers_ext::AddressRangeError::FamilyMismatch) =>
1960        FidlConversionError::AddressRangeFamilyMismatch
1961    )]
1962    #[test_case(
1963        fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant =>
1964        FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
1965    )]
1966    #[test_case(
1967        fnet_matchers_ext::AddressError::AddressMatcherType(
1968            fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1969                fnet_matchers_ext::SubnetError::PrefixTooLong)) =>
1970        FidlConversionError::SubnetPrefixTooLong
1971    )]
1972    #[test_case(
1973        fnet_matchers_ext::AddressError::AddressMatcherType(
1974            fnet_matchers_ext::AddressMatcherTypeError::Subnet(
1975                fnet_matchers_ext::SubnetError::HostBitsSet)) =>
1976        FidlConversionError::SubnetHostBitsSet
1977    )]
1978    #[test_case(
1979        fnet_matchers_ext::AddressError::AddressMatcherType(
1980            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1981                fnet_matchers_ext::AddressRangeError::Invalid)) =>
1982        FidlConversionError::InvalidAddressRange
1983    )]
1984    #[test_case(
1985        fnet_matchers_ext::AddressError::AddressMatcherType(
1986            fnet_matchers_ext::AddressMatcherTypeError::AddressRange(
1987                fnet_matchers_ext::AddressRangeError::FamilyMismatch)) =>
1988        FidlConversionError::AddressRangeFamilyMismatch
1989    )]
1990    #[test_case(
1991        fnet_matchers_ext::AddressError::AddressMatcherType(
1992            fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant) =>
1993            FidlConversionError::UnknownUnionVariant(type_names::ADDRESS_MATCHER_TYPE)
1994    )]
1995    #[test_case(
1996        fnet_matchers_ext::TransportProtocolError::Port(
1997            fnet_matchers_ext::PortError::InvalidPortRange) =>
1998        FidlConversionError::InvalidPortMatcherRange
1999    )]
2000    #[test_case(
2001        fnet_matchers_ext::TransportProtocolError::UnknownUnionVariant =>
2002            FidlConversionError::UnknownUnionVariant(type_names::TRANSPORT_PROTOCOL)
2003    )]
2004    fn fidl_error_from_matcher_error<E: Into<FidlConversionError>>(
2005        error: E,
2006    ) -> FidlConversionError {
2007        error.into()
2008    }
2009
2010    #[test]
2011    fn action_try_from_unknown_variant() {
2012        assert_eq!(
2013            Action::try_from(fnet_filter::Action::__SourceBreaking { unknown_ordinal: 0 }),
2014            Err(FidlConversionError::UnknownUnionVariant(type_names::ACTION))
2015        );
2016    }
2017
2018    #[test]
2019    fn resource_try_from_unknown_variant() {
2020        assert_eq!(
2021            Resource::try_from(fnet_filter::Resource::__SourceBreaking { unknown_ordinal: 0 }),
2022            Err(FidlConversionError::UnknownUnionVariant(type_names::RESOURCE))
2023        );
2024    }
2025
2026    #[test]
2027    fn event_try_from_unknown_variant() {
2028        assert_eq!(
2029            Event::try_from(fnet_filter::Event::__SourceBreaking { unknown_ordinal: 0 }),
2030            Err(FidlConversionError::UnknownUnionVariant(type_names::EVENT))
2031        );
2032    }
2033
2034    #[test]
2035    fn change_try_from_unknown_variant() {
2036        assert_eq!(
2037            Change::try_from(fnet_filter::Change::__SourceBreaking { unknown_ordinal: 0 }),
2038            Err(FidlConversionError::UnknownUnionVariant(type_names::CHANGE))
2039        );
2040    }
2041
2042    fn test_controller_a() -> ControllerId {
2043        ControllerId(String::from("test-controller-a"))
2044    }
2045
2046    fn test_controller_b() -> ControllerId {
2047        ControllerId(String::from("test-controller-b"))
2048    }
2049
2050    pub(crate) fn test_resource_id() -> ResourceId {
2051        ResourceId::Namespace(NamespaceId(String::from("test-namespace")))
2052    }
2053
2054    pub(crate) fn test_resource() -> Resource {
2055        Resource::Namespace(Namespace {
2056            id: NamespaceId(String::from("test-namespace")),
2057            domain: Domain::AllIp,
2058        })
2059    }
2060
2061    // We can't easily create an invalid resource, so we just pretend and fake
2062    // the server response in tests.
2063    pub(crate) fn pretend_invalid_resource() -> Resource {
2064        Resource::Namespace(Namespace {
2065            id: NamespaceId(String::from("pretend-invalid-namespace")),
2066            domain: Domain::AllIp,
2067        })
2068    }
2069
2070    pub(crate) fn unknown_resource_id() -> ResourceId {
2071        ResourceId::Namespace(NamespaceId(String::from("does-not-exist")))
2072    }
2073
2074    #[fuchsia_async::run_singlethreaded(test)]
2075    async fn event_stream_from_state_conversion_error() {
2076        let (proxy, mut request_stream) =
2077            fidl::endpoints::create_proxy_and_stream::<fnet_filter::StateMarker>();
2078        let stream = event_stream_from_state(proxy).expect("get event stream");
2079        futures::pin_mut!(stream);
2080
2081        let send_invalid_event = async {
2082            let fnet_filter::StateRequest::GetWatcher { options: _, request, control_handle: _ } =
2083                request_stream
2084                    .next()
2085                    .await
2086                    .expect("client should call state")
2087                    .expect("request should not error");
2088            let fnet_filter::WatcherRequest::Watch { responder } = request
2089                .into_stream()
2090                .next()
2091                .await
2092                .expect("client should call watch")
2093                .expect("request should not error");
2094            responder
2095                .send(&[fnet_filter::Event::Added(fnet_filter::AddedResource {
2096                    controller: String::from("controller"),
2097                    resource: fnet_filter::Resource::Namespace(fnet_filter::Namespace {
2098                        id: None,
2099                        domain: None,
2100                        ..Default::default()
2101                    }),
2102                })])
2103                .expect("send batch with invalid event");
2104        };
2105        let ((), result) = futures::future::join(send_invalid_event, stream.next()).await;
2106        assert_matches!(
2107            result,
2108            Some(Err(WatchError::Conversion(FidlConversionError::MissingNamespaceId)))
2109        );
2110    }
2111
2112    #[fuchsia_async::run_singlethreaded(test)]
2113    async fn event_stream_from_state_empty_event_batch() {
2114        let (proxy, mut request_stream) =
2115            fidl::endpoints::create_proxy_and_stream::<fnet_filter::StateMarker>();
2116        let stream = event_stream_from_state(proxy).expect("get event stream");
2117        futures::pin_mut!(stream);
2118
2119        let send_empty_batch = async {
2120            let fnet_filter::StateRequest::GetWatcher { options: _, request, control_handle: _ } =
2121                request_stream
2122                    .next()
2123                    .await
2124                    .expect("client should call state")
2125                    .expect("request should not error");
2126            let fnet_filter::WatcherRequest::Watch { responder } = request
2127                .into_stream()
2128                .next()
2129                .await
2130                .expect("client should call watch")
2131                .expect("request should not error");
2132            responder.send(&[]).expect("send empty batch");
2133        };
2134        let ((), result) = futures::future::join(send_empty_batch, stream.next()).await;
2135        assert_matches!(result, Some(Err(WatchError::EmptyEventBatch)));
2136    }
2137
2138    #[fuchsia_async::run_singlethreaded(test)]
2139    async fn get_existing_resources_success() {
2140        let event_stream = futures::stream::iter([
2141            Ok(Event::Existing(test_controller_a(), test_resource())),
2142            Ok(Event::Existing(test_controller_b(), test_resource())),
2143            Ok(Event::Idle),
2144            Ok(Event::Removed(test_controller_a(), test_resource_id())),
2145        ]);
2146        futures::pin_mut!(event_stream);
2147
2148        let existing = get_existing_resources::<HashMap<_, _>>(event_stream.by_ref())
2149            .await
2150            .expect("get existing resources");
2151        assert_eq!(
2152            existing,
2153            HashMap::from([
2154                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2155                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2156            ])
2157        );
2158
2159        let trailing_events = event_stream.collect::<Vec<_>>().await;
2160        assert_matches!(
2161            &trailing_events[..],
2162            [Ok(Event::Removed(controller, resource))] if controller == &test_controller_a() &&
2163                                                           resource == &test_resource_id()
2164        );
2165    }
2166
2167    #[fuchsia_async::run_singlethreaded(test)]
2168    async fn get_existing_resources_error_in_stream() {
2169        let event_stream =
2170            futures::stream::once(futures::future::ready(Err(WatchError::EmptyEventBatch)));
2171        futures::pin_mut!(event_stream);
2172        assert_matches!(
2173            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2174            Err(GetExistingResourcesError::ErrorInStream(WatchError::EmptyEventBatch))
2175        )
2176    }
2177
2178    #[fuchsia_async::run_singlethreaded(test)]
2179    async fn get_existing_resources_unexpected_event() {
2180        let event_stream = futures::stream::once(futures::future::ready(Ok(Event::EndOfUpdate)));
2181        futures::pin_mut!(event_stream);
2182        assert_matches!(
2183            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2184            Err(GetExistingResourcesError::UnexpectedEvent(Event::EndOfUpdate))
2185        )
2186    }
2187
2188    #[fuchsia_async::run_singlethreaded(test)]
2189    async fn get_existing_resources_duplicate_resource() {
2190        let event_stream = futures::stream::iter([
2191            Ok(Event::Existing(test_controller_a(), test_resource())),
2192            Ok(Event::Existing(test_controller_a(), test_resource())),
2193        ]);
2194        futures::pin_mut!(event_stream);
2195        assert_matches!(
2196            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2197            Err(GetExistingResourcesError::DuplicateResource(resource))
2198                if resource == test_resource()
2199        )
2200    }
2201
2202    #[fuchsia_async::run_singlethreaded(test)]
2203    async fn get_existing_resources_stream_ended() {
2204        let event_stream = futures::stream::once(futures::future::ready(Ok(Event::Existing(
2205            test_controller_a(),
2206            test_resource(),
2207        ))));
2208        futures::pin_mut!(event_stream);
2209        assert_matches!(
2210            get_existing_resources::<HashMap<_, _>>(event_stream).await,
2211            Err(GetExistingResourcesError::StreamEnded)
2212        )
2213    }
2214
2215    #[fuchsia_async::run_singlethreaded(test)]
2216    async fn wait_for_condition_add_remove() {
2217        let mut state = HashMap::new();
2218
2219        // Verify that checking for the presence of a resource blocks until the
2220        // resource is added.
2221        let has_resource = |resources: &HashMap<_, HashMap<_, _>>| {
2222            resources.get(&test_controller_a()).map_or(false, |controller| {
2223                controller
2224                    .get(&test_resource_id())
2225                    .map_or(false, |resource| resource == &test_resource())
2226            })
2227        };
2228        assert_matches!(
2229            wait_for_condition(futures::stream::pending(), &mut state, has_resource).now_or_never(),
2230            None
2231        );
2232        assert!(state.is_empty());
2233        assert_matches!(
2234            wait_for_condition(
2235                futures::stream::iter([
2236                    Ok(Event::Added(test_controller_b(), test_resource())),
2237                    Ok(Event::EndOfUpdate),
2238                    Ok(Event::Added(test_controller_a(), test_resource())),
2239                    Ok(Event::EndOfUpdate),
2240                ]),
2241                &mut state,
2242                has_resource
2243            )
2244            .now_or_never(),
2245            Some(Ok(()))
2246        );
2247        assert_eq!(
2248            state,
2249            HashMap::from([
2250                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2251                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2252            ])
2253        );
2254
2255        // Re-add the resource and observe an error.
2256        assert_matches!(
2257            wait_for_condition(
2258                futures::stream::iter([
2259                    Ok(Event::Added(test_controller_a(), test_resource())),
2260                    Ok(Event::EndOfUpdate),
2261                ]),
2262                &mut state,
2263                has_resource
2264            )
2265            .now_or_never(),
2266            Some(Err(WaitForConditionError::AddedAlreadyExisting(r))) if r == test_resource()
2267        );
2268        assert_eq!(
2269            state,
2270            HashMap::from([
2271                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2272                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2273            ])
2274        );
2275
2276        // Verify that checking for the absence of a resource blocks until the
2277        // resource is removed.
2278        let does_not_have_resource = |resources: &HashMap<_, HashMap<_, _>>| {
2279            resources.get(&test_controller_a()).map_or(false, |controller| controller.is_empty())
2280        };
2281        assert_matches!(
2282            wait_for_condition(futures::stream::pending(), &mut state, does_not_have_resource)
2283                .now_or_never(),
2284            None
2285        );
2286        assert_eq!(
2287            state,
2288            HashMap::from([
2289                (test_controller_a(), HashMap::from([(test_resource_id(), test_resource())])),
2290                (test_controller_b(), HashMap::from([(test_resource_id(), test_resource())])),
2291            ])
2292        );
2293        assert_matches!(
2294            wait_for_condition(
2295                futures::stream::iter([
2296                    Ok(Event::Removed(test_controller_b(), test_resource_id())),
2297                    Ok(Event::EndOfUpdate),
2298                    Ok(Event::Removed(test_controller_a(), test_resource_id())),
2299                    Ok(Event::EndOfUpdate),
2300                ]),
2301                &mut state,
2302                does_not_have_resource
2303            )
2304            .now_or_never(),
2305            Some(Ok(()))
2306        );
2307        assert_eq!(
2308            state,
2309            HashMap::from([
2310                (test_controller_a(), HashMap::new()),
2311                (test_controller_b(), HashMap::new()),
2312            ])
2313        );
2314
2315        // Remove a non-existent resource and observe an error.
2316        assert_matches!(
2317            wait_for_condition(
2318                futures::stream::iter([
2319                    Ok(Event::Removed(test_controller_a(), test_resource_id())),
2320                    Ok(Event::EndOfUpdate),
2321                ]),
2322                &mut state,
2323                does_not_have_resource
2324            ).now_or_never(),
2325            Some(Err(WaitForConditionError::RemovedNonExistent(r))) if r == test_resource_id()
2326        );
2327        assert_eq!(
2328            state,
2329            HashMap::from([
2330                (test_controller_a(), HashMap::new()),
2331                (test_controller_b(), HashMap::new()),
2332            ])
2333        );
2334    }
2335
2336    #[test]
2337    fn predicate_not_tested_until_update_complete() {
2338        let mut state = HashMap::new();
2339        let (mut tx, rx) = mpsc::unbounded();
2340
2341        let wait = wait_for_condition(rx, &mut state, |state| !state.is_empty()).fuse();
2342        futures::pin_mut!(wait);
2343
2344        // Sending an `Added` event should *not* allow the wait operation to
2345        // complete, because the predicate should only be tested once the full
2346        // update has been observed.
2347        let mut exec = fuchsia_async::TestExecutor::new();
2348        exec.run_singlethreaded(async {
2349            tx.send(Ok(Event::Added(test_controller_a(), test_resource())))
2350                .await
2351                .expect("receiver should not be closed");
2352        });
2353        assert_matches!(exec.run_until_stalled(&mut wait), Poll::Pending);
2354
2355        exec.run_singlethreaded(async {
2356            tx.send(Ok(Event::EndOfUpdate)).await.expect("receiver should not be closed");
2357            wait.await.expect("condition should be satisfied once update is complete");
2358        });
2359    }
2360
2361    #[fuchsia_async::run_singlethreaded(test)]
2362    async fn wait_for_condition_error_in_stream() {
2363        let mut state = HashMap::new();
2364        let event_stream =
2365            futures::stream::once(futures::future::ready(Err(WatchError::EmptyEventBatch)));
2366        assert_matches!(
2367            wait_for_condition(event_stream, &mut state, |_| true).await,
2368            Err(WaitForConditionError::ErrorInStream(WatchError::EmptyEventBatch))
2369        );
2370        assert!(state.is_empty());
2371    }
2372
2373    #[fuchsia_async::run_singlethreaded(test)]
2374    async fn wait_for_condition_stream_ended() {
2375        let mut state = HashMap::new();
2376        let event_stream = futures::stream::empty();
2377        assert_matches!(
2378            wait_for_condition(event_stream, &mut state, |_| true).await,
2379            Err(WaitForConditionError::StreamEnded)
2380        );
2381        assert!(state.is_empty());
2382    }
2383
2384    pub(crate) async fn handle_open_controller(
2385        mut request_stream: fnet_filter::ControlRequestStream,
2386    ) -> fnet_filter::NamespaceControllerRequestStream {
2387        let (id, request, _control_handle) = request_stream
2388            .next()
2389            .await
2390            .expect("client should open controller")
2391            .expect("request should not error")
2392            .into_open_controller()
2393            .expect("client should open controller");
2394        let (stream, control_handle) = request.into_stream_and_control_handle();
2395        control_handle.send_on_id_assigned(&id).expect("send assigned ID");
2396
2397        stream
2398    }
2399
2400    pub(crate) async fn handle_push_changes(
2401        stream: &mut fnet_filter::NamespaceControllerRequestStream,
2402        push_changes_result: fnet_filter::ChangeValidationResult,
2403    ) {
2404        let (_changes, responder) = stream
2405            .next()
2406            .await
2407            .expect("client should push changes")
2408            .expect("request should not error")
2409            .into_push_changes()
2410            .expect("client should push changes");
2411        responder.send(push_changes_result).expect("send empty batch");
2412    }
2413
2414    pub(crate) async fn handle_commit(
2415        stream: &mut fnet_filter::NamespaceControllerRequestStream,
2416        commit_result: fnet_filter::CommitResult,
2417    ) {
2418        let (_options, responder) = stream
2419            .next()
2420            .await
2421            .expect("client should commit")
2422            .expect("request should not error")
2423            .into_commit()
2424            .expect("client should commit");
2425        responder.send(commit_result).expect("send commit result");
2426    }
2427
2428    #[fuchsia_async::run_singlethreaded(test)]
2429    async fn controller_push_changes_reports_invalid_change() {
2430        let (control, request_stream) =
2431            fidl::endpoints::create_proxy_and_stream::<fnet_filter::ControlMarker>();
2432        let push_invalid_change = async {
2433            let mut controller = Controller::new(&control, &ControllerId(String::from("test")))
2434                .await
2435                .expect("create controller");
2436            let result = controller
2437                .push_changes(vec![
2438                    Change::Create(test_resource()),
2439                    // We fake the server response to say this is invalid even
2440                    // though it really isn't.
2441                    Change::Create(pretend_invalid_resource()),
2442                    Change::Remove(test_resource_id()),
2443                ])
2444                .await;
2445            assert_matches!(
2446                result,
2447                Err(PushChangesError::ErrorOnChange(errors)) if errors == vec![(
2448                    Change::Create(pretend_invalid_resource()),
2449                    ChangeValidationError::InvalidPortMatcher
2450                )]
2451            );
2452        };
2453
2454        let handle_controller = async {
2455            let mut stream = handle_open_controller(request_stream).await;
2456            handle_push_changes(
2457                &mut stream,
2458                fnet_filter::ChangeValidationResult::ErrorOnChange(vec![
2459                    fnet_filter::ChangeValidationError::Ok,
2460                    fnet_filter::ChangeValidationError::InvalidPortMatcher,
2461                    fnet_filter::ChangeValidationError::NotReached,
2462                ]),
2463            )
2464            .await;
2465        };
2466
2467        let ((), ()) = futures::future::join(push_invalid_change, handle_controller).await;
2468    }
2469
2470    #[fuchsia_async::run_singlethreaded(test)]
2471    async fn controller_commit_reports_invalid_change() {
2472        let (control, request_stream) =
2473            fidl::endpoints::create_proxy_and_stream::<fnet_filter::ControlMarker>();
2474        let commit_invalid_change = async {
2475            let mut controller = Controller::new(&control, &ControllerId(String::from("test")))
2476                .await
2477                .expect("create controller");
2478            controller
2479                .push_changes(vec![
2480                    Change::Create(test_resource()),
2481                    Change::Remove(unknown_resource_id()),
2482                    Change::Remove(test_resource_id()),
2483                ])
2484                .await
2485                .expect("push changes");
2486            let result = controller.commit().await;
2487            assert_matches!(
2488                result,
2489                Err(CommitError::ErrorOnChange(errors)) if errors == vec![(
2490                    Change::Remove(unknown_resource_id()),
2491                    ChangeCommitError::NamespaceNotFound,
2492                )]
2493            );
2494        };
2495        let handle_controller = async {
2496            let mut stream = handle_open_controller(request_stream).await;
2497            handle_push_changes(
2498                &mut stream,
2499                fnet_filter::ChangeValidationResult::Ok(fnet_filter::Empty {}),
2500            )
2501            .await;
2502            handle_commit(
2503                &mut stream,
2504                fnet_filter::CommitResult::ErrorOnChange(vec![
2505                    fnet_filter::CommitError::Ok,
2506                    fnet_filter::CommitError::NamespaceNotFound,
2507                    fnet_filter::CommitError::Ok,
2508                ]),
2509            )
2510            .await;
2511        };
2512        let ((), ()) = futures::future::join(commit_invalid_change, handle_controller).await;
2513    }
2514}