stream_processor_test/
test_spec.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![allow(clippy::large_futures)]

use crate::elementary_stream::*;
use crate::output_validator::*;
use crate::stream::*;
use crate::stream_runner::*;
use crate::{FatalError, Result};
use anyhow::Context as _;
use fidl_fuchsia_media::StreamProcessorProxy;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use std::rc::Rc;

/// A size of output, expressed either as a packet count or as a raw byte count.
pub enum OutputSize {
    /// Size of output in terms of packets.
    PacketCount(usize),
    /// Size of output in terms of number of raw bytes.
    RawBytesCount(usize),
}

/// Format details version ordinal handed to the `StreamProcessorFactory` when
/// `run_cases_serially` opens its stream processor connection.
const FIRST_FORMAT_DETAILS_VERSION_ORDINAL: u64 = 1;

/// The outputs collected from running the stream of a single test case.
pub type TestCaseOutputs = Vec<Output>;

/// A source of stream processor connections for the test cases of a `TestSpec`.
pub trait StreamProcessorFactory {
    /// Starts connecting to a stream processor for `stream`.
    ///
    /// `format_details_version_ordinal` is supplied by the caller; within this file the
    /// value passed is always `FIRST_FORMAT_DETAILS_VERSION_ORDINAL`.
    ///
    /// Returns a boxed future that resolves to the connected `StreamProcessorProxy`, or to
    /// an error.
    fn connect_to_stream_processor(
        &self,
        stream: &dyn ElementaryStream,
        format_details_version_ordinal: u64,
    ) -> BoxFuture<'_, Result<StreamProcessorProxy>>;
}

/// A test spec describes all the cases that will run and the circumstances in which
/// they will run.
pub struct TestSpec {
    /// The test cases to run. A serial run fails with a `FatalError` when this is empty.
    pub cases: Vec<TestCase>,
    /// Whether the cases run in sequence or concurrently.
    pub relation: CaseRelation,
    /// Factory used to connect to the stream processor(s) the cases run against.
    pub stream_processor_factory: Rc<dyn StreamProcessorFactory>,
}

/// A case relation describes the temporal relationship between the test cases of a
/// `TestSpec`.
pub enum CaseRelation {
    /// With serial relation, test cases will be run in sequence using the same codec server.
    /// For serial relation, outputs from test cases will be returned: `TestSpec::run`
    /// yields `Some(outputs)`.
    Serial,
    /// With concurrent relation, test cases will run concurrently, each against its own
    /// codec server connection.
    /// For concurrent relation, outputs from test cases will not be returned:
    /// `TestSpec::run` yields `None`.
    Concurrent,
}

/// A test case describes a sequence of elementary stream chunks that should be fed into a codec
/// server, and a set of validators to check the output. To pass, all validations must pass for all
/// output from the stream.
pub struct TestCase {
    /// Name of the case; included in the error context when running or validating it fails.
    pub name: &'static str,
    /// The elementary stream to run for this case.
    pub stream: Rc<dyn ElementaryStream>,
    /// Validators applied in order to the case's output; the first failure aborts the run.
    pub validators: Vec<Rc<dyn OutputValidator>>,
    /// Options for running the stream. `None` falls back to `StreamOptions::default()`.
    pub stream_options: Option<StreamOptions>,
}

impl TestSpec {
    /// Consumes the spec and runs all of its cases according to `relation`.
    ///
    /// Returns `Ok(Some(outputs))` with one entry per case for `CaseRelation::Serial`, and
    /// `Ok(None)` for `CaseRelation::Concurrent`. Any error from the underlying runner is
    /// returned unchanged.
    pub async fn run(self) -> Result<Option<Vec<TestCaseOutputs>>> {
        let Self { cases, relation, stream_processor_factory } = self;
        let factory = stream_processor_factory.as_ref();
        match relation {
            CaseRelation::Serial => run_cases_serially(factory, cases).await.map(Some),
            CaseRelation::Concurrent => {
                // Concurrent runs only report success or failure; outputs are discarded.
                run_cases_concurrently(factory, cases).await.map(|()| None)
            }
        }
    }
}

async fn run_cases_serially(
    stream_processor_factory: &dyn StreamProcessorFactory,
    cases: Vec<TestCase>,
) -> Result<Vec<TestCaseOutputs>> {
    let stream_processor =
        if let Some(stream) = cases.first().as_ref().map(|case| case.stream.as_ref()) {
            stream_processor_factory
                .connect_to_stream_processor(stream, FIRST_FORMAT_DETAILS_VERSION_ORDINAL)
                .await?
        } else {
            return Err(FatalError(String::from("No test cases provided.")).into());
        };
    let mut stream_runner = StreamRunner::new(stream_processor);

    let mut all_outputs = Vec::new();
    for case in cases {
        let output = stream_runner
            .run_stream(case.stream, case.stream_options.unwrap_or_default())
            .await
            .context(format!("Running case {}", case.name))?;
        for validator in case.validators {
            validator.validate(&output).await.context(format!("Validating case {}", case.name))?;
        }
        all_outputs.push(output);
    }
    Ok(all_outputs)
}

async fn run_cases_concurrently(
    stream_processor_factory: &dyn StreamProcessorFactory,
    cases: Vec<TestCase>,
) -> Result<()> {
    let mut unordered = FuturesUnordered::new();
    for case in cases {
        unordered.push(run_cases_serially(stream_processor_factory, vec![case]))
    }

    while let Some(_) = unordered.try_next().await? {}

    Ok(())
}

/// Runs `f` to completion on a dedicated thread with a 4 MiB stack and returns its result.
///
/// # Panics
///
/// Panics if the thread cannot be spawned or if joining it fails (for example because `f`
/// panicked).
pub fn with_large_stack(f: fn() -> Result<()>) -> Result<()> {
    // The TestSpec futures are too big to fit on Fuchsia's default stack.
    const STACK_SIZE_BYTES: usize = 4 << 20;

    let builder = std::thread::Builder::new().stack_size(STACK_SIZE_BYTES);
    let handle = builder.spawn(f).unwrap();
    handle.join().unwrap()
}