Skip to main content

delivery_blob/
compression.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Implementation of chunked-compression library in Rust. Archives can be created by making a new
6//! [`ChunkedArchive`] and serializing/writing it. An archive's header can be verified and seek
7//! table decoded using [`decode_archive`].
8
9use itertools::Itertools;
10use rayon::prelude::*;
11use std::ops::Range;
12use thiserror::Error;
13use zerocopy::byteorder::{LE, U16, U32, U64};
14use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Ref, Unaligned};
15
16mod compression_algorithm;
17pub use compression_algorithm::{
18    CompressionAlgorithm, Compressor, Decompressor, ThreadLocalCompressor, ThreadLocalDecompressor,
19};
20
/// Opaque error code produced by the Zstd library.
///
/// The wrapped `usize` is the raw code; it is interpreted with `zstd_safe::get_error_name` and
/// `ZSTD_getErrorCode` when the error is formatted or converted into another error type.
#[derive(Copy, Clone, Eq, PartialEq)]
pub struct ZstdError(pub usize);
25
26impl std::fmt::Display for ZstdError {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        let msg = zstd::zstd_safe::get_error_name(self.0);
29        let enum_code = unsafe { zstd::zstd_safe::zstd_sys::ZSTD_getErrorCode(self.0) };
30        write!(f, "{:?} ({})", enum_code, msg)
31    }
32}
33
34impl std::fmt::Debug for ZstdError {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        std::fmt::Display::fmt(self, f)
37    }
38}
39
#[cfg(target_os = "fuchsia")]
impl From<ZstdError> for zx::Status {
    /// Translates a Zstd error code into a `zx::Status`.
    ///
    /// The match lists every `ZSTD_ErrorCode` variant explicitly (no wildcard arm), so a newly
    /// introduced variant produces a compile error here instead of being silently mapped.
    fn from(err: ZstdError) -> Self {
        use zstd::zstd_safe::zstd_sys::ZSTD_ErrorCode::*;
        use zstd::zstd_safe::zstd_sys::ZSTD_getErrorCode;

        let ZstdError(raw_code) = err;
        // SAFETY: the FFI call only receives the error code by value; no pointers are passed.
        let error_code = unsafe { ZSTD_getErrorCode(raw_code) };
        match error_code {
            // The compressed data itself is bad.
            ZSTD_error_corruption_detected
            | ZSTD_error_checksum_wrong
            | ZSTD_error_literals_headerWrong
            | ZSTD_error_dictionary_corrupted
            | ZSTD_error_prefix_unknown => Self::IO_DATA_INTEGRITY,

            // The data uses a feature that is not supported.
            ZSTD_error_version_unsupported
            | ZSTD_error_frameParameter_unsupported
            | ZSTD_error_parameter_unsupported => Self::NOT_SUPPORTED,

            // Bad parameters or buffer sizes.
            ZSTD_error_parameter_outOfBound
            | ZSTD_error_srcSize_wrong
            | ZSTD_error_dstSize_tooSmall => Self::INVALID_ARGS,

            // Everything else is reported as an internal error.
            ZSTD_error_no_error
            | ZSTD_error_GENERIC
            | ZSTD_error_frameParameter_windowTooLarge
            | ZSTD_error_dictionary_wrong
            | ZSTD_error_dictionaryCreation_failed
            | ZSTD_error_parameter_combination_unsupported
            | ZSTD_error_tableLog_tooLarge
            | ZSTD_error_maxSymbolValue_tooLarge
            | ZSTD_error_maxSymbolValue_tooSmall
            | ZSTD_error_stabilityCondition_notRespected
            | ZSTD_error_stage_wrong
            | ZSTD_error_init_missing
            | ZSTD_error_memory_allocation
            | ZSTD_error_workSpace_tooSmall
            | ZSTD_error_dstBuffer_null
            | ZSTD_error_noForwardProgress_destFull
            | ZSTD_error_noForwardProgress_inputEmpty
            | ZSTD_error_frameIndex_tooLarge
            | ZSTD_error_seekableIO
            | ZSTD_error_dstBuffer_wrong
            | ZSTD_error_srcBuffer_wrong
            | ZSTD_error_sequenceProducer_failed
            | ZSTD_error_externalSequences_invalid
            | ZSTD_error_maxCode => Self::INTERNAL,
        }
    }
}
87
88#[derive(Debug, Error)]
89pub enum FormatError {
90    #[error("Zstd error: {0}")]
91    Zstd(ZstdError),
92    #[error("LZ4 error: {0}")]
93    Lz4(lz4::Error),
94}
95
#[cfg(target_os = "fuchsia")]
impl From<&FormatError> for zx::Status {
    /// Zstd errors are translated according to their error code; every LZ4 error is reported as
    /// `IO_DATA_INTEGRITY`.
    fn from(err: &FormatError) -> Self {
        match err {
            FormatError::Lz4(_) => Self::IO_DATA_INTEGRITY,
            FormatError::Zstd(zstd_error) => Self::from(*zstd_error),
        }
    }
}
105
106// *NOTE*: Use caution when using the `#[source]` attribute or naming fields `source`. Some callers
107// attempt to downcast library errors into the concrete type of the root cause.
108// See https://docs.rs/thiserror/latest/thiserror/ for more information.
109#[derive(Debug, Error)]
110pub enum ChunkedArchiveError {
111    #[error("Invalid or unsupported archive version.")]
112    InvalidVersion,
113
114    #[error("Archive header has incorrect magic.")]
115    BadMagic,
116
117    #[error("Integrity checks failed (e.g. incorrect CRC, inconsistent header fields).")]
118    IntegrityError,
119
120    #[error("Value is out of range or cannot be represented in specified type.")]
121    OutOfRange,
122
123    #[error("Error decompressing chunk {index}: {error}")]
124    DecompressionError { index: usize, error: FormatError },
125
126    #[error("Error compressing chunk {index}: {error}")]
127    CompressionError { index: usize, error: FormatError },
128}
129
130/// Options for constructing a chunked archive.
131#[derive(Copy, Clone, Debug, Eq, PartialEq)]
132pub enum ChunkedArchiveOptions {
133    /// A chunked-compression V2 archive will be created.
134    V2 {
135        /// Chunked-compression V2 has a limit of 1023 chunks. If splitting the data up into
136        /// `minimum_chunk_size`d chunks would exceed this limit then the chunk size increased by
137        /// `chunk_alignment` until fewer than 1024 are required. `minimum_chunk_size` must be a
138        /// multiple of `chunk_alignment`.
139        minimum_chunk_size: usize,
140        /// The chosen uncompressed chunk size must always be a multiple of this value.
141        chunk_alignment: usize,
142        /// The Zstd compression level to use when compressing chunks.
143        compression_level: i32,
144    },
145    /// A chunked-compression V3 archive will be created.
146    V3 {
147        /// The compression algorithm to use to compress the chunks.
148        compression_algorithm: CompressionAlgorithm,
149    },
150}
151
152impl ChunkedArchiveOptions {
153    const V2_VERSION: u16 = 2;
154    const V2_MAX_CHUNKS: usize = 1023;
155
156    const V3_VERSION: u16 = 3;
157    const V3_MAX_CHUNKS: usize = u32::MAX as usize;
158    const V3_CHUNK_SIZE: usize = 32 * 1024;
159    const V3_ZSTD_COMPRESSION_LEVEL: i32 = 22;
160
161    /// Which version of chunked-compression archive should be constructed.
162    fn version(&self) -> u16 {
163        match self {
164            Self::V2 { .. } => Self::V2_VERSION,
165            Self::V3 { .. } => Self::V3_VERSION,
166        }
167    }
168
169    /// The compression algorithm to use to compress the chunks.
170    fn compression_algorithm(&self) -> CompressionAlgorithm {
171        match self {
172            Self::V2 { .. } => CompressionAlgorithm::Zstd,
173            Self::V3 { compression_algorithm } => *compression_algorithm,
174        }
175    }
176
177    /// Calculate how large chunks must be for a given amount of data.
178    fn chunk_size_for(&self, data_size: usize) -> usize {
179        match self {
180            Self::V2 { chunk_alignment, minimum_chunk_size: target_chunk_size, .. } => {
181                if data_size <= (Self::V2_MAX_CHUNKS * target_chunk_size) {
182                    *target_chunk_size
183                } else {
184                    let chunk_size = data_size.div_ceil(Self::V2_MAX_CHUNKS);
185                    chunk_size.checked_next_multiple_of(*chunk_alignment).unwrap()
186                }
187            }
188            Self::V3 { .. } => {
189                assert!(
190                    data_size.div_ceil(Self::V3_CHUNK_SIZE) <= Self::V3_MAX_CHUNKS,
191                    "Chunked-compression V3 only supports data up to ~140TB"
192                );
193                Self::V3_CHUNK_SIZE
194            }
195        }
196    }
197
198    /// Constructs a compressor to compress chunks based on the specified options.
199    pub fn compressor(&self) -> Compressor {
200        match self {
201            Self::V2 { compression_level, .. } => {
202                let mut cctx = zstd::zstd_safe::CCtx::create();
203                cctx.set_parameter(zstd::zstd_safe::CParameter::CompressionLevel(
204                    *compression_level,
205                ))
206                .expect("setting the compression level should never fail");
207                Compressor::Zstd(cctx)
208            }
209            Self::V3 { compression_algorithm: CompressionAlgorithm::Zstd } => {
210                let mut cctx = zstd::zstd_safe::CCtx::create();
211                cctx.set_parameter(zstd::zstd_safe::CParameter::CompressionLevel(
212                    Self::V3_ZSTD_COMPRESSION_LEVEL,
213                ))
214                .expect("setting the compression level should never fail");
215                Compressor::Zstd(cctx)
216            }
217            Self::V3 { compression_algorithm: CompressionAlgorithm::Lz4 } => {
218                Compressor::Lz4 { compression_level: lz4::HcCompressionLevel::custom(12) }
219            }
220        }
221    }
222
223    /// Constructs a compressor object that uses a thread local compressor to compress chunks based
224    /// on the specified options.
225    pub fn thread_local_compressor(&self) -> ThreadLocalCompressor {
226        match self {
227            Self::V2 { compression_level, .. } => {
228                ThreadLocalCompressor::Zstd { compression_level: *compression_level }
229            }
230            Self::V3 { compression_algorithm: CompressionAlgorithm::Zstd } => {
231                ThreadLocalCompressor::Zstd { compression_level: Self::V3_ZSTD_COMPRESSION_LEVEL }
232            }
233            Self::V3 { compression_algorithm: CompressionAlgorithm::Lz4 } => {
234                ThreadLocalCompressor::Lz4 {
235                    compression_level: lz4::HcCompressionLevel::custom(12),
236                }
237            }
238        }
239    }
240
241    /// Returns true if `version` is a valid chunked-compression version.
242    fn is_valid_version(version: u16) -> bool {
243        match version {
244            Self::V2_VERSION => true,
245            Self::V3_VERSION => true,
246            _ => false,
247        }
248    }
249
250    /// Returns the maximum number of chunks supported by the chunked-compression format at the
251    /// specified version.
252    fn max_chunks_for_version(version: u16) -> Result<usize, ChunkedArchiveError> {
253        match version {
254            Self::V2_VERSION => Ok(Self::V2_MAX_CHUNKS),
255            Self::V3_VERSION => Ok(Self::V3_MAX_CHUNKS),
256            _ => Err(ChunkedArchiveError::InvalidVersion),
257        }
258    }
259}
260
/// Location of a single validated chunk within an archive.
///
/// Compressed ranges are relative to the start of compressed data (i.e. they start after the
/// header and seek table).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChunkInfo {
    /// Byte range this chunk occupies in the decompressed data.
    pub decompressed_range: Range<usize>,
    /// Byte range this chunk occupies in the compressed data.
    pub compressed_range: Range<usize>,
}
268
269impl ChunkInfo {
270    fn from_entry(
271        entry: &SeekTableEntry,
272        header_length: usize,
273    ) -> Result<Self, ChunkedArchiveError> {
274        let decompressed_start = entry.decompressed_offset.get() as usize;
275        let decompressed_size = entry.decompressed_size.get() as usize;
276        let decompressed_range = decompressed_start
277            ..decompressed_start
278                .checked_add(decompressed_size)
279                .ok_or(ChunkedArchiveError::OutOfRange)?;
280
281        let compressed_offset = entry.compressed_offset.get() as usize;
282        let compressed_start = compressed_offset
283            .checked_sub(header_length)
284            .ok_or(ChunkedArchiveError::IntegrityError)?;
285        let compressed_size = entry.compressed_size.get() as usize;
286        let compressed_range = compressed_start
287            ..compressed_start
288                .checked_add(compressed_size)
289                .ok_or(ChunkedArchiveError::OutOfRange)?;
290
291        Ok(Self { decompressed_range, compressed_range })
292    }
293}
294
295/// Validated information from decoding an archive.
296#[derive(Debug)]
297pub struct DecodedArchive {
298    compression_algorithm: CompressionAlgorithm,
299    seek_table: Vec<ChunkInfo>,
300}
301
302impl DecodedArchive {
303    /// The total size of decompressing all of the chunks in the archive.
304    pub fn decompressed_size(&self) -> usize {
305        self.seek_table.last().map_or(0, |entry| entry.decompressed_range.end)
306    }
307}
308
309/// Decodes a chunked archive header. Returns a `DecodedArchive` and any remaining bytes that are
310/// part of the chunk data. Returns `Ok(None)` if `data` is not large enough to decode the archive
311/// header & seek table.
312pub fn decode_archive(
313    data: &[u8],
314    archive_length: usize,
315) -> Result<Option<(DecodedArchive, /*archive_data*/ &[u8])>, ChunkedArchiveError> {
316    match Ref::<_, ChunkedArchiveHeader>::from_prefix(data).map_err(Into::into) {
317        Ok((header, data)) => header.decode_archive(data, archive_length as u64),
318        Err(zerocopy::SizeError { .. }) => Ok(None), // Not enough data.
319    }
320}
321
322/// Chunked archive header.
323#[derive(IntoBytes, KnownLayout, FromBytes, Immutable, Unaligned, Clone, Copy, Debug)]
324#[repr(C)]
325struct ChunkedArchiveHeader {
326    magic: [u8; 8],
327    version: U16<LE>,
328    // This field was added in V3 and should not be used if `version` is 2. Technically, this field
329    // should be 0 in V2, Zstd has the value 0, and V2 always uses Zstd so accessing this field in
330    // V2 should give the correct result.
331    compression_algorithm: u8,
332    reserved_0: u8,
333    num_entries: U32<LE>,
334    checksum: U32<LE>,
335    reserved_1: U32<LE>,
336    reserved_2: U64<LE>,
337}
338
339/// Chunked archive seek table entry.
340#[derive(IntoBytes, KnownLayout, FromBytes, Immutable, Unaligned, Clone, Copy, Debug)]
341#[repr(C)]
342struct SeekTableEntry {
343    decompressed_offset: U64<LE>,
344    decompressed_size: U64<LE>,
345    compressed_offset: U64<LE>,
346    compressed_size: U64<LE>,
347}
348
349impl ChunkedArchiveHeader {
350    const CHUNKED_ARCHIVE_MAGIC: [u8; 8] = [0x46, 0x9b, 0x78, 0xef, 0x0f, 0xd0, 0xb2, 0x03];
351    const CHUNKED_ARCHIVE_CHECKSUM_OFFSET: usize = 16;
352
353    fn new(
354        seek_table: &[SeekTableEntry],
355        options: ChunkedArchiveOptions,
356    ) -> Result<Self, ChunkedArchiveError> {
357        let header: ChunkedArchiveHeader = Self {
358            magic: Self::CHUNKED_ARCHIVE_MAGIC,
359            version: options.version().into(),
360            compression_algorithm: options.compression_algorithm().into(),
361            reserved_0: 0.into(),
362            num_entries: TryInto::<u32>::try_into(seek_table.len())
363                .or(Err(ChunkedArchiveError::OutOfRange))?
364                .into(),
365            checksum: 0.into(), // `checksum` is calculated below.
366            reserved_1: 0.into(),
367            reserved_2: 0.into(),
368        };
369        Ok(Self { checksum: header.checksum(seek_table).into(), ..header })
370    }
371
372    /// Calculate the checksum of the header + all seek table entries.
373    fn checksum(&self, entries: &[SeekTableEntry]) -> u32 {
374        let crc_algo = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
375        let mut digest = crc_algo.digest();
376        digest.update(&self.as_bytes()[..Self::CHUNKED_ARCHIVE_CHECKSUM_OFFSET]);
377        digest.update(
378            &self.as_bytes()
379                [Self::CHUNKED_ARCHIVE_CHECKSUM_OFFSET + self.checksum.as_bytes().len()..],
380        );
381        digest.update(entries.as_bytes());
382        digest.finalize()
383    }
384
385    /// Calculate the total header length of an archive *including* all seek table entries.
386    fn header_length(num_entries: usize) -> usize {
387        std::mem::size_of::<ChunkedArchiveHeader>()
388            + (std::mem::size_of::<SeekTableEntry>() * num_entries)
389    }
390
391    /// Validates the archive header and decodes the seek table.
392    fn decode_archive(
393        self,
394        data: &[u8],
395        archive_length: u64,
396    ) -> Result<Option<(DecodedArchive, /*chunk_data*/ &[u8])>, ChunkedArchiveError> {
397        // Deserialize seek table.
398        let num_entries = self.num_entries.get() as usize;
399        let Ok((entries, chunk_data)) =
400            Ref::<_, [SeekTableEntry]>::from_prefix_with_elems(data, num_entries)
401        else {
402            return Ok(None);
403        };
404        let entries: &[SeekTableEntry] = Ref::into_ref(entries);
405
406        // Validate archive header.
407        if self.magic != Self::CHUNKED_ARCHIVE_MAGIC {
408            return Err(ChunkedArchiveError::BadMagic);
409        }
410        let version = self.version.get();
411        if !ChunkedArchiveOptions::is_valid_version(version) {
412            return Err(ChunkedArchiveError::InvalidVersion);
413        }
414        if self.checksum.get() != self.checksum(entries) {
415            return Err(ChunkedArchiveError::IntegrityError);
416        }
417        if entries.len() > ChunkedArchiveOptions::max_chunks_for_version(version)? {
418            return Err(ChunkedArchiveError::IntegrityError);
419        }
420        let compression_algorithm = CompressionAlgorithm::try_from(self.compression_algorithm)?;
421
422        // Validate seek table using invariants I0 through I5.
423
424        // I0: The first seek table entry, if any, must have decompressed offset 0.
425        if !entries.is_empty() && entries[0].decompressed_offset.get() != 0 {
426            return Err(ChunkedArchiveError::IntegrityError);
427        }
428
429        // I1: The compressed offsets of all seek table entries must not overlap with the header.
430        let header_length = Self::header_length(entries.len());
431        if entries.iter().any(|entry| entry.compressed_offset.get() < header_length as u64) {
432            return Err(ChunkedArchiveError::IntegrityError);
433        }
434
435        // I2: Each entry's decompressed offset must be equal to the end of the previous frame
436        //     (i.e. to the previous frame's decompressed offset + length).
437        for (prev, curr) in entries.iter().tuple_windows() {
438            if (prev.decompressed_offset.get() + prev.decompressed_size.get())
439                != curr.decompressed_offset.get()
440            {
441                return Err(ChunkedArchiveError::IntegrityError);
442            }
443        }
444
445        // I3: Each entry's compressed offset must be greater than or equal to the end of the
446        //     previous frame (i.e. to the previous frame's compressed offset + length).
447        for (prev, curr) in entries.iter().tuple_windows() {
448            if (prev.compressed_offset.get() + prev.compressed_size.get())
449                > curr.compressed_offset.get()
450            {
451                return Err(ChunkedArchiveError::IntegrityError);
452            }
453        }
454
455        // I4: Each entry must have a non-zero decompressed and compressed length.
456        for entry in entries.iter() {
457            if entry.decompressed_size.get() == 0 || entry.compressed_size.get() == 0 {
458                return Err(ChunkedArchiveError::IntegrityError);
459            }
460        }
461
462        // I5: Data referenced by each entry must fit within the specified file size.
463        for entry in entries.iter() {
464            let compressed_end = entry.compressed_offset.get() + entry.compressed_size.get();
465            if compressed_end > archive_length {
466                return Err(ChunkedArchiveError::IntegrityError);
467            }
468        }
469
470        let seek_table = entries
471            .iter()
472            .map(|entry| ChunkInfo::from_entry(entry, header_length))
473            .try_collect()?;
474        Ok(Some((DecodedArchive { seek_table, compression_algorithm }, chunk_data)))
475    }
476}
477
/// A single chunk of an archive, held in memory in compressed form.
pub struct CompressedChunk {
    /// The compressed bytes of this chunk.
    pub compressed_data: Vec<u8>,
    /// Number of bytes this chunk occupies once decompressed.
    pub decompressed_size: usize,
}
485
486/// In-memory representation of a compressed chunked archive.
487pub struct ChunkedArchive {
488    /// Chunks this archive contains, in order. Right now we only allow creating archives with
489    /// contiguous compressed and decompressed space.
490    chunks: Vec<CompressedChunk>,
491    /// Size used to chunk input when creating this archive. Last chunk may be smaller than this
492    /// amount.
493    chunk_size: usize,
494    /// The options used to construct this archive.
495    options: ChunkedArchiveOptions,
496}
497
498impl ChunkedArchive {
499    /// Create a ChunkedArchive for `data` compressing each chunk in parallel. This function uses
500    /// the `rayon` crate for parallelism. By default compression happens in the global thread pool,
501    /// but this function can also be executed within a locally scoped pool.
502    pub fn new(data: &[u8], options: ChunkedArchiveOptions) -> Result<Self, ChunkedArchiveError> {
503        let chunk_size = options.chunk_size_for(data.len());
504        let mut chunks: Vec<Result<CompressedChunk, ChunkedArchiveError>> = vec![];
505        let compressor = options.thread_local_compressor();
506        data.par_chunks(chunk_size)
507            .enumerate()
508            .map(|(index, chunk)| {
509                let compressed_data = compressor.compress(chunk, index)?;
510                Ok(CompressedChunk { compressed_data, decompressed_size: chunk.len() })
511            })
512            .collect_into_vec(&mut chunks);
513        let chunks: Vec<_> = chunks.into_iter().try_collect()?;
514        Ok(ChunkedArchive { chunks, chunk_size, options })
515    }
516
517    /// Accessor for compressed chunk data.
518    pub fn chunks(&self) -> &Vec<CompressedChunk> {
519        &self.chunks
520    }
521
522    /// The chunk size calculated for this archive during compression. Represents how input data
523    /// was chunked for compression. Note that the final chunk may be smaller than this amount
524    /// when decompressed.
525    pub fn chunk_size(&self) -> usize {
526        self.chunk_size
527    }
528
529    /// Sum of sizes of all compressed chunks.
530    pub fn compressed_data_size(&self) -> usize {
531        self.chunks.iter().map(|chunk| chunk.compressed_data.len()).sum()
532    }
533
534    /// Total size of the archive in bytes.
535    pub fn serialized_size(&self) -> usize {
536        ChunkedArchiveHeader::header_length(self.chunks.len()) + self.compressed_data_size()
537    }
538
539    /// Write the archive to `writer`.
540    pub fn write(self, mut writer: impl std::io::Write) -> Result<(), std::io::Error> {
541        let seek_table = self.make_seek_table();
542        let header = ChunkedArchiveHeader::new(&seek_table, self.options).unwrap();
543        writer.write_all(header.as_bytes())?;
544        writer.write_all(seek_table.as_slice().as_bytes())?;
545        for chunk in self.chunks {
546            writer.write_all(&chunk.compressed_data)?;
547        }
548        Ok(())
549    }
550
551    /// Create the seek table for this archive.
552    fn make_seek_table(&self) -> Vec<SeekTableEntry> {
553        let header_length = ChunkedArchiveHeader::header_length(self.chunks.len());
554        let mut seek_table = vec![];
555        seek_table.reserve(self.chunks.len());
556        let mut compressed_size: usize = 0;
557        let mut decompressed_offset: usize = 0;
558        for chunk in &self.chunks {
559            seek_table.push(SeekTableEntry {
560                decompressed_offset: (decompressed_offset as u64).into(),
561                decompressed_size: (chunk.decompressed_size as u64).into(),
562                compressed_offset: ((header_length + compressed_size) as u64).into(),
563                compressed_size: (chunk.compressed_data.len() as u64).into(),
564            });
565            compressed_size += chunk.compressed_data.len();
566            decompressed_offset += chunk.decompressed_size;
567        }
568        seek_table
569    }
570}
571
572/// Streaming decompressor for chunked archives. Example:
573/// ```
574/// // Create a chunked archive:
575/// let data: Vec<u8> = vec![3; 1024];
576/// let compressed = ChunkedArchive::new(&data, /*block_size*/ 8192).serialize().unwrap();
577/// // Verify the header + decode the seek table:
578/// let (seek_table, archive_data) = decode_archive(&compressed, compressed.len())?.unwrap();
579/// let mut decompressed: Vec<u8> = vec![];
580/// let mut on_chunk = |data: &[u8]| { decompressed.extend_from_slice(data); };
581/// let mut decompressor = ChunkedDecompressor(seek_table);
582/// // `on_chunk` is invoked as each slice is made available. Archive can be provided as chunks.
583/// decompressor.update(archive_data, &mut on_chunk);
584/// assert_eq!(data.as_slice(), decompressed.as_slice());
585/// ```
586pub struct ChunkedDecompressor {
587    seek_table: Vec<ChunkInfo>,
588    buffer: Vec<u8>,
589    data_written: usize,
590    curr_chunk: usize,
591    total_compressed_size: usize,
592    decompressor: Decompressor,
593    decompressed_buffer: Vec<u8>,
594    error_handler: Option<ErrorHandler>,
595}
596
597type ErrorHandler = Box<dyn Fn(usize, ChunkInfo, &[u8]) -> () + Send + 'static>;
598
599impl ChunkedDecompressor {
600    /// Create a new decompressor to decode an archive from a validated seek table.
601    pub fn new(decoded_archive: DecodedArchive) -> Result<Self, ChunkedArchiveError> {
602        let DecodedArchive { compression_algorithm, seek_table } = decoded_archive;
603        let total_compressed_size =
604            seek_table.last().map_or(0, |last_chunk| last_chunk.compressed_range.end);
605        let decompressed_buffer =
606            vec![0u8; seek_table.first().map_or(0, |c| c.decompressed_range.len())];
607        Ok(Self {
608            seek_table,
609            buffer: vec![],
610            data_written: 0,
611            curr_chunk: 0,
612            total_compressed_size,
613            decompressor: compression_algorithm.decompressor(),
614            decompressed_buffer,
615            error_handler: None,
616        })
617    }
618
619    /// Creates a new decompressor with an additional error handler invoked when a chunk fails to be
620    /// decompressed.
621    pub fn new_with_error_handler(
622        decoded_archive: DecodedArchive,
623        error_handler: ErrorHandler,
624    ) -> Result<Self, ChunkedArchiveError> {
625        Ok(Self { error_handler: Some(error_handler), ..Self::new(decoded_archive)? })
626    }
627
628    pub fn seek_table(&self) -> &Vec<ChunkInfo> {
629        &self.seek_table
630    }
631
632    fn finish_chunk(
633        &mut self,
634        data: &[u8],
635        chunk_callback: &mut impl FnMut(&[u8]) -> (),
636    ) -> Result<(), ChunkedArchiveError> {
637        debug_assert_eq!(data.len(), self.seek_table[self.curr_chunk].compressed_range.len());
638        let chunk = &self.seek_table[self.curr_chunk];
639        let decompressed_size = self
640            .decompressor
641            .decompress_into(data, self.decompressed_buffer.as_mut_slice(), self.curr_chunk)
642            .inspect_err(|_| {
643                if let Some(error_handler) = &self.error_handler {
644                    error_handler(self.curr_chunk, chunk.clone(), data.as_bytes());
645                }
646            })?;
647        if decompressed_size != chunk.decompressed_range.len() {
648            return Err(ChunkedArchiveError::IntegrityError);
649        }
650        chunk_callback(&self.decompressed_buffer[..decompressed_size]);
651        self.curr_chunk += 1;
652        Ok(())
653    }
654
655    /// Update the decompressor with more data.
656    pub fn update(
657        &mut self,
658        mut data: &[u8],
659        chunk_callback: &mut impl FnMut(&[u8]) -> (),
660    ) -> Result<(), ChunkedArchiveError> {
661        // Caller must not provide too much data.
662        if self.data_written + data.len() > self.total_compressed_size {
663            return Err(ChunkedArchiveError::OutOfRange);
664        }
665        self.data_written += data.len();
666
667        // If we had leftover data from a previous read, append until we've filled a chunk.
668        if !self.buffer.is_empty() {
669            let to_read = std::cmp::min(
670                data.len(),
671                self.seek_table[self.curr_chunk]
672                    .compressed_range
673                    .len()
674                    .checked_sub(self.buffer.len())
675                    .unwrap(),
676            );
677            self.buffer.extend_from_slice(&data[..to_read]);
678            if self.buffer.len() == self.seek_table[self.curr_chunk].compressed_range.len() {
679                // Take self.buffer temporarily (so we don't have to split borrows).
680                // That way we don't have to re-commit the pages we've already used in the buffer
681                // for next time.
682                let full_chunk = std::mem::take(&mut self.buffer);
683                self.finish_chunk(&full_chunk[..], chunk_callback)?;
684                self.buffer = full_chunk;
685                // Draining the buffer will set the length to 0 but keep the capacity the same.
686                self.buffer.drain(..);
687            }
688            data = &data[to_read..];
689        }
690
691        // Decode as many full chunks as we can.
692        while !data.is_empty()
693            && self.curr_chunk < self.seek_table.len()
694            && self.seek_table[self.curr_chunk].compressed_range.len() <= data.len()
695        {
696            let len = self.seek_table[self.curr_chunk].compressed_range.len();
697            self.finish_chunk(&data[..len], chunk_callback)?;
698            data = &data[len..];
699        }
700
701        // Buffer the rest for the next call.
702        if !data.is_empty() {
703            debug_assert!(self.curr_chunk < self.seek_table.len());
704            debug_assert!(self.data_written < self.total_compressed_size);
705            self.buffer.extend_from_slice(data);
706        }
707
708        debug_assert!(
709            self.data_written < self.total_compressed_size
710                || self.curr_chunk == self.seek_table.len()
711        );
712
713        Ok(())
714    }
715}
716
#[cfg(test)]
mod tests {
    use crate::Type1Blob;

