bt_bass/client/
event.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::collections::VecDeque;
6use std::sync::Arc;
7use std::task::Poll;
8
9use futures::stream::{BoxStream, FusedStream, SelectAll};
10use futures::{Stream, StreamExt};
11use parking_lot::Mutex;
12
13use bt_bap::types::BroadcastId;
14use bt_common::packet_encoding::Decodable;
15use bt_gatt::client::CharacteristicNotification;
16use bt_gatt::types::Error as BtGattError;
17
18use crate::client::KnownBroadcastSources;
19use crate::client::error::Error;
20use crate::client::error::ServiceError;
21use crate::types::*;
22
23#[derive(Clone, Debug, PartialEq)]
24pub enum Event {
25    // Broadcast Audio Scan Service (BASS) server requested for SyncInfo through PAST procedure.
26    SyncInfoRequested(BroadcastId),
27    // BASS server failed to synchornize to PA or did not synchronize to PA.
28    NotSyncedToPa(BroadcastId),
29    // BASS server successfully synced to PA.
30    SyncedToPa(BroadcastId),
31    // BASS server failed to sync to PA since SyncInfo wasn't received.
32    SyncedFailedNoPast(BroadcastId),
33    // BASS server requires code to since the BIS is encrypted.
34    BroadcastCodeRequired(BroadcastId),
35    // BASS server failed to decrypt BIS using the previously provided code.
36    InvalidBroadcastCode(BroadcastId, [u8; 16]),
37    // BASS server has autonomously synchronized to a BIS that is encrypted, and the server
38    // has the correct encryption key to decrypt the BIS.
39    Decrypting(BroadcastId),
40    // Received a packet from the BASS server not recognized by this library.
41    UnknownPacket,
42    // Broadcast source was removed by the BASS server.
43    RemovedBroadcastSource(BroadcastId),
44    // Broadcast source was added by the BASS server.
45    AddedBroadcastSource(BroadcastId, PaSyncState, EncryptionStatus),
46}
47
48impl Event {
49    pub(crate) fn from_broadcast_receive_state(state: &ReceiveState) -> Vec<Event> {
50        let mut events = Vec::new();
51        let pa_sync_state = state.pa_sync_state();
52        let broadcast_id = state.broadcast_id();
53        match pa_sync_state {
54            PaSyncState::SyncInfoRequest => events.push(Event::SyncInfoRequested(broadcast_id)),
55            PaSyncState::Synced => events.push(Event::SyncedToPa(broadcast_id)),
56            PaSyncState::FailedToSync | PaSyncState::NotSynced => {
57                events.push(Event::NotSyncedToPa(broadcast_id))
58            }
59            PaSyncState::NoPast => events.push(Event::SyncedFailedNoPast(broadcast_id)),
60        }
61        match state.big_encryption() {
62            EncryptionStatus::BroadcastCodeRequired => {
63                events.push(Event::BroadcastCodeRequired(broadcast_id))
64            }
65            EncryptionStatus::Decrypting => events.push(Event::Decrypting(broadcast_id)),
66            EncryptionStatus::BadCode(code) => {
67                events.push(Event::InvalidBroadcastCode(broadcast_id, code.clone()))
68            }
69            _ => {}
70        };
71        events
72    }
73}
74
75/// Trait for representing a stream that outputs Events from BASS. If there was
76/// an error the stream should output error instead and terminate.
77
78pub struct EventStream {
79    // Actual GATT notification streams that we poll from.
80    notification_streams:
81        SelectAll<BoxStream<'static, Result<CharacteristicNotification, BtGattError>>>,
82
83    event_queue: VecDeque<Result<Event, Error>>,
84    terminated: bool,
85
86    // States to be updated.
87    broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
88}
89
90impl EventStream {
91    pub(crate) fn new(
92        notification_streams: SelectAll<
93            BoxStream<'static, Result<CharacteristicNotification, BtGattError>>,
94        >,
95        broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
96    ) -> Self {
97        Self {
98            notification_streams,
99            event_queue: VecDeque::new(),
100            terminated: false,
101            broadcast_sources,
102        }
103    }
104}
105
106impl FusedStream for EventStream {
107    fn is_terminated(&self) -> bool {
108        self.terminated
109    }
110}
111
112impl Stream for EventStream {
113    type Item = Result<Event, Error>;
114
115    fn poll_next(
116        mut self: std::pin::Pin<&mut Self>,
117        cx: &mut std::task::Context<'_>,
118    ) -> Poll<Option<Self::Item>> {
119        if self.terminated {
120            return Poll::Ready(None);
121        }
122
123        loop {
124            if let Some(item) = self.event_queue.pop_front() {
125                match item {
126                    Ok(event) => return Poll::Ready(Some(Ok(event))),
127                    Err(e) => {
128                        // If an error was received, we terminate the event stream, but send an
129                        // error to indicate why it was terminated.
130                        self.terminated = true;
131                        return Poll::Ready(Some(Err(e)));
132                    }
133                }
134            }
135
136            match self.notification_streams.poll_next_unpin(cx) {
137                Poll::Pending => return Poll::Pending,
138                Poll::Ready(None) => {
139                    let err = Error::EventStream(Box::new(Error::Service(
140                        ServiceError::NotificationChannelClosed(format!(
141                            "GATT notification stream for BRS characteristics closed"
142                        )),
143                    )));
144                    self.event_queue.push_back(Err(err));
145                }
146                Poll::Ready(Some(Err(error))) => {
147                    // Deem all errors as critical.
148                    let err = Error::EventStream(Box::new(Error::Gatt(error)));
149                    self.event_queue.push_back(Err(err));
150                }
151                Poll::Ready(Some(Ok(notification))) => {
152                    let char_handle = notification.handle;
153                    let (Ok(new_state), _) =
154                        BroadcastReceiveState::decode(notification.value.as_slice())
155                    else {
156                        self.event_queue.push_back(Ok(Event::UnknownPacket));
157                        continue;
158                    };
159
160                    let maybe_prev_state = {
161                        let mut lock = self.broadcast_sources.lock();
162                        lock.update_state(char_handle, new_state.clone())
163                    };
164
165                    let mut multi_events = VecDeque::new();
166
167                    // If the previous value was not empty, check if it was overwritten.
168                    if let Some(ref prev_state) = maybe_prev_state {
169                        if let BroadcastReceiveState::NonEmpty(prev_receive_state) = prev_state {
170                            if new_state.is_empty() || !new_state.has_same_broadcast_id(&prev_state)
171                            {
172                                multi_events.push_back(Ok(Event::RemovedBroadcastSource(
173                                    prev_receive_state.broadcast_id,
174                                )));
175                            }
176                        }
177                    }
178
179                    // BRS characteristic value was updated with a new broadcast source
180                    // information.
181                    if let BroadcastReceiveState::NonEmpty(receive_state) = &new_state {
182                        let is_new_source = match maybe_prev_state {
183                            Some(prev_state) => !new_state.has_same_broadcast_id(&prev_state),
184                            None => true,
185                        };
186                        if is_new_source {
187                            multi_events.push_back(Ok(Event::AddedBroadcastSource(
188                                receive_state.broadcast_id,
189                                receive_state.pa_sync_state,
190                                receive_state.big_encryption,
191                            )));
192                        } else {
193                            let other_events = Event::from_broadcast_receive_state(&receive_state);
194                            for e in other_events.into_iter() {
195                                multi_events.push_back(Ok(e));
196                            }
197                        }
198                    }
199                    if multi_events.len() != 0 {
200                        self.event_queue.append(&mut multi_events);
201                        continue;
202                    }
203                    continue;
204                }
205            };
206
207            break;
208        }
209        Poll::Pending
210    }
211}
212
213#[cfg(test)]
214mod tests {
215    use super::*;
216
217    use std::collections::HashMap;
218
219    use assert_matches::assert_matches;
220    use futures::channel::mpsc::unbounded;
221
222    use bt_common::core::AddressType;
223    use bt_gatt::types::Handle;
224
225    #[test]
226    fn poll_event_stream() {
227        let mut streams = SelectAll::new();
228        let (sender1, receiver1) = unbounded();
229        let (sender2, receiver2) = unbounded();
230        streams.push(receiver1.boxed());
231        streams.push(receiver2.boxed());
232
233        let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
234            (Handle(0x1), BroadcastReceiveState::Empty),
235            (Handle(0x2), BroadcastReceiveState::Empty),
236        ]))));
237        let mut event_streams = EventStream::new(streams, source_tracker);
238
239        // Send notifications to underlying streams.
240        let bad_code_status =
241            EncryptionStatus::BadCode([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
242        #[rustfmt::skip]
243        sender1
244            .unbounded_send(Ok(CharacteristicNotification {
245                handle: Handle(0x1),
246                value: vec![
247                    0x01, AddressType::Public as u8,         // source id and address type
248                    0x02, 0x03, 0x04, 0x05, 0x06, 0x07,      // address
249                    0x01, 0x01, 0x02, 0x03,                  // ad set id and broadcast id
250                    PaSyncState::FailedToSync as u8,
251                    bad_code_status.raw_value(),
252                    1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,  // bad code
253                    0x00,                                    // no subgroups
254                ],
255                maybe_truncated: false,
256            }))
257            .expect("should send");
258
259        #[rustfmt::skip]
260        sender2
261            .unbounded_send(Ok(CharacteristicNotification {
262                handle: Handle(0x2),
263                value: vec![
264                    0x02, AddressType::Public as u8,             // source id and address type
265                    0x03, 0x04, 0x05, 0x06, 0x07, 0x08,          // address
266                    0x01, 0x02, 0x03, 0x04,                      // ad set id and broadcast id
267                    PaSyncState::NoPast as u8,
268                    EncryptionStatus::NotEncrypted.raw_value(),
269                    0x00,                                        // no subgroups
270                ],
271                maybe_truncated: false,
272            }))
273            .expect("should send");
274
275        // Events should have been generated from notifications.
276        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
277        let polled = event_streams.poll_next_unpin(&mut noop_cx);
278        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
279            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x030201).unwrap(), PaSyncState::FailedToSync, EncryptionStatus::BadCode([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])));
280        });
281
282        let polled = event_streams.poll_next_unpin(&mut noop_cx);
283        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
284            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::NoPast, EncryptionStatus::NotEncrypted));
285        });
286
287        // Should be pending because no more events generated from notifications.
288        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
289
290        // Send notifications to underlying streams.
291        #[rustfmt::skip]
292        sender2
293            .unbounded_send(Ok(CharacteristicNotification {
294                handle: Handle(0x2),
295                value: vec![
296                    0x02, AddressType::Public as u8,             // source id and address type
297                    0x03, 0x04, 0x05, 0x06, 0x07, 0x08,          // address
298                    0x01, 0x02, 0x03, 0x04,                      // ad set id and broadcast id
299                    PaSyncState::Synced as u8,
300                    EncryptionStatus::NotEncrypted.raw_value(),
301                    0x00,                                        // no subgroups
302                ],
303                maybe_truncated: false,
304            }))
305            .expect("should send");
306
307        // Event should have been generated from notification.
308        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
309        assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::SyncedToPa(BroadcastId::try_from(0x040302).unwrap())) });
310
311        // Should be pending because no more events generated from notifications.
312        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
313    }
314
315    #[test]
316    fn broadcast_source_is_removed() {
317        let mut streams = SelectAll::new();
318        let (_sender1, receiver1) = unbounded();
319        let (sender2, receiver2) = unbounded();
320        streams.push(receiver1.boxed());
321        streams.push(receiver2.boxed());
322
323        let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
324            (Handle(0x1), BroadcastReceiveState::Empty),
325            (Handle(0x2), BroadcastReceiveState::Empty),
326        ]))));
327        let mut event_streams = EventStream::new(streams, source_tracker);
328
329        // Send notifications to underlying streams.
330        #[rustfmt::skip]
331        sender2
332            .unbounded_send(Ok(CharacteristicNotification {
333                handle: Handle(0x2),
334                value: vec![
335                    0x02, AddressType::Public as u8,             // source id and address type
336                    0x03, 0x04, 0x05, 0x06, 0x07, 0x08,          // address
337                    0x01, 0x02, 0x03, 0x04,                      // ad set id and broadcast id
338                    PaSyncState::Synced as u8,
339                    EncryptionStatus::NotEncrypted.raw_value(),
340                    0x00,                                        // no subgroups
341                ],
342                maybe_truncated: false,
343            }))
344            .expect("should send");
345
346        // Events should have been generated from notifications.
347        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
348
349        let polled = event_streams.poll_next_unpin(&mut noop_cx);
350        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
351            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::Synced, EncryptionStatus::NotEncrypted));
352        });
353
354        // Should be pending because no more events generated from notifications.
355        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
356
357        // Send notifications to underlying streams. Ths time, send empty BRS
358        // characteristic value.
359        sender2
360            .unbounded_send(Ok(CharacteristicNotification {
361                handle: Handle(0x2),
362                value: vec![],
363                maybe_truncated: false,
364            }))
365            .expect("should send");
366
367        // Event should have been generated from notification.
368        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
369        assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::RemovedBroadcastSource(BroadcastId::try_from(0x040302).unwrap())) });
370
371        // Should be pending because no more events generated from notifications.
372        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
373    }
374}