bt_avdtp/
stream_endpoint.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
6use fuchsia_async::{DurationExt, Task, TimeoutExt};
7use fuchsia_bluetooth::types::{A2dpDirection, Channel};
8use fuchsia_sync::Mutex;
9use futures::stream::{FusedStream, Stream};
10use futures::{FutureExt, io};
11use log::warn;
12use std::fmt;
13use std::pin::Pin;
14use std::sync::{Arc, RwLock, Weak};
15use std::task::{Context, Poll};
16use zx::{MonotonicDuration, Status};
17
18use crate::types::{
19    EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result as AvdtpResult,
20    ServiceCapability, ServiceCategory, StreamEndpointId, StreamInformation,
21};
22use crate::{Peer, SimpleResponder};
23
24pub type StreamEndpointUpdateCallback = Box<dyn Fn(&StreamEndpoint) -> () + Sync + Send>;
25
/// The state of a StreamEndpoint.
#[derive(PartialEq, Debug, Default, Clone, Copy)]
pub enum StreamState {
    /// Initial (default) state; also the state reached after `abort` or once a release finishes.
    #[default]
    Idle,
    /// Entered by a successful `configure`.
    Configured,
    // An Open command has been accepted, but streams have not been established yet.
    Opening,
    /// Entered when `receive_channel` stores the transport channel, and again on `suspend`.
    Open,
    /// Entered by `start`.
    Streaming,
    /// Entered by `release` while waiting for the transport channel to close.
    Closing,
    /// Transient state set while an abort is being processed, before returning to Idle.
    Aborting,
}
39
/// An AVDTP StreamEndpoint. StreamEndpoints represent a particular capability of the application
/// to be a source or sink of media. Included here to aid negotiating the stream connection.
/// See Section 5.3 of the AVDTP 1.3 Specification for more information about the Stream Endpoint
/// Architecture.
pub struct StreamEndpoint {
    /// Local stream endpoint id.  This should be unique per AVDTP Peer.
    id: StreamEndpointId,
    /// The type of endpoint this is (TSEP), Source or Sink.
    endpoint_type: EndpointType,
    /// The media type this stream represents.
    media_type: MediaType,
    /// Current state the stream is in. See Section 6.5 for an overview.
    /// Shared (via `Arc`) with the task spawned by `release`, which writes the final state.
    state: Arc<Mutex<StreamState>>,
    /// The media transport channel
    /// This should be Some(channel) when state is Open or Streaming.
    transport: Option<Arc<RwLock<Channel>>>,
    /// True when the MediaStream is held.
    /// Prevents multiple threads from owning the media stream.
    stream_held: Arc<Mutex<bool>>,
    /// The capabilities of this endpoint.
    capabilities: Vec<ServiceCapability>,
    /// The remote stream endpoint id.  None if the stream has never been configured.
    remote_id: Option<StreamEndpointId>,
    /// The current configuration of this endpoint.  Empty if the stream has never been configured.
    configuration: Vec<ServiceCapability>,
    /// Callback that is run whenever the endpoint is updated
    update_callback: Option<StreamEndpointUpdateCallback>,
    /// In-progress task. This is only used for the Release procedure which places the state in Closing
    /// and must wait for the peer to close transport channels.
    in_progress: Option<Task<()>>,
}
71
72impl fmt::Debug for StreamEndpoint {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        f.debug_struct("StreamEndpoint")
75            .field("id", &self.id.0)
76            .field("endpoint_type", &self.endpoint_type)
77            .field("media_type", &self.media_type)
78            .field("state", &self.state)
79            .field("capabilities", &self.capabilities)
80            .field("remote_id", &self.remote_id.as_ref().map(|id| id.to_string()))
81            .field("configuration", &self.configuration)
82            .finish()
83    }
84}
85
86impl StreamEndpoint {
87    /// Make a new StreamEndpoint.
88    /// |id| must be in the valid range for a StreamEndpointId (0x01 - 0x3E).
89    /// StreamEndpoints start in the Idle state.
90    pub fn new(
91        id: u8,
92        media_type: MediaType,
93        endpoint_type: EndpointType,
94        capabilities: Vec<ServiceCapability>,
95    ) -> AvdtpResult<StreamEndpoint> {
96        let seid = StreamEndpointId::try_from(id)?;
97        Ok(StreamEndpoint {
98            id: seid,
99            capabilities,
100            media_type,
101            endpoint_type,
102            state: Default::default(),
103            transport: None,
104            stream_held: Arc::new(Mutex::new(false)),
105            remote_id: None,
106            configuration: vec![],
107            update_callback: None,
108            in_progress: None,
109        })
110    }
111
112    pub fn as_new(&self) -> Self {
113        StreamEndpoint::new(
114            self.id.0,
115            self.media_type.clone(),
116            self.endpoint_type.clone(),
117            self.capabilities.clone(),
118        )
119        .expect("as_new")
120    }
121
122    /// Set the state to the given value and run the `update_callback` afterwards
123    fn set_state(&mut self, state: StreamState) {
124        *self.state.lock() = state;
125        self.update_callback();
126    }
127
128    /// Pass update callback to StreamEndpoint that will be called anytime `StreamEndpoint` is
129    /// modified.
130    pub fn set_update_callback(&mut self, callback: Option<StreamEndpointUpdateCallback>) {
131        self.update_callback = callback;
132    }
133
134    fn update_callback(&self) {
135        if let Some(cb) = self.update_callback.as_ref() {
136            cb(self);
137        }
138    }
139
140    /// Build a new StreamEndpoint from a StreamInformation and associated Capabilities.
141    /// This makes it easy to build from AVDTP Discover and GetCapabilities procedures.
142    /// StreamEndpooints start in the Idle state.
143    pub fn from_info(
144        info: &StreamInformation,
145        capabilities: Vec<ServiceCapability>,
146    ) -> StreamEndpoint {
147        StreamEndpoint {
148            id: info.id().clone(),
149            capabilities,
150            media_type: info.media_type().clone(),
151            endpoint_type: info.endpoint_type().clone(),
152            state: Default::default(),
153            transport: None,
154            stream_held: Arc::new(Mutex::new(false)),
155            remote_id: None,
156            configuration: vec![],
157            update_callback: None,
158            in_progress: None,
159        }
160    }
161
162    /// Checks that the state is in the set of states.
163    /// If not, returns Err(ErrorCode::BadState).
164    fn state_is(&self, state: StreamState) -> Result<(), ErrorCode> {
165        (*self.state.lock() == state).then_some(()).ok_or(ErrorCode::BadState)
166    }
167
168    /// Attempt to Configure this stream using the capabilities given.
169    /// If the stream is not in an Idle state, fails with Err(InvalidState).
170    /// Used for the Stream Configuration procedure, see Section 6.9
171    pub fn configure(
172        &mut self,
173        remote_id: &StreamEndpointId,
174        capabilities: Vec<ServiceCapability>,
175    ) -> Result<(), (ServiceCategory, ErrorCode)> {
176        self.state_is(StreamState::Idle).map_err(|e| (ServiceCategory::None, e))?;
177        self.remote_id = Some(remote_id.clone());
178        for cap in &capabilities {
179            if !self
180                .capabilities
181                .iter()
182                .any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y))
183            {
184                return Err((cap.category(), ErrorCode::UnsupportedConfiguration));
185            }
186        }
187        self.configuration = capabilities;
188        self.set_state(StreamState::Configured);
189        Ok(())
190    }
191
192    /// Attempt to reconfigure this stream with the capabilities given.  If any capability is not
193    /// valid to set, fails with the first such category and InvalidCapabilities If the stream is
194    /// not in the Open state, fails with Err((None, BadState)) Used for the Stream Reconfiguration
195    /// procedure, see Section 6.15.
196    pub fn reconfigure(
197        &mut self,
198        mut capabilities: Vec<ServiceCapability>,
199    ) -> Result<(), (ServiceCategory, ErrorCode)> {
200        self.state_is(StreamState::Open).map_err(|e| (ServiceCategory::None, e))?;
201        // Only application capabilities are allowed to be reconfigured. See Section 8.11.1
202        if let Some(cap) = capabilities.iter().find(|x| !x.is_application()) {
203            return Err((cap.category(), ErrorCode::InvalidCapabilities));
204        }
205        // Should only replace the capabilities that have been configured. See Section 8.11.2
206        let to_replace: std::vec::Vec<_> =
207            capabilities.iter().map(|x| std::mem::discriminant(x)).collect();
208        self.configuration.retain(|x| {
209            let disc = std::mem::discriminant(x);
210            !to_replace.contains(&disc)
211        });
212        self.configuration.append(&mut capabilities);
213        self.update_callback();
214        Ok(())
215    }
216
217    /// Get the current configuration of this stream.
218    /// If the stream is not configured, returns None.
219    /// Used for the Steam Get Configuration Procedure, see Section 6.10
220    pub fn get_configuration(&self) -> Option<&Vec<ServiceCapability>> {
221        if self.configuration.is_empty() {
222            return None;
223        }
224        Some(&self.configuration)
225    }
226
    // Flush timeout requested for the media transport channel of Source endpoints when the
    // channel is received (see `receive_channel`).
    // 100 milliseconds chosen based on end of range testing, to allow for recovery after normal
    // packet delivery continues.
    const SRC_FLUSH_TIMEOUT: MonotonicDuration = MonotonicDuration::from_millis(100);
