Skip to main content

dns_server_watcher/
stream.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! DNS Server watcher stream.
6
7use fidl_fuchsia_net_name::{DnsServer_, DnsServerWatcherProxy};
8
9use async_utils::stream::WithTag as _;
10use futures::future::TryFutureExt as _;
11use futures::stream::Stream;
12
/// The origin of a set of DNS server updates.
///
/// [`new_dns_server_stream`] pairs every item it yields with the source value
/// it was given.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum DnsServersUpdateSource {
    // NOTE(review): the per-variant descriptions below are inferred from the
    // variant names only; confirm them against the code that constructs them.
    /// Presumably the default (statically configured) servers.
    Default,
    /// Presumably servers reported by the netstack.
    Netstack,
    /// Presumably servers learned through DHCPv4 on interface `interface_id`.
    Dhcpv4 { interface_id: u64 },
    /// Presumably servers learned through DHCPv6 on interface `interface_id`.
    Dhcpv6 { interface_id: u64 },
    /// Presumably servers learned through NDP on interface `interface_id`.
    Ndp { interface_id: u64 },
    /// Presumably servers provided by the socket proxy.
    SocketProxy,
}
23
24/// Returns a `Stream` of [`DnsServerWatcherEvent`]s from watching the server configuration
25/// provided by `proxy`.
26pub fn new_dns_server_stream(
27    source: DnsServersUpdateSource,
28    proxy: DnsServerWatcherProxy,
29) -> impl Stream<Item = (DnsServersUpdateSource, Result<Vec<DnsServer_>, fidl::Error>)> {
30    futures::stream::try_unfold(proxy, move |proxy| {
31        proxy.watch_servers().map_ok(move |s| Some((s, proxy)))
32    })
33    .tagged(source)
34}
35
36#[cfg(test)]
37mod tests {
38    use std::collections::VecDeque;
39    use std::sync::Arc;
40
41    use fidl_fuchsia_net_name::{
42        DnsServerWatcherMarker, DnsServerWatcherRequest, DnsServerWatcherRequestStream,
43        DnsServerWatcherWatchServersResponder,
44    };
45
46    use fuchsia_async as fasync;
47    use futures::lock::Mutex;
48    use futures::{FutureExt, StreamExt, TryStreamExt};
49
50    use super::*;
51    use crate::test_util::constants::*;
52
53    struct MockDnsServerWatcher {
54        configs: VecDeque<Vec<DnsServer_>>,
55        pending_request: Option<DnsServerWatcherWatchServersResponder>,
56    }
57
58    impl MockDnsServerWatcher {
59        fn new() -> Self {
60            Self { configs: VecDeque::new(), pending_request: None }
61        }
62
63        fn push_config(&mut self, config: Vec<DnsServer_>) {
64            match self.pending_request.take() {
65                Some(req) => {
66                    req.send(&config).expect("Failed to fulfill FIDL request");
67                }
68                None => self.configs.push_back(config),
69            }
70        }
71
72        async fn serve(
73            watcher: Arc<Mutex<Self>>,
74            rs: DnsServerWatcherRequestStream,
75        ) -> Result<(), fidl::Error> {
76            rs.try_for_each(move |r| {
77                let watcher = watcher.clone();
78                async move {
79                    match r {
80                        DnsServerWatcherRequest::WatchServers { responder } => {
81                            let mut w = watcher.lock().await;
82                            if w.pending_request.is_some() {
83                                panic!("No more than 1 pending requests allowed");
84                            }
85
86                            if let Some(config) = w.configs.pop_front() {
87                                responder.send(&config).expect("Failed to fulfill FIDL request");
88                            } else {
89                                w.pending_request = Some(responder)
90                            }
91                        }
92                    }
93                    Ok(())
94                }
95            })
96            .await
97        }
98    }
99
100    #[fasync::run_singlethreaded(test)]
101    async fn test_dns_server_stream() {
102        let watcher = Arc::new(Mutex::new(MockDnsServerWatcher::new()));
103        let (proxy, rs) = fidl::endpoints::create_proxy_and_stream::<DnsServerWatcherMarker>();
104        let (serve_fut, abort_handle) =
105            futures::future::abortable(MockDnsServerWatcher::serve(watcher.clone(), rs));
106
107        let (serve_result, mut stream) = futures::future::join(serve_fut, async move {
108            let mut stream = new_dns_server_stream(DnsServersUpdateSource::Netstack, proxy);
109            assert!(stream.next().now_or_never().is_none());
110            assert!(stream.next().now_or_never().is_none());
111            {
112                let mut w = watcher.lock().await;
113                w.push_config(vec![ndp_server()]);
114                w.push_config(vec![static_server()]);
115            }
116            let (source, res) = stream.next().await.expect("stream ended unexpectedly");
117            assert_eq!(source, DnsServersUpdateSource::Netstack);
118            assert_eq!(vec![ndp_server()], res.expect("FIDL error occurred"));
119
120            let (source, res) = stream.next().await.expect("stream ended unexpectedly");
121            assert_eq!(source, DnsServersUpdateSource::Netstack);
122            assert_eq!(vec![static_server()], res.expect("FIDL error occurred"));
123
124            // Abort the serving future so join will end.
125            abort_handle.abort();
126            stream
127        })
128        .await;
129        let _aborted = serve_result.expect_err("Future must've been aborted");
130        let (source, res) = stream.next().await.expect("Stream must yield a final value");
131        assert_eq!(source, DnsServersUpdateSource::Netstack);
132        let _fidl_error: fidl::Error = res.expect_err("Stream must yield an error");
133        assert!(stream.next().await.is_none(), "Stream must end after error");
134    }
135}