1use std::cmp;
2use std::io;
3use std::io::prelude::*;
45#[cfg(feature = "tokio")]
6use futures::Poll;
7#[cfg(feature = "tokio")]
8use tokio_io::{AsyncRead, AsyncWrite};
910use super::bufread::{corrupt, read_gz_header};
11use super::{GzBuilder, GzHeader};
12use crate::crc::{Crc, CrcWriter};
13use crate::zio;
14use crate::{Compress, Compression, Decompress, Status};
1516/// A gzip streaming encoder
17///
18/// This structure exposes a [`Write`] interface that will emit compressed data
19/// to the underlying writer `W`.
20///
21/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
22///
23/// # Examples
24///
25/// ```
26/// use std::io::prelude::*;
27/// use flate2::Compression;
28/// use flate2::write::GzEncoder;
29///
30/// // Vec<u8> implements Write to print the compressed bytes of sample string
31/// # fn main() {
32///
33/// let mut e = GzEncoder::new(Vec::new(), Compression::default());
34/// e.write_all(b"Hello World").unwrap();
35/// println!("{:?}", e.finish().unwrap());
36/// # }
37/// ```
38#[derive(Debug)]
39pub struct GzEncoder<W: Write> {
40 inner: zio::Writer<W, Compress>,
41 crc: Crc,
42 crc_bytes_written: usize,
43 header: Vec<u8>,
44}
4546pub fn gz_encoder<W: Write>(header: Vec<u8>, w: W, lvl: Compression) -> GzEncoder<W> {
47 GzEncoder {
48 inner: zio::Writer::new(w, Compress::new(lvl, false)),
49 crc: Crc::new(),
50 header: header,
51 crc_bytes_written: 0,
52 }
53}
5455impl<W: Write> GzEncoder<W> {
56/// Creates a new encoder which will use the given compression level.
57 ///
58 /// The encoder is not configured specially for the emitted header. For
59 /// header configuration, see the `GzBuilder` type.
60 ///
61 /// The data written to the returned encoder will be compressed and then
62 /// written to the stream `w`.
63pub fn new(w: W, level: Compression) -> GzEncoder<W> {
64 GzBuilder::new().write(w, level)
65 }
6667/// Acquires a reference to the underlying writer.
68pub fn get_ref(&self) -> &W {
69self.inner.get_ref()
70 }
7172/// Acquires a mutable reference to the underlying writer.
73 ///
74 /// Note that mutation of the writer may result in surprising results if
75 /// this encoder is continued to be used.
76pub fn get_mut(&mut self) -> &mut W {
77self.inner.get_mut()
78 }
7980/// Attempt to finish this output stream, writing out final chunks of data.
81 ///
82 /// Note that this function can only be used once data has finished being
83 /// written to the output stream. After this function is called then further
84 /// calls to `write` may result in a panic.
85 ///
86 /// # Panics
87 ///
88 /// Attempts to write data to this stream may result in a panic after this
89 /// function is called.
90 ///
91 /// # Errors
92 ///
93 /// This function will perform I/O to complete this stream, and any I/O
94 /// errors which occur will be returned from this function.
95pub fn try_finish(&mut self) -> io::Result<()> {
96self.write_header()?;
97self.inner.finish()?;
9899while self.crc_bytes_written < 8 {
100let (sum, amt) = (self.crc.sum() as u32, self.crc.amount());
101let buf = [
102 (sum >> 0) as u8,
103 (sum >> 8) as u8,
104 (sum >> 16) as u8,
105 (sum >> 24) as u8,
106 (amt >> 0) as u8,
107 (amt >> 8) as u8,
108 (amt >> 16) as u8,
109 (amt >> 24) as u8,
110 ];
111let inner = self.inner.get_mut();
112let n = inner.write(&buf[self.crc_bytes_written..])?;
113self.crc_bytes_written += n;
114 }
115Ok(())
116 }
117118/// Finish encoding this stream, returning the underlying writer once the
119 /// encoding is done.
120 ///
121 /// Note that this function may not be suitable to call in a situation where
122 /// the underlying stream is an asynchronous I/O stream. To finish a stream
123 /// the `try_finish` (or `shutdown`) method should be used instead. To
124 /// re-acquire ownership of a stream it is safe to call this method after
125 /// `try_finish` or `shutdown` has returned `Ok`.
126 ///
127 /// # Errors
128 ///
129 /// This function will perform I/O to complete this stream, and any I/O
130 /// errors which occur will be returned from this function.
131pub fn finish(mut self) -> io::Result<W> {
132self.try_finish()?;
133Ok(self.inner.take_inner())
134 }
135136fn write_header(&mut self) -> io::Result<()> {
137while self.header.len() > 0 {
138let n = self.inner.get_mut().write(&self.header)?;
139self.header.drain(..n);
140 }
141Ok(())
142 }
143}
144145impl<W: Write> Write for GzEncoder<W> {
146fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
147assert_eq!(self.crc_bytes_written, 0);
148self.write_header()?;
149let n = self.inner.write(buf)?;
150self.crc.update(&buf[..n]);
151Ok(n)
152 }
153154fn flush(&mut self) -> io::Result<()> {
155assert_eq!(self.crc_bytes_written, 0);
156self.write_header()?;
157self.inner.flush()
158 }
159}
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for GzEncoder<W> {
    /// Writes out the rest of the gzip stream (header, remaining compressed
    /// data and trailer), then shuts down the wrapped writer.
    ///
    /// NOTE(review): a `WouldBlock` error from `try_finish` is propagated as
    /// `Err` rather than mapped to `Async::NotReady`; confirm this matches
    /// tokio-io 0.1 expectations.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        self.try_finish()?;
        let inner: &mut W = self.inner.get_mut();
        inner.shutdown()
    }
}
168169impl<R: Read + Write> Read for GzEncoder<R> {
170fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
171self.get_mut().read(buf)
172 }
173}
/// Marker impl: reads are forwarded untouched by the `Read` impl above.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for GzEncoder<R> where R: AsyncRead + AsyncWrite {}
177178impl<W: Write> Drop for GzEncoder<W> {
179fn drop(&mut self) {
180if self.inner.is_present() {
181let _ = self.try_finish();
182 }
183 }
184}
185186/// A gzip streaming decoder
187///
188/// This structure exposes a [`Write`] interface that will emit compressed data
189/// to the underlying writer `W`.
190///
191/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
192///
193/// # Examples
194///
195/// ```
196/// use std::io::prelude::*;
197/// use std::io;
198/// use flate2::Compression;
199/// use flate2::write::{GzEncoder, GzDecoder};
200///
201/// # fn main() {
202/// # let mut e = GzEncoder::new(Vec::new(), Compression::default());
203/// # e.write(b"Hello World").unwrap();
204/// # let bytes = e.finish().unwrap();
205/// # assert_eq!("Hello World", decode_writer(bytes).unwrap());
206/// # }
207/// // Uncompresses a gzip encoded vector of bytes and returns a string or error
208/// // Here Vec<u8> implements Write
209/// fn decode_writer(bytes: Vec<u8>) -> io::Result<String> {
210/// let mut writer = Vec::new();
211/// let mut decoder = GzDecoder::new(writer);
212/// decoder.write_all(&bytes[..])?;
213/// writer = decoder.finish()?;
214/// let return_string = String::from_utf8(writer).expect("String parsing error");
215/// Ok(return_string)
216/// }
217/// ```
218#[derive(Debug)]
219pub struct GzDecoder<W: Write> {
220 inner: zio::Writer<CrcWriter<W>, Decompress>,
221 crc_bytes: Vec<u8>,
222 header: Option<GzHeader>,
223 header_buf: Vec<u8>,
224}
/// Size of the gzip trailer: a 4-byte CRC32 followed by a 4-byte length.
const CRC_BYTES_LEN: usize = 4 + 4;
227228impl<W: Write> GzDecoder<W> {
229/// Creates a new decoder which will write uncompressed data to the stream.
230 ///
231 /// When this encoder is dropped or unwrapped the final pieces of data will
232 /// be flushed.
233pub fn new(w: W) -> GzDecoder<W> {
234 GzDecoder {
235 inner: zio::Writer::new(CrcWriter::new(w), Decompress::new(false)),
236 crc_bytes: Vec::with_capacity(CRC_BYTES_LEN),
237 header: None,
238 header_buf: Vec::new(),
239 }
240 }
241242/// Returns the header associated with this stream.
243pub fn header(&self) -> Option<&GzHeader> {
244self.header.as_ref()
245 }
246247/// Acquires a reference to the underlying writer.
248pub fn get_ref(&self) -> &W {
249self.inner.get_ref().get_ref()
250 }
251252/// Acquires a mutable reference to the underlying writer.
253 ///
254 /// Note that mutating the output/input state of the stream may corrupt this
255 /// object, so care must be taken when using this method.
256pub fn get_mut(&mut self) -> &mut W {
257self.inner.get_mut().get_mut()
258 }
259260/// Attempt to finish this output stream, writing out final chunks of data.
261 ///
262 /// Note that this function can only be used once data has finished being
263 /// written to the output stream. After this function is called then further
264 /// calls to `write` may result in a panic.
265 ///
266 /// # Panics
267 ///
268 /// Attempts to write data to this stream may result in a panic after this
269 /// function is called.
270 ///
271 /// # Errors
272 ///
273 /// This function will perform I/O to finish the stream, returning any
274 /// errors which happen.
275pub fn try_finish(&mut self) -> io::Result<()> {
276self.finish_and_check_crc()?;
277Ok(())
278 }
279280/// Consumes this decoder, flushing the output stream.
281 ///
282 /// This will flush the underlying data stream and then return the contained
283 /// writer if the flush succeeded.
284 ///
285 /// Note that this function may not be suitable to call in a situation where
286 /// the underlying stream is an asynchronous I/O stream. To finish a stream
287 /// the `try_finish` (or `shutdown`) method should be used instead. To
288 /// re-acquire ownership of a stream it is safe to call this method after
289 /// `try_finish` or `shutdown` has returned `Ok`.
290 ///
291 /// # Errors
292 ///
293 /// This function will perform I/O to complete this stream, and any I/O
294 /// errors which occur will be returned from this function.
295pub fn finish(mut self) -> io::Result<W> {
296self.finish_and_check_crc()?;
297Ok(self.inner.take_inner().into_inner())
298 }
299300fn finish_and_check_crc(&mut self) -> io::Result<()> {
301self.inner.finish()?;
302303if self.crc_bytes.len() != 8 {
304return Err(corrupt());
305 }
306307let crc = ((self.crc_bytes[0] as u32) << 0)
308 | ((self.crc_bytes[1] as u32) << 8)
309 | ((self.crc_bytes[2] as u32) << 16)
310 | ((self.crc_bytes[3] as u32) << 24);
311let amt = ((self.crc_bytes[4] as u32) << 0)
312 | ((self.crc_bytes[5] as u32) << 8)
313 | ((self.crc_bytes[6] as u32) << 16)
314 | ((self.crc_bytes[7] as u32) << 24);
315if crc != self.inner.get_ref().crc().sum() as u32 {
316return Err(corrupt());
317 }
318if amt != self.inner.get_ref().crc().amount() {
319return Err(corrupt());
320 }
321Ok(())
322 }
323}
/// A `Read` adapter that tallies how many bytes have been read through it.
struct Counter<T: Read> {
    inner: T,
    // Total number of bytes returned by `inner` so far.
    pos: usize,
}

