use anyhow::Context as _;
use bt_avdtp::{
self as avdtp, MediaCodecType, ServiceCapability, ServiceCategory, StreamEndpoint,
StreamEndpointId,
};
use fidl_fuchsia_bluetooth::ChannelParameters;
use fidl_fuchsia_bluetooth_bredr::{
ConnectParameters, L2capParameters, ProfileDescriptor, ProfileProxy, PSM_AVDTP,
};
use fuchsia_async::{self as fasync, DurationExt};
use fuchsia_bluetooth::inspect::DebugExt;
use fuchsia_bluetooth::types::{Channel, PeerId};
use fuchsia_inspect as inspect;
use fuchsia_inspect_derive::{AttachError, Inspect};
use fuchsia_sync::Mutex;
use futures::channel::mpsc;
use futures::future::{BoxFuture, Either};
use futures::stream::FuturesUnordered;
use futures::task::{Context, Poll, Waker};
use futures::{select, Future, FutureExt, StreamExt};
use log::{debug, info, trace, warn};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::pin::Pin;
use std::sync::{Arc, Weak};
mod controller;
pub use controller::ControllerPool;
use crate::codec::MediaCodecConfig;
use crate::permits::{Permit, Permits};
use crate::stream::{Stream, Streams};
/// A connected remote peer: owns the shared per-peer stream state and the tasks that service
/// requests arriving from the remote side.
#[derive(Inspect)]
pub struct Peer {
/// The id of the peer this struct represents.
id: PeerId,
/// Shared state for the peer; inspect attachment is forwarded to it.
#[inspect(forward)]
inner: Arc<Mutex<PeerInner>>,
/// Profile proxy used to connect the media transport channel when starting a stream.
profile: ProfileProxy,
/// The profile descriptor most recently provided through `set_descriptor`, if any.
descriptor: Mutex<Option<ProfileDescriptor>>,
/// Wakers registered by `ClosedPeer` futures. The list is taken (leaving `None`) and every
/// waker is woken when the request-handling task finishes.
closed_wakers: Arc<Mutex<Option<Vec<Waker>>>>,
/// Logger used to record metrics about this peer.
metrics: bt_metrics::MetricsLogger,
/// Task spawned by `receive_channel` that waits `STREAM_DWELL` and then starts the opened
/// streams. `None` when no such task has been spawned or it was observed to be finished.
start_stream_task: Mutex<Option<fasync::Task<avdtp::Result<()>>>>,
}
/// Tracks the streaming permits held and reserved for one peer's local stream endpoints.
#[derive(Clone)]
struct StreamPermits {
/// The pool that permits are taken from and reserved against.
permits: Permits,
/// Permits currently held, keyed by the local stream endpoint id they were acquired for.
open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
/// Local stream endpoint ids that have a reservation queued which has not resolved yet.
reserved_streams: Arc<Mutex<HashSet<StreamEndpointId>>>,
/// Weak reference to the peer state, used to suspend a stream when its permit is revoked.
inner: Weak<Mutex<PeerInner>>,
/// Id of the peer, used for permit labels and log messages.
peer_id: PeerId,
/// Sends reservation futures to the receiver returned by `new`; each future resolves to a
/// `StreamPermit` once the reservation has been granted.
sender: mpsc::UnboundedSender<BoxFuture<'static, StreamPermit>>,
}
/// Handle for a permit stored in the shared `open_streams` map. Dropping the handle removes the
/// map entry for `local_id`.
#[derive(Debug)]
struct StreamPermit {
/// The local stream endpoint id the permit was acquired for.
local_id: StreamEndpointId,
/// The shared map that stores the actual `Permit` for `local_id`.
open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
}
impl StreamPermit {
    /// The local stream endpoint id this permit was acquired for.
    fn local_id(&self) -> &StreamEndpointId {
        let Self { local_id, .. } = self;
        local_id
    }

    /// Returns true while the shared map still contains a permit for this stream's local id.
    fn is_held(&self) -> bool {
        let held = self.open_streams.lock();
        held.contains_key(&self.local_id)
    }
}
impl Drop for StreamPermit {
    /// Releases the permit by removing this stream's entry from the shared map, if present.
    fn drop(&mut self) {
        let mut held = self.open_streams.lock();
        let _ = held.remove(&self.local_id);
    }
}
impl StreamPermits {
/// Creates permit tracking for the peer `peer_id`, drawing from `permits`.
///
/// Returns the tracker together with the receiving end of the channel on which reservation
/// futures are delivered.
fn new(
    inner: Weak<Mutex<PeerInner>>,
    peer_id: PeerId,
    permits: Permits,
) -> (Self, mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>) {
    let (sender, reservations_receiver) = mpsc::unbounded();
    let stream_permits = Self {
        permits,
        open_streams: Default::default(),
        reserved_streams: Default::default(),
        inner,
        peer_id,
        sender,
    };
    (stream_permits, reservations_receiver)
}
/// Builds the label attached to a permit: the peer id followed by the local stream id.
fn label_for(&self, local_id: &StreamEndpointId) -> String {
    format!("{} {local_id}", self.peer_id)
}
/// Tries to take a revokable permit for `local_id` right now.
///
/// Returns `None` when the pool has no permit available. Otherwise the permit is labelled,
/// stored in `open_streams`, and a `StreamPermit` handle for it is returned.
fn get(&self, local_id: StreamEndpointId) -> Option<StreamPermit> {
    let permit = match self.permits.get_revokable(self.make_revocation_fn(&local_id)) {
        Some(permit) => permit,
        None => {
            info!("No permits available: {:?}", self.permits);
            return None;
        }
    };
    permit.relabel(self.label_for(&local_id));
    {
        // Any permit previously stored for this stream is dropped while the map is locked.
        let mut open = self.open_streams.lock();
        let replaced = open.insert(local_id.clone(), permit);
        if replaced.is_some() {
            warn!(id:% = self.peer_id; "Started stream {local_id:?} twice, dropping previous permit");
        }
    }
    Some(StreamPermit { local_id, open_streams: self.open_streams.clone() })
}
/// Queues a reservation for `local_id` unless one is already outstanding.
///
/// The reservation is wrapped in a future that, once granted, labels the permit, stores it in
/// `open_streams`, clears the `reserved_streams` record and yields a `StreamPermit`. That future
/// is sent over `sender` for the receiver to drive.
fn setup_reservation_for(&self, local_id: StreamEndpointId) {
    // `insert` returns false when the id is already recorded as reserved.
    let newly_reserved = self.reserved_streams.lock().insert(local_id.clone());
    if !newly_reserved {
        return;
    }
    let reservation = self.permits.reserve_revokable(self.make_revocation_fn(&local_id));
    let label = self.label_for(&local_id);
    let open_streams = self.open_streams.clone();
    let reserved_streams = self.reserved_streams.clone();
    let reserved_id = local_id.clone();
    let restart_stream_available_fut = async move {
        let local_id = reserved_id;
        let permit = reservation.await;
        permit.relabel(label);
        if open_streams.lock().insert(local_id.clone(), permit).is_some() {
            warn!("Reservation replaces acquired permit for {}", local_id);
        }
        if !reserved_streams.lock().remove(&local_id) {
            warn!(local_id:%; "Unrecorded reservation resolved");
        }
        StreamPermit { local_id, open_streams }
    }
    .boxed();
    if let Err(e) = self.sender.unbounded_send(restart_stream_available_fut) {
        warn!(id:% = self.peer_id, local_id:%, e:?; "Couldn't queue reservation to finish");
    }
}
/// Handles revocation of the permit held for `local_id` and hands the permit back.
///
/// If the peer state is still alive, the local stream is suspended, a suspend is requested from
/// the remote peer, and a new reservation is queued for the stream.
///
/// # Panics
///
/// Panics if `open_streams` holds no permit for `local_id`.
fn revocation_fn(self, local_id: StreamEndpointId) -> Permit {
if let Ok(peer) = PeerInner::upgrade(self.inner.clone()) {
{
// Scope the peer lock so it is released before the reservation is set up.
let mut lock = peer.lock();
match lock.suspend_local_stream(&local_id) {
// The future returned by `suspend` is dropped without being awaited.
// NOTE(review): presumably the suspend command is sent when the future is created; confirm.
Ok(remote_id) => drop(lock.peer.suspend(&[remote_id])),
Err(e) => warn!("Couldn't stop local stream {local_id:?}: {e:?}"),
}
}
self.setup_reservation_for(local_id.clone());
}
self.open_streams.lock().remove(&local_id).expect("permit revoked but don't have it")
}
/// Builds the callback handed to the permit pool; when invoked it runs `revocation_fn` for
/// `local_id` on a clone of this tracker.
fn make_revocation_fn(&self, local_id: &StreamEndpointId) -> impl FnOnce() -> Permit {
    let tracker = self.clone();
    let revoked_id = local_id.clone();
    move || tracker.revocation_fn(revoked_id)
}
}
impl Peer {
/// Creates a `Peer` for `id` around the AVDTP signaling `peer` and the local `streams`.
///
/// When `permits` is provided, streaming permits are acquired from it before streams are
/// started. The task that handles requests from the remote peer is started before returning.
pub fn create(
    id: PeerId,
    peer: avdtp::Peer,
    streams: Streams,
    permits: Option<Permits>,
    profile: ProfileProxy,
    metrics: bt_metrics::MetricsLogger,
) -> Self {
    let inner = Arc::new(Mutex::new(PeerInner::new(peer, id, streams, metrics.clone())));
    let reservations_receiver = match permits {
        Some(permits) => {
            let (stream_permits, receiver) =
                StreamPermits::new(Arc::downgrade(&inner), id, permits);
            inner.lock().permits = Some(stream_permits);
            receiver
        }
        None => {
            // Without permits nothing ever sends a reservation; the sender is dropped at once.
            let (_, receiver) = mpsc::unbounded();
            receiver
        }
    };
    let created = Self {
        id,
        inner,
        profile,
        descriptor: Mutex::new(None),
        closed_wakers: Arc::new(Mutex::new(Some(Vec::new()))),
        metrics,
        start_stream_task: Mutex::new(None),
    };
    created.start_requests_task(reservations_receiver);
    created
}
/// Stores `descriptor` as the peer's profile descriptor and returns the one it replaced, if any.
pub fn set_descriptor(&self, descriptor: ProfileDescriptor) -> Option<ProfileDescriptor> {
    let mut current = self.descriptor.lock();
    current.replace(descriptor)
}
/// How long the task spawned by `receive_channel` waits (500 ms) before starting the opened
/// streams.
const STREAM_DWELL: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(500);
/// Hands a newly connected transport `channel` to the stream that is currently opening.
///
/// When the inner state reports that the stream has all its channels, a task is stored that
/// waits `STREAM_DWELL` and then starts the opened streams. Errors from the inner
/// `receive_channel` are returned unchanged.
pub fn receive_channel(&self, channel: Channel) -> avdtp::Result<()> {
    // The inner lock stays held until this function returns.
    let mut inner = self.inner.lock();
    if !inner.receive_channel(channel)? {
        return Ok(());
    }
    let weak = Arc::downgrade(&self.inner);
    let dwell_then_start = fasync::Task::local(async move {
        trace!("Dwelling to start remotely-opened stream..");
        fasync::Timer::new(Self::STREAM_DWELL.after_now()).await;
        PeerInner::start_opened(weak).await
    });
    *self.start_stream_task.lock() = Some(dwell_then_start);
    Ok(())
}
/// Returns a clone of the AVDTP signaling peer.
pub fn avdtp(&self) -> avdtp::Peer {
    self.inner.lock().peer.clone()
}
/// Returns copies of the cached remote stream endpoints, or `None` if none have been stored.
pub fn remote_endpoints(&self) -> Option<Vec<avdtp::StreamEndpoint>> {
    let inner = self.inner.lock();
    inner.remote_endpoints()
}
/// Returns a future that resolves to the remote peer's stream endpoints and their capabilities.
///
/// Cached endpoints are returned directly when available. Otherwise the peer's streams are
/// discovered and capabilities are requested for each; streams whose capability request fails
/// are skipped. The result is cached and metrics are recorded for it.
pub fn collect_capabilities(
    &self,
) -> impl Future<Output = avdtp::Result<Vec<avdtp::StreamEndpoint>>> {
    let avdtp = self.avdtp();
    // GetAllCapabilities is only used when the stored descriptor passes the version check.
    let get_all = self.descriptor.lock().clone().is_some_and(a2dp_version_check);
    let inner = self.inner.clone();
    let metrics = self.metrics.clone();
    let peer_id = self.id;
    async move {
        if let Some(caps) = inner.lock().remote_endpoints() {
            return Ok(caps);
        }
        trace!("Discovering peer streams..");
        let infos = avdtp.discover().await?;
        trace!("Discovered {} streams", infos.len());
        let mut remote_streams = Vec::new();
        for info in infos {
            let response = if get_all {
                avdtp.get_all_capabilities(info.id()).await
            } else {
                avdtp.get_capabilities(info.id()).await
            };
            let capabilities = match response {
                Ok(capabilities) => capabilities,
                Err(e) => {
                    info!(peer_id:%; "Stream {} capabilities failed: {:?}, skipping", info.id(), e);
                    continue;
                }
            };
            trace!("Stream {:?}", info);
            for cap in &capabilities {
                trace!(" - {:?}", cap);
            }
            remote_streams.push(avdtp::StreamEndpoint::from_info(&info, capabilities));
        }
        inner.lock().set_remote_endpoints(&remote_streams);
        Self::record_cobalt_metrics(metrics, &remote_streams);
        Ok(remote_streams)
    }
}
fn record_cobalt_metrics(metrics: bt_metrics::MetricsLogger, endpoints: &[StreamEndpoint]) {
let codec_metrics: HashSet<_> = endpoints
.iter()
.filter_map(|endpoint| {
endpoint.codec_type().map(|t| codectype_to_availability_metric(t) as u32)
})
.collect();
metrics
.log_occurrences(bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID, codec_metrics);
let cap_metrics: HashSet<_> = endpoints
.iter()
.flat_map(|endpoint| {
endpoint
.capabilities()
.iter()
.filter_map(|t| capability_to_metric(t))
.chain(std::iter::once(
bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic,
))
.map(|t| t as u32)
})
.collect();
metrics.log_occurrences(bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID, cap_metrics);
}
/// L2CAP parameters used for the media transport channel: the AVDTP PSM with a maximum receive
/// packet size of 65535, and defaults for everything else.
fn transport_channel_params() -> L2capParameters {
    let parameters =
        ChannelParameters { max_rx_packet_size: Some(65535), ..Default::default() };
    L2capParameters { psm: Some(PSM_AVDTP), parameters: Some(parameters), ..Default::default() }
}
/// Returns a future that configures, opens and starts a stream to the remote endpoint
/// `remote_id`, using a compatible local endpoint and the capabilities shared by both sides.
///
/// `capabilities` are the remote capabilities to negotiate against; they must include a codec
/// capability, otherwise the future resolves to `avdtp::Error::InvalidState`.
///
/// # Errors
///
/// The future fails if no compatible local stream exists, if any AVDTP signaling step fails,
/// if the transport channel cannot be connected, or if the peer has been dropped
/// (`avdtp::Error::PeerDisconnected`).
pub fn stream_start(
    &self,
    remote_id: StreamEndpointId,
    capabilities: Vec<ServiceCapability>,
) -> impl Future<Output = avdtp::Result<()>> {
    let peer = Arc::downgrade(&self.inner);
    let peer_id = self.id;
    let avdtp = self.avdtp();
    let profile = self.profile.clone();
    async move {
        let codec_params =
            capabilities.iter().find(|x| x.is_codec()).ok_or(avdtp::Error::InvalidState)?;
        let (local_id, local_capabilities) = {
            let peer = PeerInner::upgrade(peer.clone())?;
            let lock = peer.lock();
            lock.find_compatible_local_capabilities(codec_params, &remote_id)?
        };
        let local_by_cat: HashMap<ServiceCategory, ServiceCapability> =
            local_capabilities.into_iter().map(|i| (i.category(), i)).collect();
        // Keep only the categories supported locally too; the media codec capability is
        // replaced by the configuration negotiated between the two sides.
        let shared_capabilities: BTreeMap<ServiceCategory, ServiceCapability> = capabilities
            .into_iter()
            .filter_map(|cap| {
                let Some(local_cap) = local_by_cat.get(&cap.category()) else {
                    return None;
                };
                if cap.category() == ServiceCategory::MediaCodec {
                    let Ok(a) = MediaCodecConfig::try_from(&cap) else {
                        return None;
                    };
                    let Ok(b) = MediaCodecConfig::try_from(local_cap) else {
                        return None;
                    };
                    let Some(negotiated) = MediaCodecConfig::negotiate(&a, &b) else {
                        return None;
                    };
                    Some((cap.category(), (&negotiated).into()))
                } else {
                    Some((cap.category(), cap))
                }
            })
            .collect();
        // Collecting through a BTreeMap yields the capabilities ordered by category.
        let shared_capabilities: Vec<_> = shared_capabilities.into_values().collect();
        trace!("Starting stream {local_id} to remote {remote_id} with {shared_capabilities:?}");
        avdtp.set_configuration(&remote_id, &local_id, &shared_capabilities).await?;
        {
            let strong = PeerInner::upgrade(peer.clone())?;
            strong.lock().set_opening(&local_id, &remote_id, shared_capabilities)?;
        }
        avdtp.open(&remote_id).await?;
        debug!(peer_id:%; "Connecting transport channel");
        let channel = profile
            .connect(
                &peer_id.into(),
                &ConnectParameters::L2cap(Self::transport_channel_params()),
            )
            .await
            .context("FIDL error connecting media transport channel")?
            .or(Err(avdtp::Error::PeerDisconnected))?;
        trace!(peer_id:%; "Connected transport channel, converting to local Channel");
        let channel = match channel.try_into() {
            Err(e) => {
                warn!(peer_id:%, e:?; "Couldn't connect media transport: no channel");
                return Err(avdtp::Error::PeerDisconnected);
            }
            Ok(c) => c,
        };
        trace!(peer_id:%; "Connected transport channel, passing to Peer..");
        {
            let strong = PeerInner::upgrade(peer.clone())?;
            let _ = strong.lock().receive_channel(channel)?;
        }
        PeerInner::start_opened(peer).await
    }
}
/// Returns true if the peer is streaming or opening a stream, or if a pending task is about to
/// start one.
pub fn streaming_active(&self) -> bool {
    // The inner lock is held across both checks.
    let inner = self.inner.lock();
    inner.is_streaming() || self.will_start_streaming()
}
/// Test-only: returns true if any local stream is currently streaming.
#[cfg(test)]
fn is_streaming_now(&self) -> bool {
    let inner = self.inner.lock();
    inner.is_streaming_now()
}
/// Returns true while the task stored by `receive_channel` has not finished.
///
/// The task is polled with a no-op waker; once it reports completion it is removed so later
/// calls return false without polling.
fn will_start_streaming(&self) -> bool {
    let mut task_lock = self.start_stream_task.lock();
    let Some(task) = task_lock.as_mut() else {
        return false;
    };
    let mut cx = Context::from_waker(futures::task::noop_waker_ref());
    match task.poll_unpin(&mut cx) {
        Poll::Pending => true,
        Poll::Ready(_) => {
            let _ = task_lock.take();
            false
        }
    }
}
/// Returns a future that suspends the local stream `local_id` and asks the remote peer to
/// suspend its end.
pub fn stream_suspend(
    &self,
    local_id: StreamEndpointId,
) -> impl Future<Output = avdtp::Result<()>> {
    PeerInner::suspend(Arc::downgrade(&self.inner), local_id)
}
/// Spawns a detached local task that services this peer until it disconnects.
///
/// The task handles requests arriving on the AVDTP request stream, collects reservation futures
/// received on `reservations_receiver`, and starts a stream whenever one of those reservations
/// resolves to a permit. When the loop ends, the wakers registered for `closed()` are woken.
fn start_requests_task(
&self,
mut reservations_receiver: mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>,
) {
let lock = self.inner.lock();
let mut request_stream = lock.peer.take_request_stream().fuse();
let id = self.id.clone();
// Only weak references are moved into the task.
let peer = Arc::downgrade(&self.inner);
let mut stream_reservations = FuturesUnordered::new();
let disconnect_wakers = Arc::downgrade(&self.closed_wakers);
fuchsia_async::Task::local(async move {
loop {
select! {
request = request_stream.next() => {
match request {
// The request stream ended.
None => break,
Some(Err(e)) => info!(peer_id:% = id, e:?; "Request stream error"),
Some(Ok(request)) => match peer.upgrade() {
// The peer state is gone; exit without logging or waking anyone.
None => return,
Some(p) => {
// The lock guard is a temporary of this statement, so it is released
// before any returned future is awaited.
let result_or_future = p.lock().handle_request(request);
let result = match result_or_future {
Either::Left(result) => result,
Either::Right(future) => future.await,
};
if let Err(e) = result {
warn!(peer_id:% = id, e:?; "Error handling request");
}
}
},
}
},
// A new reservation was queued: track it until it resolves.
reservation_fut = reservations_receiver.select_next_some() => {
stream_reservations.push(reservation_fut)
},
// A reservation resolved to a permit: start the stream it belongs to.
permit = stream_reservations.select_next_some() => {
if let Err(e) = PeerInner::start_permit(peer.clone(), permit).await {
warn!(peer_id:% = id, e:?; "Couldn't start stream after unpause");
}
}
complete => break,
}
}
info!(peer_id:% = id; "disconnected");
// Taking the list leaves `None` behind, which `ClosedPeer::poll` treats as closed.
if let Some(wakers) = disconnect_wakers.upgrade() {
for waker in wakers.lock().take().unwrap_or_else(Vec::new) {
waker.wake();
}
}
})
.detach();
}
/// Returns a future that completes once the peer has disconnected or been dropped.
pub fn closed(&self) -> ClosedPeer {
    let inner = Arc::downgrade(&self.closed_wakers);
    ClosedPeer { inner }
}
}
/// Future returned by `Peer::closed`. It is ready when the shared waker list has been dropped
/// or has been taken (set to `None`).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ClosedPeer {
/// Weak reference to the peer's list of wakers to notify on close.
inner: Weak<Mutex<Option<Vec<Waker>>>>,
}
impl Future for ClosedPeer {
    type Output = ();

