Skip to main content

fuchsia_bluetooth/types/
channel.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::endpoints::{ClientEnd, Proxy};
6use fuchsia_sync::Mutex;
7use futures::stream::{FusedStream, Stream};
8use futures::{Future, TryFutureExt, io};
9use log::warn;
10use std::fmt;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use {
15    fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
16    fuchsia_async as fasync,
17};
18
19use crate::error::Error;
20
/// The Channel mode in use for a L2CAP channel.
///
/// Values convert to and from `fidl_bt::ChannelMode`. The enum carries no data, so it is
/// `Copy` and can be compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelMode {
    /// Basic mode. Displayed as `Basic`.
    Basic,
    /// Enhanced Retransmission Mode. Displayed as `ERTM`.
    EnhancedRetransmissionMode,
    /// LE credit based flow control. Displayed as `LE_Credit`.
    LeCreditBasedFlowControl,
    /// Enhanced credit based flow control. Displayed as `Credit`.
    EnhancedCreditBasedFlowControl,
}
29
30impl fmt::Display for ChannelMode {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            ChannelMode::Basic => write!(f, "Basic"),
34            ChannelMode::EnhancedRetransmissionMode => write!(f, "ERTM"),
35            ChannelMode::LeCreditBasedFlowControl => write!(f, "LE_Credit"),
36            ChannelMode::EnhancedCreditBasedFlowControl => write!(f, "Credit"),
37        }
38    }
39}
40
/// The audio direction priority that can be requested for a channel through
/// `Channel::set_audio_priority`.
///
/// Each variant converts to the identically named `bredr::A2dpDirectionPriority` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum A2dpDirection {
    /// Converts to `bredr::A2dpDirectionPriority::Normal`.
    Normal,
    /// Converts to `bredr::A2dpDirectionPriority::Source`.
    Source,
    /// Converts to `bredr::A2dpDirectionPriority::Sink`.
    Sink,
}
46
47impl From<A2dpDirection> for bredr::A2dpDirectionPriority {
48    fn from(pri: A2dpDirection) -> Self {
49        match pri {
50            A2dpDirection::Normal => bredr::A2dpDirectionPriority::Normal,
51            A2dpDirection::Source => bredr::A2dpDirectionPriority::Source,
52            A2dpDirection::Sink => bredr::A2dpDirectionPriority::Sink,
53        }
54    }
55}
56
57impl TryFrom<fidl_bt::ChannelMode> for ChannelMode {
58    type Error = Error;
59    fn try_from(fidl: fidl_bt::ChannelMode) -> Result<Self, Error> {
60        match fidl {
61            fidl_bt::ChannelMode::Basic => Ok(ChannelMode::Basic),
62            fidl_bt::ChannelMode::EnhancedRetransmission => {
63                Ok(ChannelMode::EnhancedRetransmissionMode)
64            }
65            fidl_bt::ChannelMode::LeCreditBasedFlowControl => {
66                Ok(ChannelMode::LeCreditBasedFlowControl)
67            }
68            fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl => {
69                Ok(ChannelMode::EnhancedCreditBasedFlowControl)
70            }
71            x => Err(Error::FailedConversion(format!("Unsupported channel mode type: {x:?}"))),
72        }
73    }
74}
75
76impl From<ChannelMode> for fidl_bt::ChannelMode {
77    fn from(x: ChannelMode) -> Self {
78        match x {
79            ChannelMode::Basic => fidl_bt::ChannelMode::Basic,
80            ChannelMode::EnhancedRetransmissionMode => fidl_bt::ChannelMode::EnhancedRetransmission,
81            ChannelMode::LeCreditBasedFlowControl => fidl_bt::ChannelMode::LeCreditBasedFlowControl,
82            ChannelMode::EnhancedCreditBasedFlowControl => {
83                fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl
84            }
85        }
86    }
87}
88
/// A data channel to a remote Peer. Channels are the primary data transfer mechanism for
/// Bluetooth profiles and protocols.
/// Channel wraps an async socket and implements `Stream` (yielding received datagrams),
/// `FusedStream`, `AsyncRead` and `AsyncWrite`; the read and write implementations forward to
/// the underlying socket.
#[derive(Debug)]
pub struct Channel {
    /// The async socket that carries this channel's data.
    socket: fasync::Socket,
    /// The L2CAP channel mode reported by `channel_mode()`.
    mode: ChannelMode,
    /// The maximum transmittable size of a packet, in bytes.
    max_tx_size: usize,
    /// The last known flush timeout, if any. Shared behind `Arc<Mutex<_>>` so that the future
    /// returned by `set_flush_timeout` can record the new value without borrowing the channel.
    flush_timeout: Arc<Mutex<Option<zx::MonotonicDuration>>>,
    /// Proxy used by `set_audio_priority`; `None` when the extension was not provided.
    audio_direction_ext: Option<bredr::AudioDirectionExtProxy>,
    /// Proxy used by `set_flush_timeout`; `None` when the extension was not provided.
    l2cap_parameters_ext: Option<bredr::L2capParametersExtProxy>,
    /// Proxy handed out by `audio_offload`; `None` when the extension was not provided.
    audio_offload_ext: Option<bredr::AudioOffloadExtProxy>,
    /// Set once the `Stream` implementation has observed `PEER_CLOSED` and yielded `None`.
    terminated: bool,
}
104
105impl Channel {
106    /// Attempt to make a Channel from a zircon socket and a Maximum TX size received out of band.
107    /// Returns Err(status) if there is an error.
108    pub fn from_socket(socket: zx::Socket, max_tx_size: usize) -> Result<Self, zx::Status> {
109        Ok(Self::from_socket_infallible(socket, max_tx_size))
110    }
111
112    /// Make a Channel from a zircon socket and a Maximum TX size received out of band.
113    pub fn from_socket_infallible(socket: zx::Socket, max_tx_size: usize) -> Self {
114        Channel {
115            socket: fasync::Socket::from_socket(socket),
116            mode: ChannelMode::Basic,
117            max_tx_size,
118            flush_timeout: Arc::new(Mutex::new(None)),
119            audio_direction_ext: None,
120            l2cap_parameters_ext: None,
121            audio_offload_ext: None,
122            terminated: false,
123        }
124    }
125
    /// The default max tx size is the default MTU size for L2CAP minus the channel header content.
    /// See the Bluetooth Core Specification, Vol 3, Part A, Sec 5.1
    /// This is the value `Channel::create` uses for both ends of the pair it makes.
    pub const DEFAULT_MAX_TX: usize = 672;
129
    /// Makes a pair of channels which are connected to each other, used commonly for testing.
    /// The max_tx_size is set to `Channel::DEFAULT_MAX_TX`.
    /// This is shorthand for `Channel::create_with_max_tx(Channel::DEFAULT_MAX_TX)`.
    pub fn create() -> (Self, Self) {
        Self::create_with_max_tx(Self::DEFAULT_MAX_TX)
    }
135
136    /// Make a pair of channels which are connected to each other, used commonly for testing.
137    /// The maximum transmittable unit is taken from `max_tx_size`.
138    pub fn create_with_max_tx(max_tx_size: usize) -> (Self, Self) {
139        let (remote, local) = zx::Socket::create_datagram();
140        (
141            Channel::from_socket(remote, max_tx_size).unwrap(),
142            Channel::from_socket(local, max_tx_size).unwrap(),
143        )
144    }
145
    /// The maximum transmittable size of a packet, in bytes.
    /// Trying to send packets larger than this may cause the channel to be closed.
    /// This is the value the channel was constructed with; it is not changed afterwards.
    pub fn max_tx_size(&self) -> usize {
        self.max_tx_size
    }
151
    /// The L2CAP channel mode this channel was constructed with.
    /// Channels made from a bare socket report `ChannelMode::Basic`.
    pub fn channel_mode(&self) -> &ChannelMode {
        &self.mode
    }
155
156    pub fn flush_timeout(&self) -> Option<zx::MonotonicDuration> {
157        self.flush_timeout.lock().clone()
158    }
159
160    /// Returns a future which will set the audio priority of the channel.
161    /// The future will return Err if setting the priority is not supported.
162    pub fn set_audio_priority(
163        &self,
164        dir: A2dpDirection,
165    ) -> impl Future<Output = Result<(), Error>> + use<> {
166        let proxy = self.audio_direction_ext.clone();
167        async move {
168            match proxy {
169                None => return Err(Error::profile("audio priority not supported")),
170                Some(proxy) => proxy
171                    .set_priority(dir.into())
172                    .await?
173                    .map_err(|e| Error::profile(format!("setting priority failed: {e:?}"))),
174            }
175        }
176    }
177
178    /// Attempt to set the flush timeout for this channel.
179    /// If the timeout is not already set within 1ms of `duration`, we attempt to set it using the
180    /// L2cap parameter extension.
181    /// `duration` can be infinite to set packets flushable without a timeout.
182    /// Returns a future that when polled will set the flush timeout and return the new timeout,
183    /// or return an error setting the parameter is not supported.
184    pub fn set_flush_timeout(
185        &self,
186        duration: Option<zx::MonotonicDuration>,
187    ) -> impl Future<Output = Result<Option<zx::MonotonicDuration>, Error>> + use<> {
188        let flush_timeout = self.flush_timeout.clone();
189        let current = self.flush_timeout.lock().clone();
190        let proxy = self.l2cap_parameters_ext.clone();
191        async move {
192            match (current, duration) {
193                (None, None) => return Ok(None),
194                (Some(old), Some(new)) if (old - new).into_millis().abs() < 2 => {
195                    return Ok(current);
196                }
197                _ => {}
198            };
199            let proxy =
200                proxy.ok_or_else(|| Error::profile("l2cap parameter changing not supported"))?;
201            let parameters = fidl_bt::ChannelParameters {
202                flush_timeout: duration.clone().map(zx::MonotonicDuration::into_nanos),
203                ..Default::default()
204            };
205            let new_params = proxy.request_parameters(&parameters).await?;
206            let new_timeout = new_params.flush_timeout.map(zx::MonotonicDuration::from_nanos);
207            *(flush_timeout.lock()) = new_timeout.clone();
208            Ok(new_timeout)
209        }
210    }
211
    /// Get a copy of the Audio Offload Proxy for this channel, if it exists.
    /// Each call clones the stored proxy, so it can be called repeatedly; returns `None` when
    /// the channel was built without the audio offload extension.
    pub fn audio_offload(&self) -> Option<bredr::AudioOffloadExtProxy> {
        self.audio_offload_ext.clone()
    }
216
217    pub fn closed<'a>(&'a self) -> impl Future<Output = Result<(), zx::Status>> + 'a {
218        let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
219        let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
220        close_wait.map_ok(|_o| ())
221    }
222
223    pub fn is_closed<'a>(&'a self) -> bool {
224        self.socket.is_closed()
225    }
226
    /// Polls the underlying socket for a datagram, forwarding directly to the socket's
    /// `poll_datagram`. The received bytes are placed in `out`.
    /// NOTE(review): presumably the bytes are appended to `out` and the returned `usize` is the
    /// datagram length — confirm against the `fasync::Socket` documentation.
    pub fn poll_datagram(
        &self,
        cx: &mut Context<'_>,
        out: &mut Vec<u8>,
    ) -> Poll<Result<usize, zx::Status>> {
        self.socket.poll_datagram(cx, out)
    }
