fxfs/object_store/data_object_handle.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::object_handle::{
10    ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20    self, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation, Options,
21    Transaction, lock_keys,
22};
23use crate::object_store::{
24    DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, HandleOwner,
25    RootDigest, StoreObjectHandle, TRANSACTION_MUTATION_THRESHOLD, TrimMode, TrimResult,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{Context, Error, anyhow, bail, ensure};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{
33    FsVerityDescriptor, FsVerityDescriptorRaw, FsVerityHasher, FsVerityHasherOptions, MerkleTree,
34    MerkleTreeBuilder,
35};
36use fuchsia_sync::Mutex;
37use futures::TryStreamExt;
38use futures::stream::FuturesUnordered;
39use fxfs_trace::trace;
40use std::cmp::min;
41use std::ops::{Deref, DerefMut, Range};
42use std::sync::Arc;
43use std::sync::atomic::{self, AtomicU64, Ordering};
44use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
45
46mod allocated_ranges;
47pub use allocated_ranges::{AllocatedRanges, RangeType};
48
/// How much data each transaction will cover when writing an attribute across batches
/// (512 KiB). Pulled from `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 524_288;
52
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
pub struct DataObjectHandle<S: HandleOwner> {
    // The untyped handle for the underlying object; `Deref` exposes its API directly.
    handle: StoreObjectHandle<S>,
    // The attribute that holds this file's data.
    attribute_id: u64,
    // Cached content size of the data attribute.
    content_size: AtomicU64,
    // Cached fsverity state machine for this file (None -> Started -> Pending -> Some).
    fsverity_state: Mutex<FsverityState>,
    // In-memory tracking of ranges allocated in overwrite mode; only used on the write path.
    overwrite_ranges: AllocatedRanges,
}
67
/// Represents the mapping of a file's contents to the physical storage backing it.
#[derive(Debug, Clone)]
pub struct FileExtent {
    // File (byte) offset where this extent begins.
    logical_offset: u64,
    // Physical byte range on the device backing this extent. Validated in `Self::new`, which is
    // what makes the `unsafe` accessors on this type sound.
    device_range: Range<u64>,
}
74
75impl FileExtent {
76    pub fn new(logical_offset: u64, device_range: Range<u64>) -> Result<Self, Error> {
77        // Ensure `device_range` is valid.
78        let length = device_range.length()?;
79        // Ensure no overflow when we calculate the end of the logical range.
80        let _ = logical_offset.checked_add(length).ok_or(FxfsError::OutOfRange)?;
81        Ok(Self { logical_offset, device_range })
82    }
83}
84
impl FileExtent {
    /// Returns the length of this extent in bytes.
    pub fn length(&self) -> u64 {
        // SAFETY: We verified that the device_range's length is valid in Self::new.
        unsafe { self.device_range.unchecked_length() }
    }

    /// Returns the file offset at which this extent begins.
    pub fn logical_offset(&self) -> u64 {
        self.logical_offset
    }

    /// Returns the range of file offsets covered by this extent.
    pub fn logical_range(&self) -> Range<u64> {
        // SAFETY: We verified logical_offset plus device_range length won't overflow in Self::new.
        unsafe { self.logical_offset..self.logical_offset.unchecked_add(self.length()) }
    }

    /// Returns the physical (device) byte range backing this extent.
    pub fn device_range(&self) -> &Range<u64> {
        &self.device_range
    }
}
104
/// Fsverity state machine for a file. Transitions go None -> Started -> Pending -> Some; see
/// `set_fsverity_state_started`, `set_fsverity_state_pending` and `finalize_fsverity_state` on
/// `DataObjectHandle` (or directly to Some on open via `set_fsverity_state_some`).
#[derive(Debug)]
pub enum FsverityState {
    /// Not a verity file; `enable_verity` has not begun.
    None,
    /// `enable_verity` has started but the merkle tree has not been built/persisted yet.
    Started,
    /// The merkle tree is built and staged; awaiting finalization.
    Pending(FsverityStateInner),
    /// The file is verity-enabled; reads can be verified against the stored merkle tree.
    Some(FsverityStateInner),
}
112
/// In-memory fsverity metadata for a verity-enabled (or pending) file.
#[derive(Debug)]
pub struct FsverityStateInner {
    // Root hash of the merkle tree, tagged with the digest algorithm used.
    root_digest: RootDigest,
    // Salt mixed into every hash computation.
    salt: Vec<u8>,
    // Concatenated leaf-node hashes of the merkle tree.
    // TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
    // Potentially store a pager-backed vmo instead of passing around a boxed array.
    merkle_tree: Box<[u8]>,
}
121
/// Options for writes to extents in overwrite mode.
#[derive(Debug, Default)]
pub struct OverwriteOptions {
    // If false, then all the extents for the overwrite range must have been preallocated using
    // preallocate_range or from existing writes.
    pub allow_allocations: bool,
    // NOTE(review): presumably requests a device barrier before the first write of the
    // operation; confirm against the overwrite write path.
    pub barrier_on_first_write: bool,
}
129
130impl FsverityStateInner {
131    pub fn new(root_digest: RootDigest, salt: Vec<u8>, merkle_tree: Box<[u8]>) -> Self {
132        FsverityStateInner { root_digest, salt, merkle_tree }
133    }
134
135    fn get_hasher_for_block_size(&self, block_size: usize) -> FsVerityHasher {
136        match self.root_digest {
137            RootDigest::Sha256(_) => {
138                FsVerityHasher::Sha256(FsVerityHasherOptions::new(self.salt.clone(), block_size))
139            }
140            RootDigest::Sha512(_) => {
141                FsVerityHasher::Sha512(FsVerityHasherOptions::new(self.salt.clone(), block_size))
142            }
143        }
144    }
145
146    fn from_bytes(data: &[u8], block_size: usize) -> Result<(Self, FsVerityHasher), Error> {
147        let descriptor = FsVerityDescriptor::from_bytes(&data, block_size)
148            .map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
149
150        let root_digest = match descriptor.digest_algorithm() {
151            fio::HashAlgorithm::Sha256 => {
152                RootDigest::Sha256(descriptor.root_digest().try_into().unwrap())
153            }
154            fio::HashAlgorithm::Sha512 => RootDigest::Sha512(descriptor.root_digest().to_vec()),
155            _ => return Err(anyhow!(FxfsError::NotSupported).context("Unsupported hash algorithm")),
156        };
157        let hasher = descriptor.hasher();
158        let leaves =
159            descriptor.leaf_digests().map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
160
161        Ok((Self::new(root_digest, descriptor.salt().to_vec(), leaves.into()), hasher))
162    }
163}
164
// DataObjectHandle transparently exposes the underlying StoreObjectHandle's API.
impl<S: HandleOwner> Deref for DataObjectHandle<S> {
    type Target = StoreObjectHandle<S>;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
171
172impl<S: HandleOwner> DataObjectHandle<S> {
    /// Creates a handle for `object_id` in `owner`'s store, operating on attribute
    /// `attribute_id`.
    ///
    /// `size` seeds the cached content size, `fsverity_state` seeds the cached verity state
    /// (e.g. as read at open time), and `overwrite_ranges` seeds the in-memory set of ranges
    /// already allocated in overwrite mode.
    pub fn new(
        owner: Arc<S>,
        object_id: u64,
        permanent_keys: bool,
        attribute_id: u64,
        size: u64,
        fsverity_state: FsverityState,
        options: HandleOptions,
        trace: bool,
        overwrite_ranges: &[Range<u64>],
    ) -> Self {
        Self {
            handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
            attribute_id,
            content_size: AtomicU64::new(size),
            fsverity_state: Mutex::new(fsverity_state),
            overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
        }
    }
192
    /// Returns the id of the data attribute this handle operates on.
    pub fn attribute_id(&self) -> u64 {
        self.attribute_id
    }

    /// Returns the in-memory set of ranges currently in overwrite mode.
    pub fn overwrite_ranges(&self) -> &AllocatedRanges {
        &self.overwrite_ranges
    }

    /// Returns true once fsverity has been fully enabled on this file.
    pub fn is_verified_file(&self) -> bool {
        matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
    }
204
    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
    /// If another caller has already started but not completed `enable_verity` (state is
    /// `Started` or `Pending`), returns FxfsError::Unavailable. If another caller has already
    /// completed `enable_verity` (state is `Some`), returns FxfsError::AlreadyExists.
    pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
        let mut fsverity_guard = self.fsverity_state.lock();
        match *fsverity_guard {
            FsverityState::None => {
                *fsverity_guard = FsverityState::Started;
                Ok(())
            }
            FsverityState::Started | FsverityState::Pending(_) => {
                Err(anyhow!(FxfsError::Unavailable))
            }
            FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
        }
    }
222
    /// Sets `self.fsverity_state` to Pending. Must be called before `finalize_fsverity_state()`.
    /// Asserts that the prior state of `self.fsverity_state` was `FsverityState::Started`, i.e.
    /// `set_fsverity_state_started` must have been called first.
    pub fn set_fsverity_state_pending(&self, descriptor: FsverityStateInner) {
        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::Started));
        *fsverity_guard = FsverityState::Pending(descriptor);
    }
230
231    /// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
232    /// not `FsverityState::Pending(_)`.
233    pub fn finalize_fsverity_state(&self) {
234        let mut fsverity_state_guard = self.fsverity_state.lock();
235        let mut_fsverity_state = fsverity_state_guard.deref_mut();
236        let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
237        match fsverity_state {
238            FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
239            FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
240            FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
241            FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
242        }
243        // Once we finalize the fsverity state, the file is permanently read-only. The in-memory
244        // overwrite ranges tracking is only used for writing, so we don't need them anymore. This
245        // leaves any uninitialized, but allocated, overwrite regions if there are any, rather than
246        // converting them back to sparse regions.
247        self.overwrite_ranges.clear();
248    }
249
    /// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
    /// Used to set `self.fsverity_state` on open of a verified file. The merkle tree data is
    /// verified against the root digest here, and will return an error if the tree is not correct.
    pub async fn set_fsverity_state_some(&self, descriptor: FsverityMetadata) -> Result<(), Error> {
        let (metadata, hasher) = match descriptor {
            // Fxfs-internal layout: the root digest and salt come from the object record and the
            // leaf hashes live in the merkle attribute.
            FsverityMetadata::Internal(root_digest, salt) => {
                let merkle_tree = self
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await?
                    .ok_or_else(|| anyhow!(FxfsError::Inconsistent))?;
                let metadata = FsverityStateInner { root_digest, salt, merkle_tree };
                let hasher = metadata.get_hasher_for_block_size(self.block_size() as usize);
                (metadata, hasher)
            }
            // F2fs-compatible layout: a serialized descriptor is stored at `verity_range` within
            // the merkle attribute and must be read and parsed.
            FsverityMetadata::F2fs(verity_range) => {
                let expected_length = verity_range.length()? as usize;
                // Round the buffer up to a whole number of blocks for the read.
                let mut buffer = self
                    .allocate_buffer(expected_length.next_multiple_of(self.block_size() as usize))
                    .await;
                // A short read means the stored range is bogus.
                ensure!(
                    expected_length
                        == self
                            .handle
                            .read(FSVERITY_MERKLE_ATTRIBUTE_ID, verity_range.start, buffer.as_mut())
                            .await?,
                    FxfsError::Inconsistent
                );
                FsverityStateInner::from_bytes(
                    buffer.as_slice()[0..expected_length].into(),
                    self.block_size() as usize,
                )?
            }
        };
        // Validate the merkle tree data against the root before applying it.
        ensure!(metadata.merkle_tree.len() % hasher.hash_size() == 0, FxfsError::Inconsistent);
        let leaf_chunks = metadata.merkle_tree.chunks_exact(hasher.hash_size());
        // Rebuild the upper tree layers from the stored leaf hashes and compare the computed
        // root against the stored root digest.
        let mut builder = MerkleTreeBuilder::new(hasher);
        for leaf in leaf_chunks {
            builder.push_data_hash(leaf.to_vec());
        }
        let tree = builder.finish();
        let root_hash = match &metadata.root_digest {
            RootDigest::Sha256(root_hash) => root_hash.as_slice(),
            RootDigest::Sha512(root_hash) => root_hash.as_slice(),
        };

        ensure!(root_hash == tree.root(), FxfsError::IntegrityError);

        let mut fsverity_guard = self.fsverity_state.lock();
        // Only valid before any other verity state has been set (i.e. at open).
        assert!(matches!(*fsverity_guard, FsverityState::None));
        *fsverity_guard = FsverityState::Some(metadata);

        Ok(())
    }
304
    /// Verifies contents of `buffer` against the corresponding hashes in the stored merkle tree.
    /// `offset` is the logical offset in the file that `buffer` starts at. `offset` must be
    /// block-aligned. Fails on non fsverity-enabled files.
    fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
        let block_size = self.block_size() as usize;
        assert!(offset % block_size == 0);
        let fsverity_state = self.fsverity_state.lock();
        match &*fsverity_state {
            FsverityState::None => {
                Err(anyhow!("Tried to verify read on a non verity-enabled file"))
            }
            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
                "Enable verity has not yet completed, fsverity state: {:?}",
                &*fsverity_state
            )),
            FsverityState::Some(metadata) => {
                let hasher = metadata.get_hasher_for_block_size(block_size);
                let leaf_nodes: Vec<&[u8]> =
                    metadata.merkle_tree.chunks(hasher.hash_size()).collect();
                fxfs_trace::duration!(c"fsverity-verify", "len" => buffer.len());
                // TODO(b/318880297): Consider parallelizing computation.
                // Hash each block of the buffer and compare with the stored leaf hash for that
                // block index.
                // NOTE(review): `leaf_nodes[offset / block_size]` panics if the read extends past
                // the hashed region; presumably callers bound reads by file size — confirm.
                for b in buffer.chunks(block_size) {
                    ensure!(
                        hasher.hash_block(b) == leaf_nodes[offset / block_size],
                        anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
                    );
                    offset += block_size;
                }
                Ok(())
            }
        }
    }
337
    /// Extend the file with the given extent.  The only use case for this right now is for files
    /// that must exist at certain offsets on the device, such as super-blocks.
    ///
    /// The extent is appended at the current block-aligned end of the file and the content size
    /// grows by the length of `device_range`.
    pub async fn extend<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        let old_end =
            round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
        let new_size = old_end + device_range.end - device_range.start;
        // The caller chose the physical location, so mark it allocated rather than allocating.
        self.store().allocator().mark_allocated(
            transaction,
            self.store().store_object_id(),
            device_range.clone(),
        )?;
        self.txn_update_size(transaction, new_size, None).await?;
        let key_id = self.get_key(None).await?.0;
        // Record the logical range [old_end, new_size) as mapped to `device_range`.
        transaction.add(
            self.store().store_object_id,
            Mutation::merge_object(
                ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
                ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
            ),
        );
        self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
    }
364
    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    // the data from `buf`.  Also returns the block-aligned range that the new buffer covers.
    // Delegates to `StoreObjectHandle::align_buffer` for this handle's data attribute.
    async fn align_buffer(
        &self,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        self.handle.align_buffer(self.attribute_id(), offset, buf).await
    }
374
    // Writes potentially unaligned data at `device_offset` and returns checksums if requested. The
    // data will be encrypted if necessary.
    // `buf` is mutable as an optimization, since the write may require encryption, we can encrypt
    // the buffer in-place rather than copying to another buffer if the write is already aligned.
    // Delegates to `StoreObjectHandle::write_at` for this handle's data attribute (no key id
    // override).
    async fn write_at(
        &self,
        offset: u64,
        buf: MutableBufferRef<'_>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
    }
387
    /// Zeroes the given range.  The range must be aligned.  Delegates to
    /// `StoreObjectHandle::zero` for this handle's data attribute.
    // NOTE(review): an earlier version of this doc claimed the amount of deallocated data was
    // returned, but the signature returns `()`.
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        range: Range<u64>,
    ) -> Result<(), Error> {
        self.handle.zero(transaction, self.attribute_id(), range).await
    }
