stream_processor_test/
stream_runner.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::buffer_set::*;
use crate::elementary_stream::*;
use crate::input_packet_stream::*;
use crate::output_validator::*;
use crate::stream::*;
use crate::Result;
use fidl_fuchsia_media::*;
use futures::TryStreamExt;
use std::rc::Rc;
use tracing::debug;

/// Runs elementary streams through a stream processor.
pub struct StreamRunner {
    input_buffer_ordinals: OrdinalSequence,
    output_buffer_ordinals: OrdinalSequence,
    stream_lifetime_ordinals: OrdinalSequence,
    format_details_ordinals: OrdinalSequence,
    output_buffer_set: Option<BufferSet>,
    input_buffer_set: Option<BufferSet>,
    stream_processor: StreamProcessorProxy,
}

impl StreamRunner {
    pub fn new(stream_processor: StreamProcessorProxy) -> Self {
        Self {
            input_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
            output_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
            stream_lifetime_ordinals: OrdinalPattern::Odd.into_iter(),
            format_details_ordinals: OrdinalPattern::All.into_iter(),
            input_buffer_set: None,
            output_buffer_set: None,
            stream_processor,
        }
    }

    pub async fn run_stream(
        &mut self,
        stream: Rc<dyn ElementaryStream>,
        options: StreamOptions,
    ) -> Result<Vec<Output>> {
        let format_details_version_ordinal = get_ordinal(&mut self.format_details_ordinals);
        let stream_lifetime_ordinal = get_ordinal(&mut self.stream_lifetime_ordinals);

        debug!(%stream_lifetime_ordinal, %format_details_version_ordinal, "Starting a stream");

        let mut events = self.stream_processor.take_event_stream();

        let output = {
            let mut stream = Stream {
                format_details_version_ordinal,
                stream_lifetime_ordinal,
                input_buffer_ordinals: &mut self.input_buffer_ordinals,
                input_packet_stream: self.input_buffer_set.take().map(|buffer_set| {
                    InputPacketStream::new(buffer_set, stream.stream(), stream_lifetime_ordinal)
                }),
                output_buffer_ordinals: &mut self.output_buffer_ordinals,
                output_buffer_set: self.output_buffer_set.take(),
                current_output_format: None,
                stream_processor: &mut self.stream_processor,
                stream: stream.as_ref(),
                options: options.clone(),
                output: vec![],
            };

            stream.start().await?;

            let channel_closed = loop {
                let Some(event) = events.try_next().await? else {
                    break true;
                };
                #[allow(clippy::large_futures)]
                let control_flow = stream.handle_event(event).await?;
                match control_flow {
                    StreamControlFlow::Continue => {}
                    StreamControlFlow::Stop => break false,
                };
            };

            let mut output = stream.output;
            if channel_closed {
                output.push(Output::CodecChannelClose);
            }

            self.input_buffer_set =
                stream.input_packet_stream.map(|stream| stream.take_buffer_set());
            self.output_buffer_set = stream.output_buffer_set;

            output
        };

        if options.release_input_buffers_at_end {
            self.input_buffer_set = None;
        }

        if options.release_output_buffers_at_end {
            self.output_buffer_set = None;
        }

        Ok(output)
    }
}