fxfs/object_store/
data_object_handle.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::object_handle::{
10    ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20    self, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation, Options,
21    Transaction, lock_keys,
22};
23use crate::object_store::{
24    DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, HandleOwner,
25    RootDigest, StoreObjectHandle, TRANSACTION_MUTATION_THRESHOLD, TrimMode, TrimResult,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{Context, Error, anyhow, bail, ensure};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{
33    FsVerityDescriptor, FsVerityHasher, FsVerityHasherOptions, MerkleTreeBuilder,
34};
35use fuchsia_sync::Mutex;
36use futures::TryStreamExt;
37use futures::stream::FuturesUnordered;
38use fxfs_trace::trace;
39use std::cmp::min;
40use std::ops::{Deref, DerefMut, Range};
41use std::sync::Arc;
42use std::sync::atomic::{self, AtomicU64, Ordering};
43use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
44
45mod allocated_ranges;
46pub use allocated_ranges::{AllocatedRanges, RangeType};
47
/// How much data each transaction will cover when writing an attribute across batches (512 KiB).
/// Pulled from `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 524_288;
51
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
pub struct DataObjectHandle<S: HandleOwner> {
    // The underlying untyped handle; also exposed directly via `Deref`.
    handle: StoreObjectHandle<S>,
    // The attribute through which this handle reads and writes file content.
    attribute_id: u64,
    // Cached content size of the data attribute.
    content_size: AtomicU64,
    // State machine for fsverity (None/Started/Pending/Some); see `FsverityState`.
    fsverity_state: Mutex<FsverityState>,
    // In-memory tracking of ranges allocated in overwrite mode; only used on the write path and
    // cleared once the file becomes a verified (read-only) file.
    overwrite_ranges: AllocatedRanges,
}
66
/// Represents the mapping of a file's contents to the physical storage backing it.
#[derive(Debug, Clone)]
pub struct FileExtent {
    // Logical offset in the file at which this extent begins.
    logical_offset: u64,
    // The backing byte range on the device; validated in `Self::new`.
    device_range: Range<u64>,
}
73
74impl FileExtent {
75    pub fn new(logical_offset: u64, device_range: Range<u64>) -> Result<Self, Error> {
76        // Ensure `device_range` is valid.
77        let length = device_range.length()?;
78        // Ensure no overflow when we calculate the end of the logical range.
79        let _ = logical_offset.checked_add(length).ok_or(FxfsError::OutOfRange)?;
80        Ok(Self { logical_offset, device_range })
81    }
82}
83
impl FileExtent {
    /// Returns the length of this extent in bytes.
    pub fn length(&self) -> u64 {
        // SAFETY: We verified that the device_range's length is valid in Self::new.
        unsafe { self.device_range.unchecked_length() }
    }

    /// Returns the logical offset in the file at which this extent starts.
    pub fn logical_offset(&self) -> u64 {
        self.logical_offset
    }

    /// Returns the logical byte range in the file that this extent covers.
    pub fn logical_range(&self) -> Range<u64> {
        // SAFETY: We verified logical_offset plus device_range length won't overflow in Self::new.
        unsafe { self.logical_offset..self.logical_offset.unchecked_add(self.length()) }
    }

    /// Returns the byte range on the device backing this extent.
    pub fn device_range(&self) -> &Range<u64> {
        &self.device_range
    }
}
103
/// Progress of enabling fsverity on a file. Normal transitions are
/// `None -> Started -> Pending -> Some`; `Some` is also set directly when opening a file that is
/// already verified.
#[derive(Debug)]
pub enum FsverityState {
    /// Not a verity file and no `enable_verity` in progress.
    None,
    /// `enable_verity` has begun but the merkle tree has not been built yet.
    Started,
    /// The merkle tree is built but the transaction marking the file verified has not committed.
    Pending(FsverityStateInner),
    /// The file is verified; reads are checked against the stored merkle tree.
    Some(FsverityStateInner),
}
111
/// The merkle tree data and hashing parameters needed to verify reads of a verified file.
#[derive(Debug)]
pub struct FsverityStateInner {
    // Root hash of the merkle tree; the variant also selects the hash algorithm.
    root_digest: RootDigest,
    // Salt passed to the hasher for every block hash.
    salt: Vec<u8>,
    // TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
    // Potentially store a pager-backed vmo instead of passing around a boxed array.
    merkle_tree: Box<[u8]>,
}
120
/// Options controlling writes to overwrite-mode extents.
#[derive(Debug, Default)]
pub struct OverwriteOptions {
    // If false, then all the extents for the overwrite range must have been preallocated using
    // preallocate_range or from existing writes.
    pub allow_allocations: bool,
    // NOTE(review): presumably issues a device barrier before the first overwrite write — the
    // consumer of this flag is not in this chunk; confirm against the write path.
    pub barrier_on_first_write: bool,
}
128
129impl FsverityStateInner {
130    pub fn new(root_digest: RootDigest, salt: Vec<u8>, merkle_tree: Box<[u8]>) -> Self {
131        FsverityStateInner { root_digest, salt, merkle_tree }
132    }
133
134    fn get_hasher_for_block_size(&self, block_size: usize) -> FsVerityHasher {
135        match self.root_digest {
136            RootDigest::Sha256(_) => {
137                FsVerityHasher::Sha256(FsVerityHasherOptions::new(self.salt.clone(), block_size))
138            }
139            RootDigest::Sha512(_) => {
140                FsVerityHasher::Sha512(FsVerityHasherOptions::new(self.salt.clone(), block_size))
141            }
142        }
143    }
144
145    fn from_bytes(data: &[u8], block_size: usize) -> Result<(Self, FsVerityHasher), Error> {
146        let descriptor = FsVerityDescriptor::from_bytes(&data, block_size)
147            .map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
148
149        let root_digest = match descriptor.digest_algorithm() {
150            fio::HashAlgorithm::Sha256 => {
151                RootDigest::Sha256(descriptor.root_digest().try_into().unwrap())
152            }
153            fio::HashAlgorithm::Sha512 => RootDigest::Sha512(descriptor.root_digest().to_vec()),
154            _ => return Err(anyhow!(FxfsError::NotSupported).context("Unsupported hash algorithm")),
155        };
156        let hasher = descriptor.hasher();
157        let leaves =
158            descriptor.leaf_digests().map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
159
160        Ok((Self::new(root_digest, descriptor.salt().to_vec(), leaves.into()), hasher))
161    }
162}
163
/// `DataObjectHandle` transparently exposes the untyped `StoreObjectHandle` API.
impl<S: HandleOwner> Deref for DataObjectHandle<S> {
    type Target = StoreObjectHandle<S>;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
170
impl<S: HandleOwner> DataObjectHandle<S> {
    /// Creates a handle for object `object_id` in `owner`'s store that reads and writes file
    /// content through attribute `attribute_id`.
    ///
    /// `size`, `fsverity_state` and `overwrite_ranges` seed the cached in-memory state, so they
    /// should reflect the object's on-disk state at the time the handle is created.
    pub fn new(
        owner: Arc<S>,
        object_id: u64,
        permanent_keys: bool,
        attribute_id: u64,
        size: u64,
        fsverity_state: FsverityState,
        options: HandleOptions,
        trace: bool,
        overwrite_ranges: &[Range<u64>],
    ) -> Self {
        Self {
            handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
            attribute_id,
            content_size: AtomicU64::new(size),
            fsverity_state: Mutex::new(fsverity_state),
            overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
        }
    }
191
    /// Returns the attribute id this handle reads and writes file content through.
    pub fn attribute_id(&self) -> u64 {
        self.attribute_id
    }

    /// Returns the in-memory tracking of this file's overwrite-mode ranges.
    pub fn overwrite_ranges(&self) -> &AllocatedRanges {
        &self.overwrite_ranges
    }

    /// Returns true if fsverity has been fully enabled for this file.
    pub fn is_verified_file(&self) -> bool {
        matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
    }
203
    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
    /// If another caller has already started but not completed `enable_verity`, returns
    /// FxfsError::Unavailable. If another caller has already completed `enable_verity`, returns
    /// FxfsError::AlreadyExists.
    pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
        let mut fsverity_guard = self.fsverity_state.lock();
        match *fsverity_guard {
            FsverityState::None => {
                *fsverity_guard = FsverityState::Started;
                Ok(())
            }
            // Another enable_verity is mid-flight (merkle tree being built or not yet committed).
            FsverityState::Started | FsverityState::Pending(_) => {
                Err(anyhow!(FxfsError::Unavailable))
            }
            FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
        }
    }
221
222    /// Sets `self.fsverity_state` to Pending. Must be called before `finalize_fsverity_state()`.
223    /// Asserts that the prior state of `self.fsverity_state` was `FsverityState::Started`.
224    pub fn set_fsverity_state_pending(&self, descriptor: FsverityStateInner) {
225        let mut fsverity_guard = self.fsverity_state.lock();
226        assert!(matches!(*fsverity_guard, FsverityState::Started));
227        *fsverity_guard = FsverityState::Pending(descriptor);
228    }
229
230    /// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
231    /// not `FsverityState::Pending(_)`.
232    pub fn finalize_fsverity_state(&self) {
233        let mut fsverity_state_guard = self.fsverity_state.lock();
234        let mut_fsverity_state = fsverity_state_guard.deref_mut();
235        let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
236        match fsverity_state {
237            FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
238            FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
239            FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
240            FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
241        }
242        // Once we finalize the fsverity state, the file is permanently read-only. The in-memory
243        // overwrite ranges tracking is only used for writing, so we don't need them anymore. This
244        // leaves any uninitialized, but allocated, overwrite regions if there are any, rather than
245        // converting them back to sparse regions.
246        self.overwrite_ranges.clear();
247    }
248
    /// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
    /// Used to set `self.fsverity_state` on open of a verified file. The merkle tree data is
    /// verified against the root digest here, and will return an error if the tree is not correct.
    pub async fn set_fsverity_state_some(&self, descriptor: FsverityMetadata) -> Result<(), Error> {
        let (metadata, hasher) = match descriptor {
            // Fxfs-native layout: root digest and salt are stored inline; the leaf hashes live in
            // their own attribute.
            FsverityMetadata::Internal(root_digest, salt) => {
                let merkle_tree = self
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await?
                    .ok_or_else(|| anyhow!(FxfsError::Inconsistent))?;
                let metadata = FsverityStateInner { root_digest, salt, merkle_tree };
                let hasher = metadata.get_hasher_for_block_size(self.block_size() as usize);
                (metadata, hasher)
            }
            // Migrated (f2fs) layout: a serialized descriptor sits at `verity_range` within the
            // merkle attribute; parse it to recover the digest, salt and leaf hashes.
            FsverityMetadata::F2fs(verity_range) => {
                let expected_length = verity_range.length()? as usize;
                // Round the buffer up to whole blocks for the read; only `expected_length` bytes
                // of it are meaningful.
                let mut buffer = self
                    .allocate_buffer(expected_length.next_multiple_of(self.block_size() as usize))
                    .await;
                ensure!(
                    expected_length
                        == self
                            .handle
                            .read(FSVERITY_MERKLE_ATTRIBUTE_ID, verity_range.start, buffer.as_mut())
                            .await?,
                    FxfsError::Inconsistent
                );
                FsverityStateInner::from_bytes(
                    buffer.as_slice()[0..expected_length].into(),
                    self.block_size() as usize,
                )?
            }
        };
        // Validate the merkle tree data against the root before applying it.
        ensure!(metadata.merkle_tree.len() % hasher.hash_size() == 0, FxfsError::Inconsistent);
        let leaf_chunks = metadata.merkle_tree.chunks_exact(hasher.hash_size());
        let mut builder = MerkleTreeBuilder::new(hasher);
        for leaf in leaf_chunks {
            builder.push_data_hash(leaf.to_vec());
        }
        let tree = builder.finish();
        let root_hash = match &metadata.root_digest {
            RootDigest::Sha256(root_hash) => root_hash.as_slice(),
            RootDigest::Sha512(root_hash) => root_hash.as_slice(),
        };

        // Recomputing the root from the stored leaves must reproduce the stored root digest.
        ensure!(root_hash == tree.root(), FxfsError::IntegrityError);

        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::None));
        *fsverity_guard = FsverityState::Some(metadata);

        Ok(())
    }
303
    /// Verifies contents of `buffer` against the corresponding hashes in the stored merkle tree.
    /// `offset` is the logical offset in the file that `buffer` starts at. `offset` must be
    /// block-aligned. Fails on non fsverity-enabled files.
    fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
        let block_size = self.block_size() as usize;
        assert!(offset % block_size == 0);
        let fsverity_state = self.fsverity_state.lock();
        match &*fsverity_state {
            FsverityState::None => {
                Err(anyhow!("Tried to verify read on a non verity-enabled file"))
            }
            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
                "Enable verity has not yet completed, fsverity state: {:?}",
                &*fsverity_state
            )),
            FsverityState::Some(metadata) => {
                let hasher = metadata.get_hasher_for_block_size(block_size);
                // The stored merkle tree holds only the leaf (per-data-block) hashes.
                let leaf_nodes: Vec<&[u8]> =
                    metadata.merkle_tree.chunks(hasher.hash_size()).collect();
                fxfs_trace::duration!(c"fsverity-verify", "len" => buffer.len());
                // TODO(b/318880297): Consider parallelizing computation.
                // NOTE(review): indexing `leaf_nodes` assumes the read never extends past the
                // hashed contents — confirm callers bound `offset`/`buffer` to the file size.
                for b in buffer.chunks(block_size) {
                    ensure!(
                        hasher.hash_block(b) == leaf_nodes[offset / block_size],
                        anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
                    );
                    offset += block_size;
                }
                Ok(())
            }
        }
    }
336
    /// Extend the file with the given extent.  The only use case for this right now is for files
    /// that must exist at certain offsets on the device, such as super-blocks.
    ///
    /// The extent is appended at the block-aligned end of the file, the content size grows by the
    /// length of `device_range`, and the device range is marked allocated — all within
    /// `transaction`.
    pub async fn extend<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        // New data starts at the current (transaction-local) size rounded up to a block boundary.
        let old_end =
            round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
        let new_size = old_end + device_range.end - device_range.start;
        // The caller supplies a specific device range rather than asking the allocator for one, so
        // record it as allocated explicitly.
        self.store().allocator().mark_allocated(
            transaction,
            self.store().store_object_id(),
            device_range.clone(),
        )?;
        self.txn_update_size(transaction, new_size, None).await?;
        let key_id = self.get_key(None).await?.0;
        transaction.add(
            self.store().store_object_id,
            Mutation::merge_object(
                ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
                ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
            ),
        );
        self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
    }
363
    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    // the data from `buf`.  Delegates to `StoreObjectHandle` for this handle's data attribute.
    async fn align_buffer(
        &self,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        self.handle.align_buffer(self.attribute_id(), offset, buf).await
    }

    // Writes potentially unaligned data at `device_offset` and returns checksums if requested. The
    // data will be encrypted if necessary.
    // `buf` is mutable as an optimization, since the write may require encryption, we can encrypt
    // the buffer in-place rather than copying to another buffer if the write is already aligned.
    // Delegates to `StoreObjectHandle` for this handle's data attribute.
    async fn write_at(
        &self,
        offset: u64,
        buf: MutableBufferRef<'_>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
    }

    /// Zeroes the given range of this handle's data attribute.  The range must be aligned.
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        range: Range<u64>,
    ) -> Result<(), Error> {
        self.handle.zero(transaction, self.attribute_id(), range).await
    }
