fxfs/object_store/
data_object_handle.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::object_handle::{
10    ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20    self, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation, Options,
21    Transaction, lock_keys,
22};
23use crate::object_store::{
24    DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, HandleOwner,
25    RootDigest, StoreObjectHandle, TRANSACTION_MUTATION_THRESHOLD, TrimMode, TrimResult,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{Context, Error, anyhow, bail, ensure};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{
33    FsVerityDescriptor, FsVerityDescriptorRaw, FsVerityHasher, FsVerityHasherOptions, MerkleTree,
34    MerkleTreeBuilder,
35};
36use fuchsia_sync::Mutex;
37use futures::TryStreamExt;
38use futures::stream::FuturesUnordered;
39use fxfs_trace::trace;
40use std::cmp::min;
41use std::ops::{Deref, DerefMut, Range};
42use std::sync::Arc;
43use std::sync::atomic::{self, AtomicU64, Ordering};
44use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
45
46mod allocated_ranges;
47pub use allocated_ranges::{AllocatedRanges, RangeType};
48
/// How much data each transaction will cover when writing an attribute across batches (512 KiB).
/// Pulled from `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 524_288;
52
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
pub struct DataObjectHandle<S: HandleOwner> {
    // The underlying untyped handle; `Deref` exposes its methods on this type directly.
    handle: StoreObjectHandle<S>,
    // The id of the attribute this handle reads and writes.
    attribute_id: u64,
    // Cached content size of the data attribute.
    content_size: AtomicU64,
    // Tracks the fsverity state machine: None -> Started -> Pending -> Some.
    fsverity_state: Mutex<FsverityState>,
    // In-memory tracking of ranges allocated in overwrite mode.
    overwrite_ranges: AllocatedRanges,
}
67
/// Represents the mapping of a file's contents to the physical storage backing it.
#[derive(Debug, Clone)]
pub struct FileExtent {
    // Byte offset in the file where this extent starts.
    logical_offset: u64,
    // Byte range on the device backing this extent; validated in `Self::new`.
    device_range: Range<u64>,
}
74
75impl FileExtent {
76    pub fn new(logical_offset: u64, device_range: Range<u64>) -> Result<Self, Error> {
77        // Ensure `device_range` is valid.
78        let length = device_range.length()?;
79        // Ensure no overflow when we calculate the end of the logical range.
80        let _ = logical_offset.checked_add(length).ok_or(FxfsError::OutOfRange)?;
81        Ok(Self { logical_offset, device_range })
82    }
83}
84
impl FileExtent {
    /// Returns the length, in bytes, of this extent.
    pub fn length(&self) -> u64 {
        // SAFETY: We verified that the device_range's length is valid in Self::new.
        unsafe { self.device_range.unchecked_length() }
    }

    /// Returns the byte offset in the file where this extent starts.
    pub fn logical_offset(&self) -> u64 {
        self.logical_offset
    }

    /// Returns the byte range in the file covered by this extent.
    pub fn logical_range(&self) -> Range<u64> {
        // SAFETY: We verified logical_offset plus device_range length won't overflow in Self::new.
        unsafe { self.logical_offset..self.logical_offset.unchecked_add(self.length()) }
    }

    /// Returns the byte range on the device backing this extent.
    pub fn device_range(&self) -> &Range<u64> {
        &self.device_range
    }
}
104
/// State machine for enabling fsverity on a file.  Transitions only move forward:
/// `None -> Started -> Pending -> Some` (see `set_fsverity_state_started`,
/// `set_fsverity_state_pending` and `finalize_fsverity_state`).
#[derive(Debug)]
pub enum FsverityState {
    /// Not a verity file, and no `enable_verity` in progress.
    None,
    /// `enable_verity` has begun but the merkle metadata has not been staged yet.
    Started,
    /// The merkle metadata has been computed and staged, but not yet finalized.
    Pending(FsverityStateInner),
    /// The file is fsverity-enabled; reads are verified against this metadata.
    Some(FsverityStateInner),
}
112
/// The merkle metadata needed to verify reads of an fsverity-enabled file.
#[derive(Debug)]
pub struct FsverityStateInner {
    // Root digest of the merkle tree; its variant selects the hash algorithm.
    root_digest: RootDigest,
    // Salt passed to the hasher when computing block hashes.
    salt: Vec<u8>,
    // TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
    // Potentially store a pager-backed vmo instead of passing around a boxed array.
    // Concatenated leaf-node digests, each `hash_size()` bytes long.
    merkle_tree: Box<[u8]>,
}
121
#[derive(Debug, Default)]
pub struct OverwriteOptions {
    /// If false, then all the extents for the overwrite range must have been preallocated using
    /// preallocate_range or from existing writes.
    pub allow_allocations: bool,
    /// NOTE(review): presumably requests a storage barrier before the first overwrite write —
    /// the write path consuming this flag is not visible in this chunk; confirm.
    pub barrier_on_first_write: bool,
}
129
130impl FsverityStateInner {
131    pub fn new(root_digest: RootDigest, salt: Vec<u8>, merkle_tree: Box<[u8]>) -> Self {
132        FsverityStateInner { root_digest, salt, merkle_tree }
133    }
134
135    fn get_hasher_for_block_size(&self, block_size: usize) -> FsVerityHasher {
136        match self.root_digest {
137            RootDigest::Sha256(_) => {
138                FsVerityHasher::Sha256(FsVerityHasherOptions::new(self.salt.clone(), block_size))
139            }
140            RootDigest::Sha512(_) => {
141                FsVerityHasher::Sha512(FsVerityHasherOptions::new(self.salt.clone(), block_size))
142            }
143        }
144    }
145
146    fn from_bytes(data: &[u8], block_size: usize) -> Result<(Self, FsVerityHasher), Error> {
147        let descriptor = FsVerityDescriptor::from_bytes(&data, block_size)
148            .map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
149
150        let root_digest = match descriptor.digest_algorithm() {
151            fio::HashAlgorithm::Sha256 => {
152                RootDigest::Sha256(descriptor.root_digest().try_into().unwrap())
153            }
154            fio::HashAlgorithm::Sha512 => RootDigest::Sha512(descriptor.root_digest().to_vec()),
155            _ => return Err(anyhow!(FxfsError::NotSupported).context("Unsupported hash algorithm")),
156        };
157        let hasher = descriptor.hasher();
158        let leaves =
159            descriptor.leaf_digests().map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
160
161        Ok((Self::new(root_digest, descriptor.salt().to_vec(), leaves.into()), hasher))
162    }
163}
164
/// Dereferencing a `DataObjectHandle` yields the underlying untyped `StoreObjectHandle`, so all
/// of its operations are available directly on this type.
impl<S: HandleOwner> Deref for DataObjectHandle<S> {
    type Target = StoreObjectHandle<S>;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
171
172impl<S: HandleOwner> DataObjectHandle<S> {
    /// Constructs a `DataObjectHandle` wrapping a fresh `StoreObjectHandle`.
    ///
    /// `size` seeds the cached content size for `attribute_id`, and `overwrite_ranges` seeds the
    /// in-memory tracking of ranges already allocated in overwrite mode.  `fsverity_state` is
    /// presumably the state discovered when the object was opened — TODO confirm at call sites.
    pub fn new(
        owner: Arc<S>,
        object_id: u64,
        permanent_keys: bool,
        attribute_id: u64,
        size: u64,
        fsverity_state: FsverityState,
        options: HandleOptions,
        trace: bool,
        overwrite_ranges: &[Range<u64>],
    ) -> Self {
        Self {
            handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
            attribute_id,
            content_size: AtomicU64::new(size),
            fsverity_state: Mutex::new(fsverity_state),
            overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
        }
    }
192
    /// Returns the id of the attribute this handle reads and writes.
    pub fn attribute_id(&self) -> u64 {
        self.attribute_id
    }
196
    /// Returns the in-memory tracking of ranges allocated in overwrite mode.
    pub fn overwrite_ranges(&self) -> &AllocatedRanges {
        &self.overwrite_ranges
    }
200
    /// Returns true if fsverity has been fully enabled for this file (state is `Some`).
    pub fn is_verified_file(&self) -> bool {
        matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
    }
204
    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
    /// If another caller has already started but not completed `enable_verity` (state is `Started`
    /// or `Pending`), returns FxfsError::Unavailable. If another caller has already completed
    /// `enable_verity` (state is `Some`), returns FxfsError::AlreadyExists.
    pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
        let mut fsverity_guard = self.fsverity_state.lock();
        match *fsverity_guard {
            FsverityState::None => {
                *fsverity_guard = FsverityState::Started;
                Ok(())
            }
            FsverityState::Started | FsverityState::Pending(_) => {
                Err(anyhow!(FxfsError::Unavailable))
            }
            FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
        }
    }
222
    /// Sets `self.fsverity_state` to Pending. Must be called before `finalize_fsverity_state()`.
    /// Asserts that the prior state of `self.fsverity_state` was `FsverityState::Started`.
    pub fn set_fsverity_state_pending(&self, descriptor: FsverityStateInner) {
        let mut fsverity_guard = self.fsverity_state.lock();
        // Only the `Started -> Pending` transition is legal; anything else is a logic bug.
        assert!(matches!(*fsverity_guard, FsverityState::Started));
        *fsverity_guard = FsverityState::Pending(descriptor);
    }
230
231    /// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
232    /// not `FsverityState::Pending(_)`.
233    pub fn finalize_fsverity_state(&self) {
234        let mut fsverity_state_guard = self.fsverity_state.lock();
235        let mut_fsverity_state = fsverity_state_guard.deref_mut();
236        let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
237        match fsverity_state {
238            FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
239            FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
240            FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
241            FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
242        }
243        // Once we finalize the fsverity state, the file is permanently read-only. The in-memory
244        // overwrite ranges tracking is only used for writing, so we don't need them anymore. This
245        // leaves any uninitialized, but allocated, overwrite regions if there are any, rather than
246        // converting them back to sparse regions.
247        self.overwrite_ranges.clear();
248    }
249
    /// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
    /// Used to set `self.fsverity_state` on open of a verified file. The merkle tree data is
    /// verified against the root digest here, and will return an error if the tree is not correct.
    pub async fn set_fsverity_state_some(&self, descriptor: FsverityMetadata) -> Result<(), Error> {
        let (metadata, hasher) = match descriptor {
            FsverityMetadata::Internal(root_digest, salt) => {
                // The leaf digests live in their own attribute; read the whole thing.
                let merkle_tree = self
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await?
                    .ok_or_else(|| anyhow!(FxfsError::Inconsistent))?;
                let metadata = FsverityStateInner { root_digest, salt, merkle_tree };
                let hasher = metadata.get_hasher_for_block_size(self.block_size() as usize);
                (metadata, hasher)
            }
            FsverityMetadata::F2fs(verity_range) => {
                // The metadata is a serialized descriptor stored within the merkle attribute;
                // read it out (rounding the buffer up to a whole block) and parse it.
                let expected_length = verity_range.length()? as usize;
                let mut buffer = self
                    .allocate_buffer(expected_length.next_multiple_of(self.block_size() as usize))
                    .await;
                ensure!(
                    expected_length
                        == self
                            .handle
                            .read(FSVERITY_MERKLE_ATTRIBUTE_ID, verity_range.start, buffer.as_mut())
                            .await?,
                    FxfsError::Inconsistent
                );
                FsverityStateInner::from_bytes(
                    buffer.as_slice()[0..expected_length].into(),
                    self.block_size() as usize,
                )?
            }
        };
        // Validate the merkle tree data against the root before applying it.
        ensure!(metadata.merkle_tree.len() % hasher.hash_size() == 0, FxfsError::Inconsistent);
        let leaf_chunks = metadata.merkle_tree.chunks_exact(hasher.hash_size());
        let mut builder = MerkleTreeBuilder::new(hasher);
        for leaf in leaf_chunks {
            builder.push_data_hash(leaf.to_vec());
        }
        let tree = builder.finish();
        let root_hash = match &metadata.root_digest {
            RootDigest::Sha256(root_hash) => root_hash.as_slice(),
            RootDigest::Sha512(root_hash) => root_hash.as_slice(),
        };

        // A mismatch here means the stored leaf data doesn't correspond to the stored root.
        ensure!(root_hash == tree.root(), FxfsError::IntegrityError);

        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::None));
        *fsverity_guard = FsverityState::Some(metadata);

        Ok(())
    }
304
    /// Verifies contents of `buffer` against the corresponding hashes in the stored merkle tree.
    /// `offset` is the logical offset in the file that `buffer` starts at. `offset` must be
    /// block-aligned. Fails on non fsverity-enabled files.
    ///
    /// NOTE(review): `leaf_nodes[offset / block_size]` is an unchecked index, so verifying a range
    /// past the region covered by the merkle leaves would panic rather than error — presumably
    /// callers keep reads within the file size; confirm.
    fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
        let block_size = self.block_size() as usize;
        assert!(offset % block_size == 0);
        let fsverity_state = self.fsverity_state.lock();
        match &*fsverity_state {
            FsverityState::None => {
                Err(anyhow!("Tried to verify read on a non verity-enabled file"))
            }
            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
                "Enable verity has not yet completed, fsverity state: {:?}",
                &*fsverity_state
            )),
            FsverityState::Some(metadata) => {
                let hasher = metadata.get_hasher_for_block_size(block_size);
                let leaf_nodes: Vec<&[u8]> =
                    metadata.merkle_tree.chunks(hasher.hash_size()).collect();
                fxfs_trace::duration!(c"fsverity-verify", "len" => buffer.len());
                // TODO(b/318880297): Consider parallelizing computation.
                // Hash each block of the buffer and compare against the stored leaf digest.
                for b in buffer.chunks(block_size) {
                    ensure!(
                        hasher.hash_block(b) == leaf_nodes[offset / block_size],
                        anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
                    );
                    offset += block_size;
                }
                Ok(())
            }
        }
    }
337
    /// Extend the file with the given extent.  The only use case for this right now is for files
    /// that must exist at certain offsets on the device, such as super-blocks.
    pub async fn extend<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        // The new extent is appended after the current (block-aligned) end of the file.
        let old_end =
            round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
        let new_size = old_end + device_range.end - device_range.start;
        // The caller chose the device range, so mark it allocated rather than letting the
        // allocator pick a location.
        self.store().allocator().mark_allocated(
            transaction,
            self.store().store_object_id(),
            device_range.clone(),
        )?;
        self.txn_update_size(transaction, new_size, None).await?;
        let key_id = self.get_key(None).await?.0;
        // Record the extent mapping old_end..new_size -> device_range.
        transaction.add(
            self.store().store_object_id,
            Mutation::merge_object(
                ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
                ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
            ),
        );
        self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
    }
364
    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    // the data from `buf`.  Thin wrapper that delegates to the untyped handle using this handle's
    // attribute id.
    async fn align_buffer(
        &self,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        self.handle.align_buffer(self.attribute_id(), offset, buf).await
    }
374
    // Writes potentially unaligned data at `device_offset` and returns checksums if requested. The
    // data will be encrypted if necessary.
    // `buf` is mutable as an optimization, since the write may require encryption, we can encrypt
    // the buffer in-place rather than copying to another buffer if the write is already aligned.
    // NOTE(review): the `None` argument's meaning isn't visible here — presumably an optional
    // key id; confirm against StoreObjectHandle::write_at.
    async fn write_at(
        &self,
        offset: u64,
        buf: MutableBufferRef<'_>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
    }
387
    /// Zeroes the given range of this handle's attribute.  The range must be aligned.
    /// (Despite the wrapper's history, no deallocation count is returned — the result is `()`;
    /// deallocation is handled by `StoreObjectHandle::zero`.)
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        range: Range<u64>,
    ) -> Result<(), Error> {
        self.handle.zero(transaction, self.attribute_id(), range).await
    }
