use anyhow::{format_err, Error};
use bt_rfcomm::profile::server_channel_from_protocol;
use bt_rfcomm::ServerChannel;
use derivative::Derivative;
use fuchsia_bluetooth::profile::ProtocolDescriptor;
use fuchsia_bluetooth::types::{Channel, PeerId, Uuid};
use fuchsia_sync::Mutex;
use futures::channel::mpsc;
use futures::{select, StreamExt};
use log::{info, warn};
use profile_client::{ProfileClient, ProfileEvent};
use std::cell::Cell;
use std::collections::HashMap;
use std::sync::Arc;
use {
fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
fidl_fuchsia_bluetooth_rfcomm_test as rfcomm, fuchsia_async as fasync,
};
/// Capacity handed to `mpsc::channel` when creating the queue of pending outgoing user-data
/// writes for each RFCOMM channel (see `RfcommSession::new_rfcomm_channel`).
const USER_DATA_BUFFER_SIZE: usize = 50;
/// Builds the service definition advertised for the Serial Port Profile (SPP).
///
/// The definition carries the SerialPort service class UUID, an L2CAP + RFCOMM protocol
/// descriptor list (both with empty parameter lists), and a SerialPort profile descriptor
/// at version 1.2. All remaining fields keep their default values.
fn spp_service_definition() -> bredr::ServiceDefinition {
    // 16-bit UUID of the SerialPort service class.
    let spp_uuid = Uuid::new16(bredr::ServiceClassProfileIdentifier::SerialPort.into_primitive());

    // Protocol stack entries; neither carries any parameters.
    let l2cap_layer = bredr::ProtocolDescriptor {
        protocol: Some(bredr::ProtocolIdentifier::L2Cap),
        params: Some(Vec::new()),
        ..Default::default()
    };
    let rfcomm_layer = bredr::ProtocolDescriptor {
        protocol: Some(bredr::ProtocolIdentifier::Rfcomm),
        params: Some(Vec::new()),
        ..Default::default()
    };

    let spp_profile = bredr::ProfileDescriptor {
        profile_id: Some(bredr::ServiceClassProfileIdentifier::SerialPort),
        major_version: Some(1),
        minor_version: Some(2),
        ..Default::default()
    };

    bredr::ServiceDefinition {
        service_class_uuids: Some(vec![spp_uuid.into()]),
        protocol_descriptor_list: Some(vec![l2cap_layer, rfcomm_layer]),
        profile_descriptors: Some(vec![spp_profile]),
        ..Default::default()
    }
}
/// Bookkeeping for the RFCOMM channels that are open with a single remote peer.
#[derive(Debug)]
pub struct RfcommSession {
/// Identifier of the remote peer. It is set at construction; no method visible in this
/// file reads it back (hence the leading underscore).
_id: PeerId,
/// Maps each registered server channel to the sender used to queue outgoing user data
/// for that channel's processing task.
active_channels: HashMap<ServerChannel, mpsc::Sender<Vec<u8>>>,
}
impl RfcommSession {
    /// Creates a session for peer `id` with no registered RFCOMM channels.
    fn new(id: PeerId) -> Self {
        let active_channels = HashMap::new();
        Self { _id: id, active_channels }
    }

    /// Returns `true` when `server_channel` is registered and the sender feeding its
    /// processing task has not been closed.
    fn is_active(&self, server_channel: &ServerChannel) -> bool {
        matches!(self.active_channels.get(server_channel), Some(sender) if !sender.is_closed())
    }

    /// Unregisters `server_channel`, dropping the sender for its processing task.
    ///
    /// Returns `true` if the channel was registered, `false` otherwise.
    fn close_rfcomm_channel(&mut self, server_channel: &ServerChannel) -> bool {
        let removed = self.active_channels.remove(server_channel);
        removed.is_some()
    }

