1use futures::SinkExt;
6use log::trace;
7
8use crate::client::SrmOperation;
9use crate::error::Error;
10use crate::header::{Header, HeaderIdentifier, HeaderSet, SingleResponseMode};
11use crate::operation::{OpCode, RequestPacket, ResponseCode};
12use crate::transport::ObexTransport;
13
14#[derive(Debug)]
16enum Status {
17 NotStarted(HeaderSet),
21 Started,
23}
24
25#[must_use]
41#[derive(Debug)]
42pub struct PutOperation<'a> {
43 transport: ObexTransport<'a>,
45 status: Status,
47 srm: SingleResponseMode,
50}
51
52impl<'a> PutOperation<'a> {
53 pub fn new(headers: HeaderSet, transport: ObexTransport<'a>) -> Self {
54 let srm = transport.srm_supported().into();
55 Self { transport, status: Status::NotStarted(headers), srm }
56 }
57
58 fn is_started(&self) -> bool {
61 match self.status {
62 Status::NotStarted(_) => false,
63 Status::Started => true,
64 }
65 }
66
67 fn set_started(&mut self) -> Result<(), Error> {
69 match std::mem::replace(&mut self.status, Status::Started) {
70 Status::NotStarted(_) => Ok(()),
71 Status::Started => {
72 Err(Error::other("Attempted to start a PUT operation that was already started"))
73 }
74 }
75 }
76
77 fn combine_with_initial_headers(&mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
80 let mut initial_headers = match &mut self.status {
81 Status::NotStarted(initial_headers) => std::mem::take(initial_headers),
82 Status::Started => {
83 return Err(Error::other(
84 "Cannot add initial headers when PUT operation already started",
85 ));
86 }
87 };
88 let _ = initial_headers.try_append(headers)?;
89 Ok(initial_headers)
90 }
91
92 fn validate_headers(headers: &HeaderSet) -> Result<(), Error> {
94 if headers.contains_header(&HeaderIdentifier::Body) {
95 return Err(Error::operation(OpCode::Put, "info headers can't contain body"));
96 }
97 if headers.contains_header(&HeaderIdentifier::EndOfBody) {
98 return Err(Error::operation(OpCode::Put, "info headers can't contain end of body"));
99 }
100 Ok(())
101 }
102
103 async fn do_put(&mut self, final_: bool, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
106 let is_started = self.is_started();
107 if !is_started {
108 headers = self.combine_with_initial_headers(headers)?;
109 }
110
111 let srm_active = is_started && self.get_srm() == SingleResponseMode::Enable;
113 let (opcode, request, expected_response_code) = if final_ {
114 (OpCode::PutFinal, RequestPacket::new_put_final(headers), ResponseCode::Ok)
115 } else {
116 (OpCode::Put, RequestPacket::new_put(headers), ResponseCode::Continue)
117 };
118 trace!("Making outgoing PUT request: {request:?}");
119 self.transport.send(request).await?;
120 trace!("Successfully made PUT request");
121 if !is_started {
122 self.set_started()?;
123 }
124 if final_ || !srm_active {
127 let response = self.transport.receive_response(opcode).await?;
128 response.expect_code(opcode, expected_response_code).map(Into::into)
129 } else {
130 Ok(HeaderSet::new())
131 }
132 }
133
134 pub async fn delete(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
138 Self::validate_headers(&headers)?;
139 self.do_put(true, headers).await
142 }
143
144 pub async fn write(&mut self, data: &[u8], mut headers: HeaderSet) -> Result<HeaderSet, Error> {
150 Self::validate_headers(&headers)?;
151 let is_first_write = !self.is_started();
152 if is_first_write {
153 self.try_enable_srm(&mut headers)?;
155 }
156 headers.add(Header::Body(data.to_vec()))?;
157 let response_headers = self.do_put(false, headers).await?;
158 if is_first_write {
159 self.check_response_for_srm(&response_headers);
160 }
161 Ok(response_headers)
162 }
163
164 pub async fn write_final(
170 mut self,
171 data: &[u8],
172 mut headers: HeaderSet,
173 ) -> Result<HeaderSet, Error> {
174 Self::validate_headers(&headers)?;
175 headers.add(Header::EndOfBody(data.to_vec()))?;
176 self.do_put(true, headers).await
177 }
178
179 pub async fn terminate(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
184 let opcode = OpCode::Abort;
185 if !self.is_started() {
186 return Err(Error::operation(opcode, "can't abort PUT that hasn't started"));
187 }
188 let request = RequestPacket::new_abort(headers);
189 trace!(request:?; "Making outgoing {opcode:?} request");
190 self.transport.send(request).await?;
191 trace!("Successfully made {opcode:?} request");
192 let response = self.transport.receive_response(opcode).await?;
193 response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
194 }
195}
196
197impl SrmOperation for PutOperation<'_> {
198 const OPERATION_TYPE: OpCode = OpCode::Put;
199
200 fn get_srm(&self) -> SingleResponseMode {
201 self.srm
202 }
203
204 fn set_srm(&mut self, mode: SingleResponseMode) {
205 self.srm = mode;
206 }
207}
208
209#[cfg(test)]
210mod tests {
211 use super::*;
212
213 use assert_matches::assert_matches;
214 use async_utils::PollExt;
215 use fuchsia_async as fasync;
216 use std::pin::pin;
217
218 use crate::header::ConnectionIdentifier;
219 use crate::operation::ResponsePacket;
220 use crate::transport::ObexTransportManager;
221 use crate::transport::test_utils::{
222 expect_code, expect_request, expect_request_and_reply, new_manager,
223 };
224
225 fn setup_put_operation(
226 mgr: &ObexTransportManager,
227 initial_headers: Vec<Header>,
228 ) -> PutOperation<'_> {
229 let transport = mgr.try_new_operation().expect("can start operation");
230 PutOperation::new(HeaderSet::from_headers(initial_headers).unwrap(), transport)
231 }
232
233 #[fuchsia::test]
234 fn put_operation_single_chunk_is_ok() {
235 let mut exec = fasync::TestExecutor::new();
236 let (manager, mut remote) = new_manager(false);
237 let operation =
238 setup_put_operation(&manager, vec![Header::ConnectionId(0x1u32.try_into().unwrap())]);
239
240 let payload = vec![5, 6, 7, 8, 9];
241 let headers =
242 HeaderSet::from_headers(vec![Header::Type("file".into()), Header::name("foobar.txt")])
243 .unwrap();
244 let put_fut = operation.write_final(&payload[..], headers);
245 let mut put_fut = pin!(put_fut);
246 let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
247 let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
248 let expectation = |request: RequestPacket| {
249 assert_eq!(*request.code(), OpCode::PutFinal);
250 let headers = HeaderSet::from(request);
251 assert!(headers.contains_header(&HeaderIdentifier::ConnectionId));
252 assert!(!headers.contains_header(&HeaderIdentifier::Body));
253 assert!(headers.contains_headers(&vec![
254 HeaderIdentifier::EndOfBody,
255 HeaderIdentifier::Type,
256 HeaderIdentifier::Name
257 ]));
258 };
259 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
260 let _received_headers = exec
261 .run_until_stalled(&mut put_fut)
262 .expect("response received")
263 .expect("valid response");
264 }
265
266 #[fuchsia::test]
267 fn put_operation_multiple_chunks_is_ok() {
268 let mut exec = fasync::TestExecutor::new();
269 let (manager, mut remote) = new_manager(false);
270 let mut operation = setup_put_operation(&manager, vec![]);
271
272 let payload: Vec<u8> = (1..100).collect();
273 for chunk in payload.chunks(20) {
274 let put_fut = operation.write(&chunk[..], HeaderSet::new());
275 let mut put_fut = pin!(put_fut);
276 let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
277 let response = ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
278 let expectation = |request: RequestPacket| {
279 assert_eq!(*request.code(), OpCode::Put);
280 let headers = HeaderSet::from(request);
281 assert!(headers.contains_header(&HeaderIdentifier::Body));
282 };
283 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
284 let _received_headers = exec
285 .run_until_stalled(&mut put_fut)
286 .expect("response received")
287 .expect("valid response");
288 }
289
290 let put_final_fut = operation.write_final(&[], HeaderSet::new());
292 let mut put_final_fut = pin!(put_final_fut);
293 let _ = exec.run_until_stalled(&mut put_final_fut).expect_pending("waiting for response");
294 let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
295 let expectation = |request: RequestPacket| {
296 assert_eq!(*request.code(), OpCode::PutFinal);
297 let headers = HeaderSet::from(request);
298 assert!(headers.contains_header(&HeaderIdentifier::EndOfBody));
299 };
300 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
301 let _ = exec
302 .run_until_stalled(&mut put_final_fut)
303 .expect("response received")
304 .expect("valid response");
305 }
306
307 #[fuchsia::test]
308 fn put_operation_delete_is_ok() {
309 let mut exec = fasync::TestExecutor::new();
310 let (manager, mut remote) = new_manager(false);
311 let operation = setup_put_operation(&manager, vec![]);
312
313 let headers = HeaderSet::from_headers(vec![
314 Header::Description("deleting file".into()),
315 Header::name("foobar.txt"),
316 ])
317 .unwrap();
318 let put_fut = operation.delete(headers);
319 let mut put_fut = pin!(put_fut);
320 let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
321 let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
322 let expectation = |request: RequestPacket| {
323 assert_eq!(*request.code(), OpCode::PutFinal);
324 let headers = HeaderSet::from(request);
325 assert!(!headers.contains_header(&HeaderIdentifier::Body));
326 assert!(!headers.contains_header(&HeaderIdentifier::EndOfBody));
327 };
328 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
329 let _ = exec
330 .run_until_stalled(&mut put_fut)
331 .expect("response received")
332 .expect("valid response");
333 }
334
335 #[fuchsia::test]
336 fn put_operation_terminate_success() {
337 let mut exec = fasync::TestExecutor::new();
338 let (manager, mut remote) = new_manager(false);
339 let mut operation = setup_put_operation(&manager, vec![]);
340
341 {
343 let put_fut = operation.write(&[1, 2, 3, 4, 5], HeaderSet::new());
344 let mut put_fut = pin!(put_fut);
345 let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
346 let response = ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
347 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Put), response);
348 let _received_headers = exec
349 .run_until_stalled(&mut put_fut)
350 .expect("response received")
351 .expect("valid response");
352 }
353
354 let terminate_fut = operation.terminate(HeaderSet::new());
356 let mut terminate_fut = pin!(terminate_fut);
357 let _ = exec.run_until_stalled(&mut terminate_fut).expect_pending("waiting for response");
358 let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
359 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Abort), response);
360 let _received_headers = exec
361 .run_until_stalled(&mut terminate_fut)
362 .expect("response received")
363 .expect("valid response");
364 }
365
366 #[fuchsia::test]
367 async fn put_with_body_header_is_error() {
368 let (manager, _remote) = new_manager(false);
369 let mut operation = setup_put_operation(&manager, vec![]);
370
371 let payload = vec![1, 2, 3];
372 let body_headers = HeaderSet::from_headers(vec![
375 Header::Body(payload.clone()),
376 Header::name("foobar.txt"),
377 ])
378 .unwrap();
379 let result = operation.write(&payload[..], body_headers.clone()).await;
380 assert_matches!(result, Err(Error::OperationError { .. }));
381
382 let eob_headers = HeaderSet::from_headers(vec![
384 Header::EndOfBody(payload.clone()),
385 Header::name("foobar1.txt"),
386 ])
387 .unwrap();
388 let result = operation.write(&payload[..], eob_headers.clone()).await;
389 assert_matches!(result, Err(Error::OperationError { .. }));
390 }
391
392 #[fuchsia::test]
393 async fn delete_with_body_header_is_error() {
394 let (manager, _remote) = new_manager(false);
395
396 let payload = vec![1, 2, 3];
397 let operation = setup_put_operation(&manager, vec![]);
399 let body_headers = HeaderSet::from_headers(vec![
400 Header::Body(payload.clone()),
401 Header::name("foobar.txt"),
402 ])
403 .unwrap();
404 let result = operation.delete(body_headers).await;
405 assert_matches!(result, Err(Error::OperationError { .. }));
406
407 let operation = setup_put_operation(&manager, vec![]);
409 let eob_headers = HeaderSet::from_headers(vec![
410 Header::EndOfBody(payload.clone()),
411 Header::name("foobar1.txt"),
412 ])
413 .unwrap();
414 let result = operation.delete(eob_headers).await;
415 assert_matches!(result, Err(Error::OperationError { .. }));
416 }
417
418 #[fuchsia::test]
419 async fn put_operation_terminate_before_start_error() {
420 let (manager, _remote) = new_manager(false);
421 let operation = setup_put_operation(&manager, vec![]);
422
423 let headers = HeaderSet::from_header(Header::Description("terminating test".into()));
425 let terminate_result = operation.terminate(headers).await;
426 assert_matches!(terminate_result, Err(Error::OperationError { .. }));
427 }
428
429 #[fuchsia::test]
430 fn put_operation_srm_enabled_is_ok() {
431 let mut exec = fasync::TestExecutor::new();
432 let (manager, mut remote) = new_manager(true);
433 let mut operation = setup_put_operation(&manager, vec![]);
434
435 {
436 let first_buf = [1, 2, 3];
437 let put_fut = operation.write(&first_buf[..], HeaderSet::new());
439 let mut put_fut = pin!(put_fut);
440 let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
441
442 let expectation = |request: RequestPacket| {
445 assert_eq!(*request.code(), OpCode::Put);
446 let headers = HeaderSet::from(request);
447 assert!(headers.contains_header(&HeaderIdentifier::Body));
448 assert!(headers.contains_header(&HeaderIdentifier::SingleResponseMode));
449 };
450 let response_headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
451 let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
452 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
453 let _received_headers = exec
454 .run_until_stalled(&mut put_fut)
455 .expect("response received")
456 .expect("valid response");
457 }
458 assert_eq!(operation.srm, SingleResponseMode::Enable);
460 {
462 let second_buf = [4, 5, 6];
463 let put_fut2 = operation.write(&second_buf[..], HeaderSet::new());
464 let mut put_fut2 = pin!(put_fut2);
465 let _ = exec
466 .run_until_stalled(&mut put_fut2)
467 .expect("ready without peer response")
468 .expect("success");
469 let expectation = |request: RequestPacket| {
470 assert_eq!(*request.code(), OpCode::Put);
471 let headers = HeaderSet::from(request);
472 assert!(headers.contains_header(&HeaderIdentifier::Body));
473 assert!(!headers.contains_header(&HeaderIdentifier::SingleResponseMode));
474 };
475 expect_request(&mut exec, &mut remote, expectation);
476 }
477
478 let put_final_fut = operation.write_final(&[], HeaderSet::new());
480 let mut put_final_fut = pin!(put_final_fut);
481 let _ = exec.run_until_stalled(&mut put_final_fut).expect_pending("waiting for response");
482 let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
483 let expectation = |request: RequestPacket| {
484 assert_eq!(*request.code(), OpCode::PutFinal);
485 let headers = HeaderSet::from(request);
486 assert!(headers.contains_header(&HeaderIdentifier::EndOfBody));
487 };
488 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
489 let _ = exec
490 .run_until_stalled(&mut put_final_fut)
491 .expect("response received")
492 .expect("valid response");
493 }
494
495 #[fuchsia::test]
496 fn client_disable_srm_mid_operation_is_ignored() {
497 let mut exec = fasync::TestExecutor::new();
498 let (manager, mut remote) = new_manager(true);
499 let mut operation = setup_put_operation(&manager, vec![]);
500 if let Status::NotStarted(_) = &mut operation.status {
502 let _ = operation.set_started().unwrap();
503 } else {
504 panic!("At this point operation not started");
505 };
506 assert_eq!(operation.srm, SingleResponseMode::Enable);
508
509 {
511 let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
512 let put_fut = operation.write(&[], headers);
513 let mut put_fut = pin!(put_fut);
514 let _ = exec
515 .run_until_stalled(&mut put_fut)
516 .expect("ready without peer response")
517 .expect("success");
518 let expectation = |request: RequestPacket| {
519 assert_eq!(*request.code(), OpCode::Put);
520 };
521 expect_request(&mut exec, &mut remote, expectation);
522 }
523 assert_eq!(operation.srm, SingleResponseMode::Enable);
525 }
526
527 #[fuchsia::test]
528 fn application_select_srm_success() {
529 let _exec = fasync::TestExecutor::new();
530 let (manager, _remote) = new_manager(false);
531 let mut operation = setup_put_operation(&manager, vec![]);
532 assert_eq!(operation.srm, SingleResponseMode::Disable);
533 let mut headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
535 assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
536 assert_eq!(operation.srm, SingleResponseMode::Disable);
537
538 let (manager, _remote) = new_manager(true);
540 let mut operation = setup_put_operation(&manager, vec![]);
541 assert_eq!(operation.srm, SingleResponseMode::Enable);
542 let mut headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
543 assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
544 assert_eq!(operation.srm, SingleResponseMode::Disable);
545
546 let (manager, _remote) = new_manager(true);
548 let mut operation = setup_put_operation(&manager, vec![]);
549 assert_eq!(operation.srm, SingleResponseMode::Enable);
550 let mut headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
551 assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
552 assert_eq!(operation.srm, SingleResponseMode::Enable);
553 }
554
555 #[fuchsia::test]
556 fn application_enable_srm_when_not_supported_is_error() {
557 let _exec = fasync::TestExecutor::new();
558 let (manager, _remote) = new_manager(false);
559 let mut operation = setup_put_operation(&manager, vec![]);
560 assert_eq!(operation.srm, SingleResponseMode::Disable);
561 let mut headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
562 assert_matches!(operation.try_enable_srm(&mut headers), Err(Error::SrmNotSupported));
563 assert_eq!(operation.srm, SingleResponseMode::Disable);
564 }
565
566 #[fuchsia::test]
567 fn peer_srm_response() {
568 let _exec = fasync::TestExecutor::new();
569 let (manager, _remote) = new_manager(false);
570 let mut operation = setup_put_operation(&manager, vec![]);
571 let headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
573 operation.check_response_for_srm(&headers);
574 assert_eq!(operation.srm, SingleResponseMode::Disable);
575 let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
577 operation.check_response_for_srm(&headers);
578 assert_eq!(operation.srm, SingleResponseMode::Disable);
579
580 let (manager, _remote) = new_manager(true);
581 let mut operation = setup_put_operation(&manager, vec![]);
582 let headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
584 operation.check_response_for_srm(&headers);
585 assert_eq!(operation.srm, SingleResponseMode::Enable);
586 let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
588 operation.check_response_for_srm(&headers);
589 assert_eq!(operation.srm, SingleResponseMode::Disable);
590
591 let (manager, _remote) = new_manager(true);
592 let mut operation = setup_put_operation(&manager, vec![]);
593 operation.check_response_for_srm(&HeaderSet::new());
595 assert_eq!(operation.srm, SingleResponseMode::Disable);
596 }
597
598 #[fuchsia::test]
599 fn put_with_connection_id_already_set_is_error() {
600 let mut exec = fasync::TestExecutor::new();
601 let (manager, _remote) = new_manager(false);
602 let mut operation =
604 setup_put_operation(&manager, vec![Header::ConnectionId(ConnectionIdentifier(5))]);
605
606 let write_headers = HeaderSet::from_header(Header::ConnectionId(ConnectionIdentifier(10)));
607 let write_fut = operation.write(&[1, 2, 3], write_headers);
608 let mut write_fut = pin!(write_fut);
609 let result = exec.run_until_stalled(&mut write_fut).expect("finished with error");
610 assert_matches!(result, Err(Error::AlreadyExists(_)));
611 }
612}