Skip to main content

bt_obex/client/
put.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::SinkExt;
6use log::trace;
7
8use crate::client::SrmOperation;
9use crate::error::Error;
10use crate::header::{Header, HeaderIdentifier, HeaderSet, SingleResponseMode};
11use crate::operation::{OpCode, RequestPacket, ResponseCode};
12use crate::transport::ObexTransport;
13
/// Represents the status of the PUT operation.
///
/// The status only distinguishes "no request sent yet" from "at least one request sent"; it is
/// what decides whether the initial headers still need to be attached to an outgoing request.
#[derive(Debug)]
enum Status {
    /// The first request of the operation has not been sent yet.
    /// Holds the initial headers that need to be included in the
    /// first request.
    NotStarted(HeaderSet),
    /// The first request has been sent and the operation is ongoing.
    Started,
}
24
/// Represents an in-progress PUT Operation.
/// Defined in OBEX 1.5 Section 3.4.3.
///
/// Example Usage:
/// ```
/// let obex_client = ObexClient::new(..);
/// // `write` takes `&mut self`, so the operation must be bound mutably.
/// let mut put_operation = obex_client.put()?;
/// let user_data: Vec<u8> = vec![];
/// for user_data_chunk in user_data.chunks(50) {
///   let received_headers = put_operation.write(&user_data_chunk[..], HeaderSet::new()).await?;
/// }
/// // `PutOperation::write_final` must be called before it is dropped. An empty payload is OK.
/// let final_headers = put_operation.write_final(&[], HeaderSet::new()).await?;
/// // PUT operation is complete and `put_operation` is consumed.
/// ```
#[must_use]
#[derive(Debug)]
pub struct PutOperation<'a> {
    /// The L2CAP or RFCOMM connection to the remote peer.
    transport: ObexTransport<'a>,
    /// Status of the operation: whether the first request has been sent, and if not, the
    /// initial headers that must accompany it.
    status: Status,
    /// The status of SRM for this operation. By default, SRM will be enabled if the transport
    /// supports it. However, it may be disabled if the peer requests to disable it.
    srm: SingleResponseMode,
}
51
52impl<'a> PutOperation<'a> {
53    pub fn new(headers: HeaderSet, transport: ObexTransport<'a>) -> Self {
54        let srm = transport.srm_supported().into();
55        Self { transport, status: Status::NotStarted(headers), srm }
56    }
57
58    /// Returns true by checking whether the initial headers were taken
59    /// out for the first put operation.
60    fn is_started(&self) -> bool {
61        match self.status {
62            Status::NotStarted(_) => false,
63            Status::Started => true,
64        }
65    }
66
67    /// Sets the operation as started.
68    fn set_started(&mut self) -> Result<(), Error> {
69        match std::mem::replace(&mut self.status, Status::Started) {
70            Status::NotStarted(_) => Ok(()),
71            Status::Started => {
72                Err(Error::other("Attempted to start a PUT operation that was already started"))
73            }
74        }
75    }
76
77    /// Returns the HeaderSet that takes the initial headers and
78    /// combines them with the input headers.
79    fn combine_with_initial_headers(&mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
80        let mut initial_headers = match &mut self.status {
81            Status::NotStarted(initial_headers) => std::mem::take(initial_headers),
82            Status::Started => {
83                return Err(Error::other(
84                    "Cannot add initial headers when PUT operation already started",
85                ));
86            }
87        };
88        let _ = initial_headers.try_append(headers)?;
89        Ok(initial_headers)
90    }
91
92    /// Returns Error if the `headers` contain non-informational OBEX Headers.
93    fn validate_headers(headers: &HeaderSet) -> Result<(), Error> {
94        if headers.contains_header(&HeaderIdentifier::Body) {
95            return Err(Error::operation(OpCode::Put, "info headers can't contain body"));
96        }
97        if headers.contains_header(&HeaderIdentifier::EndOfBody) {
98            return Err(Error::operation(OpCode::Put, "info headers can't contain end of body"));
99        }
100        Ok(())
101    }
102
103    /// Attempts to initiate a PUT operation with the `final_` bit set.
104    /// Returns the peer response headers on success, Error otherwise.
105    async fn do_put(&mut self, final_: bool, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
106        let is_started = self.is_started();
107        if !is_started {
108            headers = self.combine_with_initial_headers(headers)?;
109        }
110
111        // SRM is considered active if this is a subsequent PUT request & the transport supports it.
112        let srm_active = is_started && self.get_srm() == SingleResponseMode::Enable;
113        let (opcode, request, expected_response_code) = if final_ {
114            (OpCode::PutFinal, RequestPacket::new_put_final(headers), ResponseCode::Ok)
115        } else {
116            (OpCode::Put, RequestPacket::new_put(headers), ResponseCode::Continue)
117        };
118        trace!("Making outgoing PUT request: {request:?}");
119        self.transport.send(request).await?;
120        trace!("Successfully made PUT request");
121        if !is_started {
122            self.set_started()?;
123        }
124        // Expect a response if this is the final PUT request or if SRM is inactive, in which case
125        // every request must be responded to.
126        if final_ || !srm_active {
127            let response = self.transport.receive_response(opcode).await?;
128            response.expect_code(opcode, expected_response_code).map(Into::into)
129        } else {
130            Ok(HeaderSet::new())
131        }
132    }
133
134    /// Attempts to delete an object from the remote OBEX server specified by the provided
135    /// `headers`.
136    /// Returns the informational headers from the peer response on success, Error otherwise.
137    pub async fn delete(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
138        Self::validate_headers(&headers)?;
139        // No Body or EndOfBody Headers are included in a delete request.
140        // See OBEX 1.5 Section 3.4.3.6.
141        self.do_put(true, headers).await
142    }
143
144    /// Attempts to write the `data` object to the remote OBEX server.
145    /// Returns the informational headers from the peer response on success, Error otherwise.
146    /// The returned informational headers will be empty if Single Response Mode is enabled for the
147    /// operation. Only the final write request (`Self::write_final`) will potentially return a
148    /// non-empty set of headers.
149    pub async fn write(&mut self, data: &[u8], mut headers: HeaderSet) -> Result<HeaderSet, Error> {
150        Self::validate_headers(&headers)?;
151        let is_first_write = !self.is_started();
152        if is_first_write {
153            // Try to enable SRM if this is the first packet of the operation.
154            self.try_enable_srm(&mut headers)?;
155        }
156        headers.add(Header::Body(data.to_vec()))?;
157        let response_headers = self.do_put(false, headers).await?;
158        if is_first_write {
159            self.check_response_for_srm(&response_headers);
160        }
161        Ok(response_headers)
162    }
163
164    /// Attempts to write the final `data` object to the remote OBEX server.
165    /// This must be called before the PutOperation object is dropped.
166    /// Returns the informational headers from the peer response on success, Error otherwise.
167    ///
168    /// The PUT operation is considered complete after this.
169    pub async fn write_final(
170        mut self,
171        data: &[u8],
172        mut headers: HeaderSet,
173    ) -> Result<HeaderSet, Error> {
174        Self::validate_headers(&headers)?;
175        headers.add(Header::EndOfBody(data.to_vec()))?;
176        self.do_put(true, headers).await
177    }
178
179    /// Request to terminate a multi-packet PUT request early.
180    /// Returns the informational headers from the peer response on success, Error otherwise.
181    /// If Error is returned, there are no guarantees about the synchronization between the local
182    /// OBEX client and remote OBEX server.
183    pub async fn terminate(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
184        let opcode = OpCode::Abort;
185        if !self.is_started() {
186            return Err(Error::operation(opcode, "can't abort PUT that hasn't started"));
187        }
188        let request = RequestPacket::new_abort(headers);
189        trace!(request:?; "Making outgoing {opcode:?} request");
190        self.transport.send(request).await?;
191        trace!("Successfully made {opcode:?} request");
192        let response = self.transport.receive_response(opcode).await?;
193        response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
194    }
195}
196
197impl SrmOperation for PutOperation<'_> {
198    const OPERATION_TYPE: OpCode = OpCode::Put;
199
200    fn get_srm(&self) -> SingleResponseMode {
201        self.srm
202    }
203
204    fn set_srm(&mut self, mode: SingleResponseMode) {
205        self.srm = mode;
206    }
207}
208
// Unit tests for `PutOperation`. Tests that need to interleave the client future with the
// simulated peer drive the future manually with `TestExecutor::run_until_stalled`, so the order
// of statements within each test matters.
#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use async_utils::PollExt;
    use fuchsia_async as fasync;
    use std::pin::pin;

    use crate::header::ConnectionIdentifier;
    use crate::operation::ResponsePacket;
    use crate::transport::ObexTransportManager;
    use crate::transport::test_utils::{
        expect_code, expect_request, expect_request_and_reply, new_manager,
    };

    // Builds a `PutOperation` on a new transport obtained from `mgr`, seeded with
    // `initial_headers`. Panics if the transport can't be acquired or if `initial_headers` can't
    // be converted into a `HeaderSet`.
    fn setup_put_operation(
        mgr: &ObexTransportManager,
        initial_headers: Vec<Header>,
    ) -> PutOperation<'_> {
        let transport = mgr.try_new_operation().expect("can start operation");
        PutOperation::new(HeaderSet::from_headers(initial_headers).unwrap(), transport)
    }

    // A single `write_final` sends one PutFinal request that carries the initial ConnectionId
    // header, the caller's Type & Name headers and an EndOfBody (but no Body), and resolves once
    // the peer replies with Ok.
    #[fuchsia::test]
    fn put_operation_single_chunk_is_ok() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(/* srm_supported */ false);
        let operation =
            setup_put_operation(&manager, vec![Header::ConnectionId(0x1u32.try_into().unwrap())]);

        let payload = vec![5, 6, 7, 8, 9];
        let headers =
            HeaderSet::from_headers(vec![Header::Type("file".into()), Header::name("foobar.txt")])
                .unwrap();
        let put_fut = operation.write_final(&payload[..], headers);
        let mut put_fut = pin!(put_fut);
        // The request is sent, but the future can't finish until the peer responds.
        let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
        let expectation = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::PutFinal);
            let headers = HeaderSet::from(request);
            assert!(headers.contains_header(&HeaderIdentifier::ConnectionId));
            assert!(!headers.contains_header(&HeaderIdentifier::Body));
            assert!(headers.contains_headers(&vec![
                HeaderIdentifier::EndOfBody,
                HeaderIdentifier::Type,
                HeaderIdentifier::Name
            ]));
        };
        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
        let _received_headers = exec
            .run_until_stalled(&mut put_fut)
            .expect("response received")
            .expect("valid response");
    }

    // With SRM unsupported, every `write` sends a Put request with a Body header and waits for
    // a Continue response; the operation is then completed with an empty `write_final`.
    #[fuchsia::test]
    fn put_operation_multiple_chunks_is_ok() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(/* srm_supported */ false);
        let mut operation = setup_put_operation(&manager, vec![]);

        let payload: Vec<u8> = (1..100).collect();
        for chunk in payload.chunks(20) {
            let put_fut = operation.write(&chunk[..], HeaderSet::new());
            let mut put_fut = pin!(put_fut);
            let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
            let response = ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
            let expectation = |request: RequestPacket| {
                assert_eq!(*request.code(), OpCode::Put);
                let headers = HeaderSet::from(request);
                assert!(headers.contains_header(&HeaderIdentifier::Body));
            };
            expect_request_and_reply(&mut exec, &mut remote, expectation, response);
            let _received_headers = exec
                .run_until_stalled(&mut put_fut)
                .expect("response received")
                .expect("valid response");
        }

        // Can send final response that is empty to complete the operation.
        let put_final_fut = operation.write_final(&[], HeaderSet::new());
        let mut put_final_fut = pin!(put_final_fut);
        let _ = exec.run_until_stalled(&mut put_final_fut).expect_pending("waiting for response");
        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
        let expectation = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::PutFinal);
            let headers = HeaderSet::from(request);
            assert!(headers.contains_header(&HeaderIdentifier::EndOfBody));
        };
        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
        let _ = exec
            .run_until_stalled(&mut put_final_fut)
            .expect("response received")
            .expect("valid response");
    }

    // `delete` sends a PutFinal request that contains neither a Body nor an EndOfBody header and
    // resolves once the peer replies with Ok.
    #[fuchsia::test]
    fn put_operation_delete_is_ok() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(/* srm_supported */ false);
        let operation = setup_put_operation(&manager, vec![]);

        let headers = HeaderSet::from_headers(vec![
            Header::Description("deleting file".into()),
            Header::name("foobar.txt"),
        ])
        .unwrap();
        let put_fut = operation.delete(headers);
        let mut put_fut = pin!(put_fut);
        let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
        let expectation = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::PutFinal);
            let headers = HeaderSet::from(request);
            assert!(!headers.contains_header(&HeaderIdentifier::Body));
            assert!(!headers.contains_header(&HeaderIdentifier::EndOfBody));
        };
        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
        let _ = exec
            .run_until_stalled(&mut put_fut)
            .expect("response received")
            .expect("valid response");
    }

    // Once the operation has been started by a first `write`, `terminate` sends an Abort request
    // and succeeds when the peer replies with Ok.
    #[fuchsia::test]
    fn put_operation_terminate_success() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(/* srm_supported */ false);
        let mut operation = setup_put_operation(&manager, vec![]);

        // Write the first chunk of data to "start" the operation.
        {
            let put_fut = operation.write(&[1, 2, 3, 4, 5], HeaderSet::new());
            let mut put_fut = pin!(put_fut);
            let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
            let response = ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
            expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Put), response);
            let _received_headers = exec
                .run_until_stalled(&mut put_fut)
                .expect("response received")
                .expect("valid response");
        }

        // Terminating early should be Ok - peer acknowledges.
        let terminate_fut = operation.terminate(HeaderSet::new());
        let mut terminate_fut = pin!(terminate_fut);
        let _ = exec.run_until_stalled(&mut terminate_fut).expect_pending("waiting for response");
        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Abort), response);
        let _received_headers = exec
            .run_until_stalled(&mut terminate_fut)
            .expect("response received")
            .expect("valid response");
    }

    // `write` rejects informational headers that contain a Body or EndOfBody header. The error
    // is returned without any interaction with the peer (the remote end is never serviced).
    #[fuchsia::test]
    async fn put_with_body_header_is_error() {
        let (manager, _remote) = new_manager(/* srm_supported */ false);
        let mut operation = setup_put_operation(&manager, vec![]);

        let payload = vec![1, 2, 3];
        // The payload should only be included as an argument. All other headers must be
        // informational.
        let body_headers = HeaderSet::from_headers(vec![
            Header::Body(payload.clone()),
            Header::name("foobar.txt"),
        ])
        .unwrap();
        let result = operation.write(&payload[..], body_headers.clone()).await;
        assert_matches!(result, Err(Error::OperationError { .. }));

        // EndOfBody header is also an Error.
        let eob_headers = HeaderSet::from_headers(vec![
            Header::EndOfBody(payload.clone()),
            Header::name("foobar1.txt"),
        ])
        .unwrap();
        let result = operation.write(&payload[..], eob_headers.clone()).await;
        assert_matches!(result, Err(Error::OperationError { .. }));
    }

    // `delete` rejects headers that contain a Body or EndOfBody header. A fresh operation is
    // created for each case because `delete` consumes the operation.
    #[fuchsia::test]
    async fn delete_with_body_header_is_error() {
        let (manager, _remote) = new_manager(/* srm_supported */ false);

        let payload = vec![1, 2, 3];
        // Body shouldn't be included in delete.
        let operation = setup_put_operation(&manager, vec![]);
        let body_headers = HeaderSet::from_headers(vec![
            Header::Body(payload.clone()),
            Header::name("foobar.txt"),
        ])
        .unwrap();
        let result = operation.delete(body_headers).await;
        assert_matches!(result, Err(Error::OperationError { .. }));

        // EndOfBody shouldn't be included in delete.
        let operation = setup_put_operation(&manager, vec![]);
        let eob_headers = HeaderSet::from_headers(vec![
            Header::EndOfBody(payload.clone()),
            Header::name("foobar1.txt"),
        ])
        .unwrap();
        let result = operation.delete(eob_headers).await;
        assert_matches!(result, Err(Error::OperationError { .. }));
    }

    // `terminate` on an operation that has not sent any request yet is an error.
    #[fuchsia::test]
    async fn put_operation_terminate_before_start_error() {
        let (manager, _remote) = new_manager(/* srm_supported */ false);
        let operation = setup_put_operation(&manager, vec![]);

        // Trying to terminate early doesn't work as the operation has not started.
        let headers = HeaderSet::from_header(Header::Description("terminating test".into()));
        let terminate_result = operation.terminate(headers).await;
        assert_matches!(terminate_result, Err(Error::OperationError { .. }));
    }

    // With SRM supported, the first `write` carries an SRM header and waits for the peer's
    // response; subsequent writes complete without a response, and only `write_final` waits for
    // the peer again.
    #[fuchsia::test]
    fn put_operation_srm_enabled_is_ok() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(/* srm_supported */ true);
        let mut operation = setup_put_operation(&manager, vec![]);

        {
            let first_buf = [1, 2, 3];
            // Even though the input headers are empty, we should prefer to enable SRM.
            let put_fut = operation.write(&first_buf[..], HeaderSet::new());
            let mut put_fut = pin!(put_fut);
            let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");

            // Expect the outgoing request with the SRM Header. Peer responds positively with a SRM
            // Enable response.
            let expectation = |request: RequestPacket| {
                assert_eq!(*request.code(), OpCode::Put);
                let headers = HeaderSet::from(request);
                assert!(headers.contains_header(&HeaderIdentifier::Body));
                assert!(headers.contains_header(&HeaderIdentifier::SingleResponseMode));
            };
            let response_headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
            let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
            expect_request_and_reply(&mut exec, &mut remote, expectation, response);
            let _received_headers = exec
                .run_until_stalled(&mut put_fut)
                .expect("response received")
                .expect("valid response");
        }
        // At this point SRM is enabled for the duration of the operation.
        assert_eq!(operation.srm, SingleResponseMode::Enable);
        // Second write doesn't require a response.
        {
            let second_buf = [4, 5, 6];
            let put_fut2 = operation.write(&second_buf[..], HeaderSet::new());
            let mut put_fut2 = pin!(put_fut2);
            let _ = exec
                .run_until_stalled(&mut put_fut2)
                .expect("ready without peer response")
                .expect("success");
            // The request was still sent to the peer, this time without an SRM header.
            let expectation = |request: RequestPacket| {
                assert_eq!(*request.code(), OpCode::Put);
                let headers = HeaderSet::from(request);
                assert!(headers.contains_header(&HeaderIdentifier::Body));
                assert!(!headers.contains_header(&HeaderIdentifier::SingleResponseMode));
            };
            expect_request(&mut exec, &mut remote, expectation);
        }

        // Only the final write request will result in a response.
        let put_final_fut = operation.write_final(&[], HeaderSet::new());
        let mut put_final_fut = pin!(put_final_fut);
        let _ = exec.run_until_stalled(&mut put_final_fut).expect_pending("waiting for response");
        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
        let expectation = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::PutFinal);
            let headers = HeaderSet::from(request);
            assert!(headers.contains_header(&HeaderIdentifier::EndOfBody));
        };
        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
        let _ = exec
            .run_until_stalled(&mut put_final_fut)
            .expect("response received")
            .expect("valid response");
    }

    // An SRM Disable header supplied by the application after the operation has started does
    // not change the SRM mode of the operation: the write still completes without a response.
    #[fuchsia::test]
    fn client_disable_srm_mid_operation_is_ignored() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(/* srm_supported */ true);
        let mut operation = setup_put_operation(&manager, vec![]);
        // Pretend first write happened already by manually setting the operation as started.
        if let Status::NotStarted(_) = &mut operation.status {
            let _ = operation.set_started().unwrap();
        } else {
            panic!("At this point operation not started");
        };
        // SRM is enabled for the duration of the operation.
        assert_eq!(operation.srm, SingleResponseMode::Enable);

        // Client tries to disable SRM in a subsequent write attempt. Ignored.
        {
            let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
            let put_fut = operation.write(&[], headers);
            let mut put_fut = pin!(put_fut);
            let _ = exec
                .run_until_stalled(&mut put_fut)
                .expect("ready without peer response")
                .expect("success");
            let expectation = |request: RequestPacket| {
                assert_eq!(*request.code(), OpCode::Put);
            };
            expect_request(&mut exec, &mut remote, expectation);
        }
        // SRM is still enabled.
        assert_eq!(operation.srm, SingleResponseMode::Enable);
    }

    // Exercises `try_enable_srm` with an application-provided SRM header for the combinations
    // that are expected to succeed, checking the resulting SRM mode of the operation.
    #[fuchsia::test]
    fn application_select_srm_success() {
        let _exec = fasync::TestExecutor::new();
        let (manager, _remote) = new_manager(/* srm_supported */ false);
        let mut operation = setup_put_operation(&manager, vec![]);
        assert_eq!(operation.srm, SingleResponseMode::Disable);
        // The application requesting to disable SRM when it isn't supported is OK.
        let mut headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
        assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
        assert_eq!(operation.srm, SingleResponseMode::Disable);

        // The application requesting to disable SRM when it is supported is OK.
        let (manager, _remote) = new_manager(/* srm_supported */ true);
        let mut operation = setup_put_operation(&manager, vec![]);
        assert_eq!(operation.srm, SingleResponseMode::Enable);
        let mut headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
        assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
        assert_eq!(operation.srm, SingleResponseMode::Disable);

        // The application requesting to enable SRM when it is supported is OK.
        let (manager, _remote) = new_manager(/* srm_supported */ true);
        let mut operation = setup_put_operation(&manager, vec![]);
        assert_eq!(operation.srm, SingleResponseMode::Enable);
        let mut headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
        assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
        assert_eq!(operation.srm, SingleResponseMode::Enable);
    }

    // The application requesting to enable SRM on a transport that doesn't support it is an
    // error, and the SRM mode of the operation stays disabled.
    #[fuchsia::test]
    fn application_enable_srm_when_not_supported_is_error() {
        let _exec = fasync::TestExecutor::new();
        let (manager, _remote) = new_manager(/* srm_supported */ false);
        let mut operation = setup_put_operation(&manager, vec![]);
        assert_eq!(operation.srm, SingleResponseMode::Disable);
        let mut headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
        assert_matches!(operation.try_enable_srm(&mut headers), Err(Error::SrmNotSupported));
        assert_eq!(operation.srm, SingleResponseMode::Disable);
    }

    // Exercises `check_response_for_srm` with the SRM headers a peer may (or may not) include
    // in its response, for both an SRM-unsupported and an SRM-supported transport.
    #[fuchsia::test]
    fn peer_srm_response() {
        let _exec = fasync::TestExecutor::new();
        let (manager, _remote) = new_manager(/* srm_supported */ false);
        let mut operation = setup_put_operation(&manager, vec![]);
        // An enable response from the peer when SRM is disabled locally should not enable SRM.
        let headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
        operation.check_response_for_srm(&headers);
        assert_eq!(operation.srm, SingleResponseMode::Disable);
        // A disable response from the peer when SRM is disabled locally is a no-op.
        let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
        operation.check_response_for_srm(&headers);
        assert_eq!(operation.srm, SingleResponseMode::Disable);

        let (manager, _remote) = new_manager(/* srm_supported */ true);
        let mut operation = setup_put_operation(&manager, vec![]);
        // An enable response from the peer when SRM is enabled is a no-op.
        let headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
        operation.check_response_for_srm(&headers);
        assert_eq!(operation.srm, SingleResponseMode::Enable);
        // A disable response from the peer when SRM is enabled should disable SRM.
        let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
        operation.check_response_for_srm(&headers);
        assert_eq!(operation.srm, SingleResponseMode::Disable);

        let (manager, _remote) = new_manager(/* srm_supported */ true);
        let mut operation = setup_put_operation(&manager, vec![]);
        // A response with no SRM header should be treated like a disable request.
        operation.check_response_for_srm(&HeaderSet::new());
        assert_eq!(operation.srm, SingleResponseMode::Disable);
    }

    // Supplying a ConnectionId header to `write` when the initial headers already contain one
    // fails with `Error::AlreadyExists`. The write future resolves without the peer being
    // serviced.
    #[fuchsia::test]
    fn put_with_connection_id_already_set_is_error() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, _remote) = new_manager(/* srm_supported */ false);
        // The initial operation contains a ConnectionId header which was negotiated during CONNECT.
        let mut operation =
            setup_put_operation(&manager, vec![Header::ConnectionId(ConnectionIdentifier(5))]);

        let write_headers = HeaderSet::from_header(Header::ConnectionId(ConnectionIdentifier(10)));
        let write_fut = operation.write(&[1, 2, 3], write_headers);
        let mut write_fut = pin!(write_fut);
        let result = exec.run_until_stalled(&mut write_fut).expect("finished with error");
        assert_matches!(result, Err(Error::AlreadyExists(_)));
    }
}