396
    /// The cached value for `self.fsverity_state` is set either in `open_object` or on
    /// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
    /// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
    ///
    /// Returns an error if `enable_verity` is still in progress (state `Started` or `Pending`).
    pub fn get_descriptor(&self) -> Result<Option<(fio::VerificationOptions, Vec<u8>)>, Error> {
        let fsverity_state = self.fsverity_state.lock();
        match &*fsverity_state {
            FsverityState::None => Ok(None),
            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
                "Enable verity has not yet completed, fsverity state: {:?}",
                &*fsverity_state
            )),
            FsverityState::Some(metadata) => {
                // Map the stored digest algorithm and salt back into FIDL options.
                let (options, root_hash) = match &metadata.root_digest {
                    RootDigest::Sha256(root_hash) => (
                        fio::VerificationOptions {
                            hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                            salt: Some(metadata.salt.clone()),
                            ..Default::default()
                        },
                        root_hash.to_vec(),
                    ),
                    RootDigest::Sha512(root_hash) => (
                        fio::VerificationOptions {
                            hash_algorithm: Some(fio::HashAlgorithm::Sha512),
                            salt: Some(metadata.salt.clone()),
                            ..Default::default()
                        },
                        root_hash.clone(),
                    ),
                };
                Ok(Some((options, root_hash)))
            }
        }
    }
431
    /// Reads the entire data attribute and builds its fsverity merkle tree using `hasher`.
    ///
    /// Returns the in-memory `MerkleTree` together with its serialized form: each non-root layer
    /// (top layer first) padded to a block boundary, followed by a final block containing the
    /// serialized descriptor built from `hash_alg`, the block size, file size, root hash and
    /// `salt`.
    async fn build_verity_tree(
        &self,
        hasher: FsVerityHasher,
        hash_alg: fio::HashAlgorithm,
        salt: &[u8],
    ) -> Result<(MerkleTree, Vec<u8>), Error> {
        let hash_len = hasher.hash_size();
        let mut builder = MerkleTreeBuilder::new(hasher);
        let mut offset = 0;
        let size = self.get_size();
        // TODO(b/314836822): Consider further tuning the buffer size to optimize
        // performance. Experimentally, most verity-enabled files are <256K.
        let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
        while offset < size {
            // TODO(b/314842875): Consider optimizations for sparse files.
            // NOTE(review): assumes `read` makes progress while offset < size; a zero-length
            // read here would loop forever — confirm read's contract.
            let read = self.read(offset, buf.as_mut()).await? as u64;
            assert!(offset + read <= size);
            builder.write(&buf.as_slice()[0..read as usize]);
            offset += read;
        }
        let tree = builder.finish();
        // This will include a block for the root layer, which will be used to house the descriptor.
        let tree_data_len = tree
            .as_ref()
            .iter()
            .map(|layer| (layer.len() * hash_len).next_multiple_of(self.block_size() as usize))
            .sum();
        let mut merkle_tree_data = Vec::<u8>::with_capacity(tree_data_len);
        // Iterating from the top layers down to the leaves.
        for layer in tree.as_ref().iter().rev() {
            // Skip the root layer.
            if layer.len() <= 1 {
                continue;
            }
            merkle_tree_data.extend(layer.iter().flatten());
            // Pad to the end of the block.
            let padded_size = merkle_tree_data.len().next_multiple_of(self.block_size() as usize);
            merkle_tree_data.resize(padded_size, 0);
        }

        // Zero the last block, then write the descriptor to the start of it.
        let descriptor_offset = merkle_tree_data.len();
        merkle_tree_data.resize(descriptor_offset + self.block_size() as usize, 0);
        let descriptor = FsVerityDescriptorRaw::new(
            hash_alg,
            self.block_size(),
            self.get_size(),
            tree.root(),
            salt,
        )?;
        descriptor.write_to_slice(&mut merkle_tree_data[descriptor_offset..])?;

        Ok((tree, merkle_tree_data))
    }
486
    /// Reads the data attribute and computes a merkle tree from the data. The values of the
    /// parameters required to build the merkle tree are supplied by `descriptor` (i.e. salt,
    /// hash_algorithm, etc.) Writes the leaf nodes of the merkle tree to an attribute with id
    /// `FSVERITY_MERKLE_ATTRIBUTE_ID`. Updates the root_hash of the `descriptor` according to the
    /// computed merkle tree and then replaces the ObjectValue of the data attribute with
    /// ObjectValue::VerifiedAttribute, which stores the `descriptor` inline.
    #[trace]
    pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
        // Guards against concurrent or repeated enable_verity calls (errors if already started
        // or already enabled).
        self.set_fsverity_state_started()?;
        // If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
        // the graveyard should process the tombstone before we start rewriting the attribute.
        if let Some(_) = self
            .store()
            .tree()
            .find(&ObjectKey::graveyard_attribute_entry(
                self.store().graveyard_directory_object_id(),
                self.object_id(),
                FSVERITY_MERKLE_ATTRIBUTE_ID,
            ))
            .await?
        {
            self.store().filesystem().graveyard().flush().await;
        }
        let mut transaction = self.new_transaction().await?;
        let hash_alg =
            options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
        let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
        // Build the tree with the requested algorithm; only SHA-256 and SHA-512 are supported.
        let (root_digest, merkle_tree) = match hash_alg {
            fio::HashAlgorithm::Sha256 => {
                let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
                    salt.clone(),
                    self.block_size() as usize,
                ));
                let (tree, merkle_tree_data) =
                    self.build_verity_tree(hasher, hash_alg.clone(), &salt).await?;
                let root: [u8; 32] = tree.root().try_into().unwrap();
                (RootDigest::Sha256(root), merkle_tree_data)
            }
            fio::HashAlgorithm::Sha512 => {
                let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
                    salt.clone(),
                    self.block_size() as usize,
                ));
                let (tree, merkle_tree_data) =
                    self.build_verity_tree(hasher, hash_alg.clone(), &salt).await?;
                (RootDigest::Sha512(tree.root().to_vec()), merkle_tree_data)
            }
            _ => {
                bail!(
                    anyhow!(FxfsError::NotSupported)
                        .context(format!("hash algorithm not supported"))
                );
            }
        };
        // TODO(b/314194485): Eventually want streaming writes.
        // The merkle tree attribute should not require trimming because it should not
        // exist.
        self.handle
            .write_new_attr_in_batches(
                &mut transaction,
                FSVERITY_MERKLE_ATTRIBUTE_ID,
                &merkle_tree,
                WRITE_ATTR_BATCH_SIZE,
            )
            .await?;
        // A multi-batch attribute write leaves a graveyard entry for crash recovery; remove it
        // now that the data is fully written.
        if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
            self.store().remove_attribute_from_graveyard(
                &mut transaction,
                self.object_id(),
                FSVERITY_MERKLE_ATTRIBUTE_ID,
            );
        };
        // Parse the serialized tree back to get the leaf digests for the in-memory cached state.
        let descriptor_decoded =
            FsVerityDescriptor::from_bytes(&merkle_tree, self.block_size() as usize)?;
        let descriptor = FsverityStateInner {
            root_digest: root_digest.clone(),
            salt: salt.clone(),
            merkle_tree: descriptor_decoded.leaf_digests()?.to_vec().into(),
        };
        self.set_fsverity_state_pending(descriptor);
        // Mark the data attribute as verified. This handle is attached as the associated object;
        // NOTE(review): presumably the commit callback finalizes the Pending state (see
        // `finalize_fsverity_state`) — confirm via the AssociatedObject impl.
        transaction.add_with_object(
            self.store().store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    self.object_id(),
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::verified_attribute(
                    self.get_size(),
                    FsverityMetadata::F2fs(0..merkle_tree.len() as u64),
                ),
            ),
            AssocObj::Borrowed(self),
        );
        transaction.commit().await?;
        Ok(())
    }
585
586    /// Pre-allocate disk space for the given logical file range. If any part of the allocation
587    /// range is beyond the end of the file, the file size is updated.
588    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
589        debug_assert!(range.start < range.end);
590
591        // It's not required that callers of allocate use block aligned ranges, but we need to make
592        // the extents block aligned. Luckily, fallocate in posix is allowed to allocate more than
593        // what was asked for for block alignment purposes. We just need to make sure that the size
594        // of the file is still the non-block-aligned end of the range if the size was changed.
595        let mut new_range = range.clone();
596        new_range.start = round_down(new_range.start, self.block_size());
597        // NB: FxfsError::TooBig turns into EFBIG when passed through starnix, which is the
598        // required error code when the requested range is larger than the file size.
599        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;
600
601        let mut transaction = self.new_transaction().await?;
602        let mut to_allocate = Vec::new();
603        let mut to_switch = Vec::new();
604        let key_id = self.get_key(None).await?.0;
605
606        {
607            let tree = &self.store().tree;
608            let layer_set = tree.layer_set();
609            let offset_key = ObjectKey::attribute(
610                self.object_id(),
611                self.attribute_id(),
612                AttributeKey::Extent(ExtentKey::search_key_from_offset(new_range.start)),
613            );
614            let mut merger = layer_set.merger();
615            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;
616
617            loop {
618                match iter.get() {
619                    Some(ItemRef {
620                        key:
621                            ObjectKey {
622                                object_id,
623                                data:
624                                    ObjectKeyData::Attribute(
625                                        attribute_id,
626                                        AttributeKey::Extent(extent_key),
627                                    ),
628                            },
629                        value: ObjectValue::Extent(extent_value),
630                        ..
631                    }) if *object_id == self.object_id()
632                        && *attribute_id == self.attribute_id() =>
633                    {
634                        // If the start of this extent is beyond the end of the range we are
635                        // allocating, we don't have any more work to do.
636                        if new_range.end <= extent_key.range.start {
637                            break;
638                        }
639                        // Add any prefix we might need to allocate.
640                        if new_range.start < extent_key.range.start {
641                            to_allocate.push(new_range.start..extent_key.range.start);
642                            new_range.start = extent_key.range.start;
643                        }
644                        let device_offset = match extent_value {
645                            ExtentValue::None => {
646                                // If the extent value is None, it indicates a deleted extent. In
647                                // that case, we just skip it entirely. By keeping the new_range
648                                // where it is, this section will get included in the new
649                                // allocations.
650                                iter.advance().await?;
651                                continue;
652                            }
653                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
654                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
655                                // If this extent is already in overwrite mode, we can skip it.
656                                if extent_key.range.end < new_range.end {
657                                    new_range.start = extent_key.range.end;
658                                    iter.advance().await?;
659                                    continue;
660                                } else {
661                                    new_range.start = new_range.end;
662                                    break;
663                                }
664                            }
665                            ExtentValue::Some { device_offset, .. } => *device_offset,
666                        };
667
668                        // Figure out how we have to break up the ranges.
669                        let device_offset =
670                            device_offset + (new_range.start - extent_key.range.start);
671                        if extent_key.range.end < new_range.end {
672                            to_switch.push((new_range.start..extent_key.range.end, device_offset));
673                            new_range.start = extent_key.range.end;
674                        } else {
675                            to_switch.push((new_range.start..new_range.end, device_offset));
676                            new_range.start = new_range.end;
677                            break;
678                        }
679                    }
680                    // The records are sorted so if we find something that isn't an extent or
681                    // doesn't match the object id then there are no more extent records for this
682                    // object.
683                    _ => break,
684                }
685                iter.advance().await?;
686            }
687        }
688
689        if new_range.start < new_range.end {
690            to_allocate.push(new_range.clone());
691        }
692
693        // We can update the size in the first transaction because even if subsequent transactions
694        // don't get replayed, the data between the current and new end of the file will be zero
695        // (either sparse zero or allocated zero). On the other hand, if we don't update the size
696        // in the first transaction, overwrite extents may be written past the end of the file
697        // which is an fsck error.
698        //
699        // The potential new size needs to be the non-block-aligned range end - we round up to the
700        // nearest block size for the actual allocation, but shouldn't do that for the file size.
701        let new_size = std::cmp::max(range.end, self.get_size());
702        // Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
703        // first transaction, in case we split transactions. This makes it okay to only replay the
704        // first transaction if power loss occurs - the file will be in an unusual state, but not
705        // an invalid one, if only part of the allocate goes through.
706        transaction.add_with_object(
707            self.store().store_object_id(),
708            Mutation::replace_or_insert_object(
709                ObjectKey::attribute(
710                    self.object_id(),
711                    self.attribute_id(),
712                    AttributeKey::Attribute,
713                ),
714                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
715            ),
716            AssocObj::Borrowed(self),
717        );
718
719        // The maximum number of mutations we are going to allow per transaction in allocate. This
720        // is probably quite a bit lower than the actual limit, but it should be large enough to
721        // handle most non-edge-case versions of allocate without splitting the transaction.
722        const MAX_TRANSACTION_SIZE: usize = 256;
723        for (switch_range, device_offset) in to_switch {
724            transaction.add_with_object(
725                self.store().store_object_id(),
726                Mutation::merge_object(
727                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
728                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
729                        device_offset,
730                        key_id,
731                    )),
732                ),
733                AssocObj::Borrowed(self),
734            );
735            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
736                transaction.commit_and_continue().await?;
737            }
738        }
739
740        let mut allocated = 0;
741        let allocator = self.store().allocator();
742        for mut allocate_range in to_allocate {
743            while allocate_range.start < allocate_range.end {
744                let device_range = allocator
745                    .allocate(
746                        &mut transaction,
747                        self.store().store_object_id(),
748                        allocate_range.end - allocate_range.start,
749                    )
750                    .await
751                    .context("allocation failed")?;
752                let device_range_len = device_range.end - device_range.start;
753
754                transaction.add_with_object(
755                    self.store().store_object_id(),
756                    Mutation::merge_object(
757                        ObjectKey::extent(
758                            self.object_id(),
759                            self.attribute_id(),
760                            allocate_range.start..allocate_range.start + device_range_len,
761                        ),
762                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
763                            device_range.start,
764                            (device_range_len / self.block_size()) as usize,
765                            key_id,
766                        )),
767                    ),
768                    AssocObj::Borrowed(self),
769                );
770
771                allocate_range.start += device_range_len;
772                allocated += device_range_len;
773
774                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
775                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
776                    transaction.commit_and_continue().await?;
777                    allocated = 0;
778                }
779            }
780        }
781
782        self.update_allocated_size(&mut transaction, allocated, 0).await?;
783        transaction.commit().await?;
784
785        Ok(())
786    }
787
    /// Return information on a contiguous set of extents that has the same allocation status,
    /// starting from `start_offset`. The information returned is if this set of extents are marked
    /// allocated/not allocated and also the size of this set (in bytes). This is used when
    /// querying slices for volumes.
    /// This function expects `start_offset` to be aligned to block size
    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
        let block_size = self.block_size();
        assert_eq!(start_offset % block_size, 0);

        if start_offset > self.get_size() {
            bail!(FxfsError::OutOfRange)
        }

        // Querying exactly at EOF: report a zero-length, unallocated run.
        if start_offset == self.get_size() {
            return Ok((false, 0));
        }

        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let offset_key = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id(),
            AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

        // `allocated` stays None until the first record (or gap) establishes which kind of run
        // we are measuring; `end` is advanced to the end of each record that extends the run.
        let mut allocated = None;
        let mut end = start_offset;

        loop {
            // Iterate through the extents, each time setting `end` as the end of the previous
            // extent
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) => {
                    // Equivalent of getting no extents back
                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
                        if allocated == Some(false) || allocated.is_none() {
                            end = self.get_size();
                            allocated = Some(false);
                        }
                        break;
                    }
                    ensure!(extent_key.range.is_aligned(block_size), FxfsError::Inconsistent);
                    if extent_key.range.start > end {
                        // If a previous extent has already been visited and we are tracking an
                        // allocated set, we are only interested in an extent where the range of the
                        // current extent follows immediately after the previous one.
                        if allocated == Some(true) {
                            break;
                        } else {
                            // The gap between the previous `end` and this extent is not allocated
                            end = extent_key.range.start;
                            allocated = Some(false);
                            // Continue this iteration, except now the `end` is set to the end of
                            // the "previous" extent which is this gap between the start_offset
                            // and the current extent
                        }
                    }

                    // We can assume that from here, the `end` points to the end of a previous
                    // extent.
                    match extent_value {
                        // The current extent has been allocated
                        ExtentValue::Some { .. } => {
                            // Stop searching if previous extent was marked deleted
                            if allocated == Some(false) {
                                break;
                            }
                            allocated = Some(true);
                        }
                        // This extent has been marked deleted
                        ExtentValue::None => {
                            // Stop searching if previous extent was marked allocated
                            if allocated == Some(true) {
                                break;
                            }
                            allocated = Some(false);
                        }
                    }
                    end = extent_key.range.end;
                }
                // This occurs when there are no extents left
                None => {
                    if allocated == Some(false) || allocated.is_none() {
                        end = self.get_size();
                        allocated = Some(false);
                    }
                    // Otherwise, we were monitoring extents that were allocated, so just exit.
                    break;
                }
                // Non-extent records (Object, Child, GraveyardEntry) are ignored.
                Some(_) => {}
            }
            iter.advance().await?;
        }

        // Every exit path above sets `allocated` before breaking, so this unwrap cannot panic.
        Ok((allocated.unwrap(), end - start_offset))
    }
