delivery_blob/compression/
compression_algorithm.rs1use super::{FormatError, ZstdError};
12use crate::compression::ChunkedArchiveError;
13
14thread_local! {
15 static ZSTD_COMPRESSOR: std::cell::RefCell<zstd::zstd_safe::CCtx<'static>> =
16 std::cell::RefCell::new({
17 let mut cctx = zstd::zstd_safe::CCtx::create();
18 cctx.set_parameter(zstd::zstd_safe::CParameter::ChecksumFlag(true)).unwrap();
19 cctx
20 });
21 static ZSTD_DECOMPRESSOR: std::cell::RefCell<zstd::zstd_safe::DCtx<'static>> =
22 std::cell::RefCell::new(zstd::zstd_safe::DCtx::default());
23}
24
/// Compression algorithms supported by this module.
///
/// The enum is `repr(u8)` with explicit discriminants; those byte values are
/// what the `From`/`TryFrom<u8>` conversions in this file map to and from.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    /// zstd, identified by the byte `0`.
    Zstd = 0,
    /// LZ4, identified by the byte `1`.
    Lz4 = 1,
}
32
33impl CompressionAlgorithm {
34 pub fn decompressor(&self) -> Decompressor {
37 match self {
38 Self::Zstd => Decompressor::Zstd(zstd::zstd_safe::DCtx::default()),
39 Self::Lz4 => Decompressor::Lz4,
40 }
41 }
42
43 pub fn thread_local_decompressor(&self) -> ThreadLocalDecompressor {
48 match self {
49 Self::Zstd => ThreadLocalDecompressor::Zstd,
50 Self::Lz4 => ThreadLocalDecompressor::Lz4,
51 }
52 }
53}
54
55impl From<CompressionAlgorithm> for u8 {
56 fn from(value: CompressionAlgorithm) -> Self {
57 value as u8
58 }
59}
60
61impl TryFrom<u8> for CompressionAlgorithm {
62 type Error = ChunkedArchiveError;
63 fn try_from(value: u8) -> Result<Self, Self::Error> {
64 match value {
65 0 => Ok(CompressionAlgorithm::Zstd),
66 1 => Ok(CompressionAlgorithm::Lz4),
67 _ => Err(ChunkedArchiveError::IntegrityError),
68 }
69 }
70}
71
72pub enum Decompressor {
74 Zstd(zstd::zstd_safe::DCtx<'static>),
75 Lz4,
76}
77
78impl Decompressor {
79 pub fn decompress(
81 &mut self,
82 data: &[u8],
83 uncompressed_size: usize,
84 chunk_index: usize,
85 ) -> Result<Vec<u8>, ChunkedArchiveError> {
86 match self {
87 Self::Zstd(dctx) => {
88 let mut buffer = Vec::with_capacity(uncompressed_size);
89 match dctx.decompress(&mut buffer, data) {
90 Ok(_) => Ok(buffer),
91 Err(code) => Err(ChunkedArchiveError::DecompressionError {
92 index: chunk_index,
93 error: FormatError::Zstd(ZstdError(code)),
94 }),
95 }
96 }
97 Self::Lz4 => lz4::decompress(data, uncompressed_size).map_err(|_| {
98 ChunkedArchiveError::DecompressionError {
99 index: chunk_index,
100 error: FormatError::Lz4(lz4::Error::DecompressionFailed),
101 }
102 }),
103 }
104 }
105
106 pub fn decompress_into<'a>(
108 &mut self,
109 data: &[u8],
110 destination: &'a mut [u8],
111 chunk_index: usize,
112 ) -> Result<usize, ChunkedArchiveError> {
113 match self {
114 Self::Zstd(dctx) => match dctx.decompress(destination, data) {
115 Ok(size) => Ok(size),
116 Err(code) => Err(ChunkedArchiveError::DecompressionError {
117 index: chunk_index,
118 error: FormatError::Zstd(ZstdError(code)),
119 }),
120 },
121 Self::Lz4 => lz4::decompress_into(data, destination).map_err(|e| {
122 ChunkedArchiveError::DecompressionError {
123 index: chunk_index,
124 error: FormatError::Lz4(e),
125 }
126 }),
127 }
128 }
129}
130
/// Stateless handle selecting a decompression algorithm.
///
/// The handle itself holds no context, which is why it can be `Copy`.
#[derive(Clone, Copy)]
pub enum ThreadLocalDecompressor {
    /// zstd.
    Zstd,
    /// LZ4.
    Lz4,
}
137
138impl ThreadLocalDecompressor {
139 pub fn decompress(
141 &self,
142 data: &[u8],
143 uncompressed_size: usize,
144 chunk_index: usize,
145 ) -> Result<Vec<u8>, ChunkedArchiveError> {
146 match self {
147 Self::Zstd => ZSTD_DECOMPRESSOR.with(|decompressor| {
148 let mut dctx = decompressor.borrow_mut();
149 let mut buffer = Vec::with_capacity(uncompressed_size);
150 match dctx.decompress(&mut buffer, data) {
151 Ok(_) => Ok(buffer),
152 Err(code) => Err(ChunkedArchiveError::DecompressionError {
153 index: chunk_index,
154 error: FormatError::Zstd(ZstdError(code)),
155 }),
156 }
157 }),
158 Self::Lz4 => lz4::decompress(data, uncompressed_size).map_err(|_| {
159 ChunkedArchiveError::DecompressionError {
160 index: chunk_index,
161 error: FormatError::Lz4(lz4::Error::DecompressionFailed),
162 }
163 }),
164 }
165 }
166
167 pub fn decompress_into<'a>(
169 &self,
170 data: &[u8],
171 destination: &'a mut [u8],
172 chunk_index: usize,
173 ) -> Result<usize, ChunkedArchiveError> {
174 match self {
175 Self::Zstd => ZSTD_DECOMPRESSOR.with(|decompressor| {
176 let mut dctx = decompressor.borrow_mut();
177 match dctx.decompress(destination, data) {
178 Ok(size) => Ok(size),
179 Err(code) => Err(ChunkedArchiveError::DecompressionError {
180 index: chunk_index,
181 error: FormatError::Zstd(ZstdError(code)),
182 }),
183 }
184 }),
185 Self::Lz4 => lz4::decompress_into(data, destination).map_err(|e| {
186 ChunkedArchiveError::DecompressionError {
187 index: chunk_index,
188 error: FormatError::Lz4(e),
189 }
190 }),
191 }
192 }
193}
194
195pub enum Compressor {
197 Zstd(zstd::zstd_safe::CCtx<'static>),
198 Lz4 { compression_level: lz4::HcCompressionLevel },
199}
200
201impl Compressor {
202 pub fn compress(
204 &mut self,
205 data: &[u8],
206 chunk_index: usize,
207 ) -> Result<Vec<u8>, ChunkedArchiveError> {
208 match self {
209 Self::Zstd(cctx) => {
210 let buffer_len = zstd::zstd_safe::compress_bound(data.len());
211 let mut buffer = Vec::with_capacity(buffer_len);
212 match cctx.compress2(&mut buffer, data) {
213 Ok(_) => Ok(buffer),
214 Err(code) => Err(ChunkedArchiveError::CompressionError {
215 index: chunk_index,
216 error: FormatError::Zstd(ZstdError(code)),
217 }),
218 }
219 }
220 Self::Lz4 { compression_level } => {
221 lz4::compress_hc(data, *compression_level).map_err(|error| {
222 ChunkedArchiveError::CompressionError {
223 index: chunk_index,
224 error: FormatError::Lz4(error),
225 }
226 })
227 }
228 }
229 }
230}
231
232#[derive(Copy, Clone)]
233pub enum ThreadLocalCompressor {
235 Zstd { compression_level: i32 },
236 Lz4 { compression_level: lz4::HcCompressionLevel },
237}
238
239impl ThreadLocalCompressor {
240 pub fn compress(
242 &self,
243 data: &[u8],
244 chunk_index: usize,
245 ) -> Result<Vec<u8>, ChunkedArchiveError> {
246 match self {
247 Self::Zstd { compression_level } => ZSTD_COMPRESSOR.with(|compressor| {
248 let mut cctx = compressor.borrow_mut();
249 cctx.set_parameter(zstd::zstd_safe::CParameter::CompressionLevel(
250 *compression_level,
251 ))
252 .expect("setting the compression level should never fail");
253 let buffer_len = zstd::zstd_safe::compress_bound(data.len());
254 let mut buffer = Vec::with_capacity(buffer_len);
255 match cctx.compress2(&mut buffer, data) {
256 Ok(_) => Ok(buffer),
257 Err(code) => Err(ChunkedArchiveError::CompressionError {
258 index: chunk_index,
259 error: FormatError::Zstd(ZstdError(code)),
260 }),
261 }
262 }),
263 Self::Lz4 { compression_level } => {
264 lz4::compress_hc(data, *compression_level).map_err(|error| {
265 ChunkedArchiveError::CompressionError {
266 index: chunk_index,
267 error: FormatError::Lz4(error),
268 }
269 })
270 }
271 }
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::ChunkedArchiveOptions;

    /// Payload shared by every test in this module.
    const TEST_DATA: &[u8] = b"hello world this is some test data to compress and decompress";

    /// Builds V3 archive options selecting `algorithm`.
    fn v3_options(algorithm: CompressionAlgorithm) -> ChunkedArchiveOptions {
        ChunkedArchiveOptions::V3 { compression_algorithm: algorithm }
    }

    /// Compresses and decompresses `TEST_DATA` with the owned compressor and
    /// decompressor for `algorithm`, asserting the payload survives.
    fn check_owned_roundtrip(algorithm: CompressionAlgorithm) {
        let options = v3_options(algorithm);
        let mut compressor = options.compressor();
        let compressed = compressor.compress(TEST_DATA, 0).unwrap();

        let mut decompressor = algorithm.decompressor();
        let decompressed = decompressor.decompress(&compressed, TEST_DATA.len(), 0).unwrap();

        assert_eq!(decompressed, TEST_DATA);
    }

    /// Same as `check_owned_roundtrip`, but through the thread-local variants.
    fn check_thread_local_roundtrip(algorithm: CompressionAlgorithm) {
        let options = v3_options(algorithm);
        let compressor = options.thread_local_compressor();
        let compressed = compressor.compress(TEST_DATA, 0).unwrap();

        let decompressor = algorithm.thread_local_decompressor();
        let decompressed = decompressor.decompress(&compressed, TEST_DATA.len(), 0).unwrap();

        assert_eq!(decompressed, TEST_DATA);
    }

    #[test]
    fn test_zstd_roundtrip() {
        check_owned_roundtrip(CompressionAlgorithm::Zstd);
    }

    #[test]
    fn test_lz4_roundtrip() {
        check_owned_roundtrip(CompressionAlgorithm::Lz4);
    }

    #[test]
    fn test_thread_local_zstd_roundtrip() {
        check_thread_local_roundtrip(CompressionAlgorithm::Zstd);
    }

    #[test]
    fn test_thread_local_lz4_roundtrip() {
        check_thread_local_roundtrip(CompressionAlgorithm::Lz4);
    }

    /// Compresses with V2 options and decompresses with the zstd
    /// decompressor into a caller-provided buffer.
    #[test]
    fn test_decompress_into() {
        let options = ChunkedArchiveOptions::V2 {
            minimum_chunk_size: 0,
            chunk_alignment: 0,
            compression_level: 1,
        };
        let mut compressor = options.compressor();
        let compressed = compressor.compress(TEST_DATA, 0).unwrap();

        let mut output = vec![0u8; TEST_DATA.len()];
        let mut decompressor = CompressionAlgorithm::Zstd.decompressor();
        let written = decompressor.decompress_into(&compressed, &mut output, 0).unwrap();

        assert_eq!(written, TEST_DATA.len());
        assert_eq!(output, TEST_DATA);
    }

    /// The byte identifiers convert in both directions and unknown bytes are
    /// rejected.
    #[test]
    fn test_algorithm_conversion() {
        assert_eq!(u8::from(CompressionAlgorithm::Zstd), 0);
        assert_eq!(u8::from(CompressionAlgorithm::Lz4), 1);

        assert_eq!(CompressionAlgorithm::try_from(0).unwrap(), CompressionAlgorithm::Zstd);
        assert_eq!(CompressionAlgorithm::try_from(1).unwrap(), CompressionAlgorithm::Lz4);
        assert!(CompressionAlgorithm::try_from(2).is_err());
    }
}