396
    /// The cached value for `self.fsverity_state` is set either in `open_object` or on
    /// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
    /// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
    /// Errors if enable_verity is in progress (state `Started` or `Pending`).
    pub fn get_descriptor(&self) -> Result<Option<(fio::VerificationOptions, Vec<u8>)>, Error> {
        let fsverity_state = self.fsverity_state.lock();
        match &*fsverity_state {
            FsverityState::None => Ok(None),
            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
                "Enable verity has not yet completed, fsverity state: {:?}",
                &*fsverity_state
            )),
            FsverityState::Some(metadata) => {
                // The options mirror the stored algorithm and salt; the root hash is returned
                // separately as raw bytes.
                let (options, root_hash) = match &metadata.root_digest {
                    RootDigest::Sha256(root_hash) => (
                        fio::VerificationOptions {
                            hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                            salt: Some(metadata.salt.clone()),
                            ..Default::default()
                        },
                        root_hash.to_vec(),
                    ),
                    RootDigest::Sha512(root_hash) => (
                        fio::VerificationOptions {
                            hash_algorithm: Some(fio::HashAlgorithm::Sha512),
                            salt: Some(metadata.salt.clone()),
                            ..Default::default()
                        },
                        root_hash.clone(),
                    ),
                };
                Ok(Some((options, root_hash)))
            }
        }
    }
431
    /// Reads the entire data attribute and builds a merkle tree over its contents with `hasher`.
    /// The tree is then serialized: layers containing more than one digest are emitted from the
    /// top down, each padded to a block boundary, followed by one zeroed block whose start holds
    /// the fsverity descriptor (which embeds `hash_alg`, `salt` and the root digest).  Returns
    /// the in-memory tree along with the serialized bytes.
    async fn build_verity_tree(
        &self,
        hasher: FsVerityHasher,
        hash_alg: fio::HashAlgorithm,
        salt: &[u8],
    ) -> Result<(MerkleTree, Vec<u8>), Error> {
        let hash_len = hasher.hash_size();
        let mut builder = MerkleTreeBuilder::new(hasher);
        let mut offset = 0;
        let size = self.get_size();
        // TODO(b/314836822): Consider further tuning the buffer size to optimize
        // performance. Experimentally, most verity-enabled files are <256K.
        let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
        while offset < size {
            // TODO(b/314842875): Consider optimizations for sparse files.
            // NOTE(review): if `read` ever returned 0 before `offset` reached `size` this loop
            // would not terminate — presumably reads below the content size always make
            // progress; confirm.
            let read = self.read(offset, buf.as_mut()).await? as u64;
            assert!(offset + read <= size);
            builder.write(&buf.as_slice()[0..read as usize]);
            offset += read;
        }
        let tree = builder.finish();
        // This will include a block for the root layer, which will be used to house the descriptor.
        let tree_data_len = tree
            .as_ref()
            .iter()
            .map(|layer| (layer.len() * hash_len).next_multiple_of(self.block_size() as usize))
            .sum();
        let mut merkle_tree_data = Vec::<u8>::with_capacity(tree_data_len);
        // Iterating from the top layers down to the leaves.
        for layer in tree.as_ref().iter().rev() {
            // Skip the root layer.
            if layer.len() <= 1 {
                continue;
            }
            merkle_tree_data.extend(layer.iter().flatten());
            // Pad to the end of the block.
            let padded_size = merkle_tree_data.len().next_multiple_of(self.block_size() as usize);
            merkle_tree_data.resize(padded_size, 0);
        }

        // Zero the last block, then write the descriptor to the start of it.
        let descriptor_offset = merkle_tree_data.len();
        merkle_tree_data.resize(descriptor_offset + self.block_size() as usize, 0);
        let descriptor = FsVerityDescriptorRaw::new(
            hash_alg,
            self.block_size(),
            self.get_size(),
            tree.root(),
            salt,
        )?;
        descriptor.write_to_slice(&mut merkle_tree_data[descriptor_offset..])?;

        Ok((tree, merkle_tree_data))
    }
486
487    /// Reads the data attribute and computes a merkle tree from the data. The values of the
488    /// parameters required to build the merkle tree are supplied by `descriptor` (i.e. salt,
489    /// hash_algorithm, etc.) Writes the leaf nodes of the merkle tree to an attribute with id
490    /// `FSVERITY_MERKLE_ATTRIBUTE_ID`. Updates the root_hash of the `descriptor` according to the
491    /// computed merkle tree and then replaces the ObjectValue of the data attribute with
492    /// ObjectValue::VerifiedAttribute, which stores the `descriptor` inline.
493    #[trace]
494    pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
495        self.set_fsverity_state_started()?;
496        // If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
497        // the graveyard should process the tombstone before we start rewriting the attribute.
498        if let Some(_) = self
499            .store()
500            .tree()
501            .find(&ObjectKey::graveyard_attribute_entry(
502                self.store().graveyard_directory_object_id(),
503                self.object_id(),
504                FSVERITY_MERKLE_ATTRIBUTE_ID,
505            ))
506            .await?
507        {
508            self.store().filesystem().graveyard().flush().await;
509        }
510        let mut transaction = self.new_transaction().await?;
511        let hash_alg =
512            options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
513        let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
514        let (root_digest, merkle_tree) = match hash_alg {
515            fio::HashAlgorithm::Sha256 => {
516                let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
517                    salt.clone(),
518                    self.block_size() as usize,
519                ));
520                let (tree, merkle_tree_data) =
521                    self.build_verity_tree(hasher, hash_alg.clone(), &salt).await?;
522                let root: [u8; 32] = tree.root().try_into().unwrap();
523                (RootDigest::Sha256(root), merkle_tree_data)
524            }
525            fio::HashAlgorithm::Sha512 => {
526                let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
527                    salt.clone(),
528                    self.block_size() as usize,
529                ));
530                let (tree, merkle_tree_data) =
531                    self.build_verity_tree(hasher, hash_alg.clone(), &salt).await?;
532                (RootDigest::Sha512(tree.root().to_vec()), merkle_tree_data)
533            }
534            _ => {
535                bail!(
536                    anyhow!(FxfsError::NotSupported)
537                        .context(format!("hash algorithm not supported"))
538                );
539            }
540        };
541        // TODO(b/314194485): Eventually want streaming writes.
542        // The merkle tree attribute should not require trimming because it should not
543        // exist.
544        self.handle
545            .write_new_attr_in_batches(
546                &mut transaction,
547                FSVERITY_MERKLE_ATTRIBUTE_ID,
548                &merkle_tree,
549                WRITE_ATTR_BATCH_SIZE,
550            )
551            .await?;
552        if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
553            self.store().remove_attribute_from_graveyard(
554                &mut transaction,
555                self.object_id(),
556                FSVERITY_MERKLE_ATTRIBUTE_ID,
557            );
558        };
559        let descriptor_decoded =
560            FsVerityDescriptor::from_bytes(&merkle_tree, self.block_size() as usize)?;
561        let descriptor = FsverityStateInner {
562            root_digest: root_digest.clone(),
563            salt: salt.clone(),
564            merkle_tree: descriptor_decoded.leaf_digests()?.to_vec().into(),
565        };
566        self.set_fsverity_state_pending(descriptor);
567        transaction.add_with_object(
568            self.store().store_object_id(),
569            Mutation::replace_or_insert_object(
570                ObjectKey::attribute(
571                    self.object_id(),
572                    DEFAULT_DATA_ATTRIBUTE_ID,
573                    AttributeKey::Attribute,
574                ),
575                ObjectValue::verified_attribute(
576                    self.get_size(),
577                    FsverityMetadata::F2fs(0..merkle_tree.len() as u64),
578                ),
579            ),
580            AssocObj::Borrowed(self),
581        );
582        transaction.commit().await?;
583        Ok(())
584    }
585
586    /// Pre-allocate disk space for the given logical file range. If any part of the allocation
587    /// range is beyond the end of the file, the file size is updated.
588    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
589        debug_assert!(range.start < range.end);
590
591        // It's not required that callers of allocate use block aligned ranges, but we need to make
592        // the extents block aligned. Luckily, fallocate in posix is allowed to allocate more than
593        // what was asked for for block alignment purposes. We just need to make sure that the size
594        // of the file is still the non-block-aligned end of the range if the size was changed.
595        let mut new_range = range.clone();
596        new_range.start = round_down(new_range.start, self.block_size());
597        // NB: FxfsError::TooBig turns into EFBIG when passed through starnix, which is the
598        // required error code when the requested range is larger than the file size.
599        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;
600
601        let mut transaction = self.new_transaction().await?;
602        let mut to_allocate = Vec::new();
603        let mut to_switch = Vec::new();
604        let key_id = self.get_key(None).await?.0;
605
606        {
607            let tree = &self.store().tree;
608            let layer_set = tree.layer_set();
609            let offset_key = ObjectKey::attribute(
610                self.object_id(),
611                self.attribute_id(),
612                AttributeKey::Extent(ExtentKey::search_key_from_offset(new_range.start)),
613            );
614            let mut merger = layer_set.merger();
615            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;
616
617            loop {
618                match iter.get() {
619                    Some(ItemRef {
620                        key:
621                            ObjectKey {
622                                object_id,
623                                data:
624                                    ObjectKeyData::Attribute(
625                                        attribute_id,
626                                        AttributeKey::Extent(extent_key),
627                                    ),
628                            },
629                        value: ObjectValue::Extent(extent_value),
630                        ..
631                    }) if *object_id == self.object_id()
632                        && *attribute_id == self.attribute_id() =>
633                    {
634                        // If the start of this extent is beyond the end of the range we are
635                        // allocating, we don't have any more work to do.
636                        if new_range.end <= extent_key.range.start {
637                            break;
638                        }
639                        // Add any prefix we might need to allocate.
640                        if new_range.start < extent_key.range.start {
641                            to_allocate.push(new_range.start..extent_key.range.start);
642                            new_range.start = extent_key.range.start;
643                        }
644                        let device_offset = match extent_value {
645                            ExtentValue::None => {
646                                // If the extent value is None, it indicates a deleted extent. In
647                                // that case, we just skip it entirely. By keeping the new_range
648                                // where it is, this section will get included in the new
649                                // allocations.
650                                iter.advance().await?;
651                                continue;
652                            }
653                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
654                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
655                                // If this extent is already in overwrite mode, we can skip it.
656                                if extent_key.range.end < new_range.end {
657                                    new_range.start = extent_key.range.end;
658                                    iter.advance().await?;
659                                    continue;
660                                } else {
661                                    new_range.start = new_range.end;
662                                    break;
663                                }
664                            }
665                            ExtentValue::Some { device_offset, .. } => *device_offset,
666                        };
667
668                        // Figure out how we have to break up the ranges.
669                        let device_offset =
670                            device_offset + (new_range.start - extent_key.range.start);
671                        if extent_key.range.end < new_range.end {
672                            to_switch.push((new_range.start..extent_key.range.end, device_offset));
673                            new_range.start = extent_key.range.end;
674                        } else {
675                            to_switch.push((new_range.start..new_range.end, device_offset));
676                            new_range.start = new_range.end;
677                            break;
678                        }
679                    }
680                    // The records are sorted so if we find something that isn't an extent or
681                    // doesn't match the object id then there are no more extent records for this
682                    // object.
683                    _ => break,
684                }
685                iter.advance().await?;
686            }
687        }
688
689        if new_range.start < new_range.end {
690            to_allocate.push(new_range.clone());
691        }
692
693        // We can update the size in the first transaction because even if subsequent transactions
694        // don't get replayed, the data between the current and new end of the file will be zero
695        // (either sparse zero or allocated zero). On the other hand, if we don't update the size
696        // in the first transaction, overwrite extents may be written past the end of the file
697        // which is an fsck error.
698        //
699        // The potential new size needs to be the non-block-aligned range end - we round up to the
700        // nearest block size for the actual allocation, but shouldn't do that for the file size.
701        let new_size = std::cmp::max(range.end, self.get_size());
702        // Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
703        // first transaction, in case we split transactions. This makes it okay to only replay the
704        // first transaction if power loss occurs - the file will be in an unusual state, but not
705        // an invalid one, if only part of the allocate goes through.
706        transaction.add_with_object(
707            self.store().store_object_id(),
708            Mutation::replace_or_insert_object(
709                ObjectKey::attribute(
710                    self.object_id(),
711                    self.attribute_id(),
712                    AttributeKey::Attribute,
713                ),
714                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
715            ),
716            AssocObj::Borrowed(self),
717        );
718
719        // The maximum number of mutations we are going to allow per transaction in allocate. This
720        // is probably quite a bit lower than the actual limit, but it should be large enough to
721        // handle most non-edge-case versions of allocate without splitting the transaction.
722        const MAX_TRANSACTION_SIZE: usize = 256;
723        for (switch_range, device_offset) in to_switch {
724            transaction.add_with_object(
725                self.store().store_object_id(),
726                Mutation::merge_object(
727                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
728                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
729                        device_offset,
730                        key_id,
731                    )),
732                ),
733                AssocObj::Borrowed(self),
734            );
735            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
736                transaction.commit_and_continue().await?;
737            }
738        }
739
740        let mut allocated = 0;
741        let allocator = self.store().allocator();
742        for mut allocate_range in to_allocate {
743            while allocate_range.start < allocate_range.end {
744                let device_range = allocator
745                    .allocate(
746                        &mut transaction,
747                        self.store().store_object_id(),
748                        allocate_range.end - allocate_range.start,
749                    )
750                    .await
751                    .context("allocation failed")?;
752                let device_range_len = device_range.end - device_range.start;
753
754                transaction.add_with_object(
755                    self.store().store_object_id(),
756                    Mutation::merge_object(
757                        ObjectKey::extent(
758                            self.object_id(),
759                            self.attribute_id(),
760                            allocate_range.start..allocate_range.start + device_range_len,
761                        ),
762                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
763                            device_range.start,
764                            (device_range_len / self.block_size()) as usize,
765                            key_id,
766                        )),
767                    ),
768                    AssocObj::Borrowed(self),
769                );
770
771                allocate_range.start += device_range_len;
772                allocated += device_range_len;
773
774                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
775                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
776                    transaction.commit_and_continue().await?;
777                    allocated = 0;
778                }
779            }
780        }
781
782        self.update_allocated_size(&mut transaction, allocated, 0).await?;
783        transaction.commit().await?;
784
785        Ok(())
786    }
787
    /// Return information on a contiguous set of extents that has the same allocation status,
    /// starting from `start_offset`. The information returned is if this set of extents are marked
    /// allocated/not allocated and also the size of this set (in bytes). This is used when
    /// querying slices for volumes.
    /// This function expects `start_offset` to be aligned to block size.
    ///
    /// Returns `(allocated, length)` where `length` is the number of bytes, starting at
    /// `start_offset`, sharing the `allocated` status. If `start_offset` equals the current size,
    /// returns `Ok((false, 0))`.
    ///
    /// # Errors
    /// Returns `FxfsError::OutOfRange` if `start_offset` is beyond the current size, and
    /// `FxfsError::Inconsistent` if an extent record is not block-aligned.
    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
        let block_size = self.block_size();
        assert_eq!(start_offset % block_size, 0);

        if start_offset > self.get_size() {
            bail!(FxfsError::OutOfRange)
        }

        if start_offset == self.get_size() {
            return Ok((false, 0));
        }

        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let offset_key = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id(),
            AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

        // State machine: `allocated` is None until the first region is classified, then holds the
        // status of the run being accumulated. `end` is the exclusive end of the run so far.
        let mut allocated = None;
        let mut end = start_offset;

        loop {
            // Iterate through the extents, each time setting `end` as the end of the previous
            // extent
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) => {
                    // Equivalent of getting no extents back
                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
                        // The rest of the file (up to its size) is sparse, i.e. unallocated.
                        if allocated == Some(false) || allocated.is_none() {
                            end = self.get_size();
                            allocated = Some(false);
                        }
                        break;
                    }
                    ensure!(extent_key.range.is_aligned(block_size), FxfsError::Inconsistent);
                    if extent_key.range.start > end {
                        // If a previous extent has already been visited and we are tracking an
                        // allocated set, we are only interested in an extent where the range of the
                        // current extent follows immediately after the previous one.
                        if allocated == Some(true) {
                            break;
                        } else {
                            // The gap between the previous `end` and this extent is not allocated
                            end = extent_key.range.start;
                            allocated = Some(false);
                            // Continue this iteration, except now the `end` is set to the end of
                            // the "previous" extent which is this gap between the start_offset
                            // and the current extent
                        }
                    }

                    // We can assume that from here, the `end` points to the end of a previous
                    // extent.
                    match extent_value {
                        // The current extent has been allocated
                        ExtentValue::Some { .. } => {
                            // Stop searching if previous extent was marked deleted
                            if allocated == Some(false) {
                                break;
                            }
                            allocated = Some(true);
                        }
                        // This extent has been marked deleted
                        ExtentValue::None => {
                            // Stop searching if previous extent was marked allocated
                            if allocated == Some(true) {
                                break;
                            }
                            allocated = Some(false);
                        }
                    }
                    // Extend the current run to cover this extent.
                    end = extent_key.range.end;
                }
                // This occurs when there are no extents left
                None => {
                    if allocated == Some(false) || allocated.is_none() {
                        end = self.get_size();
                        allocated = Some(false);
                    }
                    // Otherwise, we were monitoring extents that were allocated, so just exit.
                    break;
                }
                // Non-extent records (Object, Child, GraveyardEntry) are ignored.
                Some(_) => {}
            }
            iter.advance().await?;
        }

        // Every `break` path above assigns `allocated` first, so this unwrap cannot panic.
        Ok((allocated.unwrap(), end - start_offset))
    }
