Skip to main content

fxfs_platform/fuchsia/fxblob/
blob.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This module contains the [`FxBlob`] node type used to represent an immutable blob persisted to
6//! disk which can be read back.
7
8use crate::constants::*;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::errors::map_to_status;
11use crate::fuchsia::node::{FxNode, OpenedNode};
12use crate::fuchsia::pager::{
13    MarkDirtyRange, PageInRange, PagerBacked, PagerPacketReceiverRegistration, default_page_in,
14};
15use crate::fuchsia::volume::{FxVolume, READ_AHEAD_SIZE};
16use crate::fxblob::atomic_vec::AtomicBitVec;
17use anyhow::{Context, Error, anyhow, bail, ensure};
18use delivery_blob::compression::{CompressionAlgorithm, ThreadLocalDecompressor};
19use fidl_fuchsia_feedback::{Annotation, Attachment, CrashReport};
20use fidl_fuchsia_mem::Buffer;
21use fuchsia_async::epoch::Epoch;
22use fuchsia_component_client::connect_to_protocol;
23use fuchsia_merkle::{Hash, MerkleVerifier, ReadSizedMerkleVerifier};
24use futures::try_join;
25use fxfs::errors::FxfsError;
26use fxfs::log::*;
27use fxfs::object_handle::{ObjectHandle, ReadObjectHandle};
28use fxfs::object_store::{DataObjectHandle, ObjectDescriptor};
29use fxfs::round::{round_down, round_up};
30use fxfs_macros::ToWeakNode;
31use std::num::NonZero;
32use std::ops::Range;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
35use storage_device::buffer;
36use zx::{HandleBased, Status};
37
/// The merkle-tree block size, re-exported as a `u64` for convenient use with byte offsets.
pub const BLOCK_SIZE: u64 = fuchsia_merkle::BLOCK_SIZE as u64;

// When the top bit of the open count is set, it means the file has been deleted and when the count
// drops to zero, it will be tombstoned.  Once it has dropped to zero, it cannot be opened again
// (assertions will fire).
const PURGED: usize = 1 << (usize::BITS - 1);
44
/// Represents an immutable blob stored on Fxfs with an associated merkle tree.
#[derive(ToWeakNode)]
pub struct FxBlob {
    // Handle to the on-disk object containing the (possibly compressed) blob data.
    handle: DataObjectHandle<FxVolume>,
    // Pager-backed VMO created in [`FxBlob::new`]; child VMOs are handed out via
    // `create_child_vmo`.
    vmo: zx::Vmo,
    // Open count; the top bit (`PURGED`) marks the blob as deleted (see `mark_to_be_purged`).
    open_count: AtomicUsize,
    // Root hash identifying this blob.
    merkle_root: Hash,
    // Verifies data as it is read back in `aligned_read`.
    merkle_verifier: ReadSizedMerkleVerifier,
    // `None` when the blob is stored uncompressed.
    compression_info: Option<CompressionInfo>,
    uncompressed_size: u64, // always set.
    // Shared with any replacement blob created by `overwrite_me` (it is cloned there).
    pager_packet_receiver_registration: Arc<PagerPacketReceiverRegistration<Self>>,
    // One bit per chunk, set the first time the chunk is supplied to the pager; used by
    // `record_page_fault_metric` to count re-supplied chunks.
    chunks_supplied: AtomicBitVec,
}
58
impl FxBlob {
    /// Creates a new [`FxBlob`] backed by `handle`, registering a pager-backed VMO of
    /// `uncompressed_size` bytes with the volume's pager.
    ///
    /// Returns an error if the merkle verifier can't be constructed for the computed minimum
    /// chunk size.
    pub fn new(
        handle: DataObjectHandle<FxVolume>,
        merkle_root: Hash,
        merkle_verifier: MerkleVerifier,
        compression_info: Option<CompressionInfo>,
        uncompressed_size: u64,
    ) -> Result<Arc<Self>, Error> {
        let min_chunk_size = min_chunk_size(&compression_info);
        let merkle_verifier =
            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)?;
        // One bit per chunk; the trailing partial chunk (if any) still gets a bit.
        let chunks_supplied = AtomicBitVec::new(uncompressed_size.div_ceil(min_chunk_size));

