Skip to main content

bt_avdtp/
stream_endpoint.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
6use fuchsia_async::{DurationExt, Task, TimeoutExt};
7use fuchsia_bluetooth::types::{A2dpDirection, Channel};
8use fuchsia_sync::{Mutex, RwLock};
9use futures::stream::{FusedStream, Stream};
10use futures::{FutureExt, Sink};
11use log::warn;
12use std::pin::Pin;
13use std::sync::{Arc, Weak};
14use std::task::{Context, Poll};
15use std::{fmt, io};
16use zx::{MonotonicDuration, Status};
17
18use crate::types::{
19    EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result as AvdtpResult,
20    ServiceCapability, ServiceCategory, StreamEndpointId, StreamInformation,
21};
22use crate::{Peer, SimpleResponder};
23
24pub type StreamEndpointUpdateCallback = Box<dyn Fn(&StreamEndpoint) -> () + Sync + Send>;
25
26/// The state of a StreamEndpoint.
27#[derive(PartialEq, Debug, Default, Clone, Copy)]
28pub enum StreamState {
29    #[default]
30    Idle,
31    Configured,
32    // An Open command has been accepted, but streams have not been established yet.
33    Opening,
34    Open,
35    Streaming,
36    Closing,
37    Aborting,
38}
39
40/// An AVDTP StreamEndpoint. StreamEndpoints represent a particular capability of the application
41/// to be a source of sink of media. Included here to aid negotiating the stream connection.
42/// See Section 5.3 of the AVDTP 1.3 Specification for more information about the Stream Endpoint
43/// Architecture.
44pub struct StreamEndpoint {
45    /// Local stream endpoint id.  This should be unique per AVDTP Peer.
46    id: StreamEndpointId,
47    /// The type of endpoint this is (TSEP), Source or Sink.
48    endpoint_type: EndpointType,
49    /// The media type this stream represents.
50    media_type: MediaType,
51    /// Current state the stream is in. See Section 6.5 for an overview.
52    state: Arc<Mutex<StreamState>>,
53    /// The media transport channel
54    /// This should be Some(channel) when state is Open or Streaming.
55    transport: Option<Arc<RwLock<Channel>>>,
56    /// True when the MediaStream is held.
57    /// Prevents multiple threads from owning the media stream.
58    stream_held: Arc<Mutex<bool>>,
59    /// The capabilities of this endpoint.
60    capabilities: Vec<ServiceCapability>,
61    /// The remote stream endpoint id.  None if the stream has never been configured.
62    remote_id: Option<StreamEndpointId>,
63    /// The current configuration of this endpoint.  Empty if the stream has never been configured.
64    configuration: Vec<ServiceCapability>,
65    /// Callback that is run whenever the endpoint is updated
66    update_callback: Option<StreamEndpointUpdateCallback>,
67    /// In-progress task. This is only used for the Release procedure which places the state in Closing
68    /// and must wait for the peer to close transport channels.
69    in_progress: Option<Task<()>>,
70}
71
72impl fmt::Debug for StreamEndpoint {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        f.debug_struct("StreamEndpoint")
75            .field("id", &self.id.0)
76            .field("endpoint_type", &self.endpoint_type)
77            .field("media_type", &self.media_type)
78            .field("state", &self.state)
79            .field("capabilities", &self.capabilities)
80            .field("remote_id", &self.remote_id.as_ref().map(|id| id.to_string()))
81            .field("configuration", &self.configuration)
82            .finish()
83    }
84}
85
86impl StreamEndpoint {
87    /// Make a new StreamEndpoint.
88    /// |id| must be in the valid range for a StreamEndpointId (0x01 - 0x3E).
89    /// StreamEndpoints start in the Idle state.
90    pub fn new(
91        id: u8,
92        media_type: MediaType,
93        endpoint_type: EndpointType,
94        capabilities: Vec<ServiceCapability>,
95    ) -> AvdtpResult<StreamEndpoint> {
96        let seid = StreamEndpointId::try_from(id)?;
97        Ok(StreamEndpoint {
98            id: seid,
99            capabilities,
100            media_type,
101            endpoint_type,
102            state: Default::default(),
103            transport: None,
104            stream_held: Arc::new(Mutex::new(false)),
105            remote_id: None,
106            configuration: vec![],
107            update_callback: None,
108            in_progress: None,
109        })
110    }
111
112    pub fn as_new(&self) -> Self {
113        StreamEndpoint::new(
114            self.id.0,
115            self.media_type.clone(),
116            self.endpoint_type.clone(),
117            self.capabilities.clone(),
118        )
119        .expect("as_new")
120    }
121
122    /// Set the state to the given value and run the `update_callback` afterwards
123    fn set_state(&mut self, state: StreamState) {
124        *self.state.lock() = state;
125        self.update_callback();
126    }
127
128    /// Pass update callback to StreamEndpoint that will be called anytime `StreamEndpoint` is
129    /// modified.
130    pub fn set_update_callback(&mut self, callback: Option<StreamEndpointUpdateCallback>) {
131        self.update_callback = callback;
132    }
133
134    fn update_callback(&self) {
135        if let Some(cb) = self.update_callback.as_ref() {
136            cb(self);
137        }
138    }
139
140    /// Build a new StreamEndpoint from a StreamInformation and associated Capabilities.
141    /// This makes it easy to build from AVDTP Discover and GetCapabilities procedures.
142    /// StreamEndpooints start in the Idle state.
143    pub fn from_info(
144        info: &StreamInformation,
145        capabilities: Vec<ServiceCapability>,
146    ) -> StreamEndpoint {
147        StreamEndpoint {
148            id: info.id().clone(),
149            capabilities,
150            media_type: info.media_type().clone(),
151            endpoint_type: info.endpoint_type().clone(),
152            state: Default::default(),
153            transport: None,
154            stream_held: Arc::new(Mutex::new(false)),
155            remote_id: None,
156            configuration: vec![],
157            update_callback: None,
158            in_progress: None,
159        }
160    }
161
162    /// Checks that the state is in the set of states.
163    /// If not, returns Err(ErrorCode::BadState).
164    fn state_is(&self, state: StreamState) -> Result<(), ErrorCode> {
165        (*self.state.lock() == state).then_some(()).ok_or(ErrorCode::BadState)
166    }
167
168    /// Attempt to Configure this stream using the capabilities given.
169    /// If the stream is not in an Idle state, fails with Err(InvalidState).
170    /// Used for the Stream Configuration procedure, see Section 6.9
171    pub fn configure(
172        &mut self,
173        remote_id: &StreamEndpointId,
174        capabilities: Vec<ServiceCapability>,
175    ) -> Result<(), (ServiceCategory, ErrorCode)> {
176        self.state_is(StreamState::Idle).map_err(|e| (ServiceCategory::None, e))?;
177        self.remote_id = Some(remote_id.clone());
178        for cap in &capabilities {
179            if !self
180                .capabilities
181                .iter()
182                .any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y))
183            {
184                return Err((cap.category(), ErrorCode::UnsupportedConfiguration));
185            }
186        }
187        self.configuration = capabilities;
188        self.set_state(StreamState::Configured);
189        Ok(())
190    }
191
192    /// Attempt to reconfigure this stream with the capabilities given.  If any capability is not
193    /// valid to set, fails with the first such category and InvalidCapabilities If the stream is
194    /// not in the Open state, fails with Err((None, BadState)) Used for the Stream Reconfiguration
195    /// procedure, see Section 6.15.
196    pub fn reconfigure(
197        &mut self,
198        mut capabilities: Vec<ServiceCapability>,
199    ) -> Result<(), (ServiceCategory, ErrorCode)> {
200        self.state_is(StreamState::Open).map_err(|e| (ServiceCategory::None, e))?;
201        // Only application capabilities are allowed to be reconfigured. See Section 8.11.1
202        if let Some(cap) = capabilities.iter().find(|x| !x.is_application()) {
203            return Err((cap.category(), ErrorCode::InvalidCapabilities));
204        }
205        // Should only replace the capabilities that have been configured. See Section 8.11.2
206        let to_replace: std::vec::Vec<_> =
207            capabilities.iter().map(|x| std::mem::discriminant(x)).collect();
208        self.configuration.retain(|x| {
209            let disc = std::mem::discriminant(x);
210            !to_replace.contains(&disc)
211        });
212        self.configuration.append(&mut capabilities);
213        self.update_callback();
214        Ok(())
215    }
216
217    /// Get the current configuration of this stream.
218    /// If the stream is not configured, returns None.
219    /// Used for the Steam Get Configuration Procedure, see Section 6.10
220    pub fn get_configuration(&self) -> Option<&Vec<ServiceCapability>> {
221        if self.configuration.is_empty() {
222            return None;
223        }
224        Some(&self.configuration)
225    }
226
227    // 100 milliseconds chosen based on end of range testing, to allow for recovery after normal
228    // packet delivery continues.
229    const SRC_FLUSH_TIMEOUT: MonotonicDuration = MonotonicDuration::from_millis(100);
230
231    /// When a L2CAP channel is received after an Open command is accepted, it should be
232    /// delivered via receive_channel.
233    /// Returns true if this Endpoint expects more channels to be established before
234    /// streaming is started.
235    /// Returns Err(InvalidState) if this Endpoint is not expecting a channel to be established,
236    /// closing |c|.
237    pub fn receive_channel(&mut self, c: Channel) -> AvdtpResult<bool> {
238        if self.state_is(StreamState::Opening).is_err() || self.transport.is_some() {
239            return Err(Error::InvalidState);
240        }
241        self.transport = Some(Arc::new(RwLock::new(c)));
242        self.try_flush_timeout(Self::SRC_FLUSH_TIMEOUT);
243        self.stream_held = Arc::new(Mutex::new(false));
244        // TODO(jamuraa, https://fxbug.dev/42051664, https://fxbug.dev/42051776): Reporting and Recovery channels
245        self.set_state(StreamState::Open);
246        Ok(false)
247    }
248
249    /// Begin opening this stream.  The stream must be in a Configured state.
250    /// See Stream Establishment, Section 6.11
251    pub fn establish(&mut self) -> Result<(), ErrorCode> {
252        if self.state_is(StreamState::Configured).is_err() || self.transport.is_some() {
253            return Err(ErrorCode::BadState);
254        }
255        self.set_state(StreamState::Opening);
256        Ok(())
257    }
258
259    /// Attempts to set audio direction priority of the MediaTransport channel based on
260    /// whether the stream is a source or sink endpoint if `active` is true.  If `active` is
261    /// false, set the priority to Normal instead.  Does nothing on failure.
262    pub fn try_priority(&self, active: bool) {
263        let priority = match (active, &self.endpoint_type) {
264            (false, _) => A2dpDirection::Normal,
265            (true, EndpointType::Source) => A2dpDirection::Source,
266            (true, EndpointType::Sink) => A2dpDirection::Sink,
267        };
268        let fut = match self.transport.as_ref().unwrap().try_read() {
269            None => return,
270            Some(channel) => channel.set_audio_priority(priority).map(|_| ()),
271        };
272        // TODO(https://fxbug.dev/331621666): We should avoid detaching this.
273        Task::spawn(fut).detach();
274    }
275
276    /// Attempts to set the flush timeout for the MediaTransport channel, for source endpoints.
277    pub fn try_flush_timeout(&self, timeout: MonotonicDuration) {
278        if self.endpoint_type != EndpointType::Source {
279            return;
280        }
281        let fut = match self.transport.as_ref().unwrap().try_write() {
282            None => return,
283            Some(channel) => channel.set_flush_timeout(Some(timeout)).map(|_| ()),
284        };
285        // TODO(https://fxbug.dev/331621666): We should avoid detaching this.
286        Task::spawn(fut).detach();
287    }
288
289    /// Close this stream.  This procedure will wait until media channels are closed before
290    /// transitioning to Idle.  If the channels are not closed in 3 seconds, we initiate an abort
291    /// procedure with the remote |peer| to force a transition to Idle.
292    pub fn release(&mut self, responder: SimpleResponder, peer: &Peer) -> AvdtpResult<()> {
293        {
294            let lock = self.state.lock();
295            if *lock != StreamState::Open && *lock != StreamState::Streaming {
296                return responder.reject(ErrorCode::BadState);
297            }
298        }
299        self.set_state(StreamState::Closing);
300        responder.send()?;
301        let release_wait_fut = {
302            // Take our transport and remote id - after this procedure it will be closed.
303            // These must be Some(_) because we are in Open / Streaming state.
304            let seid = self.remote_id.take().unwrap();
305            let transport = self.transport.take().unwrap();
306            let peer = peer.clone();
307            let state = self.state.clone();
308            async move {
309                let Some(transport) = transport.try_read() else {
310                    warn!("unable to lock transport channel, dropping and assuming closed");
311                    *state.lock() = StreamState::Idle;
312                    return;
313                };
314                let closed_fut = transport
315                    .closed()
316                    .on_timeout(MonotonicDuration::from_seconds(3).after_now(), || {
317                        Err(Status::TIMED_OUT)
318                    });
319                if let Err(Status::TIMED_OUT) = closed_fut.await {
320                    let _ = peer.abort(&seid).await;
321                    *state.lock() = StreamState::Aborting;
322                    // As the initiator of the Abort, we close our channel.
323                    drop(transport);
324                }
325                *state.lock() = StreamState::Idle;
326            }
327        };
328        self.in_progress = Some(Task::local(release_wait_fut));
329        // Closing will return this endpoint to the Idle state, one way or another with no
330        // configuration
331        self.configuration.clear();
332        self.update_callback();
333        Ok(())
334    }
335
336    /// Returns the current state of this endpoint.
337    pub fn state(&self) -> StreamState {
338        *self.state.lock()
339    }
340
341    /// Start this stream.  This can be done only from the Open State.
342    /// Used for the Stream Start procedure, See Section 6.12
343    pub fn start(&mut self) -> Result<(), ErrorCode> {
344        self.state_is(StreamState::Open)?;
345        self.try_priority(true);
346        self.set_state(StreamState::Streaming);
347        Ok(())
348    }
349
350    /// Suspend this stream.  This can be done only from the Streaming state.
351    /// Used for the Stream Suspend procedure, See Section 6.14
352    pub fn suspend(&mut self) -> Result<(), ErrorCode> {
353        self.state_is(StreamState::Streaming)?;
354        self.set_state(StreamState::Open);
355        self.try_priority(false);
356        Ok(())
357    }
358
359    /// Abort this stream.  This can be done from any state, and will always return the state
360    /// to Idle.  We are initiating this procedure so will wait for a response and all our
361    /// channels will be closed.
362    pub async fn initiate_abort<'a>(&'a mut self, peer: &'a Peer) {
363        if let Some(seid) = self.remote_id.take() {
364            let _ = peer.abort(&seid).await;
365            self.set_state(StreamState::Aborting);
366        }
367        self.abort()
368    }
369
370    /// Abort this stream.  This can be done from any state, and will always return the state
371    /// to Idle.  We are receiving this abort from the peer, and all our channels will close.
372    pub fn abort(&mut self) {
373        self.set_state(StreamState::Aborting);
374        self.configuration.clear();
375        self.remote_id = None;
376        self.transport = None;
377        self.set_state(StreamState::Idle);
378    }
379
380    /// Capabilities of this StreamEndpoint.
381    /// Provides support for the Get Capabilities and Get All Capabilities signaling procedures.
382    /// See Sections 6.7 and 6.8
383    pub fn capabilities(&self) -> &Vec<ServiceCapability> {
384        &self.capabilities
385    }
386
387    /// Returns the CodecType of this StreamEndpoint.
388    /// Returns None if there is no MediaCodec capability in the endpoint.
389    /// Note: a MediaCodec capability is required by all endpoints by the spec.
390    pub fn codec_type(&self) -> Option<&MediaCodecType> {
391        self.capabilities.iter().find_map(|cap| match cap {
392            ServiceCapability::MediaCodec { codec_type, .. } => Some(codec_type),
393            _ => None,
394        })
395    }
396
397    /// Returns the local StreamEndpointId for this endpoint.
398    pub fn local_id(&self) -> &StreamEndpointId {
399        &self.id
400    }
401
402    /// Returns the remote StreamEndpointId for this endpoint, if it's configured.
403    pub fn remote_id(&self) -> Option<&StreamEndpointId> {
404        self.remote_id.as_ref()
405    }
406
407    /// Returns the EndpointType of this endpoint
408    pub fn endpoint_type(&self) -> &EndpointType {
409        &self.endpoint_type
410    }
411
412    /// Make a StreamInformation which represents the current state of this stream.
413    pub fn information(&self) -> StreamInformation {
414        let in_use = self.state_is(StreamState::Idle).is_err();
415        StreamInformation::new(
416            self.id.clone(),
417            in_use,
418            self.media_type.clone(),
419            self.endpoint_type.clone(),
420        )
421    }
422
423    /// Take the media transport channel, which transmits (or receives) any media for this
424    /// StreamEndpoint.  Returns None if the channel is held already, or if the channel has not
425    /// been opened.
426    pub fn take_transport(&mut self) -> Option<MediaStream> {
427        let mut stream_held = self.stream_held.lock();
428        if *stream_held || self.transport.is_none() {
429            return None;
430        }
431
432        *stream_held = true;
433
434        Some(MediaStream::new(
435            self.stream_held.clone(),
436            Arc::downgrade(self.transport.as_ref().unwrap()),
437        ))
438    }
439
440    /// Get an AudioOffloadExtProxy if it exists in the transport channel
441    pub fn audio_offload(&self) -> Option<AudioOffloadExtProxy> {
442        self.transport.as_ref().and_then(|c| c.read().audio_offload())
443    }
444}
445
446/// Represents a media transport stream.
447/// If a sink, produces the bytes that have been delivered from the peer.
448/// If a source, can send bytes using `send`
449pub struct MediaStream {
450    in_use: Arc<Mutex<bool>>,
451    channel: Weak<RwLock<Channel>>,
452    terminated: bool,
453}
454
455impl MediaStream {
456    pub fn new(in_use: Arc<Mutex<bool>>, channel: Weak<RwLock<Channel>>) -> Self {
457        Self { in_use, channel, terminated: false }
458    }
459
460    fn try_upgrade(&self) -> Result<Arc<RwLock<Channel>>, io::Error> {
461        self.channel
462            .upgrade()
463            .ok_or_else(|| io::Error::new(io::ErrorKind::ConnectionAborted, "lost connection"))
464    }
465
466    pub fn max_tx_size(&self) -> Result<usize, io::Error> {
467        match self.try_upgrade()?.try_read() {
468            None => return Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")),
469            Some(lock) => Ok(lock.max_tx_size()),
470        }
471    }
472}
473
474impl Drop for MediaStream {
475    fn drop(&mut self) {
476        let mut l = self.in_use.lock();
477        *l = false;
478    }
479}
480
481impl Stream for MediaStream {
482    type Item = AvdtpResult<Vec<u8>>;
483
484    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
485        let Ok(arc_chan) = self.try_upgrade() else {
486            self.terminated = true;
487            return Poll::Ready(None);
488        };
489        let Some(lock) = arc_chan.try_write() else {
490            self.terminated = true;
491            return Poll::Ready(None);
492        };
493        let mut pin_chan = Pin::new(lock);
494        match pin_chan.as_mut().poll_next(cx) {
495            Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
496            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::PeerRead(e)))),
497            Poll::Ready(None) => {
498                self.terminated = true;
499                Poll::Ready(None)
500            }
501            Poll::Pending => Poll::Pending,
502        }
503    }
504}
505
506impl FusedStream for MediaStream {
507    fn is_terminated(&self) -> bool {
508        self.terminated
509    }
510}
511
512impl Sink<Vec<u8>> for MediaStream {
513    type Error = io::Error;
514
515    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
516        let arc_chan = self.try_upgrade()?;
517        let mut lock = arc_chan
518            .try_write()
519            .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))?;
520        Pin::new(&mut *lock).poll_ready(cx).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
521    }
522
523    fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
524        let arc_chan = self.try_upgrade()?;
525        let mut lock = arc_chan
526            .try_write()
527            .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))?;
528        Pin::new(&mut *lock).start_send(item).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
529    }
530
531    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
532        let arc_chan = self.try_upgrade()?;
533        let mut lock = arc_chan
534            .try_write()
535            .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))?;
536        Pin::new(&mut *lock).poll_flush(cx).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
537    }
538
539    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
540        let arc_chan = self.try_upgrade()?;
541        let mut lock = arc_chan
542            .try_write()
543            .ok_or_else(|| io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))?;
544        Pin::new(&mut *lock).poll_close(cx).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
545    }
546}
547
548#[cfg(test)]
549mod tests {
550    use super::*;
551    use crate::Request;
552    use crate::tests::{expect_remote_recv, setup_peer};
553
554    use assert_matches::assert_matches;
555    use async_utils::PollExt;
556    use fidl::endpoints::create_request_stream;
557    use fidl_fuchsia_bluetooth as fidl_bt;
558    use fidl_fuchsia_bluetooth_bredr as bredr;
559    use fuchsia_async as fasync;
560    use futures::SinkExt;
561    use futures::stream::StreamExt;
562
563    const REMOTE_ID_VAL: u8 = 1;
564    const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL);
565
566    #[test]
567    fn make() {
568        let s = StreamEndpoint::new(
569            REMOTE_ID_VAL,
570            MediaType::Audio,
571            EndpointType::Sink,
572            vec![ServiceCapability::MediaTransport],
573        );
574        assert!(s.is_ok());
575        let s = s.unwrap();
576        assert_eq!(&StreamEndpointId(1), s.local_id());
577
578        let info = s.information();
579        assert!(!info.in_use());
580
581        let no = StreamEndpoint::new(
582            0,
583            MediaType::Audio,
584            EndpointType::Sink,
585            vec![ServiceCapability::MediaTransport],
586        );
587        assert!(no.is_err());
588    }
589
590    fn establish_stream(s: &mut StreamEndpoint) -> Channel {
591        assert_matches!(s.establish(), Ok(()));
592        let (chan, remote) = Channel::create();
593        assert_matches!(s.receive_channel(chan), Ok(false));
594        remote
595    }
596
597    #[test]
598    fn from_info() {
599        let seid = StreamEndpointId::try_from(5).unwrap();
600        let info =
601            StreamInformation::new(seid.clone(), false, MediaType::Audio, EndpointType::Sink);
602        let capabilities = vec![ServiceCapability::MediaTransport];
603
604        let endpoint = StreamEndpoint::from_info(&info, capabilities);
605
606        assert_eq!(&seid, endpoint.local_id());
607        assert_eq!(&false, endpoint.information().in_use());
608        assert_eq!(1, endpoint.capabilities().len());
609    }
610
611    #[test]
612    fn codec_type() {
613        let s = StreamEndpoint::new(
614            REMOTE_ID_VAL,
615            MediaType::Audio,
616            EndpointType::Sink,
617            vec![
618                ServiceCapability::MediaTransport,
619                ServiceCapability::MediaCodec {
620                    media_type: MediaType::Audio,
621                    codec_type: MediaCodecType::new(0x40),
622                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], // Meaningless test data.
623                },
624            ],
625        )
626        .unwrap();
627
628        assert_eq!(Some(&MediaCodecType::new(0x40)), s.codec_type());
629
630        let s = StreamEndpoint::new(
631            REMOTE_ID_VAL,
632            MediaType::Audio,
633            EndpointType::Sink,
634            vec![ServiceCapability::MediaTransport],
635        )
636        .unwrap();
637
638        assert_eq!(None, s.codec_type());
639    }
640
641    fn test_endpoint(r#type: EndpointType) -> StreamEndpoint {
642        StreamEndpoint::new(
643            REMOTE_ID_VAL,
644            MediaType::Audio,
645            r#type,
646            vec![
647                ServiceCapability::MediaTransport,
648                ServiceCapability::MediaCodec {
649                    media_type: MediaType::Audio,
650                    codec_type: MediaCodecType::new(0x40),
651                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], // Meaningless test data.
652                },
653            ],
654        )
655        .unwrap()
656    }
657
658    #[test]
659    fn stream_configure_reconfigure() {
660        let _exec = fasync::TestExecutor::new();
661        let mut s = test_endpoint(EndpointType::Sink);
662
663        // Can't configure items that aren't in range.
664        assert_matches!(
665            s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting]),
666            Err((ServiceCategory::Reporting, ErrorCode::UnsupportedConfiguration))
667        );
668
669        assert_matches!(
670            s.configure(
671                &REMOTE_ID,
672                vec![
673                    ServiceCapability::MediaTransport,
674                    ServiceCapability::MediaCodec {
675                        media_type: MediaType::Audio,
676                        codec_type: MediaCodecType::new(0x40),
677                        // Change the codec_extra which is typical, ex. SBC (A2DP Spec 4.3.2.6)
678                        codec_extra: vec![0x0C, 0x0D, 0x02, 0x51],
679                    }
680                ]
681            ),
682            Ok(())
683        );
684
685        // Note: we allow endpoints to be configured (and reconfigured) again when they
686        // are only configured, even though this is probably not allowed per the spec.
687
688        // Can't configure while open
689        let _channel = establish_stream(&mut s);
690
691        assert_matches!(
692            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
693            Err((_, ErrorCode::BadState))
694        );
695
696        let reconfiguration = vec![ServiceCapability::MediaCodec {
697            media_type: MediaType::Audio,
698            codec_type: MediaCodecType::new(0x40),
699            // Reconfigure to yet another different codec_extra value.
700            codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
701        }];
702
703        // The new configuration should match the previous one, but with the reconfigured
704        // capabilities updated.
705        let new_configuration = vec![ServiceCapability::MediaTransport, reconfiguration[0].clone()];
706
707        // Reconfiguring while open is fine though.
708        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));
709
710        assert_eq!(Some(&new_configuration), s.get_configuration());
711
712        // Can't reconfigure non-application types
713        assert_matches!(
714            s.reconfigure(vec![ServiceCapability::MediaTransport]),
715            Err((ServiceCategory::MediaTransport, ErrorCode::InvalidCapabilities))
716        );
717
718        // Can't configure or reconfigure while streaming
719        assert_matches!(s.start(), Ok(()));
720
721        assert_matches!(
722            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
723            Err((_, ErrorCode::BadState))
724        );
725
726        assert_matches!(s.reconfigure(reconfiguration.clone()), Err((_, ErrorCode::BadState)));
727
728        assert_matches!(s.suspend(), Ok(()));
729
730        // Reconfigure should be fine again in open state.
731        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));
732
733        // Configure is still not allowed.
734        assert_matches!(
735            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
736            Err((_, ErrorCode::BadState))
737        );
738    }
739
740    #[test]
741    fn stream_establishment() {
742        let mut exec = fasync::TestExecutor::new();
743        let mut s = test_endpoint(EndpointType::Sink);
744
745        let (mut remote, transport) = Channel::create();
746
747        // Can't establish before configuring
748        assert_matches!(s.establish(), Err(ErrorCode::BadState));
749
750        // Trying to receive a channel in the wrong state closes the channel
751        assert_matches!(s.receive_channel(transport), Err(Error::InvalidState));
752
753        let mut read_fut = remote.next();
754        let res = exec.run_until_stalled(&mut read_fut).expect("should be ready");
755        // When the peer is closed, None is returned.
756        assert_matches!(res, None);
757
758        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
759
760        assert_matches!(s.establish(), Ok(()));
761
762        // And we should be able to give a channel now.
763        let (_remote, transport) = Channel::create();
764        assert_matches!(s.receive_channel(transport), Ok(false));
765    }
766
767    fn setup_peer_for_release(exec: &mut fasync::TestExecutor) -> (Peer, Channel, SimpleResponder) {
768        let (peer, mut signaling) = setup_peer();
769        // Send a close from the other side to produce an event we can respond to.
770        exec.run_until_stalled(&mut signaling.send(vec![0x40, 0x08, 0x04]))
771            .expect("signaling write")
772            .expect("write successful");
773        let mut req_stream = peer.take_request_stream();
774        let mut req_fut = req_stream.next();
775        let complete = exec.run_until_stalled(&mut req_fut);
776        let responder = match complete {
777            Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder,
778            _ => panic!("Expected a close request"),
779        };
780        (peer, signaling, responder)
781    }
782
783    #[test]
784    fn stream_release_without_abort() {
785        let mut exec = fasync::TestExecutor::new();
786        let mut s = test_endpoint(EndpointType::Sink);
787
788        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
789
790        let remote_transport = establish_stream(&mut s);
791
792        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);
793
794        // We expect release to succeed in this state.
795        s.release(responder, &peer).unwrap();
796        // Expect a "yes" response.
797        expect_remote_recv(&[0x42, 0x08], &mut signaling);
798
799        // Close the transport channel by dropping it.
800        drop(remote_transport);
801
802        // After the transport is closed we should transition to Idle.
803        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
804        assert_eq!(s.state(), StreamState::Idle);
805    }
806
807    #[test]
808    fn test_mediastream() {
809        let mut exec = fasync::TestExecutor::new();
810        let mut s = test_endpoint(EndpointType::Sink);
811
812        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
813
814        // Before the stream is opened, we shouldn't be able to take the transport.
815        assert!(s.take_transport().is_none());
816
817        let mut remote_transport = establish_stream(&mut s);
818
819        // Should be able to get the transport from the stream now.
820        let temp_stream = s.take_transport();
821        assert!(temp_stream.is_some());
822
823        // But only once
824        assert!(s.take_transport().is_none());
825
826        // Until you drop the stream
827        drop(temp_stream);
828
829        let media_stream = s.take_transport();
830        assert!(media_stream.is_some());
831        let mut media_stream = media_stream.unwrap();
832
833        // Max TX size is taken from the underlying channel.
834        assert_matches!(media_stream.max_tx_size(), Ok(Channel::DEFAULT_MAX_TX));
835
836        // Writing to the media stream should send it through the transport channel.
837        let hearts = vec![0xF0, 0x9F, 0x92, 0x96, 0xF0, 0x9F, 0x92, 0x96];
838        let mut write_fut = media_stream.send(hearts.clone());
839
840        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Ok(())));
841
842        expect_remote_recv(&hearts, &mut remote_transport);
843
844        // Closing the media stream should close the channel.
845        let mut close_fut = media_stream.close();
846        assert_matches!(exec.run_until_stalled(&mut close_fut), Poll::Ready(Ok(())));
847        // Note: there's no effect on the other end of the channel when a close occurs,
848        // until the channel is dropped.
849
850        drop(s);
851
852        // Reading from the remote end should return None.
853        let mut read_fut = remote_transport.next();
854        let res = exec.run_until_stalled(&mut read_fut).expect("should be ready");
855        // When the peer is closed, None is returned.
856        assert_matches!(res, None);
857
858        // After the stream is gone, any write should return an Err
859        let mut write_fut = media_stream.send(vec![0xDE, 0xAD]);
860        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Err(_)));
861
862        // After the stream is gone, the stream should be fused done.
863        let mut next_fut = media_stream.next();
864        assert_matches!(exec.run_until_stalled(&mut next_fut), Poll::Ready(None));
865
866        assert!(media_stream.is_terminated(), "should be terminated");
867
868        // And the Max TX should be an error.
869        assert_matches!(media_stream.max_tx_size(), Err(_));
870    }
871
872    #[test]
873    fn stream_release_with_abort() {
874        let mut exec = fasync::TestExecutor::new();
875        let mut s = test_endpoint(EndpointType::Sink);
876
877        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
878        let remote_transport = establish_stream(&mut s);
879        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);
880
881        // We expect release to succeed in this state, then start the task to wait for the close.
882        s.release(responder, &peer).unwrap();
883        // Expect a "yes" response.
884        expect_remote_recv(&[0x42, 0x08], &mut signaling);
885
886        // Should get an abort
887        let next = std::pin::pin!(signaling.next());
888        let received =
889            exec.run_singlethreaded(next).expect("channel not closed").expect("successful read");
890        assert_eq!(0x0A, received[1]);
891        let txlabel = received[0] & 0xF0;
892        // Send a response
893        exec.run_until_stalled(&mut signaling.send(vec![txlabel | 0x02, 0x0A]))
894            .expect("signaling write")
895            .expect("write successful");
896
897        let _ = exec.run_singlethreaded(&mut std::pin::pin!(remote_transport.closed()));
898
899        // We will then end up in Idle.
900        while s.state() != StreamState::Idle {
901            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
902        }
903    }
904
905    #[test]
906    fn start_and_suspend() {
907        let mut exec = fasync::TestExecutor::new();
908        let mut s = test_endpoint(EndpointType::Sink);
909
910        // Can't start or suspend until configured and open.
911        assert_matches!(s.start(), Err(ErrorCode::BadState));
912        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
913
914        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
915
916        assert_matches!(s.start(), Err(ErrorCode::BadState));
917        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
918
919        assert_matches!(s.establish(), Ok(()));
920
921        assert_matches!(s.start(), Err(ErrorCode::BadState));
922        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
923
924        let (remote, local) = zx::Socket::create_datagram();
925        let (client_end, mut direction_request_stream) =
926            create_request_stream::<bredr::AudioDirectionExtMarker>();
927        let ext = bredr::Channel {
928            socket: Some(local),
929            channel_mode: Some(fidl_bt::ChannelMode::Basic),
930            max_tx_sdu_size: Some(1004),
931            ext_direction: Some(client_end),
932            ..Default::default()
933        };
934        let transport = Channel::try_from(ext).unwrap();
935        assert_matches!(s.receive_channel(transport), Ok(false));
936
937        // Should be able to start but not suspend now.
938        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
939        assert_matches!(s.start(), Ok(()));
940
941        match exec.run_until_stalled(&mut direction_request_stream.next()) {
942            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
943                priority,
944                responder,
945            }))) => {
946                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
947                responder.send(Ok(())).expect("response to send cleanly");
948            }
949            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
950        };
951
952        // Are started, so we should be able to suspend but not start again here.
953        assert_matches!(s.start(), Err(ErrorCode::BadState));
954        assert_matches!(s.suspend(), Ok(()));
955
956        match exec.run_until_stalled(&mut direction_request_stream.next()) {
957            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
958                priority,
959                responder,
960            }))) => {
961                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
962                responder.send(Ok(())).expect("response to send cleanly");
963            }
964            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
965        };
966
967        // Now we're suspended, so we can start it again.
968        assert_matches!(s.start(), Ok(()));
969        assert_matches!(s.suspend(), Ok(()));
970
971        // After we close, we are back at idle and can't start / stop
972        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);
973
974        {
975            s.release(responder, &peer).unwrap();
976            // Expect a "yes" response.
977            expect_remote_recv(&[0x42, 0x08], &mut signaling);
978            // Close the transport channel by dropping it.
979            drop(remote);
980            while s.state() != StreamState::Idle {
981                let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
982            }
983        }
984
985        // Shouldn't be able to start or suspend again.
986        assert_matches!(s.start(), Err(ErrorCode::BadState));
987        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
988    }
989
990    fn receive_l2cap_params_channel(
991        s: &mut StreamEndpoint,
992    ) -> (zx::Socket, bredr::L2capParametersExtRequestStream) {
993        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
994        assert_matches!(s.establish(), Ok(()));
995
996        let (remote, local) = zx::Socket::create_datagram();
997        let (client_end, l2cap_params_requests) =
998            create_request_stream::<bredr::L2capParametersExtMarker>();
999        let ext = bredr::Channel {
1000            socket: Some(local),
1001            channel_mode: Some(fidl_bt::ChannelMode::Basic),
1002            max_tx_sdu_size: Some(1004),
1003            ext_l2cap: Some(client_end),
1004            ..Default::default()
1005        };
1006        let transport = Channel::try_from(ext).unwrap();
1007        assert_matches!(s.receive_channel(transport), Ok(false));
1008        (remote, l2cap_params_requests)
1009    }
1010
1011    #[test]
1012    fn sets_flush_timeout_for_source_transports() {
1013        let mut exec = fasync::TestExecutor::new();
1014        let mut s = test_endpoint(EndpointType::Source);
1015        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
1016
1017        // Should request to set the flush timeout.
1018        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
1019            Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
1020                request,
1021                responder,
1022            }))) => {
1023                assert_eq!(
1024                    Some(StreamEndpoint::SRC_FLUSH_TIMEOUT.into_nanos()),
1025                    request.flush_timeout
1026                );
1027                responder.send(&request).expect("response to send cleanly");
1028            }
1029            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
1030        };
1031    }
1032
1033    #[test]
1034    fn no_flush_timeout_for_sink_transports() {
1035        let mut exec = fasync::TestExecutor::new();
1036        let mut s = test_endpoint(EndpointType::Sink);
1037        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
1038
1039        // Should NOT request to set the flush timeout.
1040        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
1041            Poll::Pending => {}
1042            x => panic!("Expected no request to set flush timeout, got {:?}", x),
1043        };
1044    }
1045
1046    #[test]
1047    fn get_configuration() {
1048        let mut s = test_endpoint(EndpointType::Sink);
1049
1050        // Can't get configuration if we aren't configured.
1051        assert!(s.get_configuration().is_none());
1052
1053        let config = vec![
1054            ServiceCapability::MediaTransport,
1055            ServiceCapability::MediaCodec {
1056                media_type: MediaType::Audio,
1057                codec_type: MediaCodecType::new(0),
1058                // Change the codec_extra which is typical, ex. SBC (A2DP Spec 4.3.2.6)
1059                codec_extra: vec![0x60, 0x0D, 0x02, 0x55],
1060            },
1061        ];
1062
1063        assert_matches!(s.configure(&REMOTE_ID, config.clone()), Ok(()));
1064
1065        match s.get_configuration() {
1066            Some(c) => assert_eq!(&config, c),
1067            x => panic!("Expected Ok from get_configuration but got {:?}", x),
1068        };
1069
1070        // Abort this stream, putting it back to the idle state.
1071        s.abort();
1072
1073        assert!(s.get_configuration().is_none());
1074    }
1075
1076    use std::sync::atomic::{AtomicUsize, Ordering};
1077
1078    /// Create a callback that tracks how many times it has been called
1079    fn call_count_callback() -> (Option<StreamEndpointUpdateCallback>, Arc<AtomicUsize>) {
1080        let call_count = Arc::new(AtomicUsize::new(0));
1081        let call_count_reader = call_count.clone();
1082        let count_cb: StreamEndpointUpdateCallback = Box::new(move |_stream: &StreamEndpoint| {
1083            let _ = call_count.fetch_add(1, Ordering::SeqCst);
1084        });
1085        (Some(count_cb), call_count_reader)
1086    }
1087
1088    /// Test that the update callback is run at least once for all methods that mutate the state of
1089    /// the StreamEndpoint. This is done through an atomic counter in the callback that increments
1090    /// when the callback is run.
1091    ///
1092    /// Note that the _results_ of calling these mutating methods on the state of StreamEndpoint are
1093    /// not validated here. They are validated in other tests.
1094    #[test]
1095    fn update_callback() {
1096        // Need an executor to make a socket
1097        let _exec = fasync::TestExecutor::new();
1098        let mut s = test_endpoint(EndpointType::Sink);
1099        let (cb, call_count) = call_count_callback();
1100        s.set_update_callback(cb);
1101
1102        s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
1103            .expect("Configure to succeed in test");
1104        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1105        call_count.store(0, Ordering::SeqCst); // clear call count
1106
1107        s.establish().expect("Establish to succeed in test");
1108        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1109        call_count.store(0, Ordering::SeqCst); // clear call count
1110
1111        let (_, transport) = Channel::create();
1112        assert_eq!(
1113            s.receive_channel(transport).expect("Receive channel to succeed in test"),
1114            false
1115        );
1116        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1117        call_count.store(0, Ordering::SeqCst); // clear call count
1118
1119        s.start().expect("Start to succeed in test");
1120        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1121        call_count.store(0, Ordering::SeqCst); // clear call count
1122
1123        s.suspend().expect("Suspend to succeed in test");
1124        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1125        call_count.store(0, Ordering::SeqCst); // clear call count
1126
1127        s.reconfigure(vec![]).expect("Reconfigure to succeed in test");
1128        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1129        call_count.store(0, Ordering::SeqCst); // clear call count
1130
1131        // Abort this stream, putting it back to the idle state.
1132        s.abort();
1133    }
1134}