// stream_processor_test/stream_runner.rs
use crate::buffer_set::*;
use crate::elementary_stream::*;
use crate::input_packet_stream::*;
use crate::output_validator::*;
use crate::stream::*;
use crate::Result;
use fidl_fuchsia_media::*;
use futures::TryStreamExt;
use std::rc::Rc;
use tracing::debug;
/// Runs elementary streams against a single `StreamProcessorProxy`.
///
/// The ordinal sequences and buffer sets live here rather than in a
/// per-stream object so that they persist between `run_stream` calls.
pub struct StreamRunner {
    /// Ordinal sequence lent to each `Stream` for its input buffers.
    input_buffer_ordinals: OrdinalSequence,
    /// Ordinal sequence lent to each `Stream` for its output buffers.
    output_buffer_ordinals: OrdinalSequence,
    /// Source of stream lifetime ordinals; `run_stream` draws one per call.
    stream_lifetime_ordinals: OrdinalSequence,
    /// Source of format details version ordinals; `run_stream` draws one per call.
    format_details_ordinals: OrdinalSequence,
    /// Output buffers handed back by the previous stream, if it left any.
    output_buffer_set: Option<BufferSet>,
    /// Input buffers handed back by the previous stream, if it left any.
    input_buffer_set: Option<BufferSet>,
    /// Proxy to the stream processor that every stream is run against.
    stream_processor: StreamProcessorProxy,
}
impl StreamRunner {
    /// Creates a runner that drives `stream_processor`.
    ///
    /// The input-buffer, output-buffer and stream-lifetime ordinal sequences
    /// are built from `OrdinalPattern::Odd`, the format-details sequence from
    /// `OrdinalPattern::All`. No buffer sets are held initially.
    pub fn new(stream_processor: StreamProcessorProxy) -> Self {
        let input_buffer_ordinals = OrdinalPattern::Odd.into_iter();
        let output_buffer_ordinals = OrdinalPattern::Odd.into_iter();
        let stream_lifetime_ordinals = OrdinalPattern::Odd.into_iter();
        let format_details_ordinals = OrdinalPattern::All.into_iter();

        Self {
            input_buffer_ordinals,
            output_buffer_ordinals,
            stream_lifetime_ordinals,
            format_details_ordinals,
            input_buffer_set: None,
            output_buffer_set: None,
            stream_processor,
        }
    }

    /// Runs `stream` through the stream processor and returns the collected output.
    ///
    /// A fresh format details version ordinal and stream lifetime ordinal are
    /// drawn for the run. Buffer sets kept from an earlier run are moved into
    /// the new `Stream`. Events are then handled until `handle_event` reports
    /// `StreamControlFlow::Stop` or the event stream ends; in the latter case
    /// `Output::CodecChannelClose` is appended to the output.
    ///
    /// Afterwards the buffer sets are stored back on the runner, and then
    /// dropped again if `options.release_input_buffers_at_end` /
    /// `options.release_output_buffers_at_end` request it.
    ///
    /// # Errors
    ///
    /// Returns early with the error from `Stream::start`, from reading the
    /// next event, or from `Stream::handle_event`. On such an early return the
    /// buffer sets that were moved into the `Stream` are not stored back.
    pub async fn run_stream(
        &mut self,
        stream: Rc<dyn ElementaryStream>,
        options: StreamOptions,
    ) -> Result<Vec<Output>> {
        let format_details_version_ordinal = get_ordinal(&mut self.format_details_ordinals);
        let stream_lifetime_ordinal = get_ordinal(&mut self.stream_lifetime_ordinals);
        debug!(%stream_lifetime_ordinal, %format_details_version_ordinal, "Starting a stream");

        let mut events = self.stream_processor.take_event_stream();

        // The inner scope bounds the lifetime of the `Stream`, so whatever it
        // still owns is dropped before the release options are applied below.
        let output = {
            // Wrap any retained input buffers in a packet stream for this run.
            let input_packet_stream = self.input_buffer_set.take().map(|retained| {
                InputPacketStream::new(retained, stream.stream(), stream_lifetime_ordinal)
            });

            let mut active = Stream {
                format_details_version_ordinal,
                stream_lifetime_ordinal,
                input_buffer_ordinals: &mut self.input_buffer_ordinals,
                input_packet_stream,
                output_buffer_ordinals: &mut self.output_buffer_ordinals,
                output_buffer_set: self.output_buffer_set.take(),
                current_output_format: None,
                stream_processor: &mut self.stream_processor,
                stream: stream.as_ref(),
                options: options.clone(),
                output: vec![],
            };
            active.start().await?;

            // Evaluates to `true` when the event stream ran dry, `false` when
            // the stream itself asked to stop.
            let channel_closed = loop {
                match events.try_next().await? {
                    None => break true,
                    Some(event) => {
                        // NOTE(review): presumably the `handle_event` future is
                        // large enough to trip this lint — confirm before removing.
                        #[allow(clippy::large_futures)]
                        let control_flow = active.handle_event(event).await?;
                        match control_flow {
                            StreamControlFlow::Stop => break false,
                            StreamControlFlow::Continue => {}
                        }
                    }
                }
            };

            // Pull out the parts that outlive this run; the rest of `active`
            // is dropped at the end of the scope.
            let Stream { output: mut collected, input_packet_stream, output_buffer_set, .. } =
                active;
            if channel_closed {
                collected.push(Output::CodecChannelClose);
            }

            self.input_buffer_set = input_packet_stream.map(|packets| packets.take_buffer_set());
            self.output_buffer_set = output_buffer_set;
            collected
        };

        if options.release_input_buffers_at_end {
            drop(self.input_buffer_set.take());
        }
        if options.release_output_buffers_at_end {
            drop(self.output_buffer_set.take());
        }

        Ok(output)
    }
}