Skip to main content

bt_obex/
transport.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_bluetooth::types::Channel;
6use futures::stream::{FusedStream, TryStreamExt};
7use log::{info, trace};
8use packet_encoding::Encodable;
9use std::cell::{RefCell, RefMut};
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13use crate::error::{Error, PacketError};
14use crate::operation::{MAX_PACKET_SIZE, MIN_MAX_PACKET_SIZE, OpCode, ResponsePacket};
15
16/// Returns the maximum packet size that will be used for the OBEX session.
17/// `transport_max` is the maximum size that the underlying transport (e.g. L2CAP, RFCOMM) supports.
18pub fn max_packet_size_from_transport(transport_max: usize) -> u16 {
19    let bounded = transport_max.clamp(MIN_MAX_PACKET_SIZE, MAX_PACKET_SIZE);
20    bounded.try_into().expect("bounded by u16 max")
21}
22
23/// The underlying communication protocol used for the OBEX transport.
24#[derive(Copy, Clone, Debug, PartialEq)]
25pub enum TransportType {
26    L2cap,
27    Rfcomm,
28}
29
30impl TransportType {
31    pub fn srm_supported(&self) -> bool {
32        match &self {
33            // Per GOEP Section 7.1, SRM can be used with the L2CAP transport.
34            Self::L2cap => true,
35            // Neither the OBEX nor GOEP specifications explicitly state that SRM cannot be used
36            // with the RFCOMM transport. However, all qualification tests and spec language
37            // suggest that SRM is to be used only on the L2CAP transport.
38            Self::Rfcomm => false,
39        }
40    }
41}
42
43/// Holds the underlying RFCOMM or L2CAP transport for an OBEX operation.
44#[derive(Debug)]
45pub struct ObexTransport<'a> {
46    /// A mutable reference to the permit given to the operation.
47    /// The L2CAP or RFCOMM connection to the remote peer.
48    channel: RefMut<'a, Channel>,
49    /// The type of transport used in the OBEX connection.
50    type_: TransportType,
51}
52
53impl<'a> ObexTransport<'a> {
54    pub fn new(channel: RefMut<'a, Channel>, type_: TransportType) -> Self {
55        Self { channel, type_ }
56    }
57
58    /// Returns true if this transport supports the Single Response Mode (SRM) feature.
59    pub fn srm_supported(&self) -> bool {
60        self.type_.srm_supported()
61    }
62
63    /// Attempts to receive and parse an OBEX response packet from the `channel`.
64    /// Returns the parsed packet on success, Error otherwise.
65    // TODO(https://fxbug.dev/42076096): Make this more generic to decode either request or response packets
66    // when OBEX Server functionality is needed.
67    pub async fn receive_response(&mut self, code: OpCode) -> Result<ResponsePacket, Error> {
68        if self.channel.is_terminated() {
69            return Err(Error::PeerDisconnected);
70        }
71
72        match self.channel.try_next().await? {
73            Some(raw_data) => {
74                let decoded = ResponsePacket::decode(&raw_data[..], code)?;
75                trace!("Received response: {decoded:?}");
76                Ok(decoded)
77            }
78            None => {
79                info!("OBEX transport closed");
80                Err(Error::PeerDisconnected)
81            }
82        }
83    }
84}
85
86impl<'a, T> futures::sink::Sink<T> for ObexTransport<'a>
87where
88    T: Encodable<Error = PacketError>,
89{
90    type Error = Error;
91
92    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
93        let this = self.get_mut();
94        Pin::new(&mut *this.channel).poll_ready(cx).map_err(Into::into)
95    }
96
97    fn start_send(self: Pin<&mut Self>, data: T) -> Result<(), Self::Error> {
98        let mut buf = vec![0; data.encoded_len()];
99        data.encode(&mut buf[..])?;
100        let this = self.get_mut();
101        Pin::new(&mut *this.channel).start_send(buf).map_err(Into::into)
102    }
103
104    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
105        let this = self.get_mut();
106        Pin::new(&mut *this.channel).poll_flush(cx).map_err(Into::into)
107    }
108
109    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
110        let this = self.get_mut();
111        Pin::new(&mut *this.channel).poll_close(cx).map_err(Into::into)
112    }
113}
114
115/// Manages the transport connection (L2CAP/RFCOMM) to a remote peer.
116/// Provides a reservation system for acquiring the transport for an in-progress OBEX operation.
117#[derive(Debug)]
118pub struct ObexTransportManager {
119    /// Holds the underlying transport. The type of transport is indicated by the `type_` field.
120    /// There can only be one operation outstanding at any time. A mutable reference to the
121    /// `Channel` will be held by the `ObexTransport` during an ongoing operation and is
122    /// assigned using `ObexTransportManager::try_new_operation`. On operation termination (e.g.
123    /// `ObexTransport` is dropped), the `Channel` will be available for subsequent mutable access.
124    channel: RefCell<Channel>,
125    /// The transport type (L2CAP or RFCOMM) for the `channel`.
126    type_: TransportType,
127}
128
129impl ObexTransportManager {
130    pub fn new(channel: Channel, type_: TransportType) -> Self {
131        Self { channel: RefCell::new(channel), type_ }
132    }
133
134    fn new_permit(&self) -> Result<RefMut<'_, Channel>, Error> {
135        self.channel.try_borrow_mut().map_err(|_| Error::OperationInProgress)
136    }
137
138    pub fn is_transport_closed(&self) -> bool {
139        self.channel.try_borrow().map_or(false, |chan| chan.is_closed())
140    }
141
142    pub fn try_new_operation(&self) -> Result<ObexTransport<'_>, Error> {
143        // Only one operation can be outstanding at a time.
144        let channel = self.new_permit()?;
145        Ok(ObexTransport::new(channel, self.type_))
146    }
147}
148
149#[cfg(test)]
150pub(crate) mod test_utils {
151    use super::*;
152    use futures::SinkExt;
153
154    use async_test_helpers::expect_stream_item;
155    use fuchsia_async as fasync;
156    use packet_encoding::Decodable;
157
158    use crate::operation::RequestPacket;
159    use async_utils::PollExt;
160
161    /// Set `srm_supported` to true to build a transport that supports the OBEX SRM feature.
162    pub(crate) fn new_manager(srm_supported: bool) -> (ObexTransportManager, Channel) {
163        let (local, remote) = Channel::create();
164        let type_ = if srm_supported { TransportType::L2cap } else { TransportType::Rfcomm };
165        let manager = ObexTransportManager::new(local, type_);
166        (manager, remote)
167    }
168
169    #[derive(Clone)]
170    pub struct TestPacket(pub u8);
171
172    impl Encodable for TestPacket {
173        type Error = PacketError;
174        fn encoded_len(&self) -> usize {
175            1
176        }
177        fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error> {
178            buf[0] = self.0;
179            Ok(())
180        }
181    }
182
183    impl Decodable for TestPacket {
184        type Error = PacketError;
185        fn decode(buf: &[u8]) -> Result<Self, Self::Error> {
186            Ok(TestPacket(buf[0]))
187        }
188    }
189
190    #[track_caller]
191    pub fn reply(exec: &mut fasync::TestExecutor, channel: &mut Channel, response: ResponsePacket) {
192        let mut response_buf = vec![0; response.encoded_len()];
193        response.encode(&mut response_buf[..]).expect("can encode response");
194        let mut fut = channel.send(response_buf.to_vec());
195        exec.run_until_stalled(&mut fut).expect("write to channel success").expect("write success");
196    }
197
198    #[track_caller]
199    pub fn send_packet<T>(exec: &mut fasync::TestExecutor, channel: &mut Channel, packet: T)
200    where
201        T: Encodable,
202        <T as Encodable>::Error: std::fmt::Debug,
203    {
204        let mut buf = vec![0; packet.encoded_len()];
205        packet.encode(&mut buf[..]).expect("can encode packet");
206        let mut fut = channel.send(buf.to_vec());
207        exec.run_until_stalled(&mut fut).expect("write to channel success").expect("write success");
208    }
209
210    #[track_caller]
211    pub fn expect_request<F>(exec: &mut fasync::TestExecutor, channel: &mut Channel, expectation: F)
212    where
213        F: FnOnce(RequestPacket),
214    {
215        let request_raw = expect_stream_item(exec, channel).expect("request");
216        let request = RequestPacket::decode(&request_raw[..]).expect("can decode request");
217        expectation(request);
218    }
219
220    #[track_caller]
221    pub fn expect_response<F>(
222        exec: &mut fasync::TestExecutor,
223        channel: &mut Channel,
224        expectation: F,
225        opcode: OpCode,
226    ) where
227        F: FnOnce(ResponsePacket),
228    {
229        let request_raw = expect_stream_item(exec, channel).expect("request");
230        let request = ResponsePacket::decode(&request_raw[..], opcode).expect("can decode request");
231        expectation(request);
232    }
233
234    /// Expects a request packet on the `channel` and validates the contents with the provided
235    /// `expectation`. Sends a `response` back on the channel.
236    #[track_caller]
237    pub fn expect_request_and_reply<F>(
238        exec: &mut fasync::TestExecutor,
239        channel: &mut Channel,
240        expectation: F,
241        response: ResponsePacket,
242    ) where
243        F: FnOnce(RequestPacket),
244    {
245        expect_request(exec, channel, expectation);
246        reply(exec, channel, response)
247    }
248
249    pub fn expect_code(code: OpCode) -> impl FnOnce(RequestPacket) {
250        let f = move |request: RequestPacket| {
251            assert_eq!(*request.code(), code);
252        };
253        f
254    }
255}
256
257#[cfg(test)]
258mod tests {
259    use super::*;
260    use futures::SinkExt;
261
262    use assert_matches::assert_matches;
263
264    use async_utils::PollExt;
265    use fuchsia_async as fasync;
266    use std::pin::pin;
267
268    use crate::header::HeaderSet;
269    use crate::operation::{RequestPacket, ResponseCode};
270    use crate::transport::test_utils::{
271        TestPacket, expect_code, expect_request_and_reply, new_manager,
272    };
273
274    #[fuchsia::test]
275    fn transport_manager_new_operation() {
276        let mut exec = fasync::TestExecutor::new();
277        let (manager, _remote) = new_manager(/* srm_supported */ false);
278
279        // Nothing should be in progress.
280        assert_matches!(manager.new_permit(), Ok(_));
281
282        // Should be able to start a new operation.
283        let transport1 = manager.try_new_operation().expect("can start operation");
284        // Trying to start another should be an Error.
285        assert_matches!(manager.try_new_operation(), Err(Error::OperationInProgress));
286
287        // Once the first finishes, another can be claimed.
288        drop(transport1);
289        let mut transport2 = manager.try_new_operation().expect("can start another operation");
290        let request = RequestPacket::new_connect(100, HeaderSet::new());
291        let mut send_fut = pin!(transport2.send(request));
292        exec.run_until_stalled(&mut send_fut)
293            .expect("send result ready")
294            .expect("can send request");
295    }
296
297    #[fuchsia::test]
298    fn send_and_receive() {
299        let mut exec = fasync::TestExecutor::new();
300        let (manager, mut remote) = new_manager(/* srm_supported */ false);
301        let mut transport = manager.try_new_operation().expect("can start operation");
302
303        // Local makes a request
304        let request = RequestPacket::new_connect(100, HeaderSet::new());
305        {
306            let mut send_fut = pin!(transport.send(request));
307            exec.run_until_stalled(&mut send_fut)
308                .expect("send result ready")
309                .expect("can send request");
310        }
311        // Remote end should receive it - send an example response back.
312        let peer_response =
313            ResponsePacket::new(ResponseCode::Ok, vec![0x10, 0x00, 0x00, 0xff], HeaderSet::new());
314        expect_request_and_reply(
315            &mut exec,
316            &mut remote,
317            expect_code(OpCode::Connect),
318            peer_response,
319        );
320        // Expect it on the ObexTransport
321        let receive_fut = transport.receive_response(OpCode::Connect);
322        let mut receive_fut = pin!(receive_fut);
323        let received_response = exec
324            .run_until_stalled(&mut receive_fut)
325            .expect("stream item from response")
326            .expect("valid response");
327        assert_eq!(*received_response.code(), ResponseCode::Ok);
328    }
329
330    #[fuchsia::test]
331    async fn send_while_channel_closed_is_error() {
332        let (manager, remote) = new_manager(/* srm_supported */ false);
333        let mut transport = manager.try_new_operation().expect("can start operation");
334        drop(remote);
335
336        let data = TestPacket(10);
337        let send_result = transport.send(data.clone()).await;
338        assert_matches!(send_result, Err(Error::IOError(_)));
339        // Trying again is still an Error.
340        let send_result = transport.send(data.clone()).await;
341        assert_matches!(send_result, Err(Error::IOError(_)));
342    }
343
344    #[fuchsia::test]
345    async fn is_transport_closed() {
346        let (manager, remote) = new_manager(/* srm_supported */ false);
347        assert!(!manager.is_transport_closed());
348
349        {
350            let _transport = manager.try_new_operation().expect("can start operation");
351            assert!(!manager.is_transport_closed());
352
353            // Even when the remote end is dropped, transport is deemed
354            // as active since there is currently an ongoing operation.
355            drop(remote);
356            assert!(!manager.is_transport_closed());
357        }
358
359        // When transport goes out of scope, finally transport is
360        // considered fully closed.
361        assert!(manager.is_transport_closed());
362    }
363
364    #[fuchsia::test]
365    async fn receive_while_channel_closed_is_error() {
366        let (manager, remote) = new_manager(/* srm_supported */ false);
367        let mut transport = manager.try_new_operation().expect("can start operation");
368        drop(remote);
369
370        let receive_result = transport.receive_response(OpCode::Connect).await;
371        assert_matches!(receive_result, Err(Error::PeerDisconnected));
372        // Trying again is handled gracefully - still an Error.
373        let receive_result = transport.receive_response(OpCode::Connect).await;
374        assert_matches!(receive_result, Err(Error::PeerDisconnected));
375    }
376}