234
    /// Write to the channel without waiting. This will return zx::Status::SHOULD_WAIT if
    /// the channel is too full. Use `poll_write` (from the `AsyncWrite` implementation) for
    /// asynchronous writing.
    /// On success, returns the count reported by the underlying socket write.
    pub fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
        self.socket.as_ref().write(bytes)
    }
240}
241
242impl TryFrom<fidl_fuchsia_bluetooth_bredr::Channel> for Channel {
243    type Error = zx::Status;
244
245    fn try_from(fidl: bredr::Channel) -> Result<Self, Self::Error> {
246        let channel = match fidl.channel_mode.unwrap_or(fidl_bt::ChannelMode::Basic).try_into() {
247            Err(e) => {
248                warn!("Unsupported channel mode type: {e:?}");
249                return Err(zx::Status::INTERNAL);
250            }
251            Ok(c) => c,
252        };
253
254        Ok(Self {
255            socket: fasync::Socket::from_socket(fidl.socket.ok_or(zx::Status::INVALID_ARGS)?),
256            mode: channel,
257            max_tx_size: fidl.max_tx_sdu_size.ok_or(zx::Status::INVALID_ARGS)? as usize,
258            flush_timeout: Arc::new(Mutex::new(
259                fidl.flush_timeout.map(zx::MonotonicDuration::from_nanos),
260            )),
261            audio_direction_ext: fidl.ext_direction.map(|e| e.into_proxy()),
262            l2cap_parameters_ext: fidl.ext_l2cap.map(|e| e.into_proxy()),
263            audio_offload_ext: fidl.ext_audio_offload.map(|c| c.into_proxy()),
264            terminated: false,
265        })
266    }
267}
268
269impl TryFrom<Channel> for bredr::Channel {
270    type Error = Error;
271
272    fn try_from(channel: Channel) -> Result<Self, Self::Error> {
273        let socket = channel.socket.into_zx_socket();
274        let ext_direction = channel
275            .audio_direction_ext
276            .map(|proxy| {
277                let chan = proxy.into_channel()?;
278                Ok(ClientEnd::new(chan.into()))
279            })
280            .transpose()
281            .map_err(|_: bredr::AudioDirectionExtProxy| {
282                Error::profile("AudioDirection proxy in use")
283            })?;
284        let ext_l2cap = channel
285            .l2cap_parameters_ext
286            .map(|proxy| {
287                let chan = proxy.into_channel()?;
288                Ok(ClientEnd::new(chan.into()))
289            })
290            .transpose()
291            .map_err(|_: bredr::L2capParametersExtProxy| {
292                Error::profile("l2cap parameters proxy in use")
293            })?;
294        let ext_audio_offload = channel
295            .audio_offload_ext
296            .map(|proxy| {
297                let chan = proxy.into_channel()?;
298                Ok(ClientEnd::new(chan.into()))
299            })
300            .transpose()
301            .map_err(|_: bredr::AudioOffloadExtProxy| {
302                Error::profile("audio offload proxy in use")
303            })?;
304        let flush_timeout = channel.flush_timeout.lock().map(zx::MonotonicDuration::into_nanos);
305        Ok(bredr::Channel {
306            socket: Some(socket),
307            channel_mode: Some(channel.mode.into()),
308            max_tx_sdu_size: Some(channel.max_tx_size as u16),
309            ext_direction,
310            flush_timeout,
311            ext_l2cap,
312            ext_audio_offload,
313            ..Default::default()
314        })
315    }
316}
317
318impl Stream for Channel {
319    type Item = Result<Vec<u8>, zx::Status>;
320
321    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
322        if self.terminated {
323            panic!("Channel polled after terminated");
324        }
325
326        let mut res = Vec::<u8>::new();
327        loop {
328            break match self.socket.poll_datagram(cx, &mut res) {
329                // TODO(https://fxbug.dev/42072274): Sometimes sockets return spirious 0 byte packets when polled.
330                // Try again.
331                Poll::Ready(Ok(0)) => continue,
332                Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
333                Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
334                    self.terminated = true;
335                    Poll::Ready(None)
336                }
337                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
338                Poll::Pending => Poll::Pending,
339            };
340        }
341    }
342}
343
impl FusedStream for Channel {
    /// Returns true once the `Stream` implementation has seen `PEER_CLOSED` and yielded `None`.
    /// Polling the stream after this returns true panics.
    fn is_terminated(&self) -> bool {
        self.terminated
    }
}
349
350impl io::AsyncRead for Channel {
351    fn poll_read(
352        mut self: Pin<&mut Self>,
353        cx: &mut Context<'_>,
354        buf: &mut [u8],
355    ) -> Poll<Result<usize, futures::io::Error>> {
356        Pin::new(&mut self.socket).as_mut().poll_read(cx, buf)
357    }
358}
359
360impl io::AsyncWrite for Channel {
361    fn poll_write(
362        mut self: Pin<&mut Self>,
363        cx: &mut Context<'_>,
364        buf: &[u8],
365    ) -> Poll<Result<usize, io::Error>> {
366        Pin::new(&mut self.socket).as_mut().poll_write(cx, buf)
367    }
368
369    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
370        Pin::new(&mut self.socket).as_mut().poll_flush(cx)
371    }
372
373    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
374        Pin::new(&mut self.socket).as_mut().poll_close(cx)
375    }
376}
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381    use fidl::endpoints::create_request_stream;
382    use futures::{AsyncReadExt, FutureExt, StreamExt};
383    use std::pin::pin;
384
    // Bytes written synchronously on one end of a created pair can be read through the
    // `AsyncRead` implementation on the other end.
    #[test]
    fn test_channel_create_and_write() {
        // The executor is never run directly, but must exist while the channels are created.
        let _exec = fasync::TestExecutor::new();
        let (mut recv, send) = Channel::create();

        let mut buf: [u8; 8] = [0; 8];
        let read_fut = AsyncReadExt::read(&mut recv, &mut buf);

        let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
        assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));

        // The data is already in the socket, so a single poll of the read must complete.
        match read_fut.now_or_never() {
            Some(Ok(4)) => {}
            x => panic!("Expected Ok(4) from the read, got {x:?}"),
        };
        assert_eq!(heart, &buf[0..4]);
    }
