1use anyhow::anyhow;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_bluetooth::{DeviceClass, PeerId, Uuid};
8
9use fidl_fuchsia_bluetooth_gatt2::{Characteristic, ServiceHandle, ServiceInfo};
10use fidl_fuchsia_bluetooth_le::{AdvertisingParameters, ConnectionMarker};
11use fidl_fuchsia_bluetooth_sys::{
12 HostInfo, InputCapability, OutputCapability, PairingOptions, Peer,
13};
14use fuchsia_async::LocalExecutor;
15use fuchsia_bluetooth::types::Channel;
16
17use fuchsia_sync::Mutex;
18use futures::StreamExt;
19use futures::channel::{mpsc, oneshot};
20use std::ffi::{CStr, CString};
21use std::sync::Arc;
22use std::thread;
23
24mod bredr;
25mod gatt;
26mod le;
27mod proxies;
28mod sys;
29
30use proxies::Proxies;
31
32enum Request {
34 GetHosts(oneshot::Sender<Result<Vec<HostInfo>, anyhow::Error>>),
35 GetKnownPeers(oneshot::Sender<Result<Vec<Peer>, anyhow::Error>>),
36 GetPeerId(CString, oneshot::Sender<Result<PeerId, anyhow::Error>>),
37 Connect(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
38 Disconnect(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
39 Pair(PeerId, PairingOptions, oneshot::Sender<Result<(), anyhow::Error>>),
40 StartPairingDelegate(
41 InputCapability,
42 OutputCapability,
43 oneshot::Sender<Result<(), anyhow::Error>>,
44 ),
45 StopPairingDelegate(oneshot::Sender<bool>),
46 Forget(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
47 ConnectL2cap(PeerId, u16, oneshot::Sender<Result<(), anyhow::Error>>),
48 DisconnectL2cap(oneshot::Sender<Result<(), anyhow::Error>>),
49 WriteL2cap(Vec<u8>, oneshot::Sender<Result<(), anyhow::Error>>),
50 SetDiscovery(bool, oneshot::Sender<Result<(), anyhow::Error>>),
51 SetDiscoverability(bool, oneshot::Sender<Result<(), anyhow::Error>>),
52 SetConnectability(bool, oneshot::Sender<Result<(), anyhow::Error>>),
53 SetLocalName(String, oneshot::Sender<Result<(), anyhow::Error>>),
54 SetDeviceClass(DeviceClass, oneshot::Sender<Result<(), anyhow::Error>>),
55 StartLeScan(
56 oneshot::Sender<
57 Result<
58 mpsc::UnboundedReceiver<
59 Vec<(fidl_fuchsia_bluetooth_le::Peer, Option<fidl_fuchsia_bluetooth::Address>)>,
60 >,
61 anyhow::Error,
62 >,
63 >,
64 ),
65 StopLeScan(oneshot::Sender<bool>),
66 ConnectLe(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
67 AdvertisePeripheral(
68 Box<AdvertisingParameters>,
69 std::time::Duration,
70 oneshot::Sender<Result<Option<PeerId>, anyhow::Error>>,
71 ),
72 PublishService(
73 Uuid,
74 ServiceHandle,
75 Vec<Characteristic>,
76 oneshot::Sender<Result<(), anyhow::Error>>,
77 ),
78 DiscoverServices(oneshot::Sender<Result<Vec<ServiceInfo>, anyhow::Error>>),
79 ReadCharacteristic(
80 ServiceHandle,
81 fidl_fuchsia_bluetooth_gatt2::Handle,
82 oneshot::Sender<Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error>>,
83 ),
84 RegisterCharacteristicNotifier(
85 ServiceHandle,
86 fidl_fuchsia_bluetooth_gatt2::Handle,
87 oneshot::Sender<Result<(), anyhow::Error>>,
88 ),
89 AdvertiseService(
90 u16,
91 std::time::Duration,
92 oneshot::Sender<Result<Option<PeerId>, anyhow::Error>>,
93 ),
94 Stop,
95}
96
97pub struct WorkThread {
98 thread_handle: Mutex<Option<thread::JoinHandle<Result<(), anyhow::Error>>>>,
99 sender: mpsc::UnboundedSender<Request>,
100}
101
102impl WorkThread {
103 pub fn spawn() -> Self {
104 let (sender, receiver) = mpsc::unbounded::<Request>();
105
106 let thread_handle = thread::spawn(move || {
107 LocalExecutor::default().run_singlethreaded(Self::handle_requests(receiver))?;
108 Ok(())
109 });
110
111 Self { thread_handle: Mutex::new(Some(thread_handle)), sender }
112 }
113
114 async fn handle_requests(
115 mut receiver: mpsc::UnboundedReceiver<Request>,
116 ) -> Result<(), anyhow::Error> {
117 let mut proxies = Proxies::connect()?;
118 let mut host_cache: Vec<HostInfo> = Vec::new();
119 let peer_cache: Arc<Mutex<Vec<Peer>>> = Arc::new(Mutex::new(Vec::new()));
121 let mut l2cap_channel: Option<Channel> = None;
123 let mut _peripheral_connection: ClientEnd<ConnectionMarker>;
124
125 while let Some(request) = receiver.next().await {
126 match request {
127 Request::GetHosts(result_sender) => {
128 if let Err(err) = sys::refresh_host_cache(&mut proxies, &mut host_cache).await {
129 result_sender
130 .send(Err(anyhow!("refresh_host_cache() error: {err}")))
131 .unwrap();
132 continue;
133 }
134 result_sender.send(Ok(host_cache.clone())).unwrap();
135 }
136 Request::GetKnownPeers(result_sender) => {
137 if let Err(err) = sys::refresh_peer_cache(
138 &mut proxies,
139 std::time::Duration::from_millis(10),
140 peer_cache.clone(),
141 )
142 .await
143 {
144 result_sender
145 .send(Err(anyhow!("refresh_peer_cache() error: {err}")))
146 .unwrap();
147 continue;
148 }
149 result_sender.send(Ok(peer_cache.lock().clone())).unwrap();
150 }
151 Request::GetPeerId(address, result_sender) => {
152 if let Some(peer) = sys::get_peer(
153 &mut proxies,
154 &address,
155 std::time::Duration::from_secs(2),
156 peer_cache.clone(),
157 )
158 .await?
159 {
160 result_sender.send(Ok(peer.id.unwrap())).unwrap();
161 continue;
162 }
163 result_sender.send(Err(anyhow!("Peer not found"))).unwrap();
164 }
165 Request::Forget(peer_id, result_sender) => {
166 result_sender.send(sys::forget(&proxies, &peer_id).await).unwrap();
167 }
168 Request::Connect(peer_id, result_sender) => {
169 result_sender.send(sys::connect_peer(&proxies, &peer_id).await).unwrap();
170 }
171 Request::Disconnect(peer_id, result_sender) => {
172 result_sender.send(sys::disconnect_peer(&proxies, &peer_id).await).unwrap();
173 }
174 Request::Pair(peer_id, options, result_sender) => {
175 result_sender.send(sys::pair(&proxies, &peer_id, &options).await).unwrap();
176 }
177 Request::StartPairingDelegate(input_cap, output_cap, result_sender) => {
178 result_sender
179 .send(sys::start_pairing_delegate(&proxies, &input_cap, &output_cap).await)
180 .unwrap();
181 }
182 Request::StopPairingDelegate(result_sender) => {
183 result_sender.send(sys::stop_pairing_delegate(&proxies).await).unwrap();
184 }
185 Request::ConnectL2cap(peer_id, psm, result_sender) => {
186 match bredr::connect_l2cap(&proxies, &peer_id, psm).await {
187 Ok(channel) => {
188 l2cap_channel = Some(channel);
189 result_sender.send(Ok(())).unwrap();
190 }
191 Err(err) => {
192 result_sender.send(Err(err)).unwrap();
193 }
194 }
195 }
196 Request::DisconnectL2cap(result_sender) => {
197 if let Some(_channel) = l2cap_channel.take() {
198 println!("L2CAP channel disconnected");
199 }
200 result_sender.send(Ok(())).unwrap();
201 }
202 Request::WriteL2cap(data, result_sender) => {
203 if let Some(ref l2cap_channel) = l2cap_channel {
204 match l2cap_channel.write(&data) {
205 Ok(_) => result_sender.send(Ok(())).unwrap(),
206 Err(err) => result_sender
207 .send(Err(anyhow!("Failed to write to L2CAP channel: {}", err)))
208 .unwrap(),
209 }
210 } else {
211 result_sender.send(Err(anyhow!("L2CAP channel not connected"))).unwrap();
212 }
213 }
214 Request::SetDiscovery(discovery, result_sender) => {
215 result_sender.send(sys::set_discovery(&mut proxies, discovery).await).unwrap();
216 }
217 Request::SetDiscoverability(discoverable, result_sender) => {
218 result_sender
219 .send(sys::set_discoverability(&mut proxies, discoverable).await)
220 .unwrap();
221 }
222 Request::SetConnectability(connectable, result_sender) => {
223 result_sender
224 .send(sys::set_connectability(&proxies, connectable).await)
225 .unwrap();
226 }
227 Request::SetLocalName(name, result_sender) => {
228 result_sender.send(sys::set_local_name(&proxies, name)).unwrap();
229 }
230 Request::SetDeviceClass(device_class, result_sender) => {
231 result_sender.send(sys::set_device_class(&proxies, device_class)).unwrap();
232 }
233 Request::StartLeScan(result_sender) => {
234 result_sender
235 .send(le::start_le_scan(&mut proxies, peer_cache.clone()).await)
236 .unwrap();
237 }
238 Request::StopLeScan(result_sender) => {
239 result_sender.send(le::stop_le_scan(&proxies)).unwrap();
240 }
241 Request::ConnectLe(peer_id, result_sender) => {
242 result_sender.send(le::connect_le(&mut proxies, &peer_id).await).unwrap();
243 }
244 Request::AdvertisePeripheral(parameters, timeout, result_sender) => {
245 match le::advertise_peripheral(&proxies, *parameters, timeout).await {
246 Ok(Some((peer_id, connection))) => {
247 _peripheral_connection = connection;
248 result_sender.send(Ok(Some(peer_id))).unwrap();
249 }
250 result => {
251 result_sender.send(result.map(|_| None)).unwrap();
252 }
253 }
254 }
255 Request::PublishService(uuid, service_handle, characteristics, result_sender) => {
256 match gatt::publish_service(&proxies, uuid, service_handle, characteristics)
257 .await
258 {
259 Ok(mut local_service_request_stream) => {
260 fuchsia_async::Task::spawn(async move {
261 while let Some(Ok(request)) =
262 local_service_request_stream.next().await
263 {
264 println!("Received LocalService request: {:?}", request);
266 }
267 })
268 .detach();
269 result_sender.send(Ok(())).unwrap();
270 }
271 Err(err) => {
272 result_sender.send(Err(err)).unwrap();
273 }
274 }
275 }
276 Request::DiscoverServices(result_sender) => {
277 result_sender.send(gatt::discover_services(&mut proxies).await).unwrap();
278 }
279 Request::ReadCharacteristic(
280 service_handle,
281 characteristic_handle,
282 result_sender,
283 ) => {
284 result_sender
285 .send(
286 gatt::read_characteristic(
287 &proxies,
288 service_handle,
289 characteristic_handle,
290 )
291 .await,
292 )
293 .unwrap();
294 }
295 Request::RegisterCharacteristicNotifier(
296 service_handle,
297 characteristic_handle,
298 result_sender,
299 ) => {
300 result_sender
301 .send(
302 gatt::register_characteristic_notifier(
303 &proxies,
304 service_handle,
305 characteristic_handle,
306 )
307 .await,
308 )
309 .unwrap();
310 }
311 Request::AdvertiseService(psm, timeout, result_sender) => {
312 match bredr::advertise_service(&proxies, psm).await {
313 Ok(connection_receiver_stream) => {
314 result_sender
315 .send(
316 bredr::serve_connection_receiver(
317 connection_receiver_stream,
318 &mut l2cap_channel,
319 timeout,
320 )
321 .await,
322 )
323 .unwrap();
324 }
325 Err(err) => {
326 result_sender.send(Err(err)).unwrap();
327 }
328 }
329 }
330 Request::Stop => break,
331 }
332 }
333
334 Ok(())
335 }
336
337 pub fn join(&self) -> Result<(), anyhow::Error> {
338 self.sender.clone().unbounded_send(Request::Stop).unwrap();
339 if let Err(err) =
340 self.thread_handle.lock().take().unwrap().join().expect("Failed to join work thread")
341 {
342 return Err(anyhow!("Work thread exited with error: {err}"));
343 }
344 Ok(())
345 }
346
347 pub async fn get_hosts(&self) -> Result<Vec<HostInfo>, anyhow::Error> {
349 let (sender, receiver) = oneshot::channel::<Result<Vec<HostInfo>, anyhow::Error>>();
350 self.sender.clone().unbounded_send(Request::GetHosts(sender))?;
351 receiver.await?
352 }
353
354 pub async fn get_peer_id(&self, address: &CStr) -> Result<PeerId, anyhow::Error> {
356 let (sender, receiver) = oneshot::channel::<Result<PeerId, anyhow::Error>>();
357 self.sender.clone().unbounded_send(Request::GetPeerId(address.to_owned(), sender))?;
358 receiver.await?
359 }
360
361 pub async fn get_known_peers(&self) -> Result<Vec<Peer>, anyhow::Error> {
362 let (sender, receiver) = oneshot::channel::<Result<Vec<Peer>, anyhow::Error>>();
363 self.sender.clone().unbounded_send(Request::GetKnownPeers(sender))?;
364 receiver.await?
365 }
366
367 pub async fn connect_peer(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
369 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
370 self.sender.clone().unbounded_send(Request::Connect(peer_id, sender))?;
371 receiver.await?
372 }
373
374 pub async fn disconnect_peer(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
376 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
377 self.sender.clone().unbounded_send(Request::Disconnect(peer_id, sender))?;
378 receiver.await?
379 }
380
381 pub async fn pair(
384 &self,
385 peer_id: PeerId,
386 options: PairingOptions,
387 ) -> Result<(), anyhow::Error> {
388 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
389 self.sender.clone().unbounded_send(Request::Pair(peer_id, options, sender))?;
390 receiver.await?
391 }
392
393 pub async fn start_pairing_delegate(
398 &self,
399 input_cap: InputCapability,
400 output_cap: OutputCapability,
401 ) -> Result<(), anyhow::Error> {
402 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
403 self.sender
404 .clone()
405 .unbounded_send(Request::StartPairingDelegate(input_cap, output_cap, sender))?;
406 receiver.await?
407 }
408
409 pub async fn stop_pairing_delegate(&self) -> bool {
412 let (sender, receiver) = oneshot::channel::<bool>();
413 self.sender.clone().unbounded_send(Request::StopPairingDelegate(sender)).unwrap();
414 receiver.await.unwrap()
415 }
416
417 pub async fn forget_peer(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
419 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
420 self.sender.clone().unbounded_send(Request::Forget(peer_id, sender))?;
421 receiver.await?
422 }
423
424 pub async fn connect_l2cap_channel(
426 &self,
427 peer_id: PeerId,
428 psm: u16,
429 ) -> Result<(), anyhow::Error> {
430 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
431 self.sender.clone().unbounded_send(Request::ConnectL2cap(peer_id, psm, sender))?;
432 receiver.await?
433 }
434
435 pub async fn disconnect_l2cap(&self) -> Result<(), anyhow::Error> {
437 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
438 self.sender.clone().unbounded_send(Request::DisconnectL2cap(sender))?;
439 receiver.await?
440 }
441
442 pub async fn write_l2cap(&self, data: Vec<u8>) -> Result<(), anyhow::Error> {
444 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
445 self.sender.clone().unbounded_send(Request::WriteL2cap(data, sender))?;
446 receiver.await?
447 }
448
449 pub async fn set_discovery(&self, discovery: bool) -> Result<(), anyhow::Error> {
451 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
452 self.sender.clone().unbounded_send(Request::SetDiscovery(discovery, sender))?;
453 receiver.await?
454 }
455
456 pub async fn set_discoverability(&self, discoverable: bool) -> Result<(), anyhow::Error> {
458 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
459 self.sender.clone().unbounded_send(Request::SetDiscoverability(discoverable, sender))?;
460 receiver.await?
461 }
462
463 pub async fn set_connectability(&self, connectable: bool) -> Result<(), anyhow::Error> {
465 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
466 self.sender.clone().unbounded_send(Request::SetConnectability(connectable, sender))?;
467 receiver.await?
468 }
469
470 pub async fn set_local_name(&self, name: String) -> Result<(), anyhow::Error> {
472 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
473 self.sender.clone().unbounded_send(Request::SetLocalName(name, sender))?;
474 receiver.await?
475 }
476
477 pub async fn set_device_class(&self, device_class: DeviceClass) -> Result<(), anyhow::Error> {
479 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
480 self.sender.clone().unbounded_send(Request::SetDeviceClass(device_class, sender))?;
481 receiver.await?
482 }
483
484 pub async fn start_le_scan(
490 &self,
491 ) -> Result<
492 mpsc::UnboundedReceiver<
493 Vec<(fidl_fuchsia_bluetooth_le::Peer, Option<fidl_fuchsia_bluetooth::Address>)>,
494 >,
495 anyhow::Error,
496 > {
497 let (sender, receiver) = oneshot::channel::<
498 Result<
499 mpsc::UnboundedReceiver<
500 Vec<(fidl_fuchsia_bluetooth_le::Peer, Option<fidl_fuchsia_bluetooth::Address>)>,
501 >,
502 anyhow::Error,
503 >,
504 >();
505 self.sender.clone().unbounded_send(Request::StartLeScan(sender))?;
506 receiver.await?
507 }
508
509 pub async fn stop_le_scan(&self) -> bool {
512 let (sender, receiver) = oneshot::channel::<bool>();
513 self.sender.clone().unbounded_send(Request::StopLeScan(sender)).unwrap();
514 receiver.await.unwrap()
515 }
516
517 pub async fn connect_le(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
519 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
520 self.sender.clone().unbounded_send(Request::ConnectLe(peer_id, sender))?;
521 receiver.await?
522 }
523
524 pub async fn advertise_peripheral(
527 &self,
528 parameters: AdvertisingParameters,
529 timeout: std::time::Duration,
530 ) -> Result<Option<PeerId>, anyhow::Error> {
531 let (sender, receiver) = oneshot::channel::<Result<Option<PeerId>, anyhow::Error>>();
532 self.sender
533 .clone()
534 .unbounded_send(Request::AdvertisePeripheral(Box::new(parameters), timeout, sender))
535 .unwrap();
536 receiver.await?
537 }
538
539 pub async fn publish_service(
541 &self,
542 uuid: Uuid,
543 service_handle: ServiceHandle,
544 characteristics: Vec<Characteristic>,
545 ) -> Result<(), anyhow::Error> {
546 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
547 self.sender.clone().unbounded_send(Request::PublishService(
548 uuid,
549 service_handle,
550 characteristics,
551 sender,
552 ))?;
553 receiver.await?
554 }
555
556 pub async fn discover_services(&self) -> Result<Vec<ServiceInfo>, anyhow::Error> {
558 let (sender, receiver) = oneshot::channel::<Result<Vec<ServiceInfo>, anyhow::Error>>();
559 self.sender.clone().unbounded_send(Request::DiscoverServices(sender))?;
560 receiver.await?
561 }
562
563 pub async fn read_characteristic(
565 &self,
566 service_handle: ServiceHandle,
567 characteristic_handle: fidl_fuchsia_bluetooth_gatt2::Handle,
568 ) -> Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error> {
569 let (sender, receiver) =
570 oneshot::channel::<Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error>>();
571 self.sender.clone().unbounded_send(Request::ReadCharacteristic(
572 service_handle,
573 characteristic_handle,
574 sender,
575 ))?;
576 receiver.await?
577 }
578
579 pub async fn register_characteristic_notifier(
583 &self,
584 service_handle: ServiceHandle,
585 characteristic_handle: fidl_fuchsia_bluetooth_gatt2::Handle,
586 ) -> Result<(), anyhow::Error> {
587 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
588 self.sender.clone().unbounded_send(Request::RegisterCharacteristicNotifier(
589 service_handle,
590 characteristic_handle,
591 sender,
592 ))?;
593 receiver.await?
594 }
595
596 pub async fn advertise_service(
599 &self,
600 psm: u16,
601 timeout: std::time::Duration,
602 ) -> Result<Option<PeerId>, anyhow::Error> {
603 let (sender, receiver) = oneshot::channel::<Result<Option<PeerId>, anyhow::Error>>();
604 self.sender.clone().unbounded_send(Request::AdvertiseService(psm, timeout, sender))?;
605 receiver.await?
606 }
607}
608
609#[cfg(test)]
610mod tests {
611 use super::*;
612
613 #[fuchsia::test]
614 fn test_update_peer_cache_handles_duplicates_in_input() {
615 let peer_cache = Arc::new(Mutex::new(Vec::new()));
616
617 let mut peer1 = Peer::default();
618 peer1.id = Some(PeerId { value: 1 });
619 peer1.name = Some("Peer 1".to_string());
620
621 let mut peer2 = Peer::default();
622 peer2.id = Some(PeerId { value: 1 });
623 peer2.name = Some("Peer 2".to_string());
624
625 sys::update_peer_cache(peer_cache.clone(), vec![peer1, peer2.clone()], vec![]);
627
628 let cache = peer_cache.lock();
629
630 assert_eq!(cache.len(), 1);
632 assert_eq!(cache[0].name.as_deref(), Some("Peer 2"));
633 }
634
635 #[fuchsia::test]
636 fn test_update_peer_cache_replaces_existing_entry() {
637 let mut peer = Peer::default();
638 peer.id = Some(PeerId { value: 1 });
639 peer.name = Some("Peer".to_string());
640 let peer_cache = Arc::new(Mutex::new(vec![peer.clone()]));
641
642 peer.name = Some("Updated peer".to_string());
644 sys::update_peer_cache(peer_cache.clone(), vec![peer], vec![]);
645
646 let cache = peer_cache.lock();
647
648 assert_eq!(cache.len(), 1);
650 assert_eq!(cache[0].name.as_deref(), Some("Updated peer"));
651 }
652}