dns_server_watcher/
stream.rsuse fidl_fuchsia_net_name::{DnsServerWatcherProxy, DnsServer_};
use async_utils::stream::WithTag as _;
use futures::future::TryFutureExt as _;
use futures::stream::Stream;
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum DnsServersUpdateSource {
Default,
Netstack,
Dhcpv4 { interface_id: u64 },
Dhcpv6 { interface_id: u64 },
}
pub fn new_dns_server_stream(
source: DnsServersUpdateSource,
proxy: DnsServerWatcherProxy,
) -> impl Stream<Item = (DnsServersUpdateSource, Result<Vec<DnsServer_>, fidl::Error>)> {
futures::stream::try_unfold(proxy, move |proxy| {
proxy.watch_servers().map_ok(move |s| Some((s, proxy)))
})
.tagged(source)
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use std::sync::Arc;
use fidl_fuchsia_net_name::{
DnsServerWatcherMarker, DnsServerWatcherRequest, DnsServerWatcherRequestStream,
DnsServerWatcherWatchServersResponder,
};
use fuchsia_async as fasync;
use futures::lock::Mutex;
use futures::{FutureExt, StreamExt, TryStreamExt};
use super::*;
use crate::test_util::constants::*;
struct MockDnsServerWatcher {
configs: VecDeque<Vec<DnsServer_>>,
pending_request: Option<DnsServerWatcherWatchServersResponder>,
}
impl MockDnsServerWatcher {
fn new() -> Self {
Self { configs: VecDeque::new(), pending_request: None }
}
fn push_config(&mut self, config: Vec<DnsServer_>) {
match self.pending_request.take() {
Some(req) => {
let () = req.send(&config).expect("Failed to fulfill FIDL request");
}
None => self.configs.push_back(config),
}
}
async fn serve(
watcher: Arc<Mutex<Self>>,
rs: DnsServerWatcherRequestStream,
) -> Result<(), fidl::Error> {
rs.try_for_each(move |r| {
let watcher = watcher.clone();
async move {
match r {
DnsServerWatcherRequest::WatchServers { responder } => {
let mut w = watcher.lock().await;
if w.pending_request.is_some() {
panic!("No more than 1 pending requests allowed");
}
if let Some(config) = w.configs.pop_front() {
responder.send(&config).expect("Failed to fulfill FIDL request");
} else {
w.pending_request = Some(responder)
}
}
}
Ok(())
}
})
.await
}
}
#[fasync::run_singlethreaded(test)]
async fn test_dns_server_stream() {
let watcher = Arc::new(Mutex::new(MockDnsServerWatcher::new()));
let (proxy, rs) = fidl::endpoints::create_proxy_and_stream::<DnsServerWatcherMarker>();
let (serve_fut, abort_handle) =
futures::future::abortable(MockDnsServerWatcher::serve(watcher.clone(), rs));
let (serve_result, mut stream) = futures::future::join(serve_fut, async move {
let mut stream = new_dns_server_stream(DnsServersUpdateSource::Netstack, proxy);
assert!(stream.next().now_or_never().is_none());
assert!(stream.next().now_or_never().is_none());
{
let mut w = watcher.lock().await;
w.push_config(vec![ndp_server()]);
w.push_config(vec![static_server()]);
}
let (source, res) = stream.next().await.expect("stream ended unexpectedly");
assert_eq!(source, DnsServersUpdateSource::Netstack);
assert_eq!(vec![ndp_server()], res.expect("FIDL error occurred"));
let (source, res) = stream.next().await.expect("stream ended unexpectedly");
assert_eq!(source, DnsServersUpdateSource::Netstack);
assert_eq!(vec![static_server()], res.expect("FIDL error occurred"));
abort_handle.abort();
stream
})
.await;
let _aborted = serve_result.expect_err("Future must've been aborted");
let (source, res) = stream.next().await.expect("Stream must yield a final value");
assert_eq!(source, DnsServersUpdateSource::Netstack);
let _fidl_error: fidl::Error = res.expect_err("Stream must yield an error");
assert!(stream.next().await.is_none(), "Stream must end after error");
}
}