use fuchsia_async::{MonotonicInstant, TimeoutExt};
use fuchsia_bluetooth::types::Channel;
use futures::future::Ready;
use futures::stream::FilterMap;
use futures::{future, Stream, StreamExt};
use log::{info, trace};
use packet_encoding::{Decodable, Encodable};
use zx::MonotonicDuration;
#[cfg(test)]
mod tests;
mod types;
use crate::avctp::{
Command as AvctpCommand, CommandStream as AvctpCommandStream, Header as AvctpHeader,
Packet as AvctpPacket, Peer as AvctpPeer,
};
use crate::{Error, Result};
use self::types::BT_SIG_COMPANY_ID;
pub use self::types::{CommandType, Header, OpCode, PacketType, ResponseType, SubunitType};
/// Stream of AV/C [`Command`]s, obtained by passing every item of an [`AvctpCommandStream`]
/// through a filter function that either drops the item (`None`) or yields a
/// `Result<Command>`.
///
/// The filter is held as a plain `fn` pointer rather than a closure type.
pub type CommandStream = FilterMap<
AvctpCommandStream,
Ready<Option<Result<Command>>>,
fn(Result<AvctpCommand>) -> Ready<Option<Result<Command>>>,
>;
/// An AV/C command received from the peer: the underlying AVCTP command together with the
/// AV/C header decoded from the start of its body.
#[derive(Debug)]
pub struct Command {
// The AVCTP command this AV/C command was carried in; responses are sent through it.
inner: AvctpCommand,
// AV/C header decoded from the beginning of `inner`'s body.
avc_header: Header,
}
impl Command {
    /// Returns the header of the AVCTP command that carried this AV/C command.
    pub fn avctp_header(&self) -> &AvctpHeader {
        self.inner.header()
    }

    /// Returns the decoded AV/C header of this command.
    pub fn avc_header(&self) -> &Header {
        &self.avc_header
    }

    /// Returns the bytes of the AVCTP body that follow the AV/C header.
    ///
    /// # Panics
    ///
    /// Panics if the header's encoded length is larger than the AVCTP body.
    pub fn body(&self) -> &[u8] {
        let header_len = self.avc_header.encoded_len();
        &self.inner.body()[header_len..]
    }

    /// Sends a response to this command.
    ///
    /// The response header is derived from this command's AV/C header via
    /// `create_response(response_type)`; `body` is appended after the encoded header and
    /// may be empty.
    ///
    /// # Errors
    ///
    /// Returns an error if the response header cannot be created or encoded, or if the
    /// underlying AVCTP command fails to send the response.
    pub fn send_response(&self, response_type: ResponseType, body: &[u8]) -> Result<()> {
        let response_header = self.avc_header.create_response(response_type)?;
        let header_len = response_header.encoded_len();

        // Size the buffer for header and body up front so appending never reallocates.
        let mut rbuf: Vec<u8> = Vec::with_capacity(header_len + body.len());
        rbuf.resize(header_len, 0);
        response_header.encode(&mut rbuf[..])?;

        // Appending an empty slice is a no-op, so no emptiness check is needed.
        rbuf.extend_from_slice(body);
        self.inner.send_response(rbuf.as_slice())
    }

    /// Returns `true` if this command's op code is `OpCode::VendorDependent`.
    pub fn is_vendor_dependent(&self) -> bool {
        self.avc_header.op_code() == &OpCode::VendorDependent
    }
}
/// Builds an AV/C [`Command`] from the result of reading an AVCTP command.
///
/// An incoming `Err` is passed through unchanged; otherwise the AV/C header is decoded from
/// the start of the AVCTP body, and a decode failure is returned as the error.
impl TryFrom<Result<AvctpCommand>> for Command {
    type Error = Error;

    fn try_from(value: Result<AvctpCommand>) -> Result<Command> {
        let inner = value?;
        let avc_header = Header::decode(inner.body())?;
        Ok(Command { inner, avc_header })
    }
}
/// A response to an AV/C command: the response type taken from the AV/C header, and the
/// bytes that follow that header in the response packet.
#[derive(Debug, Clone, PartialEq)]
pub struct CommandResponse(pub ResponseType, pub Vec<u8>);
impl CommandResponse {
    /// Returns the response type (the first tuple field).
    pub fn response_type(&self) -> ResponseType {
        self.0
    }

