1use crate::InterfaceId;
6use anyhow::Context as _;
7use async_utils::stream::{Tagged, WithTag as _};
8use dns_server_watcher::DnsServers;
9use fidl::endpoints::{ControlHandle as _, Responder as _};
10use log::{error, info, warn};
11use policy_properties::NetworkTokenExt as _;
12use std::collections::HashMap;
13use std::collections::hash_map::Entry;
14
15mod token_registry;
16
17use {
18 fidl_fuchsia_net as fnet, fidl_fuchsia_net_name as fnet_name,
19 fidl_fuchsia_net_policy_properties as fnp_properties,
20 fidl_fuchsia_net_policy_socketproxy as fnp_socketproxy,
21 fidl_fuchsia_posix_socket as fposix_socket,
22};
23
24#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
29pub enum NetworkId {
30 Fuchsia(InterfaceId),
31 Delegated(InterfaceId),
32}
33
34impl std::fmt::Display for NetworkId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 NetworkId::Fuchsia(interface_id) => write!(f, "fuchsia:{interface_id}"),
38 NetworkId::Delegated(interface_id) => write!(f, "delegated:{interface_id}"),
39 }
40 }
41}
42
43impl NetworkId {
44 pub fn get(&self) -> InterfaceId {
45 match self {
46 NetworkId::Fuchsia(interface_id) => *interface_id,
47 NetworkId::Delegated(interface_id) => *interface_id,
48 }
49 }
50
51 pub fn fuchsia<I: Into<InterfaceId>>(id: I) -> Self {
52 NetworkId::Fuchsia(id.into())
53 }
54
55 pub fn delegated<I: Into<InterfaceId>>(id: I) -> Self {
56 NetworkId::Delegated(id.into())
57 }
58}
59
60#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
61pub(crate) struct NetworkTokenContents {
62 network_id: NetworkId,
63 is_default: bool,
64}
65
/// Identifies one client connection; assigned sequentially by
/// `ConnectionTagged::push`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct ConnectionId(usize);
68
/// A pair of monotonically increasing counters used to decide whether a
/// connection has already been told about the current state.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct UpdateGeneration {
    /// Bumped by `NetpolNetworksService::update` whenever the default network
    /// changes.
    default_network: usize,

    /// Bumped on every call to `NetpolNetworksService::update`.
    properties: usize,
}
79
80#[derive(Clone, Debug, Default)]
81pub struct UpdateGenerations(HashMap<ConnectionId, UpdateGeneration>);
82
83impl UpdateGenerations {
84 fn default_network(&self, id: &ConnectionId) -> Option<usize> {
85 self.0.get(id).map(|g| g.default_network)
86 }
87
88 fn set_default_network(&mut self, id: ConnectionId, generation: UpdateGeneration) {
89 self.0.entry(id).or_default().default_network = generation.default_network;
90 }
91
92 fn properties(&self, id: &ConnectionId) -> Option<usize> {
93 self.0.get(id).map(|g| g.properties)
94 }
95
96 fn set_properties(&mut self, id: ConnectionId, generation: UpdateGeneration) {
97 self.0.entry(id).or_default().properties = generation.properties;
98 }
99
100 fn remove(&mut self, id: &ConnectionId) -> Option<UpdateGeneration> {
101 self.0.remove(id)
102 }
103}
104
105trait SetMark {
106 fn set_mark(&mut self, domain: fnet::MarkDomain, value: Option<u32>);
107}
108
109impl SetMark for fnet::Marks {
110 fn set_mark(&mut self, domain: fnet::MarkDomain, value: Option<u32>) {
111 match domain {
112 fnet::MarkDomain::Mark1 => self.mark_1 = value,
113 fnet::MarkDomain::Mark2 => self.mark_2 = value,
114 }
115 }
116}
117
118#[derive(Debug)]
119pub(crate) struct NetworkPropertyResponder {
120 token: fnp_properties::NetworkToken,
121 watched_properties: Vec<fnp_properties::Property>,
122 responder: fnp_properties::NetworksWatchPropertiesResponder,
123}
124
125impl NetworkPropertyResponder {
126 fn respond(
127 self,
128 response: Result<&[fnp_properties::PropertyUpdate], fnp_properties::WatchError>,
129 ) -> Result<(), fidl::Error> {
130 self.responder.send(response)
131 }
132}
133
134#[derive(Default, Clone)]
135struct NetworkProperties {
136 socket_marks: Option<fnet::Marks>,
137}
138
139impl NetworkProperties {
140 fn get_marks(&self) -> Option<&fnet::Marks> {
141 self.socket_marks.as_ref()
142 }
143}
144
145#[derive(Default, Clone)]
147struct RegisteredNetworks {
148 default_network: Option<NetworkId>,
149 networks: HashMap<NetworkId, NetworkProperties>,
150 dns_servers: Vec<fnet_name::DnsServer_>,
151}
152
153impl RegisteredNetworks {
154 fn apply(&mut self, update: PropertyUpdate) -> UpdateApplied {
155 match update {
156 PropertyUpdate::LoseDefaultNetwork => self.handle_default_network_update(None),
157 PropertyUpdate::ChangeNetwork(network_id, network_change) => match network_change {
158 NetworkUpdate::Properties(event) => self.handle_changed_network(network_id, event),
159 NetworkUpdate::Remove => UpdateApplied::NetworkRemoved(network_id),
160 NetworkUpdate::MakeDefault => self.handle_default_network_update(Some(network_id)),
161 },
162 PropertyUpdate::UpdateDns(dns_servers) => {
163 if self.dns_servers != dns_servers {
164 self.dns_servers = dns_servers;
165 UpdateApplied::DnsChanged
166 } else {
167 UpdateApplied::None
168 }
169 }
170 }
171 }
172
173 fn handle_default_network_update(
179 &mut self,
180 new_default_network: Option<NetworkId>,
181 ) -> UpdateApplied {
182 if new_default_network == self.default_network {
184 return UpdateApplied::None;
185 }
186
187 let old_default_network = self.default_network;
188 self.default_network = new_default_network;
189 return UpdateApplied::DefaultNetworkChanged(old_default_network);
190 }
191
192 fn handle_changed_network(
197 &mut self,
198 network_id: NetworkId,
199 event: NetworkPropertiesChange,
200 ) -> UpdateApplied {
201 let NetworkPropertiesChange { added, marks: socket_marks } = event;
202 let entry = self.networks.entry(network_id);
203 let result = match (added, &entry, network_id, socket_marks) {
204 (true, Entry::Occupied(_), _, _) => Err("add already added network"),
205 (false, Entry::Vacant(_), _, _) => Err("update a non-added network"),
206 (_, _, NetworkId::Fuchsia(_), Some(_)) => Err("have a fuchsia network with marks"),
207 (_, _, NetworkId::Delegated(_), None) => Err("have a delegated network without marks"),
208
209 (_, _, NetworkId::Fuchsia(_), None) => Ok((NetworkProperties::default(), added)),
210 (_, entry, NetworkId::Delegated(_), Some(socket_marks)) => {
211 let changed = if let Entry::Occupied(e) = entry {
212 e.get().get_marks() != Some(&socket_marks)
213 } else {
214 true
215 };
216 Ok((NetworkProperties { socket_marks: Some(socket_marks) }, changed))
217 }
218 };
219
220 match result {
221 Ok((properties, changed_marks)) => {
222 let _ = entry.insert_entry(properties);
223 UpdateApplied::NetworkChanged { network_id, added, changed_marks }
224 }
225 Err(e) => {
226 error!("Cannot {e}. Update ignored.");
227 UpdateApplied::None
228 }
229 }
230 }
231
232 fn maybe_respond(
233 &self,
234 network: &NetworkTokenContents,
235 responder: NetworkPropertyResponder,
236 ) -> Option<NetworkPropertyResponder> {
237 let mut updates = Vec::new();
238 updates.add_socket_marks(self, network, &responder);
239 updates.add_dns(self, network, &responder);
240
241 if updates.is_empty() {
242 Some(responder)
243 } else {
244 if let Err(e) = responder.respond(Ok(&updates)) {
245 warn!("Could not send to responder: {e}");
246 }
247 None
248 }
249 }
250}
251
252trait PropertyUpdates {
253 fn add_socket_marks(
254 &mut self,
255 network_registry: &RegisteredNetworks,
256 network: &NetworkTokenContents,
257 responder: &NetworkPropertyResponder,
258 );
259 fn add_dns(
260 &mut self,
261 network_registry: &RegisteredNetworks,
262 network: &NetworkTokenContents,
263 responder: &NetworkPropertyResponder,
264 );
265}
266
267impl PropertyUpdates for Vec<fnp_properties::PropertyUpdate> {
268 fn add_socket_marks(
269 &mut self,
270 network_registry: &RegisteredNetworks,
271 network: &NetworkTokenContents,
272 responder: &NetworkPropertyResponder,
273 ) {
274 if !responder.watched_properties.contains(&fnp_properties::Property::SocketMarks) {
275 return;
276 }
277
278 match network_registry.networks.get(&network.network_id) {
279 Some(network) => {
280 if let Some(socket_marks) = network.get_marks() {
281 self.push(fnp_properties::PropertyUpdate::SocketMarks(socket_marks.clone()));
282 }
283 return;
284 }
285 None => {
286 error!(
287 "State is inconsistent. We attempted to add marks for a \
288 network that is not known: {:?}",
289 network.network_id
290 );
291 }
292 }
293 }
294
295 fn add_dns(
296 &mut self,
297 network_registry: &RegisteredNetworks,
298 network: &NetworkTokenContents,
299 responder: &NetworkPropertyResponder,
300 ) {
301 if !responder.watched_properties.contains(&fnp_properties::Property::DnsConfiguration) {
302 return;
303 }
304
305 let interface_id = network.network_id;
306 self.push(fnp_properties::PropertyUpdate::DnsConfiguration(
307 fnp_properties::DnsConfiguration {
308 servers: Some(
309 network_registry
310 .dns_servers
311 .iter()
312 .filter(|d| {
313 match &d.source {
314 Some(source) => match source {
315 fnet_name::DnsServerSource::StaticSource(_) => true,
316 fnet_name::DnsServerSource::SocketProxy(
317 fnet_name::SocketProxyDnsServerSource {
318 source_interface,
319 ..
320 },
321 ) => match (interface_id, source_interface) {
322 (_, None) => true,
323 (id1, Some(id2)) => {
324 Ok(id1)
325 == InterfaceId::try_from(*id2)
326 .map(|id| NetworkId::delegated(id))
327 }
328 },
329 fnet_name::DnsServerSource::Dhcp(
330 fnet_name::DhcpDnsServerSource { source_interface, .. },
331 )
332 | fnet_name::DnsServerSource::Ndp(
333 fnet_name::NdpDnsServerSource { source_interface, .. },
334 )
335 | fnet_name::DnsServerSource::Dhcpv6(
336 fnet_name::Dhcpv6DnsServerSource {
337 source_interface, ..
338 },
339 ) => match (interface_id, source_interface) {
340 (_, None) => true,
341 (id1, Some(id2)) => {
342 Ok(id1)
343 == InterfaceId::try_from(*id2)
344 .map(|id| NetworkId::fuchsia(id))
345 }
346 },
347
348 _ => {
349 error!("unhandled DnsServerSource: {source:?}");
350 false
351 }
352 },
353
354 None => true,
356 }
357 })
358 .cloned()
359 .collect::<Vec<_>>(),
360 ),
361 ..Default::default()
362 },
363 ));
364 }
365}
366
367#[derive(Debug)]
369pub struct NetworkPropertiesChange {
370 pub added: bool,
373 pub marks: Option<fnet::Marks>,
375}
376
377#[derive(Debug)]
378pub enum NetworkUpdate {
379 Properties(NetworkPropertiesChange),
381 Remove,
382 MakeDefault,
383}
384
385#[derive(Debug)]
386enum UpdateApplied {
387 None,
389
390 DefaultNetworkChanged(Option<NetworkId>),
392
393 DnsChanged,
395
396 NetworkChanged { network_id: NetworkId, added: bool, changed_marks: bool },
398
399 NetworkRemoved(NetworkId),
401}
402
403#[derive(Debug)]
404pub enum PropertyUpdate {
405 LoseDefaultNetwork,
406 ChangeNetwork(NetworkId, NetworkUpdate),
407 UpdateDns(Vec<fnet_name::DnsServer_>),
408}
409
410impl PropertyUpdate {
411 pub fn default_network_lost() -> Self {
412 PropertyUpdate::LoseDefaultNetwork
413 }
414
415 pub fn dns(dns_servers: &DnsServers) -> Self {
416 PropertyUpdate::UpdateDns(dns_servers.consolidated_dns_servers())
419 }
420}
421
422#[derive(Default)]
423pub struct NetpolNetworksService {
424 current_generation: UpdateGeneration,
426 generations_by_connection: UpdateGenerations,
428 default_network_responders:
430 HashMap<ConnectionId, fnp_properties::NetworksWatchDefaultResponder>,
431 tokens: token_registry::TokenRegistry<NetworkTokenContents>,
432 property_responders: HashMap<ConnectionId, NetworkPropertyResponder>,
434 network_registry: RegisteredNetworks,
436}
437
438impl NetpolNetworksService {
439 pub async fn handle_network_attributes_request(
440 &mut self,
441 id: ConnectionId,
442 req: Result<fnp_properties::NetworksRequest, fidl::Error>,
443 ) -> Result<(), anyhow::Error> {
444 let req = req.context("network attributes request")?;
445 match req {
446 fnp_properties::NetworksRequest::WatchDefault { responder } => {
447 match self.default_network_responders.entry(id) {
448 std::collections::hash_map::Entry::Occupied(_) => {
449 warn!(
450 "Only one call to fuchsia.net.policy.properties/Networks.WatchDefault \
451 may be active per connection"
452 );
453 responder
454 .control_handle()
455 .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
456 }
457 std::collections::hash_map::Entry::Vacant(vacant_entry) => {
458 let network_id = if self
459 .generations_by_connection
460 .default_network(&id)
461 .unwrap_or_default()
462 < self.current_generation.default_network
463 {
464 self.network_registry.default_network
465 } else {
466 None
467 };
468 if let Some(network_id) = network_id {
469 self.generations_by_connection
470 .set_default_network(id, self.current_generation);
471 let token = self
472 .tokens
473 .ensure_token(NetworkTokenContents { network_id, is_default: true })
474 .get()
475 .duplicate()
476 .context("could not duplicate token")?;
477 responder.send(
478 fnp_properties::NetworksWatchDefaultResponse::Network(token),
479 )?;
480
481 if let Some(responder) = self.property_responders.remove(&id) {
482 let _: Option<_> = self.generations_by_connection.remove(&id);
483 let _: Result<(), fidl::Error> =
484 responder.respond(Err(fnp_properties::WatchError::NetworkGone));
485 }
486 } else {
487 let _: &mut _ = vacant_entry.insert(responder);
488 }
489 }
490 }
491 }
492 fnp_properties::NetworksRequest::WatchProperties {
493 payload: fnp_properties::NetworksWatchPropertiesRequest { network, properties, .. },
494 responder,
495 } => match (network, properties) {
496 (None, _) | (_, None) => {
497 responder.send(Err(fnp_properties::WatchError::MissingRequiredArgument))?
498 }
499 (Some(network), Some(properties)) => {
500 if properties.is_empty() {
501 responder.send(Err(fnp_properties::WatchError::NoProperties))?;
502 } else {
503 match self.property_responders.entry(id) {
504 std::collections::hash_map::Entry::Occupied(_) => {
505 warn!(
506 "Only one call to \
507 fuchsia.net.policy.properties/Networks.WatchProperties may be \
508 active per connection"
509 );
510 responder
511 .control_handle()
512 .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
513 }
514 std::collections::hash_map::Entry::Vacant(vacant_entry) => {
515 match self.tokens.get_contents(&network) {
516 Err(e) => {
517 warn!("Unknown network token. ({network:?}: {e})");
518 responder.send(Err(
519 fnp_properties::WatchError::InvalidNetworkToken,
520 ))?;
521 }
522 Ok(network_contents) => {
523 let responder = NetworkPropertyResponder {
524 token: network,
525 watched_properties: properties,
526 responder,
527 };
528 if self
529 .generations_by_connection
530 .properties(&id)
531 .unwrap_or_default()
532 < self.current_generation.properties
533 {
534 self.generations_by_connection
535 .set_properties(id, self.current_generation);
536 if let Some(responder) = self
537 .network_registry
538 .maybe_respond(&network_contents, responder)
539 {
540 let _: &mut NetworkPropertyResponder =
541 vacant_entry.insert(responder);
542 }
543 } else {
544 let _: &mut NetworkPropertyResponder =
545 vacant_entry.insert(responder);
546 }
547 }
548 }
549 }
550 }
551 }
552 }
553 },
554 _ => {
555 warn!("Received unexpected request {req:?}");
556 }
557 }
558
559 Ok(())
560 }
561
562 pub async fn handle_delegated_networks_update(
563 &mut self,
564 update: Result<fnp_socketproxy::NetworkRegistryRequest, fidl::Error>,
565 ) -> Result<(), anyhow::Error> {
566 use fnp_socketproxy::{
567 NetworkInfo, NetworkRegistryAddError, NetworkRegistryRemoveError,
568 NetworkRegistryRequest, NetworkRegistrySetDefaultError, NetworkRegistryUpdateError,
569 };
570
571 match update {
572 Err(e) => {
573 error!(
574 "Encountered error watching for delegated network \
575 updates: {e:?}"
576 );
577 Ok(())
578 }
579 Ok(NetworkRegistryRequest::SetDefault { network_id, responder }) => responder.send(
580 (async || match network_id {
581 fposix_socket::OptionalUint32::Value(interface_id) => {
584 self.update(PropertyUpdate::ChangeNetwork(
585 NetworkId::delegated(
586 InterfaceId::try_from(interface_id)
587 .map_err(|_| NetworkRegistrySetDefaultError::NotFound)?,
588 ),
589 NetworkUpdate::MakeDefault,
590 ))
591 .await;
592 Ok(())
593 }
594 fposix_socket::OptionalUint32::Unset(_) => {
595 self.update(PropertyUpdate::default_network_lost()).await;
596 Ok(())
597 }
598 })()
599 .await,
600 ),
601 Ok(NetworkRegistryRequest::Add { network, responder }) => responder.send(
602 (async || {
603 let network_id = network
604 .network_id
605 .and_then(|id| InterfaceId::try_from(id).ok())
606 .map(|id| NetworkId::delegated(id))
607 .ok_or(NetworkRegistryAddError::MissingNetworkId)?;
608 let NetworkInfo::Starnix(info) =
609 network.info.ok_or(NetworkRegistryAddError::MissingNetworkInfo)?
610 else {
611 return Err(NetworkRegistryAddError::MissingNetworkInfo);
612 };
613
614 let mut marks = fnet::Marks::default();
615 marks.set_mark(fnet::MARK_DOMAIN_SO_MARK, info.mark);
616
617 self.update(PropertyUpdate::ChangeNetwork(
620 network_id,
621 NetworkUpdate::Properties(NetworkPropertiesChange {
622 added: true,
623 marks: Some(marks),
624 }),
625 ))
626 .await;
627 Ok(())
628 })()
629 .await,
630 ),
631 Ok(NetworkRegistryRequest::Update { network, responder }) => responder.send(
632 (async || {
633 let network_id = network
634 .network_id
635 .and_then(|id| InterfaceId::try_from(id).ok())
636 .map(|id| NetworkId::delegated(id))
637 .ok_or(NetworkRegistryUpdateError::MissingNetworkId)?;
638 let NetworkInfo::Starnix(info) =
639 network.info.ok_or(NetworkRegistryUpdateError::MissingNetworkInfo)?
640 else {
641 return Err(NetworkRegistryUpdateError::MissingNetworkInfo);
642 };
643
644 let mut marks = fnet::Marks::default();
645 marks.set_mark(fnet::MARK_DOMAIN_SO_MARK, info.mark);
646 self.update(PropertyUpdate::ChangeNetwork(
647 network_id,
648 NetworkUpdate::Properties(NetworkPropertiesChange {
649 added: false,
650 marks: Some(marks),
651 }),
652 ))
653 .await;
654 Ok(())
655 })()
656 .await,
657 ),
658 Ok(NetworkRegistryRequest::Remove { network_id, responder }) => responder.send(
659 (async || {
660 self.update(PropertyUpdate::ChangeNetwork(
661 NetworkId::delegated(
662 InterfaceId::try_from(network_id)
666 .map_err(|_| NetworkRegistryRemoveError::NotFound)?,
667 ),
668 NetworkUpdate::Remove,
669 ))
670 .await;
671 Ok(())
672 })()
673 .await,
674 ),
675 }
676 .context("while handling DelegatedNetwork request")
677 }
678
679 pub(crate) async fn handle_network_token_resolver_request(
680 &mut self,
681 request: Result<fnp_properties::NetworkTokenResolverRequest, fidl::Error>,
682 ) -> Result<(), anyhow::Error> {
683 use fnp_properties::NetworkTokenResolverResolveTokenError as ResolveTokenError;
684
685 let request = request.context("while handling NetworkTokenResolver request")?;
686 match request {
687 fnp_properties::NetworkTokenResolverRequest::ResolveToken { token, responder } => {
688 let maybe_contents = self.tokens.get_contents(&token).copied();
689 match maybe_contents {
690 Err(e) => {
691 warn!("Unknown network token. ({token:?}: {e})");
692 responder.send(Err(ResolveTokenError::InvalidNetworkToken))?;
693 }
694 Ok(contents) => {
695 if contents.is_default {
696 let query = NetworkTokenContents { is_default: false, ..contents };
699 if let Some(tok) = self.tokens.get_token(&query) {
700 responder.send(tok.duplicate().map_err(|e| {
701 warn!("Encountered issue duplicating generated token. {e}");
702 ResolveTokenError::InvalidNetworkToken
703 }))?;
704 } else {
705 warn!("Requested canonical version of unregistered network.");
706 responder.send(Err(ResolveTokenError::InvalidNetworkToken))?;
707 }
708 } else {
709 responder.send(Ok(token))?;
710 }
711 }
712 }
713 }
714 fidl_fuchsia_net_policy_properties::NetworkTokenResolverRequest::_UnknownMethod {
715 ordinal,
716 control_handle,
717 method_type,
718 ..
719 } => warn!(
720 "Encountered unknown method call on NetworkTokenResolver: {ordinal} \
721 {control_handle:?} {method_type:?}"
722 ),
723 }
724
725 Ok(())
726 }
727
728 async fn changed_default_network(
729 &mut self,
730 previous_default_network: Option<NetworkId>,
731 responders: &mut HashMap<ConnectionId, NetworkPropertyResponder>,
732 ) {
733 let mut r = HashMap::new();
734 std::mem::swap(&mut r, responders);
735 r = r
736 .into_iter()
737 .filter_map(|(id, responder)| {
738 match self.tokens.get_contents(&responder.token) {
739 Ok(contents) => {
740 if contents.is_default {
742 let _: Option<_> = self.generations_by_connection.remove(&id);
743 let _: Result<(), fidl::Error> =
744 responder.respond(Err(fnp_properties::WatchError::NetworkGone));
745 return None;
746 }
747 }
748 Err(zx::Status::NOT_FOUND) => {
749 warn!("Token provided to get_contents is not valid.");
750 }
751 Err(e) => {
752 warn!("Encountered unknown issue while getting contents: {e}");
753 }
754 }
755 Some((id, responder))
756 })
757 .collect::<HashMap<_, _>>();
758 std::mem::swap(&mut r, responders);
759 self.tokens.drop_if(|&c| {
760 c.is_default && previous_default_network.is_some_and(|i| i == c.network_id)
761 });
762 }
763
764 pub(crate) async fn remove_network(&mut self, network_id: NetworkId) {
765 info!("Removing interface {network_id}. Reporting NETWORK_GONE to all clients.");
766 let mut responders = HashMap::new();
767 std::mem::swap(&mut self.property_responders, &mut responders);
768 for (id, responder) in responders {
769 let network = match self.tokens.get_contents(&responder.token) {
770 Ok(network) => network,
771 Err(e) => {
772 warn!("Could not fetch network data for responder: {e}");
773 continue;
774 }
775 };
776 if network.network_id == network_id {
777 if let Err(e) = responder.respond(Err(fnp_properties::WatchError::NetworkGone)) {
779 warn!("Could not send to responder: {e}");
780 }
781 } else {
782 if self.property_responders.insert(id, responder).is_some() {
783 error!("Re-inserted in an existing responder slot. This should be impossible.");
784 }
785 }
786 }
787 }
788
789 pub async fn update(&mut self, update: PropertyUpdate) {
790 self.current_generation.properties += 1;
791 let update_applied = self.network_registry.apply(update);
792 if let UpdateApplied::None = update_applied {
793 return;
795 }
796
797 let mut property_responders = HashMap::new();
798 std::mem::swap(&mut self.property_responders, &mut property_responders);
799
800 match update_applied {
801 UpdateApplied::DefaultNetworkChanged(previous_default) => {
802 self.changed_default_network(previous_default, &mut property_responders).await;
803 match self.network_registry.default_network {
804 Some(default_network) => {
805 self.current_generation.default_network += 1;
806 let mut responders = HashMap::new();
807 std::mem::swap(&mut self.default_network_responders, &mut responders);
808 for (id, responder) in responders {
809 self.generations_by_connection
810 .set_default_network(id, self.current_generation);
811 match self
812 .tokens
813 .ensure_token(NetworkTokenContents {
814 network_id: default_network,
815 is_default: true,
816 })
817 .get()
818 .duplicate()
819 {
820 Ok(token) => {
821 if let Err(e) = responder.send(
822 fnp_properties::NetworksWatchDefaultResponse::Network(
823 token,
824 ),
825 ) {
826 warn!("Could not send to responder: {e}");
827 }
828 }
829 Err(e) => warn!("Could not duplicate token: {e}"),
830 };
831 }
832 }
833 None => {
834 self.current_generation.default_network += 1;
836 let mut responders = HashMap::new();
837 std::mem::swap(&mut self.default_network_responders, &mut responders);
838 for (id, responder) in responders {
839 self.generations_by_connection
840 .set_default_network(id, self.current_generation);
841 if let Err(e) = responder.send(
842 fnp_properties::NetworksWatchDefaultResponse::NoDefaultNetwork(
843 fnp_properties::Empty,
844 ),
845 ) {
846 warn!("Could not send to responder: {e}");
847 }
848 }
849 }
850 }
851
852 return;
854 }
855 UpdateApplied::NetworkChanged { network_id, added: true, .. } => {
856 let _ = self
857 .tokens
858 .ensure_token(NetworkTokenContents { network_id, is_default: false });
859 }
860 UpdateApplied::NetworkRemoved(network_id) => {
861 self.tokens.drop_if(|c| !c.is_default && c.network_id == network_id);
862 }
863 UpdateApplied::NetworkChanged { added: false, .. } => {
864 }
867 UpdateApplied::DnsChanged => {}
870 UpdateApplied::None => {}
871 }
872
873 for (id, responder) in property_responders {
874 let mut updates = Vec::new();
875 let network = match self.tokens.get_contents(&responder.token) {
876 Ok(network) => network,
877 Err(e) => {
878 warn!("Could not fetch network data for responder: {e}");
879 continue;
880 }
881 };
882
883 if let UpdateApplied::NetworkChanged { network_id, changed_marks: true, .. } =
884 update_applied
885 {
886 if network.network_id == network_id {
887 updates.add_socket_marks(&self.network_registry, &network, &responder);
888 }
889 }
890 if let UpdateApplied::DnsChanged = update_applied {
891 updates.add_dns(&self.network_registry, &network, &responder);
892 }
893
894 self.generations_by_connection.set_properties(id, self.current_generation);
895 if updates.is_empty() {
896 if self.property_responders.insert(id, responder).is_some() {
897 warn!("Re-inserted in an existing responder slot. This should be impossible.");
898 }
899 } else {
900 if let Err(e) = responder.respond(Ok(&updates)) {
901 warn!("Could not send to responder: {e}");
902 }
903 }
904 }
905 }
906}
907
908pub struct ConnectionTagged<Stream: futures::Stream + Unpin> {
909 next_id: ConnectionId,
910 streams: futures::stream::SelectAll<Tagged<ConnectionId, Stream>>,
911}
912
913impl<Stream: futures::Stream + Unpin> Default for ConnectionTagged<Stream> {
914 fn default() -> Self {
915 Self { next_id: Default::default(), streams: Default::default() }
916 }
917}
918
919impl<Stream: futures::Stream + Unpin> ConnectionTagged<Stream> {
920 pub fn push(&mut self, stream: Stream) {
921 self.streams.push(stream.tagged(self.next_id));
922 self.next_id.0 += 1;
923 }
924}
925
926impl<Stream: futures::Stream + Unpin> futures::Stream for ConnectionTagged<Stream> {
927 type Item = (ConnectionId, <Stream as futures::Stream>::Item);
928
929 fn poll_next(
930 mut self: std::pin::Pin<&mut Self>,
931 cx: &mut std::task::Context<'_>,
932 ) -> std::task::Poll<Option<Self::Item>> {
933 std::pin::Pin::new(&mut self.streams).poll_next(cx)
934 }
935}
936
937impl<Stream: futures::Stream + Unpin> futures::stream::FusedStream for ConnectionTagged<Stream> {
938 fn is_terminated(&self) -> bool {
939 self.streams.is_terminated()
940 }
941}