896
897    pub async fn txn_write<'a>(
898        &'a self,
899        transaction: &mut Transaction<'a>,
900        offset: u64,
901        buf: BufferRef<'_>,
902    ) -> Result<(), Error> {
903        if buf.is_empty() {
904            return Ok(());
905        }
906        let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
907        self.multi_write(
908            transaction,
909            self.attribute_id(),
910            std::slice::from_ref(&aligned),
911            transfer_buf.as_mut(),
912        )
913        .await?;
914        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
915            self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
916        }
917        Ok(())
918    }
919
    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
    /// applied; the caller is responsible for updating size if required.
    ///
    /// Delegates to the underlying store handle, passing `None` for its extra option — presumably
    /// a key-id override; confirm against `StoreObjectHandle::multi_write`.
    pub async fn multi_write<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attribute_id: u64,
        ranges: &[Range<u64>],
        buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
    }
932
    // `buf` is mutable as an optimization, since the write may require encryption, we can
    // encrypt the buffer in-place rather than copying to another buffer if the write is
    // already aligned.
    //
    // Note: in the event of power failure during an overwrite() call, it is possible that
    // old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
    // Since the old data should be encrypted, it is probably safe to expose, although not ideal.
    //
    // Writes `buf` at `offset` directly over existing Raw-mode extents. If
    // `options.allow_allocations` is set, holes (and any gap before the next extent) are filled
    // with freshly-allocated Raw extents inside a new transaction; otherwise a missing extent is
    // an error. Device writes are batched and issued concurrently once the extent walk is done.
    pub async fn overwrite(
        &self,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        options: OverwriteOptions,
    ) -> Result<(), Error> {
        // NOTE(review): `buf.len() as u32` would truncate for buffers >= 4GiB — presumably
        // buffers are always far smaller than that; confirm with callers.
        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
        let end = offset + buf.len() as u64;

        let key_id = self.get_key(None).await?.0;

        // The transaction only ends up being used if allow_allocations is true
        let mut transaction =
            if options.allow_allocations { Some(self.new_transaction().await?) } else { None };

        // We build up a list of writes to perform later
        let writes = FuturesUnordered::new();

        if options.barrier_on_first_write {
            self.store().device.barrier();
        }

        // We create a new scope here, so that the merger iterator will get dropped before we try to
        // commit our transaction. Otherwise the transaction commit would block.
        {
            let store = self.store();
            let store_object_id = store.store_object_id;
            let allocator = store.allocator();
            let tree = &store.tree;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
                )))
                .await?;
            let block_size = self.block_size();

            // Walk extents from `offset`, peeling work off the front of `buf` each iteration
            // until the whole buffer has been assigned a device range.
            loop {
                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
                    // An extent that ends exactly at `offset` cannot service this write; skip it.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end == offset =>
                    {
                        iter.advance().await?;
                        continue;
                    }
                    // An extent for this attribute that overlaps `offset`.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start <= offset =>
                    {
                        match value {
                            // Only Raw-mode extents may be overwritten in place (no checksums to
                            // keep consistent).
                            ObjectValue::Extent(ExtentValue::Some {
                                device_offset,
                                mode: ExtentMode::Raw,
                                ..
                            }) => {
                                ensure!(
                                    range.is_aligned(block_size) && device_offset % block_size == 0,
                                    FxfsError::Inconsistent
                                );
                                let offset_within_extent = offset - range.start;
                                let remaining_length_of_extent = (range
                                    .end
                                    .checked_sub(offset)
                                    .ok_or(FxfsError::Inconsistent)?)
                                    as usize;
                                // Yields (device_offset, bytes_to_write, should_advance)
                                (
                                    device_offset + offset_within_extent,
                                    min(buf.len(), remaining_length_of_extent),
                                    true,
                                )
                            }
                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
                                // TODO(https://fxbug.dev/42066056): Maybe we should create
                                // a new extent without checksums?
                                bail!(
                                    "extent from ({},{}) which overlaps offset \
                                        {} has the wrong extent mode",
                                    range.start,
                                    range.end,
                                    offset
                                )
                            }
                            _ => {
                                bail!(
                                    "overwrite failed: extent overlapping offset {} has \
                                      unexpected ObjectValue",
                                    offset
                                )
                            }
                        }
                    }
                    // No extent covers `offset`: allocate one if allowed, otherwise fail.
                    maybe_item_ref => {
                        if let Some(transaction) = transaction.as_mut() {
                            assert_eq!(options.allow_allocations, true);
                            assert_eq!(offset % self.block_size(), 0);

                            // We are going to make a new extent, but let's check if there is an
                            // extent after us. If there is an extent after us, then we don't want
                            // our new extent to bump into it...
                            let mut bytes_to_allocate =
                                round_up(buf.len() as u64, self.block_size())
                                    .ok_or(FxfsError::TooBig)?;
                            if let Some(ItemRef {
                                key:
                                    ObjectKey {
                                        object_id,
                                        data:
                                            ObjectKeyData::Attribute(
                                                attribute_id,
                                                AttributeKey::Extent(ExtentKey { range }),
                                            ),
                                    },
                                ..
                            }) = maybe_item_ref
                            {
                                if *object_id == self.object_id()
                                    && *attribute_id == self.attribute_id()
                                    && offset < range.start
                                {
                                    let bytes_until_next_extent = range.start - offset;
                                    bytes_to_allocate =
                                        min(bytes_to_allocate, bytes_until_next_extent);
                                }
                            }

                            // The allocator may return less than requested; the outer loop will
                            // come back around for the remainder.
                            let device_range = allocator
                                .allocate(transaction, store_object_id, bytes_to_allocate)
                                .await?;
                            let device_range_len = device_range.end - device_range.start;
                            transaction.add(
                                store_object_id,
                                Mutation::insert_object(
                                    ObjectKey::extent(
                                        self.object_id(),
                                        self.attribute_id(),
                                        offset..offset + device_range_len,
                                    ),
                                    ObjectValue::Extent(ExtentValue::new_raw(
                                        device_range.start,
                                        key_id,
                                    )),
                                ),
                            );

                            self.update_allocated_size(transaction, device_range_len, 0).await?;

                            // Yields (device_offset, bytes_to_write, should_advance)
                            (device_range.start, min(buf.len(), device_range_len as usize), false)
                        } else {
                            bail!(
                                "no extent overlapping offset {}, \
                                and new allocations are not allowed",
                                offset
                            )
                        }
                    }
                };
                // Queue the device write for this chunk and advance through the buffer.
                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
                writes.push(self.write_at(offset, current_buf, device_offset));
                if remaining_buf.len() == 0 {
                    break;
                } else {
                    buf = remaining_buf;
                    offset += bytes_to_write as u64;
                    if should_advance {
                        iter.advance().await?;
                    }
                }
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // The checksums are being ignored here, but we don't need to know them
        writes.try_collect::<Vec<MaybeChecksums>>().await?;

        // Commit any allocations (and a size update if the write extended the file). An empty
        // transaction means every byte landed on pre-existing Raw extents.
        if let Some(mut transaction) = transaction {
            assert_eq!(options.allow_allocations, true);
            if !transaction.is_empty() {
                if end > self.get_size() {
                    self.grow(&mut transaction, self.get_size(), end).await?;
                }
                transaction.commit().await?;
            }
        }

        Ok(())
    }
1156
1157    // Within a transaction, the size of the object might have changed, so get the size from there
1158    // if it exists, otherwise, fall back on the cached size.
1159    fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1160        transaction
1161            .get_object_mutation(
1162                self.store().store_object_id,
1163                ObjectKey::attribute(
1164                    self.object_id(),
1165                    self.attribute_id(),
1166                    AttributeKey::Attribute,
1167                ),
1168            )
1169            .and_then(|m| {
1170                if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1171                    Some(size)
1172                } else {
1173                    None
1174                }
1175            })
1176            .unwrap_or_else(|| self.get_size())
1177    }
1178
1179    pub async fn txn_update_size<'a>(
1180        &'a self,
1181        transaction: &mut Transaction<'a>,
1182        new_size: u64,
1183        // Allow callers to update the has_overwrite_extents metadata if they want. If this is
1184        // Some it is set to the value, if None it is left unchanged.
1185        update_has_overwrite_extents: Option<bool>,
1186    ) -> Result<(), Error> {
1187        let key =
1188            ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1189        let mut mutation = if let Some(mutation) =
1190            transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1191        {
1192            mutation.clone()
1193        } else {
1194            ObjectStoreMutation {
1195                item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1196                op: Operation::ReplaceOrInsert,
1197            }
1198        };
1199        if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1200            *size = new_size;
1201            if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1202                *has_overwrite_extents = update_has_overwrite_extents;
1203            }
1204        } else {
1205            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1206        }
1207        transaction.add_with_object(
1208            self.store().store_object_id(),
1209            Mutation::ObjectStore(mutation),
1210            AssocObj::Borrowed(self),
1211        );
1212        Ok(())
1213    }
1214
    /// Adjusts the allocated-byte accounting for this object by `allocated` bytes gained and
    /// `deallocated` bytes released, within `transaction`. Delegates to the underlying store
    /// handle.
    async fn update_allocated_size(
        &self,
        transaction: &mut Transaction<'_>,
        allocated: u64,
        deallocated: u64,
    ) -> Result<(), Error> {
        self.handle.update_allocated_size(transaction, allocated, deallocated).await
    }
1223
1224    pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1225        if self
1226            .overwrite_ranges
1227            .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1228        {
1229            // This returns true if there were ranges, but this truncate removed them all, which
1230            // indicates that we need to flip the has_overwrite_extents metadata flag to false.
1231            Ok(Some(false))
1232        } else {
1233            Ok(None)
1234        }
1235    }
1236
1237    pub async fn shrink<'a>(
1238        &'a self,
1239        transaction: &mut Transaction<'a>,
1240        size: u64,
1241        update_has_overwrite_extents: Option<bool>,
1242    ) -> Result<NeedsTrim, Error> {
1243        let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1244        self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1245        Ok(needs_trim)
1246    }
1247
    /// Grows this attribute from `old_size` to `size`. Any outstanding trim beyond `old_size` is
    /// completed first (which may commit-and-continue `transaction`), the tail of the old last
    /// block is zeroed if `old_size` is not block-aligned, and finally the new size is staged in
    /// `transaction`.
    pub async fn grow<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        old_size: u64,
        size: u64,
    ) -> Result<(), Error> {
        // Before growing the file, we must make sure that a previous trim has completed.
        let store = self.store();
        while matches!(
            store
                .trim_some(
                    transaction,
                    self.object_id(),
                    self.attribute_id(),
                    TrimMode::FromOffset(old_size)
                )
                .await?,
            TrimResult::Incomplete
        ) {
            transaction.commit_and_continue().await?;
        }
        // We might need to zero out the tail of the old last block.
        let block_size = self.block_size();
        if old_size % block_size != 0 {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let aligned_old_size = round_down(old_size, block_size);
            // Look up the extent (if any) covering the old last block.
            let iter = merger
                .query(Query::FullRange(&ObjectKey::extent(
                    self.object_id(),
                    self.attribute_id(),
                    aligned_old_size..aligned_old_size + 1,
                )))
                .await?;
            if let Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                    },
                value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
                ..
            }) = iter.get()
            {
                if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
                    let device_offset = device_offset
                        .checked_add(aligned_old_size - extent_key.range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
                    let mut buf = self.allocate_buffer(block_size as usize).await;
                    // In the case that this extent is in OverwritePartial mode, there is a
                    // possibility that the last block is allocated, but not initialized yet, in
                    // which case we don't actually need to bother zeroing out the tail. However,
                    // it's not strictly incorrect to change uninitialized data, so we skip the
                    // check and blindly do it to keep it simpler here.
                    self.read_and_decrypt(device_offset, aligned_old_size, buf.as_mut(), *key_id)
                        .await?;
                    // Zero everything past the old EOF within the block, then write it back.
                    buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
                    self.multi_write(
                        transaction,
                        *attribute_id,
                        &[aligned_old_size..aligned_old_size + block_size],
                        buf.as_mut(),
                    )
                    .await?;
                }
            }
        }
        self.txn_update_size(transaction, size, None).await?;
        Ok(())
    }
1320
    /// Attempts to pre-allocate a `file_range` of bytes for this object.
    /// Returns a set of device ranges (i.e. potentially multiple extents).
    ///
    /// It may not be possible to preallocate the entire requested range in one request
    /// due to limitations on transaction size. In such cases, we will preallocate as much as
    /// we can up to some (arbitrary, internal) limit on transaction size.
    ///
    /// `file_range.start` is modified to point at the end of the logical range
    /// that was preallocated such that repeated calls to `preallocate_range` with new
    /// transactions can be used to preallocate ranges of any size.
    ///
    /// Requested range must be a multiple of block size.
    pub async fn preallocate_range<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        file_range: &mut Range<u64>,
    ) -> Result<Vec<Range<u64>>, Error> {
        let block_size = self.block_size();
        assert!(file_range.is_aligned(block_size));
        // Preallocated extents are written raw, so this is only supported on unencrypted
        // objects.
        assert!(!self.handle.is_encrypted());
        let mut ranges = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Iterate existing extent records for this attribute, starting at the beginning of the
        // requested range.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
            )))
            .await?;
        // Number of bytes newly allocated by this call; used to adjust the allocated size at
        // the end.
        let mut allocated = 0;
        let key_id = self.get_key(None).await?.0;
        'outer: while file_range.start < file_range.end {
            // Walk extent records to find where the unallocated gap at file_range.start ends,
            // recording any already-allocated overlapping extents along the way.
            let allocate_end = loop {
                match iter.get() {
                    // Case for allocated extents for the same object that overlap with file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start < file_range.end =>
                    {
                        // Guard against on-disk corruption before doing arithmetic with the
                        // record's range.
                        ensure!(
                            range.is_valid()
                                && range.is_aligned(block_size)
                                && device_offset % block_size == 0,
                            FxfsError::Inconsistent
                        );
                        // If the start of the requested file_range overlaps with an existing extent...
                        if range.start <= file_range.start {
                            // Record the existing extent and move on.
                            let device_range = device_offset
                                .checked_add(file_range.start - range.start)
                                .ok_or(FxfsError::Inconsistent)?
                                ..device_offset
                                    .checked_add(min(range.end, file_range.end) - range.start)
                                    .ok_or(FxfsError::Inconsistent)?;
                            file_range.start += device_range.end - device_range.start;
                            ranges.push(device_range);
                            if file_range.start >= file_range.end {
                                break 'outer;
                            }
                            iter.advance().await?;
                            continue;
                        } else {
                            // There's nothing allocated between file_range.start and the beginning
                            // of this extent.
                            break range.start;
                        }
                    }
                    // Case for deleted extents eclipsed by file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::None),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end < file_range.end =>
                    {
                        iter.advance().await?;
                    }
                    _ => {
                        // We can just preallocate the rest.
                        break file_range.end;
                    }
                }
            };
            // Allocate the gap [file_range.start, allocate_end). The allocator may return less
            // than requested; the outer loop will then retry for the remainder.
            let device_range = self
                .store()
                .allocator()
                .allocate(
                    transaction,
                    self.store().store_object_id(),
                    allocate_end - file_range.start,
                )
                .await
                .context("Allocation failed")?;
            allocated += device_range.end - device_range.start;
            let this_file_range =
                file_range.start..file_range.start + device_range.end - device_range.start;
            file_range.start = this_file_range.end;
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
                ),
            );
            ranges.push(device_range);
            // If we didn't allocate all that we requested, we'll loop around and try again.
            // ... unless we have filled the transaction. The caller should check file_range.
            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
                break;
            }
        }
        // Update the file size if it changed.
        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
            self.txn_update_size(transaction, file_range.start, None).await?;
        }
        self.update_allocated_size(transaction, allocated, 0).await?;
        Ok(ranges)
    }