402
    // Converting from the FIDL table fails when required fields are missing and carries the
    // mode and max TX size over when they are present.
    #[test]
    fn test_channel_from_fidl() {
        let _exec = fasync::TestExecutor::new();
        // A default table has no socket and no max TX size, so it must be rejected.
        let empty = bredr::Channel::default();
        assert!(Channel::try_from(empty).is_err());

        let (remote, _local) = zx::Socket::create_datagram();

        let okay = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ..Default::default()
        };

        let chan = Channel::try_from(okay).expect("okay channel to be converted");

        assert_eq!(1004, chan.max_tx_size());
        assert_eq!(&ChannelMode::Basic, chan.channel_mode());
    }
423
    // `closed()` stays pending while the peer is alive and resolves once the peer is dropped;
    // `is_closed()` reports the same transition.
    #[test]
    fn test_channel_closed() {
        let mut exec = fasync::TestExecutor::new();

        let (recv, send) = Channel::create();

        let closed_fut = recv.closed();
        let mut closed_fut = pin!(closed_fut);

        assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
        assert!(!recv.is_closed());

        // Dropping the other end closes the peer side of the socket.
        drop(send);

        assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
        assert!(recv.is_closed());
    }
441
    // `set_audio_priority` fails without the audio direction extension, and with it forwards
    // the priority to the extension and reports the peer's success or error response.
    #[test]
    fn test_direction_ext() {
        let mut exec = fasync::TestExecutor::new();

        // First, a channel without the extension: every request must fail.
        let (remote, _local) = zx::Socket::create_datagram();
        let no_ext = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ..Default::default()
        };
        let channel = Channel::try_from(no_ext).unwrap();

        assert!(
            exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal)).is_err()
        );
        assert!(exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err());

        // Second, a channel with the extension, whose server side is driven by this test.
        let (remote, _local) = zx::Socket::create_datagram();
        let (client_end, mut direction_request_stream) =
            create_request_stream::<bredr::AudioDirectionExtMarker>();
        let ext = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ext_direction: Some(client_end),
            ..Default::default()
        };

        let channel = Channel::try_from(ext).unwrap();

        let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Normal);
        let mut audio_direction_fut = pin!(audio_direction_fut);

        // The request is sent on the first poll, then the future waits for the response.
        assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());

        match exec.run_until_stalled(&mut direction_request_stream.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
                responder.send(Ok(())).expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };

        match exec.run_until_stalled(&mut audio_direction_fut) {
            Poll::Ready(Ok(())) => {}
            _x => panic!("Expected ok result from audio direction response"),
        };

        // Same exchange again, but this time the server answers with an error.
        let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Sink);
        let mut audio_direction_fut = pin!(audio_direction_fut);

        assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());

        match exec.run_until_stalled(&mut direction_request_stream.next()) {
            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
                priority,
                responder,
            }))) => {
                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
                responder
                    .send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed))
                    .expect("response to send cleanly");
            }
            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
        };

        match exec.run_until_stalled(&mut audio_direction_fut) {
            Poll::Ready(Err(_)) => {}
            _x => panic!("Expected error result from audio direction response"),
        };
    }
