use fuchsia_bluetooth::types::Channel;
use tracing::{trace, warn};
pub use crate::client::get::GetOperation;
pub use crate::client::put::PutOperation;
use crate::error::Error;
use crate::header::{
ConnectionIdentifier, Header, HeaderIdentifier, HeaderSet, SingleResponseMode,
};
use crate::operation::{OpCode, RequestPacket, ResponseCode, ResponsePacket, SetPathFlags};
pub use crate::transport::TransportType;
use crate::transport::{max_packet_size_from_transport, ObexTransportManager};
use fuchsia_sync::Mutex;
mod put;
mod get;
/// Shared behaviour for operations that can negotiate Single Response Mode (SRM).
///
/// Implementors own the SRM state and expose it through `get_srm` / `set_srm`; the provided
/// methods use those accessors to request SRM and to reconcile the local state with what the
/// peer answered.
pub(crate) trait SrmOperation {
    /// Opcode of the implementing operation. Within this trait it is only used to tag log events.
    const OPERATION_TYPE: OpCode;

    /// Returns the SRM mode currently recorded for the operation.
    fn get_srm(&self) -> SingleResponseMode;

    /// Records `mode` as the operation's SRM mode.
    fn set_srm(&mut self, mode: SingleResponseMode);

    /// Asks `headers` to carry an SRM request seeded with the current mode, then stores the
    /// mode that `HeaderSet::try_add_srm` reports back.
    ///
    /// Any error from `try_add_srm` is propagated and the stored mode is left untouched.
    fn try_enable_srm(&mut self, headers: &mut HeaderSet) -> Result<(), Error> {
        let current = self.get_srm();
        let requested_srm = headers.try_add_srm(current)?;
        self.set_srm(requested_srm);
        trace!(operation = ?Self::OPERATION_TYPE, "Requesting SRM {requested_srm:?}");
        Ok(())
    }

    /// Reconciles the locally recorded SRM mode with the peer's response `headers`.
    ///
    /// A response without an SRM header is treated as `Disable`. The local mode is only ever
    /// downgraded here (Enable -> Disable); a peer `Enable` while we are disabled is logged and
    /// otherwise ignored.
    fn check_response_for_srm(&mut self, headers: &HeaderSet) {
        let srm_response = match headers.get(&HeaderIdentifier::SingleResponseMode) {
            Some(Header::SingleResponseMode(srm)) => *srm,
            _ => {
                trace!(operation = ?Self::OPERATION_TYPE, "Response doesn't contain SRM header");
                SingleResponseMode::Disable
            }
        };
        trace!(
            current_status = ?self.get_srm(),
            operation = ?Self::OPERATION_TYPE,
            "Peer responded with {srm_response:?}"
        );

        // Tuple is (local mode, peer mode).
        match (self.get_srm(), srm_response) {
            (SingleResponseMode::Disable, SingleResponseMode::Enable) => {
                // The peer wants SRM but we never asked for it, so nothing changes locally.
                warn!("SRM stays disabled");
            }
            (SingleResponseMode::Enable, SingleResponseMode::Disable) => {
                trace!("SRM is disabled");
                self.set_srm(SingleResponseMode::Disable);
            }
            // Both sides already agree.
            _ => {}
        }
        trace!(status = ?self.get_srm(), operation = ?Self::OPERATION_TYPE, "SRM status");
    }
}
/// Lifecycle state of the OBEX session tracked by `ObexClient`.
#[derive(Clone, Copy, Debug, PartialEq, Default)]
enum ConnectionStatus {
/// Starting state (the `Default` value) that `ObexClient::new` assigns.
#[default]
Initialized,
/// The session is connected. `id` is the Connection Identifier associated with the session,
/// or `None` when there is none.
Connected { id: Option<ConnectionIdentifier> },
/// Assigned by `ObexClient::disconnect` after a DISCONNECT response has been received.
Disconnected,
}
impl ConnectionStatus {
    /// Test-only shorthand for a connected session that has no Connection Identifier.
    #[cfg(test)]
    fn connected_no_id() -> Self {
        let id = None;
        Self::Connected { id }
    }
}
/// Client side of an OBEX session running over a single `Channel`.
///
/// Mutable session state is kept behind `Mutex`es, which is what lets the methods take `&self`.
#[derive(Debug)]
pub struct ObexClient {
/// Current session state; see `ConnectionStatus`.
connected: Mutex<ConnectionStatus>,
/// Max packet size for the session. Seeded from the transport in `new` and overwritten with
/// the value the peer reports in its CONNECT response.
max_packet_size: Mutex<u16>,
/// Hands out the per-operation transport handles used to send requests and receive responses.
transport: ObexTransportManager,
}
impl ObexClient {
    /// Creates a client for a new OBEX session over `channel`.
    ///
    /// The initial max packet size is derived from `channel.max_tx_size()`; the session starts
    /// in the `Initialized` state and must be `connect`ed before other operations are allowed.
    pub fn new(channel: Channel, type_: TransportType) -> Self {
        let max_packet_size = max_packet_size_from_transport(channel.max_tx_size());
        let transport = ObexTransportManager::new(channel, type_);
        Self {
            connected: Mutex::new(ConnectionStatus::default()),
            max_packet_size: Mutex::new(max_packet_size),
            transport,
        }
    }

