1use fidl::endpoints::{ClientEnd, Proxy};
6use futures::stream::{FusedStream, Stream};
7use futures::{io, Future, TryFutureExt};
8use log::warn;
9use std::fmt;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex};
12use std::task::{Context, Poll};
13use {
14 fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
15 fuchsia_async as fasync,
16};
17
18use crate::error::Error;
19
/// Operating mode of a [`Channel`].
///
/// Each variant has a counterpart in `fidl_bt::ChannelMode`; conversions in both
/// directions are implemented in this file.
#[derive(Clone, Debug, PartialEq)]
pub enum ChannelMode {
    /// Displayed as `Basic`.
    Basic,
    /// Displayed as `ERTM`.
    EnhancedRetransmissionMode,
    /// Displayed as `LE_Credit`.
    LeCreditBasedFlowControl,
    /// Displayed as `Credit`.
    EnhancedCreditBasedFlowControl,
}
28
29impl fmt::Display for ChannelMode {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 match self {
32 ChannelMode::Basic => write!(f, "Basic"),
33 ChannelMode::EnhancedRetransmissionMode => write!(f, "ERTM"),
34 ChannelMode::LeCreditBasedFlowControl => write!(f, "LE_Credit"),
35 ChannelMode::EnhancedCreditBasedFlowControl => write!(f, "Credit"),
36 }
37 }
38}
39
/// Audio direction priority that can be requested for a [`Channel`] through
/// `Channel::set_audio_priority`.
///
/// Each variant converts to the `bredr::A2dpDirectionPriority` variant of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum A2dpDirection {
    /// Converts to `bredr::A2dpDirectionPriority::Normal`.
    Normal,
    /// Converts to `bredr::A2dpDirectionPriority::Source`.
    Source,
    /// Converts to `bredr::A2dpDirectionPriority::Sink`.
    Sink,
}
45
46impl From<A2dpDirection> for bredr::A2dpDirectionPriority {
47 fn from(pri: A2dpDirection) -> Self {
48 match pri {
49 A2dpDirection::Normal => bredr::A2dpDirectionPriority::Normal,
50 A2dpDirection::Source => bredr::A2dpDirectionPriority::Source,
51 A2dpDirection::Sink => bredr::A2dpDirectionPriority::Sink,
52 }
53 }
54}
55
56impl TryFrom<fidl_bt::ChannelMode> for ChannelMode {
57 type Error = Error;
58 fn try_from(fidl: fidl_bt::ChannelMode) -> Result<Self, Error> {
59 match fidl {
60 fidl_bt::ChannelMode::Basic => Ok(ChannelMode::Basic),
61 fidl_bt::ChannelMode::EnhancedRetransmission => {
62 Ok(ChannelMode::EnhancedRetransmissionMode)
63 }
64 fidl_bt::ChannelMode::LeCreditBasedFlowControl => {
65 Ok(ChannelMode::LeCreditBasedFlowControl)
66 }
67 fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl => {
68 Ok(ChannelMode::EnhancedCreditBasedFlowControl)
69 }
70 x => Err(Error::FailedConversion(format!("Unsupported channel mode type: {x:?}"))),
71 }
72 }
73}
74
75impl From<ChannelMode> for fidl_bt::ChannelMode {
76 fn from(x: ChannelMode) -> Self {
77 match x {
78 ChannelMode::Basic => fidl_bt::ChannelMode::Basic,
79 ChannelMode::EnhancedRetransmissionMode => fidl_bt::ChannelMode::EnhancedRetransmission,
80 ChannelMode::LeCreditBasedFlowControl => fidl_bt::ChannelMode::LeCreditBasedFlowControl,
81 ChannelMode::EnhancedCreditBasedFlowControl => {
82 fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl
83 }
84 }
85 }
86}
87
88#[derive(Debug)]
93pub struct Channel {
94 socket: fasync::Socket,
95 mode: ChannelMode,
96 max_tx_size: usize,
97 flush_timeout: Arc<Mutex<Option<zx::MonotonicDuration>>>,
98 audio_direction_ext: Option<bredr::AudioDirectionExtProxy>,
99 l2cap_parameters_ext: Option<bredr::L2capParametersExtProxy>,
100 terminated: bool,
101}
102
103impl Channel {
104 pub fn from_socket(socket: zx::Socket, max_tx_size: usize) -> Result<Self, zx::Status> {
107 Ok(Self::from_socket_infallible(socket, max_tx_size))
108 }
109
110 pub fn from_socket_infallible(socket: zx::Socket, max_tx_size: usize) -> Self {
112 Channel {
113 socket: fasync::Socket::from_socket(socket),
114 mode: ChannelMode::Basic,
115 max_tx_size,
116 flush_timeout: Arc::new(Mutex::new(None)),
117 audio_direction_ext: None,
118 l2cap_parameters_ext: None,
119 terminated: false,
120 }
121 }
122
123 pub const DEFAULT_MAX_TX: usize = 672;
126
127 pub fn create() -> (Self, Self) {
130 Self::create_with_max_tx(Self::DEFAULT_MAX_TX)
131 }
132
133 pub fn create_with_max_tx(max_tx_size: usize) -> (Self, Self) {
136 let (remote, local) = zx::Socket::create_datagram();
137 (
138 Channel::from_socket(remote, max_tx_size).unwrap(),
139 Channel::from_socket(local, max_tx_size).unwrap(),
140 )
141 }
142
143 pub fn max_tx_size(&self) -> usize {
146 self.max_tx_size
147 }
148
149 pub fn channel_mode(&self) -> &ChannelMode {
150 &self.mode
151 }
152
153 pub fn flush_timeout(&self) -> Option<zx::MonotonicDuration> {
154 self.flush_timeout.lock().unwrap().clone()
155 }
156
157 pub fn set_audio_priority(
160 &self,
161 dir: A2dpDirection,
162 ) -> impl Future<Output = Result<(), Error>> {
163 let proxy = self.audio_direction_ext.clone();
164 async move {
165 match proxy {
166 None => return Err(Error::profile("audio priority not supported")),
167 Some(proxy) => proxy
168 .set_priority(dir.into())
169 .await?
170 .map_err(|e| Error::profile(format!("setting priority failed: {e:?}"))),
171 }
172 }
173 }
174
175 pub fn set_flush_timeout(
182 &self,
183 duration: Option<zx::MonotonicDuration>,
184 ) -> impl Future<Output = Result<Option<zx::MonotonicDuration>, Error>> {
185 let flush_timeout = self.flush_timeout.clone();
186 let current = self.flush_timeout.lock().unwrap().clone();
187 let proxy = self.l2cap_parameters_ext.clone();
188 async move {
189 match (current, duration) {
190 (None, None) => return Ok(None),
191 (Some(old), Some(new)) if (old - new).into_millis().abs() < 2 => {
192 return Ok(current)
193 }
194 _ => {}
195 };
196 let proxy =
197 proxy.ok_or_else(|| Error::profile("l2cap parameter changing not supported"))?;
198 let parameters = fidl_bt::ChannelParameters {
199 flush_timeout: duration.clone().map(zx::MonotonicDuration::into_nanos),
200 ..Default::default()
201 };
202 let new_params = proxy.request_parameters(¶meters).await?;
203 let new_timeout = new_params.flush_timeout.map(zx::MonotonicDuration::from_nanos);
204 *(flush_timeout.lock().unwrap()) = new_timeout.clone();
205 Ok(new_timeout)
206 }
207 }
208
209 pub fn closed<'a>(&'a self) -> impl Future<Output = Result<(), zx::Status>> + 'a {
210 let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
211 let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
212 close_wait.map_ok(|_o| ())
213 }
214
215 pub fn is_closed<'a>(&'a self) -> bool {
216 self.socket.is_closed()
217 }
218
219 pub fn poll_datagram(
220 &self,
221 cx: &mut Context<'_>,
222 out: &mut Vec<u8>,
223 ) -> Poll<Result<usize, zx::Status>> {
224 self.socket.poll_datagram(cx, out)
225 }
226
227 pub fn read_packet(&self) -> Result<Vec<u8>, zx::Status> {
230 let bytes_waiting = self.socket.as_ref().outstanding_read_bytes()?;
231 if bytes_waiting == 0 {
232 return Err(zx::Status::SHOULD_WAIT);
233 }
234 let mut packet = vec![0; bytes_waiting];
235 let _ = self.read(&mut packet[..])?;
236 Ok(packet)
237 }
238
239 pub fn read(&self, buf: &mut [u8]) -> Result<usize, zx::Status> {
242 self.socket.as_ref().read(buf)
243 }
244
245 pub fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
248 self.socket.as_ref().write(bytes)
249 }
250}
251
252impl TryFrom<fidl_fuchsia_bluetooth_bredr::Channel> for Channel {
253 type Error = zx::Status;
254
255 fn try_from(fidl: bredr::Channel) -> Result<Self, Self::Error> {
256 let channel = match fidl.channel_mode.unwrap_or(fidl_bt::ChannelMode::Basic).try_into() {
257 Err(e) => {
258 warn!("Unsupported channel mode type: {e:?}");
259 return Err(zx::Status::INTERNAL);
260 }
261 Ok(c) => c,
262 };
263
264 Ok(Self {
265 socket: fasync::Socket::from_socket(fidl.socket.ok_or(zx::Status::INVALID_ARGS)?),
266 mode: channel,
267 max_tx_size: fidl.max_tx_sdu_size.ok_or(zx::Status::INVALID_ARGS)? as usize,
268 flush_timeout: Arc::new(Mutex::new(
269 fidl.flush_timeout.map(zx::MonotonicDuration::from_nanos),
270 )),
271 audio_direction_ext: fidl.ext_direction.map(|e| e.into_proxy()),
272 l2cap_parameters_ext: fidl.ext_l2cap.map(|e| e.into_proxy()),
273 terminated: false,
274 })
275 }
276}
277
278impl TryFrom<Channel> for bredr::Channel {
279 type Error = Error;
280
281 fn try_from(channel: Channel) -> Result<Self, Self::Error> {
282 let socket = channel.socket.into_zx_socket();
283 let ext_direction = channel
284 .audio_direction_ext
285 .map(|proxy| {
286 let chan = proxy.into_channel()?;
287 Ok(ClientEnd::new(chan.into()))
288 })
289 .transpose()
290 .map_err(|_: bredr::AudioDirectionExtProxy| {
291 Error::profile("AudioDirection proxy in use")
292 })?;
293 let ext_l2cap = channel
294 .l2cap_parameters_ext
295 .map(|proxy| {
296 let chan = proxy.into_channel()?;
297 Ok(ClientEnd::new(chan.into()))
298 })
299 .transpose()
300 .map_err(|_: bredr::L2capParametersExtProxy| {
301 Error::profile("l2cap parameters proxy in use")
302 })?;
303 let flush_timeout =
304 channel.flush_timeout.lock().unwrap().map(zx::MonotonicDuration::into_nanos);
305 Ok(bredr::Channel {
306 socket: Some(socket),
307 channel_mode: Some(channel.mode.into()),
308 max_tx_sdu_size: Some(channel.max_tx_size as u16),
309 ext_direction,
310 flush_timeout,
311 ext_l2cap,
312 ..Default::default()
313 })
314 }
315}
316
317impl Stream for Channel {
318 type Item = Result<Vec<u8>, zx::Status>;
319
320 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
321 if self.terminated {
322 panic!("Channel polled after terminated");
323 }
324
325 let mut res = Vec::<u8>::new();
326 loop {
327 break match self.socket.poll_datagram(cx, &mut res) {
328 Poll::Ready(Ok(0)) => continue,
331 Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
332 Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
333 self.terminated = true;
334 Poll::Ready(None)
335 }
336 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
337 Poll::Pending => Poll::Pending,
338 };
339 }
340 }
341}
342
343impl FusedStream for Channel {
344 fn is_terminated(&self) -> bool {
345 self.terminated
346 }
347}
348
349impl io::AsyncRead for Channel {
350 fn poll_read(
351 mut self: Pin<&mut Self>,
352 cx: &mut Context<'_>,
353 buf: &mut [u8],
354 ) -> Poll<Result<usize, futures::io::Error>> {
355 Pin::new(&mut self.socket).as_mut().poll_read(cx, buf)
356 }
357}
358
359impl io::AsyncWrite for Channel {
360 fn poll_write(
361 mut self: Pin<&mut Self>,
362 cx: &mut Context<'_>,
363 buf: &[u8],
364 ) -> Poll<Result<usize, io::Error>> {
365 Pin::new(&mut self.socket).as_mut().poll_write(cx, buf)
366 }
367
368 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
369 Pin::new(&mut self.socket).as_mut().poll_flush(cx)
370 }
371
372 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
373 Pin::new(&mut self.socket).as_mut().poll_close(cx)
374 }
375}
376
#[cfg(test)]
mod tests {
    use super::*;
    use fidl::endpoints::create_request_stream;
    use futures::{AsyncReadExt, FutureExt, StreamExt};
    use std::pin::pin;

