bt_obex/
transport.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_bluetooth::types::Channel;
6use futures::stream::{FusedStream, TryStreamExt};
7use log::{info, trace};
8use packet_encoding::Encodable;
9use std::cell::{RefCell, RefMut};
10
11use crate::error::{Error, PacketError};
12use crate::operation::{OpCode, ResponsePacket, MAX_PACKET_SIZE, MIN_MAX_PACKET_SIZE};
13
14/// Returns the maximum packet size that will be used for the OBEX session.
15/// `transport_max` is the maximum size that the underlying transport (e.g. L2CAP, RFCOMM) supports.
16pub fn max_packet_size_from_transport(transport_max: usize) -> u16 {
17    let bounded = transport_max.clamp(MIN_MAX_PACKET_SIZE, MAX_PACKET_SIZE);
18    bounded.try_into().expect("bounded by u16 max")
19}
20
/// The communication protocol that carries the OBEX transport.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum TransportType {
    /// The OBEX connection runs over L2CAP.
    L2cap,
    /// The OBEX connection runs over RFCOMM.
    Rfcomm,
}

impl TransportType {
    /// Returns true if the Single Response Mode (SRM) feature can be used with this transport
    /// type.
    pub fn srm_supported(&self) -> bool {
        match *self {
            // Nothing in the OBEX or GOEP specifications explicitly forbids SRM over RFCOMM.
            // However, the spec language and all qualification tests suggest that SRM is only
            // meant to be used on the L2CAP transport.
            Self::Rfcomm => false,
            // GOEP Section 7.1 allows SRM to be used with the L2CAP transport.
            Self::L2cap => true,
        }
    }
}
40
41/// Holds the underlying RFCOMM or L2CAP transport for an OBEX operation.
42#[derive(Debug)]
43pub struct ObexTransport<'a> {
44    /// A mutable reference to the permit given to the operation.
45    /// The L2CAP or RFCOMM connection to the remote peer.
46    channel: RefMut<'a, Channel>,
47    /// The type of transport used in the OBEX connection.
48    type_: TransportType,
49}
50
51impl<'a> ObexTransport<'a> {
52    pub fn new(channel: RefMut<'a, Channel>, type_: TransportType) -> Self {
53        Self { channel, type_ }
54    }
55
56    /// Returns true if this transport supports the Single Response Mode (SRM) feature.
57    pub fn srm_supported(&self) -> bool {
58        self.type_.srm_supported()
59    }
60
61    /// Encodes and sends the OBEX `data` to the remote peer.
62    /// Returns Error if the send operation could not be completed.
63    pub fn send(&self, data: impl Encodable<Error = PacketError>) -> Result<(), Error> {
64        let mut buf = vec![0; data.encoded_len()];
65        data.encode(&mut buf[..])?;
66        let _ = self.channel.write(&buf)?;
67        Ok(())
68    }
69
70    /// Attempts to receive and parse an OBEX response packet from the `channel`.
71    /// Returns the parsed packet on success, Error otherwise.
72    // TODO(https://fxbug.dev/42076096): Make this more generic to decode either request or response packets
73    // when OBEX Server functionality is needed.
74    pub async fn receive_response(&mut self, code: OpCode) -> Result<ResponsePacket, Error> {
75        if self.channel.is_terminated() {
76            return Err(Error::PeerDisconnected);
77        }
78
79        match self.channel.try_next().await? {
80            Some(raw_data) => {
81                let decoded = ResponsePacket::decode(&raw_data[..], code)?;
82                trace!("Received response: {decoded:?}");
83                Ok(decoded)
84            }
85            None => {
86                info!("OBEX transport closed");
87                Err(Error::PeerDisconnected)
88            }
89        }
90    }
91}
92
93/// Manages the transport connection (L2CAP/RFCOMM) to a remote peer.
94/// Provides a reservation system for acquiring the transport for an in-progress OBEX operation.
95#[derive(Debug)]
96pub struct ObexTransportManager {
97    /// Holds the underlying transport. The type of transport is indicated by the `type_` field.
98    /// There can only be one operation outstanding at any time. A mutable reference to the
99    /// `Channel` will be held by the `ObexTransport` during an ongoing operation and is
100    /// assigned using `ObexTransportManager::try_new_operation`. On operation termination (e.g.
101    /// `ObexTransport` is dropped), the `Channel` will be available for subsequent mutable access.
102    channel: RefCell<Channel>,
103    /// The transport type (L2CAP or RFCOMM) for the `channel`.
104    type_: TransportType,
105}
106
107impl ObexTransportManager {
108    pub fn new(channel: Channel, type_: TransportType) -> Self {
109        Self { channel: RefCell::new(channel), type_ }
110    }
111
112    fn new_permit(&self) -> Result<RefMut<'_, Channel>, Error> {
113        self.channel.try_borrow_mut().map_err(|_| Error::OperationInProgress)
114    }
115
116    pub fn is_transport_closed(&self) -> bool {
117        self.channel.try_borrow().map_or(false, |chan| chan.is_closed())
118    }
119
120    pub fn try_new_operation(&self) -> Result<ObexTransport<'_>, Error> {
121        // Only one operation can be outstanding at a time.
122        let channel = self.new_permit()?;
123        Ok(ObexTransport::new(channel, self.type_))
124    }
125}
126
#[cfg(test)]
pub(crate) mod test_utils {
    use super::*;

