Skip to main content

delivery_blob/
compression.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Implementation of chunked-compression library in Rust. Archives can be created by making a new
6//! [`ChunkedArchive`] and serializing/writing it. An archive's header can be verified and seek
7//! table decoded using [`decode_archive`].
8
9use itertools::Itertools;
10use rayon::prelude::*;
11use std::ops::Range;
12use thiserror::Error;
13use zerocopy::byteorder::{LE, U16, U32, U64};
14use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Ref, Unaligned};
15
16mod compression_algorithm;
17pub use compression_algorithm::{
18    CompressionAlgorithm, Compressor, Decompressor, ThreadLocalCompressor, ThreadLocalDecompressor,
19};
20
/// A raw result code returned by a zstd library function that zstd reports as an error.
/// Formatting (`Display`/`Debug`) decodes it into zstd's `ZSTD_ErrorCode` and error name.
#[derive(Copy, Clone, Eq, PartialEq)]
pub struct ZstdError(pub usize);
25
26impl std::fmt::Display for ZstdError {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        let msg = zstd::zstd_safe::get_error_name(self.0);
29        let enum_code = unsafe { zstd::zstd_safe::zstd_sys::ZSTD_getErrorCode(self.0) };
30        write!(f, "{:?} ({})", enum_code, msg)
31    }
32}
33
34impl std::fmt::Debug for ZstdError {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        std::fmt::Display::fmt(self, f)
37    }
38}
39
/// Maps a zstd error to the closest `zx::Status`. Corruption-style errors become
/// `IO_DATA_INTEGRITY`, unsupported features become `NOT_SUPPORTED`, and bad parameters or sizes
/// become `INVALID_ARGS`. Every other code (including `ZSTD_error_no_error`) becomes `INTERNAL`.
#[cfg(target_os = "fuchsia")]
impl From<ZstdError> for zx::Status {
    fn from(err: ZstdError) -> Self {
        use zstd::zstd_safe::zstd_sys::ZSTD_ErrorCode::*;
        // SAFETY: `ZSTD_getErrorCode` receives only an integer and dereferences no pointers.
        // NOTE(review): soundness also relies on zstd only returning codes that exist in the
        // bound `ZSTD_ErrorCode` enum.
        let code = unsafe { zstd::zstd_safe::zstd_sys::ZSTD_getErrorCode(err.0) };
        // There is deliberately no `_` arm: a code added to the bindings will fail to compile
        // here until it is mapped.
        match code {
            ZSTD_error_corruption_detected
            | ZSTD_error_checksum_wrong
            | ZSTD_error_literals_headerWrong
            | ZSTD_error_dictionary_corrupted
            | ZSTD_error_prefix_unknown => zx::Status::IO_DATA_INTEGRITY,

            ZSTD_error_version_unsupported
            | ZSTD_error_frameParameter_unsupported
            | ZSTD_error_parameter_unsupported => zx::Status::NOT_SUPPORTED,

            ZSTD_error_parameter_outOfBound
            | ZSTD_error_srcSize_wrong
            | ZSTD_error_dstSize_tooSmall => zx::Status::INVALID_ARGS,

            ZSTD_error_no_error
            | ZSTD_error_GENERIC
            | ZSTD_error_frameParameter_windowTooLarge
            | ZSTD_error_dictionary_wrong
            | ZSTD_error_dictionaryCreation_failed
            | ZSTD_error_parameter_combination_unsupported
            | ZSTD_error_tableLog_tooLarge
            | ZSTD_error_maxSymbolValue_tooLarge
            | ZSTD_error_maxSymbolValue_tooSmall
            | ZSTD_error_stabilityCondition_notRespected
            | ZSTD_error_stage_wrong
            | ZSTD_error_init_missing
            | ZSTD_error_memory_allocation
            | ZSTD_error_workSpace_tooSmall
            | ZSTD_error_dstBuffer_null
            | ZSTD_error_noForwardProgress_destFull
            | ZSTD_error_noForwardProgress_inputEmpty
            | ZSTD_error_frameIndex_tooLarge
            | ZSTD_error_seekableIO
            | ZSTD_error_dstBuffer_wrong
            | ZSTD_error_srcBuffer_wrong
            | ZSTD_error_sequenceProducer_failed
            | ZSTD_error_externalSequences_invalid
            | ZSTD_error_cannotProduce_uncompressedBlock
            | ZSTD_error_maxCode => zx::Status::INTERNAL,
        }
    }
}
88
/// Error reported by the underlying compression library while compressing or decompressing a
/// chunk. It is carried inside [`ChunkedArchiveError::CompressionError`] and
/// [`ChunkedArchiveError::DecompressionError`].
#[derive(Debug, Error)]
pub enum FormatError {
    /// An error returned by zstd.
    #[error("Zstd error: {0}")]
    Zstd(ZstdError),
    /// An error returned by LZ4.
    #[error("LZ4 error: {0}")]
    Lz4(lz4::Error),
}
96
#[cfg(target_os = "fuchsia")]
impl From<&FormatError> for zx::Status {
    /// Converts a compression-library error into a `zx::Status`. Zstd errors are mapped by their
    /// error code; every LZ4 error is reported as `IO_DATA_INTEGRITY`.
    fn from(err: &FormatError) -> Self {
        match *err {
            FormatError::Lz4(_) => zx::Status::IO_DATA_INTEGRITY,
            FormatError::Zstd(zstd_error) => zstd_error.into(),
        }
    }
}
106
// *NOTE*: Use caution when using the `#[source]` attribute or naming fields `source`. Some callers
// attempt to downcast library errors into the concrete type of the root cause.
// See https://docs.rs/thiserror/latest/thiserror/ for more information.
/// Errors produced while creating, decoding, or decompressing a chunked archive.
#[derive(Debug, Error)]
pub enum ChunkedArchiveError {
    /// The header's version is not one this library understands (currently 2 or 3).
    #[error("Invalid or unsupported archive version.")]
    InvalidVersion,

