1use crate::constants::*;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::errors::map_to_status;
11use crate::fuchsia::node::{FxNode, OpenedNode};
12use crate::fuchsia::pager::{
13 MarkDirtyRange, PageInRange, PagerBacked, PagerPacketReceiverRegistration, default_page_in,
14};
15use crate::fuchsia::volume::{FxVolume, READ_AHEAD_SIZE};
16use crate::fxblob::atomic_vec::AtomicBitVec;
17use anyhow::{Context, Error, anyhow, bail, ensure};
18use delivery_blob::compression::{CompressionAlgorithm, ThreadLocalDecompressor};
19use fidl_fuchsia_feedback::{Annotation, Attachment, CrashReport};
20use fidl_fuchsia_mem::Buffer;
21use fuchsia_async::epoch::Epoch;
22use fuchsia_component_client::connect_to_protocol;
23use fuchsia_merkle::{Hash, MerkleVerifier, ReadSizedMerkleVerifier};
24use futures::try_join;
25use fxfs::errors::FxfsError;
26use fxfs::log::*;
27use fxfs::object_handle::{ObjectHandle, ReadObjectHandle};
28use fxfs::object_store::{DataObjectHandle, ObjectDescriptor};
29use fxfs::round::{round_down, round_up};
30use fxfs_macros::ToWeakNode;
31use std::num::NonZero;
32use std::ops::Range;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
35use storage_device::buffer;
36use zx::{HandleBased, Status};
37
/// `fuchsia_merkle::BLOCK_SIZE` widened to a `u64`.
pub const BLOCK_SIZE: u64 = fuchsia_merkle::BLOCK_SIZE as u64;

/// Flag kept in the most significant bit of `FxBlob::open_count`.  `mark_to_be_purged` sets it;
/// the remaining bits hold the actual open count.
const PURGED: usize = 1 << (usize::BITS - 1);
44
/// A blob node whose contents are exposed through a VMO created by the volume's pager.
#[derive(ToWeakNode)]
pub struct FxBlob {
    /// Handle to the underlying object; all blob data is read through it.
    handle: DataObjectHandle<FxVolume>,
    /// VMO created by the volume's pager in `new`; `overwrite_me` duplicates this handle for
    /// the replacement blob.
    vmo: zx::Vmo,
    /// Open count in the low bits, with the `PURGED` flag in the top bit.
    open_count: AtomicUsize,
    /// Merkle root of the blob, returned by `root` and used to name the VMO.
    merkle_root: Hash,
    /// Verifies data read in `aligned_read`; built with a read size of `min_chunk_size`.
    merkle_verifier: ReadSizedMerkleVerifier,
    /// Compression metadata.  When `None`, reads are served directly from `handle` without
    /// decompression.
    compression_info: Option<CompressionInfo>,
    /// Size of the blob's contents once decompressed; this is what `byte_size` reports.
    uncompressed_size: u64,
    /// Registration returned by the pager together with `vmo`.  It is reference counted so
    /// that a blob created by `overwrite_me` can share it.
    pager_packet_receiver_registration: Arc<PagerPacketReceiverRegistration<Self>>,
    /// One bit per `min_chunk_size`-sized chunk, updated by `record_page_fault_metric`.
    chunks_supplied: AtomicBitVec,
}
58
59impl FxBlob {
60 pub fn new(
61 handle: DataObjectHandle<FxVolume>,
62 merkle_root: Hash,
63 merkle_verifier: MerkleVerifier,
64 compression_info: Option<CompressionInfo>,
65 uncompressed_size: u64,
66 ) -> Result<Arc<Self>, Error> {
67 let min_chunk_size = min_chunk_size(&compression_info);
68 let merkle_verifier =
69 ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)?;
70 let chunks_supplied = AtomicBitVec::new(uncompressed_size.div_ceil(min_chunk_size));
71
72 Ok(Arc::new_cyclic(|weak| {
73 let (vmo, pager_packet_receiver_registration) = handle
74 .owner()
75 .pager()
76 .create_vmo(weak.clone(), uncompressed_size, zx::VmoOptions::empty())
77 .unwrap();
78 set_vmo_name(&vmo, &merkle_root);
79 Self {
80 handle,
81 vmo,
82 open_count: AtomicUsize::new(0),
83 merkle_root,
84 merkle_verifier,
85 compression_info,
86 uncompressed_size,
87 pager_packet_receiver_registration: Arc::new(pager_packet_receiver_registration),
88 chunks_supplied,
89 }
90 }))
91 }
92
    /// Creates a replacement for this blob that is backed by a different `handle` (with its own
    /// verifier and compression info) but shares this blob's VMO, merkle root, uncompressed size
    /// and pager packet receiver registration.
    ///
    /// This blob is removed from the volume's node cache and the shared registration's receiver
    /// is pointed at the new blob, which is returned.
    ///
    /// # Panics
    /// Panics if the merkle verifier cannot be constructed for the new chunk size or if the VMO
    /// handle cannot be duplicated.
    pub fn overwrite_me(
        self: &Arc<Self>,
        handle: DataObjectHandle<FxVolume>,
        merkle_verifier: MerkleVerifier,
        compression_info: Option<CompressionInfo>,
    ) -> Arc<Self> {
        let min_chunk_size = min_chunk_size(&compression_info);
        let merkle_verifier =
            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)
                .expect("The chunk size should have been validated by the delivery blob parser");
        // The new blob starts with a fresh bit vector: no chunks are recorded as supplied.
        let chunks_supplied = AtomicBitVec::new(self.uncompressed_size.div_ceil(min_chunk_size));
        // Both blobs refer to the same underlying VMO.
        let vmo = self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap();

        let new_blob = Arc::new(Self {
            handle,
            vmo,
            open_count: AtomicUsize::new(0),
            merkle_root: self.merkle_root,
            merkle_verifier,
            compression_info,
            uncompressed_size: self.uncompressed_size,
            pager_packet_receiver_registration: self.pager_packet_receiver_registration.clone(),
            chunks_supplied,
        });

        // Take the old blob out of the node cache.
        self.handle.owner().cache().remove(self.as_ref());

        // NOTE(review): the name `receiver_lock` suggests the returned value acts as a guard that
        // is held until the end of this function, i.e. until the open counts below have been
        // exchanged — confirm against `set_receiver`.
        let receiver_lock =
            self.pager_packet_receiver_registration.receiver().set_receiver(&new_blob);
        if receiver_lock.is_strong() {
            // The new blob gains an open count immediately; the old blob gives one up, but only
            // through a callback deferred on the global epoch.
            new_blob.open_count_add_one();
            let old_blob = self.clone();
            Epoch::global().defer(move || old_blob.open_count_sub_one());
        }
        new_blob
    }
