use anyhow::{format_err, Context as _, Error};
use fidl::endpoints::ClientEnd;
use fidl_fuchsia_media::*;
use fidl_fuchsia_mediacodec::*;
use fidl_fuchsia_sysmem2::*;
use fuchsia_stream_processors::*;
use fuchsia_sync::{Mutex, RwLock};
use futures::future::{maybe_done, MaybeDone};
use futures::io::{self, AsyncWrite};
use futures::stream::{FusedStream, Stream};
use futures::task::{Context, Poll, Waker};
use futures::{ready, Future, StreamExt};
use log::{trace, warn};
use std::collections::{HashSet, VecDeque};
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use crate::buffer_collection_constraints::buffer_collection_constraints_default;
use crate::sysmem_allocator::{BufferName, SysmemAllocatedBuffers, SysmemAllocation};
fn fidl_error_to_io_error(e: fidl::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, format_err!("Fidl Error: {}", e))
}
/// Tracks whether something is waiting to be woken, and how to wake it.
#[derive(Debug, Default)]
enum Listener {
    /// Nothing is listening; `wake` is a no-op and `register` panics.
    #[default]
    None,
    /// A listener exists but no waker is currently stored for it.
    New,
    /// A listener exists and this waker will be used by the next `wake`.
    Some(Waker),
}

impl Listener {
    /// Stores `waker` so that the next call to [`Listener::wake`] wakes it, replacing any
    /// previously stored waker.
    ///
    /// # Panics
    ///
    /// Panics if the listener is [`Listener::None`].
    fn register(&mut self, waker: Waker) {
        if matches!(self, Listener::None) {
            panic!("Polled a listener with no pollers");
        }
        *self = Listener::Some(waker);
    }

    /// Wakes the stored waker, if there is one, and resets the state to [`Listener::New`].
    ///
    /// Does nothing when the listener is [`Listener::None`].
    fn wake(&mut self) {
        if matches!(self, Listener::None) {
            return;
        }
        // The state is `New` or `Some` here; either way it becomes `New`, and only a stored
        // waker gets woken.
        if let Listener::Some(waker) = mem::replace(self, Listener::New) {
            waker.wake();
        }
    }

    /// Returns the currently stored waker, or `None` when no waker is stored.
    fn waker(&self) -> Option<&Waker> {
        match self {
            Listener::Some(waker) => Some(waker),
            _ => None,
        }
    }
}
/// Queue of output packets received from the stream processor, together with the listener
/// that is woken whenever the queue changes.
struct OutputQueue {
/// Woken when a packet is enqueued or the queue is marked as ended.
listener: Listener,
/// Packets that have been received but not yet handed out, oldest first.
queue: VecDeque<Packet>,
/// Set once the end of the output stream has been signalled; when the queue is also empty
/// the stream yields `None`.
ended: bool,
}
impl OutputQueue {
    /// Appends `packet` to the back of the queue, then wakes the listener.
    fn enqueue(&mut self, packet: Packet) {
        let Self { queue, listener, .. } = self;
        queue.push_back(packet);
        listener.wake();
    }

    /// Flags the queue as ended, then wakes the listener.
    fn mark_ended(&mut self) {
        let Self { ended, listener, .. } = self;
        *ended = true;
        listener.wake();
    }

    /// Returns the waker currently stored in the listener, if any.
    fn waker(&self) -> Option<&Waker> {
        let Self { listener, .. } = self;
        listener.waker()
    }

    /// Wakes the listener without changing the queue contents.
    fn wake(&mut self) {
        let Self { listener, .. } = self;
        listener.wake();
    }
}
impl Default for OutputQueue {
    /// Creates an empty queue that has not ended, with a default listener.
    fn default() -> Self {
        Self { ended: false, queue: VecDeque::new(), listener: Listener::default() }
    }
}
impl Stream for OutputQueue {
    type Item = Packet;

