bt_obex/client/
mod.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_bluetooth::types::Channel;
6use log::{trace, warn};
7
8pub use crate::client::get::GetOperation;
9pub use crate::client::put::PutOperation;
10use crate::error::Error;
11use crate::header::{
12    ConnectionIdentifier, Header, HeaderIdentifier, HeaderSet, SingleResponseMode,
13};
14use crate::operation::{OpCode, RequestPacket, ResponseCode, ResponsePacket, SetPathFlags};
15pub use crate::transport::TransportType;
16use crate::transport::{max_packet_size_from_transport, ObexTransportManager};
17use fuchsia_sync::Mutex;
18
19/// Implements the OBEX PUT operation.
20mod put;
21
22/// Implements the OBEX GET operation.
23mod get;
24
25/// An interface for the Single Response Mode (SRM) feature for an OBEX client operation.
26pub(crate) trait SrmOperation {
27    const OPERATION_TYPE: OpCode;
28
29    /// Returns the current SRM mode.
30    fn get_srm(&self) -> SingleResponseMode;
31
32    /// Sets SRM to the provided `mode`.
33    fn set_srm(&mut self, mode: SingleResponseMode);
34
35    /// Attempts to enable SRM for the operation by updating the provided `headers` with the SRM
36    /// header.
37    /// Returns Error if `headers` couldn't be updated with SRM, Ok otherwise.
38    fn try_enable_srm(&mut self, headers: &mut HeaderSet) -> Result<(), Error> {
39        let requested_srm = headers.try_add_srm(self.get_srm())?;
40        self.set_srm(requested_srm);
41        trace!(operation:? = Self::OPERATION_TYPE; "Requesting SRM {requested_srm:?}");
42        Ok(())
43    }
44
45    /// Checks the provided response `headers` for the SRM flag and updates the local SRM state
46    /// for the operation.
47    fn check_response_for_srm(&mut self, headers: &HeaderSet) {
48        let srm_response = if let Some(Header::SingleResponseMode(srm)) =
49            headers.get(&HeaderIdentifier::SingleResponseMode)
50        {
51            *srm
52        } else {
53            // No SRM indication from peer defaults to disabled.
54            trace!(operation:? = Self::OPERATION_TYPE; "Response doesn't contain SRM header");
55            SingleResponseMode::Disable
56        };
57
58        trace!(current_status:? = self.get_srm(), operation:? = Self::OPERATION_TYPE; "Peer responded with {srm_response:?}");
59        match (srm_response, self.get_srm()) {
60            (SingleResponseMode::Enable, SingleResponseMode::Disable) => {
61                warn!("SRM stays disabled");
62            }
63            (SingleResponseMode::Disable, SingleResponseMode::Enable) => {
64                trace!("SRM is disabled");
65                self.set_srm(SingleResponseMode::Disable);
66            }
67            _ => {} // Otherwise, both sides agree on the SRM status.
68        }
69        trace!(status:? = self.get_srm(), operation:? = Self::OPERATION_TYPE; "SRM status");
70    }
71}
72
73#[derive(Clone, Copy, Debug, PartialEq, Default)]
74enum ConnectionStatus {
75    /// The transport is created but the CONNECT operation has not been completed.
76    #[default]
77    Initialized,
78    /// The CONNECT operation has been completed and the transport is considered connected.
79    /// `id` contains the optional identifier for this connection. This is configured during the
80    /// CONNECT operation and is a unique value assigned by the remote OBEX server.
81    Connected { id: Option<ConnectionIdentifier> },
82    /// The transport is considered disconnected -- the OBEX client has Disconnected the session.
83    Disconnected,
84}
85
86impl ConnectionStatus {
87    #[cfg(test)]
88    fn connected_no_id() -> Self {
89        Self::Connected { id: None }
90    }
91}
92
/// The Client role for an OBEX session.
/// Provides an interface for connecting to a remote OBEX server and initiating the operations
/// specified in OBEX 1.5.
///
/// Session state is kept behind `Mutex`es so that it can be updated through the `&self`
/// methods of the client.
#[derive(Debug)]
pub struct ObexClient {
    /// The current status of the OBEX session: CONNECT not yet completed, connected (with an
    /// optional Connection Identifier), or disconnected.
    connected: Mutex<ConnectionStatus>,
    /// The maximum OBEX packet length for this OBEX session. Initially derived from the
    /// transport and replaced by the peer's value once a CONNECT response is processed.
    max_packet_size: Mutex<u16>,
    /// Manages the RFCOMM or L2CAP transport and provides a reservation system for starting
    /// new operations.
    transport: ObexTransportManager,
}
106
107impl ObexClient {
108    pub fn new(channel: Channel, type_: TransportType) -> Self {
109        let max_packet_size = max_packet_size_from_transport(channel.max_tx_size());
110        let transport = ObexTransportManager::new(channel, type_);
111        Self {
112            connected: Mutex::new(ConnectionStatus::default()),
113            max_packet_size: Mutex::new(max_packet_size),
114            transport,
115        }
116    }
117
118    pub fn is_transport_connected(&self) -> bool {
119        !self.transport.is_transport_closed()
120    }
121
122    fn set_connection_status(&self, status: ConnectionStatus) {
123        *self.connected.lock() = status;
124    }
125
126    fn connection_status(&self) -> ConnectionStatus {
127        *self.connected.lock()
128    }
129
130    pub fn is_connected(&self) -> bool {
131        matches!(*self.connected.lock(), ConnectionStatus::Connected { .. })
132    }
133
134    pub fn connection_id(&self) -> Option<ConnectionIdentifier> {
135        match self.connection_status() {
136            ConnectionStatus::Connected { id } => id.clone(),
137            _ => None,
138        }
139    }
140
141    fn set_max_packet_size(&self, peer_max_packet_size: u16) {
142        // We have no opinion on the preferred max packet size, so just use the peer's.
143        *self.max_packet_size.lock() = peer_max_packet_size;
144        trace!("Max packet size set to {peer_max_packet_size}");
145    }
146
147    fn max_packet_size(&self) -> u16 {
148        *self.max_packet_size.lock()
149    }
150
151    fn handle_connect_response(&self, response: ResponsePacket) -> Result<HeaderSet, Error> {
152        let request = OpCode::Connect;
153        let response = response.expect_code(request, ResponseCode::Ok)?;
154
155        // Expect the 4 bytes of additional data. We negotiate the max packet length based on what
156        // the peer requests. See OBEX 1.5 Section 3.4.1.
157        if response.data().len() != request.response_data_length() {
158            return Err(Error::response(request, "Invalid CONNECT data"));
159        }
160        let peer_max_packet_size = u16::from_be_bytes(response.data()[2..4].try_into().unwrap());
161        self.set_max_packet_size(peer_max_packet_size);
162
163        // Check to see if the response headers contains a Connection Identifier. If so, this should
164        // be included in all subsequent operations.
165        let headers: HeaderSet = response.into();
166        if let Some(Header::ConnectionId(id)) = headers.get(&HeaderIdentifier::ConnectionId) {
167            trace!(id:? = id; "Found Connection Identifier in CONNECT response");
168            self.set_connection_status(ConnectionStatus::Connected { id: Some(*id) });
169        }
170        Ok(headers)
171    }
172
173    /// Initiates a CONNECT request to the remote peer.
174    /// Returns the Headers associated with the response on success.
175    /// Returns Error if the CONNECT operation could not be completed.
176    pub async fn connect(&self, headers: HeaderSet) -> Result<HeaderSet, Error> {
177        if self.is_connected() {
178            return Err(Error::operation(OpCode::Connect, "already connected"));
179        }
180
181        let response = {
182            let request = RequestPacket::new_connect(self.max_packet_size(), headers);
183            let mut transport = self.transport.try_new_operation()?;
184            trace!("Making outgoing CONNECT request: {request:?}");
185            transport.send(request)?;
186            trace!("Successfully made CONNECT request");
187            transport.receive_response(OpCode::Connect).await?
188        };
189
190        let response_headers = self.handle_connect_response(response)?;
191        Ok(response_headers)
192    }
193
194    /// Initiates a DISCONNECT request to the remote peer.
195    /// Returns the Headers associated with the response on success.
196    /// Returns Error if the DISCONNECT operation couldn't be completed or was rejected by the peer.
197    /// The OBEX Session with the peer is considered terminated, regardless.
198    pub async fn disconnect(self, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
199        let opcode = OpCode::Disconnect;
200        if !self.is_connected() {
201            return Err(Error::operation(opcode, "session not connected"));
202        }
203        headers.try_add_connection_id(&self.connection_id())?;
204        let response = {
205            let request = RequestPacket::new_disconnect(headers);
206            let mut transport = self.transport.try_new_operation()?;
207            trace!("Making outgoing DISCONNECT request: {request:?}");
208            transport.send(request)?;
209            trace!("Successfully made DISCONNECT request");
210            transport.receive_response(opcode).await?
211        };
212        self.set_connection_status(ConnectionStatus::Disconnected);
213        response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
214    }
215
216    /// Initializes a GET Operation to retrieve data from the remote OBEX Server.
217    /// Returns a `GetOperation` on success, Error if the new operation couldn't be started.
218    pub fn get(&self) -> Result<GetOperation<'_>, Error> {
219        // A GET can only be initiated after the OBEX session is connected.
220        if !self.is_connected() {
221            return Err(Error::operation(OpCode::Get, "session not connected"));
222        }
223
224        let mut headers = HeaderSet::new();
225        headers.try_add_connection_id(&self.connection_id())?;
226
227        // Only one operation can be active at a time.
228        let transport = self.transport.try_new_operation()?;
229        Ok(GetOperation::new(headers, transport))
230    }
231
232    /// Initializes a PUT Operation to write data to the remote OBEX Server.
233    /// Returns a `PutOperation` on success, Error if the new operation couldn't be started.
234    pub fn put(&self) -> Result<PutOperation<'_>, Error> {
235        // A PUT can only be initiated after the OBEX session is connected.
236        if !self.is_connected() {
237            return Err(Error::operation(OpCode::Put, "session not connected"));
238        }
239
240        let mut headers = HeaderSet::new();
241        headers.try_add_connection_id(&self.connection_id())?;
242
243        // Only one operation can be active at a time.
244        let transport = self.transport.try_new_operation()?;
245        Ok(PutOperation::new(headers, transport))
246    }
247
248    /// Initializes a SETPATH Operation to set the current folder on the remote OBEX Server.
249    /// Returns the Headers associated with the response on success.
250    /// Returns `Error::NotImplemented` if the remote server does not support SETPATH.
251    /// Returns Error for all other errors.
252    pub async fn set_path(
253        &self,
254        flags: SetPathFlags,
255        mut headers: HeaderSet,
256    ) -> Result<HeaderSet, Error> {
257        let opcode = OpCode::SetPath;
258        // A SETPATH can only be initiated after the OBEX session is connected.
259        if !self.is_connected() {
260            return Err(Error::operation(opcode, "session not connected"));
261        }
262        headers.try_add_connection_id(&self.connection_id())?;
263        let request = RequestPacket::new_set_path(flags, headers)?;
264        let response = {
265            let mut transport = self.transport.try_new_operation()?;
266            trace!("Making outgoing SETPATH request: {request:?}");
267            transport.send(request)?;
268            trace!("Successfully made SETPATH request");
269            transport.receive_response(opcode).await?
270        };
271
272        // Per OBEX Section 3.4.6, the server may respond with BadRequest or Forbidden if it does
273        // not support the operation.
274        if *response.code() == ResponseCode::BadRequest
275            || *response.code() == ResponseCode::Forbidden
276        {
277            return Err(Error::not_implemented(opcode));
278        }
279        response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286
287    use assert_matches::assert_matches;
288    use async_utils::PollExt;
289    use fuchsia_async as fasync;
290    use std::pin::pin;
291
292    use crate::transport::test_utils::{expect_code, expect_request_and_reply};
293
294    #[fuchsia::test]
295    fn max_packet_size_calculation() {
296        // Value in [255, 65535] should be kept.
297        let transport_max = 1000;
298        assert_eq!(max_packet_size_from_transport(transport_max), 1000);
299
300        // Lower bound should be enforced.
301        let transport_max_small = 40;
302        assert_eq!(max_packet_size_from_transport(transport_max_small), 255);
303
304        // Upper bound should be enforced.
305        let transport_max_large = 700000;
306        assert_eq!(max_packet_size_from_transport(transport_max_large), std::u16::MAX);
307    }
308
309    /// Returns a new ObexClient and the remote end of the transport.
310    /// If `connected` is set, returns an ObexClient in the connected state, indicating the
311    /// completion of the OBEX CONNECT procedure.
312    fn new_obex_client(connected: ConnectionStatus) -> (ObexClient, Channel) {
313        let (local, remote) = Channel::create();
314        let client = ObexClient::new(local, TransportType::Rfcomm);
315        client.set_connection_status(connected);
316        (client, remote)
317    }
318
319    #[fuchsia::test]
320    fn client_connect_success() {
321        let mut exec = fasync::TestExecutor::new();
322        let (client, mut remote) = new_obex_client(ConnectionStatus::default());
323
324        assert!(!client.is_connected());
325        assert_eq!(client.max_packet_size(), Channel::DEFAULT_MAX_TX as u16);
326        assert_eq!(client.connection_id(), None);
327
328        {
329            let connect_fut = client.connect(HeaderSet::new());
330            let mut connect_fut = pin!(connect_fut);
331            exec.run_until_stalled(&mut connect_fut).expect_pending("waiting for response");
332
333            // Expect the Connect request on the remote and reply positively.
334            let response_headers =
335                HeaderSet::from_headers(vec![Header::ConnectionId(ConnectionIdentifier(1))])
336                    .unwrap();
337            let response = ResponsePacket::new(
338                ResponseCode::Ok,
339                vec![0x10, 0x00, 0xff, 0xff], // Version = 1.0, Flags = 0, Max packet = 0xffff
340                response_headers.clone(),
341            );
342            expect_request_and_reply(
343                &mut exec,
344                &mut remote,
345                expect_code(OpCode::Connect),
346                response,
347            );
348
349            let connect_result = exec
350                .run_until_stalled(&mut connect_fut)
351                .expect("received response")
352                .expect("response is ok");
353            assert_eq!(connect_result, response_headers);
354        }
355
356        // Should be connected with the max packet size specified by the peer.
357        assert!(client.is_connected());
358        assert_eq!(client.max_packet_size(), 0xffff);
359        assert_eq!(client.connection_id(), Some(ConnectionIdentifier(1)));
360    }
361
362    #[fuchsia::test]
363    async fn multiple_connect_is_error() {
364        let (client, _remote) = new_obex_client(ConnectionStatus::connected_no_id());
365
366        // Trying to connect again is an Error since it can only be done once.
367        let result = client.connect(HeaderSet::new()).await;
368        assert_matches!(result, Err(Error::OperationError { .. }));
369    }
370
371    #[fuchsia::test]
372    fn get_before_connect_is_error() {
373        let _exec = fasync::TestExecutor::new();
374        let (client, _remote) = new_obex_client(ConnectionStatus::default());
375
376        let get_result = client.get();
377        assert_matches!(get_result, Err(Error::OperationError { .. }));
378    }
379
380    #[fuchsia::test]
381    fn sequential_get_operations_is_ok() {
382        let _exec = fasync::TestExecutor::new();
383        let (client, _remote) = new_obex_client(ConnectionStatus::connected_no_id());
384
385        // Creating the first GET operation should succeed.
386        let _get_operation1 = client.get().expect("can initialize first get");
387
388        // After the first one "completes" (e.g. no longer held), it's okay to initiate a GET.
389        drop(_get_operation1);
390        let _get_operation2 = client.get().expect("can initialize second get");
391    }
392
393    #[fuchsia::test]
394    fn disconnect_success() {
395        let mut exec = fasync::TestExecutor::new();
396        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
397
398        let headers = HeaderSet::from_header(Header::Description("finished".into()));
399        let disconnect_fut = client.disconnect(headers);
400        let mut disconnect_fut = pin!(disconnect_fut);
401        exec.run_until_stalled(&mut disconnect_fut).expect_pending("waiting for response");
402
403        // Expect the Disconnect request on the remote. The typical response is a positive `Ok`.
404        let response_headers = HeaderSet::from_header(Header::Description("accepted".into()));
405        let response = ResponsePacket::new(ResponseCode::Ok, vec![], response_headers.clone());
406        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Disconnect), response);
407
408        let disconnect_result = exec
409            .run_until_stalled(&mut disconnect_fut)
410            .expect("received response")
411            .expect("response is ok");
412        assert_eq!(disconnect_result, response_headers);
413    }
414
415    #[fuchsia::test]
416    async fn disconnect_before_connect_error() {
417        let (client, _remote) = new_obex_client(ConnectionStatus::default());
418
419        let headers = HeaderSet::from_header(Header::Description("finished".into()));
420        let disconnect_result = client.disconnect(headers).await;
421        assert_matches!(disconnect_result, Err(Error::OperationError { .. }))
422    }
423
424    #[fuchsia::test]
425    fn disconnect_error_response_error() {
426        let mut exec = fasync::TestExecutor::new();
427        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
428
429        let disconnect_fut = client.disconnect(HeaderSet::new());
430        let mut disconnect_fut = pin!(disconnect_fut);
431        exec.run_until_stalled(&mut disconnect_fut).expect_pending("waiting for response");
432
433        // Expect the Disconnect request on the remote. An Error response still results in
434        // disconnection.
435        let response_headers = HeaderSet::from_header(Header::Description("accepted".into()));
436        let response = ResponsePacket::new(
437            ResponseCode::InternalServerError,
438            vec![],
439            response_headers.clone(),
440        );
441        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Disconnect), response);
442
443        let disconnect_result =
444            exec.run_until_stalled(&mut disconnect_fut).expect("received response");
445        assert_matches!(disconnect_result, Err(Error::PeerRejected { .. }));
446    }
447
448    #[fuchsia::test]
449    fn setpath_success() {
450        let mut exec = fasync::TestExecutor::new();
451        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
452
453        let headers = HeaderSet::from_header(Header::name("myfolder"));
454        let setpath_fut = client.set_path(SetPathFlags::empty(), headers);
455        let mut setpath_fut = pin!(setpath_fut);
456        exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");
457
458        // Expect the SetPath request on the remote. The typical response is a positive `Ok`.
459        let response_headers =
460            HeaderSet::from_header(Header::Description("updated current folder".into()));
461        let response = ResponsePacket::new(ResponseCode::Ok, vec![], response_headers.clone());
462        let expectation = |request: RequestPacket| {
463            assert_eq!(*request.code(), OpCode::SetPath);
464            assert_eq!(request.data(), &[0, 0]);
465        };
466        expect_request_and_reply(&mut exec, &mut remote, expectation, response);
467
468        let setpath_result = exec
469            .run_until_stalled(&mut setpath_fut)
470            .expect("received response")
471            .expect("response is ok");
472        assert_eq!(setpath_result, response_headers);
473    }
474
475    #[fuchsia::test]
476    fn setpath_error_response_is_error() {
477        let mut exec = fasync::TestExecutor::new();
478        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
479
480        // Peer doesn't support SetPath.
481        {
482            let setpath_fut = client.set_path(SetPathFlags::BACKUP, HeaderSet::new());
483            let mut setpath_fut = pin!(setpath_fut);
484            exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");
485
486            // Expect the SetPath request on the remote - peer doesn't support SetPath.
487            let response_headers =
488                HeaderSet::from_header(Header::Description("not implemented".into()));
489            let response = ResponsePacket::new(ResponseCode::BadRequest, vec![], response_headers);
490            expect_request_and_reply(
491                &mut exec,
492                &mut remote,
493                expect_code(OpCode::SetPath),
494                response,
495            );
496
497            let setpath_result =
498                exec.run_until_stalled(&mut setpath_fut).expect("received response");
499            assert_matches!(setpath_result, Err(Error::NotImplemented { .. }));
500        }
501
502        // Peer rejects SetPath.
503        let headers = HeaderSet::from_header(Header::name("file"));
504        let setpath_fut = client.set_path(SetPathFlags::DONT_CREATE, headers);
505        let mut setpath_fut = pin!(setpath_fut);
506        exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");
507
508        // Expect the SetPath request on the remote - peer responds with error.
509        let response_headers =
510            HeaderSet::from_header(Header::Description("not implemented".into()));
511        let response =
512            ResponsePacket::new(ResponseCode::InternalServerError, vec![], response_headers);
513        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::SetPath), response);
514
515        let setpath_result = exec.run_until_stalled(&mut setpath_fut).expect("received response");
516        assert_matches!(setpath_result, Err(Error::PeerRejected { .. }));
517    }
518}