Skip to main content

fxfs_platform_testing/fuchsia/fxblob/
blob.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This module contains the [`FxBlob`] node type used to represent an immutable blob persisted to
6//! disk which can be read back.
7
8use crate::constants::*;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::errors::map_to_status;
11use crate::fuchsia::node::{FxNode, OpenedNode};
12use crate::fuchsia::pager::{
13    MarkDirtyRange, PageInRange, PagerBacked, PagerPacketReceiverRegistration, default_page_in,
14};
15use crate::fuchsia::volume::{FxVolume, READ_AHEAD_SIZE};
16use crate::fxblob::atomic_vec::AtomicBitVec;
17use anyhow::{Context, Error, anyhow, bail, ensure};
18use delivery_blob::compression::{CompressionAlgorithm, ThreadLocalDecompressor};
19use fidl_fuchsia_feedback::{Annotation, Attachment, CrashReport};
20use fidl_fuchsia_mem::Buffer as MemBuffer;
21use fuchsia_async::epoch::Epoch;
22use fuchsia_component_client::connect_to_protocol;
23use fuchsia_merkle::{Hash, MerkleVerifier, ReadSizedMerkleVerifier};
24use futures::try_join;
25use fxfs::blob_metadata::{BlobFormat, BlobMetadata};
26use fxfs::errors::FxfsError;
27use fxfs::lock_keys;
28use fxfs::log::*;
29use fxfs::object_handle::{ObjectHandle, ReadObjectHandle};
30use fxfs::object_store::transaction::LockKey;
31use fxfs::object_store::{
32    DEFAULT_DATA_ATTRIBUTE_ID, DataObjectHandle, ObjectDescriptor, StoreObjectHandle,
33};
34use fxfs::round::{round_down, round_up};
35use fxfs_macros::ToWeakNode;
36use std::num::NonZero;
37use std::ops::Range;
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
40use storage_device::buffer::{Buffer, BufferFuture, MutableBufferRef};
41use zx::Status;
42
// The top bit of the open count doubles as a "purged" flag.  When it is set, the file has been
// deleted and it will be tombstoned when the count drops to zero.  Once the count has dropped to
// zero, the blob cannot be opened again (assertions will fire).
const PURGED: usize = 1 << (usize::BITS - 1);
47
/// Represents an immutable blob stored on Fxfs with an associated merkle tree.
#[derive(ToWeakNode)]
pub struct FxBlob {
    /// Handle to the object in the store that holds the blob's data as stored on disk.
    handle: StoreObjectHandle<FxVolume>,
    /// Pager-backed VMO, created with a size of `uncompressed_size`, through which the blob's
    /// contents are read.
    vmo: zx::Vmo,
    /// Number of outstanding opens.  The top bit (`PURGED`) marks the blob as deleted.
    open_count: AtomicUsize,
    /// Merkle root identifying the blob; also used to name the VMO.
    merkle_root: Hash,
    /// Verifies the data supplied by `aligned_read` before it is handed to the pager.
    merkle_verifier: ReadSizedMerkleVerifier,
    /// Chunk layout and decompressor; `None` when the blob is stored uncompressed.
    compression_info: Option<CompressionInfo>,
    uncompressed_size: u64, // always set.
    /// Size of the blob's data as stored (equal to `uncompressed_size` for uncompressed blobs).
    stored_size: u64,
    /// Pager registration for `vmo`.  Shared (via `Arc`) with the replacement blob created by
    /// `overwrite_me`.
    pager_packet_receiver_registration: Arc<PagerPacketReceiverRegistration<Self>>,
    /// One bit per `min_chunk_size`-sized chunk of the blob, updated on every page-in and used to
    /// feed the volume's "blob resupplied" counter.
    chunks_supplied: AtomicBitVec,
}
62
63// Fuchsia can have many open blobs at once. The size of FxBlob is important.
64static_assertions::const_assert!(size_of::<FxBlob>() <= 192);
65
66impl FxBlob {
67    pub async fn new(
68        handle: StoreObjectHandle<FxVolume>,
69        merkle_root: Hash,
70    ) -> Result<Arc<Self>, Error> {
71        let stored_size = handle
72            .store()
73            .get_attribute_size(handle.object_id(), DEFAULT_DATA_ATTRIBUTE_ID)
74            .await?;
75        let metadata = BlobMetadata::read_from(&handle).await?;
76        let (uncompressed_size, compression_info) = match &metadata.format {
77            BlobFormat::Uncompressed => (stored_size, None),
78            BlobFormat::ChunkedZstd { uncompressed_size, chunk_size, compressed_offsets } => (
79                *uncompressed_size,
80                Some(CompressionInfo::new(
81                    *chunk_size,
82                    compressed_offsets,
83                    CompressionAlgorithm::Zstd,
84                )?),
85            ),
86            BlobFormat::ChunkedLz4 { uncompressed_size, chunk_size, compressed_offsets } => (
87                *uncompressed_size,
88                Some(CompressionInfo::new(
89                    *chunk_size,
90                    compressed_offsets,
91                    CompressionAlgorithm::Lz4,
92                )?),
93            ),
94        };
95        let merkle_verifier = metadata.into_merkle_verifier(merkle_root)?;
96
97        let min_chunk_size = min_chunk_size(&compression_info);
98        let merkle_verifier =
99            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)?;
100        let chunks_supplied = AtomicBitVec::new(uncompressed_size.div_ceil(min_chunk_size));
101
102        Ok(Arc::new_cyclic(|weak| {
103            let (vmo, pager_packet_receiver_registration) = handle
104                .owner()
105                .pager()
106                .create_vmo(weak.clone(), uncompressed_size, zx::VmoOptions::empty())
107                .unwrap();
108            set_vmo_name(&vmo, &merkle_root);
109            Self {
110                handle,
111                vmo,
112                open_count: AtomicUsize::new(0),
113                merkle_root,
114                merkle_verifier,
115                compression_info,
116                uncompressed_size,
117                stored_size,
118                pager_packet_receiver_registration: Arc::new(pager_packet_receiver_registration),
119                chunks_supplied,
120            }
121        }))
122    }
123
    /// Creates a replacement for this blob that is backed by `handle` and returns the new blob.
    ///
    /// The new blob shares this blob's VMO (via a duplicated handle) and pager registration, and
    /// keeps its merkle root and uncompressed size.  The stored size, merkle verifier, compression
    /// info and "chunks supplied" bitmap are rebuilt from the arguments.  This blob is removed
    /// from the volume's node cache and the pager's receiver is redirected to the new blob.
    ///
    /// # Panics
    ///
    /// Panics if the merkle verifier rejects the chunk size or if the VMO handle cannot be
    /// duplicated.
    pub fn overwrite_me(
        self: &Arc<Self>,
        handle: DataObjectHandle<FxVolume>,
        merkle_verifier: MerkleVerifier,
        compression_info: Option<CompressionInfo>,
    ) -> Arc<Self> {
        let min_chunk_size = min_chunk_size(&compression_info);
        let merkle_verifier =
            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)
                .expect("The chunk size should have been validated by the delivery blob parser");
        // The chunk size may have changed between the old blob and the new blob. Preserving the
        // chunks supplied bits isn't important.
        let chunks_supplied = AtomicBitVec::new(self.uncompressed_size.div_ceil(min_chunk_size));
        let vmo = self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap();
        let stored_size = handle.get_size();

        // The new blob starts with an open count of zero; any count owed to it is transferred
        // below while the receiver lock is held.
        let new_blob = Arc::new(Self {
            handle: handle.into_store_object_handle(),
            vmo,
            open_count: AtomicUsize::new(0),
            merkle_root: self.merkle_root,
            merkle_verifier,
            compression_info,
            uncompressed_size: self.uncompressed_size,
            stored_size,
            pager_packet_receiver_registration: self.pager_packet_receiver_registration.clone(),
            chunks_supplied,
        });

        // We have tests that rely on the cache being purged and there are races where the
        // `FxBlob::drop` isn't called early enough, which can make the test flaky.
        self.handle.owner().cache().remove(self.as_ref());

        // The lock must be held until the open count is incremented to prevent concurrent
        // handling of zero children signals.
        let receiver_lock =
            self.pager_packet_receiver_registration.receiver().set_receiver(&new_blob);
        if receiver_lock.is_strong() {
            // If a strong reference was moved between the blobs, then the open count moves with
            // it.  It is only important that the increment happens under the lock, as the new
            // blob may handle the next zero children signal.  No new requests can now go to the
            // old blob, but to safely ensure that all existing requests finish, the open count
            // decrement on the old blob is deferred.
            new_blob.open_count_add_one();
            let old_blob = self.clone();
            Epoch::global().defer(move || old_blob.open_count_sub_one());
        }
        new_blob
    }
