use anyhow::{format_err, Error};
use fidl_fuchsia_bluetooth::ChannelParameters;
use fidl_fuchsia_bluetooth_bredr::{self as bredr, ProfileDescriptor, ProfileProxy};
use fuchsia_bluetooth::detachable_map::{DetachableMap, DetachableWeak};
use fuchsia_bluetooth::inspect::DebugExt;
use fuchsia_bluetooth::types::{Channel, PeerId};
use fuchsia_inspect::{self as inspect, NumericProperty, Property};
use fuchsia_inspect_derive::{AttachError, Inspect};
use fuchsia_sync::Mutex;
use futures::channel::{mpsc, oneshot};
use futures::stream::{Stream, StreamExt};
use futures::task::{Context, Poll};
use futures::{Future, FutureExt, TryFutureExt};
use log::{info, warn};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use {bt_avdtp as avdtp, fuchsia_async as fasync};
use crate::codec::CodecNegotiation;
use crate::peer::Peer;
use crate::permits::Permits;
use crate::stream::StreamsBuilder;
struct PeerStats {
id: PeerId,
inspect_node: inspect::Node,
connection_count: inspect::UintProperty,
}
impl PeerStats {
fn new(id: PeerId) -> Self {
Self { id, inspect_node: Default::default(), connection_count: Default::default() }
}
fn set_descriptor(&mut self, descriptor: &ProfileDescriptor) {
self.inspect_node.record_string("descriptor", descriptor.debug());
}
fn record_connected(&mut self) {
let _ = self.connection_count.add(1);
}
}
impl Inspect for &mut PeerStats {
fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
self.inspect_node = parent.create_child(name.as_ref());
self.inspect_node.record_string("id", self.id.to_string());
self.connection_count = self.inspect_node.create_uint("connection_count", 0);
Ok(())
}
}
#[derive(Default)]
struct DiscoveredPeers {
descriptors: HashMap<PeerId, (ProfileDescriptor, HashSet<avdtp::EndpointType>)>,
stats: HashMap<PeerId, PeerStats>,
inspect_node: inspect::Node,
}
impl DiscoveredPeers {
fn insert(
&mut self,
id: PeerId,
descriptor: ProfileDescriptor,
directions: HashSet<avdtp::EndpointType>,
) {
self.stats
.entry(id)
.or_insert_with(|| {
let mut new_stats = PeerStats::new(id);
let _ = new_stats.iattach(&self.inspect_node, inspect::unique_name("peer_"));
new_stats
})
.set_descriptor(&descriptor);
match self.descriptors.entry(id) {
Entry::Occupied(mut entry) => {
entry.get_mut().0 = descriptor;
entry.get_mut().1.extend(&directions);
}
Entry::Vacant(entry) => {
let _ = entry.insert((descriptor, directions));
}
};
}
fn connected(&mut self, id: PeerId) {
if let Some(stats) = self.stats.get_mut(&id) {
stats.record_connected();
}
}
fn get(&self, id: &PeerId) -> Option<(ProfileDescriptor, Option<avdtp::EndpointType>)> {
self.descriptors.get(id).map(|(desc, dirs)| (desc.clone(), find_preferred_direction(dirs)))
}
}
impl Inspect for &mut DiscoveredPeers {
fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
self.inspect_node = parent.create_child(name.as_ref());
Ok(())
}
}
fn find_preferred_direction(
directions: &HashSet<avdtp::EndpointType>,
) -> Option<avdtp::EndpointType> {
if directions.len() == 1 {
directions.iter().next().cloned()
} else {
None
}
}
async fn connect_peer(
proxy: ProfileProxy,
id: PeerId,
channel_params: ChannelParameters,
) -> Result<Channel, Error> {
info!(id:%; "Connecting to peer");
let connect_fut = proxy.connect(
&id.into(),
&bredr::ConnectParameters::L2cap(bredr::L2capParameters {
psm: Some(bredr::PSM_AVDTP),
parameters: Some(channel_params),
..Default::default()
}),
);
let channel = match connect_fut.await {
Err(e) => {
warn!(id:%, e:?; "FIDL error on connect");
return Err(e.into());
}
Ok(Err(e)) => return Err(format_err!("Bluetooth connect error: {e:?}")),
Ok(Ok(channel)) => channel,
};
let channel = channel
.try_into()
.map_err(|e| format_err!("Couldn't convert FIDL to BT channel: {e:?}"))?;
Ok(channel)
}
pub struct ConnectedPeers {
connected: DetachableMap<PeerId, Peer>,
connection_attempts: Mutex<HashMap<PeerId, fasync::Task<()>>>,
discovered: Mutex<DiscoveredPeers>,
streams_builder: StreamsBuilder,
permits: Permits,
profile: ProfileProxy,
metrics: bt_metrics::MetricsLogger,
inspect: inspect::Node,
inspect_peer_direction: inspect::StringProperty,
connected_peer_senders: Mutex<Vec<mpsc::Sender<DetachableWeak<PeerId, Peer>>>>,
start_stream_tasks: Mutex<HashMap<PeerId, fasync::Task<()>>>,
preferred_peer_direction: Mutex<avdtp::EndpointType>,
}
impl ConnectedPeers {
pub fn new(
streams_builder: StreamsBuilder,
permits: Permits,
profile: ProfileProxy,
metrics: bt_metrics::MetricsLogger,
) -> Self {
Self {
connected: DetachableMap::new(),
connection_attempts: Mutex::new(HashMap::new()),
discovered: Default::default(),
streams_builder,
profile,
permits,
inspect: inspect::Node::default(),
inspect_peer_direction: inspect::StringProperty::default(),
metrics,
connected_peer_senders: Default::default(),
start_stream_tasks: Default::default(),
preferred_peer_direction: Mutex::new(avdtp::EndpointType::Sink),
}
}
pub(crate) fn get_weak(&self, id: &PeerId) -> Option<DetachableWeak<PeerId, Peer>> {
self.connected.get(id)
}
pub(crate) fn get(&self, id: &PeerId) -> Option<Arc<Peer>> {
self.get_weak(id).and_then(|p| p.upgrade())
}
pub fn is_connected(&self, id: &PeerId) -> bool {
self.connected.contains_key(id)
}
async fn start_streaming(
peer: &DetachableWeak<PeerId, Peer>,
negotiation: CodecNegotiation,
) -> Result<(), anyhow::Error> {
let remote_streams = {
let strong = peer.upgrade().ok_or_else(|| format_err!("Disconnected"))?;
if strong.streaming_active() {
return Ok(());
}
strong.collect_capabilities()
}
.await?;
let (negotiated, remote_seid) = negotiation
.select(&remote_streams)
.ok_or_else(|| format_err!("No compatible stream found"))?;
let strong = peer.upgrade().ok_or_else(|| format_err!("Disconnected"))?;
if strong.streaming_active() {
let peer_id = peer.key();
info!(peer_id:%; "Not starting streaming, it's already started");
return Ok(());
}
strong.stream_start(remote_seid, negotiated).await.map_err(Into::into)
}
pub fn found(
&self,
id: PeerId,
desc: ProfileDescriptor,
preferred_directions: HashSet<avdtp::EndpointType>,
) {
self.discovered.lock().insert(id, desc.clone(), preferred_directions);
if let Some(peer) = self.get(&id) {
let _ = peer.set_descriptor(desc);
}
}
pub fn set_preferred_peer_direction(&self, direction: avdtp::EndpointType) {
*self.preferred_peer_direction.lock() = direction;
self.inspect_peer_direction.set(&format!("{direction:?}"));
}
pub fn preferred_peer_direction(&self) -> avdtp::EndpointType {
*self.preferred_peer_direction.lock()
}
pub fn try_connect(
&self,
id: PeerId,
channel_params: ChannelParameters,
) -> impl Future<Output = Result<Option<Channel>, Error>> {
let proxy = self.profile.clone();
let connected = self.is_connected(&id);
let (sender, recv) = oneshot::channel();
let recv =
recv.map_ok_or_else(|_e| Err(format_err!("Connection task canceled")), Into::into);
if connected {
if let Err(e) = sender.send(Ok(None)) {
warn!(id:%, e:?; "Failed to notify already-connected");
}
return recv;
}
let mut attempts = self.connection_attempts.lock();
if let Some(previous_connect_task) = attempts.remove(&id) {
if previous_connect_task.now_or_never().is_none() {
warn!(id:%; "Cancelling previous connect attempt");
}
}
let connect_task = fasync::Task::spawn(async move {
if let Err(e) = sender.send(connect_peer(proxy, id, channel_params).await.map(Some)) {
warn!(id:%, e:?; "Failed to send channel connect result");
}
});
let _ = attempts.insert(id, connect_task);
recv
}
pub async fn connected(
&self,
id: PeerId,
channel: Channel,
initiator_delay: Option<zx::MonotonicDuration>,
) -> Result<DetachableWeak<PeerId, Peer>, Error> {
if let Some(weak) = self.get_weak(&id) {
let peer =
weak.upgrade().ok_or_else(|| format_err!("Disconnected connecting transport"))?;
if let Err(e) = peer.receive_channel(channel) {
warn!(id:%, e:%; "failed to connect channel");
return Err(e.into());
}
return Ok(weak);
}
let entry = self.connected.lazy_entry(&id);
info!(id:%; "peer connected");
let avdtp_peer = avdtp::Peer::new(channel);
let mut peer = Peer::create(
id,
avdtp_peer,
self.streams_builder.peer_streams(&id, None).await?,
Some(self.permits.clone()),
self.profile.clone(),
self.metrics.clone(),
);
self.discovered.lock().connected(id);
let peer_preferred_direction = if let Some((desc, dir)) = self.discovered.lock().get(&id) {
let _ = peer.set_descriptor(desc);
dir
} else {
None
};
if let Err(e) = peer.iattach(&self.inspect, inspect::unique_name("peer_")) {
warn!(id:%, e:?; "Couldn't attach inspect");
}
let closed_fut = peer.closed();
let peer = match entry.try_insert(peer) {
Err(_peer) => {
warn!(id:%; "Peer connected while we were setting up");
return self.get_weak(&id).ok_or_else(|| format_err!("Peer missing"));
}
Ok(weak_peer) => weak_peer,
};
if let Some(delay) = initiator_delay {
let peer = peer.clone();
let peer_id = peer.key().clone();
let negotiation = self
.streams_builder
.negotiation(
&id,
None,
peer_preferred_direction.unwrap_or_else(|| self.preferred_peer_direction()),
)
.await?;
let start_stream_task = fuchsia_async::Task::local(async move {
let delay_sec = delay.into_millis() as f64 / 1000.0;
info!(id:% = peer.key(); "dwelling {delay_sec}s for peer initiation");
fasync::Timer::new(fasync::MonotonicInstant::after(delay)).await;
if let Err(e) = ConnectedPeers::start_streaming(&peer, negotiation).await {
info!(id:% = peer.key(), e:?; "Peer start streaming failed");
peer.detach();
}
});
if self.start_stream_tasks.lock().insert(peer_id, start_stream_task).is_some() {
info!(peer_id:%; "Replacing previous start stream dwell");
}
}
fasync::Task::local(async move {
closed_fut.await;
peer.detach();
})
.detach();
let peer = self.get_weak(&id).ok_or_else(|| format_err!("Peer missing"))?;
self.notify_connected(&peer);
Ok(peer)
}
fn notify_connected(&self, peer: &DetachableWeak<PeerId, Peer>) {
let mut senders = self.connected_peer_senders.lock();
senders.retain_mut(|sender| sender.try_send(peer.clone()).is_ok());
}
pub fn connected_stream(&self) -> PeerConnections {
let (sender, receiver) = mpsc::channel(0);
self.connected_peer_senders.lock().push(sender);
PeerConnections { stream: receiver }
}
}
impl Inspect for &mut ConnectedPeers {
fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
self.inspect = parent.create_child(name.as_ref());
let peer_dir_str = format!("{:?}", self.preferred_peer_direction());
self.inspect_peer_direction =
self.inspect.create_string("preferred_peer_direction", peer_dir_str);
self.streams_builder.iattach(&self.inspect, "streams_builder")?;
self.discovered.lock().iattach(&self.inspect, "discovered")
}
}
pub struct PeerConnections {
stream: mpsc::Receiver<DetachableWeak<PeerId, Peer>>,
}
impl Stream for PeerConnections {
type Item = DetachableWeak<PeerId, Peer>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_utils::PollExt;
use bt_avdtp::{Request, ServiceCapability};
use diagnostics_assertions::assert_data_tree;
use fidl::endpoints::create_proxy_and_stream;
use fidl_fuchsia_bluetooth_bredr::{
AudioOffloadExtProxy, ProfileMarker, ProfileRequestStream, ServiceClassProfileIdentifier,
};
use futures::future::BoxFuture;
use std::pin::pin;
use crate::codec::MediaCodecConfig;
use crate::media_task::{MediaTaskBuilder, MediaTaskError, MediaTaskRunner};
use crate::media_types::*;
fn run_to_stalled(exec: &mut fasync::TestExecutor) {
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
}
fn exercise_avdtp(exec: &mut fasync::TestExecutor, remote: Channel, peer: &Peer) {
let remote_avdtp = avdtp::Peer::new(remote);
let mut remote_requests = remote_avdtp.take_request_stream();
let avdtp = peer.avdtp();
let discover_fut = avdtp.discover();
let mut discover_fut = pin!(discover_fut);
assert!(exec.run_until_stalled(&mut discover_fut).is_pending());
let responder = match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(Request::Discover { responder }))) => responder,
x => panic!("Expected a Ready Discovery request but got {:?}", x),
};
let endpoint_id = avdtp::StreamEndpointId::try_from(1).expect("endpointid creation");
let information = avdtp::StreamInformation::new(
endpoint_id,
false,
avdtp::MediaType::Audio,
avdtp::EndpointType::Source,
);
responder.send(&[information]).expect("Sending response should have worked");
let _stream_infos = match exec.run_until_stalled(&mut discover_fut) {
Poll::Ready(Ok(infos)) => infos,
x => panic!("Expected a Ready response but got {:?}", x),
};
}
fn setup_connected_peer_test(
) -> (fasync::TestExecutor, PeerId, ConnectedPeers, ProfileRequestStream) {
let exec = fasync::TestExecutor::new();
let (proxy, stream) = create_proxy_and_stream::<ProfileMarker>();
let id = PeerId(1);
let peers = ConnectedPeers::new(
StreamsBuilder::default(),
Permits::new(1),
proxy,
bt_metrics::MetricsLogger::default(),
);
(exec, id, peers, stream)
}
#[fuchsia::test]
fn connect_creates_peer() {
let (mut exec, id, peers, _stream) = setup_connected_peer_test();
let (remote, channel) = Channel::create();
let peer = exec
.run_singlethreaded(peers.connected(id, channel, None))
.expect("peer should connect");
let peer = peer.upgrade().expect("peer should be connected");
exercise_avdtp(&mut exec, remote, &peer);
}
#[fuchsia::test]
fn connect_notifies_streams() {
let (mut exec, id, peers, _stream) = setup_connected_peer_test();
let (remote, channel) = Channel::create();
let mut peer_stream = peers.connected_stream();
let mut peer_stream_two = peers.connected_stream();
let peer = exec
.run_singlethreaded(peers.connected(id, channel, None))
.expect("peer should connect");
let peer = peer.upgrade().expect("peer should be connected");
let weak = exec.run_singlethreaded(peer_stream.next()).expect("peer stream to produce");
assert_eq!(weak.key(), &id);
let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
assert_eq!(weak.key(), &id);
exercise_avdtp(&mut exec, remote, &peer);
drop(peer_stream);
let id2 = PeerId(2);
let (remote2, channel2) = Channel::create();
let peer2 = exec
.run_singlethreaded(peers.connected(id2, channel2, None))
.expect("peer should connect");
let peer2 = peer2.upgrade().expect("peer two should be connected");
let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
assert_eq!(weak.key(), &id2);
exercise_avdtp(&mut exec, remote2, &peer2);
}
#[fuchsia::test]
fn find_preferred_direction_returns_correct_endpoints() {
let empty = HashSet::new();
assert_eq!(find_preferred_direction(&empty), None);
let sink_only = HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter());
assert_eq!(find_preferred_direction(&sink_only), Some(avdtp::EndpointType::Sink));
let source_only = HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter());
assert_eq!(find_preferred_direction(&source_only), Some(avdtp::EndpointType::Source));
let both = HashSet::from_iter(
vec![avdtp::EndpointType::Sink, avdtp::EndpointType::Source].into_iter(),
);
assert_eq!(find_preferred_direction(&both), None);
}
const AAC_SEID: u8 = 8;
const SBC_SINK_SEID: u8 = 9;
const SBC_SOURCE_SEID: u8 = 10;
fn aac_sink_codec() -> avdtp::ServiceCapability {
AacCodecInfo::new(
AacObjectType::MANDATORY_SNK,
AacSamplingFrequency::MANDATORY_SNK,
AacChannels::MANDATORY_SNK,
true,
0, )
.unwrap()
.into()
}
fn sbc_sink_codec() -> avdtp::ServiceCapability {
SbcCodecInfo::new(
SbcSamplingFrequency::MANDATORY_SNK,
SbcChannelMode::MANDATORY_SNK,
SbcBlockCount::MANDATORY_SNK,
SbcSubBands::MANDATORY_SNK,
SbcAllocation::MANDATORY_SNK,
SbcCodecInfo::BITPOOL_MIN,
SbcCodecInfo::BITPOOL_MAX,
)
.unwrap()
.into()
}
fn sbc_source_codec() -> avdtp::ServiceCapability {
SbcCodecInfo::new(
SbcSamplingFrequency::FREQ48000HZ,
SbcChannelMode::JOINT_STEREO,
SbcBlockCount::MANDATORY_SRC,
SbcSubBands::MANDATORY_SRC,
SbcAllocation::MANDATORY_SRC,
SbcCodecInfo::BITPOOL_MIN,
SbcCodecInfo::BITPOOL_MAX,
)
.unwrap()
.into()
}
#[derive(Clone)]
struct FakeBuilder {
capability: avdtp::ServiceCapability,
direction: avdtp::EndpointType,
}
impl MediaTaskBuilder for FakeBuilder {
fn configure(
&self,
_peer_id: &PeerId,
codec_config: &MediaCodecConfig,
) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
if self.capability.codec_type() == Some(codec_config.codec_type()) {
return Ok(Box::new(FakeRunner {}));
}
Err(MediaTaskError::Other(String::from("Unsupported configuring")))
}
fn direction(&self) -> bt_avdtp::EndpointType {
self.direction
}
fn supported_configs(
&self,
_peer_id: &PeerId,
_offload: Option<AudioOffloadExtProxy>,
) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
futures::future::ready(Ok(vec![(&self.capability).try_into().unwrap()])).boxed()
}
}
struct FakeRunner {}
impl MediaTaskRunner for FakeRunner {
fn start(
&mut self,
_stream: avdtp::MediaStream,
_offload: Option<AudioOffloadExtProxy>,
) -> Result<Box<dyn crate::media_task::MediaTask>, MediaTaskError> {
Err(MediaTaskError::Other(String::from("unimplemented starting")))
}
}
fn setup_negotiation_test() -> (
fasync::TestExecutor,
ConnectedPeers,
ProfileRequestStream,
ServiceCapability,
ServiceCapability,
) {
let exec = fasync::TestExecutor::new_with_fake_time();
exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000));
let (proxy, stream) = create_proxy_and_stream::<ProfileMarker>();
let aac_sink_codec = aac_sink_codec();
let sbc_sink_codec = sbc_sink_codec();
let aac_sink_builder = FakeBuilder {
capability: aac_sink_codec.clone(),
direction: avdtp::EndpointType::Sink,
};
let sbc_sink_builder = FakeBuilder {
capability: sbc_sink_codec.clone(),
direction: avdtp::EndpointType::Sink,
};
let sbc_source_builder =
FakeBuilder { capability: sbc_source_codec(), direction: avdtp::EndpointType::Source };
let mut streams_builder = StreamsBuilder::default();
streams_builder.add_builder(aac_sink_builder);
streams_builder.add_builder(sbc_sink_builder);
streams_builder.add_builder(sbc_source_builder);
let peers = ConnectedPeers::new(
streams_builder,
Permits::new(1),
proxy,
bt_metrics::MetricsLogger::default(),
);
(exec, peers, stream, sbc_sink_codec, aac_sink_codec)
}
#[fuchsia::test]
fn streaming_start_with_streaming_peer_is_noop() {
let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
let id = PeerId(1);
let (remote, channel) = Channel::create();
let remote = avdtp::Peer::new(remote);
let delay = zx::MonotonicDuration::from_seconds(1);
let mut remote_requests = remote.take_request_stream();
let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
let config_caps = &[ServiceCapability::MediaTransport, sbc_codec];
let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
let mut set_config_fut = pin!(set_config_fut);
match exec.run_until_stalled(&mut set_config_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Expected set config to be ready and Ok, got {:?}", x),
};
exec.set_fake_time(
fasync::MonotonicInstant::after(delay) + zx::MonotonicDuration::from_micros(1),
);
let _ = exec.wake_expired_timers();
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
}
fn sbc_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
let remote_sbc_seid: avdtp::StreamEndpointId = 1u8.try_into().unwrap();
let info = avdtp::StreamInformation::new(
remote_sbc_seid.clone(),
false,
avdtp::MediaType::Audio,
avdtp::EndpointType::Source,
);
(remote_sbc_seid, info)
}
fn aac_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
let remote_aac_seid: avdtp::StreamEndpointId = 2u8.try_into().unwrap();
let info = avdtp::StreamInformation::new(
remote_aac_seid.clone(),
false,
avdtp::MediaType::Audio,
avdtp::EndpointType::Source,
);
(remote_aac_seid, info)
}
fn sbc_sink_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
let remote_sbc_seid: avdtp::StreamEndpointId = 3u8.try_into().unwrap();
let info = avdtp::StreamInformation::new(
remote_sbc_seid.clone(),
false,
avdtp::MediaType::Audio,
avdtp::EndpointType::Sink,
);
(remote_sbc_seid, info)
}
fn expect_peer_discovery(
exec: &mut fasync::TestExecutor,
requests: &mut avdtp::RequestStream,
response: Vec<avdtp::StreamInformation>,
) {
match exec.run_until_stalled(&mut requests.next()) {
Poll::Ready(Some(Ok(avdtp::Request::Discover { responder }))) => {
responder.send(&response).expect("response succeeds");
}
x => panic!("Expected a discovery request to be sent after delay, got {:?}", x),
};
}
#[fuchsia::test]
fn streaming_start_configure_while_discovery() {
let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
let id = PeerId(1);
let (remote, channel) = Channel::create();
let remote = avdtp::Peer::new(remote);
let delay = zx::MonotonicDuration::from_seconds(1);
let mut remote_requests = remote.take_request_stream();
let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
exec.set_fake_time(
fasync::MonotonicInstant::after(delay) + zx::MonotonicDuration::from_micros(1),
);
let _ = exec.wake_expired_timers();
expect_peer_discovery(
&mut exec,
&mut remote_requests,
vec![sbc_source_endpoint().1, aac_source_endpoint().1],
);
let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
let config_caps = &[ServiceCapability::MediaTransport, sbc_codec.clone()];
let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
let mut set_config_fut = pin!(set_config_fut);
match exec.run_until_stalled(&mut set_config_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Expected set config to be ready and Ok, got {:?}", x),
};
loop {
match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { responder, .. }))) => {
responder
.send(&[avdtp::ServiceCapability::MediaTransport, sbc_codec.clone()])
.expect("respond succeeds");
}
Poll::Ready(x) => panic!("Got unexpected request: {:?}", x),
Poll::Pending => break,
}
}
}
#[fuchsia::test]
fn connect_initiation_uses_biased_codec_negotiation_by_peer() {
let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
let id = PeerId(1);
let (remote, channel) = Channel::create();
peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
let remote = avdtp::Peer::new(remote);
let desc = ProfileDescriptor {
profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
major_version: Some(1),
minor_version: Some(2),
..Default::default()
};
let preferred_direction = vec![avdtp::EndpointType::Sink];
let delay = zx::MonotonicDuration::from_seconds(1);
peers.found(id, desc, HashSet::from_iter(preferred_direction.into_iter()));
let connected_fut = peers.connected(id, channel, Some(delay));
let mut connected_fut = std::pin::pin!(connected_fut);
let _ = exec
.run_until_stalled(&mut connected_fut)
.expect("is ready")
.expect("connect control channel is ok");
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
let mut remote_requests = remote.take_request_stream();
assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
exec.set_fake_time(fasync::MonotonicInstant::after(
delay + zx::MonotonicDuration::from_micros(1),
));
let _ = exec.wake_expired_timers();
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
expect_peer_discovery(
&mut exec,
&mut remote_requests,
vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
);
for _twice in 1..=2 {
match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
let codec = match stream_id {
id if id == peer_sbc_source_seid => sbc_codec.clone(),
id if id == peer_sbc_sink_seid => sbc_codec.clone(),
x => panic!("Got unexpected get_capabilities seid {:?}", x),
};
responder
.send(&[avdtp::ServiceCapability::MediaTransport, codec])
.expect("respond succeeds");
}
x => panic!("Expected a ready get capabilities request, got {:?}", x),
};
}
match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
local_stream_id,
remote_stream_id,
capabilities: _,
responder,
}))) => {
assert_eq!(peer_sbc_sink_seid, local_stream_id);
let local_sbc_source_seid: avdtp::StreamEndpointId =
SBC_SOURCE_SEID.try_into().unwrap();
assert_eq!(local_sbc_source_seid, remote_stream_id);
responder.send().expect("response sends");
}
x => panic!("Expected a ready set configuration request, got {:?}", x),
};
}
#[fuchsia::test]
fn connect_initiation_uses_biased_codec_negotiation_by_system() {
let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
let id = PeerId(1);
let (remote, channel) = Channel::create();
peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
let remote = avdtp::Peer::new(remote);
let desc = ProfileDescriptor {
profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
major_version: Some(1),
minor_version: Some(2),
..Default::default()
};
peers.found(
id,
desc.clone(),
HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter()),
);
peers.found(id, desc, HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter()));
let delay = zx::MonotonicDuration::from_seconds(1);
let connect_fut = peers.connected(id, channel, Some(delay));
let mut connect_fut = std::pin::pin!(connect_fut);
let _ = exec
.run_until_stalled(&mut connect_fut)
.expect("ready")
.expect("connect control channel is ok");
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
let mut remote_requests = remote.take_request_stream();
assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
exec.set_fake_time(fasync::MonotonicInstant::after(
delay + zx::MonotonicDuration::from_micros(1),
));
let _ = exec.wake_expired_timers();
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
expect_peer_discovery(
&mut exec,
&mut remote_requests,
vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
);
for _twice in 1..=2 {
match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
let codec = match stream_id {
id if id == peer_sbc_source_seid => sbc_codec.clone(),
id if id == peer_sbc_sink_seid => sbc_codec.clone(),
x => panic!("Got unexpected get_capabilities seid {:?}", x),
};
responder
.send(&[avdtp::ServiceCapability::MediaTransport, codec])
.expect("respond succeeds");
}
x => panic!("Expected a ready get capabilities request, got {:?}", x),
};
}
match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
local_stream_id,
remote_stream_id,
capabilities: _,
responder,
}))) => {
assert_eq!(peer_sbc_source_seid, local_stream_id);
let local_sbc_sink_seid: avdtp::StreamEndpointId =
SBC_SINK_SEID.try_into().unwrap();
assert_eq!(local_sbc_sink_seid, remote_stream_id);
responder.send().expect("response sends");
}
x => panic!("Expected a ready set configuration request, got {:?}", x),
};
}
#[fuchsia::test]
fn connect_initiation_uses_negotiation() {
let (mut exec, peers, _stream, sbc_codec, aac_codec) = setup_negotiation_test();
let id = PeerId(1);
let (remote, channel) = Channel::create();
let remote = avdtp::Peer::new(remote);
let delay = zx::MonotonicDuration::from_seconds(1);
let mut connect_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
let _ = exec
.run_until_stalled(&mut connect_fut)
.expect("ready")
.expect("connect control channel is ok");
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
let mut remote_requests = remote.take_request_stream();
assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
exec.set_fake_time(fasync::MonotonicInstant::after(
delay + zx::MonotonicDuration::from_micros(1),
));
let _ = exec.wake_expired_timers();
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
let (peer_sbc_seid, peer_sbc_endpoint) = sbc_source_endpoint();
let (peer_aac_seid, peer_aac_endpoint) = aac_source_endpoint();
expect_peer_discovery(
&mut exec,
&mut remote_requests,
vec![peer_sbc_endpoint, peer_aac_endpoint],
);
for _twice in 1..=2 {
match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
let codec = match stream_id {
id if id == peer_sbc_seid => sbc_codec.clone(),
id if id == peer_aac_seid => aac_codec.clone(),
x => panic!("Got unexpected get_capabilities seid {:?}", x),
};
responder
.send(&[avdtp::ServiceCapability::MediaTransport, codec])
.expect("respond succeeds");
}
x => panic!("Expected a ready get capabilities request, got {:?}", x),
};
}
match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
local_stream_id,
remote_stream_id,
capabilities: _,
responder,
}))) => {
assert_eq!(peer_aac_seid, local_stream_id);
let local_aac_seid: avdtp::StreamEndpointId = AAC_SEID.try_into().unwrap();
assert_eq!(local_aac_seid, remote_stream_id);
responder.send().expect("response sends");
}
x => panic!("Expected a ready set configuration request, got {:?}", x),
};
}
#[fuchsia::test]
fn connected_peers_inspect() {
let (mut exec, id, mut peers, _stream) = setup_connected_peer_test();
let inspect = inspect::Inspector::default();
peers.iattach(inspect.root(), "peers").expect("should attach to inspect tree");
assert_data_tree!(inspect, root: {
peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Sink" }});
peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
assert_data_tree!(inspect, root: {
peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Source" }});
let (_remote, channel) = Channel::create();
assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
assert_data_tree!(inspect, root: {
peers: {
discovered: contains {},
preferred_peer_direction: "Source",
streams_builder: contains {},
peer_0: { id: "0000000000000001", local_streams: contains {} }
}
});
}
#[fuchsia::test]
fn try_connect_cancels_previous_attempt() {
let (mut exec, id, peers, mut profile_stream) = setup_connected_peer_test();
let mut connect_fut = peers.try_connect(id, ChannelParameters::default());
let responder = match exec.run_singlethreaded(profile_stream.next()) {
Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
x => panic!("Expected Profile connect, got {x:?}"),
};
let mut connect_again_fut = peers.try_connect(id, ChannelParameters::default());
let responder_two = match exec.run_singlethreaded(profile_stream.next()) {
Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
x => panic!("Expected Profile connect, got {x:?}"),
};
let first_result = exec.run_singlethreaded(&mut connect_fut);
let _ = first_result.expect_err("Should have an error from first attempt");
responder.send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed)).unwrap();
exec.run_until_stalled(&mut connect_again_fut).expect_pending("shouldn't finish");
let (_remote, local) = Channel::create();
responder_two.send(Ok(local.try_into().unwrap())).unwrap();
let second_result = exec.run_singlethreaded(&mut connect_again_fut);
let _ = second_result.expect("should receive the channel");
}
#[fuchsia::test]
fn connected_peers_peer_disconnect_removes_peer() {
let (mut exec, id, peers, _stream) = setup_connected_peer_test();
let (remote, channel) = Channel::create();
assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
run_to_stalled(&mut exec);
drop(remote);
run_to_stalled(&mut exec);
assert!(peers.get(&id).is_none());
}
#[fuchsia::test]
fn connected_peers_reconnect_works() {
let (mut exec, id, peers, _stream) = setup_connected_peer_test();
let (remote, channel) = Channel::create();
assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
run_to_stalled(&mut exec);
drop(remote);
run_to_stalled(&mut exec);
assert!(peers.get(&id).is_none());
let (_remote, channel) = Channel::create();
assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
run_to_stalled(&mut exec);
assert!(peers.get(&id).is_some());
}
}