1use crate::InterfaceId;
6use crate::telemetry::{NetworkEventMetadata, TelemetryEvent, TelemetrySender};
7use anyhow::Context as _;
8use async_utils::stream::{Tagged, WithTag as _};
9use dns_server_watcher::DnsServers;
10use fidl::endpoints::{ControlHandle as _, Responder as _};
11use log::{error, info, warn};
12use policy_properties::NetworkTokenExt as _;
13use std::collections::HashMap;
14use std::collections::hash_map::Entry;
15
16mod token_registry;
17
18use fidl_fuchsia_net as fnet;
19use fidl_fuchsia_net_name as fnet_name;
20use fidl_fuchsia_net_policy_properties as fnp_properties;
21use fidl_fuchsia_net_policy_socketproxy as fnp_socketproxy;
22use fidl_fuchsia_posix_socket as fposix_socket;
23
/// Identifies a network by its interface id, tagged with the kind of network.
///
/// `RegisteredNetworks::handle_changed_network` rejects `Fuchsia` networks
/// that carry socket marks and `Delegated` networks that do not.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum NetworkId {
    /// A network reported as Fuchsia-provisioned in telemetry; it carries no
    /// socket marks.
    Fuchsia(InterfaceId),
    /// A network registered through the delegated network registry; it always
    /// carries socket marks.
    Delegated(InterfaceId),
}
33
34impl std::fmt::Display for NetworkId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 NetworkId::Fuchsia(interface_id) => write!(f, "fuchsia:{interface_id}"),
38 NetworkId::Delegated(interface_id) => write!(f, "delegated:{interface_id}"),
39 }
40 }
41}
42
43impl NetworkId {
44 pub fn get(&self) -> InterfaceId {
45 match self {
46 NetworkId::Fuchsia(interface_id) => *interface_id,
47 NetworkId::Delegated(interface_id) => *interface_id,
48 }
49 }
50
51 pub fn fuchsia<I: Into<InterfaceId>>(id: I) -> Self {
52 NetworkId::Fuchsia(id.into())
53 }
54
55 pub fn delegated<I: Into<InterfaceId>>(id: I) -> Self {
56 NetworkId::Delegated(id.into())
57 }
58}
59
/// The data a network token stands for in the token registry.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct NetworkTokenContents {
    /// The network the token refers to.
    network_id: NetworkId,
    /// `true` for tokens handed out by `WatchDefault` (they follow "the
    /// default network"); `false` for the per-network token created when a
    /// network is added.
    is_default: bool,
}
65
/// Identifies one client connection; used as the key for per-connection state.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionId(usize);

/// A pair of monotonically increasing counters describing how many updates
/// have been applied.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct UpdateGeneration {
    /// Counter for changes of the default network.
    default_network: usize,

    /// Counter for property updates.
    properties: usize,
}

/// The last [`UpdateGeneration`] observed by each connection.
#[derive(Clone, Debug, Default)]
pub struct UpdateGenerations(HashMap<ConnectionId, UpdateGeneration>);

impl UpdateGenerations {
    /// Returns the default-network generation recorded for `id`, or `None`
    /// when nothing has been recorded for that connection.
    fn default_network(&self, id: &ConnectionId) -> Option<usize> {
        let recorded = self.0.get(id)?;
        Some(recorded.default_network)
    }

    /// Records only the `default_network` counter of `generation` for `id`,
    /// creating a zeroed record first if the connection is unknown.
    fn set_default_network(&mut self, id: ConnectionId, generation: UpdateGeneration) {
        let UpdateGeneration { default_network, .. } = generation;
        let recorded = self.0.entry(id).or_default();
        recorded.default_network = default_network;
    }

    /// Returns the properties generation recorded for `id`, or `None` when
    /// nothing has been recorded for that connection.
    fn properties(&self, id: &ConnectionId) -> Option<usize> {
        let recorded = self.0.get(id)?;
        Some(recorded.properties)
    }

    /// Records only the `properties` counter of `generation` for `id`,
    /// creating a zeroed record first if the connection is unknown.
    fn set_properties(&mut self, id: ConnectionId, generation: UpdateGeneration) {
        let UpdateGeneration { properties, .. } = generation;
        let recorded = self.0.entry(id).or_default();
        recorded.properties = properties;
    }