896
897    pub async fn txn_write<'a>(
898        &'a self,
899        transaction: &mut Transaction<'a>,
900        offset: u64,
901        buf: BufferRef<'_>,
902    ) -> Result<(), Error> {
903        if buf.is_empty() {
904            return Ok(());
905        }
906        let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
907        self.multi_write(
908            transaction,
909            self.attribute_id(),
910            std::slice::from_ref(&aligned),
911            transfer_buf.as_mut(),
912        )
913        .await?;
914        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
915            self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
916        }
917        Ok(())
918    }
919
920    // Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
921    // if encryption takes place.  The ranges must all be aligned and no change to content size is
922    // applied; the caller is responsible for updating size if required.
923    pub async fn multi_write<'a>(
924        &'a self,
925        transaction: &mut Transaction<'a>,
926        attribute_id: u64,
927        ranges: &[Range<u64>],
928        buf: MutableBufferRef<'_>,
929    ) -> Result<(), Error> {
930        self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
931    }
932
    // Writes `buf` directly to the device locations already mapped for
    // `offset..offset + buf.len()`, bypassing the usual journaled data path. The buffer length
    // must be a multiple of the device block size. With `options.allow_allocations`, gaps in the
    // mapping are filled by allocating new raw extents inside a transaction that is committed at
    // the end; without it, hitting an unmapped region is an error.
    //
    // `buf` is mutable as an optimization, since the write may require encryption, we can
    // encrypt the buffer in-place rather than copying to another buffer if the write is
    // already aligned.
    //
    // Note: in the event of power failure during an overwrite() call, it is possible that
    // old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
    // Since the old data should be encrypted, it is probably safe to expose, although not ideal.
    pub async fn overwrite(
        &self,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        options: OverwriteOptions,
    ) -> Result<(), Error> {
        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
        // Logical end of the whole write; used later to decide whether the file must grow.
        let end = offset + buf.len() as u64;

        let key_id = self.get_key(None).await?.0;

        // The transaction only ends up being used if allow_allocations is true
        let mut transaction =
            if options.allow_allocations { Some(self.new_transaction().await?) } else { None };

        // We build up a list of writes to perform later
        let writes = FuturesUnordered::new();

        if options.barrier_on_first_write {
            // NOTE(review): issues a device barrier before any of this call's writes are
            // submitted — presumably to order earlier writes ahead of them; confirm against the
            // device's barrier contract.
            self.store().device.barrier();
        }

        // We create a new scope here, so that the merger iterator will get dropped before we try to
        // commit our transaction. Otherwise the transaction commit would block.
        {
            let store = self.store();
            let store_object_id = store.store_object_id;
            let allocator = store.allocator();
            let tree = &store.tree;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
                )))
                .await?;
            let block_size = self.block_size();

            // Each iteration maps the current `offset` to a device location, writing as many bytes
            // as that mapping covers, then advances `offset`/`buf` until the buffer is exhausted.
            loop {
                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
                    // An extent of ours that ends exactly at `offset` cannot service this write;
                    // skip to the next record.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end == offset =>
                    {
                        iter.advance().await?;
                        continue;
                    }
                    // An extent of ours whose range starts at or before `offset` (and, given the
                    // arm above, extends past it) — the write target, if it's in the right mode.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start <= offset =>
                    {
                        match value {
                            // Only raw extents can be overwritten in place here; write from
                            // `offset` to the end of this extent (or the end of the buffer).
                            ObjectValue::Extent(ExtentValue::Some {
                                device_offset,
                                mode: ExtentMode::Raw,
                                ..
                            }) => {
                                ensure!(
                                    range.is_aligned(block_size) && device_offset % block_size == 0,
                                    FxfsError::Inconsistent
                                );
                                let offset_within_extent = offset - range.start;
                                let remaining_length_of_extent = (range
                                    .end
                                    .checked_sub(offset)
                                    .ok_or(FxfsError::Inconsistent)?)
                                    as usize;
                                // Yields (device_offset, bytes_to_write, should_advance)
                                (
                                    device_offset + offset_within_extent,
                                    min(buf.len(), remaining_length_of_extent),
                                    true,
                                )
                            }
                            // An extent in any other mode (e.g. checksummed) cannot be written
                            // via this raw path.
                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
                                // TODO(https://fxbug.dev/42066056): Maybe we should create
                                // a new extent without checksums?
                                bail!(
                                    "extent from ({},{}) which overlaps offset \
                                        {} has the wrong extent mode",
                                    range.start,
                                    range.end,
                                    offset
                                )
                            }
                            _ => {
                                bail!(
                                    "overwrite failed: extent overlapping offset {} has \
                                      unexpected ObjectValue",
                                    offset
                                )
                            }
                        }
                    }
                    // No extent of ours covers `offset`: allocate a new raw extent if allowed.
                    maybe_item_ref => {
                        if let Some(transaction) = transaction.as_mut() {
                            assert_eq!(options.allow_allocations, true);
                            assert_eq!(offset % self.block_size(), 0);

                            // We are going to make a new extent, but let's check if there is an
                            // extent after us. If there is an extent after us, then we don't want
                            // our new extent to bump into it...
                            let mut bytes_to_allocate =
                                round_up(buf.len() as u64, self.block_size())
                                    .ok_or(FxfsError::TooBig)?;
                            if let Some(ItemRef {
                                key:
                                    ObjectKey {
                                        object_id,
                                        data:
                                            ObjectKeyData::Attribute(
                                                attribute_id,
                                                AttributeKey::Extent(ExtentKey { range }),
                                            ),
                                    },
                                ..
                            }) = maybe_item_ref
                            {
                                if *object_id == self.object_id()
                                    && *attribute_id == self.attribute_id()
                                    && offset < range.start
                                {
                                    // Clamp the allocation so it stops at the next extent.
                                    let bytes_until_next_extent = range.start - offset;
                                    bytes_to_allocate =
                                        min(bytes_to_allocate, bytes_until_next_extent);
                                }
                            }

                            // The allocator may return less than requested; the loop will come
                            // back around for the remainder.
                            let device_range = allocator
                                .allocate(transaction, store_object_id, bytes_to_allocate)
                                .await?;
                            let device_range_len = device_range.end - device_range.start;
                            transaction.add(
                                store_object_id,
                                Mutation::insert_object(
                                    ObjectKey::extent(
                                        self.object_id(),
                                        self.attribute_id(),
                                        offset..offset + device_range_len,
                                    ),
                                    ObjectValue::Extent(ExtentValue::new_raw(
                                        device_range.start,
                                        key_id,
                                    )),
                                ),
                            );

                            self.update_allocated_size(transaction, device_range_len, 0).await?;

                            // Yields (device_offset, bytes_to_write, should_advance)
                            // `should_advance` is false: the iterator is still positioned at the
                            // record after the hole we just filled.
                            (device_range.start, min(buf.len(), device_range_len as usize), false)
                        } else {
                            bail!(
                                "no extent overlapping offset {}, \
                                and new allocations are not allowed",
                                offset
                            )
                        }
                    }
                };
                // Queue the device write for this chunk; all chunks are issued concurrently below.
                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
                writes.push(self.write_at(offset, current_buf, device_offset));
                if remaining_buf.len() == 0 {
                    break;
                } else {
                    buf = remaining_buf;
                    offset += bytes_to_write as u64;
                    if should_advance {
                        iter.advance().await?;
                    }
                }
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // The checksums are being ignored here, but we don't need to know them
        writes.try_collect::<Vec<MaybeChecksums>>().await?;

        // Commit any allocations made above, growing the file first if the write extended it.
        if let Some(mut transaction) = transaction {
            assert_eq!(options.allow_allocations, true);
            if !transaction.is_empty() {
                if end > self.get_size() {
                    self.grow(&mut transaction, self.get_size(), end).await?;
                }
                transaction.commit().await?;
            }
        }

        Ok(())
    }
1156
1157    // Within a transaction, the size of the object might have changed, so get the size from there
1158    // if it exists, otherwise, fall back on the cached size.
1159    fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1160        transaction
1161            .get_object_mutation(
1162                self.store().store_object_id,
1163                ObjectKey::attribute(
1164                    self.object_id(),
1165                    self.attribute_id(),
1166                    AttributeKey::Attribute,
1167                ),
1168            )
1169            .and_then(|m| {
1170                if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1171                    Some(size)
1172                } else {
1173                    None
1174                }
1175            })
1176            .unwrap_or_else(|| self.get_size())
1177    }
1178
1179    pub async fn txn_update_size<'a>(
1180        &'a self,
1181        transaction: &mut Transaction<'a>,
1182        new_size: u64,
1183        // Allow callers to update the has_overwrite_extents metadata if they want. If this is
1184        // Some it is set to the value, if None it is left unchanged.
1185        update_has_overwrite_extents: Option<bool>,
1186    ) -> Result<(), Error> {
1187        let key =
1188            ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1189        let mut mutation = if let Some(mutation) =
1190            transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1191        {
1192            mutation.clone()
1193        } else {
1194            ObjectStoreMutation {
1195                item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1196                op: Operation::ReplaceOrInsert,
1197            }
1198        };
1199        if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1200            *size = new_size;
1201            if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1202                *has_overwrite_extents = update_has_overwrite_extents;
1203            }
1204        } else {
1205            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1206        }
1207        transaction.add_with_object(
1208            self.store().store_object_id(),
1209            Mutation::ObjectStore(mutation),
1210            AssocObj::Borrowed(self),
1211        );
1212        Ok(())
1213    }
1214
1215    async fn update_allocated_size(
1216        &self,
1217        transaction: &mut Transaction<'_>,
1218        allocated: u64,
1219        deallocated: u64,
1220    ) -> Result<(), Error> {
1221        self.handle.update_allocated_size(transaction, allocated, deallocated).await
1222    }
1223
1224    pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1225        if self
1226            .overwrite_ranges
1227            .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1228        {
1229            // This returns true if there were ranges, but this truncate removed them all, which
1230            // indicates that we need to flip the has_overwrite_extents metadata flag to false.
1231            Ok(Some(false))
1232        } else {
1233            Ok(None)
1234        }
1235    }
1236
1237    pub async fn shrink<'a>(
1238        &'a self,
1239        transaction: &mut Transaction<'a>,
1240        size: u64,
1241        update_has_overwrite_extents: Option<bool>,
1242    ) -> Result<NeedsTrim, Error> {
1243        let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1244        self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1245        Ok(needs_trim)
1246    }
1247
    /// Grows the attribute from `old_size` to `size`. Any in-progress trim past `old_size` is
    /// completed first (committing intermediate transactions as needed), and if `old_size` is not
    /// block-aligned the tail of the old last block is zeroed so stale bytes are not exposed by
    /// the size increase. The new size is recorded in `transaction` (caller commits).
    pub async fn grow<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        old_size: u64,
        size: u64,
    ) -> Result<(), Error> {
        // Before growing the file, we must make sure that a previous trim has completed.
        let store = self.store();
        while matches!(
            store
                .trim_some(
                    transaction,
                    self.object_id(),
                    self.attribute_id(),
                    TrimMode::FromOffset(old_size)
                )
                .await?,
            TrimResult::Incomplete
        ) {
            // Trim work is split across transactions; commit what we have and keep going.
            transaction.commit_and_continue().await?;
        }
        // We might need to zero out the tail of the old last block.
        let block_size = self.block_size();
        if old_size % block_size != 0 {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let aligned_old_size = round_down(old_size, block_size);
            // Look up the extent (if any) covering the old last block.
            let iter = merger
                .query(Query::FullRange(&ObjectKey::extent(
                    self.object_id(),
                    self.attribute_id(),
                    aligned_old_size..aligned_old_size + 1,
                )))
                .await?;
            if let Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                    },
                value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
                ..
            }) = iter.get()
            {
                if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
                    // Translate to the device offset of the old last block.
                    let device_offset = device_offset
                        .checked_add(aligned_old_size - extent_key.range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
                    let mut buf = self.allocate_buffer(block_size as usize).await;
                    // In the case that this extent is in OverwritePartial mode, there is a
                    // possibility that the last block is allocated, but not initialized yet, in
                    // which case we don't actually need to bother zeroing out the tail. However,
                    // it's not strictly incorrect to change uninitialized data, so we skip the
                    // check and blindly do it to keep it simpler here.
                    self.read_and_decrypt(device_offset, aligned_old_size, buf.as_mut(), *key_id)
                        .await?;
                    // Zero everything past the old (unaligned) end of file within the block, then
                    // write the whole block back.
                    buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
                    self.multi_write(
                        transaction,
                        *attribute_id,
                        &[aligned_old_size..aligned_old_size + block_size],
                        buf.as_mut(),
                    )
                    .await?;
                }
            }
        }
        self.txn_update_size(transaction, size, None).await?;
        Ok(())
    }
