Skip to main content

fxfs/object_store/
data_object_handle.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::object_handle::{
10    ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, DirType, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey,
16    ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20    self, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation, Options,
21    Transaction, lock_keys,
22};
23use crate::object_store::{
24    AttributeId, Extent, HandleOptions, HandleOwner, RootDigest, StoreObjectHandle,
25    TRANSACTION_MUTATION_THRESHOLD, TrimMode, TrimResult,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{Context, Error, anyhow, bail, ensure};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{
33    FsVerityDescriptor, FsVerityDescriptorRaw, FsVerityHash, FsVerityHasher, FsVerityHasherOptions,
34    MerkleTree, MerkleTreeBuilder, Sha256Hash, Sha512Hash,
35};
36use fuchsia_sync::Mutex;
37use futures::TryStreamExt;
38use futures::stream::FuturesUnordered;
39use fxfs_trace::trace;
40use std::cmp::min;
41use std::ops::{Deref, DerefMut, Range};
42use std::sync::Arc;
43use std::sync::atomic::{self, AtomicU64, Ordering};
44use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
45use zerocopy::FromBytes;
46
47mod allocated_ranges;
48pub use allocated_ranges::{AllocatedRanges, RangeType};
49
/// Amount of data, in bytes, that each transaction covers when an attribute is written across
/// several batches (512 KiB). Mirrors `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 512 * 1024;
53
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
pub struct DataObjectHandle<S: HandleOwner> {
    /// The wrapped store object handle; `DataObjectHandle` derefs to it.
    handle: StoreObjectHandle<S>,
    /// The attribute this handle was configured for.
    attribute_id: AttributeId,
    /// Cached content size of that attribute.
    content_size: AtomicU64,
    /// In-memory fsverity state of the object; see `FsverityState`.
    fsverity_state: Mutex<FsverityState>,
    /// In-memory tracking of overwrite ranges. It is only used for writing, so it is cleared once
    /// the fsverity state is finalized.
    overwrite_ranges: AllocatedRanges,
}
68
/// Represents the mapping of a file's contents to the physical storage backing it.
///
/// Instances are created through `FileExtent::new`, which validates `device_range` and checks that
/// the end of the logical range does not overflow.
#[derive(Debug, Clone)]
pub struct FileExtent {
    /// Offset within the file at which the extent starts.
    logical_offset: u64,
    /// Range on the device that backs the extent; its length is the length of the extent.
    device_range: Range<u64>,
}
75
76impl FileExtent {
77    pub fn new(logical_offset: u64, device_range: Range<u64>) -> Result<Self, Error> {
78        // Ensure `device_range` is valid.
79        let length = device_range.length()?;
80        // Ensure no overflow when we calculate the end of the logical range.
81        let _ = logical_offset.checked_add(length).ok_or(FxfsError::OutOfRange)?;
82        Ok(Self { logical_offset, device_range })
83    }
84}
85
86impl FileExtent {
87    pub fn length(&self) -> u64 {
88        // SAFETY: We verified that the device_range's length is valid in Self::new.
89        unsafe { self.device_range.unchecked_length() }
90    }
91
92    pub fn logical_offset(&self) -> u64 {
93        self.logical_offset
94    }
95
96    pub fn logical_range(&self) -> Range<u64> {
97        // SAFETY: We verified logical_offset plus device_range length won't overflow in Self::new.
98        unsafe { self.logical_offset..self.logical_offset.unchecked_add(self.length()) }
99    }
100
101    pub fn device_range(&self) -> &Range<u64> {
102        &self.device_range
103    }
104}
105
/// In-memory fsverity state of a `DataObjectHandle`.
///
/// `enable_verity` moves the state from `None` through `Started` and `Pending` to `Some`; opening
/// an already verified file sets `Some` directly via `set_fsverity_state_some`.
#[derive(Debug)]
pub enum FsverityState {
    /// Verity is not enabled and no `enable_verity` call is in progress.
    None,
    /// `enable_verity` has started but has not yet produced the verity metadata.
    Started,
    /// The verity metadata has been computed but the state has not been finalized yet.
    Pending(FsverityStateInner),
    /// Verity is enabled; the metadata is used to verify reads.
    Some(FsverityStateInner),
}
113
/// Verity metadata held in memory for a file; see `FsverityState`.
#[derive(Debug)]
pub struct FsverityStateInner {
    /// Root digest of the merkle tree. The variant also selects the hash algorithm.
    root_digest: RootDigest,
    /// Salt passed to the hasher.
    salt: Vec<u8>,
    /// Concatenated leaf digests of the merkle tree, one per block of the file.
    // TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
    // Potentially store a pager-backed vmo instead of passing around a boxed array.
    merkle_tree: Box<[u8]>,
}
122
/// Options for overwrite-style writes. Both fields default to `false`.
#[derive(Debug, Default)]
pub struct OverwriteOptions {
    // If false, then all the extents for the overwrite range must have been preallocated using
    // preallocate_range or from existing writes.
    pub allow_allocations: bool,
    // NOTE(review): presumably requests a barrier before the first write is issued; the consumer
    // of this flag is not visible in this part of the file -- confirm before relying on it.
    pub barrier_on_first_write: bool,
}
130
131impl FsverityStateInner {
132    pub fn new(root_digest: RootDigest, salt: Vec<u8>, merkle_tree: Box<[u8]>) -> Self {
133        FsverityStateInner { root_digest, salt, merkle_tree }
134    }
135
136    fn get_hasher_for_block_size(&self, block_size: usize) -> FsVerityHasher {
137        match self.root_digest {
138            RootDigest::Sha256(_) => {
139                FsVerityHasher::Sha256(FsVerityHasherOptions::new(self.salt.clone(), block_size))
140            }
141            RootDigest::Sha512(_) => {
142                FsVerityHasher::Sha512(FsVerityHasherOptions::new(self.salt.clone(), block_size))
143            }
144        }
145    }
146
147    fn from_bytes(data: &[u8], block_size: usize) -> Result<(Self, FsVerityHasher), Error> {
148        let descriptor = FsVerityDescriptor::from_bytes(&data, block_size)
149            .map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
150
151        let root_digest = match descriptor.digest_algorithm() {
152            fio::HashAlgorithm::Sha256 => {
153                RootDigest::Sha256(descriptor.root_digest().try_into().unwrap())
154            }
155            fio::HashAlgorithm::Sha512 => RootDigest::Sha512(descriptor.root_digest().to_vec()),
156            _ => return Err(anyhow!(FxfsError::NotSupported).context("Unsupported hash algorithm")),
157        };
158        let hasher = descriptor.hasher();
159        let leaves =
160            descriptor.leaf_digests().map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
161
162        Ok((Self::new(root_digest, descriptor.salt().to_vec(), leaves.into()), hasher))
163    }
164}
165
// Dereferencing a `DataObjectHandle` yields the `StoreObjectHandle` it wraps, so the wrapped
// handle's methods can be called directly on a `DataObjectHandle`.
impl<S: HandleOwner> Deref for DataObjectHandle<S> {
    type Target = StoreObjectHandle<S>;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
172
173impl<S: HandleOwner> DataObjectHandle<S> {
174    pub fn new(
175        owner: Arc<S>,
176        object_id: u64,
177        permanent_keys: bool,
178        attribute_id: AttributeId,
179        size: u64,
180        fsverity_state: FsverityState,
181        options: HandleOptions,
182        trace: bool,
183        overwrite_ranges: &[Range<u64>],
184    ) -> Self {
185        Self {
186            handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
187            attribute_id,
188            content_size: AtomicU64::new(size),
189            fsverity_state: Mutex::new(fsverity_state),
190            overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
191        }
192    }
193
    /// Returns the id of the attribute this handle was created for.
    pub fn attribute_id(&self) -> AttributeId {
        self.attribute_id
    }
197
198    /// Consumes the `DataObjectHandle` and returns the `StoreObjectHandle` that it contained.
199    pub fn into_store_object_handle(self) -> StoreObjectHandle<S> {
200        self.handle
201    }
202
    /// Returns the in-memory overwrite range tracking for this handle.
    pub fn overwrite_ranges(&self) -> &AllocatedRanges {
        &self.overwrite_ranges
    }
206
207    pub fn is_verified_file(&self) -> bool {
208        matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
209    }
210
    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
    /// If another caller has already started but not completed `enable_verity`, returns
    /// FxfsError::Unavailable. If another caller has already completed `enable_verity`, returns
    /// FxfsError::AlreadyExists.
    pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
        let mut fsverity_guard = self.fsverity_state.lock();
        match *fsverity_guard {
            FsverityState::None => {
                *fsverity_guard = FsverityState::Started;
                Ok(())
            }
            // Both states mean an `enable_verity` call is still in flight.
            FsverityState::Started | FsverityState::Pending(_) => {
                Err(anyhow!(FxfsError::Unavailable))
            }
            FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
        }
    }
228
    /// Sets `self.fsverity_state` to Pending, storing `descriptor` as the metadata that
    /// `finalize_fsverity_state()` will later promote to `FsverityState::Some`. Must be called
    /// before `finalize_fsverity_state()`.
    ///
    /// # Panics
    ///
    /// Panics if the prior state of `self.fsverity_state` was not `FsverityState::Started`.
    pub fn set_fsverity_state_pending(&self, descriptor: FsverityStateInner) {
        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::Started));
        *fsverity_guard = FsverityState::Pending(descriptor);
    }
236
237    /// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
238    /// not `FsverityState::Pending(_)`.
239    pub fn finalize_fsverity_state(&self) {
240        let mut fsverity_state_guard = self.fsverity_state.lock();
241        let mut_fsverity_state = fsverity_state_guard.deref_mut();
242        let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
243        match fsverity_state {
244            FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
245            FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
246            FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
247            FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
248        }
249        // Once we finalize the fsverity state, the file is permanently read-only. The in-memory
250        // overwrite ranges tracking is only used for writing, so we don't need them anymore. This
251        // leaves any uninitialized, but allocated, overwrite regions if there are any, rather than
252        // converting them back to sparse regions.
253        self.overwrite_ranges.clear();
254    }
255
    /// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
    /// Used to set `self.fsverity_state` on open of a verified file. The merkle tree data is
    /// verified against the root digest here, and will return an error if the tree is not correct.
    ///
    /// # Errors
    ///
    /// Among others, returns `FxfsError::Inconsistent` if the stored verity data is missing, has
    /// an unexpected length, or is not a whole number of hashes, and `FxfsError::IntegrityError`
    /// if the root computed from the stored leaf digests does not match the recorded root digest.
    ///
    /// # Panics
    ///
    /// Panics if `self.fsverity_state` is not `FsverityState::None` when the new state is applied.
    pub async fn set_fsverity_state_some(&self, descriptor: FsverityMetadata) -> Result<(), Error> {
        let (metadata, hasher) = match descriptor {
            // The root digest and salt come from the metadata itself; the contents of the
            // FSVERITY_MERKLE attribute are used as the leaf digests as-is.
            FsverityMetadata::Internal(root_digest, salt) => {
                let merkle_tree = self
                    .read_attr(AttributeId::FSVERITY_MERKLE)
                    .await?
                    .ok_or_else(|| anyhow!(FxfsError::Inconsistent))?;
                let metadata = FsverityStateInner { root_digest, salt, merkle_tree };
                let hasher = metadata.get_hasher_for_block_size(self.block_size() as usize);
                (metadata, hasher)
            }
            // `verity_range` is the part of the FSVERITY_MERKLE attribute that is read and then
            // parsed by `FsverityStateInner::from_bytes`.
            FsverityMetadata::F2fs(verity_range) => {
                let expected_length = verity_range.length()? as usize;
                // The buffer is sized up to a whole number of blocks; only the first
                // `expected_length` bytes are parsed below.
                let mut buffer = self
                    .allocate_buffer(expected_length.next_multiple_of(self.block_size() as usize))
                    .await;
                ensure!(
                    expected_length
                        == self
                            .handle
                            .read(AttributeId::FSVERITY_MERKLE, verity_range.start, buffer.as_mut())
                            .await?,
                    FxfsError::Inconsistent
                );
                FsverityStateInner::from_bytes(
                    buffer.as_slice()[0..expected_length].into(),
                    self.block_size() as usize,
                )?
            }
        };
        // Validate the merkle tree data against the root before applying it.
        ensure!(metadata.merkle_tree.len() % hasher.hash_size() == 0, FxfsError::Inconsistent);
        let leaf_chunks = metadata.merkle_tree.chunks_exact(hasher.hash_size());

        let root_hash = match &metadata.root_digest {
            RootDigest::Sha256(root_hash) => root_hash.as_slice(),
            RootDigest::Sha512(root_hash) => root_hash.as_slice(),
        };

        // Rebuild the tree from the stored leaf digests. The unwraps on `read_from_bytes` act on
        // chunks produced by `chunks_exact(hasher.hash_size())`.
        let tree = match hasher {
            FsVerityHasher::Sha256(_) => {
                let mut builder = MerkleTreeBuilder::<Sha256Hash>::new(hasher);
                for leaf in leaf_chunks {
                    let hash = Sha256Hash::read_from_bytes(leaf).unwrap();
                    builder.push_data_hash(hash);
                }
                builder.finish()
            }
            FsVerityHasher::Sha512(_) => {
                let mut builder = MerkleTreeBuilder::<Sha512Hash>::new(hasher);
                for leaf in leaf_chunks {
                    let hash = Sha512Hash::read_from_bytes(leaf).unwrap();
                    builder.push_data_hash(hash);
                }
                builder.finish()
            }
        };

        ensure!(root_hash == tree.root(), FxfsError::IntegrityError);

        // The state is only replaced after every check above has passed.
        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::None));
        *fsverity_guard = FsverityState::Some(metadata);

        Ok(())
    }
325
326    /// Verifies contents of `buffer` against the corresponding hashes in the stored merkle tree.
327    /// `offset` is the logical offset in the file that `buffer` starts at. `offset` must be
328    /// block-aligned. Fails on non fsverity-enabled files.
329    fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
330        let block_size = self.block_size() as usize;
331        assert!(offset % block_size == 0);
332        let fsverity_state = self.fsverity_state.lock();
333        match &*fsverity_state {
334            FsverityState::None => {
335                Err(anyhow!("Tried to verify read on a non verity-enabled file"))
336            }
337            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
338                "Enable verity has not yet completed, fsverity state: {:?}",
339                *fsverity_state
340            )),
341            FsverityState::Some(metadata) => {
342                let hasher = metadata.get_hasher_for_block_size(block_size);
343                let leaf_nodes: Vec<&[u8]> =
344                    metadata.merkle_tree.chunks(hasher.hash_size()).collect();
345                fxfs_trace::duration!("fsverity-verify", "len" => buffer.len());
346                // TODO(b/318880297): Consider parallelizing computation.
347                for b in buffer.chunks(block_size) {
348                    ensure!(
349                        hasher.hash_block(b) == leaf_nodes[offset / block_size],
350                        anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
351                    );
352                    offset += block_size;
353                }
354                Ok(())
355            }
356        }
357    }
358
359    /// Extend the file with the given extent.  The only use case for this right now is for files
360    /// that must exist at certain offsets on the device, such as super-blocks.
361    pub async fn extend<'a>(
362        &'a self,
363        transaction: &mut Transaction<'a>,
364        device_range: Range<u64>,
365    ) -> Result<(), Error> {
366        let old_end =
367            round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
368        let new_size = old_end + device_range.end - device_range.start;
369        self.store().allocator().mark_allocated(
370            transaction,
371            self.store().store_object_id(),
372            device_range.clone(),
373        )?;
374        self.txn_update_size(transaction, new_size, None).await?;
375        let key_id = self.get_key(None).await?.0;
376        transaction.add(
377            self.store().store_object_id,
378            Mutation::merge_object(
379                ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
380                ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
381            ),
382        );
383        self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
384    }
385
    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    // the data from `buf`. `offset` is the position of `buf` within this handle's attribute.
    //
    // This forwards to `StoreObjectHandle::align_buffer` for this handle's attribute.
    // NOTE(review): the returned range is presumably the aligned range of the attribute that the
    // returned buffer covers -- confirm against `StoreObjectHandle::align_buffer`.
    async fn align_buffer(
        &self,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        self.handle.align_buffer(self.attribute_id(), offset, buf).await
    }
395
    // Writes potentially unaligned data at `device_offset` and returns checksums if requested. The
    // data will be encrypted if necessary.
    // `buf` is mutable as an optimization, since the write may require encryption, we can encrypt
    // the buffer in-place rather than copying to another buffer if the write is already aligned.
    //
    // This forwards to `StoreObjectHandle::write_at` for this handle's attribute, passing `None`
    // for the argument between `buf` and `device_offset`.
    async fn write_at(
        &self,
        offset: u64,
        buf: MutableBufferRef<'_>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
    }
408
    /// Verifies that the entire range in the file is zeroes, as either uninitialized overwrite
    /// range, or no extent at all. If a single allocated and written extent is found, this returns
    /// false.
    ///
    /// Returns `Ok(true)` when no written data overlaps `range`. Errors from querying or
    /// advancing the tree iterator are propagated.
    pub async fn check_unwritten_zero(&self, range: Range<u64>) -> Result<bool, Error> {
        let tree = &self.store().tree();
        let layer_set = tree.layer_set();
        let key = Extent(range);
        let lower_bound = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id,
            AttributeKey::Extent(key.search_key()),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
        // Walk the extent records of this object's attribute starting at `lower_bound`; the loop
        // ends at the first item that is not such a record.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(value),
            ..
        }) = iter.get()
            && *object_id == self.object_id()
            && *attr_id == self.attribute_id
        {
            // Extent values other than `Some` are skipped.
            if let ExtentValue::Some { mode, .. } = value {
                if let Some(overlap) = key.overlap(extent_key) {
                    if let ExtentMode::OverwritePartial(bits) = mode {
                        // `bits` is walked one entry per block of the extent. Only the blocks
                        // inside the overlap are inspected, and a set entry means that block has
                        // been written.
                        let starting_index = (overlap.start - extent_key.start) / self.block_size();
                        for initialized in bits
                            .iter()
                            .skip(starting_index as usize)
                            .take((overlap.length().unwrap() / self.block_size()) as usize)
                        {
                            if initialized {
                                return Ok(false);
                            }
                        }
                    } else {
                        // Any other extent mode overlapping the range counts as written data.
                        return Ok(false);
                    }
                } else {
                    // NOTE(review): stops at the first extent that does not overlap the range;
                    // this presumably relies on the iteration order of extents -- confirm.
                    break;
                }
            }
            iter.advance().await?;
        }
        Ok(true)
    }
459
    /// Zeroes the given range of this handle's attribute as part of `transaction`.  The range must
    /// be aligned.  Forwards to `StoreObjectHandle::zero`; no value is returned on success.
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        range: Range<u64>,
    ) -> Result<(), Error> {
        self.handle.zero(transaction, self.attribute_id(), range).await
    }
