1use fuchsia_bluetooth::types::Channel;
6use log::{trace, warn};
7
8pub use crate::client::get::GetOperation;
9pub use crate::client::put::PutOperation;
10use crate::error::Error;
11use crate::header::{
12 ConnectionIdentifier, Header, HeaderIdentifier, HeaderSet, SingleResponseMode,
13};
14use crate::operation::{OpCode, RequestPacket, ResponseCode, ResponsePacket, SetPathFlags};
15pub use crate::transport::TransportType;
16use crate::transport::{max_packet_size_from_transport, ObexTransportManager};
17use fuchsia_sync::Mutex;
18
19mod put;
21
22mod get;
24
25pub(crate) trait SrmOperation {
27 const OPERATION_TYPE: OpCode;
28
29 fn get_srm(&self) -> SingleResponseMode;
31
32 fn set_srm(&mut self, mode: SingleResponseMode);
34
35 fn try_enable_srm(&mut self, headers: &mut HeaderSet) -> Result<(), Error> {
39 let requested_srm = headers.try_add_srm(self.get_srm())?;
40 self.set_srm(requested_srm);
41 trace!(operation:? = Self::OPERATION_TYPE; "Requesting SRM {requested_srm:?}");
42 Ok(())
43 }
44
45 fn check_response_for_srm(&mut self, headers: &HeaderSet) {
48 let srm_response = if let Some(Header::SingleResponseMode(srm)) =
49 headers.get(&HeaderIdentifier::SingleResponseMode)
50 {
51 *srm
52 } else {
53 trace!(operation:? = Self::OPERATION_TYPE; "Response doesn't contain SRM header");
55 SingleResponseMode::Disable
56 };
57
58 trace!(current_status:? = self.get_srm(), operation:? = Self::OPERATION_TYPE; "Peer responded with {srm_response:?}");
59 match (srm_response, self.get_srm()) {
60 (SingleResponseMode::Enable, SingleResponseMode::Disable) => {
61 warn!("SRM stays disabled");
62 }
63 (SingleResponseMode::Disable, SingleResponseMode::Enable) => {
64 trace!("SRM is disabled");
65 self.set_srm(SingleResponseMode::Disable);
66 }
67 _ => {} }
69 trace!(status:? = self.get_srm(), operation:? = Self::OPERATION_TYPE; "SRM status");
70 }
71}
72
73#[derive(Clone, Copy, Debug, PartialEq, Default)]
74enum ConnectionStatus {
75 #[default]
77 Initialized,
78 Connected { id: Option<ConnectionIdentifier> },
82 Disconnected,
84}
85
86impl ConnectionStatus {
87 #[cfg(test)]
88 fn connected_no_id() -> Self {
89 Self::Connected { id: None }
90 }
91}
92
93#[derive(Debug)]
97pub struct ObexClient {
98 connected: Mutex<ConnectionStatus>,
100 max_packet_size: Mutex<u16>,
102 transport: ObexTransportManager,
105}
106
107impl ObexClient {
108 pub fn new(channel: Channel, type_: TransportType) -> Self {
109 let max_packet_size = max_packet_size_from_transport(channel.max_tx_size());
110 let transport = ObexTransportManager::new(channel, type_);
111 Self {
112 connected: Mutex::new(ConnectionStatus::default()),
113 max_packet_size: Mutex::new(max_packet_size),
114 transport,
115 }
116 }
117
118 pub fn is_transport_connected(&self) -> bool {
119 !self.transport.is_transport_closed()
120 }
121
122 fn set_connection_status(&self, status: ConnectionStatus) {
123 *self.connected.lock() = status;
124 }
125
126 fn connection_status(&self) -> ConnectionStatus {
127 *self.connected.lock()
128 }
129
130 pub fn is_connected(&self) -> bool {
131 matches!(*self.connected.lock(), ConnectionStatus::Connected { .. })
132 }
133
134 pub fn connection_id(&self) -> Option<ConnectionIdentifier> {
135 match self.connection_status() {
136 ConnectionStatus::Connected { id } => id.clone(),
137 _ => None,
138 }
139 }
140
141 fn set_max_packet_size(&self, peer_max_packet_size: u16) {
142 *self.max_packet_size.lock() = peer_max_packet_size;
144 trace!("Max packet size set to {peer_max_packet_size}");
145 }
146
147 fn max_packet_size(&self) -> u16 {
148 *self.max_packet_size.lock()
149 }
150
151 fn handle_connect_response(&self, response: ResponsePacket) -> Result<HeaderSet, Error> {
152 let request = OpCode::Connect;
153 let response = response.expect_code(request, ResponseCode::Ok)?;
154
155 if response.data().len() != request.response_data_length() {
158 return Err(Error::response(request, "Invalid CONNECT data"));
159 }
160 let peer_max_packet_size = u16::from_be_bytes(response.data()[2..4].try_into().unwrap());
161 self.set_max_packet_size(peer_max_packet_size);
162
163 let headers: HeaderSet = response.into();
166 if let Some(Header::ConnectionId(id)) = headers.get(&HeaderIdentifier::ConnectionId) {
167 trace!(id:? = id; "Found Connection Identifier in CONNECT response");
168 self.set_connection_status(ConnectionStatus::Connected { id: Some(*id) });
169 }
170 Ok(headers)
171 }
172
173 pub async fn connect(&self, headers: HeaderSet) -> Result<HeaderSet, Error> {
177 if self.is_connected() {
178 return Err(Error::operation(OpCode::Connect, "already connected"));
179 }
180
181 let response = {
182 let request = RequestPacket::new_connect(self.max_packet_size(), headers);
183 let mut transport = self.transport.try_new_operation()?;
184 trace!("Making outgoing CONNECT request: {request:?}");
185 transport.send(request)?;
186 trace!("Successfully made CONNECT request");
187 transport.receive_response(OpCode::Connect).await?
188 };
189
190 let response_headers = self.handle_connect_response(response)?;
191 Ok(response_headers)
192 }
193
194 pub async fn disconnect(self, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
199 let opcode = OpCode::Disconnect;
200 if !self.is_connected() {
201 return Err(Error::operation(opcode, "session not connected"));
202 }
203 headers.try_add_connection_id(&self.connection_id())?;
204 let response = {
205 let request = RequestPacket::new_disconnect(headers);
206 let mut transport = self.transport.try_new_operation()?;
207 trace!("Making outgoing DISCONNECT request: {request:?}");
208 transport.send(request)?;
209 trace!("Successfully made DISCONNECT request");
210 transport.receive_response(opcode).await?
211 };
212 self.set_connection_status(ConnectionStatus::Disconnected);
213 response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
214 }
215
216 pub fn get(&self) -> Result<GetOperation<'_>, Error> {
219 if !self.is_connected() {
221 return Err(Error::operation(OpCode::Get, "session not connected"));
222 }
223
224 let mut headers = HeaderSet::new();
225 headers.try_add_connection_id(&self.connection_id())?;
226
227 let transport = self.transport.try_new_operation()?;
229 Ok(GetOperation::new(headers, transport))
230 }
231
232 pub fn put(&self) -> Result<PutOperation<'_>, Error> {
235 if !self.is_connected() {
237 return Err(Error::operation(OpCode::Put, "session not connected"));
238 }
239
240 let mut headers = HeaderSet::new();
241 headers.try_add_connection_id(&self.connection_id())?;
242
243 let transport = self.transport.try_new_operation()?;
245 Ok(PutOperation::new(headers, transport))
246 }
247
248 pub async fn set_path(
253 &self,
254 flags: SetPathFlags,
255 mut headers: HeaderSet,
256 ) -> Result<HeaderSet, Error> {
257 let opcode = OpCode::SetPath;
258 if !self.is_connected() {
260 return Err(Error::operation(opcode, "session not connected"));
261 }
262 headers.try_add_connection_id(&self.connection_id())?;
263 let request = RequestPacket::new_set_path(flags, headers)?;
264 let response = {
265 let mut transport = self.transport.try_new_operation()?;
266 trace!("Making outgoing SETPATH request: {request:?}");
267 transport.send(request)?;
268 trace!("Successfully made SETPATH request");
269 transport.receive_response(opcode).await?
270 };
271
272 if *response.code() == ResponseCode::BadRequest
275 || *response.code() == ResponseCode::Forbidden
276 {
277 return Err(Error::not_implemented(opcode));
278 }
279 response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286
287 use assert_matches::assert_matches;
288 use async_utils::PollExt;
289 use fuchsia_async as fasync;
290 use std::pin::pin;
291
292 use crate::transport::test_utils::{expect_code, expect_request_and_reply};
293
294 #[fuchsia::test]
295 fn max_packet_size_calculation() {
296 let transport_max = 1000;
298 assert_eq!(max_packet_size_from_transport(transport_max), 1000);
299
300 let transport_max_small = 40;
302 assert_eq!(max_packet_size_from_transport(transport_max_small), 255);
303
304 let transport_max_large = 700000;
306 assert_eq!(max_packet_size_from_transport(transport_max_large), std::u16::MAX);
307 }
308
309 fn new_obex_client(connected: ConnectionStatus) -> (ObexClient, Channel) {
313 let (local, remote) = Channel::create();
314 let client = ObexClient::new(local, TransportType::Rfcomm);
315 client.set_connection_status(connected);
316 (client, remote)
317 }
318
319 #[fuchsia::test]
320 fn client_connect_success() {
321 let mut exec = fasync::TestExecutor::new();
322 let (client, mut remote) = new_obex_client(ConnectionStatus::default());
323
324 assert!(!client.is_connected());
325 assert_eq!(client.max_packet_size(), Channel::DEFAULT_MAX_TX as u16);
326 assert_eq!(client.connection_id(), None);
327
328 {
329 let connect_fut = client.connect(HeaderSet::new());
330 let mut connect_fut = pin!(connect_fut);
331 exec.run_until_stalled(&mut connect_fut).expect_pending("waiting for response");
332
333 let response_headers =
335 HeaderSet::from_headers(vec![Header::ConnectionId(ConnectionIdentifier(1))])
336 .unwrap();
337 let response = ResponsePacket::new(
338 ResponseCode::Ok,
339 vec![0x10, 0x00, 0xff, 0xff], response_headers.clone(),
341 );
342 expect_request_and_reply(
343 &mut exec,
344 &mut remote,
345 expect_code(OpCode::Connect),
346 response,
347 );
348
349 let connect_result = exec
350 .run_until_stalled(&mut connect_fut)
351 .expect("received response")
352 .expect("response is ok");
353 assert_eq!(connect_result, response_headers);
354 }
355
356 assert!(client.is_connected());
358 assert_eq!(client.max_packet_size(), 0xffff);
359 assert_eq!(client.connection_id(), Some(ConnectionIdentifier(1)));
360 }
361
362 #[fuchsia::test]
363 async fn multiple_connect_is_error() {
364 let (client, _remote) = new_obex_client(ConnectionStatus::connected_no_id());
365
366 let result = client.connect(HeaderSet::new()).await;
368 assert_matches!(result, Err(Error::OperationError { .. }));
369 }
370
371 #[fuchsia::test]
372 fn get_before_connect_is_error() {
373 let _exec = fasync::TestExecutor::new();
374 let (client, _remote) = new_obex_client(ConnectionStatus::default());
375
376 let get_result = client.get();
377 assert_matches!(get_result, Err(Error::OperationError { .. }));
378 }
379
380 #[fuchsia::test]
381 fn sequential_get_operations_is_ok() {
382 let _exec = fasync::TestExecutor::new();
383 let (client, _remote) = new_obex_client(ConnectionStatus::connected_no_id());
384
385 let _get_operation1 = client.get().expect("can initialize first get");
387
388 drop(_get_operation1);
390 let _get_operation2 = client.get().expect("can initialize second get");
391 }
392
393 #[fuchsia::test]
394 fn disconnect_success() {
395 let mut exec = fasync::TestExecutor::new();
396 let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
397
398 let headers = HeaderSet::from_header(Header::Description("finished".into()));
399 let disconnect_fut = client.disconnect(headers);
400 let mut disconnect_fut = pin!(disconnect_fut);
401 exec.run_until_stalled(&mut disconnect_fut).expect_pending("waiting for response");
402
403 let response_headers = HeaderSet::from_header(Header::Description("accepted".into()));
405 let response = ResponsePacket::new(ResponseCode::Ok, vec![], response_headers.clone());
406 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Disconnect), response);
407
408 let disconnect_result = exec
409 .run_until_stalled(&mut disconnect_fut)
410 .expect("received response")
411 .expect("response is ok");
412 assert_eq!(disconnect_result, response_headers);
413 }
414
415 #[fuchsia::test]
416 async fn disconnect_before_connect_error() {
417 let (client, _remote) = new_obex_client(ConnectionStatus::default());
418
419 let headers = HeaderSet::from_header(Header::Description("finished".into()));
420 let disconnect_result = client.disconnect(headers).await;
421 assert_matches!(disconnect_result, Err(Error::OperationError { .. }))
422 }
423
424 #[fuchsia::test]
425 fn disconnect_error_response_error() {
426 let mut exec = fasync::TestExecutor::new();
427 let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
428
429 let disconnect_fut = client.disconnect(HeaderSet::new());
430 let mut disconnect_fut = pin!(disconnect_fut);
431 exec.run_until_stalled(&mut disconnect_fut).expect_pending("waiting for response");
432
433 let response_headers = HeaderSet::from_header(Header::Description("accepted".into()));
436 let response = ResponsePacket::new(
437 ResponseCode::InternalServerError,
438 vec![],
439 response_headers.clone(),
440 );
441 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Disconnect), response);
442
443 let disconnect_result =
444 exec.run_until_stalled(&mut disconnect_fut).expect("received response");
445 assert_matches!(disconnect_result, Err(Error::PeerRejected { .. }));
446 }
447
448 #[fuchsia::test]
449 fn setpath_success() {
450 let mut exec = fasync::TestExecutor::new();
451 let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
452
453 let headers = HeaderSet::from_header(Header::name("myfolder"));
454 let setpath_fut = client.set_path(SetPathFlags::empty(), headers);
455 let mut setpath_fut = pin!(setpath_fut);
456 exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");
457
458 let response_headers =
460 HeaderSet::from_header(Header::Description("updated current folder".into()));
461 let response = ResponsePacket::new(ResponseCode::Ok, vec![], response_headers.clone());
462 let expectation = |request: RequestPacket| {
463 assert_eq!(*request.code(), OpCode::SetPath);
464 assert_eq!(request.data(), &[0, 0]);
465 };
466 expect_request_and_reply(&mut exec, &mut remote, expectation, response);
467
468 let setpath_result = exec
469 .run_until_stalled(&mut setpath_fut)
470 .expect("received response")
471 .expect("response is ok");
472 assert_eq!(setpath_result, response_headers);
473 }
474
475 #[fuchsia::test]
476 fn setpath_error_response_is_error() {
477 let mut exec = fasync::TestExecutor::new();
478 let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());
479
480 {
482 let setpath_fut = client.set_path(SetPathFlags::BACKUP, HeaderSet::new());
483 let mut setpath_fut = pin!(setpath_fut);
484 exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");
485
486 let response_headers =
488 HeaderSet::from_header(Header::Description("not implemented".into()));
489 let response = ResponsePacket::new(ResponseCode::BadRequest, vec![], response_headers);
490 expect_request_and_reply(
491 &mut exec,
492 &mut remote,
493 expect_code(OpCode::SetPath),
494 response,
495 );
496
497 let setpath_result =
498 exec.run_until_stalled(&mut setpath_fut).expect("received response");
499 assert_matches!(setpath_result, Err(Error::NotImplemented { .. }));
500 }
501
502 let headers = HeaderSet::from_header(Header::name("file"));
504 let setpath_fut = client.set_path(SetPathFlags::DONT_CREATE, headers);
505 let mut setpath_fut = pin!(setpath_fut);
506 exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");
507
508 let response_headers =
510 HeaderSet::from_header(Header::Description("not implemented".into()));
511 let response =
512 ResponsePacket::new(ResponseCode::InternalServerError, vec![], response_headers);
513 expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::SetPath), response);
514
515 let setpath_result = exec.run_until_stalled(&mut setpath_fut).expect("received response");
516 assert_matches!(setpath_result, Err(Error::PeerRejected { .. }));
517 }
518}