use fuchsia_bluetooth::types::Channel;
use futures::stream::{FusedStream, TryStreamExt};
use packet_encoding::Encodable;
use std::cell::{RefCell, RefMut};
use tracing::{info, trace};
use crate::error::{Error, PacketError};
use crate::operation::{OpCode, ResponsePacket, MAX_PACKET_SIZE, MIN_MAX_PACKET_SIZE};
pub fn max_packet_size_from_transport(transport_max: usize) -> u16 {
let bounded = transport_max.clamp(MIN_MAX_PACKET_SIZE, MAX_PACKET_SIZE);
bounded.try_into().expect("bounded by u16 max")
}
/// The kind of transport that a connection runs over.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum TransportType {
    /// The connection runs over L2CAP.
    L2cap,
    /// The connection runs over RFCOMM.
    Rfcomm,
}

impl TransportType {
    /// Reports whether SRM is supported on this type of transport.
    ///
    /// Returns `true` for [`TransportType::L2cap`] and `false` for [`TransportType::Rfcomm`].
    // NOTE(review): "SRM" presumably refers to the OBEX Single Response Mode feature - confirm
    // against the specification before relying on this expansion.
    pub fn srm_supported(&self) -> bool {
        // Deliberately exhaustive so that adding a variant forces a decision here.
        match *self {
            Self::Rfcomm => false,
            Self::L2cap => true,
        }
    }
}
/// A handle to the transport `Channel` for the duration of one operation.
///
/// The handle owns a `RefMut` borrow of the channel. While the handle is alive, no other
/// mutable borrow of the same channel can be taken; dropping the handle releases the borrow.
#[derive(Debug)]
pub struct ObexTransport<'a> {
    /// Exclusive borrow of the channel used to talk to the peer.
    channel: RefMut<'a, Channel>,
    /// The type of transport that `channel` runs over.
    type_: TransportType,
}

impl<'a> ObexTransport<'a> {
    /// Creates a transport handle from an exclusive borrow of `channel` and its `type_`.
    pub fn new(channel: RefMut<'a, Channel>, type_: TransportType) -> Self {
        Self { channel, type_ }
    }

    /// Reports whether SRM is supported, as determined by the transport type.
    pub fn srm_supported(&self) -> bool {
        self.type_.srm_supported()
    }

    /// Encodes `data` and writes the encoded bytes to the channel.
    ///
    /// # Errors
    ///
    /// Returns an `Error` if `data` cannot be encoded or if the write to the channel fails.
    pub fn send(&self, data: impl Encodable<Error = PacketError>) -> Result<(), Error> {
        // The packet reports exactly how many bytes it needs, so allocate that up front.
        let mut encoded = vec![0; data.encoded_len()];
        data.encode(encoded.as_mut_slice())?;
        // The value returned by a successful write is intentionally not inspected.
        self.channel.write(&encoded)?;
        Ok(())
    }

    /// Waits for the next item on the channel and decodes it as a response to `code`.
    ///
    /// # Errors
    ///
    /// Returns `Error::PeerDisconnected` if the channel stream has already terminated or
    /// yields no further items. Errors from reading the channel or from decoding the received
    /// bytes are propagated as an `Error`.
    pub async fn receive_response(&mut self, code: OpCode) -> Result<ResponsePacket, Error> {
        // Never poll a stream that has already reported termination.
        if self.channel.is_terminated() {
            return Err(Error::PeerDisconnected);
        }

        let Some(raw_data) = self.channel.try_next().await? else {
            info!("OBEX transport closed");
            return Err(Error::PeerDisconnected);
        };

        let decoded = ResponsePacket::decode(&raw_data[..], code)?;
        trace!("Received response: {decoded:?}");
        Ok(decoded)
    }
}
#[derive(Debug)]
pub struct ObexTransportManager {
channel: RefCell<Channel>,
type_: TransportType,
}
impl ObexTransportManager {
pub fn new(channel: Channel, type_: TransportType) -> Self {
Self { channel: RefCell::new(channel), type_ }
}
fn new_permit(&self) -> Result<RefMut<'_, Channel>, Error> {
self.channel.try_borrow_mut().map_err(|_| Error::OperationInProgress)
}
pub fn is_transport_closed(&self) -> bool {
self.channel.try_borrow().map_or(false, |chan| chan.is_closed())
}
pub fn try_new_operation(&self) -> Result<ObexTransport<'_>, Error> {
let channel = self.new_permit()?;
Ok(ObexTransport::new(channel, self.type_))
}
}
/// Helpers for unit tests that need a transport manager and a remote end of the channel to
/// drive.
#[cfg(test)]
pub(crate) mod test_utils {
    use super::*;
    use async_test_helpers::expect_stream_item;
    use fuchsia_async as fasync;
    use packet_encoding::Decodable;