    /// The header does not start with the chunked-archive magic bytes.
    #[error("Archive header has incorrect magic.")]
    BadMagic,

    /// Header checksum mismatch, a violated seek-table invariant, or a chunk that decompressed
    /// to a size different from the one recorded in the seek table.
    #[error("Integrity checks failed (e.g. incorrect CRC, inconsistent header fields).")]
    IntegrityError,

    /// A value did not fit its target type or range (e.g. a range end overflowed, or more data
    /// was passed to the decompressor than the archive describes).
    #[error("Value is out of range or cannot be represented in specified type.")]
    OutOfRange,

    /// Chunk `index` failed to decompress.
    #[error("Error decompressing chunk {index}: {error}")]
    DecompressionError { index: usize, error: FormatError },

    /// Chunk `index` failed to compress.
    #[error("Error compressing chunk {index}: {error}")]
    CompressionError { index: usize, error: FormatError },
}
130
/// Options for constructing a chunked archive.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ChunkedArchiveOptions {
    /// A chunked-compression V2 archive will be created. V2 archives are always compressed with
    /// Zstd.
    V2 {
        /// Chunked-compression V2 has a limit of 1023 chunks. If splitting the data up into
        /// `minimum_chunk_size`d chunks would exceed this limit, then the chunk size is increased
        /// by `chunk_alignment` until fewer than 1024 chunks are required. `minimum_chunk_size`
        /// must be a multiple of `chunk_alignment`.
        minimum_chunk_size: usize,
        /// The chosen uncompressed chunk size must always be a multiple of this value.
        chunk_alignment: usize,
        /// The Zstd compression level to use when compressing chunks.
        compression_level: i32,
    },
    /// A chunked-compression V3 archive will be created. V3 uses a fixed 32 KiB uncompressed
    /// chunk size and allows up to `u32::MAX` chunks.
    V3 {
        /// The compression algorithm to use to compress the chunks.
        compression_algorithm: CompressionAlgorithm,
    },
}
152
153impl ChunkedArchiveOptions {
154    const V2_VERSION: u16 = 2;
155    const V2_MAX_CHUNKS: usize = 1023;
156
157    const V3_VERSION: u16 = 3;
158    const V3_MAX_CHUNKS: usize = u32::MAX as usize;
159    const V3_CHUNK_SIZE: usize = 32 * 1024;
160    const V3_ZSTD_COMPRESSION_LEVEL: i32 = 22;
161
162    /// Which version of chunked-compression archive should be constructed.
163    fn version(&self) -> u16 {
164        match self {
165            Self::V2 { .. } => Self::V2_VERSION,
166            Self::V3 { .. } => Self::V3_VERSION,
167        }
168    }
169
170    /// The compression algorithm to use to compress the chunks.
171    fn compression_algorithm(&self) -> CompressionAlgorithm {
172        match self {
173            Self::V2 { .. } => CompressionAlgorithm::Zstd,
174            Self::V3 { compression_algorithm } => *compression_algorithm,
175        }
176    }
177
178    /// Calculate how large chunks must be for a given amount of data.
179    fn chunk_size_for(&self, data_size: usize) -> usize {
180        match self {
181            Self::V2 { chunk_alignment, minimum_chunk_size: target_chunk_size, .. } => {
182                if data_size <= (Self::V2_MAX_CHUNKS * target_chunk_size) {
183                    *target_chunk_size
184                } else {
185                    let chunk_size = data_size.div_ceil(Self::V2_MAX_CHUNKS);
186                    chunk_size.checked_next_multiple_of(*chunk_alignment).unwrap()
187                }
188            }
189            Self::V3 { .. } => {
190                assert!(
191                    data_size.div_ceil(Self::V3_CHUNK_SIZE) <= Self::V3_MAX_CHUNKS,
192                    "Chunked-compression V3 only supports data up to ~140TB"
193                );
194                Self::V3_CHUNK_SIZE
195            }
196        }
197    }
198
199    /// Constructs a compressor to compress chunks based on the specified options.
200    pub fn compressor(&self) -> Compressor {
201        match self {
202            Self::V2 { compression_level, .. } => {
203                let mut cctx = zstd::zstd_safe::CCtx::create();
204                cctx.set_parameter(zstd::zstd_safe::CParameter::CompressionLevel(
205                    *compression_level,
206                ))
207                .expect("setting the compression level should never fail");
208                Compressor::Zstd(cctx)
209            }
210            Self::V3 { compression_algorithm: CompressionAlgorithm::Zstd } => {
211                let mut cctx = zstd::zstd_safe::CCtx::create();
212                cctx.set_parameter(zstd::zstd_safe::CParameter::CompressionLevel(
213                    Self::V3_ZSTD_COMPRESSION_LEVEL,
214                ))
215                .expect("setting the compression level should never fail");
216                Compressor::Zstd(cctx)
217            }
218            Self::V3 { compression_algorithm: CompressionAlgorithm::Lz4 } => {
219                Compressor::Lz4 { compression_level: lz4::HcCompressionLevel::custom(12) }
220            }
221        }
222    }
223
224    /// Constructs a compressor object that uses a thread local compressor to compress chunks based
225    /// on the specified options.
226    pub fn thread_local_compressor(&self) -> ThreadLocalCompressor {
227        match self {
228            Self::V2 { compression_level, .. } => {
229                ThreadLocalCompressor::Zstd { compression_level: *compression_level }
230            }
231            Self::V3 { compression_algorithm: CompressionAlgorithm::Zstd } => {
232                ThreadLocalCompressor::Zstd { compression_level: Self::V3_ZSTD_COMPRESSION_LEVEL }
233            }
234            Self::V3 { compression_algorithm: CompressionAlgorithm::Lz4 } => {
235                ThreadLocalCompressor::Lz4 {
236                    compression_level: lz4::HcCompressionLevel::custom(12),
237                }
238            }
239        }
240    }
241
242    /// Returns true if `version` is a valid chunked-compression version.
243    fn is_valid_version(version: u16) -> bool {
244        match version {
245            Self::V2_VERSION => true,
246            Self::V3_VERSION => true,
247            _ => false,
248        }
249    }
250
251    /// Returns the maximum number of chunks supported by the chunked-compression format at the
252    /// specified version.
253    fn max_chunks_for_version(version: u16) -> Result<usize, ChunkedArchiveError> {
254        match version {
255            Self::V2_VERSION => Ok(Self::V2_MAX_CHUNKS),
256            Self::V3_VERSION => Ok(Self::V3_MAX_CHUNKS),
257            _ => Err(ChunkedArchiveError::InvalidVersion),
258        }
259    }
260}
261
/// Validated chunk information from an archive. Compressed ranges are relative to the start of
/// compressed data (i.e. they start after the header and seek table).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChunkInfo {
    /// Byte range this chunk occupies in the fully decompressed output.
    pub decompressed_range: Range<usize>,
    /// Byte range of this chunk's compressed bytes, relative to the end of the seek table.
    pub compressed_range: Range<usize>,
}
269
270impl ChunkInfo {
271    fn from_entry(
272        entry: &SeekTableEntry,
273        header_length: usize,
274    ) -> Result<Self, ChunkedArchiveError> {
275        let decompressed_start = entry.decompressed_offset.get() as usize;
276        let decompressed_size = entry.decompressed_size.get() as usize;
277        let decompressed_range = decompressed_start
278            ..decompressed_start
279                .checked_add(decompressed_size)
280                .ok_or(ChunkedArchiveError::OutOfRange)?;
281
282        let compressed_offset = entry.compressed_offset.get() as usize;
283        let compressed_start = compressed_offset
284            .checked_sub(header_length)
285            .ok_or(ChunkedArchiveError::IntegrityError)?;
286        let compressed_size = entry.compressed_size.get() as usize;
287        let compressed_range = compressed_start
288            ..compressed_start
289                .checked_add(compressed_size)
290                .ok_or(ChunkedArchiveError::OutOfRange)?;
291
292        Ok(Self { decompressed_range, compressed_range })
293    }
294}
295
/// Validated information from decoding an archive. Produced by [`decode_archive`] and consumed
/// by [`ChunkedDecompressor::new`].
#[derive(Debug)]
pub struct DecodedArchive {
    /// Algorithm used to compress every chunk, as recorded in the archive header.
    compression_algorithm: CompressionAlgorithm,
    /// Validated seek table, one entry per chunk in archive order.
    seek_table: Vec<ChunkInfo>,
}
302
303impl DecodedArchive {
304    /// The total size of decompressing all of the chunks in the archive.
305    pub fn decompressed_size(&self) -> usize {
306        self.seek_table.last().map_or(0, |entry| entry.decompressed_range.end)
307    }
308}
309
310/// Decodes a chunked archive header. Returns a `DecodedArchive` and any remaining bytes that are
311/// part of the chunk data. Returns `Ok(None)` if `data` is not large enough to decode the archive
312/// header & seek table.
313pub fn decode_archive(
314    data: &[u8],
315    archive_length: usize,
316) -> Result<Option<(DecodedArchive, /*archive_data*/ &[u8])>, ChunkedArchiveError> {
317    match Ref::<_, ChunkedArchiveHeader>::from_prefix(data).map_err(Into::into) {
318        Ok((header, data)) => header.decode_archive(data, archive_length as u64),
319        Err(zerocopy::SizeError { .. }) => Ok(None), // Not enough data.
320    }
321}
322
/// Chunked archive header, in its on-disk layout (`repr(C)`, little-endian fields, no
/// alignment). In a serialized archive it is immediately followed by `num_entries`
/// [`SeekTableEntry`] values.
#[derive(IntoBytes, KnownLayout, FromBytes, Immutable, Unaligned, Clone, Copy, Debug)]
#[repr(C)]
struct ChunkedArchiveHeader {
    /// Must equal `ChunkedArchiveHeader::CHUNKED_ARCHIVE_MAGIC`.
    magic: [u8; 8],
    /// Format version; 2 and 3 are accepted when decoding.
    version: U16<LE>,
    // This field was added in V3 and should not be used if `version` is 2. Technically, this field
    // should be 0 in V2, Zstd has the value 0, and V2 always uses Zstd so accessing this field in
    // V2 should give the correct result.
    compression_algorithm: u8,
    /// Written as zero; not checked when decoding.
    reserved_0: u8,
    /// Number of seek table entries that follow the header.
    num_entries: U32<LE>,
    /// CRC-32 (ISO-HDLC) over the header (excluding this field) and the seek table.
    checksum: U32<LE>,
    /// Written as zero; not checked when decoding.
    reserved_1: U32<LE>,
    /// Written as zero; not checked when decoding.
    reserved_2: U64<LE>,
}
339
/// Chunked archive seek table entry, in its on-disk layout, describing one chunk. All values are
/// byte counts. `compressed_offset` is measured from the start of the archive, so it includes the
/// header and seek table.
#[derive(IntoBytes, KnownLayout, FromBytes, Immutable, Unaligned, Clone, Copy, Debug)]
#[repr(C)]
struct SeekTableEntry {
    /// Offset of this chunk within the decompressed output.
    decompressed_offset: U64<LE>,
    /// Size of this chunk once decompressed.
    decompressed_size: U64<LE>,
    /// Offset of this chunk's compressed bytes from the start of the archive.
    compressed_offset: U64<LE>,
    /// Size of this chunk's compressed bytes.
    compressed_size: U64<LE>,
}
349
350impl ChunkedArchiveHeader {
351    const CHUNKED_ARCHIVE_MAGIC: [u8; 8] = [0x46, 0x9b, 0x78, 0xef, 0x0f, 0xd0, 0xb2, 0x03];
352    const CHUNKED_ARCHIVE_CHECKSUM_OFFSET: usize = 16;
353
354    fn new(
355        seek_table: &[SeekTableEntry],
356        options: ChunkedArchiveOptions,
357    ) -> Result<Self, ChunkedArchiveError> {
358        let header: ChunkedArchiveHeader = Self {
359            magic: Self::CHUNKED_ARCHIVE_MAGIC,
360            version: options.version().into(),
361            compression_algorithm: options.compression_algorithm().into(),
362            reserved_0: 0.into(),
363            num_entries: TryInto::<u32>::try_into(seek_table.len())
364                .or(Err(ChunkedArchiveError::OutOfRange))?
365                .into(),
366            checksum: 0.into(), // `checksum` is calculated below.
367            reserved_1: 0.into(),
368            reserved_2: 0.into(),
369        };
370        Ok(Self { checksum: header.checksum(seek_table).into(), ..header })
371    }
372
373    /// Calculate the checksum of the header + all seek table entries.
374    fn checksum(&self, entries: &[SeekTableEntry]) -> u32 {
375        let crc_algo = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
376        let mut digest = crc_algo.digest();
377        digest.update(&self.as_bytes()[..Self::CHUNKED_ARCHIVE_CHECKSUM_OFFSET]);
378        digest.update(
379            &self.as_bytes()
380                [Self::CHUNKED_ARCHIVE_CHECKSUM_OFFSET + self.checksum.as_bytes().len()..],
381        );
382        digest.update(entries.as_bytes());
383        digest.finalize()
384    }
385
386    /// Calculate the total header length of an archive *including* all seek table entries.
387    fn header_length(num_entries: usize) -> usize {
388        std::mem::size_of::<ChunkedArchiveHeader>()
389            + (std::mem::size_of::<SeekTableEntry>() * num_entries)
390    }
391
392    /// Validates the archive header and decodes the seek table.
393    fn decode_archive(
394        self,
395        data: &[u8],
396        archive_length: u64,
397    ) -> Result<Option<(DecodedArchive, /*chunk_data*/ &[u8])>, ChunkedArchiveError> {
398        // Deserialize seek table.
399        let num_entries = self.num_entries.get() as usize;
400        let Ok((entries, chunk_data)) =
401            Ref::<_, [SeekTableEntry]>::from_prefix_with_elems(data, num_entries)
402        else {
403            return Ok(None);
404        };
405        let entries: &[SeekTableEntry] = Ref::into_ref(entries);
406
407        // Validate archive header.
408        if self.magic != Self::CHUNKED_ARCHIVE_MAGIC {
409            return Err(ChunkedArchiveError::BadMagic);
410        }
411        let version = self.version.get();
412        if !ChunkedArchiveOptions::is_valid_version(version) {
413            return Err(ChunkedArchiveError::InvalidVersion);
414        }
415        if self.checksum.get() != self.checksum(entries) {
416            return Err(ChunkedArchiveError::IntegrityError);
417        }
418        if entries.len() > ChunkedArchiveOptions::max_chunks_for_version(version)? {
419            return Err(ChunkedArchiveError::IntegrityError);
420        }
421        let compression_algorithm = CompressionAlgorithm::try_from(self.compression_algorithm)?;
422
423        // Validate seek table using invariants I0 through I5.
424
425        // I0: The first seek table entry, if any, must have decompressed offset 0.
426        if !entries.is_empty() && entries[0].decompressed_offset.get() != 0 {
427            return Err(ChunkedArchiveError::IntegrityError);
428        }
429
430        // I1: The compressed offsets of all seek table entries must not overlap with the header.
431        let header_length = Self::header_length(entries.len());
432        if entries.iter().any(|entry| entry.compressed_offset.get() < header_length as u64) {
433            return Err(ChunkedArchiveError::IntegrityError);
434        }
435
436        // I2: Each entry's decompressed offset must be equal to the end of the previous frame
437        //     (i.e. to the previous frame's decompressed offset + length).
438        for (prev, curr) in entries.iter().tuple_windows() {
439            if (prev.decompressed_offset.get() + prev.decompressed_size.get())
440                != curr.decompressed_offset.get()
441            {
442                return Err(ChunkedArchiveError::IntegrityError);
443            }
444        }
445
446        // I3: Each entry's compressed offset must be greater than or equal to the end of the
447        //     previous frame (i.e. to the previous frame's compressed offset + length).
448        for (prev, curr) in entries.iter().tuple_windows() {
449            if (prev.compressed_offset.get() + prev.compressed_size.get())
450                > curr.compressed_offset.get()
451            {
452                return Err(ChunkedArchiveError::IntegrityError);
453            }
454        }
455
456        // I4: Each entry must have a non-zero decompressed and compressed length.
457        for entry in entries.iter() {
458            if entry.decompressed_size.get() == 0 || entry.compressed_size.get() == 0 {
459                return Err(ChunkedArchiveError::IntegrityError);
460            }
461        }
462
463        // I5: Data referenced by each entry must fit within the specified file size.
464        for entry in entries.iter() {
465            let compressed_end = entry.compressed_offset.get() + entry.compressed_size.get();
466            if compressed_end > archive_length {
467                return Err(ChunkedArchiveError::IntegrityError);
468            }
469        }
470
471        let seek_table = entries
472            .iter()
473            .map(|entry| ChunkInfo::from_entry(entry, header_length))
474            .try_collect()?;
475        Ok(Some((DecodedArchive { seek_table, compression_algorithm }, chunk_data)))
476    }
477}
478
/// In-memory representation of a compressed chunk, as produced by [`ChunkedArchive::new`].
pub struct CompressedChunk {
    /// Compressed data for this chunk.
    pub compressed_data: Vec<u8>,
    /// Size of this chunk when decompressed.
    pub decompressed_size: usize,
}
486
/// In-memory representation of a compressed chunked archive. Build one with
/// [`ChunkedArchive::new`] and serialize it with [`ChunkedArchive::write`].
pub struct ChunkedArchive {
    /// Chunks this archive contains, in order. Right now we only allow creating archives with
    /// contiguous compressed and decompressed space.
    chunks: Vec<CompressedChunk>,
    /// Size used to chunk input when creating this archive. Last chunk may be smaller than this
    /// amount.
    chunk_size: usize,
    /// The options used to construct this archive.
    options: ChunkedArchiveOptions,
}
498
499impl ChunkedArchive {
500    /// Create a ChunkedArchive for `data` compressing each chunk in parallel. This function uses
501    /// the `rayon` crate for parallelism. By default compression happens in the global thread pool,
502    /// but this function can also be executed within a locally scoped pool.
503    pub fn new(data: &[u8], options: ChunkedArchiveOptions) -> Result<Self, ChunkedArchiveError> {
504        let chunk_size = options.chunk_size_for(data.len());
505        let mut chunks: Vec<Result<CompressedChunk, ChunkedArchiveError>> = vec![];
506        let compressor = options.thread_local_compressor();
507        data.par_chunks(chunk_size)
508            .enumerate()
509            .map(|(index, chunk)| {
510                let compressed_data = compressor.compress(chunk, index)?;
511                Ok(CompressedChunk { compressed_data, decompressed_size: chunk.len() })
512            })
513            .collect_into_vec(&mut chunks);
514        let chunks: Vec<_> = chunks.into_iter().try_collect()?;
515        Ok(ChunkedArchive { chunks, chunk_size, options })
516    }
517
518    /// Accessor for compressed chunk data.
519    pub fn chunks(&self) -> &Vec<CompressedChunk> {
520        &self.chunks
521    }
522
523    /// The chunk size calculated for this archive during compression. Represents how input data
524    /// was chunked for compression. Note that the final chunk may be smaller than this amount
525    /// when decompressed.
526    pub fn chunk_size(&self) -> usize {
527        self.chunk_size
528    }
529
530    /// Sum of sizes of all compressed chunks.
531    pub fn compressed_data_size(&self) -> usize {
532        self.chunks.iter().map(|chunk| chunk.compressed_data.len()).sum()
533    }
534
535    /// Total size of the archive in bytes.
536    pub fn serialized_size(&self) -> usize {
537        ChunkedArchiveHeader::header_length(self.chunks.len()) + self.compressed_data_size()
538    }
539
540    /// Write the archive to `writer`.
541    pub fn write(self, mut writer: impl std::io::Write) -> Result<(), std::io::Error> {
542        let seek_table = self.make_seek_table();
543        let header = ChunkedArchiveHeader::new(&seek_table, self.options).unwrap();
544        writer.write_all(header.as_bytes())?;
545        writer.write_all(seek_table.as_slice().as_bytes())?;
546        for chunk in self.chunks {
547            writer.write_all(&chunk.compressed_data)?;
548        }
549        Ok(())
550    }
551
552    /// Create the seek table for this archive.
553    fn make_seek_table(&self) -> Vec<SeekTableEntry> {
554        let header_length = ChunkedArchiveHeader::header_length(self.chunks.len());
555        let mut seek_table = vec![];
556        seek_table.reserve(self.chunks.len());
557        let mut compressed_size: usize = 0;
558        let mut decompressed_offset: usize = 0;
559        for chunk in &self.chunks {
560            seek_table.push(SeekTableEntry {
561                decompressed_offset: (decompressed_offset as u64).into(),
562                decompressed_size: (chunk.decompressed_size as u64).into(),
563                compressed_offset: ((header_length + compressed_size) as u64).into(),
564                compressed_size: (chunk.compressed_data.len() as u64).into(),
565            });
566            compressed_size += chunk.compressed_data.len();
567            decompressed_offset += chunk.decompressed_size;
568        }
569        seek_table
570    }
571}
572
573/// Streaming decompressor for chunked archives. Example:
574/// ```
575/// // Create a chunked archive:
576/// let data: Vec<u8> = vec![3; 1024];
577/// let compressed = ChunkedArchive::new(&data, /*block_size*/ 8192).serialize().unwrap();
578/// // Verify the header + decode the seek table:
579/// let (seek_table, archive_data) = decode_archive(&compressed, compressed.len())?.unwrap();
580/// let mut decompressed: Vec<u8> = vec![];
581/// let mut on_chunk = |data: &[u8]| { decompressed.extend_from_slice(data); };
582/// let mut decompressor = ChunkedDecompressor(seek_table);
583/// // `on_chunk` is invoked as each slice is made available. Archive can be provided as chunks.
584/// decompressor.update(archive_data, &mut on_chunk);
585/// assert_eq!(data.as_slice(), decompressed.as_slice());
586/// ```
587pub struct ChunkedDecompressor {
588    seek_table: Vec<ChunkInfo>,
589    buffer: Vec<u8>,
590    data_written: usize,
591    curr_chunk: usize,
592    total_compressed_size: usize,
593    decompressor: Decompressor,
594    decompressed_buffer: Vec<u8>,
595    error_handler: Option<ErrorHandler>,
596}
597
598type ErrorHandler = Box<dyn Fn(usize, ChunkInfo, &[u8]) -> () + Send + 'static>;
599
600impl ChunkedDecompressor {
601    /// Create a new decompressor to decode an archive from a validated seek table.
602    pub fn new(decoded_archive: DecodedArchive) -> Result<Self, ChunkedArchiveError> {
603        let DecodedArchive { compression_algorithm, seek_table } = decoded_archive;
604        let total_compressed_size =
605            seek_table.last().map_or(0, |last_chunk| last_chunk.compressed_range.end);
606        let decompressed_buffer =
607            vec![0u8; seek_table.first().map_or(0, |c| c.decompressed_range.len())];
608        Ok(Self {
609            seek_table,
610            buffer: vec![],
611            data_written: 0,
612            curr_chunk: 0,
613            total_compressed_size,
614            decompressor: compression_algorithm.decompressor(),
615            decompressed_buffer,
616            error_handler: None,
617        })
618    }
619
620    /// Creates a new decompressor with an additional error handler invoked when a chunk fails to be
621    /// decompressed.
622    pub fn new_with_error_handler(
623        decoded_archive: DecodedArchive,
624        error_handler: ErrorHandler,
625    ) -> Result<Self, ChunkedArchiveError> {
626        Ok(Self { error_handler: Some(error_handler), ..Self::new(decoded_archive)? })
627    }
628
629    pub fn seek_table(&self) -> &Vec<ChunkInfo> {
630        &self.seek_table
631    }
632
633    fn finish_chunk(
634        &mut self,
635        data: &[u8],
636        chunk_callback: &mut impl FnMut(&[u8]) -> (),
637    ) -> Result<(), ChunkedArchiveError> {
638        debug_assert_eq!(data.len(), self.seek_table[self.curr_chunk].compressed_range.len());
639        let chunk = &self.seek_table[self.curr_chunk];
640        let decompressed_size = self
641            .decompressor
642            .decompress_into(data, self.decompressed_buffer.as_mut_slice(), self.curr_chunk)
643            .inspect_err(|_| {
644                if let Some(error_handler) = &self.error_handler {
645                    error_handler(self.curr_chunk, chunk.clone(), data.as_bytes());
646                }
647            })?;
648        if decompressed_size != chunk.decompressed_range.len() {
649            return Err(ChunkedArchiveError::IntegrityError);
650        }
651        chunk_callback(&self.decompressed_buffer[..decompressed_size]);
652        self.curr_chunk += 1;
653        Ok(())
654    }
655
656    /// Update the decompressor with more data.
657    pub fn update(
658        &mut self,
659        mut data: &[u8],
660        chunk_callback: &mut impl FnMut(&[u8]) -> (),
661    ) -> Result<(), ChunkedArchiveError> {
662        // Caller must not provide too much data.
663        if self.data_written + data.len() > self.total_compressed_size {
664            return Err(ChunkedArchiveError::OutOfRange);
665        }
666        self.data_written += data.len();
667
668        // If we had leftover data from a previous read, append until we've filled a chunk.
669        if !self.buffer.is_empty() {
670            let to_read = std::cmp::min(
671                data.len(),
672                self.seek_table[self.curr_chunk]
673                    .compressed_range
674                    .len()
675                    .checked_sub(self.buffer.len())
676                    .unwrap(),
677            );
678            self.buffer.extend_from_slice(&data[..to_read]);
679            if self.buffer.len() == self.seek_table[self.curr_chunk].compressed_range.len() {
680                // Take self.buffer temporarily (so we don't have to split borrows).
681                // That way we don't have to re-commit the pages we've already used in the buffer
682                // for next time.
683                let full_chunk = std::mem::take(&mut self.buffer);
684                self.finish_chunk(&full_chunk[..], chunk_callback)?;
685                self.buffer = full_chunk;
686                // Draining the buffer will set the length to 0 but keep the capacity the same.
687                self.buffer.drain(..);
688            }
689            data = &data[to_read..];
690        }
691
692        // Decode as many full chunks as we can.
693        while !data.is_empty()
694            && self.curr_chunk < self.seek_table.len()
695            && self.seek_table[self.curr_chunk].compressed_range.len() <= data.len()
696        {
697            let len = self.seek_table[self.curr_chunk].compressed_range.len();
698            self.finish_chunk(&data[..len], chunk_callback)?;
699            data = &data[len..];
700        }
701
702        // Buffer the rest for the next call.
703        if !data.is_empty() {
704            debug_assert!(self.curr_chunk < self.seek_table.len());
705            debug_assert!(self.data_written < self.total_compressed_size);
706            self.buffer.extend_from_slice(data);
707        }
708
709        debug_assert!(
710            self.data_written < self.total_compressed_size
711                || self.curr_chunk == self.seek_table.len()
712        );
713
714        Ok(())
715    }
716}
717
#[cfg(test)]
mod tests {
    use crate::Type1Blob;