395
    /// The cached value for `self.fsverity_state` is set either in `open_object` or on
    /// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
    /// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
    pub fn get_descriptor(&self) -> Result<Option<(fio::VerificationOptions, Vec<u8>)>, Error> {
        let fsverity_state = self.fsverity_state.lock();
        match &*fsverity_state {
            // Not a verity file: nothing to report.
            FsverityState::None => Ok(None),
            // Mid-enable: the descriptor isn't authoritative until the transaction commits.
            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
                "Enable verity has not yet completed, fsverity state: {:?}",
                &*fsverity_state
            )),
            FsverityState::Some(metadata) => {
                // The RootDigest variant determines both the reported algorithm and the root hash
                // representation ([u8; 32] vs Vec<u8>).
                let (options, root_hash) = match &metadata.root_digest {
                    RootDigest::Sha256(root_hash) => (
                        fio::VerificationOptions {
                            hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                            salt: Some(metadata.salt.clone()),
                            ..Default::default()
                        },
                        root_hash.to_vec(),
                    ),
                    RootDigest::Sha512(root_hash) => (
                        fio::VerificationOptions {
                            hash_algorithm: Some(fio::HashAlgorithm::Sha512),
                            salt: Some(metadata.salt.clone()),
                            ..Default::default()
                        },
                        root_hash.clone(),
                    ),
                };
                Ok(Some((options, root_hash)))
            }
        }
    }
430
431    /// Reads the data attribute and computes a merkle tree from the data. The values of the
432    /// parameters required to build the merkle tree are supplied by `descriptor` (i.e. salt,
433    /// hash_algorithm, etc.) Writes the leaf nodes of the merkle tree to an attribute with id
434    /// `FSVERITY_MERKLE_ATTRIBUTE_ID`. Updates the root_hash of the `descriptor` according to the
435    /// computed merkle tree and then replaces the ObjectValue of the data attribute with
436    /// ObjectValue::VerifiedAttribute, which stores the `descriptor` inline.
437    #[trace]
438    pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
439        self.set_fsverity_state_started()?;
440        // If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
441        // the graveyard should process the tombstone before we start rewriting the attribute.
442        if let Some(_) = self
443            .store()
444            .tree()
445            .find(&ObjectKey::graveyard_attribute_entry(
446                self.store().graveyard_directory_object_id(),
447                self.object_id(),
448                FSVERITY_MERKLE_ATTRIBUTE_ID,
449            ))
450            .await?
451        {
452            self.store().filesystem().graveyard().flush().await;
453        }
454        let mut transaction = self.new_transaction().await?;
455        let hash_alg =
456            options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
457        let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
458        let (root_digest, merkle_tree) = match hash_alg {
459            fio::HashAlgorithm::Sha256 => {
460                let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
461                    salt.clone(),
462                    self.block_size() as usize,
463                ));
464                let mut builder = MerkleTreeBuilder::new(hasher);
465                let mut offset = 0;
466                let size = self.get_size();
467                // TODO(b/314836822): Consider further tuning the buffer size to optimize
468                // performance. Experimentally, most verity-enabled files are <256K.
469                let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
470                while offset < size {
471                    // TODO(b/314842875): Consider optimizations for sparse files.
472                    let read = self.read(offset, buf.as_mut()).await? as u64;
473                    assert!(offset + read <= size);
474                    builder.write(&buf.as_slice()[0..read as usize]);
475                    offset += read;
476                }
477                let tree = builder.finish();
478                let merkle_leaf_nodes: Vec<u8> =
479                    tree.as_ref()[0].iter().flat_map(|x| x.clone()).collect();
480                // TODO(b/314194485): Eventually want streaming writes.
481                // The merkle tree attribute should not require trimming because it should not
482                // exist.
483                self.handle
484                    .write_new_attr_in_batches(
485                        &mut transaction,
486                        FSVERITY_MERKLE_ATTRIBUTE_ID,
487                        &merkle_leaf_nodes,
488                        WRITE_ATTR_BATCH_SIZE,
489                    )
490                    .await?;
491                let root: [u8; 32] = tree.root().try_into().unwrap();
492                (RootDigest::Sha256(root), merkle_leaf_nodes)
493            }
494            fio::HashAlgorithm::Sha512 => {
495                let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
496                    salt.clone(),
497                    self.block_size() as usize,
498                ));
499                let mut builder = MerkleTreeBuilder::new(hasher);
500                let mut offset = 0;
501                let size = self.get_size();
502                // TODO(b/314836822): Consider further tuning the buffer size to optimize
503                // performance. Experimentally, most verity-enabled files are <256K.
504                let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
505                while offset < size {
506                    // TODO(b/314842875): Consider optimizations for sparse files.
507                    let read = self.read(offset, buf.as_mut()).await? as u64;
508                    assert!(offset + read <= size);
509                    builder.write(&buf.as_slice()[0..read as usize]);
510                    offset += read;
511                }
512                let tree = builder.finish();
513                let merkle_leaf_nodes: Vec<u8> =
514                    tree.as_ref()[0].iter().flat_map(|x| x.clone()).collect();
515                // TODO(b/314194485): Eventually want streaming writes.
516                // The merkle tree attribute should not require trimming because it should not
517                // exist.
518                self.handle
519                    .write_new_attr_in_batches(
520                        &mut transaction,
521                        FSVERITY_MERKLE_ATTRIBUTE_ID,
522                        &merkle_leaf_nodes,
523                        WRITE_ATTR_BATCH_SIZE,
524                    )
525                    .await?;
526                (RootDigest::Sha512(tree.root().to_vec()), merkle_leaf_nodes)
527            }
528            _ => {
529                bail!(
530                    anyhow!(FxfsError::NotSupported)
531                        .context(format!("hash algorithm not supported"))
532                );
533            }
534        };
535        if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
536            transaction.add(
537                self.store().store_object_id,
538                Mutation::replace_or_insert_object(
539                    ObjectKey::graveyard_attribute_entry(
540                        self.store().graveyard_directory_object_id(),
541                        self.object_id(),
542                        FSVERITY_MERKLE_ATTRIBUTE_ID,
543                    ),
544                    ObjectValue::None,
545                ),
546            );
547        };
548        let descriptor = FsverityStateInner {
549            root_digest: root_digest.clone(),
550            salt: salt.clone(),
551            merkle_tree: merkle_tree.into(),
552        };
553        self.set_fsverity_state_pending(descriptor);
554        transaction.add_with_object(
555            self.store().store_object_id(),
556            Mutation::replace_or_insert_object(
557                ObjectKey::attribute(
558                    self.object_id(),
559                    DEFAULT_DATA_ATTRIBUTE_ID,
560                    AttributeKey::Attribute,
561                ),
562                ObjectValue::verified_attribute(
563                    self.get_size(),
564                    FsverityMetadata::Internal(root_digest, salt),
565                ),
566            ),
567            AssocObj::Borrowed(self),
568        );
569        transaction.commit().await?;
570        Ok(())
571    }
572
573    /// Pre-allocate disk space for the given logical file range. If any part of the allocation
574    /// range is beyond the end of the file, the file size is updated.
575    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
576        debug_assert!(range.start < range.end);
577
578        // It's not required that callers of allocate use block aligned ranges, but we need to make
579        // the extents block aligned. Luckily, fallocate in posix is allowed to allocate more than
580        // what was asked for for block alignment purposes. We just need to make sure that the size
581        // of the file is still the non-block-aligned end of the range if the size was changed.
582        let mut new_range = range.clone();
583        new_range.start = round_down(new_range.start, self.block_size());
584        // NB: FxfsError::TooBig turns into EFBIG when passed through starnix, which is the
585        // required error code when the requested range is larger than the file size.
586        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;
587
588        let mut transaction = self.new_transaction().await?;
589        let mut to_allocate = Vec::new();
590        let mut to_switch = Vec::new();
591        let key_id = self.get_key(None).await?.0;
592
593        {
594            let tree = &self.store().tree;
595            let layer_set = tree.layer_set();
596            let offset_key = ObjectKey::attribute(
597                self.object_id(),
598                self.attribute_id(),
599                AttributeKey::Extent(ExtentKey::search_key_from_offset(new_range.start)),
600            );
601            let mut merger = layer_set.merger();
602            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;
603
604            loop {
605                match iter.get() {
606                    Some(ItemRef {
607                        key:
608                            ObjectKey {
609                                object_id,
610                                data:
611                                    ObjectKeyData::Attribute(
612                                        attribute_id,
613                                        AttributeKey::Extent(extent_key),
614                                    ),
615                            },
616                        value: ObjectValue::Extent(extent_value),
617                        ..
618                    }) if *object_id == self.object_id()
619                        && *attribute_id == self.attribute_id() =>
620                    {
621                        // If the start of this extent is beyond the end of the range we are
622                        // allocating, we don't have any more work to do.
623                        if new_range.end <= extent_key.range.start {
624                            break;
625                        }
626                        // Add any prefix we might need to allocate.
627                        if new_range.start < extent_key.range.start {
628                            to_allocate.push(new_range.start..extent_key.range.start);
629                            new_range.start = extent_key.range.start;
630                        }
631                        let device_offset = match extent_value {
632                            ExtentValue::None => {
633                                // If the extent value is None, it indicates a deleted extent. In
634                                // that case, we just skip it entirely. By keeping the new_range
635                                // where it is, this section will get included in the new
636                                // allocations.
637                                iter.advance().await?;
638                                continue;
639                            }
640                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
641                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
642                                // If this extent is already in overwrite mode, we can skip it.
643                                if extent_key.range.end < new_range.end {
644                                    new_range.start = extent_key.range.end;
645                                    iter.advance().await?;
646                                    continue;
647                                } else {
648                                    new_range.start = new_range.end;
649                                    break;
650                                }
651                            }
652                            ExtentValue::Some { device_offset, .. } => *device_offset,
653                        };
654
655                        // Figure out how we have to break up the ranges.
656                        let device_offset =
657                            device_offset + (new_range.start - extent_key.range.start);
658                        if extent_key.range.end < new_range.end {
659                            to_switch.push((new_range.start..extent_key.range.end, device_offset));
660                            new_range.start = extent_key.range.end;
661                        } else {
662                            to_switch.push((new_range.start..new_range.end, device_offset));
663                            new_range.start = new_range.end;
664                            break;
665                        }
666                    }
667                    // The records are sorted so if we find something that isn't an extent or
668                    // doesn't match the object id then there are no more extent records for this
669                    // object.
670                    _ => break,
671                }
672                iter.advance().await?;
673            }
674        }
675
676        if new_range.start < new_range.end {
677            to_allocate.push(new_range.clone());
678        }
679
680        // We can update the size in the first transaction because even if subsequent transactions
681        // don't get replayed, the data between the current and new end of the file will be zero
682        // (either sparse zero or allocated zero). On the other hand, if we don't update the size
683        // in the first transaction, overwrite extents may be written past the end of the file
684        // which is an fsck error.
685        //
686        // The potential new size needs to be the non-block-aligned range end - we round up to the
687        // nearest block size for the actual allocation, but shouldn't do that for the file size.
688        let new_size = std::cmp::max(range.end, self.get_size());
689        // Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
690        // first transaction, in case we split transactions. This makes it okay to only replay the
691        // first transaction if power loss occurs - the file will be in an unusual state, but not
692        // an invalid one, if only part of the allocate goes through.
693        transaction.add_with_object(
694            self.store().store_object_id(),
695            Mutation::replace_or_insert_object(
696                ObjectKey::attribute(
697                    self.object_id(),
698                    self.attribute_id(),
699                    AttributeKey::Attribute,
700                ),
701                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
702            ),
703            AssocObj::Borrowed(self),
704        );
705
706        // The maximum number of mutations we are going to allow per transaction in allocate. This
707        // is probably quite a bit lower than the actual limit, but it should be large enough to
708        // handle most non-edge-case versions of allocate without splitting the transaction.
709        const MAX_TRANSACTION_SIZE: usize = 256;
710        for (switch_range, device_offset) in to_switch {
711            transaction.add_with_object(
712                self.store().store_object_id(),
713                Mutation::merge_object(
714                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
715                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
716                        device_offset,
717                        key_id,
718                    )),
719                ),
720                AssocObj::Borrowed(self),
721            );
722            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
723                transaction.commit_and_continue().await?;
724            }
725        }
726
727        let mut allocated = 0;
728        let allocator = self.store().allocator();
729        for mut allocate_range in to_allocate {
730            while allocate_range.start < allocate_range.end {
731                let device_range = allocator
732                    .allocate(
733                        &mut transaction,
734                        self.store().store_object_id(),
735                        allocate_range.end - allocate_range.start,
736                    )
737                    .await
738                    .context("allocation failed")?;
739                let device_range_len = device_range.end - device_range.start;
740
741                transaction.add_with_object(
742                    self.store().store_object_id(),
743                    Mutation::merge_object(
744                        ObjectKey::extent(
745                            self.object_id(),
746                            self.attribute_id(),
747                            allocate_range.start..allocate_range.start + device_range_len,
748                        ),
749                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
750                            device_range.start,
751                            (device_range_len / self.block_size()) as usize,
752                            key_id,
753                        )),
754                    ),
755                    AssocObj::Borrowed(self),
756                );
757
758                allocate_range.start += device_range_len;
759                allocated += device_range_len;
760
761                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
762                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
763                    transaction.commit_and_continue().await?;
764                    allocated = 0;
765                }
766            }
767        }
768
769        self.update_allocated_size(&mut transaction, allocated, 0).await?;
770        transaction.commit().await?;
771
772        Ok(())
773    }
774
    /// Return information on a contiguous set of extents that has the same allocation status,
    /// starting from `start_offset`. The information returned is if this set of extents are marked
    /// allocated/not allocated and also the size of this set (in bytes). This is used when
    /// querying slices for volumes.
    ///
    /// Returns `FxfsError::OutOfRange` if `start_offset` is past the end of the file, and
    /// `(false, 0)` if it is exactly at the end of the file.
    ///
    /// This function expects `start_offset` to be aligned to block size
    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
        let block_size = self.block_size();
        assert_eq!(start_offset % block_size, 0);

        if start_offset > self.get_size() {
            bail!(FxfsError::OutOfRange)
        }

        if start_offset == self.get_size() {
            return Ok((false, 0));
        }

        // Walk extent records for this attribute starting from the extent containing
        // `start_offset`.
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let offset_key = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id(),
            AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

        // `allocated` tracks the status of the run accumulated so far: None until the first
        // relevant record is classified, then Some(true) for allocated extents or Some(false)
        // for deleted extents and gaps. `end` is the (exclusive) end of the run.
        let mut allocated = None;
        let mut end = start_offset;

        loop {
            // Iterate through the extents, each time setting `end` as the end of the previous
            // extent
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) => {
                    // Equivalent of getting no extents back
                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
                        if allocated == Some(false) || allocated.is_none() {
                            end = self.get_size();
                            allocated = Some(false);
                        }
                        break;
                    }
                    ensure!(extent_key.range.is_aligned(block_size), FxfsError::Inconsistent);
                    if extent_key.range.start > end {
                        // If a previous extent has already been visited and we are tracking an
                        // allocated set, we are only interested in an extent where the range of the
                        // current extent follows immediately after the previous one.
                        if allocated == Some(true) {
                            break;
                        } else {
                            // The gap between the previous `end` and this extent is not allocated
                            end = extent_key.range.start;
                            allocated = Some(false);
                            // Continue this iteration, except now the `end` is set to the end of
                            // the "previous" extent which is this gap between the start_offset
                            // and the current extent
                        }
                    }

                    // We can assume that from here, the `end` points to the end of a previous
                    // extent.
                    match extent_value {
                        // The current extent has been allocated
                        ExtentValue::Some { .. } => {
                            // Stop searching if previous extent was marked deleted
                            if allocated == Some(false) {
                                break;
                            }
                            allocated = Some(true);
                        }
                        // This extent has been marked deleted
                        ExtentValue::None => {
                            // Stop searching if previous extent was marked allocated
                            if allocated == Some(true) {
                                break;
                            }
                            allocated = Some(false);
                        }
                    }
                    end = extent_key.range.end;
                }
                // This occurs when there are no extents left
                None => {
                    if allocated == Some(false) || allocated.is_none() {
                        end = self.get_size();
                        allocated = Some(false);
                    }
                    // Otherwise, we were monitoring extents that were allocated, so just exit.
                    break;
                }
                // Non-extent records (Object, Child, GraveyardEntry) are ignored.
                Some(_) => {}
            }
            iter.advance().await?;
        }

        // Every `break` above sets `allocated` to Some first, so this unwrap cannot panic.
        Ok((allocated.unwrap(), end - start_offset))
    }