230
231    /// When a L2CAP channel is received after an Open command is accepted, it should be
232    /// delivered via receive_channel.
233    /// Returns true if this Endpoint expects more channels to be established before
234    /// streaming is started.
235    /// Returns Err(InvalidState) if this Endpoint is not expecting a channel to be established,
236    /// closing |c|.
237    pub fn receive_channel(&mut self, c: Channel) -> AvdtpResult<bool> {
238        if self.state_is(StreamState::Opening).is_err() || self.transport.is_some() {
239            return Err(Error::InvalidState);
240        }
241        self.transport = Some(Arc::new(RwLock::new(c)));
242        self.try_flush_timeout(Self::SRC_FLUSH_TIMEOUT);
243        self.stream_held = Arc::new(Mutex::new(false));
244        // TODO(jamuraa, https://fxbug.dev/42051664, https://fxbug.dev/42051776): Reporting and Recovery channels
245        self.set_state(StreamState::Open);
246        Ok(false)
247    }
248
249    /// Begin opening this stream.  The stream must be in a Configured state.
250    /// See Stream Establishment, Section 6.11
251    pub fn establish(&mut self) -> Result<(), ErrorCode> {
252        if self.state_is(StreamState::Configured).is_err() || self.transport.is_some() {
253            return Err(ErrorCode::BadState);
254        }
255        self.set_state(StreamState::Opening);
256        Ok(())
257    }
258
259    /// Attempts to set audio direction priority of the MediaTransport channel based on
260    /// whether the stream is a source or sink endpoint if `active` is true.  If `active` is
261    /// false, set the priority to Normal instead.  Does nothing on failure.
262    pub fn try_priority(&self, active: bool) {
263        let priority = match (active, &self.endpoint_type) {
264            (false, _) => A2dpDirection::Normal,
265            (true, EndpointType::Source) => A2dpDirection::Source,
266            (true, EndpointType::Sink) => A2dpDirection::Sink,
267        };
268        let fut = match self.transport.as_ref().unwrap().try_read() {
269            Err(_) => return,
270            Ok(channel) => channel.set_audio_priority(priority).map(|_| ()),
271        };
272        // TODO(https://fxbug.dev/331621666): We should avoid detaching this.
273        Task::spawn(fut).detach();
274    }
275
276    /// Attempts to set the flush timeout for the MediaTransport channel, for source endpoints.
277    pub fn try_flush_timeout(&self, timeout: MonotonicDuration) {
278        if self.endpoint_type != EndpointType::Source {
279            return;
280        }
281        let fut = match self.transport.as_ref().unwrap().try_write() {
282            Err(_) => return,
283            Ok(channel) => channel.set_flush_timeout(Some(timeout)).map(|_| ()),
284        };
285        // TODO(https://fxbug.dev/331621666): We should avoid detaching this.
286        Task::spawn(fut).detach();
287    }
288
    /// Close this stream.  This procedure will wait until media channels are closed before
    /// transitioning to Idle.  If the channels are not closed in 3 seconds, we initiate an abort
    /// procedure with the remote |peer| to force a transition to Idle.
    ///
    /// If the stream is neither Open nor Streaming, the request is rejected with BadState and
    /// the result of the rejection is returned.  Otherwise the state becomes Closing, the
    /// request is accepted through `responder`, the wait is started as a local task stored in
    /// `in_progress`, the configuration is cleared and the update callback is run.
    ///
    /// Panics if the remote id or transport is missing while in the Open / Streaming state.
    pub fn release(&mut self, responder: SimpleResponder, peer: &Peer) -> AvdtpResult<()> {
        {
            // Scoped so the state lock is released before `set_state` takes it again.
            let lock = self.state.lock();
            if *lock != StreamState::Open && *lock != StreamState::Streaming {
                return responder.reject(ErrorCode::BadState);
            }
        }
        self.set_state(StreamState::Closing);
        responder.send()?;
        let release_wait_fut = {
            // Take our transport and remote id - after this procedure it will be closed.
            // These must be Some(_) because we are in Open / Streaming state.
            let seid = self.remote_id.take().unwrap();
            let transport = self.transport.take().unwrap();
            let peer = peer.clone();
            let state = self.state.clone();
            // The task writes the shared state directly, so the update callback is not run for
            // the Aborting / Idle transitions made below.
            async move {
                let Ok(transport) = transport.try_read() else {
                    warn!("unable to lock transport channel, dropping and assuming closed");
                    *state.lock() = StreamState::Idle;
                    return;
                };
                // Resolves when the channel reports closed, or with TIMED_OUT after 3 seconds.
                let closed_fut = transport
                    .closed()
                    .on_timeout(MonotonicDuration::from_seconds(3).after_now(), || {
                        Err(Status::TIMED_OUT)
                    });
                if let Err(Status::TIMED_OUT) = closed_fut.await {
                    // The result of the abort request is ignored.
                    let _ = peer.abort(&seid).await;
                    *state.lock() = StreamState::Aborting;
                    // As the initiator of the Abort, we close our channel.
                    drop(transport);
                }
                *state.lock() = StreamState::Idle;
            }
        };
        self.in_progress = Some(Task::local(release_wait_fut));
        // Closing will return this endpoint to the Idle state, one way or another with no
        // configuration
        self.configuration.clear();
        self.update_callback();
        Ok(())
    }
