Skip to main content

bt_a2dp/peer/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Context as _;
6use bt_avdtp::{
7    self as avdtp, MediaCodecType, ServiceCapability, ServiceCategory, StreamEndpoint,
8    StreamEndpointId,
9};
10use fidl_fuchsia_bluetooth::ChannelParameters;
11use fidl_fuchsia_bluetooth_bredr::{
12    ConnectParameters, L2capParameters, PSM_AVDTP, ProfileDescriptor, ProfileProxy,
13};
14use fuchsia_async::{self as fasync, DurationExt};
15use fuchsia_bluetooth::inspect::DebugExt;
16use fuchsia_bluetooth::types::{Channel, PeerId};
17use fuchsia_inspect as inspect;
18use fuchsia_inspect_derive::{AttachError, Inspect};
19use fuchsia_sync::Mutex;
20use futures::channel::mpsc;
21use futures::future::{BoxFuture, Either};
22use futures::stream::FuturesUnordered;
23use futures::task::{Context, Poll, Waker};
24use futures::{Future, FutureExt, StreamExt, select};
25use log::{debug, info, trace, warn};
26use std::collections::{BTreeMap, HashMap, HashSet};
27use std::pin::Pin;
28use std::sync::{Arc, Weak};
29
30/// For sending out-of-band commands over the A2DP peer.
31mod controller;
32pub use controller::ControllerPool;
33
34use crate::codec::MediaCodecConfig;
35use crate::permits::{Permit, Permits};
36use crate::stream::{Stream, Streams};
37
38/// A Peer represents an A2DP peer which may be connected to this device.
39/// Only one A2DP peer should exist for each Bluetooth peer.
40#[derive(Inspect)]
41pub struct Peer {
42    /// The id of the peer we are connected to.
43    id: PeerId,
44    /// Inner keeps track of the peer and the streams.
45    #[inspect(forward)]
46    inner: Arc<Mutex<PeerInner>>,
47    /// Profile Proxy to connect new transport channels
48    profile: ProfileProxy,
49    /// The profile descriptor for this peer, if it has been discovered.
50    descriptor: Mutex<Option<ProfileDescriptor>>,
51    /// Wakers that are to be woken when the peer disconnects.  If None, the peers have been woken
52    /// and this peer is disconnected.  Shared weakly with ClosedPeer future objects that complete
53    /// when the peer disconnects.
54    closed_wakers: Arc<Mutex<Option<Vec<Waker>>>>,
55    /// Used to report peer metrics to Cobalt.
56    metrics: bt_metrics::MetricsLogger,
57    /// A task waiting to start a stream if it hasn't been started yet.
58    start_stream_task: Mutex<Option<fasync::Task<avdtp::Result<()>>>>,
59}
60
61/// StreamPermits handles reserving and retrieving permits for streaming audio.
62/// Reservations are automatically retrieved for streams that are revoked, and when the
63/// reservation is completed, the permit is stored and a StreamPermit is sent so it can be started.
64#[derive(Clone)]
65struct StreamPermits {
66    permits: Permits,
67    open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
68    reserved_streams: Arc<Mutex<HashSet<StreamEndpointId>>>,
69    inner: Weak<Mutex<PeerInner>>,
70    peer_id: PeerId,
71    sender: mpsc::UnboundedSender<BoxFuture<'static, StreamPermit>>,
72}
73
74#[derive(Debug)]
75struct StreamPermit {
76    local_id: StreamEndpointId,
77    open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
78}
79
80impl StreamPermit {
81    fn local_id(&self) -> &StreamEndpointId {
82        &self.local_id
83    }
84
85    /// Returns true if a Permit is held for this stream endpoint.
86    fn is_held(&self) -> bool {
87        self.open_streams.lock().contains_key(&self.local_id)
88    }
89}
90
91impl Drop for StreamPermit {
92    fn drop(&mut self) {
93        let _ = self.open_streams.lock().remove(&self.local_id);
94    }
95}
96
97impl StreamPermits {
98    fn new(
99        inner: Weak<Mutex<PeerInner>>,
100        peer_id: PeerId,
101        permits: Permits,
102    ) -> (Self, mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>) {
103        let (sender, reservations_receiver) = futures::channel::mpsc::unbounded();
104        (
105            Self {
106                inner,
107                permits,
108                peer_id,
109                sender,
110                open_streams: Default::default(),
111                reserved_streams: Default::default(),
112            },
113            reservations_receiver,
114        )
115    }
116
117    fn label_for(&self, local_id: &StreamEndpointId) -> String {
118        format!("{} {}", self.peer_id, local_id)
119    }
120
121    /// Get a permit to stream on the stream with id `local_id`.
122    /// Returns Some() if there is a permit available.
123    fn get(&self, local_id: StreamEndpointId) -> Option<StreamPermit> {
124        let revoke_fn = self.make_revocation_fn(&local_id);
125        let Some(permit) = self.permits.get_revokable(revoke_fn) else {
126            info!("No permits available: {:?}", self.permits);
127            return None;
128        };
129        permit.relabel(self.label_for(&local_id));
130        if let Some(_) = self.open_streams.lock().insert(local_id.clone(), permit) {
131            warn!(id:% = self.peer_id; "Started stream {local_id:?} twice, dropping previous permit");
132        }
133        Some(StreamPermit { local_id, open_streams: self.open_streams.clone() })
134    }
135
136    /// Get a reservation that will resolve to a StreamPermit to start a stream with the id
137    /// `local_id`
138    fn setup_reservation_for(&self, local_id: StreamEndpointId) {
139        if !self.reserved_streams.lock().insert(local_id.clone()) {
140            // Already reserved.
141            return;
142        }
143        let restart_stream_available_fut = {
144            let self_revoke_fn = Self::make_revocation_fn(&self, &local_id);
145            let reservation = self.permits.reserve_revokable(self_revoke_fn);
146            let open_streams = self.open_streams.clone();
147            let reserved_streams = self.reserved_streams.clone();
148            let label = self.label_for(&local_id);
149            let local_id = local_id.clone();
150            async move {
151                let permit = reservation.await;
152                permit.relabel(label);
153                if open_streams.lock().insert(local_id.clone(), permit).is_some() {
154                    warn!("Reservation replaces acquired permit for {}", local_id.clone());
155                }
156                if !reserved_streams.lock().remove(&local_id) {
157                    warn!(local_id:%; "Unrecorded reservation resolved");
158                }
159                StreamPermit { local_id, open_streams }
160            }
161        };
162        if let Err(e) = self.sender.unbounded_send(restart_stream_available_fut.boxed()) {
163            warn!(id:% = self.peer_id, local_id:%, e:?; "Couldn't queue reservation to finish");
164        }
165    }
166
167    /// Revokes a permit that was previously delivered, suspending the local stream and signaling
168    /// the peer.
169    /// The permit must have been previously received through StreamPermits::get or a have been
170    /// restarted after being revoked, otherwise will panic.
171    fn revocation_fn(self, local_id: StreamEndpointId) -> Permit {
172        if let Ok(peer) = PeerInner::upgrade(self.inner.clone()) {
173            {
174                let mut lock = peer.lock();
175                match lock.suspend_local_stream(&local_id) {
176                    Ok(remote_id) => drop(lock.peer.suspend(&[remote_id])),
177                    Err(e) => warn!("Couldn't stop local stream {local_id:?}: {e:?}"),
178                }
179            }
180            self.setup_reservation_for(local_id.clone());
181        }
182        self.open_streams.lock().remove(&local_id).expect("permit revoked but don't have it")
183    }
184
185    fn make_revocation_fn(&self, local_id: &StreamEndpointId) -> impl FnOnce() -> Permit + use<> {
186        let local_id = local_id.clone();
187        let cloned = self.clone();
188        move || cloned.revocation_fn(local_id)
189    }
190}
191
192impl Peer {
193    /// Make a new Peer which is connected to the peer `id` using the AVDTP `peer`.
194    /// The `streams` are the local endpoints available to the peer.
195    /// `profile` will be used to initiate connections for Media Transport.
196    /// The `permits`, if provided, will acquire a permit before starting streams on this peer.
197    /// If `metrics` is included, metrics for codec availability will be reported.
198    /// This also starts a task on the executor to handle incoming events from the peer.
199    pub fn create(
200        id: PeerId,
201        peer: avdtp::Peer,
202        streams: Streams,
203        permits: Option<Permits>,
204        profile: ProfileProxy,
205        metrics: bt_metrics::MetricsLogger,
206    ) -> Self {
207        let inner = Arc::new(Mutex::new(PeerInner::new(peer, id, streams, metrics.clone())));
208        let reservations_receiver = if let Some(permits) = permits {
209            let (stream_permits, receiver) =
210                StreamPermits::new(Arc::downgrade(&inner), id, permits);
211            inner.lock().permits = Some(stream_permits);
212            receiver
213        } else {
214            let (_, receiver) = mpsc::unbounded();
215            receiver
216        };
217        let res = Self {
218            id,
219            inner,
220            profile,
221            descriptor: Mutex::new(None),
222            closed_wakers: Arc::new(Mutex::new(Some(Vec::new()))),
223            metrics,
224            start_stream_task: Mutex::new(None),
225        };
226        res.start_requests_task(reservations_receiver);
227        res
228    }
229
230    pub fn set_descriptor(&self, descriptor: ProfileDescriptor) -> Option<ProfileDescriptor> {
231        self.descriptor.lock().replace(descriptor)
232    }
233
234    /// How long to wait after a non-local establishment of a stream to start the stream.
235    /// Chosen to produce reasonably quick startup while allowing for peer start.
236    const STREAM_DWELL: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(500);
237
238    /// Receive a channel from the peer that was initiated remotely.
239    /// This function should be called whenever the peer associated with this opens an L2CAP channel.
240    /// If this completes opening a stream, streams that are suspended will be scheduled to start.
241    pub fn receive_channel(&self, channel: Channel) -> avdtp::Result<()> {
242        let mut lock = self.inner.lock();
243        if lock.receive_channel(channel)? {
244            let weak = Arc::downgrade(&self.inner);
245            let mut task_lock = self.start_stream_task.lock();
246            *task_lock = Some(fasync::Task::local(async move {
247                trace!("Dwelling to start remotely-opened stream..");
248                fasync::Timer::new(Self::STREAM_DWELL.after_now()).await;
249                PeerInner::start_opened(weak).await
250            }));
251        }
252        Ok(())
253    }
254
255    /// Return a handle to the AVDTP peer, to use as initiator of commands.
256    pub fn avdtp(&self) -> avdtp::Peer {
257        let lock = self.inner.lock();
258        lock.peer.clone()
259    }
260
261    /// Returns the stream endpoints discovered by this peer.
262    pub fn remote_endpoints(&self) -> Option<Vec<avdtp::StreamEndpoint>> {
263        self.inner.lock().remote_endpoints()
264    }
265
266    /// Perform Discovery and Collect Capabilities to enumerate the endpoints and capabilities of
267    /// the connected peer.
268    /// Returns a future which performs the work and resolves to a vector of peer stream endpoints.
269    pub fn collect_capabilities(
270        &self,
271    ) -> impl Future<Output = avdtp::Result<Vec<avdtp::StreamEndpoint>>> + use<> {
272        let avdtp = self.avdtp();
273        let get_all = self.descriptor.lock().clone().is_some_and(a2dp_version_check);
274        let inner = self.inner.clone();
275        let metrics = self.metrics.clone();
276        let peer_id = self.id;
277        async move {
278            if let Some(caps) = inner.lock().remote_endpoints() {
279                return Ok(caps);
280            }
281            trace!("Discovering peer streams..");
282            let infos = avdtp.discover().await?;
283            trace!("Discovered {} streams", infos.len());
284            let mut remote_streams = Vec::new();
285            for info in infos {
286                let capabilities = if get_all {
287                    avdtp.get_all_capabilities(info.id()).await
288                } else {
289                    avdtp.get_capabilities(info.id()).await
290                };
291                match capabilities {
292                    Ok(capabilities) => {
293                        trace!("Stream {:?}", info);
294                        for cap in &capabilities {
295                            trace!("  - {:?}", cap);
296                        }
297                        remote_streams.push(avdtp::StreamEndpoint::from_info(&info, capabilities));
298                    }
299                    Err(e) => {
300                        info!(peer_id:%; "Stream {} capabilities failed: {:?}, skipping", info.id(), e);
301                    }
302                };
303            }
304            inner.lock().set_remote_endpoints(&remote_streams);
305            Self::record_cobalt_metrics(metrics, &remote_streams);
306            Ok(remote_streams)
307        }
308    }
309
310    fn record_cobalt_metrics(metrics: bt_metrics::MetricsLogger, endpoints: &[StreamEndpoint]) {
311        let codec_metrics: HashSet<_> = endpoints
312            .iter()
313            .filter_map(|endpoint| {
314                endpoint.codec_type().map(|t| codectype_to_availability_metric(t) as u32)
315            })
316            .collect();
317        metrics
318            .log_occurrences(bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID, codec_metrics);
319
320        let cap_metrics: HashSet<_> = endpoints
321            .iter()
322            .flat_map(|endpoint| {
323                endpoint
324                    .capabilities()
325                    .iter()
326                    .filter_map(|t| capability_to_metric(t))
327                    .chain(std::iter::once(
328                        bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic,
329                    ))
330                    .map(|t| t as u32)
331            })
332            .collect();
333        metrics.log_occurrences(bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID, cap_metrics);
334    }
335
336    fn transport_channel_params() -> L2capParameters {
337        L2capParameters {
338            psm: Some(PSM_AVDTP),
339            parameters: Some(ChannelParameters {
340                max_rx_packet_size: Some(65535),
341                ..Default::default()
342            }),
343            ..Default::default()
344        }
345    }
346
347    /// Open and start a media transport stream, connecting a compatible local stream to the remote
348    /// stream `remote_id`, configuring it with the `capabilities` provided.
349    /// Returns a future which should be awaited on.
350    /// The future returns Ok(()) if successfully started, and an appropriate error otherwise.
351    pub fn stream_start(
352        &self,
353        remote_id: StreamEndpointId,
354        capabilities: Vec<ServiceCapability>,
355    ) -> impl Future<Output = avdtp::Result<()>> {
356        let peer = Arc::downgrade(&self.inner);
357        let peer_id = self.id.clone();
358        let avdtp = self.avdtp();
359        let profile = self.profile.clone();
360
361        async move {
362            let codec_params =
363                capabilities.iter().find(|x| x.is_codec()).ok_or(avdtp::Error::InvalidState)?;
364            let (local_id, local_capabilities) = {
365                let peer = PeerInner::upgrade(peer.clone())?;
366                let lock = peer.lock();
367                lock.find_compatible_local_capabilities(codec_params, &remote_id)?
368            };
369
370            let local_by_cat: HashMap<ServiceCategory, ServiceCapability> =
371                local_capabilities.into_iter().map(|i| (i.category(), i)).collect();
372
373            // Filter things out if they don't have a match in the local capabilities.
374            // Order them by the ServiceCategory ordinal - some noncompliant devices care about it.
375            let shared_capabilities: BTreeMap<ServiceCategory, ServiceCapability> = capabilities
376                .into_iter()
377                .filter_map(|cap| {
378                    let Some(local_cap) = local_by_cat.get(&cap.category()) else {
379                        return None;
380                    };
381                    if cap.category() == ServiceCategory::MediaCodec {
382                        let Ok(a) = MediaCodecConfig::try_from(&cap) else {
383                            return None;
384                        };
385                        let Ok(b) = MediaCodecConfig::try_from(local_cap) else {
386                            return None;
387                        };
388                        let Some(negotiated) = MediaCodecConfig::negotiate(&a, &b) else {
389                            return None;
390                        };
391                        Some((cap.category(), (&negotiated).into()))
392                    } else {
393                        Some((cap.category(), cap))
394                    }
395                })
396                .collect();
397            let shared_capabilities: Vec<_> = shared_capabilities.into_values().collect();
398
399            trace!("Starting stream {local_id} to remote {remote_id} with {shared_capabilities:?}");
400
401            avdtp.set_configuration(&remote_id, &local_id, &shared_capabilities).await?;
402            {
403                let strong = PeerInner::upgrade(peer.clone())?;
404                strong.lock().set_opening(&local_id, &remote_id, shared_capabilities)?;
405            }
406            avdtp.open(&remote_id).await?;
407
408            debug!(peer_id:%; "Connecting transport channel");
409            let channel = profile
410                .connect(
411                    &peer_id.into(),
412                    &ConnectParameters::L2cap(Self::transport_channel_params()),
413                )
414                .await
415                .context("FIDL error: {}")?
416                .or(Err(avdtp::Error::PeerDisconnected))?;
417            trace!(peer_id:%; "Connected transport channel, converting to local Channel");
418            let channel = match channel.try_into() {
419                Err(e) => {
420                    warn!(peer_id:%, e:?; "Couldn't connect media transport: no channel");
421                    return Err(avdtp::Error::PeerDisconnected);
422                }
423                Ok(c) => c,
424            };
425
426            trace!(peer_id:%; "Connected transport channel, passing to Peer..");
427
428            {
429                let strong = PeerInner::upgrade(peer.clone())?;
430                let _ = strong.lock().receive_channel(channel)?;
431            }
432            // Start streams immediately if the channel is locally initiated.
433            PeerInner::start_opened(peer).await
434        }
435    }
436
437    /// Query whether any streams are currently started or scheduled to start.
438    pub fn streaming_active(&self) -> bool {
439        self.inner.lock().is_streaming() || self.will_start_streaming()
440    }
441
442    /// Returns true if there are any streams that are currently started.
443    #[cfg(test)]
444    fn is_streaming_now(&self) -> bool {
445        self.inner.lock().is_streaming_now()
446    }
447
448    /// Polls the task scheduled to start streaming, returning true if the task is still scheduled
449    /// to start streaming.
450    fn will_start_streaming(&self) -> bool {
451        let mut task_lock = self.start_stream_task.lock();
452        if task_lock.is_none() {
453            return false;
454        }
455        // This is the only thing that can poll the start task, so it is okay to ignore the wakeup.
456        let mut cx = Context::from_waker(&std::task::Waker::noop());
457        if let Poll::Pending = task_lock.as_mut().unwrap().poll_unpin(&mut cx) {
458            return true;
459        }
460        // Reset the task to None so that we don't try to re-poll it.
461        let _ = task_lock.take();
462        false
463    }
464
465    /// Suspend a media transport stream `local_id`.
466    /// It's possible that the stream is not active - a suspend will be attempted, but an
467    /// error from the command will be returned.
468    /// Returns the result of the suspend command.
469    pub fn stream_suspend(
470        &self,
471        local_id: StreamEndpointId,
472    ) -> impl Future<Output = avdtp::Result<()>> {
473        let peer = Arc::downgrade(&self.inner);
474        PeerInner::suspend(peer, local_id)
475    }
476
477    /// Start an asynchronous task to handle any requests from the AVDTP peer.
478    /// This task completes when the remote end closes the signaling connection.
479    fn start_requests_task(
480        &self,
481        mut reservations_receiver: mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>,
482    ) {
483        let lock = self.inner.lock();
484        let mut request_stream = lock.peer.take_request_stream();
485        let id = self.id.clone();
486        let peer = Arc::downgrade(&self.inner);
487        let mut stream_reservations = FuturesUnordered::new();
488        let disconnect_wakers = Arc::downgrade(&self.closed_wakers);
489        fuchsia_async::Task::local(async move {
490            loop {
491                select! {
492                    request = request_stream.next() => {
493                        match request {
494                            None => break,
495                            Some(Err(e)) => info!(peer_id:% = id, e:?; "Request stream error"),
496                            Some(Ok(request)) => match peer.upgrade() {
497                                None => return,
498                                Some(p) => {
499                                    let result_or_future = p.lock().handle_request(request);
500                                    let result = match result_or_future {
501                                        Either::Left(result) => result,
502                                        Either::Right(future) => future.await,
503                                    };
504                                    if let Err(e) = result {
505                                        warn!(peer_id:% = id, e:?; "Error handling request");
506                                    }
507                                }
508                            },
509                        }
510                    },
511                    reservation_fut = reservations_receiver.select_next_some() => {
512                        stream_reservations.push(reservation_fut)
513                    },
514                    permit = stream_reservations.select_next_some() => {
515                        if let Err(e) = PeerInner::start_permit(peer.clone(), permit).await {
516                            warn!(peer_id:% = id, e:?; "Couldn't start stream after unpause");
517                        }
518                    }
519                    complete => break,
520                }
521            }
522            info!(peer_id:% = id; "disconnected");
523            if let Some(wakers) = disconnect_wakers.upgrade() {
524                for waker in wakers.lock().take().unwrap_or_else(Vec::new) {
525                    waker.wake();
526                }
527            }
528        })
529        .detach();
530    }
531
532    /// Returns a future that will complete when the peer disconnects.
533    pub fn closed(&self) -> ClosedPeer {
534        ClosedPeer { inner: Arc::downgrade(&self.closed_wakers) }
535    }
536}
537
538/// Future which completes when the A2DP peer has closed the control connection.
539/// See `Peer::closed`
540#[must_use = "futures do nothing unless you `.await` or poll them"]
541pub struct ClosedPeer {
542    inner: Weak<Mutex<Option<Vec<Waker>>>>,
543}
544
545impl Future for ClosedPeer {
546    type Output = ();
547
548    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
549        match self.inner.upgrade() {
550            None => Poll::Ready(()),
551            Some(inner) => match inner.lock().as_mut() {
552                None => Poll::Ready(()),
553                Some(wakers) => {
554                    wakers.push(cx.waker().clone());
555                    Poll::Pending
556                }
557            },
558        }
559    }
560}
561
562/// Determines if Peer profile version is newer (>= 1.3) or older (< 1.3)
563fn a2dp_version_check(profile: ProfileDescriptor) -> bool {
564    let (Some(major), Some(minor)) = (profile.major_version, profile.minor_version) else {
565        return false;
566    };
567    (major == 1 && minor >= 3) || major > 1
568}
569
570/// Peer handles the communication with the AVDTP layer, and provides responses as appropriate
571/// based on the current state of local streams available.
572/// Each peer has its own set of local stream endpoints, and tracks a set of remote peer endpoints.
573struct PeerInner {
574    /// AVDTP peer communicating to this.
575    peer: avdtp::Peer,
576    /// The PeerId that this peer is representing
577    peer_id: PeerId,
578    /// Some(local_id) if an endpoint has been configured but hasn't finished opening.
579    /// Per AVDTP Sec 6.11 only up to one stream can be in this state.
580    opening: Option<StreamEndpointId>,
581    /// The local stream endpoint collection
582    local: Streams,
583    /// The permits that are available for this peer.
584    permits: Option<StreamPermits>,
585    /// Tasks watching for the end of a started stream. Key is the local stream id.
586    started: HashMap<StreamEndpointId, WatchedStream>,
587    /// The inspect node for this peer
588    inspect: fuchsia_inspect::Node,
589    /// The set of discovered remote endpoints. None until set.
590    remote_endpoints: Option<Vec<StreamEndpoint>>,
591    /// The inspect node representing the remote endpoints.
592    remote_inspect: fuchsia_inspect::Node,
593    /// Cobalt logger used to report peer metrics.
594    metrics: bt_metrics::MetricsLogger,
595}
596
597impl Inspect for &mut PeerInner {
598    // Set up the StreamEndpoint to update the state
599    // The MediaTask node will be created when the media task is started.
600    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
601        self.inspect = parent.create_child(name.as_ref());
602        self.inspect.record_string("id", self.peer_id.to_string());
603        self.local.iattach(&self.inspect, "local_streams")
604    }
605}
606
607impl PeerInner {
608    pub fn new(
609        peer: avdtp::Peer,
610        peer_id: PeerId,
611        local: Streams,
612        metrics: bt_metrics::MetricsLogger,
613    ) -> Self {
614        Self {
615            peer,
616            peer_id,
617            opening: None,
618            local,
619            permits: None,
620            started: HashMap::new(),
621            inspect: Default::default(),
622            remote_endpoints: None,
623            remote_inspect: Default::default(),
624            metrics,
625        }
626    }
627
628    /// Returns an endpoint from the local set or a BadAcpSeid error if it doesn't exist.
629    fn get_mut(&mut self, local_id: &StreamEndpointId) -> Result<&mut Stream, avdtp::ErrorCode> {
630        self.local.get_mut(&local_id).ok_or(avdtp::ErrorCode::BadAcpSeid)
631    }
632
633    fn set_remote_endpoints(&mut self, endpoints: &[StreamEndpoint]) {
634        self.remote_inspect = self.inspect.create_child("remote_endpoints");
635        for endpoint in endpoints {
636            self.remote_inspect.record_child(inspect::unique_name("remote_"), |node| {
637                node.record_string("endpoint_id", endpoint.local_id().debug());
638                node.record_string("capabilities", endpoint.capabilities().debug());
639                node.record_string("type", endpoint.endpoint_type().debug());
640            });
641        }
642        self.remote_endpoints = Some(endpoints.iter().map(StreamEndpoint::as_new).collect());
643    }
644
645    /// If the remote endpoints have been set, returns a copy of the endpoints.
646    fn remote_endpoints(&self) -> Option<Vec<StreamEndpoint>> {
647        self.remote_endpoints.as_ref().map(|v| v.iter().map(StreamEndpoint::as_new).collect())
648    }
649
650    /// If the remote endpoint with endpoint `id` exists, return a copy of the endpoint.
651    fn remote_endpoint(&self, id: &StreamEndpointId) -> Option<StreamEndpoint> {
652        self.remote_endpoints
653            .as_ref()
654            .and_then(|v| v.iter().find(|v| v.local_id() == id).map(StreamEndpoint::as_new))
655    }
656
657    /// Returns true if there is at least one stream that has started or is starting for this peer.
658    fn is_streaming(&self) -> bool {
659        self.is_streaming_now() || self.opening.is_some()
660    }
661
662    /// Returns true if there is at least one stream in the started state for this peer.
663    fn is_streaming_now(&self) -> bool {
664        self.local.streaming().next().is_some()
665    }
666
667    fn set_opening(
668        &mut self,
669        local_id: &StreamEndpointId,
670        remote_id: &StreamEndpointId,
671        capabilities: Vec<ServiceCapability>,
672    ) -> avdtp::Result<()> {
673        if self.opening.is_some() {
674            return Err(avdtp::Error::InvalidState);
675        }
676        let peer_id = self.peer_id;
677        let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
678        stream
679            .configure(&peer_id, &remote_id, capabilities)
680            .map_err(|(cat, c)| avdtp::Error::RequestInvalidExtra(c, (&cat).into()))?;
681        stream.endpoint_mut().establish().or(Err(avdtp::Error::InvalidState))?;
682        self.opening = Some(local_id.clone());
683        Ok(())
684    }
685
686    fn upgrade(weak: Weak<Mutex<Self>>) -> avdtp::Result<Arc<Mutex<Self>>> {
687        weak.upgrade().ok_or(avdtp::Error::PeerDisconnected)
688    }
689
690    /// Start the stream that is opening, completing the opened procedure.
691    async fn start_opened(weak: Weak<Mutex<Self>>) -> avdtp::Result<()> {
692        let (avdtp, stream_pairs) = {
693            let peer = Self::upgrade(weak.clone())?;
694            let peer = peer.lock();
695            let stream_pairs: Vec<(StreamEndpointId, StreamEndpointId)> = peer
696                .local
697                .open()
698                .filter_map(|stream| {
699                    let endpoint = stream.endpoint();
700                    endpoint.remote_id().map(|id| (endpoint.local_id().clone(), id.clone()))
701                })
702                .collect();
703            (peer.peer.clone(), stream_pairs)
704        };
705        for (local_id, remote_id) in stream_pairs {
706            let permit_result =
707                Self::upgrade(weak.clone())?.lock().get_permit_or_reserve(&local_id);
708            if let Ok(permit) = permit_result {
709                Self::initiated_start(avdtp.clone(), weak.clone(), permit, &local_id, &remote_id)
710                    .await?;
711            }
712        }
713        Ok(())
714    }
715
716    async fn start_permit(weak: Weak<Mutex<Self>>, permit: StreamPermit) -> avdtp::Result<()> {
717        let local_id = permit.local_id().clone();
718        let (avdtp, remote_id) = {
719            let peer = Self::upgrade(weak.clone())?;
720            let mut peer = peer.lock();
721            let remote_id = peer
722                .get_mut(&local_id)
723                .map_err(|e| avdtp::Error::RequestInvalid(e))?
724                .endpoint()
725                .remote_id()
726                .ok_or(avdtp::Error::InvalidState)?
727                .clone();
728            (peer.peer.clone(), remote_id)
729        };
730        Self::initiated_start(avdtp, weak, Some(permit), &local_id, &remote_id).await
731    }
732
733    /// Start a stream for a local reason.  Requires a Permit to start streaming for the local stream.
734    async fn initiated_start(
735        avdtp: avdtp::Peer,
736        weak: Weak<Mutex<Self>>,
737        permit: Option<StreamPermit>,
738        local_id: &StreamEndpointId,
739        remote_id: &StreamEndpointId,
740    ) -> avdtp::Result<()> {
741        trace!(permit:?, local_id:?, remote_id:?; "Making outgoing start request");
742        let to_start = std::slice::from_ref(remote_id);
743        avdtp.start(to_start).await?;
744        trace!("Start response received: {permit:?}");
745        let peer = Self::upgrade(weak.clone())?;
746        let (peer_id, start_result) = {
747            let mut peer = peer.lock();
748            (peer.peer_id, peer.start_local_stream(permit, &local_id))
749        };
750        if let Err(e) = start_result {
751            warn!(peer_id:%, local_id:%, remote_id:%, e:?; "Failed to start local stream, suspending");
752            avdtp.suspend(to_start).await?;
753        }
754        Ok(())
755    }
756
757    /// Suspend a stream locally, returning a future to get the result from the peer.
758    fn suspend(
759        weak: Weak<Mutex<Self>>,
760        local_id: StreamEndpointId,
761    ) -> impl Future<Output = avdtp::Result<()>> {
762        let res = (move || {
763            let peer = Self::upgrade(weak.clone())?;
764            let mut peer = peer.lock();
765            Ok((peer.peer.clone(), peer.suspend_local_stream(&local_id)?))
766        })();
767        let (avdtp, remote_id) = match res {
768            Err(e) => return futures::future::err(e).left_future(),
769            Ok(r) => r,
770        };
771        let to_suspend = &[remote_id];
772        avdtp.suspend(to_suspend).right_future()
773    }
774
775    /// Finds a stream in the local stream set which is compatible with the remote_id given the codec config.
776    /// Returns the local stream ID and capabilities if found, or OutOfRange if one could not be found.
777    pub fn find_compatible_local_capabilities(
778        &self,
779        codec_params: &ServiceCapability,
780        remote_id: &StreamEndpointId,
781    ) -> avdtp::Result<(StreamEndpointId, Vec<ServiceCapability>)> {
782        let config = codec_params.try_into()?;
783        let our_direction = self.remote_endpoint(remote_id).map(|e| e.endpoint_type().opposite());
784        debug!(codec_params:?, local:? = self.local; "Looking for compatible local stream");
785        self.local
786            .compatible(config)
787            .find_map(|s| {
788                let endpoint = s.endpoint();
789                if let Some(d) = our_direction {
790                    if &d != endpoint.endpoint_type() {
791                        return None;
792                    }
793                }
794                Some((endpoint.local_id().clone(), endpoint.capabilities().clone()))
795            })
796            .ok_or(avdtp::Error::OutOfRange)
797    }
798
799    /// Attempts to acquire a permit for streaming, if the permits are set.
800    /// Returns Ok if is is okay to stream, and Err if the permit was not available and a
801    /// reservation was made.
802    fn get_permit_or_reserve(
803        &self,
804        local_id: &StreamEndpointId,
805    ) -> Result<Option<StreamPermit>, ()> {
806        let Some(permits) = self.permits.as_ref() else {
807            return Ok(None);
808        };
809        if let Some(permit) = permits.get(local_id.clone()) {
810            return Ok(Some(permit));
811        }
812        info!(peer_id:% = self.peer_id, local_id:%; "No permit to start stream, adding a reservation");
813        permits.setup_reservation_for(local_id.clone());
814        Err(())
815    }
816
817    /// Starts the stream which is in the local Streams with `local_id`.
818    /// Requires a permit to stream.
819    fn start_local_stream(
820        &mut self,
821        permit: Option<StreamPermit>,
822        local_id: &StreamEndpointId,
823    ) -> avdtp::Result<()> {
824        let peer_id = self.peer_id;
825        let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
826        // The streaming permit can be revoked while stream setup is in progress. If so, return
827        // without starting the local stream.
828        if permit.as_ref().is_some_and(|p| !p.is_held()) {
829            return Err(avdtp::Error::Other(anyhow::format_err!(
830                "streaming permit revoked during setup"
831            )));
832        }
833
834        info!(peer_id:%, stream:?; "Starting");
835        let stream_finished = stream.start().map_err(|c| avdtp::Error::RequestInvalid(c))?;
836        // TODO(https://fxbug.dev/42147239): if streaming stops unexpectedly, send a suspend to match to peer
837        let watched_stream = WatchedStream::new(permit, stream_finished);
838        if self.started.insert(local_id.clone(), watched_stream).is_some() {
839            warn!(peer_id:%, local_id:%; "Stream that was already started");
840        }
841        Ok(())
842    }
843
844    /// Suspend a stream on the local side. Returns the remote StreamEndpointId if the stream was suspended,
845    /// or a RequestInvalid error with the error code otherwise.
846    fn suspend_local_stream(
847        &mut self,
848        local_id: &StreamEndpointId,
849    ) -> avdtp::Result<StreamEndpointId> {
850        let peer_id = self.peer_id;
851        let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
852        let remote_id = stream.endpoint().remote_id().ok_or(avdtp::Error::InvalidState)?.clone();
853        info!(peer_id:%; "Suspend stream local {local_id} <-> {remote_id} remote");
854        stream.suspend().map_err(|c| avdtp::Error::RequestInvalid(c))?;
855        let _ = self.started.remove(local_id);
856        Ok(remote_id)
857    }
858
859    /// Provide a new established L2CAP channel to this remote peer.
860    /// This function should be called whenever the remote associated with this peer opens an
861    /// L2CAP channel after the first.
862    /// Returns true if this channel completed the opening sequence.
863    fn receive_channel(&mut self, channel: Channel) -> avdtp::Result<bool> {
864        let stream_id = self.opening.as_ref().cloned().ok_or(avdtp::Error::InvalidState)?;
865        let stream = self.get_mut(&stream_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
866        let done = !stream.endpoint_mut().receive_channel(channel)?;
867        if done {
868            self.opening = None;
869        }
870        info!(peer_id:% = self.peer_id, stream_id:%; "Transport connected");
871        Ok(done)
872    }
873
    /// Handle a single request event from the avdtp peer.
    ///
    /// Every request is answered through its `responder`, either accepting it or rejecting it
    /// with an AVDTP error code.
    ///
    /// Returns `Either::Left` with the result of sending that answer for every request that can
    /// be completed synchronously.  Only `Start` returns `Either::Right`: a future that first
    /// sends a suspend for the streams that were accepted without a streaming permit and then
    /// yields the result of the (already sent) start response.
    fn handle_request(
        &mut self,
        request: avdtp::Request,
    ) -> Either<avdtp::Result<()>, impl Future<Output = avdtp::Result<()>> + use<>> {
        use avdtp::ErrorCode;
        use avdtp::Request::*;
        trace!("Handling {request:?} from peer..");
        // `break 'result <value>` is used below as an early exit that still flows into the
        // single `Either::Left` at the bottom of the function.
        let immediate_result = 'result: {
            match request {
                Discover { responder } => responder.send(&self.local.information()),
                GetCapabilities { responder, stream_id }
                | GetAllCapabilities { responder, stream_id } => match self.local.get(&stream_id) {
                    None => responder.reject(ErrorCode::BadAcpSeid),
                    Some(stream) => responder.send(stream.endpoint().capabilities()),
                },
                Open { responder, stream_id } => {
                    // An Open is only accepted while a stream is recorded in `opening`, which
                    // the SetConfiguration arm below does when it succeeds.
                    // NOTE(review): the id in `opening` is not compared with `stream_id` here.
                    if self.opening.is_none() {
                        break 'result responder.reject(ErrorCode::BadState);
                    }
                    let Ok(stream) = self.get_mut(&stream_id) else {
                        break 'result responder.reject(ErrorCode::BadAcpSeid);
                    };
                    match stream.endpoint_mut().establish() {
                        Ok(()) => responder.send(),
                        Err(_) => responder.reject(ErrorCode::BadState),
                    }
                }
                Close { responder, stream_id } => {
                    // Cloned up front because `stream` keeps `self` mutably borrowed.
                    let peer = self.peer.clone();
                    let Ok(stream) = self.get_mut(&stream_id) else {
                        break 'result responder.reject(ErrorCode::BadAcpSeid);
                    };
                    stream.release(responder, &peer)
                }
                SetConfiguration { responder, local_stream_id, remote_stream_id, capabilities } => {
                    // Only one stream may be in the middle of opening at a time.
                    if self.opening.is_some() {
                        break 'result responder.reject(ServiceCategory::None, ErrorCode::BadState);
                    }
                    let peer_id = self.peer_id;
                    let Ok(stream) = self.get_mut(&local_stream_id) else {
                        break 'result responder
                            .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
                    };
                    match stream.configure(&peer_id, &remote_stream_id, capabilities) {
                        Ok(_) => {
                            self.opening = Some(local_stream_id);
                            responder.send()
                        }
                        Err((category, code)) => responder.reject(category, code),
                    }
                }
                GetConfiguration { stream_id, responder } => {
                    let Ok(stream) = self.get_mut(&stream_id) else {
                        break 'result responder.reject(ErrorCode::BadAcpSeid);
                    };
                    let Some(vec_capabilities) = stream.endpoint().get_configuration() else {
                        break 'result responder.reject(ErrorCode::BadState);
                    };
                    responder.send(vec_capabilities.as_slice())
                }
                Reconfigure { responder, local_stream_id, capabilities } => {
                    let Ok(stream) = self.get_mut(&local_stream_id) else {
                        break 'result responder
                            .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
                    };
                    match stream.reconfigure(capabilities) {
                        Ok(_) => responder.send(),
                        Err((cat, code)) => responder.reject(cat, code),
                    }
                }
                Start { responder, stream_ids } => {
                    // Remote ids of streams accepted without a permit; they are suspended
                    // right after the response has been sent.
                    let mut immediate_suspend = Vec::new();
                    // Fail on the first failed endpoint, as per the AVDTP spec 8.13 Note 5
                    let result = stream_ids.into_iter().try_for_each(|seid| {
                        let Some(stream) = self.local.get_mut(&seid) else {
                            return Err((seid, ErrorCode::BadAcpSeid));
                        };
                        let remote_id = stream.endpoint().remote_id().cloned();
                        let Some(remote_id) = remote_id else {
                            return Err((seid, ErrorCode::BadState));
                        };
                        let Ok(permit) = self.get_permit_or_reserve(&seid) else {
                            // Happens when we cannot start because of permits.
                            // Accept this one, then queue up for suspend.
                            // We are already reserved for a permit.
                            immediate_suspend.push(remote_id);
                            return Ok(());
                        };
                        match self.start_local_stream(permit, &seid) {
                            Ok(()) => Ok(()),
                            Err(avdtp::Error::RequestInvalid(code)) => Err((seid, code)),
                            Err(_) => Err((seid, ErrorCode::BadState)),
                        }
                    });
                    // The response is sent here, synchronously; only its result is carried
                    // into the future returned below.
                    let response_result = match result {
                        Ok(()) => responder.send(),
                        Err((seid, code)) => responder.reject(&seid, code),
                    };
                    {
                        let peer = self.peer.clone();
                        // This is the only arm that bypasses `immediate_result`: the suspend
                        // request has to be awaited, so a future is returned instead.
                        return Either::Right(async move {
                            if !immediate_suspend.is_empty() {
                                peer.suspend(immediate_suspend.as_slice()).await?;
                            }
                            response_result
                        });
                    }
                }
                Suspend { responder, stream_ids } => {
                    // Streams are suspended in order; the first failure is reported and the
                    // remaining ids are left untouched.
                    for seid in stream_ids {
                        match self.suspend_local_stream(&seid) {
                            Ok(_remote_id) => {}
                            Err(avdtp::Error::RequestInvalid(code)) => {
                                break 'result responder.reject(&seid, code);
                            }
                            Err(_e) => break 'result responder.reject(&seid, ErrorCode::BadState),
                        }
                    }
                    responder.send()
                }
                Abort { responder, stream_id } => {
                    let Ok(stream) = self.get_mut(&stream_id) else {
                        // No response is sent on an invalid ID for an Abort
                        break 'result Ok(());
                    };
                    stream.abort();
                    // Forget the opening stream only if it is the one that was aborted.
                    self.opening = self.opening.take().filter(|local_id| local_id != &stream_id);
                    responder.send()
                }
                DelayReport { responder, delay, stream_id } => {
                    // Delay is in 1/10 ms
                    let delay_ns = delay as u64 * 100000;
                    // Record delay to cobalt.
                    // The metric is logged before the stream id is validated; -1 is logged if
                    // the value does not fit the metric's integer type.
                    self.metrics.log_integer(
                        bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID,
                        delay_ns.try_into().unwrap_or(-1),
                        vec![],
                    );
                    // Report should only come after a stream is configured
                    let Some(stream) = self.local.get_mut(&stream_id) else {
                        break 'result responder.reject(avdtp::ErrorCode::BadAcpSeid);
                    };
                    let delay_str = format!("delay {}.{} ms", delay / 10, delay % 10);
                    let peer = self.peer_id;
                    match stream.set_delay(std::time::Duration::from_nanos(delay_ns)) {
                        Ok(()) => info!(peer:%, stream_id:%; "reported {delay_str}"),
                        Err(avdtp::ErrorCode::BadState) => {
                            info!(peer:%, stream_id:%; "bad state {delay_str}");
                            break 'result responder.reject(avdtp::ErrorCode::BadState);
                        }
                        // Any other failure is only logged; the report is still accepted.
                        Err(e) => info!(peer:%, stream_id:%, e:?; "failed {delay_str}"),
                    };
                    // Can't really respond with an Error
                    responder.send()
                }
            }
        };
        Either::Left(immediate_result)
    }
1034}
1035
1036/// A WatchedStream holds a task tracking a started stream and ensures actions are performed when
1037/// the stream media task finishes.
1038struct WatchedStream {
1039    _permit_task: fasync::Task<()>,
1040}
1041
1042impl WatchedStream {
1043    fn new(
1044        permit: Option<StreamPermit>,
1045        finish_fut: BoxFuture<'static, Result<(), anyhow::Error>>,
1046    ) -> Self {
1047        let permit_task = fasync::Task::spawn(async move {
1048            let _ = finish_fut.await;
1049            drop(permit);
1050        });
1051        Self { _permit_task: permit_task }
1052    }
1053}
1054
1055fn codectype_to_availability_metric(
1056    codec_type: &MediaCodecType,
1057) -> bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec {
1058    match codec_type {
1059        &MediaCodecType::AUDIO_SBC => {
1060            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc
1061        }
1062        &MediaCodecType::AUDIO_MPEG12 => {
1063            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Mpeg12
1064        }
1065        &MediaCodecType::AUDIO_AAC => {
1066            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Aac
1067        }
1068        &MediaCodecType::AUDIO_ATRAC => {
1069            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac
1070        }
1071        &MediaCodecType::AUDIO_NON_A2DP => {
1072            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::VendorSpecific
1073        }
1074        _ => bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown,
1075    }
1076}
1077
1078fn capability_to_metric(
1079    cap: &ServiceCapability,
1080) -> Option<bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability> {
1081    match cap {
1082        ServiceCapability::DelayReporting => {
1083            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport)
1084        }
1085        ServiceCapability::Reporting => {
1086            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Reporting)
1087        }
1088        ServiceCapability::Recovery { .. } => {
1089            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Recovery)
1090        }
1091        ServiceCapability::ContentProtection { .. } => {
1092            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::ContentProtection)
1093        }
1094        ServiceCapability::HeaderCompression { .. } => {
1095            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::HeaderCompression)
1096        }
1097        ServiceCapability::Multiplexing { .. } => {
1098            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Multiplexing)
1099        }
1100        // We ignore capabilities that we don't care to track.
1101        other => {
1102            trace!("untracked remote peer capability: {:?}", other);
1103            None
1104        }
1105    }
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110    use super::*;
1111
1112    use async_utils::PollExt;
1113    use bt_metrics::respond_to_metrics_req_for_test;
1114    use fidl::endpoints::create_proxy_and_stream;
1115    use fidl_fuchsia_bluetooth::ErrorCode;
1116    use fidl_fuchsia_bluetooth_bredr::{
1117        ProfileMarker, ProfileRequest, ProfileRequestStream, ServiceClassProfileIdentifier,
1118    };
1119    use fidl_fuchsia_metrics::{MetricEvent, MetricEventPayload};
1120    use futures::future::Either;
1121    use std::pin::pin;
1122
1123    use crate::media_task::tests::{TestMediaTask, TestMediaTaskBuilder};
1124    use crate::media_types::*;
1125    use crate::stream::tests::{make_sbc_endpoint, sbc_mediacodec_capability};
1126
1127    fn fake_metrics()
1128    -> (bt_metrics::MetricsLogger, fidl_fuchsia_metrics::MetricEventLoggerRequestStream) {
1129        let (c, s) = fidl::endpoints::create_proxy_and_stream::<
1130            fidl_fuchsia_metrics::MetricEventLoggerMarker,
1131        >();
1132        (bt_metrics::MetricsLogger::from_proxy(c), s)
1133    }
1134
1135    fn setup_avdtp_peer() -> (avdtp::Peer, Channel) {
1136        let (remote, signaling) = Channel::create();
1137        let peer = avdtp::Peer::new(signaling);
1138        (peer, remote)
1139    }
1140
1141    fn build_test_streams() -> Streams {
1142        let mut streams = Streams::default();
1143        let source = Stream::build(
1144            make_sbc_endpoint(1, avdtp::EndpointType::Source),
1145            TestMediaTaskBuilder::new_delayable().builder(),
1146        );
1147        streams.insert(source);
1148        let sink = Stream::build(
1149            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
1150            TestMediaTaskBuilder::new().builder(),
1151        );
1152        streams.insert(sink);
1153        streams
1154    }
1155
1156    fn build_test_streams_delayable() -> Streams {
1157        fn with_delay(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
1158            StreamEndpoint::new(
1159                seid,
1160                avdtp::MediaType::Audio,
1161                direction,
1162                vec![
1163                    avdtp::ServiceCapability::MediaTransport,
1164                    avdtp::ServiceCapability::DelayReporting,
1165                    sbc_mediacodec_capability(),
1166                ],
1167            )
1168            .expect("endpoint creation should succeed")
1169        }
1170        let mut streams = Streams::default();
1171        let source = Stream::build(
1172            with_delay(1, avdtp::EndpointType::Source),
1173            TestMediaTaskBuilder::new_delayable().builder(),
1174        );
1175        streams.insert(source);
1176        let sink = Stream::build(
1177            with_delay(2, avdtp::EndpointType::Sink),
1178            TestMediaTaskBuilder::new().builder(),
1179        );
1180        streams.insert(sink);
1181        streams
1182    }
1183
1184    #[track_caller]
1185    pub(crate) fn recv_remote(remote: &mut Channel) -> Result<Vec<u8>, zx::Status> {
1186        let fut = remote.next();
1187        match fut.now_or_never() {
1188            Some(Some(res)) => res,
1189            Some(None) => Err(zx::Status::PEER_CLOSED),
1190            None => Err(zx::Status::SHOULD_WAIT),
1191        }
1192    }
1193
1194    /// Creates a Peer object, returning a channel connected ot the remote end, a
1195    /// ProfileRequestStream connected to the profile_proxy, and the Peer object.
1196    fn setup_test_peer(
1197        use_cobalt: bool,
1198        streams: Streams,
1199        permits: Option<Permits>,
1200    ) -> (
1201        Channel,
1202        ProfileRequestStream,
1203        Option<fidl_fuchsia_metrics::MetricEventLoggerRequestStream>,
1204        Peer,
1205    ) {
1206        let (avdtp, remote) = setup_avdtp_peer();
1207        let (metrics_logger, cobalt_receiver) = if use_cobalt {
1208            let (l, r) = fake_metrics();
1209            (l, Some(r))
1210        } else {
1211            (bt_metrics::MetricsLogger::default(), None)
1212        };
1213        let (profile_proxy, requests) = create_proxy_and_stream::<ProfileMarker>();
1214        let peer = Peer::create(PeerId(1), avdtp, streams, permits, profile_proxy, metrics_logger);
1215
1216        (remote, requests, cobalt_receiver, peer)
1217    }
1218
1219    fn expect_get_capabilities_and_respond(
1220        remote: &mut Channel,
1221        expected_seid: u8,
1222        response_capabilities: &[u8],
1223    ) {
1224        let received = recv_remote(remote).unwrap();
1225        // Last half of header must be Single (0b00) and Command (0b00)
1226        assert_eq!(0x00, received[0] & 0xF);
1227        assert_eq!(0x02, received[1]); // 0x02 = Get Capabilities
1228        assert_eq!(expected_seid << 2, received[2]);
1229
1230        let txlabel_raw = received[0] & 0xF0;
1231
1232        // Expect a get capabilities and respond.
1233        #[rustfmt::skip]
1234        let mut get_capabilities_rsp = vec![
1235            txlabel_raw << 4 | 0x2, // TxLabel (same) + ResponseAccept (0x02)
1236            0x02 // GetCapabilities
1237        ];
1238
1239        get_capabilities_rsp.extend_from_slice(response_capabilities);
1240
1241        assert!(remote.write(&get_capabilities_rsp).is_ok());
1242    }
1243
1244    fn expect_get_all_capabilities_and_respond(
1245        remote: &mut Channel,
1246        expected_seid: u8,
1247        response_capabilities: &[u8],
1248    ) {
1249        let received = recv_remote(remote).unwrap();
1250        // Last half of header must be Single (0b00) and Command (0b00)
1251        assert_eq!(0x00, received[0] & 0xF);
1252        assert_eq!(0x0C, received[1]); // 0x0C = Get All Capabilities
1253        assert_eq!(expected_seid << 2, received[2]);
1254
1255        let txlabel_raw = received[0] & 0xF0;
1256
1257        // Expect a get capabilities and respond.
1258        #[rustfmt::skip]
1259        let mut get_capabilities_rsp = vec![
1260            txlabel_raw << 4 | 0x2, // TxLabel (same) + ResponseAccept (0x02)
1261            0x0C // GetAllCapabilities
1262        ];
1263
1264        get_capabilities_rsp.extend_from_slice(response_capabilities);
1265
1266        assert!(remote.write(&get_capabilities_rsp).is_ok());
1267    }
1268
1269    #[fuchsia::test]
1270    fn disconnected() {
1271        let mut exec = fasync::TestExecutor::new();
1272        let (proxy, _stream) = create_proxy_and_stream::<ProfileMarker>();
1273        let (remote, signaling) = Channel::create();
1274
1275        let id = PeerId(1);
1276
1277        let avdtp = avdtp::Peer::new(signaling);
1278        let peer = Peer::create(
1279            id,
1280            avdtp,
1281            Streams::default(),
1282            None,
1283            proxy,
1284            bt_metrics::MetricsLogger::default(),
1285        );
1286
1287        let closed_fut = peer.closed();
1288
1289        let mut closed_fut = pin!(closed_fut);
1290
1291        assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
1292
1293        // Close the remote channel
1294        drop(remote);
1295
1296        assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
1297    }
1298
1299    #[fuchsia::test]
1300    fn peer_collect_capabilities_success() {
1301        let mut exec = fasync::TestExecutor::new();
1302
1303        let (mut remote, _, cobalt_receiver, peer) =
1304            setup_test_peer(true, build_test_streams(), None);
1305
1306        let p: ProfileDescriptor = ProfileDescriptor {
1307            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1308            major_version: Some(1),
1309            minor_version: Some(2),
1310            ..Default::default()
1311        };
1312        let _ = peer.set_descriptor(p);
1313
1314        let collect_future = peer.collect_capabilities();
1315        let mut collect_future = pin!(collect_future);
1316
1317        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1318
1319        // Expect a discover command.
1320        let received = recv_remote(&mut remote).unwrap();
1321        // Last half of header must be Single (0b00) and Command (0b00)
1322        assert_eq!(0x00, received[0] & 0xF);
1323        assert_eq!(0x01, received[1]); // 0x01 = Discover
1324
1325        let txlabel_raw = received[0] & 0xF0;
1326
1327        // Respond with a set of streams.
1328        let response: &[u8] = &[
1329            txlabel_raw << 4 | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1330            0x01,                              // Discover
1331            0x3E << 2 | 0x0 << 1,              // SEID (3E), Not In Use (0b0)
1332            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1333            0x01 << 2 | 0x1 << 1,              // SEID (1), In Use (0b1)
1334            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1335        ];
1336        assert!(remote.write(response).is_ok());
1337
1338        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1339
1340        // Expect a get capabilities and respond.
1341        #[rustfmt::skip]
1342        let capabilities_rsp = &[
1343            // MediaTransport (Length of Service Capability = 0)
1344            0x01, 0x00,
1345            // Media Codec (LOSC = 2 + 4), Media Type Audio (0x00), Codec type (0x04), Codec specific 0xF09F9296
1346            0x07, 0x06, 0x00, 0x04, 0xF0, 0x9F, 0x92, 0x96
1347        ];
1348        expect_get_capabilities_and_respond(&mut remote, 0x3E, capabilities_rsp);
1349
1350        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1351
1352        // Expect a get capabilities and respond.
1353        #[rustfmt::skip]
1354        let capabilities_rsp = &[
1355            // MediaTransport (Length of Service Capability = 0)
1356            0x01, 0x00,
1357            // Media Codec (LOSC = 2 + 2), Media Type Audio (0x00), Codec type (0x00), Codec specific 0xC0DE
1358            0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1359        ];
1360        expect_get_capabilities_and_respond(&mut remote, 0x01, capabilities_rsp);
1361
1362        match exec.run_until_stalled(&mut collect_future) {
1363            Poll::Pending => panic!("collect capabilities should be complete"),
1364            Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1365            Poll::Ready(Ok(endpoints)) => {
1366                let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1367                let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1368                for stream in endpoints {
1369                    if stream.local_id() == &first_seid {
1370                        let expected_caps = vec![
1371                            ServiceCapability::MediaTransport,
1372                            ServiceCapability::MediaCodec {
1373                                media_type: avdtp::MediaType::Audio,
1374                                codec_type: avdtp::MediaCodecType::new(0x04),
1375                                codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1376                            },
1377                        ];
1378                        assert_eq!(&expected_caps, stream.capabilities());
1379                    } else if stream.local_id() == &second_seid {
1380                        let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1381                        assert_eq!(Some(&expected_codec_type), stream.codec_type());
1382                    } else {
1383                        panic!("Unexpected endpoint in the streams collected");
1384                    }
1385                }
1386            }
1387        }
1388
1389        // Collect reported cobalt logs.
1390        let mut recv = cobalt_receiver.expect("should have receiver");
1391        let mut log_events = Vec::new();
1392        while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1393            log_events.push(respond_to_metrics_req_for_test(req));
1394        }
1395
1396        // Should have sent two metric events for codec and one for capability.
1397        assert_eq!(3, log_events.len());
1398        assert!(log_events.contains(&MetricEvent {
1399            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1400            event_codes: vec![
1401                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1402            ],
1403            payload: MetricEventPayload::Count(1),
1404        }));
1405        assert!(log_events.contains(&MetricEvent {
1406            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1407            event_codes: vec![
1408                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac as u32
1409            ],
1410            payload: MetricEventPayload::Count(1),
1411        }));
1412        assert!(log_events.contains(&MetricEvent {
1413            metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1414            event_codes: vec![
1415                bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1416            ],
1417            payload: MetricEventPayload::Count(1),
1418        }));
1419
1420        // The second time, we don't expect to ask the peer again.
1421        let collect_future = peer.collect_capabilities();
1422        let mut collect_future = pin!(collect_future);
1423
1424        match exec.run_until_stalled(&mut collect_future) {
1425            Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1426            x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1427        };
1428    }
1429
1430    #[fuchsia::test]
1431    fn peer_collect_all_capabilities_success() {
1432        let mut exec = fasync::TestExecutor::new();
1433
1434        let (mut remote, _, cobalt_receiver, peer) =
1435            setup_test_peer(true, build_test_streams(), None);
1436        let p: ProfileDescriptor = ProfileDescriptor {
1437            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1438            major_version: Some(1),
1439            minor_version: Some(3),
1440            ..Default::default()
1441        };
1442        let _ = peer.set_descriptor(p);
1443
1444        let collect_future = peer.collect_capabilities();
1445        let mut collect_future = pin!(collect_future);
1446
1447        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1448
1449        // Expect a discover command.
1450        let received = recv_remote(&mut remote).unwrap();
1451        // Last half of header must be Single (0b00) and Command (0b00)
1452        assert_eq!(0x00, received[0] & 0xF);
1453        assert_eq!(0x01, received[1]); // 0x01 = Discover
1454
1455        let txlabel_raw = received[0] & 0xF0;
1456
1457        // Respond with a set of streams.
1458        let response: &[u8] = &[
1459            txlabel_raw << 4 | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1460            0x01,                              // Discover
1461            0x3E << 2 | 0x0 << 1,              // SEID (3E), Not In Use (0b0)
1462            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1463            0x01 << 2 | 0x1 << 1,              // SEID (1), In Use (0b1)
1464            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1465        ];
1466        assert!(remote.write(response).is_ok());
1467
1468        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1469
1470        // Expect a get all capabilities and respond.
1471        #[rustfmt::skip]
1472        let capabilities_rsp = &[
1473            // MediaTransport (Length of Service Capability = 0)
1474            0x01, 0x00,
1475            // Media Codec (LOSC = 2 + 4), Media Type Audio (0x00), Codec type (0x40), Codec specific 0xF09F9296
1476            0x07, 0x06, 0x00, 0x40, 0xF0, 0x9F, 0x92, 0x96,
1477            // Delay Reporting (LOSC = 0)
1478            0x08, 0x00
1479        ];
1480        expect_get_all_capabilities_and_respond(&mut remote, 0x3E, capabilities_rsp);
1481
1482        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1483
1484        // Expect a get all capabilities and respond.
1485        #[rustfmt::skip]
1486        let capabilities_rsp = &[
1487            // MediaTransport (Length of Service Capability = 0)
1488            0x01, 0x00,
1489            // Media Codec (LOSC = 2 + 2), Media Type Audio (0x00), Codec type (0x00), Codec specific 0xC0DE
1490            0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1491        ];
1492        expect_get_all_capabilities_and_respond(&mut remote, 0x01, capabilities_rsp);
1493
1494        match exec.run_until_stalled(&mut collect_future) {
1495            Poll::Pending => panic!("collect capabilities should be complete"),
1496            Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1497            Poll::Ready(Ok(endpoints)) => {
1498                let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1499                let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1500                for stream in endpoints {
1501                    if stream.local_id() == &first_seid {
1502                        let expected_caps = vec![
1503                            ServiceCapability::MediaTransport,
1504                            ServiceCapability::MediaCodec {
1505                                media_type: avdtp::MediaType::Audio,
1506                                codec_type: avdtp::MediaCodecType::new(0x40),
1507                                codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1508                            },
1509                            ServiceCapability::DelayReporting,
1510                        ];
1511                        assert_eq!(&expected_caps, stream.capabilities());
1512                    } else if stream.local_id() == &second_seid {
1513                        let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1514                        assert_eq!(Some(&expected_codec_type), stream.codec_type());
1515                    } else {
1516                        panic!("Unexpected endpoint in the streams collected");
1517                    }
1518                }
1519            }
1520        }
1521
1522        // Collect reported cobalt logs.
1523        let mut recv = cobalt_receiver.expect("should have receiver");
1524        let mut log_events = Vec::new();
1525        while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1526            log_events.push(respond_to_metrics_req_for_test(req));
1527        }
1528
1529        // Should have sent two metric events for codec and two for capability.
1530        assert_eq!(4, log_events.len());
1531        assert!(log_events.contains(&MetricEvent {
1532            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1533            event_codes: vec![
1534                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown as u32
1535            ],
1536            payload: MetricEventPayload::Count(1),
1537        }));
1538        assert!(log_events.contains(&MetricEvent {
1539            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1540            event_codes: vec![
1541                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1542            ],
1543            payload: MetricEventPayload::Count(1),
1544        }));
1545        assert!(log_events.contains(&MetricEvent {
1546            metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1547            event_codes: vec![
1548                bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1549            ],
1550            payload: MetricEventPayload::Count(1),
1551        }));
1552        assert!(log_events.contains(&MetricEvent {
1553            metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1554            event_codes: vec![
1555                bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport as u32
1556            ],
1557            payload: MetricEventPayload::Count(1),
1558        }));
1559
1560        // The second time, we don't expect to ask the peer again.
1561        let collect_future = peer.collect_capabilities();
1562        let mut collect_future = pin!(collect_future);
1563
1564        match exec.run_until_stalled(&mut collect_future) {
1565            Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1566            x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1567        };
1568    }
1569
1570    #[fuchsia::test]
1571    fn peer_collect_capabilities_discovery_fails() {
1572        let mut exec = fasync::TestExecutor::new();
1573
1574        let (mut remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1575
1576        let collect_future = peer.collect_capabilities();
1577        let mut collect_future = pin!(collect_future);
1578
1579        // Shouldn't finish yet.
1580        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1581
1582        // Expect a discover command.
1583        let received = recv_remote(&mut remote).unwrap();
1584        // Last half of header must be Single (0b00) and Command (0b00)
1585        assert_eq!(0x00, received[0] & 0xF);
1586        assert_eq!(0x01, received[1]); // 0x01 = Discover
1587
1588        let txlabel_raw = received[0] & 0xF0;
1589
1590        // Respond with an error.
1591        let response: &[u8] = &[
1592            txlabel_raw | 0x0 << 2 | 0x3, // txlabel (same), Single (0b00), Response Reject (0b11)
1593            0x01,                         // Discover
1594            0x31,                         // BAD_STATE
1595        ];
1596        assert!(remote.write(response).is_ok());
1597
1598        // Should be done with an error.
1599        // Should finish!
1600        match exec.run_until_stalled(&mut collect_future) {
1601            Poll::Pending => panic!("Should be ready after discovery failure"),
1602            Poll::Ready(Ok(x)) => panic!("Should be an error but returned {x:?}"),
1603            Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
1604                assert_eq!(Some(Ok(avdtp::ErrorCode::BadState)), e.error_code());
1605            }
1606            Poll::Ready(Err(e)) => panic!("Should have been a RemoteRejected was was {e:?}"),
1607        }
1608    }
1609
1610    #[fuchsia::test]
1611    fn peer_collect_capabilities_get_capability_fails() {
1612        let mut exec = fasync::TestExecutor::new();
1613
1614        let (mut remote, _, _, peer) = setup_test_peer(true, build_test_streams(), None);
1615
1616        let collect_future = peer.collect_capabilities();
1617        let mut collect_future = pin!(collect_future);
1618
1619        // Shouldn't finish yet.
1620        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1621
1622        // Expect a discover command.
1623        let received = recv_remote(&mut remote).unwrap();
1624        // Last half of header must be Single (0b00) and Command (0b00)
1625        assert_eq!(0x00, received[0] & 0xF);
1626        assert_eq!(0x01, received[1]); // 0x01 = Discover
1627
1628        let txlabel_raw = received[0] & 0xF0;
1629
1630        // Respond with a set of streams.
1631        let response: &[u8] = &[
1632            txlabel_raw << 4 | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1633            0x01,                              // Discover
1634            0x3E << 2 | 0x0 << 1,              // SEID (3E), Not In Use (0b0)
1635            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1636            0x01 << 2 | 0x1 << 1,              // SEID (1), In Use (0b1)
1637            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1638        ];
1639        assert!(remote.write(response).is_ok());
1640
1641        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1642
1643        // Expect a get capabilities request
1644        let expected_seid = 0x3E;
1645        let received = recv_remote(&mut remote).unwrap();
1646        // Last half of header must be Single (0b00) and Command (0b00)
1647        assert_eq!(0x00, received[0] & 0xF);
1648        assert_eq!(0x02, received[1]); // 0x02 = Get Capabilities
1649        assert_eq!(expected_seid << 2, received[2]);
1650
1651        let txlabel_raw = received[0] & 0xF0;
1652
1653        let response: &[u8] = &[
1654            txlabel_raw | 0x0 << 2 | 0x3, // txlabel (same), Single (0b00), Response Reject (0b11)
1655            0x02,                         // Get Capabilities
1656            0x12,                         // BAD_ACP_SEID
1657        ];
1658        assert!(remote.write(response).is_ok());
1659
1660        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1661
1662        // Expect a get capabilities request (skipped the last one)
1663        let expected_seid = 0x01;
1664        let received = recv_remote(&mut remote).unwrap();
1665        // Last half of header must be Single (0b00) and Command (0b00)
1666        assert_eq!(0x00, received[0] & 0xF);
1667        assert_eq!(0x02, received[1]); // 0x02 = Get Capabilities
1668        assert_eq!(expected_seid << 2, received[2]);
1669
1670        let txlabel_raw = received[0] & 0xF0;
1671
1672        let response: &[u8] = &[
1673            txlabel_raw | 0x0 << 2 | 0x3, // txlabel (same), Single (0b00), Response Reject (0b11)
1674            0x02,                         // Get Capabilities
1675            0x12,                         // BAD_ACP_SEID
1676        ];
1677        assert!(remote.write(response).is_ok());
1678
1679        // Should be done without an error, but with no streams.
1680        match exec.run_until_stalled(&mut collect_future) {
1681            Poll::Pending => panic!("Should be ready after discovery failure"),
1682            Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1683            Poll::Ready(Ok(map)) => assert_eq!(0, map.len()),
1684        }
1685    }
1686
1687    fn receive_simple_accept(remote: &mut Channel, signal_id: u8) {
1688        let received = recv_remote(remote).expect("expected a packet");
1689        // Last half of header must be Single (0b00) and Command (0b00)
1690        assert_eq!(0x00, received[0] & 0xF);
1691        assert_eq!(signal_id, received[1]);
1692
1693        let txlabel_raw = received[0] & 0xF0;
1694
1695        let response: &[u8] = &[
1696            txlabel_raw | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1697            signal_id,
1698        ];
1699        assert!(remote.write(response).is_ok());
1700    }
1701
1702    #[fuchsia::test]
1703    fn peer_stream_start_success() {
1704        let mut exec = fasync::TestExecutor::new();
1705
1706        let (mut remote, mut profile_request_stream, _, peer) =
1707            setup_test_peer(false, build_test_streams(), None);
1708
1709        let remote_seid = 2_u8.try_into().unwrap();
1710
1711        let codec_params = ServiceCapability::MediaCodec {
1712            media_type: avdtp::MediaType::Audio,
1713            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1714            codec_extra: vec![0x11, 0x45, 51, 51],
1715        };
1716
1717        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1718        let mut start_future = pin!(start_future);
1719
1720        match exec.run_until_stalled(&mut start_future) {
1721            Poll::Pending => {}
1722            x => panic!("Expected pending, but got {x:?}"),
1723        };
1724
1725        receive_simple_accept(&mut remote, 0x03); // Set Configuration
1726
1727        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1728
1729        receive_simple_accept(&mut remote, 0x06); // Open
1730
1731        match exec.run_until_stalled(&mut start_future) {
1732            Poll::Pending => {}
1733            Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1734            Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1735        };
1736
1737        // Should connect the media channel after open.
1738        let (_, transport) = Channel::create();
1739
1740        let request = exec.run_until_stalled(&mut profile_request_stream.next());
1741        match request {
1742            Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
1743                assert_eq!(PeerId(1), peer_id.into());
1744                assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
1745                let channel = transport.try_into().unwrap();
1746                responder.send(Ok(channel)).expect("responder sends");
1747            }
1748            x => panic!("Should have sent a open l2cap request, but got {:?}", x),
1749        };
1750
1751        match exec.run_until_stalled(&mut start_future) {
1752            Poll::Pending => {}
1753            Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1754            Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1755        };
1756
1757        receive_simple_accept(&mut remote, 0x07); // Start
1758
1759        // Should return the media stream (which should be connected)
1760        // Should be done without an error, but with no streams.
1761        match exec.run_until_stalled(&mut start_future) {
1762            Poll::Pending => panic!("Should be ready after start succeeds"),
1763            Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1764            // TODO: confirm the stream is usable
1765            Poll::Ready(Ok(())) => {
1766                assert!(peer.is_streaming_now());
1767            }
1768        }
1769    }
1770
1771    #[fuchsia::test]
1772    fn peer_stream_start_picks_correct_direction() {
1773        let mut exec = fasync::TestExecutor::new();
1774
1775        let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1776        let remote = avdtp::Peer::new(remote);
1777        let mut remote_events = remote.take_request_stream();
1778
1779        // Respond as if we have a single SBC Source Stream
1780        fn remote_handle_request(req: avdtp::Request) {
1781            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1782            let res = match req {
1783                avdtp::Request::Discover { responder } => {
1784                    let infos = [avdtp::StreamInformation::new(
1785                        expected_stream_id,
1786                        false,
1787                        avdtp::MediaType::Audio,
1788                        avdtp::EndpointType::Source,
1789                    )];
1790                    responder.send(&infos)
1791                }
1792                avdtp::Request::GetAllCapabilities { stream_id, responder }
1793                | avdtp::Request::GetCapabilities { stream_id, responder } => {
1794                    assert_eq!(expected_stream_id, stream_id);
1795                    let caps = vec![
1796                        ServiceCapability::MediaTransport,
1797                        ServiceCapability::MediaCodec {
1798                            media_type: avdtp::MediaType::Audio,
1799                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1800                            codec_extra: vec![0x11, 0x45, 51, 250],
1801                        },
1802                    ];
1803                    responder.send(&caps[..])
1804                }
1805                avdtp::Request::Open { responder, stream_id } => {
1806                    assert_eq!(expected_stream_id, stream_id);
1807                    responder.send()
1808                }
1809                avdtp::Request::SetConfiguration {
1810                    responder,
1811                    local_stream_id,
1812                    remote_stream_id,
1813                    ..
1814                } => {
1815                    assert_eq!(local_stream_id, expected_stream_id);
1816                    // This is the "sink" local stream id.
1817                    assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1818                    responder.send()
1819                }
1820                x => panic!("Unexpected request: {:?}", x),
1821            };
1822            res.expect("should be able to respond");
1823        }
1824
1825        // Need to discover the remote streams first, or the stream start will not work.
1826        let collect_capabilities_fut = peer.collect_capabilities();
1827        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1828
1829        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1830
1831        let request = exec.run_singlethreaded(&mut remote_events.next());
1832        remote_handle_request(request.expect("should have a discovery request").unwrap());
1833
1834        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1835        let request = exec.run_singlethreaded(&mut remote_events.next());
1836        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1837
1838        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1839
1840        // Try to start the stream.  It should continue to configure and connect.
1841        let remote_seid = 4_u8.try_into().unwrap();
1842
1843        let codec_params = ServiceCapability::MediaCodec {
1844            media_type: avdtp::MediaType::Audio,
1845            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1846            codec_extra: vec![0x11, 0x45, 51, 51],
1847        };
1848        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1849        let mut start_future = pin!(start_future);
1850
1851        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1852        let request = exec.run_singlethreaded(&mut remote_events.next());
1853        remote_handle_request(request.expect("should have a set_capabilities request").unwrap());
1854
1855        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1856        let request = exec.run_singlethreaded(&mut remote_events.next());
1857        remote_handle_request(request.expect("should have an open request").unwrap());
1858    }
1859
1860    #[fuchsia::test]
1861    fn peer_stream_start_strips_unsupported_local_capabilities() {
1862        let mut exec = fasync::TestExecutor::new();
1863
1864        let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1865        let remote = avdtp::Peer::new(remote);
1866        let mut remote_events = remote.take_request_stream();
1867
1868        // Respond as if we have a single SBC Source Stream
1869        fn remote_handle_request(req: avdtp::Request) {
1870            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1871            let res = match req {
1872                avdtp::Request::Discover { responder } => {
1873                    let infos = [avdtp::StreamInformation::new(
1874                        expected_stream_id,
1875                        false,
1876                        avdtp::MediaType::Audio,
1877                        avdtp::EndpointType::Source,
1878                    )];
1879                    responder.send(&infos)
1880                }
1881                avdtp::Request::GetAllCapabilities { stream_id, responder }
1882                | avdtp::Request::GetCapabilities { stream_id, responder } => {
1883                    assert_eq!(expected_stream_id, stream_id);
1884                    let caps = vec![
1885                        ServiceCapability::MediaTransport,
1886                        // We don't have a local delay-reporting, so this shouldn't be requested.
1887                        ServiceCapability::DelayReporting,
1888                        ServiceCapability::MediaCodec {
1889                            media_type: avdtp::MediaType::Audio,
1890                            codec_type: avdtp::MediaCodecType::AUDIO_AAC,
1891                            codec_extra: vec![128, 0, 132, 134, 0, 0],
1892                        },
1893                    ];
1894                    responder.send(&caps[..])
1895                }
1896                avdtp::Request::Open { responder, stream_id } => {
1897                    assert_eq!(expected_stream_id, stream_id);
1898                    responder.send()
1899                }
1900                avdtp::Request::SetConfiguration {
1901                    responder,
1902                    local_stream_id,
1903                    remote_stream_id,
1904                    capabilities,
1905                } => {
1906                    assert_eq!(local_stream_id, expected_stream_id);
1907                    // This is the "sink" local stream id.
1908                    assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1909                    // Make sure we didn't request a DelayReport since the local Sink doesn't
1910                    // support it.
1911                    assert!(!capabilities.contains(&ServiceCapability::DelayReporting));
1912                    responder.send()
1913                }
1914                x => panic!("Unexpected request: {:?}", x),
1915            };
1916            res.expect("should be able to respond");
1917        }
1918
1919        // Need to discover the remote streams first, or the stream start will not work.
1920        let collect_capabilities_fut = peer.collect_capabilities();
1921        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1922
1923        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1924
1925        let request = exec.run_singlethreaded(&mut remote_events.next());
1926        remote_handle_request(request.expect("should have a discovery request").unwrap());
1927
1928        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1929        let request = exec.run_singlethreaded(&mut remote_events.next());
1930        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1931
1932        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1933
1934        // Try to start the stream.  It should continue to configure and connect.
1935        let remote_seid = 4_u8.try_into().unwrap();
1936
1937        let codec_params = ServiceCapability::MediaCodec {
1938            media_type: avdtp::MediaType::Audio,
1939            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1940            codec_extra: vec![0x11, 0x45, 51, 51],
1941        };
1942        let start_future =
1943            peer.stream_start(remote_seid, vec![codec_params, ServiceCapability::DelayReporting]);
1944        let mut start_future = pin!(start_future);
1945
1946        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1947        let request = exec.run_singlethreaded(&mut remote_events.next());
1948        remote_handle_request(request.expect("should have a set_configuration request").unwrap());
1949
1950        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1951        let request = exec.run_singlethreaded(&mut remote_events.next());
1952        remote_handle_request(request.expect("should have an open request").unwrap());
1953    }
1954
1955    #[fuchsia::test]
1956    fn peer_stream_start_orders_local_capabilities() {
1957        let mut exec = fasync::TestExecutor::new();
1958
1959        let (remote, _, _, peer) = setup_test_peer(false, build_test_streams_delayable(), None);
1960        let remote = avdtp::Peer::new(remote);
1961        let mut remote_events = remote.take_request_stream();
1962
1963        // Respond as if we have a single SBC Source Stream
1964        fn remote_handle_request(req: avdtp::Request) {
1965            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1966            let res = match req {
1967                avdtp::Request::Discover { responder } => {
1968                    let infos = [avdtp::StreamInformation::new(
1969                        expected_stream_id,
1970                        false,
1971                        avdtp::MediaType::Audio,
1972                        avdtp::EndpointType::Source,
1973                    )];
1974                    responder.send(&infos)
1975                }
1976                avdtp::Request::GetAllCapabilities { stream_id, responder }
1977                | avdtp::Request::GetCapabilities { stream_id, responder } => {
1978                    assert_eq!(expected_stream_id, stream_id);
1979                    let caps = &[
1980                        ServiceCapability::MediaTransport,
1981                        ServiceCapability::MediaCodec {
1982                            media_type: avdtp::MediaType::Audio,
1983                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1984                            codec_extra: vec![0x11, 0x45, 51, 250],
1985                        },
1986                        ServiceCapability::DelayReporting,
1987                    ];
1988                    responder.send(caps)
1989                }
1990                avdtp::Request::Open { responder, stream_id } => {
1991                    assert_eq!(expected_stream_id, stream_id);
1992                    responder.send()
1993                }
1994                avdtp::Request::SetConfiguration {
1995                    responder,
1996                    local_stream_id,
1997                    remote_stream_id,
1998                    capabilities,
1999                } => {
2000                    assert_eq!(local_stream_id, expected_stream_id);
2001                    // This is the "sink" local stream id.
2002                    assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
2003                    // The capabilities should be in order.
2004                    let mut capabilities_ordered = capabilities.clone();
2005                    capabilities_ordered.sort_by_key(ServiceCapability::category);
2006                    assert_eq!(capabilities, capabilities_ordered);
2007                    responder.send()
2008                }
2009                x => panic!("Unexpected request: {:?}", x),
2010            };
2011            res.expect("should be able to respond");
2012        }
2013
2014        // Need to discover the remote streams first, or the stream start will not work.
2015        let collect_capabilities_fut = peer.collect_capabilities();
2016        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2017
2018        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2019
2020        let request = exec.run_singlethreaded(&mut remote_events.next());
2021        remote_handle_request(request.expect("should have a discovery request").unwrap());
2022
2023        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2024        let request = exec.run_singlethreaded(&mut remote_events.next());
2025        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2026
2027        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2028
2029        // Try to start the stream.  It should continue to configure and connect.
2030        let remote_seid = 4_u8.try_into().unwrap();
2031
2032        let codec_params = ServiceCapability::MediaCodec {
2033            media_type: avdtp::MediaType::Audio,
2034            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2035            codec_extra: vec![0x11, 0x45, 51, 51],
2036        };
2037        let start_future = peer.stream_start(
2038            remote_seid,
2039            vec![
2040                ServiceCapability::MediaTransport,
2041                ServiceCapability::DelayReporting,
2042                codec_params,
2043            ],
2044        );
2045        let mut start_future = pin!(start_future);
2046
2047        assert!(exec.run_until_stalled(&mut start_future).is_pending());
2048        let request = exec.run_singlethreaded(&mut remote_events.next());
2049        remote_handle_request(request.expect("should have a set_configuration request").unwrap());
2050
2051        assert!(exec.run_until_stalled(&mut start_future).is_pending());
2052        let request = exec.run_singlethreaded(&mut remote_events.next());
2053        remote_handle_request(request.expect("should have an open request").unwrap());
2054    }
2055
2056    /// Tests that A2DP streaming does not start if the streaming permit is revoked during streaming
2057    /// setup.
2058    #[fuchsia::test]
2059    fn peer_stream_start_permit_revoked() {
2060        let mut exec = fasync::TestExecutor::new();
2061
2062        let test_permits = Permits::new(1);
2063        let (mut remote, mut profile_request_stream, _, peer) =
2064            setup_test_peer(false, build_test_streams(), Some(test_permits.clone()));
2065
2066        let remote_seid = 2_u8.try_into().unwrap();
2067
2068        let codec_params = ServiceCapability::MediaCodec {
2069            media_type: avdtp::MediaType::Audio,
2070            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2071            codec_extra: vec![0x11, 0x45, 51, 51],
2072        };
2073
2074        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2075        let mut start_future = pin!(start_future);
2076
2077        let _ = exec
2078            .run_until_stalled(&mut start_future)
2079            .expect_pending("waiting for set config response");
2080        receive_simple_accept(&mut remote, 0x03); // Set Configuration
2081        exec.run_until_stalled(&mut start_future).expect_pending("waiting for open response");
2082        receive_simple_accept(&mut remote, 0x06); // Open
2083        exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
2084        assert!(!peer.is_streaming_now());
2085
2086        // Should connect the media channel after open.
2087        let (_, transport) = Channel::create();
2088
2089        let request = exec.run_until_stalled(&mut profile_request_stream.next());
2090        match request {
2091            Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
2092                assert_eq!(PeerId(1), peer_id.into());
2093                assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
2094                let channel = transport.try_into().unwrap();
2095                responder.send(Ok(channel)).expect("responder sends");
2096            }
2097            x => panic!("Should have sent a open l2cap request, but got {:?}", x),
2098        };
2099
2100        exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
2101        assert!(!peer.is_streaming_now());
2102
2103        // Before peer responds to start, the permit gets taken.
2104        let seized_permits = test_permits.seize();
2105        assert_eq!(seized_permits.len(), 1);
2106        receive_simple_accept(&mut remote, 0x07); // Start
2107
2108        // Streaming should not locally begin because there is no available permit. The Start
2109        // response is handled gracefully.
2110        exec.run_until_stalled(&mut start_future)
2111            .expect_pending("waiting to send outgoing suspend");
2112        assert!(!peer.is_streaming_now());
2113        // We should issue an outgoing suspend request to synchronize state with the remote peer.
2114        receive_simple_accept(&mut remote, 0x09); // Suspend
2115
2116        // The start future should resolve without Error, and A2DP should not have started
2117        // streaming.
2118        let () = exec
2119            .run_until_stalled(&mut start_future)
2120            .expect("start finished")
2121            .expect("suspended stream is ok");
2122        assert!(!peer.is_streaming_now());
2123    }
2124
2125    #[fuchsia::test]
2126    fn peer_stream_start_fails_wrong_direction() {
2127        let mut exec = fasync::TestExecutor::new();
2128
2129        // Setup peers with only one Source Stream.
2130        let mut streams = Streams::default();
2131        let source = Stream::build(
2132            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2133            TestMediaTaskBuilder::new().builder(),
2134        );
2135        streams.insert(source);
2136
2137        let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2138        let remote = avdtp::Peer::new(remote);
2139        let mut remote_events = remote.take_request_stream();
2140
2141        // Respond as if we have a single SBC Source Stream
2142        fn remote_handle_request(req: avdtp::Request) {
2143            let expected_stream_id: StreamEndpointId = 2_u8.try_into().unwrap();
2144            let res = match req {
2145                avdtp::Request::Discover { responder } => {
2146                    let infos = [avdtp::StreamInformation::new(
2147                        expected_stream_id,
2148                        false,
2149                        avdtp::MediaType::Audio,
2150                        avdtp::EndpointType::Source,
2151                    )];
2152                    responder.send(&infos)
2153                }
2154                avdtp::Request::GetAllCapabilities { stream_id, responder }
2155                | avdtp::Request::GetCapabilities { stream_id, responder } => {
2156                    assert_eq!(expected_stream_id, stream_id);
2157                    let caps = vec![
2158                        ServiceCapability::MediaTransport,
2159                        ServiceCapability::MediaCodec {
2160                            media_type: avdtp::MediaType::Audio,
2161                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2162                            codec_extra: vec![0x11, 0x45, 51, 250],
2163                        },
2164                    ];
2165                    responder.send(&caps[..])
2166                }
2167                avdtp::Request::Open { responder, .. } => responder.send(),
2168                avdtp::Request::SetConfiguration { responder, .. } => responder.send(),
2169                x => panic!("Unexpected request: {:?}", x),
2170            };
2171            res.expect("should be able to respond");
2172        }
2173
2174        // Need to discover the remote streams first, or the stream start will always work.
2175        let collect_capabilities_fut = peer.collect_capabilities();
2176        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2177
2178        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2179
2180        let request = exec.run_singlethreaded(&mut remote_events.next());
2181        remote_handle_request(request.expect("should have a discovery request").unwrap());
2182
2183        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2184        let request = exec.run_singlethreaded(&mut remote_events.next());
2185        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2186
2187        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2188
2189        // Try to start the stream.  It should fail with OutOfRange because we can't connect a Source to a Source.
2190        let remote_seid = 2_u8.try_into().unwrap();
2191
2192        let codec_params = ServiceCapability::MediaCodec {
2193            media_type: avdtp::MediaType::Audio,
2194            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2195            codec_extra: vec![0x11, 0x45, 51, 51],
2196        };
2197        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2198        let mut start_future = pin!(start_future);
2199
2200        match exec.run_until_stalled(&mut start_future) {
2201            Poll::Ready(Err(avdtp::Error::OutOfRange)) => {}
2202            x => panic!("Expected a ready OutOfRange error but got {:?}", x),
2203        };
2204    }
2205
2206    #[fuchsia::test]
2207    fn peer_stream_start_fails_to_connect() {
2208        let mut exec = fasync::TestExecutor::new();
2209
2210        let (mut remote, mut profile_request_stream, _, peer) =
2211            setup_test_peer(false, build_test_streams(), None);
2212
2213        let remote_seid = 2_u8.try_into().unwrap();
2214
2215        let codec_params = ServiceCapability::MediaCodec {
2216            media_type: avdtp::MediaType::Audio,
2217            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2218            codec_extra: vec![0x11, 0x45, 51, 51],
2219        };
2220
2221        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2222        let mut start_future = pin!(start_future);
2223
2224        match exec.run_until_stalled(&mut start_future) {
2225            Poll::Pending => {}
2226            x => panic!("was expecting pending but got {x:?}"),
2227        };
2228
2229        receive_simple_accept(&mut remote, 0x03); // Set Configuration
2230
2231        assert!(exec.run_until_stalled(&mut start_future).is_pending());
2232
2233        receive_simple_accept(&mut remote, 0x06); // Open
2234
2235        match exec.run_until_stalled(&mut start_future) {
2236            Poll::Pending => {}
2237            Poll::Ready(x) => panic!("Expected to be pending but {x:?}"),
2238        };
2239
2240        // Should connect the media channel after open.
2241        let request = exec.run_until_stalled(&mut profile_request_stream.next());
2242        match request {
2243            Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, responder, .. }))) => {
2244                assert_eq!(PeerId(1), peer_id.into());
2245                responder.send(Err(ErrorCode::Failed)).expect("responder sends");
2246            }
2247            x => panic!("Should have sent a open l2cap request, but got {:?}", x),
2248        };
2249
2250        // Should return an error.
2251        // Should be done without an error, but with no streams.
2252        match exec.run_until_stalled(&mut start_future) {
2253            Poll::Pending => panic!("Should be ready after start fails"),
2254            Poll::Ready(Ok(_stream)) => panic!("Shouldn't have succeeded stream here"),
2255            Poll::Ready(Err(_)) => {}
2256        }
2257    }
2258
2259    /// Test that the delay reports get acknowledged and they are sent to cobalt.
2260    #[fuchsia::test]
2261    async fn peer_delay_report() {
2262        let (remote, _profile_requests, cobalt_recv, peer) =
2263            setup_test_peer(true, build_test_streams(), None);
2264        let remote_peer = avdtp::Peer::new(remote);
2265        let mut remote_events = remote_peer.take_request_stream();
2266
2267        // Respond as if we have a single SBC Sink Stream
2268        async fn remote_handle_request(req: avdtp::Request, peer: &avdtp::Peer) {
2269            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
2270            // "peer" in this case is the test code Peer stream
2271            let expected_peer_stream_id: StreamEndpointId = 1_u8.try_into().unwrap();
2272            use avdtp::Request::*;
2273            match req {
2274                Discover { responder } => {
2275                    let infos = [avdtp::StreamInformation::new(
2276                        expected_stream_id,
2277                        false,
2278                        avdtp::MediaType::Audio,
2279                        avdtp::EndpointType::Sink,
2280                    )];
2281                    responder.send(&infos).expect("response should succeed");
2282                }
2283                GetAllCapabilities { stream_id, responder }
2284                | GetCapabilities { stream_id, responder } => {
2285                    assert_eq!(expected_stream_id, stream_id);
2286                    let caps = vec![
2287                        ServiceCapability::MediaTransport,
2288                        ServiceCapability::MediaCodec {
2289                            media_type: avdtp::MediaType::Audio,
2290                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2291                            codec_extra: vec![0x11, 0x45, 51, 250],
2292                        },
2293                    ];
2294                    responder.send(&caps[..]).expect("response should succeed");
2295                    // Sending a delayreport before the stream is configured is not allowed, it's a
2296                    // bad state.
2297                    assert!(peer.delay_report(&expected_peer_stream_id, 0xc0de).await.is_err());
2298                }
2299                Open { responder, stream_id } => {
2300                    // Configuration has happened but open not succeeded yet, send delay reports.
2301                    assert!(peer.delay_report(&expected_stream_id, 0xc0de).await.is_err());
2302                    // Send a delay report to the peer.
2303                    peer.delay_report(&expected_peer_stream_id, 0xc0de)
2304                        .await
2305                        .expect("should get acked correctly");
2306                    assert_eq!(expected_stream_id, stream_id);
2307                    responder.send().expect("response should succeed");
2308                }
2309                SetConfiguration { responder, local_stream_id, remote_stream_id, .. } => {
2310                    assert_eq!(local_stream_id, expected_stream_id);
2311                    assert_eq!(remote_stream_id, expected_peer_stream_id);
2312                    responder.send().expect("should send back response without issue");
2313                }
2314                x => panic!("Unexpected request: {:?}", x),
2315            };
2316        }
2317
2318        let collect_fut = pin!(peer.collect_capabilities());
2319
2320        // Discover then a GetCapabilities.
2321        let Either::Left((request, collect_fut)) =
2322            futures::future::select(remote_events.next(), collect_fut).await
2323        else {
2324            panic!("Collect future shouldn't finish first");
2325        };
2326        let collect_fut = pin!(collect_fut);
2327        remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
2328        let Either::Left((request, collect_fut)) =
2329            futures::future::select(remote_events.next(), collect_fut).await
2330        else {
2331            panic!("Collect future shouldn't finish first");
2332        };
2333        remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
2334
2335        // Collect future should be able to finish now.
2336        assert_eq!(1, collect_fut.await.expect("should get the remote endpoints back").len());
2337
2338        // Try to start the stream.  It should go through the normal motions,
2339        let remote_seid = 4_u8.try_into().unwrap();
2340
2341        let codec_params = ServiceCapability::MediaCodec {
2342            media_type: avdtp::MediaType::Audio,
2343            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2344            codec_extra: vec![0x11, 0x45, 51, 51],
2345        };
2346
2347        // We don't expect this task to finish before being dropped, since we never respond to the
2348        // request to open the transport channel.
2349        let _start_task = fasync::Task::spawn(async move {
2350            let _ = peer.stream_start(remote_seid, vec![codec_params]).await;
2351            panic!("stream start task finished");
2352        });
2353
2354        let request = remote_events.next().await.expect("should have set_config").unwrap();
2355        remote_handle_request(request, &remote_peer).await;
2356
2357        let request = remote_events.next().await.expect("should have open").unwrap();
2358        remote_handle_request(request, &remote_peer).await;
2359
2360        let mut cobalt = cobalt_recv.expect("should have receiver");
2361
2362        let mut got_ids = HashMap::new();
2363        let delay_metric_id = bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID;
2364        while got_ids.len() < 3 || *got_ids.get(&delay_metric_id).unwrap_or(&0) < 3 {
2365            let report = respond_to_metrics_req_for_test(cobalt.next().await.unwrap().unwrap());
2366            let _ = got_ids.entry(report.metric_id).and_modify(|x| *x += 1).or_insert(1);
2367            // All the delay reports should report the same value correctly.
2368            if report.metric_id == delay_metric_id {
2369                assert_eq!(MetricEventPayload::IntegerValue(0xc0de * 100000), report.payload);
2370            }
2371        }
2372        assert!(got_ids.contains_key(&bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID));
2373        assert!(got_ids.contains_key(&bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID));
2374        assert!(got_ids.contains_key(&delay_metric_id));
2375        // There should have been three reports.
2376        // We report the delay amount even if it fails to work.
2377        assert_eq!(got_ids.get(&delay_metric_id).cloned(), Some(3));
2378    }
2379
2380    fn sbc_capabilities() -> Vec<ServiceCapability> {
2381        let sbc_codec_info = SbcCodecInfo::new(
2382            SbcSamplingFrequency::FREQ48000HZ,
2383            SbcChannelMode::JOINT_STEREO,
2384            SbcBlockCount::SIXTEEN,
2385            SbcSubBands::EIGHT,
2386            SbcAllocation::LOUDNESS,
2387            /* min_bpv= */ 53,
2388            /* max_bpv= */ 53,
2389        )
2390        .expect("sbc codec info");
2391
2392        vec![avdtp::ServiceCapability::MediaTransport, sbc_codec_info.into()]
2393    }
2394
2395    /// Test that the remote end can configure and start a stream.
2396    #[fuchsia::test]
2397    fn peer_as_acceptor() {
2398        let mut exec = fasync::TestExecutor::new();
2399
2400        let mut streams = Streams::default();
2401        let mut test_builder = TestMediaTaskBuilder::new();
2402        streams.insert(Stream::build(
2403            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2404            test_builder.builder(),
2405        ));
2406
2407        let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2408        let remote_peer = avdtp::Peer::new(remote);
2409
2410        let discover_fut = remote_peer.discover();
2411        let mut discover_fut = pin!(discover_fut);
2412
2413        let expected = vec![make_sbc_endpoint(1, avdtp::EndpointType::Source).information()];
2414        match exec.run_until_stalled(&mut discover_fut) {
2415            Poll::Ready(Ok(res)) => assert_eq!(res, expected),
2416            x => panic!("Expected discovery to complete and got {:?}", x),
2417        };
2418
2419        let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2420        let unknown_endpoint_id = 2_u8.try_into().expect("should be able to get sbc endpointid");
2421
2422        let get_caps_fut = remote_peer.get_capabilities(&sbc_endpoint_id);
2423        let mut get_caps_fut = pin!(get_caps_fut);
2424
2425        match exec.run_until_stalled(&mut get_caps_fut) {
2426            // There are two caps (mediatransport, mediacodec) in the sbc endpoint.
2427            Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2428            x => panic!("Get capabilities should be ready but got {:?}", x),
2429        };
2430
2431        let get_caps_fut = remote_peer.get_capabilities(&unknown_endpoint_id);
2432        let mut get_caps_fut = pin!(get_caps_fut);
2433
2434        match exec.run_until_stalled(&mut get_caps_fut) {
2435            Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2436                assert_eq!(Some(Ok(avdtp::ErrorCode::BadAcpSeid)), e.error_code())
2437            }
2438            x => panic!("Get capabilities should be a ready error but got {:?}", x),
2439        };
2440
2441        let get_caps_fut = remote_peer.get_all_capabilities(&sbc_endpoint_id);
2442        let mut get_caps_fut = pin!(get_caps_fut);
2443
2444        match exec.run_until_stalled(&mut get_caps_fut) {
2445            // There are two caps (mediatransport, mediacodec) in the sbc endpoint.
2446            Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2447            x => panic!("Get capabilities should be ready but got {:?}", x),
2448        };
2449
2450        let sbc_caps = sbc_capabilities();
2451        let set_config_fut =
2452            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2453        let mut set_config_fut = pin!(set_config_fut);
2454
2455        match exec.run_until_stalled(&mut set_config_fut) {
2456            Poll::Ready(Ok(())) => {}
2457            x => panic!("Set capabilities should be ready but got {:?}", x),
2458        };
2459
2460        let open_fut = remote_peer.open(&sbc_endpoint_id);
2461        let mut open_fut = pin!(open_fut);
2462        match exec.run_until_stalled(&mut open_fut) {
2463            Poll::Ready(Ok(())) => {}
2464            x => panic!("Open should be ready but got {:?}", x),
2465        };
2466
2467        // Establish a media transport stream
2468        let (_remote_transport, transport) = Channel::create();
2469
2470        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2471
2472        let stream_ids = vec![sbc_endpoint_id.clone()];
2473        let start_fut = remote_peer.start(&stream_ids);
2474        let mut start_fut = pin!(start_fut);
2475        match exec.run_until_stalled(&mut start_fut) {
2476            Poll::Ready(Ok(())) => {}
2477            x => panic!("Start should be ready but got {:?}", x),
2478        };
2479
2480        // The task should be created locally and started.
2481        let media_task = test_builder.expect_task();
2482        assert!(media_task.is_started());
2483
2484        let suspend_fut = remote_peer.suspend(&stream_ids);
2485        let mut suspend_fut = pin!(suspend_fut);
2486        match exec.run_until_stalled(&mut suspend_fut) {
2487            Poll::Ready(Ok(())) => {}
2488            x => panic!("Start should be ready but got {:?}", x),
2489        };
2490
2491        // Should have stopped the media task on suspend.
2492        assert!(!media_task.is_started());
2493    }
2494
2495    #[fuchsia::test]
2496    fn peer_set_config_reject_first() {
2497        let mut exec = fasync::TestExecutor::new();
2498
2499        let mut streams = Streams::default();
2500        let test_builder = TestMediaTaskBuilder::new();
2501        streams.insert(Stream::build(
2502            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2503            test_builder.builder(),
2504        ));
2505
2506        let (remote, _requests, _, _peer) = setup_test_peer(false, streams, None);
2507        let remote_peer = avdtp::Peer::new(remote);
2508
2509        let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2510
2511        let wrong_freq_sbc = &[SbcCodecInfo::new(
2512            SbcSamplingFrequency::FREQ44100HZ, // 44.1 is not supported by the caps from above.
2513            SbcChannelMode::JOINT_STEREO,
2514            SbcBlockCount::SIXTEEN,
2515            SbcSubBands::EIGHT,
2516            SbcAllocation::LOUDNESS,
2517            /* min_bpv= */ 53,
2518            /* max_bpv= */ 53,
2519        )
2520        .expect("sbc codec info")
2521        .into()];
2522
2523        let set_config_fut =
2524            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, wrong_freq_sbc);
2525        let mut set_config_fut = pin!(set_config_fut);
2526
2527        match exec.run_until_stalled(&mut set_config_fut) {
2528            Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2529                assert!(e.service_category().is_some())
2530            }
2531            x => panic!("Set capabilities should have been rejected but got {:?}", x),
2532        };
2533
2534        let sbc_caps = sbc_capabilities();
2535        let set_config_fut =
2536            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2537        let mut set_config_fut = pin!(set_config_fut);
2538
2539        match exec.run_until_stalled(&mut set_config_fut) {
2540            Poll::Ready(Ok(())) => {}
2541            x => panic!("Set capabilities should be ready but got {:?}", x),
2542        };
2543    }
2544
2545    #[fuchsia::test]
2546    fn peer_starts_waiting_streams() {
2547        let mut exec = fasync::TestExecutor::new_with_fake_time();
2548        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(5_000_000_000));
2549
2550        let mut streams = Streams::default();
2551        let mut test_builder = TestMediaTaskBuilder::new();
2552        streams.insert(Stream::build(
2553            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2554            test_builder.builder(),
2555        ));
2556
2557        let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2558        let remote_peer = avdtp::Peer::new(remote);
2559
2560        let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2561
2562        let sbc_caps = sbc_capabilities();
2563        let set_config_fut =
2564            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2565        let mut set_config_fut = pin!(set_config_fut);
2566
2567        match exec.run_until_stalled(&mut set_config_fut) {
2568            Poll::Ready(Ok(())) => {}
2569            x => panic!("Set capabilities should be ready but got {:?}", x),
2570        };
2571
2572        let open_fut = remote_peer.open(&sbc_endpoint_id);
2573        let mut open_fut = pin!(open_fut);
2574        match exec.run_until_stalled(&mut open_fut) {
2575            Poll::Ready(Ok(())) => {}
2576            x => panic!("Open should be ready but got {:?}", x),
2577        };
2578
2579        // Establish a media transport stream
2580        let (_remote_transport, transport) = Channel::create();
2581        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2582
2583        // The remote end should get a start request after the timeout.
2584        let mut remote_requests = remote_peer.take_request_stream();
2585        let next_remote_request_fut = remote_requests.next();
2586        let mut next_remote_request_fut = pin!(next_remote_request_fut);
2587
2588        // Nothing should happen immediately.
2589        assert!(exec.run_until_stalled(&mut next_remote_request_fut).is_pending());
2590
2591        // After the timeout has passed..
2592        exec.set_fake_time(zx::MonotonicDuration::from_seconds(3).after_now());
2593        let _ = exec.wake_expired_timers();
2594
2595        let stream_ids = match exec.run_until_stalled(&mut next_remote_request_fut) {
2596            Poll::Ready(Some(Ok(avdtp::Request::Start { responder, stream_ids }))) => {
2597                responder.send().unwrap();
2598                stream_ids
2599            }
2600            x => panic!("Expected to receive a start request for the stream, got {:?}", x),
2601        };
2602
2603        // We should start the media task, so the task should be created locally
2604        let media_task =
2605            exec.run_until_stalled(&mut test_builder.next_task()).expect("ready").unwrap();
2606        assert!(media_task.is_started());
2607
2608        // Remote peer should still be able to suspend the stream.
2609        let suspend_fut = remote_peer.suspend(&stream_ids);
2610        let mut suspend_fut = pin!(suspend_fut);
2611        match exec.run_until_stalled(&mut suspend_fut) {
2612            Poll::Ready(Ok(())) => {}
2613            x => panic!("Suspend should be ready but got {:?}", x),
2614        };
2615
2616        // Should have stopped the media task on suspend.
2617        assert!(!media_task.is_started());
2618    }
2619
2620    #[fuchsia::test]
2621    fn needs_permit_to_start_streams() {
2622        let mut exec = fasync::TestExecutor::new();
2623
2624        let mut streams = Streams::default();
2625        let mut test_builder = TestMediaTaskBuilder::new();
2626        streams.insert(Stream::build(
2627            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2628            test_builder.builder(),
2629        ));
2630        streams.insert(Stream::build(
2631            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2632            test_builder.builder(),
2633        ));
2634        let mut next_task_fut = test_builder.next_task();
2635
2636        let permits = Permits::new(1);
2637        let taken_permit = permits.get().expect("permit taken");
2638        let (remote, _profile_request_stream, _, peer) =
2639            setup_test_peer(false, streams, Some(permits.clone()));
2640        let remote_peer = avdtp::Peer::new(remote);
2641
2642        let sbc_endpoint_id = 1_u8.try_into().unwrap();
2643
2644        let sbc_caps = sbc_capabilities();
2645        let mut set_config_fut =
2646            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2647
2648        match exec.run_until_stalled(&mut set_config_fut) {
2649            Poll::Ready(Ok(())) => {}
2650            x => panic!("Set capabilities should be ready but got {:?}", x),
2651        };
2652
2653        let mut open_fut = remote_peer.open(&sbc_endpoint_id);
2654        match exec.run_until_stalled(&mut open_fut) {
2655            Poll::Ready(Ok(())) => {}
2656            x => panic!("Open should be ready but got {:?}", x),
2657        };
2658
2659        // Establish a media transport stream
2660        let (_remote_transport, transport) = Channel::create();
2661        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2662
2663        // Do the same, but for the OTHER stream.
2664        let sbc_endpoint_two = 2_u8.try_into().unwrap();
2665
2666        let mut set_config_fut =
2667            remote_peer.set_configuration(&sbc_endpoint_two, &sbc_endpoint_two, &sbc_caps);
2668
2669        match exec.run_until_stalled(&mut set_config_fut) {
2670            Poll::Ready(Ok(())) => {}
2671            x => panic!("Set capabilities should be ready but got {:?}", x),
2672        };
2673
2674        let mut open_fut = remote_peer.open(&sbc_endpoint_two);
2675        match exec.run_until_stalled(&mut open_fut) {
2676            Poll::Ready(Ok(())) => {}
2677            x => panic!("Open should be ready but got {:?}", x),
2678        };
2679
2680        // Establish a media transport stream
2681        let (_remote_transport_two, transport_two) = Channel::create();
2682        assert_eq!(Some(()), peer.receive_channel(transport_two).ok());
2683
2684        // Remote peer should still be able to try to start the stream, and we will say yes, but
2685        // that last seid looks wonky.
2686        let unknown_endpoint_id: StreamEndpointId = 9_u8.try_into().unwrap();
2687        let stream_ids = [sbc_endpoint_id.clone(), unknown_endpoint_id.clone()];
2688        let mut start_fut = remote_peer.start(&stream_ids);
2689        match exec.run_until_stalled(&mut start_fut) {
2690            Poll::Ready(Err(avdtp::Error::RemoteRejected(rejection))) => {
2691                assert_eq!(avdtp::ErrorCode::BadAcpSeid, rejection.error_code().unwrap().unwrap());
2692                assert_eq!(unknown_endpoint_id, rejection.stream_id().unwrap());
2693            }
2694            x => panic!("Start should be ready but got {:?}", x),
2695        };
2696
2697        // We can't get a permit (none are available) so we suspend the one we didn't error on.
2698        let mut remote_requests = remote_peer.take_request_stream();
2699
2700        let suspended_stream_ids = match exec.run_singlethreaded(&mut remote_requests.next()) {
2701            Some(Ok(avdtp::Request::Suspend { responder, stream_ids })) => {
2702                responder.send().unwrap();
2703                stream_ids
2704            }
2705            x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2706        };
2707
2708        assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2709        assert_eq!(1, suspended_stream_ids.len());
2710
2711        // And we should have not tried to start a task.
2712        match exec.run_until_stalled(&mut next_task_fut) {
2713            Poll::Pending => {}
2714            x => panic!("Local task should not have been created at this point: {:?}", x),
2715        };
2716
2717        // No matter how many times they ask to start, we will still suspend (but not queue another
2718        // reservation for the same id)
2719        let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
2720        match exec.run_until_stalled(&mut start_fut) {
2721            Poll::Ready(Ok(())) => {}
2722            x => panic!("Start should be ready but got {:?}", x),
2723        }
2724
2725        let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2726            Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2727                responder.send().unwrap();
2728                stream_ids
2729            }
2730            x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2731        };
2732        assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2733
2734        // After a permit is available, should try to start the first endpoint that failed.
2735        drop(taken_permit);
2736
2737        match exec.run_singlethreaded(&mut remote_requests.next()) {
2738            Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2739                assert_eq!(stream_ids, &[sbc_endpoint_id.clone()]);
2740                responder.send().unwrap();
2741            }
2742            x => panic!("Expected start on permit available but got {x:?}"),
2743        };
2744
2745        // And we should start a task.
2746        let media_task = match exec.run_until_stalled(&mut next_task_fut) {
2747            Poll::Ready(Some(task)) => task,
2748            x => panic!("Local task should be created at this point: {:?}", x),
2749        };
2750
2751        assert!(media_task.is_started());
2752
2753        // If the remote asks to start another one, we still suspend it immediately.
2754        let mut start_fut = remote_peer.start(&[sbc_endpoint_two.clone()]);
2755        match exec.run_until_stalled(&mut start_fut) {
2756            Poll::Ready(Ok(())) => {}
2757            x => panic!("Start should be ready but got {:?}", x),
2758        }
2759
2760        let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2761            Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2762                responder.send().unwrap();
2763                stream_ids
2764            }
2765            x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2766        };
2767
2768        assert!(suspended_stream_ids.contains(&sbc_endpoint_two));
2769        assert_eq!(1, suspended_stream_ids.len());
2770
2771        // Once the first one is done, the second can start.
2772        let mut suspend_fut = remote_peer.suspend(&[sbc_endpoint_id.clone()]);
2773        match exec.run_until_stalled(&mut suspend_fut) {
2774            Poll::Ready(Ok(())) => {}
2775            x => panic!("Start should be ready but got {:?}", x),
2776        }
2777
2778        match exec.run_singlethreaded(&mut remote_requests.next()) {
2779            Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2780                assert_eq!(stream_ids, &[sbc_endpoint_two]);
2781                responder.send().unwrap();
2782            }
2783            x => panic!("Expected start on permit available but got {x:?}"),
2784        };
2785    }
2786
2787    fn start_sbc_stream(
2788        exec: &mut fasync::TestExecutor,
2789        media_test_builder: &mut TestMediaTaskBuilder,
2790        peer: &Peer,
2791        remote_peer: &avdtp::Peer,
2792        local_id: &StreamEndpointId,
2793        remote_id: &StreamEndpointId,
2794    ) -> TestMediaTask {
2795        let sbc_caps = sbc_capabilities();
2796        let set_config_fut = remote_peer.set_configuration(&local_id, &remote_id, &sbc_caps);
2797        let mut set_config_fut = pin!(set_config_fut);
2798
2799        match exec.run_until_stalled(&mut set_config_fut) {
2800            Poll::Ready(Ok(())) => {}
2801            x => panic!("Set capabilities should be ready but got {:?}", x),
2802        };
2803
2804        let open_fut = remote_peer.open(&local_id);
2805        let mut open_fut = pin!(open_fut);
2806        match exec.run_until_stalled(&mut open_fut) {
2807            Poll::Ready(Ok(())) => {}
2808            x => panic!("Open should be ready but got {:?}", x),
2809        };
2810
2811        // Establish a media transport stream
2812        let (_remote_transport, transport) = Channel::create();
2813        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2814
2815        // Remote peer should still be able to try to start the stream, and we will say yes.
2816        let stream_ids = [local_id.clone()];
2817        let start_fut = remote_peer.start(&stream_ids);
2818        let mut start_fut = pin!(start_fut);
2819        match exec.run_until_stalled(&mut start_fut) {
2820            Poll::Ready(Ok(())) => {}
2821            x => panic!("Start should be ready but got {:?}", x),
2822        };
2823
2824        // And we should start a media task.
2825        let media_task = media_test_builder.expect_task();
2826        assert!(media_task.is_started());
2827
2828        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
2829        media_task
2830    }
2831
2832    #[fuchsia::test]
2833    fn permits_can_be_revoked_and_reinstated_all() {
2834        let mut exec = fasync::TestExecutor::new();
2835
2836        let mut streams = Streams::default();
2837        let mut test_builder = TestMediaTaskBuilder::new();
2838        streams.insert(Stream::build(
2839            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2840            test_builder.builder(),
2841        ));
2842        let sbc_endpoint_id = 1_u8.try_into().unwrap();
2843        let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2844
2845        streams.insert(Stream::build(
2846            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2847            test_builder.builder(),
2848        ));
2849        let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2850        let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2851
2852        let permits = Permits::new(2);
2853
2854        let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2855        let remote_peer = avdtp::Peer::new(remote);
2856
2857        let one_media_task = start_sbc_stream(
2858            &mut exec,
2859            &mut test_builder,
2860            &peer,
2861            &remote_peer,
2862            &sbc_endpoint_id,
2863            &remote_sbc_endpoint_id,
2864        );
2865        let two_media_task = start_sbc_stream(
2866            &mut exec,
2867            &mut test_builder,
2868            &peer,
2869            &remote_peer,
2870            &sbc2_endpoint_id,
2871            &remote_sbc2_endpoint_id,
2872        );
2873
2874        // Someone comes along and revokes our permits.
2875        let taken_permits = permits.seize();
2876
2877        let remote_endpoints: HashSet<_> =
2878            [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2879
2880        // We should send a suspend to the other end, for both of them.
2881        let mut remote_requests = remote_peer.take_request_stream();
2882        let mut expected_suspends = remote_endpoints.clone();
2883        while !expected_suspends.is_empty() {
2884            match exec.run_until_stalled(&mut remote_requests.next()) {
2885                Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2886                    for stream_id in stream_ids {
2887                        assert!(expected_suspends.remove(&stream_id));
2888                    }
2889                    responder.send().expect("send response okay");
2890                }
2891                x => panic!("Expected suspension and got {:?}", x),
2892            }
2893        }
2894
2895        // And the media tasks should be stopped.
2896        assert!(!one_media_task.is_started());
2897        assert!(!two_media_task.is_started());
2898
2899        // After the permits are available again, we send a start, and start the media stream.
2900        drop(taken_permits);
2901
2902        let mut expected_starts = remote_endpoints.clone();
2903        while !expected_starts.is_empty() {
2904            match exec.run_singlethreaded(&mut remote_requests.next()) {
2905                Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2906                    for stream_id in stream_ids {
2907                        assert!(expected_starts.remove(&stream_id));
2908                    }
2909                    responder.send().expect("send response okay");
2910                }
2911                x => panic!("Expected start and got {:?}", x),
2912            }
2913        }
2914        // And we should start two media tasks.
2915
2916        let one_media_task = test_builder.expect_task();
2917        assert!(one_media_task.is_started());
2918        let two_media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
2919            Poll::Ready(Some(task)) => task,
2920            x => panic!("Expected another ready task but {x:?}"),
2921        };
2922        assert!(two_media_task.is_started());
2923    }
2924
2925    #[fuchsia::test]
2926    fn permits_can_be_revoked_one_at_a_time() {
2927        let mut exec = fasync::TestExecutor::new();
2928
2929        let mut streams = Streams::default();
2930        let mut test_builder = TestMediaTaskBuilder::new();
2931        streams.insert(Stream::build(
2932            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2933            test_builder.builder(),
2934        ));
2935        let sbc_endpoint_id = 1_u8.try_into().unwrap();
2936        let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2937
2938        streams.insert(Stream::build(
2939            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2940            test_builder.builder(),
2941        ));
2942        let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2943        let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2944
2945        let permits = Permits::new(2);
2946
2947        let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2948        let remote_peer = avdtp::Peer::new(remote);
2949
2950        let one_media_task = start_sbc_stream(
2951            &mut exec,
2952            &mut test_builder,
2953            &peer,
2954            &remote_peer,
2955            &sbc_endpoint_id,
2956            &remote_sbc_endpoint_id,
2957        );
2958        let two_media_task = start_sbc_stream(
2959            &mut exec,
2960            &mut test_builder,
2961            &peer,
2962            &remote_peer,
2963            &sbc2_endpoint_id,
2964            &remote_sbc2_endpoint_id,
2965        );
2966
2967        // Someone comes along and revokes one of our permits.
2968        let taken_permit = permits.take();
2969
2970        let remote_endpoints: HashSet<_> =
2971            [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2972
2973        // We should send a suspend to the other end, for both of them.
2974        let mut remote_requests = remote_peer.take_request_stream();
2975        let suspended_id = match exec.run_until_stalled(&mut remote_requests.next()) {
2976            Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2977                assert!(stream_ids.len() == 1);
2978                assert!(remote_endpoints.contains(&stream_ids[0]));
2979                responder.send().expect("send response okay");
2980                stream_ids[0].clone()
2981            }
2982            x => panic!("Expected suspension and got {:?}", x),
2983        };
2984
2985        // And the correct one of the media tasks should be stopped.
2986        if suspended_id == remote_sbc_endpoint_id {
2987            assert!(!one_media_task.is_started());
2988            assert!(two_media_task.is_started());
2989        } else {
2990            assert!(one_media_task.is_started());
2991            assert!(!two_media_task.is_started());
2992        }
2993
2994        // After the permits are available again, we send a start, and start the media stream.
2995        drop(taken_permit);
2996
2997        match exec.run_singlethreaded(&mut remote_requests.next()) {
2998            Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2999                assert_eq!(stream_ids, &[suspended_id]);
3000                responder.send().expect("send response okay");
3001            }
3002            x => panic!("Expected start and got {:?}", x),
3003        }
3004        // And we should start another media task.
3005        let media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
3006            Poll::Ready(Some(task)) => task,
3007            x => panic!("Expected media task to start: {x:?}"),
3008        };
3009        assert!(media_task.is_started());
3010    }
3011
3012    // Scenario: when we are waiting for a suspend response from the peer after a permit was not
3013    // available, we try to start the peer (because a dwell has expired)
3014    #[fuchsia::test]
3015    fn permit_suspend_start_while_suspending() {
3016        let mut exec = fasync::TestExecutor::new();
3017
3018        let mut streams = Streams::default();
3019        let mut test_builder = TestMediaTaskBuilder::new();
3020        streams.insert(Stream::build(
3021            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
3022            test_builder.builder(),
3023        ));
3024        streams.insert(Stream::build(
3025            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
3026            test_builder.builder(),
3027        ));
3028        let mut next_task_fut = test_builder.next_task();
3029
3030        let permits = Permits::new(1);
3031        let (remote, _profile_request_stream, _, peer) =
3032            setup_test_peer(false, streams, Some(permits.clone()));
3033
3034        let remote_peer = avdtp::Peer::new(remote);
3035        let mut remote_requests = remote_peer.take_request_stream();
3036
3037        let sbc_endpoint_id = 1_u8.try_into().unwrap();
3038
3039        let sbc_caps = sbc_capabilities();
3040        let mut set_config_fut =
3041            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
3042
3043        match exec.run_until_stalled(&mut set_config_fut) {
3044            Poll::Ready(Ok(())) => {}
3045            x => panic!("Set capabilities should be ready but got {:?}", x),
3046        };
3047
3048        let mut open_fut = remote_peer.open(&sbc_endpoint_id);
3049        match exec.run_until_stalled(&mut open_fut) {
3050            Poll::Ready(Ok(())) => {}
3051            x => panic!("Open should be ready but got {:?}", x),
3052        };
3053
3054        // Establish a media transport stream
3055        let (_remote_transport, transport) = Channel::create();
3056        assert_eq!(Some(()), peer.receive_channel(transport).ok());
3057
3058        // At this point, we are dwelling, waiting for the peer to start the stream. Skip the timer.
3059        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
3060        let Some(_deadline) = exec.wake_next_timer() else {
3061            panic!("Expected a timer to be waiting to run");
3062        };
3063
3064        // We will try to start it ourselves, which will take the only permit and send a start.
3065        let start_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
3066            Some(Ok(avdtp::Request::Start { stream_ids, responder })) => {
3067                assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
3068                responder
3069            }
3070            x => panic!("Expected a Start request, got {x:?}"),
3071        };
3072
3073        assert!(permits.get().is_none());
3074
3075        // The peer doesn't notice. Instead try to start it from the peer side (bad timing)
3076        let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
3077
3078        // We get an OK, and then immediately a suspend request because there are no
3079        // permits available.
3080        match exec.run_singlethreaded(&mut start_fut) {
3081            Ok(()) => {}
3082            x => panic!("Expected OK response from start future but got {x:?}"),
3083        }
3084
3085        let suspend_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
3086            Some(Ok(avdtp::Request::Suspend { stream_ids, responder })) => {
3087                assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
3088                responder
3089            }
3090            x => panic!("Expected a suspend got {x:?}"),
3091        };
3092
3093        // At this point, the peer notices the start request and responds.
3094        start_responder.send().unwrap();
3095
3096        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
3097
3098        // Okay I guess..
3099        suspend_responder.send().unwrap();
3100
3101        // And we should start a task.
3102        let media_task = match exec.run_until_stalled(&mut next_task_fut) {
3103            Poll::Ready(Some(task)) => task,
3104            x => panic!("Local task should be created at this point: {:?}", x),
3105        };
3106
3107        assert!(media_task.is_started());
3108    }
3109
3110    /// Test that the version check method correctly differentiates between newer
3111    /// and older A2DP versions.
3112    #[fuchsia::test]
3113    fn version_check() {
3114        let p1: ProfileDescriptor = ProfileDescriptor {
3115            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3116            major_version: Some(1),
3117            minor_version: Some(3),
3118            ..Default::default()
3119        };
3120        assert_eq!(true, a2dp_version_check(p1));
3121
3122        let p1: ProfileDescriptor = ProfileDescriptor {
3123            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3124            major_version: Some(2),
3125            minor_version: Some(10),
3126            ..Default::default()
3127        };
3128        assert_eq!(true, a2dp_version_check(p1));
3129
3130        let p1: ProfileDescriptor = ProfileDescriptor {
3131            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3132            major_version: Some(1),
3133            minor_version: Some(0),
3134            ..Default::default()
3135        };
3136        assert_eq!(false, a2dp_version_check(p1));
3137
3138        let p1: ProfileDescriptor = ProfileDescriptor {
3139            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3140            major_version: None,
3141            minor_version: Some(9),
3142            ..Default::default()
3143        };
3144        assert_eq!(false, a2dp_version_check(p1));
3145
3146        let p1: ProfileDescriptor = ProfileDescriptor {
3147            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3148            major_version: Some(2),
3149            minor_version: Some(2),
3150            ..Default::default()
3151        };
3152        assert_eq!(true, a2dp_version_check(p1));
3153    }
3154}