1use log::trace;
6
7use crate::client::SrmOperation;
8use crate::error::Error;
9use crate::header::{Header, HeaderIdentifier, HeaderSet, SingleResponseMode};
10use crate::operation::{OpCode, RequestPacket, ResponseCode};
11use crate::transport::ObexTransport;
12
/// Progress of a PUT operation.
#[derive(Debug)]
enum Status {
    /// No request has been sent yet. Holds the headers supplied at construction, which are
    /// merged into the headers of the first request.
    NotStarted(HeaderSet),
    /// The first request of the operation has been sent.
    Started,
}
23
/// An in-progress PUT operation driven over an `ObexTransport`.
///
/// Data is sent with `write` (non-final request) and `write_final` (final request); `delete`
/// sends a final request without a body, and `terminate` sends an ABORT request.
/// Marked `#[must_use]` so that a constructed-but-unused operation produces a warning.
#[must_use]
#[derive(Debug)]
pub struct PutOperation<'a> {
    /// Transport used to send requests and receive responses for this operation.
    transport: ObexTransport<'a>,
    /// Whether the first request has been sent, and the initial headers if it has not.
    status: Status,
    /// Current Single Response Mode (SRM) state. Initialized in `new` from
    /// `transport.srm_supported()` and read/written through the `SrmOperation` impl.
    srm: SingleResponseMode,
}
50
51impl<'a> PutOperation<'a> {
52 pub fn new(headers: HeaderSet, transport: ObexTransport<'a>) -> Self {
53 let srm = transport.srm_supported().into();
54 Self { transport, status: Status::NotStarted(headers), srm }
55 }
56
57 fn is_started(&self) -> bool {
60 match self.status {
61 Status::NotStarted(_) => false,
62 Status::Started => true,
63 }
64 }
65
66 fn set_started(&mut self) -> Result<(), Error> {
68 match std::mem::replace(&mut self.status, Status::Started) {
69 Status::NotStarted(_) => Ok(()),
70 Status::Started => {
71 Err(Error::other("Attempted to start a PUT operation that was already started"))
72 }
73 }
74 }
75
76 fn combine_with_initial_headers(&mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
79 let mut initial_headers = match &mut self.status {
80 Status::NotStarted(ref mut initial_headers) => std::mem::take(initial_headers),
81 Status::Started => {
82 return Err(Error::other(
83 "Cannot add initial headers when PUT operation already started",
84 ))
85 }
86 };
87 let _ = initial_headers.try_append(headers)?;
88 Ok(initial_headers)
89 }
90
91 fn validate_headers(headers: &HeaderSet) -> Result<(), Error> {
93 if headers.contains_header(&HeaderIdentifier::Body) {
94 return Err(Error::operation(OpCode::Put, "info headers can't contain body"));
95 }
96 if headers.contains_header(&HeaderIdentifier::EndOfBody) {
97 return Err(Error::operation(OpCode::Put, "info headers can't contain end of body"));
98 }
99 Ok(())
100 }
101
102 async fn do_put(&mut self, final_: bool, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
105 let is_started = self.is_started();
106 if !is_started {
107 headers = self.combine_with_initial_headers(headers)?;
108 }
109
110 let srm_active = is_started && self.get_srm() == SingleResponseMode::Enable;
112 let (opcode, request, expected_response_code) = if final_ {
113 (OpCode::PutFinal, RequestPacket::new_put_final(headers), ResponseCode::Ok)
114 } else {
115 (OpCode::Put, RequestPacket::new_put(headers), ResponseCode::Continue)
116 };
117 trace!("Making outgoing PUT request: {request:?}");
118 self.transport.send(request)?;
119 trace!("Successfully made PUT request");
120 if !is_started {
121 self.set_started()?;
122 }
123 if final_ || !srm_active {
126 let response = self.transport.receive_response(opcode).await?;
127 response.expect_code(opcode, expected_response_code).map(Into::into)
128 } else {
129 Ok(HeaderSet::new())
130 }
131 }
132
133 pub async fn delete(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
137 Self::validate_headers(&headers)?;
138 self.do_put(true, headers).await
141 }
142
143 pub async fn write(&mut self, data: &[u8], mut headers: HeaderSet) -> Result<HeaderSet, Error> {
149 Self::validate_headers(&headers)?;
150 let is_first_write = !self.is_started();
151 if is_first_write {
152 self.try_enable_srm(&mut headers)?;
154 }
155 headers.add(Header::Body(data.to_vec()))?;
156 let response_headers = self.do_put(false, headers).await?;
157 if is_first_write {
158 self.check_response_for_srm(&response_headers);
159 }
160 Ok(response_headers)
161 }
162
163 pub async fn write_final(
169 mut self,
170 data: &[u8],
171 mut headers: HeaderSet,
172 ) -> Result<HeaderSet, Error> {
173 Self::validate_headers(&headers)?;
174 headers.add(Header::EndOfBody(data.to_vec()))?;
175 self.do_put(true, headers).await
176 }
177
178 pub async fn terminate(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
183 let opcode = OpCode::Abort;
184 if !self.is_started() {
185 return Err(Error::operation(opcode, "can't abort PUT that hasn't started"));
186 }
187 let request = RequestPacket::new_abort(headers);
188 trace!(request:?; "Making outgoing {opcode:?} request");
189 self.transport.send(request)?;
190 trace!("Successfully made {opcode:?} request");
191 let response = self.transport.receive_response(opcode).await?;
192 response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
193 }
194}
195
/// Exposes the operation's SRM state to the `SrmOperation` trait.
impl SrmOperation for PutOperation<'_> {
    /// SRM handling for this type applies to the PUT operation.
    const OPERATION_TYPE: OpCode = OpCode::Put;

    /// Returns the current SRM mode of this operation.
    fn get_srm(&self) -> SingleResponseMode {
        self.srm
    }

    /// Overwrites the current SRM mode of this operation with `mode`.
    fn set_srm(&mut self, mode: SingleResponseMode) {
        self.srm = mode;
    }
}
207
#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use async_utils::PollExt;
    use fuchsia_async as fasync;
    use std::pin::pin;

    use crate::header::ConnectionIdentifier;
    use crate::operation::ResponsePacket;
    use crate::transport::test_utils::{
        expect_code, expect_request, expect_request_and_reply, new_manager,
    };
    use crate::transport::ObexTransportManager;

    /// Builds a `PutOperation` on a fresh transport from `mgr`, seeded with `initial_headers`.
    fn setup_put_operation(
        mgr: &ObexTransportManager,
        initial_headers: Vec<Header>,
    ) -> PutOperation<'_> {
        let transport = mgr.try_new_operation().expect("can start operation");
        PutOperation::new(HeaderSet::from_headers(initial_headers).unwrap(), transport)
    }

    /// A single `write_final` sends one PutFinal request containing both the initial headers
    /// and the headers passed to the call.
    #[fuchsia::test]
    fn put_operation_single_chunk_is_ok() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(false);
        let operation =
            setup_put_operation(&manager, vec![Header::ConnectionId(0x1u32.try_into().unwrap())]);

        let payload = vec![5, 6, 7, 8, 9];
        let headers =
            HeaderSet::from_headers(vec![Header::Type("file".into()), Header::name("foobar.txt")])
                .unwrap();
        let put_fut = operation.write_final(&payload[..], headers);
        let mut put_fut = pin!(put_fut);
        // The request is sent, but the future stays pending until the peer responds.
        let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
        let expectation = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::PutFinal);
            let headers = HeaderSet::from(request);
            // The initial ConnectionId header is merged into the first request.
            assert!(headers.contains_header(&HeaderIdentifier::ConnectionId));
            assert!(!headers.contains_header(&HeaderIdentifier::Body));
            assert!(headers.contains_headers(&vec![
                HeaderIdentifier::EndOfBody,
                HeaderIdentifier::Type,
                HeaderIdentifier::Name
            ]));
        };
        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
        let _received_headers = exec
            .run_until_stalled(&mut put_fut)
            .expect("response received")
            .expect("valid response");
    }

    /// Several `write` calls followed by `write_final` produce Put requests with a Body
    /// header and a closing PutFinal request with an EndOfBody header.
    #[fuchsia::test]
    fn put_operation_multiple_chunks_is_ok() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(false);
        let mut operation = setup_put_operation(&manager, vec![]);

        // 99 bytes, sent in chunks of at most 20 bytes.
        let payload: Vec<u8> = (1..100).collect();
        for chunk in payload.chunks(20) {
            let put_fut = operation.write(&chunk[..], HeaderSet::new());
            let mut put_fut = pin!(put_fut);
            let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
            let response = ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
            let expectation = |request: RequestPacket| {
                assert_eq!(*request.code(), OpCode::Put);
                let headers = HeaderSet::from(request);
                assert!(headers.contains_header(&HeaderIdentifier::Body));
            };
            expect_request_and_reply(&mut exec, &mut remote, expectation, response);
            let _received_headers = exec
                .run_until_stalled(&mut put_fut)
                .expect("response received")
                .expect("valid response");
        }

        // Finish the operation with an empty final payload.
        let put_final_fut = operation.write_final(&[], HeaderSet::new());
        let mut put_final_fut = pin!(put_final_fut);
        let _ = exec.run_until_stalled(&mut put_final_fut).expect_pending("waiting for response");
        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
        let expectation = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::PutFinal);
            let headers = HeaderSet::from(request);
            assert!(headers.contains_header(&HeaderIdentifier::EndOfBody));
        };
        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
        let _ = exec
            .run_until_stalled(&mut put_final_fut)
            .expect("response received")
            .expect("valid response");
    }

    /// `delete` sends a PutFinal request that has neither a Body nor an EndOfBody header.
    #[fuchsia::test]
    fn put_operation_delete_is_ok() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(false);
        let operation = setup_put_operation(&manager, vec![]);

        let headers = HeaderSet::from_headers(vec![
            Header::Description("deleting file".into()),
            Header::name("foobar.txt"),
        ])
        .unwrap();
        let put_fut = operation.delete(headers);
        let mut put_fut = pin!(put_fut);
        let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
        let expectation = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::PutFinal);
            let headers = HeaderSet::from(request);
            assert!(!headers.contains_header(&HeaderIdentifier::Body));
            assert!(!headers.contains_header(&HeaderIdentifier::EndOfBody));
        };
        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
        let _ = exec
            .run_until_stalled(&mut put_fut)
            .expect("response received")
            .expect("valid response");
    }

    /// After one successful `write`, `terminate` sends an Abort request and resolves once
    /// the peer replies with Ok.
    #[fuchsia::test]
    fn put_operation_terminate_success() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(false);
        let mut operation = setup_put_operation(&manager, vec![]);

        // Start the operation with a first write so that it can be terminated.
        {
            let put_fut = operation.write(&[1, 2, 3, 4, 5], HeaderSet::new());
            let mut put_fut = pin!(put_fut);
            let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
            let response = ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
            expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Put), response);
            let _received_headers = exec
                .run_until_stalled(&mut put_fut)
                .expect("response received")
                .expect("valid response");
        }

        // Terminating consumes the operation and issues an Abort request.
        let terminate_fut = operation.terminate(HeaderSet::new());
        let mut terminate_fut = pin!(terminate_fut);
        let _ = exec.run_until_stalled(&mut terminate_fut).expect_pending("waiting for response");
        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Abort), response);
        let _received_headers = exec
            .run_until_stalled(&mut terminate_fut)
            .expect("response received")
            .expect("valid response");
    }

    /// `write` rejects caller-supplied headers that already contain Body or EndOfBody.
    #[fuchsia::test]
    async fn put_with_body_header_is_error() {
        let (manager, _remote) = new_manager(false);
        let mut operation = setup_put_operation(&manager, vec![]);

        let payload = vec![1, 2, 3];
        // A Body header among the informational headers is an error.
        let body_headers = HeaderSet::from_headers(vec![
            Header::Body(payload.clone()),
            Header::name("foobar.txt"),
        ])
        .unwrap();
        let result = operation.write(&payload[..], body_headers.clone()).await;
        assert_matches!(result, Err(Error::OperationError { .. }));

        // An EndOfBody header among the informational headers is an error too.
        let eob_headers = HeaderSet::from_headers(vec![
            Header::EndOfBody(payload.clone()),
            Header::name("foobar1.txt"),
        ])
        .unwrap();
        let result = operation.write(&payload[..], eob_headers.clone()).await;
        assert_matches!(result, Err(Error::OperationError { .. }));
    }

    /// `delete` rejects caller-supplied headers that contain Body or EndOfBody.
    #[fuchsia::test]
    async fn delete_with_body_header_is_error() {
        let (manager, _remote) = new_manager(false);

        let payload = vec![1, 2, 3];
        // `delete` consumes the operation, so each case uses a fresh one.
        let operation = setup_put_operation(&manager, vec![]);
        let body_headers = HeaderSet::from_headers(vec![
            Header::Body(payload.clone()),
            Header::name("foobar.txt"),
        ])
        .unwrap();
        let result = operation.delete(body_headers).await;
        assert_matches!(result, Err(Error::OperationError { .. }));

        let operation = setup_put_operation(&manager, vec![]);
        let eob_headers = HeaderSet::from_headers(vec![
            Header::EndOfBody(payload.clone()),
            Header::name("foobar1.txt"),
        ])
        .unwrap();
        let result = operation.delete(eob_headers).await;
        assert_matches!(result, Err(Error::OperationError { .. }));
    }

    /// `terminate` on an operation that has not sent any request is an error.
    #[fuchsia::test]
    async fn put_operation_terminate_before_start_error() {
        let (manager, _remote) = new_manager(false);
        let operation = setup_put_operation(&manager, vec![]);

        let headers = HeaderSet::from_header(Header::Description("terminating test".into()));
        let terminate_result = operation.terminate(headers).await;
        assert_matches!(terminate_result, Err(Error::OperationError { .. }));
    }

    /// With SRM enabled, the first write waits for a response, later writes complete without
    /// one, and the final write waits for a response again.
    #[fuchsia::test]
    fn put_operation_srm_enabled_is_ok() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(true);
        let mut operation = setup_put_operation(&manager, vec![]);

        // First write: carries the SRM header and still waits for the peer's response.
        {
            let first_buf = [1, 2, 3];
            let put_fut = operation.write(&first_buf[..], HeaderSet::new());
            let mut put_fut = pin!(put_fut);
            let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");

            let expectation = |request: RequestPacket| {
                assert_eq!(*request.code(), OpCode::Put);
                let headers = HeaderSet::from(request);
                assert!(headers.contains_header(&HeaderIdentifier::Body));
                assert!(headers.contains_header(&HeaderIdentifier::SingleResponseMode));
            };
            // The peer confirms SRM in its response.
            let response_headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
            let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
            expect_request_and_reply(&mut exec, &mut remote, expectation, response);
            let _received_headers = exec
                .run_until_stalled(&mut put_fut)
                .expect("response received")
                .expect("valid response");
        }
        assert_eq!(operation.srm, SingleResponseMode::Enable);
        // Second write: resolves immediately, without any response from the peer.
        {
            let second_buf = [4, 5, 6];
            let put_fut2 = operation.write(&second_buf[..], HeaderSet::new());
            let mut put_fut2 = pin!(put_fut2);
            let _ = exec
                .run_until_stalled(&mut put_fut2)
                .expect("ready without peer response")
                .expect("success");
            let expectation = |request: RequestPacket| {
                assert_eq!(*request.code(), OpCode::Put);
                let headers = HeaderSet::from(request);
                assert!(headers.contains_header(&HeaderIdentifier::Body));
                // The SRM header is only sent on the first request.
                assert!(!headers.contains_header(&HeaderIdentifier::SingleResponseMode));
            };
            expect_request(&mut exec, &mut remote, expectation);
        }

        // Final write: a response is awaited even though SRM is enabled.
        let put_final_fut = operation.write_final(&[], HeaderSet::new());
        let mut put_final_fut = pin!(put_final_fut);
        let _ = exec.run_until_stalled(&mut put_final_fut).expect_pending("waiting for response");
        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
        let expectation = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::PutFinal);
            let headers = HeaderSet::from(request);
            assert!(headers.contains_header(&HeaderIdentifier::EndOfBody));
        };
        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
        let _ = exec
            .run_until_stalled(&mut put_final_fut)
            .expect("response received")
            .expect("valid response");
    }

    /// An SRM Disable header passed to a write after the operation has started does not
    /// change the operation's SRM mode.
    #[fuchsia::test]
    fn client_disable_srm_mid_operation_is_ignored() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(true);
        let mut operation = setup_put_operation(&manager, vec![]);
        // Force the operation into the started state without sending a request.
        if let Status::NotStarted(_) = &mut operation.status {
            let _ = operation.set_started().unwrap();
        } else {
            panic!("At this point operation not started");
        };
        assert_eq!(operation.srm, SingleResponseMode::Enable);

        {
            // Not the first write, so the SRM header is not used for negotiation.
            let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
            let put_fut = operation.write(&[], headers);
            let mut put_fut = pin!(put_fut);
            let _ = exec
                .run_until_stalled(&mut put_fut)
                .expect("ready without peer response")
                .expect("success");
            let expectation = |request: RequestPacket| {
                assert_eq!(*request.code(), OpCode::Put);
            };
            expect_request(&mut exec, &mut remote, expectation);
        }
        assert_eq!(operation.srm, SingleResponseMode::Enable);
    }

    /// `try_enable_srm` accepts the application's SRM choice when the transport allows it.
    #[fuchsia::test]
    fn application_select_srm_success() {
        let _exec = fasync::TestExecutor::new();
        // Transport without SRM; application asks for Disable: stays Disable.
        let (manager, _remote) = new_manager(false);
        let mut operation = setup_put_operation(&manager, vec![]);
        assert_eq!(operation.srm, SingleResponseMode::Disable);
        let mut headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
        assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
        assert_eq!(operation.srm, SingleResponseMode::Disable);

        // Transport with SRM; application asks for Disable: switches to Disable.
        let (manager, _remote) = new_manager(true);
        let mut operation = setup_put_operation(&manager, vec![]);
        assert_eq!(operation.srm, SingleResponseMode::Enable);
        let mut headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
        assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
        assert_eq!(operation.srm, SingleResponseMode::Disable);

        // Transport with SRM; application asks for Enable: stays Enable.
        let (manager, _remote) = new_manager(true);
        let mut operation = setup_put_operation(&manager, vec![]);
        assert_eq!(operation.srm, SingleResponseMode::Enable);
        let mut headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
        assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
        assert_eq!(operation.srm, SingleResponseMode::Enable);
    }

    /// Requesting SRM Enable on a transport without SRM fails with `SrmNotSupported`.
    #[fuchsia::test]
    fn application_enable_srm_when_not_supported_is_error() {
        let _exec = fasync::TestExecutor::new();
        let (manager, _remote) = new_manager(false);
        let mut operation = setup_put_operation(&manager, vec![]);
        assert_eq!(operation.srm, SingleResponseMode::Disable);
        let mut headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
        assert_matches!(operation.try_enable_srm(&mut headers), Err(Error::SrmNotSupported));
        assert_eq!(operation.srm, SingleResponseMode::Disable);
    }

    /// `check_response_for_srm` updates the SRM mode from the peer's response headers.
    #[fuchsia::test]
    fn peer_srm_response() {
        let _exec = fasync::TestExecutor::new();
        // Transport without SRM: the mode stays Disable whatever the peer responds.
        let (manager, _remote) = new_manager(false);
        let mut operation = setup_put_operation(&manager, vec![]);
        let headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
        operation.check_response_for_srm(&headers);
        assert_eq!(operation.srm, SingleResponseMode::Disable);
        let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
        operation.check_response_for_srm(&headers);
        assert_eq!(operation.srm, SingleResponseMode::Disable);

        // Transport with SRM: the mode follows the peer's response.
        let (manager, _remote) = new_manager(true);
        let mut operation = setup_put_operation(&manager, vec![]);
        let headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
        operation.check_response_for_srm(&headers);
        assert_eq!(operation.srm, SingleResponseMode::Enable);
        let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
        operation.check_response_for_srm(&headers);
        assert_eq!(operation.srm, SingleResponseMode::Disable);

        // Transport with SRM, but the response has no SRM header: the mode becomes Disable.
        let (manager, _remote) = new_manager(true);
        let mut operation = setup_put_operation(&manager, vec![]);
        operation.check_response_for_srm(&HeaderSet::new());
        assert_eq!(operation.srm, SingleResponseMode::Disable);
    }

    /// A first write whose headers repeat a header already among the initial headers fails
    /// immediately with `AlreadyExists`.
    #[fuchsia::test]
    fn put_with_connection_id_already_set_is_error() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, _remote) = new_manager(false);
        let mut operation =
            setup_put_operation(&manager, vec![Header::ConnectionId(ConnectionIdentifier(5))]);

        let write_headers = HeaderSet::from_header(Header::ConnectionId(ConnectionIdentifier(10)));
        let write_fut = operation.write(&[1, 2, 3], write_headers);
        let mut write_fut = pin!(write_fut);
        // The future resolves without a peer response, since nothing is sent.
        let result = exec.run_until_stalled(&mut write_fut).expect("finished with error");
        assert_matches!(result, Err(Error::AlreadyExists(_)));
    }
}