    /// Registers `channel` under `server_channel` and spawns a detached task to service it.
    ///
    /// Any sender previously stored for the same server channel is replaced (and dropped).
    fn new_rfcomm_channel(&mut self, server_channel: ServerChannel, channel: Channel) {
        let already_active = self.is_active(&server_channel);
        if already_active {
            info!("Overwriting existing RFCOMM channel: {:?}", server_channel);
        }

        let (write_tx, write_rx) = mpsc::channel(USER_DATA_BUFFER_SIZE);
        let processing = Self::rfcomm_channel_task(server_channel, channel, write_rx);
        fasync::Task::spawn(processing).detach();

        // The returned previous sender (if any) is intentionally discarded.
        let _ = self.active_channels.insert(server_channel, write_tx);
    }

    /// Services a single RFCOMM channel until it is closed.
    ///
    /// Data read from `channel` is only logged. Buffers received on `write_requests` are
    /// written to `channel`, and the outcome of each write is logged. The loop ends when
    /// `channel` yields `None`, when `write_requests` yields `None`, or when `select!`
    /// reports that every branch has completed.
    async fn rfcomm_channel_task(
        server_channel: ServerChannel,
        mut channel: Channel,
        mut write_requests: mpsc::Receiver<Vec<u8>>,
    ) {
        info!("Starting processing task for RFCOMM channel: {:?}", server_channel);
        loop {
            select! {
                incoming = channel.next() => {
                    match incoming {
                        Some(Ok(payload)) => {
                            info!("{:?}: Received user data from peer: {:?}", server_channel, payload);
                        }
                        // A read error is logged and the loop keeps going.
                        Some(Err(e)) => {
                            info!("Error receiving data: {:?}", e);
                        }
                        None => {
                            info!("Peer closed RFCOMM channel {:?}", server_channel);
                            break;
                        }
                    }
                }
                outgoing = write_requests.next() => {
                    // `None` means the write-request stream has ended; stop servicing.
                    let Some(payload) = outgoing else { break };
                    match channel.write(&payload) {
                        Ok(_) => info!("Sent user data over RFCOMM channel ({:?}).", server_channel),
                        Err(e) => info!("Couldn't send user data for channel ({:?}): {:?}", server_channel, e),
                    }
                }
                complete => break,
            }
        }
        info!("RFCOMM channel ({:?}) task ended", server_channel);
    }

    /// Queues `user_data` for sending over `server_channel`.
    ///
    /// Returns an error if the server channel is not registered or if `try_send` on its
    /// sender fails.
    fn send_user_data(
        &mut self,
        server_channel: ServerChannel,
        user_data: Vec<u8>,
    ) -> Result<(), Error> {
        let sender = self
            .active_channels
            .get_mut(&server_channel)
            .ok_or_else(|| format_err!("No registered server channel"))?;
        sender.try_send(user_data).map_err(|e| format_err!("{:?}", e))
    }
}
/// RFCOMM state shared behind the manager's mutex: the background service task and the
/// per-peer sessions.
#[derive(Derivative, Default)]
#[derivative(Debug)]
pub struct RfcommState {
/// Task running the profile event handler for the current advertisement/search, or
/// `None` when nothing is registered. Excluded from the `Debug` output.
#[derivative(Debug = "ignore")]
service: Option<fasync::Task<()>>,
/// Sessions keyed by the id of the remote peer.
active_sessions: HashMap<PeerId, RfcommSession>,
}
impl RfcommState {
fn new() -> Self {
Self { service: None, active_sessions: HashMap::new() }
}
fn get_active_session(&mut self, id: &PeerId) -> Option<&mut RfcommSession> {
match self.active_sessions.get_mut(id) {
None => {
info!("No active RFCOMM session with peer {}", id);
None
}
session => session,
}
}
fn clear_services(&mut self) {
if let Some(old_task) = self.service.take() {
info!("Clearing SPP service advertisement/search");
let _ = old_task.cancel();
}
self.active_sessions.clear();
}
fn new_rfcomm_channel(&mut self, id: PeerId, server_channel: ServerChannel, channel: Channel) {
let _ = self
.active_sessions
.entry(id)
.or_insert_with(|| RfcommSession::new(id))
.new_rfcomm_channel(server_channel, channel);
}
}
/// Entry point for driving RFCOMM operations: holds the FIDL proxies it talks to and the
/// shared RFCOMM state.
#[derive(Derivative, Default)]
#[derivative(Debug)]
pub struct RfcommManager {
/// Cached `Profile` proxy; `None` until one is supplied or connected on first use.
/// Excluded from the `Debug` output.
#[derivative(Debug = "ignore")]
profile: Cell<Option<bredr::ProfileProxy>>,
/// Cached `RfcommTest` proxy; `None` until one is supplied or connected on first use.
/// Excluded from the `Debug` output.
#[derivative(Debug = "ignore")]
rfcomm: Cell<Option<rfcomm::RfcommTestProxy>>,
/// State shared between clones of the manager and the profile event handler task.
inner: Arc<Mutex<RfcommState>>,
}
impl Clone for RfcommManager {
fn clone(&self) -> Self {
let profile = self.profile.take();
if let Some(p) = profile.as_ref() {
self.profile.set(Some(p.clone()));
}
let rfcomm = self.rfcomm.take();
if let Some(rf) = rfcomm.as_ref() {
self.rfcomm.set(Some(rf.clone()));
}
Self { profile: Cell::new(profile), rfcomm: Cell::new(rfcomm), inner: self.inner.clone() }
}
}
impl RfcommManager {
    /// Creates a manager with no cached FIDL proxies and default (empty) state.
    ///
    /// Proxies are connected on first use by `get_profile_proxy` and
    /// `get_rfcomm_test_proxy`. This constructor currently always returns `Ok`.
    pub fn new() -> Result<Self, Error> {
        Ok(Self::default())
    }

