1use crate::InterfaceId;
6use anyhow::Context as _;
7use async_utils::stream::{Tagged, WithTag as _};
8use dns_server_watcher::DnsServers;
9use fidl::endpoints::{ControlHandle as _, Responder as _};
10use log::{error, info, warn};
11use std::collections::HashMap;
12
13mod token_map;
14
15use {
16 fidl_fuchsia_net as fnet, fidl_fuchsia_net_name as fnet_name,
17 fidl_fuchsia_net_policy_properties as fnp_properties,
18 fidl_fuchsia_net_policy_socketproxy as fnp_socketproxy,
19 fidl_fuchsia_posix_socket as fposix_socket,
20};
21
22pub(crate) struct NetworkTokenContents {
23 connection_id: ConnectionId,
24 interface_id: InterfaceId,
25}
26
27#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
28pub struct ConnectionId(usize);
29
30#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
31pub struct UpdateGeneration {
32 default_network: usize,
35
36 properties: usize,
39}
40
41#[derive(Clone, Debug, Default)]
42pub struct UpdateGenerations(HashMap<ConnectionId, UpdateGeneration>);
43
44impl UpdateGenerations {
45 fn default_network(&self, id: &ConnectionId) -> Option<usize> {
46 self.0.get(id).map(|g| g.default_network)
47 }
48
49 fn set_default_network(&mut self, id: ConnectionId, generation: UpdateGeneration) {
50 self.0.entry(id).or_default().default_network = generation.default_network;
51 }
52
53 fn properties(&self, id: &ConnectionId) -> Option<usize> {
54 self.0.get(id).map(|g| g.properties)
55 }
56
57 fn set_properties(&mut self, id: ConnectionId, generation: UpdateGeneration) {
58 self.0.entry(id).or_default().properties = generation.properties;
59 }
60
61 fn remove(&mut self, id: &ConnectionId) -> Option<UpdateGeneration> {
62 self.0.remove(id)
63 }
64}
65
66#[derive(Debug)]
67pub(crate) struct NetworkPropertyResponder {
68 token: fidl::EventPair,
69 watched_properties: Vec<fnp_properties::Property>,
70 responder: fnp_properties::NetworksWatchPropertiesResponder,
71}
72
73impl NetworkPropertyResponder {
74 fn respond(
75 self,
76 response: Result<&[fnp_properties::PropertyUpdate], fnp_properties::WatchError>,
77 ) -> Result<(), fidl::Error> {
78 self.responder.send(response)
79 }
80}
81
82#[derive(Default)]
83pub struct NetpolNetworksService {
84 current_generation: UpdateGeneration,
86 generations_by_connection: UpdateGenerations,
88 default_network_responders:
90 HashMap<ConnectionId, fnp_properties::NetworksWatchDefaultResponder>,
91 tokens: token_map::TokenMap<NetworkTokenContents>,
92 property_responders: HashMap<ConnectionId, NetworkPropertyResponder>,
94 network_properties: NetworkProperties,
96}
97
98#[derive(Default, Clone)]
99struct NetworkProperties {
100 default_network: Option<InterfaceId>,
102 socket_marks: HashMap<InterfaceId, fnet::Marks>,
104 dns_servers: Vec<fnet_name::DnsServer_>,
106}
107
108impl NetworkProperties {
109 fn apply(&mut self, update: PropertyUpdate) -> UpdatesApplied {
110 let mut updates = UpdatesApplied::default();
111
112 if let Some(new_default_network) = update.default_network {
113 updates.default_network = self.handle_default_network_update(new_default_network);
114 }
115
116 if let Some((netid, marks)) = update.socket_marks {
117 updates.socket_marks_network = self.handle_socket_marks_update(netid, marks);
118 }
119
120 if let Some(dns) = update.dns {
121 updates.dns_changed = self.dns_servers != dns;
122 self.dns_servers = dns;
123 }
124
125 updates
126 }
127
128 fn handle_default_network_update(
134 &mut self,
135 new_default_network: Option<InterfaceId>,
136 ) -> Option<Option<InterfaceId>> {
137 if new_default_network == self.default_network {
139 return None;
140 }
141
142 let old_default_network = self.default_network;
143 if let (None, Some(old_default_network_id)) = (new_default_network, old_default_network) {
144 let _: Option<_> = self.socket_marks.remove(&old_default_network_id);
145 }
146 self.default_network = new_default_network;
147 return Some(old_default_network);
148 }
149
150 fn handle_socket_marks_update(
156 &mut self,
157 netid: InterfaceId,
158 marks: fnet::Marks,
159 ) -> Option<InterfaceId> {
160 if self.socket_marks.contains_key(&netid)
163 && self
164 .socket_marks
165 .get(&netid)
166 .and_then(|old_marks| Some(*old_marks == marks))
167 .unwrap_or_default()
168 {
169 return None;
170 }
171
172 let _: Option<_> = self.socket_marks.insert(netid, marks);
173 return Some(netid);
174 }
175
176 fn maybe_respond(
177 &self,
178 network: &NetworkTokenContents,
179 responder: NetworkPropertyResponder,
180 ) -> Option<NetworkPropertyResponder> {
181 let mut updates = Vec::new();
182 updates.add_socket_marks(self, network, &responder);
183 updates.add_dns(self, network, &responder);
184
185 if updates.is_empty() {
186 Some(responder)
187 } else {
188 if let Err(e) = responder.respond(Ok(&updates)) {
189 warn!("Could not send to responder: {e}");
190 }
191 None
192 }
193 }
194}
195
196trait PropertyUpdates {
197 fn add_socket_marks(
198 &mut self,
199 properties: &NetworkProperties,
200 network: &NetworkTokenContents,
201 responder: &NetworkPropertyResponder,
202 );
203 fn add_dns(
204 &mut self,
205 properties: &NetworkProperties,
206 network: &NetworkTokenContents,
207 responder: &NetworkPropertyResponder,
208 );
209}
210
211impl PropertyUpdates for Vec<fnp_properties::PropertyUpdate> {
212 fn add_socket_marks(
213 &mut self,
214 properties: &NetworkProperties,
215 network: &NetworkTokenContents,
216 responder: &NetworkPropertyResponder,
217 ) {
218 if !responder.watched_properties.contains(&fnp_properties::Property::SocketMarks) {
219 return;
220 }
221 match properties.socket_marks.get(&network.interface_id) {
222 Some(marks) => self.push(fnp_properties::PropertyUpdate::SocketMarks(marks.clone())),
223 None => {}
224 }
225 }
226
227 fn add_dns(
228 &mut self,
229 properties: &NetworkProperties,
230 network: &NetworkTokenContents,
231 responder: &NetworkPropertyResponder,
232 ) {
233 if !responder.watched_properties.contains(&fnp_properties::Property::DnsConfiguration) {
234 return;
235 }
236
237 let interface_id = network.interface_id;
238 self.push(fnp_properties::PropertyUpdate::DnsConfiguration(
239 fnp_properties::DnsConfiguration {
240 servers: Some(
241 properties
242 .dns_servers
243 .iter()
244 .filter(|d| {
245 match &d.source {
246 Some(source) => match source {
247 fnet_name::DnsServerSource::StaticSource(_) => true,
248 fnet_name::DnsServerSource::SocketProxy(
249 fnet_name::SocketProxyDnsServerSource {
250 source_interface,
251 ..
252 },
253 )
254 | fnet_name::DnsServerSource::Dhcp(
255 fnet_name::DhcpDnsServerSource { source_interface, .. },
256 )
257 | fnet_name::DnsServerSource::Ndp(
258 fnet_name::NdpDnsServerSource { source_interface, .. },
259 )
260 | fnet_name::DnsServerSource::Dhcpv6(
261 fnet_name::Dhcpv6DnsServerSource {
262 source_interface, ..
263 },
264 ) => match (interface_id, source_interface) {
265 (_, None) => true,
266 (id1, Some(id2)) => id1.get() == *id2,
267 },
268
269 _ => {
270 error!("unhandled DnsServerSource: {source:?}");
271 false
272 }
273 },
274
275 None => true,
277 }
278 })
279 .cloned()
280 .collect::<Vec<_>>(),
281 ),
282 ..Default::default()
283 },
284 ));
285 }
286}
287
288#[derive(Default, Debug)]
289pub struct PropertyUpdate {
290 default_network: Option<Option<InterfaceId>>,
291 socket_marks: Option<(InterfaceId, fnet::Marks)>,
292 dns: Option<Vec<fnet_name::DnsServer_>>,
293}
294
295#[derive(Default, Debug)]
296struct UpdatesApplied {
297 default_network: Option<Option<InterfaceId>>,
299
300 socket_marks_network: Option<InterfaceId>,
302
303 dns_changed: bool,
305}
306
307impl PropertyUpdate {
308 pub fn default_network<N: Into<InterfaceId>>(mut self, network_id: N) -> Self {
309 self.default_network = Some(Some(network_id.into()));
310 self
311 }
312
313 pub fn default_network_lost(mut self) -> Self {
314 self.default_network = Some(None);
315 self
316 }
317
318 pub fn socket_marks<N: Into<InterfaceId>, Marks: Into<fnet::Marks>>(
319 mut self,
320 network_id: N,
321 marks: Marks,
322 ) -> Self {
323 self.socket_marks = Some((network_id.into(), marks.into()));
324 self
325 }
326
327 pub fn dns(mut self, dns_servers: &DnsServers) -> Self {
328 self.dns = Some(dns_servers.consolidated_dns_servers());
329 self
330 }
331
332 pub fn add_network<N: Into<InterfaceId>>(self, _id: N) -> Self {
333 self
335 }
336
337 pub fn remove_network<N: Into<InterfaceId>>(self, _id: N) -> Self {
338 self
340 }
341}
342
343impl NetpolNetworksService {
344 pub async fn handle_network_attributes_request(
345 &mut self,
346 id: ConnectionId,
347 req: Result<fnp_properties::NetworksRequest, fidl::Error>,
348 ) -> Result<(), anyhow::Error> {
349 let req = req.context("network attributes request")?;
350 match req {
351 fnp_properties::NetworksRequest::WatchDefault { responder } => {
352 match self.default_network_responders.entry(id) {
353 std::collections::hash_map::Entry::Occupied(_) => {
354 warn!(
355 "Only one call to fuchsia.net.policy.properties/Networks.WatchDefault \
356 may be active per connection"
357 );
358 responder
359 .control_handle()
360 .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
361 }
362 std::collections::hash_map::Entry::Vacant(vacant_entry) => {
363 let interface_id = if self
364 .generations_by_connection
365 .default_network(&id)
366 .unwrap_or_default()
367 < self.current_generation.default_network
368 {
369 match self.network_properties.default_network {
370 Some(interface_id) => Some(interface_id),
371 None => None,
372 }
373 } else {
374 None
375 };
376 if let Some(interface_id) = interface_id {
377 self.generations_by_connection
378 .set_default_network(id, self.current_generation);
379 let token = self
380 .tokens
381 .insert_data(NetworkTokenContents {
382 connection_id: id,
383 interface_id,
384 })
385 .await;
386 responder.send(
387 fnp_properties::NetworksWatchDefaultResponse::Network(
388 fnp_properties::NetworkToken {
389 value: Some(token),
390 ..Default::default()
391 },
392 ),
393 )?;
394
395 if let Some(responder) = self.property_responders.remove(&id) {
396 let _: Option<_> = self.generations_by_connection.remove(&id);
397 let _: Result<(), fidl::Error> = responder.respond(Err(
398 fnp_properties::WatchError::DefaultNetworkChanged,
399 ));
400 }
401 } else {
402 let _: &mut _ = vacant_entry.insert(responder);
403 }
404 }
405 }
406 }
407 fnp_properties::NetworksRequest::WatchProperties {
408 payload: fnp_properties::NetworksWatchPropertiesRequest { network, properties, .. },
409 responder,
410 } => match (network, properties) {
411 (None, _) | (_, None) => {
412 responder.send(Err(fnp_properties::WatchError::MissingRequiredArgument))?
413 }
414 (Some(network), Some(properties)) => {
415 if properties.is_empty() {
416 responder.send(Err(fnp_properties::WatchError::NoProperties))?;
417 } else if let Some(token) = network.value {
418 match self.property_responders.entry(id) {
419 std::collections::hash_map::Entry::Occupied(_) => {
420 warn!(
421 "Only one call to \
422 fuchsia.net.policy.properties/Networks.WatchProperties may be \
423 active per connection"
424 );
425 responder
426 .control_handle()
427 .shutdown_with_epitaph(zx::Status::CONNECTION_ABORTED)
428 }
429 std::collections::hash_map::Entry::Vacant(vacant_entry) => {
430 match self.tokens.get(&token).await {
431 None => {
432 warn!("Unknown network token. ({token:?}");
433 responder.send(Err(
434 fnp_properties::WatchError::InvalidNetworkToken,
435 ))?;
436 }
437 Some(network_contents) => {
438 if network_contents.connection_id != id {
439 warn!(
440 "Cannot watch a NetworkToken that was not created \
441 by this connection."
442 );
443 responder.send(Err(
444 fnp_properties::WatchError::InvalidNetworkToken,
445 ))?;
446 } else {
447 let responder = NetworkPropertyResponder {
448 token,
449 watched_properties: properties,
450 responder,
451 };
452 if self
453 .generations_by_connection
454 .properties(&id)
455 .unwrap_or_default()
456 < self.current_generation.properties
457 {
458 self.generations_by_connection
459 .set_properties(id, self.current_generation);
460 if let Some(responder) = self
461 .network_properties
462 .maybe_respond(&network_contents, responder)
463 {
464 let _: &mut NetworkPropertyResponder =
465 vacant_entry.insert(responder);
466 }
467 } else {
468 let _: &mut NetworkPropertyResponder =
469 vacant_entry.insert(responder);
470 }
471 }
472 }
473 }
474 }
475 }
476 } else {
477 responder.send(Err(fnp_properties::WatchError::InvalidNetworkToken))?;
478 }
479 }
480 },
481 _ => {
482 warn!("Received unexpected request {req:?}");
483 }
484 }
485
486 Ok(())
487 }
488
489 pub async fn handle_delegated_networks_update(
490 &mut self,
491 update: Result<fnp_socketproxy::NetworkRegistryRequest, fidl::Error>,
492 ) -> Result<(), anyhow::Error> {
493 use fnp_socketproxy::{
494 NetworkInfo, NetworkRegistryAddError, NetworkRegistryRemoveError,
495 NetworkRegistryRequest, NetworkRegistrySetDefaultError, NetworkRegistryUpdateError,
496 };
497
498 match update {
499 Err(e) => {
500 error!(
501 "Encountered error watching for delegated network \
502 updates: {e:?}"
503 );
504 Ok(())
505 }
506 Ok(NetworkRegistryRequest::SetDefault { network_id, responder }) => responder.send(
507 (async || match network_id {
508 fposix_socket::OptionalUint32::Value(interface_id) => {
511 self.update(
512 PropertyUpdate::default().default_network(
513 InterfaceId::try_from(interface_id)
514 .map_err(|_| NetworkRegistrySetDefaultError::NotFound)?,
515 ),
516 )
517 .await;
518 Ok(())
519 }
520 fposix_socket::OptionalUint32::Unset(_) => {
521 self.update(PropertyUpdate::default().default_network_lost()).await;
522 Ok(())
523 }
524 })()
525 .await,
526 ),
527 Ok(NetworkRegistryRequest::Add { network, responder }) => responder.send(
528 (async || {
529 let network_id: InterfaceId = network
530 .network_id
531 .and_then(|id| id.try_into().ok())
532 .ok_or(NetworkRegistryAddError::MissingNetworkId)?;
533 let NetworkInfo::Starnix(info) =
534 network.info.ok_or(NetworkRegistryAddError::MissingNetworkInfo)?
535 else {
536 return Err(NetworkRegistryAddError::MissingNetworkInfo);
537 };
538 self.update(PropertyUpdate::default().add_network(network_id).socket_marks(
539 network_id,
540 fnet::Marks { mark_1: info.mark, mark_2: None, ..Default::default() },
543 ))
544 .await;
545 Ok(())
546 })()
547 .await,
548 ),
549 Ok(NetworkRegistryRequest::Update { network, responder }) => responder.send(
550 (async || {
551 let network_id: InterfaceId = network
552 .network_id
553 .and_then(|id| id.try_into().ok())
554 .ok_or(NetworkRegistryUpdateError::MissingNetworkId)?;
555 let NetworkInfo::Starnix(info) =
556 network.info.ok_or(NetworkRegistryUpdateError::MissingNetworkInfo)?
557 else {
558 return Err(NetworkRegistryUpdateError::MissingNetworkInfo);
559 };
560 self.update(PropertyUpdate::default().add_network(network_id).socket_marks(
561 network_id,
562 fnet::Marks { mark_1: info.mark, mark_2: None, ..Default::default() },
565 ))
566 .await;
567 Ok(())
568 })()
569 .await,
570 ),
571 Ok(NetworkRegistryRequest::Remove { network_id, responder }) => responder.send(
572 (async || {
573 self.update(
574 PropertyUpdate::default().remove_network(
575 InterfaceId::try_from(network_id)
579 .map_err(|_| NetworkRegistryRemoveError::NotFound)?,
580 ),
581 )
582 .await;
583 Ok(())
584 })()
585 .await,
586 ),
587 }
588 .context("while handling DelegatedNetwork request")
589 }
590
591 async fn changed_default_network(
592 error: fnp_properties::WatchError,
593 responders: &mut HashMap<ConnectionId, NetworkPropertyResponder>,
594 generations: &mut UpdateGenerations,
595 ) {
596 let mut r = HashMap::new();
597 std::mem::swap(&mut r, responders);
598 r = r
599 .into_iter()
600 .filter_map(|(id, responder)| {
601 let _: Option<_> = generations.remove(&id);
603 let _: Result<(), fidl::Error> = responder.respond(Err(error));
604 None
605 })
606 .collect::<HashMap<_, _>>();
607 std::mem::swap(&mut r, responders);
608 }
609
610 pub(crate) async fn remove_network<ID: Into<InterfaceId>>(&mut self, interface_id: ID) {
611 let interface_id = interface_id.into();
612 info!("Removing interface {interface_id}. Reporting NETWORK_GONE to all clients.");
613 let mut responders = HashMap::new();
614 std::mem::swap(&mut self.property_responders, &mut responders);
615 for (id, responder) in responders {
616 let network = match self.tokens.get(&responder.token).await {
617 Some(network) => network,
618 None => {
619 warn!("Could not fetch network data for responder");
620 continue;
621 }
622 };
623 if network.interface_id == interface_id {
624 if let Err(e) = responder.respond(Err(fnp_properties::WatchError::NetworkGone)) {
626 warn!("Could not send to responder: {e}");
627 }
628 } else {
629 if self.property_responders.insert(id, responder).is_some() {
630 error!("Re-inserted in an existing responder slot. This should be impossible.");
631 }
632 }
633 }
634 }
635
636 pub async fn update(&mut self, update: PropertyUpdate) {
637 self.current_generation.properties += 1;
638 let updates_applied = self.network_properties.apply(update);
639 let mut property_responders = HashMap::new();
640 std::mem::swap(&mut self.property_responders, &mut property_responders);
641
642 if updates_applied.default_network.is_some() {
643 if let Some(default_network) = self.network_properties.default_network {
644 self.current_generation.default_network += 1;
645 let mut responders = HashMap::new();
646 std::mem::swap(&mut self.default_network_responders, &mut responders);
647 for (id, responder) in responders {
648 self.generations_by_connection.set_default_network(id, self.current_generation);
649
650 let token = self
651 .tokens
652 .insert_data(NetworkTokenContents {
653 connection_id: id,
654 interface_id: default_network,
655 })
656 .await;
657
658 if let Err(e) =
659 responder.send(fnp_properties::NetworksWatchDefaultResponse::Network(
660 fnp_properties::NetworkToken {
661 value: Some(token),
662 ..Default::default()
663 },
664 ))
665 {
666 warn!("Could not send to responder: {e}");
667 }
668 }
669
670 NetpolNetworksService::changed_default_network(
671 fnp_properties::WatchError::DefaultNetworkChanged,
672 &mut property_responders,
673 &mut self.generations_by_connection,
674 )
675 .await;
676 } else {
677 self.current_generation.default_network += 1;
679 let mut responders = HashMap::new();
680 std::mem::swap(&mut self.default_network_responders, &mut responders);
681 for (id, responder) in responders {
682 self.generations_by_connection.set_default_network(id, self.current_generation);
683 if let Err(e) = responder.send(
684 fnp_properties::NetworksWatchDefaultResponse::NoDefaultNetwork(
685 fnp_properties::Empty,
686 ),
687 ) {
688 warn!("Could not send to responder: {e}");
689 }
690 }
691
692 NetpolNetworksService::changed_default_network(
693 fnp_properties::WatchError::DefaultNetworkLost,
694 &mut property_responders,
695 &mut self.generations_by_connection,
696 )
697 .await;
698 }
699 }
700
701 for (id, responder) in property_responders {
702 let mut updates = Vec::new();
703 let network = match self.tokens.get(&responder.token).await {
704 Some(network) => network,
705 None => {
706 warn!("Could not fetch network data for responder");
707 continue;
708 }
709 };
710
711 if let Some(network_id) = updates_applied.socket_marks_network {
712 if network.interface_id == network_id {
713 updates.add_socket_marks(&self.network_properties, &network, &responder);
714 }
715 }
716 if updates_applied.dns_changed {
717 updates.add_dns(&self.network_properties, &network, &responder);
718 }
719
720 self.generations_by_connection.set_properties(id, self.current_generation);
721 if updates.is_empty() {
722 if self.property_responders.insert(id, responder).is_some() {
723 warn!("Re-inserted in an existing responder slot. This should be impossible.");
724 }
725 } else {
726 if let Err(e) = responder.respond(Ok(&updates)) {
727 warn!("Could not send to responder: {e}");
728 }
729 }
730 }
731 }
732}
733
734#[derive(Default)]
735pub struct NetworksRequestStreams {
736 next_id: ConnectionId,
737 request_streams:
738 futures::stream::SelectAll<Tagged<ConnectionId, fnp_properties::NetworksRequestStream>>,
739}
740
741impl NetworksRequestStreams {
742 pub fn push(&mut self, stream: fnp_properties::NetworksRequestStream) {
743 self.request_streams.push(stream.tagged(self.next_id));
744 self.next_id.0 += 1;
745 }
746}
747
748impl futures::Stream for NetworksRequestStreams {
749 type Item = (ConnectionId, Result<fnp_properties::NetworksRequest, fidl::Error>);
750
751 fn poll_next(
752 mut self: std::pin::Pin<&mut Self>,
753 cx: &mut std::task::Context<'_>,
754 ) -> std::task::Poll<Option<Self::Item>> {
755 std::pin::Pin::new(&mut self.request_streams).poll_next(cx)
756 }
757}
758
759impl futures::stream::FusedStream for NetworksRequestStreams {
760 fn is_terminated(&self) -> bool {
761 self.request_streams.is_terminated()
762 }
763}