    /// Forgets everything recorded for `id`, returning the previous record.
    fn remove(&mut self, id: &ConnectionId) -> Option<UpdateGeneration> {
        let Self(by_connection) = self;
        by_connection.remove(id)
    }
}
104
/// Assigns the mark of a single mark domain on a collection of marks.
trait SetMark {
    /// Stores `value` as the mark for `domain`, replacing whatever was there.
    fn set_mark(&mut self, domain: fnet::MarkDomain, value: Option<u32>);
}
108
109impl SetMark for fnet::Marks {
110 fn set_mark(&mut self, domain: fnet::MarkDomain, value: Option<u32>) {
111 match domain {
112 fnet::MarkDomain::Mark1 => self.mark_1 = value,
113 fnet::MarkDomain::Mark2 => self.mark_2 = value,
114 }
115 }
116}
117
/// A pending `WatchProperties` call together with what it asked for.
#[derive(Debug)]
pub(crate) struct NetworkPropertyResponder {
    /// The network token the client passed in the request.
    token: fnp_properties::NetworkToken,
    /// The properties the client asked to be notified about.
    watched_properties: Vec<fnp_properties::Property>,
    /// The FIDL responder used to complete the call.
    responder: fnp_properties::NetworksWatchPropertiesResponder,
}
124
125impl NetworkPropertyResponder {
126 fn respond(
127 self,
128 response: Result<&[fnp_properties::PropertyUpdate], fnp_properties::WatchError>,
129 ) -> Result<(), fidl::Error> {
130 self.responder.send(response)
131 }
132}
133
/// The properties stored for one registered network.
#[derive(Default, Clone)]
struct NetworkProperties {
    /// Socket marks for the network; `handle_changed_network` only stores
    /// `Some` for delegated networks.
    socket_marks: Option<fnet::Marks>,
    // NOTE(review): this field is read when building telemetry metadata in
    // `NetpolNetworksService::update`, so the allow may be stale — confirm.
    #[allow(dead_code)]
    connectivity_state: Option<fnp_socketproxy::ConnectivityState>,
    /// The name reported for the network, if any.
    name: Option<String>,
    /// The type reported for the network, if any.
    network_type: Option<fnp_socketproxy::NetworkType>,
}
143
144impl NetworkProperties {
145 fn get_marks(&self) -> Option<&fnet::Marks> {
146 self.socket_marks.as_ref()
147 }
148}
149
/// Everything currently known about networks: which one is the default, the
/// stored properties of each registered network, and the DNS server list.
#[derive(Default, Clone)]
struct RegisteredNetworks {
    /// The current default network, or `None` when there is none.
    default_network: Option<NetworkId>,
    /// Stored properties, keyed by network.
    networks: HashMap<NetworkId, NetworkProperties>,
    /// The most recently applied DNS server list.
    dns_servers: Vec<fnet_name::DnsServer_>,
}
157
158impl RegisteredNetworks {
159 fn apply(&mut self, update: PropertyUpdate) -> UpdateApplied {
160 match update {
161 PropertyUpdate::LoseDefaultNetwork => self.handle_default_network_update(None),
162 PropertyUpdate::ChangeNetwork(network_id, network_change) => match network_change {
163 NetworkUpdate::Properties(event) => self.handle_changed_network(network_id, event),
164 NetworkUpdate::Remove => UpdateApplied::NetworkRemoved(network_id),
165 NetworkUpdate::MakeDefault => self.handle_default_network_update(Some(network_id)),
166 },
167 PropertyUpdate::UpdateDns(dns_servers) => {
168 if self.dns_servers != dns_servers {
169 self.dns_servers = dns_servers;
170 UpdateApplied::DnsChanged
171 } else {
172 UpdateApplied::None
173 }
174 }
175 }
176 }
177
178 fn handle_default_network_update(
184 &mut self,
185 new_default_network: Option<NetworkId>,
186 ) -> UpdateApplied {
187 if new_default_network == self.default_network {
189 return UpdateApplied::None;
190 }
191
192 let old_default_network = self.default_network;
193 self.default_network = new_default_network;
194 return UpdateApplied::DefaultNetworkChanged(old_default_network);
195 }
196
197 fn handle_changed_network(
202 &mut self,
203 network_id: NetworkId,
204 event: NetworkPropertiesChange,
205 ) -> UpdateApplied {
206 let NetworkPropertiesChange {
207 added,
208 marks: socket_marks,
209 connectivity_state,
210 name,
211 network_type,
212 } = event;
213 let entry = self.networks.entry(network_id);
214 let result = match (added, &entry, network_id, socket_marks) {
215 (true, Entry::Occupied(_), _, _) => Err("add already added network"),
216 (false, Entry::Vacant(_), _, _) => Err("update a non-added network"),
217 (_, _, NetworkId::Fuchsia(_), Some(_)) => Err("have a fuchsia network with marks"),
218 (_, _, NetworkId::Delegated(_), None) => Err("have a delegated network without marks"),
219 (_, _, NetworkId::Fuchsia(_), None) => Ok((NetworkProperties::default(), added)),
220 (_, entry, NetworkId::Delegated(_), Some(socket_marks)) => {
221 let changed = if let Entry::Occupied(e) = entry {
222 e.get().get_marks() != Some(&socket_marks)
223 } else {
224 true
225 };
226 Ok((
227 NetworkProperties { socket_marks: Some(socket_marks), ..Default::default() },
228 changed,
229 ))
230 }
231 };
232
233 match result {
234 Ok((mut properties, changed_marks)) => {
235 properties.connectivity_state = connectivity_state;
236 properties.network_type = network_type;
237 properties.name = name.clone();
238 let _ = entry.insert_entry(properties);
239 UpdateApplied::NetworkChanged {
240 network_id,
241 added,
242 changed_marks,
243 name,
244 network_type,
245 }
246 }
247 Err(e) => {
248 error!("Cannot {e}. Update ignored.");
249 UpdateApplied::None
250 }
251 }
252 }
253
254 fn maybe_respond(
255 &self,
256 network: &NetworkTokenContents,
257 responder: NetworkPropertyResponder,
258 ) -> Option<NetworkPropertyResponder> {
259 let mut updates = Vec::new();
260 updates.add_socket_marks(self, network, &responder);
261 updates.add_dns(self, network, &responder);
262
263 if updates.is_empty() {
264 Some(responder)
265 } else {
266 if let Err(e) = responder.respond(Ok(&updates)) {
267 warn!("Could not send to responder: {e}");
268 }
269 None
270 }
271 }
272}
273
/// Helpers that append property updates for one watcher to a list of updates.
trait PropertyUpdates {
    /// Appends the socket marks of `network` if `responder` watches them and
    /// the registry has marks stored for that network.
    fn add_socket_marks(
        &mut self,
        network_registry: &RegisteredNetworks,
        network: &NetworkTokenContents,
        responder: &NetworkPropertyResponder,
    );
    /// Appends the DNS configuration relevant to `network` if `responder`
    /// watches it.
    fn add_dns(
        &mut self,
        network_registry: &RegisteredNetworks,
        network: &NetworkTokenContents,
        responder: &NetworkPropertyResponder,
    );
}
288
289impl PropertyUpdates for Vec<fnp_properties::PropertyUpdate> {
290 fn add_socket_marks(
291 &mut self,
292 network_registry: &RegisteredNetworks,
293 network: &NetworkTokenContents,
294 responder: &NetworkPropertyResponder,
295 ) {
296 if !responder.watched_properties.contains(&fnp_properties::Property::SocketMarks) {
297 return;
298 }
299
300 match network_registry.networks.get(&network.network_id) {
301 Some(network) => {
302 if let Some(socket_marks) = network.get_marks() {
303 self.push(fnp_properties::PropertyUpdate::SocketMarks(socket_marks.clone()));
304 }
305 return;
306 }
307 None => {
308 error!(
309 "State is inconsistent. We attempted to add marks for a \
310 network that is not known: {:?}",
311 network.network_id
312 );
313 }
314 }
315 }
316
317 fn add_dns(
318 &mut self,
319 network_registry: &RegisteredNetworks,
320 network: &NetworkTokenContents,
321 responder: &NetworkPropertyResponder,
322 ) {
323 if !responder.watched_properties.contains(&fnp_properties::Property::DnsConfiguration) {
324 return;
325 }
326
327 let interface_id = network.network_id;
328 self.push(fnp_properties::PropertyUpdate::DnsConfiguration(
329 fnp_properties::DnsConfiguration {
330 servers: Some(
331 network_registry
332 .dns_servers
333 .iter()
334 .filter(|d| {
335 match &d.source {
336 Some(source) => match source {
337 fnet_name::DnsServerSource::StaticSource(_) => true,
338 fnet_name::DnsServerSource::SocketProxy(
339 fnet_name::SocketProxyDnsServerSource {
340 source_interface,
341 ..
342 },
343 ) => match (interface_id, source_interface) {
344 (_, None) => true,
345 (id1, Some(id2)) => {
346 Ok(id1)
347 == InterfaceId::try_from(*id2)
348 .map(|id| NetworkId::delegated(id))
349 }
350 },
351 fnet_name::DnsServerSource::Dhcp(
352 fnet_name::DhcpDnsServerSource { source_interface, .. },
353 )
354 | fnet_name::DnsServerSource::Ndp(
355 fnet_name::NdpDnsServerSource { source_interface, .. },
356 )
357 | fnet_name::DnsServerSource::Dhcpv6(
358 fnet_name::Dhcpv6DnsServerSource {
359 source_interface, ..
360 },
361 ) => match (interface_id, source_interface) {
362 (_, None) => true,
363 (id1, Some(id2)) => {
364 Ok(id1)
365 == InterfaceId::try_from(*id2)
366 .map(|id| NetworkId::fuchsia(id))
367 }
368 },
369
370 _ => {
371 error!("unhandled DnsServerSource: {source:?}");
372 false
373 }
374 },
375
376 None => true,
378 }
379 })
380 .cloned()
381 .collect::<Vec<_>>(),
382 ),
383 ..Default::default()
384 },
385 ));
386 }
387}
388
/// The payload of a `NetworkUpdate::Properties` change.
#[derive(Debug)]
pub struct NetworkPropertiesChange {
    /// `true` when the network is being added, `false` when an existing
    /// network is being updated.
    pub added: bool,
    /// Socket marks for the network. `handle_changed_network` requires these
    /// for delegated networks and rejects them for Fuchsia networks.
    pub marks: Option<fnet::Marks>,
    /// The connectivity state to store for the network.
    pub connectivity_state: Option<fnp_socketproxy::ConnectivityState>,
    /// The name to store for the network.
    pub name: Option<String>,
    /// The network type to store for the network.
    pub network_type: Option<fnp_socketproxy::NetworkType>,
}
404
/// A change that targets one specific network.
#[derive(Debug)]
pub enum NetworkUpdate {
    /// Add the network or replace its stored properties.
    Properties(NetworkPropertiesChange),
    /// Remove the network.
    Remove,
    /// Make the network the default network.
    MakeDefault,
}
412
/// The outcome of applying a `PropertyUpdate` to `RegisteredNetworks`.
#[derive(Debug, PartialEq, Eq)]
enum UpdateApplied {
    /// Nothing changed (the update was redundant or was rejected).
    None,