impl<T: Read> Read for Counter<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf).map(|n| {
            self.pos += n;
            n
        })
    }
}
337338impl<W: Write> Write for GzDecoder<W> {
339fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
340if self.header.is_none() {
341// trying to avoid buffer usage
342let (res, pos) = {
343let mut counter = Counter {
344 inner: self.header_buf.chain(buf),
345 pos: 0,
346 };
347let res = read_gz_header(&mut counter);
348 (res, counter.pos)
349 };
350351match res {
352Err(err) => {
353if err.kind() == io::ErrorKind::UnexpectedEof {
354// not enough data for header, save to the buffer
355self.header_buf.extend(buf);
356Ok(buf.len())
357 } else {
358Err(err)
359 }
360 }
361Ok(header) => {
362self.header = Some(header);
363let pos = pos - self.header_buf.len();
364self.header_buf.truncate(0);
365Ok(pos)
366 }
367 }
368 } else {
369let (n, status) = self.inner.write_with_status(buf)?;
370371if status == Status::StreamEnd {
372if n < buf.len() && self.crc_bytes.len() < 8 {
373let remaining = buf.len() - n;
374let crc_bytes = cmp::min(remaining, CRC_BYTES_LEN - self.crc_bytes.len());
375self.crc_bytes.extend(&buf[n..n + crc_bytes]);
376return Ok(n + crc_bytes);
377 }
378 }
379Ok(n)
380 }
381 }
382383fn flush(&mut self) -> io::Result<()> {
384self.inner.flush()
385 }
386}
#[cfg(feature = "tokio")]
impl<W: AsyncWrite> AsyncWrite for GzDecoder<W> {
    /// Finishes decompression and checks the trailer, then shuts down the
    /// wrapped writer.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        self.try_finish()?;
        let stream: &mut W = self.get_mut();
        stream.shutdown()
    }
}
395396impl<W: Read + Write> Read for GzDecoder<W> {
397fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
398self.inner.get_mut().get_mut().read(buf)
399 }
400}
/// Marker impl: reads are forwarded untouched by the `Read` impl above.
#[cfg(feature = "tokio")]
impl<W> AsyncRead for GzDecoder<W> where W: AsyncRead + AsyncWrite {}
#[cfg(test)]
mod tests {
    use super::*;

