Skip to main content

fuchsia_bt_test_affordances/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::anyhow;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_bluetooth::{DeviceClass, HostId, PeerId, Uuid};
8
9use fidl_fuchsia_bluetooth_gatt2::{Characteristic, ServiceHandle, ServiceInfo};
10use fidl_fuchsia_bluetooth_le::{AdvertisingParameters, ConnectionMarker};
11use fidl_fuchsia_bluetooth_sys::{
12    HostInfo, InputCapability, OutputCapability, PairingOptions, Peer,
13};
14use fuchsia_async::LocalExecutor;
15use fuchsia_bluetooth::types::Channel;
16
17use fuchsia_sync::Mutex;
18use futures::StreamExt;
19use futures::channel::{mpsc, oneshot};
20use std::ffi::{CStr, CString};
21use std::sync::Arc;
22use std::thread;
23
24mod bredr;
25mod gatt;
26mod le;
27mod proxies;
28mod sys;
29
30use proxies::Proxies;
31
32// TODO(b/414848887): Pass more descriptive errors.
33enum Request {
34    GetHosts(oneshot::Sender<Result<Vec<HostInfo>, anyhow::Error>>),
35    GetKnownPeers(oneshot::Sender<Result<Vec<Peer>, anyhow::Error>>),
36    GetPeerId(CString, oneshot::Sender<Result<PeerId, anyhow::Error>>),
37    Connect(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
38    Disconnect(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
39    Pair(PeerId, PairingOptions, oneshot::Sender<Result<(), anyhow::Error>>),
40    StartPairingDelegate(
41        InputCapability,
42        OutputCapability,
43        oneshot::Sender<Result<(), anyhow::Error>>,
44    ),
45    StopPairingDelegate(oneshot::Sender<bool>),
46    Forget(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
47    ConnectL2cap(PeerId, u16, oneshot::Sender<Result<(), anyhow::Error>>),
48    DisconnectL2cap(oneshot::Sender<Result<(), anyhow::Error>>),
49    WriteL2cap(Vec<u8>, oneshot::Sender<Result<(), anyhow::Error>>),
50    SetDiscovery(bool, oneshot::Sender<Result<(), anyhow::Error>>),
51    SetDiscoverability(bool, oneshot::Sender<Result<(), anyhow::Error>>),
52    SetConnectability(bool, oneshot::Sender<Result<(), anyhow::Error>>),
53    SetActiveHost(HostId, oneshot::Sender<Result<(), anyhow::Error>>),
54    SetLocalName(String, oneshot::Sender<Result<(), anyhow::Error>>),
55    SetDeviceClass(DeviceClass, oneshot::Sender<Result<(), anyhow::Error>>),
56    StartLeScan(
57        oneshot::Sender<
58            Result<
59                mpsc::UnboundedReceiver<
60                    Vec<(fidl_fuchsia_bluetooth_le::Peer, Option<fidl_fuchsia_bluetooth::Address>)>,
61                >,
62                anyhow::Error,
63            >,
64        >,
65    ),
66    StopLeScan(oneshot::Sender<bool>),
67    ConnectLe(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
68    AdvertisePeripheral(
69        Box<AdvertisingParameters>,
70        std::time::Duration,
71        oneshot::Sender<Result<Option<PeerId>, anyhow::Error>>,
72    ),
73    PublishService(
74        Uuid,
75        ServiceHandle,
76        Vec<Characteristic>,
77        oneshot::Sender<Result<(), anyhow::Error>>,
78    ),
79    DiscoverServices(oneshot::Sender<Result<Vec<ServiceInfo>, anyhow::Error>>),
80    ReadCharacteristic(
81        ServiceHandle,
82        fidl_fuchsia_bluetooth_gatt2::Handle,
83        oneshot::Sender<Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error>>,
84    ),
85    RegisterCharacteristicNotifier(
86        ServiceHandle,
87        fidl_fuchsia_bluetooth_gatt2::Handle,
88        oneshot::Sender<Result<(), anyhow::Error>>,
89    ),
90    AdvertiseService(
91        u16,
92        std::time::Duration,
93        oneshot::Sender<Result<Option<PeerId>, anyhow::Error>>,
94    ),
95    Stop,
96}
97
98pub struct WorkThread {
99    thread_handle: Mutex<Option<thread::JoinHandle<Result<(), anyhow::Error>>>>,
100    sender: mpsc::UnboundedSender<Request>,
101}
102
103impl WorkThread {
104    pub fn spawn() -> Self {
105        let (sender, receiver) = mpsc::unbounded::<Request>();
106
107        let thread_handle = thread::spawn(move || {
108            LocalExecutor::default().run_singlethreaded(Self::handle_requests(receiver))?;
109            Ok(())
110        });
111
112        Self { thread_handle: Mutex::new(Some(thread_handle)), sender }
113    }
114
115    async fn handle_requests(
116        mut receiver: mpsc::UnboundedReceiver<Request>,
117    ) -> Result<(), anyhow::Error> {
118        let mut proxies = Proxies::connect()?;
119        let mut host_cache: Vec<HostInfo> = Vec::new();
120        // TODO(https://fxbug.dev/396500079): Consider HashMap<PeerId, Peer> instead.
121        let peer_cache: Arc<Mutex<Vec<Peer>>> = Arc::new(Mutex::new(Vec::new()));
122        // TODO(https://fxbug.dev/452075770): Support multiple L2CAP channels.
123        let mut l2cap_channel: Option<Channel> = None;
124        let mut _peripheral_connection: ClientEnd<ConnectionMarker>;
125
126        while let Some(request) = receiver.next().await {
127            match request {
128                Request::GetHosts(result_sender) => {
129                    if let Err(err) = sys::refresh_host_cache(&mut proxies, &mut host_cache).await {
130                        result_sender
131                            .send(Err(anyhow!("refresh_host_cache() error: {err}")))
132                            .unwrap();
133                        continue;
134                    }
135                    result_sender.send(Ok(host_cache.clone())).unwrap();
136                }
137                Request::GetKnownPeers(result_sender) => {
138                    if let Err(err) = sys::refresh_peer_cache(
139                        &mut proxies,
140                        std::time::Duration::from_millis(10),
141                        peer_cache.clone(),
142                    )
143                    .await
144                    {
145                        result_sender
146                            .send(Err(anyhow!("refresh_peer_cache() error: {err}")))
147                            .unwrap();
148                        continue;
149                    }
150                    result_sender.send(Ok(peer_cache.lock().clone())).unwrap();
151                }
152                Request::GetPeerId(address, result_sender) => {
153                    if let Some(peer) = sys::get_peer(
154                        &mut proxies,
155                        &address,
156                        std::time::Duration::from_secs(2),
157                        peer_cache.clone(),
158                    )
159                    .await?
160                    {
161                        result_sender.send(Ok(peer.id.unwrap())).unwrap();
162                        continue;
163                    }
164                    result_sender.send(Err(anyhow!("Peer not found"))).unwrap();
165                }
166                Request::Forget(peer_id, result_sender) => {
167                    result_sender.send(sys::forget(&proxies, &peer_id).await).unwrap();
168                }
169                Request::Connect(peer_id, result_sender) => {
170                    result_sender.send(sys::connect_peer(&proxies, &peer_id).await).unwrap();
171                }
172                Request::Disconnect(peer_id, result_sender) => {
173                    result_sender.send(sys::disconnect_peer(&proxies, &peer_id).await).unwrap();
174                }
175                Request::Pair(peer_id, options, result_sender) => {
176                    result_sender.send(sys::pair(&proxies, &peer_id, &options).await).unwrap();
177                }
178                Request::StartPairingDelegate(input_cap, output_cap, result_sender) => {
179                    result_sender
180                        .send(sys::start_pairing_delegate(&proxies, &input_cap, &output_cap).await)
181                        .unwrap();
182                }
183                Request::StopPairingDelegate(result_sender) => {
184                    result_sender.send(sys::stop_pairing_delegate(&proxies).await).unwrap();
185                }
186                Request::ConnectL2cap(peer_id, psm, result_sender) => {
187                    match bredr::connect_l2cap(&proxies, &peer_id, psm).await {
188                        Ok(channel) => {
189                            l2cap_channel = Some(channel);
190                            result_sender.send(Ok(())).unwrap();
191                        }
192                        Err(err) => {
193                            result_sender.send(Err(err)).unwrap();
194                        }
195                    }
196                }
197                Request::DisconnectL2cap(result_sender) => {
198                    if let Some(_channel) = l2cap_channel.take() {
199                        println!("L2CAP channel disconnected");
200                    }
201                    result_sender.send(Ok(())).unwrap();
202                }
203                Request::WriteL2cap(data, result_sender) => {
204                    if let Some(ref l2cap_channel) = l2cap_channel {
205                        match l2cap_channel.write(&data) {
206                            Ok(_) => result_sender.send(Ok(())).unwrap(),
207                            Err(err) => result_sender
208                                .send(Err(anyhow!("Failed to write to L2CAP channel: {}", err)))
209                                .unwrap(),
210                        }
211                    } else {
212                        result_sender.send(Err(anyhow!("L2CAP channel not connected"))).unwrap();
213                    }
214                }
215                Request::SetDiscovery(discovery, result_sender) => {
216                    result_sender.send(sys::set_discovery(&mut proxies, discovery).await).unwrap();
217                }
218                Request::SetDiscoverability(discoverable, result_sender) => {
219                    result_sender
220                        .send(sys::set_discoverability(&mut proxies, discoverable).await)
221                        .unwrap();
222                }
223                Request::SetConnectability(connectable, result_sender) => {
224                    result_sender
225                        .send(sys::set_connectability(&proxies, connectable).await)
226                        .unwrap();
227                }
228                Request::SetActiveHost(host_id, result_sender) => {
229                    result_sender.send(sys::set_active_host(&proxies, host_id).await).unwrap();
230                }
231                Request::SetLocalName(name, result_sender) => {
232                    result_sender.send(sys::set_local_name(&proxies, name)).unwrap();
233                }
234                Request::SetDeviceClass(device_class, result_sender) => {
235                    result_sender.send(sys::set_device_class(&proxies, device_class)).unwrap();
236                }
237                Request::StartLeScan(result_sender) => {
238                    result_sender
239                        .send(le::start_le_scan(&mut proxies, peer_cache.clone()).await)
240                        .unwrap();
241                }
242                Request::StopLeScan(result_sender) => {
243                    result_sender.send(le::stop_le_scan(&proxies)).unwrap();
244                }
245                Request::ConnectLe(peer_id, result_sender) => {
246                    result_sender.send(le::connect_le(&mut proxies, &peer_id).await).unwrap();
247                }
248                Request::AdvertisePeripheral(parameters, timeout, result_sender) => {
249                    match le::advertise_peripheral(&proxies, *parameters, timeout).await {
250                        Ok(Some((peer_id, connection))) => {
251                            _peripheral_connection = connection;
252                            result_sender.send(Ok(Some(peer_id))).unwrap();
253                        }
254                        result => {
255                            result_sender.send(result.map(|_| None)).unwrap();
256                        }
257                    }
258                }
259                Request::PublishService(uuid, service_handle, characteristics, result_sender) => {
260                    match gatt::publish_service(&proxies, uuid, service_handle, characteristics)
261                        .await
262                    {
263                        Ok(mut local_service_request_stream) => {
264                            fuchsia_async::Task::spawn(async move {
265                                while let Some(Ok(request)) =
266                                    local_service_request_stream.next().await
267                                {
268                                    // Just log the request for now.
269                                    println!("Received LocalService request: {:?}", request);
270                                }
271                            })
272                            .detach();
273                            result_sender.send(Ok(())).unwrap();
274                        }
275                        Err(err) => {
276                            result_sender.send(Err(err)).unwrap();
277                        }
278                    }
279                }
280                Request::DiscoverServices(result_sender) => {
281                    result_sender.send(gatt::discover_services(&mut proxies).await).unwrap();
282                }
283                Request::ReadCharacteristic(
284                    service_handle,
285                    characteristic_handle,
286                    result_sender,
287                ) => {
288                    result_sender
289                        .send(
290                            gatt::read_characteristic(
291                                &proxies,
292                                service_handle,
293                                characteristic_handle,
294                            )
295                            .await,
296                        )
297                        .unwrap();
298                }
299                Request::RegisterCharacteristicNotifier(
300                    service_handle,
301                    characteristic_handle,
302                    result_sender,
303                ) => {
304                    result_sender
305                        .send(
306                            gatt::register_characteristic_notifier(
307                                &proxies,
308                                service_handle,
309                                characteristic_handle,
310                            )
311                            .await,
312                        )
313                        .unwrap();
314                }
315                Request::AdvertiseService(psm, timeout, result_sender) => {
316                    match bredr::advertise_service(&proxies, psm).await {
317                        Ok(connection_receiver_stream) => {
318                            result_sender
319                                .send(
320                                    bredr::serve_connection_receiver(
321                                        connection_receiver_stream,
322                                        &mut l2cap_channel,
323                                        timeout,
324                                    )
325                                    .await,
326                                )
327                                .unwrap();
328                        }
329                        Err(err) => {
330                            result_sender.send(Err(err)).unwrap();
331                        }
332                    }
333                }
334                Request::Stop => break,
335            }
336        }
337
338        Ok(())
339    }
340
341    pub fn join(&self) -> Result<(), anyhow::Error> {
342        self.sender.clone().unbounded_send(Request::Stop).unwrap();
343        if let Err(err) =
344            self.thread_handle.lock().take().unwrap().join().expect("Failed to join work thread")
345        {
346            return Err(anyhow!("Work thread exited with error: {err}"));
347        }
348        Ok(())
349    }
350
351    // Get hosts.
352    pub async fn get_hosts(&self) -> Result<Vec<HostInfo>, anyhow::Error> {
353        let (sender, receiver) = oneshot::channel::<Result<Vec<HostInfo>, anyhow::Error>>();
354        self.sender.clone().unbounded_send(Request::GetHosts(sender))?;
355        receiver.await?
356    }
357
358    // Get identifier of peer at `address`.
359    pub async fn get_peer_id(&self, address: &CStr) -> Result<PeerId, anyhow::Error> {
360        let (sender, receiver) = oneshot::channel::<Result<PeerId, anyhow::Error>>();
361        self.sender.clone().unbounded_send(Request::GetPeerId(address.to_owned(), sender))?;
362        receiver.await?
363    }
364
365    pub async fn get_known_peers(&self) -> Result<Vec<Peer>, anyhow::Error> {
366        let (sender, receiver) = oneshot::channel::<Result<Vec<Peer>, anyhow::Error>>();
367        self.sender.clone().unbounded_send(Request::GetKnownPeers(sender))?;
368        receiver.await?
369    }
370
371    // Connect to peer with given identifier.
372    pub async fn connect_peer(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
373        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
374        self.sender.clone().unbounded_send(Request::Connect(peer_id, sender))?;
375        receiver.await?
376    }
377
378    // Disconnect all logical links (BR/EDR & LE) to peer with given identifier.
379    pub async fn disconnect_peer(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
380        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
381        self.sender.clone().unbounded_send(Request::Disconnect(peer_id, sender))?;
382        receiver.await?
383    }
384
385    // Initiate pairing with peer with given identifier.
386    // TODO(b/423700622): Add PairingDelegate server to bt-affordances.
387    pub async fn pair(
388        &self,
389        peer_id: PeerId,
390        options: PairingOptions,
391    ) -> Result<(), anyhow::Error> {
392        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
393        self.sender.clone().unbounded_send(Request::Pair(peer_id, options, sender))?;
394        receiver.await?
395    }
396
397    // Start a pairing delegate server.
398    //
399    // Calling this while a pairing delegate is already active drops and overwrites the existing
400    // delegate.
401    pub async fn start_pairing_delegate(
402        &self,
403        input_cap: InputCapability,
404        output_cap: OutputCapability,
405    ) -> Result<(), anyhow::Error> {
406        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
407        self.sender
408            .clone()
409            .unbounded_send(Request::StartPairingDelegate(input_cap, output_cap, sender))?;
410        receiver.await?
411    }
412
413    // Stop a pairing delegate server if one exists and was started by start_pairing_delegate.
414    // Returns false if no pairing delegate started by start_pairing_delegate is active.
415    pub async fn stop_pairing_delegate(&self) -> bool {
416        let (sender, receiver) = oneshot::channel::<bool>();
417        self.sender.clone().unbounded_send(Request::StopPairingDelegate(sender)).unwrap();
418        receiver.await.unwrap()
419    }
420
421    // Forget peer and delete all bonding information, if peer is found.
422    pub async fn forget_peer(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
423        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
424        self.sender.clone().unbounded_send(Request::Forget(peer_id, sender))?;
425        receiver.await?
426    }
427
428    // Connect a basic L2CAP channel.
429    pub async fn connect_l2cap_channel(
430        &self,
431        peer_id: PeerId,
432        psm: u16,
433    ) -> Result<(), anyhow::Error> {
434        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
435        self.sender.clone().unbounded_send(Request::ConnectL2cap(peer_id, psm, sender))?;
436        receiver.await?
437    }
438
439    // Disconnect an L2CAP channel if one exists.
440    pub async fn disconnect_l2cap(&self) -> Result<(), anyhow::Error> {
441        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
442        self.sender.clone().unbounded_send(Request::DisconnectL2cap(sender))?;
443        receiver.await?
444    }
445
446    // Write data over the L2CAP channel if one exists.
447    pub async fn write_l2cap(&self, data: Vec<u8>) -> Result<(), anyhow::Error> {
448        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
449        self.sender.clone().unbounded_send(Request::WriteL2cap(data, sender))?;
450        receiver.await?
451    }
452
453    // Set discovery state.
454    pub async fn set_discovery(&self, discovery: bool) -> Result<(), anyhow::Error> {
455        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
456        self.sender.clone().unbounded_send(Request::SetDiscovery(discovery, sender))?;
457        receiver.await?
458    }
459
460    // Set discoverability state.
461    pub async fn set_discoverability(&self, discoverable: bool) -> Result<(), anyhow::Error> {
462        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
463        self.sender.clone().unbounded_send(Request::SetDiscoverability(discoverable, sender))?;
464        receiver.await?
465    }
466
467    // Set connection policy.
468    pub async fn set_connectability(&self, connectable: bool) -> Result<(), anyhow::Error> {
469        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
470        self.sender.clone().unbounded_send(Request::SetConnectability(connectable, sender))?;
471        receiver.await?
472    }
473
474    // Set active host.
475    pub async fn set_active_host(&self, host_id: HostId) -> Result<(), anyhow::Error> {
476        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
477        self.sender.clone().unbounded_send(Request::SetActiveHost(host_id, sender))?;
478        receiver.await?
479    }
480
481    // Set local name.
482    pub async fn set_local_name(&self, name: String) -> Result<(), anyhow::Error> {
483        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
484        self.sender.clone().unbounded_send(Request::SetLocalName(name, sender))?;
485        receiver.await?
486    }
487
488    // Set device class.
489    pub async fn set_device_class(&self, device_class: DeviceClass) -> Result<(), anyhow::Error> {
490        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
491        self.sender.clone().unbounded_send(Request::SetDeviceClass(device_class, sender))?;
492        receiver.await?
493    }
494
495    // Scan for all nearby LE peripherals and broadcasters. Returns the receiving end of an
496    // mpsc::channel through which LE peer updates are written. Dropping the receiver closes the
497    // channel, which stops the scan when the next update is received.
498    //
499    // Calling this while a scan is ongoing drops and overwrites the existing scan.
500    pub async fn start_le_scan(
501        &self,
502    ) -> Result<
503        mpsc::UnboundedReceiver<
504            Vec<(fidl_fuchsia_bluetooth_le::Peer, Option<fidl_fuchsia_bluetooth::Address>)>,
505        >,
506        anyhow::Error,
507    > {
508        let (sender, receiver) = oneshot::channel::<
509            Result<
510                mpsc::UnboundedReceiver<
511                    Vec<(fidl_fuchsia_bluetooth_le::Peer, Option<fidl_fuchsia_bluetooth::Address>)>,
512                >,
513                anyhow::Error,
514            >,
515        >();
516        self.sender.clone().unbounded_send(Request::StartLeScan(sender))?;
517        receiver.await?
518    }
519
520    // Stop an ongoing LE scan. Returns false if no scan is ongoing. If an ongoing scan is stopped,
521    // the mpsc::channel exposed to the client closes (i.e. `receiver.next()` returns None).
522    pub async fn stop_le_scan(&self) -> bool {
523        let (sender, receiver) = oneshot::channel::<bool>();
524        self.sender.clone().unbounded_send(Request::StopLeScan(sender)).unwrap();
525        receiver.await.unwrap()
526    }
527
528    // Connect an LE peer and store the connection.
529    pub async fn connect_le(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
530        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
531        self.sender.clone().unbounded_send(Request::ConnectLe(peer_id, sender))?;
532        receiver.await?
533    }
534
535    // Start advertising as an LE peripheral, accept the first connection, and return the PeerId of
536    // its initiator. If `connectable` is false, then advertise and return None.
537    pub async fn advertise_peripheral(
538        &self,
539        parameters: AdvertisingParameters,
540        timeout: std::time::Duration,
541    ) -> Result<Option<PeerId>, anyhow::Error> {
542        let (sender, receiver) = oneshot::channel::<Result<Option<PeerId>, anyhow::Error>>();
543        self.sender
544            .clone()
545            .unbounded_send(Request::AdvertisePeripheral(Box::new(parameters), timeout, sender))
546            .unwrap();
547        receiver.await?
548    }
549
550    // Publish a GATT service with the given parameters. GATT requests are logged.
551    pub async fn publish_service(
552        &self,
553        uuid: Uuid,
554        service_handle: ServiceHandle,
555        characteristics: Vec<Characteristic>,
556    ) -> Result<(), anyhow::Error> {
557        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
558        self.sender.clone().unbounded_send(Request::PublishService(
559            uuid,
560            service_handle,
561            characteristics,
562            sender,
563        ))?;
564        receiver.await?
565    }
566
567    // Discover the GATT services of the currently connected LE peer.
568    pub async fn discover_services(&self) -> Result<Vec<ServiceInfo>, anyhow::Error> {
569        let (sender, receiver) = oneshot::channel::<Result<Vec<ServiceInfo>, anyhow::Error>>();
570        self.sender.clone().unbounded_send(Request::DiscoverServices(sender))?;
571        receiver.await?
572    }
573
574    // Perform a short read of the GATT characteristic identified with the given handles.
575    pub async fn read_characteristic(
576        &self,
577        service_handle: ServiceHandle,
578        characteristic_handle: fidl_fuchsia_bluetooth_gatt2::Handle,
579    ) -> Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error> {
580        let (sender, receiver) =
581            oneshot::channel::<Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error>>();
582        self.sender.clone().unbounded_send(Request::ReadCharacteristic(
583            service_handle,
584            characteristic_handle,
585            sender,
586        ))?;
587        receiver.await?
588    }
589
590    // Enable notifications/indications on the GATT characteristic with the given handles.
591    //
592    // Only one operation on a Remote Service can be pending at a time.
593    pub async fn register_characteristic_notifier(
594        &self,
595        service_handle: ServiceHandle,
596        characteristic_handle: fidl_fuchsia_bluetooth_gatt2::Handle,
597    ) -> Result<(), anyhow::Error> {
598        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
599        self.sender.clone().unbounded_send(Request::RegisterCharacteristicNotifier(
600            service_handle,
601            characteristic_handle,
602            sender,
603        ))?;
604        receiver.await?
605    }
606
607    // Advertise a BR/EDR service on the given `psm` until the first connection. Return the PeerId
608    // of that connection. If no connection is established before `timeout` elapses, return None.
609    pub async fn advertise_service(
610        &self,
611        psm: u16,
612        timeout: std::time::Duration,
613    ) -> Result<Option<PeerId>, anyhow::Error> {
614        let (sender, receiver) = oneshot::channel::<Result<Option<PeerId>, anyhow::Error>>();
615        self.sender.clone().unbounded_send(Request::AdvertiseService(psm, timeout, sender))?;
616        receiver.await?
617    }
618}
619
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a `Peer` with only its identifier and name populated.
    fn named_peer(id: u64, name: &str) -> Peer {
        Peer { id: Some(PeerId { value: id }), name: Some(name.to_string()), ..Default::default() }
    }

    #[fuchsia::test]
    fn test_update_peer_cache_handles_duplicates_in_input() {
        let peer_cache = Arc::new(Mutex::new(Vec::new()));

        // Both updated peers carry the same ID; only their names differ.
        let updated = vec![named_peer(1, "Peer 1"), named_peer(1, "Peer 2")];
        sys::update_peer_cache(peer_cache.clone(), updated, vec![]);

        let cache = peer_cache.lock();

        // Only the last of the duplicate entries should survive.
        assert_eq!(cache.len(), 1);
        assert_eq!(cache[0].name.as_deref(), Some("Peer 2"));
    }

    #[fuchsia::test]
    fn test_update_peer_cache_replaces_existing_entry() {
        // Start with a cache that already holds the peer under its old name.
        let peer_cache = Arc::new(Mutex::new(vec![named_peer(1, "Peer")]));

        // Report the same peer again, this time with a new name.
        sys::update_peer_cache(peer_cache.clone(), vec![named_peer(1, "Updated peer")], vec![]);

        let cache = peer_cache.lock();

        // The cache should still hold a single entry, now with the updated name.
        assert_eq!(cache.len(), 1);
        assert_eq!(cache[0].name.as_deref(), Some("Updated peer"));
    }
}