1462
1463    pub async fn update_attributes<'a>(
1464        &self,
1465        transaction: &mut Transaction<'a>,
1466        node_attributes: Option<&fio::MutableNodeAttributes>,
1467        change_time: Option<Timestamp>,
1468    ) -> Result<(), Error> {
1469        // This codepath is only called by files, whose wrapping key id users cannot directly set
1470        // as per fscrypt.
1471        ensure!(
1472            !matches!(
1473                node_attributes,
1474                Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1475            ),
1476            FxfsError::BadPath
1477        );
1478        self.handle.update_attributes(transaction, node_attributes, change_time).await
1479    }
1480
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle. Delegates to the
    /// underlying `StoreObjectHandle`.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        self.handle.default_transaction_options()
    }
1486
    /// Creates a new transaction for this object using the default transaction options.
    pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(self.default_transaction_options()).await
    }
1490
    /// Creates a new transaction for this object's current attribute with the given options.
    pub async fn new_transaction_with_options<'b>(
        &self,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        self.handle.new_transaction_with_options(self.attribute_id(), options).await
    }
1497
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    /// Delegates to the underlying `StoreObjectHandle`.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.handle.flush_device().await
    }
1502
    /// Reads an entire attribute.  Delegates to the underlying `StoreObjectHandle`;
    /// presumably returns `None` when the attribute does not exist — confirm against
    /// `StoreObjectHandle::read_attr`.
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        self.handle.read_attr(attribute_id).await
    }
1507
1508    /// Writes an entire attribute.  This *always* uses the volume data key.
1509    pub async fn write_attr(&self, attribute_id: u64, data: &[u8]) -> Result<(), Error> {
1510        // Must be different attribute otherwise cached size gets out of date.
1511        assert_ne!(attribute_id, self.attribute_id());
1512        let store = self.store();
1513        let mut transaction = self.new_transaction().await?;
1514        if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
1515            transaction.commit_and_continue().await?;
1516            while matches!(
1517                store
1518                    .trim_some(
1519                        &mut transaction,
1520                        self.object_id(),
1521                        attribute_id,
1522                        TrimMode::FromOffset(data.len() as u64),
1523                    )
1524                    .await?,
1525                TrimResult::Incomplete
1526            ) {
1527                transaction.commit_and_continue().await?;
1528            }
1529        }
1530        transaction.commit().await?;
1531        Ok(())
1532    }
1533
    /// Reads from `device_offset` into `buffer` and decrypts with the key identified by
    /// `key_id` (`file_offset` is the corresponding logical offset).  Delegates to the
    /// underlying `StoreObjectHandle`.
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id).await
    }
1543
    /// Truncates a file to a given size (growing/shrinking as required).
    ///
    /// Nb: Most code will want to call truncate() instead. This method is used
    /// to update the super block -- a case where we must borrow metadata space.
    pub async fn truncate_with_options(
        &self,
        options: Options<'_>,
        size: u64,
    ) -> Result<(), Error> {
        let mut transaction = self.new_transaction_with_options(options).await?;
        let old_size = self.get_size();
        if size == old_size {
            // Nothing to do; the empty transaction is simply dropped.
            return Ok(());
        }
        if size < old_size {
            // Shrinking: update overwrite-range bookkeeping for the new size first.
            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
                // The file needs to be trimmed.
                transaction.commit_and_continue().await?;
                let store = self.store();
                // Trim in batches, committing between each, until all extents past `size`
                // have been released.
                while matches!(
                    store
                        .trim_some(
                            &mut transaction,
                            self.object_id(),
                            self.attribute_id(),
                            TrimMode::FromOffset(size)
                        )
                        .await?,
                    TrimResult::Incomplete
                ) {
                    if let Err(error) = transaction.commit_and_continue().await {
                        // The size change was already committed above, so the truncate has
                        // taken effect; trimming is best-effort cleanup, so log and report
                        // success.
                        warn!(error:?; "Failed to trim after truncate");
                        return Ok(());
                    }
                }
                if let Err(error) = transaction.commit().await {
                    warn!(error:?; "Failed to trim after truncate");
                }
                return Ok(());
            }
        } else {
            self.grow(&mut transaction, old_size, size).await?;
        }
        transaction.commit().await?;
        Ok(())
    }
1591
1592    pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
1593        // We don't take a read guard here since the object properties are contained in a single
1594        // object, which cannot be inconsistent with itself. The LSM tree does not return
1595        // intermediate states for a single object.
1596        let item = self
1597            .store()
1598            .tree
1599            .find(&ObjectKey::object(self.object_id()))
1600            .await?
1601            .expect("Unable to find object record");
1602        match item.value {
1603            ObjectValue::Object {
1604                kind: ObjectKind::File { refs, .. },
1605                attributes:
1606                    ObjectAttributes {
1607                        creation_time,
1608                        modification_time,
1609                        posix_attributes,
1610                        allocated_size,
1611                        access_time,
1612                        change_time,
1613                        ..
1614                    },
1615            } => Ok(ObjectProperties {
1616                refs,
1617                allocated_size,
1618                data_attribute_size: self.get_size(),
1619                creation_time,
1620                modification_time,
1621                access_time,
1622                change_time,
1623                sub_dirs: 0,
1624                posix_attributes,
1625                casefold: false,
1626                wrapping_key_id: None,
1627            }),
1628            _ => bail!(FxfsError::NotFile),
1629        }
1630    }
1631
1632    // Returns the contents of this object. This object must be < |limit| bytes in size.
1633    pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1634        let size = self.get_size();
1635        if size > limit as u64 {
1636            bail!("Object too big ({} > {})", size, limit);
1637        }
1638        let mut buf = self.allocate_buffer(size as usize).await;
1639        self.read(0u64, buf.as_mut()).await?;
1640        Ok(buf.as_slice().into())
1641    }
1642
    /// Returns the set of file_offset->extent mappings for this file. The extents will be sorted by
    /// their logical offset within the file.
    ///
    /// *NOTE*: This operation is potentially expensive and should generally be avoided.
    pub async fn device_extents(&self) -> Result<Vec<FileExtent>, Error> {
        let mut extents = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Iterate all extent records for this attribute, from the start of the file.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
            )))
            .await?;
        loop {
            match iter.get() {
                // Collect allocated extents belonging to this object/attribute; anything else
                // (a different object, a deleted extent, or end of iteration) terminates the
                // loop below.
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
                    let logical_offset = range.start;
                    let device_range = *device_offset..*device_offset + range.length()?;
                    extents.push(FileExtent::new(logical_offset, device_range)?);
                }
                _ => break,
            }
            iter.advance().await?;
        }
        Ok(extents)
    }
1684}
1685
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
    /// Keeps this handle's cached state in sync with mutations as they are applied.
    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
        match mutation {
            // An attribute size change (e.g. write/truncate): update the cached content size.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
                ..
            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
            // Verity was enabled on this file: the size must already match the cached one, and
            // the in-memory fsverity state is finalized.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
                ..
            }) => {
                debug_assert_eq!(
                    self.get_size(),
                    *size,
                    "size should be set when verity is enabled and must not change"
                );
                self.finalize_fsverity_state()
            }
            // An extent mutation for this object's data attribute: track overwrite-mode ranges
            // so that subsequent overwrite writes know which ranges are allocated.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attr_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                        ..
                    },
                ..
            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
                    self.overwrite_ranges.apply_range(range.clone())
                }
                ExtentMode::Raw | ExtentMode::Cow(_) => (),
            },
            _ => {}
        }
    }
}
1730
// All of these simply delegate to the underlying `StoreObjectHandle`.
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1748
#[async_trait]
impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
    /// Reads from the data attribute, clamping the read to the content size and verifying the
    /// data if this is a verified (fsverity) file.
    async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
        // Take a read lock on the attribute so the size check and the read below see a
        // consistent view.
        let fs = self.store().filesystem();
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                self.store().store_object_id,
                self.object_id(),
                self.attribute_id(),
            )])
            .await;

        let size = self.get_size();
        if offset >= size {
            // Reads entirely beyond EOF return zero bytes.
            return Ok(0);
        }
        // Clamp the read so it never extends past the content size.
        let length = min(buf.len() as u64, size - offset) as usize;
        buf = buf.subslice_mut(0..length);
        self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
        if self.is_verified_file() {
            self.verify_data(offset as usize, buf.as_slice())?;
        }
        Ok(length)
    }

    fn get_size(&self) -> u64 {
        // Returns the cached content size (kept up to date via `will_apply_mutation`).
        self.content_size.load(atomic::Ordering::Relaxed)
    }
}
1779
1780impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
1781    async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
1782        let offset = offset.unwrap_or_else(|| self.get_size());
1783        let mut transaction = self.new_transaction().await?;
1784        self.txn_write(&mut transaction, offset, buf).await?;
1785        let new_size = self.txn_get_size(&transaction);
1786        transaction.commit().await?;
1787        Ok(new_size)
1788    }
1789
1790    async fn truncate(&self, size: u64) -> Result<(), Error> {
1791        self.truncate_with_options(self.default_transaction_options(), size).await
1792    }
1793
1794    async fn flush(&self) -> Result<(), Error> {
1795        Ok(())
1796    }
1797}
1798
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
pub struct DirectWriter<'a, S: HandleOwner> {
    // The handle being written to.
    handle: &'a DataObjectHandle<S>,
    // Transaction options used for every flush.
    options: transaction::Options<'a>,
    // Staging buffer; data accumulates here until it is flushed.
    buffer: Buffer<'a>,
    // Logical file offset at which the buffered data will be written.
    offset: u64,
    // Number of bytes currently staged in `buffer`.
    buf_offset: usize,
}
1808
// Size of DirectWriter's staging buffer: 1 MiB.
const BUFFER_SIZE: usize = 1_048_576;
1810
1811impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
1812    fn drop(&mut self) {
1813        if self.buf_offset != 0 {
1814            warn!("DirectWriter: dropping data, did you forget to call complete?");
1815        }
1816    }
1817}
1818
1819impl<'a, S: HandleOwner> DirectWriter<'a, S> {
1820    pub async fn new(
1821        handle: &'a DataObjectHandle<S>,
1822        options: transaction::Options<'a>,
1823    ) -> DirectWriter<'a, S> {
1824        Self {
1825            handle,
1826            options,
1827            buffer: handle.allocate_buffer(BUFFER_SIZE).await,
1828            offset: 0,
1829            buf_offset: 0,
1830        }
1831    }
1832
1833    async fn flush(&mut self) -> Result<(), Error> {
1834        let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
1835        self.handle
1836            .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
1837            .await?;
1838        transaction.commit().await?;
1839        self.offset += self.buf_offset as u64;
1840        self.buf_offset = 0;
1841        Ok(())
1842    }
1843}
1844
1845impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1846    fn block_size(&self) -> u64 {
1847        self.handle.block_size()
1848    }
1849
1850    async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1851        while buf.len() > 0 {
1852            let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1853            self.buffer
1854                .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1855                .as_mut_slice()
1856                .copy_from_slice(&buf[..to_do]);
1857            self.buf_offset += to_do;
1858            if self.buf_offset == BUFFER_SIZE {
1859                self.flush().await?;
1860            }
1861            buf = &buf[to_do..];
1862        }
1863        Ok(())
1864    }
1865
1866    async fn complete(&mut self) -> Result<(), Error> {
1867        self.flush().await?;
1868        Ok(())
1869    }
1870
1871    async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1872        if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1873            self.buffer
1874                .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1875                .as_mut_slice()
1876                .fill(0);
1877            self.buf_offset += amount as usize;
1878        } else {
1879            self.flush().await?;
1880            self.offset += amount;
1881        }
1882        Ok(())
1883    }
1884}
1885
1886#[cfg(test)]
1887mod tests {
1888    use crate::errors::FxfsError;
1889    use crate::filesystem::{
1890        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1891    };
1892    use crate::fsck::{
1893        FsckOptions, fsck, fsck_volume, fsck_volume_with_options, fsck_with_options,
1894    };
1895    use crate::lsm_tree::Query;
1896    use crate::lsm_tree::types::{ItemRef, LayerIterator};
1897    use crate::object_handle::{
1898        ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1899    };
1900    use crate::object_store::data_object_handle::{OverwriteOptions, WRITE_ATTR_BATCH_SIZE};
1901    use crate::object_store::directory::replace_child;
1902    use crate::object_store::object_record::{FsverityMetadata, ObjectKey, ObjectValue, Timestamp};
1903    use crate::object_store::transaction::{Mutation, Options, lock_keys};
1904    use crate::object_store::volume::root_volume;
1905    use crate::object_store::{
1906        AttributeKey, DEFAULT_DATA_ATTRIBUTE_ID, DataObjectHandle, Directory, ExtentKey,
1907        ExtentMode, ExtentValue, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, LockKey,
1908        NewChildStoreOptions, ObjectKeyData, ObjectStore, PosixAttributes, StoreOptions,
1909        TRANSACTION_MUTATION_THRESHOLD,
1910    };
1911    use crate::range::RangeExt;
1912    use crate::round::{round_down, round_up};
1913    use assert_matches::assert_matches;
1914    use bit_vec::BitVec;
1915    use fsverity_merkle::{FsVerityDescriptor, FsVerityDescriptorRaw};
1916    use fuchsia_sync::Mutex;
1917    use futures::FutureExt;
1918    use futures::channel::oneshot::channel;
1919    use futures::stream::{FuturesUnordered, StreamExt};
1920    use fxfs_crypto::{Crypt, EncryptionKey, KeyPurpose};
1921    use fxfs_insecure_crypto::new_insecure_crypt;
1922    use std::ops::Range;
1923    use std::sync::Arc;
1924    use std::time::Duration;
1925    use storage_device::DeviceHolder;
1926    use storage_device::fake_device::FakeDevice;
1927    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
1928
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Some tests (the preallocate_range ones) currently assume that the data only occupies a single
    // device block.
    const TEST_DATA_OFFSET: u64 = 5000;
    const TEST_DATA: &[u8] = b"hello";
    // Logical size the test object is truncated to after creation.
    const TEST_OBJECT_SIZE: u64 = 5678;
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    const TEST_OBJECT_NAME: &str = "foo";
1938
    /// Creates a fresh, empty filesystem backed by an in-memory `FakeDevice` for tests.
    async fn test_filesystem() -> OpenFxFilesystem {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        FxFilesystem::new_empty(device).await.expect("new_empty failed")
    }
1943
    /// Creates a test filesystem plus a file object in the root store, optionally encrypted
    /// with `crypt` and optionally pre-populated with `TEST_DATA` at `TEST_DATA_OFFSET`.  The
    /// object is always truncated to `TEST_OBJECT_SIZE` before returning.
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let store = fs.root_store();
        let object;

        // Lock the root directory for the duration of object creation + linking.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        object = if let Some(crypt) = crypt {
            // Encrypted path: mint a data key for the new object id up front.
            let object_id = store.get_next_object_id(transaction.txn_guard()).await.unwrap();
            let (key, unwrapped_key) = crypt.create_key(object_id, KeyPurpose::Data).await.unwrap();
            ObjectStore::create_object_with_key(
                &store,
                &mut transaction,
                object_id,
                HandleOptions::default(),
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await
            .expect("create_object failed")
        } else {
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed")
        };

        // Link the object into the root directory so fsck and lookups can find it.
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        if write_object_test_data {
            // TEST_DATA_OFFSET is deliberately unaligned; pad the buffer so the write is
            // block-aligned and then write just the data portion.
            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
            object
                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
                .await
                .expect("write failed");
        }
        transaction.commit().await.expect("commit failed");
        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
        (fs, object)
    }
