// zstd/stream/zio/reader.rs

1use std::io::{self, BufRead, Read};
2
3use crate::stream::raw::{InBuffer, Operation, OutBuffer};
4
5// [ reader -> zstd ] -> output
6/// Implements the [`Read`] API around an [`Operation`].
7///
8/// This can be used to wrap a raw in-memory operation in a read-focused API.
9///
10/// It can wrap either a compression or decompression operation, and pulls
11/// input data from a wrapped `Read`.
12pub struct Reader<R, D> {
13    reader: R,
14    operation: D,
15
16    state: State,
17
18    single_frame: bool,
19    finished_frame: bool,
20}
21
/// Phases a [`Reader`] moves through while producing output.
enum State {
    /// Input is still being pulled from the inner reader.
    Reading,
    /// The inner reader reported EOF; what remains is flushing the
    /// operation's own buffered output.
    PastEof,
    /// Nothing more will be produced; every further read yields 0 bytes.
    Finished,
}

31impl<R, D> Reader<R, D> {
32    /// Creates a new `Reader`.
33    ///
34    /// `reader` will be used to pull input data for the given operation.
35    pub fn new(reader: R, operation: D) -> Self {
36        Reader {
37            reader,
38            operation,
39            state: State::Reading,
40            single_frame: false,
41            finished_frame: false,
42        }
43    }
44
45    /// Sets `self` to stop after the first decoded frame.
46    pub fn set_single_frame(&mut self) {
47        self.single_frame = true;
48    }
49
50    /// Returns a mutable reference to the underlying operation.
51    pub fn operation_mut(&mut self) -> &mut D {
52        &mut self.operation
53    }
54
55    /// Returns a mutable reference to the underlying reader.
56    pub fn reader_mut(&mut self) -> &mut R {
57        &mut self.reader
58    }
59
60    /// Returns a reference to the underlying reader.
61    pub fn reader(&self) -> &R {
62        &self.reader
63    }
64
65    /// Returns the inner reader.
66    pub fn into_inner(self) -> R {
67        self.reader
68    }
69}
/// Calls [`BufRead::fill_buf`] on `reader`, retrying for as long as it fails
/// with [`io::ErrorKind::Interrupted`].
///
/// Any other error is returned unchanged. An empty slice means the reader
/// has reached end-of-file.
///
/// The obvious `loop { match reader.fill_buf() { … otherwise => return otherwise } }`
/// is rejected by the current borrow checker: returning the borrowed slice
/// from inside the loop forces that borrow to last for the whole function,
/// so the next iteration cannot borrow `reader` again. Instead, the loop
/// only probes the reader and lets the borrow end. Once data is known to be
/// buffered, a final `fill_buf` call returns it with the full lifetime.
fn fill_buf<R>(reader: &mut R) -> io::Result<&[u8]>
where
    R: BufRead,
{
    loop {
        match reader.fill_buf() {
            // EOF: answer with an empty slice directly instead of calling the
            // reader again, which could block or observe newly arrived data.
            Ok(buf) if buf.is_empty() => return Ok(&[]),
            // Data is buffered; fetch it again below with an escaping borrow.
            Ok(_) => break,
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }

    // Per the `BufRead::fill_buf` contract, the inner source is only read
    // when the internal buffer is empty. The buffer is known to be non-empty
    // here, so this returns the data seen above without performing new I/O.
    reader.fill_buf()
}

95impl<R, D> Read for Reader<R, D>
96where
97    R: BufRead,
98    D: Operation,
99{
100    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
101        // Keep trying until _something_ has been written.
102        let mut first = true;
103        loop {
104            match self.state {
105                State::Reading => {
106                    let (bytes_read, bytes_written) = {
107                        // Start with a fresh pool of un-processed data.
108                        // This is the only line that can return an interruption error.
109                        let input = if first {
110                            // eprintln!("First run, no input coming.");
111                            b""
112                        } else {
113                            fill_buf(&mut self.reader)?
114                        };
115
116                        // eprintln!("Input = {:?}", input);
117
118                        // It's possible we don't have any new data to read.
119                        // (In this case we may still have zstd's own buffer to clear.)
120                        if !first && input.is_empty() {
121                            self.state = State::PastEof;
122                            continue;
123                        }
124                        first = false;
125
126                        let mut src = InBuffer::around(input);
127                        let mut dst = OutBuffer::around(buf);
128
129                        // We don't want empty input (from first=true) to cause a frame
130                        // re-initialization.
131                        if self.finished_frame && !input.is_empty() {
132                            // eprintln!("!! Reigniting !!");
133                            self.operation.reinit()?;
134                            self.finished_frame = false;
135                        }
136
137                        // Phase 1: feed input to the operation
138                        let hint = self.operation.run(&mut src, &mut dst)?;
139                        // eprintln!(
140                        //     "Hint={} Just run our operation:\n In={:?}\n Out={:?}",
141                        //     hint, src, dst
142                        // );
143
144                        if hint == 0 {
145                            // In practice this only happens when decoding, when we just finished
146                            // reading a frame.
147                            self.finished_frame = true;
148                            if self.single_frame {
149                                self.state = State::Finished;
150                            }
151                        }
152
153                        // eprintln!("Output: {:?}", dst);
154
155                        (src.pos(), dst.pos())
156                    };
157
158                    self.reader.consume(bytes_read);
159
160                    if bytes_written > 0 {
161                        return Ok(bytes_written);
162                    }
163
164                    // We need more data! Try again!
165                }
166                State::PastEof => {
167                    let mut dst = OutBuffer::around(buf);
168
169                    // We already sent all the input we could get to zstd. Time to flush out the
170                    // buffer and be done with it.
171
172                    // Phase 2: flush out the operation's buffer
173                    // Keep calling `finish()` until the buffer is empty.
174                    let hint = self
175                        .operation
176                        .finish(&mut dst, self.finished_frame)?;
177                    // eprintln!("Hint: {} ; Output: {:?}", hint, dst);
178                    if hint == 0 {
179                        // This indicates that the footer is complete.
180                        // This is the only way to terminate the stream cleanly.
181                        self.state = State::Finished;
182                    }
183
184                    return Ok(dst.pos());
185                }
186                State::Finished => {
187                    return Ok(0);
188                }
189            }
190        }
191    }
192}
193
#[cfg(test)]
mod tests {
    use super::Reader;
    use crate::stream::raw::{Encoder, NoOp};
    use std::io::{Cursor, Read};

    /// Payload shared by every test in this module.
    const INPUT: &[u8; 17] = b"AbcdefghAbcdefgh.";

    /// Reads `source` to the end and returns everything it produced.
    fn drain<S: Read>(mut source: S) -> Vec<u8> {
        let mut collected = Vec::new();
        source.read_to_end(&mut collected).unwrap();
        collected
    }

    #[test]
    fn test_noop() {
        // A pass-through operation must hand the input back untouched.
        let output = drain(Reader::new(Cursor::new(INPUT), NoOp));
        assert_eq!(&output, INPUT);
    }

    #[test]
    fn test_compress() {
        // Whatever the encoder emits must decode back to the original bytes.
        let compressed = drain(Reader::new(Cursor::new(INPUT), Encoder::new(1).unwrap()));
        let decoded = crate::decode_all(&compressed[..]).unwrap();
        assert_eq!(&decoded, INPUT);
    }
}