zstd/stream/zio/reader.rs
1use std::io::{self, BufRead, Read};
2
3use crate::stream::raw::{InBuffer, Operation, OutBuffer};
4
5// [ reader -> zstd ] -> output
6/// Implements the [`Read`] API around an [`Operation`].
7///
8/// This can be used to wrap a raw in-memory operation in a read-focused API.
9///
10/// It can wrap either a compression or decompression operation, and pulls
11/// input data from a wrapped `Read`.
12pub struct Reader<R, D> {
13 reader: R,
14 operation: D,
15
16 state: State,
17
18 single_frame: bool,
19 finished_frame: bool,
20}
21
/// Lifecycle of a [`Reader`].
///
/// Transitions only go forward: `Reading` -> `PastEof` -> `Finished`, or
/// directly `Reading` -> `Finished` when stopping after a single frame.
enum State {
    /// Input is still being pulled from the inner reader.
    Reading,
    /// The inner reader reported EOF; the operation's leftover output is
    /// being flushed.
    PastEof,
    /// All output has been produced; every further read yields nothing.
    Finished,
}
30
31impl<R, D> Reader<R, D> {
32 /// Creates a new `Reader`.
33 ///
34 /// `reader` will be used to pull input data for the given operation.
35 pub fn new(reader: R, operation: D) -> Self {
36 Reader {
37 reader,
38 operation,
39 state: State::Reading,
40 single_frame: false,
41 finished_frame: false,
42 }
43 }
44
45 /// Sets `self` to stop after the first decoded frame.
46 pub fn set_single_frame(&mut self) {
47 self.single_frame = true;
48 }
49
50 /// Returns a mutable reference to the underlying operation.
51 pub fn operation_mut(&mut self) -> &mut D {
52 &mut self.operation
53 }
54
55 /// Returns a mutable reference to the underlying reader.
56 pub fn reader_mut(&mut self) -> &mut R {
57 &mut self.reader
58 }
59
60 /// Returns a reference to the underlying reader.
61 pub fn reader(&self) -> &R {
62 &self.reader
63 }
64
65 /// Returns the inner reader.
66 pub fn into_inner(self) -> R {
67 self.reader
68 }
69}
/// Calls [`BufRead::fill_buf`] on `reader`, transparently retrying on
/// [`io::ErrorKind::Interrupted`].
///
/// Returns the reader's currently buffered data. An empty slice means the
/// reader reached EOF.
///
/// # Errors
///
/// Any error other than `Interrupted` is returned unchanged.
fn fill_buf<R>(reader: &mut R) -> io::Result<&[u8]>
where
    R: BufRead,
{
    // The direct form `loop { match reader.fill_buf() { Err(Interrupted) => {},
    // otherwise => return otherwise } }` is rejected by the current borrow
    // checker, because it conditionally returns a borrow from inside a loop.
    // Instead, retry until one call succeeds and keep only whether the buffer
    // was empty. No borrow escapes the loop.
    let at_eof = loop {
        match reader.fill_buf() {
            Ok(buf) => break buf.is_empty(),
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    };

    if at_eof {
        // Don't ask the source again once it has reported EOF.
        return Ok(&[]);
    }

    // The internal buffer is non-empty, so per the `BufRead` contract this
    // call returns it without reading from the underlying source again.
    reader.fill_buf()
}
94
95impl<R, D> Read for Reader<R, D>
96where
97 R: BufRead,
98 D: Operation,
99{
100 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
101 // Keep trying until _something_ has been written.
102 let mut first = true;
103 loop {
104 match self.state {
105 State::Reading => {
106 let (bytes_read, bytes_written) = {
107 // Start with a fresh pool of un-processed data.
108 // This is the only line that can return an interruption error.
109 let input = if first {
110 // eprintln!("First run, no input coming.");
111 b""
112 } else {
113 fill_buf(&mut self.reader)?
114 };
115
116 // eprintln!("Input = {:?}", input);
117
118 // It's possible we don't have any new data to read.
119 // (In this case we may still have zstd's own buffer to clear.)
120 if !first && input.is_empty() {
121 self.state = State::PastEof;
122 continue;
123 }
124 first = false;
125
126 let mut src = InBuffer::around(input);
127 let mut dst = OutBuffer::around(buf);
128
129 // We don't want empty input (from first=true) to cause a frame
130 // re-initialization.
131 if self.finished_frame && !input.is_empty() {
132 // eprintln!("!! Reigniting !!");
133 self.operation.reinit()?;
134 self.finished_frame = false;
135 }
136
137 // Phase 1: feed input to the operation
138 let hint = self.operation.run(&mut src, &mut dst)?;
139 // eprintln!(
140 // "Hint={} Just run our operation:\n In={:?}\n Out={:?}",
141 // hint, src, dst
142 // );
143
144 if hint == 0 {
145 // In practice this only happens when decoding, when we just finished
146 // reading a frame.
147 self.finished_frame = true;
148 if self.single_frame {
149 self.state = State::Finished;
150 }
151 }
152
153 // eprintln!("Output: {:?}", dst);
154
155 (src.pos(), dst.pos())
156 };
157
158 self.reader.consume(bytes_read);
159
160 if bytes_written > 0 {
161 return Ok(bytes_written);
162 }
163
164 // We need more data! Try again!
165 }
166 State::PastEof => {
167 let mut dst = OutBuffer::around(buf);
168
169 // We already sent all the input we could get to zstd. Time to flush out the
170 // buffer and be done with it.
171
172 // Phase 2: flush out the operation's buffer
173 // Keep calling `finish()` until the buffer is empty.
174 let hint = self
175 .operation
176 .finish(&mut dst, self.finished_frame)?;
177 // eprintln!("Hint: {} ; Output: {:?}", hint, dst);
178 if hint == 0 {
179 // This indicates that the footer is complete.
180 // This is the only way to terminate the stream cleanly.
181 self.state = State::Finished;
182 }
183
184 return Ok(dst.pos());
185 }
186 State::Finished => {
187 return Ok(0);
188 }
189 }
190 }
191 }
192}
193
#[cfg(test)]
mod tests {
    use super::Reader;
    use std::io::{Cursor, Read};

    /// Drains `source` to the end and returns everything it produced,
    /// panicking on any I/O error.
    fn drain<S: Read>(mut source: S) -> Vec<u8> {
        let mut collected = Vec::new();
        source.read_to_end(&mut collected).unwrap();
        collected
    }

    #[test]
    fn test_noop() {
        use crate::stream::raw::NoOp;

        let input = b"AbcdefghAbcdefgh.";

        // A pass-through operation must reproduce the input verbatim.
        let output = drain(Reader::new(Cursor::new(input), NoOp));
        assert_eq!(&output, input);
    }

    #[test]
    fn test_compress() {
        use crate::stream::raw::Encoder;

        let input = b"AbcdefghAbcdefgh.";

        // Compress through the reader, then round-trip via the one-shot decoder.
        let encoder = Encoder::new(1).unwrap();
        let compressed = drain(Reader::new(Cursor::new(input), encoder));
        let decoded = crate::decode_all(&compressed[..]).unwrap();
        assert_eq!(&decoded, input);
    }
}