1use crate::constants::*;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::errors::map_to_status;
11use crate::fuchsia::node::{FxNode, OpenedNode};
12use crate::fuchsia::pager::{
13 MarkDirtyRange, PageInRange, PagerBacked, PagerPacketReceiverRegistration, default_page_in,
14};
15use crate::fuchsia::volume::{FxVolume, READ_AHEAD_SIZE};
16use crate::fxblob::atomic_vec::AtomicBitVec;
17use anyhow::{Context, Error, anyhow, bail, ensure};
18use delivery_blob::compression::{CompressionAlgorithm, ThreadLocalDecompressor};
19use fidl_fuchsia_feedback::{Annotation, Attachment, CrashReport};
20use fidl_fuchsia_mem::Buffer as MemBuffer;
21use fuchsia_component_client::connect_to_protocol;
22use fuchsia_merkle::{Hash, MerkleVerifier, ReadSizedMerkleVerifier};
23use futures::try_join;
24use fxfs::blob_metadata::{BlobFormat, BlobMetadata};
25use fxfs::errors::FxfsError;
26use fxfs::lock_keys;
27use fxfs::log::*;
28use fxfs::object_handle::{ObjectHandle, ReadObjectHandle};
29use fxfs::object_store::transaction::LockKey;
30use fxfs::object_store::{AttributeId, DataObjectHandle, ObjectDescriptor, StoreObjectHandle};
31use fxfs::round::{round_down, round_up};
32use fxfs_macros::ToWeakNode;
33use std::num::NonZero;
34use std::ops::Range;
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
37use storage_device::buffer::{Buffer, BufferFuture, MutableBufferRef};
38use zx::Status;
39
/// Flag kept in the most significant bit of `FxBlob::open_count`.
///
/// `mark_to_be_purged` sets this bit. Once it is set, the blob is queued for tombstoning as soon
/// as the remaining (low) count bits reach zero; see `open_count_sub_one`.
const PURGED: usize = 1 << (usize::BITS - 1);
44
/// In-memory node for a single blob whose contents are served through a pager-backed VMO.
#[derive(ToWeakNode)]
pub struct FxBlob {
    /// Handle to the stored object; all reads of the blob's data go through it.
    handle: StoreObjectHandle<FxVolume>,
    /// VMO created by the volume's pager, sized to `uncompressed_size`.
    vmo: zx::Vmo,
    /// Open count in the low bits, with `PURGED` as a flag in the top bit.
    open_count: AtomicUsize,
    /// Merkle root the blob's contents are verified against.
    merkle_root: Hash,
    /// Verifier used by `aligned_read` to check every range before it is returned.
    merkle_verifier: ReadSizedMerkleVerifier,
    /// Compressed-chunk layout and decompressor; `None` when the blob is stored uncompressed.
    compression_info: Option<CompressionInfo>,
    /// Size in bytes of the blob's contents once decompressed.
    uncompressed_size: u64,
    /// Size in bytes of the data as stored; used as the end of the compressed data when the
    /// offset table has no entry past the requested range.
    stored_size: u64,
    /// Pager registration for `vmo`; shared with the replacement blob in `overwrite_me`.
    pager_packet_receiver_registration: Arc<PagerPacketReceiverRegistration<Self>>,
    /// One bit per `min_chunk_size`-sized chunk, updated on every `aligned_read`.
    /// NOTE(review): presumably a set bit means the chunk was supplied before — confirm against
    /// `AtomicBitVec::test_and_set_range`.
    chunks_supplied: AtomicBitVec,
}
59
// Compile-time guard: the build fails if `FxBlob` grows beyond 192 bytes.
static_assertions::const_assert!(size_of::<FxBlob>() <= 192);
62
63impl FxBlob {
64 pub async fn new(
65 handle: StoreObjectHandle<FxVolume>,
66 merkle_root: Hash,
67 ) -> Result<Arc<Self>, Error> {
68 let stored_size =
69 handle.store().get_attribute_size(handle.object_id(), AttributeId::DATA).await?;
70 let metadata = BlobMetadata::read_from(&handle).await?;
71 let (uncompressed_size, compression_info) = match &metadata.format {
72 BlobFormat::Uncompressed => (stored_size, None),
73 BlobFormat::ChunkedZstd { uncompressed_size, chunk_size, compressed_offsets } => (
74 *uncompressed_size,
75 Some(CompressionInfo::new(
76 *chunk_size,
77 compressed_offsets,
78 CompressionAlgorithm::Zstd,
79 )?),
80 ),
81 BlobFormat::ChunkedLz4 { uncompressed_size, chunk_size, compressed_offsets } => (
82 *uncompressed_size,
83 Some(CompressionInfo::new(
84 *chunk_size,
85 compressed_offsets,
86 CompressionAlgorithm::Lz4,
87 )?),
88 ),
89 };
90 let merkle_verifier = metadata.into_merkle_verifier(merkle_root)?;
91
92 let min_chunk_size = min_chunk_size(&compression_info);
93 let merkle_verifier =
94 ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)?;
95 let chunks_supplied = AtomicBitVec::new(uncompressed_size.div_ceil(min_chunk_size));
96
97 Ok(Arc::new_cyclic(|weak| {
98 let (vmo, pager_packet_receiver_registration) = handle
99 .owner()
100 .pager()
101 .create_vmo(weak.clone(), uncompressed_size, zx::VmoOptions::empty())
102 .unwrap();
103 set_vmo_name(&vmo, &merkle_root);
104 Self {
105 handle,
106 vmo,
107 open_count: AtomicUsize::new(0),
108 merkle_root,
109 merkle_verifier,
110 compression_info,
111 uncompressed_size,
112 stored_size,
113 pager_packet_receiver_registration: Arc::new(pager_packet_receiver_registration),
114 chunks_supplied,
115 }
116 }))
117 }
118
119 pub fn overwrite_me(
121 self: &Arc<Self>,
122 handle: DataObjectHandle<FxVolume>,
123 merkle_verifier: MerkleVerifier,
124 compression_info: Option<CompressionInfo>,
125 ) -> Arc<Self> {
126 let min_chunk_size = min_chunk_size(&compression_info);
127 let merkle_verifier =
128 ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)
129 .expect("The chunk size should have been validated by the delivery blob parser");
130 let chunks_supplied = AtomicBitVec::new(self.uncompressed_size.div_ceil(min_chunk_size));
133 let vmo = self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap();
134 let stored_size = handle.get_size();
135
136 let new_blob = Arc::new(Self {
137 handle: handle.into_store_object_handle(),
138 vmo,
139 open_count: AtomicUsize::new(0),
140 merkle_root: self.merkle_root,
141 merkle_verifier,
142 compression_info,
143 uncompressed_size: self.uncompressed_size,
144 stored_size,
145 pager_packet_receiver_registration: self.pager_packet_receiver_registration.clone(),
146 chunks_supplied,
147 });
148
149 self.handle.owner().cache().remove(self.as_ref());
152
153 let receiver_lock =
156 self.pager_packet_receiver_registration.receiver().set_receiver(&new_blob);
157 if receiver_lock.is_strong() {
158 new_blob.open_count_add_one();
164 self.clone().open_count_sub_one();
165 }
166 new_blob
167 }
168
169 pub fn root(&self) -> Hash {
170 self.merkle_root
171 }
172
173 fn record_page_fault_metric(&self, range: &Range<u64>) {
174 let chunk_size: u64 = min_chunk_size(&self.compression_info);
175
176 let first_chunk = range.start / chunk_size;
177 let last_chunk = range.end.div_ceil(chunk_size);
179
180 let supplied_count = self.chunks_supplied.test_and_set_range(first_chunk, last_chunk);
181
182 if supplied_count > 0 {
183 self.handle
184 .owner()
185 .blob_resupplied_count()
186 .increment(supplied_count, Ordering::Relaxed);
187 }
188 }
189
190 fn allocate_buffer(&self, size: u64) -> BufferFuture<'_> {
191 self.handle.store().device().allocate_buffer(size as usize)
192 }
193
194 async fn read_blocks(&self, offset: u64, buf: MutableBufferRef<'_>) -> Result<(), Error> {
195 let fs = self.handle.store().filesystem();
196 let guard = fs
197 .lock_manager()
198 .read_lock(lock_keys![LockKey::object_attribute(
199 self.handle.store().store_object_id(),
200 self.handle.object_id(),
201 AttributeId::DATA,
202 )])
203 .await;
204 self.handle.read_unchecked(AttributeId::DATA, offset, buf, &guard).await
205 }
206}
207
208impl Drop for FxBlob {
209 fn drop(&mut self) {
210 let volume = self.handle.owner();
211 volume.cache().remove(self);
212 }
213}
214
215impl OpenedNode<FxBlob> {
216 pub fn create_child_vmo(&self) -> Result<zx::Vmo, Status> {
223 let blob = self.0.as_ref();
224 let child_vmo = blob.vmo.create_child(
225 zx::VmoChildOptions::REFERENCE | zx::VmoChildOptions::NO_WRITE,
226 0,
227 0,
228 )?;
229 if blob.handle.owner().pager().watch_for_zero_children(blob).map_err(map_to_status)? {
230 blob.open_count_add_one();
234 }
235 Ok(child_vmo)
236 }
237}
238
239impl FxNode for FxBlob {
240 fn object_id(&self) -> u64 {
241 self.handle.object_id()
242 }
243
244 fn parent(&self) -> Option<Arc<FxDirectory>> {
245 unreachable!(); }
247
248 fn set_parent(&self, _parent: Arc<FxDirectory>) {
249 }
251
252 fn open_count_add_one(&self) {
253 let old = self.open_count.fetch_add(1, Ordering::Relaxed);
254 assert!(old != PURGED && old != PURGED - 1);
255 }
256
257 fn open_count_sub_one(self: Arc<Self>) {
258 let old = self.open_count.fetch_sub(1, Ordering::Relaxed);
259 assert!(old & !PURGED > 0);
260 if old == PURGED + 1 {
261 let store = self.handle.store();
262 store
263 .filesystem()
264 .graveyard()
265 .queue_tombstone_object(store.store_object_id(), self.object_id());
266 }
267 }
268
269 fn object_descriptor(&self) -> ObjectDescriptor {
270 ObjectDescriptor::File
271 }
272
273 fn terminate(&self) {
274 self.pager_packet_receiver_registration.stop_watching_for_zero_children();
275 }
276
277 fn mark_to_be_purged(self: Arc<Self>) {
278 let old = self.open_count.fetch_or(PURGED, Ordering::Relaxed);
279 assert!(old & PURGED == 0);
280 if old == 0 {
281 let store = self.handle.store();
282 store
283 .filesystem()
284 .graveyard()
285 .queue_tombstone_object(store.store_object_id(), self.object_id());
286 }
287 }
288}
289
290impl PagerBacked for FxBlob {
291 fn try_keep_open(self: Arc<Self>) -> Result<OpenedNode<Self>, Arc<Self>> {
292 let mut old = self.open_count.load(Ordering::Relaxed);
293 loop {
294 if old & !PURGED == 0 {
295 return Err(self);
296 }
297 assert!(old & !PURGED < PURGED - 1);
298 match self.open_count.compare_exchange_weak(
299 old,
300 old + 1,
301 Ordering::Relaxed,
302 Ordering::Relaxed,
303 ) {
304 Ok(_) => return Ok(OpenedNode(self)),
305 Err(new_value) => old = new_value,
306 }
307 }
308 }
309
310 fn pager(&self) -> &crate::pager::Pager {
311 self.handle.owner().pager()
312 }
313
314 fn pager_packet_receiver_registration(&self) -> &PagerPacketReceiverRegistration<Self> {
315 &self.pager_packet_receiver_registration
316 }
317
318 fn vmo(&self) -> &zx::Vmo {
319 &self.vmo
320 }
321
322 fn page_in(self: Arc<Self>, range: PageInRange<Self>) {
323 let read_ahead_size = if let Some(compression_info) = &self.compression_info {
324 read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
325 } else {
326 READ_AHEAD_SIZE
327 };
328 default_page_in(self, range, read_ahead_size)
330 }
331
332 fn mark_dirty(self: Arc<Self>, _range: MarkDirtyRange<Self>) {
333 unreachable!();
334 }
335
336 fn on_zero_children(self: Arc<Self>) {
337 self.open_count_sub_one();
338 }
339
340 fn byte_size(&self) -> u64 {
341 self.uncompressed_size
342 }
343
344 async fn aligned_read(&self, range: Range<u64>) -> Result<Buffer<'_>, Error> {
345 ensure!(range.start < self.uncompressed_size, FxfsError::InvalidArgs);
348 self.record_page_fault_metric(&range);
349
350 let mut buffer = self.allocate_buffer(range.end - range.start).await;
351 let unaligned_bytes =
352 (std::cmp::min(range.end, self.uncompressed_size) - range.start) as usize;
353 match &self.compression_info {
354 None => self.read_blocks(range.start, buffer.as_mut()).await?,
355 Some(compression_info) => {
356 let compressed_offsets =
357 match compression_info.compressed_range_for_uncompressed_range(&range)? {
358 (start, None) => start..self.stored_size,
359 (start, Some(end)) => start..end.get(),
360 };
361 let bs = self.handle.block_size();
362 let aligned = round_down(compressed_offsets.start, bs)
363 ..round_up(compressed_offsets.end, bs).unwrap();
364 let mut compressed_buf = self.allocate_buffer(aligned.end - aligned.start).await;
365
366 let mut decompression_errors = 0;
367 loop {
368 try_join!(self.read_blocks(aligned.start, compressed_buf.as_mut()), async {
369 buffer
370 .allocator()
371 .buffer_source()
372 .commit_range(buffer.range())
373 .map_err(|e| e.into())
374 })
375 .with_context(|| {
376 format!(
377 "Failed to read compressed range {:?}, len {}",
378 aligned, self.stored_size
379 )
380 })?;
381 let compressed_buf_range = (compressed_offsets.start - aligned.start) as usize
382 ..(compressed_offsets.end - aligned.start) as usize;
383
384 let buf = buffer.as_mut_slice();
385 let decompression_result = {
386 fxfs_trace::duration!("blob-decompress", "len" => unaligned_bytes);
387 compression_info.decompress(
388 &compressed_buf.as_slice()[compressed_buf_range],
389 &mut buf[..unaligned_bytes],
390 range.start,
391 )
392 };
393 match decompression_result {
394 Ok(()) => break,
395 Err(error) => {
396 record_decompression_error_crash_report(
397 compressed_buf.as_slice(),
398 &range,
399 &compressed_offsets,
400 &self.merkle_root,
401 )
402 .await;
403 decompression_errors += 1;
404 if decompression_errors == 2 {
405 bail!(
406 anyhow!(FxfsError::IntegrityError)
407 .context(format!("Decompression error: {error:?}"))
408 );
409 } else {
410 warn!(error:?; "Decompression error; retrying");
411 }
412 }
413 }
414 } if decompression_errors > 0 {
416 info!("Read succeeded on second attempt");
417 }
418 }
419 };
420 {
421 fxfs_trace::duration!("blob-verify", "len" => unaligned_bytes);
424 self.merkle_verifier
425 .verify(range.start as usize, &buffer.as_slice()[..unaligned_bytes])?;
426 }
427 buffer.as_mut_slice()[unaligned_bytes..].fill(0);
429 Ok(buffer)
430 }
431}
432
/// Chunk layout and decompressor for a blob stored as independently compressed chunks.
///
/// The compressed offset of chunk 0 is always 0 and is not stored; the remaining offsets are
/// split across `small_offsets` and `large_offsets`.
pub struct CompressionInfo {
    /// Size in bytes of each uncompressed chunk; never zero.
    chunk_size: u64,
    /// Compressed offsets of chunks 1.. that fit in a `u32`, in increasing order.
    small_offsets: Box<[u32]>,
    /// Compressed offsets of the following chunks, all greater than `u32::MAX`.
    large_offsets: Box<[u64]>,
    /// Decompressor obtained from the blob's compression algorithm.
    decompressor: ThreadLocalDecompressor,
}
441
442impl CompressionInfo {
443 pub fn new(
444 chunk_size: u64,
445 offsets: &[u64],
446 compression_algorithm: CompressionAlgorithm,
447 ) -> Result<Self, Error> {
448 let decompressor = compression_algorithm.thread_local_decompressor();
449 if chunk_size == 0 {
450 return Err(FxfsError::IntegrityError.into());
451 } else if offsets.is_empty() || *offsets.first().unwrap() != 0 {
452 Err(FxfsError::IntegrityError.into())
454 } else if !offsets.array_windows().all(|[a, b]| a < b) {
455 Err(FxfsError::IntegrityError.into())
457 } else if offsets.len() == 1 {
458 Ok(Self {
461 chunk_size,
462 small_offsets: Box::default(),
463 large_offsets: Box::default(),
464 decompressor,
465 })
466 } else if *offsets.last().unwrap() <= u32::MAX as u64 {
467 Ok(Self {
470 chunk_size,
471 small_offsets: offsets[1..].iter().map(|x| *x as u32).collect(),
472 large_offsets: Box::default(),
473 decompressor,
474 })
475 } else {
476 let partition_point = offsets.partition_point(|&x| x <= u32::MAX as u64);
478 Ok(Self {
479 chunk_size,
480 small_offsets: offsets[1..partition_point].iter().map(|x| *x as u32).collect(),
481 large_offsets: offsets[partition_point..].into(),
482 decompressor,
483 })
484 }
485 }
486
487 fn compressed_range_for_uncompressed_range(
488 &self,
489 range: &Range<u64>,
490 ) -> Result<(u64, Option<NonZero<u64>>), Error> {
491 ensure!(range.start.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
492 ensure!(range.start < range.end, FxfsError::Inconsistent);
493
494 let start_chunk_index = (range.start / self.chunk_size) as usize;
495 let start_offset = self
496 .compressed_offset_for_chunk_index(start_chunk_index)
497 .ok_or(FxfsError::OutOfRange)?;
498
499 let end_chunk_index = range.end.div_ceil(self.chunk_size) as usize;
501 let end_offset = match self.compressed_offset_for_chunk_index(end_chunk_index) {
502 None => None,
503 Some(offset) => {
504 ensure!(range.end.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
506 Some(NonZero::new(offset).unwrap())
509 }
510 };
511 Ok((start_offset, end_offset))
512 }
513
514 fn compressed_offset_for_chunk_index(&self, chunk_index: usize) -> Option<u64> {
515 if chunk_index == 0 {
517 Some(0)
518 } else if chunk_index - 1 < self.small_offsets.len() {
519 Some(self.small_offsets[chunk_index - 1] as u64)
520 } else if chunk_index - 1 - self.small_offsets.len() < self.large_offsets.len() {
521 Some(self.large_offsets[chunk_index - 1 - self.small_offsets.len()])
522 } else {
523 None
524 }
525 }
526
527 fn decompress(
533 &self,
534 mut src: &[u8],
535 mut dst: &mut [u8],
536 dst_start_offset: u64,
537 ) -> Result<(), Error> {
538 ensure!(dst_start_offset.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
539
540 let start_chunk_index = (dst_start_offset / self.chunk_size) as usize;
541 let chunk_count = dst.len().div_ceil(self.chunk_size as usize);
542 let mut start_offset = self
543 .compressed_offset_for_chunk_index(start_chunk_index)
544 .ok_or(FxfsError::Inconsistent)?;
545
546 for chunk_index in start_chunk_index..(start_chunk_index + chunk_count) {
548 match self.compressed_offset_for_chunk_index(chunk_index + 1) {
549 Some(end_offset) => {
550 let (to_decompress, src_remaining) = src
551 .split_at_checked((end_offset - start_offset) as usize)
552 .ok_or(FxfsError::Inconsistent)?;
553 let (to_decompress_into, dst_remaining) = dst
554 .split_at_mut_checked(self.chunk_size as usize)
555 .ok_or(FxfsError::Inconsistent)?;
556
557 let decompressed_bytes = self.decompressor.decompress_into(
558 to_decompress,
559 to_decompress_into,
560 chunk_index,
561 )?;
562 ensure!(
563 decompressed_bytes == to_decompress_into.len(),
564 FxfsError::Inconsistent
565 );
566 src = src_remaining;
567 dst = dst_remaining;
568 start_offset = end_offset;
569 }
570 None => {
571 let decompressed_bytes =
572 self.decompressor.decompress_into(src, dst, chunk_index)?;
573 ensure!(decompressed_bytes == dst.len(), FxfsError::Inconsistent);
574 }
575 }
576 }
577
578 Ok(())
579 }
580}
581
582fn set_vmo_name(vmo: &zx::Vmo, merkle_root: &Hash) {
583 let trimmed_merkle = &merkle_root.to_string()[0..BLOB_NAME_HASH_LENGTH];
584 let name = format!("{BLOB_NAME_PREFIX}{trimmed_merkle}");
585 let name = zx::Name::new(&name).unwrap();
586 vmo.set_name(&name).unwrap();
587}
588
589fn min_chunk_size(compression_info: &Option<CompressionInfo>) -> u64 {
590 if let Some(compression_info) = compression_info {
591 read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
592 } else {
593 READ_AHEAD_SIZE
594 }
595}
596
597fn read_ahead_size_for_chunk_size(chunk_size: u64, suggested_read_ahead_size: u64) -> u64 {
598 if chunk_size >= suggested_read_ahead_size {
599 chunk_size
600 } else {
601 round_down(suggested_read_ahead_size, chunk_size)
602 }
603}
604
605async fn record_decompression_error_crash_report(
606 compressed_buf: &[u8],
607 uncompressed_offsets: &Range<u64>,
608 compressed_offsets: &Range<u64>,
609 merkle_root: &Hash,
610) {
611 static DONE_ONCE: AtomicBool = AtomicBool::new(false);
612 if !DONE_ONCE.swap(true, Ordering::Relaxed) {
613 if let Ok(proxy) = connect_to_protocol::<fidl_fuchsia_feedback::CrashReporterMarker>() {
614 let size = compressed_buf.len() as u64;
615 let vmo = zx::Vmo::create(size).unwrap();
616 vmo.write(compressed_buf, 0).unwrap();
617 if let Err(e) = proxy
618 .file_report(CrashReport {
619 program_name: Some("fxfs".to_string()),
620 crash_signature: Some("fuchsia-fxfs-decompression_error".to_string()),
621 is_fatal: Some(false),
622 annotations: Some(vec![
623 Annotation {
624 key: "fxfs.range".to_string(),
625 value: format!("{:?}", uncompressed_offsets),
626 },
627 Annotation {
628 key: "fxfs.compressed_offsets".to_string(),
629 value: format!("{:?}", compressed_offsets),
630 },
631 Annotation {
632 key: "fxfs.merkle_root".to_string(),
633 value: format!("{}", merkle_root),
634 },
635 ]),
636 attachments: Some(vec![Attachment {
637 key: "fxfs_compressed_data".to_string(),
638 value: MemBuffer { vmo, size },
639 }]),
640 ..Default::default()
641 })
642 .await
643 {
644 error!(e:?; "Failed to file crash report");
645 } else {
646 warn!("Filed crash report for decompression error");
647 }
648 } else {
649 error!("Failed to connect to crash report service");
650 }
651 }
652}
653
654#[cfg(test)]
655mod tests {
656 use super::*;
657 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture};
658 use crate::fuchsia::pager::PageInRange;
659 use crate::fxblob::testing::open_blob_fixture;
660 use assert_matches::assert_matches;
661 use delivery_blob::CompressionMode;
662 use delivery_blob::compression::{ChunkedArchiveOptions, CompressionAlgorithm};
663 use fuchsia_async as fasync;
664 use fuchsia_async::epoch::Epoch;
665 use fxfs_make_blob_image::FxBlobBuilder;
666 use storage_device::DeviceHolder;
667 use storage_device::fake_device::FakeDevice;
668
    // `fuchsia_merkle::BLOCK_SIZE` as a `u64`, for offset arithmetic in the tests below.
    const BLOCK_SIZE: u64 = fuchsia_merkle::BLOCK_SIZE as u64;
    // Uncompressed chunk size used when building compressed test data by hand.
    const CHUNK_SIZE: usize = 32 * 1024;
671
672 #[fasync::run(10, test)]
673 async fn test_empty_blob() {
674 let fixture = new_blob_fixture().await;
675
676 let data = vec![];
677 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
678 assert_eq!(fixture.read_blob(hash).await, data);
679
680 fixture.close().await;
681 }
682
683 #[fasync::run(10, test)]
684 async fn test_large_blob() {
685 let fixture = new_blob_fixture().await;
686
687 let data = vec![3; 3_000_000];
688 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
689
690 assert_eq!(fixture.read_blob(hash).await, data);
691
692 fixture.close().await;
693 }
694
695 #[fasync::run(10, test)]
696 async fn test_large_compressed_blob() {
697 let fixture = new_blob_fixture().await;
698
699 let data = vec![3; 3_000_000];
700 let hash = fixture.write_blob(&data, CompressionMode::Always).await;
701
702 assert_eq!(fixture.read_blob(hash).await, data);
703
704 fixture.close().await;
705 }
706
707 #[fasync::run(10, test)]
708 async fn test_non_page_aligned_blob() {
709 let fixture = new_blob_fixture().await;
710
711 let page_size = zx::system_get_page_size() as usize;
712 let data = vec![0xffu8; page_size - 1];
713 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
714 assert_eq!(fixture.read_blob(hash).await, data);
715
716 {
717 let vmo = fixture.get_blob_vmo(hash).await;
718 let mut buf = vec![0x11u8; page_size];
719 vmo.read(&mut buf[..], 0).expect("vmo read failed");
720 assert_eq!(data, buf[..data.len()]);
721 assert_eq!(buf[data.len()], 0);
723 }
724
725 fixture.close().await;
726 }
727
728 #[fasync::run(10, test)]
729 async fn test_blob_invalid_contents() {
730 let fixture = new_blob_fixture().await;
731
732 let data = vec![0xffu8; (READ_AHEAD_SIZE + BLOCK_SIZE) as usize];
733 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
734 let name = format!("{}", hash);
735
736 {
737 let handle = fixture.get_blob_handle(&name).await;
739 let mut transaction =
740 handle.new_transaction().await.expect("failed to create transaction");
741 let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
742 buf.as_mut_slice().fill(0);
743 handle
744 .txn_write(&mut transaction, READ_AHEAD_SIZE, buf.as_ref())
745 .await
746 .expect("txn_write failed");
747 transaction.commit().await.expect("failed to commit transaction");
748 }
749
750 {
751 let blob_vmo = fixture.get_blob_vmo(hash).await;
752 let mut buf = vec![0; BLOCK_SIZE as usize];
753 assert_matches!(blob_vmo.read(&mut buf[..], 0), Ok(_));
754 assert_matches!(
755 blob_vmo.read(&mut buf[..], READ_AHEAD_SIZE),
756 Err(zx::Status::IO_DATA_INTEGRITY)
757 );
758 }
759
760 fixture.close().await;
761 }
762
763 #[fasync::run(10, test)]
764 async fn test_lz4_blob() {
765 let device = DeviceHolder::new(FakeDevice::new(16384, 512));
766 let blob_data = vec![0xAA; 68 * 1024];
767 let fxblob_builder = FxBlobBuilder::new(device).await.unwrap();
768 let blob = fxblob_builder
769 .generate_blob(blob_data.clone(), Some(CompressionAlgorithm::Lz4))
770 .unwrap();
771 let blob_hash = blob.hash();
772 fxblob_builder.install_blob(&blob).await.unwrap();
773 let device = fxblob_builder.finalize().await.unwrap().0;
774 device.reopen(false);
775 let fixture = open_blob_fixture(device).await;
776
777 assert_eq!(fixture.read_blob(blob_hash).await, blob_data);
778
779 fixture.close().await;
780 }
781
782 #[fasync::run(10, test)]
783 async fn test_blob_vmos_are_immutable() {
784 let fixture = new_blob_fixture().await;
785
786 let data = vec![0xffu8; 500];
787 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
788 let blob_vmo = fixture.get_blob_vmo(hash).await;
789
790 assert_matches!(blob_vmo.set_size(20), Err(_));
792
793 assert_matches!(blob_vmo.write(b"overwrite", 0), Err(_));
795
796 assert_matches!(blob_vmo.set_stream_size(20), Err(_));
798
799 fixture.close().await;
800 }
801
    // Chunk size passed to `CompressionInfo::new` in the offset-table tests.
    const COMPRESSED_BLOB_CHUNK_SIZE: u64 = 32 * 1024;
    // Largest compressed offset that still fits in a `u32`.
    const MAX_SMALL_OFFSET: u64 = u32::MAX as u64;
    // Algorithm used where the tests only exercise the offset table.
    const ZSTD: CompressionAlgorithm = CompressionAlgorithm::Zstd;
805
806 #[fuchsia::test]
807 fn test_compression_info_offsets_must_start_with_zero() {
808 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[], ZSTD).is_err());
809 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[1], ZSTD).is_err());
810 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).is_ok());
811 }
812
813 #[fuchsia::test]
814 fn test_compression_info_offsets_must_be_sorted() {
815 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 2], ZSTD).is_ok());
816 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 2, 1], ZSTD).is_err());
817 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 1], ZSTD).is_err());
818 }
819
820 #[fuchsia::test]
821 fn test_compression_info_splitting_offsets() {
822 let compression_info =
824 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).unwrap();
825 assert!(compression_info.small_offsets.is_empty());
826 assert!(compression_info.large_offsets.is_empty());
827
828 let compression_info =
830 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10], ZSTD).unwrap();
831 assert_eq!(&*compression_info.small_offsets, &[10]);
832 assert!(compression_info.large_offsets.is_empty());
833
834 let compression_info =
836 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10, 20, 30], ZSTD).unwrap();
837 assert_eq!(&*compression_info.small_offsets, &[10, 20, 30]);
838 assert!(compression_info.large_offsets.is_empty());
839
840 let compression_info =
842 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET - 1], ZSTD)
843 .unwrap();
844 assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1]);
845 assert!(compression_info.large_offsets.is_empty());
846
847 let compression_info =
849 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET], ZSTD).unwrap();
850 assert_eq!(&*compression_info.small_offsets, &[u32::MAX]);
851 assert!(compression_info.large_offsets.is_empty());
852
853 let compression_info =
855 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 1], ZSTD)
856 .unwrap();
857 assert!(compression_info.small_offsets.is_empty());
858 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
859
860 let compression_info = CompressionInfo::new(
862 COMPRESSED_BLOB_CHUNK_SIZE,
863 &[0, MAX_SMALL_OFFSET - 1, MAX_SMALL_OFFSET, MAX_SMALL_OFFSET + 1],
864 ZSTD,
865 )
866 .unwrap();
867 assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1, u32::MAX]);
868 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
869
870 let compression_info =
872 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 10], ZSTD)
873 .unwrap();
874 assert!(compression_info.small_offsets.is_empty());
875 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 10]);
876
877 let compression_info = CompressionInfo::new(
879 COMPRESSED_BLOB_CHUNK_SIZE,
880 &[0, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
881 ZSTD,
882 )
883 .unwrap();
884 assert!(compression_info.small_offsets.is_empty());
885 assert_eq!(
886 &*compression_info.large_offsets,
887 &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
888 );
889
890 let compression_info = CompressionInfo::new(
892 COMPRESSED_BLOB_CHUNK_SIZE,
893 &[0, 10, 20, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
894 ZSTD,
895 )
896 .unwrap();
897 assert_eq!(&*compression_info.small_offsets, &[10, 20]);
898 assert_eq!(
899 &*compression_info.large_offsets,
900 &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
901 );
902 }
903
904 #[fuchsia::test]
905 fn test_compression_info_compressed_range_for_uncompressed_range() {
906 fn check_compression_ranges(
907 offsets: &[u64],
908 expected_ranges: &[(u64, Option<u64>)],
909 chunk_size: u64,
910 read_ahead_size: u64,
911 ) {
912 let compression_info = CompressionInfo::new(chunk_size, offsets, ZSTD).unwrap();
913 for (i, range) in expected_ranges.iter().enumerate() {
914 let i = i as u64;
915 let result = compression_info
916 .compressed_range_for_uncompressed_range(
917 &(i * read_ahead_size..(i + 1) * read_ahead_size),
918 )
919 .unwrap();
920 assert_eq!(result, (range.0, range.1.map(|end| NonZero::new(end).unwrap())));
921 }
922 }
923 check_compression_ranges(
924 &[0, 10, 20, 30],
925 &[(0, Some(10)), (10, Some(20)), (20, Some(30)), (30, None)],
926 COMPRESSED_BLOB_CHUNK_SIZE,
927 COMPRESSED_BLOB_CHUNK_SIZE,
928 );
929 check_compression_ranges(
930 &[0, 10, 20, 30],
931 &[(0, Some(20)), (20, None)],
932 COMPRESSED_BLOB_CHUNK_SIZE,
933 COMPRESSED_BLOB_CHUNK_SIZE * 2,
934 );
935 check_compression_ranges(
936 &[0, 10, 20, 30],
937 &[(0, None)],
938 COMPRESSED_BLOB_CHUNK_SIZE,
939 COMPRESSED_BLOB_CHUNK_SIZE * 4,
940 );
941 check_compression_ranges(
942 &[0, 10, 20, 30, MAX_SMALL_OFFSET + 10],
943 &[(0, Some(MAX_SMALL_OFFSET + 10)), (MAX_SMALL_OFFSET + 10, None)],
944 COMPRESSED_BLOB_CHUNK_SIZE,
945 COMPRESSED_BLOB_CHUNK_SIZE * 4,
946 );
947 check_compression_ranges(
948 &[
949 0,
950 10,
951 20,
952 30,
953 MAX_SMALL_OFFSET + 10,
954 MAX_SMALL_OFFSET + 20,
955 MAX_SMALL_OFFSET + 30,
956 MAX_SMALL_OFFSET + 40,
957 MAX_SMALL_OFFSET + 50,
958 ],
959 &[
960 (0, Some(20)),
961 (20, Some(MAX_SMALL_OFFSET + 10)),
962 (MAX_SMALL_OFFSET + 10, Some(MAX_SMALL_OFFSET + 30)),
963 (MAX_SMALL_OFFSET + 30, Some(MAX_SMALL_OFFSET + 50)),
964 ],
965 COMPRESSED_BLOB_CHUNK_SIZE,
966 COMPRESSED_BLOB_CHUNK_SIZE * 2,
967 );
968 }
969
970 #[fuchsia::test]
971 fn test_compression_info_compressed_range_for_uncompressed_range_errors() {
972 let compression_info = CompressionInfo::new(
973 COMPRESSED_BLOB_CHUNK_SIZE,
974 &[
975 0,
976 10,
977 20,
978 30,
979 MAX_SMALL_OFFSET + 10,
980 MAX_SMALL_OFFSET + 20,
981 MAX_SMALL_OFFSET + 30,
982 MAX_SMALL_OFFSET + 40,
983 MAX_SMALL_OFFSET + 50,
984 ],
985 ZSTD,
986 )
987 .unwrap();
988
989 assert!(
991 compression_info
992 .compressed_range_for_uncompressed_range(&(1..COMPRESSED_BLOB_CHUNK_SIZE),)
993 .is_err()
994 );
995
996 assert!(
998 compression_info
999 .compressed_range_for_uncompressed_range(
1000 &(COMPRESSED_BLOB_CHUNK_SIZE * 9..COMPRESSED_BLOB_CHUNK_SIZE * 12),
1001 )
1002 .is_err()
1003 );
1004
1005 assert!(
1007 compression_info
1008 .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE + 1),)
1009 .is_err()
1010 );
1011 assert!(
1012 compression_info
1013 .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE - 1),)
1014 .is_err()
1015 );
1016 assert!(
1017 compression_info
1018 .compressed_range_for_uncompressed_range(
1019 &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 + 1),
1020 )
1021 .is_err()
1022 );
1023 assert!(
1024 compression_info
1025 .compressed_range_for_uncompressed_range(
1026 &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 - 1),
1027 )
1028 .is_err()
1029 );
1030
1031 assert!(
1033 compression_info
1034 .compressed_range_for_uncompressed_range(
1035 &(COMPRESSED_BLOB_CHUNK_SIZE * 8..COMPRESSED_BLOB_CHUNK_SIZE * 8 + 4096),
1036 )
1037 .is_ok()
1038 );
1039 }
1040
1041 #[fuchsia::test]
1042 fn test_read_ahead_size_for_chunk_size() {
1043 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 32 * 1024), 32 * 1024);
1044 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 32 * 1024), 48 * 1024);
1045 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 32 * 1024), 64 * 1024);
1046
1047 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 64 * 1024), 64 * 1024);
1048 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 64 * 1024), 48 * 1024);
1049 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 64 * 1024), 64 * 1024);
1050 assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 64 * 1024), 96 * 1024);
1051
1052 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 128 * 1024), 128 * 1024);
1053 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 128 * 1024), 96 * 1024);
1054 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 128 * 1024), 128 * 1024);
1055 assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 128 * 1024), 96 * 1024);
1056 }
1057
1058 fn build_compression_info(size: usize) -> (CompressionInfo, Vec<u8>, Vec<u8>) {
1059 let options =
1060 ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
1061 let mut compressor = options.compressor();
1062 let mut uncompressed_data = Vec::with_capacity(size);
1063 {
1064 let mut run_length = 1;
1065 let mut run_value: u8 = 0;
1066 while uncompressed_data.len() < size {
1067 uncompressed_data
1068 .resize(std::cmp::min(uncompressed_data.len() + run_length, size), run_value);
1069 run_length = (run_length + 1) % 19 + 1;
1070 run_value = (run_value + 1) % 17;
1071 }
1072 }
1073 let mut compressed_offsets = vec![0];
1074 let mut compressed_data = vec![];
1075 for chunk in uncompressed_data.chunks(CHUNK_SIZE) {
1076 let mut compressed_chunk = compressor.compress(chunk, 0).unwrap();
1077 compressed_data.append(&mut compressed_chunk);
1078 compressed_offsets.push(compressed_data.len() as u64);
1079 }
1080 compressed_offsets.pop();
1081 (
1082 CompressionInfo::new(CHUNK_SIZE as u64, &compressed_offsets, CompressionAlgorithm::Lz4)
1083 .unwrap(),
1084 compressed_data,
1085 uncompressed_data,
1086 )
1087 }
1088
1089 #[fuchsia::test]
1090 fn test_compression_info_decompress_single_chunk() {
1091 let (compression_info, compressed_data, uncompressed_data) =
1092 build_compression_info(CHUNK_SIZE);
1093 let mut decompressed_data = vec![0u8; CHUNK_SIZE + 1];
1094
1095 compression_info
1096 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE], 0)
1097 .expect("failed to decompress");
1098 assert_eq!(uncompressed_data, decompressed_data[0..CHUNK_SIZE]);
1099
1100 compression_info
1102 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1103 .expect_err("decompression should fail");
1104
1105 compression_info
1107 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1108 .expect_err("decompression should fail");
1109 }
1110
1111 #[fuchsia::test]
1112 fn test_compression_info_decompress_multiple_chunks() {
1113 fn slice_for_chunks<'a>(
1114 compressed_data: &'a [u8],
1115 compression_info: &CompressionInfo,
1116 chunks: Range<u64>,
1117 ) -> &'a [u8] {
1118 let (start, end) = compression_info
1119 .compressed_range_for_uncompressed_range(
1120 &(chunks.start * CHUNK_SIZE as u64..chunks.end * CHUNK_SIZE as u64),
1121 )
1122 .unwrap();
1123 let end = end.map_or(compressed_data.len() as u64, NonZero::<u64>::get);
1124 &compressed_data[start as usize..end as usize]
1125 }
1126
1127 const BLOB_SIZE: usize = CHUNK_SIZE * 4 + 4096;
1128 let (compression_info, compressed_data, uncompressed_data) =
1129 build_compression_info(BLOB_SIZE);
1130 let mut decompressed_data = vec![0u8; BLOB_SIZE];
1131
1132 compression_info
1134 .decompress(&compressed_data, &mut decompressed_data, 0)
1135 .expect("failed to decompress");
1136 assert_eq!(uncompressed_data, decompressed_data);
1137
1138 compression_info
1140 .decompress(
1141 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1142 &mut decompressed_data[0..CHUNK_SIZE * 4],
1143 0,
1144 )
1145 .expect("failed to decompress");
1146 assert_eq!(&uncompressed_data[0..CHUNK_SIZE], &decompressed_data[0..CHUNK_SIZE]);
1147
1148 compression_info
1150 .decompress(
1151 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1152 &mut decompressed_data[0..CHUNK_SIZE * 4 - 1],
1153 0,
1154 )
1155 .expect_err("decompression should fail");
1156
1157 compression_info
1159 .decompress(
1160 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1161 &mut decompressed_data[0..CHUNK_SIZE * 4 + 1],
1162 0,
1163 )
1164 .expect_err("decompression should fail");
1165
1166 let partial_chunk = slice_for_chunks(&compressed_data, &compression_info, 4..5);
1168 compression_info
1169 .decompress(partial_chunk, &mut decompressed_data[0..4096], CHUNK_SIZE as u64 * 4)
1170 .expect("failed to decompress");
1171 assert_eq!(&uncompressed_data[CHUNK_SIZE * 4..], &decompressed_data[0..4096]);
1172
1173 compression_info
1175 .decompress(partial_chunk, &mut decompressed_data[0..4095], CHUNK_SIZE as u64 * 4)
1176 .expect_err("decompression should fail");
1177
1178 compression_info
1180 .decompress(partial_chunk, &mut decompressed_data[0..4097], CHUNK_SIZE as u64 * 4)
1181 .expect_err("decompression should fail");
1182 }
1183
1184 #[fasync::run(10, test)]
1185 async fn test_refault_metric() {
1186 let fixture = new_blob_fixture().await;
1187 {
1188 let volume = fixture.volume().volume().clone();
1189 const FILE_SIZE: u64 = READ_AHEAD_SIZE * 4 - 4096;
1190 let data = vec![0xffu8; FILE_SIZE as usize];
1191 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
1192
1193 let blob = fixture.get_opened_blob(hash).await.unwrap();
1194 assert_eq!(blob.chunks_supplied.len(), 4);
1195 assert_eq!(&blob.chunks_supplied.get(), &[false, false, false, false]);
1197
1198 blob.vmo.read_to_vec::<u8>(4096, 4096).unwrap();
1199
1200 assert_eq!(&blob.chunks_supplied.get(), &[true, false, false, false]);
1201
1202 blob.vmo.read_to_vec::<u8>(READ_AHEAD_SIZE * 2 + 4096, READ_AHEAD_SIZE).unwrap();
1203 assert_eq!(&blob.chunks_supplied.get(), &[true, false, true, true]);
1204
1205 assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 0);
1207
1208 blob.clone().page_in(PageInRange::new(
1213 FILE_SIZE - READ_AHEAD_SIZE..FILE_SIZE,
1214 blob.dup(),
1215 Epoch::global().guard(),
1216 ));
1217 Epoch::global().barrier().await;
1218
1219 assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 2);
1220 }
1221
1222 fixture.close().await;
1223 }
1224}