flate2/
zio.rs

1use std::io;
2use std::io::prelude::*;
3use std::mem;
4
5use crate::{Compress, Decompress, DecompressError, FlushCompress, FlushDecompress, Status};
6
7#[derive(Debug)]
8pub struct Writer<W: Write, D: Ops> {
9    obj: Option<W>,
10    pub data: D,
11    buf: Vec<u8>,
12}
13
14pub trait Ops {
15    type Flush: Flush;
16    fn total_in(&self) -> u64;
17    fn total_out(&self) -> u64;
18    fn run(
19        &mut self,
20        input: &[u8],
21        output: &mut [u8],
22        flush: Self::Flush,
23    ) -> Result<Status, DecompressError>;
24    fn run_vec(
25        &mut self,
26        input: &[u8],
27        output: &mut Vec<u8>,
28        flush: Self::Flush,
29    ) -> Result<Status, DecompressError>;
30}
31
32impl Ops for Compress {
33    type Flush = FlushCompress;
34    fn total_in(&self) -> u64 {
35        self.total_in()
36    }
37    fn total_out(&self) -> u64 {
38        self.total_out()
39    }
40    fn run(
41        &mut self,
42        input: &[u8],
43        output: &mut [u8],
44        flush: FlushCompress,
45    ) -> Result<Status, DecompressError> {
46        Ok(self.compress(input, output, flush).unwrap())
47    }
48    fn run_vec(
49        &mut self,
50        input: &[u8],
51        output: &mut Vec<u8>,
52        flush: FlushCompress,
53    ) -> Result<Status, DecompressError> {
54        Ok(self.compress_vec(input, output, flush).unwrap())
55    }
56}
57
58impl Ops for Decompress {
59    type Flush = FlushDecompress;
60    fn total_in(&self) -> u64 {
61        self.total_in()
62    }
63    fn total_out(&self) -> u64 {
64        self.total_out()
65    }
66    fn run(
67        &mut self,
68        input: &[u8],
69        output: &mut [u8],
70        flush: FlushDecompress,
71    ) -> Result<Status, DecompressError> {
72        self.decompress(input, output, flush)
73    }
74    fn run_vec(
75        &mut self,
76        input: &[u8],
77        output: &mut Vec<u8>,
78        flush: FlushDecompress,
79    ) -> Result<Status, DecompressError> {
80        self.decompress_vec(input, output, flush)
81    }
82}
83
/// Constructors for the flush modes that generic code in this module needs
/// to request from a codec, independent of the concrete flush enum.
pub trait Flush {
    /// Mode used for ordinary processing while more input is expected.
    fn none() -> Self;

    /// Mode used when a writer is asked to flush.
    fn sync() -> Self;

