netcfg/network/
mod.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::InterfaceId;
6use anyhow::Context as _;
7use async_utils::stream::{Tagged, WithTag as _};
8use dns_server_watcher::DnsServers;
9use fidl::endpoints::{ControlHandle as _, Responder as _};
10use log::{error, info, warn};
11use std::collections::HashMap;
12
13mod token_map;
14
15use {
16    fidl_fuchsia_net as fnet, fidl_fuchsia_net_name as fnet_name,
17    fidl_fuchsia_net_policy_properties as fnp_properties,
18    fidl_fuchsia_net_policy_socketproxy as fnp_socketproxy,
19    fidl_fuchsia_posix_socket as fposix_socket,
20};
21
22pub(crate) struct NetworkTokenContents {
23    connection_id: ConnectionId,
24    interface_id: InterfaceId,
25}
26
27#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
28pub struct ConnectionId(usize);
29
30#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
31pub struct UpdateGeneration {
32    /// The current generation for `fuchsia.net.policy.properties.WatchDefault`.
33    /// Incremented each time the default network changes.
34    default_network: usize,
35
36    /// The current generation for `fuchsia.net.policy.properties.WatchProperties`.
37    /// Incremented each time a network property changes.
38    properties: usize,
39}
40
41#[derive(Clone, Debug, Default)]
42pub struct UpdateGenerations(HashMap<ConnectionId, UpdateGeneration>);
43
44impl UpdateGenerations {
45    fn default_network(&self, id: &ConnectionId) -> Option<usize> {
46        self.0.get(id).map(|g| g.default_network)
47    }
48
49    fn set_default_network(&mut self, id: ConnectionId, generation: UpdateGeneration) {
50        self.0.entry(id).or_default().default_network = generation.default_network;
51    }
52
53    fn properties(&self, id: &ConnectionId) -> Option<usize> {
54        self.0.get(id).map(|g| g.properties)
55    }
56
57    fn set_properties(&mut self, id: ConnectionId, generation: UpdateGeneration) {
58        self.0.entry(id).or_default().properties = generation.properties;
59    }
60
61    fn remove(&mut self, id: &ConnectionId) -> Option<UpdateGeneration> {
62        self.0.remove(id)
63    }
64}
65
66#[derive(Debug)]
67pub(crate) struct NetworkPropertyResponder {
68    token: fidl::EventPair,
69    watched_properties: Vec<fnp_properties::Property>,
70    responder: fnp_properties::NetworksWatchPropertiesResponder,
71}
72
73impl NetworkPropertyResponder {
74    fn respond(
75        self,
76        response: Result<&[fnp_properties::PropertyUpdate], fnp_properties::WatchError>,
77    ) -> Result<(), fidl::Error> {
78        self.responder.send(response)
79    }
80}
81
82#[derive(Default)]
83pub struct NetpolNetworksService {
84    // The current generation
85    current_generation: UpdateGeneration,
86    // The last generation sent per connection
87    generations_by_connection: UpdateGenerations,
88    // Default Network Watchers
89    default_network_responders:
90        HashMap<ConnectionId, fnp_properties::NetworksWatchDefaultResponder>,
91    tokens: token_map::TokenMap<NetworkTokenContents>,
92    // NetworkProperty Watchers
93    property_responders: HashMap<ConnectionId, NetworkPropertyResponder>,
94    // Network properties
95    network_properties: NetworkProperties,
96}
97
98#[derive(Default, Clone)]
99struct NetworkProperties {
100    // Current default network
101    default_network: Option<InterfaceId>,
102    // Network marks
103    socket_marks: HashMap<InterfaceId, fnet::Marks>,
104    // DNS Servers
105    dns_servers: Vec<fnet_name::DnsServer_>,
106}
107
108impl NetworkProperties {
109    fn apply(&mut self, update: PropertyUpdate) -> UpdatesApplied {
110        let mut updates = UpdatesApplied::default();
111
112        if let Some(new_default_network) = update.default_network {
113            updates.default_network = self.handle_default_network_update(new_default_network);
114        }
115
116        if let Some((netid, marks)) = update.socket_marks {
117            updates.socket_marks_network = self.handle_socket_marks_update(netid, marks);
118        }
119
120        if let Some(dns) = update.dns {
121            updates.dns_changed = self.dns_servers != dns;
122            self.dns_servers = dns;
123        }
124
125        updates
126    }
127
128    // Handle the `default_network` argument in a `PropertyUpdate`, determining
129    // whether the network changed as a result of the update.
130    //
131    // Returns the update to set for the `default_network` argument
132    // of UpdatesApplied.
133    fn handle_default_network_update(
134        &mut self,
135        new_default_network: Option<InterfaceId>,
136    ) -> Option<Option<InterfaceId>> {
137        // We do not need to send an update applied if the network stayed the same.
138        if new_default_network == self.default_network {
139            return None;
140        }
141
142        let old_default_network = self.default_network;
143        if let (None, Some(old_default_network_id)) = (new_default_network, old_default_network) {
144            let _: Option<_> = self.socket_marks.remove(&old_default_network_id);
145        }
146        self.default_network = new_default_network;
147        return Some(old_default_network);
148    }
149
150    // Handle the `socket_marks` argument in a `PropertyUpdate`, determining
151    // whether the socket marks changed as a result of the update.
152    //
153    // Returns the update to set for the `socket_marks_network` argument
154    // of UpdatesApplied.
155    fn handle_socket_marks_update(
156        &mut self,
157        netid: InterfaceId,
158        marks: fnet::Marks,
159    ) -> Option<InterfaceId> {
160        // We do not need to send an update applied if the marks for the
161        // provided netid stay the same.
162        if self.socket_marks.contains_key(&netid)
163            && self
164                .socket_marks
165                .get(&netid)
166                .and_then(|old_marks| Some(*old_marks == marks))
167                .unwrap_or_default()
168        {
169            return None;
170        }
171
172        let _: Option<_> = self.socket_marks.insert(netid, marks);
173        return Some(netid);
174    }
175
176    fn maybe_respond(
177        &self,
178        network: &NetworkTokenContents,
179        responder: NetworkPropertyResponder,
180    ) -> Option<NetworkPropertyResponder> {
181        let mut updates = Vec::new();
182        updates.add_socket_marks(self, network, &responder);
183        updates.add_dns(self, network, &responder);
184
185        if updates.is_empty() {
186            Some(responder)
187        } else {
188            if let Err(e) = responder.respond(Ok(&updates)) {
189                warn!("Could not send to responder: {e}");
190            }
191            None
192        }
193    }
194}
195
196trait PropertyUpdates {
197    fn add_socket_marks(
198        &mut self,
199        properties: &NetworkProperties,
200        network: &NetworkTokenContents,
201        responder: &NetworkPropertyResponder,
202    );
203    fn add_dns(
204        &mut self,
205        properties: &NetworkProperties,
206        network: &NetworkTokenContents,
207        responder: &NetworkPropertyResponder,
208    );
209}
210
211impl PropertyUpdates for Vec<fnp_properties::PropertyUpdate> {
212    fn add_socket_marks(
213        &mut self,
214        properties: &NetworkProperties,
215        network: &NetworkTokenContents,
216        responder: &NetworkPropertyResponder,
217    ) {
218        if !responder.watched_properties.contains(&fnp_properties::Property::SocketMarks) {
219            return;
220        }
221        match properties.socket_marks.get(&network.interface_id) {
222            Some(marks) => self.push(fnp_properties::PropertyUpdate::SocketMarks(marks.clone())),
223            None => {}
224        }
225    }
226
227    fn add_dns(
228        &mut self,
229        properties: &NetworkProperties,
230        network: &NetworkTokenContents,
231        responder: &NetworkPropertyResponder,
232    ) {
233        if !responder.watched_properties.contains(&fnp_properties::Property::DnsConfiguration) {
234            return;
235        }
236
237        let interface_id = network.interface_id;
238        self.push(fnp_properties::PropertyUpdate::DnsConfiguration(
239            fnp_properties::DnsConfiguration {
240                servers: Some(
241                    properties
242                        .dns_servers
243                        .iter()
244                        .filter(|d| {
245                            match &d.source {
246                                Some(source) => match source {
247                                    fnet_name::DnsServerSource::StaticSource(_) => true,
248                                    fnet_name::DnsServerSource::SocketProxy(
249                                        fnet_name::SocketProxyDnsServerSource {
250                                            source_interface,
251                                            ..
252                                        },
253                                    )
254                                    | fnet_name::DnsServerSource::Dhcp(
255                                        fnet_name::DhcpDnsServerSource { source_interface, .. },
256                                    )
257                                    | fnet_name::DnsServerSource::Ndp(
258                                        fnet_name::NdpDnsServerSource { source_interface, .. },
259                                    )
260                                    | fnet_name::DnsServerSource::Dhcpv6(
261                                        fnet_name::Dhcpv6DnsServerSource {
262                                            source_interface, ..
263                                        },
264                                    ) => match (interface_id, source_interface) {
265                                        (_, None) => true,
266                                        (id1, Some(id2)) => id1.get() == *id2,
267                                    },
268
269                                    _ => {
270                                        error!("unhandled DnsServerSource: {source:?}");
271                                        false
272                                    }
273                                },
274
275                                // No source, assume static source, so include it.
276                                None => true,
277                            }
278                        })
279                        .cloned()
280                        .collect::<Vec<_>>(),
281                ),
282                ..Default::default()
283            },
284        ));
285    }
286}
287
288#[derive(Default, Debug)]
289pub struct PropertyUpdate {
290    default_network: Option<Option<InterfaceId>>,
291    socket_marks: Option<(InterfaceId, fnet::Marks)>,
292    dns: Option<Vec<fnet_name::DnsServer_>>,
293}
294
295#[derive(Default, Debug)]
296struct UpdatesApplied {
297    // If Some, contains old network id
298    default_network: Option<Option<InterfaceId>>,
299
300    // If Some, contains the network ID for which the mark was set
301    socket_marks_network: Option<InterfaceId>,
302
303    // Was the DNS changed
304    dns_changed: bool,
305}
306
307impl PropertyUpdate {
308    pub fn default_network<N: Into<InterfaceId>>(mut self, network_id: N) -> Self {
309        self.default_network = Some(Some(network_id.into()));
310        self
311    }
312
313    pub fn default_network_lost(mut self) -> Self {
314        self.default_network = Some(None);
315        self
316    }
317
318    pub fn socket_marks<N: Into<InterfaceId>, Marks: Into<fnet::Marks>>(
319        mut self,
320        network_id: N,
321        marks: Marks,
322    ) -> Self {
323        self.socket_marks = Some((network_id.into(), marks.into()));
324        self
325    }
326
327    pub fn dns(mut self, dns_servers: &DnsServers) -> Self {
328        self.dns = Some(dns_servers.consolidated_dns_servers());
329        self
330    }
331
332    pub fn add_network<N: Into<InterfaceId>>(self, _id: N) -> Self {
333        // TODO(https://fxbug.dev/428712735): Implement tracking of added networks.
334        self
335    }
336
337    pub fn remove_network<N: Into<InterfaceId>>(self, _id: N) -> Self {
338        // TODO(https://fxbug.dev/428712735): Implement tracking of added networks.
339        self
340    }
341}
342
343impl NetpolNetworksService {
344    pub async fn handle_network_attributes_request(
345        &mut self,
346        id: ConnectionId,
347        req: Result<fnp_properties::NetworksRequest, fidl::Error>,
348    ) -> Result<(), anyhow::Error> {
349        let req = req.context("network attributes request")?;
350        match req {
351            fnp_properties::NetworksRequest::WatchDefault { responder } => {
352                match self.default_network_responders.entry(id) {
353                    std::collections::hash_map::Entry::Occupied(_) => {
354                        warn!(
355                            "Only one call to fuchsia.net.policy.properties/Networks.WatchDefault \
356                             may be active per connection"
357                        );
358                        responder
359                            .control_handle()
360                            .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
361                    }
362                    std::collections::hash_map::Entry::Vacant(vacant_entry) => {
363                        let interface_id = if self
364                            .generations_by_connection
365                            .default_network(&id)
366                            .unwrap_or_default()
367                            < self.current_generation.default_network
368                        {
369                            match self.network_properties.default_network {
370                                Some(interface_id) => Some(interface_id),
371                                None => None,
372                            }
373                        } else {
374                            None
375                        };
376                        if let Some(interface_id) = interface_id {
377                            self.generations_by_connection
378                                .set_default_network(id, self.current_generation);
379                            let token = self
380                                .tokens
381                                .insert_data(NetworkTokenContents {
382                                    connection_id: id,
383                                    interface_id,
384                                })
385                                .await;
386                            responder.send(
387                                fnp_properties::NetworksWatchDefaultResponse::Network(
388                                    fnp_properties::NetworkToken {
389                                        value: Some(token),
390                                        ..Default::default()
391                                    },
392                                ),
393                            )?;
394
395                            if let Some(responder) = self.property_responders.remove(&id) {
396                                let _: Option<_> = self.generations_by_connection.remove(&id);
397                                let _: Result<(), fidl::Error> = responder.respond(Err(
398                                    fnp_properties::WatchError::DefaultNetworkChanged,
399                                ));
400                            }
401                        } else {
402                            let _: &mut _ = vacant_entry.insert(responder);
403                        }
404                    }
405                }
406            }
407            fnp_properties::NetworksRequest::WatchProperties {
408                payload: fnp_properties::NetworksWatchPropertiesRequest { network, properties, .. },
409                responder,
410            } => match (network, properties) {
411                (None, _) | (_, None) => {
412                    responder.send(Err(fnp_properties::WatchError::MissingRequiredArgument))?
413                }
414                (Some(network), Some(properties)) => {
415                    if properties.is_empty() {
416                        responder.send(Err(fnp_properties::WatchError::NoProperties))?;
417                    } else if let Some(token) = network.value {
418                        match self.property_responders.entry(id) {
419                            std::collections::hash_map::Entry::Occupied(_) => {
420                                warn!(
421                                    "Only one call to \
422                                    fuchsia.net.policy.properties/Networks.WatchProperties may be \
423                                    active per connection"
424                                );
425                                responder
426                                    .control_handle()
427                                    .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
428                            }
429                            std::collections::hash_map::Entry::Vacant(vacant_entry) => {
430                                match self.tokens.get(&token).await {
431                                    None => {
432                                        warn!("Unknown network token. ({token:?}");
433                                        responder.send(Err(
434                                            fnp_properties::WatchError::InvalidNetworkToken,
435                                        ))?;
436                                    }
437                                    Some(network_contents) => {
438                                        if network_contents.connection_id != id {
439                                            warn!(
440                                                "Cannot watch a NetworkToken that was not created \
441                                                by this connection."
442                                            );
443                                            responder.send(Err(
444                                                fnp_properties::WatchError::InvalidNetworkToken,
445                                            ))?;
446                                        } else {
447                                            let responder = NetworkPropertyResponder {
448                                                token,
449                                                watched_properties: properties,
450                                                responder,
451                                            };
452                                            if self
453                                                .generations_by_connection
454                                                .properties(&id)
455                                                .unwrap_or_default()
456                                                < self.current_generation.properties
457                                            {
458                                                self.generations_by_connection
459                                                    .set_properties(id, self.current_generation);
460                                                if let Some(responder) = self
461                                                    .network_properties
462                                                    .maybe_respond(&network_contents, responder)
463                                                {
464                                                    let _: &mut NetworkPropertyResponder =
465                                                        vacant_entry.insert(responder);
466                                                }
467                                            } else {
468                                                let _: &mut NetworkPropertyResponder =
469                                                    vacant_entry.insert(responder);
470                                            }
471                                        }
472                                    }
473                                }
474                            }
475                        }
476                    } else {
477                        responder.send(Err(fnp_properties::WatchError::InvalidNetworkToken))?;
478                    }
479                }
480            },
481            _ => {
482                warn!("Received unexpected request {req:?}");
483            }
484        }
485
486        Ok(())
487    }
488
489    pub async fn handle_delegated_networks_update(
490        &mut self,
491        update: Result<fnp_socketproxy::NetworkRegistryRequest, fidl::Error>,
492    ) -> Result<(), anyhow::Error> {
493        use fnp_socketproxy::{
494            NetworkInfo, NetworkRegistryAddError, NetworkRegistryRemoveError,
495            NetworkRegistryRequest, NetworkRegistrySetDefaultError, NetworkRegistryUpdateError,
496        };
497
498        match update {
499            Err(e) => {
500                error!(
501                    "Encountered error watching for delegated network \
502                                    updates: {e:?}"
503                );
504                Ok(())
505            }
506            Ok(NetworkRegistryRequest::SetDefault { network_id, responder }) => responder.send(
507                (async || match network_id {
508                    // TODO(https://fxbug.dev/475266563): Stop using
509                    // `fuchsia.posix.socket.OptionalUint32` here.
510                    fposix_socket::OptionalUint32::Value(interface_id) => {
511                        self.update(
512                            PropertyUpdate::default().default_network(
513                                InterfaceId::try_from(interface_id)
514                                    .map_err(|_| NetworkRegistrySetDefaultError::NotFound)?,
515                            ),
516                        )
517                        .await;
518                        Ok(())
519                    }
520                    fposix_socket::OptionalUint32::Unset(_) => {
521                        self.update(PropertyUpdate::default().default_network_lost()).await;
522                        Ok(())
523                    }
524                })()
525                .await,
526            ),
527            Ok(NetworkRegistryRequest::Add { network, responder }) => responder.send(
528                (async || {
529                    let network_id: InterfaceId = network
530                        .network_id
531                        .and_then(|id| id.try_into().ok())
532                        .ok_or(NetworkRegistryAddError::MissingNetworkId)?;
533                    let NetworkInfo::Starnix(info) =
534                        network.info.ok_or(NetworkRegistryAddError::MissingNetworkInfo)?
535                    else {
536                        return Err(NetworkRegistryAddError::MissingNetworkInfo);
537                    };
538                    self.update(PropertyUpdate::default().add_network(network_id).socket_marks(
539                        network_id,
540                        // TODO(https://fxbug.dev/431822969): Replace this with a common definition
541                        // of which mark domain is used for which purpose.
542                        fnet::Marks { mark_1: info.mark, mark_2: None, ..Default::default() },
543                    ))
544                    .await;
545                    Ok(())
546                })()
547                .await,
548            ),
549            Ok(NetworkRegistryRequest::Update { network, responder }) => responder.send(
550                (async || {
551                    let network_id: InterfaceId = network
552                        .network_id
553                        .and_then(|id| id.try_into().ok())
554                        .ok_or(NetworkRegistryUpdateError::MissingNetworkId)?;
555                    let NetworkInfo::Starnix(info) =
556                        network.info.ok_or(NetworkRegistryUpdateError::MissingNetworkInfo)?
557                    else {
558                        return Err(NetworkRegistryUpdateError::MissingNetworkInfo);
559                    };
560                    self.update(PropertyUpdate::default().add_network(network_id).socket_marks(
561                        network_id,
562                        // TODO(https://fxbug.dev/431822969): Replace this with a common definition
563                        // of which mark domain is used for which purpose.
564                        fnet::Marks { mark_1: info.mark, mark_2: None, ..Default::default() },
565                    ))
566                    .await;
567                    Ok(())
568                })()
569                .await,
570            ),
571            Ok(NetworkRegistryRequest::Remove { network_id, responder }) => responder.send(
572                (async || {
573                    self.update(
574                        PropertyUpdate::default().remove_network(
575                            // Try to convert network_id to an `InterfaceId`. If
576                            // this fails (i.e. the network_id is 0) this is
577                            // treated the same as a `NOT_FOUND` error.
578                            InterfaceId::try_from(network_id)
579                                .map_err(|_| NetworkRegistryRemoveError::NotFound)?,
580                        ),
581                    )
582                    .await;
583                    Ok(())
584                })()
585                .await,
586            ),
587        }
588        .context("while handling DelegatedNetwork request")
589    }
590
591    async fn changed_default_network(
592        error: fnp_properties::WatchError,
593        responders: &mut HashMap<ConnectionId, NetworkPropertyResponder>,
594        generations: &mut UpdateGenerations,
595    ) {
596        let mut r = HashMap::new();
597        std::mem::swap(&mut r, responders);
598        r = r
599            .into_iter()
600            .filter_map(|(id, responder)| {
601                // NB: Currently all responders are for the default network.
602                let _: Option<_> = generations.remove(&id);
603                let _: Result<(), fidl::Error> = responder.respond(Err(error));
604                None
605            })
606            .collect::<HashMap<_, _>>();
607        std::mem::swap(&mut r, responders);
608    }
609
610    pub(crate) async fn remove_network<ID: Into<InterfaceId>>(&mut self, interface_id: ID) {
611        let interface_id = interface_id.into();
612        info!("Removing interface {interface_id}. Reporting NETWORK_GONE to all clients.");
613        let mut responders = HashMap::new();
614        std::mem::swap(&mut self.property_responders, &mut responders);
615        for (id, responder) in responders {
616            let network = match self.tokens.get(&responder.token).await {
617                Some(network) => network,
618                None => {
619                    warn!("Could not fetch network data for responder");
620                    continue;
621                }
622            };
623            if network.interface_id == interface_id {
624                // Report that this interface was removed
625                if let Err(e) = responder.respond(Err(fnp_properties::WatchError::NetworkGone)) {
626                    warn!("Could not send to responder: {e}");
627                }
628            } else {
629                if self.property_responders.insert(id, responder).is_some() {
630                    error!("Re-inserted in an existing responder slot. This should be impossible.");
631                }
632            }
633        }
634    }
635
636    pub async fn update(&mut self, update: PropertyUpdate) {
637        self.current_generation.properties += 1;
638        let updates_applied = self.network_properties.apply(update);
639        let mut property_responders = HashMap::new();
640        std::mem::swap(&mut self.property_responders, &mut property_responders);
641
642        if updates_applied.default_network.is_some() {
643            if let Some(default_network) = self.network_properties.default_network {
644                self.current_generation.default_network += 1;
645                let mut responders = HashMap::new();
646                std::mem::swap(&mut self.default_network_responders, &mut responders);
647                for (id, responder) in responders {
648                    self.generations_by_connection.set_default_network(id, self.current_generation);
649
650                    let token = self
651                        .tokens
652                        .insert_data(NetworkTokenContents {
653                            connection_id: id,
654                            interface_id: default_network,
655                        })
656                        .await;
657
658                    if let Err(e) =
659                        responder.send(fnp_properties::NetworksWatchDefaultResponse::Network(
660                            fnp_properties::NetworkToken {
661                                value: Some(token),
662                                ..Default::default()
663                            },
664                        ))
665                    {
666                        warn!("Could not send to responder: {e}");
667                    }
668                }
669
670                NetpolNetworksService::changed_default_network(
671                    fnp_properties::WatchError::DefaultNetworkChanged,
672                    &mut property_responders,
673                    &mut self.generations_by_connection,
674                )
675                .await;
676            } else {
677                // The default network has been lost.
678                self.current_generation.default_network += 1;
679                let mut responders = HashMap::new();
680                std::mem::swap(&mut self.default_network_responders, &mut responders);
681                for (id, responder) in responders {
682                    self.generations_by_connection.set_default_network(id, self.current_generation);
683                    if let Err(e) = responder.send(
684                        fnp_properties::NetworksWatchDefaultResponse::NoDefaultNetwork(
685                            fnp_properties::Empty,
686                        ),
687                    ) {
688                        warn!("Could not send to responder: {e}");
689                    }
690                }
691
692                NetpolNetworksService::changed_default_network(
693                    fnp_properties::WatchError::DefaultNetworkLost,
694                    &mut property_responders,
695                    &mut self.generations_by_connection,
696                )
697                .await;
698            }
699        }
700
701        for (id, responder) in property_responders {
702            let mut updates = Vec::new();
703            let network = match self.tokens.get(&responder.token).await {
704                Some(network) => network,
705                None => {
706                    warn!("Could not fetch network data for responder");
707                    continue;
708                }
709            };
710
711            if let Some(network_id) = updates_applied.socket_marks_network {
712                if network.interface_id == network_id {
713                    updates.add_socket_marks(&self.network_properties, &network, &responder);
714                }
715            }
716            if updates_applied.dns_changed {
717                updates.add_dns(&self.network_properties, &network, &responder);
718            }
719
720            self.generations_by_connection.set_properties(id, self.current_generation);
721            if updates.is_empty() {
722                if self.property_responders.insert(id, responder).is_some() {
723                    warn!("Re-inserted in an existing responder slot. This should be impossible.");
724                }
725            } else {
726                if let Err(e) = responder.respond(Ok(&updates)) {
727                    warn!("Could not send to responder: {e}");
728                }
729            }
730        }
731    }
732}
733
734#[derive(Default)]
735pub struct NetworksRequestStreams {
736    next_id: ConnectionId,
737    request_streams:
738        futures::stream::SelectAll<Tagged<ConnectionId, fnp_properties::NetworksRequestStream>>,
739}
740
741impl NetworksRequestStreams {
742    pub fn push(&mut self, stream: fnp_properties::NetworksRequestStream) {
743        self.request_streams.push(stream.tagged(self.next_id));
744        self.next_id.0 += 1;
745    }
746}
747
748impl futures::Stream for NetworksRequestStreams {
749    type Item = (ConnectionId, Result<fnp_properties::NetworksRequest, fidl::Error>);
750
751    fn poll_next(
752        mut self: std::pin::Pin<&mut Self>,
753        cx: &mut std::task::Context<'_>,
754    ) -> std::task::Poll<Option<Self::Item>> {
755        std::pin::Pin::new(&mut self.request_streams).poll_next(cx)
756    }
757}
758
759impl futures::stream::FusedStream for NetworksRequestStreams {
760    fn is_terminated(&self) -> bool {
761        self.request_streams.is_terminated()
762    }
763}