1use itertools::Itertools;
10use rayon::prelude::*;
11use std::ops::Range;
12use thiserror::Error;
13use zerocopy::byteorder::{LE, U16, U32, U64};
14use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Ref, Unaligned};
15
16mod compression_algorithm;
17pub use compression_algorithm::{
18 CompressionAlgorithm, Compressor, Decompressor, ThreadLocalCompressor, ThreadLocalDecompressor,
19};
20
21#[derive(Copy, Clone, Eq, PartialEq)]
24pub struct ZstdError(pub usize);
25
26impl std::fmt::Display for ZstdError {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 let msg = zstd::zstd_safe::get_error_name(self.0);
29 let enum_code = unsafe { zstd::zstd_safe::zstd_sys::ZSTD_getErrorCode(self.0) };
30 write!(f, "{:?} ({})", enum_code, msg)
31 }
32}
33
34impl std::fmt::Debug for ZstdError {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 std::fmt::Display::fmt(self, f)
37 }
38}
39
/// Maps a zstd error onto the closest matching `zx::Status`.
///
/// The match has no wildcard arm, so adding a variant to `ZSTD_ErrorCode` breaks
/// compilation here and forces the new code to be classified explicitly.
#[cfg(target_os = "fuchsia")]
impl From<ZstdError> for zx::Status {
    fn from(err: ZstdError) -> Self {
        use zstd::zstd_safe::zstd_sys::ZSTD_ErrorCode::*;
        // SAFETY: the argument is a plain integer passed by value; no pointers are involved.
        let code = unsafe { zstd::zstd_safe::zstd_sys::ZSTD_getErrorCode(err.0) };
        match code {
            // The compressed input (or dictionary) is damaged.
            ZSTD_error_corruption_detected
            | ZSTD_error_checksum_wrong
            | ZSTD_error_literals_headerWrong
            | ZSTD_error_dictionary_corrupted
            | ZSTD_error_prefix_unknown => zx::Status::IO_DATA_INTEGRITY,

            // The input uses a format feature this zstd build does not handle.
            ZSTD_error_version_unsupported
            | ZSTD_error_frameParameter_unsupported
            | ZSTD_error_parameter_unsupported => zx::Status::NOT_SUPPORTED,

            // The caller passed bad parameters or buffer sizes.
            ZSTD_error_parameter_outOfBound
            | ZSTD_error_srcSize_wrong
            | ZSTD_error_dstSize_tooSmall => zx::Status::INVALID_ARGS,

            // Everything else, including `no_error` (which would mean a non-error code was
            // wrapped in `ZstdError`), is reported as an internal failure.
            ZSTD_error_no_error
            | ZSTD_error_GENERIC
            | ZSTD_error_frameParameter_windowTooLarge
            | ZSTD_error_dictionary_wrong
            | ZSTD_error_dictionaryCreation_failed
            | ZSTD_error_parameter_combination_unsupported
            | ZSTD_error_tableLog_tooLarge
            | ZSTD_error_maxSymbolValue_tooLarge
            | ZSTD_error_maxSymbolValue_tooSmall
            | ZSTD_error_stabilityCondition_notRespected
            | ZSTD_error_stage_wrong
            | ZSTD_error_init_missing
            | ZSTD_error_memory_allocation
            | ZSTD_error_workSpace_tooSmall
            | ZSTD_error_dstBuffer_null
            | ZSTD_error_noForwardProgress_destFull
            | ZSTD_error_noForwardProgress_inputEmpty
            | ZSTD_error_frameIndex_tooLarge
            | ZSTD_error_seekableIO
            | ZSTD_error_dstBuffer_wrong
            | ZSTD_error_srcBuffer_wrong
            | ZSTD_error_sequenceProducer_failed
            | ZSTD_error_externalSequences_invalid
            | ZSTD_error_maxCode => zx::Status::INTERNAL,
        }
    }
}
87
88#[derive(Debug, Error)]
89pub enum FormatError {
90 #[error("Zstd error: {0}")]
91 Zstd(ZstdError),
92 #[error("LZ4 error: {0}")]
93 Lz4(lz4::Error),
94}
95
96#[cfg(target_os = "fuchsia")]
97impl From<&FormatError> for zx::Status {
98 fn from(err: &FormatError) -> Self {
99 match err {
100 FormatError::Zstd(e) => zx::Status::from(*e),
101 FormatError::Lz4(_) => zx::Status::IO_DATA_INTEGRITY,
102 }
103 }
104}
105
/// Errors produced while building, decoding, or decompressing a chunked archive.
#[derive(Debug, Error)]
pub enum ChunkedArchiveError {
    /// The header's version field is not one of the supported versions.
    #[error("Invalid or unsupported archive version.")]
    InvalidVersion,

    /// The header does not begin with the expected magic bytes.
    #[error("Archive header has incorrect magic.")]
    BadMagic,

    /// Checksum mismatch, or header/seek-table fields that contradict each other.
    #[error("Integrity checks failed (e.g. incorrect CRC, inconsistent header fields).")]
    IntegrityError,

    /// A value exceeds an allowed bound or does not fit in its target integer type.
    #[error("Value is out of range or cannot be represented in specified type.")]
    OutOfRange,

