socket_proxy/
registry.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Implements fuchsia.net.policy.socketproxy.NetworkRegistry.
6
7use anyhow::{Context, Error, anyhow};
8use fidl::endpoints::{ControlHandle, RequestStream};
9use fidl_fuchsia_net_policy_socketproxy::{
10    self as fnp_socketproxy, FuchsiaNetworkInfo, FuchsiaNetworksRequest, Network,
11    NetworkDnsServers, NetworkInfo, NetworkRegistryAddError, NetworkRegistryRemoveError,
12    NetworkRegistrySetDefaultError, NetworkRegistryUpdateError, StarnixNetworksRequest,
13};
14use fuchsia_component::client::connect_to_protocol;
15use fuchsia_inspect_derive::{IValue, Inspect, Unit};
16use futures::channel::mpsc;
17use futures::lock::Mutex;
18use futures::{SinkExt as _, StreamExt as _, TryStreamExt as _};
19use log::{error, info, warn};
20use std::collections::HashMap;
21use std::sync::Arc;
22use thiserror::Error;
23
24use {
25    fidl_fuchsia_net as fnet, fidl_fuchsia_net_interfaces_ext as fnet_interfaces_ext,
26    fidl_fuchsia_posix_socket as fposix_socket,
27};
28
29/// RFC-1035ยง4.2 specifies port 53 (decimal) as the default port for DNS requests.
30const DEFAULT_DNS_PORT: u16 = 53;
31
32/// If there are networks registered, but no default has been set, this value
33/// will be used, otherwise the mark will be OptionalUint32::Unset(Empty).
34pub(crate) const DEFAULT_SOCKET_MARK: u32 = 0;
35
36pub(crate) struct RequestForwarder {
37    forwarder_rx: mpsc::Receiver<NetworkRegistryRequest>,
38    registry: fnp_socketproxy::NetworkRegistryProxy,
39}
40
41impl RequestForwarder {
42    pub(crate) fn new(
43        forwarder_rx: mpsc::Receiver<NetworkRegistryRequest>,
44    ) -> Result<Self, anyhow::Error> {
45        Ok(Self {
46            forwarder_rx,
47            registry: connect_to_protocol::<fnp_socketproxy::NetworkRegistryMarker>()
48                .context("error connecting to network registry")?,
49        })
50    }
51
52    pub(crate) async fn run(&mut self) -> Result<(), anyhow::Error> {
53        while let Some(request) = self.forwarder_rx.next().await {
54            match self.forward_request(request).await {
55                Err(e) => {
56                    info!(
57                        "FIDL error while forwarding request to delegated networks. \
58                            Protocol likely not available: {e:?}"
59                    );
60                    return Err(e);
61                }
62                Ok(Err(e)) => {
63                    error!(
64                        "Failed to forward request to delegated networks. Future updates \
65                                 will be ignored, and netcfg will not have complete state: {e:?}"
66                    );
67                    return Err(anyhow!("Delegated registry has inconsistent state: {e:?}"));
68                }
69                Ok(Ok(_)) => continue,
70            }
71        }
72        error!(
73            "RequestForwarder stream ended. No more requests will be \
74            forwarded to netcfg"
75        );
76        Ok(())
77    }
78
79    async fn forward_request(
80        &self,
81        request: NetworkRegistryRequest,
82    ) -> Result<Result<(), NetworkRegistryError>, anyhow::Error> {
83        info!("forwarding Starnix NetworkRegistry change to netcfg: {request:?}");
84        let res = match request {
85            NetworkRegistryRequest::SetDefault { network_id } => self
86                .registry
87                .set_default(&match network_id {
88                    Some(id) => fposix_socket::OptionalUint32::Value(id),
89                    None => fposix_socket::OptionalUint32::Unset(fposix_socket::Empty),
90                })
91                .await
92                .context("fidl error forwarding set_default")?
93                .map_err(|e| e.into()),
94            NetworkRegistryRequest::Add { network } => self
95                .registry
96                .add(&network)
97                .await
98                .context("fidl error forwarding add")?
99                .map_err(|e| e.into()),
100            NetworkRegistryRequest::Update { network } => self
101                .registry
102                .update(&network)
103                .await
104                .context("fidl error forwarding update")?
105                .map_err(|e| e.into()),
106            NetworkRegistryRequest::Remove { network_id } => self
107                .registry
108                .remove(network_id)
109                .await
110                .context("fidl error forwarding remove")?
111                .map_err(|e| e.into()),
112        };
113        Ok(res)
114    }
115}
116
117enum CommonErrors {
118    MissingNetworkId,
119    MissingNetworkInfo,
120    MissingNetworkDnsServers,
121}
122
123trait IpAddressExt {
124    fn to_dns_socket_address(self) -> fnet::SocketAddress;
125}
126
127impl<T: IpAddressExt + Copy> IpAddressExt for &T {
128    fn to_dns_socket_address(self) -> fnet::SocketAddress {
129        (*self).to_dns_socket_address()
130    }
131}
132
133impl IpAddressExt for fnet::Ipv4Address {
134    fn to_dns_socket_address(self) -> fnet::SocketAddress {
135        fnet::SocketAddress::Ipv4(fnet::Ipv4SocketAddress { address: self, port: DEFAULT_DNS_PORT })
136    }
137}
138
139impl IpAddressExt for fnet::Ipv6Address {
140    fn to_dns_socket_address(self) -> fnet::SocketAddress {
141        fnet::SocketAddress::Ipv6(fnet::Ipv6SocketAddress {
142            address: self,
143            port: DEFAULT_DNS_PORT,
144            zone_index: 0,
145        })
146    }
147}
148
149trait IntoOptionalUint32 {
150    fn into_optional_uint32(self) -> fposix_socket::OptionalUint32;
151}
152
153impl IntoOptionalUint32 for Option<u32> {
154    fn into_optional_uint32(self) -> fposix_socket::OptionalUint32 {
155        match self {
156            Some(value) => fposix_socket::OptionalUint32::Value(value),
157            None => fposix_socket::OptionalUint32::Unset(fposix_socket::Empty),
158        }
159    }
160}
161
162trait NetworkInfoExt {
163    fn mark(&self) -> Option<u32>;
164}
165
166impl NetworkInfoExt for NetworkInfo {
167    fn mark(&self) -> Option<u32> {
168        match self {
169            NetworkInfo::Starnix(s) => s.mark,
170            // Sockets express using Fuchsia's default network by setting
171            // the mark to None.
172            NetworkInfo::Fuchsia(_) | _ => None,
173        }
174    }
175}
176
177// Errors produced when communicating updates to
178// the socket proxy.
179#[derive(Clone, Debug, Error)]
180pub enum NetworkRegistryError {
181    #[error("Error during socketproxy Add: {0:?}")]
182    Add(NetworkRegistryAddError),
183    #[error("Error during socketproxy Remove: {0:?}")]
184    Remove(NetworkRegistryRemoveError),
185    #[error("Error during socketproxy SetDefault: {0:?}")]
186    SetDefault(NetworkRegistrySetDefaultError),
187    #[error("Error during socketproxy Update: {0:?}")]
188    Update(NetworkRegistryUpdateError),
189}
190
191impl From<NetworkRegistryAddError> for NetworkRegistryError {
192    fn from(error: NetworkRegistryAddError) -> Self {
193        NetworkRegistryError::Add(error)
194    }
195}
196
197impl From<NetworkRegistryRemoveError> for NetworkRegistryError {
198    fn from(error: NetworkRegistryRemoveError) -> Self {
199        NetworkRegistryError::Remove(error)
200    }
201}
202
203impl From<NetworkRegistrySetDefaultError> for NetworkRegistryError {
204    fn from(error: NetworkRegistrySetDefaultError) -> Self {
205        NetworkRegistryError::SetDefault(error)
206    }
207}
208
209impl From<NetworkRegistryUpdateError> for NetworkRegistryError {
210    fn from(error: NetworkRegistryUpdateError) -> Self {
211        NetworkRegistryError::Update(error)
212    }
213}
214
215#[derive(Clone, Debug, Error)]
216pub enum NetworkConversionError {
217    #[error("Could not convert id ({0}) to u32")]
218    InvalidInterfaceId(u64),
219}
220
221pub trait NetworkExt<I: fnet_interfaces_ext::FieldInterests> {
222    fn from_watcher_properties(
223        properties: &fnet_interfaces_ext::Properties<I>,
224    ) -> Result<Self, NetworkConversionError>
225    where
226        Self: Sized;
227}
228
229impl<I: fnet_interfaces_ext::FieldInterests> NetworkExt<I> for Network {
230    fn from_watcher_properties(
231        properties: &fnet_interfaces_ext::Properties<I>,
232    ) -> Result<Self, NetworkConversionError> {
233        // We expect interface ids to safely fit in the range of u32 values.
234        let network_id: u32 =
235            properties.id.get().try_into().or_else(|_| {
236                Err(NetworkConversionError::InvalidInterfaceId(properties.id.into()))
237            })?;
238        let network = Self {
239            network_id: Some(network_id),
240            info: Some(NetworkInfo::Fuchsia(FuchsiaNetworkInfo {
241                // No Fuchsia-specific information to provide.
242                ..Default::default()
243            })),
244            // DNS servers of Fuchsia networks are observable in netcfg already, so don't provide
245            // them to the Socketproxy. Socketproxy requires these fields to be provided so
246            // instantiate the v4 and v6 fields as empty vectors.
247            dns_servers: Some(fnp_socketproxy::NetworkDnsServers {
248                v4: Some(vec![]),
249                v6: Some(vec![]),
250                ..Default::default()
251            }),
252            ..Default::default()
253        };
254        Ok(network)
255    }
256}
257
258#[derive(Debug, Clone, PartialEq)]
259/// A generic version of a NetworkRegistry request with the responder removed.
260pub enum NetworkRegistryRequest {
261    /// Sets the default network.
262    ///
263    /// The network must have previously been registered by a call to `Add`.
264    SetDefault { network_id: Option<u32> },
265    /// Add a new network.
266    ///
267    /// This call will not return until the DNS servers have been successfully
268    /// updated in netcfg.
269    Add { network: Network },
270    /// Update a previously Added network.
271    ///
272    /// This call will not return until the DNS servers have been
273    /// successfully updated in netcfg.
274    Update { network: Network },
275    /// Remove a previously Added network.
276    ///
277    /// This call will not return until the DNS servers have been
278    /// successfully updated in netcfg.
279    Remove { network_id: u32 },
280}
281
282impl From<&fnp_socketproxy::NetworkRegistryRequest> for NetworkRegistryRequest {
283    fn from(value: &fnp_socketproxy::NetworkRegistryRequest) -> Self {
284        match *value {
285            fnp_socketproxy::NetworkRegistryRequest::SetDefault { network_id, responder: _ } => {
286                NetworkRegistryRequest::SetDefault {
287                    network_id: match network_id {
288                        fposix_socket::OptionalUint32::Value(v) => Some(v),
289                        fposix_socket::OptionalUint32::Unset(_) => None,
290                    },
291                }
292            }
293            fnp_socketproxy::NetworkRegistryRequest::Add { ref network, responder: _ } => {
294                NetworkRegistryRequest::Add { network: network.clone() }
295            }
296            fnp_socketproxy::NetworkRegistryRequest::Update { ref network, responder: _ } => {
297                NetworkRegistryRequest::Update { network: network.clone() }
298            }
299            fnp_socketproxy::NetworkRegistryRequest::Remove { network_id, responder: _ } => {
300                NetworkRegistryRequest::Remove { network_id }
301            }
302        }
303    }
304}
305impl From<&StarnixNetworksRequest> for NetworkRegistryRequest {
306    fn from(value: &StarnixNetworksRequest) -> Self {
307        match *value {
308            StarnixNetworksRequest::SetDefault { network_id, responder: _ } => {
309                NetworkRegistryRequest::SetDefault {
310                    network_id: match network_id {
311                        fposix_socket::OptionalUint32::Value(v) => Some(v),
312                        fposix_socket::OptionalUint32::Unset(_) => None,
313                    },
314                }
315            }
316            StarnixNetworksRequest::Add { ref network, responder: _ } => {
317                NetworkRegistryRequest::Add { network: network.clone() }
318            }
319            StarnixNetworksRequest::Update { ref network, responder: _ } => {
320                NetworkRegistryRequest::Update { network: network.clone() }
321            }
322            StarnixNetworksRequest::Remove { network_id, responder: _ } => {
323                NetworkRegistryRequest::Remove { network_id }
324            }
325        }
326    }
327}
328impl From<&FuchsiaNetworksRequest> for NetworkRegistryRequest {
329    fn from(value: &FuchsiaNetworksRequest) -> Self {
330        match *value {
331            FuchsiaNetworksRequest::SetDefault { network_id, responder: _ } => {
332                NetworkRegistryRequest::SetDefault {
333                    network_id: match network_id {
334                        fposix_socket::OptionalUint32::Value(v) => Some(v),
335                        fposix_socket::OptionalUint32::Unset(_) => None,
336                    },
337                }
338            }
339            FuchsiaNetworksRequest::Add { ref network, responder: _ } => {
340                NetworkRegistryRequest::Add { network: network.clone() }
341            }
342            FuchsiaNetworksRequest::Update { ref network, responder: _ } => {
343                NetworkRegistryRequest::Update { network: network.clone() }
344            }
345            FuchsiaNetworksRequest::Remove { network_id, responder: _ } => {
346                NetworkRegistryRequest::Remove { network_id }
347            }
348        }
349    }
350}
351
352/// A copy of fnp_socketproxy::Network that ensures that all fields are present.
353#[derive(Debug, Clone)]
354pub(crate) struct ValidatedNetwork {
355    network_id: u32,
356    info: NetworkInfo,
357    dns_servers: NetworkDnsServers,
358}
359
360impl ValidatedNetwork {
361    fn dns_servers(&self) -> Vec<fnet::SocketAddress> {
362        self.dns_servers
363            .v4
364            .iter()
365            .flat_map(|a| a.iter().map(IpAddressExt::to_dns_socket_address))
366            .chain(
367                self.dns_servers
368                    .v6
369                    .iter()
370                    .flat_map(|a| a.iter().map(IpAddressExt::to_dns_socket_address)),
371            )
372            .collect()
373    }
374}
375
376trait ValidateNetworkExt {
377    fn validate(self) -> Result<ValidatedNetwork, CommonErrors>;
378}
379
380impl ValidateNetworkExt for Network {
381    fn validate(self) -> Result<ValidatedNetwork, CommonErrors> {
382        match self {
383            Network { network_id: None, .. } => Err(CommonErrors::MissingNetworkId),
384            Network { info: None, .. } => Err(CommonErrors::MissingNetworkInfo),
385            Network { dns_servers: None, .. } => Err(CommonErrors::MissingNetworkDnsServers),
386            Network {
387                network_id: Some(network_id),
388                info: Some(info),
389                dns_servers: Some(dns_servers),
390                ..
391            } => Ok(ValidatedNetwork { network_id, info, dns_servers }),
392        }
393    }
394}
395
396macro_rules! common_errors_impl {
397    ($($p:ty),+) => {
398        $(
399            impl From<CommonErrors> for $p {
400                fn from(value: CommonErrors) -> Self {
401                    use CommonErrors::*;
402                    match value {
403                        MissingNetworkId => <$p>::MissingNetworkId,
404                        MissingNetworkInfo => <$p>::MissingNetworkInfo,
405                        MissingNetworkDnsServers => <$p>::MissingNetworkDnsServers,
406                    }
407                }
408            }
409        )+
410    }
411}
412
413common_errors_impl!(
414    fnp_socketproxy::NetworkRegistryAddError,
415    fnp_socketproxy::NetworkRegistryUpdateError
416);
417
418/// NetworkRegistry tracks the networks that have been registered.
419#[derive(Inspect, Debug, Default)]
420struct NetworkRegistry {
421    networks: IValue<RegisteredNetworks>,
422
423    inspect_node: fuchsia_inspect::Node,
424}
425
426impl NetworkRegistry {
427    /// Returns a collated list of DnsServerList objects.
428    pub(crate) fn dns_servers(&self) -> Vec<fnp_socketproxy::DnsServerList> {
429        self.networks.dns_servers()
430    }
431
432    /// Returns whether the network registry has a default network set.
433    pub(crate) fn has_default_network(&self) -> bool {
434        self.networks.default_network_id.is_some()
435    }
436
437    /// Returns current socket mark for the default network.
438    pub(crate) fn current_mark(&self) -> Option<u32> {
439        self.networks.current_mark()
440    }
441}
442
443#[derive(Unit, Debug, Default)]
444struct MethodInspect {
445    successes: u32,
446    errors: u32,
447}
448
449#[derive(Unit, Default, Debug)]
450struct RegisteredNetworks {
451    default_network_id: Option<u32>,
452
453    #[inspect(skip)]
454    /// A mapping from network id to ValidatedNetwork for each registered network.
455    networks: HashMap<u32, ValidatedNetwork>,
456
457    adds: MethodInspect,
458    removes: MethodInspect,
459    set_defaults: MethodInspect,
460    updates: MethodInspect,
461}
462
463impl RegisteredNetworks {
464    fn add_network(&mut self, network: Network) -> fnp_socketproxy::NetworkRegistryAddResult {
465        let network = network.validate()?;
466        #[allow(clippy::map_entry, reason = "mass allow for https://fxbug.dev/381896734")]
467        if self.networks.contains_key(&network.network_id) {
468            self.adds.errors += 1;
469            Err(fnp_socketproxy::NetworkRegistryAddError::DuplicateNetworkId)
470        } else {
471            let _: Option<_> = self.networks.insert(network.network_id, network);
472            self.adds.successes += 1;
473            Ok(())
474        }
475    }
476
477    /// Empties the registered networks.
478    pub(crate) fn clear(&mut self) {
479        self.default_network_id = None;
480        self.networks.clear();
481    }
482
483    fn update_network(&mut self, network: Network) -> fnp_socketproxy::NetworkRegistryUpdateResult {
484        let network = network.validate()?;
485        let network_id = network.network_id;
486        *self
487            .networks
488            .get_mut(&network_id)
489            .ok_or(fnp_socketproxy::NetworkRegistryUpdateError::NotFound)
490            .inspect(|_| self.updates.successes += 1)
491            .inspect_err(|_| self.updates.errors += 1)? = network;
492        Ok(())
493    }
494
495    fn remove_network(&mut self, network_id: u32) -> fnp_socketproxy::NetworkRegistryRemoveResult {
496        if self.default_network_id == Some(network_id) {
497            self.removes.errors += 1;
498            return Err(fnp_socketproxy::NetworkRegistryRemoveError::CannotRemoveDefaultNetwork);
499        }
500        match self.networks.remove(&network_id) {
501            Some(_) => {
502                self.removes.successes += 1;
503                Ok(())
504            }
505            None => {
506                self.removes.errors += 1;
507                Err(fnp_socketproxy::NetworkRegistryRemoveError::NotFound)
508            }
509        }
510    }
511
512    /// Update the currently set default network id.
513    ///
514    /// If `network_id` is None, the default network id will be unset.
515    fn set_default_network(
516        &mut self,
517        network_id: Option<u32>,
518    ) -> fnp_socketproxy::NetworkRegistrySetDefaultResult {
519        if let Some(network_id) = network_id {
520            if !self.networks.contains_key(&network_id) {
521                self.set_defaults.errors += 1;
522                return Err(fnp_socketproxy::NetworkRegistrySetDefaultError::NotFound);
523            }
524        }
525        self.set_defaults.successes += 1;
526        self.default_network_id = network_id;
527
528        Ok(())
529    }
530
531    /// Returns a collated list of DnsServerList objects.
532    pub(crate) fn dns_servers(&self) -> Vec<fnp_socketproxy::DnsServerList> {
533        self.networks
534            .iter()
535            .map(|(id, network)| fnp_socketproxy::DnsServerList {
536                source_network_id: Some(*id),
537                addresses: Some(network.dns_servers()),
538                ..Default::default()
539            })
540            .collect()
541    }
542
543    fn current_mark(&self) -> Option<u32> {
544        match (self.default_network_id, self.networks.is_empty()) {
545            (None, false) => Some(DEFAULT_SOCKET_MARK),
546            (id, _) => id.and_then(|id| self.networks[&id].info.mark()),
547        }
548    }
549
550    fn len(&self) -> usize {
551        self.networks.len()
552    }
553}
554
555#[derive(Inspect, Clone, Debug, Default)]
556pub struct NetworkRegistries {
557    starnix: Arc<Mutex<NetworkRegistry>>,
558    fuchsia: Arc<Mutex<NetworkRegistry>>,
559}
560
561impl NetworkRegistries {
562    // When Fuchsia has a default network, then prefer its mark
563    // over any existing mark. When it is unset, then fallback
564    // to the mark provided by Starnix.
565    async fn current_mark(&self) -> Option<u32> {
566        {
567            let fuchsia = self.fuchsia.lock().await;
568            if fuchsia.has_default_network() {
569                return fuchsia.current_mark();
570            }
571        }
572
573        return self.starnix.lock().await.networks.current_mark();
574    }
575
576    // When Fuchsia has a default network, then prefer its DNS
577    // over any existing DNS servers. When it is unset, then
578    // fallback to the DNS servers provided by Starnix.
579    async fn current_dns_servers(&self) -> Vec<fnp_socketproxy::DnsServerList> {
580        {
581            let fuchsia = self.fuchsia.lock().await;
582            if fuchsia.has_default_network() {
583                return fuchsia.dns_servers();
584            }
585        }
586
587        return self.starnix.lock().await.dns_servers();
588    }
589}
590
591#[derive(Debug)]
592enum RegistryType {
593    Starnix,
594    Fuchsia,
595}
596
597#[derive(Inspect, Debug)]
598pub struct Registry {
599    #[inspect(forward)]
600    networks: NetworkRegistries,
601    // Reflects the marks that are set on the sockets vended
602    // by this component.
603    marks: Arc<Mutex<crate::SocketMarks>>,
604    dns_tx: mpsc::Sender<Vec<fnp_socketproxy::DnsServerList>>,
605    forwarder_tx: mpsc::Sender<NetworkRegistryRequest>,
606    starnix_occupant: Mutex<()>,
607    fuchsia_occupant: Mutex<()>,
608}
609
610macro_rules! handle_registry_request {
611    ($request_type:ident, $request:expr, $network_registry:expr, $registry_type:expr) => {{
612        let mut networks = $network_registry.networks.as_mut();
613        let (op, send): (_, Box<dyn FnOnce() -> Result<(), _> + Send + Sync + 'static>) =
614            match $request {
615                $request_type::SetDefault { network_id, responder } => {
616                    let result = networks.set_default_network(match network_id {
617                        fposix_socket::OptionalUint32::Value(value) => Some(value),
618                        fposix_socket::OptionalUint32::Unset(_) => None,
619                    });
620                    ("set default", Box::new(move || responder.send(result)))
621                }
622                $request_type::Add { network, responder } => {
623                    let result = networks.add_network(network);
624                    ("add", Box::new(move || responder.send(result)))
625                }
626                $request_type::Update { network, responder } => {
627                    let result = networks.update_network(network);
628                    ("update", Box::new(move || responder.send(result)))
629                }
630                $request_type::Remove { network_id, responder } => {
631                    let result = networks.remove_network(network_id);
632                    ("remove", Box::new(move || responder.send(result)))
633                }
634            };
635        let new_mark = networks.current_mark();
636        info!(
637            "{:?} registry {op}. mark: {new_mark:?}, networks count: {}",
638            $registry_type,
639            networks.len()
640        );
641        std::mem::drop(networks);
642        send
643    }};
644}
645
646impl Registry {
647    pub(crate) fn new(
648        marks: Arc<Mutex<crate::SocketMarks>>,
649        dns_tx: mpsc::Sender<Vec<fnp_socketproxy::DnsServerList>>,
650        forwarder_tx: mpsc::Sender<NetworkRegistryRequest>,
651    ) -> Result<Self, anyhow::Error> {
652        Ok(Self {
653            networks: Default::default(),
654            marks,
655            dns_tx,
656            forwarder_tx,
657            starnix_occupant: Default::default(),
658            fuchsia_occupant: Default::default(),
659        })
660    }
661}
662
663impl Registry {
664    pub(crate) async fn run_starnix(
665        &self,
666        stream: fnp_socketproxy::StarnixNetworksRequestStream,
667    ) -> Result<(), Error> {
668        let _occupant = match self.starnix_occupant.try_lock() {
669            Some(o) => o,
670            None => {
671                warn!("Only one connection to StarnixNetworks is allowed at a time");
672                stream.control_handle().shutdown_with_epitaph(fidl::Status::ACCESS_DENIED);
673                return Ok(());
674            }
675        };
676
677        info!("Starting fuchsia.net.policy.socketproxy.StarnixNetworks server");
678        self.networks.starnix.lock().await.networks.as_mut().clear();
679        stream
680            .map(|result| result.context("failed request"))
681            .try_for_each(|request| {
682                async {
683                    self.forwarder_tx.clone().feed((&request).into()).await.unwrap_or_else(
684                        |e| {
685                            if !e.is_disconnected() {
686                                // Log if the feed fails for reasons other than disconnection.
687                                error!("Unable to feed request forward: {e:?}")
688                            }
689                        },
690                    );
691                    let mut network_registry = self.networks.starnix.lock().await;
692                    let send: Box<dyn FnOnce() -> Result<(), _> + Send + Sync + 'static> =
693                        handle_registry_request!(
694                            StarnixNetworksRequest,
695                            request,
696                            network_registry,
697                            RegistryType::Starnix
698                        );
699                    std::mem::drop(network_registry);
700
701                    self.handle_state_changed().await?;
702                    send().context("error sending response")?;
703                    Ok(())
704                }
705            })
706            .await
707    }
708
709    pub(crate) async fn run_fuchsia(
710        &self,
711        stream: fnp_socketproxy::FuchsiaNetworksRequestStream,
712    ) -> Result<(), Error> {
713        let _occupant = match self.fuchsia_occupant.try_lock() {
714            Some(o) => o,
715            None => {
716                warn!("Only one connection to FuchsiaNetworks is allowed at a time");
717                stream.control_handle().shutdown_with_epitaph(fidl::Status::ACCESS_DENIED);
718                return Ok(());
719            }
720        };
721
722        info!("Starting fuchsia.net.policy.socketproxy.FuchsiaNetworks server");
723        self.networks.fuchsia.lock().await.networks.as_mut().clear();
724        stream
725            .map(|result| result.context("failed request"))
726            .try_for_each(|request| {
727                async {
728                    let mut network_registry = self.networks.fuchsia.lock().await;
729                    let send: Box<dyn FnOnce() -> Result<(), _> + Send + Sync + 'static> =
730                        handle_registry_request!(
731                            FuchsiaNetworksRequest,
732                            request,
733                            network_registry,
734                            RegistryType::Fuchsia
735                        );
736                    std::mem::drop(network_registry);
737
738                    self.handle_state_changed().await?;
739                    send().context("error sending response")?;
740                    Ok(())
741                }
742            })
743            .await
744    }
745
746    async fn handle_state_changed(&self) -> Result<(), Error> {
747        // We do feed here instead of send so that we don't wait for a flush
748        // in the event that the DnsServerWatcher is not running.
749        self.dns_tx.clone().feed(self.networks.current_dns_servers().await).await.unwrap_or_else(
750            |e| {
751                if !e.is_disconnected() {
752                    // Log if the feed fails for reasons other than disconnection.
753                    error!("Unable to feed DNS update: {e:?}")
754                }
755            },
756        );
757
758        // Ensure the mark is updated prior to sending out the response
759        // and dropping the registry.
760        // TODO(https://fxbug.dev/431822969): Replace this with a common definition of
761        // which mark domain is used for which purpose.
762        self.marks.lock().await.mark_1 = self.networks.current_mark().await.into_optional_uint32();
763        Ok(())
764    }
765}
766
767#[cfg(test)]
768mod test {
769    use super::*;
770    use fuchsia_component::server::ServiceFs;
771    use fuchsia_component_test::{
772        Capability, ChildOptions, LocalComponentHandles, RealmBuilder, RealmInstance, Ref, Route,
773    };
774    use futures::channel::mpsc::Receiver;
775    use futures::future;
776    use net_declare::{fidl_ip, fidl_socket_addr};
777    use pretty_assertions::assert_eq;
778    use socket_proxy_testing::{RegistryType, ToDnsServerList as _, ToNetwork};
779    use test_case::test_case;
780
781    #[derive(Clone, Debug)]
782    enum Op<N: ToNetwork> {
783        SetDefault {
784            network_id: Option<u32>,
785            result: Result<(), fnp_socketproxy::NetworkRegistrySetDefaultError>,
786        },
787        Add {
788            network: N,
789            result: Result<(), fnp_socketproxy::NetworkRegistryAddError>,
790        },
791        Update {
792            network: N,
793            result: Result<(), fnp_socketproxy::NetworkRegistryUpdateError>,
794        },
795        Remove {
796            network_id: u32,
797            result: Result<(), fnp_socketproxy::NetworkRegistryRemoveError>,
798        },
799    }
800
801    impl<N: ToNetwork + Clone> From<&Op<N>> for NetworkRegistryRequest {
802        fn from(value: &Op<N>) -> Self {
803            match value {
804                Op::SetDefault { network_id, result: _ } => {
805                    NetworkRegistryRequest::SetDefault { network_id: *network_id }
806                }
807                Op::Add { network, result: _ } => NetworkRegistryRequest::Add {
808                    network: network.clone().to_network(RegistryType::Starnix),
809                },
810                Op::Update { network, result: _ } => NetworkRegistryRequest::Update {
811                    network: network.clone().to_network(RegistryType::Starnix),
812                },
813                Op::Remove { network_id, result: _ } => {
814                    NetworkRegistryRequest::Remove { network_id: *network_id }
815                }
816            }
817        }
818    }
819
820    macro_rules! execute {
821        ($self:ident, $proxy:ident, $registry:expr) => {{
822            match $self {
823                Op::SetDefault { network_id, result } => {
824                    assert_eq!(
825                        $proxy
826                            .set_default(&match network_id {
827                                Some(value) => fposix_socket::OptionalUint32::Value(*value),
828                                None => fposix_socket::OptionalUint32::Unset(fposix_socket::Empty),
829                            })
830                            .await?,
831                        *result
832                    )
833                }
834                Op::Add { network, result } => {
835                    assert_eq!($proxy.add(&network.to_network($registry)).await?, *result)
836                }
837                Op::Update { network, result } => {
838                    assert_eq!($proxy.update(&network.to_network($registry)).await?, *result)
839                }
840                Op::Remove { network_id, result } => {
841                    assert_eq!($proxy.remove(*network_id).await?, *result)
842                }
843            }
844            Ok(())
845        }};
846    }
847
848    impl<N: ToNetwork + Clone> Op<N> {
849        async fn execute_starnix(
850            &self,
851            starnix: &fnp_socketproxy::StarnixNetworksProxy,
852        ) -> Result<(), Error> {
853            execute!(self, starnix, RegistryType::Starnix)
854        }
855
856        async fn execute_fuchsia(
857            &self,
858            fuchsia: &fnp_socketproxy::FuchsiaNetworksProxy,
859        ) -> Result<(), Error> {
860            execute!(self, fuchsia, RegistryType::Fuchsia)
861        }
862
863        fn is_err(&self) -> bool {
864            match &self {
865                Op::SetDefault { network_id: _, result } => result.is_err(),
866                Op::Add { network: _, result } => result.is_err(),
867                Op::Update { network: _, result } => result.is_err(),
868                Op::Remove { network_id: _, result } => result.is_err(),
869            }
870        }
871    }
872
873    enum IncomingService {
874        StarnixNetworks(fnp_socketproxy::StarnixNetworksRequestStream),
875        FuchsiaNetworks(fnp_socketproxy::FuchsiaNetworksRequestStream),
876    }
877
878    async fn run_registry(
879        handles: LocalComponentHandles,
880        starnix_networks: Arc<Mutex<NetworkRegistry>>,
881        fuchsia_networks: Arc<Mutex<NetworkRegistry>>,
882        marks: Arc<Mutex<crate::SocketMarks>>,
883        dns_tx: mpsc::Sender<Vec<fnp_socketproxy::DnsServerList>>,
884        forwarder_tx: mpsc::Sender<NetworkRegistryRequest>,
885    ) -> Result<(), Error> {
886        let mut fs = ServiceFs::new();
887        let _ = fs
888            .dir("svc")
889            .add_fidl_service(IncomingService::StarnixNetworks)
890            .add_fidl_service(IncomingService::FuchsiaNetworks);
891        let _ = fs.serve_connection(handles.outgoing_dir)?;
892
893        let registry = Registry {
894            networks: NetworkRegistries { starnix: starnix_networks, fuchsia: fuchsia_networks },
895            marks,
896            dns_tx,
897            forwarder_tx,
898            starnix_occupant: Default::default(),
899            fuchsia_occupant: Default::default(),
900        };
901
902        fs.for_each_concurrent(0, |service| async {
903            match service {
904                IncomingService::StarnixNetworks(stream) => registry.run_starnix(stream).await,
905                IncomingService::FuchsiaNetworks(stream) => registry.run_fuchsia(stream).await,
906            }
907            .unwrap_or_else(|e| error!("{e:?}"))
908        })
909        .await;
910
911        Ok(())
912    }
913
914    async fn setup_test() -> Result<
915        (
916            RealmInstance,
917            Receiver<Vec<fnp_socketproxy::DnsServerList>>,
918            Receiver<NetworkRegistryRequest>,
919        ),
920        Error,
921    > {
922        let builder = RealmBuilder::new().await?;
923        let starnix_networks = Arc::new(Mutex::new(Default::default()));
924        let fuchsia_networks = Arc::new(Mutex::new(Default::default()));
925        let (dns_tx, dns_rx) = mpsc::channel(1);
926        let (forwarder_tx, forwarder_rx) = mpsc::channel(1);
927        let marks = Arc::new(Mutex::new(crate::SocketMarks::default()));
928        let registry = builder
929            .add_local_child(
930                "registry",
931                {
932                    let starnix_networks = starnix_networks.clone();
933                    let fuchsia_networks = fuchsia_networks.clone();
934                    let marks = marks.clone();
935                    let dns_tx = dns_tx.clone();
936                    move |handles: LocalComponentHandles| {
937                        Box::pin(run_registry(
938                            handles,
939                            starnix_networks.clone(),
940                            fuchsia_networks.clone(),
941                            marks.clone(),
942                            dns_tx.clone(),
943                            forwarder_tx.clone(),
944                        ))
945                    }
946                },
947                ChildOptions::new(),
948            )
949            .await?;
950
951        builder
952            .add_route(
953                Route::new()
954                    .capability(Capability::protocol::<fnp_socketproxy::StarnixNetworksMarker>())
955                    .from(&registry)
956                    .to(Ref::parent()),
957            )
958            .await?;
959
960        builder
961            .add_route(
962                Route::new()
963                    .capability(Capability::protocol::<fnp_socketproxy::FuchsiaNetworksMarker>())
964                    .from(&registry)
965                    .to(Ref::parent()),
966            )
967            .await?;
968
969        let realm = builder.build().await?;
970
971        Ok((realm, dns_rx, forwarder_rx))
972    }
973
974    #[test_case(&[
975        Op::Add { network: 1, result: Ok(()) },
976        Op::Update { network: 1, result: Ok(()) },
977        Op::Remove { network_id: 1, result: Ok(()) },
978    ]; "normal operation")]
979    #[test_case(&[
980        Op::Add { network: 1, result: Ok(()) },
981        Op::Add { network: 1, result: Err(fnp_socketproxy::NetworkRegistryAddError::DuplicateNetworkId) },
982    ]; "duplicate add")]
983    #[test_case(&[
984        Op::Update { network: 1, result: Err(fnp_socketproxy::NetworkRegistryUpdateError::NotFound) },
985    ]; "update missing")]
986    #[test_case(&[
987        Op::<u32>::Remove { network_id: 1, result: Err(fnp_socketproxy::NetworkRegistryRemoveError::NotFound) },
988    ]; "remove missing")]
989    #[test_case(&[
990        Op::<u32>::SetDefault { network_id: Some(1), result: Err(fnp_socketproxy::NetworkRegistrySetDefaultError::NotFound) },
991    ]; "set default missing")]
992    #[test_case(&[
993        Op::Add { network: 1, result: Ok(()) },
994        Op::SetDefault { network_id: Some(1), result: Ok(()) },
995        Op::Remove { network_id: 1, result: Err(fnp_socketproxy::NetworkRegistryRemoveError::CannotRemoveDefaultNetwork)},
996    ]; "remove default network")]
997    #[test_case(&[
998        Op::Add { network: 1, result: Ok(()) },
999        Op::SetDefault { network_id: Some(1), result: Ok(()) },
1000        Op::Remove { network_id: 1, result: Err(fnp_socketproxy::NetworkRegistryRemoveError::CannotRemoveDefaultNetwork)},
1001        Op::Add { network: 2, result: Ok(()) },
1002        Op::SetDefault { network_id: Some(2), result: Ok(()) },
1003        Op::Remove { network_id: 1, result: Ok(()) },
1004    ]; "remove formerly default network")]
1005    #[test_case(&[
1006        Op::Add { network: 1, result: Ok(()) },
1007        Op::SetDefault { network_id: Some(1), result: Ok(()) },
1008        Op::Remove { network_id: 1, result: Err(fnp_socketproxy::NetworkRegistryRemoveError::CannotRemoveDefaultNetwork)},
1009        Op::SetDefault { network_id: None, result: Ok(()) },
1010        Op::Remove { network_id: 1, result: Ok(()) },
1011    ]; "remove last network")]
1012    #[test_case(&[
1013        Op::Add { network: 1, result: Ok(()) },
1014        Op::Update { network: 1, result: Ok(()) },
1015        Op::Add { network: 2, result: Ok(()) },
1016        Op::Add { network: 3, result: Ok(()) },
1017        Op::Add { network: 4, result: Ok(()) },
1018        Op::Update { network: 4, result: Ok(()) },
1019        Op::Update { network: 2, result: Ok(()) },
1020        Op::Update { network: 3, result: Ok(()) },
1021        Op::Add { network: 5, result: Ok(()) },
1022        Op::Update { network: 5, result: Ok(()) },
1023        Op::Add { network: 6, result: Ok(()) },
1024        Op::Add { network: 7, result: Ok(()) },
1025        Op::Add { network: 8, result: Ok(()) },
1026        Op::Update { network: 8, result: Ok(()) },
1027        Op::Update { network: 6, result: Ok(()) },
1028        Op::Add { network: 9, result: Ok(()) },
1029        Op::Update { network: 9, result: Ok(()) },
1030        Op::Update { network: 7, result: Ok(()) },
1031        Op::Add { network: 10, result: Ok(()) },
1032        Op::Update { network: 10, result: Ok(()) },
1033    ]; "many updates")]
1034    #[fuchsia::test]
1035    async fn test_operations<N: ToNetwork + Clone>(operations: &[Op<N>]) -> Result<(), Error> {
1036        let (realm, _, _) = setup_test().await?;
1037        let starnix_networks = realm
1038            .root
1039            .connect_to_protocol_at_exposed_dir()
1040            .context("While connecting to StarnixNetworks")?;
1041        let fuchsia_networks = realm
1042            .root
1043            .connect_to_protocol_at_exposed_dir()
1044            .context("While connecting to FuchsiaNetworks")?;
1045
1046        for op in operations {
1047            // Demonstrate that the same operations can be applied
1048            // independently in both registries.
1049            op.execute_starnix(&starnix_networks).await?;
1050            op.execute_fuchsia(&fuchsia_networks).await?;
1051        }
1052
1053        Ok(())
1054    }
1055
1056    #[test_case(&[
1057        Op::Add { network: (1, vec![fidl_ip!("192.0.2.0")]), result: Ok(()) },
1058    ], vec![(1, vec![fidl_socket_addr!("192.0.2.0:53")]).to_dns_server_list()]
1059    ; "normal operation (v4)")]
1060    #[test_case(&[
1061        Op::Add { network: (1, vec![fidl_ip!("192.0.2.0")]), result: Ok(()) },
1062        Op::Update { network: (1, vec![fidl_ip!("192.0.2.1")]), result: Ok(()) },
1063    ], vec![(1, vec![fidl_socket_addr!("192.0.2.1:53")]).to_dns_server_list()]
1064    ; "update server list (v4)")]
1065    #[test_case(&[
1066        Op::Add { network: (1, vec![fidl_ip!("2001:db8::1")]), result: Ok(()) },
1067    ], vec![(1, vec![fidl_socket_addr!("[2001:db8::1]:53")]).to_dns_server_list()]
1068    ; "normal operation (v6)")]
1069    #[test_case(&[
1070        Op::Add { network: (1, vec![fidl_ip!("2001:db8::1")]), result: Ok(()) },
1071        Op::Update { network: (1, vec![fidl_ip!("2001:db8::2")]), result: Ok(()) },
1072    ], vec![(1, vec![fidl_socket_addr!("[2001:db8::2]:53")]).to_dns_server_list()]
1073    ; "update server list (v6)")]
1074    #[test_case(&[
1075        Op::Add { network: (1, vec![fidl_ip!("192.0.2.0"), fidl_ip!("2001:db8::1")]), result: Ok(()) },
1076    ], vec![(1, vec![fidl_socket_addr!("192.0.2.0:53"), fidl_socket_addr!("[2001:db8::1]:53")]).to_dns_server_list()]
1077    ; "normal operation (mixed)")]
1078    #[test_case(&[
1079        Op::Add { network: (1, vec![fidl_ip!("192.0.2.0"), fidl_ip!("2001:db8::1")]), result: Ok(()) },
1080        Op::Update { network: (1, vec![fidl_ip!("192.0.2.1"), fidl_ip!("2001:db8::2")]), result: Ok(()) },
1081    ], vec![(1, vec![fidl_socket_addr!("192.0.2.1:53"), fidl_socket_addr!("[2001:db8::2]:53")]).to_dns_server_list()]
1082    ; "update server list (mixed)")]
1083    #[test_case(&[
1084        Op::Add { network: (1, vec![fidl_ip!("192.0.2.0"), fidl_ip!("2001:db8::1")]), result: Ok(()) },
1085        Op::Add { network: (2, vec![fidl_ip!("192.0.2.1"), fidl_ip!("2001:db8::2")]), result: Ok(()) },
1086        Op::Add { network: (3, vec![fidl_ip!("192.0.2.2"), fidl_ip!("2001:db8::3")]), result: Ok(()) },
1087    ], vec![
1088        (1, vec![fidl_socket_addr!("192.0.2.0:53"), fidl_socket_addr!("[2001:db8::1]:53")]).to_dns_server_list(),
1089        (2, vec![fidl_socket_addr!("192.0.2.1:53"), fidl_socket_addr!("[2001:db8::2]:53")]).to_dns_server_list(),
1090        (3, vec![fidl_socket_addr!("192.0.2.2:53"), fidl_socket_addr!("[2001:db8::3]:53")]).to_dns_server_list(),
1091    ]; "multiple networks")]
1092    #[fuchsia::test]
1093    async fn test_dns_tracking<N: ToNetwork + Clone>(
1094        operations: &[Op<N>],
1095        dns_servers: Vec<fnp_socketproxy::DnsServerList>,
1096    ) -> Result<(), Error> {
1097        let (realm, mut dns_rx, _) = setup_test().await?;
1098        let starnix_networks = realm
1099            .root
1100            .connect_to_protocol_at_exposed_dir()
1101            .context("While connecting to StarnixNetworks")?;
1102
1103        let mut last_dns = None;
1104        for op in operations {
1105            // Starnix and Fuchsia registries have the same handling logic, so
1106            // use the Starnix registry to confirm this behavior.
1107            op.execute_starnix(&starnix_networks).await?;
1108            last_dns = Some(dns_rx.next().await.expect("dns update expected after each operation"));
1109        }
1110
1111        let mut last_dns = last_dns.expect("there should be at least one dns update");
1112        last_dns.sort_by_key(|a| a.source_network_id);
1113        assert_eq!(last_dns, dns_servers);
1114
1115        Ok(())
1116    }
1117
1118    #[test_case(&[
1119        (RegistryType::Fuchsia, Op::Add { network: (1, vec![fidl_ip!("192.0.2.0")]), result: Ok(()) }),
1120        (RegistryType::Fuchsia, Op::SetDefault { network_id: Some(1), result: Ok(()) }),
1121    ], vec![(1, vec![fidl_socket_addr!("192.0.2.0:53")]).to_dns_server_list()]
1122    ; "normal operation Fuchsia (v4)")]
1123    #[test_case(&[
1124        (RegistryType::Fuchsia, Op::Add { network: (1, vec![fidl_ip!("2001:db8::1")]), result: Ok(()) }),
1125        (RegistryType::Fuchsia, Op::SetDefault { network_id: Some(1), result: Ok(()) }),
1126    ], vec![(1, vec![fidl_socket_addr!("[2001:db8::1]:53")]).to_dns_server_list()]
1127    ; "normal operation Fuchsia (v6)")]
1128    #[test_case(&[
1129        (RegistryType::Starnix, Op::Add { network: (1, vec![fidl_ip!("192.0.2.0")]), result: Ok(()) }),
1130        (RegistryType::Fuchsia, Op::Remove { network_id: 1, result: Err(fnp_socketproxy::NetworkRegistryRemoveError::NotFound) }),
1131    ], vec![(1, vec![fidl_socket_addr!("192.0.2.0:53")]).to_dns_server_list()]
1132    ; "attempt remove in wrong registry")]
1133    #[test_case(&[
1134        (RegistryType::Starnix, Op::Add { network: (1, vec![fidl_ip!("192.0.2.0"), fidl_ip!("2001:db8::1")]), result: Ok(()) }),
1135        (RegistryType::Fuchsia, Op::Add { network: (2, vec![fidl_ip!("192.0.2.1"), fidl_ip!("2001:db8::2")]), result: Ok(()) }),
1136    ], vec![
1137        (1, vec![fidl_socket_addr!("192.0.2.0:53"), fidl_socket_addr!("[2001:db8::1]:53")]).to_dns_server_list(),
1138    ]; "Fuchsia default absent, use Starnix")]
1139    #[test_case(&[
1140        (RegistryType::Starnix, Op::Add { network: (1, vec![fidl_ip!("192.0.2.0"), fidl_ip!("2001:db8::1")]), result: Ok(()) }),
1141        (RegistryType::Fuchsia, Op::Add { network: (2, vec![fidl_ip!("192.0.2.1"), fidl_ip!("2001:db8::2")]), result: Ok(()) }),
1142        (RegistryType::Fuchsia, Op::SetDefault { network_id: Some(2), result: Ok(()) }),
1143        ], vec![
1144        (2, vec![fidl_socket_addr!("192.0.2.1:53"), fidl_socket_addr!("[2001:db8::2]:53")]).to_dns_server_list(),
1145    ]; "Fuchsia default present, use Fuchsia")]
1146    #[test_case(&[
1147        (RegistryType::Starnix, Op::Add { network: (1, vec![fidl_ip!("192.0.2.0"), fidl_ip!("2001:db8::1")]), result: Ok(()) }),
1148        (RegistryType::Fuchsia, Op::Add { network: (2, vec![fidl_ip!("192.0.2.1"), fidl_ip!("2001:db8::2")]), result: Ok(()) }),
1149        (RegistryType::Fuchsia, Op::SetDefault { network_id: Some(2), result: Ok(()) }),
1150        (RegistryType::Fuchsia, Op::SetDefault { network_id: None, result: Ok(()) }),
1151        ], vec![
1152        (1, vec![fidl_socket_addr!("192.0.2.0:53"), fidl_socket_addr!("[2001:db8::1]:53")]).to_dns_server_list(),
1153    ]; "Fallback to Starnix network")]
1154    #[test_case(&[
1155        (RegistryType::Starnix, Op::Add { network: (1, vec![fidl_ip!("192.0.2.0"), fidl_ip!("2001:db8::1")]), result: Ok(()) }),
1156        (RegistryType::Fuchsia, Op::Add { network: (2, vec![fidl_ip!("192.0.2.1"), fidl_ip!("2001:db8::2")]), result: Ok(()) }),
1157        (RegistryType::Fuchsia, Op::SetDefault { network_id: Some(2), result: Ok(()) }),
1158        (RegistryType::Fuchsia, Op::Update { network: (2, vec![fidl_ip!("192.0.2.2"), fidl_ip!("2001:db8::3")]), result: Ok(()) }),
1159        ], vec![
1160        (2, vec![fidl_socket_addr!("192.0.2.2:53"), fidl_socket_addr!("[2001:db8::3]:53")]).to_dns_server_list(),
1161    ]; "Fuchsia default present then updated")]
1162    #[fuchsia::test]
1163    async fn test_dns_tracking_across_registries<N: ToNetwork + Clone>(
1164        operations: &[(RegistryType, Op<N>)],
1165        dns_servers: Vec<fnp_socketproxy::DnsServerList>,
1166    ) -> Result<(), Error> {
1167        let (realm, mut dns_rx, _) = setup_test().await?;
1168        let starnix_networks = realm
1169            .root
1170            .connect_to_protocol_at_exposed_dir()
1171            .context("While connecting to StarnixNetworks")?;
1172        let fuchsia_networks = realm
1173            .root
1174            .connect_to_protocol_at_exposed_dir()
1175            .context("While connecting to FuchsiaNetworks")?;
1176
1177        let mut last_dns = None;
1178        for (registry, op) in operations {
1179            match registry {
1180                RegistryType::Starnix => {
1181                    op.execute_starnix(&starnix_networks).await?;
1182                }
1183                RegistryType::Fuchsia => {
1184                    op.execute_fuchsia(&fuchsia_networks).await?;
1185                }
1186            }
1187            // When the operation results in an error, we don't expect that to
1188            // result in an additional DNS update.
1189            if !op.is_err() {
1190                last_dns =
1191                    Some(dns_rx.next().await.expect("dns update expected after each operation"));
1192            }
1193        }
1194
1195        let mut last_dns = last_dns.expect("there should be at least one dns update");
1196        last_dns.sort_by_key(|a| a.source_network_id);
1197        assert_eq!(last_dns, dns_servers);
1198
1199        Ok(())
1200    }
1201
1202    #[test_case(&[
1203        Op::Add { network: (1, vec![fidl_ip!("192.0.2.0")]), result: Ok(()) },
1204    ]
1205    ; "Add but no default")]
1206    #[test_case(&[
1207        Op::Add { network: (1, vec![fidl_ip!("192.0.2.0")]), result: Ok(()) },
1208        Op::SetDefault { network_id: Some(1), result: Ok(()) },
1209    ]
1210    ; "Add and set default")]
1211    #[test_case(&[
1212        Op::Add { network: (1, vec![fidl_ip!("192.0.2.0")]), result: Ok(()) },
1213        Op::Add { network: (2, vec![fidl_ip!("192.0.2.0")]), result: Ok(()) },
1214        Op::SetDefault { network_id: Some(1), result: Ok(()) },
1215        Op::SetDefault { network_id: Some(2), result: Ok(()) },
1216    ]
1217    ; "Add two and set default")]
1218    #[test_case(&[
1219        Op::Add { network: (1, vec![fidl_ip!("192.0.2.0")]), result: Ok(()) },
1220        Op::SetDefault { network_id: Some(1), result: Ok(()) },
1221        Op::SetDefault { network_id: None, result: Ok(()) },
1222        Op::Remove { network_id: 1, result: Ok(()) },
1223    ]
1224    ; "Add default and delete")]
1225    #[fuchsia::test]
1226    async fn test_forward_network_update<N: ToNetwork + Clone + std::fmt::Debug>(
1227        operations: &[Op<N>],
1228    ) -> Result<(), Error> {
1229        let (realm, _, mut forwarder_rx) = setup_test().await?;
1230        let starnix_networks = realm
1231            .root
1232            .connect_to_protocol_at_exposed_dir()
1233            .context("While connecting to StarnixNetworks")?;
1234
1235        let (_, seen_updates) = future::join(
1236            async move {
1237                for op in operations {
1238                    op.execute_starnix(&starnix_networks).await?;
1239                }
1240                std::mem::drop(realm);
1241                Ok::<(), anyhow::Error>(())
1242            },
1243            async move {
1244                let mut forwarded_requests = Vec::new();
1245                while let Some(req) = forwarder_rx.next().await {
1246                    forwarded_requests.push(req);
1247                }
1248                forwarded_requests
1249            },
1250        )
1251        .await;
1252
1253        let expected_updates =
1254            operations.iter().map(NetworkRegistryRequest::from).collect::<Vec<_>>();
1255        assert_eq!(expected_updates, seen_updates);
1256
1257        Ok(())
1258    }
1259}