bt_avdtp/
stream_endpoint.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async::{DurationExt, Task, TimeoutExt};
6use fuchsia_bluetooth::types::{A2dpDirection, Channel};
7use fuchsia_sync::Mutex;
8use futures::stream::Stream;
9use futures::{io, FutureExt};
10use log::warn;
11use std::fmt;
12use std::pin::Pin;
13use std::sync::{Arc, RwLock, Weak};
14use std::task::{Context, Poll};
15use zx::{MonotonicDuration, Status};
16
17use crate::types::{
18    EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result as AvdtpResult,
19    ServiceCapability, ServiceCategory, StreamEndpointId, StreamInformation,
20};
21use crate::{Peer, SimpleResponder};
22
23pub type StreamEndpointUpdateCallback = Box<dyn Fn(&StreamEndpoint) -> () + Sync + Send>;
24
/// The state of a StreamEndpoint.
///
/// The state is a plain, field-less enum, so it is `Copy` and supports total equality (`Eq`).
#[derive(PartialEq, Eq, Debug, Default, Clone, Copy)]
pub enum StreamState {
    /// The initial state of every endpoint, and the state an endpoint returns to after a
    /// release or abort.
    #[default]
    Idle,
    /// A configuration has been accepted for the endpoint.
    Configured,
    /// An Open command has been accepted, but streams have not been established yet.
    Opening,
    /// The media transport channel has been delivered; the stream is not started (or has been
    /// suspended).
    Open,
    /// The stream has been started.
    Streaming,
    /// A release was accepted and the endpoint is waiting for the transport channel to close.
    Closing,
    /// An abort is in progress; the endpoint transitions to Idle afterwards.
    Aborting,
}
38
/// An AVDTP StreamEndpoint. StreamEndpoints represent a particular capability of the application
/// to be a source or sink of media. Included here to aid negotiating the stream connection.
/// See Section 5.3 of the AVDTP 1.3 Specification for more information about the Stream Endpoint
/// Architecture.
pub struct StreamEndpoint {
    /// Local stream endpoint id.  This should be unique per AVDTP Peer.
    id: StreamEndpointId,
    /// The type of endpoint this is (TSEP), Source or Sink.
    endpoint_type: EndpointType,
    /// The media type this stream represents.
    media_type: MediaType,
    /// Current state the stream is in. See Section 6.5 for an overview.
    /// Shared behind an `Arc` because the task spawned by `release` holds a clone and updates
    /// the state when the release completes.
    state: Arc<Mutex<StreamState>>,
    /// The media transport channel
    /// This should be Some(channel) when state is Open or Streaming.
    /// `MediaStream`s handed out by `take_transport` only hold a weak reference to it.
    transport: Option<Arc<RwLock<Channel>>>,
    /// True when the MediaStream is held.
    /// Prevents multiple threads from owning the media stream.
    /// Cleared again when the `MediaStream` is dropped.
    stream_held: Arc<Mutex<bool>>,
    /// The capabilities of this endpoint.
    capabilities: Vec<ServiceCapability>,
    /// The remote stream endpoint id.  None if the stream has never been configured.
    remote_id: Option<StreamEndpointId>,
    /// The current configuration of this endpoint.  Empty if the stream has never been configured.
    configuration: Vec<ServiceCapability>,
    /// Callback that is run whenever the endpoint is updated
    update_callback: Option<StreamEndpointUpdateCallback>,
    /// In-progress task. This is only used for the Release procedure which places the state in Closing
    /// and must wait for the peer to close transport channels.
    in_progress: Option<Task<()>>,
}
70
71impl fmt::Debug for StreamEndpoint {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        f.debug_struct("StreamEndpoint")
74            .field("id", &self.id.0)
75            .field("endpoint_type", &self.endpoint_type)
76            .field("media_type", &self.media_type)
77            .field("state", &self.state)
78            .field("capabilities", &self.capabilities)
79            .field("remote_id", &self.remote_id.as_ref().map(|id| id.to_string()))
80            .field("configuration", &self.configuration)
81            .finish()
82    }
83}
84
85impl StreamEndpoint {
86    /// Make a new StreamEndpoint.
87    /// |id| must be in the valid range for a StreamEndpointId (0x01 - 0x3E).
88    /// StreamEndpoints start in the Idle state.
89    pub fn new(
90        id: u8,
91        media_type: MediaType,
92        endpoint_type: EndpointType,
93        capabilities: Vec<ServiceCapability>,
94    ) -> AvdtpResult<StreamEndpoint> {
95        let seid = StreamEndpointId::try_from(id)?;
96        Ok(StreamEndpoint {
97            id: seid,
98            capabilities,
99            media_type,
100            endpoint_type,
101            state: Default::default(),
102            transport: None,
103            stream_held: Arc::new(Mutex::new(false)),
104            remote_id: None,
105            configuration: vec![],
106            update_callback: None,
107            in_progress: None,
108        })
109    }
110
111    pub fn as_new(&self) -> Self {
112        StreamEndpoint::new(
113            self.id.0,
114            self.media_type.clone(),
115            self.endpoint_type.clone(),
116            self.capabilities.clone(),
117        )
118        .expect("as_new")
119    }
120
121    /// Set the state to the given value and run the `update_callback` afterwards
122    fn set_state(&mut self, state: StreamState) {
123        *self.state.lock() = state;
124        self.update_callback();
125    }
126
    /// Pass update callback to StreamEndpoint that will be called anytime `StreamEndpoint` is
    /// modified.  Any previously set callback is replaced; passing `None` removes it.
    pub fn set_update_callback(&mut self, callback: Option<StreamEndpointUpdateCallback>) {
        self.update_callback = callback;
    }
132
133    fn update_callback(&self) {
134        if let Some(cb) = self.update_callback.as_ref() {
135            cb(self);
136        }
137    }
138
139    /// Build a new StreamEndpoint from a StreamInformation and associated Capabilities.
140    /// This makes it easy to build from AVDTP Discover and GetCapabilities procedures.
141    /// StreamEndpooints start in the Idle state.
142    pub fn from_info(
143        info: &StreamInformation,
144        capabilities: Vec<ServiceCapability>,
145    ) -> StreamEndpoint {
146        StreamEndpoint {
147            id: info.id().clone(),
148            capabilities,
149            media_type: info.media_type().clone(),
150            endpoint_type: info.endpoint_type().clone(),
151            state: Default::default(),
152            transport: None,
153            stream_held: Arc::new(Mutex::new(false)),
154            remote_id: None,
155            configuration: vec![],
156            update_callback: None,
157            in_progress: None,
158        }
159    }
160
161    /// Checks that the state is in the set of states.
162    /// If not, returns Err(ErrorCode::BadState).
163    fn state_is(&self, state: StreamState) -> Result<(), ErrorCode> {
164        (*self.state.lock() == state).then_some(()).ok_or(ErrorCode::BadState)
165    }
166
167    /// Attempt to Configure this stream using the capabilities given.
168    /// If the stream is not in an Idle state, fails with Err(InvalidState).
169    /// Used for the Stream Configuration procedure, see Section 6.9
170    pub fn configure(
171        &mut self,
172        remote_id: &StreamEndpointId,
173        capabilities: Vec<ServiceCapability>,
174    ) -> Result<(), (ServiceCategory, ErrorCode)> {
175        self.state_is(StreamState::Idle).map_err(|e| (ServiceCategory::None, e))?;
176        self.remote_id = Some(remote_id.clone());
177        for cap in &capabilities {
178            if !self
179                .capabilities
180                .iter()
181                .any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y))
182            {
183                return Err((cap.category(), ErrorCode::UnsupportedConfiguration));
184            }
185        }
186        self.configuration = capabilities;
187        self.set_state(StreamState::Configured);
188        Ok(())
189    }
190
191    /// Attempt to reconfigure this stream with the capabilities given.  If any capability is not
192    /// valid to set, fails with the first such category and InvalidCapabilities If the stream is
193    /// not in the Open state, fails with Err((None, BadState)) Used for the Stream Reconfiguration
194    /// procedure, see Section 6.15.
195    pub fn reconfigure(
196        &mut self,
197        mut capabilities: Vec<ServiceCapability>,
198    ) -> Result<(), (ServiceCategory, ErrorCode)> {
199        self.state_is(StreamState::Open).map_err(|e| (ServiceCategory::None, e))?;
200        // Only application capabilities are allowed to be reconfigured. See Section 8.11.1
201        if let Some(cap) = capabilities.iter().find(|x| !x.is_application()) {
202            return Err((cap.category(), ErrorCode::InvalidCapabilities));
203        }
204        // Should only replace the capabilities that have been configured. See Section 8.11.2
205        let to_replace: std::vec::Vec<_> =
206            capabilities.iter().map(|x| std::mem::discriminant(x)).collect();
207        self.configuration.retain(|x| {
208            let disc = std::mem::discriminant(x);
209            !to_replace.contains(&disc)
210        });
211        self.configuration.append(&mut capabilities);
212        self.update_callback();
213        Ok(())
214    }
215
216    /// Get the current configuration of this stream.
217    /// If the stream is not configured, returns None.
218    /// Used for the Steam Get Configuration Procedure, see Section 6.10
219    pub fn get_configuration(&self) -> Option<&Vec<ServiceCapability>> {
220        if self.configuration.is_empty() {
221            return None;
222        }
223        Some(&self.configuration)
224    }
225
    /// Flush timeout requested for the media transport channel in `receive_channel`.
    // 100 milliseconds chosen based on end of range testing, to allow for recovery after normal
    // packet delivery continues.
    const SRC_FLUSH_TIMEOUT: MonotonicDuration = MonotonicDuration::from_millis(100);