1320
    /// Attempts to pre-allocate a `file_range` of bytes for this object.
    /// Returns a set of device ranges (i.e. potentially multiple extents).
    ///
    /// It may not be possible to preallocate the entire requested range in one request
    /// due to limitations on transaction size. In such cases, we will preallocate as much as
    /// we can up to some (arbitrary, internal) limit on transaction size.
    ///
    /// `file_range.start` is modified to point at the end of the logical range
    /// that was preallocated such that repeated calls to `preallocate_range` with new
    /// transactions can be used to preallocate ranges of any size.
    ///
    /// Requested range must be a multiple of block size.
    pub async fn preallocate_range<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        file_range: &mut Range<u64>,
    ) -> Result<Vec<Range<u64>>, Error> {
        let block_size = self.block_size();
        assert!(file_range.is_aligned(block_size));
        // Preallocation writes raw (unencrypted) extents, so it is only supported on
        // unencrypted objects.
        assert!(!self.handle.is_encrypted());
        let mut ranges = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Start iterating at the first extent record that could overlap `file_range`.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
            )))
            .await?;
        // Total bytes newly allocated, applied to the allocated size at the end.
        let mut allocated = 0;
        // Key id recorded in the raw extents we create below.
        let key_id = self.get_key(None).await?.0;
        'outer: while file_range.start < file_range.end {
            // Scan forward through existing extent records: reuse anything already allocated
            // at the front of `file_range`, and determine how far the next fresh allocation
            // should extend.
            let allocate_end = loop {
                match iter.get() {
                    // Case for allocated extents for the same object that overlap with file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start < file_range.end =>
                    {
                        ensure!(
                            range.is_valid()
                                && range.is_aligned(block_size)
                                && device_offset % block_size == 0,
                            FxfsError::Inconsistent
                        );
                        // If the start of the requested file_range overlaps with an existing extent...
                        if range.start <= file_range.start {
                            // Record the existing extent and move on.
                            let device_range = device_offset
                                .checked_add(file_range.start - range.start)
                                .ok_or(FxfsError::Inconsistent)?
                                ..device_offset
                                    .checked_add(min(range.end, file_range.end) - range.start)
                                    .ok_or(FxfsError::Inconsistent)?;
                            file_range.start += device_range.end - device_range.start;
                            ranges.push(device_range);
                            if file_range.start >= file_range.end {
                                break 'outer;
                            }
                            iter.advance().await?;
                            continue;
                        } else {
                            // There's nothing allocated between file_range.start and the beginning
                            // of this extent.
                            break range.start;
                        }
                    }
                    // Case for deleted extents eclipsed by file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::None),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end < file_range.end =>
                    {
                        iter.advance().await?;
                    }
                    _ => {
                        // We can just preallocate the rest.
                        break file_range.end;
                    }
                }
            };
            // Allocate new space for [file_range.start, allocate_end).  The allocator may
            // return less than requested.
            let device_range = self
                .store()
                .allocator()
                .allocate(
                    transaction,
                    self.store().store_object_id(),
                    allocate_end - file_range.start,
                )
                .await
                .context("Allocation failed")?;
            allocated += device_range.end - device_range.start;
            let this_file_range =
                file_range.start..file_range.start + device_range.end - device_range.start;
            file_range.start = this_file_range.end;
            // Record the new raw extent for the allocated range.
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
                ),
            );
            ranges.push(device_range);
            // If we didn't allocate all that we requested, we'll loop around and try again.
            // ... unless we have filled the transaction. The caller should check file_range.
            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
                break;
            }
        }
        // Update the file size if it changed.
        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
            self.txn_update_size(transaction, file_range.start, None).await?;
        }
        self.update_allocated_size(transaction, allocated, 0).await?;
        Ok(ranges)
    }
1462
1463    pub async fn update_attributes<'a>(
1464        &self,
1465        transaction: &mut Transaction<'a>,
1466        node_attributes: Option<&fio::MutableNodeAttributes>,
1467        change_time: Option<Timestamp>,
1468    ) -> Result<(), Error> {
1469        // This codepath is only called by files, whose wrapping key id users cannot directly set
1470        // as per fscrypt.
1471        ensure!(
1472            !matches!(
1473                node_attributes,
1474                Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1475            ),
1476            FxfsError::BadPath
1477        );
1478        self.handle.update_attributes(transaction, node_attributes, change_time).await
1479    }
1480
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        // Pure delegation to the underlying StoreObjectHandle.
        self.handle.default_transaction_options()
    }
1486
    /// Creates a new transaction using this handle's default transaction options.
    pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(self.default_transaction_options()).await
    }
1490
    /// Creates a new transaction with explicit `options`, scoped to this handle's
    /// attribute id (delegates to the underlying StoreObjectHandle).
    pub async fn new_transaction_with_options<'b>(
        &self,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        self.handle.new_transaction_with_options(self.attribute_id(), options).await
    }
1497
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    pub async fn flush_device(&self) -> Result<(), Error> {
        // Pure delegation to the underlying StoreObjectHandle.
        self.handle.flush_device().await
    }
1502
    /// Reads an entire attribute.  Returns `None` if the attribute does not exist
    /// (behavior delegated to the underlying StoreObjectHandle).
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        self.handle.read_attr(attribute_id).await
    }
1507
1508    /// Writes an entire attribute.  This *always* uses the volume data key.
1509    pub async fn write_attr(&self, attribute_id: u64, data: &[u8]) -> Result<(), Error> {
1510        // Must be different attribute otherwise cached size gets out of date.
1511        assert_ne!(attribute_id, self.attribute_id());
1512        let store = self.store();
1513        let mut transaction = self.new_transaction().await?;
1514        if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
1515            transaction.commit_and_continue().await?;
1516            while matches!(
1517                store
1518                    .trim_some(
1519                        &mut transaction,
1520                        self.object_id(),
1521                        attribute_id,
1522                        TrimMode::FromOffset(data.len() as u64),
1523                    )
1524                    .await?,
1525                TrimResult::Incomplete
1526            ) {
1527                transaction.commit_and_continue().await?;
1528            }
1529        }
1530        transaction.commit().await?;
1531        Ok(())
1532    }
1533
    /// Reads from `device_offset` into `buffer` and decrypts using the key identified by
    /// `key_id`.  Delegates to `StoreObjectHandle::read_and_decrypt`.
    // NOTE(review): `file_offset` is presumably the logical offset used by the decryption
    // scheme — confirm against StoreObjectHandle.
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id).await
    }
1543
    /// Truncates a file to a given size (growing/shrinking as required).
    ///
    /// Nb: Most code will want to call truncate() instead. This method is used
    /// to update the super block -- a case where we must borrow metadata space.
    pub async fn truncate_with_options(
        &self,
        options: Options<'_>,
        size: u64,
    ) -> Result<(), Error> {
        let mut transaction = self.new_transaction_with_options(options).await?;
        let old_size = self.get_size();
        if size == old_size {
            // No change: the empty transaction is simply dropped without committing.
            return Ok(());
        }
        if size < old_size {
            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
                // The file needs to be trimmed.
                transaction.commit_and_continue().await?;
                let store = self.store();
                // Trim in bounded chunks, committing between each so no single transaction
                // grows too large.
                while matches!(
                    store
                        .trim_some(
                            &mut transaction,
                            self.object_id(),
                            self.attribute_id(),
                            TrimMode::FromOffset(size)
                        )
                        .await?,
                    TrimResult::Incomplete
                ) {
                    if let Err(error) = transaction.commit_and_continue().await {
                        // The size change itself has already committed; a trim failure here
                        // is deliberately logged rather than propagated.
                        warn!(error:?; "Failed to trim after truncate");
                        return Ok(());
                    }
                }
                if let Err(error) = transaction.commit().await {
                    warn!(error:?; "Failed to trim after truncate");
                }
                return Ok(());
            }
        } else {
            self.grow(&mut transaction, old_size, size).await?;
        }
        transaction.commit().await?;
        Ok(())
    }
1591
    /// Returns this object's properties, read from its object record in the LSM tree.
    /// Fails with `FxfsError::NotFile` if the record is not a file.
    pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
        // We don't take a read guard here since the object properties are contained in a single
        // object, which cannot be inconsistent with itself. The LSM tree does not return
        // intermediate states for a single object.
        let item = self
            .store()
            .tree
            .find(&ObjectKey::object(self.object_id()))
            .await?
            // The record should exist for an object we hold a handle to; absence is a bug.
            .expect("Unable to find object record");
        match item.value {
            ObjectValue::Object {
                kind: ObjectKind::File { refs, .. },
                attributes:
                    ObjectAttributes {
                        creation_time,
                        modification_time,
                        posix_attributes,
                        allocated_size,
                        access_time,
                        change_time,
                        ..
                    },
            } => Ok(ObjectProperties {
                refs,
                allocated_size,
                // Size comes from the cached content size rather than the record.
                data_attribute_size: self.get_size(),
                creation_time,
                modification_time,
                access_time,
                change_time,
                // Files have no sub-directories.
                sub_dirs: 0,
                posix_attributes,
                casefold: false,
                wrapping_key_id: None,
            }),
            _ => bail!(FxfsError::NotFile),
        }
    }
1631
1632    // Returns the contents of this object. This object must be < |limit| bytes in size.
1633    pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1634        let size = self.get_size();
1635        if size > limit as u64 {
1636            bail!("Object too big ({} > {})", size, limit);
1637        }
1638        let mut buf = self.allocate_buffer(size as usize).await;
1639        self.read(0u64, buf.as_mut()).await?;
1640        Ok(buf.as_slice().into())
1641    }
1642
    /// Returns the set of file_offset->extent mappings for this file. The extents will be sorted by
    /// their logical offset within the file.
    ///
    /// *NOTE*: This operation is potentially expensive and should generally be avoided.
    pub async fn device_extents(&self) -> Result<Vec<FileExtent>, Error> {
        let mut extents = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Walk all extent records for this object's attribute, starting at offset 0.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
            )))
            .await?;
        loop {
            match iter.get() {
                // Only allocated extents (ExtentValue::Some) for this object/attribute are
                // reported; anything else ends the scan.
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
                    let logical_offset = range.start;
                    let device_range = *device_offset..*device_offset + range.length()?;
                    extents.push(FileExtent::new(logical_offset, device_range)?);
                }
                _ => break,
            }
            iter.advance().await?;
        }
        Ok(extents)
    }
1684}
1685
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
    /// Keeps this handle's cached state in sync with mutations as they are applied.
    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
        match mutation {
            // Attribute-size mutations update the cached content size.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
                ..
            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
            // A VerifiedAttribute mutation finalizes this handle's fsverity state.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
                ..
            }) => {
                debug_assert_eq!(
                    self.get_size(),
                    *size,
                    "size should be set when verity is enabled and must not change"
                );
                self.finalize_fsverity_state()
            }
            // Extent mutations on this object's attribute update the tracked overwrite
            // ranges when the extent is in an overwrite mode.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attr_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                        ..
                    },
                ..
            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
                    self.overwrite_ranges.apply_range(range.clone())
                }
                ExtentMode::Raw | ExtentMode::Cow(_) => (),
            },
            _ => {}
        }
    }
}
1730
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    // All of these methods delegate directly to the underlying StoreObjectHandle.
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1748
#[async_trait]
impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
    /// Reads up to `buf.len()` bytes at `offset`, clamped to the content size.
    /// Returns the number of bytes read.
    async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
        let fs = self.store().filesystem();
        // Take a read lock on this attribute so the size and contents are read consistently
        // with respect to concurrent writers.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                self.store().store_object_id,
                self.object_id(),
                self.attribute_id(),
            )])
            .await;

        let size = self.get_size();
        if offset >= size {
            // Reads entirely past EOF return zero bytes.
            return Ok(0);
        }
        // Clamp the read to EOF.
        let length = min(buf.len() as u64, size - offset) as usize;
        buf = buf.subslice_mut(0..length);
        self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
        // fsverity-enabled files have the data verified after it is read.
        if self.is_verified_file() {
            self.verify_data(offset as usize, buf.as_slice())?;
        }
        Ok(length)
    }

    fn get_size(&self) -> u64 {
        // Cached content size; kept up to date as size mutations are applied.
        self.content_size.load(atomic::Ordering::Relaxed)
    }
}
1779
1780impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
1781    async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
1782        let offset = offset.unwrap_or_else(|| self.get_size());
1783        let mut transaction = self.new_transaction().await?;
1784        self.txn_write(&mut transaction, offset, buf).await?;
1785        let new_size = self.txn_get_size(&transaction);
1786        transaction.commit().await?;
1787        Ok(new_size)
1788    }
1789
1790    async fn truncate(&self, size: u64) -> Result<(), Error> {
1791        self.truncate_with_options(self.default_transaction_options(), size).await
1792    }
1793
1794    async fn flush(&self) -> Result<(), Error> {
1795        Ok(())
1796    }
1797}
1798
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
pub struct DirectWriter<'a, S: HandleOwner> {
    /// Handle the data is written to.
    handle: &'a DataObjectHandle<S>,
    /// Transaction options used for each flush.
    options: transaction::Options<'a>,
    /// Staging buffer, written out whenever it fills (or on `complete`).
    buffer: Buffer<'a>,
    /// File offset at which the next flush will write.
    offset: u64,
    /// Number of bytes currently staged in `buffer`.
    buf_offset: usize,
}
1808
/// Size of `DirectWriter`'s staging buffer (1 MiB).
const BUFFER_SIZE: usize = 1_048_576;
1810
1811impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
1812    fn drop(&mut self) {
1813        if self.buf_offset != 0 {
1814            warn!("DirectWriter: dropping data, did you forget to call complete?");
1815        }
1816    }
1817}
1818
impl<'a, S: HandleOwner> DirectWriter<'a, S> {
    /// Creates a new `DirectWriter` for `handle`; each flush uses `options`.
    pub async fn new(
        handle: &'a DataObjectHandle<S>,
        options: transaction::Options<'a>,
    ) -> DirectWriter<'a, S> {
        Self {
            handle,
            options,
            buffer: handle.allocate_buffer(BUFFER_SIZE).await,
            offset: 0,
            buf_offset: 0,
        }
    }

    /// Writes the staged bytes at `self.offset` in a single transaction, then advances the
    /// offset and resets the staging buffer.
    async fn flush(&mut self) -> Result<(), Error> {
        let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
        self.handle
            .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
            .await?;
        transaction.commit().await?;
        self.offset += self.buf_offset as u64;
        self.buf_offset = 0;
        Ok(())
    }
}
1844
1845impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1846    fn block_size(&self) -> u64 {
1847        self.handle.block_size()
1848    }
1849
1850    async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1851        while buf.len() > 0 {
1852            let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1853            self.buffer
1854                .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1855                .as_mut_slice()
1856                .copy_from_slice(&buf[..to_do]);
1857            self.buf_offset += to_do;
1858            if self.buf_offset == BUFFER_SIZE {
1859                self.flush().await?;
1860            }
1861            buf = &buf[to_do..];
1862        }
1863        Ok(())
1864    }
1865
1866    async fn complete(&mut self) -> Result<(), Error> {
1867        self.flush().await?;
1868        Ok(())
1869    }
1870
1871    async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1872        if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1873            self.buffer
1874                .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1875                .as_mut_slice()
1876                .fill(0);
1877            self.buf_offset += amount as usize;
1878        } else {
1879            self.flush().await?;
1880            self.offset += amount;
1881        }
1882        Ok(())
1883    }
1884}
1885
1886#[cfg(test)]
1887mod tests {
1888    use crate::errors::FxfsError;
1889    use crate::filesystem::{
1890        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1891    };
1892    use crate::fsck::{
1893        FsckOptions, fsck, fsck_volume, fsck_volume_with_options, fsck_with_options,
1894    };
1895    use crate::lsm_tree::Query;
1896    use crate::lsm_tree::types::{ItemRef, LayerIterator};
1897    use crate::object_handle::{
1898        ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1899    };
1900    use crate::object_store::data_object_handle::{OverwriteOptions, WRITE_ATTR_BATCH_SIZE};
1901    use crate::object_store::directory::replace_child;
1902    use crate::object_store::object_record::{FsverityMetadata, ObjectKey, ObjectValue, Timestamp};
1903    use crate::object_store::transaction::{Mutation, Options, lock_keys};
1904    use crate::object_store::volume::root_volume;
1905    use crate::object_store::{
1906        AttributeKey, DEFAULT_DATA_ATTRIBUTE_ID, DataObjectHandle, Directory, ExtentKey,
1907        ExtentMode, ExtentValue, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, LockKey,
1908        NewChildStoreOptions, ObjectKeyData, ObjectStore, PosixAttributes, StoreOptions,
1909        TRANSACTION_MUTATION_THRESHOLD,
1910    };
1911    use crate::range::RangeExt;
1912    use crate::round::{round_down, round_up};
1913    use assert_matches::assert_matches;
1914    use bit_vec::BitVec;
1915    use fsverity_merkle::{FsVerityDescriptor, FsVerityDescriptorRaw};
1916    use fuchsia_sync::Mutex;
1917    use futures::FutureExt;
1918    use futures::channel::oneshot::channel;
1919    use futures::stream::{FuturesUnordered, StreamExt};
1920    use fxfs_crypto::{Crypt, EncryptionKey, KeyPurpose};
1921    use fxfs_insecure_crypto::new_insecure_crypt;
1922    use std::ops::Range;
1923    use std::sync::Arc;
1924    use std::time::Duration;
1925    use storage_device::DeviceHolder;
1926    use storage_device::fake_device::FakeDevice;
1927    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
1928
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Some tests (the preallocate_range ones) currently assume that the data only occupies a single
    // device block.
    const TEST_DATA_OFFSET: u64 = 5000;
    const TEST_DATA: &[u8] = b"hello";
    // Test objects are truncated to this size after the test data is written.
    const TEST_OBJECT_SIZE: u64 = 5678;
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    const TEST_OBJECT_NAME: &str = "foo";
1938
    /// Creates an empty filesystem on a fake 8192-block device for tests.
    async fn test_filesystem() -> OpenFxFilesystem {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        FxFilesystem::new_empty(device).await.expect("new_empty failed")
    }
1943
    /// Creates a test filesystem and a file in the root store (encrypted when `crypt` is
    /// provided), optionally seeded with TEST_DATA at TEST_DATA_OFFSET, and truncated to
    /// TEST_OBJECT_SIZE.
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let store = fs.root_store();
        let object;

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        object = if let Some(crypt) = crypt {
            // Encrypted case: mint a data key for the new object id.
            let object_id = store.get_next_object_id(transaction.txn_guard()).await.unwrap();
            let (key, unwrapped_key) =
                crypt.create_key(object_id.get(), KeyPurpose::Data).await.unwrap();
            ObjectStore::create_object_with_key(
                &store,
                &mut transaction,
                object_id,
                HandleOptions::default(),
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await
            .expect("create_object failed")
        } else {
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed")
        };

        // Link the object into the root directory so fsck and path-based tests can find it.
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        if write_object_test_data {
            // Write TEST_DATA at TEST_DATA_OFFSET using a buffer aligned to the device block.
            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
            object
                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
                .await
                .expect("write failed");
        }
        transaction.commit().await.expect("commit failed");
        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
        (fs, object)
    }
2004
    /// Convenience fixture: encrypted object with test data written.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), true).await
    }