    /// Resolves once the waker list is gone (the `Peer` was dropped) or has been taken (the
    /// request task finished). Otherwise registers the current waker and stays pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Some(shared) = self.inner.upgrade() else {
            return Poll::Ready(());
        };
        let mut guard = shared.lock();
        let Some(wakers) = guard.as_mut() else {
            return Poll::Ready(());
        };
        // Register each task once: pushing a clone on every poll would grow the list without
        // bound when this future is polled repeatedly before the peer closes.
        if !wakers.iter().any(|registered| registered.will_wake(cx.waker())) {
            wakers.push(cx.waker().clone());
        }
        Poll::Pending
    }
}
/// Returns true when `profile` reports version 1.3 or newer. A descriptor missing either
/// version field yields false.
fn a2dp_version_check(profile: ProfileDescriptor) -> bool {
    match (profile.major_version, profile.minor_version) {
        (Some(major), Some(minor)) => major > 1 || (major == 1 && minor >= 3),
        _ => false,
    }
}
/// State shared between the `Peer` handle and the tasks acting on its behalf.
struct PeerInner {
/// The AVDTP signaling peer used to send commands to the remote side.
peer: avdtp::Peer,
/// The id of the remote peer.
peer_id: PeerId,
/// Local id of the stream that is currently being opened, if any.
opening: Option<StreamEndpointId>,
/// The local streams available to this peer.
local: Streams,
/// Permit tracking; `None` when the peer was created without permits.
permits: Option<StreamPermits>,
/// Streams that have been started, keyed by local id. Entries are removed on suspend.
started: HashMap<StreamEndpointId, WatchedStream>,
/// Inspect node for this peer, assigned when attached.
inspect: fuchsia_inspect::Node,
/// Cached remote endpoints, stored by `set_remote_endpoints`.
remote_endpoints: Option<Vec<StreamEndpoint>>,
/// Inspect node under which the remote endpoints are recorded.
remote_inspect: fuchsia_inspect::Node,
/// Logger used to record metrics.
metrics: bt_metrics::MetricsLogger,
}
impl Inspect for &mut PeerInner {
    /// Creates this peer's node under `parent`, records the peer id on it, and attaches the
    /// local streams beneath it as "local_streams".
    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
        let node_name = name.as_ref();
        self.inspect = parent.create_child(node_name);
        let id = self.peer_id.to_string();
        self.inspect.record_string("id", id);
        self.local.iattach(&self.inspect, "local_streams")
    }
}
impl PeerInner {
/// Creates the state for `peer_id` with no stream opening, no permits, nothing started, no
/// cached remote endpoints and default inspect nodes.
pub fn new(
    peer: avdtp::Peer,
    peer_id: PeerId,
    local: Streams,
    metrics: bt_metrics::MetricsLogger,
) -> Self {
    let started = HashMap::new();
    Self {
        peer,
        peer_id,
        local,
        metrics,
        started,
        opening: None,
        permits: None,
        remote_endpoints: None,
        inspect: Default::default(),
        remote_inspect: Default::default(),
    }
}
/// Looks up the local stream `local_id`, failing with `BadAcpSeid` when it does not exist.
fn get_mut(&mut self, local_id: &StreamEndpointId) -> Result<&mut Stream, avdtp::ErrorCode> {
    match self.local.get_mut(local_id) {
        Some(stream) => Ok(stream),
        None => Err(avdtp::ErrorCode::BadAcpSeid),
    }
}
/// Caches copies of `endpoints` and records each one under a fresh "remote_endpoints" inspect
/// node.
fn set_remote_endpoints(&mut self, endpoints: &[StreamEndpoint]) {
    self.remote_inspect = self.inspect.create_child("remote_endpoints");
    endpoints.iter().for_each(|endpoint| {
        self.remote_inspect.record_child(inspect::unique_name("remote_"), |node| {
            node.record_string("endpoint_id", endpoint.local_id().debug());
            node.record_string("capabilities", endpoint.capabilities().debug());
            node.record_string("type", endpoint.endpoint_type().debug());
        });
    });
    let copies: Vec<StreamEndpoint> = endpoints.iter().map(|e| e.as_new()).collect();
    self.remote_endpoints = Some(copies);
}
/// Returns new copies of the cached remote endpoints, or `None` when nothing is cached.
fn remote_endpoints(&self) -> Option<Vec<StreamEndpoint>> {
    let cached = self.remote_endpoints.as_ref()?;
    Some(cached.iter().map(StreamEndpoint::as_new).collect())
}
/// Returns a new copy of the cached remote endpoint whose id equals `id`, if there is one.
fn remote_endpoint(&self, id: &StreamEndpointId) -> Option<StreamEndpoint> {
    let cached = self.remote_endpoints.as_ref()?;
    cached.iter().find(|endpoint| endpoint.local_id() == id).map(StreamEndpoint::as_new)
}
/// Returns true if a stream is being opened or any local stream is streaming.
fn is_streaming(&self) -> bool {
    self.opening.is_some() || self.is_streaming_now()
}
/// Returns true if at least one local stream is currently streaming.
fn is_streaming_now(&self) -> bool {
    let mut streaming = self.local.streaming();
    streaming.next().is_some()
}
fn set_opening(
&mut self,
local_id: &StreamEndpointId,
remote_id: &StreamEndpointId,
capabilities: Vec<ServiceCapability>,
) -> avdtp::Result<()> {
if self.opening.is_some() {
return Err(avdtp::Error::InvalidState);
}
let peer_id = self.peer_id;
let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
stream
.configure(&peer_id, &remote_id, capabilities)
.map_err(|(cat, c)| avdtp::Error::RequestInvalidExtra(c, (&cat).into()))?;
stream.endpoint_mut().establish().or(Err(avdtp::Error::InvalidState))?;
self.opening = Some(local_id.clone());
Ok(())
}
fn upgrade(weak: Weak<Mutex<Self>>) -> avdtp::Result<Arc<Mutex<Self>>> {
weak.upgrade().ok_or(avdtp::Error::PeerDisconnected)
}
/// Starts every open local stream that has a remote id.
///
/// A stream that cannot get a permit is skipped (a reservation is queued for it instead). The
/// first failing start aborts the remaining ones and returns its error.
async fn start_opened(weak: Weak<Mutex<Self>>) -> avdtp::Result<()> {
    let (avdtp, stream_pairs) = {
        let strong = Self::upgrade(weak.clone())?;
        let inner = strong.lock();
        let mut pairs: Vec<(StreamEndpointId, StreamEndpointId)> = Vec::new();
        for stream in inner.local.open() {
            let endpoint = stream.endpoint();
            if let Some(remote_id) = endpoint.remote_id() {
                pairs.push((endpoint.local_id().clone(), remote_id.clone()));
            }
        }
        (inner.peer.clone(), pairs)
    };
    for (local_id, remote_id) in stream_pairs {
        // The lock is only held for this statement, never across the await below.
        let permit_result = Self::upgrade(weak.clone())?.lock().get_permit_or_reserve(&local_id);
        let Ok(permit) = permit_result else {
            continue;
        };
        Self::initiated_start(avdtp.clone(), weak.clone(), permit, &local_id, &remote_id).await?;
    }
    Ok(())
}
async fn start_permit(weak: Weak<Mutex<Self>>, permit: StreamPermit) -> avdtp::Result<()> {
let local_id = permit.local_id().clone();
let (avdtp, remote_id) = {
let peer = Self::upgrade(weak.clone())?;
let mut peer = peer.lock();
let remote_id = peer
.get_mut(&local_id)
.map_err(|e| avdtp::Error::RequestInvalid(e))?
.endpoint()
.remote_id()
.ok_or(avdtp::Error::InvalidState)?
.clone();
(peer.peer.clone(), remote_id)
};
Self::initiated_start(avdtp, weak, Some(permit), &local_id, &remote_id).await
}
/// Sends a start request for `remote_id` and, once accepted, starts the local stream
/// `local_id` holding `permit`.
///
/// If the local stream fails to start, the remote stream is suspended again and `Ok(())` is
/// still returned unless that suspend request itself fails.
async fn initiated_start(
    avdtp: avdtp::Peer,
    weak: Weak<Mutex<Self>>,
    permit: Option<StreamPermit>,
    local_id: &StreamEndpointId,
    remote_id: &StreamEndpointId,
) -> avdtp::Result<()> {
    trace!(permit:?, local_id:?, remote_id:?; "Making outgoing start request");
    let to_start = [remote_id.clone()];
    avdtp.start(&to_start).await?;
    trace!("Start response received: {permit:?}");
    let strong = Self::upgrade(weak.clone())?;
    let (peer_id, start_result) = {
        let mut inner = strong.lock();
        let peer_id = inner.peer_id;
        (peer_id, inner.start_local_stream(permit, local_id))
    };
    match start_result {
        Ok(()) => Ok(()),
        Err(e) => {
            warn!(peer_id:%, local_id:%, remote_id:%, e:?; "Failed to start local stream, suspending");
            avdtp.suspend(&to_start).await?;
            Ok(())
        }
    }
}
/// Suspends the local stream `local_id` immediately and returns a future for the suspend
/// request sent to the remote peer.
///
/// If the peer state is gone or the local suspend fails, the returned future resolves to that
/// error without sending anything.
fn suspend(
    weak: Weak<Mutex<Self>>,
    local_id: StreamEndpointId,
) -> impl Future<Output = avdtp::Result<()>> {
    let prepared = Self::upgrade(weak).and_then(|strong| {
        let mut inner = strong.lock();
        let avdtp = inner.peer.clone();
        let remote_id = inner.suspend_local_stream(&local_id)?;
        Ok((avdtp, remote_id))
    });
    let (avdtp, remote_id) = match prepared {
        Err(e) => return futures::future::err(e).left_future(),
        Ok(pair) => pair,
    };
    avdtp.suspend(&[remote_id]).right_future()
}
pub fn find_compatible_local_capabilities(
&self,
codec_params: &ServiceCapability,
remote_id: &StreamEndpointId,
) -> avdtp::Result<(StreamEndpointId, Vec<ServiceCapability>)> {
let config = codec_params.try_into()?;
let our_direction = self.remote_endpoint(remote_id).map(|e| e.endpoint_type().opposite());
debug!(codec_params:?, local:? = self.local; "Looking for compatible local stream");
self.local
.compatible(config)
.find_map(|s| {
let endpoint = s.endpoint();
if let Some(d) = our_direction {
if &d != endpoint.endpoint_type() {
return None;
}
}
Some((endpoint.local_id().clone(), endpoint.capabilities().clone()))
})
.ok_or(avdtp::Error::OutOfRange)
}
/// Obtains a permit to start the local stream `local_id`.
///
/// Returns `Ok(None)` when permits are not in use, `Ok(Some(permit))` when one was acquired,
/// and `Err(())` when none is available, in which case a reservation is queued.
fn get_permit_or_reserve(
    &self,
    local_id: &StreamEndpointId,
) -> Result<Option<StreamPermit>, ()> {
    let permits = match self.permits.as_ref() {
        Some(permits) => permits,
        None => return Ok(None),
    };
    match permits.get(local_id.clone()) {
        Some(permit) => Ok(Some(permit)),
        None => {
            info!(peer_id:% = self.peer_id, local_id:%; "No permit to start stream, adding a reservation");
            permits.setup_reservation_for(local_id.clone());
            Err(())
        }
    }
}
fn start_local_stream(
&mut self,
permit: Option<StreamPermit>,
local_id: &StreamEndpointId,
) -> avdtp::Result<()> {
let peer_id = self.peer_id;
let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
if permit.as_ref().is_some_and(|p| !p.is_held()) {
return Err(avdtp::Error::Other(anyhow::format_err!(
"streaming permit revoked during setup"
)));
}
info!(peer_id:%, stream:?; "Starting");
let stream_finished = stream.start().map_err(|c| avdtp::Error::RequestInvalid(c))?;
let watched_stream = WatchedStream::new(permit, stream_finished);
if self.started.insert(local_id.clone(), watched_stream).is_some() {
warn!(peer_id:%, local_id:%; "Stream that was already started");
}
Ok(())
}
fn suspend_local_stream(
&mut self,
local_id: &StreamEndpointId,
) -> avdtp::Result<StreamEndpointId> {
let peer_id = self.peer_id;
let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
let remote_id = stream.endpoint().remote_id().ok_or(avdtp::Error::InvalidState)?.clone();
info!(peer_id:%; "Suspend stream local {local_id} <-> {remote_id} remote");
stream.suspend().map_err(|c| avdtp::Error::RequestInvalid(c))?;
let _ = self.started.remove(local_id);
Ok(remote_id)
}
fn receive_channel(&mut self, channel: Channel) -> avdtp::Result<bool> {
let stream_id = self.opening.as_ref().cloned().ok_or(avdtp::Error::InvalidState)?;
let stream = self.get_mut(&stream_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
let done = !stream.endpoint_mut().receive_channel(channel)?;
if done {
self.opening = None;
}
info!(peer_id:% = self.peer_id, stream_id:%; "Transport connected");
Ok(done)
}
/// Handles one AVDTP `request` from the remote peer and responds through its responder.
///
/// Returns `Either::Left` with the result of responding for every request except `Start`.
/// `Start` returns `Either::Right` with a future that must be awaited: it suspends any streams
/// that could not get a permit and then yields the result of the response that was already sent.
fn handle_request(
&mut self,
request: avdtp::Request,
) -> Either<avdtp::Result<()>, impl Future<Output = avdtp::Result<()>>> {
use avdtp::ErrorCode;
use avdtp::Request::*;
trace!("Handling {request:?} from peer..");
// `break 'result <value>` is used below to respond early and leave the match.
let immediate_result = 'result: {
match request {
Discover { responder } => responder.send(&self.local.information()),
GetCapabilities { responder, stream_id }
| GetAllCapabilities { responder, stream_id } => match self.local.get(&stream_id) {
None => responder.reject(ErrorCode::BadAcpSeid),
Some(stream) => responder.send(stream.endpoint().capabilities()),
},
Open { responder, stream_id } => {
// Open is only accepted while a stream is marked as opening.
if self.opening.is_none() {
break 'result responder.reject(ErrorCode::BadState);
}
let Ok(stream) = self.get_mut(&stream_id) else {
break 'result responder.reject(ErrorCode::BadAcpSeid);
};
match stream.endpoint_mut().establish() {
Ok(()) => responder.send(),
Err(_) => responder.reject(ErrorCode::BadState),
}
}
Close { responder, stream_id } => {
let peer = self.peer.clone();
let Ok(stream) = self.get_mut(&stream_id) else {
break 'result responder.reject(ErrorCode::BadAcpSeid);
};
// The stream takes the responder and responds itself.
stream.release(responder, &peer)
}
SetConfiguration { responder, local_stream_id, remote_stream_id, capabilities } => {
// Only one stream may be opening at a time.
if self.opening.is_some() {
break 'result responder.reject(ServiceCategory::None, ErrorCode::BadState);
}
let peer_id = self.peer_id;
let Ok(stream) = self.get_mut(&local_stream_id) else {
break 'result responder
.reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
};
match stream.configure(&peer_id, &remote_stream_id, capabilities) {
Ok(_) => {
// The configured stream becomes the one being opened.
self.opening = Some(local_stream_id);
responder.send()
}
Err((category, code)) => responder.reject(category, code),
}
}
GetConfiguration { stream_id, responder } => {
let Ok(stream) = self.get_mut(&stream_id) else {
break 'result responder.reject(ErrorCode::BadAcpSeid);
};
let Some(vec_capabilities) = stream.endpoint().get_configuration() else {
break 'result responder.reject(ErrorCode::BadState);
};
responder.send(vec_capabilities.as_slice())
}
Reconfigure { responder, local_stream_id, capabilities } => {
let Ok(stream) = self.get_mut(&local_stream_id) else {
break 'result responder
.reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
};
match stream.reconfigure(capabilities) {
Ok(_) => responder.send(),
Err((cat, code)) => responder.reject(cat, code),
}
}
Start { responder, stream_ids } => {
// Remote ids of streams accepted here but suspended right after, for lack of a permit.
let mut immediate_suspend = Vec::new();
// Stops at the first stream that fails; streams started before it stay started.
let result = stream_ids.into_iter().try_for_each(|seid| {
let Some(stream) = self.local.get_mut(&seid) else {
return Err((seid, ErrorCode::BadAcpSeid));
};
let remote_id = stream.endpoint().remote_id().cloned();
let Some(remote_id) = remote_id else {
return Err((seid, ErrorCode::BadState));
};
let Ok(permit) = self.get_permit_or_reserve(&seid) else {
immediate_suspend.push(remote_id);
return Ok(());
};
match self.start_local_stream(permit, &seid) {
Ok(()) => Ok(()),
Err(avdtp::Error::RequestInvalid(code)) => Err((seid, code)),
Err(_) => Err((seid, ErrorCode::BadState)),
}
});
// The response is sent here, before the returned future runs.
let response_result = match result {
Ok(()) => responder.send(),
Err((seid, code)) => responder.reject(&seid, code),
};
{
let peer = self.peer.clone();
return Either::Right(async move {
if !immediate_suspend.is_empty() {
peer.suspend(immediate_suspend.as_slice()).await?;
}
response_result
});
}
}
Suspend { responder, stream_ids } => {
// Rejects at the first stream that fails; earlier streams stay suspended.
for seid in stream_ids {
match self.suspend_local_stream(&seid) {
Ok(_remote_id) => {}
Err(avdtp::Error::RequestInvalid(code)) => {
break 'result responder.reject(&seid, code)
}
Err(_e) => break 'result responder.reject(&seid, ErrorCode::BadState),
}
}
responder.send()
}
Abort { responder, stream_id } => {
// For an unknown stream, no response is sent through the responder.
let Ok(stream) = self.get_mut(&stream_id) else {
break 'result Ok(());
};
stream.abort();
// Clear `opening` only if it refers to the aborted stream.
self.opening = self.opening.take().filter(|local_id| local_id != &stream_id);
responder.send()
}
DelayReport { responder, delay, stream_id } => {
// `delay` is in tenths of a millisecond (see the "ms" formatting below).
let delay_ns = delay as u64 * 100000;
// The metric is logged even if the stream turns out not to exist.
self.metrics.log_integer(
bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID,
delay_ns.try_into().unwrap_or(-1),
vec![],
);
let Some(stream) = self.local.get_mut(&stream_id) else {
break 'result responder.reject(avdtp::ErrorCode::BadAcpSeid);
};
let delay_str = format!("delay {}.{} ms", delay / 10, delay % 10);
let peer = self.peer_id;
match stream.set_delay(std::time::Duration::from_nanos(delay_ns)) {
Ok(()) => info!(peer:%, stream_id:%; "reported {delay_str}"),
Err(avdtp::ErrorCode::BadState) => {
info!(peer:%, stream_id:%; "bad state {delay_str}");
break 'result responder.reject(avdtp::ErrorCode::BadState);
}
// Other failures are logged and the report is still accepted.
Err(e) => info!(peer:%, stream_id:%, e:?; "failed {delay_str}"),
};
responder.send()
}
}
};
Either::Left(immediate_result)
}
}
/// Record of a started stream. It owns the task that keeps the stream's permit until the stream
/// finishes.
struct WatchedStream {
/// Task that waits for the stream to finish and then drops the permit.
_permit_task: fasync::Task<()>,
}
impl WatchedStream {
    /// Spawns a task that awaits `finish_fut`, ignores its result, and then drops `permit`.
    fn new(
        permit: Option<StreamPermit>,
        finish_fut: BoxFuture<'static, Result<(), anyhow::Error>>,
    ) -> Self {
        let _permit_task = fasync::Task::spawn(async move {
            let _ = finish_fut.await;
            drop(permit);
        });
        Self { _permit_task }
    }
}
fn codectype_to_availability_metric(
codec_type: &MediaCodecType,
) -> bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec {
match codec_type {
&MediaCodecType::AUDIO_SBC => {
bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc
}
&MediaCodecType::AUDIO_MPEG12 => {
bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Mpeg12
}
&MediaCodecType::AUDIO_AAC => {
bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Aac
}
&MediaCodecType::AUDIO_ATRAC => {
bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac
}
&MediaCodecType::AUDIO_NON_A2DP => {
bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::VendorSpecific
}
_ => bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown,
}
}
fn capability_to_metric(
cap: &ServiceCapability,
) -> Option<bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability> {
match cap {
ServiceCapability::DelayReporting => {
Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport)
}
ServiceCapability::Reporting => {
Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Reporting)
}
ServiceCapability::Recovery { .. } => {
Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Recovery)
}
ServiceCapability::ContentProtection { .. } => {
Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::ContentProtection)
}
ServiceCapability::HeaderCompression { .. } => {
Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::HeaderCompression)
}
ServiceCapability::Multiplexing { .. } => {
Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Multiplexing)
}
other => {
trace!("untracked remote peer capability: {:?}", other);
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_utils::PollExt;
use bt_metrics::respond_to_metrics_req_for_test;
use fidl::endpoints::create_proxy_and_stream;
use fidl_fuchsia_bluetooth::ErrorCode;
use fidl_fuchsia_bluetooth_bredr::{
ProfileMarker, ProfileRequest, ProfileRequestStream, ServiceClassProfileIdentifier,
};
use fidl_fuchsia_metrics::{MetricEvent, MetricEventPayload};
use futures::future::Either;
use std::pin::pin;
use crate::media_task::tests::{TestMediaTask, TestMediaTaskBuilder};
use crate::media_types::*;
use crate::stream::tests::{make_sbc_endpoint, sbc_mediacodec_capability};
fn fake_metrics(
) -> (bt_metrics::MetricsLogger, fidl_fuchsia_metrics::MetricEventLoggerRequestStream) {
let (c, s) = fidl::endpoints::create_proxy_and_stream::<
fidl_fuchsia_metrics::MetricEventLoggerMarker,
>();
(bt_metrics::MetricsLogger::from_proxy(c), s)
}
fn setup_avdtp_peer() -> (avdtp::Peer, Channel) {
let (remote, signaling) = Channel::create();
let peer = avdtp::Peer::new(signaling);
(peer, remote)
}
fn build_test_streams() -> Streams {
let mut streams = Streams::default();
let source = Stream::build(
make_sbc_endpoint(1, avdtp::EndpointType::Source),
TestMediaTaskBuilder::new_delayable().builder(),
);
streams.insert(source);
let sink = Stream::build(
make_sbc_endpoint(2, avdtp::EndpointType::Sink),
TestMediaTaskBuilder::new().builder(),
);
streams.insert(sink);
streams
}
fn build_test_streams_delayable() -> Streams {
fn with_delay(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
StreamEndpoint::new(
seid,
avdtp::MediaType::Audio,
direction,
vec![
avdtp::ServiceCapability::MediaTransport,
avdtp::ServiceCapability::DelayReporting,
sbc_mediacodec_capability(),
],
)
.expect("endpoint creation should succeed")
}
let mut streams = Streams::default();
let source = Stream::build(
with_delay(1, avdtp::EndpointType::Source),
TestMediaTaskBuilder::new_delayable().builder(),
);
streams.insert(source);
let sink = Stream::build(
with_delay(2, avdtp::EndpointType::Sink),
TestMediaTaskBuilder::new().builder(),
);
streams.insert(sink);
streams
}
pub(crate) fn recv_remote(remote: &Channel) -> Result<Vec<u8>, zx::Status> {
remote.read_packet()
}
fn setup_test_peer(
use_cobalt: bool,
streams: Streams,
permits: Option<Permits>,
) -> (
Channel,
ProfileRequestStream,
Option<fidl_fuchsia_metrics::MetricEventLoggerRequestStream>,
Peer,
) {
let (avdtp, remote) = setup_avdtp_peer();
let (metrics_logger, cobalt_receiver) = if use_cobalt {
let (l, r) = fake_metrics();
(l, Some(r))
} else {
(bt_metrics::MetricsLogger::default(), None)
};
let (profile_proxy, requests) = create_proxy_and_stream::<ProfileMarker>();
let peer = Peer::create(PeerId(1), avdtp, streams, permits, profile_proxy, metrics_logger);
(remote, requests, cobalt_receiver, peer)
}
fn expect_get_capabilities_and_respond(
remote: &Channel,
expected_seid: u8,
response_capabilities: &[u8],
) {
let received = recv_remote(&remote).unwrap();
assert_eq!(0x00, received[0] & 0xF);
assert_eq!(0x02, received[1]); assert_eq!(expected_seid << 2, received[2]);
let txlabel_raw = received[0] & 0xF0;
#[rustfmt::skip]
let mut get_capabilities_rsp = vec![
txlabel_raw << 4 | 0x2, 0x02 ];
get_capabilities_rsp.extend_from_slice(response_capabilities);
assert!(remote.write(&get_capabilities_rsp).is_ok());
}
/// Expects a command with signal id `0x0C` (Get All Capabilities in AVDTP) for `expected_seid`
/// on `remote`, and answers it with an accept response carrying `response_capabilities`.
///
/// * `remote` - the remote end of the signaling channel of the peer under test.
/// * `expected_seid` - the stream endpoint id the command must be addressed to.
/// * `response_capabilities` - raw, already-encoded capability bytes appended to the response.
///
/// Panics if no packet can be read, if the packet is not the expected command, or if the
/// response cannot be written.
fn expect_get_all_capabilities_and_respond(
    remote: &Channel,
    expected_seid: u8,
    response_capabilities: &[u8],
) {
    let received = recv_remote(remote).unwrap();
    // The low nibble of the header holds the packet type and message type; both must be zero
    // (a single-packet command).
    assert_eq!(0x00, received[0] & 0xF);
    assert_eq!(0x0C, received[1]);
    // The SEID is carried in the upper six bits of the third byte.
    assert_eq!(expected_seid << 2, received[2]);
    // The transaction label is the high nibble of the header. It is kept in place (not
    // shifted) so that it can be OR-ed straight into the response header; shifting it left
    // again would push it out of the byte and always answer with label 0.
    let txlabel_raw = received[0] & 0xF0;
    #[rustfmt::skip]
    let mut get_capabilities_rsp = vec![
        txlabel_raw | 0x2, // same label, response accept
        0x0C,              // signal id
    ];
    get_capabilities_rsp.extend_from_slice(response_capabilities);
    assert!(remote.write(&get_capabilities_rsp).is_ok());
}
/// The future returned by `Peer::closed` stays pending while the signaling channel is open and
/// resolves once the remote end of that channel is dropped.
#[fuchsia::test]
fn disconnected() {
    let mut executor = fasync::TestExecutor::new();
    let (profile, _profile_requests) = create_proxy_and_stream::<ProfileMarker>();
    let (remote_end, signaling_end) = Channel::create();
    let peer = Peer::create(
        PeerId(1),
        avdtp::Peer::new(signaling_end),
        Streams::default(),
        None,
        profile,
        bt_metrics::MetricsLogger::default(),
    );
    let mut closed = pin!(peer.closed());
    // The remote end is still alive, so the peer must not report closure yet.
    assert!(executor.run_until_stalled(&mut closed).is_pending());
    drop(remote_end);
    assert!(executor.run_until_stalled(&mut closed).is_ready());
}
#[fuchsia::test]
fn peer_collect_capabilities_success() {
let mut exec = fasync::TestExecutor::new();
let (remote, _, cobalt_receiver, peer) = setup_test_peer(true, build_test_streams(), None);
let p: ProfileDescriptor = ProfileDescriptor {
profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
major_version: Some(1),
minor_version: Some(2),
..Default::default()
};
let _ = peer.set_descriptor(p);
let collect_future = peer.collect_capabilities();
let mut collect_future = pin!(collect_future);
assert!(exec.run_until_stalled(&mut collect_future).is_pending());
let received = recv_remote(&remote).unwrap();
assert_eq!(0x00, received[0] & 0xF);
assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
let response: &[u8] = &[
txlabel_raw << 4 | 0x0 << 2 | 0x2, 0x01, 0x3E << 2 | 0x0 << 1, 0x00 << 4 | 0x1 << 3, 0x01 << 2 | 0x1 << 1, 0x00 << 4 | 0x1 << 3, ];
assert!(remote.write(response).is_ok());
assert!(exec.run_until_stalled(&mut collect_future).is_pending());
#[rustfmt::skip]
let capabilities_rsp = &[
0x01, 0x00,
0x07, 0x06, 0x00, 0x04, 0xF0, 0x9F, 0x92, 0x96
];
expect_get_capabilities_and_respond(&remote, 0x3E, capabilities_rsp);
assert!(exec.run_until_stalled(&mut collect_future).is_pending());
#[rustfmt::skip]
let capabilities_rsp = &[
0x01, 0x00,
0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
];
expect_get_capabilities_and_respond(&remote, 0x01, capabilities_rsp);
match exec.run_until_stalled(&mut collect_future) {
Poll::Pending => panic!("collect capabilities should be complete"),
Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
Poll::Ready(Ok(endpoints)) => {
let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
for stream in endpoints {
if stream.local_id() == &first_seid {
let expected_caps = vec![
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::new(0x04),
codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
},
];
assert_eq!(&expected_caps, stream.capabilities());
} else if stream.local_id() == &second_seid {
let expected_codec_type = avdtp::MediaCodecType::new(0x00);
assert_eq!(Some(&expected_codec_type), stream.codec_type());
} else {
panic!("Unexpected endpoint in the streams collected");
}
}
}
}
let mut recv = cobalt_receiver.expect("should have receiver");
let mut log_events = Vec::new();
while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
log_events.push(respond_to_metrics_req_for_test(req));
}
assert_eq!(3, log_events.len());
assert!(log_events.contains(&MetricEvent {
metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
event_codes: vec![
bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
],
payload: MetricEventPayload::Count(1),
}));
assert!(log_events.contains(&MetricEvent {
metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
event_codes: vec![
bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac as u32
],
payload: MetricEventPayload::Count(1),
}));
assert!(log_events.contains(&MetricEvent {
metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
event_codes: vec![
bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
],
payload: MetricEventPayload::Count(1),
}));
let collect_future = peer.collect_capabilities();
let mut collect_future = pin!(collect_future);
match exec.run_until_stalled(&mut collect_future) {
Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
x => panic!("Expected get remote capabilities to be done, got {:?}", x),
};
}
/// Drives `Peer::collect_capabilities` through a successful exchange when the peer descriptor
/// reports profile version 1.3.
///
/// With this descriptor the test expects the "all capabilities" command (signal id 0x0C) for
/// each discovered endpoint. It then checks the collected endpoints, the four metric events
/// that were logged, and that a second call completes immediately without further exchange.
#[fuchsia::test]
fn peer_collect_all_capabilities_success() {
    let mut exec = fasync::TestExecutor::new();
    let (remote, _, cobalt_receiver, peer) = setup_test_peer(true, build_test_streams(), None);
    let p: ProfileDescriptor = ProfileDescriptor {
        profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
        major_version: Some(1),
        minor_version: Some(3),
        ..Default::default()
    };
    let _ = peer.set_descriptor(p);
    let collect_future = peer.collect_capabilities();
    let mut collect_future = pin!(collect_future);
    assert!(exec.run_until_stalled(&mut collect_future).is_pending());

    // The first command on the wire must be a single-packet command with signal id 0x01.
    let received = recv_remote(&remote).unwrap();
    assert_eq!(0x00, received[0] & 0xF);
    assert_eq!(0x01, received[1]);
    // The label already sits in the high nibble, so it is OR-ed into the response as-is.
    // Shifting it left again would always produce label 0.
    let txlabel_raw = received[0] & 0xF0;
    #[rustfmt::skip]
    let response: &[u8] = &[
        txlabel_raw | 0x0 << 2 | 0x2, // same label, single packet, response accept
        0x01,                         // signal id
        0x3E << 2 | 0x0 << 1,         // SEID 0x3E, in-use bit clear
        0x00 << 4 | 0x1 << 3,         // media type 0, endpoint type bit set
        0x01 << 2 | 0x1 << 1,         // SEID 0x01, in-use bit set
        0x00 << 4 | 0x1 << 3,         // media type 0, endpoint type bit set
    ];
    assert!(remote.write(response).is_ok());
    assert!(exec.run_until_stalled(&mut collect_future).is_pending());

    // Capabilities of the first endpoint: media transport, a codec of type 0x40, and a
    // zero-length capability of category 0x08 (expected below as `DelayReporting`).
    #[rustfmt::skip]
    let capabilities_rsp = &[
        0x01, 0x00,
        0x07, 0x06, 0x00, 0x40, 0xF0, 0x9F, 0x92, 0x96,
        0x08, 0x00
    ];
    expect_get_all_capabilities_and_respond(&remote, 0x3E, capabilities_rsp);
    assert!(exec.run_until_stalled(&mut collect_future).is_pending());

    // Capabilities of the second endpoint: media transport plus a codec of type 0x00.
    #[rustfmt::skip]
    let capabilities_rsp = &[
        0x01, 0x00,
        0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
    ];
    expect_get_all_capabilities_and_respond(&remote, 0x01, capabilities_rsp);

    match exec.run_until_stalled(&mut collect_future) {
        Poll::Pending => panic!("collect capabilities should be complete"),
        Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
        Poll::Ready(Ok(endpoints)) => {
            let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
            let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
            for stream in endpoints {
                if stream.local_id() == &first_seid {
                    let expected_caps = vec![
                        ServiceCapability::MediaTransport,
                        ServiceCapability::MediaCodec {
                            media_type: avdtp::MediaType::Audio,
                            codec_type: avdtp::MediaCodecType::new(0x40),
                            codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
                        },
                        ServiceCapability::DelayReporting,
                    ];
                    assert_eq!(&expected_caps, stream.capabilities());
                } else if stream.local_id() == &second_seid {
                    let expected_codec_type = avdtp::MediaCodecType::new(0x00);
                    assert_eq!(Some(&expected_codec_type), stream.codec_type());
                } else {
                    panic!("Unexpected endpoint in the streams collected");
                }
            }
        }
    }

    // Drain every metric request that is already available and check what was logged.
    let mut recv = cobalt_receiver.expect("should have receiver");
    let mut log_events = Vec::new();
    while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
        log_events.push(respond_to_metrics_req_for_test(req));
    }
    assert_eq!(4, log_events.len());
    assert!(log_events.contains(&MetricEvent {
        metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
        event_codes: vec![
            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown as u32
        ],
        payload: MetricEventPayload::Count(1),
    }));
    assert!(log_events.contains(&MetricEvent {
        metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
        event_codes: vec![
            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
        ],
        payload: MetricEventPayload::Count(1),
    }));
    assert!(log_events.contains(&MetricEvent {
        metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
        event_codes: vec![
            bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
        ],
        payload: MetricEventPayload::Count(1),
    }));
    assert!(log_events.contains(&MetricEvent {
        metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
        event_codes: vec![
            bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport as u32
        ],
        payload: MetricEventPayload::Count(1),
    }));

    // A second collection must finish on the first poll: nothing is answered on the channel.
    let collect_future = peer.collect_capabilities();
    let mut collect_future = pin!(collect_future);
    match exec.run_until_stalled(&mut collect_future) {
        Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
        x => panic!("Expected get remote capabilities to be done, got {:?}", x),
    };
}
/// When the remote rejects the discovery command, `Peer::collect_capabilities` must resolve
/// with `avdtp::Error::RemoteRejected` carrying `ErrorCode::BadState`.
#[fuchsia::test]
fn peer_collect_capabilities_discovery_fails() {
    let mut exec = fasync::TestExecutor::new();
    let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
    let collect_future = peer.collect_capabilities();
    let mut collect_future = pin!(collect_future);
    assert!(exec.run_until_stalled(&mut collect_future).is_pending());

    // The first command on the wire must be a single-packet command with signal id 0x01.
    let received = recv_remote(&remote).unwrap();
    assert_eq!(0x00, received[0] & 0xF);
    assert_eq!(0x01, received[1]);
    let txlabel_raw = received[0] & 0xF0;
    #[rustfmt::skip]
    let response: &[u8] = &[
        txlabel_raw | 0x0 << 2 | 0x3, // same label, single packet, response reject
        0x01,                         // signal id
        0x31,                         // error code, asserted below to decode as BadState
    ];
    assert!(remote.write(response).is_ok());

    match exec.run_until_stalled(&mut collect_future) {
        Poll::Pending => panic!("Should be ready after discovery failure"),
        Poll::Ready(Ok(x)) => panic!("Should be an error but returned {x:?}"),
        Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
            assert_eq!(Some(Ok(avdtp::ErrorCode::BadState)), e.error_code());
        }
        Poll::Ready(Err(e)) => panic!("Should have been a RemoteRejected but was {e:?}"),
    }
}
/// When discovery succeeds but every per-endpoint capabilities command is rejected,
/// `Peer::collect_capabilities` must still succeed and return an empty collection.
#[fuchsia::test]
fn peer_collect_capabilities_get_capability_fails() {
    let mut exec = fasync::TestExecutor::new();
    let (remote, _, _, peer) = setup_test_peer(true, build_test_streams(), None);
    let collect_future = peer.collect_capabilities();
    let mut collect_future = pin!(collect_future);
    assert!(exec.run_until_stalled(&mut collect_future).is_pending());

    // The first command on the wire must be a single-packet command with signal id 0x01.
    let received = recv_remote(&remote).unwrap();
    assert_eq!(0x00, received[0] & 0xF);
    assert_eq!(0x01, received[1]);
    // The label already sits in the high nibble, so it is OR-ed into the response as-is
    // (matching the reject responses below). Shifting it again would always yield label 0.
    let txlabel_raw = received[0] & 0xF0;
    #[rustfmt::skip]
    let response: &[u8] = &[
        txlabel_raw | 0x0 << 2 | 0x2, // same label, single packet, response accept
        0x01,                         // signal id
        0x3E << 2 | 0x0 << 1,         // SEID 0x3E, in-use bit clear
        0x00 << 4 | 0x1 << 3,         // media type 0, endpoint type bit set
        0x01 << 2 | 0x1 << 1,         // SEID 0x01, in-use bit set
        0x00 << 4 | 0x1 << 3,         // media type 0, endpoint type bit set
    ];
    assert!(remote.write(response).is_ok());
    assert!(exec.run_until_stalled(&mut collect_future).is_pending());

    // Reject the capabilities command (signal id 0x02) for the first endpoint.
    let expected_seid = 0x3E;
    let received = recv_remote(&remote).unwrap();
    assert_eq!(0x00, received[0] & 0xF);
    assert_eq!(0x02, received[1]);
    assert_eq!(expected_seid << 2, received[2]);
    let txlabel_raw = received[0] & 0xF0;
    #[rustfmt::skip]
    let response: &[u8] = &[
        txlabel_raw | 0x0 << 2 | 0x3, // same label, single packet, response reject
        0x02,                         // signal id
        0x12,                         // error code
    ];
    assert!(remote.write(response).is_ok());
    assert!(exec.run_until_stalled(&mut collect_future).is_pending());

    // Reject the capabilities command for the second endpoint as well.
    let expected_seid = 0x01;
    let received = recv_remote(&remote).unwrap();
    assert_eq!(0x00, received[0] & 0xF);
    assert_eq!(0x02, received[1]);
    assert_eq!(expected_seid << 2, received[2]);
    let txlabel_raw = received[0] & 0xF0;
    #[rustfmt::skip]
    let response: &[u8] = &[
        txlabel_raw | 0x0 << 2 | 0x3, // same label, single packet, response reject
        0x02,                         // signal id
        0x12,                         // error code
    ];
    assert!(remote.write(response).is_ok());

    match exec.run_until_stalled(&mut collect_future) {
        Poll::Pending => panic!("Should be ready after discovery failure"),
        Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
        Poll::Ready(Ok(map)) => assert_eq!(0, map.len()),
    }
}
/// Reads one packet from `remote`, asserts that it is a single-packet command carrying
/// `signal_id`, and replies with a two-byte accept response that echoes the transaction label.
///
/// Panics if no packet is available, if the packet does not match, or if the reply cannot be
/// written.
fn receive_simple_accept(remote: &Channel, signal_id: u8) {
    let packet = recv_remote(remote).expect("expected a packet");
    let header = packet[0];
    assert_eq!(0x00, header & 0xF);
    assert_eq!(signal_id, packet[1]);
    // High nibble: transaction label copied from the request. Low nibble: 0x2 (accept).
    let label_bits = header & 0xF0;
    let accept: &[u8] = &[label_bits | 0x0 << 2 | 0x2, signal_id];
    assert!(remote.write(accept).is_ok());
}
/// Happy path for `Peer::stream_start`: the remote accepts signal ids 0x03, 0x06 and 0x07 in
/// turn, the transport channel is connected through the `Profile` protocol in between, and the
/// peer reports that it is streaming once the future resolves.
#[fuchsia::test]
fn peer_stream_start_success() {
    let mut exec = fasync::TestExecutor::new();
    let (remote, mut profile_request_stream, _, peer) =
        setup_test_peer(false, build_test_streams(), None);
    let remote_seid = 2_u8.try_into().unwrap();
    let sbc_config = ServiceCapability::MediaCodec {
        media_type: avdtp::MediaType::Audio,
        codec_type: avdtp::MediaCodecType::AUDIO_SBC,
        codec_extra: vec![0x11, 0x45, 51, 51],
    };
    let mut start_future = pin!(peer.stream_start(remote_seid, vec![sbc_config]));
    match exec.run_until_stalled(&mut start_future) {
        Poll::Pending => {}
        x => panic!("Expected pending, but got {x:?}"),
    };

    // First command accepted by the remote.
    receive_simple_accept(&remote, 0x03);
    assert!(exec.run_until_stalled(&mut start_future).is_pending());

    // Second command accepted by the remote; the start still has to wait for the transport.
    receive_simple_accept(&remote, 0x06);
    match exec.run_until_stalled(&mut start_future) {
        Poll::Pending => {}
        Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
        Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
    };

    // The peer asks the Profile server for an L2CAP channel; hand it one end of a new channel.
    let (_, transport) = Channel::create();
    let connect_request = exec.run_until_stalled(&mut profile_request_stream.next());
    match connect_request {
        Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
            assert_eq!(PeerId(1), peer_id.into());
            assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
            let channel = transport.try_into().unwrap();
            responder.send(Ok(channel)).expect("responder sends");
        }
        x => panic!("Should have sent a open l2cap request, but got {:?}", x),
    };
    match exec.run_until_stalled(&mut start_future) {
        Poll::Pending => {}
        Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
        Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
    };

    // Final command accepted by the remote: the start future must now resolve successfully.
    receive_simple_accept(&remote, 0x07);
    match exec.run_until_stalled(&mut start_future) {
        Poll::Pending => panic!("Should be ready after start succeeds"),
        Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
        Poll::Ready(Ok(())) => {
            assert!(peer.is_streaming_now());
        }
    }
}
/// Checks which local endpoint `Peer::stream_start` configures when the remote advertises a
/// single Source endpoint with id 4.
///
/// The remote side is driven by an `avdtp::Peer`; its handler asserts that the
/// SetConfiguration request targets remote endpoint 4 and names endpoint 2 as the initiating
/// endpoint (presumably the local Sink from `build_test_streams()` - confirm against that
/// helper).
#[fuchsia::test]
fn peer_stream_start_picks_correct_direction() {
let mut exec = fasync::TestExecutor::new();
let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
// Wrap the remote end of the signaling channel so requests arrive already decoded.
let remote = avdtp::Peer::new(remote);
let mut remote_events = remote.take_request_stream();
// Answers each request as a remote that has exactly one Source endpoint (id 4).
// Field names in the requests are from the remote's point of view: `local_stream_id` is the
// remote's own endpoint, `remote_stream_id` is the endpoint of the peer under test.
fn remote_handle_request(req: avdtp::Request) {
let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
let res = match req {
avdtp::Request::Discover { responder } => {
let infos = [avdtp::StreamInformation::new(
expected_stream_id,
false,
avdtp::MediaType::Audio,
avdtp::EndpointType::Source,
)];
responder.send(&infos)
}
avdtp::Request::GetAllCapabilities { stream_id, responder }
| avdtp::Request::GetCapabilities { stream_id, responder } => {
assert_eq!(expected_stream_id, stream_id);
let caps = vec![
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x45, 51, 250],
},
];
responder.send(&caps[..])
}
avdtp::Request::Open { responder, stream_id } => {
assert_eq!(expected_stream_id, stream_id);
responder.send()
}
avdtp::Request::SetConfiguration {
responder,
local_stream_id,
remote_stream_id,
..
} => {
assert_eq!(local_stream_id, expected_stream_id);
// This is the assertion the test exists for: endpoint 2 must have been chosen.
assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
responder.send()
}
x => panic!("Unexpected request: {:?}", x),
};
res.expect("should be able to respond");
}
// Collect the remote capabilities first: one discovery request, then one capabilities
// request, after which the collection future is ready.
let collect_capabilities_fut = peer.collect_capabilities();
let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a discovery request").unwrap());
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
// Start a stream towards remote endpoint 4 with an SBC configuration.
let remote_seid = 4_u8.try_into().unwrap();
let codec_params = ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x45, 51, 51],
};
let start_future = peer.stream_start(remote_seid, vec![codec_params]);
let mut start_future = pin!(start_future);
assert!(exec.run_until_stalled(&mut start_future).is_pending());
// The next two requests are handled by the SetConfiguration and Open arms above; the test
// ends once both have been answered, without waiting for the start to complete.
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a set_capabilities request").unwrap());
assert!(exec.run_until_stalled(&mut start_future).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have an open request").unwrap());
}
/// Checks that `DelayReporting` is not present in the configuration sent to the remote, even
/// though the caller asks for it and the remote advertises it.
///
/// NOTE(review): the test name implies the capability is dropped because the local streams
/// from `build_test_streams()` do not support it - confirm against that helper.
#[fuchsia::test]
fn peer_stream_start_strips_unsupported_local_capabilities() {
let mut exec = fasync::TestExecutor::new();
let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
// Wrap the remote end of the signaling channel so requests arrive already decoded.
let remote = avdtp::Peer::new(remote);
let mut remote_events = remote.take_request_stream();
// Answers each request as a remote with one Source endpoint (id 4) that advertises
// MediaTransport, DelayReporting and an AAC codec.
fn remote_handle_request(req: avdtp::Request) {
let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
let res = match req {
avdtp::Request::Discover { responder } => {
let infos = [avdtp::StreamInformation::new(
expected_stream_id,
false,
avdtp::MediaType::Audio,
avdtp::EndpointType::Source,
)];
responder.send(&infos)
}
avdtp::Request::GetAllCapabilities { stream_id, responder }
| avdtp::Request::GetCapabilities { stream_id, responder } => {
assert_eq!(expected_stream_id, stream_id);
let caps = vec![
ServiceCapability::MediaTransport,
ServiceCapability::DelayReporting,
ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_AAC,
codec_extra: vec![128, 0, 132, 134, 0, 0],
},
];
responder.send(&caps[..])
}
avdtp::Request::Open { responder, stream_id } => {
assert_eq!(expected_stream_id, stream_id);
responder.send()
}
avdtp::Request::SetConfiguration {
responder,
local_stream_id,
remote_stream_id,
capabilities,
} => {
assert_eq!(local_stream_id, expected_stream_id);
assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
// This is the assertion the test exists for: DelayReporting must have been removed.
assert!(!capabilities.contains(&ServiceCapability::DelayReporting));
responder.send()
}
x => panic!("Unexpected request: {:?}", x),
};
res.expect("should be able to respond");
}
// Collect the remote capabilities first: one discovery request, then one capabilities
// request, after which the collection future is ready.
let collect_capabilities_fut = peer.collect_capabilities();
let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a discovery request").unwrap());
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
// Request a stream with an SBC configuration plus DelayReporting.
let remote_seid = 4_u8.try_into().unwrap();
let codec_params = ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x45, 51, 51],
};
let start_future =
peer.stream_start(remote_seid, vec![codec_params, ServiceCapability::DelayReporting]);
let mut start_future = pin!(start_future);
assert!(exec.run_until_stalled(&mut start_future).is_pending());
// The next two requests are handled by the SetConfiguration and Open arms above; the test
// ends once both have been answered, without waiting for the start to complete.
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a set_configuration request").unwrap());
assert!(exec.run_until_stalled(&mut start_future).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have an open request").unwrap());
}
/// Checks that the capabilities sent in SetConfiguration are ordered by
/// `ServiceCapability::category`, even when the caller supplies them out of order.
///
/// Uses `build_test_streams_delayable()` for the local streams.
#[fuchsia::test]
fn peer_stream_start_orders_local_capabilities() {
let mut exec = fasync::TestExecutor::new();
let (remote, _, _, peer) = setup_test_peer(false, build_test_streams_delayable(), None);
// Wrap the remote end of the signaling channel so requests arrive already decoded.
let remote = avdtp::Peer::new(remote);
let mut remote_events = remote.take_request_stream();
// Answers each request as a remote with one Source endpoint (id 4) that advertises
// MediaTransport, an SBC codec and DelayReporting.
fn remote_handle_request(req: avdtp::Request) {
let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
let res = match req {
avdtp::Request::Discover { responder } => {
let infos = [avdtp::StreamInformation::new(
expected_stream_id,
false,
avdtp::MediaType::Audio,
avdtp::EndpointType::Source,
)];
responder.send(&infos)
}
avdtp::Request::GetAllCapabilities { stream_id, responder }
| avdtp::Request::GetCapabilities { stream_id, responder } => {
assert_eq!(expected_stream_id, stream_id);
let caps = &[
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x45, 51, 250],
},
ServiceCapability::DelayReporting,
];
responder.send(caps)
}
avdtp::Request::Open { responder, stream_id } => {
assert_eq!(expected_stream_id, stream_id);
responder.send()
}
avdtp::Request::SetConfiguration {
responder,
local_stream_id,
remote_stream_id,
capabilities,
} => {
assert_eq!(local_stream_id, expected_stream_id);
assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
// Sort a copy by category and require that the received list was already in that order.
let mut capabilities_ordered = capabilities.clone();
capabilities_ordered.sort_by_key(ServiceCapability::category);
assert_eq!(capabilities, capabilities_ordered);
responder.send()
}
x => panic!("Unexpected request: {:?}", x),
};
res.expect("should be able to respond");
}
// Collect the remote capabilities first: one discovery request, then one capabilities
// request, after which the collection future is ready.
let collect_capabilities_fut = peer.collect_capabilities();
let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a discovery request").unwrap());
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
let remote_seid = 4_u8.try_into().unwrap();
let codec_params = ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x45, 51, 51],
};
// The capabilities are deliberately passed with DelayReporting ahead of the codec.
let start_future = peer.stream_start(
remote_seid,
vec![
ServiceCapability::MediaTransport,
ServiceCapability::DelayReporting,
codec_params,
],
);
let mut start_future = pin!(start_future);
assert!(exec.run_until_stalled(&mut start_future).is_pending());
// The next two requests are handled by the SetConfiguration and Open arms above; the test
// ends once both have been answered, without waiting for the start to complete.
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a set_configuration request").unwrap());
assert!(exec.run_until_stalled(&mut start_future).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have an open request").unwrap());
}
/// Checks `Peer::stream_start` when the only streaming permit is seized while the start is in
/// flight: after the remote accepts signal id 0x07, the peer sends a further command
/// (signal id 0x09), the start future resolves with `Ok`, and the peer never reports that it
/// is streaming.
#[fuchsia::test]
fn peer_stream_start_permit_revoked() {
let mut exec = fasync::TestExecutor::new();
// A pool with a single permit, shared with the peer through a clone.
let test_permits = Permits::new(1);
let (remote, mut profile_request_stream, _, peer) =
setup_test_peer(false, build_test_streams(), Some(test_permits.clone()));
let remote_seid = 2_u8.try_into().unwrap();
let codec_params = ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x45, 51, 51],
};
let start_future = peer.stream_start(remote_seid, vec![codec_params]);
let mut start_future = pin!(start_future);
let _ = exec
.run_until_stalled(&mut start_future)
.expect_pending("waiting for set config response");
// Accept signal id 0x03, then signal id 0x06; the start stays pending after each.
receive_simple_accept(&remote, 0x03); exec.run_until_stalled(&mut start_future).expect_pending("waiting for open response");
receive_simple_accept(&remote, 0x06); exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
assert!(!peer.is_streaming_now());
// Answer the Profile connect request with one end of a freshly created channel.
let (_, transport) = Channel::create();
let request = exec.run_until_stalled(&mut profile_request_stream.next());
match request {
Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
assert_eq!(PeerId(1), peer_id.into());
assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
let channel = transport.try_into().unwrap();
responder.send(Ok(channel)).expect("responder sends");
}
x => panic!("Should have sent a open l2cap request, but got {:?}", x),
};
exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
assert!(!peer.is_streaming_now());
// Take the permits away before the remote accepts the start; exactly one was outstanding.
let seized_permits = test_permits.seize();
assert_eq!(seized_permits.len(), 1);
// Accept signal id 0x07; the start future must keep going rather than finish.
receive_simple_accept(&remote, 0x07); exec.run_until_stalled(&mut start_future)
.expect_pending("waiting to send outgoing suspend");
assert!(!peer.is_streaming_now());
// Accept signal id 0x09; the start future now completes successfully.
receive_simple_accept(&remote, 0x09); let () = exec
.run_until_stalled(&mut start_future)
.expect("start finished")
.expect("suspended stream is ok");
assert!(!peer.is_streaming_now());
}
/// Checks that `Peer::stream_start` fails immediately with `avdtp::Error::OutOfRange` when the
/// only local stream is a Source and the remote endpoint is also a Source.
#[fuchsia::test]
fn peer_stream_start_fails_wrong_direction() {
let mut exec = fasync::TestExecutor::new();
// The peer under test is given a single local Source endpoint (id 1) and nothing else.
let mut streams = Streams::default();
let source = Stream::build(
make_sbc_endpoint(1, avdtp::EndpointType::Source),
TestMediaTaskBuilder::new().builder(),
);
streams.insert(source);
let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
// Wrap the remote end of the signaling channel so requests arrive already decoded.
let remote = avdtp::Peer::new(remote);
let mut remote_events = remote.take_request_stream();
// Answers each request as a remote with one Source endpoint (id 2) offering SBC.
fn remote_handle_request(req: avdtp::Request) {
let expected_stream_id: StreamEndpointId = 2_u8.try_into().unwrap();
let res = match req {
avdtp::Request::Discover { responder } => {
let infos = [avdtp::StreamInformation::new(
expected_stream_id,
false,
avdtp::MediaType::Audio,
avdtp::EndpointType::Source,
)];
responder.send(&infos)
}
avdtp::Request::GetAllCapabilities { stream_id, responder }
| avdtp::Request::GetCapabilities { stream_id, responder } => {
assert_eq!(expected_stream_id, stream_id);
let caps = vec![
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x45, 51, 250],
},
];
responder.send(&caps[..])
}
avdtp::Request::Open { responder, .. } => responder.send(),
avdtp::Request::SetConfiguration { responder, .. } => responder.send(),
x => panic!("Unexpected request: {:?}", x),
};
res.expect("should be able to respond");
}
// Collect the remote capabilities first: one discovery request, then one capabilities
// request, after which the collection future is ready.
let collect_capabilities_fut = peer.collect_capabilities();
let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a discovery request").unwrap());
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
let request = exec.run_singlethreaded(&mut remote_events.next());
remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
let remote_seid = 2_u8.try_into().unwrap();
let codec_params = ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x45, 51, 51],
};
let start_future = peer.stream_start(remote_seid, vec![codec_params]);
let mut start_future = pin!(start_future);
// The failure must be reported on the very first poll, before any request reaches the remote.
match exec.run_until_stalled(&mut start_future) {
Poll::Ready(Err(avdtp::Error::OutOfRange)) => {}
x => panic!("Expected a ready OutOfRange error but got {:?}", x),
};
}
/// `Peer::stream_start` must resolve with an error when the `Profile` server refuses to
/// connect the transport channel after the first two commands have been accepted.
#[fuchsia::test]
fn peer_stream_start_fails_to_connect() {
    let mut exec = fasync::TestExecutor::new();
    let (remote, mut profile_request_stream, _, peer) =
        setup_test_peer(false, build_test_streams(), None);
    let remote_seid = 2_u8.try_into().unwrap();
    let sbc_config = ServiceCapability::MediaCodec {
        media_type: avdtp::MediaType::Audio,
        codec_type: avdtp::MediaCodecType::AUDIO_SBC,
        codec_extra: vec![0x11, 0x45, 51, 51],
    };
    let mut start_future = pin!(peer.stream_start(remote_seid, vec![sbc_config]));
    match exec.run_until_stalled(&mut start_future) {
        Poll::Pending => {}
        x => panic!("was expecting pending but got {x:?}"),
    };

    // First command accepted by the remote.
    receive_simple_accept(&remote, 0x03);
    assert!(exec.run_until_stalled(&mut start_future).is_pending());

    // Second command accepted by the remote; the start still has to wait for the transport.
    receive_simple_accept(&remote, 0x06);
    match exec.run_until_stalled(&mut start_future) {
        Poll::Pending => {}
        Poll::Ready(x) => panic!("Expected to be pending but {x:?}"),
    };

    // Refuse the transport connection request.
    let connect_request = exec.run_until_stalled(&mut profile_request_stream.next());
    match connect_request {
        Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, responder, .. }))) => {
            assert_eq!(PeerId(1), peer_id.into());
            responder.send(Err(ErrorCode::Failed)).expect("responder sends");
        }
        x => panic!("Should have sent a open l2cap request, but got {:?}", x),
    };

    // Any error is acceptable here; success or a still-pending future is not.
    match exec.run_until_stalled(&mut start_future) {
        Poll::Pending => panic!("Should be ready after start fails"),
        Poll::Ready(Ok(_stream)) => panic!("Shouldn't have succeeded stream here"),
        Poll::Ready(Err(_)) => {}
    }
}
/// Exercises delay reports sent by the remote at different points of stream setup and checks
/// the metrics that result.
///
/// The remote handler sends three delay reports in total: one while answering the
/// capabilities request (expected to fail), and two while handling Open (one for endpoint 4,
/// expected to fail, and one for endpoint 1, expected to succeed). The test then reads metric
/// events until all three delay reports have been observed, each with the payload
/// `0xc0de * 100000`.
#[fuchsia::test]
async fn peer_delay_report() {
let (remote, _profile_requests, cobalt_recv, peer) =
setup_test_peer(true, build_test_streams(), None);
// Wrap the remote end of the signaling channel so requests arrive already decoded and so
// that delay reports can be sent back to the peer under test.
let remote_peer = avdtp::Peer::new(remote);
let mut remote_events = remote_peer.take_request_stream();
// Answers each request as a remote with one Sink endpoint (id 4) offering SBC.
// `peer` here is the remote-side `avdtp::Peer`, used to send delay reports.
async fn remote_handle_request(req: avdtp::Request, peer: &avdtp::Peer) {
let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
// Endpoint id on the side of the peer under test, as asserted in SetConfiguration below.
let expected_peer_stream_id: StreamEndpointId = 1_u8.try_into().unwrap();
use avdtp::Request::*;
match req {
Discover { responder } => {
let infos = [avdtp::StreamInformation::new(
expected_stream_id,
false,
avdtp::MediaType::Audio,
avdtp::EndpointType::Sink,
)];
responder.send(&infos).expect("response should succeed");
}
GetAllCapabilities { stream_id, responder }
| GetCapabilities { stream_id, responder } => {
assert_eq!(expected_stream_id, stream_id);
let caps = vec![
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x45, 51, 250],
},
];
responder.send(&caps[..]).expect("response should succeed");
// A delay report at this point (before any SetConfiguration was handled) must be refused.
assert!(peer.delay_report(&expected_peer_stream_id, 0xc0de).await.is_err());
}
Open { responder, stream_id } => {
// A report naming endpoint 4 is refused; a report naming endpoint 1 is acknowledged.
assert!(peer.delay_report(&expected_stream_id, 0xc0de).await.is_err());
peer.delay_report(&expected_peer_stream_id, 0xc0de)
.await
.expect("should get acked correctly");
assert_eq!(expected_stream_id, stream_id);
responder.send().expect("response should succeed");
}
SetConfiguration { responder, local_stream_id, remote_stream_id, .. } => {
assert_eq!(local_stream_id, expected_stream_id);
assert_eq!(remote_stream_id, expected_peer_stream_id);
responder.send().expect("should send back response without issue");
}
x => panic!("Unexpected request: {:?}", x),
};
}
// Race the capability collection against incoming requests: a request must arrive first
// twice (discovery, then capabilities) before the collection is allowed to finish.
let collect_fut = pin!(peer.collect_capabilities());
let Either::Left((request, collect_fut)) =
futures::future::select(remote_events.next(), collect_fut).await
else {
panic!("Collect future shouldn't finish first");
};
let collect_fut = pin!(collect_fut);
remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
let Either::Left((request, collect_fut)) =
futures::future::select(remote_events.next(), collect_fut).await
else {
panic!("Collect future shouldn't finish first");
};
remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
assert_eq!(1, collect_fut.await.expect("should get the remote endpoints back").len());
let remote_seid = 4_u8.try_into().unwrap();
let codec_params = ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x45, 51, 51],
};
// Run the stream start in the background. It is not expected to finish during the test, so
// reaching the end of the task is treated as a failure.
let _start_task = fasync::Task::spawn(async move {
let _ = peer.stream_start(remote_seid, vec![codec_params]).await;
panic!("stream start task finished");
});
let request = remote_events.next().await.expect("should have set_config").unwrap();
remote_handle_request(request, &remote_peer).await;
let request = remote_events.next().await.expect("should have open").unwrap();
remote_handle_request(request, &remote_peer).await;
let mut cobalt = cobalt_recv.expect("should have receiver");
// Count metric events per metric id. Keep reading until three distinct ids have been seen
// and the delay-report metric has been seen at least three times.
let mut got_ids = HashMap::new();
let delay_metric_id = bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID;
while got_ids.len() < 3 || *got_ids.get(&delay_metric_id).unwrap_or(&0) < 3 {
let report = respond_to_metrics_req_for_test(cobalt.next().await.unwrap().unwrap());
let _ = got_ids.entry(report.metric_id).and_modify(|x| *x += 1).or_insert(1);
// Every delay report carries the same value, scaled by 100000 (the metric name says
// nanoseconds; presumably the reported unit is 1/10 ms - confirm against the logger).
if report.metric_id == delay_metric_id {
assert_eq!(MetricEventPayload::IntegerValue(0xc0de * 100000), report.payload);
}
}
assert!(got_ids.contains_key(&bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID));
assert!(got_ids.contains_key(&bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID));
assert!(got_ids.contains_key(&delay_metric_id));
// Exactly three delay reports were logged, i.e. the refused ones are counted as well.
assert_eq!(got_ids.get(&delay_metric_id).cloned(), Some(3));
}
/// Capability list the tests use when configuring an SBC stream: a media
/// transport capability followed by the capability converted from a fixed
/// `SbcCodecInfo`.
///
/// Panics if `SbcCodecInfo::new` rejects the hard-coded parameters.
fn sbc_capabilities() -> Vec<ServiceCapability> {
    // The two trailing 53s are presumably the bitpool bounds - confirm
    // against `SbcCodecInfo::new`.
    let codec_capability: ServiceCapability = SbcCodecInfo::new(
        SbcSamplingFrequency::FREQ48000HZ,
        SbcChannelMode::JOINT_STEREO,
        SbcBlockCount::SIXTEEN,
        SbcSubBands::EIGHT,
        SbcAllocation::LOUDNESS,
        53,
        53,
    )
    .expect("sbc codec info")
    .into();
    vec![avdtp::ServiceCapability::MediaTransport, codec_capability]
}
/// Exercises the local peer as the acceptor of a remote-initiated stream.
///
/// The remote end drives discovery, capability queries, SetConfiguration,
/// Open, Start and Suspend against a single local SBC source endpoint. Every
/// step is expected to complete within one `run_until_stalled`, and the local
/// media task must be started after Start and stopped after Suspend.
#[fuchsia::test]
fn peer_as_acceptor() {
    let mut exec = fasync::TestExecutor::new();

    // A single local SBC source endpoint with id 1.
    let mut streams = Streams::default();
    let mut test_builder = TestMediaTaskBuilder::new();
    streams.insert(Stream::build(
        make_sbc_endpoint(1, avdtp::EndpointType::Source),
        test_builder.builder(),
    ));

    let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
    let remote_peer = avdtp::Peer::new(remote);

    // Discovery must report exactly the endpoint registered above.
    let discover_fut = remote_peer.discover();
    let mut discover_fut = pin!(discover_fut);
    let expected = vec![make_sbc_endpoint(1, avdtp::EndpointType::Source).information()];
    match exec.run_until_stalled(&mut discover_fut) {
        Poll::Ready(Ok(res)) => assert_eq!(res, expected),
        x => panic!("Expected discovery to complete and got {:?}", x),
    };

    let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
    let unknown_endpoint_id = 2_u8.try_into().expect("should be able to get unknown endpointid");

    // GetCapabilities on the known endpoint returns two capabilities.
    let get_caps_fut = remote_peer.get_capabilities(&sbc_endpoint_id);
    let mut get_caps_fut = pin!(get_caps_fut);
    match exec.run_until_stalled(&mut get_caps_fut) {
        Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
        x => panic!("Get capabilities should be ready but got {:?}", x),
    };

    // GetCapabilities on an endpoint that was never registered is rejected
    // with BadAcpSeid.
    let get_caps_fut = remote_peer.get_capabilities(&unknown_endpoint_id);
    let mut get_caps_fut = pin!(get_caps_fut);
    match exec.run_until_stalled(&mut get_caps_fut) {
        Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
            assert_eq!(Some(Ok(avdtp::ErrorCode::BadAcpSeid)), e.error_code())
        }
        x => panic!("Get capabilities should be a ready error but got {:?}", x),
    };

    // GetAllCapabilities on the known endpoint also returns two capabilities.
    let get_caps_fut = remote_peer.get_all_capabilities(&sbc_endpoint_id);
    let mut get_caps_fut = pin!(get_caps_fut);
    match exec.run_until_stalled(&mut get_caps_fut) {
        Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
        x => panic!("Get all capabilities should be ready but got {:?}", x),
    };

    // Configure and open the stream from the remote side.
    let sbc_caps = sbc_capabilities();
    let set_config_fut =
        remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
    let mut set_config_fut = pin!(set_config_fut);
    match exec.run_until_stalled(&mut set_config_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Set capabilities should be ready but got {:?}", x),
    };

    let open_fut = remote_peer.open(&sbc_endpoint_id);
    let mut open_fut = pin!(open_fut);
    match exec.run_until_stalled(&mut open_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Open should be ready but got {:?}", x),
    };

    // Hand the peer a channel after Open; `_remote_transport` keeps the other
    // end alive for the rest of the test.
    let (_remote_transport, transport) = Channel::create();
    assert_eq!(Some(()), peer.receive_channel(transport).ok());

    // Starting the stream must start the local media task.
    let stream_ids = vec![sbc_endpoint_id.clone()];
    let start_fut = remote_peer.start(&stream_ids);
    let mut start_fut = pin!(start_fut);
    match exec.run_until_stalled(&mut start_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Start should be ready but got {:?}", x),
    };

    let media_task = test_builder.expect_task();
    assert!(media_task.is_started());

    // Suspending the stream must stop the local media task.
    let suspend_fut = remote_peer.suspend(&stream_ids);
    let mut suspend_fut = pin!(suspend_fut);
    match exec.run_until_stalled(&mut suspend_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Suspend should be ready but got {:?}", x),
    };

    assert!(!media_task.is_started());
}
#[fuchsia::test]
fn peer_set_config_reject_first() {
let mut exec = fasync::TestExecutor::new();
let mut streams = Streams::default();
let test_builder = TestMediaTaskBuilder::new();
streams.insert(Stream::build(
make_sbc_endpoint(1, avdtp::EndpointType::Source),
test_builder.builder(),
));
let (remote, _requests, _, _peer) = setup_test_peer(false, streams, None);
let remote_peer = avdtp::Peer::new(remote);
let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
let wrong_freq_sbc = &[SbcCodecInfo::new(
SbcSamplingFrequency::FREQ44100HZ, SbcChannelMode::JOINT_STEREO,
SbcBlockCount::SIXTEEN,
SbcSubBands::EIGHT,
SbcAllocation::LOUDNESS,
53,
53,
)
.expect("sbc codec info")
.into()];
let set_config_fut =
remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, wrong_freq_sbc);
let mut set_config_fut = pin!(set_config_fut);
match exec.run_until_stalled(&mut set_config_fut) {
Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
assert!(e.service_category().is_some())
}
x => panic!("Set capabilities should have been rejected but got {:?}", x),
};
let sbc_caps = sbc_capabilities();
let set_config_fut =
remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
let mut set_config_fut = pin!(set_config_fut);
match exec.run_until_stalled(&mut set_config_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Set capabilities should be ready but got {:?}", x),
};
}
/// When the remote end configures and opens a stream but never starts it, the
/// local peer is expected to send a Start request itself once fake time has
/// advanced by three seconds.
#[fuchsia::test]
fn peer_starts_waiting_streams() {
    let mut exec = fasync::TestExecutor::new_with_fake_time();
    exec.set_fake_time(fasync::MonotonicInstant::from_nanos(5_000_000_000));

    // One local SBC source endpoint, id 1.
    let mut streams = Streams::default();
    let mut test_builder = TestMediaTaskBuilder::new();
    streams.insert(Stream::build(
        make_sbc_endpoint(1, avdtp::EndpointType::Source),
        test_builder.builder(),
    ));

    let (remote, _profile_requests, _, peer) = setup_test_peer(false, streams, None);
    let remote_peer = avdtp::Peer::new(remote);
    let endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");

    // The remote end configures and opens the stream.
    let caps = sbc_capabilities();
    let mut configure_fut = pin!(remote_peer.set_configuration(&endpoint_id, &endpoint_id, &caps));
    match exec.run_until_stalled(&mut configure_fut) {
        Poll::Ready(Ok(())) => {}
        other => panic!("Set capabilities should be ready but got {:?}", other),
    };

    let mut open_fut = pin!(remote_peer.open(&endpoint_id));
    match exec.run_until_stalled(&mut open_fut) {
        Poll::Ready(Ok(())) => {}
        other => panic!("Open should be ready but got {:?}", other),
    };

    // Hand the peer a channel; the other end is kept alive until the test ends.
    let (_remote_transport, transport) = Channel::create();
    assert_eq!(Some(()), peer.receive_channel(transport).ok());

    // Nothing is sent to the remote end before time advances.
    let mut remote_requests = remote_peer.take_request_stream();
    let mut pending_request = pin!(remote_requests.next());
    assert!(exec.run_until_stalled(&mut pending_request).is_pending());

    // Advance fake time by three seconds and fire whatever timers expired.
    exec.set_fake_time(zx::MonotonicDuration::from_seconds(3).after_now());
    let _ = exec.wake_expired_timers();

    // The local peer now asks the remote end to start the stream.
    let started_ids = match exec.run_until_stalled(&mut pending_request) {
        Poll::Ready(Some(Ok(avdtp::Request::Start { responder, stream_ids }))) => {
            responder.send().unwrap();
            stream_ids
        }
        other => panic!("Expected to receive a start request for the stream, got {:?}", other),
    };

    // Accepting the Start results in a started local media task.
    let media_task =
        exec.run_until_stalled(&mut test_builder.next_task()).expect("ready").unwrap();
    assert!(media_task.is_started());

    // A remote Suspend for the same ids stops the media task again.
    let mut suspend_fut = pin!(remote_peer.suspend(&started_ids));
    match exec.run_until_stalled(&mut suspend_fut) {
        Poll::Ready(Ok(())) => {}
        other => panic!("Suspend should be ready but got {:?}", other),
    };
    assert!(!media_task.is_started());
}
#[fuchsia::test]
fn needs_permit_to_start_streams() {
let mut exec = fasync::TestExecutor::new();
let mut streams = Streams::default();
let mut test_builder = TestMediaTaskBuilder::new();
streams.insert(Stream::build(
make_sbc_endpoint(1, avdtp::EndpointType::Sink),
test_builder.builder(),
));
streams.insert(Stream::build(
make_sbc_endpoint(2, avdtp::EndpointType::Sink),
test_builder.builder(),
));
let mut next_task_fut = test_builder.next_task();
let permits = Permits::new(1);
let taken_permit = permits.get().expect("permit taken");
let (remote, _profile_request_stream, _, peer) =
setup_test_peer(false, streams, Some(permits.clone()));
let remote_peer = avdtp::Peer::new(remote);
let sbc_endpoint_id = 1_u8.try_into().unwrap();
let sbc_caps = sbc_capabilities();
let mut set_config_fut =
remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
match exec.run_until_stalled(&mut set_config_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Set capabilities should be ready but got {:?}", x),
};
let mut open_fut = remote_peer.open(&sbc_endpoint_id);
match exec.run_until_stalled(&mut open_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Open should be ready but got {:?}", x),
};
let (_remote_transport, transport) = Channel::create();
assert_eq!(Some(()), peer.receive_channel(transport).ok());
let sbc_endpoint_two = 2_u8.try_into().unwrap();
let mut set_config_fut =
remote_peer.set_configuration(&sbc_endpoint_two, &sbc_endpoint_two, &sbc_caps);
match exec.run_until_stalled(&mut set_config_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Set capabilities should be ready but got {:?}", x),
};
let mut open_fut = remote_peer.open(&sbc_endpoint_two);
match exec.run_until_stalled(&mut open_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Open should be ready but got {:?}", x),
};
let (_remote_transport_two, transport_two) = Channel::create();
assert_eq!(Some(()), peer.receive_channel(transport_two).ok());
let unknown_endpoint_id: StreamEndpointId = 9_u8.try_into().unwrap();
let stream_ids = [sbc_endpoint_id.clone(), unknown_endpoint_id.clone()];
let mut start_fut = remote_peer.start(&stream_ids);
match exec.run_until_stalled(&mut start_fut) {
Poll::Ready(Err(avdtp::Error::RemoteRejected(rejection))) => {
assert_eq!(avdtp::ErrorCode::BadAcpSeid, rejection.error_code().unwrap().unwrap());
assert_eq!(unknown_endpoint_id, rejection.stream_id().unwrap());
}
x => panic!("Start should be ready but got {:?}", x),
};
let mut remote_requests = remote_peer.take_request_stream();
let suspended_stream_ids = match exec.run_singlethreaded(&mut remote_requests.next()) {
Some(Ok(avdtp::Request::Suspend { responder, stream_ids })) => {
responder.send().unwrap();
stream_ids
}
x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
};
assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
assert_eq!(1, suspended_stream_ids.len());
match exec.run_until_stalled(&mut next_task_fut) {
Poll::Pending => {}
x => panic!("Local task should not have been created at this point: {:?}", x),
};
let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
match exec.run_until_stalled(&mut start_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Start should be ready but got {:?}", x),
}
let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
responder.send().unwrap();
stream_ids
}
x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
};
assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
drop(taken_permit);
match exec.run_singlethreaded(&mut remote_requests.next()) {
Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
assert_eq!(stream_ids, &[sbc_endpoint_id.clone()]);
responder.send().unwrap();
}
x => panic!("Expected start on permit available but got {x:?}"),
};
let media_task = match exec.run_until_stalled(&mut next_task_fut) {
Poll::Ready(Some(task)) => task,
x => panic!("Local task should be created at this point: {:?}", x),
};
assert!(media_task.is_started());
let mut start_fut = remote_peer.start(&[sbc_endpoint_two.clone()]);
match exec.run_until_stalled(&mut start_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Start should be ready but got {:?}", x),
}
let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
responder.send().unwrap();
stream_ids
}
x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
};
assert!(suspended_stream_ids.contains(&sbc_endpoint_two));
assert_eq!(1, suspended_stream_ids.len());
let mut suspend_fut = remote_peer.suspend(&[sbc_endpoint_id.clone()]);
match exec.run_until_stalled(&mut suspend_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Start should be ready but got {:?}", x),
}
match exec.run_singlethreaded(&mut remote_requests.next()) {
Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
assert_eq!(stream_ids, &[sbc_endpoint_two]);
responder.send().unwrap();
}
x => panic!("Expected start on permit available but got {x:?}"),
};
}
/// Drives `remote_peer` through SetConfiguration, Open and Start for one SBC
/// stream on `peer`, and returns the media task that was started for it.
///
/// `local_id` and `remote_id` are passed to `set_configuration` in that
/// order; `local_id` alone is used for Open and Start. A fresh channel is
/// handed to `peer` after Open.
///
/// Panics if any step is not immediately ready with `Ok(())`, if the channel
/// is not accepted, or if the resulting media task is not started.
fn start_sbc_stream(
    exec: &mut fasync::TestExecutor,
    media_test_builder: &mut TestMediaTaskBuilder,
    peer: &Peer,
    remote_peer: &avdtp::Peer,
    local_id: &StreamEndpointId,
    remote_id: &StreamEndpointId,
) -> TestMediaTask {
    let sbc_caps = sbc_capabilities();
    // The ids are already references, so they are passed through as-is.
    let set_config_fut = remote_peer.set_configuration(local_id, remote_id, &sbc_caps);
    let mut set_config_fut = pin!(set_config_fut);
    match exec.run_until_stalled(&mut set_config_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Set capabilities should be ready but got {:?}", x),
    };

    let open_fut = remote_peer.open(local_id);
    let mut open_fut = pin!(open_fut);
    match exec.run_until_stalled(&mut open_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Open should be ready but got {:?}", x),
    };

    // Note: the remote end of this channel is dropped when this helper returns.
    let (_remote_transport, transport) = Channel::create();
    assert_eq!(Some(()), peer.receive_channel(transport).ok());

    let stream_ids = [local_id.clone()];
    let start_fut = remote_peer.start(&stream_ids);
    let mut start_fut = pin!(start_fut);
    match exec.run_until_stalled(&mut start_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Start should be ready but got {:?}", x),
    };

    let media_task = media_test_builder.expect_task();
    assert!(media_task.is_started());

    // Poll a never-ready future so anything else queued on the executor gets
    // a chance to run before returning.
    let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
    media_task
}
/// Seizing every permit while two streams are running must suspend both of
/// them; releasing the seized permits must start both of them again.
#[fuchsia::test]
fn permits_can_be_revoked_and_reinstated_all() {
    let mut exec = fasync::TestExecutor::new();
    let mut streams = Streams::default();
    let mut test_builder = TestMediaTaskBuilder::new();

    // Two local SBC sink endpoints (ids 1 and 2), paired with remote ids 7 and 6.
    for seid in [1, 2] {
        streams.insert(Stream::build(
            make_sbc_endpoint(seid, avdtp::EndpointType::Sink),
            test_builder.builder(),
        ));
    }
    let first_local_id = 1_u8.try_into().unwrap();
    let first_remote_id = 7_u8.try_into().unwrap();
    let second_local_id = 2_u8.try_into().unwrap();
    let second_remote_id = 6_u8.try_into().unwrap();

    // Two permits: enough for both streams to run at once.
    let permits = Permits::new(2);
    let (remote, _profile_requests, _, peer) =
        setup_test_peer(false, streams, Some(permits.clone()));
    let remote_peer = avdtp::Peer::new(remote);

    let first_task = start_sbc_stream(
        &mut exec,
        &mut test_builder,
        &peer,
        &remote_peer,
        &first_local_id,
        &first_remote_id,
    );
    let second_task = start_sbc_stream(
        &mut exec,
        &mut test_builder,
        &peer,
        &remote_peer,
        &second_local_id,
        &second_remote_id,
    );

    // Take every permit away from the running streams.
    let seized = permits.seize();

    let remote_endpoints = HashSet::from([&first_remote_id, &second_remote_id]);
    let mut remote_requests = remote_peer.take_request_stream();

    // Each remote endpoint must be named in exactly one Suspend request.
    let mut awaiting_suspend = remote_endpoints.clone();
    while !awaiting_suspend.is_empty() {
        match exec.run_until_stalled(&mut remote_requests.next()) {
            Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
                for suspended in &stream_ids {
                    assert!(awaiting_suspend.remove(suspended));
                }
                responder.send().expect("send response okay");
            }
            other => panic!("Expected suspension and got {:?}", other),
        }
    }

    assert!(!first_task.is_started());
    assert!(!second_task.is_started());

    // Give the permits back: both streams must be started again.
    drop(seized);

    let mut awaiting_start = remote_endpoints.clone();
    while !awaiting_start.is_empty() {
        match exec.run_singlethreaded(&mut remote_requests.next()) {
            Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
                for started in &stream_ids {
                    assert!(awaiting_start.remove(started));
                }
                responder.send().expect("send response okay");
            }
            other => panic!("Expected start and got {:?}", other),
        }
    }

    // Two fresh media tasks were built, and both are running.
    let restarted_first = test_builder.expect_task();
    assert!(restarted_first.is_started());
    let restarted_second = match exec.run_until_stalled(&mut test_builder.next_task()) {
        Poll::Ready(Some(task)) => task,
        x => panic!("Expected another ready task but {x:?}"),
    };
    assert!(restarted_second.is_started());
}
/// Taking a single permit while two streams are running must suspend exactly
/// one of them; releasing it must restart that same stream.
#[fuchsia::test]
fn permits_can_be_revoked_one_at_a_time() {
    let mut exec = fasync::TestExecutor::new();
    let mut streams = Streams::default();
    let mut test_builder = TestMediaTaskBuilder::new();

    // Two local SBC sink endpoints (ids 1 and 2), paired with remote ids 7 and 6.
    for seid in [1, 2] {
        streams.insert(Stream::build(
            make_sbc_endpoint(seid, avdtp::EndpointType::Sink),
            test_builder.builder(),
        ));
    }
    let first_local_id = 1_u8.try_into().unwrap();
    let first_remote_id = 7_u8.try_into().unwrap();
    let second_local_id = 2_u8.try_into().unwrap();
    let second_remote_id = 6_u8.try_into().unwrap();

    let permits = Permits::new(2);
    let (remote, _profile_requests, _, peer) =
        setup_test_peer(false, streams, Some(permits.clone()));
    let remote_peer = avdtp::Peer::new(remote);

    let first_task = start_sbc_stream(
        &mut exec,
        &mut test_builder,
        &peer,
        &remote_peer,
        &first_local_id,
        &first_remote_id,
    );
    let second_task = start_sbc_stream(
        &mut exec,
        &mut test_builder,
        &peer,
        &remote_peer,
        &second_local_id,
        &second_remote_id,
    );

    // Take one permit away from the running streams.
    let revoked = permits.take();

    let remote_endpoints = HashSet::from([&first_remote_id, &second_remote_id]);
    let mut remote_requests = remote_peer.take_request_stream();

    // Exactly one of the two streams is suspended; which one is not fixed.
    let suspended_id = match exec.run_until_stalled(&mut remote_requests.next()) {
        Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
            assert!(stream_ids.len() == 1);
            assert!(remote_endpoints.contains(&stream_ids[0]));
            responder.send().expect("send response okay");
            stream_ids[0].clone()
        }
        other => panic!("Expected suspension and got {:?}", other),
    };

    // Only the media task of the suspended stream may have stopped.
    if suspended_id != first_remote_id {
        assert!(first_task.is_started());
        assert!(!second_task.is_started());
    } else {
        assert!(!first_task.is_started());
        assert!(second_task.is_started());
    }

    // Returning the permit restarts the stream that was suspended.
    drop(revoked);

    match exec.run_singlethreaded(&mut remote_requests.next()) {
        Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
            assert_eq!(stream_ids, &[suspended_id]);
            responder.send().expect("send response okay");
        }
        other => panic!("Expected start and got {:?}", other),
    }

    let restarted_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
        Poll::Ready(Some(task)) => task,
        x => panic!("Expected media task to start: {x:?}"),
    };
    assert!(restarted_task.is_started());
}
/// Covers a remote Start arriving while a locally initiated Start is still
/// unanswered: the remote Start is accepted, a Suspend request follows, and
/// once both outstanding requests are answered (Start first, then Suspend)
/// the local media task must exist and be started.
#[fuchsia::test]
fn permit_suspend_start_while_suspending() {
    let mut exec = fasync::TestExecutor::new();
    let mut streams = Streams::default();
    let mut test_builder = TestMediaTaskBuilder::new();

    // Two local SBC sink endpoints, ids 1 and 2; only id 1 is used below.
    for seid in [1, 2] {
        streams.insert(Stream::build(
            make_sbc_endpoint(seid, avdtp::EndpointType::Sink),
            test_builder.builder(),
        ));
    }
    let mut next_task_fut = test_builder.next_task();

    // A single permit shared with the peer.
    let permits = Permits::new(1);
    let (remote, _profile_request_stream, _, peer) =
        setup_test_peer(false, streams, Some(permits.clone()));
    let remote_peer = avdtp::Peer::new(remote);
    let mut remote_requests = remote_peer.take_request_stream();

    // Configure and open stream 1 from the remote side, then provide its channel.
    let endpoint_id = 1_u8.try_into().unwrap();
    let caps = sbc_capabilities();
    let mut configure_fut = remote_peer.set_configuration(&endpoint_id, &endpoint_id, &caps);
    match exec.run_until_stalled(&mut configure_fut) {
        Poll::Ready(Ok(())) => {}
        other => panic!("Set capabilities should be ready but got {:?}", other),
    };
    let mut open_fut = remote_peer.open(&endpoint_id);
    match exec.run_until_stalled(&mut open_fut) {
        Poll::Ready(Ok(())) => {}
        other => panic!("Open should be ready but got {:?}", other),
    };
    let (_remote_transport, transport) = Channel::create();
    assert_eq!(Some(()), peer.receive_channel(transport).ok());

    // Let everything queued on the executor run, then fire the next timer,
    // which must exist.
    let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
    let _deadline = exec.wake_next_timer().expect("Expected a timer to be waiting to run");

    // The local peer asks to start the stream; the answer is held back for now.
    let pending_start = match exec.run_singlethreaded(&mut remote_requests.next()) {
        Some(Ok(avdtp::Request::Start { stream_ids, responder })) => {
            assert_eq!(stream_ids, vec![endpoint_id.clone()]);
            responder
        }
        x => panic!("Expected a Start request, got {x:?}"),
    };

    // The only permit is no longer available.
    assert!(permits.get().is_none());

    // Meanwhile the remote end starts the same stream, which is accepted.
    let mut remote_start_fut = remote_peer.start(&[endpoint_id.clone()]);
    match exec.run_singlethreaded(&mut remote_start_fut) {
        Ok(()) => {}
        x => panic!("Expected OK response from start future but got {x:?}"),
    }

    // A Suspend request for the stream follows; its answer is held back too.
    let pending_suspend = match exec.run_singlethreaded(&mut remote_requests.next()) {
        Some(Ok(avdtp::Request::Suspend { stream_ids, responder })) => {
            assert_eq!(stream_ids, vec![endpoint_id.clone()]);
            responder
        }
        x => panic!("Expected a suspend got {x:?}"),
    };

    // Answer the Start, let the peer process it, then answer the Suspend.
    pending_start.send().unwrap();
    let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
    pending_suspend.send().unwrap();

    // The local media task must exist and be running.
    let media_task = match exec.run_until_stalled(&mut next_task_fut) {
        Poll::Ready(Some(task)) => task,
        other => panic!("Local task should be created at this point: {:?}", other),
    };
    assert!(media_task.is_started());
}
#[fuchsia::test]
fn version_check() {
let p1: ProfileDescriptor = ProfileDescriptor {
profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
major_version: Some(1),
minor_version: Some(3),
..Default::default()
};
assert_eq!(true, a2dp_version_check(p1));
let p1: ProfileDescriptor = ProfileDescriptor {
profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
major_version: Some(2),
minor_version: Some(10),
..Default::default()
};
assert_eq!(true, a2dp_version_check(p1));
let p1: ProfileDescriptor = ProfileDescriptor {
profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
major_version: Some(1),
minor_version: Some(0),
..Default::default()
};
assert_eq!(false, a2dp_version_check(p1));
let p1: ProfileDescriptor = ProfileDescriptor {
profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
major_version: None,
minor_version: Some(9),
..Default::default()
};
assert_eq!(false, a2dp_version_check(p1));
let p1: ProfileDescriptor = ProfileDescriptor {
profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
major_version: Some(2),
minor_version: Some(2),
..Default::default()
};
assert_eq!(true, a2dp_version_check(p1));
}
}