140
    /// Returns the merkle root hash of this blob.
    pub fn root(&self) -> Hash {
        self.merkle_root
    }
144
145 fn record_page_fault_metric(&self, range: &Range<u64>) {
146 let chunk_size: u64 = min_chunk_size(&self.compression_info);
147
148 let first_chunk = range.start / chunk_size;
149 let last_chunk = range.end.div_ceil(chunk_size);
151
152 let supplied_count = self.chunks_supplied.test_and_set_range(first_chunk, last_chunk);
153
154 if supplied_count > 0 {
155 self.handle
156 .owner()
157 .blob_resupplied_count()
158 .increment(supplied_count, Ordering::Relaxed);
159 }
160 }
161}
162
163impl Drop for FxBlob {
164 fn drop(&mut self) {
165 let volume = self.handle.owner();
166 volume.cache().remove(self);
167 }
168}
169
170impl OpenedNode<FxBlob> {
171 pub fn create_child_vmo(&self) -> Result<zx::Vmo, Status> {
178 let blob = self.0.as_ref();
179 let child_vmo = blob.vmo.create_child(
180 zx::VmoChildOptions::REFERENCE | zx::VmoChildOptions::NO_WRITE,
181 0,
182 0,
183 )?;
184 if blob.handle.owner().pager().watch_for_zero_children(blob).map_err(map_to_status)? {
185 blob.open_count_add_one();
189 }
190 Ok(child_vmo)
191 }
192}
193
194impl FxNode for FxBlob {
195 fn object_id(&self) -> u64 {
196 self.handle.object_id()
197 }
198
199 fn parent(&self) -> Option<Arc<FxDirectory>> {
200 unreachable!(); }
202
203 fn set_parent(&self, _parent: Arc<FxDirectory>) {
204 }
206
207 fn open_count_add_one(&self) {
208 let old = self.open_count.fetch_add(1, Ordering::Relaxed);
209 assert!(old != PURGED && old != PURGED - 1);
210 }
211
212 fn open_count_sub_one(self: Arc<Self>) {
213 let old = self.open_count.fetch_sub(1, Ordering::Relaxed);
214 assert!(old & !PURGED > 0);
215 if old == PURGED + 1 {
216 let store = self.handle.store();
217 store
218 .filesystem()
219 .graveyard()
220 .queue_tombstone_object(store.store_object_id(), self.object_id());
221 }
222 }
223
224 fn object_descriptor(&self) -> ObjectDescriptor {
225 ObjectDescriptor::File
226 }
227
228 fn terminate(&self) {
229 self.pager_packet_receiver_registration.stop_watching_for_zero_children();
230 }
231
232 fn mark_to_be_purged(&self) {
233 let old = self.open_count.fetch_or(PURGED, Ordering::Relaxed);
234 assert!(old & PURGED == 0);
235 if old == 0 {
236 let store = self.handle.store();
237 store
238 .filesystem()
239 .graveyard()
240 .queue_tombstone_object(store.store_object_id(), self.object_id());
241 }
242 }
243}
244
245impl PagerBacked for FxBlob {
    /// Returns the pager of the volume that owns this blob's handle.
    fn pager(&self) -> &crate::pager::Pager {
        self.handle.owner().pager()
    }
249
    /// Returns the registration that was obtained from the pager together with the VMO.
    fn pager_packet_receiver_registration(&self) -> &PagerPacketReceiverRegistration<Self> {
        &self.pager_packet_receiver_registration
    }
253
    /// Returns the VMO that exposes this blob's contents.
    fn vmo(&self) -> &zx::Vmo {
        &self.vmo
    }
257
258 fn page_in(self: Arc<Self>, range: PageInRange<Self>) {
259 let read_ahead_size = if let Some(compression_info) = &self.compression_info {
260 read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
261 } else {
262 READ_AHEAD_SIZE
263 };
264 default_page_in(self, range, read_ahead_size)
266 }
267
    /// Not expected to be called for blobs; panics if it is.
    fn mark_dirty(self: Arc<Self>, _range: MarkDirtyRange<Self>) {
        unreachable!();
    }
271
    /// Releases one open count (the counterpart of the increment in `create_child_vmo`).
    fn on_zero_children(self: Arc<Self>) {
        self.open_count_sub_one();
    }
275
    /// Returns the uncompressed size of the blob.
    fn byte_size(&self) -> u64 {
        self.uncompressed_size
    }
279
    /// Reads `range` of the blob's uncompressed contents into a newly allocated buffer.
    ///
    /// Uncompressed blobs are read directly.  For compressed blobs the matching compressed
    /// range is read (aligned outwards to the handle's block size) and decompressed; a failed
    /// decompression is retried once after re-reading the data.  The data is verified with the
    /// merkle verifier and any bytes of the buffer past the data that was read are zeroed.
    ///
    /// # Errors
    /// Returns an error if reading fails, if fewer compressed bytes than expected were read
    /// (`FxfsError::Inconsistent`), if decompression fails twice (`FxfsError::IntegrityError`),
    /// or if merkle verification fails.
    async fn aligned_read(&self, range: Range<u64>) -> Result<buffer::Buffer<'_>, Error> {
        self.record_page_fault_metric(&range);

        let mut buffer = self.handle.allocate_buffer((range.end - range.start) as usize).await;
        let read = match &self.compression_info {
            // Uncompressed blob: read straight into the output buffer.
            None => self.handle.read(range.start, buffer.as_mut()).await?,
            Some(compression_info) => {
                // With no end offset the compressed range runs to the end of the object.
                let compressed_offsets =
                    match compression_info.compressed_range_for_uncompressed_range(&range)? {
                        (start, None) => start..self.handle.get_size(),
                        (start, Some(end)) => start..end.get(),
                    };
                // Widen the compressed range to block boundaries for the read.
                let bs = self.handle.block_size();
                let aligned = round_down(compressed_offsets.start, bs)
                    ..round_up(compressed_offsets.end, bs).unwrap();
                let mut compressed_buf =
                    self.handle.allocate_buffer((aligned.end - aligned.start) as usize).await;

                let mut decompression_errors = 0;
                // The requested range may extend past the end of the blob; only decompress up to
                // the blob's size.
                let len = (std::cmp::min(range.end, self.uncompressed_size) - range.start) as usize;
                loop {
                    // Read the compressed data and commit the output buffer concurrently.
                    let (read, _) = try_join!(
                        self.handle.read(aligned.start, compressed_buf.as_mut()),
                        async {
                            buffer
                                .allocator()
                                .buffer_source()
                                .commit_range(buffer.range())
                                .map_err(|e| e.into())
                        }
                    )
                    .with_context(|| {
                        format!(
                            "Failed to read compressed range {:?}, len {}",
                            aligned,
                            self.handle.get_size()
                        )
                    })?;
                    // Position of the compressed data within the block-aligned buffer.
                    let compressed_buf_range = (compressed_offsets.start - aligned.start) as usize
                        ..(compressed_offsets.end - aligned.start) as usize;
                    // NOTE(review): this compares `read` with the length of the compressed range
                    // rather than with `compressed_buf_range.end`, although the read starts at
                    // `aligned.start` — confirm whether the leading alignment padding should be
                    // taken into account.
                    ensure!(
                        read >= compressed_buf_range.end - compressed_buf_range.start,
                        anyhow!(FxfsError::Inconsistent).context(format!(
                            "Unexpected EOF, read {}, but expected {}",
                            read,
                            compressed_buf_range.end - compressed_buf_range.start,
                        ))
                    );

                    let buf = buffer.as_mut_slice();
                    let decompression_result = {
                        fxfs_trace::duration!("blob-decompress", "len" => len);
                        compression_info.decompress(
                            &compressed_buf.as_slice()[compressed_buf_range],
                            &mut buf[..len],
                            range.start,
                        )
                    };
                    match decompression_result {
                        Ok(()) => break,
                        Err(error) => {
                            record_decompression_error_crash_report(
                                compressed_buf.as_slice(),
                                &range,
                                &compressed_offsets,
                                &self.merkle_root,
                            )
                            .await;
                            decompression_errors += 1;
                            // Give up after the second failure; otherwise loop to re-read and
                            // try again.
                            if decompression_errors == 2 {
                                bail!(
                                    anyhow!(FxfsError::IntegrityError)
                                        .context(format!("Decompression error: {error:?}"))
                                );
                            } else {
                                warn!(error:?; "Decompression error; retrying");
                            }
                        }
                    }
                }
                if decompression_errors > 0 {
                    info!("Read succeeded on second attempt");
                }
                len
            }
        };
        {
            fxfs_trace::duration!("blob-verify", "len" => read);
            self.merkle_verifier.verify(range.start as usize, &buffer.as_slice()[..read])?;
        }
        // Zero whatever part of the buffer was not filled with blob data.
        buffer.as_mut_slice()[read..].fill(0);
        Ok(buffer)
    }
