bt_pacs/
server.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Implements the Published Audio Capabilities Service server role.
6//!
7//! Use the `ServerBuilder` to define a new `Server` instance with the specified
//! characteristics. The server isn't published to GATT until the
//! `Server::publish` method is called.
10//! Once the `Server` is published, poll on it to receive events from the
11//! `Server`, which are created as it processes incoming client requests.
12//!
//! For example:
//!
//! ```ignore
//! // Set up a GATT Server which implements `bt_gatt::ServerTypes::Server`.
//! let gatt_server = ...;
//! // Define supported and available audio contexts for this PACS.
//! let supported = AudioContexts::new(...);
//! let available = AudioContexts::new(...);
//! let mut pacs_server = ServerBuilder::new()
//!         .add_source(...)
//!         .add_sink(...)
//!         .build(supported, available)?;
//!
//! // Publish the server.
//! pacs_server.publish(gatt_server).expect("publishes fine");
//! // Process events from the PACS server.
//! while let Some(event) = pacs_server.next().await {
//!     // Do something with `event`
//! }
//! ```
31
32use bt_common::generic_audio::ContextType;
33use bt_gatt::Server as _;
34use bt_gatt::server::LocalService;
35use bt_gatt::server::{ReadResponder, ServiceDefinition, WriteResponder};
36use bt_gatt::types::{GattError, Handle};
37use futures::task::{Poll, Waker};
38use futures::{Future, Stream};
39use pin_project::pin_project;
40use std::collections::HashMap;
41use thiserror::Error;
42
43use crate::{
44    AudioLocations, AvailableAudioContexts, PacRecord, SinkAudioLocations, SourceAudioLocations,
45    SupportedAudioContexts,
46};
47
48pub(crate) mod types;
49use crate::server::types::*;
50
51#[pin_project(project = LocalServiceProj)]
52enum LocalServiceState<T: bt_gatt::ServerTypes> {
53    NotPublished {
54        waker: Option<Waker>,
55    },
56    Preparing {
57        #[pin]
58        fut: T::LocalServiceFut,
59    },
60    Published {
61        service: T::LocalService,
62        #[pin]
63        events: T::ServiceEventStream,
64    },
65    Terminated,
66}
67
68impl<T: bt_gatt::ServerTypes> Default for LocalServiceState<T> {
69    fn default() -> Self {
70        Self::NotPublished { waker: None }
71    }
72}
73
74#[derive(Debug, Error)]
75pub enum Error {
76    #[error("Service is already published")]
77    AlreadyPublished,
78    #[error("Issue publishing service: {0}")]
79    PublishError(#[from] bt_gatt::types::Error),
80    #[error("Service should support at least one of Sink or Source PAC characteristics")]
81    MissingPac,
82    #[error("Available audio contexts are not supported: {0:?}")]
83    UnsupportedAudioContexts(Vec<ContextType>),
84}
85
86impl<T: bt_gatt::ServerTypes> Stream for LocalServiceState<T> {
87    type Item = Result<bt_gatt::server::ServiceEvent<T>, Error>;
88
89    fn poll_next(
90        mut self: std::pin::Pin<&mut Self>,
91        cx: &mut std::task::Context<'_>,
92    ) -> Poll<Option<Self::Item>> {
93        // SAFETY:
94        //  - Wakers are Unpin
95        //  - We re-pin the structurally pinned futures in Preparing and Published
96        //    (service is untouched)
97        //  - Terminated is empty
98        loop {
99            match self.as_mut().project() {
100                LocalServiceProj::Terminated => return Poll::Ready(None),
101                LocalServiceProj::NotPublished { .. } => {
102                    self.as_mut()
103                        .set(LocalServiceState::NotPublished { waker: Some(cx.waker().clone()) });
104                    return Poll::Pending;
105                }
106                LocalServiceProj::Preparing { fut } => {
107                    let service_result = futures::ready!(fut.poll(cx));
108                    let Ok(service) = service_result else {
109                        return Poll::Ready(Some(Err(Error::PublishError(
110                            service_result.err().unwrap(),
111                        ))));
112                    };
113                    let events = service.publish();
114                    self.as_mut().set(LocalServiceState::Published { service, events });
115                    continue;
116                }
117                LocalServiceProj::Published { service: _, events } => {
118                    let item = futures::ready!(events.poll_next(cx));
119                    let Some(gatt_result) = item else {
120                        self.as_mut().set(LocalServiceState::Terminated);
121                        return Poll::Ready(None);
122                    };
123                    let Ok(event) = gatt_result else {
124                        self.as_mut().set(LocalServiceState::Terminated);
125                        return Poll::Ready(Some(Err(Error::PublishError(
126                            gatt_result.err().unwrap(),
127                        ))));
128                    };
129                    return Poll::Ready(Some(Ok(event)));
130                }
131            }
132        }
133    }
134}
135
136impl<T: bt_gatt::ServerTypes> LocalServiceState<T> {
137    fn is_published(&self) -> bool {
138        if let LocalServiceState::NotPublished { .. } = self { false } else { true }
139    }
140}
141
142#[derive(Default)]
143pub struct ServerBuilder {
144    source_pacs: Vec<Vec<PacRecord>>,
145    source_audio_locations: Option<AudioLocations>,
146    sink_pacs: Vec<Vec<PacRecord>>,
147    sink_audio_locations: Option<AudioLocations>,
148}
149
150impl ServerBuilder {
151    pub fn new() -> ServerBuilder {
152        ServerBuilder::default()
153    }
154
155    /// Adds a source PAC characteristic to the builder.
156    /// Each call adds a new characteristic.
157    /// `capabilities` represents the records for a single PAC characteristic.
158    /// If `capabilities` is empty, it will be ignored.
159    pub fn add_source(mut self, capabilities: Vec<PacRecord>) -> Self {
160        if !capabilities.is_empty() {
161            self.source_pacs.push(capabilities);
162        }
163        self
164    }
165
166    /// Sets the audio locations for the source.
167    /// This corresponds to a single Source Audio Locations characteristic.
168    pub fn set_source_locations(mut self, audio_locations: AudioLocations) -> Self {
169        self.source_audio_locations = Some(audio_locations);
170        self
171    }
172
173    /// Adds a sink PAC characteristic to the builder.
174    /// Each call adds a new characteristic.
175    /// `capabilities` represents the records for a single PAC characteristic.
176    /// If `capabilities` is empty, it will be ignored.
177    pub fn add_sink(mut self, capabilities: Vec<PacRecord>) -> Self {
178        if !capabilities.is_empty() {
179            self.sink_pacs.push(capabilities);
180        }
181        self
182    }
183
184    /// Sets the audio locations for the sink.
185    /// This corresponds to a single Sink Audio Locations characteristic.
186    pub fn set_sink_locations(mut self, audio_locations: AudioLocations) -> Self {
187        self.sink_audio_locations = Some(audio_locations);
188        self
189    }
190
191    fn verify_characteristics(
192        &self,
193        supported: &AudioContexts,
194        available: &AudioContexts,
195    ) -> Result<(), Error> {
196        // If the corresponding bit in the supported audio contexts is
197        // not set to 0b1, we shall not set a bit to 0b1 in the
198        // available audio contexts. See PACS v1.0.1 section 3.5.1.
199        let diff: Vec<ContextType> = available.sink.difference(&supported.sink).cloned().collect();
200        if diff.len() != 0 {
201            return Err(Error::UnsupportedAudioContexts(diff));
202        }
203        let diff: Vec<ContextType> =
204            available.source.difference(&supported.source).cloned().collect();
205        if diff.len() != 0 {
206            return Err(Error::UnsupportedAudioContexts(diff));
207        }
208
209        // PACS server must have at least one Sink or Source PACS record.
210        if self.source_pacs.len() == 0 && self.sink_pacs.len() == 0 {
211            return Err(Error::MissingPac);
212        }
213        Ok(())
214    }
215
216    /// Builds a server after verifying all the defined characteristics
217    /// for this server (see PACS v1.0.1 section 3 for details).
218    pub fn build<T>(
219        mut self,
220        mut supported: AudioContexts,
221        available: AudioContexts,
222    ) -> Result<Server<T>, Error>
223    where
224        T: bt_gatt::ServerTypes,
225    {
226        let _ = self.verify_characteristics(&supported, &available)?;
227
228        let mut service_def = ServiceDefinition::new(
229            bt_gatt::server::ServiceId::new(1),
230            crate::PACS_UUID,
231            bt_gatt::types::ServiceKind::Primary,
232        );
233
234        let supported = SupportedAudioContexts {
235            handle: SUPPORTED_AUDIO_CONTEXTS_HANDLE,
236            sink: supported.sink.drain().collect(),
237            source: supported.source.drain().collect(),
238        };
239        let _ = service_def.add_characteristic((&supported).into());
240
241        let available = AvailableAudioContexts {
242            handle: AVAILABLE_AUDIO_CONTEXTS_HANDLE,
243            sink: (&available.sink).into(),
244            source: (&available.source).into(),
245        };
246        let _ = service_def.add_characteristic((&available).into());
247
248        let mut next_handle_iter = (HANDLE_OFFSET..).map(|x| Handle(x));
249        let mut audio_capabilities = HashMap::new();
250
251        // Sink audio locations characteristic may exist iff it's defined
252        // and there are valid sink PAC characteristics.
253        let sink_audio_locations = match self.sink_audio_locations.take() {
254            Some(locations) if self.sink_pacs.len() > 0 => {
255                let sink =
256                    SinkAudioLocations { handle: next_handle_iter.next().unwrap(), locations };
257                let _ = service_def.add_characteristic((&sink).into());
258                Some(sink)
259            }
260            _ => None,
261        };
262        for capabilities in self.sink_pacs.drain(..) {
263            let handle = next_handle_iter.next().unwrap();
264            let pac = PublishedAudioCapability::new_sink(handle, capabilities);
265            let _ = service_def.add_characteristic((&pac).into());
266            audio_capabilities.insert(handle, pac);
267        }
268
269        // Source audio locations characteristic may exist iff it's defined
270        // and there are valid source PAC characteristics.
271        let source_audio_locations = match self.source_audio_locations.take() {
272            Some(locations) if self.source_pacs.len() > 0 => {
273                let source =
274                    SourceAudioLocations { handle: next_handle_iter.next().unwrap(), locations };
275                let _ = service_def.add_characteristic((&source).into());
276                Some(source)
277            }
278            _ => None,
279        };
280        for capabilities in self.source_pacs.drain(..) {
281            let handle = next_handle_iter.next().unwrap();
282            let pac = PublishedAudioCapability::new_source(handle, capabilities);
283            let _ = service_def.add_characteristic((&pac).into());
284            audio_capabilities.insert(handle, pac);
285        }
286
287        let server = Server {
288            service_def,
289            local_service: Default::default(),
290            published_audio_capabilities: audio_capabilities,
291            source_audio_locations,
292            sink_audio_locations,
293            available_audio_contexts: available,
294            supported_audio_contexts: supported,
295        };
296        Ok(server)
297    }
298}
299
300#[pin_project]
301pub struct Server<T: bt_gatt::ServerTypes> {
302    service_def: ServiceDefinition,
303    #[pin]
304    local_service: LocalServiceState<T>,
305    published_audio_capabilities: HashMap<Handle, PublishedAudioCapability>,
306    source_audio_locations: Option<SourceAudioLocations>,
307    sink_audio_locations: Option<SinkAudioLocations>,
308    available_audio_contexts: AvailableAudioContexts,
309    supported_audio_contexts: SupportedAudioContexts,
310}
311
312impl<T: bt_gatt::ServerTypes> Server<T> {
313    pub fn publish(&mut self, server: T::Server) -> Result<(), Error> {
314        if self.local_service.is_published() {
315            return Err(Error::AlreadyPublished);
316        }
317
318        let LocalServiceState::NotPublished { waker } = std::mem::replace(
319            &mut self.local_service,
320            LocalServiceState::Preparing { fut: server.prepare(self.service_def.clone()) },
321        ) else {
322            unreachable!();
323        };
324        waker.map(Waker::wake);
325        Ok(())
326    }
327
328    fn is_source_locations_handle(&self, handle: Handle) -> bool {
329        self.source_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
330    }
331
332    fn is_sink_locations_handle(&self, handle: Handle) -> bool {
333        self.sink_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
334    }
335}
336
337impl<T: bt_gatt::ServerTypes> Stream for Server<T> {
338    type Item = Result<(), Error>;
339
340    fn poll_next(
341        mut self: std::pin::Pin<&mut Self>,
342        cx: &mut std::task::Context<'_>,
343    ) -> std::task::Poll<Option<Self::Item>> {
344        loop {
345            let mut this = self.as_mut().project();
346            let gatt_event = match futures::ready!(this.local_service.as_mut().poll_next(cx)) {
347                None => return Poll::Ready(None),
348                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
349                Some(Ok(event)) => event,
350            };
351            use bt_gatt::server::ServiceEvent::*;
352            match gatt_event {
353                Read { handle, offset, responder, .. } => {
354                    let offset = offset as usize;
355                    let value = match handle {
356                        x if x == AVAILABLE_AUDIO_CONTEXTS_HANDLE => {
357                            self.available_audio_contexts.into_char_value()
358                        }
359                        x if x == SUPPORTED_AUDIO_CONTEXTS_HANDLE => {
360                            self.supported_audio_contexts.into_char_value()
361                        }
362                        x if self.is_source_locations_handle(x) => {
363                            self.source_audio_locations.as_ref().unwrap().into_char_value()
364                        }
365                        x if self.is_sink_locations_handle(x) => {
366                            self.sink_audio_locations.as_ref().unwrap().into_char_value()
367                        }
368                        pac_handle => {
369                            let Some(ref pac) = self.published_audio_capabilities.get(&pac_handle)
370                            else {
371                                responder.error(GattError::InvalidHandle);
372                                continue;
373                            };
374                            pac.encode()
375                        }
376                    };
377                    responder.respond(&value[offset..]);
378                    continue;
379                }
380                // TODO(b/309015071): support optional writes.
381                Write { responder, .. } => {
382                    responder.error(GattError::WriteNotPermitted);
383                    continue;
384                }
385                // TODO(b/309015071): implement notify since it's mandatory.
386                ClientConfiguration { .. } => {
387                    unimplemented!();
388                }
389                _ => continue,
390            }
391        }
392    }
393}
394
#[cfg(test)]
mod tests {
    use super::*;

    use bt_common::PeerId;
    use bt_common::core::{CodecId, CodingFormat};
    use bt_common::generic_audio::AudioLocation;
    use bt_common::generic_audio::codec_capabilities::*;
    use bt_gatt::server;
    use bt_gatt::test_utils::{FakeServer, FakeServerEvent, FakeTypes};
    use bt_gatt::types::ServiceKind;
    use futures::{FutureExt, StreamExt};

    use std::collections::HashSet;

    use crate::AvailableContexts;

    // PAC record for an assigned codec that supports both frame durations
    // with no preference and carries no metadata.
    fn pac_record(format: CodingFormat) -> PacRecord {
        PacRecord {
            codec_id: CodecId::Assigned(format),
            codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
                FrameDurationSupport::BothNoPreference,
            )],
            metadata: vec![],
        }
    }

    // Builder for a server with:
    // - 1 sink and 1 source PAC characteristics
    // - sink audio locations
    fn default_server_builder() -> ServerBuilder {
        ServerBuilder::new()
            .add_sink(vec![pac_record(CodingFormat::ALawLog)])
            .set_sink_locations(AudioLocations {
                locations: HashSet::from([AudioLocation::FrontLeft, AudioLocation::FrontRight]),
            })
            .add_source(vec![
                pac_record(CodingFormat::ALawLog),
                pac_record(CodingFormat::MuLawLog),
            ])
            // An empty record list must be ignored by the builder.
            .add_source(vec![])
    }

    #[test]
    fn build_server() {
        let server = default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Conversational, ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
            )
            .expect("should succeed");
        assert_eq!(server.published_audio_capabilities.len(), 2);

        assert_eq!(server.supported_audio_contexts.handle.0, 1);
        assert_eq!(
            server.supported_audio_contexts.sink,
            HashSet::from([ContextType::Conversational, ContextType::Media])
        );
        assert_eq!(server.supported_audio_contexts.source, HashSet::from([ContextType::Media]));

        assert_eq!(server.available_audio_contexts.handle.0, 2);
        assert_eq!(
            server.available_audio_contexts.sink,
            AvailableContexts::Available(HashSet::from([ContextType::Media]))
        );
        assert_eq!(server.available_audio_contexts.source, AvailableContexts::NotAvailable);

        // Should have 1 sink PAC characteristic with audio locations.
        let location_char = server.sink_audio_locations.as_ref().expect("should exist");
        assert_eq!(location_char.handle.0, 3);
        assert_eq!(
            location_char.locations.locations,
            HashSet::from([AudioLocation::FrontLeft, AudioLocation::FrontRight])
        );

        let mut sink_iter =
            server.published_audio_capabilities.iter().filter(|(_handle, pac)| pac.is_sink());
        let sink_char = sink_iter.next().expect("should exist");
        assert_eq!(sink_char.0, &Handle(4));
        assert_eq!(sink_char.1.pac_records().len(), 1);
        assert!(sink_iter.next().is_none());

        // Should have 1 source PAC characteristic w/o audio locations.
        assert!(server.source_audio_locations.is_none());
        let mut source_iter =
            server.published_audio_capabilities.iter().filter(|(_handle, pac)| pac.is_source());
        let source_char = source_iter.next().expect("should exist");
        assert_eq!(source_char.0, &Handle(5));
        assert_eq!(source_char.1.pac_records().len(), 2);
        assert!(source_iter.next().is_none());
    }

    #[test]
    fn build_server_error() {
        // No sink or source PACs.
        let result = ServerBuilder::new().build::<FakeTypes>(
            AudioContexts::new(
                HashSet::from([ContextType::Conversational, ContextType::Media]),
                HashSet::from([ContextType::Media]),
            ),
            AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
        );
        assert!(matches!(result, Err(Error::MissingPac)));

        // Sink audio context in available not in supported.
        let result = default_server_builder().build::<FakeTypes>(
            AudioContexts::new(
                HashSet::from([ContextType::Conversational, ContextType::Media]),
                HashSet::from([ContextType::Media]),
            ),
            AudioContexts::new(HashSet::from([ContextType::Alerts]), HashSet::new()),
        );
        assert!(matches!(
            result,
            Err(Error::UnsupportedAudioContexts(diff)) if diff == vec![ContextType::Alerts]
        ));

        // Source audio context in available not in supported.
        let result = default_server_builder().build::<FakeTypes>(
            AudioContexts::new(
                HashSet::from([ContextType::Conversational, ContextType::Media]),
                HashSet::from([ContextType::Media]),
            ),
            AudioContexts::new(HashSet::new(), HashSet::from([ContextType::EmergencyAlarm])),
        );
        assert!(matches!(
            result,
            Err(Error::UnsupportedAudioContexts(diff)) if diff == vec![ContextType::EmergencyAlarm]
        ));
    }

    #[test]
    fn publish_server() {
        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());

        let mut server = default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::new(), HashSet::new()),
            )
            .unwrap();

        // Server should still be pending since the GATT server is not
        // established.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        let (fake_gatt_server, mut event_receiver) = FakeServer::new();

        // Event stream should be pending still since service not published.
        let mut event_stream = event_receiver.next();
        let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        server.publish(fake_gatt_server).expect("should succeed");

        // Server should poll on local server state.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        // Should receive event that GATT service was published.
        let Poll::Ready(Some(FakeServerEvent::Published { id, definition })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should be published");
        };
        assert_eq!(id, server::ServiceId::new(1));
        assert_eq!(definition.characteristics().count(), 5);
        assert_eq!(definition.kind(), ServiceKind::Primary);
        assert_eq!(definition.uuid(), crate::PACS_UUID);

        // Server can only be published once.
        let (fake_gatt_server, _) = FakeServer::new();
        assert!(matches!(server.publish(fake_gatt_server), Err(Error::AlreadyPublished)));
    }

    #[test]
    fn read_from_server() {
        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());

        let mut server = default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
            )
            .unwrap();

        let (fake_gatt_server, mut event_receiver) = FakeServer::new();
        server.publish(fake_gatt_server.clone()).expect("should succeed");

        // Server should poll on local server state.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        // Should receive event that GATT service was published.
        let mut event_stream = event_receiver.next();
        let Poll::Ready(Some(FakeServerEvent::Published { id, .. })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should be published");
        };

        // Fake an incoming read from a remote peer.
        let available_char_handle = server.available_audio_contexts.handle;
        fake_gatt_server.incoming_read(PeerId(0x01), id, available_char_handle, 0);

        // Server should still be pending.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        // We should have received the read response.
        let Poll::Ready(Some(FakeServerEvent::ReadResponded { handle, value, .. })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should have received a read response");
        };
        assert_eq!(handle, available_char_handle);
        assert_eq!(value.expect("should be ok"), vec![0x04, 0x00, 0x00, 0x00]);
    }
}