Skip to main content

fxfs/object_store/
data_object_handle.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::object_handle::{
10    ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, DirType, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey,
16    ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20    self, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation, Options,
21    Transaction, lock_keys,
22};
23use crate::object_store::{
24    AttributeId, Extent, HandleOptions, HandleOwner, RootDigest, StoreObjectHandle,
25    TRANSACTION_MUTATION_THRESHOLD, TrimMode, TrimResult,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{Context, Error, anyhow, bail, ensure};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{
33    FsVerityDescriptor, FsVerityDescriptorRaw, FsVerityHasher, FsVerityHasherOptions, MerkleTree,
34    MerkleTreeBuilder,
35};
36use fuchsia_sync::Mutex;
37use futures::TryStreamExt;
38use futures::stream::FuturesUnordered;
39use fxfs_trace::trace;
40use std::cmp::min;
41use std::ops::{Deref, DerefMut, Range};
42use std::sync::Arc;
43use std::sync::atomic::{self, AtomicU64, Ordering};
44use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
45
46mod allocated_ranges;
47pub use allocated_ranges::{AllocatedRanges, RangeType};
48
/// How much data each transaction will cover when writing an attribute across batches (524,288
/// bytes, i.e. 512 KiB). Pulled from `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 524_288;
52
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
pub struct DataObjectHandle<S: HandleOwner> {
    /// The wrapped store handle; `DataObjectHandle` derefs to it.
    handle: StoreObjectHandle<S>,
    /// The attribute this handle was configured for.
    attribute_id: AttributeId,
    /// Cached content size of that attribute.
    content_size: AtomicU64,
    /// Where this handle is in the process of enabling fsverity (see `FsverityState`).
    fsverity_state: Mutex<FsverityState>,
    /// In-memory tracking of overwrite ranges. Only needed for writing; it is cleared when the
    /// fsverity state is finalized.
    overwrite_ranges: AllocatedRanges,
}
67
/// Represents the mapping of a file's contents to the physical storage backing it.
///
/// Values are built through `FileExtent::new`, which rejects an invalid `device_range` and a
/// logical range whose end would not fit in a `u64`.
#[derive(Debug, Clone)]
pub struct FileExtent {
    /// Offset within the file at which this extent starts.
    logical_offset: u64,
    /// Range on the device backing the extent; its length is also the length of the extent.
    device_range: Range<u64>,
}
74
75impl FileExtent {
76    pub fn new(logical_offset: u64, device_range: Range<u64>) -> Result<Self, Error> {
77        // Ensure `device_range` is valid.
78        let length = device_range.length()?;
79        // Ensure no overflow when we calculate the end of the logical range.
80        let _ = logical_offset.checked_add(length).ok_or(FxfsError::OutOfRange)?;
81        Ok(Self { logical_offset, device_range })
82    }
83}
84
85impl FileExtent {
86    pub fn length(&self) -> u64 {
87        // SAFETY: We verified that the device_range's length is valid in Self::new.
88        unsafe { self.device_range.unchecked_length() }
89    }
90
91    pub fn logical_offset(&self) -> u64 {
92        self.logical_offset
93    }
94
95    pub fn logical_range(&self) -> Range<u64> {
96        // SAFETY: We verified logical_offset plus device_range length won't overflow in Self::new.
97        unsafe { self.logical_offset..self.logical_offset.unchecked_add(self.length()) }
98    }
99
100    pub fn device_range(&self) -> &Range<u64> {
101        &self.device_range
102    }
103}
104
/// In-memory fsverity status of a `DataObjectHandle`.
///
/// Enabling verity walks `None` -> `Started` -> `Pending` -> `Some` (see
/// `set_fsverity_state_started`, `set_fsverity_state_pending` and `finalize_fsverity_state`).
/// Opening an already verified file goes straight from `None` to `Some` via
/// `set_fsverity_state_some`.
#[derive(Debug)]
pub enum FsverityState {
    /// Verity is not enabled and nobody is in the middle of enabling it.
    None,
    /// A caller has begun enabling verity but the verity metadata is not available yet.
    Started,
    /// The verity metadata has been computed but the state has not been finalized.
    Pending(FsverityStateInner),
    /// Verity is enabled; the metadata is used to verify data.
    Some(FsverityStateInner),
}
112
/// The fsverity metadata cached on a handle: what is needed to hash blocks of the file and compare
/// them with the stored digests.
#[derive(Debug)]
pub struct FsverityStateInner {
    /// Root digest of the merkle tree; the variant also determines which hasher is used.
    root_digest: RootDigest,
    /// Salt handed to the hasher.
    salt: Vec<u8>,
    // TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
    // Potentially store a pager-backed vmo instead of passing around a boxed array.
    /// Leaf digests of the merkle tree stored back to back, one hash per block of the file.
    merkle_tree: Box<[u8]>,
}
121
/// Options for an overwrite. Both fields are `false` in the derived `Default`.
#[derive(Debug, Default)]
pub struct OverwriteOptions {
    // If false, then all the extents for the overwrite range must have been preallocated using
    // preallocate_range or from existing writes.
    pub allow_allocations: bool,
    // NOTE(review): presumably requests a barrier ahead of the first write of the overwrite; the
    // consumer of this flag is not in this part of the file, so confirm before relying on it.
    pub barrier_on_first_write: bool,
}
129
130impl FsverityStateInner {
131    pub fn new(root_digest: RootDigest, salt: Vec<u8>, merkle_tree: Box<[u8]>) -> Self {
132        FsverityStateInner { root_digest, salt, merkle_tree }
133    }
134
135    fn get_hasher_for_block_size(&self, block_size: usize) -> FsVerityHasher {
136        match self.root_digest {
137            RootDigest::Sha256(_) => {
138                FsVerityHasher::Sha256(FsVerityHasherOptions::new(self.salt.clone(), block_size))
139            }
140            RootDigest::Sha512(_) => {
141                FsVerityHasher::Sha512(FsVerityHasherOptions::new(self.salt.clone(), block_size))
142            }
143        }
144    }
145
146    fn from_bytes(data: &[u8], block_size: usize) -> Result<(Self, FsVerityHasher), Error> {
147        let descriptor = FsVerityDescriptor::from_bytes(&data, block_size)
148            .map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
149
150        let root_digest = match descriptor.digest_algorithm() {
151            fio::HashAlgorithm::Sha256 => {
152                RootDigest::Sha256(descriptor.root_digest().try_into().unwrap())
153            }
154            fio::HashAlgorithm::Sha512 => RootDigest::Sha512(descriptor.root_digest().to_vec()),
155            _ => return Err(anyhow!(FxfsError::NotSupported).context("Unsupported hash algorithm")),
156        };
157        let hasher = descriptor.hasher();
158        let leaves =
159            descriptor.leaf_digests().map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
160
161        Ok((Self::new(root_digest, descriptor.salt().to_vec(), leaves.into()), hasher))
162    }
163}
164
// Dereferencing a `DataObjectHandle` yields the `StoreObjectHandle` it wraps, so the wrapped
// handle's methods can be called directly on a `DataObjectHandle`.
impl<S: HandleOwner> Deref for DataObjectHandle<S> {
    type Target = StoreObjectHandle<S>;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
171
172impl<S: HandleOwner> DataObjectHandle<S> {
173    pub fn new(
174        owner: Arc<S>,
175        object_id: u64,
176        permanent_keys: bool,
177        attribute_id: AttributeId,
178        size: u64,
179        fsverity_state: FsverityState,
180        options: HandleOptions,
181        trace: bool,
182        overwrite_ranges: &[Range<u64>],
183    ) -> Self {
184        Self {
185            handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
186            attribute_id,
187            content_size: AtomicU64::new(size),
188            fsverity_state: Mutex::new(fsverity_state),
189            overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
190        }
191    }
192
    /// Returns the id of the attribute this handle was configured for.
    pub fn attribute_id(&self) -> AttributeId {
        self.attribute_id
    }
196
    /// Consumes the `DataObjectHandle` and returns the `StoreObjectHandle` that it contained. The
    /// cached content size, fsverity state and overwrite range tracking are dropped with it.
    pub fn into_store_object_handle(self) -> StoreObjectHandle<S> {
        self.handle
    }
201
    /// Returns the in-memory overwrite range tracking for this handle.
    pub fn overwrite_ranges(&self) -> &AllocatedRanges {
        &self.overwrite_ranges
    }
205
    /// Returns true only when the fsverity state is `FsverityState::Some`; a file for which
    /// enabling verity has started but not been finalized does not count as verified.
    pub fn is_verified_file(&self) -> bool {
        matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
    }
209
    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
    /// If another caller has already started but not completed `enable_verity` (the state is
    /// `Started` or `Pending`), returns FxfsError::Unavailable. If another caller has already
    /// completed `enable_verity`, returns FxfsError::AlreadyExists.
    pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
        // The check and the transition happen under one lock acquisition, so only one caller can
        // move the state from `None` to `Started`.
        let mut fsverity_guard = self.fsverity_state.lock();
        match *fsverity_guard {
            FsverityState::None => {
                *fsverity_guard = FsverityState::Started;
                Ok(())
            }
            FsverityState::Started | FsverityState::Pending(_) => {
                Err(anyhow!(FxfsError::Unavailable))
            }
            FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
        }
    }
227
    /// Sets `self.fsverity_state` to Pending, storing `descriptor` as the metadata that
    /// `finalize_fsverity_state()` will later promote. Must be called before
    /// `finalize_fsverity_state()`.
    ///
    /// # Panics
    ///
    /// Panics if the prior state of `self.fsverity_state` was not `FsverityState::Started`.
    pub fn set_fsverity_state_pending(&self, descriptor: FsverityStateInner) {
        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::Started));
        *fsverity_guard = FsverityState::Pending(descriptor);
    }
235
236    /// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
237    /// not `FsverityState::Pending(_)`.
238    pub fn finalize_fsverity_state(&self) {
239        let mut fsverity_state_guard = self.fsverity_state.lock();
240        let mut_fsverity_state = fsverity_state_guard.deref_mut();
241        let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
242        match fsverity_state {
243            FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
244            FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
245            FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
246            FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
247        }
248        // Once we finalize the fsverity state, the file is permanently read-only. The in-memory
249        // overwrite ranges tracking is only used for writing, so we don't need them anymore. This
250        // leaves any uninitialized, but allocated, overwrite regions if there are any, rather than
251        // converting them back to sparse regions.
252        self.overwrite_ranges.clear();
253    }
254
    /// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
    /// Used to set `self.fsverity_state` on open of a verified file. The merkle tree data is
    /// verified against the root digest here, and will return an error if the tree is not correct.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::Inconsistent` if the merkle attribute is missing, if the read of the
    /// verity range does not return exactly the expected number of bytes, or if the leaf data is
    /// not a whole number of hashes. Returns `FxfsError::IntegrityError` if the root recomputed
    /// from the leaves differs from the stored root digest. Errors from reading the attribute or
    /// decoding the descriptor are propagated.
    ///
    /// # Panics
    ///
    /// Panics if the state is not `FsverityState::None` at the point the new state is stored.
    pub async fn set_fsverity_state_some(&self, descriptor: FsverityMetadata) -> Result<(), Error> {
        let (metadata, hasher) = match descriptor {
            // The root digest and salt are supplied directly; the merkle attribute is used as the
            // leaf data as-is.
            FsverityMetadata::Internal(root_digest, salt) => {
                let merkle_tree = self
                    .read_attr(AttributeId::FSVERITY_MERKLE)
                    .await?
                    .ok_or_else(|| anyhow!(FxfsError::Inconsistent))?;
                let metadata = FsverityStateInner { root_digest, salt, merkle_tree };
                let hasher = metadata.get_hasher_for_block_size(self.block_size() as usize);
                (metadata, hasher)
            }
            // Only a byte range of the merkle attribute is supplied; the digest, salt and leaves
            // are all decoded from the bytes in that range.
            FsverityMetadata::F2fs(verity_range) => {
                let expected_length = verity_range.length()? as usize;
                // The buffer is sized up to a whole number of blocks; only the first
                // `expected_length` bytes are handed to the decoder.
                let mut buffer = self
                    .allocate_buffer(expected_length.next_multiple_of(self.block_size() as usize))
                    .await;
                ensure!(
                    expected_length
                        == self
                            .handle
                            .read(AttributeId::FSVERITY_MERKLE, verity_range.start, buffer.as_mut())
                            .await?,
                    FxfsError::Inconsistent
                );
                FsverityStateInner::from_bytes(
                    buffer.as_slice()[0..expected_length].into(),
                    self.block_size() as usize,
                )?
            }
        };
        // Validate the merkle tree data against the root before applying it.
        ensure!(metadata.merkle_tree.len() % hasher.hash_size() == 0, FxfsError::Inconsistent);
        let leaf_chunks = metadata.merkle_tree.chunks_exact(hasher.hash_size());
        // Rebuild the tree from the stored leaf hashes alone; the file data is not read here.
        let mut builder = MerkleTreeBuilder::new(hasher);
        for leaf in leaf_chunks {
            builder.push_data_hash(leaf.to_vec());
        }
        let tree = builder.finish();
        let root_hash = match &metadata.root_digest {
            RootDigest::Sha256(root_hash) => root_hash.as_slice(),
            RootDigest::Sha512(root_hash) => root_hash.as_slice(),
        };

        ensure!(root_hash == tree.root(), FxfsError::IntegrityError);

        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::None));
        *fsverity_guard = FsverityState::Some(metadata);

        Ok(())
    }
309
310    /// Verifies contents of `buffer` against the corresponding hashes in the stored merkle tree.
311    /// `offset` is the logical offset in the file that `buffer` starts at. `offset` must be
312    /// block-aligned. Fails on non fsverity-enabled files.
313    fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
314        let block_size = self.block_size() as usize;
315        assert!(offset % block_size == 0);
316        let fsverity_state = self.fsverity_state.lock();
317        match &*fsverity_state {
318            FsverityState::None => {
319                Err(anyhow!("Tried to verify read on a non verity-enabled file"))
320            }
321            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
322                "Enable verity has not yet completed, fsverity state: {:?}",
323                *fsverity_state
324            )),
325            FsverityState::Some(metadata) => {
326                let hasher = metadata.get_hasher_for_block_size(block_size);
327                let leaf_nodes: Vec<&[u8]> =
328                    metadata.merkle_tree.chunks(hasher.hash_size()).collect();
329                fxfs_trace::duration!("fsverity-verify", "len" => buffer.len());
330                // TODO(b/318880297): Consider parallelizing computation.
331                for b in buffer.chunks(block_size) {
332                    ensure!(
333                        hasher.hash_block(b) == leaf_nodes[offset / block_size],
334                        anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
335                    );
336                    offset += block_size;
337                }
338                Ok(())
339            }
340        }
341    }
342
343    /// Extend the file with the given extent.  The only use case for this right now is for files
344    /// that must exist at certain offsets on the device, such as super-blocks.
345    pub async fn extend<'a>(
346        &'a self,
347        transaction: &mut Transaction<'a>,
348        device_range: Range<u64>,
349    ) -> Result<(), Error> {
350        let old_end =
351            round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
352        let new_size = old_end + device_range.end - device_range.start;
353        self.store().allocator().mark_allocated(
354            transaction,
355            self.store().store_object_id(),
356            device_range.clone(),
357        )?;
358        self.txn_update_size(transaction, new_size, None).await?;
359        let key_id = self.get_key(None).await?.0;
360        transaction.add(
361            self.store().store_object_id,
362            Mutation::merge_object(
363                ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
364                ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
365            ),
366        );
367        self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
368    }
369
    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    // the data from `buf`. This forwards to `StoreObjectHandle::align_buffer` for this handle's
    // attribute.
    // NOTE(review): the returned range is presumably the aligned range that the returned buffer
    // covers — confirm against `StoreObjectHandle::align_buffer`.
    async fn align_buffer(
        &self,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        self.handle.align_buffer(self.attribute_id(), offset, buf).await
    }
379
    // Writes potentially unaligned data at `device_offset` and returns checksums if requested. The
    // data will be encrypted if necessary.
    // `buf` is mutable as an optimization, since the write may require encryption, we can encrypt
    // the buffer in-place rather than copying to another buffer if the write is already aligned.
    // `offset` is the logical offset within this handle's attribute. The call forwards to
    // `StoreObjectHandle::write_at`, passing `None` for the argument between `buf` and
    // `device_offset`.
    async fn write_at(
        &self,
        offset: u64,
        buf: MutableBufferRef<'_>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
    }
392
    /// Verifies that the entire range in the file is zeroes, as either uninitialized overwrite
    /// range, or no extent at all. If a single allocated and written extent is found, this returns
    /// false.
    ///
    /// NOTE(review): the bitmap indexing below divides offsets by the block size, so `range` is
    /// presumably expected to be block aligned — confirm against callers.
    pub async fn check_unwritten_zero(&self, range: Range<u64>) -> Result<bool, Error> {
        let tree = &self.store().tree();
        let layer_set = tree.layer_set();
        let key = Extent(range);
        // Begin iterating at the search key for `range` within this object's attribute.
        let lower_bound = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id,
            AttributeKey::Extent(key.search_key()),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
        // Keep going only while the current item is an extent record that belongs to this object
        // and this attribute.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(value),
            ..
        }) = iter.get()
            && *object_id == self.object_id()
            && *attr_id == self.attribute_id
        {
            // Extent values other than `ExtentValue::Some` are skipped without ending the scan.
            if let ExtentValue::Some { mode, .. } = value {
                if let Some(overlap) = key.overlap(extent_key) {
                    if let ExtentMode::OverwritePartial(bits) = mode {
                        // `bits` holds one flag per block of the extent; look only at the flags
                        // for the blocks inside the overlap. A set flag means the block has been
                        // written.
                        let starting_index = (overlap.start - extent_key.start) / self.block_size();
                        for initialized in bits
                            .iter()
                            .skip(starting_index as usize)
                            .take((overlap.length().unwrap() / self.block_size()) as usize)
                        {
                            if initialized {
                                return Ok(false);
                            }
                        }
                    } else {
                        // Any other mode of an overlapping extent is treated as written data.
                        return Ok(false);
                    }
                } else {
                    // An extent that does not overlap the range ends the scan.
                    break;
                }
            }
            iter.advance().await?;
        }
        Ok(true)
    }
443
    /// Zeroes the given range of this handle's attribute as part of `transaction`.  The range must
    /// be aligned.  Returns `Ok(())` on success; no deallocated byte count is reported.
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        range: Range<u64>,
    ) -> Result<(), Error> {
        self.handle.zero(transaction, self.attribute_id(), range).await
    }
