Skip to main content

bt_bass/
client.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod error;
6pub mod event;
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use futures::stream::{BoxStream, FusedStream, SelectAll, Stream, StreamExt};
12use futures::Future;
13use log::warn;
14use parking_lot::Mutex;
15
16use bt_bap::types::BroadcastId;
17use bt_common::core::{AddressType, AdvertisingSetId, PeriodicAdvertisingInterval};
18use bt_common::generic_audio::metadata_ltv::Metadata;
19use bt_common::packet_encoding::Decodable;
20use bt_gatt::client::{CharacteristicNotification, PeerService, ServiceCharacteristic};
21use bt_gatt::types::{Handle, WriteMode};
22
23use crate::client::error::{Error, ServiceError};
24use crate::client::event::*;
25use crate::types::*;
26
27const READ_CHARACTERISTIC_BUFFER_SIZE: usize = 255;
28
29/// Keeps track of Source_ID and Broadcast_ID that are associated together.
30/// Source_ID is assigned by the BASS server to a Broadcast Receive State
31/// characteristic. If the remote peer with the BASS server autonomously
32/// synchronized to a PA or accepted the Add Source operation, the server
33/// selects an empty Broadcast Receive State characteristic to update or deletes
34/// one of the existing one to update. However, because the concept of Source_ID
35/// is unqiue to BASS, we track the Broadcast_ID that a Source_ID is associated
36/// so that it can be used by upper layers.
37#[derive(Default)]
38pub(crate) struct KnownBroadcastSources(HashMap<Handle, BroadcastReceiveState>);
39
40impl KnownBroadcastSources {
41    fn new(receive_states: HashMap<Handle, BroadcastReceiveState>) -> Self {
42        KnownBroadcastSources(receive_states)
43    }
44
45    /// Updates the value of the specified broadcast receive state
46    /// characteristic. Returns the old value if it existed.
47    fn update_state(
48        &mut self,
49        key: Handle,
50        value: BroadcastReceiveState,
51    ) -> Option<BroadcastReceiveState> {
52        self.0.insert(key, value)
53    }
54
55    /// Given the broadcast ID, find the corresponding source ID.
56    /// Returns none if the server doesn't know the specified broadcast source
57    /// because a) the broadcast source was never added or discovered; or,
58    /// b) the broadcast source was removed from remove operation; or,
59    /// c) the broadcast source was removed by the server to add a different
60    ///    broadcast source.
61    fn source_id(&self, broadcast_id: &BroadcastId) -> Option<SourceId> {
62        let Some(state) = self.state(broadcast_id) else {
63            return None;
64        };
65        Some(state.source_id)
66    }
67
68    /// Gets the last updated broadcast receive state value.
69    /// Returns none if the server doesn't know the specified broadcast source.
70    fn state(&self, broadcast_id: &BroadcastId) -> Option<&ReceiveState> {
71        self.0.iter().find_map(|(&_k, &ref v)| match v {
72            BroadcastReceiveState::Empty => None,
73            BroadcastReceiveState::NonEmpty(rs) => {
74                if rs.broadcast_id() == *broadcast_id {
75                    return Some(rs);
76                }
77                None
78            }
79        })
80    }
81}
82
83/// Manages connection to the Broadcast Audio Scan Service at the
84/// remote Scan Delegator and writes/reads characteristics to/from it.
85pub struct BroadcastAudioScanServiceClient<T: bt_gatt::GattTypes> {
86    gatt_client: T::PeerService,
87    /// Broadcast Audio Scan Service only has one Broadcast Audio Scan Control
88    /// Point characteristic according to BASS Section 3. There shall
89    /// be one or more Broadcast Receive State characteristics.
90    audio_scan_control_point: Handle,
91    /// Broadcast Receive State characteristics can be used to determine the
92    /// BASS status.
93    broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
94    /// Keeps track of the broadcast codes that were sent to the remote BASS
95    /// server.
96    broadcast_codes: Arc<Mutex<HashMap<SourceId, [u8; 16]>>>,
97    // GATT notification streams for BRS characteristic value changes.
98    notification_streams: Option<
99        SelectAll<BoxStream<'static, Result<CharacteristicNotification, bt_gatt::types::Error>>>,
100    >,
101}
102
103impl<T: bt_gatt::GattTypes> BroadcastAudioScanServiceClient<T> {
104    #[cfg(any(test, feature = "test-utils"))]
105    pub fn create_for_test(gatt_client: T::PeerService, audio_scan_control_point: Handle) -> Self {
106        Self {
107            gatt_client,
108            audio_scan_control_point,
109            broadcast_sources: Default::default(),
110            broadcast_codes: Arc::new(Mutex::new(HashMap::new())),
111            notification_streams: Some(SelectAll::new()),
112        }
113    }
114
115    pub async fn create(gatt_client: T::PeerService) -> Result<Self, Error>
116    where
117        <T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
118    {
119        // BASS server should have a single Broadcast Audio Scan Control Point
120        // Characteristic.
121        let bascp =
122            ServiceCharacteristic::<T>::find(&gatt_client, BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID)
123                .await
124                .map_err(|e| Error::Gatt(e))?;
125        if bascp.len() != 1 {
126            let err = if bascp.len() == 0 {
127                Error::Service(ServiceError::MissingCharacteristic)
128            } else {
129                Error::Service(ServiceError::ExtraScanControlPointCharacteristic)
130            };
131            return Err(err);
132        }
133        let bascp_handle = *bascp[0].handle();
134        let brs_chars = Self::discover_brs_characteristics(&gatt_client).await?;
135        let mut c = Self {
136            gatt_client,
137            audio_scan_control_point: bascp_handle,
138            broadcast_sources: Arc::new(Mutex::new(KnownBroadcastSources::new(brs_chars))),
139            broadcast_codes: Arc::new(Mutex::new(HashMap::new())),
140            notification_streams: None,
141        };
142        c.register_notifications();
143        Ok(c)
144    }
145
146    // Discover all the Broadcast Receive State characteristics.
147    // On success, returns the HashMap of all Broadcast Received State
148    // Characteristics.
149    async fn discover_brs_characteristics(
150        gatt_client: &T::PeerService,
151    ) -> Result<HashMap<Handle, BroadcastReceiveState>, Error> {
152        let brs = ServiceCharacteristic::<T>::find(gatt_client, BROADCAST_RECEIVE_STATE_UUID)
153            .await
154            .map_err(|e| Error::Gatt(e))?;
155        if brs.len() == 0 {
156            return Err(Error::Service(ServiceError::MissingCharacteristic));
157        }
158        let mut brs_map = HashMap::new();
159        for c in brs {
160            // Read the value of the Broadcast Recieve State at the time of discovery for
161            // record.
162            let mut buf = vec![0; READ_CHARACTERISTIC_BUFFER_SIZE];
163            match c.read(&mut buf[..]).await {
164                Ok(read_bytes) => match BroadcastReceiveState::decode(&buf[0..read_bytes]).0 {
165                    Ok(decoded) => {
166                        brs_map.insert(*c.handle(), decoded);
167                        continue;
168                    }
169                    Err(e) => warn!(
170                        "Failed to decode characteristic ({:?}) to Broadcast Receive State value: {:?}",
171                        *c.handle(),
172                        e
173                    ),
174                },
175                Err(e) => warn!("Failed to read characteristic ({:?}) value: {:?}", *c.handle(), e),
176            }
177            brs_map.insert(*c.handle(), BroadcastReceiveState::Empty);
178        }
179        Ok(brs_map)
180    }
181
182    fn register_notifications(&mut self)
183    where
184        <T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
185    {
186        let mut notification_streams = SelectAll::new();
187        {
188            let lock = self.broadcast_sources.lock();
189            for handle in lock.0.keys() {
190                let stream = self.gatt_client.subscribe(&handle);
191                notification_streams.push(stream.boxed());
192            }
193        }
194        self.notification_streams = Some(notification_streams);
195    }
196
197    /// Returns a stream that can be used by the upper layer to poll for
198    /// BroadcastAudioScanServiceEvent. BroadcastAudioScanServiceEvents are
199    /// generated based on BRS characteristic change received from GATT
200    /// notification that are processed by BroadcastAudioScanServiceClient.
201    /// This method should only be called once.
202    /// Returns an error if the method is called for a second time.
203    pub fn take_event_stream(
204        &mut self,
205    ) -> Option<impl Stream<Item = Result<Event, Error>> + FusedStream> {
206        let notification_streams = self.notification_streams.take();
207        let Some(streams) = notification_streams else {
208            return None;
209        };
210        let event_stream = EventStream::new(streams, self.broadcast_sources.clone());
211        Some(event_stream)
212    }
213
214    /// Write to the Broadcast Audio Scan Control Point characteristic in
215    /// without response mode.
216    fn write_to_bascp(
217        &self,
218        op: impl ControlPointOperation,
219    ) -> impl Future<Output = Result<(), Error>> + '_ {
220        let handle = self.audio_scan_control_point;
221        let mut buf = vec![0; op.encoded_len()];
222        let encode_res = op.encode(&mut buf[..]);
223        async move {
224            match encode_res {
225                Err(e) => Err(Error::Packet(e)),
226                Ok(_) => self
227                    .gatt_client
228                    .write_characteristic(&handle, WriteMode::WithoutResponse, 0, buf.as_slice())
229                    .await
230                    .map_err(|e| Error::Gatt(e)),
231            }
232        }
233    }
234
235    fn get_source_id(&self, broadcast_id: &BroadcastId) -> Result<SourceId, Error> {
236        self.broadcast_sources
237            .lock()
238            .source_id(broadcast_id)
239            .ok_or(Error::UnknownBroadcastSource(*broadcast_id))
240    }
241
242    /// Returns a clone of the latest known broadcast audio receive state of the
243    /// specified broadcast source given its broadcast id.
244    fn get_broadcast_source_state(&self, broadcast_id: &BroadcastId) -> Option<ReceiveState> {
245        let lock = self.broadcast_sources.lock();
246        lock.state(broadcast_id).clone().map(|rs| rs.clone())
247    }
248
249    /// Indicates to the remote BASS server that we have started scanning for
250    /// broadcast sources on behalf of it. If the scan delegator that serves
251    /// the BASS server is collocated with a broadcast sink, this may or may
252    /// not change the scanning behaviour of the the broadcast sink.
253    pub async fn remote_scan_started(&self) -> Result<(), Error> {
254        let op = RemoteScanStartedOperation;
255        self.write_to_bascp(op).await
256    }
257
258    /// Indicates to the remote BASS server that we have stopped scanning for
259    /// broadcast sources on behalf of it.
260    pub async fn remote_scan_stopped(&self) -> Result<(), Error> {
261        let op = RemoteScanStoppedOperation;
262        self.write_to_bascp(op).await
263    }
264
265    /// Provides the BASS server with information regarding a Broadcast Source.
266    pub async fn add_broadcast_source(
267        &self,
268        broadcast_id: BroadcastId,
269        address_type: AddressType,
270        advertiser_address: [u8; ADDRESS_BYTE_SIZE],
271        sid: AdvertisingSetId,
272        pa_sync: PaSync,
273        pa_interval: PeriodicAdvertisingInterval,
274        subgroups: Vec<BigSubgroup>,
275    ) -> Result<(), Error> {
276        let op = AddSourceOperation::new(
277            address_type,
278            advertiser_address,
279            sid,
280            broadcast_id,
281            pa_sync,
282            pa_interval,
283            subgroups,
284        );
285        self.write_to_bascp(op).await
286    }
287
288    /// Requests the BASS server to add or update Metadata for the Broadcast
289    /// Source, and/or to request the server to synchronize to, or to stop
290    /// synchronization to, a PA and/or a BIS.
291    ///
292    /// # Arguments
293    ///
294    /// * `broadcast_id` - id of the broadcast source to modify
295    /// * `pa_sync` - pa sync mode the scan delegator peer should attempt to be
296    ///   in
297    /// * `pa_interval` - updated PA interval value. If none, unknown value is
298    ///   used
299    /// * `bis_map` - desired BIG to BIS synchronization update information. If
300    ///   a BIG does not exist as a key, sync for that BIG is not updated.
301    /// * `metadata_map` - map of updated metadata for BIGs. If a mapping does
302    ///   not exist for a BIG, that BIG's metadata is not updated
303    pub async fn modify_broadcast_source(
304        &self,
305        broadcast_id: BroadcastId,
306        pa_sync: PaSync,
307        pa_interval: Option<PeriodicAdvertisingInterval>,
308        bis_map: HashMap<SubgroupIndex, BisSync>,
309        metadata_map: Option<HashMap<SubgroupIndex, Vec<Metadata>>>,
310    ) -> Result<(), Error> {
311        let op = {
312            let mut state = self
313                .get_broadcast_source_state(&broadcast_id)
314                .ok_or(Error::UnknownBroadcastSource(broadcast_id))?;
315
316            // Update BIS_Sync param for BIGs if applicable.
317            for (big_index, group) in state.subgroups.iter_mut().enumerate() {
318                if let Some(bis_sync) = bis_map.get(&(big_index as u8)) {
319                    group.bis_sync = bis_sync.clone();
320                }
321            }
322
323            // Update metadata for BIGs if applicable.
324            if let Some(mut m) = metadata_map {
325                for (big_index, group) in state.subgroups.iter_mut().enumerate() {
326                    if let Some(metadata) = m.remove(&(big_index as u8)) {
327                        group.metadata = metadata;
328                    }
329                }
330
331                // Left over metadata values are new subgroups that are to be added. New
332                // subgroups can only be added if the subgroup index is
333                // contiguous to existing subgroups.
334                let mut new_big_indices: Vec<&u8> = m.keys().collect();
335                new_big_indices.sort();
336                for big_index in new_big_indices {
337                    if (*big_index as usize) != state.subgroups.len() {
338                        warn!("cannot add new [{big_index}th] subgroup");
339                        break;
340                    }
341                    let new_subgroup = BigSubgroup::new(None).with_metadata(m[big_index].clone());
342                    state.subgroups.push(new_subgroup);
343                }
344            }
345
346            ModifySourceOperation::new(
347                state.source_id,
348                pa_sync,
349                pa_interval.unwrap_or(PeriodicAdvertisingInterval::unknown()),
350                state.subgroups,
351            )
352        };
353        self.write_to_bascp(op).await
354    }
355
356    pub async fn remove_broadcast_source(&self, broadcast_id: BroadcastId) -> Result<(), Error> {
357        let source_id = self.get_source_id(&broadcast_id)?;
358
359        let op = RemoveSourceOperation::new(source_id);
360        self.write_to_bascp(op).await
361    }
362
363    /// Sets the broadcast code for a particular broadcast stream.
364    pub async fn set_broadcast_code(
365        &self,
366        broadcast_id: BroadcastId,
367        broadcast_code: [u8; 16],
368    ) -> Result<(), Error> {
369        let source_id = self.get_source_id(&broadcast_id)?;
370
371        let op = SetBroadcastCodeOperation::new(source_id, broadcast_code.clone());
372        self.write_to_bascp(op).await?;
373
374        // Save the broadcast code we sent.
375        self.broadcast_codes.lock().insert(source_id, broadcast_code);
376        Ok(())
377    }
378
379    /// Returns a list of currently known broadcast sources at the time
380    /// this method was called.
381    pub fn known_broadcast_sources(&self) -> Vec<(Handle, BroadcastReceiveState)> {
382        let lock = self.broadcast_sources.lock();
383        let mut brs = Vec::new();
384        for (k, v) in lock.0.iter() {
385            brs.push((*k, v.clone()));
386        }
387        brs
388    }
389
390    #[cfg(any(test, feature = "test-utils"))]
391    pub fn insert_broadcast_receive_state(&mut self, handle: Handle, brs: BroadcastReceiveState) {
392        self.broadcast_sources.lock().update_state(handle, brs);
393    }
394}
395
396#[cfg(test)]
397mod tests {
398    use super::*;
399
400    use std::task::Poll;
401
402    use assert_matches::assert_matches;
403    use futures::executor::block_on;
404    use futures::{pin_mut, FutureExt};
405
406    use bt_common::core::AdvertisingSetId;
407    use bt_common::Uuid;
408    use bt_gatt::test_utils::*;
409    use bt_gatt::types::{
410        AttributePermissions, CharacteristicProperties, CharacteristicProperty, Handle,
411    };
412    use bt_gatt::Characteristic;
413
414    const RECEIVE_STATE_1_HANDLE: Handle = Handle(1);
415    const RECEIVE_STATE_2_HANDLE: Handle = Handle(2);
416    const RECEIVE_STATE_3_HANDLE: Handle = Handle(3);
417    const RANDOME_CHAR_HANDLE: Handle = Handle(4);
418    const AUDIO_SCAN_CONTROL_POINT_HANDLE: Handle = Handle(5);
419
420    fn setup_client() -> (BroadcastAudioScanServiceClient<FakeTypes>, FakePeerService) {
421        let mut fake_peer_service = FakePeerService::new();
422        // Add 3 Broadcast Receive State Characteristics, 1 Broadcast Audio Scan Control
423        // Point Characteristic, and 1 random one.
424        fake_peer_service.add_characteristic(
425            Characteristic {
426                handle: RECEIVE_STATE_1_HANDLE,
427                uuid: BROADCAST_RECEIVE_STATE_UUID,
428                properties: CharacteristicProperties(vec![
429                    CharacteristicProperty::Broadcast,
430                    CharacteristicProperty::Notify,
431                ]),
432                permissions: AttributePermissions::default(),
433                descriptors: vec![],
434            },
435            vec![],
436        );
437        fake_peer_service.add_characteristic(
438            Characteristic {
439                handle: RECEIVE_STATE_2_HANDLE,
440                uuid: BROADCAST_RECEIVE_STATE_UUID,
441                properties: CharacteristicProperties(vec![
442                    CharacteristicProperty::Broadcast,
443                    CharacteristicProperty::Notify,
444                ]),
445                permissions: AttributePermissions::default(),
446                descriptors: vec![],
447            },
448            vec![],
449        );
450        fake_peer_service.add_characteristic(
451            Characteristic {
452                handle: RECEIVE_STATE_3_HANDLE,
453                uuid: BROADCAST_RECEIVE_STATE_UUID,
454                properties: CharacteristicProperties(vec![
455                    CharacteristicProperty::Broadcast,
456                    CharacteristicProperty::Notify,
457                ]),
458                permissions: AttributePermissions::default(),
459                descriptors: vec![],
460            },
461            vec![],
462        );
463        fake_peer_service.add_characteristic(
464            Characteristic {
465                handle: RANDOME_CHAR_HANDLE,
466                uuid: Uuid::from_u16(0x1234),
467                properties: CharacteristicProperties(vec![CharacteristicProperty::Notify]),
468                permissions: AttributePermissions::default(),
469                descriptors: vec![],
470            },
471            vec![],
472        );
473        fake_peer_service.add_characteristic(
474            Characteristic {
475                handle: AUDIO_SCAN_CONTROL_POINT_HANDLE,
476                uuid: BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
477                properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
478                permissions: AttributePermissions::default(),
479                descriptors: vec![],
480            },
481            vec![],
482        );
483
484        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
485        let create_result =
486            BroadcastAudioScanServiceClient::<FakeTypes>::create(fake_peer_service.clone());
487        pin_mut!(create_result);
488        let polled = create_result.poll_unpin(&mut noop_cx);
489        let Poll::Ready(Ok(client)) = polled else {
490            panic!("Expected BroadcastAudioScanServiceClient to be succesfully created");
491        };
492
493        (client, fake_peer_service)
494    }
495
496    #[test]
497    fn create_client() {
498        let (client, _) = setup_client();
499
500        // Check that all the characteristics have been discovered.
501        assert_eq!(client.audio_scan_control_point, AUDIO_SCAN_CONTROL_POINT_HANDLE);
502        let broadcast_sources = client.known_broadcast_sources();
503        assert_eq!(broadcast_sources.len(), 3);
504        assert!(broadcast_sources.iter().find(|v| v.0 == RECEIVE_STATE_1_HANDLE).is_some());
505        assert!(broadcast_sources.iter().find(|v| v.0 == RECEIVE_STATE_2_HANDLE).is_some());
506        assert!(broadcast_sources.iter().find(|v| v.0 == RECEIVE_STATE_3_HANDLE).is_some());
507    }
508
509    #[test]
510    fn create_client_fails_missing_characteristics() {
511        // Missing scan control point characteristic.
512        let mut fake_peer_service = FakePeerService::new();
513        fake_peer_service.add_characteristic(
514            Characteristic {
515                handle: Handle(1),
516                uuid: BROADCAST_RECEIVE_STATE_UUID,
517                properties: CharacteristicProperties(vec![
518                    CharacteristicProperty::Broadcast,
519                    CharacteristicProperty::Notify,
520                ]),
521                permissions: AttributePermissions::default(),
522                descriptors: vec![],
523            },
524            vec![],
525        );
526
527        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
528        let create_result =
529            BroadcastAudioScanServiceClient::<FakeTypes>::create(fake_peer_service.clone());
530
531        pin_mut!(create_result);
532        let polled = create_result.poll_unpin(&mut noop_cx);
533        let Poll::Ready(Err(_)) = polled else {
534            panic!("Expected BroadcastAudioScanServiceClient to have failed");
535        };
536
537        // Missing receive state characteristic.
538        let mut fake_peer_service: FakePeerService = FakePeerService::new();
539        fake_peer_service.add_characteristic(
540            Characteristic {
541                handle: Handle(1),
542                uuid: BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
543                properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
544                permissions: AttributePermissions::default(),
545                descriptors: vec![],
546            },
547            vec![],
548        );
549
550        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
551        let create_result =
552            BroadcastAudioScanServiceClient::<FakeTypes>::create(fake_peer_service.clone());
553        pin_mut!(create_result);
554        let polled = create_result.poll_unpin(&mut noop_cx);
555        let Poll::Ready(Err(_)) = polled else {
556            panic!("Expected BroadcastAudioScanServiceClient to have failed");
557        };
558    }
559
560    #[test]
561    fn create_client_fails_duplicate_characteristics() {
562        // More than one scan control point characteristics.
563        let mut fake_peer_service = FakePeerService::new();
564        fake_peer_service.add_characteristic(
565            Characteristic {
566                handle: Handle(1),
567                uuid: BROADCAST_RECEIVE_STATE_UUID,
568                properties: CharacteristicProperties(vec![
569                    CharacteristicProperty::Broadcast,
570                    CharacteristicProperty::Notify,
571                ]),
572                permissions: AttributePermissions::default(),
573                descriptors: vec![],
574            },
575            vec![],
576        );
577        fake_peer_service.add_characteristic(
578            Characteristic {
579                handle: Handle(2),
580                uuid: BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
581                properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
582                permissions: AttributePermissions::default(),
583                descriptors: vec![],
584            },
585            vec![],
586        );
587        fake_peer_service.add_characteristic(
588            Characteristic {
589                handle: Handle(3),
590                uuid: BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
591                properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
592                permissions: AttributePermissions::default(),
593                descriptors: vec![],
594            },
595            vec![],
596        );
597
598        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
599        let create_result =
600            BroadcastAudioScanServiceClient::<FakeTypes>::create(fake_peer_service.clone());
601        pin_mut!(create_result);
602        let polled = create_result.poll_unpin(&mut noop_cx);
603        let Poll::Ready(Err(_)) = polled else {
604            panic!("Expected BroadcastAudioScanServiceClient to have failed");
605        };
606    }
607
608    #[test]
609    fn start_event_stream() {
610        let (mut client, mut fake_peer_service) = setup_client();
611        let mut event_stream = client.take_event_stream().expect("stream was created");
612
613        // Send notification for updating BRS characteristic to indicate it's synced and
614        // requires broadcast code.
615        #[rustfmt::skip]
616        fake_peer_service.add_characteristic(
617            Characteristic {
618                handle: RECEIVE_STATE_2_HANDLE,
619                uuid: BROADCAST_RECEIVE_STATE_UUID,
620                properties: CharacteristicProperties(vec![
621                    CharacteristicProperty::Broadcast,
622                    CharacteristicProperty::Notify,
623                ]),
624                permissions: AttributePermissions::default(),
625                descriptors: vec![],
626            },
627            vec![
628                0x02, AddressType::Public as u8,                      // source id and address type
629                0x02, 0x03, 0x04, 0x05, 0x06, 0x07,                   // address
630                0x01, 0x02, 0x03, 0x04,                               // ad set id and broadcast id
631                PaSyncState::Synced as u8,
632                EncryptionStatus::BroadcastCodeRequired.raw_value(),
633                0x00,                                                 // no subgroups
634            ],
635        );
636
637        // Check that synced and broadcast code required events were sent out.
638        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
639
640        let recv_fut = event_stream.select_next_some();
641        let event = block_on(recv_fut).expect("should receive event");
642        assert_eq!(
643            event,
644            Event::AddedBroadcastSource(
645                BroadcastId::try_from(0x040302).unwrap(),
646                PaSyncState::Synced,
647                EncryptionStatus::BroadcastCodeRequired
648            )
649        );
650
651        // Stream should be pending since no more notifications.
652        assert!(event_stream.poll_next_unpin(&mut noop_cx).is_pending());
653
654        // Send notification for updating BRS characteristic to indicate it requires
655        // sync info. Notification for updating the BRS characteristic value for
656        // characteristic with handle 3.
657        #[rustfmt::skip]
658        fake_peer_service.add_characteristic(
659            Characteristic {
660                handle: RECEIVE_STATE_3_HANDLE,
661                uuid: BROADCAST_RECEIVE_STATE_UUID,
662                properties: CharacteristicProperties(vec![
663                    CharacteristicProperty::Broadcast,
664                    CharacteristicProperty::Notify,
665                ]),
666                permissions: AttributePermissions::default(),
667                descriptors: vec![],
668            },
669            vec![
670                0x03, AddressType::Public as u8,             // source id and address type
671                0x03, 0x04, 0x05, 0x06, 0x07, 0x08,          // address
672                0x01, 0x03, 0x04, 0x05,                      // ad set id and broadcast id
673                PaSyncState::SyncInfoRequest as u8,
674                EncryptionStatus::NotEncrypted.raw_value(),
675                0x00,                                        // no subgroups
676            ],
677        );
678
679        let recv_fut = event_stream.select_next_some();
680        let event = block_on(recv_fut).expect("should receive event");
681        assert_eq!(
682            event,
683            Event::AddedBroadcastSource(
684                BroadcastId::try_from(0x050403).unwrap(),
685                PaSyncState::SyncInfoRequest,
686                EncryptionStatus::NotEncrypted
687            )
688        );
689
690        // Stream should be pending since no more notifications.
691        assert!(event_stream.poll_next_unpin(&mut noop_cx).is_pending());
692    }
693
694    #[test]
695    fn remote_scan_started() {
696        let (client, mut fake_peer_service) = setup_client();
697
698        fake_peer_service.expect_characteristic_value(&AUDIO_SCAN_CONTROL_POINT_HANDLE, vec![0x01]);
699
700        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
701        let op_fut = client.remote_scan_started();
702        pin_mut!(op_fut);
703        let polled = op_fut.poll_unpin(&mut noop_cx);
704        assert_matches!(polled, Poll::Ready(Ok(_)));
705    }
706
707    #[test]
708    fn remote_scan_stopped() {
709        let (client, mut fake_peer_service) = setup_client();
710
711        fake_peer_service.expect_characteristic_value(&AUDIO_SCAN_CONTROL_POINT_HANDLE, vec![0x00]);
712
713        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
714        let op_fut = client.remote_scan_stopped();
715        pin_mut!(op_fut);
716        let polled = op_fut.poll_unpin(&mut noop_cx);
717        assert_matches!(polled, Poll::Ready(Ok(_)));
718    }
719
720    #[test]
721    fn add_broadcast_source() {
722        let (client, mut fake_peer_service) = setup_client();
723
724        fake_peer_service.expect_characteristic_value(
725            &AUDIO_SCAN_CONTROL_POINT_HANDLE,
726            vec![
727                0x02, 0x00, 0x04, 0x10, 0x00, 0x00, 0x00, 0x00, 0x01, 0x11, 0x00, 0x00, 0x00, 0xFF,
728                0xFF, 0x00,
729            ],
730        );
731
732        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
733        let op_fut = client.add_broadcast_source(
734            BroadcastId::try_from(0x11).unwrap(),
735            AddressType::Public,
736            [0x04, 0x10, 0x00, 0x00, 0x00, 0x00],
737            AdvertisingSetId(1),
738            PaSync::DoNotSync,
739            PeriodicAdvertisingInterval::unknown(),
740            vec![],
741        );
742        pin_mut!(op_fut);
743        let polled = op_fut.poll_unpin(&mut noop_cx);
744        assert_matches!(polled, Poll::Ready(Ok(_)));
745    }
746
747    #[test]
748    fn modify_broadcast_source() {
749        let (client, mut fake_peer_service) = setup_client();
750
751        // Manually update the broadcast source tracker for testing purposes.
752        // In practice, this would have been updated from BRS value change notification.
753        client.broadcast_sources.lock().update_state(
754            RECEIVE_STATE_1_HANDLE,
755            BroadcastReceiveState::NonEmpty(ReceiveState {
756                source_id: 0x11,
757                source_address_type: AddressType::Public,
758                source_address: [1, 2, 3, 4, 5, 6],
759                source_adv_sid: AdvertisingSetId(1),
760                broadcast_id: BroadcastId::try_from(0x11).unwrap(),
761                pa_sync_state: PaSyncState::Synced,
762                big_encryption: EncryptionStatus::BroadcastCodeRequired,
763                subgroups: vec![],
764            }),
765        );
766
767        #[rustfmt::skip]
768        fake_peer_service.expect_characteristic_value(
769            &AUDIO_SCAN_CONTROL_POINT_HANDLE,
770            vec![
771                0x03, 0x11, 0x00,  // opcode, source id, pa sync
772                0xAA, 0xAA, 0x00,  // pa sync, pa interval, num of subgroups
773            ],
774        );
775
776        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
777        let op_fut = client.modify_broadcast_source(
778            BroadcastId::try_from(0x11).unwrap(),
779            PaSync::DoNotSync,
780            Some(PeriodicAdvertisingInterval(0xAAAA)),
781            HashMap::new(),
782            None,
783        );
784        pin_mut!(op_fut);
785        let polled = op_fut.poll_unpin(&mut noop_cx);
786        assert_matches!(polled, Poll::Ready(Ok(_)));
787    }
788
789    #[test]
790    fn modify_broadcast_source_updates_groups() {
791        let (client, mut fake_peer_service) = setup_client();
792
793        // Manually update the broadcast source tracker for testing purposes.
794        // In practice, this would have been updated from BRS value change notification.
795        client.broadcast_sources.lock().update_state(
796            RECEIVE_STATE_1_HANDLE,
797            BroadcastReceiveState::NonEmpty(ReceiveState {
798                source_id: 0x11,
799                source_address_type: AddressType::Public,
800                source_address: [1, 2, 3, 4, 5, 6],
801                source_adv_sid: AdvertisingSetId(1),
802                broadcast_id: BroadcastId::try_from(0x11).unwrap(),
803                pa_sync_state: PaSyncState::Synced,
804                big_encryption: EncryptionStatus::BroadcastCodeRequired,
805                subgroups: vec![BigSubgroup::new(None)],
806            }),
807        );
808
809        // Default PA interval value and subgroups value read from the BRS
810        // characteristic are used.
811        #[rustfmt::skip]
812        fake_peer_service.expect_characteristic_value(
813            &AUDIO_SCAN_CONTROL_POINT_HANDLE,
814            vec![
815                0x03, 0x11, 0x00,                    // opcode, source id, pa sync
816                0xFF, 0xFF, 0x02,                    // pa sync, pa interval, num of subgroups
817                0x15, 0x00, 0x00, 0x00,              // bis sync (0th subgroup)
818                0x02, 0x01, 0x09,                    // metadata len, metadata
819                0xFF, 0xFF, 0xFF, 0xFF,              // bis sync (1th subgroup)
820                0x05, 0x04, 0x04, 0x65, 0x6E, 0x67,  // metadata len, metadata
821            ],
822        );
823
824        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
825        let op_fut = client.modify_broadcast_source(
826            BroadcastId::try_from(0x11).unwrap(),
827            PaSync::DoNotSync,
828            None,
829            HashMap::from([(0, BisSync::sync(vec![1, 3, 5]).unwrap())]),
830            Some(HashMap::from([
831                (0, vec![Metadata::BroadcastAudioImmediateRenderingFlag]),
832                (1, vec![Metadata::Language("eng".to_string())]),
833                (5, vec![Metadata::ProgramInfoURI("this subgroup shouldn't be added".to_string())]),
834            ])),
835        );
836        pin_mut!(op_fut);
837        let polled: Poll<Result<(), Error>> = op_fut.poll_unpin(&mut noop_cx);
838        assert_matches!(polled, Poll::Ready(Ok(_)));
839    }
840
841    #[test]
842    fn modify_broadcast_source_fail() {
843        let (client, _fake_peer_service) = setup_client();
844
845        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
846        // Broadcast source wasn't previously added.
847        let op_fut = client.modify_broadcast_source(
848            BroadcastId::try_from(0x11).unwrap(),
849            PaSync::DoNotSync,
850            None,
851            HashMap::new(),
852            None,
853        );
854        pin_mut!(op_fut);
855        let polled = op_fut.poll_unpin(&mut noop_cx);
856        assert_matches!(polled, Poll::Ready(Err(_)));
857    }
858
859    #[test]
860    fn remove_broadcast_source() {
861        let (client, mut fake_peer_service) = setup_client();
862        let bid = BroadcastId::try_from(0x11).expect("should not fail");
863
864        // Manually update the broadcast source tracker for testing purposes.
865        // In practice, this would have been updated from BRS value change notification.
866        client.broadcast_sources.lock().update_state(
867            RECEIVE_STATE_1_HANDLE,
868            BroadcastReceiveState::NonEmpty(ReceiveState {
869                source_id: 0x11,
870                source_address_type: AddressType::Public,
871                source_address: [1, 2, 3, 4, 5, 6],
872                source_adv_sid: AdvertisingSetId(1),
873                broadcast_id: bid,
874                pa_sync_state: PaSyncState::Synced,
875                big_encryption: EncryptionStatus::BroadcastCodeRequired,
876                subgroups: vec![],
877            }),
878        );
879
880        fake_peer_service
881            .expect_characteristic_value(&AUDIO_SCAN_CONTROL_POINT_HANDLE, vec![0x05, 0x11]);
882
883        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
884        // Broadcast source wasn't previously added.
885        let op_fut = client.remove_broadcast_source(bid);
886        pin_mut!(op_fut);
887        let polled = op_fut.poll_unpin(&mut noop_cx);
888        assert_matches!(polled, Poll::Ready(Ok(_)));
889    }
890
891    #[test]
892    fn remove_broadcast_source_fail() {
893        let (client, _fake_peer_service) = setup_client();
894
895        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
896        // Broadcast source wasn't previously added.
897        let op_fut = client.remove_broadcast_source(BroadcastId::try_from(0x11).unwrap());
898        pin_mut!(op_fut);
899        let polled = op_fut.poll_unpin(&mut noop_cx);
900        assert_matches!(polled, Poll::Ready(Err(_)));
901    }
902
903    #[test]
904    fn set_broadcast_code() {
905        let (client, mut fake_peer_service) = setup_client();
906
907        // Manually update the broadcast source tracker for testing purposes.
908        // In practice, this would have been updated from BRS value change notification.
909        client.broadcast_sources.lock().update_state(
910            RECEIVE_STATE_1_HANDLE,
911            BroadcastReceiveState::NonEmpty(ReceiveState {
912                source_id: 0x01,
913                source_address_type: AddressType::Public,
914                source_address: [1, 2, 3, 4, 5, 6],
915                source_adv_sid: AdvertisingSetId(1),
916                broadcast_id: BroadcastId::try_from(0x030201).unwrap(),
917                pa_sync_state: PaSyncState::Synced,
918                big_encryption: EncryptionStatus::BroadcastCodeRequired,
919                subgroups: vec![],
920            }),
921        );
922
923        fake_peer_service.expect_characteristic_value(
924            &AUDIO_SCAN_CONTROL_POINT_HANDLE,
925            vec![0x04, 0x01, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
926        );
927
928        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
929        let set_code_fut =
930            client.set_broadcast_code(BroadcastId::try_from(0x030201).unwrap(), [1; 16]);
931        pin_mut!(set_code_fut);
932        let polled = set_code_fut.poll_unpin(&mut noop_cx);
933        assert_matches!(polled, Poll::Ready(Ok(_)));
934    }
935
936    #[test]
937    fn set_broadcast_code_fails() {
938        let (client, _) = setup_client();
939
940        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
941        let set_code_fut =
942            client.set_broadcast_code(BroadcastId::try_from(0x030201).unwrap(), [1; 16]);
943        pin_mut!(set_code_fut);
944        let polled = set_code_fut.poll_unpin(&mut noop_cx);
945
946        // Should fail because we cannot get source id for the broadcast id since BRS
947        // Characteristic value wasn't updated.
948        assert_matches!(polled, Poll::Ready(Err(_)));
949    }
950}