1use crate::constants::*;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::errors::map_to_status;
11use crate::fuchsia::node::{FxNode, OpenedNode};
12use crate::fuchsia::pager::{
13 MarkDirtyRange, PageInRange, PagerBacked, PagerPacketReceiverRegistration, default_page_in,
14};
15use crate::fuchsia::volume::{BASE_READ_AHEAD_SIZE, FxVolume};
16use crate::fxblob::atomic_vec::AtomicBitVec;
17use anyhow::{Context, Error, anyhow, bail, ensure};
18use delivery_blob::compression::{CompressionAlgorithm, ThreadLocalDecompressor};
19use fidl_fuchsia_feedback::{Annotation, Attachment, CrashReport};
20use fidl_fuchsia_mem::Buffer;
21use fuchsia_async::epoch::Epoch;
22use fuchsia_component_client::connect_to_protocol;
23use fuchsia_merkle::{Hash, MerkleVerifier, ReadSizedMerkleVerifier};
24use futures::try_join;
25use fxfs::errors::FxfsError;
26use fxfs::log::*;
27use fxfs::object_handle::{ObjectHandle, ReadObjectHandle};
28use fxfs::object_store::{DataObjectHandle, ObjectDescriptor};
29use fxfs::round::{round_down, round_up};
30use fxfs_macros::ToWeakNode;
31use std::num::NonZero;
32use std::ops::Range;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
35use storage_device::buffer;
36use zx::{HandleBased, Status};
37
38pub const BLOCK_SIZE: u64 = fuchsia_merkle::BLOCK_SIZE as u64;
39
40const PURGED: usize = 1 << (usize::BITS - 1);
44
45#[derive(ToWeakNode)]
47pub struct FxBlob {
48 handle: DataObjectHandle<FxVolume>,
49 vmo: zx::Vmo,
50 open_count: AtomicUsize,
51 merkle_root: Hash,
52 merkle_verifier: ReadSizedMerkleVerifier,
53 compression_info: Option<CompressionInfo>,
54 uncompressed_size: u64, pager_packet_receiver_registration: Arc<PagerPacketReceiverRegistration<Self>>,
56 chunks_supplied: AtomicBitVec,
57}
58
59impl FxBlob {
60 pub fn new(
61 handle: DataObjectHandle<FxVolume>,
62 merkle_root: Hash,
63 merkle_verifier: MerkleVerifier,
64 compression_info: Option<CompressionInfo>,
65 uncompressed_size: u64,
66 ) -> Result<Arc<Self>, Error> {
67 let min_chunk_size = min_chunk_size(&compression_info);
68 let merkle_verifier =
69 ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)?;
70 let chunks_supplied = AtomicBitVec::new(uncompressed_size.div_ceil(min_chunk_size));
71
72 Ok(Arc::new_cyclic(|weak| {
73 let (vmo, pager_packet_receiver_registration) = handle
74 .owner()
75 .pager()
76 .create_vmo(weak.clone(), uncompressed_size, zx::VmoOptions::empty())
77 .unwrap();
78 set_vmo_name(&vmo, &merkle_root);
79 Self {
80 handle,
81 vmo,
82 open_count: AtomicUsize::new(0),
83 merkle_root,
84 merkle_verifier,
85 compression_info,
86 uncompressed_size,
87 pager_packet_receiver_registration: Arc::new(pager_packet_receiver_registration),
88 chunks_supplied,
89 }
90 }))
91 }
92
93 pub fn overwrite_me(
95 self: &Arc<Self>,
96 handle: DataObjectHandle<FxVolume>,
97 merkle_verifier: MerkleVerifier,
98 compression_info: Option<CompressionInfo>,
99 ) -> Arc<Self> {
100 let min_chunk_size = min_chunk_size(&compression_info);
101 let merkle_verifier =
102 ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)
103 .expect("The chunk size should have been validated by the delivery blob parser");
104 let chunks_supplied = AtomicBitVec::new(self.uncompressed_size.div_ceil(min_chunk_size));
107 let vmo = self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap();
108
109 let new_blob = Arc::new(Self {
110 handle,
111 vmo,
112 open_count: AtomicUsize::new(0),
113 merkle_root: self.merkle_root,
114 merkle_verifier,
115 compression_info,
116 uncompressed_size: self.uncompressed_size,
117 pager_packet_receiver_registration: self.pager_packet_receiver_registration.clone(),
118 chunks_supplied,
119 });
120
121 self.handle.owner().cache().remove(self.as_ref());
124
125 let receiver_lock =
128 self.pager_packet_receiver_registration.receiver().set_receiver(&new_blob);
129 if receiver_lock.is_strong() {
130 new_blob.open_count_add_one();
135 let old_blob = self.clone();
136 Epoch::global().defer(move || old_blob.open_count_sub_one());
137 }
138 new_blob
139 }
140
141 pub fn root(&self) -> Hash {
142 self.merkle_root
143 }
144
145 fn record_page_fault_metric(&self, range: &Range<u64>) {
146 let chunk_size: u64 = min_chunk_size(&self.compression_info);
147
148 let first_chunk = range.start / chunk_size;
149 let last_chunk = range.end.div_ceil(chunk_size);
151
152 let supplied_count = self.chunks_supplied.test_and_set_range(first_chunk, last_chunk);
153
154 if supplied_count > 0 {
155 self.handle
156 .owner()
157 .blob_resupplied_count()
158 .increment(supplied_count, Ordering::Relaxed);
159 }
160 }
161}
162
163impl Drop for FxBlob {
164 fn drop(&mut self) {
165 let volume = self.handle.owner();
166 volume.cache().remove(self);
167 }
168}
169
170impl OpenedNode<FxBlob> {
171 pub fn create_child_vmo(&self) -> Result<zx::Vmo, Status> {
178 let blob = self.0.as_ref();
179 let child_vmo = blob.vmo.create_child(
180 zx::VmoChildOptions::REFERENCE | zx::VmoChildOptions::NO_WRITE,
181 0,
182 0,
183 )?;
184 if blob.handle.owner().pager().watch_for_zero_children(blob).map_err(map_to_status)? {
185 blob.open_count_add_one();
189 }
190 Ok(child_vmo)
191 }
192}
193
194impl FxNode for FxBlob {
195 fn object_id(&self) -> u64 {
196 self.handle.object_id()
197 }
198
199 fn parent(&self) -> Option<Arc<FxDirectory>> {
200 unreachable!(); }
202
203 fn set_parent(&self, _parent: Arc<FxDirectory>) {
204 }
206
207 fn open_count_add_one(&self) {
208 let old = self.open_count.fetch_add(1, Ordering::Relaxed);
209 assert!(old != PURGED && old != PURGED - 1);
210 }
211
212 fn open_count_sub_one(self: Arc<Self>) {
213 let old = self.open_count.fetch_sub(1, Ordering::Relaxed);
214 assert!(old & !PURGED > 0);
215 if old == PURGED + 1 {
216 let store = self.handle.store();
217 store
218 .filesystem()
219 .graveyard()
220 .queue_tombstone_object(store.store_object_id(), self.object_id());
221 }
222 }
223
224 fn object_descriptor(&self) -> ObjectDescriptor {
225 ObjectDescriptor::File
226 }
227
228 fn terminate(&self) {
229 self.pager_packet_receiver_registration.stop_watching_for_zero_children();
230 }
231
232 fn mark_to_be_purged(&self) {
233 let old = self.open_count.fetch_or(PURGED, Ordering::Relaxed);
234 assert!(old & PURGED == 0);
235 if old == 0 {
236 let store = self.handle.store();
237 store
238 .filesystem()
239 .graveyard()
240 .queue_tombstone_object(store.store_object_id(), self.object_id());
241 }
242 }
243}
244
245impl PagerBacked for FxBlob {
246 fn pager(&self) -> &crate::pager::Pager {
247 self.handle.owner().pager()
248 }
249
250 fn pager_packet_receiver_registration(&self) -> &PagerPacketReceiverRegistration<Self> {
251 &self.pager_packet_receiver_registration
252 }
253
254 fn vmo(&self) -> &zx::Vmo {
255 &self.vmo
256 }
257
258 fn page_in(self: Arc<Self>, range: PageInRange<Self>) {
259 let read_ahead_size = self.handle.owner().read_ahead_size();
260 let read_ahead_size = if let Some(compression_info) = &self.compression_info {
261 read_ahead_size_for_chunk_size(compression_info.chunk_size, read_ahead_size)
262 } else {
263 read_ahead_size
264 };
265 default_page_in(self, range, read_ahead_size)
267 }
268
269 fn mark_dirty(self: Arc<Self>, _range: MarkDirtyRange<Self>) {
270 unreachable!();
271 }
272
273 fn on_zero_children(self: Arc<Self>) {
274 self.open_count_sub_one();
275 }
276
277 fn byte_size(&self) -> u64 {
278 self.uncompressed_size
279 }
280
281 async fn aligned_read(&self, range: Range<u64>) -> Result<buffer::Buffer<'_>, Error> {
282 self.record_page_fault_metric(&range);
283
284 let mut buffer = self.handle.allocate_buffer((range.end - range.start) as usize).await;
285 let read = match &self.compression_info {
286 None => self.handle.read(range.start, buffer.as_mut()).await?,
287 Some(compression_info) => {
288 let compressed_offsets =
289 match compression_info.compressed_range_for_uncompressed_range(&range)? {
290 (start, None) => start..self.handle.get_size(),
291 (start, Some(end)) => start..end.get(),
292 };
293 let bs = self.handle.block_size();
294 let aligned = round_down(compressed_offsets.start, bs)
295 ..round_up(compressed_offsets.end, bs).unwrap();
296 let mut compressed_buf =
297 self.handle.allocate_buffer((aligned.end - aligned.start) as usize).await;
298
299 let mut decompression_errors = 0;
300 let len = (std::cmp::min(range.end, self.uncompressed_size) - range.start) as usize;
301 loop {
302 let (read, _) = try_join!(
303 self.handle.read(aligned.start, compressed_buf.as_mut()),
304 async {
305 buffer
306 .allocator()
307 .buffer_source()
308 .commit_range(buffer.range())
309 .map_err(|e| e.into())
310 }
311 )
312 .with_context(|| {
313 format!(
314 "Failed to read compressed range {:?}, len {}",
315 aligned,
316 self.handle.get_size()
317 )
318 })?;
319 let compressed_buf_range = (compressed_offsets.start - aligned.start) as usize
320 ..(compressed_offsets.end - aligned.start) as usize;
321 ensure!(
322 read >= compressed_buf_range.end - compressed_buf_range.start,
323 anyhow!(FxfsError::Inconsistent).context(format!(
324 "Unexpected EOF, read {}, but expected {}",
325 read,
326 compressed_buf_range.end - compressed_buf_range.start,
327 ))
328 );
329
330 let buf = buffer.as_mut_slice();
331 let decompression_result = {
332 fxfs_trace::duration!("blob-decompress", "len" => len);
333 compression_info.decompress(
334 &compressed_buf.as_slice()[compressed_buf_range],
335 &mut buf[..len],
336 range.start,
337 )
338 };
339 match decompression_result {
340 Ok(()) => break,
341 Err(error) => {
342 record_decompression_error_crash_report(
343 compressed_buf.as_slice(),
344 &range,
345 &compressed_offsets,
346 &self.merkle_root,
347 )
348 .await;
349 decompression_errors += 1;
350 if decompression_errors == 2 {
351 bail!(
352 anyhow!(FxfsError::IntegrityError)
353 .context(format!("Decompression error: {error:?}"))
354 );
355 } else {
356 warn!(error:?; "Decompression error; retrying");
357 }
358 }
359 }
360 } if decompression_errors > 0 {
362 info!("Read succeeded on second attempt");
363 }
364 len
365 }
366 };
367 {
368 fxfs_trace::duration!("blob-verify", "len" => read);
371 self.merkle_verifier.verify(range.start as usize, &buffer.as_slice()[..read])?;
372 }
373 buffer.as_mut_slice()[read..].fill(0);
375 Ok(buffer)
376 }
377}
378
379pub struct CompressionInfo {
380 chunk_size: u64,
381 small_offsets: Box<[u32]>,
384 large_offsets: Box<[u64]>,
385 decompressor: ThreadLocalDecompressor,
386}
387
388impl CompressionInfo {
389 pub fn new(
390 chunk_size: u64,
391 offsets: &[u64],
392 compression_algorithm: CompressionAlgorithm,
393 ) -> Result<Self, Error> {
394 let decompressor = compression_algorithm.thread_local_decompressor();
395 if chunk_size == 0 {
396 return Err(FxfsError::IntegrityError.into());
397 } else if offsets.is_empty() || *offsets.first().unwrap() != 0 {
398 Err(FxfsError::IntegrityError.into())
400 } else if !offsets.windows(2).all(|window| window[0] < window[1]) {
401 Err(FxfsError::IntegrityError.into())
403 } else if offsets.len() == 1 {
404 Ok(Self {
407 chunk_size,
408 small_offsets: Box::default(),
409 large_offsets: Box::default(),
410 decompressor,
411 })
412 } else if *offsets.last().unwrap() <= u32::MAX as u64 {
413 Ok(Self {
416 chunk_size,
417 small_offsets: offsets[1..].iter().map(|x| *x as u32).collect(),
418 large_offsets: Box::default(),
419 decompressor,
420 })
421 } else {
422 let partition_point = offsets.partition_point(|&x| x <= u32::MAX as u64);
424 Ok(Self {
425 chunk_size,
426 small_offsets: offsets[1..partition_point].iter().map(|x| *x as u32).collect(),
427 large_offsets: offsets[partition_point..].into(),
428 decompressor,
429 })
430 }
431 }
432
433 fn compressed_range_for_uncompressed_range(
434 &self,
435 range: &Range<u64>,
436 ) -> Result<(u64, Option<NonZero<u64>>), Error> {
437 ensure!(range.start.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
438 ensure!(range.start < range.end, FxfsError::Inconsistent);
439
440 let start_chunk_index = (range.start / self.chunk_size) as usize;
441 let start_offset = self
442 .compressed_offset_for_chunk_index(start_chunk_index)
443 .ok_or(FxfsError::OutOfRange)?;
444
445 let end_chunk_index = range.end.div_ceil(self.chunk_size) as usize;
447 let end_offset = match self.compressed_offset_for_chunk_index(end_chunk_index) {
448 None => None,
449 Some(offset) => {
450 ensure!(range.end.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
452 Some(NonZero::new(offset).unwrap())
455 }
456 };
457 Ok((start_offset, end_offset))
458 }
459
460 fn compressed_offset_for_chunk_index(&self, chunk_index: usize) -> Option<u64> {
461 if chunk_index == 0 {
463 Some(0)
464 } else if chunk_index - 1 < self.small_offsets.len() {
465 Some(self.small_offsets[chunk_index - 1] as u64)
466 } else if chunk_index - 1 - self.small_offsets.len() < self.large_offsets.len() {
467 Some(self.large_offsets[chunk_index - 1 - self.small_offsets.len()])
468 } else {
469 None
470 }
471 }
472
473 fn decompress(
479 &self,
480 mut src: &[u8],
481 mut dst: &mut [u8],
482 dst_start_offset: u64,
483 ) -> Result<(), Error> {
484 ensure!(dst_start_offset.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
485
486 let start_chunk_index = (dst_start_offset / self.chunk_size) as usize;
487 let chunk_count = dst.len().div_ceil(self.chunk_size as usize);
488 let mut start_offset = self
489 .compressed_offset_for_chunk_index(start_chunk_index)
490 .ok_or(FxfsError::Inconsistent)?;
491
492 for chunk_index in start_chunk_index..(start_chunk_index + chunk_count) {
494 match self.compressed_offset_for_chunk_index(chunk_index + 1) {
495 Some(end_offset) => {
496 let (to_decompress, src_remaining) = src
497 .split_at_checked((end_offset - start_offset) as usize)
498 .ok_or(FxfsError::Inconsistent)?;
499 let (to_decompress_into, dst_remaining) = dst
500 .split_at_mut_checked(self.chunk_size as usize)
501 .ok_or(FxfsError::Inconsistent)?;
502
503 let decompressed_bytes = self.decompressor.decompress_into(
504 to_decompress,
505 to_decompress_into,
506 chunk_index,
507 )?;
508 ensure!(
509 decompressed_bytes == to_decompress_into.len(),
510 FxfsError::Inconsistent
511 );
512 src = src_remaining;
513 dst = dst_remaining;
514 start_offset = end_offset;
515 }
516 None => {
517 let decompressed_bytes =
518 self.decompressor.decompress_into(src, dst, chunk_index)?;
519 ensure!(decompressed_bytes == dst.len(), FxfsError::Inconsistent);
520 }
521 }
522 }
523
524 Ok(())
525 }
526}
527
528fn set_vmo_name(vmo: &zx::Vmo, merkle_root: &Hash) {
529 let trimmed_merkle = &merkle_root.to_string()[0..BLOB_NAME_HASH_LENGTH];
530 let name = format!("{BLOB_NAME_PREFIX}{trimmed_merkle}");
531 let name = zx::Name::new(&name).unwrap();
532 vmo.set_name(&name).unwrap();
533}
534
535fn min_chunk_size(compression_info: &Option<CompressionInfo>) -> u64 {
536 if let Some(compression_info) = compression_info {
537 read_ahead_size_for_chunk_size(compression_info.chunk_size, BASE_READ_AHEAD_SIZE)
538 } else {
539 BASE_READ_AHEAD_SIZE
540 }
541}
542
543fn read_ahead_size_for_chunk_size(chunk_size: u64, suggested_read_ahead_size: u64) -> u64 {
544 if chunk_size >= suggested_read_ahead_size {
545 chunk_size
546 } else {
547 round_down(suggested_read_ahead_size, chunk_size)
548 }
549}
550
551async fn record_decompression_error_crash_report(
552 compressed_buf: &[u8],
553 uncompressed_offsets: &Range<u64>,
554 compressed_offsets: &Range<u64>,
555 merkle_root: &Hash,
556) {
557 static DONE_ONCE: AtomicBool = AtomicBool::new(false);
558 if !DONE_ONCE.swap(true, Ordering::Relaxed) {
559 if let Ok(proxy) = connect_to_protocol::<fidl_fuchsia_feedback::CrashReporterMarker>() {
560 let size = compressed_buf.len() as u64;
561 let vmo = zx::Vmo::create(size).unwrap();
562 vmo.write(compressed_buf, 0).unwrap();
563 if let Err(e) = proxy
564 .file_report(CrashReport {
565 program_name: Some("fxfs".to_string()),
566 crash_signature: Some("fuchsia-fxfs-decompression_error".to_string()),
567 is_fatal: Some(false),
568 annotations: Some(vec![
569 Annotation {
570 key: "fxfs.range".to_string(),
571 value: format!("{:?}", uncompressed_offsets),
572 },
573 Annotation {
574 key: "fxfs.compressed_offsets".to_string(),
575 value: format!("{:?}", compressed_offsets),
576 },
577 Annotation {
578 key: "fxfs.merkle_root".to_string(),
579 value: format!("{}", merkle_root),
580 },
581 ]),
582 attachments: Some(vec![Attachment {
583 key: "fxfs_compressed_data".to_string(),
584 value: Buffer { vmo, size },
585 }]),
586 ..Default::default()
587 })
588 .await
589 {
590 error!(e:?; "Failed to file crash report");
591 } else {
592 warn!("Filed crash report for decompression error");
593 }
594 } else {
595 error!("Failed to connect to crash report service");
596 }
597 }
598}
599
600#[cfg(test)]
601mod tests {
602 use super::*;
603 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture};
604 use crate::fuchsia::memory_pressure::MemoryPressureLevel;
605 use crate::fuchsia::pager::PageInRange;
606 use crate::fuchsia::volume::{MAX_READ_AHEAD_SIZE, MemoryPressureConfig};
607 use crate::fxblob::testing::open_blob_fixture;
608 use assert_matches::assert_matches;
609 use delivery_blob::CompressionMode;
610 use delivery_blob::compression::{ChunkedArchiveOptions, CompressionAlgorithm};
611 use fuchsia_async as fasync;
612 use fuchsia_async::epoch::Epoch;
613 use fxfs_make_blob_image::FxBlobBuilder;
614 use std::time::Duration;
615 use storage_device::DeviceHolder;
616 use storage_device::fake_device::FakeDevice;
617
618 const CHUNK_SIZE: usize = 32 * 1024;
619
620 #[fasync::run(10, test)]
621 async fn test_empty_blob() {
622 let fixture = new_blob_fixture().await;
623
624 let data = vec![];
625 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
626 assert_eq!(fixture.read_blob(hash).await, data);
627
628 fixture.close().await;
629 }
630
631 #[fasync::run(10, test)]
632 async fn test_large_blob() {
633 let fixture = new_blob_fixture().await;
634
635 let data = vec![3; 3_000_000];
636 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
637
638 assert_eq!(fixture.read_blob(hash).await, data);
639
640 fixture.close().await;
641 }
642
643 #[fasync::run(10, test)]
644 async fn test_large_compressed_blob() {
645 let fixture = new_blob_fixture().await;
646
647 let data = vec![3; 3_000_000];
648 let hash = fixture.write_blob(&data, CompressionMode::Always).await;
649
650 assert_eq!(fixture.read_blob(hash).await, data);
651
652 fixture.close().await;
653 }
654
655 #[fasync::run(10, test)]
656 async fn test_non_page_aligned_blob() {
657 let fixture = new_blob_fixture().await;
658
659 let page_size = zx::system_get_page_size() as usize;
660 let data = vec![0xffu8; page_size - 1];
661 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
662 assert_eq!(fixture.read_blob(hash).await, data);
663
664 {
665 let vmo = fixture.get_blob_vmo(hash).await;
666 let mut buf = vec![0x11u8; page_size];
667 vmo.read(&mut buf[..], 0).expect("vmo read failed");
668 assert_eq!(data, buf[..data.len()]);
669 assert_eq!(buf[data.len()], 0);
671 }
672
673 fixture.close().await;
674 }
675
676 #[fasync::run(10, test)]
677 async fn test_blob_invalid_contents() {
678 let fixture = new_blob_fixture().await;
679
680 let data = vec![0xffu8; (MAX_READ_AHEAD_SIZE + BLOCK_SIZE) as usize];
681 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
682 let name = format!("{}", hash);
683
684 {
685 let handle = fixture.get_blob_handle(&name).await;
687 let mut transaction =
688 handle.new_transaction().await.expect("failed to create transaction");
689 let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
690 buf.as_mut_slice().fill(0);
691 handle
692 .txn_write(&mut transaction, MAX_READ_AHEAD_SIZE, buf.as_ref())
693 .await
694 .expect("txn_write failed");
695 transaction.commit().await.expect("failed to commit transaction");
696 }
697
698 {
699 let blob_vmo = fixture.get_blob_vmo(hash).await;
700 let mut buf = vec![0; BLOCK_SIZE as usize];
701 assert_matches!(blob_vmo.read(&mut buf[..], 0), Ok(_));
702 assert_matches!(
703 blob_vmo.read(&mut buf[..], MAX_READ_AHEAD_SIZE),
704 Err(zx::Status::IO_DATA_INTEGRITY)
705 );
706 }
707
708 fixture.close().await;
709 }
710
711 #[fasync::run(10, test)]
712 async fn test_lz4_blob() {
713 let device = DeviceHolder::new(FakeDevice::new(16384, 512));
714 let blob_data = vec![0xAA; 68 * 1024];
715 let fxblob_builder = FxBlobBuilder::new(device).await.unwrap();
716 let blob = fxblob_builder
717 .generate_blob(blob_data.clone(), Some(CompressionAlgorithm::Lz4))
718 .unwrap();
719 let blob_hash = blob.hash();
720 fxblob_builder.install_blob(&blob).await.unwrap();
721 let device = fxblob_builder.finalize().await.unwrap().0;
722 device.reopen(false);
723 let fixture = open_blob_fixture(device).await;
724
725 assert_eq!(fixture.read_blob(blob_hash).await, blob_data);
726
727 fixture.close().await;
728 }
729
730 #[fasync::run(10, test)]
731 async fn test_blob_vmos_are_immutable() {
732 let fixture = new_blob_fixture().await;
733
734 let data = vec![0xffu8; 500];
735 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
736 let blob_vmo = fixture.get_blob_vmo(hash).await;
737
738 assert_matches!(blob_vmo.set_size(20), Err(_));
740
741 assert_matches!(blob_vmo.write(b"overwrite", 0), Err(_));
743
744 assert_matches!(blob_vmo.set_stream_size(20), Err(_));
746
747 fixture.close().await;
748 }
749
750 const COMPRESSED_BLOB_CHUNK_SIZE: u64 = 32 * 1024;
751 const MAX_SMALL_OFFSET: u64 = u32::MAX as u64;
752 const ZSTD: CompressionAlgorithm = CompressionAlgorithm::Zstd;
753
754 #[fuchsia::test]
755 fn test_compression_info_offsets_must_start_with_zero() {
756 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[], ZSTD).is_err());
757 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[1], ZSTD).is_err());
758 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).is_ok());
759 }
760
761 #[fuchsia::test]
762 fn test_compression_info_offsets_must_be_sorted() {
763 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 2], ZSTD).is_ok());
764 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 2, 1], ZSTD).is_err());
765 assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 1], ZSTD).is_err());
766 }
767
768 #[fuchsia::test]
769 fn test_compression_info_splitting_offsets() {
770 let compression_info =
772 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).unwrap();
773 assert!(compression_info.small_offsets.is_empty());
774 assert!(compression_info.large_offsets.is_empty());
775
776 let compression_info =
778 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10], ZSTD).unwrap();
779 assert_eq!(&*compression_info.small_offsets, &[10]);
780 assert!(compression_info.large_offsets.is_empty());
781
782 let compression_info =
784 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10, 20, 30], ZSTD).unwrap();
785 assert_eq!(&*compression_info.small_offsets, &[10, 20, 30]);
786 assert!(compression_info.large_offsets.is_empty());
787
788 let compression_info =
790 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET - 1], ZSTD)
791 .unwrap();
792 assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1]);
793 assert!(compression_info.large_offsets.is_empty());
794
795 let compression_info =
797 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET], ZSTD).unwrap();
798 assert_eq!(&*compression_info.small_offsets, &[u32::MAX]);
799 assert!(compression_info.large_offsets.is_empty());
800
801 let compression_info =
803 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 1], ZSTD)
804 .unwrap();
805 assert!(compression_info.small_offsets.is_empty());
806 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
807
808 let compression_info = CompressionInfo::new(
810 COMPRESSED_BLOB_CHUNK_SIZE,
811 &[0, MAX_SMALL_OFFSET - 1, MAX_SMALL_OFFSET, MAX_SMALL_OFFSET + 1],
812 ZSTD,
813 )
814 .unwrap();
815 assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1, u32::MAX]);
816 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
817
818 let compression_info =
820 CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 10], ZSTD)
821 .unwrap();
822 assert!(compression_info.small_offsets.is_empty());
823 assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 10]);
824
825 let compression_info = CompressionInfo::new(
827 COMPRESSED_BLOB_CHUNK_SIZE,
828 &[0, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
829 ZSTD,
830 )
831 .unwrap();
832 assert!(compression_info.small_offsets.is_empty());
833 assert_eq!(
834 &*compression_info.large_offsets,
835 &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
836 );
837
838 let compression_info = CompressionInfo::new(
840 COMPRESSED_BLOB_CHUNK_SIZE,
841 &[0, 10, 20, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
842 ZSTD,
843 )
844 .unwrap();
845 assert_eq!(&*compression_info.small_offsets, &[10, 20]);
846 assert_eq!(
847 &*compression_info.large_offsets,
848 &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
849 );
850 }
851
852 #[fuchsia::test]
853 fn test_compression_info_compressed_range_for_uncompressed_range() {
854 fn check_compression_ranges(
855 offsets: &[u64],
856 expected_ranges: &[(u64, Option<u64>)],
857 chunk_size: u64,
858 read_ahead_size: u64,
859 ) {
860 let compression_info = CompressionInfo::new(chunk_size, offsets, ZSTD).unwrap();
861 for (i, range) in expected_ranges.iter().enumerate() {
862 let i = i as u64;
863 let result = compression_info
864 .compressed_range_for_uncompressed_range(
865 &(i * read_ahead_size..(i + 1) * read_ahead_size),
866 )
867 .unwrap();
868 assert_eq!(result, (range.0, range.1.map(|end| NonZero::new(end).unwrap())));
869 }
870 }
871 check_compression_ranges(
872 &[0, 10, 20, 30],
873 &[(0, Some(10)), (10, Some(20)), (20, Some(30)), (30, None)],
874 COMPRESSED_BLOB_CHUNK_SIZE,
875 COMPRESSED_BLOB_CHUNK_SIZE,
876 );
877 check_compression_ranges(
878 &[0, 10, 20, 30],
879 &[(0, Some(20)), (20, None)],
880 COMPRESSED_BLOB_CHUNK_SIZE,
881 COMPRESSED_BLOB_CHUNK_SIZE * 2,
882 );
883 check_compression_ranges(
884 &[0, 10, 20, 30],
885 &[(0, None)],
886 COMPRESSED_BLOB_CHUNK_SIZE,
887 COMPRESSED_BLOB_CHUNK_SIZE * 4,
888 );
889 check_compression_ranges(
890 &[0, 10, 20, 30, MAX_SMALL_OFFSET + 10],
891 &[(0, Some(MAX_SMALL_OFFSET + 10)), (MAX_SMALL_OFFSET + 10, None)],
892 COMPRESSED_BLOB_CHUNK_SIZE,
893 COMPRESSED_BLOB_CHUNK_SIZE * 4,
894 );
895 check_compression_ranges(
896 &[
897 0,
898 10,
899 20,
900 30,
901 MAX_SMALL_OFFSET + 10,
902 MAX_SMALL_OFFSET + 20,
903 MAX_SMALL_OFFSET + 30,
904 MAX_SMALL_OFFSET + 40,
905 MAX_SMALL_OFFSET + 50,
906 ],
907 &[
908 (0, Some(20)),
909 (20, Some(MAX_SMALL_OFFSET + 10)),
910 (MAX_SMALL_OFFSET + 10, Some(MAX_SMALL_OFFSET + 30)),
911 (MAX_SMALL_OFFSET + 30, Some(MAX_SMALL_OFFSET + 50)),
912 ],
913 COMPRESSED_BLOB_CHUNK_SIZE,
914 COMPRESSED_BLOB_CHUNK_SIZE * 2,
915 );
916 }
917
918 #[fuchsia::test]
919 fn test_compression_info_compressed_range_for_uncompressed_range_errors() {
920 let compression_info = CompressionInfo::new(
921 COMPRESSED_BLOB_CHUNK_SIZE,
922 &[
923 0,
924 10,
925 20,
926 30,
927 MAX_SMALL_OFFSET + 10,
928 MAX_SMALL_OFFSET + 20,
929 MAX_SMALL_OFFSET + 30,
930 MAX_SMALL_OFFSET + 40,
931 MAX_SMALL_OFFSET + 50,
932 ],
933 ZSTD,
934 )
935 .unwrap();
936
937 assert!(
939 compression_info
940 .compressed_range_for_uncompressed_range(&(1..COMPRESSED_BLOB_CHUNK_SIZE),)
941 .is_err()
942 );
943
944 assert!(
946 compression_info
947 .compressed_range_for_uncompressed_range(
948 &(COMPRESSED_BLOB_CHUNK_SIZE * 9..COMPRESSED_BLOB_CHUNK_SIZE * 12),
949 )
950 .is_err()
951 );
952
953 assert!(
955 compression_info
956 .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE + 1),)
957 .is_err()
958 );
959 assert!(
960 compression_info
961 .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE - 1),)
962 .is_err()
963 );
964 assert!(
965 compression_info
966 .compressed_range_for_uncompressed_range(
967 &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 + 1),
968 )
969 .is_err()
970 );
971 assert!(
972 compression_info
973 .compressed_range_for_uncompressed_range(
974 &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 - 1),
975 )
976 .is_err()
977 );
978
979 assert!(
981 compression_info
982 .compressed_range_for_uncompressed_range(
983 &(COMPRESSED_BLOB_CHUNK_SIZE * 8..COMPRESSED_BLOB_CHUNK_SIZE * 8 + 4096),
984 )
985 .is_ok()
986 );
987 }
988
989 #[fuchsia::test]
990 fn test_read_ahead_size_for_chunk_size() {
991 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 32 * 1024), 32 * 1024);
992 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 32 * 1024), 48 * 1024);
993 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 32 * 1024), 64 * 1024);
994
995 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 64 * 1024), 64 * 1024);
996 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 64 * 1024), 48 * 1024);
997 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 64 * 1024), 64 * 1024);
998 assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 64 * 1024), 96 * 1024);
999
1000 assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 128 * 1024), 128 * 1024);
1001 assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 128 * 1024), 96 * 1024);
1002 assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 128 * 1024), 128 * 1024);
1003 assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 128 * 1024), 96 * 1024);
1004 }
1005
1006 fn build_compression_info(size: usize) -> (CompressionInfo, Vec<u8>, Vec<u8>) {
1007 let options =
1008 ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
1009 let mut compressor = options.compressor();
1010 let mut uncompressed_data = Vec::with_capacity(size);
1011 {
1012 let mut run_length = 1;
1013 let mut run_value: u8 = 0;
1014 while uncompressed_data.len() < size {
1015 uncompressed_data
1016 .resize(std::cmp::min(uncompressed_data.len() + run_length, size), run_value);
1017 run_length = (run_length + 1) % 19 + 1;
1018 run_value = (run_value + 1) % 17;
1019 }
1020 }
1021 let mut compressed_offsets = vec![0];
1022 let mut compressed_data = vec![];
1023 for chunk in uncompressed_data.chunks(CHUNK_SIZE) {
1024 let mut compressed_chunk = compressor.compress(chunk, 0).unwrap();
1025 compressed_data.append(&mut compressed_chunk);
1026 compressed_offsets.push(compressed_data.len() as u64);
1027 }
1028 compressed_offsets.pop();
1029 (
1030 CompressionInfo::new(CHUNK_SIZE as u64, &compressed_offsets, CompressionAlgorithm::Lz4)
1031 .unwrap(),
1032 compressed_data,
1033 uncompressed_data,
1034 )
1035 }
1036
1037 #[fuchsia::test]
1038 fn test_compression_info_decompress_single_chunk() {
1039 let (compression_info, compressed_data, uncompressed_data) =
1040 build_compression_info(CHUNK_SIZE);
1041 let mut decompressed_data = vec![0u8; CHUNK_SIZE + 1];
1042
1043 compression_info
1044 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE], 0)
1045 .expect("failed to decompress");
1046 assert_eq!(uncompressed_data, decompressed_data[0..CHUNK_SIZE]);
1047
1048 compression_info
1050 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1051 .expect_err("decompression should fail");
1052
1053 compression_info
1055 .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1056 .expect_err("decompression should fail");
1057 }
1058
1059 #[fuchsia::test]
1060 fn test_compression_info_decompress_multiple_chunks() {
1061 fn slice_for_chunks<'a>(
1062 compressed_data: &'a [u8],
1063 compression_info: &CompressionInfo,
1064 chunks: Range<u64>,
1065 ) -> &'a [u8] {
1066 let (start, end) = compression_info
1067 .compressed_range_for_uncompressed_range(
1068 &(chunks.start * CHUNK_SIZE as u64..chunks.end * CHUNK_SIZE as u64),
1069 )
1070 .unwrap();
1071 let end = end.map_or(compressed_data.len() as u64, NonZero::<u64>::get);
1072 &compressed_data[start as usize..end as usize]
1073 }
1074
1075 const BLOB_SIZE: usize = CHUNK_SIZE * 4 + 4096;
1076 let (compression_info, compressed_data, uncompressed_data) =
1077 build_compression_info(BLOB_SIZE);
1078 let mut decompressed_data = vec![0u8; BLOB_SIZE];
1079
1080 compression_info
1082 .decompress(&compressed_data, &mut decompressed_data, 0)
1083 .expect("failed to decompress");
1084 assert_eq!(uncompressed_data, decompressed_data);
1085
1086 compression_info
1088 .decompress(
1089 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1090 &mut decompressed_data[0..CHUNK_SIZE * 4],
1091 0,
1092 )
1093 .expect("failed to decompress");
1094 assert_eq!(&uncompressed_data[0..CHUNK_SIZE], &decompressed_data[0..CHUNK_SIZE]);
1095
1096 compression_info
1098 .decompress(
1099 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1100 &mut decompressed_data[0..CHUNK_SIZE * 4 - 1],
1101 0,
1102 )
1103 .expect_err("decompression should fail");
1104
1105 compression_info
1107 .decompress(
1108 slice_for_chunks(&compressed_data, &compression_info, 0..4),
1109 &mut decompressed_data[0..CHUNK_SIZE * 4 + 1],
1110 0,
1111 )
1112 .expect_err("decompression should fail");
1113
1114 let partial_chunk = slice_for_chunks(&compressed_data, &compression_info, 4..5);
1116 compression_info
1117 .decompress(partial_chunk, &mut decompressed_data[0..4096], CHUNK_SIZE as u64 * 4)
1118 .expect("failed to decompress");
1119 assert_eq!(&uncompressed_data[CHUNK_SIZE * 4..], &decompressed_data[0..4096]);
1120
1121 compression_info
1123 .decompress(partial_chunk, &mut decompressed_data[0..4095], CHUNK_SIZE as u64 * 4)
1124 .expect_err("decompression should fail");
1125
1126 compression_info
1128 .decompress(partial_chunk, &mut decompressed_data[0..4097], CHUNK_SIZE as u64 * 4)
1129 .expect_err("decompression should fail");
1130 }
1131
1132 #[fasync::run(10, test)]
1133 async fn test_refault_metric() {
1134 async fn wait(condition: impl Fn() -> bool) -> bool {
1135 let mut wait_count = 0;
1136 while !condition() {
1137 fasync::Timer::new(Duration::from_millis(20)).await;
1138 wait_count += 1;
1139 if wait_count > 100 {
1140 return false;
1141 }
1142 }
1143 return true;
1144 }
1145
1146 let fixture = new_blob_fixture().await;
1147
1148 {
1149 let volume = fixture.volume().volume().clone();
1150 volume.start_background_task(
1151 MemoryPressureConfig::default(),
1152 fixture.volumes_directory().memory_pressure_monitor(),
1153 );
1154
1155 let data = vec![0xffu8; 252 * 1024];
1156 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
1157
1158 let blob = fixture.get_blob(hash).await.unwrap();
1159 assert_eq!(blob.chunks_supplied.len(), 8);
1160 assert_eq!(
1162 &blob.chunks_supplied.get(),
1163 &[false, false, false, false, false, false, false, false]
1164 );
1165
1166 blob.vmo.read_to_vec::<u8>(32 * 1024, 4096).unwrap();
1167
1168 assert_eq!(
1169 &blob.chunks_supplied.get(),
1170 &[true, true, true, true, false, false, false, false]
1171 );
1172
1173 fixture
1174 .memory_pressure_proxy()
1175 .on_level_changed(MemoryPressureLevel::Critical)
1176 .await
1177 .expect("Failed to send memory pressure level change");
1178
1179 assert!(
1180 wait(|| volume.read_ahead_size() == BASE_READ_AHEAD_SIZE).await,
1181 "read-ahead size didn't change with memory pressure change"
1182 );
1183
1184 blob.vmo.read_to_vec::<u8>(164 * 1024, 4096).unwrap();
1185 assert_eq!(
1186 &blob.chunks_supplied.get(),
1187 &[true, true, true, true, false, true, false, false]
1188 );
1189
1190 assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 0);
1192
1193 blob.clone().page_in(PageInRange::new(
1198 32 * 1024..68 * 1024,
1199 blob.clone(),
1200 Epoch::global().guard(),
1201 ));
1202
1203 assert!(wait(|| volume.blob_resupplied_count().read(Ordering::SeqCst) == 2,).await);
1204
1205 assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 2);
1206 }
1207
1208 fixture.close().await;
1209 }
1210}