883
884    pub async fn txn_write<'a>(
885        &'a self,
886        transaction: &mut Transaction<'a>,
887        offset: u64,
888        buf: BufferRef<'_>,
889    ) -> Result<(), Error> {
890        if buf.is_empty() {
891            return Ok(());
892        }
893        let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
894        self.multi_write(
895            transaction,
896            self.attribute_id(),
897            std::slice::from_ref(&aligned),
898            transfer_buf.as_mut(),
899        )
900        .await?;
901        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
902            self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
903        }
904        Ok(())
905    }
906
    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
    /// applied; the caller is responsible for updating size if required.
    ///
    /// Thin wrapper that delegates to the underlying handle's `multi_write`, passing `None` for
    /// the key id.
    pub async fn multi_write<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attribute_id: u64,
        ranges: &[Range<u64>],
        buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
    }
919
    // `buf` is mutable as an optimization, since the write may require encryption, we can
    // encrypt the buffer in-place rather than copying to another buffer if the write is
    // already aligned.
    //
    // Note: in the event of power failure during an overwrite() call, it is possible that
    // old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
    // Since the old data should be encrypted, it is probably safe to expose, although not ideal.
    //
    // Walks the extent records covering `offset..offset + buf.len()` and writes each piece of
    // `buf` directly to the corresponding device range. Covered extents must be in Raw mode;
    // uncovered pieces of the range get fresh raw extents allocated for them (only when
    // `options.allow_allocations` is true), and the file is grown if the write extends it.
    pub async fn overwrite(
        &self,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        options: OverwriteOptions,
    ) -> Result<(), Error> {
        // Writes must cover whole device blocks.
        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
        // Logical end of the write, used below to grow the file if this write extends it.
        let end = offset + buf.len() as u64;

        let key_id = self.get_key(None).await?.0;

        // The transaction only ends up being used if allow_allocations is true
        let mut transaction =
            if options.allow_allocations { Some(self.new_transaction().await?) } else { None };

        // We build up a list of writes to perform later
        let writes = FuturesUnordered::new();

        if options.barrier_on_first_write {
            self.store().device.barrier();
        }

        // We create a new scope here, so that the merger iterator will get dropped before we try to
        // commit our transaction. Otherwise the transaction commit would block.
        {
            let store = self.store();
            let store_object_id = store.store_object_id;
            let allocator = store.allocator();
            let tree = &store.tree;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
                )))
                .await?;
            let block_size = self.block_size();

            loop {
                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
                    // Case 1: an allocated extent for this attribute ends exactly at `offset`.
                    // It covers none of the bytes we still have to write, so skip past it.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end == offset =>
                    {
                        iter.advance().await?;
                        continue;
                    }
                    // Case 2: a record for this attribute starts at or before `offset`, i.e. it
                    // covers the next bytes to be written. Only Raw-mode extents are accepted.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start <= offset =>
                    {
                        match value {
                            ObjectValue::Extent(ExtentValue::Some {
                                device_offset,
                                mode: ExtentMode::Raw,
                                ..
                            }) => {
                                ensure!(
                                    range.is_aligned(block_size) && device_offset % block_size == 0,
                                    FxfsError::Inconsistent
                                );
                                let offset_within_extent = offset - range.start;
                                let remaining_length_of_extent = (range
                                    .end
                                    .checked_sub(offset)
                                    .ok_or(FxfsError::Inconsistent)?)
                                    as usize;
                                // Yields (device_offset, bytes_to_write, should_advance)
                                (
                                    device_offset + offset_within_extent,
                                    min(buf.len(), remaining_length_of_extent),
                                    true,
                                )
                            }
                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
                                // TODO(https://fxbug.dev/42066056): Maybe we should create
                                // a new extent without checksums?
                                bail!(
                                    "extent from ({},{}) which overlaps offset \
                                        {} has the wrong extent mode",
                                    range.start,
                                    range.end,
                                    offset
                                )
                            }
                            _ => {
                                bail!(
                                    "overwrite failed: extent overlapping offset {} has \
                                      unexpected ObjectValue",
                                    offset
                                )
                            }
                        }
                    }
                    // Case 3: nothing covers `offset`. Allocate a fresh raw extent if
                    // allocations are permitted, otherwise fail.
                    maybe_item_ref => {
                        if let Some(transaction) = transaction.as_mut() {
                            assert_eq!(options.allow_allocations, true);
                            assert_eq!(offset % self.block_size(), 0);

                            // We are going to make a new extent, but let's check if there is an
                            // extent after us. If there is an extent after us, then we don't want
                            // our new extent to bump into it...
                            let mut bytes_to_allocate =
                                round_up(buf.len() as u64, self.block_size())
                                    .ok_or(FxfsError::TooBig)?;
                            if let Some(ItemRef {
                                key:
                                    ObjectKey {
                                        object_id,
                                        data:
                                            ObjectKeyData::Attribute(
                                                attribute_id,
                                                AttributeKey::Extent(ExtentKey { range }),
                                            ),
                                    },
                                ..
                            }) = maybe_item_ref
                            {
                                if *object_id == self.object_id()
                                    && *attribute_id == self.attribute_id()
                                    && offset < range.start
                                {
                                    let bytes_until_next_extent = range.start - offset;
                                    bytes_to_allocate =
                                        min(bytes_to_allocate, bytes_until_next_extent);
                                }
                            }

                            let device_range = allocator
                                .allocate(transaction, store_object_id, bytes_to_allocate)
                                .await?;
                            let device_range_len = device_range.end - device_range.start;
                            transaction.add(
                                store_object_id,
                                Mutation::insert_object(
                                    ObjectKey::extent(
                                        self.object_id(),
                                        self.attribute_id(),
                                        offset..offset + device_range_len,
                                    ),
                                    ObjectValue::Extent(ExtentValue::new_raw(
                                        device_range.start,
                                        key_id,
                                    )),
                                ),
                            );

                            self.update_allocated_size(transaction, device_range_len, 0).await?;

                            // Yields (device_offset, bytes_to_write, should_advance)
                            (device_range.start, min(buf.len(), device_range_len as usize), false)
                        } else {
                            bail!(
                                "no extent overlapping offset {}, \
                                and new allocations are not allowed",
                                offset
                            )
                        }
                    }
                };
                // Queue the device write for the piece of `buf` covered by this extent, then
                // carry on with whatever remains.
                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
                writes.push(self.write_at(offset, current_buf, device_offset));
                if remaining_buf.len() == 0 {
                    break;
                } else {
                    buf = remaining_buf;
                    offset += bytes_to_write as u64;
                    if should_advance {
                        iter.advance().await?;
                    }
                }
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // The checksums are being ignored here, but we don't need to know them
        writes.try_collect::<Vec<MaybeChecksums>>().await?;

        // If allocations were permitted and anything was staged, grow the file as needed and
        // commit the new extents in one transaction.
        if let Some(mut transaction) = transaction {
            assert_eq!(options.allow_allocations, true);
            if !transaction.is_empty() {
                if end > self.get_size() {
                    self.grow(&mut transaction, self.get_size(), end).await?;
                }
                transaction.commit().await?;
            }
        }

        Ok(())
    }
1143
1144    // Within a transaction, the size of the object might have changed, so get the size from there
1145    // if it exists, otherwise, fall back on the cached size.
1146    fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1147        transaction
1148            .get_object_mutation(
1149                self.store().store_object_id,
1150                ObjectKey::attribute(
1151                    self.object_id(),
1152                    self.attribute_id(),
1153                    AttributeKey::Attribute,
1154                ),
1155            )
1156            .and_then(|m| {
1157                if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1158                    Some(size)
1159                } else {
1160                    None
1161                }
1162            })
1163            .unwrap_or_else(|| self.get_size())
1164    }
1165
1166    pub async fn txn_update_size<'a>(
1167        &'a self,
1168        transaction: &mut Transaction<'a>,
1169        new_size: u64,
1170        // Allow callers to update the has_overwrite_extents metadata if they want. If this is
1171        // Some it is set to the value, if None it is left unchanged.
1172        update_has_overwrite_extents: Option<bool>,
1173    ) -> Result<(), Error> {
1174        let key =
1175            ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1176        let mut mutation = if let Some(mutation) =
1177            transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1178        {
1179            mutation.clone()
1180        } else {
1181            ObjectStoreMutation {
1182                item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1183                op: Operation::ReplaceOrInsert,
1184            }
1185        };
1186        if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1187            *size = new_size;
1188            if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1189                *has_overwrite_extents = update_has_overwrite_extents;
1190            }
1191        } else {
1192            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1193        }
1194        transaction.add_with_object(
1195            self.store().store_object_id(),
1196            Mutation::ObjectStore(mutation),
1197            AssocObj::Borrowed(self),
1198        );
1199        Ok(())
1200    }
1201
    /// Records a change to this object's allocated byte count in `transaction`: `allocated`
    /// bytes newly allocated and `deallocated` bytes released. Thin wrapper that delegates to
    /// the underlying handle.
    async fn update_allocated_size(
        &self,
        transaction: &mut Transaction<'_>,
        allocated: u64,
        deallocated: u64,
    ) -> Result<(), Error> {
        self.handle.update_allocated_size(transaction, allocated, deallocated).await
    }
1210
1211    pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1212        if self
1213            .overwrite_ranges
1214            .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1215        {
1216            // This returns true if there were ranges, but this truncate removed them all, which
1217            // indicates that we need to flip the has_overwrite_extents metadata flag to false.
1218            Ok(Some(false))
1219        } else {
1220            Ok(None)
1221        }
1222    }
1223
1224    pub async fn shrink<'a>(
1225        &'a self,
1226        transaction: &mut Transaction<'a>,
1227        size: u64,
1228        update_has_overwrite_extents: Option<bool>,
1229    ) -> Result<NeedsTrim, Error> {
1230        let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1231        self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1232        Ok(needs_trim)
1233    }
1234
    /// Grows this attribute from `old_size` to `size` within `transaction`.
    ///
    /// Any outstanding trim from a previous shrink past `old_size` is completed first, which may
    /// commit and continue `transaction` several times. If `old_size` is not block aligned, the
    /// tail of the old last block is zeroed so the newly exposed bytes read back as zero.
    pub async fn grow<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        old_size: u64,
        size: u64,
    ) -> Result<(), Error> {
        // Before growing the file, we must make sure that a previous trim has completed.
        let store = self.store();
        while matches!(
            store
                .trim_some(
                    transaction,
                    self.object_id(),
                    self.attribute_id(),
                    TrimMode::FromOffset(old_size)
                )
                .await?,
            TrimResult::Incomplete
        ) {
            transaction.commit_and_continue().await?;
        }
        // We might need to zero out the tail of the old last block.
        let block_size = self.block_size();
        if old_size % block_size != 0 {
            // Look up the extent (if any) containing the old last block.
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let aligned_old_size = round_down(old_size, block_size);
            let iter = merger
                .query(Query::FullRange(&ObjectKey::extent(
                    self.object_id(),
                    self.attribute_id(),
                    aligned_old_size..aligned_old_size + 1,
                )))
                .await?;
            if let Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                    },
                value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
                ..
            }) = iter.get()
            {
                if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
                    let device_offset = device_offset
                        .checked_add(aligned_old_size - extent_key.range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
                    let mut buf = self.allocate_buffer(block_size as usize).await;
                    // In the case that this extent is in OverwritePartial mode, there is a
                    // possibility that the last block is allocated, but not initialized yet, in
                    // which case we don't actually need to bother zeroing out the tail. However,
                    // it's not strictly incorrect to change uninitialized data, so we skip the
                    // check and blindly do it to keep it simpler here.
                    self.read_and_decrypt(device_offset, aligned_old_size, buf.as_mut(), *key_id)
                        .await?;
                    // Zero everything past the old (unaligned) end of file, then write the
                    // block back.
                    buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
                    self.multi_write(
                        transaction,
                        *attribute_id,
                        &[aligned_old_size..aligned_old_size + block_size],
                        buf.as_mut(),
                    )
                    .await?;
                }
            }
        }
        self.txn_update_size(transaction, size, None).await?;
        Ok(())
    }
1307
    /// Attempts to pre-allocate a `file_range` of bytes for this object.
    /// Returns a set of device ranges (i.e. potentially multiple extents).
    ///
    /// It may not be possible to preallocate the entire requested range in one request
    /// due to limitations on transaction size. In such cases, we will preallocate as much as
    /// we can up to some (arbitrary, internal) limit on transaction size.
    ///
    /// `file_range.start` is modified to point at the end of the logical range
    /// that was preallocated such that repeated calls to `preallocate_range` with new
    /// transactions can be used to preallocate ranges of any size.
    ///
    /// Requested range must be a multiple of block size.
    pub async fn preallocate_range<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        file_range: &mut Range<u64>,
    ) -> Result<Vec<Range<u64>>, Error> {
        let block_size = self.block_size();
        assert!(file_range.is_aligned(block_size));
        // Preallocation records raw extents (see `ExtentValue::new_raw` below), which is not
        // supported for encrypted files.
        assert!(!self.handle.is_encrypted());
        let mut ranges = Vec::new();
        // Walk the existing extent records starting at `file_range.start` so that
        // already-allocated regions are reused and only the gaps get new allocations.
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
            )))
            .await?;
        // Total bytes newly allocated, for the allocated-size accounting at the end.
        let mut allocated = 0;
        let key_id = self.get_key(None).await?.0;
        'outer: while file_range.start < file_range.end {
            // Scan records until we either consume an overlapping allocated extent (reusing it)
            // or establish the end of the unallocated gap starting at `file_range.start`.
            let allocate_end = loop {
                match iter.get() {
                    // Case for allocated extents for the same object that overlap with file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start < file_range.end =>
                    {
                        // Validate the on-disk record before doing arithmetic with its values.
                        ensure!(
                            range.is_valid()
                                && range.is_aligned(block_size)
                                && device_offset % block_size == 0,
                            FxfsError::Inconsistent
                        );
                        // If the start of the requested file_range overlaps with an existing extent...
                        if range.start <= file_range.start {
                            // Record the existing extent and move on.
                            let device_range = device_offset
                                .checked_add(file_range.start - range.start)
                                .ok_or(FxfsError::Inconsistent)?
                                ..device_offset
                                    .checked_add(min(range.end, file_range.end) - range.start)
                                    .ok_or(FxfsError::Inconsistent)?;
                            file_range.start += device_range.end - device_range.start;
                            ranges.push(device_range);
                            if file_range.start >= file_range.end {
                                break 'outer;
                            }
                            iter.advance().await?;
                            continue;
                        } else {
                            // There's nothing allocated between file_range.start and the beginning
                            // of this extent.
                            break range.start;
                        }
                    }
                    // Case for deleted extents eclipsed by file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::None),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end < file_range.end =>
                    {
                        iter.advance().await?;
                    }
                    _ => {
                        // We can just preallocate the rest.
                        break file_range.end;
                    }
                }
            };
            // Allocate the gap [file_range.start, allocate_end). The allocator may return fewer
            // bytes than requested, in which case the outer loop tries again.
            let device_range = self
                .store()
                .allocator()
                .allocate(
                    transaction,
                    self.store().store_object_id(),
                    allocate_end - file_range.start,
                )
                .await
                .context("Allocation failed")?;
            allocated += device_range.end - device_range.start;
            let this_file_range =
                file_range.start..file_range.start + device_range.end - device_range.start;
            file_range.start = this_file_range.end;
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
                ),
            );
            ranges.push(device_range);
            // If we didn't allocate all that we requested, we'll loop around and try again.
            // ... unless we have filled the transaction. The caller should check file_range.
            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
                break;
            }
        }
        // Update the file size if it changed.
        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
            self.txn_update_size(transaction, file_range.start, None).await?;
        }
        self.update_allocated_size(transaction, allocated, 0).await?;
        Ok(ranges)
    }