    use super::*;
    use rand::Rng;
    use std::matches;

    /// Compresses `data` into a chunked archive using `options` and returns the serialized bytes
    /// (everything the archive writes: header, seek table and chunk data).
    #[track_caller]
    fn compress_to_vec(data: &[u8], options: ChunkedArchiveOptions) -> Vec<u8> {
        let mut compressed: Vec<u8> = vec![];
        ChunkedArchive::new(data, options)
            .expect("compress")
            .write(&mut compressed)
            .expect("write archive");
        compressed
    }

    /// Generate a header + seek table for verifying invariants/integrity checks.
    ///
    /// Every entry has a compressed size of 1024 bytes and a decompressed size of 2048 bytes, and
    /// the entries are laid out back to back, with compressed data starting right after the
    /// header. The returned length is the header length plus all compressed chunks.
    fn generate_archive(
        num_entries: usize,
        options: ChunkedArchiveOptions,
    ) -> (ChunkedArchiveHeader, Vec<SeekTableEntry>, /*archive_length*/ u64) {
        const COMPRESSED_CHUNK_SIZE: u64 = 1024;
        const DECOMPRESSED_CHUNK_SIZE: u64 = 2048;
        let header_length = ChunkedArchiveHeader::header_length(num_entries) as u64;
        let seek_table: Vec<SeekTableEntry> = (0..(num_entries as u64))
            .map(|n| SeekTableEntry {
                compressed_offset: (header_length + (n * COMPRESSED_CHUNK_SIZE)).into(),
                compressed_size: COMPRESSED_CHUNK_SIZE.into(),
                decompressed_offset: (n * DECOMPRESSED_CHUNK_SIZE).into(),
                decompressed_size: DECOMPRESSED_CHUNK_SIZE.into(),
            })
            .collect();
        let header = ChunkedArchiveHeader::new(&seek_table, options).unwrap();
        let archive_length: u64 = header_length + (num_entries as u64 * COMPRESSED_CHUNK_SIZE);
        (header, seek_table, archive_length)
    }