452
453    /// The cached value for `self.fsverity_state` is set either in `open_object` or on
454    /// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
455    /// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
456    pub fn get_descriptor(&self) -> Option<(fio::VerificationOptions, Vec<u8>)> {
457        let fsverity_state = self.fsverity_state.lock();
458        match &*fsverity_state {
459            FsverityState::Some(metadata) => {
460                let (options, root_hash) = match &metadata.root_digest {
461                    RootDigest::Sha256(root_hash) => (
462                        fio::VerificationOptions {
463                            hash_algorithm: Some(fio::HashAlgorithm::Sha256),
464                            salt: Some(metadata.salt.clone()),
465                            ..Default::default()
466                        },
467                        root_hash.to_vec(),
468                    ),
469                    RootDigest::Sha512(root_hash) => (
470                        fio::VerificationOptions {
471                            hash_algorithm: Some(fio::HashAlgorithm::Sha512),
472                            salt: Some(metadata.salt.clone()),
473                            ..Default::default()
474                        },
475                        root_hash.clone(),
476                    ),
477                };
478                Some((options, root_hash))
479            }
480            _ => None,
481        }
482    }
483
    /// Reads the whole file and builds its merkle tree with `hasher`.
    ///
    /// Returns the tree together with its serialized form: every layer holding more than one
    /// hash, written from the top layer down to the leaves with each layer zero-padded to a block
    /// boundary, followed by one final block that starts with the descriptor built from
    /// `hash_alg`, the block size, the file size, the tree root and `salt`.
    ///
    /// # Panics
    ///
    /// Panics if a read returns data beyond the size sampled when the function started.
    async fn build_verity_tree(
        &self,
        hasher: FsVerityHasher,
        hash_alg: fio::HashAlgorithm,
        salt: &[u8],
    ) -> Result<(MerkleTree, Vec<u8>), Error> {
        let hash_len = hasher.hash_size();
        let mut builder = MerkleTreeBuilder::new(hasher);
        let mut offset = 0;
        let size = self.get_size();
        // TODO(b/314836822): Consider further tuning the buffer size to optimize
        // performance. Experimentally, most verity-enabled files are <256K.
        let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
        // Feed the file to the builder one buffer (64 blocks) at a time.
        while offset < size {
            // TODO(b/314842875): Consider optimizations for sparse files.
            let read = self.read(offset, buf.as_mut()).await? as u64;
            assert!(offset + read <= size);
            builder.write(&buf.as_slice()[0..read as usize]);
            offset += read;
        }
        let tree = builder.finish();
        // This will include a block for the root layer, which will be used to house the descriptor.
        let tree_data_len = tree
            .as_ref()
            .iter()
            .map(|layer| (layer.len() * hash_len).next_multiple_of(self.block_size() as usize))
            .sum();
        let mut merkle_tree_data = Vec::<u8>::with_capacity(tree_data_len);
        // Iterating from the top layers down to the leaves.
        for layer in tree.as_ref().iter().rev() {
            // Skip the root layer.
            if layer.len() <= 1 {
                continue;
            }
            merkle_tree_data.extend(layer.iter().flatten());
            // Pad to the end of the block.
            let padded_size = merkle_tree_data.len().next_multiple_of(self.block_size() as usize);
            merkle_tree_data.resize(padded_size, 0);
        }

        // Zero the last block, then write the descriptor to the start of it.
        let descriptor_offset = merkle_tree_data.len();
        merkle_tree_data.resize(descriptor_offset + self.block_size() as usize, 0);
        let descriptor = FsVerityDescriptorRaw::new(
            hash_alg,
            self.block_size(),
            self.get_size(),
            tree.root(),
            salt,
        )?;
        descriptor.write_to_slice(&mut merkle_tree_data[descriptor_offset..])?;

        Ok((tree, merkle_tree_data))
    }
538
    /// Enables fsverity on this file.
    ///
    /// Reads the data attribute and computes a merkle tree from the data, using the hash algorithm
    /// and salt supplied in `options`. The serialized tree and descriptor produced by
    /// `build_verity_tree` are written to the attribute with id `AttributeId::FSVERITY_MERKLE`, and
    /// the ObjectValue of the data attribute is then replaced with a verified attribute recording
    /// the file size and an `FsverityMetadata::F2fs` range that covers the written merkle data.
    ///
    /// # Errors
    ///
    /// Fails if verity is already enabled or is in the middle of being enabled (see
    /// `set_fsverity_state_started`), if `options` has no hash algorithm or no salt, and with
    /// `FxfsError::NotSupported` for a hash algorithm other than SHA-256 or SHA-512. Errors from
    /// building the tree, writing the attribute and committing the transaction are propagated.
    ///
    /// NOTE(review): every early return after `set_fsverity_state_started` leaves the in-memory
    /// state at `Started` (or `Pending`) as far as this function is concerned; nothing visible
    /// here resets it. Confirm whether a failed attempt is meant to be retryable on the same
    /// handle.
    #[trace]
    pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
        self.set_fsverity_state_started()?;
        // If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
        // the graveyard should process the tombstone before we start rewriting the attribute.
        if self
            .store()
            .tree()
            .find(&ObjectKey::graveyard_attribute_entry(
                self.store().graveyard_directory_object_id(),
                self.object_id(),
                AttributeId::FSVERITY_MERKLE,
            ))
            .await?
            .is_some()
        {
            self.store().filesystem().graveyard().flush().await;
        }
        let mut transaction = self.new_transaction().await?;
        let hash_alg =
            options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
        let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
        // Both supported algorithms build the tree the same way; they differ only in the hasher
        // and in how the root is stored in `RootDigest`.
        let (root_digest, merkle_tree) = match hash_alg {
            fio::HashAlgorithm::Sha256 => {
                let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
                    salt.clone(),
                    self.block_size() as usize,
                ));
                let (tree, merkle_tree_data) =
                    self.build_verity_tree(hasher, hash_alg, &salt).await?;
                let root: [u8; 32] = tree.root().try_into().unwrap();
                (RootDigest::Sha256(root), merkle_tree_data)
            }
            fio::HashAlgorithm::Sha512 => {
                let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
                    salt.clone(),
                    self.block_size() as usize,
                ));
                let (tree, merkle_tree_data) =
                    self.build_verity_tree(hasher, hash_alg, &salt).await?;
                (RootDigest::Sha512(tree.root().to_vec()), merkle_tree_data)
            }
            _ => {
                bail!(
                    anyhow!(FxfsError::NotSupported)
                        .context(format!("hash algorithm not supported"))
                );
            }
        };
        // TODO(b/314194485): Eventually want streaming writes.
        // The merkle tree attribute should not require trimming because it should not
        // exist.
        self.handle
            .write_new_attr_in_batches(
                &mut transaction,
                AttributeId::FSVERITY_MERKLE,
                &merkle_tree,
                WRITE_ATTR_BATCH_SIZE,
            )
            .await?;
        // NOTE(review): presumably a write spanning more than one batch leaves a graveyard entry
        // for the attribute that has to be removed in the final transaction — confirm against
        // `write_new_attr_in_batches`.
        if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
            self.store().remove_attribute_from_graveyard(
                &mut transaction,
                self.object_id(),
                AttributeId::FSVERITY_MERKLE,
            );
        };
        // Decode what was just serialized to obtain the leaf digests for the in-memory state.
        let descriptor_decoded =
            FsVerityDescriptor::from_bytes(&merkle_tree, self.block_size() as usize)?;
        let descriptor = FsverityStateInner {
            root_digest,
            salt,
            merkle_tree: descriptor_decoded.leaf_digests()?.to_vec().into(),
        };
        self.set_fsverity_state_pending(descriptor);
        // This handle is attached to the mutation as its associated object.
        // NOTE(review): presumably that is how the pending state is finalized when the mutation
        // is applied — the handling is not in this part of the file; confirm.
        transaction.add_with_object(
            self.store().store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(self.object_id(), AttributeId::DATA, AttributeKey::Attribute),
                ObjectValue::verified_attribute(
                    self.get_size(),
                    FsverityMetadata::F2fs(0..merkle_tree.len() as u64),
                ),
            ),
            AssocObj::Borrowed(self),
        );
        transaction.commit().await?;
        Ok(())
    }
634
635    /// Pre-allocate disk space for the given logical file range. If any part of the allocation
636    /// range is beyond the end of the file, the file size is updated.
637    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
638        debug_assert!(range.start < range.end);
639
640        // It's not required that callers of allocate use block aligned ranges, but we need to make
641        // the extents block aligned. Luckily, fallocate in posix is allowed to allocate more than
642        // what was asked for for block alignment purposes. We just need to make sure that the size
643        // of the file is still the non-block-aligned end of the range if the size was changed.
644        let mut new_range = range.clone();
645        new_range.start = round_down(new_range.start, self.block_size());
646        // NB: FxfsError::TooBig turns into EFBIG when passed through starnix, which is the
647        // required error code when the requested range is larger than the file size.
648        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;
649
650        let mut transaction = self.new_transaction().await?;
651        let mut to_allocate = Vec::new();
652        let mut to_switch = Vec::new();
653        let key_id = self.get_key(None).await?.0;
654
655        {
656            let tree = &self.store().tree;
657            let layer_set = tree.layer_set();
658            let offset_key = ObjectKey::attribute(
659                self.object_id(),
660                self.attribute_id(),
661                AttributeKey::Extent(Extent::search_key_from_offset(new_range.start)),
662            );
663            let mut merger = layer_set.merger();
664            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;
665
666            loop {
667                match iter.get() {
668                    Some(ItemRef {
669                        key:
670                            ObjectKey {
671                                object_id,
672                                data:
673                                    ObjectKeyData::Attribute(
674                                        attribute_id,
675                                        AttributeKey::Extent(extent_key),
676                                    ),
677                            },
678                        value: ObjectValue::Extent(extent_value),
679                        ..
680                    }) if *object_id == self.object_id()
681                        && *attribute_id == self.attribute_id() =>
682                    {
683                        // If the start of this extent is beyond the end of the range we are
684                        // allocating, we don't have any more work to do.
685                        if new_range.end <= extent_key.start {
686                            break;
687                        }
688                        // Add any prefix we might need to allocate.
689                        if new_range.start < extent_key.start {
690                            to_allocate.push(new_range.start..extent_key.start);
691                            new_range.start = extent_key.start;
692                        }
693                        let device_offset = match extent_value {
694                            ExtentValue::None => {
695                                // If the extent value is None, it indicates a deleted extent. In
696                                // that case, we just skip it entirely. By keeping the new_range
697                                // where it is, this section will get included in the new
698                                // allocations.
699                                iter.advance().await?;
700                                continue;
701                            }
702                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
703                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
704                                // If this extent is already in overwrite mode, we can skip it.
705                                if extent_key.end < new_range.end {
706                                    new_range.start = extent_key.end;
707                                    iter.advance().await?;
708                                    continue;
709                                } else {
710                                    new_range.start = new_range.end;
711                                    break;
712                                }
713                            }
714                            ExtentValue::Some { device_offset, .. } => *device_offset,
715                        };
716
717                        // Figure out how we have to break up the ranges.
718                        let device_offset = device_offset + (new_range.start - extent_key.start);
719                        if extent_key.end < new_range.end {
720                            to_switch.push((new_range.start..extent_key.end, device_offset));
721                            new_range.start = extent_key.end;
722                        } else {
723                            to_switch.push((new_range.start..new_range.end, device_offset));
724                            new_range.start = new_range.end;
725                            break;
726                        }
727                    }
728                    // The records are sorted so if we find something that isn't an extent or
729                    // doesn't match the object id then there are no more extent records for this
730                    // object.
731                    _ => break,
732                }
733                iter.advance().await?;
734            }
735        }
736
737        if new_range.start < new_range.end {
738            to_allocate.push(new_range.clone());
739        }
740
741        // We can update the size in the first transaction because even if subsequent transactions
742        // don't get replayed, the data between the current and new end of the file will be zero
743        // (either sparse zero or allocated zero). On the other hand, if we don't update the size
744        // in the first transaction, overwrite extents may be written past the end of the file
745        // which is an fsck error.
746        //
747        // The potential new size needs to be the non-block-aligned range end - we round up to the
748        // nearest block size for the actual allocation, but shouldn't do that for the file size.
749        let new_size = std::cmp::max(range.end, self.get_size());
750        // Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
751        // first transaction, in case we split transactions. This makes it okay to only replay the
752        // first transaction if power loss occurs - the file will be in an unusual state, but not
753        // an invalid one, if only part of the allocate goes through.
754        transaction.add_with_object(
755            self.store().store_object_id(),
756            Mutation::replace_or_insert_object(
757                ObjectKey::attribute(
758                    self.object_id(),
759                    self.attribute_id(),
760                    AttributeKey::Attribute,
761                ),
762                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
763            ),
764            AssocObj::Borrowed(self),
765        );
766
767        // The maximum number of mutations we are going to allow per transaction in allocate. This
768        // is probably quite a bit lower than the actual limit, but it should be large enough to
769        // handle most non-edge-case versions of allocate without splitting the transaction.
770        const MAX_TRANSACTION_SIZE: usize = 256;
771        for (switch_range, device_offset) in to_switch {
772            transaction.add_with_object(
773                self.store().store_object_id(),
774                Mutation::merge_object(
775                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
776                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
777                        device_offset,
778                        key_id,
779                    )),
780                ),
781                AssocObj::Borrowed(self),
782            );
783            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
784                transaction.commit_and_continue().await?;
785            }
786        }
787
788        let mut allocated = 0;
789        let allocator = self.store().allocator();
790        for mut allocate_range in to_allocate {
791            while allocate_range.start < allocate_range.end {
792                let device_range = allocator
793                    .allocate(
794                        &mut transaction,
795                        self.store().store_object_id(),
796                        allocate_range.end - allocate_range.start,
797                    )
798                    .await
799                    .context("allocation failed")?;
800                let device_range_len = device_range.end - device_range.start;
801
802                transaction.add_with_object(
803                    self.store().store_object_id(),
804                    Mutation::merge_object(
805                        ObjectKey::extent(
806                            self.object_id(),
807                            self.attribute_id(),
808                            allocate_range.start..allocate_range.start + device_range_len,
809                        ),
810                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
811                            device_range.start,
812                            (device_range_len / self.block_size()) as usize,
813                            key_id,
814                        )),
815                    ),
816                    AssocObj::Borrowed(self),
817                );
818
819                allocate_range.start += device_range_len;
820                allocated += device_range_len;
821
822                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
823                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
824                    transaction.commit_and_continue().await?;
825                    allocated = 0;
826                }
827            }
828        }
829
830        self.update_allocated_size(&mut transaction, allocated, 0).await?;
831        transaction.commit().await?;
832
833        Ok(())
834    }
835
836    /// Return information on a contiguous set of extents that has the same allocation status,
837    /// starting from `start_offset`. The information returned is if this set of extents are marked
838    /// allocated/not allocated and also the size of this set (in bytes). This is used when
839    /// querying slices for volumes.
840    /// This function expects `start_offset` to be aligned to block size
841    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
842        let block_size = self.block_size();
843        assert_eq!(start_offset % block_size, 0);
844
845        if start_offset > self.get_size() {
846            bail!(FxfsError::OutOfRange)
847        }
848
849        if start_offset == self.get_size() {
850            return Ok((false, 0));
851        }
852
853        let tree = &self.store().tree;
854        let layer_set = tree.layer_set();
855        let offset_key = ObjectKey::attribute(
856            self.object_id(),
857            self.attribute_id(),
858            AttributeKey::Extent(Extent::search_key_from_offset(start_offset)),
859        );
860        let mut merger = layer_set.merger();
861        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;
862
863        let mut allocated = None;
864        let mut end = start_offset;
865
866        loop {
867            // Iterate through the extents, each time setting `end` as the end of the previous
868            // extent
869            match iter.get() {
870                Some(ItemRef {
871                    key:
872                        ObjectKey {
873                            object_id,
874                            data:
875                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
876                        },
877                    value: ObjectValue::Extent(extent_value),
878                    ..
879                }) => {
880                    // Equivalent of getting no extents back
881                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
882                        if allocated == Some(false) || allocated.is_none() {
883                            end = self.get_size();
884                            allocated = Some(false);
885                        }
886                        break;
887                    }
888                    ensure!(extent_key.is_aligned(block_size), FxfsError::Inconsistent);
889                    if extent_key.start > end {
890                        // If a previous extent has already been visited and we are tracking an
891                        // allocated set, we are only interested in an extent where the range of the
892                        // current extent follows immediately after the previous one.
893                        if allocated == Some(true) {
894                            break;
895                        } else {
896                            // The gap between the previous `end` and this extent is not allocated
897                            end = extent_key.start;
898                            allocated = Some(false);
899                            // Continue this iteration, except now the `end` is set to the end of
900                            // the "previous" extent which is this gap between the start_offset
901                            // and the current extent
902                        }
903                    }
904
905                    // We can assume that from here, the `end` points to the end of a previous
906                    // extent.
907                    match extent_value {
908                        // The current extent has been allocated
909                        ExtentValue::Some { .. } => {
910                            // Stop searching if previous extent was marked deleted
911                            if allocated == Some(false) {
912                                break;
913                            }
914                            allocated = Some(true);
915                        }
916                        // This extent has been marked deleted
917                        ExtentValue::None => {
918                            // Stop searching if previous extent was marked allocated
919                            if allocated == Some(true) {
920                                break;
921                            }
922                            allocated = Some(false);
923                        }
924                    }
925                    end = extent_key.end;
926                }
927                // This occurs when there are no extents left
928                None => {
929                    if allocated == Some(false) || allocated.is_none() {
930                        end = self.get_size();
931                        allocated = Some(false);
932                    }
933                    // Otherwise, we were monitoring extents that were allocated, so just exit.
934                    break;
935                }
936                // Non-extent records (Object, Child, GraveyardEntry) are ignored.
937                Some(_) => {}
938            }
939            iter.advance().await?;
940        }
941
942        Ok((allocated.unwrap(), end - start_offset))
943    }
944
945    pub async fn txn_write<'a>(
946        &'a self,
947        transaction: &mut Transaction<'a>,
948        offset: u64,
949        buf: BufferRef<'_>,
950    ) -> Result<(), Error> {
951        if buf.is_empty() {
952            return Ok(());
953        }
954        let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
955        self.multi_write(
956            transaction,
957            self.attribute_id(),
958            std::slice::from_ref(&aligned),
959            transfer_buf.as_mut(),
960        )
961        .await?;
962        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
963            self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
964        }
965        Ok(())
966    }
967
968    // Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
969    // if encryption takes place.  The ranges must all be aligned and no change to content size is
970    // applied; the caller is responsible for updating size if required.
971    pub async fn multi_write<'a>(
972        &'a self,
973        transaction: &mut Transaction<'a>,
974        attribute_id: AttributeId,
975        ranges: &[Range<u64>],
976        buf: MutableBufferRef<'_>,
977    ) -> Result<(), Error> {
978        self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
979    }
980
    // Writes `buf` at `offset` directly to the device ranges of the extents that already cover
    // that part of the file.  Only extents in `ExtentMode::Raw` can be written this way; any
    // other extent that overlaps the write results in an error.  Where no extent covers the
    // current offset, a new raw extent is allocated if `options.allow_allocations` is set
    // (otherwise an error is returned), and once all the writes have completed the transaction
    // holding those allocations is committed, growing the file to the end of the write first if
    // that lies beyond the current size.
    //
    // Panics if the length of `buf` is not a multiple of the device block size.
    //
    // `buf` is mutable as an optimization, since the write may require encryption, we can
    // encrypt the buffer in-place rather than copying to another buffer if the write is
    // already aligned.
    //
    // Note: in the event of power failure during an overwrite() call, it is possible that
    // old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
    // Since the old data should be encrypted, it is probably safe to expose, although not ideal.
    pub async fn overwrite(
        &self,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        options: OverwriteOptions,
    ) -> Result<(), Error> {
        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
        // Remember where the write finishes; `offset` and `buf` are advanced by the loop below.
        let end = offset + buf.len() as u64;

        let key_id = self.get_key(None).await?.0;

        // The transaction only ends up being used if allow_allocations is true
        let mut transaction =
            if options.allow_allocations { Some(self.new_transaction().await?) } else { None };

        // We build up a list of writes to perform later
        let writes = FuturesUnordered::new();

        if options.barrier_on_first_write {
            self.store().device.barrier();
        }

        // We create a new scope here, so that the merger iterator will get dropped before we try to
        // commit our transaction. Otherwise the transaction commit would block.
        {
            let store = self.store();
            let store_object_id = store.store_object_id;
            let allocator = store.allocator();
            let tree = &store.tree;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Extent(Extent::search_key_from_offset(offset)),
                )))
                .await?;
            let block_size = self.block_size();

            // Each pass works out where the next piece of `buf` goes on the device, queues the
            // write, and then moves `offset`/`buf` past the bytes that were handled.
            loop {
                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
                    // An extent of ours that ends exactly at `offset` does not overlap the write,
                    // so move on to the next record.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && extent.end == offset =>
                    {
                        iter.advance().await?;
                        continue;
                    }
                    // An extent key of ours that starts at or before `offset`: write into it if it
                    // is a raw extent, otherwise fail.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                            },
                        value,
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && extent.start <= offset =>
                    {
                        match value {
                            ObjectValue::Extent(ExtentValue::Some {
                                device_offset,
                                mode: ExtentMode::Raw,
                                ..
                            }) => {
                                ensure!(
                                    extent.is_aligned(block_size)
                                        && device_offset % block_size == 0,
                                    FxfsError::Inconsistent
                                );
                                let offset_within_extent = offset - extent.start;
                                let remaining_length_of_extent = (extent
                                    .end
                                    .checked_sub(offset)
                                    .ok_or(FxfsError::Inconsistent)?)
                                    as usize;
                                // Yields (device_offset, bytes_to_write, should_advance)
                                (
                                    device_offset + offset_within_extent,
                                    min(buf.len(), remaining_length_of_extent),
                                    true,
                                )
                            }
                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
                                // TODO(https://fxbug.dev/42066056): Maybe we should create
                                // a new extent without checksums?
                                bail!(
                                    "extent from ({},{}) which overlaps offset \
                                        {} has the wrong extent mode",
                                    extent.start,
                                    extent.end,
                                    offset
                                )
                            }
                            _ => {
                                bail!(
                                    "overwrite failed: extent overlapping offset {} has \
                                      unexpected ObjectValue",
                                    offset
                                )
                            }
                        }
                    }
                    // Nothing covers `offset`: either there are no more records, the record is
                    // not one of our extents, or our next extent starts after `offset`.
                    maybe_item_ref => {
                        if let Some(transaction) = transaction.as_mut() {
                            assert_eq!(options.allow_allocations, true);
                            assert_eq!(offset % self.block_size(), 0);

                            // We are going to make a new extent, but let's check if there is an
                            // extent after us. If there is an extent after us, then we don't want
                            // our new extent to bump into it...
                            let mut bytes_to_allocate =
                                round_up(buf.len() as u64, self.block_size())
                                    .ok_or(FxfsError::TooBig)?;
                            if let Some(ItemRef {
                                key:
                                    ObjectKey {
                                        object_id,
                                        data:
                                            ObjectKeyData::Attribute(
                                                attribute_id,
                                                AttributeKey::Extent(extent),
                                            ),
                                    },
                                ..
                            }) = maybe_item_ref
                            {
                                if *object_id == self.object_id()
                                    && *attribute_id == self.attribute_id()
                                    && offset < extent.start
                                {
                                    let bytes_until_next_extent = extent.start - offset;
                                    bytes_to_allocate =
                                        min(bytes_to_allocate, bytes_until_next_extent);
                                }
                            }

                            // The length of the returned device range, not the requested length,
                            // determines how large the new extent is.
                            let device_range = allocator
                                .allocate(transaction, store_object_id, bytes_to_allocate)
                                .await?;
                            let device_range_len = device_range.end - device_range.start;
                            transaction.add(
                                store_object_id,
                                Mutation::insert_object(
                                    ObjectKey::extent(
                                        self.object_id(),
                                        self.attribute_id(),
                                        offset..offset + device_range_len,
                                    ),
                                    ObjectValue::Extent(ExtentValue::new_raw(
                                        device_range.start,
                                        key_id,
                                    )),
                                ),
                            );

                            self.update_allocated_size(transaction, device_range_len, 0).await?;

                            // Yields (device_offset, bytes_to_write, should_advance)
                            // The iterator is left where it is so that the record it points at is
                            // examined again on the next pass.
                            (device_range.start, min(buf.len(), device_range_len as usize), false)
                        } else {
                            bail!(
                                "no extent overlapping offset {}, \
                                and new allocations are not allowed",
                                offset
                            )
                        }
                    }
                };
                // Queue the write for this piece and carry on with whatever is left of `buf`.
                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
                writes.push(self.write_at(offset, current_buf, device_offset));
                if remaining_buf.len() == 0 {
                    break;
                } else {
                    buf = remaining_buf;
                    offset += bytes_to_write as u64;
                    if should_advance {
                        iter.advance().await?;
                    }
                }
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // The checksums are being ignored here, but we don't need to know them
        writes.try_collect::<Vec<MaybeChecksums>>().await?;

        // The transaction is only committed once the writes above have succeeded, and only if the
        // loop actually added something to it.
        if let Some(mut transaction) = transaction {
            assert_eq!(options.allow_allocations, true);
            if !transaction.is_empty() {
                if end > self.get_size() {
                    self.grow(&mut transaction, self.get_size(), end).await?;
                }
                transaction.commit().await?;
            }
        }

        Ok(())
    }