1449
1450    pub async fn update_attributes<'a>(
1451        &self,
1452        transaction: &mut Transaction<'a>,
1453        node_attributes: Option<&fio::MutableNodeAttributes>,
1454        change_time: Option<Timestamp>,
1455    ) -> Result<(), Error> {
1456        // This codepath is only called by files, whose wrapping key id users cannot directly set
1457        // as per fscrypt.
1458        ensure!(
1459            !matches!(
1460                node_attributes,
1461                Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1462            ),
1463            FxfsError::BadPath
1464        );
1465        self.handle.update_attributes(transaction, node_attributes, change_time).await
1466    }
1467
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        // Pure delegation to the wrapped StoreObjectHandle.
        self.handle.default_transaction_options()
    }
1473
1474    pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
1475        self.new_transaction_with_options(self.default_transaction_options()).await
1476    }
1477
    /// Creates a new transaction with the given `options`, scoped to this handle's data
    /// attribute.
    pub async fn new_transaction_with_options<'b>(
        &self,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        self.handle.new_transaction_with_options(self.attribute_id(), options).await
    }
1484
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    pub async fn flush_device(&self) -> Result<(), Error> {
        // Pure delegation to the wrapped StoreObjectHandle.
        self.handle.flush_device().await
    }
1489
    /// Reads an entire attribute.
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        // Pure delegation to the wrapped StoreObjectHandle.
        self.handle.read_attr(attribute_id).await
    }
1494
1495    /// Writes an entire attribute.  This *always* uses the volume data key.
1496    pub async fn write_attr(&self, attribute_id: u64, data: &[u8]) -> Result<(), Error> {
1497        // Must be different attribute otherwise cached size gets out of date.
1498        assert_ne!(attribute_id, self.attribute_id());
1499        let store = self.store();
1500        let mut transaction = self.new_transaction().await?;
1501        if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
1502            transaction.commit_and_continue().await?;
1503            while matches!(
1504                store
1505                    .trim_some(
1506                        &mut transaction,
1507                        self.object_id(),
1508                        attribute_id,
1509                        TrimMode::FromOffset(data.len() as u64),
1510                    )
1511                    .await?,
1512                TrimResult::Incomplete
1513            ) {
1514                transaction.commit_and_continue().await?;
1515            }
1516        }
1517        transaction.commit().await?;
1518        Ok(())
1519    }
1520
    /// Reads raw data from the device at `device_offset` into `buffer` and decrypts it using
    /// the key identified by `key_id`. `file_offset` is the corresponding logical offset in the
    /// file (presumably used to derive the cipher tweak — see
    /// `StoreObjectHandle::read_and_decrypt` for the authoritative contract).
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id).await
    }
1530
1531    /// Truncates a file to a given size (growing/shrinking as required).
1532    ///
1533    /// Nb: Most code will want to call truncate() instead. This method is used
1534    /// to update the super block -- a case where we must borrow metadata space.
1535    pub async fn truncate_with_options(
1536        &self,
1537        options: Options<'_>,
1538        size: u64,
1539    ) -> Result<(), Error> {
1540        let mut transaction = self.new_transaction_with_options(options).await?;
1541        let old_size = self.get_size();
1542        if size == old_size {
1543            return Ok(());
1544        }
1545        if size < old_size {
1546            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
1547            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
1548                // The file needs to be trimmed.
1549                transaction.commit_and_continue().await?;
1550                let store = self.store();
1551                while matches!(
1552                    store
1553                        .trim_some(
1554                            &mut transaction,
1555                            self.object_id(),
1556                            self.attribute_id(),
1557                            TrimMode::FromOffset(size)
1558                        )
1559                        .await?,
1560                    TrimResult::Incomplete
1561                ) {
1562                    if let Err(error) = transaction.commit_and_continue().await {
1563                        warn!(error:?; "Failed to trim after truncate");
1564                        return Ok(());
1565                    }
1566                }
1567                if let Err(error) = transaction.commit().await {
1568                    warn!(error:?; "Failed to trim after truncate");
1569                }
1570                return Ok(());
1571            }
1572        } else {
1573            self.grow(&mut transaction, old_size, size).await?;
1574        }
1575        transaction.commit().await?;
1576        Ok(())
1577    }
1578
1579    pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
1580        // We don't take a read guard here since the object properties are contained in a single
1581        // object, which cannot be inconsistent with itself. The LSM tree does not return
1582        // intermediate states for a single object.
1583        let item = self
1584            .store()
1585            .tree
1586            .find(&ObjectKey::object(self.object_id()))
1587            .await?
1588            .expect("Unable to find object record");
1589        match item.value {
1590            ObjectValue::Object {
1591                kind: ObjectKind::File { refs, .. },
1592                attributes:
1593                    ObjectAttributes {
1594                        creation_time,
1595                        modification_time,
1596                        posix_attributes,
1597                        allocated_size,
1598                        access_time,
1599                        change_time,
1600                        ..
1601                    },
1602            } => Ok(ObjectProperties {
1603                refs,
1604                allocated_size,
1605                data_attribute_size: self.get_size(),
1606                creation_time,
1607                modification_time,
1608                access_time,
1609                change_time,
1610                sub_dirs: 0,
1611                posix_attributes,
1612                casefold: false,
1613                wrapping_key_id: None,
1614            }),
1615            _ => bail!(FxfsError::NotFile),
1616        }
1617    }
1618
1619    // Returns the contents of this object. This object must be < |limit| bytes in size.
1620    pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1621        let size = self.get_size();
1622        if size > limit as u64 {
1623            bail!("Object too big ({} > {})", size, limit);
1624        }
1625        let mut buf = self.allocate_buffer(size as usize).await;
1626        self.read(0u64, buf.as_mut()).await?;
1627        Ok(buf.as_slice().into())
1628    }
1629
    /// Returns the set of file_offset->extent mappings for this file. The extents will be sorted by
    /// their logical offset within the file.
    ///
    /// *NOTE*: This operation is potentially expensive and should generally be avoided.
    pub async fn device_extents(&self) -> Result<Vec<FileExtent>, Error> {
        let mut extents = Vec::new();
        // Scan all extent records for this object's data attribute starting at logical offset 0.
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
            )))
            .await?;
        loop {
            match iter.get() {
                // Only allocated extents (`ExtentValue::Some`) belonging to this object and
                // attribute are reported; any other record terminates the scan.
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
                    let logical_offset = range.start;
                    let device_range = *device_offset..*device_offset + range.length()?;
                    extents.push(FileExtent::new(logical_offset, device_range)?);
                }
                _ => break,
            }
            iter.advance().await?;
        }
        Ok(extents)
    }
1671}
1672
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
    /// Called when mutations associated with this handle are applied; keeps the handle's cached
    /// state (content size, fsverity state, overwrite ranges) in sync with the mutation stream.
    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
        match mutation {
            // The data attribute's size record changed: refresh the cached content size.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
                ..
            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
            // The attribute became a verified (fsverity) attribute: the recorded size must match
            // the cached size, and the in-memory fsverity state is finalized.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
                ..
            }) => {
                debug_assert_eq!(
                    self.get_size(),
                    *size,
                    "size should be set when verity is enabled and must not change"
                );
                self.finalize_fsverity_state()
            }
            // An extent for this object's attribute was written in an overwrite mode: record the
            // range in `overwrite_ranges`.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attr_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                        ..
                    },
                ..
            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
                    self.overwrite_ranges.apply_range(range.clone())
                }
                ExtentMode::Raw | ExtentMode::Cow(_) => (),
            },
            _ => {}
        }
    }
}
1717
// All `ObjectHandle` operations delegate directly to the wrapped `StoreObjectHandle`.
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1735
1736#[async_trait]
1737impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
1738    async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
1739        let fs = self.store().filesystem();
1740        let guard = fs
1741            .lock_manager()
1742            .read_lock(lock_keys![LockKey::object_attribute(
1743                self.store().store_object_id,
1744                self.object_id(),
1745                self.attribute_id(),
1746            )])
1747            .await;
1748
1749        let size = self.get_size();
1750        if offset >= size {
1751            return Ok(0);
1752        }
1753        let length = min(buf.len() as u64, size - offset) as usize;
1754        buf = buf.subslice_mut(0..length);
1755        self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
1756        if self.is_verified_file() {
1757            self.verify_data(offset as usize, buf.as_slice())?;
1758        }
1759        Ok(length)
1760    }
1761
1762    fn get_size(&self) -> u64 {
1763        self.content_size.load(atomic::Ordering::Relaxed)
1764    }
1765}
1766
1767impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
1768    async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
1769        let offset = offset.unwrap_or_else(|| self.get_size());
1770        let mut transaction = self.new_transaction().await?;
1771        self.txn_write(&mut transaction, offset, buf).await?;
1772        let new_size = self.txn_get_size(&transaction);
1773        transaction.commit().await?;
1774        Ok(new_size)
1775    }
1776
1777    async fn truncate(&self, size: u64) -> Result<(), Error> {
1778        self.truncate_with_options(self.default_transaction_options(), size).await
1779    }
1780
1781    async fn flush(&self) -> Result<(), Error> {
1782        Ok(())
1783    }
1784}
1785
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
pub struct DirectWriter<'a, S: HandleOwner> {
    // Destination handle for all writes.
    handle: &'a DataObjectHandle<S>,
    // Transaction options used for each flush.
    options: transaction::Options<'a>,
    // Staging buffer; bytes accumulate here until flushed.
    buffer: Buffer<'a>,
    // File offset at which the next flush will write.
    offset: u64,
    // Number of valid bytes currently staged in `buffer`.
    buf_offset: usize,
}
1795
/// Size (1 MiB) of DirectWriter's staging buffer; each flush writes at most this many bytes in a
/// single transaction.
const BUFFER_SIZE: usize = 1_048_576;
1797
1798impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
1799    fn drop(&mut self) {
1800        if self.buf_offset != 0 {
1801            warn!("DirectWriter: dropping data, did you forget to call complete?");
1802        }
1803    }
1804}
1805
1806impl<'a, S: HandleOwner> DirectWriter<'a, S> {
1807    pub async fn new(
1808        handle: &'a DataObjectHandle<S>,
1809        options: transaction::Options<'a>,
1810    ) -> DirectWriter<'a, S> {
1811        Self {
1812            handle,
1813            options,
1814            buffer: handle.allocate_buffer(BUFFER_SIZE).await,
1815            offset: 0,
1816            buf_offset: 0,
1817        }
1818    }
1819
1820    async fn flush(&mut self) -> Result<(), Error> {
1821        let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
1822        self.handle
1823            .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
1824            .await?;
1825        transaction.commit().await?;
1826        self.offset += self.buf_offset as u64;
1827        self.buf_offset = 0;
1828        Ok(())
1829    }
1830}
1831
1832impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1833    fn block_size(&self) -> u64 {
1834        self.handle.block_size()
1835    }
1836
1837    async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1838        while buf.len() > 0 {
1839            let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1840            self.buffer
1841                .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1842                .as_mut_slice()
1843                .copy_from_slice(&buf[..to_do]);
1844            self.buf_offset += to_do;
1845            if self.buf_offset == BUFFER_SIZE {
1846                self.flush().await?;
1847            }
1848            buf = &buf[to_do..];
1849        }
1850        Ok(())
1851    }
1852
1853    async fn complete(&mut self) -> Result<(), Error> {
1854        self.flush().await?;
1855        Ok(())
1856    }
1857
1858    async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1859        if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1860            self.buffer
1861                .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1862                .as_mut_slice()
1863                .fill(0);
1864            self.buf_offset += amount as usize;
1865        } else {
1866            self.flush().await?;
1867            self.offset += amount;
1868        }
1869        Ok(())
1870    }
1871}
1872
1873#[cfg(test)]
1874mod tests {
1875    use crate::errors::FxfsError;
1876    use crate::filesystem::{
1877        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1878    };
1879    use crate::fsck::{
1880        FsckOptions, fsck, fsck_volume, fsck_volume_with_options, fsck_with_options,
1881    };
1882    use crate::lsm_tree::Query;
1883    use crate::lsm_tree::types::{ItemRef, LayerIterator};
1884    use crate::object_handle::{
1885        ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1886    };
1887    use crate::object_store::data_object_handle::{OverwriteOptions, WRITE_ATTR_BATCH_SIZE};
1888    use crate::object_store::directory::replace_child;
1889    use crate::object_store::object_record::{FsverityMetadata, ObjectKey, ObjectValue, Timestamp};
1890    use crate::object_store::transaction::{Mutation, Options, lock_keys};
1891    use crate::object_store::volume::root_volume;
1892    use crate::object_store::{
1893        AttributeKey, DEFAULT_DATA_ATTRIBUTE_ID, DataObjectHandle, Directory, ExtentKey,
1894        ExtentMode, ExtentValue, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, LockKey,
1895        NewChildStoreOptions, ObjectKeyData, ObjectStore, PosixAttributes, StoreOptions,
1896        TRANSACTION_MUTATION_THRESHOLD,
1897    };
1898    use crate::range::RangeExt;
1899    use crate::round::{round_down, round_up};
1900    use assert_matches::assert_matches;
1901    use bit_vec::BitVec;
1902    use fsverity_merkle::FsVerityDescriptorRaw;
1903    use fuchsia_sync::Mutex;
1904    use futures::FutureExt;
1905    use futures::channel::oneshot::channel;
1906    use futures::stream::{FuturesUnordered, StreamExt};
1907    use fxfs_crypto::{Crypt, EncryptionKey, KeyPurpose};
1908    use fxfs_insecure_crypto::InsecureCrypt;
1909    use mundane::hash::{Digest, Hasher, Sha256};
1910    use std::ops::Range;
1911    use std::sync::Arc;
1912    use std::time::Duration;
1913    use storage_device::DeviceHolder;
1914    use storage_device::fake_device::FakeDevice;
1915    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
1916
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Some tests (the preallocate_range ones) currently assume that the data only occupies a single
    // device block.
    // Note that TEST_DATA_OFFSET is deliberately not aligned to TEST_DEVICE_BLOCK_SIZE.
    const TEST_DATA_OFFSET: u64 = 5000;
    const TEST_DATA: &[u8] = b"hello";
    // The test object is truncated to this size after creation (see
    // test_filesystem_and_object_with_key).
    const TEST_OBJECT_SIZE: u64 = 5678;
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    const TEST_OBJECT_NAME: &str = "foo";
1926
1927    async fn test_filesystem() -> OpenFxFilesystem {
1928        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1929        FxFilesystem::new_empty(device).await.expect("new_empty failed")
1930    }
1931
    /// Creates a test filesystem with a single object (encrypted when `crypt` is provided)
    /// linked into the root directory as `TEST_OBJECT_NAME`.  When `write_object_test_data` is
    /// set, `TEST_DATA` is written at `TEST_DATA_OFFSET`.  The object is then truncated to
    /// `TEST_OBJECT_SIZE`.
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let store = fs.root_store();
        let object;

        // Lock the root directory so the object can be created and linked in one transaction.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        object = if let Some(crypt) = crypt {
            // Encrypted case: mint a data key for the new object id via `crypt`.
            let object_id = store.get_next_object_id(transaction.txn_guard()).await.unwrap();
            let (key, unwrapped_key) = crypt.create_key(object_id, KeyPurpose::Data).await.unwrap();
            ObjectStore::create_object_with_key(
                &store,
                &mut transaction,
                object_id,
                HandleOptions::default(),
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await
            .expect("create_object failed")
        } else {
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed")
        };

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        if write_object_test_data {
            // TEST_DATA_OFFSET is not block-aligned; pad the buffer so the data's position within
            // the buffer matches its alignment within the device block, then write just the tail.
            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
            object
                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
                .await
                .expect("write failed");
        }
        transaction.commit().await.expect("commit failed");
        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
        (fs, object)
    }
