host_watcher/
lib.rs

// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use async_utils::hanging_get::client::HangingGetStream;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use fidl_fuchsia_bluetooth_sys as sys;
9use fuchsia_bluetooth::types::{Address, HostInfo};
10use fuchsia_bluetooth::Error;
11use futures::ready;
12use futures::stream::{FusedStream, Stream, StreamExt};
13use log::trace;
14
/// Item type returned by `<HostWatcher as Stream>::poll_next`.
///
/// Each variant describes a change in the active Bluetooth host relative to the state that the
/// `HostWatcher` was previously tracking.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HostEvent {
    /// The existing active host changed discoverable state. The payload is the new state.
    Discoverable(bool),
    /// There is a new active host. `discoverable` is the discoverable state of that host.
    NewActiveHost { discoverable: bool },
    /// There is no active host.
    NotAvailable,
}
25
/// Tracks how far along the `HostWatcher` stream is towards termination.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Terminated {
    /// The stream is live and forwards updates from the upstream watcher.
    Active,
    /// An upstream error has been yielded; the next poll completes the stream.
    ShuttingDown,
    /// The stream has completed and must not be polled again.
    Done,
}
33
34/// Watches for changes in state in the active Bluetooth Host of the system.
35/// `HostWatcher` implements Stream. The `HostWatcher` _must_ be polled to receive updates about
36/// the currently active host.
37pub struct HostWatcher {
38    /// Hanging-get client wrapper to watch for changes in Host state.
39    host_updates: HangingGetStream<sys::HostWatcherProxy, Vec<sys::HostInfo>>,
40    /// Information about the currently active Host, or None if there is no such Host.
41    active_host: Option<HostInfo>,
42    /// Termination status of the `host_updates` watcher.
43    terminated: Terminated,
44}
45
46impl HostWatcher {
47    /// Returns a HostWatcher that watches for changes in state of the currently active Bluetooth
48    /// Host.
49    pub fn new(host_watcher: sys::HostWatcherProxy) -> Self {
50        let host_updates =
51            HangingGetStream::new_with_fn_ptr(host_watcher, sys::HostWatcherProxy::watch);
52        Self { host_updates, active_host: None, terminated: Terminated::Active }
53    }
54
55    #[cfg(test)]
56    fn new_with_state(host_watcher: sys::HostWatcherProxy, active_host: Option<HostInfo>) -> Self {
57        let mut this = Self::new(host_watcher);
58        this.active_host = active_host;
59        this
60    }
61
62    pub fn set_active_host(&mut self, host: HostInfo) {
63        self.active_host = Some(host);
64    }
65
66    // Compares the `new` host state to the current and returns a HostEvent if the relevant state
67    // has changed.
68    fn compare(&self, new: &Option<HostInfo>) -> Option<HostEvent> {
69        trace!("Current ({:?}) - New ({:?})", self.active_host, new);
70        match (&self.active_host, new) {
71            (None, Some(info)) => {
72                Some(HostEvent::NewActiveHost { discoverable: info.discoverable })
73            }
74            (Some(_), None) => Some(HostEvent::NotAvailable),
75            (Some(current_info), Some(new_info)) if current_info.id != new_info.id => {
76                Some(HostEvent::NewActiveHost { discoverable: new_info.discoverable })
77            }
78            (Some(current_info), Some(new_info))
79                if current_info.discoverable != new_info.discoverable =>
80            {
81                // The host discoverable state changed.
82                Some(HostEvent::Discoverable(new_info.discoverable))
83            }
84            _ => None, // Otherwise, there was no change in host availability or state.
85        }
86    }
87
88    fn handle_host_watcher_update(
89        &mut self,
90        update: Vec<sys::HostInfo>,
91    ) -> Result<Option<HostEvent>, Error> {
92        let maybe_active = update
93            .iter()
94            .find(|info| info.active.unwrap_or(false))
95            .map(HostInfo::try_from)
96            .transpose()?;
97
98        let event = self.compare(&maybe_active);
99        self.active_host = maybe_active;
100        return Ok(event);
101    }
102
103    /// Returns all the known addresses of the active Host, or None if not set.
104    pub fn addresses(&self) -> Option<Vec<Address>> {
105        self.active_host.as_ref().map(|host| host.addresses.clone())
106    }
107
108    /// Returns the public address of the active Host, or None if not set.
109    pub fn public_address(&self) -> Option<Address> {
110        self.active_host
111            .as_ref()
112            .map(|host| {
113                host.addresses.iter().find(|addr| matches!(addr, Address::Public(_))).copied()
114            })
115            .flatten()
116    }
117
118    /// Returns the BLE address of the active Host, or None if not set.
119    pub fn ble_address(&self) -> Option<Address> {
120        self.addresses()
121            .map(|addrs| addrs.into_iter().find(|addr| matches!(addr, Address::Random(_))))
122            .flatten()
123            .or_else(|| self.public_address())
124    }
125
126    /// Returns the current discoverable state of the active Host, or None if not set.
127    pub fn pairing_mode(&self) -> Option<bool> {
128        self.active_host.as_ref().map(|h| h.discoverable)
129    }
130
131    pub fn local_name(&self) -> Option<String> {
132        self.active_host.as_ref().map(|h| h.local_name.clone()).flatten()
133    }
134}
135
136impl Stream for HostWatcher {
137    type Item = Result<HostEvent, Error>;
138
139    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
140        match self.terminated {
141            Terminated::Active => {}
142            Terminated::ShuttingDown => {
143                self.terminated = Terminated::Done;
144                return Poll::Ready(None);
145            }
146            Terminated::Done => panic!("Cannot poll a terminated stream"),
147        }
148
149        // Keep polling the request stream until it produces a request that should be returned or it
150        // produces Poll::Pending.
151        loop {
152            let result = ready!(self.host_updates.poll_next_unpin(cx));
153
154            let result = match result {
155                Some(Ok(update)) => match self.handle_host_watcher_update(update) {
156                    Ok(None) => continue,
157                    Ok(Some(request)) => Some(Ok(request)),
158                    Err(e) => Some(Err(e)),
159                },
160                Some(Err(e)) => {
161                    // FIDL errors are typically irrecoverable - return the Error and stage stream
162                    // for termination. The next time it is polled, it will complete.
163                    self.terminated = Terminated::ShuttingDown;
164                    Some(Err(e.into()))
165                }
166                None => None,
167            };
168            if result.is_none() {
169                trace!("HostWatcher hanging-get exhausted");
170                self.terminated = Terminated::Done;
171            }
172
173            return Poll::Ready(result);
174        }
175    }
176}
177
178impl FusedStream for HostWatcher {
179    fn is_terminated(&self) -> bool {
180        self.terminated == Terminated::Done
181    }
182}
183
#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use async_utils::PollExt;
    use fuchsia_async as fasync;
    use fuchsia_bluetooth::types::{example_host, HostId};
    use std::pin::pin;

    /// Runs the fake server until the next `Watch` request is available and returns its
    /// responder. Panics if no valid `Watch` request is ready.
    #[track_caller]
    fn expect_watch_request(
        exec: &mut fasync::TestExecutor,
        stream: &mut sys::HostWatcherRequestStream,
    ) -> sys::HostWatcherWatchResponder {
        let mut next_request = pin!(stream.select_next_some());
        let request = exec.run_until_stalled(&mut next_request).expect("ready");
        request.expect("valid FIDL request").into_watch().expect("Watch request")
    }

    /// Polls `watcher` once and panics unless it has no item to yield.
    #[track_caller]
    fn expect_no_update(exec: &mut fasync::TestExecutor, watcher: &mut HostWatcher) {
        let _ = exec.run_until_stalled(&mut watcher.next()).expect_pending("No updates");
    }

    /// Polls `watcher` once and returns the item it yields, panicking with `msg` if the stream
    /// is still pending.
    #[track_caller]
    fn expect_item(
        exec: &mut fasync::TestExecutor,
        watcher: &mut HostWatcher,
        msg: &str,
    ) -> Option<Result<HostEvent, Error>> {
        exec.run_until_stalled(&mut watcher.next()).expect(msg)
    }

    #[fuchsia::test]
    fn update_with_no_hosts_stream_is_pending() {
        let mut exec = fasync::TestExecutor::new();

        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let mut watcher = HostWatcher::new(proxy);

        expect_no_update(&mut exec, &mut watcher);

        // The upstream server reports an empty set of hosts.
        let responder = expect_watch_request(&mut exec, &mut server);
        let _ = responder.send(&[]).unwrap();

        // The watcher started without an active host and still has none, so there is nothing to
        // report.
        expect_no_update(&mut exec, &mut watcher);
    }

    #[fuchsia::test]
    fn update_with_active_host_change_yields_items() {
        let mut exec = fasync::TestExecutor::new();

        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let mut watcher = HostWatcher::new(proxy);

        expect_no_update(&mut exec, &mut watcher);

        // The upstream server reports a single active, non-discoverable host.
        let responder = expect_watch_request(&mut exec, &mut server);
        let active_host =
            example_host(HostId(1), /* active= */ true, /* discoverable= */ false);
        let _ = responder.send(&[active_host]).unwrap();

        // Going from no host to an active host is reported.
        let item = expect_item(&mut exec, &mut watcher, "host update ready");
        assert_matches!(item, Some(Ok(HostEvent::NewActiveHost { discoverable: false })));

        // Polling again issues the next hanging-get request, which is answered with no hosts.
        expect_no_update(&mut exec, &mut watcher);
        let responder = expect_watch_request(&mut exec, &mut server);
        let _ = responder.send(&[]).unwrap();

        // Losing the active host is reported.
        let item = expect_item(&mut exec, &mut watcher, "host update");
        assert_matches!(item, Some(Ok(HostEvent::NotAvailable)));
    }

    #[fuchsia::test]
    fn active_to_no_active_host_update_yields_event() {
        let mut exec = fasync::TestExecutor::new();

        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        // The watcher begins with an active, non-discoverable host already tracked.
        let initial_host =
            example_host(HostId(1), /* active= */ true, /* discoverable= */ false);
        let mut watcher = HostWatcher::new_with_state(proxy, initial_host.try_into().ok());
        expect_no_update(&mut exec, &mut watcher);

        // The upstream server reports two hosts, neither of which is active.
        let responder = expect_watch_request(&mut exec, &mut server);
        let inactive_a =
            example_host(HostId(2), /* active= */ false, /* discoverable= */ false);
        let inactive_b =
            example_host(HostId(3), /* active= */ false, /* discoverable= */ false);
        let _ = responder.send(&[inactive_a, inactive_b]).unwrap();

        // The transition from an active host to no active host is reported.
        let item = expect_item(&mut exec, &mut watcher, "host update");
        assert_matches!(item, Some(Ok(HostEvent::NotAvailable)));
    }

    #[fuchsia::test]
    fn update_with_no_active_host_changes_is_pending() {
        let mut exec = fasync::TestExecutor::new();

        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        // The watcher begins with an active, non-discoverable host already tracked.
        let mut active_host =
            example_host(HostId(1), /* active= */ true, /* discoverable= */ false);
        let mut watcher = HostWatcher::new_with_state(proxy, active_host.clone().try_into().ok());
        expect_no_update(&mut exec, &mut watcher);

        // The upstream server reports the same active host plus an unrelated inactive one.
        let responder = expect_watch_request(&mut exec, &mut server);
        let inactive_host =
            example_host(HostId(2), /* active= */ false, /* discoverable= */ false);
        let _ = responder.send(&[active_host.clone(), inactive_host]).unwrap();

        // The active host is unchanged, so nothing is reported.
        expect_no_update(&mut exec, &mut watcher);

        // The active host changes, but only in fields the watcher does not report on.
        let responder = expect_watch_request(&mut exec, &mut server);
        active_host.discovering = Some(false);
        active_host.local_name = Some("123".to_string());
        let _ = responder.send(&[active_host]).unwrap();

        // The discoverable state of the active host is unchanged, so nothing is reported.
        expect_no_update(&mut exec, &mut watcher);
    }

    #[fuchsia::test]
    fn update_with_active_host_discoverable_change_yields_item() {
        let mut exec = fasync::TestExecutor::new();

        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        // The watcher begins with an active, non-discoverable host already tracked.
        let mut active_host =
            example_host(HostId(1), /* active= */ true, /* discoverable= */ false);
        let mut watcher = HostWatcher::new_with_state(proxy, active_host.clone().try_into().ok());
        expect_no_update(&mut exec, &mut watcher);

        // The upstream server reports that the same host is now discoverable.
        let responder = expect_watch_request(&mut exec, &mut server);
        active_host.discoverable = Some(true);
        let _ = responder.send(&[active_host]).unwrap();

        let item = expect_item(&mut exec, &mut watcher, "host update");
        assert_matches!(item, Some(Ok(HostEvent::Discoverable(true))));
    }

    #[fuchsia::test]
    fn update_with_new_active_host_yields_item() {
        let mut exec = fasync::TestExecutor::new();

        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        // The watcher begins with an active, non-discoverable host already tracked.
        let initial_host =
            example_host(HostId(1), /* active= */ true, /* discoverable= */ false);
        let mut watcher = HostWatcher::new_with_state(proxy, initial_host.try_into().ok());
        expect_no_update(&mut exec, &mut watcher);

        // A different host, also non-discoverable, becomes the active one.
        let responder = expect_watch_request(&mut exec, &mut server);
        let second_host =
            example_host(HostId(2), /* active= */ true, /* discoverable= */ false);
        let _ = responder.send(&[second_host]).unwrap();

        let item = expect_item(&mut exec, &mut watcher, "host update");
        assert_matches!(item, Some(Ok(HostEvent::NewActiveHost { discoverable: false })));

        // Yet another host, this time discoverable, becomes the active one.
        expect_no_update(&mut exec, &mut watcher);
        let responder = expect_watch_request(&mut exec, &mut server);
        let third_host =
            example_host(HostId(3), /* active= */ true, /* discoverable= */ true);
        let _ = responder.send(&[third_host]).unwrap();

        let item = expect_item(&mut exec, &mut watcher, "host update");
        assert_matches!(item, Some(Ok(HostEvent::NewActiveHost { discoverable: true })));
    }

    #[fuchsia::test]
    fn invalidly_formatted_host_returns_error_stream_item() {
        let mut exec = fasync::TestExecutor::new();

        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let mut watcher = HostWatcher::new(proxy);
        expect_no_update(&mut exec, &mut watcher);
        assert!(!watcher.is_terminated());

        // The upstream server reports an active host that lacks mandatory fields.
        let responder = expect_watch_request(&mut exec, &mut server);
        let malformed_host =
            sys::HostInfo { id: Some(HostId(12).into()), active: Some(true), ..Default::default() };
        let _ = responder.send(&[malformed_host]).unwrap();

        // The conversion failure is surfaced as an error item, but the stream stays alive.
        let item = expect_item(&mut exec, &mut watcher, "host watcher update");
        assert_matches!(item, Some(Err(_)));
        assert!(!watcher.is_terminated());
    }

    #[fuchsia::test]
    fn closing_fidl_server_terminates_host_watcher() {
        let mut exec = fasync::TestExecutor::new();

        let (proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<sys::HostWatcherMarker>();
        let mut watcher = HostWatcher::new(proxy);
        expect_no_update(&mut exec, &mut watcher);
        assert!(!watcher.is_terminated());

        let responder = expect_watch_request(&mut exec, &mut server);
        let _ = responder.send(&[]).unwrap();

        // The upstream `HostWatcher` protocol server disconnects. The watcher is expected to
        // first yield the resulting FIDL error and then, on the following poll, complete.
        drop(server);

        let item = expect_item(&mut exec, &mut watcher, "host watcher FIDL error");
        assert_matches!(item, Some(Err(_)));
        assert!(!watcher.is_terminated());

        let item = expect_item(&mut exec, &mut watcher, "host watcher termination");
        assert_matches!(item, None);
        assert!(watcher.is_terminated());
    }
}
413}