        // `new_cyclic` is required because the pager needs a weak reference back to the blob
        // when creating the VMO.
        Ok(Arc::new_cyclic(|weak| {
            let (vmo, pager_packet_receiver_registration) = handle
                .owner()
                .pager()
                .create_vmo(weak.clone(), uncompressed_size, zx::VmoOptions::empty())
                .unwrap();
            set_vmo_name(&vmo, &merkle_root);
            Self {
                handle,
                vmo,
                open_count: AtomicUsize::new(0),
                merkle_root,
                merkle_verifier,
                compression_info,
                uncompressed_size,
                pager_packet_receiver_registration: Arc::new(pager_packet_receiver_registration),
                chunks_supplied,
            }
        }))
    }

    /// Replaces this blob with a new [`FxBlob`] backed by `handle`, sharing the same underlying
    /// VMO and pager registration.  Returns the new blob.
    pub fn overwrite_me(
        self: &Arc<Self>,
        handle: DataObjectHandle<FxVolume>,
        merkle_verifier: MerkleVerifier,
        compression_info: Option<CompressionInfo>,
    ) -> Arc<Self> {
        let min_chunk_size = min_chunk_size(&compression_info);
        let merkle_verifier =
            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)
                .expect("The chunk size should have been validated by the delivery blob parser");
        // The chunk size may have changed between the old blob and the new blob. Preserving the
        // chunks supplied bits isn't important.
        let chunks_supplied = AtomicBitVec::new(self.uncompressed_size.div_ceil(min_chunk_size));
        // Both blobs point at the same VMO; duplicate the handle rather than creating a new one.
        let vmo = self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap();

        let new_blob = Arc::new(Self {
            handle,
            vmo,
            open_count: AtomicUsize::new(0),
            merkle_root: self.merkle_root,
            merkle_verifier,
            compression_info,
            uncompressed_size: self.uncompressed_size,
            pager_packet_receiver_registration: self.pager_packet_receiver_registration.clone(),
            chunks_supplied,
        });

        // We have tests that rely on the cache being purged and there are races where the
        // `FxBlob::drop` isn't called early enough, which can make the test flaky.
        self.handle.owner().cache().remove(self.as_ref());

        // Lock must be held until the open counts is incremented to prevent concurrent handling of
        // zero children signals.
        let receiver_lock =
            self.pager_packet_receiver_registration.receiver().set_receiver(&new_blob);
        if receiver_lock.is_strong() {
            // If there was a strong moved between them, then the counts exchange as well. It is
            // only important that the increment happen under the lock as it may handle the next
            // zero children signal, no new requests can now go to the old blob, but to safely
            // ensure that all existing requests finish, we will defer the open count decrement.
            new_blob.open_count_add_one();
            let old_blob = self.clone();
            Epoch::global().defer(move || old_blob.open_count_sub_one());
        }
        new_blob
    }

    /// Returns the merkle root hash identifying this blob.
    pub fn root(&self) -> Hash {
        self.merkle_root
    }

    /// Records how many chunks in `range` had already been supplied before, bumping the volume's
    /// blob-resupplied counter accordingly.  Called from `aligned_read` on every page-in.
    fn record_page_fault_metric(&self, range: &Range<u64>) {
        let chunk_size: u64 = min_chunk_size(&self.compression_info);

        let first_chunk = range.start / chunk_size;
        // The end of the range may not be chunk aligned if it's the last chunk.
        let last_chunk = range.end.div_ceil(chunk_size);

        // `test_and_set_range` returns the number of bits that were already set, i.e. chunks
        // being supplied again.
        let supplied_count = self.chunks_supplied.test_and_set_range(first_chunk, last_chunk);

        if supplied_count > 0 {
            self.handle
                .owner()
                .blob_resupplied_count()
                .increment(supplied_count, Ordering::Relaxed);
        }
    }
}
162
163impl Drop for FxBlob {
164    fn drop(&mut self) {
165        let volume = self.handle.owner();
166        volume.cache().remove(self);
167    }
168}
169
170impl OpenedNode<FxBlob> {
171    /// Creates a read-only child VMO for this blob backed by the pager. The blob cannot be purged
172    /// until all child VMOs have been destroyed.
173    ///
174    /// *WARNING*: We need to ensure the open count is non-zero before invoking this function, so
175    /// it is only implemented for [`OpenedNode<FxBlob>`]. This prevents the blob from being purged
176    /// before we get a chance to register it with the pager for [`zx::Signals::VMO_ZERO_CHILDREN`].
177    pub fn create_child_vmo(&self) -> Result<zx::Vmo, Status> {
178        let blob = self.0.as_ref();
179        let child_vmo = blob.vmo.create_child(
180            zx::VmoChildOptions::REFERENCE | zx::VmoChildOptions::NO_WRITE,
181            0,
182            0,
183        )?;
184        if blob.handle.owner().pager().watch_for_zero_children(blob).map_err(map_to_status)? {
185            // Take an open count so that we keep this object alive if it is otherwise closed. This
186            // is only valid since we know the current open count is non-zero, otherwise we might
187            // increment the open count after the blob has been purged.
188            blob.open_count_add_one();
189        }
190        Ok(child_vmo)
191    }
192}
193
impl FxNode for FxBlob {
    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn parent(&self) -> Option<Arc<FxDirectory>> {
        unreachable!(); // Add a parent back-reference if needed.
    }

    fn set_parent(&self, _parent: Arc<FxDirectory>) {
        // NOP
    }

    fn open_count_add_one(&self) {
        let old = self.open_count.fetch_add(1, Ordering::Relaxed);
        // The count must not be reopened after purge (PURGED with a zero count) and must not
        // overflow into the PURGED bit.
        assert!(old != PURGED && old != PURGED - 1);
    }

    fn open_count_sub_one(self: Arc<Self>) {
        let old = self.open_count.fetch_sub(1, Ordering::Relaxed);
        // The count (ignoring the PURGED bit) must have been non-zero.
        assert!(old & !PURGED > 0);
        // If this was the last open count on a purged blob, queue it for tombstoning.
        if old == PURGED + 1 {
            let store = self.handle.store();
            store
                .filesystem()
                .graveyard()
                .queue_tombstone_object(store.store_object_id(), self.object_id());
        }
    }

    fn object_descriptor(&self) -> ObjectDescriptor {
        ObjectDescriptor::File
    }

    fn terminate(&self) {
        self.pager_packet_receiver_registration.stop_watching_for_zero_children();
    }

    fn mark_to_be_purged(&self) {
        // Set the PURGED bit; it must not have been set already.
        let old = self.open_count.fetch_or(PURGED, Ordering::Relaxed);
        assert!(old & PURGED == 0);
        // If there were no open counts, tombstone immediately; otherwise `open_count_sub_one`
        // will do it when the count reaches zero.
        if old == 0 {
            let store = self.handle.store();
            store
                .filesystem()
                .graveyard()
                .queue_tombstone_object(store.store_object_id(), self.object_id());
        }
    }
}
244
impl PagerBacked for FxBlob {
    fn pager(&self) -> &crate::pager::Pager {
        self.handle.owner().pager()
    }

    fn pager_packet_receiver_registration(&self) -> &PagerPacketReceiverRegistration<Self> {
        &self.pager_packet_receiver_registration
    }

    fn vmo(&self) -> &zx::Vmo {
        &self.vmo
    }

    fn page_in(self: Arc<Self>, range: PageInRange<Self>) {
        // For compressed blobs the read-ahead must be a multiple of the chunk size so that
        // decompression boundaries line up.
        let read_ahead_size = if let Some(compression_info) = &self.compression_info {
            read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
        } else {
            READ_AHEAD_SIZE
        };
        // Delegate to the generic page handling code.
        default_page_in(self, range, read_ahead_size)
    }

    fn mark_dirty(self: Arc<Self>, _range: MarkDirtyRange<Self>) {
        // Blobs are immutable, so the pager should never report dirty pages.
        unreachable!();
    }

    fn on_zero_children(self: Arc<Self>) {
        // Drop the open count taken in `create_child_vmo` now that all child VMOs are gone.
        self.open_count_sub_one();
    }