    /// Returns `true` while the underlying transport has not been reported closed.
    ///
    /// This is independent of the OBEX-level session state; see `is_connected`.
    pub fn is_transport_connected(&self) -> bool {
        !self.transport.is_transport_closed()
    }

    /// Replaces the stored session state with `status`.
    fn set_connection_status(&self, status: ConnectionStatus) {
        *self.connected.lock() = status;
    }

    /// Returns a copy of the current session state.
    fn connection_status(&self) -> ConnectionStatus {
        *self.connected.lock()
    }

    /// Returns `true` if a CONNECT has been accepted and the session was not disconnected since.
    pub fn is_connected(&self) -> bool {
        matches!(self.connection_status(), ConnectionStatus::Connected { .. })
    }

    /// Returns the Connection Identifier of the session, if the session is connected and the
    /// peer provided one.
    pub fn connection_id(&self) -> Option<ConnectionIdentifier> {
        match self.connection_status() {
            ConnectionStatus::Connected { id } => id,
            ConnectionStatus::Initialized | ConnectionStatus::Disconnected => None,
        }
    }

    /// Stores the max packet size advertised by the peer.
    fn set_max_packet_size(&self, peer_max_packet_size: u16) {
        *self.max_packet_size.lock() = peer_max_packet_size;
        trace!("Max packet size set to {peer_max_packet_size}");
    }

    /// Returns the max packet size currently in effect for the session.
    fn max_packet_size(&self) -> u16 {
        *self.max_packet_size.lock()
    }

    /// Validates the peer's CONNECT `response` and updates the session state from it.
    ///
    /// On success the peer's max packet size is stored, the session becomes `Connected`
    /// (with the Connection Identifier if the response carries one, otherwise without) and the
    /// response headers are returned.
    ///
    /// # Errors
    ///
    /// Propagates the error from `expect_code` when the response code is not `Ok`, and returns
    /// a response error when the CONNECT data does not have the expected length. The session
    /// state is left unchanged in both cases.
    fn handle_connect_response(&self, response: ResponsePacket) -> Result<HeaderSet, Error> {
        let request = OpCode::Connect;
        let response = response.expect_code(request, ResponseCode::Ok)?;
        if response.data().len() != request.response_data_length() {
            return Err(Error::response(request, "Invalid CONNECT data"));
        }
        // Bytes 2..4 of the CONNECT data hold the peer's max packet size, big-endian. The length
        // check above is what makes the slice index valid.
        let peer_max_packet_size = u16::from_be_bytes(
            response.data()[2..4].try_into().expect("a two-byte slice always converts to [u8; 2]"),
        );
        self.set_max_packet_size(peer_max_packet_size);

        let headers: HeaderSet = response.into();
        // The Connection Identifier is optional. The session is connected whether or not the
        // peer assigned one; otherwise a plain (ID-less) CONNECT would leave the client stuck in
        // `Initialized` and every later operation would be rejected as "not connected".
        let id = match headers.get(&HeaderIdentifier::ConnectionId) {
            Some(Header::ConnectionId(id)) => {
                trace!(id = ?id, "Found Connection Identifier in CONNECT response");
                Some(*id)
            }
            _ => None,
        };
        self.set_connection_status(ConnectionStatus::Connected { id });
        Ok(headers)
    }

