bt_obex/client/
put.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use log::trace;
6
7use crate::client::SrmOperation;
8use crate::error::Error;
9use crate::header::{Header, HeaderIdentifier, HeaderSet, SingleResponseMode};
10use crate::operation::{OpCode, RequestPacket, ResponseCode};
11use crate::transport::ObexTransport;
12
13/// Represents the status of the PUT operation.
14#[derive(Debug)]
15enum Status {
16    /// First write call has not been completed yet.
17    /// Holds the initial headers that need to be included in the
18    /// first write call.
19    NotStarted(HeaderSet),
20    /// First write has been completed and the operation is ongoing.
21    Started,
22}
23
24/// Represents an in-progress PUT Operation.
25/// Defined in OBEX 1.5 Section 3.4.3.
26///
27/// Example Usage:
28/// ```
29/// let obex_client = ObexClient::new(..);
30/// let put_operation = obex_client.put()?;
31/// let user_data: Vec<u8> = vec![];
32/// for user_data_chunk in user_data.chunks(50) {
33///   let received_headers = put_operation.write(&user_data_chunk[..], HeaderSet::new()).await?;
34/// }
35/// // `PutOperation::write_final` must be called before it is dropped. An empty payload is OK.
36/// let final_headers = put_operation.write_final(&[], HeaderSet::new()).await?;
37/// // PUT operation is complete and `put_operation` is consumed.
38/// ```
39#[must_use]
40#[derive(Debug)]
41pub struct PutOperation<'a> {
42    /// The L2CAP or RFCOMM connection to the remote peer.
43    transport: ObexTransport<'a>,
44    /// Status of the operation.
45    status: Status,
46    /// The status of SRM for this operation. By default, SRM will be enabled if the transport
47    /// supports it. However, it may be disabled if the peer requests to disable it.
48    srm: SingleResponseMode,
49}
50
51impl<'a> PutOperation<'a> {
52    pub fn new(headers: HeaderSet, transport: ObexTransport<'a>) -> Self {
53        let srm = transport.srm_supported().into();
54        Self { transport, status: Status::NotStarted(headers), srm }
55    }
56
57    /// Returns true by checking whether the initial headers were taken
58    /// out for the first put operation.
59    fn is_started(&self) -> bool {
60        match self.status {
61            Status::NotStarted(_) => false,
62            Status::Started => true,
63        }
64    }
65
66    /// Sets the operation as started.
67    fn set_started(&mut self) -> Result<(), Error> {
68        match std::mem::replace(&mut self.status, Status::Started) {
69            Status::NotStarted(_) => Ok(()),
70            Status::Started => {
71                Err(Error::other("Attempted to start a PUT operation that was already started"))
72            }
73        }
74    }
75
76    /// Returns the HeaderSet that takes the initial headers and
77    /// combines them with the input headers.
78    fn combine_with_initial_headers(&mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
79        let mut initial_headers = match &mut self.status {
80            Status::NotStarted(ref mut initial_headers) => std::mem::take(initial_headers),
81            Status::Started => {
82                return Err(Error::other(
83                    "Cannot add initial headers when PUT operation already started",
84                ))
85            }
86        };
87        let _ = initial_headers.try_append(headers)?;
88        Ok(initial_headers)
89    }
90
91    /// Returns Error if the `headers` contain non-informational OBEX Headers.
92    fn validate_headers(headers: &HeaderSet) -> Result<(), Error> {
93        if headers.contains_header(&HeaderIdentifier::Body) {
94            return Err(Error::operation(OpCode::Put, "info headers can't contain body"));
95        }
96        if headers.contains_header(&HeaderIdentifier::EndOfBody) {
97            return Err(Error::operation(OpCode::Put, "info headers can't contain end of body"));
98        }
99        Ok(())
100    }
101
102    /// Attempts to initiate a PUT operation with the `final_` bit set.
103    /// Returns the peer response headers on success, Error otherwise.
104    async fn do_put(&mut self, final_: bool, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
105        let is_started = self.is_started();
106        if !is_started {
107            headers = self.combine_with_initial_headers(headers)?;
108        }
109
110        // SRM is considered active if this is a subsequent PUT request & the transport supports it.
111        let srm_active = is_started && self.get_srm() == SingleResponseMode::Enable;
112        let (opcode, request, expected_response_code) = if final_ {
113            (OpCode::PutFinal, RequestPacket::new_put_final(headers), ResponseCode::Ok)
114        } else {
115            (OpCode::Put, RequestPacket::new_put(headers), ResponseCode::Continue)
116        };
117        trace!("Making outgoing PUT request: {request:?}");
118        self.transport.send(request)?;
119        trace!("Successfully made PUT request");
120        if !is_started {
121            self.set_started()?;
122        }
123        // Expect a response if this is the final PUT request or if SRM is inactive, in which case
124        // every request must be responded to.
125        if final_ || !srm_active {
126            let response = self.transport.receive_response(opcode).await?;
127            response.expect_code(opcode, expected_response_code).map(Into::into)
128        } else {
129            Ok(HeaderSet::new())
130        }
131    }
132
133    /// Attempts to delete an object from the remote OBEX server specified by the provided
134    /// `headers`.
135    /// Returns the informational headers from the peer response on success, Error otherwise.
136    pub async fn delete(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
137        Self::validate_headers(&headers)?;
138        // No Body or EndOfBody Headers are included in a delete request.
139        // See OBEX 1.5 Section 3.4.3.6.
140        self.do_put(true, headers).await
141    }
142
143    /// Attempts to write the `data` object to the remote OBEX server.
144    /// Returns the informational headers from the peer response on success, Error otherwise.
145    /// The returned informational headers will be empty if Single Response Mode is enabled for the
146    /// operation. Only the final write request (`Self::write_final`) will potentially return a
147    /// non-empty set of headers.
148    pub async fn write(&mut self, data: &[u8], mut headers: HeaderSet) -> Result<HeaderSet, Error> {
149        Self::validate_headers(&headers)?;
150        let is_first_write = !self.is_started();
151        if is_first_write {
152            // Try to enable SRM if this is the first packet of the operation.
153            self.try_enable_srm(&mut headers)?;
154        }
155        headers.add(Header::Body(data.to_vec()))?;
156        let response_headers = self.do_put(false, headers).await?;
157        if is_first_write {
158            self.check_response_for_srm(&response_headers);
159        }
160        Ok(response_headers)
161    }
162
163    /// Attempts to write the final `data` object to the remote OBEX server.
164    /// This must be called before the PutOperation object is dropped.
165    /// Returns the informational headers from the peer response on success, Error otherwise.
166    ///
167    /// The PUT operation is considered complete after this.
168    pub async fn write_final(
169        mut self,
170        data: &[u8],
171        mut headers: HeaderSet,
172    ) -> Result<HeaderSet, Error> {
173        Self::validate_headers(&headers)?;
174        headers.add(Header::EndOfBody(data.to_vec()))?;
175        self.do_put(true, headers).await
176    }
177
178    /// Request to terminate a multi-packet PUT request early.
179    /// Returns the informational headers from the peer response on success, Error otherwise.
180    /// If Error is returned, there are no guarantees about the synchronization between the local
181    /// OBEX client and remote OBEX server.
182    pub async fn terminate(mut self, headers: HeaderSet) -> Result<HeaderSet, Error> {
183        let opcode = OpCode::Abort;
184        if !self.is_started() {
185            return Err(Error::operation(opcode, "can't abort PUT that hasn't started"));
186        }
187        let request = RequestPacket::new_abort(headers);
188        trace!(request:?; "Making outgoing {opcode:?} request");
189        self.transport.send(request)?;
190        trace!("Successfully made {opcode:?} request");
191        let response = self.transport.receive_response(opcode).await?;
192        response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
193    }
194}
195
196impl SrmOperation for PutOperation<'_> {
197    const OPERATION_TYPE: OpCode = OpCode::Put;
198
199    fn get_srm(&self) -> SingleResponseMode {
200        self.srm
201    }
202
203    fn set_srm(&mut self, mode: SingleResponseMode) {
204        self.srm = mode;
205    }
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211
212    use assert_matches::assert_matches;
213    use async_utils::PollExt;
214    use fuchsia_async as fasync;
215    use std::pin::pin;
216
217    use crate::header::ConnectionIdentifier;
218    use crate::operation::ResponsePacket;
219    use crate::transport::test_utils::{
220        expect_code, expect_request, expect_request_and_reply, new_manager,
221    };
222    use crate::transport::ObexTransportManager;
223
224    fn setup_put_operation(
225        mgr: &ObexTransportManager,
226        initial_headers: Vec<Header>,
227    ) -> PutOperation<'_> {
228        let transport = mgr.try_new_operation().expect("can start operation");
229        PutOperation::new(HeaderSet::from_headers(initial_headers).unwrap(), transport)
230    }
231
232    #[fuchsia::test]
233    fn put_operation_single_chunk_is_ok() {
234        let mut exec = fasync::TestExecutor::new();
235        let (manager, mut remote) = new_manager(/* srm_supported */ false);
236        let operation =
237            setup_put_operation(&manager, vec![Header::ConnectionId(0x1u32.try_into().unwrap())]);
238
239        let payload = vec![5, 6, 7, 8, 9];
240        let headers =
241            HeaderSet::from_headers(vec![Header::Type("file".into()), Header::name("foobar.txt")])
242                .unwrap();
243        let put_fut = operation.write_final(&payload[..], headers);
244        let mut put_fut = pin!(put_fut);
245        let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
246        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
247        let expectation = |request: RequestPacket| {
248            assert_eq!(*request.code(), OpCode::PutFinal);
249            let headers = HeaderSet::from(request);
250            assert!(headers.contains_header(&HeaderIdentifier::ConnectionId));
251            assert!(!headers.contains_header(&HeaderIdentifier::Body));
252            assert!(headers.contains_headers(&vec![
253                HeaderIdentifier::EndOfBody,
254                HeaderIdentifier::Type,
255                HeaderIdentifier::Name
256            ]));
257        };
258        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
259        let _received_headers = exec
260            .run_until_stalled(&mut put_fut)
261            .expect("response received")
262            .expect("valid response");
263    }
264
265    #[fuchsia::test]
266    fn put_operation_multiple_chunks_is_ok() {
267        let mut exec = fasync::TestExecutor::new();
268        let (manager, mut remote) = new_manager(/* srm_supported */ false);
269        let mut operation = setup_put_operation(&manager, vec![]);
270
271        let payload: Vec<u8> = (1..100).collect();
272        for chunk in payload.chunks(20) {
273            let put_fut = operation.write(&chunk[..], HeaderSet::new());
274            let mut put_fut = pin!(put_fut);
275            let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
276            let response = ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
277            let expectation = |request: RequestPacket| {
278                assert_eq!(*request.code(), OpCode::Put);
279                let headers = HeaderSet::from(request);
280                assert!(headers.contains_header(&HeaderIdentifier::Body));
281            };
282            expect_request_and_reply(&mut exec, &mut remote, expectation, response);
283            let _received_headers = exec
284                .run_until_stalled(&mut put_fut)
285                .expect("response received")
286                .expect("valid response");
287        }
288
289        // Can send final response that is empty to complete the operation.
290        let put_final_fut = operation.write_final(&[], HeaderSet::new());
291        let mut put_final_fut = pin!(put_final_fut);
292        let _ = exec.run_until_stalled(&mut put_final_fut).expect_pending("waiting for response");
293        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
294        let expectation = |request: RequestPacket| {
295            assert_eq!(*request.code(), OpCode::PutFinal);
296            let headers = HeaderSet::from(request);
297            assert!(headers.contains_header(&HeaderIdentifier::EndOfBody));
298        };
299        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
300        let _ = exec
301            .run_until_stalled(&mut put_final_fut)
302            .expect("response received")
303            .expect("valid response");
304    }
305
306    #[fuchsia::test]
307    fn put_operation_delete_is_ok() {
308        let mut exec = fasync::TestExecutor::new();
309        let (manager, mut remote) = new_manager(/* srm_supported */ false);
310        let operation = setup_put_operation(&manager, vec![]);
311
312        let headers = HeaderSet::from_headers(vec![
313            Header::Description("deleting file".into()),
314            Header::name("foobar.txt"),
315        ])
316        .unwrap();
317        let put_fut = operation.delete(headers);
318        let mut put_fut = pin!(put_fut);
319        let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
320        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
321        let expectation = |request: RequestPacket| {
322            assert_eq!(*request.code(), OpCode::PutFinal);
323            let headers = HeaderSet::from(request);
324            assert!(!headers.contains_header(&HeaderIdentifier::Body));
325            assert!(!headers.contains_header(&HeaderIdentifier::EndOfBody));
326        };
327        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
328        let _ = exec
329            .run_until_stalled(&mut put_fut)
330            .expect("response received")
331            .expect("valid response");
332    }
333
334    #[fuchsia::test]
335    fn put_operation_terminate_success() {
336        let mut exec = fasync::TestExecutor::new();
337        let (manager, mut remote) = new_manager(/* srm_supported */ false);
338        let mut operation = setup_put_operation(&manager, vec![]);
339
340        // Write the first chunk of data to "start" the operation.
341        {
342            let put_fut = operation.write(&[1, 2, 3, 4, 5], HeaderSet::new());
343            let mut put_fut = pin!(put_fut);
344            let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
345            let response = ResponsePacket::new_no_data(ResponseCode::Continue, HeaderSet::new());
346            expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Put), response);
347            let _received_headers = exec
348                .run_until_stalled(&mut put_fut)
349                .expect("response received")
350                .expect("valid response");
351        }
352
353        // Terminating early should be Ok - peer acknowledges.
354        let terminate_fut = operation.terminate(HeaderSet::new());
355        let mut terminate_fut = pin!(terminate_fut);
356        let _ = exec.run_until_stalled(&mut terminate_fut).expect_pending("waiting for response");
357        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
358        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Abort), response);
359        let _received_headers = exec
360            .run_until_stalled(&mut terminate_fut)
361            .expect("response received")
362            .expect("valid response");
363    }
364
365    #[fuchsia::test]
366    async fn put_with_body_header_is_error() {
367        let (manager, _remote) = new_manager(/* srm_supported */ false);
368        let mut operation = setup_put_operation(&manager, vec![]);
369
370        let payload = vec![1, 2, 3];
371        // The payload should only be included as an argument. All other headers must be
372        // informational.
373        let body_headers = HeaderSet::from_headers(vec![
374            Header::Body(payload.clone()),
375            Header::name("foobar.txt"),
376        ])
377        .unwrap();
378        let result = operation.write(&payload[..], body_headers.clone()).await;
379        assert_matches!(result, Err(Error::OperationError { .. }));
380
381        // EndOfBody header is also an Error.
382        let eob_headers = HeaderSet::from_headers(vec![
383            Header::EndOfBody(payload.clone()),
384            Header::name("foobar1.txt"),
385        ])
386        .unwrap();
387        let result = operation.write(&payload[..], eob_headers.clone()).await;
388        assert_matches!(result, Err(Error::OperationError { .. }));
389    }
390
391    #[fuchsia::test]
392    async fn delete_with_body_header_is_error() {
393        let (manager, _remote) = new_manager(/* srm_supported */ false);
394
395        let payload = vec![1, 2, 3];
396        // Body shouldn't be included in delete.
397        let operation = setup_put_operation(&manager, vec![]);
398        let body_headers = HeaderSet::from_headers(vec![
399            Header::Body(payload.clone()),
400            Header::name("foobar.txt"),
401        ])
402        .unwrap();
403        let result = operation.delete(body_headers).await;
404        assert_matches!(result, Err(Error::OperationError { .. }));
405
406        // EndOfBody shouldn't be included in delete.
407        let operation = setup_put_operation(&manager, vec![]);
408        let eob_headers = HeaderSet::from_headers(vec![
409            Header::EndOfBody(payload.clone()),
410            Header::name("foobar1.txt"),
411        ])
412        .unwrap();
413        let result = operation.delete(eob_headers).await;
414        assert_matches!(result, Err(Error::OperationError { .. }));
415    }
416
417    #[fuchsia::test]
418    async fn put_operation_terminate_before_start_error() {
419        let (manager, _remote) = new_manager(/* srm_supported */ false);
420        let operation = setup_put_operation(&manager, vec![]);
421
422        // Trying to terminate early doesn't work as the operation has not started.
423        let headers = HeaderSet::from_header(Header::Description("terminating test".into()));
424        let terminate_result = operation.terminate(headers).await;
425        assert_matches!(terminate_result, Err(Error::OperationError { .. }));
426    }
427
428    #[fuchsia::test]
429    fn put_operation_srm_enabled_is_ok() {
430        let mut exec = fasync::TestExecutor::new();
431        let (manager, mut remote) = new_manager(/* srm_supported */ true);
432        let mut operation = setup_put_operation(&manager, vec![]);
433
434        {
435            let first_buf = [1, 2, 3];
436            // Even though the input headers are empty, we should prefer to enable SRM.
437            let put_fut = operation.write(&first_buf[..], HeaderSet::new());
438            let mut put_fut = pin!(put_fut);
439            let _ = exec.run_until_stalled(&mut put_fut).expect_pending("waiting for response");
440
441            // Expect the outgoing request with the SRM Header. Peer responds positively with a SRM
442            // Enable response.
443            let expectation = |request: RequestPacket| {
444                assert_eq!(*request.code(), OpCode::Put);
445                let headers = HeaderSet::from(request);
446                assert!(headers.contains_header(&HeaderIdentifier::Body));
447                assert!(headers.contains_header(&HeaderIdentifier::SingleResponseMode));
448            };
449            let response_headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
450            let response = ResponsePacket::new_no_data(ResponseCode::Continue, response_headers);
451            expect_request_and_reply(&mut exec, &mut remote, expectation, response);
452            let _received_headers = exec
453                .run_until_stalled(&mut put_fut)
454                .expect("response received")
455                .expect("valid response");
456        }
457        // At this point SRM is enabled for the duration of the operation.
458        assert_eq!(operation.srm, SingleResponseMode::Enable);
459        // Second write doesn't require a response.
460        {
461            let second_buf = [4, 5, 6];
462            let put_fut2 = operation.write(&second_buf[..], HeaderSet::new());
463            let mut put_fut2 = pin!(put_fut2);
464            let _ = exec
465                .run_until_stalled(&mut put_fut2)
466                .expect("ready without peer response")
467                .expect("success");
468            let expectation = |request: RequestPacket| {
469                assert_eq!(*request.code(), OpCode::Put);
470                let headers = HeaderSet::from(request);
471                assert!(headers.contains_header(&HeaderIdentifier::Body));
472                assert!(!headers.contains_header(&HeaderIdentifier::SingleResponseMode));
473            };
474            expect_request(&mut exec, &mut remote, expectation);
475        }
476
477        // Only the final write request will result in a response.
478        let put_final_fut = operation.write_final(&[], HeaderSet::new());
479        let mut put_final_fut = pin!(put_final_fut);
480        let _ = exec.run_until_stalled(&mut put_final_fut).expect_pending("waiting for response");
481        let response = ResponsePacket::new_no_data(ResponseCode::Ok, HeaderSet::new());
482        let expectation = |request: RequestPacket| {
483            assert_eq!(*request.code(), OpCode::PutFinal);
484            let headers = HeaderSet::from(request);
485            assert!(headers.contains_header(&HeaderIdentifier::EndOfBody));
486        };
487        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
488        let _ = exec
489            .run_until_stalled(&mut put_final_fut)
490            .expect("response received")
491            .expect("valid response");
492    }
493
494    #[fuchsia::test]
495    fn client_disable_srm_mid_operation_is_ignored() {
496        let mut exec = fasync::TestExecutor::new();
497        let (manager, mut remote) = new_manager(/* srm_supported */ true);
498        let mut operation = setup_put_operation(&manager, vec![]);
499        // Pretend first write happened already by manually setting the operation as started.
500        if let Status::NotStarted(_) = &mut operation.status {
501            let _ = operation.set_started().unwrap();
502        } else {
503            panic!("At this point operation not started");
504        };
505        // SRM is enabled for the duration of the operation.
506        assert_eq!(operation.srm, SingleResponseMode::Enable);
507
508        // Client tries to disable SRM in a subsequent write attempt. Ignored.
509        {
510            let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
511            let put_fut = operation.write(&[], headers);
512            let mut put_fut = pin!(put_fut);
513            let _ = exec
514                .run_until_stalled(&mut put_fut)
515                .expect("ready without peer response")
516                .expect("success");
517            let expectation = |request: RequestPacket| {
518                assert_eq!(*request.code(), OpCode::Put);
519            };
520            expect_request(&mut exec, &mut remote, expectation);
521        }
522        // SRM is still enabled.
523        assert_eq!(operation.srm, SingleResponseMode::Enable);
524    }
525
526    #[fuchsia::test]
527    fn application_select_srm_success() {
528        let _exec = fasync::TestExecutor::new();
529        let (manager, _remote) = new_manager(/* srm_supported */ false);
530        let mut operation = setup_put_operation(&manager, vec![]);
531        assert_eq!(operation.srm, SingleResponseMode::Disable);
532        // The application requesting to disable SRM when it isn't supported is OK.
533        let mut headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
534        assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
535        assert_eq!(operation.srm, SingleResponseMode::Disable);
536
537        // The application requesting to disable SRM when it is supported is OK.
538        let (manager, _remote) = new_manager(/* srm_supported */ true);
539        let mut operation = setup_put_operation(&manager, vec![]);
540        assert_eq!(operation.srm, SingleResponseMode::Enable);
541        let mut headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
542        assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
543        assert_eq!(operation.srm, SingleResponseMode::Disable);
544
545        // The application requesting to enable SRM when it is supported is OK.
546        let (manager, _remote) = new_manager(/* srm_supported */ true);
547        let mut operation = setup_put_operation(&manager, vec![]);
548        assert_eq!(operation.srm, SingleResponseMode::Enable);
549        let mut headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
550        assert_matches!(operation.try_enable_srm(&mut headers), Ok(()));
551        assert_eq!(operation.srm, SingleResponseMode::Enable);
552    }
553
554    #[fuchsia::test]
555    fn application_enable_srm_when_not_supported_is_error() {
556        let _exec = fasync::TestExecutor::new();
557        let (manager, _remote) = new_manager(/* srm_supported */ false);
558        let mut operation = setup_put_operation(&manager, vec![]);
559        assert_eq!(operation.srm, SingleResponseMode::Disable);
560        let mut headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
561        assert_matches!(operation.try_enable_srm(&mut headers), Err(Error::SrmNotSupported));
562        assert_eq!(operation.srm, SingleResponseMode::Disable);
563    }
564
565    #[fuchsia::test]
566    fn peer_srm_response() {
567        let _exec = fasync::TestExecutor::new();
568        let (manager, _remote) = new_manager(/* srm_supported */ false);
569        let mut operation = setup_put_operation(&manager, vec![]);
570        // An enable response from the peer when SRM is disabled locally should not enable SRM.
571        let headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
572        operation.check_response_for_srm(&headers);
573        assert_eq!(operation.srm, SingleResponseMode::Disable);
574        // A disable response from the peer when SRM is disabled locally is a no-op.
575        let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
576        operation.check_response_for_srm(&headers);
577        assert_eq!(operation.srm, SingleResponseMode::Disable);
578
579        let (manager, _remote) = new_manager(/* srm_supported */ true);
580        let mut operation = setup_put_operation(&manager, vec![]);
581        // An enable response from the peer when SRM is enable is a no-op.
582        let headers = HeaderSet::from_header(SingleResponseMode::Enable.into());
583        operation.check_response_for_srm(&headers);
584        assert_eq!(operation.srm, SingleResponseMode::Enable);
585        // A disable response from the peer when SRM is enabled should disable SRM.
586        let headers = HeaderSet::from_header(SingleResponseMode::Disable.into());
587        operation.check_response_for_srm(&headers);
588        assert_eq!(operation.srm, SingleResponseMode::Disable);
589
590        let (manager, _remote) = new_manager(/* srm_supported */ true);
591        let mut operation = setup_put_operation(&manager, vec![]);
592        // A response with no SRM header should be treated like a disable request.
593        operation.check_response_for_srm(&HeaderSet::new());
594        assert_eq!(operation.srm, SingleResponseMode::Disable);
595    }
596
597    #[fuchsia::test]
598    fn put_with_connection_id_already_set_is_error() {
599        let mut exec = fasync::TestExecutor::new();
600        let (manager, _remote) = new_manager(/* srm_supported */ false);
601        // The initial operation contains a ConnectionId header which was negotiated during CONNECT.
602        let mut operation =
603            setup_put_operation(&manager, vec![Header::ConnectionId(ConnectionIdentifier(5))]);
604
605        let write_headers = HeaderSet::from_header(Header::ConnectionId(ConnectionIdentifier(10)));
606        let write_fut = operation.write(&[1, 2, 3], write_headers);
607        let mut write_fut = pin!(write_fut);
608        let result = exec.run_until_stalled(&mut write_fut).expect("finished with error");
609        assert_matches!(result, Err(Error::AlreadyExists(_)));
610    }
611}