dns_server_watcher/
stream.rs
1use fidl_fuchsia_net_name::{DnsServerWatcherProxy, DnsServer_};
8
9use async_utils::stream::WithTag as _;
10use futures::future::TryFutureExt as _;
11use futures::stream::Stream;
12
/// Identifies where a set of DNS servers came from.
///
/// [`new_dns_server_stream`] pairs one of these values with every item it
/// yields, so a consumer can tell which watcher produced an update.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum DnsServersUpdateSource {
    /// NOTE(review): presumably the statically configured/default server
    /// list; confirm against callers.
    Default,
    /// NOTE(review): presumably servers reported by the netstack; confirm
    /// against callers.
    Netstack,
    /// NOTE(review): presumably servers learned through DHCPv4, with
    /// `interface_id` naming the interface they were learned on; confirm.
    Dhcpv4 { interface_id: u64 },
    /// NOTE(review): presumably servers learned through DHCPv6, with
    /// `interface_id` naming the interface they were learned on; confirm.
    Dhcpv6 { interface_id: u64 },
    /// NOTE(review): presumably servers learned through NDP, with
    /// `interface_id` naming the interface they were learned on; confirm.
    Ndp { interface_id: u64 },
}
22
23pub fn new_dns_server_stream(
26 source: DnsServersUpdateSource,
27 proxy: DnsServerWatcherProxy,
28) -> impl Stream<Item = (DnsServersUpdateSource, Result<Vec<DnsServer_>, fidl::Error>)> {
29 futures::stream::try_unfold(proxy, move |proxy| {
30 proxy.watch_servers().map_ok(move |s| Some((s, proxy)))
31 })
32 .tagged(source)
33}
34
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use fidl_fuchsia_net_name::{
        DnsServerWatcherMarker, DnsServerWatcherRequest, DnsServerWatcherRequestStream,
        DnsServerWatcherWatchServersResponder,
    };

    use fuchsia_async as fasync;
    use futures::lock::Mutex;
    use futures::{FutureExt, StreamExt, TryStreamExt};

    use super::*;
    use crate::test_util::constants::*;

    /// Test double for the server side of `DnsServerWatcher`.
    ///
    /// It holds either queued configurations waiting for a `WatchServers`
    /// request, or a single parked `WatchServers` responder waiting for a
    /// configuration.
    struct MockDnsServerWatcher {
        /// Configurations pushed while no request was parked, oldest first.
        configs: VecDeque<Vec<DnsServer_>>,
        /// Responder of a `WatchServers` request that arrived while `configs`
        /// was empty.
        pending_request: Option<DnsServerWatcherWatchServersResponder>,
    }

    impl MockDnsServerWatcher {
        /// Creates a mock with no queued configurations and no parked request.
        fn new() -> Self {
            Self { pending_request: None, configs: VecDeque::new() }
        }

        /// Delivers `config` to the parked request if there is one, otherwise
        /// queues it for the next `WatchServers` request.
        ///
        /// Panics if replying to the parked request fails.
        fn push_config(&mut self, config: Vec<DnsServer_>) {
            if let Some(responder) = self.pending_request.take() {
                let () = responder.send(&config).expect("Failed to fulfill FIDL request");
            } else {
                self.configs.push_back(config);
            }
        }

        /// Serves `requests` against the shared mock until the request stream
        /// ends or yields an error, which is returned.
        ///
        /// Each `WatchServers` request is answered right away with the oldest
        /// queued configuration, or parked when the queue is empty. Panics if
        /// a request arrives while another one is still parked, or if sending
        /// a reply fails.
        async fn serve(
            watcher: Arc<Mutex<Self>>,
            mut requests: DnsServerWatcherRequestStream,
        ) -> Result<(), fidl::Error> {
            while let Some(request) = requests.try_next().await? {
                match request {
                    DnsServerWatcherRequest::WatchServers { responder } => {
                        let mut mock = watcher.lock().await;
                        assert!(
                            mock.pending_request.is_none(),
                            "No more than 1 pending requests allowed"
                        );

                        match mock.configs.pop_front() {
                            Some(config) => {
                                responder.send(&config).expect("Failed to fulfill FIDL request");
                            }
                            None => mock.pending_request = Some(responder),
                        }
                    }
                }
            }
            Ok(())
        }
    }

    /// Drives `new_dns_server_stream` against the mock watcher. Nothing is
    /// yielded before a configuration is pushed, pushed configurations arrive
    /// in order tagged with the source, and after the server is aborted the
    /// stream yields one error and then ends.
    #[fasync::run_singlethreaded(test)]
    async fn test_dns_server_stream() {
        let mock = Arc::new(Mutex::new(MockDnsServerWatcher::new()));
        let (proxy, request_stream) =
            fidl::endpoints::create_proxy_and_stream::<DnsServerWatcherMarker>();
        let (serve_fut, abort_handle) = futures::future::abortable(MockDnsServerWatcher::serve(
            Arc::clone(&mock),
            request_stream,
        ));

        let client_fut = async move {
            let mut stream = new_dns_server_stream(DnsServersUpdateSource::Netstack, proxy);
            // No configuration has been pushed yet, so polling must not
            // produce an item, however many times it is attempted.
            assert!(stream.next().now_or_never().is_none());
            assert!(stream.next().now_or_never().is_none());
            {
                // Scope the guard so the mock is unlocked again before the
                // stream is awaited below.
                let mut guard = mock.lock().await;
                guard.push_config(vec![ndp_server()]);
                guard.push_config(vec![static_server()]);
            }
            let (tag, update) = stream.next().await.expect("stream ended unexpectedly");
            assert_eq!(tag, DnsServersUpdateSource::Netstack);
            assert_eq!(vec![ndp_server()], update.expect("FIDL error occurred"));

            let (tag, update) = stream.next().await.expect("stream ended unexpectedly");
            assert_eq!(tag, DnsServersUpdateSource::Netstack);
            assert_eq!(vec![static_server()], update.expect("FIDL error occurred"));

            abort_handle.abort();
            // Hand the stream back so it can still be polled once the serving
            // future has finished.
            stream
        };

        let (serve_result, mut stream) = futures::future::join(serve_fut, client_fut).await;
        let _aborted = serve_result.expect_err("Future must've been aborted");
        // With the server aborted, the stream is expected to report exactly
        // one tagged error and then terminate.
        let (tag, update) = stream.next().await.expect("Stream must yield a final value");
        assert_eq!(tag, DnsServersUpdateSource::Netstack);
        let _fidl_error: fidl::Error = update.expect_err("Stream must yield an error");
        assert!(stream.next().await.is_none(), "Stream must end after error");
    }
}