    use super::*;
    use rand::Rng;
    use std::matches;

    /// Compresses `data` into a chunked archive built with `options` and returns the bytes
    /// produced by writing that archive out.
    fn compress_to_vec(data: &[u8], options: ChunkedArchiveOptions) -> Vec<u8> {
        let mut compressed: Vec<u8> = vec![];
        ChunkedArchive::new(data, options)
            .expect("compress")
            .write(&mut compressed)
            .expect("write archive");
        compressed
    }

    /// Create a compressed archive and ensure we can decode it as a valid archive that passes all
    /// required integrity checks.
    #[test]
    fn compress_simple() {
        let data: Vec<u8> = vec![0; 32 * 1024 * 16];
        // This data is highly compressible, so the result should be smaller than the original.
        let compressed = compress_to_vec(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(compressed.len() < data.len());
        // We should be able to decode and verify the archive's integrity in-place.
        assert!(decode_archive(&compressed, compressed.len()).unwrap().is_some());
    }

    /// Generate a header + seek table for verifying invariants/integrity checks.
    ///
    /// Every entry covers `COMPRESSED_CHUNK_SIZE` compressed bytes (laid out back-to-back right
    /// after the header) and `DECOMPRESSED_CHUNK_SIZE` decompressed bytes. Returns the header,
    /// the seek table and the total archive length.
    fn generate_archive(
        num_entries: usize,
        options: ChunkedArchiveOptions,
    ) -> (ChunkedArchiveHeader, Vec<SeekTableEntry>, /*archive_length*/ u64) {
        let mut seek_table = Vec::with_capacity(num_entries);
        let header_length = ChunkedArchiveHeader::header_length(num_entries) as u64;
        const COMPRESSED_CHUNK_SIZE: u64 = 1024;
        const DECOMPRESSED_CHUNK_SIZE: u64 = 2048;
        for n in 0..(num_entries as u64) {
            seek_table.push(SeekTableEntry {
                compressed_offset: (header_length + (n * COMPRESSED_CHUNK_SIZE)).into(),
                compressed_size: COMPRESSED_CHUNK_SIZE.into(),
                decompressed_offset: (n * DECOMPRESSED_CHUNK_SIZE).into(),
                decompressed_size: DECOMPRESSED_CHUNK_SIZE.into(),
            });
        }
        let header = ChunkedArchiveHeader::new(&seek_table, options).unwrap();
        let archive_length: u64 = header_length + (num_entries as u64 * COMPRESSED_CHUNK_SIZE);
        (header, seek_table, archive_length)
    }

    #[test]
    fn should_validate_self() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(header.decode_archive(serialized_table, archive_length).unwrap().is_some());
    }