    fn byte_size(&self) -> u64 {
        self.uncompressed_size
    }

    /// Reads (and, for compressed blobs, decompresses) `range`, verifying the result against the
    /// merkle tree before returning it.  `range` is expected to be aligned by the caller; the
    /// tail beyond the blob's size is zeroed.
    async fn aligned_read(&self, range: Range<u64>) -> Result<buffer::Buffer<'_>, Error> {
        self.record_page_fault_metric(&range);

        let mut buffer = self.handle.allocate_buffer((range.end - range.start) as usize).await;
        let read = match &self.compression_info {
            // Uncompressed blobs are read straight into the output buffer.
            None => self.handle.read(range.start, buffer.as_mut()).await?,
            Some(compression_info) => {
                // Map the uncompressed range to the corresponding compressed byte range; a `None`
                // end means the range extends to the end of the stored data.
                let compressed_offsets =
                    match compression_info.compressed_range_for_uncompressed_range(&range)? {
                        (start, None) => start..self.handle.get_size(),
                        (start, Some(end)) => start..end.get(),
                    };
                // Widen to device block alignment for the actual read.
                let bs = self.handle.block_size();
                let aligned = round_down(compressed_offsets.start, bs)
                    ..round_up(compressed_offsets.end, bs).unwrap();
                let mut compressed_buf =
                    self.handle.allocate_buffer((aligned.end - aligned.start) as usize).await;

                let mut decompression_errors = 0;
                // Number of valid output bytes (the last chunk may extend past the blob size).
                let len = (std::cmp::min(range.end, self.uncompressed_size) - range.start) as usize;
                // Decompression is retried once on failure; a crash report is filed either way.
                loop {
                    // Read the compressed bytes while concurrently committing the output range.
                    let (read, _) = try_join!(
                        self.handle.read(aligned.start, compressed_buf.as_mut()),
                        async {
                            buffer
                                .allocator()
                                .buffer_source()
                                .commit_range(buffer.range())
                                .map_err(|e| e.into())
                        }
                    )
                    .with_context(|| {
                        format!(
                            "Failed to read compressed range {:?}, len {}",
                            aligned,
                            self.handle.get_size()
                        )
                    })?;
                    // Slice of `compressed_buf` holding exactly the compressed payload.
                    let compressed_buf_range = (compressed_offsets.start - aligned.start) as usize
                        ..(compressed_offsets.end - aligned.start) as usize;
                    ensure!(
                        read >= compressed_buf_range.end - compressed_buf_range.start,
                        anyhow!(FxfsError::Inconsistent).context(format!(
                            "Unexpected EOF, read {}, but expected {}",
                            read,
                            compressed_buf_range.end - compressed_buf_range.start,
                        ))
                    );

                    let buf = buffer.as_mut_slice();
                    let decompression_result = {
                        fxfs_trace::duration!("blob-decompress", "len" => len);
                        compression_info.decompress(
                            &compressed_buf.as_slice()[compressed_buf_range],
                            &mut buf[..len],
                            range.start,
                        )
                    };
                    match decompression_result {
                        Ok(()) => break,
                        Err(error) => {
                            record_decompression_error_crash_report(
                                compressed_buf.as_slice(),
                                &range,
                                &compressed_offsets,
                                &self.merkle_root,
                            )
                            .await;
                            decompression_errors += 1;
                            // Give up after the second failure.
                            if decompression_errors == 2 {
                                bail!(
                                    anyhow!(FxfsError::IntegrityError)
                                        .context(format!("Decompression error: {error:?}"))
                                );
                            } else {
                                warn!(error:?; "Decompression error; retrying");
                            }
                        }
                    }
                } // loop
                if decompression_errors > 0 {
                    info!("Read succeeded on second attempt");
                }
                len
            }
        };
        {
            // TODO(https://fxbug.dev/42073035): This should be offloaded to the kernel at which
            // point we can delete this.
            fxfs_trace::duration!("blob-verify", "len" => read);
            self.merkle_verifier.verify(range.start as usize, &buffer.as_slice()[..read])?;
        }
        // Zero the tail.
        buffer.as_mut_slice()[read..].fill(0);
        Ok(buffer)
    }
}
377
/// Describes how a compressed blob's chunks are laid out on disk, along with the decompressor
/// used to read them back.  Offsets are split into 32-bit and 64-bit arrays to save memory (see
/// [`CompressionInfo::new`]).
pub struct CompressionInfo {
    // Uncompressed size of each chunk (the last chunk may be shorter).
    chunk_size: u64,
    // The chunked compression format stores 0 as the first offset but it's not stored here. Not
    // storing the 0 avoids the allocation for blobs smaller than the chunk size.
    small_offsets: Box<[u32]>,
    // Offsets that don't fit in a u32; logically follow `small_offsets`.
    large_offsets: Box<[u64]>,
    decompressor: ThreadLocalDecompressor,
}
386
impl CompressionInfo {
    /// Validates `offsets` (the compressed byte offset of each chunk, which must start at 0 and
    /// be strictly ascending) and builds a [`CompressionInfo`], splitting the offsets into u32
    /// and u64 storage to minimize memory use.
    ///
    /// Returns `FxfsError::IntegrityError` if `chunk_size` is 0 or the offsets are invalid.
    pub fn new(
        chunk_size: u64,
        offsets: &[u64],
        compression_algorithm: CompressionAlgorithm,
    ) -> Result<Self, Error> {
        let decompressor = compression_algorithm.thread_local_decompressor();
        if chunk_size == 0 {
            return Err(FxfsError::IntegrityError.into());
        } else if offsets.is_empty() || *offsets.first().unwrap() != 0 {
            // There should always be at least 1 offset and the first offset must always be 0.
            Err(FxfsError::IntegrityError.into())
        } else if !offsets.array_windows().all(|[a, b]| a < b) {
            // The offsets must be in ascending order.
            Err(FxfsError::IntegrityError.into())
        } else if offsets.len() == 1 {
            // Simple case where the blob is smaller than the chunk size so only the 0 offset is
            // present. The 0 isn't stored so no allocation is necessary.
            Ok(Self {
                chunk_size,
                small_offsets: Box::default(),
                large_offsets: Box::default(),
                decompressor,
            })
        } else if *offsets.last().unwrap() <= u32::MAX as u64 {
            // Check the last index first since most compressed blobs are going to be smaller
            // than 4GiB making all offsets small.
            Ok(Self {
                chunk_size,
                small_offsets: offsets[1..].iter().map(|x| *x as u32).collect(),
                large_offsets: Box::default(),
                decompressor,
            })
        } else {
            // The partition point is the index of the first compressed offset that's > u32::MAX.
            let partition_point = offsets.partition_point(|&x| x <= u32::MAX as u64);
            Ok(Self {
                chunk_size,
                small_offsets: offsets[1..partition_point].iter().map(|x| *x as u32).collect(),
                large_offsets: offsets[partition_point..].into(),
                decompressor,
            })
        }
    }

