1use std::collections::VecDeque;
6use std::sync::Arc;
7use std::task::Poll;
8
9use futures::stream::{BoxStream, FusedStream, SelectAll};
10use futures::{Stream, StreamExt};
11use parking_lot::Mutex;
12
13use bt_bap::types::BroadcastId;
14use bt_common::packet_encoding::Decodable;
15use bt_gatt::client::CharacteristicNotification;
16use bt_gatt::types::Error as BtGattError;
17
18use crate::client::KnownBroadcastSources;
19use crate::client::error::Error;
20use crate::client::error::ServiceError;
21use crate::types::*;
22
23#[derive(Clone, Debug, PartialEq)]
24pub enum Event {
25 SyncInfoRequested(BroadcastId),
27 NotSyncedToPa(BroadcastId),
29 SyncedToPa(BroadcastId),
31 SyncedFailedNoPast(BroadcastId),
33 BroadcastCodeRequired(BroadcastId),
35 InvalidBroadcastCode(BroadcastId, [u8; 16]),
37 Decrypting(BroadcastId),
40 UnknownPacket,
42 RemovedBroadcastSource(BroadcastId),
44 AddedBroadcastSource(BroadcastId, PaSyncState, EncryptionStatus),
46}
47
48impl Event {
49 pub(crate) fn from_broadcast_receive_state(state: &ReceiveState) -> Vec<Event> {
50 let mut events = Vec::new();
51 let pa_sync_state = state.pa_sync_state();
52 let broadcast_id = state.broadcast_id();
53 match pa_sync_state {
54 PaSyncState::SyncInfoRequest => events.push(Event::SyncInfoRequested(broadcast_id)),
55 PaSyncState::Synced => events.push(Event::SyncedToPa(broadcast_id)),
56 PaSyncState::FailedToSync | PaSyncState::NotSynced => {
57 events.push(Event::NotSyncedToPa(broadcast_id))
58 }
59 PaSyncState::NoPast => events.push(Event::SyncedFailedNoPast(broadcast_id)),
60 }
61 match state.big_encryption() {
62 EncryptionStatus::BroadcastCodeRequired => {
63 events.push(Event::BroadcastCodeRequired(broadcast_id))
64 }
65 EncryptionStatus::Decrypting => events.push(Event::Decrypting(broadcast_id)),
66 EncryptionStatus::BadCode(code) => {
67 events.push(Event::InvalidBroadcastCode(broadcast_id, code.clone()))
68 }
69 _ => {}
70 };
71 events
72 }
73}
74
75pub struct EventStream {
79 notification_streams:
81 SelectAll<BoxStream<'static, Result<CharacteristicNotification, BtGattError>>>,
82
83 event_queue: VecDeque<Result<Event, Error>>,
84 terminated: bool,
85
86 broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
88}
89
90impl EventStream {
91 pub(crate) fn new(
92 notification_streams: SelectAll<
93 BoxStream<'static, Result<CharacteristicNotification, BtGattError>>,
94 >,
95 broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
96 ) -> Self {
97 Self {
98 notification_streams,
99 event_queue: VecDeque::new(),
100 terminated: false,
101 broadcast_sources,
102 }
103 }
104}
105
106impl FusedStream for EventStream {
107 fn is_terminated(&self) -> bool {
108 self.terminated
109 }
110}
111
112impl Stream for EventStream {
113 type Item = Result<Event, Error>;
114
115 fn poll_next(
116 mut self: std::pin::Pin<&mut Self>,
117 cx: &mut std::task::Context<'_>,
118 ) -> Poll<Option<Self::Item>> {
119 if self.terminated {
120 return Poll::Ready(None);
121 }
122
123 loop {
124 if let Some(item) = self.event_queue.pop_front() {
125 match item {
126 Ok(event) => return Poll::Ready(Some(Ok(event))),
127 Err(e) => {
128 self.terminated = true;
131 return Poll::Ready(Some(Err(e)));
132 }
133 }
134 }
135
136 match self.notification_streams.poll_next_unpin(cx) {
137 Poll::Pending => return Poll::Pending,
138 Poll::Ready(None) => {
139 let err = Error::EventStream(Box::new(Error::Service(
140 ServiceError::NotificationChannelClosed(format!(
141 "GATT notification stream for BRS characteristics closed"
142 )),
143 )));
144 self.event_queue.push_back(Err(err));
145 }
146 Poll::Ready(Some(Err(error))) => {
147 let err = Error::EventStream(Box::new(Error::Gatt(error)));
149 self.event_queue.push_back(Err(err));
150 }
151 Poll::Ready(Some(Ok(notification))) => {
152 let char_handle = notification.handle;
153 let (Ok(new_state), _) =
154 BroadcastReceiveState::decode(notification.value.as_slice())
155 else {
156 self.event_queue.push_back(Ok(Event::UnknownPacket));
157 continue;
158 };
159
160 let maybe_prev_state = {
161 let mut lock = self.broadcast_sources.lock();
162 lock.update_state(char_handle, new_state.clone())
163 };
164
165 let mut multi_events = VecDeque::new();
166
167 if let Some(ref prev_state) = maybe_prev_state {
169 if let BroadcastReceiveState::NonEmpty(prev_receive_state) = prev_state {
170 if new_state.is_empty() || !new_state.has_same_broadcast_id(&prev_state)
171 {
172 multi_events.push_back(Ok(Event::RemovedBroadcastSource(
173 prev_receive_state.broadcast_id,
174 )));
175 }
176 }
177 }
178
179 if let BroadcastReceiveState::NonEmpty(receive_state) = &new_state {
182 let is_new_source = match maybe_prev_state {
183 Some(prev_state) => !new_state.has_same_broadcast_id(&prev_state),
184 None => true,
185 };
186 if is_new_source {
187 multi_events.push_back(Ok(Event::AddedBroadcastSource(
188 receive_state.broadcast_id,
189 receive_state.pa_sync_state,
190 receive_state.big_encryption,
191 )));
192 } else {
193 let other_events = Event::from_broadcast_receive_state(&receive_state);
194 for e in other_events.into_iter() {
195 multi_events.push_back(Ok(e));
196 }
197 }
198 }
199 if multi_events.len() != 0 {
200 self.event_queue.append(&mut multi_events);
201 continue;
202 }
203 continue;
204 }
205 };
206
207 break;
208 }
209 Poll::Pending
210 }
211}
212
213#[cfg(test)]
214mod tests {
215 use super::*;
216
217 use std::collections::HashMap;
218
219 use assert_matches::assert_matches;
220 use futures::channel::mpsc::unbounded;
221
222 use bt_common::core::AddressType;
223 use bt_gatt::types::Handle;
224
225 #[test]
226 fn poll_event_stream() {
227 let mut streams = SelectAll::new();
228 let (sender1, receiver1) = unbounded();
229 let (sender2, receiver2) = unbounded();
230 streams.push(receiver1.boxed());
231 streams.push(receiver2.boxed());
232
233 let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
234 (Handle(0x1), BroadcastReceiveState::Empty),
235 (Handle(0x2), BroadcastReceiveState::Empty),
236 ]))));
237 let mut event_streams = EventStream::new(streams, source_tracker);
238
239 let bad_code_status =
241 EncryptionStatus::BadCode([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
242 #[rustfmt::skip]
243 sender1
244 .unbounded_send(Ok(CharacteristicNotification {
245 handle: Handle(0x1),
246 value: vec![
247 0x01, AddressType::Public as u8, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x01, 0x01, 0x02, 0x03, PaSyncState::FailedToSync as u8,
251 bad_code_status.raw_value(),
252 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16, 0x00, ],
255 maybe_truncated: false,
256 }))
257 .expect("should send");
258
259 #[rustfmt::skip]
260 sender2
261 .unbounded_send(Ok(CharacteristicNotification {
262 handle: Handle(0x2),
263 value: vec![
264 0x02, AddressType::Public as u8, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, PaSyncState::NoPast as u8,
268 EncryptionStatus::NotEncrypted.raw_value(),
269 0x00, ],
271 maybe_truncated: false,
272 }))
273 .expect("should send");
274
275 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
277 let polled = event_streams.poll_next_unpin(&mut noop_cx);
278 assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
279 assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x030201).unwrap(), PaSyncState::FailedToSync, EncryptionStatus::BadCode([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])));
280 });
281
282 let polled = event_streams.poll_next_unpin(&mut noop_cx);
283 assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
284 assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::NoPast, EncryptionStatus::NotEncrypted));
285 });
286
287 assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
289
290 #[rustfmt::skip]
292 sender2
293 .unbounded_send(Ok(CharacteristicNotification {
294 handle: Handle(0x2),
295 value: vec![
296 0x02, AddressType::Public as u8, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, PaSyncState::Synced as u8,
300 EncryptionStatus::NotEncrypted.raw_value(),
301 0x00, ],
303 maybe_truncated: false,
304 }))
305 .expect("should send");
306
307 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
309 assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::SyncedToPa(BroadcastId::try_from(0x040302).unwrap())) });
310
311 assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
313 }
314
315 #[test]
316 fn broadcast_source_is_removed() {
317 let mut streams = SelectAll::new();
318 let (_sender1, receiver1) = unbounded();
319 let (sender2, receiver2) = unbounded();
320 streams.push(receiver1.boxed());
321 streams.push(receiver2.boxed());
322
323 let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
324 (Handle(0x1), BroadcastReceiveState::Empty),
325 (Handle(0x2), BroadcastReceiveState::Empty),
326 ]))));
327 let mut event_streams = EventStream::new(streams, source_tracker);
328
329 #[rustfmt::skip]
331 sender2
332 .unbounded_send(Ok(CharacteristicNotification {
333 handle: Handle(0x2),
334 value: vec![
335 0x02, AddressType::Public as u8, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, PaSyncState::Synced as u8,
339 EncryptionStatus::NotEncrypted.raw_value(),
340 0x00, ],
342 maybe_truncated: false,
343 }))
344 .expect("should send");
345
346 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
348
349 let polled = event_streams.poll_next_unpin(&mut noop_cx);
350 assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
351 assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::Synced, EncryptionStatus::NotEncrypted));
352 });
353
354 assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
356
357 sender2
360 .unbounded_send(Ok(CharacteristicNotification {
361 handle: Handle(0x2),
362 value: vec![],
363 maybe_truncated: false,
364 }))
365 .expect("should send");
366
367 let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
369 assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::RemovedBroadcastSource(BroadcastId::try_from(0x040302).unwrap())) });
370
371 assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
373 }
374}