1use async_utils::hanging_get::client::HangingGetStream;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use fidl_fuchsia_bluetooth_sys as sys;
9use fuchsia_bluetooth::types::{Address, HostInfo};
10use fuchsia_bluetooth::Error;
11use futures::ready;
12use futures::stream::{FusedStream, Stream, StreamExt};
13use log::trace;
14
/// A change in the active Bluetooth host, as produced by the `HostWatcher` stream.
///
/// Every payload is a plain `bool`, so equality is total and the type derives `Eq` in addition to
/// `PartialEq` (matching the other enum in this file).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HostEvent {
    /// The active host is unchanged, but its discoverable state is now the contained value.
    Discoverable(bool),
    /// A host became active where there was none before, or the active host was replaced by a
    /// host with a different ID. `discoverable` is the discoverable state of the new host.
    NewActiveHost { discoverable: bool },
    /// There was an active host, and now there is none.
    NotAvailable,
}
25
/// Termination state of the `HostWatcher` stream implementation.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
enum Terminated {
    /// The stream is live and can be polled for events.
    Active,
    /// An error was received from the underlying hanging-get stream and has been returned to the
    /// caller; the next poll yields `None` and moves to `Done`.
    ShuttingDown,
    /// The stream has ended. Polling it again panics.
    Done,
}
33
/// Tracks the currently active Bluetooth host and surfaces changes to it as a `Stream` of
/// `HostEvent`s.
pub struct HostWatcher {
    /// Hanging-get stream of host lists obtained through `sys::HostWatcherProxy::watch`.
    host_updates: HangingGetStream<sys::HostWatcherProxy, Vec<sys::HostInfo>>,
    /// The active host from the most recent update (or as explicitly set), if any.
    active_host: Option<HostInfo>,
    /// Termination state consulted by the `Stream` and `FusedStream` implementations.
    terminated: Terminated,
}
45
46impl HostWatcher {
47 pub fn new(host_watcher: sys::HostWatcherProxy) -> Self {
50 let host_updates =
51 HangingGetStream::new_with_fn_ptr(host_watcher, sys::HostWatcherProxy::watch);
52 Self { host_updates, active_host: None, terminated: Terminated::Active }
53 }
54
55 #[cfg(test)]
56 fn new_with_state(host_watcher: sys::HostWatcherProxy, active_host: Option<HostInfo>) -> Self {
57 let mut this = Self::new(host_watcher);
58 this.active_host = active_host;
59 this
60 }
61
62 pub fn set_active_host(&mut self, host: HostInfo) {
63 self.active_host = Some(host);
64 }
65
66 fn compare(&self, new: &Option<HostInfo>) -> Option<HostEvent> {
69 trace!("Current ({:?}) - New ({:?})", self.active_host, new);
70 match (&self.active_host, new) {
71 (None, Some(info)) => {
72 Some(HostEvent::NewActiveHost { discoverable: info.discoverable })
73 }
74 (Some(_), None) => Some(HostEvent::NotAvailable),
75 (Some(current_info), Some(new_info)) if current_info.id != new_info.id => {
76 Some(HostEvent::NewActiveHost { discoverable: new_info.discoverable })
77 }
78 (Some(current_info), Some(new_info))
79 if current_info.discoverable != new_info.discoverable =>
80 {
81 Some(HostEvent::Discoverable(new_info.discoverable))
83 }
84 _ => None, }
86 }
87
88 fn handle_host_watcher_update(
89 &mut self,
90 update: Vec<sys::HostInfo>,
91 ) -> Result<Option<HostEvent>, Error> {
92 let maybe_active = update
93 .iter()
94 .find(|info| info.active.unwrap_or(false))
95 .map(HostInfo::try_from)
96 .transpose()?;
97
98 let event = self.compare(&maybe_active);
99 self.active_host = maybe_active;
100 return Ok(event);
101 }
102
103 pub fn addresses(&self) -> Option<Vec<Address>> {
105 self.active_host.as_ref().map(|host| host.addresses.clone())
106 }
107
108 pub fn public_address(&self) -> Option<Address> {
110 self.active_host
111 .as_ref()
112 .map(|host| {
113 host.addresses.iter().find(|addr| matches!(addr, Address::Public(_))).copied()
114 })
115 .flatten()
116 }
117
118 pub fn ble_address(&self) -> Option<Address> {
120 self.addresses()
121 .map(|addrs| addrs.into_iter().find(|addr| matches!(addr, Address::Random(_))))
122 .flatten()
123 .or_else(|| self.public_address())
124 }
125
126 pub fn pairing_mode(&self) -> Option<bool> {
128 self.active_host.as_ref().map(|h| h.discoverable)
129 }
130
131 pub fn local_name(&self) -> Option<String> {
132 self.active_host.as_ref().map(|h| h.local_name.clone()).flatten()
133 }
134}
135
136impl Stream for HostWatcher {
137 type Item = Result<HostEvent, Error>;
138
139 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
140 match self.terminated {
141 Terminated::Active => {}
142 Terminated::ShuttingDown => {
143 self.terminated = Terminated::Done;
144 return Poll::Ready(None);
145 }
146 Terminated::Done => panic!("Cannot poll a terminated stream"),
147 }
148
149 loop {
152 let result = ready!(self.host_updates.poll_next_unpin(cx));
153
154 let result = match result {
155 Some(Ok(update)) => match self.handle_host_watcher_update(update) {
156 Ok(None) => continue,
157 Ok(Some(request)) => Some(Ok(request)),
158 Err(e) => Some(Err(e)),
159 },
160 Some(Err(e)) => {
161 self.terminated = Terminated::ShuttingDown;
164 Some(Err(e.into()))
165 }
166 None => None,
167 };
168 if result.is_none() {
169 trace!("HostWatcher hanging-get exhausted");
170 self.terminated = Terminated::Done;
171 }
172
173 return Poll::Ready(result);
174 }
175 }
176}
177
178impl FusedStream for HostWatcher {
179 fn is_terminated(&self) -> bool {
180 self.terminated == Terminated::Done
181 }
182}
183
#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use async_utils::PollExt;
    use fuchsia_async as fasync;
    use fuchsia_bluetooth::types::{example_host, HostId};
    use std::pin::pin;

    /// Runs `stream` until stalled and returns the responder of the `Watch` request it produced.
    ///
    /// Panics if no request is ready, the request is not valid FIDL, or it is not a `Watch`.
    #[track_caller]
    fn expect_watch_request(
        exec: &mut fasync::TestExecutor,
        stream: &mut sys::HostWatcherRequestStream,
    ) -> sys::HostWatcherWatchResponder {
        let mut next_request = pin!(stream.select_next_some());
        let request = exec.run_until_stalled(&mut next_request).expect("ready");
        request.expect("valid FIDL request").into_watch().expect("Watch request")
    }

    /// Runs `watcher.next()` until stalled and asserts that no stream item is ready.
    #[track_caller]
    fn expect_no_update(exec: &mut fasync::TestExecutor, watcher: &mut HostWatcher) {
        let _ = exec.run_until_stalled(&mut watcher.next()).expect_pending("No updates");
    }

    /// Runs `watcher.next()` until stalled and returns the ready stream item, panicking with
    /// `msg` if the stream is still pending.
    #[track_caller]
    fn expect_update(
        exec: &mut fasync::TestExecutor,
        watcher: &mut HostWatcher,
        msg: &str,
    ) -> Option<Result<HostEvent, Error>> {
        exec.run_until_stalled(&mut watcher.next()).expect(msg)
    }

    #[fuchsia::test]
    fn update_with_no_hosts_stream_is_pending() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let mut watcher = HostWatcher::new(proxy);

        expect_no_update(&mut exec, &mut watcher);

        // An empty host list has no active host, which matches the initial state.
        let responder = expect_watch_request(&mut exec, &mut server);
        let _ = responder.send(&[]).unwrap();

        expect_no_update(&mut exec, &mut watcher);
    }

    #[fuchsia::test]
    fn update_with_active_host_change_yields_items() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let mut watcher = HostWatcher::new(proxy);

        expect_no_update(&mut exec, &mut watcher);

        // Going from no host to an active host is reported as a new active host.
        let responder = expect_watch_request(&mut exec, &mut server);
        let active_host = example_host(HostId(1), true, false);
        let _ = responder.send(&[active_host]).unwrap();

        let event = expect_update(&mut exec, &mut watcher, "host update ready");
        assert_matches!(event, Some(Ok(HostEvent::NewActiveHost { discoverable: false })));

        // Going from an active host to an empty host list is reported as not available.
        expect_no_update(&mut exec, &mut watcher);
        let responder = expect_watch_request(&mut exec, &mut server);
        let _ = responder.send(&[]).unwrap();

        let event = expect_update(&mut exec, &mut watcher, "host update");
        assert_matches!(event, Some(Ok(HostEvent::NotAvailable)));
    }

    #[fuchsia::test]
    fn active_to_no_active_host_update_yields_event() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let initial_host = example_host(HostId(1), true, false);
        let mut watcher = HostWatcher::new_with_state(proxy, initial_host.try_into().ok());

        expect_no_update(&mut exec, &mut watcher);

        // Hosts are present in the update, but none of them is flagged as active.
        let responder = expect_watch_request(&mut exec, &mut server);
        let inactive_hosts =
            [example_host(HostId(2), false, false), example_host(HostId(3), false, false)];
        let _ = responder.send(&inactive_hosts).unwrap();

        let event = expect_update(&mut exec, &mut watcher, "host update");
        assert_matches!(event, Some(Ok(HostEvent::NotAvailable)));
    }

    #[fuchsia::test]
    fn update_with_no_active_host_changes_is_pending() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let mut active_host = example_host(HostId(1), true, false);
        let mut watcher = HostWatcher::new_with_state(proxy, active_host.clone().try_into().ok());

        expect_no_update(&mut exec, &mut watcher);

        // Adding an inactive host alongside the unchanged active one is not an event.
        let responder = expect_watch_request(&mut exec, &mut server);
        let inactive_host = example_host(HostId(2), false, false);
        let _ = responder.send(&[active_host.clone(), inactive_host]).unwrap();

        expect_no_update(&mut exec, &mut watcher);

        // Changing fields other than the ID and discoverable state is not an event either.
        let responder = expect_watch_request(&mut exec, &mut server);
        active_host.discovering = Some(false);
        active_host.local_name = Some("123".to_string());
        let _ = responder.send(&[active_host]).unwrap();

        expect_no_update(&mut exec, &mut watcher);
    }

    #[fuchsia::test]
    fn update_with_active_host_discoverable_change_yields_item() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let mut active_host = example_host(HostId(1), true, false);
        let mut watcher = HostWatcher::new_with_state(proxy, active_host.clone().try_into().ok());

        expect_no_update(&mut exec, &mut watcher);

        // Same host, now discoverable.
        let responder = expect_watch_request(&mut exec, &mut server);
        active_host.discoverable = Some(true);
        let _ = responder.send(&[active_host.clone()]).unwrap();

        let event = expect_update(&mut exec, &mut watcher, "host update");
        assert_matches!(event, Some(Ok(HostEvent::Discoverable(true))));
    }

    #[fuchsia::test]
    fn update_with_new_active_host_yields_item() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let initial_host = example_host(HostId(1), true, false);
        let mut watcher = HostWatcher::new_with_state(proxy, initial_host.clone().try_into().ok());

        expect_no_update(&mut exec, &mut watcher);

        // A different active host with the same discoverable state is still a new active host.
        let responder = expect_watch_request(&mut exec, &mut server);
        let second_host = example_host(HostId(2), true, false);
        let _ = responder.send(&[second_host]).unwrap();

        let event = expect_update(&mut exec, &mut watcher, "host update");
        assert_matches!(event, Some(Ok(HostEvent::NewActiveHost { discoverable: false })));

        // A different active host whose discoverable state also differs is reported as a new
        // active host carrying the new discoverable state.
        expect_no_update(&mut exec, &mut watcher);
        let responder = expect_watch_request(&mut exec, &mut server);
        let third_host = example_host(HostId(3), true, true);
        let _ = responder.send(&[third_host]).unwrap();

        let event = expect_update(&mut exec, &mut watcher, "host update");
        assert_matches!(event, Some(Ok(HostEvent::NewActiveHost { discoverable: true })));
    }

    #[fuchsia::test]
    fn invalidly_formatted_host_returns_error_stream_item() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let mut watcher = HostWatcher::new(proxy);

        expect_no_update(&mut exec, &mut watcher);
        assert!(!watcher.is_terminated());

        // Only `id` and `active` are populated; every other field is left at its default.
        let responder = expect_watch_request(&mut exec, &mut server);
        let invalid_host =
            sys::HostInfo { id: Some(HostId(12).into()), active: Some(true), ..Default::default() };
        let _ = responder.send(&[invalid_host]).unwrap();

        // The conversion failure is surfaced as an item, and the stream stays alive.
        let item = expect_update(&mut exec, &mut watcher, "host watcher update");
        assert_matches!(item, Some(Err(_)));
        assert!(!watcher.is_terminated());
    }

    #[fuchsia::test]
    fn closing_fidl_server_terminates_host_watcher() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let mut watcher = HostWatcher::new(proxy);

        expect_no_update(&mut exec, &mut watcher);
        assert!(!watcher.is_terminated());

        let responder = expect_watch_request(&mut exec, &mut server);
        let _ = responder.send(&[]).unwrap();

        // The upstream server goes away.
        drop(server);

        // First the error is returned, with the stream not yet reported as terminated.
        let item = expect_update(&mut exec, &mut watcher, "host watcher FIDL error");
        assert_matches!(item, Some(Err(_)));
        assert!(!watcher.is_terminated());

        // The following poll finishes the stream.
        let item = expect_update(&mut exec, &mut watcher, "host watcher termination");
        assert_matches!(item, None);
        assert!(watcher.is_terminated());
    }
}