    /// Maps an uncompressed byte `range` (start must be chunk aligned, start < end) to the
    /// compressed byte range holding it.  Returns the start offset and, unless the range extends
    /// into the final chunk, the exclusive end offset.
    fn compressed_range_for_uncompressed_range(
        &self,
        range: &Range<u64>,
    ) -> Result<(u64, Option<NonZero<u64>>), Error> {
        ensure!(range.start.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
        ensure!(range.start < range.end, FxfsError::Inconsistent);

        let start_chunk_index = (range.start / self.chunk_size) as usize;
        let start_offset = self
            .compressed_offset_for_chunk_index(start_chunk_index)
            .ok_or(FxfsError::OutOfRange)?;

        // The end of the range may not be aligned to the chunk size for the last chunk.
        let end_chunk_index = range.end.div_ceil(self.chunk_size) as usize;
        let end_offset = match self.compressed_offset_for_chunk_index(end_chunk_index) {
            None => None,
            Some(offset) => {
                // This isn't the last chunk so the end must be aligned.
                ensure!(range.end.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
                // `CompressionInfo::new` validates that all of the offsets are ascending. The end
                // of the range is greater than the start so this can never be 0.
                Some(NonZero::new(offset).unwrap())
            }
        };
        Ok((start_offset, end_offset))
    }

    /// Returns the compressed byte offset of chunk `chunk_index`, or `None` if the index is past
    /// the last chunk.
    fn compressed_offset_for_chunk_index(&self, chunk_index: usize) -> Option<u64> {
        // The "0" compressed offset isn't stored so all of the indices are shifted left by 1.
        if chunk_index == 0 {
            Some(0)
        } else if chunk_index - 1 < self.small_offsets.len() {
            Some(self.small_offsets[chunk_index - 1] as u64)
        } else if chunk_index - 1 - self.small_offsets.len() < self.large_offsets.len() {
            Some(self.large_offsets[chunk_index - 1 - self.small_offsets.len()])
        } else {
            None
        }
    }

    /// Decompress the bytes of `src` into `dst`.
    ///   - `src` is allowed to span multiple chunks.
    ///   - `dst` must have the exact size of the uncompressed bytes.
    ///   - `dst_start_offset` is the location of the uncompressed bytes within the blob and must be
    ///     chunk aligned. This is necessary for determining the chunk boundaries in `src`.
    fn decompress(
        &self,
        mut src: &[u8],
        mut dst: &mut [u8],
        dst_start_offset: u64,
    ) -> Result<(), Error> {
        ensure!(dst_start_offset.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);

        let start_chunk_index = (dst_start_offset / self.chunk_size) as usize;
        let chunk_count = dst.len().div_ceil(self.chunk_size as usize);
        let mut start_offset = self
            .compressed_offset_for_chunk_index(start_chunk_index)
            .ok_or(FxfsError::Inconsistent)?;

        // Decompress each chunk individually.
        for chunk_index in start_chunk_index..(start_chunk_index + chunk_count) {
            match self.compressed_offset_for_chunk_index(chunk_index + 1) {
                // Not the last chunk: split off exactly this chunk's compressed bytes and
                // exactly `chunk_size` output bytes.
                Some(end_offset) => {
                    let (to_decompress, src_remaining) = src
                        .split_at_checked((end_offset - start_offset) as usize)
                        .ok_or(FxfsError::Inconsistent)?;
                    let (to_decompress_into, dst_remaining) = dst
                        .split_at_mut_checked(self.chunk_size as usize)
                        .ok_or(FxfsError::Inconsistent)?;

                    let decompressed_bytes = self.decompressor.decompress_into(
                        to_decompress,
                        to_decompress_into,
                        chunk_index,
                    )?;
                    ensure!(
                        decompressed_bytes == to_decompress_into.len(),
                        FxfsError::Inconsistent
                    );
                    src = src_remaining;
                    dst = dst_remaining;
                    start_offset = end_offset;
                }
                // Last chunk: consume whatever remains of `src` and `dst`.
                None => {
                    let decompressed_bytes =
                        self.decompressor.decompress_into(src, dst, chunk_index)?;
                    ensure!(decompressed_bytes == dst.len(), FxfsError::Inconsistent);
                }
            }
        }

        Ok(())
    }
}
526
527fn set_vmo_name(vmo: &zx::Vmo, merkle_root: &Hash) {
528    let trimmed_merkle = &merkle_root.to_string()[0..BLOB_NAME_HASH_LENGTH];
529    let name = format!("{BLOB_NAME_PREFIX}{trimmed_merkle}");
530    let name = zx::Name::new(&name).unwrap();
531    vmo.set_name(&name).unwrap();
532}
533
534fn min_chunk_size(compression_info: &Option<CompressionInfo>) -> u64 {
535    if let Some(compression_info) = compression_info {
536        read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
537    } else {
538        READ_AHEAD_SIZE
539    }
540}
541
542fn read_ahead_size_for_chunk_size(chunk_size: u64, suggested_read_ahead_size: u64) -> u64 {
543    if chunk_size >= suggested_read_ahead_size {
544        chunk_size
545    } else {
546        round_down(suggested_read_ahead_size, chunk_size)
547    }
548}
549
/// Files a crash report (at most once per process, via `DONE_ONCE`) containing the compressed
/// bytes and the ranges involved in a decompression failure, to aid offline debugging.
///
/// Failures to connect or file the report are logged but otherwise ignored.
async fn record_decompression_error_crash_report(
    compressed_buf: &[u8],
    uncompressed_offsets: &Range<u64>,
    compressed_offsets: &Range<u64>,
    merkle_root: &Hash,
) {
    // Only ever file one report per process to avoid spamming the crash reporter.
    static DONE_ONCE: AtomicBool = AtomicBool::new(false);
    if !DONE_ONCE.swap(true, Ordering::Relaxed) {
        if let Ok(proxy) = connect_to_protocol::<fidl_fuchsia_feedback::CrashReporterMarker>() {
            // Attach the raw compressed data via a VMO so it appears in the report.
            let size = compressed_buf.len() as u64;
            let vmo = zx::Vmo::create(size).unwrap();
            vmo.write(compressed_buf, 0).unwrap();
            if let Err(e) = proxy
                .file_report(CrashReport {
                    program_name: Some("fxfs".to_string()),
                    crash_signature: Some("fuchsia-fxfs-decompression_error".to_string()),
                    is_fatal: Some(false),
                    annotations: Some(vec![
                        Annotation {
                            key: "fxfs.range".to_string(),
                            value: format!("{:?}", uncompressed_offsets),
                        },
                        Annotation {
                            key: "fxfs.compressed_offsets".to_string(),
                            value: format!("{:?}", compressed_offsets),
                        },
                        Annotation {
                            key: "fxfs.merkle_root".to_string(),
                            value: format!("{}", merkle_root),
                        },
                    ]),
                    attachments: Some(vec![Attachment {
                        key: "fxfs_compressed_data".to_string(),
                        value: Buffer { vmo, size },
                    }]),
                    ..Default::default()
                })
                .await
            {
                error!(e:?; "Failed to file crash report");
            } else {
                warn!("Filed crash report for decompression error");
            }
        } else {
            error!("Failed to connect to crash report service");
        }
    }
}
598
599#[cfg(test)]
600mod tests {
601    use super::*;
602    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture};
603    use crate::fuchsia::pager::PageInRange;
604    use crate::fxblob::testing::open_blob_fixture;
605    use assert_matches::assert_matches;
606    use delivery_blob::CompressionMode;
607    use delivery_blob::compression::{ChunkedArchiveOptions, CompressionAlgorithm};
608    use fuchsia_async as fasync;
609    use fuchsia_async::epoch::Epoch;
610    use fxfs_make_blob_image::FxBlobBuilder;
611    use storage_device::DeviceHolder;
612    use storage_device::fake_device::FakeDevice;
613
614    const CHUNK_SIZE: usize = 32 * 1024;
615
616    #[fasync::run(10, test)]
617    async fn test_empty_blob() {
618        let fixture = new_blob_fixture().await;
619
620        let data = vec![];
621        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
622        assert_eq!(fixture.read_blob(hash).await, data);
623
624        fixture.close().await;
625    }
626
627    #[fasync::run(10, test)]
628    async fn test_large_blob() {
629        let fixture = new_blob_fixture().await;
630
631        let data = vec![3; 3_000_000];
632        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
633
634        assert_eq!(fixture.read_blob(hash).await, data);
635
636        fixture.close().await;
637    }
638
639    #[fasync::run(10, test)]
640    async fn test_large_compressed_blob() {
641        let fixture = new_blob_fixture().await;
642
643        let data = vec![3; 3_000_000];
644        let hash = fixture.write_blob(&data, CompressionMode::Always).await;
645
646        assert_eq!(fixture.read_blob(hash).await, data);
647
648        fixture.close().await;
649    }
650
651    #[fasync::run(10, test)]
652    async fn test_non_page_aligned_blob() {
653        let fixture = new_blob_fixture().await;
654
655        let page_size = zx::system_get_page_size() as usize;
656        let data = vec![0xffu8; page_size - 1];
657        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
658        assert_eq!(fixture.read_blob(hash).await, data);
659
660        {
661            let vmo = fixture.get_blob_vmo(hash).await;
662            let mut buf = vec![0x11u8; page_size];
663            vmo.read(&mut buf[..], 0).expect("vmo read failed");
664            assert_eq!(data, buf[..data.len()]);
665            // Ensure the tail is zeroed
666            assert_eq!(buf[data.len()], 0);
667        }
668
669        fixture.close().await;
670    }
671
672    #[fasync::run(10, test)]
673    async fn test_blob_invalid_contents() {
674        let fixture = new_blob_fixture().await;
675
676        let data = vec![0xffu8; (READ_AHEAD_SIZE + BLOCK_SIZE) as usize];
677        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
678        let name = format!("{}", hash);
679
680        {
681            // Overwrite the second read-ahead window.  The first window should successfully verify.
682            let handle = fixture.get_blob_handle(&name).await;
683            let mut transaction =
684                handle.new_transaction().await.expect("failed to create transaction");
685            let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
686            buf.as_mut_slice().fill(0);
687            handle
688                .txn_write(&mut transaction, READ_AHEAD_SIZE, buf.as_ref())
689                .await
690                .expect("txn_write failed");
691            transaction.commit().await.expect("failed to commit transaction");
692        }
693
694        {
695            let blob_vmo = fixture.get_blob_vmo(hash).await;
696            let mut buf = vec![0; BLOCK_SIZE as usize];
697            assert_matches!(blob_vmo.read(&mut buf[..], 0), Ok(_));
698            assert_matches!(
699                blob_vmo.read(&mut buf[..], READ_AHEAD_SIZE),
700                Err(zx::Status::IO_DATA_INTEGRITY)
701            );
702        }
703
704        fixture.close().await;
705    }
706
707    #[fasync::run(10, test)]
708    async fn test_lz4_blob() {
709        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
710        let blob_data = vec![0xAA; 68 * 1024];
711        let fxblob_builder = FxBlobBuilder::new(device).await.unwrap();
712        let blob = fxblob_builder
713            .generate_blob(blob_data.clone(), Some(CompressionAlgorithm::Lz4))
714            .unwrap();
715        let blob_hash = blob.hash();
716        fxblob_builder.install_blob(&blob).await.unwrap();
717        let device = fxblob_builder.finalize().await.unwrap().0;
718        device.reopen(/*read_only=*/ false);
719        let fixture = open_blob_fixture(device).await;
720
721        assert_eq!(fixture.read_blob(blob_hash).await, blob_data);
722
723        fixture.close().await;
724    }
725
726    #[fasync::run(10, test)]
727    async fn test_blob_vmos_are_immutable() {
728        let fixture = new_blob_fixture().await;
729
730        let data = vec![0xffu8; 500];
731        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
732        let blob_vmo = fixture.get_blob_vmo(hash).await;
733
734        // The VMO shouldn't be resizable.
735        assert_matches!(blob_vmo.set_size(20), Err(_));
736
737        // The VMO shouldn't be writable.
738        assert_matches!(blob_vmo.write(b"overwrite", 0), Err(_));
739
740        // The VMO's content size shouldn't be modifiable.
741        assert_matches!(blob_vmo.set_stream_size(20), Err(_));
742
743        fixture.close().await;
744    }
745
746    const COMPRESSED_BLOB_CHUNK_SIZE: u64 = 32 * 1024;
747    const MAX_SMALL_OFFSET: u64 = u32::MAX as u64;
748    const ZSTD: CompressionAlgorithm = CompressionAlgorithm::Zstd;
749
750    #[fuchsia::test]
751    fn test_compression_info_offsets_must_start_with_zero() {
752        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[], ZSTD).is_err());
753        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[1], ZSTD).is_err());
754        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).is_ok());
755    }
756
757    #[fuchsia::test]
758    fn test_compression_info_offsets_must_be_sorted() {
759        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 2], ZSTD).is_ok());
760        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 2, 1], ZSTD).is_err());
761        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 1], ZSTD).is_err());
762    }
763
764    #[fuchsia::test]
765    fn test_compression_info_splitting_offsets() {
766        // Single chunk blob doesn't store any offsets.
767        let compression_info =
768            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).unwrap();
769        assert!(compression_info.small_offsets.is_empty());
770        assert!(compression_info.large_offsets.is_empty());
771
772        // Single small offset.
773        let compression_info =
774            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10], ZSTD).unwrap();
775        assert_eq!(&*compression_info.small_offsets, &[10]);
776        assert!(compression_info.large_offsets.is_empty());
777
778        // Multiple small offsets.
779        let compression_info =
780            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10, 20, 30], ZSTD).unwrap();
781        assert_eq!(&*compression_info.small_offsets, &[10, 20, 30]);
782        assert!(compression_info.large_offsets.is_empty());
783
784        // One less than the largest small offset.
785        let compression_info =
786            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET - 1], ZSTD)
787                .unwrap();
788        assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1]);
789        assert!(compression_info.large_offsets.is_empty());
790
791        // The largest small offset.
792        let compression_info =
793            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET], ZSTD).unwrap();
794        assert_eq!(&*compression_info.small_offsets, &[u32::MAX]);
795        assert!(compression_info.large_offsets.is_empty());
796
797        // The smallest large offset.
798        let compression_info =
799            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 1], ZSTD)
800                .unwrap();
801        assert!(compression_info.small_offsets.is_empty());
802        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
803
804        // Multiple offsets around boundary between small and large offsets.
805        let compression_info = CompressionInfo::new(
806            COMPRESSED_BLOB_CHUNK_SIZE,
807            &[0, MAX_SMALL_OFFSET - 1, MAX_SMALL_OFFSET, MAX_SMALL_OFFSET + 1],
808            ZSTD,
809        )
810        .unwrap();
811        assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1, u32::MAX]);
812        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);
813
814        // Single large offset.
815        let compression_info =
816            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 10], ZSTD)
817                .unwrap();
818        assert!(compression_info.small_offsets.is_empty());
819        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 10]);
820
821        // Multiple large offsets.
822        let compression_info = CompressionInfo::new(
823            COMPRESSED_BLOB_CHUNK_SIZE,
824            &[0, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
825            ZSTD,
826        )
827        .unwrap();
828        assert!(compression_info.small_offsets.is_empty());
829        assert_eq!(
830            &*compression_info.large_offsets,
831            &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
832        );
833
834        // Small and large offsets.
835        let compression_info = CompressionInfo::new(
836            COMPRESSED_BLOB_CHUNK_SIZE,
837            &[0, 10, 20, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
838            ZSTD,
839        )
840        .unwrap();
841        assert_eq!(&*compression_info.small_offsets, &[10, 20]);
842        assert_eq!(
843            &*compression_info.large_offsets,
844            &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
845        );
846    }
847
848    #[fuchsia::test]
849    fn test_compression_info_compressed_range_for_uncompressed_range() {
850        fn check_compression_ranges(
851            offsets: &[u64],
852            expected_ranges: &[(u64, Option<u64>)],
853            chunk_size: u64,
854            read_ahead_size: u64,
855        ) {
856            let compression_info = CompressionInfo::new(chunk_size, offsets, ZSTD).unwrap();
857            for (i, range) in expected_ranges.iter().enumerate() {
858                let i = i as u64;
859                let result = compression_info
860                    .compressed_range_for_uncompressed_range(
861                        &(i * read_ahead_size..(i + 1) * read_ahead_size),
862                    )
863                    .unwrap();
864                assert_eq!(result, (range.0, range.1.map(|end| NonZero::new(end).unwrap())));
865            }
866        }
867        check_compression_ranges(
868            &[0, 10, 20, 30],
869            &[(0, Some(10)), (10, Some(20)), (20, Some(30)), (30, None)],
870            COMPRESSED_BLOB_CHUNK_SIZE,
871            COMPRESSED_BLOB_CHUNK_SIZE,
872        );
873        check_compression_ranges(
874            &[0, 10, 20, 30],
875            &[(0, Some(20)), (20, None)],
876            COMPRESSED_BLOB_CHUNK_SIZE,
877            COMPRESSED_BLOB_CHUNK_SIZE * 2,
878        );
879        check_compression_ranges(
880            &[0, 10, 20, 30],
881            &[(0, None)],
882            COMPRESSED_BLOB_CHUNK_SIZE,
883            COMPRESSED_BLOB_CHUNK_SIZE * 4,
884        );
885        check_compression_ranges(
886            &[0, 10, 20, 30, MAX_SMALL_OFFSET + 10],
887            &[(0, Some(MAX_SMALL_OFFSET + 10)), (MAX_SMALL_OFFSET + 10, None)],
888            COMPRESSED_BLOB_CHUNK_SIZE,
889            COMPRESSED_BLOB_CHUNK_SIZE * 4,
890        );
891        check_compression_ranges(
892            &[
893                0,
894                10,
895                20,
896                30,
897                MAX_SMALL_OFFSET + 10,
898                MAX_SMALL_OFFSET + 20,
899                MAX_SMALL_OFFSET + 30,
900                MAX_SMALL_OFFSET + 40,
901                MAX_SMALL_OFFSET + 50,
902            ],
903            &[
904                (0, Some(20)),
905                (20, Some(MAX_SMALL_OFFSET + 10)),
906                (MAX_SMALL_OFFSET + 10, Some(MAX_SMALL_OFFSET + 30)),
907                (MAX_SMALL_OFFSET + 30, Some(MAX_SMALL_OFFSET + 50)),
908            ],
909            COMPRESSED_BLOB_CHUNK_SIZE,
910            COMPRESSED_BLOB_CHUNK_SIZE * 2,
911        );
912    }
913
914    #[fuchsia::test]
915    fn test_compression_info_compressed_range_for_uncompressed_range_errors() {
916        let compression_info = CompressionInfo::new(
917            COMPRESSED_BLOB_CHUNK_SIZE,
918            &[
919                0,
920                10,
921                20,
922                30,
923                MAX_SMALL_OFFSET + 10,
924                MAX_SMALL_OFFSET + 20,
925                MAX_SMALL_OFFSET + 30,
926                MAX_SMALL_OFFSET + 40,
927                MAX_SMALL_OFFSET + 50,
928            ],
929            ZSTD,
930        )
931        .unwrap();
932
933        // The start of reads must be chunk aligned.
934        assert!(
935            compression_info
936                .compressed_range_for_uncompressed_range(&(1..COMPRESSED_BLOB_CHUNK_SIZE),)
937                .is_err()
938        );
939
940        // Reading entirely past the last offset isn't allowed.
941        assert!(
942            compression_info
943                .compressed_range_for_uncompressed_range(
944                    &(COMPRESSED_BLOB_CHUNK_SIZE * 9..COMPRESSED_BLOB_CHUNK_SIZE * 12),
945                )
946                .is_err()
947        );
948
949        // Reading a different amount than the read-ahead size isn't allowed for middle offsets.
950        assert!(
951            compression_info
952                .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE + 1),)
953                .is_err()
954        );
955        assert!(
956            compression_info
957                .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE - 1),)
958                .is_err()
959        );
960        assert!(
961            compression_info
962                .compressed_range_for_uncompressed_range(
963                    &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 + 1),
964                )
965                .is_err()
966        );
967        assert!(
968            compression_info
969                .compressed_range_for_uncompressed_range(
970                    &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 - 1),
971                )
972                .is_err()
973        );
974
975        // Reading less than the read-ahead size for the last offset is allowed.
976        assert!(
977            compression_info
978                .compressed_range_for_uncompressed_range(
979                    &(COMPRESSED_BLOB_CHUNK_SIZE * 8..COMPRESSED_BLOB_CHUNK_SIZE * 8 + 4096),
980                )
981                .is_ok()
982        );
983    }
984
985    #[fuchsia::test]
986    fn test_read_ahead_size_for_chunk_size() {
987        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 32 * 1024), 32 * 1024);
988        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 32 * 1024), 48 * 1024);
989        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 32 * 1024), 64 * 1024);
990
991        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 64 * 1024), 64 * 1024);
992        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 64 * 1024), 48 * 1024);
993        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 64 * 1024), 64 * 1024);
994        assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 64 * 1024), 96 * 1024);
995
996        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 128 * 1024), 128 * 1024);
997        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 128 * 1024), 96 * 1024);
998        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 128 * 1024), 128 * 1024);
999        assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 128 * 1024), 96 * 1024);
1000    }
1001
    /// Builds a `CompressionInfo` plus the matching compressed and uncompressed buffers for a
    /// blob of `size` bytes.  The payload is a deterministic sequence of short runs of repeated
    /// bytes (so it compresses, but not trivially), split into `CHUNK_SIZE` chunks that are each
    /// compressed independently with LZ4.
    ///
    /// Returns `(compression_info, compressed_data, uncompressed_data)`.
    fn build_compression_info(size: usize) -> (CompressionInfo, Vec<u8>, Vec<u8>) {
        let options =
            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
        let mut compressor = options.compressor();
        let mut uncompressed_data = Vec::with_capacity(size);
        {
            // Generate runs of a repeated byte, cycling both the run length and the byte value so
            // the data isn't a single uniform pattern.
            let mut run_length = 1;
            let mut run_value: u8 = 0;
            while uncompressed_data.len() < size {
                uncompressed_data
                    .resize(std::cmp::min(uncompressed_data.len() + run_length, size), run_value);
                run_length = (run_length + 1) % 19 + 1;
                run_value = (run_value + 1) % 17;
            }
        }
        // Record the compressed start offset of each chunk.  Each iteration pushes the offset of
        // the *next* chunk, so the final push is the total compressed length and is popped below,
        // leaving only chunk start offsets (beginning with the required 0).
        let mut compressed_offsets = vec![0];
        let mut compressed_data = vec![];
        for chunk in uncompressed_data.chunks(CHUNK_SIZE) {
            let mut compressed_chunk = compressor.compress(chunk, 0).unwrap();
            compressed_data.append(&mut compressed_chunk);
            compressed_offsets.push(compressed_data.len() as u64);
        }
        compressed_offsets.pop();
        (
            CompressionInfo::new(CHUNK_SIZE as u64, &compressed_offsets, CompressionAlgorithm::Lz4)
                .unwrap(),
            compressed_data,
            uncompressed_data,
        )
    }
