use fuchsia_bluetooth::types::Channel;
use futures::future::Future;
use futures::stream::StreamExt;
use packet_encoding::{Decodable, Encodable};
use tracing::{info, trace, warn};
use crate::error::{Error, PacketError};
use crate::header::{
ConnectionIdentifier, Header, HeaderIdentifier, HeaderSet, SingleResponseMode,
};
use crate::operation::{OpCode, RequestPacket, ResponseCode, ResponsePacket, SetPathFlags};
use crate::transport::max_packet_size_from_transport;
pub use crate::transport::TransportType;
mod handler;
pub use handler::{new_operation_error, ObexOperationError, ObexServerHandler};
mod get;
use get::GetOperation;
mod put;
use put::PutOperation;
/// A request produced by a `ServerOperation` after it has processed a packet from the peer.
///
/// `ObexServer::multistep_request` inspects the variant to decide whether to reply to the peer
/// directly or to first consult the application handler.
#[derive(Debug)]
pub enum OperationRequest {
/// Response packets that should be returned to the peer without involving the application.
SendPackets(Vec<ResponsePacket>),
/// Ask the application (via `get_info` on the handler) for information, passing these headers.
GetApplicationInfo(HeaderSet),
/// Ask the application (via `get_data` on the handler) for data, passing these headers.
GetApplicationData(HeaderSet),
/// Hand the provided data and headers to the application (via `put` on the handler).
PutApplicationData(Vec<u8>, HeaderSet),
/// Ask the application (via `delete` on the handler) to delete data, passing these headers.
DeleteApplicationData(HeaderSet),
/// Nothing to do; `ObexServer::multistep_request` returns no response packets for this variant.
None,
}
/// A successful response from the application handler, forwarded to the active
/// `ServerOperation` through `ServerOperation::handle_application_response`.
#[derive(Debug)]
pub enum ApplicationResponse {
/// Headers returned by the handler's `get_info`.
GetInfo(HeaderSet),
/// The `(data, headers)` pair returned by the handler's `get_data`.
GetData((Vec<u8>, HeaderSet)),
/// The handler's `put` or `delete` completed successfully.
Put,
}
impl ApplicationResponse {
    /// Test-only shorthand for an application that accepts a GET with `data` and `headers`.
    #[cfg(test)]
    fn accept_get(data: Vec<u8>, headers: HeaderSet) -> Result<Self, ObexOperationError> {
        let payload = (data, headers);
        Ok(Self::GetData(payload))
    }

    /// Test-only shorthand for an application that accepts a GET info request with `headers`.
    #[cfg(test)]
    fn accept_get_info(headers: HeaderSet) -> Result<Self, ObexOperationError> {
        Ok(Self::GetInfo(headers))
    }

    /// Test-only shorthand for an application that accepts a PUT.
    #[cfg(test)]
    fn accept_put() -> Result<Self, ObexOperationError> {
        Ok(Self::Put)
    }
}
/// Interface implemented by multi-step server operations (the GET and PUT operations in this
/// module) that are driven by `ObexServer`.
pub trait ServerOperation {
    /// Returns the Single Response Mode status of this operation.
    fn srm_status(&self) -> SingleResponseMode;

    /// Inspects `headers` for a Single Response Mode header.
    ///
    /// Returns `None` when the request carries no SRM header. Otherwise returns
    /// `Some(Enable)` only if the peer requested `Enable` and `srm_supported_locally` is true,
    /// and `Some(Disable)` in every other case.
    fn check_headers_for_srm(
        srm_supported_locally: bool,
        headers: &HeaderSet,
    ) -> Option<SingleResponseMode>
    where
        Self: Sized,
    {
        match headers.get(&HeaderIdentifier::SingleResponseMode) {
            Some(Header::SingleResponseMode(requested)) => {
                let enable = srm_supported_locally && *requested == SingleResponseMode::Enable;
                Some(if enable {
                    SingleResponseMode::Enable
                } else {
                    SingleResponseMode::Disable
                })
            }
            _ => {
                trace!("No SRM header in request");
                None
            }
        }
    }

    /// Returns true once the operation has finished. `ObexServer` only replaces the active
    /// operation with a new one when this is true.
    fn is_complete(&self) -> bool;

    /// Processes a `request` received from the peer and returns what the server should do next.
    fn handle_peer_request(&mut self, request: RequestPacket) -> Result<OperationRequest, Error>;

