stream_processor_test/
input_packet_stream.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use crate::buffer_set::*;
use crate::elementary_stream::*;
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use fidl_fuchsia_media::*;
use fuchsia_stream_processors::*;

use std::collections::HashMap;
use std::fmt;
use thiserror::Error;

type PacketIdx = u32;
type BufferIdx = u32;

/// A stream converting elementary stream chunks into input packets for a stream processor.
pub struct InputPacketStream<I> {
    packet_and_buffer_pairs: HashMap<PacketIdx, (BufferIdx, UsageStatus)>,
    buffer_set: BufferSet,
    stream_lifetime_ordinal: u64,
    stream: I,
    sent_eos: bool,
}

#[derive(Copy, Clone, PartialEq, Debug)]
enum UsageStatus {
    Free,
    InUse,
}

#[derive(Debug, Error)]
pub enum Error {
    PacketRefersToInvalidBuffer,
    BufferTooSmall { buffer_size: usize, stream_chunk_size: usize },
    VmoWriteFail(zx::Status),
}

impl fmt::Display for Error {
    fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self, w)
    }
}

pub enum PacketPoll {
    Ready(Packet),
    Eos,
    NotReady,
}

impl<'a, I: Iterator<Item = ElementaryStreamChunk>> InputPacketStream<I> {
    pub fn new(buffer_set: BufferSet, stream: I, stream_lifetime_ordinal: u64) -> Self {
        // The official # of packets / usable packet_index values can be greater than this (for
        // now), but we don't need to use more packets than buffers, and we know # of packets will
        // be at least buffer_count.
        let packets = 0..(buffer_set.buffers.len() as u32);
        let buffers = packets.clone().rev().map(|idx| (idx, UsageStatus::Free));
        Self {
            packet_and_buffer_pairs: packets.zip(buffers).collect(),
            buffer_set,
            stream_lifetime_ordinal,
            stream,
            sent_eos: false,
        }
    }

    pub fn add_free_packet(&mut self, packet: ValidPacketHeader) -> Result<(), Error> {
        let (_, ref mut status) = *self
            .packet_and_buffer_pairs
            .get_mut(&packet.packet_index)
            .ok_or(Error::PacketRefersToInvalidBuffer)?;
        *status = UsageStatus::Free;
        Ok(())
    }

    fn free_packet_and_buffer(&mut self) -> Option<(u32, u32)> {
        // This is a linear search. This may not be appropriate in prod code.
        self.packet_and_buffer_pairs.iter_mut().find_map(|(packet, (buffer, usage))| match usage {
            UsageStatus::Free => {
                *usage = UsageStatus::InUse;
                Some((*packet, *buffer))
            }
            UsageStatus::InUse => None,
        })
    }

    pub fn next_packet(&mut self) -> Result<PacketPoll, Error> {
        let (packet_idx, buffer_idx) = if let Some(idxs) = self.free_packet_and_buffer() {
            idxs
        } else {
            return Ok(PacketPoll::NotReady);
        };

        let chunk = if let Some(chunk) = self.stream.next() {
            chunk
        } else if !self.sent_eos {
            self.sent_eos = true;
            return Ok(PacketPoll::Eos);
        } else {
            return Ok(PacketPoll::NotReady);
        };

        let buffer = self
            .buffer_set
            .buffers
            .get(buffer_idx as usize)
            .ok_or(Error::PacketRefersToInvalidBuffer)?;

        if (buffer.size as usize) < chunk.data.len() {
            return Err(Error::BufferTooSmall {
                buffer_size: buffer.size as usize,
                stream_chunk_size: chunk.data.len(),
            });
        }

        buffer.data.write(&chunk.data, 0).map_err(Error::VmoWriteFail)?;

        Ok(PacketPoll::Ready(Packet {
            header: Some(PacketHeader {
                packet_index: Some(packet_idx),
                buffer_lifetime_ordinal: Some(self.buffer_set.buffer_lifetime_ordinal),
                ..Default::default()
            }),
            buffer_index: Some(buffer_idx),
            stream_lifetime_ordinal: Some(self.stream_lifetime_ordinal),
            start_offset: Some(0),
            valid_length_bytes: Some(chunk.data.len() as u32),
            timestamp_ish: chunk.timestamp,
            start_access_unit: Some(chunk.start_access_unit),
            known_end_access_unit: Some(chunk.known_end_access_unit),
            ..Default::default()
        }))
    }

    pub fn take_buffer_set(self) -> BufferSet {
        self.buffer_set
    }
}