2008
    /// Convenience fixture: encrypted object with no data written.
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), false).await
    }
2013
    #[fuchsia::test]
    async fn test_zero_buf_len_read() {
        let (fs, object) = test_filesystem_and_object().await;
        // A zero-length read should succeed and return 0 bytes.
        let mut buf = object.allocate_buffer(0).await;
        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
        fs.close().await.expect("Close failed");
    }
2021
    #[fuchsia::test]
    async fn test_beyond_eof_read() {
        let (fs, object) = test_filesystem_and_object().await;
        // Read a buffer that straddles EOF: only the in-file bytes should be returned, and
        // the portion of the buffer past EOF must be left untouched.
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        // Bytes within the file (sparse region) read back as zeroes...
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        // ...and the byte past EOF still holds the sentinel fill value.
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
2038
    #[fuchsia::test]
    async fn test_beyond_eof_read_from() {
        let (fs, object) = test_filesystem_and_object().await;
        // Same as test_beyond_eof_read, but going through the deref'd handle's read method.
        let handle = &*object;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            handle.read(0, (offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
2056
    #[fuchsia::test]
    async fn test_beyond_eof_read_unchecked() {
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        // read_unchecked requires the caller to hold the attribute read lock.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                object.store().store_object_id,
                object.object_id(),
                0,
            )])
            .await;
        object
            .read_unchecked(0, (offset - align) as u64, buf.as_mut(), &guard)
            .await
            .expect("read failed");
        // Unlike read(), read_unchecked does not clamp to the content size: the bytes past
        // EOF come back zeroed rather than being left untouched.
        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
        fs.close().await.expect("Close failed");
    }
2080
2081    #[fuchsia::test]
2082    async fn test_read_sparse() {
2083        let (fs, object) = test_filesystem_and_object().await;
2084        // Deliberately read not right to eof.
2085        let len = TEST_OBJECT_SIZE as usize - 1;
2086        let mut buf = object.allocate_buffer(len).await;
2087        buf.as_mut_slice().fill(123u8);
2088        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2089        let mut expected = vec![0; len];
2090        let offset = TEST_DATA_OFFSET as usize;
2091        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2092        assert_eq!(buf.as_slice()[..len], expected[..]);
2093        fs.close().await.expect("Close failed");
2094    }
2095
    #[fuchsia::test]
    async fn test_read_after_writes_interspersed_with_flush() {
        // Data written before a store flush and data written after it must both be
        // visible to a single read.
        let (fs, object) = test_filesystem_and_object().await;

        // Flush the store so the original test data is persisted before the next write.
        object.owner().flush().await.expect("flush failed");

        // Write more test data to the first block of the file.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");

        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);

        // Expected image: zeroes, with the pre-flush TEST_DATA at TEST_DATA_OFFSET and
        // the post-flush copy at offset 0.
        let mut expected = vec![0u8; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice(), &expected);
        fs.close().await.expect("Close failed");
    }
2119
    #[fuchsia::test]
    async fn test_read_after_truncate_and_extend() {
        // Reads spanning a hole left by a truncate (between two live extents) must
        // return zeroes for the hole and the correct data on either side.
        let (fs, object) = test_filesystem_and_object().await;

        // Arrange for there to be <extent><deleted-extent><extent>.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        // This adds an extent at 0..512.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
        // This deletes 512..1024.  Only the first 3 bytes of TEST_DATA survive.
        object.truncate(3).await.expect("truncate failed");
        let data = b"foo";
        let offset = 1500u64;
        let align = (offset % fs.block_size() as u64) as usize;
        let mut buf = object.allocate_buffer(align + data.len()).await;
        buf.as_mut_slice()[align..].copy_from_slice(data);
        // This adds 1024..1536.
        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");

        // Read across both extents and the hole in between.
        const LEN1: usize = 1503;
        let mut buf = object.allocate_buffer(LEN1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
        let mut expected = [0; LEN1];
        expected[..3].copy_from_slice(&TEST_DATA[..3]);
        expected[1500..].copy_from_slice(b"foo");
        assert_eq!(buf.as_slice(), &expected);

        // Also test a read that ends midway through the deleted extent.
        const LEN2: usize = 601;
        let mut buf = object.allocate_buffer(LEN2).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
        assert_eq!(buf.as_slice(), &expected[..LEN2]);
        fs.close().await.expect("Close failed");
    }
2156
    #[fuchsia::test]
    async fn test_read_whole_blocks_with_multiple_objects() {
        // Interleave block-sized writes to two objects in the same store and verify
        // that each object's reads see only its own data.
        let (fs, object) = test_filesystem_and_object().await;
        let block_size = object.block_size() as usize;
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

        // Create a second object in the same store.
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object2 =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        let mut ef_buffer = object.allocate_buffer(block_size).await;
        ef_buffer.as_mut_slice().fill(0xef);
        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");

        // Second block of `object`, then extend it to 3 blocks (third block is a hole),
        // then second block of `object2`.
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object
            .write_or_append(Some(block_size as u64), buffer.as_ref())
            .await
            .expect("write failed");
        object.truncate(3 * block_size as u64).await.expect("truncate failed");
        object2
            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
            .await
            .expect("write failed");

        // `object`: two 0xaf blocks then a zero (hole) block; `object2`: two 0xef blocks.
        let mut buffer = object.allocate_buffer(4 * block_size).await;
        buffer.as_mut_slice().fill(123);
        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
        fs.close().await.expect("Close failed");
    }
2201
    #[fuchsia::test]
    async fn test_alignment() {
        // Exercise writes whose start and/or end are not block-aligned, checking after
        // every write that the object matches an in-memory mirror of its contents.
        let (fs, object) = test_filesystem_and_object().await;

        struct AlignTest {
            // Byte value used for the next write; bumped on every call to `test`.
            fill: u8,
            object: DataObjectHandle<ObjectStore>,
            // In-memory copy of the expected object contents.
            mirror: Vec<u8>,
        }

        impl AlignTest {
            // Snapshots the object's current contents as the initial mirror.
            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
                let mirror = {
                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
                    buf.as_slice().to_vec()
                };
                Self { fill: 0, object, mirror }
            }

            // Fills |range| of self.object with a byte value (self.fill) and mirrors the same
            // operation to an in-memory copy of the object.
            // Each subsequent call bumps the value of fill.
            // It is expected that the object and its mirror maintain identical content.
            async fn test(&mut self, range: Range<u64>) {
                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
                self.fill += 1;
                buf.as_mut_slice().fill(self.fill);
                self.object
                    .write_or_append(Some(range.start), buf.as_ref())
                    .await
                    .expect("write_or_append failed");
                // Writing past the current end grows the file; grow the mirror to match.
                if range.end > self.mirror.len() as u64 {
                    self.mirror.resize(range.end as usize, 0);
                }
                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
                // Read one byte more than the mirror to confirm the size is exact.
                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
                assert_eq!(
                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
                    self.mirror.len()
                );
                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
            }
        }

        let block_size = object.block_size() as u64;
        let mut align = AlignTest::new(object).await;

        // Fill the object to start with (with 1).
        align.test(0..2 * block_size + 1).await;

        // Unaligned head (fills with 2, overwrites that with 3).
        align.test(1..block_size).await;
        align.test(1..2 * block_size).await;

        // Unaligned tail (fills with 4 and 5).
        align.test(0..block_size - 1).await;
        align.test(0..2 * block_size - 1).await;

        // Both unaligned (fills with 6 and 7).
        align.test(1..block_size - 1).await;
        align.test(1..2 * block_size - 1).await;

        fs.close().await.expect("Close failed");
    }
