1use fuchsia_bluetooth::types::Channel;
6use futures::stream::{FusedStream, TryStreamExt};
7use log::{info, trace};
8use packet_encoding::Encodable;
9use std::cell::{RefCell, RefMut};
10
11use crate::error::{Error, PacketError};
12use crate::operation::{OpCode, ResponsePacket, MAX_PACKET_SIZE, MIN_MAX_PACKET_SIZE};
13
14pub fn max_packet_size_from_transport(transport_max: usize) -> u16 {
17 let bounded = transport_max.clamp(MIN_MAX_PACKET_SIZE, MAX_PACKET_SIZE);
18 bounded.try_into().expect("bounded by u16 max")
19}
20
/// The kind of transport that carries the OBEX connection.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum TransportType {
    /// The connection runs over L2CAP.
    L2cap,
    /// The connection runs over RFCOMM.
    Rfcomm,
}

impl TransportType {
    /// Returns `true` if SRM (presumably OBEX Single Response Mode) may be used on this
    /// transport.
    ///
    /// Only `L2cap` reports support; `Rfcomm` does not.
    pub fn srm_supported(&self) -> bool {
        // Deliberately exhaustive: a newly added variant must make an explicit choice here.
        match *self {
            Self::L2cap => true,
            Self::Rfcomm => false,
        }
    }
}
40
41#[derive(Debug)]
43pub struct ObexTransport<'a> {
44 channel: RefMut<'a, Channel>,
47 type_: TransportType,
49}
50
51impl<'a> ObexTransport<'a> {
52 pub fn new(channel: RefMut<'a, Channel>, type_: TransportType) -> Self {
53 Self { channel, type_ }
54 }
55
56 pub fn srm_supported(&self) -> bool {
58 self.type_.srm_supported()
59 }
60
61 pub fn send(&self, data: impl Encodable<Error = PacketError>) -> Result<(), Error> {
64 let mut buf = vec![0; data.encoded_len()];
65 data.encode(&mut buf[..])?;
66 let _ = self.channel.write(&buf)?;
67 Ok(())
68 }
69
70 pub async fn receive_response(&mut self, code: OpCode) -> Result<ResponsePacket, Error> {
75 if self.channel.is_terminated() {
76 return Err(Error::PeerDisconnected);
77 }
78
79 match self.channel.try_next().await? {
80 Some(raw_data) => {
81 let decoded = ResponsePacket::decode(&raw_data[..], code)?;
82 trace!("Received response: {decoded:?}");
83 Ok(decoded)
84 }
85 None => {
86 info!("OBEX transport closed");
87 Err(Error::PeerDisconnected)
88 }
89 }
90 }
91}
92
93#[derive(Debug)]
96pub struct ObexTransportManager {
97 channel: RefCell<Channel>,
103 type_: TransportType,
105}
106
107impl ObexTransportManager {
108 pub fn new(channel: Channel, type_: TransportType) -> Self {
109 Self { channel: RefCell::new(channel), type_ }
110 }
111
112 fn new_permit(&self) -> Result<RefMut<'_, Channel>, Error> {
113 self.channel.try_borrow_mut().map_err(|_| Error::OperationInProgress)
114 }
115
116 pub fn is_transport_closed(&self) -> bool {
117 self.channel.try_borrow().map_or(false, |chan| chan.is_closed())
118 }
119
120 pub fn try_new_operation(&self) -> Result<ObexTransport<'_>, Error> {
121 let channel = self.new_permit()?;
123 Ok(ObexTransport::new(channel, self.type_))
124 }
125}
126
127#[cfg(test)]
128pub(crate) mod test_utils {
129 use super::*;
130
131 use async_test_helpers::expect_stream_item;
132 use fuchsia_async as fasync;
133 use packet_encoding::Decodable;
134
135 use crate::operation::RequestPacket;
136
137 pub(crate) fn new_manager(srm_supported: bool) -> (ObexTransportManager, Channel) {
139 let (local, remote) = Channel::create();
140 let type_ = if srm_supported { TransportType::L2cap } else { TransportType::Rfcomm };
141 let manager = ObexTransportManager::new(local, type_);
142 (manager, remote)
143 }
144
145 #[derive(Clone)]
146 pub struct TestPacket(pub u8);
147
148 impl Encodable for TestPacket {
149 type Error = PacketError;
150 fn encoded_len(&self) -> usize {
151 1
152 }
153 fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error> {
154 buf[0] = self.0;
155 Ok(())
156 }
157 }
158
159 impl Decodable for TestPacket {
160 type Error = PacketError;
161 fn decode(buf: &[u8]) -> Result<Self, Self::Error> {
162 Ok(TestPacket(buf[0]))
163 }
164 }
165
166 #[track_caller]
167 pub fn reply(channel: &mut Channel, response: ResponsePacket) {
168 let mut response_buf = vec![0; response.encoded_len()];
169 response.encode(&mut response_buf[..]).expect("can encode response");
170 let _ = channel.write(&response_buf[..]).expect("write to channel success");
171 }
172
173 #[track_caller]
175 pub fn send_packet<T>(channel: &mut Channel, packet: T)
176 where
177 T: Encodable,
178 <T as Encodable>::Error: std::fmt::Debug,
179 {
180 let mut buf = vec![0; packet.encoded_len()];
181 packet.encode(&mut buf[..]).expect("can encode packet");
182 let _ = channel.write(&buf[..]).expect("write to channel success");
183 }
184
185 #[track_caller]
186 pub fn expect_request<F>(exec: &mut fasync::TestExecutor, channel: &mut Channel, expectation: F)
187 where
188 F: FnOnce(RequestPacket),
189 {
190 let request_raw = expect_stream_item(exec, channel).expect("request");
191 let request = RequestPacket::decode(&request_raw[..]).expect("can decode request");
192 expectation(request);
193 }
194
195 #[track_caller]
196 pub fn expect_response<F>(
197 exec: &mut fasync::TestExecutor,
198 channel: &mut Channel,
199 expectation: F,
200 opcode: OpCode,
201 ) where
202 F: FnOnce(ResponsePacket),
203 {
204 let request_raw = expect_stream_item(exec, channel).expect("request");
205 let request = ResponsePacket::decode(&request_raw[..], opcode).expect("can decode request");
206 expectation(request);
207 }
208
    /// Expects a request on `channel`, validates it with `expectation`, and then writes
    /// `response` back on the same channel.
    ///
    /// Panics under the same conditions as `expect_request` and `reply`.
    #[track_caller]
    pub fn expect_request_and_reply<F>(
        exec: &mut fasync::TestExecutor,
        channel: &mut Channel,
        expectation: F,
        response: ResponsePacket,
    ) where
        F: FnOnce(RequestPacket),
    {
        expect_request(exec, channel, expectation);
        reply(channel, response)
    }
223
224 pub fn expect_code(code: OpCode) -> impl FnOnce(RequestPacket) {
225 let f = move |request: RequestPacket| {
226 assert_eq!(*request.code(), code);
227 };
228 f
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235
236 use assert_matches::assert_matches;
237
238 use async_utils::PollExt;
239 use fuchsia_async as fasync;
240 use std::pin::pin;
241
242 use crate::header::HeaderSet;
243 use crate::operation::{RequestPacket, ResponseCode};
244 use crate::transport::test_utils::{
245 expect_code, expect_request_and_reply, new_manager, TestPacket,
246 };
247
    /// Verifies that the manager allows only one outstanding operation at a time and that
    /// dropping the operation frees the channel for the next one.
    #[fuchsia::test]
    fn transport_manager_new_operation() {
        let _exec = fasync::TestExecutor::new();
        let (manager, _remote) = new_manager(false);

        // The permit is a temporary that is dropped at the end of this statement, so it
        // does not block the operations started below.
        assert_matches!(manager.new_permit(), Ok(_));

        let transport1 = manager.try_new_operation().expect("can start operation");
        // `transport1` still holds the channel borrow, so a second operation is rejected.
        assert_matches!(manager.try_new_operation(), Err(Error::OperationInProgress));

        // Releasing the first operation makes the channel available again.
        drop(transport1);
        let transport2 = manager.try_new_operation().expect("can start another operation");
        let request = RequestPacket::new_connect(100, HeaderSet::new());
        transport2.send(request).expect("can send request");
    }
267
    /// Verifies a full round trip: a request is sent, the remote end observes it and
    /// replies, and the reply is received and decoded by the transport.
    #[fuchsia::test]
    fn send_and_receive() {
        let mut exec = fasync::TestExecutor::new();
        let (manager, mut remote) = new_manager(false);
        let mut transport = manager.try_new_operation().expect("can start operation");

        let request = RequestPacket::new_connect(100, HeaderSet::new());
        transport.send(request).expect("can send request");
        // NOTE(review): the raw bytes are presumably the CONNECT response data fields —
        // confirm against `ResponsePacket::new`.
        let peer_response =
            ResponsePacket::new(ResponseCode::Ok, vec![0x10, 0x00, 0x00, 0xff], HeaderSet::new());
        // The remote end must see a Connect request before it writes the response.
        expect_request_and_reply(
            &mut exec,
            &mut remote,
            expect_code(OpCode::Connect),
            peer_response,
        );
        let receive_fut = transport.receive_response(OpCode::Connect);
        let mut receive_fut = pin!(receive_fut);
        // The response was already written, so the future is expected to resolve without
        // stalling.
        let received_response = exec
            .run_until_stalled(&mut receive_fut)
            .expect("stream item from response")
            .expect("valid response");
        assert_eq!(*received_response.code(), ResponseCode::Ok);
    }
295
    /// Verifies that sending after the remote end has been dropped fails with an I/O
    /// error, and keeps failing on subsequent attempts.
    #[fuchsia::test]
    async fn send_while_channel_closed_is_error() {
        let (manager, remote) = new_manager(false);
        let transport = manager.try_new_operation().expect("can start operation");
        // Dropping the remote end closes the channel from the peer's side.
        drop(remote);

        let data = TestPacket(10);
        let send_result = transport.send(data.clone());
        assert_matches!(send_result, Err(Error::IOError(_)));
        // A repeated send must report the same error.
        let send_result = transport.send(data.clone());
        assert_matches!(send_result, Err(Error::IOError(_)));
    }
309
    /// Verifies `ObexTransportManager::is_transport_closed`, including that it reports
    /// `false` while an operation holds the channel borrow.
    #[fuchsia::test]
    async fn is_transport_closed() {
        let (manager, remote) = new_manager(false);
        assert!(!manager.is_transport_closed());

        {
            let _transport = manager.try_new_operation().expect("can start operation");
            assert!(!manager.is_transport_closed());

            drop(remote);
            // The remote end is gone, but the channel is still mutably borrowed by
            // `_transport`, so the manager cannot inspect it and reports `false`.
            assert!(!manager.is_transport_closed());
        }

        // With the operation dropped, the manager can observe the closed channel.
        assert!(manager.is_transport_closed());
    }
329
    /// Verifies that receiving after the remote end has been dropped fails with
    /// `Error::PeerDisconnected`, and keeps failing on subsequent attempts.
    #[fuchsia::test]
    async fn receive_while_channel_closed_is_error() {
        let (manager, remote) = new_manager(false);
        let mut transport = manager.try_new_operation().expect("can start operation");
        // Dropping the remote end closes the channel from the peer's side.
        drop(remote);

        let receive_result = transport.receive_response(OpCode::Connect).await;
        assert_matches!(receive_result, Err(Error::PeerDisconnected));
        // A repeated receive must report the same error.
        let receive_result = transport.receive_response(OpCode::Connect).await;
        assert_matches!(receive_result, Err(Error::PeerDisconnected));
    }
342}