// fxfs_platform/fuchsia/fxblob/blob.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This module contains the [`FxBlob`] node type used to represent an immutable blob persisted to
6//! disk which can be read back.
7
8use crate::constants::*;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::errors::map_to_status;
11use crate::fuchsia::node::{FxNode, OpenedNode};
12use crate::fuchsia::pager::{
13    MarkDirtyRange, PageInRange, PagerBacked, PagerPacketReceiverRegistration, default_page_in,
14};
15use crate::fuchsia::volume::{BASE_READ_AHEAD_SIZE, FxVolume};
16use crate::fxblob::atomic_vec::AtomicBitVec;
17use anyhow::{Context, Error, anyhow, bail, ensure};
18use delivery_blob::compression::{CompressionAlgorithm, ThreadLocalDecompressor};
19use fidl_fuchsia_feedback::{Annotation, Attachment, CrashReport};
20use fidl_fuchsia_mem::Buffer;
21use fuchsia_async::epoch::Epoch;
22use fuchsia_component_client::connect_to_protocol;
23use fuchsia_merkle::{Hash, MerkleVerifier, ReadSizedMerkleVerifier};
24use futures::try_join;
25use fxfs::errors::FxfsError;
26use fxfs::log::*;
27use fxfs::object_handle::{ObjectHandle, ReadObjectHandle};
28use fxfs::object_store::{DataObjectHandle, ObjectDescriptor};
29use fxfs::round::{round_down, round_up};
30use fxfs_macros::ToWeakNode;
31use std::num::NonZero;
32use std::ops::Range;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
35use storage_device::buffer;
36use zx::{HandleBased, Status};
37
/// Size of a merkle tree block, re-exported as a `u64` for convenience.
pub const BLOCK_SIZE: u64 = fuchsia_merkle::BLOCK_SIZE as u64;

// When the top bit of the open count is set, it means the file has been deleted and when the count
// drops to zero, it will be tombstoned.  Once it has dropped to zero, it cannot be opened again
// (assertions will fire).
const PURGED: usize = 1 << (usize::BITS - 1);
44
/// Represents an immutable blob stored on Fxfs with an associated merkle tree.
#[derive(ToWeakNode)]
pub struct FxBlob {
    // Handle to the on-disk (possibly compressed) data object in the object store.
    handle: DataObjectHandle<FxVolume>,
    // Pager-backed VMO holding the uncompressed contents; pages are supplied on demand.
    vmo: zx::Vmo,
    // Open count; the top bit (`PURGED`) marks the blob as deleted.
    open_count: AtomicUsize,
    merkle_root: Hash,
    merkle_verifier: ReadSizedMerkleVerifier,
    // `None` for uncompressed blobs.
    compression_info: Option<CompressionInfo>,
    uncompressed_size: u64, // always set.
    pager_packet_receiver_registration: Arc<PagerPacketReceiverRegistration<Self>>,
    // One bit per chunk, set when the chunk is supplied to the pager; a chunk seen twice was
    // evicted and re-faulted, which feeds the "resupplied" metric.
    chunks_supplied: AtomicBitVec,
}
58
impl FxBlob {
    /// Creates a new pager-backed blob wrapping `handle`.
    ///
    /// `uncompressed_size` is the logical size of the blob (and of the pager VMO), regardless of
    /// whether the on-disk data is compressed. Fails if the merkle verifier cannot be constructed
    /// for the minimum chunk size.
    pub fn new(
        handle: DataObjectHandle<FxVolume>,
        merkle_root: Hash,
        merkle_verifier: MerkleVerifier,
        compression_info: Option<CompressionInfo>,
        uncompressed_size: u64,
    ) -> Result<Arc<Self>, Error> {
        // Granularity at which reads are verified and at which `chunks_supplied` tracks chunks.
        let min_chunk_size = min_chunk_size(&compression_info);
        let merkle_verifier =
            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)?;
        let chunks_supplied = AtomicBitVec::new(uncompressed_size.div_ceil(min_chunk_size));