2267
2268    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
2269        let allocator = fs.allocator();
2270        let allocated_before = allocator.get_allocated_bytes();
2271        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2272        object
2273            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
2274            .await
2275            .expect("preallocate_range failed");
2276        transaction.commit().await.expect("commit failed");
2277        assert!(object.get_size() < 1048576);
2278        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2279        object
2280            .preallocate_range(&mut transaction, &mut (0..1048576))
2281            .await
2282            .expect("preallocate_range failed");
2283        transaction.commit().await.expect("commit failed");
2284        assert_eq!(object.get_size(), 1048576);
2285        // Check that it didn't reallocate the space for the existing extent
2286        let allocated_after = allocator.get_allocated_bytes();
2287        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
2288
2289        let mut buf = object
2290            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
2291            .await;
2292        buf.as_mut_slice().fill(47);
2293        object
2294            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
2295            .await
2296            .expect("write failed");
2297        buf.as_mut_slice().fill(95);
2298        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
2299        object
2300            .overwrite(offset, buf.as_mut(), OverwriteOptions::default())
2301            .await
2302            .expect("write failed");
2303
2304        // Make sure there were no more allocations.
2305        assert_eq!(allocator.get_allocated_bytes(), allocated_after);
2306
2307        // Read back the data and make sure it is what we expect.
2308        let mut buf = object.allocate_buffer(104876).await;
2309        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2310        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
2311        assert_eq!(
2312            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
2313            TEST_DATA
2314        );
2315        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
2316    }
2317
2318    #[fuchsia::test]
2319    async fn test_preallocate_range() {
2320        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2321        test_preallocate_common(&fs, object).await;
2322        fs.close().await.expect("Close failed");
2323    }
2324
2325    // This is identical to the previous test except that we flush so that extents end up in
2326    // different layers.
2327    #[fuchsia::test]
2328    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
2329        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2330        object.owner().flush().await.expect("flush failed");
2331        test_preallocate_common(&fs, object).await;
2332        fs.close().await.expect("Close failed");
2333    }
2334
2335    #[fuchsia::test]
2336    async fn test_already_preallocated() {
2337        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2338        let allocator = fs.allocator();
2339        let allocated_before = allocator.get_allocated_bytes();
2340        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2341        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
2342        object
2343            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
2344            .await
2345            .expect("preallocate_range failed");
2346        transaction.commit().await.expect("commit failed");
2347        // Check that it didn't reallocate any new space.
2348        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
2349        fs.close().await.expect("Close failed");
2350    }
2351
    #[fuchsia::test]
    async fn test_overwrite_when_preallocated_at_start_of_file() {
        // `overwrite` without `allow_allocations` must only touch already-allocated
        // blocks: it fails on unallocated space, succeeds on a preallocated block, and
        // needs `allow_allocations: true` to go beyond the preallocated range.
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        // Re-open the object through a fresh handle.
        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume a 4KiB block size.
        assert_eq!(fs.block_size(), 4096);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // First try to overwrite without allowing allocations
        // We expect this to fail, since nothing is allocated yet
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // Now preallocate some space (exactly one block)
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Now try the same overwrite command as before, it should work this time,
        // even with allocations disabled...
        {
            // Preallocated-but-unwritten data reads back as zeroes.
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
        // one block earlier at offset 0
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // We can't assert anything about the existing bytes, because they haven't been allocated
        // yet and they could contain any values
        object
            .overwrite(
                4096,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2438
    #[fuchsia::test]
    async fn test_overwrite_large_buffer_and_file_with_many_holes() {
        // Build a file that alternates preallocated extents and holes, then verify that
        // `overwrite` (a) fails in every hole unless allocations are allowed, (b) works
        // in every preallocated extent without allocations, and (c) a single large
        // overwrite spanning all of them succeeds with `allow_allocations: true`.
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume a 4KiB block size.
        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        // Let's create some non-holes
        // (leaving holes at 0..4096, 8192..16384, 32768..65536 and 131072..262144).
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Preallocating past the old EOF grew the file.
        assert_eq!(object.get_size(), 524288);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // We shouldn't be able to overwrite in the holes if new allocations aren't enabled
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(8192, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(32768, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(131072, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // But we should be able to overwrite in the prealloc'd areas without needing allocations
        // (each block reads as zero before the overwrite and 95 after).
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(16384, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(16384, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(16384, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(65536, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(65536, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(65536, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(262144, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(262144, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(262144, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now let's try to do a huge overwrite, that spans over many holes and non-holes
        let mut huge_write_buf = object.allocate_buffer(524288).await;
        huge_write_buf.as_mut_slice().fill(96);

        // With allocations disabled, the big overwrite should fail...
        object
            .overwrite(0, huge_write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but it should work when allocations are enabled
        object
            .overwrite(
                0,
                huge_write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            // The whole file — former holes included — now holds the new pattern.
            let mut read_buf = object.allocate_buffer(524288).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[96; 524288]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2593
    #[fuchsia::test]
    async fn test_overwrite_when_unallocated_at_start_of_file() {
        // With no preallocation at all, every `overwrite` must fail unless
        // `allow_allocations: true` is passed, and each allowed overwrite must read
        // back correctly.
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume a 4KiB block size.
        assert_eq!(fs.block_size(), 4096);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // First try to overwrite without allowing allocations
        // We expect this to fail, since nothing is allocated yet
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // Now try the same overwrite command as before, but allow allocations
        object
            .overwrite(
                0,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now try to overwrite at the next block. This should fail if allocations are disabled
        // (the first overwrite only allocated the block at offset 0).
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // ... but it should work if allocations are enabled
        object
            .overwrite(
                4096,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2668
    #[fuchsia::test]
    async fn test_overwrite_can_extend_a_file() {
        // An `overwrite` with `allow_allocations: true` that touches or passes EOF must
        // grow the file; without allocations it must fail.
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume a 4KiB block size.
        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // Let's try to fill up the last block, and increase the file size in doing so
        let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);

        // Expected to fail with allocations disabled
        object
            .overwrite(last_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                last_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Filling the block containing EOF rounded the size up to two blocks.
        assert_eq!(object.get_size(), 8192);

        // Let's try to write at the next block, too
        let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();

        // Expected to fail with allocations disabled
        object
            .overwrite(next_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                next_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // The file now spans three blocks.
        assert_eq!(object.get_size(), 12288);

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2751
2752    #[fuchsia::test]
2753    async fn test_enable_verity() {
2754        let fs: OpenFxFilesystem = test_filesystem().await;
2755        let mut transaction = fs
2756            .clone()
2757            .new_transaction(lock_keys![], Options::default())
2758            .await
2759            .expect("new_transaction failed");
2760        let store = fs.root_store();
2761        let object = Arc::new(
2762            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2763                .await
2764                .expect("create_object failed"),
2765        );
2766
2767        transaction.commit().await.unwrap();
2768
2769        object
2770            .enable_verity(fio::VerificationOptions {
2771                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2772                salt: Some(vec![]),
2773                ..Default::default()
2774            })
2775            .await
2776            .expect("set verified file metadata failed");
2777
2778        let handle =
2779            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2780                .await
2781                .expect("open_object failed");
2782
2783        assert!(handle.is_verified_file());
2784
2785        fs.close().await.expect("Close failed");
2786    }
2787
2788    #[fuchsia::test]
2789    async fn test_enable_verity_large_file() {
2790        // Need to make a large FakeDevice to create space for a 67 MB file.
2791        let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2792        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2793        let root_store = fs.root_store();
2794        let mut transaction = fs
2795            .clone()
2796            .new_transaction(lock_keys![], Options::default())
2797            .await
2798            .expect("new_transaction failed");
2799
2800        let handle = ObjectStore::create_object(
2801            &root_store,
2802            &mut transaction,
2803            HandleOptions::default(),
2804            None,
2805        )
2806        .await
2807        .expect("failed to create object");
2808        transaction.commit().await.expect("commit failed");
2809        let mut offset = 0;
2810
2811        // Write a file big enough to trigger multiple transactions on enable_verity().
2812        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2813        buf.as_mut_slice().fill(1);
2814        for _ in 0..130 {
2815            handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2816            offset += WRITE_ATTR_BATCH_SIZE as u64;
2817        }
2818
2819        handle
2820            .enable_verity(fio::VerificationOptions {
2821                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2822                salt: Some(vec![]),
2823                ..Default::default()
2824            })
2825            .await
2826            .expect("set verified file metadata failed");
2827
2828        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2829        offset = 0;
2830        for _ in 0..130 {
2831            handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2832            assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2833            offset += WRITE_ATTR_BATCH_SIZE as u64;
2834        }
2835
2836        fsck(fs.clone()).await.expect("fsck failed");
2837        fs.close().await.expect("Close failed");
2838    }
2839
    #[fuchsia::test]
    async fn test_retry_enable_verity_on_reboot() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Add a graveyard entry for the merkle attribute so that a partially-written
            // attribute gets cleaned up on replay (the comment further down notes that
            // `enable_verity` is what normally processes this marker).
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // First remount read-only: fsck must pass even with the orphaned, partially-written
        // merkle attribute still on disk.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_store = fs.root_store();
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open_object failed");
        // Retrying enable_verity after the interrupted first attempt should now succeed.
        handle
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");

        // `flush` will ensure that initial reap fully processes all the graveyard entries. This
        // isn't strictly necessary for the test to pass (the graveyard marker was already
        // processed during `enable_verity`), but it does help catch bugs, such as the attribute
        // graveyard entry not being removed upon processing.
        fs.graveyard().flush().await;
        // The merkle attribute written by the retry should parse as a valid descriptor.
        assert!(
            FsVerityDescriptor::from_bytes(
                &handle
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await
                    .expect("read_attr failed")
                    .expect("No attr found"),
                handle.block_size() as usize
            )
            .is_ok()
        );
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
2938
2939    #[fuchsia::test]
2940    async fn test_verify_data_corrupt_file() {
2941        let fs: OpenFxFilesystem = test_filesystem().await;
2942        let mut transaction = fs
2943            .clone()
2944            .new_transaction(lock_keys![], Options::default())
2945            .await
2946            .expect("new_transaction failed");
2947        let store = fs.root_store();
2948        let object = Arc::new(
2949            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2950                .await
2951                .expect("create_object failed"),
2952        );
2953
2954        transaction.commit().await.unwrap();
2955
2956        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
2957        buf.as_mut_slice().fill(123);
2958        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2959
2960        object
2961            .enable_verity(fio::VerificationOptions {
2962                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2963                salt: Some(vec![]),
2964                ..Default::default()
2965            })
2966            .await
2967            .expect("set verified file metadata failed");
2968
2969        // Change file contents and ensure verification fails
2970        buf.as_mut_slice().fill(234);
2971        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2972        object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
2973
2974        fs.close().await.expect("Close failed");
2975    }
2976
    // TODO(https://fxbug.dev/450398331): More tests to be added when this can support writing the
    // f2fs format natively. For now, relying on tests inside of the f2fs_reader to exercise more
    // paths.
    #[fuchsia::test]
    async fn test_parse_f2fs_verity() {
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        transaction.commit().await.unwrap();
        let file_size = fs.block_size() * 2;
        // Write over one block to make there be leaf hashes.
        {
            let mut buf = object.allocate_buffer(file_size as usize).await;
            buf.as_mut_slice().fill(64);
            assert_eq!(
                object.write_or_append(None, buf.as_ref()).await.expect("Writing to file."),
                file_size
            );
        }

        // Enable verity normally, then shift the type.
        object
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");
        let (verity_info, root_hash) =
            object.get_descriptor().expect("Getting verity info").unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::Object {
                    store_object_id: store.store_object_id(),
                    object_id: object.object_id()
                }],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Rewrite the data attribute's verity metadata as F2fs, referencing the first two
        // blocks of the merkle attribute.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::verified_attribute(
                    file_size,
                    FsverityMetadata::F2fs(0..(fs.block_size() * 2)),
                ),
            ),
        );
        // Replace the merkle attribute's record with a plain (non-verified) attribute sized
        // to hold the f2fs-style layout written below.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::attribute(fs.block_size() * 2, false),
            ),
        );
        {
            // Build a raw descriptor from the root hash and salt captured above, and place it
            // in the second block of the merkle attribute.
            let descriptor = FsVerityDescriptorRaw::new(
                fio::HashAlgorithm::Sha256,
                fs.block_size(),
                file_size,
                root_hash.as_slice(),
                match &verity_info.salt {
                    Some(salt) => salt.as_slice(),
                    None => [0u8; 0].as_slice(),
                },
            )
            .expect("Creating descriptor");
            let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
            descriptor.write_to_slice(buf.as_mut_slice()).expect("Writing descriptor to buf");
            object
                .multi_write(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &[fs.block_size()..(fs.block_size() * 2)],
                    buf.as_mut(),
                )
                .await
                .expect("Writing descriptor");
        }
        transaction.commit().await.unwrap();

        // Reopening must successfully parse the f2fs-format verity metadata.
        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(handle.is_verified_file());

        // Reads must still verify against the converted metadata.
        let mut buf = object.allocate_buffer(file_size as usize).await;
        assert_eq!(
            handle.read(0, buf.as_mut()).await.expect("Read whole file."),
            file_size as usize
        );

        fs.close().await.expect("Close failed");
    }
3096
3097    #[fuchsia::test]
3098    async fn test_verify_data_corrupt_tree() {
3099        let fs: OpenFxFilesystem = test_filesystem().await;
3100        let object_id = {
3101            let store = fs.root_store();
3102            let mut transaction = fs
3103                .clone()
3104                .new_transaction(lock_keys![], Options::default())
3105                .await
3106                .expect("new_transaction failed");
3107            let object = Arc::new(
3108                ObjectStore::create_object(
3109                    &store,
3110                    &mut transaction,
3111                    HandleOptions::default(),
3112                    None,
3113                )
3114                .await
3115                .expect("create_object failed"),
3116            );
3117            let object_id = object.object_id();
3118
3119            transaction.commit().await.unwrap();
3120
3121            let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3122            buf.as_mut_slice().fill(123);
3123            object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3124
3125            object
3126                .enable_verity(fio::VerificationOptions {
3127                    hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3128                    salt: Some(vec![]),
3129                    ..Default::default()
3130                })
3131                .await
3132                .expect("set verified file metadata failed");
3133            object.read(0, buf.as_mut()).await.expect("verified read");
3134
3135            // Corrupt the merkle tree before closing.
3136            let mut merkle = object
3137                .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
3138                .await
3139                .unwrap()
3140                .expect("Reading merkle tree");
3141            merkle[0] = merkle[0].wrapping_add(1);
3142            object
3143                .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &*merkle)
3144                .await
3145                .expect("Overwriting merkle");
3146
3147            object_id
3148        }; // Close object.
3149
3150        // Reopening the object should complain about the corrupted merkle tree.
3151        assert!(
3152            ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default(), None)
3153                .await
3154                .is_err()
3155        );
3156        fs.close().await.expect("Close failed");
3157    }
3158
3159    #[fuchsia::test]
3160    async fn test_extend() {
3161        let fs = test_filesystem().await;
3162        let handle;
3163        let mut transaction = fs
3164            .clone()
3165            .new_transaction(lock_keys![], Options::default())
3166            .await
3167            .expect("new_transaction failed");
3168        let store = fs.root_store();
3169        handle =
3170            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3171                .await
3172                .expect("create_object failed");
3173
3174        // As of writing, an empty filesystem has two 512kiB superblock extents and a little over
3175        // 256kiB of additional allocations (journal, etc) so we start use a 'magic' starting point
3176        // of 2MiB here.
3177        const START_OFFSET: u64 = 2048 * 1024;
3178        handle
3179            .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
3180            .await
3181            .expect("extend failed");
3182        transaction.commit().await.expect("commit failed");
3183        let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
3184        buf.as_mut_slice().fill(123);
3185        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3186        buf.as_mut_slice().fill(67);
3187        handle.read(0, buf.as_mut()).await.expect("read failed");
3188        assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
3189        fs.close().await.expect("Close failed");
3190    }
3191
3192    #[fuchsia::test]
3193    async fn test_truncate_deallocates_old_extents() {
3194        let (fs, object) = test_filesystem_and_object().await;
3195        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3196        buf.as_mut_slice().fill(0xaa);
3197        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3198
3199        let allocator = fs.allocator();
3200        let allocated_before = allocator.get_allocated_bytes();
3201        object.truncate(fs.block_size() as u64).await.expect("truncate failed");
3202        let allocated_after = allocator.get_allocated_bytes();
3203        assert!(
3204            allocated_after < allocated_before,
3205            "before = {} after = {}",
3206            allocated_before,
3207            allocated_after
3208        );
3209        fs.close().await.expect("Close failed");
3210    }
3211
3212    #[fuchsia::test]
3213    async fn test_truncate_zeroes_tail_block() {
3214        let (fs, object) = test_filesystem_and_object().await;
3215
3216        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
3217        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
3218            .await
3219            .expect("truncate failed");
3220
3221        let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3222        let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
3223        object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
3224
3225        let mut expected = TEST_DATA.to_vec();
3226        expected[3..].fill(0);
3227        assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
3228    }
3229
    #[fuchsia::test]
    async fn test_trim() {
        // Format a new filesystem.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let block_size = fs.block_size();
        root_volume(fs.clone())
            .await
            .expect("root_volume failed")
            .new_volume("test", NewChildStoreOptions::default())
            .await
            .expect("volume failed");
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // To test trim, we open the filesystem and set up a post commit hook that runs after every
        // transaction.  When the hook triggers, we can fsck the volume, take a snapshot of the
        // device and check that it gets replayed correctly on the snapshot.  We can check that the
        // graveyard trims the file as expected.
        #[derive(Default)]
        struct Context {
            store: Option<Arc<ObjectStore>>,
            object_id: Option<u64>,
        }
        let shared_context = Arc::new(Mutex::new(Context::default()));

        // Large enough that truncating the object requires more than one transaction (see the
        // every-other-block write loop below).
        let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;

        // Wait for an object to get tombstoned by the graveyard.
        async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
            loop {
                if let Err(e) =
                    ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
                {
                    assert!(
                        FxfsError::NotFound.matches(&e),
                        "open_object didn't fail with NotFound: {:?}",
                        e
                    );
                    break;
                }
                // The graveyard should eventually tombstone the object.
                fasync::Timer::new(std::time::Duration::from_millis(100)).await;
            }
        }

        // Checks to see if the object needs to be trimmed.
        async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
            let root_directory = Directory::open(store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let oid = root_directory.lookup("foo").await.expect("lookup failed");
            if let Some((oid, _, _)) = oid {
                let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");
                let props = object.get_properties().await.expect("get_properties failed");
                // Allocated space with zero content size means a truncate was interrupted and
                // the object still needs trimming.
                if props.allocated_size > 0 && props.data_attribute_size == 0 {
                    Some(object)
                } else {
                    None
                }
            } else {
                None
            }
        }

        let shared_context_clone = shared_context.clone();
        let post_commit = move || {
            let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
            let shared_context = shared_context_clone.clone();
            async move {
                // First run fsck on the current filesystem.
                let options = FsckOptions {
                    fail_on_warning: true,
                    no_lock: true,
                    on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                    ..Default::default()
                };
                let fs = store.filesystem();

                fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");

                // Now check that we can replay this correctly.
                fs.sync(SyncOptions { flush_device: true, ..Default::default() })
                    .await
                    .expect("sync failed");
                let device = fs.device().snapshot().expect("snapshot failed");

                // object_id is set only on the second pass, after the object has been moved
                // into the graveyard (see the loop at the bottom of the test).
                let object_id = shared_context.lock().object_id.clone();

                // On the first pass, skip the initial reap so the interrupted-trim state is
                // still observable below.
                let fs2 = FxFilesystemBuilder::new()
                    .skip_initial_reap(object_id.is_none())
                    .open(device)
                    .await
                    .expect("open failed");

                // If the "foo" file exists check that allocated size matches content size.
                let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
                let store =
                    root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

                if let Some(oid) = object_id {
                    // For the second pass, the object should get tombstoned.
                    expect_tombstoned(&store, oid).await;
                } else if let Some(object) = needs_trim(&store).await {
                    // Extend the file and make sure that it is correctly trimmed.
                    object.truncate(object_size).await.expect("truncate failed");
                    let mut buf = object.allocate_buffer(block_size as usize).await;
                    object
                        .read(object_size - block_size * 2, buf.as_mut())
                        .await
                        .expect("read failed");
                    assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);

                    // Remount, this time with the graveyard performing an initial reap and the
                    // object should get trimmed.
                    let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
                        .await
                        .expect("open failed");
                    let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                    let store = root_vol
                        .volume("test", StoreOptions::default())
                        .await
                        .expect("volume failed");
                    while needs_trim(&store).await.is_some() {
                        // The object has been truncated, but still has some data allocated to
                        // it.  The graveyard should trim the object eventually.
                        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
                    }

                    // Run fsck.
                    fsck_with_options(fs.clone(), &options)
                        .await
                        .expect("fsck_with_options failed");
                    fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                        .await
                        .expect("fsck_volume_with_options failed");
                    fs.close().await.expect("close failed");
                }

                // Run fsck on fs2.
                fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");
                fs2.close().await.expect("close failed");
            }
            .boxed()
        };

        let fs = FxFilesystemBuilder::new()
            .post_commit_hook(post_commit)
            .open(device)
            .await
            .expect("open failed");

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

        shared_context.lock().store = Some(store.clone());

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");

        let object;
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        object = root_directory
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_object failed");
        transaction.commit().await.expect("commit failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        // Two passes: first with a regular object, and then with that object moved into the
        // graveyard.
        let mut pass = 0;
        loop {
            // Create enough extents in it such that when we truncate the object it will require
            // more than one transaction.
            let mut buf = object.allocate_buffer(5).await;
            buf.as_mut_slice().fill(1);
            // Write every other block.
            for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
                object
                    .txn_write(&mut transaction, offset, buf.as_ref())
                    .await
                    .expect("write failed");
            }
            transaction.commit().await.expect("commit failed");
            // This should take up more than one transaction.
            WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");

            if pass == 1 {
                break;
            }

            // Store the object ID so that we can make sure the object is always tombstoned
            // after remount (see above).
            shared_context.lock().object_id = Some(object.object_id());

            transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(store.store_object_id(), store.root_directory_object_id()),
                        LockKey::object(store.store_object_id(), object.object_id()),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");

            // Move the object into the graveyard.
            replace_child(&mut transaction, None, (&root_directory, "foo"))
                .await
                .expect("replace_child failed");
            store.add_to_graveyard(&mut transaction, object.object_id());

            pass += 1;
        }

        fs.close().await.expect("Close failed");
    }
3476
    // Checks `adjust_refs` behaviour: its boolean return indicates whether the caller
    // should tombstone the object, and disk space is only reclaimed by the actual
    // tombstone, not by the refcount merely reaching zero.
    #[fuchsia::test]
    async fn test_adjust_refs() {
        let (fs, object) = test_filesystem_and_object().await;
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Bump the refcount by one (presumably 1 -> 2).  `false` means no tombstone
        // is required yet.
        assert_eq!(
            store
                .adjust_refs(&mut transaction, object.object_id(), 1)
                .await
                .expect("adjust_refs failed"),
            false
        );
        transaction.commit().await.expect("commit failed");

        let allocator = fs.allocator();
        let allocated_before = allocator.get_allocated_bytes();
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Drop the refcount by two, taking it to zero.  `true` tells the caller the
        // object should now be tombstoned.
        assert_eq!(
            store
                .adjust_refs(&mut transaction, object.object_id(), -2)
                .await
                .expect("adjust_refs failed"),
            true
        );
        transaction.commit().await.expect("commit failed");

        // Reaching zero refs must not release any space by itself...
        assert_eq!(allocator.get_allocated_bytes(), allocated_before);

        store
            .tombstone_object(
                object.object_id(),
                Options { borrow_metadata_space: true, ..Default::default() },
            )
            .await
            .expect("purge failed");

        // ...tombstoning is what returns the object's data block to the allocator.
        assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);

        // We need to remove the directory entry, too, otherwise fsck will complain
        {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            transaction.add(
                store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, false),
                    ObjectValue::None,
                ),
            );
            transaction.commit().await.expect("commit failed");
        }

        // Final consistency check, treating fsck warnings as errors.
        fsck_with_options(
            fs.clone(),
            &FsckOptions {
                fail_on_warning: true,
                on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                ..Default::default()
            },
        )
        .await
        .expect("fsck_with_options failed");

        fs.close().await.expect("Close failed");
    }
3568
3569    #[fuchsia::test]
3570    async fn test_locks() {
3571        let (fs, object) = test_filesystem_and_object().await;
3572        let (send1, recv1) = channel();
3573        let (send2, recv2) = channel();
3574        let (send3, recv3) = channel();
3575        let done = Mutex::new(false);
3576        let mut futures = FuturesUnordered::new();
3577        futures.push(
3578            async {
3579                let mut t = object.new_transaction().await.expect("new_transaction failed");
3580                send1.send(()).unwrap(); // Tell the next future to continue.
3581                send3.send(()).unwrap(); // Tell the last future to continue.
3582                recv2.await.unwrap();
3583                let mut buf = object.allocate_buffer(5).await;
3584                buf.as_mut_slice().copy_from_slice(b"hello");
3585                object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
3586                // This is a halting problem so all we can do is sleep.
3587                fasync::Timer::new(Duration::from_millis(100)).await;
3588                assert!(!*done.lock());
3589                t.commit().await.expect("commit failed");
3590            }
3591            .boxed(),
3592        );
3593        futures.push(
3594            async {
3595                recv1.await.unwrap();
3596                // Reads should not block.
3597                let offset = TEST_DATA_OFFSET as usize;
3598                let align = offset % fs.block_size() as usize;
3599                let len = TEST_DATA.len();
3600                let mut buf = object.allocate_buffer(align + len).await;
3601                assert_eq!(
3602                    object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
3603                    align + TEST_DATA.len()
3604                );
3605                assert_eq!(&buf.as_slice()[align..], TEST_DATA);
3606                // Tell the first future to continue.
3607                send2.send(()).unwrap();
3608            }
3609            .boxed(),
3610        );
3611        futures.push(
3612            async {
3613                // This should block until the first future has completed.
3614                recv3.await.unwrap();
3615                let _t = object.new_transaction().await.expect("new_transaction failed");
3616                let mut buf = object.allocate_buffer(5).await;
3617                assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
3618                assert_eq!(buf.as_slice(), b"hello");
3619            }
3620            .boxed(),
3621        );
3622        while let Some(()) = futures.next().await {}
3623        fs.close().await.expect("Close failed");
3624    }
3625
3626    #[fuchsia::test(threads = 10)]
3627    async fn test_racy_reads() {
3628        let fs = test_filesystem().await;
3629        let object;
3630        let mut transaction = fs
3631            .clone()
3632            .new_transaction(lock_keys![], Options::default())
3633            .await
3634            .expect("new_transaction failed");
3635        let store = fs.root_store();
3636        object = Arc::new(
3637            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3638                .await
3639                .expect("create_object failed"),
3640        );
3641        transaction.commit().await.expect("commit failed");
3642        for _ in 0..100 {
3643            let cloned_object = object.clone();
3644            let writer = fasync::Task::spawn(async move {
3645                let mut buf = cloned_object.allocate_buffer(10).await;
3646                buf.as_mut_slice().fill(123);
3647                cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3648            });
3649            let cloned_object = object.clone();
3650            let reader = fasync::Task::spawn(async move {
3651                let wait_time = rand::random_range(0..5);
3652                fasync::Timer::new(Duration::from_millis(wait_time)).await;
3653                let mut buf = cloned_object.allocate_buffer(10).await;
3654                buf.as_mut_slice().fill(23);
3655                let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3656                // If we succeed in reading data, it must include the write; i.e. if we see the size
3657                // change, we should see the data too.  For this to succeed it requires locking on
3658                // the read size to ensure that when we read the size, we get the extents changed in
3659                // that same transaction.
3660                if amount != 0 {
3661                    assert_eq!(amount, 10);
3662                    assert_eq!(buf.as_slice(), &[123; 10]);
3663                }
3664            });
3665            writer.await;
3666            reader.await;
3667            object.truncate(0).await.expect("truncate failed");
3668        }
3669        fs.close().await.expect("Close failed");
3670    }
3671
3672    #[fuchsia::test]
3673    async fn test_allocated_size() {
3674        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3675
3676        let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3677        let mut buf = object.allocate_buffer(5).await;
3678        buf.as_mut_slice().copy_from_slice(b"hello");
3679        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3680        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3681        assert_eq!(after, before + fs.block_size() as u64);
3682
3683        // Do the same write again and there should be no change.
3684        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3685        assert_eq!(
3686            object.get_properties().await.expect("get_properties failed").allocated_size,
3687            after
3688        );
3689
3690        // extend...
3691        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3692        let offset = 1000 * fs.block_size() as u64;
3693        let before = after;
3694        object
3695            .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3696            .await
3697            .expect("extend failed");
3698        transaction.commit().await.expect("commit failed");
3699        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3700        assert_eq!(after, before + fs.block_size() as u64);
3701
3702        // truncate...
3703        let before = after;
3704        let size = object.get_size();
3705        object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3706        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3707        assert_eq!(after, before - fs.block_size() as u64);
3708
3709        // preallocate_range...
3710        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3711        let before = after;
3712        let mut file_range = offset..offset + fs.block_size() as u64;
3713        object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3714        transaction.commit().await.expect("commit failed");
3715        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3716        assert_eq!(after, before + fs.block_size() as u64);
3717        fs.close().await.expect("Close failed");
3718    }
3719
3720    #[fuchsia::test(threads = 10)]
3721    async fn test_zero() {
3722        let (fs, object) = test_filesystem_and_object().await;
3723        let expected_size = object.get_size();
3724        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3725        object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3726        transaction.commit().await.expect("commit failed");
3727        assert_eq!(object.get_size(), expected_size);
3728        let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3729        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3730        assert_eq!(
3731            &buf.as_slice()[0..expected_size as usize],
3732            vec![0u8; expected_size as usize].as_slice()
3733        );
3734        fs.close().await.expect("Close failed");
3735    }
3736
3737    #[fuchsia::test]
3738    async fn test_properties() {
3739        let (fs, object) = test_filesystem_and_object().await;
3740        const CRTIME: Timestamp = Timestamp::from_nanos(1234);
3741        const MTIME: Timestamp = Timestamp::from_nanos(5678);
3742        const CTIME: Timestamp = Timestamp::from_nanos(8765);
3743
3744        // ObjectProperties can be updated through `update_attributes`.
3745        // `get_properties` should reflect the latest changes.
3746        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3747        object
3748            .update_attributes(
3749                &mut transaction,
3750                Some(&fio::MutableNodeAttributes {
3751                    creation_time: Some(CRTIME.as_nanos()),
3752                    modification_time: Some(MTIME.as_nanos()),
3753                    mode: Some(111),
3754                    gid: Some(222),
3755                    ..Default::default()
3756                }),
3757                None,
3758            )
3759            .await
3760            .expect("update_attributes failed");
3761        const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
3762        object
3763            .update_attributes(
3764                &mut transaction,
3765                Some(&fio::MutableNodeAttributes {
3766                    modification_time: Some(MTIME_NEW.as_nanos()),
3767                    gid: Some(333),
3768                    rdev: Some(444),
3769                    ..Default::default()
3770                }),
3771                Some(CTIME),
3772            )
3773            .await
3774            .expect("update_timestamps failed");
3775        transaction.commit().await.expect("commit failed");
3776
3777        let properties = object.get_properties().await.expect("get_properties failed");
3778        assert_matches!(
3779            properties,
3780            ObjectProperties {
3781                refs: 1u64,
3782                allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
3783                data_attribute_size: TEST_OBJECT_SIZE,
3784                creation_time: CRTIME,
3785                modification_time: MTIME_NEW,
3786                posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
3787                change_time: CTIME,
3788                ..
3789            }
3790        );
3791        fs.close().await.expect("Close failed");
3792    }
3793
3794    #[fuchsia::test]
3795    async fn test_is_allocated() {
3796        let (fs, object) = test_filesystem_and_object().await;
3797
3798        // `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
3799        // `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
3800        let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
3801        let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
3802
3803        // Check for the case where where we have the following extent layout
3804        //       [ unallocated ][ `TEST_DATA` ]
3805        // The extents before `aligned_offset` should not be allocated
3806        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3807        assert_eq!(count, aligned_offset);
3808        assert_eq!(allocated, false);
3809
3810        let (allocated, count) =
3811            object.is_allocated(aligned_offset).await.expect("is_allocated failed");
3812        assert_eq!(count, aligned_length);
3813        assert_eq!(allocated, true);
3814
3815        // Check for the case where where we query out of range
3816        let end = aligned_offset + aligned_length;
3817        object
3818            .is_allocated(end)
3819            .await
3820            .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
3821
3822        // Check for the case where where we start querying for allocation starting from
3823        // an allocated range to the end of the device
3824        let size = 50 * fs.block_size() as u64;
3825        object.truncate(size).await.expect("extend failed");
3826
3827        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3828        assert_eq!(count, size - end);
3829        assert_eq!(allocated, false);
3830
3831        // Check for the case where where we have the following extent layout
3832        //      [ unallocated ][ `buf` ][ `buf` ]
3833        let buf_length = 5 * fs.block_size();
3834        let mut buf = object.allocate_buffer(buf_length as usize).await;
3835        buf.as_mut_slice().fill(123);
3836        let new_offset = end + 20 * fs.block_size() as u64;
3837        object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
3838        object
3839            .write_or_append(Some(new_offset + buf_length), buf.as_ref())
3840            .await
3841            .expect("write failed");
3842
3843        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3844        assert_eq!(count, new_offset - end);
3845        assert_eq!(allocated, false);
3846
3847        let (allocated, count) =
3848            object.is_allocated(new_offset).await.expect("is_allocated failed");
3849        assert_eq!(count, 2 * buf_length);
3850        assert_eq!(allocated, true);
3851
3852        // Check the case where we query from the middle of an extent
3853        let (allocated, count) = object
3854            .is_allocated(new_offset + 4 * fs.block_size())
3855            .await
3856            .expect("is_allocated failed");
3857        assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
3858        assert_eq!(allocated, true);
3859
3860        // Now, write buffer to a location already written to.
3861        // Check for the case when we the following extent layout
3862        //      [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
3863        let other_buf_length = 3 * fs.block_size();
3864        let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
3865        other_buf.as_mut_slice().fill(231);
3866        object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
3867
3868        // We still expect that `is_allocated(..)` will return that  there are 2*`buf_length bytes`
3869        // allocated from `new_offset`
3870        let (allocated, count) =
3871            object.is_allocated(new_offset).await.expect("is_allocated failed");
3872        assert_eq!(count, 2 * buf_length);
3873        assert_eq!(allocated, true);
3874
3875        // Check for the case when we the following extent layout
3876        //   [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
3877        // Mark TEST_DATA as deleted
3878        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3879        object
3880            .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
3881            .await
3882            .expect("zero failed");
3883        // Mark `other_buf` as deleted
3884        object
3885            .zero(&mut transaction, new_offset..new_offset + buf_length)
3886            .await
3887            .expect("zero failed");
3888        transaction.commit().await.expect("commit transaction failed");
3889
3890        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3891        assert_eq!(count, new_offset + buf_length);
3892        assert_eq!(allocated, false);
3893
3894        let (allocated, count) =
3895            object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
3896        assert_eq!(count, buf_length);
3897        assert_eq!(allocated, true);
3898
3899        let new_end = new_offset + buf_length + count;
3900
3901        // Check for the case where there are objects with different keys.
3902        // Case that we're checking for:
3903        //      [ unallocated ][ extent (object with different key) ][ unallocated ]
3904        let store = object.owner();
3905        let mut transaction = fs
3906            .clone()
3907            .new_transaction(lock_keys![], Options::default())
3908            .await
3909            .expect("new_transaction failed");
3910        let object2 =
3911            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3912                .await
3913                .expect("create_object failed");
3914        transaction.commit().await.expect("commit failed");
3915
3916        object2
3917            .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
3918            .await
3919            .expect("write failed");
3920
3921        // Expecting that the extent with a different key is treated like unallocated extent
3922        let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
3923        assert_eq!(count, size - new_end);
3924        assert_eq!(allocated, false);
3925
3926        fs.close().await.expect("close failed");
3927    }
3928
3929    #[fuchsia::test(threads = 10)]
3930    async fn test_read_write_attr() {
3931        let (_fs, object) = test_filesystem_and_object().await;
3932        let data = [0xffu8; 16_384];
3933        object.write_attr(20, &data).await.expect("write_attr failed");
3934        let rdata =
3935            object.read_attr(20).await.expect("read_attr failed").expect("no attribute data found");
3936        assert_eq!(&data[..], &rdata[..]);
3937
3938        assert_eq!(object.read_attr(21).await.expect("read_attr failed"), None);
3939    }
3940
3941    #[fuchsia::test(threads = 10)]
3942    async fn test_allocate_basic() {
3943        let (fs, object) = test_filesystem_and_empty_object().await;
3944        let block_size = fs.block_size();
3945        let file_size = block_size * 10;
3946        object.truncate(file_size).await.unwrap();
3947
3948        let small_buf_size = 1024;
3949        let large_buf_aligned_size = block_size as usize * 2;
3950        let large_buf_size = block_size as usize * 2 + 1024;
3951
3952        let mut small_buf = object.allocate_buffer(small_buf_size).await;
3953        let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
3954        let mut large_buf = object.allocate_buffer(large_buf_size).await;
3955
3956        assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
3957        assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
3958        assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
3959        assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
3960        assert_eq!(
3961            object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
3962            large_buf_aligned_size
3963        );
3964        assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
3965
3966        // Allocation succeeds, and without any writes to the location it shows up as zero.
3967        object.allocate(block_size..block_size * 3).await.unwrap();
3968
3969        // Test starting before, inside, and after the allocated section with every sized buffer.
3970        for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
3971            for offset in 0..4 {
3972                assert_eq!(
3973                    object.read(block_size * offset, buf.as_mut()).await.unwrap(),
3974                    buf.len(),
3975                    "buf_index: {}, read offset: {}",
3976                    buf_index,
3977                    offset,
3978                );
3979                assert_eq!(
3980                    buf.as_slice(),
3981                    &vec![0; buf.len()],
3982                    "buf_index: {}, read offset: {}",
3983                    buf_index,
3984                    offset,
3985                );
3986            }
3987        }
3988
3989        fs.close().await.expect("close failed");
3990    }
3991
3992    #[fuchsia::test(threads = 10)]
3993    async fn test_allocate_extends_file() {
3994        const BUF_SIZE: usize = 1024;
3995        let (fs, object) = test_filesystem_and_empty_object().await;
3996        let mut buf = object.allocate_buffer(BUF_SIZE).await;
3997        let block_size = fs.block_size();
3998
3999        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4000        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4001
4002        assert!(TEST_OBJECT_SIZE < block_size * 4);
4003        // Allocation succeeds, and without any writes to the location it shows up as zero.
4004        object.allocate(0..block_size * 4).await.unwrap();
4005        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4006        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4007        assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
4008        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4009        assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
4010        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4011
4012        fs.close().await.expect("close failed");
4013    }
4014
4015    #[fuchsia::test(threads = 10)]
4016    async fn test_allocate_past_end() {
4017        const BUF_SIZE: usize = 1024;
4018        let (fs, object) = test_filesystem_and_empty_object().await;
4019        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4020        let block_size = fs.block_size();
4021
4022        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4023        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4024
4025        assert!(TEST_OBJECT_SIZE < block_size * 4);
4026        // Allocation succeeds, and without any writes to the location it shows up as zero.
4027        object.allocate(block_size * 4..block_size * 6).await.unwrap();
4028        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4029        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4030        assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
4031        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4032        assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
4033        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4034
4035        fs.close().await.expect("close failed");
4036    }
4037
4038    #[fuchsia::test(threads = 10)]
4039    async fn test_allocate_read_attr() {
4040        let (fs, object) = test_filesystem_and_empty_object().await;
4041        let block_size = fs.block_size();
4042        let file_size = block_size * 4;
4043        object.truncate(file_size).await.unwrap();
4044
4045        let content = object
4046            .read_attr(object.attribute_id())
4047            .await
4048            .expect("failed to read attr")
4049            .expect("attr returned none");
4050        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4051
4052        object.allocate(block_size..block_size * 3).await.unwrap();
4053
4054        let content = object
4055            .read_attr(object.attribute_id())
4056            .await
4057            .expect("failed to read attr")
4058            .expect("attr returned none");
4059        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4060
4061        fs.close().await.expect("close failed");
4062    }
4063
4064    #[fuchsia::test(threads = 10)]
4065    async fn test_allocate_existing_data() {
4066        struct Case {
4067            written_ranges: Vec<Range<usize>>,
4068            allocate_range: Range<u64>,
4069        }
4070        let cases = [
4071            Case { written_ranges: vec![4..7], allocate_range: 4..7 },
4072            Case { written_ranges: vec![4..7], allocate_range: 3..8 },
4073            Case { written_ranges: vec![4..7], allocate_range: 5..6 },
4074            Case { written_ranges: vec![4..7], allocate_range: 5..8 },
4075            Case { written_ranges: vec![4..7], allocate_range: 3..5 },
4076            Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
4077            Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
4078        ];
4079
4080        for case in cases {
4081            let (fs, object) = test_filesystem_and_empty_object().await;
4082            let block_size = fs.block_size();
4083            let file_size = block_size * 10;
4084            object.truncate(file_size).await.unwrap();
4085
4086            for write in &case.written_ranges {
4087                let write_len = (write.end - write.start) * block_size as usize;
4088                let mut write_buf = object.allocate_buffer(write_len).await;
4089                write_buf.as_mut_slice().fill(0xff);
4090                assert_eq!(
4091                    object
4092                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
4093                        .await
4094                        .unwrap(),
4095                    file_size
4096                );
4097            }
4098
4099            let mut expected_buf = object.allocate_buffer(file_size as usize).await;
4100            assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());
4101
4102            object
4103                .allocate(
4104                    case.allocate_range.start * block_size..case.allocate_range.end * block_size,
4105                )
4106                .await
4107                .unwrap();
4108
4109            let mut read_buf = object.allocate_buffer(file_size as usize).await;
4110            assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
4111            assert_eq!(read_buf.as_slice(), expected_buf.as_slice());
4112
4113            fs.close().await.expect("close failed");
4114        }
4115    }
4116
    /// Collects the `ExtentMode` of every extent record covering `search_range` of
    /// `obj`'s attribute 0, returning `(sub_range, mode)` pairs clipped to
    /// `search_range`, in offset order.  Gaps in extent coverage are simply not
    /// reported (the cursor jumps to each extent's overlap); callers detect gaps by
    /// checking continuity of the returned ranges.
    ///
    /// Panics if the iterator yields anything other than a matching extent record
    /// for this object/attribute before `search_range` is exhausted.
    async fn get_modes(
        obj: &DataObjectHandle<ObjectStore>,
        mut search_range: Range<u64>,
    ) -> Vec<(Range<u64>, ExtentMode)> {
        let mut modes = Vec::new();
        let store = obj.store();
        let tree = store.tree();
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Seek to the extent containing (or the first one after) the range start.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                obj.object_id(),
                0,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(search_range.start)),
            )))
            .await
            .unwrap();
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                    ..
                }) if *object_id == obj.object_id() && *attribute_id == 0 => {
                    // Extent starts at or past the end of the search range: done.
                    if search_range.end <= range.start {
                        break;
                    }
                    // Clip the extent to the portion overlapping the search range.
                    let found_range = std::cmp::max(search_range.start, range.start)
                        ..std::cmp::min(search_range.end, range.end);
                    search_range.start = found_range.end;
                    modes.push((found_range, mode.clone()));
                    if search_range.start == search_range.end {
                        break;
                    }
                    iter.advance().await.unwrap();
                }
                x => panic!("looking for extent record, found this {:?}", x),
            }
        }
        modes
    }