    /// The default network changed; carries the previous default network.
    DefaultNetworkChanged(Option<NetworkId>),

    /// The stored DNS server list was replaced with a different one.
    DnsChanged,

    /// A network was added or its stored properties were replaced.
    NetworkChanged {
        /// The network that changed.
        network_id: NetworkId,
        /// Whether the change added the network.
        added: bool,
        /// Whether watchers of the network's socket marks need to be told.
        changed_marks: bool,
        /// The name carried by the change.
        name: Option<String>,
        /// The network type carried by the change.
        network_type: Option<fnp_socketproxy::NetworkType>,
    },

    /// Removal of the given network was requested.
    NetworkRemoved(NetworkId),
}
436
/// An update that can be fed to `NetpolNetworksService::update`.
#[derive(Debug)]
pub enum PropertyUpdate {
    /// There is no longer a default network.
    LoseDefaultNetwork,
    /// Apply a `NetworkUpdate` to the given network.
    ChangeNetwork(NetworkId, NetworkUpdate),
    /// Replace the DNS server list.
    UpdateDns(Vec<fnet_name::DnsServer_>),
}
443
444impl PropertyUpdate {
445 pub fn default_network_lost() -> Self {
446 PropertyUpdate::LoseDefaultNetwork
447 }
448
449 pub fn dns(dns_servers: &DnsServers) -> Self {
450 PropertyUpdate::UpdateDns(dns_servers.consolidated_dns_servers())
453 }
454}
455
/// State behind the network-properties protocols: the network registry, the
/// token registry, and the hanging-get responders parked per connection.
#[derive(Default)]
pub struct NetpolNetworksService {
    /// The generation counters after the most recent update.
    current_generation: UpdateGeneration,
    /// The generation each connection has last been brought up to date with.
    generations_by_connection: UpdateGenerations,
    /// Parked `WatchDefault` calls; at most one per connection.
    default_network_responders:
        HashMap<ConnectionId, fnp_properties::NetworksWatchDefaultResponder>,
    /// Tokens handed out to clients and the network each one stands for.
    tokens: token_registry::TokenRegistry<NetworkTokenContents>,
    /// Parked `WatchProperties` calls; at most one per connection.
    property_responders: HashMap<ConnectionId, NetworkPropertyResponder>,
    /// The networks, default network and DNS servers currently known.
    network_registry: RegisteredNetworks,
    /// Where telemetry events are sent, once configured via `set_telemetry`.
    telemetry: Option<TelemetrySender>,
}
472
473impl NetpolNetworksService {
474 pub fn set_telemetry(&mut self, telemetry: TelemetrySender) {
475 self.telemetry = Some(telemetry);
476 }
477
    /// Handles one `fuchsia.net.policy.properties/Networks` request received
    /// on the connection identified by `id`.
    ///
    /// Both `WatchDefault` and `WatchProperties` are answered immediately when
    /// the connection is behind the current generation and there is something
    /// to report; otherwise the responder is parked until `update` has news.
    /// A second call of the same kind while one is parked shuts the channel
    /// down with `CONNECTION_ABORTED`.
    ///
    /// # Errors
    ///
    /// Returns an error when `req` is itself an error, when a token cannot be
    /// duplicated, or when sending a response fails.
    pub async fn handle_network_attributes_request(
        &mut self,
        id: ConnectionId,
        req: Result<fnp_properties::NetworksRequest, fidl::Error>,
    ) -> Result<(), anyhow::Error> {
        let req = req.context("network attributes request")?;
        match req {
            fnp_properties::NetworksRequest::WatchDefault { responder } => {
                match self.default_network_responders.entry(id) {
                    std::collections::hash_map::Entry::Occupied(_) => {
                        warn!(
                            "Only one call to fuchsia.net.policy.properties/Networks.WatchDefault \
                            may be active per connection"
                        );
                        responder
                            .control_handle()
                            .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
                    }
                    std::collections::hash_map::Entry::Vacant(vacant_entry) => {
                        // Only report the default network when this connection
                        // has not yet seen the current generation. A connection
                        // with no recorded generation counts as generation 0.
                        let network_id = if self
                            .generations_by_connection
                            .default_network(&id)
                            .unwrap_or_default()
                            < self.current_generation.default_network
                        {
                            self.network_registry.default_network
                        } else {
                            None
                        };
                        if let Some(network_id) = network_id {
                            self.generations_by_connection
                                .set_default_network(id, self.current_generation);
                            let token = self
                                .tokens
                                .ensure_token(NetworkTokenContents { network_id, is_default: true })
                                .get()
                                .duplicate()
                                .context("could not duplicate token")?;
                            responder.send(
                                fnp_properties::NetworksWatchDefaultResponse::Network(token),
                            )?;

                            // A property watch parked on this connection is
                            // failed with `NetworkGone`, and the generations
                            // recorded for the connection are forgotten.
                            if let Some(responder) = self.property_responders.remove(&id) {
                                let _: Option<_> = self.generations_by_connection.remove(&id);
                                let _: Result<(), fidl::Error> =
                                    responder.respond(Err(fnp_properties::WatchError::NetworkGone));
                            }
                        } else {
                            // Nothing new to report (or no default network is
                            // registered): park the call until `update` runs.
                            let _: &mut _ = vacant_entry.insert(responder);
                        }
                    }
                }
            }
            fnp_properties::NetworksRequest::WatchProperties {
                payload: fnp_properties::NetworksWatchPropertiesRequest { network, properties, .. },
                responder,
            } => match (network, properties) {
                (None, _) | (_, None) => {
                    responder.send(Err(fnp_properties::WatchError::MissingRequiredArgument))?
                }
                (Some(network), Some(properties)) => {
                    if properties.is_empty() {
                        responder.send(Err(fnp_properties::WatchError::NoProperties))?;
                    } else {
                        match self.property_responders.entry(id) {
                            std::collections::hash_map::Entry::Occupied(_) => {
                                warn!(
                                    "Only one call to \
                                    fuchsia.net.policy.properties/Networks.WatchProperties may be \
                                    active per connection"
                                );
                                responder
                                    .control_handle()
                                    .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
                            }
                            std::collections::hash_map::Entry::Vacant(vacant_entry) => {
                                match self.tokens.get_contents(&network) {
                                    Err(e) => {
                                        warn!("Unknown network token. ({network:?}: {e})");
                                        responder.send(Err(
                                            fnp_properties::WatchError::InvalidNetworkToken,
                                        ))?;
                                    }
                                    Ok(network_contents) => {
                                        let responder = NetworkPropertyResponder {
                                            token: network,
                                            watched_properties: properties,
                                            responder,
                                        };
                                        // Behind the current generation: try to
                                        // answer right away, and park the call
                                        // only if there was nothing to send.
                                        if self
                                            .generations_by_connection
                                            .properties(&id)
                                            .unwrap_or_default()
                                            < self.current_generation.properties
                                        {
                                            self.generations_by_connection
                                                .set_properties(id, self.current_generation);
                                            if let Some(responder) = self
                                                .network_registry
                                                .maybe_respond(&network_contents, responder)
                                            {
                                                let _: &mut NetworkPropertyResponder =
                                                    vacant_entry.insert(responder);
                                            }
                                        } else {
                                            // Already up to date: park the call.
                                            let _: &mut NetworkPropertyResponder =
                                                vacant_entry.insert(responder);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            },
            _ => {
                warn!("Received unexpected request {req:?}");
            }
        }

        Ok(())
    }
600
    /// Handles one `NetworkRegistry` request by turning it into a
    /// `PropertyUpdate` for a delegated network and replying to the caller.
    ///
    /// A stream error in `update` is logged and treated as success. For every
    /// request the reply is `Ok(())` unless the request itself is malformed
    /// (an interface id that does not convert, or missing id / info on `Add`
    /// and `Update`); whether the registry accepted the change is not
    /// reflected in the reply.
    ///
    /// # Errors
    ///
    /// Returns an error when sending the reply fails.
    pub async fn handle_delegated_networks_update(
        &mut self,
        update: Result<fnp_socketproxy::NetworkRegistryRequest, fidl::Error>,
    ) -> Result<(), anyhow::Error> {
        use fnp_socketproxy::{
            NetworkInfo, NetworkRegistryAddError, NetworkRegistryRemoveError,
            NetworkRegistryRequest, NetworkRegistrySetDefaultError, NetworkRegistryUpdateError,
        };

        match update {
            Err(e) => {
                error!(
                    "Encountered error watching for delegated network \
                    updates: {e:?}"
                );
                Ok(())
            }
            Ok(NetworkRegistryRequest::SetDefault { network_id, responder }) => responder.send(
                (async || match network_id {
                    fposix_socket::OptionalUint32::Value(interface_id) => {
                        // An id that does not convert is reported as NotFound.
                        self.update(PropertyUpdate::ChangeNetwork(
                            NetworkId::delegated(
                                InterfaceId::try_from(interface_id)
                                    .map_err(|_| NetworkRegistrySetDefaultError::NotFound)?,
                            ),
                            NetworkUpdate::MakeDefault,
                        ))
                        .await;
                        Ok(())
                    }
                    fposix_socket::OptionalUint32::Unset(_) => {
                        self.update(PropertyUpdate::default_network_lost()).await;
                        Ok(())
                    }
                })()
                .await,
            ),
            Ok(NetworkRegistryRequest::Add { network, responder }) => responder.send(
                (async || {
                    let network_id = network
                        .network_id
                        .and_then(|id| InterfaceId::try_from(id).ok())
                        .map(|id| NetworkId::delegated(id))
                        .ok_or(NetworkRegistryAddError::MissingNetworkId)?;
                    // Any info other than the Starnix variant is treated as
                    // missing.
                    let NetworkInfo::Starnix(info) =
                        network.info.ok_or(NetworkRegistryAddError::MissingNetworkInfo)?
                    else {
                        return Err(NetworkRegistryAddError::MissingNetworkInfo);
                    };

                    let mut marks = fnet::Marks::default();
                    marks.set_mark(fnet::MARK_DOMAIN_SO_MARK, info.mark);

                    self.update(PropertyUpdate::ChangeNetwork(
                        network_id,
                        NetworkUpdate::Properties(NetworkPropertiesChange {
                            added: true,
                            marks: Some(marks),
                            connectivity_state: network.connectivity,
                            name: network.name,
                            network_type: network.network_type,
                        }),
                    ))
                    .await;
                    Ok(())
                })()
                .await,
            ),
            Ok(NetworkRegistryRequest::Update { network, responder }) => responder.send(
                (async || {
                    let network_id = network
                        .network_id
                        .and_then(|id| InterfaceId::try_from(id).ok())
                        .map(|id| NetworkId::delegated(id))
                        .ok_or(NetworkRegistryUpdateError::MissingNetworkId)?;
                    // Any info other than the Starnix variant is treated as
                    // missing.
                    let NetworkInfo::Starnix(info) =
                        network.info.ok_or(NetworkRegistryUpdateError::MissingNetworkInfo)?
                    else {
                        return Err(NetworkRegistryUpdateError::MissingNetworkInfo);
                    };

                    let mut marks = fnet::Marks::default();
                    marks.set_mark(fnet::MARK_DOMAIN_SO_MARK, info.mark);
                    self.update(PropertyUpdate::ChangeNetwork(
                        network_id,
                        NetworkUpdate::Properties(NetworkPropertiesChange {
                            added: false,
                            marks: Some(marks),
                            connectivity_state: network.connectivity,
                            name: network.name,
                            network_type: network.network_type,
                        }),
                    ))
                    .await;
                    Ok(())
                })()
                .await,
            ),
            Ok(NetworkRegistryRequest::Remove { network_id, responder }) => responder.send(
                (async || {
                    // An id that does not convert is reported as NotFound.
                    self.update(PropertyUpdate::ChangeNetwork(
                        NetworkId::delegated(
                            InterfaceId::try_from(network_id)
                                .map_err(|_| NetworkRegistryRemoveError::NotFound)?,
                        ),
                        NetworkUpdate::Remove,
                    ))
                    .await;
                    Ok(())
                })()
                .await,
            ),
        }
        .context("while handling DelegatedNetwork request")
    }
723
724 pub(crate) async fn handle_network_token_resolver_request(
725 &mut self,
726 request: Result<fnp_properties::NetworkTokenResolverRequest, fidl::Error>,
727 ) -> Result<(), anyhow::Error> {
728 use fnp_properties::NetworkTokenResolverResolveTokenError as ResolveTokenError;
729
730 let request = request.context("while handling NetworkTokenResolver request")?;
731 match request {
732 fnp_properties::NetworkTokenResolverRequest::ResolveToken { token, responder } => {
733 let maybe_contents = self.tokens.get_contents(&token).copied();
734 match maybe_contents {
735 Err(e) => {
736 warn!("Unknown network token. ({token:?}: {e})");
737 responder.send(Err(ResolveTokenError::InvalidNetworkToken))?;
738 }
739 Ok(contents) => {
740 if contents.is_default {
741 let query = NetworkTokenContents { is_default: false, ..contents };
744 if let Some(tok) = self.tokens.get_token(&query) {
745 responder.send(tok.duplicate().map_err(|e| {
746 warn!("Encountered issue duplicating generated token. {e}");
747 ResolveTokenError::InvalidNetworkToken
748 }))?;
749 } else {
750 warn!("Requested canonical version of unregistered network.");
751 responder.send(Err(ResolveTokenError::InvalidNetworkToken))?;
752 }
753 } else {
754 responder.send(Ok(token))?;
755 }
756 }
757 }
758 }
759 fidl_fuchsia_net_policy_properties::NetworkTokenResolverRequest::_UnknownMethod {
760 ordinal,
761 control_handle,
762 method_type,
763 ..
764 } => warn!(
765 "Encountered unknown method call on NetworkTokenResolver: {ordinal} \
766 {control_handle:?} {method_type:?}"
767 ),
768 }
769
770 Ok(())
771 }
772
773 async fn changed_default_network(
774 &mut self,
775 previous_default_network: Option<NetworkId>,
776 responders: &mut HashMap<ConnectionId, NetworkPropertyResponder>,
777 ) {
778 let mut r = HashMap::new();
779 std::mem::swap(&mut r, responders);
780 r = r
781 .into_iter()
782 .filter_map(|(id, responder)| {
783 match self.tokens.get_contents(&responder.token) {
784 Ok(contents) => {
785 if contents.is_default {
787 let _: Option<_> = self.generations_by_connection.remove(&id);
788 let _: Result<(), fidl::Error> =
789 responder.respond(Err(fnp_properties::WatchError::NetworkGone));
790 return None;
791 }
792 }
793 Err(zx::Status::NOT_FOUND) => {
794 warn!("Token provided to get_contents is not valid.");
795 }
796 Err(e) => {
797 warn!("Encountered unknown issue while getting contents: {e}");
798 }
799 }
800 Some((id, responder))
801 })
802 .collect::<HashMap<_, _>>();
803 std::mem::swap(&mut r, responders);
804 self.tokens.drop_if(|&c| {
805 c.is_default && previous_default_network.is_some_and(|i| i == c.network_id)
806 });
807 }
808
809 pub(crate) async fn remove_network(&mut self, network_id: NetworkId) {
810 info!("Removing interface {network_id}. Reporting NETWORK_GONE to all clients.");
811 let mut responders = HashMap::new();
812 std::mem::swap(&mut self.property_responders, &mut responders);
813 for (id, responder) in responders {
814 let network = match self.tokens.get_contents(&responder.token) {
815 Ok(network) => network,
816 Err(e) => {
817 warn!("Could not fetch network data for responder: {e}");
818 continue;
819 }
820 };
821 if network.network_id == network_id {
822 if let Err(e) = responder.respond(Err(fnp_properties::WatchError::NetworkGone)) {
824 warn!("Could not send to responder: {e}");
825 }
826 } else {
827 if self.property_responders.insert(id, responder).is_some() {
828 error!("Re-inserted in an existing responder slot. This should be impossible.");
829 }
830 }
831 }
832 }
833
834 pub async fn update(&mut self, update: PropertyUpdate) {
835 self.current_generation.properties += 1;
836 let update_applied = self.network_registry.apply(update);
837 if let UpdateApplied::None = update_applied {
838 return;
840 }
841
842 let mut property_responders = HashMap::new();
843 std::mem::swap(&mut self.property_responders, &mut property_responders);
844
845 match update_applied {
846 UpdateApplied::DefaultNetworkChanged(previous_default) => {
847 self.changed_default_network(previous_default, &mut property_responders).await;
848 match self.network_registry.default_network {
849 Some(default_network) => {
850 if let Some(telemetry) = &self.telemetry {
851 if let Some(props) =
852 self.network_registry.networks.get(&default_network)
853 {
854 telemetry.send(TelemetryEvent::DefaultNetworkChanged(
855 NetworkEventMetadata {
856 id: default_network.get().get(),
857 name: props.name.clone(),
858 transport: props
859 .network_type
860 .unwrap_or(fnp_socketproxy::NetworkType::Unknown),
861 is_fuchsia_provisioned: matches!(
862 default_network,
863 NetworkId::Fuchsia(_)
864 ),
865 connectivity_state: props.connectivity_state,
866 },
867 ));
868 } else {
869 warn!("Could not fetch network data for default network.");
870 }
871 }
872 self.current_generation.default_network += 1;
873 let mut responders = HashMap::new();
874 std::mem::swap(&mut self.default_network_responders, &mut responders);
875 for (id, responder) in responders {
876 self.generations_by_connection
877 .set_default_network(id, self.current_generation);
878 match self
879 .tokens
880 .ensure_token(NetworkTokenContents {
881 network_id: default_network,
882 is_default: true,
883 })
884 .get()
885 .duplicate()
886 {
887 Ok(token) => {
888 if let Err(e) = responder.send(
889 fnp_properties::NetworksWatchDefaultResponse::Network(
890 token,
891 ),
892 ) {
893 warn!("Could not send to responder: {e}");
894 }
895 }
896 Err(e) => warn!("Could not duplicate token: {e}"),
897 };
898 }
899 }
900 None => {
901 if let Some(telemetry) = &self.telemetry {
902 telemetry.send(TelemetryEvent::DefaultNetworkLost);
903 }
904 self.current_generation.default_network += 1;
906 let mut responders = HashMap::new();
907 std::mem::swap(&mut self.default_network_responders, &mut responders);
908 for (id, responder) in responders {
909 self.generations_by_connection
910 .set_default_network(id, self.current_generation);
911 if let Err(e) = responder.send(
912 fnp_properties::NetworksWatchDefaultResponse::NoDefaultNetwork(
913 fnp_properties::Empty,
914 ),
915 ) {
916 warn!("Could not send to responder: {e}");
917 }
918 }
919 }
920 }
921
922 return;
924 }
925 UpdateApplied::NetworkChanged { network_id, added: true, .. } => {
926 let _ = self
927 .tokens
928 .ensure_token(NetworkTokenContents { network_id, is_default: false });
929 }
930 UpdateApplied::NetworkRemoved(network_id) => {
931 self.tokens.drop_if(|c| !c.is_default && c.network_id == network_id);
932 }
933 UpdateApplied::NetworkChanged { added: false, .. } => {
934 }
937 UpdateApplied::DnsChanged => {}
940 UpdateApplied::None => {}
941 }
942
943 if let UpdateApplied::NetworkChanged { network_id, .. } = update_applied {
944 if let Some(telemetry) = &self.telemetry {
945 if let Some(props) = self.network_registry.networks.get(&network_id) {
946 telemetry.send(TelemetryEvent::NetworkChanged(NetworkEventMetadata {
947 id: network_id.get().get(),
948 name: props.name.clone(),
949 transport: props
950 .network_type
951 .unwrap_or(fnp_socketproxy::NetworkType::Unknown),
952 is_fuchsia_provisioned: matches!(network_id, NetworkId::Fuchsia(_)),
953 connectivity_state: props.connectivity_state,
954 }));
955 }
956 }
957 }
958
959 for (id, responder) in property_responders {
960 let mut updates = Vec::new();
961 let network = match self.tokens.get_contents(&responder.token) {
962 Ok(network) => network,
963 Err(e) => {
964 warn!("Could not fetch network data for responder: {e}");
965 continue;
966 }
967 };
968
969 if let UpdateApplied::NetworkChanged { network_id, changed_marks: true, .. } =
970 update_applied
971 {
972 if network.network_id == network_id {
973 updates.add_socket_marks(&self.network_registry, &network, &responder);
974 }
975 }
976 if let UpdateApplied::DnsChanged = update_applied {
977 updates.add_dns(&self.network_registry, &network, &responder);
978 }
979
980 self.generations_by_connection.set_properties(id, self.current_generation);
981 if updates.is_empty() {
982 if self.property_responders.insert(id, responder).is_some() {
983 warn!("Re-inserted in an existing responder slot. This should be impossible.");
984 }
985 } else {
986 if let Err(e) = responder.respond(Ok(&updates)) {
987 warn!("Could not send to responder: {e}");
988 }
989 }
990 }
991 }
992}
993
/// A set of streams merged into one, where every item is paired with the
/// [`ConnectionId`] of the stream that produced it.
pub struct ConnectionTagged<Stream: futures::Stream + Unpin> {
    /// The id that will be given to the next pushed stream.
    next_id: ConnectionId,
    /// The tagged streams being polled together.
    streams: futures::stream::SelectAll<Tagged<ConnectionId, Stream>>,
}
998
999impl<Stream: futures::Stream + Unpin> Default for ConnectionTagged<Stream> {
1000 fn default() -> Self {
1001 Self { next_id: Default::default(), streams: Default::default() }
1002 }
1003}
1004
1005impl<Stream: futures::Stream + Unpin> ConnectionTagged<Stream> {
1006 pub fn push(&mut self, stream: Stream) {
1007 self.streams.push(stream.tagged(self.next_id));
1008 self.next_id.0 += 1;
1009 }
1010}
1011
1012impl<Stream: futures::Stream + Unpin> futures::Stream for ConnectionTagged<Stream> {
1013 type Item = (ConnectionId, <Stream as futures::Stream>::Item);
1014
1015 fn poll_next(
1016 mut self: std::pin::Pin<&mut Self>,
1017 cx: &mut std::task::Context<'_>,
1018 ) -> std::task::Poll<Option<Self::Item>> {
1019 std::pin::Pin::new(&mut self.streams).poll_next(cx)
1020 }
1021}
1022
1023impl<Stream: futures::Stream + Unpin> futures::stream::FusedStream for ConnectionTagged<Stream> {
1024 fn is_terminated(&self) -> bool {
1025 self.streams.is_terminated()
1026 }
1027}
1028
#[cfg(test)]
mod tests {
    use super::*;
    use crate::InterfaceId;
    use fnp_socketproxy::{ConnectivityState, NetworkType};
    use std::num::NonZeroU64;

    const ID_1: InterfaceId = InterfaceId(NonZeroU64::new(1).unwrap());
    const ID_2: InterfaceId = InterfaceId(NonZeroU64::new(2).unwrap());
    const NAME_1: &str = "testif1";
    const NAME_2: &str = "testif2";

    /// Builds the change event fed to `handle_changed_network`.
    fn change(
        added: bool,
        marks: Option<&fnet::Marks>,
        connectivity_state: Option<ConnectivityState>,
        name: &str,
        network_type: NetworkType,
    ) -> NetworkPropertiesChange {
        NetworkPropertiesChange {
            added,
            marks: marks.cloned(),
            connectivity_state,
            name: Some(name.to_string()),
            network_type: Some(network_type),
        }
    }

    /// Builds the `NetworkChanged` outcome expected from an accepted change.
    fn network_changed(
        network_id: NetworkId,
        added: bool,
        changed_marks: bool,
        name: &str,
        network_type: NetworkType,
    ) -> UpdateApplied {
        UpdateApplied::NetworkChanged {
            network_id,
            added,
            changed_marks,
            name: Some(name.to_string()),
            network_type: Some(network_type),
        }
    }

    #[test]
    fn test_handle_changed_network_delegated() {
        let mut networks = RegisteredNetworks::default();
        let network_id = NetworkId::Delegated(ID_1);
        let marks = fnet::Marks { mark_1: Some(123), ..Default::default() };
        let new_marks = fnet::Marks { mark_1: Some(456), ..Default::default() };

        // (added, marks sent, connectivity sent, expected `changed_marks`):
        // the initial add, an update with identical marks, and an update with
        // different marks.
        let steps = [
            (true, &marks, ConnectivityState::FullConnectivity, true),
            (false, &marks, ConnectivityState::NoConnectivity, false),
            (false, &new_marks, ConnectivityState::FullConnectivity, true),
        ];

        for (added, sent_marks, connectivity, changed_marks) in steps {
            let event =
                change(added, Some(sent_marks), Some(connectivity), NAME_1, NetworkType::Ethernet);
            assert_eq!(
                networks.handle_changed_network(network_id, event),
                network_changed(network_id, added, changed_marks, NAME_1, NetworkType::Ethernet)
            );

            let properties =
                networks.networks.get(&network_id).expect("network should be present");
            assert_eq!(properties.socket_marks, Some(sent_marks.clone()));
            assert_eq!(properties.connectivity_state, Some(connectivity));
        }
    }

    #[test]
    fn test_handle_changed_network_fuchsia() {
        let mut networks = RegisteredNetworks::default();
        let network_id = NetworkId::Fuchsia(ID_2);

        // Adding a Fuchsia network: no marks are sent or stored.
        let event = change(
            true,
            None,
            Some(ConnectivityState::LocalConnectivity),
            NAME_2,
            NetworkType::Wifi,
        );
        assert_eq!(
            networks.handle_changed_network(network_id, event),
            network_changed(network_id, true, true, NAME_2, NetworkType::Wifi)
        );

        let properties = networks.networks.get(&network_id).expect("network should be present");
        assert_eq!(properties.socket_marks, None);
        assert_eq!(properties.connectivity_state, Some(ConnectivityState::LocalConnectivity));

        // Updating the same network replaces the stored connectivity state.
        let event = change(
            false,
            None,
            Some(ConnectivityState::FullConnectivity),
            NAME_2,
            NetworkType::Wifi,
        );
        assert_eq!(
            networks.handle_changed_network(network_id, event),
            network_changed(network_id, false, false, NAME_2, NetworkType::Wifi)
        );

        let properties = networks.networks.get(&network_id).expect("network should be present");
        assert_eq!(properties.connectivity_state, Some(ConnectivityState::FullConnectivity));
    }

    #[test]
    fn test_handle_changed_network_validation() {
        let mut networks = RegisteredNetworks::default();
        let network_id = NetworkId::Delegated(ID_1);
        let marks = fnet::Marks { mark_1: Some(123), ..Default::default() };
        let event = |added: bool, marks: Option<&fnet::Marks>| {
            change(added, marks, None, NAME_1, NetworkType::Ethernet)
        };

        // Updating a network that was never added is rejected.
        assert_eq!(
            networks.handle_changed_network(network_id, event(false, Some(&marks))),
            UpdateApplied::None
        );

        // Adding it succeeds.
        assert_eq!(
            networks.handle_changed_network(network_id, event(true, Some(&marks))),
            network_changed(network_id, true, true, NAME_1, NetworkType::Ethernet)
        );

        // Adding it a second time is rejected.
        assert_eq!(
            networks.handle_changed_network(network_id, event(true, Some(&marks))),
            UpdateApplied::None
        );

        // A Fuchsia network with marks is rejected.
        let fuchsia_id = NetworkId::Fuchsia(ID_1);
        assert_eq!(
            networks.handle_changed_network(fuchsia_id, event(true, Some(&marks))),
            UpdateApplied::None
        );

        // A delegated network without marks is rejected.
        let delegated_id = NetworkId::Delegated(ID_1);
        assert_eq!(
            networks.handle_changed_network(delegated_id, event(true, None)),
            UpdateApplied::None
        );
    }
}