    /// Sends a CONNECT request carrying `headers` and waits for the peer's response.
    ///
    /// Returns the response headers on success.
    ///
    /// # Errors
    ///
    /// Returns an operation error if the session is already connected, and propagates any
    /// error from reserving the transport, sending, receiving, or validating the response.
    pub async fn connect(&self, headers: HeaderSet) -> Result<HeaderSet, Error> {
        if self.is_connected() {
            return Err(Error::operation(OpCode::Connect, "already connected"));
        }
        // The transport handle is scoped to this block so it is dropped as soon as the response
        // has been received.
        let response = {
            let request = RequestPacket::new_connect(self.max_packet_size(), headers);
            let mut transport = self.transport.try_new_operation()?;
            trace!("Making outgoing CONNECT request: {request:?}");
            transport.send(request)?;
            trace!("Successfully made CONNECT request");
            transport.receive_response(OpCode::Connect).await?
        };
        self.handle_connect_response(response)
    }

    /// Sends a DISCONNECT request carrying `headers` and consumes the client.
    ///
    /// The session's Connection Identifier (if any) is added to `headers` first. Returns the
    /// response headers when the peer answers with `Ok`.
    ///
    /// # Errors
    ///
    /// Returns an operation error if the session is not connected, and propagates any error
    /// from building, sending or receiving. A non-`Ok` response code is reported as an error,
    /// but the session is marked `Disconnected` as soon as any response has arrived.
    pub async fn disconnect(self, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
        let opcode = OpCode::Disconnect;
        if !self.is_connected() {
            return Err(Error::operation(opcode, "session not connected"));
        }
        headers.try_add_connection_id(&self.connection_id())?;
        let response = {
            let request = RequestPacket::new_disconnect(headers);
            let mut transport = self.transport.try_new_operation()?;
            trace!("Making outgoing DISCONNECT request: {request:?}");
            transport.send(request)?;
            trace!("Successfully made DISCONNECT request");
            transport.receive_response(opcode).await?
        };
        self.set_connection_status(ConnectionStatus::Disconnected);
        response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
    }