4166
4167    async fn assert_all_overwrite(
4168        obj: &DataObjectHandle<ObjectStore>,
4169        mut search_range: Range<u64>,
4170    ) {
4171        let modes = get_modes(obj, search_range.clone()).await;
4172        for mode in modes {
4173            assert_eq!(
4174                mode.0.start, search_range.start,
4175                "missing mode in range {}..{}",
4176                search_range.start, mode.0.start
4177            );
4178            match mode.1 {
4179                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
4180                m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
4181            }
4182            assert!(
4183                mode.0.end <= search_range.end,
4184                "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
4185                search_range,
4186                mode,
4187            );
4188            search_range.start = mode.0.end;
4189        }
4190        assert_eq!(
4191            search_range.start, search_range.end,
4192            "missing mode in range {:?}",
4193            search_range
4194        );
4195    }
4196
    #[fuchsia::test(threads = 10)]
    async fn test_multi_overwrite() {
        // Table-driven test for multi_overwrite. All ranges below are in units of blocks
        // and are scaled by the filesystem block size when used.
        #[derive(Debug)]
        struct Case {
            // Regular writes (via write_or_append) performed first to seed existing data.
            pre_writes: Vec<Range<usize>>,
            // Ranges passed to allocate(); each is asserted afterwards to be covered by
            // overwrite-mode extents.
            allocate_ranges: Vec<Range<u64>>,
            // Each inner Vec is the set of ranges for one multi_overwrite call.
            overwrites: Vec<Vec<Range<u64>>>,
        }
        let cases = [
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..2, 5..6, 7..8],
                overwrites: vec![vec![5..6]],
            },
            // Repeated overwrites of the same range.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            // Multiple ranges in a single multi_overwrite call.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..4, 4..5]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5, 5..10],
                overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4, 6..10],
                overwrites: vec![vec![2..3, 7..9]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
            },
            // Cases where allocate() interacts with pre-existing (CoW) data.
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![4..6],
                overwrites: vec![vec![5..6]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![2..4],
                overwrites: vec![vec![2..4]],
            },
            Case {
                pre_writes: vec![3..5],
                allocate_ranges: vec![1..3, 6..7],
                overwrites: vec![vec![1..3, 6..7]],
            },
            Case {
                pre_writes: vec![1..3, 5..7, 8..9],
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..5], vec![0..5]],
            },
            // Overlapping allocate() calls with no subsequent writes - exercises
            // allocating over ranges that are already (partially) allocated.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10, 4..6],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![3..8, 5..10],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![5..10, 3..8],
                overwrites: Vec::new(),
            },
        ];

        for (i, case) in cases.into_iter().enumerate() {
            log::info!("running case {} - {:?}", i, case);
            // Each case gets a fresh filesystem and a fresh 10-block file.
            let (fs, object) = test_filesystem_and_empty_object().await;
            let block_size = fs.block_size();
            let file_size = block_size * 10;
            object.truncate(file_size).await.unwrap();

            // Seed the file with 0xff data via the regular write path. The file was
            // truncated to file_size already, so the size should not change.
            for write in case.pre_writes {
                let write_len = (write.end - write.start) * block_size as usize;
                let mut write_buf = object.allocate_buffer(write_len).await;
                write_buf.as_mut_slice().fill(0xff);
                assert_eq!(
                    object
                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
                        .await
                        .unwrap(),
                    file_size
                );
            }

            for allocate_range in &case.allocate_ranges {
                object
                    .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
                    .await
                    .unwrap();
            }

            // Every allocated range should now be covered by overwrite-mode extents.
            for allocate_range in case.allocate_ranges {
                assert_all_overwrite(
                    &object,
                    allocate_range.start * block_size..allocate_range.end * block_size,
                )
                .await;
            }

            for overwrite in case.overwrites {
                // Scale the block ranges to byte ranges, accumulating the total number of
                // bytes this multi_overwrite call will write.
                let mut write_len = 0;
                let overwrite = overwrite
                    .into_iter()
                    .map(|r| {
                        write_len += (r.end - r.start) * block_size;
                        r.start * block_size..r.end * block_size
                    })
                    .collect::<Vec<_>>();
                let mut write_buf = object.allocate_buffer(write_len as usize).await;
                let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
                write_buf.as_mut_slice().copy_from_slice(&data);

                // Build the expected file image: read the current contents, then patch in
                // the data we are about to write at each target range. Ranges consume the
                // write data in order.
                let mut expected_buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, expected_buf.as_mut()).await.unwrap(),
                    expected_buf.len()
                );
                let expected_buf_slice = expected_buf.as_mut_slice();
                let mut data_slice = data.as_slice();
                for r in &overwrite {
                    let len = r.length().unwrap() as usize;
                    let (copy_from, rest) = data_slice.split_at(len);
                    expected_buf_slice[r.start as usize..r.end as usize]
                        .copy_from_slice(&copy_from);
                    data_slice = rest;
                }

                let mut transaction = object.new_transaction().await.unwrap();
                object
                    .multi_overwrite(&mut transaction, 0, &overwrite, write_buf.as_mut())
                    .await
                    .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
                // Double check the emitted checksums. We should have one u64 checksum for every
                // block we wrote to disk.
                let mut checksummed_range_length = 0;
                let mut num_checksums = 0;
                for (device_range, checksums, _) in transaction.checksums() {
                    let range_len = device_range.end - device_range.start;
                    let checksums_len = checksums.len() as u64;
                    assert_eq!(range_len / checksums_len, block_size);
                    checksummed_range_length += range_len;
                    num_checksums += checksums_len;
                }
                assert_eq!(checksummed_range_length, write_len);
                assert_eq!(num_checksums, write_len / block_size);
                transaction.commit().await.unwrap();

                // Read the whole file back and compare against the expected image.
                let mut buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, buf.as_mut()).await.unwrap(),
                    buf.len(),
                    "failed length check on case {}",
                    i,
                );
                assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
            }

            // Make sure the resulting on-disk structures are consistent.
            fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
            fs.close().await.expect("close failed");
        }
    }