2003
    /// Convenience wrapper: encrypted test object pre-populated with `TEST_DATA`.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), true).await
    }
2007
    /// Convenience wrapper: encrypted test object with no data written (sparse, size only).
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), false).await
    }
2012
    // A zero-length read should succeed and return 0 bytes.
    #[fuchsia::test]
    async fn test_zero_buf_len_read() {
        let (fs, object) = test_filesystem_and_object().await;
        let mut buf = object.allocate_buffer(0).await;
        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
        fs.close().await.expect("Close failed");
    }
2020
    // Reads spanning EOF should be clamped to the content size: bytes up to EOF are returned
    // (zero where the file is sparse) and the rest of the buffer is left untouched.
    #[fuchsia::test]
    async fn test_beyond_eof_read() {
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        // Sentinel value lets us detect which bytes the read left untouched.
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
2037
    // Same as test_beyond_eof_read, but via the deref'd handle's attribute-based read entry
    // point (attribute 0 passed explicitly).
    #[fuchsia::test]
    async fn test_beyond_eof_read_from() {
        let (fs, object) = test_filesystem_and_object().await;
        let handle = &*object;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        // Sentinel value lets us detect which bytes the read left untouched.
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            handle.read(0, (offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
2055
    // read_unchecked does not clamp to the content size, so the whole buffer (including the
    // bytes past EOF) should come back zeroed rather than left at the sentinel value.
    #[fuchsia::test]
    async fn test_beyond_eof_read_unchecked() {
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        // read_unchecked requires the caller to hold the attribute read lock.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                object.store().store_object_id,
                object.object_id(),
                0,
            )])
            .await;
        object
            .read_unchecked(0, (offset - align) as u64, buf.as_mut(), &guard)
            .await
            .expect("read failed");
        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
        fs.close().await.expect("Close failed");
    }
2079
2080    #[fuchsia::test]
2081    async fn test_read_sparse() {
2082        let (fs, object) = test_filesystem_and_object().await;
2083        // Deliberately read not right to eof.
2084        let len = TEST_OBJECT_SIZE as usize - 1;
2085        let mut buf = object.allocate_buffer(len).await;
2086        buf.as_mut_slice().fill(123u8);
2087        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2088        let mut expected = vec![0; len];
2089        let offset = TEST_DATA_OFFSET as usize;
2090        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2091        assert_eq!(buf.as_slice()[..len], expected[..]);
2092        fs.close().await.expect("Close failed");
2093    }
2094
    // Checks that reads merge data correctly when some extents have been flushed to a persisted
    // layer and others are still in the mutable (in-memory) layer.
    #[fuchsia::test]
    async fn test_read_after_writes_interspersed_with_flush() {
        let (fs, object) = test_filesystem_and_object().await;

        // Flush so the extents written so far end up in a different layer to the write below.
        object.owner().flush().await.expect("flush failed");

        // Write more test data to the first block of the file.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");

        // Read back (stopping one byte short of EOF) and check both writes are visible.
        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);

        // Expected image: zeros everywhere except the original test data at TEST_DATA_OFFSET and
        // the copy just written at offset 0.
        let mut expected = vec![0u8; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice(), &expected);
        fs.close().await.expect("Close failed");
    }
2118
    // Checks that reads handle a deleted extent sandwiched between two live extents.
    // NOTE(review): the 0..512 / 512..1024 / 1024..1536 ranges in the comments below assume a
    // 512-byte block size — confirm against the test device configuration.
    #[fuchsia::test]
    async fn test_read_after_truncate_and_extend() {
        let (fs, object) = test_filesystem_and_object().await;

        // Arrange for there to be <extent><deleted-extent><extent>.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        // This adds an extent at 0..512.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
        // This deletes 512..1024.
        object.truncate(3).await.expect("truncate failed");
        let data = b"foo";
        let offset = 1500u64;
        // The write below is unaligned, so stage the payload at its in-block offset.
        let align = (offset % fs.block_size() as u64) as usize;
        let mut buf = object.allocate_buffer(align + data.len()).await;
        buf.as_mut_slice()[align..].copy_from_slice(data);
        // This adds 1024..1536.
        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");

        // Expect the first 3 bytes of TEST_DATA (the rest was truncated away), zeros over the
        // deleted range, then "foo" at offset 1500.
        const LEN1: usize = 1503;
        let mut buf = object.allocate_buffer(LEN1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
        let mut expected = [0; LEN1];
        expected[..3].copy_from_slice(&TEST_DATA[..3]);
        expected[1500..].copy_from_slice(b"foo");
        assert_eq!(buf.as_slice(), &expected);

        // Also test a read that ends midway through the deleted extent.
        const LEN2: usize = 601;
        let mut buf = object.allocate_buffer(LEN2).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
        assert_eq!(buf.as_slice(), &expected[..LEN2]);
        fs.close().await.expect("Close failed");
    }
2155
    // Interleaves whole-block writes (and a truncate) across two objects in the same store and
    // checks that each object's reads only see that object's data.
    #[fuchsia::test]
    async fn test_read_whole_blocks_with_multiple_objects() {
        let (fs, object) = test_filesystem_and_object().await;
        let block_size = object.block_size() as usize;
        // Object 1, block 0: 0xaf.
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

        // Create a second object in the same store.
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object2 =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        // Object 2, block 0: 0xef.
        let mut ef_buffer = object.allocate_buffer(block_size).await;
        ef_buffer.as_mut_slice().fill(0xef);
        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");

        // Object 1, block 1: 0xaf; then grow object 1 to three blocks (the third stays sparse);
        // then object 2, block 1: 0xef.  The interleaving is deliberate.
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object
            .write_or_append(Some(block_size as u64), buffer.as_ref())
            .await
            .expect("write failed");
        object.truncate(3 * block_size as u64).await.expect("truncate failed");
        object2
            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
            .await
            .expect("write failed");

        // Object 1 reads back: two 0xaf blocks plus one sparse (zero) block.
        let mut buffer = object.allocate_buffer(4 * block_size).await;
        buffer.as_mut_slice().fill(123);
        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
        // Object 2 reads back: two 0xef blocks — none of object 1's data leaks in.
        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
        fs.close().await.expect("Close failed");
    }
2200
    // Exercises writes with every combination of unaligned head/tail against an in-memory mirror
    // of the object's contents.
    #[fuchsia::test]
    async fn test_alignment() {
        let (fs, object) = test_filesystem_and_object().await;

        struct AlignTest {
            // Byte value written by the most recent call to test(); bumped before each write.
            fill: u8,
            object: DataObjectHandle<ObjectStore>,
            mirror: Vec<u8>,
        }

        impl AlignTest {
            // Snapshots the object's current contents into the in-memory mirror.
            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
                let mirror = {
                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
                    buf.as_slice().to_vec()
                };
                Self { fill: 0, object, mirror }
            }

            // Fills |range| of self.object with a byte value (self.fill) and mirrors the same
            // operation to an in-memory copy of the object.
            // Each subsequent call bumps the value of fill.
            // It is expected that the object and its mirror maintain identical content.
            async fn test(&mut self, range: Range<u64>) {
                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
                self.fill += 1;
                buf.as_mut_slice().fill(self.fill);
                self.object
                    .write_or_append(Some(range.start), buf.as_ref())
                    .await
                    .expect("write_or_append failed");
                // Writes past the current end also grow the mirror (zero-filled, like the file).
                if range.end > self.mirror.len() as u64 {
                    self.mirror.resize(range.end as usize, 0);
                }
                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
                // Read one byte more than the mirror holds to confirm the read stops at EOF.
                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
                assert_eq!(
                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
                    self.mirror.len()
                );
                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
            }
        }

        let block_size = object.block_size() as u64;
        let mut align = AlignTest::new(object).await;

        // Fill the object to start with (with 1).
        align.test(0..2 * block_size + 1).await;

        // Unaligned head (fills with 2, overwrites that with 3).
        align.test(1..block_size).await;
        align.test(1..2 * block_size).await;

        // Unaligned tail (fills with 4 and 5).
        align.test(0..block_size - 1).await;
        align.test(0..2 * block_size - 1).await;

        // Both unaligned (fills with 6 and 7).
        align.test(1..block_size - 1).await;
        align.test(1..2 * block_size - 1).await;

        fs.close().await.expect("Close failed");
    }
2266
2267    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
2268        let allocator = fs.allocator();
2269        let allocated_before = allocator.get_allocated_bytes();
2270        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2271        object
2272            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
2273            .await
2274            .expect("preallocate_range failed");
2275        transaction.commit().await.expect("commit failed");
2276        assert!(object.get_size() < 1048576);
2277        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2278        object
2279            .preallocate_range(&mut transaction, &mut (0..1048576))
2280            .await
2281            .expect("preallocate_range failed");
2282        transaction.commit().await.expect("commit failed");
2283        assert_eq!(object.get_size(), 1048576);
2284        // Check that it didn't reallocate the space for the existing extent
2285        let allocated_after = allocator.get_allocated_bytes();
2286        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
2287
2288        let mut buf = object
2289            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
2290            .await;
2291        buf.as_mut_slice().fill(47);
2292        object
2293            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
2294            .await
2295            .expect("write failed");
2296        buf.as_mut_slice().fill(95);
2297        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
2298        object
2299            .overwrite(offset, buf.as_mut(), OverwriteOptions::default())
2300            .await
2301            .expect("write failed");
2302
2303        // Make sure there were no more allocations.
2304        assert_eq!(allocator.get_allocated_bytes(), allocated_after);
2305
2306        // Read back the data and make sure it is what we expect.
2307        let mut buf = object.allocate_buffer(104876).await;
2308        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2309        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
2310        assert_eq!(
2311            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
2312            TEST_DATA
2313        );
2314        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
2315    }
2316
    // Runs the common preallocation checks against a freshly created object (all extents still in
    // the mutable layer).
    #[fuchsia::test]
    async fn test_preallocate_range() {
        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
        test_preallocate_common(&fs, object).await;
        fs.close().await.expect("Close failed");
    }
2323
    // This is identical to the previous test except that we flush so that extents end up in
    // different layers.
    #[fuchsia::test]
    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
        // After the flush, the pre-existing extents live in a persisted layer while everything
        // test_preallocate_common writes lands in the new mutable layer.
        object.owner().flush().await.expect("flush failed");
        test_preallocate_common(&fs, object).await;
        fs.close().await.expect("Close failed");
    }