1032
1033    #[fuchsia::test]
1034    fn test_compression_info_decompress_single_chunk() {
1035        let (compression_info, compressed_data, uncompressed_data) =
1036            build_compression_info(CHUNK_SIZE);
1037        let mut decompressed_data = vec![0u8; CHUNK_SIZE + 1];
1038
1039        compression_info
1040            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE], 0)
1041            .expect("failed to decompress");
1042        assert_eq!(uncompressed_data, decompressed_data[0..CHUNK_SIZE]);
1043
1044        // Too small of destination buffer.
1045        compression_info
1046            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1047            .expect_err("decompression should fail");
1048
1049        // Too large of destination buffer.
1050        compression_info
1051            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
1052            .expect_err("decompression should fail");
1053    }
1054
1055    #[fuchsia::test]
1056    fn test_compression_info_decompress_multiple_chunks() {
1057        fn slice_for_chunks<'a>(
1058            compressed_data: &'a [u8],
1059            compression_info: &CompressionInfo,
1060            chunks: Range<u64>,
1061        ) -> &'a [u8] {
1062            let (start, end) = compression_info
1063                .compressed_range_for_uncompressed_range(
1064                    &(chunks.start * CHUNK_SIZE as u64..chunks.end * CHUNK_SIZE as u64),
1065                )
1066                .unwrap();
1067            let end = end.map_or(compressed_data.len() as u64, NonZero::<u64>::get);
1068            &compressed_data[start as usize..end as usize]
1069        }
1070
1071        const BLOB_SIZE: usize = CHUNK_SIZE * 4 + 4096;
1072        let (compression_info, compressed_data, uncompressed_data) =
1073            build_compression_info(BLOB_SIZE);
1074        let mut decompressed_data = vec![0u8; BLOB_SIZE];
1075
1076        // Decompress the entire blob.
1077        compression_info
1078            .decompress(&compressed_data, &mut decompressed_data, 0)
1079            .expect("failed to decompress");
1080        assert_eq!(uncompressed_data, decompressed_data);
1081
1082        // Decompress just the whole chunks.
1083        compression_info
1084            .decompress(
1085                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1086                &mut decompressed_data[0..CHUNK_SIZE * 4],
1087                0,
1088            )
1089            .expect("failed to decompress");
1090        assert_eq!(&uncompressed_data[0..CHUNK_SIZE], &decompressed_data[0..CHUNK_SIZE]);
1091
1092        // Too small of destination buffer for whole chunks.
1093        compression_info
1094            .decompress(
1095                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1096                &mut decompressed_data[0..CHUNK_SIZE * 4 - 1],
1097                0,
1098            )
1099            .expect_err("decompression should fail");
1100
1101        // Too large of destination buffer for whole chunks.
1102        compression_info
1103            .decompress(
1104                slice_for_chunks(&compressed_data, &compression_info, 0..4),
1105                &mut decompressed_data[0..CHUNK_SIZE * 4 + 1],
1106                0,
1107            )
1108            .expect_err("decompression should fail");
1109
1110        // Decompress just the tail.
1111        let partial_chunk = slice_for_chunks(&compressed_data, &compression_info, 4..5);
1112        compression_info
1113            .decompress(partial_chunk, &mut decompressed_data[0..4096], CHUNK_SIZE as u64 * 4)
1114            .expect("failed to decompress");
1115        assert_eq!(&uncompressed_data[CHUNK_SIZE * 4..], &decompressed_data[0..4096]);
1116
1117        // Too small of destination buffer for the tail.
1118        compression_info
1119            .decompress(partial_chunk, &mut decompressed_data[0..4095], CHUNK_SIZE as u64 * 4)
1120            .expect_err("decompression should fail");
1121
1122        // Too large of destination buffer for the tail.
1123        compression_info
1124            .decompress(partial_chunk, &mut decompressed_data[0..4097], CHUNK_SIZE as u64 * 4)
1125            .expect_err("decompression should fail");
1126    }
1127
    #[fasync::run(10, test)]
    async fn test_refault_metric() {
        // Verifies `chunks_supplied` tracking and the volume's "resupplied" counter: the counter
        // only increments when a page-in hits a chunk that has already been supplied once.
        let fixture = new_blob_fixture().await;
        {
            let volume = fixture.volume().volume().clone();
            // 4 read-ahead chunks, with the last chunk one page short of full.
            const FILE_SIZE: u64 = READ_AHEAD_SIZE * 4 - 4096;
            let data = vec![0xffu8; FILE_SIZE as usize];
            let hash = fixture.write_blob(&data, CompressionMode::Never).await;

            let blob = fixture.get_blob(hash).await.unwrap();
            assert_eq!(blob.chunks_supplied.len(), 4);
            // Nothing has been read yet.
            assert_eq!(&blob.chunks_supplied.get(), &[false, false, false, false]);

            // Reading one page within the first chunk supplies only that chunk.
            blob.vmo.read_to_vec::<u8>(4096, 4096).unwrap();

            assert_eq!(&blob.chunks_supplied.get(), &[true, false, false, false]);

            // A read straddling the boundary between chunks 2 and 3 supplies both.
            blob.vmo.read_to_vec::<u8>(READ_AHEAD_SIZE * 2 + 4096, READ_AHEAD_SIZE).unwrap();
            assert_eq!(&blob.chunks_supplied.get(), &[true, false, true, true]);

            // We have loaded pages, but only once each.
            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 0);

            // Re-read some pages.

            // We can't evict pages from the VMO to get the kernel to resupply them but we can call
            // page_in directly and wait for the counters to change.
            blob.clone().page_in(PageInRange::new(
                FILE_SIZE - READ_AHEAD_SIZE..FILE_SIZE,
                blob.clone(),
                Epoch::global().guard(),
            ));
            // The barrier waits for the deferred page-in work to complete before we check the
            // counter.
            Epoch::global().barrier().await;

            // The paged-in range touches chunks 2 and 3, both already supplied above, so exactly
            // two chunks count as resupplied.
            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 2);
        }

        fixture.close().await;
    }
1168}