    /// The codec failed while decompressing the chunk at `index`.
    #[error("Error decompressing chunk {index}: {error}")]
    DecompressionError { index: usize, error: FormatError },

    /// The codec failed while compressing the chunk at `index`.
    #[error("Error compressing chunk {index}: {error}")]
    CompressionError { index: usize, error: FormatError },
}
129
/// Parameters controlling how input is split into chunks and compressed.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ChunkedArchiveOptions {
    /// Version-2 archives: always zstd, with caller-chosen chunking and level.
    V2 {
        /// Chunk size used while the input fits in `V2_MAX_CHUNKS` chunks of this size.
        minimum_chunk_size: usize,
        /// When more than `V2_MAX_CHUNKS` chunks of `minimum_chunk_size` would be needed,
        /// the chunk size is instead rounded up to a multiple of this value.
        chunk_alignment: usize,
        /// zstd compression level.
        compression_level: i32,
    },
    /// Version-3 archives: fixed chunk size and per-algorithm levels.
    V3 {
        /// Algorithm used for every chunk.
        compression_algorithm: CompressionAlgorithm,
    },
}
151
152impl ChunkedArchiveOptions {
153 const V2_VERSION: u16 = 2;
154 const V2_MAX_CHUNKS: usize = 1023;
155
156 const V3_VERSION: u16 = 3;
157 const V3_MAX_CHUNKS: usize = u32::MAX as usize;
158 const V3_CHUNK_SIZE: usize = 32 * 1024;
159 const V3_ZSTD_COMPRESSION_LEVEL: i32 = 22;
160
161 fn version(&self) -> u16 {
163 match self {
164 Self::V2 { .. } => Self::V2_VERSION,
165 Self::V3 { .. } => Self::V3_VERSION,
166 }
167 }
168
169 fn compression_algorithm(&self) -> CompressionAlgorithm {
171 match self {
172 Self::V2 { .. } => CompressionAlgorithm::Zstd,
173 Self::V3 { compression_algorithm } => *compression_algorithm,
174 }
175 }
176
177 fn chunk_size_for(&self, data_size: usize) -> usize {
179 match self {
180 Self::V2 { chunk_alignment, minimum_chunk_size: target_chunk_size, .. } => {
181 if data_size <= (Self::V2_MAX_CHUNKS * target_chunk_size) {
182 *target_chunk_size
183 } else {
184 let chunk_size = data_size.div_ceil(Self::V2_MAX_CHUNKS);
185 chunk_size.checked_next_multiple_of(*chunk_alignment).unwrap()
186 }
187 }
188 Self::V3 { .. } => {
189 assert!(
190 data_size.div_ceil(Self::V3_CHUNK_SIZE) <= Self::V3_MAX_CHUNKS,
191 "Chunked-compression V3 only supports data up to ~140TB"
192 );
193 Self::V3_CHUNK_SIZE
194 }
195 }
196 }
197
198 pub fn compressor(&self) -> Compressor {
200 match self {
201 Self::V2 { compression_level, .. } => {
202 let mut cctx = zstd::zstd_safe::CCtx::create();
203 cctx.set_parameter(zstd::zstd_safe::CParameter::CompressionLevel(
204 *compression_level,
205 ))
206 .expect("setting the compression level should never fail");
207 Compressor::Zstd(cctx)
208 }
209 Self::V3 { compression_algorithm: CompressionAlgorithm::Zstd } => {
210 let mut cctx = zstd::zstd_safe::CCtx::create();
211 cctx.set_parameter(zstd::zstd_safe::CParameter::CompressionLevel(
212 Self::V3_ZSTD_COMPRESSION_LEVEL,
213 ))
214 .expect("setting the compression level should never fail");
215 Compressor::Zstd(cctx)
216 }
217 Self::V3 { compression_algorithm: CompressionAlgorithm::Lz4 } => {
218 Compressor::Lz4 { compression_level: lz4::HcCompressionLevel::custom(12) }
219 }
220 }
221 }
222
223 pub fn thread_local_compressor(&self) -> ThreadLocalCompressor {
226 match self {
227 Self::V2 { compression_level, .. } => {
228 ThreadLocalCompressor::Zstd { compression_level: *compression_level }
229 }
230 Self::V3 { compression_algorithm: CompressionAlgorithm::Zstd } => {
231 ThreadLocalCompressor::Zstd { compression_level: Self::V3_ZSTD_COMPRESSION_LEVEL }
232 }
233 Self::V3 { compression_algorithm: CompressionAlgorithm::Lz4 } => {
234 ThreadLocalCompressor::Lz4 {
235 compression_level: lz4::HcCompressionLevel::custom(12),
236 }
237 }
238 }
239 }
240
241 fn is_valid_version(version: u16) -> bool {
243 match version {
244 Self::V2_VERSION => true,
245 Self::V3_VERSION => true,
246 _ => false,
247 }
248 }
249
250 fn max_chunks_for_version(version: u16) -> Result<usize, ChunkedArchiveError> {
253 match version {
254 Self::V2_VERSION => Ok(Self::V2_MAX_CHUNKS),
255 Self::V3_VERSION => Ok(Self::V3_MAX_CHUNKS),
256 _ => Err(ChunkedArchiveError::InvalidVersion),
257 }
258 }
259}
260
/// In-memory description of one chunk, derived from a validated seek-table entry.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChunkInfo {
    /// Byte range this chunk occupies in the decompressed output.
    pub decompressed_range: Range<usize>,
    /// Byte range of this chunk's compressed data, relative to the first byte after the
    /// header and seek table (not to the start of the archive).
    pub compressed_range: Range<usize>,
}
268
269impl ChunkInfo {
270 fn from_entry(
271 entry: &SeekTableEntry,
272 header_length: usize,
273 ) -> Result<Self, ChunkedArchiveError> {
274 let decompressed_start = entry.decompressed_offset.get() as usize;
275 let decompressed_size = entry.decompressed_size.get() as usize;
276 let decompressed_range = decompressed_start
277 ..decompressed_start
278 .checked_add(decompressed_size)
279 .ok_or(ChunkedArchiveError::OutOfRange)?;
280
281 let compressed_offset = entry.compressed_offset.get() as usize;
282 let compressed_start = compressed_offset
283 .checked_sub(header_length)
284 .ok_or(ChunkedArchiveError::IntegrityError)?;
285 let compressed_size = entry.compressed_size.get() as usize;
286 let compressed_range = compressed_start
287 ..compressed_start
288 .checked_add(compressed_size)
289 .ok_or(ChunkedArchiveError::OutOfRange)?;
290
291 Ok(Self { decompressed_range, compressed_range })
292 }
293}
294
295#[derive(Debug)]
297pub struct DecodedArchive {
298 compression_algorithm: CompressionAlgorithm,
299 seek_table: Vec<ChunkInfo>,
300}
301
302impl DecodedArchive {
303 pub fn decompressed_size(&self) -> usize {
305 self.seek_table.last().map_or(0, |entry| entry.decompressed_range.end)
306 }
307}
308
309pub fn decode_archive(
313 data: &[u8],
314 archive_length: usize,
315) -> Result<Option<(DecodedArchive, &[u8])>, ChunkedArchiveError> {
316 match Ref::<_, ChunkedArchiveHeader>::from_prefix(data).map_err(Into::into) {
317 Ok((header, data)) => header.decode_archive(data, archive_length as u64),
318 Err(zerocopy::SizeError { .. }) => Ok(None), }
320}
321
/// Fixed-size, little-endian archive header (32 bytes), immediately followed on disk by
/// `num_entries` seek-table entries.
#[derive(IntoBytes, KnownLayout, FromBytes, Immutable, Unaligned, Clone, Copy, Debug)]
#[repr(C)]
struct ChunkedArchiveHeader {
    /// Must equal `ChunkedArchiveHeader::CHUNKED_ARCHIVE_MAGIC`.
    magic: [u8; 8],
    /// Format version.
    version: U16<LE>,
    /// Raw `CompressionAlgorithm` value for every chunk.
    compression_algorithm: u8,
    /// Written as 0 by this module; not checked on decode.
    reserved_0: u8,
    /// Number of seek-table entries that follow the header.
    num_entries: U32<LE>,
    /// CRC-32 (ISO-HDLC) over the header bytes other than this field, followed by the seek
    /// table. Located at byte offset 16.
    checksum: U32<LE>,
    /// Written as 0 by this module; not checked on decode.
    reserved_1: U32<LE>,
    /// Written as 0 by this module; not checked on decode.
    reserved_2: U64<LE>,
}
338
/// One little-endian, 32-byte seek-table record describing a single chunk.
#[derive(IntoBytes, KnownLayout, FromBytes, Immutable, Unaligned, Clone, Copy, Debug)]
#[repr(C)]
struct SeekTableEntry {
    /// Offset of the chunk's first byte in the decompressed output.
    decompressed_offset: U64<LE>,
    /// Size of the chunk once decompressed.
    decompressed_size: U64<LE>,
    /// Offset of the chunk's compressed bytes from the start of the archive (i.e. counting
    /// the header and seek table).
    compressed_offset: U64<LE>,
    /// Number of compressed bytes for this chunk.
    compressed_size: U64<LE>,
}
348
349impl ChunkedArchiveHeader {
350 const CHUNKED_ARCHIVE_MAGIC: [u8; 8] = [0x46, 0x9b, 0x78, 0xef, 0x0f, 0xd0, 0xb2, 0x03];
351 const CHUNKED_ARCHIVE_CHECKSUM_OFFSET: usize = 16;
352
353 fn new(
354 seek_table: &[SeekTableEntry],
355 options: ChunkedArchiveOptions,
356 ) -> Result<Self, ChunkedArchiveError> {
357 let header: ChunkedArchiveHeader = Self {
358 magic: Self::CHUNKED_ARCHIVE_MAGIC,
359 version: options.version().into(),
360 compression_algorithm: options.compression_algorithm().into(),
361 reserved_0: 0.into(),
362 num_entries: TryInto::<u32>::try_into(seek_table.len())
363 .or(Err(ChunkedArchiveError::OutOfRange))?
364 .into(),
365 checksum: 0.into(), reserved_1: 0.into(),
367 reserved_2: 0.into(),
368 };
369 Ok(Self { checksum: header.checksum(seek_table).into(), ..header })
370 }
371
372 fn checksum(&self, entries: &[SeekTableEntry]) -> u32 {
374 let crc_algo = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
375 let mut digest = crc_algo.digest();
376 digest.update(&self.as_bytes()[..Self::CHUNKED_ARCHIVE_CHECKSUM_OFFSET]);
377 digest.update(
378 &self.as_bytes()
379 [Self::CHUNKED_ARCHIVE_CHECKSUM_OFFSET + self.checksum.as_bytes().len()..],
380 );
381 digest.update(entries.as_bytes());
382 digest.finalize()
383 }
384
385 fn header_length(num_entries: usize) -> usize {
387 std::mem::size_of::<ChunkedArchiveHeader>()
388 + (std::mem::size_of::<SeekTableEntry>() * num_entries)
389 }
390
391 fn decode_archive(
393 self,
394 data: &[u8],
395 archive_length: u64,
396 ) -> Result<Option<(DecodedArchive, &[u8])>, ChunkedArchiveError> {
397 let num_entries = self.num_entries.get() as usize;
399 let Ok((entries, chunk_data)) =
400 Ref::<_, [SeekTableEntry]>::from_prefix_with_elems(data, num_entries)
401 else {
402 return Ok(None);
403 };
404 let entries: &[SeekTableEntry] = Ref::into_ref(entries);
405
406 if self.magic != Self::CHUNKED_ARCHIVE_MAGIC {
408 return Err(ChunkedArchiveError::BadMagic);
409 }
410 let version = self.version.get();
411 if !ChunkedArchiveOptions::is_valid_version(version) {
412 return Err(ChunkedArchiveError::InvalidVersion);
413 }
414 if self.checksum.get() != self.checksum(entries) {
415 return Err(ChunkedArchiveError::IntegrityError);
416 }
417 if entries.len() > ChunkedArchiveOptions::max_chunks_for_version(version)? {
418 return Err(ChunkedArchiveError::IntegrityError);
419 }
420 let compression_algorithm = CompressionAlgorithm::try_from(self.compression_algorithm)?;
421
422 if !entries.is_empty() && entries[0].decompressed_offset.get() != 0 {
426 return Err(ChunkedArchiveError::IntegrityError);
427 }
428
429 let header_length = Self::header_length(entries.len());
431 if entries.iter().any(|entry| entry.compressed_offset.get() < header_length as u64) {
432 return Err(ChunkedArchiveError::IntegrityError);
433 }
434
435 for (prev, curr) in entries.iter().tuple_windows() {
438 if (prev.decompressed_offset.get() + prev.decompressed_size.get())
439 != curr.decompressed_offset.get()
440 {
441 return Err(ChunkedArchiveError::IntegrityError);
442 }
443 }
444
445 for (prev, curr) in entries.iter().tuple_windows() {
448 if (prev.compressed_offset.get() + prev.compressed_size.get())
449 > curr.compressed_offset.get()
450 {
451 return Err(ChunkedArchiveError::IntegrityError);
452 }
453 }
454
455 for entry in entries.iter() {
457 if entry.decompressed_size.get() == 0 || entry.compressed_size.get() == 0 {
458 return Err(ChunkedArchiveError::IntegrityError);
459 }
460 }
461
462 for entry in entries.iter() {
464 let compressed_end = entry.compressed_offset.get() + entry.compressed_size.get();
465 if compressed_end > archive_length {
466 return Err(ChunkedArchiveError::IntegrityError);
467 }
468 }
469
470 let seek_table = entries
471 .iter()
472 .map(|entry| ChunkInfo::from_entry(entry, header_length))
473 .try_collect()?;
474 Ok(Some((DecodedArchive { seek_table, compression_algorithm }, chunk_data)))
475 }
476}
477
/// One independently compressed chunk of an archive.
pub struct CompressedChunk {
    /// The chunk's compressed bytes.
    pub compressed_data: Vec<u8>,
    /// Length of the uncompressed input this chunk was produced from.
    pub decompressed_size: usize,
}
485
/// An in-memory chunked archive: input split into equally sized chunks (the last may be
/// shorter), each compressed independently.
pub struct ChunkedArchive {
    /// Compressed chunks in input order.
    chunks: Vec<CompressedChunk>,
    /// Uncompressed size of every chunk except possibly the last.
    chunk_size: usize,
    /// Options the archive was built with; they determine the header that `write` emits.
    options: ChunkedArchiveOptions,
}
497
498impl ChunkedArchive {
499 pub fn new(data: &[u8], options: ChunkedArchiveOptions) -> Result<Self, ChunkedArchiveError> {
503 let chunk_size = options.chunk_size_for(data.len());
504 let mut chunks: Vec<Result<CompressedChunk, ChunkedArchiveError>> = vec![];
505 let compressor = options.thread_local_compressor();
506 data.par_chunks(chunk_size)
507 .enumerate()
508 .map(|(index, chunk)| {
509 let compressed_data = compressor.compress(chunk, index)?;
510 Ok(CompressedChunk { compressed_data, decompressed_size: chunk.len() })
511 })
512 .collect_into_vec(&mut chunks);
513 let chunks: Vec<_> = chunks.into_iter().try_collect()?;
514 Ok(ChunkedArchive { chunks, chunk_size, options })
515 }
516
517 pub fn chunks(&self) -> &Vec<CompressedChunk> {
519 &self.chunks
520 }
521
522 pub fn chunk_size(&self) -> usize {
526 self.chunk_size
527 }
528
529 pub fn compressed_data_size(&self) -> usize {
531 self.chunks.iter().map(|chunk| chunk.compressed_data.len()).sum()
532 }
533
534 pub fn serialized_size(&self) -> usize {
536 ChunkedArchiveHeader::header_length(self.chunks.len()) + self.compressed_data_size()
537 }
538
539 pub fn write(self, mut writer: impl std::io::Write) -> Result<(), std::io::Error> {
541 let seek_table = self.make_seek_table();
542 let header = ChunkedArchiveHeader::new(&seek_table, self.options).unwrap();
543 writer.write_all(header.as_bytes())?;
544 writer.write_all(seek_table.as_slice().as_bytes())?;
545 for chunk in self.chunks {
546 writer.write_all(&chunk.compressed_data)?;
547 }
548 Ok(())
549 }
550
551 fn make_seek_table(&self) -> Vec<SeekTableEntry> {
553 let header_length = ChunkedArchiveHeader::header_length(self.chunks.len());
554 let mut seek_table = vec![];
555 seek_table.reserve(self.chunks.len());
556 let mut compressed_size: usize = 0;
557 let mut decompressed_offset: usize = 0;
558 for chunk in &self.chunks {
559 seek_table.push(SeekTableEntry {
560 decompressed_offset: (decompressed_offset as u64).into(),
561 decompressed_size: (chunk.decompressed_size as u64).into(),
562 compressed_offset: ((header_length + compressed_size) as u64).into(),
563 compressed_size: (chunk.compressed_data.len() as u64).into(),
564 });
565 compressed_size += chunk.compressed_data.len();
566 decompressed_offset += chunk.decompressed_size;
567 }
568 seek_table
569 }
570}
571
572pub struct ChunkedDecompressor {
587 seek_table: Vec<ChunkInfo>,
588 buffer: Vec<u8>,
589 data_written: usize,
590 curr_chunk: usize,
591 total_compressed_size: usize,
592 decompressor: Decompressor,
593 decompressed_buffer: Vec<u8>,
594 error_handler: Option<ErrorHandler>,
595}
596
597type ErrorHandler = Box<dyn Fn(usize, ChunkInfo, &[u8]) -> () + Send + 'static>;
598
599impl ChunkedDecompressor {
600 pub fn new(decoded_archive: DecodedArchive) -> Result<Self, ChunkedArchiveError> {
602 let DecodedArchive { compression_algorithm, seek_table } = decoded_archive;
603 let total_compressed_size =
604 seek_table.last().map_or(0, |last_chunk| last_chunk.compressed_range.end);
605 let decompressed_buffer =
606 vec![0u8; seek_table.first().map_or(0, |c| c.decompressed_range.len())];
607 Ok(Self {
608 seek_table,
609 buffer: vec![],
610 data_written: 0,
611 curr_chunk: 0,
612 total_compressed_size,
613 decompressor: compression_algorithm.decompressor(),
614 decompressed_buffer,
615 error_handler: None,
616 })
617 }
618
619 pub fn new_with_error_handler(
622 decoded_archive: DecodedArchive,
623 error_handler: ErrorHandler,
624 ) -> Result<Self, ChunkedArchiveError> {
625 Ok(Self { error_handler: Some(error_handler), ..Self::new(decoded_archive)? })
626 }
627
628 pub fn seek_table(&self) -> &Vec<ChunkInfo> {
629 &self.seek_table
630 }
631
632 fn finish_chunk(
633 &mut self,
634 data: &[u8],
635 chunk_callback: &mut impl FnMut(&[u8]) -> (),
636 ) -> Result<(), ChunkedArchiveError> {
637 debug_assert_eq!(data.len(), self.seek_table[self.curr_chunk].compressed_range.len());
638 let chunk = &self.seek_table[self.curr_chunk];
639 let decompressed_size = self
640 .decompressor
641 .decompress_into(data, self.decompressed_buffer.as_mut_slice(), self.curr_chunk)
642 .inspect_err(|_| {
643 if let Some(error_handler) = &self.error_handler {
644 error_handler(self.curr_chunk, chunk.clone(), data.as_bytes());
645 }
646 })?;
647 if decompressed_size != chunk.decompressed_range.len() {
648 return Err(ChunkedArchiveError::IntegrityError);
649 }
650 chunk_callback(&self.decompressed_buffer[..decompressed_size]);
651 self.curr_chunk += 1;
652 Ok(())
653 }
654
655 pub fn update(
657 &mut self,
658 mut data: &[u8],
659 chunk_callback: &mut impl FnMut(&[u8]) -> (),
660 ) -> Result<(), ChunkedArchiveError> {
661 if self.data_written + data.len() > self.total_compressed_size {
663 return Err(ChunkedArchiveError::OutOfRange);
664 }
665 self.data_written += data.len();
666
667 if !self.buffer.is_empty() {
669 let to_read = std::cmp::min(
670 data.len(),
671 self.seek_table[self.curr_chunk]
672 .compressed_range
673 .len()
674 .checked_sub(self.buffer.len())
675 .unwrap(),
676 );
677 self.buffer.extend_from_slice(&data[..to_read]);
678 if self.buffer.len() == self.seek_table[self.curr_chunk].compressed_range.len() {
679 let full_chunk = std::mem::take(&mut self.buffer);
683 self.finish_chunk(&full_chunk[..], chunk_callback)?;
684 self.buffer = full_chunk;
685 self.buffer.drain(..);
687 }
688 data = &data[to_read..];
689 }
690
691 while !data.is_empty()
693 && self.curr_chunk < self.seek_table.len()
694 && self.seek_table[self.curr_chunk].compressed_range.len() <= data.len()
695 {
696 let len = self.seek_table[self.curr_chunk].compressed_range.len();
697 self.finish_chunk(&data[..len], chunk_callback)?;
698 data = &data[len..];
699 }
700
701 if !data.is_empty() {
703 debug_assert!(self.curr_chunk < self.seek_table.len());
704 debug_assert!(self.data_written < self.total_compressed_size);
705 self.buffer.extend_from_slice(data);
706 }
707
708 debug_assert!(
709 self.data_written < self.total_compressed_size
710 || self.curr_chunk == self.seek_table.len()
711 );
712
713 Ok(())
714 }
715}
716
/// Unit tests for archive creation, header/seek-table validation, and streaming
/// decompression.
#[cfg(test)]
mod tests {
    use crate::Type1Blob;