173
    /// Returns the merkle root of this blob.
    pub fn root(&self) -> Hash {
        self.merkle_root
    }
177
178    fn record_page_fault_metric(&self, range: &Range<u64>) {
179        let chunk_size: u64 = min_chunk_size(&self.compression_info);
180
181        let first_chunk = range.start / chunk_size;
182        // The end of the range may not be chunk aligned if it's the last chunk.
183        let last_chunk = range.end.div_ceil(chunk_size);
184
185        let supplied_count = self.chunks_supplied.test_and_set_range(first_chunk, last_chunk);
186
187        if supplied_count > 0 {
188            self.handle
189                .owner()
190                .blob_resupplied_count()
191                .increment(supplied_count, Ordering::Relaxed);
192        }
193    }
194
195    fn allocate_buffer(&self, size: u64) -> BufferFuture<'_> {
196        self.handle.store().device().allocate_buffer(size as usize)
197    }
198
199    async fn read_blocks(&self, offset: u64, buf: MutableBufferRef<'_>) -> Result<(), Error> {
200        let fs = self.handle.store().filesystem();
201        let guard = fs
202            .lock_manager()
203            .read_lock(lock_keys![LockKey::object_attribute(
204                self.handle.store().store_object_id(),
205                self.handle.object_id(),
206                DEFAULT_DATA_ATTRIBUTE_ID,
207            )])
208            .await;
209        self.handle.read_unchecked(DEFAULT_DATA_ATTRIBUTE_ID, offset, buf, &guard).await
210    }
211}
212
213impl Drop for FxBlob {
214    fn drop(&mut self) {
215        let volume = self.handle.owner();
216        volume.cache().remove(self);
217    }
218}
219
220impl OpenedNode<FxBlob> {
221    /// Creates a read-only child VMO for this blob backed by the pager. The blob cannot be purged
222    /// until all child VMOs have been destroyed.
223    ///
224    /// *WARNING*: We need to ensure the open count is non-zero before invoking this function, so
225    /// it is only implemented for [`OpenedNode<FxBlob>`]. This prevents the blob from being purged
226    /// before we get a chance to register it with the pager for [`zx::Signals::VMO_ZERO_CHILDREN`].
227    pub fn create_child_vmo(&self) -> Result<zx::Vmo, Status> {
228        let blob = self.0.as_ref();
229        let child_vmo = blob.vmo.create_child(
230            zx::VmoChildOptions::REFERENCE | zx::VmoChildOptions::NO_WRITE,
231            0,
232            0,
233        )?;
234        if blob.handle.owner().pager().watch_for_zero_children(blob).map_err(map_to_status)? {
235            // Take an open count so that we keep this object alive if it is otherwise closed. This
236            // is only valid since we know the current open count is non-zero, otherwise we might
237            // increment the open count after the blob has been purged.
238            blob.open_count_add_one();
239        }
240        Ok(child_vmo)
241    }
242}
243
244impl FxNode for FxBlob {
245    fn object_id(&self) -> u64 {
246        self.handle.object_id()
247    }
248
249    fn parent(&self) -> Option<Arc<FxDirectory>> {
250        unreachable!(); // Add a parent back-reference if needed.
251    }
252
253    fn set_parent(&self, _parent: Arc<FxDirectory>) {
254        // NOP
255    }
256
257    fn open_count_add_one(&self) {
258        let old = self.open_count.fetch_add(1, Ordering::Relaxed);
259        assert!(old != PURGED && old != PURGED - 1);
260    }
261
262    fn open_count_sub_one(self: Arc<Self>) {
263        let old = self.open_count.fetch_sub(1, Ordering::Relaxed);
264        assert!(old & !PURGED > 0);
265        if old == PURGED + 1 {
266            let store = self.handle.store();
267            store
268                .filesystem()
269                .graveyard()
270                .queue_tombstone_object(store.store_object_id(), self.object_id());
271        }
272    }
273
274    fn object_descriptor(&self) -> ObjectDescriptor {
275        ObjectDescriptor::File
276    }
277
278    fn terminate(&self) {
279        self.pager_packet_receiver_registration.stop_watching_for_zero_children();
280    }
281
282    fn mark_to_be_purged(self: Arc<Self>) {
283        let old = self.open_count.fetch_or(PURGED, Ordering::Relaxed);
284        assert!(old & PURGED == 0);
285        if old == 0 {
286            let store = self.handle.store();
287            store
288                .filesystem()
289                .graveyard()
290                .queue_tombstone_object(store.store_object_id(), self.object_id());
291        }
292    }
293}
294
295impl PagerBacked for FxBlob {
296    fn pager(&self) -> &crate::pager::Pager {
297        self.handle.owner().pager()
298    }
299
300    fn pager_packet_receiver_registration(&self) -> &PagerPacketReceiverRegistration<Self> {
301        &self.pager_packet_receiver_registration
302    }
303
304    fn vmo(&self) -> &zx::Vmo {
305        &self.vmo
306    }
307
308    fn page_in(self: Arc<Self>, range: PageInRange<Self>) {
309        let read_ahead_size = if let Some(compression_info) = &self.compression_info {
310            read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
311        } else {
312            READ_AHEAD_SIZE
313        };
314        // Delegate to the generic page handling code.
315        default_page_in(self, range, read_ahead_size)
316    }
317
318    fn mark_dirty(self: Arc<Self>, _range: MarkDirtyRange<Self>) {
319        unreachable!();
320    }
321
322    fn on_zero_children(self: Arc<Self>) {
323        self.open_count_sub_one();
324    }
325
326    fn byte_size(&self) -> u64 {
327        self.uncompressed_size
328    }
329
330    async fn aligned_read(&self, range: Range<u64>) -> Result<Buffer<'_>, Error> {
331        // The vmo shouldn't have full pages beyond the end of the blob so we shouldn't be getting
332        // page faults for ranges beyond the end of the blob.
333        ensure!(range.start < self.uncompressed_size, FxfsError::InvalidArgs);
334        self.record_page_fault_metric(&range);
335
336        let mut buffer = self.allocate_buffer(range.end - range.start).await;
337        let unaligned_bytes =
338            (std::cmp::min(range.end, self.uncompressed_size) - range.start) as usize;
339        match &self.compression_info {
340            None => self.read_blocks(range.start, buffer.as_mut()).await?,
341            Some(compression_info) => {
342                let compressed_offsets =
343                    match compression_info.compressed_range_for_uncompressed_range(&range)? {
344                        (start, None) => start..self.stored_size,
345                        (start, Some(end)) => start..end.get(),
346                    };
347                let bs = self.handle.block_size();
348                let aligned = round_down(compressed_offsets.start, bs)
349                    ..round_up(compressed_offsets.end, bs).unwrap();
350                let mut compressed_buf = self.allocate_buffer(aligned.end - aligned.start).await;
351
352                let mut decompression_errors = 0;
353                loop {
354                    try_join!(self.read_blocks(aligned.start, compressed_buf.as_mut()), async {
355                        buffer
356                            .allocator()
357                            .buffer_source()
358                            .commit_range(buffer.range())
359                            .map_err(|e| e.into())
360                    })
361                    .with_context(|| {
362                        format!(
363                            "Failed to read compressed range {:?}, len {}",
364                            aligned, self.stored_size
365                        )
366                    })?;
367                    let compressed_buf_range = (compressed_offsets.start - aligned.start) as usize
368                        ..(compressed_offsets.end - aligned.start) as usize;
369
370                    let buf = buffer.as_mut_slice();
371                    let decompression_result = {
372                        fxfs_trace::duration!("blob-decompress", "len" => unaligned_bytes);
373                        compression_info.decompress(
374                            &compressed_buf.as_slice()[compressed_buf_range],
375                            &mut buf[..unaligned_bytes],
376                            range.start,
377                        )
378                    };
379                    match decompression_result {
380                        Ok(()) => break,
381                        Err(error) => {
382                            record_decompression_error_crash_report(
383                                compressed_buf.as_slice(),
384                                &range,
385                                &compressed_offsets,
386                                &self.merkle_root,
387                            )
388                            .await;
389                            decompression_errors += 1;
390                            if decompression_errors == 2 {
391                                bail!(
392                                    anyhow!(FxfsError::IntegrityError)
393                                        .context(format!("Decompression error: {error:?}"))
394                                );
395                            } else {
396                                warn!(error:?; "Decompression error; retrying");
397                            }
398                        }
399                    }
400                } // loop
401                if decompression_errors > 0 {
402                    info!("Read succeeded on second attempt");
403                }
404            }
405        };
406        {
407            // TODO(https://fxbug.dev/42073035): This should be offloaded to the kernel at which
408            // point we can delete this.
409            fxfs_trace::duration!("blob-verify", "len" => unaligned_bytes);
410            self.merkle_verifier
411                .verify(range.start as usize, &buffer.as_slice()[..unaligned_bytes])?;
412        }
413        // Zero the tail.
414        buffer.as_mut_slice()[unaligned_bytes..].fill(0);
415        Ok(buffer)
416    }
417}
418
/// Chunk layout and decompressor for a blob stored in a chunked compression format.
pub struct CompressionInfo {
    /// Size of each uncompressed chunk in bytes; validated to be non-zero by
    /// `CompressionInfo::new`.
    chunk_size: u64,
    // The chunked compression format stores 0 as the first offset but it's not stored here. Not
    // storing the 0 avoids the allocation for blobs smaller than the chunk size.
    /// Compressed chunk offsets that fit in a `u32`, excluding the leading 0.
    small_offsets: Box<[u32]>,
    /// The remaining compressed chunk offsets (those greater than `u32::MAX`), which logically
    /// follow `small_offsets`.
    large_offsets: Box<[u64]>,
    /// Decompressor for the algorithm the blob was compressed with.
    decompressor: ThreadLocalDecompressor,
}
427
428impl CompressionInfo {
429    pub fn new(
430        chunk_size: u64,
431        offsets: &[u64],
432        compression_algorithm: CompressionAlgorithm,
433    ) -> Result<Self, Error> {
434        let decompressor = compression_algorithm.thread_local_decompressor();
435        if chunk_size == 0 {
436            return Err(FxfsError::IntegrityError.into());
437        } else if offsets.is_empty() || *offsets.first().unwrap() != 0 {
438            // There should always be at least 1 offset and the first offset must always be 0.
439            Err(FxfsError::IntegrityError.into())
440        } else if !offsets.array_windows().all(|[a, b]| a < b) {
441            // The offsets must be in ascending order.
442            Err(FxfsError::IntegrityError.into())
443        } else if offsets.len() == 1 {
444            // Simple case where the blob is smaller than the chunk size so only the 0 offset is
445            // present. The 0 isn't stored so no allocation is necessary.
446            Ok(Self {
447                chunk_size,
448                small_offsets: Box::default(),
449                large_offsets: Box::default(),
450                decompressor,
451            })
452        } else if *offsets.last().unwrap() <= u32::MAX as u64 {
453            // Check the last index first since most compressed blobs are going to be smaller
454            // than 4GiB making all offsets small.
455            Ok(Self {
456                chunk_size,
457                small_offsets: offsets[1..].iter().map(|x| *x as u32).collect(),
458                large_offsets: Box::default(),
459                decompressor,
460            })
461        } else {
462            // The partition point is the index of the first compressed offset that's > u32::MAX.
463            let partition_point = offsets.partition_point(|&x| x <= u32::MAX as u64);
464            Ok(Self {
465                chunk_size,
466                small_offsets: offsets[1..partition_point].iter().map(|x| *x as u32).collect(),
467                large_offsets: offsets[partition_point..].into(),
468                decompressor,
469            })
470        }
471    }
472
473    fn compressed_range_for_uncompressed_range(
474        &self,
475        range: &Range<u64>,
476    ) -> Result<(u64, Option<NonZero<u64>>), Error> {
477        ensure!(range.start.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
478        ensure!(range.start < range.end, FxfsError::Inconsistent);
479
480        let start_chunk_index = (range.start / self.chunk_size) as usize;
481        let start_offset = self
482            .compressed_offset_for_chunk_index(start_chunk_index)
483            .ok_or(FxfsError::OutOfRange)?;
484
485        // The end of the range may not be aligned to the chunk size for the last chunk.
486        let end_chunk_index = range.end.div_ceil(self.chunk_size) as usize;
487        let end_offset = match self.compressed_offset_for_chunk_index(end_chunk_index) {
488            None => None,
489            Some(offset) => {
490                // This isn't the last chunk so the end must be aligned.
491                ensure!(range.end.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
492                // `CompressionInfo::new` validates that all of the offsets are ascending. The end
493                // of the range is greater than the start so this can never be 0.
494                Some(NonZero::new(offset).unwrap())
495            }
496        };
497        Ok((start_offset, end_offset))
498    }
499
500    fn compressed_offset_for_chunk_index(&self, chunk_index: usize) -> Option<u64> {
501        // The "0" compressed offset isn't stored so all of the indices are shifted left by 1.
502        if chunk_index == 0 {
503            Some(0)
504        } else if chunk_index - 1 < self.small_offsets.len() {
505            Some(self.small_offsets[chunk_index - 1] as u64)
506        } else if chunk_index - 1 - self.small_offsets.len() < self.large_offsets.len() {
507            Some(self.large_offsets[chunk_index - 1 - self.small_offsets.len()])
508        } else {
509            None
510        }
511    }
512
513    /// Decompress the bytes of `src` into `dst`.
514    ///   - `src` is allowed to span multiple chunks.
515    ///   - `dst` must have the exact size of the uncompressed bytes.
516    ///   - `dst_start_offset` is the location of the uncompressed bytes within the blob and must be
517    ///     chunk aligned. This is necessary for determining the chunk boundaries in `src`.
518    fn decompress(
519        &self,
520        mut src: &[u8],
521        mut dst: &mut [u8],
522        dst_start_offset: u64,
523    ) -> Result<(), Error> {
524        ensure!(dst_start_offset.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
525
526        let start_chunk_index = (dst_start_offset / self.chunk_size) as usize;
527        let chunk_count = dst.len().div_ceil(self.chunk_size as usize);
528        let mut start_offset = self
529            .compressed_offset_for_chunk_index(start_chunk_index)
530            .ok_or(FxfsError::Inconsistent)?;
531
532        // Decompress each chunk individually.
533        for chunk_index in start_chunk_index..(start_chunk_index + chunk_count) {
534            match self.compressed_offset_for_chunk_index(chunk_index + 1) {
535                Some(end_offset) => {
536                    let (to_decompress, src_remaining) = src
537                        .split_at_checked((end_offset - start_offset) as usize)
538                        .ok_or(FxfsError::Inconsistent)?;
539                    let (to_decompress_into, dst_remaining) = dst
540                        .split_at_mut_checked(self.chunk_size as usize)
541                        .ok_or(FxfsError::Inconsistent)?;
542
543                    let decompressed_bytes = self.decompressor.decompress_into(
544                        to_decompress,
545                        to_decompress_into,
546                        chunk_index,
547                    )?;
548                    ensure!(
549                        decompressed_bytes == to_decompress_into.len(),
550                        FxfsError::Inconsistent
551                    );
552                    src = src_remaining;
553                    dst = dst_remaining;
554                    start_offset = end_offset;
555                }
556                None => {
557                    let decompressed_bytes =
558                        self.decompressor.decompress_into(src, dst, chunk_index)?;
559                    ensure!(decompressed_bytes == dst.len(), FxfsError::Inconsistent);
560                }
561            }
562        }
563
564        Ok(())
565    }
566}
567
568fn set_vmo_name(vmo: &zx::Vmo, merkle_root: &Hash) {
569    let trimmed_merkle = &merkle_root.to_string()[0..BLOB_NAME_HASH_LENGTH];
570    let name = format!("{BLOB_NAME_PREFIX}{trimmed_merkle}");
571    let name = zx::Name::new(&name).unwrap();
572    vmo.set_name(&name).unwrap();
573}
574
575fn min_chunk_size(compression_info: &Option<CompressionInfo>) -> u64 {
576    if let Some(compression_info) = compression_info {
577        read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
578    } else {
579        READ_AHEAD_SIZE
580    }
581}
582
583fn read_ahead_size_for_chunk_size(chunk_size: u64, suggested_read_ahead_size: u64) -> u64 {
584    if chunk_size >= suggested_read_ahead_size {
585        chunk_size
586    } else {
587        round_down(suggested_read_ahead_size, chunk_size)
588    }
589}
590
591async fn record_decompression_error_crash_report(
592    compressed_buf: &[u8],
593    uncompressed_offsets: &Range<u64>,
594    compressed_offsets: &Range<u64>,
595    merkle_root: &Hash,
596) {
597    static DONE_ONCE: AtomicBool = AtomicBool::new(false);
598    if !DONE_ONCE.swap(true, Ordering::Relaxed) {
599        if let Ok(proxy) = connect_to_protocol::<fidl_fuchsia_feedback::CrashReporterMarker>() {
600            let size = compressed_buf.len() as u64;
601            let vmo = zx::Vmo::create(size).unwrap();
602            vmo.write(compressed_buf, 0).unwrap();
603            if let Err(e) = proxy
604                .file_report(CrashReport {
605                    program_name: Some("fxfs".to_string()),
606                    crash_signature: Some("fuchsia-fxfs-decompression_error".to_string()),
607                    is_fatal: Some(false),
608                    annotations: Some(vec![
609                        Annotation {
610                            key: "fxfs.range".to_string(),
611                            value: format!("{:?}", uncompressed_offsets),
612                        },
613                        Annotation {
614                            key: "fxfs.compressed_offsets".to_string(),
615                            value: format!("{:?}", compressed_offsets),
616                        },
617                        Annotation {
618                            key: "fxfs.merkle_root".to_string(),
619                            value: format!("{}", merkle_root),
620                        },
621                    ]),
622                    attachments: Some(vec![Attachment {
623                        key: "fxfs_compressed_data".to_string(),
624                        value: MemBuffer { vmo, size },
625                    }]),
626                    ..Default::default()
627                })
628                .await
629            {
630                error!(e:?; "Failed to file crash report");
631            } else {
632                warn!("Filed crash report for decompression error");
633            }
634        } else {
635            error!("Failed to connect to crash report service");
636        }
637    }
638}
639
640#[cfg(test)]
641mod tests {
642    use super::*;
643    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture};
644    use crate::fuchsia::pager::PageInRange;
645    use crate::fxblob::testing::open_blob_fixture;
646    use assert_matches::assert_matches;
647    use delivery_blob::CompressionMode;
648    use delivery_blob::compression::{ChunkedArchiveOptions, CompressionAlgorithm};
649    use fuchsia_async as fasync;
650    use fuchsia_async::epoch::Epoch;
651    use fxfs_make_blob_image::FxBlobBuilder;
652    use storage_device::DeviceHolder;
653    use storage_device::fake_device::FakeDevice;
654
655    const BLOCK_SIZE: u64 = fuchsia_merkle::BLOCK_SIZE as u64;
656    const CHUNK_SIZE: usize = 32 * 1024;
657
658    #[fasync::run(10, test)]
659    async fn test_empty_blob() {
660        let fixture = new_blob_fixture().await;
661
662        let data = vec![];
663        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
664        assert_eq!(fixture.read_blob(hash).await, data);
665
666        fixture.close().await;
667    }
668
669    #[fasync::run(10, test)]
670    async fn test_large_blob() {
671        let fixture = new_blob_fixture().await;
672
673        let data = vec![3; 3_000_000];
674        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
675
676        assert_eq!(fixture.read_blob(hash).await, data);
677
678        fixture.close().await;
679    }
680
681    #[fasync::run(10, test)]
682    async fn test_large_compressed_blob() {
683        let fixture = new_blob_fixture().await;
684
685        let data = vec![3; 3_000_000];
686        let hash = fixture.write_blob(&data, CompressionMode::Always).await;
687
688        assert_eq!(fixture.read_blob(hash).await, data);
689
690        fixture.close().await;
691    }
692
693    #[fasync::run(10, test)]
694    async fn test_non_page_aligned_blob() {
695        let fixture = new_blob_fixture().await;
696
697        let page_size = zx::system_get_page_size() as usize;
698        let data = vec![0xffu8; page_size - 1];
699        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
700        assert_eq!(fixture.read_blob(hash).await, data);
701
702        {
703            let vmo = fixture.get_blob_vmo(hash).await;
704            let mut buf = vec![0x11u8; page_size];
705            vmo.read(&mut buf[..], 0).expect("vmo read failed");
706            assert_eq!(data, buf[..data.len()]);
707            // Ensure the tail is zeroed
708            assert_eq!(buf[data.len()], 0);
709        }
710
711        fixture.close().await;
712    }
713
714    #[fasync::run(10, test)]
715    async fn test_blob_invalid_contents() {
716        let fixture = new_blob_fixture().await;
717
718        let data = vec![0xffu8; (READ_AHEAD_SIZE + BLOCK_SIZE) as usize];
719        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
720        let name = format!("{}", hash);
721
722        {
723            // Overwrite the second read-ahead window.  The first window should successfully verify.
724            let handle = fixture.get_blob_handle(&name).await;
725            let mut transaction =
726                handle.new_transaction().await.expect("failed to create transaction");
727            let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
728            buf.as_mut_slice().fill(0);
729            handle
730                .txn_write(&mut transaction, READ_AHEAD_SIZE, buf.as_ref())
731                .await
732                .expect("txn_write failed");
733            transaction.commit().await.expect("failed to commit transaction");
734        }
735
736        {
737            let blob_vmo = fixture.get_blob_vmo(hash).await;
738            let mut buf = vec![0; BLOCK_SIZE as usize];
739            assert_matches!(blob_vmo.read(&mut buf[..], 0), Ok(_));
740            assert_matches!(
741                blob_vmo.read(&mut buf[..], READ_AHEAD_SIZE),
742                Err(zx::Status::IO_DATA_INTEGRITY)
743            );
744        }
745
746        fixture.close().await;
747    }
748
749    #[fasync::run(10, test)]
750    async fn test_lz4_blob() {
751        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
752        let blob_data = vec![0xAA; 68 * 1024];
753        let fxblob_builder = FxBlobBuilder::new(device).await.unwrap();
754        let blob = fxblob_builder
755            .generate_blob(blob_data.clone(), Some(CompressionAlgorithm::Lz4))
756            .unwrap();
757        let blob_hash = blob.hash();
758        fxblob_builder.install_blob(&blob).await.unwrap();
759        let device = fxblob_builder.finalize().await.unwrap().0;
760        device.reopen(/*read_only=*/ false);
761        let fixture = open_blob_fixture(device).await;
762
763        assert_eq!(fixture.read_blob(blob_hash).await, blob_data);
764
765        fixture.close().await;
766    }
767
768    #[fasync::run(10, test)]
769    async fn test_blob_vmos_are_immutable() {
770        let fixture = new_blob_fixture().await;
771
772        let data = vec![0xffu8; 500];
773        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
774        let blob_vmo = fixture.get_blob_vmo(hash).await;
775
776        // The VMO shouldn't be resizable.
777        assert_matches!(blob_vmo.set_size(20), Err(_));
778
779        // The VMO shouldn't be writable.
780        assert_matches!(blob_vmo.write(b"overwrite", 0), Err(_));
781
782        // The VMO's content size shouldn't be modifiable.
783        assert_matches!(blob_vmo.set_stream_size(20), Err(_));
784
785        fixture.close().await;
786    }
787
    // Chunk size (32 KiB) that the `CompressionInfo` tests pass to `CompressionInfo::new`.
    const COMPRESSED_BLOB_CHUNK_SIZE: u64 = 32 * 1024;
    // `u32::MAX` widened to `u64`.  The offset-splitting test expects offsets up to and including
    // this value to land in `small_offsets` and anything larger to land in `large_offsets`.
    const MAX_SMALL_OFFSET: u64 = u32::MAX as u64;
    // Shorthand for the Zstd algorithm used when constructing `CompressionInfo` in these tests.
    const ZSTD: CompressionAlgorithm = CompressionAlgorithm::Zstd;
791
792    #[fuchsia::test]
793    fn test_compression_info_offsets_must_start_with_zero() {
794        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[], ZSTD).is_err());
795        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[1], ZSTD).is_err());
796        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).is_ok());
797    }
798
799    #[fuchsia::test]
800    fn test_compression_info_offsets_must_be_sorted() {
801        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 2], ZSTD).is_ok());
802        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 2, 1], ZSTD).is_err());
803        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 1], ZSTD).is_err());
804    }
805
806    #[fuchsia::test]
807    fn test_compression_info_splitting_offsets() {
808        // Single chunk blob doesn't store any offsets.
809        let compression_info =
810            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).unwrap();
811        assert!(compression_info.small_offsets.is_empty());
812        assert!(compression_info.large_offsets.is_empty());
813
814        // Single small offset.
815        let compression_info =
816            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10], ZSTD).unwrap();
817        assert_eq!(&*compression_info.small_offsets, &[10]);
818        assert!(compression_info.large_offsets.is_empty());
819
820        // Multiple small offsets.
821        let compression_info =
822            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10, 20, 30], ZSTD).unwrap();
823        assert_eq!(&*compression_info.small_offsets, &[10, 20, 30]);
824        assert!(compression_info.large_offsets.is_empty());
825
826        // One less than the largest small offset.
827        let compression_info =
828            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET - 1], ZSTD)
829                .unwrap();
830        assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1]);
831        assert!(compression_info.large_offsets.is_empty());
832
833        // The largest small offset.
834        let compression_info =
835            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET], ZSTD).unwrap();
836        assert_eq!(&*compression_info.small_offsets, &[u32::MAX]);
837        assert!(compression_info.large_offsets.is_empty());
838
839        // The smallest large offset.
840        let compression_info =
841            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 1], ZSTD)
842                .unwrap();
843        assert!(compression_info.small_offsets.is_empty());
844        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
845
846        // Multiple offsets around boundary between small and large offsets.
847        let compression_info = CompressionInfo::new(
848            COMPRESSED_BLOB_CHUNK_SIZE,
849            &[0, MAX_SMALL_OFFSET - 1, MAX_SMALL_OFFSET, MAX_SMALL_OFFSET + 1],
850            ZSTD,
851        )
852        .unwrap();
853        assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1, u32::MAX]);
854        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
855
856        // Single large offset.
857        let compression_info =
858            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 10], ZSTD)
859                .unwrap();
860        assert!(compression_info.small_offsets.is_empty());
861        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 10]);
862
863        // Multiple large offsets.
864        let compression_info = CompressionInfo::new(
865            COMPRESSED_BLOB_CHUNK_SIZE,
866            &[0, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
867            ZSTD,
868        )
869        .unwrap();
870        assert!(compression_info.small_offsets.is_empty());
871        assert_eq!(
872            &*compression_info.large_offsets,
873            &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
874        );
875
876        // Small and large offsets.
877        let compression_info = CompressionInfo::new(
878            COMPRESSED_BLOB_CHUNK_SIZE,
879            &[0, 10, 20, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
880            ZSTD,
881        )
882        .unwrap();
883        assert_eq!(&*compression_info.small_offsets, &[10, 20]);
884        assert_eq!(
885            &*compression_info.large_offsets,
886            &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
887        );
888    }
889
890    #[fuchsia::test]
891    fn test_compression_info_compressed_range_for_uncompressed_range() {
892        fn check_compression_ranges(
893            offsets: &[u64],
894            expected_ranges: &[(u64, Option<u64>)],
895            chunk_size: u64,
896            read_ahead_size: u64,
897        ) {
898            let compression_info = CompressionInfo::new(chunk_size, offsets, ZSTD).unwrap();
899            for (i, range) in expected_ranges.iter().enumerate() {
900                let i = i as u64;
901                let result = compression_info
902                    .compressed_range_for_uncompressed_range(
903                        &(i * read_ahead_size..(i + 1) * read_ahead_size),
904                    )
905                    .unwrap();
906                assert_eq!(result, (range.0, range.1.map(|end| NonZero::new(end).unwrap())));
907            }
908        }
909        check_compression_ranges(
910            &[0, 10, 20, 30],
911            &[(0, Some(10)), (10, Some(20)), (20, Some(30)), (30, None)],
912            COMPRESSED_BLOB_CHUNK_SIZE,
913            COMPRESSED_BLOB_CHUNK_SIZE,
914        );
915        check_compression_ranges(
916            &[0, 10, 20, 30],
917            &[(0, Some(20)), (20, None)],
918            COMPRESSED_BLOB_CHUNK_SIZE,
919            COMPRESSED_BLOB_CHUNK_SIZE * 2,
920        );
921        check_compression_ranges(
922            &[0, 10, 20, 30],
923            &[(0, None)],
924            COMPRESSED_BLOB_CHUNK_SIZE,
925            COMPRESSED_BLOB_CHUNK_SIZE * 4,
926        );
927        check_compression_ranges(
928            &[0, 10, 20, 30, MAX_SMALL_OFFSET + 10],
929            &[(0, Some(MAX_SMALL_OFFSET + 10)), (MAX_SMALL_OFFSET + 10, None)],
930            COMPRESSED_BLOB_CHUNK_SIZE,
931            COMPRESSED_BLOB_CHUNK_SIZE * 4,
932        );
933        check_compression_ranges(
934            &[
935                0,
936                10,
937                20,
938                30,
939                MAX_SMALL_OFFSET + 10,
940                MAX_SMALL_OFFSET + 20,
941                MAX_SMALL_OFFSET + 30,
942                MAX_SMALL_OFFSET + 40,
943                MAX_SMALL_OFFSET + 50,
944            ],
945            &[
946                (0, Some(20)),
947                (20, Some(MAX_SMALL_OFFSET + 10)),
948                (MAX_SMALL_OFFSET + 10, Some(MAX_SMALL_OFFSET + 30)),
949                (MAX_SMALL_OFFSET + 30, Some(MAX_SMALL_OFFSET + 50)),
950            ],
951            COMPRESSED_BLOB_CHUNK_SIZE,
952            COMPRESSED_BLOB_CHUNK_SIZE * 2,
953        );
954    }
955
956    #[fuchsia::test]
957    fn test_compression_info_compressed_range_for_uncompressed_range_errors() {
958        let compression_info = CompressionInfo::new(
959            COMPRESSED_BLOB_CHUNK_SIZE,
960            &[
961                0,
962                10,
963                20,
964                30,
965                MAX_SMALL_OFFSET + 10,
966                MAX_SMALL_OFFSET + 20,
967                MAX_SMALL_OFFSET + 30,
968                MAX_SMALL_OFFSET + 40,
969                MAX_SMALL_OFFSET + 50,
970            ],
971            ZSTD,
972        )
973        .unwrap();
974
975        // The start of reads must be chunk aligned.
976        assert!(
977            compression_info
978                .compressed_range_for_uncompressed_range(&(1..COMPRESSED_BLOB_CHUNK_SIZE),)
979                .is_err()
980        );
981
982        // Reading entirely past the last offset isn't allowed.
983        assert!(
984            compression_info
985                .compressed_range_for_uncompressed_range(
986                    &(COMPRESSED_BLOB_CHUNK_SIZE * 9..COMPRESSED_BLOB_CHUNK_SIZE * 12),
987                )
988                .is_err()
989        );
990
991        // Reading a different amount than the read-ahead size isn't allowed for middle offsets.
992        assert!(
993            compression_info
994                .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE + 1),)
995                .is_err()
996        );
997        assert!(
998            compression_info
999                .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE - 1),)
1000                .is_err()
1001        );
1002        assert!(
1003            compression_info
1004                .compressed_range_for_uncompressed_range(
1005                    &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 + 1),
1006                )
1007                .is_err()
1008        );
1009        assert!(
1010            compression_info
1011                .compressed_range_for_uncompressed_range(
1012                    &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 - 1),
1013                )
1014                .is_err()
1015        );
1016
1017        // Reading less than the read-ahead size for the last offset is allowed.
1018        assert!(
1019            compression_info
1020                .compressed_range_for_uncompressed_range(
1021                    &(COMPRESSED_BLOB_CHUNK_SIZE * 8..COMPRESSED_BLOB_CHUNK_SIZE * 8 + 4096),
1022                )
1023                .is_ok()
1024        );
1025    }
1026
1027    #[fuchsia::test]
1028    fn test_read_ahead_size_for_chunk_size() {
1029        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 32 * 1024), 32 * 1024);
1030        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 32 * 1024), 48 * 1024);
1031        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 32 * 1024), 64 * 1024);
1032
1033        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 64 * 1024), 64 * 1024);
1034        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 64 * 1024), 48 * 1024);
1035        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 64 * 1024), 64 * 1024);
1036        assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 64 * 1024), 96 * 1024);
1037
1038        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 128 * 1024), 128 * 1024);
1039        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 128 * 1024), 96 * 1024);
1040        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 128 * 1024), 128 * 1024);
1041        assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 128 * 1024), 96 * 1024);
1042    }
1043
1044    fn build_compression_info(size: usize) -> (CompressionInfo, Vec<u8>, Vec<u8>) {
1045        let options =
1046            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
1047        let mut compressor = options.compressor();
1048        let mut uncompressed_data = Vec::with_capacity(size);
1049        {
1050            let mut run_length = 1;
1051            let mut run_value: u8 = 0;
1052            while uncompressed_data.len() < size {
1053                uncompressed_data
1054                    .resize(std::cmp::min(uncompressed_data.len() + run_length, size), run_value);
1055                run_length = (run_length + 1) % 19 + 1;
1056                run_value = (run_value + 1) % 17;
1057            }
1058        }
1059        let mut compressed_offsets = vec![0];
1060        let mut compressed_data = vec![];
1061        for chunk in uncompressed_data.chunks(CHUNK_SIZE) {
1062            let mut compressed_chunk = compressor.compress(chunk, 0).unwrap();
1063            compressed_data.append(&mut compressed_chunk);
1064            compressed_offsets.push(compressed_data.len() as u64);
1065        }
1066        compressed_offsets.pop();
1067        (
1068            CompressionInfo::new(CHUNK_SIZE as u64, &compressed_offsets, CompressionAlgorithm::Lz4)
1069                .unwrap(),
1070            compressed_data,
1071            uncompressed_data,
1072        )
1073    }
1074
1075    #[fuchsia::test]
1076    fn test_compression_info_decompress_single_chunk() {
1077        let (compression_info, compressed_data, uncompressed_data) =
1078            build_compression_info(CHUNK_SIZE);
1079        let mut decompressed_data = vec![0u8; CHUNK_SIZE + 1];
1080
1081        compression_info
1082            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE], 0)
1083            .expect("failed to decompress");
1084        assert_eq!(uncompressed_data, decompressed_data[0..CHUNK_SIZE]);
1085
1086        // Too small of destination buffer.
1087        compression_info
1088            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1089            .expect_err("decompression should fail");
1090
1091        // Too large of destination buffer.
1092        compression_info
1093            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1094            .expect_err("decompression should fail");
1095    }
1096
1097    #[fuchsia::test]
1098    fn test_compression_info_decompress_multiple_chunks() {
1099        fn slice_for_chunks<'a>(
1100            compressed_data: &'a [u8],
1101            compression_info: &CompressionInfo,
1102            chunks: Range<u64>,
1103        ) -> &'a [u8] {
1104            let (start, end) = compression_info
1105                .compressed_range_for_uncompressed_range(
1106                    &(chunks.start * CHUNK_SIZE as u64..chunks.end * CHUNK_SIZE as u64),
1107                )
1108                .unwrap();
1109            let end = end.map_or(compressed_data.len() as u64, NonZero::<u64>::get);
1110            &compressed_data[start as usize..end as usize]
1111        }
1112
1113        const BLOB_SIZE: usize = CHUNK_SIZE * 4 + 4096;
1114        let (compression_info, compressed_data, uncompressed_data) =
1115            build_compression_info(BLOB_SIZE);
1116        let mut decompressed_data = vec![0u8; BLOB_SIZE];
1117
1118        // Decompress the entire blob.
1119        compression_info
1120            .decompress(&compressed_data, &mut decompressed_data, 0)
1121            .expect("failed to decompress");
1122        assert_eq!(uncompressed_data, decompressed_data);
1123
1124        // Decompress just the whole chunks.
1125        compression_info
1126            .decompress(
1127                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1128                &mut decompressed_data[0..CHUNK_SIZE * 4],
1129                0,
1130            )
1131            .expect("failed to decompress");
1132        assert_eq!(&uncompressed_data[0..CHUNK_SIZE], &decompressed_data[0..CHUNK_SIZE]);
1133
1134        // Too small of destination buffer for whole chunks.
1135        compression_info
1136            .decompress(
1137                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1138                &mut decompressed_data[0..CHUNK_SIZE * 4 - 1],
1139                0,
1140            )
1141            .expect_err("decompression should fail");
1142
1143        // Too large of destination buffer for whole chunks.
1144        compression_info
1145            .decompress(
1146                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1147                &mut decompressed_data[0..CHUNK_SIZE * 4 + 1],
1148                0,
1149            )
1150            .expect_err("decompression should fail");
1151
1152        // Decompress just the tail.
1153        let partial_chunk = slice_for_chunks(&compressed_data, &compression_info, 4..5);
1154        compression_info
1155            .decompress(partial_chunk, &mut decompressed_data[0..4096], CHUNK_SIZE as u64 * 4)
1156            .expect("failed to decompress");
1157        assert_eq!(&uncompressed_data[CHUNK_SIZE * 4..], &decompressed_data[0..4096]);
1158
1159        // Too small of destination buffer for the tail.
1160        compression_info
1161            .decompress(partial_chunk, &mut decompressed_data[0..4095], CHUNK_SIZE as u64 * 4)
1162            .expect_err("decompression should fail");
1163
1164        // Too large of destination buffer for the tail.
1165        compression_info
1166            .decompress(partial_chunk, &mut decompressed_data[0..4097], CHUNK_SIZE as u64 * 4)
1167            .expect_err("decompression should fail");
1168    }
1169
1170    #[fasync::run(10, test)]
1171    async fn test_refault_metric() {
1172        let fixture = new_blob_fixture().await;
1173        {
1174            let volume = fixture.volume().volume().clone();
1175            const FILE_SIZE: u64 = READ_AHEAD_SIZE * 4 - 4096;
1176            let data = vec![0xffu8; FILE_SIZE as usize];
1177            let hash = fixture.write_blob(&data, CompressionMode::Never).await;
1178
1179            let blob = fixture.get_blob(hash).await.unwrap();
1180            assert_eq!(blob.chunks_supplied.len(), 4);
1181            // Nothing has been read yet.
1182            assert_eq!(&blob.chunks_supplied.get(), &[false, false, false, false]);
1183
1184            blob.vmo.read_to_vec::<u8>(4096, 4096).unwrap();
1185
1186            assert_eq!(&blob.chunks_supplied.get(), &[true, false, false, false]);
1187
1188            blob.vmo.read_to_vec::<u8>(READ_AHEAD_SIZE * 2 + 4096, READ_AHEAD_SIZE).unwrap();
1189            assert_eq!(&blob.chunks_supplied.get(), &[true, false, true, true]);
1190
1191            // We have loaded pages, but only once each.
1192            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 0);
1193
1194            // Re-read some pages.
1195
1196            // We can't evict pages from the VMO to get the kernel to resupply them but we can call
1197            // page_in directly and wait for the counters to change.
1198            blob.clone().page_in(PageInRange::new(
1199                FILE_SIZE - READ_AHEAD_SIZE..FILE_SIZE,
1200                blob.clone(),
1201                Epoch::global().guard(),
1202            ));
1203            Epoch::global().barrier().await;
1204
1205            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 2);
1206        }
1207
1208        fixture.close().await;
1209    }
1210}