        // `new_cyclic` lets the pager hold a weak reference back to the blob being constructed.
        Ok(Arc::new_cyclic(|weak| {
            let (vmo, pager_packet_receiver_registration) = handle
                .owner()
                .pager()
                .create_vmo(weak.clone(), uncompressed_size, zx::VmoOptions::empty())
                .unwrap();
            set_vmo_name(&vmo, &merkle_root);
            Self {
                handle,
                vmo,
                open_count: AtomicUsize::new(0),
                merkle_root,
                merkle_verifier,
                compression_info,
                uncompressed_size,
                pager_packet_receiver_registration: Arc::new(pager_packet_receiver_registration),
                chunks_supplied,
            }
        }))
    }

    /// Replaces this blob with a new `FxBlob` backed by `handle`, sharing the existing VMO and
    /// pager registration. Returns the new blob.
    pub fn overwrite_me(
        self: &Arc<Self>,
        handle: DataObjectHandle<FxVolume>,
        merkle_verifier: MerkleVerifier,
        compression_info: Option<CompressionInfo>,
    ) -> Arc<Self> {
        let min_chunk_size = min_chunk_size(&compression_info);
        let merkle_verifier =
            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)
                .expect("The chunk size should have been validated by the delivery blob parser");
        // The chunk size may have changed between the old blob and the new blob. Preserving the
        // chunks supplied bits isn't important.
        let chunks_supplied = AtomicBitVec::new(self.uncompressed_size.div_ceil(min_chunk_size));
        // The new blob shares the old blob's VMO.
        let vmo = self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap();

        let new_blob = Arc::new(Self {
            handle,
            vmo,
            open_count: AtomicUsize::new(0),
            merkle_root: self.merkle_root,
            merkle_verifier,
            compression_info,
            uncompressed_size: self.uncompressed_size,
            pager_packet_receiver_registration: self.pager_packet_receiver_registration.clone(),
            chunks_supplied,
        });

        // We have tests that rely on the cache being purged and there are races where the
        // `FxBlob::drop` isn't called early enough, which can make the test flaky.
        self.handle.owner().cache().remove(self.as_ref());

        // Lock must be held until the open counts is incremented to prevent concurrent handling of
        // zero children signals.
        let receiver_lock =
            self.pager_packet_receiver_registration.receiver().set_receiver(&new_blob);
        if receiver_lock.is_strong() {
            // If there was a strong moved between them, then the counts exchange as well. It is
            // only important that the increment happen under the lock as it may handle the next
            // zero children signal, no new requests can now go to the old blob, but to safely
            // ensure that all existing requests finish, we will defer the open count decrement.
            new_blob.open_count_add_one();
            let old_blob = self.clone();
            Epoch::global().defer(move || old_blob.open_count_sub_one());
        }
        new_blob
    }

    /// Returns the blob's merkle root hash.
    pub fn root(&self) -> Hash {
        self.merkle_root
    }

    /// Records, for metrics, which chunks covered by `range` were already supplied to the pager;
    /// any such chunk was evicted and re-faulted and is counted as a resupply.
    fn record_page_fault_metric(&self, range: &Range<u64>) {
        let chunk_size: u64 = min_chunk_size(&self.compression_info);

        let first_chunk = range.start / chunk_size;
        // The end of the range may not be chunk aligned if it's the last chunk.
        let last_chunk = range.end.div_ceil(chunk_size);

        // Sets the bits for [first_chunk, last_chunk) and, from its use here, reports how many
        // were already set — TODO(review): confirm against `AtomicBitVec::test_and_set_range`.
        let supplied_count = self.chunks_supplied.test_and_set_range(first_chunk, last_chunk);

        if supplied_count > 0 {
            self.handle
                .owner()
                .blob_resupplied_count()
                .increment(supplied_count, Ordering::Relaxed);
        }
    }
}
162
163impl Drop for FxBlob {
164    fn drop(&mut self) {
165        let volume = self.handle.owner();
166        volume.cache().remove(self);
167    }
168}
169
170impl OpenedNode<FxBlob> {
171    /// Creates a read-only child VMO for this blob backed by the pager. The blob cannot be purged
172    /// until all child VMOs have been destroyed.
173    ///
174    /// *WARNING*: We need to ensure the open count is non-zero before invoking this function, so
175    /// it is only implemented for [`OpenedNode<FxBlob>`]. This prevents the blob from being purged
176    /// before we get a chance to register it with the pager for [`zx::Signals::VMO_ZERO_CHILDREN`].
177    pub fn create_child_vmo(&self) -> Result<zx::Vmo, Status> {
178        let blob = self.0.as_ref();
179        let child_vmo = blob.vmo.create_child(
180            zx::VmoChildOptions::REFERENCE | zx::VmoChildOptions::NO_WRITE,
181            0,
182            0,
183        )?;
184        if blob.handle.owner().pager().watch_for_zero_children(blob).map_err(map_to_status)? {
185            // Take an open count so that we keep this object alive if it is otherwise closed. This
186            // is only valid since we know the current open count is non-zero, otherwise we might
187            // increment the open count after the blob has been purged.
188            blob.open_count_add_one();
189        }
190        Ok(child_vmo)
191    }
192}
193
194impl FxNode for FxBlob {
195    fn object_id(&self) -> u64 {
196        self.handle.object_id()
197    }
198
199    fn parent(&self) -> Option<Arc<FxDirectory>> {
200        unreachable!(); // Add a parent back-reference if needed.
201    }
202
203    fn set_parent(&self, _parent: Arc<FxDirectory>) {
204        // NOP
205    }
206
207    fn open_count_add_one(&self) {
208        let old = self.open_count.fetch_add(1, Ordering::Relaxed);
209        assert!(old != PURGED && old != PURGED - 1);
210    }
211
212    fn open_count_sub_one(self: Arc<Self>) {
213        let old = self.open_count.fetch_sub(1, Ordering::Relaxed);
214        assert!(old & !PURGED > 0);
215        if old == PURGED + 1 {
216            let store = self.handle.store();
217            store
218                .filesystem()
219                .graveyard()
220                .queue_tombstone_object(store.store_object_id(), self.object_id());
221        }
222    }
223
224    fn object_descriptor(&self) -> ObjectDescriptor {
225        ObjectDescriptor::File
226    }
227
228    fn terminate(&self) {
229        self.pager_packet_receiver_registration.stop_watching_for_zero_children();
230    }
231
232    fn mark_to_be_purged(&self) {
233        let old = self.open_count.fetch_or(PURGED, Ordering::Relaxed);
234        assert!(old & PURGED == 0);
235        if old == 0 {
236            let store = self.handle.store();
237            store
238                .filesystem()
239                .graveyard()
240                .queue_tombstone_object(store.store_object_id(), self.object_id());
241        }
242    }
243}
244
impl PagerBacked for FxBlob {
    /// Returns the pager servicing this blob's VMO.
    fn pager(&self) -> &crate::pager::Pager {
        self.handle.owner().pager()
    }

    fn pager_packet_receiver_registration(&self) -> &PagerPacketReceiverRegistration<Self> {
        &self.pager_packet_receiver_registration
    }

    fn vmo(&self) -> &zx::Vmo {
        &self.vmo
    }

    /// Handles a page fault by reading ahead in units compatible with the compression chunk size.
    fn page_in(self: Arc<Self>, range: PageInRange<Self>) {
        let read_ahead_size = self.handle.owner().read_ahead_size();
        // For compressed blobs, adjust the read-ahead so it lines up with chunk boundaries.
        let read_ahead_size = if let Some(compression_info) = &self.compression_info {
            read_ahead_size_for_chunk_size(compression_info.chunk_size, read_ahead_size)
        } else {
            read_ahead_size
        };
        // Delegate to the generic page handling code.
        default_page_in(self, range, read_ahead_size)
    }

    /// Blobs are immutable, so the pager should never report dirty pages for them.
    fn mark_dirty(self: Arc<Self>, _range: MarkDirtyRange<Self>) {
        unreachable!();
    }

    /// Drops the open count taken in `create_child_vmo` once all child VMOs are gone.
    fn on_zero_children(self: Arc<Self>) {
        self.open_count_sub_one();
    }

    fn byte_size(&self) -> u64 {
        self.uncompressed_size
    }

    /// Reads (decompressing if necessary) the aligned byte `range` from disk, verifies it against
    /// the merkle tree, and returns a buffer whose tail beyond the blob's size is zeroed.
    async fn aligned_read(&self, range: Range<u64>) -> Result<buffer::Buffer<'_>, Error> {
        self.record_page_fault_metric(&range);