1991
    /// Filesystem plus an encrypted object that already contains `TEST_DATA` at
    /// `TEST_DATA_OFFSET`.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&InsecureCrypt::new()), true).await
    }
1995
    /// Filesystem plus an encrypted object with no data written (still truncated to
    /// `TEST_OBJECT_SIZE`).
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&InsecureCrypt::new()), false).await
    }
2000
2001    #[fuchsia::test]
2002    async fn test_zero_buf_len_read() {
2003        let (fs, object) = test_filesystem_and_object().await;
2004        let mut buf = object.allocate_buffer(0).await;
2005        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
2006        fs.close().await.expect("Close failed");
2007    }
2008
2009    #[fuchsia::test]
2010    async fn test_beyond_eof_read() {
2011        let (fs, object) = test_filesystem_and_object().await;
2012        let offset = TEST_OBJECT_SIZE as usize - 2;
2013        let align = offset % fs.block_size() as usize;
2014        let len: usize = 2;
2015        let mut buf = object.allocate_buffer(align + len + 1).await;
2016        buf.as_mut_slice().fill(123u8);
2017        assert_eq!(
2018            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
2019            align + len
2020        );
2021        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
2022        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
2023        fs.close().await.expect("Close failed");
2024    }
2025
    #[fuchsia::test]
    async fn test_beyond_eof_read_from() {
        let (fs, object) = test_filesystem_and_object().await;
        // `&*object` derefs to the inner handle, whose `read` takes an extra leading argument
        // (0 here — presumably the attribute id; confirm against StoreObjectHandle).
        let handle = &*object;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        // Only the bytes up to EOF are returned.
        assert_eq!(
            handle.read(0, (offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        // In-file bytes read back as zero (the region is sparse)...
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        // ...and bytes beyond EOF are left untouched.
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
2043
    #[fuchsia::test]
    async fn test_beyond_eof_read_unchecked() {
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        // read_unchecked requires the caller to hold the attribute read lock.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                object.store().store_object_id,
                object.object_id(),
                0,
            )])
            .await;
        object
            .read_unchecked(0, (offset - align) as u64, buf.as_mut(), &guard)
            .await
            .expect("read failed");
        // Unlike `read`, read_unchecked does not clamp at EOF: the whole requested range,
        // including the byte past EOF, is zero-filled.
        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
        fs.close().await.expect("Close failed");
    }
2067
2068    #[fuchsia::test]
2069    async fn test_read_sparse() {
2070        let (fs, object) = test_filesystem_and_object().await;
2071        // Deliberately read not right to eof.
2072        let len = TEST_OBJECT_SIZE as usize - 1;
2073        let mut buf = object.allocate_buffer(len).await;
2074        buf.as_mut_slice().fill(123u8);
2075        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2076        let mut expected = vec![0; len];
2077        let offset = TEST_DATA_OFFSET as usize;
2078        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2079        assert_eq!(buf.as_slice()[..len], expected[..]);
2080        fs.close().await.expect("Close failed");
2081    }
2082
    #[fuchsia::test]
    async fn test_read_after_writes_interspersed_with_flush() {
        let (fs, object) = test_filesystem_and_object().await;

        // Flush before the next write so the original test data and the new write end up in
        // different layers, and the read below has to merge them.
        object.owner().flush().await.expect("flush failed");

        // Write more test data to the first block of the file.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");

        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);

        // Expect zeroes everywhere except the two copies of TEST_DATA: the original one at
        // TEST_DATA_OFFSET and the one just written at offset 0.
        let mut expected = vec![0u8; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice(), &expected);
        fs.close().await.expect("Close failed");
    }
2106
    #[fuchsia::test]
    async fn test_read_after_truncate_and_extend() {
        let (fs, object) = test_filesystem_and_object().await;

        // Arrange for there to be <extent><deleted-extent><extent>.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        // This adds an extent at 0..512.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
        // This deletes 512..1024.
        object.truncate(3).await.expect("truncate failed");
        let data = b"foo";
        let offset = 1500u64;
        let align = (offset % fs.block_size() as u64) as usize;
        let mut buf = object.allocate_buffer(align + data.len()).await;
        buf.as_mut_slice()[align..].copy_from_slice(data);
        // This adds 1024..1536.
        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");

        // Read across all three regions: the three bytes kept by the truncate, zeroes over the
        // deleted extent, then "foo" at offset 1500.
        const LEN1: usize = 1503;
        let mut buf = object.allocate_buffer(LEN1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
        let mut expected = [0; LEN1];
        expected[..3].copy_from_slice(&TEST_DATA[..3]);
        expected[1500..].copy_from_slice(b"foo");
        assert_eq!(buf.as_slice(), &expected);

        // Also test a read that ends midway through the deleted extent.
        const LEN2: usize = 601;
        let mut buf = object.allocate_buffer(LEN2).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
        assert_eq!(buf.as_slice(), &expected[..LEN2]);
        fs.close().await.expect("Close failed");
    }
2143
    #[fuchsia::test]
    async fn test_read_whole_blocks_with_multiple_objects() {
        // Interleaves block-aligned writes to two objects in the same store to check that each
        // object's reads see only its own data, and that a sparse tail block reads as zeroes.
        let (fs, object) = test_filesystem_and_object().await;
        let block_size = object.block_size() as usize;
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

        // Create a second object in the same store.
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object2 =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        let mut ef_buffer = object.allocate_buffer(block_size).await;
        ef_buffer.as_mut_slice().fill(0xef);
        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");

        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object
            .write_or_append(Some(block_size as u64), buffer.as_ref())
            .await
            .expect("write failed");
        // Sets the first object's size to exactly three blocks; the third block has no data
        // written, so it should read back as zeroes.
        object.truncate(3 * block_size as u64).await.expect("truncate failed");
        object2
            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
            .await
            .expect("write failed");

        let mut buffer = object.allocate_buffer(4 * block_size).await;
        buffer.as_mut_slice().fill(123);
        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
        fs.close().await.expect("Close failed");
    }
2188
    #[fuchsia::test]
    async fn test_alignment() {
        let (fs, object) = test_filesystem_and_object().await;

        // Test helper that mirrors every write made to the object into an in-memory Vec, then
        // reads the object back and checks it matches the mirror exactly.
        struct AlignTest {
            // Fill byte used by the next write; bumped on every call to `test`.
            fill: u8,
            object: DataObjectHandle<ObjectStore>,
            // In-memory model of the expected object contents.
            mirror: Vec<u8>,
        }

        impl AlignTest {
            // Snapshots the object's current contents as the initial mirror.
            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
                let mirror = {
                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
                    buf.as_slice().to_vec()
                };
                Self { fill: 0, object, mirror }
            }

            // Fills |range| of self.object with a byte value (self.fill) and mirrors the same
            // operation to an in-memory copy of the object.
            // Each subsequent call bumps the value of fill.
            // It is expected that the object and its mirror maintain identical content.
            async fn test(&mut self, range: Range<u64>) {
                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
                self.fill += 1;
                buf.as_mut_slice().fill(self.fill);
                self.object
                    .write_or_append(Some(range.start), buf.as_ref())
                    .await
                    .expect("write_or_append failed");
                // A write past the current end grows the mirror (zero-filled) to match.
                if range.end > self.mirror.len() as u64 {
                    self.mirror.resize(range.end as usize, 0);
                }
                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
                // Read one byte more than the expected size to confirm the read stops at EOF.
                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
                assert_eq!(
                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
                    self.mirror.len()
                );
                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
            }
        }

        let block_size = object.block_size() as u64;
        let mut align = AlignTest::new(object).await;

        // Fill the object to start with (with 1).
        align.test(0..2 * block_size + 1).await;

        // Unaligned head (fills with 2, overwrites that with 3).
        align.test(1..block_size).await;
        align.test(1..2 * block_size).await;

        // Unaligned tail (fills with 4 and 5).
        align.test(0..block_size - 1).await;
        align.test(0..2 * block_size - 1).await;

        // Both unaligned (fills with 6 and 7).
        align.test(1..block_size - 1).await;
        align.test(1..2 * block_size - 1).await;

        fs.close().await.expect("Close failed");
    }
2254
    // Shared body for the preallocation tests: preallocates ranges, checks the allocator's
    // byte accounting, then verifies that writes into the preallocated space (including
    // overwrite()) require no further allocations.
    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
        let allocator = fs.allocator();
        let allocated_before = allocator.get_allocated_bytes();
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");
        assert!(object.get_size() < 1048576);
        // Preallocating a larger range grows the file to the end of that range.
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..1048576))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");
        assert_eq!(object.get_size(), 1048576);
        // Check that it didn't reallocate the space for the existing extent
        let allocated_after = allocator.get_allocated_bytes();
        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);

        // Write into the preallocated space: once with write_or_append over the head of the
        // file, once with overwrite past the original EOF.
        let mut buf = object
            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
            .await;
        buf.as_mut_slice().fill(47);
        object
            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
            .await
            .expect("write failed");
        buf.as_mut_slice().fill(95);
        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
        object
            .overwrite(offset, buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("write failed");

        // Make sure there were no more allocations.
        assert_eq!(allocator.get_allocated_bytes(), allocated_after);

        // Read back the data and make sure it is what we expect.
        // NOTE(review): 104876 looks like a transposition of 1048576 (the preallocated size);
        // the checks below all fall well inside 104876 so the test still passes — confirm
        // whether the full file was meant to be read here.
        let mut buf = object.allocate_buffer(104876).await;
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
        assert_eq!(
            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
            TEST_DATA
        );
        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
    }
2304
    #[fuchsia::test]
    async fn test_preallocate_range() {
        // Runs the shared preallocation checks without flushing first (contrast with the next
        // test, which flushes so extents land in different layers).
        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
        test_preallocate_common(&fs, object).await;
        fs.close().await.expect("Close failed");
    }
2311
    // This is identical to the previous test except that we flush so that extents end up in
    // different layers.
    #[fuchsia::test]
    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
        // Flush pushes the existing extents into a persistent layer before preallocating.
        object.owner().flush().await.expect("flush failed");
        test_preallocate_common(&fs, object).await;
        fs.close().await.expect("Close failed");
    }
