Skip to main content

bt_bass/client/
event.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::collections::VecDeque;
6use std::sync::Arc;
7use std::task::Poll;
8
9use futures::stream::{BoxStream, FusedStream, SelectAll};
10use futures::{Stream, StreamExt};
11use parking_lot::Mutex;
12
13use bt_bap::types::BroadcastId;
14use bt_common::packet_encoding::Decodable;
15use bt_gatt::client::CharacteristicNotification;
16use bt_gatt::types::Error as BtGattError;
17
18use crate::client::error::Error;
19use crate::client::error::ServiceError;
20use crate::client::KnownBroadcastSources;
21use crate::types::*;
22
23#[derive(Clone, Debug, PartialEq)]
24pub enum Event {
25    // Broadcast Audio Scan Service (BASS) server requested for SyncInfo through PAST procedure.
26    SyncInfoRequested(BroadcastId),
27    // BASS server failed to synchornize to PA or did not synchronize to PA.
28    NotSyncedToPa(BroadcastId),
29    // BASS server successfully synced to PA.
30    SyncedToPa(BroadcastId),
31    // BASS server failed to sync to PA since SyncInfo wasn't received.
32    SyncedFailedNoPast(BroadcastId),
33    // BASS server requires code to since the BIS is encrypted.
34    BroadcastCodeRequired(BroadcastId),
35    // BASS server failed to decrypt BIS using the previously provided code.
36    InvalidBroadcastCode(BroadcastId, [u8; 16]),
37    // BASS server has autonomously synchronized to a BIS that is encrypted, and the server
38    // has the correct encryption key to decrypt the BIS.
39    Decrypting(BroadcastId),
40    // Received a packet from the BASS server not recognized by this library.
41    UnknownPacket,
42    // Broadcast source was removed by the BASS server.
43    RemovedBroadcastSource(BroadcastId),
44    // Broadcast source was added by the BASS server.
45    AddedBroadcastSource(BroadcastId, PaSyncState, EncryptionStatus),
46}
47
48impl Event {
49    pub(crate) fn from_broadcast_receive_state(state: &ReceiveState) -> Vec<Event> {
50        let mut events = Vec::new();
51        let pa_sync_state = state.pa_sync_state();
52        let broadcast_id = state.broadcast_id();
53        match pa_sync_state {
54            PaSyncState::SyncInfoRequest => events.push(Event::SyncInfoRequested(broadcast_id)),
55            PaSyncState::Synced => events.push(Event::SyncedToPa(broadcast_id)),
56            PaSyncState::FailedToSync | PaSyncState::NotSynced => {
57                events.push(Event::NotSyncedToPa(broadcast_id))
58            }
59            PaSyncState::NoPast => events.push(Event::SyncedFailedNoPast(broadcast_id)),
60        }
61        match state.big_encryption() {
62            EncryptionStatus::BroadcastCodeRequired => {
63                events.push(Event::BroadcastCodeRequired(broadcast_id))
64            }
65            EncryptionStatus::Decrypting => events.push(Event::Decrypting(broadcast_id)),
66            EncryptionStatus::BadCode(code) => {
67                events.push(Event::InvalidBroadcastCode(broadcast_id, code.clone()))
68            }
69            _ => {}
70        };
71        events
72    }
73}
74
75/// Trait for representing a stream that outputs Events from BASS. If there was
76/// an error the stream should output error instead and terminate.
77
78pub struct EventStream {
79    // Actual GATT notification streams that we poll from.
80    notification_streams:
81        SelectAll<BoxStream<'static, Result<CharacteristicNotification, BtGattError>>>,
82
83    event_queue: VecDeque<Result<Event, Error>>,
84    terminated: bool,
85
86    // States to be updated.
87    broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
88}
89
90impl EventStream {
91    pub(crate) fn new(
92        notification_streams: SelectAll<
93            BoxStream<'static, Result<CharacteristicNotification, BtGattError>>,
94        >,
95        broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
96    ) -> Self {
97        Self {
98            notification_streams,
99            event_queue: VecDeque::new(),
100            terminated: false,
101            broadcast_sources,
102        }
103    }
104}
105
106impl FusedStream for EventStream {
107    fn is_terminated(&self) -> bool {
108        self.terminated
109    }
110}
111
112impl Stream for EventStream {
113    type Item = Result<Event, Error>;
114
115    fn poll_next(
116        mut self: std::pin::Pin<&mut Self>,
117        cx: &mut std::task::Context<'_>,
118    ) -> Poll<Option<Self::Item>> {
119        if self.terminated {
120            return Poll::Ready(None);
121        }
122
123        loop {
124            if let Some(item) = self.event_queue.pop_front() {
125                // An error from a single stream will be reported, but the main EventStream
126                // will continue for other sources.
127                return Poll::Ready(Some(item));
128            }
129
130            let Some(item) = futures::ready!(self.notification_streams.poll_next_unpin(cx)) else {
131                // All notification streams have been closed. Terminate the EventStream.
132                self.terminated = true;
133                let err = Error::EventStream(Box::new(Error::Service(
134                    ServiceError::NotificationChannelClosed(format!(
135                        "All BASS GATT notification streams closed"
136                    )),
137                )));
138                return Poll::Ready(Some(Err(err)));
139            };
140
141            // One of the notification streams produced an error. Report it, but do not
142            // terminate. SelectAll will remove the faulty stream from its set.
143            let Ok(notification) = item else {
144                let err = Error::EventStream(Box::new(Error::Gatt(item.unwrap_err())));
145                return Poll::Ready(Some(Err(err)));
146            };
147
148            let char_handle = notification.handle;
149            let (Ok(new_state), _) = BroadcastReceiveState::decode(notification.value.as_slice())
150            else {
151                self.event_queue.push_back(Ok(Event::UnknownPacket));
152                continue;
153            };
154
155            let maybe_prev_state = {
156                let mut lock = self.broadcast_sources.lock();
157                lock.update_state(char_handle, new_state.clone())
158            };
159
160            // If the previous value was not empty, check if it was overwritten.
161            if let Some(ref prev_state) = maybe_prev_state {
162                if let BroadcastReceiveState::NonEmpty(prev_receive_state) = prev_state {
163                    if new_state.is_empty() || !new_state.has_same_broadcast_id(&prev_state) {
164                        self.event_queue.push_back(Ok(Event::RemovedBroadcastSource(
165                            prev_receive_state.broadcast_id,
166                        )));
167                    }
168                }
169            }
170
171            // BRS characteristic value was updated with a new broadcast source
172            // information.
173            if let BroadcastReceiveState::NonEmpty(receive_state) = &new_state {
174                let is_new_source = match maybe_prev_state {
175                    Some(prev_state) => !new_state.has_same_broadcast_id(&prev_state),
176                    None => true,
177                };
178                if is_new_source {
179                    self.event_queue.push_back(Ok(Event::AddedBroadcastSource(
180                        receive_state.broadcast_id,
181                        receive_state.pa_sync_state,
182                        receive_state.big_encryption,
183                    )));
184                } else {
185                    let other_events = Event::from_broadcast_receive_state(&receive_state);
186                    for e in other_events.into_iter() {
187                        self.event_queue.push_back(Ok(e));
188                    }
189                }
190            }
191            // Continue to the top of the loop to start draining the event_queue.
192            continue;
193        }
194    }
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200
201    use std::collections::HashMap;
202
203    use assert_matches::assert_matches;
204    use futures::channel::mpsc::unbounded;
205
206    use bt_common::core::AddressType;
207    use bt_gatt::types::{Error as BtGattError, GattError, Handle};
208
209    #[test]
210    fn poll_event_stream() {
211        let mut streams = SelectAll::new();
212        let (sender1, receiver1) = unbounded();
213        let (sender2, receiver2) = unbounded();
214        streams.push(receiver1.boxed());
215        streams.push(receiver2.boxed());
216
217        let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
218            (Handle(0x1), BroadcastReceiveState::Empty),
219            (Handle(0x2), BroadcastReceiveState::Empty),
220        ]))));
221        let mut event_streams = EventStream::new(streams, source_tracker);
222
223        // Send notifications to underlying streams.
224        let bad_code_status =
225            EncryptionStatus::BadCode([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
226        #[rustfmt::skip]
227        sender1
228            .unbounded_send(Ok(CharacteristicNotification {
229                handle: Handle(0x1),
230                value: vec![
231                    0x01, AddressType::Public as u8,         // source id and address type
232                    0x02, 0x03, 0x04, 0x05, 0x06, 0x07,      // address
233                    0x01, 0x01, 0x02, 0x03,                  // ad set id and broadcast id
234                    PaSyncState::FailedToSync as u8,
235                    bad_code_status.raw_value(),
236                    1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,  // bad code
237                    0x00,                                    // no subgroups
238                ],
239                maybe_truncated: false,
240            }))
241            .expect("should send");
242
243        #[rustfmt::skip]
244        sender2
245            .unbounded_send(Ok(CharacteristicNotification {
246                handle: Handle(0x2),
247                value: vec![
248                    0x02, AddressType::Public as u8,             // source id and address type
249                    0x03, 0x04, 0x05, 0x06, 0x07, 0x08,          // address
250                    0x01, 0x02, 0x03, 0x04,                      // ad set id and broadcast id
251                    PaSyncState::NoPast as u8,
252                    EncryptionStatus::NotEncrypted.raw_value(),
253                    0x00,                                        // no subgroups
254                ],
255                maybe_truncated: false,
256            }))
257            .expect("should send");
258
259        // Events should have been generated from notifications.
260        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
261        let polled = event_streams.poll_next_unpin(&mut noop_cx);
262        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
263            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x030201).unwrap(), PaSyncState::FailedToSync, EncryptionStatus::BadCode([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])));
264        });
265
266        let polled = event_streams.poll_next_unpin(&mut noop_cx);
267        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
268            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::NoPast, EncryptionStatus::NotEncrypted));
269        });
270
271        // Should be pending because no more events generated from notifications.
272        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
273
274        // Send notifications to underlying streams.
275        #[rustfmt::skip]
276        sender2
277            .unbounded_send(Ok(CharacteristicNotification {
278                handle: Handle(0x2),
279                value: vec![
280                    0x02, AddressType::Public as u8,             // source id and address type
281                    0x03, 0x04, 0x05, 0x06, 0x07, 0x08,          // address
282                    0x01, 0x02, 0x03, 0x04,                      // ad set id and broadcast id
283                    PaSyncState::Synced as u8,
284                    EncryptionStatus::NotEncrypted.raw_value(),
285                    0x00,                                        // no subgroups
286                ],
287                maybe_truncated: false,
288            }))
289            .expect("should send");
290
291        // Event should have been generated from notification.
292        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
293        assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::SyncedToPa(BroadcastId::try_from(0x040302).unwrap())) });
294
295        // Should be pending because no more events generated from notifications.
296        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
297    }
298
299    #[test]
300    fn broadcast_source_is_removed() {
301        let mut streams = SelectAll::new();
302        let (_sender1, receiver1) = unbounded();
303        let (sender2, receiver2) = unbounded();
304        streams.push(receiver1.boxed());
305        streams.push(receiver2.boxed());
306
307        let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
308            (Handle(0x1), BroadcastReceiveState::Empty),
309            (Handle(0x2), BroadcastReceiveState::Empty),
310        ]))));
311        let mut event_streams = EventStream::new(streams, source_tracker);
312
313        // Send notifications to underlying streams.
314        #[rustfmt::skip]
315        sender2
316            .unbounded_send(Ok(CharacteristicNotification {
317                handle: Handle(0x2),
318                value: vec![
319                    0x02, AddressType::Public as u8,             // source id and address type
320                    0x03, 0x04, 0x05, 0x06, 0x07, 0x08,          // address
321                    0x01, 0x02, 0x03, 0x04,                      // ad set id and broadcast id
322                    PaSyncState::Synced as u8,
323                    EncryptionStatus::NotEncrypted.raw_value(),
324                    0x00,                                        // no subgroups
325                ],
326                maybe_truncated: false,
327            }))
328            .expect("should send");
329
330        // Events should have been generated from notifications.
331        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
332
333        let polled = event_streams.poll_next_unpin(&mut noop_cx);
334        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
335            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::Synced, EncryptionStatus::NotEncrypted));
336        });
337
338        // Should be pending because no more events generated from notifications.
339        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
340
341        // Send notifications to underlying streams. Ths time, send empty BRS
342        // characteristic value.
343        sender2
344            .unbounded_send(Ok(CharacteristicNotification {
345                handle: Handle(0x2),
346                value: vec![],
347                maybe_truncated: false,
348            }))
349            .expect("should send");
350
351        // Event should have been generated from notification.
352        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
353        assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::RemovedBroadcastSource(BroadcastId::try_from(0x040302).unwrap())) });
354
355        // Should be pending because no more events generated from notifications.
356        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
357    }
358
359    #[test]
360    fn error_on_one_stream_does_not_terminate_event_stream() {
361        let mut streams = SelectAll::new();
362        let (sender1, receiver1) = unbounded();
363        let (sender2, receiver2) = unbounded();
364        streams.push(receiver1.boxed());
365        streams.push(receiver2.boxed());
366
367        let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
368            (Handle(0x1), BroadcastReceiveState::Empty),
369            (Handle(0x2), BroadcastReceiveState::Empty),
370        ]))));
371        let mut event_streams = EventStream::new(streams, source_tracker);
372        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
373
374        // Send an error on one stream.
375        sender1.unbounded_send(Err(BtGattError::Gatt(GattError::InvalidPdu))).expect("should send");
376
377        // We should receive the error event.
378        let polled = event_streams.poll_next_unpin(&mut noop_cx);
379        assert_matches!(polled, Poll::Ready(Some(Err(Error::EventStream(_)))));
380
381        // The stream should NOT be terminated.
382        assert!(!event_streams.is_terminated());
383        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
384
385        // Send a valid notification on the other stream.
386        #[rustfmt::skip]
387        sender2
388            .unbounded_send(Ok(CharacteristicNotification {
389                handle: Handle(0x2),
390                value: vec![
391                    0x02, AddressType::Public as u8,             // source id and address type
392                    0x03, 0x04, 0x05, 0x06, 0x07, 0x08,          // address
393                    0x01, 0x02, 0x03, 0x04,                      // ad set id and broadcast id
394                    PaSyncState::Synced as u8,
395                    EncryptionStatus::NotEncrypted.raw_value(),
396                    0x00,                                        // no subgroups
397                ],
398                maybe_truncated: false,
399            }))
400            .expect("should send");
401
402        // We should be able to receive the event from the second stream.
403        let polled = event_streams.poll_next_unpin(&mut noop_cx);
404        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
405            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::Synced, EncryptionStatus::NotEncrypted));
406        });
407
408        // The stream should still not be terminated.
409        assert!(!event_streams.is_terminated());
410    }
411}