    /// Starts a GET operation on the connected session.
    ///
    /// The returned operation is seeded with a header set to which the session's Connection
    /// Identifier (if any) has been added, and holds a freshly reserved transport handle.
    ///
    /// # Errors
    ///
    /// Returns an operation error if the session is not connected, and propagates errors from
    /// adding the identifier or reserving the transport.
    pub fn get(&self) -> Result<GetOperation<'_>, Error> {
        if !self.is_connected() {
            return Err(Error::operation(OpCode::Get, "session not connected"));
        }
        let mut headers = HeaderSet::new();
        headers.try_add_connection_id(&self.connection_id())?;
        let transport = self.transport.try_new_operation()?;
        Ok(GetOperation::new(headers, transport))
    }

    /// Starts a PUT operation on the connected session.
    ///
    /// The returned operation is seeded with a header set to which the session's Connection
    /// Identifier (if any) has been added, and holds a freshly reserved transport handle.
    ///
    /// # Errors
    ///
    /// Returns an operation error if the session is not connected, and propagates errors from
    /// adding the identifier or reserving the transport.
    pub fn put(&self) -> Result<PutOperation<'_>, Error> {
        if !self.is_connected() {
            return Err(Error::operation(OpCode::Put, "session not connected"));
        }
        let mut headers = HeaderSet::new();
        headers.try_add_connection_id(&self.connection_id())?;
        let transport = self.transport.try_new_operation()?;
        Ok(PutOperation::new(headers, transport))
    }

    /// Sends a SETPATH request with the given `flags` and `headers` and waits for the response.
    ///
    /// The session's Connection Identifier (if any) is added to `headers` first. Returns the
    /// response headers when the peer answers with `Ok`.
    ///
    /// # Errors
    ///
    /// Returns an operation error if the session is not connected, a not-implemented error if
    /// the peer answers `BadRequest` or `Forbidden`, and propagates any other failure from
    /// building, sending, receiving or validating the exchange.
    pub async fn set_path(
        &self,
        flags: SetPathFlags,
        mut headers: HeaderSet,
    ) -> Result<HeaderSet, Error> {
        let opcode = OpCode::SetPath;
        if !self.is_connected() {
            return Err(Error::operation(opcode, "session not connected"));
        }
        headers.try_add_connection_id(&self.connection_id())?;
        // Build the request before reserving the transport so an invalid request does not
        // occupy it.
        let request = RequestPacket::new_set_path(flags, headers)?;
        let response = {
            let mut transport = self.transport.try_new_operation()?;
            trace!("Making outgoing SETPATH request: {request:?}");
            transport.send(request)?;
            trace!("Successfully made SETPATH request");
            transport.receive_response(opcode).await?
        };
        // These two codes are surfaced as "not implemented" rather than a generic rejection.
        if *response.code() == ResponseCode::BadRequest
            || *response.code() == ResponseCode::Forbidden
        {
            return Err(Error::not_implemented(opcode));
        }
        response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use async_utils::PollExt;
    use fuchsia_async as fasync;
    use std::pin::pin;

    use crate::transport::test_utils::{expect_code, expect_request_and_reply};

    /// Checks the three regimes of `max_packet_size_from_transport`: pass-through, raised to
    /// the 255 floor, and saturated at `u16::MAX`.
    #[fuchsia::test]
    fn max_packet_size_calculation() {
        let transport_max = 1000;
        assert_eq!(max_packet_size_from_transport(transport_max), 1000);
        let transport_max_small = 40;
        assert_eq!(max_packet_size_from_transport(transport_max_small), 255);
        let transport_max_large = 700000;
        assert_eq!(max_packet_size_from_transport(transport_max_large), u16::MAX);
    }

    /// Builds a client over an in-memory channel, forces its session state to `connected`, and
    /// returns it together with the remote end of the channel.
    fn new_obex_client(connected: ConnectionStatus) -> (ObexClient, Channel) {
        let (local, remote) = Channel::create();
        let client = ObexClient::new(local, TransportType::Rfcomm);
        client.set_connection_status(connected);
        (client, remote)
    }

    #[fuchsia::test]
    fn client_connect_success() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::default());
        assert!(!client.is_connected());
        assert_eq!(client.max_packet_size(), Channel::DEFAULT_MAX_TX as u16);
        assert_eq!(client.connection_id(), None);
        {
            let connect_fut = client.connect(HeaderSet::new());
            let mut connect_fut = pin!(connect_fut);
            exec.run_until_stalled(&mut connect_fut).expect_pending("waiting for response");

            let response_headers =
                HeaderSet::from_headers(vec![Header::ConnectionId(ConnectionIdentifier(1))])
                    .unwrap();
            // Four bytes of CONNECT data; the last two are the peer's max packet size (0xffff).
            let response = ResponsePacket::new(
                ResponseCode::Ok,
                vec![0x10, 0x00, 0xff, 0xff],
                response_headers.clone(),
            );
            expect_request_and_reply(
                &mut exec,
                &mut remote,
                expect_code(OpCode::Connect),
                response,
            );
            let connect_result = exec
                .run_until_stalled(&mut connect_fut)
                .expect("received response")
                .expect("response is ok");
            assert_eq!(connect_result, response_headers);
        }
        // The session state must reflect what the peer sent.
        assert!(client.is_connected());
        assert_eq!(client.max_packet_size(), 0xffff);
        assert_eq!(client.connection_id(), Some(ConnectionIdentifier(1)));
    }

    #[fuchsia::test]
    async fn multiple_connect_is_error() {
        let (client, _remote) = new_obex_client(ConnectionStatus::connected_no_id());
        let result = client.connect(HeaderSet::new()).await;
        assert_matches!(result, Err(Error::OperationError { .. }));
    }

    #[fuchsia::test]
    fn get_before_connect_is_error() {
        let _exec = fasync::TestExecutor::new();
        let (client, _remote) = new_obex_client(ConnectionStatus::default());
        let get_result = client.get();
        assert_matches!(get_result, Err(Error::OperationError { .. }));
    }

    #[fuchsia::test]
    fn sequential_get_operations_is_ok() {
        let _exec = fasync::TestExecutor::new();
        let (client, _remote) = new_obex_client(ConnectionStatus::connected_no_id());
        let get_operation1 = client.get().expect("can initialize first get");
        // Finish the first operation before starting the next one.
        drop(get_operation1);
        let _get_operation2 = client.get().expect("can initialize second get");
    }

    #[fuchsia::test]
    fn disconnect_success() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
        let headers = HeaderSet::from_header(Header::Description("finished".into()));
        let disconnect_fut = client.disconnect(headers);
        let mut disconnect_fut = pin!(disconnect_fut);
        exec.run_until_stalled(&mut disconnect_fut).expect_pending("waiting for response");

        let response_headers = HeaderSet::from_header(Header::Description("accepted".into()));
        let response = ResponsePacket::new(ResponseCode::Ok, vec![], response_headers.clone());
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Disconnect), response);
        let disconnect_result = exec
            .run_until_stalled(&mut disconnect_fut)
            .expect("received response")
            .expect("response is ok");
        assert_eq!(disconnect_result, response_headers);
    }

    #[fuchsia::test]
    async fn disconnect_before_connect_error() {
        let (client, _remote) = new_obex_client(ConnectionStatus::default());
        let headers = HeaderSet::from_header(Header::Description("finished".into()));
        let disconnect_result = client.disconnect(headers).await;
        assert_matches!(disconnect_result, Err(Error::OperationError { .. }))
    }

    #[fuchsia::test]
    fn disconnect_error_response_error() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
        let disconnect_fut = client.disconnect(HeaderSet::new());
        let mut disconnect_fut = pin!(disconnect_fut);
        exec.run_until_stalled(&mut disconnect_fut).expect_pending("waiting for response");

        let response_headers = HeaderSet::from_header(Header::Description("accepted".into()));
        let response = ResponsePacket::new(
            ResponseCode::InternalServerError,
            vec![],
            response_headers.clone(),
        );
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Disconnect), response);
        let disconnect_result =
            exec.run_until_stalled(&mut disconnect_fut).expect("received response");
        assert_matches!(disconnect_result, Err(Error::PeerRejected { .. }));
    }

    #[fuchsia::test]
    fn setpath_success() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
        let headers = HeaderSet::from_header(Header::name("myfolder"));
        let setpath_fut = client.set_path(SetPathFlags::empty(), headers);
        let mut setpath_fut = pin!(setpath_fut);
        exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");

        let response_headers =
            HeaderSet::from_header(Header::Description("updated current folder".into()));
        let response = ResponsePacket::new(ResponseCode::Ok, vec![], response_headers.clone());
        let expectation = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::SetPath);
            // With no flags set the two bytes of SETPATH request data are both zero.
            assert_eq!(request.data(), &[0, 0]);
        };
        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
        let setpath_result = exec
            .run_until_stalled(&mut setpath_fut)
            .expect("received response")
            .expect("response is ok");
        assert_eq!(setpath_result, response_headers);
    }

    #[fuchsia::test]
    fn setpath_error_response_is_error() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());

        // A BadRequest response is surfaced as "not implemented".
        {
            let setpath_fut = client.set_path(SetPathFlags::BACKUP, HeaderSet::new());
            let mut setpath_fut = pin!(setpath_fut);
            exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");
            let response_headers =
                HeaderSet::from_header(Header::Description("not implemented".into()));
            let response = ResponsePacket::new(ResponseCode::BadRequest, vec![], response_headers);
            expect_request_and_reply(
                &mut exec,
                &mut remote,
                expect_code(OpCode::SetPath),
                response,
            );
            let setpath_result =
                exec.run_until_stalled(&mut setpath_fut).expect("received response");
            assert_matches!(setpath_result, Err(Error::NotImplemented { .. }));
        }

        // Any other error code is surfaced as a peer rejection.
        let headers = HeaderSet::from_header(Header::name("file"));
        let setpath_fut = client.set_path(SetPathFlags::DONT_CREATE, headers);
        let mut setpath_fut = pin!(setpath_fut);
        exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");
        let response_headers =
            HeaderSet::from_header(Header::Description("not implemented".into()));
        let response =
            ResponsePacket::new(ResponseCode::InternalServerError, vec![], response_headers);
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::SetPath), response);
        let setpath_result = exec.run_until_stalled(&mut setpath_fut).expect("received response");
        assert_matches!(setpath_result, Err(Error::PeerRejected { .. }));
    }
}