468
469    /// The cached value for `self.fsverity_state` is set either in `open_object` or on
470    /// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
471    /// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
472    pub fn get_descriptor(&self) -> Option<(fio::VerificationOptions, Vec<u8>)> {
473        let fsverity_state = self.fsverity_state.lock();
474        match &*fsverity_state {
475            FsverityState::Some(metadata) => {
476                let (options, root_hash) = match &metadata.root_digest {
477                    RootDigest::Sha256(root_hash) => (
478                        fio::VerificationOptions {
479                            hash_algorithm: Some(fio::HashAlgorithm::Sha256),
480                            salt: Some(metadata.salt.clone()),
481                            ..Default::default()
482                        },
483                        root_hash.to_vec(),
484                    ),
485                    RootDigest::Sha512(root_hash) => (
486                        fio::VerificationOptions {
487                            hash_algorithm: Some(fio::HashAlgorithm::Sha512),
488                            salt: Some(metadata.salt.clone()),
489                            ..Default::default()
490                        },
491                        root_hash.clone(),
492                    ),
493                };
494                Some((options, root_hash))
495            }
496            _ => None,
497        }
498    }
499
    /// Builds the merkle tree for this file and its serialized form, dispatching to
    /// `build_verity_tree_impl` with the hash type that matches the variant of `hasher`.
    /// `hash_alg` and `salt` are forwarded unchanged.
    async fn build_verity_tree(
        &self,
        hasher: FsVerityHasher,
        hash_alg: fio::HashAlgorithm,
        salt: &[u8],
    ) -> Result<(MerkleTree, Vec<u8>), Error> {
        match hasher {
            FsVerityHasher::Sha256(_) => {
                self.build_verity_tree_impl::<Sha256Hash>(hasher, hash_alg, salt).await
            }
            FsVerityHasher::Sha512(_) => {
                self.build_verity_tree_impl::<Sha512Hash>(hasher, hash_alg, salt).await
            }
        }
    }
515
516    async fn build_verity_tree_impl<D: FsVerityHash>(
517        &self,
518        hasher: FsVerityHasher,
519        hash_alg: fio::HashAlgorithm,
520        salt: &[u8],
521    ) -> Result<(MerkleTree, Vec<u8>), Error> {
522        let hash_len = hasher.hash_size();
523        let mut builder = MerkleTreeBuilder::<D>::new(hasher);
524        let mut offset = 0;
525        let size = self.get_size();
526        // TODO(b/314836822): Consider further tuning the buffer size to optimize
527        // performance. Experimentally, most verity-enabled files are <256K.
528        let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
529        while offset < size {
530            // TODO(b/314842875): Consider optimizations for sparse files.
531            let read = self.read(offset, buf.as_mut()).await? as u64;
532            assert!(offset + read <= size);
533            builder.write(&buf.as_slice()[0..read as usize]);
534            offset += read;
535        }
536        let tree = builder.finish();
537        // This will include a block for the root layer, which will be used to house the descriptor.
538        let tree_data_len = tree
539            .levels()
540            .iter()
541            .map(|layer| layer.len().next_multiple_of(self.block_size() as usize))
542            .sum();
543        let mut merkle_tree_data = Vec::<u8>::with_capacity(tree_data_len);
544        // Iterating from the top layers down to the leaves.
545        for layer in tree.levels().iter().rev() {
546            // Skip the root layer.
547            if layer.len() <= hash_len {
548                continue;
549            }
550            merkle_tree_data.extend_from_slice(layer);
551            // Pad to the end of the block.
552            let padded_size = merkle_tree_data.len().next_multiple_of(self.block_size() as usize);
553            merkle_tree_data.resize(padded_size, 0);
554        }
555
556        // Zero the last block, then write the descriptor to the start of it.
557        let descriptor_offset = merkle_tree_data.len();
558        merkle_tree_data.resize(descriptor_offset + self.block_size() as usize, 0);
559        let descriptor = FsVerityDescriptorRaw::new(
560            hash_alg,
561            self.block_size(),
562            self.get_size(),
563            tree.root(),
564            salt,
565        )?;
566        descriptor.write_to_slice(&mut merkle_tree_data[descriptor_offset..])?;
567
568        Ok((tree, merkle_tree_data))
569    }
570
571    /// Reads the data attribute and computes a merkle tree from the data. The values of the
572    /// parameters required to build the merkle tree are supplied by `descriptor` (i.e. salt,
573    /// hash_algorithm, etc.) Writes the leaf nodes of the merkle tree to an attribute with id
574    /// `AttributeId::FSVERITY_MERKLE`. Updates the root_hash of the `descriptor` according to the
575    /// computed merkle tree and then replaces the ObjectValue of the data attribute with
576    /// ObjectValue::VerifiedAttribute, which stores the `descriptor` inline.
577    #[trace]
578    pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
579        self.set_fsverity_state_started()?;
580        // If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
581        // the graveyard should process the tombstone before we start rewriting the attribute.
582        if self
583            .store()
584            .tree()
585            .find(&ObjectKey::graveyard_attribute_entry(
586                self.store().graveyard_directory_object_id(),
587                self.object_id(),
588                AttributeId::FSVERITY_MERKLE,
589            ))
590            .await?
591            .is_some()
592        {
593            self.store().filesystem().graveyard().flush().await;
594        }
595        let mut transaction = self.new_transaction().await?;
596        let hash_alg =
597            options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
598        let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
599        let (root_digest, merkle_tree) = match hash_alg {
600            fio::HashAlgorithm::Sha256 => {
601                let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
602                    salt.clone(),
603                    self.block_size() as usize,
604                ));
605                let (tree, merkle_tree_data) =
606                    self.build_verity_tree(hasher, hash_alg, &salt).await?;
607                let root: [u8; 32] = tree.root().try_into().unwrap();
608                (RootDigest::Sha256(root), merkle_tree_data)
609            }
610            fio::HashAlgorithm::Sha512 => {
611                let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
612                    salt.clone(),
613                    self.block_size() as usize,
614                ));
615                let (tree, merkle_tree_data) =
616                    self.build_verity_tree(hasher, hash_alg, &salt).await?;
617                (RootDigest::Sha512(tree.root().to_vec()), merkle_tree_data)
618            }
619            _ => {
620                bail!(
621                    anyhow!(FxfsError::NotSupported)
622                        .context(format!("hash algorithm not supported"))
623                );
624            }
625        };
626        // TODO(b/314194485): Eventually want streaming writes.
627        // The merkle tree attribute should not require trimming because it should not
628        // exist.
629        self.handle
630            .write_new_attr_in_batches(
631                &mut transaction,
632                AttributeId::FSVERITY_MERKLE,
633                &merkle_tree,
634                WRITE_ATTR_BATCH_SIZE,
635            )
636            .await?;
637        if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
638            self.store().remove_attribute_from_graveyard(
639                &mut transaction,
640                self.object_id(),
641                AttributeId::FSVERITY_MERKLE,
642            );
643        };
644        let descriptor_decoded =
645            FsVerityDescriptor::from_bytes(&merkle_tree, self.block_size() as usize)?;
646        let descriptor = FsverityStateInner {
647            root_digest,
648            salt,
649            merkle_tree: descriptor_decoded.leaf_digests()?.to_vec().into(),
650        };
651        self.set_fsverity_state_pending(descriptor);
652        transaction.add_with_object(
653            self.store().store_object_id(),
654            Mutation::replace_or_insert_object(
655                ObjectKey::attribute(self.object_id(), AttributeId::DATA, AttributeKey::Attribute),
656                ObjectValue::verified_attribute(
657                    self.get_size(),
658                    FsverityMetadata::F2fs(0..merkle_tree.len() as u64),
659                ),
660            ),
661            AssocObj::Borrowed(self),
662        );
663        transaction.commit().await?;
664        Ok(())
665    }
666
    /// Pre-allocate disk space for the given logical file range. If any part of the allocation
    /// range is beyond the end of the file, the file size is updated.
    ///
    /// `range` does not need to be block aligned; it is rounded outwards to block boundaries for
    /// the purposes of allocation. Within the rounded range:
    ///
    ///  * regions not covered by a live extent get newly allocated extents, recorded with
    ///    `ExtentValue::blank_overwrite_extent`;
    ///  * existing extents that are not already in `Overwrite` or `OverwritePartial` mode are
    ///    re-recorded at their current device offset with
    ///    `ExtentValue::initialized_overwrite_extent`;
    ///  * extents already in `Overwrite` or `OverwritePartial` mode are left untouched.
    ///
    /// The work may be split across several transactions once a transaction reaches
    /// `MAX_TRANSACTION_SIZE` mutations. The attribute record (size and `has_overwrite_extents`)
    /// is always written in the first transaction; the comments in the body explain why.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::TooBig` if `round_up` cannot produce a block-aligned end for the range.
    /// Errors from creating or committing transactions, querying the tree, fetching the key, and
    /// allocating device space (wrapped with the context "allocation failed") are propagated.
    ///
    /// # Panics
    ///
    /// When debug assertions are enabled, panics if `range.start >= range.end`.
    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
        debug_assert!(range.start < range.end);

        // It's not required that callers of allocate use block aligned ranges, but we need to make
        // the extents block aligned. Luckily, fallocate in posix is allowed to allocate more than
        // what was asked for for block alignment purposes. We just need to make sure that the size
        // of the file is still the non-block-aligned end of the range if the size was changed.
        let mut new_range = range.clone();
        new_range.start = round_down(new_range.start, self.block_size());
        // NB: FxfsError::TooBig turns into EFBIG when passed through starnix, which is the
        // required error code when the requested range is larger than the file size.
        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;

        let mut transaction = self.new_transaction().await?;
        // Logical ranges that are not covered by a live extent and need new device space.
        let mut to_allocate = Vec::new();
        // (logical range, device offset) pairs for existing extents that need to be re-recorded
        // as overwrite extents.
        let mut to_switch = Vec::new();
        // The key id that is recorded in every extent written below.
        let key_id = self.get_key(None).await?.0;

        {
            let tree = &self.store().tree;
            let layer_set = tree.layer_set();
            let offset_key = ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(Extent::search_key_from_offset(new_range.start)),
            );
            let mut merger = layer_set.merger();
            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

            // Walk the extents in order. `new_range.start` is advanced as parts of the range are
            // classified, so `new_range` always describes the part that still needs a decision.
            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(extent_key),
                                    ),
                            },
                        value: ObjectValue::Extent(extent_value),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id() =>
                    {
                        // If the start of this extent is beyond the end of the range we are
                        // allocating, we don't have any more work to do.
                        if new_range.end <= extent_key.start {
                            break;
                        }
                        // Add any prefix we might need to allocate.
                        if new_range.start < extent_key.start {
                            to_allocate.push(new_range.start..extent_key.start);
                            new_range.start = extent_key.start;
                        }
                        let device_offset = match extent_value {
                            ExtentValue::None => {
                                // If the extent value is None, it indicates a deleted extent. In
                                // that case, we just skip it entirely. By keeping the new_range
                                // where it is, this section will get included in the new
                                // allocations.
                                iter.advance().await?;
                                continue;
                            }
                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
                                // If this extent is already in overwrite mode, we can skip it.
                                if extent_key.end < new_range.end {
                                    new_range.start = extent_key.end;
                                    iter.advance().await?;
                                    continue;
                                } else {
                                    // The extent covers the rest of the range; nothing is left.
                                    new_range.start = new_range.end;
                                    break;
                                }
                            }
                            // Any other mode: this extent has to be switched to overwrite mode.
                            ExtentValue::Some { device_offset, .. } => *device_offset,
                        };

                        // Figure out how we have to break up the ranges.
                        // The remaining range may begin part-way through this extent, so shift the
                        // device offset by the same amount.
                        let device_offset = device_offset + (new_range.start - extent_key.start);
                        if extent_key.end < new_range.end {
                            to_switch.push((new_range.start..extent_key.end, device_offset));
                            new_range.start = extent_key.end;
                        } else {
                            to_switch.push((new_range.start..new_range.end, device_offset));
                            new_range.start = new_range.end;
                            break;
                        }
                    }
                    // The records are sorted so if we find something that isn't an extent or
                    // doesn't match the object id then there are no more extent records for this
                    // object.
                    _ => break,
                }
                iter.advance().await?;
            }
        }

        // Whatever is left of the range was not covered by any extent visited above.
        if new_range.start < new_range.end {
            to_allocate.push(new_range.clone());
        }

        // We can update the size in the first transaction because even if subsequent transactions
        // don't get replayed, the data between the current and new end of the file will be zero
        // (either sparse zero or allocated zero). On the other hand, if we don't update the size
        // in the first transaction, overwrite extents may be written past the end of the file
        // which is an fsck error.
        //
        // The potential new size needs to be the non-block-aligned range end - we round up to the
        // nearest block size for the actual allocation, but shouldn't do that for the file size.
        let new_size = std::cmp::max(range.end, self.get_size());
        // Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
        // first transaction, in case we split transactions. This makes it okay to only replay the
        // first transaction if power loss occurs - the file will be in an unusual state, but not
        // an invalid one, if only part of the allocate goes through.
        transaction.add_with_object(
            self.store().store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Attribute,
                ),
                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
            ),
            AssocObj::Borrowed(self),
        );

        // The maximum number of mutations we are going to allow per transaction in allocate. This
        // is probably quite a bit lower than the actual limit, but it should be large enough to
        // handle most non-edge-case versions of allocate without splitting the transaction.
        const MAX_TRANSACTION_SIZE: usize = 256;
        // Re-record the existing extents as overwrite extents. No device space changes hands here,
        // so the allocated size does not need updating before a split.
        for (switch_range, device_offset) in to_switch {
            transaction.add_with_object(
                self.store().store_object_id(),
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
                        device_offset,
                        key_id,
                    )),
                ),
                AssocObj::Borrowed(self),
            );
            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
                transaction.commit_and_continue().await?;
            }
        }

        // Bytes allocated in the current transaction that have not yet been passed to
        // `update_allocated_size`.
        let mut allocated = 0;
        let allocator = self.store().allocator();
        for mut allocate_range in to_allocate {
            // The loop advances by the length of the device range actually returned, so an
            // allocation shorter than what was asked for is followed by another request.
            while allocate_range.start < allocate_range.end {
                let device_range = allocator
                    .allocate(
                        &mut transaction,
                        self.store().store_object_id(),
                        allocate_range.end - allocate_range.start,
                    )
                    .await
                    .context("allocation failed")?;
                let device_range_len = device_range.end - device_range.start;

                transaction.add_with_object(
                    self.store().store_object_id(),
                    Mutation::merge_object(
                        ObjectKey::extent(
                            self.object_id(),
                            self.attribute_id(),
                            allocate_range.start..allocate_range.start + device_range_len,
                        ),
                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
                            device_range.start,
                            (device_range_len / self.block_size()) as usize,
                            key_id,
                        )),
                    ),
                    AssocObj::Borrowed(self),
                );

                allocate_range.start += device_range_len;
                allocated += device_range_len;

                // Account for what this transaction allocated before splitting, then start
                // counting afresh for the next transaction.
                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
                    transaction.commit_and_continue().await?;
                    allocated = 0;
                }
            }
        }

        self.update_allocated_size(&mut transaction, allocated, 0).await?;
        transaction.commit().await?;

        Ok(())
    }
