1use fidl::endpoints::{ClientEnd, Proxy};
6use futures::stream::{FusedStream, Stream};
7use futures::{Future, TryFutureExt, io};
8use log::warn;
9use std::fmt;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex};
12use std::task::{Context, Poll};
13use {
14 fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
15 fuchsia_async as fasync,
16};
17
18use crate::error::Error;
19
20#[derive(PartialEq, Debug, Clone)]
22pub enum ChannelMode {
23 Basic,
24 EnhancedRetransmissionMode,
25 LeCreditBasedFlowControl,
26 EnhancedCreditBasedFlowControl,
27}
28
29impl fmt::Display for ChannelMode {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 match self {
32 ChannelMode::Basic => write!(f, "Basic"),
33 ChannelMode::EnhancedRetransmissionMode => write!(f, "ERTM"),
34 ChannelMode::LeCreditBasedFlowControl => write!(f, "LE_Credit"),
35 ChannelMode::EnhancedCreditBasedFlowControl => write!(f, "Credit"),
36 }
37 }
38}
39
40pub enum A2dpDirection {
41 Normal,
42 Source,
43 Sink,
44}
45
46impl From<A2dpDirection> for bredr::A2dpDirectionPriority {
47 fn from(pri: A2dpDirection) -> Self {
48 match pri {
49 A2dpDirection::Normal => bredr::A2dpDirectionPriority::Normal,
50 A2dpDirection::Source => bredr::A2dpDirectionPriority::Source,
51 A2dpDirection::Sink => bredr::A2dpDirectionPriority::Sink,
52 }
53 }
54}
55
56impl TryFrom<fidl_bt::ChannelMode> for ChannelMode {
57 type Error = Error;
58 fn try_from(fidl: fidl_bt::ChannelMode) -> Result<Self, Error> {
59 match fidl {
60 fidl_bt::ChannelMode::Basic => Ok(ChannelMode::Basic),
61 fidl_bt::ChannelMode::EnhancedRetransmission => {
62 Ok(ChannelMode::EnhancedRetransmissionMode)
63 }
64 fidl_bt::ChannelMode::LeCreditBasedFlowControl => {
65 Ok(ChannelMode::LeCreditBasedFlowControl)
66 }
67 fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl => {
68 Ok(ChannelMode::EnhancedCreditBasedFlowControl)
69 }
70 x => Err(Error::FailedConversion(format!("Unsupported channel mode type: {x:?}"))),
71 }
72 }
73}
74
75impl From<ChannelMode> for fidl_bt::ChannelMode {
76 fn from(x: ChannelMode) -> Self {
77 match x {
78 ChannelMode::Basic => fidl_bt::ChannelMode::Basic,
79 ChannelMode::EnhancedRetransmissionMode => fidl_bt::ChannelMode::EnhancedRetransmission,
80 ChannelMode::LeCreditBasedFlowControl => fidl_bt::ChannelMode::LeCreditBasedFlowControl,
81 ChannelMode::EnhancedCreditBasedFlowControl => {
82 fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl
83 }
84 }
85 }
86}
87
88#[derive(Debug)]
93pub struct Channel {
94 socket: fasync::Socket,
95 mode: ChannelMode,
96 max_tx_size: usize,
97 flush_timeout: Arc<Mutex<Option<zx::MonotonicDuration>>>,
98 audio_direction_ext: Option<bredr::AudioDirectionExtProxy>,
99 l2cap_parameters_ext: Option<bredr::L2capParametersExtProxy>,
100 audio_offload_ext: Option<bredr::AudioOffloadExtProxy>,
101 terminated: bool,
102}
103
104impl Channel {
105 pub fn from_socket(socket: zx::Socket, max_tx_size: usize) -> Result<Self, zx::Status> {
108 Ok(Self::from_socket_infallible(socket, max_tx_size))
109 }
110
111 pub fn from_socket_infallible(socket: zx::Socket, max_tx_size: usize) -> Self {
113 Channel {
114 socket: fasync::Socket::from_socket(socket),
115 mode: ChannelMode::Basic,
116 max_tx_size,
117 flush_timeout: Arc::new(Mutex::new(None)),
118 audio_direction_ext: None,
119 l2cap_parameters_ext: None,
120 audio_offload_ext: None,
121 terminated: false,
122 }
123 }
124
125 pub const DEFAULT_MAX_TX: usize = 672;
128
129 pub fn create() -> (Self, Self) {
132 Self::create_with_max_tx(Self::DEFAULT_MAX_TX)
133 }
134
135 pub fn create_with_max_tx(max_tx_size: usize) -> (Self, Self) {
138 let (remote, local) = zx::Socket::create_datagram();
139 (
140 Channel::from_socket(remote, max_tx_size).unwrap(),
141 Channel::from_socket(local, max_tx_size).unwrap(),
142 )
143 }
144
145 pub fn max_tx_size(&self) -> usize {
148 self.max_tx_size
149 }
150
151 pub fn channel_mode(&self) -> &ChannelMode {
152 &self.mode
153 }
154
155 pub fn flush_timeout(&self) -> Option<zx::MonotonicDuration> {
156 self.flush_timeout.lock().unwrap().clone()
157 }
158
159 pub fn set_audio_priority(
162 &self,
163 dir: A2dpDirection,
164 ) -> impl Future<Output = Result<(), Error>> {
165 let proxy = self.audio_direction_ext.clone();
166 async move {
167 match proxy {
168 None => return Err(Error::profile("audio priority not supported")),
169 Some(proxy) => proxy
170 .set_priority(dir.into())
171 .await?
172 .map_err(|e| Error::profile(format!("setting priority failed: {e:?}"))),
173 }
174 }
175 }
176
177 pub fn set_flush_timeout(
184 &self,
185 duration: Option<zx::MonotonicDuration>,
186 ) -> impl Future<Output = Result<Option<zx::MonotonicDuration>, Error>> {
187 let flush_timeout = self.flush_timeout.clone();
188 let current = self.flush_timeout.lock().unwrap().clone();
189 let proxy = self.l2cap_parameters_ext.clone();
190 async move {
191 match (current, duration) {
192 (None, None) => return Ok(None),
193 (Some(old), Some(new)) if (old - new).into_millis().abs() < 2 => {
194 return Ok(current);
195 }
196 _ => {}
197 };
198 let proxy =
199 proxy.ok_or_else(|| Error::profile("l2cap parameter changing not supported"))?;
200 let parameters = fidl_bt::ChannelParameters {
201 flush_timeout: duration.clone().map(zx::MonotonicDuration::into_nanos),
202 ..Default::default()
203 };
204 let new_params = proxy.request_parameters(¶meters).await?;
205 let new_timeout = new_params.flush_timeout.map(zx::MonotonicDuration::from_nanos);
206 *(flush_timeout.lock().unwrap()) = new_timeout.clone();
207 Ok(new_timeout)
208 }
209 }
210
211 pub fn audio_offload(&self) -> Option<bredr::AudioOffloadExtProxy> {
213 self.audio_offload_ext.clone()
214 }
215
216 pub fn closed<'a>(&'a self) -> impl Future<Output = Result<(), zx::Status>> + 'a {
217 let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
218 let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
219 close_wait.map_ok(|_o| ())
220 }
221
222 pub fn is_closed<'a>(&'a self) -> bool {
223 self.socket.is_closed()
224 }
225
226 pub fn poll_datagram(
227 &self,
228 cx: &mut Context<'_>,
229 out: &mut Vec<u8>,
230 ) -> Poll<Result<usize, zx::Status>> {
231 self.socket.poll_datagram(cx, out)
232 }
233
234 pub fn read_packet(&self) -> Result<Vec<u8>, zx::Status> {
237 let bytes_waiting = self.socket.as_ref().outstanding_read_bytes()?;
238 if bytes_waiting == 0 {
239 return Err(zx::Status::SHOULD_WAIT);
240 }
241 let mut packet = vec![0; bytes_waiting];
242 let _ = self.read(&mut packet[..])?;
243 Ok(packet)
244 }
245
246 pub fn read(&self, buf: &mut [u8]) -> Result<usize, zx::Status> {
249 self.socket.as_ref().read(buf)
250 }
251
252 pub fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
255 self.socket.as_ref().write(bytes)
256 }
257}
258
259impl TryFrom<fidl_fuchsia_bluetooth_bredr::Channel> for Channel {
260 type Error = zx::Status;
261
262 fn try_from(fidl: bredr::Channel) -> Result<Self, Self::Error> {
263 let channel = match fidl.channel_mode.unwrap_or(fidl_bt::ChannelMode::Basic).try_into() {
264 Err(e) => {
265 warn!("Unsupported channel mode type: {e:?}");
266 return Err(zx::Status::INTERNAL);
267 }
268 Ok(c) => c,
269 };
270
271 Ok(Self {
272 socket: fasync::Socket::from_socket(fidl.socket.ok_or(zx::Status::INVALID_ARGS)?),
273 mode: channel,
274 max_tx_size: fidl.max_tx_sdu_size.ok_or(zx::Status::INVALID_ARGS)? as usize,
275 flush_timeout: Arc::new(Mutex::new(
276 fidl.flush_timeout.map(zx::MonotonicDuration::from_nanos),
277 )),
278 audio_direction_ext: fidl.ext_direction.map(|e| e.into_proxy()),
279 l2cap_parameters_ext: fidl.ext_l2cap.map(|e| e.into_proxy()),
280 audio_offload_ext: fidl.ext_audio_offload.map(|c| c.into_proxy()),
281 terminated: false,
282 })
283 }
284}
285
286impl TryFrom<Channel> for bredr::Channel {
287 type Error = Error;
288
289 fn try_from(channel: Channel) -> Result<Self, Self::Error> {
290 let socket = channel.socket.into_zx_socket();
291 let ext_direction = channel
292 .audio_direction_ext
293 .map(|proxy| {
294 let chan = proxy.into_channel()?;
295 Ok(ClientEnd::new(chan.into()))
296 })
297 .transpose()
298 .map_err(|_: bredr::AudioDirectionExtProxy| {
299 Error::profile("AudioDirection proxy in use")
300 })?;
301 let ext_l2cap = channel
302 .l2cap_parameters_ext
303 .map(|proxy| {
304 let chan = proxy.into_channel()?;
305 Ok(ClientEnd::new(chan.into()))
306 })
307 .transpose()
308 .map_err(|_: bredr::L2capParametersExtProxy| {
309 Error::profile("l2cap parameters proxy in use")
310 })?;
311 let ext_audio_offload = channel
312 .audio_offload_ext
313 .map(|proxy| {
314 let chan = proxy.into_channel()?;
315 Ok(ClientEnd::new(chan.into()))
316 })
317 .transpose()
318 .map_err(|_: bredr::AudioOffloadExtProxy| {
319 Error::profile("audio offload proxy in use")
320 })?;
321 let flush_timeout =
322 channel.flush_timeout.lock().unwrap().map(zx::MonotonicDuration::into_nanos);
323 Ok(bredr::Channel {
324 socket: Some(socket),
325 channel_mode: Some(channel.mode.into()),
326 max_tx_sdu_size: Some(channel.max_tx_size as u16),
327 ext_direction,
328 flush_timeout,
329 ext_l2cap,
330 ext_audio_offload,
331 ..Default::default()
332 })
333 }
334}
335
336impl Stream for Channel {
337 type Item = Result<Vec<u8>, zx::Status>;
338
339 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
340 if self.terminated {
341 panic!("Channel polled after terminated");
342 }
343
344 let mut res = Vec::<u8>::new();
345 loop {
346 break match self.socket.poll_datagram(cx, &mut res) {
347 Poll::Ready(Ok(0)) => continue,
350 Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
351 Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
352 self.terminated = true;
353 Poll::Ready(None)
354 }
355 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
356 Poll::Pending => Poll::Pending,
357 };
358 }
359 }
360}
361
362impl FusedStream for Channel {
363 fn is_terminated(&self) -> bool {
364 self.terminated
365 }
366}
367
368impl io::AsyncRead for Channel {
369 fn poll_read(
370 mut self: Pin<&mut Self>,
371 cx: &mut Context<'_>,
372 buf: &mut [u8],
373 ) -> Poll<Result<usize, futures::io::Error>> {
374 Pin::new(&mut self.socket).as_mut().poll_read(cx, buf)
375 }
376}
377
378impl io::AsyncWrite for Channel {
379 fn poll_write(
380 mut self: Pin<&mut Self>,
381 cx: &mut Context<'_>,
382 buf: &[u8],
383 ) -> Poll<Result<usize, io::Error>> {
384 Pin::new(&mut self.socket).as_mut().poll_write(cx, buf)
385 }
386
387 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
388 Pin::new(&mut self.socket).as_mut().poll_flush(cx)
389 }
390
391 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
392 Pin::new(&mut self.socket).as_mut().poll_close(cx)
393 }
394}
395
396#[cfg(test)]
397mod tests {
398 use super::*;
399 use fidl::endpoints::create_request_stream;
400 use futures::{AsyncReadExt, FutureExt, StreamExt};
401 use std::pin::pin;
402
403 #[test]
404 fn test_channel_create_and_write() {
405 let _exec = fasync::TestExecutor::new();
406 let (mut recv, send) = Channel::create();
407
408 let mut buf: [u8; 8] = [0; 8];
409 let read_fut = AsyncReadExt::read(&mut recv, &mut buf);
410
411 let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
412 assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));
413
414 match read_fut.now_or_never() {
415 Some(Ok(4)) => {}
416 x => panic!("Expected Ok(4) from the read, got {x:?}"),
417 };
418 assert_eq!(heart, &buf[0..4]);
419 }
420
421 #[test]
422 fn test_channel_from_fidl() {
423 let _exec = fasync::TestExecutor::new();
424 let empty = bredr::Channel::default();
425 assert!(Channel::try_from(empty).is_err());
426
427 let (remote, _local) = zx::Socket::create_datagram();
428
429 let okay = bredr::Channel {
430 socket: Some(remote),
431 channel_mode: Some(fidl_bt::ChannelMode::Basic),
432 max_tx_sdu_size: Some(1004),
433 ..Default::default()
434 };
435
436 let chan = Channel::try_from(okay).expect("okay channel to be converted");
437
438 assert_eq!(1004, chan.max_tx_size());
439 assert_eq!(&ChannelMode::Basic, chan.channel_mode());
440 }
441
442 #[test]
443 fn test_channel_closed() {
444 let mut exec = fasync::TestExecutor::new();
445
446 let (recv, send) = Channel::create();
447
448 let closed_fut = recv.closed();
449 let mut closed_fut = pin!(closed_fut);
450
451 assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
452 assert!(!recv.is_closed());
453
454 drop(send);
455
456 assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
457 assert!(recv.is_closed());
458 }
459
460 #[test]
461 fn test_direction_ext() {
462 let mut exec = fasync::TestExecutor::new();
463
464 let (remote, _local) = zx::Socket::create_datagram();
465 let no_ext = bredr::Channel {
466 socket: Some(remote),
467 channel_mode: Some(fidl_bt::ChannelMode::Basic),
468 max_tx_sdu_size: Some(1004),
469 ..Default::default()
470 };
471 let channel = Channel::try_from(no_ext).unwrap();
472
473 assert!(
474 exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal)).is_err()
475 );
476 assert!(exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err());
477
478 let (remote, _local) = zx::Socket::create_datagram();
479 let (client_end, mut direction_request_stream) =
480 create_request_stream::<bredr::AudioDirectionExtMarker>();
481 let ext = bredr::Channel {
482 socket: Some(remote),
483 channel_mode: Some(fidl_bt::ChannelMode::Basic),
484 max_tx_sdu_size: Some(1004),
485 ext_direction: Some(client_end),
486 ..Default::default()
487 };
488
489 let channel = Channel::try_from(ext).unwrap();
490
491 let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Normal);
492 let mut audio_direction_fut = pin!(audio_direction_fut);
493
494 assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
495
496 match exec.run_until_stalled(&mut direction_request_stream.next()) {
497 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
498 priority,
499 responder,
500 }))) => {
501 assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
502 responder.send(Ok(())).expect("response to send cleanly");
503 }
504 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
505 };
506
507 match exec.run_until_stalled(&mut audio_direction_fut) {
508 Poll::Ready(Ok(())) => {}
509 _x => panic!("Expected ok result from audio direction response"),
510 };
511
512 let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Sink);
513 let mut audio_direction_fut = pin!(audio_direction_fut);
514
515 assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
516
517 match exec.run_until_stalled(&mut direction_request_stream.next()) {
518 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
519 priority,
520 responder,
521 }))) => {
522 assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
523 responder
524 .send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed))
525 .expect("response to send cleanly");
526 }
527 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
528 };
529
530 match exec.run_until_stalled(&mut audio_direction_fut) {
531 Poll::Ready(Err(_)) => {}
532 _x => panic!("Expected error result from audio direction response"),
533 };
534 }
535
536 #[test]
537 fn test_flush_timeout() {
538 let mut exec = fasync::TestExecutor::new();
539
540 let (remote, _local) = zx::Socket::create_datagram();
541 let no_ext = bredr::Channel {
542 socket: Some(remote),
543 channel_mode: Some(fidl_bt::ChannelMode::Basic),
544 max_tx_sdu_size: Some(1004),
545 flush_timeout: Some(50_000_000), ..Default::default()
547 };
548 let channel = Channel::try_from(no_ext).unwrap();
549
550 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
551
552 let res = exec.run_singlethreaded(
554 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(49))),
555 );
556 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
557 let res = exec.run_singlethreaded(
558 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(51))),
559 );
560 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
561
562 assert!(
563 exec.run_singlethreaded(
564 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(200)))
565 )
566 .is_err()
567 );
568 assert!(exec.run_singlethreaded(channel.set_flush_timeout(None)).is_err());
569
570 let (remote, _local) = zx::Socket::create_datagram();
571 let (client_end, mut l2cap_request_stream) =
572 create_request_stream::<bredr::L2capParametersExtMarker>();
573 let ext = bredr::Channel {
574 socket: Some(remote),
575 channel_mode: Some(fidl_bt::ChannelMode::Basic),
576 max_tx_sdu_size: Some(1004),
577 flush_timeout: None,
578 ext_l2cap: Some(client_end),
579 ..Default::default()
580 };
581
582 let channel = Channel::try_from(ext).unwrap();
583
584 {
585 let flush_timeout_fut = channel.set_flush_timeout(None);
586 let mut flush_timeout_fut = pin!(flush_timeout_fut);
587
588 match exec.run_until_stalled(&mut flush_timeout_fut) {
590 Poll::Ready(Ok(None)) => {}
591 x => panic!("Expected no flush timeout to not stall, got {:?}", x),
592 }
593 }
594
595 let req_duration = zx::MonotonicDuration::from_millis(42);
596
597 {
598 let flush_timeout_fut = channel.set_flush_timeout(Some(req_duration));
599 let mut flush_timeout_fut = pin!(flush_timeout_fut);
600
601 assert!(exec.run_until_stalled(&mut flush_timeout_fut).is_pending());
602
603 match exec.run_until_stalled(&mut l2cap_request_stream.next()) {
604 Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
605 request,
606 responder,
607 }))) => {
608 assert_eq!(Some(req_duration.into_nanos()), request.flush_timeout);
609 let params = fidl_bt::ChannelParameters {
611 flush_timeout: Some(50_000_000), ..Default::default()
613 };
614 responder.send(¶ms).expect("response to send cleanly");
615 }
616 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
617 };
618
619 match exec.run_until_stalled(&mut flush_timeout_fut) {
620 Poll::Ready(Ok(Some(duration))) => {
621 assert_eq!(zx::MonotonicDuration::from_millis(50), duration)
622 }
623 x => panic!("Expected ready result from params response, got {:?}", x),
624 };
625 }
626
627 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
629 }
630
631 #[test]
632 fn test_audio_offload() {
633 let _exec = fasync::TestExecutor::new();
634
635 let (remote, _local) = zx::Socket::create_datagram();
636 let no_ext = bredr::Channel {
637 socket: Some(remote),
638 channel_mode: Some(fidl_bt::ChannelMode::Basic),
639 max_tx_sdu_size: Some(1004),
640 ..Default::default()
641 };
642 let channel = Channel::try_from(no_ext).unwrap();
643
644 assert!(channel.audio_offload().is_none());
645
646 let (remote, _local) = zx::Socket::create_datagram();
647 let (client_end, mut _audio_offload_ext_req_stream) =
648 create_request_stream::<bredr::AudioOffloadExtMarker>();
649 let ext = bredr::Channel {
650 socket: Some(remote),
651 channel_mode: Some(fidl_bt::ChannelMode::Basic),
652 max_tx_sdu_size: Some(1004),
653 ext_audio_offload: Some(client_end),
654 ..Default::default()
655 };
656
657 let channel = Channel::try_from(ext).unwrap();
658
659 let offload_ext = channel.audio_offload();
660 assert!(offload_ext.is_some());
661 assert!(channel.audio_offload().is_some());
663 drop(offload_ext);
665 assert!(channel.audio_offload().is_some());
666 }
667}