1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45#![allow(clippy::large_futures)]
67use crate::elementary_stream::*;
8use crate::output_validator::*;
9use crate::stream::*;
10use crate::stream_runner::*;
11use crate::{FatalError, Result};
12use anyhow::Context as _;
13use fidl_fuchsia_media::StreamProcessorProxy;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::TryStreamExt;
17use std::rc::Rc;
1819pub enum OutputSize {
20// Size of output in terms of packets.
21PacketCount(usize),
22// Size of output in terms of number of raw bytes.
23RawBytesCount(usize),
24}
2526const FIRST_FORMAT_DETAILS_VERSION_ORDINAL: u64 = 1;
2728pub type TestCaseOutputs = Vec<Output>;
2930pub trait StreamProcessorFactory {
31fn connect_to_stream_processor(
32&self,
33 stream: &dyn ElementaryStream,
34 format_details_version_ordinal: u64,
35 ) -> BoxFuture<'_, Result<StreamProcessorProxy>>;
36}
3738/// A test spec describes all the cases that will run and the circumstances in which
39/// they will run.
40pub struct TestSpec {
41pub cases: Vec<TestCase>,
42pub relation: CaseRelation,
43pub stream_processor_factory: Rc<dyn StreamProcessorFactory>,
44}
4546/// A case relation describes the temporal relationship between two test cases.
47pub enum CaseRelation {
48/// With serial relation, test cases will be run in sequence using the same codec server.
49 /// For serial relation, outputs from test cases will be returned.
50Serial,
51/// With concurrent relation, test cases will run concurrently using two or more codec servers.
52 /// For concurrent relation, outputs from test cases will not be returned.
53Concurrent,
54}
5556/// A test cases describes a sequence of elementary stream chunks that should be fed into a codec
57/// server, and a set of validators to check the output. To pass, all validations must pass for all
58/// output from the stream.
59pub struct TestCase {
60pub name: &'static str,
61pub stream: Rc<dyn ElementaryStream>,
62pub validators: Vec<Rc<dyn OutputValidator>>,
63pub stream_options: Option<StreamOptions>,
64}
6566impl TestSpec {
67pub async fn run(self) -> Result<Option<Vec<TestCaseOutputs>>> {
68let res = match self.relation {
69 CaseRelation::Serial => {
70Some(run_cases_serially(self.stream_processor_factory.as_ref(), self.cases).await?)
71 }
72 CaseRelation::Concurrent => {
73 run_cases_concurrently(self.stream_processor_factory.as_ref(), self.cases).await?;
74None
75}
76 };
77Ok(res)
78 }
79}
8081async fn run_cases_serially(
82 stream_processor_factory: &dyn StreamProcessorFactory,
83 cases: Vec<TestCase>,
84) -> Result<Vec<TestCaseOutputs>> {
85let stream_processor =
86if let Some(stream) = cases.first().as_ref().map(|case| case.stream.as_ref()) {
87 stream_processor_factory
88 .connect_to_stream_processor(stream, FIRST_FORMAT_DETAILS_VERSION_ORDINAL)
89 .await?
90} else {
91return Err(FatalError(String::from("No test cases provided.")).into());
92 };
93let mut stream_runner = StreamRunner::new(stream_processor);
9495let mut all_outputs = Vec::new();
96for case in cases {
97let output = stream_runner
98 .run_stream(case.stream, case.stream_options.unwrap_or_default())
99 .await
100.context(format!("Running case {}", case.name))?;
101for validator in case.validators {
102 validator.validate(&output).await.context(format!("Validating case {}", case.name))?;
103 }
104 all_outputs.push(output);
105 }
106Ok(all_outputs)
107}
108109async fn run_cases_concurrently(
110 stream_processor_factory: &dyn StreamProcessorFactory,
111 cases: Vec<TestCase>,
112) -> Result<()> {
113let mut unordered = FuturesUnordered::new();
114for case in cases {
115 unordered.push(run_cases_serially(stream_processor_factory, vec![case]))
116 }
117118while let Some(_) = unordered.try_next().await? {}
119120Ok(())
121}
122123pub fn with_large_stack(f: fn() -> Result<()>) -> Result<()> {
124// The TestSpec futures are too big to fit on Fuchsia's default stack.
125const MEGABYTE: usize = 1024 * 1024;
126const STACK_SIZE: usize = 4 * MEGABYTE;
127 std::thread::Builder::new().stack_size(STACK_SIZE).spawn(f).unwrap().join().unwrap()
128}