    /// Creates a manager that uses the supplied proxies instead of connecting to the
    /// protocols itself.
    pub fn from_proxy(profile: bredr::ProfileProxy, rfcomm: rfcomm::RfcommTestProxy) -> Self {
        Self {
            profile: Cell::new(Some(profile)),
            rfcomm: Cell::new(Some(rfcomm)),
            inner: Arc::new(Mutex::new(RfcommState::new())),
        }
    }

    /// Cancels the current advertisement/search task (if any) and drops all sessions.
    pub fn clear_services(&self) {
        self.inner.lock().clear_services();
    }

    /// Returns the cached `Profile` proxy, connecting to the protocol when none is cached.
    ///
    /// A clone of the returned proxy is stored back in the cell for later calls.
    ///
    /// # Errors
    /// Returns the error from `connect_to_protocol` if the connection attempt fails.
    fn get_profile_proxy(&self) -> Result<bredr::ProfileProxy, Error> {
        let proxy = match self.profile.take() {
            Some(proxy) => proxy,
            None => fuchsia_component::client::connect_to_protocol::<bredr::ProfileMarker>()?,
        };
        self.profile.set(Some(proxy.clone()));
        Ok(proxy)
    }

    /// Returns the cached `RfcommTest` proxy, connecting to the protocol when none is cached.
    ///
    /// A clone of the returned proxy is stored back in the cell for later calls.
    ///
    /// # Errors
    /// Returns the error from `connect_to_protocol` if the connection attempt fails.
    fn get_rfcomm_test_proxy(&self) -> Result<rfcomm::RfcommTestProxy, Error> {
        let proxy = match self.rfcomm.take() {
            Some(proxy) => proxy,
            None => fuchsia_component::client::connect_to_protocol::<rfcomm::RfcommTestMarker>()?,
        };
        self.rfcomm.set(Some(proxy.clone()));
        Ok(proxy)
    }

