use async_utils::hanging_get::client::HangingGetStream;
use core::pin::Pin;
use core::task::{Context, Poll};
use fidl_fuchsia_bluetooth_sys as sys;
use fuchsia_bluetooth::types::{Address, HostInfo};
use fuchsia_bluetooth::Error;
use futures::ready;
use futures::stream::{FusedStream, Stream, StreamExt};
use tracing::trace;
/// Events yielded by the `HostWatcher` stream when the active host changes.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum HostEvent {
/// The active host is the same (same id) but its discoverable state changed to the
/// contained value.
Discoverable(bool),
/// A host became active where none was recorded before, or the active host's id changed.
/// `discoverable` is the discoverable state of the new active host.
NewActiveHost { discoverable: bool },
/// An active host was recorded, and the latest update contains no active host.
NotAvailable,
}
/// Termination state of the `HostWatcher` stream.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
enum Terminated {
/// The stream is live; polling it polls the underlying host updates.
Active,
/// An error from the underlying update stream has been returned to the caller. The next
/// poll yields `None` and moves the stream to `Done`.
ShuttingDown,
/// The stream has finished. `is_terminated` reports `true` and polling again panics.
Done,
}
/// Tracks the currently active Bluetooth host and, as a `Stream`, yields a `HostEvent`
/// whenever an update changes the active host.
pub struct HostWatcher {
/// Hanging-get stream of host lists obtained through `sys::HostWatcherProxy::watch`.
host_updates: HangingGetStream<sys::HostWatcherProxy, Vec<sys::HostInfo>>,
/// The most recently recorded active host, or `None` if there is none.
active_host: Option<HostInfo>,
/// Termination state of the stream.
terminated: Terminated,
}
impl HostWatcher {
    /// Creates a watcher that receives host updates through `host_watcher`.
    ///
    /// The returned watcher starts with no active host recorded.
    pub fn new(host_watcher: sys::HostWatcherProxy) -> Self {
        let host_updates =
            HangingGetStream::new_with_fn_ptr(host_watcher, sys::HostWatcherProxy::watch);
        Self { host_updates, active_host: None, terminated: Terminated::Active }
    }

    /// Test-only constructor that seeds the watcher with `active_host`.
    #[cfg(test)]
    fn new_with_state(host_watcher: sys::HostWatcherProxy, active_host: Option<HostInfo>) -> Self {
        let mut this = Self::new(host_watcher);
        this.active_host = active_host;
        this
    }

    /// Overwrites the recorded active host with `host`. No event is produced.
    pub fn set_active_host(&mut self, host: HostInfo) {
        self.active_host = Some(host);
    }

    /// Compares the recorded active host against `new` and returns the event describing the
    /// difference, or `None` if nothing relevant changed.
    ///
    /// Only the host id and the discoverable flag are compared; changes to any other field
    /// produce no event.
    fn compare(&self, new: &Option<HostInfo>) -> Option<HostEvent> {
        trace!("Current ({:?}) - New ({:?})", self.active_host, new);
        match (&self.active_host, new) {
            // No active host before, one now.
            (None, Some(info)) => {
                Some(HostEvent::NewActiveHost { discoverable: info.discoverable })
            }
            // The active host went away.
            (Some(_), None) => Some(HostEvent::NotAvailable),
            // A different host became active. Checked before the discoverable comparison so
            // that an id change always reports `NewActiveHost`.
            (Some(current_info), Some(new_info)) if current_info.id != new_info.id => {
                Some(HostEvent::NewActiveHost { discoverable: new_info.discoverable })
            }
            // Same host, different discoverable state.
            (Some(current_info), Some(new_info))
                if current_info.discoverable != new_info.discoverable =>
            {
                Some(HostEvent::Discoverable(new_info.discoverable))
            }
            // Either still no active host, or the same host with no relevant change.
            _ => None,
        }
    }

