Skip to main content

fuchsia_bt_test_affordances/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::anyhow;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_bluetooth::{DeviceClass, PeerId, Uuid};
8
9use fidl_fuchsia_bluetooth_gatt2::{Characteristic, ServiceHandle, ServiceInfo};
10use fidl_fuchsia_bluetooth_le::{AdvertisingParameters, ConnectionMarker};
11use fidl_fuchsia_bluetooth_sys::{
12    HostInfo, InputCapability, OutputCapability, PairingOptions, Peer,
13};
14use fuchsia_async::LocalExecutor;
15use fuchsia_bluetooth::types::Channel;
16
17use fuchsia_sync::Mutex;
18use futures::StreamExt;
19use futures::channel::{mpsc, oneshot};
20use std::ffi::{CStr, CString};
21use std::sync::Arc;
22use std::thread;
23
24mod bredr;
25mod gatt;
26mod le;
27mod proxies;
28mod sys;
29
30use proxies::Proxies;
31
32// TODO(b/414848887): Pass more descriptive errors.
33enum Request {
34    GetHosts(oneshot::Sender<Result<Vec<HostInfo>, anyhow::Error>>),
35    GetKnownPeers(oneshot::Sender<Result<Vec<Peer>, anyhow::Error>>),
36    GetPeerId(CString, oneshot::Sender<Result<PeerId, anyhow::Error>>),
37    Connect(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
38    Disconnect(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
39    Pair(PeerId, PairingOptions, oneshot::Sender<Result<(), anyhow::Error>>),
40    StartPairingDelegate(
41        InputCapability,
42        OutputCapability,
43        oneshot::Sender<Result<(), anyhow::Error>>,
44    ),
45    StopPairingDelegate(oneshot::Sender<bool>),
46    Forget(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
47    ConnectL2cap(PeerId, u16, oneshot::Sender<Result<(), anyhow::Error>>),
48    DisconnectL2cap(oneshot::Sender<Result<(), anyhow::Error>>),
49    WriteL2cap(Vec<u8>, oneshot::Sender<Result<(), anyhow::Error>>),
50    SetDiscovery(bool, oneshot::Sender<Result<(), anyhow::Error>>),
51    SetDiscoverability(bool, oneshot::Sender<Result<(), anyhow::Error>>),
52    SetConnectability(bool, oneshot::Sender<Result<(), anyhow::Error>>),
53    SetLocalName(String, oneshot::Sender<Result<(), anyhow::Error>>),
54    SetDeviceClass(DeviceClass, oneshot::Sender<Result<(), anyhow::Error>>),
55    StartLeScan(
56        oneshot::Sender<
57            Result<
58                mpsc::UnboundedReceiver<
59                    Vec<(fidl_fuchsia_bluetooth_le::Peer, Option<fidl_fuchsia_bluetooth::Address>)>,
60                >,
61                anyhow::Error,
62            >,
63        >,
64    ),
65    StopLeScan(oneshot::Sender<bool>),
66    ConnectLe(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
67    AdvertisePeripheral(
68        Box<AdvertisingParameters>,
69        std::time::Duration,
70        oneshot::Sender<Result<Option<PeerId>, anyhow::Error>>,
71    ),
72    PublishService(
73        Uuid,
74        ServiceHandle,
75        Vec<Characteristic>,
76        oneshot::Sender<Result<(), anyhow::Error>>,
77    ),
78    DiscoverServices(oneshot::Sender<Result<Vec<ServiceInfo>, anyhow::Error>>),
79    ReadCharacteristic(
80        ServiceHandle,
81        fidl_fuchsia_bluetooth_gatt2::Handle,
82        oneshot::Sender<Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error>>,
83    ),
84    RegisterCharacteristicNotifier(
85        ServiceHandle,
86        fidl_fuchsia_bluetooth_gatt2::Handle,
87        oneshot::Sender<Result<(), anyhow::Error>>,
88    ),
89    AdvertiseService(
90        u16,
91        std::time::Duration,
92        oneshot::Sender<Result<Option<PeerId>, anyhow::Error>>,
93    ),
94    Stop,
95}
96
// Handle to a dedicated thread that serves `Request`s one at a time. Requests are queued through
// `sender` and answered on the one-shot channel carried by each request.
pub struct WorkThread {
    // Join handle of the thread started by `spawn()`. `join()` takes the handle out of the
    // `Option`, which it can only do through the mutex because it receives `&self`.
    thread_handle: Mutex<Option<thread::JoinHandle<Result<(), anyhow::Error>>>>,
    // Sending half of the request channel; the receiving half is drained by `handle_requests()`.
    sender: mpsc::UnboundedSender<Request>,
}
101
102impl WorkThread {
103    pub fn spawn() -> Self {
104        let (sender, receiver) = mpsc::unbounded::<Request>();
105
106        let thread_handle = thread::spawn(move || {
107            LocalExecutor::default().run_singlethreaded(Self::handle_requests(receiver))?;
108            Ok(())
109        });
110
111        Self { thread_handle: Mutex::new(Some(thread_handle)), sender }
112    }
113
114    async fn handle_requests(
115        mut receiver: mpsc::UnboundedReceiver<Request>,
116    ) -> Result<(), anyhow::Error> {
117        let mut proxies = Proxies::connect()?;
118        let mut host_cache: Vec<HostInfo> = Vec::new();
119        // TODO(https://fxbug.dev/396500079): Consider HashMap<PeerId, Peer> instead.
120        let peer_cache: Arc<Mutex<Vec<Peer>>> = Arc::new(Mutex::new(Vec::new()));
121        // TODO(https://fxbug.dev/452075770): Support multiple L2CAP channels.
122        let mut l2cap_channel: Option<Channel> = None;
123        let mut _peripheral_connection: ClientEnd<ConnectionMarker>;
124
125        while let Some(request) = receiver.next().await {
126            match request {
127                Request::GetHosts(result_sender) => {
128                    if let Err(err) = sys::refresh_host_cache(&mut proxies, &mut host_cache).await {
129                        result_sender
130                            .send(Err(anyhow!("refresh_host_cache() error: {err}")))
131                            .unwrap();
132                        continue;
133                    }
134                    result_sender.send(Ok(host_cache.clone())).unwrap();
135                }
136                Request::GetKnownPeers(result_sender) => {
137                    if let Err(err) = sys::refresh_peer_cache(
138                        &mut proxies,
139                        std::time::Duration::from_millis(10),
140                        peer_cache.clone(),
141                    )
142                    .await
143                    {
144                        result_sender
145                            .send(Err(anyhow!("refresh_peer_cache() error: {err}")))
146                            .unwrap();
147                        continue;
148                    }
149                    result_sender.send(Ok(peer_cache.lock().clone())).unwrap();
150                }
151                Request::GetPeerId(address, result_sender) => {
152                    if let Some(peer) = sys::get_peer(
153                        &mut proxies,
154                        &address,
155                        std::time::Duration::from_secs(2),
156                        peer_cache.clone(),
157                    )
158                    .await?
159                    {
160                        result_sender.send(Ok(peer.id.unwrap())).unwrap();
161                        continue;
162                    }
163                    result_sender.send(Err(anyhow!("Peer not found"))).unwrap();
164                }
165                Request::Forget(peer_id, result_sender) => {
166                    result_sender.send(sys::forget(&proxies, &peer_id).await).unwrap();
167                }
168                Request::Connect(peer_id, result_sender) => {
169                    result_sender.send(sys::connect_peer(&proxies, &peer_id).await).unwrap();
170                }
171                Request::Disconnect(peer_id, result_sender) => {
172                    result_sender.send(sys::disconnect_peer(&proxies, &peer_id).await).unwrap();
173                }
174                Request::Pair(peer_id, options, result_sender) => {
175                    result_sender.send(sys::pair(&proxies, &peer_id, &options).await).unwrap();
176                }
177                Request::StartPairingDelegate(input_cap, output_cap, result_sender) => {
178                    result_sender
179                        .send(sys::start_pairing_delegate(&proxies, &input_cap, &output_cap).await)
180                        .unwrap();
181                }
182                Request::StopPairingDelegate(result_sender) => {
183                    result_sender.send(sys::stop_pairing_delegate(&proxies).await).unwrap();
184                }
185                Request::ConnectL2cap(peer_id, psm, result_sender) => {
186                    match bredr::connect_l2cap(&proxies, &peer_id, psm).await {
187                        Ok(channel) => {
188                            l2cap_channel = Some(channel);
189                            result_sender.send(Ok(())).unwrap();
190                        }
191                        Err(err) => {
192                            result_sender.send(Err(err)).unwrap();
193                        }
194                    }
195                }
196                Request::DisconnectL2cap(result_sender) => {
197                    if let Some(_channel) = l2cap_channel.take() {
198                        println!("L2CAP channel disconnected");
199                    }
200                    result_sender.send(Ok(())).unwrap();
201                }
202                Request::WriteL2cap(data, result_sender) => {
203                    if let Some(ref l2cap_channel) = l2cap_channel {
204                        match l2cap_channel.write(&data) {
205                            Ok(_) => result_sender.send(Ok(())).unwrap(),
206                            Err(err) => result_sender
207                                .send(Err(anyhow!("Failed to write to L2CAP channel: {}", err)))
208                                .unwrap(),
209                        }
210                    } else {
211                        result_sender.send(Err(anyhow!("L2CAP channel not connected"))).unwrap();
212                    }
213                }
214                Request::SetDiscovery(discovery, result_sender) => {
215                    result_sender.send(sys::set_discovery(&mut proxies, discovery).await).unwrap();
216                }
217                Request::SetDiscoverability(discoverable, result_sender) => {
218                    result_sender
219                        .send(sys::set_discoverability(&mut proxies, discoverable).await)
220                        .unwrap();
221                }
222                Request::SetConnectability(connectable, result_sender) => {
223                    result_sender
224                        .send(sys::set_connectability(&proxies, connectable).await)
225                        .unwrap();
226                }
227                Request::SetLocalName(name, result_sender) => {
228                    result_sender.send(sys::set_local_name(&proxies, name)).unwrap();
229                }
230                Request::SetDeviceClass(device_class, result_sender) => {
231                    result_sender.send(sys::set_device_class(&proxies, device_class)).unwrap();
232                }
233                Request::StartLeScan(result_sender) => {
234                    result_sender
235                        .send(le::start_le_scan(&mut proxies, peer_cache.clone()).await)
236                        .unwrap();
237                }
238                Request::StopLeScan(result_sender) => {
239                    result_sender.send(le::stop_le_scan(&proxies)).unwrap();
240                }
241                Request::ConnectLe(peer_id, result_sender) => {
242                    result_sender.send(le::connect_le(&mut proxies, &peer_id).await).unwrap();
243                }
244                Request::AdvertisePeripheral(parameters, timeout, result_sender) => {
245                    match le::advertise_peripheral(&proxies, *parameters, timeout).await {
246                        Ok(Some((peer_id, connection))) => {
247                            _peripheral_connection = connection;
248                            result_sender.send(Ok(Some(peer_id))).unwrap();
249                        }
250                        result => {
251                            result_sender.send(result.map(|_| None)).unwrap();
252                        }
253                    }
254                }
255                Request::PublishService(uuid, service_handle, characteristics, result_sender) => {
256                    match gatt::publish_service(&proxies, uuid, service_handle, characteristics)
257                        .await
258                    {
259                        Ok(mut local_service_request_stream) => {
260                            fuchsia_async::Task::spawn(async move {
261                                while let Some(Ok(request)) =
262                                    local_service_request_stream.next().await
263                                {
264                                    // Just log the request for now.
265                                    println!("Received LocalService request: {:?}", request);
266                                }
267                            })
268                            .detach();
269                            result_sender.send(Ok(())).unwrap();
270                        }
271                        Err(err) => {
272                            result_sender.send(Err(err)).unwrap();
273                        }
274                    }
275                }
276                Request::DiscoverServices(result_sender) => {
277                    result_sender.send(gatt::discover_services(&mut proxies).await).unwrap();
278                }
279                Request::ReadCharacteristic(
280                    service_handle,
281                    characteristic_handle,
282                    result_sender,
283                ) => {
284                    result_sender
285                        .send(
286                            gatt::read_characteristic(
287                                &proxies,
288                                service_handle,
289                                characteristic_handle,
290                            )
291                            .await,
292                        )
293                        .unwrap();
294                }
295                Request::RegisterCharacteristicNotifier(
296                    service_handle,
297                    characteristic_handle,
298                    result_sender,
299                ) => {
300                    result_sender
301                        .send(
302                            gatt::register_characteristic_notifier(
303                                &proxies,
304                                service_handle,
305                                characteristic_handle,
306                            )
307                            .await,
308                        )
309                        .unwrap();
310                }
311                Request::AdvertiseService(psm, timeout, result_sender) => {
312                    match bredr::advertise_service(&proxies, psm).await {
313                        Ok(connection_receiver_stream) => {
314                            result_sender
315                                .send(
316                                    bredr::serve_connection_receiver(
317                                        connection_receiver_stream,
318                                        &mut l2cap_channel,
319                                        timeout,
320                                    )
321                                    .await,
322                                )
323                                .unwrap();
324                        }
325                        Err(err) => {
326                            result_sender.send(Err(err)).unwrap();
327                        }
328                    }
329                }
330                Request::Stop => break,
331            }
332        }
333
334        Ok(())
335    }
336
337    pub fn join(&self) -> Result<(), anyhow::Error> {
338        self.sender.clone().unbounded_send(Request::Stop).unwrap();
339        if let Err(err) =
340            self.thread_handle.lock().take().unwrap().join().expect("Failed to join work thread")
341        {
342            return Err(anyhow!("Work thread exited with error: {err}"));
343        }
344        Ok(())
345    }
346
347    // Get hosts.
348    pub async fn get_hosts(&self) -> Result<Vec<HostInfo>, anyhow::Error> {
349        let (sender, receiver) = oneshot::channel::<Result<Vec<HostInfo>, anyhow::Error>>();
350        self.sender.clone().unbounded_send(Request::GetHosts(sender))?;
351        receiver.await?
352    }
353
354    // Get identifier of peer at `address`.
355    pub async fn get_peer_id(&self, address: &CStr) -> Result<PeerId, anyhow::Error> {
356        let (sender, receiver) = oneshot::channel::<Result<PeerId, anyhow::Error>>();
357        self.sender.clone().unbounded_send(Request::GetPeerId(address.to_owned(), sender))?;
358        receiver.await?
359    }
360
361    pub async fn get_known_peers(&self) -> Result<Vec<Peer>, anyhow::Error> {
362        let (sender, receiver) = oneshot::channel::<Result<Vec<Peer>, anyhow::Error>>();
363        self.sender.clone().unbounded_send(Request::GetKnownPeers(sender))?;
364        receiver.await?
365    }
366
367    // Connect to peer with given identifier.
368    pub async fn connect_peer(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
369        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
370        self.sender.clone().unbounded_send(Request::Connect(peer_id, sender))?;
371        receiver.await?
372    }
373
374    // Disconnect all logical links (BR/EDR & LE) to peer with given identifier.
375    pub async fn disconnect_peer(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
376        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
377        self.sender.clone().unbounded_send(Request::Disconnect(peer_id, sender))?;
378        receiver.await?
379    }
380
381    // Initiate pairing with peer with given identifier.
382    // TODO(b/423700622): Add PairingDelegate server to bt-affordances.
383    pub async fn pair(
384        &self,
385        peer_id: PeerId,
386        options: PairingOptions,
387    ) -> Result<(), anyhow::Error> {
388        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
389        self.sender.clone().unbounded_send(Request::Pair(peer_id, options, sender))?;
390        receiver.await?
391    }
392
393    // Start a pairing delegate server.
394    //
395    // Calling this while a pairing delegate is already active drops and overwrites the existing
396    // delegate.
397    pub async fn start_pairing_delegate(
398        &self,
399        input_cap: InputCapability,
400        output_cap: OutputCapability,
401    ) -> Result<(), anyhow::Error> {
402        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
403        self.sender
404            .clone()
405            .unbounded_send(Request::StartPairingDelegate(input_cap, output_cap, sender))?;
406        receiver.await?
407    }
408
409    // Stop a pairing delegate server if one exists and was started by start_pairing_delegate.
410    // Returns false if no pairing delegate started by start_pairing_delegate is active.
411    pub async fn stop_pairing_delegate(&self) -> bool {
412        let (sender, receiver) = oneshot::channel::<bool>();
413        self.sender.clone().unbounded_send(Request::StopPairingDelegate(sender)).unwrap();
414        receiver.await.unwrap()
415    }
416
417    // Forget peer and delete all bonding information, if peer is found.
418    pub async fn forget_peer(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
419        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
420        self.sender.clone().unbounded_send(Request::Forget(peer_id, sender))?;
421        receiver.await?
422    }
423
424    // Connect a basic L2CAP channel.
425    pub async fn connect_l2cap_channel(
426        &self,
427        peer_id: PeerId,
428        psm: u16,
429    ) -> Result<(), anyhow::Error> {
430        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
431        self.sender.clone().unbounded_send(Request::ConnectL2cap(peer_id, psm, sender))?;
432        receiver.await?
433    }
434
435    // Disconnect an L2CAP channel if one exists.
436    pub async fn disconnect_l2cap(&self) -> Result<(), anyhow::Error> {
437        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
438        self.sender.clone().unbounded_send(Request::DisconnectL2cap(sender))?;
439        receiver.await?
440    }
441
442    // Write data over the L2CAP channel if one exists.
443    pub async fn write_l2cap(&self, data: Vec<u8>) -> Result<(), anyhow::Error> {
444        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
445        self.sender.clone().unbounded_send(Request::WriteL2cap(data, sender))?;
446        receiver.await?
447    }
448
449    // Set discovery state.
450    pub async fn set_discovery(&self, discovery: bool) -> Result<(), anyhow::Error> {
451        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
452        self.sender.clone().unbounded_send(Request::SetDiscovery(discovery, sender))?;
453        receiver.await?
454    }
455
456    // Set discoverability state.
457    pub async fn set_discoverability(&self, discoverable: bool) -> Result<(), anyhow::Error> {
458        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
459        self.sender.clone().unbounded_send(Request::SetDiscoverability(discoverable, sender))?;
460        receiver.await?
461    }
462
463    // Set connection policy.
464    pub async fn set_connectability(&self, connectable: bool) -> Result<(), anyhow::Error> {
465        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
466        self.sender.clone().unbounded_send(Request::SetConnectability(connectable, sender))?;
467        receiver.await?
468    }
469
470    // Set local name.
471    pub async fn set_local_name(&self, name: String) -> Result<(), anyhow::Error> {
472        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
473        self.sender.clone().unbounded_send(Request::SetLocalName(name, sender))?;
474        receiver.await?
475    }
476
477    // Set device class.
478    pub async fn set_device_class(&self, device_class: DeviceClass) -> Result<(), anyhow::Error> {
479        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
480        self.sender.clone().unbounded_send(Request::SetDeviceClass(device_class, sender))?;
481        receiver.await?
482    }
483
484    // Scan for all nearby LE peripherals and broadcasters. Returns the receiving end of an
485    // mpsc::channel through which LE peer updates are written. Dropping the receiver closes the
486    // channel, which stops the scan when the next update is received.
487    //
488    // Calling this while a scan is ongoing drops and overwrites the existing scan.
489    pub async fn start_le_scan(
490        &self,
491    ) -> Result<
492        mpsc::UnboundedReceiver<
493            Vec<(fidl_fuchsia_bluetooth_le::Peer, Option<fidl_fuchsia_bluetooth::Address>)>,
494        >,
495        anyhow::Error,
496    > {
497        let (sender, receiver) = oneshot::channel::<
498            Result<
499                mpsc::UnboundedReceiver<
500                    Vec<(fidl_fuchsia_bluetooth_le::Peer, Option<fidl_fuchsia_bluetooth::Address>)>,
501                >,
502                anyhow::Error,
503            >,
504        >();
505        self.sender.clone().unbounded_send(Request::StartLeScan(sender))?;
506        receiver.await?
507    }
508
509    // Stop an ongoing LE scan. Returns false if no scan is ongoing. If an ongoing scan is stopped,
510    // the mpsc::channel exposed to the client closes (i.e. `receiver.next()` returns None).
511    pub async fn stop_le_scan(&self) -> bool {
512        let (sender, receiver) = oneshot::channel::<bool>();
513        self.sender.clone().unbounded_send(Request::StopLeScan(sender)).unwrap();
514        receiver.await.unwrap()
515    }
516
517    // Connect an LE peer and store the connection.
518    pub async fn connect_le(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
519        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
520        self.sender.clone().unbounded_send(Request::ConnectLe(peer_id, sender))?;
521        receiver.await?
522    }
523
524    // Start advertising as an LE peripheral, accept the first connection, and return the PeerId of
525    // its initiator. If `connectable` is false, then advertise and return None.
526    pub async fn advertise_peripheral(
527        &self,
528        parameters: AdvertisingParameters,
529        timeout: std::time::Duration,
530    ) -> Result<Option<PeerId>, anyhow::Error> {
531        let (sender, receiver) = oneshot::channel::<Result<Option<PeerId>, anyhow::Error>>();
532        self.sender
533            .clone()
534            .unbounded_send(Request::AdvertisePeripheral(Box::new(parameters), timeout, sender))
535            .unwrap();
536        receiver.await?
537    }
538
539    // Publish a GATT service with the given parameters. GATT requests are logged.
540    pub async fn publish_service(
541        &self,
542        uuid: Uuid,
543        service_handle: ServiceHandle,
544        characteristics: Vec<Characteristic>,
545    ) -> Result<(), anyhow::Error> {
546        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
547        self.sender.clone().unbounded_send(Request::PublishService(
548            uuid,
549            service_handle,
550            characteristics,
551            sender,
552        ))?;
553        receiver.await?
554    }
555
556    // Discover the GATT services of the currently connected LE peer.
557    pub async fn discover_services(&self) -> Result<Vec<ServiceInfo>, anyhow::Error> {
558        let (sender, receiver) = oneshot::channel::<Result<Vec<ServiceInfo>, anyhow::Error>>();
559        self.sender.clone().unbounded_send(Request::DiscoverServices(sender))?;
560        receiver.await?
561    }
562
563    // Perform a short read of the GATT characteristic identified with the given handles.
564    pub async fn read_characteristic(
565        &self,
566        service_handle: ServiceHandle,
567        characteristic_handle: fidl_fuchsia_bluetooth_gatt2::Handle,
568    ) -> Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error> {
569        let (sender, receiver) =
570            oneshot::channel::<Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error>>();
571        self.sender.clone().unbounded_send(Request::ReadCharacteristic(
572            service_handle,
573            characteristic_handle,
574            sender,
575        ))?;
576        receiver.await?
577    }
578
579    // Enable notifications/indications on the GATT characteristic with the given handles.
580    //
581    // Only one operation on a Remote Service can be pending at a time.
582    pub async fn register_characteristic_notifier(
583        &self,
584        service_handle: ServiceHandle,
585        characteristic_handle: fidl_fuchsia_bluetooth_gatt2::Handle,
586    ) -> Result<(), anyhow::Error> {
587        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
588        self.sender.clone().unbounded_send(Request::RegisterCharacteristicNotifier(
589            service_handle,
590            characteristic_handle,
591            sender,
592        ))?;
593        receiver.await?
594    }
595
596    // Advertise a BR/EDR service on the given `psm` until the first connection. Return the PeerId
597    // of that connection. If no connection is established before `timeout` elapses, return None.
598    pub async fn advertise_service(
599        &self,
600        psm: u16,
601        timeout: std::time::Duration,
602    ) -> Result<Option<PeerId>, anyhow::Error> {
603        let (sender, receiver) = oneshot::channel::<Result<Option<PeerId>, anyhow::Error>>();
604        self.sender.clone().unbounded_send(Request::AdvertiseService(psm, timeout, sender))?;
605        receiver.await?
606    }
607}
608
#[cfg(test)]
mod tests {
    use super::*;

    // Two updates carrying the same peer ID in a single call must collapse into one cache entry
    // that holds the later update.
    #[fuchsia::test]
    fn test_update_peer_cache_handles_duplicates_in_input() {
        let shared_id = PeerId { value: 1 };
        let first_update = Peer {
            id: Some(shared_id),
            name: Some("Peer 1".to_string()),
            ..Default::default()
        };
        let second_update = Peer {
            id: Some(shared_id),
            name: Some("Peer 2".to_string()),
            ..Default::default()
        };
        let peer_cache = Arc::new(Mutex::new(Vec::new()));

        sys::update_peer_cache(peer_cache.clone(), vec![first_update, second_update], vec![]);

        // Only the final entry for the duplicated ID may remain.
        let cached_peers = peer_cache.lock();
        assert_eq!(cached_peers.len(), 1);
        assert_eq!(cached_peers[0].name.as_deref(), Some("Peer 2"));
    }

    // An update for a peer that is already cached must replace the cached entry rather than
    // add a second one.
    #[fuchsia::test]
    fn test_update_peer_cache_replaces_existing_entry() {
        let cached_peer = Peer {
            id: Some(PeerId { value: 1 }),
            name: Some("Peer".to_string()),
            ..Default::default()
        };
        let peer_cache = Arc::new(Mutex::new(vec![cached_peer.clone()]));

        // Same peer, new name.
        let renamed_peer = Peer { name: Some("Updated peer".to_string()), ..cached_peer };
        sys::update_peer_cache(peer_cache.clone(), vec![renamed_peer], vec![]);

        // Still a single entry, now carrying the updated name.
        let cached_peers = peer_cache.lock();
        assert_eq!(cached_peers.len(), 1);
        assert_eq!(cached_peers[0].name.as_deref(), Some("Updated peer"));
    }
}