517
    // Covers `set_flush_timeout`: the no-change shortcuts, the error when the L2CAP parameters
    // extension is missing, and recording the timeout the extension reports back.
    #[test]
    fn test_flush_timeout() {
        let mut exec = fasync::TestExecutor::new();

        // First, a channel that has a timeout but no extension to change it with.
        let (remote, _local) = zx::Socket::create_datagram();
        let no_ext = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            flush_timeout: Some(50_000_000), // 50 milliseconds
            ..Default::default()
        };
        let channel = Channel::try_from(no_ext).unwrap();

        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());

        // Within 2 milliseconds, doesn't change.
        let res = exec.run_singlethreaded(
            channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(49))),
        );
        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
        let res = exec.run_singlethreaded(
            channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(51))),
        );
        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));

        // A real change needs the extension, which this channel does not have.
        assert!(
            exec.run_singlethreaded(
                channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(200)))
            )
            .is_err()
        );
        assert!(exec.run_singlethreaded(channel.set_flush_timeout(None)).is_err());

        // Second, a channel with no timeout and an extension driven by this test.
        let (remote, _local) = zx::Socket::create_datagram();
        let (client_end, mut l2cap_request_stream) =
            create_request_stream::<bredr::L2capParametersExtMarker>();
        let ext = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            flush_timeout: None,
            ext_l2cap: Some(client_end),
            ..Default::default()
        };

        let channel = Channel::try_from(ext).unwrap();

        {
            let flush_timeout_fut = channel.set_flush_timeout(None);
            let mut flush_timeout_fut = pin!(flush_timeout_fut);

            // Requesting no change returns right away with no change.
            match exec.run_until_stalled(&mut flush_timeout_fut) {
                Poll::Ready(Ok(None)) => {}
                x => panic!("Expected no flush timeout to not stall, got {:?}", x),
            }
        }

        let req_duration = zx::MonotonicDuration::from_millis(42);

        {
            let flush_timeout_fut = channel.set_flush_timeout(Some(req_duration));
            let mut flush_timeout_fut = pin!(flush_timeout_fut);

            // The request goes out on the first poll, then the future waits for the reply.
            assert!(exec.run_until_stalled(&mut flush_timeout_fut).is_pending());

            match exec.run_until_stalled(&mut l2cap_request_stream.next()) {
                Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
                    request,
                    responder,
                }))) => {
                    assert_eq!(Some(req_duration.into_nanos()), request.flush_timeout);
                    // Send a different response
                    let params = fidl_bt::ChannelParameters {
                        flush_timeout: Some(50_000_000), // 50ms
                        ..Default::default()
                    };
                    responder.send(&params).expect("response to send cleanly");
                }
                x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
            };

            // The future reports the value from the response, not the value requested.
            match exec.run_until_stalled(&mut flush_timeout_fut) {
                Poll::Ready(Ok(Some(duration))) => {
                    assert_eq!(zx::MonotonicDuration::from_millis(50), duration)
                }
                x => panic!("Expected ready result from params response, got {:?}", x),
            };
        }

        // Channel should have recorded the new flush timeout.
        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
    }
