1use crate::constants::*;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::errors::map_to_status;
11use crate::fuchsia::node::{FxNode, OpenedNode};
12use crate::fuchsia::pager::{
13 MarkDirtyRange, PageInRange, PagerBacked, PagerPacketReceiverRegistration, default_page_in,
14};
15use crate::fuchsia::volume::{FxVolume, READ_AHEAD_SIZE};
16use crate::fxblob::atomic_vec::AtomicBitVec;
17use anyhow::{Context, Error, anyhow, bail, ensure};
18use delivery_blob::compression::{CompressionAlgorithm, ThreadLocalDecompressor};
19use fidl_fuchsia_feedback::{Annotation, Attachment, CrashReport};
20use fidl_fuchsia_mem::Buffer;
21use fuchsia_async::epoch::Epoch;
22use fuchsia_component_client::connect_to_protocol;
23use fuchsia_merkle::{Hash, MerkleVerifier, ReadSizedMerkleVerifier};
24use futures::try_join;
25use fxfs::errors::FxfsError;
26use fxfs::log::*;
27use fxfs::object_handle::{ObjectHandle, ReadObjectHandle};
28use fxfs::object_store::{DataObjectHandle, ObjectDescriptor};
29use fxfs::round::{round_down, round_up};
30use fxfs_macros::ToWeakNode;
31use std::num::NonZero;
32use std::ops::Range;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
35use storage_device::buffer;
36use zx::{HandleBased, Status};
37
38const PURGED: usize = 1 << (usize::BITS - 1);
42
43#[derive(ToWeakNode)]
45pub struct FxBlob {
46 handle: DataObjectHandle<FxVolume>,
47 vmo: zx::Vmo,
48 open_count: AtomicUsize,
49 merkle_root: Hash,
50 merkle_verifier: ReadSizedMerkleVerifier,
51 compression_info: Option<CompressionInfo>,
52 uncompressed_size: u64, pager_packet_receiver_registration: Arc<PagerPacketReceiverRegistration<Self>>,
54 chunks_supplied: AtomicBitVec,
55}
56
57impl FxBlob {
58 pub fn new(
59 handle: DataObjectHandle<FxVolume>,
60 merkle_root: Hash,
61 merkle_verifier: MerkleVerifier,
62 compression_info: Option<CompressionInfo>,
63 uncompressed_size: u64,
64 ) -> Result<Arc<Self>, Error> {
65 let min_chunk_size = min_chunk_size(&compression_info);
66 let merkle_verifier =
67 ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)?;
68 let chunks_supplied = AtomicBitVec::new(uncompressed_size.div_ceil(min_chunk_size));
69
70 Ok(Arc::new_cyclic(|weak| {
71 let (vmo, pager_packet_receiver_registration) = handle
72 .owner()
73 .pager()
74 .create_vmo(weak.clone(), uncompressed_size, zx::VmoOptions::empty())
75 .unwrap();
76 set_vmo_name(&vmo, &merkle_root);
77 Self {
78 handle,
79 vmo,
80 open_count: AtomicUsize::new(0),
81 merkle_root,
82 merkle_verifier,
83 compression_info,
84 uncompressed_size,
85 pager_packet_receiver_registration: Arc::new(pager_packet_receiver_registration),
86 chunks_supplied,
87 }
88 }))
89 }
90
91 pub fn overwrite_me(
93 self: &Arc<Self>,
94 handle: DataObjectHandle<FxVolume>,
95 merkle_verifier: MerkleVerifier,
96 compression_info: Option<CompressionInfo>,
97 ) -> Arc<Self> {
98 let min_chunk_size = min_chunk_size(&compression_info);
99 let merkle_verifier =
100 ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)
101 .expect("The chunk size should have been validated by the delivery blob parser");
102 let chunks_supplied = AtomicBitVec::new(self.uncompressed_size.div_ceil(min_chunk_size));
105 let vmo = self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap();
106
107 let new_blob = Arc::new(Self {
108 handle,
109 vmo,
110 open_count: AtomicUsize::new(0),
111 merkle_root: self.merkle_root,
112 merkle_verifier,
113 compression_info,
114 uncompressed_size: self.uncompressed_size,
115 pager_packet_receiver_registration: self.pager_packet_receiver_registration.clone(),
116 chunks_supplied,
117 });
118
119 self.handle.owner().cache().remove(self.as_ref());
122
123 let receiver_lock =
126 self.pager_packet_receiver_registration.receiver().set_receiver(&new_blob);
127 if receiver_lock.is_strong() {
128 new_blob.open_count_add_one();
133 let old_blob = self.clone();
134 Epoch::global().defer(move || old_blob.open_count_sub_one());
135 }
136 new_blob
137 }
138
139 pub fn root(&self) -> Hash {
140 self.merkle_root
141 }
142
143 fn record_page_fault_metric(&self, range: &Range<u64>) {
144 let chunk_size: u64 = min_chunk_size(&self.compression_info);
145
146 let first_chunk = range.start / chunk_size;
147 let last_chunk = range.end.div_ceil(chunk_size);
149
150 let supplied_count = self.chunks_supplied.test_and_set_range(first_chunk, last_chunk);
151
152 if supplied_count > 0 {
153 self.handle
154 .owner()
155 .blob_resupplied_count()
156 .increment(supplied_count, Ordering::Relaxed);
157 }
158 }
159}
160
161impl Drop for FxBlob {
162 fn drop(&mut self) {
163 let volume = self.handle.owner();
164 volume.cache().remove(self);
165 }
166}
167
168impl OpenedNode<FxBlob> {
169 pub fn create_child_vmo(&self) -> Result<zx::Vmo, Status> {
176 let blob = self.0.as_ref();
177 let child_vmo = blob.vmo.create_child(
178 zx::VmoChildOptions::REFERENCE | zx::VmoChildOptions::NO_WRITE,
179 0,
180 0,
181 )?;
182 if blob.handle.owner().pager().watch_for_zero_children(blob).map_err(map_to_status)? {
183 blob.open_count_add_one();
187 }
188 Ok(child_vmo)
189 }
190}
191
192impl FxNode for FxBlob {
193 fn object_id(&self) -> u64 {
194 self.handle.object_id()
195 }
196
197 fn parent(&self) -> Option<Arc<FxDirectory>> {
198 unreachable!(); }
200
201 fn set_parent(&self, _parent: Arc<FxDirectory>) {
202 }
204
205 fn open_count_add_one(&self) {
206 let old = self.open_count.fetch_add(1, Ordering::Relaxed);
207 assert!(old != PURGED && old != PURGED - 1);
208 }
209
210 fn open_count_sub_one(self: Arc<Self>) {
211 let old = self.open_count.fetch_sub(1, Ordering::Relaxed);
212 assert!(old & !PURGED > 0);
213 if old == PURGED + 1 {
214 let store = self.handle.store();
215 store
216 .filesystem()
217 .graveyard()
218 .queue_tombstone_object(store.store_object_id(), self.object_id());
219 }
220 }
221
222 fn object_descriptor(&self) -> ObjectDescriptor {
223 ObjectDescriptor::File
224 }
225
226 fn terminate(&self) {
227 self.pager_packet_receiver_registration.stop_watching_for_zero_children();
228 }
229
230 fn mark_to_be_purged(self: Arc<Self>) {
231 let old = self.open_count.fetch_or(PURGED, Ordering::Relaxed);
232 assert!(old & PURGED == 0);
233 if old == 0 {
234 let store = self.handle.store();
235 store
236 .filesystem()
237 .graveyard()
238 .queue_tombstone_object(store.store_object_id(), self.object_id());
239 }
240 }
241}
242
243impl PagerBacked for FxBlob {
244 fn pager(&self) -> &crate::pager::Pager {
245 self.handle.owner().pager()
246 }
247
248 fn pager_packet_receiver_registration(&self) -> &PagerPacketReceiverRegistration<Self> {
249 &self.pager_packet_receiver_registration
250 }
251
252 fn vmo(&self) -> &zx::Vmo {
253 &self.vmo
254 }
255
256 fn page_in(self: Arc<Self>, range: PageInRange<Self>) {
257 let read_ahead_size = if let Some(compression_info) = &self.compression_info {
258 read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
259 } else {
260 READ_AHEAD_SIZE
261 };
262 default_page_in(self, range, read_ahead_size)
264 }
265
266 fn mark_dirty(self: Arc<Self>, _range: MarkDirtyRange<Self>) {
267 unreachable!();
268 }
269
270 fn on_zero_children(self: Arc<Self>) {
271 self.open_count_sub_one();
272 }
273
274 fn byte_size(&self) -> u64 {
275 self.uncompressed_size
276 }
277
278 async fn aligned_read(&self, range: Range<u64>) -> Result<buffer::Buffer<'_>, Error> {
279 self.record_page_fault_metric(&range);
280
281 let mut buffer = self.handle.allocate_buffer((range.end - range.start) as usize).await;
282 let read = match &self.compression_info {
283 None => self.handle.read(range.start, buffer.as_mut()).await?,
284 Some(compression_info) => {
285 let compressed_offsets =
286 match compression_info.compressed_range_for_uncompressed_range(&range)? {
287 (start, None) => start..self.handle.get_size(),
288 (start, Some(end)) => start..end.get(),
289 };
290 let bs = self.handle.block_size();
291 let aligned = round_down(compressed_offsets.start, bs)
292 ..round_up(compressed_offsets.end, bs).unwrap();
293 let mut compressed_buf =
294 self.handle.allocate_buffer((aligned.end - aligned.start) as usize).await;
295
296 let mut decompression_errors = 0;
297 let len = (std::cmp::min(range.end, self.uncompressed_size) - range.start) as usize;
298 loop {
299 let (read, _) = try_join!(
300 self.handle.read(aligned.start, compressed_buf.as_mut()),
301 async {
302 buffer
303 .allocator()
304 .buffer_source()
305 .commit_range(buffer.range())
306 .map_err(|e| e.into())
307 }
308 )
309 .with_context(|| {
310 format!(
311 "Failed to read compressed range {:?}, len {}",
312 aligned,
313 self.handle.get_size()
314 )
315 })?;
316 let compressed_buf_range = (compressed_offsets.start - aligned.start) as usize
317 ..(compressed_offsets.end - aligned.start) as usize;
318 ensure!(
319 read >= compressed_buf_range.end - compressed_buf_range.start,
320 anyhow!(FxfsError::Inconsistent).context(format!(
321 "Unexpected EOF, read {}, but expected {}",
322 read,
323 compressed_buf_range.end - compressed_buf_range.start,
324 ))
325 );
326
327 let buf = buffer.as_mut_slice();
328 let decompression_result = {
329 fxfs_trace::duration!("blob-decompress", "len" => len);
330 compression_info.decompress(
331 &compressed_buf.as_slice()[compressed_buf_range],
332 &mut buf[..len],
333 range.start,
334 )
335 };
336 match decompression_result {
337 Ok(()) => break,
338 Err(error) => {
339 record_decompression_error_crash_report(
340 compressed_buf.as_slice(),
341 &range,
342 &compressed_offsets,
343 &self.merkle_root,
344 )
345 .await;
346 decompression_errors += 1;
347 if decompression_errors == 2 {
348 bail!(
349 anyhow!(FxfsError::IntegrityError)
350 .context(format!("Decompression error: {error:?}"))
351 );
352 } else {
353 warn!(error:?; "Decompression error; retrying");
354 }
355 }
356 }
357 } if decompression_errors > 0 {
359 info!("Read succeeded on second attempt");
360 }
361 len
362 }
363 };
364 {
365 fxfs_trace::duration!("blob-verify", "len" => read);
368 self.merkle_verifier.verify(range.start as usize, &buffer.as_slice()[..read])?;
369 }
370 buffer.as_mut_slice()[read..].fill(0);
372 Ok(buffer)
373 }
374}
375
376pub struct CompressionInfo {
377 chunk_size: u64,
378 small_offsets: Box<[u32]>,
381 large_offsets: Box<[u64]>,
382 decompressor: ThreadLocalDecompressor,
383}
384
385impl CompressionInfo {
386 pub fn new(
387 chunk_size: u64,
388 offsets: &[u64],
389 compression_algorithm: CompressionAlgorithm,
390 ) -> Result<Self, Error> {
391 let decompressor = compression_algorithm.thread_local_decompressor();
392 if chunk_size == 0 {
393 return Err(FxfsError::IntegrityError.into());
394 } else if offsets.is_empty() || *offsets.first().unwrap() != 0 {
395 Err(FxfsError::IntegrityError.into())
397 } else if !offsets.array_windows().all(|[a, b]| a < b) {
398 Err(FxfsError::IntegrityError.into())
400 } else if offsets.len() == 1 {
401 Ok(Self {
404 chunk_size,
405 small_offsets: Box::default(),
406 large_offsets: Box::default(),
407 decompressor,
408 })
409 } else if *offsets.last().unwrap() <= u32::MAX as u64 {
410 Ok(Self {
413 chunk_size,
414 small_offsets: offsets[1..].iter().map(|x| *x as u32).collect(),
415 large_offsets: Box::default(),
416 decompressor,
417 })
418 } else {
419 let partition_point = offsets.partition_point(|&x| x <= u32::MAX as u64);
421 Ok(Self {
422 chunk_size,
423 small_offsets: offsets[1..partition_point].iter().map(|x| *x as u32).collect(),
424 large_offsets: offsets[partition_point..].into(),
425 decompressor,
426 })
427 }
428 }
429
430 fn compressed_range_for_uncompressed_range(
431 &self,
432 range: &Range<u64>,
433 ) -> Result<(u64, Option<NonZero<u64>>), Error> {
434 ensure!(range.start.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
435 ensure!(range.start < range.end, FxfsError::Inconsistent);
436
437 let start_chunk_index = (range.start / self.chunk_size) as usize;
438 let start_offset = self
439 .compressed_offset_for_chunk_index(start_chunk_index)
440 .ok_or(FxfsError::OutOfRange)?;
441
442 let end_chunk_index = range.end.div_ceil(self.chunk_size) as usize;
444 let end_offset = match self.compressed_offset_for_chunk_index(end_chunk_index) {
445 None => None,
446 Some(offset) => {
447 ensure!(range.end.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
449 Some(NonZero::new(offset).unwrap())
452 }
453 };
454 Ok((start_offset, end_offset))
455 }
456
457 fn compressed_offset_for_chunk_index(&self, chunk_index: usize) -> Option<u64> {
458 if chunk_index == 0 {
460 Some(0)
461 } else if chunk_index - 1 < self.small_offsets.len() {
462 Some(self.small_offsets[chunk_index - 1] as u64)
463 } else if chunk_index - 1 - self.small_offsets.len() < self.large_offsets.len() {
464 Some(self.large_offsets[chunk_index - 1 - self.small_offsets.len()])
465 } else {
466 None
467 }
468 }
469
470 fn decompress(
476 &self,
477 mut src: &[u8],
478 mut dst: &mut [u8],
479 dst_start_offset: u64,
480 ) -> Result<(), Error> {
481 ensure!(dst_start_offset.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
482
483 let start_chunk_index = (dst_start_offset / self.chunk_size) as usize;
484 let chunk_count = dst.len().div_ceil(self.chunk_size as usize);
485 let mut start_offset = self
486 .compressed_offset_for_chunk_index(start_chunk_index)
487 .ok_or(FxfsError::Inconsistent)?;
488
489 for chunk_index in start_chunk_index..(start_chunk_index + chunk_count) {
491 match self.compressed_offset_for_chunk_index(chunk_index + 1) {
492 Some(end_offset) => {
493 let (to_decompress, src_remaining) = src
494 .split_at_checked((end_offset - start_offset) as usize)
495 .ok_or(FxfsError::Inconsistent)?;
496 let (to_decompress_into, dst_remaining) = dst
497 .split_at_mut_checked(self.chunk_size as usize)
498 .ok_or(FxfsError::Inconsistent)?;
499
500 let decompressed_bytes = self.decompressor.decompress_into(
501 to_decompress,
502 to_decompress_into,
503 chunk_index,
504 )?;
505 ensure!(
506 decompressed_bytes == to_decompress_into.len(),
507 FxfsError::Inconsistent
508 );
509 src = src_remaining;
510 dst = dst_remaining;
511 start_offset = end_offset;
512 }
513 None => {
514 let decompressed_bytes =
515 self.decompressor.decompress_into(src, dst, chunk_index)?;
516 ensure!(decompressed_bytes == dst.len(), FxfsError::Inconsistent);
517 }
518 }
519 }
520
521 Ok(())
522 }
523}
524
525fn set_vmo_name(vmo: &zx::Vmo, merkle_root: &Hash) {
526 let trimmed_merkle = &merkle_root.to_string()[0..BLOB_NAME_HASH_LENGTH];
527 let name = format!("{BLOB_NAME_PREFIX}{trimmed_merkle}");
528 let name = zx::Name::new(&name).unwrap();
529 vmo.set_name(&name).unwrap();
530}
531
532fn min_chunk_size(compression_info: &Option<CompressionInfo>) -> u64 {
533 if let Some(compression_info) = compression_info {
534 read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
535 } else {
536 READ_AHEAD_SIZE
537 }
538}
539
540fn read_ahead_size_for_chunk_size(chunk_size: u64, suggested_read_ahead_size: u64) -> u64 {
541 if chunk_size >= suggested_read_ahead_size {
542 chunk_size
543 } else {
544 round_down(suggested_read_ahead_size, chunk_size)
545 }
546}
547
548async fn record_decompression_error_crash_report(
549 compressed_buf: &[u8],
550 uncompressed_offsets: &Range<u64>,
551 compressed_offsets: &Range<u64>,
552 merkle_root: &Hash,
553) {
554 static DONE_ONCE: AtomicBool = AtomicBool::new(false);
555 if !DONE_ONCE.swap(true, Ordering::Relaxed) {
556 if let Ok(proxy) = connect_to_protocol::<fidl_fuchsia_feedback::CrashReporterMarker>() {
557 let size = compressed_buf.len() as u64;
558 let vmo = zx::Vmo::create(size).unwrap();
559 vmo.write(compressed_buf, 0).unwrap();
560 if let Err(e) = proxy
561 .file_report(CrashReport {
562 program_name: Some("fxfs".to_string()),
563 crash_signature: Some("fuchsia-fxfs-decompression_error".to_string()),
564 is_fatal: Some(false),
565 annotations: Some(vec![
566 Annotation {
567 key: "fxfs.range".to_string(),
568 value: format!("{:?}", uncompressed_offsets),
569 },
570 Annotation {
571 key: "fxfs.compressed_offsets".to_string(),
572 value: format!("{:?}", compressed_offsets),
573 },
574 Annotation {
575 key: "fxfs.merkle_root".to_string(),
576 value: format!("{}", merkle_root),
577 },
578 ]),
579 attachments: Some(vec![Attachment {
580 key: "fxfs_compressed_data".to_string(),
581 value: Buffer { vmo, size },
582 }]),
583 ..Default::default()
584 })
585 .await
586 {
587 error!(e:?; "Failed to file crash report");
588 } else {
589 warn!("Filed crash report for decompression error");
590 }
591 } else {
592 error!("Failed to connect to crash report service");
593 }
594 }
595}
596
597#[cfg(test)]
598mod tests {
599 use super::*;
600 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture};
601 use crate::fuchsia::pager::PageInRange;
602 use crate::fxblob::testing::open_blob_fixture;
603 use assert_matches::assert_matches;
604 use delivery_blob::CompressionMode;
605 use delivery_blob::compression::{ChunkedArchiveOptions, CompressionAlgorithm};
606 use fuchsia_async as fasync;
607 use fuchsia_async::epoch::Epoch;
608 use fxfs_make_blob_image::FxBlobBuilder;
609 use storage_device::DeviceHolder;
610 use storage_device::fake_device::FakeDevice;
611
612 const BLOCK_SIZE: u64 = fuchsia_merkle::BLOCK_SIZE as u64;
613 const CHUNK_SIZE: usize = 32 * 1024;
614
615 #[fasync::run(10, test)]
616 async fn test_empty_blob() {
617 let fixture = new_blob_fixture().await;
618
619 let data = vec![];
620 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
621 assert_eq!(fixture.read_blob(hash).await, data);
622
623 fixture.close().await;
624 }
625
626 #[fasync::run(10, test)]
627 async fn test_large_blob() {
628 let fixture = new_blob_fixture().await;
629
630 let data = vec![3; 3_000_000];
631 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
632
633 assert_eq!(fixture.read_blob(hash).await, data);
634
635 fixture.close().await;
636 }
637
638 #[fasync::run(10, test)]
639 async fn test_large_compressed_blob() {
640 let fixture = new_blob_fixture().await;
641
642 let data = vec![3; 3_000_000];
643 let hash = fixture.write_blob(&data, CompressionMode::Always).await;
644
645 assert_eq!(fixture.read_blob(hash).await, data);
646
647 fixture.close().await;
648 }
649
650 #[fasync::run(10, test)]
651 async fn test_non_page_aligned_blob() {
652 let fixture = new_blob_fixture().await;
653
654 let page_size = zx::system_get_page_size() as usize;
655 let data = vec![0xffu8; page_size - 1];
656 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
657 assert_eq!(fixture.read_blob(hash).await, data);
658
659 {
660 let vmo = fixture.get_blob_vmo(hash).await;
661 let mut buf = vec![0x11u8; page_size];
662 vmo.read(&mut buf[..], 0).expect("vmo read failed");
663 assert_eq!(data, buf[..data.len()]);
664 assert_eq!(buf[data.len()], 0);
666 }
667
668 fixture.close().await;
669 }
670
671 #[fasync::run(10, test)]
672 async fn test_blob_invalid_contents() {
673 let fixture = new_blob_fixture().await;
674
675 let data = vec![0xffu8; (READ_AHEAD_SIZE + BLOCK_SIZE) as usize];
676 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
677 let name = format!("{}", hash);
678
679 {
680 let handle = fixture.get_blob_handle(&name).await;
682 let mut transaction =
683 handle.new_transaction().await.expect("failed to create transaction");
684 let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
685 buf.as_mut_slice().fill(0);
686 handle
687 .txn_write(&mut transaction, READ_AHEAD_SIZE, buf.as_ref())
688 .await
689 .expect("txn_write failed");
690 transaction.commit().await.expect("failed to commit transaction");
691 }
692
693 {
694 let blob_vmo = fixture.get_blob_vmo(hash).await;
695 let mut buf = vec![0; BLOCK_SIZE as usize];
696 assert_matches!(blob_vmo.read(&mut buf[..], 0), Ok(_));
697 assert_matches!(
698 blob_vmo.read(&mut buf[..], READ_AHEAD_SIZE),
699 Err(zx::Status::IO_DATA_INTEGRITY)
700 );
701 }
702
703 fixture.close().await;
704 }
705
706 #[fasync::run(10, test)]
707 async fn test_lz4_blob() {
708 let device = DeviceHolder::new(FakeDevice::new(16384, 512));
709 let blob_data = vec![0xAA; 68 * 1024];
710 let fxblob_builder = FxBlobBuilder::new(device).await.unwrap();
711 let blob = fxblob_builder
712 .generate_blob(blob_data.clone(), Some(CompressionAlgorithm::Lz4))
713 .unwrap();
714 let blob_hash = blob.hash();
715 fxblob_builder.install_blob(&blob).await.unwrap();
716 let device = fxblob_builder.finalize().await.unwrap().0;
717 device.reopen(false);
718 let fixture = open_blob_fixture(device).await;
719
720 assert_eq!(fixture.read_blob(blob_hash).await, blob_data);
721
722 fixture.close().await;
723 }
724
725 #[fasync::run(10, test)]
726 async fn test_blob_vmos_are_immutable() {
727 let fixture = new_blob_fixture().await;
728
729 let data = vec![0xffu8; 500];
730 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
731 let blob_vmo = fixture.get_blob_vmo(hash).await;
732
733 assert_matches!(blob_vmo.set_size(20), Err(_));
735
736 assert_matches!(blob_vmo.write(b"overwrite", 0), Err(_));
738
739 assert_matches!(blob_vmo.set_stream_size(20), Err(_));
741
742 fixture.close().await;
743 }
744
745 const COMPRESSED_BLOB_CHUNK_SIZE: u64 = 32 * 1024;
746 const MAX_SMALL_OFFSET: u64 = u32::MAX as u64;
747 const ZSTD: CompressionAlgorithm = CompressionAlgorithm::Zstd;
748
749 #[fuchsia::test]
750 fn test_compression_info_offsets_must_start_with_zero() {
751 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[], ZSTD).is_err());
752 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[1], ZSTD).is_err());
753 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).is_ok());
754 }
755
756 #[fuchsia::test]
757 fn test_compression_info_offsets_must_be_sorted() {
758 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 2], ZSTD).is_ok());
759 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 2, 1], ZSTD).is_err());
760 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 1], ZSTD).is_err());
761 }
762
763 #[fuchsia::test]
764 fn test_compression_info_splitting_offsets() {
765 let compression_info =
767 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).unwrap();
768 assert!(compression_info.small_offsets.is_empty());
769 assert!(compression_info.large_offsets.is_empty());
770
771 let compression_info =
773 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10], ZSTD).unwrap();
774 assert_eq!(&*compression_info.small_offsets, &[10]);
775 assert!(compression_info.large_offsets.is_empty());
776
777 let compression_info =
779 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10, 20, 30], ZSTD).unwrap();
780 assert_eq!(&*compression_info.small_offsets, &[10, 20, 30]);
781 assert!(compression_info.large_offsets.is_empty());
782
783 let compression_info =
785 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET - 1], ZSTD)
786 .unwrap();
787 assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1]);
788 assert!(compression_info.large_offsets.is_empty());
789
790 let compression_info =
792 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET], ZSTD).unwrap();
793 assert_eq!(&*compression_info.small_offsets, &[u32::MAX]);
794 assert!(compression_info.large_offsets.is_empty());
795
796 let compression_info =
798 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 1], ZSTD)
799 .unwrap();
800 assert!(compression_info.small_offsets.is_empty());
801 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
802
803 let compression_info = CompressionInfo::new(
805 COMPRESSED_BLOB_CHUNK_SIZE,
806 &[0, MAX_SMALL_OFFSET - 1, MAX_SMALL_OFFSET, MAX_SMALL_OFFSET + 1],
807 ZSTD,
808 )
809 .unwrap();
810 assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1, u32::MAX]);
811 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
812
813 let compression_info =
815 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 10], ZSTD)
816 .unwrap();
817 assert!(compression_info.small_offsets.is_empty());
818 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 10]);
819
820 let compression_info = CompressionInfo::new(
822 COMPRESSED_BLOB_CHUNK_SIZE,
823 &[0, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
824 ZSTD,
825 )
826 .unwrap();
827 assert!(compression_info.small_offsets.is_empty());
828 assert_eq!(
829 &*compression_info.large_offsets,
830 &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
831 );
832
833 let compression_info = CompressionInfo::new(
835 COMPRESSED_BLOB_CHUNK_SIZE,
836 &[0, 10, 20, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
837 ZSTD,
838 )
839 .unwrap();
840 assert_eq!(&*compression_info.small_offsets, &[10, 20]);
841 assert_eq!(
842 &*compression_info.large_offsets,
843 &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
844 );
845 }
846
847 #[fuchsia::test]
848 fn test_compression_info_compressed_range_for_uncompressed_range() {
849 fn check_compression_ranges(
850 offsets: &[u64],
851 expected_ranges: &[(u64, Option<u64>)],
852 chunk_size: u64,
853 read_ahead_size: u64,
854 ) {
855 let compression_info = CompressionInfo::new(chunk_size, offsets, ZSTD).unwrap();
856 for (i, range) in expected_ranges.iter().enumerate() {
857 let i = i as u64;
858 let result = compression_info
859 .compressed_range_for_uncompressed_range(
860 &(i * read_ahead_size..(i + 1) * read_ahead_size),
861 )
862 .unwrap();
863 assert_eq!(result, (range.0, range.1.map(|end| NonZero::new(end).unwrap())));
864 }
865 }
866 check_compression_ranges(
867 &[0, 10, 20, 30],
868 &[(0, Some(10)), (10, Some(20)), (20, Some(30)), (30, None)],
869 COMPRESSED_BLOB_CHUNK_SIZE,
870 COMPRESSED_BLOB_CHUNK_SIZE,
871 );
872 check_compression_ranges(
873 &[0, 10, 20, 30],
874 &[(0, Some(20)), (20, None)],
875 COMPRESSED_BLOB_CHUNK_SIZE,
876 COMPRESSED_BLOB_CHUNK_SIZE * 2,
877 );
878 check_compression_ranges(
879 &[0, 10, 20, 30],
880 &[(0, None)],
881 COMPRESSED_BLOB_CHUNK_SIZE,
882 COMPRESSED_BLOB_CHUNK_SIZE * 4,
883 );
884 check_compression_ranges(
885 &[0, 10, 20, 30, MAX_SMALL_OFFSET + 10],
886 &[(0, Some(MAX_SMALL_OFFSET + 10)), (MAX_SMALL_OFFSET + 10, None)],
887 COMPRESSED_BLOB_CHUNK_SIZE,
888 COMPRESSED_BLOB_CHUNK_SIZE * 4,
889 );
890 check_compression_ranges(
891 &[
892 0,
893 10,
894 20,
895 30,
896 MAX_SMALL_OFFSET + 10,
897 MAX_SMALL_OFFSET + 20,
898 MAX_SMALL_OFFSET + 30,
899 MAX_SMALL_OFFSET + 40,
900 MAX_SMALL_OFFSET + 50,
901 ],
902 &[
903 (0, Some(20)),
904 (20, Some(MAX_SMALL_OFFSET + 10)),
905 (MAX_SMALL_OFFSET + 10, Some(MAX_SMALL_OFFSET + 30)),
906 (MAX_SMALL_OFFSET + 30, Some(MAX_SMALL_OFFSET + 50)),
907 ],
908 COMPRESSED_BLOB_CHUNK_SIZE,
909 COMPRESSED_BLOB_CHUNK_SIZE * 2,
910 );
911 }
912
913 #[fuchsia::test]
914 fn test_compression_info_compressed_range_for_uncompressed_range_errors() {
915 let compression_info = CompressionInfo::new(
916 COMPRESSED_BLOB_CHUNK_SIZE,
917 &[
918 0,
919 10,
920 20,
921 30,
922 MAX_SMALL_OFFSET + 10,
923 MAX_SMALL_OFFSET + 20,
924 MAX_SMALL_OFFSET + 30,
925 MAX_SMALL_OFFSET + 40,
926 MAX_SMALL_OFFSET + 50,
927 ],
928 ZSTD,
929 )
930 .unwrap();
931
932 assert!(
934 compression_info
935 .compressed_range_for_uncompressed_range(&(1..COMPRESSED_BLOB_CHUNK_SIZE),)
936 .is_err()
937 );
938
939 assert!(
941 compression_info
942 .compressed_range_for_uncompressed_range(
943 &(COMPRESSED_BLOB_CHUNK_SIZE * 9..COMPRESSED_BLOB_CHUNK_SIZE * 12),
944 )
945 .is_err()
946 );
947
948 assert!(
950 compression_info
951 .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE + 1),)
952 .is_err()
953 );
954 assert!(
955 compression_info
956 .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE - 1),)
957 .is_err()
958 );
959 assert!(
960 compression_info
961 .compressed_range_for_uncompressed_range(
962 &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 + 1),
963 )
964 .is_err()
965 );
966 assert!(
967 compression_info
968 .compressed_range_for_uncompressed_range(
969 &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 - 1),
970 )
971 .is_err()
972 );
973
974 assert!(
976 compression_info
977 .compressed_range_for_uncompressed_range(
978 &(COMPRESSED_BLOB_CHUNK_SIZE * 8..COMPRESSED_BLOB_CHUNK_SIZE * 8 + 4096),
979 )
980 .is_ok()
981 );
982 }
983
984 #[fuchsia::test]
985 fn test_read_ahead_size_for_chunk_size() {
986 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 32 * 1024), 32 * 1024);
987 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 32 * 1024), 48 * 1024);
988 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 32 * 1024), 64 * 1024);
989
990 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 64 * 1024), 64 * 1024);
991 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 64 * 1024), 48 * 1024);
992 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 64 * 1024), 64 * 1024);
993 assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 64 * 1024), 96 * 1024);
994
995 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 128 * 1024), 128 * 1024);
996 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 128 * 1024), 96 * 1024);
997 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 128 * 1024), 128 * 1024);
998 assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 128 * 1024), 96 * 1024);
999 }
1000
1001 fn build_compression_info(size: usize) -> (CompressionInfo, Vec<u8>, Vec<u8>) {
1002 let options =
1003 ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
1004 let mut compressor = options.compressor();
1005 let mut uncompressed_data = Vec::with_capacity(size);
1006 {
1007 let mut run_length = 1;
1008 let mut run_value: u8 = 0;
1009 while uncompressed_data.len() < size {
1010 uncompressed_data
1011 .resize(std::cmp::min(uncompressed_data.len() + run_length, size), run_value);
1012 run_length = (run_length + 1) % 19 + 1;
1013 run_value = (run_value + 1) % 17;
1014 }
1015 }
1016 let mut compressed_offsets = vec![0];
1017 let mut compressed_data = vec![];
1018 for chunk in uncompressed_data.chunks(CHUNK_SIZE) {
1019 let mut compressed_chunk = compressor.compress(chunk, 0).unwrap();
1020 compressed_data.append(&mut compressed_chunk);
1021 compressed_offsets.push(compressed_data.len() as u64);
1022 }
1023 compressed_offsets.pop();
1024 (
1025 CompressionInfo::new(CHUNK_SIZE as u64, &compressed_offsets, CompressionAlgorithm::Lz4)
1026 .unwrap(),
1027 compressed_data,
1028 uncompressed_data,
1029 )
1030 }
1031
1032 #[fuchsia::test]
1033 fn test_compression_info_decompress_single_chunk() {
1034 let (compression_info, compressed_data, uncompressed_data) =
1035 build_compression_info(CHUNK_SIZE);
1036 let mut decompressed_data = vec![0u8; CHUNK_SIZE + 1];
1037
1038 compression_info
1039 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE], 0)
1040 .expect("failed to decompress");
1041 assert_eq!(uncompressed_data, decompressed_data[0..CHUNK_SIZE]);
1042
1043 compression_info
1045 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1046 .expect_err("decompression should fail");
1047
1048 compression_info
1050 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1051 .expect_err("decompression should fail");
1052 }
1053
1054 #[fuchsia::test]
1055 fn test_compression_info_decompress_multiple_chunks() {
1056 fn slice_for_chunks<'a>(
1057 compressed_data: &'a [u8],
1058 compression_info: &CompressionInfo,
1059 chunks: Range<u64>,
1060 ) -> &'a [u8] {
1061 let (start, end) = compression_info
1062 .compressed_range_for_uncompressed_range(
1063 &(chunks.start * CHUNK_SIZE as u64..chunks.end * CHUNK_SIZE as u64),
1064 )
1065 .unwrap();
1066 let end = end.map_or(compressed_data.len() as u64, NonZero::<u64>::get);
1067 &compressed_data[start as usize..end as usize]
1068 }
1069
1070 const BLOB_SIZE: usize = CHUNK_SIZE * 4 + 4096;
1071 let (compression_info, compressed_data, uncompressed_data) =
1072 build_compression_info(BLOB_SIZE);
1073 let mut decompressed_data = vec![0u8; BLOB_SIZE];
1074
1075 compression_info
1077 .decompress(&compressed_data, &mut decompressed_data, 0)
1078 .expect("failed to decompress");
1079 assert_eq!(uncompressed_data, decompressed_data);
1080
1081 compression_info
1083 .decompress(
1084 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1085 &mut decompressed_data[0..CHUNK_SIZE * 4],
1086 0,
1087 )
1088 .expect("failed to decompress");
1089 assert_eq!(&uncompressed_data[0..CHUNK_SIZE], &decompressed_data[0..CHUNK_SIZE]);
1090
1091 compression_info
1093 .decompress(
1094 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1095 &mut decompressed_data[0..CHUNK_SIZE * 4 - 1],
1096 0,
1097 )
1098 .expect_err("decompression should fail");
1099
1100 compression_info
1102 .decompress(
1103 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1104 &mut decompressed_data[0..CHUNK_SIZE * 4 + 1],
1105 0,
1106 )
1107 .expect_err("decompression should fail");
1108
1109 let partial_chunk = slice_for_chunks(&compressed_data, &compression_info, 4..5);
1111 compression_info
1112 .decompress(partial_chunk, &mut decompressed_data[0..4096], CHUNK_SIZE as u64 * 4)
1113 .expect("failed to decompress");
1114 assert_eq!(&uncompressed_data[CHUNK_SIZE * 4..], &decompressed_data[0..4096]);
1115
1116 compression_info
1118 .decompress(partial_chunk, &mut decompressed_data[0..4095], CHUNK_SIZE as u64 * 4)
1119 .expect_err("decompression should fail");
1120
1121 compression_info
1123 .decompress(partial_chunk, &mut decompressed_data[0..4097], CHUNK_SIZE as u64 * 4)
1124 .expect_err("decompression should fail");
1125 }
1126
1127 #[fasync::run(10, test)]
1128 async fn test_refault_metric() {
1129 let fixture = new_blob_fixture().await;
1130 {
1131 let volume = fixture.volume().volume().clone();
1132 const FILE_SIZE: u64 = READ_AHEAD_SIZE * 4 - 4096;
1133 let data = vec![0xffu8; FILE_SIZE as usize];
1134 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
1135
1136 let blob = fixture.get_blob(hash).await.unwrap();
1137 assert_eq!(blob.chunks_supplied.len(), 4);
1138 assert_eq!(&blob.chunks_supplied.get(), &[false, false, false, false]);
1140
1141 blob.vmo.read_to_vec::<u8>(4096, 4096).unwrap();
1142
1143 assert_eq!(&blob.chunks_supplied.get(), &[true, false, false, false]);
1144
1145 blob.vmo.read_to_vec::<u8>(READ_AHEAD_SIZE * 2 + 4096, READ_AHEAD_SIZE).unwrap();
1146 assert_eq!(&blob.chunks_supplied.get(), &[true, false, true, true]);
1147
1148 assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 0);
1150
1151 blob.clone().page_in(PageInRange::new(
1156 FILE_SIZE - READ_AHEAD_SIZE..FILE_SIZE,
1157 blob.clone(),
1158 Epoch::global().guard(),
1159 ));
1160 Epoch::global().barrier().await;
1161
1162 assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 2);
1163 }
1164
1165 fixture.close().await;
1166 }
1167}