    /// Bytes written on one end of a created pair can be read from the other end.
    #[test]
    fn test_channel_create_and_write() {
        let _exec = fasync::TestExecutor::new();
        let (mut recv, send) = Channel::create();

        let mut buf: [u8; 8] = [0; 8];
        let read_fut = AsyncReadExt::read(&mut recv, &mut buf);

        let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
        assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));

        match read_fut.now_or_never() {
            Some(Ok(4)) => {}
            x => panic!("Expected Ok(4) from the read, got {x:?}"),
        };
        assert_eq!(heart, &buf[0..4]);
    }

    /// An empty FIDL table is rejected; one with a socket, mode and size converts.
    #[test]
    fn test_channel_from_fidl() {
        let _exec = fasync::TestExecutor::new();
        let empty = bredr::Channel::default();
        assert!(Channel::try_from(empty).is_err());

        let (remote, _local) = zx::Socket::create_datagram();

        let okay = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ..Default::default()
        };

        let chan = Channel::try_from(okay).expect("okay channel to be converted");

        assert_eq!(1004, chan.max_tx_size());
        assert_eq!(&ChannelMode::Basic, chan.channel_mode());
    }

    /// `closed()` stays pending until the peer end is dropped.
    #[test]
    fn test_channel_closed() {
        let mut exec = fasync::TestExecutor::new();

        let (recv, send) = Channel::create();

        let closed_fut = recv.closed();
        let mut closed_fut = pin!(closed_fut);

        assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
        assert!(!recv.is_closed());

        drop(send);

        assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
        assert!(recv.is_closed());
    }

    /// Audio priority requests fail without the extension and are forwarded with it.
    #[test]
    fn test_direction_ext() {
        let mut exec = fasync::TestExecutor::new();

        let (remote, _local) = zx::Socket::create_datagram();
        let no_ext = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ..Default::default()
        };
        let channel = Channel::try_from(no_ext).unwrap();

        // Without the extension every request is an error.
        assert!(exec
            .run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal))
            .is_err());
        assert!(exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err());

        let (remote, _local) = zx::Socket::create_datagram();
        let (client_end, mut direction_request_stream) =
            create_request_stream::<bredr::AudioDirectionExtMarker>();
        let ext = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ext_direction: Some(client_end),
            ..Default::default()
        };

        let channel = Channel::try_from(ext).unwrap();

        let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Normal);
        let mut audio_direction_fut = pin!(audio_direction_fut);

        assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());

        match exec.run_until_stalled(&mut direction_request_stream.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
                responder.send(Ok(())).expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };

        match exec.run_until_stalled(&mut audio_direction_fut) {
            Poll::Ready(Ok(())) => {}
            _x => panic!("Expected ok result from audio direction response"),
        };

        // A rejection from the server surfaces as an error from the future.
        let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Sink);
        let mut audio_direction_fut = pin!(audio_direction_fut);

        assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());

        match exec.run_until_stalled(&mut direction_request_stream.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
                responder
                    .send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed))
                    .expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };

        match exec.run_until_stalled(&mut audio_direction_fut) {
            Poll::Ready(Err(_)) => {}
            _x => panic!("Expected error result from audio direction response"),
        };
    }

    /// Flush timeout changes are skipped when close enough, fail without the extension, and
    /// are negotiated through the extension otherwise.
    #[test]
    fn test_flush_timeout() {
        let mut exec = fasync::TestExecutor::new();

        let (remote, _local) = zx::Socket::create_datagram();
        let no_ext = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            // 50 milliseconds, expressed in nanoseconds.
            flush_timeout: Some(50_000_000),
            ..Default::default()
        };
        let channel = Channel::try_from(no_ext).unwrap();

        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());

        // Requests within 1 ms of the current value succeed without needing the extension.
        let res = exec.run_singlethreaded(
            channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(49))),
        );
        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
        let res = exec.run_singlethreaded(
            channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(51))),
        );
        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));

        // Real changes need the extension, which this channel does not have.
        assert!(exec
            .run_singlethreaded(
                channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(200)))
            )
            .is_err());
        assert!(exec.run_singlethreaded(channel.set_flush_timeout(None)).is_err());

        let (remote, _local) = zx::Socket::create_datagram();
        let (client_end, mut l2cap_request_stream) =
            create_request_stream::<bredr::L2capParametersExtMarker>();
        let ext = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            flush_timeout: None,
            ext_l2cap: Some(client_end),
            ..Default::default()
        };

        let channel = Channel::try_from(ext).unwrap();

        {
            let flush_timeout_fut = channel.set_flush_timeout(None);
            let mut flush_timeout_fut = pin!(flush_timeout_fut);

            // No timeout is set and none is requested, so no request is made.
            match exec.run_until_stalled(&mut flush_timeout_fut) {
                Poll::Ready(Ok(None)) => {}
                x => panic!("Expected no flush timeout to not stall, got {:?}", x),
            }
        }

        let req_duration = zx::MonotonicDuration::from_millis(42);

        {
            let flush_timeout_fut = channel.set_flush_timeout(Some(req_duration));
            let mut flush_timeout_fut = pin!(flush_timeout_fut);

            assert!(exec.run_until_stalled(&mut flush_timeout_fut).is_pending());

            match exec.run_until_stalled(&mut l2cap_request_stream.next()) {
                Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
                    request,
                    responder,
                }))) => {
                    assert_eq!(Some(req_duration.into_nanos()), request.flush_timeout);
                    // The server answers with a different value than the one requested.
                    let params = fidl_bt::ChannelParameters {
                        flush_timeout: Some(50_000_000),
                        ..Default::default()
                    };
                    responder.send(&params).expect("response to send cleanly");
                }
                x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
            };

            match exec.run_until_stalled(&mut flush_timeout_fut) {
                Poll::Ready(Ok(Some(duration))) => {
                    assert_eq!(zx::MonotonicDuration::from_millis(50), duration)
                }
                x => panic!("Expected ready result from params response, got {:?}", x),
            };
        }

        // The channel remembers the value reported by the server, not the requested one.
        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
    }
}