// fxfs/object_store/data_object_handle.rs
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::object_handle::{
10    ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, DirType, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey,
16    ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20    self, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation, Options,
21    Transaction, lock_keys,
22};
23use crate::object_store::{
24    DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, HandleOwner,
25    RootDigest, StoreObjectHandle, TRANSACTION_MUTATION_THRESHOLD, TrimMode, TrimResult,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{Context, Error, anyhow, bail, ensure};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{
33    FsVerityDescriptor, FsVerityDescriptorRaw, FsVerityHasher, FsVerityHasherOptions, MerkleTree,
34    MerkleTreeBuilder,
35};
36use fuchsia_sync::Mutex;
37use futures::TryStreamExt;
38use futures::stream::FuturesUnordered;
39use fxfs_trace::trace;
40use std::cmp::min;
41use std::ops::{Deref, DerefMut, Range};
42use std::sync::Arc;
43use std::sync::atomic::{self, AtomicU64, Ordering};
44use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
45
46mod allocated_ranges;
47pub use allocated_ranges::{AllocatedRanges, RangeType};
48
/// How much data each transaction will cover when writing an attribute across batches (512 KiB).
/// Pulled from `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 524_288;
52
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
pub struct DataObjectHandle<S: HandleOwner> {
    // The untyped handle this wraps; low-level store operations are delegated to it.
    handle: StoreObjectHandle<S>,
    // The attribute this handle reads and writes (e.g. DEFAULT_DATA_ATTRIBUTE_ID).
    attribute_id: u64,
    // Cached content size of the data attribute.
    content_size: AtomicU64,
    // Tracks progress through the enable-verity state machine (None for ordinary files).
    fsverity_state: Mutex<FsverityState>,
    // In-memory tracking of ranges allocated in overwrite mode.
    overwrite_ranges: AllocatedRanges,
}
67
/// Represents the mapping of a file's contents to the physical storage backing it.
#[derive(Debug, Clone)]
pub struct FileExtent {
    // Offset within the file at which this extent begins.
    logical_offset: u64,
    // The backing byte range on the device.
    device_range: Range<u64>,
}
74
75impl FileExtent {
76    pub fn new(logical_offset: u64, device_range: Range<u64>) -> Result<Self, Error> {
77        // Ensure `device_range` is valid.
78        let length = device_range.length()?;
79        // Ensure no overflow when we calculate the end of the logical range.
80        let _ = logical_offset.checked_add(length).ok_or(FxfsError::OutOfRange)?;
81        Ok(Self { logical_offset, device_range })
82    }
83}
84
85impl FileExtent {
86    pub fn length(&self) -> u64 {
87        // SAFETY: We verified that the device_range's length is valid in Self::new.
88        unsafe { self.device_range.unchecked_length() }
89    }
90
91    pub fn logical_offset(&self) -> u64 {
92        self.logical_offset
93    }
94
95    pub fn logical_range(&self) -> Range<u64> {
96        // SAFETY: We verified logical_offset plus device_range length won't overflow in Self::new.
97        unsafe { self.logical_offset..self.logical_offset.unchecked_add(self.length()) }
98    }
99
100    pub fn device_range(&self) -> &Range<u64> {
101        &self.device_range
102    }
103}
104
/// Tracks a file's fsverity status as it moves through `enable_verity`.
#[derive(Debug)]
pub enum FsverityState {
    /// Not a verity file, and no `enable_verity` is in progress.
    None,
    /// `enable_verity` has started (set by `set_fsverity_state_started`).
    Started,
    /// The merkle tree has been built, but the enabling transaction has not yet committed.
    Pending(FsverityStateInner),
    /// The file is a fully-enabled verity file.
    Some(FsverityStateInner),
}
112
/// The merkle tree data and parameters needed to verify reads of a verity-enabled file.
#[derive(Debug)]
pub struct FsverityStateInner {
    // Root hash of the merkle tree; the variant also records which hash algorithm is in use.
    root_digest: RootDigest,
    // Salt mixed into each hash computation.
    salt: Vec<u8>,
    // TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
    // Potentially store a pager-backed vmo instead of passing around a boxed array.
    merkle_tree: Box<[u8]>,
}
121
/// Options controlling overwrite-mode writes.
#[derive(Debug, Default)]
pub struct OverwriteOptions {
    /// If false, then all the extents for the overwrite range must have been preallocated using
    /// preallocate_range or from existing writes.
    pub allow_allocations: bool,
    /// NOTE(review): presumably issues a device barrier before the first write of the handle —
    /// not exercised in this chunk; confirm against the overwrite write path.
    pub barrier_on_first_write: bool,
}
129
130impl FsverityStateInner {
131    pub fn new(root_digest: RootDigest, salt: Vec<u8>, merkle_tree: Box<[u8]>) -> Self {
132        FsverityStateInner { root_digest, salt, merkle_tree }
133    }
134
135    fn get_hasher_for_block_size(&self, block_size: usize) -> FsVerityHasher {
136        match self.root_digest {
137            RootDigest::Sha256(_) => {
138                FsVerityHasher::Sha256(FsVerityHasherOptions::new(self.salt.clone(), block_size))
139            }
140            RootDigest::Sha512(_) => {
141                FsVerityHasher::Sha512(FsVerityHasherOptions::new(self.salt.clone(), block_size))
142            }
143        }
144    }
145
146    fn from_bytes(data: &[u8], block_size: usize) -> Result<(Self, FsVerityHasher), Error> {
147        let descriptor = FsVerityDescriptor::from_bytes(&data, block_size)
148            .map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
149
150        let root_digest = match descriptor.digest_algorithm() {
151            fio::HashAlgorithm::Sha256 => {
152                RootDigest::Sha256(descriptor.root_digest().try_into().unwrap())
153            }
154            fio::HashAlgorithm::Sha512 => RootDigest::Sha512(descriptor.root_digest().to_vec()),
155            _ => return Err(anyhow!(FxfsError::NotSupported).context("Unsupported hash algorithm")),
156        };
157        let hasher = descriptor.hasher();
158        let leaves =
159            descriptor.leaf_digests().map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
160
161        Ok((Self::new(root_digest, descriptor.salt().to_vec(), leaves.into()), hasher))
162    }
163}
164
// Deref to the wrapped StoreObjectHandle so its methods can be called directly on this handle.
impl<S: HandleOwner> Deref for DataObjectHandle<S> {
    type Target = StoreObjectHandle<S>;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
171
172impl<S: HandleOwner> DataObjectHandle<S> {
    /// Creates a `DataObjectHandle`.
    ///
    /// * `owner`, `object_id`, `permanent_keys`, `options` and `trace` are forwarded to
    ///   `StoreObjectHandle::new`.
    /// * `attribute_id` selects the attribute this handle reads and writes.
    /// * `size` seeds the cached content size of that attribute.
    /// * `fsverity_state` is the initial verity state (e.g. as determined when opening).
    /// * `overwrite_ranges` seeds the in-memory overwrite-range tracking.
    pub fn new(
        owner: Arc<S>,
        object_id: u64,
        permanent_keys: bool,
        attribute_id: u64,
        size: u64,
        fsverity_state: FsverityState,
        options: HandleOptions,
        trace: bool,
        overwrite_ranges: &[Range<u64>],
    ) -> Self {
        Self {
            handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
            attribute_id,
            content_size: AtomicU64::new(size),
            fsverity_state: Mutex::new(fsverity_state),
            overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
        }
    }
192
    /// Returns the id of the attribute this handle reads and writes.
    pub fn attribute_id(&self) -> u64 {
        self.attribute_id
    }
196
    /// Returns the in-memory set of ranges tracked as overwrite-mode allocations.
    pub fn overwrite_ranges(&self) -> &AllocatedRanges {
        &self.overwrite_ranges
    }
200
201    pub fn is_verified_file(&self) -> bool {
202        matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
203    }
204
205    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
206    /// If another caller has already started but not completed `enabled_verity`, returns
207    /// FxfsError::AlreadyBound. If another caller has already completed `enable_verity`, returns
208    /// FxfsError::AlreadyExists.
209    pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
210        let mut fsverity_guard = self.fsverity_state.lock();
211        match *fsverity_guard {
212            FsverityState::None => {
213                *fsverity_guard = FsverityState::Started;
214                Ok(())
215            }
216            FsverityState::Started | FsverityState::Pending(_) => {
217                Err(anyhow!(FxfsError::Unavailable))
218            }
219            FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
220        }
221    }
222
    /// Sets `self.fsverity_state` to Pending. Must be called before `finalize_fsverity_state()`.
    /// Asserts that the prior state of `self.fsverity_state` was `FsverityState::Started`.
    /// `descriptor` carries the freshly-built merkle tree state that becomes active on finalize.
    pub fn set_fsverity_state_pending(&self, descriptor: FsverityStateInner) {
        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::Started));
        *fsverity_guard = FsverityState::Pending(descriptor);
    }
230
231    /// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
232    /// not `FsverityState::Pending(_)`.
233    pub fn finalize_fsverity_state(&self) {
234        let mut fsverity_state_guard = self.fsverity_state.lock();
235        let mut_fsverity_state = fsverity_state_guard.deref_mut();
236        let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
237        match fsverity_state {
238            FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
239            FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
240            FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
241            FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
242        }
243        // Once we finalize the fsverity state, the file is permanently read-only. The in-memory
244        // overwrite ranges tracking is only used for writing, so we don't need them anymore. This
245        // leaves any uninitialized, but allocated, overwrite regions if there are any, rather than
246        // converting them back to sparse regions.
247        self.overwrite_ranges.clear();
248    }
249
    /// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
    /// Used to set `self.fsverity_state` on open of a verified file. The merkle tree data is
    /// verified against the root digest here, and will return an error if the tree is not correct.
    pub async fn set_fsverity_state_some(&self, descriptor: FsverityMetadata) -> Result<(), Error> {
        let (metadata, hasher) = match descriptor {
            // Internally-stored metadata: the merkle leaves live in their own attribute.
            FsverityMetadata::Internal(root_digest, salt) => {
                let merkle_tree = self
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await?
                    .ok_or_else(|| anyhow!(FxfsError::Inconsistent))?;
                let metadata = FsverityStateInner { root_digest, salt, merkle_tree };
                let hasher = metadata.get_hasher_for_block_size(self.block_size() as usize);
                (metadata, hasher)
            }
            // F2fs-style metadata: a serialized descriptor stored at `verity_range` within the
            // merkle attribute.
            FsverityMetadata::F2fs(verity_range) => {
                let expected_length = verity_range.length()? as usize;
                // Read whole blocks; the descriptor's length may not be block-aligned.
                let mut buffer = self
                    .allocate_buffer(expected_length.next_multiple_of(self.block_size() as usize))
                    .await;
                // A short read means the on-disk metadata doesn't match the recorded range.
                ensure!(
                    expected_length
                        == self
                            .handle
                            .read(FSVERITY_MERKLE_ATTRIBUTE_ID, verity_range.start, buffer.as_mut())
                            .await?,
                    FxfsError::Inconsistent
                );
                FsverityStateInner::from_bytes(
                    buffer.as_slice()[0..expected_length].into(),
                    self.block_size() as usize,
                )?
            }
        };
        // Validate the merkle tree data against the root before applying it.
        ensure!(metadata.merkle_tree.len() % hasher.hash_size() == 0, FxfsError::Inconsistent);
        let leaf_chunks = metadata.merkle_tree.chunks_exact(hasher.hash_size());
        let mut builder = MerkleTreeBuilder::new(hasher);
        for leaf in leaf_chunks {
            builder.push_data_hash(leaf.to_vec());
        }
        let tree = builder.finish();
        let root_hash = match &metadata.root_digest {
            RootDigest::Sha256(root_hash) => root_hash.as_slice(),
            RootDigest::Sha512(root_hash) => root_hash.as_slice(),
        };

        // Reject the metadata entirely if the recomputed root doesn't match the stored digest.
        ensure!(root_hash == tree.root(), FxfsError::IntegrityError);

        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::None));
        *fsverity_guard = FsverityState::Some(metadata);

        Ok(())
    }
304
305    /// Verifies contents of `buffer` against the corresponding hashes in the stored merkle tree.
306    /// `offset` is the logical offset in the file that `buffer` starts at. `offset` must be
307    /// block-aligned. Fails on non fsverity-enabled files.
308    fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
309        let block_size = self.block_size() as usize;
310        assert!(offset % block_size == 0);
311        let fsverity_state = self.fsverity_state.lock();
312        match &*fsverity_state {
313            FsverityState::None => {
314                Err(anyhow!("Tried to verify read on a non verity-enabled file"))
315            }
316            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
317                "Enable verity has not yet completed, fsverity state: {:?}",
318                &*fsverity_state
319            )),
320            FsverityState::Some(metadata) => {
321                let hasher = metadata.get_hasher_for_block_size(block_size);
322                let leaf_nodes: Vec<&[u8]> =
323                    metadata.merkle_tree.chunks(hasher.hash_size()).collect();
324                fxfs_trace::duration!("fsverity-verify", "len" => buffer.len());
325                // TODO(b/318880297): Consider parallelizing computation.
326                for b in buffer.chunks(block_size) {
327                    ensure!(
328                        hasher.hash_block(b) == leaf_nodes[offset / block_size],
329                        anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
330                    );
331                    offset += block_size;
332                }
333                Ok(())
334            }
335        }
336    }
337
338    /// Extend the file with the given extent.  The only use case for this right now is for files
339    /// that must exist at certain offsets on the device, such as super-blocks.
340    pub async fn extend<'a>(
341        &'a self,
342        transaction: &mut Transaction<'a>,
343        device_range: Range<u64>,
344    ) -> Result<(), Error> {
345        let old_end =
346            round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
347        let new_size = old_end + device_range.end - device_range.start;
348        self.store().allocator().mark_allocated(
349            transaction,
350            self.store().store_object_id(),
351            device_range.clone(),
352        )?;
353        self.txn_update_size(transaction, new_size, None).await?;
354        let key_id = self.get_key(None).await?.0;
355        transaction.add(
356            self.store().store_object_id,
357            Mutation::merge_object(
358                ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
359                ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
360            ),
361        );
362        self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
363    }
364
    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    // the data from `buf`. NOTE(review): the returned range is presumably the block-aligned span
    // the new buffer covers — delegated to `StoreObjectHandle::align_buffer`; confirm there.
    async fn align_buffer(
        &self,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        self.handle.align_buffer(self.attribute_id(), offset, buf).await
    }
374
    // Writes potentially unaligned data at `device_offset` and returns checksums if requested. The
    // data will be encrypted if necessary.
    // `buf` is mutable as an optimization, since the write may require encryption, we can encrypt
    // the buffer in-place rather than copying to another buffer if the write is already aligned.
    async fn write_at(
        &self,
        offset: u64,
        buf: MutableBufferRef<'_>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        // Delegates to StoreObjectHandle::write_at for this handle's attribute.
        // NOTE(review): confirm the meaning of the `None` argument against
        // `StoreObjectHandle::write_at` — not visible in this file.
        self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
    }
387
    /// Verifies that the entire range in the file is zeroes, as either uninitialized overwrite
    /// range, or no extent at all. If a single allocated and written extent is found, this returns
    /// false.
    pub async fn check_unwritten_zero(&self, range: Range<u64>) -> Result<bool, Error> {
        let tree = &self.store().tree();
        let layer_set = tree.layer_set();
        let key = ExtentKey { range };
        let lower_bound = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id,
            AttributeKey::Extent(key.search_key()),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
        // Walk the extents of this attribute starting from the first that could overlap `range`;
        // the loop ends when the iterator leaves this object/attribute.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(value),
            ..
        }) = iter.get()
            && *object_id == self.object_id()
            && *attr_id == self.attribute_id
        {
            if let ExtentValue::Some { mode, .. } = value {
                if let Some(overlap) = key.overlap(extent_key) {
                    if let ExtentMode::OverwritePartial(bits) = mode {
                        // Partial-overwrite extents track initialization per block; only blocks
                        // whose bit is set have actually been written.
                        let starting_index =
                            (overlap.start - extent_key.range.start) / self.block_size();
                        for initialized in bits
                            .iter()
                            .skip(starting_index as usize)
                            .take((overlap.length().unwrap() / self.block_size()) as usize)
                        {
                            if initialized {
                                return Ok(false);
                            }
                        }
                    } else {
                        // Any other allocated extent mode means data has been written here.
                        return Ok(false);
                    }
                } else {
                    // No overlap with `range`; extents are ordered, so nothing later overlaps.
                    break;
                }
            }
            iter.advance().await?;
        }
        Ok(true)
    }