    /// Processes a host list received from the hanging-get stream.
    ///
    /// The first entry whose `active` field is `Some(true)` is treated as the active host.
    /// The recorded active host is replaced with it (or cleared if there is none) and the
    /// resulting event, if any, is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the active entry cannot be converted into a `HostInfo`. In that
    /// case the recorded active host is left unchanged.
    fn handle_host_watcher_update(
        &mut self,
        update: Vec<sys::HostInfo>,
    ) -> Result<Option<HostEvent>, Error> {
        let maybe_active = update
            .iter()
            .find(|info| info.active.unwrap_or(false))
            .map(HostInfo::try_from)
            .transpose()?;

        // Compute the event against the old state before overwriting it.
        let event = self.compare(&maybe_active);
        self.active_host = maybe_active;
        Ok(event)
    }

    /// Returns a copy of all addresses of the active host, or `None` if there is no active host.
    pub fn addresses(&self) -> Option<Vec<Address>> {
        self.active_host.as_ref().map(|host| host.addresses.clone())
    }

    /// Returns the first public address of the active host, or `None` if there is no active
    /// host or it has no public address.
    pub fn public_address(&self) -> Option<Address> {
        self.active_host.as_ref().and_then(|host| {
            host.addresses.iter().find(|addr| matches!(addr, Address::Public(_))).copied()
        })
    }

    /// Returns the address to use for BLE: the first random address of the active host if it
    /// has one, otherwise its public address. Returns `None` if neither is available.
    pub fn ble_address(&self) -> Option<Address> {
        // Search the borrowed address list directly instead of cloning the whole vector.
        self.active_host
            .as_ref()
            .and_then(|host| {
                host.addresses.iter().find(|addr| matches!(addr, Address::Random(_))).copied()
            })
            .or_else(|| self.public_address())
    }

    /// Returns the discoverable state of the active host, or `None` if there is no active host.
    pub fn pairing_mode(&self) -> Option<bool> {
        self.active_host.as_ref().map(|h| h.discoverable)
    }

    /// Returns the local name of the active host, or `None` if there is no active host or it
    /// has no local name.
    pub fn local_name(&self) -> Option<String> {
        self.active_host.as_ref().and_then(|h| h.local_name.clone())
    }
}
impl Stream for HostWatcher {
    type Item = Result<HostEvent, Error>;

