Skip to main content

bt_broadcast_assistant/assistant/
peer.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use bt_gatt::pii::GetPeerAddr;
6use futures::stream::FusedStream;
7use futures::Stream;
8use std::sync::Arc;
9use thiserror::Error;
10
11use bt_bap::types::BroadcastId;
12use bt_bass::client::error::Error as BassClientError;
13use bt_bass::client::event::Event as BassEvent;
14use bt_bass::client::BroadcastAudioScanServiceClient;
15#[cfg(any(test, feature = "debug"))]
16use bt_bass::types::BroadcastReceiveState;
17use bt_bass::types::{BisSync, PaSync};
18use bt_common::core::PeriodicAdvertisingInterval;
19use bt_common::packet_encoding::Error as PacketError;
20use bt_common::PeerId;
21#[cfg(any(test, feature = "debug"))]
22use bt_gatt::types::Handle;
23use std::collections::HashMap;
24
25use crate::assistant::DiscoveredBroadcastSources;
26
27#[derive(Debug, Error)]
28pub enum Error {
29    #[error("Failed to take event stream at Broadcast Audio Scan Service client")]
30    UnavailableBassEventStream,
31
32    #[error("Broadcast Audio Scan Service client error: {0:?}")]
33    BassClient(#[from] BassClientError),
34
35    #[error("Incomplete information for broadcast source with peer id ({0})")]
36    NotEnoughInfo(PeerId),
37
38    #[error("Broadcast source with peer id ({0}) does not exist")]
39    DoesNotExist(PeerId),
40
41    #[error("Failed to lookup address for peer id ({0}): {1:?}")]
42    AddressLookupError(PeerId, bt_gatt::types::Error),
43
44    #[error("Packet error: {0}")]
45    PacketError(#[from] PacketError),
46}
47
48/// Connected scan delegator peer. Clients can use this
49/// object to perform Broadcast Audio Scan Service operations on the
50/// scan delegator peer.
51/// Not thread-safe and only one operation must be done at a time.
52pub struct Peer<T: bt_gatt::GattTypes> {
53    peer_id: PeerId,
54    // Keep for peer connection.
55    _client: T::Client,
56    bass: BroadcastAudioScanServiceClient<T>,
57    // TODO(b/309015071): add a field for pacs.
58    broadcast_sources: Arc<DiscoveredBroadcastSources>,
59}
60
61impl<T: bt_gatt::GattTypes> Peer<T> {
62    pub(crate) fn new(
63        peer_id: PeerId,
64        client: T::Client,
65        bass: BroadcastAudioScanServiceClient<T>,
66        broadcast_sources: Arc<DiscoveredBroadcastSources>,
67    ) -> Self {
68        Peer { peer_id, _client: client, bass, broadcast_sources }
69    }
70
71    pub fn peer_id(&self) -> PeerId {
72        self.peer_id
73    }
74
75    /// Takes event stream for BASS events from this scan delegator peer.
76    /// Clients can call this method to start subscribing to BASS events.
77    pub fn take_event_stream(
78        &mut self,
79    ) -> Result<impl Stream<Item = Result<BassEvent, BassClientError>> + FusedStream, Error> {
80        self.bass.take_event_stream().ok_or(Error::UnavailableBassEventStream)
81    }
82
83    /// Send broadcast code for a particular broadcast.
84    pub async fn send_broadcast_code(
85        &self,
86        broadcast_id: BroadcastId,
87        broadcast_code: [u8; 16],
88    ) -> Result<(), Error> {
89        self.bass.set_broadcast_code(broadcast_id, broadcast_code).await.map_err(Into::into)
90    }
91
92    /// Sends a command to add a particular broadcast source.
93    ///
94    /// # Arguments
95    ///
96    /// * `broadcast_source_pid` - peer id of the braodcast source that's to be
97    ///   added to this scan delegator peer
98    /// * `address_lookup` - An implementation of [`GetPeerAddr`] that will be
99    ///   used to look up the peer's address.
100    /// * `pa_sync` - pa sync mode the peer should attempt to be in
101    /// * `bis_sync` - desired BIG to BIS synchronization information. If the
102    ///   set is empty, no preference value is used for all the BIGs
103    pub async fn add_broadcast_source(
104        &self,
105        source_peer_id: PeerId,
106        address_lookup: &impl GetPeerAddr,
107        pa_sync: PaSync,
108        bis_sync: HashMap<u8, BisSync>,
109    ) -> Result<(), Error> {
110        let mut broadcast_source = self
111            .broadcast_sources
112            .get_by_peer_id(&source_peer_id)
113            .ok_or(Error::DoesNotExist(source_peer_id))?;
114
115        let (broadcast_addr, broadcast_addr_type) = address_lookup
116            .get_peer_address(source_peer_id)
117            .await
118            .map_err(|err| Error::AddressLookupError(source_peer_id, err))?;
119        broadcast_source.with_address(broadcast_addr).with_address_type(broadcast_addr_type);
120
121        if !broadcast_source.into_add_source() {
122            return Err(Error::NotEnoughInfo(source_peer_id));
123        }
124
125        self.bass
126            .add_broadcast_source(
127                broadcast_source.broadcast_id.unwrap(),
128                broadcast_source.address_type.unwrap(),
129                broadcast_source.address.unwrap(),
130                broadcast_source.advertising_sid.unwrap(),
131                pa_sync,
132                broadcast_source
133                    .periodic_advertising_interval
134                    .unwrap_or(PeriodicAdvertisingInterval::unknown()),
135                broadcast_source.endpoint_to_big_subgroups(bis_sync).map_err(Error::PacketError)?,
136            )
137            .await
138            .map_err(Into::into)
139    }
140
141    /// Sends a command to to update a particular broadcast source's PA sync.
142    ///
143    /// # Arguments
144    ///
145    /// * `broadcast_id` - broadcast id of the broadcast source that's to be
146    ///   updated
147    /// * `pa_sync` - pa sync mode the scan delegator peer should attempt to be
148    ///   in.
149    /// * `bis_sync` - desired BIG to BIS synchronization information
150    pub async fn update_broadcast_source_sync(
151        &self,
152        broadcast_id: BroadcastId,
153        pa_sync: PaSync,
154        bis_sync: HashMap<u8, BisSync>,
155    ) -> Result<(), Error> {
156        let pa_interval = self
157            .broadcast_sources
158            .get_by_broadcast_id(&broadcast_id)
159            .map(|bs| bs.periodic_advertising_interval)
160            .unwrap_or(None);
161
162        self.bass
163            .modify_broadcast_source(broadcast_id, pa_sync, pa_interval, bis_sync, None)
164            .await
165            .map_err(Into::into)
166    }
167
168    /// Sends a command to remove a particular broadcast source.
169    ///
170    /// # Arguments
171    ///
172    /// * `broadcast_id` - broadcast id of the braodcast source that's to be
173    ///   removed from the scan delegator
174    pub async fn remove_broadcast_source(&self, broadcast_id: BroadcastId) -> Result<(), Error> {
175        self.bass.remove_broadcast_source(broadcast_id).await.map_err(Into::into)
176    }
177
178    /// Sends a command to inform the scan delegator peer that we have
179    /// started scanning for broadcast sources on behalf of it.
180    pub async fn inform_remote_scan_started(&self) -> Result<(), Error> {
181        self.bass.remote_scan_started().await.map_err(Into::into)
182    }
183
184    /// Sends a command to inform the scan delegator peer that we have
185    /// stopped scanning for broadcast sources on behalf of it.
186    pub async fn inform_remote_scan_stopped(&self) -> Result<(), Error> {
187        self.bass.remote_scan_stopped().await.map_err(Into::into)
188    }
189
190    /// Returns a list of BRS characteristics' latest values the scan delegator
191    /// has received.
192    #[cfg(any(test, feature = "debug"))]
193    pub fn get_broadcast_receive_states(&self) -> Vec<(Handle, BroadcastReceiveState)> {
194        self.bass.known_broadcast_sources()
195    }
196}
197
#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use bt_gatt::pii::StaticPeerAddr;
    use futures::FutureExt;
    use std::collections::HashMap;

    use bt_common::core::{AddressType, AdvertisingSetId};
    use bt_gatt::test_utils::{FakeClient, FakeGetPeerAddr, FakePeerService, FakeTypes};
    use bt_gatt::types::{
        AttributePermissions, CharacteristicProperties, CharacteristicProperty, Handle,
    };

    use bt_gatt::Characteristic;

    use crate::types::BroadcastSource;

    const RECEIVE_STATE_HANDLE: Handle = Handle(0x11);
    const AUDIO_SCAN_CONTROL_POINT_HANDLE: Handle = Handle(0x12);

    /// Builds a fake BASS peer service exposing one broadcast receive state
    /// characteristic and one broadcast audio scan control point
    /// characteristic, both with empty initial values.
    pub(crate) fn fake_bass_service() -> FakePeerService {
        let mut service = FakePeerService::new();

        let receive_state = Characteristic {
            handle: RECEIVE_STATE_HANDLE,
            uuid: bt_bass::types::BROADCAST_RECEIVE_STATE_UUID,
            properties: CharacteristicProperties(vec![
                CharacteristicProperty::Broadcast,
                CharacteristicProperty::Notify,
            ]),
            permissions: AttributePermissions::default(),
            descriptors: vec![],
        };
        service.add_characteristic(receive_state, vec![]);

        let scan_control_point = Characteristic {
            handle: AUDIO_SCAN_CONTROL_POINT_HANDLE,
            uuid: bt_bass::types::BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
            properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
            permissions: AttributePermissions::default(),
            descriptors: vec![],
        };
        service.add_characteristic(scan_control_point, vec![]);

        service
    }

    /// Creates a peer backed by the fake BASS service, returning it together
    /// with the fake service and the discovered-sources collection it shares.
    fn setup() -> (Peer<FakeTypes>, FakePeerService, Arc<DiscoveredBroadcastSources>) {
        let service = fake_bass_service();
        let discovered = DiscoveredBroadcastSources::new();

        let client = FakeClient::new();
        let bass = BroadcastAudioScanServiceClient::<FakeTypes>::create_for_test(
            service.clone(),
            Handle(0x1),
        );
        let peer = Peer::<FakeTypes>::new(PeerId(0x1), client, bass, Arc::clone(&discovered));

        (peer, service, discovered)
    }

    #[test]
    fn take_event_stream() {
        let (mut peer, _service, _discovered) = setup();

        // The first take hands out the stream; keep it alive for the test.
        let _stream = peer.take_event_stream().expect("should succeed");

        // Taking the event stream a second time should fail.
        assert!(peer.take_event_stream().is_err());
    }

    #[test]
    fn add_broadcast_source_fail() {
        let (peer, _service, discovered) = setup();
        let source_id = PeerId(1001);

        // Every future below is polled exactly once with a no-op waker and is
        // expected to have completed with the given error.

        // Should fail because broadcast source doesn't exist.
        {
            let outcome = peer
                .add_broadcast_source(
                    source_id,
                    &FakeGetPeerAddr,
                    PaSync::SyncPastUnavailable,
                    HashMap::new(),
                )
                .now_or_never();
            assert_matches!(outcome, Some(Err(Error::DoesNotExist(_))));
        }

        // Make the source known, but only with an advertising SID and a
        // broadcast id.
        let _ = discovered.merge_broadcast_source_data(
            &source_id,
            &BroadcastSource::default()
                .with_advertising_sid(AdvertisingSetId(1))
                .with_broadcast_id(BroadcastId::try_from(1001).unwrap()),
        );

        // Should fail because peer address couldn't be looked up: the lookup
        // only knows about a different peer.
        {
            let other_peer_lookup =
                StaticPeerAddr::new_for_peer(PeerId(1002), [1, 2, 3, 4, 5, 6], AddressType::Public);
            let outcome = peer
                .add_broadcast_source(
                    source_id,
                    &other_peer_lookup,
                    PaSync::SyncPastUnavailable,
                    HashMap::new(),
                )
                .now_or_never();
            assert_matches!(outcome, Some(Err(Error::AddressLookupError(_, _))));
        }

        // Should fail because not enough information.
        {
            let source_lookup =
                StaticPeerAddr::new_for_peer(source_id, [1, 2, 3, 4, 5, 6], AddressType::Public);
            let outcome = peer
                .add_broadcast_source(
                    source_id,
                    &source_lookup,
                    PaSync::SyncPastUnavailable,
                    HashMap::new(),
                )
                .now_or_never();
            assert_matches!(outcome, Some(Err(Error::NotEnoughInfo(_))));
        }
    }
}
335}