335
336    /// Returns the current state of this endpoint.
337    pub fn state(&self) -> StreamState {
338        *self.state.lock()
339    }
340
341    /// Start this stream.  This can be done only from the Open State.
342    /// Used for the Stream Start procedure, See Section 6.12
343    pub fn start(&mut self) -> Result<(), ErrorCode> {
344        self.state_is(StreamState::Open)?;
345        self.try_priority(true);
346        self.set_state(StreamState::Streaming);
347        Ok(())
348    }
349
350    /// Suspend this stream.  This can be done only from the Streaming state.
351    /// Used for the Stream Suspend procedure, See Section 6.14
352    pub fn suspend(&mut self) -> Result<(), ErrorCode> {
353        self.state_is(StreamState::Streaming)?;
354        self.set_state(StreamState::Open);
355        self.try_priority(false);
356        Ok(())
357    }
358
359    /// Abort this stream.  This can be done from any state, and will always return the state
360    /// to Idle.  We are initiating this procedure so will wait for a response and all our
361    /// channels will be closed.
362    pub async fn initiate_abort<'a>(&'a mut self, peer: &'a Peer) {
363        if let Some(seid) = self.remote_id.take() {
364            let _ = peer.abort(&seid).await;
365            self.set_state(StreamState::Aborting);
366        }
367        self.abort()
368    }
369
370    /// Abort this stream.  This can be done from any state, and will always return the state
371    /// to Idle.  We are receiving this abort from the peer, and all our channels will close.
372    pub fn abort(&mut self) {
373        self.set_state(StreamState::Aborting);
374        self.configuration.clear();
375        self.remote_id = None;
376        self.transport = None;
377        self.set_state(StreamState::Idle);
378    }
379
380    /// Capabilities of this StreamEndpoint.
381    /// Provides support for the Get Capabilities and Get All Capabilities signaling procedures.
382    /// See Sections 6.7 and 6.8
383    pub fn capabilities(&self) -> &Vec<ServiceCapability> {
384        &self.capabilities
385    }
386
387    /// Returns the CodecType of this StreamEndpoint.
388    /// Returns None if there is no MediaCodec capability in the endpoint.
389    /// Note: a MediaCodec capability is required by all endpoints by the spec.
390    pub fn codec_type(&self) -> Option<&MediaCodecType> {
391        self.capabilities.iter().find_map(|cap| match cap {
392            ServiceCapability::MediaCodec { codec_type, .. } => Some(codec_type),
393            _ => None,
394        })
395    }
396
397    /// Returns the local StreamEndpointId for this endpoint.
398    pub fn local_id(&self) -> &StreamEndpointId {
399        &self.id
400    }
401
402    /// Returns the remote StreamEndpointId for this endpoint, if it's configured.
403    pub fn remote_id(&self) -> Option<&StreamEndpointId> {
404        self.remote_id.as_ref()
405    }
406
407    /// Returns the EndpointType of this endpoint
408    pub fn endpoint_type(&self) -> &EndpointType {
409        &self.endpoint_type
410    }
411
412    /// Make a StreamInformation which represents the current state of this stream.
413    pub fn information(&self) -> StreamInformation {
414        let in_use = self.state_is(StreamState::Idle).is_err();
415        StreamInformation::new(
416            self.id.clone(),
417            in_use,
418            self.media_type.clone(),
419            self.endpoint_type.clone(),
420        )
421    }
422
423    /// Take the media transport channel, which transmits (or receives) any media for this
424    /// StreamEndpoint.  Returns None if the channel is held already, or if the channel has not
425    /// been opened.
426    pub fn take_transport(&mut self) -> Option<MediaStream> {
427        let mut stream_held = self.stream_held.lock();
428        if *stream_held || self.transport.is_none() {
429            return None;
430        }
431
432        *stream_held = true;
433
434        Some(MediaStream::new(
435            self.stream_held.clone(),
436            Arc::downgrade(self.transport.as_ref().unwrap()),
437        ))
438    }
439
440    /// Get an AudioOffloadExtProxy if it exists in the transport channel
441    pub fn audio_offload(&self) -> Option<AudioOffloadExtProxy> {
442        self.transport.as_ref().and_then(|c| c.read().unwrap().audio_offload())
443    }
444}
445
/// Represents a media transport stream.
/// If a sink, produces the bytes that have been delivered from the peer.
/// If a source, can send bytes using `send`
pub struct MediaStream {
    /// Shared "stream is held" flag; set back to false when this MediaStream is dropped.
    in_use: Arc<Mutex<bool>>,
    /// Weak reference to the transport channel; upgrading fails once the channel is gone.
    channel: Weak<RwLock<Channel>>,
    /// Set once `poll_next` has returned `None`; reported through `FusedStream`.
    terminated: bool,
}
454
455impl MediaStream {
456    pub fn new(in_use: Arc<Mutex<bool>>, channel: Weak<RwLock<Channel>>) -> Self {
457        Self { in_use, channel, terminated: false }
458    }
459
460    fn try_upgrade(&self) -> Result<Arc<RwLock<Channel>>, io::Error> {
461        self.channel
462            .upgrade()
463            .ok_or_else(|| io::Error::new(io::ErrorKind::ConnectionAborted, "lost connection"))
464    }
465
466    pub fn max_tx_size(&self) -> Result<usize, io::Error> {
467        match self.try_upgrade()?.try_read() {
468            Err(_e) => return Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")),
469            Ok(lock) => Ok(lock.max_tx_size()),
470        }
471    }
472}
473
474impl Drop for MediaStream {
475    fn drop(&mut self) {
476        let mut l = self.in_use.lock();
477        *l = false;
478    }
479}
480
481impl Stream for MediaStream {
482    type Item = AvdtpResult<Vec<u8>>;
483
484    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
485        let Ok(arc_chan) = self.try_upgrade() else {
486            self.terminated = true;
487            return Poll::Ready(None);
488        };
489        let Ok(lock) = arc_chan.try_write() else {
490            self.terminated = true;
491            return Poll::Ready(None);
492        };
493        let mut pin_chan = Pin::new(lock);
494        match pin_chan.as_mut().poll_next(cx) {
495            Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
496            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::PeerRead(e)))),
497            Poll::Ready(None) => {
498                self.terminated = true;
499                Poll::Ready(None)
500            }
501            Poll::Pending => Poll::Pending,
502        }
503    }
504}
505
506impl FusedStream for MediaStream {
507    fn is_terminated(&self) -> bool {
508        self.terminated
509    }
510}
511
512impl io::AsyncWrite for MediaStream {
513    fn poll_write(
514        self: Pin<&mut Self>,
515        cx: &mut Context<'_>,
516        buf: &[u8],
517    ) -> Poll<Result<usize, io::Error>> {
518        let arc_chan = match self.try_upgrade() {
519            Err(e) => return Poll::Ready(Err(e)),
520            Ok(c) => c,
521        };
522        let lock = match arc_chan.try_write() {
523            Err(_) => {
524                return Poll::Ready(Err(io::Error::new(
525                    io::ErrorKind::WouldBlock,
526                    "couldn't lock",
527                )));
528            }
529            Ok(lock) => lock,
530        };
531        let mut pin_chan = Pin::new(lock);
532        pin_chan.as_mut().poll_write(cx, buf)
533    }
534
535    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
536        let arc_chan = match self.try_upgrade() {
537            Err(e) => return Poll::Ready(Err(e)),
538            Ok(c) => c,
539        };
540        let lock = match arc_chan.try_write() {
541            Err(_) => {
542                return Poll::Ready(Err(io::Error::new(
543                    io::ErrorKind::WouldBlock,
544                    "couldn't lock",
545                )));
546            }
547            Ok(lock) => lock,
548        };
549        let mut pin_chan = Pin::new(lock);
550        pin_chan.as_mut().poll_flush(cx)
551    }
552
553    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
554        let arc_chan = match self.try_upgrade() {
555            Err(e) => return Poll::Ready(Err(e)),
556            Ok(c) => c,
557        };
558        let lock = match arc_chan.try_write() {
559            Err(_) => {
560                return Poll::Ready(Err(io::Error::new(
561                    io::ErrorKind::WouldBlock,
562                    "couldn't lock",
563                )));
564            }
565            Ok(lock) => lock,
566        };
567        let mut pin_chan = Pin::new(lock);
568        pin_chan.as_mut().poll_close(cx)
569    }
570}
571
572#[cfg(test)]
573mod tests {
574    use super::*;
575    use crate::Request;
576    use crate::tests::{expect_remote_recv, setup_peer};
577
578    use assert_matches::assert_matches;
579    use fidl::endpoints::create_request_stream;
580    use futures::io::AsyncWriteExt;
581    use futures::stream::StreamExt;
582    use {
583        fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
584        fuchsia_async as fasync,
585    };
586
    // Raw endpoint id used by these tests, both as the local id of the endpoints they build
    // and as the value of REMOTE_ID.
    const REMOTE_ID_VAL: u8 = 1;
    // The same value as a StreamEndpointId, passed to `configure` as the remote id.
    const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL);
