1use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
6use fuchsia_async::{DurationExt, Task, TimeoutExt};
7use fuchsia_bluetooth::types::{A2dpDirection, Channel};
8use fuchsia_sync::{Mutex, RwLock};
9use futures::stream::{FusedStream, Stream};
10use futures::{FutureExt, Sink};
11use log::warn;
12use std::pin::Pin;
13use std::sync::{Arc, Weak};
14use std::task::{Context, Poll};
15use std::{fmt, io};
16use zx::{MonotonicDuration, Status};
17
18use crate::types::{
19 EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result as AvdtpResult,
20 ServiceCapability, ServiceCategory, StreamEndpointId, StreamInformation,
21};
22use crate::{Peer, SimpleResponder};
23
24pub type StreamEndpointUpdateCallback = Box<dyn Fn(&StreamEndpoint) -> () + Sync + Send>;
25
/// The state of a [`StreamEndpoint`].
///
/// The transitions noted on each variant are the ones performed by the `StreamEndpoint`
/// methods in this file.
///
/// `Eq` is derived alongside `PartialEq`: the enum has no fields, so equality is total and
/// the state can be used anywhere a full equivalence relation is required.
#[derive(PartialEq, Eq, Debug, Default, Clone, Copy)]
pub enum StreamState {
    /// Initial state. `configure` is only accepted in this state, and an endpoint is
    /// reported as "in use" whenever it is *not* in this state.
    #[default]
    Idle,
    /// Entered after a successful `configure`.
    Configured,
    /// Entered by `establish`; the endpoint is waiting for `receive_channel`.
    Opening,
    /// Entered once a transport channel has been received, or after `suspend`.
    Open,
    /// Entered by `start`.
    Streaming,
    /// Entered by `release` while waiting for the transport to close.
    Closing,
    /// Entered transiently while an abort is in progress, before returning to `Idle`.
    Aborting,
}
39
/// A stream endpoint together with the state it tracks locally: its identity, its
/// capabilities, the current configuration, and the transport channel once established.
pub struct StreamEndpoint {
    /// Local stream endpoint id, returned by `local_id()`.
    id: StreamEndpointId,
    /// Endpoint type; `try_priority` and `try_flush_timeout` match on Source / Sink.
    endpoint_type: EndpointType,
    /// Media type reported through `information()`.
    media_type: MediaType,
    /// Current state. Held in an `Arc` because the task spawned by `release` updates it
    /// after the method has returned.
    state: Arc<Mutex<StreamState>>,
    /// Transport channel, set by `receive_channel` and cleared by `release` / `abort`.
    transport: Option<Arc<RwLock<Channel>>>,
    /// `true` while a `MediaStream` handed out by `take_transport` is still alive; the
    /// stream resets it to `false` when dropped.
    stream_held: Arc<Mutex<bool>>,
    /// Capabilities of this endpoint; `configure` only accepts capability kinds found here.
    capabilities: Vec<ServiceCapability>,
    /// Remote endpoint id supplied to `configure`, if any.
    remote_id: Option<StreamEndpointId>,
    /// Current configuration; empty when the endpoint is not configured.
    configuration: Vec<ServiceCapability>,
    /// Optional callback invoked when the endpoint is updated.
    update_callback: Option<StreamEndpointUpdateCallback>,
    /// Task started by `release` that waits for the transport to close.
    in_progress: Option<Task<()>>,
}
71
72impl fmt::Debug for StreamEndpoint {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 f.debug_struct("StreamEndpoint")
75 .field("id", &self.id.0)
76 .field("endpoint_type", &self.endpoint_type)
77 .field("media_type", &self.media_type)
78 .field("state", &self.state)
79 .field("capabilities", &self.capabilities)
80 .field("remote_id", &self.remote_id.as_ref().map(|id| id.to_string()))
81 .field("configuration", &self.configuration)
82 .finish()
83 }
84}
85
86impl StreamEndpoint {
87 pub fn new(
91 id: u8,
92 media_type: MediaType,
93 endpoint_type: EndpointType,
94 capabilities: Vec<ServiceCapability>,
95 ) -> AvdtpResult<StreamEndpoint> {
96 let seid = StreamEndpointId::try_from(id)?;
97 Ok(StreamEndpoint {
98 id: seid,
99 capabilities,
100 media_type,
101 endpoint_type,
102 state: Default::default(),
103 transport: None,
104 stream_held: Arc::new(Mutex::new(false)),
105 remote_id: None,
106 configuration: vec![],
107 update_callback: None,
108 in_progress: None,
109 })
110 }
111
112 pub fn as_new(&self) -> Self {
113 StreamEndpoint::new(
114 self.id.0,
115 self.media_type.clone(),
116 self.endpoint_type.clone(),
117 self.capabilities.clone(),
118 )
119 .expect("as_new")
120 }
121
122 fn set_state(&mut self, state: StreamState) {
124 *self.state.lock() = state;
125 self.update_callback();
126 }
127
128 pub fn set_update_callback(&mut self, callback: Option<StreamEndpointUpdateCallback>) {
131 self.update_callback = callback;
132 }
133
134 fn update_callback(&self) {
135 if let Some(cb) = self.update_callback.as_ref() {
136 cb(self);
137 }
138 }
139
140 pub fn from_info(
144 info: &StreamInformation,
145 capabilities: Vec<ServiceCapability>,
146 ) -> StreamEndpoint {
147 StreamEndpoint {
148 id: info.id().clone(),
149 capabilities,
150 media_type: info.media_type().clone(),
151 endpoint_type: info.endpoint_type().clone(),
152 state: Default::default(),
153 transport: None,
154 stream_held: Arc::new(Mutex::new(false)),
155 remote_id: None,
156 configuration: vec![],
157 update_callback: None,
158 in_progress: None,
159 }
160 }
161
162 fn state_is(&self, state: StreamState) -> Result<(), ErrorCode> {
165 (*self.state.lock() == state).then_some(()).ok_or(ErrorCode::BadState)
166 }
167
168 pub fn configure(
172 &mut self,
173 remote_id: &StreamEndpointId,
174 capabilities: Vec<ServiceCapability>,
175 ) -> Result<(), (ServiceCategory, ErrorCode)> {
176 self.state_is(StreamState::Idle).map_err(|e| (ServiceCategory::None, e))?;
177 self.remote_id = Some(remote_id.clone());
178 for cap in &capabilities {
179 if !self
180 .capabilities
181 .iter()
182 .any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y))
183 {
184 return Err((cap.category(), ErrorCode::UnsupportedConfiguration));
185 }
186 }
187 self.configuration = capabilities;
188 self.set_state(StreamState::Configured);
189 Ok(())
190 }
191
192 pub fn reconfigure(
197 &mut self,
198 mut capabilities: Vec<ServiceCapability>,
199 ) -> Result<(), (ServiceCategory, ErrorCode)> {
200 self.state_is(StreamState::Open).map_err(|e| (ServiceCategory::None, e))?;
201 if let Some(cap) = capabilities.iter().find(|x| !x.is_application()) {
203 return Err((cap.category(), ErrorCode::InvalidCapabilities));
204 }
205 let to_replace: std::vec::Vec<_> =
207 capabilities.iter().map(|x| std::mem::discriminant(x)).collect();
208 self.configuration.retain(|x| {
209 let disc = std::mem::discriminant(x);
210 !to_replace.contains(&disc)
211 });
212 self.configuration.append(&mut capabilities);
213 self.update_callback();
214 Ok(())
215 }
216
217 pub fn get_configuration(&self) -> Option<&Vec<ServiceCapability>> {
221 if self.configuration.is_empty() {
222 return None;
223 }
224 Some(&self.configuration)
225 }
226
227 const SRC_FLUSH_TIMEOUT: MonotonicDuration = MonotonicDuration::from_millis(100);
230
231 pub fn receive_channel(&mut self, c: Channel) -> AvdtpResult<bool> {
238 if self.state_is(StreamState::Opening).is_err() || self.transport.is_some() {
239 return Err(Error::InvalidState);
240 }
241 self.transport = Some(Arc::new(RwLock::new(c)));
242 self.try_flush_timeout(Self::SRC_FLUSH_TIMEOUT);
243 self.stream_held = Arc::new(Mutex::new(false));
244 self.set_state(StreamState::Open);
246 Ok(false)
247 }
248
249 pub fn establish(&mut self) -> Result<(), ErrorCode> {
252 if self.state_is(StreamState::Configured).is_err() || self.transport.is_some() {
253 return Err(ErrorCode::BadState);
254 }
255 self.set_state(StreamState::Opening);
256 Ok(())
257 }
258
259 pub fn try_priority(&self, active: bool) {
263 let priority = match (active, &self.endpoint_type) {
264 (false, _) => A2dpDirection::Normal,
265 (true, EndpointType::Source) => A2dpDirection::Source,
266 (true, EndpointType::Sink) => A2dpDirection::Sink,
267 };
268 let fut = match self.transport.as_ref().unwrap().try_read() {
269 None => return,
270 Some(channel) => channel.set_audio_priority(priority).map(|_| ()),
271 };
272 Task::spawn(fut).detach();
274 }
275
276 pub fn try_flush_timeout(&self, timeout: MonotonicDuration) {
278 if self.endpoint_type != EndpointType::Source {
279 return;
280 }
281 let fut = match self.transport.as_ref().unwrap().try_write() {
282 None => return,
283 Some(channel) => channel.set_flush_timeout(Some(timeout)).map(|_| ()),
284 };
285 Task::spawn(fut).detach();
287 }
288
289 pub fn release(&mut self, responder: SimpleResponder, peer: &Peer) -> AvdtpResult<()> {
293 {
294 let lock = self.state.lock();
295 if *lock != StreamState::Open && *lock != StreamState::Streaming {
296 return responder.reject(ErrorCode::BadState);
297 }
298 }
299 self.set_state(StreamState::Closing);
300 responder.send()?;
301 let release_wait_fut = {
302 let seid = self.remote_id.take().unwrap();
305 let transport = self.transport.take().unwrap();
306 let peer = peer.clone();
307 let state = self.state.clone();
308 async move {
309 let Some(transport) = transport.try_read() else {
310 warn!("unable to lock transport channel, dropping and assuming closed");
311 *state.lock() = StreamState::Idle;
312 return;
313 };
314 let closed_fut = transport
315 .closed()
316 .on_timeout(MonotonicDuration::from_seconds(3).after_now(), || {
317 Err(Status::TIMED_OUT)
318 });
319 if let Err(Status::TIMED_OUT) = closed_fut.await {
320 let _ = peer.abort(&seid).await;
321 *state.lock() = StreamState::Aborting;
322 drop(transport);
324 }
325 *state.lock() = StreamState::Idle;
326 }
327 };
328 self.in_progress = Some(Task::local(release_wait_fut));
329 self.configuration.clear();
332 self.update_callback();
333 Ok(())
334 }
335
336 pub fn state(&self) -> StreamState {
338 *self.state.lock()
339 }
340
341 pub fn start(&mut self) -> Result<(), ErrorCode> {
344 self.state_is(StreamState::Open)?;
345 self.try_priority(true);
346 self.set_state(StreamState::Streaming);
347 Ok(())
348 }
349
350 pub fn suspend(&mut self) -> Result<(), ErrorCode> {
353 self.state_is(StreamState::Streaming)?;
354 self.set_state(StreamState::Open);
355 self.try_priority(false);
356 Ok(())
357 }
358
359 pub async fn initiate_abort<'a>(&'a mut self, peer: &'a Peer) {
363 if let Some(seid) = self.remote_id.take() {
364 let _ = peer.abort(&seid).await;
365 self.set_state(StreamState::Aborting);
366 }
367 self.abort()
368 }
369
370 pub fn abort(&mut self) {
373 self.set_state(StreamState::Aborting);
374 self.configuration.clear();
375 self.remote_id = None;
376 self.transport = None;
377 self.set_state(StreamState::Idle);
378 }
379
380 pub fn capabilities(&self) -> &Vec<ServiceCapability> {
384 &self.capabilities
385 }
386
387 pub fn codec_type(&self) -> Option<&MediaCodecType> {
391 self.capabilities.iter().find_map(|cap| match cap {
392 ServiceCapability::MediaCodec { codec_type, .. } => Some(codec_type),
393 _ => None,
394 })
395 }
396
397 pub fn local_id(&self) -> &StreamEndpointId {
399 &self.id
400 }
401
402 pub fn remote_id(&self) -> Option<&StreamEndpointId> {
404 self.remote_id.as_ref()
405 }
406
407 pub fn endpoint_type(&self) -> &EndpointType {
409 &self.endpoint_type
410 }
411
412 pub fn information(&self) -> StreamInformation {
414 let in_use = self.state_is(StreamState::Idle).is_err();
415 StreamInformation::new(
416 self.id.clone(),
417 in_use,
418 self.media_type.clone(),
419 self.endpoint_type.clone(),
420 )
421 }
422
423 pub fn take_transport(&mut self) -> Option<MediaStream> {
427 let mut stream_held = self.stream_held.lock();
428 if *stream_held || self.transport.is_none() {
429 return None;
430 }
431
432 *stream_held = true;
433
434 Some(MediaStream::new(
435 self.stream_held.clone(),
436 Arc::downgrade(self.transport.as_ref().unwrap()),
437 ))
438 }
439
440 pub fn audio_offload(&self) -> Option<AudioOffloadExtProxy> {
442 self.transport.as_ref().and_then(|c| c.read().audio_offload())
443 }
444}
445
446pub struct MediaStream {
450 in_use: Arc<Mutex<bool>>,
451 channel: Weak<RwLock<Channel>>,
452 terminated: bool,
453}
454
455impl MediaStream {
456 pub fn new(in_use: Arc<Mutex<bool>>, channel: Weak<RwLock<Channel>>) -> Self {
457 Self { in_use, channel, terminated: false }
458 }
459
460 fn try_upgrade(&self) -> Result<Arc<RwLock<Channel>>, io::Error> {
461 self.channel
462 .upgrade()
463 .ok_or_else(|| io::Error::new(io::ErrorKind::ConnectionAborted, "lost connection"))
464 }
465
466 pub fn max_tx_size(&self) -> Result<usize, io::Error> {
467 match self.try_upgrade()?.try_read() {
468 None => return Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")),
469 Some(lock) => Ok(lock.max_tx_size()),
470 }
471 }
472}
473
474impl Drop for MediaStream {
475 fn drop(&mut self) {
476 let mut l = self.in_use.lock();
477 *l = false;
478 }
479}
480
481impl Stream for MediaStream {
482 type Item = AvdtpResult<Vec<u8>>;
483
484 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
485 let Ok(arc_chan) = self.try_upgrade() else {
486 self.terminated = true;
487 return Poll::Ready(None);
488 };
489 let Some(lock) = arc_chan.try_write() else {
490 self.terminated = true;
491 return Poll::Ready(None);
492 };
493 let mut pin_chan = Pin::new(lock);
494 match pin_chan.as_mut().poll_next(cx) {
495 Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
496 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::PeerRead(e)))),
497 Poll::Ready(None) => {
498 self.terminated = true;
499 Poll::Ready(None)
500 }
501 Poll::Pending => Poll::Pending,
502 }
503 }
504}
505
506impl FusedStream for MediaStream {
507 fn is_terminated(&self) -> bool {
508 self.terminated
509 }
510}
511
512impl Sink<Vec<u8>> for MediaStream {
513 type Error = io::Error;
514
515 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
516 let arc_chan = self.try_upgrade()?;
517 let mut lock = arc_chan
518 .try_write()
519 .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))?;
520 Pin::new(&mut *lock).poll_ready(cx).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
521 }
522
523 fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
524 let arc_chan = self.try_upgrade()?;
525 let mut lock = arc_chan
526 .try_write()
527 .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))?;
528 Pin::new(&mut *lock).start_send(item).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
529 }
530
531 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
532 let arc_chan = self.try_upgrade()?;
533 let mut lock = arc_chan
534 .try_write()
535 .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))?;
536 Pin::new(&mut *lock).poll_flush(cx).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
537 }
538
539 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
540 let arc_chan = self.try_upgrade()?;
541 let mut lock = arc_chan
542 .try_write()
543 .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))?;
544 Pin::new(&mut *lock).poll_close(cx).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
545 }
546}
547
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Request;
    use crate::tests::{expect_remote_recv, setup_peer};

    use assert_matches::assert_matches;
    use async_utils::PollExt;
    use fidl::endpoints::create_request_stream;
    use fidl_fuchsia_bluetooth as fidl_bt;
    use fidl_fuchsia_bluetooth_bredr as bredr;
    use fuchsia_async as fasync;
    use futures::SinkExt;
    use futures::stream::StreamExt;

    const REMOTE_ID_VAL: u8 = 1;
    const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL);

    /// Endpoints can be built with a valid id and start out unused; id 0 is rejected.
    #[test]
    fn make() {
        let s = StreamEndpoint::new(
            REMOTE_ID_VAL,
            MediaType::Audio,
            EndpointType::Sink,
            vec![ServiceCapability::MediaTransport],
        );
        assert!(s.is_ok());
        let s = s.unwrap();
        assert_eq!(&StreamEndpointId(1), s.local_id());

        let info = s.information();
        assert!(!info.in_use());

        let no = StreamEndpoint::new(
            0,
            MediaType::Audio,
            EndpointType::Sink,
            vec![ServiceCapability::MediaTransport],
        );
        assert!(no.is_err());
    }

    /// Moves a configured endpoint to Open by establishing it and handing it one end of a
    /// fresh channel. Returns the other end of the channel.
    fn establish_stream(s: &mut StreamEndpoint) -> Channel {
        assert_matches!(s.establish(), Ok(()));
        let (chan, remote) = Channel::create();
        assert_matches!(s.receive_channel(chan), Ok(false));
        remote
    }

    /// `from_info` copies the id from the information and starts out unused.
    #[test]
    fn from_info() {
        let seid = StreamEndpointId::try_from(5).unwrap();
        let info =
            StreamInformation::new(seid.clone(), false, MediaType::Audio, EndpointType::Sink);
        let capabilities = vec![ServiceCapability::MediaTransport];

        let endpoint = StreamEndpoint::from_info(&info, capabilities);

        assert_eq!(&seid, endpoint.local_id());
        assert_eq!(&false, endpoint.information().in_use());
        assert_eq!(1, endpoint.capabilities().len());
    }

    /// `codec_type` reports the codec of a MediaCodec capability, or `None` without one.
    #[test]
    fn codec_type() {
        let s = StreamEndpoint::new(
            REMOTE_ID_VAL,
            MediaType::Audio,
            EndpointType::Sink,
            vec![
                ServiceCapability::MediaTransport,
                ServiceCapability::MediaCodec {
                    media_type: MediaType::Audio,
                    codec_type: MediaCodecType::new(0x40),
                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF],
                },
            ],
        )
        .unwrap();

        assert_eq!(Some(&MediaCodecType::new(0x40)), s.codec_type());

        let s = StreamEndpoint::new(
            REMOTE_ID_VAL,
            MediaType::Audio,
            EndpointType::Sink,
            vec![ServiceCapability::MediaTransport],
        )
        .unwrap();

        assert_eq!(None, s.codec_type());
    }

    /// Builds an audio endpoint of the given type with MediaTransport and MediaCodec
    /// capabilities.
    fn test_endpoint(r#type: EndpointType) -> StreamEndpoint {
        StreamEndpoint::new(
            REMOTE_ID_VAL,
            MediaType::Audio,
            r#type,
            vec![
                ServiceCapability::MediaTransport,
                ServiceCapability::MediaCodec {
                    media_type: MediaType::Audio,
                    codec_type: MediaCodecType::new(0x40),
                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF],
                },
            ],
        )
        .unwrap()
    }

    /// Walks an endpoint through configure / reconfigure and checks which states accept
    /// each operation.
    #[test]
    fn stream_configure_reconfigure() {
        let _exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        // A capability kind the endpoint does not have is rejected.
        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting]),
            Err((ServiceCategory::Reporting, ErrorCode::UnsupportedConfiguration))
        );

        assert_matches!(
            s.configure(
                &REMOTE_ID,
                vec![
                    ServiceCapability::MediaTransport,
                    ServiceCapability::MediaCodec {
                        media_type: MediaType::Audio,
                        codec_type: MediaCodecType::new(0x40),
                        codec_extra: vec![0x0C, 0x0D, 0x02, 0x51],
                    }
                ]
            ),
            Ok(())
        );

        // Keep the remote end alive for the rest of the test.
        let _channel = establish_stream(&mut s);

        // Configure is only accepted while Idle.
        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
            Err((_, ErrorCode::BadState))
        );

        let reconfiguration = vec![ServiceCapability::MediaCodec {
            media_type: MediaType::Audio,
            codec_type: MediaCodecType::new(0x40),
            codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
        }];

        // The codec capability is replaced; MediaTransport is kept.
        let new_configuration = vec![ServiceCapability::MediaTransport, reconfiguration[0].clone()];

        // Reconfiguring while Open is fine.
        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));

        assert_eq!(Some(&new_configuration), s.get_configuration());

        // MediaTransport cannot be reconfigured.
        assert_matches!(
            s.reconfigure(vec![ServiceCapability::MediaTransport]),
            Err((ServiceCategory::MediaTransport, ErrorCode::InvalidCapabilities))
        );

        // Neither configure nor reconfigure is accepted while Streaming.
        assert_matches!(s.start(), Ok(()));

        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
            Err((_, ErrorCode::BadState))
        );

        assert_matches!(s.reconfigure(reconfiguration.clone()), Err((_, ErrorCode::BadState)));

        assert_matches!(s.suspend(), Ok(()));

        // Back in Open, reconfigure works again but configure still does not.
        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));

        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
            Err((_, ErrorCode::BadState))
        );
    }

    /// Establishing and receiving a channel only work in the right states.
    #[test]
    fn stream_establishment() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        let (mut remote, transport) = Channel::create();

        // Cannot establish before the endpoint is configured.
        assert_matches!(s.establish(), Err(ErrorCode::BadState));

        // A channel offered while not Opening is refused.
        assert_matches!(s.receive_channel(transport), Err(Error::InvalidState));

        // The refused channel was dropped, so the remote end sees it as closed.
        let mut read_fut = remote.next();
        let res = exec.run_until_stalled(&mut read_fut).expect("should be ready");
        assert_matches!(res, None);

        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

        assert_matches!(s.establish(), Ok(()));

        // Once Opening, the channel is accepted.
        let (_remote, transport) = Channel::create();
        assert_matches!(s.receive_channel(transport), Ok(false));
    }

    /// Sets up a peer and feeds it a request over the signaling channel that it surfaces as
    /// `Request::Close`. Returns the peer, the remote signaling channel, and the responder
    /// for that request.
    fn setup_peer_for_release(exec: &mut fasync::TestExecutor) -> (Peer, Channel, SimpleResponder) {
        let (peer, mut signaling) = setup_peer();
        // Raw bytes of the request; the assertion below confirms it parses as a Close.
        exec.run_until_stalled(&mut signaling.send(vec![0x40, 0x08, 0x04]))
            .expect("signaling write")
            .expect("write successful");
        let mut req_stream = peer.take_request_stream();
        let mut req_fut = req_stream.next();
        let complete = exec.run_until_stalled(&mut req_fut);
        let responder = match complete {
            Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder,
            _ => panic!("Expected a close request"),
        };
        (peer, signaling, responder)
    }

    /// Releasing a stream whose transport is closed by the remote ends in Idle.
    #[test]
    fn stream_release_without_abort() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

        let remote_transport = establish_stream(&mut s);

        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);

        s.release(responder, &peer).unwrap();
        // The response to the close request is sent right away.
        expect_remote_recv(&[0x42, 0x08], &mut signaling);

        // Closing the remote end lets the release task finish.
        drop(remote_transport);

        // Give the release task a chance to run.
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        assert_eq!(s.state(), StreamState::Idle);
    }

    /// Exercises handing out, using and invalidating a `MediaStream`.
    #[test]
    fn test_mediastream() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

        // Before there is a transport, no stream can be taken.
        assert!(s.take_transport().is_none());

        let mut remote_transport = establish_stream(&mut s);

        // Now a stream is available.
        let temp_stream = s.take_transport();
        assert!(temp_stream.is_some());

        // Only one stream can be out at a time.
        assert!(s.take_transport().is_none());

        // Dropping the stream makes it available again.
        drop(temp_stream);

        let media_stream = s.take_transport();
        assert!(media_stream.is_some());
        let mut media_stream = media_stream.unwrap();

        assert_matches!(media_stream.max_tx_size(), Ok(Channel::DEFAULT_MAX_TX));

        // Data written to the stream arrives at the remote end.
        let hearts = vec![0xF0, 0x9F, 0x92, 0x96, 0xF0, 0x9F, 0x92, 0x96];
        let mut write_fut = media_stream.send(hearts.clone());

        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Ok(())));

        expect_remote_recv(&hearts, &mut remote_transport);

        let mut close_fut = media_stream.close();
        assert_matches!(exec.run_until_stalled(&mut close_fut), Poll::Ready(Ok(())));
        // Dropping the endpoint drops the channel that the stream only references weakly.
        drop(s);

        // The remote end observes the channel as closed.
        let mut read_fut = remote_transport.next();
        let res = exec.run_until_stalled(&mut read_fut).expect("should be ready");
        assert_matches!(res, None);

        // Writing now fails.
        let mut write_fut = media_stream.send(vec![0xDE, 0xAD]);
        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Err(_)));

        // Reading ends the stream.
        let mut next_fut = media_stream.next();
        assert_matches!(exec.run_until_stalled(&mut next_fut), Poll::Ready(None));

        assert!(media_stream.is_terminated(), "should be terminated");

        // The maximum size can no longer be queried.
        assert_matches!(media_stream.max_tx_size(), Err(_));
    }

    /// When the remote never closes the transport, release times out, sends an abort and
    /// still ends in Idle.
    #[test]
    fn stream_release_with_abort() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
        let remote_transport = establish_stream(&mut s);
        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);

        s.release(responder, &peer).unwrap();
        // The response to the close request is sent right away.
        expect_remote_recv(&[0x42, 0x08], &mut signaling);

        // The next signaling message is the one sent after the release timeout.
        let next = std::pin::pin!(signaling.next());
        let received =
            exec.run_singlethreaded(next).expect("channel not closed").expect("successful read");
        assert_eq!(0x0A, received[1]);
        // Answer it, reusing the transaction label from the received header byte.
        let txlabel = received[0] & 0xF0;
        exec.run_until_stalled(&mut signaling.send(vec![txlabel | 0x02, 0x0A]))
            .expect("signaling write")
            .expect("write successful");

        let _ = exec.run_singlethreaded(&mut std::pin::pin!(remote_transport.closed()));

        // Let the release task run until it reaches Idle.
        while s.state() != StreamState::Idle {
            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        }
    }

    /// Start and suspend are only accepted in the right states and set the audio priority.
    #[test]
    fn start_and_suspend() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        // Can't start or suspend while Idle.
        assert_matches!(s.start(), Err(ErrorCode::BadState));
        assert_matches!(s.suspend(), Err(ErrorCode::BadState));

        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

        // Nor while Configured.
        assert_matches!(s.start(), Err(ErrorCode::BadState));
        assert_matches!(s.suspend(), Err(ErrorCode::BadState));

        assert_matches!(s.establish(), Ok(()));

        // Nor while Opening.
        assert_matches!(s.start(), Err(ErrorCode::BadState));
        assert_matches!(s.suspend(), Err(ErrorCode::BadState));

        // Build a transport with the audio direction extension so priority requests can be
        // observed.
        let (remote, local) = zx::Socket::create_datagram();
        let (client_end, mut direction_request_stream) =
            create_request_stream::<bredr::AudioDirectionExtMarker>();
        let ext = bredr::Channel {
            socket: Some(local),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ext_direction: Some(client_end),
            ..Default::default()
        };
        let transport = Channel::try_from(ext).unwrap();
        assert_matches!(s.receive_channel(transport), Ok(false));

        // Open: suspend is refused, start is accepted.
        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
        assert_matches!(s.start(), Ok(()));

        // Starting a sink requests Sink priority.
        match exec.run_until_stalled(&mut direction_request_stream.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
                responder.send(Ok(())).expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };

        // Streaming: start is refused, suspend is accepted.
        assert_matches!(s.start(), Err(ErrorCode::BadState));
        assert_matches!(s.suspend(), Ok(()));

        // Suspending requests Normal priority.
        match exec.run_until_stalled(&mut direction_request_stream.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
                responder.send(Ok(())).expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };

        // Start and suspend can be repeated.
        assert_matches!(s.start(), Ok(()));
        assert_matches!(s.suspend(), Ok(()));

        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);

        {
            s.release(responder, &peer).unwrap();
            // The response to the close request is sent right away.
            expect_remote_recv(&[0x42, 0x08], &mut signaling);
            // Close the transport from the remote side.
            drop(remote);
            while s.state() != StreamState::Idle {
                let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
            }
        }

        // After release, neither start nor suspend is accepted.
        assert_matches!(s.start(), Err(ErrorCode::BadState));
        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
    }

    /// Configures and establishes `s`, then hands it a transport that carries the L2CAP
    /// parameters extension. Returns the remote socket and the extension's request stream.
    fn receive_l2cap_params_channel(
        s: &mut StreamEndpoint,
    ) -> (zx::Socket, bredr::L2capParametersExtRequestStream) {
        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
        assert_matches!(s.establish(), Ok(()));

        let (remote, local) = zx::Socket::create_datagram();
        let (client_end, l2cap_params_requests) =
            create_request_stream::<bredr::L2capParametersExtMarker>();
        let ext = bredr::Channel {
            socket: Some(local),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ext_l2cap: Some(client_end),
            ..Default::default()
        };
        let transport = Channel::try_from(ext).unwrap();
        assert_matches!(s.receive_channel(transport), Ok(false));
        (remote, l2cap_params_requests)
    }

    /// Source endpoints request the flush timeout when they receive their transport.
    #[test]
    fn sets_flush_timeout_for_source_transports() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Source);
        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);

        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
            Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
                request,
                responder,
            }))) => {
                assert_eq!(
                    Some(StreamEndpoint::SRC_FLUSH_TIMEOUT.into_nanos()),
                    request.flush_timeout
                );
                responder.send(&request).expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };
    }

    /// Sink endpoints do not request a flush timeout.
    #[test]
    fn no_flush_timeout_for_sink_transports() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);
        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);

        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
            Poll::Pending => {}
            x => panic!("Expected no request to set flush timeout, got {:?}", x),
        };
    }

    /// `get_configuration` reflects configure and abort.
    #[test]
    fn get_configuration() {
        let mut s = test_endpoint(EndpointType::Sink);

        // Nothing is configured yet.
        assert!(s.get_configuration().is_none());

        let config = vec![
            ServiceCapability::MediaTransport,
            ServiceCapability::MediaCodec {
                media_type: MediaType::Audio,
                codec_type: MediaCodecType::new(0),
                codec_extra: vec![0x60, 0x0D, 0x02, 0x55],
            },
        ];

        assert_matches!(s.configure(&REMOTE_ID, config.clone()), Ok(()));

        match s.get_configuration() {
            Some(c) => assert_eq!(&config, c),
            x => panic!("Expected Ok from get_configuration but got {:?}", x),
        };

        // Abort clears the configuration.
        s.abort();

        assert!(s.get_configuration().is_none());
    }

    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Builds an update callback that counts its invocations, together with a handle for
    /// reading (and resetting) that count.
    fn call_count_callback() -> (Option<StreamEndpointUpdateCallback>, Arc<AtomicUsize>) {
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count_reader = call_count.clone();
        let count_cb: StreamEndpointUpdateCallback = Box::new(move |_stream: &StreamEndpoint| {
            let _ = call_count.fetch_add(1, Ordering::SeqCst);
        });
        (Some(count_cb), call_count_reader)
    }

    /// The update callback fires for every operation that changes the endpoint.
    #[test]
    fn update_callback() {
        let _exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);
        let (cb, call_count) = call_count_callback();
        s.set_update_callback(cb);

        s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
            .expect("Configure to succeed in test");
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        s.establish().expect("Establish to succeed in test");
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        let (_, transport) = Channel::create();
        assert_eq!(
            s.receive_channel(transport).expect("Receive channel to succeed in test"),
            false
        );
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        s.start().expect("Start to succeed in test");
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        s.suspend().expect("Suspend to succeed in test");
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        s.reconfigure(vec![]).expect("Reconfigure to succeed in test");
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        // Abort changes the state, so it must notify as well. The counter was reset just
        // above, so this checks abort specifically.
        s.abort();
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    }
}