    /// Advertises the SPP service and registers a search for SerialPort services,
    /// replacing any advertisement/search set up by an earlier call.
    ///
    /// A task is spawned to process the resulting profile events; it is stored in the
    /// shared state so that `clear_services` can cancel it.
    ///
    /// # Errors
    /// Returns an error if the `Profile` proxy cannot be obtained, or if setting up the
    /// advertisement or the search fails.
    pub fn advertise(&self) -> Result<(), Error> {
        // Unregister whatever was set up before.
        self.clear_services();

        let profile_proxy = self.get_profile_proxy()?;
        let inner_clone = self.inner.clone();
        // The state lock is held from here until the new task has been stored.
        let mut inner = self.inner.lock();

        let spp_service = vec![spp_service_definition()];
        let mut client = ProfileClient::advertise(
            profile_proxy,
            spp_service,
            fidl_bt::ChannelParameters::default(),
        )?;
        let _ = client.add_search(bredr::ServiceClassProfileIdentifier::SerialPort, None)?;
        let service_task = fasync::Task::spawn(async move {
            let result = Self::handle_profile_events(client, inner_clone).await;
            info!("Profile event handler ended: {:?}", result);
        });
        inner.service = Some(service_task);
        info!("Advertising and searching for SPP services");
        Ok(())
    }

    /// Converts FIDL protocol descriptors into the RFCOMM server channel they describe.
    ///
    /// # Errors
    /// Returns an error if a descriptor cannot be converted or if the protocol does not
    /// yield an RFCOMM server channel.
    fn parse_server_channel(
        protocol: &[bredr::ProtocolDescriptor],
    ) -> Result<ServerChannel, Error> {
        let protocol = protocol
            .iter()
            .map(|p| ProtocolDescriptor::try_from(p))
            .collect::<Result<Vec<_>, _>>()?;
        server_channel_from_protocol(&protocol).ok_or_else(|| format_err!("Not RFCOMM protocol"))
    }

    /// Processes events from `client` until its stream ends.
    ///
    /// Peer connections are registered as RFCOMM channels in `state`; search results are
    /// only logged. Errors yielded by the stream are logged and skipped.
    ///
    /// # Errors
    /// Returns an error (ending the handler) when an event's protocol is missing, cannot
    /// be converted, or does not describe an RFCOMM server channel.
    async fn handle_profile_events(
        mut client: ProfileClient,
        state: Arc<Mutex<RfcommState>>,
    ) -> Result<(), Error> {
        while let Some(request) = client.next().await {
            match request {
                Ok(ProfileEvent::PeerConnected { id, protocol, channel, .. }) => {
                    let server_channel = Self::parse_server_channel(&protocol)?;
                    state.lock().new_rfcomm_channel(id, server_channel, channel);
                    info!("Peer {} established RFCOMM Channel ({:?}) ", id, server_channel);
                }
                Ok(ProfileEvent::SearchResult { id, protocol, .. }) => {
                    // A missing protocol is reported as an error, like the other
                    // malformed-protocol cases, rather than panicking.
                    let protocol = protocol.ok_or_else(|| {
                        format_err!("Search result for peer {} has no protocol", id)
                    })?;
                    let server_channel = Self::parse_server_channel(&protocol)?;
                    info!("Found SPP service for {} with server channel: {:?}", id, server_channel);
                }
                Err(e) => warn!("Error in ProfileClient results: {:?}", e),
            }
        }
        Ok(())
    }

    /// Asks the `RfcommTest` protocol to disconnect peer `id` and drops the local session
    /// for that peer, if one exists.
    ///
    /// # Errors
    /// Returns an error if the proxy cannot be obtained or the disconnect request cannot
    /// be sent; in that case the local session is left untouched.
    pub fn close_session(&self, id: PeerId) -> Result<(), Error> {
        let _ = self
            .get_rfcomm_test_proxy()?
            .disconnect(&id.into())
            .map_err::<fidl::Error, _>(Into::into)?;
        // Removing the entry drops the session and, with it, its channel senders.
        let _ = self.inner.lock().active_sessions.remove(&id);
        Ok(())
    }