    use crate::operation::RequestPacket;

    /// Builds an `ObexTransportManager` around one end of a freshly created channel pair and
    /// returns it together with the other (remote) end.
    ///
    /// `srm_supported` selects the transport type: `true` yields `TransportType::L2cap`,
    /// `false` yields `TransportType::Rfcomm`.
    pub(crate) fn new_manager(srm_supported: bool) -> (ObexTransportManager, Channel) {
        let (local, remote) = Channel::create();
        let type_ = if srm_supported { TransportType::L2cap } else { TransportType::Rfcomm };
        (ObexTransportManager::new(local, type_), remote)
    }

    /// A minimal packet made of a single byte, for tests that need something encodable.
    #[derive(Clone)]
    pub struct TestPacket(pub u8);

    impl Encodable for TestPacket {
        type Error = PacketError;

        /// A `TestPacket` always encodes to exactly one byte.
        fn encoded_len(&self) -> usize {
            1
        }

        /// Writes the wrapped byte to the first position of `buf`.
        ///
        /// Panics if `buf` is empty.
        fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error> {
            buf[0] = self.0;
            Ok(())
        }
    }

    impl Decodable for TestPacket {
        type Error = PacketError;

        /// Builds a `TestPacket` from the first byte of `buf`.
        ///
        /// Panics if `buf` is empty.
        fn decode(buf: &[u8]) -> Result<Self, Self::Error> {
            Ok(TestPacket(buf[0]))
        }
    }

    /// Encodes `response` and writes it to `channel`.
    ///
    /// Panics if the response cannot be encoded or the write fails.
    #[track_caller]
    pub fn reply(channel: &mut Channel, response: ResponsePacket) {
        let mut response_buf = vec![0; response.encoded_len()];
        response.encode(&mut response_buf[..]).expect("can encode response");
        let _ = channel.write(&response_buf[..]).expect("write to channel success");
    }

    /// Encodes an arbitrary `packet` and writes it to `channel`.
    ///
    /// Panics if the packet cannot be encoded or the write fails.
    #[track_caller]
    pub fn send_packet<T>(channel: &mut Channel, packet: T)
    where
        T: Encodable,
        <T as Encodable>::Error: std::fmt::Debug,
    {
        let mut buf = vec![0; packet.encoded_len()];
        packet.encode(&mut buf[..]).expect("can encode packet");
        let _ = channel.write(&buf[..]).expect("write to channel success");
    }

    /// Expects an item to be available on `channel`, decodes it as a `RequestPacket` and hands
    /// it to `expectation` for validation.
    ///
    /// Panics if no item is available or the item cannot be decoded as a request.
    #[track_caller]
    pub fn expect_request<F>(exec: &mut fasync::TestExecutor, channel: &mut Channel, expectation: F)
    where
        F: FnOnce(RequestPacket),
    {
        let request_raw = expect_stream_item(exec, channel).expect("request");
        let request = RequestPacket::decode(&request_raw[..]).expect("can decode request");
        expectation(request);
    }

    /// Expects an item to be available on `channel`, decodes it as a `ResponsePacket` for
    /// `opcode` and hands it to `expectation` for validation.
    ///
    /// Panics if no item is available or the item cannot be decoded as a response.
    #[track_caller]
    pub fn expect_response<F>(
        exec: &mut fasync::TestExecutor,
        channel: &mut Channel,
        expectation: F,
        opcode: OpCode,
    ) where
        F: FnOnce(ResponsePacket),
    {
        // The panic messages name the response so that a failure here is not mistaken for a
        // failure in `expect_request`.
        let response_raw = expect_stream_item(exec, channel).expect("response");
        let response =
            ResponsePacket::decode(&response_raw[..], opcode).expect("can decode response");
        expectation(response);
    }