439
    /// Zeroes the given range.  The range must be aligned.
    /// (Previous doc claimed an amount of deallocated data was returned; the signature returns
    /// no value.)
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        range: Range<u64>,
    ) -> Result<(), Error> {
        self.handle.zero(transaction, self.attribute_id(), range).await
    }
448
449    /// The cached value for `self.fsverity_state` is set either in `open_object` or on
450    /// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
451    /// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
452    pub fn get_descriptor(&self) -> Option<(fio::VerificationOptions, Vec<u8>)> {
453        let fsverity_state = self.fsverity_state.lock();
454        match &*fsverity_state {
455            FsverityState::Some(metadata) => {
456                let (options, root_hash) = match &metadata.root_digest {
457                    RootDigest::Sha256(root_hash) => (
458                        fio::VerificationOptions {
459                            hash_algorithm: Some(fio::HashAlgorithm::Sha256),
460                            salt: Some(metadata.salt.clone()),
461                            ..Default::default()
462                        },
463                        root_hash.to_vec(),
464                    ),
465                    RootDigest::Sha512(root_hash) => (
466                        fio::VerificationOptions {
467                            hash_algorithm: Some(fio::HashAlgorithm::Sha512),
468                            salt: Some(metadata.salt.clone()),
469                            ..Default::default()
470                        },
471                        root_hash.clone(),
472                    ),
473                };
474                Some((options, root_hash))
475            }
476            _ => None,
477        }
478    }
479
    /// Reads the entire data attribute and builds its fsverity merkle tree using `hasher`.
    /// Returns the in-memory tree plus the serialized tree data: the non-root layers laid out
    /// top-down, each padded to a block boundary, followed by one block holding a descriptor
    /// built from `hash_alg` and `salt`.
    async fn build_verity_tree(
        &self,
        hasher: FsVerityHasher,
        hash_alg: fio::HashAlgorithm,
        salt: &[u8],
    ) -> Result<(MerkleTree, Vec<u8>), Error> {
        let hash_len = hasher.hash_size();
        let mut builder = MerkleTreeBuilder::new(hasher);
        let mut offset = 0;
        let size = self.get_size();
        // TODO(b/314836822): Consider further tuning the buffer size to optimize
        // performance. Experimentally, most verity-enabled files are <256K.
        let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
        while offset < size {
            // TODO(b/314842875): Consider optimizations for sparse files.
            // NOTE(review): assumes `read` always makes progress (returns > 0) until `size` is
            // reached; a persistent zero-length read would loop forever — confirm against `read`.
            let read = self.read(offset, buf.as_mut()).await? as u64;
            assert!(offset + read <= size);
            builder.write(&buf.as_slice()[0..read as usize]);
            offset += read;
        }
        let tree = builder.finish();
        // This will include a block for the root layer, which will be used to house the descriptor.
        let tree_data_len = tree
            .as_ref()
            .iter()
            .map(|layer| (layer.len() * hash_len).next_multiple_of(self.block_size() as usize))
            .sum();
        let mut merkle_tree_data = Vec::<u8>::with_capacity(tree_data_len);
        // Iterating from the top layers down to the leaves.
        for layer in tree.as_ref().iter().rev() {
            // Skip the root layer.
            if layer.len() <= 1 {
                continue;
            }
            merkle_tree_data.extend(layer.iter().flatten());
            // Pad to the end of the block.
            let padded_size = merkle_tree_data.len().next_multiple_of(self.block_size() as usize);
            merkle_tree_data.resize(padded_size, 0);
        }

        // Zero the last block, then write the descriptor to the start of it.
        let descriptor_offset = merkle_tree_data.len();
        merkle_tree_data.resize(descriptor_offset + self.block_size() as usize, 0);
        let descriptor = FsVerityDescriptorRaw::new(
            hash_alg,
            self.block_size(),
            self.get_size(),
            tree.root(),
            salt,
        )?;
        descriptor.write_to_slice(&mut merkle_tree_data[descriptor_offset..])?;

        Ok((tree, merkle_tree_data))
    }
534
535    /// Reads the data attribute and computes a merkle tree from the data. The values of the
536    /// parameters required to build the merkle tree are supplied by `descriptor` (i.e. salt,
537    /// hash_algorithm, etc.) Writes the leaf nodes of the merkle tree to an attribute with id
538    /// `FSVERITY_MERKLE_ATTRIBUTE_ID`. Updates the root_hash of the `descriptor` according to the
539    /// computed merkle tree and then replaces the ObjectValue of the data attribute with
540    /// ObjectValue::VerifiedAttribute, which stores the `descriptor` inline.
541    #[trace]
542    pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
543        self.set_fsverity_state_started()?;
544        // If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
545        // the graveyard should process the tombstone before we start rewriting the attribute.
546        if let Some(_) = self
547            .store()
548            .tree()
549            .find(&ObjectKey::graveyard_attribute_entry(
550                self.store().graveyard_directory_object_id(),
551                self.object_id(),
552                FSVERITY_MERKLE_ATTRIBUTE_ID,
553            ))
554            .await?
555        {
556            self.store().filesystem().graveyard().flush().await;
557        }
558        let mut transaction = self.new_transaction().await?;
559        let hash_alg =
560            options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
561        let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
562        let (root_digest, merkle_tree) = match hash_alg {
563            fio::HashAlgorithm::Sha256 => {
564                let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
565                    salt.clone(),
566                    self.block_size() as usize,
567                ));
568                let (tree, merkle_tree_data) =
569                    self.build_verity_tree(hasher, hash_alg.clone(), &salt).await?;
570                let root: [u8; 32] = tree.root().try_into().unwrap();
571                (RootDigest::Sha256(root), merkle_tree_data)
572            }
573            fio::HashAlgorithm::Sha512 => {
574                let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
575                    salt.clone(),
576                    self.block_size() as usize,
577                ));
578                let (tree, merkle_tree_data) =
579                    self.build_verity_tree(hasher, hash_alg.clone(), &salt).await?;
580                (RootDigest::Sha512(tree.root().to_vec()), merkle_tree_data)
581            }
582            _ => {
583                bail!(
584                    anyhow!(FxfsError::NotSupported)
585                        .context(format!("hash algorithm not supported"))
586                );
587            }
588        };
589        // TODO(b/314194485): Eventually want streaming writes.
590        // The merkle tree attribute should not require trimming because it should not
591        // exist.
592        self.handle
593            .write_new_attr_in_batches(
594                &mut transaction,
595                FSVERITY_MERKLE_ATTRIBUTE_ID,
596                &merkle_tree,
597                WRITE_ATTR_BATCH_SIZE,
598            )
599            .await?;
600        if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
601            self.store().remove_attribute_from_graveyard(
602                &mut transaction,
603                self.object_id(),
604                FSVERITY_MERKLE_ATTRIBUTE_ID,
605            );
606        };
607        let descriptor_decoded =
608            FsVerityDescriptor::from_bytes(&merkle_tree, self.block_size() as usize)?;
609        let descriptor = FsverityStateInner {
610            root_digest: root_digest.clone(),
611            salt: salt.clone(),
612            merkle_tree: descriptor_decoded.leaf_digests()?.to_vec().into(),
613        };
614        self.set_fsverity_state_pending(descriptor);
615        transaction.add_with_object(
616            self.store().store_object_id(),
617            Mutation::replace_or_insert_object(
618                ObjectKey::attribute(
619                    self.object_id(),
620                    DEFAULT_DATA_ATTRIBUTE_ID,
621                    AttributeKey::Attribute,
622                ),
623                ObjectValue::verified_attribute(
624                    self.get_size(),
625                    FsverityMetadata::F2fs(0..merkle_tree.len() as u64),
626                ),
627            ),
628            AssocObj::Borrowed(self),
629        );
630        transaction.commit().await?;
631        Ok(())
632    }
633
634    /// Pre-allocate disk space for the given logical file range. If any part of the allocation
635    /// range is beyond the end of the file, the file size is updated.
636    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
637        debug_assert!(range.start < range.end);
638
639        // It's not required that callers of allocate use block aligned ranges, but we need to make
640        // the extents block aligned. Luckily, fallocate in posix is allowed to allocate more than
641        // what was asked for for block alignment purposes. We just need to make sure that the size
642        // of the file is still the non-block-aligned end of the range if the size was changed.
643        let mut new_range = range.clone();
644        new_range.start = round_down(new_range.start, self.block_size());
645        // NB: FxfsError::TooBig turns into EFBIG when passed through starnix, which is the
646        // required error code when the requested range is larger than the file size.
647        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;
648
649        let mut transaction = self.new_transaction().await?;
650        let mut to_allocate = Vec::new();
651        let mut to_switch = Vec::new();
652        let key_id = self.get_key(None).await?.0;
653
654        {
655            let tree = &self.store().tree;
656            let layer_set = tree.layer_set();
657            let offset_key = ObjectKey::attribute(
658                self.object_id(),
659                self.attribute_id(),
660                AttributeKey::Extent(ExtentKey::search_key_from_offset(new_range.start)),
661            );
662            let mut merger = layer_set.merger();
663            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;
664
665            loop {
666                match iter.get() {
667                    Some(ItemRef {
668                        key:
669                            ObjectKey {
670                                object_id,
671                                data:
672                                    ObjectKeyData::Attribute(
673                                        attribute_id,
674                                        AttributeKey::Extent(extent_key),
675                                    ),
676                            },
677                        value: ObjectValue::Extent(extent_value),
678                        ..
679                    }) if *object_id == self.object_id()
680                        && *attribute_id == self.attribute_id() =>
681                    {
682                        // If the start of this extent is beyond the end of the range we are
683                        // allocating, we don't have any more work to do.
684                        if new_range.end <= extent_key.range.start {
685                            break;
686                        }
687                        // Add any prefix we might need to allocate.
688                        if new_range.start < extent_key.range.start {
689                            to_allocate.push(new_range.start..extent_key.range.start);
690                            new_range.start = extent_key.range.start;
691                        }
692                        let device_offset = match extent_value {
693                            ExtentValue::None => {
694                                // If the extent value is None, it indicates a deleted extent. In
695                                // that case, we just skip it entirely. By keeping the new_range
696                                // where it is, this section will get included in the new
697                                // allocations.
698                                iter.advance().await?;
699                                continue;
700                            }
701                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
702                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
703                                // If this extent is already in overwrite mode, we can skip it.
704                                if extent_key.range.end < new_range.end {
705                                    new_range.start = extent_key.range.end;
706                                    iter.advance().await?;
707                                    continue;
708                                } else {
709                                    new_range.start = new_range.end;
710                                    break;
711                                }
712                            }
713                            ExtentValue::Some { device_offset, .. } => *device_offset,
714                        };
715
716                        // Figure out how we have to break up the ranges.
717                        let device_offset =
718                            device_offset + (new_range.start - extent_key.range.start);
719                        if extent_key.range.end < new_range.end {
720                            to_switch.push((new_range.start..extent_key.range.end, device_offset));
721                            new_range.start = extent_key.range.end;
722                        } else {
723                            to_switch.push((new_range.start..new_range.end, device_offset));
724                            new_range.start = new_range.end;
725                            break;
726                        }
727                    }
728                    // The records are sorted so if we find something that isn't an extent or
729                    // doesn't match the object id then there are no more extent records for this
730                    // object.
731                    _ => break,
732                }
733                iter.advance().await?;
734            }
735        }
736
737        if new_range.start < new_range.end {
738            to_allocate.push(new_range.clone());
739        }
740
741        // We can update the size in the first transaction because even if subsequent transactions
742        // don't get replayed, the data between the current and new end of the file will be zero
743        // (either sparse zero or allocated zero). On the other hand, if we don't update the size
744        // in the first transaction, overwrite extents may be written past the end of the file
745        // which is an fsck error.
746        //
747        // The potential new size needs to be the non-block-aligned range end - we round up to the
748        // nearest block size for the actual allocation, but shouldn't do that for the file size.
749        let new_size = std::cmp::max(range.end, self.get_size());
750        // Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
751        // first transaction, in case we split transactions. This makes it okay to only replay the
752        // first transaction if power loss occurs - the file will be in an unusual state, but not
753        // an invalid one, if only part of the allocate goes through.
754        transaction.add_with_object(
755            self.store().store_object_id(),
756            Mutation::replace_or_insert_object(
757                ObjectKey::attribute(
758                    self.object_id(),
759                    self.attribute_id(),
760                    AttributeKey::Attribute,
761                ),
762                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
763            ),
764            AssocObj::Borrowed(self),
765        );
766
767        // The maximum number of mutations we are going to allow per transaction in allocate. This
768        // is probably quite a bit lower than the actual limit, but it should be large enough to
769        // handle most non-edge-case versions of allocate without splitting the transaction.
770        const MAX_TRANSACTION_SIZE: usize = 256;
771        for (switch_range, device_offset) in to_switch {
772            transaction.add_with_object(
773                self.store().store_object_id(),
774                Mutation::merge_object(
775                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
776                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
777                        device_offset,
778                        key_id,
779                    )),
780                ),
781                AssocObj::Borrowed(self),
782            );
783            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
784                transaction.commit_and_continue().await?;
785            }
786        }
787
788        let mut allocated = 0;
789        let allocator = self.store().allocator();
790        for mut allocate_range in to_allocate {
791            while allocate_range.start < allocate_range.end {
792                let device_range = allocator
793                    .allocate(
794                        &mut transaction,
795                        self.store().store_object_id(),
796                        allocate_range.end - allocate_range.start,
797                    )
798                    .await
799                    .context("allocation failed")?;
800                let device_range_len = device_range.end - device_range.start;
801
802                transaction.add_with_object(
803                    self.store().store_object_id(),
804                    Mutation::merge_object(
805                        ObjectKey::extent(
806                            self.object_id(),
807                            self.attribute_id(),
808                            allocate_range.start..allocate_range.start + device_range_len,
809                        ),
810                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
811                            device_range.start,
812                            (device_range_len / self.block_size()) as usize,
813                            key_id,
814                        )),
815                    ),
816                    AssocObj::Borrowed(self),
817                );
818
819                allocate_range.start += device_range_len;
820                allocated += device_range_len;
821
822                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
823                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
824                    transaction.commit_and_continue().await?;
825                    allocated = 0;
826                }
827            }
828        }
829
830        self.update_allocated_size(&mut transaction, allocated, 0).await?;
831        transaction.commit().await?;
832
833        Ok(())
834    }
835
    /// Return information on a contiguous set of extents that has the same allocation status,
    /// starting from `start_offset`. The information returned is if this set of extents are marked
    /// allocated/not allocated and also the size of this set (in bytes). This is used when
    /// querying slices for volumes.
    /// This function expects `start_offset` to be aligned to block size
    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
        let block_size = self.block_size();
        assert_eq!(start_offset % block_size, 0);

        // Querying beyond the end of the file is out of range.
        if start_offset > self.get_size() {
            bail!(FxfsError::OutOfRange)
        }

        // Querying exactly at the end of the file yields an empty, unallocated run.
        if start_offset == self.get_size() {
            return Ok((false, 0));
        }

        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let offset_key = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id(),
            AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

        // `allocated` is the kind of run being accumulated (None until the first record is
        // classified); `end` is the exclusive end of the run accumulated so far.
        let mut allocated = None;
        let mut end = start_offset;

        loop {
            // Iterate through the extents, each time setting `end` as the end of the previous
            // extent
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) => {
                    // Equivalent of getting no extents back
                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
                        if allocated == Some(false) || allocated.is_none() {
                            // Everything from `end` to EOF is unallocated; fold it into the run.
                            end = self.get_size();
                            allocated = Some(false);
                        }
                        break;
                    }
                    ensure!(extent_key.range.is_aligned(block_size), FxfsError::Inconsistent);
                    if extent_key.range.start > end {
                        // If a previous extent has already been visited and we are tracking an
                        // allocated set, we are only interested in an extent where the range of the
                        // current extent follows immediately after the previous one.
                        if allocated == Some(true) {
                            break;
                        } else {
                            // The gap between the previous `end` and this extent is not allocated
                            end = extent_key.range.start;
                            allocated = Some(false);
                            // Continue this iteration, except now the `end` is set to the end of
                            // the "previous" extent which is this gap between the start_offset
                            // and the current extent
                        }
                    }

                    // We can assume that from here, the `end` points to the end of a previous
                    // extent.
                    match extent_value {
                        // The current extent has been allocated
                        ExtentValue::Some { .. } => {
                            // Stop searching if previous extent was marked deleted
                            if allocated == Some(false) {
                                break;
                            }
                            allocated = Some(true);
                        }
                        // This extent has been marked deleted
                        ExtentValue::None => {
                            // Stop searching if previous extent was marked allocated
                            if allocated == Some(true) {
                                break;
                            }
                            allocated = Some(false);
                        }
                    }
                    end = extent_key.range.end;
                }
                // This occurs when there are no extents left
                None => {
                    if allocated == Some(false) || allocated.is_none() {
                        end = self.get_size();
                        allocated = Some(false);
                    }
                    // Otherwise, we were monitoring extents that were allocated, so just exit.
                    break;
                }
                // Non-extent records (Object, Child, GraveyardEntry) are ignored.
                Some(_) => {}
            }
            iter.advance().await?;
        }

        // Every `break` above either sets `allocated` or only fires when it is already Some, so
        // the unwrap cannot panic.
        Ok((allocated.unwrap(), end - start_offset))
    }