    /// Unregisters `server_channel` from the session with peer `id`.
    ///
    /// Succeeds even when the server channel was not registered in that session.
    ///
    /// # Errors
    /// Returns an error if there is no session with the peer.
    pub fn close_rfcomm_channel(
        &self,
        id: PeerId,
        server_channel: ServerChannel,
    ) -> Result<(), Error> {
        let mut inner = self.inner.lock();
        if let Some(session) = inner.get_active_session(&id) {
            let _ = session.close_rfcomm_channel(&server_channel);
            Ok(())
        } else {
            Err(format_err!("No RFCOMM session with peer: {:?}", id))
        }
    }

    /// Connects to peer `id` over RFCOMM `server_channel` via the `Profile` protocol and
    /// registers the resulting channel in the shared state.
    ///
    /// # Errors
    /// Returns an error if the proxy cannot be obtained, the connect call fails or is
    /// rejected, or the returned channel cannot be converted into a `Channel`.
    pub async fn outgoing_rfcomm_channel(
        &self,
        id: PeerId,
        server_channel: ServerChannel,
    ) -> Result<(), Error> {
        let channel = self
            .get_profile_proxy()?
            .connect(
                &id.into(),
                &bredr::ConnectParameters::Rfcomm(bredr::RfcommParameters {
                    channel: Some(server_channel.into()),
                    ..Default::default()
                }),
            )
            .await?
            .map_err(|e| format_err!("{:?}", e))?;
        // An unusable channel from the server is reported to the caller instead of
        // panicking.
        let channel = Channel::try_from(channel)
            .map_err(|e| format_err!("Invalid channel from Profile server: {:?}", e))?;
        self.inner.lock().new_rfcomm_channel(id, server_channel, channel);
        Ok(())
    }

    /// Sends a remote line status update with a `FramingError` status for
    /// `server_channel` to peer `id` through the `RfcommTest` protocol.
    ///
    /// Only the existence of a session with the peer is checked; the server channel
    /// itself is not looked up.
    ///
    /// # Errors
    /// Returns an error if the proxy cannot be obtained, there is no session with the
    /// peer, or the request cannot be sent.
    pub fn send_rls(&self, id: PeerId, server_channel: ServerChannel) -> Result<(), Error> {
        let rfcomm_test_proxy = self.get_rfcomm_test_proxy()?;
        let mut inner = self.inner.lock();
        if inner.get_active_session(&id).is_some() {
            let status = rfcomm::Status::FramingError;
            let _ = rfcomm_test_proxy
                .remote_line_status(&id.into(), server_channel.into(), status)
                .map_err::<fidl::Error, _>(Into::into)?;
            Ok(())
        } else {
            Err(format_err!("No RFCOMM session with peer: {:?}", id))
        }
    }