2333
2334    #[fuchsia::test]
2335    async fn test_already_preallocated() {
2336        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2337        let allocator = fs.allocator();
2338        let allocated_before = allocator.get_allocated_bytes();
2339        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2340        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
2341        object
2342            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
2343            .await
2344            .expect("preallocate_range failed");
2345        transaction.commit().await.expect("commit failed");
2346        // Check that it didn't reallocate any new space.
2347        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
2348        fs.close().await.expect("Close failed");
2349    }
2350
    #[fuchsia::test]
    async fn test_overwrite_when_preallocated_at_start_of_file() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        // Re-open the object to get a fresh handle (presumably to drop any state cached on the
        // original handle — TODO confirm).
        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume 4 KiB blocks.
        assert_eq!(fs.block_size(), 4096);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // First try to overwrite without allowing allocations
        // We expect this to fail, since nothing is allocated yet
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // Now preallocate some space (exactly one block)
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Now try the same overwrite command as before, it should work this time,
        // even with allocations disabled...
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
        // one block earlier at offset 0
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // We can't assert anything about the existing bytes, because they haven't been allocated
        // yet and they could contain any values
        object
            .overwrite(
                4096,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2437
    #[fuchsia::test]
    async fn test_overwrite_large_buffer_and_file_with_many_holes() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume 4 KiB blocks.
        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        // Let's create some non-holes.  The resulting layout alternates holes and preallocated
        // extents: holes at 0..4096, 8192..16384, 32768..65536 and 131072..262144.
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Preallocation extends the file to the end of the last range.
        assert_eq!(object.get_size(), 524288);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // We shouldn't be able to overwrite in the holes if new allocations aren't enabled
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(8192, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(32768, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(131072, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // But we should be able to overwrite in the prealloc'd areas without needing allocations.
        // For each preallocated extent: verify it reads as zero, overwrite the first block with
        // 95s, then verify the write is visible.
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(16384, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(16384, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(16384, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(65536, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(65536, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(65536, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(262144, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(262144, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(262144, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now let's try to do a huge overwrite, that spans over many holes and non-holes
        let mut huge_write_buf = object.allocate_buffer(524288).await;
        huge_write_buf.as_mut_slice().fill(96);

        // With allocations disabled, the big overwrite should fail...
        object
            .overwrite(0, huge_write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but it should work when allocations are enabled
        object
            .overwrite(
                0,
                huge_write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(524288).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[96; 524288]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2592
2593    #[fuchsia::test]
2594    async fn test_overwrite_when_unallocated_at_start_of_file() {
2595        // The standard test data we put in the test object would cause an extent with checksums
2596        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2597        let (fs, object) = test_filesystem_and_empty_object().await;
2598
2599        let object = ObjectStore::open_object(
2600            object.owner(),
2601            object.object_id(),
2602            HandleOptions::default(),
2603            None,
2604        )
2605        .await
2606        .expect("open_object failed");
2607
2608        assert_eq!(fs.block_size(), 4096);
2609
2610        let mut write_buf = object.allocate_buffer(4096).await;
2611        write_buf.as_mut_slice().fill(95);
2612
2613        // First try to overwrite without allowing allocations
2614        // We expect this to fail, since nothing is allocated yet
2615        object
2616            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2617            .await
2618            .expect_err("overwrite succeeded");
2619
2620        // Now try the same overwrite command as before, but allow allocations
2621        object
2622            .overwrite(
2623                0,
2624                write_buf.as_mut(),
2625                OverwriteOptions { allow_allocations: true, ..Default::default() },
2626            )
2627            .await
2628            .expect("overwrite failed");
2629        {
2630            let mut read_buf = object.allocate_buffer(4096).await;
2631            object.read(0, read_buf.as_mut()).await.expect("read failed");
2632            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2633        }
2634
2635        // Now try to overwrite at the next block. This should fail if allocations are disabled
2636        object
2637            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2638            .await
2639            .expect_err("overwrite succeeded");
2640
2641        // ... but it should work if allocations are enabled
2642        object
2643            .overwrite(
2644                4096,
2645                write_buf.as_mut(),
2646                OverwriteOptions { allow_allocations: true, ..Default::default() },
2647            )
2648            .await
2649            .expect("overwrite failed");
2650        {
2651            let mut read_buf = object.allocate_buffer(4096).await;
2652            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2653            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2654        }
2655
2656        // Check that the overwrites haven't messed up the filesystem state
2657        let fsck_options = FsckOptions {
2658            fail_on_warning: true,
2659            no_lock: true,
2660            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2661            ..Default::default()
2662        };
2663        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2664
2665        fs.close().await.expect("Close failed");
2666    }
2667
    #[fuchsia::test]
    async fn test_overwrite_can_extend_a_file() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume 4 KiB blocks.
        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // Let's try to fill up the last block, and increase the file size in doing so
        let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);

        // Expected to fail with allocations disabled
        object
            .overwrite(last_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                last_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Filling the last (partial) block grew the file to the end of that block, so
        // TEST_OBJECT_SIZE must fall within the second 4 KiB block.
        assert_eq!(object.get_size(), 8192);

        // Let's try to write at the next block, too
        let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();

        // Expected to fail with allocations disabled
        object
            .overwrite(next_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                next_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // One more whole block appended.
        assert_eq!(object.get_size(), 12288);

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2750
    // Enables fsverity (SHA-256, empty salt) on an empty object and checks that the verified
    // state is persisted and visible from a freshly opened handle.
    #[fuchsia::test]
    async fn test_enable_verity() {
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        transaction.commit().await.unwrap();

        object
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");

        // Re-open rather than reusing `object`: the verified flag must survive a round trip
        // through the store.
        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(handle.is_verified_file());

        fs.close().await.expect("Close failed");
    }
2786
2787    #[fuchsia::test]
2788    async fn test_enable_verity_large_file() {
2789        // Need to make a large FakeDevice to create space for a 67 MB file.
2790        let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2791        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2792        let root_store = fs.root_store();
2793        let mut transaction = fs
2794            .clone()
2795            .new_transaction(lock_keys![], Options::default())
2796            .await
2797            .expect("new_transaction failed");
2798
2799        let handle = ObjectStore::create_object(
2800            &root_store,
2801            &mut transaction,
2802            HandleOptions::default(),
2803            None,
2804        )
2805        .await
2806        .expect("failed to create object");
2807        transaction.commit().await.expect("commit failed");
2808        let mut offset = 0;
2809
2810        // Write a file big enough to trigger multiple transactions on enable_verity().
2811        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2812        buf.as_mut_slice().fill(1);
2813        for _ in 0..130 {
2814            handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2815            offset += WRITE_ATTR_BATCH_SIZE as u64;
2816        }
2817
2818        handle
2819            .enable_verity(fio::VerificationOptions {
2820                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2821                salt: Some(vec![]),
2822                ..Default::default()
2823            })
2824            .await
2825            .expect("set verified file metadata failed");
2826
2827        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2828        offset = 0;
2829        for _ in 0..130 {
2830            handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2831            assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2832            offset += WRITE_ATTR_BATCH_SIZE as u64;
2833        }
2834
2835        fsck(fs.clone()).await.expect("fsck failed");
2836        fs.close().await.expect("Close failed");
2837    }
2838
    #[fuchsia::test]
    async fn test_retry_enable_verity_on_reboot() {
        // Simulates a crash partway through enable_verity(): a merkle attribute is
        // partially written with a graveyard entry marking it for cleanup, then the
        // filesystem is "rebooted".  After remount, enable_verity() must be able to
        // retry and produce a valid descriptor.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Add a graveyard entry for the merkle attribute so that an interrupted
            // write gets cleaned up on the next mount.
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // Mount read-only first and fsck the partially-written state.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_store = fs.root_store();
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open_object failed");
        // Retrying enable_verity() after the simulated interruption should succeed.
        handle
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");

        // `flush` will ensure that initial reap fully processes all the graveyard entries. This
        // isn't strictly necessary for the test to pass (the graveyard marker was already
        // processed during `enable_verity`), but it does help catch bugs, such as the attribute
        // graveyard entry not being removed upon processing.
        fs.graveyard().flush().await;
        // The merkle attribute should now parse as a well-formed fsverity descriptor.
        assert!(
            FsVerityDescriptor::from_bytes(
                &handle
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await
                    .expect("read_attr failed")
                    .expect("No attr found"),
                handle.block_size() as usize
            )
            .is_ok()
        );
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
2937
2938    #[fuchsia::test]
2939    async fn test_verify_data_corrupt_file() {
2940        let fs: OpenFxFilesystem = test_filesystem().await;
2941        let mut transaction = fs
2942            .clone()
2943            .new_transaction(lock_keys![], Options::default())
2944            .await
2945            .expect("new_transaction failed");
2946        let store = fs.root_store();
2947        let object = Arc::new(
2948            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2949                .await
2950                .expect("create_object failed"),
2951        );
2952
2953        transaction.commit().await.unwrap();
2954
2955        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
2956        buf.as_mut_slice().fill(123);
2957        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2958
2959        object
2960            .enable_verity(fio::VerificationOptions {
2961                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2962                salt: Some(vec![]),
2963                ..Default::default()
2964            })
2965            .await
2966            .expect("set verified file metadata failed");
2967
2968        // Change file contents and ensure verification fails
2969        buf.as_mut_slice().fill(234);
2970        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2971        object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
2972
2973        fs.close().await.expect("Close failed");
2974    }
2975
2976    // TODO(https://fxbug.dev/450398331): More tests to be added when this can support writing the
2977    // f2fs format natively. For now, relying on tests inside of the f2fs_reader to exercise more
2978    // paths.
    #[fuchsia::test]
    async fn test_parse_f2fs_verity() {
        // Creates a normally-verified file, then rewrites its metadata into the
        // f2fs-style layout (FsverityMetadata::F2fs with the descriptor stored in the
        // merkle attribute) and checks the file still opens and reads as verified.
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        transaction.commit().await.unwrap();
        let file_size = fs.block_size() * 2;
        // Write over one block to make there be leaf hashes.
        {
            let mut buf = object.allocate_buffer(file_size as usize).await;
            buf.as_mut_slice().fill(64);
            assert_eq!(
                object.write_or_append(None, buf.as_ref()).await.expect("Writing to file."),
                file_size
            );
        }

        // Enable verity normally, then shift the type.
        object
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");
        let (verity_info, root_hash) =
            object.get_descriptor().expect("Getting verity info").unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::Object {
                    store_object_id: store.store_object_id(),
                    object_id: object.object_id()
                }],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Rewrite the data attribute's verity metadata to the F2fs variant, whose range
        // covers the first two blocks of the merkle attribute.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::verified_attribute(
                    file_size,
                    FsverityMetadata::F2fs(0..(fs.block_size() * 2)),
                ),
            ),
        );
        // Grow the merkle attribute to two blocks so there is room for the descriptor.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::attribute(fs.block_size() * 2, false),
            ),
        );
        {
            // Build a raw fsverity descriptor from the previously computed root hash and
            // salt, and write it into the second block of the merkle attribute.
            let descriptor = FsVerityDescriptorRaw::new(
                fio::HashAlgorithm::Sha256,
                fs.block_size(),
                file_size,
                root_hash.as_slice(),
                match &verity_info.salt {
                    Some(salt) => salt.as_slice(),
                    None => [0u8; 0].as_slice(),
                },
            )
            .expect("Creating descriptor");
            let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
            descriptor.write_to_slice(buf.as_mut_slice()).expect("Writing descriptor to buf");
            object
                .multi_write(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &[fs.block_size()..(fs.block_size() * 2)],
                    buf.as_mut(),
                )
                .await
                .expect("Writing descriptor");
        }
        transaction.commit().await.unwrap();

        // Reopen: the f2fs-style metadata should parse and the file should still be
        // treated as verified.
        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(handle.is_verified_file());

        // A verified read of the whole file should succeed.
        let mut buf = object.allocate_buffer(file_size as usize).await;
        assert_eq!(
            handle.read(0, buf.as_mut()).await.expect("Read whole file."),
            file_size as usize
        );

        fs.close().await.expect("Close failed");
    }
3095
3096    #[fuchsia::test]
3097    async fn test_verify_data_corrupt_tree() {
3098        let fs: OpenFxFilesystem = test_filesystem().await;
3099        let object_id = {
3100            let store = fs.root_store();
3101            let mut transaction = fs
3102                .clone()
3103                .new_transaction(lock_keys![], Options::default())
3104                .await
3105                .expect("new_transaction failed");
3106            let object = Arc::new(
3107                ObjectStore::create_object(
3108                    &store,
3109                    &mut transaction,
3110                    HandleOptions::default(),
3111                    None,
3112                )
3113                .await
3114                .expect("create_object failed"),
3115            );
3116            let object_id = object.object_id();
3117
3118            transaction.commit().await.unwrap();
3119
3120            let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3121            buf.as_mut_slice().fill(123);
3122            object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3123
3124            object
3125                .enable_verity(fio::VerificationOptions {
3126                    hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3127                    salt: Some(vec![]),
3128                    ..Default::default()
3129                })
3130                .await
3131                .expect("set verified file metadata failed");
3132            object.read(0, buf.as_mut()).await.expect("verified read");
3133
3134            // Corrupt the merkle tree before closing.
3135            let mut merkle = object
3136                .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
3137                .await
3138                .unwrap()
3139                .expect("Reading merkle tree");
3140            merkle[0] = merkle[0].wrapping_add(1);
3141            object
3142                .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &*merkle)
3143                .await
3144                .expect("Overwriting merkle");
3145
3146            object_id
3147        }; // Close object.
3148
3149        // Reopening the object should complain about the corrupted merkle tree.
3150        assert!(
3151            ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default(), None)
3152                .await
3153                .is_err()
3154        );
3155        fs.close().await.expect("Close failed");
3156    }
3157
3158    #[fuchsia::test]
3159    async fn test_extend() {
3160        let fs = test_filesystem().await;
3161        let handle;
3162        let mut transaction = fs
3163            .clone()
3164            .new_transaction(lock_keys![], Options::default())
3165            .await
3166            .expect("new_transaction failed");
3167        let store = fs.root_store();
3168        handle =
3169            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3170                .await
3171                .expect("create_object failed");
3172
3173        // As of writing, an empty filesystem has two 512kiB superblock extents and a little over
3174        // 256kiB of additional allocations (journal, etc) so we start use a 'magic' starting point
3175        // of 2MiB here.
3176        const START_OFFSET: u64 = 2048 * 1024;
3177        handle
3178            .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
3179            .await
3180            .expect("extend failed");
3181        transaction.commit().await.expect("commit failed");
3182        let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
3183        buf.as_mut_slice().fill(123);
3184        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3185        buf.as_mut_slice().fill(67);
3186        handle.read(0, buf.as_mut()).await.expect("read failed");
3187        assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
3188        fs.close().await.expect("Close failed");
3189    }
3190
3191    #[fuchsia::test]
3192    async fn test_truncate_deallocates_old_extents() {
3193        let (fs, object) = test_filesystem_and_object().await;
3194        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3195        buf.as_mut_slice().fill(0xaa);
3196        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3197
3198        let allocator = fs.allocator();
3199        let allocated_before = allocator.get_allocated_bytes();
3200        object.truncate(fs.block_size() as u64).await.expect("truncate failed");
3201        let allocated_after = allocator.get_allocated_bytes();
3202        assert!(
3203            allocated_after < allocated_before,
3204            "before = {} after = {}",
3205            allocated_before,
3206            allocated_after
3207        );
3208        fs.close().await.expect("Close failed");
3209    }
3210
3211    #[fuchsia::test]
3212    async fn test_truncate_zeroes_tail_block() {
3213        let (fs, object) = test_filesystem_and_object().await;
3214
3215        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
3216        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
3217            .await
3218            .expect("truncate failed");
3219
3220        let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3221        let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
3222        object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
3223
3224        let mut expected = TEST_DATA.to_vec();
3225        expected[3..].fill(0);
3226        assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
3227    }
3228
    #[fuchsia::test]
    async fn test_trim() {
        // Format a new filesystem.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let block_size = fs.block_size();
        root_volume(fs.clone())
            .await
            .expect("root_volume failed")
            .new_volume("test", NewChildStoreOptions::default())
            .await
            .expect("volume failed");
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // To test trim, we open the filesystem and set up a post commit hook that runs after every
        // transaction.  When the hook triggers, we can fsck the volume, take a snapshot of the
        // device and check that it gets replayed correctly on the snapshot.  We can check that the
        // graveyard trims the file as expected.
        #[derive(Default)]
        struct Context {
            store: Option<Arc<ObjectStore>>,
            object_id: Option<u64>,
        }
        let shared_context = Arc::new(Mutex::new(Context::default()));

        // Large enough that truncating the object requires multiple transactions (see
        // the write loop below, which writes every other block).
        let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;

        // Wait for an object to get tombstoned by the graveyard.
        async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
            loop {
                if let Err(e) =
                    ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
                {
                    assert!(
                        FxfsError::NotFound.matches(&e),
                        "open_object didn't fail with NotFound: {:?}",
                        e
                    );
                    break;
                }
                // The graveyard should eventually tombstone the object.
                fasync::Timer::new(std::time::Duration::from_millis(100)).await;
            }
        }

        // Checks to see if the object needs to be trimmed.
        async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
            let root_directory = Directory::open(store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let oid = root_directory.lookup("foo").await.expect("lookup failed");
            if let Some((oid, _, _)) = oid {
                let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");
                let props = object.get_properties().await.expect("get_properties failed");
                // Nonzero allocated bytes with zero content size indicates a pending trim.
                if props.allocated_size > 0 && props.data_attribute_size == 0 {
                    Some(object)
                } else {
                    None
                }
            } else {
                None
            }
        }

        let shared_context_clone = shared_context.clone();
        // Post-commit hook: fscks the live filesystem after every transaction, then
        // snapshots the device and checks the snapshot replays correctly (including
        // graveyard trimming on the first pass and tombstoning on the second).
        let post_commit = move || {
            let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
            let shared_context = shared_context_clone.clone();
            async move {
                // First run fsck on the current filesystem.
                let options = FsckOptions {
                    fail_on_warning: true,
                    no_lock: true,
                    on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                    ..Default::default()
                };
                let fs = store.filesystem();

                fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");

                // Now check that we can replay this correctly.
                fs.sync(SyncOptions { flush_device: true, ..Default::default() })
                    .await
                    .expect("sync failed");
                let device = fs.device().snapshot().expect("snapshot failed");

                let object_id = shared_context.lock().object_id.clone();

                let fs2 = FxFilesystemBuilder::new()
                    .skip_initial_reap(object_id.is_none())
                    .open(device)
                    .await
                    .expect("open failed");

                // If the "foo" file exists check that allocated size matches content size.
                let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
                let store =
                    root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

                if let Some(oid) = object_id {
                    // For the second pass, the object should get tombstoned.
                    expect_tombstoned(&store, oid).await;
                } else if let Some(object) = needs_trim(&store).await {
                    // Extend the file and make sure that it is correctly trimmed.
                    object.truncate(object_size).await.expect("truncate failed");
                    let mut buf = object.allocate_buffer(block_size as usize).await;
                    object
                        .read(object_size - block_size * 2, buf.as_mut())
                        .await
                        .expect("read failed");
                    assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);

                    // Remount, this time with the graveyard performing an initial reap and the
                    // object should get trimmed.
                    let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
                        .await
                        .expect("open failed");
                    let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                    let store = root_vol
                        .volume("test", StoreOptions::default())
                        .await
                        .expect("volume failed");
                    while needs_trim(&store).await.is_some() {
                        // The object has been truncated, but still has some data allocated to
                        // it.  The graveyard should trim the object eventually.
                        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
                    }

                    // Run fsck.
                    fsck_with_options(fs.clone(), &options)
                        .await
                        .expect("fsck_with_options failed");
                    fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                        .await
                        .expect("fsck_volume_with_options failed");
                    fs.close().await.expect("close failed");
                }

                // Run fsck on fs2.
                fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");
                fs2.close().await.expect("close failed");
            }
            .boxed()
        };

        // Reopen the filesystem with the hook installed; everything below runs with the
        // hook firing after each commit.
        let fs = FxFilesystemBuilder::new()
            .post_commit_hook(post_commit)
            .open(device)
            .await
            .expect("open failed");

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

        shared_context.lock().store = Some(store.clone());

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");

        let object;
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        object = root_directory
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_object failed");
        transaction.commit().await.expect("commit failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        // Two passes: first with a regular object, and then with that object moved into the
        // graveyard.
        let mut pass = 0;
        loop {
            // Create enough extents in it such that when we truncate the object it will require
            // more than one transaction.
            let mut buf = object.allocate_buffer(5).await;
            buf.as_mut_slice().fill(1);
            // Write every other block.
            for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
                object
                    .txn_write(&mut transaction, offset, buf.as_ref())
                    .await
                    .expect("write failed");
            }
            transaction.commit().await.expect("commit failed");
            // This should take up more than one transaction.
            WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");

            if pass == 1 {
                break;
            }

            // Store the object ID so that we can make sure the object is always tombstoned
            // after remount (see above).
            shared_context.lock().object_id = Some(object.object_id());

            transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(store.store_object_id(), store.root_directory_object_id()),
                        LockKey::object(store.store_object_id(), object.object_id()),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");

            // Move the object into the graveyard.
            replace_child(&mut transaction, None, (&root_directory, "foo"))
                .await
                .expect("replace_child failed");
            store.add_to_graveyard(&mut transaction, object.object_id());

            pass += 1;
        }

        fs.close().await.expect("Close failed");
    }
3475
3476    #[fuchsia::test]
3477    async fn test_adjust_refs() {
3478        let (fs, object) = test_filesystem_and_object().await;
3479        let store = object.owner();
3480        let mut transaction = fs
3481            .clone()
3482            .new_transaction(
3483                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3484                Options::default(),
3485            )
3486            .await
3487            .expect("new_transaction failed");
3488        assert_eq!(
3489            store
3490                .adjust_refs(&mut transaction, object.object_id(), 1)
3491                .await
3492                .expect("adjust_refs failed"),
3493            false
3494        );
3495        transaction.commit().await.expect("commit failed");
3496
3497        let allocator = fs.allocator();
3498        let allocated_before = allocator.get_allocated_bytes();
3499        let mut transaction = fs
3500            .clone()
3501            .new_transaction(
3502                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3503                Options::default(),
3504            )
3505            .await
3506            .expect("new_transaction failed");
3507        assert_eq!(
3508            store
3509                .adjust_refs(&mut transaction, object.object_id(), -2)
3510                .await
3511                .expect("adjust_refs failed"),
3512            true
3513        );
3514        transaction.commit().await.expect("commit failed");
3515
3516        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
3517
3518        store
3519            .tombstone_object(
3520                object.object_id(),
3521                Options { borrow_metadata_space: true, ..Default::default() },
3522            )
3523            .await
3524            .expect("purge failed");
3525
3526        assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);
3527
3528        // We need to remove the directory entry, too, otherwise fsck will complain
3529        {
3530            let mut transaction = fs
3531                .clone()
3532                .new_transaction(
3533                    lock_keys![LockKey::object(
3534                        store.store_object_id(),
3535                        store.root_directory_object_id()
3536                    )],
3537                    Options::default(),
3538                )
3539                .await
3540                .expect("new_transaction failed");
3541            let root_directory = Directory::open(&store, store.root_directory_object_id())
3542                .await
3543                .expect("open failed");
3544            transaction.add(
3545                store.store_object_id(),
3546                Mutation::replace_or_insert_object(
3547                    ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, false),
3548                    ObjectValue::None,
3549                ),
3550            );
3551            transaction.commit().await.expect("commit failed");
3552        }
3553
3554        fsck_with_options(
3555            fs.clone(),
3556            &FsckOptions {
3557                fail_on_warning: true,
3558                on_error: Box::new(|err| println!("fsck error: {:?}", err)),
3559                ..Default::default()
3560            },
3561        )
3562        .await
3563        .expect("fsck_with_options failed");
3564
3565        fs.close().await.expect("Close failed");
3566    }
3567
    #[fuchsia::test]
    async fn test_locks() {
        // Verifies object transaction locking: a held transaction blocks other transactions on
        // the same object, but does not block concurrent reads.
        let (fs, object) = test_filesystem_and_object().await;
        // Oneshot channels used purely to sequence the three futures below.
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        // NOTE(review): `done` is only ever read (asserted false) — nothing in this test sets it
        // to true, so the assertion below can never fire.  Presumably the third future was meant
        // to set it after acquiring its transaction; confirm intent.
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        // Future 1: takes the object's transaction lock first, then writes "hello" and commits.
        futures.push(
            async {
                let mut t = object.new_transaction().await.expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                let mut buf = object.allocate_buffer(5).await;
                buf.as_mut_slice().copy_from_slice(b"hello");
                object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
                t.commit().await.expect("commit failed");
            }
            .boxed(),
        );
        // Future 2: runs while future 1 still holds its transaction; reads must not block on it.
        futures.push(
            async {
                recv1.await.unwrap();
                // Reads should not block.
                let offset = TEST_DATA_OFFSET as usize;
                let align = offset % fs.block_size() as usize;
                let len = TEST_DATA.len();
                let mut buf = object.allocate_buffer(align + len).await;
                assert_eq!(
                    object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
                    align + TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[align..], TEST_DATA);
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        // Future 3: opens a second transaction on the same object; it should only be granted
        // after future 1 commits, at which point the committed write must be visible.
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = object.new_transaction().await.expect("new_transaction failed");
                let mut buf = object.allocate_buffer(5).await;
                assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
                assert_eq!(buf.as_slice(), b"hello");
            }
            .boxed(),
        );
        // Drive all three futures to completion.
        while let Some(()) = futures.next().await {}
        fs.close().await.expect("Close failed");
    }
3624
3625    #[fuchsia::test(threads = 10)]
3626    async fn test_racy_reads() {
3627        let fs = test_filesystem().await;
3628        let object;
3629        let mut transaction = fs
3630            .clone()
3631            .new_transaction(lock_keys![], Options::default())
3632            .await
3633            .expect("new_transaction failed");
3634        let store = fs.root_store();
3635        object = Arc::new(
3636            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3637                .await
3638                .expect("create_object failed"),
3639        );
3640        transaction.commit().await.expect("commit failed");
3641        for _ in 0..100 {
3642            let cloned_object = object.clone();
3643            let writer = fasync::Task::spawn(async move {
3644                let mut buf = cloned_object.allocate_buffer(10).await;
3645                buf.as_mut_slice().fill(123);
3646                cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3647            });
3648            let cloned_object = object.clone();
3649            let reader = fasync::Task::spawn(async move {
3650                let wait_time = rand::random_range(0..5);
3651                fasync::Timer::new(Duration::from_millis(wait_time)).await;
3652                let mut buf = cloned_object.allocate_buffer(10).await;
3653                buf.as_mut_slice().fill(23);
3654                let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3655                // If we succeed in reading data, it must include the write; i.e. if we see the size
3656                // change, we should see the data too.  For this to succeed it requires locking on
3657                // the read size to ensure that when we read the size, we get the extents changed in
3658                // that same transaction.
3659                if amount != 0 {
3660                    assert_eq!(amount, 10);
3661                    assert_eq!(buf.as_slice(), &[123; 10]);
3662                }
3663            });
3664            writer.await;
3665            reader.await;
3666            object.truncate(0).await.expect("truncate failed");
3667        }
3668        fs.close().await.expect("Close failed");
3669    }
3670
3671    #[fuchsia::test]
3672    async fn test_allocated_size() {
3673        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3674
3675        let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3676        let mut buf = object.allocate_buffer(5).await;
3677        buf.as_mut_slice().copy_from_slice(b"hello");
3678        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3679        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3680        assert_eq!(after, before + fs.block_size() as u64);
3681
3682        // Do the same write again and there should be no change.
3683        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3684        assert_eq!(
3685            object.get_properties().await.expect("get_properties failed").allocated_size,
3686            after
3687        );
3688
3689        // extend...
3690        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3691        let offset = 1000 * fs.block_size() as u64;
3692        let before = after;
3693        object
3694            .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3695            .await
3696            .expect("extend failed");
3697        transaction.commit().await.expect("commit failed");
3698        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3699        assert_eq!(after, before + fs.block_size() as u64);
3700
3701        // truncate...
3702        let before = after;
3703        let size = object.get_size();
3704        object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3705        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3706        assert_eq!(after, before - fs.block_size() as u64);
3707
3708        // preallocate_range...
3709        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3710        let before = after;
3711        let mut file_range = offset..offset + fs.block_size() as u64;
3712        object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3713        transaction.commit().await.expect("commit failed");
3714        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3715        assert_eq!(after, before + fs.block_size() as u64);
3716        fs.close().await.expect("Close failed");
3717    }
3718
3719    #[fuchsia::test(threads = 10)]
3720    async fn test_zero() {
3721        let (fs, object) = test_filesystem_and_object().await;
3722        let expected_size = object.get_size();
3723        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3724        object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3725        transaction.commit().await.expect("commit failed");
3726        assert_eq!(object.get_size(), expected_size);
3727        let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3728        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3729        assert_eq!(
3730            &buf.as_slice()[0..expected_size as usize],
3731            vec![0u8; expected_size as usize].as_slice()
3732        );
3733        fs.close().await.expect("Close failed");
3734    }
3735
3736    #[fuchsia::test]
3737    async fn test_properties() {
3738        let (fs, object) = test_filesystem_and_object().await;
3739        const CRTIME: Timestamp = Timestamp::from_nanos(1234);
3740        const MTIME: Timestamp = Timestamp::from_nanos(5678);
3741        const CTIME: Timestamp = Timestamp::from_nanos(8765);
3742
3743        // ObjectProperties can be updated through `update_attributes`.
3744        // `get_properties` should reflect the latest changes.
3745        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3746        object
3747            .update_attributes(
3748                &mut transaction,
3749                Some(&fio::MutableNodeAttributes {
3750                    creation_time: Some(CRTIME.as_nanos()),
3751                    modification_time: Some(MTIME.as_nanos()),
3752                    mode: Some(111),
3753                    gid: Some(222),
3754                    ..Default::default()
3755                }),
3756                None,
3757            )
3758            .await
3759            .expect("update_attributes failed");
3760        const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
3761        object
3762            .update_attributes(
3763                &mut transaction,
3764                Some(&fio::MutableNodeAttributes {
3765                    modification_time: Some(MTIME_NEW.as_nanos()),
3766                    gid: Some(333),
3767                    rdev: Some(444),
3768                    ..Default::default()
3769                }),
3770                Some(CTIME),
3771            )
3772            .await
3773            .expect("update_timestamps failed");
3774        transaction.commit().await.expect("commit failed");
3775
3776        let properties = object.get_properties().await.expect("get_properties failed");
3777        assert_matches!(
3778            properties,
3779            ObjectProperties {
3780                refs: 1u64,
3781                allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
3782                data_attribute_size: TEST_OBJECT_SIZE,
3783                creation_time: CRTIME,
3784                modification_time: MTIME_NEW,
3785                posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
3786                change_time: CTIME,
3787                ..
3788            }
3789        );
3790        fs.close().await.expect("Close failed");
3791    }
3792
3793    #[fuchsia::test]
3794    async fn test_is_allocated() {
3795        let (fs, object) = test_filesystem_and_object().await;
3796
3797        // `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
3798        // `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
3799        let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
3800        let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
3801
3802        // Check for the case where where we have the following extent layout
3803        //       [ unallocated ][ `TEST_DATA` ]
3804        // The extents before `aligned_offset` should not be allocated
3805        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3806        assert_eq!(count, aligned_offset);
3807        assert_eq!(allocated, false);
3808
3809        let (allocated, count) =
3810            object.is_allocated(aligned_offset).await.expect("is_allocated failed");
3811        assert_eq!(count, aligned_length);
3812        assert_eq!(allocated, true);
3813
3814        // Check for the case where where we query out of range
3815        let end = aligned_offset + aligned_length;
3816        object
3817            .is_allocated(end)
3818            .await
3819            .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
3820
3821        // Check for the case where where we start querying for allocation starting from
3822        // an allocated range to the end of the device
3823        let size = 50 * fs.block_size() as u64;
3824        object.truncate(size).await.expect("extend failed");
3825
3826        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3827        assert_eq!(count, size - end);
3828        assert_eq!(allocated, false);
3829
3830        // Check for the case where where we have the following extent layout
3831        //      [ unallocated ][ `buf` ][ `buf` ]
3832        let buf_length = 5 * fs.block_size();
3833        let mut buf = object.allocate_buffer(buf_length as usize).await;
3834        buf.as_mut_slice().fill(123);
3835        let new_offset = end + 20 * fs.block_size() as u64;
3836        object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
3837        object
3838            .write_or_append(Some(new_offset + buf_length), buf.as_ref())
3839            .await
3840            .expect("write failed");
3841
3842        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3843        assert_eq!(count, new_offset - end);
3844        assert_eq!(allocated, false);
3845
3846        let (allocated, count) =
3847            object.is_allocated(new_offset).await.expect("is_allocated failed");
3848        assert_eq!(count, 2 * buf_length);
3849        assert_eq!(allocated, true);
3850
3851        // Check the case where we query from the middle of an extent
3852        let (allocated, count) = object
3853            .is_allocated(new_offset + 4 * fs.block_size())
3854            .await
3855            .expect("is_allocated failed");
3856        assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
3857        assert_eq!(allocated, true);
3858
3859        // Now, write buffer to a location already written to.
3860        // Check for the case when we the following extent layout
3861        //      [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
3862        let other_buf_length = 3 * fs.block_size();
3863        let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
3864        other_buf.as_mut_slice().fill(231);
3865        object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
3866
3867        // We still expect that `is_allocated(..)` will return that  there are 2*`buf_length bytes`
3868        // allocated from `new_offset`
3869        let (allocated, count) =
3870            object.is_allocated(new_offset).await.expect("is_allocated failed");
3871        assert_eq!(count, 2 * buf_length);
3872        assert_eq!(allocated, true);
3873
3874        // Check for the case when we the following extent layout
3875        //   [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
3876        // Mark TEST_DATA as deleted
3877        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3878        object
3879            .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
3880            .await
3881            .expect("zero failed");
3882        // Mark `other_buf` as deleted
3883        object
3884            .zero(&mut transaction, new_offset..new_offset + buf_length)
3885            .await
3886            .expect("zero failed");
3887        transaction.commit().await.expect("commit transaction failed");
3888
3889        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3890        assert_eq!(count, new_offset + buf_length);
3891        assert_eq!(allocated, false);
3892
3893        let (allocated, count) =
3894            object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
3895        assert_eq!(count, buf_length);
3896        assert_eq!(allocated, true);
3897
3898        let new_end = new_offset + buf_length + count;
3899
3900        // Check for the case where there are objects with different keys.
3901        // Case that we're checking for:
3902        //      [ unallocated ][ extent (object with different key) ][ unallocated ]
3903        let store = object.owner();
3904        let mut transaction = fs
3905            .clone()
3906            .new_transaction(lock_keys![], Options::default())
3907            .await
3908            .expect("new_transaction failed");
3909        let object2 =
3910            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3911                .await
3912                .expect("create_object failed");
3913        transaction.commit().await.expect("commit failed");
3914
3915        object2
3916            .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
3917            .await
3918            .expect("write failed");
3919
3920        // Expecting that the extent with a different key is treated like unallocated extent
3921        let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
3922        assert_eq!(count, size - new_end);
3923        assert_eq!(allocated, false);
3924
3925        fs.close().await.expect("close failed");
3926    }
3927
3928    #[fuchsia::test(threads = 10)]
3929    async fn test_read_write_attr() {
3930        let (_fs, object) = test_filesystem_and_object().await;
3931        let data = [0xffu8; 16_384];
3932        object.write_attr(20, &data).await.expect("write_attr failed");
3933        let rdata =
3934            object.read_attr(20).await.expect("read_attr failed").expect("no attribute data found");
3935        assert_eq!(&data[..], &rdata[..]);
3936
3937        assert_eq!(object.read_attr(21).await.expect("read_attr failed"), None);
3938    }
3939
3940    #[fuchsia::test(threads = 10)]
3941    async fn test_allocate_basic() {
3942        let (fs, object) = test_filesystem_and_empty_object().await;
3943        let block_size = fs.block_size();
3944        let file_size = block_size * 10;
3945        object.truncate(file_size).await.unwrap();
3946
3947        let small_buf_size = 1024;
3948        let large_buf_aligned_size = block_size as usize * 2;
3949        let large_buf_size = block_size as usize * 2 + 1024;
3950
3951        let mut small_buf = object.allocate_buffer(small_buf_size).await;
3952        let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
3953        let mut large_buf = object.allocate_buffer(large_buf_size).await;
3954
3955        assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
3956        assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
3957        assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
3958        assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
3959        assert_eq!(
3960            object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
3961            large_buf_aligned_size
3962        );
3963        assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
3964
3965        // Allocation succeeds, and without any writes to the location it shows up as zero.
3966        object.allocate(block_size..block_size * 3).await.unwrap();
3967
3968        // Test starting before, inside, and after the allocated section with every sized buffer.
3969        for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
3970            for offset in 0..4 {
3971                assert_eq!(
3972                    object.read(block_size * offset, buf.as_mut()).await.unwrap(),
3973                    buf.len(),
3974                    "buf_index: {}, read offset: {}",
3975                    buf_index,
3976                    offset,
3977                );
3978                assert_eq!(
3979                    buf.as_slice(),
3980                    &vec![0; buf.len()],
3981                    "buf_index: {}, read offset: {}",
3982                    buf_index,
3983                    offset,
3984                );
3985            }
3986        }
3987
3988        fs.close().await.expect("close failed");
3989    }
3990
3991    #[fuchsia::test(threads = 10)]
3992    async fn test_allocate_extends_file() {
3993        const BUF_SIZE: usize = 1024;
3994        let (fs, object) = test_filesystem_and_empty_object().await;
3995        let mut buf = object.allocate_buffer(BUF_SIZE).await;
3996        let block_size = fs.block_size();
3997
3998        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3999        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4000
4001        assert!(TEST_OBJECT_SIZE < block_size * 4);
4002        // Allocation succeeds, and without any writes to the location it shows up as zero.
4003        object.allocate(0..block_size * 4).await.unwrap();
4004        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4005        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4006        assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
4007        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4008        assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
4009        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4010
4011        fs.close().await.expect("close failed");
4012    }
4013
4014    #[fuchsia::test(threads = 10)]
4015    async fn test_allocate_past_end() {
4016        const BUF_SIZE: usize = 1024;
4017        let (fs, object) = test_filesystem_and_empty_object().await;
4018        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4019        let block_size = fs.block_size();
4020
4021        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4022        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4023
4024        assert!(TEST_OBJECT_SIZE < block_size * 4);
4025        // Allocation succeeds, and without any writes to the location it shows up as zero.
4026        object.allocate(block_size * 4..block_size * 6).await.unwrap();
4027        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4028        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4029        assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
4030        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4031        assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
4032        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4033
4034        fs.close().await.expect("close failed");
4035    }
4036
4037    #[fuchsia::test(threads = 10)]
4038    async fn test_allocate_read_attr() {
4039        let (fs, object) = test_filesystem_and_empty_object().await;
4040        let block_size = fs.block_size();
4041        let file_size = block_size * 4;
4042        object.truncate(file_size).await.unwrap();
4043
4044        let content = object
4045            .read_attr(object.attribute_id())
4046            .await
4047            .expect("failed to read attr")
4048            .expect("attr returned none");
4049        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4050
4051        object.allocate(block_size..block_size * 3).await.unwrap();
4052
4053        let content = object
4054            .read_attr(object.attribute_id())
4055            .await
4056            .expect("failed to read attr")
4057            .expect("attr returned none");
4058        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4059
4060        fs.close().await.expect("close failed");
4061    }
4062
4063    #[fuchsia::test(threads = 10)]
4064    async fn test_allocate_existing_data() {
4065        struct Case {
4066            written_ranges: Vec<Range<usize>>,
4067            allocate_range: Range<u64>,
4068        }
4069        let cases = [
4070            Case { written_ranges: vec![4..7], allocate_range: 4..7 },
4071            Case { written_ranges: vec![4..7], allocate_range: 3..8 },
4072            Case { written_ranges: vec![4..7], allocate_range: 5..6 },
4073            Case { written_ranges: vec![4..7], allocate_range: 5..8 },
4074            Case { written_ranges: vec![4..7], allocate_range: 3..5 },
4075            Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
4076            Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
4077        ];
4078
4079        for case in cases {
4080            let (fs, object) = test_filesystem_and_empty_object().await;
4081            let block_size = fs.block_size();
4082            let file_size = block_size * 10;
4083            object.truncate(file_size).await.unwrap();
4084
4085            for write in &case.written_ranges {
4086                let write_len = (write.end - write.start) * block_size as usize;
4087                let mut write_buf = object.allocate_buffer(write_len).await;
4088                write_buf.as_mut_slice().fill(0xff);
4089                assert_eq!(
4090                    object
4091                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
4092                        .await
4093                        .unwrap(),
4094                    file_size
4095                );
4096            }
4097
4098            let mut expected_buf = object.allocate_buffer(file_size as usize).await;
4099            assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());
4100
4101            object
4102                .allocate(
4103                    case.allocate_range.start * block_size..case.allocate_range.end * block_size,
4104                )
4105                .await
4106                .unwrap();
4107
4108            let mut read_buf = object.allocate_buffer(file_size as usize).await;
4109            assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
4110            assert_eq!(read_buf.as_slice(), expected_buf.as_slice());
4111
4112            fs.close().await.expect("close failed");
4113        }
4114    }
4115
    /// Walks the extent records of attribute 0 of `obj` and returns the `ExtentMode` of each
    /// record overlapping `search_range`, in ascending order, with each returned range clipped
    /// to `search_range`.
    ///
    /// Gaps between extent records are skipped rather than reported.  Panics if the LSM tree
    /// iterator yields anything other than a matching `ExtentValue::Some` record (including
    /// running off the end of the attribute) before `search_range` has been covered.
    async fn get_modes(
        obj: &DataObjectHandle<ObjectStore>,
        mut search_range: Range<u64>,
    ) -> Vec<(Range<u64>, ExtentMode)> {
        let mut modes = Vec::new();
        let store = obj.store();
        let tree = store.tree();
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Start iteration at the extent containing (or following) search_range.start.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                obj.object_id(),
                0,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(search_range.start)),
            )))
            .await
            .unwrap();
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                    ..
                }) if *object_id == obj.object_id() && *attribute_id == 0 => {
                    // The record starts beyond the end of the search range; we're done.
                    if search_range.end <= range.start {
                        break;
                    }
                    // Clip the record's range to the portion we were asked about, then advance
                    // the search cursor past it.
                    let found_range = std::cmp::max(search_range.start, range.start)
                        ..std::cmp::min(search_range.end, range.end);
                    search_range.start = found_range.end;
                    modes.push((found_range, mode.clone()));
                    if search_range.start == search_range.end {
                        break;
                    }
                    iter.advance().await.unwrap();
                }
                x => panic!("looking for extent record, found this {:?}", x),
            }
        }
        modes
    }
