1use fuchsia_async::{DurationExt, Task, TimeoutExt};
6use fuchsia_bluetooth::types::{A2dpDirection, Channel};
7use fuchsia_sync::Mutex;
8use futures::stream::Stream;
9use futures::{io, FutureExt};
10use log::warn;
11use std::fmt;
12use std::pin::Pin;
13use std::sync::{Arc, RwLock, Weak};
14use std::task::{Context, Poll};
15use zx::{MonotonicDuration, Status};
16
17use crate::types::{
18 EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result as AvdtpResult,
19 ServiceCapability, ServiceCategory, StreamEndpointId, StreamInformation,
20};
21use crate::{Peer, SimpleResponder};
22
23pub type StreamEndpointUpdateCallback = Box<dyn Fn(&StreamEndpoint) -> () + Sync + Send>;
24
/// The state of a [`StreamEndpoint`].
///
/// Equality on this fieldless enum is total, so `Eq` is derived alongside `PartialEq`; this
/// lets the state be used wherever a full equivalence relation is required.
#[derive(PartialEq, Eq, Debug, Default, Clone, Copy)]
pub enum StreamState {
    /// Nothing is configured. This is the default state of a freshly built endpoint.
    #[default]
    Idle,
    /// A configuration has been accepted by `configure`.
    Configured,
    /// `establish` has been accepted, but the transport channel has not been received yet.
    Opening,
    /// The transport channel has been received; the stream is not started (or was suspended).
    Open,
    /// The stream has been started.
    Streaming,
    /// A release has been accepted and the endpoint is waiting for the transport to close.
    Closing,
    /// The stream is being aborted.
    Aborting,
}
38
/// A local stream endpoint: tracks its identity, capabilities, the current configuration and
/// state of a stream, and the transport channel used for media once the stream is open.
pub struct StreamEndpoint {
    /// Local stream endpoint id, as returned by `local_id()`.
    id: StreamEndpointId,
    /// Whether this endpoint is a Source or a Sink.
    endpoint_type: EndpointType,
    /// The media type reported in this endpoint's `StreamInformation`.
    media_type: MediaType,
    /// Current state. Shared behind an `Arc` because the task spawned by `release` updates it
    /// after `release` has returned.
    state: Arc<Mutex<StreamState>>,
    /// The media transport channel, present once `receive_channel` has accepted one.
    /// `MediaStream`s handed out by `take_transport` only hold a weak reference to it.
    transport: Option<Arc<RwLock<Channel>>>,
    /// True while a `MediaStream` obtained from `take_transport` is alive; the stream resets
    /// it to false when dropped.
    stream_held: Arc<Mutex<bool>>,
    /// The capabilities this endpoint supports; configurations are validated against them.
    capabilities: Vec<ServiceCapability>,
    /// The remote endpoint id, set by `configure` and cleared on release / abort.
    remote_id: Option<StreamEndpointId>,
    /// The current configuration. Empty when the endpoint is not configured.
    configuration: Vec<ServiceCapability>,
    /// Optional callback run after the endpoint's state or configuration is updated.
    update_callback: Option<StreamEndpointUpdateCallback>,
    /// The task started by `release` that waits for the transport to close.
    in_progress: Option<Task<()>>,
}
70
71impl fmt::Debug for StreamEndpoint {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 f.debug_struct("StreamEndpoint")
74 .field("id", &self.id.0)
75 .field("endpoint_type", &self.endpoint_type)
76 .field("media_type", &self.media_type)
77 .field("state", &self.state)
78 .field("capabilities", &self.capabilities)
79 .field("remote_id", &self.remote_id.as_ref().map(|id| id.to_string()))
80 .field("configuration", &self.configuration)
81 .finish()
82 }
83}
84
85impl StreamEndpoint {
86 pub fn new(
90 id: u8,
91 media_type: MediaType,
92 endpoint_type: EndpointType,
93 capabilities: Vec<ServiceCapability>,
94 ) -> AvdtpResult<StreamEndpoint> {
95 let seid = StreamEndpointId::try_from(id)?;
96 Ok(StreamEndpoint {
97 id: seid,
98 capabilities,
99 media_type,
100 endpoint_type,
101 state: Default::default(),
102 transport: None,
103 stream_held: Arc::new(Mutex::new(false)),
104 remote_id: None,
105 configuration: vec![],
106 update_callback: None,
107 in_progress: None,
108 })
109 }
110
111 pub fn as_new(&self) -> Self {
112 StreamEndpoint::new(
113 self.id.0,
114 self.media_type.clone(),
115 self.endpoint_type.clone(),
116 self.capabilities.clone(),
117 )
118 .expect("as_new")
119 }
120
121 fn set_state(&mut self, state: StreamState) {
123 *self.state.lock() = state;
124 self.update_callback();
125 }
126
127 pub fn set_update_callback(&mut self, callback: Option<StreamEndpointUpdateCallback>) {
130 self.update_callback = callback;
131 }
132
133 fn update_callback(&self) {
134 if let Some(cb) = self.update_callback.as_ref() {
135 cb(self);
136 }
137 }
138
139 pub fn from_info(
143 info: &StreamInformation,
144 capabilities: Vec<ServiceCapability>,
145 ) -> StreamEndpoint {
146 StreamEndpoint {
147 id: info.id().clone(),
148 capabilities,
149 media_type: info.media_type().clone(),
150 endpoint_type: info.endpoint_type().clone(),
151 state: Default::default(),
152 transport: None,
153 stream_held: Arc::new(Mutex::new(false)),
154 remote_id: None,
155 configuration: vec![],
156 update_callback: None,
157 in_progress: None,
158 }
159 }
160
161 fn state_is(&self, state: StreamState) -> Result<(), ErrorCode> {
164 (*self.state.lock() == state).then_some(()).ok_or(ErrorCode::BadState)
165 }
166
167 pub fn configure(
171 &mut self,
172 remote_id: &StreamEndpointId,
173 capabilities: Vec<ServiceCapability>,
174 ) -> Result<(), (ServiceCategory, ErrorCode)> {
175 self.state_is(StreamState::Idle).map_err(|e| (ServiceCategory::None, e))?;
176 self.remote_id = Some(remote_id.clone());
177 for cap in &capabilities {
178 if !self
179 .capabilities
180 .iter()
181 .any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y))
182 {
183 return Err((cap.category(), ErrorCode::UnsupportedConfiguration));
184 }
185 }
186 self.configuration = capabilities;
187 self.set_state(StreamState::Configured);
188 Ok(())
189 }
190
191 pub fn reconfigure(
196 &mut self,
197 mut capabilities: Vec<ServiceCapability>,
198 ) -> Result<(), (ServiceCategory, ErrorCode)> {
199 self.state_is(StreamState::Open).map_err(|e| (ServiceCategory::None, e))?;
200 if let Some(cap) = capabilities.iter().find(|x| !x.is_application()) {
202 return Err((cap.category(), ErrorCode::InvalidCapabilities));
203 }
204 let to_replace: std::vec::Vec<_> =
206 capabilities.iter().map(|x| std::mem::discriminant(x)).collect();
207 self.configuration.retain(|x| {
208 let disc = std::mem::discriminant(x);
209 !to_replace.contains(&disc)
210 });
211 self.configuration.append(&mut capabilities);
212 self.update_callback();
213 Ok(())
214 }
215
216 pub fn get_configuration(&self) -> Option<&Vec<ServiceCapability>> {
220 if self.configuration.is_empty() {
221 return None;
222 }
223 Some(&self.configuration)
224 }
225
/// Flush timeout (100 ms) requested on the transport channel by `receive_channel`; it is
/// only applied when this endpoint is a Source (see `try_flush_timeout`).
const SRC_FLUSH_TIMEOUT: MonotonicDuration = MonotonicDuration::from_millis(100);
229
230 pub fn receive_channel(&mut self, c: Channel) -> AvdtpResult<bool> {
237 if self.state_is(StreamState::Opening).is_err() || self.transport.is_some() {
238 return Err(Error::InvalidState);
239 }
240 self.transport = Some(Arc::new(RwLock::new(c)));
241 self.try_flush_timeout(Self::SRC_FLUSH_TIMEOUT);
242 self.stream_held = Arc::new(Mutex::new(false));
243 self.set_state(StreamState::Open);
245 Ok(false)
246 }
247
248 pub fn establish(&mut self) -> Result<(), ErrorCode> {
251 if self.state_is(StreamState::Configured).is_err() || self.transport.is_some() {
252 return Err(ErrorCode::BadState);
253 }
254 self.set_state(StreamState::Opening);
255 Ok(())
256 }
257
258 pub fn try_priority(&self, active: bool) {
262 let priority = match (active, &self.endpoint_type) {
263 (false, _) => A2dpDirection::Normal,
264 (true, EndpointType::Source) => A2dpDirection::Source,
265 (true, EndpointType::Sink) => A2dpDirection::Sink,
266 };
267 let fut = match self.transport.as_ref().unwrap().try_read() {
268 Err(_) => return,
269 Ok(channel) => channel.set_audio_priority(priority).map(|_| ()),
270 };
271 Task::spawn(fut).detach();
273 }
274
275 pub fn try_flush_timeout(&self, timeout: MonotonicDuration) {
277 if self.endpoint_type != EndpointType::Source {
278 return;
279 }
280 let fut = match self.transport.as_ref().unwrap().try_write() {
281 Err(_) => return,
282 Ok(channel) => channel.set_flush_timeout(Some(timeout)).map(|_| ()),
283 };
284 Task::spawn(fut).detach();
286 }
287
/// Releases (closes) an open or streaming stream in response to a peer request.
///
/// If the endpoint is neither `Open` nor `Streaming`, the request is rejected through
/// `responder` with `ErrorCode::BadState`. Otherwise the endpoint moves to `Closing`, the
/// response is sent, and a local task is started that waits for the transport channel to
/// close. If the channel has not closed after 3 seconds, the task aborts the stream through
/// `peer`. In every case the task finally sets the state to `Idle`.
///
/// The configuration is cleared and the update callback is run before returning.
///
/// # Errors
///
/// Returns the error from rejecting or sending the response, if any.
pub fn release(&mut self, responder: SimpleResponder, peer: &Peer) -> AvdtpResult<()> {
    {
        // Scoped so the state lock is released before `set_state` locks it again.
        let lock = self.state.lock();
        if *lock != StreamState::Open && *lock != StreamState::Streaming {
            return responder.reject(ErrorCode::BadState);
        }
    }
    self.set_state(StreamState::Closing);
    responder.send()?;
    let release_wait_fut = {
        // Take the remote id and transport: the task below owns them from here on.
        // NOTE(review): these unwraps assume both are always present in the Open / Streaming
        // states — confirm no caller can clear them while the stream is open.
        let seid = self.remote_id.take().unwrap();
        let transport = self.transport.take().unwrap();
        let peer = peer.clone();
        let state = self.state.clone();
        async move {
            let Ok(transport) = transport.try_read() else {
                warn!("unable to lock transport channel, dropping and assuming closed");
                *state.lock() = StreamState::Idle;
                return;
            };
            // Wait for the channel to close, giving up after 3 seconds.
            let closed_fut = transport
                .closed()
                .on_timeout(MonotonicDuration::from_seconds(3).after_now(), || {
                    Err(Status::TIMED_OUT)
                });
            if let Err(Status::TIMED_OUT) = closed_fut.await {
                // The channel did not close in time: abort the stream instead.
                let _ = peer.abort(&seid).await;
                *state.lock() = StreamState::Aborting;
                drop(transport);
            }
            *state.lock() = StreamState::Idle;
        }
    };
    self.in_progress = Some(Task::local(release_wait_fut));
    // The endpoint ends up Idle either way, so it keeps no configuration.
    self.configuration.clear();
    self.update_callback();
    Ok(())
}
334
335 pub fn state(&self) -> StreamState {
337 *self.state.lock()
338 }
339
340 pub fn start(&mut self) -> Result<(), ErrorCode> {
343 self.state_is(StreamState::Open)?;
344 self.try_priority(true);
345 self.set_state(StreamState::Streaming);
346 Ok(())
347 }
348
349 pub fn suspend(&mut self) -> Result<(), ErrorCode> {
352 self.state_is(StreamState::Streaming)?;
353 self.set_state(StreamState::Open);
354 self.try_priority(false);
355 Ok(())
356 }
357
358 pub async fn initiate_abort<'a>(&'a mut self, peer: &'a Peer) {
362 if let Some(seid) = self.remote_id.take() {
363 let _ = peer.abort(&seid).await;
364 self.set_state(StreamState::Aborting);
365 }
366 self.abort()
367 }
368
369 pub fn abort(&mut self) {
372 self.set_state(StreamState::Aborting);
373 self.configuration.clear();
374 self.remote_id = None;
375 self.transport = None;
376 self.set_state(StreamState::Idle);
377 }
378
379 pub fn capabilities(&self) -> &Vec<ServiceCapability> {
383 &self.capabilities
384 }
385
386 pub fn codec_type(&self) -> Option<&MediaCodecType> {
390 self.capabilities.iter().find_map(|cap| match cap {
391 ServiceCapability::MediaCodec { codec_type, .. } => Some(codec_type),
392 _ => None,
393 })
394 }
395
396 pub fn local_id(&self) -> &StreamEndpointId {
398 &self.id
399 }
400
401 pub fn remote_id(&self) -> Option<&StreamEndpointId> {
403 self.remote_id.as_ref()
404 }
405
406 pub fn endpoint_type(&self) -> &EndpointType {
408 &self.endpoint_type
409 }
410
411 pub fn information(&self) -> StreamInformation {
413 let in_use = self.state_is(StreamState::Idle).is_err();
414 StreamInformation::new(
415 self.id.clone(),
416 in_use,
417 self.media_type.clone(),
418 self.endpoint_type.clone(),
419 )
420 }
421
422 pub fn take_transport(&mut self) -> Option<MediaStream> {
426 let mut stream_held = self.stream_held.lock();
427 if *stream_held || self.transport.is_none() {
428 return None;
429 }
430
431 *stream_held = true;
432
433 Some(MediaStream::new(
434 self.stream_held.clone(),
435 Arc::downgrade(self.transport.as_ref().unwrap()),
436 ))
437 }
438}
439
/// A handle for reading from and writing to a stream's transport channel.
///
/// It refers to the channel weakly, so once the channel is gone, operations on the stream
/// fail or end instead of keeping the channel alive.
pub struct MediaStream {
    /// Flag shared with the creator of this stream; set back to false when the stream is
    /// dropped.
    in_use: Arc<Mutex<bool>>,
    /// Weak reference to the transport channel.
    channel: Weak<RwLock<Channel>>,
}
447
448impl MediaStream {
449 pub fn new(in_use: Arc<Mutex<bool>>, channel: Weak<RwLock<Channel>>) -> Self {
450 Self { in_use, channel }
451 }
452
453 fn try_upgrade(&self) -> Result<Arc<RwLock<Channel>>, io::Error> {
454 self.channel
455 .upgrade()
456 .ok_or_else(|| io::Error::new(io::ErrorKind::ConnectionAborted, "lost connection"))
457 }
458
459 pub fn max_tx_size(&self) -> Result<usize, io::Error> {
460 match self.try_upgrade()?.try_read() {
461 Err(_e) => return Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")),
462 Ok(lock) => Ok(lock.max_tx_size()),
463 }
464 }
465}
466
467impl Drop for MediaStream {
468 fn drop(&mut self) {
469 let mut l = self.in_use.lock();
470 *l = false;
471 }
472}
473
474impl Stream for MediaStream {
475 type Item = AvdtpResult<Vec<u8>>;
476
477 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
478 let arc_chan = match self.try_upgrade() {
479 Err(_e) => return Poll::Ready(None),
480 Ok(c) => c,
481 };
482 let lock = match arc_chan.try_write() {
483 Err(_e) => return Poll::Ready(None),
484 Ok(lock) => lock,
485 };
486 let mut pin_chan = Pin::new(lock);
487 match pin_chan.as_mut().poll_next(cx) {
488 Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
489 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::PeerRead(e)))),
490 Poll::Ready(None) => Poll::Ready(None),
491 Poll::Pending => Poll::Pending,
492 }
493 }
494}
495
496impl io::AsyncWrite for MediaStream {
497 fn poll_write(
498 self: Pin<&mut Self>,
499 cx: &mut Context<'_>,
500 buf: &[u8],
501 ) -> Poll<Result<usize, io::Error>> {
502 let arc_chan = match self.try_upgrade() {
503 Err(e) => return Poll::Ready(Err(e)),
504 Ok(c) => c,
505 };
506 let lock = match arc_chan.try_write() {
507 Err(_) => {
508 return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
509 }
510 Ok(lock) => lock,
511 };
512 let mut pin_chan = Pin::new(lock);
513 pin_chan.as_mut().poll_write(cx, buf)
514 }
515
516 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
517 let arc_chan = match self.try_upgrade() {
518 Err(e) => return Poll::Ready(Err(e)),
519 Ok(c) => c,
520 };
521 let lock = match arc_chan.try_write() {
522 Err(_) => {
523 return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
524 }
525 Ok(lock) => lock,
526 };
527 let mut pin_chan = Pin::new(lock);
528 pin_chan.as_mut().poll_flush(cx)
529 }
530
531 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
532 let arc_chan = match self.try_upgrade() {
533 Err(e) => return Poll::Ready(Err(e)),
534 Ok(c) => c,
535 };
536 let lock = match arc_chan.try_write() {
537 Err(_) => {
538 return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
539 }
540 Ok(lock) => lock,
541 };
542 let mut pin_chan = Pin::new(lock);
543 pin_chan.as_mut().poll_close(cx)
544 }
545}
546
547#[cfg(test)]
548mod tests {
549 use super::*;
550 use crate::tests::{expect_remote_recv, setup_peer};
551 use crate::Request;
552
553 use assert_matches::assert_matches;
554 use fidl::endpoints::create_request_stream;
555 use futures::io::AsyncWriteExt;
556 use futures::stream::StreamExt;
557 use {
558 fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
559 fuchsia_async as fasync,
560 };
561
// Raw value and typed form of the stream endpoint id used throughout these tests, both as the
// remote id passed to `configure` and as the local id of the test endpoints.
const REMOTE_ID_VAL: u8 = 1;
const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL);
564
/// A valid id produces an endpoint that is not in use; id 0 is rejected.
#[test]
fn make() {
    let created = StreamEndpoint::new(
        REMOTE_ID_VAL,
        MediaType::Audio,
        EndpointType::Sink,
        vec![ServiceCapability::MediaTransport],
    );
    assert!(created.is_ok());
    let endpoint = created.unwrap();
    assert_eq!(&StreamEndpointId(1), endpoint.local_id());

    // A freshly made endpoint is idle, so it is not reported as in use.
    assert!(!endpoint.information().in_use());

    let rejected = StreamEndpoint::new(
        0,
        MediaType::Audio,
        EndpointType::Sink,
        vec![ServiceCapability::MediaTransport],
    );
    assert!(rejected.is_err());
}
588
589 fn establish_stream(s: &mut StreamEndpoint) -> Channel {
590 assert_matches!(s.establish(), Ok(()));
591 let (chan, remote) = Channel::create();
592 assert_matches!(s.receive_channel(chan), Ok(false));
593 remote
594 }
595
/// An endpoint built from a `StreamInformation` keeps its id and capabilities and is idle.
#[test]
fn from_info() {
    let seid = StreamEndpointId::try_from(5).unwrap();
    let info = StreamInformation::new(seid.clone(), false, MediaType::Audio, EndpointType::Sink);

    let endpoint = StreamEndpoint::from_info(&info, vec![ServiceCapability::MediaTransport]);

    assert_eq!(&seid, endpoint.local_id());
    assert_eq!(&false, endpoint.information().in_use());
    assert_eq!(1, endpoint.capabilities().len());
}
609
/// `codec_type` reports the codec of the MediaCodec capability, or `None` without one.
#[test]
fn codec_type() {
    let with_codec = vec![
        ServiceCapability::MediaTransport,
        ServiceCapability::MediaCodec {
            media_type: MediaType::Audio,
            codec_type: MediaCodecType::new(0x40),
            codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF],
        },
    ];
    let s = StreamEndpoint::new(REMOTE_ID_VAL, MediaType::Audio, EndpointType::Sink, with_codec)
        .unwrap();

    assert_eq!(Some(&MediaCodecType::new(0x40)), s.codec_type());

    let without_codec = vec![ServiceCapability::MediaTransport];
    let s =
        StreamEndpoint::new(REMOTE_ID_VAL, MediaType::Audio, EndpointType::Sink, without_codec)
            .unwrap();

    assert_eq!(None, s.codec_type());
}
639
640 fn test_endpoint(r#type: EndpointType) -> StreamEndpoint {
641 StreamEndpoint::new(
642 REMOTE_ID_VAL,
643 MediaType::Audio,
644 r#type,
645 vec![
646 ServiceCapability::MediaTransport,
647 ServiceCapability::MediaCodec {
648 media_type: MediaType::Audio,
649 codec_type: MediaCodecType::new(0x40),
650 codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], },
652 ],
653 )
654 .unwrap()
655 }
656
/// Walks an endpoint through configure / reconfigure and checks which states accept them.
#[test]
fn stream_configure_reconfigure() {
    let _exec = fasync::TestExecutor::new();
    let mut s = test_endpoint(EndpointType::Sink);

    // A capability kind the endpoint does not have is rejected.
    assert_matches!(
        s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting]),
        Err((ServiceCategory::Reporting, ErrorCode::UnsupportedConfiguration))
    );

    let initial_configuration = vec![
        ServiceCapability::MediaTransport,
        ServiceCapability::MediaCodec {
            media_type: MediaType::Audio,
            codec_type: MediaCodecType::new(0x40),
            codec_extra: vec![0x0C, 0x0D, 0x02, 0x51],
        },
    ];
    assert_matches!(s.configure(&REMOTE_ID, initial_configuration), Ok(()));

    // Keep the remote end alive for the rest of the test.
    let _channel = establish_stream(&mut s);

    // Configure is not accepted once the stream is open.
    assert_matches!(
        s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
        Err((_, ErrorCode::BadState))
    );

    let reconfiguration = vec![ServiceCapability::MediaCodec {
        media_type: MediaType::Audio,
        codec_type: MediaCodecType::new(0x40),
        codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
    }];

    // The codec entry is replaced; the transport entry is kept.
    let new_configuration = vec![ServiceCapability::MediaTransport, reconfiguration[0].clone()];

    // Reconfiguring while open is fine.
    assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));

    assert_eq!(Some(&new_configuration), s.get_configuration());

    // Non-application capabilities cannot be reconfigured.
    assert_matches!(
        s.reconfigure(vec![ServiceCapability::MediaTransport]),
        Err((ServiceCategory::MediaTransport, ErrorCode::InvalidCapabilities))
    );

    assert_matches!(s.start(), Ok(()));

    // Neither configure nor reconfigure is accepted while streaming.
    assert_matches!(
        s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
        Err((_, ErrorCode::BadState))
    );

    assert_matches!(s.reconfigure(reconfiguration.clone()), Err((_, ErrorCode::BadState)));

    assert_matches!(s.suspend(), Ok(()));

    // Back in the open state, reconfigure works again.
    assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));

    // Configure is still rejected.
    assert_matches!(
        s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
        Err((_, ErrorCode::BadState))
    );
}
738
/// Establishing and receiving a channel are only accepted in the right states.
#[test]
fn stream_establishment() {
    let _exec = fasync::TestExecutor::new();
    let mut s = test_endpoint(EndpointType::Sink);

    let (remote, transport) = Channel::create();

    // An unconfigured endpoint can't be established.
    assert_matches!(s.establish(), Err(ErrorCode::BadState));

    // Nor can it take a transport channel.
    assert_matches!(s.receive_channel(transport), Err(Error::InvalidState));

    // The rejected channel was dropped, so the other end observes the close.
    let mut buf = [0u8; 1];
    assert_matches!(remote.read(&mut buf[..]), Err(zx::Status::PEER_CLOSED));

    assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

    assert_matches!(s.establish(), Ok(()));

    // Once established, a transport channel is accepted.
    let (_remote, transport) = Channel::create();
    assert_matches!(s.receive_channel(transport), Ok(false));
}
764
765 fn setup_peer_for_release(exec: &mut fasync::TestExecutor) -> (Peer, Channel, SimpleResponder) {
766 let (peer, signaling) = setup_peer();
767 let _ = signaling.write(&[0x40, 0x08, 0x04]).expect("signaling write");
769 let mut req_stream = peer.take_request_stream();
770 let mut req_fut = req_stream.next();
771 let complete = exec.run_until_stalled(&mut req_fut);
772 let responder = match complete {
773 Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder,
774 _ => panic!("Expected a close request"),
775 };
776 (peer, signaling, responder)
777 }
778
/// Releasing a stream whose transport is closed by the remote ends in `Idle` without an abort.
#[test]
fn stream_release_without_abort() {
    let mut exec = fasync::TestExecutor::new();
    let mut s = test_endpoint(EndpointType::Sink);

    assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

    let remote_transport = establish_stream(&mut s);

    let (peer, signaling, responder) = setup_peer_for_release(&mut exec);

    s.release(responder, &peer).unwrap();
    // The release response is sent on the signaling channel.
    expect_remote_recv(&[0x42, 0x08], &signaling);

    // The remote closes its end of the transport.
    drop(remote_transport);

    // Let the release task observe the close.
    let mut never = futures::future::pending::<()>();
    let _ = exec.run_until_stalled(&mut never);
    assert_eq!(s.state(), StreamState::Idle);
}
802
/// Exercises taking, using and invalidating the `MediaStream` of an endpoint.
#[test]
fn test_mediastream() {
    let mut exec = fasync::TestExecutor::new();
    let mut s = test_endpoint(EndpointType::Sink);

    assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

    // No transport yet, so there is nothing to take.
    assert!(s.take_transport().is_none());

    let remote_transport = establish_stream(&mut s);

    // With a transport in place the stream can be taken...
    let first_stream = s.take_transport();
    assert!(first_stream.is_some());

    // ...but only once at a time...
    assert!(s.take_transport().is_none());

    // ...until the outstanding stream is dropped.
    drop(first_stream);

    let retaken = s.take_transport();
    assert!(retaken.is_some());
    let mut media_stream = retaken.unwrap();

    assert_matches!(media_stream.max_tx_size(), Ok(Channel::DEFAULT_MAX_TX));

    // Writes go through to the remote end of the transport.
    let hearts = &[0xF0, 0x9F, 0x92, 0x96, 0xF0, 0x9F, 0x92, 0x96];
    let mut write_fut = media_stream.write(hearts);

    assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Ok(8)));

    expect_remote_recv(hearts, &remote_transport);

    let mut close_fut = media_stream.close();
    assert_matches!(exec.run_until_stalled(&mut close_fut), Poll::Ready(Ok(())));
    // Dropping the endpoint drops the transport channel it owns.
    drop(s);

    // The remote end now sees the channel as closed.
    let mut result = vec![0];
    assert_matches!(remote_transport.read(&mut result[..]), Err(zx::Status::PEER_CLOSED));

    // With the channel gone: writes fail, the stream ends, and the size query errors.
    let mut write_fut = media_stream.write(&[0xDE, 0xAD]);
    assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Err(_)));

    let mut next_fut = media_stream.next();
    assert_matches!(exec.run_until_stalled(&mut next_fut), Poll::Ready(None));

    assert_matches!(media_stream.max_tx_size(), Err(_));
}
863
/// When the remote never closes the transport, the release falls back to an abort and the
/// endpoint still ends up `Idle`.
#[test]
fn stream_release_with_abort() {
    let mut exec = fasync::TestExecutor::new();
    let mut s = test_endpoint(EndpointType::Sink);

    assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
    let remote_transport = establish_stream(&mut s);
    let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);

    s.release(responder, &peer).unwrap();
    // The release response is sent on the signaling channel.
    expect_remote_recv(&[0x42, 0x08], &signaling);

    // The remote transport is kept open, so the next signaling message is the one sent after
    // the release wait times out; its second byte must be 0x0A.
    let next_message = std::pin::pin!(signaling.next());
    let received = exec
        .run_singlethreaded(next_message)
        .expect("channel not closed")
        .expect("successful read");
    assert_eq!(0x0A, received[1]);
    // Reply with the same transaction label (upper nibble of the first byte).
    let txlabel = received[0] & 0xF0;
    assert!(signaling.write(&[txlabel | 0x02, 0x0A]).is_ok());

    // The local side closes the transport as part of the abort.
    let _ = exec.run_singlethreaded(&mut remote_transport.closed());

    // Run the executor until the release task has returned the endpoint to Idle.
    while s.state() != StreamState::Idle {
        let mut never = futures::future::pending::<()>();
        let _ = exec.run_until_stalled(&mut never);
    }
}
894
/// `start` and `suspend` are only accepted in the right states, and each successful call
/// issues the matching audio priority request on the transport.
#[test]
fn start_and_suspend() {
    let mut exec = fasync::TestExecutor::new();
    let mut s = test_endpoint(EndpointType::Sink);

    // Idle: neither works.
    assert_matches!(s.start(), Err(ErrorCode::BadState));
    assert_matches!(s.suspend(), Err(ErrorCode::BadState));

    assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));

    // Configured: neither works.
    assert_matches!(s.start(), Err(ErrorCode::BadState));
    assert_matches!(s.suspend(), Err(ErrorCode::BadState));

    assert_matches!(s.establish(), Ok(()));

    // Opening: neither works.
    assert_matches!(s.start(), Err(ErrorCode::BadState));
    assert_matches!(s.suspend(), Err(ErrorCode::BadState));

    // Build a transport channel that carries the audio direction extension, so priority
    // requests can be observed.
    let (remote, local) = zx::Socket::create_datagram();
    let (client_end, mut direction_request_stream) =
        create_request_stream::<bredr::AudioDirectionExtMarker>();
    let ext = bredr::Channel {
        socket: Some(local),
        channel_mode: Some(fidl_bt::ChannelMode::Basic),
        max_tx_sdu_size: Some(1004),
        ext_direction: Some(client_end),
        ..Default::default()
    };
    let transport = Channel::try_from(ext).unwrap();
    assert_matches!(s.receive_channel(transport), Ok(false));

    // Open: suspend is rejected, start is accepted.
    assert_matches!(s.suspend(), Err(ErrorCode::BadState));
    assert_matches!(s.start(), Ok(()));

    // Checks that exactly the expected priority request is ready, and answers it.
    let mut expect_priority = |expected: bredr::A2dpDirectionPriority| {
        match exec.run_until_stalled(&mut direction_request_stream.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(expected, priority);
                responder.send(Ok(())).expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };
    };

    expect_priority(bredr::A2dpDirectionPriority::Sink);

    // Streaming: start is rejected, suspend is accepted.
    assert_matches!(s.start(), Err(ErrorCode::BadState));
    assert_matches!(s.suspend(), Ok(()));

    expect_priority(bredr::A2dpDirectionPriority::Normal);

    // Start and suspend can be repeated.
    assert_matches!(s.start(), Ok(()));
    assert_matches!(s.suspend(), Ok(()));

    let (peer, signaling, responder) = setup_peer_for_release(&mut exec);

    {
        s.release(responder, &peer).unwrap();
        expect_remote_recv(&[0x42, 0x08], &signaling);
        // Close the remote end of the transport so the release can finish.
        drop(remote);
        while s.state() != StreamState::Idle {
            let mut never = futures::future::pending::<()>();
            let _ = exec.run_until_stalled(&mut never);
        }
    }

    // Released (Idle again): neither works.
    assert_matches!(s.start(), Err(ErrorCode::BadState));
    assert_matches!(s.suspend(), Err(ErrorCode::BadState));
}
979
980 fn receive_l2cap_params_channel(
981 s: &mut StreamEndpoint,
982 ) -> (zx::Socket, bredr::L2capParametersExtRequestStream) {
983 assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
984 assert_matches!(s.establish(), Ok(()));
985
986 let (remote, local) = zx::Socket::create_datagram();
987 let (client_end, l2cap_params_requests) =
988 create_request_stream::<bredr::L2capParametersExtMarker>();
989 let ext = bredr::Channel {
990 socket: Some(local),
991 channel_mode: Some(fidl_bt::ChannelMode::Basic),
992 max_tx_sdu_size: Some(1004),
993 ext_l2cap: Some(client_end),
994 ..Default::default()
995 };
996 let transport = Channel::try_from(ext).unwrap();
997 assert_matches!(s.receive_channel(transport), Ok(false));
998 (remote, l2cap_params_requests)
999 }
1000
/// A Source endpoint requests the source flush timeout as soon as it receives its transport.
#[test]
fn sets_flush_timeout_for_source_transports() {
    let mut exec = fasync::TestExecutor::new();
    let mut s = test_endpoint(EndpointType::Source);
    let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);

    let polled = exec.run_until_stalled(&mut l2cap_params_requests.next());
    match polled {
        Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
            request,
            responder,
        }))) => {
            let expected = Some(StreamEndpoint::SRC_FLUSH_TIMEOUT.into_nanos());
            assert_eq!(expected, request.flush_timeout);
            responder.send(&request).expect("response to send cleanly");
        }
        x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
    };
}
1022
/// A Sink endpoint does not request a flush timeout when it receives its transport.
#[test]
fn no_flush_timeout_for_sink_transports() {
    let mut exec = fasync::TestExecutor::new();
    let mut s = test_endpoint(EndpointType::Sink);
    let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);

    // Nothing should be waiting on the request stream.
    let polled = exec.run_until_stalled(&mut l2cap_params_requests.next());
    if !polled.is_pending() {
        panic!("Expected no request to set flush timeout, got {:?}", polled);
    }
}
1035
/// `get_configuration` is `None` before configuring and after aborting, and returns the
/// accepted configuration in between.
#[test]
fn get_configuration() {
    let mut s = test_endpoint(EndpointType::Sink);

    // Nothing is configured yet.
    assert!(s.get_configuration().is_none());

    let config = vec![
        ServiceCapability::MediaTransport,
        ServiceCapability::MediaCodec {
            media_type: MediaType::Audio,
            codec_type: MediaCodecType::new(0),
            codec_extra: vec![0x60, 0x0D, 0x02, 0x55],
        },
    ];

    assert_matches!(s.configure(&REMOTE_ID, config.clone()), Ok(()));

    assert_eq!(Some(&config), s.get_configuration());

    // Aborting discards the configuration.
    s.abort();

    assert!(s.get_configuration().is_none());
}
1065
1066 use std::sync::atomic::{AtomicUsize, Ordering};
1067
1068 fn call_count_callback() -> (Option<StreamEndpointUpdateCallback>, Arc<AtomicUsize>) {
1070 let call_count = Arc::new(AtomicUsize::new(0));
1071 let call_count_reader = call_count.clone();
1072 let count_cb: StreamEndpointUpdateCallback = Box::new(move |_stream: &StreamEndpoint| {
1073 let _ = call_count.fetch_add(1, Ordering::SeqCst);
1074 });
1075 (Some(count_cb), call_count_reader)
1076 }
1077
/// Checks that the update callback runs at least once for each method that changes the
/// endpoint: configure, establish, receive_channel, start, suspend, reconfigure and abort.
///
/// Only the callback count is checked here; the resulting states are covered by other tests.
#[test]
fn update_callback() {
    let _exec = fasync::TestExecutor::new();
    let mut s = test_endpoint(EndpointType::Sink);
    let (cb, call_count) = call_count_callback();
    s.set_update_callback(cb);

    s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
        .expect("Configure to succeed in test");
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst); // clear the count before the next step

    s.establish().expect("Establish to succeed in test");
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    let (_, transport) = Channel::create();
    assert_eq!(
        s.receive_channel(transport).expect("Receive channel to succeed in test"),
        false
    );
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    s.start().expect("Start to succeed in test");
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    s.suspend().expect("Suspend to succeed in test");
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    s.reconfigure(vec![]).expect("Reconfigure to succeed in test");
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    // Abort must notify as well; without this assertion the abort step checked nothing.
    s.abort();
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
}
1124}