589
590    #[test]
591    fn make() {
592        let s = StreamEndpoint::new(
593            REMOTE_ID_VAL,
594            MediaType::Audio,
595            EndpointType::Sink,
596            vec![ServiceCapability::MediaTransport],
597        );
598        assert!(s.is_ok());
599        let s = s.unwrap();
600        assert_eq!(&StreamEndpointId(1), s.local_id());
601
602        let info = s.information();
603        assert!(!info.in_use());
604
605        let no = StreamEndpoint::new(
606            0,
607            MediaType::Audio,
608            EndpointType::Sink,
609            vec![ServiceCapability::MediaTransport],
610        );
611        assert!(no.is_err());
612    }
613
614    fn establish_stream(s: &mut StreamEndpoint) -> Channel {
615        assert_matches!(s.establish(), Ok(()));
616        let (chan, remote) = Channel::create();
617        assert_matches!(s.receive_channel(chan), Ok(false));
618        remote
619    }
620
621    #[test]
622    fn from_info() {
623        let seid = StreamEndpointId::try_from(5).unwrap();
624        let info =
625            StreamInformation::new(seid.clone(), false, MediaType::Audio, EndpointType::Sink);
626        let capabilities = vec![ServiceCapability::MediaTransport];
627
628        let endpoint = StreamEndpoint::from_info(&info, capabilities);
629
630        assert_eq!(&seid, endpoint.local_id());
631        assert_eq!(&false, endpoint.information().in_use());
632        assert_eq!(1, endpoint.capabilities().len());
633    }
634
635    #[test]
636    fn codec_type() {
637        let s = StreamEndpoint::new(
638            REMOTE_ID_VAL,
639            MediaType::Audio,
640            EndpointType::Sink,
641            vec![
642                ServiceCapability::MediaTransport,
643                ServiceCapability::MediaCodec {
644                    media_type: MediaType::Audio,
645                    codec_type: MediaCodecType::new(0x40),
646                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], // Meaningless test data.
647                },
648            ],
649        )
650        .unwrap();
651
652        assert_eq!(Some(&MediaCodecType::new(0x40)), s.codec_type());
653
654        let s = StreamEndpoint::new(
655            REMOTE_ID_VAL,
656            MediaType::Audio,
657            EndpointType::Sink,
658            vec![ServiceCapability::MediaTransport],
659        )
660        .unwrap();
661
662        assert_eq!(None, s.codec_type());
663    }
664
665    fn test_endpoint(r#type: EndpointType) -> StreamEndpoint {
666        StreamEndpoint::new(
667            REMOTE_ID_VAL,
668            MediaType::Audio,
669            r#type,
670            vec![
671                ServiceCapability::MediaTransport,
672                ServiceCapability::MediaCodec {
673                    media_type: MediaType::Audio,
674                    codec_type: MediaCodecType::new(0x40),
675                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], // Meaningless test data.
676                },
677            ],
678        )
679        .unwrap()
680    }
681
    /// Walks an endpoint through Idle -> Configured -> Open -> Streaming -> Open and checks in
    /// which states `configure` and `reconfigure` are accepted or rejected.
    #[test]
    fn stream_configure_reconfigure() {
        let _exec = fasync::TestExecutor::new();
        let mut s = test_endpoint(EndpointType::Sink);

        // Can't configure items that aren't in range.
        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting]),
            Err((ServiceCategory::Reporting, ErrorCode::UnsupportedConfiguration))
        );

        assert_matches!(
            s.configure(
                &REMOTE_ID,
                vec![
                    ServiceCapability::MediaTransport,
                    ServiceCapability::MediaCodec {
                        media_type: MediaType::Audio,
                        codec_type: MediaCodecType::new(0x40),
                        // Change the codec_extra which is typical, ex. SBC (A2DP Spec 4.3.2.6)
                        codec_extra: vec![0x0C, 0x0D, 0x02, 0x51],
                    }
                ]
            ),
            Ok(())
        );

        // Note: we allow endpoints to be configured (and reconfigured) again when they
        // are only configured, even though this is probably not allowed per the spec.
        // NOTE(review): `configure` requires the Idle state and `reconfigure` requires the Open
        // state, so the note above looks stale - confirm.

        // Can't configure while open
        let _channel = establish_stream(&mut s);

        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
            Err((_, ErrorCode::BadState))
        );

        let reconfiguration = vec![ServiceCapability::MediaCodec {
            media_type: MediaType::Audio,
            codec_type: MediaCodecType::new(0x40),
            // Reconfigure to yet another different codec_extra value.
            codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
        }];

        // The new configuration should match the previous one, but with the reconfigured
        // capabilities updated.
        let new_configuration = vec![ServiceCapability::MediaTransport, reconfiguration[0].clone()];

        // Reconfiguring while open is fine though.
        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));

        assert_eq!(Some(&new_configuration), s.get_configuration());

        // Can't reconfigure non-application types
        assert_matches!(
            s.reconfigure(vec![ServiceCapability::MediaTransport]),
            Err((ServiceCategory::MediaTransport, ErrorCode::InvalidCapabilities))
        );

        // Can't configure or reconfigure while streaming
        assert_matches!(s.start(), Ok(()));

        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
            Err((_, ErrorCode::BadState))
        );

        assert_matches!(s.reconfigure(reconfiguration.clone()), Err((_, ErrorCode::BadState)));

        assert_matches!(s.suspend(), Ok(()));

        // Reconfigure should be fine again in open state.
        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));

        // Configure is still not allowed.
        assert_matches!(
            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
            Err((_, ErrorCode::BadState))
        );
    }
