stream_processor_test/test_spec.rs

#![allow(clippy::large_futures)]
use crate::elementary_stream::*;
use crate::output_validator::*;
use crate::stream::*;
use crate::stream_runner::*;
use crate::{FatalError, Result};
use anyhow::Context as _;
use fidl_fuchsia_media::StreamProcessorProxy;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use std::rc::Rc;
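
/// An amount of output, expressed either as a number of output packets or as a count of
/// raw output bytes.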
pub enum OutputSize {
    PacketCount(usize),
    RawBytesCount(usize),
}
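
/// The `format_details_version_ordinal` used when first connecting to a stream processor.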
const FIRST_FORMAT_DETAILS_VERSION_ORDINAL: u64 = 1;
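
/// The outputs produced by a single test case.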
pub type TestCaseOutputs = Vec<Output>;
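
/// Connects elementary streams to stream processors under test, yielding a proxy for each
/// new connection.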
pub trait StreamProcessorFactory {
    fn connect_to_stream_processor(
        &self,
        stream: &dyn ElementaryStream,
        format_details_version_ordinal: u64,
    ) -> BoxFuture<'_, Result<StreamProcessorProxy>>;
}
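
/// A description of a full test: the cases to run, how they relate to one another
/// (serial or concurrent), and the factory that provides stream processors for them.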
pub struct TestSpec {
    pub cases: Vec<TestCase>,
    pub relation: CaseRelation,
    pub stream_processor_factory: Rc<dyn StreamProcessorFactory>,
}
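
/// How the cases of a `TestSpec` are scheduled with respect to one another.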
pub enum CaseRelation {
    /// Cases run in order on a single stream processor connection, and their outputs are
    /// collected and returned.
    Serial,
    /// Each case runs on its own stream processor connection, all at the same time, and
    /// outputs are discarded after validation.
    Concurrent,
}
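
/// A single case: the elementary stream fed to the stream processor, the validators run
/// against its output, and optional stream options.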
pub struct TestCase {
    pub name: &'static str,
    pub stream: Rc<dyn ElementaryStream>,
    pub validators: Vec<Rc<dyn OutputValidator>>,
    pub stream_options: Option<StreamOptions>,
}

impl TestSpec {
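    /// Runs all cases according to `self.relation`: serial runs return each case's
    /// outputs, while concurrent runs return `None`.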
    pub async fn run(self) -> Result<Option<Vec<TestCaseOutputs>>> {
        let res = match self.relation {
            CaseRelation::Serial => {
                Some(run_cases_serially(self.stream_processor_factory.as_ref(), self.cases).await?)
            }
            CaseRelation::Concurrent => {
                run_cases_concurrently(self.stream_processor_factory.as_ref(), self.cases).await?;
                None
            }
        };
        Ok(res)
    }
}
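
/// Runs the given cases back to back on one stream processor connection (established with
/// the first case's stream), validating each case's output and returning all outputs.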
async fn run_cases_serially(
    stream_processor_factory: &dyn StreamProcessorFactory,
    cases: Vec<TestCase>,
) -> Result<Vec<TestCaseOutputs>> {
    let stream_processor =
        if let Some(stream) = cases.first().as_ref().map(|case| case.stream.as_ref()) {
            stream_processor_factory
                .connect_to_stream_processor(stream, FIRST_FORMAT_DETAILS_VERSION_ORDINAL)
                .await?
        } else {
            return Err(FatalError(String::from("No test cases provided.")).into());
        };
    let mut stream_runner = StreamRunner::new(stream_processor);
    let mut all_outputs = Vec::new();
    for case in cases {
        let output = stream_runner
            .run_stream(case.stream, case.stream_options.unwrap_or_default())
            .await
            .context(format!("Running case {}", case.name))?;
        for validator in case.validators {
            validator.validate(&output).await.context(format!("Validating case {}", case.name))?;
        }
        all_outputs.push(output);
    }
    Ok(all_outputs)
}
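
/// Runs all cases at once, each on its own stream processor connection; outputs are
/// validated but discarded.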
async fn run_cases_concurrently(
    stream_processor_factory: &dyn StreamProcessorFactory,
    cases: Vec<TestCase>,
) -> Result<()> {
    let mut unordered = FuturesUnordered::new();
    for case in cases {
        unordered.push(run_cases_serially(stream_processor_factory, vec![case]))
    }
    while let Some(_) = unordered.try_next().await? {}
    Ok(())
}
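
/// Runs `f` on a dedicated thread with a 4 MiB stack and returns its result, panicking if
/// the thread cannot be spawned or joined. Useful for tests whose futures are too large
/// for the default stack.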
pub fn with_large_stack(f: fn() -> Result<()>) -> Result<()> {
    const MEGABYTE: usize = 1024 * 1024;
    const STACK_SIZE: usize = 4 * MEGABYTE;
    std::thread::Builder::new().stack_size(STACK_SIZE).spawn(f).unwrap().join().unwrap()
}