944
945    pub async fn txn_write<'a>(
946        &'a self,
947        transaction: &mut Transaction<'a>,
948        offset: u64,
949        buf: BufferRef<'_>,
950    ) -> Result<(), Error> {
951        if buf.is_empty() {
952            return Ok(());
953        }
954        let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
955        self.multi_write(
956            transaction,
957            self.attribute_id(),
958            std::slice::from_ref(&aligned),
959            transfer_buf.as_mut(),
960        )
961        .await?;
962        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
963            self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
964        }
965        Ok(())
966    }
967
    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
    /// applied; the caller is responsible for updating size if required.
    ///
    /// Thin wrapper over the inner handle's `multi_write`; the `None` argument is the key-id
    /// selector (presumably "use the default key" — confirm against the handle implementation,
    /// which lives outside this file).
    pub async fn multi_write<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attribute_id: u64,
        ranges: &[Range<u64>],
        buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
    }
980
    // `buf` is mutable as an optimization, since the write may require encryption, we can
    // encrypt the buffer in-place rather than copying to another buffer if the write is
    // already aligned.
    //
    // Note: in the event of power failure during an overwrite() call, it is possible that
    // old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
    // Since the old data should be encrypted, it is probably safe to expose, although not ideal.
    pub async fn overwrite(
        &self,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        options: OverwriteOptions,
    ) -> Result<(), Error> {
        // The write must cover whole device blocks.
        // NOTE(review): `buf.len() as u32` silently truncates for buffers of 4GiB or more, which
        // could let a misaligned length pass this check — confirm buffer sizes are bounded.
        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
        let end = offset + buf.len() as u64;

        let key_id = self.get_key(None).await?.0;

        // The transaction only ends up being used if allow_allocations is true
        let mut transaction =
            if options.allow_allocations { Some(self.new_transaction().await?) } else { None };

        // We build up a list of writes to perform later
        let writes = FuturesUnordered::new();

        if options.barrier_on_first_write {
            self.store().device.barrier();
        }

        // We create a new scope here, so that the merger iterator will get dropped before we try to
        // commit our transaction. Otherwise the transaction commit would block.
        {
            let store = self.store();
            let store_object_id = store.store_object_id;
            let allocator = store.allocator();
            let tree = &store.tree;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
                )))
                .await?;
            let block_size = self.block_size();

            // Walk the extent records overlapping the write, peeling bytes off the front of `buf`
            // for each extent (or each freshly allocated range) handled.  Each arm yields
            // (device_offset, bytes_to_write, should_advance).
            loop {
                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
                    // An extent that ends exactly at `offset` doesn't overlap the write; skip it.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end == offset =>
                    {
                        iter.advance().await?;
                        continue;
                    }
                    // An extent record that starts at or before `offset` — the existing extent we
                    // will (partially) overwrite in place.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start <= offset =>
                    {
                        match value {
                            ObjectValue::Extent(ExtentValue::Some {
                                device_offset,
                                mode: ExtentMode::Raw,
                                ..
                            }) => {
                                ensure!(
                                    range.is_aligned(block_size) && device_offset % block_size == 0,
                                    FxfsError::Inconsistent
                                );
                                // Translate the logical offset into a device offset within this
                                // extent and clamp the write to what remains of the extent.
                                let offset_within_extent = offset - range.start;
                                let remaining_length_of_extent = (range
                                    .end
                                    .checked_sub(offset)
                                    .ok_or(FxfsError::Inconsistent)?)
                                    as usize;
                                // Yields (device_offset, bytes_to_write, should_advance)
                                (
                                    device_offset + offset_within_extent,
                                    min(buf.len(), remaining_length_of_extent),
                                    true,
                                )
                            }
                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
                                // TODO(https://fxbug.dev/42066056): Maybe we should create
                                // a new extent without checksums?
                                bail!(
                                    "extent from ({},{}) which overlaps offset \
                                        {} has the wrong extent mode",
                                    range.start,
                                    range.end,
                                    offset
                                )
                            }
                            _ => {
                                bail!(
                                    "overwrite failed: extent overlapping offset {} has \
                                      unexpected ObjectValue",
                                    offset
                                )
                            }
                        }
                    }
                    // No extent overlaps `offset`: allocate a new one if allowed, else fail.
                    maybe_item_ref => {
                        if let Some(transaction) = transaction.as_mut() {
                            assert_eq!(options.allow_allocations, true);
                            assert_eq!(offset % self.block_size(), 0);

                            // We are going to make a new extent, but let's check if there is an
                            // extent after us. If there is an extent after us, then we don't want
                            // our new extent to bump into it...
                            let mut bytes_to_allocate =
                                round_up(buf.len() as u64, self.block_size())
                                    .ok_or(FxfsError::TooBig)?;
                            if let Some(ItemRef {
                                key:
                                    ObjectKey {
                                        object_id,
                                        data:
                                            ObjectKeyData::Attribute(
                                                attribute_id,
                                                AttributeKey::Extent(ExtentKey { range }),
                                            ),
                                    },
                                ..
                            }) = maybe_item_ref
                            {
                                if *object_id == self.object_id()
                                    && *attribute_id == self.attribute_id()
                                    && offset < range.start
                                {
                                    // Cap the allocation so it stops where the next extent begins.
                                    let bytes_until_next_extent = range.start - offset;
                                    bytes_to_allocate =
                                        min(bytes_to_allocate, bytes_until_next_extent);
                                }
                            }

                            // The allocator may return less than requested; the outer loop will
                            // come back around for the remainder.
                            let device_range = allocator
                                .allocate(transaction, store_object_id, bytes_to_allocate)
                                .await?;
                            let device_range_len = device_range.end - device_range.start;
                            transaction.add(
                                store_object_id,
                                Mutation::insert_object(
                                    ObjectKey::extent(
                                        self.object_id(),
                                        self.attribute_id(),
                                        offset..offset + device_range_len,
                                    ),
                                    ObjectValue::Extent(ExtentValue::new_raw(
                                        device_range.start,
                                        key_id,
                                    )),
                                ),
                            );

                            self.update_allocated_size(transaction, device_range_len, 0).await?;

                            // Yields (device_offset, bytes_to_write, should_advance)
                            // should_advance is false: the iterator still points at the record
                            // after the newly inserted extent.
                            (device_range.start, min(buf.len(), device_range_len as usize), false)
                        } else {
                            bail!(
                                "no extent overlapping offset {}, \
                                and new allocations are not allowed",
                                offset
                            )
                        }
                    }
                };
                // Queue the device write for this slice and advance through the buffer.
                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
                writes.push(self.write_at(offset, current_buf, device_offset));
                if remaining_buf.len() == 0 {
                    break;
                } else {
                    buf = remaining_buf;
                    offset += bytes_to_write as u64;
                    if should_advance {
                        iter.advance().await?;
                    }
                }
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // The checksums are being ignored here, but we don't need to know them
        writes.try_collect::<Vec<MaybeChecksums>>().await?;

        // Commit any allocations we made, first growing the file if the write extended it.
        if let Some(mut transaction) = transaction {
            assert_eq!(options.allow_allocations, true);
            if !transaction.is_empty() {
                if end > self.get_size() {
                    self.grow(&mut transaction, self.get_size(), end).await?;
                }
                transaction.commit().await?;
            }
        }

        Ok(())
    }
1204
1205    // Within a transaction, the size of the object might have changed, so get the size from there
1206    // if it exists, otherwise, fall back on the cached size.
1207    fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1208        transaction
1209            .get_object_mutation(
1210                self.store().store_object_id,
1211                ObjectKey::attribute(
1212                    self.object_id(),
1213                    self.attribute_id(),
1214                    AttributeKey::Attribute,
1215                ),
1216            )
1217            .and_then(|m| {
1218                if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1219                    Some(size)
1220                } else {
1221                    None
1222                }
1223            })
1224            .unwrap_or_else(|| self.get_size())
1225    }
1226
1227    pub async fn txn_update_size<'a>(
1228        &'a self,
1229        transaction: &mut Transaction<'a>,
1230        new_size: u64,
1231        // Allow callers to update the has_overwrite_extents metadata if they want. If this is
1232        // Some it is set to the value, if None it is left unchanged.
1233        update_has_overwrite_extents: Option<bool>,
1234    ) -> Result<(), Error> {
1235        let key =
1236            ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1237        let mut mutation = if let Some(mutation) =
1238            transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1239        {
1240            mutation.clone()
1241        } else {
1242            ObjectStoreMutation {
1243                item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1244                op: Operation::ReplaceOrInsert,
1245            }
1246        };
1247        if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1248            *size = new_size;
1249            if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1250                *has_overwrite_extents = update_has_overwrite_extents;
1251            }
1252        } else {
1253            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1254        }
1255        transaction.add_with_object(
1256            self.store().store_object_id(),
1257            Mutation::ObjectStore(mutation),
1258            AssocObj::Borrowed(self),
1259        );
1260        Ok(())
1261    }
1262
    /// Adjusts this attribute's allocated-byte accounting within `transaction` by `allocated`
    /// bytes gained and `deallocated` bytes released.  Pure delegation to the inner handle.
    async fn update_allocated_size(
        &self,
        transaction: &mut Transaction<'_>,
        allocated: u64,
        deallocated: u64,
    ) -> Result<(), Error> {
        self.handle.update_allocated_size(transaction, allocated, deallocated).await
    }
1271
1272    pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1273        if self
1274            .overwrite_ranges
1275            .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1276        {
1277            // This returns true if there were ranges, but this truncate removed them all, which
1278            // indicates that we need to flip the has_overwrite_extents metadata flag to false.
1279            Ok(Some(false))
1280        } else {
1281            Ok(None)
1282        }
1283    }
1284
1285    pub async fn shrink<'a>(
1286        &'a self,
1287        transaction: &mut Transaction<'a>,
1288        size: u64,
1289        update_has_overwrite_extents: Option<bool>,
1290    ) -> Result<NeedsTrim, Error> {
1291        let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1292        self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1293        Ok(needs_trim)
1294    }
1295
    /// Grows the attribute from `old_size` to `size` within `transaction`: waits out any pending
    /// trim past `old_size`, zeroes the tail of the old last block (so the newly exposed region
    /// reads as zero), then records the new size.
    pub async fn grow<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        old_size: u64,
        size: u64,
    ) -> Result<(), Error> {
        // Before growing the file, we must make sure that a previous trim has completed.
        // trim_some may need several transactions; commit_and_continue between rounds.
        let store = self.store();
        while matches!(
            store
                .trim_some(
                    transaction,
                    self.object_id(),
                    self.attribute_id(),
                    TrimMode::FromOffset(old_size)
                )
                .await?,
            TrimResult::Incomplete
        ) {
            transaction.commit_and_continue().await?;
        }
        // We might need to zero out the tail of the old last block.
        let block_size = self.block_size();
        if old_size % block_size != 0 {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let aligned_old_size = round_down(old_size, block_size);
            // Look up the extent (if any) covering the block that contains the old EOF.
            let iter = merger
                .query(Query::FullRange(&ObjectKey::extent(
                    self.object_id(),
                    self.attribute_id(),
                    aligned_old_size..aligned_old_size + 1,
                )))
                .await?;
            if let Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                    },
                value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
                ..
            }) = iter.get()
            {
                if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
                    let device_offset = device_offset
                        .checked_add(aligned_old_size - extent_key.range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
                    let mut buf = self.allocate_buffer(block_size as usize).await;
                    // In the case that this extent is in OverwritePartial mode, there is a
                    // possibility that the last block is allocated, but not initialized yet, in
                    // which case we don't actually need to bother zeroing out the tail. However,
                    // it's not strictly incorrect to change uninitialized data, so we skip the
                    // check and blindly do it to keep it simpler here.
                    // Read-modify-write: decrypt the block, zero everything past old EOF, and
                    // write the block back.
                    self.read_and_decrypt(device_offset, aligned_old_size, buf.as_mut(), *key_id)
                        .await?;
                    buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
                    self.multi_write(
                        transaction,
                        *attribute_id,
                        &[aligned_old_size..aligned_old_size + block_size],
                        buf.as_mut(),
                    )
                    .await?;
                }
            }
        }
        self.txn_update_size(transaction, size, None).await?;
        Ok(())
    }
1368
    /// Attempts to pre-allocate a `file_range` of bytes for this object.
    /// Returns a set of device ranges (i.e. potentially multiple extents).
    ///
    /// It may not be possible to preallocate the entire requested range in one request
    /// due to limitations on transaction size. In such cases, we will preallocate as much as
    /// we can up to some (arbitrary, internal) limit on transaction size.
    ///
    /// `file_range.start` is modified to point at the end of the logical range
    /// that was preallocated such that repeated calls to `preallocate_range` with new
    /// transactions can be used to preallocate ranges of any size.
    ///
    /// Requested range must be a multiple of block size.
    pub async fn preallocate_range<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        file_range: &mut Range<u64>,
    ) -> Result<Vec<Range<u64>>, Error> {
        let block_size = self.block_size();
        assert!(file_range.is_aligned(block_size));
        // NOTE(review): extents are recorded below with `ExtentValue::new_raw`, so this path
        // only supports unencrypted files.
        assert!(!self.handle.is_encrypted());
        let mut ranges = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Scan existing extent records from file_range.start so already-allocated portions of
        // the range are reused rather than allocated a second time.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
            )))
            .await?;
        let mut allocated = 0;
        let key_id = self.get_key(None).await?.0;
        'outer: while file_range.start < file_range.end {
            // Find the end of the gap to allocate: either the start of the next existing
            // extent, or file_range.end if nothing else overlaps.
            let allocate_end = loop {
                match iter.get() {
                    // Case for allocated extents for the same object that overlap with file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start < file_range.end =>
                    {
                        ensure!(
                            range.is_valid()
                                && range.is_aligned(block_size)
                                && device_offset % block_size == 0,
                            FxfsError::Inconsistent
                        );
                        // If the start of the requested file_range overlaps with an existing extent...
                        if range.start <= file_range.start {
                            // Record the existing extent and move on.
                            let device_range = device_offset
                                .checked_add(file_range.start - range.start)
                                .ok_or(FxfsError::Inconsistent)?
                                ..device_offset
                                    .checked_add(min(range.end, file_range.end) - range.start)
                                    .ok_or(FxfsError::Inconsistent)?;
                            file_range.start += device_range.end - device_range.start;
                            ranges.push(device_range);
                            if file_range.start >= file_range.end {
                                break 'outer;
                            }
                            iter.advance().await?;
                            continue;
                        } else {
                            // There's nothing allocated between file_range.start and the beginning
                            // of this extent.
                            break range.start;
                        }
                    }
                    // Case for deleted extents eclipsed by file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::None),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end < file_range.end =>
                    {
                        // Deleted extents occupy no space; just skip over them.
                        iter.advance().await?;
                    }
                    _ => {
                        // We can just preallocate the rest.
                        break file_range.end;
                    }
                }
            };
            let device_range = self
                .store()
                .allocator()
                .allocate(
                    transaction,
                    self.store().store_object_id(),
                    allocate_end - file_range.start,
                )
                .await
                .context("Allocation failed")?;
            allocated += device_range.end - device_range.start;
            // The allocator may return less than requested; only advance by what we got.
            let this_file_range =
                file_range.start..file_range.start + device_range.end - device_range.start;
            file_range.start = this_file_range.end;
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
                ),
            );
            ranges.push(device_range);
            // If we didn't allocate all that we requested, we'll loop around and try again.
            // ... unless we have filled the transaction. The caller should check file_range.
            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
                break;
            }
        }
        // Update the file size if it changed.
        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
            self.txn_update_size(transaction, file_range.start, None).await?;
        }
        self.update_allocated_size(transaction, allocated, 0).await?;
        Ok(ranges)
    }