229
230    /// When a L2CAP channel is received after an Open command is accepted, it should be
231    /// delivered via receive_channel.
232    /// Returns true if this Endpoint expects more channels to be established before
233    /// streaming is started.
234    /// Returns Err(InvalidState) if this Endpoint is not expecting a channel to be established,
235    /// closing |c|.
236    pub fn receive_channel(&mut self, c: Channel) -> AvdtpResult<bool> {
237        if self.state_is(StreamState::Opening).is_err() || self.transport.is_some() {
238            return Err(Error::InvalidState);
239        }
240        self.transport = Some(Arc::new(RwLock::new(c)));
241        self.try_flush_timeout(Self::SRC_FLUSH_TIMEOUT);
242        self.stream_held = Arc::new(Mutex::new(false));
243        // TODO(jamuraa, https://fxbug.dev/42051664, https://fxbug.dev/42051776): Reporting and Recovery channels
244        self.set_state(StreamState::Open);
245        Ok(false)
246    }
247
248    /// Begin opening this stream.  The stream must be in a Configured state.
249    /// See Stream Establishment, Section 6.11
250    pub fn establish(&mut self) -> Result<(), ErrorCode> {
251        if self.state_is(StreamState::Configured).is_err() || self.transport.is_some() {
252            return Err(ErrorCode::BadState);
253        }
254        self.set_state(StreamState::Opening);
255        Ok(())
256    }
257
258    /// Attempts to set audio direction priority of the MediaTransport channel based on
259    /// whether the stream is a source or sink endpoint if `active` is true.  If `active` is
260    /// false, set the priority to Normal instead.  Does nothing on failure.
261    pub fn try_priority(&self, active: bool) {
262        let priority = match (active, &self.endpoint_type) {
263            (false, _) => A2dpDirection::Normal,
264            (true, EndpointType::Source) => A2dpDirection::Source,
265            (true, EndpointType::Sink) => A2dpDirection::Sink,
266        };
267        let fut = match self.transport.as_ref().unwrap().try_read() {
268            Err(_) => return,
269            Ok(channel) => channel.set_audio_priority(priority).map(|_| ()),
270        };
271        // TODO(https://fxbug.dev/331621666): We should avoid detaching this.
272        Task::spawn(fut).detach();
273    }
274
275    /// Attempts to set the flush timeout for the MediaTransport channel, for source endpoints.
276    pub fn try_flush_timeout(&self, timeout: MonotonicDuration) {
277        if self.endpoint_type != EndpointType::Source {
278            return;
279        }
280        let fut = match self.transport.as_ref().unwrap().try_write() {
281            Err(_) => return,
282            Ok(channel) => channel.set_flush_timeout(Some(timeout)).map(|_| ()),
283        };
284        // TODO(https://fxbug.dev/331621666): We should avoid detaching this.
285        Task::spawn(fut).detach();
286    }
287
    /// Close this stream.  This procedure will wait until media channels are closed before
    /// transitioning to Idle.  If the channels are not closed in 3 seconds, we initiate an abort
    /// procedure with the remote |peer| to force a transition to Idle.
    ///
    /// If the endpoint is not Open or Streaming, |responder| rejects the request with
    /// ErrorCode::BadState and the result of that rejection is returned.  Otherwise the endpoint
    /// moves to Closing, the request is accepted through |responder|, and the wait for the
    /// transport to close runs in a task stored in `in_progress`.
    pub fn release(&mut self, responder: SimpleResponder, peer: &Peer) -> AvdtpResult<()> {
        {
            // Scoped so the guard is dropped before `set_state` locks the state again.
            let lock = self.state.lock();
            if *lock != StreamState::Open && *lock != StreamState::Streaming {
                return responder.reject(ErrorCode::BadState);
            }
        }
        self.set_state(StreamState::Closing);
        // NOTE(review): if sending the response fails we return here with the state left at
        // Closing and no task to move it back to Idle - confirm this is intended.
        responder.send()?;
        let release_wait_fut = {
            // Take our transport and remote id - after this procedure it will be closed.
            // These must be Some(_) because we are in Open / Streaming state.
            let seid = self.remote_id.take().unwrap();
            let transport = self.transport.take().unwrap();
            let peer = peer.clone();
            let state = self.state.clone();
            // The task writes to the shared state directly, so the update callback is not run
            // for the transitions made inside it.
            async move {
                let Ok(transport) = transport.try_read() else {
                    warn!("unable to lock transport channel, dropping and assuming closed");
                    *state.lock() = StreamState::Idle;
                    return;
                };
                let closed_fut = transport
                    .closed()
                    .on_timeout(MonotonicDuration::from_seconds(3).after_now(), || {
                        Err(Status::TIMED_OUT)
                    });
                if let Err(Status::TIMED_OUT) = closed_fut.await {
                    let _ = peer.abort(&seid).await;
                    *state.lock() = StreamState::Aborting;
                    // As the initiator of the Abort, we close our channel.
                    // NOTE(review): `transport` here is the read guard; the channel itself is
                    // owned by this future and is released when the future completes.
                    drop(transport);
                }
                *state.lock() = StreamState::Idle;
            }
        };
        self.in_progress = Some(Task::local(release_wait_fut));
        // Closing will return this endpoint to the Idle state, one way or another with no
        // configuration
        self.configuration.clear();
        self.update_callback();
        Ok(())
    }