867
868    /// Return information on a contiguous set of extents that has the same allocation status,
869    /// starting from `start_offset`. The information returned is if this set of extents are marked
870    /// allocated/not allocated and also the size of this set (in bytes). This is used when
871    /// querying slices for volumes.
872    /// This function expects `start_offset` to be aligned to block size
873    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
874        let block_size = self.block_size();
875        assert_eq!(start_offset % block_size, 0);
876
877        if start_offset > self.get_size() {
878            bail!(FxfsError::OutOfRange)
879        }
880
881        if start_offset == self.get_size() {
882            return Ok((false, 0));
883        }
884
885        let tree = &self.store().tree;
886        let layer_set = tree.layer_set();
887        let offset_key = ObjectKey::attribute(
888            self.object_id(),
889            self.attribute_id(),
890            AttributeKey::Extent(Extent::search_key_from_offset(start_offset)),
891        );
892        let mut merger = layer_set.merger();
893        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;
894
895        let mut allocated = None;
896        let mut end = start_offset;
897
898        loop {
899            // Iterate through the extents, each time setting `end` as the end of the previous
900            // extent
901            match iter.get() {
902                Some(ItemRef {
903                    key:
904                        ObjectKey {
905                            object_id,
906                            data:
907                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
908                        },
909                    value: ObjectValue::Extent(extent_value),
910                    ..
911                }) => {
912                    // Equivalent of getting no extents back
913                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
914                        if allocated == Some(false) || allocated.is_none() {
915                            end = self.get_size();
916                            allocated = Some(false);
917                        }
918                        break;
919                    }
920                    ensure!(extent_key.is_aligned(block_size), FxfsError::Inconsistent);
921                    if extent_key.start > end {
922                        // If a previous extent has already been visited and we are tracking an
923                        // allocated set, we are only interested in an extent where the range of the
924                        // current extent follows immediately after the previous one.
925                        if allocated == Some(true) {
926                            break;
927                        } else {
928                            // The gap between the previous `end` and this extent is not allocated
929                            end = extent_key.start;
930                            allocated = Some(false);
931                            // Continue this iteration, except now the `end` is set to the end of
932                            // the "previous" extent which is this gap between the start_offset
933                            // and the current extent
934                        }
935                    }
936
937                    // We can assume that from here, the `end` points to the end of a previous
938                    // extent.
939                    match extent_value {
940                        // The current extent has been allocated
941                        ExtentValue::Some { .. } => {
942                            // Stop searching if previous extent was marked deleted
943                            if allocated == Some(false) {
944                                break;
945                            }
946                            allocated = Some(true);
947                        }
948                        // This extent has been marked deleted
949                        ExtentValue::None => {
950                            // Stop searching if previous extent was marked allocated
951                            if allocated == Some(true) {
952                                break;
953                            }
954                            allocated = Some(false);
955                        }
956                    }
957                    end = extent_key.end;
958                }
959                // This occurs when there are no extents left
960                None => {
961                    if allocated == Some(false) || allocated.is_none() {
962                        end = self.get_size();
963                        allocated = Some(false);
964                    }
965                    // Otherwise, we were monitoring extents that were allocated, so just exit.
966                    break;
967                }
968                // Non-extent records (Object, Child, GraveyardEntry) are ignored.
969                Some(_) => {}
970            }
971            iter.advance().await?;
972        }
973
974        Ok((allocated.unwrap(), end - start_offset))
975    }
976
977    pub async fn txn_write<'a>(
978        &'a self,
979        transaction: &mut Transaction<'a>,
980        offset: u64,
981        buf: BufferRef<'_>,
982    ) -> Result<(), Error> {
983        if buf.is_empty() {
984            return Ok(());
985        }
986        let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
987        self.multi_write(
988            transaction,
989            self.attribute_id(),
990            std::slice::from_ref(&aligned),
991            transfer_buf.as_mut(),
992        )
993        .await?;
994        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
995            self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
996        }
997        Ok(())
998    }
999
1000    // Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
1001    // if encryption takes place.  The ranges must all be aligned and no change to content size is
1002    // applied; the caller is responsible for updating size if required.
1003    pub async fn multi_write<'a>(
1004        &'a self,
1005        transaction: &mut Transaction<'a>,
1006        attribute_id: AttributeId,
1007        ranges: &[Range<u64>],
1008        buf: MutableBufferRef<'_>,
1009    ) -> Result<(), Error> {
1010        self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
1011    }
1012
    /// Writes `buf` over the extents that back the logical range starting at `offset`, optionally
    /// allocating new raw extents where no extent exists.
    ///
    /// `buf` is mutable as an optimization, since the write may require encryption, we can
    /// encrypt the buffer in-place rather than copying to another buffer if the write is
    /// already aligned.
    ///
    /// `options.allow_allocations` controls whether regions with no extent may be given a newly
    /// allocated extent; when it is false, hitting such a region is an error. If
    /// `options.barrier_on_first_write` is set, `barrier()` is called on the device before any
    /// write is queued.
    ///
    /// Note: in the event of power failure during an overwrite() call, it is possible that
    /// old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
    /// Since the old data should be encrypted, it is probably safe to expose, although not ideal.
    ///
    /// # Errors
    ///
    /// Fails if an extent covering part of the range is not in `ExtentMode::Raw`, if a record
    /// covering the range has an unexpected value, or if there is no extent for part of the range
    /// and allocations are not allowed. Returns `FxfsError::Inconsistent` for misaligned extents
    /// and `FxfsError::TooBig` if the buffer length cannot be rounded up to the block size.
    /// Errors from the tree query, the allocator, the device writes and the transaction are
    /// propagated.
    ///
    /// # Panics
    ///
    /// Panics if the length of `buf` is not a multiple of the device block size, or if an
    /// allocation is needed at an `offset` that is not aligned to the block size.
    pub async fn overwrite(
        &self,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        options: OverwriteOptions,
    ) -> Result<(), Error> {
        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
        // Remember where the write ends; `offset` and `buf` are advanced as the write is split up.
        let end = offset + buf.len() as u64;

        let key_id = self.get_key(None).await?.0;

        // The transaction only ends up being used if allow_allocations is true
        let mut transaction =
            if options.allow_allocations { Some(self.new_transaction().await?) } else { None };

        // We build up a list of writes to perform later
        let writes = FuturesUnordered::new();

        if options.barrier_on_first_write {
            self.store().device.barrier();
        }

        // We create a new scope here, so that the merger iterator will get dropped before we try to
        // commit our transaction. Otherwise the transaction commit would block.
        {
            let store = self.store();
            let store_object_id = store.store_object_id;
            let allocator = store.allocator();
            let tree = &store.tree;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Extent(Extent::search_key_from_offset(offset)),
                )))
                .await?;
            let block_size = self.block_size();

            // Each iteration works out where the next piece of `buf` goes on the device, queues
            // that write, and then moves `offset` and `buf` past the piece.
            loop {
                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
                    // An extent that ends exactly at `offset` covers none of the range that is
                    // left, so move on to the next record.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && extent.end == offset =>
                    {
                        iter.advance().await?;
                        continue;
                    }
                    // An extent of ours that starts at or before `offset`: write into it, provided
                    // it is a raw extent.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                            },
                        value,
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && extent.start <= offset =>
                    {
                        match value {
                            ObjectValue::Extent(ExtentValue::Some {
                                device_offset,
                                mode: ExtentMode::Raw,
                                ..
                            }) => {
                                ensure!(
                                    extent.is_aligned(block_size)
                                        && device_offset % block_size == 0,
                                    FxfsError::Inconsistent
                                );
                                let offset_within_extent = offset - extent.start;
                                // An extent that ends before `offset` is reported as an
                                // inconsistency rather than being allowed to underflow.
                                let remaining_length_of_extent = (extent
                                    .end
                                    .checked_sub(offset)
                                    .ok_or(FxfsError::Inconsistent)?)
                                    as usize;
                                // Yields (device_offset, bytes_to_write, should_advance)
                                (
                                    device_offset + offset_within_extent,
                                    min(buf.len(), remaining_length_of_extent),
                                    true,
                                )
                            }
                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
                                // TODO(https://fxbug.dev/42066056): Maybe we should create
                                // a new extent without checksums?
                                bail!(
                                    "extent from ({},{}) which overlaps offset \
                                        {} has the wrong extent mode",
                                    extent.start,
                                    extent.end,
                                    offset
                                )
                            }
                            _ => {
                                bail!(
                                    "overwrite failed: extent overlapping offset {} has \
                                      unexpected ObjectValue",
                                    offset
                                )
                            }
                        }
                    }
                    // Anything else (no record, a record that isn't ours, or an extent that starts
                    // after `offset`) means there is nothing to write into at `offset`.
                    maybe_item_ref => {
                        if let Some(transaction) = transaction.as_mut() {
                            assert_eq!(options.allow_allocations, true);
                            assert_eq!(offset % self.block_size(), 0);

                            // We are going to make a new extent, but let's check if there is an
                            // extent after us. If there is an extent after us, then we don't want
                            // our new extent to bump into it...
                            let mut bytes_to_allocate =
                                round_up(buf.len() as u64, self.block_size())
                                    .ok_or(FxfsError::TooBig)?;
                            if let Some(ItemRef {
                                key:
                                    ObjectKey {
                                        object_id,
                                        data:
                                            ObjectKeyData::Attribute(
                                                attribute_id,
                                                AttributeKey::Extent(extent),
                                            ),
                                    },
                                ..
                            }) = maybe_item_ref
                            {
                                if *object_id == self.object_id()
                                    && *attribute_id == self.attribute_id()
                                    && offset < extent.start
                                {
                                    let bytes_until_next_extent = extent.start - offset;
                                    bytes_to_allocate =
                                        min(bytes_to_allocate, bytes_until_next_extent);
                                }
                            }

                            let device_range = allocator
                                .allocate(transaction, store_object_id, bytes_to_allocate)
                                .await?;
                            // Size the new extent by what the allocator actually returned.
                            let device_range_len = device_range.end - device_range.start;
                            transaction.add(
                                store_object_id,
                                Mutation::insert_object(
                                    ObjectKey::extent(
                                        self.object_id(),
                                        self.attribute_id(),
                                        offset..offset + device_range_len,
                                    ),
                                    ObjectValue::Extent(ExtentValue::new_raw(
                                        device_range.start,
                                        key_id,
                                    )),
                                ),
                            );

                            self.update_allocated_size(transaction, device_range_len, 0).await?;

                            // Yields (device_offset, bytes_to_write, should_advance)
                            // The record the iterator is on has not been consumed, so it must be
                            // looked at again on the next pass rather than skipped.
                            (device_range.start, min(buf.len(), device_range_len as usize), false)
                        } else {
                            bail!(
                                "no extent overlapping offset {}, \
                                and new allocations are not allowed",
                                offset
                            )
                        }
                    }
                };
                // Queue the write for this piece; the futures are only driven further down.
                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
                writes.push(self.write_at(offset, current_buf, device_offset));
                if remaining_buf.len() == 0 {
                    break;
                } else {
                    buf = remaining_buf;
                    offset += bytes_to_write as u64;
                    if should_advance {
                        iter.advance().await?;
                    }
                }
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // The checksums are being ignored here, but we don't need to know them
        writes.try_collect::<Vec<MaybeChecksums>>().await?;

        // The transaction is only committed if something was added to it above. The data writes
        // have completed by this point.
        if let Some(mut transaction) = transaction {
            assert_eq!(options.allow_allocations, true);
            if !transaction.is_empty() {
                if end > self.get_size() {
                    self.grow(&mut transaction, self.get_size(), end).await?;
                }
                transaction.commit().await?;
            }
        }

        Ok(())
    }
1231
1232    // Within a transaction, the size of the object might have changed, so get the size from there
1233    // if it exists, otherwise, fall back on the cached size.
1234    fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1235        transaction
1236            .get_object_mutation(
1237                self.store().store_object_id,
1238                ObjectKey::attribute(
1239                    self.object_id(),
1240                    self.attribute_id(),
1241                    AttributeKey::Attribute,
1242                ),
1243            )
1244            .and_then(|m| {
1245                if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1246                    Some(size)
1247                } else {
1248                    None
1249                }
1250            })
1251            .unwrap_or_else(|| self.get_size())
1252    }
1253
1254    pub async fn txn_update_size<'a>(
1255        &'a self,
1256        transaction: &mut Transaction<'a>,
1257        new_size: u64,
1258        // Allow callers to update the has_overwrite_extents metadata if they want. If this is
1259        // Some it is set to the value, if None it is left unchanged.
1260        update_has_overwrite_extents: Option<bool>,
1261    ) -> Result<(), Error> {
1262        let key =
1263            ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1264        let mut mutation = if let Some(mutation) =
1265            transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1266        {
1267            mutation.clone()
1268        } else {
1269            ObjectStoreMutation {
1270                item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1271                op: Operation::ReplaceOrInsert,
1272            }
1273        };
1274        if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1275            *size = new_size;
1276            if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1277                *has_overwrite_extents = update_has_overwrite_extents;
1278            }
1279        } else {
1280            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1281        }
1282        transaction.add_with_object(
1283            self.store().store_object_id(),
1284            Mutation::ObjectStore(mutation),
1285            AssocObj::Borrowed(self),
1286        );
1287        Ok(())
1288    }
1289
1290    async fn update_allocated_size(
1291        &self,
1292        transaction: &mut Transaction<'_>,
1293        allocated: u64,
1294        deallocated: u64,
1295    ) -> Result<(), Error> {
1296        self.handle.update_allocated_size(transaction, allocated, deallocated).await
1297    }
1298
1299    pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1300        if self
1301            .overwrite_ranges
1302            .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1303        {
1304            // This returns true if there were ranges, but this truncate removed them all, which
1305            // indicates that we need to flip the has_overwrite_extents metadata flag to false.
1306            Ok(Some(false))
1307        } else {
1308            Ok(None)
1309        }
1310    }
1311
1312    pub async fn shrink<'a>(
1313        &'a self,
1314        transaction: &mut Transaction<'a>,
1315        size: u64,
1316        update_has_overwrite_extents: Option<bool>,
1317    ) -> Result<NeedsTrim, Error> {
1318        let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1319        self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1320        Ok(needs_trim)
1321    }
1322
    /// Grows this handle's attribute from `old_size` to `size` within `transaction`.
    ///
    /// The steps are, in order:
    ///   1. Finish any trim that is still pending beyond `old_size`.  Each incomplete trim step
    ///      commits and continues `transaction`, so the transaction may be committed several
    ///      times before this returns.
    ///   2. If `old_size` is not block aligned and the old last block is backed by an extent,
    ///      read that block, zero everything from `old_size` to the end of the block and write
    ///      it back.
    ///   3. Record the new size in the transaction.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::Inconsistent` if the device offset of the old last block overflows or
    /// is not block aligned, and propagates any error from trimming, reading or writing.
    pub async fn grow<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        old_size: u64,
        size: u64,
    ) -> Result<(), Error> {
        // Before growing the file, we must make sure that a previous trim has completed.
        let store = self.store();
        while matches!(
            store
                .trim_some(
                    transaction,
                    self.object_id(),
                    self.attribute_id(),
                    TrimMode::FromOffset(old_size)
                )
                .await?,
            TrimResult::Incomplete
        ) {
            transaction.commit_and_continue().await?;
        }
        // We might need to zero out the tail of the old last block.
        let block_size = self.block_size();
        if old_size % block_size != 0 {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let aligned_old_size = round_down(old_size, block_size);
            // Look up the extent record covering the block that contains the old end of file.
            let iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Extent(Extent::search_key_from_offset(aligned_old_size)),
                )))
                .await?;
            // Only an allocated extent (`ExtentValue::Some`) needs zeroing; anything else falls
            // straight through to the size update.
            if let Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                    },
                value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
                ..
            }) = iter.get()
            {
                // The query can land on a record for a different object or attribute, so check
                // that the extent really belongs to this attribute.
                if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
                    // NOTE(review): the subtraction assumes the extent starts at or before
                    // `aligned_old_size`; presumably the trim above guarantees that no allocated
                    // extent starts later -- confirm.
                    let device_offset = device_offset
                        .checked_add(aligned_old_size - extent_key.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
                    let mut buf = self.allocate_buffer(block_size as usize).await;
                    // In the case that this extent is in OverwritePartial mode, there is a
                    // possibility that the last block is allocated, but not initialized yet, in
                    // which case we don't actually need to bother zeroing out the tail. However,
                    // it's not strictly incorrect to change uninitialized data, so we skip the
                    // check and blindly do it to keep it simpler here.
                    self.read_and_decrypt(device_offset, aligned_old_size, buf.as_mut(), *key_id)
                        .await?;
                    // Keep the bytes before `old_size` and zero the remainder of the block.
                    buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
                    self.multi_write(
                        transaction,
                        *attribute_id,
                        &[aligned_old_size..aligned_old_size + block_size],
                        buf.as_mut(),
                    )
                    .await?;
                }
            }
        }
        self.txn_update_size(transaction, size, None).await?;
        Ok(())
    }
1395
    /// Attempts to pre-allocate a `file_range` of bytes for this object.
    /// Returns a set of device ranges (i.e. potentially multiple extents).
    ///
    /// It may not be possible to preallocate the entire requested range in one request
    /// due to limitations on transaction size. In such cases, we will preallocate as much as
    /// we can up to some (arbitrary, internal) limit on transaction size.
    ///
    /// `file_range.start` is modified to point at the end of the logical range
    /// that was preallocated such that repeated calls to `preallocate_range` with new
    /// transactions can be used to preallocate ranges of any size.
    ///
    /// Requested range must be a multiple of block size.
    ///
    /// Ranges that are already backed by an allocated extent are not reallocated; their existing
    /// device ranges are returned alongside the newly allocated ones.
    ///
    /// # Panics
    ///
    /// Panics if `file_range` is not block aligned, if the handle is encrypted, or if rounding
    /// the current size up to a block boundary fails.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::Inconsistent` if an existing extent record is invalid, misaligned or
    /// overflows, and propagates allocation failures with the context "Allocation failed".
    pub async fn preallocate_range<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        file_range: &mut Range<u64>,
    ) -> Result<Vec<Range<u64>>, Error> {
        let block_size = self.block_size();
        assert!(file_range.is_aligned(block_size));
        assert!(!self.handle.is_encrypted());
        let mut ranges = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Position the iterator at the extent records around the start of the requested range.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(Extent::search_key_from_offset(file_range.start)),
            )))
            .await?;
        // Number of bytes newly allocated by this call (existing extents are not counted).
        let mut allocated = 0;
        let key_id = self.get_key(None).await?.0;
        'outer: while file_range.start < file_range.end {
            // Walk the existing extents to find where the next unallocated gap ends.
            let allocate_end = loop {
                match iter.get() {
                    // Case for allocated extents for the same object that overlap with file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && extent.start < file_range.end =>
                    {
                        ensure!(
                            extent.is_valid()
                                && extent.is_aligned(block_size)
                                && device_offset % block_size == 0,
                            FxfsError::Inconsistent
                        );
                        // If the start of the requested file_range overlaps with an existing extent...
                        if extent.start <= file_range.start {
                            // Record the existing extent and move on.
                            let device_range = device_offset
                                .checked_add(file_range.start - extent.start)
                                .ok_or(FxfsError::Inconsistent)?
                                ..device_offset
                                    .checked_add(min(extent.end, file_range.end) - extent.start)
                                    .ok_or(FxfsError::Inconsistent)?;
                            file_range.start += device_range.end - device_range.start;
                            ranges.push(device_range);
                            if file_range.start >= file_range.end {
                                break 'outer;
                            }
                            iter.advance().await?;
                            continue;
                        } else {
                            // There's nothing allocated between file_range.start and the beginning
                            // of this extent.
                            break extent.start;
                        }
                    }
                    // Case for deleted extents eclipsed by file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                            },
                        value: ObjectValue::Extent(ExtentValue::None),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && extent.end < file_range.end =>
                    {
                        iter.advance().await?;
                    }
                    _ => {
                        // We can just preallocate the rest.
                        break file_range.end;
                    }
                }
            };
            // Ask the allocator for the whole gap; it may hand back less than requested.
            let device_range = self
                .store()
                .allocator()
                .allocate(
                    transaction,
                    self.store().store_object_id(),
                    allocate_end - file_range.start,
                )
                .await
                .context("Allocation failed")?;
            allocated += device_range.end - device_range.start;
            // The logical range covered is exactly as long as the device range we were given.
            let this_file_range =
                file_range.start..file_range.start + device_range.end - device_range.start;
            file_range.start = this_file_range.end;
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
                ),
            );
            ranges.push(device_range);
            // If we didn't allocate all that we requested, we'll loop around and try again.
            // ... unless we have filled the transaction. The caller should check file_range.
            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
                break;
            }
        }
        // Update the file size if it changed.
        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
            self.txn_update_size(transaction, file_range.start, None).await?;
        }
        self.update_allocated_size(transaction, allocated, 0).await?;
        Ok(ranges)
    }
1531
1532    pub async fn update_attributes<'a>(
1533        &self,
1534        transaction: &mut Transaction<'a>,
1535        node_attributes: Option<&fio::MutableNodeAttributes>,
1536        change_time: Option<Timestamp>,
1537    ) -> Result<(), Error> {
1538        // This codepath is only called by files, whose wrapping key id users cannot directly set
1539        // as per fscrypt.
1540        ensure!(
1541            !matches!(
1542                node_attributes,
1543                Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1544            ),
1545            FxfsError::BadPath
1546        );
1547        self.handle.update_attributes(transaction, node_attributes, change_time).await
1548    }
1549
    /// Returns the default set of transaction options for this object, as provided by the
    /// underlying handle.  This is mostly the overall default, modified by any [`HandleOptions`]
    /// held by this handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        self.handle.default_transaction_options()
    }
1555
1556    pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
1557        self.new_transaction_with_options(self.default_transaction_options()).await
1558    }
1559
1560    pub async fn new_transaction_with_options<'b>(
1561        &self,
1562        options: Options<'b>,
1563    ) -> Result<Transaction<'b>, Error> {
1564        self.handle.new_transaction_with_options(self.attribute_id(), options).await
1565    }
1566
    /// Flushes the underlying device by delegating to the wrapped handle.  This is expensive and
    /// should be used sparingly.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.handle.flush_device().await
    }
1571
    /// Reads an entire attribute by delegating to the wrapped handle.
    ///
    /// NOTE(review): `None` presumably means the attribute does not exist -- confirm against the
    /// underlying handle.
    pub async fn read_attr(&self, attribute_id: AttributeId) -> Result<Option<Box<[u8]>>, Error> {
        self.handle.read_attr(attribute_id).await
    }
1576
1577    /// Writes an entire attribute.  This *always* uses the volume data key.
1578    pub async fn write_attr(&self, attribute_id: AttributeId, data: &[u8]) -> Result<(), Error> {
1579        // Must be different attribute otherwise cached size gets out of date.
1580        assert_ne!(attribute_id, self.attribute_id());
1581        let store = self.store();
1582        let mut transaction = self.new_transaction().await?;
1583        if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
1584            transaction.commit_and_continue().await?;
1585            while matches!(
1586                store
1587                    .trim_some(
1588                        &mut transaction,
1589                        self.object_id(),
1590                        attribute_id,
1591                        TrimMode::FromOffset(data.len() as u64),
1592                    )
1593                    .await?,
1594                TrimResult::Incomplete
1595            ) {
1596                transaction.commit_and_continue().await?;
1597            }
1598        }
1599        transaction.commit().await?;
1600        Ok(())
1601    }
1602
    /// Forwards to the wrapped handle's `read_and_decrypt` with the same arguments.
    ///
    /// NOTE(review): presumably reads `buffer.len()` bytes at `device_offset` and decrypts them
    /// as file data at `file_offset` using the key identified by `key_id` -- confirm against the
    /// underlying handle.
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id).await
    }
