
fxfs_platform/fuchsia/fxblob/blob.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! This module contains the [`FxBlob`] node type used to represent an immutable blob persisted to
//! disk which can be read back.

use crate::constants::*;
use crate::fuchsia::directory::FxDirectory;
use crate::fuchsia::errors::map_to_status;
use crate::fuchsia::node::{FxNode, OpenedNode};
use crate::fuchsia::pager::{
    MarkDirtyRange, PageInRange, PagerBacked, PagerPacketReceiverRegistration, default_page_in,
};
use crate::fuchsia::volume::{FxVolume, READ_AHEAD_SIZE};
use crate::fxblob::atomic_vec::AtomicBitVec;
use anyhow::{Context, Error, anyhow, bail, ensure};
use delivery_blob::compression::{CompressionAlgorithm, ThreadLocalDecompressor};
use fidl_fuchsia_feedback::{Annotation, Attachment, CrashReport};
use fidl_fuchsia_mem::Buffer;
use fuchsia_async::epoch::Epoch;
use fuchsia_component_client::connect_to_protocol;
use fuchsia_merkle::{Hash, MerkleVerifier, ReadSizedMerkleVerifier};
use futures::try_join;
use fxfs::errors::FxfsError;
use fxfs::log::*;
use fxfs::object_handle::{ObjectHandle, ReadObjectHandle};
use fxfs::object_store::{DataObjectHandle, ObjectDescriptor};
use fxfs::round::{round_down, round_up};
use fxfs_macros::ToWeakNode;
use std::num::NonZero;
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use storage_device::buffer;
use zx::{HandleBased, Status};

// When the top bit of the open count is set, it means the file has been deleted and when the count
// drops to zero, it will be tombstoned.  Once it has dropped to zero, it cannot be opened again
// (assertions will fire).
const PURGED: usize = 1 << (usize::BITS - 1);

/// Represents an immutable blob stored on Fxfs with an associated Merkle tree.
#[derive(ToWeakNode)]
pub struct FxBlob {
    handle: DataObjectHandle<FxVolume>,
    vmo: zx::Vmo,
    open_count: AtomicUsize,
    merkle_root: Hash,
    merkle_verifier: ReadSizedMerkleVerifier,
    compression_info: Option<CompressionInfo>,
    uncompressed_size: u64, // always set.
    pager_packet_receiver_registration: Arc<PagerPacketReceiverRegistration<Self>>,
    chunks_supplied: AtomicBitVec,
}

impl FxBlob {
    pub fn new(
        handle: DataObjectHandle<FxVolume>,
        merkle_root: Hash,
        merkle_verifier: MerkleVerifier,
        compression_info: Option<CompressionInfo>,
        uncompressed_size: u64,
    ) -> Result<Arc<Self>, Error> {
        let min_chunk_size = min_chunk_size(&compression_info);
        let merkle_verifier =
            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)?;
        let chunks_supplied = AtomicBitVec::new(uncompressed_size.div_ceil(min_chunk_size));

        Ok(Arc::new_cyclic(|weak| {
            let (vmo, pager_packet_receiver_registration) = handle
                .owner()
                .pager()
                .create_vmo(weak.clone(), uncompressed_size, zx::VmoOptions::empty())
                .unwrap();
            set_vmo_name(&vmo, &merkle_root);
            Self {
                handle,
                vmo,
                open_count: AtomicUsize::new(0),
                merkle_root,
                merkle_verifier,
                compression_info,
                uncompressed_size,
                pager_packet_receiver_registration: Arc::new(pager_packet_receiver_registration),
                chunks_supplied,
            }
        }))
    }

    /// Replaces this blob with a new blob backed by `handle`, reusing this blob's pager-backed
    /// VMO. Returns the new blob.
    pub fn overwrite_me(
        self: &Arc<Self>,
        handle: DataObjectHandle<FxVolume>,
        merkle_verifier: MerkleVerifier,
        compression_info: Option<CompressionInfo>,
    ) -> Arc<Self> {
        let min_chunk_size = min_chunk_size(&compression_info);
        let merkle_verifier =
            ReadSizedMerkleVerifier::new(merkle_verifier, min_chunk_size as usize)
                .expect("The chunk size should have been validated by the delivery blob parser");
        // The chunk size may have changed between the old blob and the new blob. Preserving the
        // chunks supplied bits isn't important.
        let chunks_supplied = AtomicBitVec::new(self.uncompressed_size.div_ceil(min_chunk_size));
        let vmo = self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap();

        let new_blob = Arc::new(Self {
            handle,
            vmo,
            open_count: AtomicUsize::new(0),
            merkle_root: self.merkle_root,
            merkle_verifier,
            compression_info,
            uncompressed_size: self.uncompressed_size,
            pager_packet_receiver_registration: self.pager_packet_receiver_registration.clone(),
            chunks_supplied,
        });

        // We have tests that rely on the cache being purged, and there are races where
        // `FxBlob::drop` isn't called early enough, which can make those tests flaky.
        self.handle.owner().cache().remove(self.as_ref());

        // The lock must be held until the open count is incremented to prevent concurrent
        // handling of zero-children signals.
        let receiver_lock =
            self.pager_packet_receiver_registration.receiver().set_receiver(&new_blob);
        if receiver_lock.is_strong() {
            // If a strong reference was moved between them, then the open counts are exchanged as
            // well. Only the increment needs to happen under the lock, as the new blob may handle
            // the next zero-children signal. No new requests can reach the old blob now, but to
            // safely ensure that all existing requests finish, we defer the open count decrement.
            new_blob.open_count_add_one();
            let old_blob = self.clone();
            Epoch::global().defer(move || old_blob.open_count_sub_one());
        }
        new_blob
    }

    pub fn root(&self) -> Hash {
        self.merkle_root
    }

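    /// Tracks which chunks of the blob have been supplied to the pager and increments the
    /// blob-resupplied counter for any chunk that is being supplied again.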
    fn record_page_fault_metric(&self, range: &Range<u64>) {
        let chunk_size: u64 = min_chunk_size(&self.compression_info);

        let first_chunk = range.start / chunk_size;
        // The end of the range may not be chunk aligned if it's the last chunk.
        let last_chunk = range.end.div_ceil(chunk_size);

        let supplied_count = self.chunks_supplied.test_and_set_range(first_chunk, last_chunk);

        if supplied_count > 0 {
            self.handle
                .owner()
                .blob_resupplied_count()
                .increment(supplied_count, Ordering::Relaxed);
        }
    }
}

impl Drop for FxBlob {
    fn drop(&mut self) {
        let volume = self.handle.owner();
        volume.cache().remove(self);
    }
}

impl OpenedNode<FxBlob> {
    /// Creates a read-only child VMO for this blob backed by the pager. The blob cannot be purged
    /// until all child VMOs have been destroyed.
    ///
    /// *WARNING*: We need to ensure the open count is non-zero before invoking this function, so
    /// it is only implemented for [`OpenedNode<FxBlob>`]. This prevents the blob from being purged
    /// before we get a chance to register it with the pager for [`zx::Signals::VMO_ZERO_CHILDREN`].
    pub fn create_child_vmo(&self) -> Result<zx::Vmo, Status> {
        let blob = self.0.as_ref();
        let child_vmo = blob.vmo.create_child(
            zx::VmoChildOptions::REFERENCE | zx::VmoChildOptions::NO_WRITE,
            0,
            0,
        )?;
        if blob.handle.owner().pager().watch_for_zero_children(blob).map_err(map_to_status)? {
            // Take an open count so that we keep this object alive if it is otherwise closed. This
            // is only valid since we know the current open count is non-zero, otherwise we might
            // increment the open count after the blob has been purged.
            blob.open_count_add_one();
        }
        Ok(child_vmo)
    }
}

impl FxNode for FxBlob {
    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn parent(&self) -> Option<Arc<FxDirectory>> {
        unreachable!(); // Add a parent back-reference if needed.
    }

    fn set_parent(&self, _parent: Arc<FxDirectory>) {
        // NOP
    }

    fn open_count_add_one(&self) {
        let old = self.open_count.fetch_add(1, Ordering::Relaxed);
        assert!(old != PURGED && old != PURGED - 1);
    }

    fn open_count_sub_one(self: Arc<Self>) {
        let old = self.open_count.fetch_sub(1, Ordering::Relaxed);
        assert!(old & !PURGED > 0);
        if old == PURGED + 1 {
            let store = self.handle.store();
            store
                .filesystem()
                .graveyard()
                .queue_tombstone_object(store.store_object_id(), self.object_id());
        }
    }

    fn object_descriptor(&self) -> ObjectDescriptor {
        ObjectDescriptor::File
    }

    fn terminate(&self) {
        self.pager_packet_receiver_registration.stop_watching_for_zero_children();
    }

    fn mark_to_be_purged(self: Arc<Self>) {
        let old = self.open_count.fetch_or(PURGED, Ordering::Relaxed);
        assert!(old & PURGED == 0);
        if old == 0 {
            let store = self.handle.store();
            store
                .filesystem()
                .graveyard()
                .queue_tombstone_object(store.store_object_id(), self.object_id());
        }
    }
}

impl PagerBacked for FxBlob {
    fn pager(&self) -> &crate::pager::Pager {
        self.handle.owner().pager()
    }

    fn pager_packet_receiver_registration(&self) -> &PagerPacketReceiverRegistration<Self> {
        &self.pager_packet_receiver_registration
    }

    fn vmo(&self) -> &zx::Vmo {
        &self.vmo
    }

    fn page_in(self: Arc<Self>, range: PageInRange<Self>) {
        let read_ahead_size = if let Some(compression_info) = &self.compression_info {
            read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
        } else {
            READ_AHEAD_SIZE
        };
        // Delegate to the generic page handling code.
        default_page_in(self, range, read_ahead_size)
    }

    fn mark_dirty(self: Arc<Self>, _range: MarkDirtyRange<Self>) {
        unreachable!();
    }

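    // Drops the open count taken in `create_child_vmo` once the last child VMO goes away.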
    fn on_zero_children(self: Arc<Self>) {
        self.open_count_sub_one();
    }

    fn byte_size(&self) -> u64 {
        self.uncompressed_size
    }

    async fn aligned_read(&self, range: Range<u64>) -> Result<buffer::Buffer<'_>, Error> {
        self.record_page_fault_metric(&range);

        let mut buffer = self.handle.allocate_buffer((range.end - range.start) as usize).await;
        let read = match &self.compression_info {
            None => self.handle.read(range.start, buffer.as_mut()).await?,
            Some(compression_info) => {
                let compressed_offsets =
                    match compression_info.compressed_range_for_uncompressed_range(&range)? {
                        (start, None) => start..self.handle.get_size(),
                        (start, Some(end)) => start..end.get(),
                    };
                let bs = self.handle.block_size();
                let aligned = round_down(compressed_offsets.start, bs)
                    ..round_up(compressed_offsets.end, bs).unwrap();
                let mut compressed_buf =
                    self.handle.allocate_buffer((aligned.end - aligned.start) as usize).await;

                let mut decompression_errors = 0;
                let len = (std::cmp::min(range.end, self.uncompressed_size) - range.start) as usize;
                loop {
                    let (read, _) = try_join!(
                        self.handle.read(aligned.start, compressed_buf.as_mut()),
                        async {
                            buffer
                                .allocator()
                                .buffer_source()
                                .commit_range(buffer.range())
                                .map_err(|e| e.into())
                        }
                    )
                    .with_context(|| {
                        format!(
                            "Failed to read compressed range {:?}, len {}",
                            aligned,
                            self.handle.get_size()
                        )
                    })?;
                    let compressed_buf_range = (compressed_offsets.start - aligned.start) as usize
                        ..(compressed_offsets.end - aligned.start) as usize;
                    ensure!(
                        read >= compressed_buf_range.end - compressed_buf_range.start,
                        anyhow!(FxfsError::Inconsistent).context(format!(
                            "Unexpected EOF, read {}, but expected {}",
                            read,
                            compressed_buf_range.end - compressed_buf_range.start,
                        ))
                    );

                    let buf = buffer.as_mut_slice();
                    let decompression_result = {
                        fxfs_trace::duration!("blob-decompress", "len" => len);
                        compression_info.decompress(
                            &compressed_buf.as_slice()[compressed_buf_range],
                            &mut buf[..len],
                            range.start,
                        )
                    };
                    match decompression_result {
                        Ok(()) => break,
                        Err(error) => {
                            record_decompression_error_crash_report(
                                compressed_buf.as_slice(),
                                &range,
                                &compressed_offsets,
                                &self.merkle_root,
                            )
                            .await;
                            decompression_errors += 1;
                            if decompression_errors == 2 {
                                bail!(
                                    anyhow!(FxfsError::IntegrityError)
                                        .context(format!("Decompression error: {error:?}"))
                                );
                            } else {
                                warn!(error:?; "Decompression error; retrying");
                            }
                        }
                    }
                } // loop
                if decompression_errors > 0 {
                    info!("Read succeeded on second attempt");
                }
                len
            }
        };
        {
            // TODO(https://fxbug.dev/42073035): This should be offloaded to the kernel at which
            // point we can delete this.
            fxfs_trace::duration!("blob-verify", "len" => read);
            self.merkle_verifier.verify(range.start as usize, &buffer.as_slice()[..read])?;
        }
        // Zero the tail.
        buffer.as_mut_slice()[read..].fill(0);
        Ok(buffer)
    }
}

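/// Describes how a blob was chunk-compressed: the chunk size, the compressed offset of each
/// chunk, and the decompressor to use.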
pub struct CompressionInfo {
    chunk_size: u64,
    // The chunked compression format stores 0 as the first offset but it's not stored here. Not
    // storing the 0 avoids the allocation for blobs smaller than the chunk size.
    small_offsets: Box<[u32]>,
    large_offsets: Box<[u64]>,
    decompressor: ThreadLocalDecompressor,
}

impl CompressionInfo {
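    /// Validates the compressed chunk `offsets` (there must be at least one, the first must be 0,
    /// and they must be strictly ascending) and splits them into small (`u32`) and large (`u64`)
    /// offsets to save memory.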
    pub fn new(
        chunk_size: u64,
        offsets: &[u64],
        compression_algorithm: CompressionAlgorithm,
    ) -> Result<Self, Error> {
        let decompressor = compression_algorithm.thread_local_decompressor();
        if chunk_size == 0 {
            return Err(FxfsError::IntegrityError.into());
        } else if offsets.is_empty() || *offsets.first().unwrap() != 0 {
            // There should always be at least 1 offset and the first offset must always be 0.
            Err(FxfsError::IntegrityError.into())
        } else if !offsets.array_windows().all(|[a, b]| a < b) {
            // The offsets must be in ascending order.
            Err(FxfsError::IntegrityError.into())
        } else if offsets.len() == 1 {
            // Simple case where the blob is smaller than the chunk size so only the 0 offset is
            // present. The 0 isn't stored so no allocation is necessary.
            Ok(Self {
                chunk_size,
                small_offsets: Box::default(),
                large_offsets: Box::default(),
                decompressor,
            })
        } else if *offsets.last().unwrap() <= u32::MAX as u64 {
            // Check the last offset first since most compressed blobs are going to be smaller
            // than 4GiB, making all offsets small.
            Ok(Self {
                chunk_size,
                small_offsets: offsets[1..].iter().map(|x| *x as u32).collect(),
                large_offsets: Box::default(),
                decompressor,
            })
        } else {
            // The partition point is the index of the first compressed offset that's > u32::MAX.
            let partition_point = offsets.partition_point(|&x| x <= u32::MAX as u64);
            Ok(Self {
                chunk_size,
                small_offsets: offsets[1..partition_point].iter().map(|x| *x as u32).collect(),
                large_offsets: offsets[partition_point..].into(),
                decompressor,
            })
        }
    }

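    /// Maps an uncompressed `range`, whose start must be chunk aligned, to the matching range of
    /// compressed offsets. The returned end is `None` when the range extends into the last chunk,
    /// whose compressed end isn't recorded; callers should read to the end of the compressed
    /// data.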
    fn compressed_range_for_uncompressed_range(
        &self,
        range: &Range<u64>,
    ) -> Result<(u64, Option<NonZero<u64>>), Error> {
        ensure!(range.start.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
        ensure!(range.start < range.end, FxfsError::Inconsistent);

        let start_chunk_index = (range.start / self.chunk_size) as usize;
        let start_offset = self
            .compressed_offset_for_chunk_index(start_chunk_index)
            .ok_or(FxfsError::OutOfRange)?;

        // The end of the range may not be aligned to the chunk size for the last chunk.
        let end_chunk_index = range.end.div_ceil(self.chunk_size) as usize;
        let end_offset = match self.compressed_offset_for_chunk_index(end_chunk_index) {
            None => None,
            Some(offset) => {
                // This isn't the last chunk so the end must be aligned.
                ensure!(range.end.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);
                // `CompressionInfo::new` validates that all of the offsets are ascending. The end
                // of the range is greater than the start so this can never be 0.
                Some(NonZero::new(offset).unwrap())
            }
        };
        Ok((start_offset, end_offset))
    }

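    /// Returns the compressed offset at which `chunk_index` starts, or `None` if the index is
    /// beyond the last chunk.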
    fn compressed_offset_for_chunk_index(&self, chunk_index: usize) -> Option<u64> {
        // The "0" compressed offset isn't stored, so stored offsets are indexed at
        // `chunk_index - 1`.
        if chunk_index == 0 {
            Some(0)
        } else if chunk_index - 1 < self.small_offsets.len() {
            Some(self.small_offsets[chunk_index - 1] as u64)
        } else if chunk_index - 1 - self.small_offsets.len() < self.large_offsets.len() {
            Some(self.large_offsets[chunk_index - 1 - self.small_offsets.len()])
        } else {
            None
        }
    }

    /// Decompress the bytes of `src` into `dst`.
    ///   - `src` is allowed to span multiple chunks.
    ///   - `dst` must have the exact size of the uncompressed bytes.
    ///   - `dst_start_offset` is the location of the uncompressed bytes within the blob and must be
    ///     chunk aligned. This is necessary for determining the chunk boundaries in `src`.
    fn decompress(
        &self,
        mut src: &[u8],
        mut dst: &mut [u8],
        dst_start_offset: u64,
    ) -> Result<(), Error> {
        ensure!(dst_start_offset.is_multiple_of(self.chunk_size), FxfsError::Inconsistent);

        let start_chunk_index = (dst_start_offset / self.chunk_size) as usize;
        let chunk_count = dst.len().div_ceil(self.chunk_size as usize);
        let mut start_offset = self
            .compressed_offset_for_chunk_index(start_chunk_index)
            .ok_or(FxfsError::Inconsistent)?;

        // Decompress each chunk individually.
        for chunk_index in start_chunk_index..(start_chunk_index + chunk_count) {
            match self.compressed_offset_for_chunk_index(chunk_index + 1) {
                Some(end_offset) => {
                    let (to_decompress, src_remaining) = src
                        .split_at_checked((end_offset - start_offset) as usize)
                        .ok_or(FxfsError::Inconsistent)?;
                    let (to_decompress_into, dst_remaining) = dst
                        .split_at_mut_checked(self.chunk_size as usize)
                        .ok_or(FxfsError::Inconsistent)?;

                    let decompressed_bytes = self.decompressor.decompress_into(
                        to_decompress,
                        to_decompress_into,
                        chunk_index,
                    )?;
                    ensure!(
                        decompressed_bytes == to_decompress_into.len(),
                        FxfsError::Inconsistent
                    );
                    src = src_remaining;
                    dst = dst_remaining;
                    start_offset = end_offset;
                }
                None => {
                    let decompressed_bytes =
                        self.decompressor.decompress_into(src, dst, chunk_index)?;
                    ensure!(decompressed_bytes == dst.len(), FxfsError::Inconsistent);
                }
            }
        }

        Ok(())
    }
}

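/// Names the pager-backed VMO after a truncated form of the blob's Merkle root so the blob can be
/// identified in diagnostics.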
fn set_vmo_name(vmo: &zx::Vmo, merkle_root: &Hash) {
    let trimmed_merkle = &merkle_root.to_string()[0..BLOB_NAME_HASH_LENGTH];
    let name = format!("{BLOB_NAME_PREFIX}{trimmed_merkle}");
    let name = zx::Name::new(&name).unwrap();
    vmo.set_name(&name).unwrap();
}

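/// Returns the granularity at which blob data is read in and verified: the effective read-ahead
/// size for compressed blobs, or `READ_AHEAD_SIZE` for uncompressed blobs.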
fn min_chunk_size(compression_info: &Option<CompressionInfo>) -> u64 {
    if let Some(compression_info) = compression_info {
        read_ahead_size_for_chunk_size(compression_info.chunk_size, READ_AHEAD_SIZE)
    } else {
        READ_AHEAD_SIZE
    }
}

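/// Picks a read-ahead size that is a whole multiple of `chunk_size`: the chunk size itself when
/// it is at least `suggested_read_ahead_size`, otherwise the suggested size rounded down to a
/// multiple of the chunk size (e.g. a 48KiB chunk size with a 128KiB suggestion yields 96KiB).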
fn read_ahead_size_for_chunk_size(chunk_size: u64, suggested_read_ahead_size: u64) -> u64 {
    if chunk_size >= suggested_read_ahead_size {
        chunk_size
    } else {
        round_down(suggested_read_ahead_size, chunk_size)
    }
}

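/// Files at most one crash report containing the compressed bytes and the offsets that failed to
/// decompress, to aid in debugging blob corruption.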
async fn record_decompression_error_crash_report(
    compressed_buf: &[u8],
    uncompressed_offsets: &Range<u64>,
    compressed_offsets: &Range<u64>,
    merkle_root: &Hash,
) {
    static DONE_ONCE: AtomicBool = AtomicBool::new(false);
    if !DONE_ONCE.swap(true, Ordering::Relaxed) {
        if let Ok(proxy) = connect_to_protocol::<fidl_fuchsia_feedback::CrashReporterMarker>() {
            let size = compressed_buf.len() as u64;
            let vmo = zx::Vmo::create(size).unwrap();
            vmo.write(compressed_buf, 0).unwrap();
            if let Err(e) = proxy
                .file_report(CrashReport {
                    program_name: Some("fxfs".to_string()),
                    crash_signature: Some("fuchsia-fxfs-decompression_error".to_string()),
                    is_fatal: Some(false),
                    annotations: Some(vec![
                        Annotation {
                            key: "fxfs.range".to_string(),
                            value: format!("{:?}", uncompressed_offsets),
                        },
                        Annotation {
                            key: "fxfs.compressed_offsets".to_string(),
                            value: format!("{:?}", compressed_offsets),
                        },
                        Annotation {
                            key: "fxfs.merkle_root".to_string(),
                            value: format!("{}", merkle_root),
                        },
                    ]),
                    attachments: Some(vec![Attachment {
                        key: "fxfs_compressed_data".to_string(),
                        value: Buffer { vmo, size },
                    }]),
                    ..Default::default()
                })
                .await
            {
                error!(e:?; "Failed to file crash report");
            } else {
                warn!("Filed crash report for decompression error");
            }
        } else {
            error!("Failed to connect to crash report service");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture};
    use crate::fuchsia::pager::PageInRange;
    use crate::fxblob::testing::open_blob_fixture;
    use assert_matches::assert_matches;
    use delivery_blob::CompressionMode;
    use delivery_blob::compression::{ChunkedArchiveOptions, CompressionAlgorithm};
    use fuchsia_async as fasync;
    use fuchsia_async::epoch::Epoch;
    use fxfs_make_blob_image::FxBlobBuilder;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    const BLOCK_SIZE: u64 = fuchsia_merkle::BLOCK_SIZE as u64;
    const CHUNK_SIZE: usize = 32 * 1024;

    #[fasync::run(10, test)]
    async fn test_empty_blob() {
        let fixture = new_blob_fixture().await;

        let data = vec![];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
        assert_eq!(fixture.read_blob(hash).await, data);

        fixture.close().await;
    }

    #[fasync::run(10, test)]
    async fn test_large_blob() {
        let fixture = new_blob_fixture().await;

        let data = vec![3; 3_000_000];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;

        assert_eq!(fixture.read_blob(hash).await, data);

        fixture.close().await;
    }

    #[fasync::run(10, test)]
    async fn test_large_compressed_blob() {
        let fixture = new_blob_fixture().await;

        let data = vec![3; 3_000_000];
        let hash = fixture.write_blob(&data, CompressionMode::Always).await;

        assert_eq!(fixture.read_blob(hash).await, data);

        fixture.close().await;
    }

    #[fasync::run(10, test)]
    async fn test_non_page_aligned_blob() {
        let fixture = new_blob_fixture().await;

        let page_size = zx::system_get_page_size() as usize;
        let data = vec![0xffu8; page_size - 1];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
        assert_eq!(fixture.read_blob(hash).await, data);

        {
            let vmo = fixture.get_blob_vmo(hash).await;
            let mut buf = vec![0x11u8; page_size];
            vmo.read(&mut buf[..], 0).expect("vmo read failed");
            assert_eq!(data, buf[..data.len()]);
            // Ensure the tail is zeroed.
            assert_eq!(buf[data.len()], 0);
        }

        fixture.close().await;
    }

    #[fasync::run(10, test)]
    async fn test_blob_invalid_contents() {
        let fixture = new_blob_fixture().await;

        let data = vec![0xffu8; (READ_AHEAD_SIZE + BLOCK_SIZE) as usize];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
        let name = format!("{}", hash);

        {
            // Overwrite the second read-ahead window.  The first window should successfully verify.
            let handle = fixture.get_blob_handle(&name).await;
            let mut transaction =
                handle.new_transaction().await.expect("failed to create transaction");
            let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
            buf.as_mut_slice().fill(0);
            handle
                .txn_write(&mut transaction, READ_AHEAD_SIZE, buf.as_ref())
                .await
                .expect("txn_write failed");
            transaction.commit().await.expect("failed to commit transaction");
        }

        {
            let blob_vmo = fixture.get_blob_vmo(hash).await;
            let mut buf = vec![0; BLOCK_SIZE as usize];
            assert_matches!(blob_vmo.read(&mut buf[..], 0), Ok(_));
            assert_matches!(
                blob_vmo.read(&mut buf[..], READ_AHEAD_SIZE),
                Err(zx::Status::IO_DATA_INTEGRITY)
            );
        }

        fixture.close().await;
    }

    #[fasync::run(10, test)]
    async fn test_lz4_blob() {
        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
        let blob_data = vec![0xAA; 68 * 1024];
        let fxblob_builder = FxBlobBuilder::new(device).await.unwrap();
        let blob = fxblob_builder
            .generate_blob(blob_data.clone(), Some(CompressionAlgorithm::Lz4))
            .unwrap();
        let blob_hash = blob.hash();
        fxblob_builder.install_blob(&blob).await.unwrap();
        let device = fxblob_builder.finalize().await.unwrap().0;
        device.reopen(/*read_only=*/ false);
        let fixture = open_blob_fixture(device).await;

        assert_eq!(fixture.read_blob(blob_hash).await, blob_data);

        fixture.close().await;
    }

    #[fasync::run(10, test)]
    async fn test_blob_vmos_are_immutable() {
        let fixture = new_blob_fixture().await;

        let data = vec![0xffu8; 500];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
        let blob_vmo = fixture.get_blob_vmo(hash).await;

        // The VMO shouldn't be resizable.
        assert_matches!(blob_vmo.set_size(20), Err(_));

        // The VMO shouldn't be writable.
        assert_matches!(blob_vmo.write(b"overwrite", 0), Err(_));

        // The VMO's content size shouldn't be modifiable.
        assert_matches!(blob_vmo.set_stream_size(20), Err(_));

        fixture.close().await;
    }

    const COMPRESSED_BLOB_CHUNK_SIZE: u64 = 32 * 1024;
    const MAX_SMALL_OFFSET: u64 = u32::MAX as u64;
    const ZSTD: CompressionAlgorithm = CompressionAlgorithm::Zstd;

    #[fuchsia::test]
    fn test_compression_info_offsets_must_start_with_zero() {
        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[], ZSTD).is_err());
        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[1], ZSTD).is_err());
        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).is_ok());
    }

    #[fuchsia::test]
    fn test_compression_info_offsets_must_be_sorted() {
        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 2], ZSTD).is_ok());
        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 2, 1], ZSTD).is_err());
        assert!(CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 1, 1], ZSTD).is_err());
    }

    #[fuchsia::test]
    fn test_compression_info_splitting_offsets() {
        // Single chunk blob doesn't store any offsets.
        let compression_info =
            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0], ZSTD).unwrap();
        assert!(compression_info.small_offsets.is_empty());
        assert!(compression_info.large_offsets.is_empty());

        // Single small offset.
        let compression_info =
            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10], ZSTD).unwrap();
        assert_eq!(&*compression_info.small_offsets, &[10]);
        assert!(compression_info.large_offsets.is_empty());

        // Multiple small offsets.
        let compression_info =
            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, 10, 20, 30], ZSTD).unwrap();
        assert_eq!(&*compression_info.small_offsets, &[10, 20, 30]);
        assert!(compression_info.large_offsets.is_empty());

        // One less than the largest small offset.
        let compression_info =
            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET - 1], ZSTD)
                .unwrap();
        assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1]);
        assert!(compression_info.large_offsets.is_empty());

        // The largest small offset.
        let compression_info =
            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET], ZSTD).unwrap();
        assert_eq!(&*compression_info.small_offsets, &[u32::MAX]);
        assert!(compression_info.large_offsets.is_empty());

        // The smallest large offset.
        let compression_info =
            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 1], ZSTD)
                .unwrap();
        assert!(compression_info.small_offsets.is_empty());
        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);

        // Multiple offsets around boundary between small and large offsets.
        let compression_info = CompressionInfo::new(
            COMPRESSED_BLOB_CHUNK_SIZE,
            &[0, MAX_SMALL_OFFSET - 1, MAX_SMALL_OFFSET, MAX_SMALL_OFFSET + 1],
            ZSTD,
        )
        .unwrap();
        assert_eq!(&*compression_info.small_offsets, &[u32::MAX - 1, u32::MAX]);
        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 1]);

        // Single large offset.
        let compression_info =
            CompressionInfo::new(COMPRESSED_BLOB_CHUNK_SIZE, &[0, MAX_SMALL_OFFSET + 10], ZSTD)
                .unwrap();
        assert!(compression_info.small_offsets.is_empty());
        assert_eq!(&*compression_info.large_offsets, &[MAX_SMALL_OFFSET + 10]);

        // Multiple large offsets.
        let compression_info = CompressionInfo::new(
            COMPRESSED_BLOB_CHUNK_SIZE,
            &[0, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
            ZSTD,
        )
        .unwrap();
        assert!(compression_info.small_offsets.is_empty());
        assert_eq!(
            &*compression_info.large_offsets,
            &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
        );

        // Small and large offsets.
        let compression_info = CompressionInfo::new(
            COMPRESSED_BLOB_CHUNK_SIZE,
            &[0, 10, 20, MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20],
            ZSTD,
        )
        .unwrap();
        assert_eq!(&*compression_info.small_offsets, &[10, 20]);
        assert_eq!(
            &*compression_info.large_offsets,
            &[MAX_SMALL_OFFSET + 10, MAX_SMALL_OFFSET + 20]
        );
    }

    #[fuchsia::test]
    fn test_compression_info_compressed_range_for_uncompressed_range() {
        fn check_compression_ranges(
            offsets: &[u64],
            expected_ranges: &[(u64, Option<u64>)],
            chunk_size: u64,
            read_ahead_size: u64,
        ) {
            let compression_info = CompressionInfo::new(chunk_size, offsets, ZSTD).unwrap();
            for (i, range) in expected_ranges.iter().enumerate() {
                let i = i as u64;
                let result = compression_info
                    .compressed_range_for_uncompressed_range(
                        &(i * read_ahead_size..(i + 1) * read_ahead_size),
                    )
                    .unwrap();
                assert_eq!(result, (range.0, range.1.map(|end| NonZero::new(end).unwrap())));
            }
        }
        check_compression_ranges(
            &[0, 10, 20, 30],
            &[(0, Some(10)), (10, Some(20)), (20, Some(30)), (30, None)],
            COMPRESSED_BLOB_CHUNK_SIZE,
            COMPRESSED_BLOB_CHUNK_SIZE,
        );
        check_compression_ranges(
            &[0, 10, 20, 30],
            &[(0, Some(20)), (20, None)],
            COMPRESSED_BLOB_CHUNK_SIZE,
            COMPRESSED_BLOB_CHUNK_SIZE * 2,
        );
        check_compression_ranges(
            &[0, 10, 20, 30],
            &[(0, None)],
            COMPRESSED_BLOB_CHUNK_SIZE,
            COMPRESSED_BLOB_CHUNK_SIZE * 4,
        );
        check_compression_ranges(
            &[0, 10, 20, 30, MAX_SMALL_OFFSET + 10],
            &[(0, Some(MAX_SMALL_OFFSET + 10)), (MAX_SMALL_OFFSET + 10, None)],
            COMPRESSED_BLOB_CHUNK_SIZE,
            COMPRESSED_BLOB_CHUNK_SIZE * 4,
        );
        check_compression_ranges(
            &[
                0,
                10,
                20,
                30,
                MAX_SMALL_OFFSET + 10,
                MAX_SMALL_OFFSET + 20,
                MAX_SMALL_OFFSET + 30,
                MAX_SMALL_OFFSET + 40,
                MAX_SMALL_OFFSET + 50,
            ],
            &[
                (0, Some(20)),
                (20, Some(MAX_SMALL_OFFSET + 10)),
                (MAX_SMALL_OFFSET + 10, Some(MAX_SMALL_OFFSET + 30)),
                (MAX_SMALL_OFFSET + 30, Some(MAX_SMALL_OFFSET + 50)),
            ],
            COMPRESSED_BLOB_CHUNK_SIZE,
            COMPRESSED_BLOB_CHUNK_SIZE * 2,
        );
    }

    #[fuchsia::test]
    fn test_compression_info_compressed_range_for_uncompressed_range_errors() {
        let compression_info = CompressionInfo::new(
            COMPRESSED_BLOB_CHUNK_SIZE,
            &[
                0,
                10,
                20,
                30,
                MAX_SMALL_OFFSET + 10,
                MAX_SMALL_OFFSET + 20,
                MAX_SMALL_OFFSET + 30,
                MAX_SMALL_OFFSET + 40,
                MAX_SMALL_OFFSET + 50,
            ],
            ZSTD,
        )
        .unwrap();

        // The start of reads must be chunk aligned.
        assert!(
            compression_info
                .compressed_range_for_uncompressed_range(&(1..COMPRESSED_BLOB_CHUNK_SIZE),)
                .is_err()
        );

        // Reading entirely past the last offset isn't allowed.
        assert!(
            compression_info
                .compressed_range_for_uncompressed_range(
                    &(COMPRESSED_BLOB_CHUNK_SIZE * 9..COMPRESSED_BLOB_CHUNK_SIZE * 12),
                )
                .is_err()
        );

        // Reading a different amount than the read-ahead size isn't allowed for middle offsets.
        assert!(
            compression_info
                .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE + 1),)
                .is_err()
        );
        assert!(
            compression_info
                .compressed_range_for_uncompressed_range(&(0..COMPRESSED_BLOB_CHUNK_SIZE - 1),)
                .is_err()
        );
        assert!(
            compression_info
                .compressed_range_for_uncompressed_range(
                    &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 + 1),
                )
                .is_err()
        );
        assert!(
            compression_info
                .compressed_range_for_uncompressed_range(
                    &(COMPRESSED_BLOB_CHUNK_SIZE..COMPRESSED_BLOB_CHUNK_SIZE * 2 - 1),
                )
                .is_err()
        );

        // Reading less than the read-ahead size for the last offset is allowed.
        assert!(
            compression_info
                .compressed_range_for_uncompressed_range(
                    &(COMPRESSED_BLOB_CHUNK_SIZE * 8..COMPRESSED_BLOB_CHUNK_SIZE * 8 + 4096),
                )
                .is_ok()
        );
    }

    #[fuchsia::test]
    fn test_read_ahead_size_for_chunk_size() {
        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 32 * 1024), 32 * 1024);
        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 32 * 1024), 48 * 1024);
        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 32 * 1024), 64 * 1024);

        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 64 * 1024), 64 * 1024);
        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 64 * 1024), 48 * 1024);
        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 64 * 1024), 64 * 1024);
        assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 64 * 1024), 96 * 1024);

        assert_eq!(read_ahead_size_for_chunk_size(32 * 1024, 128 * 1024), 128 * 1024);
        assert_eq!(read_ahead_size_for_chunk_size(48 * 1024, 128 * 1024), 96 * 1024);
        assert_eq!(read_ahead_size_for_chunk_size(64 * 1024, 128 * 1024), 128 * 1024);
        assert_eq!(read_ahead_size_for_chunk_size(96 * 1024, 128 * 1024), 96 * 1024);
    }

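    /// Builds `size` bytes of compressible test data, chunk-compresses it with LZ4, and returns
    /// the resulting `CompressionInfo`, the compressed bytes, and the uncompressed bytes.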
    fn build_compression_info(size: usize) -> (CompressionInfo, Vec<u8>, Vec<u8>) {
        let options =
            ChunkedArchiveOptions::V3 { compression_algorithm: CompressionAlgorithm::Lz4 };
        let mut compressor = options.compressor();
        let mut uncompressed_data = Vec::with_capacity(size);
        {
            let mut run_length = 1;
            let mut run_value: u8 = 0;
            while uncompressed_data.len() < size {
                uncompressed_data
                    .resize(std::cmp::min(uncompressed_data.len() + run_length, size), run_value);
                run_length = (run_length + 1) % 19 + 1;
                run_value = (run_value + 1) % 17;
            }
        }
        let mut compressed_offsets = vec![0];
        let mut compressed_data = vec![];
        for chunk in uncompressed_data.chunks(CHUNK_SIZE) {
            let mut compressed_chunk = compressor.compress(chunk, 0).unwrap();
            compressed_data.append(&mut compressed_chunk);
            compressed_offsets.push(compressed_data.len() as u64);
        }
        compressed_offsets.pop();
        (
            CompressionInfo::new(CHUNK_SIZE as u64, &compressed_offsets, CompressionAlgorithm::Lz4)
                .unwrap(),
            compressed_data,
            uncompressed_data,
        )
    }

    #[fuchsia::test]
    fn test_compression_info_decompress_single_chunk() {
        let (compression_info, compressed_data, uncompressed_data) =
            build_compression_info(CHUNK_SIZE);
        let mut decompressed_data = vec![0u8; CHUNK_SIZE + 1];

        compression_info
            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE], 0)
            .expect("failed to decompress");
        assert_eq!(uncompressed_data, decompressed_data[0..CHUNK_SIZE]);

        // Too small of a destination buffer.
        compression_info
            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE - 1], 0)
            .expect_err("decompression should fail");

        // Too large of a destination buffer.
        compression_info
            .decompress(&compressed_data, &mut decompressed_data[0..CHUNK_SIZE + 1], 0)
            .expect_err("decompression should fail");
    }

    #[fuchsia::test]
    fn test_compression_info_decompress_multiple_chunks() {
        fn slice_for_chunks<'a>(
            compressed_data: &'a [u8],
            compression_info: &CompressionInfo,
            chunks: Range<u64>,
        ) -> &'a [u8] {
            let (start, end) = compression_info
                .compressed_range_for_uncompressed_range(
                    &(chunks.start * CHUNK_SIZE as u64..chunks.end * CHUNK_SIZE as u64),
                )
                .unwrap();
            let end = end.map_or(compressed_data.len() as u64, NonZero::<u64>::get);
            &compressed_data[start as usize..end as usize]
        }

        const BLOB_SIZE: usize = CHUNK_SIZE * 4 + 4096;
        let (compression_info, compressed_data, uncompressed_data) =
            build_compression_info(BLOB_SIZE);
        let mut decompressed_data = vec![0u8; BLOB_SIZE];

        // Decompress the entire blob.
        compression_info
            .decompress(&compressed_data, &mut decompressed_data, 0)
            .expect("failed to decompress");
        assert_eq!(uncompressed_data, decompressed_data);

        // Decompress just the whole chunks.
        compression_info
            .decompress(
                slice_for_chunks(&compressed_data, &compression_info, 0..4),
                &mut decompressed_data[0..CHUNK_SIZE * 4],
                0,
            )
            .expect("failed to decompress");
        assert_eq!(&uncompressed_data[0..CHUNK_SIZE * 4], &decompressed_data[0..CHUNK_SIZE * 4]);

        // Too small of a destination buffer for whole chunks.
        compression_info
            .decompress(
                slice_for_chunks(&compressed_data, &compression_info, 0..4),
                &mut decompressed_data[0..CHUNK_SIZE * 4 - 1],
                0,
            )
            .expect_err("decompression should fail");

        // Too large of a destination buffer for whole chunks.
        compression_info
            .decompress(
                slice_for_chunks(&compressed_data, &compression_info, 0..4),
                &mut decompressed_data[0..CHUNK_SIZE * 4 + 1],
                0,
            )
            .expect_err("decompression should fail");

        // Decompress just the tail.
        let partial_chunk = slice_for_chunks(&compressed_data, &compression_info, 4..5);
        compression_info
            .decompress(partial_chunk, &mut decompressed_data[0..4096], CHUNK_SIZE as u64 * 4)
            .expect("failed to decompress");
        assert_eq!(&uncompressed_data[CHUNK_SIZE * 4..], &decompressed_data[0..4096]);

        // Too small of a destination buffer for the tail.
        compression_info
            .decompress(partial_chunk, &mut decompressed_data[0..4095], CHUNK_SIZE as u64 * 4)
            .expect_err("decompression should fail");

        // Too large of a destination buffer for the tail.
        compression_info
            .decompress(partial_chunk, &mut decompressed_data[0..4097], CHUNK_SIZE as u64 * 4)
            .expect_err("decompression should fail");
    }

    #[fasync::run(10, test)]
    async fn test_refault_metric() {
        let fixture = new_blob_fixture().await;
        {
            let volume = fixture.volume().volume().clone();
            const FILE_SIZE: u64 = READ_AHEAD_SIZE * 4 - 4096;
            let data = vec![0xffu8; FILE_SIZE as usize];
            let hash = fixture.write_blob(&data, CompressionMode::Never).await;

            let blob = fixture.get_blob(hash).await.unwrap();
            assert_eq!(blob.chunks_supplied.len(), 4);
            // Nothing has been read yet.
            assert_eq!(&blob.chunks_supplied.get(), &[false, false, false, false]);

            blob.vmo.read_to_vec::<u8>(4096, 4096).unwrap();

            assert_eq!(&blob.chunks_supplied.get(), &[true, false, false, false]);

            blob.vmo.read_to_vec::<u8>(READ_AHEAD_SIZE * 2 + 4096, READ_AHEAD_SIZE).unwrap();
            assert_eq!(&blob.chunks_supplied.get(), &[true, false, true, true]);

            // We have loaded pages, but only once each.
            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 0);

            // Re-read some pages.

            // We can't evict pages from the VMO to get the kernel to resupply them, but we can
            // call page_in directly and wait for the counters to change.
            blob.clone().page_in(PageInRange::new(
                FILE_SIZE - READ_AHEAD_SIZE..FILE_SIZE,
                blob.clone(),
                Epoch::global().guard(),
            ));
            Epoch::global().barrier().await;

            assert_eq!(volume.blob_resupplied_count().read(Ordering::SeqCst), 2);
        }

        fixture.close().await;
    }
}