use fuchsia_async::{DurationExt, Task, TimeoutExt};
use fuchsia_bluetooth::types::{A2dpDirection, Channel};
use fuchsia_sync::Mutex;
use futures::stream::Stream;
use futures::{io, FutureExt};
use log::warn;
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, RwLock, Weak};
use std::task::{Context, Poll};
use zx::{MonotonicDuration, Status};
use crate::types::{
EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result as AvdtpResult,
ServiceCapability, ServiceCategory, StreamEndpointId, StreamInformation,
};
use crate::{Peer, SimpleResponder};
/// Callback invoked with a reference to the endpoint whenever the endpoint reports an update
/// (state changes made through the endpoint's methods, reconfiguration, and release).
pub type StreamEndpointUpdateCallback = Box<dyn Fn(&StreamEndpoint) -> () + Sync + Send>;
/// The state of a [`StreamEndpoint`].
#[derive(PartialEq, Debug, Default, Clone, Copy)]
pub enum StreamState {
/// Nothing is configured. This is the default state and the only state in which the
/// endpoint reports itself as not in use.
#[default]
Idle,
/// Entered after a successful `configure`.
Configured,
/// Entered by `establish`; the endpoint is waiting for its transport channel.
Opening,
/// A transport channel has been received, or streaming has been suspended.
Open,
/// Entered by `start`.
Streaming,
/// Entered by `release` while the endpoint waits for the transport to close.
Closing,
/// An abort is in progress; the endpoint moves to `Idle` afterwards.
Aborting,
}
/// A stream endpoint: its identity, supported capabilities, current state, and (once opened)
/// the transport channel used for media.
pub struct StreamEndpoint {
/// Local identifier of this endpoint.
id: StreamEndpointId,
/// Whether this endpoint is a Source or a Sink.
endpoint_type: EndpointType,
/// The media type of this endpoint.
media_type: MediaType,
/// Current state. Shared so that the task spawned by `release` can update it.
state: Arc<Mutex<StreamState>>,
/// The transport channel, present once `receive_channel` has succeeded.
transport: Option<Arc<RwLock<Channel>>>,
/// True while a `MediaStream` handed out by `take_transport` is still alive.
stream_held: Arc<Mutex<bool>>,
/// Capabilities this endpoint supports.
capabilities: Vec<ServiceCapability>,
/// Identifier of the remote endpoint, recorded by `configure`.
remote_id: Option<StreamEndpointId>,
/// The currently configured capabilities; empty when nothing is configured.
configuration: Vec<ServiceCapability>,
/// Optional callback invoked when the endpoint is updated.
update_callback: Option<StreamEndpointUpdateCallback>,
/// Task started by `release` that waits for the transport to close.
in_progress: Option<Task<()>>,
}
impl fmt::Debug for StreamEndpoint {
    /// Formats the endpoint's identity, state, capabilities, remote id, and configuration.
    /// The transport, held-stream flag, callback, and in-progress task are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("StreamEndpoint");
        out.field("id", &self.id.0);
        out.field("endpoint_type", &self.endpoint_type);
        out.field("media_type", &self.media_type);
        out.field("state", &self.state);
        out.field("capabilities", &self.capabilities);
        // The remote id is printed through its string form rather than its Debug form.
        let remote_id = self.remote_id.as_ref().map(|id| id.to_string());
        out.field("remote_id", &remote_id);
        out.field("configuration", &self.configuration);
        out.finish()
    }
}
impl StreamEndpoint {
pub fn new(
id: u8,
media_type: MediaType,
endpoint_type: EndpointType,
capabilities: Vec<ServiceCapability>,
) -> AvdtpResult<StreamEndpoint> {
let seid = StreamEndpointId::try_from(id)?;
Ok(StreamEndpoint {
id: seid,
capabilities,
media_type,
endpoint_type,
state: Default::default(),
transport: None,
stream_held: Arc::new(Mutex::new(false)),
remote_id: None,
configuration: vec![],
update_callback: None,
in_progress: None,
})
}
pub fn as_new(&self) -> Self {
StreamEndpoint::new(
self.id.0,
self.media_type.clone(),
self.endpoint_type.clone(),
self.capabilities.clone(),
)
.expect("as_new")
}
/// Stores `state` as the current state, then notifies the update callback.
fn set_state(&mut self, state: StreamState) {
    {
        // The guard is released before the callback runs, since the callback receives
        // `&self` and may inspect the state.
        let mut current = self.state.lock();
        *current = state;
    }
    self.update_callback();
}
/// Installs `callback` as the update callback, replacing any existing one.
/// Passing `None` removes the callback.
pub fn set_update_callback(&mut self, callback: Option<StreamEndpointUpdateCallback>) {
    let _previous = std::mem::replace(&mut self.update_callback, callback);
}
/// Invokes the update callback with this endpoint, if one is installed.
fn update_callback(&self) {
    match &self.update_callback {
        Some(notify) => notify(self),
        None => {}
    }
}
pub fn from_info(
info: &StreamInformation,
capabilities: Vec<ServiceCapability>,
) -> StreamEndpoint {
StreamEndpoint {
id: info.id().clone(),
capabilities,
media_type: info.media_type().clone(),
endpoint_type: info.endpoint_type().clone(),
state: Default::default(),
transport: None,
stream_held: Arc::new(Mutex::new(false)),
remote_id: None,
configuration: vec![],
update_callback: None,
in_progress: None,
}
}
/// Returns `Ok(())` when the endpoint is currently in `state`, and
/// `Err(ErrorCode::BadState)` otherwise.
fn state_is(&self, state: StreamState) -> Result<(), ErrorCode> {
    if *self.state.lock() == state {
        Ok(())
    } else {
        Err(ErrorCode::BadState)
    }
}
/// Configures this idle endpoint for use with the remote endpoint `remote_id`.
///
/// Every entry in `capabilities` must be of a kind this endpoint supports; only the kind of
/// capability is compared, not its contents. On success the remote id and configuration are
/// stored and the endpoint moves to `StreamState::Configured`.
///
/// # Errors
///
/// * `(ServiceCategory::None, ErrorCode::BadState)` when the endpoint is not idle.
/// * `(category, ErrorCode::UnsupportedConfiguration)` for the first requested capability
///   whose kind is not supported.
///
/// The endpoint is left unmodified when an error is returned.
pub fn configure(
    &mut self,
    remote_id: &StreamEndpointId,
    capabilities: Vec<ServiceCapability>,
) -> Result<(), (ServiceCategory, ErrorCode)> {
    self.state_is(StreamState::Idle).map_err(|e| (ServiceCategory::None, e))?;
    for cap in &capabilities {
        let supported = self
            .capabilities
            .iter()
            .any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y));
        if !supported {
            return Err((cap.category(), ErrorCode::UnsupportedConfiguration));
        }
    }
    // Record the remote id only after validation succeeds, so a rejected configuration
    // does not leave a stale remote id on an endpoint that is still idle.
    self.remote_id = Some(remote_id.clone());
    self.configuration = capabilities;
    self.set_state(StreamState::Configured);
    Ok(())
}
/// Replaces parts of the configuration of an open endpoint.
///
/// Only application capabilities may be supplied. Each supplied capability replaces any
/// configured capability of the same kind; the new capabilities are appended to the end of
/// the configuration. The update callback is notified on success.
///
/// # Errors
///
/// * `(ServiceCategory::None, ErrorCode::BadState)` when the endpoint is not open.
/// * `(category, ErrorCode::InvalidCapabilities)` for the first non-application capability.
pub fn reconfigure(
    &mut self,
    mut capabilities: Vec<ServiceCapability>,
) -> Result<(), (ServiceCategory, ErrorCode)> {
    self.state_is(StreamState::Open).map_err(|e| (ServiceCategory::None, e))?;
    for requested in &capabilities {
        if !requested.is_application() {
            return Err((requested.category(), ErrorCode::InvalidCapabilities));
        }
    }
    // Drop every configured capability whose kind is about to be replaced.
    let replaced_kinds: Vec<_> =
        capabilities.iter().map(|requested| std::mem::discriminant(requested)).collect();
    self.configuration
        .retain(|existing| !replaced_kinds.contains(&std::mem::discriminant(existing)));
    self.configuration.append(&mut capabilities);
    self.update_callback();
    Ok(())
}
/// Returns the current configuration, or `None` when no capabilities are configured.
pub fn get_configuration(&self) -> Option<&Vec<ServiceCapability>> {
    let configured = !self.configuration.is_empty();
    configured.then_some(&self.configuration)
}
/// Flush timeout (100 ms) requested on the transport channel of a Source endpoint when the
/// channel is received.
const SRC_FLUSH_TIMEOUT: MonotonicDuration = MonotonicDuration::from_millis(100);
/// Accepts `c` as the transport channel of an endpoint that is opening.
///
/// On success the channel is stored, a flush timeout is requested for Source endpoints, the
/// held-stream flag is reset, and the endpoint moves to `StreamState::Open`. The returned
/// flag is currently always `false`.
///
/// # Errors
///
/// Returns `Error::InvalidState` (dropping `c`) when the endpoint is not in
/// `StreamState::Opening` or already has a transport.
pub fn receive_channel(&mut self, c: Channel) -> AvdtpResult<bool> {
    let awaiting_transport =
        self.state_is(StreamState::Opening).is_ok() && self.transport.is_none();
    if !awaiting_transport {
        return Err(Error::InvalidState);
    }
    let shared = Arc::new(RwLock::new(c));
    self.transport = Some(shared);
    // Must follow the assignment above: the flush timeout is set on the stored transport.
    self.try_flush_timeout(Self::SRC_FLUSH_TIMEOUT);
    self.stream_held = Arc::new(Mutex::new(false));
    self.set_state(StreamState::Open);
    Ok(false)
}
/// Moves a configured endpoint without a transport to `StreamState::Opening`.
///
/// # Errors
///
/// Returns `ErrorCode::BadState` when the endpoint is not in `StreamState::Configured` or
/// already has a transport.
pub fn establish(&mut self) -> Result<(), ErrorCode> {
    let ready = self.state_is(StreamState::Configured).is_ok() && self.transport.is_none();
    if ready {
        self.set_state(StreamState::Opening);
        Ok(())
    } else {
        Err(ErrorCode::BadState)
    }
}
/// Attempts to set the audio priority of the transport channel.
///
/// When `active` is true the priority matches the endpoint type (Source or Sink); otherwise
/// the priority is Normal. This is best-effort: nothing happens when there is no transport
/// or the transport cannot be read-locked right now, and the outcome of the request is
/// ignored. The request runs on a detached task.
pub fn try_priority(&self, active: bool) {
    // Previously this unwrapped the transport and panicked when called on an endpoint
    // that had not yet received (or had already given up) its channel.
    let Some(transport) = self.transport.as_ref() else {
        return;
    };
    let priority = match (active, &self.endpoint_type) {
        (false, _) => A2dpDirection::Normal,
        (true, EndpointType::Source) => A2dpDirection::Source,
        (true, EndpointType::Sink) => A2dpDirection::Sink,
    };
    let fut = match transport.try_read() {
        Err(_) => return,
        Ok(channel) => channel.set_audio_priority(priority).map(|_| ()),
    };
    Task::spawn(fut).detach();
}
/// Attempts to set the flush timeout of the transport channel to `timeout`.
///
/// Only Source endpoints are affected. This is best-effort: nothing happens when there is
/// no transport or the transport cannot be write-locked right now, and the outcome of the
/// request is ignored. The request runs on a detached task.
pub fn try_flush_timeout(&self, timeout: MonotonicDuration) {
    if self.endpoint_type != EndpointType::Source {
        return;
    }
    // Previously this unwrapped the transport and panicked for a Source endpoint that had
    // no channel.
    let Some(transport) = self.transport.as_ref() else {
        return;
    };
    let fut = match transport.try_write() {
        Err(_) => return,
        Ok(channel) => channel.set_flush_timeout(Some(timeout)).map(|_| ()),
    };
    Task::spawn(fut).detach();
}
/// Handles a request to release (close) this endpoint.
///
/// If the endpoint is neither Open nor Streaming, the request is rejected with
/// `ErrorCode::BadState` and the result of the rejection is returned. Otherwise the endpoint
/// moves to `StreamState::Closing`, the request is accepted via `responder`, and a local task
/// is started that waits up to three seconds for the transport to close. If that wait times
/// out, an abort for the remote endpoint is sent through `peer`. The task leaves the endpoint
/// in `StreamState::Idle` when it finishes. The configuration is cleared immediately.
///
/// # Errors
///
/// Returns the error from `responder.send()` if the response cannot be sent; in that case
/// the endpoint has already been moved to `StreamState::Closing`.
///
/// # Panics
///
/// Panics if the endpoint has no remote id or no transport when the request is accepted.
pub fn release(&mut self, responder: SimpleResponder, peer: &Peer) -> AvdtpResult<()> {
{
// Scoped so the state guard is released before `set_state` locks the state again.
let lock = self.state.lock();
if *lock != StreamState::Open && *lock != StreamState::Streaming {
return responder.reject(ErrorCode::BadState);
}
}
self.set_state(StreamState::Closing);
responder.send()?;
let release_wait_fut = {
// The endpoint gives up its remote id and transport; the task below owns them.
let seid = self.remote_id.take().unwrap();
let transport = self.transport.take().unwrap();
let peer = peer.clone();
let state = self.state.clone();
async move {
// State changes in this task write the shared state directly, so the update
// callback is not invoked for them.
let Ok(transport) = transport.try_read() else {
warn!("unable to lock transport channel, dropping and assuming closed");
*state.lock() = StreamState::Idle;
return;
};
// Resolve with TIMED_OUT if the transport has not closed within three seconds.
let closed_fut = transport
.closed()
.on_timeout(MonotonicDuration::from_seconds(3).after_now(), || {
Err(Status::TIMED_OUT)
});
if let Err(Status::TIMED_OUT) = closed_fut.await {
let _ = peer.abort(&seid).await;
*state.lock() = StreamState::Aborting;
// NOTE(review): `transport` here is the read guard that shadows the Arc, so this
// releases the guard; the channel itself presumably closes when the task's
// captured Arc is dropped - confirm.
drop(transport);
}
*state.lock() = StreamState::Idle;
}
};
self.in_progress = Some(Task::local(release_wait_fut));
// The configuration is cleared right away, before the release task has finished.
self.configuration.clear();
self.update_callback();
Ok(())
}
/// Returns a copy of the endpoint's current state.
pub fn state(&self) -> StreamState {
    let current = self.state.lock();
    *current
}
/// Starts streaming on an open endpoint: requests the active audio priority for the
/// transport, then moves to `StreamState::Streaming`.
///
/// # Errors
///
/// Returns `ErrorCode::BadState` when the endpoint is not in `StreamState::Open`.
pub fn start(&mut self) -> Result<(), ErrorCode> {
    match self.state_is(StreamState::Open) {
        Ok(()) => {
            self.try_priority(true);
            self.set_state(StreamState::Streaming);
            Ok(())
        }
        Err(code) => Err(code),
    }
}
/// Suspends a streaming endpoint: moves back to `StreamState::Open`, then requests the
/// normal audio priority for the transport.
///
/// # Errors
///
/// Returns `ErrorCode::BadState` when the endpoint is not in `StreamState::Streaming`.
pub fn suspend(&mut self) -> Result<(), ErrorCode> {
    if let Err(code) = self.state_is(StreamState::Streaming) {
        return Err(code);
    }
    self.set_state(StreamState::Open);
    self.try_priority(false);
    Ok(())
}
/// Aborts this endpoint on our initiative.
///
/// When a remote id is recorded, an abort for it is sent through `peer` (the result is
/// ignored) and the endpoint is marked `StreamState::Aborting`. The endpoint is then reset
/// locally via [`StreamEndpoint::abort`] in every case.
pub async fn initiate_abort<'a>(&'a mut self, peer: &'a Peer) {
    match self.remote_id.take() {
        Some(seid) => {
            let _ = peer.abort(&seid).await;
            self.set_state(StreamState::Aborting);
        }
        None => {}
    }
    self.abort()
}
/// Resets the endpoint locally: passes through `StreamState::Aborting`, discards the
/// configuration, remote id, and transport, and ends in `StreamState::Idle`.
/// The update callback is notified for both state changes.
pub fn abort(&mut self) {
    self.set_state(StreamState::Aborting);
    self.configuration.clear();
    drop(self.remote_id.take());
    drop(self.transport.take());
    self.set_state(StreamState::Idle);
}
pub fn capabilities(&self) -> &Vec<ServiceCapability> {
&self.capabilities
}
/// Returns the codec type of the first `MediaCodec` capability this endpoint supports,
/// or `None` when it has no such capability.
pub fn codec_type(&self) -> Option<&MediaCodecType> {
    for cap in &self.capabilities {
        if let ServiceCapability::MediaCodec { codec_type, .. } = cap {
            return Some(codec_type);
        }
    }
    None
}
pub fn local_id(&self) -> &StreamEndpointId {
&self.id
}
pub fn remote_id(&self) -> Option<&StreamEndpointId> {
self.remote_id.as_ref()
}
pub fn endpoint_type(&self) -> &EndpointType {
&self.endpoint_type
}
/// Builds the `StreamInformation` describing this endpoint.
/// The endpoint is reported as in use whenever it is not in `StreamState::Idle`.
pub fn information(&self) -> StreamInformation {
    let idle = self.state_is(StreamState::Idle).is_ok();
    let id = self.id.clone();
    let media_type = self.media_type.clone();
    let endpoint_type = self.endpoint_type.clone();
    StreamInformation::new(id, !idle, media_type, endpoint_type)
}
/// Hands out a `MediaStream` over the transport channel.
///
/// Returns `None` when a previously handed-out stream is still alive or when the endpoint
/// has no transport. The returned stream holds only a weak reference to the channel and
/// clears the held flag when it is dropped.
pub fn take_transport(&mut self) -> Option<MediaStream> {
    let mut held = self.stream_held.lock();
    if *held {
        return None;
    }
    let transport = self.transport.as_ref()?;
    *held = true;
    let stream = MediaStream::new(self.stream_held.clone(), Arc::downgrade(transport));
    Some(stream)
}
}
/// A handle for reading from and writing to an endpoint's transport channel.
///
/// The channel is referenced weakly, so operations fail or end the stream once the owning
/// reference to the channel has been dropped.
pub struct MediaStream {
/// Flag shared with the creator of the stream; set back to `false` when this stream is
/// dropped.
in_use: Arc<Mutex<bool>>,
/// Weak reference to the transport channel.
channel: Weak<RwLock<Channel>>,
}
impl MediaStream {
pub fn new(in_use: Arc<Mutex<bool>>, channel: Weak<RwLock<Channel>>) -> Self {
Self { in_use, channel }
}
fn try_upgrade(&self) -> Result<Arc<RwLock<Channel>>, io::Error> {
self.channel
.upgrade()
.ok_or_else(|| io::Error::new(io::ErrorKind::ConnectionAborted, "lost connection"))
}
pub fn max_tx_size(&self) -> Result<usize, io::Error> {
match self.try_upgrade()?.try_read() {
Err(_e) => return Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")),
Ok(lock) => Ok(lock.max_tx_size()),
}
}
}
impl Drop for MediaStream {
    /// Clears the shared in-use flag so that another stream can be handed out.
    fn drop(&mut self) {
        *self.in_use.lock() = false;
    }
}
impl Stream for MediaStream {
type Item = AvdtpResult<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let arc_chan = match self.try_upgrade() {
Err(_e) => return Poll::Ready(None),
Ok(c) => c,
};
let lock = match arc_chan.try_write() {
Err(_e) => return Poll::Ready(None),
Ok(lock) => lock,
};
let mut pin_chan = Pin::new(lock);
match pin_chan.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::PeerRead(e)))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
impl io::AsyncWrite for MediaStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let arc_chan = match self.try_upgrade() {
Err(e) => return Poll::Ready(Err(e)),
Ok(c) => c,
};
let lock = match arc_chan.try_write() {
Err(_) => {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
}
Ok(lock) => lock,
};
let mut pin_chan = Pin::new(lock);
pin_chan.as_mut().poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let arc_chan = match self.try_upgrade() {
Err(e) => return Poll::Ready(Err(e)),
Ok(c) => c,
};
let lock = match arc_chan.try_write() {
Err(_) => {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
}
Ok(lock) => lock,
};
let mut pin_chan = Pin::new(lock);
pin_chan.as_mut().poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let arc_chan = match self.try_upgrade() {
Err(e) => return Poll::Ready(Err(e)),
Ok(c) => c,
};
let lock = match arc_chan.try_write() {
Err(_) => {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
}
Ok(lock) => lock,
};
let mut pin_chan = Pin::new(lock);
pin_chan.as_mut().poll_close(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::{expect_remote_recv, setup_peer};
use crate::Request;
use assert_matches::assert_matches;
use fidl::endpoints::create_request_stream;
use futures::io::AsyncWriteExt;
use futures::stream::StreamExt;
use {
fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
fuchsia_async as fasync,
};
// Raw endpoint id used throughout these tests, both as the local id of test endpoints and
// as the remote id passed to `configure`.
const REMOTE_ID_VAL: u8 = 1;
// The same id as a `StreamEndpointId`.
const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL);
#[test]
fn make() {
    // A valid id produces an endpoint that reports its id and is not in use.
    let created = StreamEndpoint::new(
        REMOTE_ID_VAL,
        MediaType::Audio,
        EndpointType::Sink,
        vec![ServiceCapability::MediaTransport],
    );
    assert!(created.is_ok());
    let endpoint = created.unwrap();
    assert_eq!(&StreamEndpointId(1), endpoint.local_id());
    assert!(!endpoint.information().in_use());

    // An id of zero is rejected.
    let rejected = StreamEndpoint::new(
        0,
        MediaType::Audio,
        EndpointType::Sink,
        vec![ServiceCapability::MediaTransport],
    );
    assert!(rejected.is_err());
}
// Test helper: establishes a configured endpoint and hands it one end of a fresh channel
// as its transport. Returns the other end of that channel.
fn establish_stream(s: &mut StreamEndpoint) -> Channel {
assert_matches!(s.establish(), Ok(()));
let (chan, remote) = Channel::create();
assert_matches!(s.receive_channel(chan), Ok(false));
remote
}
#[test]
fn from_info() {
    // An endpoint built from stream information keeps the id and capabilities and starts
    // out not in use.
    let seid = StreamEndpointId::try_from(5).unwrap();
    let info =
        StreamInformation::new(seid.clone(), false, MediaType::Audio, EndpointType::Sink);
    let endpoint = StreamEndpoint::from_info(&info, vec![ServiceCapability::MediaTransport]);

    assert_eq!(&seid, endpoint.local_id());
    assert_eq!(&false, endpoint.information().in_use());
    assert_eq!(1, endpoint.capabilities().len());
}
#[test]
fn codec_type() {
    // With a MediaCodec capability, its codec type is reported.
    let codec = ServiceCapability::MediaCodec {
        media_type: MediaType::Audio,
        codec_type: MediaCodecType::new(0x40),
        codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF],
    };
    let with_codec = StreamEndpoint::new(
        REMOTE_ID_VAL,
        MediaType::Audio,
        EndpointType::Sink,
        vec![ServiceCapability::MediaTransport, codec],
    )
    .unwrap();
    assert_eq!(Some(&MediaCodecType::new(0x40)), with_codec.codec_type());

    // Without a MediaCodec capability there is no codec type.
    let without_codec = StreamEndpoint::new(
        REMOTE_ID_VAL,
        MediaType::Audio,
        EndpointType::Sink,
        vec![ServiceCapability::MediaTransport],
    )
    .unwrap();
    assert_eq!(None, without_codec.codec_type());
}
// Test helper: builds an audio endpoint of the given type that supports MediaTransport and
// one MediaCodec capability.
fn test_endpoint(r#type: EndpointType) -> StreamEndpoint {
StreamEndpoint::new(
REMOTE_ID_VAL,
MediaType::Audio,
r#type,
vec![
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::new(0x40),
codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], },
],
)
.unwrap()
}
// Walks an endpoint through configure, establish, start, and suspend, checking in which
// states `configure` and `reconfigure` are accepted.
#[test]
fn stream_configure_reconfigure() {
let _exec = fasync::TestExecutor::new();
let mut s = test_endpoint(EndpointType::Sink);
// A capability kind the endpoint does not support is rejected.
assert_matches!(
s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting]),
Err((ServiceCategory::Reporting, ErrorCode::UnsupportedConfiguration))
);
// Supported kinds are accepted even when the codec contents differ.
assert_matches!(
s.configure(
&REMOTE_ID,
vec![
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::new(0x40),
codec_extra: vec![0x0C, 0x0D, 0x02, 0x51],
}
]
),
Ok(())
);
let _channel = establish_stream(&mut s);
// Configuring is only possible while idle.
assert_matches!(
s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
Err((_, ErrorCode::BadState))
);
let reconfiguration = vec![ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::new(0x40),
codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
}];
// Reconfiguring while open replaces the codec capability and keeps the rest.
let new_configuration = vec![ServiceCapability::MediaTransport, reconfiguration[0].clone()];
assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));
assert_eq!(Some(&new_configuration), s.get_configuration());
// Non-application capabilities cannot be reconfigured.
assert_matches!(
s.reconfigure(vec![ServiceCapability::MediaTransport]),
Err((ServiceCategory::MediaTransport, ErrorCode::InvalidCapabilities))
);
// While streaming, neither configure nor reconfigure is allowed.
assert_matches!(s.start(), Ok(()));
assert_matches!(
s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
Err((_, ErrorCode::BadState))
);
assert_matches!(s.reconfigure(reconfiguration.clone()), Err((_, ErrorCode::BadState)));
// After suspending, the endpoint is open again and reconfigure works.
assert_matches!(s.suspend(), Ok(()));
assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));
assert_matches!(
s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
Err((_, ErrorCode::BadState))
);
}
// Checks that establish and receive_channel are refused before configuration and accepted
// after it.
#[test]
fn stream_establishment() {
let _exec = fasync::TestExecutor::new();
let mut s = test_endpoint(EndpointType::Sink);
let (remote, transport) = Channel::create();
// An unconfigured endpoint can neither be established nor receive a channel.
assert_matches!(s.establish(), Err(ErrorCode::BadState));
assert_matches!(s.receive_channel(transport), Err(Error::InvalidState));
// The rejected channel was dropped, so the other end observes the close.
let buf: &mut [u8] = &mut [0; 1];
assert_matches!(remote.read(buf), Err(zx::Status::PEER_CLOSED));
assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
assert_matches!(s.establish(), Ok(()));
let (_remote, transport) = Channel::create();
assert_matches!(s.receive_channel(transport), Ok(false));
}
// Test helper: creates a peer, writes a request on its signaling channel, and expects the
// peer's request stream to yield `Request::Close`. Returns the peer, the remote signaling
// channel, and the responder for that Close request.
fn setup_peer_for_release(exec: &mut fasync::TestExecutor) -> (Peer, Channel, SimpleResponder) {
let (peer, signaling) = setup_peer();
// NOTE(review): presumably an AVDTP Close command addressed to endpoint 1 - confirm
// against the signaling encoding.
let _ = signaling.write(&[0x40, 0x08, 0x04]).expect("signaling write");
let mut req_stream = peer.take_request_stream();
let mut req_fut = req_stream.next();
let complete = exec.run_until_stalled(&mut req_fut);
let responder = match complete {
Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder,
_ => panic!("Expected a close request"),
};
(peer, signaling, responder)
}
// Releasing an open endpoint whose transport is then closed by the remote side returns the
// endpoint to Idle.
#[test]
fn stream_release_without_abort() {
let mut exec = fasync::TestExecutor::new();
let mut s = test_endpoint(EndpointType::Sink);
assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
let remote_transport = establish_stream(&mut s);
let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
s.release(responder, &peer).unwrap();
// The response to the Close request is sent on the signaling channel.
expect_remote_recv(&[0x42, 0x08], &signaling);
// Closing the remote end of the transport lets the release task finish.
drop(remote_transport);
// Polling a never-ready future gives the executor a chance to run the release task.
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
assert_eq!(s.state(), StreamState::Idle);
}
// Exercises `take_transport` and the resulting `MediaStream`: exclusivity, writing,
// closing, and behavior after the endpoint is dropped.
#[test]
fn test_mediastream() {
let mut exec = fasync::TestExecutor::new();
let mut s = test_endpoint(EndpointType::Sink);
assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
// No transport yet, so no stream is available.
assert!(s.take_transport().is_none());
let remote_transport = establish_stream(&mut s);
// Only one stream can be held at a time; dropping it makes another available.
let temp_stream = s.take_transport();
assert!(temp_stream.is_some());
assert!(s.take_transport().is_none());
drop(temp_stream);
let media_stream = s.take_transport();
assert!(media_stream.is_some());
let mut media_stream = media_stream.unwrap();
assert_matches!(media_stream.max_tx_size(), Ok(Channel::DEFAULT_MAX_TX));
// Data written to the stream arrives at the remote end of the transport.
let hearts = &[0xF0, 0x9F, 0x92, 0x96, 0xF0, 0x9F, 0x92, 0x96];
let mut write_fut = media_stream.write(hearts);
assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Ok(8)));
expect_remote_recv(hearts, &remote_transport);
let mut close_fut = media_stream.close();
assert_matches!(exec.run_until_stalled(&mut close_fut), Poll::Ready(Ok(())));
// Dropping the endpoint drops the channel; the stream only holds a weak reference.
drop(s);
let mut result = vec![0];
assert_matches!(remote_transport.read(&mut result[..]), Err(zx::Status::PEER_CLOSED));
// With the channel gone, writes fail, the stream ends, and the size query fails.
let mut write_fut = media_stream.write(&[0xDE, 0xAD]);
assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Err(_)));
let mut next_fut = media_stream.next();
assert_matches!(exec.run_until_stalled(&mut next_fut), Poll::Ready(None));
assert_matches!(media_stream.max_tx_size(), Err(_));
}
// Releasing an endpoint whose transport is not closed by the remote side makes the
// endpoint send a further request on the signaling channel and then return to Idle.
#[test]
fn stream_release_with_abort() {
let mut exec = fasync::TestExecutor::new();
let mut s = test_endpoint(EndpointType::Sink);
assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
let remote_transport = establish_stream(&mut s);
let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);
s.release(responder, &peer).unwrap();
expect_remote_recv(&[0x42, 0x08], &signaling);
// The remote transport is kept open, so the next signaling packet is awaited instead.
let next = std::pin::pin!(signaling.next());
let received =
exec.run_singlethreaded(next).expect("channel not closed").expect("successful read");
// NOTE(review): 0x0A is presumably the Abort signal identifier - confirm.
assert_eq!(0x0A, received[1]);
// Reply using the transaction label of the received request.
let txlabel = received[0] & 0xF0;
assert!(signaling.write(&[txlabel | 0x02, 0x0A]).is_ok());
let _ = exec.run_singlethreaded(&mut remote_transport.closed());
// Keep driving the executor until the release task has returned the endpoint to Idle.
while s.state() != StreamState::Idle {
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
}
}
// Checks in which states `start` and `suspend` are accepted, and that they request the
// expected audio priority on a transport that supports the direction extension.
#[test]
fn start_and_suspend() {
let mut exec = fasync::TestExecutor::new();
let mut s = test_endpoint(EndpointType::Sink);
// Idle: neither start nor suspend is allowed.
assert_matches!(s.start(), Err(ErrorCode::BadState));
assert_matches!(s.suspend(), Err(ErrorCode::BadState));
// Configured: still not allowed.
assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
assert_matches!(s.start(), Err(ErrorCode::BadState));
assert_matches!(s.suspend(), Err(ErrorCode::BadState));
// Opening: still not allowed.
assert_matches!(s.establish(), Ok(()));
assert_matches!(s.start(), Err(ErrorCode::BadState));
assert_matches!(s.suspend(), Err(ErrorCode::BadState));
// Build a transport channel carrying an audio direction extension so that priority
// requests can be observed.
let (remote, local) = zx::Socket::create_datagram();
let (client_end, mut direction_request_stream) =
create_request_stream::<bredr::AudioDirectionExtMarker>();
let ext = bredr::Channel {
socket: Some(local),
channel_mode: Some(fidl_bt::ChannelMode::Basic),
max_tx_sdu_size: Some(1004),
ext_direction: Some(client_end),
..Default::default()
};
let transport = Channel::try_from(ext).unwrap();
assert_matches!(s.receive_channel(transport), Ok(false));
// Open: start is allowed, suspend is not.
assert_matches!(s.suspend(), Err(ErrorCode::BadState));
assert_matches!(s.start(), Ok(()));
// Starting a Sink endpoint requests the Sink priority.
match exec.run_until_stalled(&mut direction_request_stream.next()) {
Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
priority,
responder,
}))) => {
assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
responder.send(Ok(())).expect("response to send cleanly");
}
x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
};
// Streaming: suspend is allowed, start is not.
assert_matches!(s.start(), Err(ErrorCode::BadState));
assert_matches!(s.suspend(), Ok(()));
// Suspending requests the Normal priority.
match exec.run_until_stalled(&mut direction_request_stream.next()) {
Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
priority,
responder,
}))) => {
assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
responder.send(Ok(())).expect("response to send cleanly");
}
x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
};
// The endpoint can be started and suspended again.
assert_matches!(s.start(), Ok(()));
assert_matches!(s.suspend(), Ok(()));
// After a release completes, the endpoint is idle and refuses both again.
let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
{
s.release(responder, &peer).unwrap();
expect_remote_recv(&[0x42, 0x08], &signaling);
drop(remote);
while s.state() != StreamState::Idle {
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
}
}
assert_matches!(s.start(), Err(ErrorCode::BadState));
assert_matches!(s.suspend(), Err(ErrorCode::BadState));
}
// Test helper: configures and establishes `s`, then gives it a transport channel that
// carries an L2CAP parameters extension. Returns the remote socket of the transport and the
// request stream on which parameter requests from the endpoint can be observed.
fn receive_l2cap_params_channel(
s: &mut StreamEndpoint,
) -> (zx::Socket, bredr::L2capParametersExtRequestStream) {
assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
assert_matches!(s.establish(), Ok(()));
let (remote, local) = zx::Socket::create_datagram();
let (client_end, l2cap_params_requests) =
create_request_stream::<bredr::L2capParametersExtMarker>();
let ext = bredr::Channel {
socket: Some(local),
channel_mode: Some(fidl_bt::ChannelMode::Basic),
max_tx_sdu_size: Some(1004),
ext_l2cap: Some(client_end),
..Default::default()
};
let transport = Channel::try_from(ext).unwrap();
assert_matches!(s.receive_channel(transport), Ok(false));
(remote, l2cap_params_requests)
}
// A Source endpoint requests `SRC_FLUSH_TIMEOUT` as the flush timeout when it receives its
// transport channel.
#[test]
fn sets_flush_timeout_for_source_transports() {
let mut exec = fasync::TestExecutor::new();
let mut s = test_endpoint(EndpointType::Source);
let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
request,
responder,
}))) => {
assert_eq!(
Some(StreamEndpoint::SRC_FLUSH_TIMEOUT.into_nanos()),
request.flush_timeout
);
// Echo the requested parameters back as the response.
responder.send(&request).expect("response to send cleanly");
}
x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
};
}
// A Sink endpoint makes no parameter request when it receives its transport channel.
#[test]
fn no_flush_timeout_for_sink_transports() {
let mut exec = fasync::TestExecutor::new();
let mut s = test_endpoint(EndpointType::Sink);
let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
Poll::Pending => {}
x => panic!("Expected no request to set flush timeout, got {:?}", x),
};
}
// `get_configuration` returns nothing before configuring and after aborting, and returns
// the configured capabilities in between.
#[test]
fn get_configuration() {
let mut s = test_endpoint(EndpointType::Sink);
assert!(s.get_configuration().is_none());
let config = vec![
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::new(0),
codec_extra: vec![0x60, 0x0D, 0x02, 0x55],
},
];
assert_matches!(s.configure(&REMOTE_ID, config.clone()), Ok(()));
match s.get_configuration() {
Some(c) => assert_eq!(&config, c),
x => panic!("Expected Ok from get_configuration but got {:?}", x),
};
// Aborting clears the configuration.
s.abort();
assert!(s.get_configuration().is_none());
}
use std::sync::atomic::{AtomicUsize, Ordering};
// Test helper: returns an update callback that increments a counter on every call, together
// with a handle for reading and resetting that counter.
fn call_count_callback() -> (Option<StreamEndpointUpdateCallback>, Arc<AtomicUsize>) {
    let reader = Arc::new(AtomicUsize::new(0));
    let writer = Arc::clone(&reader);
    let count_cb: StreamEndpointUpdateCallback = Box::new(move |_stream: &StreamEndpoint| {
        let _ = writer.fetch_add(1, Ordering::SeqCst);
    });
    (Some(count_cb), reader)
}
// Every operation that updates the endpoint must invoke the update callback at least once.
// The counter is reset before each step so that each operation is checked on its own.
#[test]
fn update_callback() {
    let _exec = fasync::TestExecutor::new();
    let mut s = test_endpoint(EndpointType::Sink);
    let (cb, call_count) = call_count_callback();
    s.set_update_callback(cb);

    // Configure.
    s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
        .expect("Configure to succeed in test");
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    // Establish.
    s.establish().expect("Establish to succeed in test");
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    // Receive the transport channel.
    let (_, transport) = Channel::create();
    assert_eq!(
        s.receive_channel(transport).expect("Receive channel to succeed in test"),
        false
    );
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    // Start.
    s.start().expect("Start to succeed in test");
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    // Suspend.
    s.suspend().expect("Suspend to succeed in test");
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    // Reconfigure.
    s.reconfigure(vec![]).expect("Reconfigure to succeed in test");
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
    call_count.store(0, Ordering::SeqCst);

    // Abort. This step was previously performed without checking its effect.
    s.abort();
    assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
}
}