delivery_blob/compression/
compression_algorithm.rs1use crate::compression::ChunkedArchiveError;
12
13thread_local! {
14 static ZSTD_COMPRESSOR: std::cell::RefCell<zstd::bulk::Compressor<'static>> =
15 std::cell::RefCell::new({
16 let mut compressor = zstd::bulk::Compressor::default();
17 compressor.set_parameter(zstd::zstd_safe::CParameter::ChecksumFlag(true)).unwrap();
18 compressor
19 });
20 static ZSTD_DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
21 std::cell::RefCell::new(zstd::bulk::Decompressor::default());
22}
23
24#[derive(Copy, Clone, Debug, Eq, PartialEq)]
26#[repr(u8)]
27pub enum CompressionAlgorithm {
28 Zstd = 0,
29 Lz4 = 1,
30}
31
32impl CompressionAlgorithm {
33 pub fn decompressor(&self) -> Decompressor {
36 match self {
37 Self::Zstd => Decompressor::Zstd(zstd::bulk::Decompressor::default()),
38 Self::Lz4 => Decompressor::Lz4,
39 }
40 }
41
42 pub fn thread_local_decompressor(&self) -> ThreadLocalDecompressor {
47 match self {
48 Self::Zstd => ThreadLocalDecompressor::Zstd,
49 Self::Lz4 => ThreadLocalDecompressor::Lz4,
50 }
51 }
52}
53
54impl From<CompressionAlgorithm> for u8 {
55 fn from(value: CompressionAlgorithm) -> Self {
56 value as u8
57 }
58}
59
60impl TryFrom<u8> for CompressionAlgorithm {
61 type Error = ChunkedArchiveError;
62 fn try_from(value: u8) -> Result<Self, Self::Error> {
63 match value {
64 0 => Ok(CompressionAlgorithm::Zstd),
65 1 => Ok(CompressionAlgorithm::Lz4),
66 _ => Err(ChunkedArchiveError::IntegrityError),
67 }
68 }
69}
70
71pub enum Decompressor {
73 Zstd(zstd::bulk::Decompressor<'static>),
74 Lz4,
75}
76
77impl Decompressor {
78 pub fn decompress(
80 &mut self,
81 data: &[u8],
82 uncompressed_size: usize,
83 chunk_index: usize,
84 ) -> Result<Vec<u8>, ChunkedArchiveError> {
85 match self {
86 Self::Zstd(decompressor) => {
87 decompressor.decompress(data, uncompressed_size).map_err(|error| {
88 ChunkedArchiveError::DecompressionError { index: chunk_index, error }
89 })
90 }
91 Self::Lz4 => lz4::decompress(data, uncompressed_size).map_err(|_| {
92 ChunkedArchiveError::DecompressionError {
93 index: chunk_index,
94 error: std::io::Error::other("LZ4 decompression error"),
95 }
96 }),
97 }
98 }
99
100 pub fn decompress_into<'a>(
102 &mut self,
103 data: &[u8],
104 destination: &'a mut [u8],
105 chunk_index: usize,
106 ) -> Result<usize, ChunkedArchiveError> {
107 match self {
108 Self::Zstd(decompressor) => {
109 decompressor.decompress_to_buffer(data, destination).map_err(|error| {
110 ChunkedArchiveError::DecompressionError { index: chunk_index, error }
111 })
112 }
113 Self::Lz4 => lz4::decompress_into(data, destination).map_err(|e| {
114 ChunkedArchiveError::DecompressionError {
115 index: chunk_index,
116 error: std::io::Error::other(e),
117 }
118 }),
119 }
120 }
121}
122
123#[derive(Copy, Clone)]
124pub enum ThreadLocalDecompressor {
126 Zstd,
127 Lz4,
128}
129
130impl ThreadLocalDecompressor {
131 pub fn decompress(
133 &self,
134 data: &[u8],
135 uncompressed_size: usize,
136 chunk_index: usize,
137 ) -> Result<Vec<u8>, ChunkedArchiveError> {
138 match self {
139 Self::Zstd => ZSTD_DECOMPRESSOR.with(|decompressor| {
140 decompressor.borrow_mut().decompress(data, uncompressed_size).map_err(|error| {
141 ChunkedArchiveError::DecompressionError { index: chunk_index, error }
142 })
143 }),
144 Self::Lz4 => lz4::decompress(data, uncompressed_size).map_err(|_| {
145 ChunkedArchiveError::DecompressionError {
146 index: chunk_index,
147 error: std::io::Error::other("LZ4 decompression error"),
148 }
149 }),
150 }
151 }
152
153 pub fn decompress_into<'a>(
155 &self,
156 data: &[u8],
157 destination: &'a mut [u8],
158 chunk_index: usize,
159 ) -> Result<usize, ChunkedArchiveError> {
160 match self {
161 Self::Zstd => ZSTD_DECOMPRESSOR.with(|decompressor| {
162 decompressor.borrow_mut().decompress_to_buffer(data, destination).map_err(|error| {
163 ChunkedArchiveError::DecompressionError { index: chunk_index, error }
164 })
165 }),
166 Self::Lz4 => lz4::decompress_into(data, destination).map_err(|e| {
167 ChunkedArchiveError::DecompressionError {
168 index: chunk_index,
169 error: std::io::Error::other(e),
170 }
171 }),
172 }
173 }
174}
175
176pub enum Compressor {
178 Zstd(zstd::bulk::Compressor<'static>),
179 Lz4 { compression_level: lz4::HcCompressionLevel },
180}
181
182impl Compressor {
183 pub fn compress(
185 &mut self,
186 data: &[u8],
187 chunk_index: usize,
188 ) -> Result<Vec<u8>, ChunkedArchiveError> {
189 match self {
190 Self::Zstd(compressor) => compressor.compress(data).map_err(|error| {
191 ChunkedArchiveError::CompressionError { index: chunk_index, error }
192 }),
193 Self::Lz4 { compression_level } => Ok(lz4::compress_hc(data, *compression_level)
194 .expect("chunk size is less than max LZ4 input")),
195 }
196 }
197}
198
199#[derive(Copy, Clone)]
200pub enum ThreadLocalCompressor {
202 Zstd { compression_level: i32 },
203 Lz4 { compression_level: lz4::HcCompressionLevel },
204}
205
206impl ThreadLocalCompressor {
207 pub fn compress(
209 &self,
210 data: &[u8],
211 chunk_index: usize,
212 ) -> Result<Vec<u8>, ChunkedArchiveError> {
213 match self {
214 Self::Zstd { compression_level } => ZSTD_COMPRESSOR.with(|compressor| {
215 let mut compressor = compressor.borrow_mut();
216 compressor
217 .set_compression_level(*compression_level)
218 .expect("setting the compression level should never fail");
219 compressor.compress(data).map_err(|error| ChunkedArchiveError::CompressionError {
220 index: chunk_index,
221 error,
222 })
223 }),
224 Self::Lz4 { compression_level } => Ok(lz4::compress_hc(data, *compression_level)
225 .expect("chunk size is less than max LZ4 input")),
226 }
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233 use crate::compression::ChunkedArchiveOptions;
234
235 const TEST_DATA: &[u8] = b"hello world this is some test data to compress and decompress";
236
237 #[test]
238 fn test_zstd_roundtrip() {
239 let options =
240 ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Zstd };
241 let mut compressor = options.compressor();
242 let compressed = compressor.compress(TEST_DATA, 0).unwrap();
243
244 let mut decompressor = CompressionAlgorithm::Zstd.decompressor();
245 let decompressed = decompressor.decompress(&compressed, TEST_DATA.len(), 0).unwrap();
246
247 assert_eq!(decompressed, TEST_DATA);
248 }
249
250 #[test]
251 fn test_lz4_roundtrip() {
252 let options =
253 ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
254 let mut compressor = options.compressor();
255 let compressed = compressor.compress(TEST_DATA, 0).unwrap();
256
257 let mut decompressor = CompressionAlgorithm::Lz4.decompressor();
258 let decompressed = decompressor.decompress(&compressed, TEST_DATA.len(), 0).unwrap();
259
260 assert_eq!(decompressed, TEST_DATA);
261 }
262
263 #[test]
264 fn test_thread_local_zstd_roundtrip() {
265 let options =
266 ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Zstd };
267 let compressor = options.thread_local_compressor();
268 let compressed = compressor.compress(TEST_DATA, 0).unwrap();
269
270 let decompressor = CompressionAlgorithm::Zstd.thread_local_decompressor();
271 let decompressed = decompressor.decompress(&compressed, TEST_DATA.len(), 0).unwrap();
272
273 assert_eq!(decompressed, TEST_DATA);
274 }
275
276 #[test]
277 fn test_thread_local_lz4_roundtrip() {
278 let options =
279 ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
280 let compressor = options.thread_local_compressor();
281 let compressed = compressor.compress(TEST_DATA, 0).unwrap();
282
283 let decompressor = CompressionAlgorithm::Lz4.thread_local_decompressor();
284 let decompressed = decompressor.decompress(&compressed, TEST_DATA.len(), 0).unwrap();
285
286 assert_eq!(decompressed, TEST_DATA);
287 }
288
289 #[test]
290 fn test_decompress_into() {
291 let options = ChunkedArchiveOptions::V2 {
292 minimum_chunk_size: 0,
293 chunk_alignment: 0,
294 compression_level: 1,
295 };
296 let mut compressor = options.compressor();
297 let compressed = compressor.compress(TEST_DATA, 0).unwrap();
298
299 let mut decompressor = CompressionAlgorithm::Zstd.decompressor();
300 let mut buffer = vec![0u8; TEST_DATA.len()];
301 let len = decompressor.decompress_into(&compressed, &mut buffer, 0).unwrap();
302
303 assert_eq!(len, TEST_DATA.len());
304 assert_eq!(buffer, TEST_DATA);
305 }
306
307 #[test]
308 fn test_algorithm_conversion() {
309 assert_eq!(u8::from(CompressionAlgorithm::Zstd), 0);
310 assert_eq!(u8::from(CompressionAlgorithm::Lz4), 1);
311
312 assert_eq!(CompressionAlgorithm::try_from(0).unwrap(), CompressionAlgorithm::Zstd);
313 assert_eq!(CompressionAlgorithm::try_from(1).unwrap(), CompressionAlgorithm::Lz4);
314 assert!(CompressionAlgorithm::try_from(2).is_err());
315 }
316}