612
    // `audio_offload()` returns `None` without the extension and a fresh proxy clone on every
    // call when the extension is present.
    #[test]
    fn test_audio_offload() {
        let _exec = fasync::TestExecutor::new();

        let (remote, _local) = zx::Socket::create_datagram();
        let no_ext = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ..Default::default()
        };
        let channel = Channel::try_from(no_ext).unwrap();

        assert!(channel.audio_offload().is_none());

        let (remote, _local) = zx::Socket::create_datagram();
        // The request stream is kept alive but never served; only the proxy's presence matters.
        let (client_end, mut _audio_offload_ext_req_stream) =
            create_request_stream::<bredr::AudioOffloadExtMarker>();
        let ext = bredr::Channel {
            socket: Some(remote),
            channel_mode: Some(fidl_bt::ChannelMode::Basic),
            max_tx_sdu_size: Some(1004),
            ext_audio_offload: Some(client_end),
            ..Default::default()
        };

        let channel = Channel::try_from(ext).unwrap();

        let offload_ext = channel.audio_offload();
        assert!(offload_ext.is_some());
        // We can get the audio offload multiple times without dropping
        assert!(channel.audio_offload().is_some());
        // And with dropping
        drop(offload_ext);
        assert!(channel.audio_offload().is_some());
    }