2321
2322    #[fuchsia::test]
2323    async fn test_already_preallocated() {
2324        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2325        let allocator = fs.allocator();
2326        let allocated_before = allocator.get_allocated_bytes();
2327        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2328        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
2329        object
2330            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
2331            .await
2332            .expect("preallocate_range failed");
2333        transaction.commit().await.expect("commit failed");
2334        // Check that it didn't reallocate any new space.
2335        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
2336        fs.close().await.expect("Close failed");
2337    }
2338
    #[fuchsia::test]
    async fn test_overwrite_when_preallocated_at_start_of_file() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        // Reopen the object by id to get a fresh handle to it.
        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume a 4 KiB block size.
        assert_eq!(fs.block_size(), 4096);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // First try to overwrite without allowing allocations
        // We expect this to fail, since nothing is allocated yet
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // Now preallocate some space (exactly one block)
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Now try the same overwrite command as before, it should work this time,
        // even with allocations disabled...
        {
            // Preallocated but not-yet-written space must read back as zeroes.
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
        // one block earlier at offset 0
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // We can't assert anything about the existing bytes, because they haven't been allocated
        // yet and they could contain any values
        object
            .overwrite(
                4096,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2425
    #[fuchsia::test]
    async fn test_overwrite_large_buffer_and_file_with_many_holes() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume a 4 KiB block size.
        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        // Let's create some non-holes
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Holes remain at 0..4096, 8192..16384, 32768..65536 and 131072..262144; the file now
        // ends at the last preallocated range.
        assert_eq!(object.get_size(), 524288);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // We shouldn't be able to overwrite in the holes if new allocations aren't enabled
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(8192, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(32768, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(131072, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // But we should be able to overwrite in the prealloc'd areas without needing allocations
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(16384, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(16384, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(16384, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(65536, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(65536, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(65536, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(262144, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(262144, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(262144, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now let's try to do a huge overwrite, that spans over many holes and non-holes
        let mut huge_write_buf = object.allocate_buffer(524288).await;
        huge_write_buf.as_mut_slice().fill(96);

        // With allocations disabled, the big overwrite should fail...
        object
            .overwrite(0, huge_write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but it should work when allocations are enabled
        object
            .overwrite(
                0,
                huge_write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(524288).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[96; 524288]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2580
2581    #[fuchsia::test]
2582    async fn test_overwrite_when_unallocated_at_start_of_file() {
2583        // The standard test data we put in the test object would cause an extent with checksums
2584        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2585        let (fs, object) = test_filesystem_and_empty_object().await;
2586
2587        let object = ObjectStore::open_object(
2588            object.owner(),
2589            object.object_id(),
2590            HandleOptions::default(),
2591            None,
2592        )
2593        .await
2594        .expect("open_object failed");
2595
2596        assert_eq!(fs.block_size(), 4096);
2597
2598        let mut write_buf = object.allocate_buffer(4096).await;
2599        write_buf.as_mut_slice().fill(95);
2600
2601        // First try to overwrite without allowing allocations
2602        // We expect this to fail, since nothing is allocated yet
2603        object
2604            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2605            .await
2606            .expect_err("overwrite succeeded");
2607
2608        // Now try the same overwrite command as before, but allow allocations
2609        object
2610            .overwrite(
2611                0,
2612                write_buf.as_mut(),
2613                OverwriteOptions { allow_allocations: true, ..Default::default() },
2614            )
2615            .await
2616            .expect("overwrite failed");
2617        {
2618            let mut read_buf = object.allocate_buffer(4096).await;
2619            object.read(0, read_buf.as_mut()).await.expect("read failed");
2620            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2621        }
2622
2623        // Now try to overwrite at the next block. This should fail if allocations are disabled
2624        object
2625            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2626            .await
2627            .expect_err("overwrite succeeded");
2628
2629        // ... but it should work if allocations are enabled
2630        object
2631            .overwrite(
2632                4096,
2633                write_buf.as_mut(),
2634                OverwriteOptions { allow_allocations: true, ..Default::default() },
2635            )
2636            .await
2637            .expect("overwrite failed");
2638        {
2639            let mut read_buf = object.allocate_buffer(4096).await;
2640            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2641            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2642        }
2643
2644        // Check that the overwrites haven't messed up the filesystem state
2645        let fsck_options = FsckOptions {
2646            fail_on_warning: true,
2647            no_lock: true,
2648            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2649            ..Default::default()
2650        };
2651        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2652
2653        fs.close().await.expect("Close failed");
2654    }
2655
    #[fuchsia::test]
    async fn test_overwrite_can_extend_a_file() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume a 4 KiB block size.
        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // Let's try to fill up the last block, and increase the file size in doing so
        let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);

        // Expected to fail with allocations disabled
        object
            .overwrite(last_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                last_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Writing a full block over the partial last block grew the file to the block boundary.
        assert_eq!(object.get_size(), 8192);

        // Let's try to write at the next block, too
        let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();

        // Expected to fail with allocations disabled
        object
            .overwrite(next_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                next_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // The file grew by one more block.
        assert_eq!(object.get_size(), 12288);

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2738
2739    #[fuchsia::test]
2740    async fn test_enable_verity() {
2741        let fs: OpenFxFilesystem = test_filesystem().await;
2742        let mut transaction = fs
2743            .clone()
2744            .new_transaction(lock_keys![], Options::default())
2745            .await
2746            .expect("new_transaction failed");
2747        let store = fs.root_store();
2748        let object = Arc::new(
2749            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2750                .await
2751                .expect("create_object failed"),
2752        );
2753
2754        transaction.commit().await.unwrap();
2755
2756        object
2757            .enable_verity(fio::VerificationOptions {
2758                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2759                salt: Some(vec![]),
2760                ..Default::default()
2761            })
2762            .await
2763            .expect("set verified file metadata failed");
2764
2765        let handle =
2766            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2767                .await
2768                .expect("open_object failed");
2769
2770        assert!(handle.is_verified_file());
2771
2772        fs.close().await.expect("Close failed");
2773    }
2774
2775    #[fuchsia::test]
2776    async fn test_enable_verity_large_file() {
2777        // Need to make a large FakeDevice to create space for a 67 MB file.
2778        let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2779        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2780        let root_store = fs.root_store();
2781        let mut transaction = fs
2782            .clone()
2783            .new_transaction(lock_keys![], Options::default())
2784            .await
2785            .expect("new_transaction failed");
2786
2787        let handle = ObjectStore::create_object(
2788            &root_store,
2789            &mut transaction,
2790            HandleOptions::default(),
2791            None,
2792        )
2793        .await
2794        .expect("failed to create object");
2795        transaction.commit().await.expect("commit failed");
2796        let mut offset = 0;
2797
2798        // Write a file big enough to trigger multiple transactions on enable_verity().
2799        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2800        buf.as_mut_slice().fill(1);
2801        for _ in 0..130 {
2802            handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2803            offset += WRITE_ATTR_BATCH_SIZE as u64;
2804        }
2805
2806        handle
2807            .enable_verity(fio::VerificationOptions {
2808                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2809                salt: Some(vec![]),
2810                ..Default::default()
2811            })
2812            .await
2813            .expect("set verified file metadata failed");
2814
2815        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2816        offset = 0;
2817        for _ in 0..130 {
2818            handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2819            assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2820            offset += WRITE_ATTR_BATCH_SIZE as u64;
2821        }
2822
2823        fsck(fs.clone()).await.expect("fsck failed");
2824        fs.close().await.expect("Close failed");
2825    }
2826
    #[fuchsia::test]
    async fn test_retry_enable_verity_on_reboot() {
        // Simulates a "crash" partway through merkle-tree creation: a graveyard entry marks
        // the merkle attribute as in-progress, the attribute write is abandoned mid-way, and
        // the filesystem is remounted. After replay, retrying enable_verity must succeed and
        // the partial attribute must have been cleaned up.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Add a graveyard entry for the merkle attribute so that, on replay, the
            // incomplete attribute is eligible for cleanup by the graveyard.
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // First remount read-only and fsck, to confirm the interrupted state on disk is
        // itself consistent before any cleanup runs.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_store = fs.root_store();
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open_object failed");
        // Retry enable_verity; it should succeed despite the leftover partial attribute.
        handle
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");

        // `flush` will ensure that initial reap fully processes all the graveyard entries. This
        // isn't strictly necessary for the test to pass (the graveyard marker was already
        // processed during `enable_verity`), but it does help catch bugs, such as the attribute
        // graveyard entry not being removed upon processing.
        fs.graveyard().flush().await;
        // The retried enable_verity replaced the abandoned 2-batch attribute: the merkle
        // attribute now holds exactly one digest-length run of zero bytes.
        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            Some(vec![0; <Sha256 as Hasher>::Digest::DIGEST_LEN].into())
        );
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
2918
2919    #[fuchsia::test]
2920    async fn test_verify_data_corrupt_file() {
2921        let fs: OpenFxFilesystem = test_filesystem().await;
2922        let mut transaction = fs
2923            .clone()
2924            .new_transaction(lock_keys![], Options::default())
2925            .await
2926            .expect("new_transaction failed");
2927        let store = fs.root_store();
2928        let object = Arc::new(
2929            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2930                .await
2931                .expect("create_object failed"),
2932        );
2933
2934        transaction.commit().await.unwrap();
2935
2936        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
2937        buf.as_mut_slice().fill(123);
2938        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2939
2940        object
2941            .enable_verity(fio::VerificationOptions {
2942                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2943                salt: Some(vec![]),
2944                ..Default::default()
2945            })
2946            .await
2947            .expect("set verified file metadata failed");
2948
2949        // Change file contents and ensure verification fails
2950        buf.as_mut_slice().fill(234);
2951        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2952        object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
2953
2954        fs.close().await.expect("Close failed");
2955    }
2956
    // TODO(https://fxbug.dev/450398331): More tests to be added when this can support writing the
    // f2fs format natively. For now, relying on tests inside of the f2fs_reader to exercise more
    // paths.
    #[fuchsia::test]
    async fn test_parse_f2fs_verity() {
        // Enables verity the normal (fxfs) way, then hand-rewrites the on-disk metadata into
        // the f2fs-style representation and checks the file still opens as verified and reads
        // fully.
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        transaction.commit().await.unwrap();
        let file_size = fs.block_size() * 2;
        // Write over one block to make there be leaf hashes.
        {
            let mut buf = object.allocate_buffer(file_size as usize).await;
            buf.as_mut_slice().fill(64);
            assert_eq!(
                object.write_or_append(None, buf.as_ref()).await.expect("Writing to file."),
                file_size
            );
        }

        // Enable verity normally, then shift the type.
        object
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");
        // Capture the descriptor info and root hash produced by enable_verity; these are
        // needed to build the raw f2fs descriptor below.
        let (verity_info, root_hash) =
            object.get_descriptor().expect("Getting verity info").unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::Object {
                    store_object_id: store.store_object_id(),
                    object_id: object.object_id()
                }],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Replace the data attribute's verity metadata with the F2fs variant, whose payload
        // is a byte range within the merkle attribute.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::verified_attribute(
                    file_size,
                    FsverityMetadata::F2fs(0..(fs.block_size() * 2)),
                ),
            ),
        );
        // Grow the merkle attribute to two blocks; the second block receives the raw
        // descriptor written below.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::attribute(fs.block_size() * 2, false),
            ),
        );
        {
            // Build a raw f2fs verity descriptor from the info captured above and write it
            // into the second block of the merkle attribute.
            let descriptor = FsVerityDescriptorRaw::new(
                fio::HashAlgorithm::Sha256,
                fs.block_size(),
                file_size,
                root_hash.as_slice(),
                match &verity_info.salt {
                    Some(salt) => salt.as_slice(),
                    None => [0u8; 0].as_slice(),
                },
            )
            .expect("Creating descriptor");
            let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
            descriptor.write_to_slice(buf.as_mut_slice()).expect("Writing descriptor to buf");
            object
                .multi_write(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &[fs.block_size()..(fs.block_size() * 2)],
                    buf.as_mut(),
                )
                .await
                .expect("Writing descriptor");
        }
        transaction.commit().await.unwrap();

        // Re-open: parsing the f2fs-style metadata should still yield a verified file.
        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(handle.is_verified_file());

        // Reads of the whole file must pass verification against the rewritten metadata.
        let mut buf = object.allocate_buffer(file_size as usize).await;
        assert_eq!(
            handle.read(0, buf.as_mut()).await.expect("Read whole file."),
            file_size as usize
        );

        fs.close().await.expect("Close failed");
    }
3076
3077    #[fuchsia::test]
3078    async fn test_verify_data_corrupt_tree() {
3079        let fs: OpenFxFilesystem = test_filesystem().await;
3080        let object_id = {
3081            let store = fs.root_store();
3082            let mut transaction = fs
3083                .clone()
3084                .new_transaction(lock_keys![], Options::default())
3085                .await
3086                .expect("new_transaction failed");
3087            let object = Arc::new(
3088                ObjectStore::create_object(
3089                    &store,
3090                    &mut transaction,
3091                    HandleOptions::default(),
3092                    None,
3093                )
3094                .await
3095                .expect("create_object failed"),
3096            );
3097            let object_id = object.object_id();
3098
3099            transaction.commit().await.unwrap();
3100
3101            let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3102            buf.as_mut_slice().fill(123);
3103            object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3104
3105            object
3106                .enable_verity(fio::VerificationOptions {
3107                    hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3108                    salt: Some(vec![]),
3109                    ..Default::default()
3110                })
3111                .await
3112                .expect("set verified file metadata failed");
3113            object.read(0, buf.as_mut()).await.expect("verified read");
3114
3115            // Corrupt the merkle tree before closing.
3116            let mut merkle = object
3117                .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
3118                .await
3119                .unwrap()
3120                .expect("Reading merkle tree");
3121            merkle[0] = merkle[0].wrapping_add(1);
3122            object
3123                .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &*merkle)
3124                .await
3125                .expect("Overwriting merkle");
3126
3127            object_id
3128        }; // Close object.
3129
3130        // Reopening the object should complain about the corrupted merkle tree.
3131        assert!(
3132            ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default(), None)
3133                .await
3134                .is_err()
3135        );
3136        fs.close().await.expect("Close failed");
3137    }
3138
3139    #[fuchsia::test]
3140    async fn test_extend() {
3141        let fs = test_filesystem().await;
3142        let handle;
3143        let mut transaction = fs
3144            .clone()
3145            .new_transaction(lock_keys![], Options::default())
3146            .await
3147            .expect("new_transaction failed");
3148        let store = fs.root_store();
3149        handle =
3150            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3151                .await
3152                .expect("create_object failed");
3153
3154        // As of writing, an empty filesystem has two 512kiB superblock extents and a little over
3155        // 256kiB of additional allocations (journal, etc) so we start use a 'magic' starting point
3156        // of 2MiB here.
3157        const START_OFFSET: u64 = 2048 * 1024;
3158        handle
3159            .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
3160            .await
3161            .expect("extend failed");
3162        transaction.commit().await.expect("commit failed");
3163        let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
3164        buf.as_mut_slice().fill(123);
3165        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3166        buf.as_mut_slice().fill(67);
3167        handle.read(0, buf.as_mut()).await.expect("read failed");
3168        assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
3169        fs.close().await.expect("Close failed");
3170    }
3171
3172    #[fuchsia::test]
3173    async fn test_truncate_deallocates_old_extents() {
3174        let (fs, object) = test_filesystem_and_object().await;
3175        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3176        buf.as_mut_slice().fill(0xaa);
3177        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3178
3179        let allocator = fs.allocator();
3180        let allocated_before = allocator.get_allocated_bytes();
3181        object.truncate(fs.block_size() as u64).await.expect("truncate failed");
3182        let allocated_after = allocator.get_allocated_bytes();
3183        assert!(
3184            allocated_after < allocated_before,
3185            "before = {} after = {}",
3186            allocated_before,
3187            allocated_after
3188        );
3189        fs.close().await.expect("Close failed");
3190    }
3191
3192    #[fuchsia::test]
3193    async fn test_truncate_zeroes_tail_block() {
3194        let (fs, object) = test_filesystem_and_object().await;
3195
3196        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
3197        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
3198            .await
3199            .expect("truncate failed");
3200
3201        let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3202        let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
3203        object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
3204
3205        let mut expected = TEST_DATA.to_vec();
3206        expected[3..].fill(0);
3207        assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
3208    }
3209
    #[fuchsia::test]
    async fn test_trim() {
        // End-to-end test of graveyard-driven trimming: a truncated file whose deallocation
        // spans multiple transactions must eventually have its leftover extents reclaimed,
        // and that must hold when replaying a device snapshot taken after any commit.
        // Format a new filesystem.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let block_size = fs.block_size();
        root_volume(fs.clone())
            .await
            .expect("root_volume failed")
            .new_volume("test", NewChildStoreOptions::default())
            .await
            .expect("volume failed");
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // To test trim, we open the filesystem and set up a post commit hook that runs after every
        // transaction.  When the hook triggers, we can fsck the volume, take a snapshot of the
        // device and check that it gets replayed correctly on the snapshot.  We can check that the
        // graveyard trims the file as expected.
        #[derive(Default)]
        struct Context {
            store: Option<Arc<ObjectStore>>,
            object_id: Option<u64>,
        }
        // Shared between the test body (which sets the fields) and the post-commit hook
        // (which reads them).
        let shared_context = Arc::new(Mutex::new(Context::default()));

        // Sized so that truncating the object requires more than one transaction (the write
        // loop below populates every other block).
        let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;

        // Wait for an object to get tombstoned by the graveyard.
        async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
            loop {
                if let Err(e) =
                    ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
                {
                    assert!(
                        FxfsError::NotFound.matches(&e),
                        "open_object didn't fail with NotFound: {:?}",
                        e
                    );
                    break;
                }
                // The graveyard should eventually tombstone the object.
                fasync::Timer::new(std::time::Duration::from_millis(100)).await;
            }
        }

        // Checks to see if the object needs to be trimmed.
        async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
            let root_directory = Directory::open(store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let oid = root_directory.lookup("foo").await.expect("lookup failed");
            if let Some((oid, _, _)) = oid {
                let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");
                let props = object.get_properties().await.expect("get_properties failed");
                // Allocated bytes with a zero content size means the truncate's
                // deallocation hasn't completed yet.
                if props.allocated_size > 0 && props.data_attribute_size == 0 {
                    Some(object)
                } else {
                    None
                }
            } else {
                None
            }
        }

        let shared_context_clone = shared_context.clone();
        let post_commit = move || {
            let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
            let shared_context = shared_context_clone.clone();
            async move {
                // First run fsck on the current filesystem.
                let options = FsckOptions {
                    fail_on_warning: true,
                    no_lock: true,
                    on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                    ..Default::default()
                };
                let fs = store.filesystem();

                fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");

                // Now check that we can replay this correctly.
                fs.sync(SyncOptions { flush_device: true, ..Default::default() })
                    .await
                    .expect("sync failed");
                let device = fs.device().snapshot().expect("snapshot failed");

                let object_id = shared_context.lock().object_id.clone();

                // `object_id` is only set for the second pass (object in graveyard); the
                // initial reap is skipped on the first pass so trimming can be observed.
                let fs2 = FxFilesystemBuilder::new()
                    .skip_initial_reap(object_id.is_none())
                    .open(device)
                    .await
                    .expect("open failed");

                // If the "foo" file exists check that allocated size matches content size.
                let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
                let store =
                    root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

                if let Some(oid) = object_id {
                    // For the second pass, the object should get tombstoned.
                    expect_tombstoned(&store, oid).await;
                } else if let Some(object) = needs_trim(&store).await {
                    // Extend the file and make sure that it is correctly trimmed.
                    object.truncate(object_size).await.expect("truncate failed");
                    let mut buf = object.allocate_buffer(block_size as usize).await;
                    object
                        .read(object_size - block_size * 2, buf.as_mut())
                        .await
                        .expect("read failed");
                    assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);

                    // Remount, this time with the graveyard performing an initial reap and the
                    // object should get trimmed.
                    let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
                        .await
                        .expect("open failed");
                    let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                    let store = root_vol
                        .volume("test", StoreOptions::default())
                        .await
                        .expect("volume failed");
                    while needs_trim(&store).await.is_some() {
                        // The object has been truncated, but still has some data allocated to
                        // it.  The graveyard should trim the object eventually.
                        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
                    }

                    // Run fsck.
                    fsck_with_options(fs.clone(), &options)
                        .await
                        .expect("fsck_with_options failed");
                    fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                        .await
                        .expect("fsck_volume_with_options failed");
                    fs.close().await.expect("close failed");
                }

                // Run fsck on fs2.
                fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");
                fs2.close().await.expect("close failed");
            }
            .boxed()
        };

        let fs = FxFilesystemBuilder::new()
            .post_commit_hook(post_commit)
            .open(device)
            .await
            .expect("open failed");

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

        shared_context.lock().store = Some(store.clone());

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");

        let object;
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        object = root_directory
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_object failed");
        transaction.commit().await.expect("commit failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        // Two passes: first with a regular object, and then with that object moved into the
        // graveyard.
        let mut pass = 0;
        loop {
            // Create enough extents in it such that when we truncate the object it will require
            // more than one transaction.
            let mut buf = object.allocate_buffer(5).await;
            buf.as_mut_slice().fill(1);
            // Write every other block.
            for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
                object
                    .txn_write(&mut transaction, offset, buf.as_ref())
                    .await
                    .expect("write failed");
            }
            transaction.commit().await.expect("commit failed");
            // This should take up more than one transaction.
            WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");

            if pass == 1 {
                break;
            }

            // Store the object ID so that we can make sure the object is always tombstoned
            // after remount (see above).
            shared_context.lock().object_id = Some(object.object_id());

            transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(store.store_object_id(), store.root_directory_object_id()),
                        LockKey::object(store.store_object_id(), object.object_id()),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");

            // Move the object into the graveyard.
            replace_child(&mut transaction, None, (&root_directory, "foo"))
                .await
                .expect("replace_child failed");
            store.add_to_graveyard(&mut transaction, object.object_id());

            pass += 1;
        }

        fs.close().await.expect("Close failed");
    }
3456
    #[fuchsia::test]
    async fn test_adjust_refs() {
        let (fs, object) = test_filesystem_and_object().await;
        let store = object.owner();
        // Bump the refcount by one.  `adjust_refs` returning false indicates the object does not
        // yet need to be cleaned up.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        assert_eq!(
            store
                .adjust_refs(&mut transaction, object.object_id(), 1)
                .await
                .expect("adjust_refs failed"),
            false
        );
        transaction.commit().await.expect("commit failed");

        let allocator = fs.allocator();
        let allocated_before = allocator.get_allocated_bytes();
        // Now drop the refcount to zero; `adjust_refs` returning true indicates the object should
        // now be tombstoned.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        assert_eq!(
            store
                .adjust_refs(&mut transaction, object.object_id(), -2)
                .await
                .expect("adjust_refs failed"),
            true
        );
        transaction.commit().await.expect("commit failed");

        // Dropping the refcount alone must not free any data; that only happens at tombstone time.
        assert_eq!(allocator.get_allocated_bytes(), allocated_before);

        store
            .tombstone_object(
                object.object_id(),
                Options { borrow_metadata_space: true, ..Default::default() },
            )
            .await
            .expect("purge failed");

        // Tombstoning the object should release exactly the one block of data it owned.
        assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);

        // We need to remove the directory entry, too, otherwise fsck will complain
        {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            // Overwrite the child entry with None, i.e. delete it.
            transaction.add(
                store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, false),
                    ObjectValue::None,
                ),
            );
            transaction.commit().await.expect("commit failed");
        }

        // fsck (with warnings fatal) should find the filesystem fully consistent afterwards.
        fsck_with_options(
            fs.clone(),
            &FsckOptions {
                fail_on_warning: true,
                on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                ..Default::default()
            },
        )
        .await
        .expect("fsck_with_options failed");

        fs.close().await.expect("Close failed");
    }
3548
3549    #[fuchsia::test]
3550    async fn test_locks() {
3551        let (fs, object) = test_filesystem_and_object().await;
3552        let (send1, recv1) = channel();
3553        let (send2, recv2) = channel();
3554        let (send3, recv3) = channel();
3555        let done = Mutex::new(false);
3556        let mut futures = FuturesUnordered::new();
3557        futures.push(
3558            async {
3559                let mut t = object.new_transaction().await.expect("new_transaction failed");
3560                send1.send(()).unwrap(); // Tell the next future to continue.
3561                send3.send(()).unwrap(); // Tell the last future to continue.
3562                recv2.await.unwrap();
3563                let mut buf = object.allocate_buffer(5).await;
3564                buf.as_mut_slice().copy_from_slice(b"hello");
3565                object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
3566                // This is a halting problem so all we can do is sleep.
3567                fasync::Timer::new(Duration::from_millis(100)).await;
3568                assert!(!*done.lock());
3569                t.commit().await.expect("commit failed");
3570            }
3571            .boxed(),
3572        );
3573        futures.push(
3574            async {
3575                recv1.await.unwrap();
3576                // Reads should not block.
3577                let offset = TEST_DATA_OFFSET as usize;
3578                let align = offset % fs.block_size() as usize;
3579                let len = TEST_DATA.len();
3580                let mut buf = object.allocate_buffer(align + len).await;
3581                assert_eq!(
3582                    object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
3583                    align + TEST_DATA.len()
3584                );
3585                assert_eq!(&buf.as_slice()[align..], TEST_DATA);
3586                // Tell the first future to continue.
3587                send2.send(()).unwrap();
3588            }
3589            .boxed(),
3590        );
3591        futures.push(
3592            async {
3593                // This should block until the first future has completed.
3594                recv3.await.unwrap();
3595                let _t = object.new_transaction().await.expect("new_transaction failed");
3596                let mut buf = object.allocate_buffer(5).await;
3597                assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
3598                assert_eq!(buf.as_slice(), b"hello");
3599            }
3600            .boxed(),
3601        );
3602        while let Some(()) = futures.next().await {}
3603        fs.close().await.expect("Close failed");
3604    }
3605
3606    #[fuchsia::test(threads = 10)]
3607    async fn test_racy_reads() {
3608        let fs = test_filesystem().await;
3609        let object;
3610        let mut transaction = fs
3611            .clone()
3612            .new_transaction(lock_keys![], Options::default())
3613            .await
3614            .expect("new_transaction failed");
3615        let store = fs.root_store();
3616        object = Arc::new(
3617            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3618                .await
3619                .expect("create_object failed"),
3620        );
3621        transaction.commit().await.expect("commit failed");
3622        for _ in 0..100 {
3623            let cloned_object = object.clone();
3624            let writer = fasync::Task::spawn(async move {
3625                let mut buf = cloned_object.allocate_buffer(10).await;
3626                buf.as_mut_slice().fill(123);
3627                cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3628            });
3629            let cloned_object = object.clone();
3630            let reader = fasync::Task::spawn(async move {
3631                let wait_time = rand::random_range(0..5);
3632                fasync::Timer::new(Duration::from_millis(wait_time)).await;
3633                let mut buf = cloned_object.allocate_buffer(10).await;
3634                buf.as_mut_slice().fill(23);
3635                let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3636                // If we succeed in reading data, it must include the write; i.e. if we see the size
3637                // change, we should see the data too.  For this to succeed it requires locking on
3638                // the read size to ensure that when we read the size, we get the extents changed in
3639                // that same transaction.
3640                if amount != 0 {
3641                    assert_eq!(amount, 10);
3642                    assert_eq!(buf.as_slice(), &[123; 10]);
3643                }
3644            });
3645            writer.await;
3646            reader.await;
3647            object.truncate(0).await.expect("truncate failed");
3648        }
3649        fs.close().await.expect("Close failed");
3650    }
3651
3652    #[fuchsia::test]
3653    async fn test_allocated_size() {
3654        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3655
3656        let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3657        let mut buf = object.allocate_buffer(5).await;
3658        buf.as_mut_slice().copy_from_slice(b"hello");
3659        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3660        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3661        assert_eq!(after, before + fs.block_size() as u64);
3662
3663        // Do the same write again and there should be no change.
3664        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3665        assert_eq!(
3666            object.get_properties().await.expect("get_properties failed").allocated_size,
3667            after
3668        );
3669
3670        // extend...
3671        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3672        let offset = 1000 * fs.block_size() as u64;
3673        let before = after;
3674        object
3675            .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3676            .await
3677            .expect("extend failed");
3678        transaction.commit().await.expect("commit failed");
3679        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3680        assert_eq!(after, before + fs.block_size() as u64);
3681
3682        // truncate...
3683        let before = after;
3684        let size = object.get_size();
3685        object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3686        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3687        assert_eq!(after, before - fs.block_size() as u64);
3688
3689        // preallocate_range...
3690        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3691        let before = after;
3692        let mut file_range = offset..offset + fs.block_size() as u64;
3693        object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3694        transaction.commit().await.expect("commit failed");
3695        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3696        assert_eq!(after, before + fs.block_size() as u64);
3697        fs.close().await.expect("Close failed");
3698    }
3699
3700    #[fuchsia::test(threads = 10)]
3701    async fn test_zero() {
3702        let (fs, object) = test_filesystem_and_object().await;
3703        let expected_size = object.get_size();
3704        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3705        object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3706        transaction.commit().await.expect("commit failed");
3707        assert_eq!(object.get_size(), expected_size);
3708        let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3709        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3710        assert_eq!(
3711            &buf.as_slice()[0..expected_size as usize],
3712            vec![0u8; expected_size as usize].as_slice()
3713        );
3714        fs.close().await.expect("Close failed");
3715    }
3716
3717    #[fuchsia::test]
3718    async fn test_properties() {
3719        let (fs, object) = test_filesystem_and_object().await;
3720        const CRTIME: Timestamp = Timestamp::from_nanos(1234);
3721        const MTIME: Timestamp = Timestamp::from_nanos(5678);
3722        const CTIME: Timestamp = Timestamp::from_nanos(8765);
3723
3724        // ObjectProperties can be updated through `update_attributes`.
3725        // `get_properties` should reflect the latest changes.
3726        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3727        object
3728            .update_attributes(
3729                &mut transaction,
3730                Some(&fio::MutableNodeAttributes {
3731                    creation_time: Some(CRTIME.as_nanos()),
3732                    modification_time: Some(MTIME.as_nanos()),
3733                    mode: Some(111),
3734                    gid: Some(222),
3735                    ..Default::default()
3736                }),
3737                None,
3738            )
3739            .await
3740            .expect("update_attributes failed");
3741        const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
3742        object
3743            .update_attributes(
3744                &mut transaction,
3745                Some(&fio::MutableNodeAttributes {
3746                    modification_time: Some(MTIME_NEW.as_nanos()),
3747                    gid: Some(333),
3748                    rdev: Some(444),
3749                    ..Default::default()
3750                }),
3751                Some(CTIME),
3752            )
3753            .await
3754            .expect("update_timestamps failed");
3755        transaction.commit().await.expect("commit failed");
3756
3757        let properties = object.get_properties().await.expect("get_properties failed");
3758        assert_matches!(
3759            properties,
3760            ObjectProperties {
3761                refs: 1u64,
3762                allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
3763                data_attribute_size: TEST_OBJECT_SIZE,
3764                creation_time: CRTIME,
3765                modification_time: MTIME_NEW,
3766                posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
3767                change_time: CTIME,
3768                ..
3769            }
3770        );
3771        fs.close().await.expect("Close failed");
3772    }
3773
3774    #[fuchsia::test]
3775    async fn test_is_allocated() {
3776        let (fs, object) = test_filesystem_and_object().await;
3777
3778        // `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
3779        // `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
3780        let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
3781        let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
3782
3783        // Check for the case where where we have the following extent layout
3784        //       [ unallocated ][ `TEST_DATA` ]
3785        // The extents before `aligned_offset` should not be allocated
3786        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3787        assert_eq!(count, aligned_offset);
3788        assert_eq!(allocated, false);
3789
3790        let (allocated, count) =
3791            object.is_allocated(aligned_offset).await.expect("is_allocated failed");
3792        assert_eq!(count, aligned_length);
3793        assert_eq!(allocated, true);
3794
3795        // Check for the case where where we query out of range
3796        let end = aligned_offset + aligned_length;
3797        object
3798            .is_allocated(end)
3799            .await
3800            .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
3801
3802        // Check for the case where where we start querying for allocation starting from
3803        // an allocated range to the end of the device
3804        let size = 50 * fs.block_size() as u64;
3805        object.truncate(size).await.expect("extend failed");
3806
3807        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3808        assert_eq!(count, size - end);
3809        assert_eq!(allocated, false);
3810
3811        // Check for the case where where we have the following extent layout
3812        //      [ unallocated ][ `buf` ][ `buf` ]
3813        let buf_length = 5 * fs.block_size();
3814        let mut buf = object.allocate_buffer(buf_length as usize).await;
3815        buf.as_mut_slice().fill(123);
3816        let new_offset = end + 20 * fs.block_size() as u64;
3817        object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
3818        object
3819            .write_or_append(Some(new_offset + buf_length), buf.as_ref())
3820            .await
3821            .expect("write failed");
3822
3823        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3824        assert_eq!(count, new_offset - end);
3825        assert_eq!(allocated, false);
3826
3827        let (allocated, count) =
3828            object.is_allocated(new_offset).await.expect("is_allocated failed");
3829        assert_eq!(count, 2 * buf_length);
3830        assert_eq!(allocated, true);
3831
3832        // Check the case where we query from the middle of an extent
3833        let (allocated, count) = object
3834            .is_allocated(new_offset + 4 * fs.block_size())
3835            .await
3836            .expect("is_allocated failed");
3837        assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
3838        assert_eq!(allocated, true);
3839
3840        // Now, write buffer to a location already written to.
3841        // Check for the case when we the following extent layout
3842        //      [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
3843        let other_buf_length = 3 * fs.block_size();
3844        let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
3845        other_buf.as_mut_slice().fill(231);
3846        object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
3847
3848        // We still expect that `is_allocated(..)` will return that  there are 2*`buf_length bytes`
3849        // allocated from `new_offset`
3850        let (allocated, count) =
3851            object.is_allocated(new_offset).await.expect("is_allocated failed");
3852        assert_eq!(count, 2 * buf_length);
3853        assert_eq!(allocated, true);
3854
3855        // Check for the case when we the following extent layout
3856        //   [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
3857        // Mark TEST_DATA as deleted
3858        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3859        object
3860            .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
3861            .await
3862            .expect("zero failed");
3863        // Mark `other_buf` as deleted
3864        object
3865            .zero(&mut transaction, new_offset..new_offset + buf_length)
3866            .await
3867            .expect("zero failed");
3868        transaction.commit().await.expect("commit transaction failed");
3869
3870        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3871        assert_eq!(count, new_offset + buf_length);
3872        assert_eq!(allocated, false);
3873
3874        let (allocated, count) =
3875            object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
3876        assert_eq!(count, buf_length);
3877        assert_eq!(allocated, true);
3878
3879        let new_end = new_offset + buf_length + count;
3880
3881        // Check for the case where there are objects with different keys.
3882        // Case that we're checking for:
3883        //      [ unallocated ][ extent (object with different key) ][ unallocated ]
3884        let store = object.owner();
3885        let mut transaction = fs
3886            .clone()
3887            .new_transaction(lock_keys![], Options::default())
3888            .await
3889            .expect("new_transaction failed");
3890        let object2 =
3891            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3892                .await
3893                .expect("create_object failed");
3894        transaction.commit().await.expect("commit failed");
3895
3896        object2
3897            .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
3898            .await
3899            .expect("write failed");
3900
3901        // Expecting that the extent with a different key is treated like unallocated extent
3902        let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
3903        assert_eq!(count, size - new_end);
3904        assert_eq!(allocated, false);
3905
3906        fs.close().await.expect("close failed");
3907    }
3908
3909    #[fuchsia::test(threads = 10)]
3910    async fn test_read_write_attr() {
3911        let (_fs, object) = test_filesystem_and_object().await;
3912        let data = [0xffu8; 16_384];
3913        object.write_attr(20, &data).await.expect("write_attr failed");
3914        let rdata =
3915            object.read_attr(20).await.expect("read_attr failed").expect("no attribute data found");
3916        assert_eq!(&data[..], &rdata[..]);
3917
3918        assert_eq!(object.read_attr(21).await.expect("read_attr failed"), None);
3919    }
3920
3921    #[fuchsia::test(threads = 10)]
3922    async fn test_allocate_basic() {
3923        let (fs, object) = test_filesystem_and_empty_object().await;
3924        let block_size = fs.block_size();
3925        let file_size = block_size * 10;
3926        object.truncate(file_size).await.unwrap();
3927
3928        let small_buf_size = 1024;
3929        let large_buf_aligned_size = block_size as usize * 2;
3930        let large_buf_size = block_size as usize * 2 + 1024;
3931
3932        let mut small_buf = object.allocate_buffer(small_buf_size).await;
3933        let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
3934        let mut large_buf = object.allocate_buffer(large_buf_size).await;
3935
3936        assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
3937        assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
3938        assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
3939        assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
3940        assert_eq!(
3941            object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
3942            large_buf_aligned_size
3943        );
3944        assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
3945
3946        // Allocation succeeds, and without any writes to the location it shows up as zero.
3947        object.allocate(block_size..block_size * 3).await.unwrap();
3948
3949        // Test starting before, inside, and after the allocated section with every sized buffer.
3950        for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
3951            for offset in 0..4 {
3952                assert_eq!(
3953                    object.read(block_size * offset, buf.as_mut()).await.unwrap(),
3954                    buf.len(),
3955                    "buf_index: {}, read offset: {}",
3956                    buf_index,
3957                    offset,
3958                );
3959                assert_eq!(
3960                    buf.as_slice(),
3961                    &vec![0; buf.len()],
3962                    "buf_index: {}, read offset: {}",
3963                    buf_index,
3964                    offset,
3965                );
3966            }
3967        }
3968
3969        fs.close().await.expect("close failed");
3970    }
3971
3972    #[fuchsia::test(threads = 10)]
3973    async fn test_allocate_extends_file() {
3974        const BUF_SIZE: usize = 1024;
3975        let (fs, object) = test_filesystem_and_empty_object().await;
3976        let mut buf = object.allocate_buffer(BUF_SIZE).await;
3977        let block_size = fs.block_size();
3978
3979        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3980        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3981
3982        assert!(TEST_OBJECT_SIZE < block_size * 4);
3983        // Allocation succeeds, and without any writes to the location it shows up as zero.
3984        object.allocate(0..block_size * 4).await.unwrap();
3985        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3986        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3987        assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
3988        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3989        assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
3990        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3991
3992        fs.close().await.expect("close failed");
3993    }
3994
3995    #[fuchsia::test(threads = 10)]
3996    async fn test_allocate_past_end() {
3997        const BUF_SIZE: usize = 1024;
3998        let (fs, object) = test_filesystem_and_empty_object().await;
3999        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4000        let block_size = fs.block_size();
4001
4002        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4003        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4004
4005        assert!(TEST_OBJECT_SIZE < block_size * 4);
4006        // Allocation succeeds, and without any writes to the location it shows up as zero.
4007        object.allocate(block_size * 4..block_size * 6).await.unwrap();
4008        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4009        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4010        assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
4011        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4012        assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
4013        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4014
4015        fs.close().await.expect("close failed");
4016    }
4017
4018    #[fuchsia::test(threads = 10)]
4019    async fn test_allocate_read_attr() {
4020        let (fs, object) = test_filesystem_and_empty_object().await;
4021        let block_size = fs.block_size();
4022        let file_size = block_size * 4;
4023        object.truncate(file_size).await.unwrap();
4024
4025        let content = object
4026            .read_attr(object.attribute_id())
4027            .await
4028            .expect("failed to read attr")
4029            .expect("attr returned none");
4030        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4031
4032        object.allocate(block_size..block_size * 3).await.unwrap();
4033
4034        let content = object
4035            .read_attr(object.attribute_id())
4036            .await
4037            .expect("failed to read attr")
4038            .expect("attr returned none");
4039        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4040
4041        fs.close().await.expect("close failed");
4042    }
4043
4044    #[fuchsia::test(threads = 10)]
4045    async fn test_allocate_existing_data() {
4046        struct Case {
4047            written_ranges: Vec<Range<usize>>,
4048            allocate_range: Range<u64>,
4049        }
4050        let cases = [
4051            Case { written_ranges: vec![4..7], allocate_range: 4..7 },
4052            Case { written_ranges: vec![4..7], allocate_range: 3..8 },
4053            Case { written_ranges: vec![4..7], allocate_range: 5..6 },
4054            Case { written_ranges: vec![4..7], allocate_range: 5..8 },
4055            Case { written_ranges: vec![4..7], allocate_range: 3..5 },
4056            Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
4057            Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
4058        ];
4059
4060        for case in cases {
4061            let (fs, object) = test_filesystem_and_empty_object().await;
4062            let block_size = fs.block_size();
4063            let file_size = block_size * 10;
4064            object.truncate(file_size).await.unwrap();
4065
4066            for write in &case.written_ranges {
4067                let write_len = (write.end - write.start) * block_size as usize;
4068                let mut write_buf = object.allocate_buffer(write_len).await;
4069                write_buf.as_mut_slice().fill(0xff);
4070                assert_eq!(
4071                    object
4072                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
4073                        .await
4074                        .unwrap(),
4075                    file_size
4076                );
4077            }
4078
4079            let mut expected_buf = object.allocate_buffer(file_size as usize).await;
4080            assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());
4081
4082            object
4083                .allocate(
4084                    case.allocate_range.start * block_size..case.allocate_range.end * block_size,
4085                )
4086                .await
4087                .unwrap();
4088
4089            let mut read_buf = object.allocate_buffer(file_size as usize).await;
4090            assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
4091            assert_eq!(read_buf.as_slice(), expected_buf.as_slice());
4092
4093            fs.close().await.expect("close failed");
4094        }
4095    }
4096
    /// Test helper: returns the `ExtentMode` of each extent record of `obj`'s attribute 0 that
    /// overlaps `search_range`, as `(range, mode)` pairs in key order.  Each returned range is
    /// clipped to `search_range`.  Panics if it encounters anything other than a matching extent
    /// record (e.g. a record for another object or attribute, a non-extent value, or the end of
    /// the tree) before `search_range` has been covered — that would indicate a test bug.
    async fn get_modes(
        obj: &DataObjectHandle<ObjectStore>,
        mut search_range: Range<u64>,
    ) -> Vec<(Range<u64>, ExtentMode)> {
        let mut modes = Vec::new();
        let store = obj.store();
        let tree = store.tree();
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Seek to the first extent record that could overlap the start of the search range.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                obj.object_id(),
                0,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(search_range.start)),
            )))
            .await
            .unwrap();
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                    ..
                }) if *object_id == obj.object_id() && *attribute_id == 0 => {
                    // This extent starts at or past the end of the search range, so we're done.
                    if search_range.end <= range.start {
                        break;
                    }
                    // Clip the extent's range to the portion inside `search_range`, then advance
                    // the remaining search range past it.
                    let found_range = std::cmp::max(search_range.start, range.start)
                        ..std::cmp::min(search_range.end, range.end);
                    search_range.start = found_range.end;
                    modes.push((found_range, mode.clone()));
                    if search_range.start == search_range.end {
                        break;
                    }
                    iter.advance().await.unwrap();
                }
                x => panic!("looking for extent record, found this {:?}", x),
            }
        }
        modes
    }
