1use fuchsia_bluetooth::types::Channel;
6use futures::SinkExt;
7use log::{trace, warn};
8
9pub use crate::client::get::GetOperation;
10pub use crate::client::put::PutOperation;
11use crate::error::Error;
12use crate::header::{
13 ConnectionIdentifier, Header, HeaderIdentifier, HeaderSet, SingleResponseMode,
14};
15use crate::operation::{OpCode, RequestPacket, ResponseCode, ResponsePacket, SetPathFlags};
16pub use crate::transport::TransportType;
17use crate::transport::{ObexTransportManager, max_packet_size_from_transport};
18use fuchsia_sync::Mutex;
19
20mod put;
22
23mod get;
25
26pub(crate) trait SrmOperation {
28 const OPERATION_TYPE: OpCode;
29
30 fn get_srm(&self) -> SingleResponseMode;
32
33 fn set_srm(&mut self, mode: SingleResponseMode);
35
36 fn try_enable_srm(&mut self, headers: &mut HeaderSet) -> Result<(), Error> {
40 let requested_srm = headers.try_add_srm(self.get_srm())?;
41 self.set_srm(requested_srm);
42 trace!(operation:? = Self::OPERATION_TYPE; "Requesting SRM {requested_srm:?}");
43 Ok(())
44 }
45
46 fn check_response_for_srm(&mut self, headers: &HeaderSet) {
49 let srm_response = if let Some(Header::SingleResponseMode(srm)) =
50 headers.get(&HeaderIdentifier::SingleResponseMode)
51 {
52 *srm
53 } else {
54 trace!(operation:? = Self::OPERATION_TYPE; "Response doesn't contain SRM header");
56 SingleResponseMode::Disable
57 };
58
59 trace!(current_status:? = self.get_srm(), operation:? = Self::OPERATION_TYPE; "Peer responded with {srm_response:?}");
60 match (srm_response, self.get_srm()) {
61 (SingleResponseMode::Enable, SingleResponseMode::Disable) => {
62 warn!("SRM stays disabled");
63 }
64 (SingleResponseMode::Disable, SingleResponseMode::Enable) => {
65 trace!("SRM is disabled");
66 self.set_srm(SingleResponseMode::Disable);
67 }
68 _ => {} }
70 trace!(status:? = self.get_srm(), operation:? = Self::OPERATION_TYPE; "SRM status");
71 }
72}
73
/// The state of the OBEX session tracked by an `ObexClient`.
#[derive(Clone, Copy, Debug, PartialEq, Default)]
enum ConnectionStatus {
    /// Starting state of a freshly constructed client (this is the `Default` value).
    #[default]
    Initialized,
    /// The session is connected; this is the only state for which `is_connected()` is true.
    /// `id` holds the Connection Identifier associated with the session when one is known.
    Connected { id: Option<ConnectionIdentifier> },
    /// Recorded by `ObexClient::disconnect` once a response to the DISCONNECT request arrives.
    Disconnected,
}
86
87impl ConnectionStatus {
88 #[cfg(test)]
89 fn connected_no_id() -> Self {
90 Self::Connected { id: None }
91 }
92}
93
/// Client-side handle for an OBEX session running over a single `Channel`.
#[derive(Debug)]
pub struct ObexClient {
    /// Current session status; starts as the `ConnectionStatus` default.
    connected: Mutex<ConnectionStatus>,
    /// Max OBEX packet size. Seeded from the channel's max TX size at construction and replaced
    /// by the value the peer reports in its CONNECT response.
    max_packet_size: Mutex<u16>,
    /// Manager wrapping the underlying channel; every operation obtains its transport handle
    /// from it via `try_new_operation`.
    transport: ObexTransportManager,
}
107
108impl ObexClient {
109 pub fn new(channel: Channel, type_: TransportType) -> Self {
110 let max_packet_size = max_packet_size_from_transport(channel.max_tx_size());
111 let transport = ObexTransportManager::new(channel, type_);
112 Self {
113 connected: Mutex::new(ConnectionStatus::default()),
114 max_packet_size: Mutex::new(max_packet_size),
115 transport,
116 }
117 }
118
119 pub fn is_transport_connected(&self) -> bool {
120 !self.transport.is_transport_closed()
121 }
122
123 fn set_connection_status(&self, status: ConnectionStatus) {
124 *self.connected.lock() = status;
125 }
126
127 fn connection_status(&self) -> ConnectionStatus {
128 *self.connected.lock()
129 }
130
131 pub fn is_connected(&self) -> bool {
132 matches!(*self.connected.lock(), ConnectionStatus::Connected { .. })
133 }
134
135 pub fn connection_id(&self) -> Option<ConnectionIdentifier> {
136 match self.connection_status() {
137 ConnectionStatus::Connected { id } => id.clone(),
138 _ => None,
139 }
140 }
141
142 fn set_max_packet_size(&self, peer_max_packet_size: u16) {
143 *self.max_packet_size.lock() = peer_max_packet_size;
145 trace!("Max packet size set to {peer_max_packet_size}");
146 }
147
148 fn max_packet_size(&self) -> u16 {
149 *self.max_packet_size.lock()
150 }
151
152 fn handle_connect_response(&self, response: ResponsePacket) -> Result<HeaderSet, Error> {
153 let request = OpCode::Connect;
154 let response = response.expect_code(request, ResponseCode::Ok)?;
155
156 if response.data().len() != request.response_data_length() {
159 return Err(Error::response(request, "Invalid CONNECT data"));
160 }
161 let peer_max_packet_size = u16::from_be_bytes(response.data()[2..4].try_into().unwrap());
162 self.set_max_packet_size(peer_max_packet_size);
163
164 let headers: HeaderSet = response.into();
167 if let Some(Header::ConnectionId(id)) = headers.get(&HeaderIdentifier::ConnectionId) {
168 trace!(id:? = id; "Found Connection Identifier in CONNECT response");
169 self.set_connection_status(ConnectionStatus::Connected { id: Some(*id) });
170 }
171 Ok(headers)
172 }
173
174 pub async fn connect(&self, headers: HeaderSet) -> Result<HeaderSet, Error> {
178 if self.is_connected() {
179 return Err(Error::operation(OpCode::Connect, "already connected"));
180 }
181
182 let response = {
183 let request = RequestPacket::new_connect(self.max_packet_size(), headers);
184 let mut transport = self.transport.try_new_operation()?;
185 transport.send(request).await?;
186 trace!("Successfully made CONNECT request");
187 transport.receive_response(OpCode::Connect).await?
188 };
189
190 let response_headers = self.handle_connect_response(response)?;
191 Ok(response_headers)
192 }
193
194 pub async fn disconnect(self, mut headers: HeaderSet) -> Result<HeaderSet, Error> {
199 let opcode = OpCode::Disconnect;
200 if !self.is_connected() {
201 return Err(Error::operation(opcode, "session not connected"));
202 }
203 headers.try_add_connection_id(&self.connection_id())?;
204 let response = {
205 let request = RequestPacket::new_disconnect(headers);
206 let mut transport = self.transport.try_new_operation()?;
207 trace!("Making outgoing DISCONNECT request: {request:?}");
208 transport.send(request).await?;
209 trace!("Successfully made DISCONNECT request");
210 transport.receive_response(opcode).await?
211 };
212 self.set_connection_status(ConnectionStatus::Disconnected);
213 response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
214 }
215
216 pub fn get(&self) -> Result<GetOperation<'_>, Error> {
219 if !self.is_connected() {
221 return Err(Error::operation(OpCode::Get, "session not connected"));
222 }
223
224 let mut headers = HeaderSet::new();
225 headers.try_add_connection_id(&self.connection_id())?;
226
227 let transport = self.transport.try_new_operation()?;
229 Ok(GetOperation::new(headers, transport))
230 }
231
232 pub fn put(&self) -> Result<PutOperation<'_>, Error> {
235 if !self.is_connected() {
237 return Err(Error::operation(OpCode::Put, "session not connected"));
238 }
239
240 let mut headers = HeaderSet::new();
241 headers.try_add_connection_id(&self.connection_id())?;
242
243 let transport = self.transport.try_new_operation()?;
245 Ok(PutOperation::new(headers, transport))
246 }
247
248 pub async fn set_path(
253 &self,
254 flags: SetPathFlags,
255 mut headers: HeaderSet,
256 ) -> Result<HeaderSet, Error> {
257 let opcode = OpCode::SetPath;
258 if !self.is_connected() {
260 return Err(Error::operation(opcode, "session not connected"));
261 }
262 headers.try_add_connection_id(&self.connection_id())?;
263 let request = RequestPacket::new_set_path(flags, headers)?;
264 let response = {
265 let mut transport = self.transport.try_new_operation()?;
266 trace!("Making outgoing SETPATH request: {request:?}");
267 transport.send(request).await?;
268 trace!("Successfully made SETPATH request");
269 transport.receive_response(opcode).await?
270 };
271
272 if *response.code() == ResponseCode::BadRequest
275 || *response.code() == ResponseCode::Forbidden
276 {
277 return Err(Error::not_implemented(opcode));
278 }
279 response.expect_code(opcode, ResponseCode::Ok).map(Into::into)
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use async_utils::PollExt;
    use fuchsia_async as fasync;
    use std::pin::pin;

    use crate::transport::test_utils::{expect_code, expect_request_and_reply};

    /// The OBEX max packet size tracks the transport max, clamped to the range 255..=u16::MAX.
    #[fuchsia::test]
    fn max_packet_size_calculation() {
        // In range: used as-is.
        assert_eq!(max_packet_size_from_transport(1000), 1000);

        // Too small: raised to 255.
        assert_eq!(max_packet_size_from_transport(40), 255);

        // Too large for a u16: capped.
        assert_eq!(max_packet_size_from_transport(700000), u16::MAX);
    }

    /// Builds an `ObexClient` over a fresh channel pair, forces its session status to
    /// `connected`, and returns it together with the remote end of the channel.
    fn new_obex_client(connected: ConnectionStatus) -> (ObexClient, Channel) {
        let (client_end, peer_end) = Channel::create();
        let obex_client = ObexClient::new(client_end, TransportType::Rfcomm);
        obex_client.set_connection_status(connected);
        (obex_client, peer_end)
    }

    /// A CONNECT accepted by the peer updates the connected state, the max packet size, and
    /// the Connection Identifier.
    #[fuchsia::test]
    fn client_connect_success() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::default());

        // Preconditions for a freshly created client.
        assert!(!client.is_connected());
        assert_eq!(client.max_packet_size(), Channel::DEFAULT_MAX_TX as u16);
        assert_eq!(client.connection_id(), None);

        {
            let mut connect_fut = pin!(client.connect(HeaderSet::new()));
            exec.run_until_stalled(&mut connect_fut).expect_pending("waiting for response");

            // The peer accepts and assigns Connection ID 1. The last two bytes of the CONNECT
            // data are the peer's max packet size (0xffff).
            let expected_headers =
                HeaderSet::from_headers(vec![Header::ConnectionId(ConnectionIdentifier(1))])
                    .unwrap();
            let reply = ResponsePacket::new(
                ResponseCode::Ok,
                vec![0x10, 0x00, 0xff, 0xff],
                expected_headers.clone(),
            );
            expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Connect), reply);

            let returned_headers = exec
                .run_until_stalled(&mut connect_fut)
                .expect("received response")
                .expect("response is ok");
            assert_eq!(returned_headers, expected_headers);
        }

        assert!(client.is_connected());
        assert_eq!(client.max_packet_size(), 0xffff);
        assert_eq!(client.connection_id(), Some(ConnectionIdentifier(1)));
    }

    /// CONNECT on an already-connected session is rejected locally.
    #[fuchsia::test]
    async fn multiple_connect_is_error() {
        let (client, _remote) = new_obex_client(ConnectionStatus::connected_no_id());

        let outcome = client.connect(HeaderSet::new()).await;
        assert_matches!(outcome, Err(Error::OperationError { .. }));
    }

    /// GET cannot be started before the session is connected.
    #[fuchsia::test]
    fn get_before_connect_is_error() {
        let _exec = fasync::TestExecutor::new();
        let (client, _remote) = new_obex_client(ConnectionStatus::default());

        assert_matches!(client.get(), Err(Error::OperationError { .. }));
    }

    /// A second GET can be started once the first one has been dropped.
    #[fuchsia::test]
    fn sequential_get_operations_is_ok() {
        let _exec = fasync::TestExecutor::new();
        let (client, _remote) = new_obex_client(ConnectionStatus::connected_no_id());

        let first_get = client.get().expect("can initialize first get");

        // Finish with the first operation before asking for another.
        drop(first_get);
        let _second_get = client.get().expect("can initialize second get");
    }

    /// DISCONNECT accepted by the peer returns the peer's response headers.
    #[fuchsia::test]
    fn disconnect_success() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());

        let request_headers = HeaderSet::from_header(Header::Description("finished".into()));
        let mut disconnect_fut = pin!(client.disconnect(request_headers));
        exec.run_until_stalled(&mut disconnect_fut).expect_pending("waiting for response");

        let expected_headers = HeaderSet::from_header(Header::Description("accepted".into()));
        let reply = ResponsePacket::new(ResponseCode::Ok, vec![], expected_headers.clone());
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Disconnect), reply);

        let returned_headers = exec
            .run_until_stalled(&mut disconnect_fut)
            .expect("received response")
            .expect("response is ok");
        assert_eq!(returned_headers, expected_headers);
    }

    /// DISCONNECT on a session that was never connected is rejected locally.
    #[fuchsia::test]
    async fn disconnect_before_connect_error() {
        let (client, _remote) = new_obex_client(ConnectionStatus::default());

        let request_headers = HeaderSet::from_header(Header::Description("finished".into()));
        let outcome = client.disconnect(request_headers).await;
        assert_matches!(outcome, Err(Error::PeerRejected { .. }) | Err(Error::OperationError { .. }) if matches!(outcome, Err(Error::OperationError { .. })))
    }

    /// A non-`Ok` response code to DISCONNECT surfaces as a peer rejection.
    #[fuchsia::test]
    fn disconnect_error_response_error() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());

        let mut disconnect_fut = pin!(client.disconnect(HeaderSet::new()));
        exec.run_until_stalled(&mut disconnect_fut).expect_pending("waiting for response");

        let reply = ResponsePacket::new(
            ResponseCode::InternalServerError,
            vec![],
            HeaderSet::from_header(Header::Description("accepted".into())),
        );
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::Disconnect), reply);

        let outcome = exec.run_until_stalled(&mut disconnect_fut).expect("received response");
        assert_matches!(outcome, Err(Error::PeerRejected { .. }));
    }

    /// SETPATH accepted by the peer returns the peer's response headers; with empty flags the
    /// request data is two zero bytes.
    #[fuchsia::test]
    fn setpath_success() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());

        let request_headers = HeaderSet::from_header(Header::name("myfolder"));
        let mut setpath_fut = pin!(client.set_path(SetPathFlags::empty(), request_headers));
        exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");

        let expected_headers =
            HeaderSet::from_header(Header::Description("updated current folder".into()));
        let reply = ResponsePacket::new(ResponseCode::Ok, vec![], expected_headers.clone());
        let expectation = |request: RequestPacket| {
            assert_eq!(*request.code(), OpCode::SetPath);
            assert_eq!(request.data(), &[0, 0]);
        };
        expect_request_and_reply(&mut exec, &mut remote, expectation, reply);

        let returned_headers = exec
            .run_until_stalled(&mut setpath_fut)
            .expect("received response")
            .expect("response is ok");
        assert_eq!(returned_headers, expected_headers);
    }

    /// `BadRequest` to SETPATH maps to `NotImplemented`; other error codes map to
    /// `PeerRejected`.
    #[fuchsia::test]
    fn setpath_error_response_is_error() {
        let mut exec = fasync::TestExecutor::new();
        let (client, mut remote) = new_obex_client(ConnectionStatus::connected_no_id());

        // First request: the peer answers BadRequest.
        {
            let mut setpath_fut = pin!(client.set_path(SetPathFlags::BACKUP, HeaderSet::new()));
            exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");

            let reply = ResponsePacket::new(
                ResponseCode::BadRequest,
                vec![],
                HeaderSet::from_header(Header::Description("not implemented".into())),
            );
            expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::SetPath), reply);

            let outcome = exec.run_until_stalled(&mut setpath_fut).expect("received response");
            assert_matches!(outcome, Err(Error::NotImplemented { .. }));
        }

        // Second request: the peer answers with a different error code.
        let request_headers = HeaderSet::from_header(Header::name("file"));
        let mut setpath_fut = pin!(client.set_path(SetPathFlags::DONT_CREATE, request_headers));
        exec.run_until_stalled(&mut setpath_fut).expect_pending("waiting for response");

        let reply = ResponsePacket::new(
            ResponseCode::InternalServerError,
            vec![],
            HeaderSet::from_header(Header::Description("not implemented".into())),
        );
        expect_request_and_reply(&mut exec, &mut remote, expect_code(OpCode::SetPath), reply);

        let outcome = exec.run_until_stalled(&mut setpath_fut).expect("received response");
        assert_matches!(outcome, Err(Error::PeerRejected { .. }));
    }
}