1use anyhow::Context as _;
6use bt_avdtp::{
7 self as avdtp, MediaCodecType, ServiceCapability, ServiceCategory, StreamEndpoint,
8 StreamEndpointId,
9};
10use fidl_fuchsia_bluetooth::ChannelParameters;
11use fidl_fuchsia_bluetooth_bredr::{
12 ConnectParameters, L2capParameters, PSM_AVDTP, ProfileDescriptor, ProfileProxy,
13};
14use fuchsia_async::{self as fasync, DurationExt};
15use fuchsia_bluetooth::inspect::DebugExt;
16use fuchsia_bluetooth::types::{Channel, PeerId};
17use fuchsia_inspect as inspect;
18use fuchsia_inspect_derive::{AttachError, Inspect};
19use fuchsia_sync::Mutex;
20use futures::channel::mpsc;
21use futures::future::{BoxFuture, Either};
22use futures::stream::FuturesUnordered;
23use futures::task::{Context, Poll, Waker};
24use futures::{Future, FutureExt, StreamExt, select};
25use log::{debug, info, trace, warn};
26use std::collections::{BTreeMap, HashMap, HashSet};
27use std::pin::Pin;
28use std::sync::{Arc, Weak};
29
30mod controller;
32pub use controller::ControllerPool;
33
34use crate::codec::MediaCodecConfig;
35use crate::permits::{Permit, Permits};
36use crate::stream::{Stream, Streams};
37
/// A connected A2DP peer.
///
/// Wraps the shared per-peer state (`PeerInner`) together with the handles needed to open
/// transport channels, record metrics and notify waiters when the peer disconnects.
#[derive(Inspect)]
pub struct Peer {
    /// Id of the remote peer.
    id: PeerId,
    /// Shared peer state. Tasks and futures created by this type hold it through a `Weak`.
    #[inspect(forward)]
    inner: Arc<Mutex<PeerInner>>,
    /// Profile proxy used by `stream_start` to connect the media transport channel.
    profile: ProfileProxy,
    /// Descriptor stored by `set_descriptor`. `collect_capabilities` passes it to
    /// `a2dp_version_check` to choose between GetCapabilities and GetAllCapabilities.
    descriptor: Mutex<Option<ProfileDescriptor>>,
    /// Wakers registered by `ClosedPeer` futures. The request task takes the list (leaving
    /// `None`) and wakes each entry when its loop finishes.
    closed_wakers: Arc<Mutex<Option<Vec<Waker>>>>,
    /// Logger used to record metrics about the peer.
    metrics: bt_metrics::MetricsLogger,
    /// Task created by `receive_channel` that waits `STREAM_DWELL` and then starts the opened
    /// streams. Polled (and cleared once finished) by `will_start_streaming`.
    start_stream_task: Mutex<Option<fasync::Task<avdtp::Result<()>>>>,
}
60
/// Bookkeeping for the streaming permits held and reserved on behalf of one peer.
///
/// The `open_streams` and `reserved_streams` collections are behind `Arc`s, so every clone of
/// this struct observes and updates the same entries.
#[derive(Clone)]
struct StreamPermits {
    /// Source from which permits are taken and reservations are made.
    permits: Permits,
    /// Permits currently held, keyed by the local stream id they were issued for.
    open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
    /// Local stream ids that have a reservation queued which has not resolved yet.
    reserved_streams: Arc<Mutex<HashSet<StreamEndpointId>>>,
    /// Weak handle to the peer state, used by the revocation callback to suspend a stream.
    inner: Weak<Mutex<PeerInner>>,
    /// Id of the peer; used in permit labels and log messages.
    peer_id: PeerId,
    /// Sending half of the queue of reservation futures; `new` returns the receiving half.
    sender: mpsc::UnboundedSender<BoxFuture<'static, StreamPermit>>,
}
73
/// Handle for a permit held for one local stream.
///
/// The `Permit` itself lives in the shared `open_streams` map; dropping this handle removes the
/// entry stored under `local_id`.
#[derive(Debug)]
struct StreamPermit {
    /// Local stream id the permit was issued for.
    local_id: StreamEndpointId,
    /// Map shared with `StreamPermits` that stores the permits currently held.
    open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
}
79
80impl StreamPermit {
81 fn local_id(&self) -> &StreamEndpointId {
82 &self.local_id
83 }
84
85 fn is_held(&self) -> bool {
87 self.open_streams.lock().contains_key(&self.local_id)
88 }
89}
90
91impl Drop for StreamPermit {
92 fn drop(&mut self) {
93 let _ = self.open_streams.lock().remove(&self.local_id);
94 }
95}
96
97impl StreamPermits {
98 fn new(
99 inner: Weak<Mutex<PeerInner>>,
100 peer_id: PeerId,
101 permits: Permits,
102 ) -> (Self, mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>) {
103 let (sender, reservations_receiver) = futures::channel::mpsc::unbounded();
104 (
105 Self {
106 inner,
107 permits,
108 peer_id,
109 sender,
110 open_streams: Default::default(),
111 reserved_streams: Default::default(),
112 },
113 reservations_receiver,
114 )
115 }
116
117 fn label_for(&self, local_id: &StreamEndpointId) -> String {
118 format!("{} {}", self.peer_id, local_id)
119 }
120
121 fn get(&self, local_id: StreamEndpointId) -> Option<StreamPermit> {
124 let revoke_fn = self.make_revocation_fn(&local_id);
125 let Some(permit) = self.permits.get_revokable(revoke_fn) else {
126 info!("No permits available: {:?}", self.permits);
127 return None;
128 };
129 permit.relabel(self.label_for(&local_id));
130 if let Some(_) = self.open_streams.lock().insert(local_id.clone(), permit) {
131 warn!(id:% = self.peer_id; "Started stream {local_id:?} twice, dropping previous permit");
132 }
133 Some(StreamPermit { local_id, open_streams: self.open_streams.clone() })
134 }
135
136 fn setup_reservation_for(&self, local_id: StreamEndpointId) {
139 if !self.reserved_streams.lock().insert(local_id.clone()) {
140 return;
142 }
143 let restart_stream_available_fut = {
144 let self_revoke_fn = Self::make_revocation_fn(&self, &local_id);
145 let reservation = self.permits.reserve_revokable(self_revoke_fn);
146 let open_streams = self.open_streams.clone();
147 let reserved_streams = self.reserved_streams.clone();
148 let label = self.label_for(&local_id);
149 let local_id = local_id.clone();
150 async move {
151 let permit = reservation.await;
152 permit.relabel(label);
153 if open_streams.lock().insert(local_id.clone(), permit).is_some() {
154 warn!("Reservation replaces acquired permit for {}", local_id.clone());
155 }
156 if !reserved_streams.lock().remove(&local_id) {
157 warn!(local_id:%; "Unrecorded reservation resolved");
158 }
159 StreamPermit { local_id, open_streams }
160 }
161 };
162 if let Err(e) = self.sender.unbounded_send(restart_stream_available_fut.boxed()) {
163 warn!(id:% = self.peer_id, local_id:%, e:?; "Couldn't queue reservation to finish");
164 }
165 }
166
/// Body of the revocation callback: stops the local stream `local_id` and returns its permit.
///
/// When the peer state is still alive, the local stream is suspended, a suspend is issued
/// towards the remote endpoint, and a reservation is queued for the stream through
/// `setup_reservation_for`.
///
/// # Panics
///
/// Panics if `open_streams` has no permit stored under `local_id`.
fn revocation_fn(self, local_id: StreamEndpointId) -> Permit {
    if let Ok(peer) = PeerInner::upgrade(self.inner.clone()) {
        {
            // Inner scope: the peer lock is released before the reservation is queued.
            let mut lock = peer.lock();
            match lock.suspend_local_stream(&local_id) {
                // The future returned by `suspend` is dropped without being awaited.
                // NOTE(review): presumably the request is already sent by the call itself;
                // confirm against bt_avdtp.
                Ok(remote_id) => drop(lock.peer.suspend(&[remote_id])),
                Err(e) => warn!("Couldn't stop local stream {local_id:?}: {e:?}"),
            }
        }
        self.setup_reservation_for(local_id.clone());
    }
    self.open_streams.lock().remove(&local_id).expect("permit revoked but don't have it")
}
184
185 fn make_revocation_fn(&self, local_id: &StreamEndpointId) -> impl FnOnce() -> Permit + use<> {
186 let local_id = local_id.clone();
187 let cloned = self.clone();
188 move || cloned.revocation_fn(local_id)
189 }
190}
191
192impl Peer {
193 pub fn create(
200 id: PeerId,
201 peer: avdtp::Peer,
202 streams: Streams,
203 permits: Option<Permits>,
204 profile: ProfileProxy,
205 metrics: bt_metrics::MetricsLogger,
206 ) -> Self {
207 let inner = Arc::new(Mutex::new(PeerInner::new(peer, id, streams, metrics.clone())));
208 let reservations_receiver = if let Some(permits) = permits {
209 let (stream_permits, receiver) =
210 StreamPermits::new(Arc::downgrade(&inner), id, permits);
211 inner.lock().permits = Some(stream_permits);
212 receiver
213 } else {
214 let (_, receiver) = mpsc::unbounded();
215 receiver
216 };
217 let res = Self {
218 id,
219 inner,
220 profile,
221 descriptor: Mutex::new(None),
222 closed_wakers: Arc::new(Mutex::new(Some(Vec::new()))),
223 metrics,
224 start_stream_task: Mutex::new(None),
225 };
226 res.start_requests_task(reservations_receiver);
227 res
228 }
229
230 pub fn set_descriptor(&self, descriptor: ProfileDescriptor) -> Option<ProfileDescriptor> {
231 self.descriptor.lock().replace(descriptor)
232 }
233
/// How long `receive_channel` waits, once a stream's transport is complete, before trying to
/// start the opened streams.
const STREAM_DWELL: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(500);
237
238 pub fn receive_channel(&self, channel: Channel) -> avdtp::Result<()> {
242 let mut lock = self.inner.lock();
243 if lock.receive_channel(channel)? {
244 let weak = Arc::downgrade(&self.inner);
245 let mut task_lock = self.start_stream_task.lock();
246 *task_lock = Some(fasync::Task::local(async move {
247 trace!("Dwelling to start remotely-opened stream..");
248 fasync::Timer::new(Self::STREAM_DWELL.after_now()).await;
249 PeerInner::start_opened(weak).await
250 }));
251 }
252 Ok(())
253 }
254
255 pub fn avdtp(&self) -> avdtp::Peer {
257 let lock = self.inner.lock();
258 lock.peer.clone()
259 }
260
261 pub fn remote_endpoints(&self) -> Option<Vec<avdtp::StreamEndpoint>> {
263 self.inner.lock().remote_endpoints()
264 }
265
266 pub fn collect_capabilities(
270 &self,
271 ) -> impl Future<Output = avdtp::Result<Vec<avdtp::StreamEndpoint>>> + use<> {
272 let avdtp = self.avdtp();
273 let get_all = self.descriptor.lock().clone().is_some_and(a2dp_version_check);
274 let inner = self.inner.clone();
275 let metrics = self.metrics.clone();
276 let peer_id = self.id;
277 async move {
278 if let Some(caps) = inner.lock().remote_endpoints() {
279 return Ok(caps);
280 }
281 trace!("Discovering peer streams..");
282 let infos = avdtp.discover().await?;
283 trace!("Discovered {} streams", infos.len());
284 let mut remote_streams = Vec::new();
285 for info in infos {
286 let capabilities = if get_all {
287 avdtp.get_all_capabilities(info.id()).await
288 } else {
289 avdtp.get_capabilities(info.id()).await
290 };
291 match capabilities {
292 Ok(capabilities) => {
293 trace!("Stream {:?}", info);
294 for cap in &capabilities {
295 trace!(" - {:?}", cap);
296 }
297 remote_streams.push(avdtp::StreamEndpoint::from_info(&info, capabilities));
298 }
299 Err(e) => {
300 info!(peer_id:%; "Stream {} capabilities failed: {:?}, skipping", info.id(), e);
301 }
302 };
303 }
304 inner.lock().set_remote_endpoints(&remote_streams);
305 Self::record_cobalt_metrics(metrics, &remote_streams);
306 Ok(remote_streams)
307 }
308 }
309
310 fn record_cobalt_metrics(metrics: bt_metrics::MetricsLogger, endpoints: &[StreamEndpoint]) {
311 let codec_metrics: HashSet<_> = endpoints
312 .iter()
313 .filter_map(|endpoint| {
314 endpoint.codec_type().map(|t| codectype_to_availability_metric(t) as u32)
315 })
316 .collect();
317 metrics
318 .log_occurrences(bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID, codec_metrics);
319
320 let cap_metrics: HashSet<_> = endpoints
321 .iter()
322 .flat_map(|endpoint| {
323 endpoint
324 .capabilities()
325 .iter()
326 .filter_map(|t| capability_to_metric(t))
327 .chain(std::iter::once(
328 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic,
329 ))
330 .map(|t| t as u32)
331 })
332 .collect();
333 metrics.log_occurrences(bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID, cap_metrics);
334 }
335
336 fn transport_channel_params() -> L2capParameters {
337 L2capParameters {
338 psm: Some(PSM_AVDTP),
339 parameters: Some(ChannelParameters {
340 max_rx_packet_size: Some(65535),
341 ..Default::default()
342 }),
343 ..Default::default()
344 }
345 }
346
347 pub fn stream_start(
352 &self,
353 remote_id: StreamEndpointId,
354 capabilities: Vec<ServiceCapability>,
355 ) -> impl Future<Output = avdtp::Result<()>> {
356 let peer = Arc::downgrade(&self.inner);
357 let peer_id = self.id.clone();
358 let avdtp = self.avdtp();
359 let profile = self.profile.clone();
360
361 async move {
362 let codec_params =
363 capabilities.iter().find(|x| x.is_codec()).ok_or(avdtp::Error::InvalidState)?;
364 let (local_id, local_capabilities) = {
365 let peer = PeerInner::upgrade(peer.clone())?;
366 let lock = peer.lock();
367 lock.find_compatible_local_capabilities(codec_params, &remote_id)?
368 };
369
370 let local_by_cat: HashMap<ServiceCategory, ServiceCapability> =
371 local_capabilities.into_iter().map(|i| (i.category(), i)).collect();
372
373 let shared_capabilities: BTreeMap<ServiceCategory, ServiceCapability> = capabilities
376 .into_iter()
377 .filter_map(|cap| {
378 let Some(local_cap) = local_by_cat.get(&cap.category()) else {
379 return None;
380 };
381 if cap.category() == ServiceCategory::MediaCodec {
382 let Ok(a) = MediaCodecConfig::try_from(&cap) else {
383 return None;
384 };
385 let Ok(b) = MediaCodecConfig::try_from(local_cap) else {
386 return None;
387 };
388 let Some(negotiated) = MediaCodecConfig::negotiate(&a, &b) else {
389 return None;
390 };
391 Some((cap.category(), (&negotiated).into()))
392 } else {
393 Some((cap.category(), cap))
394 }
395 })
396 .collect();
397 let shared_capabilities: Vec<_> = shared_capabilities.into_values().collect();
398
399 trace!("Starting stream {local_id} to remote {remote_id} with {shared_capabilities:?}");
400
401 avdtp.set_configuration(&remote_id, &local_id, &shared_capabilities).await?;
402 {
403 let strong = PeerInner::upgrade(peer.clone())?;
404 strong.lock().set_opening(&local_id, &remote_id, shared_capabilities)?;
405 }
406 avdtp.open(&remote_id).await?;
407
408 debug!(peer_id:%; "Connecting transport channel");
409 let channel = profile
410 .connect(
411 &peer_id.into(),
412 &ConnectParameters::L2cap(Self::transport_channel_params()),
413 )
414 .await
415 .context("FIDL error: {}")?
416 .or(Err(avdtp::Error::PeerDisconnected))?;
417 trace!(peer_id:%; "Connected transport channel, converting to local Channel");
418 let channel = match channel.try_into() {
419 Err(e) => {
420 warn!(peer_id:%, e:?; "Couldn't connect media transport: no channel");
421 return Err(avdtp::Error::PeerDisconnected);
422 }
423 Ok(c) => c,
424 };
425
426 trace!(peer_id:%; "Connected transport channel, passing to Peer..");
427
428 {
429 let strong = PeerInner::upgrade(peer.clone())?;
430 let _ = strong.lock().receive_channel(channel)?;
431 }
432 PeerInner::start_opened(peer).await
434 }
435 }
436
437 pub fn streaming_active(&self) -> bool {
439 self.inner.lock().is_streaming() || self.will_start_streaming()
440 }
441
/// Test-only: reports whether any local stream is streaming at this moment.
#[cfg(test)]
fn is_streaming_now(&self) -> bool {
    let inner = self.inner.lock();
    inner.is_streaming_now()
}
447
448 fn will_start_streaming(&self) -> bool {
451 let mut task_lock = self.start_stream_task.lock();
452 if task_lock.is_none() {
453 return false;
454 }
455 let mut cx = Context::from_waker(&std::task::Waker::noop());
457 if let Poll::Pending = task_lock.as_mut().unwrap().poll_unpin(&mut cx) {
458 return true;
459 }
460 let _ = task_lock.take();
462 false
463 }
464
465 pub fn stream_suspend(
470 &self,
471 local_id: StreamEndpointId,
472 ) -> impl Future<Output = avdtp::Result<()>> {
473 let peer = Arc::downgrade(&self.inner);
474 PeerInner::suspend(peer, local_id)
475 }
476
/// Spawns the detached local task that services this peer.
///
/// The task multiplexes three sources:
/// * requests arriving from the remote peer, which are passed to `PeerInner::handle_request`,
/// * reservation futures arriving on `reservations_receiver`, which are collected,
/// * collected reservations that have resolved into a permit, for which the stream is started.
///
/// When the loop ends, the disconnect is logged and every waker registered through
/// `ClosedPeer` is woken.
fn start_requests_task(
    &self,
    mut reservations_receiver: mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>,
) {
    let lock = self.inner.lock();
    let mut request_stream = lock.peer.take_request_stream();
    let id = self.id.clone();
    // The task only holds weak handles to the peer state and to the waker list.
    let peer = Arc::downgrade(&self.inner);
    let mut stream_reservations = FuturesUnordered::new();
    let disconnect_wakers = Arc::downgrade(&self.closed_wakers);
    fuchsia_async::Task::local(async move {
        loop {
            select! {
                request = request_stream.next() => {
                    match request {
                        // The request stream ended.
                        None => break,
                        Some(Err(e)) => info!(peer_id:% = id, e:?; "Request stream error"),
                        Some(Ok(request)) => match peer.upgrade() {
                            // The peer state is gone: exit right away. Note that this path
                            // skips the disconnect log and the waker notification below.
                            None => return,
                            Some(p) => {
                                // The lock guard is a temporary of this statement, so it is
                                // released before any returned future is awaited.
                                let result_or_future = p.lock().handle_request(request);
                                let result = match result_or_future {
                                    Either::Left(result) => result,
                                    Either::Right(future) => future.await,
                                };
                                if let Err(e) = result {
                                    warn!(peer_id:% = id, e:?; "Error handling request");
                                }
                            }
                        },
                    }
                },
                // A reservation was queued: wait for it alongside the others.
                reservation_fut = reservations_receiver.select_next_some() => {
                    stream_reservations.push(reservation_fut)
                },
                // A reservation resolved into a permit: start the stream it belongs to.
                permit = stream_reservations.select_next_some() => {
                    if let Err(e) = PeerInner::start_permit(peer.clone(), permit).await {
                        warn!(peer_id:% = id, e:?; "Couldn't start stream after unpause");
                    }
                }
                complete => break,
            }
        }
        info!(peer_id:% = id; "disconnected");
        if let Some(wakers) = disconnect_wakers.upgrade() {
            // Taking the list leaves `None` behind, which `ClosedPeer` treats as closed.
            for waker in wakers.lock().take().unwrap_or_else(Vec::new) {
                waker.wake();
            }
        }
    })
    .detach();
}
531
532 pub fn closed(&self) -> ClosedPeer {
534 ClosedPeer { inner: Arc::downgrade(&self.closed_wakers) }
535 }
536}
537
/// Future returned by `Peer::closed`.
///
/// It is ready when the waker list it points to has been dropped or has been taken (set to
/// `None`); until then each poll registers the polling task's waker in that list.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ClosedPeer {
    /// Weak handle to the waker list owned by the `Peer`.
    inner: Weak<Mutex<Option<Vec<Waker>>>>,
}
544
545impl Future for ClosedPeer {
546 type Output = ();
547
548 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
549 match self.inner.upgrade() {
550 None => Poll::Ready(()),
551 Some(inner) => match inner.lock().as_mut() {
552 None => Poll::Ready(()),
553 Some(wakers) => {
554 wakers.push(cx.waker().clone());
555 Poll::Pending
556 }
557 },
558 }
559 }
560}
561
562fn a2dp_version_check(profile: ProfileDescriptor) -> bool {
564 let (Some(major), Some(minor)) = (profile.major_version, profile.minor_version) else {
565 return false;
566 };
567 (major == 1 && minor >= 3) || major > 1
568}
569
/// State shared between a `Peer`, its background tasks and the permit callbacks.
struct PeerInner {
    /// AVDTP signaling peer used to send commands to the remote device.
    peer: avdtp::Peer,
    /// Id of the remote peer; used for logging and inspect.
    peer_id: PeerId,
    /// Local id of the stream currently being opened, if any. Set by `set_opening` and by an
    /// accepted SetConfiguration request; cleared once `receive_channel` reports the stream
    /// needs no more channels, or when that stream is aborted.
    opening: Option<StreamEndpointId>,
    /// The local streams available to this peer.
    local: Streams,
    /// Permit bookkeeping, when streaming permits are in use (installed by `Peer::create`).
    permits: Option<StreamPermits>,
    /// Watchers for the streams that have been started, keyed by local stream id.
    started: HashMap<StreamEndpointId, WatchedStream>,
    /// Inspect node for this peer; replaced when the peer is attached to a parent node.
    inspect: fuchsia_inspect::Node,
    /// Remote endpoints recorded by `set_remote_endpoints`; `None` until then.
    remote_endpoints: Option<Vec<StreamEndpoint>>,
    /// Inspect node holding one child per recorded remote endpoint.
    remote_inspect: fuchsia_inspect::Node,
    /// Logger used to record metrics (for example received delay reports).
    metrics: bt_metrics::MetricsLogger,
}
596
597impl Inspect for &mut PeerInner {
598 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
601 self.inspect = parent.create_child(name.as_ref());
602 self.inspect.record_string("id", self.peer_id.to_string());
603 self.local.iattach(&self.inspect, "local_streams")
604 }
605}
606
607impl PeerInner {
608 pub fn new(
609 peer: avdtp::Peer,
610 peer_id: PeerId,
611 local: Streams,
612 metrics: bt_metrics::MetricsLogger,
613 ) -> Self {
614 Self {
615 peer,
616 peer_id,
617 opening: None,
618 local,
619 permits: None,
620 started: HashMap::new(),
621 inspect: Default::default(),
622 remote_endpoints: None,
623 remote_inspect: Default::default(),
624 metrics,
625 }
626 }
627
628 fn get_mut(&mut self, local_id: &StreamEndpointId) -> Result<&mut Stream, avdtp::ErrorCode> {
630 self.local.get_mut(&local_id).ok_or(avdtp::ErrorCode::BadAcpSeid)
631 }
632
633 fn set_remote_endpoints(&mut self, endpoints: &[StreamEndpoint]) {
634 self.remote_inspect = self.inspect.create_child("remote_endpoints");
635 for endpoint in endpoints {
636 self.remote_inspect.record_child(inspect::unique_name("remote_"), |node| {
637 node.record_string("endpoint_id", endpoint.local_id().debug());
638 node.record_string("capabilities", endpoint.capabilities().debug());
639 node.record_string("type", endpoint.endpoint_type().debug());
640 });
641 }
642 self.remote_endpoints = Some(endpoints.iter().map(StreamEndpoint::as_new).collect());
643 }
644
645 fn remote_endpoints(&self) -> Option<Vec<StreamEndpoint>> {
647 self.remote_endpoints.as_ref().map(|v| v.iter().map(StreamEndpoint::as_new).collect())
648 }
649
650 fn remote_endpoint(&self, id: &StreamEndpointId) -> Option<StreamEndpoint> {
652 self.remote_endpoints
653 .as_ref()
654 .and_then(|v| v.iter().find(|v| v.local_id() == id).map(StreamEndpoint::as_new))
655 }
656
657 fn is_streaming(&self) -> bool {
659 self.is_streaming_now() || self.opening.is_some()
660 }
661
662 fn is_streaming_now(&self) -> bool {
664 self.local.streaming().next().is_some()
665 }
666
667 fn set_opening(
668 &mut self,
669 local_id: &StreamEndpointId,
670 remote_id: &StreamEndpointId,
671 capabilities: Vec<ServiceCapability>,
672 ) -> avdtp::Result<()> {
673 if self.opening.is_some() {
674 return Err(avdtp::Error::InvalidState);
675 }
676 let peer_id = self.peer_id;
677 let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
678 stream
679 .configure(&peer_id, &remote_id, capabilities)
680 .map_err(|(cat, c)| avdtp::Error::RequestInvalidExtra(c, (&cat).into()))?;
681 stream.endpoint_mut().establish().or(Err(avdtp::Error::InvalidState))?;
682 self.opening = Some(local_id.clone());
683 Ok(())
684 }
685
686 fn upgrade(weak: Weak<Mutex<Self>>) -> avdtp::Result<Arc<Mutex<Self>>> {
687 weak.upgrade().ok_or(avdtp::Error::PeerDisconnected)
688 }
689
690 async fn start_opened(weak: Weak<Mutex<Self>>) -> avdtp::Result<()> {
692 let (avdtp, stream_pairs) = {
693 let peer = Self::upgrade(weak.clone())?;
694 let peer = peer.lock();
695 let stream_pairs: Vec<(StreamEndpointId, StreamEndpointId)> = peer
696 .local
697 .open()
698 .filter_map(|stream| {
699 let endpoint = stream.endpoint();
700 endpoint.remote_id().map(|id| (endpoint.local_id().clone(), id.clone()))
701 })
702 .collect();
703 (peer.peer.clone(), stream_pairs)
704 };
705 for (local_id, remote_id) in stream_pairs {
706 let permit_result =
707 Self::upgrade(weak.clone())?.lock().get_permit_or_reserve(&local_id);
708 if let Ok(permit) = permit_result {
709 Self::initiated_start(avdtp.clone(), weak.clone(), permit, &local_id, &remote_id)
710 .await?;
711 }
712 }
713 Ok(())
714 }
715
716 async fn start_permit(weak: Weak<Mutex<Self>>, permit: StreamPermit) -> avdtp::Result<()> {
717 let local_id = permit.local_id().clone();
718 let (avdtp, remote_id) = {
719 let peer = Self::upgrade(weak.clone())?;
720 let mut peer = peer.lock();
721 let remote_id = peer
722 .get_mut(&local_id)
723 .map_err(|e| avdtp::Error::RequestInvalid(e))?
724 .endpoint()
725 .remote_id()
726 .ok_or(avdtp::Error::InvalidState)?
727 .clone();
728 (peer.peer.clone(), remote_id)
729 };
730 Self::initiated_start(avdtp, weak, Some(permit), &local_id, &remote_id).await
731 }
732
733 async fn initiated_start(
735 avdtp: avdtp::Peer,
736 weak: Weak<Mutex<Self>>,
737 permit: Option<StreamPermit>,
738 local_id: &StreamEndpointId,
739 remote_id: &StreamEndpointId,
740 ) -> avdtp::Result<()> {
741 trace!(permit:?, local_id:?, remote_id:?; "Making outgoing start request");
742 let to_start = std::slice::from_ref(remote_id);
743 avdtp.start(to_start).await?;
744 trace!("Start response received: {permit:?}");
745 let peer = Self::upgrade(weak.clone())?;
746 let (peer_id, start_result) = {
747 let mut peer = peer.lock();
748 (peer.peer_id, peer.start_local_stream(permit, &local_id))
749 };
750 if let Err(e) = start_result {
751 warn!(peer_id:%, local_id:%, remote_id:%, e:?; "Failed to start local stream, suspending");
752 avdtp.suspend(to_start).await?;
753 }
754 Ok(())
755 }
756
757 fn suspend(
759 weak: Weak<Mutex<Self>>,
760 local_id: StreamEndpointId,
761 ) -> impl Future<Output = avdtp::Result<()>> {
762 let res = (move || {
763 let peer = Self::upgrade(weak.clone())?;
764 let mut peer = peer.lock();
765 Ok((peer.peer.clone(), peer.suspend_local_stream(&local_id)?))
766 })();
767 let (avdtp, remote_id) = match res {
768 Err(e) => return futures::future::err(e).left_future(),
769 Ok(r) => r,
770 };
771 let to_suspend = &[remote_id];
772 avdtp.suspend(to_suspend).right_future()
773 }
774
775 pub fn find_compatible_local_capabilities(
778 &self,
779 codec_params: &ServiceCapability,
780 remote_id: &StreamEndpointId,
781 ) -> avdtp::Result<(StreamEndpointId, Vec<ServiceCapability>)> {
782 let config = codec_params.try_into()?;
783 let our_direction = self.remote_endpoint(remote_id).map(|e| e.endpoint_type().opposite());
784 debug!(codec_params:?, local:? = self.local; "Looking for compatible local stream");
785 self.local
786 .compatible(config)
787 .find_map(|s| {
788 let endpoint = s.endpoint();
789 if let Some(d) = our_direction {
790 if &d != endpoint.endpoint_type() {
791 return None;
792 }
793 }
794 Some((endpoint.local_id().clone(), endpoint.capabilities().clone()))
795 })
796 .ok_or(avdtp::Error::OutOfRange)
797 }
798
799 fn get_permit_or_reserve(
803 &self,
804 local_id: &StreamEndpointId,
805 ) -> Result<Option<StreamPermit>, ()> {
806 let Some(permits) = self.permits.as_ref() else {
807 return Ok(None);
808 };
809 if let Some(permit) = permits.get(local_id.clone()) {
810 return Ok(Some(permit));
811 }
812 info!(peer_id:% = self.peer_id, local_id:%; "No permit to start stream, adding a reservation");
813 permits.setup_reservation_for(local_id.clone());
814 Err(())
815 }
816
817 fn start_local_stream(
820 &mut self,
821 permit: Option<StreamPermit>,
822 local_id: &StreamEndpointId,
823 ) -> avdtp::Result<()> {
824 let peer_id = self.peer_id;
825 let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
826 if permit.as_ref().is_some_and(|p| !p.is_held()) {
829 return Err(avdtp::Error::Other(anyhow::format_err!(
830 "streaming permit revoked during setup"
831 )));
832 }
833
834 info!(peer_id:%, stream:?; "Starting");
835 let stream_finished = stream.start().map_err(|c| avdtp::Error::RequestInvalid(c))?;
836 let watched_stream = WatchedStream::new(permit, stream_finished);
838 if self.started.insert(local_id.clone(), watched_stream).is_some() {
839 warn!(peer_id:%, local_id:%; "Stream that was already started");
840 }
841 Ok(())
842 }
843
844 fn suspend_local_stream(
847 &mut self,
848 local_id: &StreamEndpointId,
849 ) -> avdtp::Result<StreamEndpointId> {
850 let peer_id = self.peer_id;
851 let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
852 let remote_id = stream.endpoint().remote_id().ok_or(avdtp::Error::InvalidState)?.clone();
853 info!(peer_id:%; "Suspend stream local {local_id} <-> {remote_id} remote");
854 stream.suspend().map_err(|c| avdtp::Error::RequestInvalid(c))?;
855 let _ = self.started.remove(local_id);
856 Ok(remote_id)
857 }
858
859 fn receive_channel(&mut self, channel: Channel) -> avdtp::Result<bool> {
864 let stream_id = self.opening.as_ref().cloned().ok_or(avdtp::Error::InvalidState)?;
865 let stream = self.get_mut(&stream_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
866 let done = !stream.endpoint_mut().receive_channel(channel)?;
867 if done {
868 self.opening = None;
869 }
870 info!(peer_id:% = self.peer_id, stream_id:%; "Transport connected");
871 Ok(done)
872 }
873
/// Handles one AVDTP request received from the remote peer.
///
/// Returns `Either::Left` with the outcome of responding for every request except `Start`.
/// `Start` returns `Either::Right` with a future, because streams that could not get a permit
/// must be suspended again on the peer before the outcome is reported.
fn handle_request(
    &mut self,
    request: avdtp::Request,
) -> Either<avdtp::Result<()>, impl Future<Output = avdtp::Result<()>> + use<>> {
    use avdtp::ErrorCode;
    use avdtp::Request::*;
    trace!("Handling {request:?} from peer..");
    // Each arm evaluates to the result of sending its response; `break 'result` is the
    // early exit used when a request is rejected part-way through.
    let immediate_result = 'result: {
        match request {
            Discover { responder } => responder.send(&self.local.information()),
            // Both capability queries are answered with the endpoint's capabilities.
            GetCapabilities { responder, stream_id }
            | GetAllCapabilities { responder, stream_id } => match self.local.get(&stream_id) {
                None => responder.reject(ErrorCode::BadAcpSeid),
                Some(stream) => responder.send(stream.endpoint().capabilities()),
            },
            Open { responder, stream_id } => {
                // An Open is only accepted while a stream is marked as opening.
                if self.opening.is_none() {
                    break 'result responder.reject(ErrorCode::BadState);
                }
                let Ok(stream) = self.get_mut(&stream_id) else {
                    break 'result responder.reject(ErrorCode::BadAcpSeid);
                };
                match stream.endpoint_mut().establish() {
                    Ok(()) => responder.send(),
                    Err(_) => responder.reject(ErrorCode::BadState),
                }
            }
            Close { responder, stream_id } => {
                let peer = self.peer.clone();
                let Ok(stream) = self.get_mut(&stream_id) else {
                    break 'result responder.reject(ErrorCode::BadAcpSeid);
                };
                // The stream is given the responder and answers the request itself.
                stream.release(responder, &peer)
            }
            SetConfiguration { responder, local_stream_id, remote_stream_id, capabilities } => {
                // Only one stream may be opening at a time.
                if self.opening.is_some() {
                    break 'result responder.reject(ServiceCategory::None, ErrorCode::BadState);
                }
                let peer_id = self.peer_id;
                let Ok(stream) = self.get_mut(&local_stream_id) else {
                    break 'result responder
                        .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
                };
                match stream.configure(&peer_id, &remote_stream_id, capabilities) {
                    Ok(_) => {
                        // The configured stream becomes the one being opened.
                        self.opening = Some(local_stream_id);
                        responder.send()
                    }
                    Err((category, code)) => responder.reject(category, code),
                }
            }
            GetConfiguration { stream_id, responder } => {
                let Ok(stream) = self.get_mut(&stream_id) else {
                    break 'result responder.reject(ErrorCode::BadAcpSeid);
                };
                let Some(vec_capabilities) = stream.endpoint().get_configuration() else {
                    break 'result responder.reject(ErrorCode::BadState);
                };
                responder.send(vec_capabilities.as_slice())
            }
            Reconfigure { responder, local_stream_id, capabilities } => {
                let Ok(stream) = self.get_mut(&local_stream_id) else {
                    break 'result responder
                        .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
                };
                match stream.reconfigure(capabilities) {
                    Ok(_) => responder.send(),
                    Err((cat, code)) => responder.reject(cat, code),
                }
            }
            Start { responder, stream_ids } => {
                // Remote ids of streams that are accepted but have no permit yet.
                let mut immediate_suspend = Vec::new();
                // Stops at the first stream that cannot be started.
                let result = stream_ids.into_iter().try_for_each(|seid| {
                    let Some(stream) = self.local.get_mut(&seid) else {
                        return Err((seid, ErrorCode::BadAcpSeid));
                    };
                    let remote_id = stream.endpoint().remote_id().cloned();
                    let Some(remote_id) = remote_id else {
                        return Err((seid, ErrorCode::BadState));
                    };
                    let Ok(permit) = self.get_permit_or_reserve(&seid) else {
                        // No permit: the stream counts as accepted for the response, but
                        // is not started locally and is suspended again by the future below.
                        immediate_suspend.push(remote_id);
                        return Ok(());
                    };
                    match self.start_local_stream(permit, &seid) {
                        Ok(()) => Ok(()),
                        Err(avdtp::Error::RequestInvalid(code)) => Err((seid, code)),
                        Err(_) => Err((seid, ErrorCode::BadState)),
                    }
                });
                // The response is sent here, before the returned future runs.
                let response_result = match result {
                    Ok(()) => responder.send(),
                    Err((seid, code)) => responder.reject(&seid, code),
                };
                {
                    let peer = self.peer.clone();
                    return Either::Right(async move {
                        if !immediate_suspend.is_empty() {
                            peer.suspend(immediate_suspend.as_slice()).await?;
                        }
                        response_result
                    });
                }
            }
            Suspend { responder, stream_ids } => {
                // Streams are suspended in order; the first failure rejects the request.
                for seid in stream_ids {
                    match self.suspend_local_stream(&seid) {
                        Ok(_remote_id) => {}
                        Err(avdtp::Error::RequestInvalid(code)) => {
                            break 'result responder.reject(&seid, code);
                        }
                        Err(_e) => break 'result responder.reject(&seid, ErrorCode::BadState),
                    }
                }
                responder.send()
            }
            Abort { responder, stream_id } => {
                let Ok(stream) = self.get_mut(&stream_id) else {
                    // An Abort for an unknown stream is not answered at all.
                    break 'result Ok(());
                };
                stream.abort();
                // Stop tracking the stream as opening if it is the one that was aborted.
                self.opening = self.opening.take().filter(|local_id| local_id != &stream_id);
                responder.send()
            }
            DelayReport { responder, delay, stream_id } => {
                // `delay` is treated as tenths of a millisecond (see the log formatting
                // below), which is 100000 ns per unit.
                let delay_ns = delay as u64 * 100000;
                // The metric is logged before the stream lookup, so it is recorded even
                // when the report is rejected; -1 is logged if the value does not fit.
                self.metrics.log_integer(
                    bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID,
                    delay_ns.try_into().unwrap_or(-1),
                    vec![],
                );
                let Some(stream) = self.local.get_mut(&stream_id) else {
                    break 'result responder.reject(avdtp::ErrorCode::BadAcpSeid);
                };
                let delay_str = format!("delay {}.{} ms", delay / 10, delay % 10);
                let peer = self.peer_id;
                match stream.set_delay(std::time::Duration::from_nanos(delay_ns)) {
                    Ok(()) => info!(peer:%, stream_id:%; "reported {delay_str}"),
                    Err(avdtp::ErrorCode::BadState) => {
                        info!(peer:%, stream_id:%; "bad state {delay_str}");
                        break 'result responder.reject(avdtp::ErrorCode::BadState);
                    }
                    // Any other failure is only logged; the report is still accepted.
                    Err(e) => info!(peer:%, stream_id:%, e:?; "failed {delay_str}"),
                };
                responder.send()
            }
        }
    };
    Either::Left(immediate_result)
}
1034}
1035
/// Ties an optional stream permit to the lifetime of a started stream.
struct WatchedStream {
    /// Task that waits for the stream's finish future and then drops the permit. The task is
    /// owned by this struct and is released when the struct is dropped.
    _permit_task: fasync::Task<()>,
}
1041
1042impl WatchedStream {
1043 fn new(
1044 permit: Option<StreamPermit>,
1045 finish_fut: BoxFuture<'static, Result<(), anyhow::Error>>,
1046 ) -> Self {
1047 let permit_task = fasync::Task::spawn(async move {
1048 let _ = finish_fut.await;
1049 drop(permit);
1050 });
1051 Self { _permit_task: permit_task }
1052 }
1053}
1054
1055fn codectype_to_availability_metric(
1056 codec_type: &MediaCodecType,
1057) -> bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec {
1058 match codec_type {
1059 &MediaCodecType::AUDIO_SBC => {
1060 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc
1061 }
1062 &MediaCodecType::AUDIO_MPEG12 => {
1063 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Mpeg12
1064 }
1065 &MediaCodecType::AUDIO_AAC => {
1066 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Aac
1067 }
1068 &MediaCodecType::AUDIO_ATRAC => {
1069 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac
1070 }
1071 &MediaCodecType::AUDIO_NON_A2DP => {
1072 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::VendorSpecific
1073 }
1074 _ => bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown,
1075 }
1076}
1077
1078fn capability_to_metric(
1079 cap: &ServiceCapability,
1080) -> Option<bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability> {
1081 match cap {
1082 ServiceCapability::DelayReporting => {
1083 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport)
1084 }
1085 ServiceCapability::Reporting => {
1086 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Reporting)
1087 }
1088 ServiceCapability::Recovery { .. } => {
1089 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Recovery)
1090 }
1091 ServiceCapability::ContentProtection { .. } => {
1092 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::ContentProtection)
1093 }
1094 ServiceCapability::HeaderCompression { .. } => {
1095 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::HeaderCompression)
1096 }
1097 ServiceCapability::Multiplexing { .. } => {
1098 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Multiplexing)
1099 }
1100 other => {
1102 trace!("untracked remote peer capability: {:?}", other);
1103 None
1104 }
1105 }
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110 use super::*;
1111
1112 use async_utils::PollExt;
1113 use bt_metrics::respond_to_metrics_req_for_test;
1114 use fidl::endpoints::create_proxy_and_stream;
1115 use fidl_fuchsia_bluetooth::ErrorCode;
1116 use fidl_fuchsia_bluetooth_bredr::{
1117 ProfileMarker, ProfileRequest, ProfileRequestStream, ServiceClassProfileIdentifier,
1118 };
1119 use fidl_fuchsia_metrics::{MetricEvent, MetricEventPayload};
1120 use futures::future::Either;
1121 use std::pin::pin;
1122
1123 use crate::media_task::tests::{TestMediaTask, TestMediaTaskBuilder};
1124 use crate::media_types::*;
1125 use crate::stream::tests::{make_sbc_endpoint, sbc_mediacodec_capability};
1126
1127 fn fake_metrics()
1128 -> (bt_metrics::MetricsLogger, fidl_fuchsia_metrics::MetricEventLoggerRequestStream) {
1129 let (c, s) = fidl::endpoints::create_proxy_and_stream::<
1130 fidl_fuchsia_metrics::MetricEventLoggerMarker,
1131 >();
1132 (bt_metrics::MetricsLogger::from_proxy(c), s)
1133 }
1134
1135 fn setup_avdtp_peer() -> (avdtp::Peer, Channel) {
1136 let (remote, signaling) = Channel::create();
1137 let peer = avdtp::Peer::new(signaling);
1138 (peer, remote)
1139 }
1140
1141 fn build_test_streams() -> Streams {
1142 let mut streams = Streams::default();
1143 let source = Stream::build(
1144 make_sbc_endpoint(1, avdtp::EndpointType::Source),
1145 TestMediaTaskBuilder::new_delayable().builder(),
1146 );
1147 streams.insert(source);
1148 let sink = Stream::build(
1149 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
1150 TestMediaTaskBuilder::new().builder(),
1151 );
1152 streams.insert(sink);
1153 streams
1154 }
1155
1156 fn build_test_streams_delayable() -> Streams {
1157 fn with_delay(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
1158 StreamEndpoint::new(
1159 seid,
1160 avdtp::MediaType::Audio,
1161 direction,
1162 vec![
1163 avdtp::ServiceCapability::MediaTransport,
1164 avdtp::ServiceCapability::DelayReporting,
1165 sbc_mediacodec_capability(),
1166 ],
1167 )
1168 .expect("endpoint creation should succeed")
1169 }
1170 let mut streams = Streams::default();
1171 let source = Stream::build(
1172 with_delay(1, avdtp::EndpointType::Source),
1173 TestMediaTaskBuilder::new_delayable().builder(),
1174 );
1175 streams.insert(source);
1176 let sink = Stream::build(
1177 with_delay(2, avdtp::EndpointType::Sink),
1178 TestMediaTaskBuilder::new().builder(),
1179 );
1180 streams.insert(sink);
1181 streams
1182 }
1183
1184 #[track_caller]
1185 pub(crate) fn recv_remote(remote: &mut Channel) -> Result<Vec<u8>, zx::Status> {
1186 let fut = remote.next();
1187 match fut.now_or_never() {
1188 Some(Some(res)) => res,
1189 Some(None) => Err(zx::Status::PEER_CLOSED),
1190 None => Err(zx::Status::SHOULD_WAIT),
1191 }
1192 }
1193
1194 fn setup_test_peer(
1197 use_cobalt: bool,
1198 streams: Streams,
1199 permits: Option<Permits>,
1200 ) -> (
1201 Channel,
1202 ProfileRequestStream,
1203 Option<fidl_fuchsia_metrics::MetricEventLoggerRequestStream>,
1204 Peer,
1205 ) {
1206 let (avdtp, remote) = setup_avdtp_peer();
1207 let (metrics_logger, cobalt_receiver) = if use_cobalt {
1208 let (l, r) = fake_metrics();
1209 (l, Some(r))
1210 } else {
1211 (bt_metrics::MetricsLogger::default(), None)
1212 };
1213 let (profile_proxy, requests) = create_proxy_and_stream::<ProfileMarker>();
1214 let peer = Peer::create(PeerId(1), avdtp, streams, permits, profile_proxy, metrics_logger);
1215
1216 (remote, requests, cobalt_receiver, peer)
1217 }
1218
1219 fn expect_get_capabilities_and_respond(
1220 remote: &mut Channel,
1221 expected_seid: u8,
1222 response_capabilities: &[u8],
1223 ) {
1224 let received = recv_remote(remote).unwrap();
1225 assert_eq!(0x00, received[0] & 0xF);
1227 assert_eq!(0x02, received[1]); assert_eq!(expected_seid << 2, received[2]);
1229
1230 let txlabel_raw = received[0] & 0xF0;
1231
1232 #[rustfmt::skip]
1234 let mut get_capabilities_rsp = vec![
1235 txlabel_raw << 4 | 0x2, 0x02 ];
1238
1239 get_capabilities_rsp.extend_from_slice(response_capabilities);
1240
1241 assert!(remote.write(&get_capabilities_rsp).is_ok());
1242 }
1243
1244 fn expect_get_all_capabilities_and_respond(
1245 remote: &mut Channel,
1246 expected_seid: u8,
1247 response_capabilities: &[u8],
1248 ) {
1249 let received = recv_remote(remote).unwrap();
1250 assert_eq!(0x00, received[0] & 0xF);
1252 assert_eq!(0x0C, received[1]); assert_eq!(expected_seid << 2, received[2]);
1254
1255 let txlabel_raw = received[0] & 0xF0;
1256
1257 #[rustfmt::skip]
1259 let mut get_capabilities_rsp = vec![
1260 txlabel_raw << 4 | 0x2, 0x0C ];
1263
1264 get_capabilities_rsp.extend_from_slice(response_capabilities);
1265
1266 assert!(remote.write(&get_capabilities_rsp).is_ok());
1267 }
1268
1269 #[fuchsia::test]
1270 fn disconnected() {
1271 let mut exec = fasync::TestExecutor::new();
1272 let (proxy, _stream) = create_proxy_and_stream::<ProfileMarker>();
1273 let (remote, signaling) = Channel::create();
1274
1275 let id = PeerId(1);
1276
1277 let avdtp = avdtp::Peer::new(signaling);
1278 let peer = Peer::create(
1279 id,
1280 avdtp,
1281 Streams::default(),
1282 None,
1283 proxy,
1284 bt_metrics::MetricsLogger::default(),
1285 );
1286
1287 let closed_fut = peer.closed();
1288
1289 let mut closed_fut = pin!(closed_fut);
1290
1291 assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
1292
1293 drop(remote);
1295
1296 assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
1297 }
1298
1299 #[fuchsia::test]
1300 fn peer_collect_capabilities_success() {
1301 let mut exec = fasync::TestExecutor::new();
1302
1303 let (mut remote, _, cobalt_receiver, peer) =
1304 setup_test_peer(true, build_test_streams(), None);
1305
1306 let p: ProfileDescriptor = ProfileDescriptor {
1307 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1308 major_version: Some(1),
1309 minor_version: Some(2),
1310 ..Default::default()
1311 };
1312 let _ = peer.set_descriptor(p);
1313
1314 let collect_future = peer.collect_capabilities();
1315 let mut collect_future = pin!(collect_future);
1316
1317 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1318
1319 let received = recv_remote(&mut remote).unwrap();
1321 assert_eq!(0x00, received[0] & 0xF);
1323 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1326
1327 let response: &[u8] = &[
1329 txlabel_raw << 4 | 0x0 << 2 | 0x2, 0x01, 0x3E << 2 | 0x0 << 1, 0x00 << 4 | 0x1 << 3, 0x01 << 2 | 0x1 << 1, 0x00 << 4 | 0x1 << 3, ];
1336 assert!(remote.write(response).is_ok());
1337
1338 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1339
1340 #[rustfmt::skip]
1342 let capabilities_rsp = &[
1343 0x01, 0x00,
1345 0x07, 0x06, 0x00, 0x04, 0xF0, 0x9F, 0x92, 0x96
1347 ];
1348 expect_get_capabilities_and_respond(&mut remote, 0x3E, capabilities_rsp);
1349
1350 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1351
1352 #[rustfmt::skip]
1354 let capabilities_rsp = &[
1355 0x01, 0x00,
1357 0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1359 ];
1360 expect_get_capabilities_and_respond(&mut remote, 0x01, capabilities_rsp);
1361
1362 match exec.run_until_stalled(&mut collect_future) {
1363 Poll::Pending => panic!("collect capabilities should be complete"),
1364 Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1365 Poll::Ready(Ok(endpoints)) => {
1366 let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1367 let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1368 for stream in endpoints {
1369 if stream.local_id() == &first_seid {
1370 let expected_caps = vec![
1371 ServiceCapability::MediaTransport,
1372 ServiceCapability::MediaCodec {
1373 media_type: avdtp::MediaType::Audio,
1374 codec_type: avdtp::MediaCodecType::new(0x04),
1375 codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1376 },
1377 ];
1378 assert_eq!(&expected_caps, stream.capabilities());
1379 } else if stream.local_id() == &second_seid {
1380 let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1381 assert_eq!(Some(&expected_codec_type), stream.codec_type());
1382 } else {
1383 panic!("Unexpected endpoint in the streams collected");
1384 }
1385 }
1386 }
1387 }
1388
1389 let mut recv = cobalt_receiver.expect("should have receiver");
1391 let mut log_events = Vec::new();
1392 while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1393 log_events.push(respond_to_metrics_req_for_test(req));
1394 }
1395
1396 assert_eq!(3, log_events.len());
1398 assert!(log_events.contains(&MetricEvent {
1399 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1400 event_codes: vec![
1401 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1402 ],
1403 payload: MetricEventPayload::Count(1),
1404 }));
1405 assert!(log_events.contains(&MetricEvent {
1406 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1407 event_codes: vec![
1408 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac as u32
1409 ],
1410 payload: MetricEventPayload::Count(1),
1411 }));
1412 assert!(log_events.contains(&MetricEvent {
1413 metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1414 event_codes: vec![
1415 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1416 ],
1417 payload: MetricEventPayload::Count(1),
1418 }));
1419
1420 let collect_future = peer.collect_capabilities();
1422 let mut collect_future = pin!(collect_future);
1423
1424 match exec.run_until_stalled(&mut collect_future) {
1425 Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1426 x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1427 };
1428 }
1429
1430 #[fuchsia::test]
1431 fn peer_collect_all_capabilities_success() {
1432 let mut exec = fasync::TestExecutor::new();
1433
1434 let (mut remote, _, cobalt_receiver, peer) =
1435 setup_test_peer(true, build_test_streams(), None);
1436 let p: ProfileDescriptor = ProfileDescriptor {
1437 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1438 major_version: Some(1),
1439 minor_version: Some(3),
1440 ..Default::default()
1441 };
1442 let _ = peer.set_descriptor(p);
1443
1444 let collect_future = peer.collect_capabilities();
1445 let mut collect_future = pin!(collect_future);
1446
1447 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1448
1449 let received = recv_remote(&mut remote).unwrap();
1451 assert_eq!(0x00, received[0] & 0xF);
1453 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1456
1457 let response: &[u8] = &[
1459 txlabel_raw << 4 | 0x0 << 2 | 0x2, 0x01, 0x3E << 2 | 0x0 << 1, 0x00 << 4 | 0x1 << 3, 0x01 << 2 | 0x1 << 1, 0x00 << 4 | 0x1 << 3, ];
1466 assert!(remote.write(response).is_ok());
1467
1468 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1469
1470 #[rustfmt::skip]
1472 let capabilities_rsp = &[
1473 0x01, 0x00,
1475 0x07, 0x06, 0x00, 0x40, 0xF0, 0x9F, 0x92, 0x96,
1477 0x08, 0x00
1479 ];
1480 expect_get_all_capabilities_and_respond(&mut remote, 0x3E, capabilities_rsp);
1481
1482 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1483
1484 #[rustfmt::skip]
1486 let capabilities_rsp = &[
1487 0x01, 0x00,
1489 0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1491 ];
1492 expect_get_all_capabilities_and_respond(&mut remote, 0x01, capabilities_rsp);
1493
1494 match exec.run_until_stalled(&mut collect_future) {
1495 Poll::Pending => panic!("collect capabilities should be complete"),
1496 Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1497 Poll::Ready(Ok(endpoints)) => {
1498 let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1499 let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1500 for stream in endpoints {
1501 if stream.local_id() == &first_seid {
1502 let expected_caps = vec![
1503 ServiceCapability::MediaTransport,
1504 ServiceCapability::MediaCodec {
1505 media_type: avdtp::MediaType::Audio,
1506 codec_type: avdtp::MediaCodecType::new(0x40),
1507 codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1508 },
1509 ServiceCapability::DelayReporting,
1510 ];
1511 assert_eq!(&expected_caps, stream.capabilities());
1512 } else if stream.local_id() == &second_seid {
1513 let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1514 assert_eq!(Some(&expected_codec_type), stream.codec_type());
1515 } else {
1516 panic!("Unexpected endpoint in the streams collected");
1517 }
1518 }
1519 }
1520 }
1521
1522 let mut recv = cobalt_receiver.expect("should have receiver");
1524 let mut log_events = Vec::new();
1525 while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1526 log_events.push(respond_to_metrics_req_for_test(req));
1527 }
1528
1529 assert_eq!(4, log_events.len());
1531 assert!(log_events.contains(&MetricEvent {
1532 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1533 event_codes: vec![
1534 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown as u32
1535 ],
1536 payload: MetricEventPayload::Count(1),
1537 }));
1538 assert!(log_events.contains(&MetricEvent {
1539 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1540 event_codes: vec![
1541 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1542 ],
1543 payload: MetricEventPayload::Count(1),
1544 }));
1545 assert!(log_events.contains(&MetricEvent {
1546 metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1547 event_codes: vec![
1548 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1549 ],
1550 payload: MetricEventPayload::Count(1),
1551 }));
1552 assert!(log_events.contains(&MetricEvent {
1553 metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1554 event_codes: vec![
1555 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport as u32
1556 ],
1557 payload: MetricEventPayload::Count(1),
1558 }));
1559
1560 let collect_future = peer.collect_capabilities();
1562 let mut collect_future = pin!(collect_future);
1563
1564 match exec.run_until_stalled(&mut collect_future) {
1565 Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1566 x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1567 };
1568 }
1569
1570 #[fuchsia::test]
1571 fn peer_collect_capabilities_discovery_fails() {
1572 let mut exec = fasync::TestExecutor::new();
1573
1574 let (mut remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1575
1576 let collect_future = peer.collect_capabilities();
1577 let mut collect_future = pin!(collect_future);
1578
1579 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1581
1582 let received = recv_remote(&mut remote).unwrap();
1584 assert_eq!(0x00, received[0] & 0xF);
1586 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1589
1590 let response: &[u8] = &[
1592 txlabel_raw | 0x0 << 2 | 0x3, 0x01, 0x31, ];
1596 assert!(remote.write(response).is_ok());
1597
1598 match exec.run_until_stalled(&mut collect_future) {
1601 Poll::Pending => panic!("Should be ready after discovery failure"),
1602 Poll::Ready(Ok(x)) => panic!("Should be an error but returned {x:?}"),
1603 Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
1604 assert_eq!(Some(Ok(avdtp::ErrorCode::BadState)), e.error_code());
1605 }
1606 Poll::Ready(Err(e)) => panic!("Should have been a RemoteRejected was was {e:?}"),
1607 }
1608 }
1609
1610 #[fuchsia::test]
1611 fn peer_collect_capabilities_get_capability_fails() {
1612 let mut exec = fasync::TestExecutor::new();
1613
1614 let (mut remote, _, _, peer) = setup_test_peer(true, build_test_streams(), None);
1615
1616 let collect_future = peer.collect_capabilities();
1617 let mut collect_future = pin!(collect_future);
1618
1619 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1621
1622 let received = recv_remote(&mut remote).unwrap();
1624 assert_eq!(0x00, received[0] & 0xF);
1626 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1629
1630 let response: &[u8] = &[
1632 txlabel_raw << 4 | 0x0 << 2 | 0x2, 0x01, 0x3E << 2 | 0x0 << 1, 0x00 << 4 | 0x1 << 3, 0x01 << 2 | 0x1 << 1, 0x00 << 4 | 0x1 << 3, ];
1639 assert!(remote.write(response).is_ok());
1640
1641 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1642
1643 let expected_seid = 0x3E;
1645 let received = recv_remote(&mut remote).unwrap();
1646 assert_eq!(0x00, received[0] & 0xF);
1648 assert_eq!(0x02, received[1]); assert_eq!(expected_seid << 2, received[2]);
1650
1651 let txlabel_raw = received[0] & 0xF0;
1652
1653 let response: &[u8] = &[
1654 txlabel_raw | 0x0 << 2 | 0x3, 0x02, 0x12, ];
1658 assert!(remote.write(response).is_ok());
1659
1660 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1661
1662 let expected_seid = 0x01;
1664 let received = recv_remote(&mut remote).unwrap();
1665 assert_eq!(0x00, received[0] & 0xF);
1667 assert_eq!(0x02, received[1]); assert_eq!(expected_seid << 2, received[2]);
1669
1670 let txlabel_raw = received[0] & 0xF0;
1671
1672 let response: &[u8] = &[
1673 txlabel_raw | 0x0 << 2 | 0x3, 0x02, 0x12, ];
1677 assert!(remote.write(response).is_ok());
1678
1679 match exec.run_until_stalled(&mut collect_future) {
1681 Poll::Pending => panic!("Should be ready after discovery failure"),
1682 Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1683 Poll::Ready(Ok(map)) => assert_eq!(0, map.len()),
1684 }
1685 }
1686
1687 fn receive_simple_accept(remote: &mut Channel, signal_id: u8) {
1688 let received = recv_remote(remote).expect("expected a packet");
1689 assert_eq!(0x00, received[0] & 0xF);
1691 assert_eq!(signal_id, received[1]);
1692
1693 let txlabel_raw = received[0] & 0xF0;
1694
1695 let response: &[u8] = &[
1696 txlabel_raw | 0x0 << 2 | 0x2, signal_id,
1698 ];
1699 assert!(remote.write(response).is_ok());
1700 }
1701
1702 #[fuchsia::test]
1703 fn peer_stream_start_success() {
1704 let mut exec = fasync::TestExecutor::new();
1705
1706 let (mut remote, mut profile_request_stream, _, peer) =
1707 setup_test_peer(false, build_test_streams(), None);
1708
1709 let remote_seid = 2_u8.try_into().unwrap();
1710
1711 let codec_params = ServiceCapability::MediaCodec {
1712 media_type: avdtp::MediaType::Audio,
1713 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1714 codec_extra: vec![0x11, 0x45, 51, 51],
1715 };
1716
1717 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1718 let mut start_future = pin!(start_future);
1719
1720 match exec.run_until_stalled(&mut start_future) {
1721 Poll::Pending => {}
1722 x => panic!("Expected pending, but got {x:?}"),
1723 };
1724
1725 receive_simple_accept(&mut remote, 0x03); assert!(exec.run_until_stalled(&mut start_future).is_pending());
1728
1729 receive_simple_accept(&mut remote, 0x06); match exec.run_until_stalled(&mut start_future) {
1732 Poll::Pending => {}
1733 Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1734 Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1735 };
1736
1737 let (_, transport) = Channel::create();
1739
1740 let request = exec.run_until_stalled(&mut profile_request_stream.next());
1741 match request {
1742 Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
1743 assert_eq!(PeerId(1), peer_id.into());
1744 assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
1745 let channel = transport.try_into().unwrap();
1746 responder.send(Ok(channel)).expect("responder sends");
1747 }
1748 x => panic!("Should have sent a open l2cap request, but got {:?}", x),
1749 };
1750
1751 match exec.run_until_stalled(&mut start_future) {
1752 Poll::Pending => {}
1753 Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1754 Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1755 };
1756
1757 receive_simple_accept(&mut remote, 0x07); match exec.run_until_stalled(&mut start_future) {
1762 Poll::Pending => panic!("Should be ready after start succeeds"),
1763 Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1764 Poll::Ready(Ok(())) => {
1766 assert!(peer.is_streaming_now());
1767 }
1768 }
1769 }
1770
1771 #[fuchsia::test]
1772 fn peer_stream_start_picks_correct_direction() {
1773 let mut exec = fasync::TestExecutor::new();
1774
1775 let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1776 let remote = avdtp::Peer::new(remote);
1777 let mut remote_events = remote.take_request_stream();
1778
1779 fn remote_handle_request(req: avdtp::Request) {
1781 let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1782 let res = match req {
1783 avdtp::Request::Discover { responder } => {
1784 let infos = [avdtp::StreamInformation::new(
1785 expected_stream_id,
1786 false,
1787 avdtp::MediaType::Audio,
1788 avdtp::EndpointType::Source,
1789 )];
1790 responder.send(&infos)
1791 }
1792 avdtp::Request::GetAllCapabilities { stream_id, responder }
1793 | avdtp::Request::GetCapabilities { stream_id, responder } => {
1794 assert_eq!(expected_stream_id, stream_id);
1795 let caps = vec![
1796 ServiceCapability::MediaTransport,
1797 ServiceCapability::MediaCodec {
1798 media_type: avdtp::MediaType::Audio,
1799 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1800 codec_extra: vec![0x11, 0x45, 51, 250],
1801 },
1802 ];
1803 responder.send(&caps[..])
1804 }
1805 avdtp::Request::Open { responder, stream_id } => {
1806 assert_eq!(expected_stream_id, stream_id);
1807 responder.send()
1808 }
1809 avdtp::Request::SetConfiguration {
1810 responder,
1811 local_stream_id,
1812 remote_stream_id,
1813 ..
1814 } => {
1815 assert_eq!(local_stream_id, expected_stream_id);
1816 assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1818 responder.send()
1819 }
1820 x => panic!("Unexpected request: {:?}", x),
1821 };
1822 res.expect("should be able to respond");
1823 }
1824
1825 let collect_capabilities_fut = peer.collect_capabilities();
1827 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1828
1829 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1830
1831 let request = exec.run_singlethreaded(&mut remote_events.next());
1832 remote_handle_request(request.expect("should have a discovery request").unwrap());
1833
1834 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1835 let request = exec.run_singlethreaded(&mut remote_events.next());
1836 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1837
1838 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1839
1840 let remote_seid = 4_u8.try_into().unwrap();
1842
1843 let codec_params = ServiceCapability::MediaCodec {
1844 media_type: avdtp::MediaType::Audio,
1845 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1846 codec_extra: vec![0x11, 0x45, 51, 51],
1847 };
1848 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1849 let mut start_future = pin!(start_future);
1850
1851 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1852 let request = exec.run_singlethreaded(&mut remote_events.next());
1853 remote_handle_request(request.expect("should have a set_capabilities request").unwrap());
1854
1855 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1856 let request = exec.run_singlethreaded(&mut remote_events.next());
1857 remote_handle_request(request.expect("should have an open request").unwrap());
1858 }
1859
1860 #[fuchsia::test]
1861 fn peer_stream_start_strips_unsupported_local_capabilities() {
1862 let mut exec = fasync::TestExecutor::new();
1863
1864 let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1865 let remote = avdtp::Peer::new(remote);
1866 let mut remote_events = remote.take_request_stream();
1867
1868 fn remote_handle_request(req: avdtp::Request) {
1870 let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1871 let res = match req {
1872 avdtp::Request::Discover { responder } => {
1873 let infos = [avdtp::StreamInformation::new(
1874 expected_stream_id,
1875 false,
1876 avdtp::MediaType::Audio,
1877 avdtp::EndpointType::Source,
1878 )];
1879 responder.send(&infos)
1880 }
1881 avdtp::Request::GetAllCapabilities { stream_id, responder }
1882 | avdtp::Request::GetCapabilities { stream_id, responder } => {
1883 assert_eq!(expected_stream_id, stream_id);
1884 let caps = vec![
1885 ServiceCapability::MediaTransport,
1886 ServiceCapability::DelayReporting,
1888 ServiceCapability::MediaCodec {
1889 media_type: avdtp::MediaType::Audio,
1890 codec_type: avdtp::MediaCodecType::AUDIO_AAC,
1891 codec_extra: vec![128, 0, 132, 134, 0, 0],
1892 },
1893 ];
1894 responder.send(&caps[..])
1895 }
1896 avdtp::Request::Open { responder, stream_id } => {
1897 assert_eq!(expected_stream_id, stream_id);
1898 responder.send()
1899 }
1900 avdtp::Request::SetConfiguration {
1901 responder,
1902 local_stream_id,
1903 remote_stream_id,
1904 capabilities,
1905 } => {
1906 assert_eq!(local_stream_id, expected_stream_id);
1907 assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1909 assert!(!capabilities.contains(&ServiceCapability::DelayReporting));
1912 responder.send()
1913 }
1914 x => panic!("Unexpected request: {:?}", x),
1915 };
1916 res.expect("should be able to respond");
1917 }
1918
1919 let collect_capabilities_fut = peer.collect_capabilities();
1921 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1922
1923 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1924
1925 let request = exec.run_singlethreaded(&mut remote_events.next());
1926 remote_handle_request(request.expect("should have a discovery request").unwrap());
1927
1928 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1929 let request = exec.run_singlethreaded(&mut remote_events.next());
1930 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1931
1932 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1933
1934 let remote_seid = 4_u8.try_into().unwrap();
1936
1937 let codec_params = ServiceCapability::MediaCodec {
1938 media_type: avdtp::MediaType::Audio,
1939 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1940 codec_extra: vec![0x11, 0x45, 51, 51],
1941 };
1942 let start_future =
1943 peer.stream_start(remote_seid, vec![codec_params, ServiceCapability::DelayReporting]);
1944 let mut start_future = pin!(start_future);
1945
1946 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1947 let request = exec.run_singlethreaded(&mut remote_events.next());
1948 remote_handle_request(request.expect("should have a set_configuration request").unwrap());
1949
1950 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1951 let request = exec.run_singlethreaded(&mut remote_events.next());
1952 remote_handle_request(request.expect("should have an open request").unwrap());
1953 }
1954
1955 #[fuchsia::test]
1956 fn peer_stream_start_orders_local_capabilities() {
1957 let mut exec = fasync::TestExecutor::new();
1958
1959 let (remote, _, _, peer) = setup_test_peer(false, build_test_streams_delayable(), None);
1960 let remote = avdtp::Peer::new(remote);
1961 let mut remote_events = remote.take_request_stream();
1962
1963 fn remote_handle_request(req: avdtp::Request) {
1965 let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1966 let res = match req {
1967 avdtp::Request::Discover { responder } => {
1968 let infos = [avdtp::StreamInformation::new(
1969 expected_stream_id,
1970 false,
1971 avdtp::MediaType::Audio,
1972 avdtp::EndpointType::Source,
1973 )];
1974 responder.send(&infos)
1975 }
1976 avdtp::Request::GetAllCapabilities { stream_id, responder }
1977 | avdtp::Request::GetCapabilities { stream_id, responder } => {
1978 assert_eq!(expected_stream_id, stream_id);
1979 let caps = &[
1980 ServiceCapability::MediaTransport,
1981 ServiceCapability::MediaCodec {
1982 media_type: avdtp::MediaType::Audio,
1983 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1984 codec_extra: vec![0x11, 0x45, 51, 250],
1985 },
1986 ServiceCapability::DelayReporting,
1987 ];
1988 responder.send(caps)
1989 }
1990 avdtp::Request::Open { responder, stream_id } => {
1991 assert_eq!(expected_stream_id, stream_id);
1992 responder.send()
1993 }
1994 avdtp::Request::SetConfiguration {
1995 responder,
1996 local_stream_id,
1997 remote_stream_id,
1998 capabilities,
1999 } => {
2000 assert_eq!(local_stream_id, expected_stream_id);
2001 assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
2003 let mut capabilities_ordered = capabilities.clone();
2005 capabilities_ordered.sort_by_key(ServiceCapability::category);
2006 assert_eq!(capabilities, capabilities_ordered);
2007 responder.send()
2008 }
2009 x => panic!("Unexpected request: {:?}", x),
2010 };
2011 res.expect("should be able to respond");
2012 }
2013
2014 let collect_capabilities_fut = peer.collect_capabilities();
2016 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2017
2018 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2019
2020 let request = exec.run_singlethreaded(&mut remote_events.next());
2021 remote_handle_request(request.expect("should have a discovery request").unwrap());
2022
2023 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2024 let request = exec.run_singlethreaded(&mut remote_events.next());
2025 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2026
2027 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2028
2029 let remote_seid = 4_u8.try_into().unwrap();
2031
2032 let codec_params = ServiceCapability::MediaCodec {
2033 media_type: avdtp::MediaType::Audio,
2034 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2035 codec_extra: vec![0x11, 0x45, 51, 51],
2036 };
2037 let start_future = peer.stream_start(
2038 remote_seid,
2039 vec![
2040 ServiceCapability::MediaTransport,
2041 ServiceCapability::DelayReporting,
2042 codec_params,
2043 ],
2044 );
2045 let mut start_future = pin!(start_future);
2046
2047 assert!(exec.run_until_stalled(&mut start_future).is_pending());
2048 let request = exec.run_singlethreaded(&mut remote_events.next());
2049 remote_handle_request(request.expect("should have a set_configuration request").unwrap());
2050
2051 assert!(exec.run_until_stalled(&mut start_future).is_pending());
2052 let request = exec.run_singlethreaded(&mut remote_events.next());
2053 remote_handle_request(request.expect("should have an open request").unwrap());
2054 }
2055
#[fuchsia::test]
/// Starts a stream while the peer shares a single-permit pool with the test, seizes that
/// permit before the remote accepts the Start command, and checks that `stream_start` still
/// finishes with `Ok` while the peer never reports itself as streaming.
fn peer_stream_start_permit_revoked() {
    let mut exec = fasync::TestExecutor::new();

    // One permit only; the clone handed to the peer lets the test seize it later.
    let test_permits = Permits::new(1);
    let (mut remote, mut profile_request_stream, _, peer) =
        setup_test_peer(false, build_test_streams(), Some(test_permits.clone()));

    let remote_seid = 2_u8.try_into().unwrap();

    let codec_params = ServiceCapability::MediaCodec {
        media_type: avdtp::MediaType::Audio,
        codec_type: avdtp::MediaCodecType::AUDIO_SBC,
        codec_extra: vec![0x11, 0x45, 51, 51],
    };

    let start_future = peer.stream_start(remote_seid, vec![codec_params]);
    let mut start_future = pin!(start_future);

    let _ = exec
        .run_until_stalled(&mut start_future)
        .expect_pending("waiting for set config response");
    // NOTE(review): 0x03 and 0x06 look like the AVDTP signal ids for Set Configuration and
    // Open, judging by the expectation messages around them — confirm against
    // `receive_simple_accept`.
    receive_simple_accept(&mut remote, 0x03);
    exec.run_until_stalled(&mut start_future).expect_pending("waiting for open response");
    receive_simple_accept(&mut remote, 0x06);
    exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
    assert!(!peer.is_streaming_now());

    // Only the `transport` end is used; the other end is discarded right away.
    let (_, transport) = Channel::create();

    // The peer is expected to have asked the profile for an L2CAP connection to peer 1 using
    // the transport channel parameters; answer it with the channel created above.
    let request = exec.run_until_stalled(&mut profile_request_stream.next());
    match request {
        Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
            assert_eq!(PeerId(1), peer_id.into());
            assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
            let channel = transport.try_into().unwrap();
            responder.send(Ok(channel)).expect("responder sends");
        }
        x => panic!("Should have sent a open l2cap request, but got {:?}", x),
    };

    exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
    assert!(!peer.is_streaming_now());

    // Take every permit away before the remote answers the next command.
    let seized_permits = test_permits.seize();
    assert_eq!(seized_permits.len(), 1);
    // NOTE(review): 0x07 is presumably the Start signal id; after it is accepted the start
    // future is still pending because an outgoing suspend has to be sent — confirm.
    receive_simple_accept(&mut remote, 0x07);
    exec.run_until_stalled(&mut start_future)
        .expect_pending("waiting to send outgoing suspend");
    assert!(!peer.is_streaming_now());
    // NOTE(review): 0x09 is presumably the Suspend signal id — confirm. Once it is accepted
    // the start future completes successfully even though the stream ended up suspended.
    receive_simple_accept(&mut remote, 0x09);
    let () = exec
        .run_until_stalled(&mut start_future)
        .expect("start finished")
        .expect("suspended stream is ok");
    assert!(!peer.is_streaming_now());
}
2124
2125 #[fuchsia::test]
2126 fn peer_stream_start_fails_wrong_direction() {
2127 let mut exec = fasync::TestExecutor::new();
2128
2129 let mut streams = Streams::default();
2131 let source = Stream::build(
2132 make_sbc_endpoint(1, avdtp::EndpointType::Source),
2133 TestMediaTaskBuilder::new().builder(),
2134 );
2135 streams.insert(source);
2136
2137 let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2138 let remote = avdtp::Peer::new(remote);
2139 let mut remote_events = remote.take_request_stream();
2140
2141 fn remote_handle_request(req: avdtp::Request) {
2143 let expected_stream_id: StreamEndpointId = 2_u8.try_into().unwrap();
2144 let res = match req {
2145 avdtp::Request::Discover { responder } => {
2146 let infos = [avdtp::StreamInformation::new(
2147 expected_stream_id,
2148 false,
2149 avdtp::MediaType::Audio,
2150 avdtp::EndpointType::Source,
2151 )];
2152 responder.send(&infos)
2153 }
2154 avdtp::Request::GetAllCapabilities { stream_id, responder }
2155 | avdtp::Request::GetCapabilities { stream_id, responder } => {
2156 assert_eq!(expected_stream_id, stream_id);
2157 let caps = vec![
2158 ServiceCapability::MediaTransport,
2159 ServiceCapability::MediaCodec {
2160 media_type: avdtp::MediaType::Audio,
2161 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2162 codec_extra: vec![0x11, 0x45, 51, 250],
2163 },
2164 ];
2165 responder.send(&caps[..])
2166 }
2167 avdtp::Request::Open { responder, .. } => responder.send(),
2168 avdtp::Request::SetConfiguration { responder, .. } => responder.send(),
2169 x => panic!("Unexpected request: {:?}", x),
2170 };
2171 res.expect("should be able to respond");
2172 }
2173
2174 let collect_capabilities_fut = peer.collect_capabilities();
2176 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2177
2178 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2179
2180 let request = exec.run_singlethreaded(&mut remote_events.next());
2181 remote_handle_request(request.expect("should have a discovery request").unwrap());
2182
2183 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2184 let request = exec.run_singlethreaded(&mut remote_events.next());
2185 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2186
2187 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2188
2189 let remote_seid = 2_u8.try_into().unwrap();
2191
2192 let codec_params = ServiceCapability::MediaCodec {
2193 media_type: avdtp::MediaType::Audio,
2194 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2195 codec_extra: vec![0x11, 0x45, 51, 51],
2196 };
2197 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2198 let mut start_future = pin!(start_future);
2199
2200 match exec.run_until_stalled(&mut start_future) {
2201 Poll::Ready(Err(avdtp::Error::OutOfRange)) => {}
2202 x => panic!("Expected a ready OutOfRange error but got {:?}", x),
2203 };
2204 }
2205
2206 #[fuchsia::test]
2207 fn peer_stream_start_fails_to_connect() {
2208 let mut exec = fasync::TestExecutor::new();
2209
2210 let (mut remote, mut profile_request_stream, _, peer) =
2211 setup_test_peer(false, build_test_streams(), None);
2212
2213 let remote_seid = 2_u8.try_into().unwrap();
2214
2215 let codec_params = ServiceCapability::MediaCodec {
2216 media_type: avdtp::MediaType::Audio,
2217 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2218 codec_extra: vec![0x11, 0x45, 51, 51],
2219 };
2220
2221 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2222 let mut start_future = pin!(start_future);
2223
2224 match exec.run_until_stalled(&mut start_future) {
2225 Poll::Pending => {}
2226 x => panic!("was expecting pending but got {x:?}"),
2227 };
2228
2229 receive_simple_accept(&mut remote, 0x03); assert!(exec.run_until_stalled(&mut start_future).is_pending());
2232
2233 receive_simple_accept(&mut remote, 0x06); match exec.run_until_stalled(&mut start_future) {
2236 Poll::Pending => {}
2237 Poll::Ready(x) => panic!("Expected to be pending but {x:?}"),
2238 };
2239
2240 let request = exec.run_until_stalled(&mut profile_request_stream.next());
2242 match request {
2243 Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, responder, .. }))) => {
2244 assert_eq!(PeerId(1), peer_id.into());
2245 responder.send(Err(ErrorCode::Failed)).expect("responder sends");
2246 }
2247 x => panic!("Should have sent a open l2cap request, but got {:?}", x),
2248 };
2249
2250 match exec.run_until_stalled(&mut start_future) {
2253 Poll::Pending => panic!("Should be ready after start fails"),
2254 Poll::Ready(Ok(_stream)) => panic!("Shouldn't have succeeded stream here"),
2255 Poll::Ready(Err(_)) => {}
2256 }
2257 }
2258
#[fuchsia::test]
/// Exercises delay reports sent by the remote while the local peer collects capabilities and
/// starts a stream, then checks the metric events that reach the metrics receiver.
async fn peer_delay_report() {
    let (remote, _profile_requests, cobalt_recv, peer) =
        setup_test_peer(true, build_test_streams(), None);
    let remote_peer = avdtp::Peer::new(remote);
    let mut remote_events = remote_peer.take_request_stream();

    // Plays the remote side of the signaling channel. `peer` is the remote's own AVDTP peer,
    // used to send delay reports back while a request is being handled.
    async fn remote_handle_request(req: avdtp::Request, peer: &avdtp::Peer) {
        // Endpoint id advertised by the remote in its Discover response.
        let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
        // Endpoint id the SetConfiguration request is expected to carry as `remote_stream_id`.
        let expected_peer_stream_id: StreamEndpointId = 1_u8.try_into().unwrap();
        use avdtp::Request::*;
        match req {
            Discover { responder } => {
                let infos = [avdtp::StreamInformation::new(
                    expected_stream_id,
                    false,
                    avdtp::MediaType::Audio,
                    avdtp::EndpointType::Sink,
                )];
                responder.send(&infos).expect("response should succeed");
            }
            GetAllCapabilities { stream_id, responder }
            | GetCapabilities { stream_id, responder } => {
                assert_eq!(expected_stream_id, stream_id);
                let caps = vec![
                    ServiceCapability::MediaTransport,
                    ServiceCapability::MediaCodec {
                        media_type: avdtp::MediaType::Audio,
                        codec_type: avdtp::MediaCodecType::AUDIO_SBC,
                        codec_extra: vec![0x11, 0x45, 51, 250],
                    },
                ];
                responder.send(&caps[..]).expect("response should succeed");
                // A delay report at this point is asserted to fail.
                // NOTE(review): presumably because no stream has been configured yet — confirm.
                assert!(peer.delay_report(&expected_peer_stream_id, 0xc0de).await.is_err());
            }
            Open { responder, stream_id } => {
                // A report addressed to the remote's own endpoint id is asserted to fail...
                assert!(peer.delay_report(&expected_stream_id, 0xc0de).await.is_err());
                // ...while one addressed to the other side's endpoint id must be acknowledged.
                peer.delay_report(&expected_peer_stream_id, 0xc0de)
                    .await
                    .expect("should get acked correctly");
                assert_eq!(expected_stream_id, stream_id);
                responder.send().expect("response should succeed");
            }
            SetConfiguration { responder, local_stream_id, remote_stream_id, .. } => {
                assert_eq!(local_stream_id, expected_stream_id);
                assert_eq!(remote_stream_id, expected_peer_stream_id);
                responder.send().expect("should send back response without issue");
            }
            x => panic!("Unexpected request: {:?}", x),
        };
    }

    let collect_fut = pin!(peer.collect_capabilities());

    // Two remote requests must arrive before capability collection can complete; each select
    // hands back the still-pending collect future so it can be polled again.
    let Either::Left((request, collect_fut)) =
        futures::future::select(remote_events.next(), collect_fut).await
    else {
        panic!("Collect future shouldn't finish first");
    };
    let collect_fut = pin!(collect_fut);
    remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
    let Either::Left((request, collect_fut)) =
        futures::future::select(remote_events.next(), collect_fut).await
    else {
        panic!("Collect future shouldn't finish first");
    };
    remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;

    // Exactly one remote endpoint should have been collected.
    assert_eq!(1, collect_fut.await.expect("should get the remote endpoints back").len());

    let remote_seid = 4_u8.try_into().unwrap();

    let codec_params = ServiceCapability::MediaCodec {
        media_type: avdtp::MediaType::Audio,
        codec_type: avdtp::MediaCodecType::AUDIO_SBC,
        codec_extra: vec![0x11, 0x45, 51, 51],
    };

    // Run the start in the background. The task panics if `stream_start` ever returns, so the
    // start is expected to stay pending for the remainder of the test.
    let _start_task = fasync::Task::spawn(async move {
        let _ = peer.stream_start(remote_seid, vec![codec_params]).await;
        panic!("stream start task finished");
    });

    let request = remote_events.next().await.expect("should have set_config").unwrap();
    remote_handle_request(request, &remote_peer).await;

    let request = remote_events.next().await.expect("should have open").unwrap();
    remote_handle_request(request, &remote_peer).await;

    let mut cobalt = cobalt_recv.expect("should have receiver");

    // Count metric events per metric id until three distinct ids have been seen and the delay
    // report metric has been seen at least three times.
    let mut got_ids = HashMap::new();
    let delay_metric_id = bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID;
    while got_ids.len() < 3 || *got_ids.get(&delay_metric_id).unwrap_or(&0) < 3 {
        let report = respond_to_metrics_req_for_test(cobalt.next().await.unwrap().unwrap());
        let _ = got_ids.entry(report.metric_id).and_modify(|x| *x += 1).or_insert(1);
        if report.metric_id == delay_metric_id {
            // NOTE(review): the factor of 100000 presumably converts the reported delay from
            // units of 0.1 ms to nanoseconds — confirm against the metrics logging code.
            assert_eq!(MetricEventPayload::IntegerValue(0xc0de * 100000), report.payload);
        }
    }
    assert!(got_ids.contains_key(&bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID));
    assert!(got_ids.contains_key(&bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID));
    assert!(got_ids.contains_key(&delay_metric_id));
    // The handler above issues three delay reports; exactly three metric events are expected.
    assert_eq!(got_ids.get(&delay_metric_id).cloned(), Some(3));
}
2379
2380 fn sbc_capabilities() -> Vec<ServiceCapability> {
2381 let sbc_codec_info = SbcCodecInfo::new(
2382 SbcSamplingFrequency::FREQ48000HZ,
2383 SbcChannelMode::JOINT_STEREO,
2384 SbcBlockCount::SIXTEEN,
2385 SbcSubBands::EIGHT,
2386 SbcAllocation::LOUDNESS,
2387 53,
2388 53,
2389 )
2390 .expect("sbc codec info");
2391
2392 vec![avdtp::ServiceCapability::MediaTransport, sbc_codec_info.into()]
2393 }
2394
2395 #[fuchsia::test]
2397 fn peer_as_acceptor() {
2398 let mut exec = fasync::TestExecutor::new();
2399
2400 let mut streams = Streams::default();
2401 let mut test_builder = TestMediaTaskBuilder::new();
2402 streams.insert(Stream::build(
2403 make_sbc_endpoint(1, avdtp::EndpointType::Source),
2404 test_builder.builder(),
2405 ));
2406
2407 let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2408 let remote_peer = avdtp::Peer::new(remote);
2409
2410 let discover_fut = remote_peer.discover();
2411 let mut discover_fut = pin!(discover_fut);
2412
2413 let expected = vec![make_sbc_endpoint(1, avdtp::EndpointType::Source).information()];
2414 match exec.run_until_stalled(&mut discover_fut) {
2415 Poll::Ready(Ok(res)) => assert_eq!(res, expected),
2416 x => panic!("Expected discovery to complete and got {:?}", x),
2417 };
2418
2419 let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2420 let unknown_endpoint_id = 2_u8.try_into().expect("should be able to get sbc endpointid");
2421
2422 let get_caps_fut = remote_peer.get_capabilities(&sbc_endpoint_id);
2423 let mut get_caps_fut = pin!(get_caps_fut);
2424
2425 match exec.run_until_stalled(&mut get_caps_fut) {
2426 Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2428 x => panic!("Get capabilities should be ready but got {:?}", x),
2429 };
2430
2431 let get_caps_fut = remote_peer.get_capabilities(&unknown_endpoint_id);
2432 let mut get_caps_fut = pin!(get_caps_fut);
2433
2434 match exec.run_until_stalled(&mut get_caps_fut) {
2435 Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2436 assert_eq!(Some(Ok(avdtp::ErrorCode::BadAcpSeid)), e.error_code())
2437 }
2438 x => panic!("Get capabilities should be a ready error but got {:?}", x),
2439 };
2440
2441 let get_caps_fut = remote_peer.get_all_capabilities(&sbc_endpoint_id);
2442 let mut get_caps_fut = pin!(get_caps_fut);
2443
2444 match exec.run_until_stalled(&mut get_caps_fut) {
2445 Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2447 x => panic!("Get capabilities should be ready but got {:?}", x),
2448 };
2449
2450 let sbc_caps = sbc_capabilities();
2451 let set_config_fut =
2452 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2453 let mut set_config_fut = pin!(set_config_fut);
2454
2455 match exec.run_until_stalled(&mut set_config_fut) {
2456 Poll::Ready(Ok(())) => {}
2457 x => panic!("Set capabilities should be ready but got {:?}", x),
2458 };
2459
2460 let open_fut = remote_peer.open(&sbc_endpoint_id);
2461 let mut open_fut = pin!(open_fut);
2462 match exec.run_until_stalled(&mut open_fut) {
2463 Poll::Ready(Ok(())) => {}
2464 x => panic!("Open should be ready but got {:?}", x),
2465 };
2466
2467 let (_remote_transport, transport) = Channel::create();
2469
2470 assert_eq!(Some(()), peer.receive_channel(transport).ok());
2471
2472 let stream_ids = vec![sbc_endpoint_id.clone()];
2473 let start_fut = remote_peer.start(&stream_ids);
2474 let mut start_fut = pin!(start_fut);
2475 match exec.run_until_stalled(&mut start_fut) {
2476 Poll::Ready(Ok(())) => {}
2477 x => panic!("Start should be ready but got {:?}", x),
2478 };
2479
2480 let media_task = test_builder.expect_task();
2482 assert!(media_task.is_started());
2483
2484 let suspend_fut = remote_peer.suspend(&stream_ids);
2485 let mut suspend_fut = pin!(suspend_fut);
2486 match exec.run_until_stalled(&mut suspend_fut) {
2487 Poll::Ready(Ok(())) => {}
2488 x => panic!("Start should be ready but got {:?}", x),
2489 };
2490
2491 assert!(!media_task.is_started());
2493 }
2494
2495 #[fuchsia::test]
2496 fn peer_set_config_reject_first() {
2497 let mut exec = fasync::TestExecutor::new();
2498
2499 let mut streams = Streams::default();
2500 let test_builder = TestMediaTaskBuilder::new();
2501 streams.insert(Stream::build(
2502 make_sbc_endpoint(1, avdtp::EndpointType::Source),
2503 test_builder.builder(),
2504 ));
2505
2506 let (remote, _requests, _, _peer) = setup_test_peer(false, streams, None);
2507 let remote_peer = avdtp::Peer::new(remote);
2508
2509 let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2510
2511 let wrong_freq_sbc = &[SbcCodecInfo::new(
2512 SbcSamplingFrequency::FREQ44100HZ, SbcChannelMode::JOINT_STEREO,
2514 SbcBlockCount::SIXTEEN,
2515 SbcSubBands::EIGHT,
2516 SbcAllocation::LOUDNESS,
2517 53,
2518 53,
2519 )
2520 .expect("sbc codec info")
2521 .into()];
2522
2523 let set_config_fut =
2524 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, wrong_freq_sbc);
2525 let mut set_config_fut = pin!(set_config_fut);
2526
2527 match exec.run_until_stalled(&mut set_config_fut) {
2528 Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2529 assert!(e.service_category().is_some())
2530 }
2531 x => panic!("Set capabilities should have been rejected but got {:?}", x),
2532 };
2533
2534 let sbc_caps = sbc_capabilities();
2535 let set_config_fut =
2536 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2537 let mut set_config_fut = pin!(set_config_fut);
2538
2539 match exec.run_until_stalled(&mut set_config_fut) {
2540 Poll::Ready(Ok(())) => {}
2541 x => panic!("Set capabilities should be ready but got {:?}", x),
2542 };
2543 }
2544
#[fuchsia::test]
/// After the remote configures and opens a stream but never starts it, the local peer is
/// expected to send a Start request itself once fake time has advanced by three seconds.
fn peer_starts_waiting_streams() {
    // Fake time lets the test control exactly when timers expire.
    let mut exec = fasync::TestExecutor::new_with_fake_time();
    exec.set_fake_time(fasync::MonotonicInstant::from_nanos(5_000_000_000));

    let mut streams = Streams::default();
    let mut test_builder = TestMediaTaskBuilder::new();
    streams.insert(Stream::build(
        make_sbc_endpoint(1, avdtp::EndpointType::Source),
        test_builder.builder(),
    ));

    let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
    let remote_peer = avdtp::Peer::new(remote);

    let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");

    // The remote configures and opens the endpoint, then provides a channel for it.
    let sbc_caps = sbc_capabilities();
    let set_config_fut =
        remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
    let mut set_config_fut = pin!(set_config_fut);

    match exec.run_until_stalled(&mut set_config_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Set capabilities should be ready but got {:?}", x),
    };

    let open_fut = remote_peer.open(&sbc_endpoint_id);
    let mut open_fut = pin!(open_fut);
    match exec.run_until_stalled(&mut open_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Open should be ready but got {:?}", x),
    };

    let (_remote_transport, transport) = Channel::create();
    assert_eq!(Some(()), peer.receive_channel(transport).ok());

    let mut remote_requests = remote_peer.take_request_stream();
    let next_remote_request_fut = remote_requests.next();
    let mut next_remote_request_fut = pin!(next_remote_request_fut);

    // Nothing is sent to the remote before time advances.
    assert!(exec.run_until_stalled(&mut next_remote_request_fut).is_pending());

    // Jump three seconds ahead and fire whatever timers have expired by then.
    exec.set_fake_time(zx::MonotonicDuration::from_seconds(3).after_now());
    let _ = exec.wake_expired_timers();

    // The local peer should now have asked the remote to start the stream.
    let stream_ids = match exec.run_until_stalled(&mut next_remote_request_fut) {
        Poll::Ready(Some(Ok(avdtp::Request::Start { responder, stream_ids }))) => {
            responder.send().unwrap();
            stream_ids
        }
        x => panic!("Expected to receive a start request for the stream, got {:?}", x),
    };

    // Accepting the start produces a started media task.
    let media_task =
        exec.run_until_stalled(&mut test_builder.next_task()).expect("ready").unwrap();
    assert!(media_task.is_started());

    // The remote suspends the same stream ids, which stops the media task.
    let suspend_fut = remote_peer.suspend(&stream_ids);
    let mut suspend_fut = pin!(suspend_fut);
    match exec.run_until_stalled(&mut suspend_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Suspend should be ready but got {:?}", x),
    };

    assert!(!media_task.is_started());
}
2619
2620 #[fuchsia::test]
2621 fn needs_permit_to_start_streams() {
2622 let mut exec = fasync::TestExecutor::new();
2623
2624 let mut streams = Streams::default();
2625 let mut test_builder = TestMediaTaskBuilder::new();
2626 streams.insert(Stream::build(
2627 make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2628 test_builder.builder(),
2629 ));
2630 streams.insert(Stream::build(
2631 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2632 test_builder.builder(),
2633 ));
2634 let mut next_task_fut = test_builder.next_task();
2635
2636 let permits = Permits::new(1);
2637 let taken_permit = permits.get().expect("permit taken");
2638 let (remote, _profile_request_stream, _, peer) =
2639 setup_test_peer(false, streams, Some(permits.clone()));
2640 let remote_peer = avdtp::Peer::new(remote);
2641
2642 let sbc_endpoint_id = 1_u8.try_into().unwrap();
2643
2644 let sbc_caps = sbc_capabilities();
2645 let mut set_config_fut =
2646 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2647
2648 match exec.run_until_stalled(&mut set_config_fut) {
2649 Poll::Ready(Ok(())) => {}
2650 x => panic!("Set capabilities should be ready but got {:?}", x),
2651 };
2652
2653 let mut open_fut = remote_peer.open(&sbc_endpoint_id);
2654 match exec.run_until_stalled(&mut open_fut) {
2655 Poll::Ready(Ok(())) => {}
2656 x => panic!("Open should be ready but got {:?}", x),
2657 };
2658
2659 let (_remote_transport, transport) = Channel::create();
2661 assert_eq!(Some(()), peer.receive_channel(transport).ok());
2662
2663 let sbc_endpoint_two = 2_u8.try_into().unwrap();
2665
2666 let mut set_config_fut =
2667 remote_peer.set_configuration(&sbc_endpoint_two, &sbc_endpoint_two, &sbc_caps);
2668
2669 match exec.run_until_stalled(&mut set_config_fut) {
2670 Poll::Ready(Ok(())) => {}
2671 x => panic!("Set capabilities should be ready but got {:?}", x),
2672 };
2673
2674 let mut open_fut = remote_peer.open(&sbc_endpoint_two);
2675 match exec.run_until_stalled(&mut open_fut) {
2676 Poll::Ready(Ok(())) => {}
2677 x => panic!("Open should be ready but got {:?}", x),
2678 };
2679
2680 let (_remote_transport_two, transport_two) = Channel::create();
2682 assert_eq!(Some(()), peer.receive_channel(transport_two).ok());
2683
2684 let unknown_endpoint_id: StreamEndpointId = 9_u8.try_into().unwrap();
2687 let stream_ids = [sbc_endpoint_id.clone(), unknown_endpoint_id.clone()];
2688 let mut start_fut = remote_peer.start(&stream_ids);
2689 match exec.run_until_stalled(&mut start_fut) {
2690 Poll::Ready(Err(avdtp::Error::RemoteRejected(rejection))) => {
2691 assert_eq!(avdtp::ErrorCode::BadAcpSeid, rejection.error_code().unwrap().unwrap());
2692 assert_eq!(unknown_endpoint_id, rejection.stream_id().unwrap());
2693 }
2694 x => panic!("Start should be ready but got {:?}", x),
2695 };
2696
2697 let mut remote_requests = remote_peer.take_request_stream();
2699
2700 let suspended_stream_ids = match exec.run_singlethreaded(&mut remote_requests.next()) {
2701 Some(Ok(avdtp::Request::Suspend { responder, stream_ids })) => {
2702 responder.send().unwrap();
2703 stream_ids
2704 }
2705 x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2706 };
2707
2708 assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2709 assert_eq!(1, suspended_stream_ids.len());
2710
2711 match exec.run_until_stalled(&mut next_task_fut) {
2713 Poll::Pending => {}
2714 x => panic!("Local task should not have been created at this point: {:?}", x),
2715 };
2716
2717 let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
2720 match exec.run_until_stalled(&mut start_fut) {
2721 Poll::Ready(Ok(())) => {}
2722 x => panic!("Start should be ready but got {:?}", x),
2723 }
2724
2725 let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2726 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2727 responder.send().unwrap();
2728 stream_ids
2729 }
2730 x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2731 };
2732 assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2733
2734 drop(taken_permit);
2736
2737 match exec.run_singlethreaded(&mut remote_requests.next()) {
2738 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2739 assert_eq!(stream_ids, &[sbc_endpoint_id.clone()]);
2740 responder.send().unwrap();
2741 }
2742 x => panic!("Expected start on permit available but got {x:?}"),
2743 };
2744
2745 let media_task = match exec.run_until_stalled(&mut next_task_fut) {
2747 Poll::Ready(Some(task)) => task,
2748 x => panic!("Local task should be created at this point: {:?}", x),
2749 };
2750
2751 assert!(media_task.is_started());
2752
2753 let mut start_fut = remote_peer.start(&[sbc_endpoint_two.clone()]);
2755 match exec.run_until_stalled(&mut start_fut) {
2756 Poll::Ready(Ok(())) => {}
2757 x => panic!("Start should be ready but got {:?}", x),
2758 }
2759
2760 let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2761 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2762 responder.send().unwrap();
2763 stream_ids
2764 }
2765 x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2766 };
2767
2768 assert!(suspended_stream_ids.contains(&sbc_endpoint_two));
2769 assert_eq!(1, suspended_stream_ids.len());
2770
2771 let mut suspend_fut = remote_peer.suspend(&[sbc_endpoint_id.clone()]);
2773 match exec.run_until_stalled(&mut suspend_fut) {
2774 Poll::Ready(Ok(())) => {}
2775 x => panic!("Start should be ready but got {:?}", x),
2776 }
2777
2778 match exec.run_singlethreaded(&mut remote_requests.next()) {
2779 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2780 assert_eq!(stream_ids, &[sbc_endpoint_two]);
2781 responder.send().unwrap();
2782 }
2783 x => panic!("Expected start on permit available but got {x:?}"),
2784 };
2785 }
2786
2787 fn start_sbc_stream(
2788 exec: &mut fasync::TestExecutor,
2789 media_test_builder: &mut TestMediaTaskBuilder,
2790 peer: &Peer,
2791 remote_peer: &avdtp::Peer,
2792 local_id: &StreamEndpointId,
2793 remote_id: &StreamEndpointId,
2794 ) -> TestMediaTask {
2795 let sbc_caps = sbc_capabilities();
2796 let set_config_fut = remote_peer.set_configuration(&local_id, &remote_id, &sbc_caps);
2797 let mut set_config_fut = pin!(set_config_fut);
2798
2799 match exec.run_until_stalled(&mut set_config_fut) {
2800 Poll::Ready(Ok(())) => {}
2801 x => panic!("Set capabilities should be ready but got {:?}", x),
2802 };
2803
2804 let open_fut = remote_peer.open(&local_id);
2805 let mut open_fut = pin!(open_fut);
2806 match exec.run_until_stalled(&mut open_fut) {
2807 Poll::Ready(Ok(())) => {}
2808 x => panic!("Open should be ready but got {:?}", x),
2809 };
2810
2811 let (_remote_transport, transport) = Channel::create();
2813 assert_eq!(Some(()), peer.receive_channel(transport).ok());
2814
2815 let stream_ids = [local_id.clone()];
2817 let start_fut = remote_peer.start(&stream_ids);
2818 let mut start_fut = pin!(start_fut);
2819 match exec.run_until_stalled(&mut start_fut) {
2820 Poll::Ready(Ok(())) => {}
2821 x => panic!("Start should be ready but got {:?}", x),
2822 };
2823
2824 let media_task = media_test_builder.expect_task();
2826 assert!(media_task.is_started());
2827
2828 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
2829 media_task
2830 }
2831
2832 #[fuchsia::test]
2833 fn permits_can_be_revoked_and_reinstated_all() {
2834 let mut exec = fasync::TestExecutor::new();
2835
2836 let mut streams = Streams::default();
2837 let mut test_builder = TestMediaTaskBuilder::new();
2838 streams.insert(Stream::build(
2839 make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2840 test_builder.builder(),
2841 ));
2842 let sbc_endpoint_id = 1_u8.try_into().unwrap();
2843 let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2844
2845 streams.insert(Stream::build(
2846 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2847 test_builder.builder(),
2848 ));
2849 let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2850 let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2851
2852 let permits = Permits::new(2);
2853
2854 let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2855 let remote_peer = avdtp::Peer::new(remote);
2856
2857 let one_media_task = start_sbc_stream(
2858 &mut exec,
2859 &mut test_builder,
2860 &peer,
2861 &remote_peer,
2862 &sbc_endpoint_id,
2863 &remote_sbc_endpoint_id,
2864 );
2865 let two_media_task = start_sbc_stream(
2866 &mut exec,
2867 &mut test_builder,
2868 &peer,
2869 &remote_peer,
2870 &sbc2_endpoint_id,
2871 &remote_sbc2_endpoint_id,
2872 );
2873
2874 let taken_permits = permits.seize();
2876
2877 let remote_endpoints: HashSet<_> =
2878 [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2879
2880 let mut remote_requests = remote_peer.take_request_stream();
2882 let mut expected_suspends = remote_endpoints.clone();
2883 while !expected_suspends.is_empty() {
2884 match exec.run_until_stalled(&mut remote_requests.next()) {
2885 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2886 for stream_id in stream_ids {
2887 assert!(expected_suspends.remove(&stream_id));
2888 }
2889 responder.send().expect("send response okay");
2890 }
2891 x => panic!("Expected suspension and got {:?}", x),
2892 }
2893 }
2894
2895 assert!(!one_media_task.is_started());
2897 assert!(!two_media_task.is_started());
2898
2899 drop(taken_permits);
2901
2902 let mut expected_starts = remote_endpoints.clone();
2903 while !expected_starts.is_empty() {
2904 match exec.run_singlethreaded(&mut remote_requests.next()) {
2905 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2906 for stream_id in stream_ids {
2907 assert!(expected_starts.remove(&stream_id));
2908 }
2909 responder.send().expect("send response okay");
2910 }
2911 x => panic!("Expected start and got {:?}", x),
2912 }
2913 }
2914 let one_media_task = test_builder.expect_task();
2917 assert!(one_media_task.is_started());
2918 let two_media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
2919 Poll::Ready(Some(task)) => task,
2920 x => panic!("Expected another ready task but {x:?}"),
2921 };
2922 assert!(two_media_task.is_started());
2923 }
2924
2925 #[fuchsia::test]
2926 fn permits_can_be_revoked_one_at_a_time() {
2927 let mut exec = fasync::TestExecutor::new();
2928
2929 let mut streams = Streams::default();
2930 let mut test_builder = TestMediaTaskBuilder::new();
2931 streams.insert(Stream::build(
2932 make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2933 test_builder.builder(),
2934 ));
2935 let sbc_endpoint_id = 1_u8.try_into().unwrap();
2936 let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2937
2938 streams.insert(Stream::build(
2939 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2940 test_builder.builder(),
2941 ));
2942 let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2943 let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2944
2945 let permits = Permits::new(2);
2946
2947 let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2948 let remote_peer = avdtp::Peer::new(remote);
2949
2950 let one_media_task = start_sbc_stream(
2951 &mut exec,
2952 &mut test_builder,
2953 &peer,
2954 &remote_peer,
2955 &sbc_endpoint_id,
2956 &remote_sbc_endpoint_id,
2957 );
2958 let two_media_task = start_sbc_stream(
2959 &mut exec,
2960 &mut test_builder,
2961 &peer,
2962 &remote_peer,
2963 &sbc2_endpoint_id,
2964 &remote_sbc2_endpoint_id,
2965 );
2966
2967 let taken_permit = permits.take();
2969
2970 let remote_endpoints: HashSet<_> =
2971 [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2972
2973 let mut remote_requests = remote_peer.take_request_stream();
2975 let suspended_id = match exec.run_until_stalled(&mut remote_requests.next()) {
2976 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2977 assert!(stream_ids.len() == 1);
2978 assert!(remote_endpoints.contains(&stream_ids[0]));
2979 responder.send().expect("send response okay");
2980 stream_ids[0].clone()
2981 }
2982 x => panic!("Expected suspension and got {:?}", x),
2983 };
2984
2985 if suspended_id == remote_sbc_endpoint_id {
2987 assert!(!one_media_task.is_started());
2988 assert!(two_media_task.is_started());
2989 } else {
2990 assert!(one_media_task.is_started());
2991 assert!(!two_media_task.is_started());
2992 }
2993
2994 drop(taken_permit);
2996
2997 match exec.run_singlethreaded(&mut remote_requests.next()) {
2998 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2999 assert_eq!(stream_ids, &[suspended_id]);
3000 responder.send().expect("send response okay");
3001 }
3002 x => panic!("Expected start and got {:?}", x),
3003 }
3004 let media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
3006 Poll::Ready(Some(task)) => task,
3007 x => panic!("Expected media task to start: {x:?}"),
3008 };
3009 assert!(media_task.is_started());
3010 }
3011
3012 #[fuchsia::test]
3015 fn permit_suspend_start_while_suspending() {
3016 let mut exec = fasync::TestExecutor::new();
3017
3018 let mut streams = Streams::default();
3019 let mut test_builder = TestMediaTaskBuilder::new();
3020 streams.insert(Stream::build(
3021 make_sbc_endpoint(1, avdtp::EndpointType::Sink),
3022 test_builder.builder(),
3023 ));
3024 streams.insert(Stream::build(
3025 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
3026 test_builder.builder(),
3027 ));
3028 let mut next_task_fut = test_builder.next_task();
3029
3030 let permits = Permits::new(1);
3031 let (remote, _profile_request_stream, _, peer) =
3032 setup_test_peer(false, streams, Some(permits.clone()));
3033
3034 let remote_peer = avdtp::Peer::new(remote);
3035 let mut remote_requests = remote_peer.take_request_stream();
3036
3037 let sbc_endpoint_id = 1_u8.try_into().unwrap();
3038
3039 let sbc_caps = sbc_capabilities();
3040 let mut set_config_fut =
3041 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
3042
3043 match exec.run_until_stalled(&mut set_config_fut) {
3044 Poll::Ready(Ok(())) => {}
3045 x => panic!("Set capabilities should be ready but got {:?}", x),
3046 };
3047
3048 let mut open_fut = remote_peer.open(&sbc_endpoint_id);
3049 match exec.run_until_stalled(&mut open_fut) {
3050 Poll::Ready(Ok(())) => {}
3051 x => panic!("Open should be ready but got {:?}", x),
3052 };
3053
3054 let (_remote_transport, transport) = Channel::create();
3056 assert_eq!(Some(()), peer.receive_channel(transport).ok());
3057
3058 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
3060 let Some(_deadline) = exec.wake_next_timer() else {
3061 panic!("Expected a timer to be waiting to run");
3062 };
3063
3064 let start_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
3066 Some(Ok(avdtp::Request::Start { stream_ids, responder })) => {
3067 assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
3068 responder
3069 }
3070 x => panic!("Expected a Start request, got {x:?}"),
3071 };
3072
3073 assert!(permits.get().is_none());
3074
3075 let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
3077
3078 match exec.run_singlethreaded(&mut start_fut) {
3081 Ok(()) => {}
3082 x => panic!("Expected OK response from start future but got {x:?}"),
3083 }
3084
3085 let suspend_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
3086 Some(Ok(avdtp::Request::Suspend { stream_ids, responder })) => {
3087 assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
3088 responder
3089 }
3090 x => panic!("Expected a suspend got {x:?}"),
3091 };
3092
3093 start_responder.send().unwrap();
3095
3096 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
3097
3098 suspend_responder.send().unwrap();
3100
3101 let media_task = match exec.run_until_stalled(&mut next_task_fut) {
3103 Poll::Ready(Some(task)) => task,
3104 x => panic!("Local task should be created at this point: {:?}", x),
3105 };
3106
3107 assert!(media_task.is_started());
3108 }
3109
3110 #[fuchsia::test]
3113 fn version_check() {
3114 let p1: ProfileDescriptor = ProfileDescriptor {
3115 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3116 major_version: Some(1),
3117 minor_version: Some(3),
3118 ..Default::default()
3119 };
3120 assert_eq!(true, a2dp_version_check(p1));
3121
3122 let p1: ProfileDescriptor = ProfileDescriptor {
3123 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3124 major_version: Some(2),
3125 minor_version: Some(10),
3126 ..Default::default()
3127 };
3128 assert_eq!(true, a2dp_version_check(p1));
3129
3130 let p1: ProfileDescriptor = ProfileDescriptor {
3131 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3132 major_version: Some(1),
3133 minor_version: Some(0),
3134 ..Default::default()
3135 };
3136 assert_eq!(false, a2dp_version_check(p1));
3137
3138 let p1: ProfileDescriptor = ProfileDescriptor {
3139 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3140 major_version: None,
3141 minor_version: Some(9),
3142 ..Default::default()
3143 };
3144 assert_eq!(false, a2dp_version_check(p1));
3145
3146 let p1: ProfileDescriptor = ProfileDescriptor {
3147 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3148 major_version: Some(2),
3149 minor_version: Some(2),
3150 ..Default::default()
3151 };
3152 assert_eq!(true, a2dp_version_check(p1));
3153 }
3154}