1use fuchsia_bluetooth::types::Channel;
6use futures::stream::{FusedStream, TryStreamExt};
7use log::{info, trace};
8use packet_encoding::Encodable;
9use std::cell::{RefCell, RefMut};
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13use crate::error::{Error, PacketError};
14use crate::operation::{MAX_PACKET_SIZE, MIN_MAX_PACKET_SIZE, OpCode, ResponsePacket};
15
16pub fn max_packet_size_from_transport(transport_max: usize) -> u16 {
19 let bounded = transport_max.clamp(MIN_MAX_PACKET_SIZE, MAX_PACKET_SIZE);
20 bounded.try_into().expect("bounded by u16 max")
21}
22
23#[derive(Copy, Clone, Debug, PartialEq)]
25pub enum TransportType {
26 L2cap,
27 Rfcomm,
28}
29
30impl TransportType {
31 pub fn srm_supported(&self) -> bool {
32 match &self {
33 Self::L2cap => true,
35 Self::Rfcomm => false,
39 }
40 }
41}
42
43#[derive(Debug)]
45pub struct ObexTransport<'a> {
46 channel: RefMut<'a, Channel>,
49 type_: TransportType,
51}
52
53impl<'a> ObexTransport<'a> {
54 pub fn new(channel: RefMut<'a, Channel>, type_: TransportType) -> Self {
55 Self { channel, type_ }
56 }
57
58 pub fn srm_supported(&self) -> bool {
60 self.type_.srm_supported()
61 }
62
63 pub async fn receive_response(&mut self, code: OpCode) -> Result<ResponsePacket, Error> {
68 if self.channel.is_terminated() {
69 return Err(Error::PeerDisconnected);
70 }
71
72 match self.channel.try_next().await? {
73 Some(raw_data) => {
74 let decoded = ResponsePacket::decode(&raw_data[..], code)?;
75 trace!("Received response: {decoded:?}");
76 Ok(decoded)
77 }
78 None => {
79 info!("OBEX transport closed");
80 Err(Error::PeerDisconnected)
81 }
82 }
83 }
84}
85
86impl<'a, T> futures::sink::Sink<T> for ObexTransport<'a>
87where
88 T: Encodable<Error = PacketError>,
89{
90 type Error = Error;
91
92 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
93 let this = self.get_mut();
94 Pin::new(&mut *this.channel).poll_ready(cx).map_err(Into::into)
95 }
96
97 fn start_send(self: Pin<&mut Self>, data: T) -> Result<(), Self::Error> {
98 let mut buf = vec![0; data.encoded_len()];
99 data.encode(&mut buf[..])?;
100 let this = self.get_mut();
101 Pin::new(&mut *this.channel).start_send(buf).map_err(Into::into)
102 }
103
104 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
105 let this = self.get_mut();
106 Pin::new(&mut *this.channel).poll_flush(cx).map_err(Into::into)
107 }
108
109 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
110 let this = self.get_mut();
111 Pin::new(&mut *this.channel).poll_close(cx).map_err(Into::into)
112 }
113}
114
115#[derive(Debug)]
118pub struct ObexTransportManager {
119 channel: RefCell<Channel>,
125 type_: TransportType,
127}
128
129impl ObexTransportManager {
130 pub fn new(channel: Channel, type_: TransportType) -> Self {
131 Self { channel: RefCell::new(channel), type_ }
132 }
133
134 fn new_permit(&self) -> Result<RefMut<'_, Channel>, Error> {
135 self.channel.try_borrow_mut().map_err(|_| Error::OperationInProgress)
136 }
137
138 pub fn is_transport_closed(&self) -> bool {
139 self.channel.try_borrow().map_or(false, |chan| chan.is_closed())
140 }
141
142 pub fn try_new_operation(&self) -> Result<ObexTransport<'_>, Error> {
143 let channel = self.new_permit()?;
145 Ok(ObexTransport::new(channel, self.type_))
146 }
147}
148
149#[cfg(test)]
150pub(crate) mod test_utils {
151 use super::*;
152 use futures::SinkExt;
153
154 use async_test_helpers::expect_stream_item;
155 use fuchsia_async as fasync;
156 use packet_encoding::Decodable;
157
158 use crate::operation::RequestPacket;
159 use async_utils::PollExt;
160
161 pub(crate) fn new_manager(srm_supported: bool) -> (ObexTransportManager, Channel) {
163 let (local, remote) = Channel::create();
164 let type_ = if srm_supported { TransportType::L2cap } else { TransportType::Rfcomm };
165 let manager = ObexTransportManager::new(local, type_);
166 (manager, remote)
167 }
168
169 #[derive(Clone)]
170 pub struct TestPacket(pub u8);
171
172 impl Encodable for TestPacket {
173 type Error = PacketError;
174 fn encoded_len(&self) -> usize {
175 1
176 }
177 fn encode(&self, buf: &mut [u8]) -> Result<(), Self::Error> {
178 buf[0] = self.0;
179 Ok(())
180 }
181 }
182
183 impl Decodable for TestPacket {
184 type Error = PacketError;
185 fn decode(buf: &[u8]) -> Result<Self, Self::Error> {
186 Ok(TestPacket(buf[0]))
187 }
188 }
189
190 #[track_caller]
191 pub fn reply(exec: &mut fasync::TestExecutor, channel: &mut Channel, response: ResponsePacket) {
192 let mut response_buf = vec![0; response.encoded_len()];
193 response.encode(&mut response_buf[..]).expect("can encode response");
194 let mut fut = channel.send(response_buf.to_vec());
195 exec.run_until_stalled(&mut fut).expect("write to channel success").expect("write success");
196 }
197
198 #[track_caller]
199 pub fn send_packet<T>(exec: &mut fasync::TestExecutor, channel: &mut Channel, packet: T)
200 where
201 T: Encodable,
202 <T as Encodable>::Error: std::fmt::Debug,
203 {
204 let mut buf = vec![0; packet.encoded_len()];
205 packet.encode(&mut buf[..]).expect("can encode packet");
206 let mut fut = channel.send(buf.to_vec());
207 exec.run_until_stalled(&mut fut).expect("write to channel success").expect("write success");
208 }
209
210 #[track_caller]
211 pub fn expect_request<F>(exec: &mut fasync::TestExecutor, channel: &mut Channel, expectation: F)
212 where
213 F: FnOnce(RequestPacket),
214 {
215 let request_raw = expect_stream_item(exec, channel).expect("request");
216 let request = RequestPacket::decode(&request_raw[..]).expect("can decode request");
217 expectation(request);
218 }
219
220 #[track_caller]
221 pub fn expect_response<F>(
222 exec: &mut fasync::TestExecutor,
223 channel: &mut Channel,
224 expectation: F,
225 opcode: OpCode,
226 ) where
227 F: FnOnce(ResponsePacket),
228 {
229 let request_raw = expect_stream_item(exec, channel).expect("request");
230 let request = ResponsePacket::decode(&request_raw[..], opcode).expect("can decode request");
231 expectation(request);
232 }
233
234 #[track_caller]
237 pub fn expect_request_and_reply<F>(
238 exec: &mut fasync::TestExecutor,
239 channel: &mut Channel,
240 expectation: F,
241 response: ResponsePacket,
242 ) where
243 F: FnOnce(RequestPacket),
244 {
245 expect_request(exec, channel, expectation);
246 reply(exec, channel, response)
247 }
248
249 pub fn expect_code(code: OpCode) -> impl FnOnce(RequestPacket) {
250 let f = move |request: RequestPacket| {
251 assert_eq!(*request.code(), code);
252 };
253 f
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use super::*;
260 use futures::SinkExt;
261
262 use assert_matches::assert_matches;
263
264 use async_utils::PollExt;
265 use fuchsia_async as fasync;
266 use std::pin::pin;
267
268 use crate::header::HeaderSet;
269 use crate::operation::{RequestPacket, ResponseCode};
270 use crate::transport::test_utils::{
271 TestPacket, expect_code, expect_request_and_reply, new_manager,
272 };
273
274 #[fuchsia::test]
275 fn transport_manager_new_operation() {
276 let mut exec = fasync::TestExecutor::new();
277 let (manager, _remote) = new_manager(false);
278
279 assert_matches!(manager.new_permit(), Ok(_));
281
282 let transport1 = manager.try_new_operation().expect("can start operation");
284 assert_matches!(manager.try_new_operation(), Err(Error::OperationInProgress));
286
287 drop(transport1);
289 let mut transport2 = manager.try_new_operation().expect("can start another operation");
290 let request = RequestPacket::new_connect(100, HeaderSet::new());
291 let mut send_fut = pin!(transport2.send(request));
292 exec.run_until_stalled(&mut send_fut)
293 .expect("send result ready")
294 .expect("can send request");
295 }
296
297 #[fuchsia::test]
298 fn send_and_receive() {
299 let mut exec = fasync::TestExecutor::new();
300 let (manager, mut remote) = new_manager(false);
301 let mut transport = manager.try_new_operation().expect("can start operation");
302
303 let request = RequestPacket::new_connect(100, HeaderSet::new());
305 {
306 let mut send_fut = pin!(transport.send(request));
307 exec.run_until_stalled(&mut send_fut)
308 .expect("send result ready")
309 .expect("can send request");
310 }
311 let peer_response =
313 ResponsePacket::new(ResponseCode::Ok, vec![0x10, 0x00, 0x00, 0xff], HeaderSet::new());
314 expect_request_and_reply(
315 &mut exec,
316 &mut remote,
317 expect_code(OpCode::Connect),
318 peer_response,
319 );
320 let receive_fut = transport.receive_response(OpCode::Connect);
322 let mut receive_fut = pin!(receive_fut);
323 let received_response = exec
324 .run_until_stalled(&mut receive_fut)
325 .expect("stream item from response")
326 .expect("valid response");
327 assert_eq!(*received_response.code(), ResponseCode::Ok);
328 }
329
330 #[fuchsia::test]
331 async fn send_while_channel_closed_is_error() {
332 let (manager, remote) = new_manager(false);
333 let mut transport = manager.try_new_operation().expect("can start operation");
334 drop(remote);
335
336 let data = TestPacket(10);
337 let send_result = transport.send(data.clone()).await;
338 assert_matches!(send_result, Err(Error::IOError(_)));
339 let send_result = transport.send(data.clone()).await;
341 assert_matches!(send_result, Err(Error::IOError(_)));
342 }
343
344 #[fuchsia::test]
345 async fn is_transport_closed() {
346 let (manager, remote) = new_manager(false);
347 assert!(!manager.is_transport_closed());
348
349 {
350 let _transport = manager.try_new_operation().expect("can start operation");
351 assert!(!manager.is_transport_closed());
352
353 drop(remote);
356 assert!(!manager.is_transport_closed());
357 }
358
359 assert!(manager.is_transport_closed());
362 }
363
364 #[fuchsia::test]
365 async fn receive_while_channel_closed_is_error() {
366 let (manager, remote) = new_manager(false);
367 let mut transport = manager.try_new_operation().expect("can start operation");
368 drop(remote);
369
370 let receive_result = transport.receive_response(OpCode::Connect).await;
371 assert_matches!(receive_result, Err(Error::PeerDisconnected));
372 let receive_result = transport.receive_response(OpCode::Connect).await;
374 assert_matches!(receive_result, Err(Error::PeerDisconnected));
375 }
376}