1510
1511    pub async fn update_attributes<'a>(
1512        &self,
1513        transaction: &mut Transaction<'a>,
1514        node_attributes: Option<&fio::MutableNodeAttributes>,
1515        change_time: Option<Timestamp>,
1516    ) -> Result<(), Error> {
1517        // This codepath is only called by files, whose wrapping key id users cannot directly set
1518        // as per fscrypt.
1519        ensure!(
1520            !matches!(
1521                node_attributes,
1522                Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1523            ),
1524            FxfsError::BadPath
1525        );
1526        self.handle.update_attributes(transaction, node_attributes, change_time).await
1527    }
1528
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        // Delegates to the wrapped StoreObjectHandle.
        self.handle.default_transaction_options()
    }
1534
    /// Creates a new transaction for this handle's attribute using
    /// [`Self::default_transaction_options`].
    pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(self.default_transaction_options()).await
    }
1538
    /// Creates a new transaction scoped to this handle's attribute with the given `options`.
    pub async fn new_transaction_with_options<'b>(
        &self,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        self.handle.new_transaction_with_options(self.attribute_id(), options).await
    }
1545
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    pub async fn flush_device(&self) -> Result<(), Error> {
        // Delegates to the wrapped StoreObjectHandle.
        self.handle.flush_device().await
    }
1550
    /// Reads an entire attribute.
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        // Delegates to the wrapped StoreObjectHandle.
        self.handle.read_attr(attribute_id).await
    }
1555
    /// Writes an entire attribute.  This *always* uses the volume data key.
    ///
    /// May commit multiple transactions: the write itself, plus however many it takes to trim
    /// any pre-existing data past the end of the new contents.
    pub async fn write_attr(&self, attribute_id: u64, data: &[u8]) -> Result<(), Error> {
        // Must be different attribute otherwise cached size gets out of date.
        assert_ne!(attribute_id, self.attribute_id());
        let store = self.store();
        let mut transaction = self.new_transaction().await?;
        // The boolean (see `NeedsTrim`) indicates old data extends past the new contents.
        if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
            // Commit the write first, then trim the excess across as many transactions as
            // needed.
            transaction.commit_and_continue().await?;
            while matches!(
                store
                    .trim_some(
                        &mut transaction,
                        self.object_id(),
                        attribute_id,
                        TrimMode::FromOffset(data.len() as u64),
                    )
                    .await?,
                TrimResult::Incomplete
            ) {
                transaction.commit_and_continue().await?;
            }
        }
        transaction.commit().await?;
        Ok(())
    }
1581
    /// Reads `buffer.len()` bytes at `device_offset` into `buffer`, decrypting with the key
    /// identified by `key_id`.  Delegates to the wrapped StoreObjectHandle.
    // NOTE(review): `file_offset` is presumably used as the cipher tweak/IV input — confirm
    // against StoreObjectHandle::read_and_decrypt.
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id).await
    }
1591
    /// Truncates a file to a given size (growing/shrinking as required).
    ///
    /// Nb: Most code will want to call truncate() instead. This method is used
    /// to update the super block -- a case where we must borrow metadata space.
    pub async fn truncate_with_options(
        &self,
        options: Options<'_>,
        size: u64,
    ) -> Result<(), Error> {
        let mut transaction = self.new_transaction_with_options(options).await?;
        let old_size = self.get_size();
        if size == old_size {
            return Ok(());
        }
        if size < old_size {
            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
                // The file needs to be trimmed.
                transaction.commit_and_continue().await?;
                let store = self.store();
                // Trim the tail in multiple transactions until complete.
                while matches!(
                    store
                        .trim_some(
                            &mut transaction,
                            self.object_id(),
                            self.attribute_id(),
                            TrimMode::FromOffset(size)
                        )
                        .await?,
                    TrimResult::Incomplete
                ) {
                    // The size change has already been committed above, so a failed trim is
                    // deliberately downgraded to a warning rather than an error.
                    if let Err(error) = transaction.commit_and_continue().await {
                        warn!(error:?; "Failed to trim after truncate");
                        return Ok(());
                    }
                }
                if let Err(error) = transaction.commit().await {
                    warn!(error:?; "Failed to trim after truncate");
                }
                return Ok(());
            }
        } else {
            self.grow(&mut transaction, old_size, size).await?;
        }
        transaction.commit().await?;
        Ok(())
    }
1639
1640    pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
1641        // We don't take a read guard here since the object properties are contained in a single
1642        // object, which cannot be inconsistent with itself. The LSM tree does not return
1643        // intermediate states for a single object.
1644        let item = self
1645            .store()
1646            .tree
1647            .find(&ObjectKey::object(self.object_id()))
1648            .await?
1649            .expect("Unable to find object record");
1650        match item.value {
1651            ObjectValue::Object {
1652                kind: ObjectKind::File { refs, .. },
1653                attributes:
1654                    ObjectAttributes {
1655                        creation_time,
1656                        modification_time,
1657                        posix_attributes,
1658                        allocated_size,
1659                        access_time,
1660                        change_time,
1661                        ..
1662                    },
1663            } => Ok(ObjectProperties {
1664                refs,
1665                allocated_size,
1666                data_attribute_size: self.get_size(),
1667                creation_time,
1668                modification_time,
1669                access_time,
1670                change_time,
1671                sub_dirs: 0,
1672                posix_attributes,
1673                dir_type: DirType::Normal,
1674            }),
1675            _ => bail!(FxfsError::NotFile),
1676        }
1677    }
1678
1679    // Returns the contents of this object. This object must be < |limit| bytes in size.
1680    pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1681        let size = self.get_size();
1682        if size > limit as u64 {
1683            bail!("Object too big ({} > {})", size, limit);
1684        }
1685        let mut buf = self.allocate_buffer(size as usize).await;
1686        self.read(0u64, buf.as_mut()).await?;
1687        Ok(buf.as_slice().into())
1688    }
1689
    /// Returns the set of file_offset->extent mappings for this file. The extents will be sorted by
    /// their logical offset within the file.
    ///
    /// *NOTE*: This operation is potentially expensive and should generally be avoided.
    pub async fn device_extents(&self) -> Result<Vec<FileExtent>, Error> {
        let mut extents = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Walk every extent record for this attribute, starting from offset 0.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
            )))
            .await?;
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
                    let logical_offset = range.start;
                    let device_range = *device_offset..*device_offset + range.length()?;
                    extents.push(FileExtent::new(logical_offset, device_range)?);
                }
                // Stop at the first record that isn't an allocated extent for this attribute.
                _ => break,
            }
            iter.advance().await?;
        }
        Ok(extents)
    }
1731}
1732
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
    /// Observes mutations as they are applied so the handle's cached state stays in sync.
    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
        match mutation {
            // Attribute size change: refresh the cached content size.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
                ..
            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
            // A verified (fsverity) attribute is written: the size must already match the
            // cached size, and the handle's fsverity state is finalized.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
                ..
            }) => {
                debug_assert_eq!(
                    self.get_size(),
                    *size,
                    "size should be set when verity is enabled and must not change"
                );
                self.finalize_fsverity_state()
            }
            // Extent mutations on this handle's attribute: record overwrite-mode ranges.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attr_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                        ..
                    },
                ..
            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
                    self.overwrite_ranges.apply_range(range.clone())
                }
                // Raw and Cow extents are not tracked as overwrite ranges.
                ExtentMode::Raw | ExtentMode::Cow(_) => (),
            },
            _ => {}
        }
    }
}
1777
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    // All methods delegate directly to the wrapped StoreObjectHandle.
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1795
#[async_trait]
impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
    async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
        // Hold a read lock on this attribute for the duration of the read so that it cannot
        // race with mutations of the attribute.
        let fs = self.store().filesystem();
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                self.store().store_object_id,
                self.object_id(),
                self.attribute_id(),
            )])
            .await;

        let size = self.get_size();
        // Reads starting at or beyond EOF return 0 bytes.
        if offset >= size {
            return Ok(0);
        }
        // Clamp the read so it does not extend past EOF.
        let length = min(buf.len() as u64, size - offset) as usize;
        buf = buf.subslice_mut(0..length);
        self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
        // For fsverity-enabled files, verify the bytes that were just read.
        if self.is_verified_file() {
            self.verify_data(offset as usize, buf.as_slice())?;
        }
        Ok(length)
    }

    fn get_size(&self) -> u64 {
        // Cached size; kept in sync via `will_apply_mutation`.
        self.content_size.load(atomic::Ordering::Relaxed)
    }
}
1826
1827impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
1828    async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
1829        let offset = offset.unwrap_or_else(|| self.get_size());
1830        let mut transaction = self.new_transaction().await?;
1831        self.txn_write(&mut transaction, offset, buf).await?;
1832        let new_size = self.txn_get_size(&transaction);
1833        transaction.commit().await?;
1834        Ok(new_size)
1835    }
1836
1837    async fn truncate(&self, size: u64) -> Result<(), Error> {
1838        self.truncate_with_options(self.default_transaction_options(), size).await
1839    }
1840
1841    async fn flush(&self) -> Result<(), Error> {
1842        Ok(())
1843    }
1844}
1845
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
pub struct DirectWriter<'a, S: HandleOwner> {
    // Handle that receives the writes.
    handle: &'a DataObjectHandle<S>,
    // Transaction options used for each flush.
    options: transaction::Options<'a>,
    // Staging buffer; flushed when BUFFER_SIZE bytes have accumulated.
    buffer: Buffer<'a>,
    // File offset at which the next flush will write.
    offset: u64,
    // Number of bytes currently staged in `buffer`.
    buf_offset: usize,
}
1855
// Size of DirectWriter's internal staging buffer (1 MiB).
const BUFFER_SIZE: usize = 1_048_576;
1857
1858impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
1859    fn drop(&mut self) {
1860        if self.buf_offset != 0 {
1861            warn!("DirectWriter: dropping data, did you forget to call complete?");
1862        }
1863    }
1864}
1865
impl<'a, S: HandleOwner> DirectWriter<'a, S> {
    /// Creates a writer positioned at offset 0 of `handle`, flushing with `options`.
    pub async fn new(
        handle: &'a DataObjectHandle<S>,
        options: transaction::Options<'a>,
    ) -> DirectWriter<'a, S> {
        Self {
            handle,
            options,
            buffer: handle.allocate_buffer(BUFFER_SIZE).await,
            offset: 0,
            buf_offset: 0,
        }
    }

    /// Writes the staged bytes to the handle in a single transaction and resets the staging
    /// buffer.
    async fn flush(&mut self) -> Result<(), Error> {
        let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
        self.handle
            .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
            .await?;
        transaction.commit().await?;
        // Only advance the write position after a successful commit.
        self.offset += self.buf_offset as u64;
        self.buf_offset = 0;
        Ok(())
    }
}
1891
1892impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1893    fn block_size(&self) -> u64 {
1894        self.handle.block_size()
1895    }
1896
1897    async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1898        while buf.len() > 0 {
1899            let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1900            self.buffer
1901                .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1902                .as_mut_slice()
1903                .copy_from_slice(&buf[..to_do]);
1904            self.buf_offset += to_do;
1905            if self.buf_offset == BUFFER_SIZE {
1906                self.flush().await?;
1907            }
1908            buf = &buf[to_do..];
1909        }
1910        Ok(())
1911    }
1912
1913    async fn complete(&mut self) -> Result<(), Error> {
1914        self.flush().await?;
1915        Ok(())
1916    }
1917
1918    async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1919        if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1920            self.buffer
1921                .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1922                .as_mut_slice()
1923                .fill(0);
1924            self.buf_offset += amount as usize;
1925        } else {
1926            self.flush().await?;
1927            self.offset += amount;
1928        }
1929        Ok(())
1930    }
1931
1932    /// The number of bytes written to this writer (including unflushed bytes).
1933    fn bytes_written(&self) -> u64 {
1934        self.offset + self.buf_offset as u64
1935    }
1936}
1937
1938#[cfg(test)]
1939mod tests {
1940    use crate::errors::FxfsError;
1941    use crate::filesystem::{
1942        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1943    };
1944    use crate::fsck::{
1945        FsckOptions, fsck, fsck_volume, fsck_volume_with_options, fsck_with_options,
1946    };
1947    use crate::lsm_tree::Query;
1948    use crate::lsm_tree::types::{ItemRef, LayerIterator};
1949    use crate::object_handle::{
1950        ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1951    };
1952    use crate::object_store::data_object_handle::{OverwriteOptions, WRITE_ATTR_BATCH_SIZE};
1953    use crate::object_store::directory::replace_child;
1954    use crate::object_store::object_record::{FsverityMetadata, ObjectKey, ObjectValue, Timestamp};
1955    use crate::object_store::transaction::{Mutation, Options, lock_keys};
1956    use crate::object_store::volume::root_volume;
1957    use crate::object_store::{
1958        AttributeKey, DEFAULT_DATA_ATTRIBUTE_ID, DataObjectHandle, DirType, Directory, ExtentKey,
1959        ExtentMode, ExtentValue, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, LockKey,
1960        NewChildStoreOptions, ObjectKeyData, ObjectStore, PosixAttributes, StoreOptions,
1961        TRANSACTION_MUTATION_THRESHOLD,
1962    };
1963    use crate::range::RangeExt;
1964    use crate::round::{round_down, round_up};
1965    use assert_matches::assert_matches;
1966    use bit_vec::BitVec;
1967    use fidl_fuchsia_io as fio;
1968    use fsverity_merkle::{FsVerityDescriptor, FsVerityDescriptorRaw};
1969    use fuchsia_async as fasync;
1970    use fuchsia_sync::Mutex;
1971    use futures::FutureExt;
1972    use futures::channel::oneshot::channel;
1973    use futures::stream::{FuturesUnordered, StreamExt};
1974    use fxfs_crypto::{Crypt, EncryptionKey, KeyPurpose};
1975    use fxfs_insecure_crypto::new_insecure_crypt;
1976    use std::ops::Range;
1977    use std::sync::Arc;
1978    use std::time::Duration;
1979    use storage_device::DeviceHolder;
1980    use storage_device::fake_device::FakeDevice;
1981
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Some tests (the preallocate_range ones) currently assume that the data only occupies a single
    // device block.
    const TEST_DATA_OFFSET: u64 = 5000; // Not a multiple of TEST_DEVICE_BLOCK_SIZE.
    const TEST_DATA: &[u8] = b"hello";
    // Test objects are truncated to this size after creation (see create_object_with_key).
    const TEST_OBJECT_SIZE: u64 = 5678;
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    const TEST_OBJECT_NAME: &str = "foo";
1991
1992    async fn test_filesystem() -> OpenFxFilesystem {
1993        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1994        FxFilesystem::new_empty(device).await.expect("new_empty failed")
1995    }
1996
    /// Creates a file named TEST_OBJECT_NAME in the root store's root directory, encrypted with
    /// a key from `crypt` if provided, optionally containing TEST_DATA at TEST_DATA_OFFSET, and
    /// truncated to TEST_OBJECT_SIZE.
    async fn create_object_with_key(
        fs: Arc<FxFilesystem>,
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> DataObjectHandle<ObjectStore> {
        let store = fs.root_store();
        let object;

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        object = if let Some(crypt) = crypt {
            // Encrypted path: mint an object id and a data key for it first.
            let object_id = store.get_next_object_id(transaction.txn_guard()).await.unwrap();
            let (key, unwrapped_key) =
                crypt.create_key(object_id.get(), KeyPurpose::Data).await.unwrap();
            ObjectStore::create_object_with_key(
                &store,
                &mut transaction,
                object_id,
                HandleOptions::default(),
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await
            .expect("create_object failed")
        } else {
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed")
        };

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        if write_object_test_data {
            // TEST_DATA_OFFSET is not block-aligned, so pad the front of the buffer to match
            // the in-block alignment of the write.
            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
            object
                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
                .await
                .expect("write failed");
        }
        transaction.commit().await.expect("commit failed");
        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
        object
    }
2057
    /// Creates a test filesystem together with a test object (see [`create_object_with_key`]).
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let object = create_object_with_key(fs.clone(), crypt, write_object_test_data).await;
        (fs, object)
    }
2066
    /// Convenience wrapper: encrypted test object pre-populated with TEST_DATA.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), true).await
    }
2070
    /// Convenience wrapper: encrypted test object with no data written (still truncated to
    /// TEST_OBJECT_SIZE).
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), false).await
    }
