Skip to main content

fuchsia_bt_test_affordances/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::anyhow;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_bluetooth::{DeviceClass, HostId, PeerId, Uuid};
8use fidl_fuchsia_bluetooth_gatt2::{Characteristic, ServiceHandle, ServiceInfo};
9use fidl_fuchsia_bluetooth_le::{AdvertisingParameters, ConnectionMarker};
10use fidl_fuchsia_bluetooth_sys::{HostInfo, Peer};
11use fuchsia_async::LocalExecutor;
12use fuchsia_bluetooth::types::Channel;
13use fuchsia_sync::Mutex;
14use futures::StreamExt;
15use futures::channel::{mpsc, oneshot};
16use std::ffi::{CStr, CString};
17use std::sync::Arc;
18use std::thread;
19
20mod bredr;
21mod gatt;
22mod le;
23mod proxies;
24mod sys;
25
26use proxies::Proxies;
27
28// TODO(https://fxbug.dev/414848887): Return fidl_fuchsia_bluetooth_affordances::Error instead of
29// anyhow::Error.
30enum Request {
31    GetHosts(oneshot::Sender<Result<Vec<HostInfo>, anyhow::Error>>),
32    GetKnownPeers(oneshot::Sender<Result<Vec<Peer>, anyhow::Error>>),
33    GetPeerId(CString, oneshot::Sender<Result<PeerId, anyhow::Error>>),
34    ConnectL2cap(PeerId, u16, oneshot::Sender<Result<(), anyhow::Error>>),
35    DisconnectL2cap(oneshot::Sender<Result<(), anyhow::Error>>),
36    WriteL2cap(Vec<u8>, oneshot::Sender<Result<(), anyhow::Error>>),
37    SetDiscovery(bool, oneshot::Sender<Result<(), anyhow::Error>>),
38    SetDiscoverability(bool, oneshot::Sender<Result<(), anyhow::Error>>),
39    SetConnectability(bool, oneshot::Sender<Result<(), anyhow::Error>>),
40    SetActiveHost(HostId, oneshot::Sender<Result<(), anyhow::Error>>),
41    SetLocalName(String, oneshot::Sender<Result<(), anyhow::Error>>),
42    SetDeviceClass(DeviceClass, oneshot::Sender<Result<(), anyhow::Error>>),
43    StartLeScan(
44        futures::channel::mpsc::UnboundedSender<
45            Vec<fidl_fuchsia_bluetooth_affordances::ScannedPeer>,
46        >,
47        oneshot::Sender<Result<(), anyhow::Error>>,
48    ),
49    StopLeScan(oneshot::Sender<Result<(), anyhow::Error>>),
50    ConnectLe(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
51    AdvertisePeripheral(
52        Box<AdvertisingParameters>,
53        std::time::Duration,
54        oneshot::Sender<Result<Option<PeerId>, anyhow::Error>>,
55    ),
56    PublishService(
57        Uuid,
58        ServiceHandle,
59        Vec<Characteristic>,
60        oneshot::Sender<Result<(), anyhow::Error>>,
61    ),
62    DiscoverServices(oneshot::Sender<Result<Vec<ServiceInfo>, anyhow::Error>>),
63    ReadCharacteristic(
64        ServiceHandle,
65        fidl_fuchsia_bluetooth_gatt2::Handle,
66        oneshot::Sender<Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error>>,
67    ),
68    RegisterCharacteristicNotifier(
69        ServiceHandle,
70        fidl_fuchsia_bluetooth_gatt2::Handle,
71        oneshot::Sender<Result<(), anyhow::Error>>,
72    ),
73    AdvertiseService(
74        u16,
75        std::time::Duration,
76        oneshot::Sender<Result<Option<PeerId>, anyhow::Error>>,
77    ),
78    Stop,
79}
80
81pub struct WorkThread {
82    thread_handle: Mutex<Option<thread::JoinHandle<Result<(), anyhow::Error>>>>,
83    sender: mpsc::UnboundedSender<Request>,
84}
85
86impl WorkThread {
87    pub fn spawn() -> Self {
88        let (sender, receiver) = mpsc::unbounded::<Request>();
89
90        let thread_handle = thread::spawn(move || {
91            LocalExecutor::default().run_singlethreaded(Self::handle_requests(receiver))?;
92            Ok(())
93        });
94
95        Self { thread_handle: Mutex::new(Some(thread_handle)), sender }
96    }
97
98    async fn handle_requests(
99        mut receiver: mpsc::UnboundedReceiver<Request>,
100    ) -> Result<(), anyhow::Error> {
101        let mut proxies = Proxies::connect()?;
102        let mut host_cache: Vec<HostInfo> = Vec::new();
103        // TODO(https://fxbug.dev/396500079): Consider HashMap<PeerId, Peer> instead.
104        let peer_cache: Arc<Mutex<Vec<Peer>>> = Arc::new(Mutex::new(Vec::new()));
105        // TODO(https://fxbug.dev/452075770): Support multiple L2CAP channels.
106        let mut l2cap_channel: Option<Channel> = None;
107        let mut _peripheral_connection: ClientEnd<ConnectionMarker>;
108
109        while let Some(request) = receiver.next().await {
110            match request {
111                Request::GetHosts(result_sender) => {
112                    if let Err(err) = sys::refresh_host_cache(&mut proxies, &mut host_cache).await {
113                        result_sender
114                            .send(Err(anyhow!("refresh_host_cache() error: {err}")))
115                            .unwrap();
116                        continue;
117                    }
118                    result_sender.send(Ok(host_cache.clone())).unwrap();
119                }
120                Request::GetKnownPeers(result_sender) => {
121                    if let Err(err) = sys::refresh_peer_cache(
122                        &mut proxies,
123                        std::time::Duration::from_millis(10),
124                        peer_cache.clone(),
125                    )
126                    .await
127                    {
128                        result_sender
129                            .send(Err(anyhow!("refresh_peer_cache() error: {err}")))
130                            .unwrap();
131                        continue;
132                    }
133                    result_sender.send(Ok(peer_cache.lock().clone())).unwrap();
134                }
135                Request::GetPeerId(address, result_sender) => {
136                    if let Some(peer) = sys::get_peer(
137                        &mut proxies,
138                        &address,
139                        std::time::Duration::from_secs(2),
140                        peer_cache.clone(),
141                    )
142                    .await?
143                    {
144                        result_sender.send(Ok(peer.id.unwrap())).unwrap();
145                        continue;
146                    }
147                    result_sender.send(Err(anyhow!("Peer not found"))).unwrap();
148                }
149                Request::ConnectL2cap(peer_id, psm, result_sender) => {
150                    match bredr::connect_l2cap(&proxies, &peer_id, psm).await {
151                        Ok(channel) => {
152                            l2cap_channel = Some(channel);
153                            result_sender.send(Ok(())).unwrap();
154                        }
155                        Err(err) => {
156                            result_sender.send(Err(err)).unwrap();
157                        }
158                    }
159                }
160                Request::DisconnectL2cap(result_sender) => {
161                    if let Some(_channel) = l2cap_channel.take() {
162                        println!("L2CAP channel disconnected");
163                    }
164                    result_sender.send(Ok(())).unwrap();
165                }
166                Request::WriteL2cap(data, result_sender) => {
167                    if let Some(ref l2cap_channel) = l2cap_channel {
168                        match l2cap_channel.write(&data) {
169                            Ok(_) => result_sender.send(Ok(())).unwrap(),
170                            Err(err) => result_sender
171                                .send(Err(anyhow!("Failed to write to L2CAP channel: {}", err)))
172                                .unwrap(),
173                        }
174                    } else {
175                        result_sender.send(Err(anyhow!("L2CAP channel not connected"))).unwrap();
176                    }
177                }
178                Request::SetDiscovery(discovery, result_sender) => {
179                    result_sender.send(sys::set_discovery(&mut proxies, discovery).await).unwrap();
180                }
181                Request::SetDiscoverability(discoverable, result_sender) => {
182                    result_sender
183                        .send(sys::set_discoverability(&mut proxies, discoverable).await)
184                        .unwrap();
185                }
186                Request::SetConnectability(connectable, result_sender) => {
187                    result_sender
188                        .send(sys::set_connectability(&proxies, connectable).await)
189                        .unwrap();
190                }
191                Request::SetActiveHost(host_id, result_sender) => {
192                    result_sender.send(sys::set_active_host(&proxies, host_id).await).unwrap();
193                }
194                Request::SetLocalName(name, result_sender) => {
195                    result_sender.send(sys::set_local_name(&proxies, name)).unwrap();
196                }
197                Request::SetDeviceClass(device_class, result_sender) => {
198                    result_sender.send(sys::set_device_class(&proxies, device_class)).unwrap();
199                }
200                Request::StartLeScan(sender, result_sender) => {
201                    result_sender.send(le::start_le_scan(&mut proxies, sender).await).unwrap();
202                }
203                Request::StopLeScan(result_sender) => {
204                    let stopped = le::stop_scan(&proxies);
205                    if stopped {
206                        result_sender.send(Ok(())).unwrap();
207                    } else {
208                        result_sender.send(Err(anyhow!("No scan ongoing"))).unwrap();
209                    }
210                }
211                Request::ConnectLe(peer_id, result_sender) => {
212                    result_sender.send(le::connect_le(&mut proxies, &peer_id).await).unwrap();
213                }
214                Request::AdvertisePeripheral(parameters, timeout, result_sender) => {
215                    match le::advertise_peripheral(&proxies, *parameters, timeout).await {
216                        Ok(Some((peer_id, connection))) => {
217                            _peripheral_connection = connection;
218                            result_sender.send(Ok(Some(peer_id))).unwrap();
219                        }
220                        result => {
221                            result_sender.send(result.map(|_| None)).unwrap();
222                        }
223                    }
224                }
225                Request::PublishService(uuid, service_handle, characteristics, result_sender) => {
226                    match gatt::publish_service(&proxies, uuid, service_handle, characteristics)
227                        .await
228                    {
229                        Ok(mut local_service_request_stream) => {
230                            fuchsia_async::Task::spawn(async move {
231                                while let Some(Ok(request)) =
232                                    local_service_request_stream.next().await
233                                {
234                                    // Just log the request for now.
235                                    println!("Received LocalService request: {:?}", request);
236                                }
237                            })
238                            .detach();
239                            result_sender.send(Ok(())).unwrap();
240                        }
241                        Err(err) => {
242                            result_sender.send(Err(err)).unwrap();
243                        }
244                    }
245                }
246                Request::DiscoverServices(result_sender) => {
247                    result_sender.send(gatt::discover_services(&mut proxies).await).unwrap();
248                }
249                Request::ReadCharacteristic(
250                    service_handle,
251                    characteristic_handle,
252                    result_sender,
253                ) => {
254                    result_sender
255                        .send(
256                            gatt::read_characteristic(
257                                &proxies,
258                                service_handle,
259                                characteristic_handle,
260                            )
261                            .await,
262                        )
263                        .unwrap();
264                }
265                Request::RegisterCharacteristicNotifier(
266                    service_handle,
267                    characteristic_handle,
268                    result_sender,
269                ) => {
270                    result_sender
271                        .send(
272                            gatt::register_characteristic_notifier(
273                                &proxies,
274                                service_handle,
275                                characteristic_handle,
276                            )
277                            .await,
278                        )
279                        .unwrap();
280                }
281                Request::AdvertiseService(psm, timeout, result_sender) => {
282                    match bredr::advertise_service(&proxies, psm).await {
283                        Ok(connection_receiver_stream) => {
284                            result_sender
285                                .send(
286                                    bredr::serve_connection_receiver(
287                                        connection_receiver_stream,
288                                        &mut l2cap_channel,
289                                        timeout,
290                                    )
291                                    .await,
292                                )
293                                .unwrap();
294                        }
295                        Err(err) => {
296                            result_sender.send(Err(err)).unwrap();
297                        }
298                    }
299                }
300                Request::Stop => break,
301            }
302        }
303
304        Ok(())
305    }
306
307    pub fn join(&self) -> Result<(), anyhow::Error> {
308        self.sender.clone().unbounded_send(Request::Stop).unwrap();
309        if let Err(err) =
310            self.thread_handle.lock().take().unwrap().join().expect("Failed to join work thread")
311        {
312            return Err(anyhow!("Work thread exited with error: {err}"));
313        }
314        Ok(())
315    }
316
317    // Get hosts.
318    pub async fn get_hosts(&self) -> Result<Vec<HostInfo>, anyhow::Error> {
319        let (sender, receiver) = oneshot::channel::<Result<Vec<HostInfo>, anyhow::Error>>();
320        self.sender.clone().unbounded_send(Request::GetHosts(sender))?;
321        receiver.await?
322    }
323
324    // Get identifier of peer at `address`.
325    pub async fn get_peer_id(&self, address: &CStr) -> Result<PeerId, anyhow::Error> {
326        let (sender, receiver) = oneshot::channel::<Result<PeerId, anyhow::Error>>();
327        self.sender.clone().unbounded_send(Request::GetPeerId(address.to_owned(), sender))?;
328        receiver.await?
329    }
330
331    pub async fn get_known_peers(&self) -> Result<Vec<Peer>, anyhow::Error> {
332        let (sender, receiver) = oneshot::channel::<Result<Vec<Peer>, anyhow::Error>>();
333        self.sender.clone().unbounded_send(Request::GetKnownPeers(sender))?;
334        receiver.await?
335    }
336
337    // Connect a basic L2CAP channel.
338    pub async fn connect_l2cap_channel(
339        &self,
340        peer_id: PeerId,
341        psm: u16,
342    ) -> Result<(), anyhow::Error> {
343        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
344        self.sender.clone().unbounded_send(Request::ConnectL2cap(peer_id, psm, sender))?;
345        receiver.await?
346    }
347
348    // Disconnect an L2CAP channel if one exists.
349    pub async fn disconnect_l2cap(&self) -> Result<(), anyhow::Error> {
350        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
351        self.sender.clone().unbounded_send(Request::DisconnectL2cap(sender))?;
352        receiver.await?
353    }
354
355    // Write data over the L2CAP channel if one exists.
356    pub async fn write_l2cap(&self, data: Vec<u8>) -> Result<(), anyhow::Error> {
357        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
358        self.sender.clone().unbounded_send(Request::WriteL2cap(data, sender))?;
359        receiver.await?
360    }
361
362    // Set discovery state.
363    pub async fn set_discovery(&self, discovery: bool) -> Result<(), anyhow::Error> {
364        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
365        self.sender.clone().unbounded_send(Request::SetDiscovery(discovery, sender))?;
366        receiver.await?
367    }
368
369    // Set discoverability state.
370    pub async fn set_discoverability(&self, discoverable: bool) -> Result<(), anyhow::Error> {
371        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
372        self.sender.clone().unbounded_send(Request::SetDiscoverability(discoverable, sender))?;
373        receiver.await?
374    }
375
376    // Set connection policy.
377    pub async fn set_connectability(&self, connectable: bool) -> Result<(), anyhow::Error> {
378        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
379        self.sender.clone().unbounded_send(Request::SetConnectability(connectable, sender))?;
380        receiver.await?
381    }
382
383    // Set active host.
384    pub async fn set_active_host(&self, host_id: HostId) -> Result<(), anyhow::Error> {
385        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
386        self.sender.clone().unbounded_send(Request::SetActiveHost(host_id, sender))?;
387        receiver.await?
388    }
389
390    // Set local name.
391    pub async fn set_local_name(&self, name: String) -> Result<(), anyhow::Error> {
392        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
393        self.sender.clone().unbounded_send(Request::SetLocalName(name, sender))?;
394        receiver.await?
395    }
396
397    // Set device class.
398    pub async fn set_device_class(&self, device_class: DeviceClass) -> Result<(), anyhow::Error> {
399        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
400        self.sender.clone().unbounded_send(Request::SetDeviceClass(device_class, sender))?;
401        receiver.await?
402    }
403
404    // Scan for nearby LE peripherals and broadcasters.
405    pub async fn start_le_scan(
406        &self,
407        sender: futures::channel::mpsc::UnboundedSender<
408            Vec<fidl_fuchsia_bluetooth_affordances::ScannedPeer>,
409        >,
410    ) -> Result<(), anyhow::Error> {
411        let (oneshot_sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
412        self.sender.clone().unbounded_send(Request::StartLeScan(sender, oneshot_sender))?;
413        receiver.await?
414    }
415
416    // Stop an ongoing LE scan. Returns an error if no scan is ongoing.
417    pub async fn stop_le_scan(&self) -> Result<(), anyhow::Error> {
418        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
419        self.sender.clone().unbounded_send(Request::StopLeScan(sender))?;
420        receiver.await?
421    }
422
423    // Connect an LE peer and store the connection.
424    pub async fn connect_le(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
425        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
426        self.sender.clone().unbounded_send(Request::ConnectLe(peer_id, sender))?;
427        receiver.await?
428    }
429
430    // Start advertising as an LE peripheral, accept the first connection, and return the PeerId of
431    // its initiator. If `connectable` is false, then advertise and return None.
432    pub async fn advertise_peripheral(
433        &self,
434        parameters: AdvertisingParameters,
435        timeout: std::time::Duration,
436    ) -> Result<Option<PeerId>, anyhow::Error> {
437        let (sender, receiver) = oneshot::channel::<Result<Option<PeerId>, anyhow::Error>>();
438        self.sender
439            .clone()
440            .unbounded_send(Request::AdvertisePeripheral(Box::new(parameters), timeout, sender))
441            .unwrap();
442        receiver.await?
443    }
444
445    // Publish a GATT service with the given parameters. GATT requests are logged.
446    pub async fn publish_service(
447        &self,
448        uuid: Uuid,
449        service_handle: ServiceHandle,
450        characteristics: Vec<Characteristic>,
451    ) -> Result<(), anyhow::Error> {
452        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
453        self.sender.clone().unbounded_send(Request::PublishService(
454            uuid,
455            service_handle,
456            characteristics,
457            sender,
458        ))?;
459        receiver.await?
460    }
461
462    // Discover the GATT services of the currently connected LE peer.
463    pub async fn discover_services(&self) -> Result<Vec<ServiceInfo>, anyhow::Error> {
464        let (sender, receiver) = oneshot::channel::<Result<Vec<ServiceInfo>, anyhow::Error>>();
465        self.sender.clone().unbounded_send(Request::DiscoverServices(sender))?;
466        receiver.await?
467    }
468
469    // Perform a short read of the GATT characteristic identified with the given handles.
470    pub async fn read_characteristic(
471        &self,
472        service_handle: ServiceHandle,
473        characteristic_handle: fidl_fuchsia_bluetooth_gatt2::Handle,
474    ) -> Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error> {
475        let (sender, receiver) =
476            oneshot::channel::<Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error>>();
477        self.sender.clone().unbounded_send(Request::ReadCharacteristic(
478            service_handle,
479            characteristic_handle,
480            sender,
481        ))?;
482        receiver.await?
483    }
484
485    // Enable notifications/indications on the GATT characteristic with the given handles.
486    //
487    // Only one operation on a Remote Service can be pending at a time.
488    pub async fn register_characteristic_notifier(
489        &self,
490        service_handle: ServiceHandle,
491        characteristic_handle: fidl_fuchsia_bluetooth_gatt2::Handle,
492    ) -> Result<(), anyhow::Error> {
493        let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
494        self.sender.clone().unbounded_send(Request::RegisterCharacteristicNotifier(
495            service_handle,
496            characteristic_handle,
497            sender,
498        ))?;
499        receiver.await?
500    }
501
502    // Advertise a BR/EDR service on the given `psm` until the first connection. Return the PeerId
503    // of that connection. If no connection is established before `timeout` elapses, return None.
504    pub async fn advertise_service(
505        &self,
506        psm: u16,
507        timeout: std::time::Duration,
508    ) -> Result<Option<PeerId>, anyhow::Error> {
509        let (sender, receiver) = oneshot::channel::<Result<Option<PeerId>, anyhow::Error>>();
510        self.sender.clone().unbounded_send(Request::AdvertiseService(psm, timeout, sender))?;
511        receiver.await?
512    }
513}
514
515#[cfg(test)]
516mod tests {
517    use super::*;
518
519    #[fuchsia::test]
520    fn test_update_peer_cache_handles_duplicates_in_input() {
521        let peer_cache = Arc::new(Mutex::new(Vec::new()));
522
523        let mut peer1 = Peer::default();
524        peer1.id = Some(PeerId { value: 1 });
525        peer1.name = Some("Peer 1".to_string());
526
527        let mut peer2 = Peer::default();
528        peer2.id = Some(PeerId { value: 1 });
529        peer2.name = Some("Peer 2".to_string());
530
531        // List of updated peers includes two entries with the same ID.
532        sys::update_peer_cache(peer_cache.clone(), vec![peer1, peer2.clone()], vec![]);
533
534        let cache = peer_cache.lock();
535
536        // The cache should only keep the final entry.
537        assert_eq!(cache.len(), 1);
538        assert_eq!(cache[0].name.as_deref(), Some("Peer 2"));
539    }
540
541    #[fuchsia::test]
542    fn test_update_peer_cache_replaces_existing_entry() {
543        let mut peer = Peer::default();
544        peer.id = Some(PeerId { value: 1 });
545        peer.name = Some("Peer".to_string());
546        let peer_cache = Arc::new(Mutex::new(vec![peer.clone()]));
547
548        // Update the peer currently inside the cache with a new name.
549        peer.name = Some("Updated peer".to_string());
550        sys::update_peer_cache(peer_cache.clone(), vec![peer], vec![]);
551
552        let cache = peer_cache.lock();
553
554        // The cache should only have one entry with the updated name.
555        assert_eq!(cache.len(), 1);
556        assert_eq!(cache[0].name.as_deref(), Some("Updated peer"));
557    }
558}