1use bt_common::generic_audio::ContextType;
33use bt_gatt::Server as _;
34use bt_gatt::server::LocalService;
35use bt_gatt::server::{ReadResponder, ServiceDefinition, WriteResponder};
36use bt_gatt::types::{GattError, Handle};
37use futures::task::{Poll, Waker};
38use futures::{Future, Stream};
39use pin_project::pin_project;
40use std::collections::HashMap;
41use thiserror::Error;
42
43use crate::{
44 AudioLocations, AvailableAudioContexts, PacRecord, SinkAudioLocations, SourceAudioLocations,
45 SupportedAudioContexts,
46};
47
48pub(crate) mod types;
49use crate::server::types::*;
50
51#[pin_project(project = LocalServiceProj)]
52enum LocalServiceState<T: bt_gatt::ServerTypes> {
53 NotPublished {
54 waker: Option<Waker>,
55 },
56 Preparing {
57 #[pin]
58 fut: T::LocalServiceFut,
59 },
60 Published {
61 service: T::LocalService,
62 #[pin]
63 events: T::ServiceEventStream,
64 },
65 Terminated,
66}
67
68impl<T: bt_gatt::ServerTypes> Default for LocalServiceState<T> {
69 fn default() -> Self {
70 Self::NotPublished { waker: None }
71 }
72}
73
74#[derive(Debug, Error)]
75pub enum Error {
76 #[error("Service is already published")]
77 AlreadyPublished,
78 #[error("Issue publishing service: {0}")]
79 PublishError(#[from] bt_gatt::types::Error),
80 #[error("Service should support at least one of Sink or Source PAC characteristics")]
81 MissingPac,
82 #[error("Available audio contexts are not supported: {0:?}")]
83 UnsupportedAudioContexts(Vec<ContextType>),
84}
85
86impl<T: bt_gatt::ServerTypes> Stream for LocalServiceState<T> {
87 type Item = Result<bt_gatt::server::ServiceEvent<T>, Error>;
88
89 fn poll_next(
90 mut self: std::pin::Pin<&mut Self>,
91 cx: &mut std::task::Context<'_>,
92 ) -> Poll<Option<Self::Item>> {
93 loop {
99 match self.as_mut().project() {
100 LocalServiceProj::Terminated => return Poll::Ready(None),
101 LocalServiceProj::NotPublished { .. } => {
102 self.as_mut()
103 .set(LocalServiceState::NotPublished { waker: Some(cx.waker().clone()) });
104 return Poll::Pending;
105 }
106 LocalServiceProj::Preparing { fut } => {
107 let service_result = futures::ready!(fut.poll(cx));
108 let Ok(service) = service_result else {
109 return Poll::Ready(Some(Err(Error::PublishError(
110 service_result.err().unwrap(),
111 ))));
112 };
113 let events = service.publish();
114 self.as_mut().set(LocalServiceState::Published { service, events });
115 continue;
116 }
117 LocalServiceProj::Published { service: _, events } => {
118 let item = futures::ready!(events.poll_next(cx));
119 let Some(gatt_result) = item else {
120 self.as_mut().set(LocalServiceState::Terminated);
121 return Poll::Ready(None);
122 };
123 let Ok(event) = gatt_result else {
124 self.as_mut().set(LocalServiceState::Terminated);
125 return Poll::Ready(Some(Err(Error::PublishError(
126 gatt_result.err().unwrap(),
127 ))));
128 };
129 return Poll::Ready(Some(Ok(event)));
130 }
131 }
132 }
133 }
134}
135
136impl<T: bt_gatt::ServerTypes> LocalServiceState<T> {
137 fn is_published(&self) -> bool {
138 if let LocalServiceState::NotPublished { .. } = self { false } else { true }
139 }
140}
141
142#[derive(Default)]
143pub struct ServerBuilder {
144 source_pacs: Vec<Vec<PacRecord>>,
145 source_audio_locations: Option<AudioLocations>,
146 sink_pacs: Vec<Vec<PacRecord>>,
147 sink_audio_locations: Option<AudioLocations>,
148}
149
150impl ServerBuilder {
151 pub fn new() -> ServerBuilder {
152 ServerBuilder::default()
153 }
154
155 pub fn add_source(mut self, capabilities: Vec<PacRecord>) -> Self {
160 if !capabilities.is_empty() {
161 self.source_pacs.push(capabilities);
162 }
163 self
164 }
165
166 pub fn set_source_locations(mut self, audio_locations: AudioLocations) -> Self {
169 self.source_audio_locations = Some(audio_locations);
170 self
171 }
172
173 pub fn add_sink(mut self, capabilities: Vec<PacRecord>) -> Self {
178 if !capabilities.is_empty() {
179 self.sink_pacs.push(capabilities);
180 }
181 self
182 }
183
184 pub fn set_sink_locations(mut self, audio_locations: AudioLocations) -> Self {
187 self.sink_audio_locations = Some(audio_locations);
188 self
189 }
190
191 fn verify_characteristics(
192 &self,
193 supported: &AudioContexts,
194 available: &AudioContexts,
195 ) -> Result<(), Error> {
196 let diff: Vec<ContextType> = available.sink.difference(&supported.sink).cloned().collect();
200 if diff.len() != 0 {
201 return Err(Error::UnsupportedAudioContexts(diff));
202 }
203 let diff: Vec<ContextType> =
204 available.source.difference(&supported.source).cloned().collect();
205 if diff.len() != 0 {
206 return Err(Error::UnsupportedAudioContexts(diff));
207 }
208
209 if self.source_pacs.len() == 0 && self.sink_pacs.len() == 0 {
211 return Err(Error::MissingPac);
212 }
213 Ok(())
214 }
215
216 pub fn build<T>(
219 mut self,
220 mut supported: AudioContexts,
221 available: AudioContexts,
222 ) -> Result<Server<T>, Error>
223 where
224 T: bt_gatt::ServerTypes,
225 {
226 let _ = self.verify_characteristics(&supported, &available)?;
227
228 let mut service_def = ServiceDefinition::new(
229 bt_gatt::server::ServiceId::new(1),
230 crate::PACS_UUID,
231 bt_gatt::types::ServiceKind::Primary,
232 );
233
234 let supported = SupportedAudioContexts {
235 handle: SUPPORTED_AUDIO_CONTEXTS_HANDLE,
236 sink: supported.sink.drain().collect(),
237 source: supported.source.drain().collect(),
238 };
239 let _ = service_def.add_characteristic((&supported).into());
240
241 let available = AvailableAudioContexts {
242 handle: AVAILABLE_AUDIO_CONTEXTS_HANDLE,
243 sink: (&available.sink).into(),
244 source: (&available.source).into(),
245 };
246 let _ = service_def.add_characteristic((&available).into());
247
248 let mut next_handle_iter = (HANDLE_OFFSET..).map(|x| Handle(x));
249 let mut audio_capabilities = HashMap::new();
250
251 let sink_audio_locations = match self.sink_audio_locations.take() {
254 Some(locations) if self.sink_pacs.len() > 0 => {
255 let sink =
256 SinkAudioLocations { handle: next_handle_iter.next().unwrap(), locations };
257 let _ = service_def.add_characteristic((&sink).into());
258 Some(sink)
259 }
260 _ => None,
261 };
262 for capabilities in self.sink_pacs.drain(..) {
263 let handle = next_handle_iter.next().unwrap();
264 let pac = PublishedAudioCapability::new_sink(handle, capabilities);
265 let _ = service_def.add_characteristic((&pac).into());
266 audio_capabilities.insert(handle, pac);
267 }
268
269 let source_audio_locations = match self.source_audio_locations.take() {
272 Some(locations) if self.source_pacs.len() > 0 => {
273 let source =
274 SourceAudioLocations { handle: next_handle_iter.next().unwrap(), locations };
275 let _ = service_def.add_characteristic((&source).into());
276 Some(source)
277 }
278 _ => None,
279 };
280 for capabilities in self.source_pacs.drain(..) {
281 let handle = next_handle_iter.next().unwrap();
282 let pac = PublishedAudioCapability::new_source(handle, capabilities);
283 let _ = service_def.add_characteristic((&pac).into());
284 audio_capabilities.insert(handle, pac);
285 }
286
287 let server = Server {
288 service_def,
289 local_service: Default::default(),
290 published_audio_capabilities: audio_capabilities,
291 source_audio_locations,
292 sink_audio_locations,
293 available_audio_contexts: available,
294 supported_audio_contexts: supported,
295 };
296 Ok(server)
297 }
298}
299
300#[pin_project]
301pub struct Server<T: bt_gatt::ServerTypes> {
302 service_def: ServiceDefinition,
303 #[pin]
304 local_service: LocalServiceState<T>,
305 published_audio_capabilities: HashMap<Handle, PublishedAudioCapability>,
306 source_audio_locations: Option<SourceAudioLocations>,
307 sink_audio_locations: Option<SinkAudioLocations>,
308 available_audio_contexts: AvailableAudioContexts,
309 supported_audio_contexts: SupportedAudioContexts,
310}
311
312impl<T: bt_gatt::ServerTypes> Server<T> {
313 pub fn publish(&mut self, server: T::Server) -> Result<(), Error> {
314 if self.local_service.is_published() {
315 return Err(Error::AlreadyPublished);
316 }
317
318 let LocalServiceState::NotPublished { waker } = std::mem::replace(
319 &mut self.local_service,
320 LocalServiceState::Preparing { fut: server.prepare(self.service_def.clone()) },
321 ) else {
322 unreachable!();
323 };
324 waker.map(Waker::wake);
325 Ok(())
326 }
327
328 fn is_source_locations_handle(&self, handle: Handle) -> bool {
329 self.source_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
330 }
331
332 fn is_sink_locations_handle(&self, handle: Handle) -> bool {
333 self.sink_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
334 }
335}
336
337impl<T: bt_gatt::ServerTypes> Stream for Server<T> {
338 type Item = Result<(), Error>;
339
340 fn poll_next(
341 mut self: std::pin::Pin<&mut Self>,
342 cx: &mut std::task::Context<'_>,
343 ) -> std::task::Poll<Option<Self::Item>> {
344 loop {
345 let mut this = self.as_mut().project();
346 let gatt_event = match futures::ready!(this.local_service.as_mut().poll_next(cx)) {
347 None => return Poll::Ready(None),
348 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
349 Some(Ok(event)) => event,
350 };
351 use bt_gatt::server::ServiceEvent::*;
352 match gatt_event {
353 Read { handle, offset, responder, .. } => {
354 let offset = offset as usize;
355 let value = match handle {
356 x if x == AVAILABLE_AUDIO_CONTEXTS_HANDLE => {
357 self.available_audio_contexts.into_char_value()
358 }
359 x if x == SUPPORTED_AUDIO_CONTEXTS_HANDLE => {
360 self.supported_audio_contexts.into_char_value()
361 }
362 x if self.is_source_locations_handle(x) => {
363 self.source_audio_locations.as_ref().unwrap().into_char_value()
364 }
365 x if self.is_sink_locations_handle(x) => {
366 self.sink_audio_locations.as_ref().unwrap().into_char_value()
367 }
368 pac_handle => {
369 let Some(ref pac) = self.published_audio_capabilities.get(&pac_handle)
370 else {
371 responder.error(GattError::InvalidHandle);
372 continue;
373 };
374 pac.encode()
375 }
376 };
377 responder.respond(&value[offset..]);
378 continue;
379 }
380 Write { responder, .. } => {
382 responder.error(GattError::WriteNotPermitted);
383 continue;
384 }
385 ClientConfiguration { .. } => {
387 unimplemented!();
388 }
389 _ => continue,
390 }
391 }
392 }
393}
394
395#[cfg(test)]
396mod tests {
397 use super::*;
398
399 use bt_common::PeerId;
400 use bt_common::core::{CodecId, CodingFormat};
401 use bt_common::generic_audio::AudioLocation;
402 use bt_common::generic_audio::codec_capabilities::*;
403 use bt_gatt::server;
404 use bt_gatt::test_utils::{FakeServer, FakeServerEvent, FakeTypes};
405 use bt_gatt::types::ServiceKind;
406 use futures::{FutureExt, StreamExt};
407
408 use std::collections::HashSet;
409
410 use crate::AvailableContexts;
411
412 fn default_server_builder() -> ServerBuilder {
416 let builder = ServerBuilder::new()
417 .add_sink(vec![PacRecord {
418 codec_id: CodecId::Assigned(CodingFormat::ALawLog),
419 codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
420 FrameDurationSupport::BothNoPreference,
421 )],
422 metadata: vec![],
423 }])
424 .set_sink_locations(AudioLocations {
425 locations: HashSet::from([AudioLocation::FrontLeft, AudioLocation::FrontRight]),
426 })
427 .add_source(vec![
428 PacRecord {
429 codec_id: CodecId::Assigned(CodingFormat::ALawLog),
430 codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
431 FrameDurationSupport::BothNoPreference,
432 )],
433 metadata: vec![],
434 },
435 PacRecord {
436 codec_id: CodecId::Assigned(CodingFormat::MuLawLog),
437 codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
438 FrameDurationSupport::BothNoPreference,
439 )],
440 metadata: vec![],
441 },
442 ])
443 .add_source(vec![]);
444 builder
445 }
446
447 #[test]
448 fn build_server() {
449 let server = default_server_builder()
450 .build::<FakeTypes>(
451 AudioContexts::new(
452 HashSet::from([ContextType::Conversational, ContextType::Media]),
453 HashSet::from([ContextType::Media]),
454 ),
455 AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
456 )
457 .expect("should succeed");
458 assert_eq!(server.published_audio_capabilities.len(), 2);
459
460 assert_eq!(server.supported_audio_contexts.handle.0, 1);
461 assert_eq!(
462 server.supported_audio_contexts.sink,
463 HashSet::from([ContextType::Conversational, ContextType::Media])
464 );
465 assert_eq!(server.supported_audio_contexts.source, HashSet::from([ContextType::Media]));
466
467 assert_eq!(server.available_audio_contexts.handle.0, 2);
468 assert_eq!(
469 server.available_audio_contexts.sink,
470 AvailableContexts::Available(HashSet::from([ContextType::Media]))
471 );
472 assert_eq!(server.available_audio_contexts.source, AvailableContexts::NotAvailable);
473
474 let location_char = server.sink_audio_locations.as_ref().expect("should exist");
476 assert_eq!(location_char.handle.0, 3);
477 assert_eq!(
478 location_char.locations.locations,
479 HashSet::from([AudioLocation::FrontLeft, AudioLocation::FrontRight])
480 );
481
482 let mut sink_iter =
483 server.published_audio_capabilities.iter().filter(|(_handle, pac)| pac.is_sink());
484 let sink_char = sink_iter.next().expect("should exist");
485 assert_eq!(sink_char.0, &Handle(4));
486 assert_eq!(sink_char.1.pac_records().len(), 1);
487 assert!(sink_iter.next().is_none());
488
489 assert!(server.source_audio_locations.is_none());
491 let mut source_iter =
492 server.published_audio_capabilities.iter().filter(|(_handle, pac)| pac.is_source());
493 let source_char = source_iter.next().expect("should exist");
494 assert_eq!(source_char.0, &Handle(5));
495 assert_eq!(source_char.1.pac_records().len(), 2);
496 assert_eq!(source_iter.next(), None);
497 }
498
499 #[test]
500 fn build_server_error() {
501 assert!(
503 ServerBuilder::new()
504 .build::<FakeTypes>(
505 AudioContexts::new(
506 HashSet::from([ContextType::Conversational, ContextType::Media]),
507 HashSet::from([ContextType::Media]),
508 ),
509 AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
510 )
511 .is_err()
512 );
513
514 assert!(
516 default_server_builder()
517 .build::<FakeTypes>(
518 AudioContexts::new(
519 HashSet::from([ContextType::Conversational, ContextType::Media]),
520 HashSet::from([ContextType::Media]),
521 ),
522 AudioContexts::new(HashSet::from([ContextType::Alerts]), HashSet::new()),
523 )
524 .is_err()
525 );
526
527 assert!(
529 default_server_builder()
530 .build::<FakeTypes>(
531 AudioContexts::new(
532 HashSet::from([ContextType::Conversational, ContextType::Media]),
533 HashSet::from([ContextType::Media]),
534 ),
535 AudioContexts::new(
536 HashSet::from([]),
537 HashSet::from([ContextType::EmergencyAlarm])
538 ),
539 )
540 .is_err()
541 );
542 }
543
544 #[test]
545 fn publish_server() {
546 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
547
548 let mut server = default_server_builder()
549 .build::<FakeTypes>(
550 AudioContexts::new(
551 HashSet::from([ContextType::Media]),
552 HashSet::from([ContextType::Media]),
553 ),
554 AudioContexts::new(HashSet::new(), HashSet::new()),
555 )
556 .unwrap();
557
558 let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
560 panic!("Should be pending");
561 };
562
563 let (fake_gatt_server, mut event_receiver) = FakeServer::new();
564
565 let mut event_stream = event_receiver.next();
567 let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
568 panic!("Should be pending");
569 };
570
571 let _ = server.publish(fake_gatt_server).expect("should succeed");
572
573 let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
575 panic!("Should be pending");
576 };
577
578 let Poll::Ready(Some(FakeServerEvent::Published { id, definition })) =
580 event_stream.poll_unpin(&mut noop_cx)
581 else {
582 panic!("Should be published");
583 };
584 assert_eq!(id, server::ServiceId::new(1));
585 assert_eq!(definition.characteristics().collect::<Vec<_>>().len(), 5);
586 assert_eq!(definition.kind(), ServiceKind::Primary);
587 assert_eq!(definition.uuid(), crate::PACS_UUID);
588
589 let (fake_gatt_server, _) = FakeServer::new();
591 assert!(server.publish(fake_gatt_server).is_err());
592 }
593
594 #[test]
595 fn read_from_server() {
596 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
597
598 let mut server = default_server_builder()
599 .build::<FakeTypes>(
600 AudioContexts::new(
601 HashSet::from([ContextType::Media]),
602 HashSet::from([ContextType::Media]),
603 ),
604 AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
605 )
606 .unwrap();
607
608 let (fake_gatt_server, mut event_receiver) = FakeServer::new();
609 let _ = server.publish(fake_gatt_server.clone()).expect("should succeed");
610
611 let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
613 panic!("Should be pending");
614 };
615
616 let mut event_stream = event_receiver.next();
618 let Poll::Ready(Some(FakeServerEvent::Published { id, .. })) =
619 event_stream.poll_unpin(&mut noop_cx)
620 else {
621 panic!("Should be published");
622 };
623
624 let available_char_handle = server.available_audio_contexts.handle;
626 fake_gatt_server.incoming_read(PeerId(0x01), id, available_char_handle, 0);
627
628 let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
630 panic!("Should be pending");
631 };
632
633 let Poll::Ready(Some(FakeServerEvent::ReadResponded { handle, value, .. })) =
635 event_stream.poll_unpin(&mut noop_cx)
636 else {
637 panic!("Should be published");
638 };
639 assert_eq!(handle, available_char_handle);
640 assert_eq!(value.expect("should be ok"), vec![0x04, 0x00, 0x00, 0x00]);
641 }
642}