    /// Mode used once the input is exhausted and the stream is being
    /// finished.
    fn finish() -> Self;
}
89
90impl Flush for FlushCompress {
91    fn none() -> Self {
92        FlushCompress::None
93    }
94
95    fn sync() -> Self {
96        FlushCompress::Sync
97    }
98
99    fn finish() -> Self {
100        FlushCompress::Finish
101    }
102}
103
104impl Flush for FlushDecompress {
105    fn none() -> Self {
106        FlushDecompress::None
107    }
108
109    fn sync() -> Self {
110        FlushDecompress::Sync
111    }
112
113    fn finish() -> Self {
114        FlushDecompress::Finish
115    }
116}
117
118pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize>
119where
120    R: BufRead,
121    D: Ops,
122{
123    loop {
124        let (read, consumed, ret, eof);
125        {
126            let input = obj.fill_buf()?;
127            eof = input.is_empty();
128            let before_out = data.total_out();
129            let before_in = data.total_in();
130            let flush = if eof {
131                D::Flush::finish()
132            } else {
133                D::Flush::none()
134            };
135            ret = data.run(input, dst, flush);
136            read = (data.total_out() - before_out) as usize;
137            consumed = (data.total_in() - before_in) as usize;
138        }
139        obj.consume(consumed);
140
141        match ret {
142            // If we haven't ready any data and we haven't hit EOF yet,
143            // then we need to keep asking for more data because if we
144            // return that 0 bytes of data have been read then it will
145            // be interpreted as EOF.
146            Ok(Status::Ok) | Ok(Status::BufError) if read == 0 && !eof && dst.len() > 0 => continue,
147            Ok(Status::Ok) | Ok(Status::BufError) | Ok(Status::StreamEnd) => return Ok(read),
148
149            Err(..) => {
150                return Err(io::Error::new(
151                    io::ErrorKind::InvalidInput,
152                    "corrupt deflate stream",
153                ))
154            }
155        }
156    }
157}
158
159impl<W: Write, D: Ops> Writer<W, D> {
160    pub fn new(w: W, d: D) -> Writer<W, D> {
161        Writer {
162            obj: Some(w),
163            data: d,
164            buf: Vec::with_capacity(32 * 1024),
165        }
166    }
167
168    pub fn finish(&mut self) -> io::Result<()> {
169        loop {
170            self.dump()?;
171
172            let before = self.data.total_out();
173            self.data.run_vec(&[], &mut self.buf, D::Flush::finish())?;
174            if before == self.data.total_out() {
175                return Ok(());
176            }
177        }
178    }
179
180    pub fn replace(&mut self, w: W) -> W {
181        self.buf.truncate(0);
182        mem::replace(self.get_mut(), w)
183    }
184
185    pub fn get_ref(&self) -> &W {
186        self.obj.as_ref().unwrap()
187    }
188
189    pub fn get_mut(&mut self) -> &mut W {
190        self.obj.as_mut().unwrap()
191    }
192
193    // Note that this should only be called if the outer object is just about
194    // to be consumed!
195    //
196    // (e.g. an implementation of `into_inner`)
197    pub fn take_inner(&mut self) -> W {
198        self.obj.take().unwrap()
199    }
200
201    pub fn is_present(&self) -> bool {
202        self.obj.is_some()
203    }
204
205    // Returns total written bytes and status of underlying codec
206    pub(crate) fn write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)> {
207        // miniz isn't guaranteed to actually write any of the buffer provided,
208        // it may be in a flushing mode where it's just giving us data before
209        // we're actually giving it any data. We don't want to spuriously return
210        // `Ok(0)` when possible as it will cause calls to write_all() to fail.
211        // As a result we execute this in a loop to ensure that we try our
212        // darndest to write the data.
213        loop {
214            self.dump()?;
215
216            let before_in = self.data.total_in();
217            let ret = self.data.run_vec(buf, &mut self.buf, D::Flush::none());
218            let written = (self.data.total_in() - before_in) as usize;
219
220            let is_stream_end = match ret {
221                Ok(Status::StreamEnd) => true,
222                _ => false,
223            };
224
225            if buf.len() > 0 && written == 0 && ret.is_ok() && !is_stream_end {
226                continue;
227            }
228            return match ret {
229                Ok(st) => match st {
230                    Status::Ok | Status::BufError | Status::StreamEnd => Ok((written, st)),
231                },
232                Err(..) => Err(io::Error::new(
233                    io::ErrorKind::InvalidInput,
234                    "corrupt deflate stream",
235                )),
236            };
237        }
238    }
239
240    fn dump(&mut self) -> io::Result<()> {
241        // TODO: should manage this buffer not with `drain` but probably more of
242        // a deque-like strategy.
243        while self.buf.len() > 0 {
244            let n = self.obj.as_mut().unwrap().write(&self.buf)?;
245            if n == 0 {
246                return Err(io::ErrorKind::WriteZero.into());
247            }
248            self.buf.drain(..n);
249        }
250        Ok(())
251    }
252}
253
254impl<W: Write, D: Ops> Write for Writer<W, D> {
255    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
256        self.write_with_status(buf).map(|res| res.0)
257    }
258
259    fn flush(&mut self) -> io::Result<()> {
260        self.data
261            .run_vec(&[], &mut self.buf, D::Flush::sync())
262            .unwrap();
263
264        // Unfortunately miniz doesn't actually tell us when we're done with
265        // pulling out all the data from the internal stream. To remedy this we
266        // have to continually ask the stream for more memory until it doesn't
267        // give us a chunk of memory the same size as our own internal buffer,
268        // at which point we assume it's reached the end.
269        loop {
270            self.dump()?;
271            let before = self.data.total_out();
272            self.data
273                .run_vec(&[], &mut self.buf, D::Flush::none())
274                .unwrap();
275            if before == self.data.total_out() {
276                break;
277            }
278        }
279
280        self.obj.as_mut().unwrap().flush()
281    }
282}
283
284impl<W: Write, D: Ops> Drop for Writer<W, D> {
285    fn drop(&mut self) {
286        if self.obj.is_some() {
287            let _ = self.finish();
288        }
289    }
290}