Skip to main content

fuchsia_bluetooth/types/
channel.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::endpoints::{ClientEnd, Proxy};
6use fidl_fuchsia_bluetooth as fidl_bt;
7use fidl_fuchsia_bluetooth_bredr as bredr;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::stream::{FusedStream, Stream};
11use futures::{Future, TryFutureExt, io};
12use log::warn;
13use std::fmt;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
18use crate::error::Error;
19
20/// The Channel mode in use for a L2CAP channel.
21#[derive(PartialEq, Debug, Clone)]
22pub enum ChannelMode {
23    Basic,
24    EnhancedRetransmissionMode,
25    LeCreditBasedFlowControl,
26    EnhancedCreditBasedFlowControl,
27}
28
29impl fmt::Display for ChannelMode {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        match self {
32            ChannelMode::Basic => write!(f, "Basic"),
33            ChannelMode::EnhancedRetransmissionMode => write!(f, "ERTM"),
34            ChannelMode::LeCreditBasedFlowControl => write!(f, "LE_Credit"),
35            ChannelMode::EnhancedCreditBasedFlowControl => write!(f, "Credit"),
36        }
37    }
38}
39
40pub enum A2dpDirection {
41    Normal,
42    Source,
43    Sink,
44}
45
46impl From<A2dpDirection> for bredr::A2dpDirectionPriority {
47    fn from(pri: A2dpDirection) -> Self {
48        match pri {
49            A2dpDirection::Normal => bredr::A2dpDirectionPriority::Normal,
50            A2dpDirection::Source => bredr::A2dpDirectionPriority::Source,
51            A2dpDirection::Sink => bredr::A2dpDirectionPriority::Sink,
52        }
53    }
54}
55
56impl TryFrom<fidl_bt::ChannelMode> for ChannelMode {
57    type Error = Error;
58    fn try_from(fidl: fidl_bt::ChannelMode) -> Result<Self, Error> {
59        match fidl {
60            fidl_bt::ChannelMode::Basic => Ok(ChannelMode::Basic),
61            fidl_bt::ChannelMode::EnhancedRetransmission => {
62                Ok(ChannelMode::EnhancedRetransmissionMode)
63            }
64            fidl_bt::ChannelMode::LeCreditBasedFlowControl => {
65                Ok(ChannelMode::LeCreditBasedFlowControl)
66            }
67            fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl => {
68                Ok(ChannelMode::EnhancedCreditBasedFlowControl)
69            }
70            x => Err(Error::FailedConversion(format!("Unsupported channel mode type: {x:?}"))),
71        }
72    }
73}
74
75impl From<ChannelMode> for fidl_bt::ChannelMode {
76    fn from(x: ChannelMode) -> Self {
77        match x {
78            ChannelMode::Basic => fidl_bt::ChannelMode::Basic,
79            ChannelMode::EnhancedRetransmissionMode => fidl_bt::ChannelMode::EnhancedRetransmission,
80            ChannelMode::LeCreditBasedFlowControl => fidl_bt::ChannelMode::LeCreditBasedFlowControl,
81            ChannelMode::EnhancedCreditBasedFlowControl => {
82                fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl
83            }
84        }
85    }
86}
87
88/// A data channel to a remote Peer. Channels are the primary data transfer mechanism for
89/// Bluetooth profiles and protocols.
90/// Channel currently implements Deref<Target = Socket> to easily access the underlying
91/// socket, and also implements AsyncWrite using a forwarding implementation.
92#[derive(Debug)]
93pub struct Channel {
94    socket: fasync::Socket,
95    mode: ChannelMode,
96    max_tx_size: usize,
97    flush_timeout: Arc<Mutex<Option<zx::MonotonicDuration>>>,
98    audio_direction_ext: Option<bredr::AudioDirectionExtProxy>,
99    l2cap_parameters_ext: Option<bredr::L2capParametersExtProxy>,
100    audio_offload_ext: Option<bredr::AudioOffloadExtProxy>,
101    terminated: bool,
102}
103
104impl Channel {
105    /// Attempt to make a Channel from a zircon socket and a Maximum TX size received out of band.
106    /// Returns Err(status) if there is an error.
107    pub fn from_socket(socket: zx::Socket, max_tx_size: usize) -> Result<Self, zx::Status> {
108        Ok(Self::from_socket_infallible(socket, max_tx_size))
109    }
110
111    /// Make a Channel from a zircon socket and a Maximum TX size received out of band.
112    pub fn from_socket_infallible(socket: zx::Socket, max_tx_size: usize) -> Self {
113        Channel {
114            socket: fasync::Socket::from_socket(socket),
115            mode: ChannelMode::Basic,
116            max_tx_size,
117            flush_timeout: Arc::new(Mutex::new(None)),
118            audio_direction_ext: None,
119            l2cap_parameters_ext: None,
120            audio_offload_ext: None,
121            terminated: false,
122        }
123    }
124
125    /// The default max tx size is the default MTU size for L2CAP minus the channel header content.
126    /// See the Bluetooth Core Specification, Vol 3, Part A, Sec 5.1
127    pub const DEFAULT_MAX_TX: usize = 672;
128
129    /// Makes a pair of channels which are connected to each other, used commonly for testing.
130    /// The max_tx_size is set to `Channel::DEFAULT_MAX_TX`.
131    pub fn create() -> (Self, Self) {
132        Self::create_with_max_tx(Self::DEFAULT_MAX_TX)
133    }
134
135    /// Make a pair of channels which are connected to each other, used commonly for testing.
136    /// The maximum transmittable unit is taken from `max_tx_size`.
137    pub fn create_with_max_tx(max_tx_size: usize) -> (Self, Self) {
138        let (remote, local) = zx::Socket::create_datagram();
139        (
140            Channel::from_socket(remote, max_tx_size).unwrap(),
141            Channel::from_socket(local, max_tx_size).unwrap(),
142        )
143    }
144
145    /// The maximum transmittable size of a packet, in bytes.
146    /// Trying to send packets larger than this may cause the channel to be closed.
147    pub fn max_tx_size(&self) -> usize {
148        self.max_tx_size
149    }
150
151    pub fn channel_mode(&self) -> &ChannelMode {
152        &self.mode
153    }
154
155    pub fn flush_timeout(&self) -> Option<zx::MonotonicDuration> {
156        self.flush_timeout.lock().clone()
157    }
158
159    /// Returns a future which will set the audio priority of the channel.
160    /// The future will return Err if setting the priority is not supported.
161    pub fn set_audio_priority(
162        &self,
163        dir: A2dpDirection,
164    ) -> impl Future<Output = Result<(), Error>> + use<> {
165        let proxy = self.audio_direction_ext.clone();
166        async move {
167            match proxy {
168                None => return Err(Error::profile("audio priority not supported")),
169                Some(proxy) => proxy
170                    .set_priority(dir.into())
171                    .await?
172                    .map_err(|e| Error::profile(format!("setting priority failed: {e:?}"))),
173            }
174        }
175    }
176
177    /// Attempt to set the flush timeout for this channel.
178    /// If the timeout is not already set within 1ms of `duration`, we attempt to set it using the
179    /// L2cap parameter extension.
180    /// `duration` can be infinite to set packets flushable without a timeout.
181    /// Returns a future that when polled will set the flush timeout and return the new timeout,
182    /// or return an error setting the parameter is not supported.
183    pub fn set_flush_timeout(
184        &self,
185        duration: Option<zx::MonotonicDuration>,
186    ) -> impl Future<Output = Result<Option<zx::MonotonicDuration>, Error>> + use<> {
187        let flush_timeout = self.flush_timeout.clone();
188        let current = self.flush_timeout.lock().clone();
189        let proxy = self.l2cap_parameters_ext.clone();
190        async move {
191            match (current, duration) {
192                (None, None) => return Ok(None),
193                (Some(old), Some(new)) if (old - new).into_millis().abs() < 2 => {
194                    return Ok(current);
195                }
196                _ => {}
197            };
198            let proxy =
199                proxy.ok_or_else(|| Error::profile("l2cap parameter changing not supported"))?;
200            let parameters = fidl_bt::ChannelParameters {
201                flush_timeout: duration.clone().map(zx::MonotonicDuration::into_nanos),
202                ..Default::default()
203            };
204            let new_params = proxy.request_parameters(&parameters).await?;
205            let new_timeout = new_params.flush_timeout.map(zx::MonotonicDuration::from_nanos);
206            *(flush_timeout.lock()) = new_timeout.clone();
207            Ok(new_timeout)
208        }
209    }
210
211    /// Get a copy of the Audio Offload Proxy for this channel, if it exists
212    pub fn audio_offload(&self) -> Option<bredr::AudioOffloadExtProxy> {
213        self.audio_offload_ext.clone()
214    }
215
216    pub fn closed<'a>(&'a self) -> impl Future<Output = Result<(), zx::Status>> + 'a {
217        let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
218        let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
219        close_wait.map_ok(|_o| ())
220    }
221
222    pub fn is_closed<'a>(&'a self) -> bool {
223        self.socket.is_closed()
224    }
225
226    /// Write to the channel.  This will return zx::Status::SHOULD_WAIT if the
227    /// the channel is too full.  Use the poll_write for asynchronous writing.
228    pub fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
229        self.socket.as_ref().write(bytes)
230    }
231}
232
233impl TryFrom<fidl_fuchsia_bluetooth_bredr::Channel> for Channel {
234    type Error = zx::Status;
235
236    fn try_from(fidl: bredr::Channel) -> Result<Self, Self::Error> {
237        let channel = match fidl.channel_mode.unwrap_or(fidl_bt::ChannelMode::Basic).try_into() {
238            Err(e) => {
239                warn!("Unsupported channel mode type: {e:?}");
240                return Err(zx::Status::INTERNAL);
241            }
242            Ok(c) => c,
243        };
244
245        Ok(Self {
246            socket: fasync::Socket::from_socket(fidl.socket.ok_or(zx::Status::INVALID_ARGS)?),
247            mode: channel,
248            max_tx_size: fidl.max_tx_sdu_size.ok_or(zx::Status::INVALID_ARGS)? as usize,
249            flush_timeout: Arc::new(Mutex::new(
250                fidl.flush_timeout.map(zx::MonotonicDuration::from_nanos),
251            )),
252            audio_direction_ext: fidl.ext_direction.map(|e| e.into_proxy()),
253            l2cap_parameters_ext: fidl.ext_l2cap.map(|e| e.into_proxy()),
254            audio_offload_ext: fidl.ext_audio_offload.map(|c| c.into_proxy()),
255            terminated: false,
256        })
257    }
258}
259
260impl TryFrom<Channel> for bredr::Channel {
261    type Error = Error;
262
263    fn try_from(channel: Channel) -> Result<Self, Self::Error> {
264        let socket = channel.socket.into_zx_socket();
265        let ext_direction = channel
266            .audio_direction_ext
267            .map(|proxy| {
268                let chan = proxy.into_channel()?;
269                Ok(ClientEnd::new(chan.into()))
270            })
271            .transpose()
272            .map_err(|_: bredr::AudioDirectionExtProxy| {
273                Error::profile("AudioDirection proxy in use")
274            })?;
275        let ext_l2cap = channel
276            .l2cap_parameters_ext
277            .map(|proxy| {
278                let chan = proxy.into_channel()?;
279                Ok(ClientEnd::new(chan.into()))
280            })
281            .transpose()
282            .map_err(|_: bredr::L2capParametersExtProxy| {
283                Error::profile("l2cap parameters proxy in use")
284            })?;
285        let ext_audio_offload = channel
286            .audio_offload_ext
287            .map(|proxy| {
288                let chan = proxy.into_channel()?;
289                Ok(ClientEnd::new(chan.into()))
290            })
291            .transpose()
292            .map_err(|_: bredr::AudioOffloadExtProxy| {
293                Error::profile("audio offload proxy in use")
294            })?;
295        let flush_timeout = channel.flush_timeout.lock().map(zx::MonotonicDuration::into_nanos);
296        Ok(bredr::Channel {
297            socket: Some(socket),
298            channel_mode: Some(channel.mode.into()),
299            max_tx_sdu_size: Some(channel.max_tx_size as u16),
300            ext_direction,
301            flush_timeout,
302            ext_l2cap,
303            ext_audio_offload,
304            ..Default::default()
305        })
306    }
307}
308
309impl Stream for Channel {
310    type Item = Result<Vec<u8>, zx::Status>;
311
312    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
313        if self.terminated {
314            panic!("Channel polled after terminated");
315        }
316
317        let mut res = Vec::<u8>::new();
318        loop {
319            break match self.socket.poll_datagram(cx, &mut res) {
320                // TODO(https://fxbug.dev/42072274): Sometimes sockets return spirious 0 byte packets when polled.
321                // Try again.
322                Poll::Ready(Ok(0)) => continue,
323                Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
324                Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
325                    self.terminated = true;
326                    Poll::Ready(None)
327                }
328                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
329                Poll::Pending => Poll::Pending,
330            };
331        }
332    }
333}
334
335impl FusedStream for Channel {
336    fn is_terminated(&self) -> bool {
337        self.terminated
338    }
339}
340
341impl io::AsyncRead for Channel {
342    fn poll_read(
343        mut self: Pin<&mut Self>,
344        cx: &mut Context<'_>,
345        buf: &mut [u8],
346    ) -> Poll<Result<usize, futures::io::Error>> {
347        Pin::new(&mut self.socket).as_mut().poll_read(cx, buf)
348    }
349}
350
351impl io::AsyncWrite for Channel {
352    fn poll_write(
353        mut self: Pin<&mut Self>,
354        cx: &mut Context<'_>,
355        buf: &[u8],
356    ) -> Poll<Result<usize, io::Error>> {
357        Pin::new(&mut self.socket).as_mut().poll_write(cx, buf)
358    }
359
360    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
361        Pin::new(&mut self.socket).as_mut().poll_flush(cx)
362    }
363
364    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
365        Pin::new(&mut self.socket).as_mut().poll_close(cx)
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use super::*;
372    use fidl::endpoints::create_request_stream;
373    use futures::{AsyncReadExt, FutureExt, StreamExt};
374    use std::pin::pin;
375
376    #[test]
377    fn test_channel_create_and_write() {
378        let _exec = fasync::TestExecutor::new();
379        let (mut recv, send) = Channel::create();
380
381        let mut buf: [u8; 8] = [0; 8];
382        let read_fut = AsyncReadExt::read(&mut recv, &mut buf);
383
384        let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
385        assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));
386
387        match read_fut.now_or_never() {
388            Some(Ok(4)) => {}
389            x => panic!("Expected Ok(4) from the read, got {x:?}"),
390        };
391        assert_eq!(heart, &buf[0..4]);
392    }
393
394    #[test]
395    fn test_channel_from_fidl() {
396        let _exec = fasync::TestExecutor::new();
397        let empty = bredr::Channel::default();
398        assert!(Channel::try_from(empty).is_err());
399
400        let (remote, _local) = zx::Socket::create_datagram();
401
402        let okay = bredr::Channel {
403            socket: Some(remote),
404            channel_mode: Some(fidl_bt::ChannelMode::Basic),
405            max_tx_sdu_size: Some(1004),
406            ..Default::default()
407        };
408
409        let chan = Channel::try_from(okay).expect("okay channel to be converted");
410
411        assert_eq!(1004, chan.max_tx_size());
412        assert_eq!(&ChannelMode::Basic, chan.channel_mode());
413    }
414
415    #[test]
416    fn test_channel_closed() {
417        let mut exec = fasync::TestExecutor::new();
418
419        let (recv, send) = Channel::create();
420
421        let closed_fut = recv.closed();
422        let mut closed_fut = pin!(closed_fut);
423
424        assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
425        assert!(!recv.is_closed());
426
427        drop(send);
428
429        assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
430        assert!(recv.is_closed());
431    }
432
433    #[test]
434    fn test_direction_ext() {
435        let mut exec = fasync::TestExecutor::new();
436
437        let (remote, _local) = zx::Socket::create_datagram();
438        let no_ext = bredr::Channel {
439            socket: Some(remote),
440            channel_mode: Some(fidl_bt::ChannelMode::Basic),
441            max_tx_sdu_size: Some(1004),
442            ..Default::default()
443        };
444        let channel = Channel::try_from(no_ext).unwrap();
445
446        assert!(
447            exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal)).is_err()
448        );
449        assert!(exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err());
450
451        let (remote, _local) = zx::Socket::create_datagram();
452        let (client_end, mut direction_request_stream) =
453            create_request_stream::<bredr::AudioDirectionExtMarker>();
454        let ext = bredr::Channel {
455            socket: Some(remote),
456            channel_mode: Some(fidl_bt::ChannelMode::Basic),
457            max_tx_sdu_size: Some(1004),
458            ext_direction: Some(client_end),
459            ..Default::default()
460        };
461
462        let channel = Channel::try_from(ext).unwrap();
463
464        let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Normal);
465        let mut audio_direction_fut = pin!(audio_direction_fut);
466
467        assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
468
469        match exec.run_until_stalled(&mut direction_request_stream.next()) {
470            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
471                priority,
472                responder,
473            }))) => {
474                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
475                responder.send(Ok(())).expect("response to send cleanly");
476            }
477            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
478        };
479
480        match exec.run_until_stalled(&mut audio_direction_fut) {
481            Poll::Ready(Ok(())) => {}
482            _x => panic!("Expected ok result from audio direction response"),
483        };
484
485        let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Sink);
486        let mut audio_direction_fut = pin!(audio_direction_fut);
487
488        assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
489
490        match exec.run_until_stalled(&mut direction_request_stream.next()) {
491            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
492                priority,
493                responder,
494            }))) => {
495                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
496                responder
497                    .send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed))
498                    .expect("response to send cleanly");
499            }
500            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
501        };
502
503        match exec.run_until_stalled(&mut audio_direction_fut) {
504            Poll::Ready(Err(_)) => {}
505            _x => panic!("Expected error result from audio direction response"),
506        };
507    }
508
509    #[test]
510    fn test_flush_timeout() {
511        let mut exec = fasync::TestExecutor::new();
512
513        let (remote, _local) = zx::Socket::create_datagram();
514        let no_ext = bredr::Channel {
515            socket: Some(remote),
516            channel_mode: Some(fidl_bt::ChannelMode::Basic),
517            max_tx_sdu_size: Some(1004),
518            flush_timeout: Some(50_000_000), // 50 milliseconds
519            ..Default::default()
520        };
521        let channel = Channel::try_from(no_ext).unwrap();
522
523        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
524
525        // Within 2 milliseconds, doesn't change.
526        let res = exec.run_singlethreaded(
527            channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(49))),
528        );
529        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
530        let res = exec.run_singlethreaded(
531            channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(51))),
532        );
533        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
534
535        assert!(
536            exec.run_singlethreaded(
537                channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(200)))
538            )
539            .is_err()
540        );
541        assert!(exec.run_singlethreaded(channel.set_flush_timeout(None)).is_err());
542
543        let (remote, _local) = zx::Socket::create_datagram();
544        let (client_end, mut l2cap_request_stream) =
545            create_request_stream::<bredr::L2capParametersExtMarker>();
546        let ext = bredr::Channel {
547            socket: Some(remote),
548            channel_mode: Some(fidl_bt::ChannelMode::Basic),
549            max_tx_sdu_size: Some(1004),
550            flush_timeout: None,
551            ext_l2cap: Some(client_end),
552            ..Default::default()
553        };
554
555        let channel = Channel::try_from(ext).unwrap();
556
557        {
558            let flush_timeout_fut = channel.set_flush_timeout(None);
559            let mut flush_timeout_fut = pin!(flush_timeout_fut);
560
561            // Requesting no change returns right away with no change.
562            match exec.run_until_stalled(&mut flush_timeout_fut) {
563                Poll::Ready(Ok(None)) => {}
564                x => panic!("Expected no flush timeout to not stall, got {:?}", x),
565            }
566        }
567
568        let req_duration = zx::MonotonicDuration::from_millis(42);
569
570        {
571            let flush_timeout_fut = channel.set_flush_timeout(Some(req_duration));
572            let mut flush_timeout_fut = pin!(flush_timeout_fut);
573
574            assert!(exec.run_until_stalled(&mut flush_timeout_fut).is_pending());
575
576            match exec.run_until_stalled(&mut l2cap_request_stream.next()) {
577                Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
578                    request,
579                    responder,
580                }))) => {
581                    assert_eq!(Some(req_duration.into_nanos()), request.flush_timeout);
582                    // Send a different response
583                    let params = fidl_bt::ChannelParameters {
584                        flush_timeout: Some(50_000_000), // 50ms
585                        ..Default::default()
586                    };
587                    responder.send(&params).expect("response to send cleanly");
588                }
589                x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
590            };
591
592            match exec.run_until_stalled(&mut flush_timeout_fut) {
593                Poll::Ready(Ok(Some(duration))) => {
594                    assert_eq!(zx::MonotonicDuration::from_millis(50), duration)
595                }
596                x => panic!("Expected ready result from params response, got {:?}", x),
597            };
598        }
599
600        // Channel should have recorded the new flush timeout.
601        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
602    }
603
604    #[test]
605    fn test_audio_offload() {
606        let _exec = fasync::TestExecutor::new();
607
608        let (remote, _local) = zx::Socket::create_datagram();
609        let no_ext = bredr::Channel {
610            socket: Some(remote),
611            channel_mode: Some(fidl_bt::ChannelMode::Basic),
612            max_tx_sdu_size: Some(1004),
613            ..Default::default()
614        };
615        let channel = Channel::try_from(no_ext).unwrap();
616
617        assert!(channel.audio_offload().is_none());
618
619        let (remote, _local) = zx::Socket::create_datagram();
620        let (client_end, mut _audio_offload_ext_req_stream) =
621            create_request_stream::<bredr::AudioOffloadExtMarker>();
622        let ext = bredr::Channel {
623            socket: Some(remote),
624            channel_mode: Some(fidl_bt::ChannelMode::Basic),
625            max_tx_sdu_size: Some(1004),
626            ext_audio_offload: Some(client_end),
627            ..Default::default()
628        };
629
630        let channel = Channel::try_from(ext).unwrap();
631
632        let offload_ext = channel.audio_offload();
633        assert!(offload_ext.is_some());
634        // We can get the audio offload multiple times without dropping
635        assert!(channel.audio_offload().is_some());
636        // And with dropping
637        drop(offload_ext);
638        assert!(channel.audio_offload().is_some());
639    }
640
641    #[test]
642    fn channel_async_read() {
643        let mut exec = fasync::TestExecutor::new();
644        let (mut recv, send) = Channel::create();
645
646        // Test `read` with a datagram smaller than the read buffer.
647        let max_tx_size = recv.max_tx_size();
648        let mut read_buf = vec![0; max_tx_size];
649        let mut read_fut = recv.read(&mut read_buf[..]);
650
651        assert!(exec.run_until_stalled(&mut read_fut).is_pending());
652
653        let data = &[0x01, 0x02, 0x03, 0x04];
654        assert_eq!(data.len(), send.write(data).expect("should write successfully"));
655
656        // The read should complete, with the length of the datagram.
657        let read_len = match exec.run_until_stalled(&mut read_fut) {
658            Poll::Ready(Ok(read_len)) => read_len,
659            x => panic!("Expected successful read, got {x:?}"),
660        };
661        assert_eq!(read_len, data.len());
662        assert_eq!(&data[..], &read_buf[..data.len()]);
663
664        // Test `read` with a datagram that is larger than the read buffer.
665        let mut read_buf = [0; 4]; // buffer too small
666        let mut read_fut = recv.read(&mut read_buf);
667
668        let oversized_data = &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
669        assert_eq!(
670            oversized_data.len(),
671            send.write(oversized_data).expect("should write successfully")
672        );
673
674        // The read should complete, filling the buffer.
675        let read_len = match exec.run_until_stalled(&mut read_fut) {
676            Poll::Ready(Ok(read_len)) => read_len,
677            x => panic!("Expected successful read, got {x:?}"),
678        };
679        assert_eq!(read_len, read_buf.len());
680        assert_eq!(&oversized_data[..read_buf.len()], &read_buf[..]);
681
682        // The rest of the datagram should be discarded. A subsequent read should be pending.
683        let mut leftover_buf = [0; 1];
684        let mut leftover_fut = recv.read(&mut leftover_buf);
685        assert!(exec.run_until_stalled(&mut leftover_fut).is_pending());
686    }
687
688    #[test]
689    fn channel_stream() {
690        let mut exec = fasync::TestExecutor::new();
691        let (mut recv, send) = Channel::create();
692
693        let mut stream_fut = recv.next();
694
695        assert!(exec.run_until_stalled(&mut stream_fut).is_pending());
696
697        let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
698        assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));
699
700        match exec.run_until_stalled(&mut stream_fut) {
701            Poll::Ready(Some(Ok(bytes))) => {
702                assert_eq!(heart.to_vec(), bytes);
703            }
704            x => panic!("Expected Some(Ok(bytes)) from the stream, got {x:?}"),
705        };
706
707        // After the sender is dropped, the stream should terminate.
708        drop(send);
709
710        let mut stream_fut = recv.next();
711        match exec.run_until_stalled(&mut stream_fut) {
712            Poll::Ready(None) => {}
713            x => panic!("Expected None from the stream after close, got {x:?}"),
714        }
715
716        // It should continue to report terminated.
717        assert!(recv.is_terminated());
718    }
719}