Skip to main content

fxfs_platform_testing/fuchsia/fxblob/
blob.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This module contains the [`FxBlob`] node type used to represent an immutable blob persisted to
6//! disk which can be read back.
7
8use crate::constants::*;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::errors::map_to_status;
11use crate::fuchsia::node::{FxNode, OpenedNode};
12use crate::fuchsia::pager::{
13    MarkDirtyRange, PageInRange, PagerBacked, PagerPacketReceiverRegistration, default_page_in,
14};
15use crate::fuchsia::volume::{FxVolume, READ_AHEAD_SIZE};
16use crate::fxblob::atomic_vec::AtomicBitVec;
17use anyhow::{Context, Error, anyhow, bail, ensure};
18use delivery_blob::compression::{CompressionAlgorithm, ThreadLocalDecompressor};
19use fidl_fuchsia_feedback::{Annotation, Attachment, CrashReport};
20use fidl_fuchsia_mem::Buffer as MemBuffer;
21use fuchsia_component_client::connect_to_protocol;
22use fuchsia_merkle::{Hash, MerkleVerifier, ReadSizedMerkleVerifier};
23use futures::try_join;
24use fxfs::blob_metadata::{BlobFormat, BlobMetadata};
25use fxfs::errors::FxfsError;
26use fxfs::lock_keys;
27use fxfs::log::*;
28use fxfs::object_handle::{ObjectHandle, ReadObjectHandle};
29use fxfs::object_store::transaction::LockKey;
30use fxfs::object_store::{AttributeId, DataObjectHandle, ObjectDescriptor, StoreObjectHandle};
31use fxfs::round::{round_down, round_up};
32use fxfs_macros::ToWeakNode;
33use std::num::NonZero;
34use std::ops::Range;
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
37use storage_device::buffer::{Buffer, BufferFuture, MutableBufferRef};
38use zx::Status;
39
// Flag kept in the top bit of `FxBlob::open_count`.  When the bit is set, the blob has been deleted
// and it will be tombstoned once the remaining count drops to zero.  After the count has dropped
// to zero with this bit set, the blob cannot be opened again (assertions will fire).
const PURGED: usize = 1 << (usize::BITS - 1);
44
/// Represents an immutable blob stored on Fxfs with an associated merkle tree.
#[derive(ToWeakNode)]
pub struct FxBlob {
    // Handle to the on-disk object; the blob's bytes are read from its `DATA` attribute.
    handle: StoreObjectHandle<FxVolume>,
    // VMO created by the volume's pager; read-only children of it are handed out to clients.
    vmo: zx::Vmo,
    // Number of outstanding opens.  The top bit (`PURGED`) marks the blob as deleted.
    open_count: AtomicUsize,
    // Merkle root identifying the blob; also used to derive the VMO name.
    merkle_root: Hash,
    // Verifies every range read from disk before it is returned from `aligned_read`.
    merkle_verifier: ReadSizedMerkleVerifier,
    // `None` for uncompressed blobs.
    compression_info: Option<CompressionInfo>,
    uncompressed_size: u64, // always set.
    // Size of the `DATA` attribute as stored on disk (the compressed size for compressed blobs).
    stored_size: u64,
    // Shared with the replacement blob created by `overwrite_me`.
    pager_packet_receiver_registration: Arc<PagerPacketReceiverRegistration<Self>>,
    // One bit per read-sized chunk (see `min_chunk_size`), set when the chunk is paged in.  Used to
    // count chunks that have to be supplied more than once.
    chunks_supplied: AtomicBitVec,
}
59
// Fuchsia can have many open blobs at once, so the size of `FxBlob` is important.  This
// compile-time assertion fails the build if the struct grows beyond 192 bytes.
static_assertions::const_assert!(size_of::<FxBlob>() <= 192);
62
63impl FxBlob {
64    pub async fn new(
65        handle: StoreObjectHandle<FxVolume>,
66        merkle_root: Hash,
67    ) -> Result<Arc<Self>, Error> {
68        let stored_size =
69            handle.store().get_attribute_size(handle.object_id(), AttributeId::DATA).await?;
70        let metadata = BlobMetadata::read_from(&handle).await?;
71        let (uncompressed_size, compression_info) = match &metadata.format {
72            BlobFormat::Uncompressed => (stored_size, None),
73            BlobFormat::ChunkedZstd { uncompressed_size, chunk_size, compressed_offsets } => (
74                *uncompressed_size,
75                Some(CompressionInfo::new(
76                    *chunk_size,
77                    compressed_offsets,
78                    CompressionAlgorithm::Zstd,
79                )?),
80            ),
81            BlobFormat::ChunkedLz4 { uncompressed_size, chunk_size, compressed_offsets } => (
82                *uncompressed_size,
83                Some(CompressionInfo::new(
84                    *chunk_size,
85                    compressed_offsets,
86                    CompressionAlgorithm::Lz4,
87                )?),
88            ),
89        };
90        let merkle_verifier = metadata.into_merkle_verifier(merkle_root)?;
91
92        let min_chunk_size = min_chunk_size(&compression_info);
93        let merkle_verifier =
94            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)?;
95        let chunks_supplied = AtomicBitVec::new(uncompressed_size.div_ceil(min_chunk_size));
96
97        Ok(Arc::new_cyclic(|weak| {
98            let (vmo, pager_packet_receiver_registration) = handle
99                .owner()
100                .pager()
101                .create_vmo(weak.clone(), uncompressed_size, zx::VmoOptions::empty())
102                .unwrap();
103            set_vmo_name(&vmo, &merkle_root);
104            Self {
105                handle,
106                vmo,
107                open_count: AtomicUsize::new(0),
108                merkle_root,
109                merkle_verifier,
110                compression_info,
111                uncompressed_size,
112                stored_size,
113                pager_packet_receiver_registration: Arc::new(pager_packet_receiver_registration),
114                chunks_supplied,
115            }
116        }))
117    }
118
119    /// Returns the new blob.
120    pub fn overwrite_me(
121        self: &Arc<Self>,
122        handle: DataObjectHandle<FxVolume>,
123        merkle_verifier: MerkleVerifier,
124        compression_info: Option<CompressionInfo>,
125    ) -> Arc<Self> {
126        let min_chunk_size = min_chunk_size(&compression_info);
127        let merkle_verifier =
128            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)
129                .expect("The chunk size should have been validated by the delivery blob parser");
130        // The chunk size may have changed between the old blob and the new blob. Preserving the
131        // chunks supplied bits isn't important.
132        let chunks_supplied = AtomicBitVec::new(self.uncompressed_size.div_ceil(min_chunk_size));
133        let vmo = self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap();
134        let stored_size = handle.get_size();
135
136        let new_blob = Arc::new(Self {
137            handle: handle.into_store_object_handle(),
138            vmo,
139            open_count: AtomicUsize::new(0),
140            merkle_root: self.merkle_root,
141            merkle_verifier,
142            compression_info,
143            uncompressed_size: self.uncompressed_size,
144            stored_size,
145            pager_packet_receiver_registration: self.pager_packet_receiver_registration.clone(),
146            chunks_supplied,
147        });
148
149        // We have tests that rely on the cache being purged and there are races where the
150        // `FxBlob::drop` isn't called early enough, which can make the test flaky.
151        self.handle.owner().cache().remove(self.as_ref());
152
153        // Lock must be held until the open counts is incremented to prevent concurrent handling of
154        // zero children signals.
155        let receiver_lock =
156            self.pager_packet_receiver_registration.receiver().set_receiver(&new_blob);
157        if receiver_lock.is_strong() {
158            // If there was a strong moved between them, then the counts exchange as well. It is
159            // only important that the increment happen under the lock as it may handle the next
160            // zero children signal, no new requests can now go to the old blob, and because
161            // existing requests hold an open count reference using `try_keep_open` for the duration
162            // of the request, we can immediately decrement the open count of the old blob.
163            new_blob.open_count_add_one();
164            self.clone().open_count_sub_one();
165        }
166        new_blob
167    }
168
169    pub fn root(&self) -> Hash {
170        self.merkle_root
171    }
172
173    fn record_page_fault_metric(&self, range: &Range<u64>) {
174        let chunk_size: u64 = min_chunk_size(&self.compression_info);
175
176        let first_chunk = range.start / chunk_size;
177        // The end of the range may not be chunk aligned if it's the last chunk.
178        let last_chunk = range.end.div_ceil(chunk_size);
179
180        let supplied_count = self.chunks_supplied.test_and_set_range(first_chunk, last_chunk);
181
182        if supplied_count > 0 {
183            self.handle
184                .owner()
185                .blob_resupplied_count()
186                .increment(supplied_count, Ordering::Relaxed);
187        }
188    }
189
190    fn allocate_buffer(&self, size: u64) -> BufferFuture<'_> {
191        self.handle.store().device().allocate_buffer(size as usize)
192    }
193
194    async fn read_blocks(&self, offset: u64, buf: MutableBufferRef<'_>) -> Result<(), Error> {
195        let fs = self.handle.store().filesystem();
196        let guard = fs
197            .lock_manager()
198            .read_lock(lock_keys![LockKey::object_attribute(
199                self.handle.store().store_object_id(),
200                self.handle.object_id(),
201                AttributeId::DATA,
202            )])
203            .await;
204        self.handle.read_unchecked(AttributeId::DATA, offset, buf, &guard).await
205    }
206}
207
208impl Drop for FxBlob {
209    fn drop(&mut self) {
210        let volume = self.handle.owner();
211        volume.cache().remove(self);
212    }
213}
214
215impl OpenedNode<FxBlob> {
216    /// Creates a read-only child VMO for this blob backed by the pager. The blob cannot be purged
217    /// until all child VMOs have been destroyed.
218    ///
219    /// *WARNING*: We need to ensure the open count is non-zero before invoking this function, so
220    /// it is only implemented for [`OpenedNode<FxBlob>`]. This prevents the blob from being purged
221    /// before we get a chance to register it with the pager for [`zx::Signals::VMO_ZERO_CHILDREN`].
222    pub fn create_child_vmo(&self) -> Result<zx::Vmo, Status> {
223        let blob = self.0.as_ref();
224        let child_vmo = blob.vmo.create_child(
225            zx::VmoChildOptions::REFERENCE | zx::VmoChildOptions::NO_WRITE,
226            0,
227            0,
228        )?;
229        if blob.handle.owner().pager().watch_for_zero_children(blob).map_err(map_to_status)? {
230            // Take an open count so that we keep this object alive if it is otherwise closed. This
231            // is only valid since we know the current open count is non-zero, otherwise we might
232            // increment the open count after the blob has been purged.
233            blob.open_count_add_one();
234        }
235        Ok(child_vmo)
236    }
237}
238
239impl FxNode for FxBlob {
240    fn object_id(&self) -> u64 {
241        self.handle.object_id()
242    }
243
244    fn parent(&self) -> Option<Arc<FxDirectory>> {
245        unreachable!(); // Add a parent back-reference if needed.
246    }
247
248    fn set_parent(&self, _parent: Arc<FxDirectory>) {
249        // NOP
250    }
251
252    fn open_count_add_one(&self) {
253        let old = self.open_count.fetch_add(1, Ordering::Relaxed);
254        assert!(old != PURGED && old != PURGED - 1);
255    }
256
257    fn open_count_sub_one(self: Arc<Self>) {
258        let old = self.open_count.fetch_sub(1, Ordering::Relaxed);
259        assert!(old & !PURGED > 0);
260        if old == PURGED + 1 {
261            let store = self.handle.store();
262            store
263                .filesystem()
264                .graveyard()
265                .queue_tombstone_object(store.store_object_id(), self.object_id());
266        }
267    }
268
269    fn object_descriptor(&self) -> ObjectDescriptor {
270        ObjectDescriptor::File
271    }
272
273    fn terminate(&self) {
274        self.pager_packet_receiver_registration.stop_watching_for_zero_children();
275    }
276
277    fn mark_to_be_purged(self: Arc<Self>) {
278        let old = self.open_count.fetch_or(PURGED, Ordering::Relaxed);
279        assert!(old & PURGED == 0);
280        if old == 0 {
281            let store = self.handle.store();
282            store
283                .filesystem()
284                .graveyard()
285                .queue_tombstone_object(store.store_object_id(), self.object_id());
286        }
287    }
288}
289
290impl PagerBacked for FxBlob {
291    fn try_keep_open(self: Arc<Self>) -> Result<OpenedNode<Self>, Arc<Self>> {
292        let mut old = self.open_count.load(Ordering::Relaxed);
293        loop {
294            if old & !PURGED == 0 {
295                return Err(self);
296            }
297            assert!(old & !PURGED < PURGED - 1);
298            match self.open_count.compare_exchange_weak(
299                old,
300                old + 1,
301                Ordering::Relaxed,
302                Ordering::Relaxed,
303            ) {
304                Ok(_) => return Ok(OpenedNode(self)),
305                Err(new_value) => old = new_value,
306            }
307        }
308    }
309
310    fn pager(&self) -> &crate::pager::Pager {
311        self.handle.owner().pager()
312    }
313
314    fn pager_packet_receiver_registration(&self) -> &PagerPacketReceiverRegistration<Self> {
315        &self.pager_packet_receiver_registration
316    }
317
318    fn vmo(&self) -> &zx::Vmo {
319        &self.vmo
320    }
321
322    fn page_in(self: Arc<Self>, range: PageInRange<Self>) {
323        let read_ahead_size = if let Some(compression_info) = &self.compression_info {
324            read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
325        } else {
326            READ_AHEAD_SIZE
327        };
328        // Delegate to the generic page handling code.
329        default_page_in(self, range, read_ahead_size)
330    }
331
332    fn mark_dirty(self: Arc<Self>, _range: MarkDirtyRange<Self>) {
333        unreachable!();
334    }
335
336    fn on_zero_children(self: Arc<Self>) {
337        self.open_count_sub_one();
338    }
339
340    fn byte_size(&self) -> u64 {
341        self.uncompressed_size
342    }
343
344    async fn aligned_read(&self, range: Range<u64>) -> Result<Buffer<'_>, Error> {
345        // The vmo shouldn't have full pages beyond the end of the blob so we shouldn't be getting
346        // page faults for ranges beyond the end of the blob.
347        ensure!(range.start < self.uncompressed_size, FxfsError::InvalidArgs);
348        self.record_page_fault_metric(&range);
349
350        let mut buffer = self.allocate_buffer(range.end - range.start).await;
351        let unaligned_bytes =
352            (std::cmp::min(range.end, self.uncompressed_size) - range.start) as usize;
353        match &self.compression_info {
354            None => self.read_blocks(range.start, buffer.as_mut()).await?,
355            Some(compression_info) => {
356                let compressed_offsets =
357                    match compression_info.compressed_range_for_uncompressed_range(&range)? {
358                        (start, None) => start..self.stored_size,
359                        (start, Some(end)) => start..end.get(),
360                    };
361                let bs = self.handle.block_size();
362                let aligned = round_down(compressed_offsets.start, bs)
363                    ..round_up(compressed_offsets.end, bs).unwrap();
364                let mut compressed_buf = self.allocate_buffer(aligned.end - aligned.start).await;
365
366                let mut decompression_errors = 0;
367                loop {
368                    try_join!(self.read_blocks(aligned.start, compressed_buf.as_mut()), async {
369                        buffer
370                            .allocator()
371                            .buffer_source()
372                            .commit_range(buffer.range())
373                            .map_err(|e| e.into())
374                    })
375                    .with_context(|| {
376                        format!(
377                            "Failed to read compressed range {:?}, len {}",
378                            aligned, self.stored_size
379                        )
380                    })?;
381                    let compressed_buf_range = (compressed_offsets.start - aligned.start) as usize
382                        ..(compressed_offsets.end - aligned.start) as usize;
383
384                    let buf = buffer.as_mut_slice();
385                    let decompression_result = {
386                        fxfs_trace::duration!("blob-decompress", "len" => unaligned_bytes);
387                        compression_info.decompress(
388                            &compressed_buf.as_slice()[compressed_buf_range],
389                            &mut buf[..unaligned_bytes],
390                            range.start,
391                        )
392                    };
393                    match decompression_result {
394                        Ok(()) => break,
395                        Err(error) => {
396                            record_decompression_error_crash_report(
397                                compressed_buf.as_slice(),
398                                &range,
399                                &compressed_offsets,
400                                &self.merkle_root,
401                            )
402                            .await;
403                            decompression_errors += 1;
404                            if decompression_errors == 2 {
405                                bail!(
406                                    anyhow!(FxfsError::IntegrityError)
407                                        .context(format!("Decompression error: {error:?}"))
408                                );
409                            } else {
410                                warn!(error:?; "Decompression error; retrying");
411                            }
412                        }
413                    }
414                } // loop
415                if decompression_errors > 0 {
416                    info!("Read succeeded on second attempt");
417                }
418            }
419        };
420        {
421            // TODO(https://fxbug.dev/42073035): This should be offloaded to the kernel at which
422            // point we can delete this.
423            fxfs_trace::duration!("blob-verify", "len" => unaligned_bytes);
424            self.merkle_verifier
425                .verify(range.start as usize, &buffer.as_slice()[..unaligned_bytes])?;
426        }
427        // Zero the tail.
428        buffer.as_mut_slice()[unaligned_bytes..].fill(0);
429        Ok(buffer)
430    }
431}
432
/// Describes how a chunk-compressed blob is laid out on disk and holds the decompressor used to
/// read it back.
pub struct CompressionInfo {
    // Uncompressed size of every chunk except possibly the last one.
    chunk_size: u64,
    // The chunked compression format stores 0 as the first offset but it's not stored here. Not
    // storing the 0 avoids the allocation for blobs smaller than the chunk size.
    //
    // `small_offsets` holds the leading compressed offsets that fit in a `u32`.
    small_offsets: Box<[u32]>,
    // The remaining compressed offsets, all of which are greater than `u32::MAX`.
    large_offsets: Box<[u64]>,
    // Decompressor for the blob's compression algorithm.
    decompressor: ThreadLocalDecompressor,
}
441
442impl CompressionInfo {
443    pub fn new(
444        chunk_size: u64,
445        offsets: &[u64],
446        compression_algorithm: CompressionAlgorithm,
447    ) -> Result<Self, Error> {
448        let decompressor = compression_algorithm.thread_local_decompressor();
449        if chunk_size == 0 {
450            return Err(FxfsError::IntegrityError.into());
451        } else if offsets.is_empty() || *offsets.first().unwrap() != 0 {
452            // There should always be at least 1 offset and the first offset must always be 0.
453            Err(FxfsError::IntegrityError.into())
454        } else if !offsets.array_windows().all(|[a, b]| a < b) {
455            // The offsets must be in ascending order.
456            Err(FxfsError::IntegrityError.into())
457        } else if offsets.len() == 1 {
458            // Simple case where the blob is smaller than the chunk size so only the 0 offset is
459            // present. The 0 isn't stored so no allocation is necessary.
460            Ok(Self {
461                chunk_size,
462                small_offsets: Box::default(),
463                large_offsets: Box::default(),
464                decompressor,
465            })
466        } else if *offsets.last().unwrap() <= u32::MAX as u64 {
467            // Check the last index first since most compressed blobs are going to be smaller
468            // than 4GiB making all offsets small.
469            Ok(Self {
470                chunk_size,
471                small_offsets: offsets[1..].iter().map(|x| *x as u32).collect(),
472                large_offsets: Box::default(),
473                decompressor,
474            })
475        } else {
476            // The partition point is the index of the first compressed offset that's > u32::MAX.
477            let partition_point = offsets.partition_point(|&x| x <= u32::MAX as u64);
478            Ok(Self {
479                chunk_size,
480                small_offsets: offsets[1..partition_point].iter().map(|x| *x as u32).collect(),
481                large_offsets: offsets[partition_point..].into(),
482                decompressor,
483            })
484        }
485    }
486
487    fn compressed_range_for_uncompressed_range(
488        &self,
489        range: &Range<u64>,
490    ) -> Result<(u64, Option<NonZero<u64>>), Error> {
491        ensure!(range.start.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
492        ensure!(range.start < range.end, FxfsError::Inconsistent);
493
494        let start_chunk_index = (range.start / self.chunk_size) as usize;
495        let start_offset = self
496            .compressed_offset_for_chunk_index(start_chunk_index)
497            .ok_or(FxfsError::OutOfRange)?;
498
499        // The end of the range may not be aligned to the chunk size for the last chunk.
500        let end_chunk_index = range.end.div_ceil(self.chunk_size) as usize;
501        let end_offset = match self.compressed_offset_for_chunk_index(end_chunk_index) {
502            None => None,
503            Some(offset) => {
504                // This isn't the last chunk so the end must be aligned.
505                ensure!(range.end.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
506                // `CompressionInfo::new` validates that all of the offsets are ascending. The end
507                // of the range is greater than the start so this can never be 0.
508                Some(NonZero::new(offset).unwrap())
509            }
510        };
511        Ok((start_offset, end_offset))
512    }
513
514    fn compressed_offset_for_chunk_index(&self, chunk_index: usize) -> Option<u64> {
515        // The "0" compressed offset isn't stored so all of the indices are shifted left by 1.
516        if chunk_index == 0 {
517            Some(0)
518        } else if chunk_index - 1 < self.small_offsets.len() {
519            Some(self.small_offsets[chunk_index - 1] as u64)
520        } else if chunk_index - 1 - self.small_offsets.len() < self.large_offsets.len() {
521            Some(self.large_offsets[chunk_index - 1 - self.small_offsets.len()])
522        } else {
523            None
524        }
525    }
526
527    /// Decompress the bytes of `src` into `dst`.
528    ///   - `src` is allowed to span multiple chunks.
529    ///   - `dst` must have the exact size of the uncompressed bytes.
530    ///   - `dst_start_offset` is the location of the uncompressed bytes within the blob and must be
531    ///     chunk aligned. This is necessary for determining the chunk boundaries in `src`.
532    fn decompress(
533        &self,
534        mut src: &[u8],
535        mut dst: &mut [u8],
536        dst_start_offset: u64,
537    ) -> Result<(), Error> {
538        ensure!(dst_start_offset.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
539
540        let start_chunk_index = (dst_start_offset / self.chunk_size) as usize;
541        let chunk_count = dst.len().div_ceil(self.chunk_size as usize);
542        let mut start_offset = self
543            .compressed_offset_for_chunk_index(start_chunk_index)
544            .ok_or(FxfsError::Inconsistent)?;
545
546        // Decompress each chunk individually.
547        for chunk_index in start_chunk_index..(start_chunk_index + chunk_count) {
548            match self.compressed_offset_for_chunk_index(chunk_index + 1) {
549                Some(end_offset) => {
550                    let (to_decompress, src_remaining) = src
551                        .split_at_checked((end_offset - start_offset) as usize)
552                        .ok_or(FxfsError::Inconsistent)?;
553                    let (to_decompress_into, dst_remaining) = dst
554                        .split_at_mut_checked(self.chunk_size as usize)
555                        .ok_or(FxfsError::Inconsistent)?;
556
557                    let decompressed_bytes = self.decompressor.decompress_into(
558                        to_decompress,
559                        to_decompress_into,
560                        chunk_index,
561                    )?;
562                    ensure!(
563                        decompressed_bytes == to_decompress_into.len(),
564                        FxfsError::Inconsistent
565                    );
566                    src = src_remaining;
567                    dst = dst_remaining;
568                    start_offset = end_offset;
569                }
570                None => {
571                    let decompressed_bytes =
572                        self.decompressor.decompress_into(src, dst, chunk_index)?;
573                    ensure!(decompressed_bytes == dst.len(), FxfsError::Inconsistent);
574                }
575            }
576        }
577
578        Ok(())
579    }
580}
581
582fn set_vmo_name(vmo: &zx::Vmo, merkle_root: &Hash) {
583    let trimmed_merkle = &merkle_root.to_string()[0..BLOB_NAME_HASH_LENGTH];
584    let name = format!("{BLOB_NAME_PREFIX}{trimmed_merkle}");
585    let name = zx::Name::new(&name).unwrap();
586    vmo.set_name(&name).unwrap();
587}
588
589fn min_chunk_size(compression_info: &Option<CompressionInfo>) -> u64 {
590    if let Some(compression_info) = compression_info {
591        read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
592    } else {
593        READ_AHEAD_SIZE
594    }
595}
596
597fn read_ahead_size_for_chunk_size(chunk_size: u64, suggested_read_ahead_size: u64) -> u64 {
598    if chunk_size >= suggested_read_ahead_size {
599        chunk_size
600    } else {
601        round_down(suggested_read_ahead_size, chunk_size)
602    }
603}
604
605async fn record_decompression_error_crash_report(
606    compressed_buf: &[u8],
607    uncompressed_offsets: &Range<u64>,
608    compressed_offsets: &Range<u64>,
609    merkle_root: &Hash,
610) {
611    static DONE_ONCE: AtomicBool = AtomicBool::new(false);
612    if !DONE_ONCE.swap(true, Ordering::Relaxed) {
613        if let Ok(proxy) = connect_to_protocol::<fidl_fuchsia_feedback::CrashReporterMarker>() {
614            let size = compressed_buf.len() as u64;
615            let vmo = zx::Vmo::create(size).unwrap();
616            vmo.write(compressed_buf, 0).unwrap();
617            if let Err(e) = proxy
618                .file_report(CrashReport {
619                    program_name: Some("fxfs".to_string()),
620                    crash_signature: Some("fuchsia-fxfs-decompression_error".to_string()),
621                    is_fatal: Some(false),
622                    annotations: Some(vec![
623                        Annotation {
624                            key: "fxfs.range".to_string(),
625                            value: format!("{:?}", uncompressed_offsets),
626                        },
627                        Annotation {
628                            key: "fxfs.compressed_offsets".to_string(),
629                            value: format!("{:?}", compressed_offsets),
630                        },
631                        Annotation {
632                            key: "fxfs.merkle_root".to_string(),
633                            value: format!("{}", merkle_root),
634                        },
635                    ]),
636                    attachments: Some(vec![Attachment {
637                        key: "fxfs_compressed_data".to_string(),
638                        value: MemBuffer { vmo, size },
639                    }]),
640                    ..Default::default()
641                })
642                .await
643            {
644                error!(e:?; "Failed to file crash report");
645            } else {
646                warn!("Filed crash report for decompression error");
647            }
648        } else {
649            error!("Failed to connect to crash report service");
650        }
651    }
652}
653
654#[cfg(test)]
655mod tests {
656    use super::*;
657    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture};
658    use crate::fuchsia::pager::PageInRange;
659    use crate::fxblob::testing::open_blob_fixture;
660    use assert_matches::assert_matches;
661    use delivery_blob::CompressionMode;
662    use delivery_blob::compression::{ChunkedArchiveOptions, CompressionAlgorithm};
663    use fuchsia_async as fasync;
664    use fuchsia_async::epoch::Epoch;
665    use fxfs_make_blob_image::FxBlobBuilder;
666    use storage_device::DeviceHolder;
667    use storage_device::fake_device::FakeDevice;
668
669    const BLOCK_SIZE: u64 = fuchsia_merkle::BLOCK_SIZE as u64;
670    const CHUNK_SIZE: usize = 32 * 1024;
671
672    #[fasync::run(10, test)]
673    async fn test_empty_blob() {
674        let fixture = new_blob_fixture().await;
675
676        let data = vec![];
677        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
678        assert_eq!(fixture.read_blob(hash).await, data);
679
680        fixture.close().await;
681    }
682
683    #[fasync::run(10, test)]
684    async fn test_large_blob() {
685        let fixture = new_blob_fixture().await;
686
687        let data = vec![3; 3_000_000];
688        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
689
690        assert_eq!(fixture.read_blob(hash).await, data);
691
692        fixture.close().await;
693    }
694
695    #[fasync::run(10, test)]
696    async fn test_large_compressed_blob() {
697        let fixture = new_blob_fixture().await;
698
699        let data = vec![3; 3_000_000];
700        let hash = fixture.write_blob(&data, CompressionMode::Always).await;
701
702        assert_eq!(fixture.read_blob(hash).await, data);
703
704        fixture.close().await;
705    }
706
707    #[fasync::run(10, test)]
708    async fn test_non_page_aligned_blob() {
709        let fixture = new_blob_fixture().await;
710
711        let page_size = zx::system_get_page_size() as usize;
712        let data = vec![0xffu8; page_size - 1];
713        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
714        assert_eq!(fixture.read_blob(hash).await, data);
715
716        {
717            let vmo = fixture.get_blob_vmo(hash).await;
718            let mut buf = vec![0x11u8; page_size];
719            vmo.read(&mut buf[..], 0).expect("vmo read failed");
720            assert_eq!(data, buf[..data.len()]);
721            // Ensure the tail is zeroed
722            assert_eq!(buf[data.len()], 0);
723        }
724
725        fixture.close().await;
726    }
727
728    #[fasync::run(10, test)]
729    async fn test_blob_invalid_contents() {
730        let fixture = new_blob_fixture().await;
731
732        let data = vec![0xffu8; (READ_AHEAD_SIZE + BLOCK_SIZE) as usize];
733        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
734        let name = format!("{}", hash);
735
736        {
737            // Overwrite the second read-ahead window.  The first window should successfully verify.
738            let handle = fixture.get_blob_handle(&name).await;
739            let mut transaction =
740                handle.new_transaction().await.expect("failed to create transaction");
741            let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
742            buf.as_mut_slice().fill(0);
743            handle
744                .txn_write(&mut transaction, READ_AHEAD_SIZE, buf.as_ref())
745                .await
746                .expect("txn_write failed");
747            transaction.commit().await.expect("failed to commit transaction");
748        }
749
750        {
751            let blob_vmo = fixture.get_blob_vmo(hash).await;
752            let mut buf = vec![0; BLOCK_SIZE as usize];
753            assert_matches!(blob_vmo.read(&mut buf[..], 0), Ok(_));
754            assert_matches!(
755                blob_vmo.read(&mut buf[..], READ_AHEAD_SIZE),
756                Err(zx::Status::IO_DATA_INTEGRITY)
757            );
758        }
759
760        fixture.close().await;
761    }
762
763    #[fasync::run(10, test)]
764    async fn test_lz4_blob() {
765        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
766        let blob_data = vec![0xAA; 68 * 1024];
767        let fxblob_builder = FxBlobBuilder::new(device).await.unwrap();
768        let blob = fxblob_builder
769            .generate_blob(blob_data.clone(), Some(CompressionAlgorithm::Lz4))
770            .unwrap();
771        let blob_hash = blob.hash();
772        fxblob_builder.install_blob(&blob).await.unwrap();
773        let device = fxblob_builder.finalize().await.unwrap().0;
774        device.reopen(/*read_only=*/ false);
775        let fixture = open_blob_fixture(device).await;
776
777        assert_eq!(fixture.read_blob(blob_hash).await, blob_data);
778
779        fixture.close().await;
780    }
781
782    #[fasync::run(10, test)]
783    async fn test_blob_vmos_are_immutable() {
784        let fixture = new_blob_fixture().await;
785
786        let data = vec![0xffu8; 500];
787        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
788        let blob_vmo = fixture.get_blob_vmo(hash).await;
789
790        // The VMO shouldn't be resizable.
791        assert_matches!(blob_vmo.set_size(20), Err(_));
792
793        // The VMO shouldn't be writable.
794        assert_matches!(blob_vmo.write(b"overwrite", 0), Err(_));
795
796        // The VMO's content size shouldn't be modifiable.
797        assert_matches!(blob_vmo.set_stream_size(20), Err(_));
798
799        fixture.close().await;
800    }
801
    // Chunk size (32 KiB) handed to `CompressionInfo::new` by the `CompressionInfo` tests below.
    const COMPRESSED_BLOB_CHUNK_SIZE: u64 = 32 * 1024;
    // Largest value representable as a `u32`.  The splitting tests below expect offsets up to and
    // including this value in `small_offsets` and anything greater in `large_offsets`.
    const MAX_SMALL_OFFSET: u64 = u32::MAX as u64;
    // Shorthand for the compression algorithm passed to `CompressionInfo::new` in the tests below.
    const ZSTD: CompressionAlgorithm = CompressionAlgorithm::Zstd;
805
806    #[fuchsia::test]
807    fn test_compression_info_offsets_must_start_with_zero() {
808        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[], ZSTD).is_err());
809        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[1], ZSTD).is_err());
810        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).is_ok());
811    }
812
813    #[fuchsia::test]
814    fn test_compression_info_offsets_must_be_sorted() {
815        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 2], ZSTD).is_ok());
816        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 2, 1], ZSTD).is_err());
817        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 1], ZSTD).is_err());
818    }
819
820    #[fuchsia::test]
821    fn test_compression_info_splitting_offsets() {
822        // Single chunk blob doesn't store any offsets.
823        let compression_info =
824            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).unwrap();
825        assert!(compression_info.small_offsets.is_empty());
826        assert!(compression_info.large_offsets.is_empty());
827
828        // Single small offset.
829        let compression_info =
830            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10], ZSTD).unwrap();
831        assert_eq!(&*compression_info.small_offsets, &[10]);
832        assert!(compression_info.large_offsets.is_empty());
833
834        // Multiple small offsets.
835        let compression_info =
836            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10, 20, 30], ZSTD).unwrap();
837        assert_eq!(&*compression_info.small_offsets, &[10, 20, 30]);
838        assert!(compression_info.large_offsets.is_empty());
839
840        // One less than the largest small offset.
841        let compression_info =
842            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET - 1], ZSTD)
843                .unwrap();
844        assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1]);
845        assert!(compression_info.large_offsets.is_empty());
846
847        // The largest small offset.
848        let compression_info =
849            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET], ZSTD).unwrap();
850        assert_eq!(&*compression_info.small_offsets, &[u32::MAX]);
851        assert!(compression_info.large_offsets.is_empty());
852
853        // The smallest large offset.
854        let compression_info =
855            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 1], ZSTD)
856                .unwrap();
857        assert!(compression_info.small_offsets.is_empty());
858        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
859
860        // Multiple offsets around boundary between small and large offsets.
861        let compression_info = CompressionInfo::new(
862            COMPRESSED_BLOB_CHUNK_SIZE,
863            &[0, MAX_SMALL_OFFSET - 1, MAX_SMALL_OFFSET, MAX_SMALL_OFFSET + 1],
864            ZSTD,
865        )
866        .unwrap();
867        assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1, u32::MAX]);
868        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
869
870        // Single large offset.
871        let compression_info =
872            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 10], ZSTD)
873                .unwrap();
874        assert!(compression_info.small_offsets.is_empty());
875        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 10]);
876
877        // Multiple large offsets.
878        let compression_info = CompressionInfo::new(
879            COMPRESSED_BLOB_CHUNK_SIZE,
880            &[0, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
881            ZSTD,
882        )
883        .unwrap();
884        assert!(compression_info.small_offsets.is_empty());
885        assert_eq!(
886            &*compression_info.large_offsets,
887            &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
888        );
889
890        // Small and large offsets.
891        let compression_info = CompressionInfo::new(
892            COMPRESSED_BLOB_CHUNK_SIZE,
893            &[0, 10, 20, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
894            ZSTD,
895        )
896        .unwrap();
897        assert_eq!(&*compression_info.small_offsets, &[10, 20]);
898        assert_eq!(
899            &*compression_info.large_offsets,
900            &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
901        );
902    }
903
904    #[fuchsia::test]
905    fn test_compression_info_compressed_range_for_uncompressed_range() {
906        fn check_compression_ranges(
907            offsets: &[u64],
908            expected_ranges: &[(u64, Option<u64>)],
909            chunk_size: u64,
910            read_ahead_size: u64,
911        ) {
912            let compression_info = CompressionInfo::new(chunk_size, offsets, ZSTD).unwrap();
913            for (i, range) in expected_ranges.iter().enumerate() {
914                let i = i as u64;
915                let result = compression_info
916                    .compressed_range_for_uncompressed_range(
917                        &(i * read_ahead_size..(i + 1) * read_ahead_size),
918                    )
919                    .unwrap();
920                assert_eq!(result, (range.0, range.1.map(|end| NonZero::new(end).unwrap())));
921            }
922        }
923        check_compression_ranges(
924            &[0, 10, 20, 30],
925            &[(0, Some(10)), (10, Some(20)), (20, Some(30)), (30, None)],
926            COMPRESSED_BLOB_CHUNK_SIZE,
927            COMPRESSED_BLOB_CHUNK_SIZE,
928        );
929        check_compression_ranges(
930            &[0, 10, 20, 30],
931            &[(0, Some(20)), (20, None)],
932            COMPRESSED_BLOB_CHUNK_SIZE,
933            COMPRESSED_BLOB_CHUNK_SIZE * 2,
934        );
935        check_compression_ranges(
936            &[0, 10, 20, 30],
937            &[(0, None)],
938            COMPRESSED_BLOB_CHUNK_SIZE,
939            COMPRESSED_BLOB_CHUNK_SIZE * 4,
940        );
941        check_compression_ranges(
942            &[0, 10, 20, 30, MAX_SMALL_OFFSET + 10],
943            &[(0, Some(MAX_SMALL_OFFSET + 10)), (MAX_SMALL_OFFSET + 10, None)],
944            COMPRESSED_BLOB_CHUNK_SIZE,
945            COMPRESSED_BLOB_CHUNK_SIZE * 4,
946        );
947        check_compression_ranges(
948            &[
949                0,
950                10,
951                20,
952                30,
953                MAX_SMALL_OFFSET + 10,
954                MAX_SMALL_OFFSET + 20,
955                MAX_SMALL_OFFSET + 30,
956                MAX_SMALL_OFFSET + 40,
957                MAX_SMALL_OFFSET + 50,
958            ],
959            &[
960                (0, Some(20)),
961                (20, Some(MAX_SMALL_OFFSET + 10)),
962                (MAX_SMALL_OFFSET + 10, Some(MAX_SMALL_OFFSET + 30)),
963                (MAX_SMALL_OFFSET + 30, Some(MAX_SMALL_OFFSET + 50)),
964            ],
965            COMPRESSED_BLOB_CHUNK_SIZE,
966            COMPRESSED_BLOB_CHUNK_SIZE * 2,
967        );
968    }
969
970    #[fuchsia::test]
971    fn test_compression_info_compressed_range_for_uncompressed_range_errors() {
972        let compression_info = CompressionInfo::new(
973            COMPRESSED_BLOB_CHUNK_SIZE,
974            &[
975                0,
976                10,
977                20,
978                30,
979                MAX_SMALL_OFFSET + 10,
980                MAX_SMALL_OFFSET + 20,
981                MAX_SMALL_OFFSET + 30,
982                MAX_SMALL_OFFSET + 40,
983                MAX_SMALL_OFFSET + 50,
984            ],
985            ZSTD,
986        )
987        .unwrap();
988
989        // The start of reads must be chunk aligned.
990        assert!(
991            compression_info
992                .compressed_range_for_uncompressed_range(&(1..COMPRESSED_BLOB_CHUNK_SIZE),)
993                .is_err()
994        );
995
996        // Reading entirely past the last offset isn't allowed.
997        assert!(
998            compression_info
999                .compressed_range_for_uncompressed_range(
1000                    &(COMPRESSED_BLOB_CHUNK_SIZE * 9..COMPRESSED_BLOB_CHUNK_SIZE * 12),
1001                )
1002                .is_err()
1003        );
1004
1005        // Reading a different amount than the read-ahead size isn't allowed for middle offsets.
1006        assert!(
1007            compression_info
1008                .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE + 1),)
1009                .is_err()
1010        );
1011        assert!(
1012            compression_info
1013                .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE - 1),)
1014                .is_err()
1015        );
1016        assert!(
1017            compression_info
1018                .compressed_range_for_uncompressed_range(
1019                    &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 + 1),
1020                )
1021                .is_err()
1022        );
1023        assert!(
1024            compression_info
1025                .compressed_range_for_uncompressed_range(
1026                    &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 - 1),
1027                )
1028                .is_err()
1029        );
1030
1031        // Reading less than the read-ahead size for the last offset is allowed.
1032        assert!(
1033            compression_info
1034                .compressed_range_for_uncompressed_range(
1035                    &(COMPRESSED_BLOB_CHUNK_SIZE * 8..COMPRESSED_BLOB_CHUNK_SIZE * 8 + 4096),
1036                )
1037                .is_ok()
1038        );
1039    }
1040
1041    #[fuchsia::test]
1042    fn test_read_ahead_size_for_chunk_size() {
1043        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 32 * 1024), 32 * 1024);
1044        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 32 * 1024), 48 * 1024);
1045        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 32 * 1024), 64 * 1024);
1046
1047        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 64 * 1024), 64 * 1024);
1048        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 64 * 1024), 48 * 1024);
1049        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 64 * 1024), 64 * 1024);
1050        assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 64 * 1024), 96 * 1024);
1051
1052        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 128 * 1024), 128 * 1024);
1053        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 128 * 1024), 96 * 1024);
1054        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 128 * 1024), 128 * 1024);
1055        assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 128 * 1024), 96 * 1024);
1056    }
1057
1058    fn build_compression_info(size: usize) -> (CompressionInfo, Vec<u8>, Vec<u8>) {
1059        let options =
1060            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
1061        let mut compressor = options.compressor();
1062        let mut uncompressed_data = Vec::with_capacity(size);
1063        {
1064            let mut run_length = 1;
1065            let mut run_value: u8 = 0;
1066            while uncompressed_data.len() < size {
1067                uncompressed_data
1068                    .resize(std::cmp::min(uncompressed_data.len() + run_length, size), run_value);
1069                run_length = (run_length + 1) % 19 + 1;
1070                run_value = (run_value + 1) % 17;
1071            }
1072        }
1073        let mut compressed_offsets = vec![0];
1074        let mut compressed_data = vec![];
1075        for chunk in uncompressed_data.chunks(CHUNK_SIZE) {
1076            let mut compressed_chunk = compressor.compress(chunk, 0).unwrap();
1077            compressed_data.append(&mut compressed_chunk);
1078            compressed_offsets.push(compressed_data.len() as u64);
1079        }
1080        compressed_offsets.pop();
1081        (
1082            CompressionInfo::new(CHUNK_SIZE as u64, &compressed_offsets, CompressionAlgorithm::Lz4)
1083                .unwrap(),
1084            compressed_data,
1085            uncompressed_data,
1086        )
1087    }
1088
1089    #[fuchsia::test]
1090    fn test_compression_info_decompress_single_chunk() {
1091        let (compression_info, compressed_data, uncompressed_data) =
1092            build_compression_info(CHUNK_SIZE);
1093        let mut decompressed_data = vec![0u8; CHUNK_SIZE + 1];
1094
1095        compression_info
1096            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE], 0)
1097            .expect("failed to decompress");
1098        assert_eq!(uncompressed_data, decompressed_data[0..CHUNK_SIZE]);
1099
1100        // Too small of destination buffer.
1101        compression_info
1102            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1103            .expect_err("decompression should fail");
1104
1105        // Too large of destination buffer.
1106        compression_info
1107            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1108            .expect_err("decompression should fail");
1109    }
1110
1111    #[fuchsia::test]
1112    fn test_compression_info_decompress_multiple_chunks() {
1113        fn slice_for_chunks<'a>(
1114            compressed_data: &'a [u8],
1115            compression_info: &CompressionInfo,
1116            chunks: Range<u64>,
1117        ) -> &'a [u8] {
1118            let (start, end) = compression_info
1119                .compressed_range_for_uncompressed_range(
1120                    &(chunks.start * CHUNK_SIZE as u64..chunks.end * CHUNK_SIZE as u64),
1121                )
1122                .unwrap();
1123            let end = end.map_or(compressed_data.len() as u64, NonZero::<u64>::get);
1124            &compressed_data[start as usize..end as usize]
1125        }
1126
1127        const BLOB_SIZE: usize = CHUNK_SIZE * 4 + 4096;
1128        let (compression_info, compressed_data, uncompressed_data) =
1129            build_compression_info(BLOB_SIZE);
1130        let mut decompressed_data = vec![0u8; BLOB_SIZE];
1131
1132        // Decompress the entire blob.
1133        compression_info
1134            .decompress(&compressed_data, &mut decompressed_data, 0)
1135            .expect("failed to decompress");
1136        assert_eq!(uncompressed_data, decompressed_data);
1137
1138        // Decompress just the whole chunks.
1139        compression_info
1140            .decompress(
1141                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1142                &mut decompressed_data[0..CHUNK_SIZE * 4],
1143                0,
1144            )
1145            .expect("failed to decompress");
1146        assert_eq!(&uncompressed_data[0..CHUNK_SIZE], &decompressed_data[0..CHUNK_SIZE]);
1147
1148        // Too small of destination buffer for whole chunks.
1149        compression_info
1150            .decompress(
1151                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1152                &mut decompressed_data[0..CHUNK_SIZE * 4 - 1],
1153                0,
1154            )
1155            .expect_err("decompression should fail");
1156
1157        // Too large of destination buffer for whole chunks.
1158        compression_info
1159            .decompress(
1160                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1161                &mut decompressed_data[0..CHUNK_SIZE * 4 + 1],
1162                0,
1163            )
1164            .expect_err("decompression should fail");
1165
1166        // Decompress just the tail.
1167        let partial_chunk = slice_for_chunks(&compressed_data, &compression_info, 4..5);
1168        compression_info
1169            .decompress(partial_chunk, &mut decompressed_data[0..4096], CHUNK_SIZE as u64 * 4)
1170            .expect("failed to decompress");
1171        assert_eq!(&uncompressed_data[CHUNK_SIZE * 4..], &decompressed_data[0..4096]);
1172
1173        // Too small of destination buffer for the tail.
1174        compression_info
1175            .decompress(partial_chunk, &mut decompressed_data[0..4095], CHUNK_SIZE as u64 * 4)
1176            .expect_err("decompression should fail");
1177
1178        // Too large of destination buffer for the tail.
1179        compression_info
1180            .decompress(partial_chunk, &mut decompressed_data[0..4097], CHUNK_SIZE as u64 * 4)
1181            .expect_err("decompression should fail");
1182    }
1183
1184    #[fasync::run(10, test)]
1185    async fn test_refault_metric() {
1186        let fixture = new_blob_fixture().await;
1187        {
1188            let volume = fixture.volume().volume().clone();
1189            const FILE_SIZE: u64 = READ_AHEAD_SIZE * 4 - 4096;
1190            let data = vec![0xffu8; FILE_SIZE as usize];
1191            let hash = fixture.write_blob(&data, CompressionMode::Never).await;
1192
1193            let blob = fixture.get_opened_blob(hash).await.unwrap();
1194            assert_eq!(blob.chunks_supplied.len(), 4);
1195            // Nothing has been read yet.
1196            assert_eq!(&blob.chunks_supplied.get(), &[false, false, false, false]);
1197
1198            blob.vmo.read_to_vec::<u8>(4096, 4096).unwrap();
1199
1200            assert_eq!(&blob.chunks_supplied.get(), &[true, false, false, false]);
1201
1202            blob.vmo.read_to_vec::<u8>(READ_AHEAD_SIZE * 2 + 4096, READ_AHEAD_SIZE).unwrap();
1203            assert_eq!(&blob.chunks_supplied.get(), &[true, false, true, true]);
1204
1205            // We have loaded pages, but only once each.
1206            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 0);
1207
1208            // Re-read some pages.
1209
1210            // We can't evict pages from the VMO to get the kernel to resupply them but we can call
1211            // page_in directly and wait for the counters to change.
1212            blob.clone().page_in(PageInRange::new(
1213                FILE_SIZE - READ_AHEAD_SIZE..FILE_SIZE,
1214                blob.dup(),
1215                Epoch::global().guard(),
1216            ));
1217            Epoch::global().barrier().await;
1218
1219            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 2);
1220        }
1221
1222        fixture.close().await;
1223    }
1224}