    /// Yields the oldest queued packet, `None` once the queue is empty and ended, and otherwise
    /// registers the context's waker with the listener and returns `Pending`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(packet) = self.queue.pop_front() {
            return Poll::Ready(Some(packet));
        }
        if self.ended {
            return Poll::Ready(None);
        }
        // Nothing available yet: remember who to wake when something arrives.
        let waker = cx.waker().clone();
        self.listener.register(waker);
        Poll::Pending
    }
}
/// Minimum size in bytes requested for each input buffer when input constraints are received.
const MIN_INPUT_BUFFER_SIZE: u32 = 4096;
/// Minimum size in bytes requested for each output buffer when output constraints are received.
// NOTE(review): a minimum of 0 presumably defers the size to the other sysmem participants;
// confirm against the sysmem constraint semantics.
const MIN_OUTPUT_BUFFER_SIZE: u32 = 0;
/// Index of an input buffer. The same value is used as both the buffer index and the packet
/// index when an input packet is queued.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
struct InputBufferIndex(u32);
/// State shared between the input half (`StreamProcessor`) and the output half
/// (`StreamProcessorOutputStream`).
struct StreamProcessorInner {
/// Proxy used to send buffer settings, input packets, end-of-stream and packet recycling
/// requests to the stream processor.
processor: StreamProcessorProxy,
/// Sysmem allocator connection handed to each buffer allocation.
sysmem_client: AllocatorProxy,
/// Event stream taken from `processor`.
events: StreamProcessorEventStream,
/// Size in bytes of each input buffer, set once the input allocation completes.
input_packet_size: u64,
/// Input buffer indices that are free to be picked as the next input cursor.
client_owned: HashSet<InputBufferIndex>,
/// The input buffer currently being filled and the number of bytes already written to it.
input_cursor: Option<(InputBufferIndex, u64)>,
/// Output packets received from the processor that have not been read yet.
output_queue: Mutex<OutputQueue>,
/// Waker of a writer waiting for an input cursor to become available.
input_waker: Option<Waker>,
/// Allocation of the input buffers; starts out as a pending placeholder.
input_allocation: MaybeDone<SysmemAllocation>,
/// Allocation of the output buffers; starts out as a pending placeholder.
output_allocation: MaybeDone<SysmemAllocation>,
}
// Event handling, buffer allocation and packet bookkeeping shared by the input and output halves.
impl StreamProcessorInner {
/// Handles a single event received from the stream processor.
///
/// Returns an error if the event carries invalid constraints or if starting a buffer
/// allocation fails. Events that are not explicitly handled are only traced.
fn handle_event(&mut self, evt: StreamProcessorEvent) -> Result<(), Error> {
match evt {
StreamProcessorEvent::OnInputConstraints { input_constraints } => {
// The constraints are only validated here; the validated value itself is not used.
let _input_constraints = ValidStreamBufferConstraints::try_from(input_constraints)?;
let buffer_constraints =
Self::buffer_constraints_from_min_size(MIN_INPUT_BUFFER_SIZE);
let processor = self.processor.clone();
let mut partial_settings = Self::partial_settings();
// Called with the sysmem token for the new collection: forwards it to the processor
// as part of the input buffer settings. A failure to send is logged, not propagated.
let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
partial_settings.sysmem_token =
Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
token.into_channel(),
));
if let Err(e) = processor.set_input_buffer_partial_settings(partial_settings) {
warn!("Couldn't set input buffer settings: {:?}", e);
}
};
// Replaces any previous input allocation with the newly started one.
self.input_allocation = maybe_done(SysmemAllocation::allocate(
self.sysmem_client.clone(),
BufferName { name: "StreamProcessorInput", priority: 1 },
None,
buffer_constraints,
token_fn,
)?);
}
StreamProcessorEvent::OnOutputConstraints { output_config } => {
let output_constraints = ValidStreamOutputConstraints::try_from(output_config)?;
// Nothing to allocate unless the processor asks for action on the buffer constraints.
if !output_constraints.buffer_constraints_action_required {
return Ok(());
}
let buffer_constraints =
Self::buffer_constraints_from_min_size(MIN_OUTPUT_BUFFER_SIZE);
let processor = self.processor.clone();
let mut partial_settings = Self::partial_settings();
// Same as for the input side, but for the output buffer settings.
let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
partial_settings.sysmem_token =
Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
token.into_channel(),
));
if let Err(e) = processor.set_output_buffer_partial_settings(partial_settings) {
warn!("Couldn't set output buffer settings: {:?}", e);
}
};
self.output_allocation = maybe_done(SysmemAllocation::allocate(
self.sysmem_client.clone(),
BufferName { name: "StreamProcessorOutput", priority: 1 },
None,
buffer_constraints,
token_fn,
)?);
}
StreamProcessorEvent::OnOutputPacket { output_packet, .. } => {
// Queue the packet for the output stream; this also wakes the output listener.
let mut lock = self.output_queue.lock();
lock.enqueue(output_packet);
}
StreamProcessorEvent::OnFreeInputPacket {
free_input_packet: PacketHeader { packet_index: Some(idx), .. },
} => {
// The buffer is free again; `insert` returning false means it was already free.
if !self.client_owned.insert(InputBufferIndex(idx)) {
warn!("Freed an input packet that was already freed: {:?}", idx);
}
self.setup_input_cursor();
}
StreamProcessorEvent::OnOutputEndOfStream { .. } => {
let mut lock = self.output_queue.lock();
lock.mark_ended();
}
// Output format changes are deliberately ignored.
StreamProcessorEvent::OnOutputFormat { .. } => {}
e => trace!("Unhandled stream processor event: {:?}", e),
}
Ok(())
}
/// Polls the event stream once and handles the event if one is ready.
///
/// Resolves to an error if the stream yields an error, if handling the event fails, or if
/// the event stream has terminated.
fn process_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
match ready!(self.events.poll_next_unpin(cx)) {
Some(Err(e)) => Poll::Ready(Err(e.into())),
Some(Ok(event)) => Poll::Ready(self.handle_event(event)),
None => Poll::Ready(Err(format_err!("Client disconnected"))),
}
}
/// Builds the default buffer collection constraints with the minimum buffer size set to
/// `min_buffer_size` bytes.
fn buffer_constraints_from_min_size(min_buffer_size: u32) -> BufferCollectionConstraints {
BufferCollectionConstraints {
buffer_memory_constraints: Some(BufferMemoryConstraints {
min_size_bytes: Some(min_buffer_size as u64),
..Default::default()
}),
..buffer_collection_constraints_default()
}
}
/// Builds buffer settings with both ordinals set to 1 and no sysmem token; the token is
/// filled in by the caller once it is available.
fn partial_settings() -> StreamBufferPartialSettings {
StreamBufferPartialSettings {
buffer_lifetime_ordinal: Some(1),
buffer_constraints_version_ordinal: Some(1),
sysmem_token: None,
..Default::default()
}
}
/// Returns the allocated input buffers.
///
/// Panics if the input allocation has not completed or did not succeed.
fn input_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
Pin::new(&mut self.input_allocation)
.output_mut()
.expect("allocation completed")
.as_mut()
.expect("succcessful allocation")
}
/// Returns the allocated output buffers.
///
/// Panics if the output allocation has not completed or did not succeed.
fn output_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
Pin::new(&mut self.output_allocation)
.output_mut()
.expect("allocation completed")
.as_mut()
.expect("succcessful allocation")
}
/// Called once the input allocation has finished: records the buffer size, marks every
/// allocated buffer as free for the client, and picks the first input cursor.
///
/// Returns an error if the allocation is not complete or a size/index conversion fails.
fn input_allocation_complete(&mut self) -> Result<(), Error> {
// Only checks that the allocation has produced an output; the value is read below.
let _ = Pin::new(&mut self.input_allocation)
.output_mut()
.ok_or_else(|| format_err!("allocation isn't complete"))?;
// NOTE(review): `input_buffers()` panics if the allocation completed with an error, and
// `size_bytes` is unwrapped; confirm both are guaranteed at this point.
let settings = self.input_buffers().settings();
self.input_packet_size = (*settings.size_bytes.as_ref().unwrap()).try_into()?;
let buffer_count = self.input_buffers().len();
for i in 0..buffer_count {
let _ = self.client_owned.insert(InputBufferIndex(i.try_into()?));
}
self.setup_input_cursor();
Ok(())
}
/// Called once the output allocation has finished: tells the processor that the output
/// buffer settings for buffer lifetime ordinal 1 are complete.
fn output_allocation_complete(&mut self) -> Result<(), Error> {
let _ = Pin::new(&mut self.output_allocation)
.output_mut()
.ok_or_else(|| format_err!("allocation isn't complete"))?;
self.processor
.complete_output_buffer_partial_settings(1)
.context("setting output buffer settings")?;
Ok(())
}
/// Drives the input and output allocations that are still in progress.
///
/// Returns `Ready(Err(_))` if completing an allocation fails and `Pending` in every other
/// case; it never returns `Ready(Ok(()))`.
fn poll_buffer_allocation(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
// Only poll while the allocation is still a future; a finished one must not be polled again.
if let MaybeDone::Future(_) = self.input_allocation {
match Pin::new(&mut self.input_allocation).poll(cx) {
Poll::Ready(()) => {
if let Err(e) = self.input_allocation_complete() {
return Poll::Ready(Err(e));
}
}
Poll::Pending => {}
};
}
if let MaybeDone::Future(_) = self.output_allocation {
match Pin::new(&mut self.output_allocation).poll(cx) {
Poll::Ready(()) => {
if let Err(e) = self.output_allocation_complete() {
return Poll::Ready(Err(e));
}
}
Poll::Pending => {}
};
}
Poll::Pending
}
/// Returns a clone of a waker that is currently waiting, preferring the output waker over
/// the input waker, or `None` if neither is registered.
fn waiting_waker(&self) -> Option<Waker> {
match (self.output_queue.lock().waker(), &self.input_waker) {
(None, None) => None,
(Some(waker), _) => Some(waker.clone()),
(_, Some(waker)) => Some(waker.clone()),
}
}
/// Processes events until the event stream is pending, then polls the buffer allocations.
///
/// Every poll is made with the waker returned by `waiting_waker`. Returns `Ok(())` early,
/// without polling the allocations, as soon as no waker is waiting.
fn poll_events(&mut self) -> Result<(), Error> {
let waker = loop {
// Re-read the waker on every iteration: handling an event may have woken, and thereby
// cleared, the waker used for the previous poll.
let waker = match self.waiting_waker() {
None => return Ok(()),
Some(waker) => waker,
};
match self.process_event(&mut Context::from_waker(&waker)) {
Poll::Pending => break waker,
Poll::Ready(Err(e)) => {
warn!("Stream processing error: {:?}", e);
return Err(e.into());
}
Poll::Ready(Ok(())) => {}
}
};
// Allocations are started by events, so they are polled after the events are drained.
if let Poll::Ready(Err(e)) = self.poll_buffer_allocation(&mut Context::from_waker(&waker)) {
warn!("Stream buffer allocation error: {:?}", e);
return Err(e.into());
}
Ok(())
}
/// Wakes the listener of the output queue, if any.
fn wake_output(&mut self) {
self.output_queue.lock().wake();
}
/// Wakes and clears the input waker, if one is registered.
fn wake_input(&mut self) {
if let Some(w) = self.input_waker.take() {
w.wake();
}
}
/// Picks a free input buffer as the new input cursor and wakes the input waker.
///
/// Does nothing if a cursor is already set or no buffer is free.
fn setup_input_cursor(&mut self) {
if self.input_cursor.is_some() {
return;
}
// Any free buffer will do; the set gives no particular order.
let next_idx = match self.client_owned.iter().next() {
None => return,
Some(idx) => idx.clone(),
};
let _ = self.client_owned.remove(&next_idx);
self.input_cursor = Some((next_idx, 0));
self.wake_input();
}
/// Copies the valid bytes of `packet` out of its output buffer and recycles the packet.
///
/// Returns an error if the packet is invalid, the buffer read fails, or recycling fails.
/// Panics if the packet's buffer index has no output buffer.
fn read_output_packet(&mut self, packet: Packet) -> Result<Vec<u8>, Error> {
let packet = ValidPacket::try_from(packet)?;
let output_size = packet.valid_length_bytes as usize;
let offset = packet.start_offset as u64;
let mut output = vec![0; output_size];
let buf_idx = packet.buffer_index;
let vmo = self.output_buffers().get_mut(buf_idx).expect("output vmo should exist");
vmo.read(&mut output, offset)?;
// Hand the packet back to the processor now that its data has been copied out.
self.processor.recycle_output_packet(&packet.header.into())?;
Ok(output)
}
}
/// Input half of a stream processor: bytes written to it are copied into input buffers and
/// queued to the processor as packets.
pub struct StreamProcessor {
/// State shared with the output stream taken from this processor.
inner: Arc<RwLock<StreamProcessorInner>>,
}
/// Output half of a stream processor: a stream that yields the bytes of each output packet.
pub struct StreamProcessorOutputStream {
/// State shared with the `StreamProcessor` this stream was taken from.
inner: Arc<RwLock<StreamProcessorInner>>,
}
impl StreamProcessor {
/// Wraps an already connected processor proxy and sysmem allocator in a `StreamProcessor`
/// with no buffers allocated yet.
fn create(processor: StreamProcessorProxy, sysmem_client: AllocatorProxy) -> Self {
let events = processor.take_event_stream();
Self {
inner: Arc::new(RwLock::new(StreamProcessorInner {
processor,
sysmem_client,
events,
input_packet_size: 0,
client_owned: HashSet::new(),
input_cursor: None,
output_queue: Default::default(),
input_waker: None,
input_allocation: maybe_done(SysmemAllocation::pending()),
output_allocation: maybe_done(SysmemAllocation::pending()),
})),
}
}
/// Creates an encoder for `input_domain` input using `encoder_settings`.
///
/// The input mime type is always "audio/pcm" and hardware encoding is not required.
/// Returns an error if connecting to sysmem or the codec factory fails, or if the create
/// request cannot be sent.
pub fn create_encoder(
input_domain: DomainFormat,
encoder_settings: EncoderSettings,
) -> Result<StreamProcessor, Error> {
let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
.context("Connecting to sysmem")?;
let format_details = FormatDetails {
domain: Some(input_domain),
encoder_settings: Some(encoder_settings),
format_details_version_ordinal: Some(1),
mime_type: Some("audio/pcm".to_string()),
oob_bytes: None,
pass_through_parameters: None,
timebase: None,
..Default::default()
};
let encoder_params = CreateEncoderParams {
input_details: Some(format_details),
require_hw: Some(false),
..Default::default()
};
let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
.context("Failed to connect to Codec Factory")?;
let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
codec_svc.create_encoder(&encoder_params, stream_processor_serverend)?;
Ok(StreamProcessor::create(processor, sysmem_client))
}
/// Creates a decoder for input of type `mime_type`, passing `oob_bytes` along as the
/// out-of-band format data.
///
/// Returns an error if connecting to sysmem or the codec factory fails, or if the create
/// request cannot be sent.
pub fn create_decoder(
mime_type: &str,
oob_bytes: Option<Vec<u8>>,
) -> Result<StreamProcessor, Error> {
let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
.context("Connecting to sysmem")?;
let format_details = FormatDetails {
mime_type: Some(mime_type.to_string()),
oob_bytes: oob_bytes,
format_details_version_ordinal: Some(1),
encoder_settings: None,
domain: None,
pass_through_parameters: None,
timebase: None,
..Default::default()
};
let decoder_params = CreateDecoderParams {
input_details: Some(format_details),
permit_lack_of_split_header_handling: Some(true),
..Default::default()
};
let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
.context("Failed to connect to Codec Factory")?;
let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
codec_svc.create_decoder(&decoder_params, stream_processor_serverend)?;
Ok(StreamProcessor::create(processor, sysmem_client))
}
/// Takes the output stream of this processor.
///
/// Returns an error if the output stream has already been taken.
pub fn take_output_stream(&mut self) -> Result<StreamProcessorOutputStream, Error> {
{
let read = self.inner.read();
let mut lock = read.output_queue.lock();
// `Listener::None` means nobody has taken the stream yet. Moving to `New` records that a
// listener now exists, so that the stream can register its waker when polled.
if let Listener::None = lock.listener {
lock.listener = Listener::New;
} else {
return Err(format_err!("Output stream already taken"));
}
}
Ok(StreamProcessorOutputStream { inner: self.inner.clone() })
}
/// Copies as much of `bytes` as possible into input buffers, sending each buffer to the
/// processor as soon as it is full.
///
/// Returns the number of bytes consumed, which is less than `bytes.len()` when no input
/// buffer is available any more. Panics if the cursor's buffer does not exist.
fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, io::Error> {
let mut bytes_idx = 0;
while bytes.len() > bytes_idx {
// Scope the write lock so that it is released before `send_packet` locks again.
{
let mut write = self.inner.write();
let (idx, size) = match write.input_cursor.take() {
None => return Ok(bytes_idx),
Some(x) => x,
};
let space_left = write.input_packet_size - size;
let left_to_write = bytes.len() - bytes_idx;
let buffer_vmo = write.input_buffers().get_mut(idx.0).expect("need buffer vmo");
// NOTE(review): if either VMO write below fails, the cursor taken above is not put back,
// so this buffer is no longer tracked anywhere; confirm this is intended.
if space_left as usize > left_to_write {
// Everything that remains fits with room to spare: write it and keep the buffer open.
let write_buf = &bytes[bytes_idx..];
let write_len = write_buf.len();
buffer_vmo.write(write_buf, size)?;
bytes_idx += write_len;
write.input_cursor = Some((idx, size + write_len as u64));
assert!(bytes.len() == bytes_idx);
return Ok(bytes_idx);
}
// Otherwise fill the buffer exactly to its end; it is sent below.
let end_idx = bytes_idx + space_left as usize;
let write_buf = &bytes[bytes_idx..end_idx];
let write_len = write_buf.len();
buffer_vmo.write(write_buf, size)?;
bytes_idx += write_len;
assert_eq!(size + write_len as u64, write.input_packet_size);
write.input_cursor = Some((idx, write.input_packet_size));
}
self.send_packet()?;
}
Ok(bytes_idx)
}
/// Sends the bytes written to the current input buffer to the processor as one packet, then
/// picks a new input buffer if one is free.
///
/// Does nothing if there is no current input buffer or nothing has been written to it.
pub fn send_packet(&mut self) -> Result<(), io::Error> {
let mut write = self.inner.write();
if write.input_cursor.is_none() {
return Ok(());
}
let (idx, size) = write.input_cursor.take().expect("input cursor is none");
if size == 0 {
// Nothing was written: keep the buffer as the cursor instead of sending an empty packet.
write.input_cursor = Some((idx, size));
return Ok(());
}
let packet = Packet {
header: Some(PacketHeader {
buffer_lifetime_ordinal: Some(1),
packet_index: Some(idx.0),
..Default::default()
}),
buffer_index: Some(idx.0),
stream_lifetime_ordinal: Some(1),
start_offset: Some(0),
valid_length_bytes: Some(size as u32),
start_access_unit: Some(true),
known_end_access_unit: Some(true),
..Default::default()
};
write.processor.queue_input_packet(&packet).map_err(fidl_error_to_io_error)?;
write.setup_input_cursor();
Ok(())
}
/// Resolves once an input buffer is available for writing.
///
/// While none is available, registers the context's waker as the input waker and processes
/// pending events, which may free a buffer and wake that waker.
fn poll_writable(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let mut write = self.inner.write();
// Drop any stale waker from an earlier poll before deciding whether to register again.
write.input_waker = None;
if write.input_cursor.is_some() {
return Poll::Ready(Ok(()));
}
write.input_waker = Some(cx.waker().clone());
if let Err(e) = write.poll_events() {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e)));
}
Poll::Pending
}
/// Sends any buffered input, queues end-of-stream for stream lifetime ordinal 1, clears the
/// input cursor and wakes both the input waker and the output listener.
pub fn close(&mut self) -> Result<(), io::Error> {
self.send_packet()?;
let mut write = self.inner.write();
write.processor.queue_input_end_of_stream(1).map_err(fidl_error_to_io_error)?;
write.input_cursor = None;
write.wake_input();
write.wake_output();
Ok(())
}
}
impl AsyncWrite for StreamProcessor {
    /// Waits until an input buffer is available, then copies as much of `buf` as fits into
    /// input buffers and returns the number of bytes consumed.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        ready!(self.poll_writable(cx))?;
        Poll::Ready(self.write_bytes(buf))
    }

    /// Sends whatever has been written to the current input buffer as a packet.
    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(self.send_packet())
    }

    /// Flushes any buffered input and signals end-of-stream to the processor.
    fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Closing must do more than flushing: `close` also queues the end-of-stream and wakes
        // the output stream so that it can observe the end of the output.
        Poll::Ready(self.close())
    }
}
impl Stream for StreamProcessorOutputStream {
    type Item = Result<Vec<u8>, Error>;