    /// Asserts that decoding `seek_table` with `header` fails with
    /// [`ChunkedArchiveError::IntegrityError`].
    #[track_caller]
    fn assert_integrity_error(
        header: ChunkedArchiveHeader,
        seek_table: &[SeekTableEntry],
        archive_length: u64,
    ) {
        assert!(matches!(
            header.decode_archive(seek_table.as_bytes(), archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// Compresses a zero-filled buffer with `options`, checks that the serialized header reports
    /// version 3 and `expected_algorithm`, then decompresses the archive in a single `update` call
    /// and verifies the output matches the input.
    #[track_caller]
    fn check_v3_roundtrip(options: ChunkedArchiveOptions, expected_algorithm: u8) {
        let data: Vec<u8> = vec![0; 3_000_000];
        let compressed = compress_to_vec(&data, options);

        // Verify header.
        let (header, _) =
            Ref::<_, ChunkedArchiveHeader>::from_prefix(compressed.as_slice()).unwrap();
        assert_eq!(header.version.get(), 3);
        assert_eq!(header.compression_algorithm, expected_algorithm);

        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        // Decompress.
        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        let mut decompressed: Vec<u8> = vec![];
        let mut chunk_callback = |chunk: &[u8]| decompressed.extend_from_slice(chunk);
        decompressor.update(chunk_data, &mut chunk_callback).unwrap();

        assert_eq!(decompressed, data);
    }

    /// Create a compressed archive and ensure we can decode it as a valid archive that passes all
    /// required integrity checks.
    #[test]
    fn compress_simple() {
        let data: Vec<u8> = vec![0; 32 * 1024 * 16];
        let compressed = compress_to_vec(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        // This data is highly compressible, so the result must be strictly smaller than the
        // original.
        assert!(compressed.len() < data.len());
        // We should be able to decode and verify the archive's integrity in-place.
        assert!(decode_archive(&compressed, compressed.len()).unwrap().is_some());
    }

    /// An unmodified generated header and seek table must decode successfully.
    #[test]
    fn should_validate_self() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(header.decode_archive(serialized_table, archive_length).unwrap().is_some());
    }

    /// A header describing zero chunks must decode successfully against an empty seek table.
    #[test]
    fn should_validate_empty() {
        let (header, _, archive_length) = generate_archive(0, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(header.decode_archive(&[], archive_length).unwrap().is_some());
    }

    /// Flipping the bits of the first magic byte must be reported as `BadMagic`.
    #[test]
    fn should_detect_bad_magic() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let mut corrupt_magic = ChunkedArchiveHeader::CHUNKED_ARCHIVE_MAGIC;
        corrupt_magic[0] = !corrupt_magic[0];
        let bad_magic = ChunkedArchiveHeader { magic: corrupt_magic, ..header };
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            bad_magic.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::BadMagic
        ));
    }

    /// A header whose version field is `u16::MAX` must be reported as `InvalidVersion`.
    #[test]
    fn should_detect_wrong_version() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let invalid_version = ChunkedArchiveHeader { version: u16::MAX.into(), ..header };
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            invalid_version.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::InvalidVersion
        ));
    }

    /// Inverting every bit of the stored checksum must be reported as an integrity error.
    #[test]
    fn should_detect_corrupt_checksum() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let corrupt_checksum =
            ChunkedArchiveHeader { checksum: (!header.checksum.get()).into(), ..header };
        assert_integrity_error(corrupt_checksum, &seek_table, archive_length);
    }

    /// A seek table with one entry more than `V2_MAX_CHUNKS` must be rejected when decoding.
    #[test]
    fn should_reject_too_many_entries_v2() {
        let (too_many_entries, seek_table, archive_length) = generate_archive(
            ChunkedArchiveOptions::V2_MAX_CHUNKS + 1,
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS,
        );
        assert_integrity_error(too_many_entries, &seek_table, archive_length);
    }

    /// The first seek table entry must have a decompressed offset of zero.
    #[test]
    fn invariant_i0_first_entry_zero() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert_eq!(seek_table[0].decompressed_offset.get(), 0);
        seek_table[0].decompressed_offset = 1.into();
        assert_integrity_error(header, &seek_table, archive_length);
    }

    /// The first chunk's compressed data must not start inside the header/seek table region.
    #[test]
    fn invariant_i1_no_header_overlap() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let header_end = ChunkedArchiveHeader::header_length(seek_table.len()) as u64;
        assert!(seek_table[0].compressed_offset.get() >= header_end);
        seek_table[0].compressed_offset = (header_end - 1).into();
        assert_integrity_error(header, &seek_table, archive_length);
    }

    /// Moving the second entry's decompressed offset one byte back, so it no longer begins where
    /// the first entry ends, must be rejected.
    #[test]
    fn invariant_i2_decompressed_monotonic() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert_eq!(
            seek_table[0].decompressed_offset.get() + seek_table[0].decompressed_size.get(),
            seek_table[1].decompressed_offset.get()
        );
        seek_table[1].decompressed_offset = (seek_table[1].decompressed_offset.get() - 1).into();
        assert_integrity_error(header, &seek_table, archive_length);
    }

    /// Moving the second entry's compressed offset one byte back, so it overlaps the first
    /// entry's compressed data, must be rejected.
    #[test]
    fn invariant_i3_compressed_monotonic() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(
            (seek_table[0].compressed_offset.get() + seek_table[0].compressed_size.get())
                <= seek_table[1].compressed_offset.get()
        );
        seek_table[1].compressed_offset = (seek_table[1].compressed_offset.get() - 1).into();
        assert_integrity_error(header, &seek_table, archive_length);
    }

    /// An entry with a compressed size of zero must be rejected.
    #[test]
    fn invariant_i4_nonzero_compressed_size() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(seek_table[0].compressed_size.get() > 0);
        seek_table[0].compressed_size = 0.into();
        assert_integrity_error(header, &seek_table, archive_length);
    }

    /// An entry with a decompressed size of zero must be rejected.
    #[test]
    fn invariant_i4_nonzero_decompressed_size() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(seek_table[0].decompressed_size.get() > 0);
        seek_table[0].decompressed_size = 0.into();
        assert_integrity_error(header, &seek_table, archive_length);
    }

    /// Moving the last entry's compressed offset past the end of the archive must be rejected.
    #[test]
    fn invariant_i5_within_archive() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let last_entry = seek_table.last_mut().unwrap();
        assert!(
            (last_entry.compressed_offset.get() + last_entry.compressed_size.get())
                <= archive_length
        );
        last_entry.compressed_offset = (archive_length + 1).into();
        assert_integrity_error(header, &seek_table, archive_length);
    }

    /// `chunk_size_for` must return the minimum chunk size for an input of exactly
    /// `minimum_chunk_size * V2_MAX_CHUNKS` bytes, and grow by one alignment step for an input one
    /// byte larger.
    #[test]
    fn max_chunks() {
        let ChunkedArchiveOptions::V2 { minimum_chunk_size, chunk_alignment, .. } =
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS
        else {
            panic!("expected Type1Blob::CHUNKED_ARCHIVE_OPTIONS to be V2")
        };
        assert_eq!(
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS
                .chunk_size_for(minimum_chunk_size * ChunkedArchiveOptions::V2_MAX_CHUNKS),
            minimum_chunk_size
        );
        assert_eq!(
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS
                .chunk_size_for(minimum_chunk_size * ChunkedArchiveOptions::V2_MAX_CHUNKS + 1),
            minimum_chunk_size + chunk_alignment
        );
    }

    /// An archive built from empty input has no seek table entries, and streaming its (possibly
    /// empty) chunk data through the decompressor must never invoke the chunk callback.
    #[test]
    fn test_decompressor_empty_archive() {
        let compressed = compress_to_vec(&[], Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();
        assert!(decoded_archive.seek_table.is_empty());
        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        let mut chunk_callback = |_chunk: &[u8]| panic!("Archive doesn't have any chunks.");
        // Stream data into the decompressor in small chunks to exhaust more edge cases.
        for piece in chunk_data.chunks(4) {
            decompressor.update(piece, &mut chunk_callback).unwrap();
        }
    }

    /// Round-trips random data through a multi-chunk archive, feeding the decompressor four bytes
    /// at a time, and checks every decompressed chunk, the chunk count and the total length.
    #[test]
    fn test_decompressor() {
        const UNCOMPRESSED_LENGTH: usize = 3_000_000;
        let data: Vec<u8> = {
            let range = rand::distr::Uniform::<u8>::new_inclusive(0, 255).unwrap();
            rand::rng().sample_iter(&range).take(UNCOMPRESSED_LENGTH).collect()
        };
        let compressed = compress_to_vec(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        // Make sure we have multiple chunks for this test.
        let num_chunks = decoded_archive.seek_table.len();
        assert!(num_chunks > 1);

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();

        let mut decoded_chunks: usize = 0;
        let mut decompressed_offset: usize = 0;
        let mut chunk_callback = |decompressed_chunk: &[u8]| {
            let chunk_end = decompressed_offset + decompressed_chunk.len();
            // Plain `assert!` rather than `assert_eq!` so a mismatch does not dump whole chunks.
            assert!(decompressed_chunk == &data[decompressed_offset..chunk_end]);
            decompressed_offset = chunk_end;
            decoded_chunks += 1;
        };

        // Stream data into the decompressor in small chunks to exhaust more edge cases.
        for piece in chunk_data.chunks(4) {
            decompressor.update(piece, &mut chunk_callback).unwrap();
        }
        assert_eq!(decoded_chunks, num_chunks);
        // Every byte of the input must have been produced, not just a matching prefix.
        assert_eq!(decompressed_offset, UNCOMPRESSED_LENGTH);
    }

    /// Growing the first chunk's expected decompressed range by one byte must make decompression
    /// fail with an integrity error.
    #[test]
    fn test_decompressor_corrupt_decompressed_size() {
        let data: Vec<u8> = vec![0; 3_000_000];
        let compressed = compress_to_vec(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let (mut decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        // Corrupt the decompressed size of the chunk.
        decoded_archive.seek_table[0].decompressed_range.end += 1;

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        assert!(matches!(
            decompressor.update(chunk_data, &mut |_chunk| {}),
            Err(ChunkedArchiveError::IntegrityError)
        ));
    }

    /// Shrinking the first chunk's compressed range by one byte must make decompression fail with
    /// a decompression error.
    ///
    /// When the error handler runs, it must be given chunk index 0, that chunk's (corrupted) info
    /// and exactly `compressed_range.len()` bytes of data. NOTE(review): this test does not
    /// assert that the handler was actually invoked.
    #[test]
    fn test_decompressor_corrupt_compressed_size() {
        let data: Vec<u8> = vec![0; 3_000_000];
        let compressed = compress_to_vec(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let (mut decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        // Corrupt the compressed size of the chunk.
        decoded_archive.seek_table[0].compressed_range.end -= 1;
        let first_chunk_info = decoded_archive.seek_table[0].clone();
        let error_handler = move |chunk_index: usize, chunk_info: ChunkInfo, chunk_data: &[u8]| {
            assert_eq!(chunk_index, 0);
            assert_eq!(chunk_info, first_chunk_info);
            assert_eq!(chunk_data.len(), chunk_info.compressed_range.len());
        };

        let mut decompressor =
            ChunkedDecompressor::new_with_error_handler(decoded_archive, Box::new(error_handler))
                .unwrap();
        assert!(matches!(
            decompressor.update(chunk_data, &mut |_chunk| {}),
            Err(ChunkedArchiveError::DecompressionError { .. })
        ));
    }

    /// Inverting a single byte of the compressed chunk data must surface as a decompression
    /// error.
    #[test]
    fn test_decompressor_zstd_data_corruption() {
        /// Offset (within the chunk data) of the byte that gets inverted.
        const CORRUPT_INDEX: usize = 100;

        let data: Vec<u8> = vec![0; 3_000_000];
        let compressed = compress_to_vec(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        // Fail loudly if there is nothing to corrupt; silently skipping the corruption would make
        // the final assertion fail for a misleading reason.
        assert!(
            chunk_data.len() > CORRUPT_INDEX,
            "compressed chunk data is too short ({} bytes) to corrupt byte {}",
            chunk_data.len(),
            CORRUPT_INDEX
        );
        let mut corrupt_data = chunk_data.to_vec();
        corrupt_data[CORRUPT_INDEX] = !corrupt_data[CORRUPT_INDEX];

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        let result = decompressor.update(&corrupt_data, &mut |_chunk| {});
        assert!(matches!(result, Err(ChunkedArchiveError::DecompressionError { .. })));
    }

    /// A V3 archive compressed with Zstd records the algorithm in its header and round-trips.
    #[test]
    fn test_v3_zstd_roundtrip() {
        check_v3_roundtrip(
            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Zstd },
            CompressionAlgorithm::Zstd as u8,
        );
    }

    /// A V3 archive compressed with LZ4 records the algorithm in its header and round-trips.
    #[test]
    fn test_v3_lz4_roundtrip() {
        check_v3_roundtrip(
            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 },
            CompressionAlgorithm::Lz4 as u8,
        );
    }
}