    /// Expects a request on `channel`, validates it with `expectation`, then writes `response`
    /// back on the same channel.
    #[track_caller]
    pub fn expect_request_and_reply<F>(
        exec: &mut fasync::TestExecutor,
        channel: &mut Channel,
        expectation: F,
        response: ResponsePacket,
    ) where
        F: FnOnce(RequestPacket),
    {
        expect_request(exec, channel, expectation);
        reply(channel, response)
    }

    /// Returns an expectation that asserts a request carries the operation code `code`.
    pub fn expect_code(code: OpCode) -> impl FnOnce(RequestPacket) {
        move |request: RequestPacket| {
            assert_eq!(*request.code(), code);
        }
    }
}
/// Unit tests for the transport handle and its manager.
#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use async_utils::PollExt;
    use fuchsia_async as fasync;
    use std::pin::pin;

    use crate::header::HeaderSet;
    use crate::operation::{RequestPacket, ResponseCode};
    use crate::transport::test_utils::{
        expect_code, expect_request_and_reply, new_manager, TestPacket,
    };

    /// Only one operation may hold the transport at a time; it becomes available again once
    /// the previous operation's handle is dropped.
    #[fuchsia::test]
    fn transport_manager_new_operation() {
        let _exec = fasync::TestExecutor::new();
        let (manager, _remote) = new_manager(false);

        // With nothing outstanding a permit is available. The permit is a temporary here, so it
        // is released again before the next statement.
        assert_matches!(manager.new_permit(), Ok(_));

        {
            let _first_operation = manager.try_new_operation().expect("can start operation");
            // A second operation is rejected while the first one is still alive.
            assert_matches!(manager.try_new_operation(), Err(Error::OperationInProgress));
        }

        // The first handle went out of scope, so the transport can be acquired and used again.
        let second_operation =
            manager.try_new_operation().expect("can start another operation");
        second_operation
            .send(RequestPacket::new_connect(100, HeaderSet::new()))
            .expect("can send request");
    }

    /// A request sent on the transport reaches the remote end, and the remote's reply is
    /// received and decoded.
    #[fuchsia::test]
    fn send_and_receive() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(false);
        let mut transport = manager.try_new_operation().expect("can start operation");

        transport
            .send(RequestPacket::new_connect(100, HeaderSet::new()))
            .expect("can send request");

        // The remote end should see the connect request; it then answers with a response.
        let peer_response =
            ResponsePacket::new(ResponseCode::Ok, vec![0x10, 0x00, 0x00, 0xff], HeaderSet::new());
        expect_request_and_reply(
            &mut exec,
            &mut remote,
            expect_code(OpCode::Connect),
            peer_response,
        );

        // The reply is already queued, so the receive future resolves without stalling.
        let mut receive_fut = pin!(transport.receive_response(OpCode::Connect));
        let receive_result =
            exec.run_until_stalled(&mut receive_fut).expect("stream item from response");
        let received_response = receive_result.expect("valid response");
        assert_eq!(*received_response.code(), ResponseCode::Ok);
    }

    /// Sending after the remote end has gone away fails, and keeps failing on retry.
    #[fuchsia::test]
    async fn send_while_channel_closed_is_error() {
        let (manager, remote) = new_manager(false);
        let transport = manager.try_new_operation().expect("can start operation");
        drop(remote);

        let data = TestPacket(10);
        for _attempt in 0..2 {
            assert_matches!(transport.send(data.clone()), Err(Error::IOError(_)));
        }
    }

    /// The manager can only observe closure of the channel while no operation holds it.
    #[fuchsia::test]
    async fn is_transport_closed() {
        let (manager, remote) = new_manager(false);
        assert!(!manager.is_transport_closed());

        let operation = manager.try_new_operation().expect("can start operation");
        assert!(!manager.is_transport_closed());

        // Even with the remote end gone, the manager reports "not closed" while the operation
        // still holds the channel.
        drop(remote);
        assert!(!manager.is_transport_closed());

        // Once the operation releases the channel, the closure becomes visible.
        drop(operation);
        assert!(manager.is_transport_closed());
    }

    /// Receiving after the remote end has gone away reports a disconnect, and keeps doing so
    /// on retry.
    #[fuchsia::test]
    async fn receive_while_channel_closed_is_error() {
        let (manager, remote) = new_manager(false);
        let mut transport = manager.try_new_operation().expect("can start operation");
        drop(remote);

        for _attempt in 0..2 {
            let receive_result = transport.receive_response(OpCode::Connect).await;
            assert_matches!(receive_result, Err(Error::PeerDisconnected));
        }
    }
}