334
    /// Returns the current state of this endpoint.
    /// The value is a copy taken while briefly holding the state lock.
    pub fn state(&self) -> StreamState {
        *self.state.lock()
    }
339
340    /// Start this stream.  This can be done only from the Open State.
341    /// Used for the Stream Start procedure, See Section 6.12
342    pub fn start(&mut self) -> Result<(), ErrorCode> {
343        self.state_is(StreamState::Open)?;
344        self.try_priority(true);
345        self.set_state(StreamState::Streaming);
346        Ok(())
347    }
348
349    /// Suspend this stream.  This can be done only from the Streaming state.
350    /// Used for the Stream Suspend procedure, See Section 6.14
351    pub fn suspend(&mut self) -> Result<(), ErrorCode> {
352        self.state_is(StreamState::Streaming)?;
353        self.set_state(StreamState::Open);
354        self.try_priority(false);
355        Ok(())
356    }
357
358    /// Abort this stream.  This can be done from any state, and will always return the state
359    /// to Idle.  We are initiating this procedure so will wait for a response and all our
360    /// channels will be closed.
361    pub async fn initiate_abort<'a>(&'a mut self, peer: &'a Peer) {
362        if let Some(seid) = self.remote_id.take() {
363            let _ = peer.abort(&seid).await;
364            self.set_state(StreamState::Aborting);
365        }
366        self.abort()
367    }
368
369    /// Abort this stream.  This can be done from any state, and will always return the state
370    /// to Idle.  We are receiving this abort from the peer, and all our channels will close.
371    pub fn abort(&mut self) {
372        self.set_state(StreamState::Aborting);
373        self.configuration.clear();
374        self.remote_id = None;
375        self.transport = None;
376        self.set_state(StreamState::Idle);
377    }
378
    /// Capabilities of this StreamEndpoint, as given when the endpoint was created.
    /// Provides support for the Get Capabilities and Get All Capabilities signaling procedures.
    /// See Sections 6.7 and 6.8
    pub fn capabilities(&self) -> &Vec<ServiceCapability> {
        &self.capabilities
    }