2075
2076    #[fuchsia::test]
2077    async fn test_zero_buf_len_read() {
2078        let (fs, object) = test_filesystem_and_object().await;
2079        let mut buf = object.allocate_buffer(0).await;
2080        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
2081        fs.close().await.expect("Close failed");
2082    }
2083
    #[fuchsia::test]
    async fn test_beyond_eof_read() {
        // A read that straddles EOF must be truncated at the file size: the returned length
        // covers only the in-file bytes, and the rest of the caller's buffer is left untouched.
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        // Buffer deliberately extends one byte past EOF.
        let mut buf = object.allocate_buffer(align + len + 1).await;
        // Poison the buffer so bytes the read did not write are detectable.
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        // The in-file bytes read back as zeros (nothing was written this close to EOF)...
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        // ...while everything past EOF still holds the poison value.
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
2100
    #[fuchsia::test]
    async fn test_beyond_eof_read_from() {
        // Same EOF-truncation behaviour as test_beyond_eof_read, but exercised through the
        // deref'd handle's read, which takes an explicit attribute id as its first argument
        // (0 here — presumably the default data attribute; confirm against
        // DEFAULT_DATA_ATTRIBUTE_ID).
        let (fs, object) = test_filesystem_and_object().await;
        let handle = &*object;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        // Poison the buffer so untouched bytes are detectable.
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            handle.read(0, (offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        // In-file bytes are zeros; the byte past EOF keeps the poison value.
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
2118
    #[fuchsia::test]
    async fn test_beyond_eof_read_unchecked() {
        // read_unchecked requires the caller to hold the attribute's read lock, and — per the
        // assertion below — zeroes the entire buffer, including the portion past EOF (unlike
        // read(), which leaves bytes past EOF untouched).
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        // Poison the buffer so we can tell which bytes the read wrote.
        buf.as_mut_slice().fill(123u8);
        // Take the read lock on (store, object, attribute 0) that read_unchecked expects the
        // caller to hold.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                object.store().store_object_id,
                object.object_id(),
                0,
            )])
            .await;
        object
            .read_unchecked(0, (offset - align) as u64, buf.as_mut(), &guard)
            .await
            .expect("read failed");
        // All bytes from `align` on — including the one past EOF — come back zeroed.
        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
        fs.close().await.expect("Close failed");
    }
2142
2143    #[fuchsia::test]
2144    async fn test_read_sparse() {
2145        let (fs, object) = test_filesystem_and_object().await;
2146        // Deliberately read not right to eof.
2147        let len = TEST_OBJECT_SIZE as usize - 1;
2148        let mut buf = object.allocate_buffer(len).await;
2149        buf.as_mut_slice().fill(123u8);
2150        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2151        let mut expected = vec![0; len];
2152        let offset = TEST_DATA_OFFSET as usize;
2153        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2154        assert_eq!(buf.as_slice()[..len], expected[..]);
2155        fs.close().await.expect("Close failed");
2156    }
2157
    #[fuchsia::test]
    async fn test_read_after_writes_interspersed_with_flush() {
        // Data written before a store flush must read back correctly alongside data written
        // after the flush.
        let (fs, object) = test_filesystem_and_object().await;

        object.owner().flush().await.expect("flush failed");

        // Write more test data to the first block of the file.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");

        // Read back almost the whole object (poisoned buffer so missing writes would show).
        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);

        // Expected contents: TEST_DATA at its original offset (written pre-flush) plus the
        // copy just written at offset 0; everything else is sparse and reads as zeros.
        let mut expected = vec![0u8; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice(), &expected);
        fs.close().await.expect("Close failed");
    }
2181
    #[fuchsia::test]
    async fn test_read_after_truncate_and_extend() {
        // Reads must correctly stitch together live extents and the zero-filled gap left by a
        // truncate followed by an extending write.
        let (fs, object) = test_filesystem_and_object().await;

        // Arrange for there to be <extent><deleted-extent><extent>.
        // NOTE(review): the 0..512 / 512..1024 / 1024..1536 boundaries in the comments below
        // assume 512-byte blocks; other tests in this file assert fs.block_size() == 4096 —
        // confirm whether these comments are stale.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        // This adds an extent at 0..512.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
        // This deletes 512..1024.
        object.truncate(3).await.expect("truncate failed");
        let data = b"foo";
        let offset = 1500u64;
        let align = (offset % fs.block_size() as u64) as usize;
        let mut buf = object.allocate_buffer(align + data.len()).await;
        buf.as_mut_slice()[align..].copy_from_slice(data);
        // This adds 1024..1536.
        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");

        // Read across all three regions: surviving prefix, zeroed gap, and the new tail.
        const LEN1: usize = 1503;
        let mut buf = object.allocate_buffer(LEN1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
        let mut expected = [0; LEN1];
        // Only the first 3 bytes survived the truncate(3).
        expected[..3].copy_from_slice(&TEST_DATA[..3]);
        expected[1500..].copy_from_slice(b"foo");
        assert_eq!(buf.as_slice(), &expected);

        // Also test a read that ends midway through the deleted extent.
        const LEN2: usize = 601;
        let mut buf = object.allocate_buffer(LEN2).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
        assert_eq!(buf.as_slice(), &expected[..LEN2]);
        fs.close().await.expect("Close failed");
    }
2218
    #[fuchsia::test]
    async fn test_read_whole_blocks_with_multiple_objects() {
        // Interleaves block-aligned writes to two objects in the same store and checks that
        // each object reads back only its own data (0xaf vs 0xef fill patterns).
        let (fs, object) = test_filesystem_and_object().await;
        let block_size = object.block_size() as usize;
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

        // Create a second object in the same store.
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object2 =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        let mut ef_buffer = object.allocate_buffer(block_size).await;
        ef_buffer.as_mut_slice().fill(0xef);
        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");

        // Second block for each object, then extend the first object with a sparse tail block.
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object
            .write_or_append(Some(block_size as u64), buffer.as_ref())
            .await
            .expect("write failed");
        object.truncate(3 * block_size as u64).await.expect("truncate failed");
        object2
            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
            .await
            .expect("write failed");

        // object: two 0xaf blocks plus one zeroed (sparse) block; object2: two 0xef blocks.
        let mut buffer = object.allocate_buffer(4 * block_size).await;
        buffer.as_mut_slice().fill(123);
        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
        fs.close().await.expect("Close failed");
    }
2263
    #[fuchsia::test]
    async fn test_alignment() {
        // Exercises writes with every combination of aligned/unaligned start and end offsets,
        // comparing the on-disk object against an in-memory mirror after each write.
        let (fs, object) = test_filesystem_and_object().await;

        struct AlignTest {
            // Byte value written by the next call to test(); incremented each call.
            fill: u8,
            object: DataObjectHandle<ObjectStore>,
            // In-memory copy of the object's expected contents.
            mirror: Vec<u8>,
        }

        impl AlignTest {
            // Snapshots the object's current contents as the initial mirror.
            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
                let mirror = {
                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
                    buf.as_slice().to_vec()
                };
                Self { fill: 0, object, mirror }
            }

            // Fills |range| of self.object with a byte value (self.fill) and mirrors the same
            // operation to an in-memory copy of the object.
            // Each subsequent call bumps the value of fill.
            // It is expected that the object and its mirror maintain identical content.
            async fn test(&mut self, range: Range<u64>) {
                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
                self.fill += 1;
                buf.as_mut_slice().fill(self.fill);
                self.object
                    .write_or_append(Some(range.start), buf.as_ref())
                    .await
                    .expect("write_or_append failed");
                // Grow the mirror (zero-filled) if the write extended the object.
                if range.end > self.mirror.len() as u64 {
                    self.mirror.resize(range.end as usize, 0);
                }
                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
                // Read one byte more than the mirror to verify the reported length is exact.
                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
                assert_eq!(
                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
                    self.mirror.len()
                );
                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
            }
        }

        let block_size = object.block_size() as u64;
        let mut align = AlignTest::new(object).await;

        // Fill the object to start with (with 1).
        align.test(0..2 * block_size + 1).await;

        // Unaligned head (fills with 2, overwrites that with 3).
        align.test(1..block_size).await;
        align.test(1..2 * block_size).await;

        // Unaligned tail (fills with 4 and 5).
        align.test(0..block_size - 1).await;
        align.test(0..2 * block_size - 1).await;

        // Both unaligned (fills with 6 and 7).
        align.test(1..block_size - 1).await;
        align.test(1..2 * block_size - 1).await;

        fs.close().await.expect("Close failed");
    }
2329
2330    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
2331        let allocator = fs.allocator();
2332        let allocated_before = allocator.get_allocated_bytes();
2333        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2334        object
2335            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
2336            .await
2337            .expect("preallocate_range failed");
2338        transaction.commit().await.expect("commit failed");
2339        assert!(object.get_size() < 1048576);
2340        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2341        object
2342            .preallocate_range(&mut transaction, &mut (0..1048576))
2343            .await
2344            .expect("preallocate_range failed");
2345        transaction.commit().await.expect("commit failed");
2346        assert_eq!(object.get_size(), 1048576);
2347        // Check that it didn't reallocate the space for the existing extent
2348        let allocated_after = allocator.get_allocated_bytes();
2349        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
2350
2351        let mut buf = object
2352            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
2353            .await;
2354        buf.as_mut_slice().fill(47);
2355        object
2356            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
2357            .await
2358            .expect("write failed");
2359        buf.as_mut_slice().fill(95);
2360        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
2361        object
2362            .overwrite(offset, buf.as_mut(), OverwriteOptions::default())
2363            .await
2364            .expect("write failed");
2365
2366        // Make sure there were no more allocations.
2367        assert_eq!(allocator.get_allocated_bytes(), allocated_after);
2368
2369        // Read back the data and make sure it is what we expect.
2370        let mut buf = object.allocate_buffer(104876).await;
2371        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2372        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
2373        assert_eq!(
2374            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
2375            TEST_DATA
2376        );
2377        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
2378    }
2379
2380    #[fuchsia::test]
2381    async fn test_preallocate_range() {
2382        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2383        test_preallocate_common(&fs, object).await;
2384        fs.close().await.expect("Close failed");
2385    }
2386
2387    // This is identical to the previous test except that we flush so that extents end up in
2388    // different layers.
2389    #[fuchsia::test]
2390    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
2391        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2392        object.owner().flush().await.expect("flush failed");
2393        test_preallocate_common(&fs, object).await;
2394        fs.close().await.expect("Close failed");
2395    }
2396
    #[fuchsia::test]
    async fn test_already_preallocated() {
        // Preallocating a range that is already backed by data (the block containing
        // TEST_DATA) must not allocate any new space.
        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
        let allocator = fs.allocator();
        let allocated_before = allocator.get_allocated_bytes();
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        // Round TEST_DATA_OFFSET down to a block boundary so the range covers the data block.
        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
        object
            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");
        // Check that it didn't reallocate any new space.
        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
        fs.close().await.expect("Close failed");
    }
