stream_processor_test/
stream.rs#![allow(clippy::large_futures)]
use crate::buffer_set::*;
use crate::elementary_stream::*;
use crate::input_packet_stream::*;
use crate::output_validator::*;
use crate::{FatalError, Result};
use fidl_fuchsia_media::*;
use fidl_fuchsia_sysmem2::BufferCollectionConstraints;
use fuchsia_stream_processors::*;
use std::rc::Rc;
use tracing::debug;
pub type OrdinalSequence = <OrdinalPattern as IntoIterator>::IntoIter;
#[derive(Debug, Clone)]
pub struct StreamOptions {
pub queue_format_details: bool,
pub release_input_buffers_at_end: bool,
pub release_output_buffers_at_end: bool,
pub input_buffer_collection_constraints: Option<BufferCollectionConstraints>,
pub output_buffer_collection_constraints: Option<BufferCollectionConstraints>,
pub stop_after_first_output: bool,
}
impl Default for StreamOptions {
fn default() -> Self {
Self {
queue_format_details: true,
release_input_buffers_at_end: false,
release_output_buffers_at_end: false,
input_buffer_collection_constraints: None,
output_buffer_collection_constraints: None,
stop_after_first_output: false,
}
}
}
pub struct Stream<'a> {
pub format_details_version_ordinal: u64,
pub stream_lifetime_ordinal: u64,
pub input_buffer_ordinals: &'a mut OrdinalSequence,
pub input_packet_stream:
Option<InputPacketStream<Box<dyn Iterator<Item = ElementaryStreamChunk> + 'a>>>,
pub output_buffer_ordinals: &'a mut OrdinalSequence,
pub output_buffer_set: Option<BufferSet>,
pub current_output_format: Option<Rc<ValidStreamOutputFormat>>,
pub stream_processor: &'a mut StreamProcessorProxy,
pub stream: &'a dyn ElementaryStream,
pub options: StreamOptions,
pub output: Vec<Output>,
}
pub enum StreamControlFlow {
Continue,
Stop,
}
impl<'a: 'b, 'b> Stream<'a> {
pub async fn start(&'b mut self) -> Result<()> {
if self.options.queue_format_details && self.input_packet_stream.is_some() {
debug!("Sending input format details for follow-up stream.");
self.stream_processor.queue_input_format_details(
self.stream_lifetime_ordinal,
&self.stream.format_details(self.format_details_version_ordinal),
)?;
}
self.send_available_input()?;
Ok(())
}
pub async fn handle_event(
&'b mut self,
event: StreamProcessorEvent,
) -> Result<StreamControlFlow> {
match event {
StreamProcessorEvent::OnInputConstraints { input_constraints } => {
debug!("Received input constraints.");
debug!("Input constraints are: {:#?}", input_constraints);
let buffer_set = BufferSetFactory::buffer_set(
get_ordinal(self.input_buffer_ordinals),
ValidStreamBufferConstraints::try_from(input_constraints)?,
self.stream_processor,
BufferSetType::Input,
self.options.input_buffer_collection_constraints.clone(),
)
.await?;
debug!("Sending input format details in response to input constraints.");
self.stream_processor.queue_input_format_details(
self.stream_lifetime_ordinal,
&self.stream.format_details(self.format_details_version_ordinal),
)?;
let chunk_stream = self.stream.capped_chunks(buffer_set.buffer_size);
self.input_packet_stream = Some(InputPacketStream::new(
buffer_set,
chunk_stream,
self.stream_lifetime_ordinal,
));
self.send_available_input()?;
}
StreamProcessorEvent::OnOutputConstraints { output_config } => {
debug!("Received output constraints.");
debug!("Output constraints are: {:#?}", output_config);
let constraints = ValidStreamOutputConstraints::try_from(output_config)?;
if constraints.buffer_constraints_action_required {
self.output_buffer_set = Some(
BufferSetFactory::buffer_set(
get_ordinal(self.output_buffer_ordinals),
constraints.buffer_constraints,
self.stream_processor,
BufferSetType::Output,
self.options.output_buffer_collection_constraints.clone(),
)
.await?,
);
}
}
StreamProcessorEvent::OnFreeInputPacket { free_input_packet } => {
debug!("Received freed input packet.");
debug!("Freed input packet is: {:#?}", free_input_packet);
let free_input_packet = ValidPacketHeader::try_from(free_input_packet)?;
let input_packet_stream = self.input_packet_stream.as_mut().expect(concat!(
"Unwrapping packet stream; ",
"it should be set before we ",
"get free input packets back."
));
input_packet_stream.add_free_packet(free_input_packet)?;
self.send_available_input()?;
}
StreamProcessorEvent::OnOutputFormat { output_format } => {
debug!("Received output format.");
debug!("Output format is: {:#?}", output_format);
let output_format = ValidStreamOutputFormat::try_from(output_format)?;
assert_eq!(output_format.stream_lifetime_ordinal, self.stream_lifetime_ordinal);
self.current_output_format = Some(Rc::new(output_format));
}
StreamProcessorEvent::OnOutputPacket {
output_packet,
error_detected_before,
error_detected_during,
} => {
assert!(!error_detected_before);
assert!(!error_detected_during);
debug!("Received output packet.");
debug!("Output packet is: {:#?}", output_packet);
let output_packet = ValidPacket::try_from(output_packet)?;
self.output.push(Output::Packet(OutputPacket {
data: self
.output_buffer_set
.as_ref()
.ok_or(FatalError(String::from(concat!(
"There should be an output buffer set ",
"if we are receiving output packets"
))))?
.read_packet(&output_packet)?,
format: self.current_output_format.clone().ok_or(FatalError(String::from(
concat!(
"There should be an output format set ",
"if we are receiving output packets"
),
)))?,
packet: output_packet,
}));
self.stream_processor.recycle_output_packet(&PacketHeader {
buffer_lifetime_ordinal: Some(output_packet.header.buffer_lifetime_ordinal),
packet_index: Some(output_packet.header.packet_index),
..Default::default()
})?;
if self.options.stop_after_first_output {
return Ok(StreamControlFlow::Stop);
}
}
StreamProcessorEvent::OnOutputEndOfStream {
stream_lifetime_ordinal,
error_detected_before,
} => {
assert!(!error_detected_before);
debug!("Received output end of stream.");
debug!("End of stream is for stream {}", stream_lifetime_ordinal);
self.output.push(Output::Eos { stream_lifetime_ordinal });
self.stream_processor.close_current_stream(
self.stream_lifetime_ordinal,
self.options.release_input_buffers_at_end,
self.options.release_output_buffers_at_end,
)?;
self.stream_processor.sync().await?;
return Ok(StreamControlFlow::Stop);
}
e => {
debug!("Got other event: {:#?}", e);
}
}
Ok(StreamControlFlow::Continue)
}
fn send_available_input(&'b mut self) -> Result<()> {
let input_packet_stream =
if let Some(input_packet_stream) = self.input_packet_stream.as_mut() {
input_packet_stream
} else {
return Ok(());
};
loop {
match input_packet_stream.next_packet()? {
PacketPoll::Ready(input_packet) => {
debug!("Sending input packet. {:?}", input_packet.valid_length_bytes);
self.stream_processor.queue_input_packet(&input_packet)?;
}
PacketPoll::Eos => {
debug!("Sending end of stream.");
break Ok(self
.stream_processor
.queue_input_end_of_stream(self.stream_lifetime_ordinal)?);
}
PacketPoll::NotReady => break Ok(()),
}
}
}
}