1use std::collections::HashMap;
6use std::pin::Pin;
7
8use {
9 fidl_fuchsia_net as fnet, fidl_fuchsia_net_name as fnet_name, fidl_fuchsia_net_ndp as fnet_ndp,
10 fidl_fuchsia_net_ndp_ext as fnet_ndp_ext,
11};
12
13use anyhow::Context;
14use async_utils::stream::{Tagged, WithTag as _};
15use dns_server_watcher::{DnsServers, DnsServersUpdateSource};
16use fidl::endpoints::{ControlHandle as _, Responder as _};
17use futures::stream::BoxStream;
18use futures::{Stream, StreamExt};
19use log::{error, info, trace, warn};
20use net_types::{Scope, ScopeableAddress};
21use packet_formats::icmp::ndp as packet_formats_ndp;
22
23const DNS_PORT: u16 = 53;
24
25use crate::network;
26
27pub(super) async fn update_servers(
29 lookup_admin: &fnet_name::LookupAdminProxy,
30 dns_servers: &mut DnsServers,
31 dns_server_watch_responders: &mut DnsServerWatchResponders,
32 networks_service: &mut network::NetpolNetworksService,
33 source: DnsServersUpdateSource,
34 servers: Vec<fnet_name::DnsServer_>,
35) {
36 trace!("updating DNS servers obtained from {:?} to {:?}", source, servers);
37
38 let servers_before = dns_servers.consolidated();
39 dns_servers.set_servers_from_source(source, servers);
40 let servers = dns_servers.consolidated();
41 if servers_before == servers {
42 trace!("Update skipped because dns server list has not changed");
43 return;
44 }
45 trace!("updating LookupAdmin with DNS servers = {:?}", servers);
46
47 match lookup_admin.set_dns_servers(&servers).await {
48 Ok(Ok(())) => {}
49 Ok(Err(e)) => warn!("error setting DNS servers: {:?}", zx::Status::from_raw(e)),
50 Err(e) => warn!("error sending set DNS servers request: {:?}", e),
51 }
52
53 dns_server_watch_responders.send(dns_servers.consolidated_dns_servers());
54
55 networks_service.update(network::PropertyUpdate::default().dns(dns_servers)).await;
56}
57
58pub(super) async fn create_rdnss_stream(
62 watcher_provider: &fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
63 source: DnsServersUpdateSource,
64 interface_id: u64,
65) -> Option<
66 Result<
67 impl Stream<Item = (DnsServersUpdateSource, Result<Vec<fnet_name::DnsServer_>, fidl::Error>)>
68 + use<>,
69 fidl::Error,
70 >,
71> {
72 let watcher_result = fnet_ndp_ext::create_watcher_stream(
73 &watcher_provider,
74 &fnet_ndp::RouterAdvertisementOptionWatcherParams {
75 interest_types: Some(vec![
76 packet_formats_ndp::options::NdpOptionType::RecursiveDnsServer.into(),
77 ]),
78 interest_interface_id: Some(interface_id),
79 ..Default::default()
80 },
81 )
82 .await?;
83
84 let watcher = match watcher_result {
87 Ok(res) => res,
88 Err(e) => return Some(Err(e)),
89 };
90
91 Some(Ok(watcher
92 .filter_map(move |entry_res| async move {
93 let entry = match entry_res {
94 Ok(entry) => entry,
95 Err(fnet_ndp_ext::OptionWatchStreamError::Fidl(e)) => {
96 return Some(Err(e));
97 }
98 Err(fnet_ndp_ext::OptionWatchStreamError::Conversion(e)) => {
99 error!("Failed to convert OptionWatchStream item: {e:?}");
102 return None;
103 }
104 };
105 match entry {
106 fnet_ndp_ext::OptionWatchStreamItem::Entry(entry) => {
107 match entry.try_parse_as_rdnss() {
108 fnet_ndp_ext::TryParseAsOptionResult::Parsed(option) => Some(Ok(option
109 .iter_addresses()
110 .into_iter()
111 .map(|addr| fnet_name::DnsServer_ {
112 address: Some(fnet::SocketAddress::Ipv6(fnet::Ipv6SocketAddress {
113 address: fnet::Ipv6Address { addr: addr.ipv6_bytes() },
114 port: DNS_PORT,
115 zone_index: addr
118 .scope()
119 .can_have_zone()
120 .then_some(interface_id)
121 .unwrap_or_default(),
122 })),
123 source: Some(fnet_name::DnsServerSource::Ndp(
124 fnet_name::NdpDnsServerSource {
125 source_interface: Some(interface_id),
126 ..Default::default()
127 },
128 )),
129 ..Default::default()
130 })
131 .collect::<Vec<_>>())),
132 fnet_ndp_ext::TryParseAsOptionResult::OptionTypeMismatch => {
133 error!("Option type provided did not match RDNSS option type");
135 None
136 }
137 fnet_ndp_ext::TryParseAsOptionResult::ParseErr(err) => {
138 warn!("Error while parsing as OptionResult: {err:?}");
140 None
141 }
142 }
143 }
144 fnet_ndp_ext::OptionWatchStreamItem::Dropped(num) => {
145 warn!(
146 "The server dropped ({num}) NDP options \
147 due to the HangingGet falling behind"
148 );
149 None
150 }
151 }
152 })
153 .tagged(source)))
154}
155
156pub(super) async fn add_rdnss_watcher(
157 watcher_provider: &fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
158 interface_id: crate::InterfaceId,
159 watchers: &mut crate::DnsServerWatchers<'_>,
160) -> Result<(), anyhow::Error> {
161 let source = DnsServersUpdateSource::Ndp { interface_id: interface_id.get() };
162
163 let stream = create_rdnss_stream(watcher_provider, source, interface_id.get()).await;
165
166 match stream {
167 Some(result) => {
168 if let Some(o) =
169 watchers.insert(source, result.context("failed to create watcher stream")?.boxed())
170 {
171 let _: Pin<Box<BoxStream<'_, _>>> = o;
172 unreachable!("DNS server watchers must not contain key {:?}", source);
173 }
174 info!("started NDP watcher on host interface (id={interface_id})");
175 }
176 None => {
177 info!(
178 "NDP protocol unavailable: not starting watcher for interface (id={interface_id})"
179 );
180 }
181 }
182 Ok(())
183}
184
185pub(super) async fn remove_rdnss_watcher(
186 lookup_admin: &fnet_name::LookupAdminProxy,
187 dns_servers: &mut DnsServers,
188 dns_server_watch_responders: &mut DnsServerWatchResponders,
189 netpol_networks_service: &mut network::NetpolNetworksService,
190 interface_id: crate::InterfaceId,
191 watchers: &mut crate::DnsServerWatchers<'_>,
192) {
193 let source = DnsServersUpdateSource::Ndp { interface_id: interface_id.get() };
194
195 if let None = watchers.remove(&source) {
196 warn!(
200 "DNS Watcher for key not present; multiple futures stopped NDP \
201 watcher for key {:?}; interface_id={}",
202 source, interface_id
203 );
204 }
205
206 update_servers(
207 lookup_admin,
208 dns_servers,
209 dns_server_watch_responders,
210 netpol_networks_service,
211 source,
212 vec![],
213 )
214 .await
215}
216
217#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
218pub(crate) struct ConnectionId(usize);
219
220#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
221pub(crate) struct UpdateGeneration(usize);
222
223#[derive(Default)]
228pub(crate) struct DnsServerWatchResponders {
229 generation: UpdateGeneration,
232
233 generations: HashMap<ConnectionId, UpdateGeneration>,
235
236 responders: HashMap<ConnectionId, fnet_name::DnsServerWatcherWatchServersResponder>,
238}
239
240impl DnsServerWatchResponders {
241 fn send(&mut self, next_servers: Vec<fnet_name::DnsServer_>) {
242 let responders = std::mem::take(&mut self.responders);
243 self.generation.0 += 1;
244 for (id, responder) in responders {
245 match responder.send(&next_servers) {
246 Ok(()) => {
247 let _: Option<UpdateGeneration> = self.generations.insert(id, self.generation);
248 }
249 Err(e) => warn!("Error responding to DnsServerWatcher request: {e:?}"),
250 }
251 }
252 }
253
254 pub(crate) fn handle_request(
257 &mut self,
258 id: ConnectionId,
259 request: Result<fnet_name::DnsServerWatcherRequest, fidl::Error>,
260 servers: &DnsServers,
261 ) -> Result<(), fidl::Error> {
262 use std::collections::hash_map::Entry;
263 match request {
264 Ok(fnet_name::DnsServerWatcherRequest::WatchServers { responder }) => {
265 match self.responders.entry(id) {
266 Entry::Occupied(_) => {
267 warn!(
268 "Only one call to fuchsia.net.name/DnsServerWatcher.WatchServers \
269 may be active at once"
270 );
271 responder.control_handle().shutdown()
272 }
273 Entry::Vacant(vacant_entry) => {
274 if self.generations.get(&id) < Some(&self.generation) {
277 let _: Option<_> = self.generations.insert(id, self.generation);
278 responder.send(&servers.consolidated_dns_servers())?;
279 } else {
280 let _: &fnet_name::DnsServerWatcherWatchServersResponder =
281 vacant_entry.insert(responder);
282 }
283 }
284 }
285 }
286 Err(e) => {
287 error!("fuchsia.net.name/DnsServerWatcher request error: {:?}", e)
288 }
289 }
290
291 Ok(())
292 }
293}
294
295#[derive(Default)]
298pub(crate) struct DnsServerWatcherRequestStreams {
299 next_id: ConnectionId,
301
302 request_streams:
304 futures::stream::SelectAll<Tagged<ConnectionId, fnet_name::DnsServerWatcherRequestStream>>,
305}
306
307impl DnsServerWatcherRequestStreams {
308 pub fn handle_request_stream(&mut self, req_stream: fnet_name::DnsServerWatcherRequestStream) {
309 self.request_streams.push(req_stream.tagged(self.next_id));
310 self.next_id.0 += 1;
311 }
312}
313
314impl futures::Stream for DnsServerWatcherRequestStreams {
315 type Item = (ConnectionId, Result<fnet_name::DnsServerWatcherRequest, fidl::Error>);
316
317 fn poll_next(
318 mut self: std::pin::Pin<&mut Self>,
319 cx: &mut std::task::Context<'_>,
320 ) -> std::task::Poll<Option<Self::Item>> {
321 std::pin::Pin::new(&mut self.request_streams).poll_next(cx)
322 }
323}
324
325impl futures::stream::FusedStream for DnsServerWatcherRequestStreams {
326 fn is_terminated(&self) -> bool {
327 self.request_streams.is_terminated()
328 }
329}
330
331#[cfg(test)]
332mod tests {
333 use anyhow::anyhow;
334 use fuchsia_component::server::{ServiceFs, ServiceFsDir};
335 use fuchsia_component_test::{
336 Capability, ChildOptions, LocalComponentHandles, RealmBuilder, RealmInstance, Ref, Route,
337 };
338 use futures::channel::mpsc;
339 use futures::{
340 FutureExt as _, SinkExt as _, StreamExt as _, TryFutureExt as _, TryStreamExt as _,
341 };
342 use net_declare::fidl_socket_addr;
343 use pretty_assertions::assert_eq;
344
345 use super::*;
346
347 enum StubbedServices {
348 LookupAdmin(fnet_name::LookupAdminRequestStream),
349 }
350
351 async fn run_lookup_admin(handles: LocalComponentHandles) -> Result<(), anyhow::Error> {
352 let mut fs = ServiceFs::new();
353 let _: &mut ServiceFsDir<'_, _> =
354 fs.dir("svc").add_fidl_service(StubbedServices::LookupAdmin);
355 let _: &mut ServiceFs<_> = fs.serve_connection(handles.outgoing_dir)?;
356
357 fs.for_each_concurrent(0, move |StubbedServices::LookupAdmin(stream)| async move {
358 stream
359 .try_for_each(|request| async move {
360 match request {
361 fidl_fuchsia_net_name::LookupAdminRequest::SetDnsServers { .. } => {
362 }
364 fidl_fuchsia_net_name::LookupAdminRequest::GetDnsServers { .. } => {
365 unimplemented!("Unused in this test")
366 }
367 }
368 Ok(())
369 })
370 .await
371 .context("Failed to serve request stream")
372 .unwrap_or_else(|e| warn!("Error encountered: {:?}", e))
373 })
374 .await;
375
376 Ok(())
377 }
378
379 enum IncomingService {
380 DnsServerWatcher(fnet_name::DnsServerWatcherRequestStream),
381 }
382
383 async fn run_dns_server_watcher(
384 handles: LocalComponentHandles,
385 mut receiver: mpsc::Receiver<(crate::DnsServersUpdateSource, Vec<fnet_name::DnsServer_>)>,
386 ) -> Result<(), anyhow::Error> {
387 let connection = handles.connect_to_protocol()?;
388
389 let mut fs = ServiceFs::new();
390 let _: &mut ServiceFsDir<'_, _> =
391 fs.dir("svc").add_fidl_service(IncomingService::DnsServerWatcher);
392 let _: &mut ServiceFs<_> = fs.serve_connection(handles.outgoing_dir)?;
393
394 let mut dns_server_watcher_incoming_requests = DnsServerWatcherRequestStreams::default();
395 let mut dns_servers = DnsServers::default();
396 let mut dns_server_watch_responders = DnsServerWatchResponders::default();
397 let mut netpol_networks_service = network::NetpolNetworksService::default();
398
399 let mut fs = futures::StreamExt::fuse(fs);
400
401 loop {
402 futures::select! {
403 req_stream = fs.select_next_some() => {
404 match req_stream {
405 IncomingService::DnsServerWatcher(stream) => {
406 dns_server_watcher_incoming_requests.handle_request_stream(stream)
407 }
408 }
409 }
410 req = dns_server_watcher_incoming_requests.select_next_some() => {
411 let (id, req) = req;
412 dns_server_watch_responders.handle_request(
413 id,
414 req,
415 &dns_servers,
416 )?;
417 }
418 update = receiver.select_next_some() => {
419 let (source, servers) = update;
420 update_servers(
421 &connection,
422 &mut dns_servers,
423 &mut dns_server_watch_responders,
424 &mut netpol_networks_service,
425 source,
426 servers,
427 ).await
428 }
429 }
430 }
431 }
432
433 async fn setup_test() -> Result<
434 (RealmInstance, mpsc::Sender<(crate::DnsServersUpdateSource, Vec<fnet_name::DnsServer_>)>),
435 anyhow::Error,
436 > {
437 let (tx, rx) = mpsc::channel(1);
438 let builder = RealmBuilder::new().await?;
439 let admin_server = builder
440 .add_local_child(
441 "lookup_admin",
442 move |handles: LocalComponentHandles| Box::pin(run_lookup_admin(handles)),
443 ChildOptions::new(),
444 )
445 .await?;
446
447 let dns_server_watcher = builder
448 .add_local_child(
449 "dns_server_watcher",
450 {
451 let rx = fuchsia_sync::Mutex::new(Some(rx));
452 move |handles: LocalComponentHandles| {
453 Box::pin(run_dns_server_watcher(
454 handles,
455 rx.lock()
456 .take()
457 .expect("Only one instance of run_dns_server_watcher should exist"),
458 ))
459 }
460 },
461 ChildOptions::new(),
462 )
463 .await?;
464
465 builder
466 .add_route(
467 Route::new()
468 .capability(Capability::protocol::<fnet_name::DnsServerWatcherMarker>())
469 .from(&dns_server_watcher)
470 .to(Ref::parent()),
471 )
472 .await?;
473 builder
474 .add_route(
475 Route::new()
476 .capability(Capability::protocol::<fnet_name::LookupAdminMarker>())
477 .from(&admin_server)
478 .to(&dns_server_watcher),
479 )
480 .await?;
481
482 let realm = builder.build().await?;
483
484 Ok((realm, tx))
485 }
486
487 fn server(address: fidl_fuchsia_net::SocketAddress) -> fnet_name::DnsServer_ {
488 fnet_name::DnsServer_ { address: Some(address), ..fnet_name::DnsServer_::default() }
489 }
490
491 #[fuchsia::test]
492 async fn test_dns_server_watcher() -> Result<(), anyhow::Error> {
493 let (realm, mut tx) = setup_test().await?;
494
495 let watcher1: fnet_name::DnsServerWatcherProxy = realm
496 .root
497 .connect_to_protocol_at_exposed_dir()
498 .context("While connecting to DnsServerWatcher")?;
499 let watcher2: fnet_name::DnsServerWatcherProxy = realm
500 .root
501 .connect_to_protocol_at_exposed_dir()
502 .context("While connecting to DnsServerWatcher")?;
503
504 assert_eq!(watcher1.watch_servers().await?, vec![]);
505 assert_eq!(watcher2.watch_servers().await?, vec![]);
506
507 let mut watcher1_call = watcher1.watch_servers().fuse();
509 futures::select! {
510 _ = watcher1_call => {
511 return Err(
512 anyhow!("WatchServers should not respond here, there have been no updates")
513 );
514 },
515 _ = fuchsia_async::Timer::new(std::time::Duration::from_millis(100)).fuse() => {}
516 }
517
518 let (watch1, watch2, _) = futures::try_join!(
520 watcher1_call.map_err(|e| anyhow::Error::from(e)),
522 watcher2.watch_servers().map_err(|e| anyhow::Error::from(e)),
523 tx.send((
524 DnsServersUpdateSource::Default,
525 vec![server(fidl_socket_addr!("203.0.113.1:1"))],
526 ))
527 .map_err(|e| anyhow::Error::from(e)),
528 )?;
529 assert_eq!(watch1, vec![server(fidl_socket_addr!("203.0.113.1:1")),]);
530 assert_eq!(watch2, vec![server(fidl_socket_addr!("203.0.113.1:1")),]);
531
532 let (watch1, watch2, _) = futures::try_join!(
534 watcher1.watch_servers().map_err(|e| anyhow::Error::from(e)),
535 watcher2.watch_servers().map_err(|e| anyhow::Error::from(e)),
536 tx.send((
537 DnsServersUpdateSource::Dhcpv4 { interface_id: 1 },
538 vec![server(fidl_socket_addr!("203.0.113.1:2")),],
539 ))
540 .map_err(|e| anyhow::Error::from(e)),
541 )?;
542 let expectation = vec![
545 server(fidl_socket_addr!("203.0.113.1:2")),
546 server(fidl_socket_addr!("203.0.113.1:1")),
547 ];
548 assert_eq!(watch1, expectation);
549 assert_eq!(watch2, expectation);
550
551 let (watch1, _) = futures::try_join!(
553 watcher1.watch_servers().map_err(|e| anyhow::Error::from(e)),
554 tx.send((
555 DnsServersUpdateSource::Dhcpv6 { interface_id: 1 },
556 vec![server(fidl_socket_addr!("[2001:db8::]:1")),],
557 ))
558 .map_err(|e| anyhow::Error::from(e)),
559 )?;
560 let expectation = vec![
562 server(fidl_socket_addr!("203.0.113.1:2")),
563 server(fidl_socket_addr!("[2001:db8::]:1")),
564 server(fidl_socket_addr!("203.0.113.1:1")),
565 ];
566 assert_eq!(watch1, expectation);
567
568 tx.send((
572 DnsServersUpdateSource::Default,
573 vec![fnet_name::DnsServer_ {
574 address: Some(fidl_socket_addr!("203.0.113.1:5")),
575 ..fnet_name::DnsServer_::default()
576 }],
577 ))
578 .await?;
579 let (watch1, watch2) = futures::try_join!(
580 watcher1.watch_servers().map_err(|e| anyhow::Error::from(e)),
581 watcher2.watch_servers().map_err(|e| anyhow::Error::from(e)),
582 )?;
583 let expectation = vec![
585 server(fidl_socket_addr!("203.0.113.1:2")),
586 server(fidl_socket_addr!("[2001:db8::]:1")),
587 server(fidl_socket_addr!("203.0.113.1:5")),
588 ];
589 assert_eq!(watch1, expectation);
590
591 assert_eq!(watch2, expectation);
593
594 Ok(())
595 }
596}