385
386    /// Returns the CodecType of this StreamEndpoint.
387    /// Returns None if there is no MediaCodec capability in the endpoint.
388    /// Note: a MediaCodec capability is required by all endpoints by the spec.
389    pub fn codec_type(&self) -> Option<&MediaCodecType> {
390        self.capabilities.iter().find_map(|cap| match cap {
391            ServiceCapability::MediaCodec { codec_type, .. } => Some(codec_type),
392            _ => None,
393        })
394    }
395
    /// Returns the local StreamEndpointId for this endpoint.
    /// This is the id the endpoint was created with; no method here changes it.
    pub fn local_id(&self) -> &StreamEndpointId {
        &self.id
    }
400
    /// Returns the remote StreamEndpointId for this endpoint, if it's configured.
    /// The remote id is cleared again when the stream is released or aborted.
    pub fn remote_id(&self) -> Option<&StreamEndpointId> {
        self.remote_id.as_ref()
    }
405
    /// Returns the EndpointType of this endpoint (Source or Sink).
    pub fn endpoint_type(&self) -> &EndpointType {
        &self.endpoint_type
    }
410
411    /// Make a StreamInformation which represents the current state of this stream.
412    pub fn information(&self) -> StreamInformation {
413        let in_use = self.state_is(StreamState::Idle).is_err();
414        StreamInformation::new(
415            self.id.clone(),
416            in_use,
417            self.media_type.clone(),
418            self.endpoint_type.clone(),
419        )
420    }
421
422    /// Take the media transport channel, which transmits (or receives) any media for this
423    /// StreamEndpoint.  Returns None if the channel is held already, or if the channel has not
424    /// been opened.
425    pub fn take_transport(&mut self) -> Option<MediaStream> {
426        let mut stream_held = self.stream_held.lock();
427        if *stream_held || self.transport.is_none() {
428            return None;
429        }
430
431        *stream_held = true;
432
433        Some(MediaStream::new(
434            self.stream_held.clone(),
435            Arc::downgrade(self.transport.as_ref().unwrap()),
436        ))
437    }
438}
439
/// Represents a media transport stream.
/// If a sink, produces the bytes that have been delivered from the peer.
/// If a source, can send bytes using the `AsyncWrite` implementation.
pub struct MediaStream {
    /// Flag shared with the StreamEndpoint; reset to false when this MediaStream is dropped.
    in_use: Arc<Mutex<bool>>,
    /// Weak reference to the transport channel; it can no longer be upgraded once the
    /// endpoint has dropped the channel.
    channel: Weak<RwLock<Channel>>,
}
447
448impl MediaStream {
449    pub fn new(in_use: Arc<Mutex<bool>>, channel: Weak<RwLock<Channel>>) -> Self {
450        Self { in_use, channel }
451    }
452
453    fn try_upgrade(&self) -> Result<Arc<RwLock<Channel>>, io::Error> {
454        self.channel
455            .upgrade()
456            .ok_or_else(|| io::Error::new(io::ErrorKind::ConnectionAborted, "lost connection"))
457    }
458
459    pub fn max_tx_size(&self) -> Result<usize, io::Error> {
460        match self.try_upgrade()?.try_read() {
461            Err(_e) => return Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")),
462            Ok(lock) => Ok(lock.max_tx_size()),
463        }
464    }
465}
466
467impl Drop for MediaStream {
468    fn drop(&mut self) {
469        let mut l = self.in_use.lock();
470        *l = false;
471    }
472}
473
474impl Stream for MediaStream {
475    type Item = AvdtpResult<Vec<u8>>;
476
477    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
478        let arc_chan = match self.try_upgrade() {
479            Err(_e) => return Poll::Ready(None),
480            Ok(c) => c,
481        };
482        let lock = match arc_chan.try_write() {
483            Err(_e) => return Poll::Ready(None),
484            Ok(lock) => lock,
485        };
486        let mut pin_chan = Pin::new(lock);
487        match pin_chan.as_mut().poll_next(cx) {
488            Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
489            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::PeerRead(e)))),
490            Poll::Ready(None) => Poll::Ready(None),
491            Poll::Pending => Poll::Pending,
492        }
493    }
494}
495
496impl io::AsyncWrite for MediaStream {
497    fn poll_write(
498        self: Pin<&mut Self>,
499        cx: &mut Context<'_>,
500        buf: &[u8],
501    ) -> Poll<Result<usize, io::Error>> {
502        let arc_chan = match self.try_upgrade() {
503            Err(e) => return Poll::Ready(Err(e)),
504            Ok(c) => c,
505        };
506        let lock = match arc_chan.try_write() {
507            Err(_) => {
508                return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
509            }
510            Ok(lock) => lock,
511        };
512        let mut pin_chan = Pin::new(lock);
513        pin_chan.as_mut().poll_write(cx, buf)
514    }
515
516    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
517        let arc_chan = match self.try_upgrade() {
518            Err(e) => return Poll::Ready(Err(e)),
519            Ok(c) => c,
520        };
521        let lock = match arc_chan.try_write() {
522            Err(_) => {
523                return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
524            }
525            Ok(lock) => lock,
526        };
527        let mut pin_chan = Pin::new(lock);
528        pin_chan.as_mut().poll_flush(cx)
529    }
530
531    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
532        let arc_chan = match self.try_upgrade() {
533            Err(e) => return Poll::Ready(Err(e)),
534            Ok(c) => c,
535        };
536        let lock = match arc_chan.try_write() {
537            Err(_) => {
538                return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
539            }
540            Ok(lock) => lock,
541        };
542        let mut pin_chan = Pin::new(lock);
543        pin_chan.as_mut().poll_close(cx)
544    }
545}
546
547#[cfg(test)]
548mod tests {
549    use super::*;
550    use crate::tests::{expect_remote_recv, setup_peer};
551    use crate::Request;
552
553    use assert_matches::assert_matches;
554    use fidl::endpoints::create_request_stream;
555    use futures::io::AsyncWriteExt;
556    use futures::stream::StreamExt;
557    use {
558        fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
559        fuchsia_async as fasync,
560    };
561
562    const REMOTE_ID_VAL: u8 = 1;
563    const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL);
564
565    #[test]
566    fn make() {
567        let s = StreamEndpoint::new(
568            REMOTE_ID_VAL,
569            MediaType::Audio,
570            EndpointType::Sink,
571            vec![ServiceCapability::MediaTransport],
572        );
573        assert!(s.is_ok());
574        let s = s.unwrap();
575        assert_eq!(&StreamEndpointId(1), s.local_id());
576
577        let info = s.information();
578        assert!(!info.in_use());
579
580        let no = StreamEndpoint::new(
581            0,
582            MediaType::Audio,
583            EndpointType::Sink,
584            vec![ServiceCapability::MediaTransport],
585        );
586        assert!(no.is_err());
587    }
588
589    fn establish_stream(s: &mut StreamEndpoint) -> Channel {
590        assert_matches!(s.establish(), Ok(()));
591        let (chan, remote) = Channel::create();
592        assert_matches!(s.receive_channel(chan), Ok(false));
593        remote
594    }
595
596    #[test]
597    fn from_info() {
598        let seid = StreamEndpointId::try_from(5).unwrap();
599        let info =
600            StreamInformation::new(seid.clone(), false, MediaType::Audio, EndpointType::Sink);
601        let capabilities = vec![ServiceCapability::MediaTransport];
602
603        let endpoint = StreamEndpoint::from_info(&info, capabilities);
604
605        assert_eq!(&seid, endpoint.local_id());
606        assert_eq!(&false, endpoint.information().in_use());
607        assert_eq!(1, endpoint.capabilities().len());
608    }
609
610    #[test]
611    fn codec_type() {
612        let s = StreamEndpoint::new(
613            REMOTE_ID_VAL,
614            MediaType::Audio,
615            EndpointType::Sink,
616            vec![
617                ServiceCapability::MediaTransport,
618                ServiceCapability::MediaCodec {
619                    media_type: MediaType::Audio,
620                    codec_type: MediaCodecType::new(0x40),
621                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], // Meaningless test data.
622                },
623            ],
624        )
625        .unwrap();
626
627        assert_eq!(Some(&MediaCodecType::new(0x40)), s.codec_type());
628
629        let s = StreamEndpoint::new(
630            REMOTE_ID_VAL,
631            MediaType::Audio,
632            EndpointType::Sink,
633            vec![ServiceCapability::MediaTransport],
634        )
635        .unwrap();
636
637        assert_eq!(None, s.codec_type());
638    }
639
640    fn test_endpoint(r#type: EndpointType) -> StreamEndpoint {
641        StreamEndpoint::new(
642            REMOTE_ID_VAL,
643            MediaType::Audio,
644            r#type,
645            vec![
646                ServiceCapability::MediaTransport,
647                ServiceCapability::MediaCodec {
648                    media_type: MediaType::Audio,
649                    codec_type: MediaCodecType::new(0x40),
650                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], // Meaningless test data.
651                },
652            ],
653        )
654        .unwrap()
655    }
656
657    #[test]
658    fn stream_configure_reconfigure() {
659        let _exec = fasync::TestExecutor::new();
660        let mut s = test_endpoint(EndpointType::Sink);
661
662        // Can't configure items that aren't in range.
663        assert_matches!(
664            s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting]),
665            Err((ServiceCategory::Reporting, ErrorCode::UnsupportedConfiguration))
666        );
667
668        assert_matches!(
669            s.configure(
670                &REMOTE_ID,
671                vec![
672                    ServiceCapability::MediaTransport,
673                    ServiceCapability::MediaCodec {
674                        media_type: MediaType::Audio,
675                        codec_type: MediaCodecType::new(0x40),
676                        // Change the codec_extra which is typical, ex. SBC (A2DP Spec 4.3.2.6)
677                        codec_extra: vec![0x0C, 0x0D, 0x02, 0x51],
678                    }
679                ]
680            ),
681            Ok(())
682        );
683
684        // Note: we allow endpoints to be configured (and reconfigured) again when they
685        // are only configured, even though this is probably not allowed per the spec.
686
687        // Can't configure while open
688        let _channel = establish_stream(&mut s);
689
690        assert_matches!(
691            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
692            Err((_, ErrorCode::BadState))
693        );
694
695        let reconfiguration = vec![ServiceCapability::MediaCodec {
696            media_type: MediaType::Audio,
697            codec_type: MediaCodecType::new(0x40),
698            // Reconfigure to yet another different codec_extra value.
699            codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
700        }];
701
702        // The new configuration should match the previous one, but with the reconfigured
703        // capabilities updated.
704        let new_configuration = vec![ServiceCapability::MediaTransport, reconfiguration[0].clone()];
705
706        // Reconfiguring while open is fine though.
707        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));
708
709        assert_eq!(Some(&new_configuration), s.get_configuration());
710
711        // Can't reconfigure non-application types
712        assert_matches!(
713            s.reconfigure(vec![ServiceCapability::MediaTransport]),
714            Err((ServiceCategory::MediaTransport, ErrorCode::InvalidCapabilities))
715        );
716
717        // Can't configure or reconfigure while streaming
718        assert_matches!(s.start(), Ok(()));
719
720        assert_matches!(
721            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
722            Err((_, ErrorCode::BadState))
723        );
724
725        assert_matches!(s.reconfigure(reconfiguration.clone()), Err((_, ErrorCode::BadState)));
726
727        assert_matches!(s.suspend(), Ok(()));
728
729        // Reconfigure should be fine again in open state.
730        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));
731
732        // Configure is still not allowed.
733        assert_matches!(
734            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
735            Err((_, ErrorCode::BadState))
736        );
737    }
738
739    #[test]
740    fn stream_establishment() {
741        let _exec = fasync::TestExecutor::new();
742        let mut s = test_endpoint(EndpointType::Sink);
743
744        let (remote, transport) = Channel::create();
745
746        // Can't establish before configuring
747        assert_matches!(s.establish(), Err(ErrorCode::BadState));
748
749        // Trying to receive a channel in the wrong state closes the channel
750        assert_matches!(s.receive_channel(transport), Err(Error::InvalidState));
751
752        let buf: &mut [u8] = &mut [0; 1];
753
754        assert_matches!(remote.read(buf), Err(zx::Status::PEER_CLOSED));
755
756        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
757
758        assert_matches!(s.establish(), Ok(()));
759
760        // And we should be able to give a channel now.
761        let (_remote, transport) = Channel::create();
762        assert_matches!(s.receive_channel(transport), Ok(false));
763    }
764
765    fn setup_peer_for_release(exec: &mut fasync::TestExecutor) -> (Peer, Channel, SimpleResponder) {
766        let (peer, signaling) = setup_peer();
767        // Send a close from the other side to produce an event we can respond to.
768        let _ = signaling.write(&[0x40, 0x08, 0x04]).expect("signaling write");
769        let mut req_stream = peer.take_request_stream();
770        let mut req_fut = req_stream.next();
771        let complete = exec.run_until_stalled(&mut req_fut);
772        let responder = match complete {
773            Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder,
774            _ => panic!("Expected a close request"),
775        };
776        (peer, signaling, responder)
777    }
778
779    #[test]
780    fn stream_release_without_abort() {
781        let mut exec = fasync::TestExecutor::new();
782        let mut s = test_endpoint(EndpointType::Sink);
783
784        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
785
786        let remote_transport = establish_stream(&mut s);
787
788        let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
789
790        // We expect release to succeed in this state.
791        s.release(responder, &peer).unwrap();
792        // Expect a "yes" response.
793        expect_remote_recv(&[0x42, 0x08], &signaling);
794
795        // Close the transport channel by dropping it.
796        drop(remote_transport);
797
798        // After the transport is closed we should transition to Idle.
799        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
800        assert_eq!(s.state(), StreamState::Idle);
801    }
802
803    #[test]
804    fn test_mediastream() {
805        let mut exec = fasync::TestExecutor::new();
806        let mut s = test_endpoint(EndpointType::Sink);
807
808        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
809
810        // Before the stream is opened, we shouldn't be able to take the transport.
811        assert!(s.take_transport().is_none());
812
813        let remote_transport = establish_stream(&mut s);
814
815        // Should be able to get the transport from the stream now.
816        let temp_stream = s.take_transport();
817        assert!(temp_stream.is_some());
818
819        // But only once
820        assert!(s.take_transport().is_none());
821
822        // Until you drop the stream
823        drop(temp_stream);
824
825        let media_stream = s.take_transport();
826        assert!(media_stream.is_some());
827        let mut media_stream = media_stream.unwrap();
828
829        // Max TX size is taken from the underlying channel.
830        assert_matches!(media_stream.max_tx_size(), Ok(Channel::DEFAULT_MAX_TX));
831
832        // Writing to the media stream should send it through the transport channel.
833        let hearts = &[0xF0, 0x9F, 0x92, 0x96, 0xF0, 0x9F, 0x92, 0x96];
834        let mut write_fut = media_stream.write(hearts);
835
836        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Ok(8)));
837
838        expect_remote_recv(hearts, &remote_transport);
839
840        // Closing the media stream should close the channel.
841        let mut close_fut = media_stream.close();
842        assert_matches!(exec.run_until_stalled(&mut close_fut), Poll::Ready(Ok(())));
843        // Note: there's no effect on the other end of the channel when a close occurs,
844        // until the channel is dropped.
845
846        drop(s);
847
848        // Reading from the remote end should fail.
849        let mut result = vec![0];
850        assert_matches!(remote_transport.read(&mut result[..]), Err(zx::Status::PEER_CLOSED));
851
852        // After the stream is gone, any write should return an Err
853        let mut write_fut = media_stream.write(&[0xDE, 0xAD]);
854        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Err(_)));
855
856        // After the stream is gone, the stream should be fused done.
857        let mut next_fut = media_stream.next();
858        assert_matches!(exec.run_until_stalled(&mut next_fut), Poll::Ready(None));
859
860        // And the Max TX should be an error.
861        assert_matches!(media_stream.max_tx_size(), Err(_));
862    }
863
864    #[test]
865    fn stream_release_with_abort() {
866        let mut exec = fasync::TestExecutor::new();
867        let mut s = test_endpoint(EndpointType::Sink);
868
869        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
870        let remote_transport = establish_stream(&mut s);
871        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);
872
873        // We expect release to succeed in this state, then start the task to wait for the close.
874        s.release(responder, &peer).unwrap();
875        // Expect a "yes" response.
876        expect_remote_recv(&[0x42, 0x08], &signaling);
877
878        // Should get an abort
879        let next = std::pin::pin!(signaling.next());
880        let received =
881            exec.run_singlethreaded(next).expect("channel not closed").expect("successful read");
882        assert_eq!(0x0A, received[1]);
883        let txlabel = received[0] & 0xF0;
884        // Send a response
885        assert!(signaling.write(&[txlabel | 0x02, 0x0A]).is_ok());
886
887        let _ = exec.run_singlethreaded(&mut remote_transport.closed());
888
889        // We will then end up in Idle.
890        while s.state() != StreamState::Idle {
891            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
892        }
893    }
894
895    #[test]
896    fn start_and_suspend() {
897        let mut exec = fasync::TestExecutor::new();
898        let mut s = test_endpoint(EndpointType::Sink);
899
900        // Can't start or suspend until configured and open.
901        assert_matches!(s.start(), Err(ErrorCode::BadState));
902        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
903
904        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
905
906        assert_matches!(s.start(), Err(ErrorCode::BadState));
907        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
908
909        assert_matches!(s.establish(), Ok(()));
910
911        assert_matches!(s.start(), Err(ErrorCode::BadState));
912        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
913
914        let (remote, local) = zx::Socket::create_datagram();
915        let (client_end, mut direction_request_stream) =
916            create_request_stream::<bredr::AudioDirectionExtMarker>();
917        let ext = bredr::Channel {
918            socket: Some(local),
919            channel_mode: Some(fidl_bt::ChannelMode::Basic),
920            max_tx_sdu_size: Some(1004),
921            ext_direction: Some(client_end),
922            ..Default::default()
923        };
924        let transport = Channel::try_from(ext).unwrap();
925        assert_matches!(s.receive_channel(transport), Ok(false));
926
927        // Should be able to start but not suspend now.
928        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
929        assert_matches!(s.start(), Ok(()));
930
931        match exec.run_until_stalled(&mut direction_request_stream.next()) {
932            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
933                priority,
934                responder,
935            }))) => {
936                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
937                responder.send(Ok(())).expect("response to send cleanly");
938            }
939            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
940        };
941
942        // Are started, so we should be able to suspend but not start again here.
943        assert_matches!(s.start(), Err(ErrorCode::BadState));
944        assert_matches!(s.suspend(), Ok(()));
945
946        match exec.run_until_stalled(&mut direction_request_stream.next()) {
947            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
948                priority,
949                responder,
950            }))) => {
951                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
952                responder.send(Ok(())).expect("response to send cleanly");
953            }
954            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
955        };
956
957        // Now we're suspended, so we can start it again.
958        assert_matches!(s.start(), Ok(()));
959        assert_matches!(s.suspend(), Ok(()));
960
961        // After we close, we are back at idle and can't start / stop
962        let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
963
964        {
965            s.release(responder, &peer).unwrap();
966            // Expect a "yes" response.
967            expect_remote_recv(&[0x42, 0x08], &signaling);
968            // Close the transport channel by dropping it.
969            drop(remote);
970            while s.state() != StreamState::Idle {
971                let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
972            }
973        }
974
975        // Shouldn't be able to start or suspend again.
976        assert_matches!(s.start(), Err(ErrorCode::BadState));
977        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
978    }
979
980    fn receive_l2cap_params_channel(
981        s: &mut StreamEndpoint,
982    ) -> (zx::Socket, bredr::L2capParametersExtRequestStream) {
983        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
984        assert_matches!(s.establish(), Ok(()));
985
986        let (remote, local) = zx::Socket::create_datagram();
987        let (client_end, l2cap_params_requests) =
988            create_request_stream::<bredr::L2capParametersExtMarker>();
989        let ext = bredr::Channel {
990            socket: Some(local),
991            channel_mode: Some(fidl_bt::ChannelMode::Basic),
992            max_tx_sdu_size: Some(1004),
993            ext_l2cap: Some(client_end),
994            ..Default::default()
995        };
996        let transport = Channel::try_from(ext).unwrap();
997        assert_matches!(s.receive_channel(transport), Ok(false));
998        (remote, l2cap_params_requests)
999    }
1000
1001    #[test]
1002    fn sets_flush_timeout_for_source_transports() {
1003        let mut exec = fasync::TestExecutor::new();
1004        let mut s = test_endpoint(EndpointType::Source);
1005        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
1006
1007        // Should request to set the flush timeout.
1008        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
1009            Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
1010                request,
1011                responder,
1012            }))) => {
1013                assert_eq!(
1014                    Some(StreamEndpoint::SRC_FLUSH_TIMEOUT.into_nanos()),
1015                    request.flush_timeout
1016                );
1017                responder.send(&request).expect("response to send cleanly");
1018            }
1019            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
1020        };
1021    }
1022
1023    #[test]
1024    fn no_flush_timeout_for_sink_transports() {
1025        let mut exec = fasync::TestExecutor::new();
1026        let mut s = test_endpoint(EndpointType::Sink);
1027        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
1028
1029        // Should NOT request to set the flush timeout.
1030        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
1031            Poll::Pending => {}
1032            x => panic!("Expected no request to set flush timeout, got {:?}", x),
1033        };
1034    }
1035
1036    #[test]
1037    fn get_configuration() {
1038        let mut s = test_endpoint(EndpointType::Sink);
1039
1040        // Can't get configuration if we aren't configured.
1041        assert!(s.get_configuration().is_none());
1042
1043        let config = vec![
1044            ServiceCapability::MediaTransport,
1045            ServiceCapability::MediaCodec {
1046                media_type: MediaType::Audio,
1047                codec_type: MediaCodecType::new(0),
1048                // Change the codec_extra which is typical, ex. SBC (A2DP Spec 4.3.2.6)
1049                codec_extra: vec![0x60, 0x0D, 0x02, 0x55],
1050            },
1051        ];
1052
1053        assert_matches!(s.configure(&REMOTE_ID, config.clone()), Ok(()));
1054
1055        match s.get_configuration() {
1056            Some(c) => assert_eq!(&config, c),
1057            x => panic!("Expected Ok from get_configuration but got {:?}", x),
1058        };
1059
1060        // Abort this stream, putting it back to the idle state.
1061        s.abort();
1062
1063        assert!(s.get_configuration().is_none());
1064    }
1065
1066    use std::sync::atomic::{AtomicUsize, Ordering};
1067
1068    /// Create a callback that tracks how many times it has been called
1069    fn call_count_callback() -> (Option<StreamEndpointUpdateCallback>, Arc<AtomicUsize>) {
1070        let call_count = Arc::new(AtomicUsize::new(0));
1071        let call_count_reader = call_count.clone();
1072        let count_cb: StreamEndpointUpdateCallback = Box::new(move |_stream: &StreamEndpoint| {
1073            let _ = call_count.fetch_add(1, Ordering::SeqCst);
1074        });
1075        (Some(count_cb), call_count_reader)
1076    }
1077
1078    /// Test that the update callback is run at least once for all methods that mutate the state of
1079    /// the StreamEndpoint. This is done through an atomic counter in the callback that increments
1080    /// when the callback is run.
1081    ///
1082    /// Note that the _results_ of calling these mutating methods on the state of StreamEndpoint are
1083    /// not validated here. They are validated in other tests.
1084    #[test]
1085    fn update_callback() {
1086        // Need an executor to make a socket
1087        let _exec = fasync::TestExecutor::new();
1088        let mut s = test_endpoint(EndpointType::Sink);
1089        let (cb, call_count) = call_count_callback();
1090        s.set_update_callback(cb);
1091
1092        s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
1093            .expect("Configure to succeed in test");
1094        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1095        call_count.store(0, Ordering::SeqCst); // clear call count
1096
1097        s.establish().expect("Establish to succeed in test");
1098        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1099        call_count.store(0, Ordering::SeqCst); // clear call count
1100
1101        let (_, transport) = Channel::create();
1102        assert_eq!(
1103            s.receive_channel(transport).expect("Receive channel to succeed in test"),
1104            false
1105        );
1106        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1107        call_count.store(0, Ordering::SeqCst); // clear call count
1108
1109        s.start().expect("Start to succeed in test");
1110        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1111        call_count.store(0, Ordering::SeqCst); // clear call count
1112
1113        s.suspend().expect("Suspend to succeed in test");
1114        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1115        call_count.store(0, Ordering::SeqCst); // clear call count
1116
1117        s.reconfigure(vec![]).expect("Reconfigure to succeed in test");
1118        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1119        call_count.store(0, Ordering::SeqCst); // clear call count
1120
1121        // Abort this stream, putting it back to the idle state.
1122        s.abort();
1123    }
1124}