    const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World";

    /// Gzip-compresses `STR` with the default compression level.
    fn compressed_str() -> Vec<u8> {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write(STR.as_ref()).unwrap();
        encoder.finish().unwrap()
    }

    /// Finishes `decoder` and checks that its output is exactly `STR`.
    fn assert_decodes_to_str(decoder: GzDecoder<Vec<u8>>) {
        let output = decoder.finish().unwrap();
        let text = String::from_utf8(output).expect("String parsing error");
        assert_eq!(text, STR);
    }

    #[test]
    fn decode_writer_one_chunk() {
        let bytes = compressed_str();
        let mut decoder = GzDecoder::new(Vec::new());
        let header_len = decoder.write(&bytes[..]).unwrap();
        decoder.write(&bytes[header_len..]).unwrap();
        decoder.try_finish().unwrap();
        assert_decodes_to_str(decoder);
    }

    #[test]
    fn decode_writer_partial_header() {
        let bytes = compressed_str();
        let mut decoder = GzDecoder::new(Vec::new());
        assert_eq!(decoder.write(&bytes[..5]).unwrap(), 5);
        let consumed = decoder.write(&bytes[5..]).unwrap();
        if consumed < bytes.len() - 5 {
            decoder.write(&bytes[consumed + 5..]).unwrap();
        }
        assert_decodes_to_str(decoder);
    }

    #[test]
    fn decode_writer_exact_header() {
        let bytes = compressed_str();
        let mut decoder = GzDecoder::new(Vec::new());
        assert_eq!(decoder.write(&bytes[..10]).unwrap(), 10);
        decoder.write(&bytes[10..]).unwrap();
        assert_decodes_to_str(decoder);
    }

    #[test]
    fn decode_writer_partial_crc() {
        let bytes = compressed_str();
        let mut decoder = GzDecoder::new(Vec::new());
        let cutoff = bytes.len() - 5;
        let consumed = decoder.write(&bytes[..cutoff]).unwrap();
        decoder.write(&bytes[consumed..]).unwrap();
        assert_decodes_to_str(decoder);
    }
}