use tracing::trace;
use crate::client::SrmOperation;
use crate::error::Error;
use crate::header::{HeaderSet, SingleResponseMode};
use crate::operation::{OpCode, RequestPacket, ResponseCode, ResponsePacket};
use crate::transport::ObexTransport;
/// A single GET operation driven over an `ObexTransport`.
///
/// `get_information` may be called any number of times (it borrows the operation), while
/// `get_data` and `terminate` consume the operation.
#[must_use]
#[derive(Debug)]
pub struct GetOperation<'a> {
/// Transport used to send request packets and receive the peer's responses.
transport: ObexTransport<'a>,
/// Headers supplied at construction. They are merged into the first outgoing request and the
/// field is cleared (`None`) once the operation is marked as started.
headers: Option<HeaderSet>,
/// Set to `true` by `set_started` once the first response has been handled.
is_started: bool,
/// Current Single Response Mode state; initialized from `transport.srm_supported()`.
srm: SingleResponseMode,
}
impl<'a> GetOperation<'a> {
/// Builds a GET operation that has not been started yet.
///
/// `headers` are stored until the first request is assembled, at which point they are merged
/// into that request. The initial SRM state is derived from `transport.srm_supported()`.
pub fn new(headers: HeaderSet, transport: ObexTransport<'a>) -> Self {
    Self {
        // Evaluated before `transport` is moved into the struct on the next line.
        srm: transport.srm_supported().into(),
        transport,
        headers: Some(headers),
        is_started: false,
    }
}
/// Marks the operation as started and discards the stored initial headers.
///
/// # Panics
///
/// Panics if the stored headers have already been taken (i.e. `self.headers` is `None`).
fn set_started(&mut self) {
    drop(self.headers.take().unwrap());
    self.is_started = true;
}
/// Prepares `application_headers` for the first request of the operation.
///
/// When the operation has not started, the headers stored at construction are appended to
/// `application_headers` (leaving an empty set stored in their place) and
/// `try_enable_srm` is invoked on the combined set. Once the operation has started this is a
/// no-op.
///
/// # Errors
///
/// Propagates any error from `HeaderSet::try_append` or `try_enable_srm`.
fn update_headers_before_start(
    &mut self,
    application_headers: &mut HeaderSet,
) -> Result<(), Error> {
    if !self.is_started {
        // Leave an empty set behind so that `set_started` still finds `Some(..)` to take.
        let stored = self.headers.replace(HeaderSet::new()).unwrap();
        application_headers.try_append(stored)?;
        self.try_enable_srm(application_headers)?;
    }
    Ok(())
}
fn handle_get_response(response: ResponsePacket) -> Result<HeaderSet, Error> {
response.expect_code(OpCode::Get, ResponseCode::Continue).map(Into::into)
}
/// Validates the response to a GetFinal request and splits it into its parts.
///
/// Returns `(is_final, headers, data)` where `is_final` is `true` for an `Ok` response and
/// `false` for a `Continue` response, `data` is the payload removed from the headers via
/// `remove_body(is_final)`, and `headers` is what remains.
///
/// # Errors
///
/// Fails if the response code is neither `Ok` nor `Continue`, or if `remove_body` fails.
fn handle_get_final_response(
    response: ResponsePacket,
) -> Result<(bool, HeaderSet, Vec<u8>), Error> {
    let is_final = *response.code() == ResponseCode::Ok;
    let mut headers = if is_final {
        HeaderSet::from(response)
    } else {
        // Anything other than `Ok` must be `Continue` to be acceptable.
        HeaderSet::from(response.expect_code(OpCode::GetFinal, ResponseCode::Continue)?)
    };
    let data = headers.remove_body(is_final)?;
    Ok((is_final, headers, data))
}
/// Sends a non-final GET request carrying `headers` and returns the peer's response headers.
///
/// On the first request of the operation the headers supplied at construction are merged in.
/// When the operation is already started and SRM is enabled, no response is awaited and an
/// empty `HeaderSet` is returned.
///
/// # Errors
///
/// Returns an operation error if the outgoing header set is empty, and propagates transport
/// failures and unexpected response codes.
pub async fn get_information(&mut self, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
    // Must happen before the emptiness check: it may contribute the initial headers.
    self.update_headers_before_start(&mut headers)?;
    if headers.is_empty() {
        return Err(Error::operation(OpCode::Get, "missing headers"));
    }

    // With SRM active on a follow-up request, the peer is not expected to reply.
    let skip_response = self.is_started && self.get_srm() == SingleResponseMode::Enable;

    let request = RequestPacket::new_get(headers);
    trace!(?request, "Making outgoing GET request");
    self.transport.send(request)?;
    trace!("Successfully made GET request");

    let response_headers = if skip_response {
        HeaderSet::new()
    } else {
        let response = self.transport.receive_response(OpCode::Get).await?;
        Self::handle_get_response(response)?
    };

    // The first response decides the SRM state and marks the operation as started.
    if !self.is_started {
        self.check_response_for_srm(&response_headers);
        self.set_started();
    }
    Ok(response_headers)
}
/// Requests the payload from the peer and returns it once the final packet arrives.
///
/// Consumes the operation. The first GetFinal request carries `headers` (plus the headers
/// supplied at construction if the operation has not started yet). Follow-up GetFinal
/// requests carry no headers and are only sent while SRM is not enabled; with SRM enabled
/// the remaining responses are simply awaited.
///
/// # Errors
///
/// Propagates header-merge failures, transport send/receive failures, unexpected response
/// codes, and failures to extract the body from a response.
pub async fn get_data(mut self, mut headers: HeaderSet) -> Result<Vec<u8>, Error> {
    self.update_headers_before_start(&mut headers)?;

    // Holds the caller's request until it is moved out on the first loop iteration. This
    // avoids cloning every outgoing packet and building unused empty follow-up packets.
    let mut initial_request = Some(RequestPacket::new_get_final(headers));
    let mut body = vec![];
    loop {
        // First iteration: always send the initial request. Later iterations: send an empty
        // GetFinal request only when SRM is not enabled. `self.srm` is re-read every time
        // because the first response may change it.
        let request = initial_request.take().or_else(|| {
            (self.srm != SingleResponseMode::Enable)
                .then(|| RequestPacket::new_get_final(HeaderSet::new()))
        });
        if let Some(request) = request {
            trace!(?request, "Making outgoing GET final request");
            self.transport.send(request)?;
            trace!("Successfully made GET final request");
        }

        let response = self.transport.receive_response(OpCode::GetFinal).await?;
        let (final_packet, response_headers, mut response_body) =
            Self::handle_get_final_response(response)?;
        body.append(&mut response_body);

        // The first response decides the SRM state and marks the operation as started.
        if !self.is_started {
            self.check_response_for_srm(&response_headers);
            self.set_started();
        }
        if final_packet {
            trace!("Found terminal GET final packet");
            break;
        }
    }
    Ok(body)
}
pub async fn terminate(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
let opcode = OpCode::Abort;
if !self.is_started {
return Err(Error::operation(opcode, "can't abort when not started"));
}
let request = RequestPacket::new_abort(headers);
trace!(?request, "Making outgoing {opcode:?} request");
self.transport.send(request)?;
trace!("Successfully made {opcode:?} request");
let response = self.transport.receive_response(opcode).await?;
response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
}
}
/// Exposes the operation's SRM state through the `SrmOperation` trait.
// NOTE(review): `try_enable_srm` / `check_response_for_srm` used by `GetOperation` are
// presumably provided by this trait on top of these accessors - confirm in `crate::client`.
impl SrmOperation for GetOperation<'_> {
    /// Opcode identifying this operation type to the trait.
    const OPERATION_TYPE: OpCode = OpCode::Get;

    /// Returns the current SRM state.
    fn get_srm(&self) -> SingleResponseMode {
        self.srm
    }

    /// Overwrites the current SRM state with `mode`.
    fn set_srm(&mut self, mode: SingleResponseMode) {
        self.srm = mode;
    }
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use async_test_helpers::expect_stream_pending;
use async_utils::PollExt;
use fuchsia_async as fasync;
use std::pin::pin;
use crate::error::PacketError;
use crate::header::{Header, HeaderIdentifier};
use crate::transport::test_utils::{
expect_code, expect_request, expect_request_and_reply, new_manager, reply,
};
use crate::transport::ObexTransportManager;
/// Test helper: opens a new operation on `mgr` and wraps it in a `GetOperation` seeded with
/// the `initial` headers.
fn setup_get_operation(mgr: &ObexTransportManager, initial: HeaderSet) -> GetOperation<'_> {
    GetOperation::new(initial, mgr.try_new_operation().expect("can start operation"))
}
// Full GET flow without SRM: two information requests, each answered by the peer, followed
// by a data request whose payload arrives in two responses.
#[fuchsia::test]
fn get_operation() {
let mut exec = fasync::TestExecutor::new();
// NOTE(review): the `false` argument presumably disables SRM support - confirm in test_utils.
let (manager, mut remote) = new_manager(false);
let mut operation = setup_get_operation(&manager, HeaderSet::new());
assert!(!operation.is_started);
// First information request: the operation is not started until the peer responds.
{
let info_headers = HeaderSet::from_header(Header::name("text"));
let info_fut = operation.get_information(info_headers);
let mut info_fut = pin!(info_fut);
exec.run_until_stalled(&mut info_fut).expect_pending("waiting for peer response");
let response_headers = HeaderSet::from_header(Header::name("bar"));
let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
// The peer should observe a GET request that carries the Name header.
let expectation = |request: RequestPacket| {
assert_eq!(*request.code(), OpCode::Get);
let headers = HeaderSet::from(request);
assert!(headers.contains_header(&HeaderIdentifier::Name));
};
expect_request_and_reply(&mut exec, &mut remote, expectation, response);
let received_headers = exec
.run_until_stalled(&mut info_fut)
.expect("response received")
.expect("valid response");
assert!(received_headers.contains_header(&HeaderIdentifier::Name));
}
// Handling the first response marks the operation as started.
assert!(operation.is_started);
// Second information request: still request/response since SRM is not enabled here.
{
let info_headers = HeaderSet::from_header(Header::Type("file".into()));
let info_fut = operation.get_information(info_headers);
let mut info_fut = pin!(info_fut);
exec.run_until_stalled(&mut info_fut).expect_pending("waiting for peer response");
let response_headers = HeaderSet::from_header(Header::Description("big file".into()));
let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Get), response);
let received_headers = exec
.run_until_stalled(&mut info_fut)
.expect("response received")
.expect("valid response");
assert!(received_headers.contains_header(&HeaderIdentifier::Description));
}
// Data phase: consumes the operation. Each GetFinal request is answered by one response.
let data_fut = operation.get_data(HeaderSet::new());
let mut data_fut = pin!(data_fut);
exec.run_until_stalled(&mut data_fut).expect_pending("waiting for peer response");
// First chunk arrives in a Body header with a Continue code, so more data is expected.
let response_headers1 = HeaderSet::from_header(Header::Body(vec![1, 2, 3]));
let response1 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers1);
expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::GetFinal), response1);
exec.run_until_stalled(&mut data_fut)
.expect_pending("waiting for additional peer responses");
// Last chunk arrives in an EndOfBody header with an Ok code, which completes the transfer.
let response_headers2 = HeaderSet::from_header(Header::EndOfBody(vec![4, 5, 6]));
let response2 = ResponsePacket::new_no_data(ResponseCode::Ok, response_headers2);
expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::GetFinal), response2);
let user_data = exec
.run_until_stalled(&mut data_fut)
.expect("received all responses")
.expect("valid user data");
// The returned payload is the concatenation of both chunks, in arrival order.
assert_eq!(user_data, vec![1, 2, 3, 4, 5, 6]);
}
// Terminating a started operation sends an Abort request and resolves successfully once the
// peer acknowledges it with an `Ok` response.
#[fuchsia::test]
fn get_operation_terminate_success() {
    let mut exec = fasync::TestExecutor::new();
    let (manager, mut remote) = new_manager(false);
    let initial = HeaderSet::from_header(Header::name("foo"));
    let mut operation = setup_get_operation(&manager, initial);
    // `terminate` is only permitted on a started operation.
    operation.set_started();

    let headers = HeaderSet::from_header(Header::name("terminated"));
    let terminate_fut = operation.terminate(headers);
    let mut terminate_fut = pin!(terminate_fut);
    let _ =
        exec.run_until_stalled(&mut terminate_fut).expect_pending("waiting for peer response");

    let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
    expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Abort), response);

    // Drive the future to completion so the success path is actually verified. The peer
    // replied with no headers, so none should be returned.
    let received_headers = exec
        .run_until_stalled(&mut terminate_fut)
        .expect("response received")
        .expect("valid response");
    assert!(received_headers.is_empty());
}
// Full GET flow with SRM: the first information request negotiates SRM, after which
// follow-up requests resolve without a peer response and data responses arrive unprompted.
#[fuchsia::test]
fn get_operation_srm() {
let mut exec = fasync::TestExecutor::new();
// NOTE(review): the `true` argument presumably enables SRM support - confirm in test_utils.
let (manager, mut remote) = new_manager(true);
let mut operation = setup_get_operation(&manager, HeaderSet::new());
// First information request: it must carry the SRM header, and the peer accepts SRM.
{
let info_headers = HeaderSet::from_header(Header::name("foo"));
let info_fut = operation.get_information(info_headers);
let mut info_fut = pin!(info_fut);
exec.run_until_stalled(&mut info_fut).expect_pending("waiting for peer response");
let response_headers = HeaderSet::from_headers(vec![
Header::name("bar"),
SingleResponseMode::Enable.into(),
])
.unwrap();
let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
let expectation = |request: RequestPacket| {
assert_eq!(*request.code(), OpCode::Get);
let headers = HeaderSet::from(request);
assert!(headers.contains_header(&HeaderIdentifier::Name));
assert!(headers.contains_header(&HeaderIdentifier::SingleResponseMode));
};
expect_request_and_reply(&mut exec, &mut remote, expectation, response);
let _received_headers = exec
.run_until_stalled(&mut info_fut)
.expect("response received")
.expect("valid response");
}
assert!(operation.is_started);
assert_eq!(operation.srm, SingleResponseMode::Enable);
// Second information request: with SRM active the future resolves immediately with empty
// headers; the request is still delivered to the peer, without an SRM header this time.
{
let info_headers = HeaderSet::from_header(Header::Type("file".into()));
let info_fut = operation.get_information(info_headers);
let mut info_fut = pin!(info_fut);
let received_headers = exec
.run_until_stalled(&mut info_fut)
.expect("ready without peer response")
.expect("successful request");
assert_eq!(received_headers, HeaderSet::new());
let expectation = |request: RequestPacket| {
assert_eq!(*request.code(), OpCode::Get);
let headers = HeaderSet::from(request);
assert!(headers.contains_header(&HeaderIdentifier::Type));
assert!(!headers.contains_header(&HeaderIdentifier::SingleResponseMode));
};
expect_request(&mut exec, &mut remote, expectation);
}
// Data phase: only the first GetFinal request is sent; later responses are unsolicited.
let data_fut = operation.get_data(HeaderSet::new());
let mut data_fut = pin!(data_fut);
exec.run_until_stalled(&mut data_fut).expect_pending("waiting for peer response");
let response_headers1 = HeaderSet::from_header(Header::Body(vec![1, 2, 3]));
let response1 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers1);
expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::GetFinal), response1);
exec.run_until_stalled(&mut data_fut)
.expect_pending("waiting for additional peer responses");
// No further request should reach the peer before it sends the second chunk.
let response_headers2 = HeaderSet::from_header(Header::Body(vec![4, 5, 6]));
let response2 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers2);
expect_stream_pending(&mut exec, &mut remote);
reply(&mut remote, response2);
// Likewise for the final chunk, which carries EndOfBody with an Ok code.
let response_headers3 = HeaderSet::from_header(Header::EndOfBody(vec![7, 8, 9]));
let response3 = ResponsePacket::new_no_data(ResponseCode::Ok, response_headers3);
expect_stream_pending(&mut exec, &mut remote);
reply(&mut remote, response3);
let user_data = exec
.run_until_stalled(&mut data_fut)
.expect("received all responses")
.expect("valid user data");
// All three chunks are concatenated in arrival order.
assert_eq!(user_data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
}
#[fuchsia::test]
fn client_disable_srm_mid_get_is_ignored() {
let mut exec = fasync::TestExecutor::new();
let (manager, mut remote) = new_manager(true);
let transport = manager.try_new_operation().expect("can start operation");
let mut operation = GetOperation::new(HeaderSet::new(), transport);
operation.set_started();
assert_eq!(operation.srm, SingleResponseMode::Enable);
{
let info_headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
let info_fut = operation.get_information(info_headers);
let mut info_fut = pin!(info_fut);
let received_headers = exec
.run_until_stalled(&mut info_fut)
.expect("ready without peer response")
.expect("successful request");
assert_eq!(received_headers, HeaderSet::new());
expect_request(&mut exec, &mut remote, expect_code(OpCode::Get));
}
assert_eq!(operation.srm, SingleResponseMode::Enable);
}
#[fuchsia::test]
fn get_operation_information_error() {
let mut exec = fasync::TestExecutor::new();
let (manager, _remote) = new_manager(false);
let initial = HeaderSet::from_header(Header::name("foo"));
let mut operation = setup_get_operation(&manager, initial);
operation.set_started();
let get_info_fut = operation.get_information(HeaderSet::new());
let mut get_info_fut = pin!(get_info_fut);
let get_info_result =
exec.run_until_stalled(&mut get_info_fut).expect("resolves with error");
assert_matches!(get_info_result, Err(Error::OperationError { .. }));
}
// Requesting data without a prior information request is allowed: the headers given at
// construction are sent with the first GetFinal request.
#[fuchsia::test]
fn get_operation_data_before_start_is_ok() {
    let mut exec = fasync::TestExecutor::new();
    let (manager, mut remote) = new_manager(false);
    let operation = setup_get_operation(&manager, HeaderSet::from_header(Header::name("foo")));

    let mut get_data_fut = pin!(operation.get_data(HeaderSet::new()));
    exec.run_until_stalled(&mut get_data_fut).expect_pending("waiting for peer response");

    // The peer answers the single request with the complete payload.
    let final_response = ResponsePacket::new_no_data(
        ResponseCode::Ok,
        HeaderSet::from_header(Header::EndOfBody(vec![1, 2, 3])),
    );
    let carries_initial_name = |request: RequestPacket| {
        assert_eq!(*request.code(), OpCode::GetFinal);
        let headers = HeaderSet::from(request);
        assert!(headers.contains_header(&HeaderIdentifier::Name));
    };
    expect_request_and_reply(&mut exec, &mut remote, carries_initial_name, final_response);

    let user_data = exec
        .run_until_stalled(&mut get_data_fut)
        .expect("received all responses")
        .expect("valid user data");
    assert_eq!(user_data, vec![1, 2, 3]);
}
// Requesting data first with SRM available: the first GetFinal request negotiates SRM and
// the remaining responses are received without any further requests.
#[fuchsia::test]
fn get_operation_data_before_start_with_srm_is_ok() {
    let mut exec = fasync::TestExecutor::new();
    let (manager, mut remote) = new_manager(true);
    let operation = setup_get_operation(&manager, HeaderSet::new());

    let mut get_data_fut = pin!(operation.get_data(HeaderSet::new()));
    exec.run_until_stalled(&mut get_data_fut).expect_pending("waiting for peer response");

    // First response: a data chunk plus the peer's acceptance of SRM.
    let first_headers = HeaderSet::from_headers(vec![
        Header::Body(vec![1, 1]),
        Header::SingleResponseMode(SingleResponseMode::Enable.into()),
    ])
    .unwrap();
    let first_response = ResponsePacket::new_no_data(ResponseCode::Continue, first_headers);
    let requests_srm = |request: RequestPacket| {
        assert_eq!(*request.code(), OpCode::GetFinal);
        let headers = HeaderSet::from(request);
        assert!(headers.contains_header(&HeaderIdentifier::SingleResponseMode));
    };
    expect_request_and_reply(&mut exec, &mut remote, requests_srm, first_response);
    exec.run_until_stalled(&mut get_data_fut)
        .expect_pending("waiting for additional peer responses");

    // Second response: sent by the peer without a new request from the client.
    let second_response = ResponsePacket::new_no_data(
        ResponseCode::Continue,
        HeaderSet::from_header(Header::Body(vec![2, 2])),
    );
    expect_stream_pending(&mut exec, &mut remote);
    reply(&mut remote, second_response);

    // Final response: again unsolicited, completing the transfer.
    let last_response = ResponsePacket::new_no_data(
        ResponseCode::Ok,
        HeaderSet::from_header(Header::EndOfBody(vec![3, 3])),
    );
    expect_stream_pending(&mut exec, &mut remote);
    reply(&mut remote, last_response);

    let user_data = exec
        .run_until_stalled(&mut get_data_fut)
        .expect("received all responses")
        .expect("valid user data");
    assert_eq!(user_data, vec![1, 1, 2, 2, 3, 3]);
}
#[fuchsia::test]
fn get_operation_data_peer_disconnect_is_error() {
let mut exec = fasync::TestExecutor::new();
let (manager, remote) = new_manager(false);
let initial = HeaderSet::from_header(Header::name("foo"));
let mut operation = setup_get_operation(&manager, initial);
operation.set_started();
drop(remote);
let get_data_fut = operation.get_data(HeaderSet::new());
let mut get_data_fut = pin!(get_data_fut);
let get_data_result =
exec.run_until_stalled(&mut get_data_fut).expect("resolves with error");
assert_matches!(get_data_result, Err(Error::IOError(_)));
}
#[fuchsia::test]
async fn get_operation_terminate_before_start_error() {
let (manager, _remote) = new_manager(false);
let initial = HeaderSet::from_header(Header::name("bar"));
let operation = setup_get_operation(&manager, initial);
let terminate_result = operation.terminate(HeaderSet::new()).await;
assert_matches!(terminate_result, Err(Error::OperationError { .. }));
}
// A `Continue` response to a GET request yields exactly the headers it carried.
#[fuchsia::test]
fn handle_get_response_success() {
    let expected = HeaderSet::from_header(Header::name("foo"));
    let response = ResponsePacket::new_no_data(ResponseCode::Continue, expected.clone());

    let parsed = GetOperation::handle_get_response(response).expect("valid response");
    assert_eq!(parsed, expected);
}
// Any response code other than `Continue` to a non-final GET request is a peer rejection.
#[fuchsia::test]
fn handle_get_response_error() {
    let headers = HeaderSet::from_header(Header::name("foo"));

    // `Ok` is a success code, but it is not the code expected for a non-final GET.
    let unexpected_ok = ResponsePacket::new_no_data(ResponseCode::Ok, headers.clone());
    assert_matches!(
        GetOperation::handle_get_response(unexpected_ok),
        Err(Error::PeerRejected { .. })
    );

    // An explicit failure code from the peer.
    let not_found = ResponsePacket::new_no_data(ResponseCode::NotFound, headers);
    assert_matches!(
        GetOperation::handle_get_response(not_found),
        Err(Error::PeerRejected { .. })
    );
}
// Both accepted GetFinal response shapes are parsed: `Ok` + EndOfBody is final, `Continue` +
// Body is not. In each case the payload header is removed from the returned header set.
#[fuchsia::test]
fn handle_get_final_response_success() {
    let final_response = ResponsePacket::new_no_data(
        ResponseCode::Ok,
        HeaderSet::from_header(Header::EndOfBody(vec![1, 2])),
    );
    let parsed_final =
        GetOperation::handle_get_final_response(final_response).expect("valid response");
    assert_eq!(parsed_final, (true, HeaderSet::new(), vec![1, 2]));

    let partial_response = ResponsePacket::new_no_data(
        ResponseCode::Continue,
        HeaderSet::from_header(Header::Body(vec![1, 3, 5])),
    );
    let parsed_partial =
        GetOperation::handle_get_final_response(partial_response).expect("valid response");
    assert_eq!(parsed_partial, (false, HeaderSet::new(), vec![1, 3, 5]));
}
// Invalid GetFinal responses: a rejecting response code, and accepted codes whose payload
// header is missing.
#[fuchsia::test]
fn get_final_response_error() {
    // A non-success code is a rejection even when a payload header is present.
    let forbidden = ResponsePacket::new_no_data(
        ResponseCode::Forbidden,
        HeaderSet::from_header(Header::EndOfBody(vec![1, 2])),
    );
    assert_matches!(
        GetOperation::handle_get_final_response(forbidden),
        Err(Error::PeerRejected { .. })
    );

    // `Ok` response with no headers at all.
    let ok_without_payload = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
    assert_matches!(
        GetOperation::handle_get_final_response(ok_without_payload),
        Err(Error::Packet(PacketError::Data(_)))
    );

    // `Continue` response with no headers at all.
    let continue_without_payload =
        ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
    assert_matches!(
        GetOperation::handle_get_final_response(continue_without_payload),
        Err(Error::Packet(PacketError::Data(_)))
    );
}
}