    /// Returns the response bytes (the second tuple field) as a slice.
    pub fn response(&self) -> &[u8] {
        self.1.as_slice()
    }
}
impl TryFrom<AvctpPacket> for CommandResponse {
type Error = Error;
fn try_from(value: AvctpPacket) -> Result<CommandResponse> {
let buf = value.body();
let avc_header = Header::decode(buf)?;
let body = buf[avc_header.encoded_len()..].to_vec();
if let PacketType::Response(response_type) = avc_header.packet_type() {
Ok(CommandResponse(response_type, body))
} else {
Err(Error::InvalidHeader)
}
}
}
/// An AV/C peer layered on top of an AVCTP peer.
#[derive(Debug)]
pub struct Peer {
// The AVCTP peer used to send commands and to receive the incoming command stream.
inner: AvctpPeer,
}
impl Peer {
pub fn new(channel: Channel) -> Self {
Self { inner: AvctpPeer::new(channel) }
}
fn filter_internal_responses(
avct_command_result: Result<AvctpCommand>,
) -> Option<Result<Command>> {
let cmd = match Command::try_from(avct_command_result) {
Ok(cmd) => cmd,
Err(e) => return Some(Err(e)),
};
let avcth = cmd.avctp_header();
let avch = cmd.avc_header();
match (avcth.is_single(), avch.subunit_type(), avch.op_code()) {
(true, Some(SubunitType::Unit), &OpCode::UnitInfo) => {
trace!("received UNITINFO command");
let mut pbuf: [u8; 5] = [0xff; 5];
pbuf[0] = 0x07;
pbuf[1] = u8::from(&SubunitType::Panel) << 3;
pbuf[2] = 0xff;
pbuf[3] = 0xff;
pbuf[4] = 0xff;
match cmd.send_response(ResponseType::ImplementedStable, &pbuf) {
Err(e) => Some(Err(e)),
Ok(_) => None,
}
}
(true, Some(SubunitType::Unit), &OpCode::SubUnitInfo) => {
trace!("received SUBUNITINFO command");
let mut pbuf: [u8; 5] = [0xff; 5];
pbuf[0] = 0b111;
pbuf[1] = u8::from(&SubunitType::Panel) << 3;
match cmd.send_response(ResponseType::ImplementedStable, &pbuf) {
Err(e) => Some(Err(e)),
Ok(_) => None,
}
}
(_, Some(SubunitType::Panel), &OpCode::Passthrough)
| (_, Some(SubunitType::Panel), &OpCode::VendorDependent) => Some(Ok(cmd)),
_ => {
info!("received invalid command");
match cmd.send_response(ResponseType::NotImplemented, &[]) {
Err(e) => Some(Err(e)),
Ok(_) => None,
}
}
}
}
pub fn take_command_stream(&self) -> CommandStream {
self.inner
.take_command_stream()
.filter_map(|avct_command| future::ready(Self::filter_internal_responses(avct_command)))
}
fn passthrough_command_timeout() -> MonotonicDuration {
const CMD_TIMER_MS: i64 = 1000;
MonotonicDuration::from_millis(CMD_TIMER_MS)
}
pub fn send_vendor_dependent_command<'a>(
&'a self,
command_type: CommandType,
payload: &'a [u8],
) -> Result<impl Stream<Item = Result<CommandResponse>>> {
let avc_header = Header::new(
command_type,
u8::from(&SubunitType::Panel),
0,
OpCode::VendorDependent,
Some(BT_SIG_COMPANY_ID),
);
let avc_h_len = avc_header.encoded_len();
let mut buf = vec![0; avc_h_len];
avc_header.encode(&mut buf[..])?;
buf.extend_from_slice(payload);
let stream = self.inner.send_command(buf.as_slice())?;
let stream = stream.map(|resp| CommandResponse::try_from(resp?));
Ok(stream)
}
pub async fn send_avc_passthrough_command<'a>(
&'a self,
payload: &'a [u8],
) -> Result<CommandResponse> {
let avc_header = Header::new(
CommandType::Control,
u8::from(&SubunitType::Panel),
0,
OpCode::Passthrough,
Some(BT_SIG_COMPANY_ID),
);
let avc_h_len = avc_header.encoded_len();
let mut buf = vec![0; avc_h_len];
avc_header.encode(&mut buf[..])?;
buf.extend_from_slice(payload);
let mut response_stream = self.inner.send_command(buf.as_slice())?;
let timeout = MonotonicInstant::after(Peer::passthrough_command_timeout());
loop {
if let Some(resp) = response_stream
.next()
.on_timeout(timeout, || return Some(Err(Error::Timeout)))
.await
{
let value = CommandResponse::try_from(resp?)?;
if value.0 == ResponseType::Interim {
continue;
}
return Ok(value);
} else {
return Err(Error::PeerDisconnected);
}
}
}
}