4165
4166    async fn assert_all_overwrite(
4167        obj: &DataObjectHandle<ObjectStore>,
4168        mut search_range: Range<u64>,
4169    ) {
4170        let modes = get_modes(obj, search_range.clone()).await;
4171        for mode in modes {
4172            assert_eq!(
4173                mode.0.start, search_range.start,
4174                "missing mode in range {}..{}",
4175                search_range.start, mode.0.start
4176            );
4177            match mode.1 {
4178                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
4179                m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
4180            }
4181            assert!(
4182                mode.0.end <= search_range.end,
4183                "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
4184                search_range,
4185                mode,
4186            );
4187            search_range.start = mode.0.end;
4188        }
4189        assert_eq!(
4190            search_range.start, search_range.end,
4191            "missing mode in range {:?}",
4192            search_range
4193        );
4194    }
4195
    #[fuchsia::test(threads = 10)]
    async fn test_multi_overwrite() {
        // Table-driven test for multi_overwrite. All ranges in a Case are in units
        // of blocks; every case runs against a fresh 10-block file.
        #[derive(Debug)]
        struct Case {
            // Ordinary writes (via write_or_append) performed before allocating.
            pre_writes: Vec<Range<usize>>,
            // Ranges turned into overwrite extents via allocate(), applied in order;
            // later entries may overlap earlier ones.
            allocate_ranges: Vec<Range<u64>>,
            // Each inner Vec is one multi_overwrite call performed in a single
            // transaction, targeting previously allocated regions.
            overwrites: Vec<Vec<Range<u64>>>,
        }
        let cases = [
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..2, 5..6, 7..8],
                overwrites: vec![vec![5..6]],
            },
            // Repeatedly overwriting the same range in separate transactions.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            // Multiple ranges in a single multi_overwrite call.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..4, 4..5]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5, 5..10],
                overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4, 6..10],
                overwrites: vec![vec![2..3, 7..9]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
            },
            // Cases where allocation interacts with pre-existing CoW extents.
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![4..6],
                overwrites: vec![vec![5..6]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![2..4],
                overwrites: vec![vec![2..4]],
            },
            Case {
                pre_writes: vec![3..5],
                allocate_ranges: vec![1..3, 6..7],
                overwrites: vec![vec![1..3, 6..7]],
            },
            Case {
                pre_writes: vec![1..3, 5..7, 8..9],
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..5], vec![0..5]],
            },
            // The remaining cases exercise allocate() calls that overlap previously
            // allocated ranges; they only check the resulting extent modes (via
            // assert_all_overwrite below) and perform no writes.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10, 4..6],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![3..8, 5..10],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![5..10, 3..8],
                overwrites: Vec::new(),
            },
        ];

        for (i, case) in cases.into_iter().enumerate() {
            log::info!("running case {} - {:?}", i, case);
            let (fs, object) = test_filesystem_and_empty_object().await;
            let block_size = fs.block_size();
            let file_size = block_size * 10;
            object.truncate(file_size).await.unwrap();

            // Seed the file with ordinary copy-on-write extents full of 0xff.
            for write in case.pre_writes {
                let write_len = (write.end - write.start) * block_size as usize;
                let mut write_buf = object.allocate_buffer(write_len).await;
                write_buf.as_mut_slice().fill(0xff);
                // These writes stay within the truncated size, so the returned
                // object size should remain file_size.
                assert_eq!(
                    object
                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
                        .await
                        .unwrap(),
                    file_size
                );
            }

            for allocate_range in &case.allocate_ranges {
                object
                    .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
                    .await
                    .unwrap();
            }

            // Every allocated range must now be covered by overwrite-mode extents,
            // regardless of pre-existing extents or overlapping allocations.
            for allocate_range in case.allocate_ranges {
                assert_all_overwrite(
                    &object,
                    allocate_range.start * block_size..allocate_range.end * block_size,
                )
                .await;
            }

            for overwrite in case.overwrites {
                // Convert block ranges to byte ranges, tallying the total number of
                // bytes this multi_overwrite call will write.
                let mut write_len = 0;
                let overwrite = overwrite
                    .into_iter()
                    .map(|r| {
                        write_len += (r.end - r.start) * block_size;
                        r.start * block_size..r.end * block_size
                    })
                    .collect::<Vec<_>>();
                let mut write_buf = object.allocate_buffer(write_len as usize).await;
                let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
                write_buf.as_mut_slice().copy_from_slice(&data);

                // Build the expected post-write file image: the current contents
                // with `data` patched into each target range, consumed in order.
                let mut expected_buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, expected_buf.as_mut()).await.unwrap(),
                    expected_buf.len()
                );
                let expected_buf_slice = expected_buf.as_mut_slice();
                let mut data_slice = data.as_slice();
                for r in &overwrite {
                    let len = r.length().unwrap() as usize;
                    let (copy_from, rest) = data_slice.split_at(len);
                    expected_buf_slice[r.start as usize..r.end as usize]
                        .copy_from_slice(&copy_from);
                    data_slice = rest;
                }

                let mut transaction = object.new_transaction().await.unwrap();
                object
                    .multi_overwrite(&mut transaction, 0, &overwrite, write_buf.as_mut())
                    .await
                    .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
                // Double check the emitted checksums. We should have one u64 checksum for every
                // block we wrote to disk.
                let mut checksummed_range_length = 0;
                let mut num_checksums = 0;
                for (device_range, checksums, _) in transaction.checksums() {
                    let range_len = device_range.end - device_range.start;
                    let checksums_len = checksums.len() as u64;
                    // Each device range should carry exactly one checksum per block.
                    assert_eq!(range_len / checksums_len, block_size);
                    checksummed_range_length += range_len;
                    num_checksums += checksums_len;
                }
                // The checksummed ranges must cover exactly what we wrote.
                assert_eq!(checksummed_range_length, write_len);
                assert_eq!(num_checksums, write_len / block_size);
                transaction.commit().await.unwrap();

                // Read the whole file back and compare against the expected image.
                let mut buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, buf.as_mut()).await.unwrap(),
                    buf.len(),
                    "failed length check on case {}",
                    i,
                );
                assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
            }

            fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
            fs.close().await.expect("close failed");
        }
    }