    #[test]
    fn should_validate_empty() {
        let (header, _, archive_length) = generate_archive(0, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(header.decode_archive(&[], archive_length).unwrap().is_some());
    }

    #[test]
    fn should_detect_bad_magic() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let mut corrupt_magic = ChunkedArchiveHeader::CHUNKED_ARCHIVE_MAGIC;
        corrupt_magic[0] = !corrupt_magic[0];
        let bad_magic = ChunkedArchiveHeader { magic: corrupt_magic, ..header };
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            bad_magic.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::BadMagic
        ));
    }

    #[test]
    fn should_detect_wrong_version() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let invalid_version = ChunkedArchiveHeader { version: u16::MAX.into(), ..header };
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            invalid_version.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::InvalidVersion
        ));
    }

    #[test]
    fn should_detect_corrupt_checksum() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let corrupt_checksum =
            ChunkedArchiveHeader { checksum: (!header.checksum.get()).into(), ..header };
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            corrupt_checksum.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    #[test]
    fn should_reject_too_many_entries_v2() {
        let (too_many_entries, seek_table, archive_length) = generate_archive(
            ChunkedArchiveOptions::V2_MAX_CHUNKS + 1,
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS,
        );

        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            too_many_entries.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// The first seek table entry must start at decompressed offset 0.
    #[test]
    fn invariant_i0_first_entry_zero() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert_eq!(seek_table[0].decompressed_offset.get(), 0);
        seek_table[0].decompressed_offset = 1.into();

        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// Compressed data must not start inside the header + seek table.
    #[test]
    fn invariant_i1_no_header_overlap() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let header_end = ChunkedArchiveHeader::header_length(seek_table.len()) as u64;
        assert!(seek_table[0].compressed_offset.get() >= header_end);
        seek_table[0].compressed_offset = (header_end - 1).into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// Decompressed ranges must not overlap: entry N+1 cannot start before entry N ends.
    #[test]
    fn invariant_i2_decompressed_monotonic() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert_eq!(
            seek_table[0].decompressed_offset.get() + seek_table[0].decompressed_size.get(),
            seek_table[1].decompressed_offset.get()
        );
        seek_table[1].decompressed_offset = (seek_table[1].decompressed_offset.get() - 1).into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// Compressed ranges must not overlap: entry N+1 cannot start before entry N ends.
    #[test]
    fn invariant_i3_compressed_monotonic() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(
            (seek_table[0].compressed_offset.get() + seek_table[0].compressed_size.get())
                <= seek_table[1].compressed_offset.get()
        );
        seek_table[1].compressed_offset = (seek_table[1].compressed_offset.get() - 1).into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// A seek table entry with an empty compressed range is rejected.
    #[test]
    fn invariant_i4_nonzero_compressed_size() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(seek_table[0].compressed_size.get() > 0);
        seek_table[0].compressed_size = 0.into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// A seek table entry with an empty decompressed range is rejected.
    #[test]
    fn invariant_i4_nonzero_decompressed_size() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(seek_table[0].decompressed_size.get() > 0);
        seek_table[0].decompressed_size = 0.into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// Every compressed range must lie within the archive.
    #[test]
    fn invariant_i5_within_archive() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let last_entry = seek_table.last_mut().unwrap();
        assert!(
            (last_entry.compressed_offset.get() + last_entry.compressed_size.get())
                <= archive_length
        );
        last_entry.compressed_offset = (archive_length + 1).into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// Chunk size stays at the minimum until the V2 chunk-count limit is exceeded, then grows
    /// by one alignment unit.
    #[test]
    fn max_chunks() {
        let ChunkedArchiveOptions::V2 { minimum_chunk_size, chunk_alignment, .. } =
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS
        else {
            panic!("Type1Blob::CHUNKED_ARCHIVE_OPTIONS is expected to be a V2 configuration")
        };
        assert_eq!(
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS
                .chunk_size_for(minimum_chunk_size * ChunkedArchiveOptions::V2_MAX_CHUNKS),
            minimum_chunk_size
        );
        assert_eq!(
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS
                .chunk_size_for(minimum_chunk_size * ChunkedArchiveOptions::V2_MAX_CHUNKS + 1),
            minimum_chunk_size + chunk_alignment
        );
    }

    #[test]
    fn test_decompressor_empty_archive() {
        let compressed = compress_to_vec(&[], Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();
        assert!(decoded_archive.seek_table.is_empty());
        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        let mut chunk_callback = |_chunk: &[u8]| panic!("Archive doesn't have any chunks.");
        // Stream data into the decompressor in small pieces to exhaust more edge cases.
        chunk_data
            .chunks(4)
            .for_each(|piece| decompressor.update(piece, &mut chunk_callback).unwrap());
    }

    #[test]
    fn test_decompressor() {
        const UNCOMPRESSED_LENGTH: usize = 3_000_000;
        let data: Vec<u8> = {
            let range = rand::distr::Uniform::<u8>::new_inclusive(0, 255).unwrap();
            rand::rng().sample_iter(&range).take(UNCOMPRESSED_LENGTH).collect()
        };
        let compressed = compress_to_vec(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        // Make sure we have multiple chunks for this test.
        let num_chunks = decoded_archive.seek_table.len();
        assert!(num_chunks > 1);

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();

        let mut decoded_chunks: usize = 0;
        let mut decompressed_offset: usize = 0;
        let mut chunk_callback = |decompressed_chunk: &[u8]| {
            assert!(
                decompressed_chunk
                    == &data[decompressed_offset..decompressed_offset + decompressed_chunk.len()]
            );
            decompressed_offset += decompressed_chunk.len();
            decoded_chunks += 1;
        };

        // Stream data into the decompressor in small pieces to exhaust more edge cases.
        chunk_data
            .chunks(4)
            .for_each(|piece| decompressor.update(piece, &mut chunk_callback).unwrap());
        assert_eq!(decoded_chunks, num_chunks);
        // The callback checked each chunk against the matching input slice; also make sure the
        // emitted chunks together covered the entire input.
        assert_eq!(decompressed_offset, UNCOMPRESSED_LENGTH);
    }

    #[test]
    fn test_decompressor_corrupt_decompressed_size() {
        let data = vec![0u8; 3_000_000];
        let compressed = compress_to_vec(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let (mut decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        // Corrupt the decompressed size of the first chunk by claiming one extra byte.
        decoded_archive.seek_table[0].decompressed_range.end += 1;

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        assert!(matches!(
            decompressor.update(&chunk_data, &mut |_chunk| {}),
            Err(ChunkedArchiveError::IntegrityError)
        ));
    }

    #[test]
    fn test_decompressor_corrupt_compressed_size() {
        let data = vec![0u8; 3_000_000];
        let compressed = compress_to_vec(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let (mut decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        // Corrupt the compressed size of the first chunk by dropping its last byte.
        decoded_archive.seek_table[0].compressed_range.end -= 1;
        let first_chunk_info = decoded_archive.seek_table[0].clone();
        // If invoked, the handler must be told about the (truncated) first chunk.
        let error_handler = move |chunk_index: usize, chunk_info: ChunkInfo, chunk_data: &[u8]| {
            assert_eq!(chunk_index, 0);
            assert_eq!(chunk_info, first_chunk_info);
            assert_eq!(chunk_data.len(), chunk_info.compressed_range.len());
        };

        let mut decompressor =
            ChunkedDecompressor::new_with_error_handler(decoded_archive, Box::new(error_handler))
                .unwrap();
        assert!(matches!(
            decompressor.update(&chunk_data, &mut |_chunk| {}),
            Err(ChunkedArchiveError::DecompressionError { .. })
        ));
    }

    #[test]
    fn test_decompressor_zstd_data_corruption() {
        /// Byte of the compressed payload that gets bit-flipped.
        const CORRUPT_OFFSET: usize = 100;
        let data = vec![0u8; 3_000_000];
        let compressed = compress_to_vec(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        // Require the payload to be long enough that the corruption actually happens, rather
        // than silently skipping it.
        let mut corrupt_data = chunk_data.to_vec();
        assert!(
            corrupt_data.len() > CORRUPT_OFFSET,
            "compressed payload too small to corrupt: {} bytes",
            corrupt_data.len()
        );
        corrupt_data[CORRUPT_OFFSET] = !corrupt_data[CORRUPT_OFFSET];

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        let result = decompressor.update(&corrupt_data, &mut |_chunk| {});
        assert!(matches!(result, Err(ChunkedArchiveError::DecompressionError { .. })));
    }

    /// Round-trips `data` through an archive built with `options`.
    ///
    /// Checks that the written header reports version 3, then decodes and decompresses the
    /// whole archive in one `update` call. Returns the header's compression-algorithm byte and
    /// the decompressed output so callers can verify both.
    fn v3_roundtrip(data: &[u8], options: ChunkedArchiveOptions) -> (u8, Vec<u8>) {
        let compressed = compress_to_vec(data, options);

        // Verify header.
        let (header, _) =
            Ref::<_, ChunkedArchiveHeader>::from_prefix(compressed.as_slice()).unwrap();
        assert_eq!(header.version.get(), 3);
        let algorithm_tag = header.compression_algorithm;

        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        // Decompress.
        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        let mut decompressed: Vec<u8> = vec![];
        let mut chunk_callback = |chunk: &[u8]| decompressed.extend_from_slice(chunk);
        decompressor.update(chunk_data, &mut chunk_callback).unwrap();

        (algorithm_tag, decompressed)
    }

    #[test]
    fn test_v3_zstd_roundtrip() {
        let data = vec![0u8; 3_000_000];
        let options =
            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Zstd };
        let (algorithm_tag, decompressed) = v3_roundtrip(&data, options);
        assert_eq!(algorithm_tag, CompressionAlgorithm::Zstd as u8);
        // Avoid assert_eq! here: on failure it would print two multi-megabyte vectors.
        assert!(
            decompressed == data,
            "zstd round-trip mismatch ({} bytes in, {} bytes out)",
            data.len(),
            decompressed.len()
        );
    }

    #[test]
    fn test_v3_lz4_roundtrip() {
        let data = vec![0u8; 3_000_000];
        let options =
            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
        let (algorithm_tag, decompressed) = v3_roundtrip(&data, options);
        assert_eq!(algorithm_tag, CompressionAlgorithm::Lz4 as u8);
        // Avoid assert_eq! here: on failure it would print two multi-megabyte vectors.
        assert!(
            decompressed == data,
            "lz4 round-trip mismatch ({} bytes in, {} bytes out)",
            data.len(),
            decompressed.len()
        );
    }
}