4433
4434    #[fuchsia::test(threads = 10)]
4435    async fn test_multi_overwrite_mode_updates() {
4436        let (fs, object) = test_filesystem_and_empty_object().await;
4437        let block_size = fs.block_size();
4438        let file_size = block_size * 10;
4439        object.truncate(file_size).await.unwrap();
4440
4441        let mut expected_bitmap = BitVec::from_elem(10, false);
4442
4443        object.allocate(0..10 * block_size).await.unwrap();
4444        assert_eq!(
4445            get_modes(&object, 0..10 * block_size).await,
4446            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4447        );
4448
4449        let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
4450        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4451        write_buf.as_mut_slice().copy_from_slice(&data);
4452        let mut transaction = object.new_transaction().await.unwrap();
4453        object
4454            .multi_overwrite(
4455                &mut transaction,
4456                0,
4457                &[2 * block_size..4 * block_size],
4458                write_buf.as_mut(),
4459            )
4460            .await
4461            .unwrap();
4462        transaction.commit().await.unwrap();
4463
4464        expected_bitmap.set(2, true);
4465        expected_bitmap.set(3, true);
4466        assert_eq!(
4467            get_modes(&object, 0..10 * block_size).await,
4468            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4469        );
4470
4471        let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
4472        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4473        write_buf.as_mut_slice().copy_from_slice(&data);
4474        let mut transaction = object.new_transaction().await.unwrap();
4475        object
4476            .multi_overwrite(
4477                &mut transaction,
4478                0,
4479                &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
4480                write_buf.as_mut(),
4481            )
4482            .await
4483            .unwrap();
4484        transaction.commit().await.unwrap();
4485
4486        expected_bitmap.set(4, true);
4487        expected_bitmap.set(6, true);
4488        assert_eq!(
4489            get_modes(&object, 0..10 * block_size).await,
4490            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4491        );
4492
4493        let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
4494        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4495        write_buf.as_mut_slice().copy_from_slice(&data);
4496        let mut transaction = object.new_transaction().await.unwrap();
4497        object
4498            .multi_overwrite(
4499                &mut transaction,
4500                0,
4501                &[
4502                    0..2 * block_size,
4503                    5 * block_size..6 * block_size,
4504                    7 * block_size..10 * block_size,
4505                ],
4506                write_buf.as_mut(),
4507            )
4508            .await
4509            .unwrap();
4510        transaction.commit().await.unwrap();
4511
4512        assert_eq!(
4513            get_modes(&object, 0..10 * block_size).await,
4514            vec![(0..10 * block_size, ExtentMode::Overwrite)]
4515        );
4516
4517        fs.close().await.expect("close failed");
4518    }
4519}