fuchsia_bluetooth/types/
channel.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::endpoints::{ClientEnd, Proxy};
6use futures::stream::{FusedStream, Stream};
7use futures::{io, Future, TryFutureExt};
8use log::warn;
9use std::fmt;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex};
12use std::task::{Context, Poll};
13use {
14    fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
15    fuchsia_async as fasync,
16};
17
18use crate::error::Error;
19
20/// The Channel mode in use for a L2CAP channel.
21#[derive(PartialEq, Debug, Clone)]
22pub enum ChannelMode {
23    Basic,
24    EnhancedRetransmissionMode,
25    LeCreditBasedFlowControl,
26    EnhancedCreditBasedFlowControl,
27}
28
29impl fmt::Display for ChannelMode {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        match self {
32            ChannelMode::Basic => write!(f, "Basic"),
33            ChannelMode::EnhancedRetransmissionMode => write!(f, "ERTM"),
34            ChannelMode::LeCreditBasedFlowControl => write!(f, "LE_Credit"),
35            ChannelMode::EnhancedCreditBasedFlowControl => write!(f, "Credit"),
36        }
37    }
38}
39
40pub enum A2dpDirection {
41    Normal,
42    Source,
43    Sink,
44}
45
46impl From<A2dpDirection> for bredr::A2dpDirectionPriority {
47    fn from(pri: A2dpDirection) -> Self {
48        match pri {
49            A2dpDirection::Normal => bredr::A2dpDirectionPriority::Normal,
50            A2dpDirection::Source => bredr::A2dpDirectionPriority::Source,
51            A2dpDirection::Sink => bredr::A2dpDirectionPriority::Sink,
52        }
53    }
54}
55
56impl TryFrom<fidl_bt::ChannelMode> for ChannelMode {
57    type Error = Error;
58    fn try_from(fidl: fidl_bt::ChannelMode) -> Result<Self, Error> {
59        match fidl {
60            fidl_bt::ChannelMode::Basic => Ok(ChannelMode::Basic),
61            fidl_bt::ChannelMode::EnhancedRetransmission => {
62                Ok(ChannelMode::EnhancedRetransmissionMode)
63            }
64            fidl_bt::ChannelMode::LeCreditBasedFlowControl => {
65                Ok(ChannelMode::LeCreditBasedFlowControl)
66            }
67            fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl => {
68                Ok(ChannelMode::EnhancedCreditBasedFlowControl)
69            }
70            x => Err(Error::FailedConversion(format!("Unsupported channel mode type: {x:?}"))),
71        }
72    }
73}
74
75impl From<ChannelMode> for fidl_bt::ChannelMode {
76    fn from(x: ChannelMode) -> Self {
77        match x {
78            ChannelMode::Basic => fidl_bt::ChannelMode::Basic,
79            ChannelMode::EnhancedRetransmissionMode => fidl_bt::ChannelMode::EnhancedRetransmission,
80            ChannelMode::LeCreditBasedFlowControl => fidl_bt::ChannelMode::LeCreditBasedFlowControl,
81            ChannelMode::EnhancedCreditBasedFlowControl => {
82                fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl
83            }
84        }
85    }
86}
87
88/// A data channel to a remote Peer. Channels are the primary data transfer mechanism for
89/// Bluetooth profiles and protocols.
90/// Channel currently implements Deref<Target = Socket> to easily access the underlying
91/// socket, and also implements AsyncWrite using a forwarding implementation.
92#[derive(Debug)]
93pub struct Channel {
94    socket: fasync::Socket,
95    mode: ChannelMode,
96    max_tx_size: usize,
97    flush_timeout: Arc<Mutex<Option<zx::MonotonicDuration>>>,
98    audio_direction_ext: Option<bredr::AudioDirectionExtProxy>,
99    l2cap_parameters_ext: Option<bredr::L2capParametersExtProxy>,
100    terminated: bool,
101}
102
103impl Channel {
104    /// Attempt to make a Channel from a zircon socket and a Maximum TX size received out of band.
105    /// Returns Err(status) if there is an error.
106    pub fn from_socket(socket: zx::Socket, max_tx_size: usize) -> Result<Self, zx::Status> {
107        Ok(Self::from_socket_infallible(socket, max_tx_size))
108    }
109
110    /// Make a Channel from a zircon socket and a Maximum TX size received out of band.
111    pub fn from_socket_infallible(socket: zx::Socket, max_tx_size: usize) -> Self {
112        Channel {
113            socket: fasync::Socket::from_socket(socket),
114            mode: ChannelMode::Basic,
115            max_tx_size,
116            flush_timeout: Arc::new(Mutex::new(None)),
117            audio_direction_ext: None,
118            l2cap_parameters_ext: None,
119            terminated: false,
120        }
121    }
122
123    /// The default max tx size is the default MTU size for L2CAP minus the channel header content.
124    /// See the Bluetooth Core Specification, Vol 3, Part A, Sec 5.1
125    pub const DEFAULT_MAX_TX: usize = 672;
126
127    /// Makes a pair of channels which are connected to each other, used commonly for testing.
128    /// The max_tx_size is set to `Channel::DEFAULT_MAX_TX`.
129    pub fn create() -> (Self, Self) {
130        Self::create_with_max_tx(Self::DEFAULT_MAX_TX)
131    }
132
133    /// Make a pair of channels which are connected to each other, used commonly for testing.
134    /// The maximum transmittable unit is taken from `max_tx_size`.
135    pub fn create_with_max_tx(max_tx_size: usize) -> (Self, Self) {
136        let (remote, local) = zx::Socket::create_datagram();
137        (
138            Channel::from_socket(remote, max_tx_size).unwrap(),
139            Channel::from_socket(local, max_tx_size).unwrap(),
140        )
141    }
142
143    /// The maximum transmittable size of a packet, in bytes.
144    /// Trying to send packets larger than this may cause the channel to be closed.
145    pub fn max_tx_size(&self) -> usize {
146        self.max_tx_size
147    }
148
149    pub fn channel_mode(&self) -> &ChannelMode {
150        &self.mode
151    }
152
153    pub fn flush_timeout(&self) -> Option<zx::MonotonicDuration> {
154        self.flush_timeout.lock().unwrap().clone()
155    }
156
157    /// Returns a future which will set the audio priority of the channel.
158    /// The future will return Err if setting the priority is not supported.
159    pub fn set_audio_priority(
160        &self,
161        dir: A2dpDirection,
162    ) -> impl Future<Output = Result<(), Error>> {
163        let proxy = self.audio_direction_ext.clone();
164        async move {
165            match proxy {
166                None => return Err(Error::profile("audio priority not supported")),
167                Some(proxy) => proxy
168                    .set_priority(dir.into())
169                    .await?
170                    .map_err(|e| Error::profile(format!("setting priority failed: {e:?}"))),
171            }
172        }
173    }
174
175    /// Attempt to set the flush timeout for this channel.
176    /// If the timeout is not already set within 1ms of `duration`, we attempt to set it using the
177    /// L2cap parameter extension.
178    /// `duration` can be infinite to set packets flushable without a timeout.
179    /// Returns a future that when polled will set the flush timeout and return the new timeout,
180    /// or return an error setting the parameter is not supported.
181    pub fn set_flush_timeout(
182        &self,
183        duration: Option<zx::MonotonicDuration>,
184    ) -> impl Future<Output = Result<Option<zx::MonotonicDuration>, Error>> {
185        let flush_timeout = self.flush_timeout.clone();
186        let current = self.flush_timeout.lock().unwrap().clone();
187        let proxy = self.l2cap_parameters_ext.clone();
188        async move {
189            match (current, duration) {
190                (None, None) => return Ok(None),
191                (Some(old), Some(new)) if (old - new).into_millis().abs() < 2 => {
192                    return Ok(current)
193                }
194                _ => {}
195            };
196            let proxy =
197                proxy.ok_or_else(|| Error::profile("l2cap parameter changing not supported"))?;
198            let parameters = fidl_bt::ChannelParameters {
199                flush_timeout: duration.clone().map(zx::MonotonicDuration::into_nanos),
200                ..Default::default()
201            };
202            let new_params = proxy.request_parameters(&parameters).await?;
203            let new_timeout = new_params.flush_timeout.map(zx::MonotonicDuration::from_nanos);
204            *(flush_timeout.lock().unwrap()) = new_timeout.clone();
205            Ok(new_timeout)
206        }
207    }
208
209    pub fn closed<'a>(&'a self) -> impl Future<Output = Result<(), zx::Status>> + 'a {
210        let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
211        let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
212        close_wait.map_ok(|_o| ())
213    }
214
215    pub fn is_closed<'a>(&'a self) -> bool {
216        self.socket.is_closed()
217    }
218
219    pub fn poll_datagram(
220        &self,
221        cx: &mut Context<'_>,
222        out: &mut Vec<u8>,
223    ) -> Poll<Result<usize, zx::Status>> {
224        self.socket.poll_datagram(cx, out)
225    }
226
227    /// Read from the channel, allocating a Vec for the packet.
228    /// This will return zx::Status::SHOULD_WAIT if there is no data waiting to read.
229    pub fn read_packet(&self) -> Result<Vec<u8>, zx::Status> {
230        let bytes_waiting = self.socket.as_ref().outstanding_read_bytes()?;
231        if bytes_waiting == 0 {
232            return Err(zx::Status::SHOULD_WAIT);
233        }
234        let mut packet = vec![0; bytes_waiting];
235        let _ = self.read(&mut packet[..])?;
236        Ok(packet)
237    }
238
239    /// Read from the channel. This will return zx::Status::SHOULD_WAIT if there is no data
240    /// available.  If `buf` is not large enough, the rest of the packet will be thrown out.
241    pub fn read(&self, buf: &mut [u8]) -> Result<usize, zx::Status> {
242        self.socket.as_ref().read(buf)
243    }
244
245    /// Write to the channel.  This will return zx::Status::SHOULD_WAIT if the
246    /// the channel is too full.  Use the poll_write for asynchronous writing.
247    pub fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
248        self.socket.as_ref().write(bytes)
249    }
250}
251
252impl TryFrom<fidl_fuchsia_bluetooth_bredr::Channel> for Channel {
253    type Error = zx::Status;
254
255    fn try_from(fidl: bredr::Channel) -> Result<Self, Self::Error> {
256        let channel = match fidl.channel_mode.unwrap_or(fidl_bt::ChannelMode::Basic).try_into() {
257            Err(e) => {
258                warn!("Unsupported channel mode type: {e:?}");
259                return Err(zx::Status::INTERNAL);
260            }
261            Ok(c) => c,
262        };
263
264        Ok(Self {
265            socket: fasync::Socket::from_socket(fidl.socket.ok_or(zx::Status::INVALID_ARGS)?),
266            mode: channel,
267            max_tx_size: fidl.max_tx_sdu_size.ok_or(zx::Status::INVALID_ARGS)? as usize,
268            flush_timeout: Arc::new(Mutex::new(
269                fidl.flush_timeout.map(zx::MonotonicDuration::from_nanos),
270            )),
271            audio_direction_ext: fidl.ext_direction.map(|e| e.into_proxy()),
272            l2cap_parameters_ext: fidl.ext_l2cap.map(|e| e.into_proxy()),
273            terminated: false,
274        })
275    }
276}
277
278impl TryFrom<Channel> for bredr::Channel {
279    type Error = Error;
280
281    fn try_from(channel: Channel) -> Result<Self, Self::Error> {
282        let socket = channel.socket.into_zx_socket();
283        let ext_direction = channel
284            .audio_direction_ext
285            .map(|proxy| {
286                let chan = proxy.into_channel()?;
287                Ok(ClientEnd::new(chan.into()))
288            })
289            .transpose()
290            .map_err(|_: bredr::AudioDirectionExtProxy| {
291                Error::profile("AudioDirection proxy in use")
292            })?;
293        let ext_l2cap = channel
294            .l2cap_parameters_ext
295            .map(|proxy| {
296                let chan = proxy.into_channel()?;
297                Ok(ClientEnd::new(chan.into()))
298            })
299            .transpose()
300            .map_err(|_: bredr::L2capParametersExtProxy| {
301                Error::profile("l2cap parameters proxy in use")
302            })?;
303        let flush_timeout =
304            channel.flush_timeout.lock().unwrap().map(zx::MonotonicDuration::into_nanos);
305        Ok(bredr::Channel {
306            socket: Some(socket),
307            channel_mode: Some(channel.mode.into()),
308            max_tx_sdu_size: Some(channel.max_tx_size as u16),
309            ext_direction,
310            flush_timeout,
311            ext_l2cap,
312            ..Default::default()
313        })
314    }
315}
316
317impl Stream for Channel {
318    type Item = Result<Vec<u8>, zx::Status>;
319
320    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
321        if self.terminated {
322            panic!("Channel polled after terminated");
323        }
324
325        let mut res = Vec::<u8>::new();
326        loop {
327            break match self.socket.poll_datagram(cx, &mut res) {
328                // TODO(https://fxbug.dev/42072274): Sometimes sockets return spirious 0 byte packets when polled.
329                // Try again.
330                Poll::Ready(Ok(0)) => continue,
331                Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
332                Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
333                    self.terminated = true;
334                    Poll::Ready(None)
335                }
336                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
337                Poll::Pending => Poll::Pending,
338            };
339        }
340    }
341}
342
343impl FusedStream for Channel {
344    fn is_terminated(&self) -> bool {
345        self.terminated
346    }
347}
348
349impl io::AsyncRead for Channel {
350    fn poll_read(
351        mut self: Pin<&mut Self>,
352        cx: &mut Context<'_>,
353        buf: &mut [u8],
354    ) -> Poll<Result<usize, futures::io::Error>> {
355        Pin::new(&mut self.socket).as_mut().poll_read(cx, buf)
356    }
357}
358
359impl io::AsyncWrite for Channel {
360    fn poll_write(
361        mut self: Pin<&mut Self>,
362        cx: &mut Context<'_>,
363        buf: &[u8],
364    ) -> Poll<Result<usize, io::Error>> {
365        Pin::new(&mut self.socket).as_mut().poll_write(cx, buf)
366    }
367
368    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
369        Pin::new(&mut self.socket).as_mut().poll_flush(cx)
370    }
371
372    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
373        Pin::new(&mut self.socket).as_mut().poll_close(cx)
374    }
375}
376
377#[cfg(test)]
378mod tests {
379    use super::*;
380    use fidl::endpoints::create_request_stream;
381    use futures::{AsyncReadExt, FutureExt, StreamExt};
382    use std::pin::pin;
383
384    #[test]
385    fn test_channel_create_and_write() {
386        let _exec = fasync::TestExecutor::new();
387        let (mut recv, send) = Channel::create();
388
389        let mut buf: [u8; 8] = [0; 8];
390        let read_fut = AsyncReadExt::read(&mut recv, &mut buf);
391
392        let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
393        assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));
394
395        match read_fut.now_or_never() {
396            Some(Ok(4)) => {}
397            x => panic!("Expected Ok(4) from the read, got {x:?}"),
398        };
399        assert_eq!(heart, &buf[0..4]);
400    }
401
402    #[test]
403    fn test_channel_from_fidl() {
404        let _exec = fasync::TestExecutor::new();
405        let empty = bredr::Channel::default();
406        assert!(Channel::try_from(empty).is_err());
407
408        let (remote, _local) = zx::Socket::create_datagram();
409
410        let okay = bredr::Channel {
411            socket: Some(remote),
412            channel_mode: Some(fidl_bt::ChannelMode::Basic),
413            max_tx_sdu_size: Some(1004),
414            ..Default::default()
415        };
416
417        let chan = Channel::try_from(okay).expect("okay channel to be converted");
418
419        assert_eq!(1004, chan.max_tx_size());
420        assert_eq!(&ChannelMode::Basic, chan.channel_mode());
421    }
422
423    #[test]
424    fn test_channel_closed() {
425        let mut exec = fasync::TestExecutor::new();
426
427        let (recv, send) = Channel::create();
428
429        let closed_fut = recv.closed();
430        let mut closed_fut = pin!(closed_fut);
431
432        assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
433        assert!(!recv.is_closed());
434
435        drop(send);
436
437        assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
438        assert!(recv.is_closed());
439    }
440
441    #[test]
442    fn test_direction_ext() {
443        let mut exec = fasync::TestExecutor::new();
444
445        let (remote, _local) = zx::Socket::create_datagram();
446        let no_ext = bredr::Channel {
447            socket: Some(remote),
448            channel_mode: Some(fidl_bt::ChannelMode::Basic),
449            max_tx_sdu_size: Some(1004),
450            ..Default::default()
451        };
452        let channel = Channel::try_from(no_ext).unwrap();
453
454        assert!(exec
455            .run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal))
456            .is_err());
457        assert!(exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err());
458
459        let (remote, _local) = zx::Socket::create_datagram();
460        let (client_end, mut direction_request_stream) =
461            create_request_stream::<bredr::AudioDirectionExtMarker>();
462        let ext = bredr::Channel {
463            socket: Some(remote),
464            channel_mode: Some(fidl_bt::ChannelMode::Basic),
465            max_tx_sdu_size: Some(1004),
466            ext_direction: Some(client_end),
467            ..Default::default()
468        };
469
470        let channel = Channel::try_from(ext).unwrap();
471
472        let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Normal);
473        let mut audio_direction_fut = pin!(audio_direction_fut);
474
475        assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
476
477        match exec.run_until_stalled(&mut direction_request_stream.next()) {
478            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
479                priority,
480                responder,
481            }))) => {
482                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
483                responder.send(Ok(())).expect("response to send cleanly");
484            }
485            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
486        };
487
488        match exec.run_until_stalled(&mut audio_direction_fut) {
489            Poll::Ready(Ok(())) => {}
490            _x => panic!("Expected ok result from audio direction response"),
491        };
492
493        let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Sink);
494        let mut audio_direction_fut = pin!(audio_direction_fut);
495
496        assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
497
498        match exec.run_until_stalled(&mut direction_request_stream.next()) {
499            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
500                priority,
501                responder,
502            }))) => {
503                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
504                responder
505                    .send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed))
506                    .expect("response to send cleanly");
507            }
508            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
509        };
510
511        match exec.run_until_stalled(&mut audio_direction_fut) {
512            Poll::Ready(Err(_)) => {}
513            _x => panic!("Expected error result from audio direction response"),
514        };
515    }
516
517    #[test]
518    fn test_flush_timeout() {
519        let mut exec = fasync::TestExecutor::new();
520
521        let (remote, _local) = zx::Socket::create_datagram();
522        let no_ext = bredr::Channel {
523            socket: Some(remote),
524            channel_mode: Some(fidl_bt::ChannelMode::Basic),
525            max_tx_sdu_size: Some(1004),
526            flush_timeout: Some(50_000_000), // 50 milliseconds
527            ..Default::default()
528        };
529        let channel = Channel::try_from(no_ext).unwrap();
530
531        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
532
533        // Within 2 milliseconds, doesn't change.
534        let res = exec.run_singlethreaded(
535            channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(49))),
536        );
537        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
538        let res = exec.run_singlethreaded(
539            channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(51))),
540        );
541        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
542
543        assert!(exec
544            .run_singlethreaded(
545                channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(200)))
546            )
547            .is_err());
548        assert!(exec.run_singlethreaded(channel.set_flush_timeout(None)).is_err());
549
550        let (remote, _local) = zx::Socket::create_datagram();
551        let (client_end, mut l2cap_request_stream) =
552            create_request_stream::<bredr::L2capParametersExtMarker>();
553        let ext = bredr::Channel {
554            socket: Some(remote),
555            channel_mode: Some(fidl_bt::ChannelMode::Basic),
556            max_tx_sdu_size: Some(1004),
557            flush_timeout: None,
558            ext_l2cap: Some(client_end),
559            ..Default::default()
560        };
561
562        let channel = Channel::try_from(ext).unwrap();
563
564        {
565            let flush_timeout_fut = channel.set_flush_timeout(None);
566            let mut flush_timeout_fut = pin!(flush_timeout_fut);
567
568            // Requesting no change returns right away with no change.
569            match exec.run_until_stalled(&mut flush_timeout_fut) {
570                Poll::Ready(Ok(None)) => {}
571                x => panic!("Expected no flush timeout to not stall, got {:?}", x),
572            }
573        }
574
575        let req_duration = zx::MonotonicDuration::from_millis(42);
576
577        {
578            let flush_timeout_fut = channel.set_flush_timeout(Some(req_duration));
579            let mut flush_timeout_fut = pin!(flush_timeout_fut);
580
581            assert!(exec.run_until_stalled(&mut flush_timeout_fut).is_pending());
582
583            match exec.run_until_stalled(&mut l2cap_request_stream.next()) {
584                Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
585                    request,
586                    responder,
587                }))) => {
588                    assert_eq!(Some(req_duration.into_nanos()), request.flush_timeout);
589                    // Send a different response
590                    let params = fidl_bt::ChannelParameters {
591                        flush_timeout: Some(50_000_000), // 50ms
592                        ..Default::default()
593                    };
594                    responder.send(&params).expect("response to send cleanly");
595                }
596                x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
597            };
598
599            match exec.run_until_stalled(&mut flush_timeout_fut) {
600                Poll::Ready(Ok(Some(duration))) => {
601                    assert_eq!(zx::MonotonicDuration::from_millis(50), duration)
602                }
603                x => panic!("Expected ready result from params response, got {:?}", x),
604            };
605        }
606
607        // Channel should have recorded the new flush timeout.
608        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
609    }
610}