Skip to main content

bt_obex/client/
mod.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_bluetooth::types::Channel;
6use futures::SinkExt;
7use log::{trace, warn};
8
9pub use crate::client::get::GetOperation;
10pub use crate::client::put::PutOperation;
11use crate::error::Error;
12use crate::header::{
13    ConnectionIdentifier, Header, HeaderIdentifier, HeaderSet, SingleResponseMode,
14};
15use crate::operation::{OpCode, RequestPacket, ResponseCode, ResponsePacket, SetPathFlags};
16pub use crate::transport::TransportType;
17use crate::transport::{ObexTransportManager, max_packet_size_from_transport};
18use fuchsia_sync::Mutex;
19
20/// Implements the OBEX PUT operation.
21mod put;
22
23/// Implements the OBEX GET operation.
24mod get;
25
26/// An interface for the Single Response Mode (SRM) feature for an OBEX client operation.
27pub(crate) trait SrmOperation {
28    const OPERATION_TYPE: OpCode;
29
30    /// Returns the current SRM mode.
31    fn get_srm(&self) -> SingleResponseMode;
32
33    /// Sets SRM to the provided `mode`.
34    fn set_srm(&mut self, mode: SingleResponseMode);
35
36    /// Attempts to enable SRM for the operation by updating the provided `headers` with the SRM
37    /// header.
38    /// Returns Error if `headers` couldn't be updated with SRM, Ok otherwise.
39    fn try_enable_srm(&mut self, headers: &mut HeaderSet) -> Result<(), Error> {
40        let requested_srm = headers.try_add_srm(self.get_srm())?;
41        self.set_srm(requested_srm);
42        trace!(operation:? = Self::OPERATION_TYPE; "Requesting SRM {requested_srm:?}");
43        Ok(())
44    }
45
46    /// Checks the provided response `headers` for the SRM flag and updates the local SRM state
47    /// for the operation.
48    fn check_response_for_srm(&mut self, headers: &HeaderSet) {
49        let srm_response = if let Some(Header::SingleResponseMode(srm)) =
50            headers.get(&HeaderIdentifier::SingleResponseMode)
51        {
52            *srm
53        } else {
54            // No SRM indication from peer defaults to disabled.
55            trace!(operation:? = Self::OPERATION_TYPE; "Response doesn't contain SRM header");
56            SingleResponseMode::Disable
57        };
58
59        trace!(current_status:? = self.get_srm(), operation:? = Self::OPERATION_TYPE; "Peer responded with {srm_response:?}");
60        match (srm_response, self.get_srm()) {
61            (SingleResponseMode::Enable, SingleResponseMode::Disable) => {
62                warn!("SRM stays disabled");
63            }
64            (SingleResponseMode::Disable, SingleResponseMode::Enable) => {
65                trace!("SRM is disabled");
66                self.set_srm(SingleResponseMode::Disable);
67            }
68            _ => {} // Otherwise, both sides agree on the SRM status.
69        }
70        trace!(status:? = self.get_srm(), operation:? = Self::OPERATION_TYPE; "SRM status");
71    }
72}
73
74#[derive(Clone, Copy, Debug, PartialEq, Default)]
75enum ConnectionStatus {
76    /// The transport is created but the CONNECT operation has not been completed.
77    #[default]
78    Initialized,
79    /// The CONNECT operation has been completed and the transport is considered connected.
80    /// `id` contains the optional identifier for this connection. This is configured during the
81    /// CONNECT operation and is a unique value assigned by the remote OBEX server.
82    Connected { id: Option<ConnectionIdentifier> },
83    /// The transport is considered disconnected -- the OBEX client has Disconnected the session.
84    Disconnected,
85}
86
87impl ConnectionStatus {
88    #[cfg(test)]
89    fn connected_no_id() -> Self {
90        Self::Connected { id: None }
91    }
92}
93
94/// The Client role for an OBEX session.
95/// Provides an interface for connecting to a remote OBEX server and initiating the operations
96/// specified in OBEX 1.5.
97#[derive(Debug)]
98pub struct ObexClient {
99    /// Whether the CONNECT operation has completed.
100    connected: Mutex<ConnectionStatus>,
101    /// The maximum OBEX packet length for this OBEX session.
102    max_packet_size: Mutex<u16>,
103    /// Manages the RFCOMM or L2CAP transport and provides a reservation system for starting
104    /// new operations.
105    transport: ObexTransportManager,
106}
107
108impl ObexClient {
109    pub fn new(channel: Channel, type_: TransportType) -> Self {
110        let max_packet_size = max_packet_size_from_transport(channel.max_tx_size());
111        let transport = ObexTransportManager::new(channel, type_);
112        Self {
113            connected: Mutex::new(ConnectionStatus::default()),
114            max_packet_size: Mutex::new(max_packet_size),
115            transport,
116        }
117    }
118
119    pub fn is_transport_connected(&self) -> bool {
120        !self.transport.is_transport_closed()
121    }
122
123    fn set_connection_status(&self, status: ConnectionStatus) {
124        *self.connected.lock() = status;
125    }
126
127    fn connection_status(&self) -> ConnectionStatus {
128        *self.connected.lock()
129    }
130
131    pub fn is_connected(&self) -> bool {
132        matches!(*self.connected.lock(), ConnectionStatus::Connected { .. })
133    }
134
135    pub fn connection_id(&self) -> Option<ConnectionIdentifier> {
136        match self.connection_status() {
137            ConnectionStatus::Connected { id } => id.clone(),
138            _ => None,
139        }
140    }
141
142    fn set_max_packet_size(&self, peer_max_packet_size: u16) {
143        // We have no opinion on the preferred max packet size, so just use the peer's.
144        *self.max_packet_size.lock() = peer_max_packet_size;
145        trace!("Max packet size set to {peer_max_packet_size}");
146    }
147
148    fn max_packet_size(&self) -> u16 {
149        *self.max_packet_size.lock()
150    }
151
152    fn handle_connect_response(&self, response: ResponsePacket) -> Result<HeaderSet, Error> {
153        let request = OpCode::Connect;
154        let response = response.expect_code(request, ResponseCode::Ok)?;
155
156        // Expect the 4 bytes of additional data. We negotiate the max packet length based on what
157        // the peer requests. See OBEX 1.5 Section 3.4.1.
158        if response.data().len() != request.response_data_length() {
159            return Err(Error::response(request, "Invalid CONNECT data"));
160        }
161        let peer_max_packet_size = u16::from_be_bytes(response.data()[2..4].try_into().unwrap());
162        self.set_max_packet_size(peer_max_packet_size);
163
164        // Check to see if the response headers contains a Connection Identifier. If so, this should
165        // be included in all subsequent operations.
166        let headers: HeaderSet = response.into();
167        if let Some(Header::ConnectionId(id)) = headers.get(&HeaderIdentifier::ConnectionId) {
168            trace!(id:? = id; "Found Connection Identifier in CONNECT response");
169            self.set_connection_status(ConnectionStatus::Connected { id: Some(*id) });
170        }
171        Ok(headers)
172    }
173
174    /// Initiates a CONNECT request to the remote peer.
175    /// Returns the Headers associated with the response on success.
176    /// Returns Error if the CONNECT operation could not be completed.
177    pub async fn connect(&self, headers: HeaderSet) -> Result<HeaderSet, Error> {
178        if self.is_connected() {
179            return Err(Error::operation(OpCode::Connect, "already connected"));
180        }
181
182        let response = {
183            let request = RequestPacket::new_connect(self.max_packet_size(), headers);
184            let mut transport = self.transport.try_new_operation()?;
185            transport.send(request).await?;
186            trace!("Successfully made CONNECT request");
187            transport.receive_response(OpCode::Connect).await?
188        };
189
190        let response_headers = self.handle_connect_response(response)?;
191        Ok(response_headers)
192    }
193
194    /// Initiates a DISCONNECT request to the remote peer.
195    /// Returns the Headers associated with the response on success.
196    /// Returns Error if the DISCONNECT operation couldn't be completed or was rejected by the peer.
197    /// The OBEX Session with the peer is considered terminated, regardless.
198    pub async fn disconnect(self, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
199        let opcode = OpCode::Disconnect;
200        if !self.is_connected() {
201            return Err(Error::operation(opcode, "session not connected"));
202        }
203        headers.try_add_connection_id(&self.connection_id())?;
204        let response = {
205            let request = RequestPacket::new_disconnect(headers);
206            let mut transport = self.transport.try_new_operation()?;
207            trace!("Making outgoing DISCONNECT request: {request:?}");
208            transport.send(request).await?;
209            trace!("Successfully made DISCONNECT request");
210            transport.receive_response(opcode).await?
211        };
212        self.set_connection_status(ConnectionStatus::Disconnected);
213        response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
214    }
215
216    /// Initializes a GET Operation to retrieve data from the remote OBEX Server.
217    /// Returns a `GetOperation` on success, Error if the new operation couldn't be started.
218    pub fn get(&self) -> Result<GetOperation<'_>, Error> {
219        // A GET can only be initiated after the OBEX session is connected.
220        if !self.is_connected() {
221            return Err(Error::operation(OpCode::Get, "session not connected"));
222        }
223
224        let mut headers = HeaderSet::new();
225        headers.try_add_connection_id(&self.connection_id())?;
226
227        // Only one operation can be active at a time.
228        let transport = self.transport.try_new_operation()?;
229        Ok(GetOperation::new(headers, transport))
230    }
231
232    /// Initializes a PUT Operation to write data to the remote OBEX Server.
233    /// Returns a `PutOperation` on success, Error if the new operation couldn't be started.
234    pub fn put(&self) -> Result<PutOperation<'_>, Error> {
235        // A PUT can only be initiated after the OBEX session is connected.
236        if !self.is_connected() {
237            return Err(Error::operation(OpCode::Put, "session not connected"));
238        }
239
240        let mut headers = HeaderSet::new();
241        headers.try_add_connection_id(&self.connection_id())?;
242
243        // Only one operation can be active at a time.
244        let transport = self.transport.try_new_operation()?;
245        Ok(PutOperation::new(headers, transport))
246    }
247
248    /// Initializes a SETPATH Operation to set the current folder on the remote OBEX Server.
249    /// Returns the Headers associated with the response on success.
250    /// Returns `Error::NotImplemented` if the remote server does not support SETPATH.
251    /// Returns Error for all other errors.
252    pub async fn set_path(
253        &self,
254        flags: SetPathFlags,
255        mut headers: HeaderSet,
256    ) -> Result<HeaderSet, Error> {
257        let opcode = OpCode::SetPath;
258        // A SETPATH can only be initiated after the OBEX session is connected.
259        if !self.is_connected() {
260            return Err(Error::operation(opcode, "session not connected"));
261        }
262        headers.try_add_connection_id(&self.connection_id())?;
263        let request = RequestPacket::new_set_path(flags, headers)?;
264        let response = {
265            let mut transport = self.transport.try_new_operation()?;
266            trace!("Making outgoing SETPATH request: {request:?}");
267            transport.send(request).await?;
268            trace!("Successfully made SETPATH request");
269            transport.receive_response(opcode).await?
270        };
271
272        // Per OBEX Section 3.4.6, the server may respond with BadRequest or Forbidden if it does
273        // not support the operation.
274        if *response.code() == ResponseCode::BadRequest
275            || *response.code() == ResponseCode::Forbidden
276        {
277            return Err(Error::not_implemented(opcode));
278        }
279        response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
280    }
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use async_utils::PollExt;
    use fuchsia_async as fasync;
    use std::pin::pin;

    use crate::transport::test_utils::{expect_code, expect_request_and_reply};

    /// Builds an ObexClient whose session status is forced to `connected`, and returns it
    /// together with the remote end of its transport.
    /// Passing a connected status yields a client that behaves as if the OBEX CONNECT procedure
    /// had already completed.
    fn new_obex_client(connected: ConnectionStatus) -> (ObexClient, Channel) {
        let (local, remote) = Channel::create();
        let client = ObexClient::new(local, TransportType::Rfcomm);
        client.set_connection_status(connected);
        (client, remote)
    }

    #[fuchsia::test]
    fn max_packet_size_calculation() {
        // A transport max within [255, 65535] is used as-is.
        assert_eq!(max_packet_size_from_transport(1000), 1000);

        // Anything below the minimum is raised to the minimum.
        assert_eq!(max_packet_size_from_transport(40), 255);

        // Anything above the maximum is capped at the maximum.
        assert_eq!(max_packet_size_from_transport(700000), u16::MAX);
    }

    #[fuchsia::test]
    fn client_connect_success() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::default());

        // Before CONNECT: not connected, transport-derived packet size, no identifier.
        assert!(!client.is_connected());
        assert_eq!(client.max_packet_size(), Channel::DEFAULT_MAX_TX as u16);
        assert_eq!(client.connection_id(), None);

        {
            let mut connect_fut = pin!(client.connect(HeaderSet::new()));
            exec.run_until_stalled(&mut connect_fut).expect_pending("waiting for response");

            // The remote sees the Connect request and accepts it.
            let expected_headers =
                HeaderSet::from_headers(vec![Header::ConnectionId(ConnectionIdentifier(1))])
                    .unwrap();
            // Version = 1.0, Flags = 0, Max packet = 0xffff
            let connect_data = vec![0x10, 0x00, 0xff, 0xff];
            let response =
                ResponsePacket::new(ResponseCode::Ok, connect_data, expected_headers.clone());
            expect_request_and_reply(
                &mut exec,
                &mut remote,
                expect_code(OpCode::Connect),
                response,
            );

            let received_headers = exec
                .run_until_stalled(&mut connect_fut)
                .expect("received response")
                .expect("response is ok");
            assert_eq!(received_headers, expected_headers);
        }

        // After CONNECT: connected, using the peer's max packet size and identifier.
        assert!(client.is_connected());
        assert_eq!(client.max_packet_size(), 0xffff);
        assert_eq!(client.connection_id(), Some(ConnectionIdentifier(1)));
    }

    #[fuchsia::test]
    async fn multiple_connect_is_error() {
        let (client, _remote) = new_obex_client(ConnectionStatus::connected_no_id());

        // CONNECT may only be done once, so a second attempt is rejected.
        assert_matches!(
            client.connect(HeaderSet::new()).await,
            Err(Error::OperationError { .. })
        );
    }

    #[fuchsia::test]
    fn get_before_connect_is_error() {
        let _exec = fasync::TestExecutor::new();
        let (client, _remote) = new_obex_client(ConnectionStatus::default());

        assert_matches!(client.get(), Err(Error::OperationError { .. }));
    }

    #[fuchsia::test]
    fn sequential_get_operations_is_ok() {
        let _exec = fasync::TestExecutor::new();
        let (client, _remote) = new_obex_client(ConnectionStatus::connected_no_id());

        // The first GET operation can be created.
        let first_get = client.get().expect("can initialize first get");

        // Dropping the first operation stands in for it "completing"; another GET may then be
        // started.
        drop(first_get);
        let _second_get = client.get().expect("can initialize second get");
    }

    #[fuchsia::test]
    fn disconnect_success() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());

        let request_headers = HeaderSet::from_header(Header::Description("finished".into()));
        let mut disconnect_fut = pin!(client.disconnect(request_headers));
        exec.run_until_stalled(&mut disconnect_fut).expect_pending("waiting for response");

        // The remote sees the Disconnect request and replies with the usual positive `Ok`.
        let expected_headers = HeaderSet::from_header(Header::Description("accepted".into()));
        let response = ResponsePacket::new(ResponseCode::Ok, vec![], expected_headers.clone());
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Disconnect), response);

        let received_headers = exec
            .run_until_stalled(&mut disconnect_fut)
            .expect("received response")
            .expect("response is ok");
        assert_eq!(received_headers, expected_headers);
    }

    #[fuchsia::test]
    async fn disconnect_before_connect_error() {
        let (client, _remote) = new_obex_client(ConnectionStatus::default());

        let request_headers = HeaderSet::from_header(Header::Description("finished".into()));
        assert_matches!(
            client.disconnect(request_headers).await,
            Err(Error::OperationError { .. })
        );
    }

    #[fuchsia::test]
    fn disconnect_error_response_error() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());

        let mut disconnect_fut = pin!(client.disconnect(HeaderSet::new()));
        exec.run_until_stalled(&mut disconnect_fut).expect_pending("waiting for response");

        // The remote sees the Disconnect request and replies negatively. The client is still
        // disconnected afterwards; the negative response is surfaced as an Error.
        let reply_headers = HeaderSet::from_header(Header::Description("accepted".into()));
        let response =
            ResponsePacket::new(ResponseCode::InternalServerError, vec![], reply_headers.clone());
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Disconnect), response);

        let outcome = exec.run_until_stalled(&mut disconnect_fut).expect("received response");
        assert_matches!(outcome, Err(Error::PeerRejected { .. }));
    }

    #[fuchsia::test]
    fn setpath_success() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());

        let request_headers = HeaderSet::from_header(Header::name("myfolder"));
        let mut setpath_fut = pin!(client.set_path(SetPathFlags::empty(), request_headers));
        exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");

        // The remote sees a SetPath request carrying empty flags and replies with a positive
        // `Ok`.
        let expected_headers =
            HeaderSet::from_header(Header::Description("updated current folder".into()));
        let response = ResponsePacket::new(ResponseCode::Ok, vec![], expected_headers.clone());
        let check_request = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::SetPath);
            assert_eq!(request.data(), &[0, 0]);
        };
        expect_request_and_reply(&mut exec, &mut remote, check_request, response);

        let received_headers = exec
            .run_until_stalled(&mut setpath_fut)
            .expect("received response")
            .expect("response is ok");
        assert_eq!(received_headers, expected_headers);
    }

    #[fuchsia::test]
    fn setpath_error_response_is_error() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());

        // Case 1: the peer doesn't support SetPath and says so with BadRequest.
        {
            let mut setpath_fut = pin!(client.set_path(SetPathFlags::BACKUP, HeaderSet::new()));
            exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");

            let reply_headers =
                HeaderSet::from_header(Header::Description("not implemented".into()));
            let response = ResponsePacket::new(ResponseCode::BadRequest, vec![], reply_headers);
            expect_request_and_reply(
                &mut exec,
                &mut remote,
                expect_code(OpCode::SetPath),
                response,
            );

            let outcome = exec.run_until_stalled(&mut setpath_fut).expect("received response");
            assert_matches!(outcome, Err(Error::NotImplemented { .. }));
        }

        // Case 2: the peer rejects the SetPath with a generic error code.
        let request_headers = HeaderSet::from_header(Header::name("file"));
        let mut setpath_fut = pin!(client.set_path(SetPathFlags::DONT_CREATE, request_headers));
        exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");

        let reply_headers = HeaderSet::from_header(Header::Description("not implemented".into()));
        let response =
            ResponsePacket::new(ResponseCode::InternalServerError, vec![], reply_headers);
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::SetPath), response);

        let outcome = exec.run_until_stalled(&mut setpath_fut).expect("received response");
        assert_matches!(outcome, Err(Error::PeerRejected { .. }));
    }
}