1use log::trace;
6
7use crate::client::SrmOperation;
8use crate::error::Error;
9use crate::header::{HeaderSet, SingleResponseMode};
10use crate::operation::{OpCode, RequestPacket, ResponseCode, ResponsePacket};
11use crate::transport::ObexTransport;
12
13#[must_use]
30#[derive(Debug)]
31pub struct GetOperation<'a> {
32 transport: ObexTransport<'a>,
34 headers: Option<HeaderSet>,
37 is_started: bool,
39 srm: SingleResponseMode,
42}
43
44impl<'a> GetOperation<'a> {
45 pub fn new(headers: HeaderSet, transport: ObexTransport<'a>) -> Self {
46 let srm = transport.srm_supported().into();
47 Self { transport, headers: Some(headers), is_started: false, srm }
48 }
49
50 fn set_started(&mut self) {
51 let _ = self.headers.take().unwrap();
52 self.is_started = true;
53 }
54
55 fn update_headers_before_start(
58 &mut self,
59 application_headers: &mut HeaderSet,
60 ) -> Result<(), Error> {
61 if self.is_started {
62 return Ok(());
63 }
64 let initial = self.headers.replace(HeaderSet::new()).unwrap();
66 application_headers.try_append(initial)?;
67 self.try_enable_srm(application_headers)?;
69 Ok(())
70 }
71
72 fn handle_get_response(response: ResponsePacket) -> Result<HeaderSet, Error> {
75 response.expect_code(OpCode::Get, ResponseCode::Continue).map(Into::into)
76 }
77
78 fn handle_get_final_response(
81 response: ResponsePacket,
82 ) -> Result<(bool, HeaderSet, Vec<u8>), Error> {
83 if *response.code() == ResponseCode::Ok {
85 let mut headers = HeaderSet::from(response);
87 return headers.remove_body(true).map(|eob| (true, headers, eob));
88 }
89
90 let mut headers =
92 response.expect_code(OpCode::GetFinal, ResponseCode::Continue).map(HeaderSet::from)?;
93 headers.remove_body(false).map(|b| (false, headers, b))
95 }
96
97 pub async fn get_information(&mut self, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
103 self.update_headers_before_start(&mut headers)?;
105
106 if headers.is_empty() {
108 return Err(Error::operation(OpCode::Get, "missing headers"));
109 }
110
111 let srm_active = self.is_started && self.get_srm() == SingleResponseMode::Enable;
113
114 let request = RequestPacket::new_get(headers);
115 trace!(request:?; "Making outgoing GET request");
116 self.transport.send(request)?;
117 trace!("Successfully made GET request");
118
119 let response_headers = if !srm_active {
121 let response = self.transport.receive_response(OpCode::Get).await?;
122 Self::handle_get_response(response)?
123 } else {
124 HeaderSet::new()
125 };
126 if !self.is_started {
127 self.check_response_for_srm(&response_headers);
128 self.set_started();
129 }
130 Ok(response_headers)
131 }
132
133 pub async fn get_data(mut self, mut headers: HeaderSet) -> Result<Vec<u8>, Error> {
138 self.update_headers_before_start(&mut headers)?;
140
141 let mut request = RequestPacket::new_get_final(headers);
142 let mut first_request = true;
143 let mut body = vec![];
144 loop {
145 if first_request || self.srm != SingleResponseMode::Enable {
148 trace!(request:?; "Making outgoing GET final request");
149 self.transport.send(request.clone())?;
150 trace!("Successfully made GET final request");
151 request = RequestPacket::new_get_final(HeaderSet::new());
153 first_request = false;
154 }
155 let response = self.transport.receive_response(OpCode::GetFinal).await?;
156 let (final_packet, response_headers, mut response_body) =
157 Self::handle_get_final_response(response)?;
158 body.append(&mut response_body);
159
160 if !self.is_started {
162 self.check_response_for_srm(&response_headers);
163 self.set_started();
164 }
165
166 if final_packet {
167 trace!("Found terminal GET final packet");
168 break;
169 }
170 }
171 Ok(body)
172 }
173
174 pub async fn terminate(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
179 let opcode = OpCode::Abort;
180 if !self.is_started {
181 return Err(Error::operation(opcode, "can't abort when not started"));
182 }
183
184 let request = RequestPacket::new_abort(headers);
185 trace!(request:?; "Making outgoing {opcode:?} request");
186 self.transport.send(request)?;
187 trace!("Successfully made {opcode:?} request");
188 let response = self.transport.receive_response(opcode).await?;
189 response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
190 }
191}
192
193impl SrmOperation for GetOperation<'_> {
194 const OPERATION_TYPE: OpCode = OpCode::Get;
195
196 fn get_srm(&self) -> SingleResponseMode {
197 self.srm
198 }
199
200 fn set_srm(&mut self, mode: SingleResponseMode) {
201 self.srm = mode;
202 }
203}
204
205#[cfg(test)]
206mod tests {
207 use super::*;
208
209 use assert_matches::assert_matches;
210 use async_test_helpers::expect_stream_pending;
211 use async_utils::PollExt;
212 use fuchsia_async as fasync;
213 use std::pin::pin;
214
215 use crate::error::PacketError;
216 use crate::header::{Header, HeaderIdentifier};
217 use crate::transport::test_utils::{
218 expect_code, expect_request, expect_request_and_reply, new_manager, reply,
219 };
220 use crate::transport::ObexTransportManager;
221
222 fn setup_get_operation(mgr: &ObexTransportManager, initial: HeaderSet) -> GetOperation<'_> {
223 let transport = mgr.try_new_operation().expect("can start operation");
224 GetOperation::new(initial, transport)
225 }
226
227 #[fuchsia::test]
228 fn get_operation() {
229 let mut exec = fasync::TestExecutor::new();
230 let (manager, mut remote) = new_manager(false);
231 let mut operation = setup_get_operation(&manager, HeaderSet::new());
232 assert!(!operation.is_started);
233
234 {
237 let info_headers = HeaderSet::from_header(Header::name("text"));
238 let info_fut = operation.get_information(info_headers);
239 let mut info_fut = pin!(info_fut);
240 exec.run_until_stalled(&mut info_fut).expect_pending("waiting for peer response");
241 let response_headers = HeaderSet::from_header(Header::name("bar"));
242 let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
243 let expectation = |request: RequestPacket| {
244 assert_eq!(*request.code(), OpCode::Get);
245 let headers = HeaderSet::from(request);
246 assert!(headers.contains_header(&HeaderIdentifier::Name));
247 };
248 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
249 let received_headers = exec
250 .run_until_stalled(&mut info_fut)
251 .expect("response received")
252 .expect("valid response");
253 assert!(received_headers.contains_header(&HeaderIdentifier::Name));
254 }
255 assert!(operation.is_started);
256
257 {
260 let info_headers = HeaderSet::from_header(Header::Type("file".into()));
261 let info_fut = operation.get_information(info_headers);
262 let mut info_fut = pin!(info_fut);
263 exec.run_until_stalled(&mut info_fut).expect_pending("waiting for peer response");
264 let response_headers = HeaderSet::from_header(Header::Description("big file".into()));
265 let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
266 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Get), response);
267 let received_headers = exec
268 .run_until_stalled(&mut info_fut)
269 .expect("response received")
270 .expect("valid response");
271 assert!(received_headers.contains_header(&HeaderIdentifier::Description));
272 }
273
274 let data_fut = operation.get_data(HeaderSet::new());
278 let mut data_fut = pin!(data_fut);
279 exec.run_until_stalled(&mut data_fut).expect_pending("waiting for peer response");
280 let response_headers1 = HeaderSet::from_header(Header::Body(vec![1, 2, 3]));
281 let response1 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers1);
282 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::GetFinal), response1);
283 exec.run_until_stalled(&mut data_fut)
284 .expect_pending("waiting for additional peer responses");
285 let response_headers2 = HeaderSet::from_header(Header::EndOfBody(vec![4, 5, 6]));
287 let response2 = ResponsePacket::new_no_data(ResponseCode::Ok, response_headers2);
288 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::GetFinal), response2);
289 let user_data = exec
291 .run_until_stalled(&mut data_fut)
292 .expect("received all responses")
293 .expect("valid user data");
294 assert_eq!(user_data, vec![1, 2, 3, 4, 5, 6]);
295 }
296
297 #[fuchsia::test]
298 fn get_operation_terminate_success() {
299 let mut exec = fasync::TestExecutor::new();
300 let (manager, mut remote) = new_manager(false);
301 let initial = HeaderSet::from_header(Header::name("foo"));
302 let mut operation = setup_get_operation(&manager, initial);
303
304 operation.set_started();
306
307 let headers = HeaderSet::from_header(Header::name("terminated"));
309 let terminate_fut = operation.terminate(headers);
310 let mut terminate_fut = pin!(terminate_fut);
311 let _ =
312 exec.run_until_stalled(&mut terminate_fut).expect_pending("waiting for peer response");
313 let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
314 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Abort), response);
315 }
316
317 #[fuchsia::test]
318 fn get_operation_srm() {
319 let mut exec = fasync::TestExecutor::new();
320 let (manager, mut remote) = new_manager(true);
321 let mut operation = setup_get_operation(&manager, HeaderSet::new());
322
323 {
326 let info_headers = HeaderSet::from_header(Header::name("foo"));
327 let info_fut = operation.get_information(info_headers);
328 let mut info_fut = pin!(info_fut);
329 exec.run_until_stalled(&mut info_fut).expect_pending("waiting for peer response");
330 let response_headers = HeaderSet::from_headers(vec![
331 Header::name("bar"),
332 SingleResponseMode::Enable.into(),
333 ])
334 .unwrap();
335 let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
336 let expectation = |request: RequestPacket| {
337 assert_eq!(*request.code(), OpCode::Get);
338 let headers = HeaderSet::from(request);
339 assert!(headers.contains_header(&HeaderIdentifier::Name));
340 assert!(headers.contains_header(&HeaderIdentifier::SingleResponseMode));
341 };
342 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
343 let _received_headers = exec
344 .run_until_stalled(&mut info_fut)
345 .expect("response received")
346 .expect("valid response");
347 }
348 assert!(operation.is_started);
349 assert_eq!(operation.srm, SingleResponseMode::Enable);
350
351 {
355 let info_headers = HeaderSet::from_header(Header::Type("file".into()));
356 let info_fut = operation.get_information(info_headers);
357 let mut info_fut = pin!(info_fut);
358 let received_headers = exec
359 .run_until_stalled(&mut info_fut)
360 .expect("ready without peer response")
361 .expect("successful request");
362 assert_eq!(received_headers, HeaderSet::new());
363 let expectation = |request: RequestPacket| {
364 assert_eq!(*request.code(), OpCode::Get);
365 let headers = HeaderSet::from(request);
366 assert!(headers.contains_header(&HeaderIdentifier::Type));
367 assert!(!headers.contains_header(&HeaderIdentifier::SingleResponseMode));
368 };
369 expect_request(&mut exec, &mut remote, expectation);
370 }
371
372 let data_fut = operation.get_data(HeaderSet::new());
375 let mut data_fut = pin!(data_fut);
376 exec.run_until_stalled(&mut data_fut).expect_pending("waiting for peer response");
377 let response_headers1 = HeaderSet::from_header(Header::Body(vec![1, 2, 3]));
378 let response1 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers1);
379 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::GetFinal), response1);
380 exec.run_until_stalled(&mut data_fut)
381 .expect_pending("waiting for additional peer responses");
382 let response_headers2 = HeaderSet::from_header(Header::Body(vec![4, 5, 6]));
384 let response2 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers2);
385 expect_stream_pending(&mut exec, &mut remote);
386 reply(&mut remote, response2);
387 let response_headers3 = HeaderSet::from_header(Header::EndOfBody(vec![7, 8, 9]));
389 let response3 = ResponsePacket::new_no_data(ResponseCode::Ok, response_headers3);
390 expect_stream_pending(&mut exec, &mut remote);
391 reply(&mut remote, response3);
392 let user_data = exec
394 .run_until_stalled(&mut data_fut)
395 .expect("received all responses")
396 .expect("valid user data");
397 assert_eq!(user_data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
398 }
399
400 #[fuchsia::test]
401 fn client_disable_srm_mid_get_is_ignored() {
402 let mut exec = fasync::TestExecutor::new();
403 let (manager, mut remote) = new_manager(true);
404 let transport = manager.try_new_operation().expect("can start operation");
405 let mut operation = GetOperation::new(HeaderSet::new(), transport);
406 operation.set_started();
408 assert_eq!(operation.srm, SingleResponseMode::Enable);
409
410 {
413 let info_headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
414 let info_fut = operation.get_information(info_headers);
415 let mut info_fut = pin!(info_fut);
416 let received_headers = exec
417 .run_until_stalled(&mut info_fut)
418 .expect("ready without peer response")
419 .expect("successful request");
420 assert_eq!(received_headers, HeaderSet::new());
421 expect_request(&mut exec, &mut remote, expect_code(OpCode::Get));
422 }
423 assert_eq!(operation.srm, SingleResponseMode::Enable);
424 }
425
426 #[fuchsia::test]
427 fn get_operation_information_error() {
428 let mut exec = fasync::TestExecutor::new();
429 let (manager, _remote) = new_manager(false);
430 let initial = HeaderSet::from_header(Header::name("foo"));
431 let mut operation = setup_get_operation(&manager, initial);
432
433 operation.set_started();
435
436 let get_info_fut = operation.get_information(HeaderSet::new());
438 let mut get_info_fut = pin!(get_info_fut);
439 let get_info_result =
440 exec.run_until_stalled(&mut get_info_fut).expect("resolves with error");
441 assert_matches!(get_info_result, Err(Error::OperationError { .. }));
442 }
443
444 #[fuchsia::test]
445 fn get_operation_data_before_start_is_ok() {
446 let mut exec = fasync::TestExecutor::new();
447 let (manager, mut remote) = new_manager(false);
448 let initial = HeaderSet::from_header(Header::name("foo"));
449 let operation = setup_get_operation(&manager, initial);
450
451 let get_data_fut = operation.get_data(HeaderSet::new());
453 let mut get_data_fut = pin!(get_data_fut);
454 exec.run_until_stalled(&mut get_data_fut).expect_pending("waiting for peer response");
455 let response_headers = HeaderSet::from_header(Header::EndOfBody(vec![1, 2, 3]));
456 let response = ResponsePacket::new_no_data(ResponseCode::Ok, response_headers);
457 let expectation = |request: RequestPacket| {
458 assert_eq!(*request.code(), OpCode::GetFinal);
459 let headers = HeaderSet::from(request);
460 assert!(headers.contains_header(&HeaderIdentifier::Name));
461 };
462 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
463 let user_data = exec
464 .run_until_stalled(&mut get_data_fut)
465 .expect("received all responses")
466 .expect("valid user data");
467 assert_eq!(user_data, vec![1, 2, 3]);
468 }
469
470 #[fuchsia::test]
471 fn get_operation_data_before_start_with_srm_is_ok() {
472 let mut exec = fasync::TestExecutor::new();
473 let (manager, mut remote) = new_manager(true);
474 let operation = setup_get_operation(&manager, HeaderSet::new());
475
476 let get_data_fut = operation.get_data(HeaderSet::new());
478 let mut get_data_fut = pin!(get_data_fut);
479 exec.run_until_stalled(&mut get_data_fut).expect_pending("waiting for peer response");
480 let response_headers1 = HeaderSet::from_headers(vec![
481 Header::Body(vec![1, 1]),
482 Header::SingleResponseMode(SingleResponseMode::Enable.into()),
483 ])
484 .unwrap();
485 let response1 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers1);
486 let expectation = |request: RequestPacket| {
487 assert_eq!(*request.code(), OpCode::GetFinal);
488 let headers = HeaderSet::from(request);
489 assert!(headers.contains_header(&HeaderIdentifier::SingleResponseMode));
490 };
491 expect_request_and_reply(&mut exec, &mut remote, expectation, response1);
492 exec.run_until_stalled(&mut get_data_fut)
493 .expect_pending("waiting for additional peer responses");
494
495 let response_headers2 = HeaderSet::from_header(Header::Body(vec![2, 2]));
497 let response2 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers2);
498 expect_stream_pending(&mut exec, &mut remote);
499 reply(&mut remote, response2);
500
501 let response_headers3 = HeaderSet::from_header(Header::EndOfBody(vec![3, 3]));
503 let response3 = ResponsePacket::new_no_data(ResponseCode::Ok, response_headers3);
504 expect_stream_pending(&mut exec, &mut remote);
505 reply(&mut remote, response3);
506 let user_data = exec
508 .run_until_stalled(&mut get_data_fut)
509 .expect("received all responses")
510 .expect("valid user data");
511 assert_eq!(user_data, vec![1, 1, 2, 2, 3, 3]);
512 }
513
514 #[fuchsia::test]
515 fn get_operation_data_peer_disconnect_is_error() {
516 let mut exec = fasync::TestExecutor::new();
517 let (manager, remote) = new_manager(false);
518 let initial = HeaderSet::from_header(Header::name("foo"));
519 let mut operation = setup_get_operation(&manager, initial);
520 operation.set_started();
522
523 drop(remote);
525 let get_data_fut = operation.get_data(HeaderSet::new());
526 let mut get_data_fut = pin!(get_data_fut);
527 let get_data_result =
528 exec.run_until_stalled(&mut get_data_fut).expect("resolves with error");
529 assert_matches!(get_data_result, Err(Error::IOError(_)));
530 }
531
532 #[fuchsia::test]
533 async fn get_operation_terminate_before_start_error() {
534 let (manager, _remote) = new_manager(false);
535 let initial = HeaderSet::from_header(Header::name("bar"));
536 let operation = setup_get_operation(&manager, initial);
537
538 let terminate_result = operation.terminate(HeaderSet::new()).await;
540 assert_matches!(terminate_result, Err(Error::OperationError { .. }));
541 }
542
543 #[fuchsia::test]
544 fn handle_get_response_success() {
545 let headers = HeaderSet::from_header(Header::name("foo"));
546 let response = ResponsePacket::new_no_data(ResponseCode::Continue, headers.clone());
547 let result = GetOperation::handle_get_response(response).expect("valid response");
548 assert_eq!(result, headers);
549 }
550
551 #[fuchsia::test]
552 fn handle_get_response_error() {
553 let headers = HeaderSet::from_header(Header::name("foo"));
554 let response1 = ResponsePacket::new_no_data(ResponseCode::Ok, headers.clone());
556 assert_matches!(
557 GetOperation::handle_get_response(response1),
558 Err(Error::PeerRejected { .. })
559 );
560
561 let response1 = ResponsePacket::new_no_data(ResponseCode::NotFound, headers);
563 assert_matches!(
564 GetOperation::handle_get_response(response1),
565 Err(Error::PeerRejected { .. })
566 );
567 }
568
569 #[fuchsia::test]
570 fn handle_get_final_response_success() {
571 let headers = HeaderSet::from_header(Header::EndOfBody(vec![1, 2]));
572 let response1 = ResponsePacket::new_no_data(ResponseCode::Ok, headers);
573 let result1 = GetOperation::handle_get_final_response(response1).expect("valid response");
574 assert_eq!(result1, (true, HeaderSet::new(), vec![1, 2]));
575
576 let headers = HeaderSet::from_header(Header::Body(vec![1, 3, 5]));
577 let response2 = ResponsePacket::new_no_data(ResponseCode::Continue, headers);
578 let result2 = GetOperation::handle_get_final_response(response2).expect("valid response");
579 assert_eq!(result2, (false, HeaderSet::new(), vec![1, 3, 5]));
580 }
581
582 #[fuchsia::test]
583 fn get_final_response_error() {
584 let headers = HeaderSet::from_header(Header::EndOfBody(vec![1, 2]));
586 let response1 = ResponsePacket::new_no_data(ResponseCode::Forbidden, headers);
587 assert_matches!(
588 GetOperation::handle_get_final_response(response1),
589 Err(Error::PeerRejected { .. })
590 );
591
592 let response2 = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
594 assert_matches!(
595 GetOperation::handle_get_final_response(response2),
596 Err(Error::Packet(PacketError::Data(_)))
597 );
598
599 let response3 = ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
601 assert_matches!(
602 GetOperation::handle_get_final_response(response3),
603 Err(Error::Packet(PacketError::Data(_)))
604 );
605 }
606}