2413
    #[fuchsia::test]
    async fn test_overwrite_when_preallocated_at_start_of_file() {
        // Verifies overwrite() semantics against preallocated vs. unallocated regions:
        // it fails on unallocated blocks unless allow_allocations is set, and succeeds
        // without allocations inside a preallocated range.
        //
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        // Reopen to get a fresh handle on the same object.
        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        assert_eq!(fs.block_size(), 4096);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // First try to overwrite without allowing allocations
        // We expect this to fail, since nothing is allocated yet
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // Now preallocate some space (exactly one block)
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Now try the same overwrite command as before, it should work this time,
        // even with allocations disabled...
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
        // one block earlier at offset 0
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // We can't assert anything about the existing bytes, because they haven't been allocated
        // yet and they could contain any values
        object
            .overwrite(
                4096,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2500
    #[fuchsia::test]
    async fn test_overwrite_large_buffer_and_file_with_many_holes() {
        // Builds a file with several disjoint preallocated ranges separated by holes, then
        // checks that overwrite() (a) fails in holes without allow_allocations, (b) succeeds
        // without allocations inside preallocated ranges, and (c) can span holes and
        // non-holes in a single large write when allocations are allowed.
        //
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        // Reopen to get a fresh handle on the same object.
        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        // Let's create some non-holes
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Preallocation extended the file to the end of the last range.
        assert_eq!(object.get_size(), 524288);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // We shouldn't be able to overwrite in the holes if new allocations aren't enabled
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(8192, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(32768, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(131072, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // But we should be able to overwrite in the prealloc'd areas without needing allocations
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(16384, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(16384, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(16384, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(65536, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(65536, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(65536, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(262144, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(262144, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(262144, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now let's try to do a huge overwrite, that spans over many holes and non-holes
        let mut huge_write_buf = object.allocate_buffer(524288).await;
        huge_write_buf.as_mut_slice().fill(96);

        // With allocations disabled, the big overwrite should fail...
        object
            .overwrite(0, huge_write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but it should work when allocations are enabled
        object
            .overwrite(
                0,
                huge_write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(524288).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[96; 524288]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2655
    #[fuchsia::test]
    async fn test_overwrite_when_unallocated_at_start_of_file() {
        // With no preallocation at all, overwrite() must fail unless allow_allocations is
        // set, in which case it allocates and writes the block.
        //
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        // Reopen to get a fresh handle on the same object.
        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        assert_eq!(fs.block_size(), 4096);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // First try to overwrite without allowing allocations
        // We expect this to fail, since nothing is allocated yet
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // Now try the same overwrite command as before, but allow allocations
        object
            .overwrite(
                0,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now try to overwrite at the next block. This should fail if allocations are disabled
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // ... but it should work if allocations are enabled
        object
            .overwrite(
                4096,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2730
    #[fuchsia::test]
    async fn test_overwrite_can_extend_a_file() {
        // An overwrite ending past EOF must grow the file — but only when allow_allocations
        // is set; without it the write fails.
        //
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        // Reopen to get a fresh handle on the same object.
        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // Let's try to fill up the last block, and increase the file size in doing so
        let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);

        // Expected to fail with allocations disabled
        object
            .overwrite(last_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                last_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // The write filled out the block containing the old EOF, growing the file to 2 blocks.
        assert_eq!(object.get_size(), 8192);

        // Let's try to write at the next block, too
        let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();

        // Expected to fail with allocations disabled
        object
            .overwrite(next_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                next_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Writing a whole block at the rounded-up offset grows the file to 3 blocks.
        assert_eq!(object.get_size(), 12288);

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2813
2814    #[fuchsia::test]
2815    async fn test_enable_verity() {
2816        let fs: OpenFxFilesystem = test_filesystem().await;
2817        let mut transaction = fs
2818            .clone()
2819            .new_transaction(lock_keys![], Options::default())
2820            .await
2821            .expect("new_transaction failed");
2822        let store = fs.root_store();
2823        let object = Arc::new(
2824            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2825                .await
2826                .expect("create_object failed"),
2827        );
2828
2829        transaction.commit().await.unwrap();
2830
2831        object
2832            .enable_verity(fio::VerificationOptions {
2833                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2834                salt: Some(vec![]),
2835                ..Default::default()
2836            })
2837            .await
2838            .expect("set verified file metadata failed");
2839
2840        let handle =
2841            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2842                .await
2843                .expect("open_object failed");
2844
2845        assert!(handle.is_verified_file());
2846
2847        fs.close().await.expect("Close failed");
2848    }
2849
2850    #[fuchsia::test]
2851    async fn test_enable_verity_large_file() {
2852        // Need to make a large FakeDevice to create space for a 67 MB file.
2853        let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2854        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2855        let root_store = fs.root_store();
2856        let mut transaction = fs
2857            .clone()
2858            .new_transaction(lock_keys![], Options::default())
2859            .await
2860            .expect("new_transaction failed");
2861
2862        let handle = ObjectStore::create_object(
2863            &root_store,
2864            &mut transaction,
2865            HandleOptions::default(),
2866            None,
2867        )
2868        .await
2869        .expect("failed to create object");
2870        transaction.commit().await.expect("commit failed");
2871        let mut offset = 0;
2872
2873        // Write a file big enough to trigger multiple transactions on enable_verity().
2874        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2875        buf.as_mut_slice().fill(1);
2876        for _ in 0..130 {
2877            handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2878            offset += WRITE_ATTR_BATCH_SIZE as u64;
2879        }
2880
2881        handle
2882            .enable_verity(fio::VerificationOptions {
2883                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2884                salt: Some(vec![]),
2885                ..Default::default()
2886            })
2887            .await
2888            .expect("set verified file metadata failed");
2889
2890        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2891        offset = 0;
2892        for _ in 0..130 {
2893            handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2894            assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2895            offset += WRITE_ATTR_BATCH_SIZE as u64;
2896        }
2897
2898        fsck(fs.clone()).await.expect("fsck failed");
2899        fs.close().await.expect("Close failed");
2900    }
2901
    // Simulates a crash part-way through enable_verity (a partially-written merkle attribute plus
    // its graveyard marker), then checks that fsck passes, that the interrupted state survives a
    // read-only mount, and that enable_verity can be retried successfully after a normal remount.
    #[fuchsia::test]
    async fn test_retry_enable_verity_on_reboot() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Add a graveyard entry for the merkle attribute, mirroring what enable_verity does
            // before it starts writing so an interrupted write gets cleaned up on next mount.
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // A read-only mount must not attempt any cleanup; fsck should still pass on the
        // interrupted state.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_store = fs.root_store();
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open_object failed");
        // Retrying enable_verity after the reboot should succeed despite the leftover state.
        handle
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");

        // `flush` will ensure that initial reap fully processes all the graveyard entries. This
        // isn't strictly necessary for the test to pass (the graveyard marker was already
        // processed during `enable_verity`), but it does help catch bugs, such as the attribute
        // graveyard entry not being removed upon processing.
        fs.graveyard().flush().await;
        // The merkle attribute should now hold a parseable fsverity descriptor.
        assert!(
            FsVerityDescriptor::from_bytes(
                &handle
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await
                    .expect("read_attr failed")
                    .expect("No attr found"),
                handle.block_size() as usize
            )
            .is_ok()
        );
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
3000
3001    #[fuchsia::test]
3002    async fn test_verify_data_corrupt_file() {
3003        let fs: OpenFxFilesystem = test_filesystem().await;
3004        let mut transaction = fs
3005            .clone()
3006            .new_transaction(lock_keys![], Options::default())
3007            .await
3008            .expect("new_transaction failed");
3009        let store = fs.root_store();
3010        let object = Arc::new(
3011            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3012                .await
3013                .expect("create_object failed"),
3014        );
3015
3016        transaction.commit().await.unwrap();
3017
3018        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3019        buf.as_mut_slice().fill(123);
3020        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3021
3022        object
3023            .enable_verity(fio::VerificationOptions {
3024                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3025                salt: Some(vec![]),
3026                ..Default::default()
3027            })
3028            .await
3029            .expect("set verified file metadata failed");
3030
3031        // Change file contents and ensure verification fails
3032        buf.as_mut_slice().fill(234);
3033        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3034        object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
3035
3036        fs.close().await.expect("Close failed");
3037    }
3038
    // TODO(https://fxbug.dev/450398331): More tests to be added when this can support writing the
    // f2fs format natively. For now, relying on tests inside of the f2fs_reader to exercise more
    // paths.
    //
    // Enables verity the normal (fxfs) way, then hand-rewrites the attribute records so the
    // merkle metadata looks like the f2fs layout (descriptor stored inside the merkle attribute),
    // and checks that the file still opens as verified and reads back fully.
    #[fuchsia::test]
    async fn test_parse_f2fs_verity() {
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        transaction.commit().await.unwrap();
        let file_size = fs.block_size() * 2;
        // Write over one block to make there be leaf hashes.
        {
            let mut buf = object.allocate_buffer(file_size as usize).await;
            buf.as_mut_slice().fill(64);
            assert_eq!(
                object.write_or_append(None, buf.as_ref()).await.expect("Writing to file."),
                file_size
            );
        }

        // Enable verity normally, then shift the type.
        object
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");
        let (verity_info, root_hash) = object.get_descriptor().unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::Object {
                    store_object_id: store.store_object_id(),
                    object_id: object.object_id()
                }],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Rewrite the data attribute's verity metadata as the F2fs variant, pointing at the
        // two-block merkle region.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::verified_attribute(
                    file_size,
                    FsverityMetadata::F2fs(0..(fs.block_size() * 2)),
                ),
            ),
        );
        // Grow the merkle attribute record to two blocks so it can also hold the descriptor.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::attribute(fs.block_size() * 2, false),
            ),
        );
        {
            // Build a raw fsverity descriptor from the values recorded by enable_verity and
            // write it into the second block of the merkle attribute.
            let descriptor = FsVerityDescriptorRaw::new(
                fio::HashAlgorithm::Sha256,
                fs.block_size(),
                file_size,
                root_hash.as_slice(),
                match &verity_info.salt {
                    Some(salt) => salt.as_slice(),
                    None => [0u8; 0].as_slice(),
                },
            )
            .expect("Creating descriptor");
            let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
            descriptor.write_to_slice(buf.as_mut_slice()).expect("Writing descriptor to buf");
            object
                .multi_write(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &[fs.block_size()..(fs.block_size() * 2)],
                    buf.as_mut(),
                )
                .await
                .expect("Writing descriptor");
        }
        transaction.commit().await.unwrap();

        // Reopen: the handle must recognize the f2fs-style metadata as a verified file.
        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(handle.is_verified_file());

        // A full verified read of the file should succeed.
        let mut buf = object.allocate_buffer(file_size as usize).await;
        assert_eq!(
            handle.read(0, buf.as_mut()).await.expect("Read whole file."),
            file_size as usize
        );

        fs.close().await.expect("Close failed");
    }
3157
3158    #[fuchsia::test]
3159    async fn test_verify_data_corrupt_tree() {
3160        let fs: OpenFxFilesystem = test_filesystem().await;
3161        let object_id = {
3162            let store = fs.root_store();
3163            let mut transaction = fs
3164                .clone()
3165                .new_transaction(lock_keys![], Options::default())
3166                .await
3167                .expect("new_transaction failed");
3168            let object = Arc::new(
3169                ObjectStore::create_object(
3170                    &store,
3171                    &mut transaction,
3172                    HandleOptions::default(),
3173                    None,
3174                )
3175                .await
3176                .expect("create_object failed"),
3177            );
3178            let object_id = object.object_id();
3179
3180            transaction.commit().await.unwrap();
3181
3182            let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3183            buf.as_mut_slice().fill(123);
3184            object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3185
3186            object
3187                .enable_verity(fio::VerificationOptions {
3188                    hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3189                    salt: Some(vec![]),
3190                    ..Default::default()
3191                })
3192                .await
3193                .expect("set verified file metadata failed");
3194            object.read(0, buf.as_mut()).await.expect("verified read");
3195
3196            // Corrupt the merkle tree before closing.
3197            let mut merkle = object
3198                .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
3199                .await
3200                .unwrap()
3201                .expect("Reading merkle tree");
3202            merkle[0] = merkle[0].wrapping_add(1);
3203            object
3204                .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &*merkle)
3205                .await
3206                .expect("Overwriting merkle");
3207
3208            object_id
3209        }; // Close object.
3210
3211        // Reopening the object should complain about the corrupted merkle tree.
3212        assert!(
3213            ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default(), None)
3214                .await
3215                .is_err()
3216        );
3217        fs.close().await.expect("Close failed");
3218    }
3219
3220    #[fuchsia::test]
3221    async fn test_extend() {
3222        let fs = test_filesystem().await;
3223        let handle;
3224        let mut transaction = fs
3225            .clone()
3226            .new_transaction(lock_keys![], Options::default())
3227            .await
3228            .expect("new_transaction failed");
3229        let store = fs.root_store();
3230        handle =
3231            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3232                .await
3233                .expect("create_object failed");
3234
3235        // As of writing, an empty filesystem has two 512kiB superblock extents and a little over
3236        // 256kiB of additional allocations (journal, etc) so we start use a 'magic' starting point
3237        // of 2MiB here.
3238        const START_OFFSET: u64 = 2048 * 1024;
3239        handle
3240            .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
3241            .await
3242            .expect("extend failed");
3243        transaction.commit().await.expect("commit failed");
3244        let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
3245        buf.as_mut_slice().fill(123);
3246        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3247        buf.as_mut_slice().fill(67);
3248        handle.read(0, buf.as_mut()).await.expect("read failed");
3249        assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
3250        fs.close().await.expect("Close failed");
3251    }
3252
3253    #[fuchsia::test]
3254    async fn test_truncate_deallocates_old_extents() {
3255        let (fs, object) = test_filesystem_and_object().await;
3256        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3257        buf.as_mut_slice().fill(0xaa);
3258        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3259
3260        let allocator = fs.allocator();
3261        let allocated_before = allocator.get_allocated_bytes();
3262        object.truncate(fs.block_size() as u64).await.expect("truncate failed");
3263        let allocated_after = allocator.get_allocated_bytes();
3264        assert!(
3265            allocated_after < allocated_before,
3266            "before = {} after = {}",
3267            allocated_before,
3268            allocated_after
3269        );
3270        fs.close().await.expect("Close failed");
3271    }
3272
3273    #[fuchsia::test]
3274    async fn test_truncate_zeroes_tail_block() {
3275        let (fs, object) = test_filesystem_and_object().await;
3276
3277        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
3278        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
3279            .await
3280            .expect("truncate failed");
3281
3282        let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3283        let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
3284        object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
3285
3286        let mut expected = TEST_DATA.to_vec();
3287        expected[3..].fill(0);
3288        assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
3289    }
3290
    // End-to-end test of trimming: a truncated-but-not-yet-deallocated file must get trimmed by
    // the graveyard, including across crash/replay boundaries.  A post-commit hook fscks and
    // replays a device snapshot after every transaction.  Two passes: a regular object, then the
    // same object moved into the graveyard (which must get tombstoned instead).
    #[fuchsia::test]
    async fn test_trim() {
        // Format a new filesystem.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let block_size = fs.block_size();
        root_volume(fs.clone())
            .await
            .expect("root_volume failed")
            .new_volume("test", NewChildStoreOptions::default())
            .await
            .expect("volume failed");
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // To test trim, we open the filesystem and set up a post commit hook that runs after every
        // transaction.  When the hook triggers, we can fsck the volume, take a snapshot of the
        // device and check that it gets replayed correctly on the snapshot.  We can check that the
        // graveyard trims the file as expected.
        #[derive(Default)]
        struct Context {
            store: Option<Arc<ObjectStore>>,
            object_id: Option<u64>,
        }
        let shared_context = Arc::new(Mutex::new(Context::default()));

        // Big enough that truncating it needs more than TRANSACTION_MUTATION_THRESHOLD mutations,
        // i.e. more than one transaction.
        let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;

        // Wait for an object to get tombstoned by the graveyard.
        async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
            loop {
                if let Err(e) =
                    ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
                {
                    assert!(
                        FxfsError::NotFound.matches(&e),
                        "open_object didn't fail with NotFound: {:?}",
                        e
                    );
                    break;
                }
                // The graveyard should eventually tombstone the object.
                fasync::Timer::new(std::time::Duration::from_millis(100)).await;
            }
        }

        // Checks to see if the object needs to be trimmed.
        async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
            let root_directory = Directory::open(store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let oid = root_directory.lookup("foo").await.expect("lookup failed");
            if let Some((oid, _, _)) = oid {
                let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");
                let props = object.get_properties().await.expect("get_properties failed");
                // Allocated bytes with zero content size means a trim is still outstanding.
                if props.allocated_size > 0 && props.data_attribute_size == 0 {
                    Some(object)
                } else {
                    None
                }
            } else {
                None
            }
        }

        let shared_context_clone = shared_context.clone();
        let post_commit = move || {
            let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
            let shared_context = shared_context_clone.clone();
            async move {
                // First run fsck on the current filesystem.
                let options = FsckOptions {
                    fail_on_warning: true,
                    no_lock: true,
                    on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                    ..Default::default()
                };
                let fs = store.filesystem();

                fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");

                // Now check that we can replay this correctly.
                fs.sync(SyncOptions { flush_device: true, ..Default::default() })
                    .await
                    .expect("sync failed");
                let device = fs.device().snapshot().expect("snapshot failed");

                let object_id = shared_context.lock().object_id.clone();

                // object_id is only set on the second pass; skip the initial reap on the first
                // pass so the replayed filesystem doesn't trim before we can observe the state.
                let fs2 = FxFilesystemBuilder::new()
                    .skip_initial_reap(object_id.is_none())
                    .open(device)
                    .await
                    .expect("open failed");

                // If the "foo" file exists check that allocated size matches content size.
                let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
                let store =
                    root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

                if let Some(oid) = object_id {
                    // For the second pass, the object should get tombstoned.
                    expect_tombstoned(&store, oid).await;
                } else if let Some(object) = needs_trim(&store).await {
                    // Extend the file and make sure that it is correctly trimmed.
                    object.truncate(object_size).await.expect("truncate failed");
                    let mut buf = object.allocate_buffer(block_size as usize).await;
                    object
                        .read(object_size - block_size * 2, buf.as_mut())
                        .await
                        .expect("read failed");
                    assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);

                    // Remount, this time with the graveyard performing an initial reap and the
                    // object should get trimmed.
                    let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
                        .await
                        .expect("open failed");
                    let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                    let store = root_vol
                        .volume("test", StoreOptions::default())
                        .await
                        .expect("volume failed");
                    while needs_trim(&store).await.is_some() {
                        // The object has been truncated, but still has some data allocated to
                        // it.  The graveyard should trim the object eventually.
                        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
                    }

                    // Run fsck.
                    fsck_with_options(fs.clone(), &options)
                        .await
                        .expect("fsck_with_options failed");
                    fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                        .await
                        .expect("fsck_volume_with_options failed");
                    fs.close().await.expect("close failed");
                }

                // Run fsck on fs2.
                fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");
                fs2.close().await.expect("close failed");
            }
            .boxed()
        };

        let fs = FxFilesystemBuilder::new()
            .post_commit_hook(post_commit)
            .open(device)
            .await
            .expect("open failed");

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

        shared_context.lock().store = Some(store.clone());

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");

        let object;
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        object = root_directory
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_object failed");
        transaction.commit().await.expect("commit failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        // Two passes: first with a regular object, and then with that object moved into the
        // graveyard.
        let mut pass = 0;
        loop {
            // Create enough extents in it such that when we truncate the object it will require
            // more than one transaction.
            let mut buf = object.allocate_buffer(5).await;
            buf.as_mut_slice().fill(1);
            // Write every other block.
            for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
                object
                    .txn_write(&mut transaction, offset, buf.as_ref())
                    .await
                    .expect("write failed");
            }
            transaction.commit().await.expect("commit failed");
            // This should take up more than one transaction.
            WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");

            if pass == 1 {
                break;
            }

            // Store the object ID so that we can make sure the object is always tombstoned
            // after remount (see above).
            shared_context.lock().object_id = Some(object.object_id());

            transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(store.store_object_id(), store.root_directory_object_id()),
                        LockKey::object(store.store_object_id(), object.object_id()),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");

            // Move the object into the graveyard.
            replace_child(&mut transaction, None, (&root_directory, "foo"))
                .await
                .expect("replace_child failed");
            store.add_to_graveyard(&mut transaction, object.object_id());

            pass += 1;
        }

        fs.close().await.expect("Close failed");
    }
3537
    #[fuchsia::test]
    async fn test_adjust_refs() {
        // Exercises refcount adjustment on an object: bumping refs, dropping them to zero,
        // and verifying that allocated space is only released when the object is tombstoned.
        let (fs, object) = test_filesystem_and_object().await;
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Bump the refcount (presumably 1 -> 2 for a freshly created file).  The `false`
        // return value appears to mean "object does not need cleanup" — TODO(review): confirm
        // against the `adjust_refs` contract.
        assert_eq!(
            store
                .adjust_refs(&mut transaction, object.object_id(), 1)
                .await
                .expect("adjust_refs failed"),
            false
        );
        transaction.commit().await.expect("commit failed");

        let allocator = fs.allocator();
        let allocated_before = allocator.get_allocated_bytes();
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Drop the refcount back down by 2 (to zero); `true` now signals the object is ready
        // to be cleaned up.
        assert_eq!(
            store
                .adjust_refs(&mut transaction, object.object_id(), -2)
                .await
                .expect("adjust_refs failed"),
            true
        );
        transaction.commit().await.expect("commit failed");

        // Hitting zero refs must not free any extents by itself...
        assert_eq!(allocator.get_allocated_bytes(), allocated_before);

        store
            .tombstone_object(
                object.object_id(),
                Options { borrow_metadata_space: true, ..Default::default() },
            )
            .await
            .expect("purge failed");

        // ...only tombstoning actually releases the object's single allocated block.
        assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);

        // We need to remove the directory entry, too, otherwise fsck will complain
        {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            // Overwrite the child entry with `None`, effectively deleting it.
            transaction.add(
                store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, DirType::Normal),
                    ObjectValue::None,
                ),
            );
            transaction.commit().await.expect("commit failed");
        }

        // Verify the resulting filesystem is fully consistent, treating warnings as failures.
        fsck_with_options(
            fs.clone(),
            &FsckOptions {
                fail_on_warning: true,
                on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                ..Default::default()
            },
        )
        .await
        .expect("fsck_with_options failed");

        fs.close().await.expect("Close failed");
    }
3629
3630    #[fuchsia::test]
3631    async fn test_locks() {
3632        let (fs, object) = test_filesystem_and_object().await;
3633        let (send1, recv1) = channel();
3634        let (send2, recv2) = channel();
3635        let (send3, recv3) = channel();
3636        let done = Mutex::new(false);
3637        let mut futures = FuturesUnordered::new();
3638        futures.push(
3639            async {
3640                let mut t = object.new_transaction().await.expect("new_transaction failed");
3641                send1.send(()).unwrap(); // Tell the next future to continue.
3642                send3.send(()).unwrap(); // Tell the last future to continue.
3643                recv2.await.unwrap();
3644                let mut buf = object.allocate_buffer(5).await;
3645                buf.as_mut_slice().copy_from_slice(b"hello");
3646                object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
3647                // This is a halting problem so all we can do is sleep.
3648                fasync::Timer::new(Duration::from_millis(100)).await;
3649                assert!(!*done.lock());
3650                t.commit().await.expect("commit failed");
3651            }
3652            .boxed(),
3653        );
3654        futures.push(
3655            async {
3656                recv1.await.unwrap();
3657                // Reads should not block.
3658                let offset = TEST_DATA_OFFSET as usize;
3659                let align = offset % fs.block_size() as usize;
3660                let len = TEST_DATA.len();
3661                let mut buf = object.allocate_buffer(align + len).await;
3662                assert_eq!(
3663                    object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
3664                    align + TEST_DATA.len()
3665                );
3666                assert_eq!(&buf.as_slice()[align..], TEST_DATA);
3667                // Tell the first future to continue.
3668                send2.send(()).unwrap();
3669            }
3670            .boxed(),
3671        );
3672        futures.push(
3673            async {
3674                // This should block until the first future has completed.
3675                recv3.await.unwrap();
3676                let _t = object.new_transaction().await.expect("new_transaction failed");
3677                let mut buf = object.allocate_buffer(5).await;
3678                assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
3679                assert_eq!(buf.as_slice(), b"hello");
3680            }
3681            .boxed(),
3682        );
3683        while let Some(()) = futures.next().await {}
3684        fs.close().await.expect("Close failed");
3685    }
3686
3687    #[fuchsia::test(threads = 10)]
3688    async fn test_racy_reads() {
3689        let fs = test_filesystem().await;
3690        let object;
3691        let mut transaction = fs
3692            .clone()
3693            .new_transaction(lock_keys![], Options::default())
3694            .await
3695            .expect("new_transaction failed");
3696        let store = fs.root_store();
3697        object = Arc::new(
3698            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3699                .await
3700                .expect("create_object failed"),
3701        );
3702        transaction.commit().await.expect("commit failed");
3703        for _ in 0..100 {
3704            let cloned_object = object.clone();
3705            let writer = fasync::Task::spawn(async move {
3706                let mut buf = cloned_object.allocate_buffer(10).await;
3707                buf.as_mut_slice().fill(123);
3708                cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3709            });
3710            let cloned_object = object.clone();
3711            let reader = fasync::Task::spawn(async move {
3712                let wait_time = rand::random_range(0..5);
3713                fasync::Timer::new(Duration::from_millis(wait_time)).await;
3714                let mut buf = cloned_object.allocate_buffer(10).await;
3715                buf.as_mut_slice().fill(23);
3716                let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3717                // If we succeed in reading data, it must include the write; i.e. if we see the size
3718                // change, we should see the data too.  For this to succeed it requires locking on
3719                // the read size to ensure that when we read the size, we get the extents changed in
3720                // that same transaction.
3721                if amount != 0 {
3722                    assert_eq!(amount, 10);
3723                    assert_eq!(buf.as_slice(), &[123; 10]);
3724                }
3725            });
3726            writer.await;
3727            reader.await;
3728            object.truncate(0).await.expect("truncate failed");
3729        }
3730        fs.close().await.expect("Close failed");
3731    }
3732
3733    #[fuchsia::test]
3734    async fn test_allocated_size() {
3735        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3736
3737        let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3738        let mut buf = object.allocate_buffer(5).await;
3739        buf.as_mut_slice().copy_from_slice(b"hello");
3740        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3741        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3742        assert_eq!(after, before + fs.block_size() as u64);
3743
3744        // Do the same write again and there should be no change.
3745        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3746        assert_eq!(
3747            object.get_properties().await.expect("get_properties failed").allocated_size,
3748            after
3749        );
3750
3751        // extend...
3752        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3753        let offset = 1000 * fs.block_size() as u64;
3754        let before = after;
3755        object
3756            .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3757            .await
3758            .expect("extend failed");
3759        transaction.commit().await.expect("commit failed");
3760        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3761        assert_eq!(after, before + fs.block_size() as u64);
3762
3763        // truncate...
3764        let before = after;
3765        let size = object.get_size();
3766        object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3767        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3768        assert_eq!(after, before - fs.block_size() as u64);
3769
3770        // preallocate_range...
3771        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3772        let before = after;
3773        let mut file_range = offset..offset + fs.block_size() as u64;
3774        object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3775        transaction.commit().await.expect("commit failed");
3776        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3777        assert_eq!(after, before + fs.block_size() as u64);
3778        fs.close().await.expect("Close failed");
3779    }
3780
3781    #[fuchsia::test(threads = 10)]
3782    async fn test_zero() {
3783        let (fs, object) = test_filesystem_and_object().await;
3784        let expected_size = object.get_size();
3785        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3786        object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3787        transaction.commit().await.expect("commit failed");
3788        assert_eq!(object.get_size(), expected_size);
3789        let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3790        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3791        assert_eq!(
3792            &buf.as_slice()[0..expected_size as usize],
3793            vec![0u8; expected_size as usize].as_slice()
3794        );
3795        fs.close().await.expect("Close failed");
3796    }
3797
    #[fuchsia::test]
    async fn test_properties() {
        // Two `update_attributes` calls within a single transaction must merge, with the
        // later call winning for fields set by both (mtime, gid here).
        let (fs, object) = test_filesystem_and_object().await;
        const CRTIME: Timestamp = Timestamp::from_nanos(1234);
        const MTIME: Timestamp = Timestamp::from_nanos(5678);
        const CTIME: Timestamp = Timestamp::from_nanos(8765);

        // ObjectProperties can be updated through `update_attributes`.
        // `get_properties` should reflect the latest changes.
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        // First update: sets crtime, mtime, mode and gid; no change time supplied.
        object
            .update_attributes(
                &mut transaction,
                Some(&fio::MutableNodeAttributes {
                    creation_time: Some(CRTIME.as_nanos()),
                    modification_time: Some(MTIME.as_nanos()),
                    mode: Some(111),
                    gid: Some(222),
                    ..Default::default()
                }),
                None,
            )
            .await
            .expect("update_attributes failed");
        const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
        // Second update, same transaction: overrides mtime and gid, adds rdev and a change
        // time.
        object
            .update_attributes(
                &mut transaction,
                Some(&fio::MutableNodeAttributes {
                    modification_time: Some(MTIME_NEW.as_nanos()),
                    gid: Some(333),
                    rdev: Some(444),
                    ..Default::default()
                }),
                Some(CTIME),
            )
            .await
            .expect("update_timestamps failed");
        transaction.commit().await.expect("commit failed");

        // `mode` survives from the first update; mtime/gid/rdev/ctime come from the second.
        let properties = object.get_properties().await.expect("get_properties failed");
        assert_matches!(
            properties,
            ObjectProperties {
                refs: 1u64,
                allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
                data_attribute_size: TEST_OBJECT_SIZE,
                creation_time: CRTIME,
                modification_time: MTIME_NEW,
                posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
                change_time: CTIME,
                ..
            }
        );
        fs.close().await.expect("Close failed");
    }
3854
3855    #[fuchsia::test]
3856    async fn test_is_allocated() {
3857        let (fs, object) = test_filesystem_and_object().await;
3858
3859        // `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
3860        // `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
3861        let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
3862        let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
3863
3864        // Check for the case where where we have the following extent layout
3865        //       [ unallocated ][ `TEST_DATA` ]
3866        // The extents before `aligned_offset` should not be allocated
3867        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3868        assert_eq!(count, aligned_offset);
3869        assert_eq!(allocated, false);
3870
3871        let (allocated, count) =
3872            object.is_allocated(aligned_offset).await.expect("is_allocated failed");
3873        assert_eq!(count, aligned_length);
3874        assert_eq!(allocated, true);
3875
3876        // Check for the case where where we query out of range
3877        let end = aligned_offset + aligned_length;
3878        object
3879            .is_allocated(end)
3880            .await
3881            .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
3882
3883        // Check for the case where where we start querying for allocation starting from
3884        // an allocated range to the end of the device
3885        let size = 50 * fs.block_size() as u64;
3886        object.truncate(size).await.expect("extend failed");
3887
3888        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3889        assert_eq!(count, size - end);
3890        assert_eq!(allocated, false);
3891
3892        // Check for the case where where we have the following extent layout
3893        //      [ unallocated ][ `buf` ][ `buf` ]
3894        let buf_length = 5 * fs.block_size();
3895        let mut buf = object.allocate_buffer(buf_length as usize).await;
3896        buf.as_mut_slice().fill(123);
3897        let new_offset = end + 20 * fs.block_size() as u64;
3898        object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
3899        object
3900            .write_or_append(Some(new_offset + buf_length), buf.as_ref())
3901            .await
3902            .expect("write failed");
3903
3904        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3905        assert_eq!(count, new_offset - end);
3906        assert_eq!(allocated, false);
3907
3908        let (allocated, count) =
3909            object.is_allocated(new_offset).await.expect("is_allocated failed");
3910        assert_eq!(count, 2 * buf_length);
3911        assert_eq!(allocated, true);
3912
3913        // Check the case where we query from the middle of an extent
3914        let (allocated, count) = object
3915            .is_allocated(new_offset + 4 * fs.block_size())
3916            .await
3917            .expect("is_allocated failed");
3918        assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
3919        assert_eq!(allocated, true);
3920
3921        // Now, write buffer to a location already written to.
3922        // Check for the case when we the following extent layout
3923        //      [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
3924        let other_buf_length = 3 * fs.block_size();
3925        let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
3926        other_buf.as_mut_slice().fill(231);
3927        object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
3928
3929        // We still expect that `is_allocated(..)` will return that  there are 2*`buf_length bytes`
3930        // allocated from `new_offset`
3931        let (allocated, count) =
3932            object.is_allocated(new_offset).await.expect("is_allocated failed");
3933        assert_eq!(count, 2 * buf_length);
3934        assert_eq!(allocated, true);
3935
3936        // Check for the case when we the following extent layout
3937        //   [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
3938        // Mark TEST_DATA as deleted
3939        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3940        object
3941            .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
3942            .await
3943            .expect("zero failed");
3944        // Mark `other_buf` as deleted
3945        object
3946            .zero(&mut transaction, new_offset..new_offset + buf_length)
3947            .await
3948            .expect("zero failed");
3949        transaction.commit().await.expect("commit transaction failed");
3950
3951        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3952        assert_eq!(count, new_offset + buf_length);
3953        assert_eq!(allocated, false);
3954
3955        let (allocated, count) =
3956            object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
3957        assert_eq!(count, buf_length);
3958        assert_eq!(allocated, true);
3959
3960        let new_end = new_offset + buf_length + count;
3961
3962        // Check for the case where there are objects with different keys.
3963        // Case that we're checking for:
3964        //      [ unallocated ][ extent (object with different key) ][ unallocated ]
3965        let store = object.owner();
3966        let mut transaction = fs
3967            .clone()
3968            .new_transaction(lock_keys![], Options::default())
3969            .await
3970            .expect("new_transaction failed");
3971        let object2 =
3972            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3973                .await
3974                .expect("create_object failed");
3975        transaction.commit().await.expect("commit failed");
3976
3977        object2
3978            .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
3979            .await
3980            .expect("write failed");
3981
3982        // Expecting that the extent with a different key is treated like unallocated extent
3983        let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
3984        assert_eq!(count, size - new_end);
3985        assert_eq!(allocated, false);
3986
3987        fs.close().await.expect("close failed");
3988    }
3989
3990    #[fuchsia::test(threads = 10)]
3991    async fn test_read_write_attr() {
3992        let (_fs, object) = test_filesystem_and_object().await;
3993        let data = [0xffu8; 16_384];
3994        object.write_attr(20, &data).await.expect("write_attr failed");
3995        let rdata =
3996            object.read_attr(20).await.expect("read_attr failed").expect("no attribute data found");
3997        assert_eq!(&data[..], &rdata[..]);
3998
3999        assert_eq!(object.read_attr(21).await.expect("read_attr failed"), None);
4000    }
4001
    #[fuchsia::test(threads = 10)]
    async fn test_allocate_basic() {
        // `allocate` must be invisible to readers: allocated-but-unwritten blocks read back
        // as zeroes regardless of the read buffer's size or alignment, and regardless of
        // whether the read starts before, inside, or after the allocated range.
        let (fs, object) = test_filesystem_and_empty_object().await;
        let block_size = fs.block_size();
        let file_size = block_size * 10;
        object.truncate(file_size).await.unwrap();

        // One sub-block buffer, one block-aligned multi-block buffer, and one unaligned
        // multi-block buffer.
        let small_buf_size = 1024;
        let large_buf_aligned_size = block_size as usize * 2;
        let large_buf_size = block_size as usize * 2 + 1024;

        let mut small_buf = object.allocate_buffer(small_buf_size).await;
        let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
        let mut large_buf = object.allocate_buffer(large_buf_size).await;

        // Sanity-check: before any allocation the truncated (sparse) file reads as zeroes.
        assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
        assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
        assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
        assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
        assert_eq!(
            object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
            large_buf_aligned_size
        );
        assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);

        // Allocation succeeds, and without any writes to the location it shows up as zero.
        object.allocate(block_size..block_size * 3).await.unwrap();

        // Test starting before, inside, and after the allocated section with every sized buffer.
        for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
            for offset in 0..4 {
                assert_eq!(
                    object.read(block_size * offset, buf.as_mut()).await.unwrap(),
                    buf.len(),
                    "buf_index: {}, read offset: {}",
                    buf_index,
                    offset,
                );
                assert_eq!(
                    buf.as_slice(),
                    &vec![0; buf.len()],
                    "buf_index: {}, read offset: {}",
                    buf_index,
                    offset,
                );
            }
        }

        fs.close().await.expect("close failed");
    }
4052
4053    #[fuchsia::test(threads = 10)]
4054    async fn test_allocate_extends_file() {
4055        const BUF_SIZE: usize = 1024;
4056        let (fs, object) = test_filesystem_and_empty_object().await;
4057        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4058        let block_size = fs.block_size();
4059
4060        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4061        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4062
4063        assert!(TEST_OBJECT_SIZE < block_size * 4);
4064        // Allocation succeeds, and without any writes to the location it shows up as zero.
4065        object.allocate(0..block_size * 4).await.unwrap();
4066        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4067        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4068        assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
4069        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4070        assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
4071        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4072
4073        fs.close().await.expect("close failed");
4074    }
4075
4076    #[fuchsia::test(threads = 10)]
4077    async fn test_allocate_past_end() {
4078        const BUF_SIZE: usize = 1024;
4079        let (fs, object) = test_filesystem_and_empty_object().await;
4080        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4081        let block_size = fs.block_size();
4082
4083        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4084        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4085
4086        assert!(TEST_OBJECT_SIZE < block_size * 4);
4087        // Allocation succeeds, and without any writes to the location it shows up as zero.
4088        object.allocate(block_size * 4..block_size * 6).await.unwrap();
4089        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4090        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4091        assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
4092        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4093        assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
4094        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4095
4096        fs.close().await.expect("close failed");
4097    }
4098
4099    #[fuchsia::test(threads = 10)]
4100    async fn test_allocate_read_attr() {
4101        let (fs, object) = test_filesystem_and_empty_object().await;
4102        let block_size = fs.block_size();
4103        let file_size = block_size * 4;
4104        object.truncate(file_size).await.unwrap();
4105
4106        let content = object
4107            .read_attr(object.attribute_id())
4108            .await
4109            .expect("failed to read attr")
4110            .expect("attr returned none");
4111        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4112
4113        object.allocate(block_size..block_size * 3).await.unwrap();
4114
4115        let content = object
4116            .read_attr(object.attribute_id())
4117            .await
4118            .expect("failed to read attr")
4119            .expect("attr returned none");
4120        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4121
4122        fs.close().await.expect("close failed");
4123    }
4124
    #[fuchsia::test(threads = 10)]
    async fn test_allocate_existing_data() {
        // `allocate` over ranges that already contain data must preserve that data: each case
        // fills some block ranges with 0xff, snapshots the file, allocates a (possibly
        // overlapping) range, then verifies the visible contents are unchanged.
        struct Case {
            // Block-granular ranges to fill with 0xff before allocating.
            written_ranges: Vec<Range<usize>>,
            // Block-granular range to pass to `allocate`.
            allocate_range: Range<u64>,
        }
        // Covers allocations that exactly cover, straddle, sit inside, and partially overlap
        // the written range, plus interleaved and multi-run layouts.
        let cases = [
            Case { written_ranges: vec![4..7], allocate_range: 4..7 },
            Case { written_ranges: vec![4..7], allocate_range: 3..8 },
            Case { written_ranges: vec![4..7], allocate_range: 5..6 },
            Case { written_ranges: vec![4..7], allocate_range: 5..8 },
            Case { written_ranges: vec![4..7], allocate_range: 3..5 },
            Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
            Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
        ];

        for case in cases {
            let (fs, object) = test_filesystem_and_empty_object().await;
            let block_size = fs.block_size();
            let file_size = block_size * 10;
            object.truncate(file_size).await.unwrap();

            for write in &case.written_ranges {
                let write_len = (write.end - write.start) * block_size as usize;
                let mut write_buf = object.allocate_buffer(write_len).await;
                write_buf.as_mut_slice().fill(0xff);
                // All writes land inside the truncated file, so the returned size stays
                // `file_size`.
                assert_eq!(
                    object
                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
                        .await
                        .unwrap(),
                    file_size
                );
            }

            // Snapshot the whole file before allocating...
            let mut expected_buf = object.allocate_buffer(file_size as usize).await;
            assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());

            object
                .allocate(
                    case.allocate_range.start * block_size..case.allocate_range.end * block_size,
                )
                .await
                .unwrap();

            // ...and verify the allocation didn't change any visible contents.
            let mut read_buf = object.allocate_buffer(file_size as usize).await;
            assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
            assert_eq!(read_buf.as_slice(), expected_buf.as_slice());

            fs.close().await.expect("close failed");
        }
    }
4177
    /// Collects `(range, mode)` pairs for the extent records of attribute 0 of `obj` that
    /// intersect `search_range`, with each returned range clipped to `search_range` and
    /// returned in offset order.
    ///
    /// Panics if the iterator yields anything other than a live extent record
    /// (`ExtentValue::Some`) for this object's attribute 0 — including running out of records
    /// before `search_range` is exhausted.  Gaps between extents are skipped, not reported.
    async fn get_modes(
        obj: &DataObjectHandle<ObjectStore>,
        mut search_range: Range<u64>,
    ) -> Vec<(Range<u64>, ExtentMode)> {
        let mut modes = Vec::new();
        let store = obj.store();
        let tree = store.tree();
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Start at the extent record containing `search_range.start`.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                obj.object_id(),
                0,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(search_range.start)),
            )))
            .await
            .unwrap();
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                    ..
                }) if *object_id == obj.object_id() && *attribute_id == 0 => {
                    // This extent starts at or past the end of the search range; we're done.
                    if search_range.end <= range.start {
                        break;
                    }
                    // Clip the extent to the part that lies inside `search_range`.
                    let found_range = std::cmp::max(search_range.start, range.start)
                        ..std::cmp::min(search_range.end, range.end);
                    // Advance the search cursor past what was just recorded.
                    search_range.start = found_range.end;
                    modes.push((found_range, mode.clone()));
                    if search_range.start == search_range.end {
                        break;
                    }
                    iter.advance().await.unwrap();
                }
                x => panic!("looking for extent record, found this {:?}", x),
            }
        }
        modes
    }
