1use futures::SinkExt;
6use log::trace;
7
8use crate::client::SrmOperation;
9use crate::error::Error;
10use crate::header::{HeaderSet, SingleResponseMode};
11use crate::operation::{OpCode, RequestPacket, ResponseCode, ResponsePacket};
12use crate::transport::ObexTransport;
13
14#[must_use]
31#[derive(Debug)]
32pub struct GetOperation<'a> {
33 transport: ObexTransport<'a>,
35 headers: Option<HeaderSet>,
38 is_started: bool,
40 srm: SingleResponseMode,
43}
44
45impl<'a> GetOperation<'a> {
46 pub fn new(headers: HeaderSet, transport: ObexTransport<'a>) -> Self {
47 let srm = transport.srm_supported().into();
48 Self { transport, headers: Some(headers), is_started: false, srm }
49 }
50
51 fn set_started(&mut self) {
52 let _ = self.headers.take().unwrap();
53 self.is_started = true;
54 }
55
56 fn update_headers_before_start(
59 &mut self,
60 application_headers: &mut HeaderSet,
61 ) -> Result<(), Error> {
62 if self.is_started {
63 return Ok(());
64 }
65 let initial = self.headers.replace(HeaderSet::new()).unwrap();
67 application_headers.try_append(initial)?;
68 self.try_enable_srm(application_headers)?;
70 Ok(())
71 }
72
73 fn handle_get_response(response: ResponsePacket) -> Result<HeaderSet, Error> {
76 response.expect_code(OpCode::Get, ResponseCode::Continue).map(Into::into)
77 }
78
79 fn handle_get_final_response(
82 response: ResponsePacket,
83 ) -> Result<(bool, HeaderSet, Vec<u8>), Error> {
84 if *response.code() == ResponseCode::Ok {
86 let mut headers = HeaderSet::from(response);
88 return headers.remove_body(true).map(|eob| (true, headers, eob));
89 }
90
91 let mut headers =
93 response.expect_code(OpCode::GetFinal, ResponseCode::Continue).map(HeaderSet::from)?;
94 headers.remove_body(false).map(|b| (false, headers, b))
96 }
97
98 pub async fn get_information(&mut self, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
104 self.update_headers_before_start(&mut headers)?;
106
107 if headers.is_empty() {
109 return Err(Error::operation(OpCode::Get, "missing headers"));
110 }
111
112 let srm_active = self.is_started && self.get_srm() == SingleResponseMode::Enable;
114
115 let request = RequestPacket::new_get(headers);
116 trace!(request:?; "Making outgoing GET request");
117 self.transport.send(request).await?;
118 trace!("Successfully made GET request");
119
120 let response_headers = if !srm_active {
122 let response = self.transport.receive_response(OpCode::Get).await?;
123 Self::handle_get_response(response)?
124 } else {
125 HeaderSet::new()
126 };
127 if !self.is_started {
128 self.check_response_for_srm(&response_headers);
129 self.set_started();
130 }
131 Ok(response_headers)
132 }
133
134 pub async fn get_data(mut self, mut headers: HeaderSet) -> Result<Vec<u8>, Error> {
139 self.update_headers_before_start(&mut headers)?;
141
142 let mut request = RequestPacket::new_get_final(headers);
143 let mut first_request = true;
144 let mut body = vec![];
145 loop {
146 if first_request || self.srm != SingleResponseMode::Enable {
149 trace!(request:?; "Making outgoing GET final request");
150 self.transport.send(request.clone()).await?;
151 trace!("Successfully made GET final request");
152 request = RequestPacket::new_get_final(HeaderSet::new());
154 first_request = false;
155 }
156 let response = self.transport.receive_response(OpCode::GetFinal).await?;
157 let (final_packet, response_headers, mut response_body) =
158 Self::handle_get_final_response(response)?;
159 body.append(&mut response_body);
160
161 if !self.is_started {
163 self.check_response_for_srm(&response_headers);
164 self.set_started();
165 }
166
167 if final_packet {
168 trace!("Found terminal GET final packet");
169 break;
170 }
171 }
172 Ok(body)
173 }
174
175 pub async fn terminate(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
180 let opcode = OpCode::Abort;
181 if !self.is_started {
182 return Err(Error::operation(opcode, "can't abort when not started"));
183 }
184
185 let request = RequestPacket::new_abort(headers);
186 trace!(request:?; "Making outgoing {opcode:?} request");
187 self.transport.send(request).await?;
188 trace!("Successfully made {opcode:?} request");
189 let response = self.transport.receive_response(opcode).await?;
190 response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
191 }
192}
193
194impl SrmOperation for GetOperation<'_> {
195 const OPERATION_TYPE: OpCode = OpCode::Get;
196
197 fn get_srm(&self) -> SingleResponseMode {
198 self.srm
199 }
200
201 fn set_srm(&mut self, mode: SingleResponseMode) {
202 self.srm = mode;
203 }
204}
205
206#[cfg(test)]
207mod tests {
208 use super::*;
209
210 use assert_matches::assert_matches;
211 use async_test_helpers::expect_stream_pending;
212 use async_utils::PollExt;
213 use fuchsia_async as fasync;
214 use std::pin::pin;
215
216 use crate::error::PacketError;
217 use crate::header::{Header, HeaderIdentifier};
218 use crate::transport::ObexTransportManager;
219 use crate::transport::test_utils::{
220 expect_code, expect_request, expect_request_and_reply, new_manager, reply,
221 };
222
223 fn setup_get_operation(mgr: &ObexTransportManager, initial: HeaderSet) -> GetOperation<'_> {
224 let transport = mgr.try_new_operation().expect("can start operation");
225 GetOperation::new(initial, transport)
226 }
227
228 #[fuchsia::test]
229 fn get_operation() {
230 let mut exec = fasync::TestExecutor::new();
231 let (manager, mut remote) = new_manager(false);
232 let mut operation = setup_get_operation(&manager, HeaderSet::new());
233 assert!(!operation.is_started);
234
235 {
238 let info_headers = HeaderSet::from_header(Header::name("text"));
239 let info_fut = operation.get_information(info_headers);
240 let mut info_fut = pin!(info_fut);
241 exec.run_until_stalled(&mut info_fut).expect_pending("waiting for peer response");
242 let response_headers = HeaderSet::from_header(Header::name("bar"));
243 let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
244 let expectation = |request: RequestPacket| {
245 assert_eq!(*request.code(), OpCode::Get);
246 let headers = HeaderSet::from(request);
247 assert!(headers.contains_header(&HeaderIdentifier::Name));
248 };
249 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
250 let received_headers = exec
251 .run_until_stalled(&mut info_fut)
252 .expect("response received")
253 .expect("valid response");
254 assert!(received_headers.contains_header(&HeaderIdentifier::Name));
255 }
256 assert!(operation.is_started);
257
258 {
261 let info_headers = HeaderSet::from_header(Header::Type("file".into()));
262 let info_fut = operation.get_information(info_headers);
263 let mut info_fut = pin!(info_fut);
264 exec.run_until_stalled(&mut info_fut).expect_pending("waiting for peer response");
265 let response_headers = HeaderSet::from_header(Header::Description("big file".into()));
266 let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
267 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Get), response);
268 let received_headers = exec
269 .run_until_stalled(&mut info_fut)
270 .expect("response received")
271 .expect("valid response");
272 assert!(received_headers.contains_header(&HeaderIdentifier::Description));
273 }
274
275 let data_fut = operation.get_data(HeaderSet::new());
279 let mut data_fut = pin!(data_fut);
280 exec.run_until_stalled(&mut data_fut).expect_pending("waiting for peer response");
281 let response_headers1 = HeaderSet::from_header(Header::Body(vec![1, 2, 3]));
282 let response1 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers1);
283 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::GetFinal), response1);
284 exec.run_until_stalled(&mut data_fut)
285 .expect_pending("waiting for additional peer responses");
286 let response_headers2 = HeaderSet::from_header(Header::EndOfBody(vec![4, 5, 6]));
288 let response2 = ResponsePacket::new_no_data(ResponseCode::Ok, response_headers2);
289 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::GetFinal), response2);
290 let user_data = exec
292 .run_until_stalled(&mut data_fut)
293 .expect("received all responses")
294 .expect("valid user data");
295 assert_eq!(user_data, vec![1, 2, 3, 4, 5, 6]);
296 }
297
298 #[fuchsia::test]
299 fn get_operation_terminate_success() {
300 let mut exec = fasync::TestExecutor::new();
301 let (manager, mut remote) = new_manager(false);
302 let initial = HeaderSet::from_header(Header::name("foo"));
303 let mut operation = setup_get_operation(&manager, initial);
304
305 operation.set_started();
307
308 let headers = HeaderSet::from_header(Header::name("terminated"));
310 let terminate_fut = operation.terminate(headers);
311 let mut terminate_fut = pin!(terminate_fut);
312 let _ =
313 exec.run_until_stalled(&mut terminate_fut).expect_pending("waiting for peer response");
314 let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
315 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Abort), response);
316 }
317
318 #[fuchsia::test]
319 fn get_operation_srm() {
320 let mut exec = fasync::TestExecutor::new();
321 let (manager, mut remote) = new_manager(true);
322 let mut operation = setup_get_operation(&manager, HeaderSet::new());
323
324 {
327 let info_headers = HeaderSet::from_header(Header::name("foo"));
328 let info_fut = operation.get_information(info_headers);
329 let mut info_fut = pin!(info_fut);
330 exec.run_until_stalled(&mut info_fut).expect_pending("waiting for peer response");
331 let response_headers = HeaderSet::from_headers(vec![
332 Header::name("bar"),
333 SingleResponseMode::Enable.into(),
334 ])
335 .unwrap();
336 let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
337 let expectation = |request: RequestPacket| {
338 assert_eq!(*request.code(), OpCode::Get);
339 let headers = HeaderSet::from(request);
340 assert!(headers.contains_header(&HeaderIdentifier::Name));
341 assert!(headers.contains_header(&HeaderIdentifier::SingleResponseMode));
342 };
343 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
344 let _received_headers = exec
345 .run_until_stalled(&mut info_fut)
346 .expect("response received")
347 .expect("valid response");
348 }
349 assert!(operation.is_started);
350 assert_eq!(operation.srm, SingleResponseMode::Enable);
351
352 {
356 let info_headers = HeaderSet::from_header(Header::Type("file".into()));
357 let info_fut = operation.get_information(info_headers);
358 let mut info_fut = pin!(info_fut);
359 let received_headers = exec
360 .run_until_stalled(&mut info_fut)
361 .expect("ready without peer response")
362 .expect("successful request");
363 assert_eq!(received_headers, HeaderSet::new());
364 let expectation = |request: RequestPacket| {
365 assert_eq!(*request.code(), OpCode::Get);
366 let headers = HeaderSet::from(request);
367 assert!(headers.contains_header(&HeaderIdentifier::Type));
368 assert!(!headers.contains_header(&HeaderIdentifier::SingleResponseMode));
369 };
370 expect_request(&mut exec, &mut remote, expectation);
371 }
372
373 let data_fut = operation.get_data(HeaderSet::new());
376 let mut data_fut = pin!(data_fut);
377 exec.run_until_stalled(&mut data_fut).expect_pending("waiting for peer response");
378 let response_headers1 = HeaderSet::from_header(Header::Body(vec![1, 2, 3]));
379 let response1 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers1);
380 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::GetFinal), response1);
381 exec.run_until_stalled(&mut data_fut)
382 .expect_pending("waiting for additional peer responses");
383 let response_headers2 = HeaderSet::from_header(Header::Body(vec![4, 5, 6]));
385 let response2 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers2);
386 expect_stream_pending(&mut exec, &mut remote);
387 reply(&mut exec, &mut remote, response2);
388 let response_headers3 = HeaderSet::from_header(Header::EndOfBody(vec![7, 8, 9]));
390 let response3 = ResponsePacket::new_no_data(ResponseCode::Ok, response_headers3);
391 expect_stream_pending(&mut exec, &mut remote);
392 reply(&mut exec, &mut remote, response3);
393 let user_data = exec
395 .run_until_stalled(&mut data_fut)
396 .expect("received all responses")
397 .expect("valid user data");
398 assert_eq!(user_data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
399 }
400
401 #[fuchsia::test]
402 fn client_disable_srm_mid_get_is_ignored() {
403 let mut exec = fasync::TestExecutor::new();
404 let (manager, mut remote) = new_manager(true);
405 let transport = manager.try_new_operation().expect("can start operation");
406 let mut operation = GetOperation::new(HeaderSet::new(), transport);
407 operation.set_started();
409 assert_eq!(operation.srm, SingleResponseMode::Enable);
410
411 {
414 let info_headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
415 let info_fut = operation.get_information(info_headers);
416 let mut info_fut = pin!(info_fut);
417 let received_headers = exec
418 .run_until_stalled(&mut info_fut)
419 .expect("ready without peer response")
420 .expect("successful request");
421 assert_eq!(received_headers, HeaderSet::new());
422 expect_request(&mut exec, &mut remote, expect_code(OpCode::Get));
423 }
424 assert_eq!(operation.srm, SingleResponseMode::Enable);
425 }
426
427 #[fuchsia::test]
428 fn get_operation_information_error() {
429 let mut exec = fasync::TestExecutor::new();
430 let (manager, _remote) = new_manager(false);
431 let initial = HeaderSet::from_header(Header::name("foo"));
432 let mut operation = setup_get_operation(&manager, initial);
433
434 operation.set_started();
436
437 let get_info_fut = operation.get_information(HeaderSet::new());
439 let mut get_info_fut = pin!(get_info_fut);
440 let get_info_result =
441 exec.run_until_stalled(&mut get_info_fut).expect("resolves with error");
442 assert_matches!(get_info_result, Err(Error::OperationError { .. }));
443 }
444
445 #[fuchsia::test]
446 fn get_operation_data_before_start_is_ok() {
447 let mut exec = fasync::TestExecutor::new();
448 let (manager, mut remote) = new_manager(false);
449 let initial = HeaderSet::from_header(Header::name("foo"));
450 let operation = setup_get_operation(&manager, initial);
451
452 let get_data_fut = operation.get_data(HeaderSet::new());
454 let mut get_data_fut = pin!(get_data_fut);
455 exec.run_until_stalled(&mut get_data_fut).expect_pending("waiting for peer response");
456 let response_headers = HeaderSet::from_header(Header::EndOfBody(vec![1, 2, 3]));
457 let response = ResponsePacket::new_no_data(ResponseCode::Ok, response_headers);
458 let expectation = |request: RequestPacket| {
459 assert_eq!(*request.code(), OpCode::GetFinal);
460 let headers = HeaderSet::from(request);
461 assert!(headers.contains_header(&HeaderIdentifier::Name));
462 };
463 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
464 let user_data = exec
465 .run_until_stalled(&mut get_data_fut)
466 .expect("received all responses")
467 .expect("valid user data");
468 assert_eq!(user_data, vec![1, 2, 3]);
469 }
470
471 #[fuchsia::test]
472 fn get_operation_data_before_start_with_srm_is_ok() {
473 let mut exec = fasync::TestExecutor::new();
474 let (manager, mut remote) = new_manager(true);
475 let operation = setup_get_operation(&manager, HeaderSet::new());
476
477 let get_data_fut = operation.get_data(HeaderSet::new());
479 let mut get_data_fut = pin!(get_data_fut);
480 exec.run_until_stalled(&mut get_data_fut).expect_pending("waiting for peer response");
481 let response_headers1 = HeaderSet::from_headers(vec![
482 Header::Body(vec![1, 1]),
483 Header::SingleResponseMode(SingleResponseMode::Enable.into()),
484 ])
485 .unwrap();
486 let response1 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers1);
487 let expectation = |request: RequestPacket| {
488 assert_eq!(*request.code(), OpCode::GetFinal);
489 let headers = HeaderSet::from(request);
490 assert!(headers.contains_header(&HeaderIdentifier::SingleResponseMode));
491 };
492 expect_request_and_reply(&mut exec, &mut remote, expectation, response1);
493 exec.run_until_stalled(&mut get_data_fut)
494 .expect_pending("waiting for additional peer responses");
495
496 let response_headers2 = HeaderSet::from_header(Header::Body(vec![2, 2]));
498 let response2 = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers2);
499 expect_stream_pending(&mut exec, &mut remote);
500 reply(&mut exec, &mut remote, response2);
501
502 let response_headers3 = HeaderSet::from_header(Header::EndOfBody(vec![3, 3]));
504 let response3 = ResponsePacket::new_no_data(ResponseCode::Ok, response_headers3);
505 expect_stream_pending(&mut exec, &mut remote);
506 reply(&mut exec, &mut remote, response3);
507 let user_data = exec
509 .run_until_stalled(&mut get_data_fut)
510 .expect("received all responses")
511 .expect("valid user data");
512 assert_eq!(user_data, vec![1, 1, 2, 2, 3, 3]);
513 }
514
515 #[fuchsia::test]
516 fn get_operation_data_peer_disconnect_is_error() {
517 let mut exec = fasync::TestExecutor::new();
518 let (manager, remote) = new_manager(false);
519 let initial = HeaderSet::from_header(Header::name("foo"));
520 let mut operation = setup_get_operation(&manager, initial);
521 operation.set_started();
523
524 drop(remote);
526 let get_data_fut = operation.get_data(HeaderSet::new());
527 let mut get_data_fut = pin!(get_data_fut);
528 let get_data_result =
529 exec.run_until_stalled(&mut get_data_fut).expect("resolves with error");
530 assert_matches!(get_data_result, Err(Error::IOError(_)));
531 }
532
533 #[fuchsia::test]
534 async fn get_operation_terminate_before_start_error() {
535 let (manager, _remote) = new_manager(false);
536 let initial = HeaderSet::from_header(Header::name("bar"));
537 let operation = setup_get_operation(&manager, initial);
538
539 let terminate_result = operation.terminate(HeaderSet::new()).await;
541 assert_matches!(terminate_result, Err(Error::OperationError { .. }));
542 }
543
544 #[fuchsia::test]
545 fn handle_get_response_success() {
546 let headers = HeaderSet::from_header(Header::name("foo"));
547 let response = ResponsePacket::new_no_data(ResponseCode::Continue, headers.clone());
548 let result = GetOperation::handle_get_response(response).expect("valid response");
549 assert_eq!(result, headers);
550 }
551
552 #[fuchsia::test]
553 fn handle_get_response_error() {
554 let headers = HeaderSet::from_header(Header::name("foo"));
555 let response1 = ResponsePacket::new_no_data(ResponseCode::Ok, headers.clone());
557 assert_matches!(
558 GetOperation::handle_get_response(response1),
559 Err(Error::PeerRejected { .. })
560 );
561
562 let response1 = ResponsePacket::new_no_data(ResponseCode::NotFound, headers);
564 assert_matches!(
565 GetOperation::handle_get_response(response1),
566 Err(Error::PeerRejected { .. })
567 );
568 }
569
570 #[fuchsia::test]
571 fn handle_get_final_response_success() {
572 let headers = HeaderSet::from_header(Header::EndOfBody(vec![1, 2]));
573 let response1 = ResponsePacket::new_no_data(ResponseCode::Ok, headers);
574 let result1 = GetOperation::handle_get_final_response(response1).expect("valid response");
575 assert_eq!(result1, (true, HeaderSet::new(), vec![1, 2]));
576
577 let headers = HeaderSet::from_header(Header::Body(vec![1, 3, 5]));
578 let response2 = ResponsePacket::new_no_data(ResponseCode::Continue, headers);
579 let result2 = GetOperation::handle_get_final_response(response2).expect("valid response");
580 assert_eq!(result2, (false, HeaderSet::new(), vec![1, 3, 5]));
581 }
582
583 #[fuchsia::test]
584 fn get_final_response_error() {
585 let headers = HeaderSet::from_header(Header::EndOfBody(vec![1, 2]));
587 let response1 = ResponsePacket::new_no_data(ResponseCode::Forbidden, headers);
588 assert_matches!(
589 GetOperation::handle_get_final_response(response1),
590 Err(Error::PeerRejected { .. })
591 );
592
593 let response2 = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
595 assert_matches!(
596 GetOperation::handle_get_final_response(response2),
597 Err(Error::Packet(PacketError::Data(_)))
598 );
599
600 let response3 = ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
602 assert_matches!(
603 GetOperation::handle_get_final_response(response3),
604 Err(Error::Packet(PacketError::Data(_)))
605 );
606 }
607}