1use crate::constants::*;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::errors::map_to_status;
11use crate::fuchsia::node::{FxNode, OpenedNode};
12use crate::fuchsia::pager::{
13 MarkDirtyRange, PageInRange, PagerBacked, PagerPacketReceiverRegistration, default_page_in,
14};
15use crate::fuchsia::volume::{FxVolume, READ_AHEAD_SIZE};
16use crate::fxblob::atomic_vec::AtomicBitVec;
17use anyhow::{Context, Error, anyhow, bail, ensure};
18use delivery_blob::compression::{CompressionAlgorithm, ThreadLocalDecompressor};
19use fidl_fuchsia_feedback::{Annotation, Attachment, CrashReport};
20use fidl_fuchsia_mem::Buffer as MemBuffer;
21use fuchsia_async::epoch::Epoch;
22use fuchsia_component_client::connect_to_protocol;
23use fuchsia_merkle::{Hash, MerkleVerifier, ReadSizedMerkleVerifier};
24use futures::try_join;
25use fxfs::blob_metadata::{BlobFormat, BlobMetadata};
26use fxfs::errors::FxfsError;
27use fxfs::lock_keys;
28use fxfs::log::*;
29use fxfs::object_handle::{ObjectHandle, ReadObjectHandle};
30use fxfs::object_store::transaction::LockKey;
31use fxfs::object_store::{
32 DEFAULT_DATA_ATTRIBUTE_ID, DataObjectHandle, ObjectDescriptor, StoreObjectHandle,
33};
34use fxfs::round::{round_down, round_up};
35use fxfs_macros::ToWeakNode;
36use std::num::NonZero;
37use std::ops::Range;
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
40use storage_device::buffer::{Buffer, BufferFuture, MutableBufferRef};
41use zx::Status;
42
/// Flag kept in the most significant bit of `FxBlob::open_count`.
///
/// `mark_to_be_purged` sets this bit. When the count held in the remaining bits is zero while
/// the bit is set, the blob's object is queued for tombstoning.
const PURGED: usize = 1_usize << (usize::BITS - 1);
47
48#[derive(ToWeakNode)]
50pub struct FxBlob {
51 handle: StoreObjectHandle<FxVolume>,
52 vmo: zx::Vmo,
53 open_count: AtomicUsize,
54 merkle_root: Hash,
55 merkle_verifier: ReadSizedMerkleVerifier,
56 compression_info: Option<CompressionInfo>,
57 uncompressed_size: u64, stored_size: u64,
59 pager_packet_receiver_registration: Arc<PagerPacketReceiverRegistration<Self>>,
60 chunks_supplied: AtomicBitVec,
61}
62
63static_assertions::const_assert!(size_of::<FxBlob>() <= 192);
65
66impl FxBlob {
67 pub async fn new(
68 handle: StoreObjectHandle<FxVolume>,
69 merkle_root: Hash,
70 ) -> Result<Arc<Self>, Error> {
71 let stored_size = handle
72 .store()
73 .get_attribute_size(handle.object_id(), DEFAULT_DATA_ATTRIBUTE_ID)
74 .await?;
75 let metadata = BlobMetadata::read_from(&handle).await?;
76 let (uncompressed_size, compression_info) = match &metadata.format {
77 BlobFormat::Uncompressed => (stored_size, None),
78 BlobFormat::ChunkedZstd { uncompressed_size, chunk_size, compressed_offsets } => (
79 *uncompressed_size,
80 Some(CompressionInfo::new(
81 *chunk_size,
82 compressed_offsets,
83 CompressionAlgorithm::Zstd,
84 )?),
85 ),
86 BlobFormat::ChunkedLz4 { uncompressed_size, chunk_size, compressed_offsets } => (
87 *uncompressed_size,
88 Some(CompressionInfo::new(
89 *chunk_size,
90 compressed_offsets,
91 CompressionAlgorithm::Lz4,
92 )?),
93 ),
94 };
95 let merkle_verifier = metadata.into_merkle_verifier(merkle_root)?;
96
97 let min_chunk_size = min_chunk_size(&compression_info);
98 let merkle_verifier =
99 ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)?;
100 let chunks_supplied = AtomicBitVec::new(uncompressed_size.div_ceil(min_chunk_size));
101
102 Ok(Arc::new_cyclic(|weak| {
103 let (vmo, pager_packet_receiver_registration) = handle
104 .owner()
105 .pager()
106 .create_vmo(weak.clone(), uncompressed_size, zx::VmoOptions::empty())
107 .unwrap();
108 set_vmo_name(&vmo, &merkle_root);
109 Self {
110 handle,
111 vmo,
112 open_count: AtomicUsize::new(0),
113 merkle_root,
114 merkle_verifier,
115 compression_info,
116 uncompressed_size,
117 stored_size,
118 pager_packet_receiver_registration: Arc::new(pager_packet_receiver_registration),
119 chunks_supplied,
120 }
121 }))
122 }
123
124 pub fn overwrite_me(
126 self: &Arc<Self>,
127 handle: DataObjectHandle<FxVolume>,
128 merkle_verifier: MerkleVerifier,
129 compression_info: Option<CompressionInfo>,
130 ) -> Arc<Self> {
131 let min_chunk_size = min_chunk_size(&compression_info);
132 let merkle_verifier =
133 ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)
134 .expect("The chunk size should have been validated by the delivery blob parser");
135 let chunks_supplied = AtomicBitVec::new(self.uncompressed_size.div_ceil(min_chunk_size));
138 let vmo = self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap();
139 let stored_size = handle.get_size();
140
141 let new_blob = Arc::new(Self {
142 handle: handle.into_store_object_handle(),
143 vmo,
144 open_count: AtomicUsize::new(0),
145 merkle_root: self.merkle_root,
146 merkle_verifier,
147 compression_info,
148 uncompressed_size: self.uncompressed_size,
149 stored_size,
150 pager_packet_receiver_registration: self.pager_packet_receiver_registration.clone(),
151 chunks_supplied,
152 });
153
154 self.handle.owner().cache().remove(self.as_ref());
157
158 let receiver_lock =
161 self.pager_packet_receiver_registration.receiver().set_receiver(&new_blob);
162 if receiver_lock.is_strong() {
163 new_blob.open_count_add_one();
168 let old_blob = self.clone();
169 Epoch::global().defer(move || old_blob.open_count_sub_one());
170 }
171 new_blob
172 }
173
174 pub fn root(&self) -> Hash {
175 self.merkle_root
176 }
177
178 fn record_page_fault_metric(&self, range: &Range<u64>) {
179 let chunk_size: u64 = min_chunk_size(&self.compression_info);
180
181 let first_chunk = range.start / chunk_size;
182 let last_chunk = range.end.div_ceil(chunk_size);
184
185 let supplied_count = self.chunks_supplied.test_and_set_range(first_chunk, last_chunk);
186
187 if supplied_count > 0 {
188 self.handle
189 .owner()
190 .blob_resupplied_count()
191 .increment(supplied_count, Ordering::Relaxed);
192 }
193 }
194
195 fn allocate_buffer(&self, size: u64) -> BufferFuture<'_> {
196 self.handle.store().device().allocate_buffer(size as usize)
197 }
198
199 async fn read_blocks(&self, offset: u64, buf: MutableBufferRef<'_>) -> Result<(), Error> {
200 let fs = self.handle.store().filesystem();
201 let guard = fs
202 .lock_manager()
203 .read_lock(lock_keys![LockKey::object_attribute(
204 self.handle.store().store_object_id(),
205 self.handle.object_id(),
206 DEFAULT_DATA_ATTRIBUTE_ID,
207 )])
208 .await;
209 self.handle.read_unchecked(DEFAULT_DATA_ATTRIBUTE_ID, offset, buf, &guard).await
210 }
211}
212
213impl Drop for FxBlob {
214 fn drop(&mut self) {
215 let volume = self.handle.owner();
216 volume.cache().remove(self);
217 }
218}
219
220impl OpenedNode<FxBlob> {
221 pub fn create_child_vmo(&self) -> Result<zx::Vmo, Status> {
228 let blob = self.0.as_ref();
229 let child_vmo = blob.vmo.create_child(
230 zx::VmoChildOptions::REFERENCE | zx::VmoChildOptions::NO_WRITE,
231 0,
232 0,
233 )?;
234 if blob.handle.owner().pager().watch_for_zero_children(blob).map_err(map_to_status)? {
235 blob.open_count_add_one();
239 }
240 Ok(child_vmo)
241 }
242}
243
244impl FxNode for FxBlob {
245 fn object_id(&self) -> u64 {
246 self.handle.object_id()
247 }
248
249 fn parent(&self) -> Option<Arc<FxDirectory>> {
250 unreachable!(); }
252
253 fn set_parent(&self, _parent: Arc<FxDirectory>) {
254 }
256
257 fn open_count_add_one(&self) {
258 let old = self.open_count.fetch_add(1, Ordering::Relaxed);
259 assert!(old != PURGED && old != PURGED - 1);
260 }
261
262 fn open_count_sub_one(self: Arc<Self>) {
263 let old = self.open_count.fetch_sub(1, Ordering::Relaxed);
264 assert!(old & !PURGED > 0);
265 if old == PURGED + 1 {
266 let store = self.handle.store();
267 store
268 .filesystem()
269 .graveyard()
270 .queue_tombstone_object(store.store_object_id(), self.object_id());
271 }
272 }
273
274 fn object_descriptor(&self) -> ObjectDescriptor {
275 ObjectDescriptor::File
276 }
277
278 fn terminate(&self) {
279 self.pager_packet_receiver_registration.stop_watching_for_zero_children();
280 }
281
282 fn mark_to_be_purged(self: Arc<Self>) {
283 let old = self.open_count.fetch_or(PURGED, Ordering::Relaxed);
284 assert!(old & PURGED == 0);
285 if old == 0 {
286 let store = self.handle.store();
287 store
288 .filesystem()
289 .graveyard()
290 .queue_tombstone_object(store.store_object_id(), self.object_id());
291 }
292 }
293}
294
295impl PagerBacked for FxBlob {
296 fn pager(&self) -> &crate::pager::Pager {
297 self.handle.owner().pager()
298 }
299
300 fn pager_packet_receiver_registration(&self) -> &PagerPacketReceiverRegistration<Self> {
301 &self.pager_packet_receiver_registration
302 }
303
304 fn vmo(&self) -> &zx::Vmo {
305 &self.vmo
306 }
307
308 fn page_in(self: Arc<Self>, range: PageInRange<Self>) {
309 let read_ahead_size = if let Some(compression_info) = &self.compression_info {
310 read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
311 } else {
312 READ_AHEAD_SIZE
313 };
314 default_page_in(self, range, read_ahead_size)
316 }
317
318 fn mark_dirty(self: Arc<Self>, _range: MarkDirtyRange<Self>) {
319 unreachable!();
320 }
321
322 fn on_zero_children(self: Arc<Self>) {
323 self.open_count_sub_one();
324 }
325
326 fn byte_size(&self) -> u64 {
327 self.uncompressed_size
328 }
329
330 async fn aligned_read(&self, range: Range<u64>) -> Result<Buffer<'_>, Error> {
331 ensure!(range.start < self.uncompressed_size, FxfsError::InvalidArgs);
334 self.record_page_fault_metric(&range);
335
336 let mut buffer = self.allocate_buffer(range.end - range.start).await;
337 let unaligned_bytes =
338 (std::cmp::min(range.end, self.uncompressed_size) - range.start) as usize;
339 match &self.compression_info {
340 None => self.read_blocks(range.start, buffer.as_mut()).await?,
341 Some(compression_info) => {
342 let compressed_offsets =
343 match compression_info.compressed_range_for_uncompressed_range(&range)? {
344 (start, None) => start..self.stored_size,
345 (start, Some(end)) => start..end.get(),
346 };
347 let bs = self.handle.block_size();
348 let aligned = round_down(compressed_offsets.start, bs)
349 ..round_up(compressed_offsets.end, bs).unwrap();
350 let mut compressed_buf = self.allocate_buffer(aligned.end - aligned.start).await;
351
352 let mut decompression_errors = 0;
353 loop {
354 try_join!(self.read_blocks(aligned.start, compressed_buf.as_mut()), async {
355 buffer
356 .allocator()
357 .buffer_source()
358 .commit_range(buffer.range())
359 .map_err(|e| e.into())
360 })
361 .with_context(|| {
362 format!(
363 "Failed to read compressed range {:?}, len {}",
364 aligned, self.stored_size
365 )
366 })?;
367 let compressed_buf_range = (compressed_offsets.start - aligned.start) as usize
368 ..(compressed_offsets.end - aligned.start) as usize;
369
370 let buf = buffer.as_mut_slice();
371 let decompression_result = {
372 fxfs_trace::duration!("blob-decompress", "len" => unaligned_bytes);
373 compression_info.decompress(
374 &compressed_buf.as_slice()[compressed_buf_range],
375 &mut buf[..unaligned_bytes],
376 range.start,
377 )
378 };
379 match decompression_result {
380 Ok(()) => break,
381 Err(error) => {
382 record_decompression_error_crash_report(
383 compressed_buf.as_slice(),
384 &range,
385 &compressed_offsets,
386 &self.merkle_root,
387 )
388 .await;
389 decompression_errors += 1;
390 if decompression_errors == 2 {
391 bail!(
392 anyhow!(FxfsError::IntegrityError)
393 .context(format!("Decompression error: {error:?}"))
394 );
395 } else {
396 warn!(error:?; "Decompression error; retrying");
397 }
398 }
399 }
400 } if decompression_errors > 0 {
402 info!("Read succeeded on second attempt");
403 }
404 }
405 };
406 {
407 fxfs_trace::duration!("blob-verify", "len" => unaligned_bytes);
410 self.merkle_verifier
411 .verify(range.start as usize, &buffer.as_slice()[..unaligned_bytes])?;
412 }
413 buffer.as_mut_slice()[unaligned_bytes..].fill(0);
415 Ok(buffer)
416 }
417}
418
419pub struct CompressionInfo {
420 chunk_size: u64,
421 small_offsets: Box<[u32]>,
424 large_offsets: Box<[u64]>,
425 decompressor: ThreadLocalDecompressor,
426}
427
428impl CompressionInfo {
429 pub fn new(
430 chunk_size: u64,
431 offsets: &[u64],
432 compression_algorithm: CompressionAlgorithm,
433 ) -> Result<Self, Error> {
434 let decompressor = compression_algorithm.thread_local_decompressor();
435 if chunk_size == 0 {
436 return Err(FxfsError::IntegrityError.into());
437 } else if offsets.is_empty() || *offsets.first().unwrap() != 0 {
438 Err(FxfsError::IntegrityError.into())
440 } else if !offsets.array_windows().all(|[a, b]| a < b) {
441 Err(FxfsError::IntegrityError.into())
443 } else if offsets.len() == 1 {
444 Ok(Self {
447 chunk_size,
448 small_offsets: Box::default(),
449 large_offsets: Box::default(),
450 decompressor,
451 })
452 } else if *offsets.last().unwrap() <= u32::MAX as u64 {
453 Ok(Self {
456 chunk_size,
457 small_offsets: offsets[1..].iter().map(|x| *x as u32).collect(),
458 large_offsets: Box::default(),
459 decompressor,
460 })
461 } else {
462 let partition_point = offsets.partition_point(|&x| x <= u32::MAX as u64);
464 Ok(Self {
465 chunk_size,
466 small_offsets: offsets[1..partition_point].iter().map(|x| *x as u32).collect(),
467 large_offsets: offsets[partition_point..].into(),
468 decompressor,
469 })
470 }
471 }
472
473 fn compressed_range_for_uncompressed_range(
474 &self,
475 range: &Range<u64>,
476 ) -> Result<(u64, Option<NonZero<u64>>), Error> {
477 ensure!(range.start.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
478 ensure!(range.start < range.end, FxfsError::Inconsistent);
479
480 let start_chunk_index = (range.start / self.chunk_size) as usize;
481 let start_offset = self
482 .compressed_offset_for_chunk_index(start_chunk_index)
483 .ok_or(FxfsError::OutOfRange)?;
484
485 let end_chunk_index = range.end.div_ceil(self.chunk_size) as usize;
487 let end_offset = match self.compressed_offset_for_chunk_index(end_chunk_index) {
488 None => None,
489 Some(offset) => {
490 ensure!(range.end.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
492 Some(NonZero::new(offset).unwrap())
495 }
496 };
497 Ok((start_offset, end_offset))
498 }
499
500 fn compressed_offset_for_chunk_index(&self, chunk_index: usize) -> Option<u64> {
501 if chunk_index == 0 {
503 Some(0)
504 } else if chunk_index - 1 < self.small_offsets.len() {
505 Some(self.small_offsets[chunk_index - 1] as u64)
506 } else if chunk_index - 1 - self.small_offsets.len() < self.large_offsets.len() {
507 Some(self.large_offsets[chunk_index - 1 - self.small_offsets.len()])
508 } else {
509 None
510 }
511 }
512
513 fn decompress(
519 &self,
520 mut src: &[u8],
521 mut dst: &mut [u8],
522 dst_start_offset: u64,
523 ) -> Result<(), Error> {
524 ensure!(dst_start_offset.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
525
526 let start_chunk_index = (dst_start_offset / self.chunk_size) as usize;
527 let chunk_count = dst.len().div_ceil(self.chunk_size as usize);
528 let mut start_offset = self
529 .compressed_offset_for_chunk_index(start_chunk_index)
530 .ok_or(FxfsError::Inconsistent)?;
531
532 for chunk_index in start_chunk_index..(start_chunk_index + chunk_count) {
534 match self.compressed_offset_for_chunk_index(chunk_index + 1) {
535 Some(end_offset) => {
536 let (to_decompress, src_remaining) = src
537 .split_at_checked((end_offset - start_offset) as usize)
538 .ok_or(FxfsError::Inconsistent)?;
539 let (to_decompress_into, dst_remaining) = dst
540 .split_at_mut_checked(self.chunk_size as usize)
541 .ok_or(FxfsError::Inconsistent)?;
542
543 let decompressed_bytes = self.decompressor.decompress_into(
544 to_decompress,
545 to_decompress_into,
546 chunk_index,
547 )?;
548 ensure!(
549 decompressed_bytes == to_decompress_into.len(),
550 FxfsError::Inconsistent
551 );
552 src = src_remaining;
553 dst = dst_remaining;
554 start_offset = end_offset;
555 }
556 None => {
557 let decompressed_bytes =
558 self.decompressor.decompress_into(src, dst, chunk_index)?;
559 ensure!(decompressed_bytes == dst.len(), FxfsError::Inconsistent);
560 }
561 }
562 }
563
564 Ok(())
565 }
566}
567
568fn set_vmo_name(vmo: &zx::Vmo, merkle_root: &Hash) {
569 let trimmed_merkle = &merkle_root.to_string()[0..BLOB_NAME_HASH_LENGTH];
570 let name = format!("{BLOB_NAME_PREFIX}{trimmed_merkle}");
571 let name = zx::Name::new(&name).unwrap();
572 vmo.set_name(&name).unwrap();
573}
574
575fn min_chunk_size(compression_info: &Option<CompressionInfo>) -> u64 {
576 if let Some(compression_info) = compression_info {
577 read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
578 } else {
579 READ_AHEAD_SIZE
580 }
581}
582
/// Picks a read-ahead size that is a whole number of compression chunks.
///
/// * If `chunk_size` is at least `suggested_read_ahead_size`, a single chunk is read.
/// * Otherwise the result is the largest multiple of `chunk_size` that does not exceed
///   `suggested_read_ahead_size`.
/// * A `chunk_size` of zero has no meaningful multiple, so the suggestion is returned
///   unchanged instead of dividing by zero.
fn read_ahead_size_for_chunk_size(chunk_size: u64, suggested_read_ahead_size: u64) -> u64 {
    if chunk_size >= suggested_read_ahead_size {
        return chunk_size;
    }
    if chunk_size == 0 {
        return suggested_read_ahead_size;
    }
    // Round the suggestion down to a multiple of the chunk size.
    suggested_read_ahead_size - suggested_read_ahead_size % chunk_size
}
590
591async fn record_decompression_error_crash_report(
592 compressed_buf: &[u8],
593 uncompressed_offsets: &Range<u64>,
594 compressed_offsets: &Range<u64>,
595 merkle_root: &Hash,
596) {
597 static DONE_ONCE: AtomicBool = AtomicBool::new(false);
598 if !DONE_ONCE.swap(true, Ordering::Relaxed) {
599 if let Ok(proxy) = connect_to_protocol::<fidl_fuchsia_feedback::CrashReporterMarker>() {
600 let size = compressed_buf.len() as u64;
601 let vmo = zx::Vmo::create(size).unwrap();
602 vmo.write(compressed_buf, 0).unwrap();
603 if let Err(e) = proxy
604 .file_report(CrashReport {
605 program_name: Some("fxfs".to_string()),
606 crash_signature: Some("fuchsia-fxfs-decompression_error".to_string()),
607 is_fatal: Some(false),
608 annotations: Some(vec![
609 Annotation {
610 key: "fxfs.range".to_string(),
611 value: format!("{:?}", uncompressed_offsets),
612 },
613 Annotation {
614 key: "fxfs.compressed_offsets".to_string(),
615 value: format!("{:?}", compressed_offsets),
616 },
617 Annotation {
618 key: "fxfs.merkle_root".to_string(),
619 value: format!("{}", merkle_root),
620 },
621 ]),
622 attachments: Some(vec![Attachment {
623 key: "fxfs_compressed_data".to_string(),
624 value: MemBuffer { vmo, size },
625 }]),
626 ..Default::default()
627 })
628 .await
629 {
630 error!(e:?; "Failed to file crash report");
631 } else {
632 warn!("Filed crash report for decompression error");
633 }
634 } else {
635 error!("Failed to connect to crash report service");
636 }
637 }
638}
639
640#[cfg(test)]
641mod tests {
642 use super::*;
643 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture};
644 use crate::fuchsia::pager::PageInRange;
645 use crate::fxblob::testing::open_blob_fixture;
646 use assert_matches::assert_matches;
647 use delivery_blob::CompressionMode;
648 use delivery_blob::compression::{ChunkedArchiveOptions, CompressionAlgorithm};
649 use fuchsia_async as fasync;
650 use fuchsia_async::epoch::Epoch;
651 use fxfs_make_blob_image::FxBlobBuilder;
652 use storage_device::DeviceHolder;
653 use storage_device::fake_device::FakeDevice;
654
655 const BLOCK_SIZE: u64 = fuchsia_merkle::BLOCK_SIZE as u64;
656 const CHUNK_SIZE: usize = 32 * 1024;
657
658 #[fasync::run(10, test)]
659 async fn test_empty_blob() {
660 let fixture = new_blob_fixture().await;
661
662 let data = vec![];
663 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
664 assert_eq!(fixture.read_blob(hash).await, data);
665
666 fixture.close().await;
667 }
668
669 #[fasync::run(10, test)]
670 async fn test_large_blob() {
671 let fixture = new_blob_fixture().await;
672
673 let data = vec![3; 3_000_000];
674 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
675
676 assert_eq!(fixture.read_blob(hash).await, data);
677
678 fixture.close().await;
679 }
680
681 #[fasync::run(10, test)]
682 async fn test_large_compressed_blob() {
683 let fixture = new_blob_fixture().await;
684
685 let data = vec![3; 3_000_000];
686 let hash = fixture.write_blob(&data, CompressionMode::Always).await;
687
688 assert_eq!(fixture.read_blob(hash).await, data);
689
690 fixture.close().await;
691 }
692
693 #[fasync::run(10, test)]
694 async fn test_non_page_aligned_blob() {
695 let fixture = new_blob_fixture().await;
696
697 let page_size = zx::system_get_page_size() as usize;
698 let data = vec![0xffu8; page_size - 1];
699 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
700 assert_eq!(fixture.read_blob(hash).await, data);
701
702 {
703 let vmo = fixture.get_blob_vmo(hash).await;
704 let mut buf = vec![0x11u8; page_size];
705 vmo.read(&mut buf[..], 0).expect("vmo read failed");
706 assert_eq!(data, buf[..data.len()]);
707 assert_eq!(buf[data.len()], 0);
709 }
710
711 fixture.close().await;
712 }
713
714 #[fasync::run(10, test)]
715 async fn test_blob_invalid_contents() {
716 let fixture = new_blob_fixture().await;
717
718 let data = vec![0xffu8; (READ_AHEAD_SIZE + BLOCK_SIZE) as usize];
719 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
720 let name = format!("{}", hash);
721
722 {
723 let handle = fixture.get_blob_handle(&name).await;
725 let mut transaction =
726 handle.new_transaction().await.expect("failed to create transaction");
727 let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
728 buf.as_mut_slice().fill(0);
729 handle
730 .txn_write(&mut transaction, READ_AHEAD_SIZE, buf.as_ref())
731 .await
732 .expect("txn_write failed");
733 transaction.commit().await.expect("failed to commit transaction");
734 }
735
736 {
737 let blob_vmo = fixture.get_blob_vmo(hash).await;
738 let mut buf = vec![0; BLOCK_SIZE as usize];
739 assert_matches!(blob_vmo.read(&mut buf[..], 0), Ok(_));
740 assert_matches!(
741 blob_vmo.read(&mut buf[..], READ_AHEAD_SIZE),
742 Err(zx::Status::IO_DATA_INTEGRITY)
743 );
744 }
745
746 fixture.close().await;
747 }
748
749 #[fasync::run(10, test)]
750 async fn test_lz4_blob() {
751 let device = DeviceHolder::new(FakeDevice::new(16384, 512));
752 let blob_data = vec![0xAA; 68 * 1024];
753 let fxblob_builder = FxBlobBuilder::new(device).await.unwrap();
754 let blob = fxblob_builder
755 .generate_blob(blob_data.clone(), Some(CompressionAlgorithm::Lz4))
756 .unwrap();
757 let blob_hash = blob.hash();
758 fxblob_builder.install_blob(&blob).await.unwrap();
759 let device = fxblob_builder.finalize().await.unwrap().0;
760 device.reopen(false);
761 let fixture = open_blob_fixture(device).await;
762
763 assert_eq!(fixture.read_blob(blob_hash).await, blob_data);
764
765 fixture.close().await;
766 }
767
768 #[fasync::run(10, test)]
769 async fn test_blob_vmos_are_immutable() {
770 let fixture = new_blob_fixture().await;
771
772 let data = vec![0xffu8; 500];
773 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
774 let blob_vmo = fixture.get_blob_vmo(hash).await;
775
776 assert_matches!(blob_vmo.set_size(20), Err(_));
778
779 assert_matches!(blob_vmo.write(b"overwrite", 0), Err(_));
781
782 assert_matches!(blob_vmo.set_stream_size(20), Err(_));
784
785 fixture.close().await;
786 }
787
788 const COMPRESSED_BLOB_CHUNK_SIZE: u64 = 32 * 1024;
789 const MAX_SMALL_OFFSET: u64 = u32::MAX as u64;
790 const ZSTD: CompressionAlgorithm = CompressionAlgorithm::Zstd;
791
792 #[fuchsia::test]
793 fn test_compression_info_offsets_must_start_with_zero() {
794 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[], ZSTD).is_err());
795 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[1], ZSTD).is_err());
796 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).is_ok());
797 }
798
799 #[fuchsia::test]
800 fn test_compression_info_offsets_must_be_sorted() {
801 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 2], ZSTD).is_ok());
802 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 2, 1], ZSTD).is_err());
803 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 1], ZSTD).is_err());
804 }
805
806 #[fuchsia::test]
807 fn test_compression_info_splitting_offsets() {
808 let compression_info =
810 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).unwrap();
811 assert!(compression_info.small_offsets.is_empty());
812 assert!(compression_info.large_offsets.is_empty());
813
814 let compression_info =
816 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10], ZSTD).unwrap();
817 assert_eq!(&*compression_info.small_offsets, &[10]);
818 assert!(compression_info.large_offsets.is_empty());
819
820 let compression_info =
822 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10, 20, 30], ZSTD).unwrap();
823 assert_eq!(&*compression_info.small_offsets, &[10, 20, 30]);
824 assert!(compression_info.large_offsets.is_empty());
825
826 let compression_info =
828 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET - 1], ZSTD)
829 .unwrap();
830 assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1]);
831 assert!(compression_info.large_offsets.is_empty());
832
833 let compression_info =
835 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET], ZSTD).unwrap();
836 assert_eq!(&*compression_info.small_offsets, &[u32::MAX]);
837 assert!(compression_info.large_offsets.is_empty());
838
839 let compression_info =
841 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 1], ZSTD)
842 .unwrap();
843 assert!(compression_info.small_offsets.is_empty());
844 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
845
846 let compression_info = CompressionInfo::new(
848 COMPRESSED_BLOB_CHUNK_SIZE,
849 &[0, MAX_SMALL_OFFSET - 1, MAX_SMALL_OFFSET, MAX_SMALL_OFFSET + 1],
850 ZSTD,
851 )
852 .unwrap();
853 assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1, u32::MAX]);
854 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
855
856 let compression_info =
858 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 10], ZSTD)
859 .unwrap();
860 assert!(compression_info.small_offsets.is_empty());
861 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 10]);
862
863 let compression_info = CompressionInfo::new(
865 COMPRESSED_BLOB_CHUNK_SIZE,
866 &[0, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
867 ZSTD,
868 )
869 .unwrap();
870 assert!(compression_info.small_offsets.is_empty());
871 assert_eq!(
872 &*compression_info.large_offsets,
873 &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
874 );
875
876 let compression_info = CompressionInfo::new(
878 COMPRESSED_BLOB_CHUNK_SIZE,
879 &[0, 10, 20, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
880 ZSTD,
881 )
882 .unwrap();
883 assert_eq!(&*compression_info.small_offsets, &[10, 20]);
884 assert_eq!(
885 &*compression_info.large_offsets,
886 &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
887 );
888 }
889
890 #[fuchsia::test]
891 fn test_compression_info_compressed_range_for_uncompressed_range() {
892 fn check_compression_ranges(
893 offsets: &[u64],
894 expected_ranges: &[(u64, Option<u64>)],
895 chunk_size: u64,
896 read_ahead_size: u64,
897 ) {
898 let compression_info = CompressionInfo::new(chunk_size, offsets, ZSTD).unwrap();
899 for (i, range) in expected_ranges.iter().enumerate() {
900 let i = i as u64;
901 let result = compression_info
902 .compressed_range_for_uncompressed_range(
903 &(i * read_ahead_size..(i + 1) * read_ahead_size),
904 )
905 .unwrap();
906 assert_eq!(result, (range.0, range.1.map(|end| NonZero::new(end).unwrap())));
907 }
908 }
909 check_compression_ranges(
910 &[0, 10, 20, 30],
911 &[(0, Some(10)), (10, Some(20)), (20, Some(30)), (30, None)],
912 COMPRESSED_BLOB_CHUNK_SIZE,
913 COMPRESSED_BLOB_CHUNK_SIZE,
914 );
915 check_compression_ranges(
916 &[0, 10, 20, 30],
917 &[(0, Some(20)), (20, None)],
918 COMPRESSED_BLOB_CHUNK_SIZE,
919 COMPRESSED_BLOB_CHUNK_SIZE * 2,
920 );
921 check_compression_ranges(
922 &[0, 10, 20, 30],
923 &[(0, None)],
924 COMPRESSED_BLOB_CHUNK_SIZE,
925 COMPRESSED_BLOB_CHUNK_SIZE * 4,
926 );
927 check_compression_ranges(
928 &[0, 10, 20, 30, MAX_SMALL_OFFSET + 10],
929 &[(0, Some(MAX_SMALL_OFFSET + 10)), (MAX_SMALL_OFFSET + 10, None)],
930 COMPRESSED_BLOB_CHUNK_SIZE,
931 COMPRESSED_BLOB_CHUNK_SIZE * 4,
932 );
933 check_compression_ranges(
934 &[
935 0,
936 10,
937 20,
938 30,
939 MAX_SMALL_OFFSET + 10,
940 MAX_SMALL_OFFSET + 20,
941 MAX_SMALL_OFFSET + 30,
942 MAX_SMALL_OFFSET + 40,
943 MAX_SMALL_OFFSET + 50,
944 ],
945 &[
946 (0, Some(20)),
947 (20, Some(MAX_SMALL_OFFSET + 10)),
948 (MAX_SMALL_OFFSET + 10, Some(MAX_SMALL_OFFSET + 30)),
949 (MAX_SMALL_OFFSET + 30, Some(MAX_SMALL_OFFSET + 50)),
950 ],
951 COMPRESSED_BLOB_CHUNK_SIZE,
952 COMPRESSED_BLOB_CHUNK_SIZE * 2,
953 );
954 }
955
956 #[fuchsia::test]
957 fn test_compression_info_compressed_range_for_uncompressed_range_errors() {
958 let compression_info = CompressionInfo::new(
959 COMPRESSED_BLOB_CHUNK_SIZE,
960 &[
961 0,
962 10,
963 20,
964 30,
965 MAX_SMALL_OFFSET + 10,
966 MAX_SMALL_OFFSET + 20,
967 MAX_SMALL_OFFSET + 30,
968 MAX_SMALL_OFFSET + 40,
969 MAX_SMALL_OFFSET + 50,
970 ],
971 ZSTD,
972 )
973 .unwrap();
974
975 assert!(
977 compression_info
978 .compressed_range_for_uncompressed_range(&(1..COMPRESSED_BLOB_CHUNK_SIZE),)
979 .is_err()
980 );
981
982 assert!(
984 compression_info
985 .compressed_range_for_uncompressed_range(
986 &(COMPRESSED_BLOB_CHUNK_SIZE * 9..COMPRESSED_BLOB_CHUNK_SIZE * 12),
987 )
988 .is_err()
989 );
990
991 assert!(
993 compression_info
994 .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE + 1),)
995 .is_err()
996 );
997 assert!(
998 compression_info
999 .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE - 1),)
1000 .is_err()
1001 );
1002 assert!(
1003 compression_info
1004 .compressed_range_for_uncompressed_range(
1005 &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 + 1),
1006 )
1007 .is_err()
1008 );
1009 assert!(
1010 compression_info
1011 .compressed_range_for_uncompressed_range(
1012 &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 - 1),
1013 )
1014 .is_err()
1015 );
1016
1017 assert!(
1019 compression_info
1020 .compressed_range_for_uncompressed_range(
1021 &(COMPRESSED_BLOB_CHUNK_SIZE * 8..COMPRESSED_BLOB_CHUNK_SIZE * 8 + 4096),
1022 )
1023 .is_ok()
1024 );
1025 }
1026
1027 #[fuchsia::test]
1028 fn test_read_ahead_size_for_chunk_size() {
1029 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 32 * 1024), 32 * 1024);
1030 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 32 * 1024), 48 * 1024);
1031 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 32 * 1024), 64 * 1024);
1032
1033 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 64 * 1024), 64 * 1024);
1034 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 64 * 1024), 48 * 1024);
1035 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 64 * 1024), 64 * 1024);
1036 assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 64 * 1024), 96 * 1024);
1037
1038 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 128 * 1024), 128 * 1024);
1039 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 128 * 1024), 96 * 1024);
1040 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 128 * 1024), 128 * 1024);
1041 assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 128 * 1024), 96 * 1024);
1042 }
1043
1044 fn build_compression_info(size: usize) -> (CompressionInfo, Vec<u8>, Vec<u8>) {
1045 let options =
1046 ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
1047 let mut compressor = options.compressor();
1048 let mut uncompressed_data = Vec::with_capacity(size);
1049 {
1050 let mut run_length = 1;
1051 let mut run_value: u8 = 0;
1052 while uncompressed_data.len() < size {
1053 uncompressed_data
1054 .resize(std::cmp::min(uncompressed_data.len() + run_length, size), run_value);
1055 run_length = (run_length + 1) % 19 + 1;
1056 run_value = (run_value + 1) % 17;
1057 }
1058 }
1059 let mut compressed_offsets = vec![0];
1060 let mut compressed_data = vec![];
1061 for chunk in uncompressed_data.chunks(CHUNK_SIZE) {
1062 let mut compressed_chunk = compressor.compress(chunk, 0).unwrap();
1063 compressed_data.append(&mut compressed_chunk);
1064 compressed_offsets.push(compressed_data.len() as u64);
1065 }
1066 compressed_offsets.pop();
1067 (
1068 CompressionInfo::new(CHUNK_SIZE as u64, &compressed_offsets, CompressionAlgorithm::Lz4)
1069 .unwrap(),
1070 compressed_data,
1071 uncompressed_data,
1072 )
1073 }
1074
1075 #[fuchsia::test]
1076 fn test_compression_info_decompress_single_chunk() {
1077 let (compression_info, compressed_data, uncompressed_data) =
1078 build_compression_info(CHUNK_SIZE);
1079 let mut decompressed_data = vec![0u8; CHUNK_SIZE + 1];
1080
1081 compression_info
1082 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE], 0)
1083 .expect("failed to decompress");
1084 assert_eq!(uncompressed_data, decompressed_data[0..CHUNK_SIZE]);
1085
1086 compression_info
1088 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1089 .expect_err("decompression should fail");
1090
1091 compression_info
1093 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1094 .expect_err("decompression should fail");
1095 }
1096
1097 #[fuchsia::test]
1098 fn test_compression_info_decompress_multiple_chunks() {
1099 fn slice_for_chunks<'a>(
1100 compressed_data: &'a [u8],
1101 compression_info: &CompressionInfo,
1102 chunks: Range<u64>,
1103 ) -> &'a [u8] {
1104 let (start, end) = compression_info
1105 .compressed_range_for_uncompressed_range(
1106 &(chunks.start * CHUNK_SIZE as u64..chunks.end * CHUNK_SIZE as u64),
1107 )
1108 .unwrap();
1109 let end = end.map_or(compressed_data.len() as u64, NonZero::<u64>::get);
1110 &compressed_data[start as usize..end as usize]
1111 }
1112
1113 const BLOB_SIZE: usize = CHUNK_SIZE * 4 + 4096;
1114 let (compression_info, compressed_data, uncompressed_data) =
1115 build_compression_info(BLOB_SIZE);
1116 let mut decompressed_data = vec![0u8; BLOB_SIZE];
1117
1118 compression_info
1120 .decompress(&compressed_data, &mut decompressed_data, 0)
1121 .expect("failed to decompress");
1122 assert_eq!(uncompressed_data, decompressed_data);
1123
1124 compression_info
1126 .decompress(
1127 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1128 &mut decompressed_data[0..CHUNK_SIZE * 4],
1129 0,
1130 )
1131 .expect("failed to decompress");
1132 assert_eq!(&uncompressed_data[0..CHUNK_SIZE], &decompressed_data[0..CHUNK_SIZE]);
1133
1134 compression_info
1136 .decompress(
1137 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1138 &mut decompressed_data[0..CHUNK_SIZE * 4 - 1],
1139 0,
1140 )
1141 .expect_err("decompression should fail");
1142
1143 compression_info
1145 .decompress(
1146 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1147 &mut decompressed_data[0..CHUNK_SIZE * 4 + 1],
1148 0,
1149 )
1150 .expect_err("decompression should fail");
1151
1152 let partial_chunk = slice_for_chunks(&compressed_data, &compression_info, 4..5);
1154 compression_info
1155 .decompress(partial_chunk, &mut decompressed_data[0..4096], CHUNK_SIZE as u64 * 4)
1156 .expect("failed to decompress");
1157 assert_eq!(&uncompressed_data[CHUNK_SIZE * 4..], &decompressed_data[0..4096]);
1158
1159 compression_info
1161 .decompress(partial_chunk, &mut decompressed_data[0..4095], CHUNK_SIZE as u64 * 4)
1162 .expect_err("decompression should fail");
1163
1164 compression_info
1166 .decompress(partial_chunk, &mut decompressed_data[0..4097], CHUNK_SIZE as u64 * 4)
1167 .expect_err("decompression should fail");
1168 }
1169
1170 #[fasync::run(10, test)]
1171 async fn test_refault_metric() {
1172 let fixture = new_blob_fixture().await;
1173 {
1174 let volume = fixture.volume().volume().clone();
1175 const FILE_SIZE: u64 = READ_AHEAD_SIZE * 4 - 4096;
1176 let data = vec![0xffu8; FILE_SIZE as usize];
1177 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
1178
1179 let blob = fixture.get_blob(hash).await.unwrap();
1180 assert_eq!(blob.chunks_supplied.len(), 4);
1181 assert_eq!(&blob.chunks_supplied.get(), &[false, false, false, false]);
1183
1184 blob.vmo.read_to_vec::<u8>(4096, 4096).unwrap();
1185
1186 assert_eq!(&blob.chunks_supplied.get(), &[true, false, false, false]);
1187
1188 blob.vmo.read_to_vec::<u8>(READ_AHEAD_SIZE * 2 + 4096, READ_AHEAD_SIZE).unwrap();
1189 assert_eq!(&blob.chunks_supplied.get(), &[true, false, true, true]);
1190
1191 assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 0);
1193
1194 blob.clone().page_in(PageInRange::new(
1199 FILE_SIZE - READ_AHEAD_SIZE..FILE_SIZE,
1200 blob.clone(),
1201 Epoch::global().guard(),
1202 ));
1203 Epoch::global().barrier().await;
1204
1205 assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 2);
1206 }
1207
1208 fixture.close().await;
1209 }
1210}