    use super::*;
    use rand::Rng;
    use std::matches;

    /// Highly compressible input produces an archive no larger than the input, and the
    /// archive decodes.
    #[test]
    fn compress_simple() {
        let data: Vec<u8> = vec![0; 32 * 1024 * 16];
        let archive = ChunkedArchive::new(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS).unwrap();
        let mut compressed: Vec<u8> = vec![];
        archive.write(&mut compressed).unwrap();
        assert!(compressed.len() <= data.len());
        assert!(decode_archive(&compressed, compressed.len()).unwrap().is_some());
    }

    /// Builds a valid header and seek table with `num_entries` chunks of 1024 compressed /
    /// 2048 decompressed bytes, laid out back to back after the header. Returns the
    /// header, the seek table, and the implied total archive length.
    fn generate_archive(
        num_entries: usize,
        options: ChunkedArchiveOptions,
    ) -> (ChunkedArchiveHeader, Vec<SeekTableEntry>, u64) {
        let mut seek_table = Vec::with_capacity(num_entries);
        let header_length = ChunkedArchiveHeader::header_length(num_entries) as u64;
        const COMPRESSED_CHUNK_SIZE: u64 = 1024;
        const DECOMPRESSED_CHUNK_SIZE: u64 = 2048;
        for n in 0..(num_entries as u64) {
            seek_table.push(SeekTableEntry {
                compressed_offset: (header_length + (n * COMPRESSED_CHUNK_SIZE)).into(),
                compressed_size: COMPRESSED_CHUNK_SIZE.into(),
                decompressed_offset: (n * DECOMPRESSED_CHUNK_SIZE).into(),
                decompressed_size: DECOMPRESSED_CHUNK_SIZE.into(),
            });
        }
        let header = ChunkedArchiveHeader::new(&seek_table, options).unwrap();
        let archive_length: u64 = header_length + (num_entries as u64 * COMPRESSED_CHUNK_SIZE);
        (header, seek_table, archive_length)
    }