1612
1613    /// Truncates a file to a given size (growing/shrinking as required).
1614    ///
1615    /// Nb: Most code will want to call truncate() instead. This method is used
1616    /// to update the super block -- a case where we must borrow metadata space.
1617    pub async fn truncate_with_options(
1618        &self,
1619        options: Options<'_>,
1620        size: u64,
1621    ) -> Result<(), Error> {
1622        let mut transaction = self.new_transaction_with_options(options).await?;
1623        let old_size = self.get_size();
1624        if size == old_size {
1625            return Ok(());
1626        }
1627        if size < old_size {
1628            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
1629            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
1630                // The file needs to be trimmed.
1631                transaction.commit_and_continue().await?;
1632                let store = self.store();
1633                while matches!(
1634                    store
1635                        .trim_some(
1636                            &mut transaction,
1637                            self.object_id(),
1638                            self.attribute_id(),
1639                            TrimMode::FromOffset(size)
1640                        )
1641                        .await?,
1642                    TrimResult::Incomplete
1643                ) {
1644                    if let Err(error) = transaction.commit_and_continue().await {
1645                        warn!(error:?; "Failed to trim after truncate");
1646                        return Ok(());
1647                    }
1648                }
1649                if let Err(error) = transaction.commit().await {
1650                    warn!(error:?; "Failed to trim after truncate");
1651                }
1652                return Ok(());
1653            }
1654        } else {
1655            self.grow(&mut transaction, old_size, size).await?;
1656        }
1657        transaction.commit().await?;
1658        Ok(())
1659    }
1660
1661    pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
1662        // We don't take a read guard here since the object properties are contained in a single
1663        // object, which cannot be inconsistent with itself. The LSM tree does not return
1664        // intermediate states for a single object.
1665        let item = self
1666            .store()
1667            .tree
1668            .find(&ObjectKey::object(self.object_id()))
1669            .await?
1670            .expect("Unable to find object record");
1671        match item.value {
1672            ObjectValue::Object {
1673                kind: ObjectKind::File { refs, .. },
1674                attributes:
1675                    ObjectAttributes {
1676                        creation_time,
1677                        modification_time,
1678                        posix_attributes,
1679                        allocated_size,
1680                        access_time,
1681                        change_time,
1682                        ..
1683                    },
1684            } => Ok(ObjectProperties {
1685                refs,
1686                allocated_size,
1687                data_attribute_size: self.get_size(),
1688                creation_time,
1689                modification_time,
1690                access_time,
1691                change_time,
1692                sub_dirs: 0,
1693                posix_attributes,
1694                dir_type: DirType::Normal,
1695            }),
1696            _ => bail!(FxfsError::NotFile),
1697        }
1698    }
1699
1700    // Returns the contents of this object. This object must be < |limit| bytes in size.
1701    pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1702        let size = self.get_size();
1703        if size > limit as u64 {
1704            bail!("Object too big ({} > {})", size, limit);
1705        }
1706        let mut buf = self.allocate_buffer(size as usize).await;
1707        self.read(0u64, buf.as_mut()).await?;
1708        Ok(buf.as_slice().into())
1709    }
1710
1711    /// Returns the set of file_offset->extent mappings for this file. The extents will be sorted by
1712    /// their logical offset within the file.
1713    ///
1714    /// *NOTE*: This operation is potentially expensive and should generally be avoided.
1715    pub async fn device_extents(&self) -> Result<Vec<FileExtent>, Error> {
1716        let mut extents = Vec::new();
1717        let tree = &self.store().tree;
1718        let layer_set = tree.layer_set();
1719        let mut merger = layer_set.merger();
1720        let mut iter = merger
1721            .query(Query::FullRange(&ObjectKey::attribute(
1722                self.object_id(),
1723                self.attribute_id(),
1724                AttributeKey::Extent(Extent::search_key_from_offset(0)),
1725            )))
1726            .await?;
1727        loop {
1728            match iter.get() {
1729                Some(ItemRef {
1730                    key:
1731                        ObjectKey {
1732                            object_id,
1733                            data:
1734                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
1735                        },
1736                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1737                    ..
1738                }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
1739                    let logical_offset = extent.start;
1740                    let device_range = *device_offset..*device_offset + extent.length()?;
1741                    extents.push(FileExtent::new(logical_offset, device_range)?);
1742                }
1743                _ => break,
1744            }
1745            iter.advance().await?;
1746        }
1747        Ok(extents)
1748    }
1749}
1750
/// Keeps the handle's cached state in step with mutations that are about to be applied.
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
    /// Inspects `mutation` and updates this handle's in-memory state accordingly:
    ///
    ///   * an `Attribute` value updates the cached content size;
    ///   * a `VerifiedAttribute` value finalizes the fsverity state (the size is expected to be
    ///     unchanged, which is checked in debug builds);
    ///   * an allocated extent for this object and attribute in one of the overwrite modes is
    ///     recorded in `overwrite_ranges`.
    ///
    /// All other mutations are ignored.
    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
        match mutation {
            // NOTE(review): this arm does not check which object or attribute the mutation
            // targets; presumably only this handle's own size mutations are routed here --
            // confirm against the callers.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
                ..
            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
                ..
            }) => {
                debug_assert_eq!(
                    self.get_size(),
                    *size,
                    "size should be set when verity is enabled and must not change"
                );
                self.finalize_fsverity_state()
            }
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent)),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                        ..
                    },
                ..
            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
                // Only overwrite-mode extents are tracked; raw and copy-on-write extents need no
                // bookkeeping here.
                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
                    self.overwrite_ranges.apply_range(extent.clone().into())
                }
                ExtentMode::Raw | ExtentMode::Cow(_) => (),
            },
            _ => {}
        }
    }
}
1792
/// Every `ObjectHandle` method is forwarded unchanged to the wrapped handle.
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    /// Forwards the trace setting `v` to the wrapped handle.
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    /// Returns the object id reported by the wrapped handle.
    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    /// Asks the wrapped handle for a buffer of `size` bytes.
    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    /// Returns the block size reported by the wrapped handle.
    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1810
1811#[async_trait]
1812impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
1813    async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
1814        let fs = self.store().filesystem();
1815        let guard = fs
1816            .lock_manager()
1817            .read_lock(lock_keys![LockKey::object_attribute(
1818                self.store().store_object_id,
1819                self.object_id(),
1820                self.attribute_id(),
1821            )])
1822            .await;
1823
1824        let size = self.get_size();
1825        if offset >= size {
1826            return Ok(0);
1827        }
1828        let length = min(buf.len() as u64, size - offset) as usize;
1829        buf = buf.subslice_mut(0..length);
1830        self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
1831        if self.is_verified_file() {
1832            self.verify_data(offset as usize, buf.as_slice())?;
1833        }
1834        Ok(length)
1835    }
1836
1837    fn get_size(&self) -> u64 {
1838        self.content_size.load(atomic::Ordering::Relaxed)
1839    }
1840}
1841
1842impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
1843    async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
1844        let offset = offset.unwrap_or_else(|| self.get_size());
1845        let mut transaction = self.new_transaction().await?;
1846        self.txn_write(&mut transaction, offset, buf).await?;
1847        let new_size = self.txn_get_size(&transaction);
1848        transaction.commit().await?;
1849        Ok(new_size)
1850    }
1851
1852    async fn truncate(&self, size: u64) -> Result<(), Error> {
1853        self.truncate_with_options(self.default_transaction_options(), size).await
1854    }
1855
1856    async fn flush(&self) -> Result<(), Error> {
1857        Ok(())
1858    }
1859}
1860
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
///
/// Bytes are staged in an internal buffer and written out whenever the buffer fills up and when
/// the writer is completed.
pub struct DirectWriter<'a, S: HandleOwner> {
    // The handle that data is written to.
    handle: &'a DataObjectHandle<S>,
    // Options used for the transaction created by each flush.
    options: transaction::Options<'a>,
    // Staging buffer holding bytes that have not been written to the handle yet.
    buffer: Buffer<'a>,
    // Offset in the object at which the next flush will write.
    offset: u64,
    // Number of bytes currently staged in `buffer`.
    buf_offset: usize,
}
1870
/// Size, in bytes, of the staging buffer used by `DirectWriter` (1 MiB).
const BUFFER_SIZE: usize = 1_048_576;
1872
/// Logs a warning when a writer is dropped while it still holds staged bytes.
impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
    fn drop(&mut self) {
        // Dropping cannot flush (flushing is async), so staged bytes are lost at this point.
        if self.buf_offset != 0 {
            warn!("DirectWriter: dropping data, did you forget to call complete?");
        }
    }
}
1880
1881impl<'a, S: HandleOwner> DirectWriter<'a, S> {
1882    pub async fn new(
1883        handle: &'a DataObjectHandle<S>,
1884        options: transaction::Options<'a>,
1885    ) -> DirectWriter<'a, S> {
1886        Self {
1887            handle,
1888            options,
1889            buffer: handle.allocate_buffer(BUFFER_SIZE).await,
1890            offset: 0,
1891            buf_offset: 0,
1892        }
1893    }
1894
1895    async fn flush(&mut self) -> Result<(), Error> {
1896        let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
1897        self.handle
1898            .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
1899            .await?;
1900        transaction.commit().await?;
1901        self.offset += self.buf_offset as u64;
1902        self.buf_offset = 0;
1903        Ok(())
1904    }
1905}
1906
1907impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1908    fn block_size(&self) -> u64 {
1909        self.handle.block_size()
1910    }
1911
1912    async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1913        while buf.len() > 0 {
1914            let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1915            self.buffer
1916                .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1917                .as_mut_slice()
1918                .copy_from_slice(&buf[..to_do]);
1919            self.buf_offset += to_do;
1920            if self.buf_offset == BUFFER_SIZE {
1921                self.flush().await?;
1922            }
1923            buf = &buf[to_do..];
1924        }
1925        Ok(())
1926    }
1927
1928    async fn complete(mut self) -> Result<u64, Error> {
1929        self.flush().await?;
1930        Ok(self.offset + self.buf_offset as u64)
1931    }
1932
1933    async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1934        if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1935            self.buffer
1936                .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1937                .as_mut_slice()
1938                .fill(0);
1939            self.buf_offset += amount as usize;
1940        } else {
1941            self.flush().await?;
1942            self.offset += amount;
1943        }
1944        Ok(())
1945    }
1946}
1947
1948#[cfg(test)]
1949mod tests {
1950    use crate::errors::FxfsError;
1951    use crate::filesystem::{
1952        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1953    };
1954    use crate::fsck::{
1955        FsckOptions, fsck, fsck_volume, fsck_volume_with_options, fsck_with_options,
1956    };
1957    use crate::lsm_tree::Query;
1958    use crate::lsm_tree::types::{ItemRef, LayerIterator};
1959    use crate::object_handle::{
1960        ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1961    };
1962    use crate::object_store::data_object_handle::{OverwriteOptions, WRITE_ATTR_BATCH_SIZE};
1963    use crate::object_store::directory::replace_child;
1964    use crate::object_store::object_record::{FsverityMetadata, ObjectKey, ObjectValue, Timestamp};
1965    use crate::object_store::transaction::{Mutation, Options, lock_keys};
1966    use crate::object_store::volume::root_volume;
1967    use crate::object_store::{
1968        AttributeId, AttributeKey, DataObjectHandle, DirType, Directory, Extent, ExtentMode,
1969        ExtentValue, HandleOptions, LockKey, NewChildStoreOptions, ObjectKeyData, ObjectStore,
1970        PosixAttributes, StoreOptions, TRANSACTION_MUTATION_THRESHOLD,
1971    };
1972    use crate::range::RangeExt;
1973    use crate::round::{round_down, round_up};
1974    use assert_matches::assert_matches;
1975    use bit_vec::BitVec;
1976    use fidl_fuchsia_io as fio;
1977    use fsverity_merkle::{FsVerityDescriptor, FsVerityDescriptorRaw};
1978    use fuchsia_async as fasync;
1979    use fuchsia_sync::Mutex;
1980    use futures::FutureExt;
1981    use futures::channel::oneshot::channel;
1982    use futures::stream::{FuturesUnordered, StreamExt};
1983    use fxfs_crypto::{Crypt, EncryptionKey, KeyPurpose};
1984    use fxfs_insecure_crypto::new_insecure_crypt;
1985    use std::ops::Range;
1986    use std::sync::Arc;
1987    use std::time::Duration;
1988    use storage_device::DeviceHolder;
1989    use storage_device::fake_device::FakeDevice;
1990
    // Block size, in bytes, of the fake device that backs the test filesystem.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Some tests (the preallocate_range ones) currently assume that the data only occupies a single
    // device block.
    // Offset in the test object at which `TEST_DATA` is written.
    const TEST_DATA_OFFSET: u64 = 5000;
    // Payload written into the test object.
    const TEST_DATA: &[u8] = b"hello";
    // Size that the test object is truncated to once it has been created.
    const TEST_OBJECT_SIZE: u64 = 5678;
    // NOTE(review): presumably the allocated size expected for the test object once `TEST_DATA`
    // has been written; its uses are outside this section -- confirm.
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    // Name under which the test object is added to the root directory.
    const TEST_OBJECT_NAME: &str = "foo";
2000
2001    async fn test_filesystem() -> OpenFxFilesystem {
2002        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2003        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2004    }
2005
    /// Creates the test object in the root store of `fs` and links it into the root directory as
    /// `TEST_OBJECT_NAME`.
    ///
    /// When `crypt` is provided, a fresh data key is created with it and the object is created
    /// with that key; otherwise the object is created through `ObjectStore::create_object`.
    /// If `write_object_test_data` is set, `TEST_DATA` is written at `TEST_DATA_OFFSET` in the
    /// same transaction.  After the transaction commits, the object is truncated to
    /// `TEST_OBJECT_SIZE`.
    async fn create_object_with_key(
        fs: Arc<FxFilesystem>,
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> DataObjectHandle<ObjectStore> {
        let store = fs.root_store();
        // NOTE(review): presumably declared ahead of `transaction` on purpose, so that the object
        // outlives the transaction that later borrows it in `txn_write` -- confirm before
        // folding this into the assignment below.
        let object;

        let mut transaction = fs
            .root_store()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        object = if let Some(crypt) = crypt {
            let object_id = store.get_next_object_id().await.unwrap();
            let (key, unwrapped_key) =
                crypt.create_key(object_id.get(), KeyPurpose::Data).await.unwrap();
            ObjectStore::create_object_with_key(
                &store,
                &mut transaction,
                object_id,
                HandleOptions::default(),
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await
            .expect("create_object failed")
        } else {
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed")
        };

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        if write_object_test_data {
            // Place the payload inside the buffer at the same offset it has within its device
            // block, then write only that part of the buffer.
            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
            object
                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
                .await
                .expect("write failed");
        }
        transaction.commit().await.expect("commit failed");
        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
        object
    }
2066
    /// Creates a fresh test filesystem together with a test object in it.  `crypt` and
    /// `write_object_test_data` are passed straight through to `create_object_with_key`.
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let object = create_object_with_key(fs.clone(), crypt, write_object_test_data).await;
        (fs, object)
    }
2075
    /// Creates a test filesystem and a test object keyed with the insecure test crypt, with
    /// `TEST_DATA` written to it.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), true).await
    }
2079
    /// Creates a test filesystem and a test object keyed with the insecure test crypt, without
    /// writing `TEST_DATA` to it.
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), false).await
    }
