dns_server_watcher/
stream.rs1use fidl_fuchsia_net_name::{DnsServer_, DnsServerWatcherProxy};
8
9use async_utils::stream::WithTag as _;
10use futures::future::TryFutureExt as _;
11use futures::stream::Stream;
12
/// Identifies where a batch of DNS servers came from.
///
/// [`new_dns_server_stream`] pairs every item it yields with one of these
/// values so that a consumer can tell the origin of each update.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum DnsServersUpdateSource {
    /// NOTE(review): presumably the statically configured default servers;
    /// confirm against the code that produces this variant.
    Default,
    /// NOTE(review): presumably servers reported by the netstack; confirm
    /// against the code that produces this variant.
    Netstack,
    /// Update attributed to DHCPv4, qualified by an interface identifier.
    Dhcpv4 {
        /// Identifier of the interface the update is associated with.
        interface_id: u64,
    },
    /// Update attributed to DHCPv6, qualified by an interface identifier.
    Dhcpv6 {
        /// Identifier of the interface the update is associated with.
        interface_id: u64,
    },
    /// Update attributed to NDP, qualified by an interface identifier.
    Ndp {
        /// Identifier of the interface the update is associated with.
        interface_id: u64,
    },
    /// NOTE(review): presumably servers supplied by a socket proxy component;
    /// confirm against the code that produces this variant.
    SocketProxy,
}
23
24pub fn new_dns_server_stream(
27 source: DnsServersUpdateSource,
28 proxy: DnsServerWatcherProxy,
29) -> impl Stream<Item = (DnsServersUpdateSource, Result<Vec<DnsServer_>, fidl::Error>)> {
30 futures::stream::try_unfold(proxy, move |proxy| {
31 proxy.watch_servers().map_ok(move |s| Some((s, proxy)))
32 })
33 .tagged(source)
34}
35
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use fidl_fuchsia_net_name::{
        DnsServerWatcherMarker, DnsServerWatcherRequest, DnsServerWatcherRequestStream,
        DnsServerWatcherWatchServersResponder,
    };

    use fuchsia_async as fasync;
    use futures::lock::Mutex;
    use futures::{FutureExt, StreamExt, TryStreamExt};

    use super::*;
    use crate::test_util::constants::*;

    /// Test double that answers `WatchServers` requests from a queue of
    /// configurations supplied by the test.
    struct MockDnsServerWatcher {
        /// Configurations pushed while no request was waiting, oldest first.
        configs: VecDeque<Vec<DnsServer_>>,
        /// Responder of the request that arrived while `configs` was empty.
        pending_request: Option<DnsServerWatcherWatchServersResponder>,
    }

    impl MockDnsServerWatcher {
        /// Creates a mock with nothing queued and no request waiting.
        fn new() -> Self {
            Self { configs: VecDeque::new(), pending_request: None }
        }

        /// Makes `config` available to the client.
        ///
        /// If a request is currently parked it is answered right away;
        /// otherwise the configuration is queued for a later request.
        fn push_config(&mut self, config: Vec<DnsServer_>) {
            if let Some(responder) = self.pending_request.take() {
                responder.send(&config).expect("Failed to fulfill FIDL request");
            } else {
                self.configs.push_back(config);
            }
        }

        /// Handles every request arriving on `rs` using the shared `watcher`
        /// state, until the request stream ends or yields an error.
        ///
        /// Panics if a request arrives while another one is still parked.
        async fn serve(
            watcher: Arc<Mutex<Self>>,
            rs: DnsServerWatcherRequestStream,
        ) -> Result<(), fidl::Error> {
            rs.try_for_each(move |request| {
                let watcher = Arc::clone(&watcher);
                async move {
                    // `WatchServers` is the only request this mock handles.
                    let DnsServerWatcherRequest::WatchServers { responder } = request;

                    let mut state = watcher.lock().await;
                    assert!(
                        state.pending_request.is_none(),
                        "No more than 1 pending requests allowed"
                    );

                    // Answer from the queue when possible; otherwise park the
                    // responder until `push_config` is called.
                    match state.configs.pop_front() {
                        Some(config) => {
                            responder.send(&config).expect("Failed to fulfill FIDL request")
                        }
                        None => state.pending_request = Some(responder),
                    }
                    Ok(())
                }
            })
            .await
        }
    }

    /// Checks that the stream stays pending without data, yields pushed
    /// configurations in order tagged with the source, and ends after the
    /// first error once the mock server has been aborted.
    #[fasync::run_singlethreaded(test)]
    async fn test_dns_server_stream() {
        let watcher = Arc::new(Mutex::new(MockDnsServerWatcher::new()));
        let (proxy, rs) = fidl::endpoints::create_proxy_and_stream::<DnsServerWatcherMarker>();
        let (serve_fut, abort_handle) =
            futures::future::abortable(MockDnsServerWatcher::serve(watcher.clone(), rs));

        let client_fut = async move {
            let mut stream = new_dns_server_stream(DnsServersUpdateSource::Netstack, proxy);

            // Nothing has been pushed yet, so polling must not produce an item,
            // no matter how often it is repeated.
            for _ in 0..2 {
                assert!(stream.next().now_or_never().is_none());
            }

            {
                let mut mock = watcher.lock().await;
                mock.push_config(vec![ndp_server()]);
                mock.push_config(vec![static_server()]);
            }

            let (first_source, first_result) =
                stream.next().await.expect("stream ended unexpectedly");
            assert_eq!(first_source, DnsServersUpdateSource::Netstack);
            assert_eq!(vec![ndp_server()], first_result.expect("FIDL error occurred"));

            let (second_source, second_result) =
                stream.next().await.expect("stream ended unexpectedly");
            assert_eq!(second_source, DnsServersUpdateSource::Netstack);
            assert_eq!(vec![static_server()], second_result.expect("FIDL error occurred"));

            abort_handle.abort();
            // Hand the stream back so its behavior after the abort can be
            // checked once both futures have finished.
            stream
        };

        let (serve_result, mut stream) = futures::future::join(serve_fut, client_fut).await;
        let _aborted = serve_result.expect_err("Future must've been aborted");

        let (source, res) = stream.next().await.expect("Stream must yield a final value");
        assert_eq!(source, DnsServersUpdateSource::Netstack);
        let _fidl_error: fidl::Error = res.expect_err("Stream must yield an error");
        assert!(stream.next().await.is_none(), "Stream must end after error");
    }
}