Skip to main content

netcfg/network/
mod.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::InterfaceId;
6use anyhow::Context as _;
7use async_utils::stream::{Tagged, WithTag as _};
8use dns_server_watcher::DnsServers;
9use fidl::endpoints::{ControlHandle as _, Responder as _};
10use log::{error, info, warn};
11use policy_properties::NetworkTokenExt as _;
12use std::collections::HashMap;
13use std::collections::hash_map::Entry;
14
15mod token_registry;
16
17use {
18    fidl_fuchsia_net as fnet, fidl_fuchsia_net_name as fnet_name,
19    fidl_fuchsia_net_policy_properties as fnp_properties,
20    fidl_fuchsia_net_policy_socketproxy as fnp_socketproxy,
21    fidl_fuchsia_posix_socket as fposix_socket,
22};
23
24// The id for each network, separated by network source.
25//
26// NB: These are separated in the case that the same underlying
27// interface id is used by Fuchsia and a delegated actor.
28#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
29pub enum NetworkId {
30    Fuchsia(InterfaceId),
31    Delegated(InterfaceId),
32}
33
34impl std::fmt::Display for NetworkId {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            NetworkId::Fuchsia(interface_id) => write!(f, "fuchsia:{interface_id}"),
38            NetworkId::Delegated(interface_id) => write!(f, "delegated:{interface_id}"),
39        }
40    }
41}
42
43impl NetworkId {
44    pub fn get(&self) -> InterfaceId {
45        match self {
46            NetworkId::Fuchsia(interface_id) => *interface_id,
47            NetworkId::Delegated(interface_id) => *interface_id,
48        }
49    }
50
51    pub fn fuchsia<I: Into<InterfaceId>>(id: I) -> Self {
52        NetworkId::Fuchsia(id.into())
53    }
54
55    pub fn delegated<I: Into<InterfaceId>>(id: I) -> Self {
56        NetworkId::Delegated(id.into())
57    }
58}
59
60#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
61pub(crate) struct NetworkTokenContents {
62    network_id: NetworkId,
63    is_default: bool,
64}
65
66#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
67pub struct ConnectionId(usize);
68
69#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
70pub struct UpdateGeneration {
71    /// The current generation for `fuchsia.net.policy.properties.WatchDefault`.
72    /// Incremented each time the default network changes.
73    default_network: usize,
74
75    /// The current generation for `fuchsia.net.policy.properties.WatchProperties`.
76    /// Incremented each time a network property changes.
77    properties: usize,
78}
79
80#[derive(Clone, Debug, Default)]
81pub struct UpdateGenerations(HashMap<ConnectionId, UpdateGeneration>);
82
83impl UpdateGenerations {
84    fn default_network(&self, id: &ConnectionId) -> Option<usize> {
85        self.0.get(id).map(|g| g.default_network)
86    }
87
88    fn set_default_network(&mut self, id: ConnectionId, generation: UpdateGeneration) {
89        self.0.entry(id).or_default().default_network = generation.default_network;
90    }
91
92    fn properties(&self, id: &ConnectionId) -> Option<usize> {
93        self.0.get(id).map(|g| g.properties)
94    }
95
96    fn set_properties(&mut self, id: ConnectionId, generation: UpdateGeneration) {
97        self.0.entry(id).or_default().properties = generation.properties;
98    }
99
100    fn remove(&mut self, id: &ConnectionId) -> Option<UpdateGeneration> {
101        self.0.remove(id)
102    }
103}
104
105trait SetMark {
106    fn set_mark(&mut self, domain: fnet::MarkDomain, value: Option<u32>);
107}
108
109impl SetMark for fnet::Marks {
110    fn set_mark(&mut self, domain: fnet::MarkDomain, value: Option<u32>) {
111        match domain {
112            fnet::MarkDomain::Mark1 => self.mark_1 = value,
113            fnet::MarkDomain::Mark2 => self.mark_2 = value,
114        }
115    }
116}
117
118#[derive(Debug)]
119pub(crate) struct NetworkPropertyResponder {
120    token: fnp_properties::NetworkToken,
121    watched_properties: Vec<fnp_properties::Property>,
122    responder: fnp_properties::NetworksWatchPropertiesResponder,
123}
124
125impl NetworkPropertyResponder {
126    fn respond(
127        self,
128        response: Result<&[fnp_properties::PropertyUpdate], fnp_properties::WatchError>,
129    ) -> Result<(), fidl::Error> {
130        self.responder.send(response)
131    }
132}
133
134#[derive(Default, Clone)]
135struct NetworkProperties {
136    socket_marks: Option<fnet::Marks>,
137}
138
139impl NetworkProperties {
140    fn get_marks(&self) -> Option<&fnet::Marks> {
141        self.socket_marks.as_ref()
142    }
143}
144
145/// The current state of all networks sent to the NetworkRegistry.
146#[derive(Default, Clone)]
147struct RegisteredNetworks {
148    default_network: Option<NetworkId>,
149    networks: HashMap<NetworkId, NetworkProperties>,
150    dns_servers: Vec<fnet_name::DnsServer_>,
151}
152
153impl RegisteredNetworks {
154    fn apply(&mut self, update: PropertyUpdate) -> UpdateApplied {
155        match update {
156            PropertyUpdate::LoseDefaultNetwork => self.handle_default_network_update(None),
157            PropertyUpdate::ChangeNetwork(network_id, network_change) => match network_change {
158                NetworkUpdate::Properties(event) => self.handle_changed_network(network_id, event),
159                NetworkUpdate::Remove => UpdateApplied::NetworkRemoved(network_id),
160                NetworkUpdate::MakeDefault => self.handle_default_network_update(Some(network_id)),
161            },
162            PropertyUpdate::UpdateDns(dns_servers) => {
163                if self.dns_servers != dns_servers {
164                    self.dns_servers = dns_servers;
165                    UpdateApplied::DnsChanged
166                } else {
167                    UpdateApplied::None
168                }
169            }
170        }
171    }
172
173    // Handle the `default_network` argument in a `PropertyUpdate`, determining
174    // whether the network changed as a result of the update.
175    //
176    // Returns an `UpdateApplied::DefaultNetworkChanged` if the new default
177    // network is different from the old one.
178    fn handle_default_network_update(
179        &mut self,
180        new_default_network: Option<NetworkId>,
181    ) -> UpdateApplied {
182        // We do not need to send an update applied if the network stayed the same.
183        if new_default_network == self.default_network {
184            return UpdateApplied::None;
185        }
186
187        let old_default_network = self.default_network;
188        self.default_network = new_default_network;
189        return UpdateApplied::DefaultNetworkChanged(old_default_network);
190    }
191
192    // Handle the `NetworkPropertiesChange` in a `PropertyUpdate`, determining
193    // whether network properties changed as a result of the update.
194    //
195    // Returns an `UpdateApplied::NetworkChanged` event if this is a valid change.
196    fn handle_changed_network(
197        &mut self,
198        network_id: NetworkId,
199        event: NetworkPropertiesChange,
200    ) -> UpdateApplied {
201        let NetworkPropertiesChange { added, marks: socket_marks } = event;
202        let entry = self.networks.entry(network_id);
203        let result = match (added, &entry, network_id, socket_marks) {
204            (true, Entry::Occupied(_), _, _) => Err("add already added network"),
205            (false, Entry::Vacant(_), _, _) => Err("update a non-added network"),
206            (_, _, NetworkId::Fuchsia(_), Some(_)) => Err("have a fuchsia network with marks"),
207            (_, _, NetworkId::Delegated(_), None) => Err("have a delegated network without marks"),
208
209            (_, _, NetworkId::Fuchsia(_), None) => Ok((NetworkProperties::default(), added)),
210            (_, entry, NetworkId::Delegated(_), Some(socket_marks)) => {
211                let changed = if let Entry::Occupied(e) = entry {
212                    e.get().get_marks() != Some(&socket_marks)
213                } else {
214                    true
215                };
216                Ok((NetworkProperties { socket_marks: Some(socket_marks) }, changed))
217            }
218        };
219
220        match result {
221            Ok((properties, changed_marks)) => {
222                let _ = entry.insert_entry(properties);
223                UpdateApplied::NetworkChanged { network_id, added, changed_marks }
224            }
225            Err(e) => {
226                error!("Cannot {e}. Update ignored.");
227                UpdateApplied::None
228            }
229        }
230    }
231
232    fn maybe_respond(
233        &self,
234        network: &NetworkTokenContents,
235        responder: NetworkPropertyResponder,
236    ) -> Option<NetworkPropertyResponder> {
237        let mut updates = Vec::new();
238        updates.add_socket_marks(self, network, &responder);
239        updates.add_dns(self, network, &responder);
240
241        if updates.is_empty() {
242            Some(responder)
243        } else {
244            if let Err(e) = responder.respond(Ok(&updates)) {
245                warn!("Could not send to responder: {e}");
246            }
247            None
248        }
249    }
250}
251
252trait PropertyUpdates {
253    fn add_socket_marks(
254        &mut self,
255        network_registry: &RegisteredNetworks,
256        network: &NetworkTokenContents,
257        responder: &NetworkPropertyResponder,
258    );
259    fn add_dns(
260        &mut self,
261        network_registry: &RegisteredNetworks,
262        network: &NetworkTokenContents,
263        responder: &NetworkPropertyResponder,
264    );
265}
266
267impl PropertyUpdates for Vec<fnp_properties::PropertyUpdate> {
268    fn add_socket_marks(
269        &mut self,
270        network_registry: &RegisteredNetworks,
271        network: &NetworkTokenContents,
272        responder: &NetworkPropertyResponder,
273    ) {
274        if !responder.watched_properties.contains(&fnp_properties::Property::SocketMarks) {
275            return;
276        }
277
278        match network_registry.networks.get(&network.network_id) {
279            Some(network) => {
280                if let Some(socket_marks) = network.get_marks() {
281                    self.push(fnp_properties::PropertyUpdate::SocketMarks(socket_marks.clone()));
282                }
283                return;
284            }
285            None => {
286                error!(
287                    "State is inconsistent. We attempted to add marks for a \
288            network that is not known: {:?}",
289                    network.network_id
290                );
291            }
292        }
293    }
294
295    fn add_dns(
296        &mut self,
297        network_registry: &RegisteredNetworks,
298        network: &NetworkTokenContents,
299        responder: &NetworkPropertyResponder,
300    ) {
301        if !responder.watched_properties.contains(&fnp_properties::Property::DnsConfiguration) {
302            return;
303        }
304
305        let interface_id = network.network_id;
306        self.push(fnp_properties::PropertyUpdate::DnsConfiguration(
307            fnp_properties::DnsConfiguration {
308                servers: Some(
309                    network_registry
310                        .dns_servers
311                        .iter()
312                        .filter(|d| {
313                            match &d.source {
314                                Some(source) => match source {
315                                    fnet_name::DnsServerSource::StaticSource(_) => true,
316                                    fnet_name::DnsServerSource::SocketProxy(
317                                        fnet_name::SocketProxyDnsServerSource {
318                                            source_interface,
319                                            ..
320                                        },
321                                    ) => match (interface_id, source_interface) {
322                                        (_, None) => true,
323                                        (id1, Some(id2)) => {
324                                            Ok(id1)
325                                                == InterfaceId::try_from(*id2)
326                                                    .map(|id| NetworkId::delegated(id))
327                                        }
328                                    },
329                                    fnet_name::DnsServerSource::Dhcp(
330                                        fnet_name::DhcpDnsServerSource { source_interface, .. },
331                                    )
332                                    | fnet_name::DnsServerSource::Ndp(
333                                        fnet_name::NdpDnsServerSource { source_interface, .. },
334                                    )
335                                    | fnet_name::DnsServerSource::Dhcpv6(
336                                        fnet_name::Dhcpv6DnsServerSource {
337                                            source_interface, ..
338                                        },
339                                    ) => match (interface_id, source_interface) {
340                                        (_, None) => true,
341                                        (id1, Some(id2)) => {
342                                            Ok(id1)
343                                                == InterfaceId::try_from(*id2)
344                                                    .map(|id| NetworkId::fuchsia(id))
345                                        }
346                                    },
347
348                                    _ => {
349                                        error!("unhandled DnsServerSource: {source:?}");
350                                        false
351                                    }
352                                },
353
354                                // No source, assume static source, so include it.
355                                None => true,
356                            }
357                        })
358                        .cloned()
359                        .collect::<Vec<_>>(),
360                ),
361                ..Default::default()
362            },
363        ));
364    }
365}
366
367/// An event representing the properties that changed for a network.
368#[derive(Debug)]
369pub struct NetworkPropertiesChange {
370    /// When true, this is a new network being added. Otherwise, this is an
371    /// update to an existing network.
372    pub added: bool,
373    /// The new marks for the network.
374    pub marks: Option<fnet::Marks>,
375}
376
377#[derive(Debug)]
378pub enum NetworkUpdate {
379    /// Change a network's properties.
380    Properties(NetworkPropertiesChange),
381    Remove,
382    MakeDefault,
383}
384
385#[derive(Debug)]
386enum UpdateApplied {
387    /// No update was performed.
388    None,
389
390    /// A default network has changed. Carries the previous default id, if any.
391    DefaultNetworkChanged(Option<NetworkId>),
392
393    /// Whether the DNS servers changed.
394    DnsChanged,
395
396    /// Network was added or updated, contains the NetworkId of the added network.
397    NetworkChanged { network_id: NetworkId, added: bool, changed_marks: bool },
398
399    /// Network was removed, contains the NetworkId of the removed network.
400    NetworkRemoved(NetworkId),
401}
402
403#[derive(Debug)]
404pub enum PropertyUpdate {
405    LoseDefaultNetwork,
406    ChangeNetwork(NetworkId, NetworkUpdate),
407    UpdateDns(Vec<fnet_name::DnsServer_>),
408}
409
410impl PropertyUpdate {
411    pub fn default_network_lost() -> Self {
412        PropertyUpdate::LoseDefaultNetwork
413    }
414
415    pub fn dns(dns_servers: &DnsServers) -> Self {
416        // TODO(https://fxbug.dev/477980011): Switch to deriving dns servers from
417        // NetworkRegistry updates.
418        PropertyUpdate::UpdateDns(dns_servers.consolidated_dns_servers())
419    }
420}
421
422#[derive(Default)]
423pub struct NetpolNetworksService {
424    // The current generation
425    current_generation: UpdateGeneration,
426    // The last generation sent per connection
427    generations_by_connection: UpdateGenerations,
428    // Default Network Watchers
429    default_network_responders:
430        HashMap<ConnectionId, fnp_properties::NetworksWatchDefaultResponder>,
431    tokens: token_registry::TokenRegistry<NetworkTokenContents>,
432    // NetworkProperty Watchers
433    property_responders: HashMap<ConnectionId, NetworkPropertyResponder>,
434    // The networks known to the system
435    network_registry: RegisteredNetworks,
436}
437
438impl NetpolNetworksService {
439    pub async fn handle_network_attributes_request(
440        &mut self,
441        id: ConnectionId,
442        req: Result<fnp_properties::NetworksRequest, fidl::Error>,
443    ) -> Result<(), anyhow::Error> {
444        let req = req.context("network attributes request")?;
445        match req {
446            fnp_properties::NetworksRequest::WatchDefault { responder } => {
447                match self.default_network_responders.entry(id) {
448                    std::collections::hash_map::Entry::Occupied(_) => {
449                        warn!(
450                            "Only one call to fuchsia.net.policy.properties/Networks.WatchDefault \
451                             may be active per connection"
452                        );
453                        responder
454                            .control_handle()
455                            .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
456                    }
457                    std::collections::hash_map::Entry::Vacant(vacant_entry) => {
458                        let network_id = if self
459                            .generations_by_connection
460                            .default_network(&id)
461                            .unwrap_or_default()
462                            < self.current_generation.default_network
463                        {
464                            self.network_registry.default_network
465                        } else {
466                            None
467                        };
468                        if let Some(network_id) = network_id {
469                            self.generations_by_connection
470                                .set_default_network(id, self.current_generation);
471                            let token = self
472                                .tokens
473                                .ensure_token(NetworkTokenContents { network_id, is_default: true })
474                                .get()
475                                .duplicate()
476                                .context("could not duplicate token")?;
477                            responder.send(
478                                fnp_properties::NetworksWatchDefaultResponse::Network(token),
479                            )?;
480
481                            if let Some(responder) = self.property_responders.remove(&id) {
482                                let _: Option<_> = self.generations_by_connection.remove(&id);
483                                let _: Result<(), fidl::Error> =
484                                    responder.respond(Err(fnp_properties::WatchError::NetworkGone));
485                            }
486                        } else {
487                            let _: &mut _ = vacant_entry.insert(responder);
488                        }
489                    }
490                }
491            }
492            fnp_properties::NetworksRequest::WatchProperties {
493                payload: fnp_properties::NetworksWatchPropertiesRequest { network, properties, .. },
494                responder,
495            } => match (network, properties) {
496                (None, _) | (_, None) => {
497                    responder.send(Err(fnp_properties::WatchError::MissingRequiredArgument))?
498                }
499                (Some(network), Some(properties)) => {
500                    if properties.is_empty() {
501                        responder.send(Err(fnp_properties::WatchError::NoProperties))?;
502                    } else {
503                        match self.property_responders.entry(id) {
504                            std::collections::hash_map::Entry::Occupied(_) => {
505                                warn!(
506                                    "Only one call to \
507                                    fuchsia.net.policy.properties/Networks.WatchProperties may be \
508                                    active per connection"
509                                );
510                                responder
511                                    .control_handle()
512                                    .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
513                            }
514                            std::collections::hash_map::Entry::Vacant(vacant_entry) => {
515                                match self.tokens.get_contents(&network) {
516                                    Err(e) => {
517                                        warn!("Unknown network token. ({network:?}: {e})");
518                                        responder.send(Err(
519                                            fnp_properties::WatchError::InvalidNetworkToken,
520                                        ))?;
521                                    }
522                                    Ok(network_contents) => {
523                                        let responder = NetworkPropertyResponder {
524                                            token: network,
525                                            watched_properties: properties,
526                                            responder,
527                                        };
528                                        if self
529                                            .generations_by_connection
530                                            .properties(&id)
531                                            .unwrap_or_default()
532                                            < self.current_generation.properties
533                                        {
534                                            self.generations_by_connection
535                                                .set_properties(id, self.current_generation);
536                                            if let Some(responder) = self
537                                                .network_registry
538                                                .maybe_respond(&network_contents, responder)
539                                            {
540                                                let _: &mut NetworkPropertyResponder =
541                                                    vacant_entry.insert(responder);
542                                            }
543                                        } else {
544                                            let _: &mut NetworkPropertyResponder =
545                                                vacant_entry.insert(responder);
546                                        }
547                                    }
548                                }
549                            }
550                        }
551                    }
552                }
553            },
554            _ => {
555                warn!("Received unexpected request {req:?}");
556            }
557        }
558
559        Ok(())
560    }
561
562    pub async fn handle_delegated_networks_update(
563        &mut self,
564        update: Result<fnp_socketproxy::NetworkRegistryRequest, fidl::Error>,
565    ) -> Result<(), anyhow::Error> {
566        use fnp_socketproxy::{
567            NetworkInfo, NetworkRegistryAddError, NetworkRegistryRemoveError,
568            NetworkRegistryRequest, NetworkRegistrySetDefaultError, NetworkRegistryUpdateError,
569        };
570
571        match update {
572            Err(e) => {
573                error!(
574                    "Encountered error watching for delegated network \
575                                    updates: {e:?}"
576                );
577                Ok(())
578            }
579            Ok(NetworkRegistryRequest::SetDefault { network_id, responder }) => responder.send(
580                (async || match network_id {
581                    // TODO(https://fxbug.dev/475266563): Stop using
582                    // `fuchsia.posix.socket.OptionalUint32` here.
583                    fposix_socket::OptionalUint32::Value(interface_id) => {
584                        self.update(PropertyUpdate::ChangeNetwork(
585                            NetworkId::delegated(
586                                InterfaceId::try_from(interface_id)
587                                    .map_err(|_| NetworkRegistrySetDefaultError::NotFound)?,
588                            ),
589                            NetworkUpdate::MakeDefault,
590                        ))
591                        .await;
592                        Ok(())
593                    }
594                    fposix_socket::OptionalUint32::Unset(_) => {
595                        self.update(PropertyUpdate::default_network_lost()).await;
596                        Ok(())
597                    }
598                })()
599                .await,
600            ),
601            Ok(NetworkRegistryRequest::Add { network, responder }) => responder.send(
602                (async || {
603                    let network_id = network
604                        .network_id
605                        .and_then(|id| InterfaceId::try_from(id).ok())
606                        .map(|id| NetworkId::delegated(id))
607                        .ok_or(NetworkRegistryAddError::MissingNetworkId)?;
608                    let NetworkInfo::Starnix(info) =
609                        network.info.ok_or(NetworkRegistryAddError::MissingNetworkInfo)?
610                    else {
611                        return Err(NetworkRegistryAddError::MissingNetworkInfo);
612                    };
613
614                    let mut marks = fnet::Marks::default();
615                    marks.set_mark(fnet::MARK_DOMAIN_SO_MARK, info.mark);
616
617                    // TODO(https://fxbug.dev/477980011): Also include DNS update here,
618                    // rather than relying on DnsServerWatcher provided by socket-proxy.
619                    self.update(PropertyUpdate::ChangeNetwork(
620                        network_id,
621                        NetworkUpdate::Properties(NetworkPropertiesChange {
622                            added: true,
623                            marks: Some(marks),
624                        }),
625                    ))
626                    .await;
627                    Ok(())
628                })()
629                .await,
630            ),
631            Ok(NetworkRegistryRequest::Update { network, responder }) => responder.send(
632                (async || {
633                    let network_id = network
634                        .network_id
635                        .and_then(|id| InterfaceId::try_from(id).ok())
636                        .map(|id| NetworkId::delegated(id))
637                        .ok_or(NetworkRegistryUpdateError::MissingNetworkId)?;
638                    let NetworkInfo::Starnix(info) =
639                        network.info.ok_or(NetworkRegistryUpdateError::MissingNetworkInfo)?
640                    else {
641                        return Err(NetworkRegistryUpdateError::MissingNetworkInfo);
642                    };
643
644                    let mut marks = fnet::Marks::default();
645                    marks.set_mark(fnet::MARK_DOMAIN_SO_MARK, info.mark);
646                    self.update(PropertyUpdate::ChangeNetwork(
647                        network_id,
648                        NetworkUpdate::Properties(NetworkPropertiesChange {
649                            added: false,
650                            marks: Some(marks),
651                        }),
652                    ))
653                    .await;
654                    Ok(())
655                })()
656                .await,
657            ),
658            Ok(NetworkRegistryRequest::Remove { network_id, responder }) => responder.send(
659                (async || {
660                    self.update(PropertyUpdate::ChangeNetwork(
661                        NetworkId::delegated(
662                            // Try to convert network_id to an `InterfaceId`. If
663                            // this fails (i.e. the network_id is 0) this is
664                            // treated the same as a `NOT_FOUND` error.
665                            InterfaceId::try_from(network_id)
666                                .map_err(|_| NetworkRegistryRemoveError::NotFound)?,
667                        ),
668                        NetworkUpdate::Remove,
669                    ))
670                    .await;
671                    Ok(())
672                })()
673                .await,
674            ),
675        }
676        .context("while handling DelegatedNetwork request")
677    }
678
679    pub(crate) async fn handle_network_token_resolver_request(
680        &mut self,
681        request: Result<fnp_properties::NetworkTokenResolverRequest, fidl::Error>,
682    ) -> Result<(), anyhow::Error> {
683        use fnp_properties::NetworkTokenResolverResolveTokenError as ResolveTokenError;
684
685        let request = request.context("while handling NetworkTokenResolver request")?;
686        match request {
687            fnp_properties::NetworkTokenResolverRequest::ResolveToken { token, responder } => {
688                let maybe_contents = self.tokens.get_contents(&token).copied();
689                match maybe_contents {
690                    Err(e) => {
691                        warn!("Unknown network token. ({token:?}: {e})");
692                        responder.send(Err(ResolveTokenError::InvalidNetworkToken))?;
693                    }
694                    Ok(contents) => {
695                        if contents.is_default {
696                            // This is a default network token, we need to grab
697                            // the non-default variant.
698                            let query = NetworkTokenContents { is_default: false, ..contents };
699                            if let Some(tok) = self.tokens.get_token(&query) {
700                                responder.send(tok.duplicate().map_err(|e| {
701                                    warn!("Encountered issue duplicating generated token. {e}");
702                                    ResolveTokenError::InvalidNetworkToken
703                                }))?;
704                            } else {
705                                warn!("Requested canonical version of unregistered network.");
706                                responder.send(Err(ResolveTokenError::InvalidNetworkToken))?;
707                            }
708                        } else {
709                            responder.send(Ok(token))?;
710                        }
711                    }
712                }
713            }
714            fidl_fuchsia_net_policy_properties::NetworkTokenResolverRequest::_UnknownMethod {
715                ordinal,
716                control_handle,
717                method_type,
718                ..
719            } => warn!(
720                "Encountered unknown method call on NetworkTokenResolver: {ordinal} \
721                {control_handle:?} {method_type:?}"
722            ),
723        }
724
725        Ok(())
726    }
727
728    async fn changed_default_network(
729        &mut self,
730        previous_default_network: Option<NetworkId>,
731        responders: &mut HashMap<ConnectionId, NetworkPropertyResponder>,
732    ) {
733        let mut r = HashMap::new();
734        std::mem::swap(&mut r, responders);
735        r = r
736            .into_iter()
737            .filter_map(|(id, responder)| {
738                match self.tokens.get_contents(&responder.token) {
739                    Ok(contents) => {
740                        // We only want to remove when watching a default token.
741                        if contents.is_default {
742                            let _: Option<_> = self.generations_by_connection.remove(&id);
743                            let _: Result<(), fidl::Error> =
744                                responder.respond(Err(fnp_properties::WatchError::NetworkGone));
745                            return None;
746                        }
747                    }
748                    Err(zx::Status::NOT_FOUND) => {
749                        warn!("Token provided to get_contents is not valid.");
750                    }
751                    Err(e) => {
752                        warn!("Encountered unknown issue while getting contents: {e}");
753                    }
754                }
755                Some((id, responder))
756            })
757            .collect::<HashMap<_, _>>();
758        std::mem::swap(&mut r, responders);
759        self.tokens.drop_if(|&c| {
760            c.is_default && previous_default_network.is_some_and(|i| i == c.network_id)
761        });
762    }
763
764    pub(crate) async fn remove_network(&mut self, network_id: NetworkId) {
765        info!("Removing interface {network_id}. Reporting NETWORK_GONE to all clients.");
766        let mut responders = HashMap::new();
767        std::mem::swap(&mut self.property_responders, &mut responders);
768        for (id, responder) in responders {
769            let network = match self.tokens.get_contents(&responder.token) {
770                Ok(network) => network,
771                Err(e) => {
772                    warn!("Could not fetch network data for responder: {e}");
773                    continue;
774                }
775            };
776            if network.network_id == network_id {
777                // Report that this interface was removed
778                if let Err(e) = responder.respond(Err(fnp_properties::WatchError::NetworkGone)) {
779                    warn!("Could not send to responder: {e}");
780                }
781            } else {
782                if self.property_responders.insert(id, responder).is_some() {
783                    error!("Re-inserted in an existing responder slot. This should be impossible.");
784                }
785            }
786        }
787    }
788
789    pub async fn update(&mut self, update: PropertyUpdate) {
790        self.current_generation.properties += 1;
791        let update_applied = self.network_registry.apply(update);
792        if let UpdateApplied::None = update_applied {
793            // Return early if the update resulted in no changes.
794            return;
795        }
796
797        let mut property_responders = HashMap::new();
798        std::mem::swap(&mut self.property_responders, &mut property_responders);
799
800        match update_applied {
801            UpdateApplied::DefaultNetworkChanged(previous_default) => {
802                self.changed_default_network(previous_default, &mut property_responders).await;
803                match self.network_registry.default_network {
804                    Some(default_network) => {
805                        self.current_generation.default_network += 1;
806                        let mut responders = HashMap::new();
807                        std::mem::swap(&mut self.default_network_responders, &mut responders);
808                        for (id, responder) in responders {
809                            self.generations_by_connection
810                                .set_default_network(id, self.current_generation);
811                            match self
812                                .tokens
813                                .ensure_token(NetworkTokenContents {
814                                    network_id: default_network,
815                                    is_default: true,
816                                })
817                                .get()
818                                .duplicate()
819                            {
820                                Ok(token) => {
821                                    if let Err(e) = responder.send(
822                                        fnp_properties::NetworksWatchDefaultResponse::Network(
823                                            token,
824                                        ),
825                                    ) {
826                                        warn!("Could not send to responder: {e}");
827                                    }
828                                }
829                                Err(e) => warn!("Could not duplicate token: {e}"),
830                            };
831                        }
832                    }
833                    None => {
834                        // The default network has been lost.
835                        self.current_generation.default_network += 1;
836                        let mut responders = HashMap::new();
837                        std::mem::swap(&mut self.default_network_responders, &mut responders);
838                        for (id, responder) in responders {
839                            self.generations_by_connection
840                                .set_default_network(id, self.current_generation);
841                            if let Err(e) = responder.send(
842                                fnp_properties::NetworksWatchDefaultResponse::NoDefaultNetwork(
843                                    fnp_properties::Empty,
844                                ),
845                            ) {
846                                warn!("Could not send to responder: {e}");
847                            }
848                        }
849                    }
850                }
851
852                // All property updaters have been notified
853                return;
854            }
855            UpdateApplied::NetworkChanged { network_id, added: true, .. } => {
856                let _ = self
857                    .tokens
858                    .ensure_token(NetworkTokenContents { network_id, is_default: false });
859            }
860            UpdateApplied::NetworkRemoved(network_id) => {
861                self.tokens.drop_if(|c| !c.is_default && c.network_id == network_id);
862            }
863            UpdateApplied::NetworkChanged { added: false, .. } => {
864                // The network already exists so the token must also exist.
865                // No action is needed.
866            }
867            // TODO(https://fxbug.dev/477980011): Switch to deriving dns servers from
868            // NetworkRegistry updates.
869            UpdateApplied::DnsChanged => {}
870            UpdateApplied::None => {}
871        }
872
873        for (id, responder) in property_responders {
874            let mut updates = Vec::new();
875            let network = match self.tokens.get_contents(&responder.token) {
876                Ok(network) => network,
877                Err(e) => {
878                    warn!("Could not fetch network data for responder: {e}");
879                    continue;
880                }
881            };
882
883            if let UpdateApplied::NetworkChanged { network_id, changed_marks: true, .. } =
884                update_applied
885            {
886                if network.network_id == network_id {
887                    updates.add_socket_marks(&self.network_registry, &network, &responder);
888                }
889            }
890            if let UpdateApplied::DnsChanged = update_applied {
891                updates.add_dns(&self.network_registry, &network, &responder);
892            }
893
894            self.generations_by_connection.set_properties(id, self.current_generation);
895            if updates.is_empty() {
896                if self.property_responders.insert(id, responder).is_some() {
897                    warn!("Re-inserted in an existing responder slot. This should be impossible.");
898                }
899            } else {
900                if let Err(e) = responder.respond(Ok(&updates)) {
901                    warn!("Could not send to responder: {e}");
902                }
903            }
904        }
905    }
906}
907
908pub struct ConnectionTagged<Stream: futures::Stream + Unpin> {
909    next_id: ConnectionId,
910    streams: futures::stream::SelectAll<Tagged<ConnectionId, Stream>>,
911}
912
913impl<Stream: futures::Stream + Unpin> Default for ConnectionTagged<Stream> {
914    fn default() -> Self {
915        Self { next_id: Default::default(), streams: Default::default() }
916    }
917}
918
919impl<Stream: futures::Stream + Unpin> ConnectionTagged<Stream> {
920    pub fn push(&mut self, stream: Stream) {
921        self.streams.push(stream.tagged(self.next_id));
922        self.next_id.0 += 1;
923    }
924}
925
926impl<Stream: futures::Stream + Unpin> futures::Stream for ConnectionTagged<Stream> {
927    type Item = (ConnectionId, <Stream as futures::Stream>::Item);
928
929    fn poll_next(
930        mut self: std::pin::Pin<&mut Self>,
931        cx: &mut std::task::Context<'_>,
932    ) -> std::task::Poll<Option<Self::Item>> {
933        std::pin::Pin::new(&mut self.streams).poll_next(cx)
934    }
935}
936
937impl<Stream: futures::Stream + Unpin> futures::stream::FusedStream for ConnectionTagged<Stream> {
938    fn is_terminated(&self) -> bool {
939        self.streams.is_terminated()
940    }
941}