649
    // Exercises the `AsyncRead` implementation with a datagram that fits the read buffer and
    // with one that is larger than the read buffer.
    #[test]
    fn channel_async_read() {
        let mut exec = fasync::TestExecutor::new();
        let (mut recv, send) = Channel::create();

        // Test `read` with a datagram smaller than the read buffer.
        let max_tx_size = recv.max_tx_size();
        let mut read_buf = vec![0; max_tx_size];
        let mut read_fut = recv.read(&mut read_buf[..]);

        // Nothing has been written yet, so the read cannot complete.
        assert!(exec.run_until_stalled(&mut read_fut).is_pending());

        let data = &[0x01, 0x02, 0x03, 0x04];
        assert_eq!(data.len(), send.write(data).expect("should write successfully"));

        // The read should complete, with the length of the datagram.
        let read_len = match exec.run_until_stalled(&mut read_fut) {
            Poll::Ready(Ok(read_len)) => read_len,
            x => panic!("Expected successful read, got {x:?}"),
        };
        assert_eq!(read_len, data.len());
        assert_eq!(&data[..], &read_buf[..data.len()]);

        // Test `read` with a datagram that is larger than the read buffer.
        let mut read_buf = [0; 4]; // buffer too small
        let mut read_fut = recv.read(&mut read_buf);

        let oversized_data = &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
        assert_eq!(
            oversized_data.len(),
            send.write(oversized_data).expect("should write successfully")
        );

        // The read should complete, filling the buffer.
        let read_len = match exec.run_until_stalled(&mut read_fut) {
            Poll::Ready(Ok(read_len)) => read_len,
            x => panic!("Expected successful read, got {x:?}"),
        };
        assert_eq!(read_len, read_buf.len());
        assert_eq!(&oversized_data[..read_buf.len()], &read_buf[..]);

        // The rest of the datagram should be discarded. A subsequent read should be pending.
        let mut leftover_buf = [0; 1];
        let mut leftover_fut = recv.read(&mut leftover_buf);
        assert!(exec.run_until_stalled(&mut leftover_fut).is_pending());
    }
696
    // Exercises the `Stream` implementation: pending with no data, one item per datagram, and
    // termination once the peer is dropped.
    #[test]
    fn channel_stream() {
        let mut exec = fasync::TestExecutor::new();
        let (mut recv, send) = Channel::create();

        let mut stream_fut = recv.next();

        // Nothing has been written yet, so there is no item to yield.
        assert!(exec.run_until_stalled(&mut stream_fut).is_pending());

        let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
        assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));

        match exec.run_until_stalled(&mut stream_fut) {
            Poll::Ready(Some(Ok(bytes))) => {
                assert_eq!(heart.to_vec(), bytes);
            }
            x => panic!("Expected Some(Ok(bytes)) from the stream, got {x:?}"),
        };

        // After the sender is dropped, the stream should terminate.
        drop(send);

        let mut stream_fut = recv.next();
        match exec.run_until_stalled(&mut stream_fut) {
            Poll::Ready(None) => {}
            x => panic!("Expected None from the stream after close, got {x:?}"),
        }

        // It should continue to report terminated.
        assert!(recv.is_terminated());
    }
728}