1use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
6use fuchsia_async::{DurationExt, Task, TimeoutExt};
7use fuchsia_bluetooth::types::{A2dpDirection, Channel};
8use fuchsia_sync::{Mutex, RwLock};
9use futures::stream::{FusedStream, Stream};
10use futures::{FutureExt, io};
11use log::warn;
12use std::fmt;
13use std::pin::Pin;
14use std::sync::{Arc, Weak};
15use std::task::{Context, Poll};
16use zx::{MonotonicDuration, Status};
17
18use crate::types::{
19 EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result as AvdtpResult,
20 ServiceCapability, ServiceCategory, StreamEndpointId, StreamInformation,
21};
22use crate::{Peer, SimpleResponder};
23
24pub type StreamEndpointUpdateCallback = Box<dyn Fn(&StreamEndpoint) -> () + Sync + Send>;
25
/// The state a [`StreamEndpoint`] can be in.
///
/// A freshly created endpoint starts in `Idle`, which is also the `Default`.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum StreamState {
    /// No configuration is applied; the endpoint is reported as not in use.
    #[default]
    Idle,
    /// A configuration has been accepted via `configure`.
    Configured,
    /// `establish` has been called and the endpoint is waiting for its transport channel.
    Opening,
    /// The transport channel has been received.
    Open,
    /// The stream has been started.
    Streaming,
    /// A release has been accepted and the endpoint is waiting for the transport to close.
    Closing,
    /// The stream is being aborted; the endpoint returns to `Idle` afterwards.
    Aborting,
}
39
40pub struct StreamEndpoint {
45 id: StreamEndpointId,
47 endpoint_type: EndpointType,
49 media_type: MediaType,
51 state: Arc<Mutex<StreamState>>,
53 transport: Option<Arc<RwLock<Channel>>>,
56 stream_held: Arc<Mutex<bool>>,
59 capabilities: Vec<ServiceCapability>,
61 remote_id: Option<StreamEndpointId>,
63 configuration: Vec<ServiceCapability>,
65 update_callback: Option<StreamEndpointUpdateCallback>,
67 in_progress: Option<Task<()>>,
70}
71
72impl fmt::Debug for StreamEndpoint {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 f.debug_struct("StreamEndpoint")
75 .field("id", &self.id.0)
76 .field("endpoint_type", &self.endpoint_type)
77 .field("media_type", &self.media_type)
78 .field("state", &self.state)
79 .field("capabilities", &self.capabilities)
80 .field("remote_id", &self.remote_id.as_ref().map(|id| id.to_string()))
81 .field("configuration", &self.configuration)
82 .finish()
83 }
84}
85
86impl StreamEndpoint {
87 pub fn new(
91 id: u8,
92 media_type: MediaType,
93 endpoint_type: EndpointType,
94 capabilities: Vec<ServiceCapability>,
95 ) -> AvdtpResult<StreamEndpoint> {
96 let seid = StreamEndpointId::try_from(id)?;
97 Ok(StreamEndpoint {
98 id: seid,
99 capabilities,
100 media_type,
101 endpoint_type,
102 state: Default::default(),
103 transport: None,
104 stream_held: Arc::new(Mutex::new(false)),
105 remote_id: None,
106 configuration: vec![],
107 update_callback: None,
108 in_progress: None,
109 })
110 }
111
112 pub fn as_new(&self) -> Self {
113 StreamEndpoint::new(
114 self.id.0,
115 self.media_type.clone(),
116 self.endpoint_type.clone(),
117 self.capabilities.clone(),
118 )
119 .expect("as_new")
120 }
121
122 fn set_state(&mut self, state: StreamState) {
124 *self.state.lock() = state;
125 self.update_callback();
126 }
127
128 pub fn set_update_callback(&mut self, callback: Option<StreamEndpointUpdateCallback>) {
131 self.update_callback = callback;
132 }
133
134 fn update_callback(&self) {
135 if let Some(cb) = self.update_callback.as_ref() {
136 cb(self);
137 }
138 }
139
140 pub fn from_info(
144 info: &StreamInformation,
145 capabilities: Vec<ServiceCapability>,
146 ) -> StreamEndpoint {
147 StreamEndpoint {
148 id: info.id().clone(),
149 capabilities,
150 media_type: info.media_type().clone(),
151 endpoint_type: info.endpoint_type().clone(),
152 state: Default::default(),
153 transport: None,
154 stream_held: Arc::new(Mutex::new(false)),
155 remote_id: None,
156 configuration: vec![],
157 update_callback: None,
158 in_progress: None,
159 }
160 }
161
162 fn state_is(&self, state: StreamState) -> Result<(), ErrorCode> {
165 (*self.state.lock() == state).then_some(()).ok_or(ErrorCode::BadState)
166 }
167
168 pub fn configure(
172 &mut self,
173 remote_id: &StreamEndpointId,
174 capabilities: Vec<ServiceCapability>,
175 ) -> Result<(), (ServiceCategory, ErrorCode)> {
176 self.state_is(StreamState::Idle).map_err(|e| (ServiceCategory::None, e))?;
177 self.remote_id = Some(remote_id.clone());
178 for cap in &capabilities {
179 if !self
180 .capabilities
181 .iter()
182 .any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y))
183 {
184 return Err((cap.category(), ErrorCode::UnsupportedConfiguration));
185 }
186 }
187 self.configuration = capabilities;
188 self.set_state(StreamState::Configured);
189 Ok(())
190 }
191
192 pub fn reconfigure(
197 &mut self,
198 mut capabilities: Vec<ServiceCapability>,
199 ) -> Result<(), (ServiceCategory, ErrorCode)> {
200 self.state_is(StreamState::Open).map_err(|e| (ServiceCategory::None, e))?;
201 if let Some(cap) = capabilities.iter().find(|x| !x.is_application()) {
203 return Err((cap.category(), ErrorCode::InvalidCapabilities));
204 }
205 let to_replace: std::vec::Vec<_> =
207 capabilities.iter().map(|x| std::mem::discriminant(x)).collect();
208 self.configuration.retain(|x| {
209 let disc = std::mem::discriminant(x);
210 !to_replace.contains(&disc)
211 });
212 self.configuration.append(&mut capabilities);
213 self.update_callback();
214 Ok(())
215 }
216
217 pub fn get_configuration(&self) -> Option<&Vec<ServiceCapability>> {
221 if self.configuration.is_empty() {
222 return None;
223 }
224 Some(&self.configuration)
225 }
226
227 const SRC_FLUSH_TIMEOUT: MonotonicDuration = MonotonicDuration::from_millis(100);
230
231 pub fn receive_channel(&mut self, c: Channel) -> AvdtpResult<bool> {
238 if self.state_is(StreamState::Opening).is_err() || self.transport.is_some() {
239 return Err(Error::InvalidState);
240 }
241 self.transport = Some(Arc::new(RwLock::new(c)));
242 self.try_flush_timeout(Self::SRC_FLUSH_TIMEOUT);
243 self.stream_held = Arc::new(Mutex::new(false));
244 self.set_state(StreamState::Open);
246 Ok(false)
247 }
248
249 pub fn establish(&mut self) -> Result<(), ErrorCode> {
252 if self.state_is(StreamState::Configured).is_err() || self.transport.is_some() {
253 return Err(ErrorCode::BadState);
254 }
255 self.set_state(StreamState::Opening);
256 Ok(())
257 }
258
259 pub fn try_priority(&self, active: bool) {
263 let priority = match (active, &self.endpoint_type) {
264 (false, _) => A2dpDirection::Normal,
265 (true, EndpointType::Source) => A2dpDirection::Source,
266 (true, EndpointType::Sink) => A2dpDirection::Sink,
267 };
268 let fut = match self.transport.as_ref().unwrap().try_read() {
269 None => return,
270 Some(channel) => channel.set_audio_priority(priority).map(|_| ()),
271 };
272 Task::spawn(fut).detach();
274 }
275
276 pub fn try_flush_timeout(&self, timeout: MonotonicDuration) {
278 if self.endpoint_type != EndpointType::Source {
279 return;
280 }
281 let fut = match self.transport.as_ref().unwrap().try_write() {
282 None => return,
283 Some(channel) => channel.set_flush_timeout(Some(timeout)).map(|_| ()),
284 };
285 Task::spawn(fut).detach();
287 }
288
289 pub fn release(&mut self, responder: SimpleResponder, peer: &Peer) -> AvdtpResult<()> {
293 {
294 let lock = self.state.lock();
295 if *lock != StreamState::Open && *lock != StreamState::Streaming {
296 return responder.reject(ErrorCode::BadState);
297 }
298 }
299 self.set_state(StreamState::Closing);
300 responder.send()?;
301 let release_wait_fut = {
302 let seid = self.remote_id.take().unwrap();
305 let transport = self.transport.take().unwrap();
306 let peer = peer.clone();
307 let state = self.state.clone();
308 async move {
309 let Some(transport) = transport.try_read() else {
310 warn!("unable to lock transport channel, dropping and assuming closed");
311 *state.lock() = StreamState::Idle;
312 return;
313 };
314 let closed_fut = transport
315 .closed()
316 .on_timeout(MonotonicDuration::from_seconds(3).after_now(), || {
317 Err(Status::TIMED_OUT)
318 });
319 if let Err(Status::TIMED_OUT) = closed_fut.await {
320 let _ = peer.abort(&seid).await;
321 *state.lock() = StreamState::Aborting;
322 drop(transport);
324 }
325 *state.lock() = StreamState::Idle;
326 }
327 };
328 self.in_progress = Some(Task::local(release_wait_fut));
329 self.configuration.clear();
332 self.update_callback();
333 Ok(())
334 }
335
336 pub fn state(&self) -> StreamState {
338 *self.state.lock()
339 }
340
341 pub fn start(&mut self) -> Result<(), ErrorCode> {
344 self.state_is(StreamState::Open)?;
345 self.try_priority(true);
346 self.set_state(StreamState::Streaming);
347 Ok(())
348 }
349
350 pub fn suspend(&mut self) -> Result<(), ErrorCode> {
353 self.state_is(StreamState::Streaming)?;
354 self.set_state(StreamState::Open);
355 self.try_priority(false);
356 Ok(())
357 }
358
359 pub async fn initiate_abort<'a>(&'a mut self, peer: &'a Peer) {
363 if let Some(seid) = self.remote_id.take() {
364 let _ = peer.abort(&seid).await;
365 self.set_state(StreamState::Aborting);
366 }
367 self.abort()
368 }
369
370 pub fn abort(&mut self) {
373 self.set_state(StreamState::Aborting);
374 self.configuration.clear();
375 self.remote_id = None;
376 self.transport = None;
377 self.set_state(StreamState::Idle);
378 }
379
380 pub fn capabilities(&self) -> &Vec<ServiceCapability> {
384 &self.capabilities
385 }
386
387 pub fn codec_type(&self) -> Option<&MediaCodecType> {
391 self.capabilities.iter().find_map(|cap| match cap {
392 ServiceCapability::MediaCodec { codec_type, .. } => Some(codec_type),
393 _ => None,
394 })
395 }
396
397 pub fn local_id(&self) -> &StreamEndpointId {
399 &self.id
400 }
401
402 pub fn remote_id(&self) -> Option<&StreamEndpointId> {
404 self.remote_id.as_ref()
405 }
406
407 pub fn endpoint_type(&self) -> &EndpointType {
409 &self.endpoint_type
410 }
411
412 pub fn information(&self) -> StreamInformation {
414 let in_use = self.state_is(StreamState::Idle).is_err();
415 StreamInformation::new(
416 self.id.clone(),
417 in_use,
418 self.media_type.clone(),
419 self.endpoint_type.clone(),
420 )
421 }
422
423 pub fn take_transport(&mut self) -> Option<MediaStream> {
427 let mut stream_held = self.stream_held.lock();
428 if *stream_held || self.transport.is_none() {
429 return None;
430 }
431
432 *stream_held = true;
433
434 Some(MediaStream::new(
435 self.stream_held.clone(),
436 Arc::downgrade(self.transport.as_ref().unwrap()),
437 ))
438 }
439
440 pub fn audio_offload(&self) -> Option<AudioOffloadExtProxy> {
442 self.transport.as_ref().and_then(|c| c.read().audio_offload())
443 }
444}
445
446pub struct MediaStream {
450 in_use: Arc<Mutex<bool>>,
451 channel: Weak<RwLock<Channel>>,
452 terminated: bool,
453}
454
455impl MediaStream {
456 pub fn new(in_use: Arc<Mutex<bool>>, channel: Weak<RwLock<Channel>>) -> Self {
457 Self { in_use, channel, terminated: false }
458 }
459
460 fn try_upgrade(&self) -> Result<Arc<RwLock<Channel>>, io::Error> {
461 self.channel
462 .upgrade()
463 .ok_or_else(|| io::Error::new(io::ErrorKind::ConnectionAborted, "lost connection"))
464 }
465
466 pub fn max_tx_size(&self) -> Result<usize, io::Error> {
467 match self.try_upgrade()?.try_read() {
468 None => return Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")),
469 Some(lock) => Ok(lock.max_tx_size()),
470 }
471 }
472}
473
474impl Drop for MediaStream {
475 fn drop(&mut self) {
476 let mut l = self.in_use.lock();
477 *l = false;
478 }
479}
480
481impl Stream for MediaStream {
482 type Item = AvdtpResult<Vec<u8>>;
483
484 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
485 let Ok(arc_chan) = self.try_upgrade() else {
486 self.terminated = true;
487 return Poll::Ready(None);
488 };
489 let Some(lock) = arc_chan.try_write() else {
490 self.terminated = true;
491 return Poll::Ready(None);
492 };
493 let mut pin_chan = Pin::new(lock);
494 match pin_chan.as_mut().poll_next(cx) {
495 Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
496 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::PeerRead(e)))),
497 Poll::Ready(None) => {
498 self.terminated = true;
499 Poll::Ready(None)
500 }
501 Poll::Pending => Poll::Pending,
502 }
503 }
504}
505
506impl FusedStream for MediaStream {
507 fn is_terminated(&self) -> bool {
508 self.terminated
509 }
510}
511
512impl io::AsyncWrite for MediaStream {
513 fn poll_write(
514 self: Pin<&mut Self>,
515 cx: &mut Context<'_>,
516 buf: &[u8],
517 ) -> Poll<Result<usize, io::Error>> {
518 let arc_chan = match self.try_upgrade() {
519 Err(e) => return Poll::Ready(Err(e)),
520 Ok(c) => c,
521 };
522 let lock = match arc_chan.try_write() {
523 None => {
524 return Poll::Ready(Err(io::Error::new(
525 io::ErrorKind::WouldBlock,
526 "couldn't lock",
527 )));
528 }
529 Some(lock) => lock,
530 };
531 let mut pin_chan = Pin::new(lock);
532 pin_chan.as_mut().poll_write(cx, buf)
533 }
534
535 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
536 let arc_chan = match self.try_upgrade() {
537 Err(e) => return Poll::Ready(Err(e)),
538 Ok(c) => c,
539 };
540 let lock = match arc_chan.try_write() {
541 None => {
542 return Poll::Ready(Err(io::Error::new(
543 io::ErrorKind::WouldBlock,
544 "couldn't lock",
545 )));
546 }
547 Some(lock) => lock,
548 };
549 let mut pin_chan = Pin::new(lock);
550 pin_chan.as_mut().poll_flush(cx)
551 }
552
553 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
554 let arc_chan = match self.try_upgrade() {
555 Err(e) => return Poll::Ready(Err(e)),
556 Ok(c) => c,
557 };
558 let lock = match arc_chan.try_write() {
559 None => {
560 return Poll::Ready(Err(io::Error::new(
561 io::ErrorKind::WouldBlock,
562 "couldn't lock",
563 )));
564 }
565 Some(lock) => lock,
566 };
567 let mut pin_chan = Pin::new(lock);
568 pin_chan.as_mut().poll_close(cx)
569 }
570}
571
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Request;
    use crate::tests::{expect_remote_recv, setup_peer};

    use assert_matches::assert_matches;
    use async_utils::PollExt;
    use fidl::endpoints::create_request_stream;
    use futures::io::{AsyncReadExt, AsyncWriteExt};
    use futures::stream::StreamExt;
    use {
        fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
        fuchsia_async as fasync,
    };

    /// Raw id used both as the local id of test endpoints and as the remote id in requests.
    const REMOTE_ID_VAL: u8 = 1;
    const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL);

    /// Construction succeeds for a valid id and fails for id 0.
    #[test]
    fn make() {
        let s = StreamEndpoint::new(
            REMOTE_ID_VAL,
            MediaType::Audio,
            EndpointType::Sink,
            vec![ServiceCapability::MediaTransport],
        );
        assert!(s.is_ok());
        let s = s.unwrap();
        assert_eq!(&StreamEndpointId(1), s.local_id());

        let info = s.information();
        assert!(!info.in_use());

        let no = StreamEndpoint::new(
            0,
            MediaType::Audio,
            EndpointType::Sink,
            vec![ServiceCapability::MediaTransport],
        );
        assert!(no.is_err());
    }

    /// Takes a configured endpoint to `Open` and returns the remote end of its transport.
    fn establish_stream(s: &mut StreamEndpoint) -> Channel {
        assert_matches!(s.establish(), Ok(()));
        let (chan, remote) = Channel::create();
        assert_matches!(s.receive_channel(chan), Ok(false));
        remote
    }

    /// An endpoint built from stream information mirrors the id and is not in use.
    #[test]
    fn from_info() {
        let seid = StreamEndpointId::try_from(5).unwrap();
        let info =
            StreamInformation::new(seid.clone(), false, MediaType::Audio, EndpointType::Sink);
        let capabilities = vec![ServiceCapability::MediaTransport];

        let endpoint = StreamEndpoint::from_info(&info, capabilities);

        assert_eq!(&seid, endpoint.local_id());
        assert_eq!(&false, endpoint.information().in_use());
        assert_eq!(1, endpoint.capabilities().len());
    }

    /// `codec_type` reports the codec of a `MediaCodec` capability, or `None` without one.
    #[test]
    fn codec_type() {
        let s = StreamEndpoint::new(
            REMOTE_ID_VAL,
            MediaType::Audio,
            EndpointType::Sink,
            vec![
                ServiceCapability::MediaTransport,
                ServiceCapability::MediaCodec {
                    media_type: MediaType::Audio,
                    codec_type: MediaCodecType::new(0x40),
                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF],
                },
            ],
        )
        .unwrap();

        assert_eq!(Some(&MediaCodecType::new(0x40)), s.codec_type());

        let s = StreamEndpoint::new(
            REMOTE_ID_VAL,
            MediaType::Audio,
            EndpointType::Sink,
            vec![ServiceCapability::MediaTransport],
        )
        .unwrap();

        assert_eq!(None, s.codec_type());
    }

    /// Builds an audio endpoint of the given type supporting media transport and one codec.
    fn test_endpoint(r#type: EndpointType) -> StreamEndpoint {
        StreamEndpoint::new(
            REMOTE_ID_VAL,
            MediaType::Audio,
            r#type,
            vec![
                ServiceCapability::MediaTransport,
                ServiceCapability::MediaCodec {
                    media_type: MediaType::Audio,
                    codec_type: MediaCodecType::new(0x40),
                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF],
                },
            ],
        )
        .unwrap()
    }

    /// Exercises which states accept `configure` and `reconfigure`, and which capabilities.
    #[test]
    fn stream_configure_reconfigure() {
        let _exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        // A capability kind the endpoint does not support is rejected.
        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting]),
            Err((ServiceCategory::Reporting, ErrorCode::UnsupportedConfiguration))
        );

        assert_matches!(
            s.configure(
                &REMOTE_ID,
                vec![
                    ServiceCapability::MediaTransport,
                    ServiceCapability::MediaCodec {
                        media_type: MediaType::Audio,
                        codec_type: MediaCodecType::new(0x40),
                        codec_extra: vec![0x0C, 0x0D, 0x02, 0x51],
                    }
                ]
            ),
            Ok(())
        );

        // Keep the remote end alive for the rest of the test.
        let _channel = establish_stream(&mut s);

        // Configuring again is only possible from Idle.
        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
            Err((_, ErrorCode::BadState))
        );

        let reconfiguration = vec![ServiceCapability::MediaCodec {
            media_type: MediaType::Audio,
            codec_type: MediaCodecType::new(0x40),
            codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
        }];

        let new_configuration = vec![ServiceCapability::MediaTransport, reconfiguration[0].clone()];

        // Reconfiguring while Open replaces the codec capability.
        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));

        assert_eq!(Some(&new_configuration), s.get_configuration());

        // Media transport is rejected by `reconfigure`.
        assert_matches!(
            s.reconfigure(vec![ServiceCapability::MediaTransport]),
            Err((ServiceCategory::MediaTransport, ErrorCode::InvalidCapabilities))
        );

        assert_matches!(s.start(), Ok(()));

        // While Streaming neither configure nor reconfigure is allowed.
        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
            Err((_, ErrorCode::BadState))
        );

        assert_matches!(s.reconfigure(reconfiguration.clone()), Err((_, ErrorCode::BadState)));

        assert_matches!(s.suspend(), Ok(()));

        // Back in Open: reconfigure works again, configure still does not.
        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));

        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
            Err((_, ErrorCode::BadState))
        );
    }

    /// `establish` and `receive_channel` are only accepted in the right states.
    #[test]
    fn stream_establishment() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        let (mut remote, transport) = Channel::create();

        // Not configured yet: cannot establish.
        assert_matches!(s.establish(), Err(ErrorCode::BadState));

        // Not opening: the channel is refused (and dropped).
        assert_matches!(s.receive_channel(transport), Err(Error::InvalidState));

        let buf: &mut [u8] = &mut [0; 1];

        // The refused channel was dropped, so the remote end reads zero bytes.
        let mut read_fut = remote.read(buf);
        let res = exec.run_until_stalled(&mut read_fut).expect("should be ready");
        assert_matches!(res, Ok(0));

        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

        assert_matches!(s.establish(), Ok(()));

        let (_remote, transport) = Channel::create();
        assert_matches!(s.receive_channel(transport), Ok(false));
    }

    /// Sets up a peer and writes a request on the signaling channel that surfaces as
    /// `Request::Close`, returning the peer, the signaling channel and the responder.
    fn setup_peer_for_release(exec: &mut fasync::TestExecutor) -> (Peer, Channel, SimpleResponder) {
        let (peer, signaling) = setup_peer();
        let _ = signaling.write(&[0x40, 0x08, 0x04]).expect("signaling write");
        let mut req_stream = peer.take_request_stream();
        let mut req_fut = req_stream.next();
        let complete = exec.run_until_stalled(&mut req_fut);
        let responder = match complete {
            Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder,
            _ => panic!("Expected a close request"),
        };
        (peer, signaling, responder)
    }

    /// Releasing a stream whose transport is closed by the remote ends in Idle.
    #[test]
    fn stream_release_without_abort() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

        let remote_transport = establish_stream(&mut s);

        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);

        s.release(responder, &peer).unwrap();
        // The response to the close request is sent on the signaling channel.
        expect_remote_recv(&[0x42, 0x08], &mut signaling);

        // Closing the remote end lets the release task finish without aborting.
        drop(remote_transport);

        // Give the release task a chance to run.
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        assert_eq!(s.state(), StreamState::Idle);
    }

    /// Exercises handing out, writing through, closing and invalidating a `MediaStream`.
    #[test]
    fn test_mediastream() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

        // No transport yet, so no stream.
        assert!(s.take_transport().is_none());

        let mut remote_transport = establish_stream(&mut s);

        let temp_stream = s.take_transport();
        assert!(temp_stream.is_some());

        // Only one stream can be held at a time.
        assert!(s.take_transport().is_none());

        // Dropping the stream makes it available again.
        drop(temp_stream);

        let media_stream = s.take_transport();
        assert!(media_stream.is_some());
        let mut media_stream = media_stream.unwrap();

        assert_matches!(media_stream.max_tx_size(), Ok(Channel::DEFAULT_MAX_TX));

        let hearts = &[0xF0, 0x9F, 0x92, 0x96, 0xF0, 0x9F, 0x92, 0x96];
        let mut write_fut = media_stream.write(hearts);

        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Ok(8)));

        expect_remote_recv(hearts, &mut remote_transport);

        let mut close_fut = media_stream.close();
        assert_matches!(exec.run_until_stalled(&mut close_fut), Poll::Ready(Ok(())));
        // Dropping the endpoint drops the channel the stream weakly refers to.
        drop(s);

        let mut result = vec![0];
        let mut read_fut = remote_transport.read(&mut result[..]);
        let res = exec.run_until_stalled(&mut read_fut).expect("should be ready");
        assert_matches!(res, Ok(0));

        // With the channel gone, writes fail and the stream terminates.
        let mut write_fut = media_stream.write(&[0xDE, 0xAD]);
        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Err(_)));

        let mut next_fut = media_stream.next();
        assert_matches!(exec.run_until_stalled(&mut next_fut), Poll::Ready(None));

        assert!(media_stream.is_terminated(), "should be terminated");

        assert_matches!(media_stream.max_tx_size(), Err(_));
    }

    /// Releasing a stream whose transport is not closed by the remote sends an abort.
    #[test]
    fn stream_release_with_abort() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
        let remote_transport = establish_stream(&mut s);
        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);

        s.release(responder, &peer).unwrap();
        expect_remote_recv(&[0x42, 0x08], &mut signaling);

        // The remote transport stays open, so the next signaling message is checked to carry
        // 0x0A in its second byte and is answered using the same transaction label.
        let next = std::pin::pin!(signaling.next());
        let received =
            exec.run_singlethreaded(next).expect("channel not closed").expect("successful read");
        assert_eq!(0x0A, received[1]);
        let txlabel = received[0] & 0xF0;
        assert!(signaling.write(&[txlabel | 0x02, 0x0A]).is_ok());

        let _ = exec.run_singlethreaded(&mut remote_transport.closed());

        // Run the release task until the endpoint settles in Idle.
        while s.state() != StreamState::Idle {
            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        }
    }

    /// `start` and `suspend` are only accepted in the right states and set the audio
    /// priority on the transport accordingly.
    #[test]
    fn start_and_suspend() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        // Idle.
        assert_matches!(s.start(), Err(ErrorCode::BadState));
        assert_matches!(s.suspend(), Err(ErrorCode::BadState));

        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

        // Configured.
        assert_matches!(s.start(), Err(ErrorCode::BadState));
        assert_matches!(s.suspend(), Err(ErrorCode::BadState));

        assert_matches!(s.establish(), Ok(()));

        // Opening.
        assert_matches!(s.start(), Err(ErrorCode::BadState));
        assert_matches!(s.suspend(), Err(ErrorCode::BadState));

        // Build a transport that exposes the audio direction extension so that priority
        // requests can be observed.
        let (remote, local) = zx::Socket::create_datagram();
        let (client_end, mut direction_request_stream) =
            create_request_stream::<bredr::AudioDirectionExtMarker>();
        let ext = bredr::Channel {
            socket: Some(local),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ext_direction: Some(client_end),
            ..Default::default()
        };
        let transport = Channel::try_from(ext).unwrap();
        assert_matches!(s.receive_channel(transport), Ok(false));

        // Open: cannot suspend, can start.
        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
        assert_matches!(s.start(), Ok(()));

        match exec.run_until_stalled(&mut direction_request_stream.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
                responder.send(Ok(())).expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };

        // Streaming: cannot start again, can suspend.
        assert_matches!(s.start(), Err(ErrorCode::BadState));
        assert_matches!(s.suspend(), Ok(()));

        match exec.run_until_stalled(&mut direction_request_stream.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
                responder.send(Ok(())).expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };

        // Start and suspend can be repeated.
        assert_matches!(s.start(), Ok(()));
        assert_matches!(s.suspend(), Ok(()));

        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);

        {
            s.release(responder, &peer).unwrap();
            expect_remote_recv(&[0x42, 0x08], &mut signaling);
            // Close the transport from the remote side.
            drop(remote);
            while s.state() != StreamState::Idle {
                let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
            }
        }

        // Released (Idle again).
        assert_matches!(s.start(), Err(ErrorCode::BadState));
        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
    }

    /// Configures and opens `s` with a transport exposing the L2CAP parameters extension,
    /// returning the remote socket and the extension's request stream.
    fn receive_l2cap_params_channel(
        s: &mut StreamEndpoint,
    ) -> (zx::Socket, bredr::L2capParametersExtRequestStream) {
        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
        assert_matches!(s.establish(), Ok(()));

        let (remote, local) = zx::Socket::create_datagram();
        let (client_end, l2cap_params_requests) =
            create_request_stream::<bredr::L2capParametersExtMarker>();
        let ext = bredr::Channel {
            socket: Some(local),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ext_l2cap: Some(client_end),
            ..Default::default()
        };
        let transport = Channel::try_from(ext).unwrap();
        assert_matches!(s.receive_channel(transport), Ok(false));
        (remote, l2cap_params_requests)
    }

    /// Source endpoints request the flush timeout when they receive their transport.
    #[test]
    fn sets_flush_timeout_for_source_transports() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Source);
        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);

        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
            Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
                request,
                responder,
            }))) => {
                assert_eq!(
                    Some(StreamEndpoint::SRC_FLUSH_TIMEOUT.into_nanos()),
                    request.flush_timeout
                );
                responder.send(&request).expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };
    }

    /// Sink endpoints do not request a flush timeout.
    #[test]
    fn no_flush_timeout_for_sink_transports() {
        let mut exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);
        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);

        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
            Poll::Pending => {}
            x => panic!("Expected no request to set flush timeout, got {:?}", x),
        };
    }

    /// `get_configuration` reflects configure and abort.
    #[test]
    fn get_configuration() {
        let mut s = test_endpoint(EndpointType::Sink);

        // Nothing configured yet.
        assert!(s.get_configuration().is_none());

        let config = vec![
            ServiceCapability::MediaTransport,
            ServiceCapability::MediaCodec {
                media_type: MediaType::Audio,
                codec_type: MediaCodecType::new(0),
                codec_extra: vec![0x60, 0x0D, 0x02, 0x55],
            },
        ];

        assert_matches!(s.configure(&REMOTE_ID, config.clone()), Ok(()));

        match s.get_configuration() {
            Some(c) => assert_eq!(&config, c),
            x => panic!("Expected Ok from get_configuration but got {:?}", x),
        };

        s.abort();

        // Abort clears the configuration.
        assert!(s.get_configuration().is_none());
    }

    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Returns an update callback that counts its invocations, plus a handle to the counter.
    fn call_count_callback() -> (Option<StreamEndpointUpdateCallback>, Arc<AtomicUsize>) {
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count_reader = call_count.clone();
        let count_cb: StreamEndpointUpdateCallback = Box::new(move |_stream: &StreamEndpoint| {
            let _ = call_count.fetch_add(1, Ordering::SeqCst);
        });
        (Some(count_cb), call_count_reader)
    }

    /// Every state-changing operation invokes the update callback at least once.
    #[test]
    fn update_callback() {
        // Needed because receiving a channel and starting spawn tasks.
        let _exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);
        let (cb, call_count) = call_count_callback();
        s.set_update_callback(cb);

        s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
            .expect("Configure to succeed in test");
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        s.establish().expect("Establish to succeed in test");
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        let (_, transport) = Channel::create();
        assert_eq!(
            s.receive_channel(transport).expect("Receive channel to succeed in test"),
            false
        );
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        s.start().expect("Start to succeed in test");
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        s.suspend().expect("Suspend to succeed in test");
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        s.reconfigure(vec![]).expect("Reconfigure to succeed in test");
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
        call_count.store(0, Ordering::SeqCst);

        // Abort changes state (Aborting, then Idle), so it must notify as well.
        s.abort();
        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    }
}