2084
2085    #[fuchsia::test]
2086    async fn test_zero_buf_len_read() {
2087        let (fs, object) = test_filesystem_and_object().await;
2088        let mut buf = object.allocate_buffer(0).await;
2089        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
2090        fs.close().await.expect("Close failed");
2091    }
2092
2093    #[fuchsia::test]
2094    async fn test_beyond_eof_read() {
2095        let (fs, object) = test_filesystem_and_object().await;
2096        let offset = TEST_OBJECT_SIZE as usize - 2;
2097        let align = offset % fs.block_size() as usize;
2098        let len: usize = 2;
2099        let mut buf = object.allocate_buffer(align + len + 1).await;
2100        buf.as_mut_slice().fill(123u8);
2101        assert_eq!(
2102            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
2103            align + len
2104        );
2105        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
2106        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
2107        fs.close().await.expect("Close failed");
2108    }
2109
2110    #[fuchsia::test]
2111    async fn test_beyond_eof_read_from() {
2112        let (fs, object) = test_filesystem_and_object().await;
2113        let handle = &*object;
2114        let offset = TEST_OBJECT_SIZE as usize - 2;
2115        let align = offset % fs.block_size() as usize;
2116        let len: usize = 2;
2117        let mut buf = object.allocate_buffer(align + len + 1).await;
2118        buf.as_mut_slice().fill(123u8);
2119        assert_eq!(
2120            handle
2121                .read(AttributeId::DATA, (offset - align) as u64, buf.as_mut())
2122                .await
2123                .expect("read failed"),
2124            align + len
2125        );
2126        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
2127        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
2128        fs.close().await.expect("Close failed");
2129    }
2130
2131    #[fuchsia::test]
2132    async fn test_beyond_eof_read_unchecked() {
2133        let (fs, object) = test_filesystem_and_object().await;
2134        let offset = TEST_OBJECT_SIZE as usize - 2;
2135        let align = offset % fs.block_size() as usize;
2136        let len: usize = 2;
2137        let mut buf = object.allocate_buffer(align + len + 1).await;
2138        buf.as_mut_slice().fill(123u8);
2139        let guard = fs
2140            .lock_manager()
2141            .read_lock(lock_keys![LockKey::object_attribute(
2142                object.store().store_object_id,
2143                object.object_id(),
2144                AttributeId::DATA,
2145            )])
2146            .await;
2147        object
2148            .read_unchecked(AttributeId::DATA, (offset - align) as u64, buf.as_mut(), &guard)
2149            .await
2150            .expect("read failed");
2151        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
2152        fs.close().await.expect("Close failed");
2153    }
2154
2155    #[fuchsia::test]
2156    async fn test_read_sparse() {
2157        let (fs, object) = test_filesystem_and_object().await;
2158        // Deliberately read not right to eof.
2159        let len = TEST_OBJECT_SIZE as usize - 1;
2160        let mut buf = object.allocate_buffer(len).await;
2161        buf.as_mut_slice().fill(123u8);
2162        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2163        let mut expected = vec![0; len];
2164        let offset = TEST_DATA_OFFSET as usize;
2165        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2166        assert_eq!(buf.as_slice()[..len], expected[..]);
2167        fs.close().await.expect("Close failed");
2168    }
2169
2170    #[fuchsia::test]
2171    async fn test_read_after_writes_interspersed_with_flush() {
2172        let (fs, object) = test_filesystem_and_object().await;
2173
2174        object.owner().flush().await.expect("flush failed");
2175
2176        // Write more test data to the first block fo the file.
2177        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
2178        buf.as_mut_slice().copy_from_slice(TEST_DATA);
2179        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");
2180
2181        let len = TEST_OBJECT_SIZE as usize - 1;
2182        let mut buf = object.allocate_buffer(len).await;
2183        buf.as_mut_slice().fill(123u8);
2184        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2185
2186        let mut expected = vec![0u8; len];
2187        let offset = TEST_DATA_OFFSET as usize;
2188        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2189        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
2190        assert_eq!(buf.as_slice(), &expected);
2191        fs.close().await.expect("Close failed");
2192    }
2193
2194    #[fuchsia::test]
2195    async fn test_read_after_truncate_and_extend() {
2196        let (fs, object) = test_filesystem_and_object().await;
2197
2198        // Arrange for there to be <extent><deleted-extent><extent>.
2199        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
2200        buf.as_mut_slice().copy_from_slice(TEST_DATA);
2201        // This adds an extent at 0..512.
2202        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2203        // This deletes 512..1024.
2204        object.truncate(3).await.expect("truncate failed");
2205        let data = b"foo";
2206        let offset = 1500u64;
2207        let align = (offset % fs.block_size() as u64) as usize;
2208        let mut buf = object.allocate_buffer(align + data.len()).await;
2209        buf.as_mut_slice()[align..].copy_from_slice(data);
2210        // This adds 1024..1536.
2211        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");
2212
2213        const LEN1: usize = 1503;
2214        let mut buf = object.allocate_buffer(LEN1).await;
2215        buf.as_mut_slice().fill(123u8);
2216        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
2217        let mut expected = [0; LEN1];
2218        expected[..3].copy_from_slice(&TEST_DATA[..3]);
2219        expected[1500..].copy_from_slice(b"foo");
2220        assert_eq!(buf.as_slice(), &expected);
2221
2222        // Also test a read that ends midway through the deleted extent.
2223        const LEN2: usize = 601;
2224        let mut buf = object.allocate_buffer(LEN2).await;
2225        buf.as_mut_slice().fill(123u8);
2226        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
2227        assert_eq!(buf.as_slice(), &expected[..LEN2]);
2228        fs.close().await.expect("Close failed");
2229    }
2230
2231    #[fuchsia::test]
2232    async fn test_read_whole_blocks_with_multiple_objects() {
2233        let (fs, object) = test_filesystem_and_object().await;
2234        let block_size = object.block_size() as usize;
2235        let mut buffer = object.allocate_buffer(block_size).await;
2236        buffer.as_mut_slice().fill(0xaf);
2237        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2238
2239        let store = object.owner();
2240        let mut transaction = fs
2241            .root_store()
2242            .new_transaction(lock_keys![], Options::default())
2243            .await
2244            .expect("new_transaction failed");
2245        let object2 =
2246            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2247                .await
2248                .expect("create_object failed");
2249        transaction.commit().await.expect("commit failed");
2250        let mut ef_buffer = object.allocate_buffer(block_size).await;
2251        ef_buffer.as_mut_slice().fill(0xef);
2252        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");
2253
2254        let mut buffer = object.allocate_buffer(block_size).await;
2255        buffer.as_mut_slice().fill(0xaf);
2256        object
2257            .write_or_append(Some(block_size as u64), buffer.as_ref())
2258            .await
2259            .expect("write failed");
2260        object.truncate(3 * block_size as u64).await.expect("truncate failed");
2261        object2
2262            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
2263            .await
2264            .expect("write failed");
2265
2266        let mut buffer = object.allocate_buffer(4 * block_size).await;
2267        buffer.as_mut_slice().fill(123);
2268        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
2269        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
2270        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
2271        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
2272        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
2273        fs.close().await.expect("Close failed");
2274    }
2275
2276    #[fuchsia::test]
2277    async fn test_alignment() {
2278        let (fs, object) = test_filesystem_and_object().await;
2279
2280        struct AlignTest {
2281            fill: u8,
2282            object: DataObjectHandle<ObjectStore>,
2283            mirror: Vec<u8>,
2284        }
2285
2286        impl AlignTest {
2287            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
2288                let mirror = {
2289                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
2290                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2291                    buf.as_slice().to_vec()
2292                };
2293                Self { fill: 0, object, mirror }
2294            }
2295
2296            // Fills |range| of self.object with a byte value (self.fill) and mirrors the same
2297            // operation to an in-memory copy of the object.
2298            // Each subsequent call bumps the value of fill.
2299            // It is expected that the object and its mirror maintain identical content.
2300            async fn test(&mut self, range: Range<u64>) {
2301                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
2302                self.fill += 1;
2303                buf.as_mut_slice().fill(self.fill);
2304                self.object
2305                    .write_or_append(Some(range.start), buf.as_ref())
2306                    .await
2307                    .expect("write_or_append failed");
2308                if range.end > self.mirror.len() as u64 {
2309                    self.mirror.resize(range.end as usize, 0);
2310                }
2311                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
2312                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
2313                assert_eq!(
2314                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
2315                    self.mirror.len()
2316                );
2317                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
2318            }
2319        }
2320
2321        let block_size = object.block_size() as u64;
2322        let mut align = AlignTest::new(object).await;
2323
2324        // Fill the object to start with (with 1).
2325        align.test(0..2 * block_size + 1).await;
2326
2327        // Unaligned head (fills with 2, overwrites that with 3).
2328        align.test(1..block_size).await;
2329        align.test(1..2 * block_size).await;
2330
2331        // Unaligned tail (fills with 4 and 5).
2332        align.test(0..block_size - 1).await;
2333        align.test(0..2 * block_size - 1).await;
2334
2335        // Both unaligned (fills with 6 and 7).
2336        align.test(1..block_size - 1).await;
2337        align.test(1..2 * block_size - 1).await;
2338
2339        fs.close().await.expect("Close failed");
2340    }
2341
2342    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
2343        let allocator = fs.allocator();
2344        let allocated_before = allocator.get_allocated_bytes();
2345        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2346        object
2347            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
2348            .await
2349            .expect("preallocate_range failed");
2350        transaction.commit().await.expect("commit failed");
2351        assert!(object.get_size() < 1048576);
2352        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2353        object
2354            .preallocate_range(&mut transaction, &mut (0..1048576))
2355            .await
2356            .expect("preallocate_range failed");
2357        transaction.commit().await.expect("commit failed");
2358        assert_eq!(object.get_size(), 1048576);
2359        // Check that it didn't reallocate the space for the existing extent
2360        let allocated_after = allocator.get_allocated_bytes();
2361        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
2362
2363        let mut buf = object
2364            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
2365            .await;
2366        buf.as_mut_slice().fill(47);
2367        object
2368            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
2369            .await
2370            .expect("write failed");
2371        buf.as_mut_slice().fill(95);
2372        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
2373        object
2374            .overwrite(offset, buf.as_mut(), OverwriteOptions::default())
2375            .await
2376            .expect("write failed");
2377
2378        // Make sure there were no more allocations.
2379        assert_eq!(allocator.get_allocated_bytes(), allocated_after);
2380
2381        // Read back the data and make sure it is what we expect.
2382        let mut buf = object.allocate_buffer(104876).await;
2383        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2384        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
2385        assert_eq!(
2386            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
2387            TEST_DATA
2388        );
2389        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
2390    }
2391
2392    #[fuchsia::test]
2393    async fn test_preallocate_range() {
2394        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2395        test_preallocate_common(&fs, object).await;
2396        fs.close().await.expect("Close failed");
2397    }
2398
2399    // This is identical to the previous test except that we flush so that extents end up in
2400    // different layers.
2401    #[fuchsia::test]
2402    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
2403        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2404        object.owner().flush().await.expect("flush failed");
2405        test_preallocate_common(&fs, object).await;
2406        fs.close().await.expect("Close failed");
2407    }
2408
2409    #[fuchsia::test]
2410    async fn test_already_preallocated() {
2411        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2412        let allocator = fs.allocator();
2413        let allocated_before = allocator.get_allocated_bytes();
2414        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2415        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
2416        object
2417            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
2418            .await
2419            .expect("preallocate_range failed");
2420        transaction.commit().await.expect("commit failed");
2421        // Check that it didn't reallocate any new space.
2422        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
2423        fs.close().await.expect("Close failed");
2424    }
2425
2426    #[fuchsia::test]
2427    async fn test_overwrite_when_preallocated_at_start_of_file() {
2428        // The standard test data we put in the test object would cause an extent with checksums
2429        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2430        let (fs, object) = test_filesystem_and_empty_object().await;
2431
2432        let object = ObjectStore::open_object(
2433            object.owner(),
2434            object.object_id(),
2435            HandleOptions::default(),
2436            None,
2437        )
2438        .await
2439        .expect("open_object failed");
2440
2441        assert_eq!(fs.block_size(), 4096);
2442
2443        let mut write_buf = object.allocate_buffer(4096).await;
2444        write_buf.as_mut_slice().fill(95);
2445
2446        // First try to overwrite without allowing allocations
2447        // We expect this to fail, since nothing is allocated yet
2448        object
2449            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2450            .await
2451            .expect_err("overwrite succeeded");
2452
2453        // Now preallocate some space (exactly one block)
2454        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2455        object
2456            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
2457            .await
2458            .expect("preallocate_range failed");
2459        transaction.commit().await.expect("commit failed");
2460
2461        // Now try the same overwrite command as before, it should work this time,
2462        // even with allocations disabled...
2463        {
2464            let mut read_buf = object.allocate_buffer(4096).await;
2465            object.read(0, read_buf.as_mut()).await.expect("read failed");
2466            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2467        }
2468        object
2469            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2470            .await
2471            .expect("overwrite failed");
2472        {
2473            let mut read_buf = object.allocate_buffer(4096).await;
2474            object.read(0, read_buf.as_mut()).await.expect("read failed");
2475            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2476        }
2477
2478        // Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
2479        // one block earlier at offset 0
2480        object
2481            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2482            .await
2483            .expect_err("overwrite succeeded");
2484
2485        // We can't assert anything about the existing bytes, because they haven't been allocated
2486        // yet and they could contain any values
2487        object
2488            .overwrite(
2489                4096,
2490                write_buf.as_mut(),
2491                OverwriteOptions { allow_allocations: true, ..Default::default() },
2492            )
2493            .await
2494            .expect("overwrite failed");
2495        {
2496            let mut read_buf = object.allocate_buffer(4096).await;
2497            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2498            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2499        }
2500
2501        // Check that the overwrites haven't messed up the filesystem state
2502        let fsck_options = FsckOptions {
2503            fail_on_warning: true,
2504            no_lock: true,
2505            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2506            ..Default::default()
2507        };
2508        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2509
2510        fs.close().await.expect("Close failed");
2511    }
2512
2513    #[fuchsia::test]
2514    async fn test_overwrite_large_buffer_and_file_with_many_holes() {
2515        // The standard test data we put in the test object would cause an extent with checksums
2516        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2517        let (fs, object) = test_filesystem_and_empty_object().await;
2518
2519        let object = ObjectStore::open_object(
2520            object.owner(),
2521            object.object_id(),
2522            HandleOptions::default(),
2523            None,
2524        )
2525        .await
2526        .expect("open_object failed");
2527
2528        assert_eq!(fs.block_size(), 4096);
2529        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2530
2531        // Let's create some non-holes
2532        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2533        object
2534            .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
2535            .await
2536            .expect("preallocate_range failed");
2537        object
2538            .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
2539            .await
2540            .expect("preallocate_range failed");
2541        object
2542            .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
2543            .await
2544            .expect("preallocate_range failed");
2545        object
2546            .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
2547            .await
2548            .expect("preallocate_range failed");
2549        transaction.commit().await.expect("commit failed");
2550
2551        assert_eq!(object.get_size(), 524288);
2552
2553        let mut write_buf = object.allocate_buffer(4096).await;
2554        write_buf.as_mut_slice().fill(95);
2555
2556        // We shouldn't be able to overwrite in the holes if new allocations aren't enabled
2557        object
2558            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2559            .await
2560            .expect_err("overwrite succeeded");
2561        object
2562            .overwrite(8192, write_buf.as_mut(), OverwriteOptions::default())
2563            .await
2564            .expect_err("overwrite succeeded");
2565        object
2566            .overwrite(32768, write_buf.as_mut(), OverwriteOptions::default())
2567            .await
2568            .expect_err("overwrite succeeded");
2569        object
2570            .overwrite(131072, write_buf.as_mut(), OverwriteOptions::default())
2571            .await
2572            .expect_err("overwrite succeeded");
2573
2574        // But we should be able to overwrite in the prealloc'd areas without needing allocations
2575        {
2576            let mut read_buf = object.allocate_buffer(4096).await;
2577            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2578            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2579        }
2580        object
2581            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2582            .await
2583            .expect("overwrite failed");
2584        {
2585            let mut read_buf = object.allocate_buffer(4096).await;
2586            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2587            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2588        }
2589        {
2590            let mut read_buf = object.allocate_buffer(4096).await;
2591            object.read(16384, read_buf.as_mut()).await.expect("read failed");
2592            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2593        }
2594        object
2595            .overwrite(16384, write_buf.as_mut(), OverwriteOptions::default())
2596            .await
2597            .expect("overwrite failed");
2598        {
2599            let mut read_buf = object.allocate_buffer(4096).await;
2600            object.read(16384, read_buf.as_mut()).await.expect("read failed");
2601            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2602        }
2603        {
2604            let mut read_buf = object.allocate_buffer(4096).await;
2605            object.read(65536, read_buf.as_mut()).await.expect("read failed");
2606            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2607        }
2608        object
2609            .overwrite(65536, write_buf.as_mut(), OverwriteOptions::default())
2610            .await
2611            .expect("overwrite failed");
2612        {
2613            let mut read_buf = object.allocate_buffer(4096).await;
2614            object.read(65536, read_buf.as_mut()).await.expect("read failed");
2615            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2616        }
2617        {
2618            let mut read_buf = object.allocate_buffer(4096).await;
2619            object.read(262144, read_buf.as_mut()).await.expect("read failed");
2620            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2621        }
2622        object
2623            .overwrite(262144, write_buf.as_mut(), OverwriteOptions::default())
2624            .await
2625            .expect("overwrite failed");
2626        {
2627            let mut read_buf = object.allocate_buffer(4096).await;
2628            object.read(262144, read_buf.as_mut()).await.expect("read failed");
2629            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2630        }
2631
2632        // Now let's try to do a huge overwrite, that spans over many holes and non-holes
2633        let mut huge_write_buf = object.allocate_buffer(524288).await;
2634        huge_write_buf.as_mut_slice().fill(96);
2635
2636        // With allocations disabled, the big overwrite should fail...
2637        object
2638            .overwrite(0, huge_write_buf.as_mut(), OverwriteOptions::default())
2639            .await
2640            .expect_err("overwrite succeeded");
2641        // ... but it should work when allocations are enabled
2642        object
2643            .overwrite(
2644                0,
2645                huge_write_buf.as_mut(),
2646                OverwriteOptions { allow_allocations: true, ..Default::default() },
2647            )
2648            .await
2649            .expect("overwrite failed");
2650        {
2651            let mut read_buf = object.allocate_buffer(524288).await;
2652            object.read(0, read_buf.as_mut()).await.expect("read failed");
2653            assert_eq!(&read_buf.as_slice(), &[96; 524288]);
2654        }
2655
2656        // Check that the overwrites haven't messed up the filesystem state
2657        let fsck_options = FsckOptions {
2658            fail_on_warning: true,
2659            no_lock: true,
2660            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2661            ..Default::default()
2662        };
2663        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2664
2665        fs.close().await.expect("Close failed");
2666    }
2667
2668    #[fuchsia::test]
2669    async fn test_overwrite_when_unallocated_at_start_of_file() {
2670        // The standard test data we put in the test object would cause an extent with checksums
2671        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2672        let (fs, object) = test_filesystem_and_empty_object().await;
2673
2674        let object = ObjectStore::open_object(
2675            object.owner(),
2676            object.object_id(),
2677            HandleOptions::default(),
2678            None,
2679        )
2680        .await
2681        .expect("open_object failed");
2682
2683        assert_eq!(fs.block_size(), 4096);
2684
2685        let mut write_buf = object.allocate_buffer(4096).await;
2686        write_buf.as_mut_slice().fill(95);
2687
2688        // First try to overwrite without allowing allocations
2689        // We expect this to fail, since nothing is allocated yet
2690        object
2691            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2692            .await
2693            .expect_err("overwrite succeeded");
2694
2695        // Now try the same overwrite command as before, but allow allocations
2696        object
2697            .overwrite(
2698                0,
2699                write_buf.as_mut(),
2700                OverwriteOptions { allow_allocations: true, ..Default::default() },
2701            )
2702            .await
2703            .expect("overwrite failed");
2704        {
2705            let mut read_buf = object.allocate_buffer(4096).await;
2706            object.read(0, read_buf.as_mut()).await.expect("read failed");
2707            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2708        }
2709
2710        // Now try to overwrite at the next block. This should fail if allocations are disabled
2711        object
2712            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2713            .await
2714            .expect_err("overwrite succeeded");
2715
2716        // ... but it should work if allocations are enabled
2717        object
2718            .overwrite(
2719                4096,
2720                write_buf.as_mut(),
2721                OverwriteOptions { allow_allocations: true, ..Default::default() },
2722            )
2723            .await
2724            .expect("overwrite failed");
2725        {
2726            let mut read_buf = object.allocate_buffer(4096).await;
2727            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2728            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2729        }
2730
2731        // Check that the overwrites haven't messed up the filesystem state
2732        let fsck_options = FsckOptions {
2733            fail_on_warning: true,
2734            no_lock: true,
2735            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2736            ..Default::default()
2737        };
2738        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2739
2740        fs.close().await.expect("Close failed");
2741    }
2742
2743    #[fuchsia::test]
2744    async fn test_overwrite_can_extend_a_file() {
2745        // The standard test data we put in the test object would cause an extent with checksums
2746        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2747        let (fs, object) = test_filesystem_and_empty_object().await;
2748
2749        let object = ObjectStore::open_object(
2750            object.owner(),
2751            object.object_id(),
2752            HandleOptions::default(),
2753            None,
2754        )
2755        .await
2756        .expect("open_object failed");
2757
2758        assert_eq!(fs.block_size(), 4096);
2759        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2760
2761        let mut write_buf = object.allocate_buffer(4096).await;
2762        write_buf.as_mut_slice().fill(95);
2763
2764        // Let's try to fill up the last block, and increase the file size in doing so
2765        let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);
2766
2767        // Expected to fail with allocations disabled
2768        object
2769            .overwrite(last_block_offset, write_buf.as_mut(), OverwriteOptions::default())
2770            .await
2771            .expect_err("overwrite succeeded");
2772        // ... but expected to succeed with allocations enabled
2773        object
2774            .overwrite(
2775                last_block_offset,
2776                write_buf.as_mut(),
2777                OverwriteOptions { allow_allocations: true, ..Default::default() },
2778            )
2779            .await
2780            .expect("overwrite failed");
2781        {
2782            let mut read_buf = object.allocate_buffer(4096).await;
2783            object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
2784            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2785        }
2786
2787        assert_eq!(object.get_size(), 8192);
2788
2789        // Let's try to write at the next block, too
2790        let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();
2791
2792        // Expected to fail with allocations disabled
2793        object
2794            .overwrite(next_block_offset, write_buf.as_mut(), OverwriteOptions::default())
2795            .await
2796            .expect_err("overwrite succeeded");
2797        // ... but expected to succeed with allocations enabled
2798        object
2799            .overwrite(
2800                next_block_offset,
2801                write_buf.as_mut(),
2802                OverwriteOptions { allow_allocations: true, ..Default::default() },
2803            )
2804            .await
2805            .expect("overwrite failed");
2806        {
2807            let mut read_buf = object.allocate_buffer(4096).await;
2808            object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
2809            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2810        }
2811
2812        assert_eq!(object.get_size(), 12288);
2813
2814        // Check that the overwrites haven't messed up the filesystem state
2815        let fsck_options = FsckOptions {
2816            fail_on_warning: true,
2817            no_lock: true,
2818            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2819            ..Default::default()
2820        };
2821        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2822
2823        fs.close().await.expect("Close failed");
2824    }
2825
2826    #[fuchsia::test]
2827    async fn test_enable_verity() {
2828        let fs: OpenFxFilesystem = test_filesystem().await;
2829        let mut transaction = fs
2830            .root_store()
2831            .new_transaction(lock_keys![], Options::default())
2832            .await
2833            .expect("new_transaction failed");
2834        let store = fs.root_store();
2835        let object = Arc::new(
2836            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2837                .await
2838                .expect("create_object failed"),
2839        );
2840
2841        transaction.commit().await.unwrap();
2842
2843        object
2844            .enable_verity(fio::VerificationOptions {
2845                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2846                salt: Some(vec![]),
2847                ..Default::default()
2848            })
2849            .await
2850            .expect("set verified file metadata failed");
2851
2852        let handle =
2853            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2854                .await
2855                .expect("open_object failed");
2856
2857        assert!(handle.is_verified_file());
2858
2859        fs.close().await.expect("Close failed");
2860    }
2861
2862    #[fuchsia::test]
2863    async fn test_enable_verity_large_file() {
2864        // Need to make a large FakeDevice to create space for a 67 MB file.
2865        let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2866        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2867        let root_store = fs.root_store();
2868        let mut transaction = fs
2869            .root_store()
2870            .new_transaction(lock_keys![], Options::default())
2871            .await
2872            .expect("new_transaction failed");
2873
2874        let handle = ObjectStore::create_object(
2875            &root_store,
2876            &mut transaction,
2877            HandleOptions::default(),
2878            None,
2879        )
2880        .await
2881        .expect("failed to create object");
2882        transaction.commit().await.expect("commit failed");
2883        let mut offset = 0;
2884
2885        // Write a file big enough to trigger multiple transactions on enable_verity().
2886        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2887        buf.as_mut_slice().fill(1);
2888        for _ in 0..130 {
2889            handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2890            offset += WRITE_ATTR_BATCH_SIZE as u64;
2891        }
2892
2893        handle
2894            .enable_verity(fio::VerificationOptions {
2895                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2896                salt: Some(vec![]),
2897                ..Default::default()
2898            })
2899            .await
2900            .expect("set verified file metadata failed");
2901
2902        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2903        offset = 0;
2904        for _ in 0..130 {
2905            handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2906            assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2907            offset += WRITE_ATTR_BATCH_SIZE as u64;
2908        }
2909
2910        fsck(fs.clone()).await.expect("fsck failed");
2911        fs.close().await.expect("Close failed");
2912    }
2913
2914    #[fuchsia::test]
2915    async fn test_retry_enable_verity_on_reboot() {
2916        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2917        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2918        let root_store = fs.root_store();
2919        let mut transaction = fs
2920            .root_store()
2921            .new_transaction(lock_keys![], Options::default())
2922            .await
2923            .expect("new_transaction failed");
2924
2925        let handle = ObjectStore::create_object(
2926            &root_store,
2927            &mut transaction,
2928            HandleOptions::default(),
2929            None,
2930        )
2931        .await
2932        .expect("failed to create object");
2933        transaction.commit().await.expect("commit failed");
2934
2935        let object_id = {
2936            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
2937            transaction.add(
2938                root_store.store_object_id(),
2939                Mutation::replace_or_insert_object(
2940                    ObjectKey::graveyard_attribute_entry(
2941                        root_store.graveyard_directory_object_id(),
2942                        handle.object_id(),
2943                        AttributeId::FSVERITY_MERKLE,
2944                    ),
2945                    ObjectValue::Some,
2946                ),
2947            );
2948
2949            // This write should span three transactions. This test mimics the behavior when the
2950            // last transaction gets interrupted by a filesystem.close().
2951            handle
2952                .write_new_attr_in_batches(
2953                    &mut transaction,
2954                    AttributeId::FSVERITY_MERKLE,
2955                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
2956                    WRITE_ATTR_BATCH_SIZE,
2957                )
2958                .await
2959                .expect("failed to write merkle attribute");
2960
2961            handle.object_id()
2962            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
2963            // release the transaction locks.
2964        };
2965
2966        fs.close().await.expect("failed to close filesystem");
2967        let device = fs.take_device().await;
2968        device.reopen(false);
2969
2970        let fs =
2971            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2972        fsck(fs.clone()).await.expect("fsck failed");
2973        fs.close().await.expect("failed to close filesystem");
2974        let device = fs.take_device().await;
2975        device.reopen(false);
2976
2977        // On open, the filesystem will call initial_reap which will call queue_tombstone().
2978        let fs = FxFilesystem::open(device).await.expect("open failed");
2979        let root_store = fs.root_store();
2980        let handle =
2981            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
2982                .await
2983                .expect("open_object failed");
2984        handle
2985            .enable_verity(fio::VerificationOptions {
2986                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2987                salt: Some(vec![]),
2988                ..Default::default()
2989            })
2990            .await
2991            .expect("set verified file metadata failed");
2992
2993        // `flush` will ensure that initial reap fully processes all the graveyard entries. This
2994        // isn't strictly necessary for the test to pass (the graveyard marker was already
2995        // processed during `enable_verity`), but it does help catch bugs, such as the attribute
2996        // graveyard entry not being removed upon processing.
2997        fs.graveyard().flush().await;
2998        assert!(
2999            FsVerityDescriptor::from_bytes(
3000                &handle
3001                    .read_attr(AttributeId::FSVERITY_MERKLE)
3002                    .await
3003                    .expect("read_attr failed")
3004                    .expect("No attr found"),
3005                handle.block_size() as usize
3006            )
3007            .is_ok()
3008        );
3009        fsck(fs.clone()).await.expect("fsck failed");
3010        fs.close().await.expect("Close failed");
3011    }
3012
3013    #[fuchsia::test]
3014    async fn test_verify_data_corrupt_file() {
3015        let fs: OpenFxFilesystem = test_filesystem().await;
3016        let mut transaction = fs
3017            .root_store()
3018            .new_transaction(lock_keys![], Options::default())
3019            .await
3020            .expect("new_transaction failed");
3021        let store = fs.root_store();
3022        let object = Arc::new(
3023            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3024                .await
3025                .expect("create_object failed"),
3026        );
3027
3028        transaction.commit().await.unwrap();
3029
3030        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3031        buf.as_mut_slice().fill(123);
3032        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3033
3034        object
3035            .enable_verity(fio::VerificationOptions {
3036                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3037                salt: Some(vec![]),
3038                ..Default::default()
3039            })
3040            .await
3041            .expect("set verified file metadata failed");
3042
3043        // Change file contents and ensure verification fails
3044        buf.as_mut_slice().fill(234);
3045        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3046        object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
3047
3048        fs.close().await.expect("Close failed");
3049    }
3050
3051    // TODO(https://fxbug.dev/450398331): More tests to be added when this can support writing the
3052    // f2fs format natively. For now, relying on tests inside of the f2fs_reader to exercise more
3053    // paths.
3054    #[fuchsia::test]
3055    async fn test_parse_f2fs_verity() {
3056        let fs: OpenFxFilesystem = test_filesystem().await;
3057        let mut transaction = fs
3058            .root_store()
3059            .new_transaction(lock_keys![], Options::default())
3060            .await
3061            .expect("new_transaction failed");
3062        let store = fs.root_store();
3063        let object = Arc::new(
3064            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3065                .await
3066                .expect("create_object failed"),
3067        );
3068
3069        transaction.commit().await.unwrap();
3070        let file_size = fs.block_size() * 2;
3071        // Write over one block to make there be leaf hashes.
3072        {
3073            let mut buf = object.allocate_buffer(file_size as usize).await;
3074            buf.as_mut_slice().fill(64);
3075            assert_eq!(
3076                object.write_or_append(None, buf.as_ref()).await.expect("Writing to file."),
3077                file_size
3078            );
3079        }
3080
3081        // Enable verity normally, then shift the type.
3082        object
3083            .enable_verity(fio::VerificationOptions {
3084                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3085                salt: Some(vec![]),
3086                ..Default::default()
3087            })
3088            .await
3089            .expect("set verified file metadata failed");
3090        let (verity_info, root_hash) = object.get_descriptor().unwrap();
3091
3092        let mut transaction = fs
3093            .root_store()
3094            .new_transaction(
3095                lock_keys![LockKey::Object {
3096                    store_object_id: store.store_object_id(),
3097                    object_id: object.object_id()
3098                }],
3099                Options::default(),
3100            )
3101            .await
3102            .expect("new_transaction failed");
3103        transaction.add(
3104            store.store_object_id(),
3105            Mutation::replace_or_insert_object(
3106                ObjectKey::attribute(
3107                    object.object_id(),
3108                    AttributeId::DATA,
3109                    AttributeKey::Attribute,
3110                ),
3111                ObjectValue::verified_attribute(
3112                    file_size,
3113                    FsverityMetadata::F2fs(0..(fs.block_size() * 2)),
3114                ),
3115            ),
3116        );
3117        transaction.add(
3118            store.store_object_id(),
3119            Mutation::replace_or_insert_object(
3120                ObjectKey::attribute(
3121                    object.object_id(),
3122                    AttributeId::FSVERITY_MERKLE,
3123                    AttributeKey::Attribute,
3124                ),
3125                ObjectValue::attribute(fs.block_size() * 2, false),
3126            ),
3127        );
3128        {
3129            let descriptor = FsVerityDescriptorRaw::new(
3130                fio::HashAlgorithm::Sha256,
3131                fs.block_size(),
3132                file_size,
3133                root_hash.as_slice(),
3134                match &verity_info.salt {
3135                    Some(salt) => salt.as_slice(),
3136                    None => [0u8; 0].as_slice(),
3137                },
3138            )
3139            .expect("Creating descriptor");
3140            let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3141            descriptor.write_to_slice(buf.as_mut_slice()).expect("Writing descriptor to buf");
3142            object
3143                .multi_write(
3144                    &mut transaction,
3145                    AttributeId::FSVERITY_MERKLE,
3146                    &[fs.block_size()..(fs.block_size() * 2)],
3147                    buf.as_mut(),
3148                )
3149                .await
3150                .expect("Writing descriptor");
3151        }
3152        transaction.commit().await.unwrap();
3153
3154        let handle =
3155            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3156                .await
3157                .expect("open_object failed");
3158
3159        assert!(handle.is_verified_file());
3160
3161        let mut buf = object.allocate_buffer(file_size as usize).await;
3162        assert_eq!(
3163            handle.read(0, buf.as_mut()).await.expect("Read whole file."),
3164            file_size as usize
3165        );
3166
3167        fs.close().await.expect("Close failed");
3168    }
3169
3170    #[fuchsia::test]
3171    async fn test_verify_data_corrupt_tree() {
3172        let fs: OpenFxFilesystem = test_filesystem().await;
3173        let object_id = {
3174            let store = fs.root_store();
3175            let mut transaction = fs
3176                .root_store()
3177                .new_transaction(lock_keys![], Options::default())
3178                .await
3179                .expect("new_transaction failed");
3180            let object = Arc::new(
3181                ObjectStore::create_object(
3182                    &store,
3183                    &mut transaction,
3184                    HandleOptions::default(),
3185                    None,
3186                )
3187                .await
3188                .expect("create_object failed"),
3189            );
3190            let object_id = object.object_id();
3191
3192            transaction.commit().await.unwrap();
3193
3194            let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3195            buf.as_mut_slice().fill(123);
3196            object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3197
3198            object
3199                .enable_verity(fio::VerificationOptions {
3200                    hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3201                    salt: Some(vec![]),
3202                    ..Default::default()
3203                })
3204                .await
3205                .expect("set verified file metadata failed");
3206            object.read(0, buf.as_mut()).await.expect("verified read");
3207
3208            // Corrupt the merkle tree before closing.
3209            let mut merkle = object
3210                .read_attr(AttributeId::FSVERITY_MERKLE)
3211                .await
3212                .unwrap()
3213                .expect("Reading merkle tree");
3214            merkle[0] = merkle[0].wrapping_add(1);
3215            object
3216                .write_attr(AttributeId::FSVERITY_MERKLE, &*merkle)
3217                .await
3218                .expect("Overwriting merkle");
3219
3220            object_id
3221        }; // Close object.
3222
3223        // Reopening the object should complain about the corrupted merkle tree.
3224        assert!(
3225            ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default(), None)
3226                .await
3227                .is_err()
3228        );
3229        fs.close().await.expect("Close failed");
3230    }
3231
3232    #[fuchsia::test]
3233    async fn test_extend() {
3234        let fs = test_filesystem().await;
3235        let handle;
3236        let mut transaction = fs
3237            .root_store()
3238            .new_transaction(lock_keys![], Options::default())
3239            .await
3240            .expect("new_transaction failed");
3241        let store = fs.root_store();
3242        handle =
3243            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3244                .await
3245                .expect("create_object failed");
3246
3247        // As of writing, an empty filesystem has two 512kiB superblock extents and a little over
3248        // 256kiB of additional allocations (journal, etc) so we start use a 'magic' starting point
3249        // of 2MiB here.
3250        const START_OFFSET: u64 = 2048 * 1024;
3251        handle
3252            .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
3253            .await
3254            .expect("extend failed");
3255        transaction.commit().await.expect("commit failed");
3256        let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
3257        buf.as_mut_slice().fill(123);
3258        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3259        buf.as_mut_slice().fill(67);
3260        handle.read(0, buf.as_mut()).await.expect("read failed");
3261        assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
3262        fs.close().await.expect("Close failed");
3263    }
3264
3265    #[fuchsia::test]
3266    async fn test_truncate_deallocates_old_extents() {
3267        let (fs, object) = test_filesystem_and_object().await;
3268        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3269        buf.as_mut_slice().fill(0xaa);
3270        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3271
3272        let allocator = fs.allocator();
3273        let allocated_before = allocator.get_allocated_bytes();
3274        object.truncate(fs.block_size() as u64).await.expect("truncate failed");
3275        let allocated_after = allocator.get_allocated_bytes();
3276        assert!(
3277            allocated_after < allocated_before,
3278            "before = {} after = {}",
3279            allocated_before,
3280            allocated_after
3281        );
3282        fs.close().await.expect("Close failed");
3283    }
3284
3285    #[fuchsia::test]
3286    async fn test_truncate_zeroes_tail_block() {
3287        let (fs, object) = test_filesystem_and_object().await;
3288
3289        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
3290        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
3291            .await
3292            .expect("truncate failed");
3293
3294        let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3295        let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
3296        object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
3297
3298        let mut expected = TEST_DATA.to_vec();
3299        expected[3..].fill(0);
3300        assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
3301    }
3302
3303    #[fuchsia::test]
3304    async fn test_trim() {
3305        // Format a new filesystem.
3306        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
3307        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
3308        let block_size = fs.block_size();
3309        root_volume(fs.clone())
3310            .await
3311            .expect("root_volume failed")
3312            .new_volume("test", NewChildStoreOptions::default())
3313            .await
3314            .expect("volume failed");
3315        fs.close().await.expect("close failed");
3316        let device = fs.take_device().await;
3317        device.reopen(false);
3318
3319        // To test trim, we open the filesystem and set up a post commit hook that runs after every
3320        // transaction.  When the hook triggers, we can fsck the volume, take a snapshot of the
3321        // device and check that it gets replayed correctly on the snapshot.  We can check that the
3322        // graveyard trims the file as expected.
3323        #[derive(Default)]
3324        struct Context {
3325            store: Option<Arc<ObjectStore>>,
3326            object_id: Option<u64>,
3327        }
3328        let shared_context = Arc::new(Mutex::new(Context::default()));
3329
3330        let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;
3331
3332        // Wait for an object to get tombstoned by the graveyard.
3333        async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
3334            loop {
3335                if let Err(e) =
3336                    ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
3337                {
3338                    assert!(
3339                        FxfsError::NotFound.matches(&e),
3340                        "open_object didn't fail with NotFound: {:?}",
3341                        e
3342                    );
3343                    break;
3344                }
3345                // The graveyard should eventually tombstone the object.
3346                fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3347            }
3348        }
3349
3350        // Checks to see if the object needs to be trimmed.
3351        async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
3352            let root_directory = Directory::open(store, store.root_directory_object_id())
3353                .await
3354                .expect("open failed");
3355            let oid = root_directory.lookup("foo").await.expect("lookup failed");
3356            if let Some((oid, _, _)) = oid {
3357                let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
3358                    .await
3359                    .expect("open_object failed");
3360                let props = object.get_properties().await.expect("get_properties failed");
3361                if props.allocated_size > 0 && props.data_attribute_size == 0 {
3362                    Some(object)
3363                } else {
3364                    None
3365                }
3366            } else {
3367                None
3368            }
3369        }
3370
3371        let shared_context_clone = shared_context.clone();
3372        let post_commit = move || {
3373            let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
3374            let shared_context = shared_context_clone.clone();
3375            async move {
3376                // First run fsck on the current filesystem.
3377                let options = FsckOptions {
3378                    fail_on_warning: true,
3379                    no_lock: true,
3380                    on_error: Box::new(|err| println!("fsck error: {:?}", err)),
3381                    ..Default::default()
3382                };
3383                let fs = store.filesystem();
3384
3385                fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
3386                fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
3387                    .await
3388                    .expect("fsck_volume_with_options failed");
3389
3390                // Now check that we can replay this correctly.
3391                fs.sync(SyncOptions { flush_device: true, ..Default::default() })
3392                    .await
3393                    .expect("sync failed");
3394                let device = fs.device().snapshot().expect("snapshot failed");
3395
3396                let object_id = shared_context.lock().object_id.clone();
3397
3398                let fs2 = FxFilesystemBuilder::new()
3399                    .skip_initial_reap(object_id.is_none())
3400                    .open(device)
3401                    .await
3402                    .expect("open failed");
3403
3404                // If the "foo" file exists check that allocated size matches content size.
3405                let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
3406                let store =
3407                    root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
3408
3409                if let Some(oid) = object_id {
3410                    // For the second pass, the object should get tombstoned.
3411                    expect_tombstoned(&store, oid).await;
3412                } else if let Some(object) = needs_trim(&store).await {
3413                    // Extend the file and make sure that it is correctly trimmed.
3414                    object.truncate(object_size).await.expect("truncate failed");
3415                    let mut buf = object.allocate_buffer(block_size as usize).await;
3416                    object
3417                        .read(object_size - block_size * 2, buf.as_mut())
3418                        .await
3419                        .expect("read failed");
3420                    assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);
3421
3422                    // Remount, this time with the graveyard performing an initial reap and the
3423                    // object should get trimmed.
3424                    let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
3425                        .await
3426                        .expect("open failed");
3427                    let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
3428                    let store = root_vol
3429                        .volume("test", StoreOptions::default())
3430                        .await
3431                        .expect("volume failed");
3432                    while needs_trim(&store).await.is_some() {
3433                        // The object has been truncated, but still has some data allocated to
3434                        // it.  The graveyard should trim the object eventually.
3435                        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3436                    }
3437
3438                    // Run fsck.
3439                    fsck_with_options(fs.clone(), &options)
3440                        .await
3441                        .expect("fsck_with_options failed");
3442                    fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
3443                        .await
3444                        .expect("fsck_volume_with_options failed");
3445                    fs.close().await.expect("close failed");
3446                }
3447
3448                // Run fsck on fs2.
3449                fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
3450                fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
3451                    .await
3452                    .expect("fsck_volume_with_options failed");
3453                fs2.close().await.expect("close failed");
3454            }
3455            .boxed()
3456        };
3457
3458        let fs = FxFilesystemBuilder::new()
3459            .post_commit_hook(post_commit)
3460            .open(device)
3461            .await
3462            .expect("open failed");
3463
3464        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
3465        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
3466
3467        shared_context.lock().store = Some(store.clone());
3468
3469        let root_directory =
3470            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3471
3472        let object;
3473        let mut transaction = fs
3474            .root_store()
3475            .new_transaction(
3476                lock_keys![LockKey::object(
3477                    store.store_object_id(),
3478                    store.root_directory_object_id()
3479                )],
3480                Options::default(),
3481            )
3482            .await
3483            .expect("new_transaction failed");
3484        object = root_directory
3485            .create_child_file(&mut transaction, "foo")
3486            .await
3487            .expect("create_object failed");
3488        transaction.commit().await.expect("commit failed");
3489
3490        let mut transaction = fs
3491            .root_store()
3492            .new_transaction(
3493                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3494                Options::default(),
3495            )
3496            .await
3497            .expect("new_transaction failed");
3498
3499        // Two passes: first with a regular object, and then with that object moved into the
3500        // graveyard.
3501        let mut pass = 0;
3502        loop {
3503            // Create enough extents in it such that when we truncate the object it will require
3504            // more than one transaction.
3505            let mut buf = object.allocate_buffer(5).await;
3506            buf.as_mut_slice().fill(1);
3507            // Write every other block.
3508            for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
3509                object
3510                    .txn_write(&mut transaction, offset, buf.as_ref())
3511                    .await
3512                    .expect("write failed");
3513            }
3514            transaction.commit().await.expect("commit failed");
3515            // This should take up more than one transaction.
3516            WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");
3517
3518            if pass == 1 {
3519                break;
3520            }
3521
3522            // Store the object ID so that we can make sure the object is always tombstoned
3523            // after remount (see above).
3524            shared_context.lock().object_id = Some(object.object_id());
3525
3526            transaction = fs
3527                .root_store()
3528                .new_transaction(
3529                    lock_keys![
3530                        LockKey::object(store.store_object_id(), store.root_directory_object_id()),
3531                        LockKey::object(store.store_object_id(), object.object_id()),
3532                    ],
3533                    Options::default(),
3534                )
3535                .await
3536                .expect("new_transaction failed");
3537
3538            // Move the object into the graveyard.
3539            replace_child(&mut transaction, None, (&root_directory, "foo"))
3540                .await
3541                .expect("replace_child failed");
3542            store.add_to_graveyard(&mut transaction, object.object_id());
3543
3544            pass += 1;
3545        }
3546
3547        fs.close().await.expect("Close failed");
3548    }
3549
3550    #[fuchsia::test]
3551    async fn test_adjust_refs() {
3552        let (fs, object) = test_filesystem_and_object().await;
3553        let store = object.owner();
3554        let mut transaction = fs
3555            .root_store()
3556            .new_transaction(
3557                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3558                Options::default(),
3559            )
3560            .await
3561            .expect("new_transaction failed");
3562        assert_eq!(
3563            store
3564                .adjust_refs(&mut transaction, object.object_id(), 1)
3565                .await
3566                .expect("adjust_refs failed"),
3567            false
3568        );
3569        transaction.commit().await.expect("commit failed");
3570
3571        let allocator = fs.allocator();
3572        let allocated_before = allocator.get_allocated_bytes();
3573        let mut transaction = fs
3574            .root_store()
3575            .new_transaction(
3576                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3577                Options::default(),
3578            )
3579            .await
3580            .expect("new_transaction failed");
3581        assert_eq!(
3582            store
3583                .adjust_refs(&mut transaction, object.object_id(), -2)
3584                .await
3585                .expect("adjust_refs failed"),
3586            true
3587        );
3588        transaction.commit().await.expect("commit failed");
3589
3590        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
3591
3592        store
3593            .tombstone_object(
3594                object.object_id(),
3595                Options { borrow_metadata_space: true, ..Default::default() },
3596            )
3597            .await
3598            .expect("purge failed");
3599
3600        assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);
3601
3602        // We need to remove the directory entry, too, otherwise fsck will complain
3603        {
3604            let mut transaction = fs
3605                .root_store()
3606                .new_transaction(
3607                    lock_keys![LockKey::object(
3608                        store.store_object_id(),
3609                        store.root_directory_object_id()
3610                    )],
3611                    Options::default(),
3612                )
3613                .await
3614                .expect("new_transaction failed");
3615            let root_directory = Directory::open(&store, store.root_directory_object_id())
3616                .await
3617                .expect("open failed");
3618            transaction.add(
3619                store.store_object_id(),
3620                Mutation::replace_or_insert_object(
3621                    ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, DirType::Normal),
3622                    ObjectValue::None,
3623                ),
3624            );
3625            transaction.commit().await.expect("commit failed");
3626        }
3627
3628        fsck_with_options(
3629            fs.clone(),
3630            &FsckOptions {
3631                fail_on_warning: true,
3632                on_error: Box::new(|err| println!("fsck error: {:?}", err)),
3633                ..Default::default()
3634            },
3635        )
3636        .await
3637        .expect("fsck_with_options failed");
3638
3639        fs.close().await.expect("Close failed");
3640    }
3641
3642    #[fuchsia::test]
3643    async fn test_locks() {
3644        let (fs, object) = test_filesystem_and_object().await;
3645        let (send1, recv1) = channel();
3646        let (send2, recv2) = channel();
3647        let (send3, recv3) = channel();
3648        let done = Mutex::new(false);
3649        let mut futures = FuturesUnordered::new();
3650        futures.push(
3651            async {
3652                let mut t = object.new_transaction().await.expect("new_transaction failed");
3653                send1.send(()).unwrap(); // Tell the next future to continue.
3654                send3.send(()).unwrap(); // Tell the last future to continue.
3655                recv2.await.unwrap();
3656                let mut buf = object.allocate_buffer(5).await;
3657                buf.as_mut_slice().copy_from_slice(b"hello");
3658                object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
3659                // This is a halting problem so all we can do is sleep.
3660                fasync::Timer::new(Duration::from_millis(100)).await;
3661                assert!(!*done.lock());
3662                t.commit().await.expect("commit failed");
3663            }
3664            .boxed(),
3665        );
3666        futures.push(
3667            async {
3668                recv1.await.unwrap();
3669                // Reads should not block.
3670                let offset = TEST_DATA_OFFSET as usize;
3671                let align = offset % fs.block_size() as usize;
3672                let len = TEST_DATA.len();
3673                let mut buf = object.allocate_buffer(align + len).await;
3674                assert_eq!(
3675                    object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
3676                    align + TEST_DATA.len()
3677                );
3678                assert_eq!(&buf.as_slice()[align..], TEST_DATA);
3679                // Tell the first future to continue.
3680                send2.send(()).unwrap();
3681            }
3682            .boxed(),
3683        );
3684        futures.push(
3685            async {
3686                // This should block until the first future has completed.
3687                recv3.await.unwrap();
3688                let _t = object.new_transaction().await.expect("new_transaction failed");
3689                let mut buf = object.allocate_buffer(5).await;
3690                assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
3691                assert_eq!(buf.as_slice(), b"hello");
3692            }
3693            .boxed(),
3694        );
3695        while let Some(()) = futures.next().await {}
3696        fs.close().await.expect("Close failed");
3697    }
3698
3699    #[fuchsia::test(threads = 10)]
3700    async fn test_racy_reads() {
3701        let fs = test_filesystem().await;
3702        let object;
3703        let mut transaction = fs
3704            .root_store()
3705            .new_transaction(lock_keys![], Options::default())
3706            .await
3707            .expect("new_transaction failed");
3708        let store = fs.root_store();
3709        object = Arc::new(
3710            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3711                .await
3712                .expect("create_object failed"),
3713        );
3714        transaction.commit().await.expect("commit failed");
3715        for _ in 0..100 {
3716            let cloned_object = object.clone();
3717            let writer = fasync::Task::spawn(async move {
3718                let mut buf = cloned_object.allocate_buffer(10).await;
3719                buf.as_mut_slice().fill(123);
3720                cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3721            });
3722            let cloned_object = object.clone();
3723            let reader = fasync::Task::spawn(async move {
3724                let wait_time = rand::random_range(0..5);
3725                fasync::Timer::new(Duration::from_millis(wait_time)).await;
3726                let mut buf = cloned_object.allocate_buffer(10).await;
3727                buf.as_mut_slice().fill(23);
3728                let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3729                // If we succeed in reading data, it must include the write; i.e. if we see the size
3730                // change, we should see the data too.  For this to succeed it requires locking on
3731                // the read size to ensure that when we read the size, we get the extents changed in
3732                // that same transaction.
3733                if amount != 0 {
3734                    assert_eq!(amount, 10);
3735                    assert_eq!(buf.as_slice(), &[123; 10]);
3736                }
3737            });
3738            writer.await;
3739            reader.await;
3740            object.truncate(0).await.expect("truncate failed");
3741        }
3742        fs.close().await.expect("Close failed");
3743    }
3744
3745    #[fuchsia::test]
3746    async fn test_allocated_size() {
3747        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3748
3749        let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3750        let mut buf = object.allocate_buffer(5).await;
3751        buf.as_mut_slice().copy_from_slice(b"hello");
3752        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3753        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3754        assert_eq!(after, before + fs.block_size() as u64);
3755
3756        // Do the same write again and there should be no change.
3757        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3758        assert_eq!(
3759            object.get_properties().await.expect("get_properties failed").allocated_size,
3760            after
3761        );
3762
3763        // extend...
3764        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3765        let offset = 1000 * fs.block_size() as u64;
3766        let before = after;
3767        object
3768            .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3769            .await
3770            .expect("extend failed");
3771        transaction.commit().await.expect("commit failed");
3772        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3773        assert_eq!(after, before + fs.block_size() as u64);
3774
3775        // truncate...
3776        let before = after;
3777        let size = object.get_size();
3778        object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3779        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3780        assert_eq!(after, before - fs.block_size() as u64);
3781
3782        // preallocate_range...
3783        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3784        let before = after;
3785        let mut file_range = offset..offset + fs.block_size() as u64;
3786        object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3787        transaction.commit().await.expect("commit failed");
3788        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3789        assert_eq!(after, before + fs.block_size() as u64);
3790        fs.close().await.expect("Close failed");
3791    }
3792
3793    #[fuchsia::test(threads = 10)]
3794    async fn test_zero() {
3795        let (fs, object) = test_filesystem_and_object().await;
3796        let expected_size = object.get_size();
3797        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3798        object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3799        transaction.commit().await.expect("commit failed");
3800        assert_eq!(object.get_size(), expected_size);
3801        let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3802        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3803        assert_eq!(
3804            &buf.as_slice()[0..expected_size as usize],
3805            vec![0u8; expected_size as usize].as_slice()
3806        );
3807        fs.close().await.expect("Close failed");
3808    }
3809
3810    #[fuchsia::test]
3811    async fn test_properties() {
3812        let (fs, object) = test_filesystem_and_object().await;
3813        const CRTIME: Timestamp = Timestamp::from_nanos(1234);
3814        const MTIME: Timestamp = Timestamp::from_nanos(5678);
3815        const CTIME: Timestamp = Timestamp::from_nanos(8765);
3816
3817        // ObjectProperties can be updated through `update_attributes`.
3818        // `get_properties` should reflect the latest changes.
3819        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3820        object
3821            .update_attributes(
3822                &mut transaction,
3823                Some(&fio::MutableNodeAttributes {
3824                    creation_time: Some(CRTIME.as_nanos()),
3825                    modification_time: Some(MTIME.as_nanos()),
3826                    mode: Some(111),
3827                    gid: Some(222),
3828                    ..Default::default()
3829                }),
3830                None,
3831            )
3832            .await
3833            .expect("update_attributes failed");
3834        const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
3835        object
3836            .update_attributes(
3837                &mut transaction,
3838                Some(&fio::MutableNodeAttributes {
3839                    modification_time: Some(MTIME_NEW.as_nanos()),
3840                    gid: Some(333),
3841                    rdev: Some(444),
3842                    ..Default::default()
3843                }),
3844                Some(CTIME),
3845            )
3846            .await
3847            .expect("update_timestamps failed");
3848        transaction.commit().await.expect("commit failed");
3849
3850        let properties = object.get_properties().await.expect("get_properties failed");
3851        assert_matches!(
3852            properties,
3853            ObjectProperties {
3854                refs: 1u64,
3855                allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
3856                data_attribute_size: TEST_OBJECT_SIZE,
3857                creation_time: CRTIME,
3858                modification_time: MTIME_NEW,
3859                posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
3860                change_time: CTIME,
3861                ..
3862            }
3863        );
3864        fs.close().await.expect("Close failed");
3865    }
3866
3867    #[fuchsia::test]
3868    async fn test_is_allocated() {
3869        let (fs, object) = test_filesystem_and_object().await;
3870
3871        // `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
3872        // `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
3873        let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
3874        let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
3875
3876        // Check for the case where where we have the following extent layout
3877        //       [ unallocated ][ `TEST_DATA` ]
3878        // The extents before `aligned_offset` should not be allocated
3879        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3880        assert_eq!(count, aligned_offset);
3881        assert_eq!(allocated, false);
3882
3883        let (allocated, count) =
3884            object.is_allocated(aligned_offset).await.expect("is_allocated failed");
3885        assert_eq!(count, aligned_length);
3886        assert_eq!(allocated, true);
3887
3888        // Check for the case where where we query out of range
3889        let end = aligned_offset + aligned_length;
3890        object
3891            .is_allocated(end)
3892            .await
3893            .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
3894
3895        // Check for the case where where we start querying for allocation starting from
3896        // an allocated range to the end of the device
3897        let size = 50 * fs.block_size() as u64;
3898        object.truncate(size).await.expect("extend failed");
3899
3900        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3901        assert_eq!(count, size - end);
3902        assert_eq!(allocated, false);
3903
3904        // Check for the case where where we have the following extent layout
3905        //      [ unallocated ][ `buf` ][ `buf` ]
3906        let buf_length = 5 * fs.block_size();
3907        let mut buf = object.allocate_buffer(buf_length as usize).await;
3908        buf.as_mut_slice().fill(123);
3909        let new_offset = end + 20 * fs.block_size() as u64;
3910        object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
3911        object
3912            .write_or_append(Some(new_offset + buf_length), buf.as_ref())
3913            .await
3914            .expect("write failed");
3915
3916        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3917        assert_eq!(count, new_offset - end);
3918        assert_eq!(allocated, false);
3919
3920        let (allocated, count) =
3921            object.is_allocated(new_offset).await.expect("is_allocated failed");
3922        assert_eq!(count, 2 * buf_length);
3923        assert_eq!(allocated, true);
3924
3925        // Check the case where we query from the middle of an extent
3926        let (allocated, count) = object
3927            .is_allocated(new_offset + 4 * fs.block_size())
3928            .await
3929            .expect("is_allocated failed");
3930        assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
3931        assert_eq!(allocated, true);
3932
3933        // Now, write buffer to a location already written to.
3934        // Check for the case when we the following extent layout
3935        //      [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
3936        let other_buf_length = 3 * fs.block_size();
3937        let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
3938        other_buf.as_mut_slice().fill(231);
3939        object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
3940
3941        // We still expect that `is_allocated(..)` will return that  there are 2*`buf_length bytes`
3942        // allocated from `new_offset`
3943        let (allocated, count) =
3944            object.is_allocated(new_offset).await.expect("is_allocated failed");
3945        assert_eq!(count, 2 * buf_length);
3946        assert_eq!(allocated, true);
3947
3948        // Check for the case when we the following extent layout
3949        //   [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
3950        // Mark TEST_DATA as deleted
3951        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3952        object
3953            .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
3954            .await
3955            .expect("zero failed");
3956        // Mark `other_buf` as deleted
3957        object
3958            .zero(&mut transaction, new_offset..new_offset + buf_length)
3959            .await
3960            .expect("zero failed");
3961        transaction.commit().await.expect("commit transaction failed");
3962
3963        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3964        assert_eq!(count, new_offset + buf_length);
3965        assert_eq!(allocated, false);
3966
3967        let (allocated, count) =
3968            object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
3969        assert_eq!(count, buf_length);
3970        assert_eq!(allocated, true);
3971
3972        let new_end = new_offset + buf_length + count;
3973
3974        // Check for the case where there are objects with different keys.
3975        // Case that we're checking for:
3976        //      [ unallocated ][ extent (object with different key) ][ unallocated ]
3977        let store = object.owner();
3978        let mut transaction = fs
3979            .root_store()
3980            .new_transaction(lock_keys![], Options::default())
3981            .await
3982            .expect("new_transaction failed");
3983        let object2 =
3984            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3985                .await
3986                .expect("create_object failed");
3987        transaction.commit().await.expect("commit failed");
3988
3989        object2
3990            .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
3991            .await
3992            .expect("write failed");
3993
3994        // Expecting that the extent with a different key is treated like unallocated extent
3995        let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
3996        assert_eq!(count, size - new_end);
3997        assert_eq!(allocated, false);
3998
3999        fs.close().await.expect("close failed");
4000    }
4001
4002    #[fuchsia::test(threads = 10)]
4003    async fn test_read_write_attr() {
4004        let (_fs, object) = test_filesystem_and_object().await;
4005        let data = [0xffu8; 16_384];
4006        object.write_attr(AttributeId(20), &data).await.expect("write_attr failed");
4007        let rdata = object
4008            .read_attr(AttributeId(20))
4009            .await
4010            .expect("read_attr failed")
4011            .expect("no attribute data found");
4012        assert_eq!(&data[..], &rdata[..]);
4013
4014        assert_eq!(object.read_attr(AttributeId(21)).await.expect("read_attr failed"), None);
4015    }
4016
4017    #[fuchsia::test(threads = 10)]
4018    async fn test_allocate_basic() {
4019        let (fs, object) = test_filesystem_and_empty_object().await;
4020        let block_size = fs.block_size();
4021        let file_size = block_size * 10;
4022        object.truncate(file_size).await.unwrap();
4023
4024        let small_buf_size = 1024;
4025        let large_buf_aligned_size = block_size as usize * 2;
4026        let large_buf_size = block_size as usize * 2 + 1024;
4027
4028        let mut small_buf = object.allocate_buffer(small_buf_size).await;
4029        let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
4030        let mut large_buf = object.allocate_buffer(large_buf_size).await;
4031
4032        assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
4033        assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
4034        assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
4035        assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
4036        assert_eq!(
4037            object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
4038            large_buf_aligned_size
4039        );
4040        assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
4041
4042        // Allocation succeeds, and without any writes to the location it shows up as zero.
4043        object.allocate(block_size..block_size * 3).await.unwrap();
4044
4045        // Test starting before, inside, and after the allocated section with every sized buffer.
4046        for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
4047            for offset in 0..4 {
4048                assert_eq!(
4049                    object.read(block_size * offset, buf.as_mut()).await.unwrap(),
4050                    buf.len(),
4051                    "buf_index: {}, read offset: {}",
4052                    buf_index,
4053                    offset,
4054                );
4055                assert_eq!(
4056                    buf.as_slice(),
4057                    &vec![0; buf.len()],
4058                    "buf_index: {}, read offset: {}",
4059                    buf_index,
4060                    offset,
4061                );
4062            }
4063        }
4064
4065        fs.close().await.expect("close failed");
4066    }
4067
4068    #[fuchsia::test(threads = 10)]
4069    async fn test_allocate_extends_file() {
4070        const BUF_SIZE: usize = 1024;
4071        let (fs, object) = test_filesystem_and_empty_object().await;
4072        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4073        let block_size = fs.block_size();
4074
4075        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4076        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4077
4078        assert!(TEST_OBJECT_SIZE < block_size * 4);
4079        // Allocation succeeds, and without any writes to the location it shows up as zero.
4080        object.allocate(0..block_size * 4).await.unwrap();
4081        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4082        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4083        assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
4084        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4085        assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
4086        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4087
4088        fs.close().await.expect("close failed");
4089    }
4090
4091    #[fuchsia::test(threads = 10)]
4092    async fn test_allocate_past_end() {
4093        const BUF_SIZE: usize = 1024;
4094        let (fs, object) = test_filesystem_and_empty_object().await;
4095        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4096        let block_size = fs.block_size();
4097
4098        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4099        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4100
4101        assert!(TEST_OBJECT_SIZE < block_size * 4);
4102        // Allocation succeeds, and without any writes to the location it shows up as zero.
4103        object.allocate(block_size * 4..block_size * 6).await.unwrap();
4104        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4105        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4106        assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
4107        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4108        assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
4109        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4110
4111        fs.close().await.expect("close failed");
4112    }
4113
4114    #[fuchsia::test(threads = 10)]
4115    async fn test_allocate_read_attr() {
4116        let (fs, object) = test_filesystem_and_empty_object().await;
4117        let block_size = fs.block_size();
4118        let file_size = block_size * 4;
4119        object.truncate(file_size).await.unwrap();
4120
4121        let content = object
4122            .read_attr(object.attribute_id())
4123            .await
4124            .expect("failed to read attr")
4125            .expect("attr returned none");
4126        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4127
4128        object.allocate(block_size..block_size * 3).await.unwrap();
4129
4130        let content = object
4131            .read_attr(object.attribute_id())
4132            .await
4133            .expect("failed to read attr")
4134            .expect("attr returned none");
4135        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4136
4137        fs.close().await.expect("close failed");
4138    }
4139
4140    #[fuchsia::test(threads = 10)]
4141    async fn test_allocate_existing_data() {
4142        struct Case {
4143            written_ranges: Vec<Range<usize>>,
4144            allocate_range: Range<u64>,
4145        }
4146        let cases = [
4147            Case { written_ranges: vec![4..7], allocate_range: 4..7 },
4148            Case { written_ranges: vec![4..7], allocate_range: 3..8 },
4149            Case { written_ranges: vec![4..7], allocate_range: 5..6 },
4150            Case { written_ranges: vec![4..7], allocate_range: 5..8 },
4151            Case { written_ranges: vec![4..7], allocate_range: 3..5 },
4152            Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
4153            Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
4154        ];
4155
4156        for case in cases {
4157            let (fs, object) = test_filesystem_and_empty_object().await;
4158            let block_size = fs.block_size();
4159            let file_size = block_size * 10;
4160            object.truncate(file_size).await.unwrap();
4161
4162            for write in &case.written_ranges {
4163                let write_len = (write.end - write.start) * block_size as usize;
4164                let mut write_buf = object.allocate_buffer(write_len).await;
4165                write_buf.as_mut_slice().fill(0xff);
4166                assert_eq!(
4167                    object
4168                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
4169                        .await
4170                        .unwrap(),
4171                    file_size
4172                );
4173            }
4174
4175            let mut expected_buf = object.allocate_buffer(file_size as usize).await;
4176            assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());
4177
4178            object
4179                .allocate(
4180                    case.allocate_range.start * block_size..case.allocate_range.end * block_size,
4181                )
4182                .await
4183                .unwrap();
4184
4185            let mut read_buf = object.allocate_buffer(file_size as usize).await;
4186            assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
4187            assert_eq!(read_buf.as_slice(), expected_buf.as_slice());
4188
4189            fs.close().await.expect("close failed");
4190        }
4191    }
4192
4193    async fn get_modes(
4194        obj: &DataObjectHandle<ObjectStore>,
4195        mut search_range: Range<u64>,
4196    ) -> Vec<(Range<u64>, ExtentMode)> {
4197        let mut modes = Vec::new();
4198        let store = obj.store();
4199        let tree = store.tree();
4200        let layer_set = tree.layer_set();
4201        let mut merger = layer_set.merger();
4202        let mut iter = merger
4203            .query(Query::FullRange(&ObjectKey::attribute(
4204                obj.object_id(),
4205                AttributeId::DATA,
4206                AttributeKey::Extent(Extent::search_key_from_offset(search_range.start)),
4207            )))
4208            .await
4209            .unwrap();
4210        loop {
4211            match iter.get() {
4212                Some(ItemRef {
4213                    key:
4214                        ObjectKey {
4215                            object_id,
4216                            data:
4217                                ObjectKeyData::Attribute(
4218                                    AttributeId::DATA,
4219                                    AttributeKey::Extent(extent),
4220                                ),
4221                        },
4222                    value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
4223                    ..
4224                }) if *object_id == obj.object_id() => {
4225                    if search_range.end <= extent.start {
4226                        break;
4227                    }
4228                    let found_range = std::cmp::max(search_range.start, extent.start)
4229                        ..std::cmp::min(search_range.end, extent.end);
4230                    search_range.start = found_range.end;
4231                    modes.push((found_range, mode.clone()));
4232                    if search_range.start == search_range.end {
4233                        break;
4234                    }
4235                    iter.advance().await.unwrap();
4236                }
4237                x => panic!("looking for extent record, found this {:?}", x),
4238            }
4239        }
4240        modes
4241    }
4242
4243    async fn assert_all_overwrite(
4244        obj: &DataObjectHandle<ObjectStore>,
4245        mut search_range: Range<u64>,
4246    ) {
4247        let modes = get_modes(obj, search_range.clone()).await;
4248        for mode in modes {
4249            assert_eq!(
4250                mode.0.start, search_range.start,
4251                "missing mode in range {}..{}",
4252                search_range.start, mode.0.start
4253            );
4254            match mode.1 {
4255                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
4256                m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
4257            }
4258            assert!(
4259                mode.0.end <= search_range.end,
4260                "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
4261                search_range,
4262                mode,
4263            );
4264            search_range.start = mode.0.end;
4265        }
4266        assert_eq!(
4267            search_range.start, search_range.end,
4268            "missing mode in range {:?}",
4269            search_range
4270        );
4271    }
4272
4273    #[fuchsia::test(threads = 10)]
4274    async fn test_multi_overwrite() {
4275        #[derive(Debug)]
4276        struct Case {
4277            pre_writes: Vec<Range<usize>>,
4278            allocate_ranges: Vec<Range<u64>>,
4279            overwrites: Vec<Vec<Range<u64>>>,
4280        }
4281        let cases = [
4282            Case {
4283                pre_writes: Vec::new(),
4284                allocate_ranges: vec![1..3],
4285                overwrites: vec![vec![1..3]],
4286            },
4287            Case {
4288                pre_writes: Vec::new(),
4289                allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
4290                overwrites: vec![vec![0..4]],
4291            },
4292            Case {
4293                pre_writes: Vec::new(),
4294                allocate_ranges: vec![0..4],
4295                overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
4296            },
4297            Case {
4298                pre_writes: Vec::new(),
4299                allocate_ranges: vec![0..4],
4300                overwrites: vec![vec![3..4]],
4301            },
4302            Case {
4303                pre_writes: Vec::new(),
4304                allocate_ranges: vec![0..4],
4305                overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
4306            },
4307            Case {
4308                pre_writes: Vec::new(),
4309                allocate_ranges: vec![1..2, 5..6, 7..8],
4310                overwrites: vec![vec![5..6]],
4311            },
4312            Case {
4313                pre_writes: Vec::new(),
4314                allocate_ranges: vec![1..3],
4315                overwrites: vec![
4316                    vec![1..3],
4317                    vec![1..3],
4318                    vec![1..3],
4319                    vec![1..3],
4320                    vec![1..3],
4321                    vec![1..3],
4322                    vec![1..3],
4323                    vec![1..3],
4324                ],
4325            },
4326            Case {
4327                pre_writes: Vec::new(),
4328                allocate_ranges: vec![0..5],
4329                overwrites: vec![
4330                    vec![1..3],
4331                    vec![1..3],
4332                    vec![1..3],
4333                    vec![1..3],
4334                    vec![1..3],
4335                    vec![1..3],
4336                    vec![1..3],
4337                    vec![1..3],
4338                ],
4339            },
4340            Case {
4341                pre_writes: Vec::new(),
4342                allocate_ranges: vec![0..5],
4343                overwrites: vec![vec![0..2, 2..4, 4..5]],
4344            },
4345            Case {
4346                pre_writes: Vec::new(),
4347                allocate_ranges: vec![0..5, 5..10],
4348                overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
4349            },
4350            Case {
4351                pre_writes: Vec::new(),
4352                allocate_ranges: vec![0..4, 6..10],
4353                overwrites: vec![vec![2..3, 7..9]],
4354            },
4355            Case {
4356                pre_writes: Vec::new(),
4357                allocate_ranges: vec![0..10],
4358                overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
4359            },
4360            Case {
4361                pre_writes: Vec::new(),
4362                allocate_ranges: vec![0..10],
4363                overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
4364            },
4365            Case {
4366                pre_writes: vec![1..3],
4367                allocate_ranges: vec![1..3],
4368                overwrites: vec![vec![1..3]],
4369            },
4370            Case {
4371                pre_writes: vec![1..3],
4372                allocate_ranges: vec![4..6],
4373                overwrites: vec![vec![5..6]],
4374            },
4375            Case {
4376                pre_writes: vec![1..3],
4377                allocate_ranges: vec![0..4],
4378                overwrites: vec![vec![0..4]],
4379            },
4380            Case {
4381                pre_writes: vec![1..3],
4382                allocate_ranges: vec![2..4],
4383                overwrites: vec![vec![2..4]],
4384            },
4385            Case {
4386                pre_writes: vec![3..5],
4387                allocate_ranges: vec![1..3, 6..7],
4388                overwrites: vec![vec![1..3, 6..7]],
4389            },
4390            Case {
4391                pre_writes: vec![1..3, 5..7, 8..9],
4392                allocate_ranges: vec![0..5],
4393                overwrites: vec![vec![0..2, 2..5], vec![0..5]],
4394            },
4395            Case {
4396                pre_writes: Vec::new(),
4397                allocate_ranges: vec![0..10, 4..6],
4398                overwrites: Vec::new(),
4399            },
4400            Case {
4401                pre_writes: Vec::new(),
4402                allocate_ranges: vec![3..8, 5..10],
4403                overwrites: Vec::new(),
4404            },
4405            Case {
4406                pre_writes: Vec::new(),
4407                allocate_ranges: vec![5..10, 3..8],
4408                overwrites: Vec::new(),
4409            },
4410        ];
4411
4412        for (i, case) in cases.into_iter().enumerate() {
4413            log::info!("running case {} - {:?}", i, case);
4414            let (fs, object) = test_filesystem_and_empty_object().await;
4415            let block_size = fs.block_size();
4416            let file_size = block_size * 10;
4417            object.truncate(file_size).await.unwrap();
4418
4419            for write in case.pre_writes {
4420                let write_len = (write.end - write.start) * block_size as usize;
4421                let mut write_buf = object.allocate_buffer(write_len).await;
4422                write_buf.as_mut_slice().fill(0xff);
4423                assert_eq!(
4424                    object
4425                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
4426                        .await
4427                        .unwrap(),
4428                    file_size
4429                );
4430            }
4431
4432            for allocate_range in &case.allocate_ranges {
4433                object
4434                    .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
4435                    .await
4436                    .unwrap();
4437            }
4438
4439            for allocate_range in case.allocate_ranges {
4440                assert_all_overwrite(
4441                    &object,
4442                    allocate_range.start * block_size..allocate_range.end * block_size,
4443                )
4444                .await;
4445            }
4446
4447            for overwrite in case.overwrites {
4448                let mut write_len = 0;
4449                let overwrite = overwrite
4450                    .into_iter()
4451                    .map(|r| {
4452                        write_len += (r.end - r.start) * block_size;
4453                        r.start * block_size..r.end * block_size
4454                    })
4455                    .collect::<Vec<_>>();
4456                let mut write_buf = object.allocate_buffer(write_len as usize).await;
4457                let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
4458                write_buf.as_mut_slice().copy_from_slice(&data);
4459
4460                let mut expected_buf = object.allocate_buffer(file_size as usize).await;
4461                assert_eq!(
4462                    object.read(0, expected_buf.as_mut()).await.unwrap(),
4463                    expected_buf.len()
4464                );
4465                let expected_buf_slice = expected_buf.as_mut_slice();
4466                let mut data_slice = data.as_slice();
4467                for r in &overwrite {
4468                    let len = r.length().unwrap() as usize;
4469                    let (copy_from, rest) = data_slice.split_at(len);
4470                    expected_buf_slice[r.start as usize..r.end as usize]
4471                        .copy_from_slice(&copy_from);
4472                    data_slice = rest;
4473                }
4474
4475                let mut transaction = object.new_transaction().await.unwrap();
4476                object
4477                    .multi_overwrite(
4478                        &mut transaction,
4479                        AttributeId::DATA,
4480                        &overwrite,
4481                        write_buf.as_mut(),
4482                    )
4483                    .await
4484                    .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
4485                // Double check the emitted checksums. We should have one u64 checksum for every
4486                // block we wrote to disk.
4487                let mut checksummed_range_length = 0;
4488                let mut num_checksums = 0;
4489                for (device_range, checksums, _) in transaction.checksums() {
4490                    let range_len = device_range.end - device_range.start;
4491                    let checksums_len = checksums.len() as u64;
4492                    assert_eq!(range_len / checksums_len, block_size);
4493                    checksummed_range_length += range_len;
4494                    num_checksums += checksums_len;
4495                }
4496                assert_eq!(checksummed_range_length, write_len);
4497                assert_eq!(num_checksums, write_len / block_size);
4498                transaction.commit().await.unwrap();
4499
4500                let mut buf = object.allocate_buffer(file_size as usize).await;
4501                assert_eq!(
4502                    object.read(0, buf.as_mut()).await.unwrap(),
4503                    buf.len(),
4504                    "failed length check on case {}",
4505                    i,
4506                );
4507                assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
4508            }
4509
4510            fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
4511            fs.close().await.expect("close failed");
4512        }
4513    }
4514
4515    #[fuchsia::test(threads = 10)]
4516    async fn test_multi_overwrite_mode_updates() {
4517        let (fs, object) = test_filesystem_and_empty_object().await;
4518        let block_size = fs.block_size();
4519        let file_size = block_size * 10;
4520        object.truncate(file_size).await.unwrap();
4521
4522        let mut expected_bitmap = BitVec::from_elem(10, false);
4523
4524        object.allocate(0..10 * block_size).await.unwrap();
4525        assert_eq!(
4526            get_modes(&object, 0..10 * block_size).await,
4527            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4528        );
4529
4530        let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
4531        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4532        write_buf.as_mut_slice().copy_from_slice(&data);
4533        let mut transaction = object.new_transaction().await.unwrap();
4534        object
4535            .multi_overwrite(
4536                &mut transaction,
4537                AttributeId::DATA,
4538                &[2 * block_size..4 * block_size],
4539                write_buf.as_mut(),
4540            )
4541            .await
4542            .unwrap();
4543        transaction.commit().await.unwrap();
4544
4545        expected_bitmap.set(2, true);
4546        expected_bitmap.set(3, true);
4547        assert_eq!(
4548            get_modes(&object, 0..10 * block_size).await,
4549            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4550        );
4551
4552        let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
4553        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4554        write_buf.as_mut_slice().copy_from_slice(&data);
4555        let mut transaction = object.new_transaction().await.unwrap();
4556        object
4557            .multi_overwrite(
4558                &mut transaction,
4559                AttributeId::DATA,
4560                &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
4561                write_buf.as_mut(),
4562            )
4563            .await
4564            .unwrap();
4565        transaction.commit().await.unwrap();
4566
4567        expected_bitmap.set(4, true);
4568        expected_bitmap.set(6, true);
4569        assert_eq!(
4570            get_modes(&object, 0..10 * block_size).await,
4571            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4572        );
4573
4574        let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
4575        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4576        write_buf.as_mut_slice().copy_from_slice(&data);
4577        let mut transaction = object.new_transaction().await.unwrap();
4578        object
4579            .multi_overwrite(
4580                &mut transaction,
4581                AttributeId::DATA,
4582                &[
4583                    0..2 * block_size,
4584                    5 * block_size..6 * block_size,
4585                    7 * block_size..10 * block_size,
4586                ],
4587                write_buf.as_mut(),
4588            )
4589            .await
4590            .unwrap();
4591        transaction.commit().await.unwrap();
4592
4593        assert_eq!(
4594            get_modes(&object, 0..10 * block_size).await,
4595            vec![(0..10 * block_size, ExtentMode::Overwrite)]
4596        );
4597
4598        fs.close().await.expect("close failed");
4599    }
4600
4601    #[fuchsia::test(threads = 10)]
4602    async fn test_check_unwritten_zero() {
4603        let device = DeviceHolder::new(FakeDevice::new(256 * 1024, TEST_DEVICE_BLOCK_SIZE));
4604        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4605        let object = create_object_with_key(fs.clone(), Some(&new_insecure_crypt()), false).await;
4606        let block_size = fs.block_size();
4607
4608        // Set up a file with eight blocks to look like this:
4609        // | None | COW | COW | None | Overwrite(unwritten) | Overwrite(written) | None |
4610        let file_size = block_size * 7;
4611        object.truncate(file_size).await.unwrap();
4612        assert!(object.check_unwritten_zero(0..file_size).await.unwrap());
4613
4614        let mut buffer = object.allocate_buffer(block_size as usize).await;
4615        buffer.as_mut_slice().fill(1);
4616        object.write_or_append(Some(block_size), buffer.as_ref()).await.expect("write failed");
4617        object.write_or_append(Some(block_size * 2), buffer.as_ref()).await.expect("write failed");
4618
4619        object.allocate((block_size * 4)..(block_size * 6)).await.expect("Allocate failed");
4620        let mut transaction = fs
4621            .root_store()
4622            .new_transaction(
4623                lock_keys![LockKey::object(object.store().store_object_id(), object.object_id(),)],
4624                Options::default(),
4625            )
4626            .await
4627            .expect("new_transaction failed");
4628        object
4629            .multi_overwrite(
4630                &mut transaction,
4631                AttributeId::DATA,
4632                &vec![(block_size * 5)..(block_size * 6)],
4633                buffer.as_mut(),
4634            )
4635            .await
4636            .expect("Multi overwrite");
4637        transaction.commit().await.expect("Committing overwrite");
4638
4639        // Anything touching the COW ranges should fail.
4640        assert!(!object.check_unwritten_zero(0..(block_size * 2)).await.unwrap());
4641        assert!(!object.check_unwritten_zero(block_size..(block_size * 3)).await.unwrap());
4642        assert!(!object.check_unwritten_zero((block_size * 2)..(block_size * 4)).await.unwrap());
4643
4644        // This should be fine, as the OverwritePartial should only touch the unwritten block.
4645        assert!(object.check_unwritten_zero((block_size * 3)..(block_size * 5)).await.unwrap());
4646
4647        // These should touch the written overwrite block and fail.
4648        assert!(!object.check_unwritten_zero((block_size * 4)..(block_size * 6)).await.unwrap());
4649        assert!(!object.check_unwritten_zero((block_size * 5)..(block_size * 7)).await.unwrap());
4650
4651        fs.close().await.expect("close failed");
4652    }
4653}