        let mut buffer = self.handle.allocate_buffer((range.end - range.start) as usize).await;
        let read = match &self.compression_info {
            // Uncompressed: read directly into the output buffer.
            None => self.handle.read(range.start, buffer.as_mut()).await?,
            Some(compression_info) => {
                // Map the uncompressed range to the compressed byte range on disk; `None` for the
                // end means "through the end of the compressed data".
                let compressed_offsets =
                    match compression_info.compressed_range_for_uncompressed_range(&range)? {
                        (start, None) => start..self.handle.get_size(),
                        (start, Some(end)) => start..end.get(),
                    };
                // Reads must be block aligned, so widen the compressed range accordingly.
                let bs = self.handle.block_size();
                let aligned = round_down(compressed_offsets.start, bs)
                    ..round_up(compressed_offsets.end, bs).unwrap();
                let mut compressed_buf =
                    self.handle.allocate_buffer((aligned.end - aligned.start) as usize).await;

                let mut decompression_errors = 0;
                // Number of valid uncompressed bytes in `range` (the last chunk may be short).
                let len = (std::cmp::min(range.end, self.uncompressed_size) - range.start) as usize;
                loop {
                    // Read the compressed data while concurrently committing the destination
                    // buffer's pages.
                    let (read, _) = try_join!(
                        self.handle.read(aligned.start, compressed_buf.as_mut()),
                        async {
                            buffer
                                .allocator()
                                .buffer_source()
                                .commit_range(buffer.range())
                                .map_err(|e| e.into())
                        }
                    )
                    .with_context(|| {
                        format!(
                            "Failed to read compressed range {:?}, len {}",
                            aligned,
                            self.handle.get_size()
                        )
                    })?;
                    // Slice of `compressed_buf` holding exactly the compressed payload (excluding
                    // the block-alignment padding).
                    let compressed_buf_range = (compressed_offsets.start - aligned.start) as usize
                        ..(compressed_offsets.end - aligned.start) as usize;
                    ensure!(
                        read >= compressed_buf_range.end - compressed_buf_range.start,
                        anyhow!(FxfsError::Inconsistent).context(format!(
                            "Unexpected EOF, read {}, but expected {}",
                            read,
                            compressed_buf_range.end - compressed_buf_range.start,
                        ))
                    );

                    let buf = buffer.as_mut_slice();
                    let decompression_result = {
                        fxfs_trace::duration!("blob-decompress", "len" => len);
                        compression_info.decompress(
                            &compressed_buf.as_slice()[compressed_buf_range],
                            &mut buf[..len],
                            range.start,
                        )
                    };
                    match decompression_result {
                        Ok(()) => break,
                        Err(error) => {
                            // File a crash report (once per process) with the compressed bytes,
                            // then retry the read once; a second failure is treated as corruption.
                            record_decompression_error_crash_report(
                                compressed_buf.as_slice(),
                                &range,
                                &compressed_offsets,
                                &self.merkle_root,
                            )
                            .await;
                            decompression_errors += 1;
                            if decompression_errors == 2 {
                                bail!(
                                    anyhow!(FxfsError::IntegrityError)
                                        .context(format!("Decompression error: {error:?}"))
                                );
                            } else {
                                warn!(error:?; "Decompression error; retrying");
                            }
                        }
                    }
                } // loop
                if decompression_errors > 0 {
                    info!("Read succeeded on second attempt");
                }
                len
            }
        };
        {
            // TODO(https://fxbug.dev/42073035): This should be offloaded to the kernel at which
            // point we can delete this.
            fxfs_trace::duration!("blob-verify", "len" => read);
            self.merkle_verifier.verify(range.start as usize, &buffer.as_slice()[..read])?;
        }
        // Zero the tail.
        buffer.as_mut_slice()[read..].fill(0);
        Ok(buffer)
    }
}
378
/// Compression metadata for a blob stored in the chunked compression format.
pub struct CompressionInfo {
    // Uncompressed size of each chunk (except possibly the last).
    chunk_size: u64,
    // The chunked compression format stores 0 as the first offset but it's not stored here. Not
    // storing the 0 avoids the allocation for blobs smaller than the chunk size.
    //
    // Remaining compressed chunk offsets are strictly ascending (validated in `new`) and split
    // into those that fit in 32 bits and those that don't, to save memory.
    small_offsets: Box<[u32]>,
    large_offsets: Box<[u64]>,
    decompressor: ThreadLocalDecompressor,
}
387
388impl CompressionInfo {
389    pub fn new(
390        chunk_size: u64,
391        offsets: &[u64],
392        compression_algorithm: CompressionAlgorithm,
393    ) -> Result<Self, Error> {
394        let decompressor = compression_algorithm.thread_local_decompressor();
395        if chunk_size == 0 {
396            return Err(FxfsError::IntegrityError.into());
397        } else if offsets.is_empty() || *offsets.first().unwrap() != 0 {
398            // There should always be at least 1 offset and the first offset must always be 0.
399            Err(FxfsError::IntegrityError.into())
400        } else if !offsets.windows(2).all(|window| window[0] < window[1]) {
401            // The offsets must be in ascending order.
402            Err(FxfsError::IntegrityError.into())
403        } else if offsets.len() == 1 {
404            // Simple case where the blob is smaller than the chunk size so only the 0 offset is
405            // present. The 0 isn't stored so no allocation is necessary.
406            Ok(Self {
407                chunk_size,
408                small_offsets: Box::default(),
409                large_offsets: Box::default(),
410                decompressor,
411            })
412        } else if *offsets.last().unwrap() <= u32::MAX as u64 {
413            // Check the last index first since most compressed blobs are going to be smaller
414            // than 4GiB making all offsets small.
415            Ok(Self {
416                chunk_size,
417                small_offsets: offsets[1..].iter().map(|x| *x as u32).collect(),
418                large_offsets: Box::default(),
419                decompressor,
420            })
421        } else {
422            // The partition point is the index of the first compressed offset that's > u32::MAX.
423            let partition_point = offsets.partition_point(|&x| x <= u32::MAX as u64);
424            Ok(Self {
425                chunk_size,
426                small_offsets: offsets[1..partition_point].iter().map(|x| *x as u32).collect(),
427                large_offsets: offsets[partition_point..].into(),
428                decompressor,
429            })
430        }
431    }
432
433    fn compressed_range_for_uncompressed_range(
434        &self,
435        range: &Range<u64>,
436    ) -> Result<(u64, Option<NonZero<u64>>), Error> {
437        ensure!(range.start.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
438        ensure!(range.start < range.end, FxfsError::Inconsistent);
439
440        let start_chunk_index = (range.start / self.chunk_size) as usize;
441        let start_offset = self
442            .compressed_offset_for_chunk_index(start_chunk_index)
443            .ok_or(FxfsError::OutOfRange)?;
444
445        // The end of the range may not be aligned to the chunk size for the last chunk.
446        let end_chunk_index = range.end.div_ceil(self.chunk_size) as usize;
447        let end_offset = match self.compressed_offset_for_chunk_index(end_chunk_index) {
448            None => None,
449            Some(offset) => {
450                // This isn't the last chunk so the end must be aligned.
451                ensure!(range.end.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
452                // `CompressionInfo::new` validates that all of the offsets are ascending. The end
453                // of the range is greater than the start so this can never be 0.
454                Some(NonZero::new(offset).unwrap())
455            }
456        };
457        Ok((start_offset, end_offset))
458    }
459
460    fn compressed_offset_for_chunk_index(&self, chunk_index: usize) -> Option<u64> {
461        // The "0" compressed offset isn't stored so all of the indices are shifted left by 1.
462        if chunk_index == 0 {
463            Some(0)
464        } else if chunk_index - 1 < self.small_offsets.len() {
465            Some(self.small_offsets[chunk_index - 1] as u64)
466        } else if chunk_index - 1 - self.small_offsets.len() < self.large_offsets.len() {
467            Some(self.large_offsets[chunk_index - 1 - self.small_offsets.len()])
468        } else {
469            None
470        }
471    }
472
473    /// Decompress the bytes of `src` into `dst`.
474    ///   - `src` is allowed to span multiple chunks.
475    ///   - `dst` must have the exact size of the uncompressed bytes.
476    ///   - `dst_start_offset` is the location of the uncompressed bytes within the blob and must be
477    ///     chunk aligned. This is necessary for determining the chunk boundaries in `src`.
478    fn decompress(
479        &self,
480        mut src: &[u8],
481        mut dst: &mut [u8],
482        dst_start_offset: u64,
483    ) -> Result<(), Error> {
484        ensure!(dst_start_offset.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
485
486        let start_chunk_index = (dst_start_offset / self.chunk_size) as usize;
487        let chunk_count = dst.len().div_ceil(self.chunk_size as usize);
488        let mut start_offset = self
489            .compressed_offset_for_chunk_index(start_chunk_index)
490            .ok_or(FxfsError::Inconsistent)?;
491
492        // Decompress each chunk individually.
493        for chunk_index in start_chunk_index..(start_chunk_index + chunk_count) {
494            match self.compressed_offset_for_chunk_index(chunk_index + 1) {
495                Some(end_offset) => {
496                    let (to_decompress, src_remaining) = src
497                        .split_at_checked((end_offset - start_offset) as usize)
498                        .ok_or(FxfsError::Inconsistent)?;
499                    let (to_decompress_into, dst_remaining) = dst
500                        .split_at_mut_checked(self.chunk_size as usize)
501                        .ok_or(FxfsError::Inconsistent)?;
502
503                    let decompressed_bytes = self.decompressor.decompress_into(
504                        to_decompress,
505                        to_decompress_into,
506                        chunk_index,
507                    )?;
508                    ensure!(
509                        decompressed_bytes == to_decompress_into.len(),
510                        FxfsError::Inconsistent
511                    );
512                    src = src_remaining;
513                    dst = dst_remaining;
514                    start_offset = end_offset;
515                }
516                None => {
517                    let decompressed_bytes =
518                        self.decompressor.decompress_into(src, dst, chunk_index)?;
519                    ensure!(decompressed_bytes == dst.len(), FxfsError::Inconsistent);
520                }
521            }
522        }
523
524        Ok(())
525    }
526}
527
528fn set_vmo_name(vmo: &zx::Vmo, merkle_root: &Hash) {
529    let trimmed_merkle = &merkle_root.to_string()[0..BLOB_NAME_HASH_LENGTH];
530    let name = format!("{BLOB_NAME_PREFIX}{trimmed_merkle}");
531    let name = zx::Name::new(&name).unwrap();
532    vmo.set_name(&name).unwrap();
533}
534
535fn min_chunk_size(compression_info: &Option<CompressionInfo>) -> u64 {
536    if let Some(compression_info) = compression_info {
537        read_ahead_size_for_chunk_size(compression_info.chunk_size, BASE_READ_AHEAD_SIZE)
538    } else {
539        BASE_READ_AHEAD_SIZE
540    }
541}
542
543fn read_ahead_size_for_chunk_size(chunk_size: u64, suggested_read_ahead_size: u64) -> u64 {
544    if chunk_size >= suggested_read_ahead_size {
545        chunk_size
546    } else {
547        round_down(suggested_read_ahead_size, chunk_size)
548    }
549}
550
/// Files a crash report (at most once per process, guarded by a static flag) containing the
/// compressed bytes and the ranges involved in a decompression failure, to aid debugging.
async fn record_decompression_error_crash_report(
    compressed_buf: &[u8],
    uncompressed_offsets: &Range<u64>,
    compressed_offsets: &Range<u64>,
    merkle_root: &Hash,
) {
    // `swap` ensures only the first caller ever proceeds to file a report.
    static DONE_ONCE: AtomicBool = AtomicBool::new(false);
    if !DONE_ONCE.swap(true, Ordering::Relaxed) {
        if let Ok(proxy) = connect_to_protocol::<fidl_fuchsia_feedback::CrashReporterMarker>() {
            // Copy the compressed data into a VMO so it can be attached to the report.
            let size = compressed_buf.len() as u64;
            let vmo = zx::Vmo::create(size).unwrap();
            vmo.write(compressed_buf, 0).unwrap();
            if let Err(e) = proxy
                .file_report(CrashReport {
                    program_name: Some("fxfs".to_string()),
                    crash_signature: Some("fuchsia-fxfs-decompression_error".to_string()),
                    is_fatal: Some(false),
                    annotations: Some(vec![
                        Annotation {
                            key: "fxfs.range".to_string(),
                            value: format!("{:?}", uncompressed_offsets),
                        },
                        Annotation {
                            key: "fxfs.compressed_offsets".to_string(),
                            value: format!("{:?}", compressed_offsets),
                        },
                        Annotation {
                            key: "fxfs.merkle_root".to_string(),
                            value: format!("{}", merkle_root),
                        },
                    ]),
                    attachments: Some(vec![Attachment {
                        key: "fxfs_compressed_data".to_string(),
                        value: Buffer { vmo, size },
                    }]),
                    ..Default::default()
                })
                .await
            {
                error!(e:?; "Failed to file crash report");
            } else {
                warn!("Filed crash report for decompression error");
            }
        } else {
            error!("Failed to connect to crash report service");
        }
    }
}
599
600#[cfg(test)]
601mod tests {
602    use super::*;
603    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture};
604    use crate::fuchsia::memory_pressure::MemoryPressureLevel;
605    use crate::fuchsia::pager::PageInRange;
606    use crate::fuchsia::volume::{MAX_READ_AHEAD_SIZE, MemoryPressureConfig};
607    use crate::fxblob::testing::open_blob_fixture;
608    use assert_matches::assert_matches;
609    use delivery_blob::CompressionMode;
610    use delivery_blob::compression::{ChunkedArchiveOptions, CompressionAlgorithm};
611    use fuchsia_async as fasync;
612    use fuchsia_async::epoch::Epoch;
613    use fxfs_make_blob_image::FxBlobBuilder;
614    use std::time::Duration;
615    use storage_device::DeviceHolder;
616    use storage_device::fake_device::FakeDevice;
617
    // Chunk size (in bytes) for compressed test blobs — presumably used by tests further down
    // this module; not referenced in the portion visible here.
    const CHUNK_SIZE: usize = 32 * 1024;
619
620    #[fasync::run(10, test)]
621    async fn test_empty_blob() {
622        let fixture = new_blob_fixture().await;
623
624        let data = vec![];
625        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
626        assert_eq!(fixture.read_blob(hash).await, data);
627
628        fixture.close().await;
629    }
630
631    #[fasync::run(10, test)]
632    async fn test_large_blob() {
633        let fixture = new_blob_fixture().await;
634
635        let data = vec![3; 3_000_000];
636        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
637
638        assert_eq!(fixture.read_blob(hash).await, data);
639
640        fixture.close().await;
641    }
642
643    #[fasync::run(10, test)]
644    async fn test_large_compressed_blob() {
645        let fixture = new_blob_fixture().await;
646
647        let data = vec![3; 3_000_000];
648        let hash = fixture.write_blob(&data, CompressionMode::Always).await;
649
650        assert_eq!(fixture.read_blob(hash).await, data);
651
652        fixture.close().await;
653    }
654
655    #[fasync::run(10, test)]
656    async fn test_non_page_aligned_blob() {
657        let fixture = new_blob_fixture().await;
658
659        let page_size = zx::system_get_page_size() as usize;
660        let data = vec![0xffu8; page_size - 1];
661        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
662        assert_eq!(fixture.read_blob(hash).await, data);
663
664        {
665            let vmo = fixture.get_blob_vmo(hash).await;
666            let mut buf = vec![0x11u8; page_size];
667            vmo.read(&mut buf[..], 0).expect("vmo read failed");
668            assert_eq!(data, buf[..data.len()]);
669            // Ensure the tail is zeroed
670            assert_eq!(buf[data.len()], 0);
671        }
672
673        fixture.close().await;
674    }
675
    #[fasync::run(10, test)]
    async fn test_blob_invalid_contents() {
        // Corrupt the on-disk contents of a blob beyond the first read-ahead window, then check
        // that the intact window still reads while the corrupted window fails verification.
        let fixture = new_blob_fixture().await;

        let data = vec![0xffu8; (MAX_READ_AHEAD_SIZE + BLOCK_SIZE) as usize];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
        let name = format!("{}", hash);

        {
            // Overwrite the second read-ahead window.  The first window should successfully verify.
            let handle = fixture.get_blob_handle(&name).await;
            let mut transaction =
                handle.new_transaction().await.expect("failed to create transaction");
            let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
            buf.as_mut_slice().fill(0);
            handle
                .txn_write(&mut transaction, MAX_READ_AHEAD_SIZE, buf.as_ref())
                .await
                .expect("txn_write failed");
            transaction.commit().await.expect("failed to commit transaction");
        }

        {
            let blob_vmo = fixture.get_blob_vmo(hash).await;
            let mut buf = vec![0; BLOCK_SIZE as usize];
            // First window: intact data, verification passes.
            assert_matches!(blob_vmo.read(&mut buf[..], 0), Ok(_));
            // Second window: zeroed on disk, so merkle verification must fail.
            assert_matches!(
                blob_vmo.read(&mut buf[..], MAX_READ_AHEAD_SIZE),
                Err(zx::Status::IO_DATA_INTEGRITY)
            );
        }

        fixture.close().await;
    }
710
    #[fasync::run(10, test)]
    async fn test_lz4_blob() {
        // Build an image containing an LZ4-compressed blob offline with `FxBlobBuilder`, reopen
        // the device, and verify the blob reads back correctly.
        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
        let blob_data = vec![0xAA; 68 * 1024];
        let fxblob_builder = FxBlobBuilder::new(device).await.unwrap();
        let blob = fxblob_builder
            .generate_blob(blob_data.clone(), Some(CompressionAlgorithm::Lz4))
            .unwrap();
        let blob_hash = blob.hash();
        fxblob_builder.install_blob(&blob).await.unwrap();
        let device = fxblob_builder.finalize().await.unwrap().0;
        device.reopen(/*read_only=*/ false);
        let fixture = open_blob_fixture(device).await;

        assert_eq!(fixture.read_blob(blob_hash).await, blob_data);

        fixture.close().await;
    }
729
730    #[fasync::run(10, test)]
731    async fn test_blob_vmos_are_immutable() {
732        let fixture = new_blob_fixture().await;
733
734        let data = vec![0xffu8; 500];
735        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
736        let blob_vmo = fixture.get_blob_vmo(hash).await;
737
738        // The VMO shouldn't be resizable.
739        assert_matches!(blob_vmo.set_size(20), Err(_));
740
741        // The VMO shouldn't be writable.
742        assert_matches!(blob_vmo.write(b"overwrite", 0), Err(_));
743
744        // The VMO's content size shouldn't be modifiable.
745        assert_matches!(blob_vmo.set_stream_size(20), Err(_));
746
747        fixture.close().await;
748    }
749
    // Chunk size passed to `CompressionInfo::new` throughout the compression-info tests.
    const COMPRESSED_BLOB_CHUNK_SIZE: u64 = 32 * 1024;
    // Largest compressed offset that still fits in a u32 (the tests below verify offsets at or
    // under this value land in `small_offsets` and larger ones in `large_offsets`).
    const MAX_SMALL_OFFSET: u64 = u32::MAX as u64;
    // Shorthand for the compression algorithm used by most of these tests.
    const ZSTD: CompressionAlgorithm = CompressionAlgorithm::Zstd;
753
754    #[fuchsia::test]
755    fn test_compression_info_offsets_must_start_with_zero() {
756        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[], ZSTD).is_err());
757        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[1], ZSTD).is_err());
758        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).is_ok());
759    }
760
761    #[fuchsia::test]
762    fn test_compression_info_offsets_must_be_sorted() {
763        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 2], ZSTD).is_ok());
764        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 2, 1], ZSTD).is_err());
765        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 1], ZSTD).is_err());
766    }
767
768    #[fuchsia::test]
769    fn test_compression_info_splitting_offsets() {
770        // Single chunk blob doesn't store any offsets.
771        let compression_info =
772            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).unwrap();
773        assert!(compression_info.small_offsets.is_empty());
774        assert!(compression_info.large_offsets.is_empty());
775
776        // Single small offset.
777        let compression_info =
778            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10], ZSTD).unwrap();
779        assert_eq!(&*compression_info.small_offsets, &[10]);
780        assert!(compression_info.large_offsets.is_empty());
781
782        // Multiple small offsets.
783        let compression_info =
784            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10, 20, 30], ZSTD).unwrap();
785        assert_eq!(&*compression_info.small_offsets, &[10, 20, 30]);
786        assert!(compression_info.large_offsets.is_empty());
787
788        // One less than the largest small offset.
789        let compression_info =
790            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET - 1], ZSTD)
791                .unwrap();
792        assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1]);
793        assert!(compression_info.large_offsets.is_empty());
794
795        // The largest small offset.
796        let compression_info =
797            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET], ZSTD).unwrap();
798        assert_eq!(&*compression_info.small_offsets, &[u32::MAX]);
799        assert!(compression_info.large_offsets.is_empty());
800
801        // The smallest large offset.
802        let compression_info =
803            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 1], ZSTD)
804                .unwrap();
805        assert!(compression_info.small_offsets.is_empty());
806        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
807
808        // Multiple offsets around boundary between small and large offsets.
809        let compression_info = CompressionInfo::new(
810            COMPRESSED_BLOB_CHUNK_SIZE,
811            &[0, MAX_SMALL_OFFSET - 1, MAX_SMALL_OFFSET, MAX_SMALL_OFFSET + 1],
812            ZSTD,
813        )
814        .unwrap();
815        assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1, u32::MAX]);
816        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
817
818        // Single large offset.
819        let compression_info =
820            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 10], ZSTD)
821                .unwrap();
822        assert!(compression_info.small_offsets.is_empty());
823        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 10]);
824
825        // Multiple large offsets.
826        let compression_info = CompressionInfo::new(
827            COMPRESSED_BLOB_CHUNK_SIZE,
828            &[0, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
829            ZSTD,
830        )
831        .unwrap();
832        assert!(compression_info.small_offsets.is_empty());
833        assert_eq!(
834            &*compression_info.large_offsets,
835            &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
836        );
837
838        // Small and large offsets.
839        let compression_info = CompressionInfo::new(
840            COMPRESSED_BLOB_CHUNK_SIZE,
841            &[0, 10, 20, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
842            ZSTD,
843        )
844        .unwrap();
845        assert_eq!(&*compression_info.small_offsets, &[10, 20]);
846        assert_eq!(
847            &*compression_info.large_offsets,
848            &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
849        );
850    }
851
852    #[fuchsia::test]
853    fn test_compression_info_compressed_range_for_uncompressed_range() {
854        fn check_compression_ranges(
855            offsets: &[u64],
856            expected_ranges: &[(u64, Option<u64>)],
857            chunk_size: u64,
858            read_ahead_size: u64,
859        ) {
860            let compression_info = CompressionInfo::new(chunk_size, offsets, ZSTD).unwrap();
861            for (i, range) in expected_ranges.iter().enumerate() {
862                let i = i as u64;
863                let result = compression_info
864                    .compressed_range_for_uncompressed_range(
865                        &(i * read_ahead_size..(i + 1) * read_ahead_size),
866                    )
867                    .unwrap();
868                assert_eq!(result, (range.0, range.1.map(|end| NonZero::new(end).unwrap())));
869            }
870        }
871        check_compression_ranges(
872            &[0, 10, 20, 30],
873            &[(0, Some(10)), (10, Some(20)), (20, Some(30)), (30, None)],
874            COMPRESSED_BLOB_CHUNK_SIZE,
875            COMPRESSED_BLOB_CHUNK_SIZE,
876        );
877        check_compression_ranges(
878            &[0, 10, 20, 30],
879            &[(0, Some(20)), (20, None)],
880            COMPRESSED_BLOB_CHUNK_SIZE,
881            COMPRESSED_BLOB_CHUNK_SIZE * 2,
882        );
883        check_compression_ranges(
884            &[0, 10, 20, 30],
885            &[(0, None)],
886            COMPRESSED_BLOB_CHUNK_SIZE,
887            COMPRESSED_BLOB_CHUNK_SIZE * 4,
888        );
889        check_compression_ranges(
890            &[0, 10, 20, 30, MAX_SMALL_OFFSET + 10],
891            &[(0, Some(MAX_SMALL_OFFSET + 10)), (MAX_SMALL_OFFSET + 10, None)],
892            COMPRESSED_BLOB_CHUNK_SIZE,
893            COMPRESSED_BLOB_CHUNK_SIZE * 4,
894        );
895        check_compression_ranges(
896            &[
897                0,
898                10,
899                20,
900                30,
901                MAX_SMALL_OFFSET + 10,
902                MAX_SMALL_OFFSET + 20,
903                MAX_SMALL_OFFSET + 30,
904                MAX_SMALL_OFFSET + 40,
905                MAX_SMALL_OFFSET + 50,
906            ],
907            &[
908                (0, Some(20)),
909                (20, Some(MAX_SMALL_OFFSET + 10)),
910                (MAX_SMALL_OFFSET + 10, Some(MAX_SMALL_OFFSET + 30)),
911                (MAX_SMALL_OFFSET + 30, Some(MAX_SMALL_OFFSET + 50)),
912            ],
913            COMPRESSED_BLOB_CHUNK_SIZE,
914            COMPRESSED_BLOB_CHUNK_SIZE * 2,
915        );
916    }
917
918    #[fuchsia::test]
919    fn test_compression_info_compressed_range_for_uncompressed_range_errors() {
920        let compression_info = CompressionInfo::new(
921            COMPRESSED_BLOB_CHUNK_SIZE,
922            &[
923                0,
924                10,
925                20,
926                30,
927                MAX_SMALL_OFFSET + 10,
928                MAX_SMALL_OFFSET + 20,
929                MAX_SMALL_OFFSET + 30,
930                MAX_SMALL_OFFSET + 40,
931                MAX_SMALL_OFFSET + 50,
932            ],
933            ZSTD,
934        )
935        .unwrap();
936
937        // The start of reads must be chunk aligned.
938        assert!(
939            compression_info
940                .compressed_range_for_uncompressed_range(&(1..COMPRESSED_BLOB_CHUNK_SIZE),)
941                .is_err()
942        );
943
944        // Reading entirely past the last offset isn't allowed.
945        assert!(
946            compression_info
947                .compressed_range_for_uncompressed_range(
948                    &(COMPRESSED_BLOB_CHUNK_SIZE * 9..COMPRESSED_BLOB_CHUNK_SIZE * 12),
949                )
950                .is_err()
951        );
952
953        // Reading a different amount than the read-ahead size isn't allowed for middle offsets.
954        assert!(
955            compression_info
956                .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE + 1),)
957                .is_err()
958        );
959        assert!(
960            compression_info
961                .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE - 1),)
962                .is_err()
963        );
964        assert!(
965            compression_info
966                .compressed_range_for_uncompressed_range(
967                    &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 + 1),
968                )
969                .is_err()
970        );
971        assert!(
972            compression_info
973                .compressed_range_for_uncompressed_range(
974                    &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 - 1),
975                )
976                .is_err()
977        );
978
979        // Reading less than the read-ahead size for the last offset is allowed.
980        assert!(
981            compression_info
982                .compressed_range_for_uncompressed_range(
983                    &(COMPRESSED_BLOB_CHUNK_SIZE * 8..COMPRESSED_BLOB_CHUNK_SIZE * 8 + 4096),
984                )
985                .is_ok()
986        );
987    }
988
989    #[fuchsia::test]
990    fn test_read_ahead_size_for_chunk_size() {
991        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 32 * 1024), 32 * 1024);
992        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 32 * 1024), 48 * 1024);
993        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 32 * 1024), 64 * 1024);
994
995        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 64 * 1024), 64 * 1024);
996        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 64 * 1024), 48 * 1024);
997        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 64 * 1024), 64 * 1024);
998        assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 64 * 1024), 96 * 1024);
999
1000        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 128 * 1024), 128 * 1024);
1001        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 128 * 1024), 96 * 1024);
1002        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 128 * 1024), 128 * 1024);
1003        assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 128 * 1024), 96 * 1024);
1004    }
1005
1006    fn build_compression_info(size: usize) -> (CompressionInfo, Vec<u8>, Vec<u8>) {
1007        let options =
1008            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
1009        let mut compressor = options.compressor();
1010        let mut uncompressed_data = Vec::with_capacity(size);
1011        {
1012            let mut run_length = 1;
1013            let mut run_value: u8 = 0;
1014            while uncompressed_data.len() < size {
1015                uncompressed_data
1016                    .resize(std::cmp::min(uncompressed_data.len() + run_length, size), run_value);
1017                run_length = (run_length + 1) % 19 + 1;
1018                run_value = (run_value + 1) % 17;
1019            }
1020        }
1021        let mut compressed_offsets = vec![0];
1022        let mut compressed_data = vec![];
1023        for chunk in uncompressed_data.chunks(CHUNK_SIZE) {
1024            let mut compressed_chunk = compressor.compress(chunk, 0).unwrap();
1025            compressed_data.append(&mut compressed_chunk);
1026            compressed_offsets.push(compressed_data.len() as u64);
1027        }
1028        compressed_offsets.pop();
1029        (
1030            CompressionInfo::new(CHUNK_SIZE as u64, &compressed_offsets, CompressionAlgorithm::Lz4)
1031                .unwrap(),
1032            compressed_data,
1033            uncompressed_data,
1034        )
1035    }
1036
1037    #[fuchsia::test]
1038    fn test_compression_info_decompress_single_chunk() {
1039        let (compression_info, compressed_data, uncompressed_data) =
1040            build_compression_info(CHUNK_SIZE);
1041        let mut decompressed_data = vec![0u8; CHUNK_SIZE + 1];
1042
1043        compression_info
1044            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE], 0)
1045            .expect("failed to decompress");
1046        assert_eq!(uncompressed_data, decompressed_data[0..CHUNK_SIZE]);
1047
1048        // Too small of destination buffer.
1049        compression_info
1050            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1051            .expect_err("decompression should fail");
1052
1053        // Too large of destination buffer.
1054        compression_info
1055            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1056            .expect_err("decompression should fail");
1057    }
1058
1059    #[fuchsia::test]
1060    fn test_compression_info_decompress_multiple_chunks() {
1061        fn slice_for_chunks<'a>(
1062            compressed_data: &'a [u8],
1063            compression_info: &CompressionInfo,
1064            chunks: Range<u64>,
1065        ) -> &'a [u8] {
1066            let (start, end) = compression_info
1067                .compressed_range_for_uncompressed_range(
1068                    &(chunks.start * CHUNK_SIZE as u64..chunks.end * CHUNK_SIZE as u64),
1069                )
1070                .unwrap();
1071            let end = end.map_or(compressed_data.len() as u64, NonZero::<u64>::get);
1072            &compressed_data[start as usize..end as usize]
1073        }
1074
1075        const BLOB_SIZE: usize = CHUNK_SIZE * 4 + 4096;
1076        let (compression_info, compressed_data, uncompressed_data) =
1077            build_compression_info(BLOB_SIZE);
1078        let mut decompressed_data = vec![0u8; BLOB_SIZE];
1079
1080        // Decompress the entire blob.
1081        compression_info
1082            .decompress(&compressed_data, &mut decompressed_data, 0)
1083            .expect("failed to decompress");
1084        assert_eq!(uncompressed_data, decompressed_data);
1085
1086        // Decompress just the whole chunks.
1087        compression_info
1088            .decompress(
1089                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1090                &mut decompressed_data[0..CHUNK_SIZE * 4],
1091                0,
1092            )
1093            .expect("failed to decompress");
1094        assert_eq!(&uncompressed_data[0..CHUNK_SIZE], &decompressed_data[0..CHUNK_SIZE]);
1095
1096        // Too small of destination buffer for whole chunks.
1097        compression_info
1098            .decompress(
1099                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1100                &mut decompressed_data[0..CHUNK_SIZE * 4 - 1],
1101                0,
1102            )
1103            .expect_err("decompression should fail");
1104
1105        // Too large of destination buffer for whole chunks.
1106        compression_info
1107            .decompress(
1108                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1109                &mut decompressed_data[0..CHUNK_SIZE * 4 + 1],
1110                0,
1111            )
1112            .expect_err("decompression should fail");
1113
1114        // Decompress just the tail.
1115        let partial_chunk = slice_for_chunks(&compressed_data, &compression_info, 4..5);
1116        compression_info
1117            .decompress(partial_chunk, &mut decompressed_data[0..4096], CHUNK_SIZE as u64 * 4)
1118            .expect("failed to decompress");
1119        assert_eq!(&uncompressed_data[CHUNK_SIZE * 4..], &decompressed_data[0..4096]);
1120
1121        // Too small of destination buffer for the tail.
1122        compression_info
1123            .decompress(partial_chunk, &mut decompressed_data[0..4095], CHUNK_SIZE as u64 * 4)
1124            .expect_err("decompression should fail");
1125
1126        // Too large of destination buffer for the tail.
1127        compression_info
1128            .decompress(partial_chunk, &mut decompressed_data[0..4097], CHUNK_SIZE as u64 * 4)
1129            .expect_err("decompression should fail");
1130    }
1131
    #[fasync::run(10, test)]
    async fn test_refault_metric() {
        // Verifies that `chunks_supplied` tracks which read-ahead chunks of a blob have been
        // paged in, and that re-supplying already-supplied chunks bumps the volume's
        // `blob_resupplied_count` metric.

        // Polls `condition` every 20ms, giving up after 100 attempts (~2s).  Returns whether
        // the condition became true.
        async fn wait(condition: impl Fn() -> bool) -> bool {
            let mut wait_count = 0;
            while !condition() {
                fasync::Timer::new(Duration::from_millis(20)).await;
                wait_count += 1;
                if wait_count > 100 {
                    return false;
                }
            }
            return true;
        }

        let fixture = new_blob_fixture().await;

        {
            let volume = fixture.volume().volume().clone();
            // The background task responds to the memory pressure change sent below (the
            // assertion on `read_ahead_size` depends on it running).
            volume.start_background_task(
                MemoryPressureConfig::default(),
                fixture.volumes_directory().memory_pressure_monitor(),
            );

            // 252 KiB of data; the assertion below shows this maps to 8 tracking chunks.
            let data = vec![0xffu8; 252 * 1024];
            let hash = fixture.write_blob(&data, CompressionMode::Never).await;

            let blob = fixture.get_blob(hash).await.unwrap();
            assert_eq!(blob.chunks_supplied.len(), 8);
            // Nothing has been read yet.
            assert_eq!(
                &blob.chunks_supplied.get(),
                &[false, false, false, false, false, false, false, false]
            );

            // A small read; read-ahead supplies the first four chunks.
            blob.vmo.read_to_vec::<u8>(32 * 1024, 4096).unwrap();

            assert_eq!(
                &blob.chunks_supplied.get(),
                &[true, true, true, true, false, false, false, false]
            );

            // Critical memory pressure should shrink the read-ahead window to
            // BASE_READ_AHEAD_SIZE.
            fixture
                .memory_pressure_proxy()
                .on_level_changed(MemoryPressureLevel::Critical)
                .await
                .expect("Failed to send memory pressure level change");

            assert!(
                wait(|| volume.read_ahead_size() == BASE_READ_AHEAD_SIZE).await,
                "read-ahead size didn't change with memory pressure change"
            );

            // With the smaller read-ahead, this read supplies only one more chunk (index 5).
            blob.vmo.read_to_vec::<u8>(164 * 1024, 4096).unwrap();
            assert_eq!(
                &blob.chunks_supplied.get(),
                &[true, true, true, true, false, true, false, false]
            );

            // We have loaded pages, but only once each.
            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 0);

            // Re-read some pages.

            // We can't evict pages from the VMO so get the kernel to resupply them but we can call
            // page_in directly and wait for the counters to change.
            blob.clone().page_in(PageInRange::new(
                32 * 1024..68 * 1024,
                blob.clone(),
                Epoch::global().guard(),
            ));

            // The 32K..68K range overlaps previously-supplied chunks, so the resupply counter
            // should eventually reach 2.
            assert!(wait(|| volume.blob_resupplied_count().read(Ordering::SeqCst) == 2,).await);

            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 2);
        }

        fixture.close().await;
    }
1210}