    /// Processes the application's `response` to an earlier `OperationRequest` and returns the
    /// packets that should be sent to the peer.
    fn handle_application_response(
        &mut self,
        response: Result<ApplicationResponse, ObexOperationError>,
    ) -> Result<Vec<ResponsePacket>, Error>;
}
/// Tracks the state of the OBEX connection with the peer.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
enum ConnectionStatus {
/// Initial state of a newly created server; no CONNECT has been accepted yet.
#[default]
Initialized,
/// The application accepted a CONNECT request. `id` is set when the CONNECT request
/// contained a Target header and is `None` otherwise.
Connected { id: Option<ConnectionIdentifier> },
/// A DISCONNECT request was processed; `ObexServer::run` exits after sending the response.
DisconnectReceived,
}
impl ConnectionStatus {
    /// Test-only shorthand for the connected state without a connection identifier.
    #[cfg(test)]
    fn connected_no_id() -> Self {
        let id = None;
        ConnectionStatus::Connected { id }
    }
}
/// An OBEX server that reads requests from a transport `Channel`, relays them to an
/// application-provided `ObexServerHandler`, and writes the responses back to the channel.
pub struct ObexServer {
/// Current state of the OBEX connection with the peer.
connected: ConnectionStatus,
/// Maximum packet size. Derived from the transport at construction and lowered to the
/// peer's advertised value (if smaller) when a CONNECT request is received.
max_packet_size: u16,
/// The in-progress multi-step operation (GET or PUT), if any.
active_operation: Option<Box<dyn ServerOperation>>,
/// Transport channel used to receive requests and send responses.
channel: Channel,
/// Transport type; its `srm_supported()` value is passed to newly created operations.
type_: TransportType,
/// Application handler consulted for CONNECT, DISCONNECT, SETPATH, GET and PUT requests.
handler: Box<dyn ObexServerHandler>,
}
impl ObexServer {
/// Connection identifier assigned when the peer's CONNECT request contains a Target header.
const DIRECTED_CONNECTION_ID: ConnectionIdentifier = ConnectionIdentifier(1);
/// Creates a server that serves requests arriving on `channel` using the application
/// `handler`. The initial max packet size is derived from the channel's max TX size.
pub fn new(
    channel: Channel,
    type_: TransportType,
    handler: Box<dyn ObexServerHandler>,
) -> Self {
    let transport_max = channel.max_tx_size();
    Self {
        connected: ConnectionStatus::Initialized,
        max_packet_size: max_packet_size_from_transport(transport_max),
        active_operation: None,
        channel,
        type_,
        handler,
    }
}
/// Returns true if a CONNECT request has been accepted and no DISCONNECT has been received.
fn is_connected(&self) -> bool {
    match self.connected {
        ConnectionStatus::Connected { .. } => true,
        ConnectionStatus::Initialized | ConnectionStatus::DisconnectReceived => false,
    }
}
/// Replaces the current connection status with `status`.
fn set_connection_status(&mut self, status: ConnectionStatus) {
self.connected = status;
}
/// Lowers the max packet size to `peer_max_packet_size` if that is smaller than the current
/// value; the size is never increased.
fn set_max_packet_size(&mut self, peer_max_packet_size: u16) {
    self.max_packet_size = self.max_packet_size.min(peer_max_packet_size);
    trace!("Max packet size set to {}", self.max_packet_size);
}
/// Encodes `data` into a freshly allocated buffer and writes it to the transport channel.
///
/// Returns an error if encoding fails or the channel write fails.
fn send(&self, data: impl Encodable<Error = PacketError>) -> Result<(), Error> {
    let encoded_len = data.encoded_len();
    let mut buf = vec![0u8; encoded_len];
    data.encode(buf.as_mut_slice())?;
    let _ = self.channel.write(&buf)?;
    Ok(())
}
/// Handles a CONNECT request from the peer.
///
/// The request's additional data is read as version, flags and the peer's max packet size
/// (big-endian `u16`); the local max packet size is lowered accordingly. The request headers
/// are then passed to the application, which either accepts (response code `Ok`) or rejects
/// (application-provided code and headers). On acceptance the server is marked as connected,
/// with the directed connection identifier if the request contained a Target header.
async fn connect_request(&mut self, request: RequestPacket) -> Result<ResponsePacket, Error> {
    // NOTE(review): indexing assumes the decoded CONNECT request carries at least 4 bytes of
    // additional data; this looks to be enforced by `RequestPacket::decode` — confirm.
    let (version, flags, peer_max_packet_size) = {
        let raw = request.data();
        (raw[0], raw[1], u16::from_be_bytes([raw[2], raw[3]]))
    };
    trace!(version, flags, peer_max_packet_size, "Additional data in CONNECT request");
    self.set_max_packet_size(peer_max_packet_size);

    let request_headers = HeaderSet::from(request);
    // A Target header in the request makes this a directed connection with a fixed identifier.
    let id = request_headers
        .contains_header(&HeaderIdentifier::Target)
        .then_some(Self::DIRECTED_CONNECTION_ID);

    let (code, response_headers) = match self.handler.connect(request_headers).await {
        Err(reject_parameters) => {
            trace!("Application rejected CONNECT request");
            reject_parameters
        }
        Ok(mut accepted_headers) => {
            trace!("Application accepted CONNECT request");
            // The result of adding the connection identifier is intentionally ignored.
            let _ = accepted_headers.try_add_connection_id(&id);
            self.set_connection_status(ConnectionStatus::Connected { id });
            (ResponseCode::Ok, accepted_headers)
        }
    };

    Ok(ResponsePacket::new_connect(code, self.max_packet_size, response_headers))
}
/// Handles a DISCONNECT request from the peer.
///
/// The request headers are passed to the application, whose returned headers are placed in
/// the response. The connection status is set to `DisconnectReceived`.
async fn disconnect_request(
    &mut self,
    request: RequestPacket,
) -> Result<ResponsePacket, Error> {
    let response_headers = self.handler.disconnect(HeaderSet::from(request)).await;
    self.set_connection_status(ConnectionStatus::DisconnectReceived);
    Ok(ResponsePacket::new_disconnect(response_headers))
}
/// Handles a SETPATH request from the peer.
///
/// Returns an `Error::operation` error if no CONNECT has been accepted. Otherwise the flags
/// in the first byte of the request data are decoded into `backup` (BACKUP set) and `create`
/// (DONT_CREATE not set) and passed with the request headers to the application, which either
/// accepts (response code `Ok`) or rejects (application-provided code and headers).
async fn setpath_request(&mut self, request: RequestPacket) -> Result<ResponsePacket, Error> {
    if !self.is_connected() {
        return Err(Error::operation(OpCode::SetPath, "CONNECT not completed"));
    }

    // NOTE(review): indexing assumes the decoded SETPATH request carries at least one byte of
    // additional data — confirm against `RequestPacket::decode`.
    let flags = SetPathFlags::from_bits_truncate(request.data()[0]);
    let backup = flags.contains(SetPathFlags::BACKUP);
    let create = !flags.contains(SetPathFlags::DONT_CREATE);

    let outcome = self.handler.set_path(HeaderSet::from(request), backup, create).await;
    let (code, response_headers) = match outcome {
        Err(reject_parameters) => {
            trace!("Application rejected SETPATH request");
            reject_parameters
        }
        Ok(accepted_headers) => {
            trace!("Application accepted SETPATH request");
            (ResponseCode::Ok, accepted_headers)
        }
    };

    Ok(ResponsePacket::new_setpath(code, response_headers))
}
/// Starts a new GET or PUT operation for `code` unless an incomplete operation is active.
///
/// Returns `false` (leaving the active operation untouched) when an operation is still in
/// progress, and `true` after installing a new operation.
///
/// # Panics
///
/// Panics if `code` is not one of `Get`, `GetFinal`, `Put` or `PutFinal`.
fn maybe_start_new_operation(&mut self, code: &OpCode) -> bool {
    if let Some(in_progress) = self.active_operation.as_ref() {
        if !in_progress.is_complete() {
            return false;
        }
    }

    let new_operation: Box<dyn ServerOperation> = match code {
        OpCode::Put | OpCode::PutFinal => {
            Box::new(PutOperation::new(self.type_.srm_supported()))
        }
        OpCode::Get | OpCode::GetFinal => {
            Box::new(GetOperation::new(self.max_packet_size, self.type_.srm_supported()))
        }
        _ => unreachable!("only called from `Self::multistep_request`"),
    };
    trace!("Started new operation ({code:?})");
    self.active_operation = Some(new_operation);
    true
}
/// Handles a GET or PUT request, which may span several request/response exchanges.
///
/// A new operation is started if none is in progress. The request is given to the active
/// operation, which either produces response packets directly, asks for nothing, or asks for
/// the application to be consulted; in the latter case the application's result is fed back
/// into the operation to produce the response packets. If the operation fails to process the
/// request, a single `InternalServerError` response is returned instead of an error.
async fn multistep_request(
    &mut self,
    request: RequestPacket,
) -> Result<Vec<ResponsePacket>, Error> {
    let _ = self.maybe_start_new_operation(request.code());
    let operation = self.active_operation.as_mut().expect("just initialized");

    let operation_request = match operation.handle_peer_request(request) {
        Ok(operation_request) => operation_request,
        Err(e) => {
            warn!("Internal error in operation: {e:?}");
            let failure = ResponsePacket::new_no_data(
                ResponseCode::InternalServerError,
                HeaderSet::new(),
            );
            return Ok(vec![failure]);
        }
    };

    let application_response = match operation_request {
        // No application involvement needed for these two.
        OperationRequest::SendPackets(responses) => return Ok(responses),
        OperationRequest::None => return Ok(vec![]),
        OperationRequest::GetApplicationInfo(info_headers) => {
            self.handler.get_info(info_headers).await.map(ApplicationResponse::GetInfo)
        }
        OperationRequest::GetApplicationData(request_headers) => {
            self.handler.get_data(request_headers).await.map(ApplicationResponse::GetData)
        }
        OperationRequest::PutApplicationData(data, request_headers) => {
            self.handler.put(data, request_headers).await.map(|_| ApplicationResponse::Put)
        }
        // A successful delete is reported to the operation as `Put`.
        OperationRequest::DeleteApplicationData(request_headers) => {
            self.handler.delete(request_headers).await.map(|_| ApplicationResponse::Put)
        }
    };

    operation.handle_application_response(application_response)
}
async fn receive_packet(&mut self, packet: Vec<u8>) -> Result<Vec<ResponsePacket>, Error> {
let decoded = RequestPacket::decode(&packet[..])?;
trace!(packet = ?decoded, "Received request from OBEX client");
let response = match decoded.code() {
OpCode::Connect => self.connect_request(decoded).await?,
OpCode::Disconnect => self.disconnect_request(decoded).await?,
OpCode::SetPath => self.setpath_request(decoded).await?,
OpCode::Put | OpCode::PutFinal | OpCode::Get | OpCode::GetFinal => {
return self.multistep_request(decoded).await;
}
_code => todo!("Support other OBEX requests"),
};
Ok(vec![response])
}
pub fn run(mut self) -> impl Future<Output = Result<(), Error>> {
async move {
while let Some(packet) = self.channel.next().await {
match packet {
Ok(bytes) => {
let responses = self.receive_packet(bytes).await?;
for response in responses {
self.send(response)?;
}
if self.connected == ConnectionStatus::DisconnectReceived {
trace!("Disconnect request - closing transport");
return Ok(());
}
}
Err(e) => warn!("Error reading data from transport: {e:?}"),
}
}
info!("Peer disconnected transport");
Ok(())
}
}
}
#[cfg(test)]
pub(crate) mod test_utils {
    use super::*;

