dns_server_watcher/
stream.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! DNS Server watcher stream.
6
7use fidl_fuchsia_net_name::{DnsServerWatcherProxy, DnsServer_};
8
9use async_utils::stream::WithTag as _;
10use futures::future::TryFutureExt as _;
11use futures::stream::Stream;
12
/// Identifies where a set of DNS servers came from.
///
/// [`new_dns_server_stream`] attaches one of these values to every item it
/// yields, so a consumer can tell the originating source of each update.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum DnsServersUpdateSource {
    /// The default source; carries no interface information.
    // NOTE(review): presumably the statically configured servers - confirm
    // against the callers of this type.
    Default,
    /// Updates attributed to the netstack as a whole, not to one interface.
    Netstack,
    /// Updates attributed to DHCPv4; `interface_id` names the interface
    /// involved (inferred from the field name).
    Dhcpv4 { interface_id: u64 },
    /// Updates attributed to DHCPv6; `interface_id` names the interface
    /// involved (inferred from the field name).
    Dhcpv6 { interface_id: u64 },
    /// Updates attributed to NDP; `interface_id` names the interface
    /// involved (inferred from the field name).
    Ndp { interface_id: u64 },
}
22
23/// Returns a `Stream` of [`DnsServerWatcherEvent`]s from watching the server configuration
24/// provided by `proxy`.
25pub fn new_dns_server_stream(
26    source: DnsServersUpdateSource,
27    proxy: DnsServerWatcherProxy,
28) -> impl Stream<Item = (DnsServersUpdateSource, Result<Vec<DnsServer_>, fidl::Error>)> {
29    futures::stream::try_unfold(proxy, move |proxy| {
30        proxy.watch_servers().map_ok(move |s| Some((s, proxy)))
31    })
32    .tagged(source)
33}
34
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use fidl_fuchsia_net_name::{
        DnsServerWatcherMarker, DnsServerWatcherRequest, DnsServerWatcherRequestStream,
        DnsServerWatcherWatchServersResponder,
    };

    use fuchsia_async as fasync;
    use futures::lock::Mutex;
    use futures::{FutureExt, StreamExt, TryStreamExt};

    use super::*;
    use crate::test_util::constants::*;

    /// Test double that answers `WatchServers` requests from a queue of
    /// configurations supplied by the test.
    struct MockDnsServerWatcher {
        /// Configurations pushed by the test that no request has consumed yet.
        configs: VecDeque<Vec<DnsServer_>>,
        /// Responder of a `WatchServers` request that arrived while `configs`
        /// was empty; it is answered by the next `push_config`.
        pending_request: Option<DnsServerWatcherWatchServersResponder>,
    }

    impl MockDnsServerWatcher {
        /// Creates a mock with nothing queued and no outstanding request.
        fn new() -> Self {
            Self { pending_request: None, configs: VecDeque::new() }
        }

        /// Makes `config` available to the client.
        ///
        /// If a request is already parked it is answered right away;
        /// otherwise `config` is queued for a future request.
        fn push_config(&mut self, config: Vec<DnsServer_>) {
            if let Some(responder) = self.pending_request.take() {
                responder.send(&config).expect("Failed to fulfill FIDL request");
            } else {
                self.configs.push_back(config);
            }
        }

        /// Serves `rs` until it ends or yields an error, answering each
        /// `WatchServers` request from the state shared through `watcher`.
        ///
        /// Panics if a request arrives while an earlier one is still parked.
        async fn serve(
            watcher: Arc<Mutex<Self>>,
            rs: DnsServerWatcherRequestStream,
        ) -> Result<(), fidl::Error> {
            rs.try_for_each(move |request| {
                let shared = Arc::clone(&watcher);
                async move {
                    let DnsServerWatcherRequest::WatchServers { responder } = request;
                    let mut mock = shared.lock().await;
                    assert!(
                        mock.pending_request.is_none(),
                        "No more than 1 pending requests allowed"
                    );

                    // Answer from the queue when possible; otherwise park the
                    // responder until the test pushes a configuration.
                    match mock.configs.pop_front() {
                        Some(config) => {
                            responder.send(&config).expect("Failed to fulfill FIDL request");
                        }
                        None => mock.pending_request = Some(responder),
                    }
                    Ok(())
                }
            })
            .await
        }
    }

    /// Checks that the stream stays pending until configurations exist, then
    /// yields them in order tagged with the source, and finally yields one
    /// error and ends once the serving side has gone away.
    #[fasync::run_singlethreaded(test)]
    async fn test_dns_server_stream() {
        let watcher = Arc::new(Mutex::new(MockDnsServerWatcher::new()));
        let (proxy, rs) = fidl::endpoints::create_proxy_and_stream::<DnsServerWatcherMarker>();
        let (serve_fut, abort_handle) =
            futures::future::abortable(MockDnsServerWatcher::serve(Arc::clone(&watcher), rs));

        let client = async move {
            let mut stream = new_dns_server_stream(DnsServersUpdateSource::Netstack, proxy);
            // No configuration has been pushed yet, so nothing can be ready.
            assert!(stream.next().now_or_never().is_none());
            assert!(stream.next().now_or_never().is_none());
            {
                let mut mock = watcher.lock().await;
                mock.push_config(vec![ndp_server()]);
                mock.push_config(vec![static_server()]);
            }

            let (first_source, first) = stream.next().await.expect("stream ended unexpectedly");
            assert_eq!(first_source, DnsServersUpdateSource::Netstack);
            assert_eq!(vec![ndp_server()], first.expect("FIDL error occurred"));

            let (second_source, second) = stream.next().await.expect("stream ended unexpectedly");
            assert_eq!(second_source, DnsServersUpdateSource::Netstack);
            assert_eq!(vec![static_server()], second.expect("FIDL error occurred"));

            // Abort the serving future so join will end.
            abort_handle.abort();
            stream
        };

        let (serve_result, mut stream) = futures::future::join(serve_fut, client).await;
        let _aborted = serve_result.expect_err("Future must've been aborted");

        let (last_source, last) = stream.next().await.expect("Stream must yield a final value");
        assert_eq!(last_source, DnsServersUpdateSource::Netstack);
        let _fidl_error: fidl::Error = last.expect_err("Stream must yield an error");
        assert!(stream.next().await.is_none(), "Stream must end after error");
    }
}
134}