1use anyhow::anyhow;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_bluetooth::{DeviceClass, HostId, PeerId, Uuid};
8use fidl_fuchsia_bluetooth_gatt2::{Characteristic, ServiceHandle, ServiceInfo};
9use fidl_fuchsia_bluetooth_le::{AdvertisingParameters, ConnectionMarker};
10use fidl_fuchsia_bluetooth_sys::{HostInfo, Peer};
11use fuchsia_async::LocalExecutor;
12use fuchsia_bluetooth::types::Channel;
13use fuchsia_sync::Mutex;
14use futures::StreamExt;
15use futures::channel::{mpsc, oneshot};
16use std::ffi::{CStr, CString};
17use std::sync::Arc;
18use std::thread;
19
20mod bredr;
21mod gatt;
22mod le;
23mod proxies;
24mod sys;
25
26use proxies::Proxies;
27
28enum Request {
31 GetHosts(oneshot::Sender<Result<Vec<HostInfo>, anyhow::Error>>),
32 GetKnownPeers(oneshot::Sender<Result<Vec<Peer>, anyhow::Error>>),
33 GetPeerId(CString, oneshot::Sender<Result<PeerId, anyhow::Error>>),
34 ConnectL2cap(PeerId, u16, oneshot::Sender<Result<(), anyhow::Error>>),
35 DisconnectL2cap(oneshot::Sender<Result<(), anyhow::Error>>),
36 WriteL2cap(Vec<u8>, oneshot::Sender<Result<(), anyhow::Error>>),
37 SetDiscovery(bool, oneshot::Sender<Result<(), anyhow::Error>>),
38 SetDiscoverability(bool, oneshot::Sender<Result<(), anyhow::Error>>),
39 SetConnectability(bool, oneshot::Sender<Result<(), anyhow::Error>>),
40 SetActiveHost(HostId, oneshot::Sender<Result<(), anyhow::Error>>),
41 SetLocalName(String, oneshot::Sender<Result<(), anyhow::Error>>),
42 SetDeviceClass(DeviceClass, oneshot::Sender<Result<(), anyhow::Error>>),
43 StartLeScan(
44 futures::channel::mpsc::UnboundedSender<
45 Vec<fidl_fuchsia_bluetooth_affordances::ScannedPeer>,
46 >,
47 oneshot::Sender<Result<(), anyhow::Error>>,
48 ),
49 StopLeScan(oneshot::Sender<Result<(), anyhow::Error>>),
50 ConnectLe(PeerId, oneshot::Sender<Result<(), anyhow::Error>>),
51 AdvertisePeripheral(
52 Box<AdvertisingParameters>,
53 std::time::Duration,
54 oneshot::Sender<Result<Option<PeerId>, anyhow::Error>>,
55 ),
56 PublishService(
57 Uuid,
58 ServiceHandle,
59 Vec<Characteristic>,
60 oneshot::Sender<Result<(), anyhow::Error>>,
61 ),
62 DiscoverServices(oneshot::Sender<Result<Vec<ServiceInfo>, anyhow::Error>>),
63 ReadCharacteristic(
64 ServiceHandle,
65 fidl_fuchsia_bluetooth_gatt2::Handle,
66 oneshot::Sender<Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error>>,
67 ),
68 RegisterCharacteristicNotifier(
69 ServiceHandle,
70 fidl_fuchsia_bluetooth_gatt2::Handle,
71 oneshot::Sender<Result<(), anyhow::Error>>,
72 ),
73 AdvertiseService(
74 u16,
75 std::time::Duration,
76 oneshot::Sender<Result<Option<PeerId>, anyhow::Error>>,
77 ),
78 Stop,
79}
80
81pub struct WorkThread {
82 thread_handle: Mutex<Option<thread::JoinHandle<Result<(), anyhow::Error>>>>,
83 sender: mpsc::UnboundedSender<Request>,
84}
85
86impl WorkThread {
87 pub fn spawn() -> Self {
88 let (sender, receiver) = mpsc::unbounded::<Request>();
89
90 let thread_handle = thread::spawn(move || {
91 LocalExecutor::default().run_singlethreaded(Self::handle_requests(receiver))?;
92 Ok(())
93 });
94
95 Self { thread_handle: Mutex::new(Some(thread_handle)), sender }
96 }
97
98 async fn handle_requests(
99 mut receiver: mpsc::UnboundedReceiver<Request>,
100 ) -> Result<(), anyhow::Error> {
101 let mut proxies = Proxies::connect()?;
102 let mut host_cache: Vec<HostInfo> = Vec::new();
103 let peer_cache: Arc<Mutex<Vec<Peer>>> = Arc::new(Mutex::new(Vec::new()));
105 let mut l2cap_channel: Option<Channel> = None;
107 let mut _peripheral_connection: ClientEnd<ConnectionMarker>;
108
109 while let Some(request) = receiver.next().await {
110 match request {
111 Request::GetHosts(result_sender) => {
112 if let Err(err) = sys::refresh_host_cache(&mut proxies, &mut host_cache).await {
113 result_sender
114 .send(Err(anyhow!("refresh_host_cache() error: {err}")))
115 .unwrap();
116 continue;
117 }
118 result_sender.send(Ok(host_cache.clone())).unwrap();
119 }
120 Request::GetKnownPeers(result_sender) => {
121 if let Err(err) = sys::refresh_peer_cache(
122 &mut proxies,
123 std::time::Duration::from_millis(10),
124 peer_cache.clone(),
125 )
126 .await
127 {
128 result_sender
129 .send(Err(anyhow!("refresh_peer_cache() error: {err}")))
130 .unwrap();
131 continue;
132 }
133 result_sender.send(Ok(peer_cache.lock().clone())).unwrap();
134 }
135 Request::GetPeerId(address, result_sender) => {
136 if let Some(peer) = sys::get_peer(
137 &mut proxies,
138 &address,
139 std::time::Duration::from_secs(2),
140 peer_cache.clone(),
141 )
142 .await?
143 {
144 result_sender.send(Ok(peer.id.unwrap())).unwrap();
145 continue;
146 }
147 result_sender.send(Err(anyhow!("Peer not found"))).unwrap();
148 }
149 Request::ConnectL2cap(peer_id, psm, result_sender) => {
150 match bredr::connect_l2cap(&proxies, &peer_id, psm).await {
151 Ok(channel) => {
152 l2cap_channel = Some(channel);
153 result_sender.send(Ok(())).unwrap();
154 }
155 Err(err) => {
156 result_sender.send(Err(err)).unwrap();
157 }
158 }
159 }
160 Request::DisconnectL2cap(result_sender) => {
161 if let Some(_channel) = l2cap_channel.take() {
162 println!("L2CAP channel disconnected");
163 }
164 result_sender.send(Ok(())).unwrap();
165 }
166 Request::WriteL2cap(data, result_sender) => {
167 if let Some(ref l2cap_channel) = l2cap_channel {
168 match l2cap_channel.write(&data) {
169 Ok(_) => result_sender.send(Ok(())).unwrap(),
170 Err(err) => result_sender
171 .send(Err(anyhow!("Failed to write to L2CAP channel: {}", err)))
172 .unwrap(),
173 }
174 } else {
175 result_sender.send(Err(anyhow!("L2CAP channel not connected"))).unwrap();
176 }
177 }
178 Request::SetDiscovery(discovery, result_sender) => {
179 result_sender.send(sys::set_discovery(&mut proxies, discovery).await).unwrap();
180 }
181 Request::SetDiscoverability(discoverable, result_sender) => {
182 result_sender
183 .send(sys::set_discoverability(&mut proxies, discoverable).await)
184 .unwrap();
185 }
186 Request::SetConnectability(connectable, result_sender) => {
187 result_sender
188 .send(sys::set_connectability(&proxies, connectable).await)
189 .unwrap();
190 }
191 Request::SetActiveHost(host_id, result_sender) => {
192 result_sender.send(sys::set_active_host(&proxies, host_id).await).unwrap();
193 }
194 Request::SetLocalName(name, result_sender) => {
195 result_sender.send(sys::set_local_name(&proxies, name)).unwrap();
196 }
197 Request::SetDeviceClass(device_class, result_sender) => {
198 result_sender.send(sys::set_device_class(&proxies, device_class)).unwrap();
199 }
200 Request::StartLeScan(sender, result_sender) => {
201 result_sender.send(le::start_le_scan(&mut proxies, sender).await).unwrap();
202 }
203 Request::StopLeScan(result_sender) => {
204 let stopped = le::stop_scan(&proxies);
205 if stopped {
206 result_sender.send(Ok(())).unwrap();
207 } else {
208 result_sender.send(Err(anyhow!("No scan ongoing"))).unwrap();
209 }
210 }
211 Request::ConnectLe(peer_id, result_sender) => {
212 result_sender.send(le::connect_le(&mut proxies, &peer_id).await).unwrap();
213 }
214 Request::AdvertisePeripheral(parameters, timeout, result_sender) => {
215 match le::advertise_peripheral(&proxies, *parameters, timeout).await {
216 Ok(Some((peer_id, connection))) => {
217 _peripheral_connection = connection;
218 result_sender.send(Ok(Some(peer_id))).unwrap();
219 }
220 result => {
221 result_sender.send(result.map(|_| None)).unwrap();
222 }
223 }
224 }
225 Request::PublishService(uuid, service_handle, characteristics, result_sender) => {
226 match gatt::publish_service(&proxies, uuid, service_handle, characteristics)
227 .await
228 {
229 Ok(mut local_service_request_stream) => {
230 fuchsia_async::Task::spawn(async move {
231 while let Some(Ok(request)) =
232 local_service_request_stream.next().await
233 {
234 println!("Received LocalService request: {:?}", request);
236 }
237 })
238 .detach();
239 result_sender.send(Ok(())).unwrap();
240 }
241 Err(err) => {
242 result_sender.send(Err(err)).unwrap();
243 }
244 }
245 }
246 Request::DiscoverServices(result_sender) => {
247 result_sender.send(gatt::discover_services(&mut proxies).await).unwrap();
248 }
249 Request::ReadCharacteristic(
250 service_handle,
251 characteristic_handle,
252 result_sender,
253 ) => {
254 result_sender
255 .send(
256 gatt::read_characteristic(
257 &proxies,
258 service_handle,
259 characteristic_handle,
260 )
261 .await,
262 )
263 .unwrap();
264 }
265 Request::RegisterCharacteristicNotifier(
266 service_handle,
267 characteristic_handle,
268 result_sender,
269 ) => {
270 result_sender
271 .send(
272 gatt::register_characteristic_notifier(
273 &proxies,
274 service_handle,
275 characteristic_handle,
276 )
277 .await,
278 )
279 .unwrap();
280 }
281 Request::AdvertiseService(psm, timeout, result_sender) => {
282 match bredr::advertise_service(&proxies, psm).await {
283 Ok(connection_receiver_stream) => {
284 result_sender
285 .send(
286 bredr::serve_connection_receiver(
287 connection_receiver_stream,
288 &mut l2cap_channel,
289 timeout,
290 )
291 .await,
292 )
293 .unwrap();
294 }
295 Err(err) => {
296 result_sender.send(Err(err)).unwrap();
297 }
298 }
299 }
300 Request::Stop => break,
301 }
302 }
303
304 Ok(())
305 }
306
307 pub fn join(&self) -> Result<(), anyhow::Error> {
308 self.sender.clone().unbounded_send(Request::Stop).unwrap();
309 if let Err(err) =
310 self.thread_handle.lock().take().unwrap().join().expect("Failed to join work thread")
311 {
312 return Err(anyhow!("Work thread exited with error: {err}"));
313 }
314 Ok(())
315 }
316
317 pub async fn get_hosts(&self) -> Result<Vec<HostInfo>, anyhow::Error> {
319 let (sender, receiver) = oneshot::channel::<Result<Vec<HostInfo>, anyhow::Error>>();
320 self.sender.clone().unbounded_send(Request::GetHosts(sender))?;
321 receiver.await?
322 }
323
324 pub async fn get_peer_id(&self, address: &CStr) -> Result<PeerId, anyhow::Error> {
326 let (sender, receiver) = oneshot::channel::<Result<PeerId, anyhow::Error>>();
327 self.sender.clone().unbounded_send(Request::GetPeerId(address.to_owned(), sender))?;
328 receiver.await?
329 }
330
331 pub async fn get_known_peers(&self) -> Result<Vec<Peer>, anyhow::Error> {
332 let (sender, receiver) = oneshot::channel::<Result<Vec<Peer>, anyhow::Error>>();
333 self.sender.clone().unbounded_send(Request::GetKnownPeers(sender))?;
334 receiver.await?
335 }
336
337 pub async fn connect_l2cap_channel(
339 &self,
340 peer_id: PeerId,
341 psm: u16,
342 ) -> Result<(), anyhow::Error> {
343 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
344 self.sender.clone().unbounded_send(Request::ConnectL2cap(peer_id, psm, sender))?;
345 receiver.await?
346 }
347
348 pub async fn disconnect_l2cap(&self) -> Result<(), anyhow::Error> {
350 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
351 self.sender.clone().unbounded_send(Request::DisconnectL2cap(sender))?;
352 receiver.await?
353 }
354
355 pub async fn write_l2cap(&self, data: Vec<u8>) -> Result<(), anyhow::Error> {
357 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
358 self.sender.clone().unbounded_send(Request::WriteL2cap(data, sender))?;
359 receiver.await?
360 }
361
362 pub async fn set_discovery(&self, discovery: bool) -> Result<(), anyhow::Error> {
364 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
365 self.sender.clone().unbounded_send(Request::SetDiscovery(discovery, sender))?;
366 receiver.await?
367 }
368
369 pub async fn set_discoverability(&self, discoverable: bool) -> Result<(), anyhow::Error> {
371 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
372 self.sender.clone().unbounded_send(Request::SetDiscoverability(discoverable, sender))?;
373 receiver.await?
374 }
375
376 pub async fn set_connectability(&self, connectable: bool) -> Result<(), anyhow::Error> {
378 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
379 self.sender.clone().unbounded_send(Request::SetConnectability(connectable, sender))?;
380 receiver.await?
381 }
382
383 pub async fn set_active_host(&self, host_id: HostId) -> Result<(), anyhow::Error> {
385 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
386 self.sender.clone().unbounded_send(Request::SetActiveHost(host_id, sender))?;
387 receiver.await?
388 }
389
390 pub async fn set_local_name(&self, name: String) -> Result<(), anyhow::Error> {
392 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
393 self.sender.clone().unbounded_send(Request::SetLocalName(name, sender))?;
394 receiver.await?
395 }
396
397 pub async fn set_device_class(&self, device_class: DeviceClass) -> Result<(), anyhow::Error> {
399 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
400 self.sender.clone().unbounded_send(Request::SetDeviceClass(device_class, sender))?;
401 receiver.await?
402 }
403
404 pub async fn start_le_scan(
406 &self,
407 sender: futures::channel::mpsc::UnboundedSender<
408 Vec<fidl_fuchsia_bluetooth_affordances::ScannedPeer>,
409 >,
410 ) -> Result<(), anyhow::Error> {
411 let (oneshot_sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
412 self.sender.clone().unbounded_send(Request::StartLeScan(sender, oneshot_sender))?;
413 receiver.await?
414 }
415
416 pub async fn stop_le_scan(&self) -> Result<(), anyhow::Error> {
418 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
419 self.sender.clone().unbounded_send(Request::StopLeScan(sender))?;
420 receiver.await?
421 }
422
423 pub async fn connect_le(&self, peer_id: PeerId) -> Result<(), anyhow::Error> {
425 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
426 self.sender.clone().unbounded_send(Request::ConnectLe(peer_id, sender))?;
427 receiver.await?
428 }
429
430 pub async fn advertise_peripheral(
433 &self,
434 parameters: AdvertisingParameters,
435 timeout: std::time::Duration,
436 ) -> Result<Option<PeerId>, anyhow::Error> {
437 let (sender, receiver) = oneshot::channel::<Result<Option<PeerId>, anyhow::Error>>();
438 self.sender
439 .clone()
440 .unbounded_send(Request::AdvertisePeripheral(Box::new(parameters), timeout, sender))
441 .unwrap();
442 receiver.await?
443 }
444
445 pub async fn publish_service(
447 &self,
448 uuid: Uuid,
449 service_handle: ServiceHandle,
450 characteristics: Vec<Characteristic>,
451 ) -> Result<(), anyhow::Error> {
452 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
453 self.sender.clone().unbounded_send(Request::PublishService(
454 uuid,
455 service_handle,
456 characteristics,
457 sender,
458 ))?;
459 receiver.await?
460 }
461
462 pub async fn discover_services(&self) -> Result<Vec<ServiceInfo>, anyhow::Error> {
464 let (sender, receiver) = oneshot::channel::<Result<Vec<ServiceInfo>, anyhow::Error>>();
465 self.sender.clone().unbounded_send(Request::DiscoverServices(sender))?;
466 receiver.await?
467 }
468
469 pub async fn read_characteristic(
471 &self,
472 service_handle: ServiceHandle,
473 characteristic_handle: fidl_fuchsia_bluetooth_gatt2::Handle,
474 ) -> Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error> {
475 let (sender, receiver) =
476 oneshot::channel::<Result<fidl_fuchsia_bluetooth_gatt2::ReadValue, anyhow::Error>>();
477 self.sender.clone().unbounded_send(Request::ReadCharacteristic(
478 service_handle,
479 characteristic_handle,
480 sender,
481 ))?;
482 receiver.await?
483 }
484
485 pub async fn register_characteristic_notifier(
489 &self,
490 service_handle: ServiceHandle,
491 characteristic_handle: fidl_fuchsia_bluetooth_gatt2::Handle,
492 ) -> Result<(), anyhow::Error> {
493 let (sender, receiver) = oneshot::channel::<Result<(), anyhow::Error>>();
494 self.sender.clone().unbounded_send(Request::RegisterCharacteristicNotifier(
495 service_handle,
496 characteristic_handle,
497 sender,
498 ))?;
499 receiver.await?
500 }
501
502 pub async fn advertise_service(
505 &self,
506 psm: u16,
507 timeout: std::time::Duration,
508 ) -> Result<Option<PeerId>, anyhow::Error> {
509 let (sender, receiver) = oneshot::channel::<Result<Option<PeerId>, anyhow::Error>>();
510 self.sender.clone().unbounded_send(Request::AdvertiseService(psm, timeout, sender))?;
511 receiver.await?
512 }
513}
514
515#[cfg(test)]
516mod tests {
517 use super::*;
518
519 #[fuchsia::test]
520 fn test_update_peer_cache_handles_duplicates_in_input() {
521 let peer_cache = Arc::new(Mutex::new(Vec::new()));
522
523 let mut peer1 = Peer::default();
524 peer1.id = Some(PeerId { value: 1 });
525 peer1.name = Some("Peer 1".to_string());
526
527 let mut peer2 = Peer::default();
528 peer2.id = Some(PeerId { value: 1 });
529 peer2.name = Some("Peer 2".to_string());
530
531 sys::update_peer_cache(peer_cache.clone(), vec![peer1, peer2.clone()], vec![]);
533
534 let cache = peer_cache.lock();
535
536 assert_eq!(cache.len(), 1);
538 assert_eq!(cache[0].name.as_deref(), Some("Peer 2"));
539 }
540
541 #[fuchsia::test]
542 fn test_update_peer_cache_replaces_existing_entry() {
543 let mut peer = Peer::default();
544 peer.id = Some(PeerId { value: 1 });
545 peer.name = Some("Peer".to_string());
546 let peer_cache = Arc::new(Mutex::new(vec![peer.clone()]));
547
548 peer.name = Some("Updated peer".to_string());
550 sys::update_peer_cache(peer_cache.clone(), vec![peer], vec![]);
551
552 let cache = peer_cache.lock();
553
554 assert_eq!(cache.len(), 1);
556 assert_eq!(cache[0].name.as_deref(), Some("Updated peer"));
557 }
558}