1use std::collections::VecDeque;
6use std::sync::Arc;
7use std::task::Poll;
8
9use futures::stream::{BoxStream, FusedStream, SelectAll};
10use futures::{Stream, StreamExt};
11use parking_lot::Mutex;
12
13use bt_bap::types::BroadcastId;
14use bt_common::packet_encoding::Decodable;
15use bt_gatt::client::CharacteristicNotification;
16use bt_gatt::types::Error as BtGattError;
17
18use crate::client::error::Error;
19use crate::client::error::ServiceError;
20use crate::client::KnownBroadcastSources;
21use crate::types::*;
22
23#[derive(Clone, Debug, PartialEq)]
24pub enum Event {
25 SyncInfoRequested(BroadcastId),
27 NotSyncedToPa(BroadcastId),
29 SyncedToPa(BroadcastId),
31 SyncedFailedNoPast(BroadcastId),
33 BroadcastCodeRequired(BroadcastId),
35 InvalidBroadcastCode(BroadcastId, [u8; 16]),
37 Decrypting(BroadcastId),
40 UnknownPacket,
42 RemovedBroadcastSource(BroadcastId),
44 AddedBroadcastSource(BroadcastId, PaSyncState, EncryptionStatus),
46}
47
48impl Event {
49 pub(crate) fn from_broadcast_receive_state(state: &ReceiveState) -> Vec<Event> {
50 let mut events = Vec::new();
51 let pa_sync_state = state.pa_sync_state();
52 let broadcast_id = state.broadcast_id();
53 match pa_sync_state {
54 PaSyncState::SyncInfoRequest => events.push(Event::SyncInfoRequested(broadcast_id)),
55 PaSyncState::Synced => events.push(Event::SyncedToPa(broadcast_id)),
56 PaSyncState::FailedToSync | PaSyncState::NotSynced => {
57 events.push(Event::NotSyncedToPa(broadcast_id))
58 }
59 PaSyncState::NoPast => events.push(Event::SyncedFailedNoPast(broadcast_id)),
60 }
61 match state.big_encryption() {
62 EncryptionStatus::BroadcastCodeRequired => {
63 events.push(Event::BroadcastCodeRequired(broadcast_id))
64 }
65 EncryptionStatus::Decrypting => events.push(Event::Decrypting(broadcast_id)),
66 EncryptionStatus::BadCode(code) => {
67 events.push(Event::InvalidBroadcastCode(broadcast_id, code.clone()))
68 }
69 _ => {}
70 };
71 events
72 }
73}
74
75pub struct EventStream {
79 notification_streams:
81 SelectAll<BoxStream<'static, Result<CharacteristicNotification, BtGattError>>>,
82
83 event_queue: VecDeque<Result<Event, Error>>,
84 terminated: bool,
85
86 broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
88}
89
90impl EventStream {
91 pub(crate) fn new(
92 notification_streams: SelectAll<
93 BoxStream<'static, Result<CharacteristicNotification, BtGattError>>,
94 >,
95 broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
96 ) -> Self {
97 Self {
98 notification_streams,
99 event_queue: VecDeque::new(),
100 terminated: false,
101 broadcast_sources,
102 }
103 }
104}
105
106impl FusedStream for EventStream {
107 fn is_terminated(&self) -> bool {
108 self.terminated
109 }
110}
111
112impl Stream for EventStream {
113 type Item = Result<Event, Error>;
114
115 fn poll_next(
116 mut self: std::pin::Pin<&mut Self>,
117 cx: &mut std::task::Context<'_>,
118 ) -> Poll<Option<Self::Item>> {
119 if self.terminated {
120 return Poll::Ready(None);
121 }
122
123 loop {
124 if let Some(item) = self.event_queue.pop_front() {
125 return Poll::Ready(Some(item));
128 }
129
130 let Some(item) = futures::ready!(self.notification_streams.poll_next_unpin(cx)) else {
131 self.terminated = true;
133 let err = Error::EventStream(Box::new(Error::Service(
134 ServiceError::NotificationChannelClosed(format!(
135 "All BASS GATT notification streams closed"
136 )),
137 )));
138 return Poll::Ready(Some(Err(err)));
139 };
140
141 let Ok(notification) = item else {
144 let err = Error::EventStream(Box::new(Error::Gatt(item.unwrap_err())));
145 return Poll::Ready(Some(Err(err)));
146 };
147
148 let char_handle = notification.handle;
149 let (Ok(new_state), _) = BroadcastReceiveState::decode(notification.value.as_slice())
150 else {
151 self.event_queue.push_back(Ok(Event::UnknownPacket));
152 continue;
153 };
154
155 let maybe_prev_state = {
156 let mut lock = self.broadcast_sources.lock();
157 lock.update_state(char_handle, new_state.clone())
158 };
159
160 if let Some(ref prev_state) = maybe_prev_state {
162 if let BroadcastReceiveState::NonEmpty(prev_receive_state) = prev_state {
163 if new_state.is_empty() || !new_state.has_same_broadcast_id(&prev_state) {
164 self.event_queue.push_back(Ok(Event::RemovedBroadcastSource(
165 prev_receive_state.broadcast_id,
166 )));
167 }
168 }
169 }
170
171 if let BroadcastReceiveState::NonEmpty(receive_state) = &new_state {
174 let is_new_source = match maybe_prev_state {
175 Some(prev_state) => !new_state.has_same_broadcast_id(&prev_state),
176 None => true,
177 };
178 if is_new_source {
179 self.event_queue.push_back(Ok(Event::AddedBroadcastSource(
180 receive_state.broadcast_id,
181 receive_state.pa_sync_state,
182 receive_state.big_encryption,
183 )));
184 } else {
185 let other_events = Event::from_broadcast_receive_state(&receive_state);
186 for e in other_events.into_iter() {
187 self.event_queue.push_back(Ok(e));
188 }
189 }
190 }
191 continue;
193 }
194 }
195}
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200
201 use std::collections::HashMap;
202
203 use assert_matches::assert_matches;
204 use futures::channel::mpsc::unbounded;
205
206 use bt_common::core::AddressType;
207 use bt_gatt::types::{Error as BtGattError, GattError, Handle};
208
209 #[test]
210 fn poll_event_stream() {
211 let mut streams = SelectAll::new();
212 let (sender1, receiver1) = unbounded();
213 let (sender2, receiver2) = unbounded();
214 streams.push(receiver1.boxed());
215 streams.push(receiver2.boxed());
216
217 let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
218 (Handle(0x1), BroadcastReceiveState::Empty),
219 (Handle(0x2), BroadcastReceiveState::Empty),
220 ]))));
221 let mut event_streams = EventStream::new(streams, source_tracker);
222
223 let bad_code_status =
225 EncryptionStatus::BadCode([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
226 #[rustfmt::skip]
227 sender1
228 .unbounded_send(Ok(CharacteristicNotification {
229 handle: Handle(0x1),
230 value: vec![
231 0x01, AddressType::Public as u8, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x01, 0x01, 0x02, 0x03, PaSyncState::FailedToSync as u8,
235 bad_code_status.raw_value(),
236 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16, 0x00, ],
239 maybe_truncated: false,
240 }))
241 .expect("should send");
242
243 #[rustfmt::skip]
244 sender2
245 .unbounded_send(Ok(CharacteristicNotification {
246 handle: Handle(0x2),
247 value: vec![
248 0x02, AddressType::Public as u8, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, PaSyncState::NoPast as u8,
252 EncryptionStatus::NotEncrypted.raw_value(),
253 0x00, ],
255 maybe_truncated: false,
256 }))
257 .expect("should send");
258
259 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
261 let polled = event_streams.poll_next_unpin(&mut noop_cx);
262 assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
263 assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x030201).unwrap(), PaSyncState::FailedToSync, EncryptionStatus::BadCode([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])));
264 });
265
266 let polled = event_streams.poll_next_unpin(&mut noop_cx);
267 assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
268 assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::NoPast, EncryptionStatus::NotEncrypted));
269 });
270
271 assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
273
274 #[rustfmt::skip]
276 sender2
277 .unbounded_send(Ok(CharacteristicNotification {
278 handle: Handle(0x2),
279 value: vec![
280 0x02, AddressType::Public as u8, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, PaSyncState::Synced as u8,
284 EncryptionStatus::NotEncrypted.raw_value(),
285 0x00, ],
287 maybe_truncated: false,
288 }))
289 .expect("should send");
290
291 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
293 assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::SyncedToPa(BroadcastId::try_from(0x040302).unwrap())) });
294
295 assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
297 }
298
299 #[test]
300 fn broadcast_source_is_removed() {
301 let mut streams = SelectAll::new();
302 let (_sender1, receiver1) = unbounded();
303 let (sender2, receiver2) = unbounded();
304 streams.push(receiver1.boxed());
305 streams.push(receiver2.boxed());
306
307 let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
308 (Handle(0x1), BroadcastReceiveState::Empty),
309 (Handle(0x2), BroadcastReceiveState::Empty),
310 ]))));
311 let mut event_streams = EventStream::new(streams, source_tracker);
312
313 #[rustfmt::skip]
315 sender2
316 .unbounded_send(Ok(CharacteristicNotification {
317 handle: Handle(0x2),
318 value: vec![
319 0x02, AddressType::Public as u8, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, PaSyncState::Synced as u8,
323 EncryptionStatus::NotEncrypted.raw_value(),
324 0x00, ],
326 maybe_truncated: false,
327 }))
328 .expect("should send");
329
330 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
332
333 let polled = event_streams.poll_next_unpin(&mut noop_cx);
334 assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
335 assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::Synced, EncryptionStatus::NotEncrypted));
336 });
337
338 assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
340
341 sender2
344 .unbounded_send(Ok(CharacteristicNotification {
345 handle: Handle(0x2),
346 value: vec![],
347 maybe_truncated: false,
348 }))
349 .expect("should send");
350
351 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
353 assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::RemovedBroadcastSource(BroadcastId::try_from(0x040302).unwrap())) });
354
355 assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
357 }
358
359 #[test]
360 fn error_on_one_stream_does_not_terminate_event_stream() {
361 let mut streams = SelectAll::new();
362 let (sender1, receiver1) = unbounded();
363 let (sender2, receiver2) = unbounded();
364 streams.push(receiver1.boxed());
365 streams.push(receiver2.boxed());
366
367 let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
368 (Handle(0x1), BroadcastReceiveState::Empty),
369 (Handle(0x2), BroadcastReceiveState::Empty),
370 ]))));
371 let mut event_streams = EventStream::new(streams, source_tracker);
372 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
373
374 sender1.unbounded_send(Err(BtGattError::Gatt(GattError::InvalidPdu))).expect("should send");
376
377 let polled = event_streams.poll_next_unpin(&mut noop_cx);
379 assert_matches!(polled, Poll::Ready(Some(Err(Error::EventStream(_)))));
380
381 assert!(!event_streams.is_terminated());
383 assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
384
385 #[rustfmt::skip]
387 sender2
388 .unbounded_send(Ok(CharacteristicNotification {
389 handle: Handle(0x2),
390 value: vec![
391 0x02, AddressType::Public as u8, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, PaSyncState::Synced as u8,
395 EncryptionStatus::NotEncrypted.raw_value(),
396 0x00, ],
398 maybe_truncated: false,
399 }))
400 .expect("should send");
401
402 let polled = event_streams.poll_next_unpin(&mut noop_cx);
404 assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
405 assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::Synced, EncryptionStatus::NotEncrypted));
406 });
407
408 assert!(!event_streams.is_terminated());
410 }
411}