    use async_test_helpers::expect_stream_item;
    use fuchsia_async as fasync;
    use packet_encoding::Decodable;

    use crate::operation::RequestPacket;

    /// Builds an `ObexTransportManager` around the local end of a newly created `Channel` pair
    /// and returns it together with the remote end.
    /// Set `srm_supported` to true to build a transport that supports the OBEX SRM feature
    /// (L2CAP); otherwise an RFCOMM transport is built.
    pub(crate) fn new_manager(srm_supported: bool) -> (ObexTransportManager, Channel) {
        let (local, remote) = Channel::create();
        let type_ = if srm_supported { TransportType::L2cap } else { TransportType::Rfcomm };
        let manager = ObexTransportManager::new(local, type_);
        (manager, remote)
    }

    /// A minimal packet made of a single byte, used to exercise the transport in tests.
    #[derive(Clone)]
    pub struct TestPacket(pub u8);

    impl Encodable for TestPacket {
        type Error = PacketError;

        /// The encoded packet is always exactly one byte long.
        fn encoded_len(&self) -> usize {
            1
        }

        /// Writes the packet's byte to the front of `buf`. Panics if `buf` is empty.
        fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error> {
            buf[0] = self.0;
            Ok(())
        }
    }

    impl Decodable for TestPacket {
        type Error = PacketError;

        /// Builds a packet from the first byte of `buf`. Panics if `buf` is empty.
        fn decode(buf: &[u8]) -> Result<Self, Self::Error> {
            Ok(TestPacket(buf[0]))
        }
    }

    /// Encodes the `response` and writes it to the `channel`.
    /// Panics if encoding or writing fails.
    #[track_caller]
    pub fn reply(channel: &mut Channel, response: ResponsePacket) {
        let mut response_buf = vec![0; response.encoded_len()];
        response.encode(&mut response_buf[..]).expect("can encode response");
        let _ = channel.write(&response_buf[..]).expect("write to channel success");
    }

    /// Sends the `packet` over the provided `channel`.
    /// Panics if encoding or writing fails.
    #[track_caller]
    pub fn send_packet<T>(channel: &mut Channel, packet: T)
    where
        T: Encodable,
        <T as Encodable>::Error: std::fmt::Debug,
    {
        let mut buf = vec![0; packet.encoded_len()];
        packet.encode(&mut buf[..]).expect("can encode packet");
        let _ = channel.write(&buf[..]).expect("write to channel success");
    }

    /// Expects a request packet on the `channel` and validates the decoded contents with the
    /// provided `expectation`.
    /// Panics if the stream item is an error or cannot be decoded as a `RequestPacket`.
    #[track_caller]
    pub fn expect_request<F>(exec: &mut fasync::TestExecutor, channel: &mut Channel, expectation: F)
    where
        F: FnOnce(RequestPacket),
    {
        let request_raw = expect_stream_item(exec, channel).expect("request");
        let request = RequestPacket::decode(&request_raw[..]).expect("can decode request");
        expectation(request);
    }

    /// Expects a response packet for the operation `opcode` on the `channel` and validates the
    /// decoded contents with the provided `expectation`.
    /// Panics if the stream item is an error or cannot be decoded as a `ResponsePacket`.
    #[track_caller]
    pub fn expect_response<F>(
        exec: &mut fasync::TestExecutor,
        channel: &mut Channel,
        expectation: F,
        opcode: OpCode,
    ) where
        F: FnOnce(ResponsePacket),
    {
        // The panic messages name the response (not a request) so that a failure here can be
        // told apart from a failure in `expect_request`.
        let response_raw = expect_stream_item(exec, channel).expect("response");
        let response =
            ResponsePacket::decode(&response_raw[..], opcode).expect("can decode response");
        expectation(response);
    }