4227
4228    async fn assert_all_overwrite(
4229        obj: &DataObjectHandle<ObjectStore>,
4230        mut search_range: Range<u64>,
4231    ) {
4232        let modes = get_modes(obj, search_range.clone()).await;
4233        for mode in modes {
4234            assert_eq!(
4235                mode.0.start, search_range.start,
4236                "missing mode in range {}..{}",
4237                search_range.start, mode.0.start
4238            );
4239            match mode.1 {
4240                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
4241                m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
4242            }
4243            assert!(
4244                mode.0.end <= search_range.end,
4245                "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
4246                search_range,
4247                mode,
4248            );
4249            search_range.start = mode.0.end;
4250        }
4251        assert_eq!(
4252            search_range.start, search_range.end,
4253            "missing mode in range {:?}",
4254            search_range
4255        );
4256    }
4257
    /// Exercises `multi_overwrite` against files prepared with a mix of regular (CoW)
    /// writes and `allocate` calls, verifying the resulting file contents and the
    /// checksums emitted into the transaction. All ranges in `Case` are in blocks.
    #[fuchsia::test(threads = 10)]
    async fn test_multi_overwrite() {
        #[derive(Debug)]
        struct Case {
            // Block ranges to fill with 0xff via regular (CoW) writes before allocating.
            pre_writes: Vec<Range<usize>>,
            // Block ranges to convert to overwrite extents via `allocate`.
            allocate_ranges: Vec<Range<u64>>,
            // Batches of block ranges; each inner vec is one `multi_overwrite` call.
            overwrites: Vec<Vec<Range<u64>>>,
        }
        let cases = [
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..2, 5..6, 7..8],
                overwrites: vec![vec![5..6]],
            },
            // Repeated overwrites of the same range.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            // Multiple ranges in a single multi_overwrite call.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..4, 4..5]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5, 5..10],
                overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4, 6..10],
                overwrites: vec![vec![2..3, 7..9]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
            },
            // Cases where allocate interacts with pre-existing CoW data.
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![4..6],
                overwrites: vec![vec![5..6]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![2..4],
                overwrites: vec![vec![2..4]],
            },
            Case {
                pre_writes: vec![3..5],
                allocate_ranges: vec![1..3, 6..7],
                overwrites: vec![vec![1..3, 6..7]],
            },
            Case {
                pre_writes: vec![1..3, 5..7, 8..9],
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..5], vec![0..5]],
            },
            // Overlapping allocate calls with no subsequent overwrites.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10, 4..6],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![3..8, 5..10],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![5..10, 3..8],
                overwrites: Vec::new(),
            },
        ];

        for (i, case) in cases.into_iter().enumerate() {
            log::info!("running case {} - {:?}", i, case);
            let (fs, object) = test_filesystem_and_empty_object().await;
            let block_size = fs.block_size();
            let file_size = block_size * 10;
            object.truncate(file_size).await.unwrap();

            // Seed the file with CoW extents filled with 0xff.
            for write in case.pre_writes {
                let write_len = (write.end - write.start) * block_size as usize;
                let mut write_buf = object.allocate_buffer(write_len).await;
                write_buf.as_mut_slice().fill(0xff);
                assert_eq!(
                    object
                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
                        .await
                        .unwrap(),
                    file_size
                );
            }

            for allocate_range in &case.allocate_ranges {
                object
                    .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
                    .await
                    .unwrap();
            }

            // Every allocated range should now be in one of the overwrite modes.
            for allocate_range in case.allocate_ranges {
                assert_all_overwrite(
                    &object,
                    allocate_range.start * block_size..allocate_range.end * block_size,
                )
                .await;
            }

            for overwrite in case.overwrites {
                // Convert block ranges to byte ranges, summing the total write length.
                let mut write_len = 0;
                let overwrite = overwrite
                    .into_iter()
                    .map(|r| {
                        write_len += (r.end - r.start) * block_size;
                        r.start * block_size..r.end * block_size
                    })
                    .collect::<Vec<_>>();
                let mut write_buf = object.allocate_buffer(write_len as usize).await;
                let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
                write_buf.as_mut_slice().copy_from_slice(&data);

                // Compute the expected post-overwrite contents: the current file contents
                // with the write data spliced into the target ranges in order.
                let mut expected_buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, expected_buf.as_mut()).await.unwrap(),
                    expected_buf.len()
                );
                let expected_buf_slice = expected_buf.as_mut_slice();
                let mut data_slice = data.as_slice();
                for r in &overwrite {
                    let len = r.length().unwrap() as usize;
                    let (copy_from, rest) = data_slice.split_at(len);
                    expected_buf_slice[r.start as usize..r.end as usize]
                        .copy_from_slice(&copy_from);
                    data_slice = rest;
                }

                let mut transaction = object.new_transaction().await.unwrap();
                object
                    .multi_overwrite(&mut transaction, 0, &overwrite, write_buf.as_mut())
                    .await
                    .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
                // Double check the emitted checksums. We should have one u64 checksum for every
                // block we wrote to disk.
                let mut checksummed_range_length = 0;
                let mut num_checksums = 0;
                for (device_range, checksums, _) in transaction.checksums() {
                    let range_len = device_range.end - device_range.start;
                    let checksums_len = checksums.len() as u64;
                    assert_eq!(range_len / checksums_len, block_size);
                    checksummed_range_length += range_len;
                    num_checksums += checksums_len;
                }
                assert_eq!(checksummed_range_length, write_len);
                assert_eq!(num_checksums, write_len / block_size);
                transaction.commit().await.unwrap();

                // Verify the file contents match the expected splice.
                let mut buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, buf.as_mut()).await.unwrap(),
                    buf.len(),
                    "failed length check on case {}",
                    i,
                );
                assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
            }

            fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
            fs.close().await.expect("close failed");
        }
    }