376}
377
/// Metadata needed to locate and decompress the chunks of a compressed blob.
pub struct CompressionInfo {
    /// Size in bytes of each uncompressed chunk (the final chunk may be shorter).
    chunk_size: u64,
    /// Compressed offsets of the chunks after the first that fit in a `u32`.  The first chunk's
    /// offset is always zero and is not stored.
    small_offsets: Box<[u32]>,
    /// Compressed offsets of the remaining chunks, i.e. those that don't fit in a `u32`.
    large_offsets: Box<[u64]>,
    /// Decompressor for the blob's compression algorithm.
    decompressor: ThreadLocalDecompressor,
}
386
387impl CompressionInfo {
388 pub fn new(
389 chunk_size: u64,
390 offsets: &[u64],
391 compression_algorithm: CompressionAlgorithm,
392 ) -> Result<Self, Error> {
393 let decompressor = compression_algorithm.thread_local_decompressor();
394 if chunk_size == 0 {
395 return Err(FxfsError::IntegrityError.into());
396 } else if offsets.is_empty() || *offsets.first().unwrap() != 0 {
397 Err(FxfsError::IntegrityError.into())
399 } else if !offsets.array_windows().all(|[a, b]| a < b) {
400 Err(FxfsError::IntegrityError.into())
402 } else if offsets.len() == 1 {
403 Ok(Self {
406 chunk_size,
407 small_offsets: Box::default(),
408 large_offsets: Box::default(),
409 decompressor,
410 })
411 } else if *offsets.last().unwrap() <= u32::MAX as u64 {
412 Ok(Self {
415 chunk_size,
416 small_offsets: offsets[1..].iter().map(|x| *x as u32).collect(),
417 large_offsets: Box::default(),
418 decompressor,
419 })
420 } else {
421 let partition_point = offsets.partition_point(|&x| x <= u32::MAX as u64);
423 Ok(Self {
424 chunk_size,
425 small_offsets: offsets[1..partition_point].iter().map(|x| *x as u32).collect(),
426 large_offsets: offsets[partition_point..].into(),
427 decompressor,
428 })
429 }
430 }
431
432 fn compressed_range_for_uncompressed_range(
433 &self,
434 range: &Range<u64>,
435 ) -> Result<(u64, Option<NonZero<u64>>), Error> {
436 ensure!(range.start.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
437 ensure!(range.start < range.end, FxfsError::Inconsistent);
438
439 let start_chunk_index = (range.start / self.chunk_size) as usize;
440 let start_offset = self
441 .compressed_offset_for_chunk_index(start_chunk_index)
442 .ok_or(FxfsError::OutOfRange)?;
443
444 let end_chunk_index = range.end.div_ceil(self.chunk_size) as usize;
446 let end_offset = match self.compressed_offset_for_chunk_index(end_chunk_index) {
447 None => None,
448 Some(offset) => {
449 ensure!(range.end.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
451 Some(NonZero::new(offset).unwrap())
454 }
455 };
456 Ok((start_offset, end_offset))
457 }
458
459 fn compressed_offset_for_chunk_index(&self, chunk_index: usize) -> Option<u64> {
460 if chunk_index == 0 {
462 Some(0)
463 } else if chunk_index - 1 < self.small_offsets.len() {
464 Some(self.small_offsets[chunk_index - 1] as u64)
465 } else if chunk_index - 1 - self.small_offsets.len() < self.large_offsets.len() {
466 Some(self.large_offsets[chunk_index - 1 - self.small_offsets.len()])
467 } else {
468 None
469 }
470 }
471
    /// Decompresses consecutive chunks from `src` into `dst`.
    ///
    /// `dst_start_offset` is the uncompressed offset that `dst` starts at and must be a multiple
    /// of the chunk size; `src` must start at the compressed offset of that chunk.  Every chunk
    /// that is followed by another chunk must decompress to exactly `chunk_size` bytes.  The
    /// last chunk of the blob consumes all that is left of `src` and must exactly fill what is
    /// left of `dst`.
    ///
    /// # Errors
    /// Returns `FxfsError::Inconsistent` if `dst_start_offset` is not chunk aligned or is past
    /// the last chunk, if `src` or `dst` is too short for a chunk, or if a chunk decompresses
    /// to an unexpected size.  Errors from the decompressor are propagated.
    fn decompress(
        &self,
        mut src: &[u8],
        mut dst: &mut [u8],
        dst_start_offset: u64,
    ) -> Result<(), Error> {
        ensure!(dst_start_offset.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);

        let start_chunk_index = (dst_start_offset / self.chunk_size) as usize;
        // A partially filled trailing chunk still counts as a chunk.
        let chunk_count = dst.len().div_ceil(self.chunk_size as usize);
        let mut start_offset = self
            .compressed_offset_for_chunk_index(start_chunk_index)
            .ok_or(FxfsError::Inconsistent)?;

        for chunk_index in start_chunk_index..(start_chunk_index + chunk_count) {
            // The start of the next chunk marks the end of this one.
            match self.compressed_offset_for_chunk_index(chunk_index + 1) {
                Some(end_offset) => {
                    // Peel this chunk's compressed bytes and its destination off the front of
                    // `src` and `dst`.
                    let (to_decompress, src_remaining) = src
                        .split_at_checked((end_offset - start_offset) as usize)
                        .ok_or(FxfsError::Inconsistent)?;
                    let (to_decompress_into, dst_remaining) = dst
                        .split_at_mut_checked(self.chunk_size as usize)
                        .ok_or(FxfsError::Inconsistent)?;

                    let decompressed_bytes = self.decompressor.decompress_into(
                        to_decompress,
                        to_decompress_into,
                        chunk_index,
                    )?;
                    ensure!(
                        decompressed_bytes == to_decompress_into.len(),
                        FxfsError::Inconsistent
                    );
                    src = src_remaining;
                    dst = dst_remaining;
                    start_offset = end_offset;
                }
                None => {
                    // Last chunk of the blob: it has no end offset, so use everything left.
                    let decompressed_bytes =
                        self.decompressor.decompress_into(src, dst, chunk_index)?;
                    ensure!(decompressed_bytes == dst.len(), FxfsError::Inconsistent);
                }
            }
        }

        Ok(())
    }
525}
526
527fn set_vmo_name(vmo: &zx::Vmo, merkle_root: &Hash) {
528 let trimmed_merkle = &merkle_root.to_string()[0..BLOB_NAME_HASH_LENGTH];
529 let name = format!("{BLOB_NAME_PREFIX}{trimmed_merkle}");
530 let name = zx::Name::new(&name).unwrap();
531 vmo.set_name(&name).unwrap();
532}
533
534fn min_chunk_size(compression_info: &Option<CompressionInfo>) -> u64 {
535 if let Some(compression_info) = compression_info {
536 read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
537 } else {
538 READ_AHEAD_SIZE
539 }
540}
541
542fn read_ahead_size_for_chunk_size(chunk_size: u64, suggested_read_ahead_size: u64) -> u64 {
543 if chunk_size >= suggested_read_ahead_size {
544 chunk_size
545 } else {
546 round_down(suggested_read_ahead_size, chunk_size)
547 }
548}
549
550async fn record_decompression_error_crash_report(
551 compressed_buf: &[u8],
552 uncompressed_offsets: &Range<u64>,
553 compressed_offsets: &Range<u64>,
554 merkle_root: &Hash,
555) {
556 static DONE_ONCE: AtomicBool = AtomicBool::new(false);
557 if !DONE_ONCE.swap(true, Ordering::Relaxed) {
558 if let Ok(proxy) = connect_to_protocol::<fidl_fuchsia_feedback::CrashReporterMarker>() {
559 let size = compressed_buf.len() as u64;
560 let vmo = zx::Vmo::create(size).unwrap();
561 vmo.write(compressed_buf, 0).unwrap();
562 if let Err(e) = proxy
563 .file_report(CrashReport {
564 program_name: Some("fxfs".to_string()),
565 crash_signature: Some("fuchsia-fxfs-decompression_error".to_string()),
566 is_fatal: Some(false),
567 annotations: Some(vec![
568 Annotation {
569 key: "fxfs.range".to_string(),
570 value: format!("{:?}", uncompressed_offsets),
571 },
572 Annotation {
573 key: "fxfs.compressed_offsets".to_string(),
574 value: format!("{:?}", compressed_offsets),
575 },
576 Annotation {
577 key: "fxfs.merkle_root".to_string(),
578 value: format!("{}", merkle_root),
579 },
580 ]),
581 attachments: Some(vec![Attachment {
582 key: "fxfs_compressed_data".to_string(),
583 value: Buffer { vmo, size },
584 }]),
585 ..Default::default()
586 })
587 .await
588 {
589 error!(e:?; "Failed to file crash report");
590 } else {
591 warn!("Filed crash report for decompression error");
592 }
593 } else {
594 error!("Failed to connect to crash report service");
595 }
596 }
597}
598
599#[cfg(test)]
600mod tests {
601 use super::*;
602 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture};
603 use crate::fuchsia::pager::PageInRange;
604 use crate::fxblob::testing::open_blob_fixture;
605 use assert_matches::assert_matches;
606 use delivery_blob::CompressionMode;
607 use delivery_blob::compression::{ChunkedArchiveOptions, CompressionAlgorithm};
608 use fuchsia_async as fasync;
609 use fuchsia_async::epoch::Epoch;
610 use fxfs_make_blob_image::FxBlobBuilder;
611 use storage_device::DeviceHolder;
612 use storage_device::fake_device::FakeDevice;
613
    /// Chunk size used by `build_compression_info` and the decompression tests.
    const CHUNK_SIZE: usize = 32 * 1024;
615
616 #[fasync::run(10, test)]
617 async fn test_empty_blob() {
618 let fixture = new_blob_fixture().await;
619
620 let data = vec![];
621 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
622 assert_eq!(fixture.read_blob(hash).await, data);
623
624 fixture.close().await;
625 }
626
627 #[fasync::run(10, test)]
628 async fn test_large_blob() {
629 let fixture = new_blob_fixture().await;
630
631 let data = vec![3; 3_000_000];
632 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
633
634 assert_eq!(fixture.read_blob(hash).await, data);
635
636 fixture.close().await;
637 }
638
639 #[fasync::run(10, test)]
640 async fn test_large_compressed_blob() {
641 let fixture = new_blob_fixture().await;
642
643 let data = vec![3; 3_000_000];
644 let hash = fixture.write_blob(&data, CompressionMode::Always).await;
645
646 assert_eq!(fixture.read_blob(hash).await, data);
647
648 fixture.close().await;
649 }
650
651 #[fasync::run(10, test)]
652 async fn test_non_page_aligned_blob() {
653 let fixture = new_blob_fixture().await;
654
655 let page_size = zx::system_get_page_size() as usize;
656 let data = vec![0xffu8; page_size - 1];
657 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
658 assert_eq!(fixture.read_blob(hash).await, data);
659
660 {
661 let vmo = fixture.get_blob_vmo(hash).await;
662 let mut buf = vec![0x11u8; page_size];
663 vmo.read(&mut buf[..], 0).expect("vmo read failed");
664 assert_eq!(data, buf[..data.len()]);
665 assert_eq!(buf[data.len()], 0);
667 }
668
669 fixture.close().await;
670 }
671
672 #[fasync::run(10, test)]
673 async fn test_blob_invalid_contents() {
674 let fixture = new_blob_fixture().await;
675
676 let data = vec![0xffu8; (READ_AHEAD_SIZE + BLOCK_SIZE) as usize];
677 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
678 let name = format!("{}", hash);
679
680 {
681 let handle = fixture.get_blob_handle(&name).await;
683 let mut transaction =
684 handle.new_transaction().await.expect("failed to create transaction");
685 let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
686 buf.as_mut_slice().fill(0);
687 handle
688 .txn_write(&mut transaction, READ_AHEAD_SIZE, buf.as_ref())
689 .await
690 .expect("txn_write failed");
691 transaction.commit().await.expect("failed to commit transaction");
692 }
693
694 {
695 let blob_vmo = fixture.get_blob_vmo(hash).await;
696 let mut buf = vec![0; BLOCK_SIZE as usize];
697 assert_matches!(blob_vmo.read(&mut buf[..], 0), Ok(_));
698 assert_matches!(
699 blob_vmo.read(&mut buf[..], READ_AHEAD_SIZE),
700 Err(zx::Status::IO_DATA_INTEGRITY)
701 );
702 }
703
704 fixture.close().await;
705 }
706
707 #[fasync::run(10, test)]
708 async fn test_lz4_blob() {
709 let device = DeviceHolder::new(FakeDevice::new(16384, 512));
710 let blob_data = vec![0xAA; 68 * 1024];
711 let fxblob_builder = FxBlobBuilder::new(device).await.unwrap();
712 let blob = fxblob_builder
713 .generate_blob(blob_data.clone(), Some(CompressionAlgorithm::Lz4))
714 .unwrap();
715 let blob_hash = blob.hash();
716 fxblob_builder.install_blob(&blob).await.unwrap();
717 let device = fxblob_builder.finalize().await.unwrap().0;
718 device.reopen(false);
719 let fixture = open_blob_fixture(device).await;
720
721 assert_eq!(fixture.read_blob(blob_hash).await, blob_data);
722
723 fixture.close().await;
724 }
725
726 #[fasync::run(10, test)]
727 async fn test_blob_vmos_are_immutable() {
728 let fixture = new_blob_fixture().await;
729
730 let data = vec![0xffu8; 500];
731 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
732 let blob_vmo = fixture.get_blob_vmo(hash).await;
733
734 assert_matches!(blob_vmo.set_size(20), Err(_));
736
737 assert_matches!(blob_vmo.write(b"overwrite", 0), Err(_));
739
740 assert_matches!(blob_vmo.set_stream_size(20), Err(_));
742
743 fixture.close().await;
744 }
745
    /// Chunk size used by the `CompressionInfo` offset tests.
    const COMPRESSED_BLOB_CHUNK_SIZE: u64 = 32 * 1024;
    /// Largest offset that fits in a `u32`.
    const MAX_SMALL_OFFSET: u64 = u32::MAX as u64;
    /// Shorthand for the algorithm passed to `CompressionInfo::new` in these tests.
    const ZSTD: CompressionAlgorithm = CompressionAlgorithm::Zstd;
749
750 #[fuchsia::test]
751 fn test_compression_info_offsets_must_start_with_zero() {
752 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[], ZSTD).is_err());
753 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[1], ZSTD).is_err());
754 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).is_ok());
755 }
756
757 #[fuchsia::test]
758 fn test_compression_info_offsets_must_be_sorted() {
759 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 2], ZSTD).is_ok());
760 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 2, 1], ZSTD).is_err());
761 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 1], ZSTD).is_err());
762 }
763
764 #[fuchsia::test]
765 fn test_compression_info_splitting_offsets() {
766 let compression_info =
768 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).unwrap();
769 assert!(compression_info.small_offsets.is_empty());
770 assert!(compression_info.large_offsets.is_empty());
771
772 let compression_info =
774 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10], ZSTD).unwrap();
775 assert_eq!(&*compression_info.small_offsets, &[10]);
776 assert!(compression_info.large_offsets.is_empty());
777
778 let compression_info =
780 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10, 20, 30], ZSTD).unwrap();
781 assert_eq!(&*compression_info.small_offsets, &[10, 20, 30]);
782 assert!(compression_info.large_offsets.is_empty());
783
784 let compression_info =
786 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET - 1], ZSTD)
787 .unwrap();
788 assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1]);
789 assert!(compression_info.large_offsets.is_empty());
790
791 let compression_info =
793 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET], ZSTD).unwrap();
794 assert_eq!(&*compression_info.small_offsets, &[u32::MAX]);
795 assert!(compression_info.large_offsets.is_empty());
796
797 let compression_info =
799 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 1], ZSTD)
800 .unwrap();
801 assert!(compression_info.small_offsets.is_empty());
802 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
803
804 let compression_info = CompressionInfo::new(
806 COMPRESSED_BLOB_CHUNK_SIZE,
807 &[0, MAX_SMALL_OFFSET - 1, MAX_SMALL_OFFSET, MAX_SMALL_OFFSET + 1],
808 ZSTD,
809 )
810 .unwrap();
811 assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1, u32::MAX]);
812 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
813
814 let compression_info =
816 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 10], ZSTD)
817 .unwrap();
818 assert!(compression_info.small_offsets.is_empty());
819 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 10]);
820
821 let compression_info = CompressionInfo::new(
823 COMPRESSED_BLOB_CHUNK_SIZE,
824 &[0, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
825 ZSTD,
826 )
827 .unwrap();
828 assert!(compression_info.small_offsets.is_empty());
829 assert_eq!(
830 &*compression_info.large_offsets,
831 &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
832 );
833
834 let compression_info = CompressionInfo::new(
836 COMPRESSED_BLOB_CHUNK_SIZE,
837 &[0, 10, 20, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
838 ZSTD,
839 )
840 .unwrap();
841 assert_eq!(&*compression_info.small_offsets, &[10, 20]);
842 assert_eq!(
843 &*compression_info.large_offsets,
844 &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
845 );
846 }
847
848 #[fuchsia::test]
849 fn test_compression_info_compressed_range_for_uncompressed_range() {
850 fn check_compression_ranges(
851 offsets: &[u64],
852 expected_ranges: &[(u64, Option<u64>)],
853 chunk_size: u64,
854 read_ahead_size: u64,
855 ) {
856 let compression_info = CompressionInfo::new(chunk_size, offsets, ZSTD).unwrap();
857 for (i, range) in expected_ranges.iter().enumerate() {
858 let i = i as u64;
859 let result = compression_info
860 .compressed_range_for_uncompressed_range(
861 &(i * read_ahead_size..(i + 1) * read_ahead_size),
862 )
863 .unwrap();
864 assert_eq!(result, (range.0, range.1.map(|end| NonZero::new(end).unwrap())));
865 }
866 }
867 check_compression_ranges(
868 &[0, 10, 20, 30],
869 &[(0, Some(10)), (10, Some(20)), (20, Some(30)), (30, None)],
870 COMPRESSED_BLOB_CHUNK_SIZE,
871 COMPRESSED_BLOB_CHUNK_SIZE,
872 );
873 check_compression_ranges(
874 &[0, 10, 20, 30],
875 &[(0, Some(20)), (20, None)],
876 COMPRESSED_BLOB_CHUNK_SIZE,
877 COMPRESSED_BLOB_CHUNK_SIZE * 2,
878 );
879 check_compression_ranges(
880 &[0, 10, 20, 30],
881 &[(0, None)],
882 COMPRESSED_BLOB_CHUNK_SIZE,
883 COMPRESSED_BLOB_CHUNK_SIZE * 4,
884 );
885 check_compression_ranges(
886 &[0, 10, 20, 30, MAX_SMALL_OFFSET + 10],
887 &[(0, Some(MAX_SMALL_OFFSET + 10)), (MAX_SMALL_OFFSET + 10, None)],
888 COMPRESSED_BLOB_CHUNK_SIZE,
889 COMPRESSED_BLOB_CHUNK_SIZE * 4,
890 );
891 check_compression_ranges(
892 &[
893 0,
894 10,
895 20,
896 30,
897 MAX_SMALL_OFFSET + 10,
898 MAX_SMALL_OFFSET + 20,
899 MAX_SMALL_OFFSET + 30,
900 MAX_SMALL_OFFSET + 40,
901 MAX_SMALL_OFFSET + 50,
902 ],
903 &[
904 (0, Some(20)),
905 (20, Some(MAX_SMALL_OFFSET + 10)),
906 (MAX_SMALL_OFFSET + 10, Some(MAX_SMALL_OFFSET + 30)),
907 (MAX_SMALL_OFFSET + 30, Some(MAX_SMALL_OFFSET + 50)),
908 ],
909 COMPRESSED_BLOB_CHUNK_SIZE,
910 COMPRESSED_BLOB_CHUNK_SIZE * 2,
911 );
912 }
913
914 #[fuchsia::test]
915 fn test_compression_info_compressed_range_for_uncompressed_range_errors() {
916 let compression_info = CompressionInfo::new(
917 COMPRESSED_BLOB_CHUNK_SIZE,
918 &[
919 0,
920 10,
921 20,
922 30,
923 MAX_SMALL_OFFSET + 10,
924 MAX_SMALL_OFFSET + 20,
925 MAX_SMALL_OFFSET + 30,
926 MAX_SMALL_OFFSET + 40,
927 MAX_SMALL_OFFSET + 50,
928 ],
929 ZSTD,
930 )
931 .unwrap();
932
933 assert!(
935 compression_info
936 .compressed_range_for_uncompressed_range(&(1..COMPRESSED_BLOB_CHUNK_SIZE),)
937 .is_err()
938 );
939
940 assert!(
942 compression_info
943 .compressed_range_for_uncompressed_range(
944 &(COMPRESSED_BLOB_CHUNK_SIZE * 9..COMPRESSED_BLOB_CHUNK_SIZE * 12),
945 )
946 .is_err()
947 );
948
949 assert!(
951 compression_info
952 .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE + 1),)
953 .is_err()
954 );
955 assert!(
956 compression_info
957 .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE - 1),)
958 .is_err()
959 );
960 assert!(
961 compression_info
962 .compressed_range_for_uncompressed_range(
963 &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 + 1),
964 )
965 .is_err()
966 );
967 assert!(
968 compression_info
969 .compressed_range_for_uncompressed_range(
970 &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 - 1),
971 )
972 .is_err()
973 );
974
975 assert!(
977 compression_info
978 .compressed_range_for_uncompressed_range(
979 &(COMPRESSED_BLOB_CHUNK_SIZE * 8..COMPRESSED_BLOB_CHUNK_SIZE * 8 + 4096),
980 )
981 .is_ok()
982 );
983 }
984
985 #[fuchsia::test]
986 fn test_read_ahead_size_for_chunk_size() {
987 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 32 * 1024), 32 * 1024);
988 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 32 * 1024), 48 * 1024);
989 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 32 * 1024), 64 * 1024);
990
991 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 64 * 1024), 64 * 1024);
992 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 64 * 1024), 48 * 1024);
993 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 64 * 1024), 64 * 1024);
994 assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 64 * 1024), 96 * 1024);
995
996 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 128 * 1024), 128 * 1024);
997 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 128 * 1024), 96 * 1024);
998 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 128 * 1024), 128 * 1024);
999 assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 128 * 1024), 96 * 1024);
1000 }
1001
1002 fn build_compression_info(size: usize) -> (CompressionInfo, Vec<u8>, Vec<u8>) {
1003 let options =
1004 ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
1005 let mut compressor = options.compressor();
1006 let mut uncompressed_data = Vec::with_capacity(size);
1007 {
1008 let mut run_length = 1;
1009 let mut run_value: u8 = 0;
1010 while uncompressed_data.len() < size {
1011 uncompressed_data
1012 .resize(std::cmp::min(uncompressed_data.len() + run_length, size), run_value);
1013 run_length = (run_length + 1) % 19 + 1;
1014 run_value = (run_value + 1) % 17;
1015 }
1016 }
1017 let mut compressed_offsets = vec![0];
1018 let mut compressed_data = vec![];
1019 for chunk in uncompressed_data.chunks(CHUNK_SIZE) {
1020 let mut compressed_chunk = compressor.compress(chunk, 0).unwrap();
1021 compressed_data.append(&mut compressed_chunk);
1022 compressed_offsets.push(compressed_data.len() as u64);
1023 }
1024 compressed_offsets.pop();
1025 (
1026 CompressionInfo::new(CHUNK_SIZE as u64, &compressed_offsets, CompressionAlgorithm::Lz4)
1027 .unwrap(),
1028 compressed_data,
1029 uncompressed_data,
1030 )
1031 }
1032
1033 #[fuchsia::test]
1034 fn test_compression_info_decompress_single_chunk() {
1035 let (compression_info, compressed_data, uncompressed_data) =
1036 build_compression_info(CHUNK_SIZE);
1037 let mut decompressed_data = vec![0u8; CHUNK_SIZE + 1];
1038
1039 compression_info
1040 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE], 0)
1041 .expect("failed to decompress");
1042 assert_eq!(uncompressed_data, decompressed_data[0..CHUNK_SIZE]);
1043
1044 compression_info
1046 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1047 .expect_err("decompression should fail");
1048
1049 compression_info
1051 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1052 .expect_err("decompression should fail");
1053 }
1054
1055 #[fuchsia::test]
1056 fn test_compression_info_decompress_multiple_chunks() {
1057 fn slice_for_chunks<'a>(
1058 compressed_data: &'a [u8],
1059 compression_info: &CompressionInfo,
1060 chunks: Range<u64>,
1061 ) -> &'a [u8] {
1062 let (start, end) = compression_info
1063 .compressed_range_for_uncompressed_range(
1064 &(chunks.start * CHUNK_SIZE as u64..chunks.end * CHUNK_SIZE as u64),
1065 )
1066 .unwrap();
1067 let end = end.map_or(compressed_data.len() as u64, NonZero::<u64>::get);
1068 &compressed_data[start as usize..end as usize]
1069 }
1070
1071 const BLOB_SIZE: usize = CHUNK_SIZE * 4 + 4096;
1072 let (compression_info, compressed_data, uncompressed_data) =
1073 build_compression_info(BLOB_SIZE);
1074 let mut decompressed_data = vec![0u8; BLOB_SIZE];
1075
1076 compression_info
1078 .decompress(&compressed_data, &mut decompressed_data, 0)
1079 .expect("failed to decompress");
1080 assert_eq!(uncompressed_data, decompressed_data);
1081
1082 compression_info
1084 .decompress(
1085 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1086 &mut decompressed_data[0..CHUNK_SIZE * 4],
1087 0,
1088 )
1089 .expect("failed to decompress");
1090 assert_eq!(&uncompressed_data[0..CHUNK_SIZE], &decompressed_data[0..CHUNK_SIZE]);
1091
1092 compression_info
1094 .decompress(
1095 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1096 &mut decompressed_data[0..CHUNK_SIZE * 4 - 1],
1097 0,
1098 )
1099 .expect_err("decompression should fail");
1100
1101 compression_info
1103 .decompress(
1104 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1105 &mut decompressed_data[0..CHUNK_SIZE * 4 + 1],
1106 0,
1107 )
1108 .expect_err("decompression should fail");
1109
1110 let partial_chunk = slice_for_chunks(&compressed_data, &compression_info, 4..5);
1112 compression_info
1113 .decompress(partial_chunk, &mut decompressed_data[0..4096], CHUNK_SIZE as u64 * 4)
1114 .expect("failed to decompress");
1115 assert_eq!(&uncompressed_data[CHUNK_SIZE * 4..], &decompressed_data[0..4096]);
1116
1117 compression_info
1119 .decompress(partial_chunk, &mut decompressed_data[0..4095], CHUNK_SIZE as u64 * 4)
1120 .expect_err("decompression should fail");
1121
1122 compression_info
1124 .decompress(partial_chunk, &mut decompressed_data[0..4097], CHUNK_SIZE as u64 * 4)
1125 .expect_err("decompression should fail");
1126 }
1127
    /// Checks that `chunks_supplied` tracks which read-ahead sized chunks have been paged in
    /// and that paging in already supplied chunks bumps the volume's resupplied counter.
    #[fasync::run(10, test)]
    async fn test_refault_metric() {
        let fixture = new_blob_fixture().await;
        {
            let volume = fixture.volume().volume().clone();
            // Slightly short of four read-ahead chunks, so the last chunk is partial.
            const FILE_SIZE: u64 = READ_AHEAD_SIZE * 4 - 4096;
            let data = vec![0xffu8; FILE_SIZE as usize];
            let hash = fixture.write_blob(&data, CompressionMode::Never).await;

            let blob = fixture.get_blob(hash).await.unwrap();
            assert_eq!(blob.chunks_supplied.len(), 4);
            // Nothing has been paged in yet.
            assert_eq!(&blob.chunks_supplied.get(), &[false, false, false, false]);

            // A read inside the first chunk.
            blob.vmo.read_to_vec::<u8>(4096, 4096).unwrap();

            assert_eq!(&blob.chunks_supplied.get(), &[true, false, false, false]);

            // A read that starts in the third chunk and ends in the fourth.
            blob.vmo.read_to_vec::<u8>(READ_AHEAD_SIZE * 2 + 4096, READ_AHEAD_SIZE).unwrap();
            assert_eq!(&blob.chunks_supplied.get(), &[true, false, true, true]);

            // So far every chunk has been supplied at most once.
            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 0);

            // Request a page-in directly for a range that overlaps the third and fourth chunks,
            // both of which have already been supplied.
            blob.clone().page_in(PageInRange::new(
                FILE_SIZE - READ_AHEAD_SIZE..FILE_SIZE,
                blob.clone(),
                Epoch::global().guard(),
            ));
            Epoch::global().barrier().await;

            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 2);
        }

        fixture.close().await;
    }
1168}