4146
4147    async fn assert_all_overwrite(
4148        obj: &DataObjectHandle<ObjectStore>,
4149        mut search_range: Range<u64>,
4150    ) {
4151        let modes = get_modes(obj, search_range.clone()).await;
4152        for mode in modes {
4153            assert_eq!(
4154                mode.0.start, search_range.start,
4155                "missing mode in range {}..{}",
4156                search_range.start, mode.0.start
4157            );
4158            match mode.1 {
4159                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
4160                m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
4161            }
4162            assert!(
4163                mode.0.end <= search_range.end,
4164                "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
4165                search_range,
4166                mode,
4167            );
4168            search_range.start = mode.0.end;
4169        }
4170        assert_eq!(
4171            search_range.start, search_range.end,
4172            "missing mode in range {:?}",
4173            search_range
4174        );
4175    }
4176
    #[fuchsia::test(threads = 10)]
    async fn test_multi_overwrite() {
        // Table-driven test for `multi_overwrite`. For each case a fresh filesystem and a
        // 10-block file are created, then three phases run in order:
        //   1. `pre_writes` - regular `write_or_append` calls of 0xff-filled data.
        //   2. `allocate_ranges` - `allocate` calls; afterwards every allocated range must be
        //      covered by overwrite-mode extents (checked via `assert_all_overwrite`).
        //   3. `overwrites` - each inner Vec is one `multi_overwrite` transaction; the file
        //      contents and the transaction's emitted checksums are validated after each.
        // All ranges in a `Case` are in units of blocks, not bytes.
        #[derive(Debug)]
        struct Case {
            // Block ranges filled with 0xff via write_or_append before allocating.
            pre_writes: Vec<Range<usize>>,
            // Block ranges passed to `allocate`, in order (later entries may overlap earlier
            // ones).
            allocate_ranges: Vec<Range<u64>>,
            // Batches of block ranges; each inner Vec is a single multi_overwrite call.
            overwrites: Vec<Vec<Range<u64>>>,
        }
        let cases = [
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            // Adjacent single-block allocations overwritten in one call.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
                overwrites: vec![vec![0..4]],
            },
            // One allocation, several separate overwrite transactions.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4]],
            },
            // Overwrites issued back-to-front within the allocated range.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
            },
            // Disjoint allocations, only one of them written.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..2, 5..6, 7..8],
                overwrites: vec![vec![5..6]],
            },
            // Repeated overwrites of the same fully-allocated range.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            // Repeated overwrites of a sub-range of a larger allocation.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            // Multiple contiguous ranges in a single multi_overwrite call.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..4, 4..5]],
            },
            // A single call whose ranges straddle the boundary between two allocations.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5, 5..10],
                overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
            },
            // A single call with ranges in two disjoint allocations.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4, 6..10],
                overwrites: vec![vec![2..3, 7..9]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
            },
            // Cases with pre-existing data: allocate over exactly the written range...
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            // ...after the written range...
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![4..6],
                overwrites: vec![vec![5..6]],
            },
            // ...surrounding the written range...
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..4]],
            },
            // ...partially overlapping the written range.
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![2..4],
                overwrites: vec![vec![2..4]],
            },
            Case {
                pre_writes: vec![3..5],
                allocate_ranges: vec![1..3, 6..7],
                overwrites: vec![vec![1..3, 6..7]],
            },
            Case {
                pre_writes: vec![1..3, 5..7, 8..9],
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..5], vec![0..5]],
            },
            // Overlapping allocate calls with no subsequent overwrites - these only exercise
            // allocation plus the overwrite-mode and fsck checks below.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10, 4..6],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![3..8, 5..10],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![5..10, 3..8],
                overwrites: Vec::new(),
            },
        ];

        for (i, case) in cases.into_iter().enumerate() {
            log::info!("running case {} - {:?}", i, case);
            // Fresh filesystem and object per case so cases can't interfere with each other.
            let (fs, object) = test_filesystem_and_empty_object().await;
            let block_size = fs.block_size();
            let file_size = block_size * 10;
            object.truncate(file_size).await.unwrap();

            // Phase 1: regular writes of 0xff data. These must not change the file size
            // (the file was already truncated up to file_size).
            for write in case.pre_writes {
                let write_len = (write.end - write.start) * block_size as usize;
                let mut write_buf = object.allocate_buffer(write_len).await;
                write_buf.as_mut_slice().fill(0xff);
                assert_eq!(
                    object
                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
                        .await
                        .unwrap(),
                    file_size
                );
            }

            // Phase 2: allocate the requested ranges (converted from blocks to bytes)...
            for allocate_range in &case.allocate_ranges {
                object
                    .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
                    .await
                    .unwrap();
            }

            // ...then confirm every allocated range is fully covered by overwrite-mode extents.
            for allocate_range in case.allocate_ranges {
                assert_all_overwrite(
                    &object,
                    allocate_range.start * block_size..allocate_range.end * block_size,
                )
                .await;
            }

            // Phase 3: run each overwrite batch as one multi_overwrite transaction.
            for overwrite in case.overwrites {
                // Convert block ranges to byte ranges, accumulating the total write length.
                let mut write_len = 0;
                let overwrite = overwrite
                    .into_iter()
                    .map(|r| {
                        write_len += (r.end - r.start) * block_size;
                        r.start * block_size..r.end * block_size
                    })
                    .collect::<Vec<_>>();
                // Fill the write buffer with a repeating 0..20 byte pattern.
                let mut write_buf = object.allocate_buffer(write_len as usize).await;
                let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
                write_buf.as_mut_slice().copy_from_slice(&data);

                // Build the expected post-write file image: the current contents with `data`
                // spliced into each target range, consuming `data` front-to-back in range order
                // (mirroring how multi_overwrite consumes its buffer).
                let mut expected_buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, expected_buf.as_mut()).await.unwrap(),
                    expected_buf.len()
                );
                let expected_buf_slice = expected_buf.as_mut_slice();
                let mut data_slice = data.as_slice();
                for r in &overwrite {
                    let len = r.length().unwrap() as usize;
                    let (copy_from, rest) = data_slice.split_at(len);
                    expected_buf_slice[r.start as usize..r.end as usize]
                        .copy_from_slice(&copy_from);
                    data_slice = rest;
                }

                let mut transaction = object.new_transaction().await.unwrap();
                object
                    .multi_overwrite(&mut transaction, 0, &overwrite, write_buf.as_mut())
                    .await
                    .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
                // Double check the emitted checksums. We should have one u64 checksum for every
                // block we wrote to disk.
                let mut checksummed_range_length = 0;
                let mut num_checksums = 0;
                for (device_range, checksums, _) in transaction.checksums() {
                    let range_len = device_range.end - device_range.start;
                    let checksums_len = checksums.len() as u64;
                    // Each checksum must cover exactly one block.
                    assert_eq!(range_len / checksums_len, block_size);
                    checksummed_range_length += range_len;
                    num_checksums += checksums_len;
                }
                assert_eq!(checksummed_range_length, write_len);
                assert_eq!(num_checksums, write_len / block_size);
                transaction.commit().await.unwrap();

                // Read the file back and compare against the expected image.
                let mut buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, buf.as_mut()).await.unwrap(),
                    buf.len(),
                    "failed length check on case {}",
                    i,
                );
                assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
            }

            // Finally make sure the on-disk structures are still consistent.
            fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
            fs.close().await.expect("close failed");
        }
    }
