use anyhow::format_err;
use bt_avdtp::{Error, MediaCodecType, MediaType, ServiceCapability, StreamEndpointId};
use fidl::endpoints::RequestStream;
use fidl_fuchsia_bluetooth_avdtp_test::{
PeerControllerMarker, PeerControllerRequest, PeerControllerRequestStream, PeerError,
PeerManagerControlHandle, PeerManagerRequest, PeerManagerRequestStream,
};
use fuchsia_async as fasync;
use fuchsia_bluetooth::detachable_map::DetachableWeak;
use fuchsia_bluetooth::types::PeerId;
use fuchsia_sync::Mutex;
use futures::{TryFutureExt, TryStreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{error, info};
use crate::peer::Peer;
const SBC_SEID: u8 = 6;
struct Controller {}
impl Controller {
async fn handle_controller_request(
a2dp: Arc<Peer>,
request: PeerControllerRequest,
endpoint_id: &StreamEndpointId,
) -> Result<(), fidl::Error> {
match request {
PeerControllerRequest::SetConfiguration { responder } => {
let generic_capabilities = vec![ServiceCapability::MediaTransport];
match a2dp
.avdtp()
.set_configuration(endpoint_id, endpoint_id, &generic_capabilities)
.await
{
Ok(resp) => {
info!("SetConfiguration successful: {:?}", resp);
responder.send(Ok(()))?;
}
Err(e) => {
error!("SetConfiguration for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
PeerControllerRequest::GetConfiguration { responder } => {
match a2dp.avdtp().get_configuration(endpoint_id).await {
Ok(service_capabilities) => {
info!(
"Service capabilities from GetConfiguration: {:?}",
service_capabilities
);
responder.send(Ok(()))?;
}
Err(e) => {
error!("GetConfiguration for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
PeerControllerRequest::GetCapabilities { responder } => {
match a2dp.avdtp().get_capabilities(endpoint_id).await {
Ok(service_capabilities) => {
info!(
"Service capabilities from GetCapabilities {:?}",
service_capabilities
);
responder.send(Ok(()))?;
}
Err(e) => {
error!("GetCapabilities for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
PeerControllerRequest::GetAllCapabilities { responder } => {
match a2dp.avdtp().get_all_capabilities(endpoint_id).await {
Ok(service_capabilities) => {
info!(
"Service capabilities from GetAllCapabilities: {:?}",
service_capabilities
);
responder.send(Ok(()))?;
}
Err(e) => {
error!("GetAllCapabilities for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
PeerControllerRequest::SuspendStream { responder } => {
let local_id: StreamEndpointId = SBC_SEID.try_into().expect("should work");
match a2dp.stream_suspend(local_id).await {
Ok(_) => {
info!("SuspendStream was successful");
responder.send(Ok(()))?;
}
Err(e) => {
error!("SuspendStream for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
PeerControllerRequest::ReconfigureStream { responder } => {
let generic_capabilities = [ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x15, 2, 250],
}];
match a2dp.avdtp().reconfigure(endpoint_id, &generic_capabilities[..]).await {
Ok(resp) => {
info!("ReconfigureStream was successful {:?}", resp);
responder.send(Ok(()))?;
}
Err(e) => {
error!("ReconfigureStream for {} failed: {:?}", endpoint_id, e);
match e {
Error::RemoteRejected(e) if e.service_category().is_some() => {}
_ => responder.send(Err(PeerError::ProtocolError))?,
}
}
}
}
PeerControllerRequest::ReleaseStream { responder } => {
match a2dp.avdtp().close(endpoint_id).await {
Ok(resp) => {
info!("ReleaseStream was successful: {:?}", resp);
responder.send(Ok(()))?;
}
Err(e) => {
error!("ReleaseStream for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
PeerControllerRequest::StartStream { responder } => {
match a2dp.avdtp().start(&[endpoint_id.clone()]).await {
Ok(resp) => {
info!("StartStream was successful: {:?}", resp);
responder.send(Ok(()))?;
}
Err(e) => {
error!("StartStream for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
PeerControllerRequest::AbortStream { responder } => {
match a2dp.avdtp().abort(endpoint_id).await {
Ok(resp) => {
info!("AbortStream was successful: {:?}", resp);
responder.send(Ok(()))?;
}
Err(e) => {
info!("AbortStream for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
PeerControllerRequest::EstablishStream { responder } => {
match a2dp.avdtp().open(endpoint_id).await {
Ok(resp) => {
info!("EstablishStream was successful: {:?}", resp);
responder.send(Ok(()))?;
}
Err(e) => {
info!("EstablishStream for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
PeerControllerRequest::SuspendAndReconfigure { responder } => {
let local_id: StreamEndpointId = SBC_SEID.try_into().expect("should work");
match a2dp.stream_suspend(local_id).await {
Ok(_) => {
info!("SuspendStream was successful");
let generic_capabilities = [ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x15, 2, 250],
}];
match a2dp.avdtp().reconfigure(endpoint_id, &generic_capabilities[..]).await
{
Ok(resp) => {
info!("ReconfigureStream was successful {:?}", resp);
responder.send(Ok(()))?;
}
Err(e) => {
info!("ReconfigureStream for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
Err(e) => {
info!("SuspendStream for {} failed: {:?}", endpoint_id, e);
responder.send(Err(PeerError::ProtocolError))?;
}
}
}
}
Ok(())
}
async fn process_requests(
mut stream: PeerControllerRequestStream,
peer: DetachableWeak<PeerId, Peer>,
peer_id: PeerId,
) -> Result<(), anyhow::Error> {
while let Some(req) = stream.try_next().await? {
let peer = { peer.upgrade().ok_or_else(|| format_err!("Peer disconnected"))? };
let infos = match peer.collect_capabilities().await {
Ok(endpoints) => endpoints,
Err(e) => {
info!("Error collecting capabilities from peer: {:?}", e);
continue;
}
};
let remote_id = match infos.first() {
Some(stream_info) => stream_info.local_id().clone(),
None => {
info!("Can't execute {:?} - no streams exist on the peer.", req);
continue;
}
};
let fut = Self::handle_controller_request(peer.clone(), req, &remote_id);
if let Err(e) = fut.await {
error!("{} error handling request: {:?}", peer_id, e);
}
}
info!("Controller finished for id: {}", peer_id);
Ok(())
}
}
async fn start_control_service(
mut stream: PeerManagerRequestStream,
controller_pool: Arc<Mutex<ControllerPoolInner>>,
) -> Result<(), anyhow::Error> {
while let Some(req) = stream.try_next().await? {
match req {
PeerManagerRequest::GetPeer { peer_id, handle, .. } => {
let handle_to_client: fidl::endpoints::ServerEnd<PeerControllerMarker> = handle;
let peer_id: PeerId = peer_id.into();
let peer = match controller_pool.lock().get_peer(&peer_id) {
None => {
info!("GetPeer: request for nonexistent peer {}, closing.", peer_id);
continue;
}
Some(peer) => peer.clone(),
};
info!("GetPeer: Creating peer controller for peer with id {}.", peer_id);
let client_stream = handle_to_client.into_stream();
fasync::Task::local(async move {
Controller::process_requests(client_stream, peer, peer_id)
.await
.unwrap_or_else(|e| error!("Requests failed: {:?}", e))
})
.detach();
}
PeerManagerRequest::ConnectedPeers { responder } => {
let connected_peers: Vec<fidl_fuchsia_bluetooth::PeerId> =
controller_pool.lock().connected_peers().into_iter().map(Into::into).collect();
responder.send(&connected_peers)?;
info!("ConnectedPeers request. Peers: {:?}", connected_peers);
}
}
}
Ok(())
}
struct ControllerPoolInner {
peers: HashMap<PeerId, DetachableWeak<PeerId, Peer>>,
control_handle: Option<PeerManagerControlHandle>,
}
impl ControllerPoolInner {
pub fn new() -> Self {
Self { control_handle: None, peers: HashMap::new() }
}
#[cfg(test)]
fn control_handle(&self) -> Option<PeerManagerControlHandle> {
self.control_handle.clone()
}
fn connected_peers(&self) -> Vec<PeerId> {
self.peers.keys().cloned().collect()
}
fn get_peer(&self, id: &PeerId) -> Option<&DetachableWeak<PeerId, Peer>> {
self.peers.get(id)
}
fn set_control_handle(&mut self, control_handle: PeerManagerControlHandle) -> bool {
if self.control_handle.is_none() {
self.control_handle = Some(control_handle);
return true;
}
false
}
fn peer_connected(&mut self, peer: DetachableWeak<PeerId, Peer>) {
let peer_id = peer.key().clone();
drop(self.peers.insert(peer_id, peer));
if let Some(handle) = self.control_handle.as_ref() {
if let Err(e) = handle.send_on_peer_connected(&peer_id.into()) {
info!("Peer connected callback failed: {:?}", e);
}
}
}
}
pub struct ControllerPool {
inner: Arc<Mutex<ControllerPoolInner>>,
}
impl ControllerPool {
pub fn new() -> Self {
Self { inner: Arc::new(Mutex::new(ControllerPoolInner::new())) }
}
#[cfg(test)]
fn control_handle(&self) -> Option<PeerManagerControlHandle> {
self.inner.lock().control_handle()
}
#[cfg(test)]
fn get_peer(&self, id: &PeerId) -> Option<DetachableWeak<PeerId, Peer>> {
self.inner.lock().get_peer(id).cloned()
}
pub fn connected(&self, stream: PeerManagerRequestStream) {
if self.inner.lock().set_control_handle(stream.control_handle()) {
let inner = self.inner.clone();
fasync::Task::local(
start_control_service(stream, inner)
.unwrap_or_else(|e| error!("Error handling requests {:?}", e)),
)
.detach()
}
}
pub fn peer_connected(&self, peer: DetachableWeak<PeerId, Peer>) {
self.inner.lock().peer_connected(peer);
}
}
#[cfg(test)]
mod tests {
use super::*;
use bt_avdtp::{EndpointType, ErrorCode, Peer as AvdtpPeer, Request, StreamInformation};
use fidl::endpoints::{create_endpoints, create_proxy_and_stream};
use fidl_fuchsia_bluetooth_avdtp_test::*;
use fidl_fuchsia_bluetooth_bredr::ProfileMarker;
use fuchsia_async as fasync;
use fuchsia_bluetooth::detachable_map::DetachableMap;
use fuchsia_bluetooth::types::Channel;
use futures::StreamExt;
use crate::media_task::tests::TestMediaTaskBuilder;
use crate::stream::tests::make_sbc_endpoint;
use crate::stream::{Stream, Streams};
async fn listen_for_avdtp_requests(remote: Channel) {
let remote_avdtp = AvdtpPeer::new(remote);
let mut remote_requests = remote_avdtp.take_request_stream();
while let Some(request) = remote_requests.next().await {
match request {
Ok(Request::Discover { responder }) => {
let endpoint_id = StreamEndpointId::try_from(1).expect("endpoint id creation");
let information = StreamInformation::new(
endpoint_id,
false,
MediaType::Audio,
EndpointType::Source,
);
responder.send(&[information]).expect("Sending response should have worked");
}
Ok(Request::GetCapabilities { responder, .. })
| Ok(Request::GetAllCapabilities { responder, .. })
| Ok(Request::GetConfiguration { responder, .. }) => {
responder.send(&[]).expect("Sending response should have worked");
}
Ok(Request::SetConfiguration { responder, .. })
| Ok(Request::Reconfigure { responder, .. }) => {
responder.send().expect("Sending response should have worked");
}
Ok(Request::Suspend { responder, .. }) | Ok(Request::Start { responder, .. }) => {
responder.send().expect("Sending response should have worked");
}
Ok(Request::Open { responder, .. })
| Ok(Request::Close { responder, .. })
| Ok(Request::Abort { responder, .. }) => {
responder
.reject(ErrorCode::UnsupportedConfiguration)
.expect("Sending response should have worked");
}
_ => {
panic!("Got an unhandled AVDTP request");
}
}
}
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_client_connected_to_peer_manager() {
let (pm_proxy, pm_stream) = create_proxy_and_stream::<PeerManagerMarker>();
let controller_pool = ControllerPool::new();
let mut peer_map = DetachableMap::new();
controller_pool.connected(pm_stream);
let fake_peer_id = PeerId(12345);
let (profile_proxy, _requests) = create_proxy_and_stream::<ProfileMarker>();
let (remote, signaling) = Channel::create();
let avdtp_peer = AvdtpPeer::new(signaling);
let mut streams = Streams::default();
let test_builder = TestMediaTaskBuilder::new();
streams.insert(Stream::build(
make_sbc_endpoint(1, EndpointType::Source),
test_builder.builder(),
));
let peer = Peer::create(
fake_peer_id,
avdtp_peer,
streams,
None,
profile_proxy,
bt_metrics::MetricsLogger::default(),
);
let _ = peer_map.insert(fake_peer_id, peer);
let weak_peer = peer_map.get(&fake_peer_id).expect("just inserted");
controller_pool.peer_connected(weak_peer);
assert!(controller_pool.control_handle().is_some());
assert!(controller_pool.get_peer(&fake_peer_id).is_some());
let (client, server) = create_endpoints::<PeerControllerMarker>();
let client_proxy = client.into_proxy();
let res = pm_proxy.get_peer(&fake_peer_id.into(), server);
assert_eq!(Ok(()), res.map_err(|e| e.to_string()));
fasync::Task::spawn(listen_for_avdtp_requests(remote)).detach();
let res = client_proxy.set_configuration().await;
assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
let res = client_proxy.get_configuration().await;
assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
let res = client_proxy.get_capabilities().await;
assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
let res = client_proxy.get_all_capabilities().await;
assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
let res = client_proxy.suspend_stream().await;
assert_eq!(Ok(Err(PeerError::ProtocolError)), res.map_err(|e| e.to_string()));
let res = client_proxy.reconfigure_stream().await;
assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
let res = client_proxy.suspend_and_reconfigure().await;
assert_eq!(Ok(Err(PeerError::ProtocolError)), res.map_err(|e| e.to_string()));
let res = client_proxy.start_stream().await;
assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
let res = client_proxy.release_stream().await.expect("Command should succeed");
assert_eq!(Err("ProtocolError".to_string()), res.map_err(|e| format!("{:?}", e)));
let res = client_proxy.establish_stream().await.expect("Command should succeed");
assert_eq!(Err("ProtocolError".to_string()), res.map_err(|e| format!("{:?}", e)));
let res = client_proxy.abort_stream().await.expect("Command should succeed");
assert_eq!(Err("ProtocolError".to_string()), res.map_err(|e| format!("{:?}", e)));
}
}