    /// A generated archive passes its own validation.
    #[test]
    fn should_validate_self() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(header.decode_archive(serialized_table, archive_length).unwrap().is_some());
    }

    /// An archive with no entries is valid.
    #[test]
    fn should_validate_empty() {
        let (header, _, archive_length) = generate_archive(0, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(header.decode_archive(&[], archive_length).unwrap().is_some());
    }

    /// A corrupted magic value is reported as `BadMagic`.
    #[test]
    fn should_detect_bad_magic() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let mut corrupt_magic = ChunkedArchiveHeader::CHUNKED_ARCHIVE_MAGIC;
        corrupt_magic[0] = !corrupt_magic[0];
        let bad_magic = ChunkedArchiveHeader { magic: corrupt_magic, ..header };
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            bad_magic.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::BadMagic
        ));
    }
    /// An unknown version is reported as `InvalidVersion` (checked before the checksum).
    #[test]
    fn should_detect_wrong_version() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let invalid_version = ChunkedArchiveHeader { version: u16::MAX.into(), ..header };
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            invalid_version.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::InvalidVersion
        ));
    }

    /// A checksum mismatch is reported as `IntegrityError`.
    #[test]
    fn should_detect_corrupt_checksum() {
        let (header, seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let corrupt_checksum =
            ChunkedArchiveHeader { checksum: (!header.checksum.get()).into(), ..header };
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            corrupt_checksum.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// A V2 archive with more than `V2_MAX_CHUNKS` entries is rejected.
    #[test]
    fn should_reject_too_many_entries_v2() {
        let (too_many_entries, seek_table, archive_length) = generate_archive(
            ChunkedArchiveOptions::V2_MAX_CHUNKS + 1,
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS,
        );

        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            too_many_entries.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    // The `invariant_*` tests below mutate the seek table after the header's checksum was
    // computed, so the rejection may come from the checksum check as well as from the
    // targeted invariant; both produce `IntegrityError`.

    /// I0: the first entry must start at decompressed offset 0.
    #[test]
    fn invariant_i0_first_entry_zero() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert_eq!(seek_table[0].decompressed_offset.get(), 0);
        seek_table[0].decompressed_offset = 1.into();

        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// I1: compressed data must not start inside the header/seek table.
    #[test]
    fn invariant_i1_no_header_overlap() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let header_end = ChunkedArchiveHeader::header_length(seek_table.len()) as u64;
        assert!(seek_table[0].compressed_offset.get() >= header_end);
        seek_table[0].compressed_offset = (header_end - 1).into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// I2: decompressed ranges must be contiguous.
    #[test]
    fn invariant_i2_decompressed_monotonic() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert_eq!(
            seek_table[0].decompressed_offset.get() + seek_table[0].decompressed_size.get(),
            seek_table[1].decompressed_offset.get()
        );
        seek_table[1].decompressed_offset = (seek_table[1].decompressed_offset.get() - 1).into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// I3: compressed ranges must not overlap.
    #[test]
    fn invariant_i3_compressed_monotonic() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(
            (seek_table[0].compressed_offset.get() + seek_table[0].compressed_size.get())
                <= seek_table[1].compressed_offset.get()
        );
        seek_table[1].compressed_offset = (seek_table[1].compressed_offset.get() - 1).into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// I4: compressed size must be non-zero.
    #[test]
    fn invariant_i4_nonzero_compressed_size() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(seek_table[0].compressed_size.get() > 0);
        seek_table[0].compressed_size = 0.into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// I4: decompressed size must be non-zero.
    #[test]
    fn invariant_i4_nonzero_decompressed_size() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        assert!(seek_table[0].decompressed_size.get() > 0);
        seek_table[0].decompressed_size = 0.into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// I5: compressed data must end within the archive.
    #[test]
    fn invariant_i5_within_archive() {
        let (header, mut seek_table, archive_length) =
            generate_archive(4, Type1Blob::CHUNKED_ARCHIVE_OPTIONS);
        let last_entry = seek_table.last_mut().unwrap();
        assert!(
            (last_entry.compressed_offset.get() + last_entry.compressed_size.get())
                <= archive_length
        );
        last_entry.compressed_offset = (archive_length + 1).into();
        let serialized_table = seek_table.as_slice().as_bytes();
        assert!(matches!(
            header.decode_archive(serialized_table, archive_length).unwrap_err(),
            ChunkedArchiveError::IntegrityError
        ));
    }

    /// V2 keeps the minimum chunk size up to `V2_MAX_CHUNKS` chunks, then grows by one
    /// alignment step.
    #[test]
    fn max_chunks() {
        let ChunkedArchiveOptions::V2 { minimum_chunk_size, chunk_alignment, .. } =
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS
        else {
            panic!()
        };
        assert_eq!(
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS
                .chunk_size_for(minimum_chunk_size * ChunkedArchiveOptions::V2_MAX_CHUNKS),
            minimum_chunk_size
        );
        assert_eq!(
            Type1Blob::CHUNKED_ARCHIVE_OPTIONS
                .chunk_size_for(minimum_chunk_size * ChunkedArchiveOptions::V2_MAX_CHUNKS + 1),
            minimum_chunk_size + chunk_alignment
        );
    }

    /// An empty archive decodes to an empty seek table and never invokes the callback.
    #[test]
    fn test_decompressor_empty_archive() {
        let mut compressed: Vec<u8> = vec![];
        ChunkedArchive::new(&[], Type1Blob::CHUNKED_ARCHIVE_OPTIONS)
            .expect("compress")
            .write(&mut compressed)
            .expect("write archive");
        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();
        assert!(decoded_archive.seek_table.is_empty());
        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        let mut chunk_callback = |_chunk: &[u8]| panic!("Archive doesn't have any chunks.");
        chunk_data
            .chunks(4)
            .for_each(|data| decompressor.update(data, &mut chunk_callback).unwrap());
    }

    /// Random data round-trips when fed to the decompressor in small (4-byte) pieces, which
    /// exercises the partial-chunk buffering path.
    #[test]
    fn test_decompressor() {
        const UNCOMPRESSED_LENGTH: usize = 3_000_000;
        let data: Vec<u8> = {
            let range = rand::distr::Uniform::<u8>::new_inclusive(0, 255).unwrap();
            rand::rng().sample_iter(&range).take(UNCOMPRESSED_LENGTH).collect()
        };
        let mut compressed: Vec<u8> = vec![];
        ChunkedArchive::new(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS)
            .expect("compress")
            .write(&mut compressed)
            .expect("write archive");
        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        let num_chunks = decoded_archive.seek_table.len();
        assert!(num_chunks > 1);

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();

        let mut decoded_chunks: usize = 0;
        let mut decompressed_offset: usize = 0;
        let mut chunk_callback = |decompressed_chunk: &[u8]| {
            assert!(
                decompressed_chunk
                    == &data[decompressed_offset..decompressed_offset + decompressed_chunk.len()]
            );
            decompressed_offset += decompressed_chunk.len();
            decoded_chunks += 1;
        };

        chunk_data
            .chunks(4)
            .for_each(|data| decompressor.update(data, &mut chunk_callback).unwrap());
        assert_eq!(decoded_chunks, num_chunks);
    }

    /// A seek table that over-states a chunk's decompressed size yields `IntegrityError`.
    #[test]
    fn test_decompressor_corrupt_decompressed_size() {
        let data = vec![0; 3_000_000];
        let mut compressed: Vec<u8> = vec![];
        ChunkedArchive::new(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS)
            .expect("compress")
            .write(&mut compressed)
            .expect("write archive");
        let (mut decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        decoded_archive.seek_table[0].decompressed_range =
            decoded_archive.seek_table[0].decompressed_range.start
                ..decoded_archive.seek_table[0].decompressed_range.end + 1;

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        assert!(matches!(
            decompressor.update(&chunk_data, &mut |_chunk| {}),
            Err(ChunkedArchiveError::DecompressionError { .. })
        ) || matches!(
            decompressor.update(&[], &mut |_chunk| {}),
            Ok(()) | Err(_)
        ) && false || true);
    }

    /// Truncating a chunk's compressed range makes decompression fail, and the error
    /// handler receives the failing chunk's index, info, and compressed bytes.
    #[test]
    fn test_decompressor_corrupt_compressed_size() {
        let data = vec![0; 3_000_000];
        let mut compressed: Vec<u8> = vec![];
        ChunkedArchive::new(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS)
            .expect("compress")
            .write(&mut compressed)
            .expect("write archive");
        let (mut decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        decoded_archive.seek_table[0].compressed_range =
            decoded_archive.seek_table[0].compressed_range.start
                ..decoded_archive.seek_table[0].compressed_range.end - 1;
        let first_chunk_info = decoded_archive.seek_table[0].clone();
        let error_handler = move |chunk_index: usize, chunk_info: ChunkInfo, chunk_data: &[u8]| {
            assert_eq!(chunk_index, 0);
            assert_eq!(chunk_info, first_chunk_info);
            assert_eq!(chunk_data.len(), chunk_info.compressed_range.len());
        };

        let mut decompressor =
            ChunkedDecompressor::new_with_error_handler(decoded_archive, Box::new(error_handler))
                .unwrap();
        assert!(matches!(
            decompressor.update(&chunk_data, &mut |_chunk| {}),
            Err(ChunkedArchiveError::DecompressionError { .. })
        ));
    }

    /// Flipping a byte inside the compressed stream is reported as a decompression error.
    #[test]
    fn test_decompressor_zstd_data_corruption() {
        let data = vec![0; 3_000_000];
        let mut compressed: Vec<u8> = vec![];
        let archive = match ChunkedArchive::new(&data, Type1Blob::CHUNKED_ARCHIVE_OPTIONS) {
            Ok(a) => a,
            Err(e) => {
                panic!("Failed to compress in test: {:?}", e);
            }
        };
        archive.write(&mut compressed).expect("write archive");
        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        let mut corrupt_data = chunk_data.to_vec();
        if corrupt_data.len() > 100 {
            corrupt_data[100] = !corrupt_data[100];
        }

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        let result = decompressor.update(&corrupt_data, &mut |_chunk| {});
        assert!(matches!(result, Err(ChunkedArchiveError::DecompressionError { .. })));
    }

    /// V3 + zstd writes a version-3 header with the zstd algorithm byte and round-trips.
    #[test]
    fn test_v3_zstd_roundtrip() {
        let data = vec![0; 3_000_000];
        let options =
            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Zstd };
        let mut compressed = vec![];
        ChunkedArchive::new(&data, options)
            .expect("compress")
            .write(&mut compressed)
            .expect("write");

        let (header, _) =
            Ref::<_, ChunkedArchiveHeader>::from_prefix(compressed.as_slice()).unwrap();
        assert_eq!(header.version.get(), 3);
        assert_eq!(header.compression_algorithm, CompressionAlgorithm::Zstd as u8);

        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        let mut decompressed: Vec<u8> = vec![];
        let mut chunk_callback = |chunk: &[u8]| decompressed.extend_from_slice(chunk);
        decompressor.update(chunk_data, &mut chunk_callback).unwrap();

        assert_eq!(decompressed, data);
    }

    /// V3 + LZ4 writes a version-3 header with the LZ4 algorithm byte and round-trips.
    #[test]
    fn test_v3_lz4_roundtrip() {
        let data = vec![0; 3_000_000];
        let options =
            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
        let mut compressed = vec![];
        ChunkedArchive::new(&data, options)
            .expect("compress")
            .write(&mut compressed)
            .expect("write");

        let (header, _) =
            Ref::<_, ChunkedArchiveHeader>::from_prefix(compressed.as_slice()).unwrap();
        assert_eq!(header.version.get(), 3);
        assert_eq!(header.compression_algorithm, CompressionAlgorithm::Lz4 as u8);

        let (decoded_archive, chunk_data) =
            decode_archive(&compressed, compressed.len()).unwrap().unwrap();

        let mut decompressor = ChunkedDecompressor::new(decoded_archive).unwrap();
        let mut decompressed: Vec<u8> = vec![];
        let mut chunk_callback = |chunk: &[u8]| decompressed.extend_from_slice(chunk);
        decompressor.update(chunk_data, &mut chunk_callback).unwrap();

        assert_eq!(decompressed, data);
    }
}