    /// Asserts that `request` is `OperationRequest::SendPackets` holding exactly one packet and
    /// returns that packet. Panics otherwise.
    #[track_caller]
    pub fn expect_single_packet(request: OperationRequest) -> ResponsePacket {
        match request {
            OperationRequest::SendPackets(mut packets) => {
                assert_eq!(packets.len(), 1);
                packets.pop().unwrap()
            }
            request => panic!("Expected outgoing packet request, got: {request:?}"),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use async_test_helpers::expect_stream_pending;
use async_utils::PollExt;
use fuchsia_async as fasync;
use std::pin::pin;
use crate::header::header_set::{expect_body, expect_end_of_body};
use crate::server::handler::test_utils::TestApplicationProfile;
use crate::transport::test_utils::{expect_response, send_packet};
/// Builds an `ObexServer` backed by a `TestApplicationProfile`.
///
/// Returns the server, a handle to the test application, and the remote end of the channel.
/// The L2CAP transport type is used when `srm` is true and RFCOMM otherwise.
fn new_obex_server(srm: bool) -> (ObexServer, TestApplicationProfile, Channel) {
    let type_ = match srm {
        true => TransportType::L2cap,
        false => TransportType::Rfcomm,
    };
    let (local, remote) = Channel::create();
    let app = TestApplicationProfile::new();
    let handler = Box::new(app.clone());
    (ObexServer::new(local, type_, handler), app, remote)
}
/// The server future resolves with `Ok` once the remote end of the channel is dropped.
#[fuchsia::test]
fn obex_server_terminates_when_channel_closes() {
let mut exec = fasync::TestExecutor::new();
let (obex_server, _test_app, remote) = new_obex_server(false);
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
// With the channel open and no data, the server keeps waiting.
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server still active");
// Closing the remote end ends the channel stream, which finishes the server.
drop(remote);
let result = exec.run_until_stalled(&mut server_fut).expect("server finished");
assert_matches!(result, Ok(_));
}
/// A CONNECT request without a Target header that the application accepts yields an `Ok`
/// response carrying the application's headers and no ConnectionId header.
#[fuchsia::test]
fn connect_accepted_by_app_success() {
let mut exec = fasync::TestExecutor::new();
let (obex_server, test_app, mut remote) = new_obex_server(false);
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// Peer sends CONNECT with a max packet size of 500 and no headers.
let connect_request = RequestPacket::new_connect(500, HeaderSet::new());
send_packet(&mut remote, connect_request);
// The application accepts with a Description header.
let headers = HeaderSet::from_header(Header::Description("foo".into()));
test_app.set_response(Ok(headers));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Ok);
// The last two data bytes are the negotiated max packet size: 0x01f4 == 500.
assert_eq!(response.data(), &[0x10, 0, 0x01, 0xf4]);
assert!(response.headers().contains_header(&HeaderIdentifier::Description));
// Not a directed connection, so no connection identifier is included.
assert!(!response.headers().contains_header(&HeaderIdentifier::ConnectionId));
};
expect_response(&mut exec, &mut remote, expectation, OpCode::Connect);
}
/// A CONNECT request with a Target header that the application accepts yields an `Ok`
/// response that includes a ConnectionId header.
#[fuchsia::test]
fn directed_connect_accepted_by_app_success() {
let mut exec = fasync::TestExecutor::new();
let (obex_server, test_app, mut remote) = new_obex_server(false);
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// The Target header makes this a directed connection.
let request_headers = HeaderSet::from_header(Header::Target(vec![5, 6]));
let connect_request = RequestPacket::new_connect(500, request_headers);
send_packet(&mut remote, connect_request);
// The application accepts with a Name header.
let headers = HeaderSet::from_header(Header::name("foo"));
test_app.set_response(Ok(headers));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Ok);
// The last two data bytes are the negotiated max packet size: 0x01f4 == 500.
assert_eq!(response.data(), &[0x10, 0, 0x01, 0xf4]);
assert!(response.headers().contains_header(&HeaderIdentifier::Name));
// The server adds the connection identifier for a directed connection.
assert!(response.headers().contains_header(&HeaderIdentifier::ConnectionId));
};
expect_response(&mut exec, &mut remote, expectation, OpCode::Connect);
}
/// When the application rejects a CONNECT request, the server stays alive and replies with
/// the application's response code and headers.
#[fuchsia::test]
fn connect_rejected_by_app_is_ok() {
let mut exec = fasync::TestExecutor::new();
let (obex_server, test_app, mut remote) = new_obex_server(false);
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// Peer sends CONNECT with a max packet size of 255.
let connect_request = RequestPacket::new_connect(255, HeaderSet::new());
send_packet(&mut remote, connect_request);
// The application rejects the request.
test_app.set_response(Err((ResponseCode::Forbidden, HeaderSet::new())));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Forbidden);
// The last two data bytes are the negotiated max packet size: 0x00ff == 255.
assert_eq!(response.data(), &[0x10, 0, 0x00, 0xff]);
let headers = HeaderSet::from(response);
assert!(headers.is_empty());
};
expect_response(&mut exec, &mut remote, expectation, OpCode::Connect);
}
/// A CONNECT request that fails to decode terminates the server with a packet error.
#[fuchsia::test]
fn invalid_connect_request_is_error() {
let mut exec = fasync::TestExecutor::new();
let (obex_server, _test_app, remote) = new_obex_server(false);
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server still active");
// Raw bytes written directly to the channel; the test expects decoding them to fail.
// NOTE(review): presumably invalid because the CONNECT data is too short — confirm.
let _ = remote.write(&[0x80, 0x00, 0x05, 0x00, 0x00]).expect("can send data");
let result = exec.run_until_stalled(&mut server_fut).expect("terminate due to error");
assert_matches!(result, Err(Error::Packet(_)));
}
/// A DISCONNECT request is answered with the application's headers and then the server
/// future resolves with `Ok`.
#[fuchsia::test]
fn peer_disconnect_request_terminates_server() {
let mut exec = fasync::TestExecutor::new();
let (obex_server, test_app, mut remote) = new_obex_server(false);
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let headers = HeaderSet::from_header(Header::Description("done".into()));
let disconnect_request = RequestPacket::new_disconnect(headers);
send_packet(&mut remote, disconnect_request);
// Headers the application returns for the disconnect.
let headers = HeaderSet::from_header(Header::Description("disconnecting".into()));
test_app.set_response(Ok(headers));
// The server finishes after handling the DISCONNECT request.
let result =
exec.run_until_stalled(&mut server_fut).expect("server terminated from disconnect");
assert_matches!(result, Ok(_));
// The response was still written to the channel before the server exited.
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Ok);
let headers = HeaderSet::from(response);
assert!(headers.contains_header(&HeaderIdentifier::Description));
};
expect_response(&mut exec, &mut remote, expectation, OpCode::Disconnect);
}
/// A SETPATH request on a connected server that the application accepts yields an `Ok`
/// response with the application's (empty) headers.
#[fuchsia::test]
fn setpath_request_accepted_by_app_success() {
let mut exec = fasync::TestExecutor::new();
let (mut obex_server, test_app, mut remote) = new_obex_server(false);
// Skip the CONNECT exchange by marking the server as connected.
obex_server.set_connection_status(ConnectionStatus::connected_no_id());
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let headers = HeaderSet::from_header(Header::name("folder1"));
let setpath_request =
RequestPacket::new_set_path(SetPathFlags::all(), headers).expect("valid request");
send_packet(&mut remote, setpath_request);
// The application accepts with no headers.
test_app.set_response(Ok(HeaderSet::new()));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Ok);
let headers = HeaderSet::from(response);
assert!(headers.is_empty());
};
expect_response(&mut exec, &mut remote, expectation, OpCode::SetPath);
}
/// When the application rejects a SETPATH request, the server stays alive and replies with
/// the application's response code and headers.
#[fuchsia::test]
fn setpath_request_rejected_by_app_success() {
let mut exec = fasync::TestExecutor::new();
let (mut obex_server, test_app, mut remote) = new_obex_server(false);
// Skip the CONNECT exchange by marking the server as connected.
obex_server.set_connection_status(ConnectionStatus::connected_no_id());
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let setpath_request = RequestPacket::new_set_path(SetPathFlags::BACKUP, HeaderSet::new())
.expect("valid request");
send_packet(&mut remote, setpath_request);
// The application rejects the request.
test_app.set_response(Err((ResponseCode::Forbidden, HeaderSet::new())));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Forbidden);
let headers = HeaderSet::from(response);
assert!(headers.is_empty());
};
expect_response(&mut exec, &mut remote, expectation, OpCode::SetPath);
}
/// A SETPATH request received before any CONNECT terminates the server with an operation
/// error for the SetPath opcode.
#[fuchsia::test]
fn setpath_request_before_connect_is_error() {
let mut exec = fasync::TestExecutor::new();
// The server is left in its initial (not connected) state.
let (obex_server, _test_app, mut remote) = new_obex_server(false);
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let setpath_request = RequestPacket::new_set_path(SetPathFlags::BACKUP, HeaderSet::new())
.expect("valid request");
send_packet(&mut remote, setpath_request);
let result = exec
.run_until_stalled(&mut server_fut)
.expect("server terminated from invalid setpath");
assert_matches!(result, Err(Error::OperationError { operation: OpCode::SetPath, .. }));
}
/// A two-step GET (non-final then final request) accepted by the application: the first
/// response is `Continue` with the application's info headers, the second is `Ok` with the
/// application's data and headers.
#[fuchsia::test]
fn get_request_accepted_by_app_success() {
let mut exec = fasync::TestExecutor::new();
let (mut obex_server, test_app, mut remote) = new_obex_server(false);
// Skip the CONNECT exchange by marking the server as connected.
obex_server.set_connection_status(ConnectionStatus::connected_no_id());
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// First (non-final) GET request names the object.
let get_request1 =
RequestPacket::new_get(HeaderSet::from_header(Header::name("random object")));
send_packet(&mut remote, get_request1);
// The application responds with a Length header.
test_app.set_response(Ok(HeaderSet::from_header(Header::Length(0x10))));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Continue);
assert!(response.headers().contains_header(&HeaderIdentifier::Length));
};
expect_response(&mut exec, &mut remote, expectation, OpCode::Get);
// Final GET request; the application now supplies the data and a Description header.
let get_request2 = RequestPacket::new_get_final(HeaderSet::new());
send_packet(&mut remote, get_request2);
let application_response_buf = vec![1, 2, 3, 4, 5, 6];
let response_headers = HeaderSet::from_header(Header::Description("foo".into()));
test_app.set_get_response((application_response_buf.clone(), response_headers));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Ok);
let mut headers = HeaderSet::from(response);
assert!(headers.contains_header(&HeaderIdentifier::Description));
// The body in the response matches the data provided by the application.
let received_body = headers.remove_body(true).expect("contains body");
assert_eq!(received_body, application_response_buf);
};
expect_response(&mut exec, &mut remote, expectation, OpCode::GetFinal);
}
/// A GET request that the application does not accept yields a `NotImplemented` response
/// with no headers, and the server stays alive.
#[fuchsia::test]
fn get_request_rejected_by_app_success() {
let mut exec = fasync::TestExecutor::new();
let (mut obex_server, _test_app, mut remote) = new_obex_server(false);
// Skip the CONNECT exchange by marking the server as connected.
obex_server.set_connection_status(ConnectionStatus::connected_no_id());
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let headers = HeaderSet::from_header(Header::name("random object123"));
let get_request = RequestPacket::new_get_final(headers);
send_packet(&mut remote, get_request);
// No response is configured on the test application here.
// NOTE(review): presumably its default is a NotImplemented rejection — confirm against
// `TestApplicationProfile`.
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::NotImplemented);
assert!(response.headers().is_empty());
};
expect_response(&mut exec, &mut remote, expectation, OpCode::GetFinal);
}
/// A GET with Single Response Mode enabled: after the final GET request, the server sends
/// all body chunks without waiting for further requests from the peer.
#[fuchsia::test]
fn get_request_with_srm_enabled_success() {
let mut exec = fasync::TestExecutor::new();
// `true` selects the L2CAP transport type.
let (mut obex_server, test_app, mut remote) = new_obex_server(true);
obex_server.set_connection_status(ConnectionStatus::connected_no_id());
// Use a small max packet size so the application's data spans several packets.
obex_server.set_max_packet_size(20); let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// First (non-final) GET request asks to enable SRM.
let headers1 = HeaderSet::from_headers(vec![
Header::name("random object"),
SingleResponseMode::Enable.into(),
])
.unwrap();
let get_request1 = RequestPacket::new_get(headers1);
send_packet(&mut remote, get_request1);
test_app.set_response(Ok(HeaderSet::from_header(Header::Length(0x20))));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// The first response confirms SRM and carries the application's Length header.
let expectation1 = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Continue);
let Header::SingleResponseMode(SingleResponseMode::Enable) =
response.headers().get(&HeaderIdentifier::SingleResponseMode).unwrap()
else {
panic!("Expected SRM enable in response");
};
assert!(response.headers().contains_header(&HeaderIdentifier::Length));
};
expect_response(&mut exec, &mut remote, expectation1, OpCode::Get);
// Final GET request; the application provides 50 bytes of data.
let get_request2 = RequestPacket::new_get_final(HeaderSet::new());
send_packet(&mut remote, get_request2);
let application_response_buf = (0..50).collect::<Vec<u8>>();
test_app.set_get_response((application_response_buf, HeaderSet::new()));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// The data is expected in three `Continue` packets of 14 bytes each...
let expected_bufs = vec![
(0..14).collect::<Vec<u8>>(),
(14..28).collect::<Vec<u8>>(),
(28..42).collect::<Vec<u8>>(),
];
for expected_buf in expected_bufs {
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Continue);
expect_body(response.headers(), expected_buf);
};
expect_response(&mut exec, &mut remote, expectation, OpCode::Get);
}
// ...followed by a final `Ok` packet with the remaining 8 bytes as the end of body.
let final_expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Ok);
expect_end_of_body(response.headers(), (42..50).collect::<Vec<u8>>());
};
expect_response(&mut exec, &mut remote, final_expectation, OpCode::GetFinal);
}
/// A single final PUT request with a body: the application receives the data and headers,
/// and the peer receives an `Ok` response with no headers.
#[fuchsia::test]
fn put_request_accepted_by_app_success() {
let mut exec = fasync::TestExecutor::new();
let (mut obex_server, test_app, mut remote) = new_obex_server(false);
// Skip the CONNECT exchange by marking the server as connected.
obex_server.set_connection_status(ConnectionStatus::connected_no_id());
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// The whole object is delivered in one final PUT request.
let headers = HeaderSet::from_headers(vec![
Header::name("random object"),
Header::EndOfBody(vec![1, 2, 3, 4, 5]),
])
.unwrap();
let put_request = RequestPacket::new_put_final(headers);
send_packet(&mut remote, put_request);
test_app.set_put_response(Ok(()));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// The application received the body and the Name header.
let (rec_data, rec_headers) = test_app.put_data();
assert_eq!(rec_data, vec![1, 2, 3, 4, 5]);
assert!(rec_headers.contains_header(&HeaderIdentifier::Name));
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Ok);
assert!(response.headers().is_empty());
};
expect_response(&mut exec, &mut remote, expectation, OpCode::PutFinal);
}
/// A PUT with Single Response Mode enabled: after SRM is confirmed, intermediate PUT requests
/// receive no response and only the final PUT request is answered.
#[fuchsia::test]
fn put_request_with_srm_enabled_success() {
let mut exec = fasync::TestExecutor::new();
// `true` selects the L2CAP transport type.
let (mut obex_server, test_app, mut remote) = new_obex_server(true);
obex_server.set_connection_status(ConnectionStatus::connected_no_id());
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// First (non-final) PUT request names the object and asks to enable SRM.
let headers1 = HeaderSet::from_headers(vec![
Header::name("my file"),
SingleResponseMode::Enable.into(),
])
.unwrap();
let put_request1 = RequestPacket::new_put(headers1);
send_packet(&mut remote, put_request1);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// The first response confirms SRM.
let expectation1 = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Continue);
let Header::SingleResponseMode(SingleResponseMode::Enable) =
response.headers().get(&HeaderIdentifier::SingleResponseMode).unwrap()
else {
panic!("Expected SRM enable in response");
};
};
expect_response(&mut exec, &mut remote, expectation1, OpCode::Put);
// Second (non-final) PUT request carries the first part of the body.
let headers2 = HeaderSet::from_header(Header::Body(vec![1, 2, 3, 4, 5]));
let put_request2 = RequestPacket::new_put(headers2);
send_packet(&mut remote, put_request2);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// With SRM active, no response is sent for the intermediate request.
expect_stream_pending(&mut exec, &mut remote);
// Final PUT request carries the rest of the body.
let headers3 = HeaderSet::from_header(Header::EndOfBody(vec![6, 7, 8, 9, 10]));
let put_request3 = RequestPacket::new_put_final(headers3);
send_packet(&mut remote, put_request3);
test_app.set_put_response(Ok(()));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// The application received the concatenated body and the Name header from the first request.
let (rec_data, rec_headers) = test_app.put_data();
assert_eq!(rec_data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
assert!(rec_headers.contains_header(&HeaderIdentifier::Name));
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Ok);
assert!(response.headers().is_empty());
};
expect_response(&mut exec, &mut remote, expectation, OpCode::PutFinal);
}
/// A final PUT request that names an object but carries no body yields an `Ok` response
/// with no headers when the application accepts.
#[fuchsia::test]
fn delete_request_accepted_by_app_success() {
let mut exec = fasync::TestExecutor::new();
let (mut obex_server, test_app, mut remote) = new_obex_server(false);
// Skip the CONNECT exchange by marking the server as connected.
obex_server.set_connection_status(ConnectionStatus::connected_no_id());
let server_fut = obex_server.run();
let mut server_fut = pin!(server_fut);
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
// Only a Name header, no Body/EndOfBody.
// NOTE(review): per the test name this is expected to take the delete path — confirm
// against `PutOperation`.
let headers = HeaderSet::from_header(Header::name("foo.txt"));
let put_request = RequestPacket::new_put_final(headers);
send_packet(&mut remote, put_request);
test_app.set_put_response(Ok(()));
let _ = exec.run_until_stalled(&mut server_fut).expect_pending("server active");
let expectation = |response: ResponsePacket| {
assert_eq!(*response.code(), ResponseCode::Ok);
assert!(response.headers().is_empty());
};
expect_response(&mut exec, &mut remote, expectation, OpCode::PutFinal);
}
}