    /// Polls for the next change to the active host.
    ///
    /// Updates that do not change the active host are consumed silently. An error from the
    /// underlying update stream is returned once; the following poll then yields `None`.
    /// An error while processing an update is returned without ending the stream.
    ///
    /// # Panics
    ///
    /// Panics if polled after the stream has terminated.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.terminated {
            Terminated::Done => panic!("Cannot poll a terminated stream"),
            Terminated::ShuttingDown => {
                // The error was already handed out on the previous poll; finish the stream now.
                self.terminated = Terminated::Done;
                return Poll::Ready(None);
            }
            Terminated::Active => {}
        }

        // Keep draining updates until one produces an item to return or the underlying stream
        // is pending.
        loop {
            let item = match ready!(self.host_updates.poll_next_unpin(cx)) {
                Some(Ok(hosts)) => match self.handle_host_watcher_update(hosts).transpose() {
                    Some(event_or_error) => event_or_error,
                    // Nothing relevant changed; look at the next update.
                    None => continue,
                },
                Some(Err(stream_error)) => {
                    // Return the error now and end the stream on the next poll.
                    self.terminated = Terminated::ShuttingDown;
                    Err(stream_error.into())
                }
                None => {
                    trace!("HostWatcher hanging-get exhausted");
                    self.terminated = Terminated::Done;
                    return Poll::Ready(None);
                }
            };
            return Poll::Ready(Some(item));
        }
    }
}
impl FusedStream for HostWatcher {
    /// Returns `true` once the stream has reached the `Done` state. A stream that is only
    /// shutting down still has its final `None` to deliver and is not yet terminated.
    fn is_terminated(&self) -> bool {
        matches!(self.terminated, Terminated::Done)
    }
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use assert_matches::assert_matches;
use async_utils::PollExt;
use fuchsia_async as fasync;
use fuchsia_bluetooth::types::{example_host, HostId};
use std::pin::pin;
/// Runs `exec` until the server end in `stream` has a request ready and returns its responder.
///
/// Panics if no request is ready, the request is not valid FIDL, or it is not a `Watch` request.
#[track_caller]
fn expect_watch_request(
    exec: &mut fasync::TestExecutor,
    stream: &mut sys::HostWatcherRequestStream,
) -> sys::HostWatcherWatchResponder {
    let mut next_request = pin!(stream.select_next_some());
    let request =
        exec.run_until_stalled(&mut next_request).expect("ready").expect("valid FIDL request");
    request.into_watch().expect("Watch request")
}
/// An update with an empty host list produces no event when no active host was recorded.
#[fuchsia::test]
fn update_with_no_hosts_stream_is_pending() {
    let mut executor = fasync::TestExecutor::new();
    let (proxy, mut requests) =
        fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
    let mut host_watcher = HostWatcher::new(proxy);

    // Nothing has been reported by the server yet.
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");

    // Reply with no hosts at all; the stream should still have nothing to yield.
    let responder = expect_watch_request(&mut executor, &mut requests);
    let _ = responder.send(&[]).unwrap();
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");
}
/// Gaining an active host and then losing it yields `NewActiveHost` followed by `NotAvailable`.
#[fuchsia::test]
fn update_with_active_host_change_yields_items() {
    let mut executor = fasync::TestExecutor::new();
    let (proxy, mut requests) =
        fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
    let mut host_watcher = HostWatcher::new(proxy);
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");

    // Report a single active, non-discoverable host.
    let responder = expect_watch_request(&mut executor, &mut requests);
    let active_host = example_host(HostId(1), true, false);
    let _ = responder.send(&[active_host]).unwrap();
    let event = executor.run_until_stalled(&mut host_watcher.next()).expect("host update ready");
    assert_matches!(event, Some(Ok(HostEvent::NewActiveHost { discoverable: false })));

    // Report that every host is gone.
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");
    let responder = expect_watch_request(&mut executor, &mut requests);
    let _ = responder.send(&[]).unwrap();
    let event = executor.run_until_stalled(&mut host_watcher.next()).expect("host update");
    assert_matches!(event, Some(Ok(HostEvent::NotAvailable)));
}
/// An update that lists only inactive hosts yields `NotAvailable` when a host was active.
#[fuchsia::test]
fn active_to_no_active_host_update_yields_event() {
    let mut executor = fasync::TestExecutor::new();
    let (proxy, mut requests) =
        fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();

    // Start out with host 1 recorded as the active host.
    let initial_host = example_host(HostId(1), true, false);
    let mut host_watcher = HostWatcher::new_with_state(proxy, initial_host.try_into().ok());
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");

    // The update lists two hosts, neither of which is active.
    let responder = expect_watch_request(&mut executor, &mut requests);
    let inactive_a = example_host(HostId(2), false, false);
    let inactive_b = example_host(HostId(3), false, false);
    let _ = responder.send(&[inactive_a, inactive_b]).unwrap();
    let event = executor.run_until_stalled(&mut host_watcher.next()).expect("host update");
    assert_matches!(event, Some(Ok(HostEvent::NotAvailable)));
}
/// Updates that leave the active host's id and discoverable state unchanged yield no event.
#[fuchsia::test]
fn update_with_no_active_host_changes_is_pending() {
    let mut executor = fasync::TestExecutor::new();
    let (proxy, mut requests) =
        fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();

    // Start out with host 1 recorded as the active host.
    let mut active_host = example_host(HostId(1), true, false);
    let mut host_watcher =
        HostWatcher::new_with_state(proxy, active_host.clone().try_into().ok());
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");

    // Adding an inactive host next to the unchanged active host is not reported.
    let responder = expect_watch_request(&mut executor, &mut requests);
    let inactive_host = example_host(HostId(2), false, false);
    let _ = responder.send(&[active_host.clone(), inactive_host]).unwrap();
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");

    // Changing fields other than the id and discoverable state is not reported either.
    let responder = expect_watch_request(&mut executor, &mut requests);
    active_host.discovering = Some(false);
    active_host.local_name = Some("123".to_string());
    let _ = responder.send(&[active_host]).unwrap();
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");
}
/// A discoverable change on the same active host yields a `Discoverable` event.
#[fuchsia::test]
fn update_with_active_host_discoverable_change_yields_item() {
    let mut executor = fasync::TestExecutor::new();
    let (proxy, mut requests) =
        fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();

    // Start out with host 1 recorded as active and not discoverable.
    let mut active_host = example_host(HostId(1), true, false);
    let mut host_watcher =
        HostWatcher::new_with_state(proxy, active_host.clone().try_into().ok());
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");

    // The same host is reported again, now discoverable.
    let responder = expect_watch_request(&mut executor, &mut requests);
    active_host.discoverable = Some(true);
    let _ = responder.send(&[active_host.clone()]).unwrap();
    let event = executor.run_until_stalled(&mut host_watcher.next()).expect("host update");
    assert_matches!(event, Some(Ok(HostEvent::Discoverable(true))));
}
/// A change of the active host's id yields `NewActiveHost` carrying the new host's
/// discoverable state.
#[fuchsia::test]
fn update_with_new_active_host_yields_item() {
    let mut executor = fasync::TestExecutor::new();
    let (proxy, mut requests) =
        fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();

    // Start out with host 1 recorded as the active host.
    let first_host = example_host(HostId(1), true, false);
    let mut host_watcher =
        HostWatcher::new_with_state(proxy, first_host.clone().try_into().ok());
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");

    // Host 2 takes over as the active host; it is not discoverable.
    let responder = expect_watch_request(&mut executor, &mut requests);
    let second_host = example_host(HostId(2), true, false);
    let _ = responder.send(&[second_host]).unwrap();
    let event = executor.run_until_stalled(&mut host_watcher.next()).expect("host update");
    assert_matches!(event, Some(Ok(HostEvent::NewActiveHost { discoverable: false })));

    // Host 3 takes over as the active host; it is discoverable.
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");
    let responder = expect_watch_request(&mut executor, &mut requests);
    let third_host = example_host(HostId(3), true, true);
    let _ = responder.send(&[third_host]).unwrap();
    let event = executor.run_until_stalled(&mut host_watcher.next()).expect("host update");
    assert_matches!(event, Some(Ok(HostEvent::NewActiveHost { discoverable: true })));
}
/// An active host entry that cannot be converted yields an error item without terminating
/// the stream.
#[fuchsia::test]
fn invalidly_formatted_host_returns_error_stream_item() {
    let mut executor = fasync::TestExecutor::new();
    let (proxy, mut requests) =
        fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
    let mut host_watcher = HostWatcher::new(proxy);
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");
    assert!(!host_watcher.is_terminated());

    // Only `id` and `active` are populated on this entry.
    let responder = expect_watch_request(&mut executor, &mut requests);
    let malformed_host =
        sys::HostInfo { id: Some(HostId(12).into()), active: Some(true), ..Default::default() };
    let _ = responder.send(&[malformed_host]).unwrap();

    let outcome =
        executor.run_until_stalled(&mut host_watcher.next()).expect("host watcher update");
    assert_matches!(outcome, Some(Err(_)));
    assert!(!host_watcher.is_terminated());
}
/// Closing the server end yields one error item, then `None`, after which the stream reports
/// itself as terminated.
#[fuchsia::test]
fn closing_fidl_server_terminates_host_watcher() {
    let mut executor = fasync::TestExecutor::new();
    let (proxy, mut requests) =
        fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
    let mut host_watcher = HostWatcher::new(proxy);
    let _ = executor.run_until_stalled(&mut host_watcher.next()).expect_pending("No updates");
    assert!(!host_watcher.is_terminated());

    // Answer the outstanding request, then close the server end.
    let responder = expect_watch_request(&mut executor, &mut requests);
    let _ = responder.send(&[]).unwrap();
    drop(requests);

    // First poll reports the error; the stream is not yet terminated.
    let outcome =
        executor.run_until_stalled(&mut host_watcher.next()).expect("host watcher FIDL error");
    assert_matches!(outcome, Some(Err(_)));
    assert!(!host_watcher.is_terminated());

    // Second poll ends the stream.
    let outcome =
        executor.run_until_stalled(&mut host_watcher.next()).expect("host watcher termination");
    assert_matches!(outcome, None);
    assert!(host_watcher.is_terminated());
}
}