763
764    #[test]
765    fn stream_establishment() {
766        let _exec = fasync::TestExecutor::new();
767        let mut s = test_endpoint(EndpointType::Sink);
768
769        let (remote, transport) = Channel::create();
770
771        // Can't establish before configuring
772        assert_matches!(s.establish(), Err(ErrorCode::BadState));
773
774        // Trying to receive a channel in the wrong state closes the channel
775        assert_matches!(s.receive_channel(transport), Err(Error::InvalidState));
776
777        let buf: &mut [u8] = &mut [0; 1];
778
779        assert_matches!(remote.read(buf), Err(zx::Status::PEER_CLOSED));
780
781        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
782
783        assert_matches!(s.establish(), Ok(()));
784
785        // And we should be able to give a channel now.
786        let (_remote, transport) = Channel::create();
787        assert_matches!(s.receive_channel(transport), Ok(false));
788    }
789
790    fn setup_peer_for_release(exec: &mut fasync::TestExecutor) -> (Peer, Channel, SimpleResponder) {
791        let (peer, signaling) = setup_peer();
792        // Send a close from the other side to produce an event we can respond to.
793        let _ = signaling.write(&[0x40, 0x08, 0x04]).expect("signaling write");
794        let mut req_stream = peer.take_request_stream();
795        let mut req_fut = req_stream.next();
796        let complete = exec.run_until_stalled(&mut req_fut);
797        let responder = match complete {
798            Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder,
799            _ => panic!("Expected a close request"),
800        };
801        (peer, signaling, responder)
802    }
803
804    #[test]
805    fn stream_release_without_abort() {
806        let mut exec = fasync::TestExecutor::new();
807        let mut s = test_endpoint(EndpointType::Sink);
808
809        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
810
811        let remote_transport = establish_stream(&mut s);
812
813        let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
814
815        // We expect release to succeed in this state.
816        s.release(responder, &peer).unwrap();
817        // Expect a "yes" response.
818        expect_remote_recv(&[0x42, 0x08], &signaling);
819
820        // Close the transport channel by dropping it.
821        drop(remote_transport);
822
823        // After the transport is closed we should transition to Idle.
824        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
825        assert_eq!(s.state(), StreamState::Idle);
826    }
827
828    #[test]
829    fn test_mediastream() {
830        let mut exec = fasync::TestExecutor::new();
831        let mut s = test_endpoint(EndpointType::Sink);
832
833        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
834
835        // Before the stream is opened, we shouldn't be able to take the transport.
836        assert!(s.take_transport().is_none());
837
838        let remote_transport = establish_stream(&mut s);
839
840        // Should be able to get the transport from the stream now.
841        let temp_stream = s.take_transport();
842        assert!(temp_stream.is_some());
843
844        // But only once
845        assert!(s.take_transport().is_none());
846
847        // Until you drop the stream
848        drop(temp_stream);
849
850        let media_stream = s.take_transport();
851        assert!(media_stream.is_some());
852        let mut media_stream = media_stream.unwrap();
853
854        // Max TX size is taken from the underlying channel.
855        assert_matches!(media_stream.max_tx_size(), Ok(Channel::DEFAULT_MAX_TX));
856
857        // Writing to the media stream should send it through the transport channel.
858        let hearts = &[0xF0, 0x9F, 0x92, 0x96, 0xF0, 0x9F, 0x92, 0x96];
859        let mut write_fut = media_stream.write(hearts);
860
861        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Ok(8)));
862
863        expect_remote_recv(hearts, &remote_transport);
864
865        // Closing the media stream should close the channel.
866        let mut close_fut = media_stream.close();
867        assert_matches!(exec.run_until_stalled(&mut close_fut), Poll::Ready(Ok(())));
868        // Note: there's no effect on the other end of the channel when a close occurs,
869        // until the channel is dropped.
870
871        drop(s);
872
873        // Reading from the remote end should fail.
874        let mut result = vec![0];
875        assert_matches!(remote_transport.read(&mut result[..]), Err(zx::Status::PEER_CLOSED));
876
877        // After the stream is gone, any write should return an Err
878        let mut write_fut = media_stream.write(&[0xDE, 0xAD]);
879        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Err(_)));
880
881        // After the stream is gone, the stream should be fused done.
882        let mut next_fut = media_stream.next();
883        assert_matches!(exec.run_until_stalled(&mut next_fut), Poll::Ready(None));
884
885        assert!(media_stream.is_terminated(), "should be terminated");
886
887        // And the Max TX should be an error.
888        assert_matches!(media_stream.max_tx_size(), Err(_));
889    }
890
891    #[test]
892    fn stream_release_with_abort() {
893        let mut exec = fasync::TestExecutor::new();
894        let mut s = test_endpoint(EndpointType::Sink);
895
896        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
897        let remote_transport = establish_stream(&mut s);
898        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);
899
900        // We expect release to succeed in this state, then start the task to wait for the close.
901        s.release(responder, &peer).unwrap();
902        // Expect a "yes" response.
903        expect_remote_recv(&[0x42, 0x08], &signaling);
904
905        // Should get an abort
906        let next = std::pin::pin!(signaling.next());
907        let received =
908            exec.run_singlethreaded(next).expect("channel not closed").expect("successful read");
909        assert_eq!(0x0A, received[1]);
910        let txlabel = received[0] & 0xF0;
911        // Send a response
912        assert!(signaling.write(&[txlabel | 0x02, 0x0A]).is_ok());
913
914        let _ = exec.run_singlethreaded(&mut remote_transport.closed());
915
916        // We will then end up in Idle.
917        while s.state() != StreamState::Idle {
918            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
919        }
920    }
921
922    #[test]
923    fn start_and_suspend() {
924        let mut exec = fasync::TestExecutor::new();
925        let mut s = test_endpoint(EndpointType::Sink);
926
927        // Can't start or suspend until configured and open.
928        assert_matches!(s.start(), Err(ErrorCode::BadState));
929        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
930
931        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
932
933        assert_matches!(s.start(), Err(ErrorCode::BadState));
934        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
935
936        assert_matches!(s.establish(), Ok(()));
937
938        assert_matches!(s.start(), Err(ErrorCode::BadState));
939        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
940
941        let (remote, local) = zx::Socket::create_datagram();
942        let (client_end, mut direction_request_stream) =
943            create_request_stream::<bredr::AudioDirectionExtMarker>();
944        let ext = bredr::Channel {
945            socket: Some(local),
946            channel_mode: Some(fidl_bt::ChannelMode::Basic),
947            max_tx_sdu_size: Some(1004),
948            ext_direction: Some(client_end),
949            ..Default::default()
950        };
951        let transport = Channel::try_from(ext).unwrap();
952        assert_matches!(s.receive_channel(transport), Ok(false));
953
954        // Should be able to start but not suspend now.
955        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
956        assert_matches!(s.start(), Ok(()));
957
958        match exec.run_until_stalled(&mut direction_request_stream.next()) {
959            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
960                priority,
961                responder,
962            }))) => {
963                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
964                responder.send(Ok(())).expect("response to send cleanly");
965            }
966            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
967        };
968
969        // Are started, so we should be able to suspend but not start again here.
970        assert_matches!(s.start(), Err(ErrorCode::BadState));
971        assert_matches!(s.suspend(), Ok(()));
972
973        match exec.run_until_stalled(&mut direction_request_stream.next()) {
974            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
975                priority,
976                responder,
977            }))) => {
978                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
979                responder.send(Ok(())).expect("response to send cleanly");
980            }
981            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
982        };
983
984        // Now we're suspended, so we can start it again.
985        assert_matches!(s.start(), Ok(()));
986        assert_matches!(s.suspend(), Ok(()));
987
988        // After we close, we are back at idle and can't start / stop
989        let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
990
991        {
992            s.release(responder, &peer).unwrap();
993            // Expect a "yes" response.
994            expect_remote_recv(&[0x42, 0x08], &signaling);
995            // Close the transport channel by dropping it.
996            drop(remote);
997            while s.state() != StreamState::Idle {
998                let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
999            }
1000        }
1001
1002        // Shouldn't be able to start or suspend again.
1003        assert_matches!(s.start(), Err(ErrorCode::BadState));
1004        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
1005    }
1006
1007    fn receive_l2cap_params_channel(
1008        s: &mut StreamEndpoint,
1009    ) -> (zx::Socket, bredr::L2capParametersExtRequestStream) {
1010        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
1011        assert_matches!(s.establish(), Ok(()));
1012
1013        let (remote, local) = zx::Socket::create_datagram();
1014        let (client_end, l2cap_params_requests) =
1015            create_request_stream::<bredr::L2capParametersExtMarker>();
1016        let ext = bredr::Channel {
1017            socket: Some(local),
1018            channel_mode: Some(fidl_bt::ChannelMode::Basic),
1019            max_tx_sdu_size: Some(1004),
1020            ext_l2cap: Some(client_end),
1021            ..Default::default()
1022        };
1023        let transport = Channel::try_from(ext).unwrap();
1024        assert_matches!(s.receive_channel(transport), Ok(false));
1025        (remote, l2cap_params_requests)
1026    }
1027
1028    #[test]
1029    fn sets_flush_timeout_for_source_transports() {
1030        let mut exec = fasync::TestExecutor::new();
1031        let mut s = test_endpoint(EndpointType::Source);
1032        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
1033
1034        // Should request to set the flush timeout.
1035        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
1036            Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
1037                request,
1038                responder,
1039            }))) => {
1040                assert_eq!(
1041                    Some(StreamEndpoint::SRC_FLUSH_TIMEOUT.into_nanos()),
1042                    request.flush_timeout
1043                );
1044                responder.send(&request).expect("response to send cleanly");
1045            }
1046            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
1047        };
1048    }
1049
1050    #[test]
1051    fn no_flush_timeout_for_sink_transports() {
1052        let mut exec = fasync::TestExecutor::new();
1053        let mut s = test_endpoint(EndpointType::Sink);
1054        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
1055
1056        // Should NOT request to set the flush timeout.
1057        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
1058            Poll::Pending => {}
1059            x => panic!("Expected no request to set flush timeout, got {:?}", x),
1060        };
1061    }
1062
1063    #[test]
1064    fn get_configuration() {
1065        let mut s = test_endpoint(EndpointType::Sink);
1066
1067        // Can't get configuration if we aren't configured.
1068        assert!(s.get_configuration().is_none());
1069
1070        let config = vec![
1071            ServiceCapability::MediaTransport,
1072            ServiceCapability::MediaCodec {
1073                media_type: MediaType::Audio,
1074                codec_type: MediaCodecType::new(0),
1075                // Change the codec_extra which is typical, ex. SBC (A2DP Spec 4.3.2.6)
1076                codec_extra: vec![0x60, 0x0D, 0x02, 0x55],
1077            },
1078        ];
1079
1080        assert_matches!(s.configure(&REMOTE_ID, config.clone()), Ok(()));
1081
1082        match s.get_configuration() {
1083            Some(c) => assert_eq!(&config, c),
1084            x => panic!("Expected Ok from get_configuration but got {:?}", x),
1085        };
1086
1087        // Abort this stream, putting it back to the idle state.
1088        s.abort();
1089
1090        assert!(s.get_configuration().is_none());
1091    }
1092
1093    use std::sync::atomic::{AtomicUsize, Ordering};
1094
1095    /// Create a callback that tracks how many times it has been called
1096    fn call_count_callback() -> (Option<StreamEndpointUpdateCallback>, Arc<AtomicUsize>) {
1097        let call_count = Arc::new(AtomicUsize::new(0));
1098        let call_count_reader = call_count.clone();
1099        let count_cb: StreamEndpointUpdateCallback = Box::new(move |_stream: &StreamEndpoint| {
1100            let _ = call_count.fetch_add(1, Ordering::SeqCst);
1101        });
1102        (Some(count_cb), call_count_reader)
1103    }
1104
1105    /// Test that the update callback is run at least once for all methods that mutate the state of
1106    /// the StreamEndpoint. This is done through an atomic counter in the callback that increments
1107    /// when the callback is run.
1108    ///
1109    /// Note that the _results_ of calling these mutating methods on the state of StreamEndpoint are
1110    /// not validated here. They are validated in other tests.
1111    #[test]
1112    fn update_callback() {
1113        // Need an executor to make a socket
1114        let _exec = fasync::TestExecutor::new();
1115        let mut s = test_endpoint(EndpointType::Sink);
1116        let (cb, call_count) = call_count_callback();
1117        s.set_update_callback(cb);
1118
1119        s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
1120            .expect("Configure to succeed in test");
1121        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1122        call_count.store(0, Ordering::SeqCst); // clear call count
1123
1124        s.establish().expect("Establish to succeed in test");
1125        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1126        call_count.store(0, Ordering::SeqCst); // clear call count
1127
1128        let (_, transport) = Channel::create();
1129        assert_eq!(
1130            s.receive_channel(transport).expect("Receive channel to succeed in test"),
1131            false
1132        );
1133        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1134        call_count.store(0, Ordering::SeqCst); // clear call count
1135
1136        s.start().expect("Start to succeed in test");
1137        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1138        call_count.store(0, Ordering::SeqCst); // clear call count
1139
1140        s.suspend().expect("Suspend to succeed in test");
1141        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1142        call_count.store(0, Ordering::SeqCst); // clear call count
1143
1144        s.reconfigure(vec![]).expect("Reconfigure to succeed in test");
1145        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1146        call_count.store(0, Ordering::SeqCst); // clear call count
1147
1148        // Abort this stream, putting it back to the idle state.
1149        s.abort();
1150    }
1151}