4432
4433    #[fuchsia::test(threads = 10)]
4434    async fn test_multi_overwrite_mode_updates() {
4435        let (fs, object) = test_filesystem_and_empty_object().await;
4436        let block_size = fs.block_size();
4437        let file_size = block_size * 10;
4438        object.truncate(file_size).await.unwrap();
4439
4440        let mut expected_bitmap = BitVec::from_elem(10, false);
4441
4442        object.allocate(0..10 * block_size).await.unwrap();
4443        assert_eq!(
4444            get_modes(&object, 0..10 * block_size).await,
4445            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4446        );
4447
4448        let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
4449        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4450        write_buf.as_mut_slice().copy_from_slice(&data);
4451        let mut transaction = object.new_transaction().await.unwrap();
4452        object
4453            .multi_overwrite(
4454                &mut transaction,
4455                0,
4456                &[2 * block_size..4 * block_size],
4457                write_buf.as_mut(),
4458            )
4459            .await
4460            .unwrap();
4461        transaction.commit().await.unwrap();
4462
4463        expected_bitmap.set(2, true);
4464        expected_bitmap.set(3, true);
4465        assert_eq!(
4466            get_modes(&object, 0..10 * block_size).await,
4467            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4468        );
4469
4470        let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
4471        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4472        write_buf.as_mut_slice().copy_from_slice(&data);
4473        let mut transaction = object.new_transaction().await.unwrap();
4474        object
4475            .multi_overwrite(
4476                &mut transaction,
4477                0,
4478                &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
4479                write_buf.as_mut(),
4480            )
4481            .await
4482            .unwrap();
4483        transaction.commit().await.unwrap();
4484
4485        expected_bitmap.set(4, true);
4486        expected_bitmap.set(6, true);
4487        assert_eq!(
4488            get_modes(&object, 0..10 * block_size).await,
4489            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4490        );
4491
4492        let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
4493        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4494        write_buf.as_mut_slice().copy_from_slice(&data);
4495        let mut transaction = object.new_transaction().await.unwrap();
4496        object
4497            .multi_overwrite(
4498                &mut transaction,
4499                0,
4500                &[
4501                    0..2 * block_size,
4502                    5 * block_size..6 * block_size,
4503                    7 * block_size..10 * block_size,
4504                ],
4505                write_buf.as_mut(),
4506            )
4507            .await
4508            .unwrap();
4509        transaction.commit().await.unwrap();
4510
4511        assert_eq!(
4512            get_modes(&object, 0..10 * block_size).await,
4513            vec![(0..10 * block_size, ExtentMode::Overwrite)]
4514        );
4515
4516        fs.close().await.expect("close failed");
4517    }
4518}