1199
1200    // Within a transaction, the size of the object might have changed, so get the size from there
1201    // if it exists, otherwise, fall back on the cached size.
1202    fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1203        transaction
1204            .get_object_mutation(
1205                self.store().store_object_id,
1206                ObjectKey::attribute(
1207                    self.object_id(),
1208                    self.attribute_id(),
1209                    AttributeKey::Attribute,
1210                ),
1211            )
1212            .and_then(|m| {
1213                if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1214                    Some(size)
1215                } else {
1216                    None
1217                }
1218            })
1219            .unwrap_or_else(|| self.get_size())
1220    }
1221
1222    pub async fn txn_update_size<'a>(
1223        &'a self,
1224        transaction: &mut Transaction<'a>,
1225        new_size: u64,
1226        // Allow callers to update the has_overwrite_extents metadata if they want. If this is
1227        // Some it is set to the value, if None it is left unchanged.
1228        update_has_overwrite_extents: Option<bool>,
1229    ) -> Result<(), Error> {
1230        let key =
1231            ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1232        let mut mutation = if let Some(mutation) =
1233            transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1234        {
1235            mutation.clone()
1236        } else {
1237            ObjectStoreMutation {
1238                item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1239                op: Operation::ReplaceOrInsert,
1240            }
1241        };
1242        if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1243            *size = new_size;
1244            if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1245                *has_overwrite_extents = update_has_overwrite_extents;
1246            }
1247        } else {
1248            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1249        }
1250        transaction.add_with_object(
1251            self.store().store_object_id(),
1252            Mutation::ObjectStore(mutation),
1253            AssocObj::Borrowed(self),
1254        );
1255        Ok(())
1256    }
1257
1258    async fn update_allocated_size(
1259        &self,
1260        transaction: &mut Transaction<'_>,
1261        allocated: u64,
1262        deallocated: u64,
1263    ) -> Result<(), Error> {
1264        self.handle.update_allocated_size(transaction, allocated, deallocated).await
1265    }
1266
1267    pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1268        if self
1269            .overwrite_ranges
1270            .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1271        {
1272            // This returns true if there were ranges, but this truncate removed them all, which
1273            // indicates that we need to flip the has_overwrite_extents metadata flag to false.
1274            Ok(Some(false))
1275        } else {
1276            Ok(None)
1277        }
1278    }
1279
1280    pub async fn shrink<'a>(
1281        &'a self,
1282        transaction: &mut Transaction<'a>,
1283        size: u64,
1284        update_has_overwrite_extents: Option<bool>,
1285    ) -> Result<NeedsTrim, Error> {
1286        let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1287        self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1288        Ok(needs_trim)
1289    }
1290
1291    pub async fn grow<'a>(
1292        &'a self,
1293        transaction: &mut Transaction<'a>,
1294        old_size: u64,
1295        size: u64,
1296    ) -> Result<(), Error> {
1297        // Before growing the file, we must make sure that a previous trim has completed.
1298        let store = self.store();
1299        while matches!(
1300            store
1301                .trim_some(
1302                    transaction,
1303                    self.object_id(),
1304                    self.attribute_id(),
1305                    TrimMode::FromOffset(old_size)
1306                )
1307                .await?,
1308            TrimResult::Incomplete
1309        ) {
1310            transaction.commit_and_continue().await?;
1311        }
1312        // We might need to zero out the tail of the old last block.
1313        let block_size = self.block_size();
1314        if old_size % block_size != 0 {
1315            let layer_set = store.tree.layer_set();
1316            let mut merger = layer_set.merger();
1317            let aligned_old_size = round_down(old_size, block_size);
1318            let iter = merger
1319                .query(Query::FullRange(&ObjectKey::attribute(
1320                    self.object_id(),
1321                    self.attribute_id(),
1322                    AttributeKey::Extent(Extent::search_key_from_offset(aligned_old_size)),
1323                )))
1324                .await?;
1325            if let Some(ItemRef {
1326                key:
1327                    ObjectKey {
1328                        object_id,
1329                        data:
1330                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
1331                    },
1332                value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
1333                ..
1334            }) = iter.get()
1335            {
1336                if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
1337                    let device_offset = device_offset
1338                        .checked_add(aligned_old_size - extent_key.start)
1339                        .ok_or(FxfsError::Inconsistent)?;
1340                    ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
1341                    let mut buf = self.allocate_buffer(block_size as usize).await;
1342                    // In the case that this extent is in OverwritePartial mode, there is a
1343                    // possibility that the last block is allocated, but not initialized yet, in
1344                    // which case we don't actually need to bother zeroing out the tail. However,
1345                    // it's not strictly incorrect to change uninitialized data, so we skip the
1346                    // check and blindly do it to keep it simpler here.
1347                    self.read_and_decrypt(device_offset, aligned_old_size, buf.as_mut(), *key_id)
1348                        .await?;
1349                    buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
1350                    self.multi_write(
1351                        transaction,
1352                        *attribute_id,
1353                        &[aligned_old_size..aligned_old_size + block_size],
1354                        buf.as_mut(),
1355                    )
1356                    .await?;
1357                }
1358            }
1359        }
1360        self.txn_update_size(transaction, size, None).await?;
1361        Ok(())
1362    }
1363
    /// Attempts to pre-allocate a `file_range` of bytes for this object.
    /// Returns a set of device ranges (i.e. potentially multiple extents).
    ///
    /// It may not be possible to preallocate the entire requested range in one request
    /// due to limitations on transaction size. In such cases, we will preallocate as much as
    /// we can up to some (arbitrary, internal) limit on transaction size.
    ///
    /// `file_range.start` is modified to point at the end of the logical range
    /// that was preallocated such that repeated calls to `preallocate_range` with new
    /// transactions can be used to preallocate ranges of any size.
    ///
    /// Requested range must be a multiple of block size.
    ///
    /// The returned device ranges cover both extents that already existed within the requested
    /// range and extents newly allocated by this call, in the order they were visited.
    ///
    /// # Panics
    ///
    /// Panics if `file_range` is not aligned to the block size, or if the underlying handle is
    /// encrypted. Also panics if rounding the current size up to the block size yields `None`
    /// (presumably on overflow -- confirm against `round_up`).
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::Inconsistent` if an existing extent record is invalid, misaligned, or
    /// its device offset arithmetic overflows. Failures from the tree query, the allocator
    /// (with the context "Allocation failed"), and the size updates are propagated.
    pub async fn preallocate_range<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        file_range: &mut Range<u64>,
    ) -> Result<Vec<Range<u64>>, Error> {
        let block_size = self.block_size();
        assert!(file_range.is_aligned(block_size));
        assert!(!self.handle.is_encrypted());
        let mut ranges = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Position the iterator using the search key for the start of the requested range.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(Extent::search_key_from_offset(file_range.start)),
            )))
            .await?;
        // Running total of bytes newly allocated by this call (existing extents are not counted).
        let mut allocated = 0;
        let key_id = self.get_key(None).await?.0;
        'outer: while file_range.start < file_range.end {
            // Work out how far we can allocate from `file_range.start` before running into an
            // extent that already exists (or the end of the requested range).
            let allocate_end = loop {
                match iter.get() {
                    // Case for allocated extents for the same object that overlap with file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && extent.start < file_range.end =>
                    {
                        // Records come from the tree; reject anything malformed rather than
                        // computing device ranges from it.
                        ensure!(
                            extent.is_valid()
                                && extent.is_aligned(block_size)
                                && device_offset % block_size == 0,
                            FxfsError::Inconsistent
                        );
                        // If the start of the requested file_range overlaps with an existing extent...
                        if extent.start <= file_range.start {
                            // Record the existing extent and move on.
                            let device_range = device_offset
                                .checked_add(file_range.start - extent.start)
                                .ok_or(FxfsError::Inconsistent)?
                                ..device_offset
                                    .checked_add(min(extent.end, file_range.end) - extent.start)
                                    .ok_or(FxfsError::Inconsistent)?;
                            // Skip past the portion of the request this extent already covers.
                            file_range.start += device_range.end - device_range.start;
                            ranges.push(device_range);
                            if file_range.start >= file_range.end {
                                break 'outer;
                            }
                            iter.advance().await?;
                            continue;
                        } else {
                            // There's nothing allocated between file_range.start and the beginning
                            // of this extent.
                            break extent.start;
                        }
                    }
                    // Case for deleted extents eclipsed by file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                            },
                        value: ObjectValue::Extent(ExtentValue::None),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && extent.end < file_range.end =>
                    {
                        // Nothing is allocated here; keep scanning for the next record.
                        iter.advance().await?;
                    }
                    _ => {
                        // We can just preallocate the rest.
                        break file_range.end;
                    }
                }
            };
            // The length passed here is the gap up to `allocate_end`; the returned range's own
            // length is what is used below to advance `file_range.start`.
            let device_range = self
                .store()
                .allocator()
                .allocate(
                    transaction,
                    self.store().store_object_id(),
                    allocate_end - file_range.start,
                )
                .await
                .context("Allocation failed")?;
            allocated += device_range.end - device_range.start;
            let this_file_range =
                file_range.start..file_range.start + device_range.end - device_range.start;
            file_range.start = this_file_range.end;
            // Record the new extent for this logical range in the transaction.
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
                ),
            );
            ranges.push(device_range);
            // If we didn't allocate all that we requested, we'll loop around and try again.
            // ... unless we have filled the transaction. The caller should check file_range.
            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
                break;
            }
        }
        // Update the file size if it changed.
        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
            self.txn_update_size(transaction, file_range.start, None).await?;
        }
        // Account for the newly allocated bytes (nothing is deallocated here).
        self.update_allocated_size(transaction, allocated, 0).await?;
        Ok(ranges)
    }
1499
1500    pub async fn update_attributes<'a>(
1501        &self,
1502        transaction: &mut Transaction<'a>,
1503        node_attributes: Option<&fio::MutableNodeAttributes>,
1504        change_time: Option<Timestamp>,
1505    ) -> Result<(), Error> {
1506        // This codepath is only called by files, whose wrapping key id users cannot directly set
1507        // as per fscrypt.
1508        ensure!(
1509            !matches!(
1510                node_attributes,
1511                Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1512            ),
1513            FxfsError::BadPath
1514        );
1515        self.handle.update_attributes(transaction, node_attributes, change_time).await
1516    }
1517
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.
    ///
    /// This forwards directly to the wrapped `handle`.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        self.handle.default_transaction_options()
    }
1523
    /// Creates a new transaction for this object using the options returned by
    /// [`Self::default_transaction_options`].
    pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(self.default_transaction_options()).await
    }
1527
    /// Creates a new transaction for this object with the caller-supplied `options`.
    ///
    /// The request is forwarded to the wrapped `handle`, passing this handle's attribute id.
    pub async fn new_transaction_with_options<'b>(
        &self,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        self.handle.new_transaction_with_options(self.attribute_id(), options).await
    }
1534
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    ///
    /// Forwards to the wrapped `handle`; any error it reports is returned unchanged.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.handle.flush_device().await
    }
1539
    /// Reads an entire attribute.
    ///
    /// Forwards to the wrapped `handle`. NOTE(review): `None` presumably means the attribute
    /// does not exist -- confirm against the wrapped handle's `read_attr`.
    pub async fn read_attr(&self, attribute_id: AttributeId) -> Result<Option<Box<[u8]>>, Error> {
        self.handle.read_attr(attribute_id).await
    }
1544
1545    /// Writes an entire attribute.  This *always* uses the volume data key.
1546    pub async fn write_attr(&self, attribute_id: AttributeId, data: &[u8]) -> Result<(), Error> {
1547        // Must be different attribute otherwise cached size gets out of date.
1548        assert_ne!(attribute_id, self.attribute_id());
1549        let store = self.store();
1550        let mut transaction = self.new_transaction().await?;
1551        if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
1552            transaction.commit_and_continue().await?;
1553            while matches!(
1554                store
1555                    .trim_some(
1556                        &mut transaction,
1557                        self.object_id(),
1558                        attribute_id,
1559                        TrimMode::FromOffset(data.len() as u64),
1560                    )
1561                    .await?,
1562                TrimResult::Incomplete
1563            ) {
1564                transaction.commit_and_continue().await?;
1565            }
1566        }
1567        transaction.commit().await?;
1568        Ok(())
1569    }
1570
    /// Forwards to the wrapped handle's `read_and_decrypt` with the same arguments.
    ///
    /// NOTE(review): presumably fills `buffer` with data read from `device_offset` on the device
    /// and decrypts it with the key identified by `key_id`, treating `file_offset` as the logical
    /// offset of the data -- confirm against the wrapped handle's implementation.
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id).await
    }
1580
1581    /// Truncates a file to a given size (growing/shrinking as required).
1582    ///
1583    /// Nb: Most code will want to call truncate() instead. This method is used
1584    /// to update the super block -- a case where we must borrow metadata space.
1585    pub async fn truncate_with_options(
1586        &self,
1587        options: Options<'_>,
1588        size: u64,
1589    ) -> Result<(), Error> {
1590        let mut transaction = self.new_transaction_with_options(options).await?;
1591        let old_size = self.get_size();
1592        if size == old_size {
1593            return Ok(());
1594        }
1595        if size < old_size {
1596            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
1597            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
1598                // The file needs to be trimmed.
1599                transaction.commit_and_continue().await?;
1600                let store = self.store();
1601                while matches!(
1602                    store
1603                        .trim_some(
1604                            &mut transaction,
1605                            self.object_id(),
1606                            self.attribute_id(),
1607                            TrimMode::FromOffset(size)
1608                        )
1609                        .await?,
1610                    TrimResult::Incomplete
1611                ) {
1612                    if let Err(error) = transaction.commit_and_continue().await {
1613                        warn!(error:?; "Failed to trim after truncate");
1614                        return Ok(());
1615                    }
1616                }
1617                if let Err(error) = transaction.commit().await {
1618                    warn!(error:?; "Failed to trim after truncate");
1619                }
1620                return Ok(());
1621            }
1622        } else {
1623            self.grow(&mut transaction, old_size, size).await?;
1624        }
1625        transaction.commit().await?;
1626        Ok(())
1627    }
1628
1629    pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
1630        // We don't take a read guard here since the object properties are contained in a single
1631        // object, which cannot be inconsistent with itself. The LSM tree does not return
1632        // intermediate states for a single object.
1633        let item = self
1634            .store()
1635            .tree
1636            .find(&ObjectKey::object(self.object_id()))
1637            .await?
1638            .expect("Unable to find object record");
1639        match item.value {
1640            ObjectValue::Object {
1641                kind: ObjectKind::File { refs, .. },
1642                attributes:
1643                    ObjectAttributes {
1644                        creation_time,
1645                        modification_time,
1646                        posix_attributes,
1647                        allocated_size,
1648                        access_time,
1649                        change_time,
1650                        ..
1651                    },
1652            } => Ok(ObjectProperties {
1653                refs,
1654                allocated_size,
1655                data_attribute_size: self.get_size(),
1656                creation_time,
1657                modification_time,
1658                access_time,
1659                change_time,
1660                sub_dirs: 0,
1661                posix_attributes,
1662                dir_type: DirType::Normal,
1663            }),
1664            _ => bail!(FxfsError::NotFile),
1665        }
1666    }
1667
1668    // Returns the contents of this object. This object must be < |limit| bytes in size.
1669    pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1670        let size = self.get_size();
1671        if size > limit as u64 {
1672            bail!("Object too big ({} > {})", size, limit);
1673        }
1674        let mut buf = self.allocate_buffer(size as usize).await;
1675        self.read(0u64, buf.as_mut()).await?;
1676        Ok(buf.as_slice().into())
1677    }
1678
1679    /// Returns the set of file_offset->extent mappings for this file. The extents will be sorted by
1680    /// their logical offset within the file.
1681    ///
1682    /// *NOTE*: This operation is potentially expensive and should generally be avoided.
1683    pub async fn device_extents(&self) -> Result<Vec<FileExtent>, Error> {
1684        let mut extents = Vec::new();
1685        let tree = &self.store().tree;
1686        let layer_set = tree.layer_set();
1687        let mut merger = layer_set.merger();
1688        let mut iter = merger
1689            .query(Query::FullRange(&ObjectKey::attribute(
1690                self.object_id(),
1691                self.attribute_id(),
1692                AttributeKey::Extent(Extent::search_key_from_offset(0)),
1693            )))
1694            .await?;
1695        loop {
1696            match iter.get() {
1697                Some(ItemRef {
1698                    key:
1699                        ObjectKey {
1700                            object_id,
1701                            data:
1702                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
1703                        },
1704                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1705                    ..
1706                }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
1707                    let logical_offset = extent.start;
1708                    let device_range = *device_offset..*device_offset + extent.length()?;
1709                    extents.push(FileExtent::new(logical_offset, device_range)?);
1710                }
1711                _ => break,
1712            }
1713            iter.advance().await?;
1714        }
1715        Ok(extents)
1716    }
1717}
1718
1719impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
1720    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
1721        match mutation {
1722            Mutation::ObjectStore(ObjectStoreMutation {
1723                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
1724                ..
1725            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
1726            Mutation::ObjectStore(ObjectStoreMutation {
1727                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
1728                ..
1729            }) => {
1730                debug_assert_eq!(
1731                    self.get_size(),
1732                    *size,
1733                    "size should be set when verity is enabled and must not change"
1734                );
1735                self.finalize_fsverity_state()
1736            }
1737            Mutation::ObjectStore(ObjectStoreMutation {
1738                item:
1739                    ObjectItem {
1740                        key:
1741                            ObjectKey {
1742                                object_id,
1743                                data:
1744                                    ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent)),
1745                            },
1746                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
1747                        ..
1748                    },
1749                ..
1750            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
1751                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
1752                    self.overwrite_ranges.apply_range(extent.clone().into())
1753                }
1754                ExtentMode::Raw | ExtentMode::Cow(_) => (),
1755            },
1756            _ => {}
1757        }
1758    }
1759}
1760
// Every method of this impl forwards directly to the wrapped `handle`.
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    /// Forwards the trace flag `v` to the wrapped handle.
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    /// Returns the object id reported by the wrapped handle.
    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    /// Requests a buffer of `size` bytes from the wrapped handle.
    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    /// Returns the block size reported by the wrapped handle.
    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1778
1779#[async_trait]
1780impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
1781    async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
1782        let fs = self.store().filesystem();
1783        let guard = fs
1784            .lock_manager()
1785            .read_lock(lock_keys![LockKey::object_attribute(
1786                self.store().store_object_id,
1787                self.object_id(),
1788                self.attribute_id(),
1789            )])
1790            .await;
1791
1792        let size = self.get_size();
1793        if offset >= size {
1794            return Ok(0);
1795        }
1796        let length = min(buf.len() as u64, size - offset) as usize;
1797        buf = buf.subslice_mut(0..length);
1798        self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
1799        if self.is_verified_file() {
1800            self.verify_data(offset as usize, buf.as_slice())?;
1801        }
1802        Ok(length)
1803    }
1804
1805    fn get_size(&self) -> u64 {
1806        self.content_size.load(atomic::Ordering::Relaxed)
1807    }
1808}
1809
1810impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
1811    async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
1812        let offset = offset.unwrap_or_else(|| self.get_size());
1813        let mut transaction = self.new_transaction().await?;
1814        self.txn_write(&mut transaction, offset, buf).await?;
1815        let new_size = self.txn_get_size(&transaction);
1816        transaction.commit().await?;
1817        Ok(new_size)
1818    }
1819
1820    async fn truncate(&self, size: u64) -> Result<(), Error> {
1821        self.truncate_with_options(self.default_transaction_options(), size).await
1822    }
1823
1824    async fn flush(&self) -> Result<(), Error> {
1825        Ok(())
1826    }
1827}
1828
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
pub struct DirectWriter<'a, S: HandleOwner> {
    // The handle that flushed data is written to.
    handle: &'a DataObjectHandle<S>,
    // Options used for the transaction created by each flush.
    options: transaction::Options<'a>,
    // Staging buffer of `BUFFER_SIZE` bytes that accumulates data between flushes.
    buffer: Buffer<'a>,
    // Offset in the object at which the next flush will write.
    offset: u64,
    // Number of bytes currently staged in `buffer`.
    buf_offset: usize,
}
1838
// Size in bytes (1 MiB) of the staging buffer allocated for each `DirectWriter`.
const BUFFER_SIZE: usize = 1_048_576;
1840
1841impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
1842    fn drop(&mut self) {
1843        if self.buf_offset != 0 {
1844            warn!("DirectWriter: dropping data, did you forget to call complete?");
1845        }
1846    }
1847}
1848
1849impl<'a, S: HandleOwner> DirectWriter<'a, S> {
1850    pub async fn new(
1851        handle: &'a DataObjectHandle<S>,
1852        options: transaction::Options<'a>,
1853    ) -> DirectWriter<'a, S> {
1854        Self {
1855            handle,
1856            options,
1857            buffer: handle.allocate_buffer(BUFFER_SIZE).await,
1858            offset: 0,
1859            buf_offset: 0,
1860        }
1861    }
1862
1863    async fn flush(&mut self) -> Result<(), Error> {
1864        let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
1865        self.handle
1866            .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
1867            .await?;
1868        transaction.commit().await?;
1869        self.offset += self.buf_offset as u64;
1870        self.buf_offset = 0;
1871        Ok(())
1872    }
1873}
1874
1875impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1876    fn block_size(&self) -> u64 {
1877        self.handle.block_size()
1878    }
1879
1880    async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1881        while buf.len() > 0 {
1882            let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1883            self.buffer
1884                .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1885                .as_mut_slice()
1886                .copy_from_slice(&buf[..to_do]);
1887            self.buf_offset += to_do;
1888            if self.buf_offset == BUFFER_SIZE {
1889                self.flush().await?;
1890            }
1891            buf = &buf[to_do..];
1892        }
1893        Ok(())
1894    }
1895
1896    async fn complete(&mut self) -> Result<(), Error> {
1897        self.flush().await?;
1898        Ok(())
1899    }
1900
1901    async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1902        if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1903            self.buffer
1904                .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1905                .as_mut_slice()
1906                .fill(0);
1907            self.buf_offset += amount as usize;
1908        } else {
1909            self.flush().await?;
1910            self.offset += amount;
1911        }
1912        Ok(())
1913    }
1914
1915    /// The number of bytes written to this writer (including unflushed bytes).
1916    fn bytes_written(&self) -> u64 {
1917        self.offset + self.buf_offset as u64
1918    }
1919}
1920
1921#[cfg(test)]
1922mod tests {
1923    use crate::errors::FxfsError;
1924    use crate::filesystem::{
1925        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1926    };
1927    use crate::fsck::{
1928        FsckOptions, fsck, fsck_volume, fsck_volume_with_options, fsck_with_options,
1929    };
1930    use crate::lsm_tree::Query;
1931    use crate::lsm_tree::types::{ItemRef, LayerIterator};
1932    use crate::object_handle::{
1933        ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1934    };
1935    use crate::object_store::data_object_handle::{OverwriteOptions, WRITE_ATTR_BATCH_SIZE};
1936    use crate::object_store::directory::replace_child;
1937    use crate::object_store::object_record::{FsverityMetadata, ObjectKey, ObjectValue, Timestamp};
1938    use crate::object_store::transaction::{Mutation, Options, lock_keys};
1939    use crate::object_store::volume::root_volume;
1940    use crate::object_store::{
1941        AttributeId, AttributeKey, DataObjectHandle, DirType, Directory, Extent, ExtentMode,
1942        ExtentValue, HandleOptions, LockKey, NewChildStoreOptions, ObjectKeyData, ObjectStore,
1943        PosixAttributes, StoreOptions, TRANSACTION_MUTATION_THRESHOLD,
1944    };
1945    use crate::range::RangeExt;
1946    use crate::round::{round_down, round_up};
1947    use assert_matches::assert_matches;
1948    use bit_vec::BitVec;
1949    use fidl_fuchsia_io as fio;
1950    use fsverity_merkle::{FsVerityDescriptor, FsVerityDescriptorRaw};
1951    use fuchsia_async as fasync;
1952    use fuchsia_sync::Mutex;
1953    use futures::FutureExt;
1954    use futures::channel::oneshot::channel;
1955    use futures::stream::{FuturesUnordered, StreamExt};
1956    use fxfs_crypto::{Crypt, EncryptionKey, KeyPurpose};
1957    use fxfs_insecure_crypto::new_insecure_crypt;
1958    use std::ops::Range;
1959    use std::sync::Arc;
1960    use std::time::Duration;
1961    use storage_device::DeviceHolder;
1962    use storage_device::fake_device::FakeDevice;
1963
    // Block size passed to the `FakeDevice` that backs the test filesystem.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Some tests (the preallocate_range ones) currently assume that the data only occupies a single
    // device block.
    // Offset at which `TEST_DATA` is written into the test object.
    const TEST_DATA_OFFSET: u64 = 5000;
    // Payload written into the test object at `TEST_DATA_OFFSET`.
    const TEST_DATA: &[u8] = b"hello";
    // Size the test object is truncated to once it has been created.
    const TEST_OBJECT_SIZE: u64 = 5678;
    // NOTE(review): presumably the allocated size tests expect for the test object -- confirm
    // against the tests that use it.
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    // Name under which the test object is added to the root directory.
    const TEST_OBJECT_NAME: &str = "foo";
1973
1974    async fn test_filesystem() -> OpenFxFilesystem {
1975        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1976        FxFilesystem::new_empty(device).await.expect("new_empty failed")
1977    }
1978
1979    async fn create_object_with_key(
1980        fs: Arc<FxFilesystem>,
1981        crypt: Option<&dyn Crypt>,
1982        write_object_test_data: bool,
1983    ) -> DataObjectHandle<ObjectStore> {
1984        let store = fs.root_store();
1985        let object;
1986
1987        let mut transaction = fs
1988            .clone()
1989            .new_transaction(
1990                lock_keys![LockKey::object(
1991                    store.store_object_id(),
1992                    store.root_directory_object_id()
1993                )],
1994                Options::default(),
1995            )
1996            .await
1997            .expect("new_transaction failed");
1998
1999        object = if let Some(crypt) = crypt {
2000            let object_id = store.get_next_object_id(transaction.txn_guard()).await.unwrap();
2001            let (key, unwrapped_key) =
2002                crypt.create_key(object_id.get(), KeyPurpose::Data).await.unwrap();
2003            ObjectStore::create_object_with_key(
2004                &store,
2005                &mut transaction,
2006                object_id,
2007                HandleOptions::default(),
2008                EncryptionKey::Fxfs(key),
2009                unwrapped_key,
2010            )
2011            .await
2012            .expect("create_object failed")
2013        } else {
2014            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2015                .await
2016                .expect("create_object failed")
2017        };
2018
2019        let root_directory =
2020            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2021        root_directory
2022            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
2023            .await
2024            .expect("add_child_file failed");
2025
2026        if write_object_test_data {
2027            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
2028            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
2029            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
2030            object
2031                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
2032                .await
2033                .expect("write failed");
2034        }
2035        transaction.commit().await.expect("commit failed");
2036        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
2037        object
2038    }
2039
    /// Creates a fresh test filesystem together with a test object in it.
    ///
    /// `crypt` and `write_object_test_data` are passed straight through to
    /// `create_object_with_key`.
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let object = create_object_with_key(fs.clone(), crypt, write_object_test_data).await;
        (fs, object)
    }
2048
    /// Creates a test filesystem and a test object keyed via the insecure crypt, with the test
    /// data written to it.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), true).await
    }
2052
    /// Creates a test filesystem and a test object keyed via the insecure crypt, without writing
    /// the test data to it.
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), false).await
    }
2057
2058    #[fuchsia::test]
2059    async fn test_zero_buf_len_read() {
2060        let (fs, object) = test_filesystem_and_object().await;
2061        let mut buf = object.allocate_buffer(0).await;
2062        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
2063        fs.close().await.expect("Close failed");
2064    }
2065
2066    #[fuchsia::test]
2067    async fn test_beyond_eof_read() {
2068        let (fs, object) = test_filesystem_and_object().await;
2069        let offset = TEST_OBJECT_SIZE as usize - 2;
2070        let align = offset % fs.block_size() as usize;
2071        let len: usize = 2;
2072        let mut buf = object.allocate_buffer(align + len + 1).await;
2073        buf.as_mut_slice().fill(123u8);
2074        assert_eq!(
2075            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
2076            align + len
2077        );
2078        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
2079        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
2080        fs.close().await.expect("Close failed");
2081    }
2082
2083    #[fuchsia::test]
2084    async fn test_beyond_eof_read_from() {
2085        let (fs, object) = test_filesystem_and_object().await;
2086        let handle = &*object;
2087        let offset = TEST_OBJECT_SIZE as usize - 2;
2088        let align = offset % fs.block_size() as usize;
2089        let len: usize = 2;
2090        let mut buf = object.allocate_buffer(align + len + 1).await;
2091        buf.as_mut_slice().fill(123u8);
2092        assert_eq!(
2093            handle
2094                .read(AttributeId::DATA, (offset - align) as u64, buf.as_mut())
2095                .await
2096                .expect("read failed"),
2097            align + len
2098        );
2099        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
2100        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
2101        fs.close().await.expect("Close failed");
2102    }
2103
2104    #[fuchsia::test]
2105    async fn test_beyond_eof_read_unchecked() {
2106        let (fs, object) = test_filesystem_and_object().await;
2107        let offset = TEST_OBJECT_SIZE as usize - 2;
2108        let align = offset % fs.block_size() as usize;
2109        let len: usize = 2;
2110        let mut buf = object.allocate_buffer(align + len + 1).await;
2111        buf.as_mut_slice().fill(123u8);
2112        let guard = fs
2113            .lock_manager()
2114            .read_lock(lock_keys![LockKey::object_attribute(
2115                object.store().store_object_id,
2116                object.object_id(),
2117                AttributeId::DATA,
2118            )])
2119            .await;
2120        object
2121            .read_unchecked(AttributeId::DATA, (offset - align) as u64, buf.as_mut(), &guard)
2122            .await
2123            .expect("read failed");
2124        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
2125        fs.close().await.expect("Close failed");
2126    }
2127
2128    #[fuchsia::test]
2129    async fn test_read_sparse() {
2130        let (fs, object) = test_filesystem_and_object().await;
2131        // Deliberately read not right to eof.
2132        let len = TEST_OBJECT_SIZE as usize - 1;
2133        let mut buf = object.allocate_buffer(len).await;
2134        buf.as_mut_slice().fill(123u8);
2135        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2136        let mut expected = vec![0; len];
2137        let offset = TEST_DATA_OFFSET as usize;
2138        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2139        assert_eq!(buf.as_slice()[..len], expected[..]);
2140        fs.close().await.expect("Close failed");
2141    }
2142
2143    #[fuchsia::test]
2144    async fn test_read_after_writes_interspersed_with_flush() {
2145        let (fs, object) = test_filesystem_and_object().await;
2146
2147        object.owner().flush().await.expect("flush failed");
2148
2149        // Write more test data to the first block fo the file.
2150        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
2151        buf.as_mut_slice().copy_from_slice(TEST_DATA);
2152        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");
2153
2154        let len = TEST_OBJECT_SIZE as usize - 1;
2155        let mut buf = object.allocate_buffer(len).await;
2156        buf.as_mut_slice().fill(123u8);
2157        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2158
2159        let mut expected = vec![0u8; len];
2160        let offset = TEST_DATA_OFFSET as usize;
2161        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2162        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
2163        assert_eq!(buf.as_slice(), &expected);
2164        fs.close().await.expect("Close failed");
2165    }
2166
2167    #[fuchsia::test]
2168    async fn test_read_after_truncate_and_extend() {
2169        let (fs, object) = test_filesystem_and_object().await;
2170
2171        // Arrange for there to be <extent><deleted-extent><extent>.
2172        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
2173        buf.as_mut_slice().copy_from_slice(TEST_DATA);
2174        // This adds an extent at 0..512.
2175        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2176        // This deletes 512..1024.
2177        object.truncate(3).await.expect("truncate failed");
2178        let data = b"foo";
2179        let offset = 1500u64;
2180        let align = (offset % fs.block_size() as u64) as usize;
2181        let mut buf = object.allocate_buffer(align + data.len()).await;
2182        buf.as_mut_slice()[align..].copy_from_slice(data);
2183        // This adds 1024..1536.
2184        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");
2185
2186        const LEN1: usize = 1503;
2187        let mut buf = object.allocate_buffer(LEN1).await;
2188        buf.as_mut_slice().fill(123u8);
2189        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
2190        let mut expected = [0; LEN1];
2191        expected[..3].copy_from_slice(&TEST_DATA[..3]);
2192        expected[1500..].copy_from_slice(b"foo");
2193        assert_eq!(buf.as_slice(), &expected);
2194
2195        // Also test a read that ends midway through the deleted extent.
2196        const LEN2: usize = 601;
2197        let mut buf = object.allocate_buffer(LEN2).await;
2198        buf.as_mut_slice().fill(123u8);
2199        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
2200        assert_eq!(buf.as_slice(), &expected[..LEN2]);
2201        fs.close().await.expect("Close failed");
2202    }
2203
2204    #[fuchsia::test]
2205    async fn test_read_whole_blocks_with_multiple_objects() {
2206        let (fs, object) = test_filesystem_and_object().await;
2207        let block_size = object.block_size() as usize;
2208        let mut buffer = object.allocate_buffer(block_size).await;
2209        buffer.as_mut_slice().fill(0xaf);
2210        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2211
2212        let store = object.owner();
2213        let mut transaction = fs
2214            .clone()
2215            .new_transaction(lock_keys![], Options::default())
2216            .await
2217            .expect("new_transaction failed");
2218        let object2 =
2219            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2220                .await
2221                .expect("create_object failed");
2222        transaction.commit().await.expect("commit failed");
2223        let mut ef_buffer = object.allocate_buffer(block_size).await;
2224        ef_buffer.as_mut_slice().fill(0xef);
2225        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");
2226
2227        let mut buffer = object.allocate_buffer(block_size).await;
2228        buffer.as_mut_slice().fill(0xaf);
2229        object
2230            .write_or_append(Some(block_size as u64), buffer.as_ref())
2231            .await
2232            .expect("write failed");
2233        object.truncate(3 * block_size as u64).await.expect("truncate failed");
2234        object2
2235            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
2236            .await
2237            .expect("write failed");
2238
2239        let mut buffer = object.allocate_buffer(4 * block_size).await;
2240        buffer.as_mut_slice().fill(123);
2241        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
2242        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
2243        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
2244        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
2245        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
2246        fs.close().await.expect("Close failed");
2247    }
2248
2249    #[fuchsia::test]
2250    async fn test_alignment() {
2251        let (fs, object) = test_filesystem_and_object().await;
2252
2253        struct AlignTest {
2254            fill: u8,
2255            object: DataObjectHandle<ObjectStore>,
2256            mirror: Vec<u8>,
2257        }
2258
2259        impl AlignTest {
2260            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
2261                let mirror = {
2262                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
2263                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2264                    buf.as_slice().to_vec()
2265                };
2266                Self { fill: 0, object, mirror }
2267            }
2268
2269            // Fills |range| of self.object with a byte value (self.fill) and mirrors the same
2270            // operation to an in-memory copy of the object.
2271            // Each subsequent call bumps the value of fill.
2272            // It is expected that the object and its mirror maintain identical content.
2273            async fn test(&mut self, range: Range<u64>) {
2274                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
2275                self.fill += 1;
2276                buf.as_mut_slice().fill(self.fill);
2277                self.object
2278                    .write_or_append(Some(range.start), buf.as_ref())
2279                    .await
2280                    .expect("write_or_append failed");
2281                if range.end > self.mirror.len() as u64 {
2282                    self.mirror.resize(range.end as usize, 0);
2283                }
2284                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
2285                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
2286                assert_eq!(
2287                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
2288                    self.mirror.len()
2289                );
2290                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
2291            }
2292        }
2293
2294        let block_size = object.block_size() as u64;
2295        let mut align = AlignTest::new(object).await;
2296
2297        // Fill the object to start with (with 1).
2298        align.test(0..2 * block_size + 1).await;
2299
2300        // Unaligned head (fills with 2, overwrites that with 3).
2301        align.test(1..block_size).await;
2302        align.test(1..2 * block_size).await;
2303
2304        // Unaligned tail (fills with 4 and 5).
2305        align.test(0..block_size - 1).await;
2306        align.test(0..2 * block_size - 1).await;
2307
2308        // Both unaligned (fills with 6 and 7).
2309        align.test(1..block_size - 1).await;
2310        align.test(1..2 * block_size - 1).await;
2311
2312        fs.close().await.expect("Close failed");
2313    }
2314
2315    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
2316        let allocator = fs.allocator();
2317        let allocated_before = allocator.get_allocated_bytes();
2318        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2319        object
2320            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
2321            .await
2322            .expect("preallocate_range failed");
2323        transaction.commit().await.expect("commit failed");
2324        assert!(object.get_size() < 1048576);
2325        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2326        object
2327            .preallocate_range(&mut transaction, &mut (0..1048576))
2328            .await
2329            .expect("preallocate_range failed");
2330        transaction.commit().await.expect("commit failed");
2331        assert_eq!(object.get_size(), 1048576);
2332        // Check that it didn't reallocate the space for the existing extent
2333        let allocated_after = allocator.get_allocated_bytes();
2334        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
2335
2336        let mut buf = object
2337            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
2338            .await;
2339        buf.as_mut_slice().fill(47);
2340        object
2341            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
2342            .await
2343            .expect("write failed");
2344        buf.as_mut_slice().fill(95);
2345        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
2346        object
2347            .overwrite(offset, buf.as_mut(), OverwriteOptions::default())
2348            .await
2349            .expect("write failed");
2350
2351        // Make sure there were no more allocations.
2352        assert_eq!(allocator.get_allocated_bytes(), allocated_after);
2353
2354        // Read back the data and make sure it is what we expect.
2355        let mut buf = object.allocate_buffer(104876).await;
2356        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2357        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
2358        assert_eq!(
2359            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
2360            TEST_DATA
2361        );
2362        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
2363    }
2364
2365    #[fuchsia::test]
2366    async fn test_preallocate_range() {
2367        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2368        test_preallocate_common(&fs, object).await;
2369        fs.close().await.expect("Close failed");
2370    }
2371
2372    // This is identical to the previous test except that we flush so that extents end up in
2373    // different layers.
2374    #[fuchsia::test]
2375    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
2376        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2377        object.owner().flush().await.expect("flush failed");
2378        test_preallocate_common(&fs, object).await;
2379        fs.close().await.expect("Close failed");
2380    }
2381
2382    #[fuchsia::test]
2383    async fn test_already_preallocated() {
2384        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2385        let allocator = fs.allocator();
2386        let allocated_before = allocator.get_allocated_bytes();
2387        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2388        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
2389        object
2390            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
2391            .await
2392            .expect("preallocate_range failed");
2393        transaction.commit().await.expect("commit failed");
2394        // Check that it didn't reallocate any new space.
2395        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
2396        fs.close().await.expect("Close failed");
2397    }
2398
2399    #[fuchsia::test]
2400    async fn test_overwrite_when_preallocated_at_start_of_file() {
2401        // The standard test data we put in the test object would cause an extent with checksums
2402        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2403        let (fs, object) = test_filesystem_and_empty_object().await;
2404
2405        let object = ObjectStore::open_object(
2406            object.owner(),
2407            object.object_id(),
2408            HandleOptions::default(),
2409            None,
2410        )
2411        .await
2412        .expect("open_object failed");
2413
2414        assert_eq!(fs.block_size(), 4096);
2415
2416        let mut write_buf = object.allocate_buffer(4096).await;
2417        write_buf.as_mut_slice().fill(95);
2418
2419        // First try to overwrite without allowing allocations
2420        // We expect this to fail, since nothing is allocated yet
2421        object
2422            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2423            .await
2424            .expect_err("overwrite succeeded");
2425
2426        // Now preallocate some space (exactly one block)
2427        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2428        object
2429            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
2430            .await
2431            .expect("preallocate_range failed");
2432        transaction.commit().await.expect("commit failed");
2433
2434        // Now try the same overwrite command as before, it should work this time,
2435        // even with allocations disabled...
2436        {
2437            let mut read_buf = object.allocate_buffer(4096).await;
2438            object.read(0, read_buf.as_mut()).await.expect("read failed");
2439            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2440        }
2441        object
2442            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2443            .await
2444            .expect("overwrite failed");
2445        {
2446            let mut read_buf = object.allocate_buffer(4096).await;
2447            object.read(0, read_buf.as_mut()).await.expect("read failed");
2448            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2449        }
2450
2451        // Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
2452        // one block earlier at offset 0
2453        object
2454            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2455            .await
2456            .expect_err("overwrite succeeded");
2457
2458        // We can't assert anything about the existing bytes, because they haven't been allocated
2459        // yet and they could contain any values
2460        object
2461            .overwrite(
2462                4096,
2463                write_buf.as_mut(),
2464                OverwriteOptions { allow_allocations: true, ..Default::default() },
2465            )
2466            .await
2467            .expect("overwrite failed");
2468        {
2469            let mut read_buf = object.allocate_buffer(4096).await;
2470            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2471            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2472        }
2473
2474        // Check that the overwrites haven't messed up the filesystem state
2475        let fsck_options = FsckOptions {
2476            fail_on_warning: true,
2477            no_lock: true,
2478            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2479            ..Default::default()
2480        };
2481        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2482
2483        fs.close().await.expect("Close failed");
2484    }
2485
2486    #[fuchsia::test]
2487    async fn test_overwrite_large_buffer_and_file_with_many_holes() {
2488        // The standard test data we put in the test object would cause an extent with checksums
2489        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2490        let (fs, object) = test_filesystem_and_empty_object().await;
2491
2492        let object = ObjectStore::open_object(
2493            object.owner(),
2494            object.object_id(),
2495            HandleOptions::default(),
2496            None,
2497        )
2498        .await
2499        .expect("open_object failed");
2500
2501        assert_eq!(fs.block_size(), 4096);
2502        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2503
2504        // Let's create some non-holes
2505        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2506        object
2507            .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
2508            .await
2509            .expect("preallocate_range failed");
2510        object
2511            .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
2512            .await
2513            .expect("preallocate_range failed");
2514        object
2515            .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
2516            .await
2517            .expect("preallocate_range failed");
2518        object
2519            .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
2520            .await
2521            .expect("preallocate_range failed");
2522        transaction.commit().await.expect("commit failed");
2523
2524        assert_eq!(object.get_size(), 524288);
2525
2526        let mut write_buf = object.allocate_buffer(4096).await;
2527        write_buf.as_mut_slice().fill(95);
2528
2529        // We shouldn't be able to overwrite in the holes if new allocations aren't enabled
2530        object
2531            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2532            .await
2533            .expect_err("overwrite succeeded");
2534        object
2535            .overwrite(8192, write_buf.as_mut(), OverwriteOptions::default())
2536            .await
2537            .expect_err("overwrite succeeded");
2538        object
2539            .overwrite(32768, write_buf.as_mut(), OverwriteOptions::default())
2540            .await
2541            .expect_err("overwrite succeeded");
2542        object
2543            .overwrite(131072, write_buf.as_mut(), OverwriteOptions::default())
2544            .await
2545            .expect_err("overwrite succeeded");
2546
2547        // But we should be able to overwrite in the prealloc'd areas without needing allocations
2548        {
2549            let mut read_buf = object.allocate_buffer(4096).await;
2550            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2551            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2552        }
2553        object
2554            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2555            .await
2556            .expect("overwrite failed");
2557        {
2558            let mut read_buf = object.allocate_buffer(4096).await;
2559            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2560            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2561        }
2562        {
2563            let mut read_buf = object.allocate_buffer(4096).await;
2564            object.read(16384, read_buf.as_mut()).await.expect("read failed");
2565            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2566        }
2567        object
2568            .overwrite(16384, write_buf.as_mut(), OverwriteOptions::default())
2569            .await
2570            .expect("overwrite failed");
2571        {
2572            let mut read_buf = object.allocate_buffer(4096).await;
2573            object.read(16384, read_buf.as_mut()).await.expect("read failed");
2574            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2575        }
2576        {
2577            let mut read_buf = object.allocate_buffer(4096).await;
2578            object.read(65536, read_buf.as_mut()).await.expect("read failed");
2579            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2580        }
2581        object
2582            .overwrite(65536, write_buf.as_mut(), OverwriteOptions::default())
2583            .await
2584            .expect("overwrite failed");
2585        {
2586            let mut read_buf = object.allocate_buffer(4096).await;
2587            object.read(65536, read_buf.as_mut()).await.expect("read failed");
2588            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2589        }
2590        {
2591            let mut read_buf = object.allocate_buffer(4096).await;
2592            object.read(262144, read_buf.as_mut()).await.expect("read failed");
2593            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2594        }
2595        object
2596            .overwrite(262144, write_buf.as_mut(), OverwriteOptions::default())
2597            .await
2598            .expect("overwrite failed");
2599        {
2600            let mut read_buf = object.allocate_buffer(4096).await;
2601            object.read(262144, read_buf.as_mut()).await.expect("read failed");
2602            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2603        }
2604
2605        // Now let's try to do a huge overwrite, that spans over many holes and non-holes
2606        let mut huge_write_buf = object.allocate_buffer(524288).await;
2607        huge_write_buf.as_mut_slice().fill(96);
2608
2609        // With allocations disabled, the big overwrite should fail...
2610        object
2611            .overwrite(0, huge_write_buf.as_mut(), OverwriteOptions::default())
2612            .await
2613            .expect_err("overwrite succeeded");
2614        // ... but it should work when allocations are enabled
2615        object
2616            .overwrite(
2617                0,
2618                huge_write_buf.as_mut(),
2619                OverwriteOptions { allow_allocations: true, ..Default::default() },
2620            )
2621            .await
2622            .expect("overwrite failed");
2623        {
2624            let mut read_buf = object.allocate_buffer(524288).await;
2625            object.read(0, read_buf.as_mut()).await.expect("read failed");
2626            assert_eq!(&read_buf.as_slice(), &[96; 524288]);
2627        }
2628
2629        // Check that the overwrites haven't messed up the filesystem state
2630        let fsck_options = FsckOptions {
2631            fail_on_warning: true,
2632            no_lock: true,
2633            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2634            ..Default::default()
2635        };
2636        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2637
2638        fs.close().await.expect("Close failed");
2639    }
2640
2641    #[fuchsia::test]
2642    async fn test_overwrite_when_unallocated_at_start_of_file() {
2643        // The standard test data we put in the test object would cause an extent with checksums
2644        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2645        let (fs, object) = test_filesystem_and_empty_object().await;
2646
2647        let object = ObjectStore::open_object(
2648            object.owner(),
2649            object.object_id(),
2650            HandleOptions::default(),
2651            None,
2652        )
2653        .await
2654        .expect("open_object failed");
2655
2656        assert_eq!(fs.block_size(), 4096);
2657
2658        let mut write_buf = object.allocate_buffer(4096).await;
2659        write_buf.as_mut_slice().fill(95);
2660
2661        // First try to overwrite without allowing allocations
2662        // We expect this to fail, since nothing is allocated yet
2663        object
2664            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2665            .await
2666            .expect_err("overwrite succeeded");
2667
2668        // Now try the same overwrite command as before, but allow allocations
2669        object
2670            .overwrite(
2671                0,
2672                write_buf.as_mut(),
2673                OverwriteOptions { allow_allocations: true, ..Default::default() },
2674            )
2675            .await
2676            .expect("overwrite failed");
2677        {
2678            let mut read_buf = object.allocate_buffer(4096).await;
2679            object.read(0, read_buf.as_mut()).await.expect("read failed");
2680            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2681        }
2682
2683        // Now try to overwrite at the next block. This should fail if allocations are disabled
2684        object
2685            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2686            .await
2687            .expect_err("overwrite succeeded");
2688
2689        // ... but it should work if allocations are enabled
2690        object
2691            .overwrite(
2692                4096,
2693                write_buf.as_mut(),
2694                OverwriteOptions { allow_allocations: true, ..Default::default() },
2695            )
2696            .await
2697            .expect("overwrite failed");
2698        {
2699            let mut read_buf = object.allocate_buffer(4096).await;
2700            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2701            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2702        }
2703
2704        // Check that the overwrites haven't messed up the filesystem state
2705        let fsck_options = FsckOptions {
2706            fail_on_warning: true,
2707            no_lock: true,
2708            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2709            ..Default::default()
2710        };
2711        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2712
2713        fs.close().await.expect("Close failed");
2714    }
2715
2716    #[fuchsia::test]
2717    async fn test_overwrite_can_extend_a_file() {
2718        // The standard test data we put in the test object would cause an extent with checksums
2719        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2720        let (fs, object) = test_filesystem_and_empty_object().await;
2721
2722        let object = ObjectStore::open_object(
2723            object.owner(),
2724            object.object_id(),
2725            HandleOptions::default(),
2726            None,
2727        )
2728        .await
2729        .expect("open_object failed");
2730
2731        assert_eq!(fs.block_size(), 4096);
2732        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2733
2734        let mut write_buf = object.allocate_buffer(4096).await;
2735        write_buf.as_mut_slice().fill(95);
2736
2737        // Let's try to fill up the last block, and increase the file size in doing so
2738        let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);
2739
2740        // Expected to fail with allocations disabled
2741        object
2742            .overwrite(last_block_offset, write_buf.as_mut(), OverwriteOptions::default())
2743            .await
2744            .expect_err("overwrite succeeded");
2745        // ... but expected to succeed with allocations enabled
2746        object
2747            .overwrite(
2748                last_block_offset,
2749                write_buf.as_mut(),
2750                OverwriteOptions { allow_allocations: true, ..Default::default() },
2751            )
2752            .await
2753            .expect("overwrite failed");
2754        {
2755            let mut read_buf = object.allocate_buffer(4096).await;
2756            object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
2757            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2758        }
2759
2760        assert_eq!(object.get_size(), 8192);
2761
2762        // Let's try to write at the next block, too
2763        let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();
2764
2765        // Expected to fail with allocations disabled
2766        object
2767            .overwrite(next_block_offset, write_buf.as_mut(), OverwriteOptions::default())
2768            .await
2769            .expect_err("overwrite succeeded");
2770        // ... but expected to succeed with allocations enabled
2771        object
2772            .overwrite(
2773                next_block_offset,
2774                write_buf.as_mut(),
2775                OverwriteOptions { allow_allocations: true, ..Default::default() },
2776            )
2777            .await
2778            .expect("overwrite failed");
2779        {
2780            let mut read_buf = object.allocate_buffer(4096).await;
2781            object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
2782            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2783        }
2784
2785        assert_eq!(object.get_size(), 12288);
2786
2787        // Check that the overwrites haven't messed up the filesystem state
2788        let fsck_options = FsckOptions {
2789            fail_on_warning: true,
2790            no_lock: true,
2791            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2792            ..Default::default()
2793        };
2794        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2795
2796        fs.close().await.expect("Close failed");
2797    }
2798
2799    #[fuchsia::test]
2800    async fn test_enable_verity() {
2801        let fs: OpenFxFilesystem = test_filesystem().await;
2802        let mut transaction = fs
2803            .clone()
2804            .new_transaction(lock_keys![], Options::default())
2805            .await
2806            .expect("new_transaction failed");
2807        let store = fs.root_store();
2808        let object = Arc::new(
2809            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2810                .await
2811                .expect("create_object failed"),
2812        );
2813
2814        transaction.commit().await.unwrap();
2815
2816        object
2817            .enable_verity(fio::VerificationOptions {
2818                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2819                salt: Some(vec![]),
2820                ..Default::default()
2821            })
2822            .await
2823            .expect("set verified file metadata failed");
2824
2825        let handle =
2826            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2827                .await
2828                .expect("open_object failed");
2829
2830        assert!(handle.is_verified_file());
2831
2832        fs.close().await.expect("Close failed");
2833    }
2834
2835    #[fuchsia::test]
2836    async fn test_enable_verity_large_file() {
2837        // Need to make a large FakeDevice to create space for a 67 MB file.
2838        let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2839        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2840        let root_store = fs.root_store();
2841        let mut transaction = fs
2842            .clone()
2843            .new_transaction(lock_keys![], Options::default())
2844            .await
2845            .expect("new_transaction failed");
2846
2847        let handle = ObjectStore::create_object(
2848            &root_store,
2849            &mut transaction,
2850            HandleOptions::default(),
2851            None,
2852        )
2853        .await
2854        .expect("failed to create object");
2855        transaction.commit().await.expect("commit failed");
2856        let mut offset = 0;
2857
2858        // Write a file big enough to trigger multiple transactions on enable_verity().
2859        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2860        buf.as_mut_slice().fill(1);
2861        for _ in 0..130 {
2862            handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2863            offset += WRITE_ATTR_BATCH_SIZE as u64;
2864        }
2865
2866        handle
2867            .enable_verity(fio::VerificationOptions {
2868                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2869                salt: Some(vec![]),
2870                ..Default::default()
2871            })
2872            .await
2873            .expect("set verified file metadata failed");
2874
2875        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2876        offset = 0;
2877        for _ in 0..130 {
2878            handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2879            assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2880            offset += WRITE_ATTR_BATCH_SIZE as u64;
2881        }
2882
2883        fsck(fs.clone()).await.expect("fsck failed");
2884        fs.close().await.expect("Close failed");
2885    }
2886
2887    #[fuchsia::test]
2888    async fn test_retry_enable_verity_on_reboot() {
2889        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2890        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2891        let root_store = fs.root_store();
2892        let mut transaction = fs
2893            .clone()
2894            .new_transaction(lock_keys![], Options::default())
2895            .await
2896            .expect("new_transaction failed");
2897
2898        let handle = ObjectStore::create_object(
2899            &root_store,
2900            &mut transaction,
2901            HandleOptions::default(),
2902            None,
2903        )
2904        .await
2905        .expect("failed to create object");
2906        transaction.commit().await.expect("commit failed");
2907
2908        let object_id = {
2909            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
2910            transaction.add(
2911                root_store.store_object_id(),
2912                Mutation::replace_or_insert_object(
2913                    ObjectKey::graveyard_attribute_entry(
2914                        root_store.graveyard_directory_object_id(),
2915                        handle.object_id(),
2916                        AttributeId::FSVERITY_MERKLE,
2917                    ),
2918                    ObjectValue::Some,
2919                ),
2920            );
2921
2922            // This write should span three transactions. This test mimics the behavior when the
2923            // last transaction gets interrupted by a filesystem.close().
2924            handle
2925                .write_new_attr_in_batches(
2926                    &mut transaction,
2927                    AttributeId::FSVERITY_MERKLE,
2928                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
2929                    WRITE_ATTR_BATCH_SIZE,
2930                )
2931                .await
2932                .expect("failed to write merkle attribute");
2933
2934            handle.object_id()
2935            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
2936            // release the transaction locks.
2937        };
2938
2939        fs.close().await.expect("failed to close filesystem");
2940        let device = fs.take_device().await;
2941        device.reopen(false);
2942
2943        let fs =
2944            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2945        fsck(fs.clone()).await.expect("fsck failed");
2946        fs.close().await.expect("failed to close filesystem");
2947        let device = fs.take_device().await;
2948        device.reopen(false);
2949
2950        // On open, the filesystem will call initial_reap which will call queue_tombstone().
2951        let fs = FxFilesystem::open(device).await.expect("open failed");
2952        let root_store = fs.root_store();
2953        let handle =
2954            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
2955                .await
2956                .expect("open_object failed");
2957        handle
2958            .enable_verity(fio::VerificationOptions {
2959                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2960                salt: Some(vec![]),
2961                ..Default::default()
2962            })
2963            .await
2964            .expect("set verified file metadata failed");
2965
2966        // `flush` will ensure that initial reap fully processes all the graveyard entries. This
2967        // isn't strictly necessary for the test to pass (the graveyard marker was already
2968        // processed during `enable_verity`), but it does help catch bugs, such as the attribute
2969        // graveyard entry not being removed upon processing.
2970        fs.graveyard().flush().await;
2971        assert!(
2972            FsVerityDescriptor::from_bytes(
2973                &handle
2974                    .read_attr(AttributeId::FSVERITY_MERKLE)
2975                    .await
2976                    .expect("read_attr failed")
2977                    .expect("No attr found"),
2978                handle.block_size() as usize
2979            )
2980            .is_ok()
2981        );
2982        fsck(fs.clone()).await.expect("fsck failed");
2983        fs.close().await.expect("Close failed");
2984    }
2985
2986    #[fuchsia::test]
2987    async fn test_verify_data_corrupt_file() {
2988        let fs: OpenFxFilesystem = test_filesystem().await;
2989        let mut transaction = fs
2990            .clone()
2991            .new_transaction(lock_keys![], Options::default())
2992            .await
2993            .expect("new_transaction failed");
2994        let store = fs.root_store();
2995        let object = Arc::new(
2996            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2997                .await
2998                .expect("create_object failed"),
2999        );
3000
3001        transaction.commit().await.unwrap();
3002
3003        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3004        buf.as_mut_slice().fill(123);
3005        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3006
3007        object
3008            .enable_verity(fio::VerificationOptions {
3009                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3010                salt: Some(vec![]),
3011                ..Default::default()
3012            })
3013            .await
3014            .expect("set verified file metadata failed");
3015
3016        // Change file contents and ensure verification fails
3017        buf.as_mut_slice().fill(234);
3018        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3019        object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
3020
3021        fs.close().await.expect("Close failed");
3022    }
3023
3024    // TODO(https://fxbug.dev/450398331): More tests to be added when this can support writing the
3025    // f2fs format natively. For now, relying on tests inside of the f2fs_reader to exercise more
3026    // paths.
3027    #[fuchsia::test]
3028    async fn test_parse_f2fs_verity() {
3029        let fs: OpenFxFilesystem = test_filesystem().await;
3030        let mut transaction = fs
3031            .clone()
3032            .new_transaction(lock_keys![], Options::default())
3033            .await
3034            .expect("new_transaction failed");
3035        let store = fs.root_store();
3036        let object = Arc::new(
3037            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3038                .await
3039                .expect("create_object failed"),
3040        );
3041
3042        transaction.commit().await.unwrap();
3043        let file_size = fs.block_size() * 2;
3044        // Write over one block to make there be leaf hashes.
3045        {
3046            let mut buf = object.allocate_buffer(file_size as usize).await;
3047            buf.as_mut_slice().fill(64);
3048            assert_eq!(
3049                object.write_or_append(None, buf.as_ref()).await.expect("Writing to file."),
3050                file_size
3051            );
3052        }
3053
3054        // Enable verity normally, then shift the type.
3055        object
3056            .enable_verity(fio::VerificationOptions {
3057                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3058                salt: Some(vec![]),
3059                ..Default::default()
3060            })
3061            .await
3062            .expect("set verified file metadata failed");
3063        let (verity_info, root_hash) = object.get_descriptor().unwrap();
3064
3065        let mut transaction = fs
3066            .clone()
3067            .new_transaction(
3068                lock_keys![LockKey::Object {
3069                    store_object_id: store.store_object_id(),
3070                    object_id: object.object_id()
3071                }],
3072                Options::default(),
3073            )
3074            .await
3075            .expect("new_transaction failed");
3076        transaction.add(
3077            store.store_object_id(),
3078            Mutation::replace_or_insert_object(
3079                ObjectKey::attribute(
3080                    object.object_id(),
3081                    AttributeId::DATA,
3082                    AttributeKey::Attribute,
3083                ),
3084                ObjectValue::verified_attribute(
3085                    file_size,
3086                    FsverityMetadata::F2fs(0..(fs.block_size() * 2)),
3087                ),
3088            ),
3089        );
3090        transaction.add(
3091            store.store_object_id(),
3092            Mutation::replace_or_insert_object(
3093                ObjectKey::attribute(
3094                    object.object_id(),
3095                    AttributeId::FSVERITY_MERKLE,
3096                    AttributeKey::Attribute,
3097                ),
3098                ObjectValue::attribute(fs.block_size() * 2, false),
3099            ),
3100        );
3101        {
3102            let descriptor = FsVerityDescriptorRaw::new(
3103                fio::HashAlgorithm::Sha256,
3104                fs.block_size(),
3105                file_size,
3106                root_hash.as_slice(),
3107                match &verity_info.salt {
3108                    Some(salt) => salt.as_slice(),
3109                    None => [0u8; 0].as_slice(),
3110                },
3111            )
3112            .expect("Creating descriptor");
3113            let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3114            descriptor.write_to_slice(buf.as_mut_slice()).expect("Writing descriptor to buf");
3115            object
3116                .multi_write(
3117                    &mut transaction,
3118                    AttributeId::FSVERITY_MERKLE,
3119                    &[fs.block_size()..(fs.block_size() * 2)],
3120                    buf.as_mut(),
3121                )
3122                .await
3123                .expect("Writing descriptor");
3124        }
3125        transaction.commit().await.unwrap();
3126
3127        let handle =
3128            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3129                .await
3130                .expect("open_object failed");
3131
3132        assert!(handle.is_verified_file());
3133
3134        let mut buf = object.allocate_buffer(file_size as usize).await;
3135        assert_eq!(
3136            handle.read(0, buf.as_mut()).await.expect("Read whole file."),
3137            file_size as usize
3138        );
3139
3140        fs.close().await.expect("Close failed");
3141    }
3142
3143    #[fuchsia::test]
3144    async fn test_verify_data_corrupt_tree() {
3145        let fs: OpenFxFilesystem = test_filesystem().await;
3146        let object_id = {
3147            let store = fs.root_store();
3148            let mut transaction = fs
3149                .clone()
3150                .new_transaction(lock_keys![], Options::default())
3151                .await
3152                .expect("new_transaction failed");
3153            let object = Arc::new(
3154                ObjectStore::create_object(
3155                    &store,
3156                    &mut transaction,
3157                    HandleOptions::default(),
3158                    None,
3159                )
3160                .await
3161                .expect("create_object failed"),
3162            );
3163            let object_id = object.object_id();
3164
3165            transaction.commit().await.unwrap();
3166
3167            let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3168            buf.as_mut_slice().fill(123);
3169            object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3170
3171            object
3172                .enable_verity(fio::VerificationOptions {
3173                    hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3174                    salt: Some(vec![]),
3175                    ..Default::default()
3176                })
3177                .await
3178                .expect("set verified file metadata failed");
3179            object.read(0, buf.as_mut()).await.expect("verified read");
3180
3181            // Corrupt the merkle tree before closing.
3182            let mut merkle = object
3183                .read_attr(AttributeId::FSVERITY_MERKLE)
3184                .await
3185                .unwrap()
3186                .expect("Reading merkle tree");
3187            merkle[0] = merkle[0].wrapping_add(1);
3188            object
3189                .write_attr(AttributeId::FSVERITY_MERKLE, &*merkle)
3190                .await
3191                .expect("Overwriting merkle");
3192
3193            object_id
3194        }; // Close object.
3195
3196        // Reopening the object should complain about the corrupted merkle tree.
3197        assert!(
3198            ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default(), None)
3199                .await
3200                .is_err()
3201        );
3202        fs.close().await.expect("Close failed");
3203    }
3204
3205    #[fuchsia::test]
3206    async fn test_extend() {
3207        let fs = test_filesystem().await;
3208        let handle;
3209        let mut transaction = fs
3210            .clone()
3211            .new_transaction(lock_keys![], Options::default())
3212            .await
3213            .expect("new_transaction failed");
3214        let store = fs.root_store();
3215        handle =
3216            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3217                .await
3218                .expect("create_object failed");
3219
3220        // As of writing, an empty filesystem has two 512kiB superblock extents and a little over
3221        // 256kiB of additional allocations (journal, etc) so we start use a 'magic' starting point
3222        // of 2MiB here.
3223        const START_OFFSET: u64 = 2048 * 1024;
3224        handle
3225            .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
3226            .await
3227            .expect("extend failed");
3228        transaction.commit().await.expect("commit failed");
3229        let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
3230        buf.as_mut_slice().fill(123);
3231        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3232        buf.as_mut_slice().fill(67);
3233        handle.read(0, buf.as_mut()).await.expect("read failed");
3234        assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
3235        fs.close().await.expect("Close failed");
3236    }
3237
3238    #[fuchsia::test]
3239    async fn test_truncate_deallocates_old_extents() {
3240        let (fs, object) = test_filesystem_and_object().await;
3241        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3242        buf.as_mut_slice().fill(0xaa);
3243        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3244
3245        let allocator = fs.allocator();
3246        let allocated_before = allocator.get_allocated_bytes();
3247        object.truncate(fs.block_size() as u64).await.expect("truncate failed");
3248        let allocated_after = allocator.get_allocated_bytes();
3249        assert!(
3250            allocated_after < allocated_before,
3251            "before = {} after = {}",
3252            allocated_before,
3253            allocated_after
3254        );
3255        fs.close().await.expect("Close failed");
3256    }
3257
3258    #[fuchsia::test]
3259    async fn test_truncate_zeroes_tail_block() {
3260        let (fs, object) = test_filesystem_and_object().await;
3261
3262        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
3263        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
3264            .await
3265            .expect("truncate failed");
3266
3267        let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3268        let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
3269        object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
3270
3271        let mut expected = TEST_DATA.to_vec();
3272        expected[3..].fill(0);
3273        assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
3274    }
3275
3276    #[fuchsia::test]
3277    async fn test_trim() {
3278        // Format a new filesystem.
3279        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
3280        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
3281        let block_size = fs.block_size();
3282        root_volume(fs.clone())
3283            .await
3284            .expect("root_volume failed")
3285            .new_volume("test", NewChildStoreOptions::default())
3286            .await
3287            .expect("volume failed");
3288        fs.close().await.expect("close failed");
3289        let device = fs.take_device().await;
3290        device.reopen(false);
3291
3292        // To test trim, we open the filesystem and set up a post commit hook that runs after every
3293        // transaction.  When the hook triggers, we can fsck the volume, take a snapshot of the
3294        // device and check that it gets replayed correctly on the snapshot.  We can check that the
3295        // graveyard trims the file as expected.
3296        #[derive(Default)]
3297        struct Context {
3298            store: Option<Arc<ObjectStore>>,
3299            object_id: Option<u64>,
3300        }
3301        let shared_context = Arc::new(Mutex::new(Context::default()));
3302
3303        let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;
3304
3305        // Wait for an object to get tombstoned by the graveyard.
3306        async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
3307            loop {
3308                if let Err(e) =
3309                    ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
3310                {
3311                    assert!(
3312                        FxfsError::NotFound.matches(&e),
3313                        "open_object didn't fail with NotFound: {:?}",
3314                        e
3315                    );
3316                    break;
3317                }
3318                // The graveyard should eventually tombstone the object.
3319                fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3320            }
3321        }
3322
3323        // Checks to see if the object needs to be trimmed.
3324        async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
3325            let root_directory = Directory::open(store, store.root_directory_object_id())
3326                .await
3327                .expect("open failed");
3328            let oid = root_directory.lookup("foo").await.expect("lookup failed");
3329            if let Some((oid, _, _)) = oid {
3330                let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
3331                    .await
3332                    .expect("open_object failed");
3333                let props = object.get_properties().await.expect("get_properties failed");
3334                if props.allocated_size > 0 && props.data_attribute_size == 0 {
3335                    Some(object)
3336                } else {
3337                    None
3338                }
3339            } else {
3340                None
3341            }
3342        }
3343
3344        let shared_context_clone = shared_context.clone();
3345        let post_commit = move || {
3346            let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
3347            let shared_context = shared_context_clone.clone();
3348            async move {
3349                // First run fsck on the current filesystem.
3350                let options = FsckOptions {
3351                    fail_on_warning: true,
3352                    no_lock: true,
3353                    on_error: Box::new(|err| println!("fsck error: {:?}", err)),
3354                    ..Default::default()
3355                };
3356                let fs = store.filesystem();
3357
3358                fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
3359                fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
3360                    .await
3361                    .expect("fsck_volume_with_options failed");
3362
3363                // Now check that we can replay this correctly.
3364                fs.sync(SyncOptions { flush_device: true, ..Default::default() })
3365                    .await
3366                    .expect("sync failed");
3367                let device = fs.device().snapshot().expect("snapshot failed");
3368
3369                let object_id = shared_context.lock().object_id.clone();
3370
3371                let fs2 = FxFilesystemBuilder::new()
3372                    .skip_initial_reap(object_id.is_none())
3373                    .open(device)
3374                    .await
3375                    .expect("open failed");
3376
3377                // If the "foo" file exists check that allocated size matches content size.
3378                let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
3379                let store =
3380                    root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
3381
3382                if let Some(oid) = object_id {
3383                    // For the second pass, the object should get tombstoned.
3384                    expect_tombstoned(&store, oid).await;
3385                } else if let Some(object) = needs_trim(&store).await {
3386                    // Extend the file and make sure that it is correctly trimmed.
3387                    object.truncate(object_size).await.expect("truncate failed");
3388                    let mut buf = object.allocate_buffer(block_size as usize).await;
3389                    object
3390                        .read(object_size - block_size * 2, buf.as_mut())
3391                        .await
3392                        .expect("read failed");
3393                    assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);
3394
3395                    // Remount, this time with the graveyard performing an initial reap and the
3396                    // object should get trimmed.
3397                    let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
3398                        .await
3399                        .expect("open failed");
3400                    let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
3401                    let store = root_vol
3402                        .volume("test", StoreOptions::default())
3403                        .await
3404                        .expect("volume failed");
3405                    while needs_trim(&store).await.is_some() {
3406                        // The object has been truncated, but still has some data allocated to
3407                        // it.  The graveyard should trim the object eventually.
3408                        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3409                    }
3410
3411                    // Run fsck.
3412                    fsck_with_options(fs.clone(), &options)
3413                        .await
3414                        .expect("fsck_with_options failed");
3415                    fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
3416                        .await
3417                        .expect("fsck_volume_with_options failed");
3418                    fs.close().await.expect("close failed");
3419                }
3420
3421                // Run fsck on fs2.
3422                fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
3423                fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
3424                    .await
3425                    .expect("fsck_volume_with_options failed");
3426                fs2.close().await.expect("close failed");
3427            }
3428            .boxed()
3429        };
3430
3431        let fs = FxFilesystemBuilder::new()
3432            .post_commit_hook(post_commit)
3433            .open(device)
3434            .await
3435            .expect("open failed");
3436
3437        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
3438        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
3439
3440        shared_context.lock().store = Some(store.clone());
3441
3442        let root_directory =
3443            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3444
3445        let object;
3446        let mut transaction = fs
3447            .clone()
3448            .new_transaction(
3449                lock_keys![LockKey::object(
3450                    store.store_object_id(),
3451                    store.root_directory_object_id()
3452                )],
3453                Options::default(),
3454            )
3455            .await
3456            .expect("new_transaction failed");
3457        object = root_directory
3458            .create_child_file(&mut transaction, "foo")
3459            .await
3460            .expect("create_object failed");
3461        transaction.commit().await.expect("commit failed");
3462
3463        let mut transaction = fs
3464            .clone()
3465            .new_transaction(
3466                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3467                Options::default(),
3468            )
3469            .await
3470            .expect("new_transaction failed");
3471
3472        // Two passes: first with a regular object, and then with that object moved into the
3473        // graveyard.
3474        let mut pass = 0;
3475        loop {
3476            // Create enough extents in it such that when we truncate the object it will require
3477            // more than one transaction.
3478            let mut buf = object.allocate_buffer(5).await;
3479            buf.as_mut_slice().fill(1);
3480            // Write every other block.
3481            for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
3482                object
3483                    .txn_write(&mut transaction, offset, buf.as_ref())
3484                    .await
3485                    .expect("write failed");
3486            }
3487            transaction.commit().await.expect("commit failed");
3488            // This should take up more than one transaction.
3489            WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");
3490
3491            if pass == 1 {
3492                break;
3493            }
3494
3495            // Store the object ID so that we can make sure the object is always tombstoned
3496            // after remount (see above).
3497            shared_context.lock().object_id = Some(object.object_id());
3498
3499            transaction = fs
3500                .clone()
3501                .new_transaction(
3502                    lock_keys![
3503                        LockKey::object(store.store_object_id(), store.root_directory_object_id()),
3504                        LockKey::object(store.store_object_id(), object.object_id()),
3505                    ],
3506                    Options::default(),
3507                )
3508                .await
3509                .expect("new_transaction failed");
3510
3511            // Move the object into the graveyard.
3512            replace_child(&mut transaction, None, (&root_directory, "foo"))
3513                .await
3514                .expect("replace_child failed");
3515            store.add_to_graveyard(&mut transaction, object.object_id());
3516
3517            pass += 1;
3518        }
3519
3520        fs.close().await.expect("Close failed");
3521    }
3522
3523    #[fuchsia::test]
3524    async fn test_adjust_refs() {
3525        let (fs, object) = test_filesystem_and_object().await;
3526        let store = object.owner();
3527        let mut transaction = fs
3528            .clone()
3529            .new_transaction(
3530                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3531                Options::default(),
3532            )
3533            .await
3534            .expect("new_transaction failed");
3535        assert_eq!(
3536            store
3537                .adjust_refs(&mut transaction, object.object_id(), 1)
3538                .await
3539                .expect("adjust_refs failed"),
3540            false
3541        );
3542        transaction.commit().await.expect("commit failed");
3543
3544        let allocator = fs.allocator();
3545        let allocated_before = allocator.get_allocated_bytes();
3546        let mut transaction = fs
3547            .clone()
3548            .new_transaction(
3549                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3550                Options::default(),
3551            )
3552            .await
3553            .expect("new_transaction failed");
3554        assert_eq!(
3555            store
3556                .adjust_refs(&mut transaction, object.object_id(), -2)
3557                .await
3558                .expect("adjust_refs failed"),
3559            true
3560        );
3561        transaction.commit().await.expect("commit failed");
3562
3563        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
3564
3565        store
3566            .tombstone_object(
3567                object.object_id(),
3568                Options { borrow_metadata_space: true, ..Default::default() },
3569            )
3570            .await
3571            .expect("purge failed");
3572
3573        assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);
3574
3575        // We need to remove the directory entry, too, otherwise fsck will complain
3576        {
3577            let mut transaction = fs
3578                .clone()
3579                .new_transaction(
3580                    lock_keys![LockKey::object(
3581                        store.store_object_id(),
3582                        store.root_directory_object_id()
3583                    )],
3584                    Options::default(),
3585                )
3586                .await
3587                .expect("new_transaction failed");
3588            let root_directory = Directory::open(&store, store.root_directory_object_id())
3589                .await
3590                .expect("open failed");
3591            transaction.add(
3592                store.store_object_id(),
3593                Mutation::replace_or_insert_object(
3594                    ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, DirType::Normal),
3595                    ObjectValue::None,
3596                ),
3597            );
3598            transaction.commit().await.expect("commit failed");
3599        }
3600
3601        fsck_with_options(
3602            fs.clone(),
3603            &FsckOptions {
3604                fail_on_warning: true,
3605                on_error: Box::new(|err| println!("fsck error: {:?}", err)),
3606                ..Default::default()
3607            },
3608        )
3609        .await
3610        .expect("fsck_with_options failed");
3611
3612        fs.close().await.expect("Close failed");
3613    }
3614
3615    #[fuchsia::test]
3616    async fn test_locks() {
3617        let (fs, object) = test_filesystem_and_object().await;
3618        let (send1, recv1) = channel();
3619        let (send2, recv2) = channel();
3620        let (send3, recv3) = channel();
3621        let done = Mutex::new(false);
3622        let mut futures = FuturesUnordered::new();
3623        futures.push(
3624            async {
3625                let mut t = object.new_transaction().await.expect("new_transaction failed");
3626                send1.send(()).unwrap(); // Tell the next future to continue.
3627                send3.send(()).unwrap(); // Tell the last future to continue.
3628                recv2.await.unwrap();
3629                let mut buf = object.allocate_buffer(5).await;
3630                buf.as_mut_slice().copy_from_slice(b"hello");
3631                object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
3632                // This is a halting problem so all we can do is sleep.
3633                fasync::Timer::new(Duration::from_millis(100)).await;
3634                assert!(!*done.lock());
3635                t.commit().await.expect("commit failed");
3636            }
3637            .boxed(),
3638        );
3639        futures.push(
3640            async {
3641                recv1.await.unwrap();
3642                // Reads should not block.
3643                let offset = TEST_DATA_OFFSET as usize;
3644                let align = offset % fs.block_size() as usize;
3645                let len = TEST_DATA.len();
3646                let mut buf = object.allocate_buffer(align + len).await;
3647                assert_eq!(
3648                    object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
3649                    align + TEST_DATA.len()
3650                );
3651                assert_eq!(&buf.as_slice()[align..], TEST_DATA);
3652                // Tell the first future to continue.
3653                send2.send(()).unwrap();
3654            }
3655            .boxed(),
3656        );
3657        futures.push(
3658            async {
3659                // This should block until the first future has completed.
3660                recv3.await.unwrap();
3661                let _t = object.new_transaction().await.expect("new_transaction failed");
3662                let mut buf = object.allocate_buffer(5).await;
3663                assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
3664                assert_eq!(buf.as_slice(), b"hello");
3665            }
3666            .boxed(),
3667        );
3668        while let Some(()) = futures.next().await {}
3669        fs.close().await.expect("Close failed");
3670    }
3671
3672    #[fuchsia::test(threads = 10)]
3673    async fn test_racy_reads() {
3674        let fs = test_filesystem().await;
3675        let object;
3676        let mut transaction = fs
3677            .clone()
3678            .new_transaction(lock_keys![], Options::default())
3679            .await
3680            .expect("new_transaction failed");
3681        let store = fs.root_store();
3682        object = Arc::new(
3683            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3684                .await
3685                .expect("create_object failed"),
3686        );
3687        transaction.commit().await.expect("commit failed");
3688        for _ in 0..100 {
3689            let cloned_object = object.clone();
3690            let writer = fasync::Task::spawn(async move {
3691                let mut buf = cloned_object.allocate_buffer(10).await;
3692                buf.as_mut_slice().fill(123);
3693                cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3694            });
3695            let cloned_object = object.clone();
3696            let reader = fasync::Task::spawn(async move {
3697                let wait_time = rand::random_range(0..5);
3698                fasync::Timer::new(Duration::from_millis(wait_time)).await;
3699                let mut buf = cloned_object.allocate_buffer(10).await;
3700                buf.as_mut_slice().fill(23);
3701                let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3702                // If we succeed in reading data, it must include the write; i.e. if we see the size
3703                // change, we should see the data too.  For this to succeed it requires locking on
3704                // the read size to ensure that when we read the size, we get the extents changed in
3705                // that same transaction.
3706                if amount != 0 {
3707                    assert_eq!(amount, 10);
3708                    assert_eq!(buf.as_slice(), &[123; 10]);
3709                }
3710            });
3711            writer.await;
3712            reader.await;
3713            object.truncate(0).await.expect("truncate failed");
3714        }
3715        fs.close().await.expect("Close failed");
3716    }
3717
3718    #[fuchsia::test]
3719    async fn test_allocated_size() {
3720        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3721
3722        let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3723        let mut buf = object.allocate_buffer(5).await;
3724        buf.as_mut_slice().copy_from_slice(b"hello");
3725        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3726        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3727        assert_eq!(after, before + fs.block_size() as u64);
3728
3729        // Do the same write again and there should be no change.
3730        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3731        assert_eq!(
3732            object.get_properties().await.expect("get_properties failed").allocated_size,
3733            after
3734        );
3735
3736        // extend...
3737        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3738        let offset = 1000 * fs.block_size() as u64;
3739        let before = after;
3740        object
3741            .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3742            .await
3743            .expect("extend failed");
3744        transaction.commit().await.expect("commit failed");
3745        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3746        assert_eq!(after, before + fs.block_size() as u64);
3747
3748        // truncate...
3749        let before = after;
3750        let size = object.get_size();
3751        object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3752        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3753        assert_eq!(after, before - fs.block_size() as u64);
3754
3755        // preallocate_range...
3756        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3757        let before = after;
3758        let mut file_range = offset..offset + fs.block_size() as u64;
3759        object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3760        transaction.commit().await.expect("commit failed");
3761        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3762        assert_eq!(after, before + fs.block_size() as u64);
3763        fs.close().await.expect("Close failed");
3764    }
3765
3766    #[fuchsia::test(threads = 10)]
3767    async fn test_zero() {
3768        let (fs, object) = test_filesystem_and_object().await;
3769        let expected_size = object.get_size();
3770        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3771        object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3772        transaction.commit().await.expect("commit failed");
3773        assert_eq!(object.get_size(), expected_size);
3774        let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3775        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3776        assert_eq!(
3777            &buf.as_slice()[0..expected_size as usize],
3778            vec![0u8; expected_size as usize].as_slice()
3779        );
3780        fs.close().await.expect("Close failed");
3781    }
3782
3783    #[fuchsia::test]
3784    async fn test_properties() {
3785        let (fs, object) = test_filesystem_and_object().await;
3786        const CRTIME: Timestamp = Timestamp::from_nanos(1234);
3787        const MTIME: Timestamp = Timestamp::from_nanos(5678);
3788        const CTIME: Timestamp = Timestamp::from_nanos(8765);
3789
3790        // ObjectProperties can be updated through `update_attributes`.
3791        // `get_properties` should reflect the latest changes.
3792        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3793        object
3794            .update_attributes(
3795                &mut transaction,
3796                Some(&fio::MutableNodeAttributes {
3797                    creation_time: Some(CRTIME.as_nanos()),
3798                    modification_time: Some(MTIME.as_nanos()),
3799                    mode: Some(111),
3800                    gid: Some(222),
3801                    ..Default::default()
3802                }),
3803                None,
3804            )
3805            .await
3806            .expect("update_attributes failed");
3807        const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
3808        object
3809            .update_attributes(
3810                &mut transaction,
3811                Some(&fio::MutableNodeAttributes {
3812                    modification_time: Some(MTIME_NEW.as_nanos()),
3813                    gid: Some(333),
3814                    rdev: Some(444),
3815                    ..Default::default()
3816                }),
3817                Some(CTIME),
3818            )
3819            .await
3820            .expect("update_timestamps failed");
3821        transaction.commit().await.expect("commit failed");
3822
3823        let properties = object.get_properties().await.expect("get_properties failed");
3824        assert_matches!(
3825            properties,
3826            ObjectProperties {
3827                refs: 1u64,
3828                allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
3829                data_attribute_size: TEST_OBJECT_SIZE,
3830                creation_time: CRTIME,
3831                modification_time: MTIME_NEW,
3832                posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
3833                change_time: CTIME,
3834                ..
3835            }
3836        );
3837        fs.close().await.expect("Close failed");
3838    }
3839
3840    #[fuchsia::test]
3841    async fn test_is_allocated() {
3842        let (fs, object) = test_filesystem_and_object().await;
3843
3844        // `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
3845        // `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
3846        let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
3847        let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
3848
3849        // Check for the case where where we have the following extent layout
3850        //       [ unallocated ][ `TEST_DATA` ]
3851        // The extents before `aligned_offset` should not be allocated
3852        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3853        assert_eq!(count, aligned_offset);
3854        assert_eq!(allocated, false);
3855
3856        let (allocated, count) =
3857            object.is_allocated(aligned_offset).await.expect("is_allocated failed");
3858        assert_eq!(count, aligned_length);
3859        assert_eq!(allocated, true);
3860
3861        // Check for the case where where we query out of range
3862        let end = aligned_offset + aligned_length;
3863        object
3864            .is_allocated(end)
3865            .await
3866            .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
3867
3868        // Check for the case where where we start querying for allocation starting from
3869        // an allocated range to the end of the device
3870        let size = 50 * fs.block_size() as u64;
3871        object.truncate(size).await.expect("extend failed");
3872
3873        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3874        assert_eq!(count, size - end);
3875        assert_eq!(allocated, false);
3876
3877        // Check for the case where where we have the following extent layout
3878        //      [ unallocated ][ `buf` ][ `buf` ]
3879        let buf_length = 5 * fs.block_size();
3880        let mut buf = object.allocate_buffer(buf_length as usize).await;
3881        buf.as_mut_slice().fill(123);
3882        let new_offset = end + 20 * fs.block_size() as u64;
3883        object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
3884        object
3885            .write_or_append(Some(new_offset + buf_length), buf.as_ref())
3886            .await
3887            .expect("write failed");
3888
3889        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3890        assert_eq!(count, new_offset - end);
3891        assert_eq!(allocated, false);
3892
3893        let (allocated, count) =
3894            object.is_allocated(new_offset).await.expect("is_allocated failed");
3895        assert_eq!(count, 2 * buf_length);
3896        assert_eq!(allocated, true);
3897
3898        // Check the case where we query from the middle of an extent
3899        let (allocated, count) = object
3900            .is_allocated(new_offset + 4 * fs.block_size())
3901            .await
3902            .expect("is_allocated failed");
3903        assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
3904        assert_eq!(allocated, true);
3905
3906        // Now, write buffer to a location already written to.
3907        // Check for the case when we the following extent layout
3908        //      [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
3909        let other_buf_length = 3 * fs.block_size();
3910        let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
3911        other_buf.as_mut_slice().fill(231);
3912        object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
3913
3914        // We still expect that `is_allocated(..)` will return that  there are 2*`buf_length bytes`
3915        // allocated from `new_offset`
3916        let (allocated, count) =
3917            object.is_allocated(new_offset).await.expect("is_allocated failed");
3918        assert_eq!(count, 2 * buf_length);
3919        assert_eq!(allocated, true);
3920
3921        // Check for the case when we the following extent layout
3922        //   [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
3923        // Mark TEST_DATA as deleted
3924        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3925        object
3926            .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
3927            .await
3928            .expect("zero failed");
3929        // Mark `other_buf` as deleted
3930        object
3931            .zero(&mut transaction, new_offset..new_offset + buf_length)
3932            .await
3933            .expect("zero failed");
3934        transaction.commit().await.expect("commit transaction failed");
3935
3936        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3937        assert_eq!(count, new_offset + buf_length);
3938        assert_eq!(allocated, false);
3939
3940        let (allocated, count) =
3941            object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
3942        assert_eq!(count, buf_length);
3943        assert_eq!(allocated, true);
3944
3945        let new_end = new_offset + buf_length + count;
3946
3947        // Check for the case where there are objects with different keys.
3948        // Case that we're checking for:
3949        //      [ unallocated ][ extent (object with different key) ][ unallocated ]
3950        let store = object.owner();
3951        let mut transaction = fs
3952            .clone()
3953            .new_transaction(lock_keys![], Options::default())
3954            .await
3955            .expect("new_transaction failed");
3956        let object2 =
3957            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3958                .await
3959                .expect("create_object failed");
3960        transaction.commit().await.expect("commit failed");
3961
3962        object2
3963            .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
3964            .await
3965            .expect("write failed");
3966
3967        // Expecting that the extent with a different key is treated like unallocated extent
3968        let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
3969        assert_eq!(count, size - new_end);
3970        assert_eq!(allocated, false);
3971
3972        fs.close().await.expect("close failed");
3973    }
3974
3975    #[fuchsia::test(threads = 10)]
3976    async fn test_read_write_attr() {
3977        let (_fs, object) = test_filesystem_and_object().await;
3978        let data = [0xffu8; 16_384];
3979        object.write_attr(AttributeId(20), &data).await.expect("write_attr failed");
3980        let rdata = object
3981            .read_attr(AttributeId(20))
3982            .await
3983            .expect("read_attr failed")
3984            .expect("no attribute data found");
3985        assert_eq!(&data[..], &rdata[..]);
3986
3987        assert_eq!(object.read_attr(AttributeId(21)).await.expect("read_attr failed"), None);
3988    }
3989
3990    #[fuchsia::test(threads = 10)]
3991    async fn test_allocate_basic() {
3992        let (fs, object) = test_filesystem_and_empty_object().await;
3993        let block_size = fs.block_size();
3994        let file_size = block_size * 10;
3995        object.truncate(file_size).await.unwrap();
3996
3997        let small_buf_size = 1024;
3998        let large_buf_aligned_size = block_size as usize * 2;
3999        let large_buf_size = block_size as usize * 2 + 1024;
4000
4001        let mut small_buf = object.allocate_buffer(small_buf_size).await;
4002        let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
4003        let mut large_buf = object.allocate_buffer(large_buf_size).await;
4004
4005        assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
4006        assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
4007        assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
4008        assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
4009        assert_eq!(
4010            object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
4011            large_buf_aligned_size
4012        );
4013        assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
4014
4015        // Allocation succeeds, and without any writes to the location it shows up as zero.
4016        object.allocate(block_size..block_size * 3).await.unwrap();
4017
4018        // Test starting before, inside, and after the allocated section with every sized buffer.
4019        for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
4020            for offset in 0..4 {
4021                assert_eq!(
4022                    object.read(block_size * offset, buf.as_mut()).await.unwrap(),
4023                    buf.len(),
4024                    "buf_index: {}, read offset: {}",
4025                    buf_index,
4026                    offset,
4027                );
4028                assert_eq!(
4029                    buf.as_slice(),
4030                    &vec![0; buf.len()],
4031                    "buf_index: {}, read offset: {}",
4032                    buf_index,
4033                    offset,
4034                );
4035            }
4036        }
4037
4038        fs.close().await.expect("close failed");
4039    }
4040
4041    #[fuchsia::test(threads = 10)]
4042    async fn test_allocate_extends_file() {
4043        const BUF_SIZE: usize = 1024;
4044        let (fs, object) = test_filesystem_and_empty_object().await;
4045        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4046        let block_size = fs.block_size();
4047
4048        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4049        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4050
4051        assert!(TEST_OBJECT_SIZE < block_size * 4);
4052        // Allocation succeeds, and without any writes to the location it shows up as zero.
4053        object.allocate(0..block_size * 4).await.unwrap();
4054        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4055        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4056        assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
4057        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4058        assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
4059        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4060
4061        fs.close().await.expect("close failed");
4062    }
4063
4064    #[fuchsia::test(threads = 10)]
4065    async fn test_allocate_past_end() {
4066        const BUF_SIZE: usize = 1024;
4067        let (fs, object) = test_filesystem_and_empty_object().await;
4068        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4069        let block_size = fs.block_size();
4070
4071        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4072        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4073
4074        assert!(TEST_OBJECT_SIZE < block_size * 4);
4075        // Allocation succeeds, and without any writes to the location it shows up as zero.
4076        object.allocate(block_size * 4..block_size * 6).await.unwrap();
4077        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4078        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4079        assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
4080        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4081        assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
4082        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4083
4084        fs.close().await.expect("close failed");
4085    }
4086
4087    #[fuchsia::test(threads = 10)]
4088    async fn test_allocate_read_attr() {
4089        let (fs, object) = test_filesystem_and_empty_object().await;
4090        let block_size = fs.block_size();
4091        let file_size = block_size * 4;
4092        object.truncate(file_size).await.unwrap();
4093
4094        let content = object
4095            .read_attr(object.attribute_id())
4096            .await
4097            .expect("failed to read attr")
4098            .expect("attr returned none");
4099        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4100
4101        object.allocate(block_size..block_size * 3).await.unwrap();
4102
4103        let content = object
4104            .read_attr(object.attribute_id())
4105            .await
4106            .expect("failed to read attr")
4107            .expect("attr returned none");
4108        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4109
4110        fs.close().await.expect("close failed");
4111    }
4112
4113    #[fuchsia::test(threads = 10)]
4114    async fn test_allocate_existing_data() {
4115        struct Case {
4116            written_ranges: Vec<Range<usize>>,
4117            allocate_range: Range<u64>,
4118        }
4119        let cases = [
4120            Case { written_ranges: vec![4..7], allocate_range: 4..7 },
4121            Case { written_ranges: vec![4..7], allocate_range: 3..8 },
4122            Case { written_ranges: vec![4..7], allocate_range: 5..6 },
4123            Case { written_ranges: vec![4..7], allocate_range: 5..8 },
4124            Case { written_ranges: vec![4..7], allocate_range: 3..5 },
4125            Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
4126            Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
4127        ];
4128
4129        for case in cases {
4130            let (fs, object) = test_filesystem_and_empty_object().await;
4131            let block_size = fs.block_size();
4132            let file_size = block_size * 10;
4133            object.truncate(file_size).await.unwrap();
4134
4135            for write in &case.written_ranges {
4136                let write_len = (write.end - write.start) * block_size as usize;
4137                let mut write_buf = object.allocate_buffer(write_len).await;
4138                write_buf.as_mut_slice().fill(0xff);
4139                assert_eq!(
4140                    object
4141                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
4142                        .await
4143                        .unwrap(),
4144                    file_size
4145                );
4146            }
4147
4148            let mut expected_buf = object.allocate_buffer(file_size as usize).await;
4149            assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());
4150
4151            object
4152                .allocate(
4153                    case.allocate_range.start * block_size..case.allocate_range.end * block_size,
4154                )
4155                .await
4156                .unwrap();
4157
4158            let mut read_buf = object.allocate_buffer(file_size as usize).await;
4159            assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
4160            assert_eq!(read_buf.as_slice(), expected_buf.as_slice());
4161
4162            fs.close().await.expect("close failed");
4163        }
4164    }
4165
4166    async fn get_modes(
4167        obj: &DataObjectHandle<ObjectStore>,
4168        mut search_range: Range<u64>,
4169    ) -> Vec<(Range<u64>, ExtentMode)> {
4170        let mut modes = Vec::new();
4171        let store = obj.store();
4172        let tree = store.tree();
4173        let layer_set = tree.layer_set();
4174        let mut merger = layer_set.merger();
4175        let mut iter = merger
4176            .query(Query::FullRange(&ObjectKey::attribute(
4177                obj.object_id(),
4178                AttributeId::DATA,
4179                AttributeKey::Extent(Extent::search_key_from_offset(search_range.start)),
4180            )))
4181            .await
4182            .unwrap();
4183        loop {
4184            match iter.get() {
4185                Some(ItemRef {
4186                    key:
4187                        ObjectKey {
4188                            object_id,
4189                            data:
4190                                ObjectKeyData::Attribute(
4191                                    AttributeId::DATA,
4192                                    AttributeKey::Extent(extent),
4193                                ),
4194                        },
4195                    value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
4196                    ..
4197                }) if *object_id == obj.object_id() => {
4198                    if search_range.end <= extent.start {
4199                        break;
4200                    }
4201                    let found_range = std::cmp::max(search_range.start, extent.start)
4202                        ..std::cmp::min(search_range.end, extent.end);
4203                    search_range.start = found_range.end;
4204                    modes.push((found_range, mode.clone()));
4205                    if search_range.start == search_range.end {
4206                        break;
4207                    }
4208                    iter.advance().await.unwrap();
4209                }
4210                x => panic!("looking for extent record, found this {:?}", x),
4211            }
4212        }
4213        modes
4214    }
4215
4216    async fn assert_all_overwrite(
4217        obj: &DataObjectHandle<ObjectStore>,
4218        mut search_range: Range<u64>,
4219    ) {
4220        let modes = get_modes(obj, search_range.clone()).await;
4221        for mode in modes {
4222            assert_eq!(
4223                mode.0.start, search_range.start,
4224                "missing mode in range {}..{}",
4225                search_range.start, mode.0.start
4226            );
4227            match mode.1 {
4228                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
4229                m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
4230            }
4231            assert!(
4232                mode.0.end <= search_range.end,
4233                "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
4234                search_range,
4235                mode,
4236            );
4237            search_range.start = mode.0.end;
4238        }
4239        assert_eq!(
4240            search_range.start, search_range.end,
4241            "missing mode in range {:?}",
4242            search_range
4243        );
4244    }
4245
4246    #[fuchsia::test(threads = 10)]
4247    async fn test_multi_overwrite() {
4248        #[derive(Debug)]
4249        struct Case {
4250            pre_writes: Vec<Range<usize>>,
4251            allocate_ranges: Vec<Range<u64>>,
4252            overwrites: Vec<Vec<Range<u64>>>,
4253        }
4254        let cases = [
4255            Case {
4256                pre_writes: Vec::new(),
4257                allocate_ranges: vec![1..3],
4258                overwrites: vec![vec![1..3]],
4259            },
4260            Case {
4261                pre_writes: Vec::new(),
4262                allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
4263                overwrites: vec![vec![0..4]],
4264            },
4265            Case {
4266                pre_writes: Vec::new(),
4267                allocate_ranges: vec![0..4],
4268                overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
4269            },
4270            Case {
4271                pre_writes: Vec::new(),
4272                allocate_ranges: vec![0..4],
4273                overwrites: vec![vec![3..4]],
4274            },
4275            Case {
4276                pre_writes: Vec::new(),
4277                allocate_ranges: vec![0..4],
4278                overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
4279            },
4280            Case {
4281                pre_writes: Vec::new(),
4282                allocate_ranges: vec![1..2, 5..6, 7..8],
4283                overwrites: vec![vec![5..6]],
4284            },
4285            Case {
4286                pre_writes: Vec::new(),
4287                allocate_ranges: vec![1..3],
4288                overwrites: vec![
4289                    vec![1..3],
4290                    vec![1..3],
4291                    vec![1..3],
4292                    vec![1..3],
4293                    vec![1..3],
4294                    vec![1..3],
4295                    vec![1..3],
4296                    vec![1..3],
4297                ],
4298            },
4299            Case {
4300                pre_writes: Vec::new(),
4301                allocate_ranges: vec![0..5],
4302                overwrites: vec![
4303                    vec![1..3],
4304                    vec![1..3],
4305                    vec![1..3],
4306                    vec![1..3],
4307                    vec![1..3],
4308                    vec![1..3],
4309                    vec![1..3],
4310                    vec![1..3],
4311                ],
4312            },
4313            Case {
4314                pre_writes: Vec::new(),
4315                allocate_ranges: vec![0..5],
4316                overwrites: vec![vec![0..2, 2..4, 4..5]],
4317            },
4318            Case {
4319                pre_writes: Vec::new(),
4320                allocate_ranges: vec![0..5, 5..10],
4321                overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
4322            },
4323            Case {
4324                pre_writes: Vec::new(),
4325                allocate_ranges: vec![0..4, 6..10],
4326                overwrites: vec![vec![2..3, 7..9]],
4327            },
4328            Case {
4329                pre_writes: Vec::new(),
4330                allocate_ranges: vec![0..10],
4331                overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
4332            },
4333            Case {
4334                pre_writes: Vec::new(),
4335                allocate_ranges: vec![0..10],
4336                overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
4337            },
4338            Case {
4339                pre_writes: vec![1..3],
4340                allocate_ranges: vec![1..3],
4341                overwrites: vec![vec![1..3]],
4342            },
4343            Case {
4344                pre_writes: vec![1..3],
4345                allocate_ranges: vec![4..6],
4346                overwrites: vec![vec![5..6]],
4347            },
4348            Case {
4349                pre_writes: vec![1..3],
4350                allocate_ranges: vec![0..4],
4351                overwrites: vec![vec![0..4]],
4352            },
4353            Case {
4354                pre_writes: vec![1..3],
4355                allocate_ranges: vec![2..4],
4356                overwrites: vec![vec![2..4]],
4357            },
4358            Case {
4359                pre_writes: vec![3..5],
4360                allocate_ranges: vec![1..3, 6..7],
4361                overwrites: vec![vec![1..3, 6..7]],
4362            },
4363            Case {
4364                pre_writes: vec![1..3, 5..7, 8..9],
4365                allocate_ranges: vec![0..5],
4366                overwrites: vec![vec![0..2, 2..5], vec![0..5]],
4367            },
4368            Case {
4369                pre_writes: Vec::new(),
4370                allocate_ranges: vec![0..10, 4..6],
4371                overwrites: Vec::new(),
4372            },
4373            Case {
4374                pre_writes: Vec::new(),
4375                allocate_ranges: vec![3..8, 5..10],
4376                overwrites: Vec::new(),
4377            },
4378            Case {
4379                pre_writes: Vec::new(),
4380                allocate_ranges: vec![5..10, 3..8],
4381                overwrites: Vec::new(),
4382            },
4383        ];
4384
4385        for (i, case) in cases.into_iter().enumerate() {
4386            log::info!("running case {} - {:?}", i, case);
4387            let (fs, object) = test_filesystem_and_empty_object().await;
4388            let block_size = fs.block_size();
4389            let file_size = block_size * 10;
4390            object.truncate(file_size).await.unwrap();
4391
4392            for write in case.pre_writes {
4393                let write_len = (write.end - write.start) * block_size as usize;
4394                let mut write_buf = object.allocate_buffer(write_len).await;
4395                write_buf.as_mut_slice().fill(0xff);
4396                assert_eq!(
4397                    object
4398                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
4399                        .await
4400                        .unwrap(),
4401                    file_size
4402                );
4403            }
4404
4405            for allocate_range in &case.allocate_ranges {
4406                object
4407                    .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
4408                    .await
4409                    .unwrap();
4410            }
4411
4412            for allocate_range in case.allocate_ranges {
4413                assert_all_overwrite(
4414                    &object,
4415                    allocate_range.start * block_size..allocate_range.end * block_size,
4416                )
4417                .await;
4418            }
4419
4420            for overwrite in case.overwrites {
4421                let mut write_len = 0;
4422                let overwrite = overwrite
4423                    .into_iter()
4424                    .map(|r| {
4425                        write_len += (r.end - r.start) * block_size;
4426                        r.start * block_size..r.end * block_size
4427                    })
4428                    .collect::<Vec<_>>();
4429                let mut write_buf = object.allocate_buffer(write_len as usize).await;
4430                let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
4431                write_buf.as_mut_slice().copy_from_slice(&data);
4432
4433                let mut expected_buf = object.allocate_buffer(file_size as usize).await;
4434                assert_eq!(
4435                    object.read(0, expected_buf.as_mut()).await.unwrap(),
4436                    expected_buf.len()
4437                );
4438                let expected_buf_slice = expected_buf.as_mut_slice();
4439                let mut data_slice = data.as_slice();
4440                for r in &overwrite {
4441                    let len = r.length().unwrap() as usize;
4442                    let (copy_from, rest) = data_slice.split_at(len);
4443                    expected_buf_slice[r.start as usize..r.end as usize]
4444                        .copy_from_slice(&copy_from);
4445                    data_slice = rest;
4446                }
4447
4448                let mut transaction = object.new_transaction().await.unwrap();
4449                object
4450                    .multi_overwrite(
4451                        &mut transaction,
4452                        AttributeId::DATA,
4453                        &overwrite,
4454                        write_buf.as_mut(),
4455                    )
4456                    .await
4457                    .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
4458                // Double check the emitted checksums. We should have one u64 checksum for every
4459                // block we wrote to disk.
4460                let mut checksummed_range_length = 0;
4461                let mut num_checksums = 0;
4462                for (device_range, checksums, _) in transaction.checksums() {
4463                    let range_len = device_range.end - device_range.start;
4464                    let checksums_len = checksums.len() as u64;
4465                    assert_eq!(range_len / checksums_len, block_size);
4466                    checksummed_range_length += range_len;
4467                    num_checksums += checksums_len;
4468                }
4469                assert_eq!(checksummed_range_length, write_len);
4470                assert_eq!(num_checksums, write_len / block_size);
4471                transaction.commit().await.unwrap();
4472
4473                let mut buf = object.allocate_buffer(file_size as usize).await;
4474                assert_eq!(
4475                    object.read(0, buf.as_mut()).await.unwrap(),
4476                    buf.len(),
4477                    "failed length check on case {}",
4478                    i,
4479                );
4480                assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
4481            }
4482
4483            fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
4484            fs.close().await.expect("close failed");
4485        }
4486    }
4487
4488    #[fuchsia::test(threads = 10)]
4489    async fn test_multi_overwrite_mode_updates() {
4490        let (fs, object) = test_filesystem_and_empty_object().await;
4491        let block_size = fs.block_size();
4492        let file_size = block_size * 10;
4493        object.truncate(file_size).await.unwrap();
4494
4495        let mut expected_bitmap = BitVec::from_elem(10, false);
4496
4497        object.allocate(0..10 * block_size).await.unwrap();
4498        assert_eq!(
4499            get_modes(&object, 0..10 * block_size).await,
4500            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4501        );
4502
4503        let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
4504        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4505        write_buf.as_mut_slice().copy_from_slice(&data);
4506        let mut transaction = object.new_transaction().await.unwrap();
4507        object
4508            .multi_overwrite(
4509                &mut transaction,
4510                AttributeId::DATA,
4511                &[2 * block_size..4 * block_size],
4512                write_buf.as_mut(),
4513            )
4514            .await
4515            .unwrap();
4516        transaction.commit().await.unwrap();
4517
4518        expected_bitmap.set(2, true);
4519        expected_bitmap.set(3, true);
4520        assert_eq!(
4521            get_modes(&object, 0..10 * block_size).await,
4522            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4523        );
4524
4525        let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
4526        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4527        write_buf.as_mut_slice().copy_from_slice(&data);
4528        let mut transaction = object.new_transaction().await.unwrap();
4529        object
4530            .multi_overwrite(
4531                &mut transaction,
4532                AttributeId::DATA,
4533                &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
4534                write_buf.as_mut(),
4535            )
4536            .await
4537            .unwrap();
4538        transaction.commit().await.unwrap();
4539
4540        expected_bitmap.set(4, true);
4541        expected_bitmap.set(6, true);
4542        assert_eq!(
4543            get_modes(&object, 0..10 * block_size).await,
4544            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4545        );
4546
4547        let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
4548        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4549        write_buf.as_mut_slice().copy_from_slice(&data);
4550        let mut transaction = object.new_transaction().await.unwrap();
4551        object
4552            .multi_overwrite(
4553                &mut transaction,
4554                AttributeId::DATA,
4555                &[
4556                    0..2 * block_size,
4557                    5 * block_size..6 * block_size,
4558                    7 * block_size..10 * block_size,
4559                ],
4560                write_buf.as_mut(),
4561            )
4562            .await
4563            .unwrap();
4564        transaction.commit().await.unwrap();
4565
4566        assert_eq!(
4567            get_modes(&object, 0..10 * block_size).await,
4568            vec![(0..10 * block_size, ExtentMode::Overwrite)]
4569        );
4570
4571        fs.close().await.expect("close failed");
4572    }
4573
4574    #[fuchsia::test(threads = 10)]
4575    async fn test_check_unwritten_zero() {
4576        let device = DeviceHolder::new(FakeDevice::new(256 * 1024, TEST_DEVICE_BLOCK_SIZE));
4577        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4578        let object = create_object_with_key(fs.clone(), Some(&new_insecure_crypt()), false).await;
4579        let block_size = fs.block_size();
4580
4581        // Set up a file with eight blocks to look like this:
4582        // | None | COW | COW | None | Overwrite(unwritten) | Overwrite(written) | None |
4583        let file_size = block_size * 7;
4584        object.truncate(file_size).await.unwrap();
4585        assert!(object.check_unwritten_zero(0..file_size).await.unwrap());
4586
4587        let mut buffer = object.allocate_buffer(block_size as usize).await;
4588        buffer.as_mut_slice().fill(1);
4589        object.write_or_append(Some(block_size), buffer.as_ref()).await.expect("write failed");
4590        object.write_or_append(Some(block_size * 2), buffer.as_ref()).await.expect("write failed");
4591
4592        object.allocate((block_size * 4)..(block_size * 6)).await.expect("Allocate failed");
4593        let mut transaction = fs
4594            .clone()
4595            .new_transaction(
4596                lock_keys![LockKey::object(object.store().store_object_id(), object.object_id(),)],
4597                Options::default(),
4598            )
4599            .await
4600            .expect("new_transaction failed");
4601        object
4602            .multi_overwrite(
4603                &mut transaction,
4604                AttributeId::DATA,
4605                &vec![(block_size * 5)..(block_size * 6)],
4606                buffer.as_mut(),
4607            )
4608            .await
4609            .expect("Multi overwrite");
4610        transaction.commit().await.expect("Committing overwrite");
4611
4612        // Anything touching the COW ranges should fail.
4613        assert!(!object.check_unwritten_zero(0..(block_size * 2)).await.unwrap());
4614        assert!(!object.check_unwritten_zero(block_size..(block_size * 3)).await.unwrap());
4615        assert!(!object.check_unwritten_zero((block_size * 2)..(block_size * 4)).await.unwrap());
4616
4617        // This should be fine, as the OverwritePartial should only touch the unwritten block.
4618        assert!(object.check_unwritten_zero((block_size * 3)..(block_size * 5)).await.unwrap());
4619
4620        // These should touch the written overwrite block and fail.
4621        assert!(!object.check_unwritten_zero((block_size * 4)..(block_size * 6)).await.unwrap());
4622        assert!(!object.check_unwritten_zero((block_size * 5)..(block_size * 7)).await.unwrap());
4623
4624        fs.close().await.expect("close failed");
4625    }
4626}