4413
4414    #[fuchsia::test(threads = 10)]
4415    async fn test_multi_overwrite_mode_updates() {
4416        let (fs, object) = test_filesystem_and_empty_object().await;
4417        let block_size = fs.block_size();
4418        let file_size = block_size * 10;
4419        object.truncate(file_size).await.unwrap();
4420
4421        let mut expected_bitmap = BitVec::from_elem(10, false);
4422
4423        object.allocate(0..10 * block_size).await.unwrap();
4424        assert_eq!(
4425            get_modes(&object, 0..10 * block_size).await,
4426            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4427        );
4428
4429        let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
4430        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4431        write_buf.as_mut_slice().copy_from_slice(&data);
4432        let mut transaction = object.new_transaction().await.unwrap();
4433        object
4434            .multi_overwrite(
4435                &mut transaction,
4436                0,
4437                &[2 * block_size..4 * block_size],
4438                write_buf.as_mut(),
4439            )
4440            .await
4441            .unwrap();
4442        transaction.commit().await.unwrap();
4443
4444        expected_bitmap.set(2, true);
4445        expected_bitmap.set(3, true);
4446        assert_eq!(
4447            get_modes(&object, 0..10 * block_size).await,
4448            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4449        );
4450
4451        let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
4452        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4453        write_buf.as_mut_slice().copy_from_slice(&data);
4454        let mut transaction = object.new_transaction().await.unwrap();
4455        object
4456            .multi_overwrite(
4457                &mut transaction,
4458                0,
4459                &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
4460                write_buf.as_mut(),
4461            )
4462            .await
4463            .unwrap();
4464        transaction.commit().await.unwrap();
4465
4466        expected_bitmap.set(4, true);
4467        expected_bitmap.set(6, true);
4468        assert_eq!(
4469            get_modes(&object, 0..10 * block_size).await,
4470            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4471        );
4472
4473        let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
4474        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4475        write_buf.as_mut_slice().copy_from_slice(&data);
4476        let mut transaction = object.new_transaction().await.unwrap();
4477        object
4478            .multi_overwrite(
4479                &mut transaction,
4480                0,
4481                &[
4482                    0..2 * block_size,
4483                    5 * block_size..6 * block_size,
4484                    7 * block_size..10 * block_size,
4485                ],
4486                write_buf.as_mut(),
4487            )
4488            .await
4489            .unwrap();
4490        transaction.commit().await.unwrap();
4491
4492        assert_eq!(
4493            get_modes(&object, 0..10 * block_size).await,
4494            vec![(0..10 * block_size, ExtentMode::Overwrite)]
4495        );
4496
4497        fs.close().await.expect("close failed");
4498    }
4499}