    /// Queues `data` for sending to peer `id` over `server_channel`.
    ///
    /// # Errors
    /// Returns an error if there is no session with the peer, the server channel is not
    /// registered in that session, or the data cannot be queued.
    pub fn send_user_data(
        &self,
        id: PeerId,
        server_channel: ServerChannel,
        data: Vec<u8>,
    ) -> Result<(), Error> {
        let mut inner = self.inner.lock();
        if let Some(session) = inner.get_active_session(&id) {
            session.send_user_data(server_channel, data)
        } else {
            Err(format_err!("No RFCOMM session with peer: {:?}", id))
        }
    }
}
// Unit tests for `RfcommManager`, `RfcommSession` and the per-channel processing task.
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use async_utils::PollExt;
use bt_rfcomm::profile::build_rfcomm_protocol;
use fidl::endpoints::Proxy;
use fidl_fuchsia_bluetooth::ErrorCode;
use fidl_fuchsia_bluetooth_bredr::{ProfileMarker, ProfileRequestStream};
use fidl_fuchsia_bluetooth_rfcomm_test::{RfcommTestMarker, RfcommTestRequestStream};
use fixture::fixture;
// The manager under test together with the server ends of the two protocols it talks to.
type TestFixture = (RfcommManager, ProfileRequestStream, RfcommTestRequestStream);
// Fixture: builds an `RfcommManager` from freshly created proxies and runs `test` with the
// manager and the matching request streams. `_name` is not used.
async fn setup_rfcomm_mgr<F, Fut>(_name: &str, test: F)
where
F: FnOnce(TestFixture) -> Fut,
Fut: futures::Future<Output = ()>,
{
let (profile, profile_server) = fidl::endpoints::create_proxy_and_stream::<ProfileMarker>();
let (rfcomm_test, rfcomm_test_server) =
fidl::endpoints::create_proxy_and_stream::<RfcommTestMarker>();
let rfcomm_mgr = RfcommManager::from_proxy(profile, rfcomm_test);
test((rfcomm_mgr, profile_server, rfcomm_test_server)).await
}
// Waits for the next item on `remote` and asserts that it equals `expected_data`.
// Panics if the stream has ended or yields an error.
async fn expect_data(remote: &mut Channel, expected_data: Vec<u8>) {
let read_result = remote.next().await.expect("data").expect("okay");
assert_eq!(read_result, expected_data);
}
// Reads `Profile` requests until both an Advertise and a Search request have been seen,
// in either order. Any other request (or a stream error) panics. Returns the search
// results proxy and the connection receiver proxy paired with the Advertise responder.
async fn expect_advertisement_and_search(
profile: &mut ProfileRequestStream,
) -> (
bredr::SearchResultsProxy,
(bredr::ConnectionReceiverProxy, bredr::ProfileAdvertiseResponder),
) {
let mut search_request = None;
let mut advertisement = None;
while let Some(req) = profile.next().await {
match req {
Ok(bredr::ProfileRequest::Advertise { payload, responder, .. }) => {
let connect_proxy = payload.receiver.unwrap().into_proxy();
advertisement = Some((connect_proxy, responder));
}
Ok(bredr::ProfileRequest::Search { payload, .. }) => {
search_request = Some(payload.results.unwrap().into_proxy())
}
x => panic!("Expected one Advertise and Search but got: {:?}", x),
}
// Stop as soon as both requests have arrived.
if search_request.is_some() && advertisement.is_some() {
break;
}
}
(search_request.expect("just set"), advertisement.expect("just set"))
}
// Opens an outgoing RFCOMM channel, then checks that user data reaches the peer end and
// that an RLS update is forwarded to the `RfcommTest` server.
#[fixture(setup_rfcomm_mgr)]
#[fuchsia::test]
async fn initiate_rfcomm_channel_to_peer_is_ok(
(rfcomm_mgr, mut profile_server, mut rfcomm_test_server): TestFixture,
) {
// Kept alive for the rest of the test so the advertisement/search endpoints stay open.
let _profile_requests = {
assert_matches!(rfcomm_mgr.advertise(), Ok(_));
expect_advertisement_and_search(&mut profile_server).await
};
let remote_id = PeerId(123);
let random_channel_number = ServerChannel::try_from(5).unwrap();
// Drive the manager's connect request and the fake Profile server concurrently; the
// server answers the Connect request with one end of a new channel and keeps the other.
let mut peer_channel = {
let ch_fut =
Box::pin(rfcomm_mgr.outgoing_rfcomm_channel(remote_id, random_channel_number));
let profile_fut = async {
match profile_server.next().await {
Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => {
let (left, right) = Channel::create();
let _ = responder
.send(left.try_into().map_err(|_e| ErrorCode::Failed))
.unwrap();
right
}
x => panic!("Expected connect request, got: {:?}", x),
}
};
match futures::future::join(ch_fut, profile_fut).await {
(Ok(_), channel) => channel,
x => panic!("Expected both futures to complete: {:?}", x),
}
};
// Data queued through the manager should arrive on the peer's end of the channel.
let user_data = vec![0x98, 0x97, 0x96, 0x95];
{
assert_matches!(
rfcomm_mgr.send_user_data(remote_id, random_channel_number, user_data.clone()),
Ok(_)
);
expect_data(&mut peer_channel, user_data).await;
}
// The peer writes toward the manager; only the success of the write itself is checked.
let buf = vec![0x99, 0x11, 0x44];
assert_eq!(peer_channel.write(&buf), Ok(3));
// An RLS update for the established session is forwarded to the RfcommTest server.
assert_matches!(rfcomm_mgr.send_rls(remote_id, random_channel_number), Ok(_));
match rfcomm_test_server.next().await.expect("valid fidl request") {
Ok(rfcomm::RfcommTestRequest::RemoteLineStatus { id, channel_number, .. }) => {
assert_eq!(id, remote_id.into());
assert_eq!(channel_number, u8::from(random_channel_number));
}
x => panic!("Expected RLS request but got: {:?}", x),
}
}
// Simulates a peer opening an RFCOMM channel by calling `connected` on the connection
// receiver handed out with the advertisement. Only the success of that call is asserted.
#[fixture(setup_rfcomm_mgr)]
#[fuchsia::test]
async fn peer_initiating_rfcomm_channel_is_delivered(
(rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
) {
let (_search_proxy, (connect_proxy, _adv_fut)) = {
assert_matches!(rfcomm_mgr.advertise(), Ok(_));
expect_advertisement_and_search(&mut profile_server).await
};
let remote_id = PeerId(8978);
let random_channel_number = ServerChannel::try_from(7).unwrap();
let (_peer_channel, local_channel) = Channel::create();
// Protocol descriptor list describing RFCOMM on the chosen server channel.
let protocol: Vec<bredr::ProtocolDescriptor> =
build_rfcomm_protocol(random_channel_number).iter().map(Into::into).collect();
assert_matches!(
connect_proxy.connected(
&remote_id.into(),
local_channel.try_into().unwrap(),
&protocol,
),
Ok(_)
);
}
// Closing a session sends a Disconnect request for that peer to the `RfcommTest` server,
// even though no session with the peer was established in this test.
#[fixture(setup_rfcomm_mgr)]
#[fuchsia::test]
async fn disconnect_session_received_by_rfcomm_test(
(rfcomm_mgr, mut profile_server, mut rfcomm_test_server): TestFixture,
) {
let _profile_requests = {
assert_matches!(rfcomm_mgr.advertise(), Ok(_));
expect_advertisement_and_search(&mut profile_server).await
};
let remote = PeerId(834);
assert_matches!(rfcomm_mgr.close_session(remote), Ok(_));
match rfcomm_test_server.next().await.expect("valid fidl request") {
Ok(rfcomm::RfcommTestRequest::Disconnect { id, .. }) if id == remote.into() => {}
x => panic!("Expected Disconnect request but got: {:?}", x),
}
}
// Sending an RLS update for a peer without a session is rejected with an error.
#[fixture(setup_rfcomm_mgr)]
#[fuchsia::test]
async fn rls_update_before_established_channel_is_error(
(rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
) {
let _profile_requests = {
assert_matches!(rfcomm_mgr.advertise(), Ok(_));
expect_advertisement_and_search(&mut profile_server).await
};
let remote = PeerId(222);
let random_channel_number = ServerChannel::try_from(9).unwrap();
assert_matches!(rfcomm_mgr.send_rls(remote, random_channel_number), Err(_));
}
// After `clear_services`, a new advertisement and search can be registered.
#[fixture(setup_rfcomm_mgr)]
#[fuchsia::test]
async fn clear_services_unregisters_profile_requests(
(rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
) {
let (search_proxy, (connect_proxy, _advertise_fut)) = {
assert_matches!(rfcomm_mgr.advertise(), Ok(_));
expect_advertisement_and_search(&mut profile_server).await
};
// Both endpoints are open while the services are registered.
assert!(!search_proxy.is_closed());
assert!(!connect_proxy.is_closed());
// Note: the test does not assert that the proxies are closed after clearing.
rfcomm_mgr.clear_services();
// Registering again succeeds and produces a fresh Advertise + Search pair.
let _profile = {
assert_matches!(rfcomm_mgr.advertise(), Ok(_));
expect_advertisement_and_search(&mut profile_server).await
};
}
// Exercises a single `RfcommSession`: registering a channel, sending data (including to an
// unregistered channel), and closing the channel.
#[fuchsia::test]
async fn rfcomm_session_task() {
let id = PeerId(999);
let mut session = RfcommSession::new(id);
let random_channel_number = ServerChannel::try_from(4).unwrap();
let (local, mut remote) = Channel::create();
session.new_rfcomm_channel(random_channel_number, local);
assert!(session.is_active(&random_channel_number));
let data = vec![0x00, 0x02, 0x04, 0x06, 0x08, 0x10];
// Sending on a server channel that was never registered is an error.
let unregistered = ServerChannel::try_from(9).unwrap();
assert_matches!(session.send_user_data(unregistered, data.clone()), Err(_));
assert_matches!(session.send_user_data(random_channel_number, data.clone()), Ok(_));
expect_data(&mut remote, data).await;
// Two back-to-back sends are received in the order they were queued.
let data1 = vec![0x09];
let data2 = vec![0x11];
assert_matches!(session.send_user_data(random_channel_number, data1.clone()), Ok(_));
assert_matches!(session.send_user_data(random_channel_number, data2.clone()), Ok(_));
expect_data(&mut remote, data1).await;
expect_data(&mut remote, data2).await;
// Closing the channel is observed as closure by the remote end; closing it a second
// time reports that nothing was registered.
assert!(session.close_rfcomm_channel(&random_channel_number));
assert_matches!(remote.closed().await, Ok(_));
assert!(!session.close_rfcomm_channel(&random_channel_number));
}
// Registering a second channel under the same server channel number replaces the first:
// the first channel's remote end observes closure and data flows over the second.
#[fuchsia::test]
async fn second_channel_overwrites_first_in_rfcomm_session() {
let id = PeerId(78);
let mut session = RfcommSession::new(id);
let random_channel_number = ServerChannel::try_from(10).unwrap();
let (local1, remote1) = Channel::create();
session.new_rfcomm_channel(random_channel_number, local1);
assert!(session.is_active(&random_channel_number));
let (local2, mut remote2) = Channel::create();
session.new_rfcomm_channel(random_channel_number, local2);
assert!(session.is_active(&random_channel_number));
assert_matches!(remote1.closed().await, Ok(_));
let data = vec![0x00, 0x02, 0x04, 0x06, 0x08, 0x10];
assert_matches!(session.send_user_data(random_channel_number, data.clone()), Ok(_));
expect_data(&mut remote2, data).await;
}
// Dropping the write-request sender makes the channel processing task finish.
#[fuchsia::test]
fn closing_sender_closes_rfcomm_channel_task() {
let mut exec = fasync::TestExecutor::new();
let random_channel_number = ServerChannel::try_from(10).unwrap();
let (local, _remote) = Channel::create();
let (_sender, receiver) = mpsc::channel(0);
let mut channel_task =
Box::pin(RfcommSession::rfcomm_channel_task(random_channel_number, local, receiver));
// With both the sender and the remote end alive, the task has nothing to do yet.
exec.run_until_stalled(&mut channel_task).expect_pending("sender still active");
drop(_sender);
let _ = exec.run_until_stalled(&mut channel_task).expect("task should complete");
}
// Dropping the remote end of the channel makes the channel processing task finish.
#[fuchsia::test]
fn closing_channel_closes_rfcomm_channel_task() {
let mut exec = fasync::TestExecutor::new();
let random_channel_number = ServerChannel::try_from(10).unwrap();
let (local, _remote) = Channel::create();
let (_sender, receiver) = mpsc::channel(0);
let mut channel_task =
Box::pin(RfcommSession::rfcomm_channel_task(random_channel_number, local, receiver));
// With both the sender and the remote end alive, the task has nothing to do yet.
exec.run_until_stalled(&mut channel_task).expect_pending("sender still active");
drop(_remote);
let _ = exec.run_until_stalled(&mut channel_task).expect("task should complete");
}
}