    /// Expects a request packet on the `channel` and validates the contents with the provided
    /// `expectation`. Sends a `response` back on the channel.
    #[track_caller]
    pub fn expect_request_and_reply<F>(
        exec: &mut fasync::TestExecutor,
        channel: &mut Channel,
        expectation: F,
        response: ResponsePacket,
    ) where
        F: FnOnce(RequestPacket),
    {
        expect_request(exec, channel, expectation);
        reply(channel, response)
    }

    /// Returns an expectation that asserts the request's opcode is equal to `code`.
    pub fn expect_code(code: OpCode) -> impl FnOnce(RequestPacket) {
        move |request: RequestPacket| {
            assert_eq!(*request.code(), code);
        }
    }
}
231
#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;

    use async_utils::PollExt;
    use fuchsia_async as fasync;
    use std::pin::pin;

    use crate::header::HeaderSet;
    use crate::operation::{RequestPacket, ResponseCode};
    use crate::transport::test_utils::{
        expect_code, expect_request_and_reply, new_manager, TestPacket,
    };

    /// Only one operation may hold the transport at a time; dropping it frees the transport.
    #[fuchsia::test]
    fn transport_manager_new_operation() {
        let _exec = fasync::TestExecutor::new();
        let (manager, _remote) = new_manager(/* srm_supported */ false);

        // With no operation outstanding, a permit can be acquired.
        assert_matches!(manager.new_permit(), Ok(_));

        // The first operation can be started, and blocks any other from starting.
        let first_operation = manager.try_new_operation().expect("can start operation");
        assert_matches!(manager.try_new_operation(), Err(Error::OperationInProgress));

        // Finishing the first operation lets the next one claim the transport and use it.
        drop(first_operation);
        let second_operation = manager.try_new_operation().expect("can start another operation");
        second_operation
            .send(RequestPacket::new_connect(100, HeaderSet::new()))
            .expect("can send request");
    }

    /// A request sent on the transport reaches the peer, and the peer's reply can be received.
    #[fuchsia::test]
    fn send_and_receive() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(/* srm_supported */ false);
        let mut transport = manager.try_new_operation().expect("can start operation");

        // The local side issues a request.
        transport
            .send(RequestPacket::new_connect(100, HeaderSet::new()))
            .expect("can send request");

        // The remote side should see the request and answers with an example response.
        let peer_response =
            ResponsePacket::new(ResponseCode::Ok, vec![0x10, 0x00, 0x00, 0xff], HeaderSet::new());
        let request_check = expect_code(OpCode::Connect);
        expect_request_and_reply(&mut exec, &mut remote, request_check, peer_response);

        // The response should then be available on the ObexTransport.
        let mut receive_fut = pin!(transport.receive_response(OpCode::Connect));
        let poll_result = exec.run_until_stalled(&mut receive_fut);
        let received_response =
            poll_result.expect("stream item from response").expect("valid response");
        assert_eq!(*received_response.code(), ResponseCode::Ok);
    }

    /// Sending after the remote end has gone away fails, and keeps failing.
    #[fuchsia::test]
    async fn send_while_channel_closed_is_error() {
        let (manager, remote) = new_manager(/* srm_supported */ false);
        let transport = manager.try_new_operation().expect("can start operation");
        drop(remote);

        let data = TestPacket(10);
        // The first attempt is an Error, and trying again is still an Error.
        for _attempt in 0..2 {
            let send_result = transport.send(data.clone());
            assert_matches!(send_result, Err(Error::IOError(_)));
        }
    }

    /// The transport is only reported as closed once no operation is holding it.
    #[fuchsia::test]
    async fn is_transport_closed() {
        let (manager, remote) = new_manager(/* srm_supported */ false);
        assert!(!manager.is_transport_closed());

        let transport = manager.try_new_operation().expect("can start operation");
        assert!(!manager.is_transport_closed());

        // Dropping the remote end is not enough: while an operation is ongoing the
        // transport is still deemed active.
        drop(remote);
        assert!(!manager.is_transport_closed());

        // Once the operation's transport is released, the transport is finally
        // considered fully closed.
        drop(transport);
        assert!(manager.is_transport_closed());
    }

    /// Receiving after the remote end has gone away reports a disconnect, repeatedly.
    #[fuchsia::test]
    async fn receive_while_channel_closed_is_error() {
        let (manager, remote) = new_manager(/* srm_supported */ false);
        let mut transport = manager.try_new_operation().expect("can start operation");
        drop(remote);

        // The first attempt is an Error; trying again is handled gracefully - still an Error.
        for _attempt in 0..2 {
            let receive_result = transport.receive_response(OpCode::Connect).await;
            assert_matches!(receive_result, Err(Error::PeerDisconnected));
        }
    }
}