    /// Yields the bytes of the next output packet, or `None` once the output has ended.
    ///
    /// The output queue is polled first, which registers the context's waker when the queue
    /// is empty; pending events are then processed before the result is returned.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut inner = self.inner.write();
        // The queue lock is only held for the duration of this statement.
        let polled = inner.output_queue.lock().poll_next_unpin(cx);
        if let Err(e) = inner.poll_events() {
            return Poll::Ready(Some(Err(e.into())));
        }
        match polled {
            Poll::Ready(Some(packet)) => Poll::Ready(Some(inner.read_output_packet(packet))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
impl FusedStream for StreamProcessorOutputStream {
    /// Reports whether the output queue has been marked as ended.
    fn is_terminated(&self) -> bool {
        let inner = self.inner.read();
        let queue = inner.output_queue.lock();
        queue.ended
    }
}
// Integration tests: these connect to the real sysmem allocator and codec factory.
#[cfg(test)]
mod tests {
use super::*;
use async_test_helpers::run_while;
use byteorder::{ByteOrder, NativeEndian};
use fixture::fixture;
use fuchsia_async as fasync;
use futures::io::AsyncWriteExt;
use futures::FutureExt;
use futures_test::task::new_count_waker;
use sha2::{Digest as _, Sha256};
use std::fs::File;
use std::io::{Read, Write};
use std::pin::pin;
use stream_processor_test::ExpectedDigest;
/// Size in bytes of one PCM sample (16-bit).
const PCM_SAMPLE_SIZE: usize = 2;
/// Raw PCM audio together with the format it is in.
#[derive(Clone, Debug)]
pub struct PcmAudio {
pcm_format: PcmFormat,
buffer: Vec<u8>,
}
impl PcmAudio {
/// Generates `frame_count` frames of a wave that ramps from 0 up to `AMPLITUDE` of full
/// scale and wraps around, with every channel of a frame holding the same sample.
/// Samples are 16-bit, stored in native byte order.
pub fn create_saw_wave(pcm_format: PcmFormat, frame_count: usize) -> Self {
const FREQUENCY: f32 = 20.0;
const AMPLITUDE: f32 = 0.2;
let pcm_frame_size = PCM_SAMPLE_SIZE * pcm_format.channel_map.len();
let samples_per_frame = pcm_format.channel_map.len();
let sample_count = frame_count * samples_per_frame;
let mut buffer = vec![0; frame_count * pcm_frame_size];
for i in 0..sample_count {
// All samples of the same frame share one value.
let frame = (i / samples_per_frame) as f32;
let value =
((frame * FREQUENCY / (pcm_format.frames_per_second as f32)) % 1.0) * AMPLITUDE;
let sample = (value * i16::max_value() as f32) as i16;
let mut sample_bytes = [0; std::mem::size_of::<i16>()];
NativeEndian::write_i16(&mut sample_bytes, sample);
let offset = i * PCM_SAMPLE_SIZE;
buffer[offset] = sample_bytes[0];
buffer[offset + 1] = sample_bytes[1];
}
Self { pcm_format, buffer }
}
/// Size in bytes of one frame (one sample for every channel).
pub fn frame_size(&self) -> usize {
self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE
}
}
/// Checks bytes against an expected SHA-256 digest, optionally also writing them to a file.
pub struct BytesValidator {
/// Path of a file to write the validated bytes to, if any.
pub output_file: Option<&'static str>,
/// Digest the bytes are expected to hash to.
pub expected_digest: ExpectedDigest,
}
impl BytesValidator {
/// Writes `bytes` to `file` and returns an error if their SHA-256 digest differs from
/// the expected one.
fn write_and_hash(&self, mut file: impl Write, bytes: &[u8]) -> Result<(), Error> {
let mut hasher = Sha256::default();
file.write_all(&bytes)?;
hasher.update(&bytes);
let digest: [u8; 32] = hasher.finalize().into();
if self.expected_digest.bytes != digest {
return Err(format_err!(
"Expected {}; got {}",
self.expected_digest,
hex::encode(digest)
))
.into();
}
Ok(())
}
/// Returns a writer for the configured output file, or a sink that discards everything
/// when no file is configured.
fn output_file(&self) -> Result<impl Write, Error> {
Ok(if let Some(file) = self.output_file {
Box::new(std::fs::File::create(file)?) as Box<dyn Write>
} else {
Box::new(std::io::sink()) as Box<dyn Write>
})
}
/// Validates `bytes` against the expected digest, writing them to the output file if set.
fn validate(&self, bytes: &[u8]) -> Result<(), Error> {
self.write_and_hash(self.output_file()?, &bytes)
}
}
/// Encodes a generated mono PCM wave to SBC and checks the size and digest of the output, and
/// that the pending output future is woken exactly once after the input has been closed.
#[fuchsia::test]
fn encode_sbc() {
let mut exec = fasync::TestExecutor::new();
let pcm_format = PcmFormat {
pcm_mode: AudioPcmMode::Linear,
bits_per_sample: 16,
frames_per_second: 44100,
channel_map: vec![AudioChannelId::Cf],
};
let sub_bands = SbcSubBands::SubBands4;
let block_count = SbcBlockCount::BlockCount8;
let input_frames = 3000;
let pcm_audio = PcmAudio::create_saw_wave(pcm_format.clone(), input_frames);
let sbc_encoder_settings = EncoderSettings::Sbc(SbcEncoderSettings {
sub_bands,
block_count,
allocation: SbcAllocation::AllocLoudness,
channel_mode: SbcChannelMode::Mono,
bit_pool: 59, });
let input_domain = DomainFormat::Audio(AudioFormat::Uncompressed(
AudioUncompressedFormat::Pcm(pcm_format),
));
let mut encoder = StreamProcessor::create_encoder(input_domain, sbc_encoder_settings)
.expect("to create Encoder");
let frames_per_packet: usize = 8; let packet_size = pcm_audio.frame_size() * frames_per_packet;
let mut packets = pcm_audio.buffer.as_slice().chunks(packet_size);
// Write the first chunk before the output stream is taken.
let first_packet = packets.next().unwrap();
let written =
exec.run_singlethreaded(&mut encoder.write(first_packet)).expect("successful write");
assert_eq!(written, first_packet.len());
let mut encoded_stream = encoder.take_output_stream().expect("Stream should be taken");
// The output stream can only be taken once.
assert!(encoder.take_output_stream().is_err());
// Poll the output once with a counting waker, so that wakeups caused by the writes
// below can be observed.
let encoded_fut = pin!(encoded_stream.next());
let (waker, encoder_fut_wake_count) = new_count_waker();
let mut counting_ctx = Context::from_waker(&waker);
assert!(encoded_fut.poll(&mut counting_ctx).is_pending());
let mut frames_sent = first_packet.len() / pcm_audio.frame_size();
for packet in packets {
let mut written_fut = encoder.write(&packet);
let written_bytes =
exec.run_singlethreaded(&mut written_fut).expect("to write to encoder");
assert_eq!(packet.len(), written_bytes);
frames_sent += packet.len() / pcm_audio.frame_size();
}
encoder.close().expect("stream should always be closable");
assert_eq!(input_frames, frames_sent);
// Run the executor until the output future has been woken again, and check that this
// happened exactly once.
let woke_count = encoder_fut_wake_count.get();
while encoder_fut_wake_count.get() == woke_count {
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
}
assert_eq!(encoder_fut_wake_count.get(), woke_count + 1);
// Drain the encoded output until the stream ends.
let mut encoded = Vec::new();
loop {
let mut encoded_fut = encoded_stream.next();
match exec.run_singlethreaded(&mut encoded_fut) {
Some(Ok(enc_data)) => {
assert!(!enc_data.is_empty());
encoded.extend_from_slice(&enc_data);
}
Some(Err(e)) => {
panic!("Unexpected error when polling encoded data: {}", e);
}
None => {
break;
}
}
}
// NOTE(review): the digest label says "bitpool 56" while the encoder above is configured
// with `bit_pool: 59`; confirm which of the two is intended.
let expected_digest = ExpectedDigest::new(
"Sbc: 44.1kHz/Loudness/Mono/bitpool 56/blocks 8/subbands 4",
"5c65a88bda3f132538966d87df34aa8675f85c9892b7f9f5571f76f3c7813562",
);
let hash_validator = BytesValidator { output_file: None, expected_digest };
assert_eq!(6110, encoded.len(), "Encoded size should be equal");
let validated = hash_validator.validate(encoded.as_slice());
assert!(validated.is_ok(), "Failed hash: {:?}", validated);
}
/// Fixture that reads the whole SBC test file and passes its contents to `test`.
fn fix_sbc_test_file<F>(_name: &str, test: F)
where
F: FnOnce(Vec<u8>) -> (),
{
const SBC_TEST_FILE: &str = "/pkg/data/s16le44100mono.sbc";
let mut sbc_data = Vec::new();
let _ = File::open(SBC_TEST_FILE)
.expect("open test file")
.read_to_end(&mut sbc_data)
.expect("read test file");
test(sbc_data)
}
/// Decodes the SBC test file one frame per write and checks the size and digest of the
/// decoded output.
#[fixture(fix_sbc_test_file)]
#[fuchsia::test]
fn decode_sbc(sbc_data: Vec<u8>) {
let mut exec = fasync::TestExecutor::new();
const SBC_FRAME_SIZE: usize = 72;
const INPUT_FRAMES: usize = 23;
let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
let mut decoder =
StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
// The output stream can only be taken once.
assert!(decoder.take_output_stream().is_err());
let mut frames_sent = 0;
let frames_per_packet: usize = 1; let packet_size = SBC_FRAME_SIZE * frames_per_packet;
for frames in sbc_data.as_slice().chunks(packet_size) {
let mut written_fut = decoder.write(&frames);
let written_bytes =
exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
assert_eq!(frames.len(), written_bytes);
frames_sent += frames.len() / SBC_FRAME_SIZE;
}
assert_eq!(INPUT_FRAMES, frames_sent);
let mut flush_fut = pin!(decoder.flush());
exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
decoder.close().expect("stream should always be closable");
// Drain the decoded output until the stream ends.
let mut decoded = Vec::new();
loop {
let mut decoded_fut = decoded_stream.next();
match exec.run_singlethreaded(&mut decoded_fut) {
Some(Ok(dec_data)) => {
assert!(!dec_data.is_empty());
decoded.extend_from_slice(&dec_data);
}
Some(Err(e)) => {
panic!("Unexpected error when polling decoded data: {}", e);
}
None => {
break;
}
}
}
let expected_digest = ExpectedDigest::new(
"Pcm: 44.1kHz/16bit/Mono",
"ff2e7afea51217886d3df15b9a623b4e49c9bd9bd79c58ac01bc94c5511e08d6",
);
let hash_validator = BytesValidator { output_file: None, expected_digest };
assert_eq!(256 * INPUT_FRAMES, decoded.len(), "Decoded size should be equal");
let validated = hash_validator.validate(decoded.as_slice());
assert!(validated.is_ok(), "Failed hash: {:?}", validated);
}
/// Checks that a pending output future is woken once decoded data becomes available, even
/// though only the input side is driven in between.
#[fixture(fix_sbc_test_file)]
#[fuchsia::test]
fn decode_sbc_wakes_output_to_process_events(sbc_data: Vec<u8>) {
let mut exec = fasync::TestExecutor::new();
const SBC_FRAME_SIZE: usize = 72;
let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
let mut decoder =
StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
let next_frame = chunks.next().unwrap();
let written =
exec.run_singlethreaded(&mut decoder.write(next_frame)).expect("successful write");
assert_eq!(written, next_frame.len());
let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
// Poll the output once with a counting waker; nothing has been decoded yet.
let decoded_fut = pin!(decoded_stream.next());
let (waker, decoder_fut_wake_count) = new_count_waker();
let mut counting_ctx = Context::from_waker(&waker);
assert!(decoded_fut.poll(&mut counting_ctx).is_pending());
// Write a second frame and flush, which sends the buffered input to the decoder.
let frame = chunks.next().unwrap();
let mut written_fut = decoder.write(&frame);
let written_bytes = exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
assert_eq!(frame.len(), written_bytes);
let mut flush_fut = pin!(decoder.flush());
exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
// The output future has not been woken yet; run the executor until it is.
assert_eq!(decoder_fut_wake_count.get(), 0);
while decoder_fut_wake_count.get() == 0 {
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
}
assert_eq!(decoder_fut_wake_count.get(), 1);
let mut decoded = Vec::new();
let mut decoded_fut = decoded_stream.next();
match exec.run_singlethreaded(&mut decoded_fut) {
Some(Ok(dec_data)) => {
assert!(!dec_data.is_empty());
decoded.extend_from_slice(&dec_data);
}
x => panic!("Expected decoded frame, got {:?}", x),
}
assert_eq!(512, decoded.len(), "Decoded size should be equal to one frame");
}
/// Checks that a stalled write is woken once the decoder frees input buffers, with the
/// output side being driven in between.
#[fixture(fix_sbc_test_file)]
#[fuchsia::test]
fn decode_sbc_wakes_input_to_process_events(sbc_data: Vec<u8>) {
let mut exec = fasync::TestExecutor::new();
const SBC_FRAME_SIZE: usize = 72;
let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
let mut decoder =
StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
let decoded_fut = pin!(decoded_stream.next());
let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
let next_frame = chunks.next().unwrap();
// Complete the first write while the output future runs in the background.
let (written_res, mut decoded_fut) =
run_while(&mut exec, decoded_fut, decoder.write(next_frame));
assert_eq!(written_res.expect("initial write should succeed"), next_frame.len());
let (waker, write_fut_wake_count) = new_count_waker();
let mut counting_ctx = Context::from_waker(&waker);
let mut wake_count_before_stall = 0;
// Keep writing and flushing frames until a write is pending without having been woken,
// i.e. the input has stalled.
for frame in chunks {
wake_count_before_stall = write_fut_wake_count.get();
let mut written_fut = decoder.write(&frame);
if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
// A wake during the poll means the write can make progress; move on to the next frame.
if write_fut_wake_count.get() != wake_count_before_stall {
continue;
}
break;
}
let mut flush_fut = pin!(decoder.flush());
exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
}
// Read one decoded packet from the output side.
let decoded_frame = exec.run_singlethreaded(&mut decoded_fut);
assert_eq!(512, decoded_frame.unwrap().unwrap().len(), "Decoded frame size wrong");
// Stall the input again, starting over from the beginning of the test data.
let chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
for frame in chunks {
wake_count_before_stall = write_fut_wake_count.get();
let mut written_fut = decoder.write(&frame);
if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
if write_fut_wake_count.get() != wake_count_before_stall {
continue;
}
break;
}
let mut flush_fut = pin!(decoder.flush());
exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
}
// The stalled write must eventually be woken without the output being polled again.
while write_fut_wake_count.get() == wake_count_before_stall {
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
}
}
}