1use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
6use fuchsia_async::{DurationExt, Task, TimeoutExt};
7use fuchsia_bluetooth::types::{A2dpDirection, Channel};
8use fuchsia_sync::Mutex;
9use futures::stream::{FusedStream, Stream};
10use futures::{FutureExt, io};
11use log::warn;
12use std::fmt;
13use std::pin::Pin;
14use std::sync::{Arc, RwLock, Weak};
15use std::task::{Context, Poll};
16use zx::{MonotonicDuration, Status};
17
18use crate::types::{
19 EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result as AvdtpResult,
20 ServiceCapability, ServiceCategory, StreamEndpointId, StreamInformation,
21};
22use crate::{Peer, SimpleResponder};
23
24pub type StreamEndpointUpdateCallback = Box<dyn Fn(&StreamEndpoint) -> () + Sync + Send>;
25
26#[derive(PartialEq, Debug, Default, Clone, Copy)]
28pub enum StreamState {
29 #[default]
30 Idle,
31 Configured,
32 Opening,
34 Open,
35 Streaming,
36 Closing,
37 Aborting,
38}
39
40pub struct StreamEndpoint {
45 id: StreamEndpointId,
47 endpoint_type: EndpointType,
49 media_type: MediaType,
51 state: Arc<Mutex<StreamState>>,
53 transport: Option<Arc<RwLock<Channel>>>,
56 stream_held: Arc<Mutex<bool>>,
59 capabilities: Vec<ServiceCapability>,
61 remote_id: Option<StreamEndpointId>,
63 configuration: Vec<ServiceCapability>,
65 update_callback: Option<StreamEndpointUpdateCallback>,
67 in_progress: Option<Task<()>>,
70}
71
72impl fmt::Debug for StreamEndpoint {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 f.debug_struct("StreamEndpoint")
75 .field("id", &self.id.0)
76 .field("endpoint_type", &self.endpoint_type)
77 .field("media_type", &self.media_type)
78 .field("state", &self.state)
79 .field("capabilities", &self.capabilities)
80 .field("remote_id", &self.remote_id.as_ref().map(|id| id.to_string()))
81 .field("configuration", &self.configuration)
82 .finish()
83 }
84}
85
86impl StreamEndpoint {
87 pub fn new(
91 id: u8,
92 media_type: MediaType,
93 endpoint_type: EndpointType,
94 capabilities: Vec<ServiceCapability>,
95 ) -> AvdtpResult<StreamEndpoint> {
96 let seid = StreamEndpointId::try_from(id)?;
97 Ok(StreamEndpoint {
98 id: seid,
99 capabilities,
100 media_type,
101 endpoint_type,
102 state: Default::default(),
103 transport: None,
104 stream_held: Arc::new(Mutex::new(false)),
105 remote_id: None,
106 configuration: vec![],
107 update_callback: None,
108 in_progress: None,
109 })
110 }
111
112 pub fn as_new(&self) -> Self {
113 StreamEndpoint::new(
114 self.id.0,
115 self.media_type.clone(),
116 self.endpoint_type.clone(),
117 self.capabilities.clone(),
118 )
119 .expect("as_new")
120 }
121
122 fn set_state(&mut self, state: StreamState) {
124 *self.state.lock() = state;
125 self.update_callback();
126 }
127
128 pub fn set_update_callback(&mut self, callback: Option<StreamEndpointUpdateCallback>) {
131 self.update_callback = callback;
132 }
133
134 fn update_callback(&self) {
135 if let Some(cb) = self.update_callback.as_ref() {
136 cb(self);
137 }
138 }
139
140 pub fn from_info(
144 info: &StreamInformation,
145 capabilities: Vec<ServiceCapability>,
146 ) -> StreamEndpoint {
147 StreamEndpoint {
148 id: info.id().clone(),
149 capabilities,
150 media_type: info.media_type().clone(),
151 endpoint_type: info.endpoint_type().clone(),
152 state: Default::default(),
153 transport: None,
154 stream_held: Arc::new(Mutex::new(false)),
155 remote_id: None,
156 configuration: vec![],
157 update_callback: None,
158 in_progress: None,
159 }
160 }
161
162 fn state_is(&self, state: StreamState) -> Result<(), ErrorCode> {
165 (*self.state.lock() == state).then_some(()).ok_or(ErrorCode::BadState)
166 }
167
168 pub fn configure(
172 &mut self,
173 remote_id: &StreamEndpointId,
174 capabilities: Vec<ServiceCapability>,
175 ) -> Result<(), (ServiceCategory, ErrorCode)> {
176 self.state_is(StreamState::Idle).map_err(|e| (ServiceCategory::None, e))?;
177 self.remote_id = Some(remote_id.clone());
178 for cap in &capabilities {
179 if !self
180 .capabilities
181 .iter()
182 .any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y))
183 {
184 return Err((cap.category(), ErrorCode::UnsupportedConfiguration));
185 }
186 }
187 self.configuration = capabilities;
188 self.set_state(StreamState::Configured);
189 Ok(())
190 }
191
192 pub fn reconfigure(
197 &mut self,
198 mut capabilities: Vec<ServiceCapability>,
199 ) -> Result<(), (ServiceCategory, ErrorCode)> {
200 self.state_is(StreamState::Open).map_err(|e| (ServiceCategory::None, e))?;
201 if let Some(cap) = capabilities.iter().find(|x| !x.is_application()) {
203 return Err((cap.category(), ErrorCode::InvalidCapabilities));
204 }
205 let to_replace: std::vec::Vec<_> =
207 capabilities.iter().map(|x| std::mem::discriminant(x)).collect();
208 self.configuration.retain(|x| {
209 let disc = std::mem::discriminant(x);
210 !to_replace.contains(&disc)
211 });
212 self.configuration.append(&mut capabilities);
213 self.update_callback();
214 Ok(())
215 }
216
217 pub fn get_configuration(&self) -> Option<&Vec<ServiceCapability>> {
221 if self.configuration.is_empty() {
222 return None;
223 }
224 Some(&self.configuration)
225 }
226
227 const SRC_FLUSH_TIMEOUT: MonotonicDuration = MonotonicDuration::from_millis(100);
230
231 pub fn receive_channel(&mut self, c: Channel) -> AvdtpResult<bool> {
238 if self.state_is(StreamState::Opening).is_err() || self.transport.is_some() {
239 return Err(Error::InvalidState);
240 }
241 self.transport = Some(Arc::new(RwLock::new(c)));
242 self.try_flush_timeout(Self::SRC_FLUSH_TIMEOUT);
243 self.stream_held = Arc::new(Mutex::new(false));
244 self.set_state(StreamState::Open);
246 Ok(false)
247 }
248
249 pub fn establish(&mut self) -> Result<(), ErrorCode> {
252 if self.state_is(StreamState::Configured).is_err() || self.transport.is_some() {
253 return Err(ErrorCode::BadState);
254 }
255 self.set_state(StreamState::Opening);
256 Ok(())
257 }
258
259 pub fn try_priority(&self, active: bool) {
263 let priority = match (active, &self.endpoint_type) {
264 (false, _) => A2dpDirection::Normal,
265 (true, EndpointType::Source) => A2dpDirection::Source,
266 (true, EndpointType::Sink) => A2dpDirection::Sink,
267 };
268 let fut = match self.transport.as_ref().unwrap().try_read() {
269 Err(_) => return,
270 Ok(channel) => channel.set_audio_priority(priority).map(|_| ()),
271 };
272 Task::spawn(fut).detach();
274 }
275
276 pub fn try_flush_timeout(&self, timeout: MonotonicDuration) {
278 if self.endpoint_type != EndpointType::Source {
279 return;
280 }
281 let fut = match self.transport.as_ref().unwrap().try_write() {
282 Err(_) => return,
283 Ok(channel) => channel.set_flush_timeout(Some(timeout)).map(|_| ()),
284 };
285 Task::spawn(fut).detach();
287 }
288
289 pub fn release(&mut self, responder: SimpleResponder, peer: &Peer) -> AvdtpResult<()> {
293 {
294 let lock = self.state.lock();
295 if *lock != StreamState::Open && *lock != StreamState::Streaming {
296 return responder.reject(ErrorCode::BadState);
297 }
298 }
299 self.set_state(StreamState::Closing);
300 responder.send()?;
301 let release_wait_fut = {
302 let seid = self.remote_id.take().unwrap();
305 let transport = self.transport.take().unwrap();
306 let peer = peer.clone();
307 let state = self.state.clone();
308 async move {
309 let Ok(transport) = transport.try_read() else {
310 warn!("unable to lock transport channel, dropping and assuming closed");
311 *state.lock() = StreamState::Idle;
312 return;
313 };
314 let closed_fut = transport
315 .closed()
316 .on_timeout(MonotonicDuration::from_seconds(3).after_now(), || {
317 Err(Status::TIMED_OUT)
318 });
319 if let Err(Status::TIMED_OUT) = closed_fut.await {
320 let _ = peer.abort(&seid).await;
321 *state.lock() = StreamState::Aborting;
322 drop(transport);
324 }
325 *state.lock() = StreamState::Idle;
326 }
327 };
328 self.in_progress = Some(Task::local(release_wait_fut));
329 self.configuration.clear();
332 self.update_callback();
333 Ok(())
334 }
335
336 pub fn state(&self) -> StreamState {
338 *self.state.lock()
339 }
340
341 pub fn start(&mut self) -> Result<(), ErrorCode> {
344 self.state_is(StreamState::Open)?;
345 self.try_priority(true);
346 self.set_state(StreamState::Streaming);
347 Ok(())
348 }
349
350 pub fn suspend(&mut self) -> Result<(), ErrorCode> {
353 self.state_is(StreamState::Streaming)?;
354 self.set_state(StreamState::Open);
355 self.try_priority(false);
356 Ok(())
357 }
358
359 pub async fn initiate_abort<'a>(&'a mut self, peer: &'a Peer) {
363 if let Some(seid) = self.remote_id.take() {
364 let _ = peer.abort(&seid).await;
365 self.set_state(StreamState::Aborting);
366 }
367 self.abort()
368 }
369
370 pub fn abort(&mut self) {
373 self.set_state(StreamState::Aborting);
374 self.configuration.clear();
375 self.remote_id = None;
376 self.transport = None;
377 self.set_state(StreamState::Idle);
378 }
379
380 pub fn capabilities(&self) -> &Vec<ServiceCapability> {
384 &self.capabilities
385 }
386
387 pub fn codec_type(&self) -> Option<&MediaCodecType> {
391 self.capabilities.iter().find_map(|cap| match cap {
392 ServiceCapability::MediaCodec { codec_type, .. } => Some(codec_type),
393 _ => None,
394 })
395 }
396
397 pub fn local_id(&self) -> &StreamEndpointId {
399 &self.id
400 }
401
402 pub fn remote_id(&self) -> Option<&StreamEndpointId> {
404 self.remote_id.as_ref()
405 }
406
407 pub fn endpoint_type(&self) -> &EndpointType {
409 &self.endpoint_type
410 }
411
412 pub fn information(&self) -> StreamInformation {
414 let in_use = self.state_is(StreamState::Idle).is_err();
415 StreamInformation::new(
416 self.id.clone(),
417 in_use,
418 self.media_type.clone(),
419 self.endpoint_type.clone(),
420 )
421 }
422
423 pub fn take_transport(&mut self) -> Option<MediaStream> {
427 let mut stream_held = self.stream_held.lock();
428 if *stream_held || self.transport.is_none() {
429 return None;
430 }
431
432 *stream_held = true;
433
434 Some(MediaStream::new(
435 self.stream_held.clone(),
436 Arc::downgrade(self.transport.as_ref().unwrap()),
437 ))
438 }
439
440 pub fn audio_offload(&self) -> Option<AudioOffloadExtProxy> {
442 self.transport.as_ref().and_then(|c| c.read().unwrap().audio_offload())
443 }
444}
445
446pub struct MediaStream {
450 in_use: Arc<Mutex<bool>>,
451 channel: Weak<RwLock<Channel>>,
452 terminated: bool,
453}
454
455impl MediaStream {
456 pub fn new(in_use: Arc<Mutex<bool>>, channel: Weak<RwLock<Channel>>) -> Self {
457 Self { in_use, channel, terminated: false }
458 }
459
460 fn try_upgrade(&self) -> Result<Arc<RwLock<Channel>>, io::Error> {
461 self.channel
462 .upgrade()
463 .ok_or_else(|| io::Error::new(io::ErrorKind::ConnectionAborted, "lost connection"))
464 }
465
466 pub fn max_tx_size(&self) -> Result<usize, io::Error> {
467 match self.try_upgrade()?.try_read() {
468 Err(_e) => return Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")),
469 Ok(lock) => Ok(lock.max_tx_size()),
470 }
471 }
472}
473
474impl Drop for MediaStream {
475 fn drop(&mut self) {
476 let mut l = self.in_use.lock();
477 *l = false;
478 }
479}
480
481impl Stream for MediaStream {
482 type Item = AvdtpResult<Vec<u8>>;
483
484 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
485 let Ok(arc_chan) = self.try_upgrade() else {
486 self.terminated = true;
487 return Poll::Ready(None);
488 };
489 let Ok(lock) = arc_chan.try_write() else {
490 self.terminated = true;
491 return Poll::Ready(None);
492 };
493 let mut pin_chan = Pin::new(lock);
494 match pin_chan.as_mut().poll_next(cx) {
495 Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
496 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::PeerRead(e)))),
497 Poll::Ready(None) => {
498 self.terminated = true;
499 Poll::Ready(None)
500 }
501 Poll::Pending => Poll::Pending,
502 }
503 }
504}
505
506impl FusedStream for MediaStream {
507 fn is_terminated(&self) -> bool {
508 self.terminated
509 }
510}
511
512impl io::AsyncWrite for MediaStream {
513 fn poll_write(
514 self: Pin<&mut Self>,
515 cx: &mut Context<'_>,
516 buf: &[u8],
517 ) -> Poll<Result<usize, io::Error>> {
518 let arc_chan = match self.try_upgrade() {
519 Err(e) => return Poll::Ready(Err(e)),
520 Ok(c) => c,
521 };
522 let lock = match arc_chan.try_write() {
523 Err(_) => {
524 return Poll::Ready(Err(io::Error::new(
525 io::ErrorKind::WouldBlock,
526 "couldn't lock",
527 )));
528 }
529 Ok(lock) => lock,
530 };
531 let mut pin_chan = Pin::new(lock);
532 pin_chan.as_mut().poll_write(cx, buf)
533 }
534
535 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
536 let arc_chan = match self.try_upgrade() {
537 Err(e) => return Poll::Ready(Err(e)),
538 Ok(c) => c,
539 };
540 let lock = match arc_chan.try_write() {
541 Err(_) => {
542 return Poll::Ready(Err(io::Error::new(
543 io::ErrorKind::WouldBlock,
544 "couldn't lock",
545 )));
546 }
547 Ok(lock) => lock,
548 };
549 let mut pin_chan = Pin::new(lock);
550 pin_chan.as_mut().poll_flush(cx)
551 }
552
553 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
554 let arc_chan = match self.try_upgrade() {
555 Err(e) => return Poll::Ready(Err(e)),
556 Ok(c) => c,
557 };
558 let lock = match arc_chan.try_write() {
559 Err(_) => {
560 return Poll::Ready(Err(io::Error::new(
561 io::ErrorKind::WouldBlock,
562 "couldn't lock",
563 )));
564 }
565 Ok(lock) => lock,
566 };
567 let mut pin_chan = Pin::new(lock);
568 pin_chan.as_mut().poll_close(cx)
569 }
570}
571
572#[cfg(test)]
573mod tests {
574 use super::*;
575 use crate::Request;
576 use crate::tests::{expect_remote_recv, setup_peer};
577
578 use assert_matches::assert_matches;
579 use fidl::endpoints::create_request_stream;
580 use futures::io::AsyncWriteExt;
581 use futures::stream::StreamExt;
582 use {
583 fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
584 fuchsia_async as fasync,
585 };
586
587 const REMOTE_ID_VAL: u8 = 1;
588 const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL);
589
590 #[test]
591 fn make() {
592 let s = StreamEndpoint::new(
593 REMOTE_ID_VAL,
594 MediaType::Audio,
595 EndpointType::Sink,
596 vec![ServiceCapability::MediaTransport],
597 );
598 assert!(s.is_ok());
599 let s = s.unwrap();
600 assert_eq!(&StreamEndpointId(1), s.local_id());
601
602 let info = s.information();
603 assert!(!info.in_use());
604
605 let no = StreamEndpoint::new(
606 0,
607 MediaType::Audio,
608 EndpointType::Sink,
609 vec![ServiceCapability::MediaTransport],
610 );
611 assert!(no.is_err());
612 }
613
614 fn establish_stream(s: &mut StreamEndpoint) -> Channel {
615 assert_matches!(s.establish(), Ok(()));
616 let (chan, remote) = Channel::create();
617 assert_matches!(s.receive_channel(chan), Ok(false));
618 remote
619 }
620
621 #[test]
622 fn from_info() {
623 let seid = StreamEndpointId::try_from(5).unwrap();
624 let info =
625 StreamInformation::new(seid.clone(), false, MediaType::Audio, EndpointType::Sink);
626 let capabilities = vec![ServiceCapability::MediaTransport];
627
628 let endpoint = StreamEndpoint::from_info(&info, capabilities);
629
630 assert_eq!(&seid, endpoint.local_id());
631 assert_eq!(&false, endpoint.information().in_use());
632 assert_eq!(1, endpoint.capabilities().len());
633 }
634
635 #[test]
636 fn codec_type() {
637 let s = StreamEndpoint::new(
638 REMOTE_ID_VAL,
639 MediaType::Audio,
640 EndpointType::Sink,
641 vec![
642 ServiceCapability::MediaTransport,
643 ServiceCapability::MediaCodec {
644 media_type: MediaType::Audio,
645 codec_type: MediaCodecType::new(0x40),
646 codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], },
648 ],
649 )
650 .unwrap();
651
652 assert_eq!(Some(&MediaCodecType::new(0x40)), s.codec_type());
653
654 let s = StreamEndpoint::new(
655 REMOTE_ID_VAL,
656 MediaType::Audio,
657 EndpointType::Sink,
658 vec![ServiceCapability::MediaTransport],
659 )
660 .unwrap();
661
662 assert_eq!(None, s.codec_type());
663 }
664
665 fn test_endpoint(r#type: EndpointType) -> StreamEndpoint {
666 StreamEndpoint::new(
667 REMOTE_ID_VAL,
668 MediaType::Audio,
669 r#type,
670 vec![
671 ServiceCapability::MediaTransport,
672 ServiceCapability::MediaCodec {
673 media_type: MediaType::Audio,
674 codec_type: MediaCodecType::new(0x40),
675 codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], },
677 ],
678 )
679 .unwrap()
680 }
681
682 #[test]
683 fn stream_configure_reconfigure() {
684 let _exec = fasync::TestExecutor::new();
685 let mut s = test_endpoint(EndpointType::Sink);
686
687 assert_matches!(
689 s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting]),
690 Err((ServiceCategory::Reporting, ErrorCode::UnsupportedConfiguration))
691 );
692
693 assert_matches!(
694 s.configure(
695 &REMOTE_ID,
696 vec![
697 ServiceCapability::MediaTransport,
698 ServiceCapability::MediaCodec {
699 media_type: MediaType::Audio,
700 codec_type: MediaCodecType::new(0x40),
701 codec_extra: vec![0x0C, 0x0D, 0x02, 0x51],
703 }
704 ]
705 ),
706 Ok(())
707 );
708
709 let _channel = establish_stream(&mut s);
714
715 assert_matches!(
716 s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
717 Err((_, ErrorCode::BadState))
718 );
719
720 let reconfiguration = vec![ServiceCapability::MediaCodec {
721 media_type: MediaType::Audio,
722 codec_type: MediaCodecType::new(0x40),
723 codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
725 }];
726
727 let new_configuration = vec![ServiceCapability::MediaTransport, reconfiguration[0].clone()];
730
731 assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));
733
734 assert_eq!(Some(&new_configuration), s.get_configuration());
735
736 assert_matches!(
738 s.reconfigure(vec![ServiceCapability::MediaTransport]),
739 Err((ServiceCategory::MediaTransport, ErrorCode::InvalidCapabilities))
740 );
741
742 assert_matches!(s.start(), Ok(()));
744
745 assert_matches!(
746 s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
747 Err((_, ErrorCode::BadState))
748 );
749
750 assert_matches!(s.reconfigure(reconfiguration.clone()), Err((_, ErrorCode::BadState)));
751
752 assert_matches!(s.suspend(), Ok(()));
753
754 assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));
756
757 assert_matches!(
759 s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
760 Err((_, ErrorCode::BadState))
761 );
762 }
763
764 #[test]
765 fn stream_establishment() {
766 let _exec = fasync::TestExecutor::new();
767 let mut s = test_endpoint(EndpointType::Sink);
768
769 let (remote, transport) = Channel::create();
770
771 assert_matches!(s.establish(), Err(ErrorCode::BadState));
773
774 assert_matches!(s.receive_channel(transport), Err(Error::InvalidState));
776
777 let buf: &mut [u8] = &mut [0; 1];
778
779 assert_matches!(remote.read(buf), Err(zx::Status::PEER_CLOSED));
780
781 assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
782
783 assert_matches!(s.establish(), Ok(()));
784
785 let (_remote, transport) = Channel::create();
787 assert_matches!(s.receive_channel(transport), Ok(false));
788 }
789
790 fn setup_peer_for_release(exec: &mut fasync::TestExecutor) -> (Peer, Channel, SimpleResponder) {
791 let (peer, signaling) = setup_peer();
792 let _ = signaling.write(&[0x40, 0x08, 0x04]).expect("signaling write");
794 let mut req_stream = peer.take_request_stream();
795 let mut req_fut = req_stream.next();
796 let complete = exec.run_until_stalled(&mut req_fut);
797 let responder = match complete {
798 Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder,
799 _ => panic!("Expected a close request"),
800 };
801 (peer, signaling, responder)
802 }
803
804 #[test]
805 fn stream_release_without_abort() {
806 let mut exec = fasync::TestExecutor::new();
807 let mut s = test_endpoint(EndpointType::Sink);
808
809 assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
810
811 let remote_transport = establish_stream(&mut s);
812
813 let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
814
815 s.release(responder, &peer).unwrap();
817 expect_remote_recv(&[0x42, 0x08], &signaling);
819
820 drop(remote_transport);
822
823 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
825 assert_eq!(s.state(), StreamState::Idle);
826 }
827
828 #[test]
829 fn test_mediastream() {
830 let mut exec = fasync::TestExecutor::new();
831 let mut s = test_endpoint(EndpointType::Sink);
832
833 assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
834
835 assert!(s.take_transport().is_none());
837
838 let remote_transport = establish_stream(&mut s);
839
840 let temp_stream = s.take_transport();
842 assert!(temp_stream.is_some());
843
844 assert!(s.take_transport().is_none());
846
847 drop(temp_stream);
849
850 let media_stream = s.take_transport();
851 assert!(media_stream.is_some());
852 let mut media_stream = media_stream.unwrap();
853
854 assert_matches!(media_stream.max_tx_size(), Ok(Channel::DEFAULT_MAX_TX));
856
857 let hearts = &[0xF0, 0x9F, 0x92, 0x96, 0xF0, 0x9F, 0x92, 0x96];
859 let mut write_fut = media_stream.write(hearts);
860
861 assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Ok(8)));
862
863 expect_remote_recv(hearts, &remote_transport);
864
865 let mut close_fut = media_stream.close();
867 assert_matches!(exec.run_until_stalled(&mut close_fut), Poll::Ready(Ok(())));
868 drop(s);
872
873 let mut result = vec![0];
875 assert_matches!(remote_transport.read(&mut result[..]), Err(zx::Status::PEER_CLOSED));
876
877 let mut write_fut = media_stream.write(&[0xDE, 0xAD]);
879 assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Err(_)));
880
881 let mut next_fut = media_stream.next();
883 assert_matches!(exec.run_until_stalled(&mut next_fut), Poll::Ready(None));
884
885 assert!(media_stream.is_terminated(), "should be terminated");
886
887 assert_matches!(media_stream.max_tx_size(), Err(_));
889 }
890
891 #[test]
892 fn stream_release_with_abort() {
893 let mut exec = fasync::TestExecutor::new();
894 let mut s = test_endpoint(EndpointType::Sink);
895
896 assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
897 let remote_transport = establish_stream(&mut s);
898 let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);
899
900 s.release(responder, &peer).unwrap();
902 expect_remote_recv(&[0x42, 0x08], &signaling);
904
905 let next = std::pin::pin!(signaling.next());
907 let received =
908 exec.run_singlethreaded(next).expect("channel not closed").expect("successful read");
909 assert_eq!(0x0A, received[1]);
910 let txlabel = received[0] & 0xF0;
911 assert!(signaling.write(&[txlabel | 0x02, 0x0A]).is_ok());
913
914 let _ = exec.run_singlethreaded(&mut remote_transport.closed());
915
916 while s.state() != StreamState::Idle {
918 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
919 }
920 }
921
922 #[test]
923 fn start_and_suspend() {
924 let mut exec = fasync::TestExecutor::new();
925 let mut s = test_endpoint(EndpointType::Sink);
926
927 assert_matches!(s.start(), Err(ErrorCode::BadState));
929 assert_matches!(s.suspend(), Err(ErrorCode::BadState));
930
931 assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
932
933 assert_matches!(s.start(), Err(ErrorCode::BadState));
934 assert_matches!(s.suspend(), Err(ErrorCode::BadState));
935
936 assert_matches!(s.establish(), Ok(()));
937
938 assert_matches!(s.start(), Err(ErrorCode::BadState));
939 assert_matches!(s.suspend(), Err(ErrorCode::BadState));
940
941 let (remote, local) = zx::Socket::create_datagram();
942 let (client_end, mut direction_request_stream) =
943 create_request_stream::<bredr::AudioDirectionExtMarker>();
944 let ext = bredr::Channel {
945 socket: Some(local),
946 channel_mode: Some(fidl_bt::ChannelMode::Basic),
947 max_tx_sdu_size: Some(1004),
948 ext_direction: Some(client_end),
949 ..Default::default()
950 };
951 let transport = Channel::try_from(ext).unwrap();
952 assert_matches!(s.receive_channel(transport), Ok(false));
953
954 assert_matches!(s.suspend(), Err(ErrorCode::BadState));
956 assert_matches!(s.start(), Ok(()));
957
958 match exec.run_until_stalled(&mut direction_request_stream.next()) {
959 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
960 priority,
961 responder,
962 }))) => {
963 assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
964 responder.send(Ok(())).expect("response to send cleanly");
965 }
966 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
967 };
968
969 assert_matches!(s.start(), Err(ErrorCode::BadState));
971 assert_matches!(s.suspend(), Ok(()));
972
973 match exec.run_until_stalled(&mut direction_request_stream.next()) {
974 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
975 priority,
976 responder,
977 }))) => {
978 assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
979 responder.send(Ok(())).expect("response to send cleanly");
980 }
981 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
982 };
983
984 assert_matches!(s.start(), Ok(()));
986 assert_matches!(s.suspend(), Ok(()));
987
988 let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
990
991 {
992 s.release(responder, &peer).unwrap();
993 expect_remote_recv(&[0x42, 0x08], &signaling);
995 drop(remote);
997 while s.state() != StreamState::Idle {
998 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
999 }
1000 }
1001
1002 assert_matches!(s.start(), Err(ErrorCode::BadState));
1004 assert_matches!(s.suspend(), Err(ErrorCode::BadState));
1005 }
1006
1007 fn receive_l2cap_params_channel(
1008 s: &mut StreamEndpoint,
1009 ) -> (zx::Socket, bredr::L2capParametersExtRequestStream) {
1010 assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
1011 assert_matches!(s.establish(), Ok(()));
1012
1013 let (remote, local) = zx::Socket::create_datagram();
1014 let (client_end, l2cap_params_requests) =
1015 create_request_stream::<bredr::L2capParametersExtMarker>();
1016 let ext = bredr::Channel {
1017 socket: Some(local),
1018 channel_mode: Some(fidl_bt::ChannelMode::Basic),
1019 max_tx_sdu_size: Some(1004),
1020 ext_l2cap: Some(client_end),
1021 ..Default::default()
1022 };
1023 let transport = Channel::try_from(ext).unwrap();
1024 assert_matches!(s.receive_channel(transport), Ok(false));
1025 (remote, l2cap_params_requests)
1026 }
1027
1028 #[test]
1029 fn sets_flush_timeout_for_source_transports() {
1030 let mut exec = fasync::TestExecutor::new();
1031 let mut s = test_endpoint(EndpointType::Source);
1032 let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
1033
1034 match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
1036 Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
1037 request,
1038 responder,
1039 }))) => {
1040 assert_eq!(
1041 Some(StreamEndpoint::SRC_FLUSH_TIMEOUT.into_nanos()),
1042 request.flush_timeout
1043 );
1044 responder.send(&request).expect("response to send cleanly");
1045 }
1046 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
1047 };
1048 }
1049
1050 #[test]
1051 fn no_flush_timeout_for_sink_transports() {
1052 let mut exec = fasync::TestExecutor::new();
1053 let mut s = test_endpoint(EndpointType::Sink);
1054 let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
1055
1056 match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
1058 Poll::Pending => {}
1059 x => panic!("Expected no request to set flush timeout, got {:?}", x),
1060 };
1061 }
1062
1063 #[test]
1064 fn get_configuration() {
1065 let mut s = test_endpoint(EndpointType::Sink);
1066
1067 assert!(s.get_configuration().is_none());
1069
1070 let config = vec![
1071 ServiceCapability::MediaTransport,
1072 ServiceCapability::MediaCodec {
1073 media_type: MediaType::Audio,
1074 codec_type: MediaCodecType::new(0),
1075 codec_extra: vec![0x60, 0x0D, 0x02, 0x55],
1077 },
1078 ];
1079
1080 assert_matches!(s.configure(&REMOTE_ID, config.clone()), Ok(()));
1081
1082 match s.get_configuration() {
1083 Some(c) => assert_eq!(&config, c),
1084 x => panic!("Expected Ok from get_configuration but got {:?}", x),
1085 };
1086
1087 s.abort();
1089
1090 assert!(s.get_configuration().is_none());
1091 }
1092
1093 use std::sync::atomic::{AtomicUsize, Ordering};
1094
1095 fn call_count_callback() -> (Option<StreamEndpointUpdateCallback>, Arc<AtomicUsize>) {
1097 let call_count = Arc::new(AtomicUsize::new(0));
1098 let call_count_reader = call_count.clone();
1099 let count_cb: StreamEndpointUpdateCallback = Box::new(move |_stream: &StreamEndpoint| {
1100 let _ = call_count.fetch_add(1, Ordering::SeqCst);
1101 });
1102 (Some(count_cb), call_count_reader)
1103 }
1104
1105 #[test]
1112 fn update_callback() {
1113 let _exec = fasync::TestExecutor::new();
1115 let mut s = test_endpoint(EndpointType::Sink);
1116 let (cb, call_count) = call_count_callback();
1117 s.set_update_callback(cb);
1118
1119 s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
1120 .expect("Configure to succeed in test");
1121 assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1122 call_count.store(0, Ordering::SeqCst); s.establish().expect("Establish to succeed in test");
1125 assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1126 call_count.store(0, Ordering::SeqCst); let (_, transport) = Channel::create();
1129 assert_eq!(
1130 s.receive_channel(transport).expect("Receive channel to succeed in test"),
1131 false
1132 );
1133 assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1134 call_count.store(0, Ordering::SeqCst); s.start().expect("Start to succeed in test");
1137 assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1138 call_count.store(0, Ordering::SeqCst); s.suspend().expect("Suspend to succeed in test");
1141 assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1142 call_count.store(0, Ordering::SeqCst); s.reconfigure(vec![]).expect("Reconfigure to succeed in test");
1145 assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1146 call_count.store(0, Ordering::SeqCst); s.abort();
1150 }
1151}