4494
4495    #[fuchsia::test(threads = 10)]
4496    async fn test_multi_overwrite_mode_updates() {
4497        let (fs, object) = test_filesystem_and_empty_object().await;
4498        let block_size = fs.block_size();
4499        let file_size = block_size * 10;
4500        object.truncate(file_size).await.unwrap();
4501
4502        let mut expected_bitmap = BitVec::from_elem(10, false);
4503
4504        object.allocate(0..10 * block_size).await.unwrap();
4505        assert_eq!(
4506            get_modes(&object, 0..10 * block_size).await,
4507            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4508        );
4509
4510        let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
4511        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4512        write_buf.as_mut_slice().copy_from_slice(&data);
4513        let mut transaction = object.new_transaction().await.unwrap();
4514        object
4515            .multi_overwrite(
4516                &mut transaction,
4517                0,
4518                &[2 * block_size..4 * block_size],
4519                write_buf.as_mut(),
4520            )
4521            .await
4522            .unwrap();
4523        transaction.commit().await.unwrap();
4524
4525        expected_bitmap.set(2, true);
4526        expected_bitmap.set(3, true);
4527        assert_eq!(
4528            get_modes(&object, 0..10 * block_size).await,
4529            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4530        );
4531
4532        let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
4533        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4534        write_buf.as_mut_slice().copy_from_slice(&data);
4535        let mut transaction = object.new_transaction().await.unwrap();
4536        object
4537            .multi_overwrite(
4538                &mut transaction,
4539                0,
4540                &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
4541                write_buf.as_mut(),
4542            )
4543            .await
4544            .unwrap();
4545        transaction.commit().await.unwrap();
4546
4547        expected_bitmap.set(4, true);
4548        expected_bitmap.set(6, true);
4549        assert_eq!(
4550            get_modes(&object, 0..10 * block_size).await,
4551            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4552        );
4553
4554        let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
4555        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4556        write_buf.as_mut_slice().copy_from_slice(&data);
4557        let mut transaction = object.new_transaction().await.unwrap();
4558        object
4559            .multi_overwrite(
4560                &mut transaction,
4561                0,
4562                &[
4563                    0..2 * block_size,
4564                    5 * block_size..6 * block_size,
4565                    7 * block_size..10 * block_size,
4566                ],
4567                write_buf.as_mut(),
4568            )
4569            .await
4570            .unwrap();
4571        transaction.commit().await.unwrap();
4572
4573        assert_eq!(
4574            get_modes(&object, 0..10 * block_size).await,
4575            vec![(0..10 * block_size, ExtentMode::Overwrite)]
4576        );
4577
4578        fs.close().await.expect("close failed");
4579    }
4580
4581    #[fuchsia::test(threads = 10)]
4582    async fn test_check_unwritten_zero() {
4583        let device = DeviceHolder::new(FakeDevice::new(256 * 1024, TEST_DEVICE_BLOCK_SIZE));
4584        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4585        let object = create_object_with_key(fs.clone(), Some(&new_insecure_crypt()), false).await;
4586        let block_size = fs.block_size();
4587
4588        // Set up a file with eight blocks to look like this:
4589        // | None | COW | COW | None | Overwrite(unwritten) | Overwrite(written) | None |
4590        let file_size = block_size * 7;
4591        object.truncate(file_size).await.unwrap();
4592        assert!(object.check_unwritten_zero(0..file_size).await.unwrap());
4593
4594        let mut buffer = object.allocate_buffer(block_size as usize).await;
4595        buffer.as_mut_slice().fill(1);
4596        object.write_or_append(Some(block_size), buffer.as_ref()).await.expect("write failed");
4597        object.write_or_append(Some(block_size * 2), buffer.as_ref()).await.expect("write failed");
4598
4599        object.allocate((block_size * 4)..(block_size * 6)).await.expect("Allocate failed");
4600        let mut transaction = fs
4601            .clone()
4602            .new_transaction(
4603                lock_keys![LockKey::object(object.store().store_object_id(), object.object_id(),)],
4604                Options::default(),
4605            )
4606            .await
4607            .expect("new_transaction failed");
4608        object
4609            .multi_overwrite(
4610                &mut transaction,
4611                DEFAULT_DATA_ATTRIBUTE_ID,
4612                &vec![(block_size * 5)..(block_size * 6)],
4613                buffer.as_mut(),
4614            )
4615            .await
4616            .expect("Multi overwrite");
4617        transaction.commit().await.expect("Committing overwrite");
4618
4619        // Anything touching the COW ranges should fail.
4620        assert!(!object.check_unwritten_zero(0..(block_size * 2)).await.unwrap());
4621        assert!(!object.check_unwritten_zero(block_size..(block_size * 3)).await.unwrap());
4622        assert!(!object.check_unwritten_zero((block_size * 2)..(block_size * 4)).await.unwrap());
4623
4624        // This should be fine, as the OverwritePartial should only touch the unwritten block.
4625        assert!(object.check_unwritten_zero((block_size * 3)..(block_size * 5)).await.unwrap());
4626
4627        // These should touch the written overwrite block and fail.
4628        assert!(!object.check_unwritten_zero((block_size * 4)..(block_size * 6)).await.unwrap());
4629        assert!(!object.check_unwritten_zero((block_size * 5)..(block_size * 7)).await.unwrap());
4630
4631        fs.close().await.expect("close failed");
4632    }
4633}