1use bt_common::generic_audio::ContextType;
33use bt_gatt::server::LocalService;
34use bt_gatt::server::{ReadResponder, ServiceDefinition, WriteResponder};
35use bt_gatt::types::{GattError, Handle};
36use bt_gatt::Server as _;
37
38use futures::task::{Poll, Waker};
39use futures::{Future, Stream};
40use pin_project::pin_project;
41use std::collections::HashMap;
42use thiserror::Error;
43
44use crate::{
45 AudioLocations, AvailableAudioContexts, AvailableContexts, PacRecord, SinkAudioLocations,
46 SourceAudioLocations, SupportedAudioContexts,
47};
48
49pub(crate) mod types;
50use crate::server::types::*;
51
52#[pin_project(project = LocalServiceProj)]
53enum LocalServiceState<T: bt_gatt::ServerTypes> {
54 NotPublished {
55 waker: Option<Waker>,
56 },
57 Preparing {
58 #[pin]
59 fut: T::LocalServiceFut,
60 },
61 Published {
62 service: T::LocalService,
63 #[pin]
64 events: T::ServiceEventStream,
65 },
66 Terminated,
67}
68
69impl<T: bt_gatt::ServerTypes> Default for LocalServiceState<T> {
70 fn default() -> Self {
71 Self::NotPublished { waker: None }
72 }
73}
74
75#[derive(Debug, Error)]
76pub enum Error {
77 #[error("Service is already published")]
78 AlreadyPublished,
79 #[error("Issue publishing service: {0}")]
80 PublishError(#[from] bt_gatt::types::Error),
81 #[error("Service should support at least one of Sink or Source PAC characteristics")]
82 MissingPac,
83 #[error("Available audio contexts are not supported: {0:?}")]
84 UnsupportedAudioContexts(Vec<ContextType>),
85}
86
87impl<T: bt_gatt::ServerTypes> Stream for LocalServiceState<T> {
88 type Item = Result<bt_gatt::server::ServiceEvent<T>, Error>;
89
90 fn poll_next(
91 mut self: std::pin::Pin<&mut Self>,
92 cx: &mut std::task::Context<'_>,
93 ) -> Poll<Option<Self::Item>> {
94 loop {
100 match self.as_mut().project() {
101 LocalServiceProj::Terminated => return Poll::Ready(None),
102 LocalServiceProj::NotPublished { .. } => {
103 self.as_mut()
104 .set(LocalServiceState::NotPublished { waker: Some(cx.waker().clone()) });
105 return Poll::Pending;
106 }
107 LocalServiceProj::Preparing { fut } => {
108 let service_result = futures::ready!(fut.poll(cx));
109 let Ok(service) = service_result else {
110 return Poll::Ready(Some(Err(Error::PublishError(
111 service_result.err().unwrap(),
112 ))));
113 };
114 let events = service.publish();
115 self.as_mut().set(LocalServiceState::Published { service, events });
116 continue;
117 }
118 LocalServiceProj::Published { service: _, events } => {
119 let item = futures::ready!(events.poll_next(cx));
120 let Some(gatt_result) = item else {
121 self.as_mut().set(LocalServiceState::Terminated);
122 return Poll::Ready(None);
123 };
124 let Ok(event) = gatt_result else {
125 self.as_mut().set(LocalServiceState::Terminated);
126 return Poll::Ready(Some(Err(Error::PublishError(
127 gatt_result.err().unwrap(),
128 ))));
129 };
130 return Poll::Ready(Some(Ok(event)));
131 }
132 }
133 }
134 }
135}
136
137impl<T: bt_gatt::ServerTypes> LocalServiceState<T> {
138 fn is_published(&self) -> bool {
139 if let LocalServiceState::NotPublished { .. } = self {
140 false
141 } else {
142 true
143 }
144 }
145}
146
147#[derive(Default)]
148pub struct ServerBuilder {
149 source_pacs: Vec<Vec<PacRecord>>,
150 source_audio_locations: Option<AudioLocations>,
151 sink_pacs: Vec<Vec<PacRecord>>,
152 sink_audio_locations: Option<AudioLocations>,
153}
154
155impl ServerBuilder {
156 pub fn new() -> ServerBuilder {
157 ServerBuilder::default()
158 }
159
160 pub fn add_source(mut self, capabilities: Vec<PacRecord>) -> Self {
165 if !capabilities.is_empty() {
166 self.source_pacs.push(capabilities);
167 }
168 self
169 }
170
171 pub fn set_source_locations(mut self, audio_locations: AudioLocations) -> Self {
174 self.source_audio_locations = Some(audio_locations);
175 self
176 }
177
178 pub fn add_sink(mut self, capabilities: Vec<PacRecord>) -> Self {
183 if !capabilities.is_empty() {
184 self.sink_pacs.push(capabilities);
185 }
186 self
187 }
188
189 pub fn set_sink_locations(mut self, audio_locations: AudioLocations) -> Self {
192 self.sink_audio_locations = Some(audio_locations);
193 self
194 }
195
196 fn verify_characteristics(
197 &self,
198 supported: &AudioContexts,
199 available: &AudioContexts,
200 ) -> Result<(), Error> {
201 let diff: Vec<ContextType> = available.sink.difference(&supported.sink).cloned().collect();
205 if diff.len() != 0 {
206 return Err(Error::UnsupportedAudioContexts(diff));
207 }
208 let diff: Vec<ContextType> =
209 available.source.difference(&supported.source).cloned().collect();
210 if diff.len() != 0 {
211 return Err(Error::UnsupportedAudioContexts(diff));
212 }
213
214 if self.source_pacs.len() == 0 && self.sink_pacs.len() == 0 {
216 return Err(Error::MissingPac);
217 }
218 Ok(())
219 }
220
221 pub fn build<T>(
224 mut self,
225 mut supported: AudioContexts,
226 available: AudioContexts,
227 ) -> Result<Server<T>, Error>
228 where
229 T: bt_gatt::ServerTypes,
230 {
231 let _ = self.verify_characteristics(&supported, &available)?;
232
233 let mut service_def = ServiceDefinition::new(
234 bt_gatt::server::ServiceId::new(1),
235 crate::PACS_UUID,
236 bt_gatt::types::ServiceKind::Primary,
237 );
238
239 let supported = SupportedAudioContexts {
240 handle: SUPPORTED_AUDIO_CONTEXTS_HANDLE,
241 sink: supported.sink.drain().collect(),
242 source: supported.source.drain().collect(),
243 };
244 let _ = service_def.add_characteristic((&supported).into());
245
246 let available = AvailableAudioContexts {
247 handle: AVAILABLE_AUDIO_CONTEXTS_HANDLE,
248 sink: (&available.sink).into(),
249 source: (&available.source).into(),
250 };
251 let _ = service_def.add_characteristic((&available).into());
252
253 let mut next_handle_iter = (HANDLE_OFFSET..).map(|x| Handle(x));
254 let mut audio_capabilities = HashMap::new();
255
256 let sink_audio_locations = match self.sink_audio_locations.take() {
259 Some(locations) if self.sink_pacs.len() > 0 => {
260 let sink =
261 SinkAudioLocations { handle: next_handle_iter.next().unwrap(), locations };
262 let _ = service_def.add_characteristic((&sink).into());
263 Some(sink)
264 }
265 _ => None,
266 };
267 for capabilities in self.sink_pacs.drain(..) {
268 let handle = next_handle_iter.next().unwrap();
269 let pac = PublishedAudioCapability::new_sink(handle, capabilities);
270 let _ = service_def.add_characteristic((&pac).into());
271 audio_capabilities.insert(handle, pac);
272 }
273
274 let source_audio_locations = match self.source_audio_locations.take() {
277 Some(locations) if self.source_pacs.len() > 0 => {
278 let source =
279 SourceAudioLocations { handle: next_handle_iter.next().unwrap(), locations };
280 let _ = service_def.add_characteristic((&source).into());
281 Some(source)
282 }
283 _ => None,
284 };
285 for capabilities in self.source_pacs.drain(..) {
286 let handle = next_handle_iter.next().unwrap();
287 let pac = PublishedAudioCapability::new_source(handle, capabilities);
288 let _ = service_def.add_characteristic((&pac).into());
289 audio_capabilities.insert(handle, pac);
290 }
291
292 let server = Server {
293 service_def,
294 local_service: Default::default(),
295 published_audio_capabilities: audio_capabilities,
296 source_audio_locations,
297 sink_audio_locations,
298 available_audio_contexts: available,
299 supported_audio_contexts: supported,
300 };
301 Ok(server)
302 }
303}
304
305#[pin_project]
306pub struct Server<T: bt_gatt::ServerTypes> {
307 service_def: ServiceDefinition,
308 #[pin]
309 local_service: LocalServiceState<T>,
310 published_audio_capabilities: HashMap<Handle, PublishedAudioCapability>,
311 source_audio_locations: Option<SourceAudioLocations>,
312 sink_audio_locations: Option<SinkAudioLocations>,
313 available_audio_contexts: AvailableAudioContexts,
314 supported_audio_contexts: SupportedAudioContexts,
315}
316
317impl<T: bt_gatt::ServerTypes> Server<T> {
318 pub fn publish(&mut self, server: T::Server) -> Result<(), Error> {
319 if self.local_service.is_published() {
320 return Err(Error::AlreadyPublished);
321 }
322
323 let LocalServiceState::NotPublished { waker } = std::mem::replace(
324 &mut self.local_service,
325 LocalServiceState::Preparing { fut: server.prepare(self.service_def.clone()) },
326 ) else {
327 unreachable!();
328 };
329 waker.map(Waker::wake);
330 Ok(())
331 }
332
333 fn is_source_locations_handle(&self, handle: Handle) -> bool {
334 self.source_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
335 }
336
337 fn is_sink_locations_handle(&self, handle: Handle) -> bool {
338 self.sink_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
339 }
340
341 pub fn update_available(&mut self, available: AudioContexts) -> &AvailableAudioContexts {
348 let supported = &self.supported_audio_contexts;
349 let new_sink =
350 AvailableContexts::from_iter(available.sink.intersection(&supported.sink).cloned());
351 let new_source =
352 AvailableContexts::from_iter(available.source.intersection(&supported.source).cloned());
353
354 let changed = self.available_audio_contexts.sink != new_sink
355 || self.available_audio_contexts.source != new_source;
356
357 self.available_audio_contexts.sink = new_sink;
358 self.available_audio_contexts.source = new_source;
359
360 if !changed {
361 return &self.available_audio_contexts;
362 }
363 if let LocalServiceState::Published { service, .. } = &self.local_service {
364 service.notify(
365 &AVAILABLE_AUDIO_CONTEXTS_HANDLE,
366 &self.available_audio_contexts.into_char_value(),
367 &[],
368 );
369 }
370 &self.available_audio_contexts
371 }
372}
373
374impl<T: bt_gatt::ServerTypes> Stream for Server<T> {
375 type Item = Result<(), Error>;
376
377 fn poll_next(
378 mut self: std::pin::Pin<&mut Self>,
379 cx: &mut std::task::Context<'_>,
380 ) -> std::task::Poll<Option<Self::Item>> {
381 loop {
382 let mut this = self.as_mut().project();
383 let gatt_event = match futures::ready!(this.local_service.as_mut().poll_next(cx)) {
384 None => return Poll::Ready(None),
385 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
386 Some(Ok(event)) => event,
387 };
388 use bt_gatt::server::ServiceEvent::*;
389 match gatt_event {
390 Read { handle, offset, responder, .. } => {
391 let offset = offset as usize;
392 let value = match handle {
393 x if x == AVAILABLE_AUDIO_CONTEXTS_HANDLE => {
394 self.available_audio_contexts.into_char_value()
395 }
396 x if x == SUPPORTED_AUDIO_CONTEXTS_HANDLE => {
397 self.supported_audio_contexts.into_char_value()
398 }
399 x if self.is_source_locations_handle(x) => {
400 self.source_audio_locations.as_ref().unwrap().into_char_value()
401 }
402 x if self.is_sink_locations_handle(x) => {
403 self.sink_audio_locations.as_ref().unwrap().into_char_value()
404 }
405 pac_handle => {
406 let Some(ref pac) = self.published_audio_capabilities.get(&pac_handle)
407 else {
408 responder.error(GattError::InvalidHandle);
409 continue;
410 };
411 pac.encode()
412 }
413 };
414 responder.respond(&value[offset..]);
415 continue;
416 }
417 Write { responder, .. } => {
419 responder.error(GattError::WriteNotPermitted);
420 continue;
421 }
422 ClientConfiguration { .. } => continue,
424 _ => continue,
425 }
426 }
427 }
428}
429
#[cfg(test)]
mod tests {
    use super::*;

    use bt_common::core::{CodecId, CodingFormat};
    use bt_common::generic_audio::codec_capabilities::*;
    use bt_common::generic_audio::AudioLocation;
    use bt_common::PeerId;
    use bt_gatt::server;
    use bt_gatt::server::NotificationType;
    use bt_gatt::test_utils::{FakeServer, FakeServerEvent, FakeTypes};
    use bt_gatt::types::ServiceKind;

    use futures::{FutureExt, StreamExt};
    use std::collections::HashSet;

    use crate::AvailableContexts;

    /// A PAC record for `format` that supports both frame durations and
    /// carries no metadata.
    fn pac_record(format: CodingFormat) -> PacRecord {
        PacRecord {
            codec_id: CodecId::Assigned(format),
            codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
                FrameDurationSupport::BothNoPreference,
            )],
            metadata: vec![],
        }
    }

    /// Builder with one sink PAC (one record) plus sink locations, and one
    /// source PAC (two records) without source locations. The trailing empty
    /// `add_source` call is expected to be ignored.
    fn default_server_builder() -> ServerBuilder {
        ServerBuilder::new()
            .add_sink(vec![pac_record(CodingFormat::ALawLog)])
            .set_sink_locations(AudioLocations {
                locations: HashSet::from([AudioLocation::FrontLeft, AudioLocation::FrontRight]),
            })
            .add_source(vec![
                pac_record(CodingFormat::ALawLog),
                pac_record(CodingFormat::MuLawLog),
            ])
            .add_source(vec![])
    }

    #[test]
    fn build_server() {
        let server = default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Conversational, ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
            )
            .expect("should succeed");
        assert_eq!(server.published_audio_capabilities.len(), 2);

        // Supported audio contexts.
        assert_eq!(server.supported_audio_contexts.handle.0, 1);
        assert_eq!(
            server.supported_audio_contexts.sink,
            HashSet::from([ContextType::Conversational, ContextType::Media])
        );
        assert_eq!(server.supported_audio_contexts.source, HashSet::from([ContextType::Media]));

        // Available audio contexts.
        assert_eq!(server.available_audio_contexts.handle.0, 2);
        assert_eq!(
            server.available_audio_contexts.sink,
            AvailableContexts::Available(HashSet::from([ContextType::Media]))
        );
        assert_eq!(server.available_audio_contexts.source, AvailableContexts::NotAvailable);

        // Sink audio locations get the first dynamically allocated handle.
        let location_char = server.sink_audio_locations.as_ref().expect("should exist");
        assert_eq!(location_char.handle.0, 3);
        assert_eq!(
            location_char.locations.locations,
            HashSet::from([AudioLocation::FrontLeft, AudioLocation::FrontRight])
        );

        // Exactly one sink PAC with a single record.
        let mut sink_iter =
            server.published_audio_capabilities.iter().filter(|(_handle, pac)| pac.is_sink());
        let (sink_handle, sink_pac) = sink_iter.next().expect("should exist");
        assert_eq!(sink_handle, &Handle(4));
        assert_eq!(sink_pac.pac_records().len(), 1);
        assert!(sink_iter.next().is_none());

        // No source locations were set; exactly one source PAC with two records.
        assert!(server.source_audio_locations.is_none());
        let mut source_iter =
            server.published_audio_capabilities.iter().filter(|(_handle, pac)| pac.is_source());
        let (source_handle, source_pac) = source_iter.next().expect("should exist");
        assert_eq!(source_handle, &Handle(5));
        assert_eq!(source_pac.pac_records().len(), 2);
        assert!(source_iter.next().is_none());
    }

    #[test]
    fn build_server_error() {
        // No PAC at all.
        assert!(ServerBuilder::new()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Conversational, ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
            )
            .is_err());

        // Available sink context that is not supported.
        assert!(default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Conversational, ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::from([ContextType::Alerts]), HashSet::new()),
            )
            .is_err());

        // Available source context that is not supported.
        assert!(default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Conversational, ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::from([]), HashSet::from([ContextType::EmergencyAlarm])),
            )
            .is_err());
    }

    #[test]
    fn publish_server() {
        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());

        let mut server = default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::new(), HashSet::new()),
            )
            .unwrap();

        // Nothing happens before the server is published.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        let (fake_gatt_server, mut event_receiver) = FakeServer::new();

        // The fake GATT server has not seen anything yet either.
        let mut event_stream = event_receiver.next();
        let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        server.publish(fake_gatt_server).expect("should succeed");

        // Polling drives the publication; there are no events to report.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        let Poll::Ready(Some(FakeServerEvent::Published { id, definition })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should be published");
        };
        assert_eq!(id, server::ServiceId::new(1));
        assert_eq!(definition.characteristics().count(), 5);
        assert_eq!(definition.kind(), ServiceKind::Primary);
        assert_eq!(definition.uuid(), crate::PACS_UUID);

        // Publishing a second time is rejected.
        let (fake_gatt_server, _) = FakeServer::new();
        assert!(server.publish(fake_gatt_server).is_err());
    }

    #[test]
    fn read_from_server() {
        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());

        let mut server = default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
            )
            .unwrap();

        let (fake_gatt_server, mut event_receiver) = FakeServer::new();
        server.publish(fake_gatt_server.clone()).expect("should succeed");

        // Drive the publication.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        let mut event_stream = event_receiver.next();
        let Poll::Ready(Some(FakeServerEvent::Published { id, .. })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should be published");
        };

        // A peer reads the available audio contexts from offset 0.
        let available_char_handle = server.available_audio_contexts.handle;
        fake_gatt_server.incoming_read(PeerId(0x01), id, available_char_handle, 0);

        // The read is answered internally and produces no stream item.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        let Poll::Ready(Some(FakeServerEvent::ReadResponded { handle, value, .. })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should be responded");
        };
        assert_eq!(handle, available_char_handle);
        assert_eq!(value.expect("should be ok"), vec![0x04, 0x00, 0x00, 0x00]);
    }

    #[test]
    fn update_available() {
        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());

        let mut server = default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Media, ContextType::Alerts]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
            )
            .unwrap();

        let (fake_gatt_server, mut event_receiver) = FakeServer::new();
        server.publish(fake_gatt_server.clone()).expect("should succeed");

        // Drive the publication.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        let mut event_stream = event_receiver.next();
        let Poll::Ready(Some(FakeServerEvent::Published { id, .. })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should be published");
        };

        // The peer explicitly disables notifications.
        fake_gatt_server.incoming_client_configuration(
            PeerId(1),
            id,
            AVAILABLE_AUDIO_CONTEXTS_HANDLE,
            NotificationType::Disable,
        );

        // The value changes, and the unsupported `Conversational` source
        // context is dropped.
        let updated = server.update_available(AudioContexts {
            sink: HashSet::from([ContextType::Alerts]),
            source: HashSet::from([ContextType::Media, ContextType::Conversational]),
        });
        assert_eq!(
            updated.sink,
            AvailableContexts::Available(HashSet::from([ContextType::Alerts]))
        );
        assert_eq!(
            updated.source,
            AvailableContexts::Available(HashSet::from([ContextType::Media]))
        );
        let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
            panic!("No peer registered for notifications");
        };

        // The peer now enables notifications.
        fake_gatt_server.incoming_client_configuration(
            PeerId(1),
            id,
            AVAILABLE_AUDIO_CONTEXTS_HANDLE,
            NotificationType::Notify,
        );
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };
        let updated = server.update_available(AudioContexts {
            sink: HashSet::from([ContextType::Media, ContextType::Alerts]),
            source: HashSet::from([ContextType::Media]),
        });
        assert_eq!(
            updated.sink,
            AvailableContexts::Available(HashSet::from([ContextType::Media, ContextType::Alerts]))
        );
        assert_eq!(
            updated.source,
            AvailableContexts::Available(HashSet::from([ContextType::Media]))
        );
        let Poll::Ready(Some(FakeServerEvent::Notified { service_id, handle, value, peers })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should be notified");
        };
        assert_eq!(service_id, id);
        assert_eq!(handle, AVAILABLE_AUDIO_CONTEXTS_HANDLE);
        assert_eq!(value, vec![0x04, 0x04, 0x04, 0x00]);
        assert_eq!(peers, vec![PeerId(1)]);

        // After filtering by the supported contexts the value is unchanged,
        // so no notification is sent.
        let _ = server.update_available(AudioContexts {
            sink: HashSet::from([ContextType::Media, ContextType::Alerts]),
            source: HashSet::from([ContextType::Media, ContextType::Conversational]),
        });
        let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
            panic!("Should not be notified");
        };
    }
}